Synchronize datastore apis

Currently there are 3 distinct operations performed by
datastore
   - Pushing the data to the store
   - Updating the Index of the local object
   - Updating the cache (in case of localscope)

Without a lock racing datastore api calls can interleave
in various surprising ways. Best thing is to keep these
3 above operation inseparable. Use a datastore lock to
achieve this.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
Jana Radhakrishnan 2015-10-11 18:51:10 -07:00
parent d751a1d35e
commit a6c2dd75b5

View file

@ -5,6 +5,7 @@ import (
"log"
"reflect"
"strings"
"sync"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
@ -55,6 +56,7 @@ type datastore struct {
scope string
store store.Store
cache *cache
sync.Mutex
}
// KVObject is Key/Value interface used by objects to be part of the DataStore
@ -287,6 +289,8 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
pair *store.KVPair
err error
)
ds.Lock()
defer ds.Unlock()
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
@ -325,6 +329,9 @@ add_cache:
// PutObject adds a new Record based on an object into the datastore
func (ds *datastore) PutObject(kvObject KVObject) error {
ds.Lock()
defer ds.Unlock()
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
@ -356,6 +363,9 @@ func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error {
// GetObject returns a record matching the key
func (ds *datastore) GetObject(key string, o KVObject) error {
ds.Lock()
defer ds.Unlock()
if ds.cache != nil {
return ds.cache.get(key, o)
}
@ -387,6 +397,9 @@ func (ds *datastore) ensureKey(key string) error {
}
func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
ds.Lock()
defer ds.Unlock()
if ds.cache != nil {
return ds.cache.list(kvObject)
}
@ -430,6 +443,9 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) {
// DeleteObject unconditionally deletes a record from the store
func (ds *datastore) DeleteObject(kvObject KVObject) error {
ds.Lock()
defer ds.Unlock()
// cleaup the cache first
if ds.cache != nil {
ds.cache.del(kvObject)
@ -444,6 +460,9 @@ func (ds *datastore) DeleteObject(kvObject KVObject) error {
// DeleteObjectAtomic performs atomic delete on a record
func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
ds.Lock()
defer ds.Unlock()
if kvObject == nil {
return types.BadRequestErrorf("invalid KV Object : nil")
}
@ -469,6 +488,9 @@ del_cache:
// DeleteTree unconditionally deletes a record from the store
func (ds *datastore) DeleteTree(kvObject KVObject) error {
ds.Lock()
defer ds.Unlock()
// cleaup the cache first
if ds.cache != nil {
ds.cache.del(kvObject)