cache.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package datastore
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "github.com/docker/libkv/store"
  7. )
  8. type kvMap map[string]KVObject
  9. type cache struct {
  10. sync.Mutex
  11. kmm map[string]kvMap
  12. ds *datastore
  13. }
  14. func newCache(ds *datastore) *cache {
  15. return &cache{kmm: make(map[string]kvMap), ds: ds}
  16. }
  17. func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
  18. var err error
  19. c.Lock()
  20. keyPrefix := Key(kvObject.KeyPrefix()...)
  21. kmap, ok := c.kmm[keyPrefix]
  22. c.Unlock()
  23. if ok {
  24. return kmap, nil
  25. }
  26. kmap = kvMap{}
  27. // Bail out right away if the kvObject does not implement KVConstructor
  28. ctor, ok := kvObject.(KVConstructor)
  29. if !ok {
  30. return nil, errors.New("error while populating kmap, object does not implement KVConstructor interface")
  31. }
  32. kvList, err := c.ds.store.List(keyPrefix)
  33. if err != nil {
  34. if err == store.ErrKeyNotFound {
  35. // If the store doesn't have anything then there is nothing to
  36. // populate in the cache. Just bail out.
  37. goto out
  38. }
  39. return nil, fmt.Errorf("error while populating kmap: %v", err)
  40. }
  41. for _, kvPair := range kvList {
  42. // Ignore empty kvPair values
  43. if len(kvPair.Value) == 0 {
  44. continue
  45. }
  46. dstO := ctor.New()
  47. err = dstO.SetValue(kvPair.Value)
  48. if err != nil {
  49. return nil, err
  50. }
  51. // Make sure the object has a correct view of the DB index in
  52. // case we need to modify it and update the DB.
  53. dstO.SetIndex(kvPair.LastIndex)
  54. kmap[Key(dstO.Key()...)] = dstO
  55. }
  56. out:
  57. // There may multiple go routines racing to fill the
  58. // cache. The one which places the kmap in c.kmm first
  59. // wins. The others should just use what the first populated.
  60. c.Lock()
  61. kmapNew, ok := c.kmm[keyPrefix]
  62. if ok {
  63. c.Unlock()
  64. return kmapNew, nil
  65. }
  66. c.kmm[keyPrefix] = kmap
  67. c.Unlock()
  68. return kmap, nil
  69. }
  70. func (c *cache) add(kvObject KVObject, atomic bool) error {
  71. kmap, err := c.kmap(kvObject)
  72. if err != nil {
  73. return err
  74. }
  75. c.Lock()
  76. // If atomic is true, cache needs to maintain its own index
  77. // for atomicity and the add needs to be atomic.
  78. if atomic {
  79. if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
  80. if prev.Index() != kvObject.Index() {
  81. c.Unlock()
  82. return ErrKeyModified
  83. }
  84. }
  85. // Increment index
  86. index := kvObject.Index()
  87. index++
  88. kvObject.SetIndex(index)
  89. }
  90. kmap[Key(kvObject.Key()...)] = kvObject
  91. c.Unlock()
  92. return nil
  93. }
  94. func (c *cache) del(kvObject KVObject, atomic bool) error {
  95. kmap, err := c.kmap(kvObject)
  96. if err != nil {
  97. return err
  98. }
  99. c.Lock()
  100. // If atomic is true, cache needs to maintain its own index
  101. // for atomicity and del needs to be atomic.
  102. if atomic {
  103. if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
  104. if prev.Index() != kvObject.Index() {
  105. c.Unlock()
  106. return ErrKeyModified
  107. }
  108. }
  109. }
  110. delete(kmap, Key(kvObject.Key()...))
  111. c.Unlock()
  112. return nil
  113. }
  114. func (c *cache) get(key string, kvObject KVObject) error {
  115. kmap, err := c.kmap(kvObject)
  116. if err != nil {
  117. return err
  118. }
  119. c.Lock()
  120. defer c.Unlock()
  121. o, ok := kmap[Key(kvObject.Key()...)]
  122. if !ok {
  123. return ErrKeyNotFound
  124. }
  125. ctor, ok := o.(KVConstructor)
  126. if !ok {
  127. return errors.New("kvobject does not implement KVConstructor interface. could not get object")
  128. }
  129. return ctor.CopyTo(kvObject)
  130. }
  131. func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
  132. kmap, err := c.kmap(kvObject)
  133. if err != nil {
  134. return nil, err
  135. }
  136. c.Lock()
  137. defer c.Unlock()
  138. var kvol []KVObject
  139. for _, v := range kmap {
  140. kvol = append(kvol, v)
  141. }
  142. return kvol, nil
  143. }