endpoint_cnt.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package libnetwork
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "sync"
  6. "github.com/docker/libnetwork/datastore"
  7. )
  8. type endpointCnt struct {
  9. n *network
  10. Count uint64
  11. dbIndex uint64
  12. dbExists bool
  13. sync.Mutex
  14. }
  15. const epCntKeyPrefix = "endpoint_count"
  16. func (ec *endpointCnt) Key() []string {
  17. ec.Lock()
  18. defer ec.Unlock()
  19. return []string{epCntKeyPrefix, ec.n.id}
  20. }
  21. func (ec *endpointCnt) KeyPrefix() []string {
  22. ec.Lock()
  23. defer ec.Unlock()
  24. return []string{epCntKeyPrefix, ec.n.id}
  25. }
  26. func (ec *endpointCnt) Value() []byte {
  27. ec.Lock()
  28. defer ec.Unlock()
  29. b, err := json.Marshal(ec)
  30. if err != nil {
  31. return nil
  32. }
  33. return b
  34. }
  35. func (ec *endpointCnt) SetValue(value []byte) error {
  36. ec.Lock()
  37. defer ec.Unlock()
  38. return json.Unmarshal(value, &ec)
  39. }
  40. func (ec *endpointCnt) Index() uint64 {
  41. ec.Lock()
  42. defer ec.Unlock()
  43. return ec.dbIndex
  44. }
  45. func (ec *endpointCnt) SetIndex(index uint64) {
  46. ec.Lock()
  47. ec.dbIndex = index
  48. ec.dbExists = true
  49. ec.Unlock()
  50. }
  51. func (ec *endpointCnt) Exists() bool {
  52. ec.Lock()
  53. defer ec.Unlock()
  54. return ec.dbExists
  55. }
  56. func (ec *endpointCnt) Skip() bool {
  57. ec.Lock()
  58. defer ec.Unlock()
  59. return !ec.n.persist
  60. }
  61. func (ec *endpointCnt) New() datastore.KVObject {
  62. ec.Lock()
  63. defer ec.Unlock()
  64. return &endpointCnt{
  65. n: ec.n,
  66. }
  67. }
  68. func (ec *endpointCnt) CopyTo(o datastore.KVObject) error {
  69. ec.Lock()
  70. defer ec.Unlock()
  71. dstEc := o.(*endpointCnt)
  72. dstEc.n = ec.n
  73. dstEc.Count = ec.Count
  74. dstEc.dbExists = ec.dbExists
  75. dstEc.dbIndex = ec.dbIndex
  76. return nil
  77. }
  78. func (ec *endpointCnt) DataScope() string {
  79. return ec.n.DataScope()
  80. }
  81. func (ec *endpointCnt) EndpointCnt() uint64 {
  82. ec.Lock()
  83. defer ec.Unlock()
  84. return ec.Count
  85. }
  86. func (ec *endpointCnt) updateStore() error {
  87. store := ec.n.getController().getStore(ec.DataScope())
  88. if store == nil {
  89. return fmt.Errorf("store not found for scope %s on endpoint count update", ec.DataScope())
  90. }
  91. // make a copy of count and n to avoid being overwritten by store.GetObject
  92. count := ec.EndpointCnt()
  93. n := ec.n
  94. for {
  95. if err := ec.n.getController().updateToStore(ec); err == nil || err != datastore.ErrKeyModified {
  96. return err
  97. }
  98. if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
  99. return fmt.Errorf("could not update the kvobject to latest on endpoint count update: %v", err)
  100. }
  101. ec.Lock()
  102. ec.Count = count
  103. ec.n = n
  104. ec.Unlock()
  105. }
  106. }
  107. func (ec *endpointCnt) setCnt(cnt uint64) error {
  108. ec.Lock()
  109. ec.Count = cnt
  110. ec.Unlock()
  111. return ec.updateStore()
  112. }
  113. func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error {
  114. retry:
  115. ec.Lock()
  116. if inc {
  117. ec.Count++
  118. } else {
  119. if ec.Count > 0 {
  120. ec.Count--
  121. }
  122. }
  123. ec.Unlock()
  124. store := ec.n.getController().getStore(ec.DataScope())
  125. if store == nil {
  126. return fmt.Errorf("store not found for scope %s", ec.DataScope())
  127. }
  128. if err := ec.n.getController().updateToStore(ec); err != nil {
  129. if err == datastore.ErrKeyModified {
  130. if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil {
  131. return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err)
  132. }
  133. goto retry
  134. }
  135. return err
  136. }
  137. return nil
  138. }
  139. func (ec *endpointCnt) IncEndpointCnt() error {
  140. return ec.atomicIncDecEpCnt(true)
  141. }
  142. func (ec *endpointCnt) DecEndpointCnt() error {
  143. return ec.atomicIncDecEpCnt(false)
  144. }