datastore.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package datastore
  2. import (
  3. "reflect"
  4. "strings"
  5. "github.com/docker/libkv"
  6. "github.com/docker/libkv/store"
  7. "github.com/docker/libkv/store/consul"
  8. "github.com/docker/libkv/store/etcd"
  9. "github.com/docker/libkv/store/zookeeper"
  10. "github.com/docker/libnetwork/config"
  11. "github.com/docker/libnetwork/types"
  12. )
  13. //DataStore exported
  14. type DataStore interface {
  15. // GetObject gets data from datastore and unmarshals to the specified object
  16. GetObject(key string, o KV) error
  17. // PutObject adds a new Record based on an object into the datastore
  18. PutObject(kvObject KV) error
  19. // PutObjectAtomic provides an atomic add and update operation for a Record
  20. PutObjectAtomic(kvObject KV) error
  21. // DeleteObject deletes a record
  22. DeleteObject(kvObject KV) error
  23. // DeleteObjectAtomic performs an atomic delete operation
  24. DeleteObjectAtomic(kvObject KV) error
  25. // DeleteTree deletes a record
  26. DeleteTree(kvObject KV) error
  27. // KVStore returns access to the KV Store
  28. KVStore() store.Store
  29. }
  30. // ErrKeyModified is raised for an atomic update when the update is working on a stale state
  31. var (
  32. ErrKeyModified = store.ErrKeyModified
  33. ErrKeyNotFound = store.ErrKeyNotFound
  34. )
  35. type datastore struct {
  36. store store.Store
  37. }
  38. //KV Key Value interface used by objects to be part of the DataStore
  39. type KV interface {
  40. // Key method lets an object to provide the Key to be used in KV Store
  41. Key() []string
  42. // KeyPrefix method lets an object to return immediate parent key that can be used for tree walk
  43. KeyPrefix() []string
  44. // Value method lets an object to marshal its content to be stored in the KV store
  45. Value() []byte
  46. // SetValue is used by the datastore to set the object's value when loaded from the data store.
  47. SetValue([]byte) error
  48. // Index method returns the latest DB Index as seen by the object
  49. Index() uint64
  50. // SetIndex method allows the datastore to store the latest DB Index into the object
  51. SetIndex(uint64)
  52. // True if the object exists in the datastore, false if it hasn't been stored yet.
  53. // When SetIndex() is called, the object has been stored.
  54. Exists() bool
  55. }
  56. const (
  57. // NetworkKeyPrefix is the prefix for network key in the kv store
  58. NetworkKeyPrefix = "network"
  59. // EndpointKeyPrefix is the prefix for endpoint key in the kv store
  60. EndpointKeyPrefix = "endpoint"
  61. )
  62. var rootChain = []string{"docker", "libnetwork"}
  63. func init() {
  64. consul.Register()
  65. zookeeper.Register()
  66. etcd.Register()
  67. }
  68. //Key provides convenient method to create a Key
  69. func Key(key ...string) string {
  70. keychain := append(rootChain, key...)
  71. str := strings.Join(keychain, "/")
  72. return str + "/"
  73. }
  74. //ParseKey provides convenient method to unpack the key to complement the Key function
  75. func ParseKey(key string) ([]string, error) {
  76. chain := strings.Split(strings.Trim(key, "/"), "/")
  77. // The key must atleast be equal to the rootChain in order to be considered as valid
  78. if len(chain) <= len(rootChain) || !reflect.DeepEqual(chain[0:len(rootChain)], rootChain) {
  79. return nil, types.BadRequestErrorf("invalid Key : %s", key)
  80. }
  81. return chain[len(rootChain):], nil
  82. }
  83. // newClient used to connect to KV Store
  84. func newClient(kv string, addrs string) (DataStore, error) {
  85. store, err := libkv.NewStore(store.Backend(kv), []string{addrs}, &store.Config{})
  86. if err != nil {
  87. return nil, err
  88. }
  89. ds := &datastore{store: store}
  90. return ds, nil
  91. }
  92. // NewDataStore creates a new instance of LibKV data store
  93. func NewDataStore(cfg *config.DatastoreCfg) (DataStore, error) {
  94. if cfg == nil {
  95. return nil, types.BadRequestErrorf("invalid configuration passed to datastore")
  96. }
  97. // TODO : cfg.Embedded case
  98. return newClient(cfg.Client.Provider, cfg.Client.Address)
  99. }
  100. // NewCustomDataStore can be used by clients to plugin cusom datatore that adhers to store.Store
  101. func NewCustomDataStore(customStore store.Store) DataStore {
  102. return &datastore{store: customStore}
  103. }
  104. func (ds *datastore) KVStore() store.Store {
  105. return ds.store
  106. }
  107. // PutObjectAtomic adds a new Record based on an object into the datastore
  108. func (ds *datastore) PutObjectAtomic(kvObject KV) error {
  109. if kvObject == nil {
  110. return types.BadRequestErrorf("invalid KV Object : nil")
  111. }
  112. kvObjValue := kvObject.Value()
  113. if kvObjValue == nil {
  114. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  115. }
  116. var previous *store.KVPair
  117. if kvObject.Exists() {
  118. previous = &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  119. } else {
  120. previous = nil
  121. }
  122. _, pair, err := ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
  123. if err != nil {
  124. return err
  125. }
  126. kvObject.SetIndex(pair.LastIndex)
  127. return nil
  128. }
  129. // PutObject adds a new Record based on an object into the datastore
  130. func (ds *datastore) PutObject(kvObject KV) error {
  131. if kvObject == nil {
  132. return types.BadRequestErrorf("invalid KV Object : nil")
  133. }
  134. return ds.putObjectWithKey(kvObject, kvObject.Key()...)
  135. }
  136. func (ds *datastore) putObjectWithKey(kvObject KV, key ...string) error {
  137. kvObjValue := kvObject.Value()
  138. if kvObjValue == nil {
  139. return types.BadRequestErrorf("invalid KV Object with a nil Value for key %s", Key(kvObject.Key()...))
  140. }
  141. return ds.store.Put(Key(key...), kvObjValue, nil)
  142. }
  143. // GetObject returns a record matching the key
  144. func (ds *datastore) GetObject(key string, o KV) error {
  145. kvPair, err := ds.store.Get(key)
  146. if err != nil {
  147. return err
  148. }
  149. err = o.SetValue(kvPair.Value)
  150. if err != nil {
  151. return err
  152. }
  153. // Make sure the object has a correct view of the DB index in case we need to modify it
  154. // and update the DB.
  155. o.SetIndex(kvPair.LastIndex)
  156. return nil
  157. }
  158. // DeleteObject unconditionally deletes a record from the store
  159. func (ds *datastore) DeleteObject(kvObject KV) error {
  160. return ds.store.Delete(Key(kvObject.Key()...))
  161. }
  162. // DeleteObjectAtomic performs atomic delete on a record
  163. func (ds *datastore) DeleteObjectAtomic(kvObject KV) error {
  164. if kvObject == nil {
  165. return types.BadRequestErrorf("invalid KV Object : nil")
  166. }
  167. previous := &store.KVPair{Key: Key(kvObject.Key()...), LastIndex: kvObject.Index()}
  168. _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous)
  169. return err
  170. }
  171. // DeleteTree unconditionally deletes a record from the store
  172. func (ds *datastore) DeleteTree(kvObject KV) error {
  173. return ds.store.DeleteTree(Key(kvObject.KeyPrefix()...))
  174. }