diff --git a/libnetwork/drivers/overlay/joinleave.go b/libnetwork/drivers/overlay/joinleave.go index 6b14abd1c4..78014307b8 100644 --- a/libnetwork/drivers/overlay/joinleave.go +++ b/libnetwork/drivers/overlay/joinleave.go @@ -134,8 +134,6 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo, logrus.Errorf("overlay: Failed adding table entry to joininfo: %v", err) } - d.pushLocalEndpointEvent("join", nid, eid) - return nil } @@ -219,14 +217,6 @@ func (d *driver) Leave(nid, eid string) error { return types.InternalMaskableErrorf("could not find endpoint with id %s", eid) } - if d.notifyCh != nil { - d.notifyCh <- ovNotify{ - action: "leave", - nw: n, - ep: ep, - } - } - d.peerDelete(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac, net.ParseIP(d.advertiseAddress), true) n.leaveSandbox() diff --git a/libnetwork/drivers/overlay/ov_network.go b/libnetwork/drivers/overlay/ov_network.go index d98263c63c..c41d853de3 100644 --- a/libnetwork/drivers/overlay/ov_network.go +++ b/libnetwork/drivers/overlay/ov_network.go @@ -25,7 +25,6 @@ import ( "github.com/hashicorp/go-multierror" "github.com/sirupsen/logrus" "github.com/vishvananda/netlink" - "github.com/vishvananda/netlink/nl" "github.com/vishvananda/netns" "golang.org/x/sys/unix" ) @@ -60,7 +59,6 @@ type network struct { dbIndex uint64 dbExists bool sbox osl.Sandbox - nlSocket *nl.NetlinkSocket endpoints endpointTable driver *driver joinCnt int @@ -75,11 +73,11 @@ type network struct { func init() { // Lock main() to the initial thread to exclude the goroutines executing - // func (*network).watchMiss() or func setDefaultVLAN() from being - // scheduled onto that thread. Changes to the network namespace of the - // initial thread alter /proc/self/ns/net, which would break any code - // which (incorrectly) assumes that that file is a handle to the network - // namespace for the thread it is currently executing on. + // func setDefaultVLAN() from being scheduled onto that thread. Changes to + // the network namespace of the initial thread alter /proc/self/ns/net, + // which would break any code which (incorrectly) assumes that that file is + // a handle to the network namespace for the thread it is currently + // executing on. runtime.LockOSThread() } @@ -352,12 +350,6 @@ func (n *network) destroySandbox() { } } - // Close the netlink socket, this will also release the watchMiss goroutine that is using it - if n.nlSocket != nil { - n.nlSocket.Close() - n.nlSocket = nil - } - n.sbox.Destroy() n.sbox = nil } @@ -744,133 +736,9 @@ func (n *network) initSandbox(restore bool) error { // this is needed to let the peerAdd configure the sandbox n.sbox = sbox - // If we are in swarm mode, we don't need anymore the watchMiss routine. - // This will save 1 thread and 1 netlink socket per network - if !n.driver.isSerfAlive() { - return nil - } - - var nlSock *nl.NetlinkSocket - sbox.InvokeFunc(func() { - nlSock, err = nl.Subscribe(unix.NETLINK_ROUTE, unix.RTNLGRP_NEIGH) - if err != nil { - return - } - // set the receive timeout to not remain stuck on the RecvFrom if the fd gets closed - tv := unix.NsecToTimeval(soTimeout.Nanoseconds()) - err = nlSock.SetReceiveTimeout(&tv) - }) - n.nlSocket = nlSock - - if err == nil { - go n.watchMiss(nlSock, key) - } else { - logrus.Errorf("failed to subscribe to neighbor group netlink messages for overlay network %s in sbox %s: %v", - n.id, sbox.Key(), err) - } - return nil } -func (n *network) watchMiss(nlSock *nl.NetlinkSocket, nsPath string) { - // With the new version of the netlink library the deserialize function makes - // requests about the interface of the netlink message. This can succeed only - // if this go routine is in the target namespace. - origNs, err := netns.Get() - if err != nil { - logrus.WithError(err).Error("failed to get the initial network namespace") - return - } - defer origNs.Close() - newNs, err := netns.GetFromPath(nsPath) - if err != nil { - logrus.WithError(err).Errorf("failed to get the namespace %s", nsPath) - return - } - defer newNs.Close() - - runtime.LockOSThread() - if err = netns.Set(newNs); err != nil { - logrus.WithError(err).Errorf("failed to enter the namespace %s", nsPath) - runtime.UnlockOSThread() - return - } - defer func() { - if err := netns.Set(origNs); err != nil { - logrus.WithError(err).Error("failed to restore the thread's initial network namespace") - // The error is only fatal for the current thread. Keep this - // goroutine locked to the thread to make the runtime replace it - // with a clean thread once this goroutine terminates. - } else { - runtime.UnlockOSThread() - } - }() - for { - msgs, _, err := nlSock.Receive() - if err != nil { - n.Lock() - nlFd := nlSock.GetFd() - n.Unlock() - if nlFd == -1 { - // The netlink socket got closed, simply exit to not leak this goroutine - return - } - // When the receive timeout expires the receive will return EAGAIN - if err == unix.EAGAIN { - // we continue here to avoid spam for timeouts - continue - } - logrus.Errorf("Failed to receive from netlink: %v ", err) - continue - } - - for _, msg := range msgs { - if msg.Header.Type != unix.RTM_GETNEIGH && msg.Header.Type != unix.RTM_NEWNEIGH { - continue - } - - neigh, err := netlink.NeighDeserialize(msg.Data) - if err != nil { - logrus.Errorf("Failed to deserialize netlink ndmsg: %v", err) - continue - } - - var ( - ip net.IP - mac net.HardwareAddr - l2Miss, l3Miss bool - ) - if neigh.IP.To4() != nil { - ip = neigh.IP - l3Miss = true - } else if neigh.HardwareAddr != nil { - mac = []byte(neigh.HardwareAddr) - ip = net.IP(mac[2:]) - l2Miss = true - } else { - continue - } - - // Not any of the network's subnets. Ignore. - if !n.contains(ip) { - continue - } - - if neigh.State&(netlink.NUD_STALE|netlink.NUD_INCOMPLETE) == 0 { - continue - } - - logrus.Debugf("miss notification: dest IP %v, dest MAC %v", ip, mac) - mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip) - if err != nil { - logrus.Errorf("could not resolve peer %q: %v", ip, err) - continue - } - n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, l2Miss, l3Miss, false) - } - } -} - func (d *driver) network(nid string) *network { d.Lock() n := d.networks[nid] @@ -1039,18 +907,6 @@ func (n *network) obtainVxlanID(s *subnet) error { return fmt.Errorf("no valid vxlan id and no datastore configured, cannot obtain vxlan id") } -// contains return true if the passed ip belongs to one the network's -// subnets -func (n *network) contains(ip net.IP) bool { - for _, s := range n.subnets { - if s.subnetIP.Contains(ip) { - return true - } - } - - return false -} - // getSubnetforIP returns the subnet to which the given IP belongs func (n *network) getSubnetforIP(ip *net.IPNet) *subnet { for _, s := range n.subnets { diff --git a/libnetwork/drivers/overlay/ov_serf.go b/libnetwork/drivers/overlay/ov_serf.go deleted file mode 100644 index 2b871184aa..0000000000 --- a/libnetwork/drivers/overlay/ov_serf.go +++ /dev/null @@ -1,230 +0,0 @@ -//go:build linux -// +build linux - -package overlay - -import ( - "fmt" - "net" - "strings" - "time" - - "github.com/hashicorp/serf/serf" - "github.com/sirupsen/logrus" -) - -type ovNotify struct { - action string - ep *endpoint - nw *network -} - -type logWriter struct{} - -func (l *logWriter) Write(p []byte) (int, error) { - str := string(p) - - switch { - case strings.Contains(str, "[WARN]"): - logrus.Warn(str) - case strings.Contains(str, "[DEBUG]"): - logrus.Debug(str) - case strings.Contains(str, "[INFO]"): - logrus.Info(str) - case strings.Contains(str, "[ERR]"): - logrus.Error(str) - } - - return len(p), nil -} - -func (d *driver) serfInit() error { - var err error - - config := serf.DefaultConfig() - config.Init() - config.MemberlistConfig.BindAddr = d.advertiseAddress - - d.eventCh = make(chan serf.Event, 4) - config.EventCh = d.eventCh - config.UserCoalescePeriod = 1 * time.Second - config.UserQuiescentPeriod = 50 * time.Millisecond - - config.LogOutput = &logWriter{} - config.MemberlistConfig.LogOutput = config.LogOutput - - s, err := serf.Create(config) - if err != nil { - return fmt.Errorf("failed to create cluster node: %v", err) - } - defer func() { - if err != nil { - s.Shutdown() - } - }() - - d.serfInstance = s - - d.notifyCh = make(chan ovNotify) - d.exitCh = make(chan chan struct{}) - - go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh) - return nil -} - -func (d *driver) serfJoin(neighIP string) error { - if neighIP == "" { - return fmt.Errorf("no neighbor to join") - } - if _, err := d.serfInstance.Join([]string{neighIP}, true); err != nil { - return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v", - neighIP, err) - } - return nil -} - -func (d *driver) notifyEvent(event ovNotify) { - ep := event.ep - - ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(), - net.IP(ep.addr.Mask).String(), ep.mac.String()) - eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(), - event.nw.id, ep.id) - - if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil { - logrus.Errorf("Sending user event failed: %v\n", err) - } -} - -func (d *driver) processEvent(u serf.UserEvent) { - logrus.Debugf("Received user event name:%s, payload:%s LTime:%d \n", u.Name, - string(u.Payload), uint64(u.LTime)) - - var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string - if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil { - fmt.Printf("Failed to scan name string: %v\n", err) - } - - if _, err := fmt.Sscan(string(u.Payload), &action, - &ipStr, &maskStr, &macStr); err != nil { - fmt.Printf("Failed to scan value string: %v\n", err) - } - - logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr) - - mac, err := net.ParseMAC(macStr) - if err != nil { - logrus.Errorf("Failed to parse mac: %v\n", err) - } - - if d.serfInstance.LocalMember().Addr.String() == vtepStr { - return - } - - switch action { - case "join": - d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), false, false, false) - case "leave": - d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), false) - } -} - -func (d *driver) processQuery(q *serf.Query) { - logrus.Debugf("Received query name:%s, payload:%s\n", q.Name, - string(q.Payload)) - - var nid, ipStr string - if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil { - fmt.Printf("Failed to scan query payload string: %v\n", err) - } - - pKey, pEntry, err := d.peerDbSearch(nid, net.ParseIP(ipStr)) - if err != nil { - return - } - - logrus.Debugf("Sending peer query resp mac %v, mask %s, vtep %s", pKey.peerMac, net.IP(pEntry.peerIPMask).String(), pEntry.vtep) - q.Respond([]byte(fmt.Sprintf("%s %s %s", pKey.peerMac.String(), net.IP(pEntry.peerIPMask).String(), pEntry.vtep.String()))) -} - -func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) { - if d.serfInstance == nil { - return nil, nil, nil, fmt.Errorf("could not resolve peer: serf instance not initialized") - } - - qPayload := fmt.Sprintf("%s %s", nid, peerIP.String()) - resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil) - if err != nil { - return nil, nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err) - } - - respCh := resp.ResponseCh() - select { - case r := <-respCh: - var macStr, maskStr, vtepStr string - if _, err := fmt.Sscan(string(r.Payload), &macStr, &maskStr, &vtepStr); err != nil { - return nil, nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err) - } - - mac, err := net.ParseMAC(macStr) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err) - } - - logrus.Debugf("Received peer query response, mac %s, vtep %s, mask %s", macStr, vtepStr, maskStr) - return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil - - case <-time.After(time.Second): - return nil, nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster") - } -} - -func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify, exitCh chan chan struct{}) { - for { - select { - case notify, ok := <-notifyCh: - if !ok { - break - } - - d.notifyEvent(notify) - case ch, ok := <-exitCh: - if !ok { - break - } - - if err := d.serfInstance.Leave(); err != nil { - logrus.Errorf("failed leaving the cluster: %v\n", err) - } - - d.serfInstance.Shutdown() - close(ch) - return - case e, ok := <-eventCh: - if !ok { - break - } - - if e.EventType() == serf.EventQuery { - d.processQuery(e.(*serf.Query)) - break - } - - u, ok := e.(serf.UserEvent) - if !ok { - break - } - d.processEvent(u) - } - } -} - -func (d *driver) isSerfAlive() bool { - d.Lock() - serfInstance := d.serfInstance - d.Unlock() - if serfInstance == nil || serfInstance.State() != serf.SerfAlive { - return false - } - return true -} diff --git a/libnetwork/drivers/overlay/overlay.go b/libnetwork/drivers/overlay/overlay.go index a97578a1c6..3d3cb3605f 100644 --- a/libnetwork/drivers/overlay/overlay.go +++ b/libnetwork/drivers/overlay/overlay.go @@ -17,7 +17,6 @@ import ( "github.com/docker/docker/libnetwork/netlabel" "github.com/docker/docker/libnetwork/osl" "github.com/docker/docker/libnetwork/types" - "github.com/hashicorp/serf/serf" "github.com/sirupsen/logrus" ) @@ -32,21 +31,15 @@ const ( var initVxlanIdm = make(chan (bool), 1) type driver struct { - eventCh chan serf.Event - notifyCh chan ovNotify - exitCh chan chan struct{} bindAddress string advertiseAddress string - neighIP string config map[string]interface{} peerDb peerNetworkMap secMap *encrMap - serfInstance *serf.Serf networks networkTable localStore datastore.DataStore vxlanIdm *idm.Idm initOS sync.Once - joinOnce sync.Once localJoinOnce sync.Once keys []*key peerOpMu sync.Mutex @@ -139,19 +132,6 @@ func (d *driver) restoreEndpoints() error { return nil } -// Fini cleans up the driver resources -func Fini(drv driverapi.Driver) { - d := drv.(*driver) - - if d.exitCh != nil { - waitCh := make(chan struct{}) - - d.exitCh <- waitCh - - <-waitCh - } -} - func (d *driver) configure() error { // Apply OS specific kernel configs if needed d.initOS.Do(applyOStweaks) @@ -168,7 +148,7 @@ func (d *driver) IsBuiltIn() bool { } func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) { - if self && !d.isSerfAlive() { + if self { d.Lock() d.advertiseAddress = advertiseAddress d.bindAddress = bindAddress @@ -180,52 +160,6 @@ func (d *driver) nodeJoin(advertiseAddress, bindAddress string, self bool) { d.peerDBUpdateSelf() }) } - - d.Lock() - if !self { - d.neighIP = advertiseAddress - } - neighIP := d.neighIP - d.Unlock() - - if d.serfInstance != nil && neighIP != "" { - var err error - d.joinOnce.Do(func() { - err = d.serfJoin(neighIP) - if err == nil { - d.pushLocalDb() - } - }) - if err != nil { - logrus.Errorf("joining serf neighbor %s failed: %v", advertiseAddress, err) - d.Lock() - d.joinOnce = sync.Once{} - d.Unlock() - return - } - } -} - -func (d *driver) pushLocalEndpointEvent(action, nid, eid string) { - n := d.network(nid) - if n == nil { - logrus.Debugf("Error pushing local endpoint event for network %s", nid) - return - } - ep := n.endpoint(eid) - if ep == nil { - logrus.Debugf("Error pushing local endpoint event for ep %s / %s", nid, eid) - return - } - - if !d.isSerfAlive() { - return - } - d.notifyCh <- ovNotify{ - action: "join", - nw: n, - ep: ep, - } } // DiscoverNew is a notification for a new discovery event, such as a new node joining a cluster diff --git a/libnetwork/drivers/overlay/overlay_test.go b/libnetwork/drivers/overlay/overlay_test.go index 755ea3fad0..c22e3f2fe2 100644 --- a/libnetwork/drivers/overlay/overlay_test.go +++ b/libnetwork/drivers/overlay/overlay_test.go @@ -4,24 +4,11 @@ package overlay import ( - "context" - "fmt" - "net" - "os" - "path/filepath" - "syscall" "testing" - "time" - "github.com/docker/docker/libnetwork/datastore" - "github.com/docker/docker/libnetwork/discoverapi" "github.com/docker/docker/libnetwork/driverapi" - "github.com/docker/docker/libnetwork/netlabel" "github.com/docker/docker/pkg/plugingetter" - "github.com/docker/libkv/store" "github.com/docker/libkv/store/boltdb" - "github.com/vishvananda/netlink/nl" - "golang.org/x/sys/unix" ) func init() { @@ -35,64 +22,6 @@ type driverTester struct { const testNetworkType = "overlay" -func setupDriver(t *testing.T) *driverTester { - dt := &driverTester{t: t} - config := make(map[string]interface{}) - - tmp, err := os.CreateTemp(t.TempDir(), "libnetwork-") - if err != nil { - t.Fatalf("Error creating temp file: %v", err) - } - err = tmp.Close() - if err != nil { - t.Fatalf("Error closing temp file: %v", err) - } - defaultPrefix := filepath.Join(os.TempDir(), "libnetwork", "test", "overlay") - - config[netlabel.LocalKVClient] = discoverapi.DatastoreConfigData{ - Scope: datastore.LocalScope, - Provider: "boltdb", - Address: filepath.Join(defaultPrefix, filepath.Base(tmp.Name())), - Config: &store.Config{ - Bucket: "libnetwork", - ConnectionTimeout: 3 * time.Second, - }, - } - - if err := Register(dt, config); err != nil { - t.Fatal(err) - } - - iface, err := net.InterfaceByName("eth0") - if err != nil { - t.Fatal(err) - } - addrs, err := iface.Addrs() - if err != nil || len(addrs) == 0 { - t.Fatal(err) - } - data := discoverapi.NodeDiscoveryData{ - Address: addrs[0].String(), - Self: true, - } - dt.d.DiscoverNew(discoverapi.NodeDiscovery, data) - return dt -} - -func cleanupDriver(t *testing.T, dt *driverTester) { - ch := make(chan struct{}) - go func() { - Fini(dt.d) - close(ch) - }() - - select { - case <-ch: - case <-time.After(10 * time.Second): - t.Fatal("test timed out because Fini() did not return on time") - } -} - func (dt *driverTester) GetPluginGetter() plugingetter.PluginGetter { return nil } @@ -119,15 +48,6 @@ func TestOverlayInit(t *testing.T) { } } -func TestOverlayFiniWithoutConfig(t *testing.T) { - dt := &driverTester{t: t} - if err := Register(dt, nil); err != nil { - t.Fatal(err) - } - - cleanupDriver(t, dt) -} - func TestOverlayType(t *testing.T) { dt := &driverTester{t: t} if err := Register(dt, nil); err != nil { @@ -139,36 +59,3 @@ func TestOverlayType(t *testing.T) { dt.d.Type()) } } - -// Test that the netlink socket close unblock the watchMiss to avoid deadlock -func TestNetlinkSocket(t *testing.T) { - // This is the same code used by the overlay driver to create the netlink interface - // for the watch miss - nlSock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH) - if err != nil { - t.Fatal() - } - // set the receive timeout to not remain stuck on the RecvFrom if the fd gets closed - tv := unix.NsecToTimeval(soTimeout.Nanoseconds()) - err = nlSock.SetReceiveTimeout(&tv) - if err != nil { - t.Fatal() - } - n := &network{id: "testnetid"} - ch := make(chan error) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - go func() { - n.watchMiss(nlSock, fmt.Sprintf("/proc/%d/task/%d/ns/net", os.Getpid(), syscall.Gettid())) - ch <- nil - }() - time.Sleep(5 * time.Second) - nlSock.Close() - select { - case <-ch: - case <-ctx.Done(): - { - t.Fatalf("Timeout expired") - } - } -} diff --git a/libnetwork/drivers/overlay/peerdb.go b/libnetwork/drivers/overlay/peerdb.go index 255fea1b67..cae9f500c3 100644 --- a/libnetwork/drivers/overlay/peerdb.go +++ b/libnetwork/drivers/overlay/peerdb.go @@ -434,15 +434,6 @@ func (d *driver) peerFlushOp(nid string) error { return nil } -func (d *driver) pushLocalDb() { - d.peerDbWalk(func(nid string, pKey *peerKey, pEntry *peerEntry) bool { - if pEntry.isLocal { - d.pushLocalEndpointEvent("join", nid, pEntry.eid) - } - return false - }) -} - func (d *driver) peerDBUpdateSelf() { d.peerDbWalk(func(nid string, pkey *peerKey, pEntry *peerEntry) bool { if pEntry.isLocal {