فهرست منبع

Merge pull request #1407 from mrjana/lb

Cleanup service bindings when leaving cluster
Santhosh Manohar 9 سال پیش
والد
کامیت
10c617ed4d
5فایلهای تغییر یافته به همراه68 افزوده شده و 9 حذف شده
  1. 10 1
      libnetwork/controller.go
  2. 13 6
      libnetwork/network.go
  3. 3 0
      libnetwork/service.go
  4. 39 2
      libnetwork/service_linux.go
  5. 3 0
      libnetwork/service_unsupported.go

+ 10 - 1
libnetwork/controller.go

@@ -310,6 +310,16 @@ func (c *controller) clusterAgentInit() {
 			c.keys = nil
 			c.Unlock()
 
+			// We are leaving the cluster. Make sure we
+			// close the gossip so that we stop all
+			// incoming gossip updates before cleaning up
+			// any remaining service bindings. But before
+			// deleting the networks since the networks
+			// should still be present when cleaning up
+			// service bindings
+			c.agentClose()
+			c.cleanupServiceBindings("")
+
 			if err := c.ingressSandbox.Delete(); err != nil {
 				log.Warnf("Could not delete ingress sandbox while leaving: %v", err)
 			}
@@ -329,7 +339,6 @@ func (c *controller) clusterAgentInit() {
 				}
 			}
 
-			c.agentClose()
 			return
 		}
 	}

+ 13 - 6
libnetwork/network.go

@@ -756,6 +756,19 @@ func (n *network) delete(force bool) error {
 		log.Warnf("Failed to update store after ipam release for network %s (%s): %v", n.Name(), n.ID(), err)
 	}
 
+	// We are about to delete the network. Leave the gossip
+	// cluster for the network to stop all incoming network
+	// specific gossip updates before cleaning up all the service
+	// bindings for the network. But cleanup service binding
+	// before deleting the network from the store since service
+	// bindings cleanup requires the network in the store.
+	n.cancelDriverWatches()
+	if err = n.leaveCluster(); err != nil {
+		log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err)
+	}
+
+	c.cleanupServiceBindings(n.ID())
+
 	// deleteFromStore performs an atomic delete operation and the
 	// network.epCnt will help prevent any possible
 	// race between endpoint join and network delete
@@ -770,12 +783,6 @@ func (n *network) delete(force bool) error {
 		return fmt.Errorf("error deleting network from store: %v", err)
 	}
 
-	n.cancelDriverWatches()
-
-	if err = n.leaveCluster(); err != nil {
-		log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err)
-	}
-
 	return nil
 }
 

+ 3 - 0
libnetwork/service.go

@@ -45,6 +45,9 @@ type service struct {
 	// List of ingress ports exposed by the service
 	ingressPorts portConfigs
 
+	// Service aliases
+	aliases []string
+
 	sync.Mutex
 }
 

+ 39 - 2
libnetwork/service_linux.go

@@ -28,15 +28,52 @@ func init() {
 	reexec.Register("fwmarker", fwMarker)
 }
 
-func newService(name string, id string, ingressPorts []*PortConfig) *service {
+func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
 	return &service{
 		name:          name,
 		id:            id,
 		ingressPorts:  ingressPorts,
 		loadBalancers: make(map[string]*loadBalancer),
+		aliases:       aliases,
 	}
 }
 
+func (c *controller) cleanupServiceBindings(cleanupNID string) {
+	var cleanupFuncs []func()
+	c.Lock()
+	for _, s := range c.serviceBindings {
+		s.Lock()
+		for nid, lb := range s.loadBalancers {
+			if cleanupNID != "" && nid != cleanupNID {
+				continue
+			}
+
+			for eid, ip := range lb.backEnds {
+				service := s
+				loadBalancer := lb
+				networkID := nid
+				epID := eid
+				epIP := ip
+
+				cleanupFuncs = append(cleanupFuncs, func() {
+					if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip,
+						service.ingressPorts, service.aliases, epIP); err != nil {
+						logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
+							service.id, networkID, epID, err)
+					}
+				})
+			}
+		}
+		s.Unlock()
+	}
+	c.Unlock()
+
+	for _, f := range cleanupFuncs {
+		f()
+	}
+
+}
+
 func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
 	var (
 		s          *service
@@ -58,7 +95,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
 	if !ok {
 		// Create a new service if we are seeing this service
 		// for the first time.
-		s = newService(name, sid, ingressPorts)
+		s = newService(name, sid, ingressPorts, aliases)
 		c.serviceBindings[skey] = s
 	}
 	c.Unlock()

+ 3 - 0
libnetwork/service_unsupported.go

@@ -7,6 +7,9 @@ import (
 	"net"
 )
 
+func (c *controller) cleanupServiceBindings(nid string) {
+}
+
 func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
 	return fmt.Errorf("not supported")
 }