9a8b45c133
Only two address spaces are supported: LocalDefault and GlobalDefault. Support for non-default address spaces in the IPAM Allocator is vestigial, from a time when IPAM state was stored in a persistent shared datastore. There is no way to create non-default address spaces through the IPAM API so there is no need to retain code to support the use of such address spaces. Drop all pretense that more address spaces can exist, to the extent that the IPAM API allows. Signed-off-by: Cory Snider <csnider@mirantis.com>
307 lines
7.3 KiB
Go
307 lines
7.3 KiB
Go
package ipam
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/docker/docker/libnetwork/ipamapi"
|
|
"github.com/docker/docker/libnetwork/ipamutils"
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/sync/semaphore"
|
|
"gotest.tools/v3/assert"
|
|
is "gotest.tools/v3/assert/cmp"
|
|
)
|
|
|
|
const (
|
|
all = iota
|
|
even
|
|
odd
|
|
)
|
|
|
|
type releaseMode uint
|
|
|
|
type testContext struct {
|
|
a *Allocator
|
|
opts map[string]string
|
|
ipList []*net.IPNet
|
|
ipMap map[string]bool
|
|
pid string
|
|
maxIP int
|
|
}
|
|
|
|
func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
|
|
a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
network := fmt.Sprintf("192.168.100.0/%d", mask)
|
|
// total ips 2^(32-mask) - 2 (network and broadcast)
|
|
totalIps := 1<<uint(32-mask) - 2
|
|
|
|
pid, _, _, err := a.RequestPool(localAddressSpace, network, "", nil, false)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
return &testContext{
|
|
a: a,
|
|
opts: options,
|
|
ipList: make([]*net.IPNet, 0, totalIps),
|
|
ipMap: make(map[string]bool),
|
|
pid: pid,
|
|
maxIP: totalIps,
|
|
}
|
|
}
|
|
|
|
func TestDebug(t *testing.T) {
|
|
tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
|
tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
|
tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
|
}
|
|
|
|
type op struct {
|
|
id int32
|
|
add bool
|
|
name string
|
|
}
|
|
|
|
func (o *op) String() string {
|
|
return fmt.Sprintf("%+v", *o)
|
|
}
|
|
|
|
func TestRequestPoolParallel(t *testing.T) {
|
|
a, err := NewAllocator(ipamutils.GetLocalScopeDefaultNetworks(), ipamutils.GetGlobalScopeDefaultNetworks())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
var operationIndex int32
|
|
ch := make(chan *op, 240)
|
|
|
|
group := new(errgroup.Group)
|
|
defer func() {
|
|
if err := group.Wait(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}()
|
|
|
|
for i := 0; i < 120; i++ {
|
|
group.Go(func() error {
|
|
name, _, _, err := a.RequestPool("GlobalDefault", "", "", nil, false)
|
|
if err != nil {
|
|
t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait".
|
|
return fmt.Errorf("request error %v", err)
|
|
}
|
|
idx := atomic.AddInt32(&operationIndex, 1)
|
|
ch <- &op{idx, true, name}
|
|
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
|
|
idx = atomic.AddInt32(&operationIndex, 1)
|
|
err = a.ReleasePool(name)
|
|
if err != nil {
|
|
t.Log(err) // log so we can see the error in real time rather than at the end when we actually call "Wait".
|
|
return fmt.Errorf("release error %v", err)
|
|
}
|
|
ch <- &op{idx, false, name}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// map of events
|
|
m := make(map[string][]*op)
|
|
for i := 0; i < 240; i++ {
|
|
x := <-ch
|
|
ops, ok := m[x.name]
|
|
if !ok {
|
|
ops = make([]*op, 0, 10)
|
|
}
|
|
ops = append(ops, x)
|
|
m[x.name] = ops
|
|
}
|
|
|
|
// Post processing to avoid event reordering on the channel
|
|
for pool, ops := range m {
|
|
sort.Slice(ops[:], func(i, j int) bool {
|
|
return ops[i].id < ops[j].id
|
|
})
|
|
expected := true
|
|
for _, op := range ops {
|
|
if op.add != expected {
|
|
t.Fatalf("Operations for %v not valid %v, operations %v", pool, op, ops)
|
|
}
|
|
expected = !expected
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFullAllocateRelease(t *testing.T) {
|
|
for _, parallelism := range []int64{2, 4, 8} {
|
|
for _, mask := range []int{29, 25, 24, 21} {
|
|
tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
|
allocate(t, tctx, parallelism)
|
|
release(t, tctx, all, parallelism)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestOddAllocateRelease(t *testing.T) {
|
|
for _, parallelism := range []int64{2, 4, 8} {
|
|
for _, mask := range []int{29, 25, 24, 21} {
|
|
tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
|
allocate(t, tctx, parallelism)
|
|
release(t, tctx, odd, parallelism)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFullAllocateSerialReleaseParallel(t *testing.T) {
|
|
for _, parallelism := range []int64{1, 4, 8} {
|
|
tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
|
allocate(t, tctx, 1)
|
|
release(t, tctx, all, parallelism)
|
|
}
|
|
}
|
|
|
|
func TestOddAllocateSerialReleaseParallel(t *testing.T) {
|
|
for _, parallelism := range []int64{1, 4, 8} {
|
|
tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
|
allocate(t, tctx, 1)
|
|
release(t, tctx, odd, parallelism)
|
|
}
|
|
}
|
|
|
|
func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
|
|
for _, parallelism := range []int64{1, 4, 8} {
|
|
tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
|
allocate(t, tctx, 1)
|
|
release(t, tctx, even, parallelism)
|
|
}
|
|
}
|
|
|
|
func allocate(t *testing.T, tctx *testContext, parallel int64) {
|
|
// Allocate the whole space
|
|
parallelExec := semaphore.NewWeighted(parallel)
|
|
routineNum := tctx.maxIP + 10
|
|
ch := make(chan *net.IPNet, routineNum)
|
|
var id int
|
|
var wg sync.WaitGroup
|
|
// routine loop
|
|
for {
|
|
wg.Add(1)
|
|
go func(id int) {
|
|
parallelExec.Acquire(context.Background(), 1)
|
|
ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
|
|
ch <- ip
|
|
parallelExec.Release(1)
|
|
wg.Done()
|
|
}(id)
|
|
id++
|
|
if id == routineNum {
|
|
break
|
|
}
|
|
}
|
|
|
|
// give time to all the go routines to finish
|
|
wg.Wait()
|
|
|
|
// process results
|
|
for i := 0; i < routineNum; i++ {
|
|
ip := <-ch
|
|
if ip == nil {
|
|
continue
|
|
}
|
|
if there, ok := tctx.ipMap[ip.String()]; ok && there {
|
|
t.Fatalf("Got duplicate IP %s", ip.String())
|
|
}
|
|
tctx.ipList = append(tctx.ipList, ip)
|
|
tctx.ipMap[ip.String()] = true
|
|
}
|
|
|
|
assert.Assert(t, is.Len(tctx.ipList, tctx.maxIP))
|
|
}
|
|
|
|
func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
|
|
var startIndex, increment, stopIndex, length int
|
|
switch mode {
|
|
case all:
|
|
startIndex = 0
|
|
increment = 1
|
|
stopIndex = tctx.maxIP - 1
|
|
length = tctx.maxIP
|
|
case odd, even:
|
|
if mode == odd {
|
|
startIndex = 1
|
|
}
|
|
increment = 2
|
|
stopIndex = tctx.maxIP - 1
|
|
length = tctx.maxIP / 2
|
|
if tctx.maxIP%2 > 0 {
|
|
length++
|
|
}
|
|
default:
|
|
t.Fatal("unsupported mode yet")
|
|
}
|
|
|
|
ipIndex := make([]int, 0, length)
|
|
// calculate the index to release from the ipList
|
|
for i := startIndex; ; i += increment {
|
|
ipIndex = append(ipIndex, i)
|
|
if i+increment > stopIndex {
|
|
break
|
|
}
|
|
}
|
|
|
|
var id int
|
|
parallelExec := semaphore.NewWeighted(parallel)
|
|
ch := make(chan *net.IPNet, len(ipIndex))
|
|
group := new(errgroup.Group)
|
|
for index := range ipIndex {
|
|
index := index
|
|
group.Go(func() error {
|
|
parallelExec.Acquire(context.Background(), 1)
|
|
err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
|
|
if err != nil {
|
|
return fmt.Errorf("routine %d got %v", id, err)
|
|
}
|
|
ch <- tctx.ipList[index]
|
|
parallelExec.Release(1)
|
|
return nil
|
|
})
|
|
id++
|
|
}
|
|
|
|
if err := group.Wait(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
for i := 0; i < len(ipIndex); i++ {
|
|
ip := <-ch
|
|
|
|
// check if it is really free
|
|
_, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
|
|
assert.Check(t, err, "ip %v not properly released", ip)
|
|
if err != nil {
|
|
t.Fatalf("ip %v not properly released, error:%v", ip, err)
|
|
}
|
|
err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
|
|
assert.NilError(t, err)
|
|
|
|
if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
|
|
t.Fatalf("ip %v got double deallocated", ip)
|
|
}
|
|
tctx.ipMap[ip.String()] = false
|
|
for j, v := range tctx.ipList {
|
|
if v == ip {
|
|
tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
assert.Check(t, is.Len(tctx.ipList, tctx.maxIP-length))
|
|
}
|