123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- package ipam
- import (
- "context"
- "fmt"
- "math/rand"
- "net"
- "sort"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/docker/docker/libnetwork/ipamapi"
- "github.com/docker/docker/libnetwork/ipamutils"
- "golang.org/x/sync/errgroup"
- "golang.org/x/sync/semaphore"
- "gotest.tools/v3/assert"
- is "gotest.tools/v3/assert/cmp"
- )
- const (
- all = iota
- even
- odd
- )
- type releaseMode uint
- type testContext struct {
- a *Allocator
- opts map[string]string
- ipList []*net.IPNet
- ipMap map[string]bool
- pid string
- maxIP int
- }
- func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
- a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
- if err != nil {
- t.Fatal(err)
- }
- a.addrSpaces["giallo"] = &addrSpace{
- alloc: a.addrSpaces[localAddressSpace].alloc,
- subnets: map[SubnetKey]*PoolData{},
- }
- network := fmt.Sprintf("192.168.100.0/%d", mask)
- // total ips 2^(32-mask) - 2 (network and broadcast)
- totalIps := 1<<uint(32-mask) - 2
- pid, _, _, err := a.RequestPool("giallo", network, "", nil, false)
- if err != nil {
- t.Fatal(err)
- }
- return &testContext{
- a: a,
- opts: options,
- ipList: make([]*net.IPNet, 0, totalIps),
- ipMap: make(map[string]bool),
- pid: pid,
- maxIP: totalIps,
- }
- }
- func TestDebug(t *testing.T) {
- tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
- tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
- tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
- }
- type op struct {
- id int32
- add bool
- name string
- }
- func (o *op) String() string {
- return fmt.Sprintf("%+v", *o)
- }
- func TestRequestPoolParallel(t *testing.T) {
- a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
- if err != nil {
- t.Fatal(err)
- }
- var operationIndex int32
- ch := make(chan *op, 240)
- group := new(errgroup.Group)
- defer func() {
- if err := group.Wait(); err != nil {
- t.Fatal(err)
- }
- }()
- for i := 0; i < 120; i++ {
- group.Go(func() error {
- name, _, _, err := a.RequestPool("GlobalDefault", "", "", nil, false)
- if err != nil {
- t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait".
- return fmt.Errorf("request error %v", err)
- }
- idx := atomic.AddInt32(&operationIndex, 1)
- ch <- &op{idx, true, name}
- time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
- idx = atomic.AddInt32(&operationIndex, 1)
- err = a.ReleasePool(name)
- if err != nil {
- t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait".
- return fmt.Errorf("release error %v", err)
- }
- ch <- &op{idx, false, name}
- return nil
- })
- }
- // map of events
- m := make(map[string][]*op)
- for i := 0; i < 240; i++ {
- x := <-ch
- ops, ok := m[x.name]
- if !ok {
- ops = make([]*op, 0, 10)
- }
- ops = append(ops, x)
- m[x.name] = ops
- }
- // Post processing to avoid event reordering on the channel
- for pool, ops := range m {
- sort.Slice(ops[:], func(i, j int) bool {
- return ops[i].id < ops[j].id
- })
- expected := true
- for _, op := range ops {
- if op.add != expected {
- t.Fatalf("Operations for %v not valid %v, operations %v", pool, op, ops)
- }
- expected = !expected
- }
- }
- }
- func TestFullAllocateRelease(t *testing.T) {
- for _, parallelism := range []int64{2, 4, 8} {
- for _, mask := range []int{29, 25, 24, 21} {
- tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
- allocate(t, tctx, parallelism)
- release(t, tctx, all, parallelism)
- }
- }
- }
- func TestOddAllocateRelease(t *testing.T) {
- for _, parallelism := range []int64{2, 4, 8} {
- for _, mask := range []int{29, 25, 24, 21} {
- tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
- allocate(t, tctx, parallelism)
- release(t, tctx, odd, parallelism)
- }
- }
- }
- func TestFullAllocateSerialReleaseParallel(t *testing.T) {
- for _, parallelism := range []int64{1, 4, 8} {
- tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
- allocate(t, tctx, 1)
- release(t, tctx, all, parallelism)
- }
- }
- func TestOddAllocateSerialReleaseParallel(t *testing.T) {
- for _, parallelism := range []int64{1, 4, 8} {
- tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
- allocate(t, tctx, 1)
- release(t, tctx, odd, parallelism)
- }
- }
- func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
- for _, parallelism := range []int64{1, 4, 8} {
- tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
- allocate(t, tctx, 1)
- release(t, tctx, even, parallelism)
- }
- }
- func allocate(t *testing.T, tctx *testContext, parallel int64) {
- // Allocate the whole space
- parallelExec := semaphore.NewWeighted(parallel)
- routineNum := tctx.maxIP + 10
- ch := make(chan *net.IPNet, routineNum)
- var id int
- var wg sync.WaitGroup
- // routine loop
- for {
- wg.Add(1)
- go func(id int) {
- parallelExec.Acquire(context.Background(), 1)
- ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
- ch <- ip
- parallelExec.Release(1)
- wg.Done()
- }(id)
- id++
- if id == routineNum {
- break
- }
- }
- // give time to all the go routines to finish
- wg.Wait()
- // process results
- for i := 0; i < routineNum; i++ {
- ip := <-ch
- if ip == nil {
- continue
- }
- if there, ok := tctx.ipMap[ip.String()]; ok && there {
- t.Fatalf("Got duplicate IP %s", ip.String())
- }
- tctx.ipList = append(tctx.ipList, ip)
- tctx.ipMap[ip.String()] = true
- }
- assert.Assert(t, is.Len(tctx.ipList, tctx.maxIP))
- }
- func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
- var startIndex, increment, stopIndex, length int
- switch mode {
- case all:
- startIndex = 0
- increment = 1
- stopIndex = tctx.maxIP - 1
- length = tctx.maxIP
- case odd, even:
- if mode == odd {
- startIndex = 1
- }
- increment = 2
- stopIndex = tctx.maxIP - 1
- length = tctx.maxIP / 2
- if tctx.maxIP%2 > 0 {
- length++
- }
- default:
- t.Fatal("unsupported mode yet")
- }
- ipIndex := make([]int, 0, length)
- // calculate the index to release from the ipList
- for i := startIndex; ; i += increment {
- ipIndex = append(ipIndex, i)
- if i+increment > stopIndex {
- break
- }
- }
- var id int
- parallelExec := semaphore.NewWeighted(parallel)
- ch := make(chan *net.IPNet, len(ipIndex))
- group := new(errgroup.Group)
- for index := range ipIndex {
- index := index
- group.Go(func() error {
- parallelExec.Acquire(context.Background(), 1)
- err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
- if err != nil {
- return fmt.Errorf("routine %d got %v", id, err)
- }
- ch <- tctx.ipList[index]
- parallelExec.Release(1)
- return nil
- })
- id++
- }
- if err := group.Wait(); err != nil {
- t.Fatal(err)
- }
- for i := 0; i < len(ipIndex); i++ {
- ip := <-ch
- // check if it is really free
- _, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
- assert.Check(t, err, "ip %v not properly released", ip)
- if err != nil {
- t.Fatalf("ip %v not properly released, error:%v", ip, err)
- }
- err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
- assert.NilError(t, err)
- if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
- t.Fatalf("ip %v got double deallocated", ip)
- }
- tctx.ipMap[ip.String()] = false
- for j, v := range tctx.ipList {
- if v == ip {
- tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
- break
- }
- }
- }
- assert.Check(t, is.Len(tctx.ipList, tctx.maxIP-length))
- }
|