Vendoring libnetwork e282a91b294ab413a172b3c4e37d15fa92d79ef5

Carries a bunch of patches to resolve routing-mesh and swarm-mode
networking issues.

Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
Madhu Venugopal 2016-09-23 17:26:40 -07:00
parent 07fcfd0c28
commit 0ce34bdb12
29 changed files with 1302 additions and 254 deletions

View file

@ -65,7 +65,7 @@ clone git github.com/RackSec/srslog 259aed10dfa74ea2961eddd1d9847619f6e98837
clone git github.com/imdario/mergo 0.2.1
#get libnetwork packages
clone git github.com/docker/libnetwork 82fb373e3eaa4e9bbb5b5ac148b0a3a71f80fca6
clone git github.com/docker/libnetwork e282a91b294ab413a172b3c4e37d15fa92d79ef5
clone git github.com/docker/go-events afb2b9f2c23f33ada1a22b03651775fdc65a5089
clone git github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View file

@ -9,6 +9,7 @@ import (
"sort"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
@ -171,10 +172,12 @@ func (c *controller) agentSetup() error {
advAddr := clusterProvider.GetAdvertiseAddress()
remote := clusterProvider.GetRemoteAddress()
remoteAddr, _, _ := net.SplitHostPort(remote)
listen := clusterProvider.GetListenAddress()
listenAddr, _, _ := net.SplitHostPort(listen)
logrus.Infof("Initializing Libnetwork Agent Local-addr=%s Adv-addr=%s Remote-addr =%s", bindAddr, advAddr, remoteAddr)
logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr)
if advAddr != "" && c.agent == nil {
if err := c.agentInit(bindAddr, advAddr); err != nil {
if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil {
logrus.Errorf("Error in agentInit : %v", err)
} else {
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
@ -183,17 +186,23 @@ func (c *controller) agentSetup() error {
}
return false
})
if c.agent != nil {
close(c.agentInitDone)
}
}
}
if remoteAddr != "" {
if err := c.agentJoin(remoteAddr); err != nil {
logrus.Errorf("Error in agentJoin : %v", err)
return nil
}
}
c.Lock()
if c.agent != nil && c.agentInitDone != nil {
close(c.agentInitDone)
c.agentInitDone = nil
}
c.Unlock()
return nil
}
@ -229,7 +238,7 @@ func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) {
return keys[1].Key, keys[1].LamportTime, nil
}
func (c *controller) agentInit(bindAddrOrInterface, advertiseAddr string) error {
func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr string) error {
if !c.isAgent() {
return nil
}
@ -241,9 +250,13 @@ func (c *controller) agentInit(bindAddrOrInterface, advertiseAddr string) error
keys, tags := c.getKeys(subsysGossip)
hostname, _ := os.Hostname()
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)
nDB, err := networkdb.New(&networkdb.Config{
BindAddr: listenAddr,
AdvertiseAddr: advertiseAddr,
NodeName: hostname,
NodeName: nodeName,
Keys: keys,
})
@ -328,7 +341,10 @@ func (c *controller) agentClose() {
c.agent.epTblCancel()
c.agent.networkDB.Close()
c.Lock()
c.agent = nil
c.Unlock()
}
func (n *network) isClusterEligible() bool {
@ -455,8 +471,12 @@ func (n *network) addDriverWatches() {
c := n.getController()
for _, tableName := range n.driverTables {
ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
c.Lock()
if c.agent == nil {
c.Unlock()
return
}
ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
c.Unlock()

View file

@ -5,6 +5,7 @@ type Provider interface {
IsManager() bool
IsAgent() bool
GetLocalAddress() string
GetListenAddress() string
GetAdvertiseAddress() string
GetRemoteAddress() string
ListenClusterEvents() <-chan struct{}

View file

@ -52,6 +52,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/discovery"
"github.com/docker/docker/pkg/locker"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/libnetwork/cluster"
@ -149,6 +150,7 @@ type controller struct {
ingressSandbox *sandbox
sboxOnce sync.Once
agent *agent
networkLocker *locker.Locker
agentInitDone chan struct{}
keys []*types.EncryptionKey
clusterConfigAvailable bool
@ -169,6 +171,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
svcRecords: make(map[string]svcInfo),
serviceBindings: make(map[serviceKey]*service),
agentInitDone: make(chan struct{}),
networkLocker: locker.New(),
}
if err := c.initStores(); err != nil {
@ -307,8 +310,41 @@ func (c *controller) clusterAgentInit() {
c.Lock()
c.clusterConfigAvailable = false
c.agentInitDone = make(chan struct{})
c.keys = nil
c.Unlock()
// We are leaving the cluster. Make sure we
// close the gossip so that we stop all
// incoming gossip updates before cleaning up
// any remaining service bindings. But before
// deleting the networks since the networks
// should still be present when cleaning up
// service bindings
c.agentClose()
c.cleanupServiceBindings("")
c.Lock()
ingressSandbox := c.ingressSandbox
c.ingressSandbox = nil
c.Unlock()
if ingressSandbox != nil {
if err := ingressSandbox.Delete(); err != nil {
log.Warnf("Could not delete ingress sandbox while leaving: %v", err)
}
}
n, err := c.NetworkByName("ingress")
if err != nil {
log.Warnf("Could not find ingress network while leaving: %v", err)
}
if n != nil {
if err := n.Delete(); err != nil {
log.Warnf("Could not delete ingress network while leaving: %v", err)
}
}
return
}
}
@ -317,7 +353,13 @@ func (c *controller) clusterAgentInit() {
// AgentInitWait waits for agent initialization to be completed in the
// controller.
func (c *controller) AgentInitWait() {
<-c.agentInitDone
c.Lock()
agentInitDone := c.agentInitDone
c.Unlock()
if agentInitDone != nil {
<-agentInitDone
}
}
func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
@ -575,6 +617,15 @@ func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver,
// NewNetwork creates a new network of the specified network type. The options
// are network specific and modeled in a generic way.
func (c *controller) NewNetwork(networkType, name string, id string, options ...NetworkOption) (Network, error) {
if id != "" {
c.networkLocker.Lock(id)
defer c.networkLocker.Unlock(id)
if _, err := c.NetworkByID(id); err == nil {
return nil, NetworkNameError(id)
}
}
if !config.IsValidName(name) {
return nil, ErrInvalidName(name)
}
@ -660,6 +711,9 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
}
joinCluster(network)
if !c.isDistributedControl() {
arrangeIngressFilterRule()
}
return network, nil
}
@ -699,9 +753,11 @@ func (c *controller) reservePools() {
c.Gateway = n.ipamV4Info[i].Gateway.IP.String()
}
}
for i, c := range n.ipamV6Config {
if c.Gateway == "" && n.ipamV6Info[i].Gateway != nil {
c.Gateway = n.ipamV6Info[i].Gateway.IP.String()
if n.enableIPv6 {
for i, c := range n.ipamV6Config {
if c.Gateway == "" && n.ipamV6Info[i].Gateway != nil {
c.Gateway = n.ipamV6Info[i].Gateway.IP.String()
}
}
}
// Reserve pools
@ -866,6 +922,7 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (s
if sb.ingress {
c.ingressSandbox = sb
sb.id = "ingress_sbox"
}
c.Unlock()
defer func() {

View file

@ -1243,8 +1243,17 @@ func (d *driver) ProgramExternalConnectivity(nid, eid string, options map[string
return err
}
defer func() {
if err != nil {
if e := network.releasePorts(endpoint); e != nil {
logrus.Errorf("Failed to release ports allocated for the bridge endpoint %s on failure %v because of %v",
eid, err, e)
}
endpoint.portMapping = nil
}
}()
if err = d.storeUpdate(endpoint); err != nil {
endpoint.portMapping = nil
return fmt.Errorf("failed to update bridge endpoint %s to store: %v", endpoint.id[0:7], err)
}

View file

@ -392,10 +392,11 @@ func (d *driver) secMapWalk(f func(string, []*spi) ([]*spi, bool)) error {
}
func (d *driver) setKeys(keys []*key) error {
if d.keys != nil {
return types.ForbiddenErrorf("initial keys are already present")
}
// Accept the encryption keys and clear any stale encryption map
d.Lock()
d.keys = keys
d.secMap = &encrMap{nodes: map[string][]*spi{}}
d.Unlock()
log.Debugf("Initial encryption keys: %v", d.keys)
return nil
}
@ -433,10 +434,8 @@ func (d *driver) updateKeys(newKey, primary, pruneKey *key) error {
if (newKey != nil && newIdx == -1) ||
(primary != nil && priIdx == -1) ||
(pruneKey != nil && delIdx == -1) {
err := types.BadRequestErrorf("cannot find proper key indices while processing key update:"+
return types.BadRequestErrorf("cannot find proper key indices while processing key update:"+
"(newIdx,priIdx,delIdx):(%d, %d, %d)", newIdx, priIdx, delIdx)
log.Warn(err)
return err
}
d.secMapWalk(func(rIPs string, spis []*spi) ([]*spi, bool) {

View file

@ -12,6 +12,13 @@ const globalChain = "DOCKER-OVERLAY"
var filterOnce sync.Once
var filterChan = make(chan struct{}, 1)
func filterWait() func() {
filterChan <- struct{}{}
return func() { <-filterChan }
}
func chainExists(cname string) bool {
if _, err := iptables.Raw("-L", cname); err != nil {
return false
@ -69,10 +76,14 @@ func setNetworkChain(cname string, remove bool) error {
}
func addNetworkChain(cname string) error {
defer filterWait()()
return setNetworkChain(cname, false)
}
func removeNetworkChain(cname string) error {
defer filterWait()()
return setNetworkChain(cname, true)
}
@ -119,9 +130,13 @@ func setFilters(cname, brName string, remove bool) error {
}
func addFilters(cname, brName string) error {
defer filterWait()()
return setFilters(cname, brName, false)
}
func removeFilters(cname, brName string) error {
defer filterWait()()
return setFilters(cname, brName, true)
}

View file

@ -500,9 +500,13 @@ func (n *network) initSubnetSandbox(s *subnet, restore bool) error {
vxlanName := n.generateVxlanName(s)
if restore {
n.restoreSubnetSandbox(s, brName, vxlanName)
if err := n.restoreSubnetSandbox(s, brName, vxlanName); err != nil {
return err
}
} else {
n.setupSubnetSandbox(s, brName, vxlanName)
if err := n.setupSubnetSandbox(s, brName, vxlanName); err != nil {
return err
}
}
n.Lock()

View file

@ -90,7 +90,20 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
}
}
d.restoreEndpoints()
if err := d.restoreEndpoints(); err != nil {
logrus.Warnf("Failure during overlay endpoints restore: %v", err)
}
// If an error happened when the network join the sandbox during the endpoints restore
// we should reset it now along with the once variable, so that subsequent endpoint joins
// outside of the restore path can potentially fix the network join and succeed.
for nid, n := range d.networks {
if n.initErr != nil {
logrus.Infof("resetting init error and once variable for network %s after unsuccesful endpoint restore: %v", nid, n.initErr)
n.initErr = nil
n.once = &sync.Once{}
}
}
return dc.RegisterDriver(networkType, d, c)
}
@ -323,7 +336,9 @@ func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{})
}
keys = append(keys, k)
}
d.setKeys(keys)
if err := d.setKeys(keys); err != nil {
logrus.Warn(err)
}
case discoverapi.EncryptionKeysUpdate:
var newKey, delKey, priKey *key
encrData, ok := data.(discoverapi.DriverEncryptionUpdate)
@ -348,7 +363,9 @@ func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{})
tag: uint32(encrData.PruneTag),
}
}
d.updateKeys(newKey, priKey, delKey)
if err := d.updateKeys(newKey, priKey, delKey); err != nil {
logrus.Warn(err)
}
default:
}
return nil

View file

@ -168,14 +168,14 @@ func (d *driver) peerDbAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
}
func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
peerMac net.HardwareAddr, vtep net.IP) bool {
peerMac net.HardwareAddr, vtep net.IP) peerEntry {
peerDbWg.Wait()
d.peerDb.Lock()
pMap, ok := d.peerDb.mp[nid]
if !ok {
d.peerDb.Unlock()
return false
return peerEntry{}
}
d.peerDb.Unlock()
@ -186,19 +186,20 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM
pMap.Lock()
if pEntry, ok := pMap.mp[pKey.String()]; ok {
pEntry, ok := pMap.mp[pKey.String()]
if ok {
// Mismatched endpoint ID(possibly outdated). Do not
// delete peerdb
if pEntry.eid != eid {
pMap.Unlock()
return false
return pEntry
}
}
delete(pMap.mp, pKey.String())
pMap.Unlock()
return true
return pEntry
}
func (d *driver) peerDbUpdateSandbox(nid string) {
@ -312,10 +313,9 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMas
return err
}
var pEntry peerEntry
if updateDb {
if !d.peerDbDelete(nid, eid, peerIP, peerIPMask, peerMac, vtep) {
return nil
}
pEntry = d.peerDbDelete(nid, eid, peerIP, peerIPMask, peerMac, vtep)
}
n := d.network(nid)
@ -328,14 +328,24 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMas
return nil
}
// Delete fdb entry to the bridge for the peer mac
if err := sbox.DeleteNeighbor(vtep, peerMac); err != nil {
return fmt.Errorf("could not delete fdb entry into the sandbox: %v", err)
// Delete fdb entry to the bridge for the peer mac only if the
// entry existed in local peerdb. If it is a stale delete
// request, still call DeleteNeighbor but only to cleanup any
// leftover sandbox neighbor cache and not actually delete the
// kernel state.
if (eid == pEntry.eid && vtep.Equal(pEntry.vtep)) ||
(eid != pEntry.eid && !vtep.Equal(pEntry.vtep)) {
if err := sbox.DeleteNeighbor(vtep, peerMac,
eid == pEntry.eid && vtep.Equal(pEntry.vtep)); err != nil {
return fmt.Errorf("could not delete fdb entry into the sandbox: %v", err)
}
}
// Delete neighbor entry for the peer IP
if err := sbox.DeleteNeighbor(peerIP, peerMac); err != nil {
return fmt.Errorf("could not delete neighbor entry into the sandbox: %v", err)
if eid == pEntry.eid {
if err := sbox.DeleteNeighbor(peerIP, peerMac, true); err != nil {
return fmt.Errorf("could not delete neighbor entry into the sandbox: %v", err)
}
}
if err := d.checkEncryption(nid, vtep, 0, false, false); err != nil {

View file

@ -757,17 +757,6 @@ func (ep *endpoint) Delete(force bool) error {
}
}()
if err = n.getEpCnt().DecEndpointCnt(); err != nil && !force {
return err
}
defer func() {
if err != nil && !force {
if e := n.getEpCnt().IncEndpointCnt(); e != nil {
log.Warnf("failed to update network %s : %v", n.name, e)
}
}
}()
// unwatch for service records
n.getController().unWatchSvcRecord(ep)
@ -777,6 +766,10 @@ func (ep *endpoint) Delete(force bool) error {
ep.releaseAddress()
if err := n.getEpCnt().DecEndpointCnt(); err != nil {
log.Warnf("failed to decrement endpoint coint for ep %s: %v", ep.ID(), err)
}
return nil
}

View file

@ -756,6 +756,19 @@ func (n *network) delete(force bool) error {
log.Warnf("Failed to update store after ipam release for network %s (%s): %v", n.Name(), n.ID(), err)
}
// We are about to delete the network. Leave the gossip
// cluster for the network to stop all incoming network
// specific gossip updates before cleaning up all the service
// bindings for the network. But cleanup service binding
// before deleting the network from the store since service
// bindings cleanup requires the network in the store.
n.cancelDriverWatches()
if err = n.leaveCluster(); err != nil {
log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err)
}
c.cleanupServiceBindings(n.ID())
// deleteFromStore performs an atomic delete operation and the
// network.epCnt will help prevent any possible
// race between endpoint join and network delete
@ -770,12 +783,6 @@ func (n *network) delete(force bool) error {
return fmt.Errorf("error deleting network from store: %v", err)
}
n.cancelDriverWatches()
if err = n.leaveCluster(); err != nil {
log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err)
}
return nil
}
@ -1118,7 +1125,7 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record {
continue
}
if len(ip) == 0 {
log.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", h, n.Name(), n.ID())
log.Warnf("Found empty list of IP addresses for service %s on network %s (%s)", h, n.name, n.id)
continue
}
recs = append(recs, etchosts.Record{

View file

@ -1,10 +1,15 @@
package networkdb
import (
"fmt"
"time"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)
const broadcastTimeout = 5 * time.Second
type networkEventMessage struct {
id string
node string
@ -44,6 +49,53 @@ func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltim
return nil
}
type nodeEventMessage struct {
msg []byte
notify chan<- struct{}
}
func (m *nodeEventMessage) Invalidates(other memberlist.Broadcast) bool {
return false
}
func (m *nodeEventMessage) Message() []byte {
return m.msg
}
func (m *nodeEventMessage) Finished() {
if m.notify != nil {
close(m.notify)
}
}
func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
nEvent := NodeEvent{
Type: event,
LTime: nDB.networkClock.Increment(),
NodeName: nDB.config.NodeName,
}
raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent)
if err != nil {
return err
}
notifyCh := make(chan struct{})
nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
msg: raw,
notify: notifyCh,
})
// Wait for the broadcast
select {
case <-notifyCh:
case <-time.After(broadcastTimeout):
return fmt.Errorf("timed out broadcasting node event")
}
return nil
}
type tableEventMessage struct {
id string
tname string

View file

@ -7,6 +7,7 @@ import (
"fmt"
"math/big"
rnd "math/rand"
"net"
"strings"
"time"
@ -14,7 +15,11 @@ import (
"github.com/hashicorp/memberlist"
)
const reapInterval = 30 * time.Second
const (
reapInterval = 60 * time.Second
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
)
type logWriter struct{}
@ -81,6 +86,7 @@ func (nDB *NetworkDB) RemoveKey(key []byte) {
func (nDB *NetworkDB) clusterInit() error {
config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.BindAddr = nDB.config.BindAddr
config.AdvertiseAddr = nDB.config.AdvertiseAddr
if nDB.config.BindPort != 0 {
@ -111,6 +117,13 @@ func (nDB *NetworkDB) clusterInit() error {
RetransmitMult: config.RetransmitMult,
}
nDB.nodeBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(nDB.nodes)
},
RetransmitMult: config.RetransmitMult,
}
mlist, err := memberlist.Create(config)
if err != nil {
return fmt.Errorf("failed to create memberlist: %v", err)
@ -124,9 +137,10 @@ func (nDB *NetworkDB) clusterInit() error {
interval time.Duration
fn func()
}{
{reapInterval, nDB.reapState},
{reapPeriod, nDB.reapState},
{config.GossipInterval, nDB.gossip},
{config.PushPullInterval, nDB.bulkSyncTables},
{retryInterval, nDB.reconnectNode},
} {
t := time.NewTicker(trigger.interval)
go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
@ -136,19 +150,49 @@ func (nDB *NetworkDB) clusterInit() error {
return nil
}
func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) {
t := time.NewTicker(retryInterval)
defer t.Stop()
for {
select {
case <-t.C:
if _, err := nDB.memberlist.Join(members); err != nil {
logrus.Errorf("Failed to join memberlist %s on retry: %v", members, err)
continue
}
return
case <-stop:
return
}
}
}
func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist
if _, err := mlist.Join(members); err != nil {
// Incase of failure, keep retrying join until it succeeds or the cluster is shutdown.
go nDB.retryJoin(members, nDB.stopCh)
return fmt.Errorf("could not join node to memberlist: %v", err)
}
if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
return fmt.Errorf("failed to send node join: %v", err)
}
return nil
}
func (nDB *NetworkDB) clusterLeave() error {
mlist := nDB.memberlist
if err := nDB.sendNodeEvent(NodeEventTypeLeave); err != nil {
logrus.Errorf("failed to send node leave: %v", err)
}
if err := mlist.Leave(time.Second); err != nil {
return err
}
@ -180,6 +224,42 @@ func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, sto
}
}
func (nDB *NetworkDB) reconnectNode() {
nDB.RLock()
if len(nDB.failedNodes) == 0 {
nDB.RUnlock()
return
}
nodes := make([]*node, 0, len(nDB.failedNodes))
for _, n := range nDB.failedNodes {
nodes = append(nodes, n)
}
nDB.RUnlock()
node := nodes[randomOffset(len(nodes))]
addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
if _, err := nDB.memberlist.Join([]string{addr.String()}); err != nil {
return
}
if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
logrus.Errorf("failed to send node join during reconnect: %v", err)
return
}
// Update all the local table state to a new time to
// force update on the node we are trying to rejoin, just in
// case that node has these in deleting state still. This is
// facilitate fast convergence after recovering from a gossip
// failure.
nDB.updateLocalTableTime()
logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
nDB.bulkSync([]string{node.Name}, true)
}
func (nDB *NetworkDB) reapState() {
nDB.reapNetworks()
nDB.reapTableEntries()
@ -200,10 +280,7 @@ func (nDB *NetworkDB) reapNetworks() {
}
func (nDB *NetworkDB) reapTableEntries() {
var (
paths []string
entries []*entry
)
var paths []string
now := time.Now()
@ -219,14 +296,12 @@ func (nDB *NetworkDB) reapTableEntries() {
}
paths = append(paths, path)
entries = append(entries, entry)
return false
})
nDB.RUnlock()
nDB.Lock()
for i, path := range paths {
entry := entries[i]
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
@ -239,8 +314,6 @@ func (nDB *NetworkDB) reapTableEntries() {
if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
}
nDB.Unlock()
}
@ -295,7 +368,7 @@ func (nDB *NetworkDB) gossip() {
}
// Send the compound message
if err := nDB.memberlist.SendToUDP(mnode, compound); err != nil {
if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil {
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
}
}
@ -330,7 +403,7 @@ func (nDB *NetworkDB) bulkSyncTables() {
continue
}
completed, err := nDB.bulkSync(nid, nodes, false)
completed, err := nDB.bulkSync(nodes, false)
if err != nil {
logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
continue
@ -357,7 +430,7 @@ func (nDB *NetworkDB) bulkSyncTables() {
}
}
func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string, error) {
func (nDB *NetworkDB) bulkSync(nodes []string, all bool) ([]string, error) {
if !all {
// If not all, then just pick one.
nodes = nDB.mRandomNodes(1, nodes)
@ -395,7 +468,12 @@ func (nDB *NetworkDB) bulkSync(nid string, nodes []string, all bool) ([]string,
func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
var msgs [][]byte
logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node)
var unsolMsg string
if unsolicited {
unsolMsg = "unsolicited"
}
logrus.Debugf("%s: Initiating %s bulk sync for networks %v with node %s", nDB.config.NodeName, unsolMsg, networks, node)
nDB.RLock()
mnode := nDB.nodes[node]
@ -411,15 +489,14 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
return false
}
// Do not bulk sync state which is in the
// process of getting deleted.
eType := TableEventTypeCreate
if entry.deleting {
return false
eType = TableEventTypeDelete
}
params := strings.Split(path[1:], "/")
tEvent := TableEvent{
Type: TableEventTypeCreate,
Type: eType,
LTime: entry.ltime,
NodeName: entry.node,
NetworkID: nid,
@ -461,7 +538,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()
err = nDB.memberlist.SendToTCP(mnode, buf)
err = nDB.memberlist.SendToTCP(&mnode.Node, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)

View file

@ -3,6 +3,7 @@ package networkdb
import (
"fmt"
"net"
"strings"
"time"
"github.com/Sirupsen/logrus"
@ -17,6 +18,76 @@ func (d *delegate) NodeMeta(limit int) []byte {
return []byte{}
}
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
nDB.Lock()
defer nDB.Unlock()
for _, nodes := range []map[string]*node{
nDB.failedNodes,
nDB.leftNodes,
nDB.nodes,
} {
if n, ok := nodes[nEvent.NodeName]; ok {
if n.ltime >= nEvent.LTime {
return nil
}
delete(nodes, n.Name)
return n
}
}
return nil
}
func (nDB *NetworkDB) purgeSameNode(n *node) {
nDB.Lock()
defer nDB.Unlock()
prefix := strings.Split(n.Name, "-")[0]
for _, nodes := range []map[string]*node{
nDB.failedNodes,
nDB.leftNodes,
nDB.nodes,
} {
var nodeNames []string
for name, node := range nodes {
if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) {
nodeNames = append(nodeNames, name)
}
}
for _, name := range nodeNames {
delete(nodes, name)
}
}
}
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
n := nDB.checkAndGetNode(nEvent)
if n == nil {
return false
}
nDB.purgeSameNode(n)
n.ltime = nEvent.LTime
switch nEvent.Type {
case NodeEventTypeJoin:
nDB.Lock()
nDB.nodes[n.Name] = n
nDB.Unlock()
return true
case NodeEventTypeLeave:
nDB.Lock()
nDB.leftNodes[n.Name] = n
nDB.Unlock()
return true
}
return false
}
func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
// Update our local clock if the received messages has newer
// time.
@ -53,6 +124,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.leaveTime = time.Now()
}
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
return true
}
@ -66,7 +138,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
ltime: nEvent.LTime,
}
nDB.networkNodes[nEvent.NetworkID] = append(nDB.networkNodes[nEvent.NetworkID], nEvent.NodeName)
nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName)
return true
}
@ -84,28 +156,34 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
return true
}
if entry, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key); err == nil {
e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key)
if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we don't have the entry here nothing to do.
return false
}
if err == nil {
// We have the latest state. Ignore the event
// since it is stale.
if entry.ltime >= tEvent.LTime {
if e.ltime >= tEvent.LTime {
return false
}
}
entry := &entry{
e = &entry{
ltime: tEvent.LTime,
node: tEvent.NodeName,
value: tEvent.Value,
deleting: tEvent.Type == TableEventTypeDelete,
}
if entry.deleting {
entry.deleteTime = time.Now()
if e.deleting {
e.deleteTime = time.Now()
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), entry)
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.Unlock()
var op opType
@ -181,6 +259,27 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
}
}
func (nDB *NetworkDB) handleNodeMessage(buf []byte) {
var nEvent NodeEvent
if err := proto.Unmarshal(buf, &nEvent); err != nil {
logrus.Errorf("Error decoding node event message: %v", err)
return
}
if rebroadcast := nDB.handleNodeEvent(&nEvent); rebroadcast {
var err error
buf, err = encodeRawMessage(MessageTypeNodeEvent, buf)
if err != nil {
logrus.Errorf("Error marshalling gossip message for node event rebroadcast: %v", err)
return
}
nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
msg: buf,
})
}
}
func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
var nEvent NetworkEvent
if err := proto.Unmarshal(buf, &nEvent); err != nil {
@ -249,6 +348,8 @@ func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
}
switch mType {
case MessageTypeNodeEvent:
nDB.handleNodeMessage(data)
case MessageTypeNetworkEvent:
nDB.handleNetworkMessage(data)
case MessageTypeTableEvent:
@ -271,15 +372,27 @@ func (d *delegate) NotifyMsg(buf []byte) {
}
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
return d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
msgs := d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
msgs = append(msgs, d.nDB.nodeBroadcasts.GetBroadcasts(overhead, limit)...)
return msgs
}
func (d *delegate) LocalState(join bool) []byte {
if join {
// Update all the local node/network state to a new time to
// force update on the node we are trying to rejoin, just in
// case that node has these in leaving state still. This is
// facilitate fast convergence after recovering from a gossip
// failure.
d.nDB.updateLocalNetworkTime()
}
d.nDB.RLock()
defer d.nDB.RUnlock()
pp := NetworkPushPull{
LTime: d.nDB.networkClock.Time(),
LTime: d.nDB.networkClock.Time(),
NodeName: d.nDB.config.NodeName,
}
for name, nn := range d.nDB.networks {
@ -325,9 +438,12 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
return
}
if pp.LTime > 0 {
d.nDB.networkClock.Witness(pp.LTime)
nodeEvent := &NodeEvent{
LTime: pp.LTime,
NodeName: pp.NodeName,
Type: NodeEventTypeJoin,
}
d.nDB.handleNodeEvent(nodeEvent)
for _, n := range pp.Networks {
nEvent := &NetworkEvent{

View file

@ -6,17 +6,31 @@ type eventDelegate struct {
nDB *NetworkDB
}
func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
e.nDB.Lock()
e.nDB.nodes[n.Name] = n
// In case the node is rejoining after a failure or leave,
// wait until an explicit join message arrives before adding
// it to the nodes just to make sure this is not a stale
// join. If you don't know about this node add it immediately.
_, fOk := e.nDB.failedNodes[mn.Name]
_, lOk := e.nDB.leftNodes[mn.Name]
if fOk || lOk {
e.nDB.Unlock()
return
}
e.nDB.nodes[mn.Name] = &node{Node: *mn}
e.nDB.Unlock()
}
func (e *eventDelegate) NotifyLeave(n *memberlist.Node) {
e.nDB.deleteNodeTableEntries(n.Name)
e.nDB.deleteNetworkNodeEntries(n.Name)
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
e.nDB.deleteNodeTableEntries(mn.Name)
e.nDB.deleteNetworkEntriesForNode(mn.Name)
e.nDB.Lock()
delete(e.nDB.nodes, n.Name)
if n, ok := e.nDB.nodes[mn.Name]; ok {
delete(e.nDB.nodes, mn.Name)
e.nDB.failedNodes[mn.Name] = n
}
e.nDB.Unlock()
}

View file

@ -41,7 +41,13 @@ type NetworkDB struct {
// List of all peer nodes in the cluster not-limited to any
// network.
nodes map[string]*memberlist.Node
nodes map[string]*node
// List of all peer nodes which have failed
failedNodes map[string]*node
// List of all peer nodes which have left
leftNodes map[string]*node
// A multi-dimensional map of network/node attachmemts. The
// first key is a node name and the second key is a network ID
@ -66,6 +72,9 @@ type NetworkDB struct {
// Broadcast queue for network event gossip.
networkBroadcasts *memberlist.TransmitLimitedQueue
// Broadcast queue for node event gossip.
nodeBroadcasts *memberlist.TransmitLimitedQueue
// A central stop channel to stop all go routines running on
// behalf of the NetworkDB instance.
stopCh chan struct{}
@ -82,6 +91,11 @@ type NetworkDB struct {
keyring *memberlist.Keyring
}
type node struct {
memberlist.Node
ltime serf.LamportTime
}
// network describes the node/network attachment.
type network struct {
// Network ID
@ -107,6 +121,10 @@ type Config struct {
// NodeName is the cluster wide unique name for this node.
NodeName string
// BindAddr is the IP on which networkdb listens. It can be
// 0.0.0.0 to listen on all addresses on the host.
BindAddr string
// AdvertiseAddr is the node's IP address that we advertise for
// cluster communication.
AdvertiseAddr string
@ -146,7 +164,9 @@ func New(c *Config) (*NetworkDB, error) {
config: c,
indexes: make(map[int]*radix.Tree),
networks: make(map[string]map[string]*network),
nodes: make(map[string]*memberlist.Node),
nodes: make(map[string]*node),
failedNodes: make(map[string]*node),
leftNodes: make(map[string]*node),
networkNodes: make(map[string][]string),
bulkSyncAckTbl: make(map[string]chan struct{}),
broadcaster: events.NewBroadcaster(),
@ -286,7 +306,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
return nil
}
func (nDB *NetworkDB) deleteNetworkNodeEntries(deletedNode string) {
func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) {
nDB.Lock()
for nid, nodes := range nDB.networkNodes {
updatedNodes := make([]string, 0, len(nodes))
@ -300,6 +320,8 @@ func (nDB *NetworkDB) deleteNetworkNodeEntries(deletedNode string) {
nDB.networkNodes[nid] = updatedNodes
}
delete(nDB.networks, deletedNode)
nDB.Unlock()
}
@ -326,6 +348,8 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
return false
})
nDB.Unlock()
@ -387,7 +411,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
}
logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
if _, err := nDB.bulkSync(nid, networkNodes, true); err != nil {
if _, err := nDB.bulkSync(networkNodes, true); err != nil {
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
}
@ -453,6 +477,20 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
return nil
}
// addNetworkNode adds the node to the list of nodes which participate
// in the passed network only if it is not already present. Caller
// should hold the NetworkDB lock while calling this
func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) {
nodes := nDB.networkNodes[nid]
for _, node := range nodes {
if node == nodeName {
return
}
}
nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nodeName)
}
// Deletes the node from the list of nodes which participate in the
// passed network. Caller should hold the NetworkDB lock while calling
// this
@ -476,10 +514,46 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
var networks []string
for nid := range nDB.networks[nDB.config.NodeName] {
if _, ok := nDB.networks[nodeName][nid]; ok {
networks = append(networks, nid)
if n, ok := nDB.networks[nodeName][nid]; ok {
if !n.leaving {
networks = append(networks, nid)
}
}
}
return networks
}
func (nDB *NetworkDB) updateLocalNetworkTime() {
nDB.Lock()
defer nDB.Unlock()
ltime := nDB.networkClock.Increment()
for _, n := range nDB.networks[nDB.config.NodeName] {
n.ltime = ltime
}
}
func (nDB *NetworkDB) updateLocalTableTime() {
nDB.Lock()
defer nDB.Unlock()
ltime := nDB.tableClock.Increment()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry := v.(*entry)
if entry.node != nDB.config.NodeName {
return false
}
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]
entry.ltime = ltime
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
return false
})
}

View file

@ -10,6 +10,7 @@
It has these top-level messages:
GossipMessage
NodeEvent
NetworkEvent
NetworkEntry
NetworkPushPull
@ -67,6 +68,9 @@ const (
// which is a pack of many message of above types, packed into
// a single compound message.
MessageTypeCompound MessageType = 5
// NodeEvent message type is used to communicare node
// join/leave events in the cluster
MessageTypeNodeEvent MessageType = 6
)
var MessageType_name = map[int32]string{
@ -76,6 +80,7 @@ var MessageType_name = map[int32]string{
3: "PUSH_PULL",
4: "BULK_SYNC",
5: "COMPOUND",
6: "NODE_EVENT",
}
var MessageType_value = map[string]int32{
"INVALID": 0,
@ -84,6 +89,7 @@ var MessageType_value = map[string]int32{
"PUSH_PULL": 3,
"BULK_SYNC": 4,
"COMPOUND": 5,
"NODE_EVENT": 6,
}
func (x MessageType) String() string {
@ -91,6 +97,32 @@ func (x MessageType) String() string {
}
func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{0} }
type NodeEvent_Type int32
const (
NodeEventTypeInvalid NodeEvent_Type = 0
// Join event is generated when this node joins the cluster.
NodeEventTypeJoin NodeEvent_Type = 1
// Leave event is generated when this node leaves the cluster.
NodeEventTypeLeave NodeEvent_Type = 2
)
var NodeEvent_Type_name = map[int32]string{
0: "INVALID",
1: "JOIN",
2: "LEAVE",
}
var NodeEvent_Type_value = map[string]int32{
"INVALID": 0,
"JOIN": 1,
"LEAVE": 2,
}
func (x NodeEvent_Type) String() string {
return proto.EnumName(NodeEvent_Type_name, int32(x))
}
func (NodeEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1, 0} }
type NetworkEvent_Type int32
const (
@ -115,7 +147,7 @@ var NetworkEvent_Type_value = map[string]int32{
func (x NetworkEvent_Type) String() string {
return proto.EnumName(NetworkEvent_Type_name, int32(x))
}
func (NetworkEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1, 0} }
func (NetworkEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2, 0} }
type TableEvent_Type int32
@ -148,7 +180,7 @@ var TableEvent_Type_value = map[string]int32{
func (x TableEvent_Type) String() string {
return proto.EnumName(TableEvent_Type_name, int32(x))
}
func (TableEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4, 0} }
func (TableEvent_Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5, 0} }
// GossipMessage is a basic message header used by all messages types.
type GossipMessage struct {
@ -160,6 +192,21 @@ func (m *GossipMessage) Reset() { *m = GossipMessage{} }
func (*GossipMessage) ProtoMessage() {}
func (*GossipMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{0} }
// NodeEvent message payload definition.
type NodeEvent struct {
Type NodeEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.NodeEvent_Type" json:"type,omitempty"`
// Lamport time using a network lamport clock indicating the
// time this event was generated on the node where it was
// generated.
LTime github_com_hashicorp_serf_serf.LamportTime `protobuf:"varint,2,opt,name=l_time,json=lTime,proto3,customtype=github.com/hashicorp/serf/serf.LamportTime" json:"l_time"`
// Source node name.
NodeName string `protobuf:"bytes,3,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"`
}
func (m *NodeEvent) Reset() { *m = NodeEvent{} }
func (*NodeEvent) ProtoMessage() {}
func (*NodeEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1} }
// NetworkEvent message payload definition.
type NetworkEvent struct {
Type NetworkEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=networkdb.NetworkEvent_Type" json:"type,omitempty"`
@ -175,7 +222,7 @@ type NetworkEvent struct {
func (m *NetworkEvent) Reset() { *m = NetworkEvent{} }
func (*NetworkEvent) ProtoMessage() {}
func (*NetworkEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{1} }
func (*NetworkEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2} }
// NetworkEntry for push pull of networks.
type NetworkEntry struct {
@ -192,18 +239,20 @@ type NetworkEntry struct {
func (m *NetworkEntry) Reset() { *m = NetworkEntry{} }
func (*NetworkEntry) ProtoMessage() {}
func (*NetworkEntry) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{2} }
func (*NetworkEntry) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{3} }
// NetworkPushpull message payload definition.
type NetworkPushPull struct {
// Lamport time when this push pull was initiated.
LTime github_com_hashicorp_serf_serf.LamportTime `protobuf:"varint,1,opt,name=l_time,json=lTime,proto3,customtype=github.com/hashicorp/serf/serf.LamportTime" json:"l_time"`
Networks []*NetworkEntry `protobuf:"bytes,2,rep,name=networks" json:"networks,omitempty"`
// Name of the node sending this push pull payload.
NodeName string `protobuf:"bytes,3,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"`
}
func (m *NetworkPushPull) Reset() { *m = NetworkPushPull{} }
func (*NetworkPushPull) ProtoMessage() {}
func (*NetworkPushPull) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{3} }
func (*NetworkPushPull) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4} }
func (m *NetworkPushPull) GetNetworks() []*NetworkEntry {
if m != nil {
@ -231,7 +280,7 @@ type TableEvent struct {
func (m *TableEvent) Reset() { *m = TableEvent{} }
func (*TableEvent) ProtoMessage() {}
func (*TableEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{4} }
func (*TableEvent) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5} }
// BulkSync message payload definition.
type BulkSyncMessage struct {
@ -251,7 +300,7 @@ type BulkSyncMessage struct {
func (m *BulkSyncMessage) Reset() { *m = BulkSyncMessage{} }
func (*BulkSyncMessage) ProtoMessage() {}
func (*BulkSyncMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{5} }
func (*BulkSyncMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{6} }
// Compound message payload definition.
type CompoundMessage struct {
@ -261,7 +310,7 @@ type CompoundMessage struct {
func (m *CompoundMessage) Reset() { *m = CompoundMessage{} }
func (*CompoundMessage) ProtoMessage() {}
func (*CompoundMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{6} }
func (*CompoundMessage) Descriptor() ([]byte, []int) { return fileDescriptorNetworkdb, []int{7} }
func (m *CompoundMessage) GetMessages() []*CompoundMessage_SimpleMessage {
if m != nil {
@ -279,11 +328,12 @@ type CompoundMessage_SimpleMessage struct {
func (m *CompoundMessage_SimpleMessage) Reset() { *m = CompoundMessage_SimpleMessage{} }
func (*CompoundMessage_SimpleMessage) ProtoMessage() {}
func (*CompoundMessage_SimpleMessage) Descriptor() ([]byte, []int) {
return fileDescriptorNetworkdb, []int{6, 0}
return fileDescriptorNetworkdb, []int{7, 0}
}
func init() {
proto.RegisterType((*GossipMessage)(nil), "networkdb.GossipMessage")
proto.RegisterType((*NodeEvent)(nil), "networkdb.NodeEvent")
proto.RegisterType((*NetworkEvent)(nil), "networkdb.NetworkEvent")
proto.RegisterType((*NetworkEntry)(nil), "networkdb.NetworkEntry")
proto.RegisterType((*NetworkPushPull)(nil), "networkdb.NetworkPushPull")
@ -292,6 +342,7 @@ func init() {
proto.RegisterType((*CompoundMessage)(nil), "networkdb.CompoundMessage")
proto.RegisterType((*CompoundMessage_SimpleMessage)(nil), "networkdb.CompoundMessage.SimpleMessage")
proto.RegisterEnum("networkdb.MessageType", MessageType_name, MessageType_value)
proto.RegisterEnum("networkdb.NodeEvent_Type", NodeEvent_Type_name, NodeEvent_Type_value)
proto.RegisterEnum("networkdb.NetworkEvent_Type", NetworkEvent_Type_name, NetworkEvent_Type_value)
proto.RegisterEnum("networkdb.TableEvent_Type", TableEvent_Type_name, TableEvent_Type_value)
}
@ -306,6 +357,18 @@ func (this *GossipMessage) GoString() string {
s = append(s, "}")
return strings.Join(s, "")
}
func (this *NodeEvent) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s = append(s, "&networkdb.NodeEvent{")
s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n")
s = append(s, "LTime: "+fmt.Sprintf("%#v", this.LTime)+",\n")
s = append(s, "NodeName: "+fmt.Sprintf("%#v", this.NodeName)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func (this *NetworkEvent) GoString() string {
if this == nil {
return "nil"
@ -336,12 +399,13 @@ func (this *NetworkPushPull) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 6)
s := make([]string, 0, 7)
s = append(s, "&networkdb.NetworkPushPull{")
s = append(s, "LTime: "+fmt.Sprintf("%#v", this.LTime)+",\n")
if this.Networks != nil {
s = append(s, "Networks: "+fmt.Sprintf("%#v", this.Networks)+",\n")
}
s = append(s, "NodeName: "+fmt.Sprintf("%#v", this.NodeName)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -451,6 +515,40 @@ func (m *GossipMessage) MarshalTo(data []byte) (int, error) {
return i, nil
}
func (m *NodeEvent) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
n, err := m.MarshalTo(data)
if err != nil {
return nil, err
}
return data[:n], nil
}
func (m *NodeEvent) MarshalTo(data []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Type != 0 {
data[i] = 0x8
i++
i = encodeVarintNetworkdb(data, i, uint64(m.Type))
}
if m.LTime != 0 {
data[i] = 0x10
i++
i = encodeVarintNetworkdb(data, i, uint64(m.LTime))
}
if len(m.NodeName) > 0 {
data[i] = 0x1a
i++
i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName)))
i += copy(data[i:], m.NodeName)
}
return i, nil
}
func (m *NetworkEvent) Marshal() (data []byte, err error) {
size := m.Size()
data = make([]byte, size)
@ -568,6 +666,12 @@ func (m *NetworkPushPull) MarshalTo(data []byte) (int, error) {
i += n
}
}
if len(m.NodeName) > 0 {
data[i] = 0x1a
i++
i = encodeVarintNetworkdb(data, i, uint64(len(m.NodeName)))
i += copy(data[i:], m.NodeName)
}
return i, nil
}
@ -783,6 +887,22 @@ func (m *GossipMessage) Size() (n int) {
return n
}
func (m *NodeEvent) Size() (n int) {
var l int
_ = l
if m.Type != 0 {
n += 1 + sovNetworkdb(uint64(m.Type))
}
if m.LTime != 0 {
n += 1 + sovNetworkdb(uint64(m.LTime))
}
l = len(m.NodeName)
if l > 0 {
n += 1 + l + sovNetworkdb(uint64(l))
}
return n
}
func (m *NetworkEvent) Size() (n int) {
var l int
_ = l
@ -835,6 +955,10 @@ func (m *NetworkPushPull) Size() (n int) {
n += 1 + l + sovNetworkdb(uint64(l))
}
}
l = len(m.NodeName)
if l > 0 {
n += 1 + l + sovNetworkdb(uint64(l))
}
return n
}
@ -942,6 +1066,18 @@ func (this *GossipMessage) String() string {
}, "")
return s
}
func (this *NodeEvent) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&NodeEvent{`,
`Type:` + fmt.Sprintf("%v", this.Type) + `,`,
`LTime:` + fmt.Sprintf("%v", this.LTime) + `,`,
`NodeName:` + fmt.Sprintf("%v", this.NodeName) + `,`,
`}`,
}, "")
return s
}
func (this *NetworkEvent) String() string {
if this == nil {
return "nil"
@ -975,6 +1111,7 @@ func (this *NetworkPushPull) String() string {
s := strings.Join([]string{`&NetworkPushPull{`,
`LTime:` + fmt.Sprintf("%v", this.LTime) + `,`,
`Networks:` + strings.Replace(fmt.Sprintf("%v", this.Networks), "NetworkEntry", "NetworkEntry", 1) + `,`,
`NodeName:` + fmt.Sprintf("%v", this.NodeName) + `,`,
`}`,
}, "")
return s
@ -1137,6 +1274,123 @@ func (m *GossipMessage) Unmarshal(data []byte) error {
}
return nil
}
func (m *NodeEvent) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNetworkdb
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: NodeEvent: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: NodeEvent: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
}
m.Type = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNetworkdb
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.Type |= (NodeEvent_Type(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field LTime", wireType)
}
m.LTime = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNetworkdb
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
m.LTime |= (github_com_hashicorp_serf_serf.LamportTime(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field NodeName", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNetworkdb
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthNetworkdb
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.NodeName = string(data[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipNetworkdb(data[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthNetworkdb
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *NetworkEvent) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
@ -1509,6 +1763,35 @@ func (m *NetworkPushPull) Unmarshal(data []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field NodeName", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowNetworkdb
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthNetworkdb
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.NodeName = string(data[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipNetworkdb(data[iNdEx:])
@ -2211,56 +2494,61 @@ var (
)
var fileDescriptorNetworkdb = []byte{
// 812 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x95, 0x4d, 0x6f, 0xe2, 0x46,
0x18, 0xc7, 0x31, 0x18, 0x02, 0x0f, 0xd0, 0x20, 0x27, 0x4d, 0x5c, 0xa7, 0x25, 0x91, 0x9b, 0x46,
0x14, 0x55, 0x4e, 0x95, 0x7c, 0x02, 0x5e, 0xac, 0x96, 0xc4, 0x31, 0xc8, 0x40, 0xaa, 0x9e, 0x90,
0xc1, 0x53, 0xb0, 0x62, 0x6c, 0x0b, 0x9b, 0x54, 0xdc, 0xaa, 0x9e, 0xa2, 0xde, 0x7a, 0xad, 0xd4,
0x53, 0x7b, 0xee, 0x07, 0xe8, 0xa1, 0xe7, 0xa8, 0xa7, 0xf6, 0xb6, 0xda, 0x43, 0xb4, 0xc9, 0x27,
0xd8, 0x8f, 0xb0, 0xe3, 0xc1, 0x86, 0x81, 0x44, 0xb9, 0xec, 0x6a, 0xb5, 0x07, 0xc3, 0xbc, 0xfc,
0xe6, 0xd1, 0xff, 0x79, 0xe6, 0x3f, 0x33, 0xb0, 0x69, 0x23, 0xff, 0x47, 0x67, 0x72, 0x65, 0xf4,
0x25, 0x77, 0xe2, 0xf8, 0x0e, 0x97, 0x59, 0x0c, 0x08, 0xdb, 0x43, 0x67, 0xe8, 0x90, 0xd1, 0xe3,
0xa0, 0x35, 0x07, 0xc4, 0x26, 0xe4, 0xbf, 0x71, 0x3c, 0xcf, 0x74, 0x2f, 0x90, 0xe7, 0xe9, 0x43,
0xc4, 0x95, 0x81, 0xf5, 0x67, 0x2e, 0xe2, 0x99, 0x03, 0xa6, 0xf4, 0xd1, 0xc9, 0x8e, 0xb4, 0x8c,
0x18, 0x12, 0x1d, 0x3c, 0xab, 0x11, 0x86, 0xe3, 0x80, 0x35, 0x74, 0x5f, 0xe7, 0xe3, 0x98, 0xcd,
0x69, 0xa4, 0x2d, 0xde, 0xc7, 0x21, 0xa7, 0xce, 0xd7, 0xc8, 0xd7, 0xc8, 0xf6, 0xb9, 0xaf, 0x57,
0x02, 0x7e, 0x4a, 0x05, 0xa4, 0x31, 0x89, 0x0a, 0xdb, 0x80, 0x94, 0xd5, 0xf3, 0xcd, 0x31, 0x22,
0x81, 0xd9, 0xea, 0xc9, 0xed, 0xdd, 0x7e, 0xec, 0xe5, 0xdd, 0x7e, 0x79, 0x68, 0xfa, 0xa3, 0x69,
0x5f, 0x1a, 0x38, 0xe3, 0xe3, 0x91, 0xee, 0x8d, 0xcc, 0x81, 0x33, 0x71, 0x8f, 0x3d, 0x34, 0xf9,
0x81, 0xfc, 0x48, 0x8a, 0x3e, 0x76, 0x9d, 0x89, 0xdf, 0xc1, 0x2b, 0xb5, 0xa4, 0x15, 0xfc, 0x71,
0x7b, 0x90, 0xb1, 0x1d, 0x03, 0xf5, 0x6c, 0x1d, 0x47, 0x4b, 0xe0, 0x68, 0x19, 0x2d, 0x1d, 0x0c,
0xa8, 0xb8, 0xcf, 0x7d, 0x05, 0x10, 0x8a, 0xe9, 0x99, 0x06, 0xcf, 0x06, 0xb3, 0xd5, 0xfc, 0xc3,
0xdd, 0x7e, 0x26, 0x14, 0xd6, 0xa8, 0x6b, 0x51, 0xfd, 0x1a, 0x86, 0x78, 0xc3, 0x00, 0x1b, 0x88,
0xe4, 0x4a, 0xb0, 0xd1, 0x50, 0x2f, 0x2b, 0x4a, 0xa3, 0x5e, 0x88, 0x09, 0x7b, 0xbf, 0xfc, 0x7e,
0xb0, 0x4b, 0x27, 0x12, 0x20, 0x0d, 0xfb, 0x5a, 0xb7, 0x4c, 0x83, 0x13, 0x81, 0x3d, 0x6b, 0x36,
0xd4, 0x02, 0x23, 0xf0, 0x18, 0xdb, 0x5e, 0xc7, 0xce, 0x1c, 0xd3, 0xe6, 0x0e, 0x21, 0xa9, 0xc8,
0x95, 0x4b, 0xb9, 0x10, 0x17, 0x3e, 0xc1, 0xd0, 0xc7, 0xeb, 0x90, 0x82, 0xf4, 0x6b, 0x24, 0xe4,
0x6e, 0xfe, 0x28, 0xc6, 0xfe, 0xfe, 0xb3, 0x48, 0x14, 0x88, 0xff, 0x30, 0xcb, 0x1a, 0xdb, 0xfe,
0x64, 0xb6, 0x96, 0x09, 0xf3, 0x7c, 0x26, 0xef, 0xad, 0xbe, 0x3c, 0x6c, 0x58, 0x58, 0xbd, 0x69,
0x0f, 0x49, 0x71, 0xd3, 0x5a, 0xd4, 0x15, 0x7f, 0x65, 0x60, 0x33, 0x94, 0xd6, 0x9a, 0x7a, 0xa3,
0xd6, 0xd4, 0xb2, 0x28, 0x55, 0xcc, 0xdb, 0xaa, 0x3a, 0x85, 0x74, 0x98, 0xad, 0x87, 0x53, 0x4c,
0x94, 0xb2, 0x27, 0xbb, 0x4f, 0xd8, 0x2e, 0xa8, 0x9c, 0xb6, 0x00, 0xc5, 0x7f, 0x13, 0x00, 0x1d,
0xbd, 0x6f, 0xa1, 0xb9, 0x6d, 0xa5, 0x15, 0xdb, 0x0a, 0xd4, 0xfa, 0x25, 0xf4, 0xc1, 0x9b, 0x96,
0xfb, 0x0c, 0xc0, 0x0f, 0xe4, 0xce, 0x63, 0x25, 0x49, 0xac, 0x0c, 0x19, 0x21, 0xc1, 0x0a, 0x90,
0xb8, 0x42, 0x33, 0x3e, 0x45, 0xc6, 0x83, 0x26, 0xb7, 0x0d, 0x49, 0xec, 0xdd, 0x29, 0xe2, 0x37,
0xc8, 0x99, 0x9e, 0x77, 0xc4, 0xbf, 0x22, 0xef, 0x1f, 0xd1, 0xde, 0x27, 0x7e, 0x5d, 0x56, 0x83,
0x76, 0xfe, 0x21, 0xa4, 0x6a, 0x9a, 0x5c, 0xe9, 0xc8, 0x91, 0xf7, 0x57, 0xb1, 0xda, 0x04, 0xe9,
0x3e, 0x0a, 0xa8, 0x6e, 0xab, 0x1e, 0x50, 0xf1, 0xa7, 0xa8, 0xae, 0x6b, 0x84, 0x54, 0x5d, 0x56,
0x64, 0x4c, 0x25, 0x9e, 0xa2, 0xea, 0xc8, 0x42, 0xfe, 0xfa, 0x09, 0xf9, 0x1f, 0x1b, 0xac, 0x3a,
0xb5, 0xae, 0xda, 0x33, 0x7b, 0x10, 0xdd, 0x6c, 0xef, 0xd0, 0x60, 0x07, 0x90, 0x9d, 0xda, 0x9e,
0x63, 0x99, 0x03, 0xd3, 0x47, 0x06, 0xd9, 0xf1, 0xb4, 0x46, 0x0f, 0x3d, 0xbf, 0x87, 0x02, 0xe5,
0x4f, 0x16, 0xfb, 0x33, 0xb3, 0xb4, 0x61, 0x70, 0x68, 0x5c, 0x7d, 0x66, 0x39, 0xba, 0x41, 0xb6,
0x2b, 0xa7, 0x45, 0x5d, 0xf1, 0x67, 0x9c, 0x53, 0xcd, 0xc1, 0x5a, 0xa6, 0xb6, 0x11, 0xe5, 0x54,
0x87, 0xf4, 0x78, 0xde, 0xf4, 0x70, 0x56, 0x81, 0xd3, 0x4b, 0x94, 0x53, 0xd7, 0x68, 0xa9, 0x6d,
0x8e, 0x5d, 0x0b, 0x85, 0x3d, 0x6d, 0xb1, 0x52, 0xf8, 0x12, 0xf2, 0x2b, 0x53, 0x81, 0x88, 0x56,
0x28, 0x82, 0x59, 0x11, 0x51, 0xfe, 0x2d, 0x0e, 0x59, 0xea, 0x21, 0xe0, 0x3e, 0xa7, 0x0d, 0xb1,
0x83, 0x77, 0x87, 0xa3, 0x66, 0x23, 0x37, 0x48, 0x90, 0x57, 0xe5, 0xce, 0x77, 0x4d, 0xed, 0xbc,
0x27, 0x5f, 0xca, 0x6a, 0x07, 0x9b, 0x82, 0xdc, 0x9b, 0x14, 0xba, 0xf2, 0x64, 0x94, 0x21, 0xdb,
0xa9, 0x54, 0x15, 0x39, 0xa4, 0xc3, 0x9b, 0x91, 0xa2, 0xa9, 0x73, 0x7a, 0x04, 0x99, 0x56, 0xb7,
0xfd, 0x6d, 0xaf, 0xd5, 0x55, 0x14, 0x6c, 0x90, 0x5d, 0x4c, 0x6e, 0x51, 0xe4, 0xe2, 0x7a, 0xc1,
0x5c, 0xb5, 0xab, 0x9c, 0xf7, 0xda, 0xdf, 0xab, 0xb5, 0x02, 0xfb, 0x88, 0x8b, 0xcc, 0xc2, 0x7d,
0x01, 0xe9, 0x5a, 0xf3, 0xa2, 0xd5, 0xec, 0xaa, 0xf5, 0x42, 0xf2, 0x11, 0x16, 0x55, 0x54, 0xd8,
0x0a, 0xed, 0x46, 0x17, 0xa3, 0xca, 0xbf, 0xb8, 0x2f, 0xc6, 0x5e, 0xdf, 0x17, 0x99, 0x9f, 0x1e,
0x8a, 0xcc, 0x2d, 0xfe, 0xfe, 0xc3, 0xdf, 0x2b, 0xfc, 0xf5, 0x53, 0xe4, 0xb5, 0x3d, 0x7d, 0x13,
0x00, 0x00, 0xff, 0xff, 0x7d, 0x9c, 0x5f, 0x56, 0xa1, 0x07, 0x00, 0x00,
// 887 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xcc, 0x96, 0xc1, 0x6e, 0xe3, 0x44,
0x18, 0xc7, 0xeb, 0xc4, 0x49, 0xe3, 0xaf, 0x0d, 0x1b, 0xbc, 0xdd, 0xad, 0xd7, 0x0b, 0x49, 0x31,
0xcb, 0x2a, 0x44, 0xe0, 0xa2, 0xee, 0x13, 0x24, 0xb1, 0x05, 0xd9, 0xf5, 0x3a, 0x91, 0x93, 0x14,
0x71, 0x8a, 0x9c, 0x78, 0x48, 0xac, 0x3a, 0xb6, 0x15, 0x3b, 0x45, 0x39, 0x81, 0x38, 0xad, 0x78,
0x07, 0x4e, 0xcb, 0x99, 0x07, 0xe0, 0xc0, 0x89, 0xc3, 0x8a, 0x13, 0xdc, 0x10, 0x87, 0x8a, 0xee,
0x13, 0xf0, 0x08, 0x8c, 0xc7, 0x76, 0x32, 0x4e, 0xa3, 0x5e, 0x40, 0xc0, 0xc1, 0xad, 0x67, 0xe6,
0xe7, 0xcf, 0xdf, 0xf7, 0x9f, 0xff, 0xe7, 0x09, 0xdc, 0x71, 0x51, 0xf8, 0x85, 0xb7, 0xb8, 0xb0,
0xc6, 0xb2, 0xbf, 0xf0, 0x42, 0x8f, 0xe7, 0xd6, 0x13, 0xe2, 0xd1, 0xd4, 0x9b, 0x7a, 0x64, 0xf6,
0x34, 0xba, 0x8b, 0x01, 0xa9, 0x0b, 0xe5, 0x8f, 0xbd, 0x20, 0xb0, 0xfd, 0xe7, 0x28, 0x08, 0xcc,
0x29, 0xe2, 0x1b, 0xc0, 0x86, 0x2b, 0x1f, 0x09, 0xcc, 0x09, 0x53, 0x7f, 0xe3, 0xec, 0xbe, 0xbc,
0x89, 0x98, 0x10, 0x03, 0xbc, 0x6a, 0x10, 0x86, 0xe7, 0x81, 0xb5, 0xcc, 0xd0, 0x14, 0x72, 0x98,
0x3d, 0x34, 0xc8, 0xbd, 0xf4, 0x32, 0x07, 0x9c, 0xee, 0x59, 0x48, 0xbd, 0x44, 0x6e, 0xc8, 0x7f,
0x98, 0x89, 0xf6, 0x80, 0x8a, 0xb6, 0x66, 0x64, 0x2a, 0x60, 0x07, 0x8a, 0xce, 0x28, 0xb4, 0xe7,
0x88, 0x84, 0x64, 0x5b, 0x67, 0xaf, 0xae, 0x6a, 0x7b, 0xbf, 0x5f, 0xd5, 0x1a, 0x53, 0x3b, 0x9c,
0x2d, 0xc7, 0xf2, 0xc4, 0x9b, 0x9f, 0xce, 0xcc, 0x60, 0x66, 0x4f, 0xbc, 0x85, 0x7f, 0x1a, 0xa0,
0xc5, 0xe7, 0xe4, 0x8f, 0xac, 0x99, 0x73, 0xdf, 0x5b, 0x84, 0x03, 0xfc, 0xa4, 0x51, 0x70, 0xa2,
0x7f, 0xfc, 0x43, 0xe0, 0x5c, 0xfc, 0x8a, 0x91, 0x6b, 0xe2, 0x68, 0x79, 0x1c, 0x8d, 0x33, 0x4a,
0xd1, 0x84, 0x8e, 0xc7, 0xd2, 0x97, 0xc0, 0x46, 0x6f, 0xe5, 0xdf, 0x83, 0xfd, 0x8e, 0x7e, 0xde,
0xd4, 0x3a, 0x4a, 0x65, 0x4f, 0x14, 0xbe, 0xf9, 0xf6, 0xe4, 0x68, 0x9d, 0x56, 0xb4, 0xde, 0x71,
0x2f, 0x4d, 0xc7, 0xb6, 0xf8, 0x1a, 0xb0, 0x4f, 0xbb, 0x1d, 0xbd, 0xc2, 0x88, 0xf7, 0x30, 0xf3,
0x66, 0x86, 0x79, 0xea, 0xd9, 0x2e, 0xff, 0x0e, 0x14, 0x34, 0xb5, 0x79, 0xae, 0x56, 0x72, 0xe2,
0x7d, 0x4c, 0xf0, 0x19, 0x42, 0x43, 0xe6, 0x25, 0x12, 0x0f, 0x5f, 0xbc, 0xac, 0xee, 0xfd, 0xf0,
0x5d, 0x95, 0xbc, 0x58, 0xba, 0xce, 0xc1, 0xa1, 0x1e, 0x6b, 0x11, 0x0b, 0xf5, 0x51, 0x46, 0xa8,
0xb7, 0x68, 0xa1, 0x28, 0xec, 0x3f, 0xd0, 0x8a, 0xff, 0x00, 0x20, 0x49, 0x66, 0x64, 0x5b, 0x02,
0x1b, 0xad, 0xb6, 0xca, 0xaf, 0xaf, 0x6a, 0x5c, 0x92, 0x58, 0x47, 0x31, 0x52, 0x97, 0x75, 0x2c,
0xe9, 0x05, 0x93, 0x48, 0x5b, 0xa7, 0xa5, 0x7d, 0x88, 0x45, 0x39, 0xa6, 0x0b, 0xa1, 0xd5, 0x95,
0xd6, 0xea, 0xc6, 0x3b, 0xb0, 0x85, 0x11, 0x81, 0x1f, 0x6d, 0x04, 0x7e, 0x80, 0xa1, 0x7b, 0xdb,
0xd0, 0x2e, 0x8d, 0x7f, 0x64, 0x36, 0x1a, 0xbb, 0xe1, 0x62, 0xb5, 0x55, 0x09, 0x73, 0x7b, 0x25,
0xff, 0x9a, 0xbe, 0x02, 0xec, 0x3b, 0x38, 0x7b, 0xdb, 0x9d, 0x12, 0x71, 0x4b, 0x46, 0x3a, 0x94,
0xbe, 0x67, 0xe0, 0x4e, 0x92, 0x5a, 0x6f, 0x19, 0xcc, 0x7a, 0x4b, 0xc7, 0xa1, 0xb2, 0x62, 0xfe,
0x6e, 0x56, 0x4f, 0xa0, 0x94, 0x54, 0x1b, 0xe0, 0x12, 0xf3, 0xf5, 0x83, 0xb3, 0xe3, 0x1d, 0xb6,
0x8b, 0x94, 0x33, 0xd6, 0xe0, 0xed, 0x6d, 0xf5, 0x73, 0x1e, 0x60, 0x60, 0x8e, 0x9d, 0xa4, 0xf9,
0xe5, 0x8c, 0xa7, 0x45, 0x2a, 0xf8, 0x06, 0xfa, 0xdf, 0x3b, 0x9a, 0x7f, 0x1b, 0x20, 0x8c, 0xd2,
0x8d, 0x63, 0x15, 0x48, 0x2c, 0x8e, 0xcc, 0x90, 0x60, 0x15, 0xc8, 0x5f, 0xa0, 0x95, 0x50, 0x24,
0xf3, 0xd1, 0x2d, 0x7f, 0x04, 0x05, 0x6c, 0xec, 0x25, 0x12, 0xf6, 0xc9, 0x67, 0x31, 0x1e, 0x44,
0x9b, 0x19, 0x37, 0xc6, 0x63, 0xba, 0x31, 0x88, 0x99, 0x37, 0x6a, 0xd0, 0x6d, 0xf1, 0x08, 0x8a,
0x6d, 0x43, 0x6d, 0x0e, 0xd4, 0xb4, 0x31, 0xb2, 0x58, 0x7b, 0x81, 0xcc, 0x10, 0x45, 0xd4, 0xb0,
0xa7, 0x44, 0x54, 0x6e, 0x17, 0x35, 0xf4, 0xad, 0x84, 0x52, 0x54, 0x4d, 0xc5, 0x54, 0x7e, 0x17,
0xa5, 0x20, 0x07, 0x85, 0xdb, 0xed, 0xf3, 0x2b, 0x76, 0x5f, 0x6b, 0xe9, 0x5c, 0xf4, 0x57, 0xee,
0x24, 0x3d, 0x1c, 0xfe, 0x41, 0xf7, 0x9d, 0xc0, 0xc1, 0xd2, 0x0d, 0x3c, 0xc7, 0x9e, 0xd8, 0x21,
0xb2, 0xc8, 0x8e, 0x97, 0x0c, 0x7a, 0xea, 0xf6, 0x3d, 0x14, 0x29, 0xf3, 0xb2, 0xd8, 0xbc, 0x1c,
0xe5, 0x51, 0xdc, 0x51, 0xbe, 0xb9, 0x72, 0x3c, 0xd3, 0x22, 0xdb, 0x75, 0x68, 0xa4, 0x43, 0xe9,
0x6b, 0x5c, 0x53, 0xdb, 0xc3, 0xb9, 0x2c, 0x5d, 0x2b, 0xad, 0x49, 0x81, 0xd2, 0x3c, 0xbe, 0x0d,
0x70, 0x55, 0x51, 0x1b, 0xd4, 0x29, 0xa7, 0x6e, 0xd1, 0x72, 0xdf, 0x9e, 0xfb, 0x0e, 0x4a, 0x46,
0xc6, 0xfa, 0x49, 0xf1, 0x7d, 0x28, 0x67, 0x96, 0xa2, 0x24, 0x7a, 0x49, 0x12, 0x4c, 0x26, 0x89,
0xc6, 0x4f, 0x39, 0x38, 0xa0, 0xce, 0x52, 0xfe, 0x5d, 0xda, 0x10, 0xe4, 0xf8, 0xa0, 0x56, 0x53,
0x37, 0xc8, 0x50, 0xd6, 0xd5, 0xc1, 0xa7, 0x5d, 0xe3, 0xd9, 0x48, 0x3d, 0x57, 0xf5, 0x01, 0x36,
0x05, 0xf9, 0xa8, 0x52, 0x68, 0xe6, 0x3c, 0x69, 0xc0, 0xc1, 0xa0, 0xd9, 0xd2, 0xd4, 0x84, 0x4e,
0x3e, 0x9b, 0x14, 0x4d, 0xf5, 0xe9, 0x63, 0xe0, 0x7a, 0xc3, 0xfe, 0x27, 0xa3, 0xde, 0x50, 0xd3,
0xb0, 0x41, 0x8e, 0x31, 0x79, 0x97, 0x22, 0xd7, 0xdf, 0x1e, 0xcc, 0xb5, 0x86, 0xda, 0xb3, 0x51,
0xff, 0x33, 0xbd, 0x5d, 0x61, 0x6f, 0x70, 0xa9, 0x59, 0xf0, 0xa9, 0x5a, 0x6a, 0x77, 0x9f, 0xf7,
0xba, 0x43, 0x5d, 0xa9, 0x14, 0x6e, 0x60, 0xa9, 0xa2, 0xf8, 0x84, 0x00, 0xbd, 0xab, 0xa4, 0x19,
0x16, 0x63, 0x63, 0xd2, 0xf5, 0xa4, 0x87, 0xa8, 0x78, 0x37, 0x31, 0x26, 0x2d, 0x5b, 0x4b, 0xf8,
0xed, 0xba, 0xba, 0xf7, 0xe7, 0x75, 0x95, 0xf9, 0xea, 0x75, 0x95, 0x79, 0x85, 0xaf, 0x5f, 0xf0,
0xf5, 0x07, 0xbe, 0xc6, 0x45, 0xf2, 0xd3, 0xe6, 0xc9, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x21,
0x78, 0x72, 0xc3, 0x0e, 0x09, 0x00, 0x00,
}

View file

@ -41,6 +41,10 @@ enum MessageType {
// which is a pack of many message of above types, packed into
// a single compound message.
COMPOUND = 5 [(gogoproto.enumvalue_customname) = "MessageTypeCompound"];
// NodeEvent message type is used to communicare node
// join/leave events in the cluster
NODE_EVENT = 6 [(gogoproto.enumvalue_customname) = "MessageTypeNodeEvent"];
}
// GossipMessage is a basic message header used by all messages types.
@ -49,6 +53,29 @@ message GossipMessage {
bytes data = 2; // Payload of the message of any type defined here.
}
// NodeEvent message payload definition.
message NodeEvent {
enum Type {
option (gogoproto.goproto_enum_prefix) = false;
option (gogoproto.enum_customname) = "Type";
INVALID = 0 [(gogoproto.enumvalue_customname) = "NodeEventTypeInvalid"];
// Join event is generated when this node joins the cluster.
JOIN = 1 [(gogoproto.enumvalue_customname) = "NodeEventTypeJoin"];;
// Leave event is generated when this node leaves the cluster.
LEAVE = 2 [(gogoproto.enumvalue_customname) = "NodeEventTypeLeave"];;
}
Type type = 1;
// Lamport time using a network lamport clock indicating the
// time this event was generated on the node where it was
// generated.
uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
// Source node name.
string node_name = 3;
}
// NetworkEvent message payload definition.
message NetworkEvent {
enum Type {

View file

@ -32,7 +32,7 @@ func (n *networkNamespace) findNeighbor(dstIP net.IP, dstMac net.HardwareAddr) *
return nil
}
func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr) error {
func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, osDelete bool) error {
var (
iface netlink.Link
err error
@ -43,42 +43,46 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr)
return fmt.Errorf("could not find the neighbor entry to delete")
}
n.Lock()
nlh := n.nlHandle
n.Unlock()
if osDelete {
n.Lock()
nlh := n.nlHandle
n.Unlock()
if nh.linkDst != "" {
iface, err = nlh.LinkByName(nh.linkDst)
if err != nil {
return fmt.Errorf("could not find interface with destination name %s: %v",
nh.linkDst, err)
if nh.linkDst != "" {
iface, err = nlh.LinkByName(nh.linkDst)
if err != nil {
return fmt.Errorf("could not find interface with destination name %s: %v",
nh.linkDst, err)
}
}
nlnh := &netlink.Neigh{
IP: dstIP,
State: netlink.NUD_PERMANENT,
Family: nh.family,
}
if nlnh.Family > 0 {
nlnh.HardwareAddr = dstMac
nlnh.Flags = netlink.NTF_SELF
}
if nh.linkDst != "" {
nlnh.LinkIndex = iface.Attrs().Index
}
if err := nlh.NeighDel(nlnh); err != nil {
return fmt.Errorf("could not delete neighbor entry: %v", err)
}
}
nlnh := &netlink.Neigh{
IP: dstIP,
State: netlink.NUD_PERMANENT,
Family: nh.family,
}
if nlnh.Family > 0 {
nlnh.HardwareAddr = dstMac
nlnh.Flags = netlink.NTF_SELF
}
if nh.linkDst != "" {
nlnh.LinkIndex = iface.Attrs().Index
}
if err := nlh.NeighDel(nlnh); err != nil {
return fmt.Errorf("could not delete neighbor entry: %v", err)
}
n.Lock()
for i, nh := range n.neighbors {
if nh.dstIP.Equal(dstIP) && bytes.Equal(nh.dstMac, dstMac) {
n.neighbors = append(n.neighbors[:i], n.neighbors[i+1:]...)
}
}
n.Unlock()
return nil
}

View file

@ -42,7 +42,7 @@ type Sandbox interface {
AddNeighbor(dstIP net.IP, dstMac net.HardwareAddr, option ...NeighOption) error
// DeleteNeighbor deletes neighbor entry from the sandbox.
DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr) error
DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, osDelete bool) error
// Returns an interface with methods to set neighbor options.
NeighborOptions() NeighborOptionSetter

View file

@ -354,8 +354,8 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
extConn, err = net.DialTimeout(proto, addr, extIOTimeout)
}
r.sb.execFunc(extConnect)
if err != nil {
execErr := r.sb.execFunc(extConnect)
if execErr != nil || err != nil {
log.Debugf("Connect failed, %s", err)
continue
}

View file

@ -147,7 +147,7 @@ func (sb *sandbox) Key() string {
func (sb *sandbox) Labels() map[string]interface{} {
sb.Lock()
sb.Unlock()
defer sb.Unlock()
opts := make(map[string]interface{}, len(sb.config.generic))
for k, v := range sb.config.generic {
opts[k] = v
@ -202,12 +202,14 @@ func (sb *sandbox) delete(force bool) error {
retain := false
for _, ep := range sb.getConnectedEndpoints() {
// gw network endpoint detach and removal are automatic
if ep.endpointInGWNetwork() {
if ep.endpointInGWNetwork() && !force {
continue
}
// Retain the sanbdox if we can't obtain the network from store.
if _, err := c.getNetworkFromStore(ep.getNetwork().ID()); err != nil {
retain = true
if c.isDistributedControl() {
retain = true
}
log.Warnf("Failed getting network for ep %s during sandbox %s delete: %v", ep.ID(), sb.ID(), err)
continue
}
@ -434,8 +436,8 @@ func (sb *sandbox) ResolveIP(ip string) string {
return svc
}
func (sb *sandbox) execFunc(f func()) {
sb.osSbox.InvokeFunc(f)
func (sb *sandbox) execFunc(f func()) error {
return sb.osSbox.InvokeFunc(f)
}
func (sb *sandbox) ResolveService(name string) ([]*net.SRV, []net.IP, error) {
@ -705,9 +707,12 @@ func (sb *sandbox) SetKey(basePath string) error {
if oldosSbox != nil && sb.resolver != nil {
sb.resolver.Stop()
sb.osSbox.InvokeFunc(sb.resolver.SetupFunc())
if err := sb.resolver.Start(); err != nil {
log.Errorf("Resolver Setup/Start failed for container %s, %q", sb.ContainerID(), err)
if err := sb.osSbox.InvokeFunc(sb.resolver.SetupFunc()); err == nil {
if err := sb.resolver.Start(); err != nil {
log.Errorf("Resolver Start failed for container %s, %q", sb.ContainerID(), err)
}
} else {
log.Errorf("Resolver Setup Function failed for container %s, %q", sb.ContainerID(), err)
}
}

View file

@ -44,9 +44,13 @@ func (sb *sandbox) startResolver(restore bool) {
}
sb.resolver.SetExtServers(sb.extDNS)
sb.osSbox.InvokeFunc(sb.resolver.SetupFunc())
if err = sb.osSbox.InvokeFunc(sb.resolver.SetupFunc()); err != nil {
log.Errorf("Resolver Setup function failed for container %s, %q", sb.ContainerID(), err)
return
}
if err = sb.resolver.Start(); err != nil {
log.Errorf("Resolver Setup/Start failed for container %s, %q", sb.ContainerID(), err)
log.Errorf("Resolver Start failed for container %s, %q", sb.ContainerID(), err)
}
})
}

View file

@ -45,6 +45,9 @@ type service struct {
// List of ingress ports exposed by the service
ingressPorts portConfigs
// Service aliases
aliases []string
sync.Mutex
}

View file

@ -26,17 +26,55 @@ import (
func init() {
reexec.Register("fwmarker", fwMarker)
reexec.Register("redirecter", redirecter)
}
func newService(name string, id string, ingressPorts []*PortConfig) *service {
func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
return &service{
name: name,
id: id,
ingressPorts: ingressPorts,
loadBalancers: make(map[string]*loadBalancer),
aliases: aliases,
}
}
func (c *controller) cleanupServiceBindings(cleanupNID string) {
var cleanupFuncs []func()
c.Lock()
for _, s := range c.serviceBindings {
s.Lock()
for nid, lb := range s.loadBalancers {
if cleanupNID != "" && nid != cleanupNID {
continue
}
for eid, ip := range lb.backEnds {
service := s
loadBalancer := lb
networkID := nid
epID := eid
epIP := ip
cleanupFuncs = append(cleanupFuncs, func() {
if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip,
service.ingressPorts, service.aliases, epIP); err != nil {
logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
service.id, networkID, epID, err)
}
})
}
}
s.Unlock()
}
c.Unlock()
for _, f := range cleanupFuncs {
f()
}
}
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
var (
s *service
@ -58,7 +96,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
if !ok {
// Create a new service if we are seeing this service
// for the first time.
s = newService(name, sid, ingressPorts)
s = newService(name, sid, ingressPorts, aliases)
c.serviceBindings[skey] = s
}
c.Unlock()
@ -230,9 +268,20 @@ func (n *network) connectedLoadbalancers() []*loadBalancer {
func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
var gwIP net.IP
// This is an interface less endpoint. Nothing to do.
if ep.Iface() == nil {
return
}
n := ep.getNetwork()
eIP := ep.Iface().Address()
if n.ingress {
if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
}
}
if sb.ingress {
// For the ingress sandbox if this is not gateway
// endpoint do nothing.
@ -338,17 +387,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*P
}
if addService {
var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, iPorts, false); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, false)
if err := programIngress(gwIP, filteredPorts, false); err != nil {
logrus.Errorf("Failed to add ingress: %v", err)
return
}
}
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
return
}
@ -411,15 +460,15 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*Po
logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
}
var iPorts []*PortConfig
var filteredPorts []*PortConfig
if sb.ingress {
iPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, iPorts, true); err != nil {
filteredPorts = filterPortConfigs(ingressPorts, true)
if err := programIngress(gwIP, filteredPorts, true); err != nil {
logrus.Errorf("Failed to delete ingress: %v", err)
}
}
if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
}
}
@ -479,14 +528,19 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
}
chainExists := iptables.ExistChain(ingressChain, iptables.Nat)
filterChainExists := iptables.ExistChain(ingressChain, iptables.Filter)
ingressOnce.Do(func() {
// Flush nat table and filter table ingress chain rules during init if it
// exists. It might contain stale rules from previous life.
if chainExists {
// Flush ingress chain rules during init if it
// exists. It might contain stale rules from
// previous life.
if err := iptables.RawCombinedOutput("-t", "nat", "-F", ingressChain); err != nil {
logrus.Errorf("Could not flush ingress chain rules during init: %v", err)
logrus.Errorf("Could not flush nat table ingress chain rules during init: %v", err)
}
}
if filterChainExists {
if err := iptables.RawCombinedOutput("-F", ingressChain); err != nil {
logrus.Errorf("Could not flush filter table ingress chain rules during init: %v", err)
}
}
})
@ -497,10 +551,21 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
return fmt.Errorf("failed to create ingress chain: %v", err)
}
}
if !filterChainExists {
if err := iptables.RawCombinedOutput("-N", ingressChain); err != nil {
return fmt.Errorf("failed to create filter table ingress chain: %v", err)
}
}
if !iptables.Exists(iptables.Nat, ingressChain, "-j", "RETURN") {
if err := iptables.RawCombinedOutput("-t", "nat", "-A", ingressChain, "-j", "RETURN"); err != nil {
return fmt.Errorf("failed to add return rule in ingress chain: %v", err)
return fmt.Errorf("failed to add return rule in nat table ingress chain: %v", err)
}
}
if !iptables.Exists(iptables.Filter, ingressChain, "-j", "RETURN") {
if err := iptables.RawCombinedOutput("-A", ingressChain, "-j", "RETURN"); err != nil {
return fmt.Errorf("failed to add return rule to filter table ingress chain: %v", err)
}
}
@ -512,6 +577,12 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
}
}
if !iptables.Exists(iptables.Filter, "FORWARD", "-j", ingressChain) {
if err := iptables.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil {
return fmt.Errorf("failed to add jump rule to %s in filter table forward chain: %v", ingressChain, err)
}
}
oifName, err := findOIFName(gwIP)
if err != nil {
return fmt.Errorf("failed to find gateway bridge interface name for %s: %v", gwIP, err)
@ -544,6 +615,30 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
}
}
// Filter table rules to allow a published service to be accessible in the local node from..
// 1) service tasks attached to other networks
// 2) unmanaged containers on bridge networks
rule := strings.Fields(fmt.Sprintf("%s %s -m state -p %s --sport %d --state ESTABLISHED,RELATED -j ACCEPT",
addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort))
if err := iptables.RawCombinedOutput(rule...); err != nil {
errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err)
if !isDelete {
return fmt.Errorf("%s", errStr)
}
logrus.Warnf("%s", errStr)
}
rule = strings.Fields(fmt.Sprintf("%s %s -p %s --dport %d -j ACCEPT",
addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort))
if err := iptables.RawCombinedOutput(rule...); err != nil {
errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err)
if !isDelete {
return fmt.Errorf("%s", errStr)
}
logrus.Warnf("%s", errStr)
}
if err := plumbProxy(iPort, isDelete); err != nil {
logrus.Warnf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
}
@ -552,6 +647,22 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro
return nil
}
// In the filter table FORWARD chain first rule should be to jump to INGRESS-CHAIN
// This chain has the rules to allow access to the published ports for swarm tasks
// from local bridge networks and docker_gwbridge (ie:taks on other swarm netwroks)
func arrangeIngressFilterRule() {
if iptables.ExistChain(ingressChain, iptables.Filter) {
if iptables.Exists(iptables.Filter, "FORWARD", "-j", ingressChain) {
if err := iptables.RawCombinedOutput("-D", "FORWARD", "-j", ingressChain); err != nil {
logrus.Warnf("failed to delete jump rule to ingressChain in filter table: %v", err)
}
}
if err := iptables.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil {
logrus.Warnf("failed to add jump rule to ingressChain in filter table: %v", err)
}
}
}
func findOIFName(ip net.IP) (string, error) {
nlh := ns.NlHandle()
@ -611,33 +722,57 @@ func plumbProxy(iPort *PortConfig, isDelete bool) error {
return nil
}
func writePortsToFile(ports []*PortConfig) (string, error) {
f, err := ioutil.TempFile("", "port_configs")
if err != nil {
return "", err
}
defer f.Close()
buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ports,
})
n, err := f.Write(buf)
if err != nil {
return "", err
}
if n < len(buf) {
return "", io.ErrShortWrite
}
return f.Name(), nil
}
func readPortsFromFile(fileName string) ([]*PortConfig, error) {
buf, err := ioutil.ReadFile(fileName)
if err != nil {
return nil, err
}
var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
return nil, err
}
return epRec.IngressPorts, nil
}
// Invoke fwmarker reexec routine to mark vip destined packets with
// the passed firewall mark.
func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
var ingressPortsFile string
if len(ingressPorts) != 0 {
f, err := ioutil.TempFile("", "port_configs")
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
return err
}
buf, err := proto.Marshal(&EndpointRecord{
IngressPorts: ingressPorts,
})
n, err := f.Write(buf)
if err != nil {
f.Close()
return err
}
if n < len(buf) {
f.Close()
return io.ErrShortWrite
}
ingressPortsFile = f.Name()
f.Close()
defer os.Remove(ingressPortsFile)
}
addDelOpt := "-A"
@ -671,20 +806,12 @@ func fwMarker() {
var ingressPorts []*PortConfig
if os.Args[5] != "" {
buf, err := ioutil.ReadFile(os.Args[5])
var err error
ingressPorts, err = readPortsFromFile(os.Args[5])
if err != nil {
logrus.Errorf("Failed to read ports config file: %v", err)
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(6)
}
var epRec EndpointRecord
err = proto.Unmarshal(buf, &epRec)
if err != nil {
logrus.Errorf("Failed to unmarshal ports config data: %v", err)
os.Exit(7)
}
ingressPorts = epRec.IngressPorts
}
vip := os.Args[2]
@ -697,11 +824,7 @@ func fwMarker() {
rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
rules = append(rules, rule)
}
@ -748,3 +871,82 @@ func fwMarker() {
}
}
}
func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
var ingressPortsFile string
if len(ingressPorts) != 0 {
var err error
ingressPortsFile, err = writePortsToFile(ingressPorts)
if err != nil {
return err
}
defer os.Remove(ingressPortsFile)
}
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
}
return nil
}
// Redirecter reexec function.
func redirecter() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if len(os.Args) < 4 {
logrus.Error("invalid number of arguments..")
os.Exit(1)
}
var ingressPorts []*PortConfig
if os.Args[3] != "" {
var err error
ingressPorts, err = readPortsFromFile(os.Args[3])
if err != nil {
logrus.Errorf("Failed reading ingress ports file: %v", err)
os.Exit(2)
}
}
eIP, _, err := net.ParseCIDR(os.Args[2])
if err != nil {
logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
os.Exit(3)
}
rules := [][]string{}
for _, iPort := range ingressPorts {
rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
rules = append(rules, rule)
}
ns, err := netns.GetFromPath(os.Args[1])
if err != nil {
logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
os.Exit(4)
}
defer ns.Close()
if err := netns.Set(ns); err != nil {
logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
os.Exit(5)
}
for _, rule := range rules {
if err := iptables.RawCombinedOutputNative(rule...); err != nil {
logrus.Errorf("setting up rule failed, %v: %v", rule, err)
os.Exit(5)
}
}
}

View file

@ -7,6 +7,9 @@ import (
"net"
)
func (c *controller) cleanupServiceBindings(nid string) {
}
func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
return fmt.Errorf("not supported")
}
@ -17,3 +20,6 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in
func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
}
func arrangeIngressFilterRule() {
}

View file

@ -346,6 +346,10 @@ func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan da
}
func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoint) {
if !c.isDistributedControl() && ep.getNetwork().driverScope() == datastore.GlobalScope {
return
}
c.Lock()
nw, ok := nmap[ep.getNetwork().ID()]
c.Unlock()
@ -400,6 +404,10 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi
}
func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoint) {
if !c.isDistributedControl() && ep.getNetwork().driverScope() == datastore.GlobalScope {
return
}
c.Lock()
nw, ok := nmap[ep.getNetwork().ID()]

View file

@ -0,0 +1,36 @@
#!/usr/bin/env bash
# Required tools
DOCKER="${DOCKER:-docker}"
NSENTER="${NSENTER:-nsenter}"
BRIDGE="${BRIDGE:-bridge}"
BRCTL="${BRCTL:-brctl}"
IPTABLES="${IPTABLES:-iptables}"
NSDIR=/var/run/docker/netns
BRIDGEIF=br0
function die {
echo $*
exit 1
}
type -P ${DOCKER} > /dev/null || die "This tool requires the docker binary"
type -P ${NSENTER} > /dev/null || die "This tool requires nsenter"
type -P ${BRIDGE} > /dev/null || die "This tool requires bridge"
type -P ${BRCTL} > /dev/null || die "This tool requires brctl"
type -P ${IPTABLES} > /dev/null || die "This tool requires iptables"
echo "iptables configuration"
${IPTABLES} -n -v -L -t filter
${IPTABLES} -n -v -L -t nat
echo ""
echo "Overlay network configuration"
for networkID in $(${DOCKER} network ls --filter driver=overlay -q) ; do
echo "Network ${networkID}"
nspath=(${NSDIR}/*-$(echo ${networkID}| cut -c1-10))
${NSENTER} --net=${nspath[0]} ${BRIDGE} fdb show ${BRIDGEIF}
${NSENTER} --net=${nspath[0]} ${BRCTL} showmacs ${BRIDGEIF}
echo ""
done