浏览代码

Sequence non-persistent objects in cache

Since the datastore interface is common for persistent and
non-persistent objects we need to provide the same kind of sequencing
and atomicity guarantess to non-persistent data operations as we do for
persistent operations. So added sequencing and atomicity checks in the
data cache layer.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan 9 年之前
父节点
当前提交
1272f90eae
共有 2 个文件被更改,包括 44 次插入7 次删除
  1. 29 2
      libnetwork/datastore/cache.go
  2. 15 5
      libnetwork/datastore/datastore.go

+ 29 - 2
libnetwork/datastore/cache.go

@@ -86,25 +86,52 @@ out:
 	return kmap, nil
 }
 
-func (c *cache) add(kvObject KVObject) error {
+func (c *cache) add(kvObject KVObject, atomic bool) error {
 	kmap, err := c.kmap(kvObject)
 	if err != nil {
 		return err
 	}
 
 	c.Lock()
+	// If atomic is true, cache needs to maintain its own index
+	// for atomicity and the add needs to be atomic.
+	if atomic {
+		if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
+			if prev.Index() != kvObject.Index() {
+				c.Unlock()
+				return ErrKeyModified
+			}
+		}
+
+		// Increment index
+		index := kvObject.Index()
+		index++
+		kvObject.SetIndex(index)
+	}
+
 	kmap[Key(kvObject.Key()...)] = kvObject
 	c.Unlock()
 	return nil
 }
 
-func (c *cache) del(kvObject KVObject) error {
+func (c *cache) del(kvObject KVObject, atomic bool) error {
 	kmap, err := c.kmap(kvObject)
 	if err != nil {
 		return err
 	}
 
 	c.Lock()
+	// If atomic is true, cache needs to maintain its own index
+	// for atomicity and del needs to be atomic.
+	if atomic {
+		if prev, ok := kmap[Key(kvObject.Key()...)]; ok {
+			if prev.Index() != kvObject.Index() {
+				c.Unlock()
+				return ErrKeyModified
+			}
+		}
+	}
+
 	delete(kmap, Key(kvObject.Key()...))
 	c.Unlock()
 	return nil

+ 15 - 5
libnetwork/datastore/datastore.go

@@ -410,7 +410,9 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
 
 add_cache:
 	if ds.cache != nil {
-		return ds.cache.add(kvObject)
+		// If persistent store is skipped, sequencing needs to
+		// happen in cache.
+		return ds.cache.add(kvObject, kvObject.Skip())
 	}
 
 	return nil
@@ -435,7 +437,9 @@ func (ds *datastore) PutObject(kvObject KVObject) error {
 
 add_cache:
 	if ds.cache != nil {
-		return ds.cache.add(kvObject)
+		// If persistent store is skipped, sequencing needs to
+		// happen in cache.
+		return ds.cache.add(kvObject, kvObject.Skip())
 	}
 
 	return nil
@@ -537,7 +541,9 @@ func (ds *datastore) DeleteObject(kvObject KVObject) error {
 
 	// cleaup the cache first
 	if ds.cache != nil {
-		ds.cache.del(kvObject)
+		// If persistent store is skipped, sequencing needs to
+		// happen in cache.
+		ds.cache.del(kvObject, kvObject.Skip())
 	}
 
 	if kvObject.Skip() {
@@ -572,7 +578,9 @@ func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
 del_cache:
 	// cleanup the cache only if AtomicDelete went through successfully
 	if ds.cache != nil {
-		return ds.cache.del(kvObject)
+		// If persistent store is skipped, sequencing needs to
+		// happen in cache.
+		return ds.cache.del(kvObject, kvObject.Skip())
 	}
 
 	return nil
@@ -585,7 +593,9 @@ func (ds *datastore) DeleteTree(kvObject KVObject) error {
 
 	// cleaup the cache first
 	if ds.cache != nil {
-		ds.cache.del(kvObject)
+		// If persistent store is skipped, sequencing needs to
+		// happen in cache.
+		ds.cache.del(kvObject, kvObject.Skip())
 	}
 
 	if kvObject.Skip() {