123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package datastore
import (
	"errors"
	"fmt"
	"sync"

	store "github.com/docker/docker/libnetwork/internal/kvstore"
)
- type kvMap map[string]KVObject
- type cache struct {
- mu sync.Mutex
- kmm map[string]kvMap
- ds store.Store
- }
- func newCache(ds store.Store) *cache {
- return &cache{kmm: make(map[string]kvMap), ds: ds}
- }
- func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
- var err error
- c.mu.Lock()
- keyPrefix := Key(kvObject.KeyPrefix()...)
- kmap, ok := c.kmm[keyPrefix]
- c.mu.Unlock()
- if ok {
- return kmap, nil
- }
- kmap = kvMap{}
- kvList, err := c.ds.List(keyPrefix)
- if err != nil {
- if err == store.ErrKeyNotFound {
- // If the store doesn't have anything then there is nothing to
- // populate in the cache. Just bail out.
- goto out
- }
- return nil, fmt.Errorf("error while populating kmap: %v", err)
- }
- for _, kvPair := range kvList {
- // Ignore empty kvPair values
- if len(kvPair.Value) == 0 {
- continue
- }
- dstO := kvObject.New()
- err = dstO.SetValue(kvPair.Value)
- if err != nil {
- return nil, err
- }
- // Make sure the object has a correct view of the DB index in
- // case we need to modify it and update the DB.
- dstO.SetIndex(kvPair.LastIndex)
- kmap[Key(dstO.Key()...)] = dstO
- }
- out:
- // There may multiple go routines racing to fill the
- // cache. The one which places the kmap in c.kmm first
- // wins. The others should just use what the first populated.
- c.mu.Lock()
- kmapNew, ok := c.kmm[keyPrefix]
- if ok {
- c.mu.Unlock()
- return kmapNew, nil
- }
- c.kmm[keyPrefix] = kmap
- c.mu.Unlock()
- return kmap, nil
- }
- func (c *cache) add(kvObject KVObject, atomic bool) error {
- kmap, err := c.kmap(kvObject)
- if err != nil {
- return err
- }
- c.mu.Lock()
- // If atomic is true, cache needs to maintain its own index
- // for atomicity and the add needs to be atomic.
- if atomic {
- if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
- if prev.Index() != kvObject.Index() {
- c.mu.Unlock()
- return ErrKeyModified
- }
- }
- // Increment index
- index := kvObject.Index()
- index++
- kvObject.SetIndex(index)
- }
- kmap[Key(kvObject.Key()...)] = kvObject
- c.mu.Unlock()
- return nil
- }
- func (c *cache) del(kvObject KVObject, atomic bool) error {
- kmap, err := c.kmap(kvObject)
- if err != nil {
- return err
- }
- c.mu.Lock()
- // If atomic is true, cache needs to maintain its own index
- // for atomicity and del needs to be atomic.
- if atomic {
- if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
- if prev.Index() != kvObject.Index() {
- c.mu.Unlock()
- return ErrKeyModified
- }
- }
- }
- delete(kmap, Key(kvObject.Key()...))
- c.mu.Unlock()
- return nil
- }
- func (c *cache) get(kvObject KVObject) error {
- kmap, err := c.kmap(kvObject)
- if err != nil {
- return err
- }
- c.mu.Lock()
- defer c.mu.Unlock()
- o, ok := kmap[Key(kvObject.Key()...)]
- if !ok {
- return ErrKeyNotFound
- }
- return o.CopyTo(kvObject)
- }
- func (c *cache) list(kvObject KVObject) ([]KVObject, error) {
- kmap, err := c.kmap(kvObject)
- if err != nil {
- return nil, err
- }
- c.mu.Lock()
- defer c.mu.Unlock()
- var kvol []KVObject
- for _, v := range kmap {
- kvol = append(kvol, v)
- }
- return kvol, nil
- }
|