ndbClient.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861
  1. package dbclient
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net"
  8. "net/http"
  9. "os"
  10. "regexp"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/sirupsen/logrus"
  15. )
  16. var servicePort string
  17. const totalWrittenKeys string = "totalKeys"
  18. type resultTuple struct {
  19. id string
  20. result int
  21. }
  22. func httpGetFatalError(ip, port, path string) {
  23. body, err := httpGet(ip, port, path)
  24. if err != nil || !strings.Contains(string(body), "OK") {
  25. log.Fatalf("[%s] error %s %s", path, err, body)
  26. }
  27. }
  28. func httpGet(ip, port, path string) ([]byte, error) {
  29. resp, err := http.Get("http://" + ip + ":" + port + path)
  30. if err != nil {
  31. logrus.Errorf("httpGet error:%s", err)
  32. return nil, err
  33. }
  34. defer resp.Body.Close()
  35. body, err := io.ReadAll(resp.Body)
  36. return body, err
  37. }
  38. func joinCluster(ip, port string, members []string, doneCh chan resultTuple) {
  39. httpGetFatalError(ip, port, "/join?members="+strings.Join(members, ","))
  40. if doneCh != nil {
  41. doneCh <- resultTuple{id: ip, result: 0}
  42. }
  43. }
  44. func joinNetwork(ip, port, network string, doneCh chan resultTuple) {
  45. httpGetFatalError(ip, port, "/joinnetwork?nid="+network)
  46. if doneCh != nil {
  47. doneCh <- resultTuple{id: ip, result: 0}
  48. }
  49. }
  50. func leaveNetwork(ip, port, network string, doneCh chan resultTuple) {
  51. httpGetFatalError(ip, port, "/leavenetwork?nid="+network)
  52. if doneCh != nil {
  53. doneCh <- resultTuple{id: ip, result: 0}
  54. }
  55. }
  56. func writeTableKey(ip, port, networkName, tableName, key string) {
  57. createPath := "/createentry?unsafe&nid=" + networkName + "&tname=" + tableName + "&value=v&key="
  58. httpGetFatalError(ip, port, createPath+key)
  59. }
  60. func deleteTableKey(ip, port, networkName, tableName, key string) {
  61. deletePath := "/deleteentry?nid=" + networkName + "&tname=" + tableName + "&key="
  62. httpGetFatalError(ip, port, deletePath+key)
  63. }
  64. func clusterPeersNumber(ip, port string, doneCh chan resultTuple) {
  65. body, err := httpGet(ip, port, "/clusterpeers")
  66. if err != nil {
  67. logrus.Errorf("clusterPeers %s there was an error: %s", ip, err)
  68. doneCh <- resultTuple{id: ip, result: -1}
  69. return
  70. }
  71. peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
  72. peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
  73. doneCh <- resultTuple{id: ip, result: peersNum}
  74. }
  75. func networkPeersNumber(ip, port, networkName string, doneCh chan resultTuple) {
  76. body, err := httpGet(ip, port, "/networkpeers?nid="+networkName)
  77. if err != nil {
  78. logrus.Errorf("networkPeersNumber %s there was an error: %s", ip, err)
  79. doneCh <- resultTuple{id: ip, result: -1}
  80. return
  81. }
  82. peersRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
  83. peersNum, _ := strconv.Atoi(peersRegexp.FindStringSubmatch(string(body))[1])
  84. doneCh <- resultTuple{id: ip, result: peersNum}
  85. }
  86. func dbTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
  87. body, err := httpGet(ip, port, "/gettable?nid="+networkName+"&tname="+tableName)
  88. if err != nil {
  89. logrus.Errorf("tableEntriesNumber %s there was an error: %s", ip, err)
  90. doneCh <- resultTuple{id: ip, result: -1}
  91. return
  92. }
  93. elementsRegexp := regexp.MustCompile(`total entries: ([0-9]+)`)
  94. entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
  95. doneCh <- resultTuple{id: ip, result: entriesNum}
  96. }
  97. func dbQueueLength(ip, port, networkName string, doneCh chan resultTuple) {
  98. body, err := httpGet(ip, port, "/networkstats?nid="+networkName)
  99. if err != nil {
  100. logrus.Errorf("queueLength %s there was an error: %s", ip, err)
  101. doneCh <- resultTuple{id: ip, result: -1}
  102. return
  103. }
  104. elementsRegexp := regexp.MustCompile(`qlen: ([0-9]+)`)
  105. entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
  106. doneCh <- resultTuple{id: ip, result: entriesNum}
  107. }
  108. func clientWatchTable(ip, port, networkName, tableName string, doneCh chan resultTuple) {
  109. httpGetFatalError(ip, port, "/watchtable?nid="+networkName+"&tname="+tableName)
  110. if doneCh != nil {
  111. doneCh <- resultTuple{id: ip, result: 0}
  112. }
  113. }
  114. func clientTableEntriesNumber(ip, port, networkName, tableName string, doneCh chan resultTuple) {
  115. body, err := httpGet(ip, port, "/watchedtableentries?nid="+networkName+"&tname="+tableName)
  116. if err != nil {
  117. logrus.Errorf("clientTableEntriesNumber %s there was an error: %s", ip, err)
  118. doneCh <- resultTuple{id: ip, result: -1}
  119. return
  120. }
  121. elementsRegexp := regexp.MustCompile(`total elements: ([0-9]+)`)
  122. entriesNum, _ := strconv.Atoi(elementsRegexp.FindStringSubmatch(string(body))[1])
  123. doneCh <- resultTuple{id: ip, result: entriesNum}
  124. }
  125. func writeKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
  126. x := 0
  127. for ; x < number; x++ {
  128. k := key + strconv.Itoa(x)
  129. // write key
  130. writeTableKey(ip, port, networkName, tableName, k)
  131. }
  132. doneCh <- resultTuple{id: ip, result: x}
  133. }
  134. func deleteKeysNumber(ip, port, networkName, tableName, key string, number int, doneCh chan resultTuple) {
  135. x := 0
  136. for ; x < number; x++ {
  137. k := key + strconv.Itoa(x)
  138. // write key
  139. deleteTableKey(ip, port, networkName, tableName, k)
  140. }
  141. doneCh <- resultTuple{id: ip, result: x}
  142. }
  143. func writeUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
  144. for x := 0; ; x++ {
  145. select {
  146. case <-ctx.Done():
  147. doneCh <- resultTuple{id: ip, result: x}
  148. return
  149. default:
  150. k := key + strconv.Itoa(x)
  151. // write key
  152. writeTableKey(ip, port, networkName, tableName, k)
  153. // give time to send out key writes
  154. time.Sleep(100 * time.Millisecond)
  155. }
  156. }
  157. }
  158. func writeDeleteUniqueKeys(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
  159. for x := 0; ; x++ {
  160. select {
  161. case <-ctx.Done():
  162. doneCh <- resultTuple{id: ip, result: x}
  163. return
  164. default:
  165. k := key + strconv.Itoa(x)
  166. // write key
  167. writeTableKey(ip, port, networkName, tableName, k)
  168. // give time to send out key writes
  169. time.Sleep(100 * time.Millisecond)
  170. // delete key
  171. deleteTableKey(ip, port, networkName, tableName, k)
  172. }
  173. }
  174. }
  175. func writeDeleteLeaveJoin(ctx context.Context, ip, port, networkName, tableName, key string, doneCh chan resultTuple) {
  176. for x := 0; ; x++ {
  177. select {
  178. case <-ctx.Done():
  179. doneCh <- resultTuple{id: ip, result: x}
  180. return
  181. default:
  182. k := key + strconv.Itoa(x)
  183. // write key
  184. writeTableKey(ip, port, networkName, tableName, k)
  185. time.Sleep(100 * time.Millisecond)
  186. // delete key
  187. deleteTableKey(ip, port, networkName, tableName, k)
  188. // give some time
  189. time.Sleep(100 * time.Millisecond)
  190. // leave network
  191. leaveNetwork(ip, port, networkName, nil)
  192. // join network
  193. joinNetwork(ip, port, networkName, nil)
  194. }
  195. }
  196. }
  197. func ready(ip, port string, doneCh chan resultTuple) {
  198. for {
  199. body, err := httpGet(ip, port, "/ready")
  200. if err != nil || !strings.Contains(string(body), "OK") {
  201. time.Sleep(500 * time.Millisecond)
  202. continue
  203. }
  204. // success
  205. break
  206. }
  207. // notify the completion
  208. doneCh <- resultTuple{id: ip, result: 0}
  209. }
  210. func checkTable(ctx context.Context, ips []string, port, networkName, tableName string, expectedEntries int, fn func(string, string, string, string, chan resultTuple)) (opTime time.Duration) {
  211. startTime := time.Now().UnixNano()
  212. var successTime int64
  213. // Loop for 2 minutes to guarantee that the result is stable
  214. for {
  215. select {
  216. case <-ctx.Done():
  217. // Validate test success, if the time is set means that all the tables are empty
  218. if successTime != 0 {
  219. opTime = time.Duration(successTime-startTime) / time.Millisecond
  220. logrus.Infof("Check table passed, the cluster converged in %d msec", opTime)
  221. return
  222. }
  223. log.Fatal("Test failed, there is still entries in the tables of the nodes")
  224. default:
  225. logrus.Infof("Checking table %s expected %d", tableName, expectedEntries)
  226. doneCh := make(chan resultTuple, len(ips))
  227. for _, ip := range ips {
  228. go fn(ip, servicePort, networkName, tableName, doneCh)
  229. }
  230. nodesWithCorrectEntriesNum := 0
  231. for i := len(ips); i > 0; i-- {
  232. tableEntries := <-doneCh
  233. logrus.Infof("Node %s has %d entries", tableEntries.id, tableEntries.result)
  234. if tableEntries.result == expectedEntries {
  235. nodesWithCorrectEntriesNum++
  236. }
  237. }
  238. close(doneCh)
  239. if nodesWithCorrectEntriesNum == len(ips) {
  240. if successTime == 0 {
  241. successTime = time.Now().UnixNano()
  242. logrus.Infof("Success after %d msec", time.Duration(successTime-startTime)/time.Millisecond)
  243. }
  244. } else {
  245. successTime = 0
  246. }
  247. time.Sleep(10 * time.Second)
  248. }
  249. }
  250. }
  251. func waitWriters(parallelWriters int, mustWrite bool, doneCh chan resultTuple) map[string]int {
  252. var totalKeys int
  253. resultTable := make(map[string]int)
  254. for i := 0; i < parallelWriters; i++ {
  255. logrus.Infof("Waiting for %d workers", parallelWriters-i)
  256. workerReturn := <-doneCh
  257. totalKeys += workerReturn.result
  258. if mustWrite && workerReturn.result == 0 {
  259. log.Fatalf("The worker %s did not write any key %d == 0", workerReturn.id, workerReturn.result)
  260. }
  261. if !mustWrite && workerReturn.result != 0 {
  262. log.Fatalf("The worker %s was supposed to return 0 instead %d != 0", workerReturn.id, workerReturn.result)
  263. }
  264. if mustWrite {
  265. resultTable[workerReturn.id] = workerReturn.result
  266. logrus.Infof("The worker %s wrote %d keys", workerReturn.id, workerReturn.result)
  267. }
  268. }
  269. resultTable[totalWrittenKeys] = totalKeys
  270. return resultTable
  271. }
  272. // ready
  273. func doReady(ips []string) {
  274. doneCh := make(chan resultTuple, len(ips))
  275. // check all the nodes
  276. for _, ip := range ips {
  277. go ready(ip, servicePort, doneCh)
  278. }
  279. // wait for the readiness of all nodes
  280. for i := len(ips); i > 0; i-- {
  281. <-doneCh
  282. }
  283. close(doneCh)
  284. }
  285. // join
  286. func doJoin(ips []string) {
  287. doneCh := make(chan resultTuple, len(ips))
  288. // check all the nodes
  289. for i, ip := range ips {
  290. members := append([]string(nil), ips[:i]...)
  291. members = append(members, ips[i+1:]...)
  292. go joinCluster(ip, servicePort, members, doneCh)
  293. }
  294. // wait for the readiness of all nodes
  295. for i := len(ips); i > 0; i-- {
  296. <-doneCh
  297. }
  298. close(doneCh)
  299. }
  300. // cluster-peers expectedNumberPeers maxRetry
  301. func doClusterPeers(ips []string, args []string) {
  302. doneCh := make(chan resultTuple, len(ips))
  303. expectedPeers, _ := strconv.Atoi(args[0])
  304. maxRetry, _ := strconv.Atoi(args[1])
  305. for retry := 0; retry < maxRetry; retry++ {
  306. // check all the nodes
  307. for _, ip := range ips {
  308. go clusterPeersNumber(ip, servicePort, doneCh)
  309. }
  310. var failed bool
  311. // wait for the readiness of all nodes
  312. for i := len(ips); i > 0; i-- {
  313. node := <-doneCh
  314. if node.result != expectedPeers {
  315. failed = true
  316. if retry == maxRetry-1 {
  317. log.Fatalf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
  318. } else {
  319. logrus.Warnf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
  320. }
  321. time.Sleep(1 * time.Second)
  322. }
  323. }
  324. // check if needs retry
  325. if !failed {
  326. break
  327. }
  328. }
  329. close(doneCh)
  330. }
  331. // join-network networkName
  332. func doJoinNetwork(ips []string, args []string) {
  333. doneCh := make(chan resultTuple, len(ips))
  334. // check all the nodes
  335. for _, ip := range ips {
  336. go joinNetwork(ip, servicePort, args[0], doneCh)
  337. }
  338. // wait for the readiness of all nodes
  339. for i := len(ips); i > 0; i-- {
  340. <-doneCh
  341. }
  342. close(doneCh)
  343. }
  344. // leave-network networkName
  345. func doLeaveNetwork(ips []string, args []string) {
  346. doneCh := make(chan resultTuple, len(ips))
  347. // check all the nodes
  348. for _, ip := range ips {
  349. go leaveNetwork(ip, servicePort, args[0], doneCh)
  350. }
  351. // wait for the readiness of all nodes
  352. for i := len(ips); i > 0; i-- {
  353. <-doneCh
  354. }
  355. close(doneCh)
  356. }
  357. // network-peers networkName expectedNumberPeers maxRetry
  358. func doNetworkPeers(ips []string, args []string) {
  359. doneCh := make(chan resultTuple, len(ips))
  360. networkName := args[0]
  361. expectedPeers, _ := strconv.Atoi(args[1])
  362. maxRetry, _ := strconv.Atoi(args[2])
  363. for retry := 0; retry < maxRetry; retry++ {
  364. // check all the nodes
  365. for _, ip := range ips {
  366. go networkPeersNumber(ip, servicePort, networkName, doneCh)
  367. }
  368. var failed bool
  369. // wait for the readiness of all nodes
  370. for i := len(ips); i > 0; i-- {
  371. node := <-doneCh
  372. if node.result != expectedPeers {
  373. failed = true
  374. if retry == maxRetry-1 {
  375. log.Fatalf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
  376. } else {
  377. logrus.Warnf("Expected peers from %s mismatch %d != %d", node.id, expectedPeers, node.result)
  378. }
  379. time.Sleep(1 * time.Second)
  380. }
  381. }
  382. // check if needs retry
  383. if !failed {
  384. break
  385. }
  386. }
  387. close(doneCh)
  388. }
  389. // network-stats-queue networkName <gt/lt> queueSize
  390. func doNetworkStatsQueue(ips []string, args []string) {
  391. doneCh := make(chan resultTuple, len(ips))
  392. networkName := args[0]
  393. comparison := args[1]
  394. size, _ := strconv.Atoi(args[2])
  395. // check all the nodes
  396. for _, ip := range ips {
  397. go dbQueueLength(ip, servicePort, networkName, doneCh)
  398. }
  399. var avgQueueSize int
  400. // wait for the readiness of all nodes
  401. for i := len(ips); i > 0; i-- {
  402. node := <-doneCh
  403. switch comparison {
  404. case "lt":
  405. if node.result > size {
  406. log.Fatalf("Expected queue size from %s to be %d < %d", node.id, node.result, size)
  407. }
  408. case "gt":
  409. if node.result < size {
  410. log.Fatalf("Expected queue size from %s to be %d > %d", node.id, node.result, size)
  411. }
  412. default:
  413. log.Fatal("unknown comparison operator")
  414. }
  415. avgQueueSize += node.result
  416. }
  417. close(doneCh)
  418. avgQueueSize /= len(ips)
  419. fmt.Fprintf(os.Stderr, "doNetworkStatsQueue succeeded with avg queue:%d", avgQueueSize)
  420. }
  421. // write-keys networkName tableName parallelWriters numberOfKeysEach
  422. func doWriteKeys(ips []string, args []string) {
  423. networkName := args[0]
  424. tableName := args[1]
  425. parallelWriters, _ := strconv.Atoi(args[2])
  426. numberOfKeys, _ := strconv.Atoi(args[3])
  427. doneCh := make(chan resultTuple, parallelWriters)
  428. // Enable watch of tables from clients
  429. for i := 0; i < parallelWriters; i++ {
  430. go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
  431. }
  432. waitWriters(parallelWriters, false, doneCh)
  433. // Start parallel writers that will create and delete unique keys
  434. defer close(doneCh)
  435. for i := 0; i < parallelWriters; i++ {
  436. key := "key-" + strconv.Itoa(i) + "-"
  437. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  438. go writeKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
  439. }
  440. // Sync with all the writers
  441. keyMap := waitWriters(parallelWriters, true, doneCh)
  442. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  443. // check table entries for 2 minutes
  444. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  445. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
  446. cancel()
  447. fmt.Fprintf(os.Stderr, "doWriteKeys succeeded in %d msec", opTime)
  448. }
  449. // delete-keys networkName tableName parallelWriters numberOfKeysEach
  450. func doDeleteKeys(ips []string, args []string) {
  451. networkName := args[0]
  452. tableName := args[1]
  453. parallelWriters, _ := strconv.Atoi(args[2])
  454. numberOfKeys, _ := strconv.Atoi(args[3])
  455. doneCh := make(chan resultTuple, parallelWriters)
  456. // Enable watch of tables from clients
  457. for i := 0; i < parallelWriters; i++ {
  458. go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
  459. }
  460. waitWriters(parallelWriters, false, doneCh)
  461. // Start parallel writers that will create and delete unique keys
  462. defer close(doneCh)
  463. for i := 0; i < parallelWriters; i++ {
  464. key := "key-" + strconv.Itoa(i) + "-"
  465. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  466. go deleteKeysNumber(ips[i], servicePort, networkName, tableName, key, numberOfKeys, doneCh)
  467. }
  468. // Sync with all the writers
  469. keyMap := waitWriters(parallelWriters, true, doneCh)
  470. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  471. // check table entries for 2 minutes
  472. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
  473. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  474. cancel()
  475. fmt.Fprintf(os.Stderr, "doDeletekeys succeeded in %d msec", opTime)
  476. }
  477. // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
  478. func doWriteDeleteUniqueKeys(ips []string, args []string) {
  479. networkName := args[0]
  480. tableName := args[1]
  481. parallelWriters, _ := strconv.Atoi(args[2])
  482. writeTimeSec, _ := strconv.Atoi(args[3])
  483. doneCh := make(chan resultTuple, parallelWriters)
  484. // Enable watch of tables from clients
  485. for i := 0; i < parallelWriters; i++ {
  486. go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
  487. }
  488. waitWriters(parallelWriters, false, doneCh)
  489. // Start parallel writers that will create and delete unique keys
  490. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  491. for i := 0; i < parallelWriters; i++ {
  492. key := "key-" + strconv.Itoa(i) + "-"
  493. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  494. go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  495. }
  496. // Sync with all the writers
  497. keyMap := waitWriters(parallelWriters, true, doneCh)
  498. cancel()
  499. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  500. // check table entries for 2 minutes
  501. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  502. opDBTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  503. cancel()
  504. ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
  505. opClientTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, clientTableEntriesNumber)
  506. cancel()
  507. fmt.Fprintf(os.Stderr, "doWriteDeleteUniqueKeys succeeded in %d msec and client %d msec", opDBTime, opClientTime)
  508. }
  509. // write-unique-keys networkName tableName numParallelWriters writeTimeSec
  510. func doWriteUniqueKeys(ips []string, args []string) {
  511. networkName := args[0]
  512. tableName := args[1]
  513. parallelWriters, _ := strconv.Atoi(args[2])
  514. writeTimeSec, _ := strconv.Atoi(args[3])
  515. doneCh := make(chan resultTuple, parallelWriters)
  516. // Enable watch of tables from clients
  517. for i := 0; i < parallelWriters; i++ {
  518. go clientWatchTable(ips[i], servicePort, networkName, tableName, doneCh)
  519. }
  520. waitWriters(parallelWriters, false, doneCh)
  521. // Start parallel writers that will create and delete unique keys
  522. defer close(doneCh)
  523. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  524. for i := 0; i < parallelWriters; i++ {
  525. key := "key-" + strconv.Itoa(i) + "-"
  526. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  527. go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  528. }
  529. // Sync with all the writers
  530. keyMap := waitWriters(parallelWriters, true, doneCh)
  531. cancel()
  532. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  533. // check table entries for 2 minutes
  534. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  535. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keyMap[totalWrittenKeys], dbTableEntriesNumber)
  536. cancel()
  537. fmt.Fprintf(os.Stderr, "doWriteUniqueKeys succeeded in %d msec", opTime)
  538. }
  539. // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
  540. func doWriteDeleteLeaveJoin(ips []string, args []string) {
  541. networkName := args[0]
  542. tableName := args[1]
  543. parallelWriters, _ := strconv.Atoi(args[2])
  544. writeTimeSec, _ := strconv.Atoi(args[3])
  545. // Start parallel writers that will create and delete unique keys
  546. doneCh := make(chan resultTuple, parallelWriters)
  547. defer close(doneCh)
  548. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  549. for i := 0; i < parallelWriters; i++ {
  550. key := "key-" + strconv.Itoa(i) + "-"
  551. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  552. go writeDeleteLeaveJoin(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  553. }
  554. // Sync with all the writers
  555. keyMap := waitWriters(parallelWriters, true, doneCh)
  556. cancel()
  557. logrus.Infof("Written a total of %d keys on the cluster", keyMap["totalKeys"])
  558. // check table entries for 2 minutes
  559. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  560. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  561. cancel()
  562. fmt.Fprintf(os.Stderr, "doWriteDeleteLeaveJoin succeeded in %d msec", opTime)
  563. }
  564. // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
  565. func doWriteDeleteWaitLeaveJoin(ips []string, args []string) {
  566. networkName := args[0]
  567. tableName := args[1]
  568. parallelWriters, _ := strconv.Atoi(args[2])
  569. writeTimeSec, _ := strconv.Atoi(args[3])
  570. // Start parallel writers that will create and delete unique keys
  571. doneCh := make(chan resultTuple, parallelWriters)
  572. defer close(doneCh)
  573. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  574. for i := 0; i < parallelWriters; i++ {
  575. key := "key-" + strconv.Itoa(i) + "-"
  576. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  577. go writeDeleteUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  578. }
  579. // Sync with all the writers
  580. keyMap := waitWriters(parallelWriters, true, doneCh)
  581. cancel()
  582. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  583. // The writers will leave the network
  584. for i := 0; i < parallelWriters; i++ {
  585. logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
  586. go leaveNetwork(ips[i], servicePort, networkName, doneCh)
  587. }
  588. waitWriters(parallelWriters, false, doneCh)
  589. // Give some time
  590. time.Sleep(100 * time.Millisecond)
  591. // The writers will join the network
  592. for i := 0; i < parallelWriters; i++ {
  593. logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
  594. go joinNetwork(ips[i], servicePort, networkName, doneCh)
  595. }
  596. waitWriters(parallelWriters, false, doneCh)
  597. // check table entries for 2 minutes
  598. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  599. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  600. cancel()
  601. fmt.Fprintf(os.Stderr, "doWriteDeleteWaitLeaveJoin succeeded in %d msec", opTime)
  602. }
  603. // write-wait-leave networkName tableName numParallelWriters writeTimeSec
  604. func doWriteWaitLeave(ips []string, args []string) {
  605. networkName := args[0]
  606. tableName := args[1]
  607. parallelWriters, _ := strconv.Atoi(args[2])
  608. writeTimeSec, _ := strconv.Atoi(args[3])
  609. // Start parallel writers that will create and delete unique keys
  610. doneCh := make(chan resultTuple, parallelWriters)
  611. defer close(doneCh)
  612. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  613. for i := 0; i < parallelWriters; i++ {
  614. key := "key-" + strconv.Itoa(i) + "-"
  615. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  616. go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  617. }
  618. // Sync with all the writers
  619. keyMap := waitWriters(parallelWriters, true, doneCh)
  620. cancel()
  621. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  622. // The writers will leave the network
  623. for i := 0; i < parallelWriters; i++ {
  624. logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
  625. go leaveNetwork(ips[i], servicePort, networkName, doneCh)
  626. }
  627. waitWriters(parallelWriters, false, doneCh)
  628. // check table entries for 2 minutes
  629. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  630. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, 0, dbTableEntriesNumber)
  631. cancel()
  632. fmt.Fprintf(os.Stderr, "doWriteLeaveJoin succeeded in %d msec", opTime)
  633. }
  634. // write-wait-leave-join networkName tableName numParallelWriters writeTimeSec numParallelLeaver
  635. func doWriteWaitLeaveJoin(ips []string, args []string) {
  636. networkName := args[0]
  637. tableName := args[1]
  638. parallelWriters, _ := strconv.Atoi(args[2])
  639. writeTimeSec, _ := strconv.Atoi(args[3])
  640. parallelLeaver, _ := strconv.Atoi(args[4])
  641. // Start parallel writers that will create and delete unique keys
  642. doneCh := make(chan resultTuple, parallelWriters)
  643. defer close(doneCh)
  644. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(writeTimeSec)*time.Second)
  645. for i := 0; i < parallelWriters; i++ {
  646. key := "key-" + strconv.Itoa(i) + "-"
  647. logrus.Infof("Spawn worker: %d on IP:%s", i, ips[i])
  648. go writeUniqueKeys(ctx, ips[i], servicePort, networkName, tableName, key, doneCh)
  649. }
  650. // Sync with all the writers
  651. keyMap := waitWriters(parallelWriters, true, doneCh)
  652. cancel()
  653. logrus.Infof("Written a total of %d keys on the cluster", keyMap[totalWrittenKeys])
  654. keysExpected := keyMap[totalWrittenKeys]
  655. // The Leavers will leave the network
  656. for i := 0; i < parallelLeaver; i++ {
  657. logrus.Infof("worker leaveNetwork: %d on IP:%s", i, ips[i])
  658. go leaveNetwork(ips[i], servicePort, networkName, doneCh)
  659. // Once a node leave all the keys written previously will be deleted, so the expected keys will consider that as removed
  660. keysExpected -= keyMap[ips[i]]
  661. }
  662. waitWriters(parallelLeaver, false, doneCh)
  663. // Give some time
  664. time.Sleep(100 * time.Millisecond)
  665. // The writers will join the network
  666. for i := 0; i < parallelLeaver; i++ {
  667. logrus.Infof("worker joinNetwork: %d on IP:%s", i, ips[i])
  668. go joinNetwork(ips[i], servicePort, networkName, doneCh)
  669. }
  670. waitWriters(parallelLeaver, false, doneCh)
  671. // check table entries for 2 minutes
  672. ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
  673. opTime := checkTable(ctx, ips, servicePort, networkName, tableName, keysExpected, dbTableEntriesNumber)
  674. cancel()
  675. fmt.Fprintf(os.Stderr, "doWriteWaitLeaveJoin succeeded in %d msec", opTime)
  676. }
  677. var cmdArgChec = map[string]int{
  678. "debug": 0,
  679. "fail": 0,
  680. "ready": 2,
  681. "join": 2,
  682. "leave": 2,
  683. "join-network": 3,
  684. "leave-network": 3,
  685. "cluster-peers": 5,
  686. "network-peers": 5,
  687. "write-delete-unique-keys": 7,
  688. }
  689. // Client is a client
  690. func Client(args []string) {
  691. logrus.Infof("[CLIENT] Starting with arguments %v", args)
  692. command := args[0]
  693. if len(args) < cmdArgChec[command] {
  694. log.Fatalf("Command %s requires %d arguments, passed %d, aborting...", command, cmdArgChec[command], len(args))
  695. }
  696. switch command {
  697. case "debug":
  698. time.Sleep(1 * time.Hour)
  699. os.Exit(0)
  700. case "fail":
  701. log.Fatalf("Test error condition with message: error error error")
  702. }
  703. serviceName := args[1]
  704. ips, _ := net.LookupHost("tasks." + serviceName)
  705. logrus.Infof("got the ips %v", ips)
  706. if len(ips) == 0 {
  707. log.Fatalf("Cannot resolve any IP for the service tasks.%s", serviceName)
  708. }
  709. servicePort = args[2]
  710. commandArgs := args[3:]
  711. logrus.Infof("Executing %s with args:%v", command, commandArgs)
  712. switch command {
  713. case "ready":
  714. doReady(ips)
  715. case "join":
  716. doJoin(ips)
  717. case "leave":
  718. case "cluster-peers":
  719. // cluster-peers maxRetry
  720. doClusterPeers(ips, commandArgs)
  721. case "join-network":
  722. // join-network networkName
  723. doJoinNetwork(ips, commandArgs)
  724. case "leave-network":
  725. // leave-network networkName
  726. doLeaveNetwork(ips, commandArgs)
  727. case "network-peers":
  728. // network-peers networkName expectedNumberPeers maxRetry
  729. doNetworkPeers(ips, commandArgs)
  730. // case "network-stats-entries":
  731. // // network-stats-entries networkName maxRetry
  732. // doNetworkPeers(ips, commandArgs)
  733. case "network-stats-queue":
  734. // network-stats-queue networkName <lt/gt> queueSize
  735. doNetworkStatsQueue(ips, commandArgs)
  736. case "write-keys":
  737. // write-keys networkName tableName parallelWriters numberOfKeysEach
  738. doWriteKeys(ips, commandArgs)
  739. case "delete-keys":
  740. // delete-keys networkName tableName parallelWriters numberOfKeysEach
  741. doDeleteKeys(ips, commandArgs)
  742. case "write-unique-keys":
  743. // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
  744. doWriteUniqueKeys(ips, commandArgs)
  745. case "write-delete-unique-keys":
  746. // write-delete-unique-keys networkName tableName numParallelWriters writeTimeSec
  747. doWriteDeleteUniqueKeys(ips, commandArgs)
  748. case "write-delete-leave-join":
  749. // write-delete-leave-join networkName tableName numParallelWriters writeTimeSec
  750. doWriteDeleteLeaveJoin(ips, commandArgs)
  751. case "write-delete-wait-leave-join":
  752. // write-delete-wait-leave-join networkName tableName numParallelWriters writeTimeSec
  753. doWriteDeleteWaitLeaveJoin(ips, commandArgs)
  754. case "write-wait-leave":
  755. // write-wait-leave networkName tableName numParallelWriters writeTimeSec
  756. doWriteWaitLeave(ips, commandArgs)
  757. case "write-wait-leave-join":
  758. // write-wait-leave networkName tableName numParallelWriters writeTimeSec
  759. doWriteWaitLeaveJoin(ips, commandArgs)
  760. default:
  761. log.Fatalf("Command %s not recognized", command)
  762. }
  763. }