networkdb_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. package networkdb
  2. import (
  3. "flag"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "os"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/go-events"
  13. "github.com/stretchr/testify/assert"
  14. "github.com/stretchr/testify/require"
  15. )
  16. var (
  17. dbPort int32 = 10000
  18. runningInContainer = flag.Bool("incontainer", false, "Indicates if the test is running in a container")
  19. )
  20. func TestMain(m *testing.M) {
  21. ioutil.WriteFile("/proc/sys/net/ipv6/conf/lo/disable_ipv6", []byte{'0', '\n'}, 0644)
  22. logrus.SetLevel(logrus.ErrorLevel)
  23. os.Exit(m.Run())
  24. }
  25. func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
  26. var dbs []*NetworkDB
  27. for i := 0; i < num; i++ {
  28. db, err := New(&Config{
  29. NodeName: fmt.Sprintf("%s%d", namePrefix, i+1),
  30. BindPort: int(atomic.AddInt32(&dbPort, 1)),
  31. })
  32. require.NoError(t, err)
  33. if i != 0 {
  34. err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
  35. assert.NoError(t, err)
  36. }
  37. dbs = append(dbs, db)
  38. }
  39. return dbs
  40. }
  41. func closeNetworkDBInstances(dbs []*NetworkDB) {
  42. for _, db := range dbs {
  43. db.Close()
  44. }
  45. }
  46. func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
  47. for i := 0; i < 80; i++ {
  48. db.RLock()
  49. _, ok := db.nodes[node]
  50. db.RUnlock()
  51. if present && ok {
  52. return
  53. }
  54. if !present && !ok {
  55. return
  56. }
  57. time.Sleep(50 * time.Millisecond)
  58. }
  59. assert.Fail(t, fmt.Sprintf("%s: Node existence verification for node %s failed", db.config.NodeName, node))
  60. }
  61. func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
  62. for i := 0; i < 80; i++ {
  63. db.RLock()
  64. nn, nnok := db.networks[node]
  65. db.RUnlock()
  66. if nnok {
  67. n, ok := nn[id]
  68. if present && ok {
  69. return
  70. }
  71. if !present &&
  72. ((ok && n.leaving) ||
  73. !ok) {
  74. return
  75. }
  76. }
  77. time.Sleep(50 * time.Millisecond)
  78. }
  79. assert.Fail(t, "Network existence verification failed")
  80. }
  81. func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
  82. n := 80
  83. for i := 0; i < n; i++ {
  84. entry, err := db.getEntry(tname, nid, key)
  85. if present && err == nil && string(entry.value) == value {
  86. return
  87. }
  88. if !present &&
  89. ((err == nil && entry.deleting) ||
  90. (err != nil)) {
  91. return
  92. }
  93. if i == n-1 && !present && err != nil {
  94. return
  95. }
  96. time.Sleep(50 * time.Millisecond)
  97. }
  98. assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %s", db.config.NodeName))
  99. }
  100. func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
  101. select {
  102. case rcvdEv := <-ch:
  103. assert.Equal(t, fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev))
  104. switch rcvdEv.(type) {
  105. case CreateEvent:
  106. assert.Equal(t, tname, rcvdEv.(CreateEvent).Table)
  107. assert.Equal(t, nid, rcvdEv.(CreateEvent).NetworkID)
  108. assert.Equal(t, key, rcvdEv.(CreateEvent).Key)
  109. assert.Equal(t, value, string(rcvdEv.(CreateEvent).Value))
  110. case UpdateEvent:
  111. assert.Equal(t, tname, rcvdEv.(UpdateEvent).Table)
  112. assert.Equal(t, nid, rcvdEv.(UpdateEvent).NetworkID)
  113. assert.Equal(t, key, rcvdEv.(UpdateEvent).Key)
  114. assert.Equal(t, value, string(rcvdEv.(UpdateEvent).Value))
  115. case DeleteEvent:
  116. assert.Equal(t, tname, rcvdEv.(DeleteEvent).Table)
  117. assert.Equal(t, nid, rcvdEv.(DeleteEvent).NetworkID)
  118. assert.Equal(t, key, rcvdEv.(DeleteEvent).Key)
  119. }
  120. case <-time.After(time.Second):
  121. t.Fail()
  122. return
  123. }
  124. }
  125. func TestNetworkDBSimple(t *testing.T) {
  126. dbs := createNetworkDBInstances(t, 2, "node")
  127. closeNetworkDBInstances(dbs)
  128. }
  129. func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
  130. dbs := createNetworkDBInstances(t, 2, "node")
  131. err := dbs[0].JoinNetwork("network1")
  132. assert.NoError(t, err)
  133. dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
  134. err = dbs[0].LeaveNetwork("network1")
  135. assert.NoError(t, err)
  136. dbs[1].verifyNetworkExistence(t, "node1", "network1", false)
  137. closeNetworkDBInstances(dbs)
  138. }
  139. func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
  140. dbs := createNetworkDBInstances(t, 2, "node")
  141. n := 10
  142. for i := 1; i <= n; i++ {
  143. err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
  144. assert.NoError(t, err)
  145. }
  146. for i := 1; i <= n; i++ {
  147. err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
  148. assert.NoError(t, err)
  149. }
  150. for i := 1; i <= n; i++ {
  151. dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), true)
  152. }
  153. for i := 1; i <= n; i++ {
  154. dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), true)
  155. }
  156. for i := 1; i <= n; i++ {
  157. err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
  158. assert.NoError(t, err)
  159. }
  160. for i := 1; i <= n; i++ {
  161. err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
  162. assert.NoError(t, err)
  163. }
  164. for i := 1; i <= n; i++ {
  165. dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), false)
  166. }
  167. for i := 1; i <= n; i++ {
  168. dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), false)
  169. }
  170. closeNetworkDBInstances(dbs)
  171. }
  172. func TestNetworkDBCRUDTableEntry(t *testing.T) {
  173. dbs := createNetworkDBInstances(t, 3, "node")
  174. err := dbs[0].JoinNetwork("network1")
  175. assert.NoError(t, err)
  176. dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
  177. err = dbs[1].JoinNetwork("network1")
  178. assert.NoError(t, err)
  179. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  180. assert.NoError(t, err)
  181. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  182. dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  183. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  184. assert.NoError(t, err)
  185. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  186. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  187. assert.NoError(t, err)
  188. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  189. closeNetworkDBInstances(dbs)
  190. }
  191. func TestNetworkDBCRUDTableEntries(t *testing.T) {
  192. dbs := createNetworkDBInstances(t, 2, "node")
  193. err := dbs[0].JoinNetwork("network1")
  194. assert.NoError(t, err)
  195. dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
  196. err = dbs[1].JoinNetwork("network1")
  197. assert.NoError(t, err)
  198. n := 10
  199. for i := 1; i <= n; i++ {
  200. err = dbs[0].CreateEntry("test_table", "network1",
  201. fmt.Sprintf("test_key0%d", i),
  202. []byte(fmt.Sprintf("test_value0%d", i)))
  203. assert.NoError(t, err)
  204. }
  205. for i := 1; i <= n; i++ {
  206. err = dbs[1].CreateEntry("test_table", "network1",
  207. fmt.Sprintf("test_key1%d", i),
  208. []byte(fmt.Sprintf("test_value1%d", i)))
  209. assert.NoError(t, err)
  210. }
  211. for i := 1; i <= n; i++ {
  212. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  213. fmt.Sprintf("test_key1%d", i),
  214. fmt.Sprintf("test_value1%d", i), true)
  215. assert.NoError(t, err)
  216. }
  217. for i := 1; i <= n; i++ {
  218. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  219. fmt.Sprintf("test_key0%d", i),
  220. fmt.Sprintf("test_value0%d", i), true)
  221. assert.NoError(t, err)
  222. }
  223. // Verify deletes
  224. for i := 1; i <= n; i++ {
  225. err = dbs[0].DeleteEntry("test_table", "network1",
  226. fmt.Sprintf("test_key0%d", i))
  227. assert.NoError(t, err)
  228. }
  229. for i := 1; i <= n; i++ {
  230. err = dbs[1].DeleteEntry("test_table", "network1",
  231. fmt.Sprintf("test_key1%d", i))
  232. assert.NoError(t, err)
  233. }
  234. for i := 1; i <= n; i++ {
  235. dbs[0].verifyEntryExistence(t, "test_table", "network1",
  236. fmt.Sprintf("test_key1%d", i), "", false)
  237. assert.NoError(t, err)
  238. }
  239. for i := 1; i <= n; i++ {
  240. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  241. fmt.Sprintf("test_key0%d", i), "", false)
  242. assert.NoError(t, err)
  243. }
  244. closeNetworkDBInstances(dbs)
  245. }
  246. func TestNetworkDBNodeLeave(t *testing.T) {
  247. dbs := createNetworkDBInstances(t, 2, "node")
  248. err := dbs[0].JoinNetwork("network1")
  249. assert.NoError(t, err)
  250. err = dbs[1].JoinNetwork("network1")
  251. assert.NoError(t, err)
  252. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  253. assert.NoError(t, err)
  254. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  255. dbs[0].Close()
  256. dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
  257. dbs[1].Close()
  258. }
  259. func TestNetworkDBWatch(t *testing.T) {
  260. dbs := createNetworkDBInstances(t, 2, "node")
  261. err := dbs[0].JoinNetwork("network1")
  262. assert.NoError(t, err)
  263. err = dbs[1].JoinNetwork("network1")
  264. assert.NoError(t, err)
  265. ch, cancel := dbs[1].Watch("", "", "")
  266. err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  267. assert.NoError(t, err)
  268. testWatch(t, ch.C, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
  269. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  270. assert.NoError(t, err)
  271. testWatch(t, ch.C, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
  272. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  273. assert.NoError(t, err)
  274. testWatch(t, ch.C, DeleteEvent{}, "test_table", "network1", "test_key", "")
  275. cancel()
  276. closeNetworkDBInstances(dbs)
  277. }
  278. func TestNetworkDBBulkSync(t *testing.T) {
  279. dbs := createNetworkDBInstances(t, 2, "node")
  280. err := dbs[0].JoinNetwork("network1")
  281. assert.NoError(t, err)
  282. dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
  283. n := 1000
  284. for i := 1; i <= n; i++ {
  285. err = dbs[0].CreateEntry("test_table", "network1",
  286. fmt.Sprintf("test_key0%d", i),
  287. []byte(fmt.Sprintf("test_value0%d", i)))
  288. assert.NoError(t, err)
  289. }
  290. err = dbs[1].JoinNetwork("network1")
  291. assert.NoError(t, err)
  292. dbs[0].verifyNetworkExistence(t, "node2", "network1", true)
  293. for i := 1; i <= n; i++ {
  294. dbs[1].verifyEntryExistence(t, "test_table", "network1",
  295. fmt.Sprintf("test_key0%d", i),
  296. fmt.Sprintf("test_value0%d", i), true)
  297. assert.NoError(t, err)
  298. }
  299. closeNetworkDBInstances(dbs)
  300. }
  301. func TestNetworkDBCRUDMediumCluster(t *testing.T) {
  302. n := 5
  303. dbs := createNetworkDBInstances(t, n, "node")
  304. for i := 0; i < n; i++ {
  305. for j := 0; j < n; j++ {
  306. if i == j {
  307. continue
  308. }
  309. dbs[i].verifyNodeExistence(t, fmt.Sprintf("node%d", j+1), true)
  310. }
  311. }
  312. for i := 0; i < n; i++ {
  313. err := dbs[i].JoinNetwork("network1")
  314. assert.NoError(t, err)
  315. }
  316. for i := 0; i < n; i++ {
  317. for j := 0; j < n; j++ {
  318. dbs[i].verifyNetworkExistence(t, fmt.Sprintf("node%d", j+1), "network1", true)
  319. }
  320. }
  321. err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
  322. assert.NoError(t, err)
  323. for i := 1; i < n; i++ {
  324. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
  325. }
  326. err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
  327. assert.NoError(t, err)
  328. for i := 1; i < n; i++ {
  329. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
  330. }
  331. err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
  332. assert.NoError(t, err)
  333. for i := 1; i < n; i++ {
  334. dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
  335. }
  336. log.Print("Closing DB instances...")
  337. closeNetworkDBInstances(dbs)
  338. }