structs.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. package zk
import (
	"encoding/binary"
	"errors"
	"reflect"
	"runtime"
	"strings"
	"time"
)
  9. var (
  10. ErrUnhandledFieldType = errors.New("zk: unhandled field type")
  11. ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct")
  12. ErrShortBuffer = errors.New("zk: buffer too small")
  13. )
  14. type ACL struct {
  15. Perms int32
  16. Scheme string
  17. ID string
  18. }
  19. type Stat struct {
  20. Czxid int64 // The zxid of the change that caused this znode to be created.
  21. Mzxid int64 // The zxid of the change that last modified this znode.
  22. Ctime int64 // The time in milliseconds from epoch when this znode was created.
  23. Mtime int64 // The time in milliseconds from epoch when this znode was last modified.
  24. Version int32 // The number of changes to the data of this znode.
  25. Cversion int32 // The number of changes to the children of this znode.
  26. Aversion int32 // The number of changes to the ACL of this znode.
  27. EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
  28. DataLength int32 // The length of the data field of this znode.
  29. NumChildren int32 // The number of children of this znode.
  30. Pzxid int64 // last modified children
  31. }
  32. // ServerClient is the information for a single Zookeeper client and its session.
  33. // This is used to parse/extract the output fo the `cons` command.
  34. type ServerClient struct {
  35. Queued int64
  36. Received int64
  37. Sent int64
  38. SessionID int64
  39. Lcxid int64
  40. Lzxid int64
  41. Timeout int32
  42. LastLatency int32
  43. MinLatency int32
  44. AvgLatency int32
  45. MaxLatency int32
  46. Established time.Time
  47. LastResponse time.Time
  48. Addr string
  49. LastOperation string // maybe?
  50. Error error
  51. }
  52. // ServerClients is a struct for the FLWCons() function. It's used to provide
  53. // the list of Clients.
  54. //
  55. // This is needed because FLWCons() takes multiple servers.
  56. type ServerClients struct {
  57. Clients []*ServerClient
  58. Error error
  59. }
  60. // ServerStats is the information pulled from the Zookeeper `stat` command.
  61. type ServerStats struct {
  62. Sent int64
  63. Received int64
  64. NodeCount int64
  65. MinLatency int64
  66. AvgLatency int64
  67. MaxLatency int64
  68. Connections int64
  69. Outstanding int64
  70. Epoch int32
  71. Counter int32
  72. BuildTime time.Time
  73. Mode Mode
  74. Version string
  75. Error error
  76. }
  77. type requestHeader struct {
  78. Xid int32
  79. Opcode int32
  80. }
  81. type responseHeader struct {
  82. Xid int32
  83. Zxid int64
  84. Err ErrCode
  85. }
  86. type multiHeader struct {
  87. Type int32
  88. Done bool
  89. Err ErrCode
  90. }
  91. type auth struct {
  92. Type int32
  93. Scheme string
  94. Auth []byte
  95. }
  96. // Generic request structs
  97. type pathRequest struct {
  98. Path string
  99. }
  100. type PathVersionRequest struct {
  101. Path string
  102. Version int32
  103. }
  104. type pathWatchRequest struct {
  105. Path string
  106. Watch bool
  107. }
  108. type pathResponse struct {
  109. Path string
  110. }
  111. type statResponse struct {
  112. Stat Stat
  113. }
  114. //
  115. type CheckVersionRequest PathVersionRequest
  116. type closeRequest struct{}
  117. type closeResponse struct{}
  118. type connectRequest struct {
  119. ProtocolVersion int32
  120. LastZxidSeen int64
  121. TimeOut int32
  122. SessionID int64
  123. Passwd []byte
  124. }
  125. type connectResponse struct {
  126. ProtocolVersion int32
  127. TimeOut int32
  128. SessionID int64
  129. Passwd []byte
  130. }
  131. type CreateRequest struct {
  132. Path string
  133. Data []byte
  134. Acl []ACL
  135. Flags int32
  136. }
  137. type createResponse pathResponse
  138. type DeleteRequest PathVersionRequest
  139. type deleteResponse struct{}
  140. type errorResponse struct {
  141. Err int32
  142. }
  143. type existsRequest pathWatchRequest
  144. type existsResponse statResponse
  145. type getAclRequest pathRequest
  146. type getAclResponse struct {
  147. Acl []ACL
  148. Stat Stat
  149. }
  150. type getChildrenRequest pathRequest
  151. type getChildrenResponse struct {
  152. Children []string
  153. }
  154. type getChildren2Request pathWatchRequest
  155. type getChildren2Response struct {
  156. Children []string
  157. Stat Stat
  158. }
  159. type getDataRequest pathWatchRequest
  160. type getDataResponse struct {
  161. Data []byte
  162. Stat Stat
  163. }
  164. type getMaxChildrenRequest pathRequest
  165. type getMaxChildrenResponse struct {
  166. Max int32
  167. }
  168. type getSaslRequest struct {
  169. Token []byte
  170. }
  171. type pingRequest struct{}
  172. type pingResponse struct{}
  173. type setAclRequest struct {
  174. Path string
  175. Acl []ACL
  176. Version int32
  177. }
  178. type setAclResponse statResponse
  179. type SetDataRequest struct {
  180. Path string
  181. Data []byte
  182. Version int32
  183. }
  184. type setDataResponse statResponse
  185. type setMaxChildren struct {
  186. Path string
  187. Max int32
  188. }
  189. type setSaslRequest struct {
  190. Token string
  191. }
  192. type setSaslResponse struct {
  193. Token string
  194. }
  195. type setWatchesRequest struct {
  196. RelativeZxid int64
  197. DataWatches []string
  198. ExistWatches []string
  199. ChildWatches []string
  200. }
  201. type setWatchesResponse struct{}
  202. type syncRequest pathRequest
  203. type syncResponse pathResponse
  204. type setAuthRequest auth
  205. type setAuthResponse struct{}
  206. type multiRequestOp struct {
  207. Header multiHeader
  208. Op interface{}
  209. }
  210. type multiRequest struct {
  211. Ops []multiRequestOp
  212. DoneHeader multiHeader
  213. }
  214. type multiResponseOp struct {
  215. Header multiHeader
  216. String string
  217. Stat *Stat
  218. }
  219. type multiResponse struct {
  220. Ops []multiResponseOp
  221. DoneHeader multiHeader
  222. }
  223. func (r *multiRequest) Encode(buf []byte) (int, error) {
  224. total := 0
  225. for _, op := range r.Ops {
  226. op.Header.Done = false
  227. n, err := encodePacketValue(buf[total:], reflect.ValueOf(op))
  228. if err != nil {
  229. return total, err
  230. }
  231. total += n
  232. }
  233. r.DoneHeader.Done = true
  234. n, err := encodePacketValue(buf[total:], reflect.ValueOf(r.DoneHeader))
  235. if err != nil {
  236. return total, err
  237. }
  238. total += n
  239. return total, nil
  240. }
  241. func (r *multiRequest) Decode(buf []byte) (int, error) {
  242. r.Ops = make([]multiRequestOp, 0)
  243. r.DoneHeader = multiHeader{-1, true, -1}
  244. total := 0
  245. for {
  246. header := &multiHeader{}
  247. n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
  248. if err != nil {
  249. return total, err
  250. }
  251. total += n
  252. if header.Done {
  253. r.DoneHeader = *header
  254. break
  255. }
  256. req := requestStructForOp(header.Type)
  257. if req == nil {
  258. return total, ErrAPIError
  259. }
  260. n, err = decodePacketValue(buf[total:], reflect.ValueOf(req))
  261. if err != nil {
  262. return total, err
  263. }
  264. total += n
  265. r.Ops = append(r.Ops, multiRequestOp{*header, req})
  266. }
  267. return total, nil
  268. }
  269. func (r *multiResponse) Decode(buf []byte) (int, error) {
  270. r.Ops = make([]multiResponseOp, 0)
  271. r.DoneHeader = multiHeader{-1, true, -1}
  272. total := 0
  273. for {
  274. header := &multiHeader{}
  275. n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
  276. if err != nil {
  277. return total, err
  278. }
  279. total += n
  280. if header.Done {
  281. r.DoneHeader = *header
  282. break
  283. }
  284. res := multiResponseOp{Header: *header}
  285. var w reflect.Value
  286. switch header.Type {
  287. default:
  288. return total, ErrAPIError
  289. case opCreate:
  290. w = reflect.ValueOf(&res.String)
  291. case opSetData:
  292. res.Stat = new(Stat)
  293. w = reflect.ValueOf(res.Stat)
  294. case opCheck, opDelete:
  295. }
  296. if w.IsValid() {
  297. n, err := decodePacketValue(buf[total:], w)
  298. if err != nil {
  299. return total, err
  300. }
  301. total += n
  302. }
  303. r.Ops = append(r.Ops, res)
  304. }
  305. return total, nil
  306. }
  307. type watcherEvent struct {
  308. Type EventType
  309. State State
  310. Path string
  311. }
  312. type decoder interface {
  313. Decode(buf []byte) (int, error)
  314. }
  315. type encoder interface {
  316. Encode(buf []byte) (int, error)
  317. }
  318. func decodePacket(buf []byte, st interface{}) (n int, err error) {
  319. defer func() {
  320. if r := recover(); r != nil {
  321. if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
  322. err = ErrShortBuffer
  323. } else {
  324. panic(r)
  325. }
  326. }
  327. }()
  328. v := reflect.ValueOf(st)
  329. if v.Kind() != reflect.Ptr || v.IsNil() {
  330. return 0, ErrPtrExpected
  331. }
  332. return decodePacketValue(buf, v)
  333. }
  334. func decodePacketValue(buf []byte, v reflect.Value) (int, error) {
  335. rv := v
  336. kind := v.Kind()
  337. if kind == reflect.Ptr {
  338. if v.IsNil() {
  339. v.Set(reflect.New(v.Type().Elem()))
  340. }
  341. v = v.Elem()
  342. kind = v.Kind()
  343. }
  344. n := 0
  345. switch kind {
  346. default:
  347. return n, ErrUnhandledFieldType
  348. case reflect.Struct:
  349. if de, ok := rv.Interface().(decoder); ok {
  350. return de.Decode(buf)
  351. } else if de, ok := v.Interface().(decoder); ok {
  352. return de.Decode(buf)
  353. } else {
  354. for i := 0; i < v.NumField(); i++ {
  355. field := v.Field(i)
  356. n2, err := decodePacketValue(buf[n:], field)
  357. n += n2
  358. if err != nil {
  359. return n, err
  360. }
  361. }
  362. }
  363. case reflect.Bool:
  364. v.SetBool(buf[n] != 0)
  365. n++
  366. case reflect.Int32:
  367. v.SetInt(int64(binary.BigEndian.Uint32(buf[n : n+4])))
  368. n += 4
  369. case reflect.Int64:
  370. v.SetInt(int64(binary.BigEndian.Uint64(buf[n : n+8])))
  371. n += 8
  372. case reflect.String:
  373. ln := int(binary.BigEndian.Uint32(buf[n : n+4]))
  374. v.SetString(string(buf[n+4 : n+4+ln]))
  375. n += 4 + ln
  376. case reflect.Slice:
  377. switch v.Type().Elem().Kind() {
  378. default:
  379. count := int(binary.BigEndian.Uint32(buf[n : n+4]))
  380. n += 4
  381. values := reflect.MakeSlice(v.Type(), count, count)
  382. v.Set(values)
  383. for i := 0; i < count; i++ {
  384. n2, err := decodePacketValue(buf[n:], values.Index(i))
  385. n += n2
  386. if err != nil {
  387. return n, err
  388. }
  389. }
  390. case reflect.Uint8:
  391. ln := int(int32(binary.BigEndian.Uint32(buf[n : n+4])))
  392. if ln < 0 {
  393. n += 4
  394. v.SetBytes(nil)
  395. } else {
  396. bytes := make([]byte, ln)
  397. copy(bytes, buf[n+4:n+4+ln])
  398. v.SetBytes(bytes)
  399. n += 4 + ln
  400. }
  401. }
  402. }
  403. return n, nil
  404. }
  405. func encodePacket(buf []byte, st interface{}) (n int, err error) {
  406. defer func() {
  407. if r := recover(); r != nil {
  408. if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
  409. err = ErrShortBuffer
  410. } else {
  411. panic(r)
  412. }
  413. }
  414. }()
  415. v := reflect.ValueOf(st)
  416. if v.Kind() != reflect.Ptr || v.IsNil() {
  417. return 0, ErrPtrExpected
  418. }
  419. return encodePacketValue(buf, v)
  420. }
  421. func encodePacketValue(buf []byte, v reflect.Value) (int, error) {
  422. rv := v
  423. for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface {
  424. v = v.Elem()
  425. }
  426. n := 0
  427. switch v.Kind() {
  428. default:
  429. return n, ErrUnhandledFieldType
  430. case reflect.Struct:
  431. if en, ok := rv.Interface().(encoder); ok {
  432. return en.Encode(buf)
  433. } else if en, ok := v.Interface().(encoder); ok {
  434. return en.Encode(buf)
  435. } else {
  436. for i := 0; i < v.NumField(); i++ {
  437. field := v.Field(i)
  438. n2, err := encodePacketValue(buf[n:], field)
  439. n += n2
  440. if err != nil {
  441. return n, err
  442. }
  443. }
  444. }
  445. case reflect.Bool:
  446. if v.Bool() {
  447. buf[n] = 1
  448. } else {
  449. buf[n] = 0
  450. }
  451. n++
  452. case reflect.Int32:
  453. binary.BigEndian.PutUint32(buf[n:n+4], uint32(v.Int()))
  454. n += 4
  455. case reflect.Int64:
  456. binary.BigEndian.PutUint64(buf[n:n+8], uint64(v.Int()))
  457. n += 8
  458. case reflect.String:
  459. str := v.String()
  460. binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(str)))
  461. copy(buf[n+4:n+4+len(str)], []byte(str))
  462. n += 4 + len(str)
  463. case reflect.Slice:
  464. switch v.Type().Elem().Kind() {
  465. default:
  466. count := v.Len()
  467. startN := n
  468. n += 4
  469. for i := 0; i < count; i++ {
  470. n2, err := encodePacketValue(buf[n:], v.Index(i))
  471. n += n2
  472. if err != nil {
  473. return n, err
  474. }
  475. }
  476. binary.BigEndian.PutUint32(buf[startN:startN+4], uint32(count))
  477. case reflect.Uint8:
  478. if v.IsNil() {
  479. binary.BigEndian.PutUint32(buf[n:n+4], uint32(0xffffffff))
  480. n += 4
  481. } else {
  482. bytes := v.Bytes()
  483. binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(bytes)))
  484. copy(buf[n+4:n+4+len(bytes)], bytes)
  485. n += 4 + len(bytes)
  486. }
  487. }
  488. }
  489. return n, nil
  490. }
  491. func requestStructForOp(op int32) interface{} {
  492. switch op {
  493. case opClose:
  494. return &closeRequest{}
  495. case opCreate:
  496. return &CreateRequest{}
  497. case opDelete:
  498. return &DeleteRequest{}
  499. case opExists:
  500. return &existsRequest{}
  501. case opGetAcl:
  502. return &getAclRequest{}
  503. case opGetChildren:
  504. return &getChildrenRequest{}
  505. case opGetChildren2:
  506. return &getChildren2Request{}
  507. case opGetData:
  508. return &getDataRequest{}
  509. case opPing:
  510. return &pingRequest{}
  511. case opSetAcl:
  512. return &setAclRequest{}
  513. case opSetData:
  514. return &SetDataRequest{}
  515. case opSetWatches:
  516. return &setWatchesRequest{}
  517. case opSync:
  518. return &syncRequest{}
  519. case opSetAuth:
  520. return &setAuthRequest{}
  521. case opCheck:
  522. return &CheckVersionRequest{}
  523. case opMulti:
  524. return &multiRequest{}
  525. }
  526. return nil
  527. }
  528. func responseStructForOp(op int32) interface{} {
  529. switch op {
  530. case opClose:
  531. return &closeResponse{}
  532. case opCreate:
  533. return &createResponse{}
  534. case opDelete:
  535. return &deleteResponse{}
  536. case opExists:
  537. return &existsResponse{}
  538. case opGetAcl:
  539. return &getAclResponse{}
  540. case opGetChildren:
  541. return &getChildrenResponse{}
  542. case opGetChildren2:
  543. return &getChildren2Response{}
  544. case opGetData:
  545. return &getDataResponse{}
  546. case opPing:
  547. return &pingResponse{}
  548. case opSetAcl:
  549. return &setAclResponse{}
  550. case opSetData:
  551. return &setDataResponse{}
  552. case opSetWatches:
  553. return &setWatchesResponse{}
  554. case opSync:
  555. return &syncResponse{}
  556. case opWatcherEvent:
  557. return &watcherEvent{}
  558. case opSetAuth:
  559. return &setAuthResponse{}
  560. // case opCheck:
  561. // return &checkVersionResponse{}
  562. case opMulti:
  563. return &multiResponse{}
  564. }
  565. return nil
  566. }