networkdb_test.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915
  1. package networkdb
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "log"
  6. "net"
  7. "os"
  8. "strconv"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/docker/docker/pkg/stringid"
  13. "github.com/docker/go-events"
  14. "github.com/hashicorp/memberlist"
  15. "github.com/sirupsen/logrus"
  16. "gotest.tools/v3/assert"
  17. is "gotest.tools/v3/assert/cmp"
  18. "gotest.tools/v3/poll"
  19. )
  // dbPort is the most recently allocated test bind port; createNetworkDBInstances
  // increments it atomically so every test NetworkDB instance gets a fresh port.
  var dbPort = int32(10000)
  // TestMain prepares the environment shared by all tests in this package and
  // exits with the result of running them.
  func TestMain(m *testing.M) {
  	// Best effort: re-enable IPv6 on the loopback interface. Writing to /proc/sys
  	// only works on Linux and typically needs elevated privileges, so the error
  	// is deliberately ignored.
  	_ = ioutil.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte("0\n"), 0644)
  	// Keep test output quiet unless something goes wrong.
  	logrus.SetLevel(logrus.ErrorLevel)
  	code := m.Run()
  	os.Exit(code)
  }
  // launchNode starts a NetworkDB instance from its own copy of conf and aborts
  // the test immediately if New reports an error.
  func launchNode(t *testing.T, conf Config) *NetworkDB {
  	t.Helper()
  	cfg := conf
  	node, err := New(&cfg)
  	assert.NilError(t, err)
  	return node
  }
  32. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  33. t.Helper()
  34. var dbs []*NetworkDB
  35. for i := 0; i < num; i++ {
  36. localConfig := *conf
  37. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  38. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  39. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  40. db := launchNode(t, localConfig)
  41. if i != 0 {
  42. assert.Check(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
  43. }
  44. dbs = append(dbs, db)
  45. }
  46. // Wait till the cluster creation is successful
  47. check := func(t poll.LogT) poll.Result {
  48. // Check that the cluster is properly created
  49. for i := 0; i < num; i++ {
  50. if num != len(dbs[i].ClusterPeers()) {
  51. return poll.Continue("%s:Waiting for cluser peers to be established", dbs[i].config.Hostname)
  52. }
  53. }
  54. return poll.Success()
  55. }
  56. poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second))
  57. return dbs
  58. }
  // closeNetworkDBInstances shuts down every instance in dbs, in slice order.
  func closeNetworkDBInstances(t *testing.T, dbs []*NetworkDB) {
  	t.Helper()
  	log.Print("Closing DB instances...")
  	for i := range dbs {
  		dbs[i].Close()
  	}
  }
  66. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  67. t.Helper()
  68. for i := 0; i < 80; i++ {
  69. db.RLock()
  70. _, ok := db.nodes[node]
  71. db.RUnlock()
  72. if present && ok {
  73. return
  74. }
  75. if !present && !ok {
  76. return
  77. }
  78. time.Sleep(50 * time.Millisecond)
  79. }
  80. t.Errorf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node)
  81. }
  // verifyNetworkExistence polls db's record of the networks joined by node.
  // With present == true it waits until network id is known. With
  // present == false it waits until id is unknown or marked as leaving. It polls
  // every 50ms until the test deadline, or 80 times when the test has no
  // deadline, and records a test error if the expected state is never observed.
  func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  	t.Helper()
  	const sleepInterval = 50 * time.Millisecond
  	maxRetries := int64(80)
  	if dl, ok := t.Deadline(); ok {
  		maxRetries = int64(time.Until(dl) / sleepInterval)
  	}
  	// Look at least once, even when the deadline is already (almost) reached;
  	// otherwise the check would fail without ever inspecting the state.
  	if maxRetries < 1 {
  		maxRetries = 1
  	}
  	for i := int64(0); i < maxRetries; i++ {
  		// Read the entry and its leaving flag under the lock. Indexing a missing
  		// per-node map yields ok == false, i.e. "network not present".
  		db.RLock()
  		n, ok := db.networks[node][id]
  		leaving := ok && n.leaving
  		db.RUnlock()
  		if present && ok {
  			return
  		}
  		if !present && (!ok || leaving) {
  			return
  		}
  		time.Sleep(sleepInterval)
  	}
  	t.Errorf("%v(%v): Network existence verification for node %s network %s failed", db.config.Hostname, db.config.NodeID, node, id)
  }
  110. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  111. t.Helper()
  112. n := 80
  113. for i := 0; i < n; i++ {
  114. entry, err := db.getEntry(tname, nid, key)
  115. if present && err == nil && string(entry.value) == value {
  116. return
  117. }
  118. if !present &&
  119. ((err == nil && entry.deleting) ||
  120. (err != nil)) {
  121. return
  122. }
  123. if i == n-1 && !present && err != nil {
  124. return
  125. }
  126. time.Sleep(50 * time.Millisecond)
  127. }
  128. t.Errorf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID)
  129. }
  // testWatch waits up to one second for the next event on ch. It checks that
  // the event has the same dynamic type as ev and carries the expected table,
  // network ID and key (plus the value, for create and update events). If no
  // event arrives in time, the test fails with a message naming the expected
  // event.
  func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  	t.Helper()
  	select {
  	case rcvdEv := <-ch:
  		assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
  		switch typ := rcvdEv.(type) {
  		case CreateEvent:
  			assert.Check(t, is.Equal(tname, typ.Table))
  			assert.Check(t, is.Equal(nid, typ.NetworkID))
  			assert.Check(t, is.Equal(key, typ.Key))
  			assert.Check(t, is.Equal(value, string(typ.Value)))
  		case UpdateEvent:
  			assert.Check(t, is.Equal(tname, typ.Table))
  			assert.Check(t, is.Equal(nid, typ.NetworkID))
  			assert.Check(t, is.Equal(key, typ.Key))
  			assert.Check(t, is.Equal(value, string(typ.Value)))
  		case DeleteEvent:
  			assert.Check(t, is.Equal(tname, typ.Table))
  			assert.Check(t, is.Equal(nid, typ.NetworkID))
  			assert.Check(t, is.Equal(key, typ.Key))
  		}
  	case <-time.After(time.Second):
  		// A bare t.Fail() gives no hint about which event never arrived.
  		t.Errorf("timed out waiting for %T on table %q, network %q, key %q", ev, tname, nid, key)
  	}
  }
  // TestNetworkDBSimple brings up a two-node cluster and tears it down again.
  func TestNetworkDBSimple(t *testing.T) {
  	closeNetworkDBInstances(t, createNetworkDBInstances(t, 2, "node", DefaultConfig()))
  }
  // TestNetworkDBJoinLeaveNetwork checks that node 2 observes node 1 joining and
  // then leaving network1.
  func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  	// Deferred so the instances are closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)
  	err := dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  	err = dbs[0].LeaveNetwork("network1")
  	assert.NilError(t, err)
  	dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  }
  // TestNetworkDBJoinLeaveNetworks has each of two nodes join, and later leave,
  // ten distinct networks. It checks that the peer observes every join and
  // every leave.
  func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  	// Deferred so the instances are closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)
  	n := 10
  	for i := 1; i <= n; i++ {
  		err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  		assert.NilError(t, err)
  	}
  	for i := 1; i <= n; i++ {
  		err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  		assert.NilError(t, err)
  	}
  	for i := 1; i <= n; i++ {
  		dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  	}
  	for i := 1; i <= n; i++ {
  		dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  	}
  	for i := 1; i <= n; i++ {
  		err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  		assert.NilError(t, err)
  	}
  	for i := 1; i <= n; i++ {
  		err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  		assert.NilError(t, err)
  	}
  	for i := 1; i <= n; i++ {
  		dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  	}
  	for i := 1; i <= n; i++ {
  		dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  	}
  }
  // TestNetworkDBCRUDTableEntry creates, updates and deletes an entry on node 1.
  // It checks that node 2, which joined network1, follows along, and that
  // node 3, which never joined network1, does not have the entry.
  func TestNetworkDBCRUDTableEntry(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  	// Deferred so the instances are closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)
  	err := dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  	err = dbs[1].JoinNetwork("network1")
  	assert.NilError(t, err)
  	err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  	assert.NilError(t, err)
  	dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  	dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  	err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  	assert.NilError(t, err)
  	dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  	err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  	assert.NilError(t, err)
  	dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  }
  // TestNetworkDBCRUDTableEntries has each of two nodes on network1 create ten
  // entries and checks that every entry reaches the other node. It then deletes
  // them all and checks that the deletions propagate too.
  func TestNetworkDBCRUDTableEntries(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  	// Deferred so the instances are closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)
  	err := dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  	err = dbs[1].JoinNetwork("network1")
  	assert.NilError(t, err)
  	dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  	n := 10
  	for i := 1; i <= n; i++ {
  		err = dbs[0].CreateEntry("test_table", "network1",
  			fmt.Sprintf("test_key0%d", i),
  			[]byte(fmt.Sprintf("test_value0%d", i)))
  		assert.NilError(t, err)
  	}
  	for i := 1; i <= n; i++ {
  		err = dbs[1].CreateEntry("test_table", "network1",
  			fmt.Sprintf("test_key1%d", i),
  			[]byte(fmt.Sprintf("test_value1%d", i)))
  		assert.NilError(t, err)
  	}
  	// verifyEntryExistence reports its own failures; it produces no error to check.
  	for i := 1; i <= n; i++ {
  		dbs[0].verifyEntryExistence(t, "test_table", "network1",
  			fmt.Sprintf("test_key1%d", i),
  			fmt.Sprintf("test_value1%d", i), true)
  	}
  	for i := 1; i <= n; i++ {
  		dbs[1].verifyEntryExistence(t, "test_table", "network1",
  			fmt.Sprintf("test_key0%d", i),
  			fmt.Sprintf("test_value0%d", i), true)
  	}
  	// Verify deletes
  	for i := 1; i <= n; i++ {
  		err = dbs[0].DeleteEntry("test_table", "network1",
  			fmt.Sprintf("test_key0%d", i))
  		assert.NilError(t, err)
  	}
  	for i := 1; i <= n; i++ {
  		err = dbs[1].DeleteEntry("test_table", "network1",
  			fmt.Sprintf("test_key1%d", i))
  		assert.NilError(t, err)
  	}
  	for i := 1; i <= n; i++ {
  		dbs[0].verifyEntryExistence(t, "test_table", "network1",
  			fmt.Sprintf("test_key1%d", i), "", false)
  	}
  	for i := 1; i <= n; i++ {
  		dbs[1].verifyEntryExistence(t, "test_table", "network1",
  			fmt.Sprintf("test_key0%d", i), "", false)
  	}
  }
  278. func TestNetworkDBNodeLeave(t *testing.T) {
  279. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  280. err := dbs[0].JoinNetwork("network1")
  281. assert.NilError(t, err)
  282. err = dbs[1].JoinNetwork("network1")
  283. assert.NilError(t, err)
  284. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  285. assert.NilError(t, err)
  286. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  287. dbs[0].Close()
  288. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  289. dbs[1].Close()
  290. }
  // TestNetworkDBWatch checks that a watcher on node 2 receives the create,
  // update and delete events, in that order, for an entry modified on node 1.
  func TestNetworkDBWatch(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  	// Deferred so the instances are closed even when a fatal assertion aborts
  	// the test. Deferred calls run LIFO, so the watch below is cancelled first.
  	defer closeNetworkDBInstances(t, dbs)
  	err := dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	err = dbs[1].JoinNetwork("network1")
  	assert.NilError(t, err)
  	ch, cancel := dbs[1].Watch("", "", "")
  	defer cancel()
  	err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  	assert.NilError(t, err)
  	testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  	err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  	assert.NilError(t, err)
  	testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  	err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  	assert.NilError(t, err)
  	testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  }
  // TestNetworkDBBulkSync creates 1000 entries on node 1 before node 2 joins
  // network1. It then checks that node 2 receives all of them after joining.
  func TestNetworkDBBulkSync(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  	// Deferred so the instances are closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)
  	err := dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  	n := 1000
  	for i := 1; i <= n; i++ {
  		err = dbs[0].CreateEntry("test_table", "network1",
  			fmt.Sprintf("test_key0%d", i),
  			[]byte(fmt.Sprintf("test_value0%d", i)))
  		assert.NilError(t, err)
  	}
  	err = dbs[1].JoinNetwork("network1")
  	assert.NilError(t, err)
  	dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  	// verifyEntryExistence reports its own failures; it produces no error to check.
  	for i := 1; i <= n; i++ {
  		dbs[1].verifyEntryExistence(t, "test_table", "network1",
  			fmt.Sprintf("test_key0%d", i),
  			fmt.Sprintf("test_value0%d", i), true)
  	}
  }
  // TestNetworkDBCRUDMediumCluster brings up five nodes and checks that they all
  // see each other and network1. It then creates, updates and deletes an entry
  // on node 1 and checks that every other node follows. Finally, GetEntry on
  // each other node must report the entry as pending garbage collection.
  func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  	n := 5
  	dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  	// Deferred so the instances are closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)
  	for i := 0; i < n; i++ {
  		for j := 0; j < n; j++ {
  			if i == j {
  				continue
  			}
  			dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  		}
  	}
  	for i := 0; i < n; i++ {
  		err := dbs[i].JoinNetwork("network1")
  		assert.NilError(t, err)
  	}
  	for i := 0; i < n; i++ {
  		for j := 0; j < n; j++ {
  			dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  		}
  	}
  	err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  	assert.NilError(t, err)
  	for i := 1; i < n; i++ {
  		dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  	}
  	err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  	assert.NilError(t, err)
  	for i := 1; i < n; i++ {
  		dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  	}
  	err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  	assert.NilError(t, err)
  	for i := 1; i < n; i++ {
  		dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  	}
  	for i := 1; i < n; i++ {
  		_, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  		// ErrorContains fails cleanly when err is nil; calling err.Error()
  		// directly would panic instead.
  		assert.Check(t, is.ErrorContains(err, "deleted and pending garbage collection"))
  	}
  }
  // TestNetworkDBNodeJoinLeaveIteration joins and leaves network1 first on a
  // single node, then on two nodes, and finally performs a quick leave/join.
  // After each step it checks the size of network1's node list and that the
  // local network entry is not marked as leaving.
  func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  	// Deferred so the instances are closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)

  	// netNodes returns the length and printed form of db's node list for
  	// network1. It reads under the db lock, as the other helpers in this file
  	// do, because gossip from the peer may update the list concurrently.
  	netNodes := func(db *NetworkDB) (int, string) {
  		db.RLock()
  		defer db.RUnlock()
  		nodes := db.networkNodes["network1"]
  		return len(nodes), fmt.Sprintf("%v", nodes)
  	}
  	// requireNotLeaving fails the test unless db's own network table holds a
  	// network1 entry that is not marked as leaving. Presence is checked
  	// separately, so a missing entry is never dereferenced.
  	requireNotLeaving := func(db *NetworkDB) {
  		t.Helper()
  		db.RLock()
  		n, ok := db.networks[db.config.NodeID]["network1"]
  		leaving := ok && n.leaving
  		db.RUnlock()
  		if !ok {
  			t.Fatalf("The network should be present on node %s", db.config.Hostname)
  		}
  		if leaving {
  			t.Fatalf("The network should not be marked as leaving:%t", leaving)
  		}
  	}

  	// Single node Join/Leave
  	err := dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	if cnt, _ := netNodes(dbs[0]); cnt != 1 {
  		t.Fatalf("The networkNodes list has to have be 1 instead of %d", cnt)
  	}
  	err = dbs[0].LeaveNetwork("network1")
  	assert.NilError(t, err)
  	if cnt, _ := netNodes(dbs[0]); cnt != 0 {
  		t.Fatalf("The networkNodes list has to have be 0 instead of %d", cnt)
  	}
  	// Multiple nodes Join/Leave
  	err = dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	err = dbs[1].JoinNetwork("network1")
  	assert.NilError(t, err)
  	// Wait for the propagation on db[0]
  	dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  	if cnt, list := netNodes(dbs[0]); cnt != 2 {
  		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", cnt, list)
  	}
  	requireNotLeaving(dbs[0])
  	// Wait for the propagation on db[1]
  	dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  	if cnt, list := netNodes(dbs[1]); cnt != 2 {
  		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", cnt, list)
  	}
  	requireNotLeaving(dbs[1])
  	// Try a quick leave/join
  	err = dbs[0].LeaveNetwork("network1")
  	assert.NilError(t, err)
  	err = dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  	if cnt, list := netNodes(dbs[0]); cnt != 2 {
  		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", cnt, list)
  	}
  	dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  	if cnt, list := netNodes(dbs[1]); cnt != 2 {
  		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", cnt, list)
  	}
  }
  // TestNetworkDBGarbageCollection creates and then deletes keys on two nodes,
  // and joins a third node while the deleted entries are still counted. It then
  // checks that all three nodes drop the entries after reapEntryInterval and
  // that the entries do not come back afterwards.
  func TestNetworkDBGarbageCollection(t *testing.T) {
  	keysWriteDelete := 5
  	config := DefaultConfig()
  	config.reapEntryInterval = 30 * time.Second
  	config.StatsPrintPeriod = 15 * time.Second
  	dbs := createNetworkDBInstances(t, 3, "node", config)
  	// Deferred so the instances are closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)

  	// entriesNumber reads db's entry counter for network1 under the db lock.
  	// The counter drops without any action from the test, so background
  	// goroutines update it concurrently.
  	entriesNumber := func(db *NetworkDB) int {
  		db.RLock()
  		defer db.RUnlock()
  		return db.networks[db.config.NodeID]["network1"].entriesNumber
  	}

  	// 2 Nodes join network
  	err := dbs[0].JoinNetwork("network1")
  	assert.NilError(t, err)
  	err = dbs[1].JoinNetwork("network1")
  	assert.NilError(t, err)
  	for i := 0; i < keysWriteDelete; i++ {
  		err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+strconv.Itoa(i), []byte("value"))
  		assert.NilError(t, err)
  	}
  	time.Sleep(time.Second)
  	for i := 0; i < keysWriteDelete; i++ {
  		err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+strconv.Itoa(i))
  		assert.NilError(t, err)
  	}
  	for i := 0; i < 2; i++ {
  		assert.Check(t, is.Equal(keysWriteDelete, entriesNumber(dbs[i])), "entries number should match")
  	}
  	// From this point the garbage collection timer is running: wait 5 seconds
  	// and then join a new node.
  	time.Sleep(5 * time.Second)
  	err = dbs[2].JoinNetwork("network1")
  	assert.NilError(t, err)
  	for i := 0; i < 3; i++ {
  		assert.Check(t, is.Equal(keysWriteDelete, entriesNumber(dbs[i])), "entries number should match")
  	}
  	// At this point all the entries should have been deleted.
  	time.Sleep(30 * time.Second)
  	for i := 0; i < 3; i++ {
  		assert.Check(t, is.Equal(0, entriesNumber(dbs[i])), "entries should had been garbage collected")
  	}
  	// Make sure that the entries are not coming back.
  	time.Sleep(15 * time.Second)
  	for i := 0; i < 3; i++ {
  		assert.Check(t, is.Equal(0, entriesNumber(dbs[i])), "entries should had been garbage collected")
  	}
  }
  // TestFindNode checks that findNode locates nodes in the active, failed and
  // left node maps and reports the matching state and containing map. Once a
  // node is deleted from its map, findNode must report nodeNotFound.
  func TestFindNode(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  	defer closeNetworkDBInstances(t, dbs)
  	dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  	dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  	dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  	// active nodes is 2 because the testing node is in the list
  	assert.Check(t, is.Len(dbs[0].nodes, 2))
  	assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  	assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  	n, currState, m := dbs[0].findNode("active")
  	// Assert rather than Check: n is dereferenced right below, and a nil n
  	// would otherwise panic.
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal("active", n.Name))
  	assert.Check(t, is.Equal(nodeActiveState, currState))
  	assert.Check(t, m != nil)
  	// delete the entry manually
  	delete(m, "active")
  	// test if it can still be found
  	n, currState, m = dbs[0].findNode("active")
  	assert.Check(t, is.Nil(n))
  	assert.Check(t, is.Equal(nodeNotFound, currState))
  	assert.Check(t, is.Nil(m))
  	n, currState, m = dbs[0].findNode("failed")
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal("failed", n.Name))
  	assert.Check(t, is.Equal(nodeFailedState, currState))
  	assert.Check(t, m != nil)
  	// find and remove
  	n, currState, m = dbs[0].findNode("left")
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal("left", n.Name))
  	assert.Check(t, is.Equal(nodeLeftState, currState))
  	assert.Check(t, m != nil)
  	delete(m, "left")
  	n, currState, m = dbs[0].findNode("left")
  	assert.Check(t, is.Nil(n))
  	assert.Check(t, is.Equal(nodeNotFound, currState))
  	assert.Check(t, is.Nil(m))
  }
  // TestChangeNodeState moves node1 from active to failed and back to active.
  // It then moves node1, node2 and node3 to left. After each move it checks the
  // reported state, that reapTime is set for failed/left nodes and cleared for
  // active ones, and the final sizes of the node maps.
  func TestChangeNodeState(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  	defer closeNetworkDBInstances(t, dbs)
  	dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  	dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  	dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  	// active nodes is 4 because the testing node is in the list
  	assert.Check(t, is.Len(dbs[0].nodes, 4))
  	n, currState, m := dbs[0].findNode("node1")
  	// Assert rather than Check wherever n is dereferenced afterwards; a nil n
  	// would otherwise panic the whole test binary.
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal(nodeActiveState, currState))
  	assert.Check(t, is.Equal("node1", n.Name))
  	assert.Check(t, m != nil)
  	// node1 to failed
  	dbs[0].changeNodeState("node1", nodeFailedState)
  	n, currState, m = dbs[0].findNode("node1")
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal(nodeFailedState, currState))
  	assert.Check(t, is.Equal("node1", n.Name))
  	assert.Check(t, m != nil)
  	assert.Check(t, time.Duration(0) != n.reapTime)
  	// node1 back to active
  	dbs[0].changeNodeState("node1", nodeActiveState)
  	n, currState, m = dbs[0].findNode("node1")
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal(nodeActiveState, currState))
  	assert.Check(t, is.Equal("node1", n.Name))
  	assert.Check(t, m != nil)
  	assert.Check(t, is.Equal(time.Duration(0), n.reapTime))
  	// node1, node2 and node3 to left
  	dbs[0].changeNodeState("node1", nodeLeftState)
  	dbs[0].changeNodeState("node2", nodeLeftState)
  	dbs[0].changeNodeState("node3", nodeLeftState)
  	n, currState, m = dbs[0].findNode("node1")
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal(nodeLeftState, currState))
  	assert.Check(t, is.Equal("node1", n.Name))
  	assert.Check(t, m != nil)
  	assert.Check(t, time.Duration(0) != n.reapTime)
  	n, currState, m = dbs[0].findNode("node2")
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal(nodeLeftState, currState))
  	assert.Check(t, is.Equal("node2", n.Name))
  	assert.Check(t, m != nil)
  	assert.Check(t, time.Duration(0) != n.reapTime)
  	n, currState, m = dbs[0].findNode("node3")
  	assert.Assert(t, n != nil)
  	assert.Check(t, is.Equal(nodeLeftState, currState))
  	assert.Check(t, is.Equal("node3", n.Name))
  	assert.Check(t, m != nil)
  	assert.Check(t, time.Duration(0) != n.reapTime)
  	// active nodes is 1 because the testing node is in the list
  	assert.Check(t, is.Len(dbs[0].nodes, 1))
  	assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  	assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  }
  // TestNodeReincarnation checks purgeReincarnation. A node that shows up with
  // the IP of an existing active, left or failed node under a different name is
  // reported as a reincarnation (true). A node whose IP is not used by any known
  // node is not.
  func TestNodeReincarnation(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  	defer closeNetworkDBInstances(t, dbs)
  	dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  	dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  	dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  	// active nodes is 2 because the testing node is in the list
  	assert.Check(t, is.Len(dbs[0].nodes, 2))
  	assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  	assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  	b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  	assert.Check(t, b)
  	dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  	assert.Check(t, b)
  	dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  	assert.Check(t, b)
  	dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  	b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  	assert.Check(t, !b)
  	// active nodes is 4: the testing node plus node4, node5 and node6 (node1 is
  	// expected to no longer be counted as active)
  	assert.Check(t, is.Len(dbs[0].nodes, 4))
  	assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  	assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  }
  // TestParallelCreate releases 20 goroutines at once, all creating the same key,
  // and checks that exactly one CreateEntry call succeeds.
  func TestParallelCreate(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  	const workers = 20
  	start := make(chan struct{})
  	done := make(chan error)
  	var success int32
  	for w := 0; w < workers; w++ {
  		go func() {
  			// Block until every worker is ready, then race.
  			<-start
  			err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  			if err == nil {
  				atomic.AddInt32(&success, 1)
  			}
  			done <- err
  		}()
  	}
  	close(start)
  	for w := 0; w < workers; w++ {
  		<-done
  	}
  	close(done)
  	// Only 1 write should have succeeded
  	assert.Check(t, is.Equal(int32(1), atomic.LoadInt32(&success)))
  	closeNetworkDBInstances(t, dbs)
  }
  // TestParallelDelete releases 20 goroutines at once, all deleting the same
  // existing key, and checks that exactly one DeleteEntry call succeeds.
  func TestParallelDelete(t *testing.T) {
  	dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  	// Deferred so the instance is closed even when a fatal assertion aborts the test.
  	defer closeNetworkDBInstances(t, dbs)
  	err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  	assert.NilError(t, err)
  	startCh := make(chan struct{})
  	doneCh := make(chan error)
  	var success int32
  	for i := 0; i < 20; i++ {
  		go func() {
  			// Block until every goroutine is ready, then race.
  			<-startCh
  			err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  			if err == nil {
  				atomic.AddInt32(&success, 1)
  			}
  			doneCh <- err
  		}()
  	}
  	close(startCh)
  	for i := 0; i < 20; i++ {
  		<-doneCh
  	}
  	// Only 1 delete should have succeeded
  	assert.Check(t, is.Equal(int32(1), atomic.LoadInt32(&success)))
  }
  // TestNetworkDBIslands builds a five-node cluster and re-joins nodes 3 and 4 to
  // nodes 0-2. It shuts nodes 0-2 down and waits until nodes 3 and 4 record all
  // three as cleanly left. It then restarts nodes 0-2 with new node IDs on the
  // same address and port, without an explicit join, and waits until the whole
  // cluster is reconnected.
  func TestNetworkDBIslands(t *testing.T) {
  	pollTimeout := func() time.Duration {
  		const defaultTimeout = 120 * time.Second
  		dl, ok := t.Deadline()
  		if !ok {
  			return defaultTimeout
  		}
  		if d := time.Until(dl); d <= defaultTimeout {
  			return d
  		}
  		return defaultTimeout
  	}
  	// Debug logging helps diagnose failures here; restore the previous level
  	// afterwards so it does not leak into other tests.
  	prevLevel := logrus.GetLevel()
  	logrus.SetLevel(logrus.DebugLevel)
  	defer logrus.SetLevel(prevLevel)
  	conf := DefaultConfig()
  	// Shorten durations to speed up test execution.
  	conf.rejoinClusterDuration = conf.rejoinClusterDuration / 10
  	conf.rejoinClusterInterval = conf.rejoinClusterInterval / 10
  	dbs := createNetworkDBInstances(t, 5, "node", conf)
  	// Get the node IP used currently
  	dbs[0].RLock()
  	node := dbs[0].nodes[dbs[0].config.NodeID]
  	dbs[0].RUnlock()
  	baseIPStr := node.Addr.String()
  	// Node 0,1,2 are going to be the 3 bootstrap nodes
  	members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
  		fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
  		fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
  	// Rejoining will update the list of the bootstrap members
  	for i := 3; i < 5; i++ {
  		t.Logf("Re-joining: %d", i)
  		assert.Check(t, dbs[i].Join(members))
  	}
  	// Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
  	for i := 0; i < 3; i++ {
  		logrus.Infof("node %d leaving", i)
  		dbs[i].Close()
  	}
  	checkDBs := make(map[string]*NetworkDB)
  	for i := 3; i < 5; i++ {
  		db := dbs[i]
  		checkDBs[db.config.Hostname] = db
  	}
  	// Give some time to let the system propagate the messages and free up the ports
  	check := func(t poll.LogT) poll.Result {
  		// Verify that the nodes are actually all gone and marked appropriately
  		for name, db := range checkDBs {
  			db.RLock()
  			// Capture the sizes while holding the lock; they are reported below.
  			left, failed := len(db.leftNodes), len(db.failedNodes)
  			if left != 3 || failed != 0 {
  				for n := range db.leftNodes {
  					t.Logf("%s: Node %s left", db.config.Hostname, n)
  				}
  				for n := range db.failedNodes {
  					t.Logf("%s: Node %s failed", db.config.Hostname, n)
  				}
  				db.RUnlock()
  				return poll.Continue("%s:Waiting for all nodes to cleanly leave, left: %d, failed nodes: %d", name, left, failed)
  			}
  			db.RUnlock()
  			t.Logf("%s: OK", name)
  			delete(checkDBs, name)
  		}
  		return poll.Success()
  	}
  	poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  	// Spawn again the first 3 nodes with different names but same IP:port
  	for i := 0; i < 3; i++ {
  		logrus.Infof("node %d coming back", i)
  		dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  		dbs[i] = launchNode(t, *dbs[i].config)
  	}
  	// Give some time for the reconnect routine to run; it runs every
  	// rejoinClusterInterval, which was shortened above.
  	check = func(t poll.LogT) poll.Result {
  		// Verify that the cluster is again all connected. Note that the 3 previous nodes did not do any join
  		for i := 0; i < 5; i++ {
  			db := dbs[i]
  			db.RLock()
  			if len(db.nodes) != 5 {
  				db.RUnlock()
  				return poll.Continue("%s:Waiting to connect to all nodes", dbs[i].config.Hostname)
  			}
  			if len(db.failedNodes) != 0 {
  				db.RUnlock()
  				return poll.Continue("%s:Waiting for 0 failedNodes", dbs[i].config.Hostname)
  			}
  			if i < 3 {
  				// nodes 0 to 2 have no left nodes
  				if len(db.leftNodes) != 0 {
  					db.RUnlock()
  					return poll.Continue("%s:Waiting to have no leftNodes", dbs[i].config.Hostname)
  				}
  			} else {
  				// nodes 3 and 4 have the 3 previous left nodes
  				if len(db.leftNodes) != 3 {
  					db.RUnlock()
  					return poll.Continue("%s:Waiting to have 3 leftNodes", dbs[i].config.Hostname)
  				}
  			}
  			db.RUnlock()
  		}
  		return poll.Success()
  	}
  	poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  	closeNetworkDBInstances(t, dbs)
  }