Merge pull request #23580 from cpuguy83/plugin_restore

Fix removing plugins and Implement plugin restore after daemon restart
This commit is contained in:
Tibor Vass 2016-06-16 18:54:57 -07:00 committed by GitHub
commit f522ed9752
5 changed files with 94 additions and 50 deletions

View file

@ -10,5 +10,5 @@ import (
) )
func pluginInit(config *daemon.Config, remote libcontainerd.Remote, rs registry.Service) error { func pluginInit(config *daemon.Config, remote libcontainerd.Remote, rs registry.Service) error {
return plugin.Init(config.Root, config.ExecRoot, remote, rs) return plugin.Init(config.Root, config.ExecRoot, remote, rs, config.LiveRestore)
} }

View file

@ -40,7 +40,7 @@ func (pm *Manager) Inspect(name string) (tp types.Plugin, err error) {
if err != nil { if err != nil {
return tp, err return tp, err
} }
return p.p, nil return p.P, nil
} }
// Pull pulls a plugin and enables it. // Pull pulls a plugin and enables it.
@ -76,10 +76,6 @@ func (pm *Manager) Pull(name string, metaHeader http.Header, authConfig *types.A
} }
p := pm.newPlugin(ref, pluginID) p := pm.newPlugin(ref, pluginID)
if ref, ok := ref.(reference.NamedTagged); ok {
p.p.Tag = ref.Tag()
}
if err := pm.initPlugin(p); err != nil { if err := pm.initPlugin(p); err != nil {
return nil, err return nil, err
} }
@ -90,14 +86,14 @@ func (pm *Manager) Pull(name string, metaHeader http.Header, authConfig *types.A
pm.save() pm.save()
pm.Unlock() pm.Unlock()
return computePrivileges(&p.p.Manifest), nil return computePrivileges(&p.P.Manifest), nil
} }
// List displays the list of plugins and associated metadata. // List displays the list of plugins and associated metadata.
func (pm *Manager) List() ([]types.Plugin, error) { func (pm *Manager) List() ([]types.Plugin, error) {
out := make([]types.Plugin, 0, len(pm.plugins)) out := make([]types.Plugin, 0, len(pm.plugins))
for _, p := range pm.plugins { for _, p := range pm.plugins {
out = append(out, p.p) out = append(out, p.P)
} }
return out, nil return out, nil
} }
@ -105,7 +101,7 @@ func (pm *Manager) List() ([]types.Plugin, error) {
// Push pushes a plugin to the store. // Push pushes a plugin to the store.
func (pm *Manager) Push(name string, metaHeader http.Header, authConfig *types.AuthConfig) error { func (pm *Manager) Push(name string, metaHeader http.Header, authConfig *types.AuthConfig) error {
p, err := pm.get(name) p, err := pm.get(name)
dest := filepath.Join(pm.libRoot, p.p.ID) dest := filepath.Join(pm.libRoot, p.P.ID)
config, err := os.Open(filepath.Join(dest, "manifest.json")) config, err := os.Open(filepath.Join(dest, "manifest.json"))
if err != nil { if err != nil {
return err return err

View file

@ -46,7 +46,7 @@ func (e ErrInadequateCapability) Error() string {
type plugin struct { type plugin struct {
//sync.RWMutex TODO //sync.RWMutex TODO
p types.Plugin P types.Plugin `json:"plugin"`
client *plugins.Client client *plugins.Client
restartManager restartmanager.RestartManager restartManager restartmanager.RestartManager
stateSourcePath string stateSourcePath string
@ -58,12 +58,17 @@ func (p *plugin) Client() *plugins.Client {
} }
func (p *plugin) Name() string { func (p *plugin) Name() string {
return p.p.Name name := p.P.Name
if len(p.P.Tag) > 0 {
// TODO: this feels hacky, maybe we should be storing the distribution reference rather than splitting these
name += ":" + p.P.Tag
}
return name
} }
func (pm *Manager) newPlugin(ref reference.Named, id string) *plugin { func (pm *Manager) newPlugin(ref reference.Named, id string) *plugin {
p := &plugin{ p := &plugin{
p: types.Plugin{ P: types.Plugin{
Name: ref.Name(), Name: ref.Name(),
ID: id, ID: id,
}, },
@ -71,12 +76,20 @@ func (pm *Manager) newPlugin(ref reference.Named, id string) *plugin {
runtimeSourcePath: filepath.Join(pm.runRoot, id), runtimeSourcePath: filepath.Join(pm.runRoot, id),
} }
if ref, ok := ref.(reference.NamedTagged); ok { if ref, ok := ref.(reference.NamedTagged); ok {
p.p.Tag = ref.Tag() p.P.Tag = ref.Tag()
} }
return p return p
} }
// TODO: figure out why save() doesn't json encode *plugin object func (pm *Manager) restorePlugin(p *plugin) error {
p.stateSourcePath = filepath.Join(pm.libRoot, p.P.ID, "state")
p.runtimeSourcePath = filepath.Join(pm.runRoot, p.P.ID)
if p.P.Active {
return pm.restore(p)
}
return nil
}
type pluginMap map[string]*plugin type pluginMap map[string]*plugin
// Manager controls the plugin subsystem. // Manager controls the plugin subsystem.
@ -90,6 +103,7 @@ type Manager struct {
containerdClient libcontainerd.Client containerdClient libcontainerd.Client
registryService registry.Service registryService registry.Service
handleLegacy bool handleLegacy bool
liveRestore bool
} }
// GetManager returns the singleton plugin Manager // GetManager returns the singleton plugin Manager
@ -99,7 +113,7 @@ func GetManager() *Manager {
// Init (was NewManager) instantiates the singleton Manager. // Init (was NewManager) instantiates the singleton Manager.
// TODO: revert this to NewManager once we get rid of all the singletons. // TODO: revert this to NewManager once we get rid of all the singletons.
func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Service) (err error) { func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Service, liveRestore bool) (err error) {
if manager != nil { if manager != nil {
return nil return nil
} }
@ -120,17 +134,18 @@ func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Servic
handlers: make(map[string]func(string, *plugins.Client)), handlers: make(map[string]func(string, *plugins.Client)),
registryService: rs, registryService: rs,
handleLegacy: true, handleLegacy: true,
liveRestore: liveRestore,
} }
if err := os.MkdirAll(manager.runRoot, 0700); err != nil { if err := os.MkdirAll(manager.runRoot, 0700); err != nil {
return err return err
} }
if err := manager.init(); err != nil {
return err
}
manager.containerdClient, err = remote.Client(manager) manager.containerdClient, err = remote.Client(manager)
if err != nil { if err != nil {
return err return err
} }
if err := manager.init(); err != nil {
return err
}
return nil return nil
} }
@ -165,7 +180,7 @@ func FindWithCapability(capability string) ([]Plugin, error) {
defer manager.RUnlock() defer manager.RUnlock()
pluginLoop: pluginLoop:
for _, p := range manager.plugins { for _, p := range manager.plugins {
for _, typ := range p.p.Manifest.Interface.Types { for _, typ := range p.P.Manifest.Interface.Types {
if typ.Capability != capability || typ.Prefix != "docker" { if typ.Capability != capability || typ.Prefix != "docker" {
continue pluginLoop continue pluginLoop
} }
@ -216,7 +231,7 @@ func LookupWithCapability(name, capability string) (Plugin, error) {
} }
capability = strings.ToLower(capability) capability = strings.ToLower(capability)
for _, typ := range p.p.Manifest.Interface.Types { for _, typ := range p.P.Manifest.Interface.Types {
if typ.Capability == capability && typ.Prefix == "docker" { if typ.Capability == capability && typ.Prefix == "docker" {
return p, nil return p, nil
} }
@ -257,55 +272,79 @@ func (pm *Manager) init() error {
} }
return err return err
} }
// TODO: Populate pm.plugins
if err := json.NewDecoder(dt).Decode(&pm.nameToID); err != nil { if err := json.NewDecoder(dt).Decode(&pm.plugins); err != nil {
return err return err
} }
// FIXME: validate, restore
return nil var group sync.WaitGroup
group.Add(len(pm.plugins))
for _, p := range pm.plugins {
go func(p *plugin) {
defer group.Done()
if err := pm.restorePlugin(p); err != nil {
logrus.Errorf("Error restoring plugin '%s': %s", p.Name(), err)
return
}
pm.Lock()
pm.nameToID[p.Name()] = p.P.ID
requiresManualRestore := !pm.liveRestore && p.P.Active
pm.Unlock()
if requiresManualRestore {
// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
if err := pm.enable(p); err != nil {
logrus.Errorf("Error restoring plugin '%s': %s", p.Name(), err)
}
}
}(p)
group.Wait()
}
return pm.save()
} }
func (pm *Manager) initPlugin(p *plugin) error { func (pm *Manager) initPlugin(p *plugin) error {
dt, err := os.Open(filepath.Join(pm.libRoot, p.p.ID, "manifest.json")) dt, err := os.Open(filepath.Join(pm.libRoot, p.P.ID, "manifest.json"))
if err != nil { if err != nil {
return err return err
} }
err = json.NewDecoder(dt).Decode(&p.p.Manifest) err = json.NewDecoder(dt).Decode(&p.P.Manifest)
dt.Close() dt.Close()
if err != nil { if err != nil {
return err return err
} }
p.p.Config.Mounts = make([]types.PluginMount, len(p.p.Manifest.Mounts)) p.P.Config.Mounts = make([]types.PluginMount, len(p.P.Manifest.Mounts))
for i, mount := range p.p.Manifest.Mounts { for i, mount := range p.P.Manifest.Mounts {
p.p.Config.Mounts[i] = mount p.P.Config.Mounts[i] = mount
} }
p.p.Config.Env = make([]string, 0, len(p.p.Manifest.Env)) p.P.Config.Env = make([]string, 0, len(p.P.Manifest.Env))
for _, env := range p.p.Manifest.Env { for _, env := range p.P.Manifest.Env {
if env.Value != nil { if env.Value != nil {
p.p.Config.Env = append(p.p.Config.Env, fmt.Sprintf("%s=%s", env.Name, *env.Value)) p.P.Config.Env = append(p.P.Config.Env, fmt.Sprintf("%s=%s", env.Name, *env.Value))
} }
} }
copy(p.p.Config.Args, p.p.Manifest.Args.Value) copy(p.P.Config.Args, p.P.Manifest.Args.Value)
f, err := os.Create(filepath.Join(pm.libRoot, p.p.ID, "plugin-config.json")) f, err := os.Create(filepath.Join(pm.libRoot, p.P.ID, "plugin-config.json"))
if err != nil { if err != nil {
return err return err
} }
err = json.NewEncoder(f).Encode(&p.p.Config) err = json.NewEncoder(f).Encode(&p.P.Config)
f.Close() f.Close()
return err return err
} }
func (pm *Manager) remove(p *plugin) error { func (pm *Manager) remove(p *plugin) error {
if p.p.Active { if p.P.Active {
return fmt.Errorf("plugin %s is active", p.p.Name) return fmt.Errorf("plugin %s is active", p.Name())
} }
pm.Lock() // fixme: lock single record pm.Lock() // fixme: lock single record
defer pm.Unlock() defer pm.Unlock()
os.RemoveAll(p.stateSourcePath) os.RemoveAll(p.stateSourcePath)
delete(pm.plugins, p.p.Name) delete(pm.plugins, p.P.ID)
delete(pm.nameToID, p.Name())
pm.save() pm.save()
return nil return nil
} }
@ -326,7 +365,7 @@ func (pm *Manager) set(p *plugin, args []string) error {
func (pm *Manager) save() error { func (pm *Manager) save() error {
filePath := filepath.Join(pm.libRoot, "plugins.json") filePath := filepath.Join(pm.libRoot, "plugins.json")
jsonData, err := json.Marshal(pm.nameToID) jsonData, err := json.Marshal(pm.plugins)
if err != nil { if err != nil {
logrus.Debugf("Error in json.Marshal: %v", err) logrus.Debugf("Error in json.Marshal: %v", err)
return err return err

View file

@ -25,11 +25,11 @@ func (pm *Manager) enable(p *plugin) error {
} }
p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0) p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
if err := pm.containerdClient.Create(p.p.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only if err := pm.containerdClient.Create(p.P.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
return err return err
} }
socket := p.p.Manifest.Interface.Socket socket := p.P.Manifest.Interface.Socket
p.client, err = plugins.NewClient("unix://"+filepath.Join(p.runtimeSourcePath, socket), nil) p.client, err = plugins.NewClient("unix://"+filepath.Join(p.runtimeSourcePath, socket), nil)
if err != nil { if err != nil {
return err return err
@ -38,11 +38,11 @@ func (pm *Manager) enable(p *plugin) error {
//TODO: check net.Dial //TODO: check net.Dial
pm.Lock() // fixme: lock single record pm.Lock() // fixme: lock single record
p.p.Active = true p.P.Active = true
pm.save() pm.save()
pm.Unlock() pm.Unlock()
for _, typ := range p.p.Manifest.Interface.Types { for _, typ := range p.P.Manifest.Interface.Types {
if handler := pm.handlers[typ.String()]; handler != nil { if handler := pm.handlers[typ.String()]; handler != nil {
handler(p.Name(), p.Client()) handler(p.Name(), p.Client())
} }
@ -51,16 +51,21 @@ func (pm *Manager) enable(p *plugin) error {
return nil return nil
} }
func (pm *Manager) restore(p *plugin) error {
p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
return pm.containerdClient.Restore(p.P.ID, libcontainerd.WithRestartManager(p.restartManager))
}
func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) { func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
s := oci.DefaultSpec() s := oci.DefaultSpec()
rootfs := filepath.Join(pm.libRoot, p.p.ID, "rootfs") rootfs := filepath.Join(pm.libRoot, p.P.ID, "rootfs")
s.Root = specs.Root{ s.Root = specs.Root{
Path: rootfs, Path: rootfs,
Readonly: false, // TODO: all plugins should be readonly? settable in manifest? Readonly: false, // TODO: all plugins should be readonly? settable in manifest?
} }
mounts := append(p.p.Config.Mounts, types.PluginMount{ mounts := append(p.P.Config.Mounts, types.PluginMount{
Source: &p.runtimeSourcePath, Source: &p.runtimeSourcePath,
Destination: defaultPluginRuntimeDestination, Destination: defaultPluginRuntimeDestination,
Type: "bind", Type: "bind",
@ -95,11 +100,11 @@ func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
s.Mounts = append(s.Mounts, m) s.Mounts = append(s.Mounts, m)
} }
envs := make([]string, 1, len(p.p.Config.Env)+1) envs := make([]string, 1, len(p.P.Config.Env)+1)
envs[0] = "PATH=" + system.DefaultPathEnv envs[0] = "PATH=" + system.DefaultPathEnv
envs = append(envs, p.p.Config.Env...) envs = append(envs, p.P.Config.Env...)
args := append(p.p.Manifest.Entrypoint, p.p.Config.Args...) args := append(p.P.Manifest.Entrypoint, p.P.Config.Args...)
s.Process = specs.Process{ s.Process = specs.Process{
Terminal: false, Terminal: false,
Args: args, Args: args,
@ -114,13 +119,13 @@ func (pm *Manager) disable(p *plugin) error {
if err := p.restartManager.Cancel(); err != nil { if err := p.restartManager.Cancel(); err != nil {
logrus.Error(err) logrus.Error(err)
} }
if err := pm.containerdClient.Signal(p.p.ID, int(syscall.SIGKILL)); err != nil { if err := pm.containerdClient.Signal(p.P.ID, int(syscall.SIGKILL)); err != nil {
logrus.Error(err) logrus.Error(err)
} }
os.RemoveAll(p.runtimeSourcePath) os.RemoveAll(p.runtimeSourcePath)
pm.Lock() // fixme: lock single record pm.Lock() // fixme: lock single record
defer pm.Unlock() defer pm.Unlock()
p.p.Active = false p.P.Active = false
pm.save() pm.save()
return nil return nil
} }

View file

@ -19,3 +19,7 @@ func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
func (pm *Manager) disable(p *plugin) error { func (pm *Manager) disable(p *plugin) error {
return fmt.Errorf("Not implemented") return fmt.Errorf("Not implemented")
} }
func (pm *Manager) restore(p *plugin) error {
return fmt.Errorf("Not implemented")
}