Procházet zdrojové kódy

Merge pull request #33634 from mavenugo/sdh

Vendoring libnetwork f4a15a0890383619ad797b3bd2481cc6f46a978d
Kenfe-Mickaël Laventure před 8 roky
rodič
revize
c23d76b29b

+ 1 - 1
vendor.conf

@@ -26,7 +26,7 @@ github.com/imdario/mergo 0.2.1
 golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0
 
 #get libnetwork packages
-github.com/docker/libnetwork eb57059e91bc54c9da23c5a633b75b3faf910a68
+github.com/docker/libnetwork f4a15a0890383619ad797b3bd2481cc6f46a978d
 github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

+ 84 - 41
vendor/github.com/docker/libnetwork/agent.go

@@ -583,7 +583,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
 	return nil
 }
 
-func (ep *endpoint) addServiceInfoToCluster() error {
+func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
 	if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
 		return nil
 	}
@@ -593,24 +593,49 @@ func (ep *endpoint) addServiceInfoToCluster() error {
 		return nil
 	}
 
+	sb.Service.Lock()
+	defer sb.Service.Unlock()
+	logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
+
+	// Check that the endpoint is still present on the sandbox before adding it to the service discovery.
+	// This handles a race between EnableService and sbLeave:
+	// EnableService may start and fetch the list of endpoints, and by the time
+	// addServiceInfoToCluster is called the endpoint may have been removed from the sandbox.
+	// The risk is that deleteServiceInfoFromCluster runs before addServiceInfoToCluster.
+	// Performing this check under the Service lock of the sandbox ensures the correct behavior.
+	// If addServiceInfoToCluster arrives first, it may or may not find the endpoint and will proceed or exit
+	// accordingly; in either case deleteServiceInfoFromCluster follows and does the cleanup if needed.
+	// If deleteServiceInfoFromCluster arrives first, it runs after the endpoint has been removed from the
+	// list: the delete bails out because it finds no data to clean up,
+	// and the add bails out because it does not find the endpoint on the sandbox.
+	if e := sb.getEndpoint(ep.ID()); e == nil {
+		logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
+		return nil
+	}
+
 	c := n.getController()
 	agent := c.getAgent()
 
+	name := ep.Name()
+	if ep.isAnonymous() {
+		name = ep.MyAliases()[0]
+	}
+
 	var ingressPorts []*PortConfig
 	if ep.svcID != "" {
+		// This is a task part of a service
 		// Gossip ingress ports only in ingress network.
 		if n.ingress {
 			ingressPorts = ep.ingressPorts
 		}
-
-		if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
+		if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
+			return err
+		}
+	} else {
+		// This is a container simply attached to an attachable network
+		if err := c.addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
 			return err
 		}
-	}
-
-	name := ep.Name()
-	if ep.isAnonymous() {
-		name = ep.MyAliases()[0]
 	}
 
 	buf, err := proto.Marshal(&EndpointRecord{
@@ -634,10 +659,12 @@ func (ep *endpoint) addServiceInfoToCluster() error {
 		}
 	}
 
+	logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
+
 	return nil
 }
 
-func (ep *endpoint) deleteServiceInfoFromCluster() error {
+func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
 	if ep.isAnonymous() && len(ep.myAliases) == 0 {
 		return nil
 	}
@@ -647,17 +674,33 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
 		return nil
 	}
 
+	sb.Service.Lock()
+	defer sb.Service.Unlock()
+	logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
+
 	c := n.getController()
 	agent := c.getAgent()
 
-	if ep.svcID != "" && ep.Iface().Address() != nil {
-		var ingressPorts []*PortConfig
-		if n.ingress {
-			ingressPorts = ep.ingressPorts
-		}
+	name := ep.Name()
+	if ep.isAnonymous() {
+		name = ep.MyAliases()[0]
+	}
 
-		if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
-			return err
+	if ep.Iface().Address() != nil {
+		if ep.svcID != "" {
+			// This is a task part of a service
+			var ingressPorts []*PortConfig
+			if n.ingress {
+				ingressPorts = ep.ingressPorts
+			}
+			if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
+				return err
+			}
+		} else {
+			// This is a container simply attached to an attachable network
+			if err := c.delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
+				return err
+			}
 		}
 	}
 
@@ -667,6 +710,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
 		}
 	}
 
+	logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
+
 	return nil
 }
 
@@ -814,58 +859,56 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
 		value = event.Value
 	case networkdb.UpdateEvent:
 		logrus.Errorf("Unexpected update service table event = %#v", event)
-	}
-
-	nw, err := c.NetworkByID(nid)
-	if err != nil {
-		logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
 		return
 	}
-	n := nw.(*network)
 
-	err = proto.Unmarshal(value, &epRec)
+	err := proto.Unmarshal(value, &epRec)
 	if err != nil {
 		logrus.Errorf("Failed to unmarshal service table value: %v", err)
 		return
 	}
 
-	name := epRec.Name
+	containerName := epRec.Name
 	svcName := epRec.ServiceName
 	svcID := epRec.ServiceID
 	vip := net.ParseIP(epRec.VirtualIP)
 	ip := net.ParseIP(epRec.EndpointIP)
 	ingressPorts := epRec.IngressPorts
-	aliases := epRec.Aliases
-	taskaliases := epRec.TaskAliases
+	serviceAliases := epRec.Aliases
+	taskAliases := epRec.TaskAliases
 
-	if name == "" || ip == nil {
+	if containerName == "" || ip == nil {
 		logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
 		return
 	}
 
 	if isAdd {
+		logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec)
 		if svcID != "" {
-			if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
-				logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
+			// This is a remote task part of a service
+			if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
+				logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
 				return
 			}
-		}
-
-		n.addSvcRecords(name, ip, nil, true)
-		for _, alias := range taskaliases {
-			n.addSvcRecords(alias, ip, nil, true)
+		} else {
+			// This is a remote container simply attached to an attachable network
+			if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
+				logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
+			}
 		}
 	} else {
+		logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec)
 		if svcID != "" {
-			if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
-				logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
+			// This is a remote task part of a service
+			if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
+				logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
 				return
 			}
-		}
-
-		n.deleteSvcRecords(name, ip, nil, true)
-		for _, alias := range taskaliases {
-			n.deleteSvcRecords(alias, ip, nil, true)
+		} else {
+			// This is a remote container simply attached to an attachable network
+			if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
+				logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
+			}
 		}
 	}
 }

+ 1 - 1
vendor/github.com/docker/libnetwork/agent.proto

@@ -14,7 +14,7 @@ option (gogoproto.goproto_stringer_all) = false;
 // EndpointRecord specifies all the endpoint specific information that
 // needs to gossiped to nodes participating in the network.
 message EndpointRecord {
-	// Name of the endpoint
+	// Name of the container
 	string name = 1;
 
 	// Service name of the service to which this endpoint belongs.

+ 123 - 0
vendor/github.com/docker/libnetwork/common/setmatrix.go

@@ -0,0 +1,123 @@
+package common
+
+import (
+	"sync"
+
+	mapset "github.com/deckarep/golang-set"
+)
+
+// SetMatrix is a map of Sets
+type SetMatrix interface {
+	// Get returns the members of the set for a specific key as a slice.
+	Get(key string) ([]interface{}, bool)
+	// Contains is used to verify if an element is in a set for a specific key
+	// the first return value is true if the element is in the set
+	// the second return value is true if there is a set for the key
+	Contains(key string, value interface{}) (bool, bool)
+	// Insert inserts the value into the set associated with the key
+	// returns true if the value was not already present, false otherwise
+	// returns also the number of elements in the set for the key
+	Insert(key string, value interface{}) (bool, int)
+	// Remove removes the value from the set associated with the key
+	// returns true if the value was deleted, false otherwise
+	// returns also the number of elements left in the set for the key
+	Remove(key string, value interface{}) (bool, int)
+	// Cardinality returns the number of elements in the set of a specific key
+	// returns false if the key is not in the map
+	Cardinality(key string) (int, bool)
+	// String returns the string version of the set, empty otherwise
+	// returns false if the key is not in the map
+	String(key string) (string, bool)
+}
+
+type setMatrix struct {
+	matrix map[string]mapset.Set
+
+	sync.Mutex
+}
+
+// NewSetMatrix creates a new set matrix object
+func NewSetMatrix() SetMatrix {
+	s := &setMatrix{}
+	s.init()
+	return s
+}
+
+func (s *setMatrix) init() {
+	s.matrix = make(map[string]mapset.Set)
+}
+
+func (s *setMatrix) Get(key string) ([]interface{}, bool) {
+	s.Lock()
+	defer s.Unlock()
+	set, ok := s.matrix[key]
+	if !ok {
+		return nil, ok
+	}
+	return set.ToSlice(), ok
+}
+
+func (s *setMatrix) Contains(key string, value interface{}) (bool, bool) {
+	s.Lock()
+	defer s.Unlock()
+	set, ok := s.matrix[key]
+	if !ok {
+		return false, ok
+	}
+	return set.Contains(value), ok
+}
+
+func (s *setMatrix) Insert(key string, value interface{}) (bool, int) {
+	s.Lock()
+	defer s.Unlock()
+	set, ok := s.matrix[key]
+	if !ok {
+		s.matrix[key] = mapset.NewSet()
+		s.matrix[key].Add(value)
+		return true, 1
+	}
+
+	return set.Add(value), set.Cardinality()
+}
+
+func (s *setMatrix) Remove(key string, value interface{}) (bool, int) {
+	s.Lock()
+	defer s.Unlock()
+	set, ok := s.matrix[key]
+	if !ok {
+		return false, 0
+	}
+
+	var removed bool
+	if set.Contains(value) {
+		set.Remove(value)
+		removed = true
+		// If the set is empty remove it from the matrix
+		if set.Cardinality() == 0 {
+			delete(s.matrix, key)
+		}
+	}
+
+	return removed, set.Cardinality()
+}
+
+func (s *setMatrix) Cardinality(key string) (int, bool) {
+	s.Lock()
+	defer s.Unlock()
+	set, ok := s.matrix[key]
+	if !ok {
+		return 0, ok
+	}
+
+	return set.Cardinality(), ok
+}
+
+func (s *setMatrix) String(key string) (string, bool) {
+	s.Lock()
+	defer s.Unlock()
+	set, ok := s.matrix[key]
+	if !ok {
+		return "", ok
+	}
+	return set.String(), ok
+}

+ 11 - 5
vendor/github.com/docker/libnetwork/endpoint.go

@@ -597,8 +597,14 @@ func (ep *endpoint) rename(name string) error {
 
 	c := n.getController()
 
+	sb, ok := ep.getSandbox()
+	if !ok {
+		logrus.Warnf("rename for %s aborted, sandbox %s is not anymore present", ep.ID(), ep.sandboxID)
+		return nil
+	}
+
 	if c.isAgent() {
-		if err = ep.deleteServiceInfoFromCluster(); err != nil {
+		if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil {
 			return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
 		}
 	} else {
@@ -617,15 +623,15 @@ func (ep *endpoint) rename(name string) error {
 	ep.anonymous = false
 
 	if c.isAgent() {
-		if err = ep.addServiceInfoToCluster(); err != nil {
+		if err = ep.addServiceInfoToCluster(sb); err != nil {
 			return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err)
 		}
 		defer func() {
 			if err != nil {
-				ep.deleteServiceInfoFromCluster()
+				ep.deleteServiceInfoFromCluster(sb, "rename")
 				ep.name = oldName
 				ep.anonymous = oldAnonymous
-				ep.addServiceInfoToCluster()
+				ep.addServiceInfoToCluster(sb)
 			}
 		}()
 	} else {
@@ -746,7 +752,7 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
 		return err
 	}
 
-	if e := ep.deleteServiceInfoFromCluster(); e != nil {
+	if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil {
 		logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e)
 	}
 

+ 53 - 26
vendor/github.com/docker/libnetwork/network.go

@@ -10,6 +10,7 @@ import (
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/pkg/stringid"
+	"github.com/docker/libnetwork/common"
 	"github.com/docker/libnetwork/config"
 	"github.com/docker/libnetwork/datastore"
 	"github.com/docker/libnetwork/driverapi"
@@ -97,7 +98,7 @@ type ipInfo struct {
 type svcInfo struct {
 	svcMap     map[string][]net.IP
 	svcIPv6Map map[string][]net.IP
-	ipMap      map[string]*ipInfo
+	ipMap      common.SetMatrix
 	service    map[string][]servicePorts
 }
 
@@ -990,6 +991,12 @@ func (n *network) delete(force bool) error {
 
 	c.cleanupServiceBindings(n.ID())
 
+	// The network had been left, the service discovery can be cleaned up
+	c.Lock()
+	logrus.Debugf("network %s delete, clean svcRecords", n.id)
+	delete(c.svcRecords, n.id)
+	c.Unlock()
+
 removeFromStore:
 	// deleteFromStore performs an atomic delete operation and the
 	// network.epCnt will help prevent any possible
@@ -1227,36 +1234,34 @@ func (n *network) updateSvcRecord(ep *endpoint, localEps []*endpoint, isAdd bool
 			// breaks some apps
 			if ep.isAnonymous() {
 				if len(myAliases) > 0 {
-					n.addSvcRecords(myAliases[0], iface.Address().IP, ipv6, true)
+					n.addSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord")
 				}
 			} else {
-				n.addSvcRecords(epName, iface.Address().IP, ipv6, true)
+				n.addSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord")
 			}
 			for _, alias := range myAliases {
-				n.addSvcRecords(alias, iface.Address().IP, ipv6, false)
+				n.addSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord")
 			}
 		} else {
 			if ep.isAnonymous() {
 				if len(myAliases) > 0 {
-					n.deleteSvcRecords(myAliases[0], iface.Address().IP, ipv6, true)
+					n.deleteSvcRecords(ep.ID(), myAliases[0], iface.Address().IP, ipv6, true, "updateSvcRecord")
 				}
 			} else {
-				n.deleteSvcRecords(epName, iface.Address().IP, ipv6, true)
+				n.deleteSvcRecords(ep.ID(), epName, iface.Address().IP, ipv6, true, "updateSvcRecord")
 			}
 			for _, alias := range myAliases {
-				n.deleteSvcRecords(alias, iface.Address().IP, ipv6, false)
+				n.deleteSvcRecords(ep.ID(), alias, iface.Address().IP, ipv6, false, "updateSvcRecord")
 			}
 		}
 	}
 }
 
-func addIPToName(ipMap map[string]*ipInfo, name string, ip net.IP) {
+func addIPToName(ipMap common.SetMatrix, name string, ip net.IP) {
 	reverseIP := netutils.ReverseIP(ip.String())
-	if _, ok := ipMap[reverseIP]; !ok {
-		ipMap[reverseIP] = &ipInfo{
-			name: name,
-		}
-	}
+	ipMap.Insert(reverseIP, ipInfo{
+		name: name,
+	})
 }
 
 func addNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
@@ -1284,24 +1289,25 @@ func delNameToIP(svcMap map[string][]net.IP, name string, epIP net.IP) {
 	}
 }
 
-func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
+func (n *network) addSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
 	// Do not add service names for ingress network as this is a
 	// routing only network
 	if n.ingress {
 		return
 	}
 
-	logrus.Debugf("(%s).addSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate)
+	logrus.Debugf("%s (%s).addSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
 
 	c := n.getController()
 	c.Lock()
 	defer c.Unlock()
+
 	sr, ok := c.svcRecords[n.ID()]
 	if !ok {
 		sr = svcInfo{
 			svcMap:     make(map[string][]net.IP),
 			svcIPv6Map: make(map[string][]net.IP),
-			ipMap:      make(map[string]*ipInfo),
+			ipMap:      common.NewSetMatrix(),
 		}
 		c.svcRecords[n.ID()] = sr
 	}
@@ -1319,28 +1325,33 @@ func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUp
 	}
 }
 
-func (n *network) deleteSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool) {
+func (n *network) deleteSvcRecords(eID, name string, epIP net.IP, epIPv6 net.IP, ipMapUpdate bool, method string) {
 	// Do not delete service names from ingress network as this is a
 	// routing only network
 	if n.ingress {
 		return
 	}
 
-	logrus.Debugf("(%s).deleteSvcRecords(%s, %s, %s, %t)", n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate)
+	logrus.Debugf("%s (%s).deleteSvcRecords(%s, %s, %s, %t) %s", eID, n.ID()[0:7], name, epIP, epIPv6, ipMapUpdate, method)
 
 	c := n.getController()
 	c.Lock()
 	defer c.Unlock()
+
 	sr, ok := c.svcRecords[n.ID()]
 	if !ok {
 		return
 	}
 
 	if ipMapUpdate {
-		delete(sr.ipMap, netutils.ReverseIP(epIP.String()))
+		sr.ipMap.Remove(netutils.ReverseIP(epIP.String()), ipInfo{
+			name: name,
+		})
 
 		if epIPv6 != nil {
-			delete(sr.ipMap, netutils.ReverseIP(epIPv6.String()))
+			sr.ipMap.Remove(netutils.ReverseIP(epIPv6.String()), ipInfo{
+				name: name,
+			})
 		}
 	}
 
@@ -1868,9 +1879,11 @@ func (n *network) HandleQueryResp(name string, ip net.IP) {
 	}
 
 	ipStr := netutils.ReverseIP(ip.String())
-
-	if ipInfo, ok := sr.ipMap[ipStr]; ok {
-		ipInfo.extResolver = true
+	// If an entry with extResolver == true is already in the set, the Contains check below fails,
+	// but that only means the entry has already been updated before
+	if ok, _ := sr.ipMap.Contains(ipStr, ipInfo{name: name}); ok {
+		sr.ipMap.Remove(ipStr, ipInfo{name: name})
+		sr.ipMap.Insert(ipStr, ipInfo{name: name, extResolver: true})
 	}
 }
 
@@ -1886,13 +1899,27 @@ func (n *network) ResolveIP(ip string) string {
 
 	nwName := n.Name()
 
-	ipInfo, ok := sr.ipMap[ip]
+	elemSet, ok := sr.ipMap.Get(ip)
+	if !ok || len(elemSet) == 0 {
+		return ""
+	}
+	// NOTE it is possible to have more than one element in the Set, this will happen
+	// because of interleaving of different events from different sources (local container create vs
+	// network db notifications)
+	// In such cases the resolution will be based on the first element of the set, and can vary
+	// during the system stabilization
+	elem, ok := elemSet[0].(ipInfo)
+	if !ok {
+		setStr, b := sr.ipMap.String(ip)
+		logrus.Errorf("expected set of ipInfo type for key %s set:%t %s", ip, b, setStr)
+		return ""
+	}
 
-	if !ok || ipInfo.extResolver {
+	if elem.extResolver {
 		return ""
 	}
 
-	return ipInfo.name + "." + nwName
+	return elem.name + "." + nwName
 }
 
 func (n *network) ResolveService(name string) ([]*net.SRV, []net.IP) {

+ 0 - 3
vendor/github.com/docker/libnetwork/networkdb/networkdb.go

@@ -285,7 +285,6 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
 	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
 	nDB.Unlock()
 
-	nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value))
 	return nil
 }
 
@@ -313,7 +312,6 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
 	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
 	nDB.Unlock()
 
-	nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value))
 	return nil
 }
 
@@ -359,7 +357,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
 	nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
 	nDB.Unlock()
 
-	nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value))
 	return nil
 }
 

+ 9 - 7
vendor/github.com/docker/libnetwork/sandbox.go

@@ -86,6 +86,9 @@ type sandbox struct {
 	ingress            bool
 	ndotsSet           bool
 	sync.Mutex
+	// This mutex is used to serialize service-related operations for an endpoint.
+	// The lock is here because the endpoint is saved into the store, so it is not unique.
+	Service sync.Mutex
 }
 
 // These are the container configs used to customize container /etc/hosts file.
@@ -668,26 +671,25 @@ func (sb *sandbox) SetKey(basePath string) error {
 }
 
 func (sb *sandbox) EnableService() error {
+	logrus.Debugf("EnableService %s START", sb.containerID)
 	for _, ep := range sb.getConnectedEndpoints() {
 		if ep.enableService(true) {
-			if err := ep.addServiceInfoToCluster(); err != nil {
+			if err := ep.addServiceInfoToCluster(sb); err != nil {
 				ep.enableService(false)
 				return fmt.Errorf("could not update state for endpoint %s into cluster: %v", ep.Name(), err)
 			}
 		}
 	}
+	logrus.Debugf("EnableService %s DONE", sb.containerID)
 	return nil
 }
 
 func (sb *sandbox) DisableService() error {
+	logrus.Debugf("DisableService %s START", sb.containerID)
 	for _, ep := range sb.getConnectedEndpoints() {
-		if ep.enableService(false) {
-			if err := ep.deleteServiceInfoFromCluster(); err != nil {
-				ep.enableService(true)
-				return fmt.Errorf("could not delete state for endpoint %s from cluster: %v", ep.Name(), err)
-			}
-		}
+		ep.enableService(false)
 	}
+	logrus.Debugf("DisableService %s DONE", sb.containerID)
 	return nil
 }
 

+ 35 - 1
vendor/github.com/docker/libnetwork/service.go

@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"net"
 	"sync"
+
+	"github.com/docker/libnetwork/common"
 )
 
 var (
@@ -48,17 +50,49 @@ type service struct {
 	// Service aliases
 	aliases []string
 
+	// This map tracks, for each IP address, the set of endpoint IDs
+	// associated with it. At stable state exactly 1 endpoint ID is expected,
+	// but during transitions and service changes it is possible to
+	// temporarily have more than 1
+	ipToEndpoint common.SetMatrix
+
+	deleted bool
+
 	sync.Mutex
 }
 
+// assignIPToEndpoint inserts the mapping between the IP and the endpoint identifier
+// returns true if the mapping was not present, false otherwise
+// returns also the number of endpoints associated to the IP
+func (s *service) assignIPToEndpoint(ip, eID string) (bool, int) {
+	return s.ipToEndpoint.Insert(ip, eID)
+}
+
+// removeIPToEndpoint removes the mapping between the IP and the endpoint identifier
+// returns true if the mapping was deleted, false otherwise
+// returns also the number of endpoints associated to the IP
+func (s *service) removeIPToEndpoint(ip, eID string) (bool, int) {
+	return s.ipToEndpoint.Remove(ip, eID)
+}
+
+func (s *service) printIPToEndpoint(ip string) (string, bool) {
+	return s.ipToEndpoint.String(ip)
+}
+
 type loadBalancer struct {
 	vip    net.IP
 	fwMark uint32
 
 	// Map of backend IPs backing this loadbalancer on this
 	// network. It is keyed with endpoint ID.
-	backEnds map[string]net.IP
+	backEnds map[string]loadBalancerBackend
 
 	// Back pointer to service to which the loadbalancer belongs.
 	service *service
 }
+
+type loadBalancerBackend struct {
+	ip            net.IP
+	containerName string
+	taskAliases   []string
+}

+ 192 - 73
vendor/github.com/docker/libnetwork/service_common.go

@@ -6,15 +6,126 @@ import (
 	"net"
 
 	"github.com/Sirupsen/logrus"
+	"github.com/docker/libnetwork/common"
 )
 
-func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
+func (c *controller) addEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, addService bool, method string) error {
+	n, err := c.NetworkByID(nID)
+	if err != nil {
+		return err
+	}
+
+	logrus.Debugf("addEndpointNameResolution %s %s add_service:%t", eID, svcName, addService)
+
+	// Add container resolution mappings
+	c.addContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
+
+	// Add endpoint IP to special "tasks.svc_name" so that the applications have access to DNS RR.
+	n.(*network).addSvcRecords(eID, "tasks."+svcName, ip, nil, false, method)
+	for _, alias := range serviceAliases {
+		n.(*network).addSvcRecords(eID, "tasks."+alias, ip, nil, false, method)
+	}
+
+	// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
+	if len(vip) == 0 {
+		n.(*network).addSvcRecords(eID, svcName, ip, nil, false, method)
+		for _, alias := range serviceAliases {
+			n.(*network).addSvcRecords(eID, alias, ip, nil, false, method)
+		}
+	}
+
+	if addService && len(vip) != 0 {
+		n.(*network).addSvcRecords(eID, svcName, vip, nil, false, method)
+		for _, alias := range serviceAliases {
+			n.(*network).addSvcRecords(eID, alias, vip, nil, false, method)
+		}
+	}
+
+	return nil
+}
+
+func (c *controller) addContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
+	n, err := c.NetworkByID(nID)
+	if err != nil {
+		return err
+	}
+	logrus.Debugf("addContainerNameResolution %s %s", eID, containerName)
+
+	// Add resolution for container name
+	n.(*network).addSvcRecords(eID, containerName, ip, nil, true, method)
+
+	// Add resolution for taskaliases
+	for _, alias := range taskAliases {
+		n.(*network).addSvcRecords(eID, alias, ip, nil, true, method)
+	}
+
+	return nil
+}
+
+func (c *controller) deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName string, vip net.IP, serviceAliases, taskAliases []string, ip net.IP, rmService, multipleEntries bool, method string) error {
+	n, err := c.NetworkByID(nID)
+	if err != nil {
+		return err
+	}
+
+	logrus.Debugf("deleteEndpointNameResolution %s %s rm_service:%t suppress:%t", eID, svcName, rmService, multipleEntries)
+
+	// Delete container resolution mappings
+	c.delContainerNameResolution(nID, eID, containerName, taskAliases, ip, method)
+
+	// Delete the special "tasks.svc_name" backend record.
+	if !multipleEntries {
+		n.(*network).deleteSvcRecords(eID, "tasks."+svcName, ip, nil, false, method)
+		for _, alias := range serviceAliases {
+			n.(*network).deleteSvcRecords(eID, "tasks."+alias, ip, nil, false, method)
+		}
+	}
+
+	// If we are doing DNS RR delete the endpoint IP from DNS record right away.
+	if !multipleEntries && len(vip) == 0 {
+		n.(*network).deleteSvcRecords(eID, svcName, ip, nil, false, method)
+		for _, alias := range serviceAliases {
+			n.(*network).deleteSvcRecords(eID, alias, ip, nil, false, method)
+		}
+	}
+
+	// Remove the DNS record for VIP only if we are removing the service
+	if rmService && len(vip) != 0 && !multipleEntries {
+		n.(*network).deleteSvcRecords(eID, svcName, vip, nil, false, method)
+		for _, alias := range serviceAliases {
+			n.(*network).deleteSvcRecords(eID, alias, vip, nil, false, method)
+		}
+	}
+
+	return nil
+}
+
+func (c *controller) delContainerNameResolution(nID, eID, containerName string, taskAliases []string, ip net.IP, method string) error {
+	n, err := c.NetworkByID(nID)
+	if err != nil {
+		return err
+	}
+	logrus.Debugf("delContainerNameResolution %s %s", eID, containerName)
+
+	// Delete resolution for container name
+	n.(*network).deleteSvcRecords(eID, containerName, ip, nil, true, method)
+
+	// Delete resolution for taskaliases
+	for _, alias := range taskAliases {
+		n.(*network).deleteSvcRecords(eID, alias, ip, nil, true, method)
+	}
+
+	return nil
+}
+
+func newService(name string, id string, ingressPorts []*PortConfig, serviceAliases []string) *service {
 	return &service{
 		name:          name,
 		id:            id,
 		ingressPorts:  ingressPorts,
 		loadBalancers: make(map[string]*loadBalancer),
-		aliases:       aliases,
+		aliases:       serviceAliases,
+		ipToEndpoint:  common.NewSetMatrix(),
 	}
 }
 
@@ -50,21 +161,26 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
 
 	for _, s := range services {
 		s.Lock()
+		// Skip the serviceBindings that got deleted
+		if s.deleted {
+			s.Unlock()
+			continue
+		}
 		for nid, lb := range s.loadBalancers {
 			if cleanupNID != "" && nid != cleanupNID {
 				continue
 			}
 
-			for eid, ip := range lb.backEnds {
+			for eid, be := range lb.backEnds {
 				service := s
 				loadBalancer := lb
 				networkID := nid
 				epID := eid
-				epIP := ip
+				epIP := be.ip
 
 				cleanupFuncs = append(cleanupFuncs, func() {
-					if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip,
-						service.ingressPorts, service.aliases, epIP); err != nil {
+					if err := c.rmServiceBinding(service.name, service.id, networkID, epID, be.containerName, loadBalancer.vip,
+						service.ingressPorts, service.aliases, be.taskAliases, epIP, "cleanupServiceBindings"); err != nil {
 						logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
 							service.id, networkID, epID, err)
 					}
@@ -80,67 +196,72 @@ func (c *controller) cleanupServiceBindings(cleanupNID string) {
 
 }
 
-func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
-	n, err := c.NetworkByID(nid)
+func (c *controller) addServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases, taskAliases []string, ip net.IP, method string) error {
+	var addService bool
+
+	n, err := c.NetworkByID(nID)
 	if err != nil {
 		return err
 	}
 
 	skey := serviceKey{
-		id:    sid,
+		id:    svcID,
 		ports: portConfigs(ingressPorts).String(),
 	}
 
-	c.Lock()
-	s, ok := c.serviceBindings[skey]
-	if !ok {
-		// Create a new service if we are seeing this service
-		// for the first time.
-		s = newService(name, sid, ingressPorts, aliases)
-		c.serviceBindings[skey] = s
-	}
-	c.Unlock()
-
-	// Add endpoint IP to special "tasks.svc_name" so that the
-	// applications have access to DNS RR.
-	n.(*network).addSvcRecords("tasks."+name, ip, nil, false)
-	for _, alias := range aliases {
-		n.(*network).addSvcRecords("tasks."+alias, ip, nil, false)
-	}
-
-	// Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
-	svcIP := vip
-	if len(svcIP) == 0 {
-		svcIP = ip
-	}
-	n.(*network).addSvcRecords(name, svcIP, nil, false)
-	for _, alias := range aliases {
-		n.(*network).addSvcRecords(alias, svcIP, nil, false)
+	var s *service
+	for {
+		c.Lock()
+		var ok bool
+		s, ok = c.serviceBindings[skey]
+		if !ok {
+			// Create a new service if we are seeing this service
+			// for the first time.
+			s = newService(svcName, svcID, ingressPorts, serviceAliases)
+			c.serviceBindings[skey] = s
+		}
+		c.Unlock()
+		s.Lock()
+		if !s.deleted {
+			// ok the object is good to be used
+			break
+		}
+		s.Unlock()
 	}
+	logrus.Debugf("addServiceBinding from %s START for %s %s", method, svcName, eID)
 
-	s.Lock()
 	defer s.Unlock()
 
-	lb, ok := s.loadBalancers[nid]
+	lb, ok := s.loadBalancers[nID]
 	if !ok {
 		// Create a new load balancer if we are seeing this
 		// network attachment on the service for the first
 		// time.
+		fwMarkCtrMu.Lock()
+
 		lb = &loadBalancer{
 			vip:      vip,
 			fwMark:   fwMarkCtr,
-			backEnds: make(map[string]net.IP),
+			backEnds: make(map[string]loadBalancerBackend),
 			service:  s,
 		}
 
-		fwMarkCtrMu.Lock()
 		fwMarkCtr++
 		fwMarkCtrMu.Unlock()
 
-		s.loadBalancers[nid] = lb
+		s.loadBalancers[nID] = lb
+		addService = true
 	}
 
-	lb.backEnds[eid] = ip
+	lb.backEnds[eID] = loadBalancerBackend{ip: ip,
+		containerName: containerName,
+		taskAliases:   taskAliases}
+
+	ok, entries := s.assignIPToEndpoint(ip.String(), eID)
+	if !ok || entries > 1 {
+		setStr, b := s.printIPToEndpoint(ip.String())
+		logrus.Warnf("addServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
+	}
 
 	// Add loadbalancer service and backend in all sandboxes in
 	// the network only if vip is valid.
@@ -148,89 +269,87 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
 		n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts)
 	}
 
+	// Add the appropriate name resolutions
+	c.addEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, addService, "addServiceBinding")
+
+	logrus.Debugf("addServiceBinding from %s END for %s %s", method, svcName, eID)
+
 	return nil
 }
 
-func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
+func (c *controller) rmServiceBinding(svcName, svcID, nID, eID, containerName string, vip net.IP, ingressPorts []*PortConfig, serviceAliases []string, taskAliases []string, ip net.IP, method string) error {
+
 	var rmService bool
 
-	n, err := c.NetworkByID(nid)
+	n, err := c.NetworkByID(nID)
 	if err != nil {
 		return err
 	}
 
 	skey := serviceKey{
-		id:    sid,
+		id:    svcID,
 		ports: portConfigs(ingressPorts).String(),
 	}
 
 	c.Lock()
 	s, ok := c.serviceBindings[skey]
 	c.Unlock()
+	logrus.Debugf("rmServiceBinding from %s START for %s %s", method, svcName, eID)
 	if !ok {
+		logrus.Warnf("rmServiceBinding %s %s %s aborted c.serviceBindings[skey] !ok", method, svcName, eID)
 		return nil
 	}
 
 	s.Lock()
-	lb, ok := s.loadBalancers[nid]
+	defer s.Unlock()
+	lb, ok := s.loadBalancers[nID]
 	if !ok {
-		s.Unlock()
+		logrus.Warnf("rmServiceBinding %s %s %s aborted s.loadBalancers[nid] !ok", method, svcName, eID)
 		return nil
 	}
 
-	_, ok = lb.backEnds[eid]
+	_, ok = lb.backEnds[eID]
 	if !ok {
-		s.Unlock()
+		logrus.Warnf("rmServiceBinding %s %s %s aborted lb.backEnds[eid] !ok", method, svcName, eID)
 		return nil
 	}
 
-	delete(lb.backEnds, eid)
+	delete(lb.backEnds, eID)
 	if len(lb.backEnds) == 0 {
 		// All the backends for this service have been
 		// removed. Time to remove the load balancer and also
 		// remove the service entry in IPVS.
 		rmService = true
 
-		delete(s.loadBalancers, nid)
+		delete(s.loadBalancers, nID)
 	}
 
 	if len(s.loadBalancers) == 0 {
 		// All loadbalancers for the service removed. Time to
 		// remove the service itself.
 		c.Lock()
+
+		// Mark the object as deleted so that the add won't use it wrongly
+		s.deleted = true
 		delete(c.serviceBindings, skey)
 		c.Unlock()
 	}
 
+	ok, entries := s.removeIPToEndpoint(ip.String(), eID)
+	if !ok || entries > 0 {
+		setStr, b := s.printIPToEndpoint(ip.String())
+		logrus.Warnf("rmServiceBinding %s possible trainsient state ok:%t entries:%d set:%t %s", eID, ok, entries, b, setStr)
+	}
+
 	// Remove loadbalancer service(if needed) and backend in all
 	// sandboxes in the network only if the vip is valid.
-	if len(vip) != 0 {
+	if len(vip) != 0 && entries == 0 {
 		n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService)
 	}
-	s.Unlock()
 
-	// Delete the special "tasks.svc_name" backend record.
-	n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false)
-	for _, alias := range aliases {
-		n.(*network).deleteSvcRecords("tasks."+alias, ip, nil, false)
-	}
-
-	// If we are doing DNS RR add the endpoint IP to DNS record
-	// right away.
-	if len(vip) == 0 {
-		n.(*network).deleteSvcRecords(name, ip, nil, false)
-		for _, alias := range aliases {
-			n.(*network).deleteSvcRecords(alias, ip, nil, false)
-		}
-	}
-
-	// Remove the DNS record for VIP only if we are removing the service
-	if rmService && len(vip) != 0 {
-		n.(*network).deleteSvcRecords(name, vip, nil, false)
-		for _, alias := range aliases {
-			n.(*network).deleteSvcRecords(alias, vip, nil, false)
-		}
-	}
+	// Delete the name resolutions
+	c.deleteEndpointNameResolution(svcName, svcID, nID, eID, containerName, vip, serviceAliases, taskAliases, ip, rmService, entries > 0, "rmServiceBinding")
 
+	logrus.Debugf("rmServiceBinding from %s END for %s %s", method, svcName, eID)
 	return nil
 }

+ 7 - 2
vendor/github.com/docker/libnetwork/service_linux.go

@@ -44,6 +44,11 @@ func (n *network) connectedLoadbalancers() []*loadBalancer {
 	var lbs []*loadBalancer
 	for _, s := range serviceBindings {
 		s.Lock()
+		// Skip the serviceBindings that got deleted
+		if s.deleted {
+			s.Unlock()
+			continue
+		}
 		if lb, ok := s.loadBalancers[n.ID()]; ok {
 			lbs = append(lbs, lb)
 		}
@@ -97,8 +102,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
 		}
 
 		lb.service.Lock()
-		for _, ip := range lb.backEnds {
-			sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
+		for _, l := range lb.backEnds {
+			sb.addLBBackend(l.ip, lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, gwIP, n.ingress)
 		}
 		lb.service.Unlock()
 	}