flw.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package zk
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io/ioutil"
  7. "math/big"
  8. "net"
  9. "regexp"
  10. "strconv"
  11. "time"
  12. )
  13. // FLWSrvr is a FourLetterWord helper function. In particular, this function pulls the srvr output
  14. // from the zookeeper instances and parses the output. A slice of *ServerStats structs are returned
  15. // as well as a boolean value to indicate whether this function processed successfully.
  16. //
  17. // If the boolean value is false there was a problem. If the *ServerStats slice is empty or nil,
  18. // then the error happened before we started to obtain 'srvr' values. Otherwise, one of the
  19. // servers had an issue and the "Error" value in the struct should be inspected to determine
  20. // which server had the issue.
  21. func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool) {
  22. // different parts of the regular expression that are required to parse the srvr output
  23. var (
  24. zrVer = `^Zookeeper version: ([A-Za-z0-9\.\-]+), built on (\d\d/\d\d/\d\d\d\d \d\d:\d\d [A-Za-z0-9:\+\-]+)`
  25. zrLat = `^Latency min/avg/max: (\d+)/(\d+)/(\d+)`
  26. zrNet = `^Received: (\d+).*\n^Sent: (\d+).*\n^Connections: (\d+).*\n^Outstanding: (\d+)`
  27. zrState = `^Zxid: (0x[A-Za-z0-9]+).*\n^Mode: (\w+).*\n^Node count: (\d+)`
  28. )
  29. // build the regex from the pieces above
  30. re, err := regexp.Compile(fmt.Sprintf(`(?m:\A%v.*\n%v.*\n%v.*\n%v)`, zrVer, zrLat, zrNet, zrState))
  31. if err != nil {
  32. return nil, false
  33. }
  34. imOk := true
  35. servers = FormatServers(servers)
  36. ss := make([]*ServerStats, len(servers))
  37. for i := range ss {
  38. response, err := fourLetterWord(servers[i], "srvr", timeout)
  39. if err != nil {
  40. ss[i] = &ServerStats{Error: err}
  41. imOk = false
  42. continue
  43. }
  44. match := re.FindAllStringSubmatch(string(response), -1)[0][1:]
  45. if match == nil {
  46. err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
  47. ss[i] = &ServerStats{Error: err}
  48. imOk = false
  49. continue
  50. }
  51. // determine current server
  52. var srvrMode Mode
  53. switch match[10] {
  54. case "leader":
  55. srvrMode = ModeLeader
  56. case "follower":
  57. srvrMode = ModeFollower
  58. case "standalone":
  59. srvrMode = ModeStandalone
  60. default:
  61. srvrMode = ModeUnknown
  62. }
  63. buildTime, err := time.Parse("01/02/2006 15:04 MST", match[1])
  64. if err != nil {
  65. ss[i] = &ServerStats{Error: err}
  66. imOk = false
  67. continue
  68. }
  69. parsedInt, err := strconv.ParseInt(match[9], 0, 64)
  70. if err != nil {
  71. ss[i] = &ServerStats{Error: err}
  72. imOk = false
  73. continue
  74. }
  75. // the ZxID value is an int64 with two int32s packed inside
  76. // the high int32 is the epoch (i.e., number of leader elections)
  77. // the low int32 is the counter
  78. epoch := int32(parsedInt >> 32)
  79. counter := int32(parsedInt & 0xFFFFFFFF)
  80. // within the regex above, these values must be numerical
  81. // so we can avoid useless checking of the error return value
  82. minLatency, _ := strconv.ParseInt(match[2], 0, 64)
  83. avgLatency, _ := strconv.ParseInt(match[3], 0, 64)
  84. maxLatency, _ := strconv.ParseInt(match[4], 0, 64)
  85. recv, _ := strconv.ParseInt(match[5], 0, 64)
  86. sent, _ := strconv.ParseInt(match[6], 0, 64)
  87. cons, _ := strconv.ParseInt(match[7], 0, 64)
  88. outs, _ := strconv.ParseInt(match[8], 0, 64)
  89. ncnt, _ := strconv.ParseInt(match[11], 0, 64)
  90. ss[i] = &ServerStats{
  91. Sent: sent,
  92. Received: recv,
  93. NodeCount: ncnt,
  94. MinLatency: minLatency,
  95. AvgLatency: avgLatency,
  96. MaxLatency: maxLatency,
  97. Connections: cons,
  98. Outstanding: outs,
  99. Epoch: epoch,
  100. Counter: counter,
  101. BuildTime: buildTime,
  102. Mode: srvrMode,
  103. Version: match[0],
  104. }
  105. }
  106. return ss, imOk
  107. }
  108. // FLWRuok is a FourLetterWord helper function. In particular, this function
  109. // pulls the ruok output from each server.
  110. func FLWRuok(servers []string, timeout time.Duration) []bool {
  111. servers = FormatServers(servers)
  112. oks := make([]bool, len(servers))
  113. for i := range oks {
  114. response, err := fourLetterWord(servers[i], "ruok", timeout)
  115. if err != nil {
  116. continue
  117. }
  118. if bytes.Equal(response[:4], []byte("imok")) {
  119. oks[i] = true
  120. }
  121. }
  122. return oks
  123. }
  124. // FLWCons is a FourLetterWord helper function. In particular, this function
  125. // pulls the ruok output from each server.
  126. //
  127. // As with FLWSrvr, the boolean value indicates whether one of the requests had
  128. // an issue. The Clients struct has an Error value that can be checked.
  129. func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) {
  130. var (
  131. zrAddr = `^ /((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?):(?:\d+))\[\d+\]`
  132. zrPac = `\(queued=(\d+),recved=(\d+),sent=(\d+),sid=(0x[A-Za-z0-9]+),lop=(\w+),est=(\d+),to=(\d+),`
  133. zrSesh = `lcxid=(0x[A-Za-z0-9]+),lzxid=(0x[A-Za-z0-9]+),lresp=(\d+),llat=(\d+),minlat=(\d+),avglat=(\d+),maxlat=(\d+)\)`
  134. )
  135. re, err := regexp.Compile(fmt.Sprintf("%v%v%v", zrAddr, zrPac, zrSesh))
  136. if err != nil {
  137. return nil, false
  138. }
  139. servers = FormatServers(servers)
  140. sc := make([]*ServerClients, len(servers))
  141. imOk := true
  142. for i := range sc {
  143. response, err := fourLetterWord(servers[i], "cons", timeout)
  144. if err != nil {
  145. sc[i] = &ServerClients{Error: err}
  146. imOk = false
  147. continue
  148. }
  149. scan := bufio.NewScanner(bytes.NewReader(response))
  150. var clients []*ServerClient
  151. for scan.Scan() {
  152. line := scan.Bytes()
  153. if len(line) == 0 {
  154. continue
  155. }
  156. m := re.FindAllStringSubmatch(string(line), -1)
  157. if m == nil {
  158. err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
  159. sc[i] = &ServerClients{Error: err}
  160. imOk = false
  161. continue
  162. }
  163. match := m[0][1:]
  164. queued, _ := strconv.ParseInt(match[1], 0, 64)
  165. recvd, _ := strconv.ParseInt(match[2], 0, 64)
  166. sent, _ := strconv.ParseInt(match[3], 0, 64)
  167. sid, _ := strconv.ParseInt(match[4], 0, 64)
  168. est, _ := strconv.ParseInt(match[6], 0, 64)
  169. timeout, _ := strconv.ParseInt(match[7], 0, 32)
  170. lresp, _ := strconv.ParseInt(match[10], 0, 64)
  171. llat, _ := strconv.ParseInt(match[11], 0, 32)
  172. minlat, _ := strconv.ParseInt(match[12], 0, 32)
  173. avglat, _ := strconv.ParseInt(match[13], 0, 32)
  174. maxlat, _ := strconv.ParseInt(match[14], 0, 32)
  175. // zookeeper returns a value, '0xffffffffffffffff', as the
  176. // Lzxid for PING requests in the 'cons' output.
  177. // unfortunately, in Go that is an invalid int64 and is not represented
  178. // as -1.
  179. // However, converting the string value to a big.Int and then back to
  180. // and int64 properly sets the value to -1
  181. lzxid, ok := new(big.Int).SetString(match[9], 0)
  182. var errVal error
  183. if !ok {
  184. errVal = fmt.Errorf("failed to convert lzxid value to big.Int")
  185. imOk = false
  186. }
  187. lcxid, ok := new(big.Int).SetString(match[8], 0)
  188. if !ok && errVal == nil {
  189. errVal = fmt.Errorf("failed to convert lcxid value to big.Int")
  190. imOk = false
  191. }
  192. clients = append(clients, &ServerClient{
  193. Queued: queued,
  194. Received: recvd,
  195. Sent: sent,
  196. SessionID: sid,
  197. Lcxid: lcxid.Int64(),
  198. Lzxid: lzxid.Int64(),
  199. Timeout: int32(timeout),
  200. LastLatency: int32(llat),
  201. MinLatency: int32(minlat),
  202. AvgLatency: int32(avglat),
  203. MaxLatency: int32(maxlat),
  204. Established: time.Unix(est, 0),
  205. LastResponse: time.Unix(lresp, 0),
  206. Addr: match[0],
  207. LastOperation: match[5],
  208. Error: errVal,
  209. })
  210. }
  211. sc[i] = &ServerClients{Clients: clients}
  212. }
  213. return sc, imOk
  214. }
  215. func fourLetterWord(server, command string, timeout time.Duration) ([]byte, error) {
  216. conn, err := net.DialTimeout("tcp", server, timeout)
  217. if err != nil {
  218. return nil, err
  219. }
  220. // the zookeeper server should automatically close this socket
  221. // once the command has been processed, but better safe than sorry
  222. defer conn.Close()
  223. conn.SetWriteDeadline(time.Now().Add(timeout))
  224. _, err = conn.Write([]byte(command))
  225. if err != nil {
  226. return nil, err
  227. }
  228. conn.SetReadDeadline(time.Now().Add(timeout))
  229. resp, err := ioutil.ReadAll(conn)
  230. if err != nil {
  231. return nil, err
  232. }
  233. return resp, nil
  234. }