Explorar o código

Merge pull request #550 from sanimej/ext_conn

Serialize the endpoint join/leave at the sandbox level
Madhu Venugopal %!s(int64=9) %!d(string=hai) anos
pai
achega
01a0be8e33
Modificáronse 3 ficheiros con 85 adicións e 58 borrados
  1. 3 2
      libnetwork/default_gateway.go
  2. 23 37
      libnetwork/endpoint.go
  3. 59 19
      libnetwork/sandbox.go

+ 3 - 2
libnetwork/default_gateway.go

@@ -68,8 +68,9 @@ func (sb *sandbox) setupDefaultGW(srcEp *endpoint) error {
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("container %s: endpoint create on GW Network failed: %v", sb.containerID, err)
 		return fmt.Errorf("container %s: endpoint create on GW Network failed: %v", sb.containerID, err)
 	}
 	}
+	epLocal := newEp.(*endpoint)
 
 
-	if err := newEp.Join(sb); err != nil {
+	if err := epLocal.sbJoin(sb); err != nil {
 		return fmt.Errorf("container %s: endpoint join on GW Network failed: %v", sb.containerID, err)
 		return fmt.Errorf("container %s: endpoint join on GW Network failed: %v", sb.containerID, err)
 	}
 	}
 	return nil
 	return nil
@@ -82,7 +83,7 @@ func (sb *sandbox) clearDefaultGW() error {
 		return nil
 		return nil
 	}
 	}
 
 
-	if err := ep.Leave(sb); err != nil {
+	if err := ep.sbLeave(sb); err != nil {
 		return fmt.Errorf("container %s: endpoint leaving GW Network failed: %v", sb.containerID, err)
 		return fmt.Errorf("container %s: endpoint leaving GW Network failed: %v", sb.containerID, err)
 	}
 	}
 	if err := ep.Delete(); err != nil {
 	if err := ep.Delete(); err != nil {

+ 23 - 37
libnetwork/endpoint.go

@@ -188,53 +188,30 @@ func (ep *endpoint) processOptions(options ...EndpointOption) {
 	}
 	}
 }
 }
 
 
-// joinLeaveStart waits to ensure there are no joins or leaves in progress and
-// marks this join/leave in progress without race
-func (ep *endpoint) joinLeaveStart() {
-	ep.Lock()
-	defer ep.Unlock()
-
-	for ep.joinLeaveDone != nil {
-		joinLeaveDone := ep.joinLeaveDone
-		ep.Unlock()
-
-		select {
-		case <-joinLeaveDone:
-		}
+func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
 
 
-		ep.Lock()
+	if sbox == nil {
+		return types.BadRequestErrorf("endpoint cannot be joined by nil container")
 	}
 	}
 
 
-	ep.joinLeaveDone = make(chan struct{})
-}
+	sb, ok := sbox.(*sandbox)
+	if !ok {
+		return types.BadRequestErrorf("not a valid Sandbox interface")
+	}
 
 
-// joinLeaveEnd marks the end of this join/leave operation and
-// signals the same without race to other join and leave waiters
-func (ep *endpoint) joinLeaveEnd() {
-	ep.Lock()
-	defer ep.Unlock()
+	sb.joinLeaveStart()
+	defer sb.joinLeaveEnd()
 
 
-	if ep.joinLeaveDone != nil {
-		close(ep.joinLeaveDone)
-		ep.joinLeaveDone = nil
-	}
+	return ep.sbJoin(sbox, options...)
 }
 }
 
 
-func (ep *endpoint) Join(sbox Sandbox, options ...EndpointOption) error {
+func (ep *endpoint) sbJoin(sbox Sandbox, options ...EndpointOption) error {
 	var err error
 	var err error
-
-	if sbox == nil {
-		return types.BadRequestErrorf("endpoint cannot be joined by nil container")
-	}
-
 	sb, ok := sbox.(*sandbox)
 	sb, ok := sbox.(*sandbox)
 	if !ok {
 	if !ok {
 		return types.BadRequestErrorf("not a valid Sandbox interface")
 		return types.BadRequestErrorf("not a valid Sandbox interface")
 	}
 	}
 
 
-	ep.joinLeaveStart()
-	defer ep.joinLeaveEnd()
-
 	ep.Lock()
 	ep.Lock()
 	if ep.sandboxID != "" {
 	if ep.sandboxID != "" {
 		ep.Unlock()
 		ep.Unlock()
@@ -326,9 +303,6 @@ func (ep *endpoint) hasInterface(iName string) bool {
 }
 }
 
 
 func (ep *endpoint) Leave(sbox Sandbox, options ...EndpointOption) error {
 func (ep *endpoint) Leave(sbox Sandbox, options ...EndpointOption) error {
-	ep.joinLeaveStart()
-	defer ep.joinLeaveEnd()
-
 	if sbox == nil || sbox.ID() == "" || sbox.Key() == "" {
 	if sbox == nil || sbox.ID() == "" || sbox.Key() == "" {
 		return types.BadRequestErrorf("invalid Sandbox passed to enpoint leave: %v", sbox)
 		return types.BadRequestErrorf("invalid Sandbox passed to enpoint leave: %v", sbox)
 	}
 	}
@@ -338,6 +312,18 @@ func (ep *endpoint) Leave(sbox Sandbox, options ...EndpointOption) error {
 		return types.BadRequestErrorf("not a valid Sandbox interface")
 		return types.BadRequestErrorf("not a valid Sandbox interface")
 	}
 	}
 
 
+	sb.joinLeaveStart()
+	defer sb.joinLeaveEnd()
+
+	return ep.sbLeave(sbox, options...)
+}
+
+func (ep *endpoint) sbLeave(sbox Sandbox, options ...EndpointOption) error {
+	sb, ok := sbox.(*sandbox)
+	if !ok {
+		return types.BadRequestErrorf("not a valid Sandbox interface")
+	}
+
 	ep.Lock()
 	ep.Lock()
 	sid := ep.sandboxID
 	sid := ep.sandboxID
 	ep.Unlock()
 	ep.Unlock()

+ 59 - 19
libnetwork/sandbox.go

@@ -347,15 +347,16 @@ func (sb *sandbox) populateNetworkResources(ep *endpoint) error {
 		}
 		}
 	}
 	}
 
 
-	sb.Lock()
-	highEp := sb.endpoints[0]
-	sb.Unlock()
-	if ep == highEp {
-		if err := sb.updateGateway(ep); err != nil {
-			return err
+	for _, gwep := range sb.getConnectedEndpoints() {
+		if len(gwep.Gateway()) > 0 {
+			if gwep != ep {
+				return nil
+			}
+			if err := sb.updateGateway(gwep); err != nil {
+				return err
+			}
 		}
 		}
 	}
 	}
-
 	return nil
 	return nil
 }
 }
 
 
@@ -394,26 +395,33 @@ func (sb *sandbox) clearNetworkResources(ep *endpoint) error {
 		return nil
 		return nil
 	}
 	}
 
 
-	highEpBefore := sb.endpoints[0]
 	var (
 	var (
-		i int
-		e *endpoint
+		gwepBefore, gwepAfter *endpoint
+		index                 = -1
 	)
 	)
-	for i, e = range sb.endpoints {
+	for i, e := range sb.endpoints {
 		if e == ep {
 		if e == ep {
+			index = i
+		}
+		if len(e.Gateway()) > 0 && gwepBefore == nil {
+			gwepBefore = e
+		}
+		if index != -1 && gwepBefore != nil {
 			break
 			break
 		}
 		}
 	}
 	}
-	heap.Remove(&sb.endpoints, i)
-	var highEpAfter *endpoint
-	if len(sb.endpoints) > 0 {
-		highEpAfter = sb.endpoints[0]
+	heap.Remove(&sb.endpoints, index)
+	for _, e := range sb.endpoints {
+		if len(e.Gateway()) > 0 {
+			gwepAfter = e
+			break
+		}
 	}
 	}
 	delete(sb.epPriority, ep.ID())
 	delete(sb.epPriority, ep.ID())
 	sb.Unlock()
 	sb.Unlock()
 
 
-	if highEpBefore != highEpAfter {
-		sb.updateGateway(highEpAfter)
+	if gwepAfter != nil && gwepBefore != gwepAfter {
+		sb.updateGateway(gwepAfter)
 	}
 	}
 
 
 	return nil
 	return nil
@@ -634,6 +642,38 @@ func (sb *sandbox) updateDNS(ipv6Enabled bool) error {
 	return os.Rename(tmpResolvFile.Name(), sb.config.resolvConfPath)
 	return os.Rename(tmpResolvFile.Name(), sb.config.resolvConfPath)
 }
 }
 
 
+// joinLeaveStart waits to ensure there are no joins or leaves in progress and
+// marks this join/leave in progress without race
+func (sb *sandbox) joinLeaveStart() {
+	sb.Lock()
+	defer sb.Unlock()
+
+	for sb.joinLeaveDone != nil {
+		joinLeaveDone := sb.joinLeaveDone
+		sb.Unlock()
+
+		select {
+		case <-joinLeaveDone:
+		}
+
+		sb.Lock()
+	}
+
+	sb.joinLeaveDone = make(chan struct{})
+}
+
+// joinLeaveEnd marks the end of this join/leave operation and
+// signals the same without race to other join and leave waiters
+func (sb *sandbox) joinLeaveEnd() {
+	sb.Lock()
+	defer sb.Unlock()
+
+	if sb.joinLeaveDone != nil {
+		close(sb.joinLeaveDone)
+		sb.joinLeaveDone = nil
+	}
+}
+
 // OptionHostname function returns an option setter for hostname option to
 // OptionHostname function returns an option setter for hostname option to
 // be passed to NewSandbox method.
 // be passed to NewSandbox method.
 func OptionHostname(name string) SandboxOption {
 func OptionHostname(name string) SandboxOption {
@@ -757,11 +797,11 @@ func (eh epHeap) Less(i, j int) bool {
 	epj := eh[j]
 	epj := eh[j]
 
 
 	if epi.endpointInGWNetwork() {
 	if epi.endpointInGWNetwork() {
-		return true
+		return false
 	}
 	}
 
 
 	if epj.endpointInGWNetwork() {
 	if epj.endpointInGWNetwork() {
-		return false
+		return true
 	}
 	}
 
 
 	cip, ok := ci.epPriority[eh[i].ID()]
 	cip, ok := ci.epPriority[eh[i].ID()]