123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873 |
- package libnetwork
- //go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
- import (
- "encoding/json"
- "fmt"
- "net"
- "os"
- "sort"
- "sync"
- "github.com/Sirupsen/logrus"
- "github.com/docker/docker/pkg/stringid"
- "github.com/docker/go-events"
- "github.com/docker/libnetwork/cluster"
- "github.com/docker/libnetwork/datastore"
- "github.com/docker/libnetwork/discoverapi"
- "github.com/docker/libnetwork/driverapi"
- "github.com/docker/libnetwork/networkdb"
- "github.com/docker/libnetwork/types"
- "github.com/gogo/protobuf/proto"
- )
- const (
- subsysGossip = "networking:gossip"
- subsysIPSec = "networking:ipsec"
- keyringSize = 3
- )
- // ByTime implements sort.Interface for []*types.EncryptionKey based on
- // the LamportTime field.
- type ByTime []*types.EncryptionKey
- func (b ByTime) Len() int { return len(b) }
- func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
- func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
- type agent struct {
- networkDB *networkdb.NetworkDB
- bindAddr string
- advertiseAddr string
- dataPathAddr string
- coreCancelFuncs []func()
- driverCancelFuncs map[string][]func()
- sync.Mutex
- }
- func (a *agent) dataPathAddress() string {
- a.Lock()
- defer a.Unlock()
- if a.dataPathAddr != "" {
- return a.dataPathAddr
- }
- return a.advertiseAddr
- }
- const libnetworkEPTable = "endpoint_table"
- func getBindAddr(ifaceName string) (string, error) {
- iface, err := net.InterfaceByName(ifaceName)
- if err != nil {
- return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
- }
- addrs, err := iface.Addrs()
- if err != nil {
- return "", fmt.Errorf("failed to get interface addresses: %v", err)
- }
- for _, a := range addrs {
- addr, ok := a.(*net.IPNet)
- if !ok {
- continue
- }
- addrIP := addr.IP
- if addrIP.IsLinkLocalUnicast() {
- continue
- }
- return addrIP.String(), nil
- }
- return "", fmt.Errorf("failed to get bind address")
- }
- func resolveAddr(addrOrInterface string) (string, error) {
- // Try and see if this is a valid IP address
- if net.ParseIP(addrOrInterface) != nil {
- return addrOrInterface, nil
- }
- addr, err := net.ResolveIPAddr("ip", addrOrInterface)
- if err != nil {
- // If not a valid IP address, it should be a valid interface
- return getBindAddr(addrOrInterface)
- }
- return addr.String(), nil
- }
- func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error {
- drvEnc := discoverapi.DriverEncryptionUpdate{}
- a := c.getAgent()
- if a == nil {
- logrus.Debug("Skipping key change as agent is nil")
- return nil
- }
- // Find the deleted key. If the deleted key was the primary key,
- // a new primary key should be set before removing if from keyring.
- c.Lock()
- added := []byte{}
- deleted := []byte{}
- j := len(c.keys)
- for i := 0; i < j; {
- same := false
- for _, key := range keys {
- if same = key.LamportTime == c.keys[i].LamportTime; same {
- break
- }
- }
- if !same {
- cKey := c.keys[i]
- if cKey.Subsystem == subsysGossip {
- deleted = cKey.Key
- }
- if cKey.Subsystem == subsysIPSec {
- drvEnc.Prune = cKey.Key
- drvEnc.PruneTag = cKey.LamportTime
- }
- c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
- c.keys[j-1] = nil
- j--
- }
- i++
- }
- c.keys = c.keys[:j]
- // Find the new key and add it to the key ring
- for _, key := range keys {
- same := false
- for _, cKey := range c.keys {
- if same = cKey.LamportTime == key.LamportTime; same {
- break
- }
- }
- if !same {
- c.keys = append(c.keys, key)
- if key.Subsystem == subsysGossip {
- added = key.Key
- }
- if key.Subsystem == subsysIPSec {
- drvEnc.Key = key.Key
- drvEnc.Tag = key.LamportTime
- }
- }
- }
- c.Unlock()
- if len(added) > 0 {
- a.networkDB.SetKey(added)
- }
- key, tag, err := c.getPrimaryKeyTag(subsysGossip)
- if err != nil {
- return err
- }
- a.networkDB.SetPrimaryKey(key)
- key, tag, err = c.getPrimaryKeyTag(subsysIPSec)
- if err != nil {
- return err
- }
- drvEnc.Primary = key
- drvEnc.PrimaryTag = tag
- if len(deleted) > 0 {
- a.networkDB.RemoveKey(deleted)
- }
- c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
- err := driver.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc)
- if err != nil {
- logrus.Warnf("Failed to update datapath keys in driver %s: %v", name, err)
- }
- return false
- })
- return nil
- }
- func (c *controller) agentSetup(clusterProvider cluster.Provider) error {
- agent := c.getAgent()
- // If the agent is already present there is no need to try to initilize it again
- if agent != nil {
- return nil
- }
- bindAddr := clusterProvider.GetLocalAddress()
- advAddr := clusterProvider.GetAdvertiseAddress()
- dataAddr := clusterProvider.GetDataPathAddress()
- remoteList := clusterProvider.GetRemoteAddressList()
- remoteAddrList := make([]string, 0, len(remoteList))
- for _, remote := range remoteList {
- addr, _, _ := net.SplitHostPort(remote)
- remoteAddrList = append(remoteAddrList, addr)
- }
- listen := clusterProvider.GetListenAddress()
- listenAddr, _, _ := net.SplitHostPort(listen)
- logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v",
- listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList)
- if advAddr != "" && agent == nil {
- if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
- logrus.Errorf("error in agentInit: %v", err)
- return err
- }
- c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
- if capability.ConnectivityScope == datastore.GlobalScope {
- c.agentDriverNotify(driver)
- }
- return false
- })
- }
- if len(remoteAddrList) > 0 {
- if err := c.agentJoin(remoteAddrList); err != nil {
- logrus.Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err)
- }
- }
- return nil
- }
- // For a given subsystem getKeys sorts the keys by lamport time and returns
- // slice of keys and lamport time which can used as a unique tag for the keys
- func (c *controller) getKeys(subsys string) ([][]byte, []uint64) {
- c.Lock()
- defer c.Unlock()
- sort.Sort(ByTime(c.keys))
- keys := [][]byte{}
- tags := []uint64{}
- for _, key := range c.keys {
- if key.Subsystem == subsys {
- keys = append(keys, key.Key)
- tags = append(tags, key.LamportTime)
- }
- }
- keys[0], keys[1] = keys[1], keys[0]
- tags[0], tags[1] = tags[1], tags[0]
- return keys, tags
- }
- // getPrimaryKeyTag returns the primary key for a given subsystem from the
- // list of sorted key and the associated tag
- func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
- c.Lock()
- defer c.Unlock()
- sort.Sort(ByTime(c.keys))
- keys := []*types.EncryptionKey{}
- for _, key := range c.keys {
- if key.Subsystem == subsys {
- keys = append(keys, key)
- }
- }
- return keys[1].Key, keys[1].LamportTime, nil
- }
- func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
- bindAddr, err := resolveAddr(bindAddrOrInterface)
- if err != nil {
- return err
- }
- keys, _ := c.getKeys(subsysGossip)
- hostname, _ := os.Hostname()
- nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
- logrus.Info("Gossip cluster hostname ", nodeName)
- nDB, err := networkdb.New(&networkdb.Config{
- BindAddr: listenAddr,
- AdvertiseAddr: advertiseAddr,
- NodeName: nodeName,
- Keys: keys,
- })
- if err != nil {
- return err
- }
- var cancelList []func()
- ch, cancel := nDB.Watch(libnetworkEPTable, "", "")
- cancelList = append(cancelList, cancel)
- nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "", "")
- cancelList = append(cancelList, cancel)
- c.Lock()
- c.agent = &agent{
- networkDB: nDB,
- bindAddr: bindAddr,
- advertiseAddr: advertiseAddr,
- dataPathAddr: dataPathAddr,
- coreCancelFuncs: cancelList,
- driverCancelFuncs: make(map[string][]func()),
- }
- c.Unlock()
- go c.handleTableEvents(ch, c.handleEpTableEvent)
- go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
- drvEnc := discoverapi.DriverEncryptionConfig{}
- keys, tags := c.getKeys(subsysIPSec)
- drvEnc.Keys = keys
- drvEnc.Tags = tags
- c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
- err := driver.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc)
- if err != nil {
- logrus.Warnf("Failed to set datapath keys in driver %s: %v", name, err)
- }
- return false
- })
- c.WalkNetworks(joinCluster)
- return nil
- }
- func (c *controller) agentJoin(remoteAddrList []string) error {
- agent := c.getAgent()
- if agent == nil {
- return nil
- }
- return agent.networkDB.Join(remoteAddrList)
- }
- func (c *controller) agentDriverNotify(d driverapi.Driver) {
- agent := c.getAgent()
- if agent == nil {
- return
- }
- if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
- Address: agent.dataPathAddress(),
- BindAddress: agent.bindAddr,
- Self: true,
- }); err != nil {
- logrus.Warnf("Failed the node discovery in driver: %v", err)
- }
- drvEnc := discoverapi.DriverEncryptionConfig{}
- keys, tags := c.getKeys(subsysIPSec)
- drvEnc.Keys = keys
- drvEnc.Tags = tags
- if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, drvEnc); err != nil {
- logrus.Warnf("Failed to set datapath keys in driver: %v", err)
- }
- }
- func (c *controller) agentClose() {
- // Acquire current agent instance and reset its pointer
- // then run closing functions
- c.Lock()
- agent := c.agent
- c.agent = nil
- c.Unlock()
- if agent == nil {
- return
- }
- var cancelList []func()
- agent.Lock()
- for _, cancelFuncs := range agent.driverCancelFuncs {
- for _, cancel := range cancelFuncs {
- cancelList = append(cancelList, cancel)
- }
- }
- // Add also the cancel functions for the network db
- for _, cancel := range agent.coreCancelFuncs {
- cancelList = append(cancelList, cancel)
- }
- agent.Unlock()
- for _, cancel := range cancelList {
- cancel()
- }
- agent.networkDB.Close()
- }
- // Task has the backend container details
- type Task struct {
- Name string
- EndpointID string
- EndpointIP string
- Info map[string]string
- }
- // ServiceInfo has service specific details along with the list of backend tasks
- type ServiceInfo struct {
- VIP string
- LocalLBIndex int
- Tasks []Task
- Ports []string
- }
- type epRecord struct {
- ep EndpointRecord
- info map[string]string
- lbIndex int
- }
- func (n *network) Services() map[string]ServiceInfo {
- eps := make(map[string]epRecord)
- if !n.isClusterEligible() {
- return nil
- }
- agent := n.getController().getAgent()
- if agent == nil {
- return nil
- }
- // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
- entries := agent.networkDB.GetTableByNetwork(libnetworkEPTable, n.id)
- for eid, value := range entries {
- var epRec EndpointRecord
- nid := n.ID()
- if err := proto.Unmarshal(value.([]byte), &epRec); err != nil {
- logrus.Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nid, err)
- continue
- }
- i := n.getController().getLBIndex(epRec.ServiceID, nid, epRec.IngressPorts)
- eps[eid] = epRecord{
- ep: epRec,
- lbIndex: i,
- }
- }
- // Walk through the driver's tables, have the driver decode the entries
- // and return the tuple {ep ID, value}. value is a string that coveys
- // relevant info about the endpoint.
- d, err := n.driver(true)
- if err != nil {
- logrus.Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, n.ID(), err)
- return nil
- }
- for _, table := range n.driverTables {
- if table.objType != driverapi.EndpointObject {
- continue
- }
- entries := agent.networkDB.GetTableByNetwork(table.name, n.id)
- for key, value := range entries {
- epID, info := d.DecodeTableEntry(table.name, key, value.([]byte))
- if ep, ok := eps[epID]; !ok {
- logrus.Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
- } else {
- ep.info = info
- eps[epID] = ep
- }
- }
- }
- // group the endpoints into a map keyed by the service name
- sinfo := make(map[string]ServiceInfo)
- for ep, epr := range eps {
- var (
- s ServiceInfo
- ok bool
- )
- if s, ok = sinfo[epr.ep.ServiceName]; !ok {
- s = ServiceInfo{
- VIP: epr.ep.VirtualIP,
- LocalLBIndex: epr.lbIndex,
- }
- }
- ports := []string{}
- if s.Ports == nil {
- for _, port := range epr.ep.IngressPorts {
- p := fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)
- ports = append(ports, p)
- }
- s.Ports = ports
- }
- s.Tasks = append(s.Tasks, Task{
- Name: epr.ep.Name,
- EndpointID: ep,
- EndpointIP: epr.ep.EndpointIP,
- Info: epr.info,
- })
- sinfo[epr.ep.ServiceName] = s
- }
- return sinfo
- }
- func (n *network) isClusterEligible() bool {
- if n.scope != datastore.SwarmScope || !n.driverIsMultihost() {
- return false
- }
- return n.getController().getAgent() != nil
- }
- func (n *network) joinCluster() error {
- if !n.isClusterEligible() {
- return nil
- }
- agent := n.getController().getAgent()
- if agent == nil {
- return nil
- }
- return agent.networkDB.JoinNetwork(n.ID())
- }
- func (n *network) leaveCluster() error {
- if !n.isClusterEligible() {
- return nil
- }
- agent := n.getController().getAgent()
- if agent == nil {
- return nil
- }
- return agent.networkDB.LeaveNetwork(n.ID())
- }
- func (ep *endpoint) addDriverInfoToCluster() error {
- n := ep.getNetwork()
- if !n.isClusterEligible() {
- return nil
- }
- if ep.joinInfo == nil {
- return nil
- }
- agent := n.getController().getAgent()
- if agent == nil {
- return nil
- }
- for _, te := range ep.joinInfo.driverTableEntries {
- if err := agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
- return err
- }
- }
- return nil
- }
- func (ep *endpoint) deleteDriverInfoFromCluster() error {
- n := ep.getNetwork()
- if !n.isClusterEligible() {
- return nil
- }
- if ep.joinInfo == nil {
- return nil
- }
- agent := n.getController().getAgent()
- if agent == nil {
- return nil
- }
- for _, te := range ep.joinInfo.driverTableEntries {
- if err := agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
- return err
- }
- }
- return nil
- }
- func (ep *endpoint) addServiceInfoToCluster() error {
- if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
- return nil
- }
- n := ep.getNetwork()
- if !n.isClusterEligible() {
- return nil
- }
- c := n.getController()
- agent := c.getAgent()
- var ingressPorts []*PortConfig
- if ep.svcID != "" {
- // Gossip ingress ports only in ingress network.
- if n.ingress {
- ingressPorts = ep.ingressPorts
- }
- if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
- return err
- }
- }
- name := ep.Name()
- if ep.isAnonymous() {
- name = ep.MyAliases()[0]
- }
- buf, err := proto.Marshal(&EndpointRecord{
- Name: name,
- ServiceName: ep.svcName,
- ServiceID: ep.svcID,
- VirtualIP: ep.virtualIP.String(),
- IngressPorts: ingressPorts,
- Aliases: ep.svcAliases,
- TaskAliases: ep.myAliases,
- EndpointIP: ep.Iface().Address().IP.String(),
- })
- if err != nil {
- return err
- }
- if agent != nil {
- if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
- return err
- }
- }
- return nil
- }
- func (ep *endpoint) deleteServiceInfoFromCluster() error {
- if ep.isAnonymous() && len(ep.myAliases) == 0 {
- return nil
- }
- n := ep.getNetwork()
- if !n.isClusterEligible() {
- return nil
- }
- c := n.getController()
- agent := c.getAgent()
- if ep.svcID != "" && ep.Iface().Address() != nil {
- var ingressPorts []*PortConfig
- if n.ingress {
- ingressPorts = ep.ingressPorts
- }
- if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
- return err
- }
- }
- if agent != nil {
- if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
- return err
- }
- }
- return nil
- }
- func (n *network) addDriverWatches() {
- if !n.isClusterEligible() {
- return
- }
- c := n.getController()
- agent := c.getAgent()
- if agent == nil {
- return
- }
- for _, table := range n.driverTables {
- ch, cancel := agent.networkDB.Watch(table.name, n.ID(), "")
- agent.Lock()
- agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
- agent.Unlock()
- go c.handleTableEvents(ch, n.handleDriverTableEvent)
- d, err := n.driver(false)
- if err != nil {
- logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
- return
- }
- agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
- if nid == n.ID() {
- d.EventNotify(driverapi.Create, nid, table.name, key, value)
- }
- return false
- })
- }
- }
- func (n *network) cancelDriverWatches() {
- if !n.isClusterEligible() {
- return
- }
- agent := n.getController().getAgent()
- if agent == nil {
- return
- }
- agent.Lock()
- cancelFuncs := agent.driverCancelFuncs[n.ID()]
- delete(agent.driverCancelFuncs, n.ID())
- agent.Unlock()
- for _, cancel := range cancelFuncs {
- cancel()
- }
- }
- func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
- for {
- select {
- case ev, ok := <-ch:
- if !ok {
- return
- }
- fn(ev)
- }
- }
- }
- func (n *network) handleDriverTableEvent(ev events.Event) {
- d, err := n.driver(false)
- if err != nil {
- logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
- return
- }
- var (
- etype driverapi.EventType
- tname string
- key string
- value []byte
- )
- switch event := ev.(type) {
- case networkdb.CreateEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Create
- case networkdb.DeleteEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Delete
- case networkdb.UpdateEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Delete
- }
- d.EventNotify(etype, n.ID(), tname, key, value)
- }
- func (c *controller) handleNodeTableEvent(ev events.Event) {
- var (
- value []byte
- isAdd bool
- nodeAddr networkdb.NodeAddr
- )
- switch event := ev.(type) {
- case networkdb.CreateEvent:
- value = event.Value
- isAdd = true
- case networkdb.DeleteEvent:
- value = event.Value
- case networkdb.UpdateEvent:
- logrus.Errorf("Unexpected update node table event = %#v", event)
- }
- err := json.Unmarshal(value, &nodeAddr)
- if err != nil {
- logrus.Errorf("Error unmarshalling node table event %v", err)
- return
- }
- c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
- }
- func (c *controller) handleEpTableEvent(ev events.Event) {
- var (
- nid string
- eid string
- value []byte
- isAdd bool
- epRec EndpointRecord
- )
- switch event := ev.(type) {
- case networkdb.CreateEvent:
- nid = event.NetworkID
- eid = event.Key
- value = event.Value
- isAdd = true
- case networkdb.DeleteEvent:
- nid = event.NetworkID
- eid = event.Key
- value = event.Value
- case networkdb.UpdateEvent:
- logrus.Errorf("Unexpected update service table event = %#v", event)
- }
- nw, err := c.NetworkByID(nid)
- if err != nil {
- logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
- return
- }
- n := nw.(*network)
- err = proto.Unmarshal(value, &epRec)
- if err != nil {
- logrus.Errorf("Failed to unmarshal service table value: %v", err)
- return
- }
- name := epRec.Name
- svcName := epRec.ServiceName
- svcID := epRec.ServiceID
- vip := net.ParseIP(epRec.VirtualIP)
- ip := net.ParseIP(epRec.EndpointIP)
- ingressPorts := epRec.IngressPorts
- aliases := epRec.Aliases
- taskaliases := epRec.TaskAliases
- if name == "" || ip == nil {
- logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
- return
- }
- if isAdd {
- if svcID != "" {
- if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
- logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
- return
- }
- }
- n.addSvcRecords(name, ip, nil, true)
- for _, alias := range taskaliases {
- n.addSvcRecords(alias, ip, nil, true)
- }
- } else {
- if svcID != "" {
- if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
- logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
- return
- }
- }
- n.deleteSvcRecords(name, ip, nil, true)
- for _, alias := range taskaliases {
- n.deleteSvcRecords(alias, ip, nil, true)
- }
- }
- }
|