Browse Source

Merge pull request #1269 from mrjana/lb

Do not add loadbalancer to unpopulated sandboxes
Santhosh Manohar 9 years ago
parent
commit
131b1aa136
4 changed files with 75 additions and 41 deletions
  1. 7 6
      libnetwork/controller.go
  2. 32 17
      libnetwork/sandbox.go
  3. 9 8
      libnetwork/sandbox_store.go
  4. 27 10
      libnetwork/service_linux.go

+ 7 - 6
libnetwork/controller.go

@@ -810,12 +810,13 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (s
 	// Create sandbox and process options first. Key generation depends on an option
 	// Create sandbox and process options first. Key generation depends on an option
 	if sb == nil {
 	if sb == nil {
 		sb = &sandbox{
 		sb = &sandbox{
-			id:          stringid.GenerateRandomID(),
-			containerID: containerID,
-			endpoints:   epHeap{},
-			epPriority:  map[string]int{},
-			config:      containerConfig{},
-			controller:  c,
+			id:                 stringid.GenerateRandomID(),
+			containerID:        containerID,
+			endpoints:          epHeap{},
+			epPriority:         map[string]int{},
+			populatedEndpoints: map[string]struct{}{},
+			config:             containerConfig{},
+			controller:         c,
 		}
 		}
 	}
 	}
 	sBox = sb
 	sBox = sb

+ 32 - 17
libnetwork/sandbox.go

@@ -68,23 +68,24 @@ func (sb *sandbox) processOptions(options ...SandboxOption) {
 type epHeap []*endpoint
 type epHeap []*endpoint
 
 
 type sandbox struct {
 type sandbox struct {
-	id            string
-	containerID   string
-	config        containerConfig
-	extDNS        []string
-	osSbox        osl.Sandbox
-	controller    *controller
-	resolver      Resolver
-	resolverOnce  sync.Once
-	refCnt        int
-	endpoints     epHeap
-	epPriority    map[string]int
-	joinLeaveDone chan struct{}
-	dbIndex       uint64
-	dbExists      bool
-	isStub        bool
-	inDelete      bool
-	ingress       bool
+	id                 string
+	containerID        string
+	config             containerConfig
+	extDNS             []string
+	osSbox             osl.Sandbox
+	controller         *controller
+	resolver           Resolver
+	resolverOnce       sync.Once
+	refCnt             int
+	endpoints          epHeap
+	epPriority         map[string]int
+	populatedEndpoints map[string]struct{}
+	joinLeaveDone      chan struct{}
+	dbIndex            uint64
+	dbExists           bool
+	isStub             bool
+	inDelete           bool
+	ingress            bool
 	sync.Mutex
 	sync.Mutex
 }
 }
 
 
@@ -798,6 +799,12 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
 		}
 		}
 	}
 	}
 
 
+	// Make sure to add the endpoint to the populated endpoint set
+	// before populating loadbalancers.
+	sb.Lock()
+	sb.populatedEndpoints[ep.ID()] = struct{}{}
+	sb.Unlock()
+
 	// Populate load balancer only after updating all the other
 	// Populate load balancer only after updating all the other
 	// information including gateway and other routes so that
 	// information including gateway and other routes so that
 	// loadbalancers are populated all the network state is in
 	// loadbalancers are populated all the network state is in
@@ -830,6 +837,7 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
 		releaseOSSboxResources(osSbox, ep)
 		releaseOSSboxResources(osSbox, ep)
 	}
 	}
 
 
+	delete(sb.populatedEndpoints, ep.ID())
 	sb.Lock()
 	sb.Lock()
 	if len(sb.endpoints) == 0 {
 	if len(sb.endpoints) == 0 {
 		// sb.endpoints should never be empty and this is unexpected error condition
 		// sb.endpoints should never be empty and this is unexpected error condition
@@ -879,6 +887,13 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error {
 	return nil
 	return nil
 }
 }
 
 
+func (sb *sandbox) isEndpointPopulated(ep *endpoint) bool {
+	sb.Lock()
+	_, ok := sb.populatedEndpoints[ep.ID()]
+	sb.Unlock()
+	return ok
+}
+
 // joinLeaveStart waits to ensure there are no joins or leaves in progress and
 // joinLeaveStart waits to ensure there are no joins or leaves in progress and
 // marks this join/leave in progress without race
 // marks this join/leave in progress without race
 func (sb *sandbox) joinLeaveStart() {
 func (sb *sandbox) joinLeaveStart() {

+ 9 - 8
libnetwork/sandbox_store.go

@@ -197,14 +197,15 @@ func (c *controller) sandboxCleanup(activeSandboxes map[string]interface{}) {
 		sbs := kvo.(*sbState)
 		sbs := kvo.(*sbState)
 
 
 		sb := &sandbox{
 		sb := &sandbox{
-			id:          sbs.ID,
-			controller:  sbs.c,
-			containerID: sbs.Cid,
-			endpoints:   epHeap{},
-			dbIndex:     sbs.dbIndex,
-			isStub:      true,
-			dbExists:    true,
-			extDNS:      sbs.ExtDNS,
+			id:                 sbs.ID,
+			controller:         sbs.c,
+			containerID:        sbs.Cid,
+			endpoints:          epHeap{},
+			populatedEndpoints: map[string]struct{}{},
+			dbIndex:            sbs.dbIndex,
+			isStub:             true,
+			dbExists:           true,
+			extDNS:             sbs.ExtDNS,
 		}
 		}
 
 
 		msg := " for cleanup"
 		msg := " for cleanup"

+ 27 - 10
libnetwork/service_linux.go

@@ -184,14 +184,20 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in
 func (n *network) connectedLoadbalancers() []*loadBalancer {
 func (n *network) connectedLoadbalancers() []*loadBalancer {
 	c := n.getController()
 	c := n.getController()
 
 
+	serviceBindings := make([]*service, 0, len(c.serviceBindings))
 	c.Lock()
 	c.Lock()
-	defer c.Unlock()
+	for _, s := range c.serviceBindings {
+		serviceBindings = append(serviceBindings, s)
+	}
+	c.Unlock()
 
 
 	var lbs []*loadBalancer
 	var lbs []*loadBalancer
-	for _, s := range c.serviceBindings {
+	for _, s := range serviceBindings {
+		s.Lock()
 		if lb, ok := s.loadBalancers[n.ID()]; ok {
 		if lb, ok := s.loadBalancers[n.ID()]; ok {
 			lbs = append(lbs, lb)
 			lbs = append(lbs, lb)
 		}
 		}
+		s.Unlock()
 	}
 	}
 
 
 	return lbs
 	return lbs
@@ -229,12 +235,14 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
 			continue
 			continue
 		}
 		}
 
 
+		lb.service.Lock()
 		addService := true
 		addService := true
 		for _, ip := range lb.backEnds {
 		for _, ip := range lb.backEnds {
 			sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts,
 			sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts,
 				eIP, gwIP, addService)
 				eIP, gwIP, addService)
 			addService = false
 			addService = false
 		}
 		}
+		lb.service.Unlock()
 	}
 	}
 }
 }
 
 
@@ -245,6 +253,10 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 	n.WalkEndpoints(func(e Endpoint) bool {
 	n.WalkEndpoints(func(e Endpoint) bool {
 		ep := e.(*endpoint)
 		ep := e.(*endpoint)
 		if sb, ok := ep.getSandbox(); ok {
 		if sb, ok := ep.getSandbox(); ok {
+			if !sb.isEndpointPopulated(ep) {
+				return false
+			}
+
 			var gwIP net.IP
 			var gwIP net.IP
 			if ep := sb.getGatewayEndpoint(); ep != nil {
 			if ep := sb.getGatewayEndpoint(); ep != nil {
 				gwIP = ep.Iface().Address().IP
 				gwIP = ep.Iface().Address().IP
@@ -264,6 +276,10 @@ func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Por
 	n.WalkEndpoints(func(e Endpoint) bool {
 	n.WalkEndpoints(func(e Endpoint) bool {
 		ep := e.(*endpoint)
 		ep := e.(*endpoint)
 		if sb, ok := ep.getSandbox(); ok {
 		if sb, ok := ep.getSandbox(); ok {
+			if !sb.isEndpointPopulated(ep) {
+				return false
+			}
+
 			var gwIP net.IP
 			var gwIP net.IP
 			if ep := sb.getGatewayEndpoint(); ep != nil {
 			if ep := sb.getGatewayEndpoint(); ep != nil {
 				gwIP = ep.Iface().Address().IP
 				gwIP = ep.Iface().Address().IP
@@ -356,15 +372,13 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 	}
 	}
 
 
 	if err := i.DelDestination(s, d); err != nil {
 	if err := i.DelDestination(s, d); err != nil {
-		logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
-		return
+		logrus.Infof("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
 	}
 	}
 
 
 	if rmService {
 	if rmService {
 		s.SchedName = ipvs.RoundRobin
 		s.SchedName = ipvs.RoundRobin
 		if err := i.DelService(s); err != nil {
 		if err := i.DelService(s); err != nil {
-			logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
-			return
+			logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
 		}
 		}
 
 
 		var iPorts []*PortConfig
 		var iPorts []*PortConfig
@@ -372,13 +386,11 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
 			iPorts = ingressPorts
 			iPorts = ingressPorts
 			if err := programIngress(gwIP, iPorts, true); err != nil {
 			if err := programIngress(gwIP, iPorts, true); err != nil {
 				logrus.Errorf("Failed to delete ingress: %v", err)
 				logrus.Errorf("Failed to delete ingress: %v", err)
-				return
 			}
 			}
 		}
 		}
 
 
 		if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
 		if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
 			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
 			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
-			return
 		}
 		}
 	}
 	}
 }
 }
@@ -454,12 +466,17 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
 			rule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d",
 			rule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d",
 				addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, gwIP, iPort.PublishedPort))
 				addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, gwIP, iPort.PublishedPort))
 			if err := iptables.RawCombinedOutput(rule...); err != nil {
 			if err := iptables.RawCombinedOutput(rule...); err != nil {
-				return fmt.Errorf("setting up rule failed, %v: %v", rule, err)
+				errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err)
+				if !isDelete {
+					return fmt.Errorf("%s", errStr)
+				}
+
+				logrus.Infof("%s", errStr)
 			}
 			}
 		}
 		}
 
 
 		if err := plumbProxy(iPort, isDelete); err != nil {
 		if err := plumbProxy(iPort, isDelete); err != nil {
-			return fmt.Errorf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
+			logrus.Warnf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
 		}
 		}
 	}
 	}