diff --git a/libnetwork/controller.go b/libnetwork/controller.go index b19f51d707..1646568f27 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -810,12 +810,13 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (s // Create sandbox and process options first. Key generation depends on an option if sb == nil { sb = &sandbox{ - id: stringid.GenerateRandomID(), - containerID: containerID, - endpoints: epHeap{}, - epPriority: map[string]int{}, - config: containerConfig{}, - controller: c, + id: stringid.GenerateRandomID(), + containerID: containerID, + endpoints: epHeap{}, + epPriority: map[string]int{}, + populatedEndpoints: map[string]struct{}{}, + config: containerConfig{}, + controller: c, } } sBox = sb diff --git a/libnetwork/sandbox.go b/libnetwork/sandbox.go index b8f57fc592..6bb2766a71 100644 --- a/libnetwork/sandbox.go +++ b/libnetwork/sandbox.go @@ -68,23 +68,24 @@ func (sb *sandbox) processOptions(options ...SandboxOption) { type epHeap []*endpoint type sandbox struct { - id string - containerID string - config containerConfig - extDNS []string - osSbox osl.Sandbox - controller *controller - resolver Resolver - resolverOnce sync.Once - refCnt int - endpoints epHeap - epPriority map[string]int - joinLeaveDone chan struct{} - dbIndex uint64 - dbExists bool - isStub bool - inDelete bool - ingress bool + id string + containerID string + config containerConfig + extDNS []string + osSbox osl.Sandbox + controller *controller + resolver Resolver + resolverOnce sync.Once + refCnt int + endpoints epHeap + epPriority map[string]int + populatedEndpoints map[string]struct{} + joinLeaveDone chan struct{} + dbIndex uint64 + dbExists bool + isStub bool + inDelete bool + ingress bool sync.Mutex } @@ -798,6 +799,12 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error { } } + // Make sure to add the endpoint to the populated endpoint set + // before populating loadbalancers. + sb.Lock() + sb.populatedEndpoints[ep.ID()] = struct{}{} + sb.Unlock() + // Populate load balancer only after updating all the other // information including gateway and other routes so that // loadbalancers are populated all the network state is in @@ -830,6 +837,7 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error { releaseOSSboxResources(osSbox, ep) } + delete(sb.populatedEndpoints, ep.ID()) sb.Lock() if len(sb.endpoints) == 0 { // sb.endpoints should never be empty and this is unexpected error condition @@ -879,6 +887,13 @@ func (sb *sandbox) clearNetworkResources(origEp *endpoint) error { return nil } +func (sb *sandbox) isEndpointPopulated(ep *endpoint) bool { + sb.Lock() + _, ok := sb.populatedEndpoints[ep.ID()] + sb.Unlock() + return ok +} + // joinLeaveStart waits to ensure there are no joins or leaves in progress and // marks this join/leave in progress without race func (sb *sandbox) joinLeaveStart() { diff --git a/libnetwork/sandbox_store.go b/libnetwork/sandbox_store.go index de76579ff6..5b963e71c3 100644 --- a/libnetwork/sandbox_store.go +++ b/libnetwork/sandbox_store.go @@ -197,14 +197,15 @@ func (c *controller) sandboxCleanup(activeSandboxes map[string]interface{}) { sbs := kvo.(*sbState) sb := &sandbox{ - id: sbs.ID, - controller: sbs.c, - containerID: sbs.Cid, - endpoints: epHeap{}, - dbIndex: sbs.dbIndex, - isStub: true, - dbExists: true, - extDNS: sbs.ExtDNS, + id: sbs.ID, + controller: sbs.c, + containerID: sbs.Cid, + endpoints: epHeap{}, + populatedEndpoints: map[string]struct{}{}, + dbIndex: sbs.dbIndex, + isStub: true, + dbExists: true, + extDNS: sbs.ExtDNS, } msg := " for cleanup" diff --git a/libnetwork/service_linux.go b/libnetwork/service_linux.go index 1d36c4538d..780889f0bc 100644 --- a/libnetwork/service_linux.go +++ b/libnetwork/service_linux.go @@ -184,14 +184,20 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in func (n *network) connectedLoadbalancers() []*loadBalancer { c := n.getController() + serviceBindings := make([]*service, 0, len(c.serviceBindings)) c.Lock() - defer c.Unlock() + for _, s := range c.serviceBindings { + serviceBindings = append(serviceBindings, s) + } + c.Unlock() var lbs []*loadBalancer - for _, s := range c.serviceBindings { + for _, s := range serviceBindings { + s.Lock() if lb, ok := s.loadBalancers[n.ID()]; ok { lbs = append(lbs, lb) } + s.Unlock() } return lbs @@ -229,12 +235,14 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) { continue } + lb.service.Lock() addService := true for _, ip := range lb.backEnds { sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, addService) addService = false } + lb.service.Unlock() } } @@ -245,6 +253,10 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po n.WalkEndpoints(func(e Endpoint) bool { ep := e.(*endpoint) if sb, ok := ep.getSandbox(); ok { + if !sb.isEndpointPopulated(ep) { + return false + } + var gwIP net.IP if ep := sb.getGatewayEndpoint(); ep != nil { gwIP = ep.Iface().Address().IP @@ -264,6 +276,10 @@ func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Por n.WalkEndpoints(func(e Endpoint) bool { ep := e.(*endpoint) if sb, ok := ep.getSandbox(); ok { + if !sb.isEndpointPopulated(ep) { + return false + } + var gwIP net.IP if ep := sb.getGatewayEndpoint(); ep != nil { gwIP = ep.Iface().Address().IP @@ -356,15 +372,13 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po } if err := i.DelDestination(s, d); err != nil { - logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err) - return + logrus.Infof("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err) } if rmService { s.SchedName = ipvs.RoundRobin if err := i.DelService(s); err != nil { - logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err) - return + logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err) } var iPorts []*PortConfig @@ -372,13 +386,11 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po iPorts = ingressPorts if err := programIngress(gwIP, iPorts, true); err != nil { logrus.Errorf("Failed to delete ingress: %v", err) - return } } if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) - return } } } @@ -454,12 +466,17 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro rule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d", addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, gwIP, iPort.PublishedPort)) if err := iptables.RawCombinedOutput(rule...); err != nil { - return fmt.Errorf("setting up rule failed, %v: %v", rule, err) + errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err) + if !isDelete { + return fmt.Errorf("%s", errStr) + } + + logrus.Infof("%s", errStr) } } if err := plumbProxy(iPort, isDelete); err != nil { - return fmt.Errorf("failed to create proxy for port %d: %v", iPort.PublishedPort, err) + logrus.Warnf("failed to create proxy for port %d: %v", iPort.PublishedPort, err) } }