diff --git a/libnetwork/networkdb/broadcast.go b/libnetwork/networkdb/broadcast.go new file mode 100644 index 0000000000..a1c3c61c84 --- /dev/null +++ b/libnetwork/networkdb/broadcast.go @@ -0,0 +1,127 @@ +package networkdb + +import ( + "github.com/hashicorp/memberlist" + "github.com/hashicorp/serf/serf" +) + +type networkEventType uint8 + +const ( + networkJoin networkEventType = 1 + iota + networkLeave +) + +type networkEventData struct { + Event networkEventType + LTime serf.LamportTime + NodeName string + NetworkID string +} + +type networkEventMessage struct { + id string + node string + msg []byte +} + +func (m *networkEventMessage) Invalidates(other memberlist.Broadcast) bool { + otherm := other.(*networkEventMessage) + return m.id == otherm.id && m.node == otherm.node +} + +func (m *networkEventMessage) Message() []byte { + return m.msg +} + +func (m *networkEventMessage) Finished() { +} + +func (nDB *NetworkDB) sendNetworkEvent(nid string, event networkEventType, ltime serf.LamportTime) error { + nEvent := networkEventData{ + Event: event, + LTime: ltime, + NodeName: nDB.config.NodeName, + NetworkID: nid, + } + + raw, err := encodeMessage(networkEventMsg, &nEvent) + if err != nil { + return err + } + + nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{ + msg: raw, + id: nid, + node: nDB.config.NodeName, + }) + return nil +} + +type tableEventType uint8 + +const ( + tableEntryCreate tableEventType = 1 + iota + tableEntryUpdate + tableEntryDelete +) + +type tableEventData struct { + Event tableEventType + LTime serf.LamportTime + NetworkID string + TableName string + NodeName string + Value []byte + Key string +} + +type tableEventMessage struct { + id string + tname string + key string + msg []byte + node string +} + +func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool { + otherm := other.(*tableEventMessage) + return m.id == otherm.id && m.tname == otherm.tname && m.key == otherm.key +} + +func (m *tableEventMessage) Message() []byte { + return m.msg +} + +func (m *tableEventMessage) Finished() { +} + +func (nDB *NetworkDB) sendTableEvent(event tableEventType, nid string, tname string, key string, entry *entry) error { + tEvent := tableEventData{ + Event: event, + LTime: entry.ltime, + NodeName: nDB.config.NodeName, + NetworkID: nid, + TableName: tname, + Key: key, + Value: entry.value, + } + + raw, err := encodeMessage(tableEventMsg, &tEvent) + if err != nil { + return err + } + + nDB.RLock() + broadcastQ := nDB.networks[nDB.config.NodeName][nid].tableBroadcasts + nDB.RUnlock() + + broadcastQ.QueueBroadcast(&tableEventMessage{ + msg: raw, + id: nid, + tname: tname, + key: key, + node: nDB.config.NodeName, + }) + return nil +} diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go new file mode 100644 index 0000000000..ea3cfa8b1c --- /dev/null +++ b/libnetwork/networkdb/cluster.go @@ -0,0 +1,439 @@ +package networkdb + +import ( + "crypto/rand" + "fmt" + "math/big" + rnd "math/rand" + "strings" + "time" + + "github.com/Sirupsen/logrus" + "github.com/hashicorp/memberlist" + "github.com/hashicorp/serf/serf" +) + +const reapInterval = 2 * time.Second + +type logWriter struct{} + +func (l *logWriter) Write(p []byte) (int, error) { + str := string(p) + + switch { + case strings.Contains(str, "[WARN]"): + logrus.Warn(str) + case strings.Contains(str, "[DEBUG]"): + logrus.Debug(str) + case strings.Contains(str, "[INFO]"): + logrus.Info(str) + case strings.Contains(str, "[ERR]"): + logrus.Warn(str) + } + + return len(p), nil +} + +func 
(nDB *NetworkDB) clusterInit() error { + config := memberlist.DefaultLANConfig() + config.Name = nDB.config.NodeName + config.BindAddr = nDB.config.BindAddr + if nDB.config.BindPort != 0 { + config.BindPort = nDB.config.BindPort + } + config.ProtocolVersion = memberlist.ProtocolVersionMax + config.Delegate = &delegate{nDB: nDB} + config.Events = &eventDelegate{nDB: nDB} + config.LogOutput = &logWriter{} + + nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + return len(nDB.nodes) + }, + RetransmitMult: config.RetransmitMult, + } + + mlist, err := memberlist.Create(config) + if err != nil { + return fmt.Errorf("failed to create memberlist: %v", err) + } + + nDB.stopCh = make(chan struct{}) + nDB.memberlist = mlist + nDB.mConfig = config + + for _, trigger := range []struct { + interval time.Duration + fn func() + }{ + {reapInterval, nDB.reapState}, + {config.GossipInterval, nDB.gossip}, + {config.PushPullInterval, nDB.bulkSyncTables}, + } { + t := time.NewTicker(trigger.interval) + go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn) + nDB.tickers = append(nDB.tickers, t) + } + + return nil +} + +func (nDB *NetworkDB) clusterJoin(members []string) error { + mlist := nDB.memberlist + + if _, err := mlist.Join(members); err != nil { + return fmt.Errorf("could not join node to memberlist: %v", err) + } + + return nil +} + +func (nDB *NetworkDB) clusterLeave() error { + mlist := nDB.memberlist + + if err := mlist.Leave(time.Second); err != nil { + return err + } + + close(nDB.stopCh) + + for _, t := range nDB.tickers { + t.Stop() + } + + return mlist.Shutdown() +} + +func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) { + // Use a random stagger to avoid syncronizing + randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger)) + select { + case <-time.After(randStagger): + case <-stop: + return + } + for { + select { + case <-C: + f() + case <-stop: + return + } + } +} + +func (nDB *NetworkDB) reapState() { + nDB.reapNetworks() + nDB.reapTableEntries() +} + +func (nDB *NetworkDB) reapNetworks() { + now := time.Now() + nDB.Lock() + for name, nn := range nDB.networks { + for id, n := range nn { + if n.leaving && now.Sub(n.leaveTime) > reapInterval { + delete(nn, id) + nDB.deleteNetworkNode(id, name) + } + } + } + nDB.Unlock() +} + +func (nDB *NetworkDB) reapTableEntries() { + var paths []string + + now := time.Now() + + nDB.RLock() + nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { + entry, ok := v.(*entry) + if !ok { + return false + } + + if !entry.deleting || now.Sub(entry.deleteTime) <= reapInterval { + return false + } + + paths = append(paths, path) + return false + }) + nDB.RUnlock() + + nDB.Lock() + for _, path := range paths { + params := strings.Split(path[1:], "/") + tname := params[0] + nid := params[1] + key := params[2] + + if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok { + logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key) + } + + if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok { + logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) + } + } + nDB.Unlock() +} + +func (nDB *NetworkDB) gossip() { + networkNodes := make(map[string][]string) + nDB.RLock() + for nid := range nDB.networks[nDB.config.NodeName] { + networkNodes[nid] = 
nDB.networkNodes[nid] + + } + nDB.RUnlock() + + for nid, nodes := range networkNodes { + mNodes := nDB.mRandomNodes(3, nodes) + bytesAvail := udpSendBuf - compoundHeaderOverhead + + nDB.RLock() + broadcastQ := nDB.networks[nDB.config.NodeName][nid].tableBroadcasts + nDB.RUnlock() + + msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail) + if len(msgs) == 0 { + break + } + + // Create a compound message + compound := makeCompoundMessage(msgs) + + for _, node := range mNodes { + nDB.RLock() + mnode := nDB.nodes[node] + nDB.RUnlock() + + if mnode == nil { + break + } + + // Send the compound message + if err := nDB.memberlist.SendToUDP(mnode, compound.Bytes()); err != nil { + logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err) + } + } + } +} + +type bulkSyncMessage struct { + LTime serf.LamportTime + Unsolicited bool + NodeName string + Networks []string + Payload []byte +} + +func (nDB *NetworkDB) bulkSyncTables() { + var networks []string + nDB.RLock() + for nid := range nDB.networks[nDB.config.NodeName] { + networks = append(networks, nid) + } + nDB.RUnlock() + + for { + if len(networks) == 0 { + break + } + + nid := networks[0] + networks = networks[1:] + + completed, err := nDB.bulkSync(nid, false) + if err != nil { + logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err) + continue + } + + // Remove all the networks for which we have + // successfully completed bulk sync in this iteration. + updatedNetworks := make([]string, 0, len(networks)) + for _, nid := range networks { + for _, completedNid := range completed { + if nid == completedNid { + continue + } + + updatedNetworks = append(updatedNetworks, nid) + } + } + + networks = updatedNetworks + } +} + +func (nDB *NetworkDB) bulkSync(nid string, all bool) ([]string, error) { + nDB.RLock() + nodes := nDB.networkNodes[nid] + nDB.RUnlock() + + if !all { + // If not all, then just pick one. + nodes = nDB.mRandomNodes(1, nodes) + } + + logrus.Debugf("%s: Initiating bulk sync with nodes %v", nDB.config.NodeName, nodes) + var err error + var networks []string + for _, node := range nodes { + if node == nDB.config.NodeName { + continue + } + + networks = nDB.findCommonNetworks(node) + err = nDB.bulkSyncNode(networks, node, true) + if err != nil { + err = fmt.Errorf("bulk sync failed on node %s: %v", node, err) + } + } + + if err != nil { + return nil, err + } + + return networks, nil +} + +// Bulk sync all the table entries belonging to a set of networks to a +// single peer node. 
It can be unsolicited or can be in response to an +// unsolicited bulk sync +func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error { + var msgs [][]byte + + logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node) + + nDB.RLock() + mnode := nDB.nodes[node] + if mnode == nil { + nDB.RUnlock() + return nil + } + + for _, nid := range networks { + nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool { + entry, ok := v.(*entry) + if !ok { + return false + } + + params := strings.Split(path[1:], "/") + tEvent := tableEventData{ + Event: tableEntryCreate, + LTime: entry.ltime, + NodeName: entry.node, + NetworkID: nid, + TableName: params[1], + Key: params[2], + Value: entry.value, + } + + msg, err := encodeMessage(tableEventMsg, &tEvent) + if err != nil { + logrus.Errorf("Encode failure during bulk sync: %#v", tEvent) + return false + } + + msgs = append(msgs, msg) + return false + }) + } + nDB.RUnlock() + + // Create a compound message + compound := makeCompoundMessage(msgs) + + bsm := bulkSyncMessage{ + LTime: nDB.tableClock.Time(), + Unsolicited: unsolicited, + NodeName: nDB.config.NodeName, + Networks: networks, + Payload: compound.Bytes(), + } + + buf, err := encodeMessage(bulkSyncMsg, &bsm) + if err != nil { + return fmt.Errorf("failed to encode bulk sync message: %v", err) + } + + nDB.Lock() + ch := make(chan struct{}) + nDB.bulkSyncAckTbl[node] = ch + nDB.Unlock() + + err = nDB.memberlist.SendToTCP(mnode, buf) + if err != nil { + nDB.Lock() + delete(nDB.bulkSyncAckTbl, node) + nDB.Unlock() + + return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err) + } + + startTime := time.Now() + select { + case <-time.After(30 * time.Second): + logrus.Errorf("Bulk sync to node %s timed out", node) + case <-ch: + nDB.Lock() + delete(nDB.bulkSyncAckTbl, node) + nDB.Unlock() + + logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime)) + } + + return nil +} + +// Returns a random offset between 0 and n +func randomOffset(n int) int { + if n == 0 { + return 0 + } + + val, err := rand.Int(rand.Reader, big.NewInt(int64(n))) + if err != nil { + logrus.Errorf("Failed to get a random offset: %v", err) + return 0 + } + + return int(val.Int64()) +} + +// mRandomNodes is used to select up to m random nodes. It is possible +// that less than m nodes are returned. 
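+// The local node is always excluded from the result and duplicate picks are filtered out.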
+func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string { + n := len(nodes) + mNodes := make([]string, 0, m) +OUTER: + // Probe up to 3*n times, with large n this is not necessary + // since k << n, but with small n we want search to be + // exhaustive + for i := 0; i < 3*n && len(mNodes) < m; i++ { + // Get random node + idx := randomOffset(n) + node := nodes[idx] + + if node == nDB.config.NodeName { + continue + } + + // Check if we have this node already + for j := 0; j < len(mNodes); j++ { + if node == mNodes[j] { + continue OUTER + } + } + + // Append the node + mNodes = append(mNodes, node) + } + + return mNodes +} diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go new file mode 100644 index 0000000000..f2c7b2ff76 --- /dev/null +++ b/libnetwork/networkdb/delegate.go @@ -0,0 +1,315 @@ +package networkdb + +import ( + "fmt" + "time" + + "github.com/Sirupsen/logrus" + "github.com/hashicorp/serf/serf" +) + +type networkData struct { + LTime serf.LamportTime + ID string + NodeName string + Leaving bool +} + +type networkPushPull struct { + LTime serf.LamportTime + Networks []networkData +} + +type delegate struct { + nDB *NetworkDB +} + +func (d *delegate) NodeMeta(limit int) []byte { + return []byte{} +} + +func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool { + // Update our local clock if the received messages has newer + // time. + nDB.networkClock.Witness(nEvent.LTime) + + nDB.Lock() + defer nDB.Unlock() + + nodeNetworks, ok := nDB.networks[nEvent.NodeName] + if !ok { + // We haven't heard about this node at all. Ignore the leave + if nEvent.Event == networkLeave { + return false + } + + nodeNetworks = make(map[string]*network) + nDB.networks[nEvent.NodeName] = nodeNetworks + } + + if n, ok := nodeNetworks[nEvent.NetworkID]; ok { + // We have the latest state. Ignore the event + // since it is stale. + if n.ltime >= nEvent.LTime { + return false + } + + n.ltime = nEvent.LTime + n.leaving = nEvent.Event == networkLeave + if n.leaving { + n.leaveTime = time.Now() + } + + return true + } + + if nEvent.Event == networkLeave { + return false + } + + // This remote network join is being seen the first time. + nodeNetworks[nEvent.NetworkID] = &network{ + id: nEvent.NetworkID, + ltime: nEvent.LTime, + } + + nDB.networkNodes[nEvent.NetworkID] = append(nDB.networkNodes[nEvent.NetworkID], nEvent.NodeName) + return true +} + +func (nDB *NetworkDB) handleTableEvent(tEvent *tableEventData) bool { + // Update our local clock if the received messages has newer + // time. + nDB.tableClock.Witness(tEvent.LTime) + + if entry, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key); err == nil { + // We have the latest state. Ignore the event + // since it is stale. 
+ if entry.ltime >= tEvent.LTime { + return false + } + } + + entry := &entry{ + ltime: tEvent.LTime, + node: tEvent.NodeName, + value: tEvent.Value, + deleting: tEvent.Event == tableEntryDelete, + } + + if entry.deleting { + entry.deleteTime = time.Now() + } + + nDB.Lock() + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), entry) + nDB.Unlock() + + var op opType + switch tEvent.Event { + case tableEntryCreate: + op = opCreate + case tableEntryUpdate: + op = opUpdate + case tableEntryDelete: + op = opDelete + } + + nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value)) + return true +} + +func (nDB *NetworkDB) handleCompound(buf []byte) { + // Decode the parts + trunc, parts, err := decodeCompoundMessage(buf[1:]) + if err != nil { + logrus.Errorf("Failed to decode compound request: %v", err) + return + } + + // Log any truncation + if trunc > 0 { + logrus.Warnf("Compound request had %d truncated messages", trunc) + } + + // Handle each message + for _, part := range parts { + nDB.handleMessage(part) + } +} + +func (nDB *NetworkDB) handleTableMessage(buf []byte) { + var tEvent tableEventData + if err := decodeMessage(buf[1:], &tEvent); err != nil { + logrus.Errorf("Error decoding table event message: %v", err) + return + } + + if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast { + // Copy the buffer since we cannot rely on the slice not changing + newBuf := make([]byte, len(buf)) + copy(newBuf, buf) + + nDB.RLock() + n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID] + nDB.RUnlock() + + if !ok { + return + } + + broadcastQ := n.tableBroadcasts + broadcastQ.QueueBroadcast(&tableEventMessage{ + msg: newBuf, + id: tEvent.NetworkID, + tname: tEvent.TableName, + key: tEvent.Key, + node: nDB.config.NodeName, + }) + } +} + +func (nDB *NetworkDB) handleNetworkMessage(buf []byte) { + var nEvent networkEventData + if err := decodeMessage(buf[1:], &nEvent); err != nil { + logrus.Errorf("Error decoding network event message: %v", err) + return + } + + if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast { + // Copy the buffer since it we cannot rely on the slice not changing + newBuf := make([]byte, len(buf)) + copy(newBuf, buf) + + nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{ + msg: newBuf, + id: nEvent.NetworkID, + node: nEvent.NodeName, + }) + } +} + +func (nDB *NetworkDB) handleBulkSync(buf []byte) { + var bsm bulkSyncMessage + if err := decodeMessage(buf[1:], &bsm); err != nil { + logrus.Errorf("Error decoding bulk sync message: %v", err) + return + } + + if bsm.LTime > 0 { + nDB.tableClock.Witness(bsm.LTime) + } + + nDB.handleMessage(bsm.Payload) + + // Don't respond to a bulk sync which was not unsolicited + if !bsm.Unsolicited { + nDB.RLock() + ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName] + nDB.RUnlock() + if ok { + close(ch) + } + + return + } + + if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil { + logrus.Errorf("Error in responding to bulk sync from node %s: %v", nDB.nodes[bsm.NodeName].Addr, err) + } +} + +func (nDB *NetworkDB) handleMessage(buf []byte) { + msgType := messageType(buf[0]) + + switch msgType { + case networkEventMsg: + nDB.handleNetworkMessage(buf) + case tableEventMsg: + nDB.handleTableMessage(buf) + case compoundMsg: + nDB.handleCompound(buf) + case bulkSyncMsg: + nDB.handleBulkSync(buf) + default: + 
logrus.Errorf("%s: unknown message type %d payload = %v", nDB.config.NodeName, msgType, buf[:8]) + } +} + +func (d *delegate) NotifyMsg(buf []byte) { + if len(buf) == 0 { + return + } + + d.nDB.handleMessage(buf) +} + +func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { + return d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit) +} + +func (d *delegate) LocalState(join bool) []byte { + d.nDB.RLock() + defer d.nDB.RUnlock() + + pp := networkPushPull{ + LTime: d.nDB.networkClock.Time(), + } + + for name, nn := range d.nDB.networks { + for _, n := range nn { + pp.Networks = append(pp.Networks, networkData{ + LTime: n.ltime, + ID: n.id, + NodeName: name, + Leaving: n.leaving, + }) + } + } + + buf, err := encodeMessage(networkPushPullMsg, &pp) + if err != nil { + logrus.Errorf("Failed to encode local network state: %v", err) + return nil + } + + return buf +} + +func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) { + if len(buf) == 0 { + logrus.Error("zero byte remote network state received") + return + } + + if messageType(buf[0]) != networkPushPullMsg { + logrus.Errorf("Invalid message type %v received from remote", buf[0]) + } + + pp := networkPushPull{} + if err := decodeMessage(buf[1:], &pp); err != nil { + logrus.Errorf("Failed to decode remote network state: %v", err) + return + } + + if pp.LTime > 0 { + d.nDB.networkClock.Witness(pp.LTime) + } + + for _, n := range pp.Networks { + nEvent := &networkEventData{ + LTime: n.LTime, + NodeName: n.NodeName, + NetworkID: n.ID, + Event: networkJoin, + } + + if n.Leaving { + nEvent.Event = networkLeave + } + + d.nDB.handleNetworkEvent(nEvent) + } + +} diff --git a/libnetwork/networkdb/event_delegate.go b/libnetwork/networkdb/event_delegate.go new file mode 100644 index 0000000000..4a924482e7 --- /dev/null +++ b/libnetwork/networkdb/event_delegate.go @@ -0,0 +1,23 @@ +package networkdb + +import "github.com/hashicorp/memberlist" + +type eventDelegate struct { + nDB *NetworkDB +} + +func (e *eventDelegate) NotifyJoin(n *memberlist.Node) { + e.nDB.Lock() + e.nDB.nodes[n.Name] = n + e.nDB.Unlock() +} + +func (e *eventDelegate) NotifyLeave(n *memberlist.Node) { + e.nDB.deleteNodeTableEntries(n.Name) + e.nDB.Lock() + delete(e.nDB.nodes, n.Name) + e.nDB.Unlock() +} + +func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) { +} diff --git a/libnetwork/networkdb/message.go b/libnetwork/networkdb/message.go new file mode 100644 index 0000000000..48f69da0e0 --- /dev/null +++ b/libnetwork/networkdb/message.go @@ -0,0 +1,122 @@ +package networkdb + +import ( + "bytes" + "encoding/binary" + "fmt" + + "github.com/hashicorp/go-msgpack/codec" +) + +type messageType uint8 + +const ( + // For network join/leave event message + networkEventMsg messageType = 1 + iota + + // For pushing/pulling network/node association state + networkPushPullMsg + + // For table entry CRUD event message + tableEventMsg + + // For building a compound message which packs many different + // message types together + compoundMsg + + // For syncing table entries in bulk b/w nodes. + bulkSyncMsg +) + +const ( + // Max udp message size chosen to avoid network packet + // fragmentation. 
+ udpSendBuf = 1400 + + // Compound message header overhead 1 byte(message type) + 4 + // bytes (num messages) + compoundHeaderOverhead = 5 + + // Overhead for each embedded message in a compound message 2 + // bytes (len of embedded message) + compoundOverhead = 2 +) + +func decodeMessage(buf []byte, out interface{}) error { + var handle codec.MsgpackHandle + return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out) +} + +func encodeMessage(t messageType, msg interface{}) ([]byte, error) { + buf := bytes.NewBuffer(nil) + buf.WriteByte(uint8(t)) + + handle := codec.MsgpackHandle{} + encoder := codec.NewEncoder(buf, &handle) + err := encoder.Encode(msg) + return buf.Bytes(), err +} + +// makeCompoundMessage takes a list of messages and generates +// a single compound message containing all of them +func makeCompoundMessage(msgs [][]byte) *bytes.Buffer { + // Create a local buffer + buf := bytes.NewBuffer(nil) + + // Write out the type + buf.WriteByte(uint8(compoundMsg)) + + // Write out the number of message + binary.Write(buf, binary.BigEndian, uint32(len(msgs))) + + // Add the message lengths + for _, m := range msgs { + binary.Write(buf, binary.BigEndian, uint16(len(m))) + } + + // Append the messages + for _, m := range msgs { + buf.Write(m) + } + + return buf +} + +// decodeCompoundMessage splits a compound message and returns +// the slices of individual messages. Also returns the number +// of truncated messages and any potential error +func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) { + if len(buf) < 1 { + err = fmt.Errorf("missing compound length byte") + return + } + numParts := binary.BigEndian.Uint32(buf[0:4]) + buf = buf[4:] + + // Check we have enough bytes + if len(buf) < int(numParts*2) { + err = fmt.Errorf("truncated len slice") + return + } + + // Decode the lengths + lengths := make([]uint16, numParts) + for i := 0; i < int(numParts); i++ { + lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2]) + } + buf = buf[numParts*2:] + + // Split each message + for idx, msgLen := range lengths { + if len(buf) < int(msgLen) { + trunc = int(numParts) - idx + return + } + + // Extract the slice, seek past on the buffer + slice := buf[:msgLen] + buf = buf[msgLen:] + parts = append(parts, slice) + } + return +} diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go new file mode 100644 index 0000000000..a0ddf2a4f1 --- /dev/null +++ b/libnetwork/networkdb/networkdb.go @@ -0,0 +1,424 @@ +package networkdb + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/Sirupsen/logrus" + "github.com/armon/go-radix" + "github.com/docker/go-events" + "github.com/hashicorp/memberlist" + "github.com/hashicorp/serf/serf" +) + +const ( + byTable int = 1 + iota + byNetwork +) + +// NetworkDB instance drives the networkdb cluster and acts the broker +// for cluster-scoped and network-scoped gossip and watches. +type NetworkDB struct { + sync.RWMutex + + // NetworkDB configuration. + config *Config + + // local copy of memberlist config that we use to driver + // network scoped gossip and bulk sync. + mConfig *memberlist.Config + + // All the tree index (byTable, byNetwork) that we maintain + // the db. + indexes map[int]*radix.Tree + + // Memberlist we use to drive the cluster. + memberlist *memberlist.Memberlist + + // List of all peer nodes in the cluster not-limited to any + // network. + nodes map[string]*memberlist.Node + + // A multi-dimensional map of network/node attachmemts. 
The + // first key is a node name and the second key is a network ID + // for the network that node is participating in. + networks map[string]map[string]*network + + // A map of nodes which are participating in a given + // network. The key is a network ID. + + networkNodes map[string][]string + + // A table of ack channels for every node from which we are + // waiting for an ack. + bulkSyncAckTbl map[string]chan struct{} + + // Global lamport clock for node network attach events. + networkClock serf.LamportClock + + // Global lamport clock for table events. + tableClock serf.LamportClock + + // Broadcast queue for network event gossip. + networkBroadcasts *memberlist.TransmitLimitedQueue + + // A central stop channel to stop all go routines running on + // behalf of the NetworkDB instance. + stopCh chan struct{} + + // A central broadcaster for all local watchers watching table + // events. + broadcaster *events.Broadcaster + + // List of all tickers which needed to be stopped when + // cleaning up. + tickers []*time.Ticker +} + +// network describes the node/network attachment. +type network struct { + // Network ID + id string + + // Lamport time for the latest state of the entry. + ltime serf.LamportTime + + // Node leave is in progress. + leaving bool + + // The time this node knew about the node's network leave. + leaveTime time.Time + + // The broadcast queue for table event gossip. This is only + // initialized for this node's network attachment entries. + tableBroadcasts *memberlist.TransmitLimitedQueue +} + +// Config represents the configuration of the networdb instance and +// can be passed by the caller. +type Config struct { + // NodeName is the cluster wide unique name for this node. + NodeName string + + // BindAddr is the local node's IP address that we bind to for + // cluster communication. + BindAddr string + + // BindPort is the local node's port to which we bind to for + // cluster communication. + BindPort int +} + +// entry defines a table entry +type entry struct { + // node from which this entry was learned. + node string + + // Lamport time for the most recent update to the entry + ltime serf.LamportTime + + // Opaque value store in the entry + value []byte + + // Deleting the entry is in progress. All entries linger in + // the cluster for certain amount of time after deletion. + deleting bool + + // The wall clock time when this node learned about this deletion. + deleteTime time.Time +} + +// New creates a new instance of NetworkDB using the Config passed by +// the caller. +func New(c *Config) (*NetworkDB, error) { + nDB := &NetworkDB{ + config: c, + indexes: make(map[int]*radix.Tree), + networks: make(map[string]map[string]*network), + nodes: make(map[string]*memberlist.Node), + networkNodes: make(map[string][]string), + bulkSyncAckTbl: make(map[string]chan struct{}), + broadcaster: events.NewBroadcaster(), + } + + nDB.indexes[byTable] = radix.New() + nDB.indexes[byNetwork] = radix.New() + + if err := nDB.clusterInit(); err != nil { + return nil, err + } + + return nDB, nil +} + +// Join joins this NetworkDB instance with a list of peer NetworkDB +// instances passed by the caller in the form of addr:port +func (nDB *NetworkDB) Join(members []string) error { + return nDB.clusterJoin(members) +} + +// Close destroys this NetworkDB instance by leave the cluster, +// stopping timers, canceling goroutines etc. 
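+// Errors encountered while leaving the cluster are logged rather than returned.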
+func (nDB *NetworkDB) Close() { + if err := nDB.clusterLeave(); err != nil { + logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err) + } +} + +// GetEntry retrieves the value of a table entry in a given (network, +// table, key) tuple +func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) { + entry, err := nDB.getEntry(tname, nid, key) + if err != nil { + return nil, err + } + + return entry.value, nil +} + +func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) { + nDB.RLock() + defer nDB.RUnlock() + + e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) + if !ok { + return nil, fmt.Errorf("could not get entry in table %s with network id %s and key %s", tname, nid, key) + } + + return e.(*entry), nil +} + +// CreateEntry creates a table entry in NetworkDB for given (network, +// table, key) tuple and if the NetworkDB is part of the cluster +// propogates this event to the cluster. It is an error to create an +// entry for the same tuple for which there is already an existing +// entry. +func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { + if _, err := nDB.GetEntry(tname, nid, key); err == nil { + return fmt.Errorf("cannot create entry as the entry in table %s with network id %s and key %s already exists", tname, nid, key) + } + + entry := &entry{ + ltime: nDB.tableClock.Increment(), + node: nDB.config.NodeName, + value: value, + } + + if err := nDB.sendTableEvent(tableEntryCreate, nid, tname, key, entry); err != nil { + return fmt.Errorf("cannot send table create event: %v", err) + } + + nDB.Lock() + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + nDB.Unlock() + + nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value)) + return nil +} + +// UpdateEntry updates a table entry in NetworkDB for given (network, +// table, key) tuple and if the NetworkDB is part of the cluster +// propogates this event to the cluster. It is an error to update a +// non-existent entry. +func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error { + if _, err := nDB.GetEntry(tname, nid, key); err != nil { + return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key) + } + + entry := &entry{ + ltime: nDB.tableClock.Increment(), + node: nDB.config.NodeName, + value: value, + } + + if err := nDB.sendTableEvent(tableEntryUpdate, nid, tname, key, entry); err != nil { + return fmt.Errorf("cannot send table update event: %v", err) + } + + nDB.Lock() + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + nDB.Unlock() + + nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value)) + return nil +} + +// DeleteEntry deletes a table entry in NetworkDB for given (network, +// table, key) tuple and if the NetworkDB is part of the cluster +// propogates this event to the cluster. 
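+// It is an error to delete an entry which does not exist.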
+func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { + value, err := nDB.GetEntry(tname, nid, key) + if err != nil { + return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key) + } + + entry := &entry{ + ltime: nDB.tableClock.Increment(), + node: nDB.config.NodeName, + value: value, + deleting: true, + deleteTime: time.Now(), + } + + if err := nDB.sendTableEvent(tableEntryDelete, nid, tname, key, entry); err != nil { + return fmt.Errorf("cannot send table delete event: %v", err) + } + + nDB.Lock() + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + nDB.Unlock() + + nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value)) + return nil +} + +func (nDB *NetworkDB) deleteNodeTableEntries(node string) { + nDB.Lock() + nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { + oldEntry := v.(*entry) + if oldEntry.node != node { + return false + } + + params := strings.Split(path[1:], "/") + tname := params[0] + nid := params[1] + key := params[2] + + entry := &entry{ + ltime: oldEntry.ltime, + node: node, + value: oldEntry.value, + deleting: true, + deleteTime: time.Now(), + } + + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + return false + }) + nDB.Unlock() +} + +// WalkTable walks a single table in NetworkDB and invokes the passed +// function for each entry in the table passing the network, key, +// value. The walk stops if the passed function returns a true. +func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error { + nDB.RLock() + values := make(map[string]interface{}) + nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool { + values[path] = v + return false + }) + nDB.RUnlock() + + for k, v := range values { + params := strings.Split(k[1:], "/") + nid := params[1] + key := params[2] + if fn(nid, key, v.(*entry).value) { + return nil + } + } + + return nil +} + +// JoinNetwork joins this node to a given network and propogates this +// event across the cluster. This triggers this node joining the +// sub-cluster of this network and participates in the network-scoped +// gossip and bulk sync for this network. +func (nDB *NetworkDB) JoinNetwork(nid string) error { + ltime := nDB.networkClock.Increment() + + if err := nDB.sendNetworkEvent(nid, networkJoin, ltime); err != nil { + return fmt.Errorf("failed to send leave network event for %s: %v", nid, err) + } + + nDB.Lock() + nodeNetworks, ok := nDB.networks[nDB.config.NodeName] + if !ok { + nodeNetworks = make(map[string]*network) + nDB.networks[nDB.config.NodeName] = nodeNetworks + } + nodeNetworks[nid] = &network{id: nid, ltime: ltime} + nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ + NumNodes: func() int { + return len(nDB.networkNodes[nid]) + }, + RetransmitMult: 4, + } + nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName) + nDB.Unlock() + + logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid) + if _, err := nDB.bulkSync(nid, true); err != nil { + logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err) + } + + return nil +} + +// LeaveNetwork leaves this node from a given network and propogates +// this event across the cluster. 
This triggers this node leaving the +// sub-cluster of this network and as a result will no longer +// participate in the network-scoped gossip and bulk sync for this +// network. +func (nDB *NetworkDB) LeaveNetwork(nid string) error { + ltime := nDB.networkClock.Increment() + if err := nDB.sendNetworkEvent(nid, networkLeave, ltime); err != nil { + return fmt.Errorf("failed to send leave network event for %s: %v", nid, err) + } + + nDB.Lock() + defer nDB.Unlock() + nodeNetworks, ok := nDB.networks[nDB.config.NodeName] + if !ok { + return fmt.Errorf("could not find self node for network %s while trying to leave", nid) + } + + n, ok := nodeNetworks[nid] + if !ok { + return fmt.Errorf("could not find network %s while trying to leave", nid) + } + + n.ltime = ltime + n.leaving = true + return nil +} + +// Deletes the node from the list of nodes which participate in the +// passed network. Caller should hold the NetworkDB lock while calling +// this +func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) { + nodes := nDB.networkNodes[nid] + for i, name := range nodes { + if name == nodeName { + nodes[i] = nodes[len(nodes)-1] + nodes = nodes[:len(nodes)-1] + break + } + } + nDB.networkNodes[nid] = nodes +} + +// findCommonnetworks find the networks that both this node and the +// passed node have joined. +func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string { + nDB.RLock() + defer nDB.RUnlock() + + var networks []string + for nid := range nDB.networks[nDB.config.NodeName] { + if _, ok := nDB.networks[nodeName][nid]; ok { + networks = append(networks, nid) + } + } + + return networks +} diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go new file mode 100644 index 0000000000..bdcb6c51a5 --- /dev/null +++ b/libnetwork/networkdb/networkdb_test.go @@ -0,0 +1,431 @@ +package networkdb + +import ( + "flag" + "fmt" + "log" + "os" + "sync/atomic" + "testing" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/go-events" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + dbPort int32 = 10000 + runningInContainer = flag.Bool("incontainer", false, "Indicates if the test is running in a container") +) + +func TestMain(m *testing.M) { + logrus.SetLevel(logrus.ErrorLevel) + os.Exit(m.Run()) +} + +func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB { + var dbs []*NetworkDB + for i := 0; i < num; i++ { + db, err := New(&Config{ + NodeName: fmt.Sprintf("%s%d", namePrefix, i+1), + BindPort: int(atomic.AddInt32(&dbPort, 1)), + }) + require.NoError(t, err) + + if i != 0 { + err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}) + assert.NoError(t, err) + } + + dbs = append(dbs, db) + } + + return dbs +} + +func closeNetworkDBInstances(dbs []*NetworkDB) { + for _, db := range dbs { + db.Close() + } +} + +func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) { + for i := 0; i < 80; i++ { + _, ok := db.nodes[node] + if present && ok { + return + } + + if !present && !ok { + return + } + + time.Sleep(50 * time.Millisecond) + } + + assert.Fail(t, fmt.Sprintf("%s: Node existence verification for node %s failed", db.config.NodeName, node)) +} + +func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) { + for i := 0; i < 80; i++ { + if nn, nnok := db.networks[node]; nnok { + n, ok := nn[id] + if present && ok { + return + } + + if !present && + ((ok && n.leaving) || + !ok) { + return + 
} + } + + time.Sleep(50 * time.Millisecond) + } + + assert.Fail(t, "Network existence verification failed") +} + +func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) { + n := 80 + for i := 0; i < n; i++ { + entry, err := db.getEntry(tname, nid, key) + if present && err == nil && string(entry.value) == value { + return + } + + if !present && + ((err == nil && entry.deleting) || + (err != nil)) { + return + } + + if i == n-1 && !present && err != nil { + return + } + + time.Sleep(50 * time.Millisecond) + } + + assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %s", db.config.NodeName)) +} + +func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) { + select { + case rcvdEv := <-ch: + assert.Equal(t, fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)) + switch rcvdEv.(type) { + case CreateEvent: + assert.Equal(t, tname, rcvdEv.(CreateEvent).Table) + assert.Equal(t, nid, rcvdEv.(CreateEvent).NetworkID) + assert.Equal(t, key, rcvdEv.(CreateEvent).Key) + assert.Equal(t, value, string(rcvdEv.(CreateEvent).Value)) + case UpdateEvent: + assert.Equal(t, tname, rcvdEv.(UpdateEvent).Table) + assert.Equal(t, nid, rcvdEv.(UpdateEvent).NetworkID) + assert.Equal(t, key, rcvdEv.(UpdateEvent).Key) + assert.Equal(t, value, string(rcvdEv.(UpdateEvent).Value)) + case DeleteEvent: + assert.Equal(t, tname, rcvdEv.(DeleteEvent).Table) + assert.Equal(t, nid, rcvdEv.(DeleteEvent).NetworkID) + assert.Equal(t, key, rcvdEv.(DeleteEvent).Key) + } + case <-time.After(time.Second): + t.Fail() + return + } +} + +func TestNetworkDBSimple(t *testing.T) { + dbs := createNetworkDBInstances(t, 2, "node") + closeNetworkDBInstances(dbs) +} + +func TestNetworkDBJoinLeaveNetwork(t *testing.T) { + dbs := createNetworkDBInstances(t, 2, "node") + + err := dbs[0].JoinNetwork("network1") + assert.NoError(t, err) + + dbs[1].verifyNetworkExistence(t, "node1", "network1", true) + + err = dbs[0].LeaveNetwork("network1") + assert.NoError(t, err) + + dbs[1].verifyNetworkExistence(t, "node1", "network1", false) + closeNetworkDBInstances(dbs) +} + +func TestNetworkDBJoinLeaveNetworks(t *testing.T) { + dbs := createNetworkDBInstances(t, 2, "node") + + n := 10 + for i := 1; i <= n; i++ { + err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i)) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i)) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), true) + } + + for i := 1; i <= n; i++ { + dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), true) + } + + for i := 1; i <= n; i++ { + err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i)) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i)) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), false) + } + + for i := 1; i <= n; i++ { + dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), false) + } + + closeNetworkDBInstances(dbs) +} + +func TestNetworkDBCRUDTableEntry(t *testing.T) { + dbs := createNetworkDBInstances(t, 3, "node") + + err := dbs[0].JoinNetwork("network1") + assert.NoError(t, err) + + dbs[1].verifyNetworkExistence(t, "node1", "network1", true) + + err = dbs[1].JoinNetwork("network1") + assert.NoError(t, err) + + err = dbs[0].CreateEntry("test_table", 
"network1", "test_key", []byte("test_value")) + assert.NoError(t, err) + + dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true) + dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false) + + err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value")) + assert.NoError(t, err) + + dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true) + + err = dbs[0].DeleteEntry("test_table", "network1", "test_key") + assert.NoError(t, err) + + dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false) + + closeNetworkDBInstances(dbs) +} + +func TestNetworkDBCRUDTableEntries(t *testing.T) { + dbs := createNetworkDBInstances(t, 2, "node") + + err := dbs[0].JoinNetwork("network1") + assert.NoError(t, err) + + dbs[1].verifyNetworkExistence(t, "node1", "network1", true) + + err = dbs[1].JoinNetwork("network1") + assert.NoError(t, err) + + n := 10 + for i := 1; i <= n; i++ { + err = dbs[0].CreateEntry("test_table", "network1", + fmt.Sprintf("test_key0%d", i), + []byte(fmt.Sprintf("test_value0%d", i))) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + err = dbs[1].CreateEntry("test_table", "network1", + fmt.Sprintf("test_key1%d", i), + []byte(fmt.Sprintf("test_value1%d", i))) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + dbs[0].verifyEntryExistence(t, "test_table", "network1", + fmt.Sprintf("test_key1%d", i), + fmt.Sprintf("test_value1%d", i), true) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + dbs[1].verifyEntryExistence(t, "test_table", "network1", + fmt.Sprintf("test_key0%d", i), + fmt.Sprintf("test_value0%d", i), true) + assert.NoError(t, err) + } + + // Verify deletes + for i := 1; i <= n; i++ { + err = dbs[0].DeleteEntry("test_table", "network1", + fmt.Sprintf("test_key0%d", i)) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + err = dbs[1].DeleteEntry("test_table", "network1", + fmt.Sprintf("test_key1%d", i)) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + dbs[0].verifyEntryExistence(t, "test_table", "network1", + fmt.Sprintf("test_key1%d", i), "", false) + assert.NoError(t, err) + } + + for i := 1; i <= n; i++ { + dbs[1].verifyEntryExistence(t, "test_table", "network1", + fmt.Sprintf("test_key0%d", i), "", false) + assert.NoError(t, err) + } + + closeNetworkDBInstances(dbs) +} + +func TestNetworkDBNodeLeave(t *testing.T) { + dbs := createNetworkDBInstances(t, 2, "node") + + err := dbs[0].JoinNetwork("network1") + assert.NoError(t, err) + + err = dbs[1].JoinNetwork("network1") + assert.NoError(t, err) + + err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value")) + assert.NoError(t, err) + + dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true) + + dbs[0].Close() + dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false) + dbs[1].Close() +} + +func TestNetworkDBWatch(t *testing.T) { + dbs := createNetworkDBInstances(t, 2, "node") + err := dbs[0].JoinNetwork("network1") + assert.NoError(t, err) + + err = dbs[1].JoinNetwork("network1") + assert.NoError(t, err) + + ch, cancel := dbs[1].Watch("", "", "") + + err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value")) + assert.NoError(t, err) + + testWatch(t, ch, CreateEvent{}, "test_table", "network1", "test_key", "test_value") + + err = dbs[0].UpdateEntry("test_table", "network1", "test_key", 
[]byte("test_updated_value")) + assert.NoError(t, err) + + testWatch(t, ch, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value") + + err = dbs[0].DeleteEntry("test_table", "network1", "test_key") + assert.NoError(t, err) + + testWatch(t, ch, DeleteEvent{}, "test_table", "network1", "test_key", "") + + cancel() + closeNetworkDBInstances(dbs) +} + +func TestNetworkDBBulkSync(t *testing.T) { + dbs := createNetworkDBInstances(t, 2, "node") + + err := dbs[0].JoinNetwork("network1") + assert.NoError(t, err) + + dbs[1].verifyNetworkExistence(t, "node1", "network1", true) + + n := 1000 + for i := 1; i <= n; i++ { + err = dbs[0].CreateEntry("test_table", "network1", + fmt.Sprintf("test_key0%d", i), + []byte(fmt.Sprintf("test_value0%d", i))) + assert.NoError(t, err) + } + + err = dbs[1].JoinNetwork("network1") + assert.NoError(t, err) + + dbs[0].verifyNetworkExistence(t, "node2", "network1", true) + + for i := 1; i <= n; i++ { + dbs[1].verifyEntryExistence(t, "test_table", "network1", + fmt.Sprintf("test_key0%d", i), + fmt.Sprintf("test_value0%d", i), true) + assert.NoError(t, err) + } + + closeNetworkDBInstances(dbs) +} + +func TestNetworkDBCRUDMediumCluster(t *testing.T) { + n := 5 + + dbs := createNetworkDBInstances(t, n, "node") + + for i := 0; i < n; i++ { + for j := 0; j < n; j++ { + if i == j { + continue + } + + dbs[i].verifyNodeExistence(t, fmt.Sprintf("node%d", j+1), true) + } + } + + for i := 0; i < n; i++ { + err := dbs[i].JoinNetwork("network1") + assert.NoError(t, err) + } + + for i := 0; i < n; i++ { + for j := 0; j < n; j++ { + dbs[i].verifyNetworkExistence(t, fmt.Sprintf("node%d", j+1), "network1", true) + } + } + + err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value")) + assert.NoError(t, err) + + for i := 1; i < n; i++ { + dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true) + } + + err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value")) + assert.NoError(t, err) + + for i := 1; i < n; i++ { + dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true) + } + + err = dbs[0].DeleteEntry("test_table", "network1", "test_key") + assert.NoError(t, err) + + for i := 1; i < n; i++ { + dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false) + } + + log.Printf("Closing DB instances...") + closeNetworkDBInstances(dbs) +} diff --git a/libnetwork/networkdb/watch.go b/libnetwork/networkdb/watch.go new file mode 100644 index 0000000000..2df00fa54f --- /dev/null +++ b/libnetwork/networkdb/watch.go @@ -0,0 +1,98 @@ +package networkdb + +import "github.com/docker/go-events" + +type opType uint8 + +const ( + opCreate opType = 1 + iota + opUpdate + opDelete +) + +type event struct { + Table string + NetworkID string + Key string + Value []byte +} + +// CreateEvent generates a table entry create event to the watchers +type CreateEvent event + +// UpdateEvent generates a table entry update event to the watchers +type UpdateEvent event + +// DeleteEvent generates a table entry delete event to the watchers +type DeleteEvent event + +// Watch creates a watcher with filters for a particular table or +// network or key or any combination of the tuple. If any of the +// filter is an empty string it acts as a wildcard for that +// field. Watch returns a channel of events, where the events will be +// sent. 
+func (nDB *NetworkDB) Watch(tname, nid, key string) (chan events.Event, func()) { + var matcher events.Matcher + + if tname != "" || nid != "" || key != "" { + matcher = events.MatcherFunc(func(ev events.Event) bool { + var evt event + switch ev := ev.(type) { + case CreateEvent: + evt = event(ev) + case UpdateEvent: + evt = event(ev) + case DeleteEvent: + evt = event(ev) + } + + if tname != "" && evt.Table != tname { + return false + } + + if nid != "" && evt.NetworkID != nid { + return false + } + + if key != "" && evt.Key != key { + return false + } + + return true + }) + } + + ch := events.NewChannel(0) + sink := events.Sink(events.NewQueue(ch)) + + if matcher != nil { + sink = events.NewFilter(sink, matcher) + } + + nDB.broadcaster.Add(sink) + return ch.C, func() { + nDB.broadcaster.Remove(sink) + ch.Close() + sink.Close() + } +} + +func makeEvent(op opType, tname, nid, key string, value []byte) events.Event { + ev := event{ + Table: tname, + NetworkID: nid, + Key: key, + Value: value, + } + + switch op { + case opCreate: + return CreateEvent(ev) + case opUpdate: + return UpdateEvent(ev) + case opDelete: + return DeleteEvent(ev) + } + + return nil +}
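Below is a minimal usage sketch of how a caller might wire two NetworkDB instances together and watch table events, mirroring the flow exercised in networkdb_test.go above. The import path, the loopback addresses and ports, and the "endpoint_table"/"network1"/"ep1" identifiers are illustrative assumptions rather than part of this patch, and error handling is abbreviated.

package main

import (
	"fmt"
	"log"

	"github.com/docker/libnetwork/networkdb" // assumed import path for this package
)

func main() {
	// Start the first node; it binds to a fixed loopback address and port.
	db1, err := networkdb.New(&networkdb.Config{
		NodeName: "node1",
		BindAddr: "127.0.0.1",
		BindPort: 10001,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db1.Close()

	// Start a second node and join it to the first to form the gossip cluster.
	db2, err := networkdb.New(&networkdb.Config{
		NodeName: "node2",
		BindAddr: "127.0.0.1",
		BindPort: 10002,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db2.Close()
	if err := db2.Join([]string{"127.0.0.1:10001"}); err != nil {
		log.Fatal(err)
	}

	// Attach both nodes to the same network so they participate in the
	// network-scoped gossip and bulk sync for it.
	if err := db1.JoinNetwork("network1"); err != nil {
		log.Fatal(err)
	}
	if err := db2.JoinNetwork("network1"); err != nil {
		log.Fatal(err)
	}

	// Watch on node2, then create a table entry on node1 and wait for the
	// corresponding event to be gossiped over.
	ch, cancel := db2.Watch("", "", "")
	defer cancel()

	if err := db1.CreateEntry("endpoint_table", "network1", "ep1", []byte("some value")); err != nil {
		log.Fatal(err)
	}

	// In a real program this receive would be guarded by a timeout.
	ev := <-ch
	fmt.Printf("node2 received table event: %#v\n", ev)
}

The empty strings passed to Watch act as wildcards, so this watcher sees create, update, and delete events for every table, network, and key.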