// parallel_test.go — concurrent allocation/release tests for the ipam package.
  1. package ipam
  2. import (
  3. "context"
  4. "fmt"
  5. "math/rand"
  6. "net"
  7. "sort"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/docker/libnetwork/ipamapi"
  13. "golang.org/x/sync/semaphore"
  14. "gotest.tools/assert"
  15. is "gotest.tools/assert/cmp"
  16. )
  17. const (
  18. all = iota
  19. even
  20. odd
  21. )
  22. type releaseMode uint
// testContext carries the state shared by the parallel allocate/release
// helpers: the allocator under test, the pool requested from it, and the
// bookkeeping for every address handed out so far.
type testContext struct {
	a      *Allocator        // allocator instance under test
	opts   map[string]string // options forwarded to RequestAddress
	ipList []*net.IPNet      // addresses allocated, in allocation order
	ipMap  map[string]bool   // ip string -> true while the address is held
	pid    string            // pool ID returned by RequestPool
	maxIP  int               // number of usable addresses in the pool
}
  31. func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
  32. a, err := getAllocator(false)
  33. if err != nil {
  34. t.Fatal(err)
  35. }
  36. a.addrSpaces["giallo"] = &addrSpace{
  37. id: dsConfigKey + "/" + "giallo",
  38. ds: a.addrSpaces[localAddressSpace].ds,
  39. alloc: a.addrSpaces[localAddressSpace].alloc,
  40. scope: a.addrSpaces[localAddressSpace].scope,
  41. subnets: map[SubnetKey]*PoolData{},
  42. }
  43. network := fmt.Sprintf("192.168.100.0/%d", mask)
  44. // total ips 2^(32-mask) - 2 (network and broadcast)
  45. totalIps := 1<<uint(32-mask) - 2
  46. pid, _, _, err := a.RequestPool("giallo", network, "", nil, false)
  47. if err != nil {
  48. t.Fatal(err)
  49. }
  50. return &testContext{
  51. a: a,
  52. opts: options,
  53. ipList: make([]*net.IPNet, 0, totalIps),
  54. ipMap: make(map[string]bool),
  55. pid: pid,
  56. maxIP: totalIps,
  57. }
  58. }
  59. func TestDebug(t *testing.T) {
  60. tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  61. tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  62. tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  63. }
// op records one pool operation performed by a worker goroutine in
// TestRequestPoolParallel: a request (add=true) or a release (add=false)
// of the named pool, tagged with a globally increasing sequence id.
type op struct {
	id   int32  // global completion order of the operation
	add  bool   // true for RequestPool, false for ReleasePool
	name string // pool id the operation acted on
}
  69. func (o *op) String() string {
  70. return fmt.Sprintf("%+v", *o)
  71. }
  72. func TestRequestPoolParallel(t *testing.T) {
  73. a, err := getAllocator(false)
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. var operationIndex int32
  78. ch := make(chan *op, 240)
  79. for i := 0; i < 120; i++ {
  80. go func(t *testing.T, a *Allocator, ch chan *op) {
  81. name, _, _, err := a.RequestPool("GlobalDefault", "", "", nil, false)
  82. if err != nil {
  83. t.Fatalf("request error %v", err)
  84. }
  85. idx := atomic.AddInt32(&operationIndex, 1)
  86. ch <- &op{idx, true, name}
  87. time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
  88. idx = atomic.AddInt32(&operationIndex, 1)
  89. err = a.ReleasePool(name)
  90. if err != nil {
  91. t.Fatalf("release error %v", err)
  92. }
  93. ch <- &op{idx, false, name}
  94. }(t, a, ch)
  95. }
  96. // map of events
  97. m := make(map[string][]*op)
  98. for i := 0; i < 240; i++ {
  99. x := <-ch
  100. ops, ok := m[x.name]
  101. if !ok {
  102. ops = make([]*op, 0, 10)
  103. }
  104. ops = append(ops, x)
  105. m[x.name] = ops
  106. }
  107. // Post processing to avoid event reordering on the channel
  108. for pool, ops := range m {
  109. sort.Slice(ops[:], func(i, j int) bool {
  110. return ops[i].id < ops[j].id
  111. })
  112. expected := true
  113. for _, op := range ops {
  114. if op.add != expected {
  115. t.Fatalf("Operations for %v not valid %v, operations %v", pool, op, ops)
  116. }
  117. expected = !expected
  118. }
  119. }
  120. }
  121. func TestFullAllocateRelease(t *testing.T) {
  122. for _, parallelism := range []int64{2, 4, 8} {
  123. for _, mask := range []int{29, 25, 24, 21} {
  124. tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  125. allocate(t, tctx, parallelism)
  126. release(t, tctx, all, parallelism)
  127. }
  128. }
  129. }
  130. func TestOddAllocateRelease(t *testing.T) {
  131. for _, parallelism := range []int64{2, 4, 8} {
  132. for _, mask := range []int{29, 25, 24, 21} {
  133. tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  134. allocate(t, tctx, parallelism)
  135. release(t, tctx, odd, parallelism)
  136. }
  137. }
  138. }
  139. func TestFullAllocateSerialReleaseParallel(t *testing.T) {
  140. for _, parallelism := range []int64{1, 4, 8} {
  141. tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  142. allocate(t, tctx, 1)
  143. release(t, tctx, all, parallelism)
  144. }
  145. }
  146. func TestOddAllocateSerialReleaseParallel(t *testing.T) {
  147. for _, parallelism := range []int64{1, 4, 8} {
  148. tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  149. allocate(t, tctx, 1)
  150. release(t, tctx, odd, parallelism)
  151. }
  152. }
  153. func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
  154. for _, parallelism := range []int64{1, 4, 8} {
  155. tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  156. allocate(t, tctx, 1)
  157. release(t, tctx, even, parallelism)
  158. }
  159. }
  160. func allocate(t *testing.T, tctx *testContext, parallel int64) {
  161. // Allocate the whole space
  162. parallelExec := semaphore.NewWeighted(parallel)
  163. routineNum := tctx.maxIP + 10
  164. ch := make(chan *net.IPNet, routineNum)
  165. var id int
  166. var wg sync.WaitGroup
  167. // routine loop
  168. for {
  169. wg.Add(1)
  170. go func(id int) {
  171. parallelExec.Acquire(context.Background(), 1)
  172. ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
  173. ch <- ip
  174. parallelExec.Release(1)
  175. wg.Done()
  176. }(id)
  177. id++
  178. if id == routineNum {
  179. break
  180. }
  181. }
  182. // give time to all the go routines to finish
  183. wg.Wait()
  184. // process results
  185. for i := 0; i < routineNum; i++ {
  186. ip := <-ch
  187. if ip == nil {
  188. continue
  189. }
  190. if there, ok := tctx.ipMap[ip.String()]; ok && there {
  191. t.Fatalf("Got duplicate IP %s", ip.String())
  192. break
  193. }
  194. tctx.ipList = append(tctx.ipList, ip)
  195. tctx.ipMap[ip.String()] = true
  196. }
  197. assert.Check(t, is.Len(tctx.ipList, tctx.maxIP))
  198. if len(tctx.ipList) != tctx.maxIP {
  199. t.Fatal("mismatch number allocation")
  200. }
  201. }
  202. func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
  203. var startIndex, increment, stopIndex, length int
  204. switch mode {
  205. case all:
  206. startIndex = 0
  207. increment = 1
  208. stopIndex = tctx.maxIP - 1
  209. length = tctx.maxIP
  210. case odd, even:
  211. if mode == odd {
  212. startIndex = 1
  213. }
  214. increment = 2
  215. stopIndex = tctx.maxIP - 1
  216. length = tctx.maxIP / 2
  217. if tctx.maxIP%2 > 0 {
  218. length++
  219. }
  220. default:
  221. t.Fatal("unsupported mode yet")
  222. }
  223. ipIndex := make([]int, 0, length)
  224. // calculate the index to release from the ipList
  225. for i := startIndex; ; i += increment {
  226. ipIndex = append(ipIndex, i)
  227. if i+increment > stopIndex {
  228. break
  229. }
  230. }
  231. var id int
  232. parallelExec := semaphore.NewWeighted(parallel)
  233. ch := make(chan *net.IPNet, len(ipIndex))
  234. wg := sync.WaitGroup{}
  235. for index := range ipIndex {
  236. wg.Add(1)
  237. go func(id, index int) {
  238. parallelExec.Acquire(context.Background(), 1)
  239. // logrus.Errorf("index %v", index)
  240. // logrus.Errorf("list %v", tctx.ipList)
  241. err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
  242. if err != nil {
  243. t.Fatalf("routine %d got %v", id, err)
  244. }
  245. ch <- tctx.ipList[index]
  246. parallelExec.Release(1)
  247. wg.Done()
  248. }(id, index)
  249. id++
  250. }
  251. wg.Wait()
  252. for i := 0; i < len(ipIndex); i++ {
  253. ip := <-ch
  254. // check if it is really free
  255. _, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
  256. assert.Check(t, err, "ip %v not properly released", ip)
  257. if err != nil {
  258. t.Fatalf("ip %v not properly released, error:%v", ip, err)
  259. }
  260. err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
  261. assert.NilError(t, err)
  262. if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
  263. t.Fatalf("ip %v got double deallocated", ip)
  264. }
  265. tctx.ipMap[ip.String()] = false
  266. for j, v := range tctx.ipList {
  267. if v == ip {
  268. tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
  269. break
  270. }
  271. }
  272. }
  273. assert.Check(t, is.Len(tctx.ipList, tctx.maxIP-length))
  274. }