|
@@ -205,10 +205,11 @@ func (s *Store) Close() error {
|
|
|
|
|
|
type StorageItem struct {
|
|
type StorageItem struct {
|
|
id string
|
|
id string
|
|
|
|
+ vmu sync.RWMutex
|
|
values map[string]*Value
|
|
values map[string]*Value
|
|
|
|
+ qmu sync.Mutex
|
|
queue []func(*bolt.Bucket) error
|
|
queue []func(*bolt.Bucket) error
|
|
storage *Store
|
|
storage *Store
|
|
- mu sync.RWMutex
|
|
|
|
}
|
|
}
|
|
|
|
|
|
func newStorageItem(id string, b *bolt.Bucket, s *Store) (*StorageItem, error) {
|
|
func newStorageItem(id string, b *bolt.Bucket, s *Store) (*StorageItem, error) {
|
|
@@ -242,10 +243,6 @@ func (s *StorageItem) ID() string {
|
|
return s.id
|
|
return s.id
|
|
}
|
|
}
|
|
|
|
|
|
-func (s *StorageItem) View(fn func(b *bolt.Bucket) error) error {
|
|
|
|
- return s.storage.View(s.id, fn)
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error {
|
|
func (s *StorageItem) Update(fn func(b *bolt.Bucket) error) error {
|
|
return s.storage.Update(s.id, fn)
|
|
return s.storage.Update(s.id, fn)
|
|
}
|
|
}
|
|
@@ -255,17 +252,19 @@ func (s *StorageItem) Metadata() *StorageItem {
|
|
}
|
|
}
|
|
|
|
|
|
func (s *StorageItem) Keys() []string {
|
|
func (s *StorageItem) Keys() []string {
|
|
|
|
+ s.vmu.RLock()
|
|
keys := make([]string, 0, len(s.values))
|
|
keys := make([]string, 0, len(s.values))
|
|
for k := range s.values {
|
|
for k := range s.values {
|
|
keys = append(keys, k)
|
|
keys = append(keys, k)
|
|
}
|
|
}
|
|
|
|
+ s.vmu.RUnlock()
|
|
return keys
|
|
return keys
|
|
}
|
|
}
|
|
|
|
|
|
func (s *StorageItem) Get(k string) *Value {
|
|
func (s *StorageItem) Get(k string) *Value {
|
|
- s.mu.RLock()
|
|
|
|
|
|
+ s.vmu.RLock()
|
|
v := s.values[k]
|
|
v := s.values[k]
|
|
- s.mu.RUnlock()
|
|
|
|
|
|
+ s.vmu.RUnlock()
|
|
return v
|
|
return v
|
|
}
|
|
}
|
|
|
|
|
|
@@ -307,14 +306,14 @@ func (s *StorageItem) SetExternal(k string, dt []byte) error {
|
|
}
|
|
}
|
|
|
|
|
|
func (s *StorageItem) Queue(fn func(b *bolt.Bucket) error) {
|
|
func (s *StorageItem) Queue(fn func(b *bolt.Bucket) error) {
|
|
- s.mu.Lock()
|
|
|
|
- defer s.mu.Unlock()
|
|
|
|
|
|
+ s.qmu.Lock()
|
|
|
|
+ defer s.qmu.Unlock()
|
|
s.queue = append(s.queue, fn)
|
|
s.queue = append(s.queue, fn)
|
|
}
|
|
}
|
|
|
|
|
|
func (s *StorageItem) Commit() error {
|
|
func (s *StorageItem) Commit() error {
|
|
- s.mu.Lock()
|
|
|
|
- defer s.mu.Unlock()
|
|
|
|
|
|
+ s.qmu.Lock()
|
|
|
|
+ defer s.qmu.Unlock()
|
|
return errors.WithStack(s.Update(func(b *bolt.Bucket) error {
|
|
return errors.WithStack(s.Update(func(b *bolt.Bucket) error {
|
|
for _, fn := range s.queue {
|
|
for _, fn := range s.queue {
|
|
if err := fn(b); err != nil {
|
|
if err := fn(b); err != nil {
|
|
@@ -327,15 +326,23 @@ func (s *StorageItem) Commit() error {
|
|
}
|
|
}
|
|
|
|
|
|
func (s *StorageItem) Indexes() (out []string) {
|
|
func (s *StorageItem) Indexes() (out []string) {
|
|
|
|
+ s.vmu.RLock()
|
|
for _, v := range s.values {
|
|
for _, v := range s.values {
|
|
if v.Index != "" {
|
|
if v.Index != "" {
|
|
out = append(out, v.Index)
|
|
out = append(out, v.Index)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ s.vmu.RUnlock()
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
|
|
func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
|
|
|
|
+ s.vmu.Lock()
|
|
|
|
+ defer s.vmu.Unlock()
|
|
|
|
+ return s.setValue(b, key, v)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (s *StorageItem) setValue(b *bolt.Bucket, key string, v *Value) error {
|
|
if v == nil {
|
|
if v == nil {
|
|
if old, ok := s.values[key]; ok {
|
|
if old, ok := s.values[key]; ok {
|
|
if old.Index != "" {
|
|
if old.Index != "" {
|
|
@@ -375,16 +382,16 @@ func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error {
|
|
var ErrSkipSetValue = errors.New("skip setting metadata value")
|
|
var ErrSkipSetValue = errors.New("skip setting metadata value")
|
|
|
|
|
|
func (s *StorageItem) GetAndSetValue(key string, fn func(*Value) (*Value, error)) error {
|
|
func (s *StorageItem) GetAndSetValue(key string, fn func(*Value) (*Value, error)) error {
|
|
- s.mu.Lock()
|
|
|
|
- defer s.mu.Unlock()
|
|
|
|
return s.Update(func(b *bolt.Bucket) error {
|
|
return s.Update(func(b *bolt.Bucket) error {
|
|
|
|
+ s.vmu.Lock()
|
|
|
|
+ defer s.vmu.Unlock()
|
|
v, err := fn(s.values[key])
|
|
v, err := fn(s.values[key])
|
|
if errors.Is(err, ErrSkipSetValue) {
|
|
if errors.Is(err, ErrSkipSetValue) {
|
|
return nil
|
|
return nil
|
|
} else if err != nil {
|
|
} else if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
- return s.SetValue(b, key, v)
|
|
|
|
|
|
+ return s.setValue(b, key, v)
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|