Merge pull request #47302 from akerouanton/libnet-ds-PersistConnection
libnet: boltdb: remove PersistConnection
This commit is contained in:
commit
701dd989f1
4 changed files with 15 additions and 98 deletions
libnetwork
|
@ -92,7 +92,6 @@ func DefaultScope(dataDir string) ScopeCfg {
|
|||
Config: &store.Config{
|
||||
Bucket: "libnetwork",
|
||||
ConnectionTimeout: time.Minute,
|
||||
PersistConnection: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -29,12 +29,6 @@ type BoltDB struct {
|
|||
dbIndex uint64
|
||||
path string
|
||||
timeout time.Duration
|
||||
// By default libkv opens and closes the bolt DB connection for every
|
||||
// get/put operation. This allows multiple apps to use a Bolt DB at the
|
||||
// same time.
|
||||
// PersistConnection flag provides an option to override ths behavior.
|
||||
// ie: open the connection in New and use it till Close is called.
|
||||
PersistConnection bool
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -53,15 +47,11 @@ func New(endpoint string, options *store.Config) (store.Store, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var db *bolt.DB
|
||||
if options.PersistConnection {
|
||||
var err error
|
||||
db, err = bolt.Open(endpoint, filePerm, &bolt.Options{
|
||||
Timeout: options.ConnectionTimeout,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
db, err := bolt.Open(endpoint, filePerm, &bolt.Options{
|
||||
Timeout: options.ConnectionTimeout,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
timeout := transientTimeout
|
||||
|
@ -70,50 +60,21 @@ func New(endpoint string, options *store.Config) (store.Store, error) {
|
|||
}
|
||||
|
||||
b := &BoltDB{
|
||||
client: db,
|
||||
path: endpoint,
|
||||
boltBucket: []byte(options.Bucket),
|
||||
timeout: timeout,
|
||||
PersistConnection: options.PersistConnection,
|
||||
client: db,
|
||||
path: endpoint,
|
||||
boltBucket: []byte(options.Bucket),
|
||||
timeout: timeout,
|
||||
}
|
||||
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (b *BoltDB) reset() {
|
||||
b.path = ""
|
||||
b.boltBucket = []byte{}
|
||||
}
|
||||
|
||||
func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
|
||||
if !b.PersistConnection {
|
||||
db, err := bolt.Open(b.path, filePerm, &bolt.Options{Timeout: b.timeout})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.client = db
|
||||
}
|
||||
return b.client, nil
|
||||
}
|
||||
|
||||
func (b *BoltDB) releaseDBhandle() {
|
||||
if !b.PersistConnection {
|
||||
b.client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Put the key, value pair. index number metadata is prepended to the value
|
||||
func (b *BoltDB) Put(key string, value []byte) error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
db, err := b.getDBhandle()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.releaseDBhandle()
|
||||
|
||||
return db.Update(func(tx *bolt.Tx) error {
|
||||
return b.client.Update(func(tx *bolt.Tx) error {
|
||||
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -133,14 +94,8 @@ func (b *BoltDB) Exists(key string) (bool, error) {
|
|||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
db, err := b.getDBhandle()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer b.releaseDBhandle()
|
||||
|
||||
var exists bool
|
||||
err = db.View(func(tx *bolt.Tx) error {
|
||||
err := b.client.View(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(b.boltBucket)
|
||||
if bucket == nil {
|
||||
return store.ErrKeyNotFound
|
||||
|
@ -163,14 +118,8 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
|
|||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
db, err := b.getDBhandle()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.releaseDBhandle()
|
||||
|
||||
var kv []*store.KVPair
|
||||
err = db.View(func(tx *bolt.Tx) error {
|
||||
err := b.client.View(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(b.boltBucket)
|
||||
if bucket == nil {
|
||||
return store.ErrKeyNotFound
|
||||
|
@ -212,13 +161,8 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
|
|||
if previous == nil {
|
||||
return store.ErrPreviousNotSpecified
|
||||
}
|
||||
db, err := b.getDBhandle()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.releaseDBhandle()
|
||||
|
||||
return db.Update(func(tx *bolt.Tx) error {
|
||||
return b.client.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(b.boltBucket)
|
||||
if bucket == nil {
|
||||
return store.ErrKeyNotFound
|
||||
|
@ -242,15 +186,9 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*s
|
|||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
db, err := b.getDBhandle()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer b.releaseDBhandle()
|
||||
|
||||
var dbIndex uint64
|
||||
dbval := make([]byte, libkvmetadatalen)
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
err := b.client.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(b.boltBucket)
|
||||
if bucket == nil {
|
||||
if previous != nil {
|
||||
|
@ -293,9 +231,5 @@ func (b *BoltDB) Close() {
|
|||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if !b.PersistConnection {
|
||||
b.reset()
|
||||
} else {
|
||||
b.client.Close()
|
||||
}
|
||||
b.client.Close()
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ var (
|
|||
type Config struct {
|
||||
ConnectionTimeout time.Duration
|
||||
Bucket string
|
||||
PersistConnection bool
|
||||
}
|
||||
|
||||
// Store represents the backend K/V storage
|
||||
|
|
|
@ -80,18 +80,3 @@ func OptionBoltdbWithRandomDBFile(t *testing.T) config.Option {
|
|||
c.Scope.Client.Config = &store.Config{Bucket: "testBackend"}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMultipleControllersWithSameStore(t *testing.T) {
|
||||
cfgOptions := OptionBoltdbWithRandomDBFile(t)
|
||||
ctrl1, err := New(cfgOptions)
|
||||
if err != nil {
|
||||
t.Fatalf("Error new controller: %v", err)
|
||||
}
|
||||
defer ctrl1.Stop()
|
||||
// Use the same boltdb file without closing the previous controller
|
||||
ctrl2, err := New(cfgOptions)
|
||||
if err != nil {
|
||||
t.Fatalf("Local store must support concurrent controllers")
|
||||
}
|
||||
ctrl2.Stop()
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue