lock.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. package api
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. const (
  8. // DefaultLockSessionName is the Session Name we assign if none is provided
  9. DefaultLockSessionName = "Consul API Lock"
  10. // DefaultLockSessionTTL is the default session TTL if no Session is provided
  11. // when creating a new Lock. This is used because we do not have another
  12. // other check to depend upon.
  13. DefaultLockSessionTTL = "15s"
  14. // DefaultLockWaitTime is how long we block for at a time to check if lock
  15. // acquisition is possible. This affects the minimum time it takes to cancel
  16. // a Lock acquisition.
  17. DefaultLockWaitTime = 15 * time.Second
  18. // DefaultLockRetryTime is how long we wait after a failed lock acquisition
  19. // before attempting to do the lock again. This is so that once a lock-delay
  20. // is in affect, we do not hot loop retrying the acquisition.
  21. DefaultLockRetryTime = 5 * time.Second
  22. // LockFlagValue is a magic flag we set to indicate a key
  23. // is being used for a lock. It is used to detect a potential
  24. // conflict with a semaphore.
  25. LockFlagValue = 0x2ddccbc058a50c18
  26. )
  27. var (
  28. // ErrLockHeld is returned if we attempt to double lock
  29. ErrLockHeld = fmt.Errorf("Lock already held")
  30. // ErrLockNotHeld is returned if we attempt to unlock a lock
  31. // that we do not hold.
  32. ErrLockNotHeld = fmt.Errorf("Lock not held")
  33. // ErrLockInUse is returned if we attempt to destroy a lock
  34. // that is in use.
  35. ErrLockInUse = fmt.Errorf("Lock in use")
  36. // ErrLockConflict is returned if the flags on a key
  37. // used for a lock do not match expectation
  38. ErrLockConflict = fmt.Errorf("Existing key does not match lock use")
  39. )
  40. // Lock is used to implement client-side leader election. It is follows the
  41. // algorithm as described here: https://consul.io/docs/guides/leader-election.html.
  42. type Lock struct {
  43. c *Client
  44. opts *LockOptions
  45. isHeld bool
  46. sessionRenew chan struct{}
  47. lockSession string
  48. l sync.Mutex
  49. }
  50. // LockOptions is used to parameterize the Lock behavior.
  51. type LockOptions struct {
  52. Key string // Must be set and have write permissions
  53. Value []byte // Optional, value to associate with the lock
  54. Session string // Optional, created if not specified
  55. SessionName string // Optional, defaults to DefaultLockSessionName
  56. SessionTTL string // Optional, defaults to DefaultLockSessionTTL
  57. }
  58. // LockKey returns a handle to a lock struct which can be used
  59. // to acquire and release the mutex. The key used must have
  60. // write permissions.
  61. func (c *Client) LockKey(key string) (*Lock, error) {
  62. opts := &LockOptions{
  63. Key: key,
  64. }
  65. return c.LockOpts(opts)
  66. }
  67. // LockOpts returns a handle to a lock struct which can be used
  68. // to acquire and release the mutex. The key used must have
  69. // write permissions.
  70. func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
  71. if opts.Key == "" {
  72. return nil, fmt.Errorf("missing key")
  73. }
  74. if opts.SessionName == "" {
  75. opts.SessionName = DefaultLockSessionName
  76. }
  77. if opts.SessionTTL == "" {
  78. opts.SessionTTL = DefaultLockSessionTTL
  79. } else {
  80. if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
  81. return nil, fmt.Errorf("invalid SessionTTL: %v", err)
  82. }
  83. }
  84. l := &Lock{
  85. c: c,
  86. opts: opts,
  87. }
  88. return l, nil
  89. }
  90. // Lock attempts to acquire the lock and blocks while doing so.
  91. // Providing a non-nil stopCh can be used to abort the lock attempt.
  92. // Returns a channel that is closed if our lock is lost or an error.
  93. // This channel could be closed at any time due to session invalidation,
  94. // communication errors, operator intervention, etc. It is NOT safe to
  95. // assume that the lock is held until Unlock() unless the Session is specifically
  96. // created without any associated health checks. By default Consul sessions
  97. // prefer liveness over safety and an application must be able to handle
  98. // the lock being lost.
  99. func (l *Lock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
  100. // Hold the lock as we try to acquire
  101. l.l.Lock()
  102. defer l.l.Unlock()
  103. // Check if we already hold the lock
  104. if l.isHeld {
  105. return nil, ErrLockHeld
  106. }
  107. // Check if we need to create a session first
  108. l.lockSession = l.opts.Session
  109. if l.lockSession == "" {
  110. if s, err := l.createSession(); err != nil {
  111. return nil, fmt.Errorf("failed to create session: %v", err)
  112. } else {
  113. l.sessionRenew = make(chan struct{})
  114. l.lockSession = s
  115. session := l.c.Session()
  116. go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
  117. // If we fail to acquire the lock, cleanup the session
  118. defer func() {
  119. if !l.isHeld {
  120. close(l.sessionRenew)
  121. l.sessionRenew = nil
  122. }
  123. }()
  124. }
  125. }
  126. // Setup the query options
  127. kv := l.c.KV()
  128. qOpts := &QueryOptions{
  129. WaitTime: DefaultLockWaitTime,
  130. }
  131. WAIT:
  132. // Check if we should quit
  133. select {
  134. case <-stopCh:
  135. return nil, nil
  136. default:
  137. }
  138. // Look for an existing lock, blocking until not taken
  139. pair, meta, err := kv.Get(l.opts.Key, qOpts)
  140. if err != nil {
  141. return nil, fmt.Errorf("failed to read lock: %v", err)
  142. }
  143. if pair != nil && pair.Flags != LockFlagValue {
  144. return nil, ErrLockConflict
  145. }
  146. locked := false
  147. if pair != nil && pair.Session == l.lockSession {
  148. goto HELD
  149. }
  150. if pair != nil && pair.Session != "" {
  151. qOpts.WaitIndex = meta.LastIndex
  152. goto WAIT
  153. }
  154. // Try to acquire the lock
  155. pair = l.lockEntry(l.lockSession)
  156. locked, _, err = kv.Acquire(pair, nil)
  157. if err != nil {
  158. return nil, fmt.Errorf("failed to acquire lock: %v", err)
  159. }
  160. // Handle the case of not getting the lock
  161. if !locked {
  162. select {
  163. case <-time.After(DefaultLockRetryTime):
  164. goto WAIT
  165. case <-stopCh:
  166. return nil, nil
  167. }
  168. }
  169. HELD:
  170. // Watch to ensure we maintain leadership
  171. leaderCh := make(chan struct{})
  172. go l.monitorLock(l.lockSession, leaderCh)
  173. // Set that we own the lock
  174. l.isHeld = true
  175. // Locked! All done
  176. return leaderCh, nil
  177. }
  178. // Unlock released the lock. It is an error to call this
  179. // if the lock is not currently held.
  180. func (l *Lock) Unlock() error {
  181. // Hold the lock as we try to release
  182. l.l.Lock()
  183. defer l.l.Unlock()
  184. // Ensure the lock is actually held
  185. if !l.isHeld {
  186. return ErrLockNotHeld
  187. }
  188. // Set that we no longer own the lock
  189. l.isHeld = false
  190. // Stop the session renew
  191. if l.sessionRenew != nil {
  192. defer func() {
  193. close(l.sessionRenew)
  194. l.sessionRenew = nil
  195. }()
  196. }
  197. // Get the lock entry, and clear the lock session
  198. lockEnt := l.lockEntry(l.lockSession)
  199. l.lockSession = ""
  200. // Release the lock explicitly
  201. kv := l.c.KV()
  202. _, _, err := kv.Release(lockEnt, nil)
  203. if err != nil {
  204. return fmt.Errorf("failed to release lock: %v", err)
  205. }
  206. return nil
  207. }
  208. // Destroy is used to cleanup the lock entry. It is not necessary
  209. // to invoke. It will fail if the lock is in use.
  210. func (l *Lock) Destroy() error {
  211. // Hold the lock as we try to release
  212. l.l.Lock()
  213. defer l.l.Unlock()
  214. // Check if we already hold the lock
  215. if l.isHeld {
  216. return ErrLockHeld
  217. }
  218. // Look for an existing lock
  219. kv := l.c.KV()
  220. pair, _, err := kv.Get(l.opts.Key, nil)
  221. if err != nil {
  222. return fmt.Errorf("failed to read lock: %v", err)
  223. }
  224. // Nothing to do if the lock does not exist
  225. if pair == nil {
  226. return nil
  227. }
  228. // Check for possible flag conflict
  229. if pair.Flags != LockFlagValue {
  230. return ErrLockConflict
  231. }
  232. // Check if it is in use
  233. if pair.Session != "" {
  234. return ErrLockInUse
  235. }
  236. // Attempt the delete
  237. didRemove, _, err := kv.DeleteCAS(pair, nil)
  238. if err != nil {
  239. return fmt.Errorf("failed to remove lock: %v", err)
  240. }
  241. if !didRemove {
  242. return ErrLockInUse
  243. }
  244. return nil
  245. }
  246. // createSession is used to create a new managed session
  247. func (l *Lock) createSession() (string, error) {
  248. session := l.c.Session()
  249. se := &SessionEntry{
  250. Name: l.opts.SessionName,
  251. TTL: l.opts.SessionTTL,
  252. }
  253. id, _, err := session.Create(se, nil)
  254. if err != nil {
  255. return "", err
  256. }
  257. return id, nil
  258. }
  259. // lockEntry returns a formatted KVPair for the lock
  260. func (l *Lock) lockEntry(session string) *KVPair {
  261. return &KVPair{
  262. Key: l.opts.Key,
  263. Value: l.opts.Value,
  264. Session: session,
  265. Flags: LockFlagValue,
  266. }
  267. }
  268. // monitorLock is a long running routine to monitor a lock ownership
  269. // It closes the stopCh if we lose our leadership.
  270. func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
  271. defer close(stopCh)
  272. kv := l.c.KV()
  273. opts := &QueryOptions{RequireConsistent: true}
  274. WAIT:
  275. pair, meta, err := kv.Get(l.opts.Key, opts)
  276. if err != nil {
  277. return
  278. }
  279. if pair != nil && pair.Session == session {
  280. opts.WaitIndex = meta.LastIndex
  281. goto WAIT
  282. }
  283. }