fix bugs 'fatal error: concurrent map read and map write' to change VolumeStore.globalLock type from Mutex to RWMutex, and add globalLock.RLock() for the read of names, refs, labels and options in VolumeStore

Signed-off-by: He Xin <he_xinworld@126.com>
(cherry picked from commit 19bd1cee23)
Signed-off-by: Victor Vieux <victorvieux@gmail.com>
This commit is contained in:
He Xin 2016-11-16 11:35:39 +08:00 committed by Victor Vieux
parent df8a5e48de
commit 649445a206

View file

@ -103,9 +103,9 @@ func New(rootPath string) (*VolumeStore, error) {
} }
func (s *VolumeStore) getNamed(name string) (volume.Volume, bool) { func (s *VolumeStore) getNamed(name string) (volume.Volume, bool) {
s.globalLock.Lock() s.globalLock.RLock()
v, exists := s.names[name] v, exists := s.names[name]
s.globalLock.Unlock() s.globalLock.RUnlock()
return v, exists return v, exists
} }
@ -121,9 +121,9 @@ func (s *VolumeStore) setNamed(v volume.Volume, ref string) {
// getRefs gets the list of refs for a given name // getRefs gets the list of refs for a given name
// Callers of this function are expected to hold the name lock. // Callers of this function are expected to hold the name lock.
func (s *VolumeStore) getRefs(name string) []string { func (s *VolumeStore) getRefs(name string) []string {
s.globalLock.Lock() s.globalLock.RLock()
refs := s.refs[name] refs := s.refs[name]
s.globalLock.Unlock() s.globalLock.RUnlock()
return refs return refs
} }
@ -140,8 +140,11 @@ func (s *VolumeStore) Purge(name string) {
// VolumeStore is a struct that stores the list of volumes available and keeps track of their usage counts // VolumeStore is a struct that stores the list of volumes available and keeps track of their usage counts
type VolumeStore struct { type VolumeStore struct {
// locks ensures that only one action is being performed on a particular volume at a time without locking the entire store
// since actions on volumes can be quite slow, this ensures the store is free to handle requests for other volumes.
locks *locker.Locker locks *locker.Locker
globalLock sync.Mutex // globalLock is used to protect access to mutable structures used by the store object
globalLock sync.RWMutex
// names stores the volume name -> volume relationship. // names stores the volume name -> volume relationship.
// This is used for making lookups faster so we don't have to probe all drivers // This is used for making lookups faster so we don't have to probe all drivers
names map[string]volume.Volume names map[string]volume.Volume
@ -210,7 +213,9 @@ func (s *VolumeStore) list() ([]volume.Volume, []string, error) {
return return
} }
for i, v := range vs { for i, v := range vs {
s.globalLock.RLock()
vs[i] = volumeWrapper{v, s.labels[v.Name()], d.Scope(), s.options[v.Name()]} vs[i] = volumeWrapper{v, s.labels[v.Name()], d.Scope(), s.options[v.Name()]}
s.globalLock.RUnlock()
} }
chVols <- vols{vols: vs} chVols <- vols{vols: vs}
@ -230,11 +235,13 @@ func (s *VolumeStore) list() ([]volume.Volume, []string, error) {
} }
if len(badDrivers) > 0 { if len(badDrivers) > 0 {
s.globalLock.RLock()
for _, v := range s.names { for _, v := range s.names {
if _, exists := badDrivers[v.DriverName()]; exists { if _, exists := badDrivers[v.DriverName()]; exists {
ls = append(ls, v) ls = append(ls, v)
} }
} }
s.globalLock.RUnlock()
} }
return ls, warnings, nil return ls, warnings, nil
} }
@ -423,6 +430,8 @@ func (s *VolumeStore) GetWithRef(name, driverName, ref string) (volume.Volume, e
s.setNamed(v, ref) s.setNamed(v, ref)
s.globalLock.RLock()
defer s.globalLock.RUnlock()
return volumeWrapper{v, s.labels[name], vd.Scope(), s.options[name]}, nil return volumeWrapper{v, s.labels[name], vd.Scope(), s.options[name]}, nil
} }
@ -473,9 +482,9 @@ func (s *VolumeStore) getVolume(name string) (volume.Volume, error) {
} }
logrus.Debugf("Getting volume reference for name: %s", name) logrus.Debugf("Getting volume reference for name: %s", name)
s.globalLock.Lock() s.globalLock.RLock()
v, exists := s.names[name] v, exists := s.names[name]
s.globalLock.Unlock() s.globalLock.RUnlock()
if exists { if exists {
vd, err := volumedrivers.GetDriver(v.DriverName()) vd, err := volumedrivers.GetDriver(v.DriverName())
if err != nil { if err != nil {
@ -571,10 +580,12 @@ func (s *VolumeStore) FilterByDriver(name string) ([]volume.Volume, error) {
} }
for i, v := range ls { for i, v := range ls {
options := map[string]string{} options := map[string]string{}
s.globalLock.RLock()
for key, value := range s.options[v.Name()] { for key, value := range s.options[v.Name()] {
options[key] = value options[key] = value
} }
ls[i] = volumeWrapper{v, s.labels[v.Name()], vd.Scope(), options} ls[i] = volumeWrapper{v, s.labels[v.Name()], vd.Scope(), options}
s.globalLock.RUnlock()
} }
return ls, nil return ls, nil
} }