|
@@ -22,6 +22,14 @@ const (
|
|
// RenewSessionRetryMax is the number of time we should try
|
|
// RenewSessionRetryMax is the number of time we should try
|
|
// to renew the session before giving up and throwing an error
|
|
// to renew the session before giving up and throwing an error
|
|
RenewSessionRetryMax = 5
|
|
RenewSessionRetryMax = 5
|
|
|
|
+
|
|
|
|
+ // MaxSessionDestroyAttempts is the maximum times we will try
|
|
|
|
+ // to explicitely destroy the session attached to a lock after
|
|
|
|
+ // the connectivity to the store has been lost
|
|
|
|
+ MaxSessionDestroyAttempts = 5
|
|
|
|
+
|
|
|
|
+ // defaultLockTTL is the default ttl for the consul lock
|
|
|
|
+ defaultLockTTL = 20 * time.Second
|
|
)
|
|
)
|
|
|
|
|
|
var (
|
|
var (
|
|
@@ -186,6 +194,7 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error {
|
|
p := &api.KVPair{
|
|
p := &api.KVPair{
|
|
Key: key,
|
|
Key: key,
|
|
Value: value,
|
|
Value: value,
|
|
|
|
+ Flags: api.LockFlagValue,
|
|
}
|
|
}
|
|
|
|
|
|
if opts != nil && opts.TTL > 0 {
|
|
if opts != nil && opts.TTL > 0 {
|
|
@@ -378,44 +387,99 @@ func (s *Consul) NewLock(key string, options *store.LockOptions) (store.Locker,
|
|
|
|
|
|
lock := &consulLock{}
|
|
lock := &consulLock{}
|
|
|
|
|
|
|
|
+ ttl := defaultLockTTL
|
|
|
|
+
|
|
if options != nil {
|
|
if options != nil {
|
|
// Set optional TTL on Lock
|
|
// Set optional TTL on Lock
|
|
if options.TTL != 0 {
|
|
if options.TTL != 0 {
|
|
- entry := &api.SessionEntry{
|
|
|
|
- Behavior: api.SessionBehaviorRelease, // Release the lock when the session expires
|
|
|
|
- TTL: (options.TTL / 2).String(), // Consul multiplies the TTL by 2x
|
|
|
|
- LockDelay: 1 * time.Millisecond, // Virtually disable lock delay
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Create the key session
|
|
|
|
- session, _, err := s.client.Session().Create(entry, nil)
|
|
|
|
- if err != nil {
|
|
|
|
- return nil, err
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Place the session on lock
|
|
|
|
- lockOpts.Session = session
|
|
|
|
-
|
|
|
|
- // Renew the session ttl lock periodically
|
|
|
|
- go s.client.Session().RenewPeriodic(entry.TTL, session, nil, options.RenewLock)
|
|
|
|
- lock.renewCh = options.RenewLock
|
|
|
|
|
|
+ ttl = options.TTL
|
|
}
|
|
}
|
|
-
|
|
|
|
// Set optional value on Lock
|
|
// Set optional value on Lock
|
|
if options.Value != nil {
|
|
if options.Value != nil {
|
|
lockOpts.Value = options.Value
|
|
lockOpts.Value = options.Value
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ entry := &api.SessionEntry{
|
|
|
|
+ Behavior: api.SessionBehaviorRelease, // Release the lock when the session expires
|
|
|
|
+ TTL: (ttl / 2).String(), // Consul multiplies the TTL by 2x
|
|
|
|
+ LockDelay: 1 * time.Millisecond, // Virtually disable lock delay
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Create the key session
|
|
|
|
+ session, _, err := s.client.Session().Create(entry, nil)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Place the session and renew chan on lock
|
|
|
|
+ lockOpts.Session = session
|
|
|
|
+ lock.renewCh = options.RenewLock
|
|
|
|
+
|
|
l, err := s.client.LockOpts(lockOpts)
|
|
l, err := s.client.LockOpts(lockOpts)
|
|
if err != nil {
|
|
if err != nil {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Renew the session ttl lock periodically
|
|
|
|
+ s.renewLockSession(entry.TTL, session, options.RenewLock)
|
|
|
|
+
|
|
lock.lock = l
|
|
lock.lock = l
|
|
return lock, nil
|
|
return lock, nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// renewLockSession is used to renew a session Lock, it takes
|
|
|
|
+// a stopRenew chan which is used to explicitely stop the session
|
|
|
|
+// renew process. The renew routine never stops until a signal is
|
|
|
|
+// sent to this channel. If deleting the session fails because the
|
|
|
|
+// connection to the store is lost, it keeps trying to delete the
|
|
|
|
+// session periodically until it can contact the store, this ensures
|
|
|
|
+// that the lock is not maintained indefinitely which ensures liveness
|
|
|
|
+// over safety for the lock when the store becomes unavailable.
|
|
|
|
+func (s *Consul) renewLockSession(initialTTL string, id string, stopRenew chan struct{}) {
|
|
|
|
+ sessionDestroyAttempts := 0
|
|
|
|
+ ttl, err := time.ParseDuration(initialTTL)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ go func() {
|
|
|
|
+ for {
|
|
|
|
+ select {
|
|
|
|
+ case <-time.After(ttl / 2):
|
|
|
|
+ entry, _, err := s.client.Session().Renew(id, nil)
|
|
|
|
+ if err != nil {
|
|
|
|
+ // If an error occurs, continue until the
|
|
|
|
+ // session gets destroyed explicitely or
|
|
|
|
+ // the session ttl times out
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ if entry == nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Handle the server updating the TTL
|
|
|
|
+ ttl, _ = time.ParseDuration(entry.TTL)
|
|
|
|
+
|
|
|
|
+ case <-stopRenew:
|
|
|
|
+ // Attempt a session destroy
|
|
|
|
+ _, err := s.client.Session().Destroy(id, nil)
|
|
|
|
+ if err == nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if sessionDestroyAttempts >= MaxSessionDestroyAttempts {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // We can't destroy the session because the store
|
|
|
|
+ // is unavailable, wait for the session renew period
|
|
|
|
+ sessionDestroyAttempts++
|
|
|
|
+ time.Sleep(ttl / 2)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+}
|
|
|
|
+
|
|
// Lock attempts to acquire the lock and blocks while
|
|
// Lock attempts to acquire the lock and blocks while
|
|
// doing so. It returns a channel that is closed if our
|
|
// doing so. It returns a channel that is closed if our
|
|
// lock is lost or if an error occurs
|
|
// lock is lost or if an error occurs
|
|
@@ -436,7 +500,7 @@ func (l *consulLock) Unlock() error {
|
|
// modified in the meantime, throws an error if this is the case
|
|
// modified in the meantime, throws an error if this is the case
|
|
func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
|
|
func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
|
|
|
|
|
|
- p := &api.KVPair{Key: s.normalize(key), Value: value}
|
|
|
|
|
|
+ p := &api.KVPair{Key: s.normalize(key), Value: value, Flags: api.LockFlagValue}
|
|
|
|
|
|
if previous == nil {
|
|
if previous == nil {
|
|
// Consul interprets ModifyIndex = 0 as new key.
|
|
// Consul interprets ModifyIndex = 0 as new key.
|
|
@@ -445,9 +509,14 @@ func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, opt
|
|
p.ModifyIndex = previous.LastIndex
|
|
p.ModifyIndex = previous.LastIndex
|
|
}
|
|
}
|
|
|
|
|
|
- if work, _, err := s.client.KV().CAS(p, nil); err != nil {
|
|
|
|
|
|
+ ok, _, err := s.client.KV().CAS(p, nil)
|
|
|
|
+ if err != nil {
|
|
return false, nil, err
|
|
return false, nil, err
|
|
- } else if !work {
|
|
|
|
|
|
+ }
|
|
|
|
+ if !ok {
|
|
|
|
+ if previous == nil {
|
|
|
|
+ return false, nil, store.ErrKeyExists
|
|
|
|
+ }
|
|
return false, nil, store.ErrKeyModified
|
|
return false, nil, store.ErrKeyModified
|
|
}
|
|
}
|
|
|
|
|
|
@@ -466,7 +535,7 @@ func (s *Consul) AtomicDelete(key string, previous *store.KVPair) (bool, error)
|
|
return false, store.ErrPreviousNotSpecified
|
|
return false, store.ErrPreviousNotSpecified
|
|
}
|
|
}
|
|
|
|
|
|
- p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex}
|
|
|
|
|
|
+ p := &api.KVPair{Key: s.normalize(key), ModifyIndex: previous.LastIndex, Flags: api.LockFlagValue}
|
|
|
|
|
|
// Extra Get operation to check on the key
|
|
// Extra Get operation to check on the key
|
|
_, err := s.Get(key)
|
|
_, err := s.Get(key)
|