Bladeren bron

Fix panic on daemon restart with running plugin

Scenario:

The daemon is shut down ungracefully and leaves plugins running (no
live-restore).
The daemon comes back up.
The next time a container tries to use that plugin, it causes a
daemon panic because the plugin client is not set.

This fixes that by ensuring that the plugin does get shut down.
Note: I do not think there would be any harm in just re-attaching to the
running plugin instead of shutting it down; however, historically we have
shut down plugins and containers when live-restore is not enabled.

[kir@: consolidate code into deleteTaskAndContainer, fix a few minor nits]

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
Brian Goff 7 jaren geleden
bovenliggende
commit
dbeb432965
5 gewijzigde bestanden met toevoegingen van 199 en 55 verwijderingen
  1. 19 26
      plugin/executor/containerd/containerd.go
  2. 14 8
      plugin/manager.go
  3. 17 12
      plugin/manager_linux.go
  4. 148 8
      plugin/manager_linux_test.go
  5. 1 1
      plugin/manager_windows.go

+ 19 - 26
plugin/executor/containerd/containerd.go

@@ -58,6 +58,19 @@ type Executor struct {
 	exitHandler ExitHandler
 	exitHandler ExitHandler
 }
 }
 
 
+// deleteTaskAndContainer deletes plugin task and then plugin container from containerd
+func deleteTaskAndContainer(ctx context.Context, cli Client, id string) {
+	_, _, err := cli.DeleteTask(ctx, id)
+	if err != nil && !errdefs.IsNotFound(err) {
+		logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
+	}
+
+	err = cli.Delete(ctx, id)
+	if err != nil && !errdefs.IsNotFound(err) {
+		logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd")
+	}
+}
+
 // Create creates a new container
 // Create creates a new container
 func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
 func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
 	opts := runctypes.RuncOptions{
 	opts := runctypes.RuncOptions{
@@ -87,34 +100,21 @@ func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteClo
 
 
 	_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
 	_, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
 	if err != nil {
 	if err != nil {
-		if _, _, err2 := e.client.DeleteTask(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
-			logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin task after failed start")
-		}
-		if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
-			logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to clean up containerd plugin container after failed start")
-		}
+		deleteTaskAndContainer(ctx, e.client, id)
 	}
 	}
 	return err
 	return err
 }
 }
 
 
 // Restore restores a container
 // Restore restores a container
-func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error {
+func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
 	alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
 	alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
 	if err != nil && !errdefs.IsNotFound(err) {
 	if err != nil && !errdefs.IsNotFound(err) {
-		return err
+		return false, err
 	}
 	}
 	if !alive {
 	if !alive {
-		_, _, err = e.client.DeleteTask(context.Background(), id)
-		if err != nil && !errdefs.IsNotFound(err) {
-			logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
-		}
-
-		err = e.client.Delete(context.Background(), id)
-		if err != nil && !errdefs.IsNotFound(err) {
-			logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
-		}
+		deleteTaskAndContainer(context.Background(), e.client, id)
 	}
 	}
-	return nil
+	return alive, nil
 }
 }
 
 
 // IsRunning returns if the container with the given id is running
 // IsRunning returns if the container with the given id is running
@@ -133,14 +133,7 @@ func (e *Executor) Signal(id string, signal int) error {
 func (e *Executor) ProcessEvent(id string, et libcontainerd.EventType, ei libcontainerd.EventInfo) error {
 func (e *Executor) ProcessEvent(id string, et libcontainerd.EventType, ei libcontainerd.EventInfo) error {
 	switch et {
 	switch et {
 	case libcontainerd.EventExit:
 	case libcontainerd.EventExit:
-		// delete task and container
-		if _, _, err := e.client.DeleteTask(context.Background(), id); err != nil {
-			logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
-		}
-
-		if err := e.client.Delete(context.Background(), id); err != nil {
-			logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
-		}
+		deleteTaskAndContainer(context.Background(), e.client, id)
 		return e.exitHandler.HandleExitEvent(ei.ContainerID)
 		return e.exitHandler.HandleExitEvent(ei.ContainerID)
 	}
 	}
 	return nil
 	return nil

+ 14 - 8
plugin/manager.go

@@ -37,14 +37,14 @@ var validFullID = regexp.MustCompile(`^([a-f0-9]{64})$`)
 // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
 // Executor is the interface that the plugin manager uses to interact with for starting/stopping plugins
 type Executor interface {
 type Executor interface {
 	Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
 	Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error
-	Restore(id string, stdout, stderr io.WriteCloser) error
 	IsRunning(id string) (bool, error)
 	IsRunning(id string) (bool, error)
+	Restore(id string, stdout, stderr io.WriteCloser) (alive bool, err error)
 	Signal(id string, signal int) error
 	Signal(id string, signal int) error
 }
 }
 
 
-func (pm *Manager) restorePlugin(p *v2.Plugin) error {
+func (pm *Manager) restorePlugin(p *v2.Plugin, c *controller) error {
 	if p.IsEnabled() {
 	if p.IsEnabled() {
-		return pm.restore(p)
+		return pm.restore(p, c)
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -143,12 +143,15 @@ func (pm *Manager) HandleExitEvent(id string) error {
 		return err
 		return err
 	}
 	}
 
 
-	os.RemoveAll(filepath.Join(pm.config.ExecRoot, id))
+	if err := os.RemoveAll(filepath.Join(pm.config.ExecRoot, id)); err != nil && !os.IsNotExist(err) {
+		logrus.WithError(err).WithField("id", id).Error("Could not remove plugin bundle dir")
+	}
 
 
 	pm.mu.RLock()
 	pm.mu.RLock()
 	c := pm.cMap[p]
 	c := pm.cMap[p]
 	if c.exitChan != nil {
 	if c.exitChan != nil {
 		close(c.exitChan)
 		close(c.exitChan)
+		c.exitChan = nil // ignore duplicate events (containerd issue #2299)
 	}
 	}
 	restart := c.restart
 	restart := c.restart
 	pm.mu.RUnlock()
 	pm.mu.RUnlock()
@@ -205,12 +208,15 @@ func (pm *Manager) reload() error { // todo: restore
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	wg.Add(len(plugins))
 	wg.Add(len(plugins))
 	for _, p := range plugins {
 	for _, p := range plugins {
-		c := &controller{} // todo: remove this
+		c := &controller{exitChan: make(chan bool)}
+		pm.mu.Lock()
 		pm.cMap[p] = c
 		pm.cMap[p] = c
+		pm.mu.Unlock()
+
 		go func(p *v2.Plugin) {
 		go func(p *v2.Plugin) {
 			defer wg.Done()
 			defer wg.Done()
-			if err := pm.restorePlugin(p); err != nil {
-				logrus.Errorf("failed to restore plugin '%s': %s", p.Name(), err)
+			if err := pm.restorePlugin(p, c); err != nil {
+				logrus.WithError(err).WithField("id", p.GetID()).Error("Failed to restore plugin")
 				return
 				return
 			}
 			}
 
 
@@ -248,7 +254,7 @@ func (pm *Manager) reload() error { // todo: restore
 			if requiresManualRestore {
 			if requiresManualRestore {
 				// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
 				// if liveRestore is not enabled, the plugin will be stopped now so we should enable it
 				if err := pm.enable(p, c, true); err != nil {
 				if err := pm.enable(p, c, true); err != nil {
-					logrus.Errorf("failed to enable plugin '%s': %s", p.Name(), err)
+					logrus.WithError(err).WithField("id", p.GetID()).Error("failed to enable plugin")
 				}
 				}
 			}
 			}
 		}(p)
 		}(p)

+ 17 - 12
plugin/manager_linux.go

@@ -79,7 +79,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
 		client, err := plugins.NewClientWithTimeout(addr.Network()+"://"+addr.String(), nil, p.Timeout())
 		client, err := plugins.NewClientWithTimeout(addr.Network()+"://"+addr.String(), nil, p.Timeout())
 		if err != nil {
 		if err != nil {
 			c.restart = false
 			c.restart = false
-			shutdownPlugin(p, c, pm.executor)
+			shutdownPlugin(p, c.exitChan, pm.executor)
 			return errors.WithStack(err)
 			return errors.WithStack(err)
 		}
 		}
 
 
@@ -106,7 +106,7 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
 			c.restart = false
 			c.restart = false
 			// While restoring plugins, we need to explicitly set the state to disabled
 			// While restoring plugins, we need to explicitly set the state to disabled
 			pm.config.Store.SetState(p, false)
 			pm.config.Store.SetState(p, false)
-			shutdownPlugin(p, c, pm.executor)
+			shutdownPlugin(p, c.exitChan, pm.executor)
 			return err
 			return err
 		}
 		}
 
 
@@ -117,16 +117,15 @@ func (pm *Manager) pluginPostStart(p *v2.Plugin, c *controller) error {
 	return pm.save(p)
 	return pm.save(p)
 }
 }
 
 
-func (pm *Manager) restore(p *v2.Plugin) error {
+func (pm *Manager) restore(p *v2.Plugin, c *controller) error {
 	stdout, stderr := makeLoggerStreams(p.GetID())
 	stdout, stderr := makeLoggerStreams(p.GetID())
-	if err := pm.executor.Restore(p.GetID(), stdout, stderr); err != nil {
+	alive, err := pm.executor.Restore(p.GetID(), stdout, stderr)
+	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
 	if pm.config.LiveRestoreEnabled {
 	if pm.config.LiveRestoreEnabled {
-		c := &controller{}
-		if isRunning, _ := pm.executor.IsRunning(p.GetID()); !isRunning {
-			// plugin is not running, so follow normal startup procedure
+		if !alive {
 			return pm.enable(p, c, true)
 			return pm.enable(p, c, true)
 		}
 		}
 
 
@@ -138,10 +137,16 @@ func (pm *Manager) restore(p *v2.Plugin) error {
 		return pm.pluginPostStart(p, c)
 		return pm.pluginPostStart(p, c)
 	}
 	}
 
 
+	if alive {
+		// TODO(@cpuguy83): Should we always just re-attach to the running plugin instead of doing this?
+		c.restart = false
+		shutdownPlugin(p, c.exitChan, pm.executor)
+	}
+
 	return nil
 	return nil
 }
 }
 
 
-func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) {
+func shutdownPlugin(p *v2.Plugin, ec chan bool, executor Executor) {
 	pluginID := p.GetID()
 	pluginID := p.GetID()
 
 
 	err := executor.Signal(pluginID, int(unix.SIGTERM))
 	err := executor.Signal(pluginID, int(unix.SIGTERM))
@@ -149,7 +154,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) {
 		logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
 		logrus.Errorf("Sending SIGTERM to plugin failed with error: %v", err)
 	} else {
 	} else {
 		select {
 		select {
-		case <-c.exitChan:
+		case <-ec:
 			logrus.Debug("Clean shutdown of plugin")
 			logrus.Debug("Clean shutdown of plugin")
 		case <-time.After(time.Second * 10):
 		case <-time.After(time.Second * 10):
 			logrus.Debug("Force shutdown plugin")
 			logrus.Debug("Force shutdown plugin")
@@ -157,7 +162,7 @@ func shutdownPlugin(p *v2.Plugin, c *controller, executor Executor) {
 				logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err)
 				logrus.Errorf("Sending SIGKILL to plugin failed with error: %v", err)
 			}
 			}
 			select {
 			select {
-			case <-c.exitChan:
+			case <-ec:
 				logrus.Debug("SIGKILL plugin shutdown")
 				logrus.Debug("SIGKILL plugin shutdown")
 			case <-time.After(time.Second * 10):
 			case <-time.After(time.Second * 10):
 				logrus.Debug("Force shutdown plugin FAILED")
 				logrus.Debug("Force shutdown plugin FAILED")
@@ -172,7 +177,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
 	}
 	}
 
 
 	c.restart = false
 	c.restart = false
-	shutdownPlugin(p, c, pm.executor)
+	shutdownPlugin(p, c.exitChan, pm.executor)
 	pm.config.Store.SetState(p, false)
 	pm.config.Store.SetState(p, false)
 	return pm.save(p)
 	return pm.save(p)
 }
 }
@@ -191,7 +196,7 @@ func (pm *Manager) Shutdown() {
 		}
 		}
 		if pm.executor != nil && p.IsEnabled() {
 		if pm.executor != nil && p.IsEnabled() {
 			c.restart = false
 			c.restart = false
-			shutdownPlugin(p, c, pm.executor)
+			shutdownPlugin(p, c.exitChan, pm.executor)
 		}
 		}
 	}
 	}
 	if err := mount.RecursiveUnmount(pm.config.Root); err != nil {
 	if err := mount.RecursiveUnmount(pm.config.Root); err != nil {

+ 148 - 8
plugin/manager_linux_test.go

@@ -3,12 +3,14 @@ package plugin // import "github.com/docker/docker/plugin"
 import (
 import (
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
+	"net"
 	"os"
 	"os"
 	"path/filepath"
 	"path/filepath"
 	"testing"
 	"testing"
 
 
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/pkg/mount"
 	"github.com/docker/docker/pkg/mount"
+	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/docker/pkg/system"
 	"github.com/docker/docker/pkg/system"
 	"github.com/docker/docker/plugin/v2"
 	"github.com/docker/docker/plugin/v2"
 	"github.com/gotestyourself/gotestyourself/skip"
 	"github.com/gotestyourself/gotestyourself/skip"
@@ -59,7 +61,7 @@ func TestManagerWithPluginMounts(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	if err := m.Remove(p1.Name(), &types.PluginRmConfig{ForceRemove: true}); err != nil {
+	if err := m.Remove(p1.GetID(), &types.PluginRmConfig{ForceRemove: true}); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	if mounted, err := mount.Mounted(p2Mount); !mounted || err != nil {
 	if mounted, err := mount.Mounted(p2Mount); !mounted || err != nil {
@@ -68,17 +70,18 @@ func TestManagerWithPluginMounts(t *testing.T) {
 }
 }
 
 
 func newTestPlugin(t *testing.T, name, cap, root string) *v2.Plugin {
 func newTestPlugin(t *testing.T, name, cap, root string) *v2.Plugin {
-	rootfs := filepath.Join(root, name)
+	id := stringid.GenerateNonCryptoID()
+	rootfs := filepath.Join(root, id)
 	if err := os.MkdirAll(rootfs, 0755); err != nil {
 	if err := os.MkdirAll(rootfs, 0755); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	p := v2.Plugin{PluginObj: types.Plugin{Name: name}}
+	p := v2.Plugin{PluginObj: types.Plugin{ID: id, Name: name}}
 	p.Rootfs = rootfs
 	p.Rootfs = rootfs
 	iType := types.PluginInterfaceType{Capability: cap, Prefix: "docker", Version: "1.0"}
 	iType := types.PluginInterfaceType{Capability: cap, Prefix: "docker", Version: "1.0"}
-	i := types.PluginConfigInterface{Socket: "plugins.sock", Types: []types.PluginInterfaceType{iType}}
+	i := types.PluginConfigInterface{Socket: "plugin.sock", Types: []types.PluginInterfaceType{iType}}
 	p.PluginObj.Config.Interface = i
 	p.PluginObj.Config.Interface = i
-	p.PluginObj.ID = name
+	p.PluginObj.ID = id
 
 
 	return &p
 	return &p
 }
 }
@@ -90,8 +93,8 @@ func (e *simpleExecutor) Create(id string, spec specs.Spec, stdout, stderr io.Wr
 	return errors.New("Create failed")
 	return errors.New("Create failed")
 }
 }
 
 
-func (e *simpleExecutor) Restore(id string, stdout, stderr io.WriteCloser) error {
-	return nil
+func (e *simpleExecutor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
+	return false, nil
 }
 }
 
 
 func (e *simpleExecutor) IsRunning(id string) (bool, error) {
 func (e *simpleExecutor) IsRunning(id string) (bool, error) {
@@ -133,7 +136,144 @@ func TestCreateFailed(t *testing.T) {
 		t.Fatalf("expected Create failed error, got %v", err)
 		t.Fatalf("expected Create failed error, got %v", err)
 	}
 	}
 
 
-	if err := m.Remove(p.Name(), &types.PluginRmConfig{ForceRemove: true}); err != nil {
+	if err := m.Remove(p.GetID(), &types.PluginRmConfig{ForceRemove: true}); err != nil {
+		t.Fatal(err)
+	}
+}
+
+type executorWithRunning struct {
+	m         *Manager
+	root      string
+	exitChans map[string]chan struct{}
+}
+
+func (e *executorWithRunning) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
+	sockAddr := filepath.Join(e.root, id, "plugin.sock")
+	ch := make(chan struct{})
+	if e.exitChans == nil {
+		e.exitChans = make(map[string]chan struct{})
+	}
+	e.exitChans[id] = ch
+	listenTestPlugin(sockAddr, ch)
+	return nil
+}
+
+func (e *executorWithRunning) IsRunning(id string) (bool, error) {
+	return true, nil
+}
+func (e *executorWithRunning) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
+	return true, nil
+}
+
+func (e *executorWithRunning) Signal(id string, signal int) error {
+	ch := e.exitChans[id]
+	ch <- struct{}{}
+	<-ch
+	e.m.HandleExitEvent(id)
+	return nil
+}
+
+func TestPluginAlreadyRunningOnStartup(t *testing.T) {
+	t.Parallel()
+
+	root, err := ioutil.TempDir("", t.Name())
+	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
+	defer system.EnsureRemoveAll(root)
+
+	for _, test := range []struct {
+		desc   string
+		config ManagerConfig
+	}{
+		{
+			desc: "live-restore-disabled",
+			config: ManagerConfig{
+				LogPluginEvent: func(_, _, _ string) {},
+			},
+		},
+		{
+			desc: "live-restore-enabled",
+			config: ManagerConfig{
+				LogPluginEvent:     func(_, _, _ string) {},
+				LiveRestoreEnabled: true,
+			},
+		},
+	} {
+		t.Run(test.desc, func(t *testing.T) {
+			config := test.config
+			desc := test.desc
+			t.Parallel()
+
+			p := newTestPlugin(t, desc, desc, config.Root)
+			p.PluginObj.Enabled = true
+
+			// Need a short-ish path here so we don't run into unix socket path length issues.
+			config.ExecRoot, err = ioutil.TempDir("", "plugintest")
+
+			executor := &executorWithRunning{root: config.ExecRoot}
+			config.CreateExecutor = func(m *Manager) (Executor, error) { executor.m = m; return executor, nil }
+
+			if err := executor.Create(p.GetID(), specs.Spec{}, nil, nil); err != nil {
+				t.Fatal(err)
+			}
+
+			root := filepath.Join(root, desc)
+			config.Root = filepath.Join(root, "manager")
+			if err := os.MkdirAll(filepath.Join(config.Root, p.GetID()), 0755); err != nil {
+				t.Fatal(err)
+			}
+
+			if !p.IsEnabled() {
+				t.Fatal("plugin should be enabled")
+			}
+			if err := (&Manager{config: config}).save(p); err != nil {
+				t.Fatal(err)
+			}
+
+			s := NewStore()
+			config.Store = s
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer system.EnsureRemoveAll(config.ExecRoot)
+
+			m, err := NewManager(config)
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer m.Shutdown()
+
+			p = s.GetAll()[p.GetID()] // refresh `p` with what the manager knows
+			if p.Client() == nil {
+				t.Fatal("plugin client should not be nil")
+			}
+		})
+	}
+}
+
+func listenTestPlugin(sockAddr string, exit chan struct{}) (net.Listener, error) {
+	if err := os.MkdirAll(filepath.Dir(sockAddr), 0755); err != nil {
+		return nil, err
+	}
+	l, err := net.Listen("unix", sockAddr)
+	if err != nil {
+		return nil, err
+	}
+	go func() {
+		for {
+			conn, err := l.Accept()
+			if err != nil {
+				return
+			}
+			conn.Close()
+		}
+	}()
+	go func() {
+		<-exit
+		l.Close()
+		os.Remove(sockAddr)
+		exit <- struct{}{}
+	}()
+	return l, nil
 }
 }

+ 1 - 1
plugin/manager_windows.go

@@ -19,7 +19,7 @@ func (pm *Manager) disable(p *v2.Plugin, c *controller) error {
 	return fmt.Errorf("Not implemented")
 	return fmt.Errorf("Not implemented")
 }
 }
 
 
-func (pm *Manager) restore(p *v2.Plugin) error {
+func (pm *Manager) restore(p *v2.Plugin, c *controller) error {
 	return fmt.Errorf("Not implemented")
 	return fmt.Errorf("Not implemented")
 }
 }