endpoint_cnt.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package libnetwork
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "github.com/docker/docker/libnetwork/datastore"
  7. )
  8. type endpointCnt struct {
  9. n *Network
  10. Count uint64
  11. dbIndex uint64
  12. dbExists bool
  13. sync.Mutex
  14. }
  15. const epCntKeyPrefix = "endpoint_count"
  16. func (ec *endpointCnt) Key() []string {
  17. ec.Lock()
  18. defer ec.Unlock()
  19. return []string{epCntKeyPrefix, ec.n.id}
  20. }
  21. func (ec *endpointCnt) KeyPrefix() []string {
  22. ec.Lock()
  23. defer ec.Unlock()
  24. return []string{epCntKeyPrefix, ec.n.id}
  25. }
  26. func (ec *endpointCnt) Value() []byte {
  27. ec.Lock()
  28. defer ec.Unlock()
  29. b, err := json.Marshal(ec)
  30. if err != nil {
  31. return nil
  32. }
  33. return b
  34. }
  35. func (ec *endpointCnt) SetValue(value []byte) error {
  36. ec.Lock()
  37. defer ec.Unlock()
  38. return json.Unmarshal(value, &ec)
  39. }
  40. func (ec *endpointCnt) Index() uint64 {
  41. ec.Lock()
  42. defer ec.Unlock()
  43. return ec.dbIndex
  44. }
  45. func (ec *endpointCnt) SetIndex(index uint64) {
  46. ec.Lock()
  47. ec.dbIndex = index
  48. ec.dbExists = true
  49. ec.Unlock()
  50. }
  51. func (ec *endpointCnt) Exists() bool {
  52. ec.Lock()
  53. defer ec.Unlock()
  54. return ec.dbExists
  55. }
  56. func (ec *endpointCnt) Skip() bool {
  57. ec.Lock()
  58. defer ec.Unlock()
  59. return !ec.n.persist
  60. }
  61. func (ec *endpointCnt) New() datastore.KVObject {
  62. ec.Lock()
  63. defer ec.Unlock()
  64. return &endpointCnt{
  65. n: ec.n,
  66. }
  67. }
  68. func (ec *endpointCnt) CopyTo(o datastore.KVObject) error {
  69. ec.Lock()
  70. defer ec.Unlock()
  71. dstEc := o.(*endpointCnt)
  72. dstEc.n = ec.n
  73. dstEc.Count = ec.Count
  74. dstEc.dbExists = ec.dbExists
  75. dstEc.dbIndex = ec.dbIndex
  76. return nil
  77. }
  78. func (ec *endpointCnt) EndpointCnt() uint64 {
  79. ec.Lock()
  80. defer ec.Unlock()
  81. return ec.Count
  82. }
  83. func (ec *endpointCnt) updateStore() error {
  84. store := ec.n.getController().getStore()
  85. if store == nil {
  86. return fmt.Errorf("store not found on endpoint count update")
  87. }
  88. // make a copy of count and n to avoid being overwritten by store.GetObject
  89. count := ec.EndpointCnt()
  90. n := ec.n
  91. for {
  92. if err := ec.n.getController().updateToStore(ec); err == nil || err != datastore.ErrKeyModified {
  93. return err
  94. }
  95. if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
  96. return fmt.Errorf("could not update the kvobject to latest on endpoint count update: %v", err)
  97. }
  98. ec.Lock()
  99. ec.Count = count
  100. ec.n = n
  101. ec.Unlock()
  102. }
  103. }
  104. func (ec *endpointCnt) setCnt(cnt uint64) error {
  105. ec.Lock()
  106. ec.Count = cnt
  107. ec.Unlock()
  108. return ec.updateStore()
  109. }
  110. func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
  111. store := ec.n.getController().getStore()
  112. if store == nil {
  113. return fmt.Errorf("store not found on endpoint count atomic inc/dec")
  114. }
  115. tmp := &endpointCnt{n: ec.n}
  116. if err := store.GetObject(datastore.Key(ec.Key()...), tmp); err != nil {
  117. return err
  118. }
  119. retry:
  120. ec.Lock()
  121. if inc {
  122. ec.Count++
  123. } else {
  124. if ec.Count > 0 {
  125. ec.Count--
  126. }
  127. }
  128. ec.Unlock()
  129. if err := ec.n.getController().updateToStore(ec); err != nil {
  130. if err == datastore.ErrKeyModified {
  131. if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
  132. return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
  133. }
  134. goto retry
  135. }
  136. return err
  137. }
  138. return nil
  139. }
  140. func (ec *endpointCnt) IncEndpointCnt() error {
  141. return ec.atomicIncDecEpCnt(true)
  142. }
  143. func (ec *endpointCnt) DecEndpointCnt() error {
  144. return ec.atomicIncDecEpCnt(false)
  145. }