boltdb.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. package boltdb
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. store "github.com/docker/docker/libnetwork/internal/kvstore"
  12. bolt "go.etcd.io/bbolt"
  13. )
  // filePerm is the permission mode handed to bolt.Open for the database file.
  const filePerm = 0o644

  // Sentinel errors reported by New for unusable arguments.
  var (
  	// ErrBoltBucketOptionMissing is returned when the boltBucket config
  	// option is missing.
  	ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")

  	// ErrMultipleEndpointsUnsupported is returned when multiple endpoints
  	// are specified for BoltDB. The endpoint has to be a single local file
  	// path.
  	ErrMultipleEndpointsUnsupported = errors.New("boltdb supports one endpoint and should be a file path")
  )
  22. // BoltDB type implements the Store interface
  23. type BoltDB struct {
  24. mu sync.Mutex
  25. client *bolt.DB
  26. boltBucket []byte
  27. dbIndex uint64
  28. path string
  29. timeout time.Duration
  30. // By default libkv opens and closes the bolt DB connection for every
  31. // get/put operation. This allows multiple apps to use a Bolt DB at the
  32. // same time.
  33. // PersistConnection flag provides an option to override ths behavior.
  34. // ie: open the connection in New and use it till Close is called.
  35. PersistConnection bool
  36. }
  const (
  	// libkvmetadatalen is the size in bytes of the little-endian uint64
  	// index header stored in front of every value.
  	libkvmetadatalen = 8

  	// transientTimeout is the bolt.Open timeout applied when the caller
  	// does not configure a connection timeout.
  	transientTimeout = 10 * time.Second
  )
  41. // New opens a new BoltDB connection to the specified path and bucket
  42. func New(endpoints []string, options *store.Config) (store.Store, error) {
  43. if len(endpoints) > 1 {
  44. return nil, ErrMultipleEndpointsUnsupported
  45. }
  46. if (options == nil) || (len(options.Bucket) == 0) {
  47. return nil, ErrBoltBucketOptionMissing
  48. }
  49. dir, _ := filepath.Split(endpoints[0])
  50. if err := os.MkdirAll(dir, 0o750); err != nil {
  51. return nil, err
  52. }
  53. var db *bolt.DB
  54. if options.PersistConnection {
  55. var err error
  56. db, err = bolt.Open(endpoints[0], filePerm, &bolt.Options{
  57. Timeout: options.ConnectionTimeout,
  58. })
  59. if err != nil {
  60. return nil, err
  61. }
  62. }
  63. timeout := transientTimeout
  64. if options.ConnectionTimeout != 0 {
  65. timeout = options.ConnectionTimeout
  66. }
  67. b := &BoltDB{
  68. client: db,
  69. path: endpoints[0],
  70. boltBucket: []byte(options.Bucket),
  71. timeout: timeout,
  72. PersistConnection: options.PersistConnection,
  73. }
  74. return b, nil
  75. }
  76. func (b *BoltDB) reset() {
  77. b.path = ""
  78. b.boltBucket = []byte{}
  79. }
  80. func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
  81. if !b.PersistConnection {
  82. db, err := bolt.Open(b.path, filePerm, &bolt.Options{Timeout: b.timeout})
  83. if err != nil {
  84. return nil, err
  85. }
  86. b.client = db
  87. }
  88. return b.client, nil
  89. }
  90. func (b *BoltDB) releaseDBhandle() {
  91. if !b.PersistConnection {
  92. b.client.Close()
  93. }
  94. }
  95. // Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
  96. // by a atomic counter maintained by the libkv and appened to the value passed by the client.
  97. func (b *BoltDB) Get(key string) (*store.KVPair, error) {
  98. b.mu.Lock()
  99. defer b.mu.Unlock()
  100. db, err := b.getDBhandle()
  101. if err != nil {
  102. return nil, err
  103. }
  104. defer b.releaseDBhandle()
  105. var val []byte
  106. err = db.View(func(tx *bolt.Tx) error {
  107. bucket := tx.Bucket(b.boltBucket)
  108. if bucket == nil {
  109. return store.ErrKeyNotFound
  110. }
  111. v := bucket.Get([]byte(key))
  112. val = make([]byte, len(v))
  113. copy(val, v)
  114. return nil
  115. })
  116. if err != nil {
  117. return nil, err
  118. }
  119. if len(val) == 0 {
  120. return nil, store.ErrKeyNotFound
  121. }
  122. dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
  123. val = val[libkvmetadatalen:]
  124. return &store.KVPair{Key: key, Value: val, LastIndex: dbIndex}, nil
  125. }
  126. // Put the key, value pair. index number metadata is prepended to the value
  127. func (b *BoltDB) Put(key string, value []byte) error {
  128. b.mu.Lock()
  129. defer b.mu.Unlock()
  130. db, err := b.getDBhandle()
  131. if err != nil {
  132. return err
  133. }
  134. defer b.releaseDBhandle()
  135. return db.Update(func(tx *bolt.Tx) error {
  136. bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
  137. if err != nil {
  138. return err
  139. }
  140. dbIndex := atomic.AddUint64(&b.dbIndex, 1)
  141. dbval := make([]byte, libkvmetadatalen)
  142. binary.LittleEndian.PutUint64(dbval, dbIndex)
  143. dbval = append(dbval, value...)
  144. return bucket.Put([]byte(key), dbval)
  145. })
  146. }
  147. // Exists checks if the key exists inside the store
  148. func (b *BoltDB) Exists(key string) (bool, error) {
  149. b.mu.Lock()
  150. defer b.mu.Unlock()
  151. db, err := b.getDBhandle()
  152. if err != nil {
  153. return false, err
  154. }
  155. defer b.releaseDBhandle()
  156. var exists bool
  157. err = db.View(func(tx *bolt.Tx) error {
  158. bucket := tx.Bucket(b.boltBucket)
  159. if bucket == nil {
  160. return store.ErrKeyNotFound
  161. }
  162. exists = len(bucket.Get([]byte(key))) > 0
  163. return nil
  164. })
  165. if err != nil {
  166. return false, err
  167. }
  168. if !exists {
  169. return false, store.ErrKeyNotFound
  170. }
  171. return true, nil
  172. }
  173. // List returns the range of keys starting with the passed in prefix
  174. func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
  175. b.mu.Lock()
  176. defer b.mu.Unlock()
  177. db, err := b.getDBhandle()
  178. if err != nil {
  179. return nil, err
  180. }
  181. defer b.releaseDBhandle()
  182. var kv []*store.KVPair
  183. err = db.View(func(tx *bolt.Tx) error {
  184. bucket := tx.Bucket(b.boltBucket)
  185. if bucket == nil {
  186. return store.ErrKeyNotFound
  187. }
  188. cursor := bucket.Cursor()
  189. prefix := []byte(keyPrefix)
  190. for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
  191. dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
  192. v = v[libkvmetadatalen:]
  193. val := make([]byte, len(v))
  194. copy(val, v)
  195. kv = append(kv, &store.KVPair{
  196. Key: string(key),
  197. Value: val,
  198. LastIndex: dbIndex,
  199. })
  200. }
  201. return nil
  202. })
  203. if err != nil {
  204. return nil, err
  205. }
  206. if len(kv) == 0 {
  207. return nil, store.ErrKeyNotFound
  208. }
  209. return kv, nil
  210. }
  211. // AtomicDelete deletes a value at "key" if the key
  212. // has not been modified in the meantime, throws an
  213. // error if this is the case
  214. func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
  215. b.mu.Lock()
  216. defer b.mu.Unlock()
  217. if previous == nil {
  218. return store.ErrPreviousNotSpecified
  219. }
  220. db, err := b.getDBhandle()
  221. if err != nil {
  222. return err
  223. }
  224. defer b.releaseDBhandle()
  225. return db.Update(func(tx *bolt.Tx) error {
  226. bucket := tx.Bucket(b.boltBucket)
  227. if bucket == nil {
  228. return store.ErrKeyNotFound
  229. }
  230. val := bucket.Get([]byte(key))
  231. if val == nil {
  232. return store.ErrKeyNotFound
  233. }
  234. dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
  235. if dbIndex != previous.LastIndex {
  236. return store.ErrKeyModified
  237. }
  238. return bucket.Delete([]byte(key))
  239. })
  240. }
  241. // AtomicPut puts a value at "key" if the key has not been
  242. // modified since the last Put, throws an error if this is the case
  243. func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*store.KVPair, error) {
  244. b.mu.Lock()
  245. defer b.mu.Unlock()
  246. db, err := b.getDBhandle()
  247. if err != nil {
  248. return nil, err
  249. }
  250. defer b.releaseDBhandle()
  251. var dbIndex uint64
  252. dbval := make([]byte, libkvmetadatalen)
  253. err = db.Update(func(tx *bolt.Tx) error {
  254. bucket := tx.Bucket(b.boltBucket)
  255. if bucket == nil {
  256. if previous != nil {
  257. return store.ErrKeyNotFound
  258. }
  259. var err error
  260. bucket, err = tx.CreateBucket(b.boltBucket)
  261. if err != nil {
  262. return err
  263. }
  264. }
  265. // AtomicPut is equivalent to Put if previous is nil and the Ky
  266. // doesn't exist in the DB.
  267. val := bucket.Get([]byte(key))
  268. if previous == nil && len(val) != 0 {
  269. return store.ErrKeyExists
  270. }
  271. if previous != nil {
  272. if len(val) == 0 {
  273. return store.ErrKeyNotFound
  274. }
  275. dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
  276. if dbIndex != previous.LastIndex {
  277. return store.ErrKeyModified
  278. }
  279. }
  280. dbIndex = atomic.AddUint64(&b.dbIndex, 1)
  281. binary.LittleEndian.PutUint64(dbval, b.dbIndex)
  282. dbval = append(dbval, value...)
  283. return bucket.Put([]byte(key), dbval)
  284. })
  285. if err != nil {
  286. return nil, err
  287. }
  288. return &store.KVPair{Key: key, Value: value, LastIndex: dbIndex}, nil
  289. }
  290. // Close the db connection to the BoltDB
  291. func (b *BoltDB) Close() {
  292. b.mu.Lock()
  293. defer b.mu.Unlock()
  294. if !b.PersistConnection {
  295. b.reset()
  296. } else {
  297. b.client.Close()
  298. }
  299. }