// networkdb_test.go (22 KB)
  1. package networkdb
  2. import (
  3. "flag"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "net"
  8. "os"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/docker/docker/pkg/stringid"
  13. "github.com/docker/go-events"
  14. "github.com/hashicorp/memberlist"
  15. "github.com/sirupsen/logrus"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/stretchr/testify/require"
  18. )
  19. var (
  20. dbPort int32 = 10000
  21. runningInContainer = flag.Bool("incontainer", false, "Indicates if the test is running in a container")
  22. )
  23. func TestMain(m *testing.M) {
  24. ioutil.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644)
  25. logrus.SetLevel(logrus.ErrorLevel)
  26. os.Exit(m.Run())
  27. }
  28. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  29. var dbs []*NetworkDB
  30. for i := 0; i < num; i++ {
  31. localConfig := *conf
  32. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  33. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  34. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  35. db, err := New(&localConfig)
  36. require.NoError(t, err)
  37. if i != 0 {
  38. err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
  39. assert.NoError(t, err)
  40. }
  41. dbs = append(dbs, db)
  42. }
  43. // Check that the cluster is properly created
  44. for i := 0; i < num; i++ {
  45. if num != len(dbs[i].ClusterPeers()) {
  46. t.Fatalf("Number of nodes for %s into the cluster does not match %d != %d",
  47. dbs[i].config.Hostname, num, len(dbs[i].ClusterPeers()))
  48. }
  49. }
  50. return dbs
  51. }
  52. func closeNetworkDBInstances(dbs []*NetworkDB) {
  53. log.Print("Closing DB instances...")
  54. for _, db := range dbs {
  55. db.Close()
  56. }
  57. }
  58. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  59. for i := 0; i < 80; i++ {
  60. db.RLock()
  61. _, ok := db.nodes[node]
  62. db.RUnlock()
  63. if present && ok {
  64. return
  65. }
  66. if !present && !ok {
  67. return
  68. }
  69. time.Sleep(50 * time.Millisecond)
  70. }
  71. assert.Fail(t, fmt.Sprintf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node))
  72. }
  73. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  74. for i := 0; i < 80; i++ {
  75. db.RLock()
  76. nn, nnok := db.networks[node]
  77. db.RUnlock()
  78. if nnok {
  79. n, ok := nn[id]
  80. if present && ok {
  81. return
  82. }
  83. if !present &&
  84. ((ok && n.leaving) ||
  85. !ok) {
  86. return
  87. }
  88. }
  89. time.Sleep(50 * time.Millisecond)
  90. }
  91. assert.Fail(t, "Network existence verification failed")
  92. }
  93. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  94. n := 80
  95. for i := 0; i < n; i++ {
  96. entry, err := db.getEntry(tname, nid, key)
  97. if present && err == nil && string(entry.value) == value {
  98. return
  99. }
  100. if !present &&
  101. ((err == nil && entry.deleting) ||
  102. (err != nil)) {
  103. return
  104. }
  105. if i == n-1 && !present && err != nil {
  106. return
  107. }
  108. time.Sleep(50 * time.Millisecond)
  109. }
  110. assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID))
  111. }
  112. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  113. select {
  114. case rcvdEv := <-ch:
  115. assert.Equal(t, fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev))
  116. switch rcvdEv.(type) {
  117. case CreateEvent:
  118. assert.Equal(t, tname, rcvdEv.(CreateEvent).Table)
  119. assert.Equal(t, nid, rcvdEv.(CreateEvent).NetworkID)
  120. assert.Equal(t, key, rcvdEv.(CreateEvent).Key)
  121. assert.Equal(t, value, string(rcvdEv.(CreateEvent).Value))
  122. case UpdateEvent:
  123. assert.Equal(t, tname, rcvdEv.(UpdateEvent).Table)
  124. assert.Equal(t, nid, rcvdEv.(UpdateEvent).NetworkID)
  125. assert.Equal(t, key, rcvdEv.(UpdateEvent).Key)
  126. assert.Equal(t, value, string(rcvdEv.(UpdateEvent).Value))
  127. case DeleteEvent:
  128. assert.Equal(t, tname, rcvdEv.(DeleteEvent).Table)
  129. assert.Equal(t, nid, rcvdEv.(DeleteEvent).NetworkID)
  130. assert.Equal(t, key, rcvdEv.(DeleteEvent).Key)
  131. }
  132. case <-time.After(time.Second):
  133. t.Fail()
  134. return
  135. }
  136. }
  137. func TestNetworkDBSimple(t *testing.T) {
  138. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  139. closeNetworkDBInstances(dbs)
  140. }
  141. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  142. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  143. err := dbs[0].JoinNetwork("network1")
  144. assert.NoError(t, err)
  145. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  146. err = dbs[0].LeaveNetwork("network1")
  147. assert.NoError(t, err)
  148. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  149. closeNetworkDBInstances(dbs)
  150. }
  151. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  152. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  153. n := 10
  154. for i := 1; i <= n; i++ {
  155. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  156. assert.NoError(t, err)
  157. }
  158. for i := 1; i <= n; i++ {
  159. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  160. assert.NoError(t, err)
  161. }
  162. for i := 1; i <= n; i++ {
  163. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  164. }
  165. for i := 1; i <= n; i++ {
  166. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  167. }
  168. for i := 1; i <= n; i++ {
  169. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  170. assert.NoError(t, err)
  171. }
  172. for i := 1; i <= n; i++ {
  173. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  174. assert.NoError(t, err)
  175. }
  176. for i := 1; i <= n; i++ {
  177. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  178. }
  179. for i := 1; i <= n; i++ {
  180. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  181. }
  182. closeNetworkDBInstances(dbs)
  183. }
  184. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  185. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  186. err := dbs[0].JoinNetwork("network1")
  187. assert.NoError(t, err)
  188. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  189. err = dbs[1].JoinNetwork("network1")
  190. assert.NoError(t, err)
  191. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  192. assert.NoError(t, err)
  193. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  194. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  195. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  196. assert.NoError(t, err)
  197. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  198. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  199. assert.NoError(t, err)
  200. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  201. closeNetworkDBInstances(dbs)
  202. }
  203. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  204. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  205. err := dbs[0].JoinNetwork("network1")
  206. assert.NoError(t, err)
  207. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  208. err = dbs[1].JoinNetwork("network1")
  209. assert.NoError(t, err)
  210. n := 10
  211. for i := 1; i <= n; i++ {
  212. err = dbs[0].CreateEntry("test_table", "network1",
  213. fmt.Sprintf("test_key0%d", i),
  214. []byte(fmt.Sprintf("test_value0%d", i)))
  215. assert.NoError(t, err)
  216. }
  217. for i := 1; i <= n; i++ {
  218. err = dbs[1].CreateEntry("test_table", "network1",
  219. fmt.Sprintf("test_key1%d", i),
  220. []byte(fmt.Sprintf("test_value1%d", i)))
  221. assert.NoError(t, err)
  222. }
  223. for i := 1; i <= n; i++ {
  224. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  225. fmt.Sprintf("test_key1%d", i),
  226. fmt.Sprintf("test_value1%d", i), true)
  227. assert.NoError(t, err)
  228. }
  229. for i := 1; i <= n; i++ {
  230. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  231. fmt.Sprintf("test_key0%d", i),
  232. fmt.Sprintf("test_value0%d", i), true)
  233. assert.NoError(t, err)
  234. }
  235. // Verify deletes
  236. for i := 1; i <= n; i++ {
  237. err = dbs[0].DeleteEntry("test_table", "network1",
  238. fmt.Sprintf("test_key0%d", i))
  239. assert.NoError(t, err)
  240. }
  241. for i := 1; i <= n; i++ {
  242. err = dbs[1].DeleteEntry("test_table", "network1",
  243. fmt.Sprintf("test_key1%d", i))
  244. assert.NoError(t, err)
  245. }
  246. for i := 1; i <= n; i++ {
  247. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  248. fmt.Sprintf("test_key1%d", i), "", false)
  249. assert.NoError(t, err)
  250. }
  251. for i := 1; i <= n; i++ {
  252. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  253. fmt.Sprintf("test_key0%d", i), "", false)
  254. assert.NoError(t, err)
  255. }
  256. closeNetworkDBInstances(dbs)
  257. }
  258. func TestNetworkDBNodeLeave(t *testing.T) {
  259. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  260. err := dbs[0].JoinNetwork("network1")
  261. assert.NoError(t, err)
  262. err = dbs[1].JoinNetwork("network1")
  263. assert.NoError(t, err)
  264. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  265. assert.NoError(t, err)
  266. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  267. dbs[0].Close()
  268. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  269. dbs[1].Close()
  270. }
  271. func TestNetworkDBWatch(t *testing.T) {
  272. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  273. err := dbs[0].JoinNetwork("network1")
  274. assert.NoError(t, err)
  275. err = dbs[1].JoinNetwork("network1")
  276. assert.NoError(t, err)
  277. ch, cancel := dbs[1].Watch("", "", "")
  278. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  279. assert.NoError(t, err)
  280. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  281. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  282. assert.NoError(t, err)
  283. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  284. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  285. assert.NoError(t, err)
  286. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  287. cancel()
  288. closeNetworkDBInstances(dbs)
  289. }
  290. func TestNetworkDBBulkSync(t *testing.T) {
  291. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  292. err := dbs[0].JoinNetwork("network1")
  293. assert.NoError(t, err)
  294. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  295. n := 1000
  296. for i := 1; i <= n; i++ {
  297. err = dbs[0].CreateEntry("test_table", "network1",
  298. fmt.Sprintf("test_key0%d", i),
  299. []byte(fmt.Sprintf("test_value0%d", i)))
  300. assert.NoError(t, err)
  301. }
  302. err = dbs[1].JoinNetwork("network1")
  303. assert.NoError(t, err)
  304. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  305. for i := 1; i <= n; i++ {
  306. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  307. fmt.Sprintf("test_key0%d", i),
  308. fmt.Sprintf("test_value0%d", i), true)
  309. assert.NoError(t, err)
  310. }
  311. closeNetworkDBInstances(dbs)
  312. }
  313. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  314. n := 5
  315. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  316. for i := 0; i < n; i++ {
  317. for j := 0; j < n; j++ {
  318. if i == j {
  319. continue
  320. }
  321. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  322. }
  323. }
  324. for i := 0; i < n; i++ {
  325. err := dbs[i].JoinNetwork("network1")
  326. assert.NoError(t, err)
  327. }
  328. for i := 0; i < n; i++ {
  329. for j := 0; j < n; j++ {
  330. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  331. }
  332. }
  333. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  334. assert.NoError(t, err)
  335. for i := 1; i < n; i++ {
  336. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  337. }
  338. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  339. assert.NoError(t, err)
  340. for i := 1; i < n; i++ {
  341. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  342. }
  343. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  344. assert.NoError(t, err)
  345. for i := 1; i < n; i++ {
  346. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  347. }
  348. closeNetworkDBInstances(dbs)
  349. }
  350. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  351. maxRetry := 5
  352. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  353. // Single node Join/Leave
  354. err := dbs[0].JoinNetwork("network1")
  355. assert.NoError(t, err)
  356. if len(dbs[0].networkNodes["network1"]) != 1 {
  357. t.Fatalf("The networkNodes list has to have be 1 instead of %d", len(dbs[0].networkNodes["network1"]))
  358. }
  359. err = dbs[0].LeaveNetwork("network1")
  360. assert.NoError(t, err)
  361. if len(dbs[0].networkNodes["network1"]) != 0 {
  362. t.Fatalf("The networkNodes list has to have be 0 instead of %d", len(dbs[0].networkNodes["network1"]))
  363. }
  364. // Multiple nodes Join/Leave
  365. err = dbs[0].JoinNetwork("network1")
  366. assert.NoError(t, err)
  367. err = dbs[1].JoinNetwork("network1")
  368. assert.NoError(t, err)
  369. // Wait for the propagation on db[0]
  370. for i := 0; i < maxRetry; i++ {
  371. if len(dbs[0].networkNodes["network1"]) == 2 {
  372. break
  373. }
  374. time.Sleep(1 * time.Second)
  375. }
  376. if len(dbs[0].networkNodes["network1"]) != 2 {
  377. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  378. }
  379. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  380. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  381. }
  382. // Wait for the propagation on db[1]
  383. for i := 0; i < maxRetry; i++ {
  384. if len(dbs[1].networkNodes["network1"]) == 2 {
  385. break
  386. }
  387. time.Sleep(1 * time.Second)
  388. }
  389. if len(dbs[1].networkNodes["network1"]) != 2 {
  390. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  391. }
  392. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  393. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  394. }
  395. // Try a quick leave/join
  396. err = dbs[0].LeaveNetwork("network1")
  397. assert.NoError(t, err)
  398. err = dbs[0].JoinNetwork("network1")
  399. assert.NoError(t, err)
  400. for i := 0; i < maxRetry; i++ {
  401. if len(dbs[0].networkNodes["network1"]) == 2 {
  402. break
  403. }
  404. time.Sleep(1 * time.Second)
  405. }
  406. if len(dbs[0].networkNodes["network1"]) != 2 {
  407. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  408. }
  409. for i := 0; i < maxRetry; i++ {
  410. if len(dbs[1].networkNodes["network1"]) == 2 {
  411. break
  412. }
  413. time.Sleep(1 * time.Second)
  414. }
  415. if len(dbs[1].networkNodes["network1"]) != 2 {
  416. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  417. }
  418. closeNetworkDBInstances(dbs)
  419. }
  420. func TestNetworkDBGarbageCollection(t *testing.T) {
  421. keysWriteDelete := 5
  422. config := DefaultConfig()
  423. config.reapEntryInterval = 30 * time.Second
  424. config.StatsPrintPeriod = 15 * time.Second
  425. dbs := createNetworkDBInstances(t, 3, "node", config)
  426. // 2 Nodes join network
  427. err := dbs[0].JoinNetwork("network1")
  428. assert.NoError(t, err)
  429. err = dbs[1].JoinNetwork("network1")
  430. assert.NoError(t, err)
  431. for i := 0; i < keysWriteDelete; i++ {
  432. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+string(i), []byte("value"))
  433. assert.NoError(t, err)
  434. }
  435. time.Sleep(time.Second)
  436. for i := 0; i < keysWriteDelete; i++ {
  437. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+string(i))
  438. assert.NoError(t, err)
  439. }
  440. for i := 0; i < 2; i++ {
  441. assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
  442. }
  443. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  444. time.Sleep(5 * time.Second)
  445. err = dbs[2].JoinNetwork("network1")
  446. assert.NoError(t, err)
  447. for i := 0; i < 3; i++ {
  448. assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
  449. }
  450. // at this point the entries should had been all deleted
  451. time.Sleep(30 * time.Second)
  452. for i := 0; i < 3; i++ {
  453. assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should had been garbage collected")
  454. }
  455. // make sure that entries are not coming back
  456. time.Sleep(15 * time.Second)
  457. for i := 0; i < 3; i++ {
  458. assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should had been garbage collected")
  459. }
  460. closeNetworkDBInstances(dbs)
  461. }
  462. func TestFindNode(t *testing.T) {
  463. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  464. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  465. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  466. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  467. // active nodes is 2 because the testing node is in the list
  468. assert.Equal(t, 2, len(dbs[0].nodes))
  469. assert.Equal(t, 1, len(dbs[0].failedNodes))
  470. assert.Equal(t, 1, len(dbs[0].leftNodes))
  471. n, currState, m := dbs[0].findNode("active")
  472. assert.NotNil(t, n)
  473. assert.Equal(t, "active", n.Name)
  474. assert.Equal(t, nodeActiveState, currState)
  475. assert.NotNil(t, m)
  476. // delete the entry manually
  477. delete(m, "active")
  478. // test if can be still find
  479. n, currState, m = dbs[0].findNode("active")
  480. assert.Nil(t, n)
  481. assert.Equal(t, nodeNotFound, currState)
  482. assert.Nil(t, m)
  483. n, currState, m = dbs[0].findNode("failed")
  484. assert.NotNil(t, n)
  485. assert.Equal(t, "failed", n.Name)
  486. assert.Equal(t, nodeFailedState, currState)
  487. assert.NotNil(t, m)
  488. // find and remove
  489. n, currState, m = dbs[0].findNode("left")
  490. assert.NotNil(t, n)
  491. assert.Equal(t, "left", n.Name)
  492. assert.Equal(t, nodeLeftState, currState)
  493. assert.NotNil(t, m)
  494. delete(m, "left")
  495. n, currState, m = dbs[0].findNode("left")
  496. assert.Nil(t, n)
  497. assert.Equal(t, nodeNotFound, currState)
  498. assert.Nil(t, m)
  499. closeNetworkDBInstances(dbs)
  500. }
  501. func TestChangeNodeState(t *testing.T) {
  502. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  503. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  504. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  505. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  506. // active nodes is 4 because the testing node is in the list
  507. assert.Equal(t, 4, len(dbs[0].nodes))
  508. n, currState, m := dbs[0].findNode("node1")
  509. assert.NotNil(t, n)
  510. assert.Equal(t, nodeActiveState, currState)
  511. assert.Equal(t, "node1", n.Name)
  512. assert.NotNil(t, m)
  513. // node1 to failed
  514. dbs[0].changeNodeState("node1", nodeFailedState)
  515. n, currState, m = dbs[0].findNode("node1")
  516. assert.NotNil(t, n)
  517. assert.Equal(t, nodeFailedState, currState)
  518. assert.Equal(t, "node1", n.Name)
  519. assert.NotNil(t, m)
  520. assert.NotEqual(t, time.Duration(0), n.reapTime)
  521. // node1 back to active
  522. dbs[0].changeNodeState("node1", nodeActiveState)
  523. n, currState, m = dbs[0].findNode("node1")
  524. assert.NotNil(t, n)
  525. assert.Equal(t, nodeActiveState, currState)
  526. assert.Equal(t, "node1", n.Name)
  527. assert.NotNil(t, m)
  528. assert.Equal(t, time.Duration(0), n.reapTime)
  529. // node1 to left
  530. dbs[0].changeNodeState("node1", nodeLeftState)
  531. dbs[0].changeNodeState("node2", nodeLeftState)
  532. dbs[0].changeNodeState("node3", nodeLeftState)
  533. n, currState, m = dbs[0].findNode("node1")
  534. assert.NotNil(t, n)
  535. assert.Equal(t, nodeLeftState, currState)
  536. assert.Equal(t, "node1", n.Name)
  537. assert.NotNil(t, m)
  538. assert.NotEqual(t, time.Duration(0), n.reapTime)
  539. n, currState, m = dbs[0].findNode("node2")
  540. assert.NotNil(t, n)
  541. assert.Equal(t, nodeLeftState, currState)
  542. assert.Equal(t, "node2", n.Name)
  543. assert.NotNil(t, m)
  544. assert.NotEqual(t, time.Duration(0), n.reapTime)
  545. n, currState, m = dbs[0].findNode("node3")
  546. assert.NotNil(t, n)
  547. assert.Equal(t, nodeLeftState, currState)
  548. assert.Equal(t, "node3", n.Name)
  549. assert.NotNil(t, m)
  550. assert.NotEqual(t, time.Duration(0), n.reapTime)
  551. // active nodes is 1 because the testing node is in the list
  552. assert.Equal(t, 1, len(dbs[0].nodes))
  553. assert.Equal(t, 0, len(dbs[0].failedNodes))
  554. assert.Equal(t, 3, len(dbs[0].leftNodes))
  555. closeNetworkDBInstances(dbs)
  556. }
  557. func TestNodeReincarnation(t *testing.T) {
  558. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  559. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  560. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  561. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  562. // active nodes is 2 because the testing node is in the list
  563. assert.Equal(t, 2, len(dbs[0].nodes))
  564. assert.Equal(t, 1, len(dbs[0].failedNodes))
  565. assert.Equal(t, 1, len(dbs[0].leftNodes))
  566. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  567. assert.True(t, b)
  568. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  569. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  570. assert.True(t, b)
  571. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  572. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  573. assert.True(t, b)
  574. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  575. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  576. assert.False(t, b)
  577. // active nodes is 1 because the testing node is in the list
  578. assert.Equal(t, 4, len(dbs[0].nodes))
  579. assert.Equal(t, 0, len(dbs[0].failedNodes))
  580. assert.Equal(t, 3, len(dbs[0].leftNodes))
  581. closeNetworkDBInstances(dbs)
  582. }
  583. func TestParallelCreate(t *testing.T) {
  584. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  585. startCh := make(chan int)
  586. doneCh := make(chan error)
  587. var success int32
  588. for i := 0; i < 20; i++ {
  589. go func() {
  590. <-startCh
  591. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  592. if err == nil {
  593. atomic.AddInt32(&success, 1)
  594. }
  595. doneCh <- err
  596. }()
  597. }
  598. close(startCh)
  599. for i := 0; i < 20; i++ {
  600. <-doneCh
  601. }
  602. close(doneCh)
  603. // Only 1 write should have succeeded
  604. assert.Equal(t, int32(1), success)
  605. closeNetworkDBInstances(dbs)
  606. }
  607. func TestParallelDelete(t *testing.T) {
  608. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  609. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  610. assert.NoError(t, err)
  611. startCh := make(chan int)
  612. doneCh := make(chan error)
  613. var success int32
  614. for i := 0; i < 20; i++ {
  615. go func() {
  616. <-startCh
  617. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  618. if err == nil {
  619. atomic.AddInt32(&success, 1)
  620. }
  621. doneCh <- err
  622. }()
  623. }
  624. close(startCh)
  625. for i := 0; i < 20; i++ {
  626. <-doneCh
  627. }
  628. close(doneCh)
  629. // Only 1 write should have succeeded
  630. assert.Equal(t, int32(1), success)
  631. closeNetworkDBInstances(dbs)
  632. }