moby/libnetwork/agent.go
Cory Snider 424ae36046 libnetwork/diagnostic: use standard http.Handler
We don't need C-style callback functions which accept a void* context
parameter: Go has closures. Drop the unnecessary httpHandlerCustom type
and refactor the diagnostic server handler functions into closures which
capture whatever context they need implicitly.

If the node leaves and rejoins a swarm, the cluster agent and its
associated NetworkDB are discarded and replaced with new instances. Upon
rejoin, the agent registers its NetworkDB instance with the diagnostic
server. These handlers would all conflict with the handlers registered
by the previous NetworkDB instance. Attempting to register a second
handler on a http.ServeMux with the same pattern will panic, which the
diagnostic server would historically deal with by ignoring the duplicate
handler registration. Consequently, the first NetworkDB instance to be
registered would "stick" to the diagnostic server for the lifetime of
the process, even after it is replaced with another instance. Improve
duplicate-handler registration such that the most recently-registered
handler for a pattern is used for all subsequent requests.

Signed-off-by: Cory Snider <csnider@mirantis.com>
2023-12-06 11:19:59 -05:00

980 lines
28 KiB
Go

package libnetwork
//go:generate protoc -I=. -I=../vendor/ --gogofaster_out=import_path=github.com/docker/docker/libnetwork:. agent.proto
import (
"context"
"encoding/json"
"fmt"
"net"
"sort"
"sync"
"github.com/containerd/log"
"github.com/docker/docker/libnetwork/cluster"
"github.com/docker/docker/libnetwork/discoverapi"
"github.com/docker/docker/libnetwork/driverapi"
"github.com/docker/docker/libnetwork/networkdb"
"github.com/docker/docker/libnetwork/scope"
"github.com/docker/docker/libnetwork/types"
"github.com/docker/go-events"
"github.com/gogo/protobuf/proto"
)
const (
subsysGossip = "networking:gossip"
subsysIPSec = "networking:ipsec"
keyringSize = 3
)
// ByTime implements sort.Interface for []*types.EncryptionKey based on
// the LamportTime field.
type ByTime []*types.EncryptionKey
func (b ByTime) Len() int { return len(b) }
func (b ByTime) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b ByTime) Less(i, j int) bool { return b[i].LamportTime < b[j].LamportTime }
type nwAgent struct {
networkDB *networkdb.NetworkDB
bindAddr net.IP
advertiseAddr string
dataPathAddr string
coreCancelFuncs []func()
driverCancelFuncs map[string][]func()
mu sync.Mutex
}
func (a *nwAgent) dataPathAddress() string {
a.mu.Lock()
defer a.mu.Unlock()
if a.dataPathAddr != "" {
return a.dataPathAddr
}
return a.advertiseAddr
}
const libnetworkEPTable = "endpoint_table"
func getBindAddr(ifaceName string) (net.IP, error) {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {
return nil, fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
}
addrs, err := iface.Addrs()
if err != nil {
return nil, fmt.Errorf("failed to get interface addresses: %v", err)
}
for _, a := range addrs {
addr, ok := a.(*net.IPNet)
if !ok {
continue
}
addrIP := addr.IP
if addrIP.IsLinkLocalUnicast() {
continue
}
return addrIP, nil
}
return nil, fmt.Errorf("failed to get bind address")
}
// resolveAddr resolves the given address, which can be one of, and
// parsed in the following order or priority:
//
// - a well-formed IP-address
// - a hostname
// - an interface-name
func resolveAddr(addrOrInterface string) (net.IP, error) {
// Try and see if this is a valid IP address
if ip := net.ParseIP(addrOrInterface); ip != nil {
return ip, nil
}
// If not a valid IP address, it could be a hostname.
addr, err := net.ResolveIPAddr("ip", addrOrInterface)
if err != nil {
// If hostname lookup failed, try to look for an interface with the given name.
return getBindAddr(addrOrInterface)
}
return addr.IP, nil
}
func (c *Controller) handleKeyChange(keys []*types.EncryptionKey) error {
drvEnc := discoverapi.DriverEncryptionUpdate{}
agent := c.getAgent()
if agent == nil {
log.G(context.TODO()).Debug("Skipping key change as agent is nil")
return nil
}
// Find the deleted key. If the deleted key was the primary key,
// a new primary key should be set before removing if from keyring.
c.mu.Lock()
added := []byte{}
deleted := []byte{}
j := len(c.keys)
for i := 0; i < j; {
same := false
for _, key := range keys {
if same = key.LamportTime == c.keys[i].LamportTime; same {
break
}
}
if !same {
cKey := c.keys[i]
if cKey.Subsystem == subsysGossip {
deleted = cKey.Key
}
if cKey.Subsystem == subsysIPSec {
drvEnc.Prune = cKey.Key
drvEnc.PruneTag = cKey.LamportTime
}
c.keys[i], c.keys[j-1] = c.keys[j-1], c.keys[i]
c.keys[j-1] = nil
j--
}
i++
}
c.keys = c.keys[:j]
// Find the new key and add it to the key ring
for _, key := range keys {
same := false
for _, cKey := range c.keys {
if same = cKey.LamportTime == key.LamportTime; same {
break
}
}
if !same {
c.keys = append(c.keys, key)
if key.Subsystem == subsysGossip {
added = key.Key
}
if key.Subsystem == subsysIPSec {
drvEnc.Key = key.Key
drvEnc.Tag = key.LamportTime
}
}
}
c.mu.Unlock()
if len(added) > 0 {
agent.networkDB.SetKey(added)
}
key, _, err := c.getPrimaryKeyTag(subsysGossip)
if err != nil {
return err
}
agent.networkDB.SetPrimaryKey(key)
key, tag, err := c.getPrimaryKeyTag(subsysIPSec)
if err != nil {
return err
}
drvEnc.Primary = key
drvEnc.PrimaryTag = tag
if len(deleted) > 0 {
agent.networkDB.RemoveKey(deleted)
}
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
dr, ok := driver.(discoverapi.Discover)
if !ok {
return false
}
if err := dr.DiscoverNew(discoverapi.EncryptionKeysUpdate, drvEnc); err != nil {
log.G(context.TODO()).Warnf("Failed to update datapath keys in driver %s: %v", name, err)
// Attempt to reconfigure keys in case of a update failure
// which can arise due to a mismatch of keys
// if worker nodes get temporarily disconnected
log.G(context.TODO()).Warnf("Reconfiguring datapath keys for %s", name)
drvCfgEnc := discoverapi.DriverEncryptionConfig{}
drvCfgEnc.Keys, drvCfgEnc.Tags = c.getKeys(subsysIPSec)
err = dr.DiscoverNew(discoverapi.EncryptionKeysConfig, drvCfgEnc)
if err != nil {
log.G(context.TODO()).Warnf("Failed to reset datapath keys in driver %s: %v", name, err)
}
}
return false
})
return nil
}
func (c *Controller) agentSetup(clusterProvider cluster.Provider) error {
agent := c.getAgent()
if agent != nil {
// agent is already present, so there is no need initialize it again.
return nil
}
bindAddr := clusterProvider.GetLocalAddress()
advAddr := clusterProvider.GetAdvertiseAddress()
dataAddr := clusterProvider.GetDataPathAddress()
remoteList := clusterProvider.GetRemoteAddressList()
remoteAddrList := make([]string, 0, len(remoteList))
for _, remote := range remoteList {
addr, _, _ := net.SplitHostPort(remote)
remoteAddrList = append(remoteAddrList, addr)
}
listen := clusterProvider.GetListenAddress()
listenAddr, _, _ := net.SplitHostPort(listen)
log.G(context.TODO()).WithFields(log.Fields{
"listen-addr": listenAddr,
"local-addr": bindAddr,
"advertise-addr": advAddr,
"data-path-addr": dataAddr,
"remote-addr-list": remoteAddrList,
"network-control-plane-mtu": c.Config().NetworkControlPlaneMTU,
}).Info("Initializing Libnetwork Agent")
if advAddr != "" {
if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil {
log.G(context.TODO()).WithError(err).Errorf("Error in agentInit")
return err
}
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if capability.ConnectivityScope == scope.Global {
if d, ok := driver.(discoverapi.Discover); ok {
c.agentDriverNotify(d)
}
}
return false
})
}
if len(remoteAddrList) > 0 {
if err := c.agentJoin(remoteAddrList); err != nil {
log.G(context.TODO()).WithError(err).Error("Error in joining gossip cluster: join will be retried in background")
}
}
return nil
}
// For a given subsystem getKeys sorts the keys by lamport time and returns
// slice of keys and lamport time which can used as a unique tag for the keys
func (c *Controller) getKeys(subsystem string) (keys [][]byte, tags []uint64) {
c.mu.Lock()
defer c.mu.Unlock()
sort.Sort(ByTime(c.keys))
keys = make([][]byte, 0, len(c.keys))
tags = make([]uint64, 0, len(c.keys))
for _, key := range c.keys {
if key.Subsystem == subsystem {
keys = append(keys, key.Key)
tags = append(tags, key.LamportTime)
}
}
if len(keys) > 1 {
// TODO(thaJeztah): why are we swapping order here? This code was added in https://github.com/moby/libnetwork/commit/e83d68b7d1fd9c479120914024242238f791b4dc
keys[0], keys[1] = keys[1], keys[0]
tags[0], tags[1] = tags[1], tags[0]
}
return keys, tags
}
// getPrimaryKeyTag returns the primary key for a given subsystem from the
// list of sorted key and the associated tag
func (c *Controller) getPrimaryKeyTag(subsystem string) (key []byte, lamportTime uint64, _ error) {
c.mu.Lock()
defer c.mu.Unlock()
sort.Sort(ByTime(c.keys))
keys := make([]*types.EncryptionKey, 0, len(c.keys))
for _, k := range c.keys {
if k.Subsystem == subsystem {
keys = append(keys, k)
}
}
if len(keys) < 2 {
return nil, 0, fmt.Errorf("no primary key found for %s subsystem: %d keys found on controller, expected at least 2", subsystem, len(keys))
}
return keys[1].Key, keys[1].LamportTime, nil
}
func (c *Controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, dataPathAddr string) error {
bindAddr, err := resolveAddr(bindAddrOrInterface)
if err != nil {
return err
}
keys, _ := c.getKeys(subsysGossip)
netDBConf := networkdb.DefaultConfig()
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
if c.Config().NetworkControlPlaneMTU != 0 {
// Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr.
// To be on the safe side let's cut 100 bytes
netDBConf.PacketBufferSize = (c.Config().NetworkControlPlaneMTU - 100)
log.G(context.TODO()).Debugf("Control plane MTU: %d will initialize NetworkDB with: %d",
c.Config().NetworkControlPlaneMTU, netDBConf.PacketBufferSize)
}
nDB, err := networkdb.New(netDBConf)
if err != nil {
return err
}
// Register the diagnostic handlers
nDB.RegisterDiagnosticHandlers(c.DiagnosticServer)
var cancelList []func()
ch, cancel := nDB.Watch(libnetworkEPTable, "")
cancelList = append(cancelList, cancel)
nodeCh, cancel := nDB.Watch(networkdb.NodeTable, "")
cancelList = append(cancelList, cancel)
c.mu.Lock()
c.agent = &nwAgent{
networkDB: nDB,
bindAddr: bindAddr,
advertiseAddr: advertiseAddr,
dataPathAddr: dataPathAddr,
coreCancelFuncs: cancelList,
driverCancelFuncs: make(map[string][]func()),
}
c.mu.Unlock()
go c.handleTableEvents(ch, c.handleEpTableEvent)
go c.handleTableEvents(nodeCh, c.handleNodeTableEvent)
keys, tags := c.getKeys(subsysIPSec)
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
if dr, ok := driver.(discoverapi.Discover); ok {
if err := dr.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{
Keys: keys,
Tags: tags,
}); err != nil {
log.G(context.TODO()).Warnf("Failed to set datapath keys in driver %s: %v", name, err)
}
}
return false
})
c.WalkNetworks(joinCluster)
return nil
}
func (c *Controller) agentJoin(remoteAddrList []string) error {
agent := c.getAgent()
if agent == nil {
return nil
}
return agent.networkDB.Join(remoteAddrList)
}
func (c *Controller) agentDriverNotify(d discoverapi.Discover) {
agent := c.getAgent()
if agent == nil {
return
}
if err := d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
Address: agent.dataPathAddress(),
BindAddress: agent.bindAddr.String(),
Self: true,
}); err != nil {
log.G(context.TODO()).Warnf("Failed the node discovery in driver: %v", err)
}
keys, tags := c.getKeys(subsysIPSec)
if err := d.DiscoverNew(discoverapi.EncryptionKeysConfig, discoverapi.DriverEncryptionConfig{
Keys: keys,
Tags: tags,
}); err != nil {
log.G(context.TODO()).Warnf("Failed to set datapath keys in driver: %v", err)
}
}
func (c *Controller) agentClose() {
// Acquire current agent instance and reset its pointer
// then run closing functions
c.mu.Lock()
agent := c.agent
c.agent = nil
c.mu.Unlock()
// when the agent is closed the cluster provider should be cleaned up
c.SetClusterProvider(nil)
if agent == nil {
return
}
var cancelList []func()
agent.mu.Lock()
for _, cancelFuncs := range agent.driverCancelFuncs {
cancelList = append(cancelList, cancelFuncs...)
}
// Add also the cancel functions for the network db
cancelList = append(cancelList, agent.coreCancelFuncs...)
agent.mu.Unlock()
for _, cancel := range cancelList {
cancel()
}
agent.networkDB.Close()
}
// Task has the backend container details
type Task struct {
Name string
EndpointID string
EndpointIP string
Info map[string]string
}
// ServiceInfo has service specific details along with the list of backend tasks
type ServiceInfo struct {
VIP string
LocalLBIndex int
Tasks []Task
Ports []string
}
type epRecord struct {
ep EndpointRecord
info map[string]string
lbIndex int
}
// Services returns a map of services keyed by the service name with the details
// of all the tasks that belong to the service. Applicable only in swarm mode.
func (n *Network) Services() map[string]ServiceInfo {
agent, ok := n.clusterAgent()
if !ok {
return nil
}
nwID := n.ID()
d, err := n.driver(true)
if err != nil {
log.G(context.TODO()).Errorf("Could not resolve driver for network %s/%s while fetching services: %v", n.networkType, nwID, err)
return nil
}
// Walk through libnetworkEPTable and fetch the driver agnostic endpoint info
eps := make(map[string]epRecord)
c := n.getController()
for eid, value := range agent.networkDB.GetTableByNetwork(libnetworkEPTable, nwID) {
var epRec EndpointRecord
if err := proto.Unmarshal(value.Value, &epRec); err != nil {
log.G(context.TODO()).Errorf("Unmarshal of libnetworkEPTable failed for endpoint %s in network %s, %v", eid, nwID, err)
continue
}
eps[eid] = epRecord{
ep: epRec,
lbIndex: c.getLBIndex(epRec.ServiceID, nwID, epRec.IngressPorts),
}
}
// Walk through the driver's tables, have the driver decode the entries
// and return the tuple {ep ID, value}. value is a string that coveys
// relevant info about the endpoint.
for _, table := range n.driverTables {
if table.objType != driverapi.EndpointObject {
continue
}
for key, value := range agent.networkDB.GetTableByNetwork(table.name, nwID) {
epID, info := d.DecodeTableEntry(table.name, key, value.Value)
if ep, ok := eps[epID]; !ok {
log.G(context.TODO()).Errorf("Inconsistent driver and libnetwork state for endpoint %s", epID)
} else {
ep.info = info
eps[epID] = ep
}
}
}
// group the endpoints into a map keyed by the service name
sinfo := make(map[string]ServiceInfo)
for ep, epr := range eps {
s, ok := sinfo[epr.ep.ServiceName]
if !ok {
s = ServiceInfo{
VIP: epr.ep.VirtualIP,
LocalLBIndex: epr.lbIndex,
}
}
if s.Ports == nil {
ports := make([]string, 0, len(epr.ep.IngressPorts))
for _, port := range epr.ep.IngressPorts {
ports = append(ports, fmt.Sprintf("Target: %d, Publish: %d", port.TargetPort, port.PublishedPort))
}
s.Ports = ports
}
s.Tasks = append(s.Tasks, Task{
Name: epr.ep.Name,
EndpointID: ep,
EndpointIP: epr.ep.EndpointIP,
Info: epr.info,
})
sinfo[epr.ep.ServiceName] = s
}
return sinfo
}
// clusterAgent returns the cluster agent if the network is a swarm-scoped,
// multi-host network.
func (n *Network) clusterAgent() (agent *nwAgent, ok bool) {
if n.scope != scope.Swarm || !n.driverIsMultihost() {
return nil, false
}
a := n.getController().getAgent()
return a, a != nil
}
func (n *Network) joinCluster() error {
agent, ok := n.clusterAgent()
if !ok {
return nil
}
return agent.networkDB.JoinNetwork(n.ID())
}
func (n *Network) leaveCluster() error {
agent, ok := n.clusterAgent()
if !ok {
return nil
}
return agent.networkDB.LeaveNetwork(n.ID())
}
func (ep *Endpoint) addDriverInfoToCluster() error {
if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 {
return nil
}
n := ep.getNetwork()
agent, ok := n.clusterAgent()
if !ok {
return nil
}
nwID := n.ID()
for _, te := range ep.joinInfo.driverTableEntries {
if err := agent.networkDB.CreateEntry(te.tableName, nwID, te.key, te.value); err != nil {
return err
}
}
return nil
}
func (ep *Endpoint) deleteDriverInfoFromCluster() error {
if ep.joinInfo == nil || len(ep.joinInfo.driverTableEntries) == 0 {
return nil
}
n := ep.getNetwork()
agent, ok := n.clusterAgent()
if !ok {
return nil
}
nwID := n.ID()
for _, te := range ep.joinInfo.driverTableEntries {
if err := agent.networkDB.DeleteEntry(te.tableName, nwID, te.key); err != nil {
return err
}
}
return nil
}
func (ep *Endpoint) addServiceInfoToCluster(sb *Sandbox) error {
if len(ep.myAliases) == 0 && ep.isAnonymous() || ep.Iface() == nil || ep.Iface().Address() == nil {
return nil
}
n := ep.getNetwork()
agent, ok := n.clusterAgent()
if !ok {
return nil
}
sb.service.Lock()
defer sb.service.Unlock()
log.G(context.TODO()).Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())
// Check that the endpoint is still present on the sandbox before adding it to the service discovery.
// This is to handle a race between the EnableService and the sbLeave
// It is possible that the EnableService starts, fetches the list of the endpoints and
// by the time the addServiceInfoToCluster is called the endpoint got removed from the sandbox
// The risk is that the deleteServiceInfoToCluster happens before the addServiceInfoToCluster.
// This check under the Service lock of the sandbox ensure the correct behavior.
// If the addServiceInfoToCluster arrives first may find or not the endpoint and will proceed or exit
// but in any case the deleteServiceInfoToCluster will follow doing the cleanup if needed.
// In case the deleteServiceInfoToCluster arrives first, this one is happening after the endpoint is
// removed from the list, in this situation the delete will bail out not finding any data to cleanup
// and the add will bail out not finding the endpoint on the sandbox.
if err := sb.getEndpoint(ep.ID()); err == nil {
log.G(context.TODO()).Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}
name := ep.Name()
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}
var ingressPorts []*PortConfig
if ep.svcID != "" {
// This is a task part of a service
// Gossip ingress ports only in ingress network.
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := n.getController().addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := n.getController().addContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
}
buf, err := proto.Marshal(&EndpointRecord{
Name: name,
ServiceName: ep.svcName,
ServiceID: ep.svcID,
VirtualIP: ep.virtualIP.String(),
IngressPorts: ingressPorts,
Aliases: ep.svcAliases,
TaskAliases: ep.myAliases,
EndpointIP: ep.Iface().Address().IP.String(),
ServiceDisabled: false,
})
if err != nil {
return err
}
if err := agent.networkDB.CreateEntry(libnetworkEPTable, n.ID(), ep.ID(), buf); err != nil {
log.G(context.TODO()).Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
return err
}
log.G(context.TODO()).Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())
return nil
}
func (ep *Endpoint) deleteServiceInfoFromCluster(sb *Sandbox, fullRemove bool, method string) error {
if len(ep.myAliases) == 0 && ep.isAnonymous() {
return nil
}
n := ep.getNetwork()
agent, ok := n.clusterAgent()
if !ok {
return nil
}
sb.service.Lock()
defer sb.service.Unlock()
log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())
// Avoid a race w/ with a container that aborts preemptively. This would
// get caught in disableServceInNetworkDB, but we check here to make the
// nature of the condition more clear.
// See comment in addServiceInfoToCluster()
if err := sb.getEndpoint(ep.ID()); err == nil {
log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}
name := ep.Name()
if ep.isAnonymous() {
name = ep.MyAliases()[0]
}
// First update the networkDB then locally
if fullRemove {
if err := agent.networkDB.DeleteEntry(libnetworkEPTable, n.ID(), ep.ID()); err != nil {
log.G(context.TODO()).Warnf("deleteServiceInfoFromCluster NetworkDB DeleteEntry failed for %s %s err:%s", ep.id, n.id, err)
}
} else {
disableServiceInNetworkDB(agent, n, ep)
}
if ep.Iface() != nil && ep.Iface().Address() != nil {
if ep.svcID != "" {
// This is a task part of a service
var ingressPorts []*PortConfig
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := n.getController().rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), name, ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true, fullRemove); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := n.getController().delContainerNameResolution(n.ID(), ep.ID(), name, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
}
}
log.G(context.TODO()).Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())
return nil
}
func disableServiceInNetworkDB(a *nwAgent, n *Network, ep *Endpoint) {
var epRec EndpointRecord
log.G(context.TODO()).Debugf("disableServiceInNetworkDB for %s %s", ep.svcName, ep.ID())
// Update existing record to indicate that the service is disabled
inBuf, err := a.networkDB.GetEntry(libnetworkEPTable, n.ID(), ep.ID())
if err != nil {
log.G(context.TODO()).Warnf("disableServiceInNetworkDB GetEntry failed for %s %s err:%s", ep.id, n.id, err)
return
}
// Should never fail
if err := proto.Unmarshal(inBuf, &epRec); err != nil {
log.G(context.TODO()).Errorf("disableServiceInNetworkDB unmarshal failed for %s %s err:%s", ep.id, n.id, err)
return
}
epRec.ServiceDisabled = true
// Should never fail
outBuf, err := proto.Marshal(&epRec)
if err != nil {
log.G(context.TODO()).Errorf("disableServiceInNetworkDB marshalling failed for %s %s err:%s", ep.id, n.id, err)
return
}
// Send update to the whole cluster
if err := a.networkDB.UpdateEntry(libnetworkEPTable, n.ID(), ep.ID(), outBuf); err != nil {
log.G(context.TODO()).Warnf("disableServiceInNetworkDB UpdateEntry failed for %s %s err:%s", ep.id, n.id, err)
}
}
func (n *Network) addDriverWatches() {
if len(n.driverTables) == 0 {
return
}
agent, ok := n.clusterAgent()
if !ok {
return
}
c := n.getController()
for _, table := range n.driverTables {
ch, cancel := agent.networkDB.Watch(table.name, n.ID())
agent.mu.Lock()
agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel)
agent.mu.Unlock()
go c.handleTableEvents(ch, n.handleDriverTableEvent)
d, err := n.driver(false)
if err != nil {
log.G(context.TODO()).Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
return
}
err = agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
// skip the entries that are mark for deletion, this is safe because this function is
// called at initialization time so there is no state to delete
if nid == n.ID() && !deleted {
d.EventNotify(driverapi.Create, nid, table.name, key, value)
}
return false
})
if err != nil {
log.G(context.TODO()).WithError(err).Warn("Error while walking networkdb")
}
}
}
func (n *Network) cancelDriverWatches() {
agent, ok := n.clusterAgent()
if !ok {
return
}
agent.mu.Lock()
cancelFuncs := agent.driverCancelFuncs[n.ID()]
delete(agent.driverCancelFuncs, n.ID())
agent.mu.Unlock()
for _, cancel := range cancelFuncs {
cancel()
}
}
func (c *Controller) handleTableEvents(ch *events.Channel, fn func(events.Event)) {
for {
select {
case ev := <-ch.C:
fn(ev)
case <-ch.Done():
return
}
}
}
func (n *Network) handleDriverTableEvent(ev events.Event) {
d, err := n.driver(false)
if err != nil {
log.G(context.TODO()).Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
return
}
var (
etype driverapi.EventType
tname string
key string
value []byte
)
switch event := ev.(type) {
case networkdb.CreateEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Create
case networkdb.DeleteEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Delete
case networkdb.UpdateEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Delete
}
d.EventNotify(etype, n.ID(), tname, key, value)
}
func (c *Controller) handleNodeTableEvent(ev events.Event) {
var (
value []byte
isAdd bool
nodeAddr networkdb.NodeAddr
)
switch event := ev.(type) {
case networkdb.CreateEvent:
value = event.Value
isAdd = true
case networkdb.DeleteEvent:
value = event.Value
case networkdb.UpdateEvent:
log.G(context.TODO()).Errorf("Unexpected update node table event = %#v", event)
}
err := json.Unmarshal(value, &nodeAddr)
if err != nil {
log.G(context.TODO()).Errorf("Error unmarshalling node table event %v", err)
return
}
c.processNodeDiscovery([]net.IP{nodeAddr.Addr}, isAdd)
}
func (c *Controller) handleEpTableEvent(ev events.Event) {
var (
nid string
eid string
value []byte
epRec EndpointRecord
)
switch event := ev.(type) {
case networkdb.CreateEvent:
nid = event.NetworkID
eid = event.Key
value = event.Value
case networkdb.DeleteEvent:
nid = event.NetworkID
eid = event.Key
value = event.Value
case networkdb.UpdateEvent:
nid = event.NetworkID
eid = event.Key
value = event.Value
default:
log.G(context.TODO()).Errorf("Unexpected update service table event = %#v", event)
return
}
err := proto.Unmarshal(value, &epRec)
if err != nil {
log.G(context.TODO()).Errorf("Failed to unmarshal service table value: %v", err)
return
}
containerName := epRec.Name
svcName := epRec.ServiceName
svcID := epRec.ServiceID
vip := net.ParseIP(epRec.VirtualIP)
ip := net.ParseIP(epRec.EndpointIP)
ingressPorts := epRec.IngressPorts
serviceAliases := epRec.Aliases
taskAliases := epRec.TaskAliases
if containerName == "" || ip == nil {
log.G(context.TODO()).Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}
switch ev.(type) {
case networkdb.CreateEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent ADD %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
log.G(context.TODO()).Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
log.G(context.TODO()).Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
case networkdb.DeleteEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, true); err != nil {
log.G(context.TODO()).Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
log.G(context.TODO()).Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
case networkdb.UpdateEvent:
log.G(context.TODO()).Debugf("handleEpTableEvent UPD %s R:%v", eid, epRec)
// We currently should only get these to inform us that an endpoint
// is disabled. Report if otherwise.
if svcID == "" || !epRec.ServiceDisabled {
log.G(context.TODO()).Errorf("Unexpected update table event for %s epRec:%v", eid, epRec)
return
}
// This is a remote task that is part of a service that is now disabled
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true, false); err != nil {
log.G(context.TODO()).Errorf("failed disabling service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
}
}