Implement plugin restore after daemon restart

This ensures that:

- The in-memory plugin store is populated with all installed plugins.
- Plugins which were active before the daemon restart are active
  afterwards. This uses the live-restore feature when available;
  otherwise the plugin is started manually.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 2016-06-15 13:39:33 -04:00
parent 5e156fd3d4
commit dfd9187305
5 changed files with 89 additions and 47 deletions
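
In outline, the new restore path works like this: on daemon start, Manager.init() decodes the full plugin objects out of plugins.json, restores each plugin's runtime paths, and brings previously active plugins back up — reattaching via live-restore when available, re-enabling them manually otherwise. The sketch below condenses that decision flow into a standalone Go program; the type and function names are simplified stand-ins, not the daemon's own:

    package main

    import (
        "fmt"
        "sync"
    )

    // Simplified stand-in for the persisted plugin record.
    type plugin struct {
        Name   string
        Active bool
    }

    // restoreAll mirrors Manager.init(): every plugin is put back into
    // the in-memory store, and each previously active plugin is
    // restored in its own goroutine.
    func restoreAll(plugins []*plugin, liveRestore bool) {
        var group sync.WaitGroup
        group.Add(len(plugins))
        for _, p := range plugins {
            go func(p *plugin) {
                defer group.Done()
                if !p.Active {
                    return // inactive plugins only need registering
                }
                if liveRestore {
                    // plugin process survived the restart; just reattach
                    fmt.Printf("reattaching to running plugin %q\n", p.Name)
                } else {
                    // plugin stopped with the daemon; start it again
                    fmt.Printf("re-enabling plugin %q\n", p.Name)
                }
            }(p)
        }
        group.Wait()
    }

    func main() {
        restoreAll([]*plugin{{"volumedriver", true}, {"netdriver", false}}, false)
    }

Run with live-restore off, every previously active plugin is started again; with it on, the daemon merely reattaches.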

File 1 of 5

@@ -10,5 +10,5 @@ import (
 )
 
 func pluginInit(config *daemon.Config, remote libcontainerd.Remote, rs registry.Service) error {
-    return plugin.Init(config.Root, config.ExecRoot, remote, rs)
+    return plugin.Init(config.Root, config.ExecRoot, remote, rs, config.LiveRestore)
 }

File 2 of 5

@@ -40,7 +40,7 @@ func (pm *Manager) Inspect(name string) (tp types.Plugin, err error) {
     if err != nil {
         return tp, err
     }
-    return p.p, nil
+    return p.P, nil
 }
 
 // Pull pulls a plugin and enables it.
@@ -86,14 +86,14 @@ func (pm *Manager) Pull(name string, metaHeader http.Header, authConfig *types.AuthConfig
     pm.save()
     pm.Unlock()
 
-    return computePrivileges(&p.p.Manifest), nil
+    return computePrivileges(&p.P.Manifest), nil
 }
 
 // List displays the list of plugins and associated metadata.
 func (pm *Manager) List() ([]types.Plugin, error) {
     out := make([]types.Plugin, 0, len(pm.plugins))
     for _, p := range pm.plugins {
-        out = append(out, p.p)
+        out = append(out, p.P)
     }
     return out, nil
 }
@@ -101,7 +101,7 @@ func (pm *Manager) List() ([]types.Plugin, error) {
 // Push pushes a plugin to the store.
 func (pm *Manager) Push(name string, metaHeader http.Header, authConfig *types.AuthConfig) error {
     p, err := pm.get(name)
-    dest := filepath.Join(pm.libRoot, p.p.ID)
+    dest := filepath.Join(pm.libRoot, p.P.ID)
     config, err := os.Open(filepath.Join(dest, "manifest.json"))
     if err != nil {
         return err

File 3 of 5

@@ -46,7 +46,7 @@ func (e ErrInadequateCapability) Error() string {
 
 type plugin struct {
     //sync.RWMutex TODO
-    p                 types.Plugin
+    P                 types.Plugin `json:"plugin"`
     client            *plugins.Client
     restartManager    restartmanager.RestartManager
     stateSourcePath   string
@@ -58,17 +58,17 @@ func (p *plugin) Client() *plugins.Client {
 }
 
 func (p *plugin) Name() string {
-    name := p.p.Name
-    if len(p.p.Tag) > 0 {
+    name := p.P.Name
+    if len(p.P.Tag) > 0 {
         // TODO: this feels hacky, maybe we should be storing the distribution reference rather than splitting these
-        name += ":" + p.p.Tag
+        name += ":" + p.P.Tag
     }
     return name
 }
 
 func (pm *Manager) newPlugin(ref reference.Named, id string) *plugin {
     p := &plugin{
-        p: types.Plugin{
+        P: types.Plugin{
             Name: ref.Name(),
             ID:   id,
         },
@@ -76,12 +76,20 @@ func (pm *Manager) newPlugin(ref reference.Named, id string) *plugin {
         runtimeSourcePath: filepath.Join(pm.runRoot, id),
     }
     if ref, ok := ref.(reference.NamedTagged); ok {
-        p.p.Tag = ref.Tag()
+        p.P.Tag = ref.Tag()
     }
     return p
 }
 
+// TODO: figure out why save() doesn't json encode *plugin object
+func (pm *Manager) restorePlugin(p *plugin) error {
+    p.stateSourcePath = filepath.Join(pm.libRoot, p.P.ID, "state")
+    p.runtimeSourcePath = filepath.Join(pm.runRoot, p.P.ID)
+    if p.P.Active {
+        return pm.restore(p)
+    }
+    return nil
+}
+
 type pluginMap map[string]*plugin
 
 // Manager controls the plugin subsystem.
@@ -95,6 +103,7 @@ type Manager struct {
     containerdClient libcontainerd.Client
     registryService  registry.Service
     handleLegacy     bool
+    liveRestore      bool
 }
 
 // GetManager returns the singleton plugin Manager
@@ -104,7 +113,7 @@ func GetManager() *Manager {
 
 // Init (was NewManager) instantiates the singleton Manager.
 // TODO: revert this to NewManager once we get rid of all the singletons.
-func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Service) (err error) {
+func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Service, liveRestore bool) (err error) {
     if manager != nil {
         return nil
     }
@@ -125,17 +134,18 @@ func Init(root, execRoot string, remote libcontainerd.Remote, rs registry.Service) (err error) {
         handlers:        make(map[string]func(string, *plugins.Client)),
         registryService: rs,
         handleLegacy:    true,
+        liveRestore:     liveRestore,
     }
     if err := os.MkdirAll(manager.runRoot, 0700); err != nil {
         return err
     }
-    if err := manager.init(); err != nil {
-        return err
-    }
     manager.containerdClient, err = remote.Client(manager)
     if err != nil {
         return err
     }
+    if err := manager.init(); err != nil {
+        return err
+    }
     return nil
 }
@@ -170,7 +180,7 @@ func FindWithCapability(capability string) ([]Plugin, error) {
     defer manager.RUnlock()
 pluginLoop:
     for _, p := range manager.plugins {
-        for _, typ := range p.p.Manifest.Interface.Types {
+        for _, typ := range p.P.Manifest.Interface.Types {
             if typ.Capability != capability || typ.Prefix != "docker" {
                 continue pluginLoop
             }
@@ -221,7 +231,7 @@ func LookupWithCapability(name, capability string) (Plugin, error) {
     }
 
     capability = strings.ToLower(capability)
-    for _, typ := range p.p.Manifest.Interface.Types {
+    for _, typ := range p.P.Manifest.Interface.Types {
         if typ.Capability == capability && typ.Prefix == "docker" {
             return p, nil
         }
@@ -262,55 +272,78 @@ func (pm *Manager) init() error {
         }
         return err
     }
-    // TODO: Populate pm.plugins
-    if err := json.NewDecoder(dt).Decode(&pm.nameToID); err != nil {
+    if err := json.NewDecoder(dt).Decode(&pm.plugins); err != nil {
         return err
     }
-    // FIXME: validate, restore
-    return nil
+
+    var group sync.WaitGroup
+    group.Add(len(pm.plugins))
+    for _, p := range pm.plugins {
+        go func(p *plugin) {
+            defer group.Done()
+            if err := pm.restorePlugin(p); err != nil {
+                logrus.Errorf("Error restoring plugin '%s': %s", p.Name(), err)
+                return
+            }
+
+            pm.Lock()
+            pm.nameToID[p.Name()] = p.P.ID
+            requiresManualRestore := !pm.liveRestore && p.P.Active
+            pm.Unlock()
+
+            if requiresManualRestore {
+                // if liveRestore is not enabled, the plugin will be stopped now so we should enable it
+                if err := pm.enable(p); err != nil {
+                    logrus.Errorf("Error restoring plugin '%s': %s", p.Name(), err)
+                }
+            }
+        }(p)
+    }
+    group.Wait()
+    return pm.save()
 }
 
 func (pm *Manager) initPlugin(p *plugin) error {
-    dt, err := os.Open(filepath.Join(pm.libRoot, p.p.ID, "manifest.json"))
+    dt, err := os.Open(filepath.Join(pm.libRoot, p.P.ID, "manifest.json"))
     if err != nil {
         return err
     }
-    err = json.NewDecoder(dt).Decode(&p.p.Manifest)
+    err = json.NewDecoder(dt).Decode(&p.P.Manifest)
     dt.Close()
     if err != nil {
         return err
     }
-    p.p.Config.Mounts = make([]types.PluginMount, len(p.p.Manifest.Mounts))
-    for i, mount := range p.p.Manifest.Mounts {
-        p.p.Config.Mounts[i] = mount
+    p.P.Config.Mounts = make([]types.PluginMount, len(p.P.Manifest.Mounts))
+    for i, mount := range p.P.Manifest.Mounts {
+        p.P.Config.Mounts[i] = mount
     }
-    p.p.Config.Env = make([]string, 0, len(p.p.Manifest.Env))
-    for _, env := range p.p.Manifest.Env {
+    p.P.Config.Env = make([]string, 0, len(p.P.Manifest.Env))
+    for _, env := range p.P.Manifest.Env {
         if env.Value != nil {
-            p.p.Config.Env = append(p.p.Config.Env, fmt.Sprintf("%s=%s", env.Name, *env.Value))
+            p.P.Config.Env = append(p.P.Config.Env, fmt.Sprintf("%s=%s", env.Name, *env.Value))
         }
     }
-    copy(p.p.Config.Args, p.p.Manifest.Args.Value)
+    copy(p.P.Config.Args, p.P.Manifest.Args.Value)
 
-    f, err := os.Create(filepath.Join(pm.libRoot, p.p.ID, "plugin-config.json"))
+    f, err := os.Create(filepath.Join(pm.libRoot, p.P.ID, "plugin-config.json"))
     if err != nil {
         return err
     }
-    err = json.NewEncoder(f).Encode(&p.p.Config)
+    err = json.NewEncoder(f).Encode(&p.P.Config)
     f.Close()
     return err
 }
 
 func (pm *Manager) remove(p *plugin) error {
-    if p.p.Active {
+    if p.P.Active {
         return fmt.Errorf("plugin %s is active", p.Name())
     }
     pm.Lock() // fixme: lock single record
     defer pm.Unlock()
     os.RemoveAll(p.stateSourcePath)
-    delete(pm.plugins, p.p.ID)
+    delete(pm.plugins, p.P.ID)
     delete(pm.nameToID, p.Name())
     pm.save()
     return nil
@@ -332,7 +365,7 @@ func (pm *Manager) set(p *plugin, args []string) error {
 
 func (pm *Manager) save() error {
     filePath := filepath.Join(pm.libRoot, "plugins.json")
-    jsonData, err := json.Marshal(pm.nameToID)
+    jsonData, err := json.Marshal(pm.plugins)
     if err != nil {
         logrus.Debugf("Error in json.Marshal: %v", err)
         return err

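One detail in the file above ties the whole rename together: save() now marshals pm.plugins itself instead of just the nameToID map, and encoding/json silently drops unexported struct fields — which is why the plugin struct's field had to change from p to the exported P with a json:"plugin" tag. A minimal demonstration (hypothetical types, unrelated to the daemon's own):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type unexported struct {
        p string // lowercase: encoding/json cannot see it
    }

    type exported struct {
        P string `json:"plugin"` // uppercase + tag: round-trips fine
    }

    func main() {
        a, _ := json.Marshal(unexported{p: "sshfs"})
        b, _ := json.Marshal(exported{P: "sshfs"})
        fmt.Println(string(a)) // {}
        fmt.Println(string(b)) // {"plugin":"sshfs"}
    }
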
File 4 of 5

@@ -25,11 +25,11 @@ func (pm *Manager) enable(p *plugin) error {
     }
 
     p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
-    if err := pm.containerdClient.Create(p.p.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
+    if err := pm.containerdClient.Create(p.P.ID, libcontainerd.Spec(*spec), libcontainerd.WithRestartManager(p.restartManager)); err != nil { // POC-only
         return err
     }
 
-    socket := p.p.Manifest.Interface.Socket
+    socket := p.P.Manifest.Interface.Socket
     p.client, err = plugins.NewClient("unix://"+filepath.Join(p.runtimeSourcePath, socket), nil)
     if err != nil {
         return err
@@ -38,11 +38,11 @@ func (pm *Manager) enable(p *plugin) error {
     //TODO: check net.Dial
 
     pm.Lock() // fixme: lock single record
-    p.p.Active = true
+    p.P.Active = true
     pm.save()
     pm.Unlock()
 
-    for _, typ := range p.p.Manifest.Interface.Types {
+    for _, typ := range p.P.Manifest.Interface.Types {
         if handler := pm.handlers[typ.String()]; handler != nil {
             handler(p.Name(), p.Client())
         }
@@ -51,16 +51,21 @@ func (pm *Manager) enable(p *plugin) error {
     return nil
 }
 
+func (pm *Manager) restore(p *plugin) error {
+    p.restartManager = restartmanager.New(container.RestartPolicy{Name: "always"}, 0)
+    return pm.containerdClient.Restore(p.P.ID, libcontainerd.WithRestartManager(p.restartManager))
+}
+
 func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
     s := oci.DefaultSpec()
 
-    rootfs := filepath.Join(pm.libRoot, p.p.ID, "rootfs")
+    rootfs := filepath.Join(pm.libRoot, p.P.ID, "rootfs")
     s.Root = specs.Root{
         Path:     rootfs,
         Readonly: false, // TODO: all plugins should be readonly? settable in manifest?
     }
 
-    mounts := append(p.p.Config.Mounts, types.PluginMount{
+    mounts := append(p.P.Config.Mounts, types.PluginMount{
         Source:      &p.runtimeSourcePath,
         Destination: defaultPluginRuntimeDestination,
         Type:        "bind",
@@ -95,11 +100,11 @@ func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
         s.Mounts = append(s.Mounts, m)
     }
 
-    envs := make([]string, 1, len(p.p.Config.Env)+1)
+    envs := make([]string, 1, len(p.P.Config.Env)+1)
     envs[0] = "PATH=" + system.DefaultPathEnv
-    envs = append(envs, p.p.Config.Env...)
+    envs = append(envs, p.P.Config.Env...)
 
-    args := append(p.p.Manifest.Entrypoint, p.p.Config.Args...)
+    args := append(p.P.Manifest.Entrypoint, p.P.Config.Args...)
     s.Process = specs.Process{
         Terminal: false,
         Args:     args,
@@ -114,13 +119,13 @@ func (pm *Manager) disable(p *plugin) error {
     if err := p.restartManager.Cancel(); err != nil {
         logrus.Error(err)
     }
-    if err := pm.containerdClient.Signal(p.p.ID, int(syscall.SIGKILL)); err != nil {
+    if err := pm.containerdClient.Signal(p.P.ID, int(syscall.SIGKILL)); err != nil {
         logrus.Error(err)
     }
     os.RemoveAll(p.runtimeSourcePath)
     pm.Lock() // fixme: lock single record
     defer pm.Unlock()
-    p.p.Active = false
+    p.P.Active = false
     pm.save()
     return nil
 }

File 5 of 5

@@ -19,3 +19,7 @@ func (pm *Manager) initSpec(p *plugin) (*specs.Spec, error) {
 func (pm *Manager) disable(p *plugin) error {
     return fmt.Errorf("Not implemented")
 }
+
+func (pm *Manager) restore(p *plugin) error {
+    return fmt.Errorf("Not implemented")
+}
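
For context, the liveRestore flag threaded through Init comes from the daemon's live-restore setting, which keeps container (and now plugin) processes running across a daemon restart so they can be reattached through containerd's Restore instead of being relaunched. Assuming the standard daemon configuration of this era, it is switched on with the --live-restore flag or in /etc/docker/daemon.json:

    {
        "live-restore": true
    }

With it disabled, the new code path falls back to pm.enable(), matching the second bullet of the commit message.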