Merge pull request #607 from mavenugo/libkv-upd

Libkv update with boltdb concurrent connection fix
This commit is contained in:
Jana Radhakrishnan 2015-10-08 15:40:24 -07:00
commit 305ab4a6ad
20 changed files with 479 additions and 106 deletions

View file

@ -81,8 +81,8 @@
},
{
"ImportPath": "github.com/docker/docker/pkg/parsers",
"Comment": "v1.4.1-4106-g637023a",
"Rev": "637023a5f8d8347a0e271c09d5c9bc84fbc97693"
"Comment": "v1.4.1-4106-g637023a",
"Rev": "637023a5f8d8347a0e271c09d5c9bc84fbc97693"
},
{
"ImportPath": "github.com/docker/docker/pkg/plugins",
@ -146,7 +146,7 @@
},
{
"ImportPath": "github.com/docker/libkv",
"Rev": "ea7ff6ae76485ab93ac36799d3e13b1905787ffe"
"Rev": "9d9710dcf6419a58455d0a80e0a8a21ca5f21a94"
},
{
"ImportPath": "github.com/godbus/dbus",

View file

@ -68,9 +68,16 @@ func main() {
You can find other usage examples for `libkv` under the `docker/swarm` or `docker/libnetwork` repositories.
## Supported versions
`libkv` supports:
- Consul version >= `0.5.1` because it uses Sessions with `Delete` behavior for the use of `TTLs` (mimics zookeeper's Ephemeral node support), If you don't plan to use `TTLs`: you can use Consul version `0.4.0+`.
- Etcd version >= `2.0` because it uses the `2.0.0` branch of the `coreos/go-etcd` client, this might change in the future as the support for `APIv3` comes along.
- Zookeeper version >= `3.4.5`. Although this might work with previous version but this remains untested as of now.
## TLS
The etcd backend supports etcd servers that require TLS Client Authentication. Zookeeper and Consul support are planned. This feature is somewhat experimental and the store.ClientTLSConfig struct may change to accommodate the additional backends.
The etcd backend supports etcd servers that require TLS Client Authentication. Zookeeper and Consul support are planned. This feature is somewhat experimental and the store.ClientTLSConfig struct may change to accommodate the additional backends.
## Warning

0
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/coverage generated vendored Executable file → Normal file
View file

0
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_consul.sh generated vendored Executable file → Normal file
View file

0
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_etcd.sh generated vendored Executable file → Normal file
View file

0
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_zk.sh generated vendored Executable file → Normal file
View file

0
libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/validate-gofmt generated vendored Executable file → Normal file
View file

View file

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/boltdb/bolt"
"github.com/docker/libkv"
@ -25,15 +26,28 @@ var (
ErrBoltAPIUnsupported = errors.New("API not supported by BoltDB backend")
)
const (
filePerm os.FileMode = 0644
)
//BoltDB type implements the Store interface
type BoltDB struct {
client *bolt.DB
boltBucket []byte
dbIndex uint64
path string
timeout time.Duration
// By default libkv opens and closes the bolt DB connection for every
// get/put operation. This allows multiple apps to use a Bolt DB at the
// same time.
// PersistConnection flag provides an option to override ths behavior.
// ie: open the connection in New and use it till Close is called.
PersistConnection bool
}
const (
libkvmetadatalen = 8
transientTimeout = time.Duration(10) * time.Second
)
// Register registers boltdb to libkv
@ -43,6 +57,12 @@ func Register() {
// New opens a new BoltDB connection to the specified path and bucket
func New(endpoints []string, options *store.Config) (store.Store, error) {
var (
db *bolt.DB
err error
boltOptions *bolt.Options
)
if len(endpoints) > 1 {
return nil, ErrMultipleEndpointsUnsupported
}
@ -52,39 +72,79 @@ func New(endpoints []string, options *store.Config) (store.Store, error) {
}
dir, _ := filepath.Split(endpoints[0])
if err := os.MkdirAll(dir, 0750); err != nil {
if err = os.MkdirAll(dir, 0750); err != nil {
return nil, err
}
var boltOptions *bolt.Options
if options != nil {
if options.PersistConnection {
boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
}
db, err := bolt.Open(endpoints[0], 0644, boltOptions)
if err != nil {
return nil, err
db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
if err != nil {
return nil, err
}
}
b := &BoltDB{}
b := &BoltDB{
client: db,
path: endpoints[0],
boltBucket: []byte(options.Bucket),
timeout: transientTimeout,
PersistConnection: options.PersistConnection,
}
b.client = db
b.boltBucket = []byte(options.Bucket)
return b, nil
}
func (b *BoltDB) reset() {
b.path = ""
b.boltBucket = []byte{}
}
func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
var (
db *bolt.DB
err error
)
if !b.PersistConnection {
boltOptions := &bolt.Options{Timeout: b.timeout}
if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil {
return nil, err
}
b.client = db
}
return b.client, nil
}
func (b *BoltDB) releaseDBhandle() {
if !b.PersistConnection {
b.client.Close()
}
}
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
// by a atomic counter maintained by the libkv and appened to the value passed by the client.
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
var val []byte
var (
val []byte
db *bolt.DB
err error
)
db := b.client
err := db.View(func(tx *bolt.Tx) error {
if db, err = b.getDBhandle(); err != nil {
return nil, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
val = bucket.Get([]byte(key))
v := bucket.Get([]byte(key))
val = make([]byte, len(v))
copy(val, v)
return nil
})
@ -104,11 +164,19 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) {
//Put the key, value pair. index number metadata is prepended to the value
func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
var dbIndex uint64
db := b.client
var (
dbIndex uint64
db *bolt.DB
err error
)
dbval := make([]byte, libkvmetadatalen)
err := db.Update(func(tx *bolt.Tx) error {
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
if err != nil {
return err
@ -129,12 +197,19 @@ func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
//Delete the value for the given key.
func (b *BoltDB) Delete(key string) error {
db := b.client
var (
db *bolt.DB
err error
)
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err := db.Update(func(tx *bolt.Tx) error {
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
err := bucket.Delete([]byte(key))
return err
@ -144,13 +219,21 @@ func (b *BoltDB) Delete(key string) error {
// Exists checks if the key exists inside the store
func (b *BoltDB) Exists(key string) (bool, error) {
var val []byte
var (
val []byte
db *bolt.DB
err error
)
db := b.client
err := db.View(func(tx *bolt.Tx) error {
if db, err = b.getDBhandle(); err != nil {
return false, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
val = bucket.Get([]byte(key))
@ -166,22 +249,32 @@ func (b *BoltDB) Exists(key string) (bool, error) {
// List returns the range of keys starting with the passed in prefix
func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
var (
db *bolt.DB
err error
)
kv := []*store.KVPair{}
db := b.client
err := db.View(func(tx *bolt.Tx) error {
if db, err = b.getDBhandle(); err != nil {
return nil, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
cursor := bucket.Cursor()
prefix := []byte(keyPrefix)
for key, val := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, val = cursor.Next() {
for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
val = val[libkvmetadatalen:]
dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
v = v[libkvmetadatalen:]
val := make([]byte, len(v))
copy(val, v)
kv = append(kv, &store.KVPair{
Key: string(key),
@ -201,22 +294,28 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
// has not been modified in the meantime, throws an
// error if this is the case
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
var val []byte
var dbIndex uint64
var (
val []byte
db *bolt.DB
err error
)
if previous == nil {
return false, store.ErrPreviousNotSpecified
}
db := b.client
if db, err = b.getDBhandle(); err != nil {
return false, err
}
defer b.releaseDBhandle()
err := db.Update(func(tx *bolt.Tx) error {
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return ErrBoltBucketNotFound
}
val = bucket.Get([]byte(key))
dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
if dbIndex != previous.LastIndex {
return store.ErrKeyModified
}
@ -232,13 +331,20 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error)
// AtomicPut puts a value at "key" if the key has not been
// modified since the last Put, throws an error if this is the case
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
var val []byte
var dbIndex uint64
var (
val []byte
dbIndex uint64
db *bolt.DB
err error
)
dbval := make([]byte, libkvmetadatalen)
db := b.client
if db, err = b.getDBhandle(); err != nil {
return false, nil, err
}
defer b.releaseDBhandle()
err := db.Update(func(tx *bolt.Tx) error {
err = db.Update(func(tx *bolt.Tx) error {
var err error
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
@ -285,18 +391,29 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt
// Close the db connection to the BoltDB
func (b *BoltDB) Close() {
db := b.client
db.Close()
if !b.PersistConnection {
b.reset()
} else {
b.client.Close()
}
return
}
// DeleteTree deletes a range of keys with a given prefix
func (b *BoltDB) DeleteTree(keyPrefix string) error {
db := b.client
err := db.Update(func(tx *bolt.Tx) error {
var (
db *bolt.DB
err error
)
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return (ErrBoltBucketNotFound)
return ErrBoltBucketNotFound
}
cursor := bucket.Cursor()

View file

@ -39,11 +39,16 @@ func TestRegister(t *testing.T) {
_ = os.Remove("/tmp/not_exist_dir/__boltdbtest")
}
func TestTimeout(t *testing.T) {
// TestMultiplePersistConnection tests the second connection to a
// BoltDB fails when one is already open with PersistConnection flag
func TestMultiplePersistConnection(t *testing.T) {
kv, err := libkv.NewStore(
store.BOLTDB,
[]string{"/tmp/not_exist_dir/__boltdbtest"},
&store.Config{Bucket: "boltDBTest", ConnectionTimeout: 1 * time.Second},
&store.Config{
Bucket: "boltDBTest",
ConnectionTimeout: 1 * time.Second,
PersistConnection: true},
)
assert.NoError(t, err)
assert.NotNil(t, kv)
@ -56,13 +61,79 @@ func TestTimeout(t *testing.T) {
kv, err = libkv.NewStore(
store.BOLTDB,
[]string{"/tmp/not_exist_dir/__boltdbtest"},
&store.Config{Bucket: "boltDBTest", ConnectionTimeout: 1 * time.Second},
&store.Config{
Bucket: "boltDBTest",
ConnectionTimeout: 1 * time.Second,
PersistConnection: true},
)
assert.Error(t, err)
_ = os.Remove("/tmp/not_exist_dir/__boltdbtest")
}
// TestConcurrentConnection tests simultaenous get/put using
// two handles.
func TestConcurrentConnection(t *testing.T) {
var err error
kv1, err1 := libkv.NewStore(
store.BOLTDB,
[]string{"/tmp/__boltdbtest"},
&store.Config{
Bucket: "boltDBTest",
ConnectionTimeout: 1 * time.Second},
)
assert.NoError(t, err1)
assert.NotNil(t, kv1)
kv2, err2 := libkv.NewStore(
store.BOLTDB,
[]string{"/tmp/__boltdbtest"},
&store.Config{Bucket: "boltDBTest",
ConnectionTimeout: 1 * time.Second},
)
assert.NoError(t, err2)
assert.NotNil(t, kv2)
key1 := "TestKV1"
value1 := []byte("TestVal1")
err = kv1.Put(key1, value1, nil)
assert.NoError(t, err)
key2 := "TestKV2"
value2 := []byte("TestVal2")
err = kv2.Put(key2, value2, nil)
assert.NoError(t, err)
pair1, err1 := kv1.Get(key1)
assert.NoError(t, err)
if assert.NotNil(t, pair1) {
assert.NotNil(t, pair1.Value)
}
assert.Equal(t, pair1.Value, value1)
pair2, err2 := kv2.Get(key2)
assert.NoError(t, err)
if assert.NotNil(t, pair2) {
assert.NotNil(t, pair2.Value)
}
assert.Equal(t, pair2.Value, value2)
// AtomicPut using kv1 and kv2 should succeed
_, _, err = kv1.AtomicPut(key1, []byte("TestnewVal1"), pair1, nil)
assert.NoError(t, err)
_, _, err = kv2.AtomicPut(key2, []byte("TestnewVal2"), pair2, nil)
assert.NoError(t, err)
testutils.RunTestCommon(t, kv1)
testutils.RunTestCommon(t, kv2)
kv1.Close()
kv2.Close()
_ = os.Remove("/tmp/__boltdbtest")
}
func TestBoldDBStore(t *testing.T) {
kv := makeBoltDBClient(t)

View file

@ -35,7 +35,8 @@ type Consul struct {
}
type consulLock struct {
lock *api.Lock
lock *api.Lock
renewCh chan struct{}
}
// Register registers consul to libkv
@ -87,7 +88,7 @@ func (s *Consul) setTLS(tls *tls.Config) {
s.config.Scheme = "https"
}
// SetTimeout sets the timout for connecting to Consul
// SetTimeout sets the timeout for connecting to Consul
func (s *Consul) setTimeout(time time.Duration) {
s.config.WaitTime = time
}
@ -360,32 +361,63 @@ func (s *Consul) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*
// NewLock returns a handle to a lock struct which can
// be used to provide mutual exclusion on a key
func (s *Consul) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
consulOpts := &api.LockOptions{
lockOpts := &api.LockOptions{
Key: s.normalize(key),
}
lock := &consulLock{}
if options != nil {
consulOpts.Value = options.Value
// Set optional TTL on Lock
if options.TTL != 0 {
entry := &api.SessionEntry{
Behavior: api.SessionBehaviorRelease, // Release the lock when the session expires
TTL: (options.TTL / 2).String(), // Consul multiplies the TTL by 2x
LockDelay: 1 * time.Millisecond, // Virtually disable lock delay
}
// Create the key session
session, _, err := s.client.Session().Create(entry, nil)
if err != nil {
return nil, err
}
// Place the session on lock
lockOpts.Session = session
// Renew the session ttl lock periodically
go s.client.Session().RenewPeriodic(entry.TTL, session, nil, options.RenewLock)
lock.renewCh = options.RenewLock
}
// Set optional value on Lock
if options.Value != nil {
lockOpts.Value = options.Value
}
}
l, err := s.client.LockOpts(consulOpts)
l, err := s.client.LockOpts(lockOpts)
if err != nil {
return nil, err
}
return &consulLock{lock: l}, nil
lock.lock = l
return lock, nil
}
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *consulLock) Lock() (<-chan struct{}, error) {
return l.lock.Lock(nil)
func (l *consulLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
return l.lock.Lock(stopChan)
}
// Unlock the "key". Calling unlock while
// not holding the lock will throw an error
func (l *consulLock) Unlock() error {
if l.renewCh != nil {
close(l.renewCh)
}
return l.lock.Unlock()
}

View file

@ -44,13 +44,15 @@ func TestRegister(t *testing.T) {
func TestConsulStore(t *testing.T) {
kv := makeConsulClient(t)
backup := makeConsulClient(t)
lockKV := makeConsulClient(t)
ttlKV := makeConsulClient(t)
testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestTTL(t, kv, backup)
testutils.RunTestLockTTL(t, kv, lockKV)
testutils.RunTestTTL(t, kv, ttlKV)
testutils.RunCleanup(t, kv)
}

View file

@ -2,6 +2,7 @@ package etcd
import (
"crypto/tls"
"errors"
"net"
"net/http"
"strings"
@ -12,6 +13,13 @@ import (
"github.com/docker/libkv/store"
)
var (
// ErrAbortTryLock is thrown when a user stops trying to seek the lock
// by sending a signal to the stop chan, this is used to verify if the
// operation succeeded
ErrAbortTryLock = errors.New("lock operation aborted")
)
// Etcd is the receiver type for the
// Store interface
type Etcd struct {
@ -19,12 +27,13 @@ type Etcd struct {
}
type etcdLock struct {
client *etcd.Client
stopLock chan struct{}
key string
value string
last *etcd.Response
ttl uint64
client *etcd.Client
stopLock chan struct{}
stopRenew chan struct{}
key string
value string
last *etcd.Response
ttl uint64
}
const (
@ -395,6 +404,7 @@ func (s *Etcd) DeleteTree(directory string) error {
func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) {
var value string
ttl := uint64(time.Duration(defaultLockTTL).Seconds())
renewCh := make(chan struct{})
// Apply options on Lock
if options != nil {
@ -404,14 +414,18 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke
if options.TTL != 0 {
ttl = uint64(options.TTL.Seconds())
}
if options.RenewLock != nil {
renewCh = options.RenewLock
}
}
// Create lock object
lock = &etcdLock{
client: s.client,
key: key,
value: value,
ttl: ttl,
client: s.client,
stopRenew: renewCh,
key: key,
value: value,
ttl: ttl,
}
return lock, nil
@ -420,13 +434,13 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *etcdLock) Lock() (<-chan struct{}, error) {
func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
key := store.Normalize(l.key)
// Lock holder channels
// Lock holder channel
lockHeld := make(chan struct{})
stopLocking := make(chan struct{})
stopLocking := l.stopRenew
var lastIndex uint64
@ -454,7 +468,18 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) {
// Seeker section
chW := make(chan *etcd.Response)
chWStop := make(chan bool)
l.waitLock(key, chW, chWStop)
free := make(chan bool)
go l.waitLock(key, chW, chWStop, free)
// Wait for the key to be available or for
// a signal to stop trying to lock the key
select {
case _ = <-free:
break
case _ = <-stopChan:
return nil, ErrAbortTryLock
}
// Delete or Expire event occured
// Retry
@ -467,10 +492,10 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) {
// Hold the lock as long as we can
// Updates the key ttl periodically until we receive
// an explicit stop signal from the Unlock method
func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan struct{}) {
func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) {
defer close(lockHeld)
update := time.NewTicker(defaultUpdateTime)
update := time.NewTicker(time.Duration(l.ttl) * time.Second / 3)
defer update.Stop()
var err error
@ -490,11 +515,12 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan
}
// WaitLock simply waits for the key to be available for creation
func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool) {
func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool, free chan<- bool) {
go l.client.Watch(key, 0, false, eventCh, stopWatchCh)
for event := range eventCh {
if event.Action == "delete" || event.Action == "expire" {
return
free <- true
}
}
}

View file

@ -43,12 +43,14 @@ func TestRegister(t *testing.T) {
func TestEtcdStore(t *testing.T) {
kv := makeEtcdClient(t)
backup := makeEtcdClient(t)
lockKV := makeEtcdClient(t)
ttlKV := makeEtcdClient(t)
testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestTTL(t, kv, backup)
testutils.RunTestLockTTL(t, kv, lockKV)
testutils.RunTestTTL(t, kv, ttlKV)
testutils.RunCleanup(t, kv)
}

View file

@ -96,8 +96,8 @@ type Lock struct {
}
// Lock mock
func (l *Lock) Lock() (<-chan struct{}, error) {
args := l.Mock.Called()
func (l *Lock) Lock(stopCh chan struct{}) (<-chan struct{}, error) {
args := l.Mock.Called(stopCh)
return args.Get(0).(<-chan struct{}), args.Error(1)
}

View file

@ -43,6 +43,7 @@ type Config struct {
TLS *tls.Config
ConnectionTimeout time.Duration
Bucket string
PersistConnection bool
}
// ClientTLSConfig contains data for a Client TLS configuration in the form
@ -113,13 +114,14 @@ type WriteOptions struct {
// LockOptions contains optional request parameters
type LockOptions struct {
Value []byte // Optional, value to associate with the lock
TTL time.Duration // Optional, expiration ttl associated with the lock
Value []byte // Optional, value to associate with the lock
TTL time.Duration // Optional, expiration ttl associated with the lock
RenewLock chan struct{} // Optional, chan used to control and stop the session ttl renewal for the lock
}
// Locker provides locking mechanism on top of the store.
// Similar to `sync.Lock` except it may return errors.
type Locker interface {
Lock() (<-chan struct{}, error)
Lock(stopChan chan struct{}) (<-chan struct{}, error)
Unlock() error
}

View file

@ -10,6 +10,9 @@ import (
)
const (
// SOH control character
SOH = "\x01"
defaultTimeout = 10 * time.Second
)
@ -72,6 +75,12 @@ func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) {
return nil, err
}
// FIXME handle very rare cases where Get returns the
// SOH control character instead of the actual value
if string(resp) == SOH {
return s.Get(store.Normalize(key))
}
pair = &store.KVPair{
Key: key,
Value: resp,
@ -371,7 +380,7 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store.
// Lock attempts to acquire the lock and blocks while
// doing so. It returns a channel that is closed if our
// lock is lost or if an error occurs
func (l *zookeeperLock) Lock() (<-chan struct{}, error) {
func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) {
err := l.lock.Lock()
if err == nil {

View file

@ -43,12 +43,12 @@ func TestRegister(t *testing.T) {
func TestZkStore(t *testing.T) {
kv := makeZkClient(t)
backup := makeZkClient(t)
ttlKV := makeZkClient(t)
testutils.RunTestCommon(t, kv)
testutils.RunTestAtomic(t, kv)
testutils.RunTestWatch(t, kv)
testutils.RunTestLock(t, kv)
testutils.RunTestTTL(t, kv, backup)
testutils.RunTestTTL(t, kv, ttlKV)
testutils.RunCleanup(t, kv)
}

View file

@ -38,6 +38,12 @@ func RunTestLock(t *testing.T, kv store.Store) {
testLockUnlock(t, kv)
}
// RunTestLockTTL tests the KV pair Lock with TTL APIs supported
// by the K/V backends.
func RunTestLockTTL(t *testing.T, kv store.Store, backup store.Store) {
testLockTTL(t, kv, backup)
}
// RunTestTTL tests the TTL funtionality of the K/V backend.
func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) {
testPutTTL(t, kv, backup)
@ -307,7 +313,7 @@ func testLockUnlock(t *testing.T, kv store.Store) {
assert.NotNil(t, lock)
// Lock should successfully succeed or block
lockChan, err := lock.Lock()
lockChan, err := lock.Lock(nil)
assert.NoError(t, err)
assert.NotNil(t, lockChan)
@ -325,7 +331,7 @@ func testLockUnlock(t *testing.T, kv store.Store) {
assert.NoError(t, err)
// Lock should succeed again
lockChan, err = lock.Lock()
lockChan, err = lock.Lock(nil)
assert.NoError(t, err)
assert.NotNil(t, lockChan)
@ -337,6 +343,110 @@ func testLockUnlock(t *testing.T, kv store.Store) {
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
err = lock.Unlock()
assert.NoError(t, err)
}
func testLockTTL(t *testing.T, kv store.Store, otherConn store.Store) {
key := "testLockTTL"
value := []byte("bar")
renewCh := make(chan struct{})
// We should be able to create a new lock on key
lock, err := otherConn.NewLock(key, &store.LockOptions{
Value: value,
TTL: 2 * time.Second,
RenewLock: renewCh,
})
assert.NoError(t, err)
assert.NotNil(t, lock)
// Lock should successfully succeed
lockChan, err := lock.Lock(nil)
assert.NoError(t, err)
assert.NotNil(t, lockChan)
// Get should work
pair, err := otherConn.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
time.Sleep(3 * time.Second)
done := make(chan struct{})
stop := make(chan struct{})
value = []byte("foobar")
// Create a new lock with another connection
lock, err = kv.NewLock(
key,
&store.LockOptions{
Value: value,
TTL: 3 * time.Second,
},
)
assert.NoError(t, err)
assert.NotNil(t, lock)
// Lock should block, the session on the lock
// is still active and renewed periodically
go func(<-chan struct{}) {
_, _ = lock.Lock(stop)
done <- struct{}{}
}(done)
select {
case _ = <-done:
t.Fatal("Lock succeeded on a key that is supposed to be locked by another client")
case <-time.After(4 * time.Second):
// Stop requesting the lock as we are blocked as expected
stop <- struct{}{}
break
}
// Close the connection
otherConn.Close()
// Force stop the session renewal for the lock
close(renewCh)
// Let the session on the lock expire
time.Sleep(3 * time.Second)
locked := make(chan struct{})
// Lock should now succeed for the other client
go func(<-chan struct{}) {
lockChan, err = lock.Lock(nil)
assert.NoError(t, err)
assert.NotNil(t, lockChan)
locked <- struct{}{}
}(locked)
select {
case _ = <-locked:
break
case <-time.After(4 * time.Second):
t.Fatal("Unable to take the lock, timed out")
}
// Get should work with the new value
pair, err = kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
err = lock.Unlock()
assert.NoError(t, err)
}
func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) {
@ -482,6 +592,7 @@ func RunCleanup(t *testing.T, kv store.Store) {
"testAtomicPutCreate",
"testAtomicDelete",
"testLockUnlock",
"testLockTTL",
"testPutTTL",
"testList",
"testDeleteTree",

View file

@ -6,7 +6,6 @@ import (
"reflect"
"strings"
"sync"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
@ -141,8 +140,7 @@ func makeDefaultScopes() map[string]*ScopeCfg {
Provider: "boltdb",
Address: defaultPrefix + "/boltdb.db",
Config: &store.Config{
Bucket: "libnetwork",
ConnectionTimeout: 3 * time.Second,
Bucket: "libnetwork",
},
},
}

View file

@ -5,7 +5,6 @@ import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/docker/libkv/store"
"github.com/docker/libnetwork/config"
@ -36,7 +35,7 @@ func TestBoltdbBackend(t *testing.T) {
defer os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address)
testLocalBackend(t, "", "", nil)
defer os.Remove("/tmp/boltdb.db")
config := &store.Config{Bucket: "testBackend", ConnectionTimeout: 3 * time.Second}
config := &store.Config{Bucket: "testBackend"}
testLocalBackend(t, "boltdb", "/tmp/boltdb.db", config)
}
@ -130,12 +129,12 @@ func OptionBoltdbWithRandomDBFile() ([]config.Option, error) {
cfgOptions := []config.Option{}
cfgOptions = append(cfgOptions, config.OptionLocalKVProvider("boltdb"))
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(tmp.Name()))
sCfg := &store.Config{Bucket: "testBackend", ConnectionTimeout: 3 * time.Second}
sCfg := &store.Config{Bucket: "testBackend"}
cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(sCfg))
return cfgOptions, nil
}
func TestLocalStoreLockTimeout(t *testing.T) {
func TestMultipleControllersWithSameStore(t *testing.T) {
cfgOptions, err := OptionBoltdbWithRandomDBFile()
if err != nil {
t.Fatalf("Error getting random boltdb configs %v", err)
@ -146,11 +145,8 @@ func TestLocalStoreLockTimeout(t *testing.T) {
}
defer ctrl1.Stop()
// Use the same boltdb file without closing the previous controller
// with a slightly altered configuration
sCfg := &store.Config{Bucket: "testBackend", ConnectionTimeout: 1 * time.Second}
_, err = New(append(cfgOptions[:len(cfgOptions)-1],
config.OptionLocalKVProviderConfig(sCfg))...)
if err == nil {
t.Fatalf("Expected to fail but succeeded")
_, err = New(cfgOptions...)
if err != nil {
t.Fatalf("Local store must support concurrent controllers")
}
}