diff --git a/hack/vendor.sh b/hack/vendor.sh index 665aa079ce..3aadb4f0cf 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -65,7 +65,7 @@ clone git github.com/RackSec/srslog 259aed10dfa74ea2961eddd1d9847619f6e98837 clone git github.com/imdario/mergo 0.2.1 #get libnetwork packages -clone git github.com/docker/libnetwork 82fb373e3eaa4e9bbb5b5ac148b0a3a71f80fca6 +clone git github.com/docker/libnetwork e282a91b294ab413a172b3c4e37d15fa92d79ef5 clone git github.com/docker/go-events afb2b9f2c23f33ada1a22b03651775fdc65a5089 clone git github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/vendor/src/github.com/docker/libnetwork/agent.go b/vendor/src/github.com/docker/libnetwork/agent.go index 5a7e028cd6..fb0c342257 100644 --- a/vendor/src/github.com/docker/libnetwork/agent.go +++ b/vendor/src/github.com/docker/libnetwork/agent.go @@ -9,6 +9,7 @@ import ( "sort" "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/stringid" "github.com/docker/go-events" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/discoverapi" @@ -171,10 +172,12 @@ func (c *controller) agentSetup() error { advAddr := clusterProvider.GetAdvertiseAddress() remote := clusterProvider.GetRemoteAddress() remoteAddr, _, _ := net.SplitHostPort(remote) + listen := clusterProvider.GetListenAddress() + listenAddr, _, _ := net.SplitHostPort(listen) - logrus.Infof("Initializing Libnetwork Agent Local-addr=%s Adv-addr=%s Remote-addr =%s", bindAddr, advAddr, remoteAddr) + logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr) if advAddr != "" && c.agent == nil { - if err := c.agentInit(bindAddr, advAddr); err != nil { + if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil { logrus.Errorf("Error in agentInit : %v", err) } else { c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool { @@ -183,17 +186,23 @@ func (c *controller) agentSetup() error { } return false }) - - if c.agent != nil { - close(c.agentInitDone) - } } } + if remoteAddr != "" { if err := c.agentJoin(remoteAddr); err != nil { logrus.Errorf("Error in agentJoin : %v", err) + return nil } } + + c.Lock() + if c.agent != nil && c.agentInitDone != nil { + close(c.agentInitDone) + c.agentInitDone = nil + } + c.Unlock() + return nil } @@ -229,7 +238,7 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) { return keys[1].Key, keys[1].LamportTime, nil } -func (c *controller) agentInit(bindAddrOrInterface, advertiseAddr string) error { +func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr string) error { if !c.isAgent() { return nil } @@ -241,9 +250,13 @@ func (c *controller) agentInit(bindAddrOrInterface, advertiseAddr string) error keys, tags := c.getKeys(subsysGossip) hostname, _ := os.Hostname() + nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID()) + logrus.Info("Gossip cluster hostname ", nodeName) + nDB, err := networkdb.New(&networkdb.Config{ + BindAddr: listenAddr, AdvertiseAddr: advertiseAddr, - NodeName: hostname, + NodeName: nodeName, Keys: keys, }) @@ -328,7 +341,10 @@ func (c *controller) agentClose() { c.agent.epTblCancel() c.agent.networkDB.Close() + + c.Lock() c.agent = nil + c.Unlock() } func (n *network) isClusterEligible() bool { @@ -455,8 +471,12 @@ func (n *network) addDriverWatches() { c := n.getController() for _, tableName := range n.driverTables { - ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "") c.Lock() + if c.agent == nil { + c.Unlock() + return + } + ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "") c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel) c.Unlock() diff --git a/vendor/src/github.com/docker/libnetwork/cluster/provider.go b/vendor/src/github.com/docker/libnetwork/cluster/provider.go index f8dc6f09b8..00c50806e3 100644 --- a/vendor/src/github.com/docker/libnetwork/cluster/provider.go +++ b/vendor/src/github.com/docker/libnetwork/cluster/provider.go @@ -5,6 +5,7 @@ type Provider interface { IsManager() bool IsAgent() bool GetLocalAddress() string + GetListenAddress() string GetAdvertiseAddress() string GetRemoteAddress() string ListenClusterEvents() <-chan struct{} diff --git a/vendor/src/github.com/docker/libnetwork/controller.go b/vendor/src/github.com/docker/libnetwork/controller.go index 382e64cdfb..0aa166c5cc 100644 --- a/vendor/src/github.com/docker/libnetwork/controller.go +++ b/vendor/src/github.com/docker/libnetwork/controller.go @@ -52,6 +52,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/discovery" + "github.com/docker/docker/pkg/locker" "github.com/docker/docker/pkg/plugins" "github.com/docker/docker/pkg/stringid" "github.com/docker/libnetwork/cluster" @@ -149,6 +150,7 @@ type controller struct { ingressSandbox *sandbox sboxOnce sync.Once agent *agent + networkLocker *locker.Locker agentInitDone chan struct{} keys []*types.EncryptionKey clusterConfigAvailable bool @@ -169,6 +171,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { svcRecords: make(map[string]svcInfo), serviceBindings: make(map[serviceKey]*service), agentInitDone: make(chan struct{}), + networkLocker: locker.New(), } if err := c.initStores(); err != nil { @@ -307,8 +310,41 @@ func (c *controller) clusterAgentInit() { c.Lock() c.clusterConfigAvailable = false c.agentInitDone = make(chan struct{}) + c.keys = nil c.Unlock() + + // We are leaving the cluster. Make sure we + // close the gossip so that we stop all + // incoming gossip updates before cleaning up + // any remaining service bindings. But before + // deleting the networks since the networks + // should still be present when cleaning up + // service bindings c.agentClose() + c.cleanupServiceBindings("") + + c.Lock() + ingressSandbox := c.ingressSandbox + c.ingressSandbox = nil + c.Unlock() + + if ingressSandbox != nil { + if err := ingressSandbox.Delete(); err != nil { + log.Warnf("Could not delete ingress sandbox while leaving: %v", err) + } + } + + n, err := c.NetworkByName("ingress") + if err != nil { + log.Warnf("Could not find ingress network while leaving: %v", err) + } + + if n != nil { + if err := n.Delete(); err != nil { + log.Warnf("Could not delete ingress network while leaving: %v", err) + } + } + return } } @@ -317,7 +353,13 @@ func (c *controller) clusterAgentInit() { // AgentInitWait waits for agent initialization to be completed in the // controller. func (c *controller) AgentInitWait() { - <-c.agentInitDone + c.Lock() + agentInitDone := c.agentInitDone + c.Unlock() + + if agentInitDone != nil { + <-agentInitDone + } } func (c *controller) makeDriverConfig(ntype string) map[string]interface{} { @@ -575,6 +617,15 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver, // NewNetwork creates a new network of the specified network type. The options // are network specific and modeled in a generic way. func (c *controller) NewNetwork(networkType, name string, id string, options ...NetworkOption) (Network, error) { + if id != "" { + c.networkLocker.Lock(id) + defer c.networkLocker.Unlock(id) + + if _, err := c.NetworkByID(id); err == nil { + return nil, NetworkNameError(id) + } + } + if !config.IsValidName(name) { return nil, ErrInvalidName(name) } @@ -660,6 +711,9 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ... } joinCluster(network) + if !c.isDistributedControl() { + arrangeIngressFilterRule() + } return network, nil } @@ -699,9 +753,11 @@ func (c *controller) reservePools() { c.Gateway = n.ipamV4Info[i].Gateway.IP.String() } } - for i, c := range n.ipamV6Config { - if c.Gateway == "" && n.ipamV6Info[i].Gateway != nil { - c.Gateway = n.ipamV6Info[i].Gateway.IP.String() + if n.enableIPv6 { + for i, c := range n.ipamV6Config { + if c.Gateway == "" && n.ipamV6Info[i].Gateway != nil { + c.Gateway = n.ipamV6Info[i].Gateway.IP.String() + } } } // Reserve pools @@ -866,6 +922,7 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (s if sb.ingress { c.ingressSandbox = sb + sb.id = "ingress_sbox" } c.Unlock() defer func() { diff --git a/vendor/src/github.com/docker/libnetwork/drivers/bridge/bridge.go b/vendor/src/github.com/docker/libnetwork/drivers/bridge/bridge.go index 8410eb037a..e276bc7b96 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/bridge/bridge.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/bridge/bridge.go @@ -1243,8 +1243,17 @@ func (d *driver) ProgramExternalConnectivity(nid, eid string, options map[string return err } + defer func() { + if err != nil { + if e := network.releasePorts(endpoint); e != nil { + logrus.Errorf("Failed to release ports allocated for the bridge endpoint %s on failure %v because of %v", + eid, err, e) + } + endpoint.portMapping = nil + } + }() + if err = d.storeUpdate(endpoint); err != nil { - endpoint.portMapping = nil return fmt.Errorf("failed to update bridge endpoint %s to store: %v", endpoint.id[0:7], err) } diff --git a/vendor/src/github.com/docker/libnetwork/drivers/overlay/encryption.go b/vendor/src/github.com/docker/libnetwork/drivers/overlay/encryption.go index 5b4800716b..fefb5da65c 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/overlay/encryption.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/overlay/encryption.go @@ -392,10 +392,11 @@ func (d *driver) secMapWalk(f func(string, []*spi) ([]*spi, bool)) error { } func (d *driver) setKeys(keys []*key) error { - if d.keys != nil { - return types.ForbiddenErrorf("initial keys are already present") - } + // Accept the encryption keys and clear any stale encryption map + d.Lock() d.keys = keys + d.secMap = &encrMap{nodes: map[string][]*spi{}} + d.Unlock() log.Debugf("Initial encryption keys: %v", d.keys) return nil } @@ -433,10 +434,8 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error { if (newKey != nil && newIdx == -1) || (primary != nil && priIdx == -1) || (pruneKey != nil && delIdx == -1) { - err := types.BadRequestErrorf("cannot find proper key indices while processing key update:"+ + return types.BadRequestErrorf("cannot find proper key indices while processing key update:"+ "(newIdx,priIdx,delIdx):(%d, %d, %d)", newIdx, priIdx, delIdx) - log.Warn(err) - return err } d.secMapWalk(func(rIPs string, spis []*spi) ([]*spi, bool) { diff --git a/vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go b/vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go index 2bf76b33a5..40cd7d9f28 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/overlay/filter.go @@ -12,6 +12,13 @@ const globalChain = "DOCKER-OVERLAY" var filterOnce sync.Once +var filterChan = make(chan struct{}, 1) + +func filterWait() func() { + filterChan <- struct{}{} + return func() { <-filterChan } +} + func chainExists(cname string) bool { if _, err := iptables.Raw("-L", cname); err != nil { return false @@ -69,10 +76,14 @@ func setNetworkChain(cname string, remove bool) error { } func addNetworkChain(cname string) error { + defer filterWait()() + return setNetworkChain(cname, false) } func removeNetworkChain(cname string) error { + defer filterWait()() + return setNetworkChain(cname, true) } @@ -119,9 +130,13 @@ func setFilters(cname, brName string, remove bool) error { } func addFilters(cname, brName string) error { + defer filterWait()() + return setFilters(cname, brName, false) } func removeFilters(cname, brName string) error { + defer filterWait()() + return setFilters(cname, brName, true) } diff --git a/vendor/src/github.com/docker/libnetwork/drivers/overlay/ov_network.go b/vendor/src/github.com/docker/libnetwork/drivers/overlay/ov_network.go index 4b4c0c417f..b5af0f4b04 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/overlay/ov_network.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/overlay/ov_network.go @@ -500,9 +500,13 @@ func (n *network) initSubnetSandbox(s *subnet, restore bool) error { vxlanName := n.generateVxlanName(s) if restore { - n.restoreSubnetSandbox(s, brName, vxlanName) + if err := n.restoreSubnetSandbox(s, brName, vxlanName); err != nil { + return err + } } else { - n.setupSubnetSandbox(s, brName, vxlanName) + if err := n.setupSubnetSandbox(s, brName, vxlanName); err != nil { + return err + } } n.Lock() diff --git a/vendor/src/github.com/docker/libnetwork/drivers/overlay/overlay.go b/vendor/src/github.com/docker/libnetwork/drivers/overlay/overlay.go index 5b52a15c63..492f7f4254 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/overlay/overlay.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/overlay/overlay.go @@ -90,7 +90,20 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { } } - d.restoreEndpoints() + if err := d.restoreEndpoints(); err != nil { + logrus.Warnf("Failure during overlay endpoints restore: %v", err) + } + + // If an error happened when the network join the sandbox during the endpoints restore + // we should reset it now along with the once variable, so that subsequent endpoint joins + // outside of the restore path can potentially fix the network join and succeed. + for nid, n := range d.networks { + if n.initErr != nil { + logrus.Infof("resetting init error and once variable for network %s after unsuccesful endpoint restore: %v", nid, n.initErr) + n.initErr = nil + n.once = &sync.Once{} + } + } return dc.RegisterDriver(networkType, d, c) } @@ -323,7 +336,9 @@ func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) } keys = append(keys, k) } - d.setKeys(keys) + if err := d.setKeys(keys); err != nil { + logrus.Warn(err) + } case discoverapi.EncryptionKeysUpdate: var newKey, delKey, priKey *key encrData, ok := data.(discoverapi.DriverEncryptionUpdate) @@ -348,7 +363,9 @@ func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{}) tag: uint32(encrData.PruneTag), } } - d.updateKeys(newKey, priKey, delKey) + if err := d.updateKeys(newKey, priKey, delKey); err != nil { + logrus.Warn(err) + } default: } return nil diff --git a/vendor/src/github.com/docker/libnetwork/drivers/overlay/peerdb.go b/vendor/src/github.com/docker/libnetwork/drivers/overlay/peerdb.go index c9808f692a..517909a816 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/overlay/peerdb.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/overlay/peerdb.go @@ -168,14 +168,14 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask } func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask, - peerMac net.HardwareAddr, vtep net.IP) bool { + peerMac net.HardwareAddr, vtep net.IP) peerEntry { peerDbWg.Wait() d.peerDb.Lock() pMap, ok := d.peerDb.mp[nid] if !ok { d.peerDb.Unlock() - return false + return peerEntry{} } d.peerDb.Unlock() @@ -186,19 +186,20 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM pMap.Lock() - if pEntry, ok := pMap.mp[pKey.String()]; ok { + pEntry, ok := pMap.mp[pKey.String()] + if ok { // Mismatched endpoint ID(possibly outdated). Do not // delete peerdb if pEntry.eid != eid { pMap.Unlock() - return false + return pEntry } } delete(pMap.mp, pKey.String()) pMap.Unlock() - return true + return pEntry } func (d *driver) peerDbUpdateSandbox(nid string) { @@ -312,10 +313,9 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMas return err } + var pEntry peerEntry if updateDb { - if !d.peerDbDelete(nid, eid, peerIP, peerIPMask, peerMac, vtep) { - return nil - } + pEntry = d.peerDbDelete(nid, eid, peerIP, peerIPMask, peerMac, vtep) } n := d.network(nid) @@ -328,14 +328,24 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMas return nil } - // Delete fdb entry to the bridge for the peer mac - if err := sbox.DeleteNeighbor(vtep, peerMac); err != nil { - return fmt.Errorf("could not delete fdb entry into the sandbox: %v", err) + // Delete fdb entry to the bridge for the peer mac only if the + // entry existed in local peerdb. If it is a stale delete + // request, still call DeleteNeighbor but only to cleanup any + // leftover sandbox neighbor cache and not actually delete the + // kernel state. + if (eid == pEntry.eid && vtep.Equal(pEntry.vtep)) || + (eid != pEntry.eid && !vtep.Equal(pEntry.vtep)) { + if err := sbox.DeleteNeighbor(vtep, peerMac, + eid == pEntry.eid && vtep.Equal(pEntry.vtep)); err != nil { + return fmt.Errorf("could not delete fdb entry into the sandbox: %v", err) + } } // Delete neighbor entry for the peer IP - if err := sbox.DeleteNeighbor(peerIP, peerMac); err != nil { - return fmt.Errorf("could not delete neighbor entry into the sandbox: %v", err) + if eid == pEntry.eid { + if err := sbox.DeleteNeighbor(peerIP, peerMac, true); err != nil { + return fmt.Errorf("could not delete neighbor entry into the sandbox: %v", err) + } } if err := d.checkEncryption(nid, vtep, 0, false, false); err != nil { diff --git a/vendor/src/github.com/docker/libnetwork/endpoint.go b/vendor/src/github.com/docker/libnetwork/endpoint.go index d211dafc53..44893573ff 100644 --- a/vendor/src/github.com/docker/libnetwork/endpoint.go +++ b/vendor/src/github.com/docker/libnetwork/endpoint.go @@ -757,17 +757,6 @@ func (ep *endpoint) Delete(force bool) error { } }() - if err = n.getEpCnt().DecEndpointCnt(); err != nil && !force { - return err - } - defer func() { - if err != nil && !force { - if e := n.getEpCnt().IncEndpointCnt(); e != nil { - log.Warnf("failed to update network %s : %v", n.name, e) - } - } - }() - // unwatch for service records n.getController().unWatchSvcRecord(ep) @@ -777,6 +766,10 @@ func (ep *endpoint) Delete(force bool) error { ep.releaseAddress() + if err := n.getEpCnt().DecEndpointCnt(); err != nil { + log.Warnf("failed to decrement endpoint coint for ep %s: %v", ep.ID(), err) + } + return nil } diff --git a/vendor/src/github.com/docker/libnetwork/network.go b/vendor/src/github.com/docker/libnetwork/network.go index ffdc232486..2cd857a462 100644 --- a/vendor/src/github.com/docker/libnetwork/network.go +++ b/vendor/src/github.com/docker/libnetwork/network.go @@ -756,6 +756,19 @@ func (n *network) delete(force bool) error { log.Warnf("Failed to update store after ipam release for network %s (%s): %v", n.Name(), n.ID(), err) } + // We are about to delete the network. Leave the gossip + // cluster for the network to stop all incoming network + // specific gossip updates before cleaning up all the service + // bindings for the network. But cleanup service binding + // before deleting the network from the store since service + // bindings cleanup requires the network in the store. + n.cancelDriverWatches() + if err = n.leaveCluster(); err != nil { + log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err) + } + + c.cleanupServiceBindings(n.ID()) + // deleteFromStore performs an atomic delete operation and the // network.epCnt will help prevent any possible // race between endpoint join and network delete @@ -770,12 +783,6 @@ func (n *network) delete(force bool) error { return fmt.Errorf("error deleting network from store: %v", err) } - n.cancelDriverWatches() - - if err = n.leaveCluster(); err != nil { - log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err) - } - return nil } @@ -1118,7 +1125,7 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record { continue } if len(ip) == 0 { - log.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", h, n.Name(), n.ID()) + log.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", h, n.name, n.id) continue } recs = append(recs, etchosts.Record{ diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/broadcast.go b/vendor/src/github.com/docker/libnetwork/networkdb/broadcast.go index 2e07729569..faaf642948 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/broadcast.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/broadcast.go @@ -1,10 +1,15 @@ package networkdb import ( + "fmt" + "time" + "github.com/hashicorp/memberlist" "github.com/hashicorp/serf/serf" ) +const broadcastTimeout = 5 * time.Second + type networkEventMessage struct { id string node string @@ -44,6 +49,53 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim return nil } +type nodeEventMessage struct { + msg []byte + notify chan<- struct{} +} + +func (m *nodeEventMessage) Invalidates(other memberlist.Broadcast) bool { + return false +} + +func (m *nodeEventMessage) Message() []byte { + return m.msg +} + +func (m *nodeEventMessage) Finished() { + if m.notify != nil { + close(m.notify) + } +} + +func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error { + nEvent := NodeEvent{ + Type: event, + LTime: nDB.networkClock.Increment(), + NodeName: nDB.config.NodeName, + } + + raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent) + if err != nil { + return err + } + + notifyCh := make(chan struct{}) + nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{ + msg: raw, + notify: notifyCh, + }) + + // Wait for the broadcast + select { + case <-notifyCh: + case <-time.After(broadcastTimeout): + return fmt.Errorf("timed out broadcasting node event") + } + + return nil +} + type tableEventMessage struct { id string tname string diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go b/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go index 50661e1743..3b624c9a27 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" rnd "math/rand" + "net" "strings" "time" @@ -14,7 +15,11 @@ import ( "github.com/hashicorp/memberlist" ) -const reapInterval = 30 * time.Second +const ( + reapInterval = 60 * time.Second + reapPeriod = 5 * time.Second + retryInterval = 1 * time.Second +) type logWriter struct{} @@ -81,6 +86,7 @@ func (nDB *NetworkDB) RemoveKey(key []byte) { func (nDB *NetworkDB) clusterInit() error { config := memberlist.DefaultLANConfig() config.Name = nDB.config.NodeName + config.BindAddr = nDB.config.BindAddr config.AdvertiseAddr = nDB.config.AdvertiseAddr if nDB.config.BindPort != 0 { @@ -111,6 +117,13 @@ func (nDB *NetworkDB) clusterInit() error { RetransmitMult: config.RetransmitMult, } + nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + return len(nDB.nodes) + }, + RetransmitMult: config.RetransmitMult, + } + mlist, err := memberlist.Create(config) if err != nil { return fmt.Errorf("failed to create memberlist: %v", err) @@ -124,9 +137,10 @@ func (nDB *NetworkDB) clusterInit() error { interval time.Duration fn func() }{ - {reapInterval, nDB.reapState}, + {reapPeriod, nDB.reapState}, {config.GossipInterval, nDB.gossip}, {config.PushPullInterval, nDB.bulkSyncTables}, + {retryInterval, nDB.reconnectNode}, } { t := time.NewTicker(trigger.interval) go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn) @@ -136,19 +150,49 @@ func (nDB *NetworkDB) clusterInit() error { return nil } +func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) { + t := time.NewTicker(retryInterval) + defer t.Stop() + + for { + select { + case <-t.C: + if _, err := nDB.memberlist.Join(members); err != nil { + logrus.Errorf("Failed to join memberlist %s on retry: %v", members, err) + continue + } + return + case <-stop: + return + } + } + +} + func (nDB *NetworkDB) clusterJoin(members []string) error { mlist := nDB.memberlist if _, err := mlist.Join(members); err != nil { + // Incase of failure, keep retrying join until it succeeds or the cluster is shutdown. + go nDB.retryJoin(members, nDB.stopCh) + return fmt.Errorf("could not join node to memberlist: %v", err) } + if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil { + return fmt.Errorf("failed to send node join: %v", err) + } + return nil } func (nDB *NetworkDB) clusterLeave() error { mlist := nDB.memberlist + if err := nDB.sendNodeEvent(NodeEventTypeLeave); err != nil { + logrus.Errorf("failed to send node leave: %v", err) + } + if err := mlist.Leave(time.Second); err != nil { return err } @@ -180,6 +224,42 @@ func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, sto } } +func (nDB *NetworkDB) reconnectNode() { + nDB.RLock() + if len(nDB.failedNodes) == 0 { + nDB.RUnlock() + return + } + + nodes := make([]*node, 0, len(nDB.failedNodes)) + for _, n := range nDB.failedNodes { + nodes = append(nodes, n) + } + nDB.RUnlock() + + node := nodes[randomOffset(len(nodes))] + addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)} + + if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil { + return + } + + if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil { + logrus.Errorf("failed to send node join during reconnect: %v", err) + return + } + + // Update all the local table state to a new time to + // force update on the node we are trying to rejoin, just in + // case that node has these in deleting state still. This is + // facilitate fast convergence after recovering from a gossip + // failure. + nDB.updateLocalTableTime() + + logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name) + nDB.bulkSync([]string{node.Name}, true) +} + func (nDB *NetworkDB) reapState() { nDB.reapNetworks() nDB.reapTableEntries() @@ -200,10 +280,7 @@ func (nDB *NetworkDB) reapNetworks() { } func (nDB *NetworkDB) reapTableEntries() { - var ( - paths []string - entries []*entry - ) + var paths []string now := time.Now() @@ -219,14 +296,12 @@ func (nDB *NetworkDB) reapTableEntries() { } paths = append(paths, path) - entries = append(entries, entry) return false }) nDB.RUnlock() nDB.Lock() - for i, path := range paths { - entry := entries[i] + for _, path := range paths { params := strings.Split(path[1:], "/") tname := params[0] nid := params[1] @@ -239,8 +314,6 @@ func (nDB *NetworkDB) reapTableEntries() { if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok { logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) } - - nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value)) } nDB.Unlock() } @@ -295,7 +368,7 @@ func (nDB *NetworkDB) gossip() { } // Send the compound message - if err := nDB.memberlist.SendToUDP(mnode, compound); err != nil { + if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil { logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err) } } @@ -330,7 +403,7 @@ func (nDB *NetworkDB) bulkSyncTables() { continue } - completed, err := nDB.bulkSync(nid, nodes, false) + completed, err := nDB.bulkSync(nodes, false) if err != nil { logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err) continue @@ -357,7 +430,7 @@ func (nDB *NetworkDB) bulkSyncTables() { } } -func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string, error) { +func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) { if !all { // If not all, then just pick one. nodes = nDB.mRandomNodes(1, nodes) @@ -395,7 +468,12 @@ func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string, func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error { var msgs [][]byte - logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node) + var unsolMsg string + if unsolicited { + unsolMsg = "unsolicited" + } + + logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node) nDB.RLock() mnode := nDB.nodes[node] @@ -411,15 +489,14 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b return false } - // Do not bulk sync state which is in the - // process of getting deleted. + eType := TableEventTypeCreate if entry.deleting { - return false + eType = TableEventTypeDelete } params := strings.Split(path[1:], "/") tEvent := TableEvent{ - Type: TableEventTypeCreate, + Type: eType, LTime: entry.ltime, NodeName: entry.node, NetworkID: nid, @@ -461,7 +538,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b nDB.bulkSyncAckTbl[node] = ch nDB.Unlock() - err = nDB.memberlist.SendToTCP(mnode, buf) + err = nDB.memberlist.SendToTCP(&mnode.Node, buf) if err != nil { nDB.Lock() delete(nDB.bulkSyncAckTbl, node) diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/delegate.go b/vendor/src/github.com/docker/libnetwork/networkdb/delegate.go index 98007516d4..eb8d18557d 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/delegate.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/delegate.go @@ -3,6 +3,7 @@ package networkdb import ( "fmt" "net" + "strings" "time" "github.com/Sirupsen/logrus" @@ -17,6 +18,76 @@ func (d *delegate) NodeMeta(limit int) []byte { return []byte{} } +func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node { + nDB.Lock() + defer nDB.Unlock() + + for _, nodes := range []map[string]*node{ + nDB.failedNodes, + nDB.leftNodes, + nDB.nodes, + } { + if n, ok := nodes[nEvent.NodeName]; ok { + if n.ltime >= nEvent.LTime { + return nil + } + + delete(nodes, n.Name) + return n + } + } + + return nil +} + +func (nDB *NetworkDB) purgeSameNode(n *node) { + nDB.Lock() + defer nDB.Unlock() + + prefix := strings.Split(n.Name, "-")[0] + for _, nodes := range []map[string]*node{ + nDB.failedNodes, + nDB.leftNodes, + nDB.nodes, + } { + var nodeNames []string + for name, node := range nodes { + if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) { + nodeNames = append(nodeNames, name) + } + } + + for _, name := range nodeNames { + delete(nodes, name) + } + } +} + +func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { + n := nDB.checkAndGetNode(nEvent) + if n == nil { + return false + } + + nDB.purgeSameNode(n) + n.ltime = nEvent.LTime + + switch nEvent.Type { + case NodeEventTypeJoin: + nDB.Lock() + nDB.nodes[n.Name] = n + nDB.Unlock() + return true + case NodeEventTypeLeave: + nDB.Lock() + nDB.leftNodes[n.Name] = n + nDB.Unlock() + return true + } + + return false +} + func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { // Update our local clock if the received messages has newer // time. @@ -53,6 +124,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { n.leaveTime = time.Now() } + nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName) return true } @@ -66,7 +138,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { ltime: nEvent.LTime, } - nDB.networkNodes[nEvent.NetworkID] = append(nDB.networkNodes[nEvent.NetworkID], nEvent.NodeName) + nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName) return true } @@ -84,28 +156,34 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { return true } - if entry, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key); err == nil { + e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key) + if err != nil && tEvent.Type == TableEventTypeDelete { + // If it is a delete event and we don't have the entry here nothing to do. + return false + } + + if err == nil { // We have the latest state. Ignore the event // since it is stale. - if entry.ltime >= tEvent.LTime { + if e.ltime >= tEvent.LTime { return false } } - entry := &entry{ + e = &entry{ ltime: tEvent.LTime, node: tEvent.NodeName, value: tEvent.Value, deleting: tEvent.Type == TableEventTypeDelete, } - if entry.deleting { - entry.deleteTime = time.Now() + if e.deleting { + e.deleteTime = time.Now() } nDB.Lock() - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), entry) + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e) nDB.Unlock() var op opType @@ -181,6 +259,27 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { } } +func (nDB *NetworkDB) handleNodeMessage(buf []byte) { + var nEvent NodeEvent + if err := proto.Unmarshal(buf, &nEvent); err != nil { + logrus.Errorf("Error decoding node event message: %v", err) + return + } + + if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast { + var err error + buf, err = encodeRawMessage(MessageTypeNodeEvent, buf) + if err != nil { + logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err) + return + } + + nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{ + msg: buf, + }) + } +} + func (nDB *NetworkDB) handleNetworkMessage(buf []byte) { var nEvent NetworkEvent if err := proto.Unmarshal(buf, &nEvent); err != nil { @@ -249,6 +348,8 @@ func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) { } switch mType { + case MessageTypeNodeEvent: + nDB.handleNodeMessage(data) case MessageTypeNetworkEvent: nDB.handleNetworkMessage(data) case MessageTypeTableEvent: @@ -271,15 +372,27 @@ func (d *delegate) NotifyMsg(buf []byte) { } func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { - return d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit) + msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit) + msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...) + return msgs } func (d *delegate) LocalState(join bool) []byte { + if join { + // Update all the local node/network state to a new time to + // force update on the node we are trying to rejoin, just in + // case that node has these in leaving state still. This is + // facilitate fast convergence after recovering from a gossip + // failure. + d.nDB.updateLocalNetworkTime() + } + d.nDB.RLock() defer d.nDB.RUnlock() pp := NetworkPushPull{ - LTime: d.nDB.networkClock.Time(), + LTime: d.nDB.networkClock.Time(), + NodeName: d.nDB.config.NodeName, } for name, nn := range d.nDB.networks { @@ -325,9 +438,12 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) { return } - if pp.LTime > 0 { - d.nDB.networkClock.Witness(pp.LTime) + nodeEvent := &NodeEvent{ + LTime: pp.LTime, + NodeName: pp.NodeName, + Type: NodeEventTypeJoin, } + d.nDB.handleNodeEvent(nodeEvent) for _, n := range pp.Networks { nEvent := &NetworkEvent{ diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/event_delegate.go b/vendor/src/github.com/docker/libnetwork/networkdb/event_delegate.go index 7dfea84f6e..019cafbd06 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/event_delegate.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/event_delegate.go @@ -6,17 +6,31 @@ type eventDelegate struct { nDB *NetworkDB } -func (e *eventDelegate) NotifyJoin(n *memberlist.Node) { +func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) { e.nDB.Lock() - e.nDB.nodes[n.Name] = n + // In case the node is rejoining after a failure or leave, + // wait until an explicit join message arrives before adding + // it to the nodes just to make sure this is not a stale + // join. If you don't know about this node add it immediately. + _, fOk := e.nDB.failedNodes[mn.Name] + _, lOk := e.nDB.leftNodes[mn.Name] + if fOk || lOk { + e.nDB.Unlock() + return + } + + e.nDB.nodes[mn.Name] = &node{Node: *mn} e.nDB.Unlock() } -func (e *eventDelegate) NotifyLeave(n *memberlist.Node) { - e.nDB.deleteNodeTableEntries(n.Name) - e.nDB.deleteNetworkNodeEntries(n.Name) +func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) { + e.nDB.deleteNodeTableEntries(mn.Name) + e.nDB.deleteNetworkEntriesForNode(mn.Name) e.nDB.Lock() - delete(e.nDB.nodes, n.Name) + if n, ok := e.nDB.nodes[mn.Name]; ok { + delete(e.nDB.nodes, mn.Name) + e.nDB.failedNodes[mn.Name] = n + } e.nDB.Unlock() } diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go b/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go index f21e572e69..a8c942c9cc 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.go @@ -41,7 +41,13 @@ type NetworkDB struct { // List of all peer nodes in the cluster not-limited to any // network. - nodes map[string]*memberlist.Node + nodes map[string]*node + + // List of all peer nodes which have failed + failedNodes map[string]*node + + // List of all peer nodes which have left + leftNodes map[string]*node // A multi-dimensional map of network/node attachmemts. The // first key is a node name and the second key is a network ID @@ -66,6 +72,9 @@ type NetworkDB struct { // Broadcast queue for network event gossip. networkBroadcasts *memberlist.TransmitLimitedQueue + // Broadcast queue for node event gossip. + nodeBroadcasts *memberlist.TransmitLimitedQueue + // A central stop channel to stop all go routines running on // behalf of the NetworkDB instance. stopCh chan struct{} @@ -82,6 +91,11 @@ type NetworkDB struct { keyring *memberlist.Keyring } +type node struct { + memberlist.Node + ltime serf.LamportTime +} + // network describes the node/network attachment. type network struct { // Network ID @@ -107,6 +121,10 @@ type Config struct { // NodeName is the cluster wide unique name for this node. NodeName string + // BindAddr is the IP on which networkdb listens. It can be + // 0.0.0.0 to listen on all addresses on the host. + BindAddr string + // AdvertiseAddr is the node's IP address that we advertise for // cluster communication. AdvertiseAddr string @@ -146,7 +164,9 @@ func New(c *Config) (*NetworkDB, error) { config: c, indexes: make(map[int]*radix.Tree), networks: make(map[string]map[string]*network), - nodes: make(map[string]*memberlist.Node), + nodes: make(map[string]*node), + failedNodes: make(map[string]*node), + leftNodes: make(map[string]*node), networkNodes: make(map[string][]string), bulkSyncAckTbl: make(map[string]chan struct{}), broadcaster: events.NewBroadcaster(), @@ -286,7 +306,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { return nil } -func (nDB *NetworkDB) deleteNetworkNodeEntries(deletedNode string) { +func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) { nDB.Lock() for nid, nodes := range nDB.networkNodes { updatedNodes := make([]string, 0, len(nodes)) @@ -300,6 +320,8 @@ func (nDB *NetworkDB) deleteNetworkNodeEntries(deletedNode string) { nDB.networkNodes[nid] = updatedNodes } + + delete(nDB.networks, deletedNode) nDB.Unlock() } @@ -326,6 +348,8 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) { nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + + nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value)) return false }) nDB.Unlock() @@ -387,7 +411,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { } logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid) - if _, err := nDB.bulkSync(nid, networkNodes, true); err != nil { + if _, err := nDB.bulkSync(networkNodes, true); err != nil { logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) } @@ -453,6 +477,20 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { return nil } +// addNetworkNode adds the node to the list of nodes which participate +// in the passed network only if it is not already present. Caller +// should hold the NetworkDB lock while calling this +func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) { + nodes := nDB.networkNodes[nid] + for _, node := range nodes { + if node == nodeName { + return + } + } + + nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName) +} + // Deletes the node from the list of nodes which participate in the // passed network. Caller should hold the NetworkDB lock while calling // this @@ -476,10 +514,46 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string { var networks []string for nid := range nDB.networks[nDB.config.NodeName] { - if _, ok := nDB.networks[nodeName][nid]; ok { - networks = append(networks, nid) + if n, ok := nDB.networks[nodeName][nid]; ok { + if !n.leaving { + networks = append(networks, nid) + } } } return networks } + +func (nDB *NetworkDB) updateLocalNetworkTime() { + nDB.Lock() + defer nDB.Unlock() + + ltime := nDB.networkClock.Increment() + for _, n := range nDB.networks[nDB.config.NodeName] { + n.ltime = ltime + } +} + +func (nDB *NetworkDB) updateLocalTableTime() { + nDB.Lock() + defer nDB.Unlock() + + ltime := nDB.tableClock.Increment() + nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { + entry := v.(*entry) + if entry.node != nDB.config.NodeName { + return false + } + + params := strings.Split(path[1:], "/") + tname := params[0] + nid := params[1] + key := params[2] + entry.ltime = ltime + + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + + return false + }) +} diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.pb.go b/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.pb.go index 86177cf315..dfbc7131fb 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.pb.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.pb.go @@ -10,6 +10,7 @@ It has these top-level messages: GossipMessage + NodeEvent NetworkEvent NetworkEntry NetworkPushPull @@ -67,6 +68,9 @@ const ( // which is a pack of many message of above types, packed into // a single compound message. MessageTypeCompound MessageType = 5 + // NodeEvent message type is used to communicare node + // join/leave events in the cluster + MessageTypeNodeEvent MessageType = 6 ) var MessageType_name = map[int32]string{ @@ -76,6 +80,7 @@ var MessageType_name = map[int32]string{ 3: "PUSH_PULL", 4: "BULK_SYNC", 5: "COMPOUND", + 6: "NODE_EVENT", } var MessageType_value = map[string]int32{ "INVALID": 0, @@ -84,6 +89,7 @@ var MessageType_value = map[string]int32{ "PUSH_PULL": 3, "BULK_SYNC": 4, "COMPOUND": 5, + "NODE_EVENT": 6, } func (x MessageType) String() string { @@ -91,6 +97,32 @@ func (x MessageType) String() string { } func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{0} } +type NodeEvent_Type int32 + +const ( + NodeEventTypeInvalid NodeEvent_Type = 0 + // Join event is generated when this node joins the cluster. + NodeEventTypeJoin NodeEvent_Type = 1 + // Leave event is generated when this node leaves the cluster. + NodeEventTypeLeave NodeEvent_Type = 2 +) + +var NodeEvent_Type_name = map[int32]string{ + 0: "INVALID", + 1: "JOIN", + 2: "LEAVE", +} +var NodeEvent_Type_value = map[string]int32{ + "INVALID": 0, + "JOIN": 1, + "LEAVE": 2, +} + +func (x NodeEvent_Type) String() string { + return proto.EnumName(NodeEvent_Type_name, int32(x)) +} +func (NodeEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1, 0} } + type NetworkEvent_Type int32 const ( @@ -115,7 +147,7 @@ var NetworkEvent_Type_value = map[string]int32{ func (x NetworkEvent_Type) String() string { return proto.EnumName(NetworkEvent_Type_name, int32(x)) } -func (NetworkEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1, 0} } +func (NetworkEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2, 0} } type TableEvent_Type int32 @@ -148,7 +180,7 @@ var TableEvent_Type_value = map[string]int32{ func (x TableEvent_Type) String() string { return proto.EnumName(TableEvent_Type_name, int32(x)) } -func (TableEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4, 0} } +func (TableEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5, 0} } // GossipMessage is a basic message header used by all messages types. type GossipMessage struct { @@ -160,6 +192,21 @@ func (m *GossipMessage) Reset() { *m = GossipMessage{} } func (*GossipMessage) ProtoMessage() {} func (*GossipMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{0} } +// NodeEvent message payload definition. +type NodeEvent struct { + Type NodeEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.NodeEvent_Type" json:"type,omitempty"` + // Lamport time using a network lamport clock indicating the + // time this event was generated on the node where it was + // generated. + LTime github_com_hashicorp_serf_serf.LamportTime `protobuf:"varint,2,opt,name=l_time,json=lTime,proto3,customtype=github.com/hashicorp/serf/serf.LamportTime" json:"l_time"` + // Source node name. + NodeName string `protobuf:"bytes,3,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` +} + +func (m *NodeEvent) Reset() { *m = NodeEvent{} } +func (*NodeEvent) ProtoMessage() {} +func (*NodeEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1} } + // NetworkEvent message payload definition. type NetworkEvent struct { Type NetworkEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.NetworkEvent_Type" json:"type,omitempty"` @@ -175,7 +222,7 @@ type NetworkEvent struct { func (m *NetworkEvent) Reset() { *m = NetworkEvent{} } func (*NetworkEvent) ProtoMessage() {} -func (*NetworkEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1} } +func (*NetworkEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2} } // NetworkEntry for push pull of networks. type NetworkEntry struct { @@ -192,18 +239,20 @@ type NetworkEntry struct { func (m *NetworkEntry) Reset() { *m = NetworkEntry{} } func (*NetworkEntry) ProtoMessage() {} -func (*NetworkEntry) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2} } +func (*NetworkEntry) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{3} } // NetworkPushpull message payload definition. type NetworkPushPull struct { // Lamport time when this push pull was initiated. LTime github_com_hashicorp_serf_serf.LamportTime `protobuf:"varint,1,opt,name=l_time,json=lTime,proto3,customtype=github.com/hashicorp/serf/serf.LamportTime" json:"l_time"` Networks []*NetworkEntry `protobuf:"bytes,2,rep,name=networks" json:"networks,omitempty"` + // Name of the node sending this push pull payload. + NodeName string `protobuf:"bytes,3,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"` } func (m *NetworkPushPull) Reset() { *m = NetworkPushPull{} } func (*NetworkPushPull) ProtoMessage() {} -func (*NetworkPushPull) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{3} } +func (*NetworkPushPull) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4} } func (m *NetworkPushPull) GetNetworks() []*NetworkEntry { if m != nil { @@ -231,7 +280,7 @@ type TableEvent struct { func (m *TableEvent) Reset() { *m = TableEvent{} } func (*TableEvent) ProtoMessage() {} -func (*TableEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4} } +func (*TableEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5} } // BulkSync message payload definition. type BulkSyncMessage struct { @@ -251,7 +300,7 @@ type BulkSyncMessage struct { func (m *BulkSyncMessage) Reset() { *m = BulkSyncMessage{} } func (*BulkSyncMessage) ProtoMessage() {} -func (*BulkSyncMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5} } +func (*BulkSyncMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{6} } // Compound message payload definition. type CompoundMessage struct { @@ -261,7 +310,7 @@ type CompoundMessage struct { func (m *CompoundMessage) Reset() { *m = CompoundMessage{} } func (*CompoundMessage) ProtoMessage() {} -func (*CompoundMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{6} } +func (*CompoundMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{7} } func (m *CompoundMessage) GetMessages() []*CompoundMessage_SimpleMessage { if m != nil { @@ -279,11 +328,12 @@ type CompoundMessage_SimpleMessage struct { func (m *CompoundMessage_SimpleMessage) Reset() { *m = CompoundMessage_SimpleMessage{} } func (*CompoundMessage_SimpleMessage) ProtoMessage() {} func (*CompoundMessage_SimpleMessage) Descriptor() ([]byte, []int) { - return fileDescriptorNetworkdb, []int{6, 0} + return fileDescriptorNetworkdb, []int{7, 0} } func init() { proto.RegisterType((*GossipMessage)(nil), "networkdb.GossipMessage") + proto.RegisterType((*NodeEvent)(nil), "networkdb.NodeEvent") proto.RegisterType((*NetworkEvent)(nil), "networkdb.NetworkEvent") proto.RegisterType((*NetworkEntry)(nil), "networkdb.NetworkEntry") proto.RegisterType((*NetworkPushPull)(nil), "networkdb.NetworkPushPull") @@ -292,6 +342,7 @@ func init() { proto.RegisterType((*CompoundMessage)(nil), "networkdb.CompoundMessage") proto.RegisterType((*CompoundMessage_SimpleMessage)(nil), "networkdb.CompoundMessage.SimpleMessage") proto.RegisterEnum("networkdb.MessageType", MessageType_name, MessageType_value) + proto.RegisterEnum("networkdb.NodeEvent_Type", NodeEvent_Type_name, NodeEvent_Type_value) proto.RegisterEnum("networkdb.NetworkEvent_Type", NetworkEvent_Type_name, NetworkEvent_Type_value) proto.RegisterEnum("networkdb.TableEvent_Type", TableEvent_Type_name, TableEvent_Type_value) } @@ -306,6 +357,18 @@ func (this *GossipMessage) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *NodeEvent) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&networkdb.NodeEvent{") + s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") + s = append(s, "LTime: "+fmt.Sprintf("%#v", this.LTime)+",\n") + s = append(s, "NodeName: "+fmt.Sprintf("%#v", this.NodeName)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *NetworkEvent) GoString() string { if this == nil { return "nil" @@ -336,12 +399,13 @@ func (this *NetworkPushPull) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&networkdb.NetworkPushPull{") s = append(s, "LTime: "+fmt.Sprintf("%#v", this.LTime)+",\n") if this.Networks != nil { s = append(s, "Networks: "+fmt.Sprintf("%#v", this.Networks)+",\n") } + s = append(s, "NodeName: "+fmt.Sprintf("%#v", this.NodeName)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -451,6 +515,40 @@ func (m *GossipMessage) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *NodeEvent) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *NodeEvent) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Type != 0 { + data[i] = 0x8 + i++ + i = encodeVarintNetworkdb(data, i, uint64(m.Type)) + } + if m.LTime != 0 { + data[i] = 0x10 + i++ + i = encodeVarintNetworkdb(data, i, uint64(m.LTime)) + } + if len(m.NodeName) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) + i += copy(data[i:], m.NodeName) + } + return i, nil +} + func (m *NetworkEvent) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -568,6 +666,12 @@ func (m *NetworkPushPull) MarshalTo(data []byte) (int, error) { i += n } } + if len(m.NodeName) > 0 { + data[i] = 0x1a + i++ + i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName))) + i += copy(data[i:], m.NodeName) + } return i, nil } @@ -783,6 +887,22 @@ func (m *GossipMessage) Size() (n int) { return n } +func (m *NodeEvent) Size() (n int) { + var l int + _ = l + if m.Type != 0 { + n += 1 + sovNetworkdb(uint64(m.Type)) + } + if m.LTime != 0 { + n += 1 + sovNetworkdb(uint64(m.LTime)) + } + l = len(m.NodeName) + if l > 0 { + n += 1 + l + sovNetworkdb(uint64(l)) + } + return n +} + func (m *NetworkEvent) Size() (n int) { var l int _ = l @@ -835,6 +955,10 @@ func (m *NetworkPushPull) Size() (n int) { n += 1 + l + sovNetworkdb(uint64(l)) } } + l = len(m.NodeName) + if l > 0 { + n += 1 + l + sovNetworkdb(uint64(l)) + } return n } @@ -942,6 +1066,18 @@ func (this *GossipMessage) String() string { }, "") return s } +func (this *NodeEvent) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&NodeEvent{`, + `Type:` + fmt.Sprintf("%v", this.Type) + `,`, + `LTime:` + fmt.Sprintf("%v", this.LTime) + `,`, + `NodeName:` + fmt.Sprintf("%v", this.NodeName) + `,`, + `}`, + }, "") + return s +} func (this *NetworkEvent) String() string { if this == nil { return "nil" @@ -975,6 +1111,7 @@ func (this *NetworkPushPull) String() string { s := strings.Join([]string{`&NetworkPushPull{`, `LTime:` + fmt.Sprintf("%v", this.LTime) + `,`, `Networks:` + strings.Replace(fmt.Sprintf("%v", this.Networks), "NetworkEntry", "NetworkEntry", 1) + `,`, + `NodeName:` + fmt.Sprintf("%v", this.NodeName) + `,`, `}`, }, "") return s @@ -1137,6 +1274,123 @@ func (m *GossipMessage) Unmarshal(data []byte) error { } return nil } +func (m *NodeEvent) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: NodeEvent: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: NodeEvent: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Type |= (NodeEvent_Type(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field LTime", wireType) + } + m.LTime = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthNetworkdb + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NodeName = string(data[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipNetworkdb(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthNetworkdb + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *NetworkEvent) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 @@ -1509,6 +1763,35 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NodeName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNetworkdb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthNetworkdb + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NodeName = string(data[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipNetworkdb(data[iNdEx:]) @@ -2211,56 +2494,61 @@ var ( ) var fileDescriptorNetworkdb = []byte{ - // 812 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x95, 0x4d, 0x6f, 0xe2, 0x46, - 0x18, 0xc7, 0x31, 0x18, 0x02, 0x0f, 0xd0, 0x20, 0x27, 0x4d, 0x5c, 0xa7, 0x25, 0x91, 0x9b, 0x46, - 0x14, 0x55, 0x4e, 0x95, 0x7c, 0x02, 0x5e, 0xac, 0x96, 0xc4, 0x31, 0xc8, 0x40, 0xaa, 0x9e, 0x90, - 0xc1, 0x53, 0xb0, 0x62, 0x6c, 0x0b, 0x9b, 0x54, 0xdc, 0xaa, 0x9e, 0xa2, 0xde, 0x7a, 0xad, 0xd4, - 0x53, 0x7b, 0xee, 0x07, 0xe8, 0xa1, 0xe7, 0xa8, 0xa7, 0xf6, 0xb6, 0xda, 0x43, 0xb4, 0xc9, 0x27, - 0xd8, 0x8f, 0xb0, 0xe3, 0xc1, 0x86, 0x81, 0x44, 0xb9, 0xec, 0x6a, 0xb5, 0x07, 0xc3, 0xbc, 0xfc, - 0xe6, 0xd1, 0xff, 0x79, 0xe6, 0x3f, 0x33, 0xb0, 0x69, 0x23, 0xff, 0x47, 0x67, 0x72, 0x65, 0xf4, - 0x25, 0x77, 0xe2, 0xf8, 0x0e, 0x97, 0x59, 0x0c, 0x08, 0xdb, 0x43, 0x67, 0xe8, 0x90, 0xd1, 0xe3, - 0xa0, 0x35, 0x07, 0xc4, 0x26, 0xe4, 0xbf, 0x71, 0x3c, 0xcf, 0x74, 0x2f, 0x90, 0xe7, 0xe9, 0x43, - 0xc4, 0x95, 0x81, 0xf5, 0x67, 0x2e, 0xe2, 0x99, 0x03, 0xa6, 0xf4, 0xd1, 0xc9, 0x8e, 0xb4, 0x8c, - 0x18, 0x12, 0x1d, 0x3c, 0xab, 0x11, 0x86, 0xe3, 0x80, 0x35, 0x74, 0x5f, 0xe7, 0xe3, 0x98, 0xcd, - 0x69, 0xa4, 0x2d, 0xde, 0xc7, 0x21, 0xa7, 0xce, 0xd7, 0xc8, 0xd7, 0xc8, 0xf6, 0xb9, 0xaf, 0x57, - 0x02, 0x7e, 0x4a, 0x05, 0xa4, 0x31, 0x89, 0x0a, 0xdb, 0x80, 0x94, 0xd5, 0xf3, 0xcd, 0x31, 0x22, - 0x81, 0xd9, 0xea, 0xc9, 0xed, 0xdd, 0x7e, 0xec, 0xe5, 0xdd, 0x7e, 0x79, 0x68, 0xfa, 0xa3, 0x69, - 0x5f, 0x1a, 0x38, 0xe3, 0xe3, 0x91, 0xee, 0x8d, 0xcc, 0x81, 0x33, 0x71, 0x8f, 0x3d, 0x34, 0xf9, - 0x81, 0xfc, 0x48, 0x8a, 0x3e, 0x76, 0x9d, 0x89, 0xdf, 0xc1, 0x2b, 0xb5, 0xa4, 0x15, 0xfc, 0x71, - 0x7b, 0x90, 0xb1, 0x1d, 0x03, 0xf5, 0x6c, 0x1d, 0x47, 0x4b, 0xe0, 0x68, 0x19, 0x2d, 0x1d, 0x0c, - 0xa8, 0xb8, 0xcf, 0x7d, 0x05, 0x10, 0x8a, 0xe9, 0x99, 0x06, 0xcf, 0x06, 0xb3, 0xd5, 0xfc, 0xc3, - 0xdd, 0x7e, 0x26, 0x14, 0xd6, 0xa8, 0x6b, 0x51, 0xfd, 0x1a, 0x86, 0x78, 0xc3, 0x00, 0x1b, 0x88, - 0xe4, 0x4a, 0xb0, 0xd1, 0x50, 0x2f, 0x2b, 0x4a, 0xa3, 0x5e, 0x88, 0x09, 0x7b, 0xbf, 0xfc, 0x7e, - 0xb0, 0x4b, 0x27, 0x12, 0x20, 0x0d, 0xfb, 0x5a, 0xb7, 0x4c, 0x83, 0x13, 0x81, 0x3d, 0x6b, 0x36, - 0xd4, 0x02, 0x23, 0xf0, 0x18, 0xdb, 0x5e, 0xc7, 0xce, 0x1c, 0xd3, 0xe6, 0x0e, 0x21, 0xa9, 0xc8, - 0x95, 0x4b, 0xb9, 0x10, 0x17, 0x3e, 0xc1, 0xd0, 0xc7, 0xeb, 0x90, 0x82, 0xf4, 0x6b, 0x24, 0xe4, - 0x6e, 0xfe, 0x28, 0xc6, 0xfe, 0xfe, 0xb3, 0x48, 0x14, 0x88, 0xff, 0x30, 0xcb, 0x1a, 0xdb, 0xfe, - 0x64, 0xb6, 0x96, 0x09, 0xf3, 0x7c, 0x26, 0xef, 0xad, 0xbe, 0x3c, 0x6c, 0x58, 0x58, 0xbd, 0x69, - 0x0f, 0x49, 0x71, 0xd3, 0x5a, 0xd4, 0x15, 0x7f, 0x65, 0x60, 0x33, 0x94, 0xd6, 0x9a, 0x7a, 0xa3, - 0xd6, 0xd4, 0xb2, 0x28, 0x55, 0xcc, 0xdb, 0xaa, 0x3a, 0x85, 0x74, 0x98, 0xad, 0x87, 0x53, 0x4c, - 0x94, 0xb2, 0x27, 0xbb, 0x4f, 0xd8, 0x2e, 0xa8, 0x9c, 0xb6, 0x00, 0xc5, 0x7f, 0x13, 0x00, 0x1d, - 0xbd, 0x6f, 0xa1, 0xb9, 0x6d, 0xa5, 0x15, 0xdb, 0x0a, 0xd4, 0xfa, 0x25, 0xf4, 0xc1, 0x9b, 0x96, - 0xfb, 0x0c, 0xc0, 0x0f, 0xe4, 0xce, 0x63, 0x25, 0x49, 0xac, 0x0c, 0x19, 0x21, 0xc1, 0x0a, 0x90, - 0xb8, 0x42, 0x33, 0x3e, 0x45, 0xc6, 0x83, 0x26, 0xb7, 0x0d, 0x49, 0xec, 0xdd, 0x29, 0xe2, 0x37, - 0xc8, 0x99, 0x9e, 0x77, 0xc4, 0xbf, 0x22, 0xef, 0x1f, 0xd1, 0xde, 0x27, 0x7e, 0x5d, 0x56, 0x83, - 0x76, 0xfe, 0x21, 0xa4, 0x6a, 0x9a, 0x5c, 0xe9, 0xc8, 0x91, 0xf7, 0x57, 0xb1, 0xda, 0x04, 0xe9, - 0x3e, 0x0a, 0xa8, 0x6e, 0xab, 0x1e, 0x50, 0xf1, 0xa7, 0xa8, 0xae, 0x6b, 0x84, 0x54, 0x5d, 0x56, - 0x64, 0x4c, 0x25, 0x9e, 0xa2, 0xea, 0xc8, 0x42, 0xfe, 0xfa, 0x09, 0xf9, 0x1f, 0x1b, 0xac, 0x3a, - 0xb5, 0xae, 0xda, 0x33, 0x7b, 0x10, 0xdd, 0x6c, 0xef, 0xd0, 0x60, 0x07, 0x90, 0x9d, 0xda, 0x9e, - 0x63, 0x99, 0x03, 0xd3, 0x47, 0x06, 0xd9, 0xf1, 0xb4, 0x46, 0x0f, 0x3d, 0xbf, 0x87, 0x02, 0xe5, - 0x4f, 0x16, 0xfb, 0x33, 0xb3, 0xb4, 0x61, 0x70, 0x68, 0x5c, 0x7d, 0x66, 0x39, 0xba, 0x41, 0xb6, - 0x2b, 0xa7, 0x45, 0x5d, 0xf1, 0x67, 0x9c, 0x53, 0xcd, 0xc1, 0x5a, 0xa6, 0xb6, 0x11, 0xe5, 0x54, - 0x87, 0xf4, 0x78, 0xde, 0xf4, 0x70, 0x56, 0x81, 0xd3, 0x4b, 0x94, 0x53, 0xd7, 0x68, 0xa9, 0x6d, - 0x8e, 0x5d, 0x0b, 0x85, 0x3d, 0x6d, 0xb1, 0x52, 0xf8, 0x12, 0xf2, 0x2b, 0x53, 0x81, 0x88, 0x56, - 0x28, 0x82, 0x59, 0x11, 0x51, 0xfe, 0x2d, 0x0e, 0x59, 0xea, 0x21, 0xe0, 0x3e, 0xa7, 0x0d, 0xb1, - 0x83, 0x77, 0x87, 0xa3, 0x66, 0x23, 0x37, 0x48, 0x90, 0x57, 0xe5, 0xce, 0x77, 0x4d, 0xed, 0xbc, - 0x27, 0x5f, 0xca, 0x6a, 0x07, 0x9b, 0x82, 0xdc, 0x9b, 0x14, 0xba, 0xf2, 0x64, 0x94, 0x21, 0xdb, - 0xa9, 0x54, 0x15, 0x39, 0xa4, 0xc3, 0x9b, 0x91, 0xa2, 0xa9, 0x73, 0x7a, 0x04, 0x99, 0x56, 0xb7, - 0xfd, 0x6d, 0xaf, 0xd5, 0x55, 0x14, 0x6c, 0x90, 0x5d, 0x4c, 0x6e, 0x51, 0xe4, 0xe2, 0x7a, 0xc1, - 0x5c, 0xb5, 0xab, 0x9c, 0xf7, 0xda, 0xdf, 0xab, 0xb5, 0x02, 0xfb, 0x88, 0x8b, 0xcc, 0xc2, 0x7d, - 0x01, 0xe9, 0x5a, 0xf3, 0xa2, 0xd5, 0xec, 0xaa, 0xf5, 0x42, 0xf2, 0x11, 0x16, 0x55, 0x54, 0xd8, - 0x0a, 0xed, 0x46, 0x17, 0xa3, 0xca, 0xbf, 0xb8, 0x2f, 0xc6, 0x5e, 0xdf, 0x17, 0x99, 0x9f, 0x1e, - 0x8a, 0xcc, 0x2d, 0xfe, 0xfe, 0xc3, 0xdf, 0x2b, 0xfc, 0xf5, 0x53, 0xe4, 0xb5, 0x3d, 0x7d, 0x13, - 0x00, 0x00, 0xff, 0xff, 0x7d, 0x9c, 0x5f, 0x56, 0xa1, 0x07, 0x00, 0x00, + // 887 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x96, 0xc1, 0x6e, 0xe3, 0x44, + 0x18, 0xc7, 0xeb, 0xc4, 0x49, 0xe3, 0xaf, 0x0d, 0x1b, 0xbc, 0xdd, 0xad, 0xd7, 0x0b, 0x49, 0x31, + 0xcb, 0x2a, 0x44, 0xe0, 0xa2, 0xee, 0x13, 0x24, 0xb1, 0x05, 0xd9, 0xf5, 0x3a, 0x91, 0x93, 0x14, + 0x71, 0x8a, 0x9c, 0x78, 0x48, 0xac, 0x3a, 0xb6, 0x15, 0x3b, 0x45, 0x39, 0x81, 0x38, 0xad, 0x78, + 0x07, 0x4e, 0xcb, 0x99, 0x07, 0xe0, 0xc0, 0x89, 0xc3, 0x8a, 0x13, 0xdc, 0x10, 0x87, 0x8a, 0xee, + 0x13, 0xf0, 0x08, 0x8c, 0xc7, 0x76, 0x32, 0x4e, 0xa3, 0x5e, 0x40, 0xc0, 0xc1, 0xad, 0x67, 0xe6, + 0xe7, 0xcf, 0xdf, 0xf7, 0x9f, 0xff, 0xe7, 0x09, 0xdc, 0x71, 0x51, 0xf8, 0x85, 0xb7, 0xb8, 0xb0, + 0xc6, 0xb2, 0xbf, 0xf0, 0x42, 0x8f, 0xe7, 0xd6, 0x13, 0xe2, 0xd1, 0xd4, 0x9b, 0x7a, 0x64, 0xf6, + 0x34, 0xba, 0x8b, 0x01, 0xa9, 0x0b, 0xe5, 0x8f, 0xbd, 0x20, 0xb0, 0xfd, 0xe7, 0x28, 0x08, 0xcc, + 0x29, 0xe2, 0x1b, 0xc0, 0x86, 0x2b, 0x1f, 0x09, 0xcc, 0x09, 0x53, 0x7f, 0xe3, 0xec, 0xbe, 0xbc, + 0x89, 0x98, 0x10, 0x03, 0xbc, 0x6a, 0x10, 0x86, 0xe7, 0x81, 0xb5, 0xcc, 0xd0, 0x14, 0x72, 0x98, + 0x3d, 0x34, 0xc8, 0xbd, 0xf4, 0x32, 0x07, 0x9c, 0xee, 0x59, 0x48, 0xbd, 0x44, 0x6e, 0xc8, 0x7f, + 0x98, 0x89, 0xf6, 0x80, 0x8a, 0xb6, 0x66, 0x64, 0x2a, 0x60, 0x07, 0x8a, 0xce, 0x28, 0xb4, 0xe7, + 0x88, 0x84, 0x64, 0x5b, 0x67, 0xaf, 0xae, 0x6a, 0x7b, 0xbf, 0x5f, 0xd5, 0x1a, 0x53, 0x3b, 0x9c, + 0x2d, 0xc7, 0xf2, 0xc4, 0x9b, 0x9f, 0xce, 0xcc, 0x60, 0x66, 0x4f, 0xbc, 0x85, 0x7f, 0x1a, 0xa0, + 0xc5, 0xe7, 0xe4, 0x8f, 0xac, 0x99, 0x73, 0xdf, 0x5b, 0x84, 0x03, 0xfc, 0xa4, 0x51, 0x70, 0xa2, + 0x7f, 0xfc, 0x43, 0xe0, 0x5c, 0xfc, 0x8a, 0x91, 0x6b, 0xe2, 0x68, 0x79, 0x1c, 0x8d, 0x33, 0x4a, + 0xd1, 0x84, 0x8e, 0xc7, 0xd2, 0x97, 0xc0, 0x46, 0x6f, 0xe5, 0xdf, 0x83, 0xfd, 0x8e, 0x7e, 0xde, + 0xd4, 0x3a, 0x4a, 0x65, 0x4f, 0x14, 0xbe, 0xf9, 0xf6, 0xe4, 0x68, 0x9d, 0x56, 0xb4, 0xde, 0x71, + 0x2f, 0x4d, 0xc7, 0xb6, 0xf8, 0x1a, 0xb0, 0x4f, 0xbb, 0x1d, 0xbd, 0xc2, 0x88, 0xf7, 0x30, 0xf3, + 0x66, 0x86, 0x79, 0xea, 0xd9, 0x2e, 0xff, 0x0e, 0x14, 0x34, 0xb5, 0x79, 0xae, 0x56, 0x72, 0xe2, + 0x7d, 0x4c, 0xf0, 0x19, 0x42, 0x43, 0xe6, 0x25, 0x12, 0x0f, 0x5f, 0xbc, 0xac, 0xee, 0xfd, 0xf0, + 0x5d, 0x95, 0xbc, 0x58, 0xba, 0xce, 0xc1, 0xa1, 0x1e, 0x6b, 0x11, 0x0b, 0xf5, 0x51, 0x46, 0xa8, + 0xb7, 0x68, 0xa1, 0x28, 0xec, 0x3f, 0xd0, 0x8a, 0xff, 0x00, 0x20, 0x49, 0x66, 0x64, 0x5b, 0x02, + 0x1b, 0xad, 0xb6, 0xca, 0xaf, 0xaf, 0x6a, 0x5c, 0x92, 0x58, 0x47, 0x31, 0x52, 0x97, 0x75, 0x2c, + 0xe9, 0x05, 0x93, 0x48, 0x5b, 0xa7, 0xa5, 0x7d, 0x88, 0x45, 0x39, 0xa6, 0x0b, 0xa1, 0xd5, 0x95, + 0xd6, 0xea, 0xc6, 0x3b, 0xb0, 0x85, 0x11, 0x81, 0x1f, 0x6d, 0x04, 0x7e, 0x80, 0xa1, 0x7b, 0xdb, + 0xd0, 0x2e, 0x8d, 0x7f, 0x64, 0x36, 0x1a, 0xbb, 0xe1, 0x62, 0xb5, 0x55, 0x09, 0x73, 0x7b, 0x25, + 0xff, 0x9a, 0xbe, 0x02, 0xec, 0x3b, 0x38, 0x7b, 0xdb, 0x9d, 0x12, 0x71, 0x4b, 0x46, 0x3a, 0x94, + 0xbe, 0x67, 0xe0, 0x4e, 0x92, 0x5a, 0x6f, 0x19, 0xcc, 0x7a, 0x4b, 0xc7, 0xa1, 0xb2, 0x62, 0xfe, + 0x6e, 0x56, 0x4f, 0xa0, 0x94, 0x54, 0x1b, 0xe0, 0x12, 0xf3, 0xf5, 0x83, 0xb3, 0xe3, 0x1d, 0xb6, + 0x8b, 0x94, 0x33, 0xd6, 0xe0, 0xed, 0x6d, 0xf5, 0x73, 0x1e, 0x60, 0x60, 0x8e, 0x9d, 0xa4, 0xf9, + 0xe5, 0x8c, 0xa7, 0x45, 0x2a, 0xf8, 0x06, 0xfa, 0xdf, 0x3b, 0x9a, 0x7f, 0x1b, 0x20, 0x8c, 0xd2, + 0x8d, 0x63, 0x15, 0x48, 0x2c, 0x8e, 0xcc, 0x90, 0x60, 0x15, 0xc8, 0x5f, 0xa0, 0x95, 0x50, 0x24, + 0xf3, 0xd1, 0x2d, 0x7f, 0x04, 0x05, 0x6c, 0xec, 0x25, 0x12, 0xf6, 0xc9, 0x67, 0x31, 0x1e, 0x44, + 0x9b, 0x19, 0x37, 0xc6, 0x63, 0xba, 0x31, 0x88, 0x99, 0x37, 0x6a, 0xd0, 0x6d, 0xf1, 0x08, 0x8a, + 0x6d, 0x43, 0x6d, 0x0e, 0xd4, 0xb4, 0x31, 0xb2, 0x58, 0x7b, 0x81, 0xcc, 0x10, 0x45, 0xd4, 0xb0, + 0xa7, 0x44, 0x54, 0x6e, 0x17, 0x35, 0xf4, 0xad, 0x84, 0x52, 0x54, 0x4d, 0xc5, 0x54, 0x7e, 0x17, + 0xa5, 0x20, 0x07, 0x85, 0xdb, 0xed, 0xf3, 0x2b, 0x76, 0x5f, 0x6b, 0xe9, 0x5c, 0xf4, 0x57, 0xee, + 0x24, 0x3d, 0x1c, 0xfe, 0x41, 0xf7, 0x9d, 0xc0, 0xc1, 0xd2, 0x0d, 0x3c, 0xc7, 0x9e, 0xd8, 0x21, + 0xb2, 0xc8, 0x8e, 0x97, 0x0c, 0x7a, 0xea, 0xf6, 0x3d, 0x14, 0x29, 0xf3, 0xb2, 0xd8, 0xbc, 0x1c, + 0xe5, 0x51, 0xdc, 0x51, 0xbe, 0xb9, 0x72, 0x3c, 0xd3, 0x22, 0xdb, 0x75, 0x68, 0xa4, 0x43, 0xe9, + 0x6b, 0x5c, 0x53, 0xdb, 0xc3, 0xb9, 0x2c, 0x5d, 0x2b, 0xad, 0x49, 0x81, 0xd2, 0x3c, 0xbe, 0x0d, + 0x70, 0x55, 0x51, 0x1b, 0xd4, 0x29, 0xa7, 0x6e, 0xd1, 0x72, 0xdf, 0x9e, 0xfb, 0x0e, 0x4a, 0x46, + 0xc6, 0xfa, 0x49, 0xf1, 0x7d, 0x28, 0x67, 0x96, 0xa2, 0x24, 0x7a, 0x49, 0x12, 0x4c, 0x26, 0x89, + 0xc6, 0x4f, 0x39, 0x38, 0xa0, 0xce, 0x52, 0xfe, 0x5d, 0xda, 0x10, 0xe4, 0xf8, 0xa0, 0x56, 0x53, + 0x37, 0xc8, 0x50, 0xd6, 0xd5, 0xc1, 0xa7, 0x5d, 0xe3, 0xd9, 0x48, 0x3d, 0x57, 0xf5, 0x01, 0x36, + 0x05, 0xf9, 0xa8, 0x52, 0x68, 0xe6, 0x3c, 0x69, 0xc0, 0xc1, 0xa0, 0xd9, 0xd2, 0xd4, 0x84, 0x4e, + 0x3e, 0x9b, 0x14, 0x4d, 0xf5, 0xe9, 0x63, 0xe0, 0x7a, 0xc3, 0xfe, 0x27, 0xa3, 0xde, 0x50, 0xd3, + 0xb0, 0x41, 0x8e, 0x31, 0x79, 0x97, 0x22, 0xd7, 0xdf, 0x1e, 0xcc, 0xb5, 0x86, 0xda, 0xb3, 0x51, + 0xff, 0x33, 0xbd, 0x5d, 0x61, 0x6f, 0x70, 0xa9, 0x59, 0xf0, 0xa9, 0x5a, 0x6a, 0x77, 0x9f, 0xf7, + 0xba, 0x43, 0x5d, 0xa9, 0x14, 0x6e, 0x60, 0xa9, 0xa2, 0xf8, 0x84, 0x00, 0xbd, 0xab, 0xa4, 0x19, + 0x16, 0x63, 0x63, 0xd2, 0xf5, 0xa4, 0x87, 0xa8, 0x78, 0x37, 0x31, 0x26, 0x2d, 0x5b, 0x4b, 0xf8, + 0xed, 0xba, 0xba, 0xf7, 0xe7, 0x75, 0x95, 0xf9, 0xea, 0x75, 0x95, 0x79, 0x85, 0xaf, 0x5f, 0xf0, + 0xf5, 0x07, 0xbe, 0xc6, 0x45, 0xf2, 0xd3, 0xe6, 0xc9, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x21, + 0x78, 0x72, 0xc3, 0x0e, 0x09, 0x00, 0x00, } diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.proto b/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.proto index dbc7a921d0..7df1b42dca 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.proto +++ b/vendor/src/github.com/docker/libnetwork/networkdb/networkdb.proto @@ -41,6 +41,10 @@ enum MessageType { // which is a pack of many message of above types, packed into // a single compound message. COMPOUND = 5 [(gogoproto.enumvalue_customname) = "MessageTypeCompound"]; + + // NodeEvent message type is used to communicare node + // join/leave events in the cluster + NODE_EVENT = 6 [(gogoproto.enumvalue_customname) = "MessageTypeNodeEvent"]; } // GossipMessage is a basic message header used by all messages types. @@ -49,6 +53,29 @@ message GossipMessage { bytes data = 2; // Payload of the message of any type defined here. } +// NodeEvent message payload definition. +message NodeEvent { + enum Type { + option (gogoproto.goproto_enum_prefix) = false; + option (gogoproto.enum_customname) = "Type"; + + INVALID = 0 [(gogoproto.enumvalue_customname) = "NodeEventTypeInvalid"]; + // Join event is generated when this node joins the cluster. + JOIN = 1 [(gogoproto.enumvalue_customname) = "NodeEventTypeJoin"];; + // Leave event is generated when this node leaves the cluster. + LEAVE = 2 [(gogoproto.enumvalue_customname) = "NodeEventTypeLeave"];; + } + + Type type = 1; + + // Lamport time using a network lamport clock indicating the + // time this event was generated on the node where it was + // generated. + uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false]; + // Source node name. + string node_name = 3; +} + // NetworkEvent message payload definition. message NetworkEvent { enum Type { diff --git a/vendor/src/github.com/docker/libnetwork/osl/neigh_linux.go b/vendor/src/github.com/docker/libnetwork/osl/neigh_linux.go index c5c6c103d0..a7669a111b 100644 --- a/vendor/src/github.com/docker/libnetwork/osl/neigh_linux.go +++ b/vendor/src/github.com/docker/libnetwork/osl/neigh_linux.go @@ -32,7 +32,7 @@ func (n *networkNamespace) findNeighbor(dstIP net.IP, dstMac net.HardwareAddr) * return nil } -func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr) error { +func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, osDelete bool) error { var ( iface netlink.Link err error @@ -43,42 +43,46 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr) return fmt.Errorf("could not find the neighbor entry to delete") } - n.Lock() - nlh := n.nlHandle - n.Unlock() + if osDelete { + n.Lock() + nlh := n.nlHandle + n.Unlock() - if nh.linkDst != "" { - iface, err = nlh.LinkByName(nh.linkDst) - if err != nil { - return fmt.Errorf("could not find interface with destination name %s: %v", - nh.linkDst, err) + if nh.linkDst != "" { + iface, err = nlh.LinkByName(nh.linkDst) + if err != nil { + return fmt.Errorf("could not find interface with destination name %s: %v", + nh.linkDst, err) + } + } + + nlnh := &netlink.Neigh{ + IP: dstIP, + State: netlink.NUD_PERMANENT, + Family: nh.family, + } + + if nlnh.Family > 0 { + nlnh.HardwareAddr = dstMac + nlnh.Flags = netlink.NTF_SELF + } + + if nh.linkDst != "" { + nlnh.LinkIndex = iface.Attrs().Index + } + + if err := nlh.NeighDel(nlnh); err != nil { + return fmt.Errorf("could not delete neighbor entry: %v", err) } } - nlnh := &netlink.Neigh{ - IP: dstIP, - State: netlink.NUD_PERMANENT, - Family: nh.family, - } - - if nlnh.Family > 0 { - nlnh.HardwareAddr = dstMac - nlnh.Flags = netlink.NTF_SELF - } - - if nh.linkDst != "" { - nlnh.LinkIndex = iface.Attrs().Index - } - - if err := nlh.NeighDel(nlnh); err != nil { - return fmt.Errorf("could not delete neighbor entry: %v", err) - } - + n.Lock() for i, nh := range n.neighbors { if nh.dstIP.Equal(dstIP) && bytes.Equal(nh.dstMac, dstMac) { n.neighbors = append(n.neighbors[:i], n.neighbors[i+1:]...) } } + n.Unlock() return nil } diff --git a/vendor/src/github.com/docker/libnetwork/osl/sandbox.go b/vendor/src/github.com/docker/libnetwork/osl/sandbox.go index 75968a0f87..18085c9082 100644 --- a/vendor/src/github.com/docker/libnetwork/osl/sandbox.go +++ b/vendor/src/github.com/docker/libnetwork/osl/sandbox.go @@ -42,7 +42,7 @@ type Sandbox interface { AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, option ...NeighOption) error // DeleteNeighbor deletes neighbor entry from the sandbox. - DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr) error + DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, osDelete bool) error // Returns an interface with methods to set neighbor options. NeighborOptions() NeighborOptionSetter diff --git a/vendor/src/github.com/docker/libnetwork/resolver.go b/vendor/src/github.com/docker/libnetwork/resolver.go index b9eb4f3ff2..85b87ffb18 100644 --- a/vendor/src/github.com/docker/libnetwork/resolver.go +++ b/vendor/src/github.com/docker/libnetwork/resolver.go @@ -354,8 +354,8 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) { extConn, err = net.DialTimeout(proto, addr, extIOTimeout) } - r.sb.execFunc(extConnect) - if err != nil { + execErr := r.sb.execFunc(extConnect) + if execErr != nil || err != nil { log.Debugf("Connect failed, %s", err) continue } diff --git a/vendor/src/github.com/docker/libnetwork/sandbox.go b/vendor/src/github.com/docker/libnetwork/sandbox.go index 52e9bb6783..b796b010d2 100644 --- a/vendor/src/github.com/docker/libnetwork/sandbox.go +++ b/vendor/src/github.com/docker/libnetwork/sandbox.go @@ -147,7 +147,7 @@ func (sb *sandbox) Key() string { func (sb *sandbox) Labels() map[string]interface{} { sb.Lock() - sb.Unlock() + defer sb.Unlock() opts := make(map[string]interface{}, len(sb.config.generic)) for k, v := range sb.config.generic { opts[k] = v @@ -202,12 +202,14 @@ func (sb *sandbox) delete(force bool) error { retain := false for _, ep := range sb.getConnectedEndpoints() { // gw network endpoint detach and removal are automatic - if ep.endpointInGWNetwork() { + if ep.endpointInGWNetwork() && !force { continue } // Retain the sanbdox if we can't obtain the network from store. if _, err := c.getNetworkFromStore(ep.getNetwork().ID()); err != nil { - retain = true + if c.isDistributedControl() { + retain = true + } log.Warnf("Failed getting network for ep %s during sandbox %s delete: %v", ep.ID(), sb.ID(), err) continue } @@ -434,8 +436,8 @@ func (sb *sandbox) ResolveIP(ip string) string { return svc } -func (sb *sandbox) execFunc(f func()) { - sb.osSbox.InvokeFunc(f) +func (sb *sandbox) execFunc(f func()) error { + return sb.osSbox.InvokeFunc(f) } func (sb *sandbox) ResolveService(name string) ([]*net.SRV, []net.IP, error) { @@ -705,9 +707,12 @@ func (sb *sandbox) SetKey(basePath string) error { if oldosSbox != nil && sb.resolver != nil { sb.resolver.Stop() - sb.osSbox.InvokeFunc(sb.resolver.SetupFunc()) - if err := sb.resolver.Start(); err != nil { - log.Errorf("Resolver Setup/Start failed for container %s, %q", sb.ContainerID(), err) + if err := sb.osSbox.InvokeFunc(sb.resolver.SetupFunc()); err == nil { + if err := sb.resolver.Start(); err != nil { + log.Errorf("Resolver Start failed for container %s, %q", sb.ContainerID(), err) + } + } else { + log.Errorf("Resolver Setup Function failed for container %s, %q", sb.ContainerID(), err) } } diff --git a/vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go b/vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go index 0c649a9b9e..bc1c1a4993 100644 --- a/vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go +++ b/vendor/src/github.com/docker/libnetwork/sandbox_dns_unix.go @@ -44,9 +44,13 @@ func (sb *sandbox) startResolver(restore bool) { } sb.resolver.SetExtServers(sb.extDNS) - sb.osSbox.InvokeFunc(sb.resolver.SetupFunc()) + if err = sb.osSbox.InvokeFunc(sb.resolver.SetupFunc()); err != nil { + log.Errorf("Resolver Setup function failed for container %s, %q", sb.ContainerID(), err) + return + } + if err = sb.resolver.Start(); err != nil { - log.Errorf("Resolver Setup/Start failed for container %s, %q", sb.ContainerID(), err) + log.Errorf("Resolver Start failed for container %s, %q", sb.ContainerID(), err) } }) } diff --git a/vendor/src/github.com/docker/libnetwork/service.go b/vendor/src/github.com/docker/libnetwork/service.go index 30a17c5056..a957026b2f 100644 --- a/vendor/src/github.com/docker/libnetwork/service.go +++ b/vendor/src/github.com/docker/libnetwork/service.go @@ -45,6 +45,9 @@ type service struct { // List of ingress ports exposed by the service ingressPorts portConfigs + // Service aliases + aliases []string + sync.Mutex } diff --git a/vendor/src/github.com/docker/libnetwork/service_linux.go b/vendor/src/github.com/docker/libnetwork/service_linux.go index 9dc27f5578..5e3802a823 100644 --- a/vendor/src/github.com/docker/libnetwork/service_linux.go +++ b/vendor/src/github.com/docker/libnetwork/service_linux.go @@ -26,17 +26,55 @@ import ( func init() { reexec.Register("fwmarker", fwMarker) + reexec.Register("redirecter", redirecter) } -func newService(name string, id string, ingressPorts []*PortConfig) *service { +func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service { return &service{ name: name, id: id, ingressPorts: ingressPorts, loadBalancers: make(map[string]*loadBalancer), + aliases: aliases, } } +func (c *controller) cleanupServiceBindings(cleanupNID string) { + var cleanupFuncs []func() + c.Lock() + for _, s := range c.serviceBindings { + s.Lock() + for nid, lb := range s.loadBalancers { + if cleanupNID != "" && nid != cleanupNID { + continue + } + + for eid, ip := range lb.backEnds { + service := s + loadBalancer := lb + networkID := nid + epID := eid + epIP := ip + + cleanupFuncs = append(cleanupFuncs, func() { + if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip, + service.ingressPorts, service.aliases, epIP); err != nil { + logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v", + service.id, networkID, epID, err) + } + }) + } + } + s.Unlock() + } + c.Unlock() + + for _, f := range cleanupFuncs { + f() + } + +} + func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error { var ( s *service @@ -58,7 +96,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i if !ok { // Create a new service if we are seeing this service // for the first time. - s = newService(name, sid, ingressPorts) + s = newService(name, sid, ingressPorts, aliases) c.serviceBindings[skey] = s } c.Unlock() @@ -230,9 +268,20 @@ func (n *network) connectedLoadbalancers() []*loadBalancer { func (sb *sandbox) populateLoadbalancers(ep *endpoint) { var gwIP net.IP + // This is an interface less endpoint. Nothing to do. + if ep.Iface() == nil { + return + } + n := ep.getNetwork() eIP := ep.Iface().Address() + if n.ingress { + if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil { + logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err) + } + } + if sb.ingress { // For the ingress sandbox if this is not gateway // endpoint do nothing. @@ -338,17 +387,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P } if addService { - var iPorts []*PortConfig + var filteredPorts []*PortConfig if sb.ingress { - iPorts = filterPortConfigs(ingressPorts, false) - if err := programIngress(gwIP, iPorts, false); err != nil { + filteredPorts = filterPortConfigs(ingressPorts, false) + if err := programIngress(gwIP, filteredPorts, false); err != nil { logrus.Errorf("Failed to add ingress: %v", err) return } } - logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts) - if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil { + logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts) + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) return } @@ -411,15 +460,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err) } - var iPorts []*PortConfig + var filteredPorts []*PortConfig if sb.ingress { - iPorts = filterPortConfigs(ingressPorts, true) - if err := programIngress(gwIP, iPorts, true); err != nil { + filteredPorts = filterPortConfigs(ingressPorts, true) + if err := programIngress(gwIP, filteredPorts, true); err != nil { logrus.Errorf("Failed to delete ingress: %v", err) } } - if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil { + if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil { logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err) } } @@ -479,14 +528,19 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro } chainExists := iptables.ExistChain(ingressChain, iptables.Nat) + filterChainExists := iptables.ExistChain(ingressChain, iptables.Filter) ingressOnce.Do(func() { + // Flush nat table and filter table ingress chain rules during init if it + // exists. It might contain stale rules from previous life. if chainExists { - // Flush ingress chain rules during init if it - // exists. It might contain stale rules from - // previous life. if err := iptables.RawCombinedOutput("-t", "nat", "-F", ingressChain); err != nil { - logrus.Errorf("Could not flush ingress chain rules during init: %v", err) + logrus.Errorf("Could not flush nat table ingress chain rules during init: %v", err) + } + } + if filterChainExists { + if err := iptables.RawCombinedOutput("-F", ingressChain); err != nil { + logrus.Errorf("Could not flush filter table ingress chain rules during init: %v", err) } } }) @@ -497,10 +551,21 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro return fmt.Errorf("failed to create ingress chain: %v", err) } } + if !filterChainExists { + if err := iptables.RawCombinedOutput("-N", ingressChain); err != nil { + return fmt.Errorf("failed to create filter table ingress chain: %v", err) + } + } if !iptables.Exists(iptables.Nat, ingressChain, "-j", "RETURN") { if err := iptables.RawCombinedOutput("-t", "nat", "-A", ingressChain, "-j", "RETURN"); err != nil { - return fmt.Errorf("failed to add return rule in ingress chain: %v", err) + return fmt.Errorf("failed to add return rule in nat table ingress chain: %v", err) + } + } + + if !iptables.Exists(iptables.Filter, ingressChain, "-j", "RETURN") { + if err := iptables.RawCombinedOutput("-A", ingressChain, "-j", "RETURN"); err != nil { + return fmt.Errorf("failed to add return rule to filter table ingress chain: %v", err) } } @@ -512,6 +577,12 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro } } + if !iptables.Exists(iptables.Filter, "FORWARD", "-j", ingressChain) { + if err := iptables.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil { + return fmt.Errorf("failed to add jump rule to %s in filter table forward chain: %v", ingressChain, err) + } + } + oifName, err := findOIFName(gwIP) if err != nil { return fmt.Errorf("failed to find gateway bridge interface name for %s: %v", gwIP, err) @@ -544,6 +615,30 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro } } + // Filter table rules to allow a published service to be accessible in the local node from.. + // 1) service tasks attached to other networks + // 2) unmanaged containers on bridge networks + rule := strings.Fields(fmt.Sprintf("%s %s -m state -p %s --sport %d --state ESTABLISHED,RELATED -j ACCEPT", + addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort)) + if err := iptables.RawCombinedOutput(rule...); err != nil { + errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err) + if !isDelete { + return fmt.Errorf("%s", errStr) + } + logrus.Warnf("%s", errStr) + } + + rule = strings.Fields(fmt.Sprintf("%s %s -p %s --dport %d -j ACCEPT", + addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort)) + if err := iptables.RawCombinedOutput(rule...); err != nil { + errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err) + if !isDelete { + return fmt.Errorf("%s", errStr) + } + + logrus.Warnf("%s", errStr) + } + if err := plumbProxy(iPort, isDelete); err != nil { logrus.Warnf("failed to create proxy for port %d: %v", iPort.PublishedPort, err) } @@ -552,6 +647,22 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro return nil } +// In the filter table FORWARD chain first rule should be to jump to INGRESS-CHAIN +// This chain has the rules to allow access to the published ports for swarm tasks +// from local bridge networks and docker_gwbridge (ie:taks on other swarm netwroks) +func arrangeIngressFilterRule() { + if iptables.ExistChain(ingressChain, iptables.Filter) { + if iptables.Exists(iptables.Filter, "FORWARD", "-j", ingressChain) { + if err := iptables.RawCombinedOutput("-D", "FORWARD", "-j", ingressChain); err != nil { + logrus.Warnf("failed to delete jump rule to ingressChain in filter table: %v", err) + } + } + if err := iptables.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil { + logrus.Warnf("failed to add jump rule to ingressChain in filter table: %v", err) + } + } +} + func findOIFName(ip net.IP) (string, error) { nlh := ns.NlHandle() @@ -611,33 +722,57 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error { return nil } +func writePortsToFile(ports []*PortConfig) (string, error) { + f, err := ioutil.TempFile("", "port_configs") + if err != nil { + return "", err + } + defer f.Close() + + buf, err := proto.Marshal(&EndpointRecord{ + IngressPorts: ports, + }) + + n, err := f.Write(buf) + if err != nil { + return "", err + } + + if n < len(buf) { + return "", io.ErrShortWrite + } + + return f.Name(), nil +} + +func readPortsFromFile(fileName string) ([]*PortConfig, error) { + buf, err := ioutil.ReadFile(fileName) + if err != nil { + return nil, err + } + + var epRec EndpointRecord + err = proto.Unmarshal(buf, &epRec) + if err != nil { + return nil, err + } + + return epRec.IngressPorts, nil +} + // Invoke fwmarker reexec routine to mark vip destined packets with // the passed firewall mark. func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error { var ingressPortsFile string + if len(ingressPorts) != 0 { - f, err := ioutil.TempFile("", "port_configs") + var err error + ingressPortsFile, err = writePortsToFile(ingressPorts) if err != nil { return err } - buf, err := proto.Marshal(&EndpointRecord{ - IngressPorts: ingressPorts, - }) - - n, err := f.Write(buf) - if err != nil { - f.Close() - return err - } - - if n < len(buf) { - f.Close() - return io.ErrShortWrite - } - - ingressPortsFile = f.Name() - f.Close() + defer os.Remove(ingressPortsFile) } addDelOpt := "-A" @@ -671,20 +806,12 @@ func fwMarker() { var ingressPorts []*PortConfig if os.Args[5] != "" { - buf, err := ioutil.ReadFile(os.Args[5]) + var err error + ingressPorts, err = readPortsFromFile(os.Args[5]) if err != nil { - logrus.Errorf("Failed to read ports config file: %v", err) + logrus.Errorf("Failed reading ingress ports file: %v", err) os.Exit(6) } - - var epRec EndpointRecord - err = proto.Unmarshal(buf, &epRec) - if err != nil { - logrus.Errorf("Failed to unmarshal ports config data: %v", err) - os.Exit(7) - } - - ingressPorts = epRec.IngressPorts } vip := os.Args[2] @@ -697,11 +824,7 @@ func fwMarker() { rules := [][]string{} for _, iPort := range ingressPorts { - rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d", - addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) - rules = append(rules, rule) - - rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", + rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d", addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark)) rules = append(rules, rule) } @@ -748,3 +871,82 @@ func fwMarker() { } } } + +func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error { + var ingressPortsFile string + + if len(ingressPorts) != 0 { + var err error + ingressPortsFile, err = writePortsToFile(ingressPorts) + if err != nil { + return err + } + defer os.Remove(ingressPortsFile) + } + + cmd := &exec.Cmd{ + Path: reexec.Self(), + Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile), + Stdout: os.Stdout, + Stderr: os.Stderr, + } + + if err := cmd.Run(); err != nil { + return fmt.Errorf("reexec failed: %v", err) + } + + return nil +} + +// Redirecter reexec function. +func redirecter() { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + if len(os.Args) < 4 { + logrus.Error("invalid number of arguments..") + os.Exit(1) + } + + var ingressPorts []*PortConfig + if os.Args[3] != "" { + var err error + ingressPorts, err = readPortsFromFile(os.Args[3]) + if err != nil { + logrus.Errorf("Failed reading ingress ports file: %v", err) + os.Exit(2) + } + } + + eIP, _, err := net.ParseCIDR(os.Args[2]) + if err != nil { + logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err) + os.Exit(3) + } + + rules := [][]string{} + for _, iPort := range ingressPorts { + rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d", + eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort)) + rules = append(rules, rule) + } + + ns, err := netns.GetFromPath(os.Args[1]) + if err != nil { + logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err) + os.Exit(4) + } + defer ns.Close() + + if err := netns.Set(ns); err != nil { + logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err) + os.Exit(5) + } + + for _, rule := range rules { + if err := iptables.RawCombinedOutputNative(rule...); err != nil { + logrus.Errorf("setting up rule failed, %v: %v", rule, err) + os.Exit(5) + } + } +} diff --git a/vendor/src/github.com/docker/libnetwork/service_unsupported.go b/vendor/src/github.com/docker/libnetwork/service_unsupported.go index 9668dcc07e..0ae384a99c 100644 --- a/vendor/src/github.com/docker/libnetwork/service_unsupported.go +++ b/vendor/src/github.com/docker/libnetwork/service_unsupported.go @@ -7,6 +7,9 @@ import ( "net" ) +func (c *controller) cleanupServiceBindings(nid string) { +} + func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error { return fmt.Errorf("not supported") } @@ -17,3 +20,6 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in func (sb *sandbox) populateLoadbalancers(ep *endpoint) { } + +func arrangeIngressFilterRule() { +} diff --git a/vendor/src/github.com/docker/libnetwork/store.go b/vendor/src/github.com/docker/libnetwork/store.go index b622836498..9830a22d47 100644 --- a/vendor/src/github.com/docker/libnetwork/store.go +++ b/vendor/src/github.com/docker/libnetwork/store.go @@ -346,6 +346,10 @@ func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan da } func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) { + if !c.isDistributedControl() && ep.getNetwork().driverScope() == datastore.GlobalScope { + return + } + c.Lock() nw, ok := nmap[ep.getNetwork().ID()] c.Unlock() @@ -400,6 +404,10 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi } func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) { + if !c.isDistributedControl() && ep.getNetwork().driverScope() == datastore.GlobalScope { + return + } + c.Lock() nw, ok := nmap[ep.getNetwork().ID()] diff --git a/vendor/src/github.com/docker/libnetwork/support.sh b/vendor/src/github.com/docker/libnetwork/support.sh new file mode 100755 index 0000000000..e913bf33dd --- /dev/null +++ b/vendor/src/github.com/docker/libnetwork/support.sh @@ -0,0 +1,36 @@ +#!/usr/bin/env bash + +# Required tools +DOCKER="${DOCKER:-docker}" +NSENTER="${NSENTER:-nsenter}" +BRIDGE="${BRIDGE:-bridge}" +BRCTL="${BRCTL:-brctl}" +IPTABLES="${IPTABLES:-iptables}" + +NSDIR=/var/run/docker/netns +BRIDGEIF=br0 + +function die { + echo $* + exit 1 +} + +type -P ${DOCKER} > /dev/null || die "This tool requires the docker binary" +type -P ${NSENTER} > /dev/null || die "This tool requires nsenter" +type -P ${BRIDGE} > /dev/null || die "This tool requires bridge" +type -P ${BRCTL} > /dev/null || die "This tool requires brctl" +type -P ${IPTABLES} > /dev/null || die "This tool requires iptables" + +echo "iptables configuration" +${IPTABLES} -n -v -L -t filter +${IPTABLES} -n -v -L -t nat +echo "" + +echo "Overlay network configuration" +for networkID in $(${DOCKER} network ls --filter driver=overlay -q) ; do + echo "Network ${networkID}" + nspath=(${NSDIR}/*-$(echo ${networkID}| cut -c1-10)) + ${NSENTER} --net=${nspath[0]} ${BRIDGE} fdb show ${BRIDGEIF} + ${NSENTER} --net=${nspath[0]} ${BRCTL} showmacs ${BRIDGEIF} + echo "" +done