Przeglądaj źródła

Index service on both id and portconfigs

While trying to update loadbalancer state index the service both on id
and portconfig. From libnetwork point of view a service is not just
defined by its id but also the ports it exposes. When a service updates
its port its id remains the same but its portconfigs change which should
be treated as a new service in libnetwork in order to ensure proper
cleanup of old LB state and creation of new LB state.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan 9 lat temu
rodzic
commit
bc89397105
3 zmienionych plików z 80 dodań i 9 usunięć
  1. 2 2
      libnetwork/controller.go
  2. 23 1
      libnetwork/service.go
  3. 55 6
      libnetwork/service_linux.go

+ 2 - 2
libnetwork/controller.go

@@ -144,7 +144,7 @@ type controller struct {
 	unWatchCh              chan *endpoint
 	svcRecords             map[string]svcInfo
 	nmap                   map[string]*netWatch
-	serviceBindings        map[string]*service
+	serviceBindings        map[serviceKey]*service
 	defOsSbox              osl.Sandbox
 	ingressSandbox         *sandbox
 	sboxOnce               sync.Once
@@ -167,7 +167,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
 		cfg:             config.ParseConfigOptions(cfgOptions...),
 		sandboxes:       sandboxTable{},
 		svcRecords:      make(map[string]svcInfo),
-		serviceBindings: make(map[string]*service),
+		serviceBindings: make(map[serviceKey]*service),
 		agentInitDone:   make(chan struct{}),
 	}
 

+ 23 - 1
libnetwork/service.go

@@ -1,6 +1,7 @@
 package libnetwork
 
 import (
+	"fmt"
 	"net"
 	"sync"
 )
@@ -12,6 +13,27 @@ var (
 	fwMarkCtrMu sync.Mutex
 )
 
+type portConfigs []*PortConfig
+
+func (p portConfigs) String() string {
+	if len(p) == 0 {
+		return ""
+	}
+
+	pc := p[0]
+	str := fmt.Sprintf("%d:%d/%s", pc.PublishedPort, pc.TargetPort, PortConfig_Protocol_name[int32(pc.Protocol)])
+	for _, pc := range p[1:] {
+		str = str + fmt.Sprintf(",%d:%d/%s", pc.PublishedPort, pc.TargetPort, PortConfig_Protocol_name[int32(pc.Protocol)])
+	}
+
+	return str
+}
+
+type serviceKey struct {
+	id    string
+	ports string
+}
+
 type service struct {
 	name string // Service Name
 	id   string // Service ID
@@ -21,7 +43,7 @@ type service struct {
 	loadBalancers map[string]*loadBalancer
 
 	// List of ingress ports exposed by the service
-	ingressPorts []*PortConfig
+	ingressPorts portConfigs
 
 	sync.Mutex
 }

+ 55 - 6
libnetwork/service_linux.go

@@ -48,13 +48,18 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
 		return err
 	}
 
+	skey := serviceKey{
+		id:    sid,
+		ports: portConfigs(ingressPorts).String(),
+	}
+
 	c.Lock()
-	s, ok := c.serviceBindings[sid]
+	s, ok := c.serviceBindings[skey]
 	if !ok {
 		// Create a new service if we are seeing this service
 		// for the first time.
 		s = newService(name, sid, ingressPorts)
-		c.serviceBindings[sid] = s
+		c.serviceBindings[skey] = s
 	}
 	c.Unlock()
 
@@ -121,8 +126,13 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in
 		return err
 	}
 
+	skey := serviceKey{
+		id:    sid,
+		ports: portConfigs(ingressPorts).String(),
+	}
+
 	c.Lock()
-	s, ok := c.serviceBindings[sid]
+	s, ok := c.serviceBindings[skey]
 	if !ok {
 		c.Unlock()
 		return nil
@@ -167,7 +177,7 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in
 	if len(s.loadBalancers) == 0 {
 		// All loadbalancers for the service removed. Time to
 		// remove the service itself.
-		delete(c.serviceBindings, sid)
+		delete(c.serviceBindings, skey)
 	}
 
 	// Remove loadbalancer service(if needed) and backend in all
@@ -314,7 +324,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
 	if addService {
 		var iPorts []*PortConfig
 		if sb.ingress {
-			iPorts = ingressPorts
+			iPorts = filterPortConfigs(ingressPorts, false)
 			if err := programIngress(gwIP, iPorts, false); err != nil {
 				logrus.Errorf("Failed to add ingress: %v", err)
 				return
@@ -383,7 +393,7 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 
 		var iPorts []*PortConfig
 		if sb.ingress {
-			iPorts = ingressPorts
+			iPorts = filterPortConfigs(ingressPorts, true)
 			if err := programIngress(gwIP, iPorts, true); err != nil {
 				logrus.Errorf("Failed to delete ingress: %v", err)
 			}
@@ -401,8 +411,47 @@ var (
 	ingressOnce     sync.Once
 	ingressProxyMu  sync.Mutex
 	ingressProxyTbl = make(map[string]io.Closer)
+	portConfigMu    sync.Mutex
+	portConfigTbl   = make(map[PortConfig]int)
 )
 
+func filterPortConfigs(ingressPorts []*PortConfig, isDelete bool) []*PortConfig {
+	portConfigMu.Lock()
+	iPorts := make([]*PortConfig, 0, len(ingressPorts))
+	for _, pc := range ingressPorts {
+		if isDelete {
+			if cnt, ok := portConfigTbl[*pc]; ok {
+				// This is the last reference to this
+				// port config. Delete the port config
+				// and add it to filtered list to be
+				// plumbed.
+				if cnt == 1 {
+					delete(portConfigTbl, *pc)
+					iPorts = append(iPorts, pc)
+					continue
+				}
+
+				portConfigTbl[*pc] = cnt - 1
+			}
+
+			continue
+		}
+
+		if cnt, ok := portConfigTbl[*pc]; ok {
+			portConfigTbl[*pc] = cnt + 1
+			continue
+		}
+
+		// We are adding it for the first time. Add it to the
+		// filter list to be plumbed.
+		portConfigTbl[*pc] = 1
+		iPorts = append(iPorts, pc)
+	}
+	portConfigMu.Unlock()
+
+	return iPorts
+}
+
 func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) error {
 	addDelOpt := "-I"
 	if isDelete {