networkdb_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929
  1. package networkdb
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "log"
  6. "net"
  7. "os"
  8. "strconv"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/docker/docker/pkg/stringid"
  13. "github.com/docker/go-events"
  14. "github.com/hashicorp/memberlist"
  15. "github.com/sirupsen/logrus"
  16. "gotest.tools/v3/assert"
  17. is "gotest.tools/v3/assert/cmp"
  18. "gotest.tools/v3/poll"
  19. // this takes care of the incontainer flag
  20. _ "github.com/docker/docker/libnetwork/testutils"
  21. )
  22. var dbPort int32 = 10000
  23. func TestMain(m *testing.M) {
  24. ioutil.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644)
  25. logrus.SetLevel(logrus.ErrorLevel)
  26. os.Exit(m.Run())
  27. }
  28. func launchNode(t *testing.T, conf Config) *NetworkDB {
  29. t.Helper()
  30. db, err := New(&conf)
  31. assert.NilError(t, err)
  32. return db
  33. }
  34. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  35. t.Helper()
  36. var dbs []*NetworkDB
  37. for i := 0; i < num; i++ {
  38. localConfig := *conf
  39. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  40. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  41. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  42. db := launchNode(t, localConfig)
  43. if i != 0 {
  44. assert.Check(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
  45. }
  46. dbs = append(dbs, db)
  47. }
  48. // Wait till the cluster creation is successful
  49. check := func(t poll.LogT) poll.Result {
  50. // Check that the cluster is properly created
  51. for i := 0; i < num; i++ {
  52. if num != len(dbs[i].ClusterPeers()) {
  53. return poll.Continue("%s:Waiting for cluser peers to be established", dbs[i].config.Hostname)
  54. }
  55. }
  56. return poll.Success()
  57. }
  58. poll.WaitOn(t, check, poll.WithDelay(2*time.Second), poll.WithTimeout(20*time.Second))
  59. return dbs
  60. }
  61. func closeNetworkDBInstances(t *testing.T, dbs []*NetworkDB) {
  62. t.Helper()
  63. log.Print("Closing DB instances...")
  64. for _, db := range dbs {
  65. db.Close()
  66. }
  67. }
  68. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  69. t.Helper()
  70. for i := 0; i < 80; i++ {
  71. db.RLock()
  72. _, ok := db.nodes[node]
  73. db.RUnlock()
  74. if present && ok {
  75. return
  76. }
  77. if !present && !ok {
  78. return
  79. }
  80. time.Sleep(50 * time.Millisecond)
  81. }
  82. t.Errorf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node)
  83. }
  84. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  85. t.Helper()
  86. const sleepInterval = 50 * time.Millisecond
  87. var maxRetries int64
  88. if dl, ok := t.Deadline(); ok {
  89. maxRetries = int64(time.Until(dl) / sleepInterval)
  90. } else {
  91. maxRetries = 80
  92. }
  93. for i := int64(0); i < maxRetries; i++ {
  94. db.RLock()
  95. nn, nnok := db.networks[node]
  96. db.RUnlock()
  97. if nnok {
  98. n, ok := nn[id]
  99. if present && ok {
  100. return
  101. }
  102. if !present &&
  103. ((ok && n.leaving) ||
  104. !ok) {
  105. return
  106. }
  107. }
  108. time.Sleep(sleepInterval)
  109. }
  110. t.Error("Network existence verification failed")
  111. }
  112. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  113. t.Helper()
  114. n := 80
  115. for i := 0; i < n; i++ {
  116. entry, err := db.getEntry(tname, nid, key)
  117. if present && err == nil && string(entry.value) == value {
  118. return
  119. }
  120. if !present &&
  121. ((err == nil && entry.deleting) ||
  122. (err != nil)) {
  123. return
  124. }
  125. if i == n-1 && !present && err != nil {
  126. return
  127. }
  128. time.Sleep(50 * time.Millisecond)
  129. }
  130. t.Errorf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID)
  131. }
  132. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  133. t.Helper()
  134. select {
  135. case rcvdEv := <-ch:
  136. assert.Check(t, is.Equal(fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev)))
  137. switch typ := rcvdEv.(type) {
  138. case CreateEvent:
  139. assert.Check(t, is.Equal(tname, typ.Table))
  140. assert.Check(t, is.Equal(nid, typ.NetworkID))
  141. assert.Check(t, is.Equal(key, typ.Key))
  142. assert.Check(t, is.Equal(value, string(typ.Value)))
  143. case UpdateEvent:
  144. assert.Check(t, is.Equal(tname, typ.Table))
  145. assert.Check(t, is.Equal(nid, typ.NetworkID))
  146. assert.Check(t, is.Equal(key, typ.Key))
  147. assert.Check(t, is.Equal(value, string(typ.Value)))
  148. case DeleteEvent:
  149. assert.Check(t, is.Equal(tname, typ.Table))
  150. assert.Check(t, is.Equal(nid, typ.NetworkID))
  151. assert.Check(t, is.Equal(key, typ.Key))
  152. }
  153. case <-time.After(time.Second):
  154. t.Fail()
  155. return
  156. }
  157. }
  158. func TestNetworkDBSimple(t *testing.T) {
  159. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  160. closeNetworkDBInstances(t, dbs)
  161. }
  162. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  163. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  164. err := dbs[0].JoinNetwork("network1")
  165. assert.NilError(t, err)
  166. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  167. err = dbs[0].LeaveNetwork("network1")
  168. assert.NilError(t, err)
  169. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  170. closeNetworkDBInstances(t, dbs)
  171. }
  172. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  173. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  174. n := 10
  175. for i := 1; i <= n; i++ {
  176. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  177. assert.NilError(t, err)
  178. }
  179. for i := 1; i <= n; i++ {
  180. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  181. assert.NilError(t, err)
  182. }
  183. for i := 1; i <= n; i++ {
  184. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  185. }
  186. for i := 1; i <= n; i++ {
  187. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  188. }
  189. for i := 1; i <= n; i++ {
  190. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  191. assert.NilError(t, err)
  192. }
  193. for i := 1; i <= n; i++ {
  194. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  195. assert.NilError(t, err)
  196. }
  197. for i := 1; i <= n; i++ {
  198. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  199. }
  200. for i := 1; i <= n; i++ {
  201. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  202. }
  203. closeNetworkDBInstances(t, dbs)
  204. }
  205. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  206. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  207. err := dbs[0].JoinNetwork("network1")
  208. assert.NilError(t, err)
  209. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  210. err = dbs[1].JoinNetwork("network1")
  211. assert.NilError(t, err)
  212. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  213. assert.NilError(t, err)
  214. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  215. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  216. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  217. assert.NilError(t, err)
  218. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  219. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  220. assert.NilError(t, err)
  221. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  222. closeNetworkDBInstances(t, dbs)
  223. }
  224. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  225. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  226. err := dbs[0].JoinNetwork("network1")
  227. assert.NilError(t, err)
  228. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  229. err = dbs[1].JoinNetwork("network1")
  230. assert.NilError(t, err)
  231. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  232. n := 10
  233. for i := 1; i <= n; i++ {
  234. err = dbs[0].CreateEntry("test_table", "network1",
  235. fmt.Sprintf("test_key0%d", i),
  236. []byte(fmt.Sprintf("test_value0%d", i)))
  237. assert.NilError(t, err)
  238. }
  239. for i := 1; i <= n; i++ {
  240. err = dbs[1].CreateEntry("test_table", "network1",
  241. fmt.Sprintf("test_key1%d", i),
  242. []byte(fmt.Sprintf("test_value1%d", i)))
  243. assert.NilError(t, err)
  244. }
  245. for i := 1; i <= n; i++ {
  246. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  247. fmt.Sprintf("test_key1%d", i),
  248. fmt.Sprintf("test_value1%d", i), true)
  249. assert.NilError(t, err)
  250. }
  251. for i := 1; i <= n; i++ {
  252. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  253. fmt.Sprintf("test_key0%d", i),
  254. fmt.Sprintf("test_value0%d", i), true)
  255. assert.NilError(t, err)
  256. }
  257. // Verify deletes
  258. for i := 1; i <= n; i++ {
  259. err = dbs[0].DeleteEntry("test_table", "network1",
  260. fmt.Sprintf("test_key0%d", i))
  261. assert.NilError(t, err)
  262. }
  263. for i := 1; i <= n; i++ {
  264. err = dbs[1].DeleteEntry("test_table", "network1",
  265. fmt.Sprintf("test_key1%d", i))
  266. assert.NilError(t, err)
  267. }
  268. for i := 1; i <= n; i++ {
  269. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  270. fmt.Sprintf("test_key1%d", i), "", false)
  271. assert.NilError(t, err)
  272. }
  273. for i := 1; i <= n; i++ {
  274. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  275. fmt.Sprintf("test_key0%d", i), "", false)
  276. assert.NilError(t, err)
  277. }
  278. closeNetworkDBInstances(t, dbs)
  279. }
  280. func TestNetworkDBNodeLeave(t *testing.T) {
  281. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  282. err := dbs[0].JoinNetwork("network1")
  283. assert.NilError(t, err)
  284. err = dbs[1].JoinNetwork("network1")
  285. assert.NilError(t, err)
  286. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  287. assert.NilError(t, err)
  288. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  289. dbs[0].Close()
  290. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  291. dbs[1].Close()
  292. }
  293. func TestNetworkDBWatch(t *testing.T) {
  294. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  295. err := dbs[0].JoinNetwork("network1")
  296. assert.NilError(t, err)
  297. err = dbs[1].JoinNetwork("network1")
  298. assert.NilError(t, err)
  299. ch, cancel := dbs[1].Watch("", "", "")
  300. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  301. assert.NilError(t, err)
  302. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  303. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  304. assert.NilError(t, err)
  305. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  306. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  307. assert.NilError(t, err)
  308. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  309. cancel()
  310. closeNetworkDBInstances(t, dbs)
  311. }
  312. func TestNetworkDBBulkSync(t *testing.T) {
  313. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  314. err := dbs[0].JoinNetwork("network1")
  315. assert.NilError(t, err)
  316. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  317. n := 1000
  318. for i := 1; i <= n; i++ {
  319. err = dbs[0].CreateEntry("test_table", "network1",
  320. fmt.Sprintf("test_key0%d", i),
  321. []byte(fmt.Sprintf("test_value0%d", i)))
  322. assert.NilError(t, err)
  323. }
  324. err = dbs[1].JoinNetwork("network1")
  325. assert.NilError(t, err)
  326. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  327. for i := 1; i <= n; i++ {
  328. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  329. fmt.Sprintf("test_key0%d", i),
  330. fmt.Sprintf("test_value0%d", i), true)
  331. assert.NilError(t, err)
  332. }
  333. closeNetworkDBInstances(t, dbs)
  334. }
  335. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  336. n := 5
  337. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  338. for i := 0; i < n; i++ {
  339. for j := 0; j < n; j++ {
  340. if i == j {
  341. continue
  342. }
  343. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  344. }
  345. }
  346. for i := 0; i < n; i++ {
  347. err := dbs[i].JoinNetwork("network1")
  348. assert.NilError(t, err)
  349. }
  350. for i := 0; i < n; i++ {
  351. for j := 0; j < n; j++ {
  352. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  353. }
  354. }
  355. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  356. assert.NilError(t, err)
  357. for i := 1; i < n; i++ {
  358. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  359. }
  360. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  361. assert.NilError(t, err)
  362. for i := 1; i < n; i++ {
  363. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  364. }
  365. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  366. assert.NilError(t, err)
  367. for i := 1; i < n; i++ {
  368. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  369. }
  370. for i := 1; i < n; i++ {
  371. _, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  372. assert.Check(t, is.ErrorContains(err, ""))
  373. assert.Check(t, is.Contains(err.Error(), "deleted and pending garbage collection"), err)
  374. }
  375. closeNetworkDBInstances(t, dbs)
  376. }
  377. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  378. maxRetry := 5
  379. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  380. // Single node Join/Leave
  381. err := dbs[0].JoinNetwork("network1")
  382. assert.NilError(t, err)
  383. if len(dbs[0].networkNodes["network1"]) != 1 {
  384. t.Fatalf("The networkNodes list has to have be 1 instead of %d", len(dbs[0].networkNodes["network1"]))
  385. }
  386. err = dbs[0].LeaveNetwork("network1")
  387. assert.NilError(t, err)
  388. if len(dbs[0].networkNodes["network1"]) != 0 {
  389. t.Fatalf("The networkNodes list has to have be 0 instead of %d", len(dbs[0].networkNodes["network1"]))
  390. }
  391. // Multiple nodes Join/Leave
  392. err = dbs[0].JoinNetwork("network1")
  393. assert.NilError(t, err)
  394. err = dbs[1].JoinNetwork("network1")
  395. assert.NilError(t, err)
  396. // Wait for the propagation on db[0]
  397. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  398. if len(dbs[0].networkNodes["network1"]) != 2 {
  399. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  400. }
  401. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  402. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  403. }
  404. // Wait for the propagation on db[1]
  405. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  406. if len(dbs[1].networkNodes["network1"]) != 2 {
  407. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  408. }
  409. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  410. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  411. }
  412. // Try a quick leave/join
  413. err = dbs[0].LeaveNetwork("network1")
  414. assert.NilError(t, err)
  415. err = dbs[0].JoinNetwork("network1")
  416. assert.NilError(t, err)
  417. for i := 0; i < maxRetry; i++ {
  418. if len(dbs[0].networkNodes["network1"]) == 2 {
  419. break
  420. }
  421. time.Sleep(1 * time.Second)
  422. }
  423. if len(dbs[0].networkNodes["network1"]) != 2 {
  424. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  425. }
  426. for i := 0; i < maxRetry; i++ {
  427. if len(dbs[1].networkNodes["network1"]) == 2 {
  428. break
  429. }
  430. time.Sleep(1 * time.Second)
  431. }
  432. if len(dbs[1].networkNodes["network1"]) != 2 {
  433. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  434. }
  435. closeNetworkDBInstances(t, dbs)
  436. }
  437. func TestNetworkDBGarbageCollection(t *testing.T) {
  438. keysWriteDelete := 5
  439. config := DefaultConfig()
  440. config.reapEntryInterval = 30 * time.Second
  441. config.StatsPrintPeriod = 15 * time.Second
  442. dbs := createNetworkDBInstances(t, 3, "node", config)
  443. // 2 Nodes join network
  444. err := dbs[0].JoinNetwork("network1")
  445. assert.NilError(t, err)
  446. err = dbs[1].JoinNetwork("network1")
  447. assert.NilError(t, err)
  448. for i := 0; i < keysWriteDelete; i++ {
  449. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+strconv.Itoa(i), []byte("value"))
  450. assert.NilError(t, err)
  451. }
  452. time.Sleep(time.Second)
  453. for i := 0; i < keysWriteDelete; i++ {
  454. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+strconv.Itoa(i))
  455. assert.NilError(t, err)
  456. }
  457. for i := 0; i < 2; i++ {
  458. assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
  459. }
  460. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  461. time.Sleep(5 * time.Second)
  462. err = dbs[2].JoinNetwork("network1")
  463. assert.NilError(t, err)
  464. for i := 0; i < 3; i++ {
  465. assert.Check(t, is.Equal(keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries number should match")
  466. }
  467. // at this point the entries should had been all deleted
  468. time.Sleep(30 * time.Second)
  469. for i := 0; i < 3; i++ {
  470. assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
  471. }
  472. // make sure that entries are not coming back
  473. time.Sleep(15 * time.Second)
  474. for i := 0; i < 3; i++ {
  475. assert.Check(t, is.Equal(0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber), "entries should had been garbage collected")
  476. }
  477. closeNetworkDBInstances(t, dbs)
  478. }
  479. func TestFindNode(t *testing.T) {
  480. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  481. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  482. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  483. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  484. // active nodes is 2 because the testing node is in the list
  485. assert.Check(t, is.Len(dbs[0].nodes, 2))
  486. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  487. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  488. n, currState, m := dbs[0].findNode("active")
  489. assert.Check(t, n != nil)
  490. assert.Check(t, is.Equal("active", n.Name))
  491. assert.Check(t, is.Equal(nodeActiveState, currState))
  492. assert.Check(t, m != nil)
  493. // delete the entry manually
  494. delete(m, "active")
  495. // test if can be still find
  496. n, currState, m = dbs[0].findNode("active")
  497. assert.Check(t, is.Nil(n))
  498. assert.Check(t, is.Equal(nodeNotFound, currState))
  499. assert.Check(t, is.Nil(m))
  500. n, currState, m = dbs[0].findNode("failed")
  501. assert.Check(t, n != nil)
  502. assert.Check(t, is.Equal("failed", n.Name))
  503. assert.Check(t, is.Equal(nodeFailedState, currState))
  504. assert.Check(t, m != nil)
  505. // find and remove
  506. n, currState, m = dbs[0].findNode("left")
  507. assert.Check(t, n != nil)
  508. assert.Check(t, is.Equal("left", n.Name))
  509. assert.Check(t, is.Equal(nodeLeftState, currState))
  510. assert.Check(t, m != nil)
  511. delete(m, "left")
  512. n, currState, m = dbs[0].findNode("left")
  513. assert.Check(t, is.Nil(n))
  514. assert.Check(t, is.Equal(nodeNotFound, currState))
  515. assert.Check(t, is.Nil(m))
  516. closeNetworkDBInstances(t, dbs)
  517. }
  518. func TestChangeNodeState(t *testing.T) {
  519. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  520. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  521. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  522. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  523. // active nodes is 4 because the testing node is in the list
  524. assert.Check(t, is.Len(dbs[0].nodes, 4))
  525. n, currState, m := dbs[0].findNode("node1")
  526. assert.Check(t, n != nil)
  527. assert.Check(t, is.Equal(nodeActiveState, currState))
  528. assert.Check(t, is.Equal("node1", n.Name))
  529. assert.Check(t, m != nil)
  530. // node1 to failed
  531. dbs[0].changeNodeState("node1", nodeFailedState)
  532. n, currState, m = dbs[0].findNode("node1")
  533. assert.Check(t, n != nil)
  534. assert.Check(t, is.Equal(nodeFailedState, currState))
  535. assert.Check(t, is.Equal("node1", n.Name))
  536. assert.Check(t, m != nil)
  537. assert.Check(t, time.Duration(0) != n.reapTime)
  538. // node1 back to active
  539. dbs[0].changeNodeState("node1", nodeActiveState)
  540. n, currState, m = dbs[0].findNode("node1")
  541. assert.Check(t, n != nil)
  542. assert.Check(t, is.Equal(nodeActiveState, currState))
  543. assert.Check(t, is.Equal("node1", n.Name))
  544. assert.Check(t, m != nil)
  545. assert.Check(t, is.Equal(time.Duration(0), n.reapTime))
  546. // node1 to left
  547. dbs[0].changeNodeState("node1", nodeLeftState)
  548. dbs[0].changeNodeState("node2", nodeLeftState)
  549. dbs[0].changeNodeState("node3", nodeLeftState)
  550. n, currState, m = dbs[0].findNode("node1")
  551. assert.Check(t, n != nil)
  552. assert.Check(t, is.Equal(nodeLeftState, currState))
  553. assert.Check(t, is.Equal("node1", n.Name))
  554. assert.Check(t, m != nil)
  555. assert.Check(t, time.Duration(0) != n.reapTime)
  556. n, currState, m = dbs[0].findNode("node2")
  557. assert.Check(t, n != nil)
  558. assert.Check(t, is.Equal(nodeLeftState, currState))
  559. assert.Check(t, is.Equal("node2", n.Name))
  560. assert.Check(t, m != nil)
  561. assert.Check(t, time.Duration(0) != n.reapTime)
  562. n, currState, m = dbs[0].findNode("node3")
  563. assert.Check(t, n != nil)
  564. assert.Check(t, is.Equal(nodeLeftState, currState))
  565. assert.Check(t, is.Equal("node3", n.Name))
  566. assert.Check(t, m != nil)
  567. assert.Check(t, time.Duration(0) != n.reapTime)
  568. // active nodes is 1 because the testing node is in the list
  569. assert.Check(t, is.Len(dbs[0].nodes, 1))
  570. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  571. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  572. closeNetworkDBInstances(t, dbs)
  573. }
  574. func TestNodeReincarnation(t *testing.T) {
  575. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  576. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  577. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  578. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  579. // active nodes is 2 because the testing node is in the list
  580. assert.Check(t, is.Len(dbs[0].nodes, 2))
  581. assert.Check(t, is.Len(dbs[0].failedNodes, 1))
  582. assert.Check(t, is.Len(dbs[0].leftNodes, 1))
  583. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  584. assert.Check(t, b)
  585. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  586. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  587. assert.Check(t, b)
  588. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  589. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  590. assert.Check(t, b)
  591. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  592. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  593. assert.Check(t, !b)
  594. // active nodes is 1 because the testing node is in the list
  595. assert.Check(t, is.Len(dbs[0].nodes, 4))
  596. assert.Check(t, is.Len(dbs[0].failedNodes, 0))
  597. assert.Check(t, is.Len(dbs[0].leftNodes, 3))
  598. closeNetworkDBInstances(t, dbs)
  599. }
  600. func TestParallelCreate(t *testing.T) {
  601. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  602. startCh := make(chan int)
  603. doneCh := make(chan error)
  604. var success int32
  605. for i := 0; i < 20; i++ {
  606. go func() {
  607. <-startCh
  608. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  609. if err == nil {
  610. atomic.AddInt32(&success, 1)
  611. }
  612. doneCh <- err
  613. }()
  614. }
  615. close(startCh)
  616. for i := 0; i < 20; i++ {
  617. <-doneCh
  618. }
  619. close(doneCh)
  620. // Only 1 write should have succeeded
  621. assert.Check(t, is.Equal(int32(1), success))
  622. closeNetworkDBInstances(t, dbs)
  623. }
  624. func TestParallelDelete(t *testing.T) {
  625. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  626. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  627. assert.NilError(t, err)
  628. startCh := make(chan int)
  629. doneCh := make(chan error)
  630. var success int32
  631. for i := 0; i < 20; i++ {
  632. go func() {
  633. <-startCh
  634. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  635. if err == nil {
  636. atomic.AddInt32(&success, 1)
  637. }
  638. doneCh <- err
  639. }()
  640. }
  641. close(startCh)
  642. for i := 0; i < 20; i++ {
  643. <-doneCh
  644. }
  645. close(doneCh)
  646. // Only 1 write should have succeeded
  647. assert.Check(t, is.Equal(int32(1), success))
  648. closeNetworkDBInstances(t, dbs)
  649. }
  650. func TestNetworkDBIslands(t *testing.T) {
  651. pollTimeout := func() time.Duration {
  652. const defaultTimeout = 120 * time.Second
  653. dl, ok := t.Deadline()
  654. if !ok {
  655. return defaultTimeout
  656. }
  657. if d := time.Until(dl); d <= defaultTimeout {
  658. return d
  659. }
  660. return defaultTimeout
  661. }
  662. logrus.SetLevel(logrus.DebugLevel)
  663. conf := DefaultConfig()
  664. // Shorten durations to speed up test execution.
  665. conf.rejoinClusterDuration = conf.rejoinClusterDuration / 10
  666. conf.rejoinClusterInterval = conf.rejoinClusterInterval / 10
  667. dbs := createNetworkDBInstances(t, 5, "node", conf)
  668. // Get the node IP used currently
  669. node := dbs[0].nodes[dbs[0].config.NodeID]
  670. baseIPStr := node.Addr.String()
  671. // Node 0,1,2 are going to be the 3 bootstrap nodes
  672. members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
  673. fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
  674. fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
  675. // Rejoining will update the list of the bootstrap members
  676. for i := 3; i < 5; i++ {
  677. t.Logf("Re-joining: %d", i)
  678. assert.Check(t, dbs[i].Join(members))
  679. }
  680. // Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
  681. for i := 0; i < 3; i++ {
  682. logrus.Infof("node %d leaving", i)
  683. dbs[i].Close()
  684. }
  685. checkDBs := make(map[string]*NetworkDB)
  686. for i := 3; i < 5; i++ {
  687. db := dbs[i]
  688. checkDBs[db.config.Hostname] = db
  689. }
  690. // Give some time to let the system propagate the messages and free up the ports
  691. check := func(t poll.LogT) poll.Result {
  692. // Verify that the nodes are actually all gone and marked appropiately
  693. for name, db := range checkDBs {
  694. db.RLock()
  695. if (len(db.leftNodes) != 3) || (len(db.failedNodes) != 0) {
  696. for name := range db.leftNodes {
  697. t.Logf("%s: Node %s left", db.config.Hostname, name)
  698. }
  699. for name := range db.failedNodes {
  700. t.Logf("%s: Node %s failed", db.config.Hostname, name)
  701. }
  702. db.RUnlock()
  703. return poll.Continue("%s:Waiting for all nodes to cleanly leave, left: %d, failed nodes: %d", name, len(db.leftNodes), len(db.failedNodes))
  704. }
  705. db.RUnlock()
  706. t.Logf("%s: OK", name)
  707. delete(checkDBs, name)
  708. }
  709. return poll.Success()
  710. }
  711. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  712. // Spawn again the first 3 nodes with different names but same IP:port
  713. for i := 0; i < 3; i++ {
  714. logrus.Infof("node %d coming back", i)
  715. dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  716. dbs[i] = launchNode(t, *dbs[i].config)
  717. }
  718. // Give some time for the reconnect routine to run, it runs every 6s.
  719. check = func(t poll.LogT) poll.Result {
  720. // Verify that the cluster is again all connected. Note that the 3 previous node did not do any join
  721. for i := 0; i < 5; i++ {
  722. db := dbs[i]
  723. db.RLock()
  724. if len(db.nodes) != 5 {
  725. db.RUnlock()
  726. return poll.Continue("%s:Waiting to connect to all nodes", dbs[i].config.Hostname)
  727. }
  728. if len(db.failedNodes) != 0 {
  729. db.RUnlock()
  730. return poll.Continue("%s:Waiting for 0 failedNodes", dbs[i].config.Hostname)
  731. }
  732. if i < 3 {
  733. // nodes from 0 to 3 has no left nodes
  734. if len(db.leftNodes) != 0 {
  735. db.RUnlock()
  736. return poll.Continue("%s:Waiting to have no leftNodes", dbs[i].config.Hostname)
  737. }
  738. } else {
  739. // nodes from 4 to 5 has the 3 previous left nodes
  740. if len(db.leftNodes) != 3 {
  741. db.RUnlock()
  742. return poll.Continue("%s:Waiting to have 3 leftNodes", dbs[i].config.Hostname)
  743. }
  744. }
  745. db.RUnlock()
  746. }
  747. return poll.Success()
  748. }
  749. poll.WaitOn(t, check, poll.WithDelay(time.Second), poll.WithTimeout(pollTimeout()))
  750. closeNetworkDBInstances(t, dbs)
  751. }