Integrate github.com/docker/libkv

A reduced set of the dependency, only taking the parts that are used. Taken from
upstream commit: dfacc563de

    # install filter-repo (https://github.com/newren/git-filter-repo/blob/main/INSTALL.md)
    brew install git-filter-repo

    cd ~/projects

    # create a temporary clone of docker
    git clone https://github.com/docker/libkv.git temp_libkv
    cd temp_libkv

    # create branch to work with
    git checkout -b migrate_libkv

    # remove all code, except for the files we need; rename the remaining ones to their new target location
    git filter-repo --force \
        --path libkv.go \
        --path store/store.go \
        --path store/boltdb/boltdb.go \
        --path-rename libkv.go:libnetwork/internal/kvstore/kvstore_manage.go \
        --path-rename store/store.go:libnetwork/internal/kvstore/kvstore.go \
        --path-rename store/boltdb/:libnetwork/internal/kvstore/boltdb/

    # go to the target github.com/moby/moby repository
    cd ~/projects/docker

    # create a branch to work with
    git checkout -b integrate_libkv

    # add the temporary repository as an upstream and make sure it's up-to-date
    git remote add temp_libkv ~/projects/temp_libkv
    git fetch temp_libkv

    # merge the upstream code, rewriting "pkg/symlink" to "symlink"
    git merge --allow-unrelated-histories --signoff -S temp_libkv/migrate_libkv

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2023-06-26 20:47:08 +02:00
commit 3887475971
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
3 changed files with 646 additions and 0 deletions

View file

@ -0,0 +1,474 @@
package boltdb
import (
"bytes"
"encoding/binary"
"errors"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
bolt "go.etcd.io/bbolt"
)
var (
// ErrMultipleEndpointsUnsupported is thrown when multiple endpoints specified for
// BoltDB. Endpoint has to be a local file path
ErrMultipleEndpointsUnsupported = errors.New("boltdb supports one endpoint and should be a file path")
// ErrBoltBucketOptionMissing is thrown when boltBcuket config option is missing
ErrBoltBucketOptionMissing = errors.New("boltBucket config option missing")
)
const (
filePerm os.FileMode = 0644
)
//BoltDB type implements the Store interface
type BoltDB struct {
client *bolt.DB
boltBucket []byte
dbIndex uint64
path string
timeout time.Duration
// By default libkv opens and closes the bolt DB connection for every
// get/put operation. This allows multiple apps to use a Bolt DB at the
// same time.
// PersistConnection flag provides an option to override ths behavior.
// ie: open the connection in New and use it till Close is called.
PersistConnection bool
sync.Mutex
}
const (
libkvmetadatalen = 8
transientTimeout = time.Duration(10) * time.Second
)
// Register registers boltdb to libkv
func Register() {
libkv.AddStore(store.BOLTDB, New)
}
// New opens a new BoltDB connection to the specified path and bucket
func New(endpoints []string, options *store.Config) (store.Store, error) {
var (
db *bolt.DB
err error
boltOptions *bolt.Options
timeout = transientTimeout
)
if len(endpoints) > 1 {
return nil, ErrMultipleEndpointsUnsupported
}
if (options == nil) || (len(options.Bucket) == 0) {
return nil, ErrBoltBucketOptionMissing
}
dir, _ := filepath.Split(endpoints[0])
if err = os.MkdirAll(dir, 0750); err != nil {
return nil, err
}
if options.PersistConnection {
boltOptions = &bolt.Options{Timeout: options.ConnectionTimeout}
db, err = bolt.Open(endpoints[0], filePerm, boltOptions)
if err != nil {
return nil, err
}
}
if options.ConnectionTimeout != 0 {
timeout = options.ConnectionTimeout
}
b := &BoltDB{
client: db,
path: endpoints[0],
boltBucket: []byte(options.Bucket),
timeout: timeout,
PersistConnection: options.PersistConnection,
}
return b, nil
}
func (b *BoltDB) reset() {
b.path = ""
b.boltBucket = []byte{}
}
func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
var (
db *bolt.DB
err error
)
if !b.PersistConnection {
boltOptions := &bolt.Options{Timeout: b.timeout}
if db, err = bolt.Open(b.path, filePerm, boltOptions); err != nil {
return nil, err
}
b.client = db
}
return b.client, nil
}
func (b *BoltDB) releaseDBhandle() {
if !b.PersistConnection {
b.client.Close()
}
}
// Get the value at "key". BoltDB doesn't provide an inbuilt last modified index with every kv pair. Its implemented by
// by a atomic counter maintained by the libkv and appened to the value passed by the client.
func (b *BoltDB) Get(key string) (*store.KVPair, error) {
var (
val []byte
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
if db, err = b.getDBhandle(); err != nil {
return nil, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
v := bucket.Get([]byte(key))
val = make([]byte, len(v))
copy(val, v)
return nil
})
if len(val) == 0 {
return nil, store.ErrKeyNotFound
}
if err != nil {
return nil, err
}
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
val = val[libkvmetadatalen:]
return &store.KVPair{Key: key, Value: val, LastIndex: (dbIndex)}, nil
}
//Put the key, value pair. index number metadata is prepended to the value
func (b *BoltDB) Put(key string, value []byte, opts *store.WriteOptions) error {
var (
dbIndex uint64
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
dbval := make([]byte, libkvmetadatalen)
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
if err != nil {
return err
}
dbIndex = atomic.AddUint64(&b.dbIndex, 1)
binary.LittleEndian.PutUint64(dbval, dbIndex)
dbval = append(dbval, value...)
err = bucket.Put([]byte(key), dbval)
if err != nil {
return err
}
return nil
})
return err
}
//Delete the value for the given key.
func (b *BoltDB) Delete(key string) error {
var (
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
err := bucket.Delete([]byte(key))
return err
})
return err
}
// Exists checks if the key exists inside the store
func (b *BoltDB) Exists(key string) (bool, error) {
var (
val []byte
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
if db, err = b.getDBhandle(); err != nil {
return false, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
val = bucket.Get([]byte(key))
return nil
})
if len(val) == 0 {
return false, err
}
return true, err
}
// List returns the range of keys starting with the passed in prefix
func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
var (
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
kv := []*store.KVPair{}
if db, err = b.getDBhandle(); err != nil {
return nil, err
}
defer b.releaseDBhandle()
err = db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
cursor := bucket.Cursor()
prefix := []byte(keyPrefix)
for key, v := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, v = cursor.Next() {
dbIndex := binary.LittleEndian.Uint64(v[:libkvmetadatalen])
v = v[libkvmetadatalen:]
val := make([]byte, len(v))
copy(val, v)
kv = append(kv, &store.KVPair{
Key: string(key),
Value: val,
LastIndex: dbIndex,
})
}
return nil
})
if len(kv) == 0 {
return nil, store.ErrKeyNotFound
}
return kv, err
}
// AtomicDelete deletes a value at "key" if the key
// has not been modified in the meantime, throws an
// error if this is the case
func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) (bool, error) {
var (
val []byte
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
if previous == nil {
return false, store.ErrPreviousNotSpecified
}
if db, err = b.getDBhandle(); err != nil {
return false, err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
val = bucket.Get([]byte(key))
if val == nil {
return store.ErrKeyNotFound
}
dbIndex := binary.LittleEndian.Uint64(val[:libkvmetadatalen])
if dbIndex != previous.LastIndex {
return store.ErrKeyModified
}
err := bucket.Delete([]byte(key))
return err
})
if err != nil {
return false, err
}
return true, err
}
// AtomicPut puts a value at "key" if the key has not been
// modified since the last Put, throws an error if this is the case
func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
var (
val []byte
dbIndex uint64
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
dbval := make([]byte, libkvmetadatalen)
if db, err = b.getDBhandle(); err != nil {
return false, nil, err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
var err error
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
if previous != nil {
return store.ErrKeyNotFound
}
bucket, err = tx.CreateBucket(b.boltBucket)
if err != nil {
return err
}
}
// AtomicPut is equivalent to Put if previous is nil and the Ky
// doesn't exist in the DB.
val = bucket.Get([]byte(key))
if previous == nil && len(val) != 0 {
return store.ErrKeyExists
}
if previous != nil {
if len(val) == 0 {
return store.ErrKeyNotFound
}
dbIndex = binary.LittleEndian.Uint64(val[:libkvmetadatalen])
if dbIndex != previous.LastIndex {
return store.ErrKeyModified
}
}
dbIndex = atomic.AddUint64(&b.dbIndex, 1)
binary.LittleEndian.PutUint64(dbval, b.dbIndex)
dbval = append(dbval, value...)
return (bucket.Put([]byte(key), dbval))
})
if err != nil {
return false, nil, err
}
updated := &store.KVPair{
Key: key,
Value: value,
LastIndex: dbIndex,
}
return true, updated, nil
}
// Close the db connection to the BoltDB
func (b *BoltDB) Close() {
b.Lock()
defer b.Unlock()
if !b.PersistConnection {
b.reset()
} else {
b.client.Close()
}
return
}
// DeleteTree deletes a range of keys with a given prefix
func (b *BoltDB) DeleteTree(keyPrefix string) error {
var (
db *bolt.DB
err error
)
b.Lock()
defer b.Unlock()
if db, err = b.getDBhandle(); err != nil {
return err
}
defer b.releaseDBhandle()
err = db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
}
cursor := bucket.Cursor()
prefix := []byte(keyPrefix)
for key, _ := cursor.Seek(prefix); bytes.HasPrefix(key, prefix); key, _ = cursor.Next() {
_ = bucket.Delete([]byte(key))
}
return nil
})
return err
}
// NewLock has to implemented at the library level since its not supported by BoltDB
func (b *BoltDB) NewLock(key string, options *store.LockOptions) (store.Locker, error) {
return nil, store.ErrCallNotSupported
}
// Watch has to implemented at the library level since its not supported by BoltDB
func (b *BoltDB) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) {
return nil, store.ErrCallNotSupported
}
// WatchTree has to implemented at the library level since its not supported by BoltDB
func (b *BoltDB) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) {
return nil, store.ErrCallNotSupported
}

View file

@ -0,0 +1,132 @@
package store
import (
"crypto/tls"
"errors"
"time"
)
// Backend represents a KV Store Backend
type Backend string
const (
// CONSUL backend
CONSUL Backend = "consul"
// ETCD backend
ETCD Backend = "etcd"
// ZK backend
ZK Backend = "zk"
// BOLTDB backend
BOLTDB Backend = "boltdb"
)
var (
// ErrBackendNotSupported is thrown when the backend k/v store is not supported by libkv
ErrBackendNotSupported = errors.New("Backend storage not supported yet, please choose one of")
// ErrCallNotSupported is thrown when a method is not implemented/supported by the current backend
ErrCallNotSupported = errors.New("The current call is not supported with this backend")
// ErrNotReachable is thrown when the API cannot be reached for issuing common store operations
ErrNotReachable = errors.New("Api not reachable")
// ErrCannotLock is thrown when there is an error acquiring a lock on a key
ErrCannotLock = errors.New("Error acquiring the lock")
// ErrKeyModified is thrown during an atomic operation if the index does not match the one in the store
ErrKeyModified = errors.New("Unable to complete atomic operation, key modified")
// ErrKeyNotFound is thrown when the key is not found in the store during a Get operation
ErrKeyNotFound = errors.New("Key not found in store")
// ErrPreviousNotSpecified is thrown when the previous value is not specified for an atomic operation
ErrPreviousNotSpecified = errors.New("Previous K/V pair should be provided for the Atomic operation")
// ErrKeyExists is thrown when the previous value exists in the case of an AtomicPut
ErrKeyExists = errors.New("Previous K/V pair exists, cannot complete Atomic operation")
)
// Config contains the options for a storage client
type Config struct {
ClientTLS *ClientTLSConfig
TLS *tls.Config
ConnectionTimeout time.Duration
Bucket string
PersistConnection bool
Username string
Password string
}
// ClientTLSConfig contains data for a Client TLS configuration in the form
// the etcd client wants it. Eventually we'll adapt it for ZK and Consul.
type ClientTLSConfig struct {
CertFile string
KeyFile string
CACertFile string
}
// Store represents the backend K/V storage
// Each store should support every call listed
// here. Or it couldn't be implemented as a K/V
// backend for libkv
type Store interface {
// Put a value at the specified key
Put(key string, value []byte, options *WriteOptions) error
// Get a value given its key
Get(key string) (*KVPair, error)
// Delete the value at the specified key
Delete(key string) error
// Verify if a Key exists in the store
Exists(key string) (bool, error)
// Watch for changes on a key
Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error)
// WatchTree watches for changes on child nodes under
// a given directory
WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error)
// NewLock creates a lock for a given key.
// The returned Locker is not held and must be acquired
// with `.Lock`. The Value is optional.
NewLock(key string, options *LockOptions) (Locker, error)
// List the content of a given prefix
List(directory string) ([]*KVPair, error)
// DeleteTree deletes a range of keys under a given directory
DeleteTree(directory string) error
// Atomic CAS operation on a single value.
// Pass previous = nil to create a new key.
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
// Atomic delete of a single value
AtomicDelete(key string, previous *KVPair) (bool, error)
// Close the store connection
Close()
}
// KVPair represents {Key, Value, Lastindex} tuple
type KVPair struct {
Key string
Value []byte
LastIndex uint64
}
// WriteOptions contains optional request parameters
type WriteOptions struct {
IsDir bool
TTL time.Duration
}
// LockOptions contains optional request parameters
type LockOptions struct {
Value []byte // Optional, value to associate with the lock
TTL time.Duration // Optional, expiration ttl associated with the lock
RenewLock chan struct{} // Optional, chan used to control and stop the session ttl renewal for the lock
}
// Locker provides locking mechanism on top of the store.
// Similar to `sync.Lock` except it may return errors.
type Locker interface {
Lock(stopChan chan struct{}) (<-chan struct{}, error)
Unlock() error
}

View file

@ -0,0 +1,40 @@
package libkv
import (
"fmt"
"sort"
"strings"
"github.com/docker/libkv/store"
)
// Initialize creates a new Store object, initializing the client
type Initialize func(addrs []string, options *store.Config) (store.Store, error)
var (
// Backend initializers
initializers = make(map[store.Backend]Initialize)
supportedBackend = func() string {
keys := make([]string, 0, len(initializers))
for k := range initializers {
keys = append(keys, string(k))
}
sort.Strings(keys)
return strings.Join(keys, ", ")
}()
)
// NewStore creates an instance of store
func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) {
if init, exists := initializers[backend]; exists {
return init(addrs, options)
}
return nil, fmt.Errorf("%s %s", store.ErrBackendNotSupported.Error(), supportedBackend)
}
// AddStore adds a new store backend to libkv
func AddStore(store store.Backend, init Initialize) {
initializers[store] = init
}