ndbClient.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874
  1. package dbclient
  2. import (
  3. "context"
  4. "fmt"
  5. "io/ioutil"
  6. "log"
  7. "net"
  8. "net/http"
  9. "os"
  10. "regexp"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/sirupsen/logrus"
  15. )
  // servicePort is the port every node of the target service listens on.
  // Client assigns it from its third argument before dispatching a command.
  var servicePort string

  // totalWrittenKeys is the reserved map key under which waitWriters stores the
  // sum of all worker results.
  const totalWrittenKeys string = "totalKeys"

  // resultTuple is what a worker goroutine sends back on its done channel: the
  // node it talked to and an integer outcome (a count, 0 for a plain
  // acknowledgement, or -1 when a query failed).
  type resultTuple struct {
  	id     string
  	result int
  }
  22. func httpGetFatalError(ip, port, path string) {
  23. body, err := httpGet(ip, port, path)
  24. if err != nil || !strings.Contains(string(body), "OK") {
  25. log.Fatalf("[%s] error %s %s", path, err, body)
  26. }
  27. }
  28. func httpGet(ip, port, path string) ([]byte, error) {
  29. resp, err := http.Get("http://" + ip + ":" + port + path)
  30. if err != nil {
  31. logrus.Errorf("httpGet error:%s", err)
  32. return nil, err
  33. }
  34. defer resp.Body.Close()
  35. body, err := ioutil.ReadAll(resp.Body)
  36. return body, err
  37. }
  38. func joinCluster(ip, port string, members []string, doneCh chan resultTuple) {
  39. httpGetFatalError(ip, port, "/join?members="+strings.Join(members, ","))
  40. if doneCh != nil {
  41. doneCh <- resultTuple{id: ip, result: 0}
  42. }
  43. }
  44. func joinNetwork(ip, port, network string, doneCh chan resultTuple) {
  45. httpGetFatalError(ip, port, "/joinnetwork?nid="+network)
  46. if doneCh != nil {
  47. doneCh <- resultTuple{id: ip, result: 0}
  48. }
  49. }
  50. func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
  51. httpGetFatalError(ip, port, "/leavenetwork?nid="+network)
  52. if doneCh != nil {
  53. doneCh <- resultTuple{id: ip, result: 0}
  54. }
  55. }
  56. func writeTableKey(ip, port, networkName, tableName, key string) {
  57. createPath := "/createentry?unsafe&nid=" + networkName + "&tname=" + tableName + "&value=v&key="
  58. httpGetFatalError(ip, port, createPath+key)
  59. }
  60. func deleteTableKey(ip, port, networkName, tableName, key string) {
  61. deletePath := "/deleteentry?nid=" + networkName + "&tname=" + tableName + "&key="
  62. httpGetFatalError(ip, port, deletePath+key)
  63. }
  64. func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
  65. body, err := httpGet(ip, port, "/clusterpeers")
  66. if err != nil {
  67. logrus.Errorf("clusterPeers %s there was an error: %s", ip, err)
  68. doneCh <- resultTuple{id: ip, result: -1}
  69. return
  70. }
  71. peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
  72. peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
  73. doneCh <- resultTuple{id: ip, result: peersNum}
  74. }
  75. func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
  76. body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
  77. if err != nil {
  78. logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err)
  79. doneCh <- resultTuple{id: ip, result: -1}
  80. return
  81. }
  82. peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
  83. peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
  84. doneCh <- resultTuple{id: ip, result: peersNum}
  85. }
  86. func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
  87. body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
  88. if err != nil {
  89. logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err)
  90. doneCh <- resultTuple{id: ip, result: -1}
  91. return
  92. }
  93. elementsRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
  94. entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
  95. doneCh <- resultTuple{id: ip, result: entriesNum}
  96. }
  97. func dbEntriesNumber(ip, port, networkName string, doneCh chan resultTuple) {
  98. body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
  99. if err != nil {
  100. logrus.Errorf("entriesNumber %s there was an error: %s", ip, err)
  101. doneCh <- resultTuple{id: ip, result: -1}
  102. return
  103. }
  104. elementsRegexp := regexp.MustCompile(`entries: ([0-9]+)`)
  105. entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
  106. doneCh <- resultTuple{id: ip, result: entriesNum}
  107. }
  108. func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) {
  109. body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
  110. if err != nil {
  111. logrus.Errorf("queueLength %s there was an error: %s", ip, err)
  112. doneCh <- resultTuple{id: ip, result: -1}
  113. return
  114. }
  115. elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`)
  116. entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
  117. doneCh <- resultTuple{id: ip, result: entriesNum}
  118. }
  119. func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
  120. httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
  121. if doneCh != nil {
  122. doneCh <- resultTuple{id: ip, result: 0}
  123. }
  124. }
  125. func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
  126. body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
  127. if err != nil {
  128. logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err)
  129. doneCh <- resultTuple{id: ip, result: -1}
  130. return
  131. }
  132. elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
  133. entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
  134. doneCh <- resultTuple{id: ip, result: entriesNum}
  135. }
  136. func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
  137. x := 0
  138. for ; x < number; x++ {
  139. k := key + strconv.Itoa(x)
  140. // write key
  141. writeTableKey(ip, port, networkName, tableName, k)
  142. }
  143. doneCh <- resultTuple{id: ip, result: x}
  144. }
  145. func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
  146. x := 0
  147. for ; x < number; x++ {
  148. k := key + strconv.Itoa(x)
  149. // write key
  150. deleteTableKey(ip, port, networkName, tableName, k)
  151. }
  152. doneCh <- resultTuple{id: ip, result: x}
  153. }
  154. func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
  155. for x := 0; ; x++ {
  156. select {
  157. case <-ctx.Done():
  158. doneCh <- resultTuple{id: ip, result: x}
  159. return
  160. default:
  161. k := key + strconv.Itoa(x)
  162. // write key
  163. writeTableKey(ip, port, networkName, tableName, k)
  164. // give time to send out key writes
  165. time.Sleep(100 * time.Millisecond)
  166. }
  167. }
  168. }
  169. func writeDeleteUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
  170. for x := 0; ; x++ {
  171. select {
  172. case <-ctx.Done():
  173. doneCh <- resultTuple{id: ip, result: x}
  174. return
  175. default:
  176. k := key + strconv.Itoa(x)
  177. // write key
  178. writeTableKey(ip, port, networkName, tableName, k)
  179. // give time to send out key writes
  180. time.Sleep(100 * time.Millisecond)
  181. // delete key
  182. deleteTableKey(ip, port, networkName, tableName, k)
  183. }
  184. }
  185. }
  186. func writeDeleteLeaveJoin(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
  187. for x := 0; ; x++ {
  188. select {
  189. case <-ctx.Done():
  190. doneCh <- resultTuple{id: ip, result: x}
  191. return
  192. default:
  193. k := key + strconv.Itoa(x)
  194. // write key
  195. writeTableKey(ip, port, networkName, tableName, k)
  196. time.Sleep(100 * time.Millisecond)
  197. // delete key
  198. deleteTableKey(ip, port, networkName, tableName, k)
  199. // give some time
  200. time.Sleep(100 * time.Millisecond)
  201. // leave network
  202. leaveNetwork(ip, port, networkName, nil)
  203. // join network
  204. joinNetwork(ip, port, networkName, nil)
  205. }
  206. }
  207. }
  208. func ready(ip, port string, doneCh chan resultTuple) {
  209. for {
  210. body, err := httpGet(ip, port, "/ready")
  211. if err != nil || !strings.Contains(string(body), "OK") {
  212. time.Sleep(500 * time.Millisecond)
  213. continue
  214. }
  215. // success
  216. break
  217. }
  218. // notify the completion
  219. doneCh <- resultTuple{id: ip, result: 0}
  220. }
  221. func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) {
  222. startTime := time.Now().UnixNano()
  223. var successTime int64
  224. // Loop for 2 minutes to guarantee that the result is stable
  225. for {
  226. select {
  227. case <-ctx.Done():
  228. // Validate test success, if the time is set means that all the tables are empty
  229. if successTime != 0 {
  230. opTime = time.Duration(successTime-startTime) / time.Millisecond
  231. logrus.Infof("Check table passed, the cluster converged in %d msec", opTime)
  232. return
  233. }
  234. log.Fatal("Test failed, there is still entries in the tables of the nodes")
  235. default:
  236. logrus.Infof("Checking table %s expected %d", tableName, expectedEntries)
  237. doneCh := make(chan resultTuple, len(ips))
  238. for _, ip := range ips {
  239. go fn(ip, servicePort, networkName, tableName, doneCh)
  240. }
  241. nodesWithCorrectEntriesNum := 0
  242. for i := len(ips); i > 0; i-- {
  243. tableEntries := <-doneCh
  244. logrus.Infof("Node %s has %d entries", tableEntries.id, tableEntries.result)
  245. if tableEntries.result == expectedEntries {
  246. nodesWithCorrectEntriesNum++
  247. }
  248. }
  249. close(doneCh)
  250. if nodesWithCorrectEntriesNum == len(ips) {
  251. if successTime == 0 {
  252. successTime = time.Now().UnixNano()
  253. logrus.Infof("Success after %d msec", time.Duration(successTime-startTime)/time.Millisecond)
  254. }
  255. } else {
  256. successTime = 0
  257. }
  258. time.Sleep(10 * time.Second)
  259. }
  260. }
  261. }
  262. func waitWriters(parallelWriters int, mustWrite bool, doneCh chan resultTuple) map[string]int {
  263. var totalKeys int
  264. resultTable := make(map[string]int)
  265. for i := 0; i < parallelWriters; i++ {
  266. logrus.Infof("Waiting for %d workers", parallelWriters-i)
  267. workerReturn := <-doneCh
  268. totalKeys += workerReturn.result
  269. if mustWrite && workerReturn.result == 0 {
  270. log.Fatalf("The worker %s did not write any key %d == 0", workerReturn.id, workerReturn.result)
  271. }
  272. if !mustWrite && workerReturn.result != 0 {
  273. log.Fatalf("The worker %s was supposed to return 0 instead %d != 0", workerReturn.id, workerReturn.result)
  274. }
  275. if mustWrite {
  276. resultTable[workerReturn.id] = workerReturn.result
  277. logrus.Infof("The worker %s wrote %d keys", workerReturn.id, workerReturn.result)
  278. }
  279. }
  280. resultTable[totalWrittenKeys] = totalKeys
  281. return resultTable
  282. }
  283. // ready
  284. func doReady(ips []string) {
  285. doneCh := make(chan resultTuple, len(ips))
  286. // check all the nodes
  287. for _, ip := range ips {
  288. go ready(ip, servicePort, doneCh)
  289. }
  290. // wait for the readiness of all nodes
  291. for i := len(ips); i > 0; i-- {
  292. <-doneCh
  293. }
  294. close(doneCh)
  295. }
  296. // join
  297. func doJoin(ips []string) {
  298. doneCh := make(chan resultTuple, len(ips))
  299. // check all the nodes
  300. for i, ip := range ips {
  301. members := append([]string(nil), ips[:i]...)
  302. members = append(members, ips[i+1:]...)
  303. go joinCluster(ip, servicePort, members, doneCh)
  304. }
  305. // wait for the readiness of all nodes
  306. for i := len(ips); i > 0; i-- {
  307. <-doneCh
  308. }
  309. close(doneCh)
  310. }
  311. // cluster-peers expectedNumberPeers maxRetry
  312. func doClusterPeers(ips []string, args []string) {
  313. doneCh := make(chan resultTuple, len(ips))
  314. expectedPeers, _ := strconv.Atoi(args[0])
  315. maxRetry, _ := strconv.Atoi(args[1])
  316. for retry := 0; retry < maxRetry; retry++ {
  317. // check all the nodes
  318. for _, ip := range ips {
  319. go clusterPeersNumber(ip, servicePort, doneCh)
  320. }
  321. var failed bool
  322. // wait for the readiness of all nodes
  323. for i := len(ips); i > 0; i-- {
  324. node := <-doneCh
  325. if node.result != expectedPeers {
  326. failed = true
  327. if retry == maxRetry-1 {
  328. log.Fatalf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
  329. } else {
  330. logrus.Warnf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
  331. }
  332. time.Sleep(1 * time.Second)
  333. }
  334. }
  335. // check if needs retry
  336. if !failed {
  337. break
  338. }
  339. }
  340. close(doneCh)
  341. }
  342. // join-network networkName
  343. func doJoinNetwork(ips []string, args []string) {
  344. doneCh := make(chan resultTuple, len(ips))
  345. // check all the nodes
  346. for _, ip := range ips {
  347. go joinNetwork(ip, servicePort, args[0], doneCh)
  348. }
  349. // wait for the readiness of all nodes
  350. for i := len(ips); i > 0; i-- {
  351. <-doneCh
  352. }
  353. close(doneCh)
  354. }
  355. // leave-network networkName
  356. func doLeaveNetwork(ips []string, args []string) {
  357. doneCh := make(chan resultTuple, len(ips))
  358. // check all the nodes
  359. for _, ip := range ips {
  360. go leaveNetwork(ip, servicePort, args[0], doneCh)
  361. }
  362. // wait for the readiness of all nodes
  363. for i := len(ips); i > 0; i-- {
  364. <-doneCh
  365. }
  366. close(doneCh)
  367. }
  368. // network-peers networkName expectedNumberPeers maxRetry
  369. func doNetworkPeers(ips []string, args []string) {
  370. doneCh := make(chan resultTuple, len(ips))
  371. networkName := args[0]
  372. expectedPeers, _ := strconv.Atoi(args[1])
  373. maxRetry, _ := strconv.Atoi(args[2])
  374. for retry := 0; retry < maxRetry; retry++ {
  375. // check all the nodes
  376. for _, ip := range ips {
  377. go networkPeersNumber(ip, servicePort, networkName, doneCh)
  378. }
  379. var failed bool
  380. // wait for the readiness of all nodes
  381. for i := len(ips); i > 0; i-- {
  382. node := <-doneCh
  383. if node.result != expectedPeers {
  384. failed = true
  385. if retry == maxRetry-1 {
  386. log.Fatalf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
  387. } else {
  388. logrus.Warnf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
  389. }
  390. time.Sleep(1 * time.Second)
  391. }
  392. }
  393. // check if needs retry
  394. if !failed {
  395. break
  396. }
  397. }
  398. close(doneCh)
  399. }
  400. // network-stats-queue networkName <gt/lt> queueSize
  401. func doNetworkStatsQueue(ips []string, args []string) {
  402. doneCh := make(chan resultTuple, len(ips))
  403. networkName := args[0]
  404. comparison := args[1]
  405. size, _ := strconv.Atoi(args[2])
  406. // check all the nodes
  407. for _, ip := range ips {
  408. go dbQueueLength(ip, servicePort, networkName, doneCh)
  409. }
  410. var avgQueueSize int
  411. // wait for the readiness of all nodes
  412. for i := len(ips); i > 0; i-- {
  413. node := <-doneCh
  414. switch comparison {
  415. case "lt":
  416. if node.result > size {
  417. log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size)
  418. }
  419. case "gt":
  420. if node.result < size {
  421. log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size)
  422. }
  423. default:
  424. log.Fatal("unknown comparison operator")
  425. }
  426. avgQueueSize += node.result
  427. }
  428. close(doneCh)
  429. avgQueueSize /= len(ips)
  430. fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize)
  431. }
  432. // write-keys networkName tableName parallelWriters numberOfKeysEach
  433. func doWriteKeys(ips []string, args []string) {
  434. networkName := args[0]
  435. tableName := args[1]
  436. parallelWriters, _ := strconv.Atoi(args[2])
  437. numberOfKeys, _ := strconv.Atoi(args[3])
  438. doneCh := make(chan resultTuple, parallelWriters)
  439. // Enable watch of tables from clients
  440. for i := 0; i < parallelWriters; i++ {
  441. go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
  442. }
  443. waitWriters(parallelWriters, false, doneCh)
  444. // Start parallel writers that will create and delete unique keys
  445. defer close(doneCh)
  446. for i := 0; i < parallelWriters; i++ {
  447. key := "key-" + strconv.Itoa(i) + "-"
  448. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  449. go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
  450. }
  451. // Sync with all the writers
  452. keyMap := waitWriters(parallelWriters, true, doneCh)
  453. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  454. // check table entries for 2 minutes
  455. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  456. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
  457. cancel()
  458. fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime)
  459. }
  460. // delete-keys networkName tableName parallelWriters numberOfKeysEach
  461. func doDeleteKeys(ips []string, args []string) {
  462. networkName := args[0]
  463. tableName := args[1]
  464. parallelWriters, _ := strconv.Atoi(args[2])
  465. numberOfKeys, _ := strconv.Atoi(args[3])
  466. doneCh := make(chan resultTuple, parallelWriters)
  467. // Enable watch of tables from clients
  468. for i := 0; i < parallelWriters; i++ {
  469. go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
  470. }
  471. waitWriters(parallelWriters, false, doneCh)
  472. // Start parallel writers that will create and delete unique keys
  473. defer close(doneCh)
  474. for i := 0; i < parallelWriters; i++ {
  475. key := "key-" + strconv.Itoa(i) + "-"
  476. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  477. go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
  478. }
  479. // Sync with all the writers
  480. keyMap := waitWriters(parallelWriters, true, doneCh)
  481. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  482. // check table entries for 2 minutes
  483. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  484. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  485. cancel()
  486. fmt.Fprintf(os.Stderr, "doDeletekeys succeeded in %d msec", opTime)
  487. }
  488. // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
  489. func doWriteDeleteUniqueKeys(ips []string, args []string) {
  490. networkName := args[0]
  491. tableName := args[1]
  492. parallelWriters, _ := strconv.Atoi(args[2])
  493. writeTimeSec, _ := strconv.Atoi(args[3])
  494. doneCh := make(chan resultTuple, parallelWriters)
  495. // Enable watch of tables from clients
  496. for i := 0; i < parallelWriters; i++ {
  497. go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
  498. }
  499. waitWriters(parallelWriters, false, doneCh)
  500. // Start parallel writers that will create and delete unique keys
  501. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  502. for i := 0; i < parallelWriters; i++ {
  503. key := "key-" + strconv.Itoa(i) + "-"
  504. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  505. go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  506. }
  507. // Sync with all the writers
  508. keyMap := waitWriters(parallelWriters, true, doneCh)
  509. cancel()
  510. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  511. // check table entries for 2 minutes
  512. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  513. opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  514. cancel()
  515. ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
  516. opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
  517. cancel()
  518. fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime)
  519. }
  520. // write-unique-keys networkName tableName numParallelWriters writeTimeSec
  521. func doWriteUniqueKeys(ips []string, args []string) {
  522. networkName := args[0]
  523. tableName := args[1]
  524. parallelWriters, _ := strconv.Atoi(args[2])
  525. writeTimeSec, _ := strconv.Atoi(args[3])
  526. doneCh := make(chan resultTuple, parallelWriters)
  527. // Enable watch of tables from clients
  528. for i := 0; i < parallelWriters; i++ {
  529. go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
  530. }
  531. waitWriters(parallelWriters, false, doneCh)
  532. // Start parallel writers that will create and delete unique keys
  533. defer close(doneCh)
  534. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  535. for i := 0; i < parallelWriters; i++ {
  536. key := "key-" + strconv.Itoa(i) + "-"
  537. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  538. go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  539. }
  540. // Sync with all the writers
  541. keyMap := waitWriters(parallelWriters, true, doneCh)
  542. cancel()
  543. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  544. // check table entries for 2 minutes
  545. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  546. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
  547. cancel()
  548. fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime)
  549. }
  550. // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
  551. func doWriteDeleteLeaveJoin(ips []string, args []string) {
  552. networkName := args[0]
  553. tableName := args[1]
  554. parallelWriters, _ := strconv.Atoi(args[2])
  555. writeTimeSec, _ := strconv.Atoi(args[3])
  556. // Start parallel writers that will create and delete unique keys
  557. doneCh := make(chan resultTuple, parallelWriters)
  558. defer close(doneCh)
  559. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  560. for i := 0; i < parallelWriters; i++ {
  561. key := "key-" + strconv.Itoa(i) + "-"
  562. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  563. go writeDeleteLeaveJoin(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  564. }
  565. // Sync with all the writers
  566. keyMap := waitWriters(parallelWriters, true, doneCh)
  567. cancel()
  568. logrus.Infof("Written a total of %d keys on the cluster", keyMap["totalKeys"])
  569. // check table entries for 2 minutes
  570. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  571. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  572. cancel()
  573. fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime)
  574. }
  575. // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
  576. func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
  577. networkName := args[0]
  578. tableName := args[1]
  579. parallelWriters, _ := strconv.Atoi(args[2])
  580. writeTimeSec, _ := strconv.Atoi(args[3])
  581. // Start parallel writers that will create and delete unique keys
  582. doneCh := make(chan resultTuple, parallelWriters)
  583. defer close(doneCh)
  584. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  585. for i := 0; i < parallelWriters; i++ {
  586. key := "key-" + strconv.Itoa(i) + "-"
  587. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  588. go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  589. }
  590. // Sync with all the writers
  591. keyMap := waitWriters(parallelWriters, true, doneCh)
  592. cancel()
  593. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  594. // The writers will leave the network
  595. for i := 0; i < parallelWriters; i++ {
  596. logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
  597. go leaveNetwork(ips[i], servicePort, networkName, doneCh)
  598. }
  599. waitWriters(parallelWriters, false, doneCh)
  600. // Give some time
  601. time.Sleep(100 * time.Millisecond)
  602. // The writers will join the network
  603. for i := 0; i < parallelWriters; i++ {
  604. logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
  605. go joinNetwork(ips[i], servicePort, networkName, doneCh)
  606. }
  607. waitWriters(parallelWriters, false, doneCh)
  608. // check table entries for 2 minutes
  609. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  610. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  611. cancel()
  612. fmt.Fprintf(os.Stderr, "doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime)
  613. }
  614. // write-wait-leave networkName tableName numParallelWriters writeTimeSec
  615. func doWriteWaitLeave(ips []string, args []string) {
  616. networkName := args[0]
  617. tableName := args[1]
  618. parallelWriters, _ := strconv.Atoi(args[2])
  619. writeTimeSec, _ := strconv.Atoi(args[3])
  620. // Start parallel writers that will create and delete unique keys
  621. doneCh := make(chan resultTuple, parallelWriters)
  622. defer close(doneCh)
  623. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  624. for i := 0; i < parallelWriters; i++ {
  625. key := "key-" + strconv.Itoa(i) + "-"
  626. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  627. go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  628. }
  629. // Sync with all the writers
  630. keyMap := waitWriters(parallelWriters, true, doneCh)
  631. cancel()
  632. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  633. // The writers will leave the network
  634. for i := 0; i < parallelWriters; i++ {
  635. logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
  636. go leaveNetwork(ips[i], servicePort, networkName, doneCh)
  637. }
  638. waitWriters(parallelWriters, false, doneCh)
  639. // check table entries for 2 minutes
  640. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  641. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  642. cancel()
  643. fmt.Fprintf(os.Stderr, "doWriteLeaveJoin succeeded in %d msec", opTime)
  644. }
  645. // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
  646. func doWriteWaitLeaveJoin(ips []string, args []string) {
  647. networkName := args[0]
  648. tableName := args[1]
  649. parallelWriters, _ := strconv.Atoi(args[2])
  650. writeTimeSec, _ := strconv.Atoi(args[3])
  651. parallelLeaver, _ := strconv.Atoi(args[4])
  652. // Start parallel writers that will create and delete unique keys
  653. doneCh := make(chan resultTuple, parallelWriters)
  654. defer close(doneCh)
  655. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  656. for i := 0; i < parallelWriters; i++ {
  657. key := "key-" + strconv.Itoa(i) + "-"
  658. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  659. go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  660. }
  661. // Sync with all the writers
  662. keyMap := waitWriters(parallelWriters, true, doneCh)
  663. cancel()
  664. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  665. keysExpected := keyMap[totalWrittenKeys]
  666. // The Leavers will leave the network
  667. for i := 0; i < parallelLeaver; i++ {
  668. logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
  669. go leaveNetwork(ips[i], servicePort, networkName, doneCh)
  670. // Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed
  671. keysExpected -= keyMap[ips[i]]
  672. }
  673. waitWriters(parallelLeaver, false, doneCh)
  674. // Give some time
  675. time.Sleep(100 * time.Millisecond)
  676. // The writers will join the network
  677. for i := 0; i < parallelLeaver; i++ {
  678. logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
  679. go joinNetwork(ips[i], servicePort, networkName, doneCh)
  680. }
  681. waitWriters(parallelLeaver, false, doneCh)
  682. // check table entries for 2 minutes
  683. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  684. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
  685. cancel()
  686. fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime)
  687. }
  // cmdArgChec maps each command to the minimum len(args) Client must receive.
  // args is [command, serviceName, servicePort, commandArgs...], so every command
  // except "debug" and "fail" needs at least 3, plus one per command argument.
  // Previously several counts were one too low and most commands were missing,
  // which let Client panic on an out-of-range index instead of failing cleanly.
  var cmdArgChec = map[string]int{
  	"debug":                        0,
  	"fail":                         0,
  	"ready":                        3,
  	"join":                         3,
  	"leave":                        3,
  	"join-network":                 4, // networkName
  	"leave-network":                4, // networkName
  	"cluster-peers":                5, // expectedNumberPeers maxRetry
  	"network-peers":                6, // networkName expectedNumberPeers maxRetry
  	"network-stats-queue":          6, // networkName <lt/gt> queueSize
  	"write-keys":                   7, // networkName tableName parallelWriters numberOfKeysEach
  	"delete-keys":                  7, // networkName tableName parallelWriters numberOfKeysEach
  	"write-unique-keys":            7, // networkName tableName numParallelWriters writeTimeSec
  	"write-delete-unique-keys":     7, // networkName tableName numParallelWriters writeTimeSec
  	"write-delete-leave-join":      7, // networkName tableName numParallelWriters writeTimeSec
  	"write-delete-wait-leave-join": 7, // networkName tableName numParallelWriters writeTimeSec
  	"write-wait-leave":             7, // networkName tableName numParallelWriters writeTimeSec
  	"write-wait-leave-join":        8, // networkName tableName numParallelWriters writeTimeSec numParallelLeaver
  }
  700. // Client is a client
  701. func Client(args []string) {
  702. logrus.Infof("[CLIENT] Starting with arguments %v", args)
  703. command := args[0]
  704. if len(args) < cmdArgChec[command] {
  705. log.Fatalf("Command %s requires %d arguments, passed %d, aborting...", command, cmdArgChec[command], len(args))
  706. }
  707. switch command {
  708. case "debug":
  709. time.Sleep(1 * time.Hour)
  710. os.Exit(0)
  711. case "fail":
  712. log.Fatalf("Test error condition with message: error error error")
  713. }
  714. serviceName := args[1]
  715. ips, _ := net.LookupHost("tasks." + serviceName)
  716. logrus.Infof("got the ips %v", ips)
  717. if len(ips) == 0 {
  718. log.Fatalf("Cannot resolve any IP for the service tasks.%s", serviceName)
  719. }
  720. servicePort = args[2]
  721. commandArgs := args[3:]
  722. logrus.Infof("Executing %s with args:%v", command, commandArgs)
  723. switch command {
  724. case "ready":
  725. doReady(ips)
  726. case "join":
  727. doJoin(ips)
  728. case "leave":
  729. case "cluster-peers":
  730. // cluster-peers maxRetry
  731. doClusterPeers(ips, commandArgs)
  732. case "join-network":
  733. // join-network networkName
  734. doJoinNetwork(ips, commandArgs)
  735. case "leave-network":
  736. // leave-network networkName
  737. doLeaveNetwork(ips, commandArgs)
  738. case "network-peers":
  739. // network-peers networkName expectedNumberPeers maxRetry
  740. doNetworkPeers(ips, commandArgs)
  741. // case "network-stats-entries":
  742. // // network-stats-entries networkName maxRetry
  743. // doNetworkPeers(ips, commandArgs)
  744. case "network-stats-queue":
  745. // network-stats-queue networkName <lt/gt> queueSize
  746. doNetworkStatsQueue(ips, commandArgs)
  747. case "write-keys":
  748. // write-keys networkName tableName parallelWriters numberOfKeysEach
  749. doWriteKeys(ips, commandArgs)
  750. case "delete-keys":
  751. // delete-keys networkName tableName parallelWriters numberOfKeysEach
  752. doDeleteKeys(ips, commandArgs)
  753. case "write-unique-keys":
  754. // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
  755. doWriteUniqueKeys(ips, commandArgs)
  756. case "write-delete-unique-keys":
  757. // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
  758. doWriteDeleteUniqueKeys(ips, commandArgs)
  759. case "write-delete-leave-join":
  760. // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
  761. doWriteDeleteLeaveJoin(ips, commandArgs)
  762. case "write-delete-wait-leave-join":
  763. // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
  764. doWriteDeleteWaitLeaveJoin(ips, commandArgs)
  765. case "write-wait-leave":
  766. // write-wait-leave networkName tableName numParallelWriters writeTimeSec
  767. doWriteWaitLeave(ips, commandArgs)
  768. case "write-wait-leave-join":
  769. // write-wait-leave networkName tableName numParallelWriters writeTimeSec
  770. doWriteWaitLeaveJoin(ips, commandArgs)
  771. default:
  772. log.Fatalf("Command %s not recognized", command)
  773. }
  774. }