Преглед изворни кода

Add service support

Add a notion of service in libnetwork so that a group of endpoints
which form a service can be treated as such so that service level
features can be added on top. Initially as part of this PR the support
to assign a name to the said service is added which results in DNS
queries to the service name to return all the IPs of the backing
endpoints so that DNS RR behavior on the service name can be achieved.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan пре 9 година
родитељ
комит
ffdceda255

+ 36 - 10
libnetwork/agent.go

@@ -165,7 +165,12 @@ func (ep *endpoint) addToCluster() error {
 
 
 	c := n.getController()
 	c := n.getController()
 	if !ep.isAnonymous() && ep.Iface().Address() != nil {
 	if !ep.isAnonymous() && ep.Iface().Address() != nil {
-		if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s=%s", ep.Name(), ep.Iface().Address().IP))); err != nil {
+		if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
+			return err
+		}
+
+		if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s,%s,%s,%s", ep.Name(), ep.svcName,
+			ep.svcID, ep.Iface().Address().IP))); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
@@ -187,6 +192,12 @@ func (ep *endpoint) deleteFromCluster() error {
 
 
 	c := n.getController()
 	c := n.getController()
 	if !ep.isAnonymous() {
 	if !ep.isAnonymous() {
+		if ep.Iface().Address() != nil {
+			if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
+				return err
+			}
+		}
+
 		if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
 		if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
 			return err
 			return err
 		}
 		}
@@ -297,38 +308,43 @@ func (n *network) handleDriverTableEvent(ev events.Event) {
 
 
 func (c *controller) handleEpTableEvent(ev events.Event) {
 func (c *controller) handleEpTableEvent(ev events.Event) {
 	var (
 	var (
-		id    string
+		nid   string
+		eid   string
 		value string
 		value string
 		isAdd bool
 		isAdd bool
 	)
 	)
 
 
 	switch event := ev.(type) {
 	switch event := ev.(type) {
 	case networkdb.CreateEvent:
 	case networkdb.CreateEvent:
-		id = event.NetworkID
+		nid = event.NetworkID
+		eid = event.Key
 		value = string(event.Value)
 		value = string(event.Value)
 		isAdd = true
 		isAdd = true
 	case networkdb.DeleteEvent:
 	case networkdb.DeleteEvent:
-		id = event.NetworkID
+		nid = event.NetworkID
+		eid = event.Key
 		value = string(event.Value)
 		value = string(event.Value)
 	case networkdb.UpdateEvent:
 	case networkdb.UpdateEvent:
 		logrus.Errorf("Unexpected update service table event = %#v", event)
 		logrus.Errorf("Unexpected update service table event = %#v", event)
 	}
 	}
 
 
-	nw, err := c.NetworkByID(id)
+	nw, err := c.NetworkByID(nid)
 	if err != nil {
 	if err != nil {
-		logrus.Errorf("Could not find network %s while handling service table event: %v", id, err)
+		logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
 		return
 		return
 	}
 	}
 	n := nw.(*network)
 	n := nw.(*network)
 
 
-	pair := strings.Split(value, "=")
-	if len(pair) < 2 {
+	vals := strings.Split(value, ",")
+	if len(vals) < 4 {
 		logrus.Errorf("Incorrect service table value = %s", value)
 		logrus.Errorf("Incorrect service table value = %s", value)
 		return
 		return
 	}
 	}
 
 
-	name := pair[0]
-	ip := net.ParseIP(pair[1])
+	name := vals[0]
+	svcName := vals[1]
+	svcID := vals[2]
+	ip := net.ParseIP(vals[3])
 
 
 	if name == "" || ip == nil {
 	if name == "" || ip == nil {
 		logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
 		logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
@@ -336,8 +352,18 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
 	}
 	}
 
 
 	if isAdd {
 	if isAdd {
+		if err := c.addServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
+			logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
+			return
+		}
+
 		n.addSvcRecords(name, ip, nil, true)
 		n.addSvcRecords(name, ip, nil, true)
 	} else {
 	} else {
+		if err := c.rmServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
+			logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
+			return
+		}
+
 		n.deleteSvcRecords(name, ip, nil, true)
 		n.deleteSvcRecords(name, ip, nil, true)
 	}
 	}
 }
 }

+ 20 - 18
libnetwork/controller.go

@@ -123,20 +123,21 @@ type SandboxWalker func(sb Sandbox) bool
 type sandboxTable map[string]*sandbox
 type sandboxTable map[string]*sandbox
 
 
 type controller struct {
 type controller struct {
-	id             string
-	drvRegistry    *drvregistry.DrvRegistry
-	sandboxes      sandboxTable
-	cfg            *config.Config
-	stores         []datastore.DataStore
-	discovery      hostdiscovery.HostDiscovery
-	extKeyListener net.Listener
-	watchCh        chan *endpoint
-	unWatchCh      chan *endpoint
-	svcDb          map[string]svcInfo
-	nmap           map[string]*netWatch
-	defOsSbox      osl.Sandbox
-	sboxOnce       sync.Once
-	agent          *agent
+	id              string
+	drvRegistry     *drvregistry.DrvRegistry
+	sandboxes       sandboxTable
+	cfg             *config.Config
+	stores          []datastore.DataStore
+	discovery       hostdiscovery.HostDiscovery
+	extKeyListener  net.Listener
+	watchCh         chan *endpoint
+	unWatchCh       chan *endpoint
+	svcRecords      map[string]svcInfo
+	nmap            map[string]*netWatch
+	serviceBindings map[string]*service
+	defOsSbox       osl.Sandbox
+	sboxOnce        sync.Once
+	agent           *agent
 	sync.Mutex
 	sync.Mutex
 }
 }
 
 
@@ -148,10 +149,11 @@ type initializer struct {
 // New creates a new instance of network controller.
 // New creates a new instance of network controller.
 func New(cfgOptions ...config.Option) (NetworkController, error) {
 func New(cfgOptions ...config.Option) (NetworkController, error) {
 	c := &controller{
 	c := &controller{
-		id:        stringid.GenerateRandomID(),
-		cfg:       config.ParseConfigOptions(cfgOptions...),
-		sandboxes: sandboxTable{},
-		svcDb:     make(map[string]svcInfo),
+		id:              stringid.GenerateRandomID(),
+		cfg:             config.ParseConfigOptions(cfgOptions...),
+		sandboxes:       sandboxTable{},
+		svcRecords:      make(map[string]svcInfo),
+		serviceBindings: make(map[string]*service),
 	}
 	}
 
 
 	if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
 	if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {

+ 30 - 2
libnetwork/endpoint.go

@@ -67,6 +67,8 @@ type endpoint struct {
 	ipamOptions       map[string]string
 	ipamOptions       map[string]string
 	aliases           map[string]string
 	aliases           map[string]string
 	myAliases         []string
 	myAliases         []string
+	svcID             string
+	svcName           string
 	dbIndex           uint64
 	dbIndex           uint64
 	dbExists          bool
 	dbExists          bool
 	sync.Mutex
 	sync.Mutex
@@ -89,6 +91,9 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) {
 	epMap["anonymous"] = ep.anonymous
 	epMap["anonymous"] = ep.anonymous
 	epMap["disableResolution"] = ep.disableResolution
 	epMap["disableResolution"] = ep.disableResolution
 	epMap["myAliases"] = ep.myAliases
 	epMap["myAliases"] = ep.myAliases
+	epMap["svcName"] = ep.svcName
+	epMap["svcID"] = ep.svcID
+
 	return json.Marshal(epMap)
 	return json.Marshal(epMap)
 }
 }
 
 
@@ -172,6 +177,15 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
 	if l, ok := epMap["locator"]; ok {
 	if l, ok := epMap["locator"]; ok {
 		ep.locator = l.(string)
 		ep.locator = l.(string)
 	}
 	}
+
+	if sn, ok := epMap["svcName"]; ok {
+		ep.svcName = sn.(string)
+	}
+
+	if si, ok := epMap["svcID"]; ok {
+		ep.svcID = si.(string)
+	}
+
 	ma, _ := json.Marshal(epMap["myAliases"])
 	ma, _ := json.Marshal(epMap["myAliases"])
 	var myAliases []string
 	var myAliases []string
 	json.Unmarshal(ma, &myAliases)
 	json.Unmarshal(ma, &myAliases)
@@ -196,6 +210,8 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error {
 	dstEp.dbExists = ep.dbExists
 	dstEp.dbExists = ep.dbExists
 	dstEp.anonymous = ep.anonymous
 	dstEp.anonymous = ep.anonymous
 	dstEp.disableResolution = ep.disableResolution
 	dstEp.disableResolution = ep.disableResolution
+	dstEp.svcName = ep.svcName
+	dstEp.svcID = ep.svcID
 
 
 	if ep.iface != nil {
 	if ep.iface != nil {
 		dstEp.iface = &endpointInterface{}
 		dstEp.iface = &endpointInterface{}
@@ -413,7 +429,9 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
 	}()
 	}()
 
 
 	// Watch for service records
 	// Watch for service records
-	n.getController().watchSvcRecord(ep)
+	if !n.getController().cfg.Daemon.IsAgent {
+		n.getController().watchSvcRecord(ep)
+	}
 
 
 	address := ""
 	address := ""
 	if ip := ep.getFirstInterfaceAddress(); ip != nil {
 	if ip := ep.getFirstInterfaceAddress(); ip != nil {
@@ -738,7 +756,9 @@ func (ep *endpoint) Delete(force bool) error {
 	}()
 	}()
 
 
 	// unwatch for service records
 	// unwatch for service records
-	n.getController().unWatchSvcRecord(ep)
+	if !n.getController().cfg.Daemon.IsAgent {
+		n.getController().unWatchSvcRecord(ep)
+	}
 
 
 	if err = ep.deleteEndpoint(force); err != nil && !force {
 	if err = ep.deleteEndpoint(force); err != nil && !force {
 		return err
 		return err
@@ -871,6 +891,14 @@ func CreateOptionAlias(name string, alias string) EndpointOption {
 	}
 	}
 }
 }
 
 
+// CreateOptionService function returns an option setter for setting service binding configuration
+func CreateOptionService(name, id string) EndpointOption {
+	return func(ep *endpoint) {
+		ep.svcName = name
+		ep.svcID = id
+	}
+}
+
 //CreateOptionMyAlias function returns an option setter for setting endpoint's self alias
 //CreateOptionMyAlias function returns an option setter for setting endpoint's self alias
 func CreateOptionMyAlias(alias string) EndpointOption {
 func CreateOptionMyAlias(alias string) EndpointOption {
 	return func(ep *endpoint) {
 	return func(ep *endpoint) {

+ 4 - 4
libnetwork/network.go

@@ -1002,14 +1002,14 @@ func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUp
 	c := n.getController()
 	c := n.getController()
 	c.Lock()
 	c.Lock()
 	defer c.Unlock()
 	defer c.Unlock()
-	sr, ok := c.svcDb[n.ID()]
+	sr, ok := c.svcRecords[n.ID()]
 	if !ok {
 	if !ok {
 		sr = svcInfo{
 		sr = svcInfo{
 			svcMap:     make(map[string][]net.IP),
 			svcMap:     make(map[string][]net.IP),
 			svcIPv6Map: make(map[string][]net.IP),
 			svcIPv6Map: make(map[string][]net.IP),
 			ipMap:      make(map[string]string),
 			ipMap:      make(map[string]string),
 		}
 		}
-		c.svcDb[n.ID()] = sr
+		c.svcRecords[n.ID()] = sr
 	}
 	}
 
 
 	if ipMapUpdate {
 	if ipMapUpdate {
@@ -1029,7 +1029,7 @@ func (n *network) deleteSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMa
 	c := n.getController()
 	c := n.getController()
 	c.Lock()
 	c.Lock()
 	defer c.Unlock()
 	defer c.Unlock()
-	sr, ok := c.svcDb[n.ID()]
+	sr, ok := c.svcRecords[n.ID()]
 	if !ok {
 	if !ok {
 		return
 		return
 	}
 	}
@@ -1054,7 +1054,7 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record {
 	defer n.Unlock()
 	defer n.Unlock()
 
 
 	var recs []etchosts.Record
 	var recs []etchosts.Record
-	sr, _ := n.ctrlr.svcDb[n.id]
+	sr, _ := n.ctrlr.svcRecords[n.id]
 
 
 	for h, ip := range sr.svcMap {
 	for h, ip := range sr.svcMap {
 		if ep != nil && strings.Split(h, ".")[0] == ep.Name() {
 		if ep != nil && strings.Split(h, ".")[0] == ep.Name() {

+ 2 - 2
libnetwork/sandbox.go

@@ -405,7 +405,7 @@ func (sb *sandbox) ResolveIP(ip string) string {
 	for _, ep := range sb.getConnectedEndpoints() {
 	for _, ep := range sb.getConnectedEndpoints() {
 		n := ep.getNetwork()
 		n := ep.getNetwork()
 
 
-		sr, ok := n.getController().svcDb[n.ID()]
+		sr, ok := n.getController().svcRecords[n.ID()]
 		if !ok {
 		if !ok {
 			continue
 			continue
 		}
 		}
@@ -512,7 +512,7 @@ func (sb *sandbox) resolveName(req string, networkName string, epList []*endpoin
 			ep.Unlock()
 			ep.Unlock()
 		}
 		}
 
 
-		sr, ok := n.getController().svcDb[n.ID()]
+		sr, ok := n.getController().svcRecords[n.ID()]
 		if !ok {
 		if !ok {
 			continue
 			continue
 		}
 		}

+ 80 - 0
libnetwork/service.go

@@ -0,0 +1,80 @@
+package libnetwork
+
+import "net"
+
+type service struct {
+	name     string
+	id       string
+	backEnds map[string]map[string]net.IP
+}
+
+func newService(name string, id string) *service {
+	return &service{
+		name:     name,
+		id:       id,
+		backEnds: make(map[string]map[string]net.IP),
+	}
+}
+
+func (c *controller) addServiceBinding(name, sid, nid, eid string, ip net.IP) error {
+	var s *service
+
+	n, err := c.NetworkByID(nid)
+	if err != nil {
+		return err
+	}
+
+	c.Lock()
+	s, ok := c.serviceBindings[sid]
+	if !ok {
+		s = newService(name, sid)
+	}
+
+	netBackEnds, ok := s.backEnds[nid]
+	if !ok {
+		netBackEnds = make(map[string]net.IP)
+		s.backEnds[nid] = netBackEnds
+	}
+
+	netBackEnds[eid] = ip
+	c.serviceBindings[sid] = s
+	c.Unlock()
+
+	n.(*network).addSvcRecords(name, ip, nil, false)
+	return nil
+}
+
+func (c *controller) rmServiceBinding(name, sid, nid, eid string, ip net.IP) error {
+	n, err := c.NetworkByID(nid)
+	if err != nil {
+		return err
+	}
+
+	c.Lock()
+	s, ok := c.serviceBindings[sid]
+	if !ok {
+		c.Unlock()
+		return nil
+	}
+
+	netBackEnds, ok := s.backEnds[nid]
+	if !ok {
+		c.Unlock()
+		return nil
+	}
+
+	delete(netBackEnds, eid)
+
+	if len(netBackEnds) == 0 {
+		delete(s.backEnds, nid)
+	}
+
+	if len(s.backEnds) == 0 {
+		delete(c.serviceBindings, sid)
+	}
+	c.Unlock()
+
+	n.(*network).deleteSvcRecords(name, ip, nil, false)
+
+	return err
+}

+ 1 - 1
libnetwork/store.go

@@ -420,7 +420,7 @@ func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoi
 
 
 			// This is the last container going away for the network. Destroy
 			// This is the last container going away for the network. Destroy
 			// this network's svc db entry
 			// this network's svc db entry
-			delete(c.svcDb, ep.getNetwork().ID())
+			delete(c.svcRecords, ep.getNetwork().ID())
 
 
 			delete(nmap, ep.getNetwork().ID())
 			delete(nmap, ep.getNetwork().ID())
 		}
 		}

+ 1 - 1
libnetwork/test/integration/dnet/helpers.bash

@@ -199,7 +199,7 @@ EOF
     cat ${tomlfile}
     cat ${tomlfile}
     docker run \
     docker run \
 	   -d \
 	   -d \
-	   --hostname=${name} \
+	   --hostname=$(echo ${name} | sed s/_/-/g) \
 	   --name=${name}  \
 	   --name=${name}  \
 	   --privileged \
 	   --privileged \
 	   -p ${hport}:${cport} \
 	   -p ${hport}:${cport} \