ndbClient.go
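
// Package dbclient is a command-line test client that exercises a cluster
// of NetworkDB nodes through the HTTP diagnostic endpoints they expose
// (/join, /joinnetwork, /createentry, /gettable, ...). Each command fans
// out one goroutine per node, collects per-node results over a channel,
// and fails the whole run via log.Fatal on any unexpected outcome. The
// server side of these endpoints is assumed; this file only encodes the
// client's expectations.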
package dbclient

import (
    "context"
    "io/ioutil"
    "log"
    "net"
    "net/http"
    "os"
    "regexp"
    "strconv"
    "strings"
    "time"

    "github.com/sirupsen/logrus"
)

var servicePort string

const totalWrittenKeys string = "totalKeys"

// resultTuple pairs the IP of the node that performed an operation with an
// integer outcome (a key/peer count, 0 on success, or -1 on error).
type resultTuple struct {
    id     string
    result int
}
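
// httpGetFatalError issues a GET against the given node and aborts the
// whole test run if the request fails or the body does not contain "OK".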
func httpGetFatalError(ip, port, path string) {
    // for {
    body, err := httpGet(ip, port, path)
    if err != nil || !strings.Contains(string(body), "OK") {
        // if strings.Contains(err.Error(), "EOF") {
        //	logrus.Warnf("Got EOF path:%s err:%s", path, err)
        //	continue
        // }
        log.Fatalf("[%s] error %s %s", path, err, body)
    }
    // break
    // }
}
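
// httpGet performs a GET against http://ip:port/path and returns the raw
// response body.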
func httpGet(ip, port, path string) ([]byte, error) {
    resp, err := http.Get("http://" + ip + ":" + port + path)
    if err != nil {
        logrus.Errorf("httpGet error:%s", err)
        return nil, err
    }
    defer resp.Body.Close()
    body, err := ioutil.ReadAll(resp.Body)
    return body, err
}
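
// joinCluster makes the node at ip:port join the cluster formed with the
// given members. When doneCh is provided, a zero result is reported on it,
// which lets callers spawn one goroutine per node and wait for all of them.
// joinNetwork and leaveNetwork follow the same pattern for a single network.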
func joinCluster(ip, port string, members []string, doneCh chan resultTuple) {
    httpGetFatalError(ip, port, "/join?members="+strings.Join(members, ","))
    if doneCh != nil {
        doneCh <- resultTuple{id: ip, result: 0}
    }
}

func joinNetwork(ip, port, network string, doneCh chan resultTuple) {
    httpGetFatalError(ip, port, "/joinnetwork?nid="+network)
    if doneCh != nil {
        doneCh <- resultTuple{id: ip, result: 0}
    }
}

func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
    httpGetFatalError(ip, port, "/leavenetwork?nid="+network)
    if doneCh != nil {
        doneCh <- resultTuple{id: ip, result: 0}
    }
}
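
// writeTableKey and deleteTableKey create and delete one key on one node
// through /createentry and /deleteentry; any failure aborts the test.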
func writeTableKey(ip, port, networkName, tableName, key string) {
    createPath := "/createentry?unsafe&nid=" + networkName + "&tname=" + tableName + "&value=v&key="
    httpGetFatalError(ip, port, createPath+key)
}

func deleteTableKey(ip, port, networkName, tableName, key string) {
    deletePath := "/deleteentry?nid=" + networkName + "&tname=" + tableName + "&key="
    httpGetFatalError(ip, port, deletePath+key)
}
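
// clusterPeersNumber, networkPeersNumber and dbTableEntriesNumber query the
// corresponding diagnostic endpoint and parse the "total entries: N"
// counter out of the body, sending N (or -1 on error) to doneCh.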
func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
    body, err := httpGet(ip, port, "/clusterpeers")
    if err != nil {
        logrus.Errorf("clusterPeersNumber %s there was an error: %s\n", ip, err)
        doneCh <- resultTuple{id: ip, result: -1}
        return
    }
    peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
    peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
    doneCh <- resultTuple{id: ip, result: peersNum}
}

func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
    body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
    if err != nil {
        logrus.Errorf("networkPeersNumber %s there was an error: %s\n", ip, err)
        doneCh <- resultTuple{id: ip, result: -1}
        return
    }
    peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
    peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
    doneCh <- resultTuple{id: ip, result: peersNum}
}

func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
    body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
    if err != nil {
        logrus.Errorf("dbTableEntriesNumber %s there was an error: %s\n", ip, err)
        doneCh <- resultTuple{id: ip, result: -1}
        return
    }
    elementsRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
    entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
    doneCh <- resultTuple{id: ip, result: entriesNum}
}
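
// clientWatchTable starts a client-side watch on the given table;
// clientTableEntriesNumber then reads how many elements that watch has
// accumulated ("total elements: N"), reporting -1 on error.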
func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
    httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
    if doneCh != nil {
        doneCh <- resultTuple{id: ip, result: 0}
    }
}

func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
    body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
    if err != nil {
        logrus.Errorf("clientTableEntriesNumber %s there was an error: %s\n", ip, err)
        doneCh <- resultTuple{id: ip, result: -1}
        return
    }
    elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
    entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
    doneCh <- resultTuple{id: ip, result: entriesNum}
}
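
// writeUniqueKeys writes the keys <key>0, <key>1, ... one every 100ms until
// ctx expires, then reports the number of keys written on doneCh.
// writeDeleteUniqueKeys does the same but deletes each key right after
// writing it, so a converged cluster should end up with empty tables.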
func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
    for x := 0; ; x++ {
        select {
        case <-ctx.Done():
            doneCh <- resultTuple{id: ip, result: x}
            return
        default:
            k := key + strconv.Itoa(x)
            // write key
            writeTableKey(ip, port, networkName, tableName, k)
            // give time to send out key writes
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func writeDeleteUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
    for x := 0; ; x++ {
        select {
        case <-ctx.Done():
            doneCh <- resultTuple{id: ip, result: x}
            return
        default:
            k := key + strconv.Itoa(x)
            // write key
            writeTableKey(ip, port, networkName, tableName, k)
            // give time to send out key writes
            time.Sleep(100 * time.Millisecond)
            // delete key
            deleteTableKey(ip, port, networkName, tableName, k)
        }
    }
}
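
// writeDeleteLeaveJoin stresses the cluster: each iteration writes a key,
// deletes it, leaves the network and immediately rejoins, until ctx expires.
// The number of completed iterations is reported on doneCh.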
func writeDeleteLeaveJoin(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
    for x := 0; ; x++ {
        select {
        case <-ctx.Done():
            doneCh <- resultTuple{id: ip, result: x}
            return
        default:
            k := key + strconv.Itoa(x)
            // write key
            writeTableKey(ip, port, networkName, tableName, k)
            time.Sleep(100 * time.Millisecond)
            // delete key
            deleteTableKey(ip, port, networkName, tableName, k)
            // give some time
            time.Sleep(100 * time.Millisecond)
            // leave network
            leaveNetwork(ip, port, networkName, nil)
            // join network
            joinNetwork(ip, port, networkName, nil)
        }
    }
}
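
// ready polls the node's /ready endpoint every 500ms until it answers "OK",
// then signals completion on doneCh.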
func ready(ip, port string, doneCh chan resultTuple) {
    for {
        body, err := httpGet(ip, port, "/ready")
        if err != nil || !strings.Contains(string(body), "OK") {
            time.Sleep(500 * time.Millisecond)
            continue
        }
        // success
        break
    }
    // notify the completion
    doneCh <- resultTuple{id: ip, result: 0}
}
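
// checkTable polls all the nodes with fn every 10 seconds until ctx
// expires. successTime records when all the nodes first agreed on
// expectedEntries and is reset whenever any node disagrees, so the test
// passes only if the cluster is still converged when the context fires.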
func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) {
    startTime := time.Now().UnixNano()
    var successTime int64

    // Loop until the context expires (callers typically pass a 2 minute
    // deadline) to guarantee that the result is stable
    for {
        select {
        case <-ctx.Done():
            // Validate test success: a non-zero successTime means that all
            // the nodes converged to the expected number of entries and
            // stayed there
            if successTime != 0 {
                logrus.Infof("Check table passed, the cluster converged in %d msec", time.Duration(successTime-startTime)/time.Millisecond)
                return
            }
            log.Fatal("Test failed, there are still entries in the tables of the nodes")
        default:
            logrus.Infof("Checking table %s expected %d", tableName, expectedEntries)
            doneCh := make(chan resultTuple, len(ips))
            for _, ip := range ips {
                go fn(ip, port, networkName, tableName, doneCh)
            }

            nodesWithCorrectEntriesNum := 0
            for i := len(ips); i > 0; i-- {
                tableEntries := <-doneCh
                logrus.Infof("Node %s has %d entries", tableEntries.id, tableEntries.result)
                if tableEntries.result == expectedEntries {
                    nodesWithCorrectEntriesNum++
                }
            }
            close(doneCh)

            if nodesWithCorrectEntriesNum == len(ips) {
                if successTime == 0 {
                    successTime = time.Now().UnixNano()
                    logrus.Infof("Success after %d msec", time.Duration(successTime-startTime)/time.Millisecond)
                }
            } else {
                successTime = 0
            }
            time.Sleep(10 * time.Second)
        }
    }
}
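
// waitWriters collects one result from doneCh per writer and returns a map
// from node IP to keys written, with the cluster-wide total stored under
// the totalWrittenKeys key. With mustWrite set, a writer reporting 0 keys
// is fatal; without it, any non-zero result is fatal.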
func waitWriters(parallelWriters int, mustWrite bool, doneCh chan resultTuple) map[string]int {
    var totalKeys int
    resultTable := make(map[string]int)
    for i := 0; i < parallelWriters; i++ {
        logrus.Infof("Waiting for %d workers", parallelWriters-i)
        workerReturn := <-doneCh
        totalKeys += workerReturn.result
        if mustWrite && workerReturn.result == 0 {
            log.Fatalf("The worker %s did not write any key", workerReturn.id)
        }
        if !mustWrite && workerReturn.result != 0 {
            log.Fatalf("The worker %s was supposed to return 0, returned %d instead", workerReturn.id, workerReturn.result)
        }
        if mustWrite {
            resultTable[workerReturn.id] = workerReturn.result
            logrus.Infof("The worker %s wrote %d keys", workerReturn.id, workerReturn.result)
        }
    }
    resultTable[totalWrittenKeys] = totalKeys
    return resultTable
}

// ready
func doReady(ips []string) {
    doneCh := make(chan resultTuple, len(ips))
    // check all the nodes
    for _, ip := range ips {
        go ready(ip, servicePort, doneCh)
    }
    // wait for the readiness of all nodes
    for i := len(ips); i > 0; i-- {
        <-doneCh
    }
    close(doneCh)
}

// join
func doJoin(ips []string) {
    doneCh := make(chan resultTuple, len(ips))
    // each node joins the cluster formed by all the other nodes
    for i, ip := range ips {
        members := append([]string(nil), ips[:i]...)
        members = append(members, ips[i+1:]...)
        go joinCluster(ip, servicePort, members, doneCh)
    }
    // wait for all the joins to complete
    for i := len(ips); i > 0; i-- {
        <-doneCh
    }
    close(doneCh)
}

// cluster-peers expectedNumberPeers
func doClusterPeers(ips []string, args []string) {
    doneCh := make(chan resultTuple, len(ips))
    expectedPeers, _ := strconv.Atoi(args[0])
    // query all the nodes
    for _, ip := range ips {
        go clusterPeersNumber(ip, servicePort, doneCh)
    }
    // verify the answers
    for i := len(ips); i > 0; i-- {
        node := <-doneCh
        if node.result != expectedPeers {
            log.Fatalf("Peer number mismatch on %s: expected %d, got %d", node.id, expectedPeers, node.result)
        }
    }
    close(doneCh)
}

// join-network networkName
func doJoinNetwork(ips []string, args []string) {
    doneCh := make(chan resultTuple, len(ips))
    // make every node join the network
    for _, ip := range ips {
        go joinNetwork(ip, servicePort, args[0], doneCh)
    }
    // wait for completion
    for i := len(ips); i > 0; i-- {
        <-doneCh
    }
    close(doneCh)
}

// leave-network networkName
func doLeaveNetwork(ips []string, args []string) {
    doneCh := make(chan resultTuple, len(ips))
    // make every node leave the network
    for _, ip := range ips {
        go leaveNetwork(ip, servicePort, args[0], doneCh)
    }
    // wait for completion
    for i := len(ips); i > 0; i-- {
        <-doneCh
    }
    close(doneCh)
}

// network-peers networkName expectedNumberPeers maxRetry
func doNetworkPeers(ips []string, args []string) {
    doneCh := make(chan resultTuple, len(ips))
    networkName := args[0]
    expectedPeers, _ := strconv.Atoi(args[1])
    maxRetry, _ := strconv.Atoi(args[2])
    for retry := 0; retry < maxRetry; retry++ {
        // query all the nodes
        for _, ip := range ips {
            go networkPeersNumber(ip, servicePort, networkName, doneCh)
        }
        // verify the answers
        for i := len(ips); i > 0; i-- {
            node := <-doneCh
            if node.result != expectedPeers {
                if retry == maxRetry-1 {
                    log.Fatalf("Peer number mismatch on %s: expected %d, got %d", node.id, expectedPeers, node.result)
                } else {
                    logrus.Warnf("Peer number mismatch on %s: expected %d, got %d", node.id, expectedPeers, node.result)
                }
                time.Sleep(1 * time.Second)
            }
        }
    }
    close(doneCh)
}

// write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
func doWriteDeleteUniqueKeys(ips []string, args []string) {
    networkName := args[0]
    tableName := args[1]
    parallelWriters, _ := strconv.Atoi(args[2])
    writeTimeSec, _ := strconv.Atoi(args[3])

    doneCh := make(chan resultTuple, parallelWriters)
    defer close(doneCh)
    // Enable watch of tables from clients
    for i := 0; i < parallelWriters; i++ {
        go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
    }
    waitWriters(parallelWriters, false, doneCh)

    // Start parallel writers that will create and delete unique keys
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
    for i := 0; i < parallelWriters; i++ {
        key := "key-" + strconv.Itoa(i) + "-"
        logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
        go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
    }

    // Sync with all the writers
    keyMap := waitWriters(parallelWriters, true, doneCh)
    cancel()
    logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])

    // all the tables must drain to 0 entries within 2 minutes
    ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
    checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
    cancel()

    // the client-side watched view must drain as well
    ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
    checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
    cancel()
}

// write-unique-keys networkName tableName numParallelWriters writeTimeSec
func doWriteUniqueKeys(ips []string, args []string) {
    networkName := args[0]
    tableName := args[1]
    parallelWriters, _ := strconv.Atoi(args[2])
    writeTimeSec, _ := strconv.Atoi(args[3])

    doneCh := make(chan resultTuple, parallelWriters)
    defer close(doneCh)
    // Enable watch of tables from clients
    for i := 0; i < parallelWriters; i++ {
        go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
    }
    waitWriters(parallelWriters, false, doneCh)

    // Start parallel writers that will create unique keys
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
    for i := 0; i < parallelWriters; i++ {
        key := "key-" + strconv.Itoa(i) + "-"
        logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
        go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
    }

    // Sync with all the writers
    keyMap := waitWriters(parallelWriters, true, doneCh)
    cancel()
    logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])

    // every node must converge to the full key set within 2 minutes
    ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
    checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
    cancel()
}

// write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
func doWriteDeleteLeaveJoin(ips []string, args []string) {
    networkName := args[0]
    tableName := args[1]
    parallelWriters, _ := strconv.Atoi(args[2])
    writeTimeSec, _ := strconv.Atoi(args[3])

    // Start parallel writers that will create and delete unique keys,
    // leaving and rejoining the network on every iteration
    doneCh := make(chan resultTuple, parallelWriters)
    defer close(doneCh)
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
    for i := 0; i < parallelWriters; i++ {
        key := "key-" + strconv.Itoa(i) + "-"
        logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
        go writeDeleteLeaveJoin(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
    }

    // Sync with all the writers
    keyMap := waitWriters(parallelWriters, true, doneCh)
    cancel()
    logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])

    // all the tables must drain to 0 entries within 2 minutes
    ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
    checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
    cancel()
}

// write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
    networkName := args[0]
    tableName := args[1]
    parallelWriters, _ := strconv.Atoi(args[2])
    writeTimeSec, _ := strconv.Atoi(args[3])

    // Start parallel writers that will create and delete unique keys
    doneCh := make(chan resultTuple, parallelWriters)
    defer close(doneCh)
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
    for i := 0; i < parallelWriters; i++ {
        key := "key-" + strconv.Itoa(i) + "-"
        logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
        go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
    }

    // Sync with all the writers
    keyMap := waitWriters(parallelWriters, true, doneCh)
    cancel()
    logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])

    // The writers leave the network
    for i := 0; i < parallelWriters; i++ {
        logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
        go leaveNetwork(ips[i], servicePort, networkName, doneCh)
    }
    waitWriters(parallelWriters, false, doneCh)

    // Give some time
    time.Sleep(100 * time.Millisecond)

    // The writers rejoin the network
    for i := 0; i < parallelWriters; i++ {
        logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
        go joinNetwork(ips[i], servicePort, networkName, doneCh)
    }
    waitWriters(parallelWriters, false, doneCh)

    // all the tables must drain to 0 entries within 2 minutes
    ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
    checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
    cancel()
}

// write-wait-leave networkName tableName numParallelWriters writeTimeSec
func doWriteWaitLeave(ips []string, args []string) {
    networkName := args[0]
    tableName := args[1]
    parallelWriters, _ := strconv.Atoi(args[2])
    writeTimeSec, _ := strconv.Atoi(args[3])

    // Start parallel writers that will create unique keys
    doneCh := make(chan resultTuple, parallelWriters)
    defer close(doneCh)
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
    for i := 0; i < parallelWriters; i++ {
        key := "key-" + strconv.Itoa(i) + "-"
        logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
        go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
    }

    // Sync with all the writers
    keyMap := waitWriters(parallelWriters, true, doneCh)
    cancel()
    logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])

    // The writers leave the network, so their keys must be purged everywhere
    for i := 0; i < parallelWriters; i++ {
        logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
        go leaveNetwork(ips[i], servicePort, networkName, doneCh)
    }
    waitWriters(parallelWriters, false, doneCh)

    // all the tables must drain to 0 entries within 2 minutes
    ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
    checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
    cancel()
}

// write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
func doWriteWaitLeaveJoin(ips []string, args []string) {
    networkName := args[0]
    tableName := args[1]
    parallelWriters, _ := strconv.Atoi(args[2])
    writeTimeSec, _ := strconv.Atoi(args[3])
    parallelLeaver, _ := strconv.Atoi(args[4])

    // Start parallel writers that will create unique keys
    doneCh := make(chan resultTuple, parallelWriters)
    defer close(doneCh)
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
    for i := 0; i < parallelWriters; i++ {
        key := "key-" + strconv.Itoa(i) + "-"
        logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
        go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
    }

    // Sync with all the writers
    keyMap := waitWriters(parallelWriters, true, doneCh)
    cancel()
    logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
    keysExpected := keyMap[totalWrittenKeys]

    // The leavers leave the network
    for i := 0; i < parallelLeaver; i++ {
        logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
        go leaveNetwork(ips[i], servicePort, networkName, doneCh)
        // Once a node leaves, all the keys it wrote are deleted, so the
        // expected total drops by its contribution
        keysExpected -= keyMap[ips[i]]
    }
    waitWriters(parallelLeaver, false, doneCh)

    // Give some time
    time.Sleep(100 * time.Millisecond)

    // The leavers rejoin the network
    for i := 0; i < parallelLeaver; i++ {
        logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
        go joinNetwork(ips[i], servicePort, networkName, doneCh)
    }
    waitWriters(parallelLeaver, false, doneCh)

    // every node must converge to the remaining key count within 2 minutes
    ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
    checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
    cancel()
}
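
// cmdArgCheck maps a command to the minimum length of the args slice it
// requires (the command itself included); commands missing from the map
// are not length-checked.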
var cmdArgCheck = map[string]int{
    "debug":                    0,
    "fail":                     0,
    "ready":                    2,
    "join":                     2,
    "leave":                    2,
    "join-network":             3,
    "leave-network":            3,
    "cluster-peers":            3,
    "write-delete-unique-keys": 4,
}
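
// Client resolves the IPs behind tasks.<serviceName> and executes the given
// command against every node of the service. An assumed invocation sketch
// (the real arguments come from whatever harness drives this client; the
// service, network and table names below are placeholders):
//
//	Client([]string{"write-unique-keys", "testdb", "8000", "net1", "table1", "3", "60"})
//
// would resolve tasks.testdb, run 3 parallel writers creating unique keys
// on network net1, table table1 for 60 seconds, then verify that every node
// converges to the full key set.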
func Client(args []string) {
    logrus.Infof("[CLIENT] Starting with arguments %v", args)
    command := args[0]

    if len(args) < cmdArgCheck[command] {
        log.Fatalf("Command %s requires %d arguments, aborting...", command, cmdArgCheck[command])
    }

    switch command {
    case "debug":
        time.Sleep(1 * time.Hour)
        os.Exit(0)
    case "fail":
        log.Fatalf("Test error condition with message: error error error")
    }

    serviceName := args[1]
    ips, _ := net.LookupHost("tasks." + serviceName)
    logrus.Infof("got the ips %v", ips)
    if len(ips) == 0 {
        log.Fatalf("Cannot resolve any IP for the service tasks.%s", serviceName)
    }
    servicePort = args[2]
    commandArgs := args[3:]
    logrus.Infof("Executing %s with args:%v", command, commandArgs)

    switch command {
    case "ready":
        doReady(ips)
    case "join":
        doJoin(ips)
    case "leave":
        // note: accepted but currently a no-op
    case "cluster-peers":
        // cluster-peers expectedNumberPeers
        doClusterPeers(ips, commandArgs)
    case "join-network":
        // join-network networkName
        doJoinNetwork(ips, commandArgs)
    case "leave-network":
        // leave-network networkName
        doLeaveNetwork(ips, commandArgs)
    case "network-peers":
        // network-peers networkName expectedNumberPeers maxRetry
        doNetworkPeers(ips, commandArgs)
    case "write-unique-keys":
        // write-unique-keys networkName tableName numParallelWriters writeTimeSec
        doWriteUniqueKeys(ips, commandArgs)
    case "write-delete-unique-keys":
        // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
        doWriteDeleteUniqueKeys(ips, commandArgs)
    case "write-delete-leave-join":
        // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
        doWriteDeleteLeaveJoin(ips, commandArgs)
    case "write-delete-wait-leave-join":
        // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
        doWriteDeleteWaitLeaveJoin(ips, commandArgs)
    case "write-wait-leave":
        // write-wait-leave networkName tableName numParallelWriters writeTimeSec
        doWriteWaitLeave(ips, commandArgs)
    case "write-wait-leave-join":
        // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
        doWriteWaitLeaveJoin(ips, commandArgs)
    default:
        log.Fatalf("Command %s not recognized", command)
    }
}