// +build !windows

package libcontainerd // import "github.com/docker/docker/libcontainerd"

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/BurntSushi/toml"
	"github.com/containerd/containerd"
	"github.com/containerd/containerd/services/server"
	"github.com/docker/docker/pkg/system"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

const (
	maxConnectionRetryCount = 3
	healthCheckTimeout      = 3 * time.Second
	shutdownTimeout         = 15 * time.Second
	configFile              = "containerd.toml"
	binaryName              = "docker-containerd"
	pidFile                 = "docker-containerd.pid"
)

type pluginConfigs struct {
	Plugins map[string]interface{} `toml:"plugins"`
}

type remote struct {
	sync.RWMutex
	server.Config

	daemonPid int
	logger    *logrus.Entry

	daemonWaitCh    chan struct{}
	clients         []*client
	shutdownContext context.Context
	shutdownCancel  context.CancelFunc
	shutdown        bool

	// Options
	startDaemon bool
	rootDir     string
	stateDir    string
	snapshotter string
	pluginConfs pluginConfigs
}

// New creates a fresh instance of libcontainerd remote.
func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err error) {
	defer func() {
		if err != nil {
			err = errors.Wrap(err, "Failed to connect to containerd")
		}
	}()

	r := &remote{
		rootDir:  rootDir,
		stateDir: stateDir,
		Config: server.Config{
			Root:  filepath.Join(rootDir, "daemon"),
			State: filepath.Join(stateDir, "daemon"),
		},
		pluginConfs: pluginConfigs{make(map[string]interface{})},
		daemonPid:   -1,
		logger:      logrus.WithField("module", "libcontainerd"),
	}
	r.shutdownContext, r.shutdownCancel = context.WithCancel(context.Background())

	rem = r
	for _, option := range options {
		if err = option.Apply(r); err != nil {
			return
		}
	}
	r.setDefaults()

	if err = system.MkdirAll(stateDir, 0700, ""); err != nil {
		return
	}

	if r.startDaemon {
		os.Remove(r.GRPC.Address)
		if err = r.startContainerd(); err != nil {
			return
		}
		defer func() {
			if err != nil {
				r.Cleanup()
			}
		}()
	}

	// This connection is just used to monitor the connection
	client, err := containerd.New(r.GRPC.Address)
	if err != nil {
		return
	}
	if _, err := client.Version(context.Background()); err != nil {
		system.KillProcess(r.daemonPid)
		return nil, errors.Wrapf(err, "unable to get containerd version")
	}

	go r.monitorConnection(client)

	return r, nil
}

func (r *remote) NewClient(ns string, b Backend) (Client, error) {
	c := &client{
		stateDir:   r.stateDir,
		logger:     r.logger.WithField("namespace", ns),
		namespace:  ns,
		backend:    b,
		containers: make(map[string]*container),
	}

	rclient, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(ns))
	if err != nil {
		return nil, err
	}
	c.remote = rclient

	go c.processEventStream(r.shutdownContext)

	r.Lock()
	r.clients = append(r.clients, c)
	r.Unlock()
	return c, nil
}

func (r *remote) Cleanup() {
	if r.daemonPid != -1 {
		r.shutdownCancel()
		r.stopDaemon()
	}

	// cleanup some files
	os.Remove(filepath.Join(r.stateDir, pidFile))

	r.platformCleanup()
}

func (r *remote) getContainerdPid() (int, error) {
	pidFile := filepath.Join(r.stateDir, pidFile)
	f, err := os.OpenFile(pidFile, os.O_RDWR, 0600)
	if err != nil {
		if os.IsNotExist(err) {
			return -1, nil
		}
		return -1, err
	}
	defer f.Close()

	b := make([]byte, 8)
	n, err := f.Read(b)
	if err != nil && err != io.EOF {
		return -1, err
	}

	if n > 0 {
		pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
		if err != nil {
			return -1, err
		}
		if system.IsProcessAlive(int(pid)) {
			return int(pid), nil
		}
	}

	return -1, nil
}
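// getContainerdConfig serializes the embedded server.Config plus any plugin
// overrides to the containerd.toml file in the state directory and returns
// the path of the generated file.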
func (r *remote) getContainerdConfig() (string, error) {
	path := filepath.Join(r.stateDir, configFile)
	f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
	if err != nil {
		return "", errors.Wrapf(err, "failed to open containerd config file at %s", path)
	}
	defer f.Close()

	enc := toml.NewEncoder(f)
	if err = enc.Encode(r.Config); err != nil {
		return "", errors.Wrapf(err, "failed to encode general config")
	}
	if err = enc.Encode(r.pluginConfs); err != nil {
		return "", errors.Wrapf(err, "failed to encode plugin configs")
	}

	return path, nil
}

func (r *remote) startContainerd() error {
	pid, err := r.getContainerdPid()
	if err != nil {
		return err
	}

	if pid != -1 {
		r.daemonPid = pid
		logrus.WithField("pid", pid).
			Infof("libcontainerd: %s is still running", binaryName)
		return nil
	}

	configFile, err := r.getContainerdConfig()
	if err != nil {
		return err
	}
	args := []string{"--config", configFile}

	if r.Debug.Level != "" {
		args = append(args, "--log-level", r.Debug.Level)
	}

	cmd := exec.Command(binaryName, args...)
	// redirect containerd logs to docker logs
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	cmd.SysProcAttr = containerdSysProcAttr()
	// clear the NOTIFY_SOCKET from the env when starting containerd
	cmd.Env = nil
	for _, e := range os.Environ() {
		if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
			cmd.Env = append(cmd.Env, e)
		}
	}
	if err := cmd.Start(); err != nil {
		return err
	}

	r.daemonWaitCh = make(chan struct{})
	go func() {
		// Reap our child when needed
		if err := cmd.Wait(); err != nil {
			r.logger.WithError(err).Errorf("containerd did not exit successfully")
		}
		close(r.daemonWaitCh)
	}()

	r.daemonPid = cmd.Process.Pid

	err = ioutil.WriteFile(filepath.Join(r.stateDir, pidFile), []byte(fmt.Sprintf("%d", r.daemonPid)), 0660)
	if err != nil {
		system.KillProcess(r.daemonPid)
		return errors.Wrap(err, "libcontainerd: failed to save daemon pid to disk")
	}

	logrus.WithField("pid", r.daemonPid).
		Infof("libcontainerd: started new %s process", binaryName)

	return nil
}
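// monitorConnection polls containerd's health endpoint every 500ms. After
// maxConnectionRetryCount consecutive failures it reaps (or kills) the old
// containerd process, starts a new one, reconnects the monitoring client,
// and asks every registered namespaced client to reconnect.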
Infof("libcontainerd: started new %s process", binaryName) return nil } func (r *remote) monitorConnection(monitor *containerd.Client) { var transientFailureCount = 0 for { select { case <-r.shutdownContext.Done(): r.logger.Info("stopping healthcheck following graceful shutdown") monitor.Close() return case <-time.After(500 * time.Millisecond): } ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout) _, err := monitor.IsServing(ctx) cancel() if err == nil { transientFailureCount = 0 continue } select { case <-r.shutdownContext.Done(): r.logger.Info("stopping healthcheck following graceful shutdown") monitor.Close() return default: } r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding") if r.daemonPid == -1 { continue } transientFailureCount++ if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) { continue } transientFailureCount = 0 if system.IsProcessAlive(r.daemonPid) { r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd") // Try to get a stack trace syscall.Kill(r.daemonPid, syscall.SIGUSR1) <-time.After(100 * time.Millisecond) system.KillProcess(r.daemonPid) } if r.daemonWaitCh != nil { <-r.daemonWaitCh } os.Remove(r.GRPC.Address) if err := r.startContainerd(); err != nil { r.logger.WithError(err).Error("failed restarting containerd") continue } if err := monitor.Reconnect(); err != nil { r.logger.WithError(err).Error("failed connect to containerd") continue } var wg sync.WaitGroup for _, c := range r.clients { wg.Add(1) go func(c *client) { defer wg.Done() c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client") if err := c.reconnect(); err != nil { r.logger.WithError(err).Error("failed to connect to containerd") // TODO: Better way to handle this? // This *shouldn't* happen, but this could wind up where the daemon // is not able to communicate with an eventually up containerd } }(c) wg.Wait() } } }