lock.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package zk
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. )
  8. var (
  9. ErrDeadlock = errors.New("zk: trying to acquire a lock twice")
  10. ErrNotLocked = errors.New("zk: not locked")
  11. )
  12. type Lock struct {
  13. c *Conn
  14. path string
  15. acl []ACL
  16. lockPath string
  17. seq int
  18. }
  19. func NewLock(c *Conn, path string, acl []ACL) *Lock {
  20. return &Lock{
  21. c: c,
  22. path: path,
  23. acl: acl,
  24. }
  25. }
  26. func parseSeq(path string) (int, error) {
  27. parts := strings.Split(path, "-")
  28. return strconv.Atoi(parts[len(parts)-1])
  29. }
  30. func (l *Lock) Lock() error {
  31. if l.lockPath != "" {
  32. return ErrDeadlock
  33. }
  34. prefix := fmt.Sprintf("%s/lock-", l.path)
  35. path := ""
  36. var err error
  37. for i := 0; i < 3; i++ {
  38. path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)
  39. if err == ErrNoNode {
  40. // Create parent node.
  41. parts := strings.Split(l.path, "/")
  42. pth := ""
  43. for _, p := range parts[1:] {
  44. pth += "/" + p
  45. _, err := l.c.Create(pth, []byte{}, 0, l.acl)
  46. if err != nil && err != ErrNodeExists {
  47. return err
  48. }
  49. }
  50. } else if err == nil {
  51. break
  52. } else {
  53. return err
  54. }
  55. }
  56. if err != nil {
  57. return err
  58. }
  59. seq, err := parseSeq(path)
  60. if err != nil {
  61. return err
  62. }
  63. for {
  64. children, _, err := l.c.Children(l.path)
  65. if err != nil {
  66. return err
  67. }
  68. lowestSeq := seq
  69. prevSeq := 0
  70. prevSeqPath := ""
  71. for _, p := range children {
  72. s, err := parseSeq(p)
  73. if err != nil {
  74. return err
  75. }
  76. if s < lowestSeq {
  77. lowestSeq = s
  78. }
  79. if s < seq && s > prevSeq {
  80. prevSeq = s
  81. prevSeqPath = p
  82. }
  83. }
  84. if seq == lowestSeq {
  85. // Acquired the lock
  86. break
  87. }
  88. // Wait on the node next in line for the lock
  89. _, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath)
  90. if err != nil && err != ErrNoNode {
  91. return err
  92. } else if err != nil && err == ErrNoNode {
  93. // try again
  94. continue
  95. }
  96. ev := <-ch
  97. if ev.Err != nil {
  98. return ev.Err
  99. }
  100. }
  101. l.seq = seq
  102. l.lockPath = path
  103. return nil
  104. }
  105. func (l *Lock) Unlock() error {
  106. if l.lockPath == "" {
  107. return ErrNotLocked
  108. }
  109. if err := l.c.Delete(l.lockPath, -1); err != nil {
  110. return err
  111. }
  112. l.lockPath = ""
  113. l.seq = 0
  114. return nil
  115. }