networkdb_test.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937
  1. package networkdb
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "os"
  7. "strconv"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/docker/docker/pkg/stringid"
  12. "github.com/docker/go-events"
  13. "github.com/hashicorp/memberlist"
  14. "github.com/hashicorp/serf/serf"
  15. "github.com/sirupsen/logrus"
  16. "gotest.tools/v3/assert"
  17. is "gotest.tools/v3/assert/cmp"
  18. "gotest.tools/v3/poll"
  19. )
  20. var dbPort int32 = 10000
  21. func TestMain(m *testing.M) {
  22. os.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644)
  23. logrus.SetLevel(logrus.ErrorLevel)
  24. os.Exit(m.Run())
  25. }
  26. func launchNode(t *testing.T, conf Config) *NetworkDB {
  27. t.Helper()
  28. db, err := New(&conf)
  29. assert.NilError(t, err)
  30. return db
  31. }
  32. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  33. t.Helper()
  34. var dbs []*NetworkDB
  35. for i := 0; i < num; i++ {
  36. localConfig := *conf
  37. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  38. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  39. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  40. db := launchNode(t, localConfig)
  41. if i != 0 {
  42. assert.Check(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
  43. }
  44. dbs = append(dbs, db)
  45. }
  46. // Wait till the cluster creation is successful
  47. check := func(t poll.LogT) poll.Result {
  48. // Check that the cluster is properly created
  49. for i := 0; i < num; i++ {
  50. if num != len(dbs[i].ClusterPeers()) {
  51. return poll.Continue("%s:Waiting for cluster peers to be established", dbs[i].config.Hostname)
  52. }
  53. }
  54. return poll.Success()
  55. }
  56. poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second))
  57. return dbs
  58. }
  59. func closeNetworkDBInstances(t *testing.T, dbs []*NetworkDB) {
  60. t.Helper()
  61. log.Print("Closing DB instances...")
  62. for _, db := range dbs {
  63. db.Close()
  64. }
  65. }
  66. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  67. t.Helper()
  68. for i := 0; i < 80; i++ {
  69. db.RLock()
  70. _, ok := db.nodes[node]
  71. db.RUnlock()
  72. if present && ok {
  73. return
  74. }
  75. if !present && !ok {
  76. return
  77. }
  78. time.Sleep(50 * time.Millisecond)
  79. }
  80. t.Errorf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node)
  81. }
  82. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  83. t.Helper()
  84. const sleepInterval = 50 * time.Millisecond
  85. var maxRetries int64
  86. if dl, ok := t.Deadline(); ok {
  87. maxRetries = int64(time.Until(dl) / sleepInterval)
  88. } else {
  89. maxRetries = 80
  90. }
  91. for i := int64(0); i < maxRetries; i++ {
  92. db.RLock()
  93. nn, nnok := db.networks[node]
  94. db.RUnlock()
  95. if nnok {
  96. n, ok := nn[id]
  97. if present && ok {
  98. return
  99. }
  100. if !present &&
  101. ((ok && n.leaving) ||
  102. !ok) {
  103. return
  104. }
  105. }
  106. time.Sleep(sleepInterval)
  107. }
  108. t.Error("Network existence verification failed")
  109. }
  110. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  111. t.Helper()
  112. n := 80
  113. for i := 0; i < n; i++ {
  114. entry, err := db.getEntry(tname, nid, key)
  115. if present && err == nil && string(entry.value) == value {
  116. return
  117. }
  118. if !present &&
  119. ((err == nil && entry.deleting) ||
  120. (err != nil)) {
  121. return
  122. }
  123. if i == n-1 && !present && err != nil {
  124. return
  125. }
  126. time.Sleep(50 * time.Millisecond)
  127. }
  128. t.Errorf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID)
  129. }
  130. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  131. t.Helper()
  132. select {
  133. case rcvdEv := <-ch:
  134. assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
  135. switch typ := rcvdEv.(type) {
  136. case CreateEvent:
  137. assert.Check(t, is.Equal(tname, typ.Table))
  138. assert.Check(t, is.Equal(nid, typ.NetworkID))
  139. assert.Check(t, is.Equal(key, typ.Key))
  140. assert.Check(t, is.Equal(value, string(typ.Value)))
  141. case UpdateEvent:
  142. assert.Check(t, is.Equal(tname, typ.Table))
  143. assert.Check(t, is.Equal(nid, typ.NetworkID))
  144. assert.Check(t, is.Equal(key, typ.Key))
  145. assert.Check(t, is.Equal(value, string(typ.Value)))
  146. case DeleteEvent:
  147. assert.Check(t, is.Equal(tname, typ.Table))
  148. assert.Check(t, is.Equal(nid, typ.NetworkID))
  149. assert.Check(t, is.Equal(key, typ.Key))
  150. }
  151. case <-time.After(time.Second):
  152. t.Fail()
  153. return
  154. }
  155. }
  156. func TestNetworkDBSimple(t *testing.T) {
  157. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  158. closeNetworkDBInstances(t, dbs)
  159. }
  160. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  161. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  162. err := dbs[0].JoinNetwork("network1")
  163. assert.NilError(t, err)
  164. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  165. err = dbs[0].LeaveNetwork("network1")
  166. assert.NilError(t, err)
  167. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  168. closeNetworkDBInstances(t, dbs)
  169. }
  170. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  171. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  172. n := 10
  173. for i := 1; i <= n; i++ {
  174. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  175. assert.NilError(t, err)
  176. }
  177. for i := 1; i <= n; i++ {
  178. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  179. assert.NilError(t, err)
  180. }
  181. for i := 1; i <= n; i++ {
  182. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  183. }
  184. for i := 1; i <= n; i++ {
  185. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  186. }
  187. for i := 1; i <= n; i++ {
  188. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  189. assert.NilError(t, err)
  190. }
  191. for i := 1; i <= n; i++ {
  192. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  193. assert.NilError(t, err)
  194. }
  195. for i := 1; i <= n; i++ {
  196. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  197. }
  198. for i := 1; i <= n; i++ {
  199. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  200. }
  201. closeNetworkDBInstances(t, dbs)
  202. }
  203. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  204. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  205. err := dbs[0].JoinNetwork("network1")
  206. assert.NilError(t, err)
  207. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  208. err = dbs[1].JoinNetwork("network1")
  209. assert.NilError(t, err)
  210. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  211. assert.NilError(t, err)
  212. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  213. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  214. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  215. assert.NilError(t, err)
  216. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  217. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  218. assert.NilError(t, err)
  219. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  220. closeNetworkDBInstances(t, dbs)
  221. }
  222. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  223. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  224. err := dbs[0].JoinNetwork("network1")
  225. assert.NilError(t, err)
  226. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  227. err = dbs[1].JoinNetwork("network1")
  228. assert.NilError(t, err)
  229. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  230. n := 10
  231. for i := 1; i <= n; i++ {
  232. err = dbs[0].CreateEntry("test_table", "network1",
  233. fmt.Sprintf("test_key0%d", i),
  234. []byte(fmt.Sprintf("test_value0%d", i)))
  235. assert.NilError(t, err)
  236. }
  237. for i := 1; i <= n; i++ {
  238. err = dbs[1].CreateEntry("test_table", "network1",
  239. fmt.Sprintf("test_key1%d", i),
  240. []byte(fmt.Sprintf("test_value1%d", i)))
  241. assert.NilError(t, err)
  242. }
  243. for i := 1; i <= n; i++ {
  244. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  245. fmt.Sprintf("test_key1%d", i),
  246. fmt.Sprintf("test_value1%d", i), true)
  247. assert.NilError(t, err)
  248. }
  249. for i := 1; i <= n; i++ {
  250. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  251. fmt.Sprintf("test_key0%d", i),
  252. fmt.Sprintf("test_value0%d", i), true)
  253. assert.NilError(t, err)
  254. }
  255. // Verify deletes
  256. for i := 1; i <= n; i++ {
  257. err = dbs[0].DeleteEntry("test_table", "network1",
  258. fmt.Sprintf("test_key0%d", i))
  259. assert.NilError(t, err)
  260. }
  261. for i := 1; i <= n; i++ {
  262. err = dbs[1].DeleteEntry("test_table", "network1",
  263. fmt.Sprintf("test_key1%d", i))
  264. assert.NilError(t, err)
  265. }
  266. for i := 1; i <= n; i++ {
  267. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  268. fmt.Sprintf("test_key1%d", i), "", false)
  269. assert.NilError(t, err)
  270. }
  271. for i := 1; i <= n; i++ {
  272. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  273. fmt.Sprintf("test_key0%d", i), "", false)
  274. assert.NilError(t, err)
  275. }
  276. closeNetworkDBInstances(t, dbs)
  277. }
  278. func TestNetworkDBNodeLeave(t *testing.T) {
  279. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  280. err := dbs[0].JoinNetwork("network1")
  281. assert.NilError(t, err)
  282. err = dbs[1].JoinNetwork("network1")
  283. assert.NilError(t, err)
  284. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  285. assert.NilError(t, err)
  286. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  287. dbs[0].Close()
  288. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  289. dbs[1].Close()
  290. }
  291. func TestNetworkDBWatch(t *testing.T) {
  292. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  293. err := dbs[0].JoinNetwork("network1")
  294. assert.NilError(t, err)
  295. err = dbs[1].JoinNetwork("network1")
  296. assert.NilError(t, err)
  297. ch, cancel := dbs[1].Watch("", "", "")
  298. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  299. assert.NilError(t, err)
  300. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  301. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  302. assert.NilError(t, err)
  303. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  304. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  305. assert.NilError(t, err)
  306. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  307. cancel()
  308. closeNetworkDBInstances(t, dbs)
  309. }
  310. func TestNetworkDBBulkSync(t *testing.T) {
  311. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  312. err := dbs[0].JoinNetwork("network1")
  313. assert.NilError(t, err)
  314. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  315. n := 1000
  316. for i := 1; i <= n; i++ {
  317. err = dbs[0].CreateEntry("test_table", "network1",
  318. fmt.Sprintf("test_key0%d", i),
  319. []byte(fmt.Sprintf("test_value0%d", i)))
  320. assert.NilError(t, err)
  321. }
  322. err = dbs[1].JoinNetwork("network1")
  323. assert.NilError(t, err)
  324. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  325. for i := 1; i <= n; i++ {
  326. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  327. fmt.Sprintf("test_key0%d", i),
  328. fmt.Sprintf("test_value0%d", i), true)
  329. assert.NilError(t, err)
  330. }
  331. closeNetworkDBInstances(t, dbs)
  332. }
  333. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  334. n := 5
  335. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  336. for i := 0; i < n; i++ {
  337. for j := 0; j < n; j++ {
  338. if i == j {
  339. continue
  340. }
  341. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  342. }
  343. }
  344. for i := 0; i < n; i++ {
  345. err := dbs[i].JoinNetwork("network1")
  346. assert.NilError(t, err)
  347. }
  348. for i := 0; i < n; i++ {
  349. for j := 0; j < n; j++ {
  350. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  351. }
  352. }
  353. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  354. assert.NilError(t, err)
  355. for i := 1; i < n; i++ {
  356. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  357. }
  358. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  359. assert.NilError(t, err)
  360. for i := 1; i < n; i++ {
  361. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  362. }
  363. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  364. assert.NilError(t, err)
  365. for i := 1; i < n; i++ {
  366. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  367. }
  368. for i := 1; i < n; i++ {
  369. _, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  370. assert.Check(t, is.ErrorContains(err, ""))
  371. assert.Check(t, is.Contains(err.Error(), "deleted and pending garbage collection"), err)
  372. }
  373. closeNetworkDBInstances(t, dbs)
  374. }
  375. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  376. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  377. var (
  378. dbIndex int32
  379. staleNetworkTime [2]serf.LamportTime
  380. expectNodeCount int
  381. network = "network1"
  382. )
  383. dbChangeWitness := func(t poll.LogT) poll.Result {
  384. db := dbs[dbIndex]
  385. networkTime := db.networkClock.Time()
  386. if networkTime <= staleNetworkTime[dbIndex] {
  387. return poll.Continue("network time is stale, no change registered yet.")
  388. }
  389. count := -1
  390. db.Lock()
  391. if nodes, ok := db.networkNodes[network]; ok {
  392. count = len(nodes)
  393. }
  394. db.Unlock()
  395. if count != expectNodeCount {
  396. return poll.Continue("current number of nodes is %d, expect %d.", count, expectNodeCount)
  397. }
  398. return poll.Success()
  399. }
  400. // Single node Join/Leave
  401. staleNetworkTime[0], staleNetworkTime[1] = dbs[0].networkClock.Time(), dbs[1].networkClock.Time()
  402. err := dbs[0].JoinNetwork("network1")
  403. assert.NilError(t, err)
  404. dbIndex, expectNodeCount = 0, 1
  405. poll.WaitOn(t, dbChangeWitness, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  406. staleNetworkTime[0], staleNetworkTime[1] = dbs[0].networkClock.Time(), dbs[1].networkClock.Time()
  407. err = dbs[0].LeaveNetwork("network1")
  408. assert.NilError(t, err)
  409. dbIndex, expectNodeCount = 0, 0
  410. poll.WaitOn(t, dbChangeWitness, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  411. // Multiple nodes Join/Leave
  412. staleNetworkTime[0], staleNetworkTime[1] = dbs[0].networkClock.Time(), dbs[1].networkClock.Time()
  413. err = dbs[0].JoinNetwork("network1")
  414. assert.NilError(t, err)
  415. err = dbs[1].JoinNetwork("network1")
  416. assert.NilError(t, err)
  417. // Wait for the propagation on db[0]
  418. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  419. dbIndex, expectNodeCount = 0, 2
  420. poll.WaitOn(t, dbChangeWitness, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  421. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  422. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  423. }
  424. // Wait for the propagation on db[1]
  425. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  426. dbIndex, expectNodeCount = 1, 2
  427. poll.WaitOn(t, dbChangeWitness, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  428. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  429. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  430. }
  431. // Try a quick leave/join
  432. staleNetworkTime[0], staleNetworkTime[1] = dbs[0].networkClock.Time(), dbs[1].networkClock.Time()
  433. err = dbs[0].LeaveNetwork("network1")
  434. assert.NilError(t, err)
  435. err = dbs[0].JoinNetwork("network1")
  436. assert.NilError(t, err)
  437. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  438. dbIndex, expectNodeCount = 0, 2
  439. poll.WaitOn(t, dbChangeWitness, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  440. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  441. dbIndex, expectNodeCount = 1, 2
  442. poll.WaitOn(t, dbChangeWitness, poll.WithTimeout(3*time.Second), poll.WithDelay(5*time.Millisecond))
  443. closeNetworkDBInstances(t, dbs)
  444. }
  445. func TestNetworkDBGarbageCollection(t *testing.T) {
  446. keysWriteDelete := 5
  447. config := DefaultConfig()
  448. config.reapEntryInterval = 30 * time.Second
  449. config.StatsPrintPeriod = 15 * time.Second
  450. dbs := createNetworkDBInstances(t, 3, "node", config)
  451. // 2 Nodes join network
  452. err := dbs[0].JoinNetwork("network1")
  453. assert.NilError(t, err)
  454. err = dbs[1].JoinNetwork("network1")
  455. assert.NilError(t, err)
  456. for i := 0; i < keysWriteDelete; i++ {
  457. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+strconv.Itoa(i), []byte("value"))
  458. assert.NilError(t, err)
  459. }
  460. time.Sleep(time.Second)
  461. for i := 0; i < keysWriteDelete; i++ {
  462. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+strconv.Itoa(i))
  463. assert.NilError(t, err)
  464. }
  465. for i := 0; i < 2; i++ {
  466. assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
  467. }
  468. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  469. time.Sleep(5 * time.Second)
  470. err = dbs[2].JoinNetwork("network1")
  471. assert.NilError(t, err)
  472. for i := 0; i < 3; i++ {
  473. assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
  474. }
  475. // at this point the entries should had been all deleted
  476. time.Sleep(30 * time.Second)
  477. for i := 0; i < 3; i++ {
  478. assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
  479. }
  480. // make sure that entries are not coming back
  481. time.Sleep(15 * time.Second)
  482. for i := 0; i < 3; i++ {
  483. assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
  484. }
  485. closeNetworkDBInstances(t, dbs)
  486. }
  487. func TestFindNode(t *testing.T) {
  488. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  489. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  490. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  491. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  492. // active nodes is 2 because the testing node is in the list
  493. assert.Check(t, is.Len(dbs[0].nodes, 2))
  494. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  495. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  496. n, currState, m := dbs[0].findNode("active")
  497. assert.Check(t, n != nil)
  498. assert.Check(t, is.Equal("active", n.Name))
  499. assert.Check(t, is.Equal(nodeActiveState, currState))
  500. assert.Check(t, m != nil)
  501. // delete the entry manually
  502. delete(m, "active")
  503. // test if can be still find
  504. n, currState, m = dbs[0].findNode("active")
  505. assert.Check(t, is.Nil(n))
  506. assert.Check(t, is.Equal(nodeNotFound, currState))
  507. assert.Check(t, is.Nil(m))
  508. n, currState, m = dbs[0].findNode("failed")
  509. assert.Check(t, n != nil)
  510. assert.Check(t, is.Equal("failed", n.Name))
  511. assert.Check(t, is.Equal(nodeFailedState, currState))
  512. assert.Check(t, m != nil)
  513. // find and remove
  514. n, currState, m = dbs[0].findNode("left")
  515. assert.Check(t, n != nil)
  516. assert.Check(t, is.Equal("left", n.Name))
  517. assert.Check(t, is.Equal(nodeLeftState, currState))
  518. assert.Check(t, m != nil)
  519. delete(m, "left")
  520. n, currState, m = dbs[0].findNode("left")
  521. assert.Check(t, is.Nil(n))
  522. assert.Check(t, is.Equal(nodeNotFound, currState))
  523. assert.Check(t, is.Nil(m))
  524. closeNetworkDBInstances(t, dbs)
  525. }
  526. func TestChangeNodeState(t *testing.T) {
  527. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  528. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  529. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  530. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  531. // active nodes is 4 because the testing node is in the list
  532. assert.Check(t, is.Len(dbs[0].nodes, 4))
  533. n, currState, m := dbs[0].findNode("node1")
  534. assert.Check(t, n != nil)
  535. assert.Check(t, is.Equal(nodeActiveState, currState))
  536. assert.Check(t, is.Equal("node1", n.Name))
  537. assert.Check(t, m != nil)
  538. // node1 to failed
  539. dbs[0].changeNodeState("node1", nodeFailedState)
  540. n, currState, m = dbs[0].findNode("node1")
  541. assert.Check(t, n != nil)
  542. assert.Check(t, is.Equal(nodeFailedState, currState))
  543. assert.Check(t, is.Equal("node1", n.Name))
  544. assert.Check(t, m != nil)
  545. assert.Check(t, time.Duration(0) != n.reapTime)
  546. // node1 back to active
  547. dbs[0].changeNodeState("node1", nodeActiveState)
  548. n, currState, m = dbs[0].findNode("node1")
  549. assert.Check(t, n != nil)
  550. assert.Check(t, is.Equal(nodeActiveState, currState))
  551. assert.Check(t, is.Equal("node1", n.Name))
  552. assert.Check(t, m != nil)
  553. assert.Check(t, is.Equal(time.Duration(0), n.reapTime))
  554. // node1 to left
  555. dbs[0].changeNodeState("node1", nodeLeftState)
  556. dbs[0].changeNodeState("node2", nodeLeftState)
  557. dbs[0].changeNodeState("node3", nodeLeftState)
  558. n, currState, m = dbs[0].findNode("node1")
  559. assert.Check(t, n != nil)
  560. assert.Check(t, is.Equal(nodeLeftState, currState))
  561. assert.Check(t, is.Equal("node1", n.Name))
  562. assert.Check(t, m != nil)
  563. assert.Check(t, time.Duration(0) != n.reapTime)
  564. n, currState, m = dbs[0].findNode("node2")
  565. assert.Check(t, n != nil)
  566. assert.Check(t, is.Equal(nodeLeftState, currState))
  567. assert.Check(t, is.Equal("node2", n.Name))
  568. assert.Check(t, m != nil)
  569. assert.Check(t, time.Duration(0) != n.reapTime)
  570. n, currState, m = dbs[0].findNode("node3")
  571. assert.Check(t, n != nil)
  572. assert.Check(t, is.Equal(nodeLeftState, currState))
  573. assert.Check(t, is.Equal("node3", n.Name))
  574. assert.Check(t, m != nil)
  575. assert.Check(t, time.Duration(0) != n.reapTime)
  576. // active nodes is 1 because the testing node is in the list
  577. assert.Check(t, is.Len(dbs[0].nodes, 1))
  578. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  579. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  580. closeNetworkDBInstances(t, dbs)
  581. }
  582. func TestNodeReincarnation(t *testing.T) {
  583. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  584. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  585. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  586. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  587. // active nodes is 2 because the testing node is in the list
  588. assert.Check(t, is.Len(dbs[0].nodes, 2))
  589. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  590. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  591. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  592. assert.Check(t, b)
  593. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  594. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  595. assert.Check(t, b)
  596. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  597. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  598. assert.Check(t, b)
  599. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  600. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  601. assert.Check(t, !b)
  602. // active nodes is 1 because the testing node is in the list
  603. assert.Check(t, is.Len(dbs[0].nodes, 4))
  604. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  605. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  606. closeNetworkDBInstances(t, dbs)
  607. }
  608. func TestParallelCreate(t *testing.T) {
  609. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  610. startCh := make(chan int)
  611. doneCh := make(chan error)
  612. var success int32
  613. for i := 0; i < 20; i++ {
  614. go func() {
  615. <-startCh
  616. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  617. if err == nil {
  618. atomic.AddInt32(&success, 1)
  619. }
  620. doneCh <- err
  621. }()
  622. }
  623. close(startCh)
  624. for i := 0; i < 20; i++ {
  625. <-doneCh
  626. }
  627. close(doneCh)
  628. // Only 1 write should have succeeded
  629. assert.Check(t, is.Equal(int32(1), success))
  630. closeNetworkDBInstances(t, dbs)
  631. }
  632. func TestParallelDelete(t *testing.T) {
  633. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  634. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  635. assert.NilError(t, err)
  636. startCh := make(chan int)
  637. doneCh := make(chan error)
  638. var success int32
  639. for i := 0; i < 20; i++ {
  640. go func() {
  641. <-startCh
  642. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  643. if err == nil {
  644. atomic.AddInt32(&success, 1)
  645. }
  646. doneCh <- err
  647. }()
  648. }
  649. close(startCh)
  650. for i := 0; i < 20; i++ {
  651. <-doneCh
  652. }
  653. close(doneCh)
  654. // Only 1 write should have succeeded
  655. assert.Check(t, is.Equal(int32(1), success))
  656. closeNetworkDBInstances(t, dbs)
  657. }
  658. func TestNetworkDBIslands(t *testing.T) {
  659. pollTimeout := func() time.Duration {
  660. const defaultTimeout = 120 * time.Second
  661. dl, ok := t.Deadline()
  662. if !ok {
  663. return defaultTimeout
  664. }
  665. if d := time.Until(dl); d <= defaultTimeout {
  666. return d
  667. }
  668. return defaultTimeout
  669. }
  670. logrus.SetLevel(logrus.DebugLevel)
  671. conf := DefaultConfig()
  672. // Shorten durations to speed up test execution.
  673. conf.rejoinClusterDuration = conf.rejoinClusterDuration / 10
  674. conf.rejoinClusterInterval = conf.rejoinClusterInterval / 10
  675. dbs := createNetworkDBInstances(t, 5, "node", conf)
  676. // Get the node IP used currently
  677. node := dbs[0].nodes[dbs[0].config.NodeID]
  678. baseIPStr := node.Addr.String()
  679. // Node 0,1,2 are going to be the 3 bootstrap nodes
  680. members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
  681. fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
  682. fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
  683. // Rejoining will update the list of the bootstrap members
  684. for i := 3; i < 5; i++ {
  685. t.Logf("Re-joining: %d", i)
  686. assert.Check(t, dbs[i].Join(members))
  687. }
  688. // Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
  689. for i := 0; i < 3; i++ {
  690. logrus.Infof("node %d leaving", i)
  691. dbs[i].Close()
  692. }
  693. checkDBs := make(map[string]*NetworkDB)
  694. for i := 3; i < 5; i++ {
  695. db := dbs[i]
  696. checkDBs[db.config.Hostname] = db
  697. }
  698. // Give some time to let the system propagate the messages and free up the ports
  699. check := func(t poll.LogT) poll.Result {
  700. // Verify that the nodes are actually all gone and marked appropiately
  701. for name, db := range checkDBs {
  702. db.RLock()
  703. if (len(db.leftNodes) != 3) || (len(db.failedNodes) != 0) {
  704. for name := range db.leftNodes {
  705. t.Logf("%s: Node %s left", db.config.Hostname, name)
  706. }
  707. for name := range db.failedNodes {
  708. t.Logf("%s: Node %s failed", db.config.Hostname, name)
  709. }
  710. db.RUnlock()
  711. return poll.Continue("%s:Waiting for all nodes to cleanly leave, left: %d, failed nodes: %d", name, len(db.leftNodes), len(db.failedNodes))
  712. }
  713. db.RUnlock()
  714. t.Logf("%s: OK", name)
  715. delete(checkDBs, name)
  716. }
  717. return poll.Success()
  718. }
  719. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  720. // Spawn again the first 3 nodes with different names but same IP:port
  721. for i := 0; i < 3; i++ {
  722. logrus.Infof("node %d coming back", i)
  723. dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  724. dbs[i] = launchNode(t, *dbs[i].config)
  725. }
  726. // Give some time for the reconnect routine to run, it runs every 6s.
  727. check = func(t poll.LogT) poll.Result {
  728. // Verify that the cluster is again all connected. Note that the 3 previous node did not do any join
  729. for i := 0; i < 5; i++ {
  730. db := dbs[i]
  731. db.RLock()
  732. if len(db.nodes) != 5 {
  733. db.RUnlock()
  734. return poll.Continue("%s:Waiting to connect to all nodes", dbs[i].config.Hostname)
  735. }
  736. if len(db.failedNodes) != 0 {
  737. db.RUnlock()
  738. return poll.Continue("%s:Waiting for 0 failedNodes", dbs[i].config.Hostname)
  739. }
  740. if i < 3 {
  741. // nodes from 0 to 3 has no left nodes
  742. if len(db.leftNodes) != 0 {
  743. db.RUnlock()
  744. return poll.Continue("%s:Waiting to have no leftNodes", dbs[i].config.Hostname)
  745. }
  746. } else {
  747. // nodes from 4 to 5 has the 3 previous left nodes
  748. if len(db.leftNodes) != 3 {
  749. db.RUnlock()
  750. return poll.Continue("%s:Waiting to have 3 leftNodes", dbs[i].config.Hostname)
  751. }
  752. }
  753. db.RUnlock()
  754. }
  755. return poll.Success()
  756. }
  757. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  758. closeNetworkDBInstances(t, dbs)
  759. }