diff --git a/libnetwork/Godeps/Godeps.json b/libnetwork/Godeps/Godeps.json index b37fc5b6cb..db6d99bd97 100644 --- a/libnetwork/Godeps/Godeps.json +++ b/libnetwork/Godeps/Godeps.json @@ -81,8 +81,8 @@ }, { "ImportPath": "github.com/docker/docker/pkg/parsers", - "Comment": "v1.4.1-4106-g637023a", - "Rev": "637023a5f8d8347a0e271c09d5c9bc84fbc97693" + "Comment": "v1.4.1-4106-g637023a", + "Rev": "637023a5f8d8347a0e271c09d5c9bc84fbc97693" }, { "ImportPath": "github.com/docker/docker/pkg/plugins", @@ -146,7 +146,7 @@ }, { "ImportPath": "github.com/docker/libkv", - "Rev": "ea7ff6ae76485ab93ac36799d3e13b1905787ffe" + "Rev": "9d9710dcf6419a58455d0a80e0a8a21ca5f21a94" }, { "ImportPath": "github.com/godbus/dbus", diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md index b433e536be..dc4286e2b9 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md @@ -68,9 +68,16 @@ func main() { You can find other usage examples for `libkv` under the `docker/swarm` or `docker/libnetwork` repositories. +## Supported versions + +`libkv` supports: +- Consul version >= `0.5.1` because it uses Sessions with `Delete` behavior for the use of `TTLs` (mimics zookeeper's Ephemeral node support), If you don't plan to use `TTLs`: you can use Consul version `0.4.0+`. +- Etcd version >= `2.0` because it uses the `2.0.0` branch of the `coreos/go-etcd` client, this might change in the future as the support for `APIv3` comes along. +- Zookeeper version >= `3.4.5`. Although this might work with previous version but this remains untested as of now. + ## TLS -The etcd backend supports etcd servers that require TLS Client Authentication. Zookeeper and Consul support are planned. This feature is somewhat experimental and the store.ClientTLSConfig struct may change to accommodate the additional backends. +The etcd backend supports etcd servers that require TLS Client Authentication. Zookeeper and Consul support are planned. This feature is somewhat experimental and the store.ClientTLSConfig struct may change to accommodate the additional backends. ## Warning diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/coverage b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/coverage old mode 100755 new mode 100644 diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_consul.sh b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_consul.sh old mode 100755 new mode 100644 diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_etcd.sh b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_etcd.sh old mode 100755 new mode 100644 diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_zk.sh b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_zk.sh old mode 100755 new mode 100644 diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/validate-gofmt b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/validate-gofmt old mode 100755 new mode 100644 diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/boltdb/boltdb.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/boltdb/boltdb.go index a2f8e1f87d..aa3cc71370 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/boltdb/boltdb.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/boltdb/boltdb.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "sync/atomic" + "time" "github.com/boltdb/bolt" "github.com/docker/libkv" @@ -25,15 +26,28 @@ var ( ErrBoltAPIUnsupported = errors.New("API not supported by BoltDB backend") ) +const ( + filePerm os.FileMode = 0644 +) + //BoltDB type implements the Store interface type BoltDB struct { client *bolt.DB boltBucket []byte dbIndex uint64 + path string + timeout time.Duration + // By default libkv opens and closes the bolt DB connection for every + // get/put operation. This allows multiple apps to use a Bolt DB at the + // same time. + // PersistConnection flag provides an option to override ths behavior. + // ie: open the connection in New and use it till Close is called. + PersistConnection bool } const ( libkvmetadatalen = 8 + transientTimeout = time.Duration(10) * time.Second ) // Register registers boltdb to libkv @@ -43,6 +57,12 @@ func Register() { // New opens a new BoltDB connection to the specified path and bucket func New(endpoints []string, options *store.Config) (store.Store, error) { + var ( + db *bolt.DB + err error + boltOptions *bolt.Options + ) + if len(endpoints) > 1 { return nil, ErrMultipleEndpointsUnsupported } @@ -52,39 +72,79 @@ func New(endpoints []string, options *store.Config) (store.Store, error) { } dir, _ := filepath.Split(endpoints[0]) - if err := os.MkdirAll(dir, 0750); err != nil { + if err = os.MkdirAll(dir, 0750); err != nil { return nil, err } - var boltOptions *bolt.Options - if options != nil { + if options.PersistConnection { boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout} - } - db, err := bolt.Open(endpoints[0], 0644, boltOptions) - if err != nil { - return nil, err + db, err = bolt.Open(endpoints[0], filePerm, boltOptions) + if err != nil { + return nil, err + } } - b := &BoltDB{} + b := &BoltDB{ + client: db, + path: endpoints[0], + boltBucket: []byte(options.Bucket), + timeout: transientTimeout, + PersistConnection: options.PersistConnection, + } - b.client = db - b.boltBucket = []byte(options.Bucket) return b, nil } +func (b *BoltDB) reset() { + b.path = "" + b.boltBucket = []byte{} +} + +func (b *BoltDB) getDBhandle() (*bolt.DB, error) { + var ( + db *bolt.DB + err error + ) + if !b.PersistConnection { + boltOptions := &bolt.Options{Timeout: b.timeout} + if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil { + return nil, err + } + b.client = db + } + + return b.client, nil +} + +func (b *BoltDB) releaseDBhandle() { + if !b.PersistConnection { + b.client.Close() + } +} + // Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by // by a atomic counter maintained by the libkv and appened to the value passed by the client. func (b *BoltDB) Get(key string) (*store.KVPair, error) { - var val []byte + var ( + val []byte + db *bolt.DB + err error + ) - db := b.client - err := db.View(func(tx *bolt.Tx) error { + if db, err = b.getDBhandle(); err != nil { + return nil, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } - val = bucket.Get([]byte(key)) + v := bucket.Get([]byte(key)) + val = make([]byte, len(v)) + copy(val, v) return nil }) @@ -104,11 +164,19 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) { //Put the key, value pair. index number metadata is prepended to the value func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error { - var dbIndex uint64 - db := b.client + var ( + dbIndex uint64 + db *bolt.DB + err error + ) dbval := make([]byte, libkvmetadatalen) - err := db.Update(func(tx *bolt.Tx) error { + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { bucket, err := tx.CreateBucketIfNotExists(b.boltBucket) if err != nil { return err @@ -129,12 +197,19 @@ func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error { //Delete the value for the given key. func (b *BoltDB) Delete(key string) error { - db := b.client + var ( + db *bolt.DB + err error + ) + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() - err := db.Update(func(tx *bolt.Tx) error { + err = db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } err := bucket.Delete([]byte(key)) return err @@ -144,13 +219,21 @@ func (b *BoltDB) Delete(key string) error { // Exists checks if the key exists inside the store func (b *BoltDB) Exists(key string) (bool, error) { - var val []byte + var ( + val []byte + db *bolt.DB + err error + ) - db := b.client - err := db.View(func(tx *bolt.Tx) error { + if db, err = b.getDBhandle(); err != nil { + return false, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } val = bucket.Get([]byte(key)) @@ -166,22 +249,32 @@ func (b *BoltDB) Exists(key string) (bool, error) { // List returns the range of keys starting with the passed in prefix func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { + var ( + db *bolt.DB + err error + ) kv := []*store.KVPair{} - db := b.client - err := db.View(func(tx *bolt.Tx) error { + if db, err = b.getDBhandle(); err != nil { + return nil, err + } + defer b.releaseDBhandle() + + err = db.View(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } cursor := bucket.Cursor() prefix := []byte(keyPrefix) - for key, val := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, val = cursor.Next() { + for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() { - dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) - val = val[libkvmetadatalen:] + dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen]) + v = v[libkvmetadatalen:] + val := make([]byte, len(v)) + copy(val, v) kv = append(kv, &store.KVPair{ Key: string(key), @@ -201,22 +294,28 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) { // has not been modified in the meantime, throws an // error if this is the case func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) { - var val []byte - var dbIndex uint64 + var ( + val []byte + db *bolt.DB + err error + ) if previous == nil { return false, store.ErrPreviousNotSpecified } - db := b.client + if db, err = b.getDBhandle(); err != nil { + return false, err + } + defer b.releaseDBhandle() - err := db.Update(func(tx *bolt.Tx) error { + err = db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { return ErrBoltBucketNotFound } val = bucket.Get([]byte(key)) - dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen]) + dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen]) if dbIndex != previous.LastIndex { return store.ErrKeyModified } @@ -232,13 +331,20 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) // AtomicPut puts a value at "key" if the key has not been // modified since the last Put, throws an error if this is the case func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { - var val []byte - var dbIndex uint64 + var ( + val []byte + dbIndex uint64 + db *bolt.DB + err error + ) dbval := make([]byte, libkvmetadatalen) - db := b.client + if db, err = b.getDBhandle(); err != nil { + return false, nil, err + } + defer b.releaseDBhandle() - err := db.Update(func(tx *bolt.Tx) error { + err = db.Update(func(tx *bolt.Tx) error { var err error bucket := tx.Bucket(b.boltBucket) if bucket == nil { @@ -285,18 +391,29 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, opt // Close the db connection to the BoltDB func (b *BoltDB) Close() { - db := b.client - - db.Close() + if !b.PersistConnection { + b.reset() + } else { + b.client.Close() + } + return } // DeleteTree deletes a range of keys with a given prefix func (b *BoltDB) DeleteTree(keyPrefix string) error { - db := b.client - err := db.Update(func(tx *bolt.Tx) error { + var ( + db *bolt.DB + err error + ) + if db, err = b.getDBhandle(); err != nil { + return err + } + defer b.releaseDBhandle() + + err = db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(b.boltBucket) if bucket == nil { - return (ErrBoltBucketNotFound) + return ErrBoltBucketNotFound } cursor := bucket.Cursor() diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/boltdb/boltdb_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/boltdb/boltdb_test.go index d277f44310..3eb4e84b21 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/boltdb/boltdb_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/boltdb/boltdb_test.go @@ -39,11 +39,16 @@ func TestRegister(t *testing.T) { _ = os.Remove("/tmp/not_exist_dir/__boltdbtest") } -func TestTimeout(t *testing.T) { +// TestMultiplePersistConnection tests the second connection to a +// BoltDB fails when one is already open with PersistConnection flag +func TestMultiplePersistConnection(t *testing.T) { kv, err := libkv.NewStore( store.BOLTDB, []string{"/tmp/not_exist_dir/__boltdbtest"}, - &store.Config{Bucket: "boltDBTest", ConnectionTimeout: 1 * time.Second}, + &store.Config{ + Bucket: "boltDBTest", + ConnectionTimeout: 1 * time.Second, + PersistConnection: true}, ) assert.NoError(t, err) assert.NotNil(t, kv) @@ -56,13 +61,79 @@ func TestTimeout(t *testing.T) { kv, err = libkv.NewStore( store.BOLTDB, []string{"/tmp/not_exist_dir/__boltdbtest"}, - &store.Config{Bucket: "boltDBTest", ConnectionTimeout: 1 * time.Second}, + &store.Config{ + Bucket: "boltDBTest", + ConnectionTimeout: 1 * time.Second, + PersistConnection: true}, ) assert.Error(t, err) _ = os.Remove("/tmp/not_exist_dir/__boltdbtest") } +// TestConcurrentConnection tests simultaenous get/put using +// two handles. +func TestConcurrentConnection(t *testing.T) { + var err error + kv1, err1 := libkv.NewStore( + store.BOLTDB, + []string{"/tmp/__boltdbtest"}, + &store.Config{ + Bucket: "boltDBTest", + ConnectionTimeout: 1 * time.Second}, + ) + assert.NoError(t, err1) + assert.NotNil(t, kv1) + + kv2, err2 := libkv.NewStore( + store.BOLTDB, + []string{"/tmp/__boltdbtest"}, + &store.Config{Bucket: "boltDBTest", + ConnectionTimeout: 1 * time.Second}, + ) + assert.NoError(t, err2) + assert.NotNil(t, kv2) + + key1 := "TestKV1" + value1 := []byte("TestVal1") + err = kv1.Put(key1, value1, nil) + assert.NoError(t, err) + + key2 := "TestKV2" + value2 := []byte("TestVal2") + err = kv2.Put(key2, value2, nil) + assert.NoError(t, err) + + pair1, err1 := kv1.Get(key1) + assert.NoError(t, err) + if assert.NotNil(t, pair1) { + assert.NotNil(t, pair1.Value) + } + assert.Equal(t, pair1.Value, value1) + + pair2, err2 := kv2.Get(key2) + assert.NoError(t, err) + if assert.NotNil(t, pair2) { + assert.NotNil(t, pair2.Value) + } + assert.Equal(t, pair2.Value, value2) + + // AtomicPut using kv1 and kv2 should succeed + _, _, err = kv1.AtomicPut(key1, []byte("TestnewVal1"), pair1, nil) + assert.NoError(t, err) + + _, _, err = kv2.AtomicPut(key2, []byte("TestnewVal2"), pair2, nil) + assert.NoError(t, err) + + testutils.RunTestCommon(t, kv1) + testutils.RunTestCommon(t, kv2) + + kv1.Close() + kv2.Close() + + _ = os.Remove("/tmp/__boltdbtest") +} + func TestBoldDBStore(t *testing.T) { kv := makeBoltDBClient(t) diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go index 76762ce54a..1462d2dae3 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go @@ -35,7 +35,8 @@ type Consul struct { } type consulLock struct { - lock *api.Lock + lock *api.Lock + renewCh chan struct{} } // Register registers consul to libkv @@ -87,7 +88,7 @@ func (s *Consul) setTLS(tls *tls.Config) { s.config.Scheme = "https" } -// SetTimeout sets the timout for connecting to Consul +// SetTimeout sets the timeout for connecting to Consul func (s *Consul) setTimeout(time time.Duration) { s.config.WaitTime = time } @@ -360,32 +361,63 @@ func (s *Consul) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []* // NewLock returns a handle to a lock struct which can // be used to provide mutual exclusion on a key func (s *Consul) NewLock(key string, options *store.LockOptions) (store.Locker, error) { - consulOpts := &api.LockOptions{ + lockOpts := &api.LockOptions{ Key: s.normalize(key), } + lock := &consulLock{} + if options != nil { - consulOpts.Value = options.Value + // Set optional TTL on Lock + if options.TTL != 0 { + entry := &api.SessionEntry{ + Behavior: api.SessionBehaviorRelease, // Release the lock when the session expires + TTL: (options.TTL / 2).String(), // Consul multiplies the TTL by 2x + LockDelay: 1 * time.Millisecond, // Virtually disable lock delay + } + + // Create the key session + session, _, err := s.client.Session().Create(entry, nil) + if err != nil { + return nil, err + } + + // Place the session on lock + lockOpts.Session = session + + // Renew the session ttl lock periodically + go s.client.Session().RenewPeriodic(entry.TTL, session, nil, options.RenewLock) + lock.renewCh = options.RenewLock + } + + // Set optional value on Lock + if options.Value != nil { + lockOpts.Value = options.Value + } } - l, err := s.client.LockOpts(consulOpts) + l, err := s.client.LockOpts(lockOpts) if err != nil { return nil, err } - return &consulLock{lock: l}, nil + lock.lock = l + return lock, nil } // Lock attempts to acquire the lock and blocks while // doing so. It returns a channel that is closed if our // lock is lost or if an error occurs -func (l *consulLock) Lock() (<-chan struct{}, error) { - return l.lock.Lock(nil) +func (l *consulLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { + return l.lock.Lock(stopChan) } // Unlock the "key". Calling unlock while // not holding the lock will throw an error func (l *consulLock) Unlock() error { + if l.renewCh != nil { + close(l.renewCh) + } return l.lock.Unlock() } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go index a15bf9e91d..5019494ca7 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go @@ -44,13 +44,15 @@ func TestRegister(t *testing.T) { func TestConsulStore(t *testing.T) { kv := makeConsulClient(t) - backup := makeConsulClient(t) + lockKV := makeConsulClient(t) + ttlKV := makeConsulClient(t) testutils.RunTestCommon(t, kv) testutils.RunTestAtomic(t, kv) testutils.RunTestWatch(t, kv) testutils.RunTestLock(t, kv) - testutils.RunTestTTL(t, kv, backup) + testutils.RunTestLockTTL(t, kv, lockKV) + testutils.RunTestTTL(t, kv, ttlKV) testutils.RunCleanup(t, kv) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go index 233b3c671e..d27b13a758 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go @@ -2,6 +2,7 @@ package etcd import ( "crypto/tls" + "errors" "net" "net/http" "strings" @@ -12,6 +13,13 @@ import ( "github.com/docker/libkv/store" ) +var ( + // ErrAbortTryLock is thrown when a user stops trying to seek the lock + // by sending a signal to the stop chan, this is used to verify if the + // operation succeeded + ErrAbortTryLock = errors.New("lock operation aborted") +) + // Etcd is the receiver type for the // Store interface type Etcd struct { @@ -19,12 +27,13 @@ type Etcd struct { } type etcdLock struct { - client *etcd.Client - stopLock chan struct{} - key string - value string - last *etcd.Response - ttl uint64 + client *etcd.Client + stopLock chan struct{} + stopRenew chan struct{} + key string + value string + last *etcd.Response + ttl uint64 } const ( @@ -395,6 +404,7 @@ func (s *Etcd) DeleteTree(directory string) error { func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locker, err error) { var value string ttl := uint64(time.Duration(defaultLockTTL).Seconds()) + renewCh := make(chan struct{}) // Apply options on Lock if options != nil { @@ -404,14 +414,18 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke if options.TTL != 0 { ttl = uint64(options.TTL.Seconds()) } + if options.RenewLock != nil { + renewCh = options.RenewLock + } } // Create lock object lock = &etcdLock{ - client: s.client, - key: key, - value: value, - ttl: ttl, + client: s.client, + stopRenew: renewCh, + key: key, + value: value, + ttl: ttl, } return lock, nil @@ -420,13 +434,13 @@ func (s *Etcd) NewLock(key string, options *store.LockOptions) (lock store.Locke // Lock attempts to acquire the lock and blocks while // doing so. It returns a channel that is closed if our // lock is lost or if an error occurs -func (l *etcdLock) Lock() (<-chan struct{}, error) { +func (l *etcdLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { key := store.Normalize(l.key) - // Lock holder channels + // Lock holder channel lockHeld := make(chan struct{}) - stopLocking := make(chan struct{}) + stopLocking := l.stopRenew var lastIndex uint64 @@ -454,7 +468,18 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) { // Seeker section chW := make(chan *etcd.Response) chWStop := make(chan bool) - l.waitLock(key, chW, chWStop) + free := make(chan bool) + + go l.waitLock(key, chW, chWStop, free) + + // Wait for the key to be available or for + // a signal to stop trying to lock the key + select { + case _ = <-free: + break + case _ = <-stopChan: + return nil, ErrAbortTryLock + } // Delete or Expire event occured // Retry @@ -467,10 +492,10 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) { // Hold the lock as long as we can // Updates the key ttl periodically until we receive // an explicit stop signal from the Unlock method -func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan struct{}) { +func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking <-chan struct{}) { defer close(lockHeld) - update := time.NewTicker(defaultUpdateTime) + update := time.NewTicker(time.Duration(l.ttl) * time.Second / 3) defer update.Stop() var err error @@ -490,11 +515,12 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan } // WaitLock simply waits for the key to be available for creation -func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool) { +func (l *etcdLock) waitLock(key string, eventCh chan *etcd.Response, stopWatchCh chan bool, free chan<- bool) { go l.client.Watch(key, 0, false, eventCh, stopWatchCh) + for event := range eventCh { if event.Action == "delete" || event.Action == "expire" { - return + free <- true } } } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go index 3f79ce09f7..67daf3e54a 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go @@ -43,12 +43,14 @@ func TestRegister(t *testing.T) { func TestEtcdStore(t *testing.T) { kv := makeEtcdClient(t) - backup := makeEtcdClient(t) + lockKV := makeEtcdClient(t) + ttlKV := makeEtcdClient(t) testutils.RunTestCommon(t, kv) testutils.RunTestAtomic(t, kv) testutils.RunTestWatch(t, kv) testutils.RunTestLock(t, kv) - testutils.RunTestTTL(t, kv, backup) + testutils.RunTestLockTTL(t, kv, lockKV) + testutils.RunTestTTL(t, kv, ttlKV) testutils.RunCleanup(t, kv) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/mock/mock.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/mock/mock.go index 9f63e50ae0..82a5b03b47 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/mock/mock.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/mock/mock.go @@ -96,8 +96,8 @@ type Lock struct { } // Lock mock -func (l *Lock) Lock() (<-chan struct{}, error) { - args := l.Mock.Called() +func (l *Lock) Lock(stopCh chan struct{}) (<-chan struct{}, error) { + args := l.Mock.Called(stopCh) return args.Get(0).(<-chan struct{}), args.Error(1) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go index 108242bd5c..e7aace9b3d 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go @@ -43,6 +43,7 @@ type Config struct { TLS *tls.Config ConnectionTimeout time.Duration Bucket string + PersistConnection bool } // ClientTLSConfig contains data for a Client TLS configuration in the form @@ -113,13 +114,14 @@ type WriteOptions struct { // LockOptions contains optional request parameters type LockOptions struct { - Value []byte // Optional, value to associate with the lock - TTL time.Duration // Optional, expiration ttl associated with the lock + Value []byte // Optional, value to associate with the lock + TTL time.Duration // Optional, expiration ttl associated with the lock + RenewLock chan struct{} // Optional, chan used to control and stop the session ttl renewal for the lock } // Locker provides locking mechanism on top of the store. // Similar to `sync.Lock` except it may return errors. type Locker interface { - Lock() (<-chan struct{}, error) + Lock(stopChan chan struct{}) (<-chan struct{}, error) Unlock() error } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go index ff6b481947..858226d425 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go @@ -10,6 +10,9 @@ import ( ) const ( + // SOH control character + SOH = "\x01" + defaultTimeout = 10 * time.Second ) @@ -72,6 +75,12 @@ func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { return nil, err } + // FIXME handle very rare cases where Get returns the + // SOH control character instead of the actual value + if string(resp) == SOH { + return s.Get(store.Normalize(key)) + } + pair = &store.KVPair{ Key: key, Value: resp, @@ -371,7 +380,7 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store. // Lock attempts to acquire the lock and blocks while // doing so. It returns a channel that is closed if our // lock is lost or if an error occurs -func (l *zookeeperLock) Lock() (<-chan struct{}, error) { +func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { err := l.lock.Lock() if err == nil { diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go index 739c2ba619..c36087e034 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go @@ -43,12 +43,12 @@ func TestRegister(t *testing.T) { func TestZkStore(t *testing.T) { kv := makeZkClient(t) - backup := makeZkClient(t) + ttlKV := makeZkClient(t) testutils.RunTestCommon(t, kv) testutils.RunTestAtomic(t, kv) testutils.RunTestWatch(t, kv) testutils.RunTestLock(t, kv) - testutils.RunTestTTL(t, kv, backup) + testutils.RunTestTTL(t, kv, ttlKV) testutils.RunCleanup(t, kv) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go index bb8f26336d..2c71d7c49f 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go @@ -38,6 +38,12 @@ func RunTestLock(t *testing.T, kv store.Store) { testLockUnlock(t, kv) } +// RunTestLockTTL tests the KV pair Lock with TTL APIs supported +// by the K/V backends. +func RunTestLockTTL(t *testing.T, kv store.Store, backup store.Store) { + testLockTTL(t, kv, backup) +} + // RunTestTTL tests the TTL funtionality of the K/V backend. func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) { testPutTTL(t, kv, backup) @@ -307,7 +313,7 @@ func testLockUnlock(t *testing.T, kv store.Store) { assert.NotNil(t, lock) // Lock should successfully succeed or block - lockChan, err := lock.Lock() + lockChan, err := lock.Lock(nil) assert.NoError(t, err) assert.NotNil(t, lockChan) @@ -325,7 +331,7 @@ func testLockUnlock(t *testing.T, kv store.Store) { assert.NoError(t, err) // Lock should succeed again - lockChan, err = lock.Lock() + lockChan, err = lock.Lock(nil) assert.NoError(t, err) assert.NotNil(t, lockChan) @@ -337,6 +343,110 @@ func testLockUnlock(t *testing.T, kv store.Store) { } assert.Equal(t, pair.Value, value) assert.NotEqual(t, pair.LastIndex, 0) + + err = lock.Unlock() + assert.NoError(t, err) +} + +func testLockTTL(t *testing.T, kv store.Store, otherConn store.Store) { + key := "testLockTTL" + value := []byte("bar") + + renewCh := make(chan struct{}) + + // We should be able to create a new lock on key + lock, err := otherConn.NewLock(key, &store.LockOptions{ + Value: value, + TTL: 2 * time.Second, + RenewLock: renewCh, + }) + assert.NoError(t, err) + assert.NotNil(t, lock) + + // Lock should successfully succeed + lockChan, err := lock.Lock(nil) + assert.NoError(t, err) + assert.NotNil(t, lockChan) + + // Get should work + pair, err := otherConn.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + time.Sleep(3 * time.Second) + + done := make(chan struct{}) + stop := make(chan struct{}) + + value = []byte("foobar") + + // Create a new lock with another connection + lock, err = kv.NewLock( + key, + &store.LockOptions{ + Value: value, + TTL: 3 * time.Second, + }, + ) + assert.NoError(t, err) + assert.NotNil(t, lock) + + // Lock should block, the session on the lock + // is still active and renewed periodically + go func(<-chan struct{}) { + _, _ = lock.Lock(stop) + done <- struct{}{} + }(done) + + select { + case _ = <-done: + t.Fatal("Lock succeeded on a key that is supposed to be locked by another client") + case <-time.After(4 * time.Second): + // Stop requesting the lock as we are blocked as expected + stop <- struct{}{} + break + } + + // Close the connection + otherConn.Close() + + // Force stop the session renewal for the lock + close(renewCh) + + // Let the session on the lock expire + time.Sleep(3 * time.Second) + locked := make(chan struct{}) + + // Lock should now succeed for the other client + go func(<-chan struct{}) { + lockChan, err = lock.Lock(nil) + assert.NoError(t, err) + assert.NotNil(t, lockChan) + locked <- struct{}{} + }(locked) + + select { + case _ = <-locked: + break + case <-time.After(4 * time.Second): + t.Fatal("Unable to take the lock, timed out") + } + + // Get should work with the new value + pair, err = kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + assert.NotEqual(t, pair.LastIndex, 0) + + err = lock.Unlock() + assert.NoError(t, err) } func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) { @@ -482,6 +592,7 @@ func RunCleanup(t *testing.T, kv store.Store) { "testAtomicPutCreate", "testAtomicDelete", "testLockUnlock", + "testLockTTL", "testPutTTL", "testList", "testDeleteTree", diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index b5315023e2..726c1fe711 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -6,7 +6,6 @@ import ( "reflect" "strings" "sync" - "time" "github.com/docker/libkv" "github.com/docker/libkv/store" @@ -141,8 +140,7 @@ func makeDefaultScopes() map[string]*ScopeCfg { Provider: "boltdb", Address: defaultPrefix + "/boltdb.db", Config: &store.Config{ - Bucket: "libnetwork", - ConnectionTimeout: 3 * time.Second, + Bucket: "libnetwork", }, }, } diff --git a/libnetwork/store_test.go b/libnetwork/store_test.go index b435a788d2..61a4a3d72c 100644 --- a/libnetwork/store_test.go +++ b/libnetwork/store_test.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "os" "testing" - "time" "github.com/docker/libkv/store" "github.com/docker/libnetwork/config" @@ -36,7 +35,7 @@ func TestBoltdbBackend(t *testing.T) { defer os.Remove(datastore.DefaultScopes("")[datastore.LocalScope].Client.Address) testLocalBackend(t, "", "", nil) defer os.Remove("/tmp/boltdb.db") - config := &store.Config{Bucket: "testBackend", ConnectionTimeout: 3 * time.Second} + config := &store.Config{Bucket: "testBackend"} testLocalBackend(t, "boltdb", "/tmp/boltdb.db", config) } @@ -130,12 +129,12 @@ func OptionBoltdbWithRandomDBFile() ([]config.Option, error) { cfgOptions := []config.Option{} cfgOptions = append(cfgOptions, config.OptionLocalKVProvider("boltdb")) cfgOptions = append(cfgOptions, config.OptionLocalKVProviderURL(tmp.Name())) - sCfg := &store.Config{Bucket: "testBackend", ConnectionTimeout: 3 * time.Second} + sCfg := &store.Config{Bucket: "testBackend"} cfgOptions = append(cfgOptions, config.OptionLocalKVProviderConfig(sCfg)) return cfgOptions, nil } -func TestLocalStoreLockTimeout(t *testing.T) { +func TestMultipleControllersWithSameStore(t *testing.T) { cfgOptions, err := OptionBoltdbWithRandomDBFile() if err != nil { t.Fatalf("Error getting random boltdb configs %v", err) @@ -146,11 +145,8 @@ func TestLocalStoreLockTimeout(t *testing.T) { } defer ctrl1.Stop() // Use the same boltdb file without closing the previous controller - // with a slightly altered configuration - sCfg := &store.Config{Bucket: "testBackend", ConnectionTimeout: 1 * time.Second} - _, err = New(append(cfgOptions[:len(cfgOptions)-1], - config.OptionLocalKVProviderConfig(sCfg))...) - if err == nil { - t.Fatalf("Expected to fail but succeeded") + _, err = New(cfgOptions...) + if err != nil { + t.Fatalf("Local store must support concurrent controllers") } }