networkdb_test.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805
  1. package networkdb
  2. import (
  3. "flag"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "net"
  8. "os"
  9. "strings"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "github.com/docker/docker/pkg/stringid"
  14. "github.com/docker/go-events"
  15. "github.com/hashicorp/memberlist"
  16. "github.com/sirupsen/logrus"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. )
  20. var (
  21. dbPort int32 = 10000
  22. runningInContainer = flag.Bool("incontainer", false, "Indicates if the test is running in a container")
  23. )
  24. func TestMain(m *testing.M) {
  25. ioutil.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644)
  26. logrus.SetLevel(logrus.ErrorLevel)
  27. os.Exit(m.Run())
  28. }
  29. func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
  30. var dbs []*NetworkDB
  31. for i := 0; i < num; i++ {
  32. localConfig := *conf
  33. localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
  34. localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
  35. localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
  36. db, err := New(&localConfig)
  37. require.NoError(t, err)
  38. if i != 0 {
  39. err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
  40. assert.NoError(t, err)
  41. }
  42. dbs = append(dbs, db)
  43. }
  44. // Check that the cluster is properly created
  45. for i := 0; i < num; i++ {
  46. if num != len(dbs[i].ClusterPeers()) {
  47. t.Fatalf("Number of nodes for %s into the cluster does not match %d != %d",
  48. dbs[i].config.Hostname, num, len(dbs[i].ClusterPeers()))
  49. }
  50. }
  51. return dbs
  52. }
  53. func closeNetworkDBInstances(dbs []*NetworkDB) {
  54. log.Print("Closing DB instances...")
  55. for _, db := range dbs {
  56. db.Close()
  57. }
  58. }
  59. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  60. for i := 0; i < 80; i++ {
  61. db.RLock()
  62. _, ok := db.nodes[node]
  63. db.RUnlock()
  64. if present && ok {
  65. return
  66. }
  67. if !present && !ok {
  68. return
  69. }
  70. time.Sleep(50 * time.Millisecond)
  71. }
  72. assert.Fail(t, fmt.Sprintf("%v(%v): Node existence verification for node %s failed", db.config.Hostname, db.config.NodeID, node))
  73. }
  74. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  75. for i := 0; i < 80; i++ {
  76. db.RLock()
  77. nn, nnok := db.networks[node]
  78. db.RUnlock()
  79. if nnok {
  80. n, ok := nn[id]
  81. if present && ok {
  82. return
  83. }
  84. if !present &&
  85. ((ok && n.leaving) ||
  86. !ok) {
  87. return
  88. }
  89. }
  90. time.Sleep(50 * time.Millisecond)
  91. }
  92. assert.Fail(t, "Network existence verification failed")
  93. }
  94. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  95. n := 80
  96. for i := 0; i < n; i++ {
  97. entry, err := db.getEntry(tname, nid, key)
  98. if present && err == nil && string(entry.value) == value {
  99. return
  100. }
  101. if !present &&
  102. ((err == nil && entry.deleting) ||
  103. (err != nil)) {
  104. return
  105. }
  106. if i == n-1 && !present && err != nil {
  107. return
  108. }
  109. time.Sleep(50 * time.Millisecond)
  110. }
  111. assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %v(%v)", db.config.Hostname, db.config.NodeID))
  112. }
  113. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  114. select {
  115. case rcvdEv := <-ch:
  116. assert.Equal(t, fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev))
  117. switch rcvdEv.(type) {
  118. case CreateEvent:
  119. assert.Equal(t, tname, rcvdEv.(CreateEvent).Table)
  120. assert.Equal(t, nid, rcvdEv.(CreateEvent).NetworkID)
  121. assert.Equal(t, key, rcvdEv.(CreateEvent).Key)
  122. assert.Equal(t, value, string(rcvdEv.(CreateEvent).Value))
  123. case UpdateEvent:
  124. assert.Equal(t, tname, rcvdEv.(UpdateEvent).Table)
  125. assert.Equal(t, nid, rcvdEv.(UpdateEvent).NetworkID)
  126. assert.Equal(t, key, rcvdEv.(UpdateEvent).Key)
  127. assert.Equal(t, value, string(rcvdEv.(UpdateEvent).Value))
  128. case DeleteEvent:
  129. assert.Equal(t, tname, rcvdEv.(DeleteEvent).Table)
  130. assert.Equal(t, nid, rcvdEv.(DeleteEvent).NetworkID)
  131. assert.Equal(t, key, rcvdEv.(DeleteEvent).Key)
  132. }
  133. case <-time.After(time.Second):
  134. t.Fail()
  135. return
  136. }
  137. }
  138. func TestNetworkDBSimple(t *testing.T) {
  139. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  140. closeNetworkDBInstances(dbs)
  141. }
  142. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  143. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  144. err := dbs[0].JoinNetwork("network1")
  145. assert.NoError(t, err)
  146. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  147. err = dbs[0].LeaveNetwork("network1")
  148. assert.NoError(t, err)
  149. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", false)
  150. closeNetworkDBInstances(dbs)
  151. }
  152. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  153. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  154. n := 10
  155. for i := 1; i <= n; i++ {
  156. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  157. assert.NoError(t, err)
  158. }
  159. for i := 1; i <= n; i++ {
  160. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  161. assert.NoError(t, err)
  162. }
  163. for i := 1; i <= n; i++ {
  164. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), true)
  165. }
  166. for i := 1; i <= n; i++ {
  167. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), true)
  168. }
  169. for i := 1; i <= n; i++ {
  170. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  171. assert.NoError(t, err)
  172. }
  173. for i := 1; i <= n; i++ {
  174. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  175. assert.NoError(t, err)
  176. }
  177. for i := 1; i <= n; i++ {
  178. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, fmt.Sprintf("network0%d", i), false)
  179. }
  180. for i := 1; i <= n; i++ {
  181. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, fmt.Sprintf("network1%d", i), false)
  182. }
  183. closeNetworkDBInstances(dbs)
  184. }
  185. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  186. dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
  187. err := dbs[0].JoinNetwork("network1")
  188. assert.NoError(t, err)
  189. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  190. err = dbs[1].JoinNetwork("network1")
  191. assert.NoError(t, err)
  192. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  193. assert.NoError(t, err)
  194. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  195. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  196. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  197. assert.NoError(t, err)
  198. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  199. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  200. assert.NoError(t, err)
  201. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  202. closeNetworkDBInstances(dbs)
  203. }
  204. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  205. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  206. err := dbs[0].JoinNetwork("network1")
  207. assert.NoError(t, err)
  208. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  209. err = dbs[1].JoinNetwork("network1")
  210. assert.NoError(t, err)
  211. n := 10
  212. for i := 1; i <= n; i++ {
  213. err = dbs[0].CreateEntry("test_table", "network1",
  214. fmt.Sprintf("test_key0%d", i),
  215. []byte(fmt.Sprintf("test_value0%d", i)))
  216. assert.NoError(t, err)
  217. }
  218. for i := 1; i <= n; i++ {
  219. err = dbs[1].CreateEntry("test_table", "network1",
  220. fmt.Sprintf("test_key1%d", i),
  221. []byte(fmt.Sprintf("test_value1%d", i)))
  222. assert.NoError(t, err)
  223. }
  224. for i := 1; i <= n; i++ {
  225. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  226. fmt.Sprintf("test_key1%d", i),
  227. fmt.Sprintf("test_value1%d", i), true)
  228. assert.NoError(t, err)
  229. }
  230. for i := 1; i <= n; i++ {
  231. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  232. fmt.Sprintf("test_key0%d", i),
  233. fmt.Sprintf("test_value0%d", i), true)
  234. assert.NoError(t, err)
  235. }
  236. // Verify deletes
  237. for i := 1; i <= n; i++ {
  238. err = dbs[0].DeleteEntry("test_table", "network1",
  239. fmt.Sprintf("test_key0%d", i))
  240. assert.NoError(t, err)
  241. }
  242. for i := 1; i <= n; i++ {
  243. err = dbs[1].DeleteEntry("test_table", "network1",
  244. fmt.Sprintf("test_key1%d", i))
  245. assert.NoError(t, err)
  246. }
  247. for i := 1; i <= n; i++ {
  248. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  249. fmt.Sprintf("test_key1%d", i), "", false)
  250. assert.NoError(t, err)
  251. }
  252. for i := 1; i <= n; i++ {
  253. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  254. fmt.Sprintf("test_key0%d", i), "", false)
  255. assert.NoError(t, err)
  256. }
  257. closeNetworkDBInstances(dbs)
  258. }
  259. func TestNetworkDBNodeLeave(t *testing.T) {
  260. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  261. err := dbs[0].JoinNetwork("network1")
  262. assert.NoError(t, err)
  263. err = dbs[1].JoinNetwork("network1")
  264. assert.NoError(t, err)
  265. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  266. assert.NoError(t, err)
  267. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  268. dbs[0].Close()
  269. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  270. dbs[1].Close()
  271. }
  272. func TestNetworkDBWatch(t *testing.T) {
  273. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  274. err := dbs[0].JoinNetwork("network1")
  275. assert.NoError(t, err)
  276. err = dbs[1].JoinNetwork("network1")
  277. assert.NoError(t, err)
  278. ch, cancel := dbs[1].Watch("", "", "")
  279. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  280. assert.NoError(t, err)
  281. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  282. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  283. assert.NoError(t, err)
  284. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  285. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  286. assert.NoError(t, err)
  287. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  288. cancel()
  289. closeNetworkDBInstances(dbs)
  290. }
  291. func TestNetworkDBBulkSync(t *testing.T) {
  292. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  293. err := dbs[0].JoinNetwork("network1")
  294. assert.NoError(t, err)
  295. dbs[1].verifyNetworkExistence(t, dbs[0].config.NodeID, "network1", true)
  296. n := 1000
  297. for i := 1; i <= n; i++ {
  298. err = dbs[0].CreateEntry("test_table", "network1",
  299. fmt.Sprintf("test_key0%d", i),
  300. []byte(fmt.Sprintf("test_value0%d", i)))
  301. assert.NoError(t, err)
  302. }
  303. err = dbs[1].JoinNetwork("network1")
  304. assert.NoError(t, err)
  305. dbs[0].verifyNetworkExistence(t, dbs[1].config.NodeID, "network1", true)
  306. for i := 1; i <= n; i++ {
  307. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  308. fmt.Sprintf("test_key0%d", i),
  309. fmt.Sprintf("test_value0%d", i), true)
  310. assert.NoError(t, err)
  311. }
  312. closeNetworkDBInstances(dbs)
  313. }
  314. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  315. n := 5
  316. dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
  317. for i := 0; i < n; i++ {
  318. for j := 0; j < n; j++ {
  319. if i == j {
  320. continue
  321. }
  322. dbs[i].verifyNodeExistence(t, dbs[j].config.NodeID, true)
  323. }
  324. }
  325. for i := 0; i < n; i++ {
  326. err := dbs[i].JoinNetwork("network1")
  327. assert.NoError(t, err)
  328. }
  329. for i := 0; i < n; i++ {
  330. for j := 0; j < n; j++ {
  331. dbs[i].verifyNetworkExistence(t, dbs[j].config.NodeID, "network1", true)
  332. }
  333. }
  334. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  335. assert.NoError(t, err)
  336. for i := 1; i < n; i++ {
  337. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  338. }
  339. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  340. assert.NoError(t, err)
  341. for i := 1; i < n; i++ {
  342. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  343. }
  344. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  345. assert.NoError(t, err)
  346. for i := 1; i < n; i++ {
  347. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  348. }
  349. for i := 1; i < n; i++ {
  350. _, err = dbs[i].GetEntry("test_table", "network1", "test_key")
  351. assert.Error(t, err)
  352. assert.True(t, strings.Contains(err.Error(), "deleted and pending garbage collection"))
  353. }
  354. closeNetworkDBInstances(dbs)
  355. }
  356. func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
  357. maxRetry := 5
  358. dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
  359. // Single node Join/Leave
  360. err := dbs[0].JoinNetwork("network1")
  361. assert.NoError(t, err)
  362. if len(dbs[0].networkNodes["network1"]) != 1 {
  363. t.Fatalf("The networkNodes list has to have be 1 instead of %d", len(dbs[0].networkNodes["network1"]))
  364. }
  365. err = dbs[0].LeaveNetwork("network1")
  366. assert.NoError(t, err)
  367. if len(dbs[0].networkNodes["network1"]) != 0 {
  368. t.Fatalf("The networkNodes list has to have be 0 instead of %d", len(dbs[0].networkNodes["network1"]))
  369. }
  370. // Multiple nodes Join/Leave
  371. err = dbs[0].JoinNetwork("network1")
  372. assert.NoError(t, err)
  373. err = dbs[1].JoinNetwork("network1")
  374. assert.NoError(t, err)
  375. // Wait for the propagation on db[0]
  376. for i := 0; i < maxRetry; i++ {
  377. if len(dbs[0].networkNodes["network1"]) == 2 {
  378. break
  379. }
  380. time.Sleep(1 * time.Second)
  381. }
  382. if len(dbs[0].networkNodes["network1"]) != 2 {
  383. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  384. }
  385. if n, ok := dbs[0].networks[dbs[0].config.NodeID]["network1"]; !ok || n.leaving {
  386. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  387. }
  388. // Wait for the propagation on db[1]
  389. for i := 0; i < maxRetry; i++ {
  390. if len(dbs[1].networkNodes["network1"]) == 2 {
  391. break
  392. }
  393. time.Sleep(1 * time.Second)
  394. }
  395. if len(dbs[1].networkNodes["network1"]) != 2 {
  396. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  397. }
  398. if n, ok := dbs[1].networks[dbs[1].config.NodeID]["network1"]; !ok || n.leaving {
  399. t.Fatalf("The network should not be marked as leaving:%t", n.leaving)
  400. }
  401. // Try a quick leave/join
  402. err = dbs[0].LeaveNetwork("network1")
  403. assert.NoError(t, err)
  404. err = dbs[0].JoinNetwork("network1")
  405. assert.NoError(t, err)
  406. for i := 0; i < maxRetry; i++ {
  407. if len(dbs[0].networkNodes["network1"]) == 2 {
  408. break
  409. }
  410. time.Sleep(1 * time.Second)
  411. }
  412. if len(dbs[0].networkNodes["network1"]) != 2 {
  413. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[0].networkNodes["network1"]), dbs[0].networkNodes["network1"])
  414. }
  415. for i := 0; i < maxRetry; i++ {
  416. if len(dbs[1].networkNodes["network1"]) == 2 {
  417. break
  418. }
  419. time.Sleep(1 * time.Second)
  420. }
  421. if len(dbs[1].networkNodes["network1"]) != 2 {
  422. t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v", len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
  423. }
  424. closeNetworkDBInstances(dbs)
  425. }
  426. func TestNetworkDBGarbageCollection(t *testing.T) {
  427. keysWriteDelete := 5
  428. config := DefaultConfig()
  429. config.reapEntryInterval = 30 * time.Second
  430. config.StatsPrintPeriod = 15 * time.Second
  431. dbs := createNetworkDBInstances(t, 3, "node", config)
  432. // 2 Nodes join network
  433. err := dbs[0].JoinNetwork("network1")
  434. assert.NoError(t, err)
  435. err = dbs[1].JoinNetwork("network1")
  436. assert.NoError(t, err)
  437. for i := 0; i < keysWriteDelete; i++ {
  438. err = dbs[i%2].CreateEntry("testTable", "network1", "key-"+string(i), []byte("value"))
  439. assert.NoError(t, err)
  440. }
  441. time.Sleep(time.Second)
  442. for i := 0; i < keysWriteDelete; i++ {
  443. err = dbs[i%2].DeleteEntry("testTable", "network1", "key-"+string(i))
  444. assert.NoError(t, err)
  445. }
  446. for i := 0; i < 2; i++ {
  447. assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
  448. }
  449. // from this point the timer for the garbage collection started, wait 5 seconds and then join a new node
  450. time.Sleep(5 * time.Second)
  451. err = dbs[2].JoinNetwork("network1")
  452. assert.NoError(t, err)
  453. for i := 0; i < 3; i++ {
  454. assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
  455. }
  456. // at this point the entries should had been all deleted
  457. time.Sleep(30 * time.Second)
  458. for i := 0; i < 3; i++ {
  459. assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should had been garbage collected")
  460. }
  461. // make sure that entries are not coming back
  462. time.Sleep(15 * time.Second)
  463. for i := 0; i < 3; i++ {
  464. assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should had been garbage collected")
  465. }
  466. closeNetworkDBInstances(dbs)
  467. }
  468. func TestFindNode(t *testing.T) {
  469. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  470. dbs[0].nodes["active"] = &node{Node: memberlist.Node{Name: "active"}}
  471. dbs[0].failedNodes["failed"] = &node{Node: memberlist.Node{Name: "failed"}}
  472. dbs[0].leftNodes["left"] = &node{Node: memberlist.Node{Name: "left"}}
  473. // active nodes is 2 because the testing node is in the list
  474. assert.Equal(t, 2, len(dbs[0].nodes))
  475. assert.Equal(t, 1, len(dbs[0].failedNodes))
  476. assert.Equal(t, 1, len(dbs[0].leftNodes))
  477. n, currState, m := dbs[0].findNode("active")
  478. assert.NotNil(t, n)
  479. assert.Equal(t, "active", n.Name)
  480. assert.Equal(t, nodeActiveState, currState)
  481. assert.NotNil(t, m)
  482. // delete the entry manually
  483. delete(m, "active")
  484. // test if can be still find
  485. n, currState, m = dbs[0].findNode("active")
  486. assert.Nil(t, n)
  487. assert.Equal(t, nodeNotFound, currState)
  488. assert.Nil(t, m)
  489. n, currState, m = dbs[0].findNode("failed")
  490. assert.NotNil(t, n)
  491. assert.Equal(t, "failed", n.Name)
  492. assert.Equal(t, nodeFailedState, currState)
  493. assert.NotNil(t, m)
  494. // find and remove
  495. n, currState, m = dbs[0].findNode("left")
  496. assert.NotNil(t, n)
  497. assert.Equal(t, "left", n.Name)
  498. assert.Equal(t, nodeLeftState, currState)
  499. assert.NotNil(t, m)
  500. delete(m, "left")
  501. n, currState, m = dbs[0].findNode("left")
  502. assert.Nil(t, n)
  503. assert.Equal(t, nodeNotFound, currState)
  504. assert.Nil(t, m)
  505. closeNetworkDBInstances(dbs)
  506. }
  507. func TestChangeNodeState(t *testing.T) {
  508. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  509. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1"}}
  510. dbs[0].nodes["node2"] = &node{Node: memberlist.Node{Name: "node2"}}
  511. dbs[0].nodes["node3"] = &node{Node: memberlist.Node{Name: "node3"}}
  512. // active nodes is 4 because the testing node is in the list
  513. assert.Equal(t, 4, len(dbs[0].nodes))
  514. n, currState, m := dbs[0].findNode("node1")
  515. assert.NotNil(t, n)
  516. assert.Equal(t, nodeActiveState, currState)
  517. assert.Equal(t, "node1", n.Name)
  518. assert.NotNil(t, m)
  519. // node1 to failed
  520. dbs[0].changeNodeState("node1", nodeFailedState)
  521. n, currState, m = dbs[0].findNode("node1")
  522. assert.NotNil(t, n)
  523. assert.Equal(t, nodeFailedState, currState)
  524. assert.Equal(t, "node1", n.Name)
  525. assert.NotNil(t, m)
  526. assert.NotEqual(t, time.Duration(0), n.reapTime)
  527. // node1 back to active
  528. dbs[0].changeNodeState("node1", nodeActiveState)
  529. n, currState, m = dbs[0].findNode("node1")
  530. assert.NotNil(t, n)
  531. assert.Equal(t, nodeActiveState, currState)
  532. assert.Equal(t, "node1", n.Name)
  533. assert.NotNil(t, m)
  534. assert.Equal(t, time.Duration(0), n.reapTime)
  535. // node1 to left
  536. dbs[0].changeNodeState("node1", nodeLeftState)
  537. dbs[0].changeNodeState("node2", nodeLeftState)
  538. dbs[0].changeNodeState("node3", nodeLeftState)
  539. n, currState, m = dbs[0].findNode("node1")
  540. assert.NotNil(t, n)
  541. assert.Equal(t, nodeLeftState, currState)
  542. assert.Equal(t, "node1", n.Name)
  543. assert.NotNil(t, m)
  544. assert.NotEqual(t, time.Duration(0), n.reapTime)
  545. n, currState, m = dbs[0].findNode("node2")
  546. assert.NotNil(t, n)
  547. assert.Equal(t, nodeLeftState, currState)
  548. assert.Equal(t, "node2", n.Name)
  549. assert.NotNil(t, m)
  550. assert.NotEqual(t, time.Duration(0), n.reapTime)
  551. n, currState, m = dbs[0].findNode("node3")
  552. assert.NotNil(t, n)
  553. assert.Equal(t, nodeLeftState, currState)
  554. assert.Equal(t, "node3", n.Name)
  555. assert.NotNil(t, m)
  556. assert.NotEqual(t, time.Duration(0), n.reapTime)
  557. // active nodes is 1 because the testing node is in the list
  558. assert.Equal(t, 1, len(dbs[0].nodes))
  559. assert.Equal(t, 0, len(dbs[0].failedNodes))
  560. assert.Equal(t, 3, len(dbs[0].leftNodes))
  561. closeNetworkDBInstances(dbs)
  562. }
  563. func TestNodeReincarnation(t *testing.T) {
  564. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  565. dbs[0].nodes["node1"] = &node{Node: memberlist.Node{Name: "node1", Addr: net.ParseIP("192.168.1.1")}}
  566. dbs[0].leftNodes["node2"] = &node{Node: memberlist.Node{Name: "node2", Addr: net.ParseIP("192.168.1.2")}}
  567. dbs[0].failedNodes["node3"] = &node{Node: memberlist.Node{Name: "node3", Addr: net.ParseIP("192.168.1.3")}}
  568. // active nodes is 2 because the testing node is in the list
  569. assert.Equal(t, 2, len(dbs[0].nodes))
  570. assert.Equal(t, 1, len(dbs[0].failedNodes))
  571. assert.Equal(t, 1, len(dbs[0].leftNodes))
  572. b := dbs[0].purgeReincarnation(&memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")})
  573. assert.True(t, b)
  574. dbs[0].nodes["node4"] = &node{Node: memberlist.Node{Name: "node4", Addr: net.ParseIP("192.168.1.1")}}
  575. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.2")})
  576. assert.True(t, b)
  577. dbs[0].nodes["node5"] = &node{Node: memberlist.Node{Name: "node5", Addr: net.ParseIP("192.168.1.1")}}
  578. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.3")})
  579. assert.True(t, b)
  580. dbs[0].nodes["node6"] = &node{Node: memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.1")}}
  581. b = dbs[0].purgeReincarnation(&memberlist.Node{Name: "node6", Addr: net.ParseIP("192.168.1.10")})
  582. assert.False(t, b)
  583. // active nodes is 1 because the testing node is in the list
  584. assert.Equal(t, 4, len(dbs[0].nodes))
  585. assert.Equal(t, 0, len(dbs[0].failedNodes))
  586. assert.Equal(t, 3, len(dbs[0].leftNodes))
  587. closeNetworkDBInstances(dbs)
  588. }
  589. func TestParallelCreate(t *testing.T) {
  590. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  591. startCh := make(chan int)
  592. doneCh := make(chan error)
  593. var success int32
  594. for i := 0; i < 20; i++ {
  595. go func() {
  596. <-startCh
  597. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  598. if err == nil {
  599. atomic.AddInt32(&success, 1)
  600. }
  601. doneCh <- err
  602. }()
  603. }
  604. close(startCh)
  605. for i := 0; i < 20; i++ {
  606. <-doneCh
  607. }
  608. close(doneCh)
  609. // Only 1 write should have succeeded
  610. assert.Equal(t, int32(1), success)
  611. closeNetworkDBInstances(dbs)
  612. }
  613. func TestParallelDelete(t *testing.T) {
  614. dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig())
  615. err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value"))
  616. assert.NoError(t, err)
  617. startCh := make(chan int)
  618. doneCh := make(chan error)
  619. var success int32
  620. for i := 0; i < 20; i++ {
  621. go func() {
  622. <-startCh
  623. err := dbs[0].DeleteEntry("testTable", "testNetwork", "key")
  624. if err == nil {
  625. atomic.AddInt32(&success, 1)
  626. }
  627. doneCh <- err
  628. }()
  629. }
  630. close(startCh)
  631. for i := 0; i < 20; i++ {
  632. <-doneCh
  633. }
  634. close(doneCh)
  635. // Only 1 write should have succeeded
  636. assert.Equal(t, int32(1), success)
  637. closeNetworkDBInstances(dbs)
  638. }