networkdb_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. package networkdb
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "os"
  7. "sync/atomic"
  8. "testing"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/go-events"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. )
  15. var (
  16. dbPort int32 = 10000
  17. runningInContainer = flag.Bool("incontainer", false, "Indicates if the test is running in a container")
  18. )
  19. func TestMain(m *testing.M) {
  20. logrus.SetLevel(logrus.ErrorLevel)
  21. os.Exit(m.Run())
  22. }
  23. func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
  24. var dbs []*NetworkDB
  25. for i := 0; i < num; i++ {
  26. db, err := New(&Config{
  27. NodeName: fmt.Sprintf("%s%d", namePrefix, i+1),
  28. BindPort: int(atomic.AddInt32(&dbPort, 1)),
  29. })
  30. require.NoError(t, err)
  31. if i != 0 {
  32. err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
  33. assert.NoError(t, err)
  34. }
  35. dbs = append(dbs, db)
  36. }
  37. return dbs
  38. }
  39. func closeNetworkDBInstances(dbs []*NetworkDB) {
  40. for _, db := range dbs {
  41. db.Close()
  42. }
  43. }
  44. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  45. for i := 0; i < 80; i++ {
  46. _, ok := db.nodes[node]
  47. if present && ok {
  48. return
  49. }
  50. if !present && !ok {
  51. return
  52. }
  53. time.Sleep(50 * time.Millisecond)
  54. }
  55. assert.Fail(t, fmt.Sprintf("%s: Node existence verification for node %s failed", db.config.NodeName, node))
  56. }
  57. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  58. for i := 0; i < 80; i++ {
  59. if nn, nnok := db.networks[node]; nnok {
  60. n, ok := nn[id]
  61. if present && ok {
  62. return
  63. }
  64. if !present &&
  65. ((ok && n.leaving) ||
  66. !ok) {
  67. return
  68. }
  69. }
  70. time.Sleep(50 * time.Millisecond)
  71. }
  72. assert.Fail(t, "Network existence verification failed")
  73. }
  74. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  75. n := 80
  76. for i := 0; i < n; i++ {
  77. entry, err := db.getEntry(tname, nid, key)
  78. if present && err == nil && string(entry.value) == value {
  79. return
  80. }
  81. if !present &&
  82. ((err == nil && entry.deleting) ||
  83. (err != nil)) {
  84. return
  85. }
  86. if i == n-1 && !present && err != nil {
  87. return
  88. }
  89. time.Sleep(50 * time.Millisecond)
  90. }
  91. assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %s", db.config.NodeName))
  92. }
  93. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  94. select {
  95. case rcvdEv := <-ch:
  96. assert.Equal(t, fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev))
  97. switch rcvdEv.(type) {
  98. case CreateEvent:
  99. assert.Equal(t, tname, rcvdEv.(CreateEvent).Table)
  100. assert.Equal(t, nid, rcvdEv.(CreateEvent).NetworkID)
  101. assert.Equal(t, key, rcvdEv.(CreateEvent).Key)
  102. assert.Equal(t, value, string(rcvdEv.(CreateEvent).Value))
  103. case UpdateEvent:
  104. assert.Equal(t, tname, rcvdEv.(UpdateEvent).Table)
  105. assert.Equal(t, nid, rcvdEv.(UpdateEvent).NetworkID)
  106. assert.Equal(t, key, rcvdEv.(UpdateEvent).Key)
  107. assert.Equal(t, value, string(rcvdEv.(UpdateEvent).Value))
  108. case DeleteEvent:
  109. assert.Equal(t, tname, rcvdEv.(DeleteEvent).Table)
  110. assert.Equal(t, nid, rcvdEv.(DeleteEvent).NetworkID)
  111. assert.Equal(t, key, rcvdEv.(DeleteEvent).Key)
  112. }
  113. case <-time.After(time.Second):
  114. t.Fail()
  115. return
  116. }
  117. }
  118. func TestNetworkDBSimple(t *testing.T) {
  119. dbs := createNetworkDBInstances(t, 2, "node")
  120. closeNetworkDBInstances(dbs)
  121. }
  122. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  123. dbs := createNetworkDBInstances(t, 2, "node")
  124. err := dbs[0].JoinNetwork("network1")
  125. assert.NoError(t, err)
  126. dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
  127. err = dbs[0].LeaveNetwork("network1")
  128. assert.NoError(t, err)
  129. dbs[1].verifyNetworkExistence(t, "node1", "network1", false)
  130. closeNetworkDBInstances(dbs)
  131. }
  132. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  133. dbs := createNetworkDBInstances(t, 2, "node")
  134. n := 10
  135. for i := 1; i <= n; i++ {
  136. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  137. assert.NoError(t, err)
  138. }
  139. for i := 1; i <= n; i++ {
  140. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  141. assert.NoError(t, err)
  142. }
  143. for i := 1; i <= n; i++ {
  144. dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), true)
  145. }
  146. for i := 1; i <= n; i++ {
  147. dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), true)
  148. }
  149. for i := 1; i <= n; i++ {
  150. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  151. assert.NoError(t, err)
  152. }
  153. for i := 1; i <= n; i++ {
  154. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  155. assert.NoError(t, err)
  156. }
  157. for i := 1; i <= n; i++ {
  158. dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), false)
  159. }
  160. for i := 1; i <= n; i++ {
  161. dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), false)
  162. }
  163. closeNetworkDBInstances(dbs)
  164. }
  165. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  166. dbs := createNetworkDBInstances(t, 3, "node")
  167. err := dbs[0].JoinNetwork("network1")
  168. assert.NoError(t, err)
  169. dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
  170. err = dbs[1].JoinNetwork("network1")
  171. assert.NoError(t, err)
  172. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  173. assert.NoError(t, err)
  174. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  175. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  176. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  177. assert.NoError(t, err)
  178. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  179. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  180. assert.NoError(t, err)
  181. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  182. closeNetworkDBInstances(dbs)
  183. }
  184. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  185. dbs := createNetworkDBInstances(t, 2, "node")
  186. err := dbs[0].JoinNetwork("network1")
  187. assert.NoError(t, err)
  188. dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
  189. err = dbs[1].JoinNetwork("network1")
  190. assert.NoError(t, err)
  191. n := 10
  192. for i := 1; i <= n; i++ {
  193. err = dbs[0].CreateEntry("test_table", "network1",
  194. fmt.Sprintf("test_key0%d", i),
  195. []byte(fmt.Sprintf("test_value0%d", i)))
  196. assert.NoError(t, err)
  197. }
  198. for i := 1; i <= n; i++ {
  199. err = dbs[1].CreateEntry("test_table", "network1",
  200. fmt.Sprintf("test_key1%d", i),
  201. []byte(fmt.Sprintf("test_value1%d", i)))
  202. assert.NoError(t, err)
  203. }
  204. for i := 1; i <= n; i++ {
  205. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  206. fmt.Sprintf("test_key1%d", i),
  207. fmt.Sprintf("test_value1%d", i), true)
  208. assert.NoError(t, err)
  209. }
  210. for i := 1; i <= n; i++ {
  211. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  212. fmt.Sprintf("test_key0%d", i),
  213. fmt.Sprintf("test_value0%d", i), true)
  214. assert.NoError(t, err)
  215. }
  216. // Verify deletes
  217. for i := 1; i <= n; i++ {
  218. err = dbs[0].DeleteEntry("test_table", "network1",
  219. fmt.Sprintf("test_key0%d", i))
  220. assert.NoError(t, err)
  221. }
  222. for i := 1; i <= n; i++ {
  223. err = dbs[1].DeleteEntry("test_table", "network1",
  224. fmt.Sprintf("test_key1%d", i))
  225. assert.NoError(t, err)
  226. }
  227. for i := 1; i <= n; i++ {
  228. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  229. fmt.Sprintf("test_key1%d", i), "", false)
  230. assert.NoError(t, err)
  231. }
  232. for i := 1; i <= n; i++ {
  233. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  234. fmt.Sprintf("test_key0%d", i), "", false)
  235. assert.NoError(t, err)
  236. }
  237. closeNetworkDBInstances(dbs)
  238. }
  239. func TestNetworkDBNodeLeave(t *testing.T) {
  240. dbs := createNetworkDBInstances(t, 2, "node")
  241. err := dbs[0].JoinNetwork("network1")
  242. assert.NoError(t, err)
  243. err = dbs[1].JoinNetwork("network1")
  244. assert.NoError(t, err)
  245. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  246. assert.NoError(t, err)
  247. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  248. dbs[0].Close()
  249. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  250. dbs[1].Close()
  251. }
  252. func TestNetworkDBWatch(t *testing.T) {
  253. dbs := createNetworkDBInstances(t, 2, "node")
  254. err := dbs[0].JoinNetwork("network1")
  255. assert.NoError(t, err)
  256. err = dbs[1].JoinNetwork("network1")
  257. assert.NoError(t, err)
  258. ch, cancel := dbs[1].Watch("", "", "")
  259. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  260. assert.NoError(t, err)
  261. testWatch(t, ch, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  262. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  263. assert.NoError(t, err)
  264. testWatch(t, ch, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  265. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  266. assert.NoError(t, err)
  267. testWatch(t, ch, DeleteEvent{}, "test_table", "network1", "test_key", "")
  268. cancel()
  269. closeNetworkDBInstances(dbs)
  270. }
  271. func TestNetworkDBBulkSync(t *testing.T) {
  272. dbs := createNetworkDBInstances(t, 2, "node")
  273. err := dbs[0].JoinNetwork("network1")
  274. assert.NoError(t, err)
  275. dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
  276. n := 1000
  277. for i := 1; i <= n; i++ {
  278. err = dbs[0].CreateEntry("test_table", "network1",
  279. fmt.Sprintf("test_key0%d", i),
  280. []byte(fmt.Sprintf("test_value0%d", i)))
  281. assert.NoError(t, err)
  282. }
  283. err = dbs[1].JoinNetwork("network1")
  284. assert.NoError(t, err)
  285. dbs[0].verifyNetworkExistence(t, "node2", "network1", true)
  286. for i := 1; i <= n; i++ {
  287. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  288. fmt.Sprintf("test_key0%d", i),
  289. fmt.Sprintf("test_value0%d", i), true)
  290. assert.NoError(t, err)
  291. }
  292. closeNetworkDBInstances(dbs)
  293. }
  294. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  295. n := 5
  296. dbs := createNetworkDBInstances(t, n, "node")
  297. for i := 0; i < n; i++ {
  298. for j := 0; j < n; j++ {
  299. if i == j {
  300. continue
  301. }
  302. dbs[i].verifyNodeExistence(t, fmt.Sprintf("node%d", j+1), true)
  303. }
  304. }
  305. for i := 0; i < n; i++ {
  306. err := dbs[i].JoinNetwork("network1")
  307. assert.NoError(t, err)
  308. }
  309. for i := 0; i < n; i++ {
  310. for j := 0; j < n; j++ {
  311. dbs[i].verifyNetworkExistence(t, fmt.Sprintf("node%d", j+1), "network1", true)
  312. }
  313. }
  314. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  315. assert.NoError(t, err)
  316. for i := 1; i < n; i++ {
  317. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  318. }
  319. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  320. assert.NoError(t, err)
  321. for i := 1; i < n; i++ {
  322. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  323. }
  324. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  325. assert.NoError(t, err)
  326. for i := 1; i < n; i++ {
  327. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  328. }
  329. log.Printf("Closing DB instances...")
  330. closeNetworkDBInstances(dbs)
  331. }