Przeglądaj źródła

Add container join priority option to endpoint

When an endpoint is joined by a container it may
optionally pass a priority to resolve resource
conflicts inside the sandbox when more than one
endpoint provides the same kind of resource. If the
the priority is the same for two endpoints with
conflicting resources then the endpoint network names
are used to resolve the conflict.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan 10 lat temu
rodzic
commit
a93d08aef5

+ 0 - 51
libnetwork/controller.go

@@ -59,7 +59,6 @@ import (
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/hostdiscovery"
-	"github.com/docker/libnetwork/sandbox"
 	"github.com/docker/libnetwork/types"
 	"github.com/docker/swarm/pkg/store"
 )
@@ -91,11 +90,6 @@ type NetworkController interface {
 // When the function returns true, the walk will stop.
 type NetworkWalker func(nw Network) bool
 
-type sandboxData struct {
-	sandbox sandbox.Sandbox
-	refCnt  int
-}
-
 type networkTable map[types.UUID]*network
 type endpointTable map[types.UUID]*endpoint
 type sandboxTable map[string]*sandboxData
@@ -382,51 +376,6 @@ func (c *controller) NetworkByID(id string) (Network, error) {
 	return nil, ErrNoSuchNetwork(id)
 }
 
-func (c *controller) sandboxAdd(key string, create bool) (sandbox.Sandbox, error) {
-	c.Lock()
-	defer c.Unlock()
-
-	sData, ok := c.sandboxes[key]
-	if !ok {
-		sb, err := sandbox.NewSandbox(key, create)
-		if err != nil {
-			return nil, err
-		}
-
-		sData = &sandboxData{sandbox: sb, refCnt: 1}
-		c.sandboxes[key] = sData
-		return sData.sandbox, nil
-	}
-
-	sData.refCnt++
-	return sData.sandbox, nil
-}
-
-func (c *controller) sandboxRm(key string) {
-	c.Lock()
-	defer c.Unlock()
-
-	sData := c.sandboxes[key]
-	sData.refCnt--
-
-	if sData.refCnt == 0 {
-		sData.sandbox.Destroy()
-		delete(c.sandboxes, key)
-	}
-}
-
-func (c *controller) sandboxGet(key string) sandbox.Sandbox {
-	c.Lock()
-	defer c.Unlock()
-
-	sData, ok := c.sandboxes[key]
-	if !ok {
-		return nil
-	}
-
-	return sData.sandbox
-}
-
 func (c *controller) loadDriver(networkType string) (driverapi.Driver, error) {
 	// Plugins pkg performs lazy loading of plugins that acts as remote drivers.
 	// As per the design, this Get call will result in remote driver discovery if there is a corresponding plugin available.

+ 12 - 59
libnetwork/endpoint.go

@@ -8,7 +8,6 @@ import (
 	"path/filepath"
 	"sync"
 
-	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/libnetwork/etchosts"
 	"github.com/docker/libnetwork/netlabel"
@@ -78,6 +77,7 @@ type containerConfig struct {
 	resolvConfPathConfig
 	generic           map[string]interface{}
 	useDefaultSandBox bool
+	prio              int // higher the value, more the priority
 }
 
 type extraHost struct {
@@ -101,7 +101,6 @@ type endpoint struct {
 	name          string
 	id            types.UUID
 	network       *network
-	sandboxInfo   *sandbox.Info
 	iFaces        []*endpointInterface
 	joinInfo      *endpointJoinInfo
 	container     *containerInfo
@@ -233,8 +232,6 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error {
 	container := ep.container
 	network := ep.network
 	epid := ep.id
-	joinInfo := ep.joinInfo
-	ifaces := ep.iFaces
 
 	ep.Unlock()
 	defer func() {
@@ -278,49 +275,16 @@ func (ep *endpoint) Join(containerID string, options ...EndpointOption) error {
 		return err
 	}
 
-	sb, err := ctrlr.sandboxAdd(sboxKey, !container.config.useDefaultSandBox)
+	sb, err := ctrlr.sandboxAdd(sboxKey, !container.config.useDefaultSandBox, ep)
 	if err != nil {
 		return err
 	}
 	defer func() {
 		if err != nil {
-			ctrlr.sandboxRm(sboxKey)
+			ctrlr.sandboxRm(sboxKey, ep)
 		}
 	}()
 
-	for _, i := range ifaces {
-		iface := &sandbox.Interface{
-			SrcName: i.srcName,
-			DstName: i.dstPrefix,
-			Address: &i.addr,
-			Routes:  i.routes,
-		}
-		if i.addrv6.IP.To16() != nil {
-			iface.AddressIPv6 = &i.addrv6
-		}
-		err = sb.AddInterface(iface)
-		if err != nil {
-			return err
-		}
-	}
-	// Set up non-interface routes.
-	for _, r := range ep.joinInfo.StaticRoutes {
-		err = sb.AddStaticRoute(r)
-		if err != nil {
-			return err
-		}
-	}
-
-	err = sb.SetGateway(joinInfo.gw)
-	if err != nil {
-		return err
-	}
-
-	err = sb.SetGatewayIPv6(joinInfo.gw6)
-	if err != nil {
-		return err
-	}
-
 	container.data.SandboxKey = sb.Key()
 
 	return nil
@@ -372,26 +336,7 @@ func (ep *endpoint) Leave(containerID string, options ...EndpointOption) error {
 
 	err = driver.Leave(n.id, ep.id)
 
-	sb := ctrlr.sandboxGet(container.data.SandboxKey)
-	for _, i := range sb.Interfaces() {
-		// Only remove the interfaces owned by this endpoint from the sandbox.
-		if ep.hasInterface(i.SrcName) {
-			err = sb.RemoveInterface(i)
-			if err != nil {
-				logrus.Debugf("Remove interface failed: %v", err)
-			}
-		}
-	}
-
-	// Remove non-interface routes.
-	for _, r := range ep.joinInfo.StaticRoutes {
-		err = sb.RemoveStaticRoute(r)
-		if err != nil {
-			logrus.Debugf("Remove route failed: %v", err)
-		}
-	}
-
-	ctrlr.sandboxRm(container.data.SandboxKey)
+	ctrlr.sandboxRm(container.data.SandboxKey, ep)
 
 	return err
 }
@@ -648,6 +593,14 @@ func EndpointOptionGeneric(generic map[string]interface{}) EndpointOption {
 	}
 }
 
+// JoinOptionPriority function returns an option setter for priority option to
+// be passed to endpoint Join method.
+func JoinOptionPriority(prio int) EndpointOption {
+	return func(ep *endpoint) {
+		ep.container.config.prio = prio
+	}
+}
+
 // JoinOptionHostname function returns an option setter for hostname option to
 // be passed to endpoint Join method.
 func JoinOptionHostname(name string) EndpointOption {

+ 246 - 0
libnetwork/sandboxdata.go

@@ -0,0 +1,246 @@
+package libnetwork
+
+import (
+	"container/heap"
+	"sync"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/libnetwork/sandbox"
+)
+
+type epHeap []*endpoint
+
+type sandboxData struct {
+	sbox      sandbox.Sandbox
+	refCnt    int
+	endpoints epHeap
+	sync.Mutex
+}
+
+func (eh epHeap) Len() int { return len(eh) }
+
+func (eh epHeap) Less(i, j int) bool {
+	eh[i].Lock()
+	eh[j].Lock()
+	defer eh[j].Unlock()
+	defer eh[i].Unlock()
+
+	if eh[i].container.config.prio == eh[j].container.config.prio {
+		return eh[i].network.Name() < eh[j].network.Name()
+	}
+
+	return eh[i].container.config.prio > eh[j].container.config.prio
+}
+
+func (eh epHeap) Swap(i, j int) { eh[i], eh[j] = eh[j], eh[i] }
+
+func (eh *epHeap) Push(x interface{}) {
+	*eh = append(*eh, x.(*endpoint))
+}
+
+func (eh *epHeap) Pop() interface{} {
+	old := *eh
+	n := len(old)
+	x := old[n-1]
+	*eh = old[0 : n-1]
+	return x
+}
+
+func (s *sandboxData) updateGateway(ep *endpoint) error {
+	sb := s.sandbox()
+	if err := sb.UnsetGateway(); err != nil {
+		return err
+	}
+
+	if err := sb.UnsetGatewayIPv6(); err != nil {
+		return err
+	}
+
+	if ep == nil {
+		return nil
+	}
+
+	ep.Lock()
+	joinInfo := ep.joinInfo
+	ep.Unlock()
+
+	if err := sb.SetGateway(joinInfo.gw); err != nil {
+		return err
+	}
+
+	if err := sb.SetGatewayIPv6(joinInfo.gw6); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *sandboxData) addEndpoint(ep *endpoint) error {
+	ep.Lock()
+	joinInfo := ep.joinInfo
+	ifaces := ep.iFaces
+	ep.Unlock()
+
+	sb := s.sandbox()
+	for _, i := range ifaces {
+		iface := &sandbox.Interface{
+			SrcName: i.srcName,
+			DstName: i.dstPrefix,
+			Address: &i.addr,
+			Routes:  i.routes,
+		}
+		if i.addrv6.IP.To16() != nil {
+			iface.AddressIPv6 = &i.addrv6
+		}
+
+		if err := sb.AddInterface(iface); err != nil {
+			return err
+		}
+	}
+
+	if joinInfo != nil {
+		// Set up non-interface routes.
+		for _, r := range ep.joinInfo.StaticRoutes {
+			if err := sb.AddStaticRoute(r); err != nil {
+				return err
+			}
+		}
+	}
+
+	s.Lock()
+	heap.Push(&s.endpoints, ep)
+	highEp := s.endpoints[0]
+	s.Unlock()
+
+	if ep == highEp {
+		if err := s.updateGateway(ep); err != nil {
+			return err
+		}
+	}
+
+	s.Lock()
+	s.refCnt++
+	s.Unlock()
+
+	return nil
+}
+
+func (s *sandboxData) rmEndpoint(ep *endpoint) int {
+	ep.Lock()
+	joinInfo := ep.joinInfo
+	ep.Unlock()
+
+	sb := s.sandbox()
+	for _, i := range sb.Interfaces() {
+		// Only remove the interfaces owned by this endpoint from the sandbox.
+		if ep.hasInterface(i.SrcName) {
+			if err := sb.RemoveInterface(i); err != nil {
+				logrus.Debugf("Remove interface failed: %v", err)
+			}
+		}
+	}
+
+	// Remove non-interface routes.
+	for _, r := range joinInfo.StaticRoutes {
+		if err := sb.RemoveStaticRoute(r); err != nil {
+			logrus.Debugf("Remove route failed: %v", err)
+		}
+	}
+
+	// We don't check if s.endpoints is empty here because
+	// it should never be empty during a rmEndpoint call and
+	// if it is we will rightfully panic here
+	s.Lock()
+	highEpBefore := s.endpoints[0]
+	var (
+		i int
+		e *endpoint
+	)
+	for i, e = range s.endpoints {
+		if e == ep {
+			break
+		}
+	}
+	heap.Remove(&s.endpoints, i)
+	var highEpAfter *endpoint
+	if len(s.endpoints) > 0 {
+		highEpAfter = s.endpoints[0]
+	}
+
+	s.Unlock()
+
+	if highEpBefore != highEpAfter {
+		s.updateGateway(highEpAfter)
+	}
+
+	s.Lock()
+	s.refCnt--
+	refCnt := s.refCnt
+	s.Unlock()
+
+	if refCnt == 0 {
+		s.sandbox().Destroy()
+	}
+
+	return refCnt
+}
+
+func (s *sandboxData) sandbox() sandbox.Sandbox {
+	s.Lock()
+	defer s.Unlock()
+
+	return s.sbox
+}
+
+func (c *controller) sandboxAdd(key string, create bool, ep *endpoint) (sandbox.Sandbox, error) {
+	c.Lock()
+	sData, ok := c.sandboxes[key]
+	c.Unlock()
+
+	if !ok {
+		sb, err := sandbox.NewSandbox(key, create)
+		if err != nil {
+			return nil, err
+		}
+
+		sData = &sandboxData{
+			sbox:      sb,
+			endpoints: epHeap{},
+		}
+
+		heap.Init(&sData.endpoints)
+		c.Lock()
+		c.sandboxes[key] = sData
+		c.Unlock()
+	}
+
+	if err := sData.addEndpoint(ep); err != nil {
+		return nil, err
+	}
+
+	return sData.sandbox(), nil
+}
+
+func (c *controller) sandboxRm(key string, ep *endpoint) {
+	c.Lock()
+	sData := c.sandboxes[key]
+	c.Unlock()
+
+	if sData.rmEndpoint(ep) == 0 {
+		c.Lock()
+		delete(c.sandboxes, key)
+		c.Unlock()
+	}
+}
+
+func (c *controller) sandboxGet(key string) sandbox.Sandbox {
+	c.Lock()
+	sData, ok := c.sandboxes[key]
+	c.Unlock()
+
+	if !ok {
+		return nil
+	}
+
+	return sData.sandbox()
+}

+ 130 - 0
libnetwork/sandboxdata_test.go

@@ -0,0 +1,130 @@
+package libnetwork
+
+import "testing"
+
+func createEmptyCtrlr() *controller {
+	return &controller{sandboxes: sandboxTable{}}
+}
+
+func createEmptyEndpoint() *endpoint {
+	return &endpoint{
+		container: &containerInfo{},
+		joinInfo:  &endpointJoinInfo{},
+		iFaces:    []*endpointInterface{},
+	}
+}
+
+func TestSandboxAddEmpty(t *testing.T) {
+	ctrlr := createEmptyCtrlr()
+	ep := createEmptyEndpoint()
+
+	if _, err := ctrlr.sandboxAdd("sandbox1", true, ep); err != nil {
+		t.Fatal(err)
+	}
+
+	if ctrlr.sandboxes["sandbox1"].refCnt != 1 {
+		t.Fatalf("Unexpected sandbox ref count. Expected 1, got %d",
+			ctrlr.sandboxes["sandbox1"].refCnt)
+	}
+
+	ctrlr.sandboxRm("sandbox1", ep)
+	if len(ctrlr.sandboxes) != 0 {
+		t.Fatalf("controller sandboxes is not empty. len = %d", len(ctrlr.sandboxes))
+	}
+}
+
+func TestSandboxAddMultiPrio(t *testing.T) {
+	ctrlr := createEmptyCtrlr()
+	ep1 := createEmptyEndpoint()
+	ep2 := createEmptyEndpoint()
+	ep3 := createEmptyEndpoint()
+
+	ep1.container.config.prio = 1
+	ep2.container.config.prio = 2
+	ep3.container.config.prio = 3
+
+	if _, err := ctrlr.sandboxAdd("sandbox1", true, ep1); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := ctrlr.sandboxAdd("sandbox1", true, ep2); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := ctrlr.sandboxAdd("sandbox1", true, ep3); err != nil {
+		t.Fatal(err)
+	}
+
+	if ctrlr.sandboxes["sandbox1"].refCnt != 3 {
+		t.Fatalf("Unexpected sandbox ref count. Expected 3, got %d",
+			ctrlr.sandboxes["sandbox1"].refCnt)
+	}
+
+	if ctrlr.sandboxes["sandbox1"].endpoints[0] != ep3 {
+		t.Fatal("Expected ep3 to be at the top of the heap. But did not find ep3 at the top of the heap")
+	}
+
+	ctrlr.sandboxRm("sandbox1", ep3)
+
+	if ctrlr.sandboxes["sandbox1"].endpoints[0] != ep2 {
+		t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap")
+	}
+
+	ctrlr.sandboxRm("sandbox1", ep2)
+
+	if ctrlr.sandboxes["sandbox1"].endpoints[0] != ep1 {
+		t.Fatal("Expected ep1 to be at the top of the heap after removing ep2. But did not find ep1 at the top of the heap")
+	}
+
+	// Re-add ep3 back
+	if _, err := ctrlr.sandboxAdd("sandbox1", true, ep3); err != nil {
+		t.Fatal(err)
+	}
+
+	if ctrlr.sandboxes["sandbox1"].endpoints[0] != ep3 {
+		t.Fatal("Expected ep3 to be at the top of the heap after adding ep3 back. But did not find ep3 at the top of the heap")
+	}
+
+	ctrlr.sandboxRm("sandbox1", ep3)
+	ctrlr.sandboxRm("sandbox1", ep1)
+	if len(ctrlr.sandboxes) != 0 {
+		t.Fatalf("controller sandboxes is not empty. len = %d", len(ctrlr.sandboxes))
+	}
+}
+
+func TestSandboxAddSamePrio(t *testing.T) {
+	ctrlr := createEmptyCtrlr()
+	ep1 := createEmptyEndpoint()
+	ep2 := createEmptyEndpoint()
+
+	ep1.network = &network{name: "aaa"}
+	ep2.network = &network{name: "bbb"}
+
+	if _, err := ctrlr.sandboxAdd("sandbox1", true, ep1); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := ctrlr.sandboxAdd("sandbox1", true, ep2); err != nil {
+		t.Fatal(err)
+	}
+
+	if ctrlr.sandboxes["sandbox1"].refCnt != 2 {
+		t.Fatalf("Unexpected sandbox ref count. Expected 2, got %d",
+			ctrlr.sandboxes["sandbox1"].refCnt)
+	}
+
+	if ctrlr.sandboxes["sandbox1"].endpoints[0] != ep1 {
+		t.Fatal("Expected ep1 to be at the top of the heap. But did not find ep1 at the top of the heap")
+	}
+
+	ctrlr.sandboxRm("sandbox1", ep1)
+
+	if ctrlr.sandboxes["sandbox1"].endpoints[0] != ep2 {
+		t.Fatal("Expected ep2 to be at the top of the heap after removing ep3. But did not find ep2 at the top of the heap")
+	}
+
+	ctrlr.sandboxRm("sandbox1", ep2)
+	if len(ctrlr.sandboxes) != 0 {
+		t.Fatalf("controller sandboxes is not empty. len = %d", len(ctrlr.sandboxes))
+	}
+}