networkdb_test.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864
  1. package networkdb
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "log"
  6. "net"
  7. "os"
  8. "strings"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/docker/docker/pkg/stringid"
  13. "github.com/docker/go-events"
  14. "github.com/hashicorp/memberlist"
  15. "github.com/sirupsen/logrus"
  16. "gotest.tools/assert"
  17. is "gotest.tools/assert/cmp"
  18. // this takes care of the incontainer flag
  19. _ "github.com/docker/libnetwork/testutils"
  20. )
  21. var dbPort int32 = 10000
  22. func TestMain(m *testing.M) {
  23. ioutil.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644)
  24. logrus.SetLevel(logrus.ErrorLevel)
  25. os.Exit(m.Run())
  26. }
  27. func launchNode(t *testing.T, conf Config) *NetworkDB {
  28. db, err := New(&conf)
  29. assert.NilError(t, err)
  30. return db
  31. }
  32. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  33. var dbs []*NetworkDB
  34. for i := 0; i < num; i++ {
  35. localConfig := *conf
  36. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  37. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  38. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  39. db := launchNode(t, localConfig)
  40. if i != 0 {
  41. assert.Check(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
  42. }
  43. dbs = append(dbs, db)
  44. }
  45. // Check that the cluster is properly created
  46. for i := 0; i < num; i++ {
  47. if num != len(dbs[i].ClusterPeers()) {
  48. t.Fatalf("Number of nodes for %s into the cluster does not match %d != %d",
  49. dbs[i].config.Hostname, num, len(dbs[i].ClusterPeers()))
  50. }
  51. }
  52. return dbs
  53. }
  54. func closeNetworkDBInstances(dbs []*NetworkDB) {
  55. log.Print("Closing DB instances...")
  56. for _, db := range dbs {
  57. db.Close()
  58. }
  59. }
  60. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  61. for i := 0; i < 80; i++ {
  62. db.RLock()
  63. _, ok := db.nodes[node]
  64. db.RUnlock()
  65. if present && ok {
  66. return
  67. }
  68. if !present && !ok {
  69. return
  70. }
  71. time.Sleep(50 * time.Millisecond)
  72. }
  73. t.Error(fmt.Sprintf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node))
  74. }
  75. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  76. for i := 0; i < 80; i++ {
  77. db.RLock()
  78. nn, nnok := db.networks[node]
  79. db.RUnlock()
  80. if nnok {
  81. n, ok := nn[id]
  82. if present && ok {
  83. return
  84. }
  85. if !present &&
  86. ((ok && n.leaving) ||
  87. !ok) {
  88. return
  89. }
  90. }
  91. time.Sleep(50 * time.Millisecond)
  92. }
  93. t.Error("Network existence verification failed")
  94. }
  95. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  96. n := 80
  97. for i := 0; i < n; i++ {
  98. entry, err := db.getEntry(tname, nid, key)
  99. if present && err == nil && string(entry.value) == value {
  100. return
  101. }
  102. if !present &&
  103. ((err == nil && entry.deleting) ||
  104. (err != nil)) {
  105. return
  106. }
  107. if i == n-1 && !present && err != nil {
  108. return
  109. }
  110. time.Sleep(50 * time.Millisecond)
  111. }
  112. t.Error(fmt.Sprintf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID))
  113. }
  114. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  115. select {
  116. case rcvdEv := <-ch:
  117. assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
  118. switch rcvdEv.(type) {
  119. case CreateEvent:
  120. assert.Check(t, is.Equal(tname, rcvdEv.(CreateEvent).Table))
  121. assert.Check(t, is.Equal(nid, rcvdEv.(CreateEvent).NetworkID))
  122. assert.Check(t, is.Equal(key, rcvdEv.(CreateEvent).Key))
  123. assert.Check(t, is.Equal(value, string(rcvdEv.(CreateEvent).Value)))
  124. case UpdateEvent:
  125. assert.Check(t, is.Equal(tname, rcvdEv.(UpdateEvent).Table))
  126. assert.Check(t, is.Equal(nid, rcvdEv.(UpdateEvent).NetworkID))
  127. assert.Check(t, is.Equal(key, rcvdEv.(UpdateEvent).Key))
  128. assert.Check(t, is.Equal(value, string(rcvdEv.(UpdateEvent).Value)))
  129. case DeleteEvent:
  130. assert.Check(t, is.Equal(tname, rcvdEv.(DeleteEvent).Table))
  131. assert.Check(t, is.Equal(nid, rcvdEv.(DeleteEvent).NetworkID))
  132. assert.Check(t, is.Equal(key, rcvdEv.(DeleteEvent).Key))
  133. }
  134. case <-time.After(time.Second):
  135. t.Fail()
  136. return
  137. }
  138. }
  139. func TestNetworkDBSimple(t *testing.T) {
  140. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  141. closeNetworkDBInstances(dbs)
  142. }
  143. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  144. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  145. err := dbs[0].JoinNetwork("network1")
  146. assert.NilError(t, err)
  147. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  148. err = dbs[0].LeaveNetwork("network1")
  149. assert.NilError(t, err)
  150. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  151. closeNetworkDBInstances(dbs)
  152. }
  153. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  154. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  155. n := 10
  156. for i := 1; i <= n; i++ {
  157. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  158. assert.NilError(t, err)
  159. }
  160. for i := 1; i <= n; i++ {
  161. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  162. assert.NilError(t, err)
  163. }
  164. for i := 1; i <= n; i++ {
  165. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  166. }
  167. for i := 1; i <= n; i++ {
  168. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  169. }
  170. for i := 1; i <= n; i++ {
  171. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  172. assert.NilError(t, err)
  173. }
  174. for i := 1; i <= n; i++ {
  175. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  176. assert.NilError(t, err)
  177. }
  178. for i := 1; i <= n; i++ {
  179. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  180. }
  181. for i := 1; i <= n; i++ {
  182. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  183. }
  184. closeNetworkDBInstances(dbs)
  185. }
  186. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  187. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  188. err := dbs[0].JoinNetwork("network1")
  189. assert.NilError(t, err)
  190. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  191. err = dbs[1].JoinNetwork("network1")
  192. assert.NilError(t, err)
  193. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  194. assert.NilError(t, err)
  195. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  196. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  197. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  198. assert.NilError(t, err)
  199. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  200. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  201. assert.NilError(t, err)
  202. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  203. closeNetworkDBInstances(dbs)
  204. }
  205. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  206. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  207. err := dbs[0].JoinNetwork("network1")
  208. assert.NilError(t, err)
  209. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  210. err = dbs[1].JoinNetwork("network1")
  211. assert.NilError(t, err)
  212. n := 10
  213. for i := 1; i <= n; i++ {
  214. err = dbs[0].CreateEntry("test_table", "network1",
  215. fmt.Sprintf("test_key0%d", i),
  216. []byte(fmt.Sprintf("test_value0%d", i)))
  217. assert.NilError(t, err)
  218. }
  219. for i := 1; i <= n; i++ {
  220. err = dbs[1].CreateEntry("test_table", "network1",
  221. fmt.Sprintf("test_key1%d", i),
  222. []byte(fmt.Sprintf("test_value1%d", i)))
  223. assert.NilError(t, err)
  224. }
  225. for i := 1; i <= n; i++ {
  226. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  227. fmt.Sprintf("test_key1%d", i),
  228. fmt.Sprintf("test_value1%d", i), true)
  229. assert.NilError(t, err)
  230. }
  231. for i := 1; i <= n; i++ {
  232. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  233. fmt.Sprintf("test_key0%d", i),
  234. fmt.Sprintf("test_value0%d", i), true)
  235. assert.NilError(t, err)
  236. }
  237. // Verify deletes
  238. for i := 1; i <= n; i++ {
  239. err = dbs[0].DeleteEntry("test_table", "network1",
  240. fmt.Sprintf("test_key0%d", i))
  241. assert.NilError(t, err)
  242. }
  243. for i := 1; i <= n; i++ {
  244. err = dbs[1].DeleteEntry("test_table", "network1",
  245. fmt.Sprintf("test_key1%d", i))
  246. assert.NilError(t, err)
  247. }
  248. for i := 1; i <= n; i++ {
  249. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  250. fmt.Sprintf("test_key1%d", i), "", false)
  251. assert.NilError(t, err)
  252. }
  253. for i := 1; i <= n; i++ {
  254. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  255. fmt.Sprintf("test_key0%d", i), "", false)
  256. assert.NilError(t, err)
  257. }
  258. closeNetworkDBInstances(dbs)
  259. }
  260. func TestNetworkDBNodeLeave(t *testing.T) {
  261. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  262. err := dbs[0].JoinNetwork("network1")
  263. assert.NilError(t, err)
  264. err = dbs[1].JoinNetwork("network1")
  265. assert.NilError(t, err)
  266. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  267. assert.NilError(t, err)
  268. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  269. dbs[0].Close()
  270. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  271. dbs[1].Close()
  272. }
  273. func TestNetworkDBWatch(t *testing.T) {
  274. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  275. err := dbs[0].JoinNetwork("network1")
  276. assert.NilError(t, err)
  277. err = dbs[1].JoinNetwork("network1")
  278. assert.NilError(t, err)
  279. ch, cancel := dbs[1].Watch("", "", "")
  280. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  281. assert.NilError(t, err)
  282. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  283. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  284. assert.NilError(t, err)
  285. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  286. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  287. assert.NilError(t, err)
  288. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  289. cancel()
  290. closeNetworkDBInstances(dbs)
  291. }
  292. func TestNetworkDBBulkSync(t *testing.T) {
  293. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  294. err := dbs[0].JoinNetwork("network1")
  295. assert.NilError(t, err)
  296. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  297. n := 1000
  298. for i := 1; i <= n; i++ {
  299. err = dbs[0].CreateEntry("test_table", "network1",
  300. fmt.Sprintf("test_key0%d", i),
  301. []byte(fmt.Sprintf("test_value0%d", i)))
  302. assert.NilError(t, err)
  303. }
  304. err = dbs[1].JoinNetwork("network1")
  305. assert.NilError(t, err)
  306. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  307. for i := 1; i <= n; i++ {
  308. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  309. fmt.Sprintf("test_key0%d", i),
  310. fmt.Sprintf("test_value0%d", i), true)
  311. assert.NilError(t, err)
  312. }
  313. closeNetworkDBInstances(dbs)
  314. }
  315. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  316. n := 5
  317. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  318. for i := 0; i < n; i++ {
  319. for j := 0; j < n; j++ {
  320. if i == j {
  321. continue
  322. }
  323. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  324. }
  325. }
  326. for i := 0; i < n; i++ {
  327. err := dbs[i].JoinNetwork("network1")
  328. assert.NilError(t, err)
  329. }
  330. for i := 0; i < n; i++ {
  331. for j := 0; j < n; j++ {
  332. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  333. }
  334. }
  335. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  336. assert.NilError(t, err)
  337. for i := 1; i < n; i++ {
  338. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  339. }
  340. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  341. assert.NilError(t, err)
  342. for i := 1; i < n; i++ {
  343. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  344. }
  345. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  346. assert.NilError(t, err)
  347. for i := 1; i < n; i++ {
  348. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  349. }
  350. for i := 1; i < n; i++ {
  351. _, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  352. assert.Check(t, is.ErrorContains(err, ""))
  353. assert.Check(t, strings.Contains(err.Error(), "deleted and pending garbage collection"))
  354. }
  355. closeNetworkDBInstances(dbs)
  356. }
  357. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  358. maxRetry := 5
  359. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  360. // Single node Join/Leave
  361. err := dbs[0].JoinNetwork("network1")
  362. assert.NilError(t, err)
  363. if len(dbs[0].networkNodes["network1"]) != 1 {
  364. t.Fatalf("The networkNodes list has to have be 1 instead of %d", len(dbs[0].networkNodes["network1"]))
  365. }
  366. err = dbs[0].LeaveNetwork("network1")
  367. assert.NilError(t, err)
  368. if len(dbs[0].networkNodes["network1"]) != 0 {
  369. t.Fatalf("The networkNodes list has to have be 0 instead of %d", len(dbs[0].networkNodes["network1"]))
  370. }
  371. // Multiple nodes Join/Leave
  372. err = dbs[0].JoinNetwork("network1")
  373. assert.NilError(t, err)
  374. err = dbs[1].JoinNetwork("network1")
  375. assert.NilError(t, err)
  376. // Wait for the propagation on db[0]
  377. for i := 0; i < maxRetry; i++ {
  378. if len(dbs[0].networkNodes["network1"]) == 2 {
  379. break
  380. }
  381. time.Sleep(1 * time.Second)
  382. }
  383. if len(dbs[0].networkNodes["network1"]) != 2 {
  384. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  385. }
  386. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  387. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  388. }
  389. // Wait for the propagation on db[1]
  390. for i := 0; i < maxRetry; i++ {
  391. if len(dbs[1].networkNodes["network1"]) == 2 {
  392. break
  393. }
  394. time.Sleep(1 * time.Second)
  395. }
  396. if len(dbs[1].networkNodes["network1"]) != 2 {
  397. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  398. }
  399. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  400. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  401. }
  402. // Try a quick leave/join
  403. err = dbs[0].LeaveNetwork("network1")
  404. assert.NilError(t, err)
  405. err = dbs[0].JoinNetwork("network1")
  406. assert.NilError(t, err)
  407. for i := 0; i < maxRetry; i++ {
  408. if len(dbs[0].networkNodes["network1"]) == 2 {
  409. break
  410. }
  411. time.Sleep(1 * time.Second)
  412. }
  413. if len(dbs[0].networkNodes["network1"]) != 2 {
  414. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  415. }
  416. for i := 0; i < maxRetry; i++ {
  417. if len(dbs[1].networkNodes["network1"]) == 2 {
  418. break
  419. }
  420. time.Sleep(1 * time.Second)
  421. }
  422. if len(dbs[1].networkNodes["network1"]) != 2 {
  423. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  424. }
  425. closeNetworkDBInstances(dbs)
  426. }
  427. func TestNetworkDBGarbageCollection(t *testing.T) {
  428. keysWriteDelete := 5
  429. config := DefaultConfig()
  430. config.reapEntryInterval = 30 * time.Second
  431. config.StatsPrintPeriod = 15 * time.Second
  432. dbs := createNetworkDBInstances(t, 3, "node", config)
  433. // 2 Nodes join network
  434. err := dbs[0].JoinNetwork("network1")
  435. assert.NilError(t, err)
  436. err = dbs[1].JoinNetwork("network1")
  437. assert.NilError(t, err)
  438. for i := 0; i < keysWriteDelete; i++ {
  439. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+string(i), []byte("value"))
  440. assert.NilError(t, err)
  441. }
  442. time.Sleep(time.Second)
  443. for i := 0; i < keysWriteDelete; i++ {
  444. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+string(i))
  445. assert.NilError(t, err)
  446. }
  447. for i := 0; i < 2; i++ {
  448. assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
  449. }
  450. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  451. time.Sleep(5 * time.Second)
  452. err = dbs[2].JoinNetwork("network1")
  453. assert.NilError(t, err)
  454. for i := 0; i < 3; i++ {
  455. assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
  456. }
  457. // at this point the entries should had been all deleted
  458. time.Sleep(30 * time.Second)
  459. for i := 0; i < 3; i++ {
  460. assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
  461. }
  462. // make sure that entries are not coming back
  463. time.Sleep(15 * time.Second)
  464. for i := 0; i < 3; i++ {
  465. assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
  466. }
  467. closeNetworkDBInstances(dbs)
  468. }
  469. func TestFindNode(t *testing.T) {
  470. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  471. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  472. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  473. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  474. // active nodes is 2 because the testing node is in the list
  475. assert.Check(t, is.Len(dbs[0].nodes, 2))
  476. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  477. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  478. n, currState, m := dbs[0].findNode("active")
  479. assert.Check(t, n != nil)
  480. assert.Check(t, is.Equal("active", n.Name))
  481. assert.Check(t, is.Equal(nodeActiveState, currState))
  482. assert.Check(t, m != nil)
  483. // delete the entry manually
  484. delete(m, "active")
  485. // test if can be still find
  486. n, currState, m = dbs[0].findNode("active")
  487. assert.Check(t, is.Nil(n))
  488. assert.Check(t, is.Equal(nodeNotFound, currState))
  489. assert.Check(t, is.Nil(m))
  490. n, currState, m = dbs[0].findNode("failed")
  491. assert.Check(t, n != nil)
  492. assert.Check(t, is.Equal("failed", n.Name))
  493. assert.Check(t, is.Equal(nodeFailedState, currState))
  494. assert.Check(t, m != nil)
  495. // find and remove
  496. n, currState, m = dbs[0].findNode("left")
  497. assert.Check(t, n != nil)
  498. assert.Check(t, is.Equal("left", n.Name))
  499. assert.Check(t, is.Equal(nodeLeftState, currState))
  500. assert.Check(t, m != nil)
  501. delete(m, "left")
  502. n, currState, m = dbs[0].findNode("left")
  503. assert.Check(t, is.Nil(n))
  504. assert.Check(t, is.Equal(nodeNotFound, currState))
  505. assert.Check(t, is.Nil(m))
  506. closeNetworkDBInstances(dbs)
  507. }
  508. func TestChangeNodeState(t *testing.T) {
  509. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  510. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  511. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  512. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  513. // active nodes is 4 because the testing node is in the list
  514. assert.Check(t, is.Len(dbs[0].nodes, 4))
  515. n, currState, m := dbs[0].findNode("node1")
  516. assert.Check(t, n != nil)
  517. assert.Check(t, is.Equal(nodeActiveState, currState))
  518. assert.Check(t, is.Equal("node1", n.Name))
  519. assert.Check(t, m != nil)
  520. // node1 to failed
  521. dbs[0].changeNodeState("node1", nodeFailedState)
  522. n, currState, m = dbs[0].findNode("node1")
  523. assert.Check(t, n != nil)
  524. assert.Check(t, is.Equal(nodeFailedState, currState))
  525. assert.Check(t, is.Equal("node1", n.Name))
  526. assert.Check(t, m != nil)
  527. assert.Check(t, time.Duration(0) != n.reapTime)
  528. // node1 back to active
  529. dbs[0].changeNodeState("node1", nodeActiveState)
  530. n, currState, m = dbs[0].findNode("node1")
  531. assert.Check(t, n != nil)
  532. assert.Check(t, is.Equal(nodeActiveState, currState))
  533. assert.Check(t, is.Equal("node1", n.Name))
  534. assert.Check(t, m != nil)
  535. assert.Check(t, is.Equal(time.Duration(0), n.reapTime))
  536. // node1 to left
  537. dbs[0].changeNodeState("node1", nodeLeftState)
  538. dbs[0].changeNodeState("node2", nodeLeftState)
  539. dbs[0].changeNodeState("node3", nodeLeftState)
  540. n, currState, m = dbs[0].findNode("node1")
  541. assert.Check(t, n != nil)
  542. assert.Check(t, is.Equal(nodeLeftState, currState))
  543. assert.Check(t, is.Equal("node1", n.Name))
  544. assert.Check(t, m != nil)
  545. assert.Check(t, time.Duration(0) != n.reapTime)
  546. n, currState, m = dbs[0].findNode("node2")
  547. assert.Check(t, n != nil)
  548. assert.Check(t, is.Equal(nodeLeftState, currState))
  549. assert.Check(t, is.Equal("node2", n.Name))
  550. assert.Check(t, m != nil)
  551. assert.Check(t, time.Duration(0) != n.reapTime)
  552. n, currState, m = dbs[0].findNode("node3")
  553. assert.Check(t, n != nil)
  554. assert.Check(t, is.Equal(nodeLeftState, currState))
  555. assert.Check(t, is.Equal("node3", n.Name))
  556. assert.Check(t, m != nil)
  557. assert.Check(t, time.Duration(0) != n.reapTime)
  558. // active nodes is 1 because the testing node is in the list
  559. assert.Check(t, is.Len(dbs[0].nodes, 1))
  560. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  561. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  562. closeNetworkDBInstances(dbs)
  563. }
  564. func TestNodeReincarnation(t *testing.T) {
  565. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  566. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  567. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  568. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  569. // active nodes is 2 because the testing node is in the list
  570. assert.Check(t, is.Len(dbs[0].nodes, 2))
  571. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  572. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  573. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  574. assert.Check(t, b)
  575. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  576. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  577. assert.Check(t, b)
  578. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  579. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  580. assert.Check(t, b)
  581. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  582. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  583. assert.Check(t, !b)
  584. // active nodes is 1 because the testing node is in the list
  585. assert.Check(t, is.Len(dbs[0].nodes, 4))
  586. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  587. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  588. closeNetworkDBInstances(dbs)
  589. }
  590. func TestParallelCreate(t *testing.T) {
  591. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  592. startCh := make(chan int)
  593. doneCh := make(chan error)
  594. var success int32
  595. for i := 0; i < 20; i++ {
  596. go func() {
  597. <-startCh
  598. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  599. if err == nil {
  600. atomic.AddInt32(&success, 1)
  601. }
  602. doneCh <- err
  603. }()
  604. }
  605. close(startCh)
  606. for i := 0; i < 20; i++ {
  607. <-doneCh
  608. }
  609. close(doneCh)
  610. // Only 1 write should have succeeded
  611. assert.Check(t, is.Equal(int32(1), success))
  612. closeNetworkDBInstances(dbs)
  613. }
  614. func TestParallelDelete(t *testing.T) {
  615. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  616. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  617. assert.NilError(t, err)
  618. startCh := make(chan int)
  619. doneCh := make(chan error)
  620. var success int32
  621. for i := 0; i < 20; i++ {
  622. go func() {
  623. <-startCh
  624. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  625. if err == nil {
  626. atomic.AddInt32(&success, 1)
  627. }
  628. doneCh <- err
  629. }()
  630. }
  631. close(startCh)
  632. for i := 0; i < 20; i++ {
  633. <-doneCh
  634. }
  635. close(doneCh)
  636. // Only 1 write should have succeeded
  637. assert.Check(t, is.Equal(int32(1), success))
  638. closeNetworkDBInstances(dbs)
  639. }
  640. func TestNetworkDBIslands(t *testing.T) {
  641. logrus.SetLevel(logrus.DebugLevel)
  642. dbs := createNetworkDBInstances(t, 5, "node", DefaultConfig())
  643. // Get the node IP used currently
  644. node, _ := dbs[0].nodes[dbs[0].config.NodeID]
  645. baseIPStr := node.Addr.String()
  646. // Node 0,1,2 are going to be the 3 bootstrap nodes
  647. members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
  648. fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
  649. fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
  650. // Rejoining will update the list of the bootstrap members
  651. for i := 3; i < 5; i++ {
  652. assert.Check(t, dbs[i].Join(members))
  653. }
  654. // Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
  655. for i := 0; i < 3; i++ {
  656. logrus.Infof("node %d leaving", i)
  657. dbs[i].Close()
  658. time.Sleep(2 * time.Second)
  659. }
  660. // Give some time to let the system propagate the messages and free up the ports
  661. time.Sleep(10 * time.Second)
  662. // Verify that the nodes are actually all gone and marked appropiately
  663. for i := 3; i < 5; i++ {
  664. assert.Check(t, is.Len(dbs[i].leftNodes, 3))
  665. assert.Check(t, is.Len(dbs[i].failedNodes, 0))
  666. }
  667. // Spawn again the first 3 nodes with different names but same IP:port
  668. for i := 0; i < 3; i++ {
  669. logrus.Infof("node %d coming back", i)
  670. dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  671. dbs[i] = launchNode(t, *dbs[i].config)
  672. time.Sleep(2 * time.Second)
  673. }
  674. // Give some time for the reconnect routine to run, it runs every 60s
  675. time.Sleep(50 * time.Second)
  676. // Verify that the cluster is again all connected. Note that the 3 previous node did not do any join
  677. for i := 0; i < 5; i++ {
  678. assert.Check(t, is.Len(dbs[i].nodes, 5))
  679. assert.Check(t, is.Len(dbs[i].failedNodes, 0))
  680. if i < 3 {
  681. // nodes from 0 to 3 has no left nodes
  682. assert.Check(t, is.Len(dbs[i].leftNodes, 0))
  683. } else {
  684. // nodes from 4 to 5 has the 3 previous left nodes
  685. assert.Check(t, is.Len(dbs[i].leftNodes, 3))
  686. }
  687. }
  688. }