libnetwork: remove zookeeper-related code and tests

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Sebastiaan van Stijn 2021-12-16 13:30:34 +01:00
parent e202ac3f38
commit a7d0f3060a
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
19 changed files with 4 additions and 3140 deletions


@@ -114,7 +114,7 @@ type ScopeClientCfg struct {
const (
// LocalScope indicates to store the KV object in local datastore such as boltdb
LocalScope = "local"
-// GlobalScope indicates to store the KV object in global datastore such as consul/etcd/zookeeper
+// GlobalScope indicates to store the KV object in global datastore such as consul/etcd
GlobalScope = "global"
// SwarmScope is not indicating a datastore location. It is defined here
// along with the other two scopes just for consistency.


@@ -27,7 +27,7 @@ Docker version 1.8.0-dev, build f39b9a0, experimental
Multi-host networking uses a pluggable Key-Value store backend to distribute states using `libkv`.
-`libkv` supports multiple pluggable backends such as `consul`, `etcd` & `zookeeper` (more to come).
+`libkv` supports multiple pluggable backends such as `consul`, `etcd` (more to come).
In this example we will use `consul`
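For context, `libkv` exposes every backend through one generic `Store` API, which is why backends can be added and dropped like this. A minimal Go sketch of creating and using a `consul`-backed store (the agent address, key, and value are illustrative):

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/docker/libkv"
	"github.com/docker/libkv/store"
	"github.com/docker/libkv/store/consul"
)

func main() {
	// The backend must be registered before libkv.NewStore can resolve it.
	consul.Register()

	kv, err := libkv.NewStore(store.CONSUL, []string{"127.0.0.1:8500"}, &store.Config{
		ConnectionTimeout: 10 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := kv.Put("network/example", []byte("value"), nil); err != nil {
		log.Fatal(err)
	}
	pair, err := kv.Get("network/example")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s = %s\n", pair.Key, pair.Value)
}
```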


@@ -8,13 +8,11 @@ import (
"github.com/docker/libkv/store/boltdb"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/sirupsen/logrus"
)
func registerKVStores() {
consul.Register()
-zookeeper.Register()
etcd.Register()
boltdb.Register()
}


@@ -149,9 +149,7 @@ function start_dnet() {
# Try discovery URLs with or without path
neigh_ip=""
neighbors=""
if [ "$store" = "zookeeper" ]; then
read discovery provider address < <(parse_discovery_str zk://${bridge_ip}:2182)
elif [ "$store" = "etcd" ]; then
if [ "$store" = "etcd" ]; then
read discovery provider address < <(parse_discovery_str etcd://${bridge_ip}:42000/custom_prefix)
elif [ "$store" = "consul" ]; then
read discovery provider address < <(parse_discovery_str consul://${bridge_ip}:8500/custom_prefix)
@@ -288,21 +286,6 @@ function stop_etcd() {
docker rm -f dn_etcd || true
}
-function start_zookeeper() {
-    stop_zookeeper
-    docker run -d \
-        --name=zookeeper_server \
-        -p 2182:2181 \
-        -h zookeeper \
-        dnephin/docker-zookeeper:3.4.6
-    sleep 2
-}
-function stop_zookeeper() {
-    echo "zookeeper started"
-    docker rm -f zookeeper_server || true
-}
function test_overlay() {
dnet_suffix=$1


@@ -1,8 +0,0 @@
# -*- mode: sh -*-
#!/usr/bin/env bats
load helpers
@test "Test overlay network with zookeeper" {
test_overlay zookeeper
}


@@ -89,25 +89,6 @@ function run_overlay_consul_host_tests() {
unset _OVERLAY_HOST_MODE
}
-function run_overlay_zk_tests() {
-    ## Test overlay network with zookeeper
-    start_dnet 1 zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    cmap[dnet-1-zookeeper]=dnet-1-zookeeper
-    start_dnet 2 zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    cmap[dnet-2-zookeeper]=dnet-2-zookeeper
-    start_dnet 3 zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    cmap[dnet-3-zookeeper]=dnet-3-zookeeper
-    ./integration-tmp/bin/bats ./test/integration/dnet/overlay-zookeeper.bats
-    stop_dnet 1 zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    unset cmap[dnet-1-zookeeper]
-    stop_dnet 2 zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    unset cmap[dnet-2-zookeeper]
-    stop_dnet 3 zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    unset cmap[dnet-3-zookeeper]
-}
function run_overlay_etcd_tests() {
## Test overlay network with etcd
start_dnet 1 etcd 1>> ${INTEGRATION_ROOT}/test.log 2>&1
@@ -155,29 +136,6 @@ function run_multi_consul_tests() {
unset cmap[dnet-3-multi_consul]
}
-function run_multi_zk_tests() {
-    # Test multi node configuration with a global scope test driver backed by zookeeper
-    ## Setup
-    start_dnet 1 multi_zk zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    cmap[dnet-1-multi_zk]=dnet-1-multi_zk
-    start_dnet 2 multi_zk zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    cmap[dnet-2-multi_zk]=dnet-2-multi_zk
-    start_dnet 3 multi_zk zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    cmap[dnet-3-multi_zk]=dnet-3-multi_zk
-    ## Run the test cases
-    ./integration-tmp/bin/bats ./test/integration/dnet/multi.bats
-    ## Teardown
-    stop_dnet 1 multi_zk 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    unset cmap[dnet-1-multi_zk]
-    stop_dnet 2 multi_zk 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    unset cmap[dnet-2-multi_zk]
-    stop_dnet 3 multi_zk 1>> ${INTEGRATION_ROOT}/test.log 2>&1
-    unset cmap[dnet-3-multi_zk]
-}
function run_multi_etcd_tests() {
# Test multi node configuration with a global scope test driver backed by etcd
@@ -220,7 +178,7 @@ fi
# Suite setup
if [ -z "$SUITES" ]; then
suites="dnet multi_consul multi_zk multi_etcd bridge overlay_consul overlay_consul_host overlay_zk overlay_etcd"
suites="dnet multi_consul multi_etcd bridge overlay_consul overlay_consul_host overlay_etcd"
else
suites="$SUITES"
fi
@@ -231,12 +189,6 @@ if [[ ("$suites" =~ .*consul.*) || ("$suites" =~ .*bridge.*) ]]; then
cmap[pr_consul]=pr_consul
fi
if [[ "$suites" =~ .*zk.* ]]; then
echo "Starting zookeeper ..."
start_zookeeper 1>> ${INTEGRATION_ROOT}/test.log 2>&1
cmap[zookeeper_server]=zookeeper_server
fi
if [[ "$suites" =~ .*etcd.* ]]; then
echo "Starting etcd ..."
start_etcd 1>> ${INTEGRATION_ROOT}/test.log 2>&1


@@ -60,7 +60,6 @@ github.com/vishvananda/netlink f049be6f391489d3f374498fe0c8
github.com/moby/ipvs 4566ccea0e08d68e9614c3e7a64a23b850c4bb35 # v1.0.1
github.com/google/btree 479b5e81b0a93ec038d201b0b33d17db599531d3 # v1.0.1
-github.com/samuel/go-zookeeper d0e0d8e11f318e000a8cc434616d69e329edc374
github.com/deckarep/golang-set ef32fa3046d9f249d399f98ebaf9be944430fd1d
github.com/coreos/etcd 973882f697a8db3d59815bf132c6c506434334bd # v3.3.27
github.com/coreos/go-semver 8ab6407b697782a06568d4b7f1db25550ec2e4c6 # v0.2.0


@@ -1,429 +0,0 @@
package zookeeper
import (
"strings"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
zk "github.com/samuel/go-zookeeper/zk"
)
const (
// SOH control character
SOH = "\x01"
defaultTimeout = 10 * time.Second
)
// Zookeeper is the receiver type for
// the Store interface
type Zookeeper struct {
timeout time.Duration
client *zk.Conn
}
type zookeeperLock struct {
client *zk.Conn
lock *zk.Lock
key string
value []byte
}
// Register registers zookeeper to libkv
func Register() {
libkv.AddStore(store.ZK, New)
}
// New creates a new Zookeeper client given a
// list of endpoints and an optional tls config
func New(endpoints []string, options *store.Config) (store.Store, error) {
s := &Zookeeper{}
s.timeout = defaultTimeout
// Set options
if options != nil {
if options.ConnectionTimeout != 0 {
s.setTimeout(options.ConnectionTimeout)
}
}
// Connect to Zookeeper
conn, _, err := zk.Connect(endpoints, s.timeout)
if err != nil {
return nil, err
}
s.client = conn
return s, nil
}
// setTimeout sets the timeout for connecting to Zookeeper
func (s *Zookeeper) setTimeout(time time.Duration) {
s.timeout = time
}
// Get the value at "key", returns the last modified index
// to use in conjunction to Atomic calls
func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
resp, meta, err := s.client.Get(s.normalize(key))
if err != nil {
if err == zk.ErrNoNode {
return nil, store.ErrKeyNotFound
}
return nil, err
}
// FIXME handle very rare cases where Get returns the
// SOH control character instead of the actual value
if string(resp) == SOH {
return s.Get(store.Normalize(key))
}
pair = &store.KVPair{
Key: key,
Value: resp,
LastIndex: uint64(meta.Version),
}
return pair, nil
}
// createFullPath creates the entire path for a directory
// that does not exist
func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error {
for i := 1; i <= len(path); i++ {
newpath := "/" + strings.Join(path[:i], "/")
if i == len(path) && ephemeral {
_, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
return err
}
_, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Skip if node already exists
if err != zk.ErrNodeExists {
return err
}
}
}
return nil
}
// Put a value at "key"
func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error {
fkey := s.normalize(key)
exists, err := s.Exists(key)
if err != nil {
return err
}
if !exists {
if opts != nil && opts.TTL > 0 {
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true)
} else {
s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false)
}
}
_, err = s.client.Set(fkey, value, -1)
return err
}
// Delete a value at "key"
func (s *Zookeeper) Delete(key string) error {
err := s.client.Delete(s.normalize(key), -1)
if err == zk.ErrNoNode {
return store.ErrKeyNotFound
}
return err
}
// Exists checks if the key exists inside the store
func (s *Zookeeper) Exists(key string) (bool, error) {
exists, _, err := s.client.Exists(s.normalize(key))
if err != nil {
return false, err
}
return exists, nil
}
// Watch for changes on a "key"
// It returns a channel that will receive changes or pass
// on errors. Upon creation, the current value will first
// be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
// Get the key first
pair, err := s.Get(key)
if err != nil {
return nil, err
}
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan *store.KVPair)
go func() {
defer close(watchCh)
// Get returns the current value to the channel prior
// to listening to any event that may occur on that key
watchCh <- pair
for {
_, _, eventCh, err := s.client.GetW(s.normalize(key))
if err != nil {
return
}
select {
case e := <-eventCh:
if e.Type == zk.EventNodeDataChanged {
if entry, err := s.Get(key); err == nil {
watchCh <- entry
}
}
case <-stopCh:
// There is no way to stop GetW so just quit
return
}
}
}()
return watchCh, nil
}
// WatchTree watches for changes on a "directory"
// It returns a channel that will receive changes or pass
// on errors. Upon creating a watch, the current children's values
// will be sent to the channel. Providing a non-nil stopCh can
// be used to stop watching.
func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
// List the children first
entries, err := s.List(directory)
if err != nil {
return nil, err
}
// Catch zk notifications and fire changes into the channel.
watchCh := make(chan []*store.KVPair)
go func() {
defer close(watchCh)
// List returns the children values to the channel
// prior to listening to any events that may occur
// on those keys
watchCh <- entries
for {
_, _, eventCh, err := s.client.ChildrenW(s.normalize(directory))
if err != nil {
return
}
select {
case e := <-eventCh:
if e.Type == zk.EventNodeChildrenChanged {
if kv, err := s.List(directory); err == nil {
watchCh <- kv
}
}
case <-stopCh:
// There is no way to stop ChildrenW so just quit
return
}
}
}()
return watchCh, nil
}
// List child nodes of a given directory
func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) {
keys, stat, err := s.client.Children(s.normalize(directory))
if err != nil {
if err == zk.ErrNoNode {
return nil, store.ErrKeyNotFound
}
return nil, err
}
kv := []*store.KVPair{}
// FIXME Costly Get request for each child key..
for _, key := range keys {
pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key))
if err != nil {
// If node is not found: List is out of date, retry
if err == store.ErrKeyNotFound {
return s.List(directory)
}
return nil, err
}
kv = append(kv, &store.KVPair{
Key: key,
Value: []byte(pair.Value),
LastIndex: uint64(stat.Version),
})
}
return kv, nil
}
// DeleteTree deletes a range of keys under a given directory
func (s *Zookeeper) DeleteTree(directory string) error {
pairs, err := s.List(directory)
if err != nil {
return err
}
var reqs []interface{}
for _, pair := range pairs {
reqs = append(reqs, &zk.DeleteRequest{
Path: s.normalize(directory + "/" + pair.Key),
Version: -1,
})
}
_, err = s.client.Multi(reqs...)
return err
}
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) {
var lastIndex uint64
if previous != nil {
meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex))
if err != nil {
// Compare Failed
if err == zk.ErrBadVersion {
return false, nil, store.ErrKeyModified
}
return false, nil, err
}
lastIndex = uint64(meta.Version)
} else {
// Interpret previous == nil as create operation.
_, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Directory does not exist
if err == zk.ErrNoNode {
// Create the directory
parts := store.SplitKey(strings.TrimSuffix(key, "/"))
parts = parts[:len(parts)-1]
if err = s.createFullPath(parts, false); err != nil {
// Failed to create the directory.
return false, nil, err
}
// Create the node
if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
// Node exist error (when previous nil)
if err == zk.ErrNodeExists {
return false, nil, store.ErrKeyExists
}
return false, nil, err
}
} else {
// Node Exists error (when previous nil)
if err == zk.ErrNodeExists {
return false, nil, store.ErrKeyExists
}
// Unhandled error
return false, nil, err
}
}
lastIndex = 0 // Newly created nodes have version 0.
}
pair := &store.KVPair{
Key: key,
Value: value,
LastIndex: lastIndex,
}
return true, pair, nil
}
// AtomicDelete deletes a value at "key" if the key
// has not been modified in the meantime, throws an
// error if this is the case
func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
if previous == nil {
return false, store.ErrPreviousNotSpecified
}
err := s.client.Delete(s.normalize(key), int32(previous.LastIndex))
if err != nil {
// Key not found
if err == zk.ErrNoNode {
return false, store.ErrKeyNotFound
}
// Compare failed
if err == zk.ErrBadVersion {
return false, store.ErrKeyModified
}
// General store error
return false, err
}
return true, nil
}
// NewLock returns a handle to a lock struct which can
// be used to provide mutual exclusion on a key
func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
value := []byte("")
// Apply options
if options != nil {
if options.Value != nil {
value = options.Value
}
}
lock = &zookeeperLock{
client: s.client,
key: s.normalize(key),
value: value,
lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)),
}
return lock, err
}
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
err := l.lock.Lock()
if err == nil {
// We hold the lock, we can set our value
// FIXME: The value is left behind
// (problematic for leader election)
_, err = l.client.Set(l.key, l.value, -1)
}
return make(chan struct{}), err
}
// Unlock the "key". Calling unlock while
// not holding the lock will throw an error
func (l *zookeeperLock) Unlock() error {
return l.lock.Unlock()
}
// Close closes the client connection
func (s *Zookeeper) Close() {
s.client.Close()
}
// Normalize the key for usage in Zookeeper
func (s *Zookeeper) normalize(key string) string {
key = store.Normalize(key)
return strings.TrimSuffix(key, "/")
}
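Before its removal, this driver was consumed through the same generic libkv `Store` API shown above. A hypothetical sketch of how a caller would have put and watched a key through the zookeeper backend (endpoint and key are illustrative):

```go
package main

import (
	"log"
	"time"

	"github.com/docker/libkv"
	"github.com/docker/libkv/store"
	"github.com/docker/libkv/store/zookeeper"
)

func main() {
	zookeeper.Register()

	kv, err := libkv.NewStore(store.ZK, []string{"127.0.0.1:2181"}, &store.Config{
		ConnectionTimeout: 10 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer kv.Close()

	if err := kv.Put("path/to/key", []byte("v1"), nil); err != nil {
		log.Fatal(err)
	}

	// Watch sends the current value first, then one update per change.
	stopCh := make(chan struct{})
	events, err := kv.Watch("path/to/key", stopCh)
	if err != nil {
		log.Fatal(err)
	}
	pair := <-events
	log.Printf("%s -> %s (index %d)", pair.Key, pair.Value, pair.LastIndex)
	close(stopCh)
}
```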


@@ -1,25 +0,0 @@
Copyright (c) 2013, Samuel Stauffer <samuel@descolada.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the author nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


@@ -1,11 +0,0 @@
Native Go Zookeeper Client Library
===================================
[![Build Status](https://travis-ci.org/samuel/go-zookeeper.png)](https://travis-ci.org/samuel/go-zookeeper)
Documentation: http://godoc.org/github.com/samuel/go-zookeeper/zk
License
-------
3-clause BSD. See LICENSE file.


@@ -1,844 +0,0 @@
package zk
/*
TODO:
* make sure a ping response comes back in a reasonable time
Possible watcher events:
* Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err}
*/
import (
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)
var ErrNoServer = errors.New("zk: could not connect to a server")
const (
bufferSize = 1536 * 1024
eventChanSize = 6
sendChanSize = 16
protectedPrefix = "_c_"
)
type watchType int
const (
watchTypeData = iota
watchTypeExist = iota
watchTypeChild = iota
)
type watchPathType struct {
path string
wType watchType
}
type Dialer func(network, address string, timeout time.Duration) (net.Conn, error)
type Conn struct {
lastZxid int64
sessionID int64
state State // must be 32-bit aligned
xid uint32
timeout int32 // session timeout in milliseconds
passwd []byte
dialer Dialer
servers []string
serverIndex int // remember last server that was tried during connect to round-robin attempts to servers
lastServerIndex int // index of the last server that was successfully connected to and authenticated with
conn net.Conn
eventChan chan Event
shouldQuit chan struct{}
pingInterval time.Duration
recvTimeout time.Duration
connectTimeout time.Duration
sendChan chan *request
requests map[int32]*request // Xid -> pending request
requestsLock sync.Mutex
watchers map[watchPathType][]chan Event
watchersLock sync.Mutex
// Debug (used by unit tests)
reconnectDelay time.Duration
}
type request struct {
xid int32
opcode int32
pkt interface{}
recvStruct interface{}
recvChan chan response
// Because sending and receiving happen in separate go routines, there's
// a possible race condition when creating watches from outside the read
// loop. We must ensure that a watcher gets added to the list synchronously
// with the response from the server on any request that creates a watch.
// In order to not hard code the watch logic for each opcode in the recv
// loop the caller can use recvFunc to insert some synchronous code
// after a response.
recvFunc func(*request, *responseHeader, error)
}
type response struct {
zxid int64
err error
}
type Event struct {
Type EventType
State State
Path string // For non-session events, the path of the watched node.
Err error
Server string // For connection events
}
// Connect establishes a new connection to a pool of zookeeper servers
// using the default net.Dialer. See ConnectWithDialer for further
// information about session timeout.
func Connect(servers []string, sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
return ConnectWithDialer(servers, sessionTimeout, nil)
}
// ConnectWithDialer establishes a new connection to a pool of zookeeper
// servers. The provided session timeout sets the amount of time for which
// a session is considered valid after losing connection to a server. Within
// the session timeout it's possible to reestablish a connection to a different
server and keep the same session. This means any ephemeral nodes and
// watches are maintained.
func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error) {
if len(servers) == 0 {
return nil, nil, errors.New("zk: server list must not be empty")
}
recvTimeout := sessionTimeout * 2 / 3
srvs := make([]string, len(servers))
for i, addr := range servers {
if strings.Contains(addr, ":") {
srvs[i] = addr
} else {
srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
}
}
// Randomize the order of the servers to avoid creating hotspots
stringShuffle(srvs)
ec := make(chan Event, eventChanSize)
if dialer == nil {
dialer = net.DialTimeout
}
conn := Conn{
dialer: dialer,
servers: srvs,
serverIndex: 0,
lastServerIndex: -1,
conn: nil,
state: StateDisconnected,
eventChan: ec,
shouldQuit: make(chan struct{}),
recvTimeout: recvTimeout,
pingInterval: recvTimeout / 2,
connectTimeout: 1 * time.Second,
sendChan: make(chan *request, sendChanSize),
requests: make(map[int32]*request),
watchers: make(map[watchPathType][]chan Event),
passwd: emptyPassword,
timeout: int32(sessionTimeout.Nanoseconds() / 1e6),
// Debug
reconnectDelay: 0,
}
go func() {
conn.loop()
conn.flushRequests(ErrClosing)
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()
return &conn, ec, nil
}
func (c *Conn) Close() {
close(c.shouldQuit)
select {
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
case <-time.After(time.Second):
}
}
func (c *Conn) State() State {
return State(atomic.LoadInt32((*int32)(&c.state)))
}
func (c *Conn) setState(state State) {
atomic.StoreInt32((*int32)(&c.state), int32(state))
select {
case c.eventChan <- Event{Type: EventSession, State: state, Server: c.servers[c.serverIndex]}:
default:
// panic("zk: event channel full - it must be monitored and never allowed to be full")
}
}
func (c *Conn) connect() error {
c.setState(StateConnecting)
for {
c.serverIndex = (c.serverIndex + 1) % len(c.servers)
if c.serverIndex == c.lastServerIndex {
c.flushUnsentRequests(ErrNoServer)
select {
case <-time.After(time.Second):
// pass
case <-c.shouldQuit:
c.setState(StateDisconnected)
c.flushUnsentRequests(ErrClosing)
return ErrClosing
}
} else if c.lastServerIndex < 0 {
// lastServerIndex defaults to -1 to avoid a delay on the initial connect
c.lastServerIndex = 0
}
zkConn, err := c.dialer("tcp", c.servers[c.serverIndex], c.connectTimeout)
if err == nil {
c.conn = zkConn
c.setState(StateConnected)
return nil
}
log.Printf("Failed to connect to %s: %+v", c.servers[c.serverIndex], err)
}
}
func (c *Conn) loop() {
for {
if err := c.connect(); err != nil {
// c.Close() was called
return
}
err := c.authenticate()
switch {
case err == ErrSessionExpired:
c.invalidateWatches(err)
case err != nil && c.conn != nil:
c.conn.Close()
case err == nil:
c.lastServerIndex = c.serverIndex
closeChan := make(chan struct{}) // channel to tell send loop stop
var wg sync.WaitGroup
wg.Add(1)
go func() {
c.sendLoop(c.conn, closeChan)
c.conn.Close() // causes recv loop to EOF/exit
wg.Done()
}()
wg.Add(1)
go func() {
err = c.recvLoop(c.conn)
if err == nil {
panic("zk: recvLoop should never return nil error")
}
close(closeChan) // tell send loop to exit
wg.Done()
}()
wg.Wait()
}
c.setState(StateDisconnected)
// Yeesh
if err != io.EOF && err != ErrSessionExpired && !strings.Contains(err.Error(), "use of closed network connection") {
log.Println(err)
}
select {
case <-c.shouldQuit:
c.flushRequests(ErrClosing)
return
default:
}
if err != ErrSessionExpired {
err = ErrConnectionClosed
}
c.flushRequests(err)
if c.reconnectDelay > 0 {
select {
case <-c.shouldQuit:
return
case <-time.After(c.reconnectDelay):
}
}
}
}
func (c *Conn) flushUnsentRequests(err error) {
for {
select {
default:
return
case req := <-c.sendChan:
req.recvChan <- response{-1, err}
}
}
}
// Send error to all pending requests and clear request map
func (c *Conn) flushRequests(err error) {
c.requestsLock.Lock()
for _, req := range c.requests {
req.recvChan <- response{-1, err}
}
c.requests = make(map[int32]*request)
c.requestsLock.Unlock()
}
// Send error to all watchers and clear watchers map
func (c *Conn) invalidateWatches(err error) {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()
if len(c.watchers) >= 0 {
for pathType, watchers := range c.watchers {
ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err}
for _, ch := range watchers {
ch <- ev
close(ch)
}
}
c.watchers = make(map[watchPathType][]chan Event)
}
}
func (c *Conn) sendSetWatches() {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()
if len(c.watchers) == 0 {
return
}
req := &setWatchesRequest{
RelativeZxid: c.lastZxid,
DataWatches: make([]string, 0),
ExistWatches: make([]string, 0),
ChildWatches: make([]string, 0),
}
n := 0
for pathType, watchers := range c.watchers {
if len(watchers) == 0 {
continue
}
switch pathType.wType {
case watchTypeData:
req.DataWatches = append(req.DataWatches, pathType.path)
case watchTypeExist:
req.ExistWatches = append(req.ExistWatches, pathType.path)
case watchTypeChild:
req.ChildWatches = append(req.ChildWatches, pathType.path)
}
n++
}
if n == 0 {
return
}
go func() {
res := &setWatchesResponse{}
_, err := c.request(opSetWatches, req, res, nil)
if err != nil {
log.Printf("Failed to set previous watches: %s", err.Error())
}
}()
}
func (c *Conn) authenticate() error {
buf := make([]byte, 256)
// connect request
n, err := encodePacket(buf[4:], &connectRequest{
ProtocolVersion: protocolVersion,
LastZxidSeen: c.lastZxid,
TimeOut: c.timeout,
SessionID: c.sessionID,
Passwd: c.passwd,
})
if err != nil {
return err
}
binary.BigEndian.PutUint32(buf[:4], uint32(n))
c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10))
_, err = c.conn.Write(buf[:n+4])
c.conn.SetWriteDeadline(time.Time{})
if err != nil {
return err
}
c.sendSetWatches()
// connect response
// package length
c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10))
_, err = io.ReadFull(c.conn, buf[:4])
c.conn.SetReadDeadline(time.Time{})
if err != nil {
// Sometimes zookeeper just drops connection on invalid session data,
// we prefer to drop session and start from scratch when that event
// occurs instead of dropping into loop of connect/disconnect attempts
c.sessionID = 0
c.passwd = emptyPassword
c.lastZxid = 0
c.setState(StateExpired)
return ErrSessionExpired
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
if cap(buf) < blen {
buf = make([]byte, blen)
}
_, err = io.ReadFull(c.conn, buf[:blen])
if err != nil {
return err
}
r := connectResponse{}
_, err = decodePacket(buf[:blen], &r)
if err != nil {
return err
}
if r.SessionID == 0 {
c.sessionID = 0
c.passwd = emptyPassword
c.lastZxid = 0
c.setState(StateExpired)
return ErrSessionExpired
}
if c.sessionID != r.SessionID {
atomic.StoreUint32(&c.xid, 0)
}
c.timeout = r.TimeOut
c.sessionID = r.SessionID
c.passwd = r.Passwd
c.setState(StateHasSession)
return nil
}
func (c *Conn) sendLoop(conn net.Conn, closeChan <-chan struct{}) error {
pingTicker := time.NewTicker(c.pingInterval)
defer pingTicker.Stop()
buf := make([]byte, bufferSize)
for {
select {
case req := <-c.sendChan:
header := &requestHeader{req.xid, req.opcode}
n, err := encodePacket(buf[4:], header)
if err != nil {
req.recvChan <- response{-1, err}
continue
}
n2, err := encodePacket(buf[4+n:], req.pkt)
if err != nil {
req.recvChan <- response{-1, err}
continue
}
n += n2
binary.BigEndian.PutUint32(buf[:4], uint32(n))
c.requestsLock.Lock()
select {
case <-closeChan:
req.recvChan <- response{-1, ErrConnectionClosed}
c.requestsLock.Unlock()
return ErrConnectionClosed
default:
}
c.requests[req.xid] = req
c.requestsLock.Unlock()
conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = conn.Write(buf[:n+4])
conn.SetWriteDeadline(time.Time{})
if err != nil {
req.recvChan <- response{-1, err}
conn.Close()
return err
}
case <-pingTicker.C:
n, err := encodePacket(buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
if err != nil {
panic("zk: opPing should never fail to serialize")
}
binary.BigEndian.PutUint32(buf[:4], uint32(n))
conn.SetWriteDeadline(time.Now().Add(c.recvTimeout))
_, err = conn.Write(buf[:n+4])
conn.SetWriteDeadline(time.Time{})
if err != nil {
conn.Close()
return err
}
case <-closeChan:
return nil
}
}
}
func (c *Conn) recvLoop(conn net.Conn) error {
buf := make([]byte, bufferSize)
for {
// package length
conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
_, err := io.ReadFull(conn, buf[:4])
if err != nil {
return err
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
if cap(buf) < blen {
buf = make([]byte, blen)
}
_, err = io.ReadFull(conn, buf[:blen])
conn.SetReadDeadline(time.Time{})
if err != nil {
return err
}
res := responseHeader{}
_, err = decodePacket(buf[:16], &res)
if err != nil {
return err
}
if res.Xid == -1 {
res := &watcherEvent{}
_, err := decodePacket(buf[16:16+blen], res)
if err != nil {
return err
}
ev := Event{
Type: res.Type,
State: res.State,
Path: res.Path,
Err: nil,
}
select {
case c.eventChan <- ev:
default:
}
wTypes := make([]watchType, 0, 2)
switch res.Type {
case EventNodeCreated:
wTypes = append(wTypes, watchTypeExist)
case EventNodeDeleted, EventNodeDataChanged:
wTypes = append(wTypes, watchTypeExist, watchTypeData, watchTypeChild)
case EventNodeChildrenChanged:
wTypes = append(wTypes, watchTypeChild)
}
c.watchersLock.Lock()
for _, t := range wTypes {
wpt := watchPathType{res.Path, t}
if watchers := c.watchers[wpt]; watchers != nil && len(watchers) > 0 {
for _, ch := range watchers {
ch <- ev
close(ch)
}
delete(c.watchers, wpt)
}
}
c.watchersLock.Unlock()
} else if res.Xid == -2 {
// Ping response. Ignore.
} else if res.Xid < 0 {
log.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid)
} else {
if res.Zxid > 0 {
c.lastZxid = res.Zxid
}
c.requestsLock.Lock()
req, ok := c.requests[res.Xid]
if ok {
delete(c.requests, res.Xid)
}
c.requestsLock.Unlock()
if !ok {
log.Printf("Response for unknown request with xid %d", res.Xid)
} else {
if res.Err != 0 {
err = res.Err.toError()
} else {
_, err = decodePacket(buf[16:16+blen], req.recvStruct)
}
if req.recvFunc != nil {
req.recvFunc(req, &res, err)
}
req.recvChan <- response{res.Zxid, err}
if req.opcode == opClose {
return io.EOF
}
}
}
}
}
func (c *Conn) nextXid() int32 {
return int32(atomic.AddUint32(&c.xid, 1) & 0x7fffffff)
}
func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
c.watchersLock.Lock()
defer c.watchersLock.Unlock()
ch := make(chan Event, 1)
wpt := watchPathType{path, watchType}
c.watchers[wpt] = append(c.watchers[wpt], ch)
return ch
}
func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
rq := &request{
xid: c.nextXid(),
opcode: opcode,
pkt: req,
recvStruct: res,
recvChan: make(chan response, 1),
recvFunc: recvFunc,
}
c.sendChan <- rq
return rq.recvChan
}
func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
r := <-c.queueRequest(opcode, req, res, recvFunc)
return r.zxid, r.err
}
func (c *Conn) AddAuth(scheme string, auth []byte) error {
_, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil)
return err
}
func (c *Conn) Children(path string) ([]string, *Stat, error) {
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil)
return res.Children, &res.Stat, err
}
func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
var ech <-chan Event
res := &getChildren2Response{}
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchTypeChild)
}
})
if err != nil {
return nil, nil, nil, err
}
return res.Children, &res.Stat, ech, err
}
func (c *Conn) Get(path string) ([]byte, *Stat, error) {
res := &getDataResponse{}
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil)
return res.Data, &res.Stat, err
}
// GetW returns the contents of a znode and sets a watch
func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
var ech <-chan Event
res := &getDataResponse{}
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchTypeData)
}
})
if err != nil {
return nil, nil, nil, err
}
return res.Data, &res.Stat, ech, err
}
func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) {
res := &setDataResponse{}
_, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil)
return &res.Stat, err
}
func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) {
res := &createResponse{}
_, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil)
return res.Path, err
}
// CreateProtectedEphemeralSequential fixes a race condition if the server crashes
// after it creates the node. On reconnect the session may still be valid so the
// ephemeral node still exists. Therefore, on reconnect we need to check if a node
// with a GUID generated on create exists.
func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error) {
var guid [16]byte
_, err := io.ReadFull(rand.Reader, guid[:16])
if err != nil {
return "", err
}
guidStr := fmt.Sprintf("%x", guid)
parts := strings.Split(path, "/")
parts[len(parts)-1] = fmt.Sprintf("%s%s-%s", protectedPrefix, guidStr, parts[len(parts)-1])
rootPath := strings.Join(parts[:len(parts)-1], "/")
protectedPath := strings.Join(parts, "/")
var newPath string
for i := 0; i < 3; i++ {
newPath, err = c.Create(protectedPath, data, FlagEphemeral|FlagSequence, acl)
switch err {
case ErrSessionExpired:
// No need to search for the node since it can't exist. Just try again.
case ErrConnectionClosed:
children, _, err := c.Children(rootPath)
if err != nil {
return "", err
}
for _, p := range children {
parts := strings.Split(p, "/")
if pth := parts[len(parts)-1]; strings.HasPrefix(pth, protectedPrefix) {
if g := pth[len(protectedPrefix) : len(protectedPrefix)+32]; g == guidStr {
return rootPath + "/" + p, nil
}
}
}
case nil:
return newPath, nil
default:
return "", err
}
}
return "", err
}
func (c *Conn) Delete(path string, version int32) error {
_, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil)
return err
}
func (c *Conn) Exists(path string) (bool, *Stat, error) {
res := &existsResponse{}
_, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil)
exists := true
if err == ErrNoNode {
exists = false
err = nil
}
return exists, &res.Stat, err
}
func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
var ech <-chan Event
res := &existsResponse{}
_, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
if err == nil {
ech = c.addWatcher(path, watchTypeData)
} else if err == ErrNoNode {
ech = c.addWatcher(path, watchTypeExist)
}
})
exists := true
if err == ErrNoNode {
exists = false
err = nil
}
if err != nil {
return false, nil, nil, err
}
return exists, &res.Stat, ech, err
}
func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) {
res := &getAclResponse{}
_, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil)
return res.Acl, &res.Stat, err
}
func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) {
res := &setAclResponse{}
_, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil)
return &res.Stat, err
}
func (c *Conn) Sync(path string) (string, error) {
res := &syncResponse{}
_, err := c.request(opSync, &syncRequest{Path: path}, res, nil)
return res.Path, err
}
type MultiResponse struct {
Stat *Stat
String string
}
// Multi executes multiple ZooKeeper operations or none of them. The provided
// ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or
// *CheckVersionRequest.
func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
req := &multiRequest{
Ops: make([]multiRequestOp, 0, len(ops)),
DoneHeader: multiHeader{Type: -1, Done: true, Err: -1},
}
for _, op := range ops {
var opCode int32
switch op.(type) {
case *CreateRequest:
opCode = opCreate
case *SetDataRequest:
opCode = opSetData
case *DeleteRequest:
opCode = opDelete
case *CheckVersionRequest:
opCode = opCheck
default:
return nil, fmt.Errorf("uknown operation type %T", op)
}
req.Ops = append(req.Ops, multiRequestOp{multiHeader{opCode, false, -1}, op})
}
res := &multiResponse{}
_, err := c.request(opMulti, req, res, nil)
mr := make([]MultiResponse, len(res.Ops))
for i, op := range res.Ops {
mr[i] = MultiResponse{Stat: op.Stat, String: op.String}
}
return mr, err
}
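Taken together, the removed client exposed a small surface for most callers: connect, drain the session event channel, and issue znode operations. A minimal sketch against the API above (server address and paths are illustrative):

```go
package main

import (
	"log"
	"time"

	zk "github.com/samuel/go-zookeeper/zk"
)

func main() {
	// Connect takes a server list and a session timeout (see Connect above).
	conn, events, err := zk.Connect([]string{"127.0.0.1:2181"}, 10*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Drain session events in the background.
	go func() {
		for ev := range events {
			log.Printf("session event: %v %v", ev.Type, ev.State)
		}
	}()

	// Create a znode, read it back, then watch its children.
	if _, err := conn.Create("/example", []byte("hello"), 0, zk.WorldACL(zk.PermAll)); err != nil && err != zk.ErrNodeExists {
		log.Fatal(err)
	}
	data, stat, err := conn.Get("/example")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("/example = %q (version %d)", data, stat.Version)

	_, _, childCh, err := conn.ChildrenW("/example")
	if err != nil {
		log.Fatal(err)
	}
	// Blocks until something changes under /example.
	ev := <-childCh
	log.Printf("child event: %v on %s", ev.Type, ev.Path)
}
```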


@@ -1,242 +0,0 @@
package zk
import (
"errors"
)
const (
protocolVersion = 0
DefaultPort = 2181
)
const (
opNotify = 0
opCreate = 1
opDelete = 2
opExists = 3
opGetData = 4
opSetData = 5
opGetAcl = 6
opSetAcl = 7
opGetChildren = 8
opSync = 9
opPing = 11
opGetChildren2 = 12
opCheck = 13
opMulti = 14
opClose = -11
opSetAuth = 100
opSetWatches = 101
// Not in protocol, used internally
opWatcherEvent = -2
)
const (
EventNodeCreated = EventType(1)
EventNodeDeleted = EventType(2)
EventNodeDataChanged = EventType(3)
EventNodeChildrenChanged = EventType(4)
EventSession = EventType(-1)
EventNotWatching = EventType(-2)
)
var (
eventNames = map[EventType]string{
EventNodeCreated: "EventNodeCreated",
EventNodeDeleted: "EventNodeDeleted",
EventNodeDataChanged: "EventNodeDataChanged",
EventNodeChildrenChanged: "EventNodeChildrenChanged",
EventSession: "EventSession",
EventNotWatching: "EventNotWatching",
}
)
const (
StateUnknown = State(-1)
StateDisconnected = State(0)
StateConnecting = State(1)
StateSyncConnected = State(3)
StateAuthFailed = State(4)
StateConnectedReadOnly = State(5)
StateSaslAuthenticated = State(6)
StateExpired = State(-112)
// StateAuthFailed = State(-113)
StateConnected = State(100)
StateHasSession = State(101)
)
const (
FlagEphemeral = 1
FlagSequence = 2
)
var (
stateNames = map[State]string{
StateUnknown: "StateUnknown",
StateDisconnected: "StateDisconnected",
StateSyncConnected: "StateSyncConnected",
StateConnectedReadOnly: "StateConnectedReadOnly",
StateSaslAuthenticated: "StateSaslAuthenticated",
StateExpired: "StateExpired",
StateAuthFailed: "StateAuthFailed",
StateConnecting: "StateConnecting",
StateConnected: "StateConnected",
StateHasSession: "StateHasSession",
}
)
type State int32
func (s State) String() string {
if name := stateNames[s]; name != "" {
return name
}
return "Unknown"
}
type ErrCode int32
var (
ErrConnectionClosed = errors.New("zk: connection closed")
ErrUnknown = errors.New("zk: unknown error")
ErrAPIError = errors.New("zk: api error")
ErrNoNode = errors.New("zk: node does not exist")
ErrNoAuth = errors.New("zk: not authenticated")
ErrBadVersion = errors.New("zk: version conflict")
ErrNoChildrenForEphemerals = errors.New("zk: ephemeral nodes may not have children")
ErrNodeExists = errors.New("zk: node already exists")
ErrNotEmpty = errors.New("zk: node has children")
ErrSessionExpired = errors.New("zk: session has been expired by the server")
ErrInvalidACL = errors.New("zk: invalid ACL specified")
ErrAuthFailed = errors.New("zk: client authentication failed")
ErrClosing = errors.New("zk: zookeeper is closing")
ErrNothing = errors.New("zk: no server responses to process")
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")
// ErrInvalidCallback = errors.New("zk: invalid callback specified")
errCodeToError = map[ErrCode]error{
0: nil,
errAPIError: ErrAPIError,
errNoNode: ErrNoNode,
errNoAuth: ErrNoAuth,
errBadVersion: ErrBadVersion,
errNoChildrenForEphemerals: ErrNoChildrenForEphemerals,
errNodeExists: ErrNodeExists,
errNotEmpty: ErrNotEmpty,
errSessionExpired: ErrSessionExpired,
// errInvalidCallback: ErrInvalidCallback,
errInvalidAcl: ErrInvalidACL,
errAuthFailed: ErrAuthFailed,
errClosing: ErrClosing,
errNothing: ErrNothing,
errSessionMoved: ErrSessionMoved,
}
)
func (e ErrCode) toError() error {
if err, ok := errCodeToError[e]; ok {
return err
}
return ErrUnknown
}
const (
errOk = 0
// System and server-side errors
errSystemError = -1
errRuntimeInconsistency = -2
errDataInconsistency = -3
errConnectionLoss = -4
errMarshallingError = -5
errUnimplemented = -6
errOperationTimeout = -7
errBadArguments = -8
errInvalidState = -9
// API errors
errAPIError = ErrCode(-100)
errNoNode = ErrCode(-101) // *
errNoAuth = ErrCode(-102)
errBadVersion = ErrCode(-103) // *
errNoChildrenForEphemerals = ErrCode(-108)
errNodeExists = ErrCode(-110) // *
errNotEmpty = ErrCode(-111)
errSessionExpired = ErrCode(-112)
errInvalidCallback = ErrCode(-113)
errInvalidAcl = ErrCode(-114)
errAuthFailed = ErrCode(-115)
errClosing = ErrCode(-116)
errNothing = ErrCode(-117)
errSessionMoved = ErrCode(-118)
)
// Constants for ACL permissions
const (
PermRead = 1 << iota
PermWrite
PermCreate
PermDelete
PermAdmin
PermAll = 0x1f
)
var (
emptyPassword = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
opNames = map[int32]string{
opNotify: "notify",
opCreate: "create",
opDelete: "delete",
opExists: "exists",
opGetData: "getData",
opSetData: "setData",
opGetAcl: "getACL",
opSetAcl: "setACL",
opGetChildren: "getChildren",
opSync: "sync",
opPing: "ping",
opGetChildren2: "getChildren2",
opCheck: "check",
opMulti: "multi",
opClose: "close",
opSetAuth: "setAuth",
opSetWatches: "setWatches",
opWatcherEvent: "watcherEvent",
}
)
type EventType int32
func (t EventType) String() string {
if name := eventNames[t]; name != "" {
return name
}
return "Unknown"
}
// Mode is used to build custom server modes (leader|follower|standalone).
type Mode uint8
func (m Mode) String() string {
if name := modeNames[m]; name != "" {
return name
}
return "unknown"
}
const (
ModeUnknown Mode = iota
ModeLeader Mode = iota
ModeFollower Mode = iota
ModeStandalone Mode = iota
)
var (
modeNames = map[Mode]string{
ModeLeader: "leader",
ModeFollower: "follower",
ModeStandalone: "standalone",
}
)
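The sentinel errors above are what callers, including the removed libkv driver, switch on. A hypothetical helper showing the mapping that zookeeper.go performed inline (the helper name and package are illustrative; the libkv error values are real):

```go
package zkcompat

import (
	"github.com/docker/libkv/store"
	zk "github.com/samuel/go-zookeeper/zk"
)

// toStoreErr translates go-zookeeper sentinel errors into their libkv
// equivalents, mirroring the checks scattered through the removed driver.
func toStoreErr(err error) error {
	switch err {
	case zk.ErrNoNode:
		return store.ErrKeyNotFound
	case zk.ErrBadVersion:
		return store.ErrKeyModified
	case zk.ErrNodeExists:
		return store.ErrKeyExists
	default:
		return err
	}
}
```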


@@ -1,288 +0,0 @@
package zk
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"math/big"
"net"
"regexp"
"strconv"
"time"
)
// FLWSrvr is a FourLetterWord helper function. In particular, this function pulls the srvr output
// from the zookeeper instances and parses the output. A slice of *ServerStats structs is returned
// as well as a boolean value to indicate whether this function processed successfully.
//
// If the boolean value is false there was a problem. If the *ServerStats slice is empty or nil,
// then the error happened before we started to obtain 'srvr' values. Otherwise, one of the
// servers had an issue and the "Error" value in the struct should be inspected to determine
// which server had the issue.
func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool) {
// different parts of the regular expression that are required to parse the srvr output
var (
zrVer = `^Zookeeper version: ([A-Za-z0-9\.\-]+), built on (\d\d/\d\d/\d\d\d\d \d\d:\d\d [A-Za-z0-9:\+\-]+)`
zrLat = `^Latency min/avg/max: (\d+)/(\d+)/(\d+)`
zrNet = `^Received: (\d+).*\n^Sent: (\d+).*\n^Connections: (\d+).*\n^Outstanding: (\d+)`
zrState = `^Zxid: (0x[A-Za-z0-9]+).*\n^Mode: (\w+).*\n^Node count: (\d+)`
)
// build the regex from the pieces above
re, err := regexp.Compile(fmt.Sprintf(`(?m:\A%v.*\n%v.*\n%v.*\n%v)`, zrVer, zrLat, zrNet, zrState))
if err != nil {
return nil, false
}
imOk := true
servers = FormatServers(servers)
ss := make([]*ServerStats, len(servers))
for i := range ss {
response, err := fourLetterWord(servers[i], "srvr", timeout)
if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}
matches := re.FindAllStringSubmatch(string(response), -1)
if matches == nil {
err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}
match := matches[0][1:]
// determine current server
var srvrMode Mode
switch match[10] {
case "leader":
srvrMode = ModeLeader
case "follower":
srvrMode = ModeFollower
case "standalone":
srvrMode = ModeStandalone
default:
srvrMode = ModeUnknown
}
buildTime, err := time.Parse("01/02/2006 15:04 MST", match[1])
if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}
parsedInt, err := strconv.ParseInt(match[9], 0, 64)
if err != nil {
ss[i] = &ServerStats{Error: err}
imOk = false
continue
}
// the ZxID value is an int64 with two int32s packed inside
// the high int32 is the epoch (i.e., number of leader elections)
// the low int32 is the counter
epoch := int32(parsedInt >> 32)
counter := int32(parsedInt & 0xFFFFFFFF)
// within the regex above, these values must be numerical
// so we can avoid useless checking of the error return value
minLatency, _ := strconv.ParseInt(match[2], 0, 64)
avgLatency, _ := strconv.ParseInt(match[3], 0, 64)
maxLatency, _ := strconv.ParseInt(match[4], 0, 64)
recv, _ := strconv.ParseInt(match[5], 0, 64)
sent, _ := strconv.ParseInt(match[6], 0, 64)
cons, _ := strconv.ParseInt(match[7], 0, 64)
outs, _ := strconv.ParseInt(match[8], 0, 64)
ncnt, _ := strconv.ParseInt(match[11], 0, 64)
ss[i] = &ServerStats{
Sent: sent,
Received: recv,
NodeCount: ncnt,
MinLatency: minLatency,
AvgLatency: avgLatency,
MaxLatency: maxLatency,
Connections: cons,
Outstanding: outs,
Epoch: epoch,
Counter: counter,
BuildTime: buildTime,
Mode: srvrMode,
Version: match[0],
}
}
return ss, imOk
}
// FLWRuok is a FourLetterWord helper function. In particular, this function
// pulls the ruok output from each server.
func FLWRuok(servers []string, timeout time.Duration) []bool {
servers = FormatServers(servers)
oks := make([]bool, len(servers))
for i := range oks {
response, err := fourLetterWord(servers[i], "ruok", timeout)
if err != nil {
continue
}
if bytes.Equal(response[:4], []byte("imok")) {
oks[i] = true
}
}
return oks
}
// FLWCons is a FourLetterWord helper function. In particular, this function
// pulls the cons output from each server.
//
// As with FLWSrvr, the boolean value indicates whether one of the requests had
// an issue. The Clients struct has an Error value that can be checked.
func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) {
var (
zrAddr = `^ /((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?):(?:\d+))\[\d+\]`
zrPac = `\(queued=(\d+),recved=(\d+),sent=(\d+),sid=(0x[A-Za-z0-9]+),lop=(\w+),est=(\d+),to=(\d+),`
zrSesh = `lcxid=(0x[A-Za-z0-9]+),lzxid=(0x[A-Za-z0-9]+),lresp=(\d+),llat=(\d+),minlat=(\d+),avglat=(\d+),maxlat=(\d+)\)`
)
re, err := regexp.Compile(fmt.Sprintf("%v%v%v", zrAddr, zrPac, zrSesh))
if err != nil {
return nil, false
}
servers = FormatServers(servers)
sc := make([]*ServerClients, len(servers))
imOk := true
for i := range sc {
response, err := fourLetterWord(servers[i], "cons", timeout)
if err != nil {
sc[i] = &ServerClients{Error: err}
imOk = false
continue
}
scan := bufio.NewScanner(bytes.NewReader(response))
var clients []*ServerClient
for scan.Scan() {
line := scan.Bytes()
if len(line) == 0 {
continue
}
m := re.FindAllStringSubmatch(string(line), -1)
if m == nil {
err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
sc[i] = &ServerClients{Error: err}
imOk = false
continue
}
match := m[0][1:]
queued, _ := strconv.ParseInt(match[1], 0, 64)
recvd, _ := strconv.ParseInt(match[2], 0, 64)
sent, _ := strconv.ParseInt(match[3], 0, 64)
sid, _ := strconv.ParseInt(match[4], 0, 64)
est, _ := strconv.ParseInt(match[6], 0, 64)
timeout, _ := strconv.ParseInt(match[7], 0, 32)
lresp, _ := strconv.ParseInt(match[10], 0, 64)
llat, _ := strconv.ParseInt(match[11], 0, 32)
minlat, _ := strconv.ParseInt(match[12], 0, 32)
avglat, _ := strconv.ParseInt(match[13], 0, 32)
maxlat, _ := strconv.ParseInt(match[14], 0, 32)
// zookeeper returns a value, '0xffffffffffffffff', as the
// Lzxid for PING requests in the 'cons' output.
// unfortunately, in Go that is an invalid int64 and is not represented
// as -1.
// However, converting the string value to a big.Int and then back to
// an int64 properly sets the value to -1
lzxid, ok := new(big.Int).SetString(match[9], 0)
var errVal error
if !ok {
errVal = fmt.Errorf("failed to convert lzxid value to big.Int")
imOk = false
}
lcxid, ok := new(big.Int).SetString(match[8], 0)
if !ok && errVal == nil {
errVal = fmt.Errorf("failed to convert lcxid value to big.Int")
imOk = false
}
clients = append(clients, &ServerClient{
Queued: queued,
Received: recvd,
Sent: sent,
SessionID: sid,
Lcxid: lcxid.Int64(),
Lzxid: lzxid.Int64(),
Timeout: int32(timeout),
LastLatency: int32(llat),
MinLatency: int32(minlat),
AvgLatency: int32(avglat),
MaxLatency: int32(maxlat),
Established: time.Unix(est, 0),
LastResponse: time.Unix(lresp, 0),
Addr: match[0],
LastOperation: match[5],
Error: errVal,
})
}
sc[i] = &ServerClients{Clients: clients}
}
return sc, imOk
}
func fourLetterWord(server, command string, timeout time.Duration) ([]byte, error) {
conn, err := net.DialTimeout("tcp", server, timeout)
if err != nil {
return nil, err
}
// the zookeeper server should automatically close this socket
// once the command has been processed, but better safe than sorry
defer conn.Close()
conn.SetWriteDeadline(time.Now().Add(timeout))
_, err = conn.Write([]byte(command))
if err != nil {
return nil, err
}
conn.SetReadDeadline(time.Now().Add(timeout))
resp, err := ioutil.ReadAll(conn)
if err != nil {
return nil, err
}
return resp, nil
}
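A short sketch of how these helpers are called; both take a plain host:port list and a dial timeout (the server list here is illustrative):

```go
package main

import (
	"log"
	"time"

	zk "github.com/samuel/go-zookeeper/zk"
)

func main() {
	servers := []string{"127.0.0.1:2181"}

	// FLWRuok sends the "ruok" four-letter word to each server.
	for i, ok := range zk.FLWRuok(servers, time.Second) {
		log.Printf("%s ruok=%v", servers[i], ok)
	}

	// FLWSrvr parses "srvr" output into ServerStats.
	stats, ok := zk.FLWSrvr(servers, time.Second)
	if !ok {
		log.Println("at least one server had a problem; check the Error fields")
	}
	for _, s := range stats {
		if s.Error != nil {
			log.Printf("error: %v", s.Error)
			continue
		}
		log.Printf("mode=%s znodes=%d avg latency=%dms", s.Mode, s.NodeCount, s.AvgLatency)
	}
}
```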


@@ -1,131 +0,0 @@
package zk
import (
"errors"
"fmt"
"strconv"
"strings"
)
var (
ErrDeadlock = errors.New("zk: trying to acquire a lock twice")
ErrNotLocked = errors.New("zk: not locked")
)
type Lock struct {
c *Conn
path string
acl []ACL
lockPath string
seq int
}
func NewLock(c *Conn, path string, acl []ACL) *Lock {
return &Lock{
c: c,
path: path,
acl: acl,
}
}
func parseSeq(path string) (int, error) {
parts := strings.Split(path, "-")
return strconv.Atoi(parts[len(parts)-1])
}
func (l *Lock) Lock() error {
if l.lockPath != "" {
return ErrDeadlock
}
prefix := fmt.Sprintf("%s/lock-", l.path)
path := ""
var err error
for i := 0; i < 3; i++ {
path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)
if err == ErrNoNode {
// Create parent node.
parts := strings.Split(l.path, "/")
pth := ""
for _, p := range parts[1:] {
pth += "/" + p
_, err := l.c.Create(pth, []byte{}, 0, l.acl)
if err != nil && err != ErrNodeExists {
return err
}
}
} else if err == nil {
break
} else {
return err
}
}
if err != nil {
return err
}
seq, err := parseSeq(path)
if err != nil {
return err
}
for {
children, _, err := l.c.Children(l.path)
if err != nil {
return err
}
lowestSeq := seq
prevSeq := 0
prevSeqPath := ""
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return err
}
if s < lowestSeq {
lowestSeq = s
}
if s < seq && s > prevSeq {
prevSeq = s
prevSeqPath = p
}
}
if seq == lowestSeq {
// Acquired the lock
break
}
// Wait on the node next in line for the lock
_, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath)
if err != nil && err != ErrNoNode {
return err
} else if err != nil && err == ErrNoNode {
// try again
continue
}
ev := <-ch
if ev.Err != nil {
return ev.Err
}
}
l.seq = seq
l.lockPath = path
return nil
}
func (l *Lock) Unlock() error {
if l.lockPath == "" {
return ErrNotLocked
}
if err := l.c.Delete(l.lockPath, -1); err != nil {
return err
}
l.lockPath = ""
l.seq = 0
return nil
}
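Usage is the usual acquire/release pair; a minimal sketch (connection endpoint and lock path are illustrative). Note that Lock blocks until the caller holds the lowest sequence node:

```go
package main

import (
	"log"
	"time"

	zk "github.com/samuel/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, 10*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Lock creates the parent path on demand (see Lock above).
	l := zk.NewLock(conn, "/locks/example", zk.WorldACL(zk.PermAll))
	if err := l.Lock(); err != nil {
		log.Fatal(err)
	}
	log.Println("lock held; doing critical-section work")
	if err := l.Unlock(); err != nil {
		log.Fatal(err)
	}
}
```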


@@ -1,119 +0,0 @@
package zk
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"time"
)
type TestServer struct {
Port int
Path string
Srv *Server
}
type TestCluster struct {
Path string
Servers []TestServer
}
func StartTestCluster(size int, stdout, stderr io.Writer) (*TestCluster, error) {
tmpPath, err := ioutil.TempDir("", "gozk")
if err != nil {
return nil, err
}
success := false
startPort := int(rand.Int31n(6000) + 10000)
cluster := &TestCluster{Path: tmpPath}
defer func() {
if !success {
cluster.Stop()
}
}()
for serverN := 0; serverN < size; serverN++ {
srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN))
if err := os.Mkdir(srvPath, 0700); err != nil {
return nil, err
}
port := startPort + serverN*3
cfg := ServerConfig{
ClientPort: port,
DataDir: srvPath,
}
for i := 0; i < size; i++ {
cfg.Servers = append(cfg.Servers, ServerConfigServer{
ID: i + 1,
Host: "127.0.0.1",
PeerPort: startPort + i*3 + 1,
LeaderElectionPort: startPort + i*3 + 2,
})
}
cfgPath := filepath.Join(srvPath, "zoo.cfg")
fi, err := os.Create(cfgPath)
if err != nil {
return nil, err
}
err = cfg.Marshall(fi)
fi.Close()
if err != nil {
return nil, err
}
fi, err = os.Create(filepath.Join(srvPath, "myid"))
if err != nil {
return nil, err
}
_, err = fmt.Fprintf(fi, "%d\n", serverN+1)
fi.Close()
if err != nil {
return nil, err
}
srv := &Server{
ConfigPath: cfgPath,
Stdout: stdout,
Stderr: stderr,
}
if err := srv.Start(); err != nil {
return nil, err
}
cluster.Servers = append(cluster.Servers, TestServer{
Path: srvPath,
Port: cfg.ClientPort,
Srv: srv,
})
}
success = true
time.Sleep(time.Second) // Give the server time to become active. Should probably actually attempt to connect to verify.
return cluster, nil
}
func (ts *TestCluster) Connect(idx int) (*Conn, error) {
zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", ts.Servers[idx].Port)}, time.Second*15)
return zk, err
}
func (ts *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
return ts.ConnectAllTimeout(time.Second * 15)
}
func (ts *TestCluster) ConnectAllTimeout(sessionTimeout time.Duration) (*Conn, <-chan Event, error) {
hosts := make([]string, len(ts.Servers))
for i, srv := range ts.Servers {
hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port)
}
zk, ch, err := Connect(hosts, sessionTimeout)
return zk, ch, err
}
func (ts *TestCluster) Stop() error {
for _, srv := range ts.Servers {
srv.Srv.Stop()
}
defer os.RemoveAll(ts.Path)
return nil
}
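These helpers backed the library's own tests; a hypothetical test sketch, assuming a ZooKeeper fatjar is discoverable on one of the search paths in the server helper below:

```go
package zk_test

import (
	"testing"

	zk "github.com/samuel/go-zookeeper/zk"
)

func TestWithThrowawayCluster(t *testing.T) {
	// Spawns one Java ZooKeeper server on a random port under a temp dir.
	ts, err := zk.StartTestCluster(1, nil, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer ts.Stop()

	conn, _, err := ts.ConnectAll()
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	if _, err := conn.Create("/smoke", []byte("ok"), 0, zk.WorldACL(zk.PermAll)); err != nil {
		t.Fatal(err)
	}
}
```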


@@ -1,136 +0,0 @@
package zk
import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
)
type ErrMissingServerConfigField string
func (e ErrMissingServerConfigField) Error() string {
return fmt.Sprintf("zk: missing server config field '%s'", string(e))
}
const (
DefaultServerTickTime = 2000
DefaultServerInitLimit = 10
DefaultServerSyncLimit = 5
DefaultServerAutoPurgeSnapRetainCount = 3
DefaultPeerPort = 2888
DefaultLeaderElectionPort = 3888
)
type ServerConfigServer struct {
ID int
Host string
PeerPort int
LeaderElectionPort int
}
type ServerConfig struct {
TickTime int // Number of milliseconds of each tick
InitLimit int // Number of ticks that the initial synchronization phase can take
SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement
DataDir string // Directory where the snapshot is stored
ClientPort int // Port at which clients will connect
AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir
AutoPurgePurgeInterval int // Purge task interval in hours (0 to disable auto purge)
Servers []ServerConfigServer
}
func (sc ServerConfig) Marshall(w io.Writer) error {
if sc.DataDir == "" {
return ErrMissingServerConfigField("dataDir")
}
fmt.Fprintf(w, "dataDir=%s\n", sc.DataDir)
if sc.TickTime <= 0 {
sc.TickTime = DefaultServerTickTime
}
fmt.Fprintf(w, "tickTime=%d\n", sc.TickTime)
if sc.InitLimit <= 0 {
sc.InitLimit = DefaultServerInitLimit
}
fmt.Fprintf(w, "initLimit=%d\n", sc.InitLimit)
if sc.SyncLimit <= 0 {
sc.SyncLimit = DefaultServerSyncLimit
}
fmt.Fprintf(w, "syncLimit=%d\n", sc.SyncLimit)
if sc.ClientPort <= 0 {
sc.ClientPort = DefaultPort
}
fmt.Fprintf(w, "clientPort=%d\n", sc.ClientPort)
if sc.AutoPurgePurgeInterval > 0 {
if sc.AutoPurgeSnapRetainCount <= 0 {
sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount
}
fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount)
fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
}
if len(sc.Servers) > 0 {
for _, srv := range sc.Servers {
if srv.PeerPort <= 0 {
srv.PeerPort = DefaultPeerPort
}
if srv.LeaderElectionPort <= 0 {
srv.LeaderElectionPort = DefaultLeaderElectionPort
}
fmt.Fprintf(w, "server.%d=%s:%d:%d\n", srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort)
}
}
return nil
}
var jarSearchPaths = []string{
"zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"../zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"/usr/share/java/zookeeper-*.jar",
"/usr/local/zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"/usr/local/Cellar/zookeeper/*/libexec/contrib/fatjar/zookeeper-*-fatjar.jar",
}
// findZookeeperFatJar returns the first ZooKeeper fatjar found, preferring
// $ZOOKEEPER_PATH over the glob patterns in jarSearchPaths.
func findZookeeperFatJar() string {
var paths []string
zkPath := os.Getenv("ZOOKEEPER_PATH")
if zkPath == "" {
paths = jarSearchPaths
} else {
paths = []string{filepath.Join(zkPath, "contrib/fatjar/zookeeper-*-fatjar.jar")}
}
for _, path := range paths {
matches, _ := filepath.Glob(path)
// TODO: could sort by version and pick latest
if len(matches) > 0 {
return matches[0]
}
}
return ""
}
type Server struct {
JarPath string
ConfigPath string
Stdout, Stderr io.Writer
cmd *exec.Cmd
}
func (srv *Server) Start() error {
if srv.JarPath == "" {
srv.JarPath = findZookeeperFatJar()
if srv.JarPath == "" {
return fmt.Errorf("zk: unable to find server jar")
}
}
srv.cmd = exec.Command("java", "-jar", srv.JarPath, "server", srv.ConfigPath)
srv.cmd.Stdout = srv.Stdout
srv.cmd.Stderr = srv.Stderr
return srv.cmd.Start()
}
func (srv *Server) Stop() error {
srv.cmd.Process.Signal(os.Kill)
return srv.cmd.Wait()
}
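// exampleServer is a hedged sketch (not part of the original file); the
// config path is hypothetical and would normally be produced by writing a
// ServerConfig via Marshall.
func exampleServer() {
	srv := &Server{
		ConfigPath: "/tmp/zk/zoo.cfg",
		Stdout:     os.Stdout,
		Stderr:     os.Stderr,
	}
	if err := srv.Start(); err != nil { // locates the fatjar via ZOOKEEPER_PATH or jarSearchPaths
		panic(err)
	}
	defer srv.Stop() // kills the java process and waits for it to exit
}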

View file

@@ -1,633 +0,0 @@
package zk
import (
"encoding/binary"
"errors"
"reflect"
"runtime"
"time"
)
var (
ErrUnhandledFieldType = errors.New("zk: unhandled field type")
ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct")
ErrShortBuffer = errors.New("zk: buffer too small")
)
type ACL struct {
Perms int32
Scheme string
ID string
}
type Stat struct {
Czxid int64 // The zxid of the change that caused this znode to be created.
Mzxid int64 // The zxid of the change that last modified this znode.
Ctime int64 // The time in milliseconds from epoch when this znode was created.
Mtime int64 // The time in milliseconds from epoch when this znode was last modified.
Version int32 // The number of changes to the data of this znode.
Cversion int32 // The number of changes to the children of this znode.
Aversion int32 // The number of changes to the ACL of this znode.
EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
DataLength int32 // The length of the data field of this znode.
NumChildren int32 // The number of children of this znode.
Pzxid int64 // The zxid of the change that last modified the children of this znode.
}
// ServerClient is the information for a single ZooKeeper client and its session.
// This is used to parse/extract the output of the `cons` command.
type ServerClient struct {
Queued int64
Received int64
Sent int64
SessionID int64
Lcxid int64
Lzxid int64
Timeout int32
LastLatency int32
MinLatency int32
AvgLatency int32
MaxLatency int32
Established time.Time
LastResponse time.Time
Addr string
LastOperation string // maybe?
Error error
}
// ServerClients is a struct for the FLWCons() function. It's used to provide
// the list of Clients.
//
// This is needed because FLWCons() takes multiple servers.
type ServerClients struct {
Clients []*ServerClient
Error error
}
// ServerStats is the information pulled from the ZooKeeper `stat` command.
type ServerStats struct {
Sent int64
Received int64
NodeCount int64
MinLatency int64
AvgLatency int64
MaxLatency int64
Connections int64
Outstanding int64
Epoch int32
Counter int32
BuildTime time.Time
Mode Mode
Version string
Error error
}
type requestHeader struct {
Xid int32
Opcode int32
}
type responseHeader struct {
Xid int32
Zxid int64
Err ErrCode
}
type multiHeader struct {
Type int32
Done bool
Err ErrCode
}
type auth struct {
Type int32
Scheme string
Auth []byte
}
// Generic request structs
type pathRequest struct {
Path string
}
type PathVersionRequest struct {
Path string
Version int32
}
type pathWatchRequest struct {
Path string
Watch bool
}
type pathResponse struct {
Path string
}
type statResponse struct {
Stat Stat
}
type CheckVersionRequest PathVersionRequest
type closeRequest struct{}
type closeResponse struct{}
type connectRequest struct {
ProtocolVersion int32
LastZxidSeen int64
TimeOut int32
SessionID int64
Passwd []byte
}
type connectResponse struct {
ProtocolVersion int32
TimeOut int32
SessionID int64
Passwd []byte
}
type CreateRequest struct {
Path string
Data []byte
Acl []ACL
Flags int32
}
type createResponse pathResponse
type DeleteRequest PathVersionRequest
type deleteResponse struct{}
type errorResponse struct {
Err int32
}
type existsRequest pathWatchRequest
type existsResponse statResponse
type getAclRequest pathRequest
type getAclResponse struct {
Acl []ACL
Stat Stat
}
type getChildrenRequest pathRequest
type getChildrenResponse struct {
Children []string
}
type getChildren2Request pathWatchRequest
type getChildren2Response struct {
Children []string
Stat Stat
}
type getDataRequest pathWatchRequest
type getDataResponse struct {
Data []byte
Stat Stat
}
type getMaxChildrenRequest pathRequest
type getMaxChildrenResponse struct {
Max int32
}
type getSaslRequest struct {
Token []byte
}
type pingRequest struct{}
type pingResponse struct{}
type setAclRequest struct {
Path string
Acl []ACL
Version int32
}
type setAclResponse statResponse
type SetDataRequest struct {
Path string
Data []byte
Version int32
}
type setDataResponse statResponse
type setMaxChildren struct {
Path string
Max int32
}
type setSaslRequest struct {
Token string
}
type setSaslResponse struct {
Token string
}
type setWatchesRequest struct {
RelativeZxid int64
DataWatches []string
ExistWatches []string
ChildWatches []string
}
type setWatchesResponse struct{}
type syncRequest pathRequest
type syncResponse pathResponse
type setAuthRequest auth
type setAuthResponse struct{}
type multiRequestOp struct {
Header multiHeader
Op interface{}
}
type multiRequest struct {
Ops []multiRequestOp
DoneHeader multiHeader
}
type multiResponseOp struct {
Header multiHeader
String string
Stat *Stat
}
type multiResponse struct {
Ops []multiResponseOp
DoneHeader multiHeader
}
func (r *multiRequest) Encode(buf []byte) (int, error) {
total := 0
for _, op := range r.Ops {
op.Header.Done = false
n, err := encodePacketValue(buf[total:], reflect.ValueOf(op))
if err != nil {
return total, err
}
total += n
}
r.DoneHeader.Done = true
n, err := encodePacketValue(buf[total:], reflect.ValueOf(r.DoneHeader))
if err != nil {
return total, err
}
total += n
return total, nil
}
func (r *multiRequest) Decode(buf []byte) (int, error) {
r.Ops = make([]multiRequestOp, 0)
r.DoneHeader = multiHeader{-1, true, -1}
total := 0
for {
header := &multiHeader{}
n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
if err != nil {
return total, err
}
total += n
if header.Done {
r.DoneHeader = *header
break
}
req := requestStructForOp(header.Type)
if req == nil {
return total, ErrAPIError
}
n, err = decodePacketValue(buf[total:], reflect.ValueOf(req))
if err != nil {
return total, err
}
total += n
r.Ops = append(r.Ops, multiRequestOp{*header, req})
}
return total, nil
}
func (r *multiResponse) Decode(buf []byte) (int, error) {
r.Ops = make([]multiResponseOp, 0)
r.DoneHeader = multiHeader{-1, true, -1}
total := 0
for {
header := &multiHeader{}
n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
if err != nil {
return total, err
}
total += n
if header.Done {
r.DoneHeader = *header
break
}
res := multiResponseOp{Header: *header}
var w reflect.Value
switch header.Type {
default:
return total, ErrAPIError
case opCreate:
w = reflect.ValueOf(&res.String)
case opSetData:
res.Stat = new(Stat)
w = reflect.ValueOf(res.Stat)
case opCheck, opDelete:
}
if w.IsValid() {
n, err := decodePacketValue(buf[total:], w)
if err != nil {
return total, err
}
total += n
}
r.Ops = append(r.Ops, res)
}
return total, nil
}
type watcherEvent struct {
Type EventType
State State
Path string
}
type decoder interface {
Decode(buf []byte) (int, error)
}
type encoder interface {
Encode(buf []byte) (int, error)
}
// decodePacket deserializes buf into the struct pointed to by st, using the
// reflection-driven rules in decodePacketValue; an undersized buffer is
// reported as ErrShortBuffer via the recover below.
func decodePacket(buf []byte, st interface{}) (n int, err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
err = ErrShortBuffer
} else {
panic(r)
}
}
}()
v := reflect.ValueOf(st)
if v.Kind() != reflect.Ptr || v.IsNil() {
return 0, ErrPtrExpected
}
return decodePacketValue(buf, v)
}
func decodePacketValue(buf []byte, v reflect.Value) (int, error) {
rv := v
kind := v.Kind()
if kind == reflect.Ptr {
if v.IsNil() {
v.Set(reflect.New(v.Type().Elem()))
}
v = v.Elem()
kind = v.Kind()
}
n := 0
switch kind {
default:
return n, ErrUnhandledFieldType
case reflect.Struct:
if de, ok := rv.Interface().(decoder); ok {
return de.Decode(buf)
} else if de, ok := v.Interface().(decoder); ok {
return de.Decode(buf)
} else {
for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
n2, err := decodePacketValue(buf[n:], field)
n += n2
if err != nil {
return n, err
}
}
}
case reflect.Bool:
v.SetBool(buf[n] != 0)
n++
case reflect.Int32:
v.SetInt(int64(binary.BigEndian.Uint32(buf[n : n+4])))
n += 4
case reflect.Int64:
v.SetInt(int64(binary.BigEndian.Uint64(buf[n : n+8])))
n += 8
case reflect.String:
ln := int(binary.BigEndian.Uint32(buf[n : n+4]))
v.SetString(string(buf[n+4 : n+4+ln]))
n += 4 + ln
case reflect.Slice:
switch v.Type().Elem().Kind() {
default:
count := int(binary.BigEndian.Uint32(buf[n : n+4]))
n += 4
values := reflect.MakeSlice(v.Type(), count, count)
v.Set(values)
for i := 0; i < count; i++ {
n2, err := decodePacketValue(buf[n:], values.Index(i))
n += n2
if err != nil {
return n, err
}
}
case reflect.Uint8:
ln := int(int32(binary.BigEndian.Uint32(buf[n : n+4])))
if ln < 0 {
n += 4
v.SetBytes(nil)
} else {
bytes := make([]byte, ln)
copy(bytes, buf[n+4:n+4+ln])
v.SetBytes(bytes)
n += 4 + ln
}
}
}
return n, nil
}
// encodePacket serializes the struct pointed to by st into buf and returns
// the number of bytes written.
func encodePacket(buf []byte, st interface{}) (n int, err error) {
defer func() {
if r := recover(); r != nil {
if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
err = ErrShortBuffer
} else {
panic(r)
}
}
}()
v := reflect.ValueOf(st)
if v.Kind() != reflect.Ptr || v.IsNil() {
return 0, ErrPtrExpected
}
return encodePacketValue(buf, v)
}
func encodePacketValue(buf []byte, v reflect.Value) (int, error) {
rv := v
for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface {
v = v.Elem()
}
n := 0
switch v.Kind() {
default:
return n, ErrUnhandledFieldType
case reflect.Struct:
if en, ok := rv.Interface().(encoder); ok {
return en.Encode(buf)
} else if en, ok := v.Interface().(encoder); ok {
return en.Encode(buf)
} else {
for i := 0; i < v.NumField(); i++ {
field := v.Field(i)
n2, err := encodePacketValue(buf[n:], field)
n += n2
if err != nil {
return n, err
}
}
}
case reflect.Bool:
if v.Bool() {
buf[n] = 1
} else {
buf[n] = 0
}
n++
case reflect.Int32:
binary.BigEndian.PutUint32(buf[n:n+4], uint32(v.Int()))
n += 4
case reflect.Int64:
binary.BigEndian.PutUint64(buf[n:n+8], uint64(v.Int()))
n += 8
case reflect.String:
str := v.String()
binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(str)))
copy(buf[n+4:n+4+len(str)], []byte(str))
n += 4 + len(str)
case reflect.Slice:
switch v.Type().Elem().Kind() {
default:
count := v.Len()
startN := n
n += 4
for i := 0; i < count; i++ {
n2, err := encodePacketValue(buf[n:], v.Index(i))
n += n2
if err != nil {
return n, err
}
}
binary.BigEndian.PutUint32(buf[startN:startN+4], uint32(count))
case reflect.Uint8:
if v.IsNil() {
binary.BigEndian.PutUint32(buf[n:n+4], uint32(0xffffffff))
n += 4
} else {
bytes := v.Bytes()
binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(bytes)))
copy(buf[n+4:n+4+len(bytes)], bytes)
n += 4 + len(bytes)
}
}
}
return n, nil
}
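// exampleRoundTrip is a sketch (not part of the original file) of the
// reflection-based codec above: encode a pathRequest, then decode it back.
func exampleRoundTrip() {
	buf := make([]byte, 64)
	// Strings go on the wire as a 4-byte big-endian length plus raw bytes.
	n, err := encodePacket(buf, &pathRequest{Path: "/services"})
	if err != nil {
		panic(err)
	}
	var decoded pathRequest
	if _, err := decodePacket(buf[:n], &decoded); err != nil {
		panic(err)
	}
	// decoded.Path == "/services"
}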
func requestStructForOp(op int32) interface{} {
switch op {
case opClose:
return &closeRequest{}
case opCreate:
return &CreateRequest{}
case opDelete:
return &DeleteRequest{}
case opExists:
return &existsRequest{}
case opGetAcl:
return &getAclRequest{}
case opGetChildren:
return &getChildrenRequest{}
case opGetChildren2:
return &getChildren2Request{}
case opGetData:
return &getDataRequest{}
case opPing:
return &pingRequest{}
case opSetAcl:
return &setAclRequest{}
case opSetData:
return &SetDataRequest{}
case opSetWatches:
return &setWatchesRequest{}
case opSync:
return &syncRequest{}
case opSetAuth:
return &setAuthRequest{}
case opCheck:
return &CheckVersionRequest{}
case opMulti:
return &multiRequest{}
}
return nil
}
func responseStructForOp(op int32) interface{} {
switch op {
case opClose:
return &closeResponse{}
case opCreate:
return &createResponse{}
case opDelete:
return &deleteResponse{}
case opExists:
return &existsResponse{}
case opGetAcl:
return &getAclResponse{}
case opGetChildren:
return &getChildrenResponse{}
case opGetChildren2:
return &getChildren2Response{}
case opGetData:
return &getDataResponse{}
case opPing:
return &pingResponse{}
case opSetAcl:
return &setAclResponse{}
case opSetData:
return &setDataResponse{}
case opSetWatches:
return &setWatchesResponse{}
case opSync:
return &syncResponse{}
case opWatcherEvent:
return &watcherEvent{}
case opSetAuth:
return &setAuthResponse{}
// case opCheck:
// return &checkVersionResponse{}
case opMulti:
return &multiResponse{}
}
return nil
}
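// exampleOpLookup is a sketch (not part of the original file) showing how
// multiRequest.Decode above turns an op header into a typed struct.
func exampleOpLookup() {
	req := requestStructForOp(opCreate)  // *CreateRequest
	res := responseStructForOp(opCreate) // *createResponse
	_, _ = req, res
}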

View file

@@ -1,148 +0,0 @@
package zk
import (
"encoding/binary"
"fmt"
"io"
"net"
"sync"
)
var (
requests = make(map[int32]int32) // Map of Xid -> Opcode
requestsLock = &sync.Mutex{}
)
// trace reads length-prefixed packets from conn1, decodes and prints them,
// and forwards the raw bytes to conn2. client selects whether packets are
// parsed as requests or as responses.
func trace(conn1, conn2 net.Conn, client bool) {
defer conn1.Close()
defer conn2.Close()
buf := make([]byte, 10*1024)
init := true
for {
_, err := io.ReadFull(conn1, buf[:4])
if err != nil {
fmt.Println("1>", client, err)
return
}
blen := int(binary.BigEndian.Uint32(buf[:4]))
_, err = io.ReadFull(conn1, buf[4:4+blen])
if err != nil {
fmt.Println("2>", client, err)
return
}
var cr interface{}
opcode := int32(-1)
readHeader := true
if client {
if init {
cr = &connectRequest{}
readHeader = false
} else {
xid := int32(binary.BigEndian.Uint32(buf[4:8]))
opcode = int32(binary.BigEndian.Uint32(buf[8:12]))
requestsLock.Lock()
requests[xid] = opcode
requestsLock.Unlock()
cr = requestStructForOp(opcode)
if cr == nil {
fmt.Printf("Unknown opcode %d\n", opcode)
}
}
} else {
if init {
cr = &connectResponse{}
readHeader = false
} else {
xid := int32(binary.BigEndian.Uint32(buf[4:8]))
zxid := int64(binary.BigEndian.Uint64(buf[8:16]))
errnum := int32(binary.BigEndian.Uint32(buf[16:20]))
if xid != -1 || zxid != -1 {
requestsLock.Lock()
found := false
opcode, found = requests[xid]
if !found {
opcode = 0
}
delete(requests, xid)
requestsLock.Unlock()
} else {
opcode = opWatcherEvent
}
cr = responseStructForOp(opcode)
if cr == nil {
fmt.Printf("Unknown opcode %d\n", opcode)
}
if errnum != 0 {
cr = &struct{}{}
}
}
}
opname := "."
if opcode != -1 {
opname = opNames[opcode]
}
if cr == nil {
fmt.Printf("%+v %s %+v\n", client, opname, buf[4:4+blen])
} else {
n := 4
hdrStr := ""
if readHeader {
var hdr interface{}
if client {
hdr = &requestHeader{}
} else {
hdr = &responseHeader{}
}
if n2, err := decodePacket(buf[n:n+blen], hdr); err != nil {
fmt.Println(err)
} else {
n += n2
}
hdrStr = fmt.Sprintf(" %+v", hdr)
}
if _, err := decodePacket(buf[n:n+blen], cr); err != nil {
fmt.Println(err)
}
fmt.Printf("%+v %s%s %+v\n", client, opname, hdrStr, cr)
}
init = false
written, err := conn2.Write(buf[:4+blen])
if err != nil {
fmt.Println("3>", client, err)
return
} else if written != 4+blen {
fmt.Printf("Written != read: %d != %d\n", written, blen)
return
}
}
}
func handleConnection(addr string, conn net.Conn) {
zkConn, err := net.Dial("tcp", addr)
if err != nil {
fmt.Println(err)
return
}
go trace(conn, zkConn, true)
trace(zkConn, conn, false)
}
// StartTracer accepts client connections on listenAddr and proxies each one
// to the real server at serverAddr, tracing traffic in both directions.
func StartTracer(listenAddr, serverAddr string) {
ln, err := net.Listen("tcp", listenAddr)
if err != nil {
panic(err)
}
for {
conn, err := ln.Accept()
if err != nil {
fmt.Println(err)
continue
}
go handleConnection(serverAddr, conn)
}
}
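// exampleTracer is a hedged sketch (not part of the original file); the
// ports are assumptions, and it additionally assumes the "time" package is
// imported. Connect is the package's client constructor shown earlier.
func exampleTracer() {
	// Proxy 21810 -> 2181 in the background, then point a client at the
	// proxy so every request/response is decoded and printed by trace().
	go StartTracer("127.0.0.1:21810", "127.0.0.1:2181")
	conn, _, err := Connect([]string{"127.0.0.1:21810"}, 15*time.Second)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}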

View file

@@ -1,54 +0,0 @@
package zk
import (
"crypto/sha1"
"encoding/base64"
"fmt"
"math/rand"
"strconv"
"strings"
)
// AuthACL produces an ACL list containing a single ACL which uses the
// provided permissions, with the scheme "auth", and ID "", which is used
// by ZooKeeper to represent any authenticated user.
func AuthACL(perms int32) []ACL {
return []ACL{{perms, "auth", ""}}
}
// WorldACL produces an ACL list containing a single ACL which uses the
// provided permissions, with the scheme "world", and ID "anyone", which
// is used by ZooKeeper to represent any user at all.
func WorldACL(perms int32) []ACL {
return []ACL{{perms, "world", "anyone"}}
}
func DigestACL(perms int32, user, password string) []ACL {
userPass := []byte(fmt.Sprintf("%s:%s", user, password))
h := sha1.New()
if n, err := h.Write(userPass); err != nil || n != len(userPass) {
panic("SHA1 failed")
}
digest := base64.StdEncoding.EncodeToString(h.Sum(nil))
return []ACL{{perms, "digest", fmt.Sprintf("%s:%s", user, digest)}}
}
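// exampleACLs is a hedged sketch (not part of the original file); the Perm*
// constants are defined elsewhere in this package.
func exampleACLs() {
	world := WorldACL(PermAll)                      // anyone, all permissions
	auth := AuthACL(PermRead | PermWrite)           // any authenticated user
	digest := DigestACL(PermAll, "alice", "secret") // "digest" scheme: user:base64(sha1(user:pass))
	_, _, _ = world, auth, digest
}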
// FormatServers takes a slice of addresses and ensures each is in
// <addr>:<port> form; any address without a port gets the DefaultPort
// constant appended.
func FormatServers(servers []string) []string {
for i := range servers {
if !strings.Contains(servers[i], ":") {
servers[i] = servers[i] + ":" + strconv.Itoa(DefaultPort)
}
}
return servers
}
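// exampleFormatServers is a sketch (not part of the original file); it
// assumes DefaultPort, defined elsewhere in this package, is the standard
// client port 2181.
func exampleFormatServers() {
	servers := FormatServers([]string{"zk1.example.com", "zk2.example.com:2182"})
	_ = servers // []string{"zk1.example.com:2181", "zk2.example.com:2182"}
}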
// stringShuffle performs a Fisher-Yates shuffle on a slice of strings
func stringShuffle(s []string) {
for i := len(s) - 1; i > 0; i-- {
j := rand.Intn(i + 1)
s[i], s[j] = s[j], s[i]
}
}