Vendoring libnetwork

Fix for networkDB garbage collection (PR: https://github.com/docker/libnetwork/pull/1944)
Added extra logs to monitor the netowrkDB status and number of entries per network

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
Flavio Crisciani 2017-09-24 16:44:16 -07:00
parent c982ee805d
commit 04043428ea
No known key found for this signature in database
GPG key ID: 28CAFCE754CF3A48
14 changed files with 642 additions and 445 deletions

View file

@ -30,7 +30,7 @@ golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0
github.com/containerd/continuity 22694c680ee48fb8f50015b44618517e2bde77e8
#get libnetwork packages
github.com/docker/libnetwork 60e002dd61885e1cd909582f00f7eb4da634518a
github.com/docker/libnetwork 0f08d31bf0e640e0cdc6d5161227f87602d605c5
github.com/docker/go-events 9461782956ad83b30282bf90e31fa6a70c255ba9
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View file

@ -763,11 +763,7 @@ func (d *driver) createNetwork(config *networkConfiguration) error {
// Apply the prepared list of steps, and abort at the first error.
bridgeSetup.queueStep(setupDeviceUp)
if err = bridgeSetup.apply(); err != nil {
return err
}
return nil
return bridgeSetup.apply()
}
func (d *driver) DeleteNetwork(nid string) error {

View file

@ -169,11 +169,7 @@ func setupIPTablesInternal(bridgeIface string, addr net.Addr, icc, ipmasq, hairp
}
// Set Accept on all non-intercontainer outgoing packets.
if err := programChainRule(outRule, "ACCEPT NON_ICC OUTGOING", enable); err != nil {
return err
}
return nil
return programChainRule(outRule, "ACCEPT NON_ICC OUTGOING", enable)
}
func programChainRule(rule iptRule, ruleDescr string, insert bool) error {
@ -304,10 +300,7 @@ func setupInternalNetworkRules(bridgeIface string, addr net.Addr, icc, insert bo
return err
}
// Set Inter Container Communication.
if err := setIcc(bridgeIface, icc, insert); err != nil {
return err
}
return nil
return setIcc(bridgeIface, icc, insert)
}
func clearEndpointConnections(nlh *netlink.Handle, ep *bridgeEndpoint) {

View file

@ -144,11 +144,7 @@ func (d *driver) deleteEndpointFromStore(e *endpoint) error {
return fmt.Errorf("overlay local store not initialized, ep not deleted")
}
if err := d.localStore.DeleteObjectAtomic(e); err != nil {
return err
}
return nil
return d.localStore.DeleteObjectAtomic(e)
}
func (d *driver) writeEndpointToStore(e *endpoint) error {
@ -156,10 +152,7 @@ func (d *driver) writeEndpointToStore(e *endpoint) error {
return fmt.Errorf("overlay local store not initialized, ep not added")
}
if err := d.localStore.PutObjectAtomic(e); err != nil {
return err
}
return nil
return d.localStore.PutObjectAtomic(e)
}
func (ep *endpoint) DataScope() string {

View file

@ -134,11 +134,7 @@ func (d *driver) deleteEndpointFromStore(e *endpoint) error {
return fmt.Errorf("overlay local store not initialized, ep not deleted")
}
if err := d.localStore.DeleteObjectAtomic(e); err != nil {
return err
}
return nil
return d.localStore.DeleteObjectAtomic(e)
}
func (d *driver) writeEndpointToStore(e *endpoint) error {
@ -146,10 +142,7 @@ func (d *driver) writeEndpointToStore(e *endpoint) error {
return fmt.Errorf("overlay local store not initialized, ep not added")
}
if err := d.localStore.PutObjectAtomic(e); err != nil {
return err
}
return nil
return d.localStore.PutObjectAtomic(e)
}
func (ep *endpoint) DataScope() string {

View file

@ -202,11 +202,7 @@ func (ep *endpoint) Info() EndpointInfo {
return ep
}
if epi := sb.getEndpoint(ep.ID()); epi != nil {
return epi
}
return nil
return sb.getEndpoint(ep.ID())
}
func (ep *endpoint) Iface() InterfaceInfo {

View file

@ -276,11 +276,7 @@ func (c *ChainInfo) Forward(action Action, ip net.IP, port int, proto, destAddr
"--dport", strconv.Itoa(destPort),
"-j", "MASQUERADE",
}
if err := ProgramRule(Nat, "POSTROUTING", action, args); err != nil {
return err
}
return nil
return ProgramRule(Nat, "POSTROUTING", action, args)
}
// Link adds reciprocal ACCEPT rule for two supplied IP addresses.
@ -301,10 +297,7 @@ func (c *ChainInfo) Link(action Action, ip1, ip2 net.IP, port int, proto string,
// reverse
args[7], args[9] = args[9], args[7]
args[10] = "--sport"
if err := ProgramRule(Filter, c.Name, action, args); err != nil {
return err
}
return nil
return ProgramRule(Filter, c.Name, action, args)
}
// ProgramRule adds the rule specified by args only if the

View file

@ -134,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
TableName: tname,
Key: key,
Value: entry.value,
// The duration in second is a float that below would be truncated
ResidualReapTime: int32(entry.reapTime.Seconds()),
}
raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)

View file

@ -17,11 +17,15 @@ import (
)
const (
reapInterval = 30 * time.Minute
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
// The garbage collection logic for entries leverage the presence of the network.
// For this reason the expiration time of the network is put slightly higher than the entry expiration so that
// there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
reapEntryInterval = 30 * time.Minute
reapNetworkInterval = reapEntryInterval + 5*reapPeriod
reapPeriod = 5 * time.Second
retryInterval = 1 * time.Second
nodeReapInterval = 24 * time.Hour
nodeReapPeriod = 2 * time.Hour
)
type logWriter struct{}
@ -300,8 +304,9 @@ func (nDB *NetworkDB) reconnectNode() {
// the reaper runs. NOTE nDB.reapTableEntries updates the reapTime with a readlock. This
// is safe as long as no other concurrent path touches the reapTime field.
func (nDB *NetworkDB) reapState() {
nDB.reapNetworks()
// The reapTableEntries leverage the presence of the network so garbage collect entries first
nDB.reapTableEntries()
nDB.reapNetworks()
}
func (nDB *NetworkDB) reapNetworks() {
@ -321,43 +326,51 @@ func (nDB *NetworkDB) reapNetworks() {
}
func (nDB *NetworkDB) reapTableEntries() {
var paths []string
var nodeNetworks []string
// This is best effort, if the list of network changes will be picked up in the next cycle
nDB.RLock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}
if !entry.deleting {
return false
}
if entry.reapTime > 0 {
entry.reapTime -= reapPeriod
return false
}
paths = append(paths, path)
return false
})
for nid := range nDB.networks[nDB.config.NodeName] {
nodeNetworks = append(nodeNetworks, nid)
}
nDB.RUnlock()
nDB.Lock()
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]
cycleStart := time.Now()
// In order to avoid blocking the database for a long time, apply the garbage collection logic by network
// The lock is taken at the beginning of the cycle and the deletion is inline
for _, nid := range nodeNetworks {
nDB.Lock()
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
// timeCompensation compensate in case the lock took some time to be released
timeCompensation := time.Since(cycleStart)
entry, ok := v.(*entry)
if !ok || !entry.deleting {
return false
}
if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
// In this check we are adding an extra 1 second to guarantee that when the number is truncated to int32 to fit the packet
// for the tableEvent the number is always strictly > 1 and never 0
if entry.reapTime > reapPeriod+timeCompensation+time.Second {
entry.reapTime -= reapPeriod + timeCompensation
return false
}
if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
params := strings.Split(path[1:], "/")
nid := params[0]
tname := params[1]
key := params[2]
okTable, okNetwork := nDB.deleteEntry(nid, tname, key)
if !okTable {
logrus.Errorf("Table tree delete failed, entry with key:%s does not exists in the table:%s network:%s", key, tname, nid)
}
if !okNetwork {
logrus.Errorf("Network tree delete failed, entry with key:%s does not exists in the network:%s table:%s", key, nid, tname)
}
return false
})
nDB.Unlock()
}
nDB.Unlock()
}
func (nDB *NetworkDB) gossip() {
@ -406,8 +419,9 @@ func (nDB *NetworkDB) gossip() {
// Collect stats and print the queue info, note this code is here also to have a view of the queues empty
network.qMessagesSent += len(msgs)
if printStats {
logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d",
nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
logrus.Infof("NetworkDB stats - netID:%s leaving:%t netPeers:%d entries:%d Queue qLen:%d netMsg/s:%d",
nid, network.leaving, broadcastQ.NumNodes(), network.entriesNumber, broadcastQ.NumQueued(),
network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second)))
network.qMessagesSent = 0
}
@ -572,6 +586,8 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b
TableName: params[1],
Key: params[2],
Value: entry.value,
// The duration in second is a float that below would be truncated
ResidualReapTime: int32(entry.reapTime.Seconds()),
}
msg, err := encodeMessage(MessageTypeTableEvent, &tEvent)

View file

@ -1,9 +1,9 @@
package networkdb
import (
"fmt"
"net"
"strings"
"time"
"github.com/gogo/protobuf/proto"
"github.com/sirupsen/logrus"
@ -165,7 +165,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.ltime = nEvent.LTime
n.leaving = nEvent.Type == NetworkEventTypeLeave
if n.leaving {
n.reapTime = reapInterval
n.reapTime = reapNetworkInterval
// The remote node is leaving the network, but not the gossip cluster.
// Mark all its entries in deleted state, this will guarantee that
@ -198,8 +198,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
}
func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// Update our local clock if the received messages has newer
// time.
// Update our local clock if the received messages has newer time.
nDB.tableClock.Witness(tEvent.LTime)
// Ignore the table events for networks that are in the process of going away
@ -235,20 +234,26 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
node: tEvent.NodeName,
value: tEvent.Value,
deleting: tEvent.Type == TableEventTypeDelete,
reapTime: time.Duration(tEvent.ResidualReapTime) * time.Second,
}
if e.deleting {
e.reapTime = reapInterval
// All the entries marked for deletion should have a reapTime set greater than 0
// This case can happen if the cluster is running different versions of the engine where the old version does not have the
// field. If that is not the case, this can be a BUG
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapEntryInterval
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), e)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e)
nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e)
nDB.Unlock()
if err != nil && tEvent.Type == TableEventTypeDelete {
// If it is a delete event and we didn't have the entry here don't repropagate
return true
// If it is a delete event and we did not have a state for it, don't propagate to the application
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
return e.reapTime > reapPeriod/6
}
var op opType
@ -303,22 +308,17 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
nDB.RUnlock()
if !ok {
// if the network is not there anymore, OR we are leaving the network OR the broadcast queue is not present
if !ok || n.leaving || n.tableBroadcasts == nil {
return
}
broadcastQ := n.tableBroadcasts
if broadcastQ == nil {
return
}
broadcastQ.QueueBroadcast(&tableEventMessage{
n.tableBroadcasts.QueueBroadcast(&tableEventMessage{
msg: buf,
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: nDB.config.NodeName,
node: tEvent.NodeName,
})
}
}

View file

@ -141,6 +141,11 @@ type network struct {
// Number of gossip messages sent related to this network during the last stats collection period
qMessagesSent int
// Number of entries on the network. This value is the sum of all the entries of all the tables of a specific network.
// Its use is for statistics purposes. It keep tracks of database size and is printed per network every StatsPrintPeriod
// interval
entriesNumber int
}
// Config represents the configuration of the networdb instance and
@ -338,8 +343,7 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
@ -365,8 +369,7 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
@ -402,7 +405,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
node: nDB.config.NodeName,
value: value,
deleting: true,
reapTime: reapInterval,
reapTime: reapEntryInterval,
}
if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
@ -410,8 +413,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
nDB.Unlock()
return nil
@ -473,10 +475,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
entry := &entry{
ltime: oldEntry.ltime,
node: node,
node: oldEntry.node,
value: oldEntry.value,
deleting: true,
reapTime: reapInterval,
reapTime: reapEntryInterval,
}
// we arrived at this point in 2 cases:
@ -488,12 +490,10 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
// without doing a delete of all the objects
entry.ltime++
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.createOrUpdateEntry(nid, tname, key, entry)
} else {
// the local node is leaving the network, all the entries of remote nodes can be safely removed
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)
}
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value))
@ -513,8 +513,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nid := params[1]
key := params[2]
nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
nDB.deleteEntry(nid, tname, key)
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value))
return false
@ -558,7 +557,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
n, ok := nodeNetworks[nid]
var entries int
if ok {
entries = n.entriesNumber
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime, entriesNumber: entries}
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
nDB.RLock()
@ -567,6 +571,7 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error {
},
RetransmitMult: 4,
}
nDB.addNetworkNode(nid, nDB.config.NodeName)
networkNodes := nDB.networkNodes[nid]
nDB.Unlock()
@ -614,8 +619,9 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
return fmt.Errorf("could not find network %s while trying to leave", nid)
}
logrus.Debugf("%s: leaving network %s", nDB.config.NodeName, nid)
n.ltime = ltime
n.reapTime = reapInterval
n.reapTime = reapNetworkInterval
n.leaving = true
return nil
}
@ -679,3 +685,33 @@ func (nDB *NetworkDB) updateLocalNetworkTime() {
n.ltime = ltime
}
}
// createOrUpdateEntry this function handles the creation or update of entries into the local
// tree store. It is also used to keep in sync the entries number of the network (all tables are aggregated)
func (nDB *NetworkDB) createOrUpdateEntry(nid, tname, key string, entry interface{}) (bool, bool) {
_, okTable := nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
_, okNetwork := nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
if !okNetwork {
// Add only if it is an insert not an update
n, ok := nDB.networks[nDB.config.NodeName][nid]
if ok {
n.entriesNumber++
}
}
return okTable, okNetwork
}
// deleteEntry this function handles the deletion of entries into the local tree store.
// It is also used to keep in sync the entries number of the network (all tables are aggregated)
func (nDB *NetworkDB) deleteEntry(nid, tname, key string) (bool, bool) {
_, okTable := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
_, okNetwork := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key))
if okNetwork {
// Remove only if the delete is successful
n, ok := nDB.networks[nDB.config.NodeName][nid]
if ok {
n.entriesNumber--
}
}
return okTable, okNetwork
}

File diff suppressed because it is too large Load diff

View file

@ -109,7 +109,7 @@ message NetworkEntry {
// network event was recorded.
uint64 l_time = 2 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
// Source node name where this network attachment happened.
string node_name = 3;
string node_name = 3 [(gogoproto.customname) = "NodeName"];
// Indicates if a leave from this network is in progress.
bool leaving = 4;
}
@ -119,6 +119,8 @@ message NetworkPushPull {
// Lamport time when this push pull was initiated.
uint64 l_time = 1 [(gogoproto.customtype) = "github.com/hashicorp/serf/serf.LamportTime", (gogoproto.nullable) = false];
repeated NetworkEntry networks = 2;
// Name of the node sending this push pull payload.
string node_name = 3 [(gogoproto.customname) = "NodeName"];
}
// TableEvent message payload definition.
@ -152,6 +154,8 @@ message TableEvent {
string key = 6;
// Entry value.
bytes value = 7;
// Residual reap time for the entry before getting deleted in seconds
int32 residual_reap_time = 8 [(gogoproto.customname) = "ResidualReapTime"];;
}
// BulkSync message payload definition.
@ -180,4 +184,4 @@ message CompoundMessage {
// A list of simple messages.
repeated SimpleMessage messages = 1;
}
}

View file

@ -67,11 +67,7 @@ func (sb *sandbox) setupResolutionFiles() error {
return err
}
if err := sb.setupDNS(); err != nil {
return err
}
return nil
return sb.setupDNS()
}
func (sb *sandbox) buildHostsFile() error {