endpoint_cnt.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package libnetwork
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "github.com/docker/docker/libnetwork/datastore"
  7. )
  8. type endpointCnt struct {
  9. n *Network
  10. Count uint64
  11. dbIndex uint64
  12. dbExists bool
  13. sync.Mutex
  14. }
  15. const epCntKeyPrefix = "endpoint_count"
  16. func (ec *endpointCnt) Key() []string {
  17. ec.Lock()
  18. defer ec.Unlock()
  19. return []string{epCntKeyPrefix, ec.n.id}
  20. }
  21. func (ec *endpointCnt) KeyPrefix() []string {
  22. ec.Lock()
  23. defer ec.Unlock()
  24. return []string{epCntKeyPrefix, ec.n.id}
  25. }
  26. func (ec *endpointCnt) Value() []byte {
  27. ec.Lock()
  28. defer ec.Unlock()
  29. b, err := json.Marshal(ec)
  30. if err != nil {
  31. return nil
  32. }
  33. return b
  34. }
  35. func (ec *endpointCnt) SetValue(value []byte) error {
  36. ec.Lock()
  37. defer ec.Unlock()
  38. return json.Unmarshal(value, &ec)
  39. }
  40. func (ec *endpointCnt) Index() uint64 {
  41. ec.Lock()
  42. defer ec.Unlock()
  43. return ec.dbIndex
  44. }
  45. func (ec *endpointCnt) SetIndex(index uint64) {
  46. ec.Lock()
  47. ec.dbIndex = index
  48. ec.dbExists = true
  49. ec.Unlock()
  50. }
  51. func (ec *endpointCnt) Exists() bool {
  52. ec.Lock()
  53. defer ec.Unlock()
  54. return ec.dbExists
  55. }
  56. func (ec *endpointCnt) Skip() bool {
  57. ec.Lock()
  58. defer ec.Unlock()
  59. return !ec.n.persist
  60. }
  61. func (ec *endpointCnt) New() datastore.KVObject {
  62. ec.Lock()
  63. defer ec.Unlock()
  64. return &endpointCnt{
  65. n: ec.n,
  66. }
  67. }
  68. func (ec *endpointCnt) CopyTo(o datastore.KVObject) error {
  69. ec.Lock()
  70. defer ec.Unlock()
  71. dstEc := o.(*endpointCnt)
  72. dstEc.n = ec.n
  73. dstEc.Count = ec.Count
  74. dstEc.dbExists = ec.dbExists
  75. dstEc.dbIndex = ec.dbIndex
  76. return nil
  77. }
  78. func (ec *endpointCnt) DataScope() string {
  79. return ec.n.DataScope()
  80. }
  81. func (ec *endpointCnt) EndpointCnt() uint64 {
  82. ec.Lock()
  83. defer ec.Unlock()
  84. return ec.Count
  85. }
  86. func (ec *endpointCnt) updateStore() error {
  87. store := ec.n.getController().getStore()
  88. if store == nil {
  89. return fmt.Errorf("store not found for scope %s on endpoint count update", ec.DataScope())
  90. }
  91. // make a copy of count and n to avoid being overwritten by store.GetObject
  92. count := ec.EndpointCnt()
  93. n := ec.n
  94. for {
  95. if err := ec.n.getController().updateToStore(ec); err == nil || err != datastore.ErrKeyModified {
  96. return err
  97. }
  98. if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
  99. return fmt.Errorf("could not update the kvobject to latest on endpoint count update: %v", err)
  100. }
  101. ec.Lock()
  102. ec.Count = count
  103. ec.n = n
  104. ec.Unlock()
  105. }
  106. }
  107. func (ec *endpointCnt) setCnt(cnt uint64) error {
  108. ec.Lock()
  109. ec.Count = cnt
  110. ec.Unlock()
  111. return ec.updateStore()
  112. }
  113. func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
  114. store := ec.n.getController().getStore()
  115. if store == nil {
  116. return fmt.Errorf("store not found for scope %s", ec.DataScope())
  117. }
  118. tmp := &endpointCnt{n: ec.n}
  119. if err := store.GetObject(datastore.Key(ec.Key()...), tmp); err != nil {
  120. return err
  121. }
  122. retry:
  123. ec.Lock()
  124. if inc {
  125. ec.Count++
  126. } else {
  127. if ec.Count > 0 {
  128. ec.Count--
  129. }
  130. }
  131. ec.Unlock()
  132. if err := ec.n.getController().updateToStore(ec); err != nil {
  133. if err == datastore.ErrKeyModified {
  134. if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
  135. return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
  136. }
  137. goto retry
  138. }
  139. return err
  140. }
  141. return nil
  142. }
  143. func (ec *endpointCnt) IncEndpointCnt() error {
  144. return ec.atomicIncDecEpCnt(true)
  145. }
  146. func (ec *endpointCnt) DecEndpointCnt() error {
  147. return ec.atomicIncDecEpCnt(false)
  148. }