Add test to confirm garbage collection

- Create a test to verify that a node that joins
  asynchronously does not extend the life of an
  already deleted object

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
Flavio Crisciani 2017-09-26 10:29:33 -07:00
parent 26ad4ebbd8
commit 7fbaf6de2c
4 changed files with 106 additions and 37 deletions

@@ -17,15 +17,10 @@ import (
)
const (
- // The garbage collection logic for entries leverage the presence of the network.
- // For this reason the expiration time of the network is put slightly higher than the entry expiration so that
- // there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
- reapEntryInterval = 30 * time.Minute
- reapNetworkInterval = reapEntryInterval + 5*reapPeriod
- reapPeriod = 5 * time.Second
- retryInterval = 1 * time.Second
- nodeReapInterval = 24 * time.Hour
- nodeReapPeriod = 2 * time.Hour
+ reapPeriod = 5 * time.Second
+ retryInterval = 1 * time.Second
+ nodeReapInterval = 24 * time.Hour
+ nodeReapPeriod = 2 * time.Hour
)
type logWriter struct{}

@@ -93,14 +93,14 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
nDB.nodes[n.Name] = n
nDB.Unlock()
if !found {
logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
}
return true
case NodeEventTypeLeave:
nDB.Lock()
nDB.leftNodes[n.Name] = n
nDB.Unlock()
logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
return true
}
@@ -140,7 +140,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
n.ltime = nEvent.LTime
n.leaving = nEvent.Type == NetworkEventTypeLeave
if n.leaving {
- n.reapTime = reapNetworkInterval
+ n.reapTime = nDB.config.reapNetworkInterval
// The remote node is leaving the network, but not the gossip cluster.
// Mark all its entries in deleted state, this will guarantee that
@@ -216,8 +216,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// This case can happen if the cluster is running different versions of the engine where the old version does not have the
// field. If that is not the case, this can be a BUG
if e.deleting && e.reapTime == 0 {
logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
e.reapTime = reapEntryInterval
logrus.Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?",
nDB.config.Hostname, nDB.config.NodeID, tEvent)
e.reapTime = nDB.config.reapEntryInterval
}
nDB.Lock()
@@ -229,7 +230,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
// This also avoids that the deletion of entries close to their garbage collection ends up circulating around forever
- return e.reapTime > reapEntryInterval/6
+ return e.reapTime > nDB.config.reapEntryInterval/6
}
var op opType

@@ -181,6 +181,13 @@ type Config struct {
// be able to increase this to get more content into each gossip packet.
PacketBufferSize int
+ // reapEntryInterval duration of a deleted entry before being garbage collected
+ reapEntryInterval time.Duration
+ // reapNetworkInterval duration of a deleted network before being garbage collected
+ // NOTE this MUST always be higher than reapEntryInterval
+ reapNetworkInterval time.Duration
// StatsPrintPeriod the period to use to print queue stats
// Default is 5min
StatsPrintPeriod time.Duration
@@ -220,12 +227,18 @@ func DefaultConfig() *Config {
PacketBufferSize: 1400,
StatsPrintPeriod: 5 * time.Minute,
HealthPrintPeriod: 1 * time.Minute,
+ reapEntryInterval: 30 * time.Minute,
}
}
// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
+ // The garbage collection logic for entries leverages the presence of the network.
+ // For this reason the expiration time of the network is set slightly higher than the entry expiration so that
+ // there are at least 5 extra cycles to make sure that all the entries are properly deleted before deleting the network.
+ c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod
nDB := &NetworkDB{
config: c,
indexes: make(map[int]*radix.Tree),
@@ -241,7 +254,7 @@ func New(c *Config) (*NetworkDB, error) {
nDB.indexes[byTable] = radix.New()
nDB.indexes[byNetwork] = radix.New()
logrus.Debugf("New memberlist node - Node:%v will use memberlist nodeID:%v", c.Hostname, c.NodeID)
logrus.Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
if err := nDB.clusterInit(); err != nil {
return nil, err
}
@@ -414,7 +427,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
node: nDB.config.NodeID,
value: value,
deleting: true,
- reapTime: reapEntryInterval,
+ reapTime: nDB.config.reapEntryInterval,
}
if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
@@ -487,7 +500,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
node: oldEntry.node,
value: oldEntry.value,
deleting: true,
- reapTime: reapEntryInterval,
+ reapTime: nDB.config.reapEntryInterval,
}
// we arrived at this point in 2 cases:
@@ -632,7 +645,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
logrus.Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
n.ltime = ltime
- n.reapTime = reapNetworkInterval
+ n.reapTime = nDB.config.reapNetworkInterval
n.leaving = true
return nil
}
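
For reference, a standalone sketch (not part of the commit) of the garbage-collection windows that result from the formula added to New() above, using the 5-second reapPeriod constant kept in the first hunk. All names below are local to this illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	reapPeriod := 5 * time.Second // reaper cadence, still a package constant

	// Default config: deleted entries linger 30 minutes; the owning network
	// record lingers 5 reap cycles longer, so entries are always reaped first.
	defaultEntryInterval := 30 * time.Minute
	fmt.Println(defaultEntryInterval + 5*reapPeriod) // 30m25s

	// TestNetworkDBGarbageCollection (further down) overrides the entry
	// interval to 30 seconds, which yields a 55-second network window.
	testEntryInterval := 30 * time.Second
	fmt.Println(testEntryInterval + 5*reapPeriod) // 55s
}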

@@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/go-events"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@@ -27,13 +28,14 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
- func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
+ func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
var dbs []*NetworkDB
for i := 0; i < num; i++ {
- conf := DefaultConfig()
- conf.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
- conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
- db, err := New(conf)
+ localConfig := *conf
+ localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
+ localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
+ localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
+ db, err := New(&localConfig)
require.NoError(t, err)
if i != 0 {
@@ -44,10 +46,19 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*Netwo
dbs = append(dbs, db)
}
+ // Check that the cluster is properly created
+ for i := 0; i < num; i++ {
+ if num != len(dbs[i].ClusterPeers()) {
+ t.Fatalf("Number of nodes for %s in the cluster does not match %d != %d",
+ dbs[i].config.Hostname, num, len(dbs[i].ClusterPeers()))
+ }
+ }
return dbs
}
func closeNetworkDBInstances(dbs []*NetworkDB) {
log.Print("Closing DB instances...")
for _, db := range dbs {
db.Close()
}
@@ -147,12 +158,12 @@ func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, k
}
func TestNetworkDBSimple(t *testing.T) {
- dbs := createNetworkDBInstances(t, 2, "node")
+ dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
closeNetworkDBInstances(dbs)
}
func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
- dbs := createNetworkDBInstances(t, 2, "node")
+ dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -167,7 +178,7 @@ func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
}
func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
- dbs := createNetworkDBInstances(t, 2, "node")
+ dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
n := 10
for i := 1; i <= n; i++ {
@@ -210,7 +221,7 @@ func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
}
func TestNetworkDBCRUDTableEntry(t *testing.T) {
- dbs := createNetworkDBInstances(t, 3, "node")
+ dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -240,7 +251,7 @@ func TestNetworkDBCRUDTableEntry(t *testing.T) {
}
func TestNetworkDBCRUDTableEntries(t *testing.T) {
- dbs := createNetworkDBInstances(t, 2, "node")
+ dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -308,7 +319,7 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) {
}
func TestNetworkDBNodeLeave(t *testing.T) {
- dbs := createNetworkDBInstances(t, 2, "node")
+ dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -327,7 +338,7 @@ func TestNetworkDBNodeLeave(t *testing.T) {
}
func TestNetworkDBWatch(t *testing.T) {
- dbs := createNetworkDBInstances(t, 2, "node")
+ dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -356,7 +367,7 @@ func TestNetworkDBWatch(t *testing.T) {
}
func TestNetworkDBBulkSync(t *testing.T) {
- dbs := createNetworkDBInstances(t, 2, "node")
+ dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
@@ -389,7 +400,7 @@ func TestNetworkDBBulkSync(t *testing.T) {
func TestNetworkDBCRUDMediumCluster(t *testing.T) {
n := 5
- dbs := createNetworkDBInstances(t, n, "node")
+ dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
@@ -433,13 +444,12 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) {
dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
}
log.Print("Closing DB instances...")
closeNetworkDBInstances(dbs)
}
func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
maxRetry := 5
dbs := createNetworkDBInstances(t, 2, "node")
dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
// Single node Join/Leave
err := dbs[0].JoinNetwork("network1")
@@ -517,6 +527,56 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
}
- dbs[0].Close()
- dbs[1].Close()
+ closeNetworkDBInstances(dbs)
}
+ func TestNetworkDBGarbageCollection(t *testing.T) {
+ keysWriteDelete := 5
+ config := DefaultConfig()
+ config.reapEntryInterval = 30 * time.Second
+ config.StatsPrintPeriod = 15 * time.Second
+ dbs := createNetworkDBInstances(t, 3, "node", config)
+ // 2 nodes join the network
+ err := dbs[0].JoinNetwork("network1")
+ assert.NoError(t, err)
+ err = dbs[1].JoinNetwork("network1")
+ assert.NoError(t, err)
+ for i := 0; i < keysWriteDelete; i++ {
+ err = dbs[i%2].CreateEntry("testTable", "network1", fmt.Sprintf("key-%d", i), []byte("value"))
+ assert.NoError(t, err)
+ }
+ time.Sleep(time.Second)
+ for i := 0; i < keysWriteDelete; i++ {
+ err = dbs[i%2].DeleteEntry("testTable", "network1", fmt.Sprintf("key-%d", i))
+ assert.NoError(t, err)
+ }
+ for i := 0; i < 2; i++ {
+ assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
+ }
+ // from this point the garbage collection timer is running; wait 5 seconds and then join a new node
+ time.Sleep(5 * time.Second)
+ err = dbs[2].JoinNetwork("network1")
+ assert.NoError(t, err)
+ for i := 0; i < 3; i++ {
+ assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
+ }
+ // after the full reapEntryInterval the entries should all have been deleted
+ time.Sleep(30 * time.Second)
+ for i := 0; i < 3; i++ {
+ assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should have been garbage collected")
+ }
+ // make sure that the entries are not coming back
+ time.Sleep(15 * time.Second)
+ for i := 0; i < 3; i++ {
+ assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should have been garbage collected")
+ }
+ closeNetworkDBInstances(dbs)
+ }
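
For orientation, a rough timeline of TestNetworkDBGarbageCollection; the figures are approximate and follow from the sleeps in the test and its 30-second reapEntryInterval override:

- t ≈ 0s: node1 and node2 create the 5 entries on network1
- t ≈ 1s: the same two nodes delete them; each deleted entry now carries a residual reapTime of 30s
- t ≈ 6s: node3 joins network1 and receives the deleted entries with their residual reapTime (see the handleTableEvent hunk above), so the reap window is not restarted
- t ≈ 36s: the 30s window has expired on every node and entriesNumber drops to 0
- t ≈ 51s: a final check confirms the entries did not come back, i.e. the asynchronous join did not extend the life of the already deleted objects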