parallel_test.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package ipam
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "github.com/golang/sync/semaphore"
  9. "github.com/docker/libnetwork/ipamapi"
  10. "github.com/stretchr/testify/assert"
  11. )
// Release selection modes: which subset of the addresses recorded in
// testContext.ipList a call to release() should free.
const (
	all = iota // release every allocated address
	even       // release the addresses at even indexes of ipList
	odd        // release the addresses at odd indexes of ipList
)
// releaseMode selects which subset of allocated addresses release() frees (all, even or odd).
type releaseMode uint
// testContext carries the state shared by the allocate/release test
// helpers: the allocator under test, the pool to draw addresses from,
// and the bookkeeping used to detect duplicate or lost allocations.
type testContext struct {
	a      *Allocator        // allocator under test
	opts   map[string]string // options forwarded to RequestAddress
	ipList []*net.IPNet      // currently allocated addresses, in allocation order
	ipMap  map[string]bool   // ip string -> true while the address is allocated
	pid    string            // pool ID returned by RequestPool
	maxIP  int               // number of usable addresses in the pool
}
  26. func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
  27. a, err := getAllocator(true)
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. a.addrSpaces["giallo"] = &addrSpace{
  32. id: dsConfigKey + "/" + "giallo",
  33. ds: a.addrSpaces[localAddressSpace].ds,
  34. alloc: a.addrSpaces[localAddressSpace].alloc,
  35. scope: a.addrSpaces[localAddressSpace].scope,
  36. subnets: map[SubnetKey]*PoolData{},
  37. }
  38. network := fmt.Sprintf("192.168.100.0/%d", mask)
  39. // total ips 2^(32-mask) - 2 (network and broadcast)
  40. totalIps := 1<<uint(32-mask) - 2
  41. pid, _, _, err := a.RequestPool("giallo", network, "", nil, false)
  42. if err != nil {
  43. t.Fatal(err)
  44. }
  45. return &testContext{
  46. a: a,
  47. opts: options,
  48. ipList: make([]*net.IPNet, 0, totalIps),
  49. ipMap: make(map[string]bool),
  50. pid: pid,
  51. maxIP: totalIps,
  52. }
  53. }
  54. func TestDebug(t *testing.T) {
  55. tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  56. tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  57. tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  58. }
  59. func TestFullAllocateRelease(t *testing.T) {
  60. for _, parallelism := range []int64{2, 4, 8} {
  61. for _, mask := range []int{29, 25, 24, 21} {
  62. tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  63. allocate(t, tctx, parallelism)
  64. release(t, tctx, all, parallelism)
  65. }
  66. }
  67. }
  68. func TestOddAllocateRelease(t *testing.T) {
  69. for _, parallelism := range []int64{2, 4, 8} {
  70. for _, mask := range []int{29, 25, 24, 21} {
  71. tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  72. allocate(t, tctx, parallelism)
  73. release(t, tctx, odd, parallelism)
  74. }
  75. }
  76. }
  77. func TestFullAllocateSerialReleaseParallel(t *testing.T) {
  78. for _, parallelism := range []int64{1, 4, 8} {
  79. tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  80. allocate(t, tctx, 1)
  81. release(t, tctx, all, parallelism)
  82. }
  83. }
  84. func TestOddAllocateSerialReleaseParallel(t *testing.T) {
  85. for _, parallelism := range []int64{1, 4, 8} {
  86. tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  87. allocate(t, tctx, 1)
  88. release(t, tctx, odd, parallelism)
  89. }
  90. }
  91. func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
  92. for _, parallelism := range []int64{1, 4, 8} {
  93. tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
  94. allocate(t, tctx, 1)
  95. release(t, tctx, even, parallelism)
  96. }
  97. }
  98. func allocate(t *testing.T, tctx *testContext, parallel int64) {
  99. // Allocate the whole space
  100. parallelExec := semaphore.NewWeighted(parallel)
  101. routineNum := tctx.maxIP + 10
  102. ch := make(chan *net.IPNet, routineNum)
  103. var id int
  104. var wg sync.WaitGroup
  105. // routine loop
  106. for {
  107. wg.Add(1)
  108. go func(id int) {
  109. parallelExec.Acquire(context.Background(), 1)
  110. ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
  111. ch <- ip
  112. parallelExec.Release(1)
  113. wg.Done()
  114. }(id)
  115. id++
  116. if id == routineNum {
  117. break
  118. }
  119. }
  120. // give time to all the go routines to finish
  121. wg.Wait()
  122. // process results
  123. for i := 0; i < routineNum; i++ {
  124. ip := <-ch
  125. if ip == nil {
  126. continue
  127. }
  128. if there, ok := tctx.ipMap[ip.String()]; ok && there {
  129. t.Fatalf("Got duplicate IP %s", ip.String())
  130. break
  131. }
  132. tctx.ipList = append(tctx.ipList, ip)
  133. tctx.ipMap[ip.String()] = true
  134. }
  135. assert.Len(t, tctx.ipList, tctx.maxIP)
  136. if len(tctx.ipList) != tctx.maxIP {
  137. t.Fatal("missmatch number allocation")
  138. }
  139. }
  140. func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
  141. var startIndex, increment, stopIndex, length int
  142. switch mode {
  143. case all:
  144. startIndex = 0
  145. increment = 1
  146. stopIndex = tctx.maxIP - 1
  147. length = tctx.maxIP
  148. case odd, even:
  149. if mode == odd {
  150. startIndex = 1
  151. }
  152. increment = 2
  153. stopIndex = tctx.maxIP - 1
  154. length = tctx.maxIP / 2
  155. if tctx.maxIP%2 > 0 {
  156. length++
  157. }
  158. default:
  159. t.Fatal("unsupported mode yet")
  160. }
  161. ipIndex := make([]int, 0, length)
  162. // calculate the index to release from the ipList
  163. for i := startIndex; ; i += increment {
  164. ipIndex = append(ipIndex, i)
  165. if i+increment > stopIndex {
  166. break
  167. }
  168. }
  169. var id int
  170. parallelExec := semaphore.NewWeighted(parallel)
  171. ch := make(chan *net.IPNet, len(ipIndex))
  172. wg := sync.WaitGroup{}
  173. for index := range ipIndex {
  174. wg.Add(1)
  175. go func(id, index int) {
  176. parallelExec.Acquire(context.Background(), 1)
  177. // logrus.Errorf("index %v", index)
  178. // logrus.Errorf("list %v", tctx.ipList)
  179. err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
  180. if err != nil {
  181. t.Fatalf("routine %d got %v", id, err)
  182. }
  183. ch <- tctx.ipList[index]
  184. parallelExec.Release(1)
  185. wg.Done()
  186. }(id, index)
  187. id++
  188. }
  189. wg.Wait()
  190. for i := 0; i < len(ipIndex); i++ {
  191. ip := <-ch
  192. // check if it is really free
  193. _, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
  194. assert.NoError(t, err, "ip %v not properly released", ip)
  195. if err != nil {
  196. t.Fatalf("ip %v not properly released, error:%v", ip, err)
  197. }
  198. err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
  199. assert.NoError(t, err)
  200. if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
  201. t.Fatalf("ip %v got double deallocated", ip)
  202. }
  203. tctx.ipMap[ip.String()] = false
  204. for j, v := range tctx.ipList {
  205. if v == ip {
  206. tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
  207. break
  208. }
  209. }
  210. }
  211. assert.Len(t, tctx.ipList, tctx.maxIP-length)
  212. }