networkdb_test.go 27 KB


  1. package networkdb
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "os"
  7. "strconv"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/containerd/log"
  12. "github.com/docker/docker/pkg/stringid"
  13. "github.com/docker/go-events"
  14. "github.com/hashicorp/memberlist"
  15. "gotest.tools/v3/assert"
  16. is "gotest.tools/v3/assert/cmp"
  17. "gotest.tools/v3/poll"
  18. )
  19. var dbPort int32 = 10000
  20. func TestMain(m *testing.M) {
  21. os.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0o644)
  22. log.SetLevel("error")
  23. os.Exit(m.Run())
  24. }
  25. func launchNode(t *testing.T, conf Config) *NetworkDB {
  26. t.Helper()
  27. db, err := New(&conf)
  28. assert.NilError(t, err)
  29. return db
  30. }
  31. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  32. t.Helper()
  33. var dbs []*NetworkDB
  34. for i := 0; i < num; i++ {
  35. localConfig := *conf
  36. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  37. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  38. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  39. db := launchNode(t, localConfig)
  40. if i != 0 {
  41. assert.Check(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
  42. }
  43. dbs = append(dbs, db)
  44. }
  45. // Wait till the cluster creation is successful
  46. check := func(t poll.LogT) poll.Result {
  47. // Check that the cluster is properly created
  48. for i := 0; i < num; i++ {
  49. if num != len(dbs[i].ClusterPeers()) {
  50. return poll.Continue("%s:Waiting for cluster peers to be established", dbs[i].config.Hostname)
  51. }
  52. }
  53. return poll.Success()
  54. }
  55. poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second))
  56. return dbs
  57. }
  58. func closeNetworkDBInstances(t *testing.T, dbs []*NetworkDB) {
  59. t.Helper()
  60. log.G(context.TODO()).Print("Closing DB instances...")
  61. for _, db := range dbs {
  62. db.Close()
  63. }
  64. }
  65. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  66. t.Helper()
  67. for i := 0; i < 80; i++ {
  68. db.RLock()
  69. _, ok := db.nodes[node]
  70. db.RUnlock()
  71. if present && ok {
  72. return
  73. }
  74. if !present && !ok {
  75. return
  76. }
  77. time.Sleep(50 * time.Millisecond)
  78. }
  79. t.Errorf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node)
  80. }
  81. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  82. t.Helper()
  83. const sleepInterval = 50 * time.Millisecond
  84. var maxRetries int64
  85. if dl, ok := t.Deadline(); ok {
  86. maxRetries = int64(time.Until(dl) / sleepInterval)
  87. } else {
  88. maxRetries = 80
  89. }
  90. for i := int64(0); i < maxRetries; i++ {
  91. db.RLock()
  92. nn, nnok := db.networks[node]
  93. if nnok {
  94. n, ok := nn[id]
  95. var leaving bool
  96. if ok {
  97. leaving = n.leaving
  98. }
  99. db.RUnlock()
  100. if present && ok {
  101. return
  102. }
  103. if !present &&
  104. ((ok && leaving) ||
  105. !ok) {
  106. return
  107. }
  108. } else {
  109. db.RUnlock()
  110. }
  111. time.Sleep(sleepInterval)
  112. }
  113. t.Error("Network existence verification failed")
  114. }
  115. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  116. t.Helper()
  117. n := 80
  118. for i := 0; i < n; i++ {
  119. v, err := db.GetEntry(tname, nid, key)
  120. if present && err == nil && string(v) == value {
  121. return
  122. }
  123. if err != nil && !present {
  124. return
  125. }
  126. time.Sleep(50 * time.Millisecond)
  127. }
  128. t.Errorf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID)
  129. }
  130. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  131. t.Helper()
  132. select {
  133. case rcvdEv := <-ch:
  134. assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
  135. switch typ := rcvdEv.(type) {
  136. case CreateEvent:
  137. assert.Check(t, is.Equal(tname, typ.Table))
  138. assert.Check(t, is.Equal(nid, typ.NetworkID))
  139. assert.Check(t, is.Equal(key, typ.Key))
  140. assert.Check(t, is.Equal(value, string(typ.Value)))
  141. case UpdateEvent:
  142. assert.Check(t, is.Equal(tname, typ.Table))
  143. assert.Check(t, is.Equal(nid, typ.NetworkID))
  144. assert.Check(t, is.Equal(key, typ.Key))
  145. assert.Check(t, is.Equal(value, string(typ.Value)))
  146. case DeleteEvent:
  147. assert.Check(t, is.Equal(tname, typ.Table))
  148. assert.Check(t, is.Equal(nid, typ.NetworkID))
  149. assert.Check(t, is.Equal(key, typ.Key))
  150. }
  151. case <-time.After(time.Second):
  152. t.Fail()
  153. return
  154. }
  155. }
  156. func TestNetworkDBSimple(t *testing.T) {
  157. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  158. closeNetworkDBInstances(t, dbs)
  159. }
  160. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  161. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  162. err := dbs[0].JoinNetwork("network1")
  163. assert.NilError(t, err)
  164. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  165. err = dbs[0].LeaveNetwork("network1")
  166. assert.NilError(t, err)
  167. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  168. closeNetworkDBInstances(t, dbs)
  169. }
  170. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  171. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  172. n := 10
  173. for i := 1; i <= n; i++ {
  174. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  175. assert.NilError(t, err)
  176. }
  177. for i := 1; i <= n; i++ {
  178. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  179. assert.NilError(t, err)
  180. }
  181. for i := 1; i <= n; i++ {
  182. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  183. }
  184. for i := 1; i <= n; i++ {
  185. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  186. }
  187. for i := 1; i <= n; i++ {
  188. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  189. assert.NilError(t, err)
  190. }
  191. for i := 1; i <= n; i++ {
  192. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  193. assert.NilError(t, err)
  194. }
  195. for i := 1; i <= n; i++ {
  196. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  197. }
  198. for i := 1; i <= n; i++ {
  199. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  200. }
  201. closeNetworkDBInstances(t, dbs)
  202. }
  203. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  204. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  205. err := dbs[0].JoinNetwork("network1")
  206. assert.NilError(t, err)
  207. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  208. err = dbs[1].JoinNetwork("network1")
  209. assert.NilError(t, err)
  210. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  211. assert.NilError(t, err)
  212. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  213. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  214. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  215. assert.NilError(t, err)
  216. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  217. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  218. assert.NilError(t, err)
  219. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  220. closeNetworkDBInstances(t, dbs)
  221. }
  222. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  223. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  224. err := dbs[0].JoinNetwork("network1")
  225. assert.NilError(t, err)
  226. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  227. err = dbs[1].JoinNetwork("network1")
  228. assert.NilError(t, err)
  229. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  230. n := 10
  231. for i := 1; i <= n; i++ {
  232. err = dbs[0].CreateEntry("test_table", "network1",
  233. fmt.Sprintf("test_key0%d", i),
  234. []byte(fmt.Sprintf("test_value0%d", i)))
  235. assert.NilError(t, err)
  236. }
  237. for i := 1; i <= n; i++ {
  238. err = dbs[1].CreateEntry("test_table", "network1",
  239. fmt.Sprintf("test_key1%d", i),
  240. []byte(fmt.Sprintf("test_value1%d", i)))
  241. assert.NilError(t, err)
  242. }
  243. for i := 1; i <= n; i++ {
  244. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  245. fmt.Sprintf("test_key1%d", i),
  246. fmt.Sprintf("test_value1%d", i), true)
  247. assert.NilError(t, err)
  248. }
  249. for i := 1; i <= n; i++ {
  250. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  251. fmt.Sprintf("test_key0%d", i),
  252. fmt.Sprintf("test_value0%d", i), true)
  253. assert.NilError(t, err)
  254. }
  255. // Verify deletes
  256. for i := 1; i <= n; i++ {
  257. err = dbs[0].DeleteEntry("test_table", "network1",
  258. fmt.Sprintf("test_key0%d", i))
  259. assert.NilError(t, err)
  260. }
  261. for i := 1; i <= n; i++ {
  262. err = dbs[1].DeleteEntry("test_table", "network1",
  263. fmt.Sprintf("test_key1%d", i))
  264. assert.NilError(t, err)
  265. }
  266. for i := 1; i <= n; i++ {
  267. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  268. fmt.Sprintf("test_key1%d", i), "", false)
  269. assert.NilError(t, err)
  270. }
  271. for i := 1; i <= n; i++ {
  272. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  273. fmt.Sprintf("test_key0%d", i), "", false)
  274. assert.NilError(t, err)
  275. }
  276. closeNetworkDBInstances(t, dbs)
  277. }
  278. func TestNetworkDBNodeLeave(t *testing.T) {
  279. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  280. err := dbs[0].JoinNetwork("network1")
  281. assert.NilError(t, err)
  282. err = dbs[1].JoinNetwork("network1")
  283. assert.NilError(t, err)
  284. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  285. assert.NilError(t, err)
  286. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  287. dbs[0].Close()
  288. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  289. dbs[1].Close()
  290. }
  291. func TestNetworkDBWatch(t *testing.T) {
  292. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  293. err := dbs[0].JoinNetwork("network1")
  294. assert.NilError(t, err)
  295. err = dbs[1].JoinNetwork("network1")
  296. assert.NilError(t, err)
  297. ch, cancel := dbs[1].Watch("", "")
  298. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  299. assert.NilError(t, err)
  300. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  301. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  302. assert.NilError(t, err)
  303. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  304. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  305. assert.NilError(t, err)
  306. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  307. cancel()
  308. closeNetworkDBInstances(t, dbs)
  309. }
  310. func TestNetworkDBBulkSync(t *testing.T) {
  311. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  312. err := dbs[0].JoinNetwork("network1")
  313. assert.NilError(t, err)
  314. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  315. n := 1000
  316. for i := 1; i <= n; i++ {
  317. err = dbs[0].CreateEntry("test_table", "network1",
  318. fmt.Sprintf("test_key0%d", i),
  319. []byte(fmt.Sprintf("test_value0%d", i)))
  320. assert.NilError(t, err)
  321. }
  322. err = dbs[1].JoinNetwork("network1")
  323. assert.NilError(t, err)
  324. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  325. for i := 1; i <= n; i++ {
  326. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  327. fmt.Sprintf("test_key0%d", i),
  328. fmt.Sprintf("test_value0%d", i), true)
  329. assert.NilError(t, err)
  330. }
  331. closeNetworkDBInstances(t, dbs)
  332. }
  333. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  334. n := 5
  335. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  336. for i := 0; i < n; i++ {
  337. for j := 0; j < n; j++ {
  338. if i == j {
  339. continue
  340. }
  341. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  342. }
  343. }
  344. for i := 0; i < n; i++ {
  345. err := dbs[i].JoinNetwork("network1")
  346. assert.NilError(t, err)
  347. }
  348. for i := 0; i < n; i++ {
  349. for j := 0; j < n; j++ {
  350. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  351. }
  352. }
  353. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  354. assert.NilError(t, err)
  355. for i := 1; i < n; i++ {
  356. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  357. }
  358. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  359. assert.NilError(t, err)
  360. for i := 1; i < n; i++ {
  361. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  362. }
  363. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  364. assert.NilError(t, err)
  365. for i := 1; i < n; i++ {
  366. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  367. }
  368. for i := 1; i < n; i++ {
  369. _, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  370. assert.Check(t, is.ErrorContains(err, ""))
  371. assert.Check(t, is.Contains(err.Error(), "deleted and pending garbage collection"), err)
  372. }
  373. closeNetworkDBInstances(t, dbs)
  374. }
  375. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  376. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  377. dbChangeWitness := func(db *NetworkDB) func(network string, expectNodeCount int) {
  378. staleNetworkTime := db.networkClock.Time()
  379. return func(network string, expectNodeCount int) {
  380. check := func(t poll.LogT) poll.Result {
  381. networkTime := db.networkClock.Time()
  382. if networkTime <= staleNetworkTime {
  383. return poll.Continue("network time is stale, no change registered yet.")
  384. }
  385. count := -1
  386. db.Lock()
  387. if nodes, ok := db.networkNodes[network]; ok {
  388. count = len(nodes)
  389. }
  390. db.Unlock()
  391. if count != expectNodeCount {
  392. return poll.Continue("current number of nodes is %d, expect %d.", count, expectNodeCount)
  393. }
  394. return poll.Success()
  395. }
  396. t.Helper()
  397. poll.WaitOn(t, check, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  398. }
  399. }
  400. // Single node Join/Leave
  401. witness0 := dbChangeWitness(dbs[0])
  402. err := dbs[0].JoinNetwork("network1")
  403. assert.NilError(t, err)
  404. witness0("network1", 1)
  405. witness0 = dbChangeWitness(dbs[0])
  406. err = dbs[0].LeaveNetwork("network1")
  407. assert.NilError(t, err)
  408. witness0("network1", 0)
  409. // Multiple nodes Join/Leave
  410. witness0, witness1 := dbChangeWitness(dbs[0]), dbChangeWitness(dbs[1])
  411. err = dbs[0].JoinNetwork("network1")
  412. assert.NilError(t, err)
  413. err = dbs[1].JoinNetwork("network1")
  414. assert.NilError(t, err)
  415. // Wait for the propagation on db[0]
  416. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  417. witness0("network1", 2)
  418. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  419. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  420. }
  421. // Wait for the propagation on db[1]
  422. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  423. witness1("network1", 2)
  424. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  425. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  426. }
  427. // Try a quick leave/join
  428. witness0, witness1 = dbChangeWitness(dbs[0]), dbChangeWitness(dbs[1])
  429. err = dbs[0].LeaveNetwork("network1")
  430. assert.NilError(t, err)
  431. err = dbs[0].JoinNetwork("network1")
  432. assert.NilError(t, err)
  433. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  434. witness0("network1", 2)
  435. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  436. witness1("network1", 2)
  437. closeNetworkDBInstances(t, dbs)
  438. }
  439. func TestNetworkDBGarbageCollection(t *testing.T) {
  440. keysWriteDelete := 5
  441. config := DefaultConfig()
  442. config.reapEntryInterval = 30 * time.Second
  443. config.StatsPrintPeriod = 15 * time.Second
  444. dbs := createNetworkDBInstances(t, 3, "node", config)
  445. // 2 Nodes join network
  446. err := dbs[0].JoinNetwork("network1")
  447. assert.NilError(t, err)
  448. err = dbs[1].JoinNetwork("network1")
  449. assert.NilError(t, err)
  450. for i := 0; i < keysWriteDelete; i++ {
  451. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+strconv.Itoa(i), []byte("value"))
  452. assert.NilError(t, err)
  453. }
  454. time.Sleep(time.Second)
  455. for i := 0; i < keysWriteDelete; i++ {
  456. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+strconv.Itoa(i))
  457. assert.NilError(t, err)
  458. }
  459. for i := 0; i < 2; i++ {
  460. dbs[i].Lock()
  461. assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
  462. dbs[i].Unlock()
  463. }
  464. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  465. time.Sleep(5 * time.Second)
  466. err = dbs[2].JoinNetwork("network1")
  467. assert.NilError(t, err)
  468. for i := 0; i < 3; i++ {
  469. dbs[i].Lock()
  470. assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
  471. dbs[i].Unlock()
  472. }
  473. // at this point the entries should had been all deleted
  474. time.Sleep(30 * time.Second)
  475. for i := 0; i < 3; i++ {
  476. dbs[i].Lock()
  477. assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
  478. dbs[i].Unlock()
  479. }
  480. // make sure that entries are not coming back
  481. time.Sleep(15 * time.Second)
  482. for i := 0; i < 3; i++ {
  483. dbs[i].Lock()
  484. assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
  485. dbs[i].Unlock()
  486. }
  487. closeNetworkDBInstances(t, dbs)
  488. }
  489. func TestFindNode(t *testing.T) {
  490. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  491. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  492. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  493. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  494. // active nodes is 2 because the testing node is in the list
  495. assert.Check(t, is.Len(dbs[0].nodes, 2))
  496. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  497. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  498. n, currState, m := dbs[0].findNode("active")
  499. assert.Check(t, n != nil)
  500. assert.Check(t, is.Equal("active", n.Name))
  501. assert.Check(t, is.Equal(nodeActiveState, currState))
  502. assert.Check(t, m != nil)
  503. // delete the entry manually
  504. delete(m, "active")
  505. // test if can be still find
  506. n, currState, m = dbs[0].findNode("active")
  507. assert.Check(t, is.Nil(n))
  508. assert.Check(t, is.Equal(nodeNotFound, currState))
  509. assert.Check(t, is.Nil(m))
  510. n, currState, m = dbs[0].findNode("failed")
  511. assert.Check(t, n != nil)
  512. assert.Check(t, is.Equal("failed", n.Name))
  513. assert.Check(t, is.Equal(nodeFailedState, currState))
  514. assert.Check(t, m != nil)
  515. // find and remove
  516. n, currState, m = dbs[0].findNode("left")
  517. assert.Check(t, n != nil)
  518. assert.Check(t, is.Equal("left", n.Name))
  519. assert.Check(t, is.Equal(nodeLeftState, currState))
  520. assert.Check(t, m != nil)
  521. delete(m, "left")
  522. n, currState, m = dbs[0].findNode("left")
  523. assert.Check(t, is.Nil(n))
  524. assert.Check(t, is.Equal(nodeNotFound, currState))
  525. assert.Check(t, is.Nil(m))
  526. closeNetworkDBInstances(t, dbs)
  527. }
  528. func TestChangeNodeState(t *testing.T) {
  529. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  530. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  531. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  532. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  533. // active nodes is 4 because the testing node is in the list
  534. assert.Check(t, is.Len(dbs[0].nodes, 4))
  535. n, currState, m := dbs[0].findNode("node1")
  536. assert.Check(t, n != nil)
  537. assert.Check(t, is.Equal(nodeActiveState, currState))
  538. assert.Check(t, is.Equal("node1", n.Name))
  539. assert.Check(t, m != nil)
  540. // node1 to failed
  541. dbs[0].changeNodeState("node1", nodeFailedState)
  542. n, currState, m = dbs[0].findNode("node1")
  543. assert.Check(t, n != nil)
  544. assert.Check(t, is.Equal(nodeFailedState, currState))
  545. assert.Check(t, is.Equal("node1", n.Name))
  546. assert.Check(t, m != nil)
  547. assert.Check(t, time.Duration(0) != n.reapTime)
  548. // node1 back to active
  549. dbs[0].changeNodeState("node1", nodeActiveState)
  550. n, currState, m = dbs[0].findNode("node1")
  551. assert.Check(t, n != nil)
  552. assert.Check(t, is.Equal(nodeActiveState, currState))
  553. assert.Check(t, is.Equal("node1", n.Name))
  554. assert.Check(t, m != nil)
  555. assert.Check(t, is.Equal(time.Duration(0), n.reapTime))
  556. // node1 to left
  557. dbs[0].changeNodeState("node1", nodeLeftState)
  558. dbs[0].changeNodeState("node2", nodeLeftState)
  559. dbs[0].changeNodeState("node3", nodeLeftState)
  560. n, currState, m = dbs[0].findNode("node1")
  561. assert.Check(t, n != nil)
  562. assert.Check(t, is.Equal(nodeLeftState, currState))
  563. assert.Check(t, is.Equal("node1", n.Name))
  564. assert.Check(t, m != nil)
  565. assert.Check(t, time.Duration(0) != n.reapTime)
  566. n, currState, m = dbs[0].findNode("node2")
  567. assert.Check(t, n != nil)
  568. assert.Check(t, is.Equal(nodeLeftState, currState))
  569. assert.Check(t, is.Equal("node2", n.Name))
  570. assert.Check(t, m != nil)
  571. assert.Check(t, time.Duration(0) != n.reapTime)
  572. n, currState, m = dbs[0].findNode("node3")
  573. assert.Check(t, n != nil)
  574. assert.Check(t, is.Equal(nodeLeftState, currState))
  575. assert.Check(t, is.Equal("node3", n.Name))
  576. assert.Check(t, m != nil)
  577. assert.Check(t, time.Duration(0) != n.reapTime)
  578. // active nodes is 1 because the testing node is in the list
  579. assert.Check(t, is.Len(dbs[0].nodes, 1))
  580. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  581. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  582. closeNetworkDBInstances(t, dbs)
  583. }
  584. func TestNodeReincarnation(t *testing.T) {
  585. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  586. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  587. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  588. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  589. // active nodes is 2 because the testing node is in the list
  590. assert.Check(t, is.Len(dbs[0].nodes, 2))
  591. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  592. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  593. dbs[0].Lock()
  594. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  595. assert.Check(t, b)
  596. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  597. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  598. assert.Check(t, b)
  599. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  600. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  601. assert.Check(t, b)
  602. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  603. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  604. assert.Check(t, !b)
  605. // active nodes is 1 because the testing node is in the list
  606. assert.Check(t, is.Len(dbs[0].nodes, 4))
  607. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  608. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  609. dbs[0].Unlock()
  610. closeNetworkDBInstances(t, dbs)
  611. }
  612. func TestParallelCreate(t *testing.T) {
  613. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  614. startCh := make(chan int)
  615. doneCh := make(chan error)
  616. var success int32
  617. for i := 0; i < 20; i++ {
  618. go func() {
  619. <-startCh
  620. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  621. if err == nil {
  622. atomic.AddInt32(&success, 1)
  623. }
  624. doneCh <- err
  625. }()
  626. }
  627. close(startCh)
  628. for i := 0; i < 20; i++ {
  629. <-doneCh
  630. }
  631. close(doneCh)
  632. // Only 1 write should have succeeded
  633. assert.Check(t, is.Equal(int32(1), success))
  634. closeNetworkDBInstances(t, dbs)
  635. }
  636. func TestParallelDelete(t *testing.T) {
  637. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  638. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  639. assert.NilError(t, err)
  640. startCh := make(chan int)
  641. doneCh := make(chan error)
  642. var success int32
  643. for i := 0; i < 20; i++ {
  644. go func() {
  645. <-startCh
  646. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  647. if err == nil {
  648. atomic.AddInt32(&success, 1)
  649. }
  650. doneCh <- err
  651. }()
  652. }
  653. close(startCh)
  654. for i := 0; i < 20; i++ {
  655. <-doneCh
  656. }
  657. close(doneCh)
  658. // Only 1 write should have succeeded
  659. assert.Check(t, is.Equal(int32(1), success))
  660. closeNetworkDBInstances(t, dbs)
  661. }
  662. func TestNetworkDBIslands(t *testing.T) {
  663. pollTimeout := func() time.Duration {
  664. const defaultTimeout = 120 * time.Second
  665. dl, ok := t.Deadline()
  666. if !ok {
  667. return defaultTimeout
  668. }
  669. if d := time.Until(dl); d <= defaultTimeout {
  670. return d
  671. }
  672. return defaultTimeout
  673. }
  674. _ = log.SetLevel("debug")
  675. conf := DefaultConfig()
  676. // Shorten durations to speed up test execution.
  677. conf.rejoinClusterDuration = conf.rejoinClusterDuration / 10
  678. conf.rejoinClusterInterval = conf.rejoinClusterInterval / 10
  679. dbs := createNetworkDBInstances(t, 5, "node", conf)
  680. // Get the node IP used currently
  681. node := dbs[0].nodes[dbs[0].config.NodeID]
  682. baseIPStr := node.Addr.String()
  683. // Node 0,1,2 are going to be the 3 bootstrap nodes
  684. members := []string{
  685. fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
  686. fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
  687. fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort),
  688. }
  689. // Rejoining will update the list of the bootstrap members
  690. for i := 3; i < 5; i++ {
  691. t.Logf("Re-joining: %d", i)
  692. assert.Check(t, dbs[i].Join(members))
  693. }
  694. // Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
  695. for i := 0; i < 3; i++ {
  696. log.G(context.TODO()).Infof("node %d leaving", i)
  697. dbs[i].Close()
  698. }
  699. checkDBs := make(map[string]*NetworkDB)
  700. for i := 3; i < 5; i++ {
  701. db := dbs[i]
  702. checkDBs[db.config.Hostname] = db
  703. }
  704. // Give some time to let the system propagate the messages and free up the ports
  705. check := func(t poll.LogT) poll.Result {
  706. // Verify that the nodes are actually all gone and marked appropiately
  707. for name, db := range checkDBs {
  708. db.RLock()
  709. if (len(db.leftNodes) != 3) || (len(db.failedNodes) != 0) {
  710. for name := range db.leftNodes {
  711. t.Logf("%s: Node %s left", db.config.Hostname, name)
  712. }
  713. for name := range db.failedNodes {
  714. t.Logf("%s: Node %s failed", db.config.Hostname, name)
  715. }
  716. db.RUnlock()
  717. return poll.Continue("%s:Waiting for all nodes to cleanly leave, left: %d, failed nodes: %d", name, len(db.leftNodes), len(db.failedNodes))
  718. }
  719. db.RUnlock()
  720. t.Logf("%s: OK", name)
  721. delete(checkDBs, name)
  722. }
  723. return poll.Success()
  724. }
  725. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  726. // Spawn again the first 3 nodes with different names but same IP:port
  727. for i := 0; i < 3; i++ {
  728. log.G(context.TODO()).Infof("node %d coming back", i)
  729. conf := *dbs[i].config
  730. conf.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  731. dbs[i] = launchNode(t, conf)
  732. }
  733. // Give some time for the reconnect routine to run, it runs every 6s.
  734. check = func(t poll.LogT) poll.Result {
  735. // Verify that the cluster is again all connected. Note that the 3 previous node did not do any join
  736. for i := 0; i < 5; i++ {
  737. db := dbs[i]
  738. db.RLock()
  739. if len(db.nodes) != 5 {
  740. db.RUnlock()
  741. return poll.Continue("%s:Waiting to connect to all nodes", dbs[i].config.Hostname)
  742. }
  743. if len(db.failedNodes) != 0 {
  744. db.RUnlock()
  745. return poll.Continue("%s:Waiting for 0 failedNodes", dbs[i].config.Hostname)
  746. }
  747. if i < 3 {
  748. // nodes from 0 to 3 has no left nodes
  749. if len(db.leftNodes) != 0 {
  750. db.RUnlock()
  751. return poll.Continue("%s:Waiting to have no leftNodes", dbs[i].config.Hostname)
  752. }
  753. } else {
  754. // nodes from 4 to 5 has the 3 previous left nodes
  755. if len(db.leftNodes) != 3 {
  756. db.RUnlock()
  757. return poll.Continue("%s:Waiting to have 3 leftNodes", dbs[i].config.Hostname)
  758. }
  759. }
  760. db.RUnlock()
  761. }
  762. return poll.Success()
  763. }
  764. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  765. closeNetworkDBInstances(t, dbs)
  766. }