Make v2/Plugin accesses safe.

v2/Plugin struct had fields that were
- purely used by the manager.
- unsafely exposed without proper locking.
This change fixes this, by moving relevant fields to the manager as well
as making remaining fields as private and providing proper accessors for
them.

Signed-off-by: Anusha Ragunathan <anusha@docker.com>
(cherry picked from commit b35490a8ba)
Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Anusha Ragunathan 2016-12-01 11:36:56 -08:00 committed by Victor Vieux
parent f081b22a4a
commit 213ee20706
7 changed files with 133 additions and 74 deletions

View file

@ -37,7 +37,11 @@ func (pm *Manager) Disable(name string) error {
if err != nil {
return err
}
if err := pm.disable(p); err != nil {
pm.mu.RLock()
c := pm.cMap[p]
pm.mu.RUnlock()
if err := pm.disable(p, c); err != nil {
return err
}
pm.pluginEventLogger(p.GetID(), name, "disable")
@ -46,14 +50,13 @@ func (pm *Manager) Disable(name string) error {
// Enable activates a plugin, which implies that they are ready to be used by containers.
func (pm *Manager) Enable(name string, config *types.PluginEnableConfig) error {
p, err := pm.pluginStore.GetByName(name)
if err != nil {
return err
}
p.TimeoutInSecs = config.Timeout
if err := pm.enable(p, false); err != nil {
c := &controller{timeoutInSecs: config.Timeout}
if err := pm.enable(p, c, false); err != nil {
return err
}
pm.pluginEventLogger(p.GetID(), name, "enable")
@ -267,25 +270,25 @@ func (pm *Manager) Push(name string, metaHeader http.Header, authConfig *types.A
// Remove deletes plugin's root directory.
func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error {
p, err := pm.pluginStore.GetByName(name)
pm.mu.RLock()
c := pm.cMap[p]
pm.mu.RUnlock()
if err != nil {
return err
}
if !config.ForceRemove {
p.RLock()
if p.RefCount > 0 {
p.RUnlock()
if p.GetRefCount() > 0 {
return fmt.Errorf("plugin %s is in use", p.Name())
}
p.RUnlock()
if p.IsEnabled() {
return fmt.Errorf("plugin %s is enabled", p.Name())
}
}
if p.IsEnabled() {
if err := pm.disable(p); err != nil {
if err := pm.disable(p, c); err != nil {
logrus.Errorf("failed to disable plugin '%s': %s", p.Name(), err)
}
}

View file

@ -19,7 +19,7 @@ var (
)
func (pm *Manager) restorePlugin(p *v2.Plugin) error {
p.RuntimeSourcePath = filepath.Join(pm.runRoot, p.GetID())
p.Restore(pm.runRoot)
if p.IsEnabled() {
return pm.restore(p)
}
@ -37,6 +37,15 @@ type Manager struct {
registryService registry.Service
liveRestore bool
pluginEventLogger eventLogger
mu sync.RWMutex // protects cMap
cMap map[*v2.Plugin]*controller
}
// controller represents the manager's control on a plugin.
type controller struct {
restart bool
exitChan chan bool
timeoutInSecs int
}
// GetManager returns the singleton plugin Manager
@ -67,7 +76,8 @@ func Init(root string, ps *store.Store, remote libcontainerd.Remote, rs registry
if err != nil {
return err
}
if err := manager.init(); err != nil {
manager.cMap = make(map[*v2.Plugin]*controller)
if err := manager.reload(); err != nil {
return err
}
return nil
@ -83,22 +93,27 @@ func (pm *Manager) StateChanged(id string, e libcontainerd.StateInfo) error {
if err != nil {
return err
}
p.RLock()
if p.ExitChan != nil {
close(p.ExitChan)
pm.mu.RLock()
c := pm.cMap[p]
if c.exitChan != nil {
close(c.exitChan)
}
restart := p.Restart
p.RUnlock()
restart := c.restart
pm.mu.RUnlock()
p.RemoveFromDisk()
if restart {
pm.enable(p, true)
pm.enable(p, c, true)
}
}
return nil
}
func (pm *Manager) init() error {
// reload is used on daemon restarts to load the manager's state
func (pm *Manager) reload() error {
dt, err := os.Open(filepath.Join(pm.libRoot, "plugins.json"))
if err != nil {
if os.IsNotExist(err) {
@ -117,6 +132,8 @@ func (pm *Manager) init() error {
var group sync.WaitGroup
group.Add(len(plugins))
for _, p := range plugins {
c := &controller{}
pm.cMap[p] = c
go func(p *v2.Plugin) {
defer group.Done()
if err := pm.restorePlugin(p); err != nil {
@ -129,7 +146,7 @@ func (pm *Manager) init() error {
if requiresManualRestore {
// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
if err := pm.enable(p, true); err != nil {
if err := pm.enable(p, c, true); err != nil {
logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
}
}

View file

@ -16,7 +16,7 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"
)
func (pm *Manager) enable(p *v2.Plugin, force bool) error {
func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error {
if p.IsEnabled() && !force {
return fmt.Errorf("plugin %s is already enabled", p.Name())
}
@ -24,23 +24,26 @@ func (pm *Manager) enable(p *v2.Plugin, force bool) error {
if err != nil {
return err
}
p.Lock()
p.Restart = true
p.ExitChan = make(chan bool)
p.Unlock()
c.restart = true
c.exitChan = make(chan bool)
pm.mu.Lock()
pm.cMap[p] = c
pm.mu.Unlock()
if err := pm.containerdClient.Create(p.GetID(), "", "", specs.Spec(*spec), attachToLog(p.GetID())); err != nil {
return err
}
p.PClient, err = plugins.NewClientWithTimeout("unix://"+filepath.Join(p.RuntimeSourcePath, p.GetSocket()), nil, p.TimeoutInSecs)
client, err := plugins.NewClientWithTimeout("unix://"+filepath.Join(p.GetRuntimeSourcePath(), p.GetSocket()), nil, c.timeoutInSecs)
if err != nil {
p.Lock()
p.Restart = false
p.Unlock()
shutdownPlugin(p, pm.containerdClient)
c.restart = false
shutdownPlugin(p, c, pm.containerdClient)
return err
}
p.SetPClient(client)
pm.pluginStore.SetState(p, true)
pm.pluginStore.CallHandler(p)
@ -51,7 +54,7 @@ func (pm *Manager) restore(p *v2.Plugin) error {
return pm.containerdClient.Restore(p.GetID(), attachToLog(p.GetID()))
}
func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) {
func shutdownPlugin(p *v2.Plugin, c *controller, containerdClient libcontainerd.Client) {
pluginID := p.GetID()
err := containerdClient.Signal(pluginID, int(syscall.SIGTERM))
@ -59,7 +62,7 @@ func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) {
logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
} else {
select {
case <-p.ExitChan:
case <-c.exitChan:
logrus.Debug("Clean shutdown of plugin")
case <-time.After(time.Second * 10):
logrus.Debug("Force shutdown plugin")
@ -70,15 +73,13 @@ func shutdownPlugin(p *v2.Plugin, containerdClient libcontainerd.Client) {
}
}
func (pm *Manager) disable(p *v2.Plugin) error {
func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
if !p.IsEnabled() {
return fmt.Errorf("plugin %s is already disabled", p.Name())
}
p.Lock()
p.Restart = false
p.Unlock()
shutdownPlugin(p, pm.containerdClient)
c.restart = false
shutdownPlugin(p, c, pm.containerdClient)
pm.pluginStore.SetState(p, false)
return nil
}
@ -87,15 +88,17 @@ func (pm *Manager) disable(p *v2.Plugin) error {
func (pm *Manager) Shutdown() {
plugins := pm.pluginStore.GetAll()
for _, p := range plugins {
pm.mu.RLock()
c := pm.cMap[p]
pm.mu.RUnlock()
if pm.liveRestore && p.IsEnabled() {
logrus.Debug("Plugin active when liveRestore is set, skipping shutdown")
continue
}
if pm.containerdClient != nil && p.IsEnabled() {
p.Lock()
p.Restart = false
p.Unlock()
shutdownPlugin(p, pm.containerdClient)
c.restart = false
shutdownPlugin(p, c, pm.containerdClient)
}
}
}

View file

@ -7,7 +7,7 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"
)
func (pm *Manager) enable(p *v2.Plugin, force bool) error {
func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error {
return fmt.Errorf("Not implemented")
}
@ -15,7 +15,7 @@ func (pm *Manager) initSpec(p *v2.Plugin) (*specs.Spec, error) {
return nil, fmt.Errorf("Not implemented")
}
func (pm *Manager) disable(p *v2.Plugin) error {
func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
return fmt.Errorf("Not implemented")
}

View file

@ -9,7 +9,7 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"
)
func (pm *Manager) enable(p *v2.Plugin, force bool) error {
func (pm *Manager) enable(p *v2.Plugin, c *controller, force bool) error {
return fmt.Errorf("Not implemented")
}
@ -17,7 +17,7 @@ func (pm *Manager) initSpec(p *v2.Plugin) (*specs.Spec, error) {
return nil, fmt.Errorf("Not implemented")
}
func (pm *Manager) disable(p *v2.Plugin) error {
func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
return fmt.Errorf("Not implemented")
}

View file

@ -174,9 +174,7 @@ func (ps *Store) Get(name, capability string, mode int) (plugingetter.CompatPlug
}
p, err = ps.GetByName(fullName)
if err == nil {
p.Lock()
p.RefCount += mode
p.Unlock()
p.SetRefCount(mode + p.GetRefCount())
if p.IsEnabled() {
return p.FilterByCap(capability)
}

View file

@ -17,15 +17,12 @@ import (
// Plugin represents an individual plugin.
type Plugin struct {
sync.RWMutex
PluginObj types.Plugin `json:"plugin"`
PClient *plugins.Client `json:"-"`
RuntimeSourcePath string `json:"-"`
RefCount int `json:"-"`
Restart bool `json:"-"`
ExitChan chan bool `json:"-"`
LibRoot string `json:"-"`
TimeoutInSecs int `json:"-"`
mu sync.RWMutex
PluginObj types.Plugin `json:"plugin"`
pClient *plugins.Client
runtimeSourcePath string
refCount int
libRoot string
}
const defaultPluginRuntimeDestination = "/run/docker/plugins"
@ -47,14 +44,39 @@ func newPluginObj(name, id, tag string) types.Plugin {
func NewPlugin(name, id, runRoot, libRoot, tag string) *Plugin {
return &Plugin{
PluginObj: newPluginObj(name, id, tag),
RuntimeSourcePath: filepath.Join(runRoot, id),
LibRoot: libRoot,
runtimeSourcePath: filepath.Join(runRoot, id),
libRoot: libRoot,
}
}
// Restore restores the plugin
func (p *Plugin) Restore(runRoot string) {
p.runtimeSourcePath = filepath.Join(runRoot, p.GetID())
}
// GetRuntimeSourcePath gets the Source (host) path of the plugin socket
// This path gets bind mounted into the plugin.
func (p *Plugin) GetRuntimeSourcePath() string {
p.mu.RLock()
defer p.mu.RUnlock()
return p.runtimeSourcePath
}
// Client returns the plugin client.
func (p *Plugin) Client() *plugins.Client {
return p.PClient
p.mu.RLock()
defer p.mu.RUnlock()
return p.pClient
}
// SetPClient set the plugin client.
func (p *Plugin) SetPClient(client *plugins.Client) {
p.mu.Lock()
defer p.mu.Unlock()
p.pClient = client
}
// IsV1 returns true for V1 plugins and false otherwise.
@ -85,12 +107,12 @@ func (p *Plugin) FilterByCap(capability string) (*Plugin, error) {
// RemoveFromDisk deletes the plugin's runtime files from disk.
func (p *Plugin) RemoveFromDisk() error {
return os.RemoveAll(p.RuntimeSourcePath)
return os.RemoveAll(p.runtimeSourcePath)
}
// InitPlugin populates the plugin object from the plugin config file.
func (p *Plugin) InitPlugin() error {
dt, err := os.Open(filepath.Join(p.LibRoot, p.PluginObj.ID, "config.json"))
dt, err := os.Open(filepath.Join(p.libRoot, p.PluginObj.ID, "config.json"))
if err != nil {
return err
}
@ -118,7 +140,7 @@ func (p *Plugin) InitPlugin() error {
}
func (p *Plugin) writeSettings() error {
f, err := os.Create(filepath.Join(p.LibRoot, p.PluginObj.ID, "plugin-settings.json"))
f, err := os.Create(filepath.Join(p.libRoot, p.PluginObj.ID, "plugin-settings.json"))
if err != nil {
return err
}
@ -129,8 +151,8 @@ func (p *Plugin) writeSettings() error {
// Set is used to pass arguments to the plugin.
func (p *Plugin) Set(args []string) error {
p.Lock()
defer p.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
if p.PluginObj.Enabled {
return fmt.Errorf("cannot set on an active plugin, disable plugin before setting")
@ -218,36 +240,52 @@ next:
// IsEnabled returns the active state of the plugin.
func (p *Plugin) IsEnabled() bool {
p.RLock()
defer p.RUnlock()
p.mu.RLock()
defer p.mu.RUnlock()
return p.PluginObj.Enabled
}
// GetID returns the plugin's ID.
func (p *Plugin) GetID() string {
p.RLock()
defer p.RUnlock()
p.mu.RLock()
defer p.mu.RUnlock()
return p.PluginObj.ID
}
// GetSocket returns the plugin socket.
func (p *Plugin) GetSocket() string {
p.RLock()
defer p.RUnlock()
p.mu.RLock()
defer p.mu.RUnlock()
return p.PluginObj.Config.Interface.Socket
}
// GetTypes returns the interface types of a plugin.
func (p *Plugin) GetTypes() []types.PluginInterfaceType {
p.RLock()
defer p.RUnlock()
p.mu.RLock()
defer p.mu.RUnlock()
return p.PluginObj.Config.Interface.Types
}
// GetRefCount returns the reference count.
func (p *Plugin) GetRefCount() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.refCount
}
// SetRefCount sets the reference count.
func (p *Plugin) SetRefCount(count int) {
p.mu.Lock()
defer p.mu.Unlock()
p.refCount = count
}
// InitSpec creates an OCI spec from the plugin's config.
func (p *Plugin) InitSpec(s specs.Spec, libRoot string) (*specs.Spec, error) {
rootfs := filepath.Join(libRoot, p.PluginObj.ID, "rootfs")
@ -262,7 +300,7 @@ func (p *Plugin) InitSpec(s specs.Spec, libRoot string) (*specs.Spec, error) {
}
mounts := append(p.PluginObj.Config.Mounts, types.PluginMount{
Source: &p.RuntimeSourcePath,
Source: &p.runtimeSourcePath,
Destination: defaultPluginRuntimeDestination,
Type: "bind",
Options: []string{"rbind", "rshared"},