networkdb_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928
  1. package networkdb
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "os"
  7. "strconv"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/docker/docker/pkg/stringid"
  12. "github.com/docker/go-events"
  13. "github.com/hashicorp/memberlist"
  14. "github.com/sirupsen/logrus"
  15. "gotest.tools/v3/assert"
  16. is "gotest.tools/v3/assert/cmp"
  17. "gotest.tools/v3/poll"
  18. )
  19. var dbPort int32 = 10000
  20. func TestMain(m *testing.M) {
  21. os.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644)
  22. logrus.SetLevel(logrus.ErrorLevel)
  23. os.Exit(m.Run())
  24. }
  25. func launchNode(t *testing.T, conf Config) *NetworkDB {
  26. t.Helper()
  27. db, err := New(&conf)
  28. assert.NilError(t, err)
  29. return db
  30. }
  31. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  32. t.Helper()
  33. var dbs []*NetworkDB
  34. for i := 0; i < num; i++ {
  35. localConfig := *conf
  36. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  37. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  38. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  39. db := launchNode(t, localConfig)
  40. if i != 0 {
  41. assert.Check(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
  42. }
  43. dbs = append(dbs, db)
  44. }
  45. // Wait till the cluster creation is successful
  46. check := func(t poll.LogT) poll.Result {
  47. // Check that the cluster is properly created
  48. for i := 0; i < num; i++ {
  49. if num != len(dbs[i].ClusterPeers()) {
  50. return poll.Continue("%s:Waiting for cluster peers to be established", dbs[i].config.Hostname)
  51. }
  52. }
  53. return poll.Success()
  54. }
  55. poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second))
  56. return dbs
  57. }
  58. func closeNetworkDBInstances(t *testing.T, dbs []*NetworkDB) {
  59. t.Helper()
  60. log.Print("Closing DB instances...")
  61. for _, db := range dbs {
  62. db.Close()
  63. }
  64. }
  65. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  66. t.Helper()
  67. for i := 0; i < 80; i++ {
  68. db.RLock()
  69. _, ok := db.nodes[node]
  70. db.RUnlock()
  71. if present && ok {
  72. return
  73. }
  74. if !present && !ok {
  75. return
  76. }
  77. time.Sleep(50 * time.Millisecond)
  78. }
  79. t.Errorf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node)
  80. }
  81. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  82. t.Helper()
  83. const sleepInterval = 50 * time.Millisecond
  84. var maxRetries int64
  85. if dl, ok := t.Deadline(); ok {
  86. maxRetries = int64(time.Until(dl) / sleepInterval)
  87. } else {
  88. maxRetries = 80
  89. }
  90. for i := int64(0); i < maxRetries; i++ {
  91. db.RLock()
  92. nn, nnok := db.networks[node]
  93. db.RUnlock()
  94. if nnok {
  95. n, ok := nn[id]
  96. if present && ok {
  97. return
  98. }
  99. if !present &&
  100. ((ok && n.leaving) ||
  101. !ok) {
  102. return
  103. }
  104. }
  105. time.Sleep(sleepInterval)
  106. }
  107. t.Error("Network existence verification failed")
  108. }
  109. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  110. t.Helper()
  111. n := 80
  112. for i := 0; i < n; i++ {
  113. entry, err := db.getEntry(tname, nid, key)
  114. if present && err == nil && string(entry.value) == value {
  115. return
  116. }
  117. if !present &&
  118. ((err == nil && entry.deleting) ||
  119. (err != nil)) {
  120. return
  121. }
  122. if i == n-1 && !present && err != nil {
  123. return
  124. }
  125. time.Sleep(50 * time.Millisecond)
  126. }
  127. t.Errorf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID)
  128. }
  129. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  130. t.Helper()
  131. select {
  132. case rcvdEv := <-ch:
  133. assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
  134. switch typ := rcvdEv.(type) {
  135. case CreateEvent:
  136. assert.Check(t, is.Equal(tname, typ.Table))
  137. assert.Check(t, is.Equal(nid, typ.NetworkID))
  138. assert.Check(t, is.Equal(key, typ.Key))
  139. assert.Check(t, is.Equal(value, string(typ.Value)))
  140. case UpdateEvent:
  141. assert.Check(t, is.Equal(tname, typ.Table))
  142. assert.Check(t, is.Equal(nid, typ.NetworkID))
  143. assert.Check(t, is.Equal(key, typ.Key))
  144. assert.Check(t, is.Equal(value, string(typ.Value)))
  145. case DeleteEvent:
  146. assert.Check(t, is.Equal(tname, typ.Table))
  147. assert.Check(t, is.Equal(nid, typ.NetworkID))
  148. assert.Check(t, is.Equal(key, typ.Key))
  149. }
  150. case <-time.After(time.Second):
  151. t.Fail()
  152. return
  153. }
  154. }
  155. func TestNetworkDBSimple(t *testing.T) {
  156. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  157. closeNetworkDBInstances(t, dbs)
  158. }
  159. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  160. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  161. err := dbs[0].JoinNetwork("network1")
  162. assert.NilError(t, err)
  163. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  164. err = dbs[0].LeaveNetwork("network1")
  165. assert.NilError(t, err)
  166. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  167. closeNetworkDBInstances(t, dbs)
  168. }
  169. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  170. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  171. n := 10
  172. for i := 1; i <= n; i++ {
  173. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  174. assert.NilError(t, err)
  175. }
  176. for i := 1; i <= n; i++ {
  177. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  178. assert.NilError(t, err)
  179. }
  180. for i := 1; i <= n; i++ {
  181. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  182. }
  183. for i := 1; i <= n; i++ {
  184. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  185. }
  186. for i := 1; i <= n; i++ {
  187. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  188. assert.NilError(t, err)
  189. }
  190. for i := 1; i <= n; i++ {
  191. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  192. assert.NilError(t, err)
  193. }
  194. for i := 1; i <= n; i++ {
  195. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  196. }
  197. for i := 1; i <= n; i++ {
  198. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  199. }
  200. closeNetworkDBInstances(t, dbs)
  201. }
  202. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  203. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  204. err := dbs[0].JoinNetwork("network1")
  205. assert.NilError(t, err)
  206. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  207. err = dbs[1].JoinNetwork("network1")
  208. assert.NilError(t, err)
  209. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  210. assert.NilError(t, err)
  211. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  212. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  213. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  214. assert.NilError(t, err)
  215. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  216. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  217. assert.NilError(t, err)
  218. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  219. closeNetworkDBInstances(t, dbs)
  220. }
  221. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  222. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  223. err := dbs[0].JoinNetwork("network1")
  224. assert.NilError(t, err)
  225. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  226. err = dbs[1].JoinNetwork("network1")
  227. assert.NilError(t, err)
  228. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  229. n := 10
  230. for i := 1; i <= n; i++ {
  231. err = dbs[0].CreateEntry("test_table", "network1",
  232. fmt.Sprintf("test_key0%d", i),
  233. []byte(fmt.Sprintf("test_value0%d", i)))
  234. assert.NilError(t, err)
  235. }
  236. for i := 1; i <= n; i++ {
  237. err = dbs[1].CreateEntry("test_table", "network1",
  238. fmt.Sprintf("test_key1%d", i),
  239. []byte(fmt.Sprintf("test_value1%d", i)))
  240. assert.NilError(t, err)
  241. }
  242. for i := 1; i <= n; i++ {
  243. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  244. fmt.Sprintf("test_key1%d", i),
  245. fmt.Sprintf("test_value1%d", i), true)
  246. assert.NilError(t, err)
  247. }
  248. for i := 1; i <= n; i++ {
  249. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  250. fmt.Sprintf("test_key0%d", i),
  251. fmt.Sprintf("test_value0%d", i), true)
  252. assert.NilError(t, err)
  253. }
  254. // Verify deletes
  255. for i := 1; i <= n; i++ {
  256. err = dbs[0].DeleteEntry("test_table", "network1",
  257. fmt.Sprintf("test_key0%d", i))
  258. assert.NilError(t, err)
  259. }
  260. for i := 1; i <= n; i++ {
  261. err = dbs[1].DeleteEntry("test_table", "network1",
  262. fmt.Sprintf("test_key1%d", i))
  263. assert.NilError(t, err)
  264. }
  265. for i := 1; i <= n; i++ {
  266. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  267. fmt.Sprintf("test_key1%d", i), "", false)
  268. assert.NilError(t, err)
  269. }
  270. for i := 1; i <= n; i++ {
  271. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  272. fmt.Sprintf("test_key0%d", i), "", false)
  273. assert.NilError(t, err)
  274. }
  275. closeNetworkDBInstances(t, dbs)
  276. }
  277. func TestNetworkDBNodeLeave(t *testing.T) {
  278. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  279. err := dbs[0].JoinNetwork("network1")
  280. assert.NilError(t, err)
  281. err = dbs[1].JoinNetwork("network1")
  282. assert.NilError(t, err)
  283. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  284. assert.NilError(t, err)
  285. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  286. dbs[0].Close()
  287. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  288. dbs[1].Close()
  289. }
  290. func TestNetworkDBWatch(t *testing.T) {
  291. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  292. err := dbs[0].JoinNetwork("network1")
  293. assert.NilError(t, err)
  294. err = dbs[1].JoinNetwork("network1")
  295. assert.NilError(t, err)
  296. ch, cancel := dbs[1].Watch("", "", "")
  297. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  298. assert.NilError(t, err)
  299. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  300. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  301. assert.NilError(t, err)
  302. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  303. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  304. assert.NilError(t, err)
  305. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  306. cancel()
  307. closeNetworkDBInstances(t, dbs)
  308. }
  309. func TestNetworkDBBulkSync(t *testing.T) {
  310. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  311. err := dbs[0].JoinNetwork("network1")
  312. assert.NilError(t, err)
  313. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  314. n := 1000
  315. for i := 1; i <= n; i++ {
  316. err = dbs[0].CreateEntry("test_table", "network1",
  317. fmt.Sprintf("test_key0%d", i),
  318. []byte(fmt.Sprintf("test_value0%d", i)))
  319. assert.NilError(t, err)
  320. }
  321. err = dbs[1].JoinNetwork("network1")
  322. assert.NilError(t, err)
  323. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  324. for i := 1; i <= n; i++ {
  325. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  326. fmt.Sprintf("test_key0%d", i),
  327. fmt.Sprintf("test_value0%d", i), true)
  328. assert.NilError(t, err)
  329. }
  330. closeNetworkDBInstances(t, dbs)
  331. }
  332. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  333. n := 5
  334. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  335. for i := 0; i < n; i++ {
  336. for j := 0; j < n; j++ {
  337. if i == j {
  338. continue
  339. }
  340. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  341. }
  342. }
  343. for i := 0; i < n; i++ {
  344. err := dbs[i].JoinNetwork("network1")
  345. assert.NilError(t, err)
  346. }
  347. for i := 0; i < n; i++ {
  348. for j := 0; j < n; j++ {
  349. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  350. }
  351. }
  352. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  353. assert.NilError(t, err)
  354. for i := 1; i < n; i++ {
  355. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  356. }
  357. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  358. assert.NilError(t, err)
  359. for i := 1; i < n; i++ {
  360. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  361. }
  362. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  363. assert.NilError(t, err)
  364. for i := 1; i < n; i++ {
  365. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  366. }
  367. for i := 1; i < n; i++ {
  368. _, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  369. assert.Check(t, is.ErrorContains(err, ""))
  370. assert.Check(t, is.Contains(err.Error(), "deleted and pending garbage collection"), err)
  371. }
  372. closeNetworkDBInstances(t, dbs)
  373. }
  374. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  375. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  376. dbChangeWitness := func(db *NetworkDB) func(network string, expectNodeCount int) {
  377. staleNetworkTime := db.networkClock.Time()
  378. return func(network string, expectNodeCount int) {
  379. check := func(t poll.LogT) poll.Result {
  380. networkTime := db.networkClock.Time()
  381. if networkTime <= staleNetworkTime {
  382. return poll.Continue("network time is stale, no change registered yet.")
  383. }
  384. count := -1
  385. db.Lock()
  386. if nodes, ok := db.networkNodes[network]; ok {
  387. count = len(nodes)
  388. }
  389. db.Unlock()
  390. if count != expectNodeCount {
  391. return poll.Continue("current number of nodes is %d, expect %d.", count, expectNodeCount)
  392. }
  393. return poll.Success()
  394. }
  395. t.Helper()
  396. poll.WaitOn(t, check, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  397. }
  398. }
  399. // Single node Join/Leave
  400. witness0 := dbChangeWitness(dbs[0])
  401. err := dbs[0].JoinNetwork("network1")
  402. assert.NilError(t, err)
  403. witness0("network1", 1)
  404. witness0 = dbChangeWitness(dbs[0])
  405. err = dbs[0].LeaveNetwork("network1")
  406. assert.NilError(t, err)
  407. witness0("network1", 0)
  408. // Multiple nodes Join/Leave
  409. witness0, witness1 := dbChangeWitness(dbs[0]), dbChangeWitness(dbs[1])
  410. err = dbs[0].JoinNetwork("network1")
  411. assert.NilError(t, err)
  412. err = dbs[1].JoinNetwork("network1")
  413. assert.NilError(t, err)
  414. // Wait for the propagation on db[0]
  415. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  416. witness0("network1", 2)
  417. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  418. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  419. }
  420. // Wait for the propagation on db[1]
  421. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  422. witness1("network1", 2)
  423. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  424. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  425. }
  426. // Try a quick leave/join
  427. witness0, witness1 = dbChangeWitness(dbs[0]), dbChangeWitness(dbs[1])
  428. err = dbs[0].LeaveNetwork("network1")
  429. assert.NilError(t, err)
  430. err = dbs[0].JoinNetwork("network1")
  431. assert.NilError(t, err)
  432. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  433. witness0("network1", 2)
  434. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  435. witness1("network1", 2)
  436. closeNetworkDBInstances(t, dbs)
  437. }
  438. func TestNetworkDBGarbageCollection(t *testing.T) {
  439. keysWriteDelete := 5
  440. config := DefaultConfig()
  441. config.reapEntryInterval = 30 * time.Second
  442. config.StatsPrintPeriod = 15 * time.Second
  443. dbs := createNetworkDBInstances(t, 3, "node", config)
  444. // 2 Nodes join network
  445. err := dbs[0].JoinNetwork("network1")
  446. assert.NilError(t, err)
  447. err = dbs[1].JoinNetwork("network1")
  448. assert.NilError(t, err)
  449. for i := 0; i < keysWriteDelete; i++ {
  450. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+strconv.Itoa(i), []byte("value"))
  451. assert.NilError(t, err)
  452. }
  453. time.Sleep(time.Second)
  454. for i := 0; i < keysWriteDelete; i++ {
  455. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+strconv.Itoa(i))
  456. assert.NilError(t, err)
  457. }
  458. for i := 0; i < 2; i++ {
  459. assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
  460. }
  461. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  462. time.Sleep(5 * time.Second)
  463. err = dbs[2].JoinNetwork("network1")
  464. assert.NilError(t, err)
  465. for i := 0; i < 3; i++ {
  466. assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
  467. }
  468. // at this point the entries should had been all deleted
  469. time.Sleep(30 * time.Second)
  470. for i := 0; i < 3; i++ {
  471. assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
  472. }
  473. // make sure that entries are not coming back
  474. time.Sleep(15 * time.Second)
  475. for i := 0; i < 3; i++ {
  476. assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
  477. }
  478. closeNetworkDBInstances(t, dbs)
  479. }
  480. func TestFindNode(t *testing.T) {
  481. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  482. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  483. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  484. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  485. // active nodes is 2 because the testing node is in the list
  486. assert.Check(t, is.Len(dbs[0].nodes, 2))
  487. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  488. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  489. n, currState, m := dbs[0].findNode("active")
  490. assert.Check(t, n != nil)
  491. assert.Check(t, is.Equal("active", n.Name))
  492. assert.Check(t, is.Equal(nodeActiveState, currState))
  493. assert.Check(t, m != nil)
  494. // delete the entry manually
  495. delete(m, "active")
  496. // test if can be still find
  497. n, currState, m = dbs[0].findNode("active")
  498. assert.Check(t, is.Nil(n))
  499. assert.Check(t, is.Equal(nodeNotFound, currState))
  500. assert.Check(t, is.Nil(m))
  501. n, currState, m = dbs[0].findNode("failed")
  502. assert.Check(t, n != nil)
  503. assert.Check(t, is.Equal("failed", n.Name))
  504. assert.Check(t, is.Equal(nodeFailedState, currState))
  505. assert.Check(t, m != nil)
  506. // find and remove
  507. n, currState, m = dbs[0].findNode("left")
  508. assert.Check(t, n != nil)
  509. assert.Check(t, is.Equal("left", n.Name))
  510. assert.Check(t, is.Equal(nodeLeftState, currState))
  511. assert.Check(t, m != nil)
  512. delete(m, "left")
  513. n, currState, m = dbs[0].findNode("left")
  514. assert.Check(t, is.Nil(n))
  515. assert.Check(t, is.Equal(nodeNotFound, currState))
  516. assert.Check(t, is.Nil(m))
  517. closeNetworkDBInstances(t, dbs)
  518. }
  519. func TestChangeNodeState(t *testing.T) {
  520. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  521. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  522. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  523. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  524. // active nodes is 4 because the testing node is in the list
  525. assert.Check(t, is.Len(dbs[0].nodes, 4))
  526. n, currState, m := dbs[0].findNode("node1")
  527. assert.Check(t, n != nil)
  528. assert.Check(t, is.Equal(nodeActiveState, currState))
  529. assert.Check(t, is.Equal("node1", n.Name))
  530. assert.Check(t, m != nil)
  531. // node1 to failed
  532. dbs[0].changeNodeState("node1", nodeFailedState)
  533. n, currState, m = dbs[0].findNode("node1")
  534. assert.Check(t, n != nil)
  535. assert.Check(t, is.Equal(nodeFailedState, currState))
  536. assert.Check(t, is.Equal("node1", n.Name))
  537. assert.Check(t, m != nil)
  538. assert.Check(t, time.Duration(0) != n.reapTime)
  539. // node1 back to active
  540. dbs[0].changeNodeState("node1", nodeActiveState)
  541. n, currState, m = dbs[0].findNode("node1")
  542. assert.Check(t, n != nil)
  543. assert.Check(t, is.Equal(nodeActiveState, currState))
  544. assert.Check(t, is.Equal("node1", n.Name))
  545. assert.Check(t, m != nil)
  546. assert.Check(t, is.Equal(time.Duration(0), n.reapTime))
  547. // node1 to left
  548. dbs[0].changeNodeState("node1", nodeLeftState)
  549. dbs[0].changeNodeState("node2", nodeLeftState)
  550. dbs[0].changeNodeState("node3", nodeLeftState)
  551. n, currState, m = dbs[0].findNode("node1")
  552. assert.Check(t, n != nil)
  553. assert.Check(t, is.Equal(nodeLeftState, currState))
  554. assert.Check(t, is.Equal("node1", n.Name))
  555. assert.Check(t, m != nil)
  556. assert.Check(t, time.Duration(0) != n.reapTime)
  557. n, currState, m = dbs[0].findNode("node2")
  558. assert.Check(t, n != nil)
  559. assert.Check(t, is.Equal(nodeLeftState, currState))
  560. assert.Check(t, is.Equal("node2", n.Name))
  561. assert.Check(t, m != nil)
  562. assert.Check(t, time.Duration(0) != n.reapTime)
  563. n, currState, m = dbs[0].findNode("node3")
  564. assert.Check(t, n != nil)
  565. assert.Check(t, is.Equal(nodeLeftState, currState))
  566. assert.Check(t, is.Equal("node3", n.Name))
  567. assert.Check(t, m != nil)
  568. assert.Check(t, time.Duration(0) != n.reapTime)
  569. // active nodes is 1 because the testing node is in the list
  570. assert.Check(t, is.Len(dbs[0].nodes, 1))
  571. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  572. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  573. closeNetworkDBInstances(t, dbs)
  574. }
  575. func TestNodeReincarnation(t *testing.T) {
  576. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  577. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  578. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  579. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  580. // active nodes is 2 because the testing node is in the list
  581. assert.Check(t, is.Len(dbs[0].nodes, 2))
  582. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  583. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  584. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  585. assert.Check(t, b)
  586. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  587. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  588. assert.Check(t, b)
  589. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  590. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  591. assert.Check(t, b)
  592. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  593. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  594. assert.Check(t, !b)
  595. // active nodes is 1 because the testing node is in the list
  596. assert.Check(t, is.Len(dbs[0].nodes, 4))
  597. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  598. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  599. closeNetworkDBInstances(t, dbs)
  600. }
  601. func TestParallelCreate(t *testing.T) {
  602. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  603. startCh := make(chan int)
  604. doneCh := make(chan error)
  605. var success int32
  606. for i := 0; i < 20; i++ {
  607. go func() {
  608. <-startCh
  609. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  610. if err == nil {
  611. atomic.AddInt32(&success, 1)
  612. }
  613. doneCh <- err
  614. }()
  615. }
  616. close(startCh)
  617. for i := 0; i < 20; i++ {
  618. <-doneCh
  619. }
  620. close(doneCh)
  621. // Only 1 write should have succeeded
  622. assert.Check(t, is.Equal(int32(1), success))
  623. closeNetworkDBInstances(t, dbs)
  624. }
  625. func TestParallelDelete(t *testing.T) {
  626. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  627. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  628. assert.NilError(t, err)
  629. startCh := make(chan int)
  630. doneCh := make(chan error)
  631. var success int32
  632. for i := 0; i < 20; i++ {
  633. go func() {
  634. <-startCh
  635. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  636. if err == nil {
  637. atomic.AddInt32(&success, 1)
  638. }
  639. doneCh <- err
  640. }()
  641. }
  642. close(startCh)
  643. for i := 0; i < 20; i++ {
  644. <-doneCh
  645. }
  646. close(doneCh)
  647. // Only 1 write should have succeeded
  648. assert.Check(t, is.Equal(int32(1), success))
  649. closeNetworkDBInstances(t, dbs)
  650. }
  651. func TestNetworkDBIslands(t *testing.T) {
  652. pollTimeout := func() time.Duration {
  653. const defaultTimeout = 120 * time.Second
  654. dl, ok := t.Deadline()
  655. if !ok {
  656. return defaultTimeout
  657. }
  658. if d := time.Until(dl); d <= defaultTimeout {
  659. return d
  660. }
  661. return defaultTimeout
  662. }
  663. logrus.SetLevel(logrus.DebugLevel)
  664. conf := DefaultConfig()
  665. // Shorten durations to speed up test execution.
  666. conf.rejoinClusterDuration = conf.rejoinClusterDuration / 10
  667. conf.rejoinClusterInterval = conf.rejoinClusterInterval / 10
  668. dbs := createNetworkDBInstances(t, 5, "node", conf)
  669. // Get the node IP used currently
  670. node := dbs[0].nodes[dbs[0].config.NodeID]
  671. baseIPStr := node.Addr.String()
  672. // Node 0,1,2 are going to be the 3 bootstrap nodes
  673. members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
  674. fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
  675. fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
  676. // Rejoining will update the list of the bootstrap members
  677. for i := 3; i < 5; i++ {
  678. t.Logf("Re-joining: %d", i)
  679. assert.Check(t, dbs[i].Join(members))
  680. }
  681. // Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
  682. for i := 0; i < 3; i++ {
  683. logrus.Infof("node %d leaving", i)
  684. dbs[i].Close()
  685. }
  686. checkDBs := make(map[string]*NetworkDB)
  687. for i := 3; i < 5; i++ {
  688. db := dbs[i]
  689. checkDBs[db.config.Hostname] = db
  690. }
  691. // Give some time to let the system propagate the messages and free up the ports
  692. check := func(t poll.LogT) poll.Result {
  693. // Verify that the nodes are actually all gone and marked appropiately
  694. for name, db := range checkDBs {
  695. db.RLock()
  696. if (len(db.leftNodes) != 3) || (len(db.failedNodes) != 0) {
  697. for name := range db.leftNodes {
  698. t.Logf("%s: Node %s left", db.config.Hostname, name)
  699. }
  700. for name := range db.failedNodes {
  701. t.Logf("%s: Node %s failed", db.config.Hostname, name)
  702. }
  703. db.RUnlock()
  704. return poll.Continue("%s:Waiting for all nodes to cleanly leave, left: %d, failed nodes: %d", name, len(db.leftNodes), len(db.failedNodes))
  705. }
  706. db.RUnlock()
  707. t.Logf("%s: OK", name)
  708. delete(checkDBs, name)
  709. }
  710. return poll.Success()
  711. }
  712. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  713. // Spawn again the first 3 nodes with different names but same IP:port
  714. for i := 0; i < 3; i++ {
  715. logrus.Infof("node %d coming back", i)
  716. dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  717. dbs[i] = launchNode(t, *dbs[i].config)
  718. }
  719. // Give some time for the reconnect routine to run, it runs every 6s.
  720. check = func(t poll.LogT) poll.Result {
  721. // Verify that the cluster is again all connected. Note that the 3 previous node did not do any join
  722. for i := 0; i < 5; i++ {
  723. db := dbs[i]
  724. db.RLock()
  725. if len(db.nodes) != 5 {
  726. db.RUnlock()
  727. return poll.Continue("%s:Waiting to connect to all nodes", dbs[i].config.Hostname)
  728. }
  729. if len(db.failedNodes) != 0 {
  730. db.RUnlock()
  731. return poll.Continue("%s:Waiting for 0 failedNodes", dbs[i].config.Hostname)
  732. }
  733. if i < 3 {
  734. // nodes from 0 to 3 has no left nodes
  735. if len(db.leftNodes) != 0 {
  736. db.RUnlock()
  737. return poll.Continue("%s:Waiting to have no leftNodes", dbs[i].config.Hostname)
  738. }
  739. } else {
  740. // nodes from 4 to 5 has the 3 previous left nodes
  741. if len(db.leftNodes) != 3 {
  742. db.RUnlock()
  743. return poll.Continue("%s:Waiting to have 3 leftNodes", dbs[i].config.Hostname)
  744. }
  745. }
  746. db.RUnlock()
  747. }
  748. return poll.Success()
  749. }
  750. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  751. closeNetworkDBInstances(t, dbs)
  752. }