parallel_test.go

package ipam

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"sort"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/libnetwork/ipamapi"
	"github.com/docker/docker/libnetwork/ipamutils"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
	"gotest.tools/v3/assert"
	is "gotest.tools/v3/assert/cmp"
)
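// Subsets of the allocated addresses that release() can free: the whole
// list, or only its even- or odd-indexed entries.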
const (
	all = iota
	even
	odd
)
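// releaseMode is the type of the all/even/odd selectors above.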
type releaseMode uint

type testContext struct {
	a      *Allocator
	opts   map[string]string
	ipList []*net.IPNet
	ipMap  map[string]bool
	pid    string
	maxIP  int
}
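// newTestContext creates an Allocator with an extra "giallo" address space and
// requests a 192.168.100.0/<mask> pool from it. maxIP is the number of usable
// host addresses in that pool.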
func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
	a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
	if err != nil {
		t.Fatal(err)
	}

	a.addrSpaces["giallo"] = &addrSpace{
		alloc:   a.addrSpaces[localAddressSpace].alloc,
		subnets: map[SubnetKey]*PoolData{},
	}

	network := fmt.Sprintf("192.168.100.0/%d", mask)
	// usable IPs: 2^(32-mask) minus the network and broadcast addresses
	totalIps := 1<<uint(32-mask) - 2

	pid, _, _, err := a.RequestPool("giallo", network, "", nil, false)
	if err != nil {
		t.Fatal(err)
	}

	return &testContext{
		a:      a,
		opts:   options,
		ipList: make([]*net.IPNet, 0, totalIps),
		ipMap:  make(map[string]bool),
		pid:    pid,
		maxIP:  totalIps,
	}
}
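// TestDebug requests a couple of addresses from the test pool with serial
// allocation enabled; the return values are not checked.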
func TestDebug(t *testing.T) {
	tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
	tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
	tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
}
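// op records a single pool operation: a globally ordered id, whether it was a
// RequestPool (add) or a ReleasePool, and the pool it acted on.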
type op struct {
	id   int32
	add  bool
	name string
}

func (o *op) String() string {
	return fmt.Sprintf("%+v", *o)
}
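// TestRequestPoolParallel requests and releases pools from 120 goroutines and
// then verifies that, for every pool name, the recorded operations strictly
// alternate between request and release.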
func TestRequestPoolParallel(t *testing.T) {
	a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
	if err != nil {
		t.Fatal(err)
	}
	var operationIndex int32
	ch := make(chan *op, 240)

	group := new(errgroup.Group)
	defer func() {
		if err := group.Wait(); err != nil {
			t.Fatal(err)
		}
	}()

	for i := 0; i < 120; i++ {
		group.Go(func() error {
			name, _, _, err := a.RequestPool("GlobalDefault", "", "", nil, false)
			if err != nil {
				t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait".
				return fmt.Errorf("request error %v", err)
			}
			idx := atomic.AddInt32(&operationIndex, 1)
			ch <- &op{idx, true, name}
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			idx = atomic.AddInt32(&operationIndex, 1)
			err = a.ReleasePool(name)
			if err != nil {
				t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait".
				return fmt.Errorf("release error %v", err)
			}
			ch <- &op{idx, false, name}
			return nil
		})
	}

	// group the 240 events by pool name
	m := make(map[string][]*op)
	for i := 0; i < 240; i++ {
		x := <-ch
		ops, ok := m[x.name]
		if !ok {
			ops = make([]*op, 0, 10)
		}
		ops = append(ops, x)
		m[x.name] = ops
	}

	// Sort each pool's events by id to undo any reordering introduced by the channel
	for pool, ops := range m {
		sort.Slice(ops, func(i, j int) bool {
			return ops[i].id < ops[j].id
		})
		expected := true
		for _, op := range ops {
			if op.add != expected {
				t.Fatalf("Operations for %v not valid: %v, operations %v", pool, op, ops)
			}
			expected = !expected
		}
	}
}
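// The tests below allocate the entire test pool and then release all, only the
// odd-indexed, or only the even-indexed addresses, at several parallelism levels.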
func TestFullAllocateRelease(t *testing.T) {
	for _, parallelism := range []int64{2, 4, 8} {
		for _, mask := range []int{29, 25, 24, 21} {
			tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
			allocate(t, tctx, parallelism)
			release(t, tctx, all, parallelism)
		}
	}
}

func TestOddAllocateRelease(t *testing.T) {
	for _, parallelism := range []int64{2, 4, 8} {
		for _, mask := range []int{29, 25, 24, 21} {
			tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
			allocate(t, tctx, parallelism)
			release(t, tctx, odd, parallelism)
		}
	}
}

func TestFullAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, all, parallelism)
	}
}

func TestOddAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, odd, parallelism)
	}
}

func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
	for _, parallelism := range []int64{1, 4, 8} {
		tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
		allocate(t, tctx, 1)
		release(t, tctx, even, parallelism)
	}
}
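// allocate requests every address in the pool, with at most parallel requests
// in flight at once, and records the results in tctx.ipList and tctx.ipMap.
// It fails the test on duplicate addresses or if the pool is not fully used.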
func allocate(t *testing.T, tctx *testContext, parallel int64) {
	// Allocate the whole space; the 10 extra requests are expected to fail
	// once the pool is exhausted and will yield a nil IP.
	parallelExec := semaphore.NewWeighted(parallel)
	routineNum := tctx.maxIP + 10
	ch := make(chan *net.IPNet, routineNum)
	var wg sync.WaitGroup
	for i := 0; i < routineNum; i++ {
		wg.Add(1)
		go func() {
			parallelExec.Acquire(context.Background(), 1)
			ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
			ch <- ip
			parallelExec.Release(1)
			wg.Done()
		}()
	}

	// wait for all the goroutines to finish
	wg.Wait()

	// process results
	for i := 0; i < routineNum; i++ {
		ip := <-ch
		if ip == nil {
			continue
		}
		if there, ok := tctx.ipMap[ip.String()]; ok && there {
			t.Fatalf("Got duplicate IP %s", ip.String())
		}
		tctx.ipList = append(tctx.ipList, ip)
		tctx.ipMap[ip.String()] = true
	}

	assert.Assert(t, is.Len(tctx.ipList, tctx.maxIP))
}
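// release frees the addresses selected by mode (all, or the even- or
// odd-indexed entries of tctx.ipList), with at most parallel releases in
// flight at once, then verifies each address is allocatable again and
// removes it from the test context.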
func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
	var startIndex, increment, stopIndex, length int
	switch mode {
	case all:
		startIndex = 0
		increment = 1
		stopIndex = tctx.maxIP - 1
		length = tctx.maxIP
	case odd, even:
		if mode == odd {
			startIndex = 1
		}
		increment = 2
		stopIndex = tctx.maxIP - 1
		length = tctx.maxIP / 2
		if tctx.maxIP%2 > 0 {
			length++
		}
	default:
		t.Fatal("unsupported release mode")
	}

	ipIndex := make([]int, 0, length)
	// calculate the indexes of ipList to release
	for i := startIndex; ; i += increment {
		ipIndex = append(ipIndex, i)
		if i+increment > stopIndex {
			break
		}
	}

	parallelExec := semaphore.NewWeighted(parallel)
	ch := make(chan *net.IPNet, len(ipIndex))
	group := new(errgroup.Group)
	for _, index := range ipIndex {
		index := index
		group.Go(func() error {
			parallelExec.Acquire(context.Background(), 1)
			err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
			if err != nil {
				return fmt.Errorf("routine %d got %v", index, err)
			}
			ch <- tctx.ipList[index]
			parallelExec.Release(1)
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		t.Fatal(err)
	}

	for i := 0; i < len(ipIndex); i++ {
		ip := <-ch
		// check that the address is really free by allocating it again
		_, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
		assert.Check(t, err, "ip %v not properly released", ip)
		if err != nil {
			t.Fatalf("ip %v not properly released, error: %v", ip, err)
		}
		err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
		assert.NilError(t, err)

		if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
			t.Fatalf("ip %v got double deallocated", ip)
		}
		tctx.ipMap[ip.String()] = false
		for j, v := range tctx.ipList {
			if v == ip {
				tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
				break
			}
		}
	}

	assert.Check(t, is.Len(tctx.ipList, tctx.maxIP-length))
}