12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980 |
- package libnetwork
- //go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto
- import (
- "context"
- "encoding/json"
- "fmt"
- "net"
- "sort"
- "sync"
- "github.com/containerd/log"
- "github.com/docker/docker/libnetwork/cluster"
- "github.com/docker/docker/libnetwork/discoverapi"
- "github.com/docker/docker/libnetwork/driverapi"
- "github.com/docker/docker/libnetwork/networkdb"
- "github.com/docker/docker/libnetwork/scope"
- "github.com/docker/docker/libnetwork/types"
- "github.com/docker/go-events"
- "github.com/gogo/protobuf/proto"
- )
// Subsystem identifiers carried by encryption keys. Keys tagged with
// subsysGossip are installed on NetworkDB; keys tagged with subsysIPSec are
// handed to the network drivers.
const (
	subsysGossip = "networking:gossip"
	subsysIPSec  = "networking:ipsec"
)

// keyringSize is not referenced in this part of the file; presumably it is the
// number of keys kept per subsystem keyring (TODO confirm against callers).
const keyringSize = 3
- // ByTime implements sort.Interface for []*types.EncryptionKey based on
- // the LamportTime field.
- type ByTime []*types.EncryptionKey
- func (b ByTime) Len() int { return len(b) }
- func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
- func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
- type nwAgent struct {
- networkDB *networkdb.NetworkDB
- bindAddr net.IP
- advertiseAddr string
- dataPathAddr string
- coreCancelFuncs []func()
- driverCancelFuncs map[string][]func()
- mu sync.Mutex
- }
- func (a *nwAgent) dataPathAddress() string {
- a.mu.Lock()
- defer a.mu.Unlock()
- if a.dataPathAddr != "" {
- return a.dataPathAddr
- }
- return a.advertiseAddr
- }
// libnetworkEPTable is the name of the NetworkDB table that stores the
// marshalled EndpointRecord of every endpoint.
const (
	libnetworkEPTable = "endpoint_table"
)
// getBindAddr returns the first address assigned to the network interface
// named ifaceName that is an IP network address and is not link-local
// unicast.
//
// An error is returned when the interface cannot be found, when its addresses
// cannot be listed, or when none of its addresses qualifies. Errors from the
// net package are wrapped, so callers can inspect them with errors.Is and
// errors.As.
func getBindAddr(ifaceName string) (net.IP, error) {
	iface, err := net.InterfaceByName(ifaceName)
	if err != nil {
		return nil, fmt.Errorf("failed to find interface %s: %w", ifaceName, err)
	}

	addrs, err := iface.Addrs()
	if err != nil {
		return nil, fmt.Errorf("failed to get interface addresses: %w", err)
	}

	for _, a := range addrs {
		ipNet, ok := a.(*net.IPNet)
		// Skip non-IP addresses and link-local ones.
		if !ok || ipNet.IP.IsLinkLocalUnicast() {
			continue
		}
		return ipNet.IP, nil
	}

	return nil, fmt.Errorf("failed to get bind address")
}
- // resolveAddr resolves the given address, which can be one of, and
- // parsed in the following order or priority:
- //
- // - a well-formed IP-address
- // - a hostname
- // - an interface-name
- func resolveAddr(addrOrInterface string) (net.IP, error) {
- // Try and see if this is a valid IP address
- if ip := net.ParseIP(addrOrInterface); ip != nil {
- return ip, nil
- }
- // If not a valid IP address, it could be a hostname.
- addr, err := net.ResolveIPAddr("ip", addrOrInterface)
- if err != nil {
- // If hostname lookup failed, try to look for an interface with the given name.
- return getBindAddr(addrOrInterface)
- }
- return addr.IP, nil
- }
- func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
- drvEnc := discoverapi.DriverEncryptionUpdate{}
- agent := c.getAgent()
- if agent == nil {
- log.G(context.TODO()).Debug("Skipping key change as agent is nil")
- return nil
- }
- // Find the deleted key. If the deleted key was the primary key,
- // a new primary key should be set before removing if from keyring.
- c.mu.Lock()
- added := []byte{}
- deleted := []byte{}
- j := len(c.keys)
- for i := 0; i < j; {
- same := false
- for _, key := range keys {
- if same = key.LamportTime == c.keys[i].LamportTime; same {
- break
- }
- }
- if !same {
- cKey := c.keys[i]
- if cKey.Subsystem == subsysGossip {
- deleted = cKey.Key
- }
- if cKey.Subsystem == subsysIPSec {
- drvEnc.Prune = cKey.Key
- drvEnc.PruneTag = cKey.LamportTime
- }
- c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
- c.keys[j-1] = nil
- j--
- }
- i++
- }
- c.keys = c.keys[:j]
- // Find the new key and add it to the key ring
- for _, key := range keys {
- same := false
- for _, cKey := range c.keys {
- if same = cKey.LamportTime == key.LamportTime; same {
- break
- }
- }
- if !same {
- c.keys = append(c.keys, key)
- if key.Subsystem == subsysGossip {
- added = key.Key
- }
- if key.Subsystem == subsysIPSec {
- drvEnc.Key = key.Key
- drvEnc.Tag = key.LamportTime
- }
- }
- }
- c.mu.Unlock()
- if len(added) > 0 {
- agent.networkDB.SetKey(added)
- }
- key, _, err := c.getPrimaryKeyTag(subsysGossip)
- if err != nil {
- return err
- }
- agent.networkDB.SetPrimaryKey(key)
- key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
- if err != nil {
- return err
- }
- drvEnc.Primary = key
- drvEnc.PrimaryTag = tag
- if len(deleted) > 0 {
- agent.networkDB.RemoveKey(deleted)
- }
- c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
- dr, ok := driver.(discoverapi.Discover)
- if !ok {
- return false
- }
- if err := dr.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc); err != nil {
- log.G(context.TODO()).Warnf("Failed to update datapath keys in driver %s: %v", name, err)
- // Attempt to reconfigure keys in case of a update failure
- // which can arise due to a mismatch of keys
- // if worker nodes get temporarily disconnected
- log.G(context.TODO()).Warnf("Reconfiguring datapath keys for %s", name)
- drvCfgEnc := discoverapi.DriverEncryptionConfig{}
- drvCfgEnc.Keys, drvCfgEnc.Tags = c.getKeys(subsysIPSec)
- err = dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc)
- if err != nil {
- log.G(context.TODO()).Warnf("Failed to reset datapath keys in driver %s: %v", name, err)
- }
- }
- return false
- })
- return nil
- }
- func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
- agent := c.getAgent()
- if agent != nil {
- // agent is already present, so there is no need initialize it again.
- return nil
- }
- bindAddr := clusterProvider.GetLocalAddress()
- advAddr := clusterProvider.GetAdvertiseAddress()
- dataAddr := clusterProvider.GetDataPathAddress()
- remoteList := clusterProvider.GetRemoteAddressList()
- remoteAddrList := make([]string, 0, len(remoteList))
- for _, remote := range remoteList {
- addr, _, _ := net.SplitHostPort(remote)
- remoteAddrList = append(remoteAddrList, addr)
- }
- listen := clusterProvider.GetListenAddress()
- listenAddr, _, _ := net.SplitHostPort(listen)
- log.G(context.TODO()).WithFields(log.Fields{
- "listen-addr": listenAddr,
- "local-addr": bindAddr,
- "advertise-addr": advAddr,
- "data-path-addr": dataAddr,
- "remote-addr-list": remoteAddrList,
- "network-control-plane-mtu": c.Config().NetworkControlPlaneMTU,
- }).Info("Initializing Libnetwork Agent")
- if advAddr != "" {
- if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
- log.G(context.TODO()).WithError(err).Errorf("Error in agentInit")
- return err
- }
- c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
- if capability.ConnectivityScope == scope.Global {
- if d, ok := driver.(discoverapi.Discover); ok {
- c.agentDriverNotify(d)
- }
- }
- return false
- })
- }
- if len(remoteAddrList) > 0 {
- if err := c.agentJoin(remoteAddrList); err != nil {
- log.G(context.TODO()).WithError(err).Error("Error in joining gossip cluster: join will be retried in background")
- }
- }
- return nil
- }
- // For a given subsystem getKeys sorts the keys by lamport time and returns
- // slice of keys and lamport time which can used as a unique tag for the keys
- func (c *Controller) getKeys(subsystem string) (keys [][]byte, tags []uint64) {
- c.mu.Lock()
- defer c.mu.Unlock()
- sort.Sort(ByTime(c.keys))
- keys = make([][]byte, 0, len(c.keys))
- tags = make([]uint64, 0, len(c.keys))
- for _, key := range c.keys {
- if key.Subsystem == subsystem {
- keys = append(keys, key.Key)
- tags = append(tags, key.LamportTime)
- }
- }
- if len(keys) > 1 {
- // TODO(thaJeztah): why are we swapping order here? This code was added in https://github.com/moby/libnetwork/commit/e83d68b7d1fd9c479120914024242238f791b4dc
- keys[0], keys[1] = keys[1], keys[0]
- tags[0], tags[1] = tags[1], tags[0]
- }
- return keys, tags
- }
- // getPrimaryKeyTag returns the primary key for a given subsystem from the
- // list of sorted key and the associated tag
- func (c *Controller) getPrimaryKeyTag(subsystem string) (key []byte, lamportTime uint64, _ error) {
- c.mu.Lock()
- defer c.mu.Unlock()
- sort.Sort(ByTime(c.keys))
- keys := make([]*types.EncryptionKey, 0, len(c.keys))
- for _, k := range c.keys {
- if k.Subsystem == subsystem {
- keys = append(keys, k)
- }
- }
- if len(keys) < 2 {
- return nil, 0, fmt.Errorf("no primary key found for %s subsystem: %d keys found on controller, expected at least 2", subsystem, len(keys))
- }
- return keys[1].Key, keys[1].LamportTime, nil
- }
- func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
- bindAddr, err := resolveAddr(bindAddrOrInterface)
- if err != nil {
- return err
- }
- keys, _ := c.getKeys(subsysGossip)
- netDBConf := networkdb.DefaultConfig()
- netDBConf.BindAddr = listenAddr
- netDBConf.AdvertiseAddr = advertiseAddr
- netDBConf.Keys = keys
- if c.Config().NetworkControlPlaneMTU != 0 {
- // Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
- // To be on the safe side let's cut 100 bytes
- netDBConf.PacketBufferSize = (c.Config().NetworkControlPlaneMTU - 100)
- log.G(context.TODO()).Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
- c.Config().NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
- }
- nDB, err := networkdb.New(netDBConf)
- if err != nil {
- return err
- }
- // Register the diagnostic handlers
- c.DiagnosticServer.RegisterHandler(nDB, networkdb.NetDbPaths2Func)
- var cancelList []func()
- ch, cancel := nDB.Watch(libnetworkEPTable, "")
- cancelList = append(cancelList, cancel)
- nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "")
- cancelList = append(cancelList, cancel)
- c.mu.Lock()
- c.agent = &nwAgent{
- networkDB: nDB,
- bindAddr: bindAddr,
- advertiseAddr: advertiseAddr,
- dataPathAddr: dataPathAddr,
- coreCancelFuncs: cancelList,
- driverCancelFuncs: make(map[string][]func()),
- }
- c.mu.Unlock()
- go c.handleTableEvents(ch, c.handleEpTableEvent)
- go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
- keys, tags := c.getKeys(subsysIPSec)
- c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
- if dr, ok := driver.(discoverapi.Discover); ok {
- if err := dr.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{
- Keys: keys,
- Tags: tags,
- }); err != nil {
- log.G(context.TODO()).Warnf("Failed to set datapath keys in driver %s: %v", name, err)
- }
- }
- return false
- })
- c.WalkNetworks(joinCluster)
- return nil
- }
- func (c *Controller) agentJoin(remoteAddrList []string) error {
- agent := c.getAgent()
- if agent == nil {
- return nil
- }
- return agent.networkDB.Join(remoteAddrList)
- }
- func (c *Controller) agentDriverNotify(d discoverapi.Discover) {
- agent := c.getAgent()
- if agent == nil {
- return
- }
- if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
- Address: agent.dataPathAddress(),
- BindAddress: agent.bindAddr.String(),
- Self: true,
- }); err != nil {
- log.G(context.TODO()).Warnf("Failed the node discovery in driver: %v", err)
- }
- keys, tags := c.getKeys(subsysIPSec)
- if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{
- Keys: keys,
- Tags: tags,
- }); err != nil {
- log.G(context.TODO()).Warnf("Failed to set datapath keys in driver: %v", err)
- }
- }
- func (c *Controller) agentClose() {
- // Acquire current agent instance and reset its pointer
- // then run closing functions
- c.mu.Lock()
- agent := c.agent
- c.agent = nil
- c.mu.Unlock()
- // when the agent is closed the cluster provider should be cleaned up
- c.SetClusterProvider(nil)
- if agent == nil {
- return
- }
- var cancelList []func()
- agent.mu.Lock()
- for _, cancelFuncs := range agent.driverCancelFuncs {
- cancelList = append(cancelList, cancelFuncs...)
- }
- // Add also the cancel functions for the network db
- cancelList = append(cancelList, agent.coreCancelFuncs...)
- agent.mu.Unlock()
- for _, cancel := range cancelList {
- cancel()
- }
- agent.networkDB.Close()
- }
// Task holds the details of one backend container of a service.
type Task struct {
	// Name is the name stored in the endpoint record.
	Name string
	// EndpointID is the endpoint's key in the endpoint table; EndpointIP is
	// the endpoint's IP address as stored in the endpoint record.
	EndpointID, EndpointIP string
	// Info is the driver-specific information decoded from the driver's
	// tables; it is nil when no driver table entry matched the endpoint.
	Info map[string]string
}
// ServiceInfo has service specific details along with the list of backend tasks
type ServiceInfo struct {
	// VIP is the virtual IP taken from the endpoint record of the first
	// endpoint seen for the service.
	VIP string
	// LocalLBIndex is the load-balancer index computed for that endpoint.
	LocalLBIndex int
	// Tasks lists the backend tasks of the service.
	Tasks []Task
	// Ports holds one "Target: <port>, Publish: <port>" string per ingress port.
	Ports []string
}
// epRecord pairs the driver-agnostic endpoint record with the information
// decoded from the driver tables while Services assembles its result.
type epRecord struct {
	// ep is the record unmarshalled from the endpoint table.
	ep EndpointRecord
	// info is the driver-decoded information for the endpoint, if any.
	info map[string]string
	// lbIndex is the load-balancer index looked up on the controller.
	lbIndex int
}
- // Services returns a map of services keyed by the service name with the details
- // of all the tasks that belong to the service. Applicable only in swarm mode.
- func (n *Network) Services() map[string]ServiceInfo {
- agent, ok := n.clusterAgent()
- if !ok {
- return nil
- }
- nwID := n.ID()
- d, err := n.driver(true)
- if err != nil {
- log.G(context.TODO()).Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, nwID, err)
- return nil
- }
- // Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
- eps := make(map[string]epRecord)
- c := n.getController()
- for eid, value := range agent.networkDB.GetTableByNetwork(libnetworkEPTable, nwID) {
- var epRec EndpointRecord
- if err := proto.Unmarshal(value.Value, &epRec); err != nil {
- log.G(context.TODO()).Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nwID, err)
- continue
- }
- eps[eid] = epRecord{
- ep: epRec,
- lbIndex: c.getLBIndex(epRec.ServiceID, nwID, epRec.IngressPorts),
- }
- }
- // Walk through the driver's tables, have the driver decode the entries
- // and return the tuple {ep ID, value}. value is a string that coveys
- // relevant info about the endpoint.
- for _, table := range n.driverTables {
- if table.objType != driverapi.EndpointObject {
- continue
- }
- for key, value := range agent.networkDB.GetTableByNetwork(table.name, nwID) {
- epID, info := d.DecodeTableEntry(table.name, key, value.Value)
- if ep, ok := eps[epID]; !ok {
- log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
- } else {
- ep.info = info
- eps[epID] = ep
- }
- }
- }
- // group the endpoints into a map keyed by the service name
- sinfo := make(map[string]ServiceInfo)
- for ep, epr := range eps {
- s, ok := sinfo[epr.ep.ServiceName]
- if !ok {
- s = ServiceInfo{
- VIP: epr.ep.VirtualIP,
- LocalLBIndex: epr.lbIndex,
- }
- }
- if s.Ports == nil {
- ports := make([]string, 0, len(epr.ep.IngressPorts))
- for _, port := range epr.ep.IngressPorts {
- ports = append(ports, fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort))
- }
- s.Ports = ports
- }
- s.Tasks = append(s.Tasks, Task{
- Name: epr.ep.Name,
- EndpointID: ep,
- EndpointIP: epr.ep.EndpointIP,
- Info: epr.info,
- })
- sinfo[epr.ep.ServiceName] = s
- }
- return sinfo
- }
- // clusterAgent returns the cluster agent if the network is a swarm-scoped,
- // multi-host network.
- func (n *Network) clusterAgent() (agent *nwAgent, ok bool) {
- if n.scope != scope.Swarm || !n.driverIsMultihost() {
- return nil, false
- }
- a := n.getController().getAgent()
- return a, a != nil
- }
- func (n *Network) joinCluster() error {
- agent, ok := n.clusterAgent()
- if !ok {
- return nil
- }
- return agent.networkDB.JoinNetwork(n.ID())
- }
- func (n *Network) leaveCluster() error {
- agent, ok := n.clusterAgent()
- if !ok {
- return nil
- }
- return agent.networkDB.LeaveNetwork(n.ID())
- }
- func (ep *Endpoint) addDriverInfoToCluster() error {
- if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 {
- return nil
- }
- n := ep.getNetwork()
- agent, ok := n.clusterAgent()
- if !ok {
- return nil
- }
- nwID := n.ID()
- for _, te := range ep.joinInfo.driverTableEntries {
- if err := agent.networkDB.CreateEntry(te.tableName, nwID, te.key, te.value); err != nil {
- return err
- }
- }
- return nil
- }
- func (ep *Endpoint) deleteDriverInfoFromCluster() error {
- if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 {
- return nil
- }
- n := ep.getNetwork()
- agent, ok := n.clusterAgent()
- if !ok {
- return nil
- }
- nwID := n.ID()
- for _, te := range ep.joinInfo.driverTableEntries {
- if err := agent.networkDB.DeleteEntry(te.tableName, nwID, te.key); err != nil {
- return err
- }
- }
- return nil
- }
- func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
- if len(ep.myAliases) == 0 && ep.isAnonymous() || ep.Iface() == nil || ep.Iface().Address() == nil {
- return nil
- }
- n := ep.getNetwork()
- agent, ok := n.clusterAgent()
- if !ok {
- return nil
- }
- sb.service.Lock()
- defer sb.service.Unlock()
- log.G(context.TODO()).Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
- // Check that the endpoint is still present on the sandbox before adding it to the service discovery.
- // This is to handle a race between the EnableService and the sbLeave
- // It is possible that the EnableService starts, fetches the list of the endpoints and
- // by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
- // The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
- // This check under the Service lock of the sandbox ensure the correct behavior.
- // If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
- // but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
- // In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
- // removed from the list, in this situation the delete will bail out not finding any data to cleanup
- // and the add will bail out not finding the endpoint on the sandbox.
- if err := sb.getEndpoint(ep.ID()); err == nil {
- log.G(context.TODO()).Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
- return nil
- }
- name := ep.Name()
- if ep.isAnonymous() {
- name = ep.MyAliases()[0]
- }
- var ingressPorts []*PortConfig
- if ep.svcID != "" {
- // This is a task part of a service
- // Gossip ingress ports only in ingress network.
- if n.ingress {
- ingressPorts = ep.ingressPorts
- }
- if err := n.getController().addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
- return err
- }
- } else {
- // This is a container simply attached to an attachable network
- if err := n.getController().addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
- return err
- }
- }
- buf, err := proto.Marshal(&EndpointRecord{
- Name: name,
- ServiceName: ep.svcName,
- ServiceID: ep.svcID,
- VirtualIP: ep.virtualIP.String(),
- IngressPorts: ingressPorts,
- Aliases: ep.svcAliases,
- TaskAliases: ep.myAliases,
- EndpointIP: ep.Iface().Address().IP.String(),
- ServiceDisabled: false,
- })
- if err != nil {
- return err
- }
- if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
- log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
- return err
- }
- log.G(context.TODO()).Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
- return nil
- }
- func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, method string) error {
- if len(ep.myAliases) == 0 && ep.isAnonymous() {
- return nil
- }
- n := ep.getNetwork()
- agent, ok := n.clusterAgent()
- if !ok {
- return nil
- }
- sb.service.Lock()
- defer sb.service.Unlock()
- log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
- // Avoid a race w/ with a container that aborts preemptively. This would
- // get caught in disableServceInNetworkDB, but we check here to make the
- // nature of the condition more clear.
- // See comment in addServiceInfoToCluster()
- if err := sb.getEndpoint(ep.ID()); err == nil {
- log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
- return nil
- }
- name := ep.Name()
- if ep.isAnonymous() {
- name = ep.MyAliases()[0]
- }
- // First update the networkDB then locally
- if fullRemove {
- if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
- log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
- }
- } else {
- disableServiceInNetworkDB(agent, n, ep)
- }
- if ep.Iface() != nil && ep.Iface().Address() != nil {
- if ep.svcID != "" {
- // This is a task part of a service
- var ingressPorts []*PortConfig
- if n.ingress {
- ingressPorts = ep.ingressPorts
- }
- if err := n.getController().rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
- return err
- }
- } else {
- // This is a container simply attached to an attachable network
- if err := n.getController().delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
- return err
- }
- }
- }
- log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
- return nil
- }
- func disableServiceInNetworkDB(a *nwAgent, n *Network, ep *Endpoint) {
- var epRec EndpointRecord
- log.G(context.TODO()).Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())
- // Update existing record to indicate that the service is disabled
- inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
- if err != nil {
- log.G(context.TODO()).Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
- return
- }
- // Should never fail
- if err := proto.Unmarshal(inBuf, &epRec); err != nil {
- log.G(context.TODO()).Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
- return
- }
- epRec.ServiceDisabled = true
- // Should never fail
- outBuf, err := proto.Marshal(&epRec)
- if err != nil {
- log.G(context.TODO()).Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
- return
- }
- // Send update to the whole cluster
- if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil {
- log.G(context.TODO()).Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
- }
- }
- func (n *Network) addDriverWatches() {
- if len(n.driverTables) == 0 {
- return
- }
- agent, ok := n.clusterAgent()
- if !ok {
- return
- }
- c := n.getController()
- for _, table := range n.driverTables {
- ch, cancel := agent.networkDB.Watch(table.name, n.ID())
- agent.mu.Lock()
- agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
- agent.mu.Unlock()
- go c.handleTableEvents(ch, n.handleDriverTableEvent)
- d, err := n.driver(false)
- if err != nil {
- log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
- return
- }
- err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
- // skip the entries that are mark for deletion, this is safe because this function is
- // called at initialization time so there is no state to delete
- if nid == n.ID() && !deleted {
- d.EventNotify(driverapi.Create, nid, table.name, key, value)
- }
- return false
- })
- if err != nil {
- log.G(context.TODO()).WithError(err).Warn("Error while walking networkdb")
- }
- }
- }
- func (n *Network) cancelDriverWatches() {
- agent, ok := n.clusterAgent()
- if !ok {
- return
- }
- agent.mu.Lock()
- cancelFuncs := agent.driverCancelFuncs[n.ID()]
- delete(agent.driverCancelFuncs, n.ID())
- agent.mu.Unlock()
- for _, cancel := range cancelFuncs {
- cancel()
- }
- }
- func (c *Controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
- for {
- select {
- case ev := <-ch.C:
- fn(ev)
- case <-ch.Done():
- return
- }
- }
- }
- func (n *Network) handleDriverTableEvent(ev events.Event) {
- d, err := n.driver(false)
- if err != nil {
- log.G(context.TODO()).Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
- return
- }
- var (
- etype driverapi.EventType
- tname string
- key string
- value []byte
- )
- switch event := ev.(type) {
- case networkdb.CreateEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Create
- case networkdb.DeleteEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Delete
- case networkdb.UpdateEvent:
- tname = event.Table
- key = event.Key
- value = event.Value
- etype = driverapi.Delete
- }
- d.EventNotify(etype, n.ID(), tname, key, value)
- }
- func (c *Controller) handleNodeTableEvent(ev events.Event) {
- var (
- value []byte
- isAdd bool
- nodeAddr networkdb.NodeAddr
- )
- switch event := ev.(type) {
- case networkdb.CreateEvent:
- value = event.Value
- isAdd = true
- case networkdb.DeleteEvent:
- value = event.Value
- case networkdb.UpdateEvent:
- log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
- }
- err := json.Unmarshal(value, &nodeAddr)
- if err != nil {
- log.G(context.TODO()).Errorf("Error unmarshalling node table event %v", err)
- return
- }
- c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
- }
- func (c *Controller) handleEpTableEvent(ev events.Event) {
- var (
- nid string
- eid string
- value []byte
- epRec EndpointRecord
- )
- switch event := ev.(type) {
- case networkdb.CreateEvent:
- nid = event.NetworkID
- eid = event.Key
- value = event.Value
- case networkdb.DeleteEvent:
- nid = event.NetworkID
- eid = event.Key
- value = event.Value
- case networkdb.UpdateEvent:
- nid = event.NetworkID
- eid = event.Key
- value = event.Value
- default:
- log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
- return
- }
- err := proto.Unmarshal(value, &epRec)
- if err != nil {
- log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err)
- return
- }
- containerName := epRec.Name
- svcName := epRec.ServiceName
- svcID := epRec.ServiceID
- vip := net.ParseIP(epRec.VirtualIP)
- ip := net.ParseIP(epRec.EndpointIP)
- ingressPorts := epRec.IngressPorts
- serviceAliases := epRec.Aliases
- taskAliases := epRec.TaskAliases
- if containerName == "" || ip == nil {
- log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
- return
- }
- switch ev.(type) {
- case networkdb.CreateEvent:
- log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
- if svcID != "" {
- // This is a remote task part of a service
- if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
- log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
- return
- }
- } else {
- // This is a remote container simply attached to an attachable network
- if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
- log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
- }
- }
- case networkdb.DeleteEvent:
- log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
- if svcID != "" {
- // This is a remote task part of a service
- if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
- log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
- return
- }
- } else {
- // This is a remote container simply attached to an attachable network
- if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
- log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
- }
- }
- case networkdb.UpdateEvent:
- log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
- // We currently should only get these to inform us that an endpoint
- // is disabled. Report if otherwise.
- if svcID == "" || !epRec.ServiceDisabled {
- log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
- return
- }
- // This is a remote task that is part of a service that is now disabled
- if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
- log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
- return
- }
- }
- }
|