boltdb.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. package boltdb
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/boltdb/bolt"
  12. "github.com/docker/libkv"
  13. "github.com/docker/libkv/store"
  14. )
  15. var (
  16. // ErrMultipleEndpointsUnsupported is thrown when multiple endpoints specified for
  17. // BoltDB. Endpoint has to be a local file path
  18. ErrMultipleEndpointsUnsupported = errors.New("boltdb supports one endpoint and should be a file path")
  19. // ErrBoltBucketOptionMissing is thrown when boltBcuket config option is missing
  20. ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")
  21. )
  22. const (
  23. filePerm os.FileMode = 0644
  24. )
  25. //BoltDB type implements the Store interface
  26. type BoltDB struct {
  27. client *bolt.DB
  28. boltBucket []byte
  29. dbIndex uint64
  30. path string
  31. timeout time.Duration
  32. // By default libkv opens and closes the bolt DB connection for every
  33. // get/put operation. This allows multiple apps to use a Bolt DB at the
  34. // same time.
  35. // PersistConnection flag provides an option to override ths behavior.
  36. // ie: open the connection in New and use it till Close is called.
  37. PersistConnection bool
  38. sync.Mutex
  39. }
  40. const (
  41. libkvmetadatalen = 8
  42. transientTimeout = time.Duration(10) * time.Second
  43. )
  44. // Register registers boltdb to libkv
  45. func Register() {
  46. libkv.AddStore(store.BOLTDB, New)
  47. }
  48. // New opens a new BoltDB connection to the specified path and bucket
  49. func New(endpoints []string, options *store.Config) (store.Store, error) {
  50. var (
  51. db *bolt.DB
  52. err error
  53. boltOptions *bolt.Options
  54. timeout = transientTimeout
  55. )
  56. if len(endpoints) > 1 {
  57. return nil, ErrMultipleEndpointsUnsupported
  58. }
  59. if (options == nil) || (len(options.Bucket) == 0) {
  60. return nil, ErrBoltBucketOptionMissing
  61. }
  62. dir, _ := filepath.Split(endpoints[0])
  63. if err = os.MkdirAll(dir, 0750); err != nil {
  64. return nil, err
  65. }
  66. if options.PersistConnection {
  67. boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
  68. db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
  69. if err != nil {
  70. return nil, err
  71. }
  72. }
  73. if options.ConnectionTimeout != 0 {
  74. timeout = options.ConnectionTimeout
  75. }
  76. b := &BoltDB{
  77. client: db,
  78. path: endpoints[0],
  79. boltBucket: []byte(options.Bucket),
  80. timeout: timeout,
  81. PersistConnection: options.PersistConnection,
  82. }
  83. return b, nil
  84. }
  85. func (b *BoltDB) reset() {
  86. b.path = ""
  87. b.boltBucket = []byte{}
  88. }
  89. func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
  90. var (
  91. db *bolt.DB
  92. err error
  93. )
  94. if !b.PersistConnection {
  95. boltOptions := &bolt.Options{Timeout: b.timeout}
  96. if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil {
  97. return nil, err
  98. }
  99. b.client = db
  100. }
  101. return b.client, nil
  102. }
  103. func (b *BoltDB) releaseDBhandle() {
  104. if !b.PersistConnection {
  105. b.client.Close()
  106. }
  107. }
  108. // Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
  109. // by a atomic counter maintained by the libkv and appened to the value passed by the client.
  110. func (b *BoltDB) Get(key string) (*store.KVPair, error) {
  111. var (
  112. val []byte
  113. db *bolt.DB
  114. err error
  115. )
  116. b.Lock()
  117. defer b.Unlock()
  118. if db, err = b.getDBhandle(); err != nil {
  119. return nil, err
  120. }
  121. defer b.releaseDBhandle()
  122. err = db.View(func(tx *bolt.Tx) error {
  123. bucket := tx.Bucket(b.boltBucket)
  124. if bucket == nil {
  125. return store.ErrKeyNotFound
  126. }
  127. v := bucket.Get([]byte(key))
  128. val = make([]byte, len(v))
  129. copy(val, v)
  130. return nil
  131. })
  132. if len(val) == 0 {
  133. return nil, store.ErrKeyNotFound
  134. }
  135. if err != nil {
  136. return nil, err
  137. }
  138. dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
  139. val = val[libkvmetadatalen:]
  140. return &store.KVPair{Key: key, Value: val, LastIndex: (dbIndex)}, nil
  141. }
  142. //Put the key, value pair. index number metadata is prepended to the value
  143. func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
  144. var (
  145. dbIndex uint64
  146. db *bolt.DB
  147. err error
  148. )
  149. b.Lock()
  150. defer b.Unlock()
  151. dbval := make([]byte, libkvmetadatalen)
  152. if db, err = b.getDBhandle(); err != nil {
  153. return err
  154. }
  155. defer b.releaseDBhandle()
  156. err = db.Update(func(tx *bolt.Tx) error {
  157. bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
  158. if err != nil {
  159. return err
  160. }
  161. dbIndex = atomic.AddUint64(&b.dbIndex, 1)
  162. binary.LittleEndian.PutUint64(dbval, dbIndex)
  163. dbval = append(dbval, value...)
  164. err = bucket.Put([]byte(key), dbval)
  165. if err != nil {
  166. return err
  167. }
  168. return nil
  169. })
  170. return err
  171. }
  172. //Delete the value for the given key.
  173. func (b *BoltDB) Delete(key string) error {
  174. var (
  175. db *bolt.DB
  176. err error
  177. )
  178. b.Lock()
  179. defer b.Unlock()
  180. if db, err = b.getDBhandle(); err != nil {
  181. return err
  182. }
  183. defer b.releaseDBhandle()
  184. err = db.Update(func(tx *bolt.Tx) error {
  185. bucket := tx.Bucket(b.boltBucket)
  186. if bucket == nil {
  187. return store.ErrKeyNotFound
  188. }
  189. err := bucket.Delete([]byte(key))
  190. return err
  191. })
  192. return err
  193. }
  194. // Exists checks if the key exists inside the store
  195. func (b *BoltDB) Exists(key string) (bool, error) {
  196. var (
  197. val []byte
  198. db *bolt.DB
  199. err error
  200. )
  201. b.Lock()
  202. defer b.Unlock()
  203. if db, err = b.getDBhandle(); err != nil {
  204. return false, err
  205. }
  206. defer b.releaseDBhandle()
  207. err = db.View(func(tx *bolt.Tx) error {
  208. bucket := tx.Bucket(b.boltBucket)
  209. if bucket == nil {
  210. return store.ErrKeyNotFound
  211. }
  212. val = bucket.Get([]byte(key))
  213. return nil
  214. })
  215. if len(val) == 0 {
  216. return false, err
  217. }
  218. return true, err
  219. }
  220. // List returns the range of keys starting with the passed in prefix
  221. func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
  222. var (
  223. db *bolt.DB
  224. err error
  225. )
  226. b.Lock()
  227. defer b.Unlock()
  228. kv := []*store.KVPair{}
  229. if db, err = b.getDBhandle(); err != nil {
  230. return nil, err
  231. }
  232. defer b.releaseDBhandle()
  233. err = db.View(func(tx *bolt.Tx) error {
  234. bucket := tx.Bucket(b.boltBucket)
  235. if bucket == nil {
  236. return store.ErrKeyNotFound
  237. }
  238. cursor := bucket.Cursor()
  239. prefix := []byte(keyPrefix)
  240. for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
  241. dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
  242. v = v[libkvmetadatalen:]
  243. val := make([]byte, len(v))
  244. copy(val, v)
  245. kv = append(kv, &store.KVPair{
  246. Key: string(key),
  247. Value: val,
  248. LastIndex: dbIndex,
  249. })
  250. }
  251. return nil
  252. })
  253. if len(kv) == 0 {
  254. return nil, store.ErrKeyNotFound
  255. }
  256. return kv, err
  257. }
  258. // AtomicDelete deletes a value at "key" if the key
  259. // has not been modified in the meantime, throws an
  260. // error if this is the case
  261. func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
  262. var (
  263. val []byte
  264. db *bolt.DB
  265. err error
  266. )
  267. b.Lock()
  268. defer b.Unlock()
  269. if previous == nil {
  270. return false, store.ErrPreviousNotSpecified
  271. }
  272. if db, err = b.getDBhandle(); err != nil {
  273. return false, err
  274. }
  275. defer b.releaseDBhandle()
  276. err = db.Update(func(tx *bolt.Tx) error {
  277. bucket := tx.Bucket(b.boltBucket)
  278. if bucket == nil {
  279. return store.ErrKeyNotFound
  280. }
  281. val = bucket.Get([]byte(key))
  282. if val == nil {
  283. return store.ErrKeyNotFound
  284. }
  285. dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
  286. if dbIndex != previous.LastIndex {
  287. return store.ErrKeyModified
  288. }
  289. err := bucket.Delete([]byte(key))
  290. return err
  291. })
  292. if err != nil {
  293. return false, err
  294. }
  295. return true, err
  296. }
  297. // AtomicPut puts a value at "key" if the key has not been
  298. // modified since the last Put, throws an error if this is the case
  299. func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
  300. var (
  301. val []byte
  302. dbIndex uint64
  303. db *bolt.DB
  304. err error
  305. )
  306. b.Lock()
  307. defer b.Unlock()
  308. dbval := make([]byte, libkvmetadatalen)
  309. if db, err = b.getDBhandle(); err != nil {
  310. return false, nil, err
  311. }
  312. defer b.releaseDBhandle()
  313. err = db.Update(func(tx *bolt.Tx) error {
  314. var err error
  315. bucket := tx.Bucket(b.boltBucket)
  316. if bucket == nil {
  317. if previous != nil {
  318. return store.ErrKeyNotFound
  319. }
  320. bucket, err = tx.CreateBucket(b.boltBucket)
  321. if err != nil {
  322. return err
  323. }
  324. }
  325. // AtomicPut is equivalent to Put if previous is nil and the Ky
  326. // doesn't exist in the DB.
  327. val = bucket.Get([]byte(key))
  328. if previous == nil && len(val) != 0 {
  329. return store.ErrKeyExists
  330. }
  331. if previous != nil {
  332. if len(val) == 0 {
  333. return store.ErrKeyNotFound
  334. }
  335. dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
  336. if dbIndex != previous.LastIndex {
  337. return store.ErrKeyModified
  338. }
  339. }
  340. dbIndex = atomic.AddUint64(&b.dbIndex, 1)
  341. binary.LittleEndian.PutUint64(dbval, b.dbIndex)
  342. dbval = append(dbval, value...)
  343. return (bucket.Put([]byte(key), dbval))
  344. })
  345. if err != nil {
  346. return false, nil, err
  347. }
  348. updated := &store.KVPair{
  349. Key: key,
  350. Value: value,
  351. LastIndex: dbIndex,
  352. }
  353. return true, updated, nil
  354. }
  355. // Close the db connection to the BoltDB
  356. func (b *BoltDB) Close() {
  357. b.Lock()
  358. defer b.Unlock()
  359. if !b.PersistConnection {
  360. b.reset()
  361. } else {
  362. b.client.Close()
  363. }
  364. return
  365. }
  366. // DeleteTree deletes a range of keys with a given prefix
  367. func (b *BoltDB) DeleteTree(keyPrefix string) error {
  368. var (
  369. db *bolt.DB
  370. err error
  371. )
  372. b.Lock()
  373. defer b.Unlock()
  374. if db, err = b.getDBhandle(); err != nil {
  375. return err
  376. }
  377. defer b.releaseDBhandle()
  378. err = db.Update(func(tx *bolt.Tx) error {
  379. bucket := tx.Bucket(b.boltBucket)
  380. if bucket == nil {
  381. return store.ErrKeyNotFound
  382. }
  383. cursor := bucket.Cursor()
  384. prefix := []byte(keyPrefix)
  385. for key, _ := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, _ = cursor.Next() {
  386. _ = bucket.Delete([]byte(key))
  387. }
  388. return nil
  389. })
  390. return err
  391. }
  392. // NewLock has to implemented at the library level since its not supported by BoltDB
  393. func (b *BoltDB) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
  394. return nil, store.ErrCallNotSupported
  395. }
  396. // Watch has to implemented at the library level since its not supported by BoltDB
  397. func (b *BoltDB) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
  398. return nil, store.ErrCallNotSupported
  399. }
  400. // WatchTree has to implemented at the library level since its not supported by BoltDB
  401. func (b *BoltDB) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
  402. return nil, store.ErrCallNotSupported
  403. }