// networkdb_test.go (24 KB)

  1. package networkdb
  2. import (
  3. "flag"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "net"
  8. "os"
  9. "strings"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "github.com/docker/docker/pkg/stringid"
  14. "github.com/docker/go-events"
  15. "github.com/hashicorp/memberlist"
  16. "github.com/sirupsen/logrus"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. )
var (
	// dbPort is the port counter used by createNetworkDBInstances: it is
	// atomically incremented and the new value becomes the instance's BindPort.
	dbPort int32 = 10000

	// runningInContainer holds the value of the -incontainer command line flag.
	// It is not referenced in the visible part of this file.
	runningInContainer = flag.Bool("incontainer", false, "Indicates if the test is running in a container")
)
  24. func TestMain(m *testing.M) {
  25. ioutil.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644)
  26. logrus.SetLevel(logrus.ErrorLevel)
  27. os.Exit(m.Run())
  28. }
  29. func launchNode(t *testing.T, conf Config) *NetworkDB {
  30. db, err := New(&conf)
  31. require.NoError(t, err)
  32. return db
  33. }
  34. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  35. var dbs []*NetworkDB
  36. for i := 0; i < num; i++ {
  37. localConfig := *conf
  38. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  39. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  40. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  41. db := launchNode(t, localConfig)
  42. if i != 0 {
  43. assert.NoError(t, db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)}))
  44. }
  45. dbs = append(dbs, db)
  46. }
  47. // Check that the cluster is properly created
  48. for i := 0; i < num; i++ {
  49. if num != len(dbs[i].ClusterPeers()) {
  50. t.Fatalf("Number of nodes for %s into the cluster does not match %d != %d",
  51. dbs[i].config.Hostname, num, len(dbs[i].ClusterPeers()))
  52. }
  53. }
  54. return dbs
  55. }
  56. func closeNetworkDBInstances(dbs []*NetworkDB) {
  57. log.Print("Closing DB instances...")
  58. for _, db := range dbs {
  59. db.Close()
  60. }
  61. }
  62. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  63. for i := 0; i < 80; i++ {
  64. db.RLock()
  65. _, ok := db.nodes[node]
  66. db.RUnlock()
  67. if present && ok {
  68. return
  69. }
  70. if !present && !ok {
  71. return
  72. }
  73. time.Sleep(50 * time.Millisecond)
  74. }
  75. assert.Fail(t, fmt.Sprintf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node))
  76. }
  77. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  78. for i := 0; i < 80; i++ {
  79. db.RLock()
  80. nn, nnok := db.networks[node]
  81. db.RUnlock()
  82. if nnok {
  83. n, ok := nn[id]
  84. if present && ok {
  85. return
  86. }
  87. if !present &&
  88. ((ok && n.leaving) ||
  89. !ok) {
  90. return
  91. }
  92. }
  93. time.Sleep(50 * time.Millisecond)
  94. }
  95. assert.Fail(t, "Network existence verification failed")
  96. }
  97. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  98. n := 80
  99. for i := 0; i < n; i++ {
  100. entry, err := db.getEntry(tname, nid, key)
  101. if present && err == nil && string(entry.value) == value {
  102. return
  103. }
  104. if !present &&
  105. ((err == nil && entry.deleting) ||
  106. (err != nil)) {
  107. return
  108. }
  109. if i == n-1 && !present && err != nil {
  110. return
  111. }
  112. time.Sleep(50 * time.Millisecond)
  113. }
  114. assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID))
  115. }
  116. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  117. select {
  118. case rcvdEv := <-ch:
  119. assert.Equal(t, fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev))
  120. switch rcvdEv.(type) {
  121. case CreateEvent:
  122. assert.Equal(t, tname, rcvdEv.(CreateEvent).Table)
  123. assert.Equal(t, nid, rcvdEv.(CreateEvent).NetworkID)
  124. assert.Equal(t, key, rcvdEv.(CreateEvent).Key)
  125. assert.Equal(t, value, string(rcvdEv.(CreateEvent).Value))
  126. case UpdateEvent:
  127. assert.Equal(t, tname, rcvdEv.(UpdateEvent).Table)
  128. assert.Equal(t, nid, rcvdEv.(UpdateEvent).NetworkID)
  129. assert.Equal(t, key, rcvdEv.(UpdateEvent).Key)
  130. assert.Equal(t, value, string(rcvdEv.(UpdateEvent).Value))
  131. case DeleteEvent:
  132. assert.Equal(t, tname, rcvdEv.(DeleteEvent).Table)
  133. assert.Equal(t, nid, rcvdEv.(DeleteEvent).NetworkID)
  134. assert.Equal(t, key, rcvdEv.(DeleteEvent).Key)
  135. }
  136. case <-time.After(time.Second):
  137. t.Fail()
  138. return
  139. }
  140. }
  141. func TestNetworkDBSimple(t *testing.T) {
  142. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  143. closeNetworkDBInstances(dbs)
  144. }
  145. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  146. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  147. err := dbs[0].JoinNetwork("network1")
  148. assert.NoError(t, err)
  149. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  150. err = dbs[0].LeaveNetwork("network1")
  151. assert.NoError(t, err)
  152. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  153. closeNetworkDBInstances(dbs)
  154. }
  155. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  156. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  157. n := 10
  158. for i := 1; i <= n; i++ {
  159. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  160. assert.NoError(t, err)
  161. }
  162. for i := 1; i <= n; i++ {
  163. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  164. assert.NoError(t, err)
  165. }
  166. for i := 1; i <= n; i++ {
  167. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  168. }
  169. for i := 1; i <= n; i++ {
  170. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  171. }
  172. for i := 1; i <= n; i++ {
  173. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  174. assert.NoError(t, err)
  175. }
  176. for i := 1; i <= n; i++ {
  177. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  178. assert.NoError(t, err)
  179. }
  180. for i := 1; i <= n; i++ {
  181. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  182. }
  183. for i := 1; i <= n; i++ {
  184. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  185. }
  186. closeNetworkDBInstances(dbs)
  187. }
  188. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  189. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  190. err := dbs[0].JoinNetwork("network1")
  191. assert.NoError(t, err)
  192. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  193. err = dbs[1].JoinNetwork("network1")
  194. assert.NoError(t, err)
  195. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  196. assert.NoError(t, err)
  197. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  198. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  199. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  200. assert.NoError(t, err)
  201. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  202. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  203. assert.NoError(t, err)
  204. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  205. closeNetworkDBInstances(dbs)
  206. }
  207. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  208. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  209. err := dbs[0].JoinNetwork("network1")
  210. assert.NoError(t, err)
  211. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  212. err = dbs[1].JoinNetwork("network1")
  213. assert.NoError(t, err)
  214. n := 10
  215. for i := 1; i <= n; i++ {
  216. err = dbs[0].CreateEntry("test_table", "network1",
  217. fmt.Sprintf("test_key0%d", i),
  218. []byte(fmt.Sprintf("test_value0%d", i)))
  219. assert.NoError(t, err)
  220. }
  221. for i := 1; i <= n; i++ {
  222. err = dbs[1].CreateEntry("test_table", "network1",
  223. fmt.Sprintf("test_key1%d", i),
  224. []byte(fmt.Sprintf("test_value1%d", i)))
  225. assert.NoError(t, err)
  226. }
  227. for i := 1; i <= n; i++ {
  228. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  229. fmt.Sprintf("test_key1%d", i),
  230. fmt.Sprintf("test_value1%d", i), true)
  231. assert.NoError(t, err)
  232. }
  233. for i := 1; i <= n; i++ {
  234. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  235. fmt.Sprintf("test_key0%d", i),
  236. fmt.Sprintf("test_value0%d", i), true)
  237. assert.NoError(t, err)
  238. }
  239. // Verify deletes
  240. for i := 1; i <= n; i++ {
  241. err = dbs[0].DeleteEntry("test_table", "network1",
  242. fmt.Sprintf("test_key0%d", i))
  243. assert.NoError(t, err)
  244. }
  245. for i := 1; i <= n; i++ {
  246. err = dbs[1].DeleteEntry("test_table", "network1",
  247. fmt.Sprintf("test_key1%d", i))
  248. assert.NoError(t, err)
  249. }
  250. for i := 1; i <= n; i++ {
  251. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  252. fmt.Sprintf("test_key1%d", i), "", false)
  253. assert.NoError(t, err)
  254. }
  255. for i := 1; i <= n; i++ {
  256. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  257. fmt.Sprintf("test_key0%d", i), "", false)
  258. assert.NoError(t, err)
  259. }
  260. closeNetworkDBInstances(dbs)
  261. }
  262. func TestNetworkDBNodeLeave(t *testing.T) {
  263. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  264. err := dbs[0].JoinNetwork("network1")
  265. assert.NoError(t, err)
  266. err = dbs[1].JoinNetwork("network1")
  267. assert.NoError(t, err)
  268. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  269. assert.NoError(t, err)
  270. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  271. dbs[0].Close()
  272. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  273. dbs[1].Close()
  274. }
  275. func TestNetworkDBWatch(t *testing.T) {
  276. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  277. err := dbs[0].JoinNetwork("network1")
  278. assert.NoError(t, err)
  279. err = dbs[1].JoinNetwork("network1")
  280. assert.NoError(t, err)
  281. ch, cancel := dbs[1].Watch("", "", "")
  282. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  283. assert.NoError(t, err)
  284. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  285. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  286. assert.NoError(t, err)
  287. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  288. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  289. assert.NoError(t, err)
  290. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  291. cancel()
  292. closeNetworkDBInstances(dbs)
  293. }
  294. func TestNetworkDBBulkSync(t *testing.T) {
  295. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  296. err := dbs[0].JoinNetwork("network1")
  297. assert.NoError(t, err)
  298. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  299. n := 1000
  300. for i := 1; i <= n; i++ {
  301. err = dbs[0].CreateEntry("test_table", "network1",
  302. fmt.Sprintf("test_key0%d", i),
  303. []byte(fmt.Sprintf("test_value0%d", i)))
  304. assert.NoError(t, err)
  305. }
  306. err = dbs[1].JoinNetwork("network1")
  307. assert.NoError(t, err)
  308. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  309. for i := 1; i <= n; i++ {
  310. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  311. fmt.Sprintf("test_key0%d", i),
  312. fmt.Sprintf("test_value0%d", i), true)
  313. assert.NoError(t, err)
  314. }
  315. closeNetworkDBInstances(dbs)
  316. }
  317. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  318. n := 5
  319. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  320. for i := 0; i < n; i++ {
  321. for j := 0; j < n; j++ {
  322. if i == j {
  323. continue
  324. }
  325. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  326. }
  327. }
  328. for i := 0; i < n; i++ {
  329. err := dbs[i].JoinNetwork("network1")
  330. assert.NoError(t, err)
  331. }
  332. for i := 0; i < n; i++ {
  333. for j := 0; j < n; j++ {
  334. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  335. }
  336. }
  337. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  338. assert.NoError(t, err)
  339. for i := 1; i < n; i++ {
  340. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  341. }
  342. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  343. assert.NoError(t, err)
  344. for i := 1; i < n; i++ {
  345. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  346. }
  347. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  348. assert.NoError(t, err)
  349. for i := 1; i < n; i++ {
  350. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  351. }
  352. for i := 1; i < n; i++ {
  353. _, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  354. assert.Error(t, err)
  355. assert.True(t, strings.Contains(err.Error(), "deleted and pending garbage collection"))
  356. }
  357. closeNetworkDBInstances(dbs)
  358. }
  359. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  360. maxRetry := 5
  361. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  362. // Single node Join/Leave
  363. err := dbs[0].JoinNetwork("network1")
  364. assert.NoError(t, err)
  365. if len(dbs[0].networkNodes["network1"]) != 1 {
  366. t.Fatalf("The networkNodes list has to have be 1 instead of %d", len(dbs[0].networkNodes["network1"]))
  367. }
  368. err = dbs[0].LeaveNetwork("network1")
  369. assert.NoError(t, err)
  370. if len(dbs[0].networkNodes["network1"]) != 0 {
  371. t.Fatalf("The networkNodes list has to have be 0 instead of %d", len(dbs[0].networkNodes["network1"]))
  372. }
  373. // Multiple nodes Join/Leave
  374. err = dbs[0].JoinNetwork("network1")
  375. assert.NoError(t, err)
  376. err = dbs[1].JoinNetwork("network1")
  377. assert.NoError(t, err)
  378. // Wait for the propagation on db[0]
  379. for i := 0; i < maxRetry; i++ {
  380. if len(dbs[0].networkNodes["network1"]) == 2 {
  381. break
  382. }
  383. time.Sleep(1 * time.Second)
  384. }
  385. if len(dbs[0].networkNodes["network1"]) != 2 {
  386. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  387. }
  388. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  389. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  390. }
  391. // Wait for the propagation on db[1]
  392. for i := 0; i < maxRetry; i++ {
  393. if len(dbs[1].networkNodes["network1"]) == 2 {
  394. break
  395. }
  396. time.Sleep(1 * time.Second)
  397. }
  398. if len(dbs[1].networkNodes["network1"]) != 2 {
  399. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  400. }
  401. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  402. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  403. }
  404. // Try a quick leave/join
  405. err = dbs[0].LeaveNetwork("network1")
  406. assert.NoError(t, err)
  407. err = dbs[0].JoinNetwork("network1")
  408. assert.NoError(t, err)
  409. for i := 0; i < maxRetry; i++ {
  410. if len(dbs[0].networkNodes["network1"]) == 2 {
  411. break
  412. }
  413. time.Sleep(1 * time.Second)
  414. }
  415. if len(dbs[0].networkNodes["network1"]) != 2 {
  416. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  417. }
  418. for i := 0; i < maxRetry; i++ {
  419. if len(dbs[1].networkNodes["network1"]) == 2 {
  420. break
  421. }
  422. time.Sleep(1 * time.Second)
  423. }
  424. if len(dbs[1].networkNodes["network1"]) != 2 {
  425. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  426. }
  427. closeNetworkDBInstances(dbs)
  428. }
  429. func TestNetworkDBGarbageCollection(t *testing.T) {
  430. keysWriteDelete := 5
  431. config := DefaultConfig()
  432. config.reapEntryInterval = 30 * time.Second
  433. config.StatsPrintPeriod = 15 * time.Second
  434. dbs := createNetworkDBInstances(t, 3, "node", config)
  435. // 2 Nodes join network
  436. err := dbs[0].JoinNetwork("network1")
  437. assert.NoError(t, err)
  438. err = dbs[1].JoinNetwork("network1")
  439. assert.NoError(t, err)
  440. for i := 0; i < keysWriteDelete; i++ {
  441. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+string(i), []byte("value"))
  442. assert.NoError(t, err)
  443. }
  444. time.Sleep(time.Second)
  445. for i := 0; i < keysWriteDelete; i++ {
  446. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+string(i))
  447. assert.NoError(t, err)
  448. }
  449. for i := 0; i < 2; i++ {
  450. assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
  451. }
  452. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  453. time.Sleep(5 * time.Second)
  454. err = dbs[2].JoinNetwork("network1")
  455. assert.NoError(t, err)
  456. for i := 0; i < 3; i++ {
  457. assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
  458. }
  459. // at this point the entries should had been all deleted
  460. time.Sleep(30 * time.Second)
  461. for i := 0; i < 3; i++ {
  462. assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should had been garbage collected")
  463. }
  464. // make sure that entries are not coming back
  465. time.Sleep(15 * time.Second)
  466. for i := 0; i < 3; i++ {
  467. assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should had been garbage collected")
  468. }
  469. closeNetworkDBInstances(dbs)
  470. }
  471. func TestFindNode(t *testing.T) {
  472. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  473. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  474. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  475. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  476. // active nodes is 2 because the testing node is in the list
  477. assert.Equal(t, 2, len(dbs[0].nodes))
  478. assert.Equal(t, 1, len(dbs[0].failedNodes))
  479. assert.Equal(t, 1, len(dbs[0].leftNodes))
  480. n, currState, m := dbs[0].findNode("active")
  481. assert.NotNil(t, n)
  482. assert.Equal(t, "active", n.Name)
  483. assert.Equal(t, nodeActiveState, currState)
  484. assert.NotNil(t, m)
  485. // delete the entry manually
  486. delete(m, "active")
  487. // test if can be still find
  488. n, currState, m = dbs[0].findNode("active")
  489. assert.Nil(t, n)
  490. assert.Equal(t, nodeNotFound, currState)
  491. assert.Nil(t, m)
  492. n, currState, m = dbs[0].findNode("failed")
  493. assert.NotNil(t, n)
  494. assert.Equal(t, "failed", n.Name)
  495. assert.Equal(t, nodeFailedState, currState)
  496. assert.NotNil(t, m)
  497. // find and remove
  498. n, currState, m = dbs[0].findNode("left")
  499. assert.NotNil(t, n)
  500. assert.Equal(t, "left", n.Name)
  501. assert.Equal(t, nodeLeftState, currState)
  502. assert.NotNil(t, m)
  503. delete(m, "left")
  504. n, currState, m = dbs[0].findNode("left")
  505. assert.Nil(t, n)
  506. assert.Equal(t, nodeNotFound, currState)
  507. assert.Nil(t, m)
  508. closeNetworkDBInstances(dbs)
  509. }
  510. func TestChangeNodeState(t *testing.T) {
  511. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  512. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  513. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  514. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  515. // active nodes is 4 because the testing node is in the list
  516. assert.Equal(t, 4, len(dbs[0].nodes))
  517. n, currState, m := dbs[0].findNode("node1")
  518. assert.NotNil(t, n)
  519. assert.Equal(t, nodeActiveState, currState)
  520. assert.Equal(t, "node1", n.Name)
  521. assert.NotNil(t, m)
  522. // node1 to failed
  523. dbs[0].changeNodeState("node1", nodeFailedState)
  524. n, currState, m = dbs[0].findNode("node1")
  525. assert.NotNil(t, n)
  526. assert.Equal(t, nodeFailedState, currState)
  527. assert.Equal(t, "node1", n.Name)
  528. assert.NotNil(t, m)
  529. assert.NotEqual(t, time.Duration(0), n.reapTime)
  530. // node1 back to active
  531. dbs[0].changeNodeState("node1", nodeActiveState)
  532. n, currState, m = dbs[0].findNode("node1")
  533. assert.NotNil(t, n)
  534. assert.Equal(t, nodeActiveState, currState)
  535. assert.Equal(t, "node1", n.Name)
  536. assert.NotNil(t, m)
  537. assert.Equal(t, time.Duration(0), n.reapTime)
  538. // node1 to left
  539. dbs[0].changeNodeState("node1", nodeLeftState)
  540. dbs[0].changeNodeState("node2", nodeLeftState)
  541. dbs[0].changeNodeState("node3", nodeLeftState)
  542. n, currState, m = dbs[0].findNode("node1")
  543. assert.NotNil(t, n)
  544. assert.Equal(t, nodeLeftState, currState)
  545. assert.Equal(t, "node1", n.Name)
  546. assert.NotNil(t, m)
  547. assert.NotEqual(t, time.Duration(0), n.reapTime)
  548. n, currState, m = dbs[0].findNode("node2")
  549. assert.NotNil(t, n)
  550. assert.Equal(t, nodeLeftState, currState)
  551. assert.Equal(t, "node2", n.Name)
  552. assert.NotNil(t, m)
  553. assert.NotEqual(t, time.Duration(0), n.reapTime)
  554. n, currState, m = dbs[0].findNode("node3")
  555. assert.NotNil(t, n)
  556. assert.Equal(t, nodeLeftState, currState)
  557. assert.Equal(t, "node3", n.Name)
  558. assert.NotNil(t, m)
  559. assert.NotEqual(t, time.Duration(0), n.reapTime)
  560. // active nodes is 1 because the testing node is in the list
  561. assert.Equal(t, 1, len(dbs[0].nodes))
  562. assert.Equal(t, 0, len(dbs[0].failedNodes))
  563. assert.Equal(t, 3, len(dbs[0].leftNodes))
  564. closeNetworkDBInstances(dbs)
  565. }
  566. func TestNodeReincarnation(t *testing.T) {
  567. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  568. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  569. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  570. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  571. // active nodes is 2 because the testing node is in the list
  572. assert.Equal(t, 2, len(dbs[0].nodes))
  573. assert.Equal(t, 1, len(dbs[0].failedNodes))
  574. assert.Equal(t, 1, len(dbs[0].leftNodes))
  575. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  576. assert.True(t, b)
  577. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  578. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  579. assert.True(t, b)
  580. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  581. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  582. assert.True(t, b)
  583. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  584. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  585. assert.False(t, b)
  586. // active nodes is 1 because the testing node is in the list
  587. assert.Equal(t, 4, len(dbs[0].nodes))
  588. assert.Equal(t, 0, len(dbs[0].failedNodes))
  589. assert.Equal(t, 3, len(dbs[0].leftNodes))
  590. closeNetworkDBInstances(dbs)
  591. }
  592. func TestParallelCreate(t *testing.T) {
  593. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  594. startCh := make(chan int)
  595. doneCh := make(chan error)
  596. var success int32
  597. for i := 0; i < 20; i++ {
  598. go func() {
  599. <-startCh
  600. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  601. if err == nil {
  602. atomic.AddInt32(&success, 1)
  603. }
  604. doneCh <- err
  605. }()
  606. }
  607. close(startCh)
  608. for i := 0; i < 20; i++ {
  609. <-doneCh
  610. }
  611. close(doneCh)
  612. // Only 1 write should have succeeded
  613. assert.Equal(t, int32(1), success)
  614. closeNetworkDBInstances(dbs)
  615. }
  616. func TestParallelDelete(t *testing.T) {
  617. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  618. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  619. assert.NoError(t, err)
  620. startCh := make(chan int)
  621. doneCh := make(chan error)
  622. var success int32
  623. for i := 0; i < 20; i++ {
  624. go func() {
  625. <-startCh
  626. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  627. if err == nil {
  628. atomic.AddInt32(&success, 1)
  629. }
  630. doneCh <- err
  631. }()
  632. }
  633. close(startCh)
  634. for i := 0; i < 20; i++ {
  635. <-doneCh
  636. }
  637. close(doneCh)
  638. // Only 1 write should have succeeded
  639. assert.Equal(t, int32(1), success)
  640. closeNetworkDBInstances(dbs)
  641. }
  642. func TestNetworkDBIslands(t *testing.T) {
  643. logrus.SetLevel(logrus.DebugLevel)
  644. dbs := createNetworkDBInstances(t, 5, "node", DefaultConfig())
  645. // Get the node IP used currently
  646. node, _ := dbs[0].nodes[dbs[0].config.NodeID]
  647. baseIPStr := node.Addr.String()
  648. // Node 0,1,2 are going to be the 3 bootstrap nodes
  649. members := []string{fmt.Sprintf("%s:%d", baseIPStr, dbs[0].config.BindPort),
  650. fmt.Sprintf("%s:%d", baseIPStr, dbs[1].config.BindPort),
  651. fmt.Sprintf("%s:%d", baseIPStr, dbs[2].config.BindPort)}
  652. // Rejoining will update the list of the bootstrap members
  653. for i := 3; i < 5; i++ {
  654. assert.NoError(t, dbs[i].Join(members))
  655. }
  656. // Now the 3 bootstrap nodes will cleanly leave, and will be properly removed from the other 2 nodes
  657. for i := 0; i < 3; i++ {
  658. logrus.Infof("node %d leaving", i)
  659. dbs[i].Close()
  660. time.Sleep(2 * time.Second)
  661. }
  662. // Give some time to let the system propagate the messages and free up the ports
  663. time.Sleep(10 * time.Second)
  664. // Verify that the nodes are actually all gone and marked appropiately
  665. for i := 3; i < 5; i++ {
  666. assert.Len(t, dbs[i].leftNodes, 3)
  667. assert.Len(t, dbs[i].failedNodes, 0)
  668. }
  669. // Spawn again the first 3 nodes with different names but same IP:port
  670. for i := 0; i < 3; i++ {
  671. logrus.Infof("node %d coming back", i)
  672. dbs[i].config.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  673. dbs[i] = launchNode(t, *dbs[i].config)
  674. time.Sleep(2 * time.Second)
  675. }
  676. // Give some time for the reconnect routine to run, it runs every 60s
  677. time.Sleep(50 * time.Second)
  678. // Verify that the cluster is again all connected. Note that the 3 previous node did not do any join
  679. for i := 0; i < 5; i++ {
  680. assert.Len(t, dbs[i].nodes, 5)
  681. assert.Len(t, dbs[i].failedNodes, 0)
  682. if i < 3 {
  683. // nodes from 0 to 3 has no left nodes
  684. assert.Len(t, dbs[i].leftNodes, 0)
  685. } else {
  686. // nodes from 4 to 5 has the 3 previous left nodes
  687. assert.Len(t, dbs[i].leftNodes, 3)
  688. }
  689. }
  690. }