|
@@ -26,6 +26,7 @@ const filePerm = 0o644
|
|
|
|
|
|
// BoltDB type implements the Store interface
|
|
|
type BoltDB struct {
|
|
|
+ mu sync.Mutex
|
|
|
client *bolt.DB
|
|
|
boltBucket []byte
|
|
|
dbIndex uint64
|
|
@@ -37,7 +38,6 @@ type BoltDB struct {
|
|
|
// PersistConnection flag provides an option to override ths behavior.
|
|
|
// ie: open the connection in New and use it till Close is called.
|
|
|
PersistConnection bool
|
|
|
- sync.Mutex
|
|
|
}
|
|
|
|
|
|
const (
|
|
@@ -117,8 +117,8 @@ func (b *BoltDB) releaseDBhandle() {
|
|
|
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
|
|
|
// by a atomic counter maintained by the libkv and appened to the value passed by the client.
|
|
|
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
db, err := b.getDBhandle()
|
|
|
if err != nil {
|
|
@@ -154,8 +154,8 @@ func (b *BoltDB) Get(key string) (*store.KVPair, error) {
|
|
|
|
|
|
// Put the key, value pair. index number metadata is prepended to the value
|
|
|
func (b *BoltDB) Put(key string, value []byte) error {
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
db, err := b.getDBhandle()
|
|
|
if err != nil {
|
|
@@ -180,8 +180,8 @@ func (b *BoltDB) Put(key string, value []byte) error {
|
|
|
|
|
|
// Delete the value for the given key.
|
|
|
func (b *BoltDB) Delete(key string) error {
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
db, err := b.getDBhandle()
|
|
|
if err != nil {
|
|
@@ -200,8 +200,8 @@ func (b *BoltDB) Delete(key string) error {
|
|
|
|
|
|
// Exists checks if the key exists inside the store
|
|
|
func (b *BoltDB) Exists(key string) (bool, error) {
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
db, err := b.getDBhandle()
|
|
|
if err != nil {
|
|
@@ -230,8 +230,8 @@ func (b *BoltDB) Exists(key string) (bool, error) {
|
|
|
|
|
|
// List returns the range of keys starting with the passed in prefix
|
|
|
func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
db, err := b.getDBhandle()
|
|
|
if err != nil {
|
|
@@ -276,8 +276,8 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
|
|
|
// has not been modified in the meantime, throws an
|
|
|
// error if this is the case
|
|
|
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
if previous == nil {
|
|
|
return store.ErrPreviousNotSpecified
|
|
@@ -309,8 +309,8 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
|
|
|
// AtomicPut puts a value at "key" if the key has not been
|
|
|
// modified since the last Put, throws an error if this is the case
|
|
|
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*store.KVPair, error) {
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
db, err := b.getDBhandle()
|
|
|
if err != nil {
|
|
@@ -360,8 +360,8 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*s
|
|
|
|
|
|
// Close the db connection to the BoltDB
|
|
|
func (b *BoltDB) Close() {
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
if !b.PersistConnection {
|
|
|
b.reset()
|
|
@@ -376,8 +376,8 @@ func (b *BoltDB) DeleteTree(keyPrefix string) error {
|
|
|
db *bolt.DB
|
|
|
err error
|
|
|
)
|
|
|
- b.Lock()
|
|
|
- defer b.Unlock()
|
|
|
+ b.mu.Lock()
|
|
|
+ defer b.mu.Unlock()
|
|
|
|
|
|
if db, err = b.getDBhandle(); err != nil {
|
|
|
return err
|