Browse Source

Merge pull request #32283 from aboch/clearingress

Daemon to take care of ingress cleanup on cluster leave and graceful shutdown
Vincent Demeester 8 years ago
parent
commit
87562af45b

+ 2 - 2
daemon/cluster/executor/backend.go

@@ -27,8 +27,8 @@ type Backend interface {
 	CreateManagedNetwork(clustertypes.NetworkCreateRequest) error
 	DeleteManagedNetwork(name string) error
 	FindNetwork(idName string) (libnetwork.Network, error)
-	SetupIngress(req clustertypes.NetworkCreateRequest, nodeIP string) error
-	ReleaseIngress() error
+	SetupIngress(clustertypes.NetworkCreateRequest, string) (<-chan struct{}, error)
+	ReleaseIngress() (<-chan struct{}, error)
 	PullImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error
 	CreateManagedContainer(config types.ContainerCreateConfig) (container.ContainerCreateCreatedBody, error)
 	ContainerStart(name string, hostConfig *container.HostConfig, checkpoint string, checkpointDir string) error

+ 3 - 1
daemon/cluster/executor/container/executor.go

@@ -139,13 +139,15 @@ func (e *executor) Configure(ctx context.Context, node *api.Node) error {
 		options.IPAM.Config = append(options.IPAM.Config, c)
 	}
 
-	return e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
+	_, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
 		ID: na.Network.ID,
 		NetworkCreateRequest: types.NetworkCreateRequest{
 			Name:          na.Network.Spec.Annotations.Name,
 			NetworkCreate: options,
 		},
 	}, na.Addresses[0])
+
+	return err
 }
 
 // Controller returns a docker container runner.

+ 24 - 0
daemon/daemon.go

@@ -445,7 +445,25 @@ func (daemon *Daemon) DaemonLeavesCluster() {
 	// Daemon is in charge of removing the attachable networks with
 	// connected containers when the node leaves the swarm
 	daemon.clearAttachableNetworks()
+	// We no longer need the cluster provider, stop it now so that
+	// the network agent will stop listening to cluster events.
 	daemon.setClusterProvider(nil)
+	// Wait for the networking cluster agent to stop
+	daemon.netController.AgentStopWait()
+	// Daemon is in charge of removing the ingress network when the
+	// node leaves the swarm. Wait for job to be done or timeout.
+	// This is called also on graceful daemon shutdown. We need to
+	// wait, because the ingress release has to happen before the
+	// network controller is stopped.
+	if done, err := daemon.ReleaseIngress(); err == nil {
+		select {
+		case <-done:
+		case <-time.After(5 * time.Second):
+			logrus.Warnf("timeout while waiting for ingress network removal")
+		}
+	} else {
+		logrus.Warnf("failed to initiate ingress network removal: %v", err)
+	}
 }
 
 // setClusterProvider sets a component for querying the current cluster state.
@@ -832,6 +850,12 @@ func (daemon *Daemon) Shutdown() error {
 		}
 	}
 
+	// If we are part of a cluster, clean up cluster's stuff
+	if daemon.clusterProvider != nil {
+		logrus.Debugf("start clean shutdown of cluster resources...")
+		daemon.DaemonLeavesCluster()
+	}
+
 	// Shutdown plugins after containers and layerstore. Don't change the order.
 	daemon.pluginShutdown()
 

+ 15 - 9
daemon/network.go

@@ -101,8 +101,9 @@ func (daemon *Daemon) getAllNetworks() []libnetwork.Network {
 }
 
 type ingressJob struct {
-	create *clustertypes.NetworkCreateRequest
-	ip     net.IP
+	create  *clustertypes.NetworkCreateRequest
+	ip      net.IP
+	jobDone chan struct{}
 }
 
 var (
@@ -124,6 +125,7 @@ func (daemon *Daemon) startIngressWorker() {
 					daemon.releaseIngress(ingressID)
 					ingressID = ""
 				}
+				close(r.jobDone)
 			}
 		}
 	}()
@@ -137,19 +139,23 @@ func (daemon *Daemon) enqueueIngressJob(job *ingressJob) {
 }
 
 // SetupIngress sets up ingress networking.
-func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) error {
+// The function returns a channel which will signal the caller when the programming is completed.
+func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) (<-chan struct{}, error) {
 	ip, _, err := net.ParseCIDR(nodeIP)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	daemon.enqueueIngressJob(&ingressJob{&create, ip})
-	return nil
+	done := make(chan struct{})
+	daemon.enqueueIngressJob(&ingressJob{&create, ip, done})
+	return done, nil
 }
 
 // ReleaseIngress releases the ingress networking.
-func (daemon *Daemon) ReleaseIngress() error {
-	daemon.enqueueIngressJob(&ingressJob{nil, nil})
-	return nil
+// The function returns a channel which will signal the caller when the programming is completed.
+func (daemon *Daemon) ReleaseIngress() (<-chan struct{}, error) {
+	done := make(chan struct{})
+	daemon.enqueueIngressJob(&ingressJob{nil, nil, done})
+	return done, nil
 }
 
 func (daemon *Daemon) setupIngress(create *clustertypes.NetworkCreateRequest, ip net.IP, staleID string) {

+ 1 - 1
vendor.conf

@@ -24,7 +24,7 @@ github.com/RackSec/srslog 456df3a81436d29ba874f3590eeeee25d666f8a5
 github.com/imdario/mergo 0.2.1
 
 #get libnetwork packages
-github.com/docker/libnetwork b6cb1eee1e7fc27ee05f0eb830d3e60e67a88565
+github.com/docker/libnetwork f3c4ca8ce5c128e071bab198c4ed9fd0d08384eb
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

+ 1 - 0
vendor/github.com/docker/libnetwork/agent.go

@@ -218,6 +218,7 @@ func (c *controller) agentSetup() error {
 	if c.agent != nil && c.agentInitDone != nil {
 		close(c.agentInitDone)
 		c.agentInitDone = nil
+		c.agentStopDone = make(chan struct{})
 	}
 	c.Unlock()
 

+ 19 - 40
vendor/github.com/docker/libnetwork/controller.go

@@ -127,6 +127,9 @@ type NetworkController interface {
 	// Wait for agent initialization complete in libnetwork controller
 	AgentInitWait()
 
+	// Wait for agent to stop if running
+	AgentStopWait()
+
 	// SetKeys configures the encryption key for gossip and overlay data path
 	SetKeys(keys []*types.EncryptionKey) error
 }
@@ -160,6 +163,7 @@ type controller struct {
 	agent                  *agent
 	networkLocker          *locker.Locker
 	agentInitDone          chan struct{}
+	agentStopDone          chan struct{}
 	keys                   []*types.EncryptionKey
 	clusterConfigAvailable bool
 	sync.Mutex
@@ -338,7 +342,12 @@ func (c *controller) clusterAgentInit() {
 			c.agentClose()
 			c.cleanupServiceBindings("")
 
-			c.clearIngress(true)
+			c.Lock()
+			if c.agentStopDone != nil {
+				close(c.agentStopDone)
+				c.agentStopDone = nil
+			}
+			c.Unlock()
 
 			return
 		}
@@ -357,6 +366,15 @@ func (c *controller) AgentInitWait() {
 	}
 }
 
+func (c *controller) AgentStopWait() {
+	c.Lock()
+	agentStopDone := c.agentStopDone
+	c.Unlock()
+	if agentStopDone != nil {
+		<-agentStopDone
+	}
+}
+
 func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
 	if c.cfg == nil {
 		return nil
@@ -1153,46 +1171,7 @@ func (c *controller) getIPAMDriver(name string) (ipamapi.Ipam, *ipamapi.Capabili
 }
 
 func (c *controller) Stop() {
-	c.clearIngress(false)
 	c.closeStores()
 	c.stopExternalKeyListener()
 	osl.GC()
 }
-
-func (c *controller) clearIngress(clusterLeave bool) {
-	c.Lock()
-	ingressSandbox := c.ingressSandbox
-	c.ingressSandbox = nil
-	c.Unlock()
-
-	var n *network
-	if ingressSandbox != nil {
-		for _, ep := range ingressSandbox.getConnectedEndpoints() {
-			if nw := ep.getNetwork(); nw.ingress {
-				n = nw
-				break
-			}
-		}
-		if err := ingressSandbox.Delete(); err != nil {
-			logrus.Warnf("Could not delete ingress sandbox while leaving: %v", err)
-		}
-	}
-
-	if n == nil {
-		for _, nw := range c.Networks() {
-			if nw.Info().Ingress() {
-				n = nw.(*network)
-				break
-			}
-		}
-	}
-	if n == nil && clusterLeave {
-		logrus.Warnf("Could not find ingress network while leaving")
-	}
-
-	if n != nil {
-		if err := n.Delete(); err != nil {
-			logrus.Warnf("Could not delete ingress network while leaving: %v", err)
-		}
-	}
-}

+ 1 - 1
vendor/github.com/docker/libnetwork/drivers/overlay/joinleave.go

@@ -205,7 +205,7 @@ func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key stri
 		return
 	}
 
-	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
+	d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true, false, false)
 }
 
 // Leave method is invoked when a Sandbox detaches from an endpoint.

+ 19 - 10
vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go

@@ -645,19 +645,28 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
 				continue
 			}
 
-			if neigh.IP.To4() == nil {
-				if neigh.HardwareAddr != nil {
-					logrus.Debugf("Miss notification, l2 mac %v", neigh.HardwareAddr)
-				}
+			var (
+				ip             net.IP
+				mac            net.HardwareAddr
+				l2Miss, l3Miss bool
+			)
+			if neigh.IP.To4() != nil {
+				ip = neigh.IP
+				l3Miss = true
+			} else if neigh.HardwareAddr != nil {
+				mac = []byte(neigh.HardwareAddr)
+				ip = net.IP(mac[2:])
+				l2Miss = true
+			} else {
 				continue
 			}
 
 			// Not any of the network's subnets. Ignore.
-			if !n.contains(neigh.IP) {
+			if !n.contains(ip) {
 				continue
 			}
 
-			logrus.Debugf("miss notification for dest IP, %v", neigh.IP.String())
+			logrus.Debugf("miss notification: dest IP %v, dest MAC %v", ip, mac)
 
 			if neigh.State&(netlink.NUD_STALE|netlink.NUD_INCOMPLETE) == 0 {
 				continue
@@ -667,14 +676,14 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
 				continue
 			}
 
-			mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, neigh.IP)
+			mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip)
 			if err != nil {
-				logrus.Errorf("could not resolve peer %q: %v", neigh.IP, err)
+				logrus.Errorf("could not resolve peer %q: %v", ip, err)
 				continue
 			}
 
-			if err := n.driver.peerAdd(n.id, "dummy", neigh.IP, IPmask, mac, vtep, true); err != nil {
-				logrus.Errorf("could not add neighbor entry for missed peer %q: %v", neigh.IP, err)
+			if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil {
+				logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err)
 			}
 		}
 	}

+ 1 - 1
vendor/github.com/docker/libnetwork/drivers/overlay/ov_serf.go

@@ -121,7 +121,7 @@ func (d *driver) processEvent(u serf.UserEvent) {
 	switch action {
 	case "join":
 		if err := d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac,
-			net.ParseIP(vtepStr), true); err != nil {
+			net.ParseIP(vtepStr), true, false, false); err != nil {
 			logrus.Errorf("Peer add failed in the driver: %v\n", err)
 		}
 	case "leave":

+ 4 - 4
vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go

@@ -236,7 +236,7 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
 		op := func() {
 			if err := d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask,
 				pKey.peerMac, entry.vtep,
-				false); err != nil {
+				false, false, false); err != nil {
 				fmt.Printf("peerdbupdate in sandbox failed for ip %s and mac %s: %v",
 					pKey.peerIP, pKey.peerMac, err)
 			}
@@ -254,7 +254,7 @@ func (d *driver) peerDbUpdateSandbox(nid string) {
 }
 
 func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
-	peerMac net.HardwareAddr, vtep net.IP, updateDb bool) error {
+	peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss bool) error {
 
 	if err := validateID(nid, eid); err != nil {
 		return err
@@ -297,12 +297,12 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	}
 
 	// Add neighbor entry for the peer IP
-	if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
+	if err := sbox.AddNeighbor(peerIP, peerMac, l3Miss, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
 		return fmt.Errorf("could not add neighbor entry into the sandbox: %v", err)
 	}
 
 	// Add fdb entry to the bridge for the peer mac
-	if err := sbox.AddNeighbor(vtep, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName),
+	if err := sbox.AddNeighbor(vtep, peerMac, l2Miss, sbox.NeighborOptions().LinkName(s.vxlanName),
 		sbox.NeighborOptions().Family(syscall.AF_BRIDGE)); err != nil {
 		return fmt.Errorf("could not add fdb entry into the sandbox: %v", err)
 	}

+ 1 - 1
vendor/github.com/docker/libnetwork/drivers/solaris/overlay/peerdb.go

@@ -279,7 +279,7 @@ func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
 	}
 
 	// Add neighbor entry for the peer IP
-	if err := sbox.AddNeighbor(peerIP, peerMac, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
+	if err := sbox.AddNeighbor(peerIP, peerMac, false, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil {
 		return fmt.Errorf("could not add neigbor entry into the sandbox: %v", err)
 	}
 

+ 50 - 17
vendor/github.com/docker/libnetwork/endpoint.go

@@ -519,6 +519,14 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
 		return err
 	}
 
+	defer func() {
+		if err != nil {
+			if e := ep.deleteDriverInfoFromCluster(); e != nil {
+				logrus.Errorf("Could not delete endpoint state for endpoint %s from cluster on join failure: %v", ep.Name(), e)
+			}
+		}
+	}()
+
 	if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
 		return sb.setupDefaultGW()
 	}
@@ -576,39 +584,64 @@ func doUpdateHostsFile(n *network, sb *sandbox) bool {
 }
 
 func (ep *endpoint) rename(name string) error {
-	var err error
+	var (
+		err      error
+		netWatch *netWatch
+		ok       bool
+	)
+
 	n := ep.getNetwork()
 	if n == nil {
 		return fmt.Errorf("network not connected for ep %q", ep.name)
 	}
 
-	n.getController().Lock()
-	netWatch, ok := n.getController().nmap[n.ID()]
-	n.getController().Unlock()
+	c := n.getController()
 
-	if !ok {
-		return fmt.Errorf("watch null for network %q", n.Name())
+	if c.isAgent() {
+		if err = ep.deleteServiceInfoFromCluster(); err != nil {
+			return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
+		}
+	} else {
+		c.Lock()
+		netWatch, ok = c.nmap[n.ID()]
+		c.Unlock()
+		if !ok {
+			return fmt.Errorf("watch null for network %q", n.Name())
+		}
+		n.updateSvcRecord(ep, c.getLocalEps(netWatch), false)
 	}
 
-	n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), false)
-
 	oldName := ep.name
 	oldAnonymous := ep.anonymous
 	ep.name = name
 	ep.anonymous = false
 
-	n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), true)
-	defer func() {
-		if err != nil {
-			n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), false)
-			ep.name = oldName
-			ep.anonymous = oldAnonymous
-			n.updateSvcRecord(ep, n.getController().getLocalEps(netWatch), true)
+	if c.isAgent() {
+		if err = ep.addServiceInfoToCluster(); err != nil {
+			return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err)
 		}
-	}()
+		defer func() {
+			if err != nil {
+				ep.deleteServiceInfoFromCluster()
+				ep.name = oldName
+				ep.anonymous = oldAnonymous
+				ep.addServiceInfoToCluster()
+			}
+		}()
+	} else {
+		n.updateSvcRecord(ep, c.getLocalEps(netWatch), true)
+		defer func() {
+			if err != nil {
+				n.updateSvcRecord(ep, c.getLocalEps(netWatch), false)
+				ep.name = oldName
+				ep.anonymous = oldAnonymous
+				n.updateSvcRecord(ep, c.getLocalEps(netWatch), true)
+			}
+		}()
+	}
 
 	// Update the store with the updated name
-	if err = n.getController().updateToStore(ep); err != nil {
+	if err = c.updateToStore(ep); err != nil {
 		return err
 	}
 	// After the name change do a dummy endpoint count update to

+ 8 - 0
vendor/github.com/docker/libnetwork/error.go

@@ -183,3 +183,11 @@ func (mr ManagerRedirectError) Error() string {
 
 // Maskable denotes the type of this error
 func (mr ManagerRedirectError) Maskable() {}
+
+// ErrDataStoreNotInitialized is returned if an invalid data scope is passed
+// for getting data store
+type ErrDataStoreNotInitialized string
+
+func (dsni ErrDataStoreNotInitialized) Error() string {
+	return fmt.Sprintf("datastore for scope %q is not initialized", string(dsni))
+}

+ 1 - 2
vendor/github.com/docker/libnetwork/networkdb/broadcast.go

@@ -105,8 +105,7 @@ type tableEventMessage struct {
 }
 
 func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
-	otherm := other.(*tableEventMessage)
-	return m.id == otherm.id && m.tname == otherm.tname && m.key == otherm.key
+	return false
 }
 
 func (m *tableEventMessage) Message() []byte {

+ 14 - 5
vendor/github.com/docker/libnetwork/osl/neigh_linux.go

@@ -72,8 +72,11 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr,
 			nlnh.LinkIndex = iface.Attrs().Index
 		}
 
+		// If the kernel deletion fails for the neighbor entry, still remove it
+		// from the namespace cache. Otherwise, if the neighbor moves back to the
+		// same host again, the kernel update can fail.
 		if err := nlh.NeighDel(nlnh); err != nil {
-			return fmt.Errorf("could not delete neighbor entry: %v", err)
+			logrus.Warnf("Deleting neighbor IP %s, mac %s failed, %v", dstIP, dstMac, err)
 		}
 	}
 
@@ -85,21 +88,26 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr,
 		}
 	}
 	n.Unlock()
+	logrus.Debugf("Neighbor entry deleted for IP %v, mac %v", dstIP, dstMac)
 
 	return nil
 }
 
-func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, options ...NeighOption) error {
+func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, force bool, options ...NeighOption) error {
 	var (
 		iface netlink.Link
 		err   error
 	)
 
+	// If the namespace already has the neighbor entry but the AddNeighbor is called
+	// because of a miss notification (force flag) program the kernel anyway.
 	nh := n.findNeighbor(dstIP, dstMac)
 	if nh != nil {
-		logrus.Debugf("Neighbor entry already present for IP %v, mac %v", dstIP, dstMac)
-		// If it exists silently return
-		return nil
+		if !force {
+			logrus.Warnf("Neighbor entry already present for IP %v, mac %v", dstIP, dstMac)
+			return nil
+		}
+		logrus.Warnf("Force kernel update, Neighbor entry already present for IP %v, mac %v", dstIP, dstMac)
 	}
 
 	nh = &neigh{
@@ -150,6 +158,7 @@ func (n *networkNamespace) AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, op
 	n.Lock()
 	n.neighbors = append(n.neighbors, nh)
 	n.Unlock()
+	logrus.Debugf("Neighbor entry added for IP %v, mac %v", dstIP, dstMac)
 
 	return nil
 }

+ 1 - 1
vendor/github.com/docker/libnetwork/osl/sandbox.go

@@ -39,7 +39,7 @@ type Sandbox interface {
 	RemoveStaticRoute(*types.StaticRoute) error
 
 	// AddNeighbor adds a neighbor entry into the sandbox.
-	AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, option ...NeighOption) error
+	AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, force bool, option ...NeighOption) error
 
 	// DeleteNeighbor deletes neighbor entry from the sandbox.
 	DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, osDelete bool) error

+ 2 - 2
vendor/github.com/docker/libnetwork/store.go

@@ -225,7 +225,7 @@ func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
 func (c *controller) updateToStore(kvObject datastore.KVObject) error {
 	cs := c.getStore(kvObject.DataScope())
 	if cs == nil {
-		return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope())
+		return ErrDataStoreNotInitialized(kvObject.DataScope())
 	}
 
 	if err := cs.PutObjectAtomic(kvObject); err != nil {
@@ -241,7 +241,7 @@ func (c *controller) updateToStore(kvObject datastore.KVObject) error {
 func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
 	cs := c.getStore(kvObject.DataScope())
 	if cs == nil {
-		return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope())
+		return ErrDataStoreNotInitialized(kvObject.DataScope())
 	}
 
 retry: