commit
3d6425dedf
7 changed files with 1131 additions and 683 deletions
File diff suppressed because it is too large
Load diff
239
libnetwork/ipam/parallel_test.go
Normal file
239
libnetwork/ipam/parallel_test.go
Normal file
|
@ -0,0 +1,239 @@
|
|||
package ipam
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/sync/semaphore"
|
||||
|
||||
"github.com/docker/libnetwork/ipamapi"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const (
|
||||
all = iota
|
||||
even
|
||||
odd
|
||||
)
|
||||
|
||||
type releaseMode uint
|
||||
|
||||
type testContext struct {
|
||||
a *Allocator
|
||||
opts map[string]string
|
||||
ipList []*net.IPNet
|
||||
ipMap map[string]bool
|
||||
pid string
|
||||
maxIP int
|
||||
}
|
||||
|
||||
func newTestContext(t *testing.T, mask int, options map[string]string) *testContext {
|
||||
a, err := getAllocator(true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
a.addrSpaces["giallo"] = &addrSpace{
|
||||
id: dsConfigKey + "/" + "giallo",
|
||||
ds: a.addrSpaces[localAddressSpace].ds,
|
||||
alloc: a.addrSpaces[localAddressSpace].alloc,
|
||||
scope: a.addrSpaces[localAddressSpace].scope,
|
||||
subnets: map[SubnetKey]*PoolData{},
|
||||
}
|
||||
|
||||
network := fmt.Sprintf("192.168.100.0/%d", mask)
|
||||
// total ips 2^(32-mask) - 2 (network and broadcast)
|
||||
totalIps := 1<<uint(32-mask) - 2
|
||||
|
||||
pid, _, _, err := a.RequestPool("giallo", network, "", nil, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return &testContext{
|
||||
a: a,
|
||||
opts: options,
|
||||
ipList: make([]*net.IPNet, 0, totalIps),
|
||||
ipMap: make(map[string]bool),
|
||||
pid: pid,
|
||||
maxIP: totalIps,
|
||||
}
|
||||
}
|
||||
|
||||
func TestDebug(t *testing.T) {
|
||||
tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
||||
tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
||||
tctx.a.RequestAddress(tctx.pid, nil, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
||||
}
|
||||
|
||||
func TestFullAllocateRelease(t *testing.T) {
|
||||
for _, parallelism := range []int64{2, 4, 8} {
|
||||
for _, mask := range []int{29, 25, 24, 21} {
|
||||
tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
||||
allocate(t, tctx, parallelism)
|
||||
release(t, tctx, all, parallelism)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestOddAllocateRelease(t *testing.T) {
|
||||
for _, parallelism := range []int64{2, 4, 8} {
|
||||
for _, mask := range []int{29, 25, 24, 21} {
|
||||
tctx := newTestContext(t, mask, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
||||
allocate(t, tctx, parallelism)
|
||||
release(t, tctx, odd, parallelism)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFullAllocateSerialReleaseParallel(t *testing.T) {
|
||||
for _, parallelism := range []int64{1, 4, 8} {
|
||||
tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
||||
allocate(t, tctx, 1)
|
||||
release(t, tctx, all, parallelism)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOddAllocateSerialReleaseParallel(t *testing.T) {
|
||||
for _, parallelism := range []int64{1, 4, 8} {
|
||||
tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
||||
allocate(t, tctx, 1)
|
||||
release(t, tctx, odd, parallelism)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvenAllocateSerialReleaseParallel(t *testing.T) {
|
||||
for _, parallelism := range []int64{1, 4, 8} {
|
||||
tctx := newTestContext(t, 23, map[string]string{ipamapi.AllocSerialPrefix: "true"})
|
||||
allocate(t, tctx, 1)
|
||||
release(t, tctx, even, parallelism)
|
||||
}
|
||||
}
|
||||
|
||||
func allocate(t *testing.T, tctx *testContext, parallel int64) {
|
||||
// Allocate the whole space
|
||||
parallelExec := semaphore.NewWeighted(parallel)
|
||||
routineNum := tctx.maxIP + 10
|
||||
ch := make(chan *net.IPNet, routineNum)
|
||||
var id int
|
||||
var wg sync.WaitGroup
|
||||
// routine loop
|
||||
for {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
parallelExec.Acquire(context.Background(), 1)
|
||||
ip, _, _ := tctx.a.RequestAddress(tctx.pid, nil, tctx.opts)
|
||||
ch <- ip
|
||||
parallelExec.Release(1)
|
||||
wg.Done()
|
||||
}(id)
|
||||
id++
|
||||
if id == routineNum {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// give time to all the go routines to finish
|
||||
wg.Wait()
|
||||
|
||||
// process results
|
||||
for i := 0; i < routineNum; i++ {
|
||||
ip := <-ch
|
||||
if ip == nil {
|
||||
continue
|
||||
}
|
||||
if there, ok := tctx.ipMap[ip.String()]; ok && there {
|
||||
t.Fatalf("Got duplicate IP %s", ip.String())
|
||||
break
|
||||
}
|
||||
tctx.ipList = append(tctx.ipList, ip)
|
||||
tctx.ipMap[ip.String()] = true
|
||||
}
|
||||
|
||||
assert.Len(t, tctx.ipList, tctx.maxIP)
|
||||
if len(tctx.ipList) != tctx.maxIP {
|
||||
t.Fatal("missmatch number allocation")
|
||||
}
|
||||
}
|
||||
|
||||
func release(t *testing.T, tctx *testContext, mode releaseMode, parallel int64) {
|
||||
var startIndex, increment, stopIndex, length int
|
||||
switch mode {
|
||||
case all:
|
||||
startIndex = 0
|
||||
increment = 1
|
||||
stopIndex = tctx.maxIP - 1
|
||||
length = tctx.maxIP
|
||||
case odd, even:
|
||||
if mode == odd {
|
||||
startIndex = 1
|
||||
}
|
||||
increment = 2
|
||||
stopIndex = tctx.maxIP - 1
|
||||
length = tctx.maxIP / 2
|
||||
if tctx.maxIP%2 > 0 {
|
||||
length++
|
||||
}
|
||||
default:
|
||||
t.Fatal("unsupported mode yet")
|
||||
}
|
||||
|
||||
ipIndex := make([]int, 0, length)
|
||||
// calculate the index to release from the ipList
|
||||
for i := startIndex; ; i += increment {
|
||||
ipIndex = append(ipIndex, i)
|
||||
if i+increment > stopIndex {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
var id int
|
||||
parallelExec := semaphore.NewWeighted(parallel)
|
||||
ch := make(chan *net.IPNet, len(ipIndex))
|
||||
wg := sync.WaitGroup{}
|
||||
for index := range ipIndex {
|
||||
wg.Add(1)
|
||||
go func(id, index int) {
|
||||
parallelExec.Acquire(context.Background(), 1)
|
||||
// logrus.Errorf("index %v", index)
|
||||
// logrus.Errorf("list %v", tctx.ipList)
|
||||
err := tctx.a.ReleaseAddress(tctx.pid, tctx.ipList[index].IP)
|
||||
if err != nil {
|
||||
t.Fatalf("routine %d got %v", id, err)
|
||||
}
|
||||
ch <- tctx.ipList[index]
|
||||
parallelExec.Release(1)
|
||||
wg.Done()
|
||||
}(id, index)
|
||||
id++
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
for i := 0; i < len(ipIndex); i++ {
|
||||
ip := <-ch
|
||||
|
||||
// check if it is really free
|
||||
_, _, err := tctx.a.RequestAddress(tctx.pid, ip.IP, nil)
|
||||
assert.NoError(t, err, "ip %v not properly released", ip)
|
||||
if err != nil {
|
||||
t.Fatalf("ip %v not properly released, error:%v", ip, err)
|
||||
}
|
||||
err = tctx.a.ReleaseAddress(tctx.pid, ip.IP)
|
||||
assert.NoError(t, err)
|
||||
|
||||
if there, ok := tctx.ipMap[ip.String()]; !ok || !there {
|
||||
t.Fatalf("ip %v got double deallocated", ip)
|
||||
}
|
||||
tctx.ipMap[ip.String()] = false
|
||||
for j, v := range tctx.ipList {
|
||||
if v == ip {
|
||||
tctx.ipList = append(tctx.ipList[:j], tctx.ipList[j+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert.Len(t, tctx.ipList, tctx.maxIP-length)
|
||||
}
|
|
@ -50,5 +50,6 @@ github.com/vishvananda/netns 604eaf189ee867d8c147fafc28def2394e878d25
|
|||
golang.org/x/crypto 558b6879de74bc843225cde5686419267ff707ca
|
||||
golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6
|
||||
golang.org/x/sys 07c182904dbd53199946ba614a412c61d3c548f5
|
||||
github.com/golang/sync fd80eb99c8f653c847d294a001bdf2a3a6f768f5
|
||||
github.com/pkg/errors 839d9e913e063e28dfd0e6c7b7512793e0a48be9
|
||||
github.com/ishidawataru/sctp 07191f837fedd2f13d1ec7b5f885f0f3ec54b1cb
|
||||
|
|
27
libnetwork/vendor/github.com/golang/sync/LICENSE
generated
vendored
Normal file
27
libnetwork/vendor/github.com/golang/sync/LICENSE
generated
vendored
Normal file
|
@ -0,0 +1,27 @@
|
|||
Copyright (c) 2009 The Go Authors. All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are
|
||||
met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above
|
||||
copyright notice, this list of conditions and the following disclaimer
|
||||
in the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
* Neither the name of Google Inc. nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
22
libnetwork/vendor/github.com/golang/sync/PATENTS
generated
vendored
Normal file
22
libnetwork/vendor/github.com/golang/sync/PATENTS
generated
vendored
Normal file
|
@ -0,0 +1,22 @@
|
|||
Additional IP Rights Grant (Patents)
|
||||
|
||||
"This implementation" means the copyrightable works distributed by
|
||||
Google as part of the Go project.
|
||||
|
||||
Google hereby grants to You a perpetual, worldwide, non-exclusive,
|
||||
no-charge, royalty-free, irrevocable (except as stated in this section)
|
||||
patent license to make, have made, use, offer to sell, sell, import,
|
||||
transfer and otherwise run, modify and propagate the contents of this
|
||||
implementation of Go, where such license applies only to those patent
|
||||
claims, both currently owned or controlled by Google and acquired in
|
||||
the future, licensable by Google that are necessarily infringed by this
|
||||
implementation of Go. This grant does not include claims that would be
|
||||
infringed only as a consequence of further modification of this
|
||||
implementation. If you or your agent or exclusive licensee institute or
|
||||
order or agree to the institution of patent litigation against any
|
||||
entity (including a cross-claim or counterclaim in a lawsuit) alleging
|
||||
that this implementation of Go or any code incorporated within this
|
||||
implementation of Go constitutes direct or contributory patent
|
||||
infringement, or inducement of patent infringement, then any patent
|
||||
rights granted to you under this License for this implementation of Go
|
||||
shall terminate as of the date such litigation is filed.
|
18
libnetwork/vendor/github.com/golang/sync/README.md
generated
vendored
Normal file
18
libnetwork/vendor/github.com/golang/sync/README.md
generated
vendored
Normal file
|
@ -0,0 +1,18 @@
|
|||
# Go Sync
|
||||
|
||||
This repository provides Go concurrency primitives in addition to the
|
||||
ones provided by the language and "sync" and "sync/atomic" packages.
|
||||
|
||||
## Download/Install
|
||||
|
||||
The easiest way to install is to run `go get -u golang.org/x/sync`. You can
|
||||
also manually git clone the repository to `$GOPATH/src/golang.org/x/sync`.
|
||||
|
||||
## Report Issues / Send Patches
|
||||
|
||||
This repository uses Gerrit for code changes. To learn how to submit changes to
|
||||
this repository, see https://golang.org/doc/contribute.html.
|
||||
|
||||
The main issue tracker for the sync repository is located at
|
||||
https://github.com/golang/go/issues. Prefix your issue with "x/sync:" in the
|
||||
subject line, so it is easy to find.
|
131
libnetwork/vendor/github.com/golang/sync/semaphore/semaphore.go
generated
vendored
Normal file
131
libnetwork/vendor/github.com/golang/sync/semaphore/semaphore.go
generated
vendored
Normal file
|
@ -0,0 +1,131 @@
|
|||
// Copyright 2017 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package semaphore provides a weighted semaphore implementation.
|
||||
package semaphore // import "golang.org/x/sync/semaphore"
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
|
||||
// Use the old context because packages that depend on this one
|
||||
// (e.g. cloud.google.com/go/...) must run on Go 1.6.
|
||||
// TODO(jba): update to "context" when possible.
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type waiter struct {
|
||||
n int64
|
||||
ready chan<- struct{} // Closed when semaphore acquired.
|
||||
}
|
||||
|
||||
// NewWeighted creates a new weighted semaphore with the given
|
||||
// maximum combined weight for concurrent access.
|
||||
func NewWeighted(n int64) *Weighted {
|
||||
w := &Weighted{size: n}
|
||||
return w
|
||||
}
|
||||
|
||||
// Weighted provides a way to bound concurrent access to a resource.
|
||||
// The callers can request access with a given weight.
|
||||
type Weighted struct {
|
||||
size int64
|
||||
cur int64
|
||||
mu sync.Mutex
|
||||
waiters list.List
|
||||
}
|
||||
|
||||
// Acquire acquires the semaphore with a weight of n, blocking only until ctx
|
||||
// is done. On success, returns nil. On failure, returns ctx.Err() and leaves
|
||||
// the semaphore unchanged.
|
||||
//
|
||||
// If ctx is already done, Acquire may still succeed without blocking.
|
||||
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
|
||||
s.mu.Lock()
|
||||
if s.size-s.cur >= n && s.waiters.Len() == 0 {
|
||||
s.cur += n
|
||||
s.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
if n > s.size {
|
||||
// Don't make other Acquire calls block on one that's doomed to fail.
|
||||
s.mu.Unlock()
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
ready := make(chan struct{})
|
||||
w := waiter{n: n, ready: ready}
|
||||
elem := s.waiters.PushBack(w)
|
||||
s.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := ctx.Err()
|
||||
s.mu.Lock()
|
||||
select {
|
||||
case <-ready:
|
||||
// Acquired the semaphore after we were canceled. Rather than trying to
|
||||
// fix up the queue, just pretend we didn't notice the cancelation.
|
||||
err = nil
|
||||
default:
|
||||
s.waiters.Remove(elem)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return err
|
||||
|
||||
case <-ready:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// TryAcquire acquires the semaphore with a weight of n without blocking.
|
||||
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
|
||||
func (s *Weighted) TryAcquire(n int64) bool {
|
||||
s.mu.Lock()
|
||||
success := s.size-s.cur >= n && s.waiters.Len() == 0
|
||||
if success {
|
||||
s.cur += n
|
||||
}
|
||||
s.mu.Unlock()
|
||||
return success
|
||||
}
|
||||
|
||||
// Release releases the semaphore with a weight of n.
|
||||
func (s *Weighted) Release(n int64) {
|
||||
s.mu.Lock()
|
||||
s.cur -= n
|
||||
if s.cur < 0 {
|
||||
s.mu.Unlock()
|
||||
panic("semaphore: bad release")
|
||||
}
|
||||
for {
|
||||
next := s.waiters.Front()
|
||||
if next == nil {
|
||||
break // No more waiters blocked.
|
||||
}
|
||||
|
||||
w := next.Value.(waiter)
|
||||
if s.size-s.cur < w.n {
|
||||
// Not enough tokens for the next waiter. We could keep going (to try to
|
||||
// find a waiter with a smaller request), but under load that could cause
|
||||
// starvation for large requests; instead, we leave all remaining waiters
|
||||
// blocked.
|
||||
//
|
||||
// Consider a semaphore used as a read-write lock, with N tokens, N
|
||||
// readers, and one writer. Each reader can Acquire(1) to obtain a read
|
||||
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
|
||||
// of the readers. If we allow the readers to jump ahead in the queue,
|
||||
// the writer will starve — there is always one token available for every
|
||||
// reader.
|
||||
break
|
||||
}
|
||||
|
||||
s.cur += w.n
|
||||
s.waiters.Remove(next)
|
||||
close(w.ready)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
Loading…
Reference in a new issue