瀏覽代碼

Merge pull request #29191 from anusha-ragunathan/plugin_races

Make v2/Plugin accesses safe.
Anusha Ragunathan 8 年之前
父節點
當前提交
9d898b872e
共有 7 個文件被更改,包括 133 次插入74 次删除
  1. 13 10
      plugin/backend_linux.go
  2. 27 10
      plugin/manager.go
  3. 24 21
      plugin/manager_linux.go
  4. 2 2
      plugin/manager_solaris.go
  5. 2 2
      plugin/manager_windows.go
  6. 1 3
      plugin/store/store.go
  7. 64 26
      plugin/v2/plugin.go

+ 13 - 10
plugin/backend_linux.go

@@ -37,7 +37,11 @@ func (pm *Manager) Disable(name string) error {
 	if err != nil {
 		return err
 	}
-	if err := pm.disable(p); err != nil {
+	pm.mu.RLock()
+	c := pm.cMap[p]
+	pm.mu.RUnlock()
+
+	if err := pm.disable(p, c); err != nil {
 		return err
 	}
 	pm.pluginEventLogger(p.GetID(), name, "disable")
@@ -46,14 +50,13 @@ func (pm *Manager) Disable(name string) error {
 
 // Enable activates a plugin, which implies that they are ready to be used by containers.
 func (pm *Manager) Enable(name string, config *types.PluginEnableConfig) error {
-
 	p, err := pm.pluginStore.GetByName(name)
 	if err != nil {
 		return err
 	}
 
-	p.TimeoutInSecs = config.Timeout
-	if err := pm.enable(p, false); err != nil {
+	c := &controller{timeoutInSecs: config.Timeout}
+	if err := pm.enable(p, c, false); err != nil {
 		return err
 	}
 	pm.pluginEventLogger(p.GetID(), name, "enable")
@@ -267,25 +270,25 @@ func (pm *Manager) Push(name string, metaHeader http.Header, authConfig *types.A
 // Remove deletes plugin's root directory.
 func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error {
 	p, err := pm.pluginStore.GetByName(name)
+	pm.mu.RLock()
+	c := pm.cMap[p]
+	pm.mu.RUnlock()
+
 	if err != nil {
 		return err
 	}
 
 	if !config.ForceRemove {
-		p.RLock()
-		if p.RefCount > 0 {
-			p.RUnlock()
+		if p.GetRefCount() > 0 {
 			return fmt.Errorf("plugin %s is in use", p.Name())
 		}
-		p.RUnlock()
-
 		if p.IsEnabled() {
 			return fmt.Errorf("plugin %s is enabled", p.Name())
 		}
 	}
 
 	if p.IsEnabled() {
-		if err := pm.disable(p); err != nil {
+		if err := pm.disable(p, c); err != nil {
 			logrus.Errorf("failed to disable plugin '%s': %s", p.Name(), err)
 		}
 	}

+ 27 - 10
plugin/manager.go

@@ -19,7 +19,7 @@ var (
 )
 
 func (pm *Manager) restorePlugin(p *v2.Plugin) error {
-	p.RuntimeSourcePath = filepath.Join(pm.runRoot, p.GetID())
+	p.Restore(pm.runRoot)
 	if p.IsEnabled() {
 		return pm.restore(p)
 	}
@@ -37,6 +37,15 @@ type Manager struct {
 	registryService   registry.Service
 	liveRestore       bool
 	pluginEventLogger eventLogger
+	mu                sync.RWMutex // protects cMap
+	cMap              map[*v2.Plugin]*controller
+}
+
+// controller represents the manager's control on a plugin.
+type controller struct {
+	restart       bool
+	exitChan      chan bool
+	timeoutInSecs int
 }
 
 // GetManager returns the singleton plugin Manager
@@ -67,7 +76,8 @@ func Init(root string, ps *store.Store, remote libcontainerd.Remote, rs registry
 	if err != nil {
 		return err
 	}
-	if err := manager.init(); err != nil {
+	manager.cMap = make(map[*v2.Plugin]*controller)
+	if err := manager.reload(); err != nil {
 		return err
 	}
 	return nil
@@ -83,22 +93,27 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
 		if err != nil {
 			return err
 		}
-		p.RLock()
-		if p.ExitChan != nil {
-			close(p.ExitChan)
+
+		pm.mu.RLock()
+		c := pm.cMap[p]
+
+		if c.exitChan != nil {
+			close(c.exitChan)
 		}
-		restart := p.Restart
-		p.RUnlock()
+		restart := c.restart
+		pm.mu.RUnlock()
+
 		p.RemoveFromDisk()
 		if restart {
-			pm.enable(p, true)
+			pm.enable(p, c, true)
 		}
 	}
 
 	return nil
 }
 
-func (pm *Manager) init() error {
+// reload is used on daemon restarts to load the manager's state
+func (pm *Manager) reload() error {
 	dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json"))
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -117,6 +132,8 @@ func (pm *Manager) init() error {
 	var group sync.WaitGroup
 	group.Add(len(plugins))
 	for _, p := range plugins {
+		c := &controller{}
+		pm.cMap[p] = c
 		go func(p *v2.Plugin) {
 			defer group.Done()
 			if err := pm.restorePlugin(p); err != nil {
@@ -129,7 +146,7 @@ func (pm *Manager) init() error {
 
 			if requiresManualRestore {
 				// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
-				if err := pm.enable(p, true); err != nil {
+				if err := pm.enable(p, c, true); err != nil {
 					logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
 				}
 			}

+ 24 - 21
plugin/manager_linux.go

@@ -16,7 +16,7 @@ import (
 	specs "github.com/opencontainers/runtime-spec/specs-go"
 )
 
-func (pm *Manager) enable(p *v2.Plugin, force bool) error {
+func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error {
 	if p.IsEnabled() && !force {
 		return fmt.Errorf("plugin %s is already enabled", p.Name())
 	}
@@ -24,23 +24,26 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error {
 	if err != nil {
 		return err
 	}
-	p.Lock()
-	p.Restart = true
-	p.ExitChan = make(chan bool)
-	p.Unlock()
+
+	c.restart = true
+	c.exitChan = make(chan bool)
+
+	pm.mu.Lock()
+	pm.cMap[p] = c
+	pm.mu.Unlock()
+
 	if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec), attachToLog(p.GetID())); err != nil {
 		return err
 	}
 
-	p.PClient, err = plugins.NewClientWithTimeout("unix://"+filepath.Join(p.RuntimeSourcePath, p.GetSocket()), nil, p.TimeoutInSecs)
+	client, err := plugins.NewClientWithTimeout("unix://"+filepath.Join(p.GetRuntimeSourcePath(), p.GetSocket()), nil, c.timeoutInSecs)
 	if err != nil {
-		p.Lock()
-		p.Restart = false
-		p.Unlock()
-		shutdownPlugin(p, pm.containerdClient)
+		c.restart = false
+		shutdownPlugin(p, c, pm.containerdClient)
 		return err
 	}
 
+	p.SetPClient(client)
 	pm.pluginStore.SetState(p, true)
 	pm.pluginStore.CallHandler(p)
 
@@ -51,7 +54,7 @@ func (pm *Manager) restore(p *v2.Plugin) error {
 	return pm.containerdClient.Restore(p.GetID(), attachToLog(p.GetID()))
 }
 
-func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) {
+func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.Client) {
 	pluginID := p.GetID()
 
 	err := containerdClient.Signal(pluginID, int(syscall.SIGTERM))
@@ -59,7 +62,7 @@ func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) {
 		logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
 	} else {
 		select {
-		case <-p.ExitChan:
+		case <-c.exitChan:
 			logrus.Debug("Clean shutdown of plugin")
 		case <-time.After(time.Second * 10):
 			logrus.Debug("Force shutdown plugin")
@@ -70,15 +73,13 @@ func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) {
 	}
 }
 
-func (pm *Manager) disable(p *v2.Plugin) error {
+func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
 	if !p.IsEnabled() {
 		return fmt.Errorf("plugin %s is already disabled", p.Name())
 	}
-	p.Lock()
-	p.Restart = false
-	p.Unlock()
 
-	shutdownPlugin(p, pm.containerdClient)
+	c.restart = false
+	shutdownPlugin(p, c, pm.containerdClient)
 	pm.pluginStore.SetState(p, false)
 	return nil
 }
@@ -87,15 +88,17 @@ func (pm *Manager) disable(p *v2.Plugin) error {
 func (pm *Manager) Shutdown() {
 	plugins := pm.pluginStore.GetAll()
 	for _, p := range plugins {
+		pm.mu.RLock()
+		c := pm.cMap[p]
+		pm.mu.RUnlock()
+
 		if pm.liveRestore && p.IsEnabled() {
 			logrus.Debug("Plugin active when liveRestore is set, skipping shutdown")
 			continue
 		}
 		if pm.containerdClient != nil && p.IsEnabled() {
-			p.Lock()
-			p.Restart = false
-			p.Unlock()
-			shutdownPlugin(p, pm.containerdClient)
+			c.restart = false
+			shutdownPlugin(p, c, pm.containerdClient)
 		}
 	}
 }

+ 2 - 2
plugin/manager_solaris.go

@@ -7,7 +7,7 @@ import (
 	specs "github.com/opencontainers/runtime-spec/specs-go"
 )
 
-func (pm *Manager) enable(p *v2.Plugin, force bool) error {
+func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error {
 	return fmt.Errorf("Not implemented")
 }
 
@@ -15,7 +15,7 @@ func (pm *Manager) initSpec(p *v2.Plugin) (*specs.Spec, error) {
 	return nil, fmt.Errorf("Not implemented")
 }
 
-func (pm *Manager) disable(p *v2.Plugin) error {
+func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
 	return fmt.Errorf("Not implemented")
 }
 

+ 2 - 2
plugin/manager_windows.go

@@ -9,7 +9,7 @@ import (
 	specs "github.com/opencontainers/runtime-spec/specs-go"
 )
 
-func (pm *Manager) enable(p *v2.Plugin, force bool) error {
+func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error {
 	return fmt.Errorf("Not implemented")
 }
 
@@ -17,7 +17,7 @@ func (pm *Manager) initSpec(p *v2.Plugin) (*specs.Spec, error) {
 	return nil, fmt.Errorf("Not implemented")
 }
 
-func (pm *Manager) disable(p *v2.Plugin) error {
+func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
 	return fmt.Errorf("Not implemented")
 }
 

+ 1 - 3
plugin/store/store.go

@@ -174,9 +174,7 @@ func (ps *Store) Get(name, capability string, mode int) (plugingetter.CompatPlug
 		}
 		p, err = ps.GetByName(fullName)
 		if err == nil {
-			p.Lock()
-			p.RefCount += mode
-			p.Unlock()
+			p.SetRefCount(mode + p.GetRefCount())
 			if p.IsEnabled() {
 				return p.FilterByCap(capability)
 			}

+ 64 - 26
plugin/v2/plugin.go

@@ -17,15 +17,12 @@ import (
 
 // Plugin represents an individual plugin.
 type Plugin struct {
-	sync.RWMutex
-	PluginObj         types.Plugin    `json:"plugin"`
-	PClient           *plugins.Client `json:"-"`
-	RuntimeSourcePath string          `json:"-"`
-	RefCount          int             `json:"-"`
-	Restart           bool            `json:"-"`
-	ExitChan          chan bool       `json:"-"`
-	LibRoot           string          `json:"-"`
-	TimeoutInSecs     int             `json:"-"`
+	mu                sync.RWMutex
+	PluginObj         types.Plugin `json:"plugin"`
+	pClient           *plugins.Client
+	runtimeSourcePath string
+	refCount          int
+	libRoot           string
 }
 
 const defaultPluginRuntimeDestination = "/run/docker/plugins"
@@ -47,14 +44,39 @@ func newPluginObj(name, id, tag string) types.Plugin {
 func NewPlugin(name, id, runRoot, libRoot, tag string) *Plugin {
 	return &Plugin{
 		PluginObj:         newPluginObj(name, id, tag),
-		RuntimeSourcePath: filepath.Join(runRoot, id),
-		LibRoot:           libRoot,
+		runtimeSourcePath: filepath.Join(runRoot, id),
+		libRoot:           libRoot,
 	}
 }
 
+// Restore restores the plugin
+func (p *Plugin) Restore(runRoot string) {
+	p.runtimeSourcePath = filepath.Join(runRoot, p.GetID())
+}
+
+// GetRuntimeSourcePath gets the Source (host) path of the plugin socket
+// This path gets bind mounted into the plugin.
+func (p *Plugin) GetRuntimeSourcePath() string {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	return p.runtimeSourcePath
+}
+
 // Client returns the plugin client.
 func (p *Plugin) Client() *plugins.Client {
-	return p.PClient
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	return p.pClient
+}
+
+// SetPClient set the plugin client.
+func (p *Plugin) SetPClient(client *plugins.Client) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	p.pClient = client
 }
 
 // IsV1 returns true for V1 plugins and false otherwise.
@@ -85,12 +107,12 @@ func (p *Plugin) FilterByCap(capability string) (*Plugin, error) {
 
 // RemoveFromDisk deletes the plugin's runtime files from disk.
 func (p *Plugin) RemoveFromDisk() error {
-	return os.RemoveAll(p.RuntimeSourcePath)
+	return os.RemoveAll(p.runtimeSourcePath)
 }
 
 // InitPlugin populates the plugin object from the plugin config file.
 func (p *Plugin) InitPlugin() error {
-	dt, err := os.Open(filepath.Join(p.LibRoot, p.PluginObj.ID, "config.json"))
+	dt, err := os.Open(filepath.Join(p.libRoot, p.PluginObj.ID, "config.json"))
 	if err != nil {
 		return err
 	}
@@ -118,7 +140,7 @@ func (p *Plugin) InitPlugin() error {
 }
 
 func (p *Plugin) writeSettings() error {
-	f, err := os.Create(filepath.Join(p.LibRoot, p.PluginObj.ID, "plugin-settings.json"))
+	f, err := os.Create(filepath.Join(p.libRoot, p.PluginObj.ID, "plugin-settings.json"))
 	if err != nil {
 		return err
 	}
@@ -129,8 +151,8 @@ func (p *Plugin) writeSettings() error {
 
 // Set is used to pass arguments to the plugin.
 func (p *Plugin) Set(args []string) error {
-	p.Lock()
-	defer p.Unlock()
+	p.mu.Lock()
+	defer p.mu.Unlock()
 
 	if p.PluginObj.Enabled {
 		return fmt.Errorf("cannot set on an active plugin, disable plugin before setting")
@@ -218,36 +240,52 @@ next:
 
 // IsEnabled returns the active state of the plugin.
 func (p *Plugin) IsEnabled() bool {
-	p.RLock()
-	defer p.RUnlock()
+	p.mu.RLock()
+	defer p.mu.RUnlock()
 
 	return p.PluginObj.Enabled
 }
 
 // GetID returns the plugin's ID.
 func (p *Plugin) GetID() string {
-	p.RLock()
-	defer p.RUnlock()
+	p.mu.RLock()
+	defer p.mu.RUnlock()
 
 	return p.PluginObj.ID
 }
 
 // GetSocket returns the plugin socket.
 func (p *Plugin) GetSocket() string {
-	p.RLock()
-	defer p.RUnlock()
+	p.mu.RLock()
+	defer p.mu.RUnlock()
 
 	return p.PluginObj.Config.Interface.Socket
 }
 
 // GetTypes returns the interface types of a plugin.
 func (p *Plugin) GetTypes() []types.PluginInterfaceType {
-	p.RLock()
-	defer p.RUnlock()
+	p.mu.RLock()
+	defer p.mu.RUnlock()
 
 	return p.PluginObj.Config.Interface.Types
 }
 
+// GetRefCount returns the reference count.
+func (p *Plugin) GetRefCount() int {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	return p.refCount
+}
+
+// SetRefCount sets the reference count.
+func (p *Plugin) SetRefCount(count int) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	p.refCount = count
+}
+
 // InitSpec creates an OCI spec from the plugin's config.
 func (p *Plugin) InitSpec(s specs.Spec, libRoot string) (*specs.Spec, error) {
 	rootfs := filepath.Join(libRoot, p.PluginObj.ID, "rootfs")
@@ -262,7 +300,7 @@ func (p *Plugin) InitSpec(s specs.Spec, libRoot string) (*specs.Spec, error) {
 	}
 
 	mounts := append(p.PluginObj.Config.Mounts, types.PluginMount{
-		Source:      &p.RuntimeSourcePath,
+		Source:      &p.runtimeSourcePath,
 		Destination: defaultPluginRuntimeDestination,
 		Type:        "bind",
 		Options:     []string{"rbind", "rshared"},