浏览代码

Merge pull request #46348 from thaJeztah/refactor_isClusterEligible

libnetwork: rewrite Network.isClusterEligible to return agent
Sebastiaan van Stijn 1 年之前
父节点
当前提交
d3afa80b96
共有 1 个文件被更改,包括 58 次插入84 次删除
  1. 58 84
      libnetwork/agent.go

+ 58 - 84
libnetwork/agent.go

@@ -108,8 +108,8 @@ func resolveAddr(addrOrInterface string) (net.IP, error) {
 func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
 	drvEnc := discoverapi.DriverEncryptionUpdate{}
 
-	a := c.getAgent()
-	if a == nil {
+	agent := c.getAgent()
+	if agent == nil {
 		log.G(context.TODO()).Debug("Skipping key change as agent is nil")
 		return nil
 	}
@@ -168,14 +168,14 @@ func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
 	c.mu.Unlock()
 
 	if len(added) > 0 {
-		a.networkDB.SetKey(added)
+		agent.networkDB.SetKey(added)
 	}
 
 	key, _, err := c.getPrimaryKeyTag(subsysGossip)
 	if err != nil {
 		return err
 	}
-	a.networkDB.SetPrimaryKey(key)
+	agent.networkDB.SetPrimaryKey(key)
 
 	key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
 	if err != nil {
@@ -185,7 +185,7 @@ func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
 	drvEnc.PrimaryTag = tag
 
 	if len(deleted) > 0 {
-		a.networkDB.RemoveKey(deleted)
+		agent.networkDB.RemoveKey(deleted)
 	}
 
 	c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
@@ -214,9 +214,8 @@ func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
 
 func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
 	agent := c.getAgent()
-
-	// If the agent is already present there is no need to try to initialize it again
 	if agent != nil {
+		// agent is already present, so there is no need initialize it again.
 		return nil
 	}
 
@@ -233,11 +232,17 @@ func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
 	listen := clusterProvider.GetListenAddress()
 	listenAddr, _, _ := net.SplitHostPort(listen)
 
-	log.G(context.TODO()).Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d",
-		listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().NetworkControlPlaneMTU)
-	if advAddr != "" && agent == nil {
+	log.G(context.TODO()).WithFields(log.Fields{
+		"listen-addr":               listenAddr,
+		"local-addr":                bindAddr,
+		"advertise-addr":            advAddr,
+		"data-path-addr":            dataAddr,
+		"remote-addr-list":          remoteAddrList,
+		"network-control-plane-mtu": c.Config().NetworkControlPlaneMTU,
+	}).Info("Initializing Libnetwork Agent")
+	if advAddr != "" {
 		if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
-			log.G(context.TODO()).Errorf("error in agentInit: %v", err)
+			log.G(context.TODO()).WithError(err).Errorf("Error in agentInit")
 			return err
 		}
 		c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
@@ -252,7 +257,7 @@ func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
 
 	if len(remoteAddrList) > 0 {
 		if err := c.agentJoin(remoteAddrList); err != nil {
-			log.G(context.TODO()).Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err)
+			log.G(context.TODO()).WithError(err).Error("Error in joining gossip cluster: join will be retried in background")
 		}
 	}
 
@@ -452,11 +457,8 @@ type epRecord struct {
 // Services returns a map of services keyed by the service name with the details
 // of all the tasks that belong to the service. Applicable only in swarm mode.
 func (n *Network) Services() map[string]ServiceInfo {
-	if !n.isClusterEligible() {
-		return nil
-	}
-	agent := n.getController().getAgent()
-	if agent == nil {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return nil
 	}
 	nwID := n.ID()
@@ -527,36 +529,29 @@ func (n *Network) Services() map[string]ServiceInfo {
 	return sinfo
 }
 
-func (n *Network) isClusterEligible() bool {
+// clusterAgent returns the cluster agent if the network is a swarm-scoped,
+// multi-host network.
+func (n *Network) clusterAgent() (agent *nwAgent, ok bool) {
 	if n.scope != scope.Swarm || !n.driverIsMultihost() {
-		return false
+		return nil, false
 	}
-	return n.getController().getAgent() != nil
+	a := n.getController().getAgent()
+	return a, a != nil
 }
 
 func (n *Network) joinCluster() error {
-	if !n.isClusterEligible() {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return nil
 	}
-
-	agent := n.getController().getAgent()
-	if agent == nil {
-		return nil
-	}
-
 	return agent.networkDB.JoinNetwork(n.ID())
 }
 
 func (n *Network) leaveCluster() error {
-	if !n.isClusterEligible() {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return nil
 	}
-
-	agent := n.getController().getAgent()
-	if agent == nil {
-		return nil
-	}
-
 	return agent.networkDB.LeaveNetwork(n.ID())
 }
 
@@ -565,17 +560,14 @@ func (ep *Endpoint) addDriverInfoToCluster() error {
 		return nil
 	}
 	n := ep.getNetwork()
-	if !n.isClusterEligible() {
-		return nil
-	}
-
-	agent := n.getController().getAgent()
-	if agent == nil {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return nil
 	}
 
+	nwID := n.ID()
 	for _, te := range ep.joinInfo.driverTableEntries {
-		if err := agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
+		if err := agent.networkDB.CreateEntry(te.tableName, nwID, te.key, te.value); err != nil {
 			return err
 		}
 	}
@@ -587,17 +579,14 @@ func (ep *Endpoint) deleteDriverInfoFromCluster() error {
 		return nil
 	}
 	n := ep.getNetwork()
-	if !n.isClusterEligible() {
-		return nil
-	}
-
-	agent := n.getController().getAgent()
-	if agent == nil {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return nil
 	}
 
+	nwID := n.ID()
 	for _, te := range ep.joinInfo.driverTableEntries {
-		if err := agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
+		if err := agent.networkDB.DeleteEntry(te.tableName, nwID, te.key); err != nil {
 			return err
 		}
 	}
@@ -610,7 +599,8 @@ func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
 	}
 
 	n := ep.getNetwork()
-	if !n.isClusterEligible() {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return nil
 	}
 
@@ -634,9 +624,6 @@ func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
 		return nil
 	}
 
-	c := n.getController()
-	agent := c.getAgent()
-
 	name := ep.Name()
 	if ep.isAnonymous() {
 		name = ep.MyAliases()[0]
@@ -649,12 +636,12 @@ func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
 		if n.ingress {
 			ingressPorts = ep.ingressPorts
 		}
-		if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
+		if err := n.getController().addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
 			return err
 		}
 	} else {
 		// This is a container simply attached to an attachable network
-		if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
+		if err := n.getController().addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
 			return err
 		}
 	}
@@ -674,11 +661,9 @@ func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
 		return err
 	}
 
-	if agent != nil {
-		if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
-			log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
-			return err
-		}
+	if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
+		log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
+		return err
 	}
 
 	log.G(context.TODO()).Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
@@ -692,7 +677,8 @@ func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, m
 	}
 
 	n := ep.getNetwork()
-	if !n.isClusterEligible() {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return nil
 	}
 
@@ -709,23 +695,18 @@ func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, m
 		return nil
 	}
 
-	c := n.getController()
-	agent := c.getAgent()
-
 	name := ep.Name()
 	if ep.isAnonymous() {
 		name = ep.MyAliases()[0]
 	}
 
-	if agent != nil {
-		// First update the networkDB then locally
-		if fullRemove {
-			if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
-				log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
-			}
-		} else {
-			disableServiceInNetworkDB(agent, n, ep)
+	// First update the networkDB then locally
+	if fullRemove {
+		if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
+			log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
 		}
+	} else {
+		disableServiceInNetworkDB(agent, n, ep)
 	}
 
 	if ep.Iface() != nil && ep.Iface().Address() != nil {
@@ -735,12 +716,12 @@ func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, m
 			if n.ingress {
 				ingressPorts = ep.ingressPorts
 			}
-			if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
+			if err := n.getController().rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
 				return err
 			}
 		} else {
 			// This is a container simply attached to an attachable network
-			if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
+			if err := n.getController().delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
 				return err
 			}
 		}
@@ -784,15 +765,12 @@ func (n *Network) addDriverWatches() {
 	if len(n.driverTables) == 0 {
 		return
 	}
-	if !n.isClusterEligible() {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return
 	}
 
 	c := n.getController()
-	agent := c.getAgent()
-	if agent == nil {
-		return
-	}
 	for _, table := range n.driverTables {
 		ch, cancel := agent.networkDB.Watch(table.name, n.ID())
 		agent.Lock()
@@ -820,12 +798,8 @@ func (n *Network) addDriverWatches() {
 }
 
 func (n *Network) cancelDriverWatches() {
-	if !n.isClusterEligible() {
-		return
-	}
-
-	agent := n.getController().getAgent()
-	if agent == nil {
+	agent, ok := n.clusterAgent()
+	if !ok {
 		return
 	}