networkdb_test.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941
  1. package networkdb
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "os"
  7. "strconv"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/containerd/containerd/log"
  12. "github.com/docker/docker/pkg/stringid"
  13. "github.com/docker/go-events"
  14. "github.com/hashicorp/memberlist"
  15. "github.com/sirupsen/logrus"
  16. "gotest.tools/v3/assert"
  17. is "gotest.tools/v3/assert/cmp"
  18. "gotest.tools/v3/poll"
  19. )
  // dbPort holds the most recently assigned bind port. It is incremented
  // atomically for every node the tests launch, so each node gets its own port.
  var dbPort = int32(10000)
  21. func TestMain(m *testing.M) {
  22. os.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0o644)
  23. logrus.SetLevel(logrus.ErrorLevel)
  24. os.Exit(m.Run())
  25. }
  26. func launchNode(t *testing.T, conf Config) *NetworkDB {
  27. t.Helper()
  28. db, err := New(&conf)
  29. assert.NilError(t, err)
  30. return db
  31. }
  32. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  33. t.Helper()
  34. var dbs []*NetworkDB
  35. for i := 0; i < num; i++ {
  36. localConfig := *conf
  37. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  38. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  39. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  40. db := launchNode(t, localConfig)
  41. if i != 0 {
  42. assert.Check(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
  43. }
  44. dbs = append(dbs, db)
  45. }
  46. // Wait till the cluster creation is successful
  47. check := func(t poll.LogT) poll.Result {
  48. // Check that the cluster is properly created
  49. for i := 0; i < num; i++ {
  50. if num != len(dbs[i].ClusterPeers()) {
  51. return poll.Continue("%s:Waiting for cluster peers to be established", dbs[i].config.Hostname)
  52. }
  53. }
  54. return poll.Success()
  55. }
  56. poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second))
  57. return dbs
  58. }
  59. func closeNetworkDBInstances(t *testing.T, dbs []*NetworkDB) {
  60. t.Helper()
  61. log.G(context.TODO()).Print("Closing DB instances...")
  62. for _, db := range dbs {
  63. db.Close()
  64. }
  65. }
  66. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  67. t.Helper()
  68. for i := 0; i < 80; i++ {
  69. db.RLock()
  70. _, ok := db.nodes[node]
  71. db.RUnlock()
  72. if present && ok {
  73. return
  74. }
  75. if !present && !ok {
  76. return
  77. }
  78. time.Sleep(50 * time.Millisecond)
  79. }
  80. t.Errorf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node)
  81. }
  82. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  83. t.Helper()
  84. const sleepInterval = 50 * time.Millisecond
  85. var maxRetries int64
  86. if dl, ok := t.Deadline(); ok {
  87. maxRetries = int64(time.Until(dl) / sleepInterval)
  88. } else {
  89. maxRetries = 80
  90. }
  91. for i := int64(0); i < maxRetries; i++ {
  92. db.RLock()
  93. nn, nnok := db.networks[node]
  94. if nnok {
  95. n, ok := nn[id]
  96. var leaving bool
  97. if ok {
  98. leaving = n.leaving
  99. }
  100. db.RUnlock()
  101. if present && ok {
  102. return
  103. }
  104. if !present &&
  105. ((ok && leaving) ||
  106. !ok) {
  107. return
  108. }
  109. } else {
  110. db.RUnlock()
  111. }
  112. time.Sleep(sleepInterval)
  113. }
  114. t.Error("Network existence verification failed")
  115. }
  116. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  117. t.Helper()
  118. n := 80
  119. for i := 0; i < n; i++ {
  120. v, err := db.GetEntry(tname, nid, key)
  121. if present && err == nil && string(v) == value {
  122. return
  123. }
  124. if err != nil && !present {
  125. return
  126. }
  127. time.Sleep(50 * time.Millisecond)
  128. }
  129. t.Errorf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID)
  130. }
  131. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  132. t.Helper()
  133. select {
  134. case rcvdEv := <-ch:
  135. assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
  136. switch typ := rcvdEv.(type) {
  137. case CreateEvent:
  138. assert.Check(t, is.Equal(tname, typ.Table))
  139. assert.Check(t, is.Equal(nid, typ.NetworkID))
  140. assert.Check(t, is.Equal(key, typ.Key))
  141. assert.Check(t, is.Equal(value, string(typ.Value)))
  142. case UpdateEvent:
  143. assert.Check(t, is.Equal(tname, typ.Table))
  144. assert.Check(t, is.Equal(nid, typ.NetworkID))
  145. assert.Check(t, is.Equal(key, typ.Key))
  146. assert.Check(t, is.Equal(value, string(typ.Value)))
  147. case DeleteEvent:
  148. assert.Check(t, is.Equal(tname, typ.Table))
  149. assert.Check(t, is.Equal(nid, typ.NetworkID))
  150. assert.Check(t, is.Equal(key, typ.Key))
  151. }
  152. case <-time.After(time.Second):
  153. t.Fail()
  154. return
  155. }
  156. }
  157. func TestNetworkDBSimple(t *testing.T) {
  158. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  159. closeNetworkDBInstances(t, dbs)
  160. }
  161. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  162. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  163. err := dbs[0].JoinNetwork("network1")
  164. assert.NilError(t, err)
  165. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  166. err = dbs[0].LeaveNetwork("network1")
  167. assert.NilError(t, err)
  168. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  169. closeNetworkDBInstances(t, dbs)
  170. }
  171. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  172. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  173. n := 10
  174. for i := 1; i <= n; i++ {
  175. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  176. assert.NilError(t, err)
  177. }
  178. for i := 1; i <= n; i++ {
  179. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  180. assert.NilError(t, err)
  181. }
  182. for i := 1; i <= n; i++ {
  183. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  184. }
  185. for i := 1; i <= n; i++ {
  186. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  187. }
  188. for i := 1; i <= n; i++ {
  189. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  190. assert.NilError(t, err)
  191. }
  192. for i := 1; i <= n; i++ {
  193. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  194. assert.NilError(t, err)
  195. }
  196. for i := 1; i <= n; i++ {
  197. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  198. }
  199. for i := 1; i <= n; i++ {
  200. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  201. }
  202. closeNetworkDBInstances(t, dbs)
  203. }
  204. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  205. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  206. err := dbs[0].JoinNetwork("network1")
  207. assert.NilError(t, err)
  208. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  209. err = dbs[1].JoinNetwork("network1")
  210. assert.NilError(t, err)
  211. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  212. assert.NilError(t, err)
  213. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  214. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  215. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  216. assert.NilError(t, err)
  217. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  218. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  219. assert.NilError(t, err)
  220. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  221. closeNetworkDBInstances(t, dbs)
  222. }
  223. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  224. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  225. err := dbs[0].JoinNetwork("network1")
  226. assert.NilError(t, err)
  227. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  228. err = dbs[1].JoinNetwork("network1")
  229. assert.NilError(t, err)
  230. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  231. n := 10
  232. for i := 1; i <= n; i++ {
  233. err = dbs[0].CreateEntry("test_table", "network1",
  234. fmt.Sprintf("test_key0%d", i),
  235. []byte(fmt.Sprintf("test_value0%d", i)))
  236. assert.NilError(t, err)
  237. }
  238. for i := 1; i <= n; i++ {
  239. err = dbs[1].CreateEntry("test_table", "network1",
  240. fmt.Sprintf("test_key1%d", i),
  241. []byte(fmt.Sprintf("test_value1%d", i)))
  242. assert.NilError(t, err)
  243. }
  244. for i := 1; i <= n; i++ {
  245. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  246. fmt.Sprintf("test_key1%d", i),
  247. fmt.Sprintf("test_value1%d", i), true)
  248. assert.NilError(t, err)
  249. }
  250. for i := 1; i <= n; i++ {
  251. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  252. fmt.Sprintf("test_key0%d", i),
  253. fmt.Sprintf("test_value0%d", i), true)
  254. assert.NilError(t, err)
  255. }
  256. // Verify deletes
  257. for i := 1; i <= n; i++ {
  258. err = dbs[0].DeleteEntry("test_table", "network1",
  259. fmt.Sprintf("test_key0%d", i))
  260. assert.NilError(t, err)
  261. }
  262. for i := 1; i <= n; i++ {
  263. err = dbs[1].DeleteEntry("test_table", "network1",
  264. fmt.Sprintf("test_key1%d", i))
  265. assert.NilError(t, err)
  266. }
  267. for i := 1; i <= n; i++ {
  268. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  269. fmt.Sprintf("test_key1%d", i), "", false)
  270. assert.NilError(t, err)
  271. }
  272. for i := 1; i <= n; i++ {
  273. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  274. fmt.Sprintf("test_key0%d", i), "", false)
  275. assert.NilError(t, err)
  276. }
  277. closeNetworkDBInstances(t, dbs)
  278. }
  279. func TestNetworkDBNodeLeave(t *testing.T) {
  280. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  281. err := dbs[0].JoinNetwork("network1")
  282. assert.NilError(t, err)
  283. err = dbs[1].JoinNetwork("network1")
  284. assert.NilError(t, err)
  285. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  286. assert.NilError(t, err)
  287. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  288. dbs[0].Close()
  289. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  290. dbs[1].Close()
  291. }
  292. func TestNetworkDBWatch(t *testing.T) {
  293. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  294. err := dbs[0].JoinNetwork("network1")
  295. assert.NilError(t, err)
  296. err = dbs[1].JoinNetwork("network1")
  297. assert.NilError(t, err)
  298. ch, cancel := dbs[1].Watch("", "")
  299. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  300. assert.NilError(t, err)
  301. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  302. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  303. assert.NilError(t, err)
  304. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  305. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  306. assert.NilError(t, err)
  307. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  308. cancel()
  309. closeNetworkDBInstances(t, dbs)
  310. }
  311. func TestNetworkDBBulkSync(t *testing.T) {
  312. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  313. err := dbs[0].JoinNetwork("network1")
  314. assert.NilError(t, err)
  315. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  316. n := 1000
  317. for i := 1; i <= n; i++ {
  318. err = dbs[0].CreateEntry("test_table", "network1",
  319. fmt.Sprintf("test_key0%d", i),
  320. []byte(fmt.Sprintf("test_value0%d", i)))
  321. assert.NilError(t, err)
  322. }
  323. err = dbs[1].JoinNetwork("network1")
  324. assert.NilError(t, err)
  325. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  326. for i := 1; i <= n; i++ {
  327. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  328. fmt.Sprintf("test_key0%d", i),
  329. fmt.Sprintf("test_value0%d", i), true)
  330. assert.NilError(t, err)
  331. }
  332. closeNetworkDBInstances(t, dbs)
  333. }
  334. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  335. n := 5
  336. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  337. for i := 0; i < n; i++ {
  338. for j := 0; j < n; j++ {
  339. if i == j {
  340. continue
  341. }
  342. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  343. }
  344. }
  345. for i := 0; i < n; i++ {
  346. err := dbs[i].JoinNetwork("network1")
  347. assert.NilError(t, err)
  348. }
  349. for i := 0; i < n; i++ {
  350. for j := 0; j < n; j++ {
  351. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  352. }
  353. }
  354. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  355. assert.NilError(t, err)
  356. for i := 1; i < n; i++ {
  357. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  358. }
  359. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  360. assert.NilError(t, err)
  361. for i := 1; i < n; i++ {
  362. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  363. }
  364. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  365. assert.NilError(t, err)
  366. for i := 1; i < n; i++ {
  367. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  368. }
  369. for i := 1; i < n; i++ {
  370. _, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  371. assert.Check(t, is.ErrorContains(err, ""))
  372. assert.Check(t, is.Contains(err.Error(), "deleted and pending garbage collection"), err)
  373. }
  374. closeNetworkDBInstances(t, dbs)
  375. }
  376. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  377. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  378. dbChangeWitness := func(db *NetworkDB) func(network string, expectNodeCount int) {
  379. staleNetworkTime := db.networkClock.Time()
  380. return func(network string, expectNodeCount int) {
  381. check := func(t poll.LogT) poll.Result {
  382. networkTime := db.networkClock.Time()
  383. if networkTime <= staleNetworkTime {
  384. return poll.Continue("network time is stale, no change registered yet.")
  385. }
  386. count := -1
  387. db.Lock()
  388. if nodes, ok := db.networkNodes[network]; ok {
  389. count = len(nodes)
  390. }
  391. db.Unlock()
  392. if count != expectNodeCount {
  393. return poll.Continue("current number of nodes is %d, expect %d.", count, expectNodeCount)
  394. }
  395. return poll.Success()
  396. }
  397. t.Helper()
  398. poll.WaitOn(t, check, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  399. }
  400. }
  401. // Single node Join/Leave
  402. witness0 := dbChangeWitness(dbs[0])
  403. err := dbs[0].JoinNetwork("network1")
  404. assert.NilError(t, err)
  405. witness0("network1", 1)
  406. witness0 = dbChangeWitness(dbs[0])
  407. err = dbs[0].LeaveNetwork("network1")
  408. assert.NilError(t, err)
  409. witness0("network1", 0)
  410. // Multiple nodes Join/Leave
  411. witness0, witness1 := dbChangeWitness(dbs[0]), dbChangeWitness(dbs[1])
  412. err = dbs[0].JoinNetwork("network1")
  413. assert.NilError(t, err)
  414. err = dbs[1].JoinNetwork("network1")
  415. assert.NilError(t, err)
  416. // Wait for the propagation on db[0]
  417. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  418. witness0("network1", 2)
  419. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  420. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  421. }
  422. // Wait for the propagation on db[1]
  423. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  424. witness1("network1", 2)
  425. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  426. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  427. }
  428. // Try a quick leave/join
  429. witness0, witness1 = dbChangeWitness(dbs[0]), dbChangeWitness(dbs[1])
  430. err = dbs[0].LeaveNetwork("network1")
  431. assert.NilError(t, err)
  432. err = dbs[0].JoinNetwork("network1")
  433. assert.NilError(t, err)
  434. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  435. witness0("network1", 2)
  436. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  437. witness1("network1", 2)
  438. closeNetworkDBInstances(t, dbs)
  439. }
  440. func TestNetworkDBGarbageCollection(t *testing.T) {
  441. keysWriteDelete := 5
  442. config := DefaultConfig()
  443. config.reapEntryInterval = 30 * time.Second
  444. config.StatsPrintPeriod = 15 * time.Second
  445. dbs := createNetworkDBInstances(t, 3, "node", config)
  446. // 2 Nodes join network
  447. err := dbs[0].JoinNetwork("network1")
  448. assert.NilError(t, err)
  449. err = dbs[1].JoinNetwork("network1")
  450. assert.NilError(t, err)
  451. for i := 0; i < keysWriteDelete; i++ {
  452. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+strconv.Itoa(i), []byte("value"))
  453. assert.NilError(t, err)
  454. }
  455. time.Sleep(time.Second)
  456. for i := 0; i < keysWriteDelete; i++ {
  457. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+strconv.Itoa(i))
  458. assert.NilError(t, err)
  459. }
  460. for i := 0; i < 2; i++ {
  461. dbs[i].Lock()
  462. assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
  463. dbs[i].Unlock()
  464. }
  465. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  466. time.Sleep(5 * time.Second)
  467. err = dbs[2].JoinNetwork("network1")
  468. assert.NilError(t, err)
  469. for i := 0; i < 3; i++ {
  470. dbs[i].Lock()
  471. assert.Check(t, is.Equal(int64(keysWriteDelete), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries number should match")
  472. dbs[i].Unlock()
  473. }
  474. // at this point the entries should had been all deleted
  475. time.Sleep(30 * time.Second)
  476. for i := 0; i < 3; i++ {
  477. dbs[i].Lock()
  478. assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
  479. dbs[i].Unlock()
  480. }
  481. // make sure that entries are not coming back
  482. time.Sleep(15 * time.Second)
  483. for i := 0; i < 3; i++ {
  484. dbs[i].Lock()
  485. assert.Check(t, is.Equal(int64(0), dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber.Load()), "entries should had been garbage collected")
  486. dbs[i].Unlock()
  487. }
  488. closeNetworkDBInstances(t, dbs)
  489. }
  490. func TestFindNode(t *testing.T) {
  491. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  492. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  493. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  494. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  495. // active nodes is 2 because the testing node is in the list
  496. assert.Check(t, is.Len(dbs[0].nodes, 2))
  497. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  498. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  499. n, currState, m := dbs[0].findNode("active")
  500. assert.Check(t, n != nil)
  501. assert.Check(t, is.Equal("active", n.Name))
  502. assert.Check(t, is.Equal(nodeActiveState, currState))
  503. assert.Check(t, m != nil)
  504. // delete the entry manually
  505. delete(m, "active")
  506. // test if can be still find
  507. n, currState, m = dbs[0].findNode("active")
  508. assert.Check(t, is.Nil(n))
  509. assert.Check(t, is.Equal(nodeNotFound, currState))
  510. assert.Check(t, is.Nil(m))
  511. n, currState, m = dbs[0].findNode("failed")
  512. assert.Check(t, n != nil)
  513. assert.Check(t, is.Equal("failed", n.Name))
  514. assert.Check(t, is.Equal(nodeFailedState, currState))
  515. assert.Check(t, m != nil)
  516. // find and remove
  517. n, currState, m = dbs[0].findNode("left")
  518. assert.Check(t, n != nil)
  519. assert.Check(t, is.Equal("left", n.Name))
  520. assert.Check(t, is.Equal(nodeLeftState, currState))
  521. assert.Check(t, m != nil)
  522. delete(m, "left")
  523. n, currState, m = dbs[0].findNode("left")
  524. assert.Check(t, is.Nil(n))
  525. assert.Check(t, is.Equal(nodeNotFound, currState))
  526. assert.Check(t, is.Nil(m))
  527. closeNetworkDBInstances(t, dbs)
  528. }
  529. func TestChangeNodeState(t *testing.T) {
  530. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  531. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  532. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  533. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  534. // active nodes is 4 because the testing node is in the list
  535. assert.Check(t, is.Len(dbs[0].nodes, 4))
  536. n, currState, m := dbs[0].findNode("node1")
  537. assert.Check(t, n != nil)
  538. assert.Check(t, is.Equal(nodeActiveState, currState))
  539. assert.Check(t, is.Equal("node1", n.Name))
  540. assert.Check(t, m != nil)
  541. // node1 to failed
  542. dbs[0].changeNodeState("node1", nodeFailedState)
  543. n, currState, m = dbs[0].findNode("node1")
  544. assert.Check(t, n != nil)
  545. assert.Check(t, is.Equal(nodeFailedState, currState))
  546. assert.Check(t, is.Equal("node1", n.Name))
  547. assert.Check(t, m != nil)
  548. assert.Check(t, time.Duration(0) != n.reapTime)
  549. // node1 back to active
  550. dbs[0].changeNodeState("node1", nodeActiveState)
  551. n, currState, m = dbs[0].findNode("node1")
  552. assert.Check(t, n != nil)
  553. assert.Check(t, is.Equal(nodeActiveState, currState))
  554. assert.Check(t, is.Equal("node1", n.Name))
  555. assert.Check(t, m != nil)
  556. assert.Check(t, is.Equal(time.Duration(0), n.reapTime))
  557. // node1 to left
  558. dbs[0].changeNodeState("node1", nodeLeftState)
  559. dbs[0].changeNodeState("node2", nodeLeftState)
  560. dbs[0].changeNodeState("node3", nodeLeftState)
  561. n, currState, m = dbs[0].findNode("node1")
  562. assert.Check(t, n != nil)
  563. assert.Check(t, is.Equal(nodeLeftState, currState))
  564. assert.Check(t, is.Equal("node1", n.Name))
  565. assert.Check(t, m != nil)
  566. assert.Check(t, time.Duration(0) != n.reapTime)
  567. n, currState, m = dbs[0].findNode("node2")
  568. assert.Check(t, n != nil)
  569. assert.Check(t, is.Equal(nodeLeftState, currState))
  570. assert.Check(t, is.Equal("node2", n.Name))
  571. assert.Check(t, m != nil)
  572. assert.Check(t, time.Duration(0) != n.reapTime)
  573. n, currState, m = dbs[0].findNode("node3")
  574. assert.Check(t, n != nil)
  575. assert.Check(t, is.Equal(nodeLeftState, currState))
  576. assert.Check(t, is.Equal("node3", n.Name))
  577. assert.Check(t, m != nil)
  578. assert.Check(t, time.Duration(0) != n.reapTime)
  579. // active nodes is 1 because the testing node is in the list
  580. assert.Check(t, is.Len(dbs[0].nodes, 1))
  581. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  582. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  583. closeNetworkDBInstances(t, dbs)
  584. }
  585. func TestNodeReincarnation(t *testing.T) {
  586. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  587. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  588. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  589. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  590. // active nodes is 2 because the testing node is in the list
  591. assert.Check(t, is.Len(dbs[0].nodes, 2))
  592. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  593. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  594. dbs[0].Lock()
  595. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  596. assert.Check(t, b)
  597. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  598. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  599. assert.Check(t, b)
  600. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  601. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  602. assert.Check(t, b)
  603. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  604. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  605. assert.Check(t, !b)
  606. // active nodes is 1 because the testing node is in the list
  607. assert.Check(t, is.Len(dbs[0].nodes, 4))
  608. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  609. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  610. dbs[0].Unlock()
  611. closeNetworkDBInstances(t, dbs)
  612. }
  613. func TestParallelCreate(t *testing.T) {
  614. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  615. startCh := make(chan int)
  616. doneCh := make(chan error)
  617. var success int32
  618. for i := 0; i < 20; i++ {
  619. go func() {
  620. <-startCh
  621. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  622. if err == nil {
  623. atomic.AddInt32(&success, 1)
  624. }
  625. doneCh <- err
  626. }()
  627. }
  628. close(startCh)
  629. for i := 0; i < 20; i++ {
  630. <-doneCh
  631. }
  632. close(doneCh)
  633. // Only 1 write should have succeeded
  634. assert.Check(t, is.Equal(int32(1), success))
  635. closeNetworkDBInstances(t, dbs)
  636. }
  637. func TestParallelDelete(t *testing.T) {
  638. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  639. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  640. assert.NilError(t, err)
  641. startCh := make(chan int)
  642. doneCh := make(chan error)
  643. var success int32
  644. for i := 0; i < 20; i++ {
  645. go func() {
  646. <-startCh
  647. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  648. if err == nil {
  649. atomic.AddInt32(&success, 1)
  650. }
  651. doneCh <- err
  652. }()
  653. }
  654. close(startCh)
  655. for i := 0; i < 20; i++ {
  656. <-doneCh
  657. }
  658. close(doneCh)
  659. // Only 1 write should have succeeded
  660. assert.Check(t, is.Equal(int32(1), success))
  661. closeNetworkDBInstances(t, dbs)
  662. }
  663. func TestNetworkDBIslands(t *testing.T) {
  664. pollTimeout := func() time.Duration {
  665. const defaultTimeout = 120 * time.Second
  666. dl, ok := t.Deadline()
  667. if !ok {
  668. return defaultTimeout
  669. }
  670. if d := time.Until(dl); d <= defaultTimeout {
  671. return d
  672. }
  673. return defaultTimeout
  674. }
  675. logrus.SetLevel(logrus.DebugLevel)
  676. conf := DefaultConfig()
  677. // Shorten durations to speed up test execution.
  678. conf.rejoinClusterDuration = conf.rejoinClusterDuration / 10
  679. conf.rejoinClusterInterval = conf.rejoinClusterInterval / 10
  680. dbs := createNetworkDBInstances(t, 5, "node", conf)
  681. // Get the node IP used currently
  682. node := dbs[0].nodes[dbs[0].config.NodeID]
  683. baseIPStr := node.Addr.String()
  684. // Node 0,1,2 are going to be the 3 bootstrap nodes
  685. members := []string{
  686. fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
  687. fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
  688. fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort),
  689. }
  690. // Rejoining will update the list of the bootstrap members
  691. for i := 3; i < 5; i++ {
  692. t.Logf("Re-joining: %d", i)
  693. assert.Check(t, dbs[i].Join(members))
  694. }
  695. // Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
  696. for i := 0; i < 3; i++ {
  697. log.G(context.TODO()).Infof("node %d leaving", i)
  698. dbs[i].Close()
  699. }
  700. checkDBs := make(map[string]*NetworkDB)
  701. for i := 3; i < 5; i++ {
  702. db := dbs[i]
  703. checkDBs[db.config.Hostname] = db
  704. }
  705. // Give some time to let the system propagate the messages and free up the ports
  706. check := func(t poll.LogT) poll.Result {
  707. // Verify that the nodes are actually all gone and marked appropiately
  708. for name, db := range checkDBs {
  709. db.RLock()
  710. if (len(db.leftNodes) != 3) || (len(db.failedNodes) != 0) {
  711. for name := range db.leftNodes {
  712. t.Logf("%s: Node %s left", db.config.Hostname, name)
  713. }
  714. for name := range db.failedNodes {
  715. t.Logf("%s: Node %s failed", db.config.Hostname, name)
  716. }
  717. db.RUnlock()
  718. return poll.Continue("%s:Waiting for all nodes to cleanly leave, left: %d, failed nodes: %d", name, len(db.leftNodes), len(db.failedNodes))
  719. }
  720. db.RUnlock()
  721. t.Logf("%s: OK", name)
  722. delete(checkDBs, name)
  723. }
  724. return poll.Success()
  725. }
  726. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  727. // Spawn again the first 3 nodes with different names but same IP:port
  728. for i := 0; i < 3; i++ {
  729. log.G(context.TODO()).Infof("node %d coming back", i)
  730. conf := *dbs[i].config
  731. conf.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  732. dbs[i] = launchNode(t, conf)
  733. }
  734. // Give some time for the reconnect routine to run, it runs every 6s.
  735. check = func(t poll.LogT) poll.Result {
  736. // Verify that the cluster is again all connected. Note that the 3 previous node did not do any join
  737. for i := 0; i < 5; i++ {
  738. db := dbs[i]
  739. db.RLock()
  740. if len(db.nodes) != 5 {
  741. db.RUnlock()
  742. return poll.Continue("%s:Waiting to connect to all nodes", dbs[i].config.Hostname)
  743. }
  744. if len(db.failedNodes) != 0 {
  745. db.RUnlock()
  746. return poll.Continue("%s:Waiting for 0 failedNodes", dbs[i].config.Hostname)
  747. }
  748. if i < 3 {
  749. // nodes from 0 to 3 has no left nodes
  750. if len(db.leftNodes) != 0 {
  751. db.RUnlock()
  752. return poll.Continue("%s:Waiting to have no leftNodes", dbs[i].config.Hostname)
  753. }
  754. } else {
  755. // nodes from 4 to 5 has the 3 previous left nodes
  756. if len(db.leftNodes) != 3 {
  757. db.RUnlock()
  758. return poll.Continue("%s:Waiting to have 3 leftNodes", dbs[i].config.Hostname)
  759. }
  760. }
  761. db.RUnlock()
  762. }
  763. return poll.Success()
  764. }
  765. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  766. closeNetworkDBInstances(t, dbs)
  767. }