conn.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844
  1. package zk
  2. /*
  3. TODO:
  4. * make sure a ping response comes back in a reasonable time
  5. Possible watcher events:
  6. * Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err}
  7. */
  8. import (
  9. "crypto/rand"
  10. "encoding/binary"
  11. "errors"
  12. "fmt"
  13. "io"
  14. "log"
  15. "net"
  16. "strconv"
  17. "strings"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. )
  22. var ErrNoServer = errors.New("zk: could not connect to a server")
  23. const (
  24. bufferSize = 1536 * 1024
  25. eventChanSize = 6
  26. sendChanSize = 16
  27. protectedPrefix = "_c_"
  28. )
  29. type watchType int
  30. const (
  31. watchTypeData = iota
  32. watchTypeExist = iota
  33. watchTypeChild = iota
  34. )
  35. type watchPathType struct {
  36. path string
  37. wType watchType
  38. }
  39. type Dialer func(network, address string, timeout time.Duration) (net.Conn, error)
  40. type Conn struct {
  41. lastZxid int64
  42. sessionID int64
  43. state State // must be 32-bit aligned
  44. xid uint32
  45. timeout int32 // session timeout in milliseconds
  46. passwd []byte
  47. dialer Dialer
  48. servers []string
  49. serverIndex int // remember last server that was tried during connect to round-robin attempts to servers
  50. lastServerIndex int // index of the last server that was successfully connected to and authenticated with
  51. conn net.Conn
  52. eventChan chan Event
  53. shouldQuit chan struct{}
  54. pingInterval time.Duration
  55. recvTimeout time.Duration
  56. connectTimeout time.Duration
  57. sendChan chan *request
  58. requests map[int32]*request // Xid -> pending request
  59. requestsLock sync.Mutex
  60. watchers map[watchPathType][]chan Event
  61. watchersLock sync.Mutex
  62. // Debug (used by unit tests)
  63. reconnectDelay time.Duration
  64. }
  65. type request struct {
  66. xid int32
  67. opcode int32
  68. pkt interface{}
  69. recvStruct interface{}
  70. recvChan chan response
  71. // Because sending and receiving happen in separate go routines, there's
  72. // a possible race condition when creating watches from outside the read
  73. // loop. We must ensure that a watcher gets added to the list synchronously
  74. // with the response from the server on any request that creates a watch.
  75. // In order to not hard code the watch logic for each opcode in the recv
  76. // loop the caller can use recvFunc to insert some synchronously code
  77. // after a response.
  78. recvFunc func(*request, *responseHeader, error)
  79. }
  80. type response struct {
  81. zxid int64
  82. err error
  83. }
  84. type Event struct {
  85. Type EventType
  86. State State
  87. Path string // For non-session events, the path of the watched node.
  88. Err error
  89. Server string // For connection events
  90. }
  91. // Connect establishes a new connection to a pool of zookeeper servers
  92. // using the default net.Dialer. See ConnectWithDialer for further
  93. // information about session timeout.
  94. func Connect(servers []string, sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
  95. return ConnectWithDialer(servers, sessionTimeout, nil)
  96. }
  97. // ConnectWithDialer establishes a new connection to a pool of zookeeper
  98. // servers. The provided session timeout sets the amount of time for which
  99. // a session is considered valid after losing connection to a server. Within
  100. // the session timeout it's possible to reestablish a connection to a different
  101. // server and keep the same session. This is means any ephemeral nodes and
  102. // watches are maintained.
  103. func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error) {
  104. if len(servers) == 0 {
  105. return nil, nil, errors.New("zk: server list must not be empty")
  106. }
  107. recvTimeout := sessionTimeout * 2 / 3
  108. srvs := make([]string, len(servers))
  109. for i, addr := range servers {
  110. if strings.Contains(addr, ":") {
  111. srvs[i] = addr
  112. } else {
  113. srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
  114. }
  115. }
  116. // Randomize the order of the servers to avoid creating hotspots
  117. stringShuffle(srvs)
  118. ec := make(chan Event, eventChanSize)
  119. if dialer == nil {
  120. dialer = net.DialTimeout
  121. }
  122. conn := Conn{
  123. dialer: dialer,
  124. servers: srvs,
  125. serverIndex: 0,
  126. lastServerIndex: -1,
  127. conn: nil,
  128. state: StateDisconnected,
  129. eventChan: ec,
  130. shouldQuit: make(chan struct{}),
  131. recvTimeout: recvTimeout,
  132. pingInterval: recvTimeout / 2,
  133. connectTimeout: 1 * time.Second,
  134. sendChan: make(chan *request, sendChanSize),
  135. requests: make(map[int32]*request),
  136. watchers: make(map[watchPathType][]chan Event),
  137. passwd: emptyPassword,
  138. timeout: int32(sessionTimeout.Nanoseconds() / 1e6),
  139. // Debug
  140. reconnectDelay: 0,
  141. }
  142. go func() {
  143. conn.loop()
  144. conn.flushRequests(ErrClosing)
  145. conn.invalidateWatches(ErrClosing)
  146. close(conn.eventChan)
  147. }()
  148. return &conn, ec, nil
  149. }
  150. func (c *Conn) Close() {
  151. close(c.shouldQuit)
  152. select {
  153. case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
  154. case <-time.After(time.Second):
  155. }
  156. }
  157. func (c *Conn) State() State {
  158. return State(atomic.LoadInt32((*int32)(&c.state)))
  159. }
  160. func (c *Conn) setState(state State) {
  161. atomic.StoreInt32((*int32)(&c.state), int32(state))
  162. select {
  163. case c.eventChan <- Event{Type: EventSession, State: state, Server: c.servers[c.serverIndex]}:
  164. default:
  165. // panic("zk: event channel full - it must be monitored and never allowed to be full")
  166. }
  167. }
  168. func (c *Conn) connect() error {
  169. c.setState(StateConnecting)
  170. for {
  171. c.serverIndex = (c.serverIndex + 1) % len(c.servers)
  172. if c.serverIndex == c.lastServerIndex {
  173. c.flushUnsentRequests(ErrNoServer)
  174. select {
  175. case <-time.After(time.Second):
  176. // pass
  177. case <-c.shouldQuit:
  178. c.setState(StateDisconnected)
  179. c.flushUnsentRequests(ErrClosing)
  180. return ErrClosing
  181. }
  182. } else if c.lastServerIndex < 0 {
  183. // lastServerIndex defaults to -1 to avoid a delay on the initial connect
  184. c.lastServerIndex = 0
  185. }
  186. zkConn, err := c.dialer("tcp", c.servers[c.serverIndex], c.connectTimeout)
  187. if err == nil {
  188. c.conn = zkConn
  189. c.setState(StateConnected)
  190. return nil
  191. }
  192. log.Printf("Failed to connect to %s: %+v", c.servers[c.serverIndex], err)
  193. }
  194. }
  195. func (c *Conn) loop() {
  196. for {
  197. if err := c.connect(); err != nil {
  198. // c.Close() was called
  199. return
  200. }
  201. err := c.authenticate()
  202. switch {
  203. case err == ErrSessionExpired:
  204. c.invalidateWatches(err)
  205. case err != nil && c.conn != nil:
  206. c.conn.Close()
  207. case err == nil:
  208. c.lastServerIndex = c.serverIndex
  209. closeChan := make(chan struct{}) // channel to tell send loop stop
  210. var wg sync.WaitGroup
  211. wg.Add(1)
  212. go func() {
  213. c.sendLoop(c.conn, closeChan)
  214. c.conn.Close() // causes recv loop to EOF/exit
  215. wg.Done()
  216. }()
  217. wg.Add(1)
  218. go func() {
  219. err = c.recvLoop(c.conn)
  220. if err == nil {
  221. panic("zk: recvLoop should never return nil error")
  222. }
  223. close(closeChan) // tell send loop to exit
  224. wg.Done()
  225. }()
  226. wg.Wait()
  227. }
  228. c.setState(StateDisconnected)
  229. // Yeesh
  230. if err != io.EOF && err != ErrSessionExpired && !strings.Contains(err.Error(), "use of closed network connection") {
  231. log.Println(err)
  232. }
  233. select {
  234. case <-c.shouldQuit:
  235. c.flushRequests(ErrClosing)
  236. return
  237. default:
  238. }
  239. if err != ErrSessionExpired {
  240. err = ErrConnectionClosed
  241. }
  242. c.flushRequests(err)
  243. if c.reconnectDelay > 0 {
  244. select {
  245. case <-c.shouldQuit:
  246. return
  247. case <-time.After(c.reconnectDelay):
  248. }
  249. }
  250. }
  251. }
  252. func (c *Conn) flushUnsentRequests(err error) {
  253. for {
  254. select {
  255. default:
  256. return
  257. case req := <-c.sendChan:
  258. req.recvChan <- response{-1, err}
  259. }
  260. }
  261. }
  262. // Send error to all pending requests and clear request map
  263. func (c *Conn) flushRequests(err error) {
  264. c.requestsLock.Lock()
  265. for _, req := range c.requests {
  266. req.recvChan <- response{-1, err}
  267. }
  268. c.requests = make(map[int32]*request)
  269. c.requestsLock.Unlock()
  270. }
  271. // Send error to all watchers and clear watchers map
  272. func (c *Conn) invalidateWatches(err error) {
  273. c.watchersLock.Lock()
  274. defer c.watchersLock.Unlock()
  275. if len(c.watchers) >= 0 {
  276. for pathType, watchers := range c.watchers {
  277. ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err}
  278. for _, ch := range watchers {
  279. ch <- ev
  280. close(ch)
  281. }
  282. }
  283. c.watchers = make(map[watchPathType][]chan Event)
  284. }
  285. }
  286. func (c *Conn) sendSetWatches() {
  287. c.watchersLock.Lock()
  288. defer c.watchersLock.Unlock()
  289. if len(c.watchers) == 0 {
  290. return
  291. }
  292. req := &setWatchesRequest{
  293. RelativeZxid: c.lastZxid,
  294. DataWatches: make([]string, 0),
  295. ExistWatches: make([]string, 0),
  296. ChildWatches: make([]string, 0),
  297. }
  298. n := 0
  299. for pathType, watchers := range c.watchers {
  300. if len(watchers) == 0 {
  301. continue
  302. }
  303. switch pathType.wType {
  304. case watchTypeData:
  305. req.DataWatches = append(req.DataWatches, pathType.path)
  306. case watchTypeExist:
  307. req.ExistWatches = append(req.ExistWatches, pathType.path)
  308. case watchTypeChild:
  309. req.ChildWatches = append(req.ChildWatches, pathType.path)
  310. }
  311. n++
  312. }
  313. if n == 0 {
  314. return
  315. }
  316. go func() {
  317. res := &setWatchesResponse{}
  318. _, err := c.request(opSetWatches, req, res, nil)
  319. if err != nil {
  320. log.Printf("Failed to set previous watches: %s", err.Error())
  321. }
  322. }()
  323. }
  324. func (c *Conn) authenticate() error {
  325. buf := make([]byte, 256)
  326. // connect request
  327. n, err := encodePacket(buf[4:], &connectRequest{
  328. ProtocolVersion: protocolVersion,
  329. LastZxidSeen: c.lastZxid,
  330. TimeOut: c.timeout,
  331. SessionID: c.sessionID,
  332. Passwd: c.passwd,
  333. })
  334. if err != nil {
  335. return err
  336. }
  337. binary.BigEndian.PutUint32(buf[:4], uint32(n))
  338. c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10))
  339. _, err = c.conn.Write(buf[:n+4])
  340. c.conn.SetWriteDeadline(time.Time{})
  341. if err != nil {
  342. return err
  343. }
  344. c.sendSetWatches()
  345. // connect response
  346. // package length
  347. c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10))
  348. _, err = io.ReadFull(c.conn, buf[:4])
  349. c.conn.SetReadDeadline(time.Time{})
  350. if err != nil {
  351. // Sometimes zookeeper just drops connection on invalid session data,
  352. // we prefer to drop session and start from scratch when that event
  353. // occurs instead of dropping into loop of connect/disconnect attempts
  354. c.sessionID = 0
  355. c.passwd = emptyPassword
  356. c.lastZxid = 0
  357. c.setState(StateExpired)
  358. return ErrSessionExpired
  359. }
  360. blen := int(binary.BigEndian.Uint32(buf[:4]))
  361. if cap(buf) < blen {
  362. buf = make([]byte, blen)
  363. }
  364. _, err = io.ReadFull(c.conn, buf[:blen])
  365. if err != nil {
  366. return err
  367. }
  368. r := connectResponse{}
  369. _, err = decodePacket(buf[:blen], &r)
  370. if err != nil {
  371. return err
  372. }
  373. if r.SessionID == 0 {
  374. c.sessionID = 0
  375. c.passwd = emptyPassword
  376. c.lastZxid = 0
  377. c.setState(StateExpired)
  378. return ErrSessionExpired
  379. }
  380. if c.sessionID != r.SessionID {
  381. atomic.StoreUint32(&c.xid, 0)
  382. }
  383. c.timeout = r.TimeOut
  384. c.sessionID = r.SessionID
  385. c.passwd = r.Passwd
  386. c.setState(StateHasSession)
  387. return nil
  388. }
  389. func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan struct{}) error {
  390. pingTicker := time.NewTicker(c.pingInterval)
  391. defer pingTicker.Stop()
  392. buf := make([]byte, bufferSize)
  393. for {
  394. select {
  395. case req := <-c.sendChan:
  396. header := &requestHeader{req.xid, req.opcode}
  397. n, err := encodePacket(buf[4:], header)
  398. if err != nil {
  399. req.recvChan <- response{-1, err}
  400. continue
  401. }
  402. n2, err := encodePacket(buf[4+n:], req.pkt)
  403. if err != nil {
  404. req.recvChan <- response{-1, err}
  405. continue
  406. }
  407. n += n2
  408. binary.BigEndian.PutUint32(buf[:4], uint32(n))
  409. c.requestsLock.Lock()
  410. select {
  411. case <-closeChan:
  412. req.recvChan <- response{-1, ErrConnectionClosed}
  413. c.requestsLock.Unlock()
  414. return ErrConnectionClosed
  415. default:
  416. }
  417. c.requests[req.xid] = req
  418. c.requestsLock.Unlock()
  419. conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
  420. _, err = conn.Write(buf[:n+4])
  421. conn.SetWriteDeadline(time.Time{})
  422. if err != nil {
  423. req.recvChan <- response{-1, err}
  424. conn.Close()
  425. return err
  426. }
  427. case <-pingTicker.C:
  428. n, err := encodePacket(buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
  429. if err != nil {
  430. panic("zk: opPing should never fail to serialize")
  431. }
  432. binary.BigEndian.PutUint32(buf[:4], uint32(n))
  433. conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
  434. _, err = conn.Write(buf[:n+4])
  435. conn.SetWriteDeadline(time.Time{})
  436. if err != nil {
  437. conn.Close()
  438. return err
  439. }
  440. case <-closeChan:
  441. return nil
  442. }
  443. }
  444. }
  445. func (c *Conn) recvLoop(conn net.Conn) error {
  446. buf := make([]byte, bufferSize)
  447. for {
  448. // package length
  449. conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
  450. _, err := io.ReadFull(conn, buf[:4])
  451. if err != nil {
  452. return err
  453. }
  454. blen := int(binary.BigEndian.Uint32(buf[:4]))
  455. if cap(buf) < blen {
  456. buf = make([]byte, blen)
  457. }
  458. _, err = io.ReadFull(conn, buf[:blen])
  459. conn.SetReadDeadline(time.Time{})
  460. if err != nil {
  461. return err
  462. }
  463. res := responseHeader{}
  464. _, err = decodePacket(buf[:16], &res)
  465. if err != nil {
  466. return err
  467. }
  468. if res.Xid == -1 {
  469. res := &watcherEvent{}
  470. _, err := decodePacket(buf[16:16+blen], res)
  471. if err != nil {
  472. return err
  473. }
  474. ev := Event{
  475. Type: res.Type,
  476. State: res.State,
  477. Path: res.Path,
  478. Err: nil,
  479. }
  480. select {
  481. case c.eventChan <- ev:
  482. default:
  483. }
  484. wTypes := make([]watchType, 0, 2)
  485. switch res.Type {
  486. case EventNodeCreated:
  487. wTypes = append(wTypes, watchTypeExist)
  488. case EventNodeDeleted, EventNodeDataChanged:
  489. wTypes = append(wTypes, watchTypeExist, watchTypeData, watchTypeChild)
  490. case EventNodeChildrenChanged:
  491. wTypes = append(wTypes, watchTypeChild)
  492. }
  493. c.watchersLock.Lock()
  494. for _, t := range wTypes {
  495. wpt := watchPathType{res.Path, t}
  496. if watchers := c.watchers[wpt]; watchers != nil && len(watchers) > 0 {
  497. for _, ch := range watchers {
  498. ch <- ev
  499. close(ch)
  500. }
  501. delete(c.watchers, wpt)
  502. }
  503. }
  504. c.watchersLock.Unlock()
  505. } else if res.Xid == -2 {
  506. // Ping response. Ignore.
  507. } else if res.Xid < 0 {
  508. log.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid)
  509. } else {
  510. if res.Zxid > 0 {
  511. c.lastZxid = res.Zxid
  512. }
  513. c.requestsLock.Lock()
  514. req, ok := c.requests[res.Xid]
  515. if ok {
  516. delete(c.requests, res.Xid)
  517. }
  518. c.requestsLock.Unlock()
  519. if !ok {
  520. log.Printf("Response for unknown request with xid %d", res.Xid)
  521. } else {
  522. if res.Err != 0 {
  523. err = res.Err.toError()
  524. } else {
  525. _, err = decodePacket(buf[16:16+blen], req.recvStruct)
  526. }
  527. if req.recvFunc != nil {
  528. req.recvFunc(req, &res, err)
  529. }
  530. req.recvChan <- response{res.Zxid, err}
  531. if req.opcode == opClose {
  532. return io.EOF
  533. }
  534. }
  535. }
  536. }
  537. }
  538. func (c *Conn) nextXid() int32 {
  539. return int32(atomic.AddUint32(&c.xid, 1) & 0x7fffffff)
  540. }
  541. func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
  542. c.watchersLock.Lock()
  543. defer c.watchersLock.Unlock()
  544. ch := make(chan Event, 1)
  545. wpt := watchPathType{path, watchType}
  546. c.watchers[wpt] = append(c.watchers[wpt], ch)
  547. return ch
  548. }
  549. func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
  550. rq := &request{
  551. xid: c.nextXid(),
  552. opcode: opcode,
  553. pkt: req,
  554. recvStruct: res,
  555. recvChan: make(chan response, 1),
  556. recvFunc: recvFunc,
  557. }
  558. c.sendChan <- rq
  559. return rq.recvChan
  560. }
  561. func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
  562. r := <-c.queueRequest(opcode, req, res, recvFunc)
  563. return r.zxid, r.err
  564. }
  565. func (c *Conn) AddAuth(scheme string, auth []byte) error {
  566. _, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil)
  567. return err
  568. }
  569. func (c *Conn) Children(path string) ([]string, *Stat, error) {
  570. res := &getChildren2Response{}
  571. _, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil)
  572. return res.Children, &res.Stat, err
  573. }
  574. func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
  575. var ech <-chan Event
  576. res := &getChildren2Response{}
  577. _, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
  578. if err == nil {
  579. ech = c.addWatcher(path, watchTypeChild)
  580. }
  581. })
  582. if err != nil {
  583. return nil, nil, nil, err
  584. }
  585. return res.Children, &res.Stat, ech, err
  586. }
  587. func (c *Conn) Get(path string) ([]byte, *Stat, error) {
  588. res := &getDataResponse{}
  589. _, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil)
  590. return res.Data, &res.Stat, err
  591. }
  592. // GetW returns the contents of a znode and sets a watch
  593. func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
  594. var ech <-chan Event
  595. res := &getDataResponse{}
  596. _, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
  597. if err == nil {
  598. ech = c.addWatcher(path, watchTypeData)
  599. }
  600. })
  601. if err != nil {
  602. return nil, nil, nil, err
  603. }
  604. return res.Data, &res.Stat, ech, err
  605. }
  606. func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) {
  607. res := &setDataResponse{}
  608. _, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil)
  609. return &res.Stat, err
  610. }
  611. func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) {
  612. res := &createResponse{}
  613. _, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil)
  614. return res.Path, err
  615. }
  616. // CreateProtectedEphemeralSequential fixes a race condition if the server crashes
  617. // after it creates the node. On reconnect the session may still be valid so the
  618. // ephemeral node still exists. Therefore, on reconnect we need to check if a node
  619. // with a GUID generated on create exists.
  620. func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error) {
  621. var guid [16]byte
  622. _, err := io.ReadFull(rand.Reader, guid[:16])
  623. if err != nil {
  624. return "", err
  625. }
  626. guidStr := fmt.Sprintf("%x", guid)
  627. parts := strings.Split(path, "/")
  628. parts[len(parts)-1] = fmt.Sprintf("%s%s-%s", protectedPrefix, guidStr, parts[len(parts)-1])
  629. rootPath := strings.Join(parts[:len(parts)-1], "/")
  630. protectedPath := strings.Join(parts, "/")
  631. var newPath string
  632. for i := 0; i < 3; i++ {
  633. newPath, err = c.Create(protectedPath, data, FlagEphemeral|FlagSequence, acl)
  634. switch err {
  635. case ErrSessionExpired:
  636. // No need to search for the node since it can't exist. Just try again.
  637. case ErrConnectionClosed:
  638. children, _, err := c.Children(rootPath)
  639. if err != nil {
  640. return "", err
  641. }
  642. for _, p := range children {
  643. parts := strings.Split(p, "/")
  644. if pth := parts[len(parts)-1]; strings.HasPrefix(pth, protectedPrefix) {
  645. if g := pth[len(protectedPrefix) : len(protectedPrefix)+32]; g == guidStr {
  646. return rootPath + "/" + p, nil
  647. }
  648. }
  649. }
  650. case nil:
  651. return newPath, nil
  652. default:
  653. return "", err
  654. }
  655. }
  656. return "", err
  657. }
  658. func (c *Conn) Delete(path string, version int32) error {
  659. _, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil)
  660. return err
  661. }
  662. func (c *Conn) Exists(path string) (bool, *Stat, error) {
  663. res := &existsResponse{}
  664. _, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil)
  665. exists := true
  666. if err == ErrNoNode {
  667. exists = false
  668. err = nil
  669. }
  670. return exists, &res.Stat, err
  671. }
  672. func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
  673. var ech <-chan Event
  674. res := &existsResponse{}
  675. _, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
  676. if err == nil {
  677. ech = c.addWatcher(path, watchTypeData)
  678. } else if err == ErrNoNode {
  679. ech = c.addWatcher(path, watchTypeExist)
  680. }
  681. })
  682. exists := true
  683. if err == ErrNoNode {
  684. exists = false
  685. err = nil
  686. }
  687. if err != nil {
  688. return false, nil, nil, err
  689. }
  690. return exists, &res.Stat, ech, err
  691. }
  692. func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) {
  693. res := &getAclResponse{}
  694. _, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil)
  695. return res.Acl, &res.Stat, err
  696. }
  697. func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) {
  698. res := &setAclResponse{}
  699. _, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil)
  700. return &res.Stat, err
  701. }
  702. func (c *Conn) Sync(path string) (string, error) {
  703. res := &syncResponse{}
  704. _, err := c.request(opSync, &syncRequest{Path: path}, res, nil)
  705. return res.Path, err
  706. }
  707. type MultiResponse struct {
  708. Stat *Stat
  709. String string
  710. }
  711. // Multi executes multiple ZooKeeper operations or none of them. The provided
  712. // ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or
  713. // *CheckVersionRequest.
  714. func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
  715. req := &multiRequest{
  716. Ops: make([]multiRequestOp, 0, len(ops)),
  717. DoneHeader: multiHeader{Type: -1, Done: true, Err: -1},
  718. }
  719. for _, op := range ops {
  720. var opCode int32
  721. switch op.(type) {
  722. case *CreateRequest:
  723. opCode = opCreate
  724. case *SetDataRequest:
  725. opCode = opSetData
  726. case *DeleteRequest:
  727. opCode = opDelete
  728. case *CheckVersionRequest:
  729. opCode = opCheck
  730. default:
  731. return nil, fmt.Errorf("uknown operation type %T", op)
  732. }
  733. req.Ops = append(req.Ops, multiRequestOp{multiHeader{opCode, false, -1}, op})
  734. }
  735. res := &multiResponse{}
  736. _, err := c.request(opMulti, req, res, nil)
  737. mr := make([]MultiResponse, len(res.Ops))
  738. for i, op := range res.Ops {
  739. mr[i] = MultiResponse{Stat: op.Stat, String: op.String}
  740. }
  741. return mr, err
  742. }