// parallel_test.go

package ipam

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"sort"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/libnetwork/ipamapi"
	"github.com/docker/docker/libnetwork/ipamutils"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
	"gotest.tools/v3/assert"
	is "gotest.tools/v3/assert/cmp"
)

const (
	all = iota
	even
	odd
)

type releaseMode uint

type testContext struct {
	a      *Allocator
	opts   map[string]string
	ipList []*net.IPNet
	ipMap  map[string]bool
	pid    string
	maxIP  int
}
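
// newTestContext builds an Allocator backed by the default local/global
// address pools and reserves a 192.168.100.0/<mask> pool from the local
// scope. maxIP is the number of assignable host addresses in that pool:
// 2^(32-mask) minus the network and broadcast addresses.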
func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
	a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
	if err != nil {
		t.Fatal(err)
	}

	network := fmt.Sprintf("192.168.100.0/%d", mask)
	// total ips 2^(32-mask) - 2 (network and broadcast)
	totalIps := 1<<uint(32-mask) - 2

	pid, _, _, err := a.RequestPool(localAddressSpace, network, "", nil, false)
	if err != nil {
		t.Fatal(err)
	}

	return &testContext{
		a:      a,
		opts:   options,
		ipList: make([]*net.IPNet, 0, totalIps),
		ipMap:  make(map[string]bool),
		pid:    pid,
		maxIP:  totalIps,
	}
}

func TestDebug(t *testing.T) {
	tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
	tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
	tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
}

type op struct {
	id   int32
	add  bool
	name string
}

func (o *op) String() string {
	return fmt.Sprintf("%+v", *o)
}
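
// TestRequestPoolParallel hammers RequestPool/ReleasePool from 120 goroutines.
// Every request and release is recorded as an op tagged with a global sequence
// number; once all events are collected, the ops for each pool name must
// strictly alternate request/release, proving a pool is never handed out a
// second time before it has been released.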
func TestRequestPoolParallel(t *testing.T) {
	a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
	if err != nil {
		t.Fatal(err)
	}

	var operationIndex int32
	ch := make(chan *op, 240)

	group := new(errgroup.Group)
	defer func() {
		if err := group.Wait(); err != nil {
			t.Fatal(err)
		}
	}()

	for i := 0; i < 120; i++ {
		group.Go(func() error {
			name, _, _, err := a.RequestPool("GlobalDefault", "", "", nil, false)
			if err != nil {
				t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait".
				return fmt.Errorf("request error %v", err)
			}
			idx := atomic.AddInt32(&operationIndex, 1)
			ch <- &op{idx, true, name}
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

			idx = atomic.AddInt32(&operationIndex, 1)
			err = a.ReleasePool(name)
			if err != nil {
				t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait".
				return fmt.Errorf("release error %v", err)
			}
			ch <- &op{idx, false, name}
			return nil
		})
	}

	// group the events by pool name
	m := make(map[string][]*op)
	for i := 0; i < 240; i++ {
		x := <-ch
		ops, ok := m[x.name]
		if !ok {
			ops = make([]*op, 0, 10)
		}
		ops = append(ops, x)
		m[x.name] = ops
	}

	// Sort by sequence number: channel delivery may have reordered the events.
	for pool, ops := range m {
		sort.Slice(ops, func(i, j int) bool {
			return ops[i].id < ops[j].id
		})
		expected := true
		for _, op := range ops {
			if op.add != expected {
				t.Fatalf("Operations for %v not valid %v, operations %v", pool, op, ops)
			}
			expected = !expected
		}
	}
}
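
// TestFullAllocateRelease and the tests that follow drive the same
// allocate/release cycle (see the allocate and release helpers below) under
// various combinations of subnet size, release pattern (all, odd or even
// addresses) and degree of parallelism.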
func TestFullAllocateRelease(t *testing.T) {
	for _, parallelism := range []int64{2, 4, 8} {
		for _, mask := range []int{29, 25, 24, 21} {
			tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
			allocate(t, tctx, parallelism)
			release(t, tctx, all, parallelism)
		}
	}
}

func TestOddAllocateRelease(t *testing.T) {
	for _, parallelism := range []int64{2, 4, 8} {
		for _, mask := range []int{29, 25, 24, 21} {
			tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
			allocate(t, tctx, parallelism)
			release(t, tctx, odd, parallelism)
		}
	}
}

func TestFullAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, all, parallelism)
	}
}

func TestOddAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, odd, parallelism)
	}
}

func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, even, parallelism)
	}
}
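
// allocate requests every address in the test pool, keeping at most
// `parallel` requests in flight at a time via a weighted semaphore. It
// deliberately launches ten more goroutines than there are free addresses:
// the surplus requests fail, produce a nil IP and are skipped when results
// are collected. A duplicate allocation is fatal.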
func allocate(t *testing.T, tctx *testContext, parallel int64) {
	// Allocate the whole space
	parallelExec := semaphore.NewWeighted(parallel)
	routineNum := tctx.maxIP + 10
	ch := make(chan *net.IPNet, routineNum)
	var wg sync.WaitGroup

	for i := 0; i < routineNum; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			parallelExec.Acquire(context.Background(), 1)
			// The error is ignored on purpose: the 10 extra requests are
			// expected to fail with a nil IP once the pool is exhausted.
			ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
			ch <- ip
			parallelExec.Release(1)
		}()
	}
	// wait for all the goroutines to finish
	wg.Wait()

	// process results
	for i := 0; i < routineNum; i++ {
		ip := <-ch
		if ip == nil {
			continue
		}
		if there, ok := tctx.ipMap[ip.String()]; ok && there {
			t.Fatalf("Got duplicate IP %s", ip.String())
		}
		tctx.ipList = append(tctx.ipList, ip)
		tctx.ipMap[ip.String()] = true
	}

	assert.Assert(t, is.Len(tctx.ipList, tctx.maxIP))
}
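
// release frees addresses from tctx.ipList according to mode (all, odd or
// even indexes), keeping at most `parallel` releases in flight at a time.
// Each freed address is then re-requested to prove it is actually available
// again, released once more, and removed from the test context bookkeeping.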
func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
	var startIndex, increment, stopIndex, length int

	switch mode {
	case all:
		startIndex = 0
		increment = 1
		stopIndex = tctx.maxIP - 1
		length = tctx.maxIP
	case odd, even:
		if mode == odd {
			startIndex = 1
		}
		increment = 2
		stopIndex = tctx.maxIP - 1
		length = tctx.maxIP / 2
		if tctx.maxIP%2 > 0 {
			length++
		}
	default:
		t.Fatal("unsupported release mode")
	}

	// calculate the indexes of the addresses to release from the ipList
	ipIndex := make([]int, 0, length)
	for i := startIndex; i <= stopIndex; i += increment {
		ipIndex = append(ipIndex, i)
	}

	parallelExec := semaphore.NewWeighted(parallel)
	ch := make(chan *net.IPNet, len(ipIndex))
	group := new(errgroup.Group)
	for index := range ipIndex {
		index := index
		group.Go(func() error {
			parallelExec.Acquire(context.Background(), 1)
			defer parallelExec.Release(1)
			err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
			if err != nil {
				return fmt.Errorf("routine %d got %v", index, err)
			}
			ch <- tctx.ipList[index]
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		t.Fatal(err)
	}

	for i := 0; i < len(ipIndex); i++ {
		ip := <-ch
		// check that the address is really free again
		_, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
		assert.NilError(t, err, "ip %v not properly released", ip)
		err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
		assert.NilError(t, err)

		if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
			t.Fatalf("ip %v was deallocated twice", ip)
		}
		tctx.ipMap[ip.String()] = false
		for j, v := range tctx.ipList {
			if v == ip {
				tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
				break
			}
		}
	}

	assert.Check(t, is.Len(tctx.ipList, tctx.maxIP-length))
}
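
// The sketch below is not part of the original suite; it is a minimal,
// sequential illustration of the same public Allocator surface the tests
// above exercise (NewAllocator, RequestPool, RequestAddress, ReleaseAddress,
// ReleasePool). The test name and the 192.168.200.0/29 subnet are arbitrary
// choices for the example.
func TestSequentialCycleExample(t *testing.T) {
	a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
	if err != nil {
		t.Fatal(err)
	}
	// Reserve a small pool from the local scope, as newTestContext does.
	pid, _, _, err := a.RequestPool(localAddressSpace, "192.168.200.0/29", "", nil, false)
	if err != nil {
		t.Fatal(err)
	}
	// Take one address, then undo both the address and the pool reservation.
	ip, _, err := a.RequestAddress(pid, nil, nil)
	if err != nil {
		t.Fatal(err)
	}
	if err := a.ReleaseAddress(pid, ip.IP); err != nil {
		t.Fatal(err)
	}
	if err := a.ReleasePool(pid); err != nil {
		t.Fatal(err)
	}
}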