libnetwork/overlay: remove Serf-based clustering
Prior to 0fa873c
, the serf-based event loop was started when a global
store was available. Since there's no more global store, this event loop
and all its associated code is dead.
Most dead code detected by golangci-lint in prior commits is now gone.
Signed-off-by: Albin Kerouanton <albinker@gmail.com>
This commit is contained in:
parent
644e3d4cdb
commit
e251837445
6 changed files with 6 additions and 578 deletions
|
@ -134,8 +134,6 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
|
|||
logrus.Errorf("overlay: Failed adding table entry to joininfo: %v", err)
|
||||
}
|
||||
|
||||
d.pushLocalEndpointEvent("join", nid, eid)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -219,14 +217,6 @@ func (d *driver) Leave(nid, eid string) error {
|
|||
return types.InternalMaskableErrorf("could not find endpoint with id %s", eid)
|
||||
}
|
||||
|
||||
if d.notifyCh != nil {
|
||||
d.notifyCh <- ovNotify{
|
||||
action: "leave",
|
||||
nw: n,
|
||||
ep: ep,
|
||||
}
|
||||
}
|
||||
|
||||
d.peerDelete(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true)
|
||||
|
||||
n.leaveSandbox()
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/vishvananda/netlink"
|
||||
"github.com/vishvananda/netlink/nl"
|
||||
"github.com/vishvananda/netns"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
@ -60,7 +59,6 @@ type network struct {
|
|||
dbIndex uint64
|
||||
dbExists bool
|
||||
sbox osl.Sandbox
|
||||
nlSocket *nl.NetlinkSocket
|
||||
endpoints endpointTable
|
||||
driver *driver
|
||||
joinCnt int
|
||||
|
@ -75,11 +73,11 @@ type network struct {
|
|||
|
||||
func init() {
|
||||
// Lock main() to the initial thread to exclude the goroutines executing
|
||||
// func (*network).watchMiss() or func setDefaultVLAN() from being
|
||||
// scheduled onto that thread. Changes to the network namespace of the
|
||||
// initial thread alter /proc/self/ns/net, which would break any code
|
||||
// which (incorrectly) assumes that that file is a handle to the network
|
||||
// namespace for the thread it is currently executing on.
|
||||
// func setDefaultVLAN() from being scheduled onto that thread. Changes to
|
||||
// the network namespace of the initial thread alter /proc/self/ns/net,
|
||||
// which would break any code which (incorrectly) assumes that that file is
|
||||
// a handle to the network namespace for the thread it is currently
|
||||
// executing on.
|
||||
runtime.LockOSThread()
|
||||
}
|
||||
|
||||
|
@ -352,12 +350,6 @@ func (n *network) destroySandbox() {
|
|||
}
|
||||
}
|
||||
|
||||
// Close the netlink socket, this will also release the watchMiss goroutine that is using it
|
||||
if n.nlSocket != nil {
|
||||
n.nlSocket.Close()
|
||||
n.nlSocket = nil
|
||||
}
|
||||
|
||||
n.sbox.Destroy()
|
||||
n.sbox = nil
|
||||
}
|
||||
|
@ -744,133 +736,9 @@ func (n *network) initSandbox(restore bool) error {
|
|||
// this is needed to let the peerAdd configure the sandbox
|
||||
n.sbox = sbox
|
||||
|
||||
// If we are in swarm mode, we don't need anymore the watchMiss routine.
|
||||
// This will save 1 thread and 1 netlink socket per network
|
||||
if !n.driver.isSerfAlive() {
|
||||
return nil
|
||||
}
|
||||
|
||||
var nlSock *nl.NetlinkSocket
|
||||
sbox.InvokeFunc(func() {
|
||||
nlSock, err = nl.Subscribe(unix.NETLINK_ROUTE, unix.RTNLGRP_NEIGH)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// set the receive timeout to not remain stuck on the RecvFrom if the fd gets closed
|
||||
tv := unix.NsecToTimeval(soTimeout.Nanoseconds())
|
||||
err = nlSock.SetReceiveTimeout(&tv)
|
||||
})
|
||||
n.nlSocket = nlSock
|
||||
|
||||
if err == nil {
|
||||
go n.watchMiss(nlSock, key)
|
||||
} else {
|
||||
logrus.Errorf("failed to subscribe to neighbor group netlink messages for overlay network %s in sbox %s: %v",
|
||||
n.id, sbox.Key(), err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *network) watchMiss(nlSock *nl.NetlinkSocket, nsPath string) {
|
||||
// With the new version of the netlink library the deserialize function makes
|
||||
// requests about the interface of the netlink message. This can succeed only
|
||||
// if this go routine is in the target namespace.
|
||||
origNs, err := netns.Get()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to get the initial network namespace")
|
||||
return
|
||||
}
|
||||
defer origNs.Close()
|
||||
newNs, err := netns.GetFromPath(nsPath)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("failed to get the namespace %s", nsPath)
|
||||
return
|
||||
}
|
||||
defer newNs.Close()
|
||||
|
||||
runtime.LockOSThread()
|
||||
if err = netns.Set(newNs); err != nil {
|
||||
logrus.WithError(err).Errorf("failed to enter the namespace %s", nsPath)
|
||||
runtime.UnlockOSThread()
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err := netns.Set(origNs); err != nil {
|
||||
logrus.WithError(err).Error("failed to restore the thread's initial network namespace")
|
||||
// The error is only fatal for the current thread. Keep this
|
||||
// goroutine locked to the thread to make the runtime replace it
|
||||
// with a clean thread once this goroutine terminates.
|
||||
} else {
|
||||
runtime.UnlockOSThread()
|
||||
}
|
||||
}()
|
||||
for {
|
||||
msgs, _, err := nlSock.Receive()
|
||||
if err != nil {
|
||||
n.Lock()
|
||||
nlFd := nlSock.GetFd()
|
||||
n.Unlock()
|
||||
if nlFd == -1 {
|
||||
// The netlink socket got closed, simply exit to not leak this goroutine
|
||||
return
|
||||
}
|
||||
// When the receive timeout expires the receive will return EAGAIN
|
||||
if err == unix.EAGAIN {
|
||||
// we continue here to avoid spam for timeouts
|
||||
continue
|
||||
}
|
||||
logrus.Errorf("Failed to receive from netlink: %v ", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
if msg.Header.Type != unix.RTM_GETNEIGH && msg.Header.Type != unix.RTM_NEWNEIGH {
|
||||
continue
|
||||
}
|
||||
|
||||
neigh, err := netlink.NeighDeserialize(msg.Data)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to deserialize netlink ndmsg: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
var (
|
||||
ip net.IP
|
||||
mac net.HardwareAddr
|
||||
l2Miss, l3Miss bool
|
||||
)
|
||||
if neigh.IP.To4() != nil {
|
||||
ip = neigh.IP
|
||||
l3Miss = true
|
||||
} else if neigh.HardwareAddr != nil {
|
||||
mac = []byte(neigh.HardwareAddr)
|
||||
ip = net.IP(mac[2:])
|
||||
l2Miss = true
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
|
||||
// Not any of the network's subnets. Ignore.
|
||||
if !n.contains(ip) {
|
||||
continue
|
||||
}
|
||||
|
||||
if neigh.State&(netlink.NUD_STALE|netlink.NUD_INCOMPLETE) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
logrus.Debugf("miss notification: dest IP %v, dest MAC %v", ip, mac)
|
||||
mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip)
|
||||
if err != nil {
|
||||
logrus.Errorf("could not resolve peer %q: %v", ip, err)
|
||||
continue
|
||||
}
|
||||
n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, l2Miss, l3Miss, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) network(nid string) *network {
|
||||
d.Lock()
|
||||
n := d.networks[nid]
|
||||
|
@ -1039,18 +907,6 @@ func (n *network) obtainVxlanID(s *subnet) error {
|
|||
return fmt.Errorf("no valid vxlan id and no datastore configured, cannot obtain vxlan id")
|
||||
}
|
||||
|
||||
// contains return true if the passed ip belongs to one the network's
|
||||
// subnets
|
||||
func (n *network) contains(ip net.IP) bool {
|
||||
for _, s := range n.subnets {
|
||||
if s.subnetIP.Contains(ip) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// getSubnetforIP returns the subnet to which the given IP belongs
|
||||
func (n *network) getSubnetforIP(ip *net.IPNet) *subnet {
|
||||
for _, s := range n.subnets {
|
||||
|
|
|
@ -1,230 +0,0 @@
|
|||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package overlay
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type ovNotify struct {
|
||||
action string
|
||||
ep *endpoint
|
||||
nw *network
|
||||
}
|
||||
|
||||
type logWriter struct{}
|
||||
|
||||
func (l *logWriter) Write(p []byte) (int, error) {
|
||||
str := string(p)
|
||||
|
||||
switch {
|
||||
case strings.Contains(str, "[WARN]"):
|
||||
logrus.Warn(str)
|
||||
case strings.Contains(str, "[DEBUG]"):
|
||||
logrus.Debug(str)
|
||||
case strings.Contains(str, "[INFO]"):
|
||||
logrus.Info(str)
|
||||
case strings.Contains(str, "[ERR]"):
|
||||
logrus.Error(str)
|
||||
}
|
||||
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (d *driver) serfInit() error {
|
||||
var err error
|
||||
|
||||
config := serf.DefaultConfig()
|
||||
config.Init()
|
||||
config.MemberlistConfig.BindAddr = d.advertiseAddress
|
||||
|
||||
d.eventCh = make(chan serf.Event, 4)
|
||||
config.EventCh = d.eventCh
|
||||
config.UserCoalescePeriod = 1 * time.Second
|
||||
config.UserQuiescentPeriod = 50 * time.Millisecond
|
||||
|
||||
config.LogOutput = &logWriter{}
|
||||
config.MemberlistConfig.LogOutput = config.LogOutput
|
||||
|
||||
s, err := serf.Create(config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create cluster node: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
s.Shutdown()
|
||||
}
|
||||
}()
|
||||
|
||||
d.serfInstance = s
|
||||
|
||||
d.notifyCh = make(chan ovNotify)
|
||||
d.exitCh = make(chan chan struct{})
|
||||
|
||||
go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *driver) serfJoin(neighIP string) error {
|
||||
if neighIP == "" {
|
||||
return fmt.Errorf("no neighbor to join")
|
||||
}
|
||||
if _, err := d.serfInstance.Join([]string{neighIP}, true); err != nil {
|
||||
return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
|
||||
neighIP, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *driver) notifyEvent(event ovNotify) {
|
||||
ep := event.ep
|
||||
|
||||
ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(),
|
||||
net.IP(ep.addr.Mask).String(), ep.mac.String())
|
||||
eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(),
|
||||
event.nw.id, ep.id)
|
||||
|
||||
if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil {
|
||||
logrus.Errorf("Sending user event failed: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) processEvent(u serf.UserEvent) {
|
||||
logrus.Debugf("Received user event name:%s, payload:%s LTime:%d \n", u.Name,
|
||||
string(u.Payload), uint64(u.LTime))
|
||||
|
||||
var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string
|
||||
if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
|
||||
fmt.Printf("Failed to scan name string: %v\n", err)
|
||||
}
|
||||
|
||||
if _, err := fmt.Sscan(string(u.Payload), &action,
|
||||
&ipStr, &maskStr, &macStr); err != nil {
|
||||
fmt.Printf("Failed to scan value string: %v\n", err)
|
||||
}
|
||||
|
||||
logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr)
|
||||
|
||||
mac, err := net.ParseMAC(macStr)
|
||||
if err != nil {
|
||||
logrus.Errorf("Failed to parse mac: %v\n", err)
|
||||
}
|
||||
|
||||
if d.serfInstance.LocalMember().Addr.String() == vtepStr {
|
||||
return
|
||||
}
|
||||
|
||||
switch action {
|
||||
case "join":
|
||||
d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), false, false, false)
|
||||
case "leave":
|
||||
d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), false)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) processQuery(q *serf.Query) {
|
||||
logrus.Debugf("Received query name:%s, payload:%s\n", q.Name,
|
||||
string(q.Payload))
|
||||
|
||||
var nid, ipStr string
|
||||
if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil {
|
||||
fmt.Printf("Failed to scan query payload string: %v\n", err)
|
||||
}
|
||||
|
||||
pKey, pEntry, err := d.peerDbSearch(nid, net.ParseIP(ipStr))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
logrus.Debugf("Sending peer query resp mac %v, mask %s, vtep %s", pKey.peerMac, net.IP(pEntry.peerIPMask).String(), pEntry.vtep)
|
||||
q.Respond([]byte(fmt.Sprintf("%s %s %s", pKey.peerMac.String(), net.IP(pEntry.peerIPMask).String(), pEntry.vtep.String())))
|
||||
}
|
||||
|
||||
func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) {
|
||||
if d.serfInstance == nil {
|
||||
return nil, nil, nil, fmt.Errorf("could not resolve peer: serf instance not initialized")
|
||||
}
|
||||
|
||||
qPayload := fmt.Sprintf("%s %s", nid, peerIP.String())
|
||||
resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err)
|
||||
}
|
||||
|
||||
respCh := resp.ResponseCh()
|
||||
select {
|
||||
case r := <-respCh:
|
||||
var macStr, maskStr, vtepStr string
|
||||
if _, err := fmt.Sscan(string(r.Payload), &macStr, &maskStr, &vtepStr); err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err)
|
||||
}
|
||||
|
||||
mac, err := net.ParseMAC(macStr)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err)
|
||||
}
|
||||
|
||||
logrus.Debugf("Received peer query response, mac %s, vtep %s, mask %s", macStr, vtepStr, maskStr)
|
||||
return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil
|
||||
|
||||
case <-time.After(time.Second):
|
||||
return nil, nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster")
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify, exitCh chan chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case notify, ok := <-notifyCh:
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
d.notifyEvent(notify)
|
||||
case ch, ok := <-exitCh:
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
if err := d.serfInstance.Leave(); err != nil {
|
||||
logrus.Errorf("failed leaving the cluster: %v\n", err)
|
||||
}
|
||||
|
||||
d.serfInstance.Shutdown()
|
||||
close(ch)
|
||||
return
|
||||
case e, ok := <-eventCh:
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
if e.EventType() == serf.EventQuery {
|
||||
d.processQuery(e.(*serf.Query))
|
||||
break
|
||||
}
|
||||
|
||||
u, ok := e.(serf.UserEvent)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
d.processEvent(u)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) isSerfAlive() bool {
|
||||
d.Lock()
|
||||
serfInstance := d.serfInstance
|
||||
d.Unlock()
|
||||
if serfInstance == nil || serfInstance.State() != serf.SerfAlive {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
|
@ -17,7 +17,6 @@ import (
|
|||
"github.com/docker/docker/libnetwork/netlabel"
|
||||
"github.com/docker/docker/libnetwork/osl"
|
||||
"github.com/docker/docker/libnetwork/types"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -32,21 +31,15 @@ const (
|
|||
var initVxlanIdm = make(chan (bool), 1)
|
||||
|
||||
type driver struct {
|
||||
eventCh chan serf.Event
|
||||
notifyCh chan ovNotify
|
||||
exitCh chan chan struct{}
|
||||
bindAddress string
|
||||
advertiseAddress string
|
||||
neighIP string
|
||||
config map[string]interface{}
|
||||
peerDb peerNetworkMap
|
||||
secMap *encrMap
|
||||
serfInstance *serf.Serf
|
||||
networks networkTable
|
||||
localStore datastore.DataStore
|
||||
vxlanIdm *idm.Idm
|
||||
initOS sync.Once
|
||||
joinOnce sync.Once
|
||||
localJoinOnce sync.Once
|
||||
keys []*key
|
||||
peerOpMu sync.Mutex
|
||||
|
@ -139,19 +132,6 @@ func (d *driver) restoreEndpoints() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Fini cleans up the driver resources
|
||||
func Fini(drv driverapi.Driver) {
|
||||
d := drv.(*driver)
|
||||
|
||||
if d.exitCh != nil {
|
||||
waitCh := make(chan struct{})
|
||||
|
||||
d.exitCh <- waitCh
|
||||
|
||||
<-waitCh
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) configure() error {
|
||||
// Apply OS specific kernel configs if needed
|
||||
d.initOS.Do(applyOStweaks)
|
||||
|
@ -168,7 +148,7 @@ func (d *driver) IsBuiltIn() bool {
|
|||
}
|
||||
|
||||
func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
|
||||
if self && !d.isSerfAlive() {
|
||||
if self {
|
||||
d.Lock()
|
||||
d.advertiseAddress = advertiseAddress
|
||||
d.bindAddress = bindAddress
|
||||
|
@ -180,52 +160,6 @@ func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) {
|
|||
d.peerDBUpdateSelf()
|
||||
})
|
||||
}
|
||||
|
||||
d.Lock()
|
||||
if !self {
|
||||
d.neighIP = advertiseAddress
|
||||
}
|
||||
neighIP := d.neighIP
|
||||
d.Unlock()
|
||||
|
||||
if d.serfInstance != nil && neighIP != "" {
|
||||
var err error
|
||||
d.joinOnce.Do(func() {
|
||||
err = d.serfJoin(neighIP)
|
||||
if err == nil {
|
||||
d.pushLocalDb()
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err)
|
||||
d.Lock()
|
||||
d.joinOnce = sync.Once{}
|
||||
d.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) pushLocalEndpointEvent(action, nid, eid string) {
|
||||
n := d.network(nid)
|
||||
if n == nil {
|
||||
logrus.Debugf("Error pushing local endpoint event for network %s", nid)
|
||||
return
|
||||
}
|
||||
ep := n.endpoint(eid)
|
||||
if ep == nil {
|
||||
logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid)
|
||||
return
|
||||
}
|
||||
|
||||
if !d.isSerfAlive() {
|
||||
return
|
||||
}
|
||||
d.notifyCh <- ovNotify{
|
||||
action: "join",
|
||||
nw: n,
|
||||
ep: ep,
|
||||
}
|
||||
}
|
||||
|
||||
// DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster
|
||||
|
|
|
@ -4,24 +4,11 @@
|
|||
package overlay
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/libnetwork/datastore"
|
||||
"github.com/docker/docker/libnetwork/discoverapi"
|
||||
"github.com/docker/docker/libnetwork/driverapi"
|
||||
"github.com/docker/docker/libnetwork/netlabel"
|
||||
"github.com/docker/docker/pkg/plugingetter"
|
||||
"github.com/docker/libkv/store"
|
||||
"github.com/docker/libkv/store/boltdb"
|
||||
"github.com/vishvananda/netlink/nl"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -35,64 +22,6 @@ type driverTester struct {
|
|||
|
||||
const testNetworkType = "overlay"
|
||||
|
||||
func setupDriver(t *testing.T) *driverTester {
|
||||
dt := &driverTester{t: t}
|
||||
config := make(map[string]interface{})
|
||||
|
||||
tmp, err := os.CreateTemp(t.TempDir(), "libnetwork-")
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating temp file: %v", err)
|
||||
}
|
||||
err = tmp.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("Error closing temp file: %v", err)
|
||||
}
|
||||
defaultPrefix := filepath.Join(os.TempDir(), "libnetwork", "test", "overlay")
|
||||
|
||||
config[netlabel.LocalKVClient] = discoverapi.DatastoreConfigData{
|
||||
Scope: datastore.LocalScope,
|
||||
Provider: "boltdb",
|
||||
Address: filepath.Join(defaultPrefix, filepath.Base(tmp.Name())),
|
||||
Config: &store.Config{
|
||||
Bucket: "libnetwork",
|
||||
ConnectionTimeout: 3 * time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
if err := Register(dt, config); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
iface, err := net.InterfaceByName("eth0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
addrs, err := iface.Addrs()
|
||||
if err != nil || len(addrs) == 0 {
|
||||
t.Fatal(err)
|
||||
}
|
||||
data := discoverapi.NodeDiscoveryData{
|
||||
Address: addrs[0].String(),
|
||||
Self: true,
|
||||
}
|
||||
dt.d.DiscoverNew(discoverapi.NodeDiscovery, data)
|
||||
return dt
|
||||
}
|
||||
|
||||
func cleanupDriver(t *testing.T, dt *driverTester) {
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
Fini(dt.d)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("test timed out because Fini() did not return on time")
|
||||
}
|
||||
}
|
||||
|
||||
func (dt *driverTester) GetPluginGetter() plugingetter.PluginGetter {
|
||||
return nil
|
||||
}
|
||||
|
@ -119,15 +48,6 @@ func TestOverlayInit(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestOverlayFiniWithoutConfig(t *testing.T) {
|
||||
dt := &driverTester{t: t}
|
||||
if err := Register(dt, nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cleanupDriver(t, dt)
|
||||
}
|
||||
|
||||
func TestOverlayType(t *testing.T) {
|
||||
dt := &driverTester{t: t}
|
||||
if err := Register(dt, nil); err != nil {
|
||||
|
@ -139,36 +59,3 @@ func TestOverlayType(t *testing.T) {
|
|||
dt.d.Type())
|
||||
}
|
||||
}
|
||||
|
||||
// Test that the netlink socket close unblock the watchMiss to avoid deadlock
|
||||
func TestNetlinkSocket(t *testing.T) {
|
||||
// This is the same code used by the overlay driver to create the netlink interface
|
||||
// for the watch miss
|
||||
nlSock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
|
||||
if err != nil {
|
||||
t.Fatal()
|
||||
}
|
||||
// set the receive timeout to not remain stuck on the RecvFrom if the fd gets closed
|
||||
tv := unix.NsecToTimeval(soTimeout.Nanoseconds())
|
||||
err = nlSock.SetReceiveTimeout(&tv)
|
||||
if err != nil {
|
||||
t.Fatal()
|
||||
}
|
||||
n := &network{id: "testnetid"}
|
||||
ch := make(chan error)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
go func() {
|
||||
n.watchMiss(nlSock, fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), syscall.Gettid()))
|
||||
ch <- nil
|
||||
}()
|
||||
time.Sleep(5 * time.Second)
|
||||
nlSock.Close()
|
||||
select {
|
||||
case <-ch:
|
||||
case <-ctx.Done():
|
||||
{
|
||||
t.Fatalf("Timeout expired")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -434,15 +434,6 @@ func (d *driver) peerFlushOp(nid string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (d *driver) pushLocalDb() {
|
||||
d.peerDbWalk(func(nid string, pKey *peerKey, pEntry *peerEntry) bool {
|
||||
if pEntry.isLocal {
|
||||
d.pushLocalEndpointEvent("join", nid, pEntry.eid)
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func (d *driver) peerDBUpdateSelf() {
|
||||
d.peerDbWalk(func(nid string, pkey *peerKey, pEntry *peerEntry) bool {
|
||||
if pEntry.isLocal {
|
||||
|
|
Loading…
Add table
Reference in a new issue