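// Package dbclient implements the client side of the networkDB test suite.
// It drives a set of test nodes through their HTTP diagnostic endpoints:
// joining the cluster and networks, writing and deleting table keys, and
// verifying that all nodes converge to the expected state.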
package dbclient

import (
	"context"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/sirupsen/logrus"
)
// servicePort is the diagnostic HTTP port of the nodes under test, set once by Client.
var servicePort string

// totalWrittenKeys is the key under which waitWriters stores the aggregate count.
const totalWrittenKeys string = "totalKeys"

// resultTuple carries the result of one operation executed against one node.
type resultTuple struct {
	id     string
	result int
}
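// httpGetFatalError issues a GET against a node and aborts the whole test
// run if the request fails or the node does not answer "OK".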
func httpGetFatalError(ip, port, path string) {
	body, err := httpGet(ip, port, path)
	if err != nil || !strings.Contains(string(body), "OK") {
		log.Fatalf("[%s] error %s %s", path, err, body)
	}
}
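// httpGet performs a GET against http://ip:port/path and returns the raw body.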
func httpGet(ip, port, path string) ([]byte, error) {
	resp, err := http.Get("http://" + ip + ":" + port + path)
	if err != nil {
		logrus.Errorf("httpGet error:%s", err)
		return nil, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return body, err
}
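// joinCluster, joinNetwork and leaveNetwork call the corresponding diagnostic
// endpoint on one node; when doneCh is not nil they signal completion on it,
// so callers can fan the calls out in goroutines.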
func joinCluster(ip, port string, members []string, doneCh chan resultTuple) {
	httpGetFatalError(ip, port, "/join?members="+strings.Join(members, ","))
	if doneCh != nil {
		doneCh <- resultTuple{id: ip, result: 0}
	}
}

func joinNetwork(ip, port, network string, doneCh chan resultTuple) {
	httpGetFatalError(ip, port, "/joinnetwork?nid="+network)
	if doneCh != nil {
		doneCh <- resultTuple{id: ip, result: 0}
	}
}

func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
	httpGetFatalError(ip, port, "/leavenetwork?nid="+network)
	if doneCh != nil {
		doneCh <- resultTuple{id: ip, result: 0}
	}
}

func writeTableKey(ip, port, networkName, tableName, key string) {
	createPath := "/createentry?unsafe&nid=" + networkName + "&tname=" + tableName + "&value=v&key="
	httpGetFatalError(ip, port, createPath+key)
}

func deleteTableKey(ip, port, networkName, tableName, key string) {
	deletePath := "/deleteentry?nid=" + networkName + "&tname=" + tableName + "&key="
	httpGetFatalError(ip, port, deletePath+key)
}
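// clusterPeersNumber and networkPeersNumber report on doneCh how many peers
// the node sees in the cluster or in a given network, or -1 on error.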
func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
	body, err := httpGet(ip, port, "/clusterpeers")
	if err != nil {
		logrus.Errorf("clusterPeers %s there was an error: %s", ip, err)
		doneCh <- resultTuple{id: ip, result: -1}
		return
	}
	peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
	match := peersRegexp.FindStringSubmatch(string(body))
	if len(match) < 2 {
		logrus.Errorf("clusterPeers %s unexpected body: %s", ip, body)
		doneCh <- resultTuple{id: ip, result: -1}
		return
	}
	peersNum, _ := strconv.Atoi(match[1])
	doneCh <- resultTuple{id: ip, result: peersNum}
}

func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
	body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
	if err != nil {
		logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err)
		doneCh <- resultTuple{id: ip, result: -1}
		return
	}
	peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
	match := peersRegexp.FindStringSubmatch(string(body))
	if len(match) < 2 {
		logrus.Errorf("networkPeersNumber %s unexpected body: %s", ip, body)
		doneCh <- resultTuple{id: ip, result: -1}
		return
	}
	peersNum, _ := strconv.Atoi(match[1])
	doneCh <- resultTuple{id: ip, result: peersNum}
}
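// dbTableEntriesNumber reports on doneCh how many entries the node's table
// holds on the server side, or -1 on error.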
func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
	body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
	if err != nil {
		logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err)
		doneCh <- resultTuple{id: ip, result: -1}
		return
	}
	entriesRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
	match := entriesRegexp.FindStringSubmatch(string(body))
	if len(match) < 2 {
		logrus.Errorf("tableEntriesNumber %s unexpected body: %s", ip, body)
		doneCh <- resultTuple{id: ip, result: -1}
		return
	}
	entriesNum, _ := strconv.Atoi(match[1])
	doneCh <- resultTuple{id: ip, result: entriesNum}
}
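// clientWatchTable asks the node to start watching a table, so that the
// watched-table entry counters queried below are populated.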
func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
	httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
	if doneCh != nil {
		doneCh <- resultTuple{id: ip, result: 0}
	}
}
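// clientTableEntriesNumber reports on doneCh how many entries the node's
// watched-table view holds, or -1 on error.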
func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
	body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
	if err != nil {
		logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err)
		doneCh <- resultTuple{id: ip, result: -1}
		return
	}
	elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
	match := elementsRegexp.FindStringSubmatch(string(body))
	if len(match) < 2 {
		logrus.Errorf("clientTableEntriesNumber %s unexpected body: %s", ip, body)
		doneCh <- resultTuple{id: ip, result: -1}
		return
	}
	entriesNum, _ := strconv.Atoi(match[1])
	doneCh <- resultTuple{id: ip, result: entriesNum}
}
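// writeUniqueKeys writes keys with an increasing numeric suffix until the
// context expires, then reports on doneCh how many keys it wrote.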
func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
	for x := 0; ; x++ {
		select {
		case <-ctx.Done():
			doneCh <- resultTuple{id: ip, result: x}
			return
		default:
			k := key + strconv.Itoa(x)
			// write key
			writeTableKey(ip, port, networkName, tableName, k)
			// give time to send out key writes
			time.Sleep(100 * time.Millisecond)
		}
	}
}
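// writeDeleteUniqueKeys behaves like writeUniqueKeys but deletes each key
// shortly after writing it, so the tables are expected to end up empty.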
func writeDeleteUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
	for x := 0; ; x++ {
		select {
		case <-ctx.Done():
			doneCh <- resultTuple{id: ip, result: x}
			return
		default:
			k := key + strconv.Itoa(x)
			// write key
			writeTableKey(ip, port, networkName, tableName, k)
			// give time to send out key writes
			time.Sleep(100 * time.Millisecond)
			// delete key
			deleteTableKey(ip, port, networkName, tableName, k)
		}
	}
}
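// writeDeleteLeaveJoin writes and deletes a key on every iteration, then
// leaves and rejoins the network before the next one.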
func writeDeleteLeaveJoin(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
	for x := 0; ; x++ {
		select {
		case <-ctx.Done():
			doneCh <- resultTuple{id: ip, result: x}
			return
		default:
			k := key + strconv.Itoa(x)
			// write key
			writeTableKey(ip, port, networkName, tableName, k)
			time.Sleep(100 * time.Millisecond)
			// delete key
			deleteTableKey(ip, port, networkName, tableName, k)
			// give some time
			time.Sleep(100 * time.Millisecond)
			// leave network
			leaveNetwork(ip, port, networkName, nil)
			// join network
			joinNetwork(ip, port, networkName, nil)
		}
	}
}
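// ready polls the /ready endpoint until the node answers "OK", then signals
// completion on doneCh.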
func ready(ip, port string, doneCh chan resultTuple) {
	for {
		body, err := httpGet(ip, port, "/ready")
		if err != nil || !strings.Contains(string(body), "OK") {
			time.Sleep(500 * time.Millisecond)
			continue
		}
		// success
		break
	}
	// notify the completion
	doneCh <- resultTuple{id: ip, result: 0}
}
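// checkTable polls the entry counter returned by fn on every node until the
// context expires. The run is declared successful only if the most recent
// polling round saw every node report exactly expectedEntries; the time to
// convergence is logged.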
func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
	startTime := time.Now().UnixNano()
	var successTime int64

	// Loop until the context expires to guarantee that the result is stable
	for {
		select {
		case <-ctx.Done():
			// Validate test success: a non-zero successTime means the last
			// round saw the expected number of entries on every node
			if successTime != 0 {
				logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
				return
			}
			log.Fatal("Test failed, there are still nodes with an unexpected number of entries in their tables")
		default:
			logrus.Infof("Checking table %s expected %d", tableName, expectedEntries)
			doneCh := make(chan resultTuple, len(ips))
			for _, ip := range ips {
				go fn(ip, port, networkName, tableName, doneCh)
			}

			nodesWithCorrectEntriesNum := 0
			for i := len(ips); i > 0; i-- {
				tableEntries := <-doneCh
				logrus.Infof("Node %s has %d entries", tableEntries.id, tableEntries.result)
				if tableEntries.result == expectedEntries {
					nodesWithCorrectEntriesNum++
				}
			}
			close(doneCh)

			if nodesWithCorrectEntriesNum == len(ips) {
				if successTime == 0 {
					successTime = time.Now().UnixNano()
					logrus.Infof("Success after %d msec", time.Duration(successTime-startTime)/time.Millisecond)
				}
			} else {
				successTime = 0
			}
			time.Sleep(10 * time.Second)
		}
	}
}
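// waitWriters collects one result from doneCh per worker. With mustWrite set
// it requires every worker to have written at least one key and returns the
// per-worker counts plus the aggregate under the totalWrittenKeys key;
// without it, every worker must report 0.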
func waitWriters(parallelWriters int, mustWrite bool, doneCh chan resultTuple) map[string]int {
	var totalKeys int
	resultTable := make(map[string]int)
	for i := 0; i < parallelWriters; i++ {
		logrus.Infof("Waiting for %d workers", parallelWriters-i)
		workerReturn := <-doneCh
		totalKeys += workerReturn.result
		if mustWrite && workerReturn.result == 0 {
			log.Fatalf("The worker %s did not write any key", workerReturn.id)
		}
		if !mustWrite && workerReturn.result != 0 {
			log.Fatalf("The worker %s was supposed to return 0, returned %d instead", workerReturn.id, workerReturn.result)
		}
		if mustWrite {
			resultTable[workerReturn.id] = workerReturn.result
			logrus.Infof("The worker %s wrote %d keys", workerReturn.id, workerReturn.result)
		}
	}
	resultTable[totalWrittenKeys] = totalKeys
	return resultTable
}
// ready
func doReady(ips []string) {
	doneCh := make(chan resultTuple, len(ips))
	// check all the nodes
	for _, ip := range ips {
		go ready(ip, servicePort, doneCh)
	}
	// wait for the readiness of all nodes
	for i := len(ips); i > 0; i-- {
		<-doneCh
	}
	close(doneCh)
}
// join
func doJoin(ips []string) {
	doneCh := make(chan resultTuple, len(ips))
	// each node joins the cluster formed by all the other nodes
	for i, ip := range ips {
		members := append([]string(nil), ips[:i]...)
		members = append(members, ips[i+1:]...)
		go joinCluster(ip, servicePort, members, doneCh)
	}
	// wait for all nodes to complete the join
	for i := len(ips); i > 0; i-- {
		<-doneCh
	}
	close(doneCh)
}
// cluster-peers expectedNumberPeers maxRetry
func doClusterPeers(ips []string, args []string) {
	doneCh := make(chan resultTuple, len(ips))
	expectedPeers, _ := strconv.Atoi(args[0])
	maxRetry, _ := strconv.Atoi(args[1])
	for retry := 0; retry < maxRetry; retry++ {
		// check all the nodes
		for _, ip := range ips {
			go clusterPeersNumber(ip, servicePort, doneCh)
		}
		var failed bool
		// wait for the answer of all nodes
		for i := len(ips); i > 0; i-- {
			node := <-doneCh
			if node.result != expectedPeers {
				failed = true
				if retry == maxRetry-1 {
					log.Fatalf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
				} else {
					logrus.Warnf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
				}
				time.Sleep(1 * time.Second)
			}
		}
		// check if a retry is needed
		if !failed {
			break
		}
	}
	close(doneCh)
}
// join-network networkName
func doJoinNetwork(ips []string, args []string) {
	doneCh := make(chan resultTuple, len(ips))
	// put all the nodes in the network
	for _, ip := range ips {
		go joinNetwork(ip, servicePort, args[0], doneCh)
	}
	// wait for all nodes to complete the join
	for i := len(ips); i > 0; i-- {
		<-doneCh
	}
	close(doneCh)
}

// leave-network networkName
func doLeaveNetwork(ips []string, args []string) {
	doneCh := make(chan resultTuple, len(ips))
	// remove all the nodes from the network
	for _, ip := range ips {
		go leaveNetwork(ip, servicePort, args[0], doneCh)
	}
	// wait for all nodes to complete the leave
	for i := len(ips); i > 0; i-- {
		<-doneCh
	}
	close(doneCh)
}
// network-peers networkName expectedNumberPeers maxRetry
func doNetworkPeers(ips []string, args []string) {
	doneCh := make(chan resultTuple, len(ips))
	networkName := args[0]
	expectedPeers, _ := strconv.Atoi(args[1])
	maxRetry, _ := strconv.Atoi(args[2])
	for retry := 0; retry < maxRetry; retry++ {
		// check all the nodes
		for _, ip := range ips {
			go networkPeersNumber(ip, servicePort, networkName, doneCh)
		}
		var failed bool
		// wait for the answer of all nodes
		for i := len(ips); i > 0; i-- {
			node := <-doneCh
			if node.result != expectedPeers {
				failed = true
				if retry == maxRetry-1 {
					log.Fatalf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
				} else {
					logrus.Warnf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
				}
				time.Sleep(1 * time.Second)
			}
		}
		// check if a retry is needed
		if !failed {
			break
		}
	}
	close(doneCh)
}
// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
func doWriteDeleteUniqueKeys(ips []string, args []string) {
	networkName := args[0]
	tableName := args[1]
	parallelWriters, _ := strconv.Atoi(args[2])
	writeTimeSec, _ := strconv.Atoi(args[3])

	doneCh := make(chan resultTuple, parallelWriters)
	defer close(doneCh)
	// Enable watch of tables from clients
	for i := 0; i < parallelWriters; i++ {
		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
	}
	waitWriters(parallelWriters, false, doneCh)

	// Start parallel writers that will create and delete unique keys
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
	for i := 0; i < parallelWriters; i++ {
		key := "key-" + strconv.Itoa(i) + "-"
		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
		go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
	}
	// Sync with all the writers
	keyMap := waitWriters(parallelWriters, true, doneCh)
	cancel()
	logrus.Infof("Wrote a total of %d keys on the cluster", keyMap[totalWrittenKeys])

	// check that the tables are empty on the server side
	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
	cancel()

	// check that the tables are empty on the client side
	ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
	checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
	cancel()
}
// write-unique-keys networkName tableName numParallelWriters writeTimeSec
func doWriteUniqueKeys(ips []string, args []string) {
	networkName := args[0]
	tableName := args[1]
	parallelWriters, _ := strconv.Atoi(args[2])
	writeTimeSec, _ := strconv.Atoi(args[3])

	doneCh := make(chan resultTuple, parallelWriters)
	defer close(doneCh)
	// Enable watch of tables from clients
	for i := 0; i < parallelWriters; i++ {
		go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
	}
	waitWriters(parallelWriters, false, doneCh)

	// Start parallel writers that will create unique keys
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
	for i := 0; i < parallelWriters; i++ {
		key := "key-" + strconv.Itoa(i) + "-"
		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
		go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
	}
	// Sync with all the writers
	keyMap := waitWriters(parallelWriters, true, doneCh)
	cancel()
	logrus.Infof("Wrote a total of %d keys on the cluster", keyMap[totalWrittenKeys])

	// check that all the keys are present on the server side
	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
	checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
	cancel()
}
// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
func doWriteDeleteLeaveJoin(ips []string, args []string) {
	networkName := args[0]
	tableName := args[1]
	parallelWriters, _ := strconv.Atoi(args[2])
	writeTimeSec, _ := strconv.Atoi(args[3])

	// Start parallel writers that will create and delete unique keys
	doneCh := make(chan resultTuple, parallelWriters)
	defer close(doneCh)
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
	for i := 0; i < parallelWriters; i++ {
		key := "key-" + strconv.Itoa(i) + "-"
		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
		go writeDeleteLeaveJoin(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
	}
	// Sync with all the writers
	keyMap := waitWriters(parallelWriters, true, doneCh)
	cancel()
	logrus.Infof("Wrote a total of %d keys on the cluster", keyMap[totalWrittenKeys])

	// check that the tables are empty on the server side
	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
	cancel()
}
// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
	networkName := args[0]
	tableName := args[1]
	parallelWriters, _ := strconv.Atoi(args[2])
	writeTimeSec, _ := strconv.Atoi(args[3])

	// Start parallel writers that will create and delete unique keys
	doneCh := make(chan resultTuple, parallelWriters)
	defer close(doneCh)
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
	for i := 0; i < parallelWriters; i++ {
		key := "key-" + strconv.Itoa(i) + "-"
		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
		go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
	}
	// Sync with all the writers
	keyMap := waitWriters(parallelWriters, true, doneCh)
	cancel()
	logrus.Infof("Wrote a total of %d keys on the cluster", keyMap[totalWrittenKeys])

	// The writers leave the network
	for i := 0; i < parallelWriters; i++ {
		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
	}
	waitWriters(parallelWriters, false, doneCh)

	// Give some time
	time.Sleep(100 * time.Millisecond)

	// The writers join the network again
	for i := 0; i < parallelWriters; i++ {
		logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
		go joinNetwork(ips[i], servicePort, networkName, doneCh)
	}
	waitWriters(parallelWriters, false, doneCh)

	// check that the tables are empty on the server side
	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
	cancel()
}
// write-wait-leave networkName tableName numParallelWriters writeTimeSec
func doWriteWaitLeave(ips []string, args []string) {
	networkName := args[0]
	tableName := args[1]
	parallelWriters, _ := strconv.Atoi(args[2])
	writeTimeSec, _ := strconv.Atoi(args[3])

	// Start parallel writers that will create unique keys
	doneCh := make(chan resultTuple, parallelWriters)
	defer close(doneCh)
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
	for i := 0; i < parallelWriters; i++ {
		key := "key-" + strconv.Itoa(i) + "-"
		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
		go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
	}
	// Sync with all the writers
	keyMap := waitWriters(parallelWriters, true, doneCh)
	cancel()
	logrus.Infof("Wrote a total of %d keys on the cluster", keyMap[totalWrittenKeys])

	// The writers leave the network, so their keys are expected to be purged
	for i := 0; i < parallelWriters; i++ {
		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
	}
	waitWriters(parallelWriters, false, doneCh)

	// check that the tables are empty on the server side
	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
	checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
	cancel()
}
// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
func doWriteWaitLeaveJoin(ips []string, args []string) {
	networkName := args[0]
	tableName := args[1]
	parallelWriters, _ := strconv.Atoi(args[2])
	writeTimeSec, _ := strconv.Atoi(args[3])
	parallelLeaver, _ := strconv.Atoi(args[4])

	// Start parallel writers that will create unique keys
	doneCh := make(chan resultTuple, parallelWriters)
	defer close(doneCh)
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
	for i := 0; i < parallelWriters; i++ {
		key := "key-" + strconv.Itoa(i) + "-"
		logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
		go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
	}
	// Sync with all the writers
	keyMap := waitWriters(parallelWriters, true, doneCh)
	cancel()
	logrus.Infof("Wrote a total of %d keys on the cluster", keyMap[totalWrittenKeys])

	keysExpected := keyMap[totalWrittenKeys]
	// The leavers leave the network
	for i := 0; i < parallelLeaver; i++ {
		logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
		go leaveNetwork(ips[i], servicePort, networkName, doneCh)
		// Once a node leaves, all the keys it wrote are deleted, so they are
		// subtracted from the expected count
		keysExpected -= keyMap[ips[i]]
	}
	waitWriters(parallelLeaver, false, doneCh)

	// Give some time
	time.Sleep(100 * time.Millisecond)

	// The leavers join the network again
	for i := 0; i < parallelLeaver; i++ {
		logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
		go joinNetwork(ips[i], servicePort, networkName, doneCh)
	}
	waitWriters(parallelLeaver, false, doneCh)

	// check that only the keys written by the remaining nodes are left
	ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
	checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
	cancel()
}
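// cmdArgCheck maps each command to the minimum length of the argument list
// it expects (including the command itself, the service name and the port).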
var cmdArgCheck = map[string]int{
	"debug":                    0,
	"fail":                     0,
	"ready":                    2,
	"join":                     2,
	"leave":                    2,
	"join-network":             3,
	"leave-network":            3,
	"cluster-peers":            5,
	"network-peers":            5,
	"write-delete-unique-keys": 7,
}
// Client parses the command line, resolves the IPs of the nodes under test,
// and dispatches the requested command against them.
func Client(args []string) {
	logrus.Infof("[CLIENT] Starting with arguments %v", args)
	command := args[0]

	if len(args) < cmdArgCheck[command] {
		log.Fatalf("Command %s requires %d arguments, passed %d, aborting...", command, cmdArgCheck[command], len(args))
	}

	switch command {
	case "debug":
		time.Sleep(1 * time.Hour)
		os.Exit(0)
	case "fail":
		log.Fatalf("Test error condition with message: error error error")
	}

	serviceName := args[1]
	ips, err := net.LookupHost("tasks." + serviceName)
	if err != nil || len(ips) == 0 {
		log.Fatalf("Cannot resolve any IP for the service tasks.%s: %v", serviceName, err)
	}
	logrus.Infof("got the ips %v", ips)
	servicePort = args[2]
	commandArgs := args[3:]
	logrus.Infof("Executing %s with args:%v", command, commandArgs)
	switch command {
	case "ready":
		doReady(ips)
	case "join":
		doJoin(ips)
	case "leave":
		// no-op
	case "cluster-peers":
		// cluster-peers expectedNumberPeers maxRetry
		doClusterPeers(ips, commandArgs)
	case "join-network":
		// join-network networkName
		doJoinNetwork(ips, commandArgs)
	case "leave-network":
		// leave-network networkName
		doLeaveNetwork(ips, commandArgs)
	case "network-peers":
		// network-peers networkName expectedNumberPeers maxRetry
		doNetworkPeers(ips, commandArgs)
	case "write-unique-keys":
		// write-unique-keys networkName tableName numParallelWriters writeTimeSec
		doWriteUniqueKeys(ips, commandArgs)
	case "write-delete-unique-keys":
		// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
		doWriteDeleteUniqueKeys(ips, commandArgs)
	case "write-delete-leave-join":
		// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
		doWriteDeleteLeaveJoin(ips, commandArgs)
	case "write-delete-wait-leave-join":
		// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
		doWriteDeleteWaitLeaveJoin(ips, commandArgs)
	case "write-wait-leave":
		// write-wait-leave networkName tableName numParallelWriters writeTimeSec
		doWriteWaitLeave(ips, commandArgs)
	case "write-wait-leave-join":
		// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
		doWriteWaitLeaveJoin(ips, commandArgs)
	default:
		log.Fatalf("Command %s not recognized", command)
	}
}