lock.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package zk
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. )
  8. var (
  9. // ErrDeadlock is returned by Lock when trying to lock twice without unlocking first.
  10. ErrDeadlock = errors.New("zk: trying to acquire a lock twice")
  11. // ErrNotLocked is returned by Unlock when trying to release a lock that has not first been acquired.
  12. ErrNotLocked = errors.New("zk: not locked")
  13. )
  14. // Lock is a mutual exclusion lock.
  15. type Lock struct {
  16. c *Conn // connection used for every lock operation (create, list children, watch, delete)
  17. path string // parent node under which the "lock-" sequence nodes are created
  18. acl []ACL // ACL passed when creating the lock node and any missing parent nodes
  19. lockPath string // path of the node created by Lock; empty while this instance is unlocked
  20. seq int // sequence number parsed from lockPath; reset to 0 by Unlock
  21. }
  22. // NewLock creates a new lock instance using the provided connection, path, and acl.
  23. // The path must be a node that is only used by this lock. A lock instances starts
  24. // unlocked until Lock() is called.
  25. func NewLock(c *Conn, path string, acl []ACL) *Lock {
  26. return &Lock{
  27. c: c,
  28. path: path,
  29. acl: acl,
  30. }
  31. }
  // parseSeq extracts the sequence number that ZooKeeper appended to a lock
  // node name such as "_c_<guid>-lock-0000000012".
  //
  // Everything after the last "lock-" marker is parsed, so a negative sequence
  // number (the server-side counter is a signed 32-bit integer and wraps, giving
  // names like "lock--2147483648") keeps its sign. When the marker is absent the
  // text after the last "-" is parsed instead; with no "-" at all the whole
  // string is parsed. The error from strconv.Atoi is returned unchanged.
  func parseSeq(path string) (int, error) {
  	const marker = "lock-"
  	if i := strings.LastIndex(path, marker); i >= 0 {
  		return strconv.Atoi(path[i+len(marker):])
  	}
  	// LastIndex yields -1 when there is no "-", which makes the slice start at 0.
  	return strconv.Atoi(path[strings.LastIndex(path, "-")+1:])
  }
  36. // Lock attempts to acquire the lock. It will wait to return until the lock
  37. // is acquired or an error occurs. If this instance already has the lock
  38. // then ErrDeadlock is returned.
  39. func (l *Lock) Lock() error {
  40. if l.lockPath != "" {
  41. return ErrDeadlock
  42. }
  43. prefix := fmt.Sprintf("%s/lock-", l.path)
  44. path := ""
  45. var err error
  46. for i := 0; i < 3; i++ {
  47. path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)
  48. if err == ErrNoNode {
  49. // Create parent node.
  50. parts := strings.Split(l.path, "/")
  51. pth := ""
  52. for _, p := range parts[1:] {
  53. var exists bool
  54. pth += "/" + p
  55. exists, _, err = l.c.Exists(pth)
  56. if err != nil {
  57. return err
  58. }
  59. if exists == true {
  60. continue
  61. }
  62. _, err = l.c.Create(pth, []byte{}, 0, l.acl)
  63. if err != nil && err != ErrNodeExists {
  64. return err
  65. }
  66. }
  67. } else if err == nil {
  68. break
  69. } else {
  70. return err
  71. }
  72. }
  73. if err != nil {
  74. return err
  75. }
  76. seq, err := parseSeq(path)
  77. if err != nil {
  78. return err
  79. }
  80. for {
  81. children, _, err := l.c.Children(l.path)
  82. if err != nil {
  83. return err
  84. }
  85. lowestSeq := seq
  86. prevSeq := -1
  87. prevSeqPath := ""
  88. for _, p := range children {
  89. s, err := parseSeq(p)
  90. if err != nil {
  91. return err
  92. }
  93. if s < lowestSeq {
  94. lowestSeq = s
  95. }
  96. if s < seq && s > prevSeq {
  97. prevSeq = s
  98. prevSeqPath = p
  99. }
  100. }
  101. if seq == lowestSeq {
  102. // Acquired the lock
  103. break
  104. }
  105. // Wait on the node next in line for the lock
  106. _, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath)
  107. if err != nil && err != ErrNoNode {
  108. return err
  109. } else if err != nil && err == ErrNoNode {
  110. // try again
  111. continue
  112. }
  113. ev := <-ch
  114. if ev.Err != nil {
  115. return ev.Err
  116. }
  117. }
  118. l.seq = seq
  119. l.lockPath = path
  120. return nil
  121. }
  122. // Unlock releases an acquired lock. If the lock is not currently acquired by
  123. // this Lock instance than ErrNotLocked is returned.
  124. func (l *Lock) Unlock() error {
  125. if l.lockPath == "" {
  126. return ErrNotLocked
  127. }
  128. if err := l.c.Delete(l.lockPath, -1); err != nil {
  129. return err
  130. }
  131. l.lockPath = ""
  132. l.seq = 0
  133. return nil
  134. }