parallel_test.go

package ipam

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"sort"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/libnetwork/ipamapi"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
	"gotest.tools/v3/assert"
	is "gotest.tools/v3/assert/cmp"
)

const (
	all = iota
	even
	odd
)

type releaseMode uint

type testContext struct {
	a      *Allocator
	opts   map[string]string
	ipList []*net.IPNet
	ipMap  map[string]bool
	pid    string
	maxIP  int
}

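// newTestContext creates an allocator with an extra "giallo" address space,
// requests a 192.168.100.0/<mask> pool from it, and returns a testContext
// whose maxIP is the number of allocatable addresses in that pool.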
func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
	a, err := getAllocator(false)
	if err != nil {
		t.Fatal(err)
	}

	a.addrSpaces["giallo"] = &addrSpace{
		id:      dsConfigKey + "/" + "giallo",
		ds:      a.addrSpaces[localAddressSpace].ds,
		alloc:   a.addrSpaces[localAddressSpace].alloc,
		scope:   a.addrSpaces[localAddressSpace].scope,
		subnets: map[SubnetKey]*PoolData{},
	}

	network := fmt.Sprintf("192.168.100.0/%d", mask)
	// total IPs: 2^(32-mask) - 2 (network and broadcast addresses excluded)
	totalIPs := 1<<uint(32-mask) - 2

	pid, _, _, err := a.RequestPool("giallo", network, "", nil, false)
	if err != nil {
		t.Fatal(err)
	}

	return &testContext{
		a:      a,
		opts:   options,
		ipList: make([]*net.IPNet, 0, totalIPs),
		ipMap:  make(map[string]bool),
		pid:    pid,
		maxIP:  totalIPs,
	}
}

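// TestDebug requests two addresses from a fresh /23 pool with serial
// allocation enabled; the return values are not checked.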
func TestDebug(t *testing.T) {
	tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
	tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
	tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
}

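// op records a single pool operation: a monotonically increasing sequence id,
// whether the operation was an allocation (add) or a release, and the name of
// the pool it acted on.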
type op struct {
	id   int32
	add  bool
	name string
}

func (o *op) String() string {
	return fmt.Sprintf("%+v", *o)
}

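// TestRequestPoolParallel starts 120 goroutines that each request and then
// release a pool from the GlobalDefault address space. Every operation is
// tagged with an atomic sequence number and reported on a channel; the events
// are then grouped per pool, sorted by sequence number, and checked for a
// strict request/release alternation starting with a request.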
func TestRequestPoolParallel(t *testing.T) {
	a, err := getAllocator(false)
	if err != nil {
		t.Fatal(err)
	}

	var operationIndex int32
	ch := make(chan *op, 240)

	group := new(errgroup.Group)
	defer func() {
		if err := group.Wait(); err != nil {
			t.Fatal(err)
		}
	}()

	for i := 0; i < 120; i++ {
		group.Go(func() error {
			name, _, _, err := a.RequestPool("GlobalDefault", "", "", nil, false)
			if err != nil {
				t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait"
				return fmt.Errorf("request error %v", err)
			}
			idx := atomic.AddInt32(&operationIndex, 1)
			ch <- &op{idx, true, name}

			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

			idx = atomic.AddInt32(&operationIndex, 1)
			err = a.ReleasePool(name)
			if err != nil {
				t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait"
				return fmt.Errorf("release error %v", err)
			}
			ch <- &op{idx, false, name}
			return nil
		})
	}

	// group the events by pool name
	m := make(map[string][]*op)
	for i := 0; i < 240; i++ {
		x := <-ch
		ops, ok := m[x.name]
		if !ok {
			ops = make([]*op, 0, 10)
		}
		ops = append(ops, x)
		m[x.name] = ops
	}

	// post-process to compensate for event reordering on the channel
	for pool, ops := range m {
		sort.Slice(ops, func(i, j int) bool {
			return ops[i].id < ops[j].id
		})
		expected := true
		for _, op := range ops {
			if op.add != expected {
				t.Fatalf("Operations for %v not valid %v, operations %v", pool, op, ops)
			}
			expected = !expected
		}
	}
}

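// The tests below fully allocate a pool and then release all, only the
// odd-indexed, or only the even-indexed addresses, with varying degrees of
// parallelism.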
func TestFullAllocateRelease(t *testing.T) {
	for _, parallelism := range []int64{2, 4, 8} {
		for _, mask := range []int{29, 25, 24, 21} {
			tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
			allocate(t, tctx, parallelism)
			release(t, tctx, all, parallelism)
		}
	}
}

func TestOddAllocateRelease(t *testing.T) {
	for _, parallelism := range []int64{2, 4, 8} {
		for _, mask := range []int{29, 25, 24, 21} {
			tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
			allocate(t, tctx, parallelism)
			release(t, tctx, odd, parallelism)
		}
	}
}

func TestFullAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, all, parallelism)
	}
}

func TestOddAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, odd, parallelism)
	}
}

func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, even, parallelism)
	}
}

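// allocate requests every address in the test pool, bounding concurrency with
// a weighted semaphore. It spawns a few more goroutines than there are
// addresses, so the surplus requests return a nil IP once the pool is
// exhausted; the successful results are recorded in tctx.ipList and
// tctx.ipMap while checking for duplicates.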
func allocate(t *testing.T, tctx *testContext, parallel int64) {
	// Allocate the whole address space
	parallelExec := semaphore.NewWeighted(parallel)
	routineNum := tctx.maxIP + 10
	ch := make(chan *net.IPNet, routineNum)
	var id int
	var wg sync.WaitGroup

	// spawn the allocation goroutines
	for {
		wg.Add(1)
		go func(id int) {
			parallelExec.Acquire(context.Background(), 1)
			ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
			ch <- ip
			parallelExec.Release(1)
			wg.Done()
		}(id)
		id++
		if id == routineNum {
			break
		}
	}

	// wait for all the goroutines to finish
	wg.Wait()

	// process the results
	for i := 0; i < routineNum; i++ {
		ip := <-ch
		if ip == nil {
			continue
		}
		if there, ok := tctx.ipMap[ip.String()]; ok && there {
			t.Fatalf("Got duplicate IP %s", ip.String())
		}
		tctx.ipList = append(tctx.ipList, ip)
		tctx.ipMap[ip.String()] = true
	}

	assert.Assert(t, is.Len(tctx.ipList, tctx.maxIP))
}

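// release frees the addresses selected by mode (all entries of tctx.ipList,
// or only the odd- or even-indexed ones) in parallel, then verifies that each
// freed address can be re-allocated and updates tctx.ipList and tctx.ipMap
// accordingly.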
func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
	var startIndex, increment, stopIndex, length int
	switch mode {
	case all:
		startIndex = 0
		increment = 1
		stopIndex = tctx.maxIP - 1
		length = tctx.maxIP
	case odd, even:
		if mode == odd {
			startIndex = 1
		}
		increment = 2
		stopIndex = tctx.maxIP - 1
		length = tctx.maxIP / 2
		if tctx.maxIP%2 > 0 {
			length++
		}
	default:
		t.Fatalf("unsupported release mode %d", mode)
	}

	// calculate the ipList indices to release
	ipIndex := make([]int, 0, length)
	for i := startIndex; ; i += increment {
		ipIndex = append(ipIndex, i)
		if i+increment > stopIndex {
			break
		}
	}

	parallelExec := semaphore.NewWeighted(parallel)
	ch := make(chan *net.IPNet, len(ipIndex))
	group := new(errgroup.Group)
	for _, index := range ipIndex {
		index := index
		group.Go(func() error {
			parallelExec.Acquire(context.Background(), 1)
			err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
			if err != nil {
				return fmt.Errorf("routine releasing index %d got %v", index, err)
			}
			ch <- tctx.ipList[index]
			parallelExec.Release(1)
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		t.Fatal(err)
	}

	for i := 0; i < len(ipIndex); i++ {
		ip := <-ch
		// check that the address is really free by allocating it again
		_, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
		assert.Check(t, err, "ip %v not properly released", ip)
		if err != nil {
			t.Fatalf("ip %v not properly released, error: %v", ip, err)
		}
		err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
		assert.NilError(t, err)

		if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
			t.Fatalf("ip %v got double deallocated", ip)
		}
		tctx.ipMap[ip.String()] = false
		for j, v := range tctx.ipList {
			if v == ip {
				tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
				break
			}
		}
	}

	assert.Check(t, is.Len(tctx.ipList, tctx.maxIP-length))
}