package libnetwork //go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto import ( "context" "encoding/json" "fmt" "net" "sort" "sync" "github.com/containerd/log" "github.com/docker/docker/libnetwork/cluster" "github.com/docker/docker/libnetwork/discoverapi" "github.com/docker/docker/libnetwork/driverapi" "github.com/docker/docker/libnetwork/networkdb" "github.com/docker/docker/libnetwork/scope" "github.com/docker/docker/libnetwork/types" "github.com/docker/go-events" "github.com/gogo/protobuf/proto" ) const ( subsysGossip = "networking:gossip" subsysIPSec = "networking:ipsec" keyringSize = 3 ) // ByTime implements sort.Interface for []*types.EncryptionKey based on // the LamportTime field. type ByTime []*types.EncryptionKey func (b ByTime) Len() int { return len(b) } func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime } type nwAgent struct { networkDB *networkdb.NetworkDB bindAddr net.IP advertiseAddr string dataPathAddr string coreCancelFuncs []func() driverCancelFuncs map[string][]func() mu sync.Mutex } func (a *nwAgent) dataPathAddress() string { a.mu.Lock() defer a.mu.Unlock() if a.dataPathAddr != "" { return a.dataPathAddr } return a.advertiseAddr } const libnetworkEPTable = "endpoint_table" func getBindAddr(ifaceName string) (net.IP, error) { iface, err := net.InterfaceByName(ifaceName) if err != nil { return nil, fmt.Errorf("failed to find interface %s: %v", ifaceName, err) } addrs, err := iface.Addrs() if err != nil { return nil, fmt.Errorf("failed to get interface addresses: %v", err) } for _, a := range addrs { addr, ok := a.(*net.IPNet) if !ok { continue } addrIP := addr.IP if addrIP.IsLinkLocalUnicast() { continue } return addrIP, nil } return nil, fmt.Errorf("failed to get bind address") } // resolveAddr resolves the given address, which can be one of, and // parsed in the following order or priority: // // - a well-formed IP-address // - a hostname // - an interface-name func resolveAddr(addrOrInterface string) (net.IP, error) { // Try and see if this is a valid IP address if ip := net.ParseIP(addrOrInterface); ip != nil { return ip, nil } // If not a valid IP address, it could be a hostname. addr, err := net.ResolveIPAddr("ip", addrOrInterface) if err != nil { // If hostname lookup failed, try to look for an interface with the given name. return getBindAddr(addrOrInterface) } return addr.IP, nil } func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error { drvEnc := discoverapi.DriverEncryptionUpdate{} agent := c.getAgent() if agent == nil { log.G(context.TODO()).Debug("Skipping key change as agent is nil") return nil } // Find the deleted key. If the deleted key was the primary key, // a new primary key should be set before removing if from keyring. c.mu.Lock() added := []byte{} deleted := []byte{} j := len(c.keys) for i := 0; i < j; { same := false for _, key := range keys { if same = key.LamportTime == c.keys[i].LamportTime; same { break } } if !same { cKey := c.keys[i] if cKey.Subsystem == subsysGossip { deleted = cKey.Key } if cKey.Subsystem == subsysIPSec { drvEnc.Prune = cKey.Key drvEnc.PruneTag = cKey.LamportTime } c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i] c.keys[j-1] = nil j-- } i++ } c.keys = c.keys[:j] // Find the new key and add it to the key ring for _, key := range keys { same := false for _, cKey := range c.keys { if same = cKey.LamportTime == key.LamportTime; same { break } } if !same { c.keys = append(c.keys, key) if key.Subsystem == subsysGossip { added = key.Key } if key.Subsystem == subsysIPSec { drvEnc.Key = key.Key drvEnc.Tag = key.LamportTime } } } c.mu.Unlock() if len(added) > 0 { agent.networkDB.SetKey(added) } key, _, err := c.getPrimaryKeyTag(subsysGossip) if err != nil { return err } agent.networkDB.SetPrimaryKey(key) key, tag, err := c.getPrimaryKeyTag(subsysIPSec) if err != nil { return err } drvEnc.Primary = key drvEnc.PrimaryTag = tag if len(deleted) > 0 { agent.networkDB.RemoveKey(deleted) } c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { dr, ok := driver.(discoverapi.Discover) if !ok { return false } if err := dr.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc); err != nil { log.G(context.TODO()).Warnf("Failed to update datapath keys in driver %s: %v", name, err) // Attempt to reconfigure keys in case of a update failure // which can arise due to a mismatch of keys // if worker nodes get temporarily disconnected log.G(context.TODO()).Warnf("Reconfiguring datapath keys for %s", name) drvCfgEnc := discoverapi.DriverEncryptionConfig{} drvCfgEnc.Keys, drvCfgEnc.Tags = c.getKeys(subsysIPSec) err = dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc) if err != nil { log.G(context.TODO()).Warnf("Failed to reset datapath keys in driver %s: %v", name, err) } } return false }) return nil } func (c *Controller) agentSetup(clusterProvider cluster.Provider) error { agent := c.getAgent() if agent != nil { // agent is already present, so there is no need initialize it again. return nil } bindAddr := clusterProvider.GetLocalAddress() advAddr := clusterProvider.GetAdvertiseAddress() dataAddr := clusterProvider.GetDataPathAddress() remoteList := clusterProvider.GetRemoteAddressList() remoteAddrList := make([]string, 0, len(remoteList)) for _, remote := range remoteList { addr, _, _ := net.SplitHostPort(remote) remoteAddrList = append(remoteAddrList, addr) } listen := clusterProvider.GetListenAddress() listenAddr, _, _ := net.SplitHostPort(listen) log.G(context.TODO()).WithFields(log.Fields{ "listen-addr": listenAddr, "local-addr": bindAddr, "advertise-addr": advAddr, "data-path-addr": dataAddr, "remote-addr-list": remoteAddrList, "network-control-plane-mtu": c.Config().NetworkControlPlaneMTU, }).Info("Initializing Libnetwork Agent") if advAddr != "" { if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil { log.G(context.TODO()).WithError(err).Errorf("Error in agentInit") return err } c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { if capability.ConnectivityScope == scope.Global { if d, ok := driver.(discoverapi.Discover); ok { c.agentDriverNotify(d) } } return false }) } if len(remoteAddrList) > 0 { if err := c.agentJoin(remoteAddrList); err != nil { log.G(context.TODO()).WithError(err).Error("Error in joining gossip cluster: join will be retried in background") } } return nil } // For a given subsystem getKeys sorts the keys by lamport time and returns // slice of keys and lamport time which can used as a unique tag for the keys func (c *Controller) getKeys(subsystem string) (keys [][]byte, tags []uint64) { c.mu.Lock() defer c.mu.Unlock() sort.Sort(ByTime(c.keys)) keys = make([][]byte, 0, len(c.keys)) tags = make([]uint64, 0, len(c.keys)) for _, key := range c.keys { if key.Subsystem == subsystem { keys = append(keys, key.Key) tags = append(tags, key.LamportTime) } } if len(keys) > 1 { // TODO(thaJeztah): why are we swapping order here? This code was added in https://github.com/moby/libnetwork/commit/e83d68b7d1fd9c479120914024242238f791b4dc keys[0], keys[1] = keys[1], keys[0] tags[0], tags[1] = tags[1], tags[0] } return keys, tags } // getPrimaryKeyTag returns the primary key for a given subsystem from the // list of sorted key and the associated tag func (c *Controller) getPrimaryKeyTag(subsystem string) (key []byte, lamportTime uint64, _ error) { c.mu.Lock() defer c.mu.Unlock() sort.Sort(ByTime(c.keys)) keys := make([]*types.EncryptionKey, 0, len(c.keys)) for _, k := range c.keys { if k.Subsystem == subsystem { keys = append(keys, k) } } if len(keys) < 2 { return nil, 0, fmt.Errorf("no primary key found for %s subsystem: %d keys found on controller, expected at least 2", subsystem, len(keys)) } return keys[1].Key, keys[1].LamportTime, nil } func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error { bindAddr, err := resolveAddr(bindAddrOrInterface) if err != nil { return err } keys, _ := c.getKeys(subsysGossip) netDBConf := networkdb.DefaultConfig() netDBConf.BindAddr = listenAddr netDBConf.AdvertiseAddr = advertiseAddr netDBConf.Keys = keys if c.Config().NetworkControlPlaneMTU != 0 { // Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr. // To be on the safe side let's cut 100 bytes netDBConf.PacketBufferSize = (c.Config().NetworkControlPlaneMTU - 100) log.G(context.TODO()).Debugf("Control plane MTU: %d will initialize NetworkDB with: %d", c.Config().NetworkControlPlaneMTU, netDBConf.PacketBufferSize) } nDB, err := networkdb.New(netDBConf) if err != nil { return err } // Register the diagnostic handlers nDB.RegisterDiagnosticHandlers(c.DiagnosticServer) var cancelList []func() ch, cancel := nDB.Watch(libnetworkEPTable, "") cancelList = append(cancelList, cancel) nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "") cancelList = append(cancelList, cancel) c.mu.Lock() c.agent = &nwAgent{ networkDB: nDB, bindAddr: bindAddr, advertiseAddr: advertiseAddr, dataPathAddr: dataPathAddr, coreCancelFuncs: cancelList, driverCancelFuncs: make(map[string][]func()), } c.mu.Unlock() go c.handleTableEvents(ch, c.handleEpTableEvent) go c.handleTableEvents(nodeCh, c.handleNodeTableEvent) keys, tags := c.getKeys(subsysIPSec) c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { if dr, ok := driver.(discoverapi.Discover); ok { if err := dr.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{ Keys: keys, Tags: tags, }); err != nil { log.G(context.TODO()).Warnf("Failed to set datapath keys in driver %s: %v", name, err) } } return false }) c.WalkNetworks(joinCluster) return nil } func (c *Controller) agentJoin(remoteAddrList []string) error { agent := c.getAgent() if agent == nil { return nil } return agent.networkDB.Join(remoteAddrList) } func (c *Controller) agentDriverNotify(d discoverapi.Discover) { agent := c.getAgent() if agent == nil { return } if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{ Address: agent.dataPathAddress(), BindAddress: agent.bindAddr.String(), Self: true, }); err != nil { log.G(context.TODO()).Warnf("Failed the node discovery in driver: %v", err) } keys, tags := c.getKeys(subsysIPSec) if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{ Keys: keys, Tags: tags, }); err != nil { log.G(context.TODO()).Warnf("Failed to set datapath keys in driver: %v", err) } } func (c *Controller) agentClose() { // Acquire current agent instance and reset its pointer // then run closing functions c.mu.Lock() agent := c.agent c.agent = nil c.mu.Unlock() // when the agent is closed the cluster provider should be cleaned up c.SetClusterProvider(nil) if agent == nil { return } var cancelList []func() agent.mu.Lock() for _, cancelFuncs := range agent.driverCancelFuncs { cancelList = append(cancelList, cancelFuncs...) } // Add also the cancel functions for the network db cancelList = append(cancelList, agent.coreCancelFuncs...) agent.mu.Unlock() for _, cancel := range cancelList { cancel() } agent.networkDB.Close() } // Task has the backend container details type Task struct { Name string EndpointID string EndpointIP string Info map[string]string } // ServiceInfo has service specific details along with the list of backend tasks type ServiceInfo struct { VIP string LocalLBIndex int Tasks []Task Ports []string } type epRecord struct { ep EndpointRecord info map[string]string lbIndex int } // Services returns a map of services keyed by the service name with the details // of all the tasks that belong to the service. Applicable only in swarm mode. func (n *Network) Services() map[string]ServiceInfo { agent, ok := n.clusterAgent() if !ok { return nil } nwID := n.ID() d, err := n.driver(true) if err != nil { log.G(context.TODO()).Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, nwID, err) return nil } // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info eps := make(map[string]epRecord) c := n.getController() for eid, value := range agent.networkDB.GetTableByNetwork(libnetworkEPTable, nwID) { var epRec EndpointRecord if err := proto.Unmarshal(value.Value, &epRec); err != nil { log.G(context.TODO()).Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nwID, err) continue } eps[eid] = epRecord{ ep: epRec, lbIndex: c.getLBIndex(epRec.ServiceID, nwID, epRec.IngressPorts), } } // Walk through the driver's tables, have the driver decode the entries // and return the tuple {ep ID, value}. value is a string that coveys // relevant info about the endpoint. for _, table := range n.driverTables { if table.objType != driverapi.EndpointObject { continue } for key, value := range agent.networkDB.GetTableByNetwork(table.name, nwID) { epID, info := d.DecodeTableEntry(table.name, key, value.Value) if ep, ok := eps[epID]; !ok { log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID) } else { ep.info = info eps[epID] = ep } } } // group the endpoints into a map keyed by the service name sinfo := make(map[string]ServiceInfo) for ep, epr := range eps { s, ok := sinfo[epr.ep.ServiceName] if !ok { s = ServiceInfo{ VIP: epr.ep.VirtualIP, LocalLBIndex: epr.lbIndex, } } if s.Ports == nil { ports := make([]string, 0, len(epr.ep.IngressPorts)) for _, port := range epr.ep.IngressPorts { ports = append(ports, fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort)) } s.Ports = ports } s.Tasks = append(s.Tasks, Task{ Name: epr.ep.Name, EndpointID: ep, EndpointIP: epr.ep.EndpointIP, Info: epr.info, }) sinfo[epr.ep.ServiceName] = s } return sinfo } // clusterAgent returns the cluster agent if the network is a swarm-scoped, // multi-host network. func (n *Network) clusterAgent() (agent *nwAgent, ok bool) { if n.scope != scope.Swarm || !n.driverIsMultihost() { return nil, false } a := n.getController().getAgent() return a, a != nil } func (n *Network) joinCluster() error { agent, ok := n.clusterAgent() if !ok { return nil } return agent.networkDB.JoinNetwork(n.ID()) } func (n *Network) leaveCluster() error { agent, ok := n.clusterAgent() if !ok { return nil } return agent.networkDB.LeaveNetwork(n.ID()) } func (ep *Endpoint) addDriverInfoToCluster() error { if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 { return nil } n := ep.getNetwork() agent, ok := n.clusterAgent() if !ok { return nil } nwID := n.ID() for _, te := range ep.joinInfo.driverTableEntries { if err := agent.networkDB.CreateEntry(te.tableName, nwID, te.key, te.value); err != nil { return err } } return nil } func (ep *Endpoint) deleteDriverInfoFromCluster() error { if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 { return nil } n := ep.getNetwork() agent, ok := n.clusterAgent() if !ok { return nil } nwID := n.ID() for _, te := range ep.joinInfo.driverTableEntries { if err := agent.networkDB.DeleteEntry(te.tableName, nwID, te.key); err != nil { return err } } return nil } func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error { if len(ep.dnsNames) == 0 || ep.Iface() == nil || ep.Iface().Address() == nil { return nil } n := ep.getNetwork() agent, ok := n.clusterAgent() if !ok { return nil } sb.service.Lock() defer sb.service.Unlock() log.G(context.TODO()).Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID()) // Check that the endpoint is still present on the sandbox before adding it to the service discovery. // This is to handle a race between the EnableService and the sbLeave // It is possible that the EnableService starts, fetches the list of the endpoints and // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox // The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster. // This check under the Service lock of the sandbox ensure the correct behavior. // If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit // but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed. // In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is // removed from the list, in this situation the delete will bail out not finding any data to cleanup // and the add will bail out not finding the endpoint on the sandbox. if err := sb.GetEndpoint(ep.ID()); err == nil { log.G(context.TODO()).Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID()) return nil } dnsNames := ep.getDNSNames() primaryDNSName, dnsAliases := dnsNames[0], dnsNames[1:] var ingressPorts []*PortConfig if ep.svcID != "" { // This is a task part of a service // Gossip ingress ports only in ingress network. if n.ingress { ingressPorts = ep.ingressPorts } if err := n.getController().addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), primaryDNSName, ep.virtualIP, ingressPorts, ep.svcAliases, dnsAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil { return err } } else { // This is a container simply attached to an attachable network if err := n.getController().addContainerNameResolution(n.ID(), ep.ID(), primaryDNSName, dnsAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil { return err } } buf, err := proto.Marshal(&EndpointRecord{ Name: primaryDNSName, ServiceName: ep.svcName, ServiceID: ep.svcID, VirtualIP: ep.virtualIP.String(), IngressPorts: ingressPorts, Aliases: ep.svcAliases, TaskAliases: dnsAliases, EndpointIP: ep.Iface().Address().IP.String(), ServiceDisabled: false, }) if err != nil { return err } if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil { log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err) return err } log.G(context.TODO()).Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID()) return nil } func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, method string) error { if len(ep.dnsNames) == 0 { return nil } n := ep.getNetwork() agent, ok := n.clusterAgent() if !ok { return nil } sb.service.Lock() defer sb.service.Unlock() log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID()) // Avoid a race w/ with a container that aborts preemptively. This would // get caught in disableServceInNetworkDB, but we check here to make the // nature of the condition more clear. // See comment in addServiceInfoToCluster() if err := sb.GetEndpoint(ep.ID()); err == nil { log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID()) return nil } dnsNames := ep.getDNSNames() primaryDNSName, dnsAliases := dnsNames[0], dnsNames[1:] // First update the networkDB then locally if fullRemove { if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil { log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err) } } else { disableServiceInNetworkDB(agent, n, ep) } if ep.Iface() != nil && ep.Iface().Address() != nil { if ep.svcID != "" { // This is a task part of a service var ingressPorts []*PortConfig if n.ingress { ingressPorts = ep.ingressPorts } if err := n.getController().rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), primaryDNSName, ep.virtualIP, ingressPorts, ep.svcAliases, dnsAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil { return err } } else { // This is a container simply attached to an attachable network if err := n.getController().delContainerNameResolution(n.ID(), ep.ID(), primaryDNSName, dnsAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil { return err } } } log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID()) return nil } func disableServiceInNetworkDB(a *nwAgent, n *Network, ep *Endpoint) { var epRec EndpointRecord log.G(context.TODO()).Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID()) // Update existing record to indicate that the service is disabled inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID()) if err != nil { log.G(context.TODO()).Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err) return } // Should never fail if err := proto.Unmarshal(inBuf, &epRec); err != nil { log.G(context.TODO()).Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err) return } epRec.ServiceDisabled = true // Should never fail outBuf, err := proto.Marshal(&epRec) if err != nil { log.G(context.TODO()).Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err) return } // Send update to the whole cluster if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil { log.G(context.TODO()).Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err) } } func (n *Network) addDriverWatches() { if len(n.driverTables) == 0 { return } agent, ok := n.clusterAgent() if !ok { return } c := n.getController() for _, table := range n.driverTables { ch, cancel := agent.networkDB.Watch(table.name, n.ID()) agent.mu.Lock() agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel) agent.mu.Unlock() go c.handleTableEvents(ch, n.handleDriverTableEvent) d, err := n.driver(false) if err != nil { log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err) return } err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool { // skip the entries that are mark for deletion, this is safe because this function is // called at initialization time so there is no state to delete if nid == n.ID() && !deleted { d.EventNotify(driverapi.Create, nid, table.name, key, value) } return false }) if err != nil { log.G(context.TODO()).WithError(err).Warn("Error while walking networkdb") } } } func (n *Network) cancelDriverWatches() { agent, ok := n.clusterAgent() if !ok { return } agent.mu.Lock() cancelFuncs := agent.driverCancelFuncs[n.ID()] delete(agent.driverCancelFuncs, n.ID()) agent.mu.Unlock() for _, cancel := range cancelFuncs { cancel() } } func (c *Controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) { for { select { case ev := <-ch.C: fn(ev) case <-ch.Done(): return } } } func (n *Network) handleDriverTableEvent(ev events.Event) { d, err := n.driver(false) if err != nil { log.G(context.TODO()).Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err) return } var ( etype driverapi.EventType tname string key string value []byte ) switch event := ev.(type) { case networkdb.CreateEvent: tname = event.Table key = event.Key value = event.Value etype = driverapi.Create case networkdb.DeleteEvent: tname = event.Table key = event.Key value = event.Value etype = driverapi.Delete case networkdb.UpdateEvent: tname = event.Table key = event.Key value = event.Value etype = driverapi.Delete } d.EventNotify(etype, n.ID(), tname, key, value) } func (c *Controller) handleNodeTableEvent(ev events.Event) { var ( value []byte isAdd bool nodeAddr networkdb.NodeAddr ) switch event := ev.(type) { case networkdb.CreateEvent: value = event.Value isAdd = true case networkdb.DeleteEvent: value = event.Value case networkdb.UpdateEvent: log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event) } err := json.Unmarshal(value, &nodeAddr) if err != nil { log.G(context.TODO()).Errorf("Error unmarshalling node table event %v", err) return } c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd) } func (c *Controller) handleEpTableEvent(ev events.Event) { var ( nid string eid string value []byte epRec EndpointRecord ) switch event := ev.(type) { case networkdb.CreateEvent: nid = event.NetworkID eid = event.Key value = event.Value case networkdb.DeleteEvent: nid = event.NetworkID eid = event.Key value = event.Value case networkdb.UpdateEvent: nid = event.NetworkID eid = event.Key value = event.Value default: log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event) return } err := proto.Unmarshal(value, &epRec) if err != nil { log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err) return } containerName := epRec.Name svcName := epRec.ServiceName svcID := epRec.ServiceID vip := net.ParseIP(epRec.VirtualIP) ip := net.ParseIP(epRec.EndpointIP) ingressPorts := epRec.IngressPorts serviceAliases := epRec.Aliases taskAliases := epRec.TaskAliases if containerName == "" || ip == nil { log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", value) return } switch ev.(type) { case networkdb.CreateEvent: log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec) if svcID != "" { // This is a remote task part of a service if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil { log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err) return } } else { // This is a remote container simply attached to an attachable network if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil { log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err) } } case networkdb.DeleteEvent: log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec) if svcID != "" { // This is a remote task part of a service if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil { log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err) return } } else { // This is a remote container simply attached to an attachable network if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil { log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err) } } case networkdb.UpdateEvent: log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec) // We currently should only get these to inform us that an endpoint // is disabled. Report if otherwise. if svcID == "" || !epRec.ServiceDisabled { log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec) return } // This is a remote task that is part of a service that is now disabled if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil { log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err) return } } }