boltdb.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. package boltdb
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. store "github.com/docker/docker/libnetwork/internal/kvstore"
  12. bolt "go.etcd.io/bbolt"
  13. )
  // filePerm is the permission mode handed to bolt.Open for the database file.
  const filePerm = 0o644

  // Sentinel errors returned by New.
  var (
  	// ErrBoltBucketOptionMissing is returned when no bucket name is
  	// configured for the store.
  	ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")

  	// ErrMultipleEndpointsUnsupported is returned when more than one
  	// endpoint is supplied. BoltDB takes a single endpoint, which has to
  	// be a local file path.
  	ErrMultipleEndpointsUnsupported = errors.New("boltdb supports one endpoint and should be a file path")
  )
  22. // BoltDB type implements the Store interface
  23. type BoltDB struct {
  24. mu sync.Mutex
  25. client *bolt.DB
  26. boltBucket []byte
  27. dbIndex uint64
  28. path string
  29. timeout time.Duration
  30. // By default libkv opens and closes the bolt DB connection for every
  31. // get/put operation. This allows multiple apps to use a Bolt DB at the
  32. // same time.
  33. // PersistConnection flag provides an option to override ths behavior.
  34. // ie: open the connection in New and use it till Close is called.
  35. PersistConnection bool
  36. }
  const (
  	// libkvmetadatalen is the size in bytes of the index header that is
  	// prepended to every stored value.
  	libkvmetadatalen = 8

  	// transientTimeout is the bolt.Open timeout used for per-operation
  	// connections when no connection timeout is configured.
  	transientTimeout = 10 * time.Second
  )
  41. // New opens a new BoltDB connection to the specified path and bucket
  42. func New(endpoints []string, options *store.Config) (store.Store, error) {
  43. if len(endpoints) > 1 {
  44. return nil, ErrMultipleEndpointsUnsupported
  45. }
  46. if (options == nil) || (len(options.Bucket) == 0) {
  47. return nil, ErrBoltBucketOptionMissing
  48. }
  49. dir, _ := filepath.Split(endpoints[0])
  50. if err := os.MkdirAll(dir, 0o750); err != nil {
  51. return nil, err
  52. }
  53. var db *bolt.DB
  54. if options.PersistConnection {
  55. var err error
  56. db, err = bolt.Open(endpoints[0], filePerm, &bolt.Options{
  57. Timeout: options.ConnectionTimeout,
  58. })
  59. if err != nil {
  60. return nil, err
  61. }
  62. }
  63. timeout := transientTimeout
  64. if options.ConnectionTimeout != 0 {
  65. timeout = options.ConnectionTimeout
  66. }
  67. b := &BoltDB{
  68. client: db,
  69. path: endpoints[0],
  70. boltBucket: []byte(options.Bucket),
  71. timeout: timeout,
  72. PersistConnection: options.PersistConnection,
  73. }
  74. return b, nil
  75. }
  76. func (b *BoltDB) reset() {
  77. b.path = ""
  78. b.boltBucket = []byte{}
  79. }
  80. func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
  81. if !b.PersistConnection {
  82. db, err := bolt.Open(b.path, filePerm, &bolt.Options{Timeout: b.timeout})
  83. if err != nil {
  84. return nil, err
  85. }
  86. b.client = db
  87. }
  88. return b.client, nil
  89. }
  90. func (b *BoltDB) releaseDBhandle() {
  91. if !b.PersistConnection {
  92. b.client.Close()
  93. }
  94. }
  95. // Put the key, value pair. index number metadata is prepended to the value
  96. func (b *BoltDB) Put(key string, value []byte) error {
  97. b.mu.Lock()
  98. defer b.mu.Unlock()
  99. db, err := b.getDBhandle()
  100. if err != nil {
  101. return err
  102. }
  103. defer b.releaseDBhandle()
  104. return db.Update(func(tx *bolt.Tx) error {
  105. bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
  106. if err != nil {
  107. return err
  108. }
  109. dbIndex := atomic.AddUint64(&b.dbIndex, 1)
  110. dbval := make([]byte, libkvmetadatalen)
  111. binary.LittleEndian.PutUint64(dbval, dbIndex)
  112. dbval = append(dbval, value...)
  113. return bucket.Put([]byte(key), dbval)
  114. })
  115. }
  116. // Exists checks if the key exists inside the store
  117. func (b *BoltDB) Exists(key string) (bool, error) {
  118. b.mu.Lock()
  119. defer b.mu.Unlock()
  120. db, err := b.getDBhandle()
  121. if err != nil {
  122. return false, err
  123. }
  124. defer b.releaseDBhandle()
  125. var exists bool
  126. err = db.View(func(tx *bolt.Tx) error {
  127. bucket := tx.Bucket(b.boltBucket)
  128. if bucket == nil {
  129. return store.ErrKeyNotFound
  130. }
  131. exists = len(bucket.Get([]byte(key))) > 0
  132. return nil
  133. })
  134. if err != nil {
  135. return false, err
  136. }
  137. if !exists {
  138. return false, store.ErrKeyNotFound
  139. }
  140. return true, nil
  141. }
  142. // List returns the range of keys starting with the passed in prefix
  143. func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
  144. b.mu.Lock()
  145. defer b.mu.Unlock()
  146. db, err := b.getDBhandle()
  147. if err != nil {
  148. return nil, err
  149. }
  150. defer b.releaseDBhandle()
  151. var kv []*store.KVPair
  152. err = db.View(func(tx *bolt.Tx) error {
  153. bucket := tx.Bucket(b.boltBucket)
  154. if bucket == nil {
  155. return store.ErrKeyNotFound
  156. }
  157. cursor := bucket.Cursor()
  158. prefix := []byte(keyPrefix)
  159. for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
  160. dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
  161. v = v[libkvmetadatalen:]
  162. val := make([]byte, len(v))
  163. copy(val, v)
  164. kv = append(kv, &store.KVPair{
  165. Key: string(key),
  166. Value: val,
  167. LastIndex: dbIndex,
  168. })
  169. }
  170. return nil
  171. })
  172. if err != nil {
  173. return nil, err
  174. }
  175. if len(kv) == 0 {
  176. return nil, store.ErrKeyNotFound
  177. }
  178. return kv, nil
  179. }
  180. // AtomicDelete deletes a value at "key" if the key
  181. // has not been modified in the meantime, throws an
  182. // error if this is the case
  183. func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
  184. b.mu.Lock()
  185. defer b.mu.Unlock()
  186. if previous == nil {
  187. return store.ErrPreviousNotSpecified
  188. }
  189. db, err := b.getDBhandle()
  190. if err != nil {
  191. return err
  192. }
  193. defer b.releaseDBhandle()
  194. return db.Update(func(tx *bolt.Tx) error {
  195. bucket := tx.Bucket(b.boltBucket)
  196. if bucket == nil {
  197. return store.ErrKeyNotFound
  198. }
  199. val := bucket.Get([]byte(key))
  200. if val == nil {
  201. return store.ErrKeyNotFound
  202. }
  203. dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
  204. if dbIndex != previous.LastIndex {
  205. return store.ErrKeyModified
  206. }
  207. return bucket.Delete([]byte(key))
  208. })
  209. }
  210. // AtomicPut puts a value at "key" if the key has not been
  211. // modified since the last Put, throws an error if this is the case
  212. func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*store.KVPair, error) {
  213. b.mu.Lock()
  214. defer b.mu.Unlock()
  215. db, err := b.getDBhandle()
  216. if err != nil {
  217. return nil, err
  218. }
  219. defer b.releaseDBhandle()
  220. var dbIndex uint64
  221. dbval := make([]byte, libkvmetadatalen)
  222. err = db.Update(func(tx *bolt.Tx) error {
  223. bucket := tx.Bucket(b.boltBucket)
  224. if bucket == nil {
  225. if previous != nil {
  226. return store.ErrKeyNotFound
  227. }
  228. var err error
  229. bucket, err = tx.CreateBucket(b.boltBucket)
  230. if err != nil {
  231. return err
  232. }
  233. }
  234. // AtomicPut is equivalent to Put if previous is nil and the Ky
  235. // doesn't exist in the DB.
  236. val := bucket.Get([]byte(key))
  237. if previous == nil && len(val) != 0 {
  238. return store.ErrKeyExists
  239. }
  240. if previous != nil {
  241. if len(val) == 0 {
  242. return store.ErrKeyNotFound
  243. }
  244. dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
  245. if dbIndex != previous.LastIndex {
  246. return store.ErrKeyModified
  247. }
  248. }
  249. dbIndex = atomic.AddUint64(&b.dbIndex, 1)
  250. binary.LittleEndian.PutUint64(dbval, b.dbIndex)
  251. dbval = append(dbval, value...)
  252. return bucket.Put([]byte(key), dbval)
  253. })
  254. if err != nil {
  255. return nil, err
  256. }
  257. return &store.KVPair{Key: key, Value: value, LastIndex: dbIndex}, nil
  258. }
  259. // Close the db connection to the BoltDB
  260. func (b *BoltDB) Close() {
  261. b.mu.Lock()
  262. defer b.mu.Unlock()
  263. if !b.PersistConnection {
  264. b.reset()
  265. } else {
  266. b.client.Close()
  267. }
  268. }