libcontainerd: split client and supervisor

Adds a supervisor package for starting and monitoring containerd.
Separates grpc connection allowing access from daemon.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2018-05-23 12:15:21 -07:00
parent 724c5f317e
commit dd2e19ebd5
No known key found for this signature in database
GPG key ID: F58C5D0A4405ACDB
19 changed files with 339 additions and 475 deletions

View file

@ -36,7 +36,7 @@ import (
"github.com/docker/docker/daemon/config"
"github.com/docker/docker/daemon/listeners"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/libcontainerd/supervisor"
dopts "github.com/docker/docker/opts"
"github.com/docker/docker/pkg/authorization"
"github.com/docker/docker/pkg/jsonmessage"
@ -45,7 +45,6 @@ import (
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/plugin"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/go-connections/tlsconfig"
swarmapi "github.com/docker/swarmkit/api"
@ -112,6 +111,10 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
return err
}
if err := system.MkdirAll(cli.Config.ExecRoot, 0700, ""); err != nil {
return err
}
if cli.Pidfile != "" {
pf, err := pidfile.New(cli.Pidfile)
if err != nil {
@ -135,19 +138,27 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
return fmt.Errorf("Failed to load listeners: %v", err)
}
registryService, err := registry.NewService(cli.Config.ServiceOptions)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
if cli.Config.ContainerdAddr == "" && runtime.GOOS != "windows" {
opts, err := cli.getContainerdDaemonOpts()
if err != nil {
cancel()
return fmt.Errorf("Failed to generate containerd options: %v", err)
}
rOpts, err := cli.getRemoteOptions()
if err != nil {
return fmt.Errorf("Failed to generate containerd options: %v", err)
}
containerdRemote, err := libcontainerd.New(filepath.Join(cli.Config.Root, "containerd"), filepath.Join(cli.Config.ExecRoot, "containerd"), rOpts...)
if err != nil {
return err
r, err := supervisor.Start(ctx, filepath.Join(cli.Config.Root, "containerd"), filepath.Join(cli.Config.ExecRoot, "containerd"), opts...)
if err != nil {
cancel()
return fmt.Errorf("Failed to start containerd: %v", err)
}
cli.Config.ContainerdAddr = r.Address()
// Try to wait for containerd to shutdown
defer r.WaitTimeout(10 * time.Second)
}
defer cancel()
signal.Trap(func() {
cli.stop()
<-stopc // wait for daemonCli.start() to return
@ -162,7 +173,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
logrus.Fatalf("Error creating middlewares: %v", err)
}
d, err := daemon.NewDaemon(cli.Config, registryService, containerdRemote, pluginStore)
d, err := daemon.NewDaemon(ctx, cli.Config, pluginStore)
if err != nil {
return fmt.Errorf("Error starting daemon: %v", err)
}
@ -207,10 +218,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
initRouter(routerOptions)
// process cluster change notifications
watchCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go d.ProcessClusterNotifications(watchCtx, c.GetWatchStream())
go d.ProcessClusterNotifications(ctx, c.GetWatchStream())
cli.setupConfigReloadTrap()
@ -227,8 +235,12 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
// Wait for serve API to complete
errAPI := <-serveAPIWait
c.Cleanup()
shutdownDaemon(d)
containerdRemote.Cleanup()
// Stop notification processing and any background processes
cancel()
if errAPI != nil {
return fmt.Errorf("Shutting down due to ServeAPI error: %v", errAPI)
}
@ -511,14 +523,22 @@ func (cli *DaemonCli) initMiddlewares(s *apiserver.Server, cfg *apiserver.Config
return nil
}
func (cli *DaemonCli) getRemoteOptions() ([]libcontainerd.RemoteOption, error) {
opts := []libcontainerd.RemoteOption{}
pOpts, err := cli.getPlatformRemoteOptions()
func (cli *DaemonCli) getContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) {
opts, err := cli.getPlatformContainerdDaemonOpts()
if err != nil {
return nil, err
}
opts = append(opts, pOpts...)
if cli.Config.Debug {
opts = append(opts, supervisor.WithLogLevel("debug"))
} else if cli.Config.LogLevel != "" {
opts = append(opts, supervisor.WithLogLevel(cli.Config.LogLevel))
}
if !cli.Config.CriContainerd {
opts = append(opts, supervisor.WithPlugin("cri", nil))
}
return opts, nil
}

View file

@ -13,7 +13,7 @@ import (
"github.com/containerd/containerd/runtime/linux"
"github.com/docker/docker/cmd/dockerd/hack"
"github.com/docker/docker/daemon"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/libcontainerd/supervisor"
"github.com/docker/libnetwork/portallocator"
"golang.org/x/sys/unix"
)
@ -36,29 +36,16 @@ func getDaemonConfDir(_ string) string {
return "/etc/docker"
}
func (cli *DaemonCli) getPlatformRemoteOptions() ([]libcontainerd.RemoteOption, error) {
opts := []libcontainerd.RemoteOption{
libcontainerd.WithOOMScore(cli.Config.OOMScoreAdjust),
libcontainerd.WithPlugin("linux", &linux.Config{
func (cli *DaemonCli) getPlatformContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) {
opts := []supervisor.DaemonOpt{
supervisor.WithOOMScore(cli.Config.OOMScoreAdjust),
supervisor.WithPlugin("linux", &linux.Config{
Shim: daemon.DefaultShimBinary,
Runtime: daemon.DefaultRuntimeBinary,
RuntimeRoot: filepath.Join(cli.Config.Root, "runc"),
ShimDebug: cli.Config.Debug,
}),
}
if cli.Config.Debug {
opts = append(opts, libcontainerd.WithLogLevel("debug"))
} else if cli.Config.LogLevel != "" {
opts = append(opts, libcontainerd.WithLogLevel(cli.Config.LogLevel))
}
if cli.Config.ContainerdAddr != "" {
opts = append(opts, libcontainerd.WithRemoteAddr(cli.Config.ContainerdAddr))
} else {
opts = append(opts, libcontainerd.WithStartDaemon(true))
}
if !cli.Config.CriContainerd {
opts = append(opts, libcontainerd.WithPlugin("cri", nil))
}
return opts, nil
}

View file

@ -6,7 +6,7 @@ import (
"os"
"path/filepath"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/libcontainerd/supervisor"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
)
@ -48,7 +48,7 @@ func notifyShutdown(err error) {
}
}
func (cli *DaemonCli) getPlatformRemoteOptions() ([]libcontainerd.RemoteOption, error) {
func (cli *DaemonCli) getPlatformContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) {
return nil, nil
}

View file

@ -18,6 +18,11 @@ import (
"sync"
"time"
"google.golang.org/grpc"
"github.com/containerd/containerd"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/pkg/dialer"
"github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/swarm"
@ -94,6 +99,7 @@ type Daemon struct {
PluginStore *plugin.Store // todo: remove
pluginManager *plugin.Manager
linkIndex *linkIndex
containerdCli *containerd.Client
containerd libcontainerd.Client
defaultIsolation containertypes.Isolation // Default isolation mode on Windows
clusterProvider cluster.Provider
@ -565,9 +571,14 @@ func (daemon *Daemon) IsSwarmCompatible() error {
// NewDaemon sets up everything for the daemon to be able to service
// requests from the webserver.
func NewDaemon(config *config.Config, registryService registry.Service, containerdRemote libcontainerd.Remote, pluginStore *plugin.Store) (daemon *Daemon, err error) {
func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.Store) (daemon *Daemon, err error) {
setDefaultMtu(config)
registryService, err := registry.NewService(config.ServiceOptions)
if err != nil {
return nil, err
}
// Ensure that we have a correct root key limit for launching containers.
if err := ModifyRootKeyLimit(); err != nil {
logrus.Warnf("unable to modify root key limit, number of containers could be limited by this quota: %v", err)
@ -720,8 +731,35 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
}
registerMetricsPluginCallback(d.PluginStore, metricsSockPath)
gopts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBackoffMaxDelay(3 * time.Second),
grpc.WithDialer(dialer.Dialer),
// TODO(stevvooe): We may need to allow configuration of this on the client.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
}
if config.ContainerdAddr != "" {
d.containerdCli, err = containerd.New(config.ContainerdAddr, containerd.WithDefaultNamespace(ContainersNamespace), containerd.WithDialOpts(gopts))
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", config.ContainerdAddr)
}
}
createPluginExec := func(m *plugin.Manager) (plugin.Executor, error) {
return pluginexec.New(getPluginExecRoot(config.Root), containerdRemote, m)
var pluginCli *containerd.Client
// Windows is not currently using containerd, keep the
// client as nil
if config.ContainerdAddr != "" {
pluginCli, err = containerd.New(config.ContainerdAddr, containerd.WithDefaultNamespace(pluginexec.PluginNamespace), containerd.WithDialOpts(gopts))
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", config.ContainerdAddr)
}
}
return pluginexec.New(ctx, getPluginExecRoot(config.Root), pluginCli, m)
}
// Plugin system initialization should happen before restore. Do not change order.
@ -880,7 +918,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
go d.execCommandGC()
d.containerd, err = containerdRemote.NewClient(ContainersNamespace, d)
d.containerd, err = libcontainerd.NewClient(ctx, d.containerdCli, filepath.Join(config.ExecRoot, "containerd"), ContainersNamespace, d)
if err != nil {
return nil, err
}
@ -1037,6 +1075,10 @@ func (daemon *Daemon) Shutdown() error {
daemon.netController.Stop()
}
if daemon.containerdCli != nil {
daemon.containerdCli.Close()
}
return daemon.cleanupMounts()
}

View file

@ -102,38 +102,34 @@ func (c *container) getOOMKilled() bool {
type client struct {
sync.RWMutex // protects containers map
remote *containerd.Client
client *containerd.Client
stateDir string
logger *logrus.Entry
ns string
namespace string
backend Backend
eventQ queue
containers map[string]*container
}
func (c *client) reconnect() error {
c.Lock()
err := c.remote.Reconnect()
c.Unlock()
return err
}
// NewClient creates a new libcontainerd client from a containerd client
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b Backend) (Client, error) {
c := &client{
client: cli,
stateDir: stateDir,
logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
ns: ns,
backend: b,
containers: make(map[string]*container),
}
func (c *client) setRemote(remote *containerd.Client) {
c.Lock()
c.remote = remote
c.Unlock()
}
go c.processEventStream(ctx, ns)
func (c *client) getRemote() *containerd.Client {
c.RLock()
remote := c.remote
c.RUnlock()
return remote
return c, nil
}
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
return c.getRemote().Version(ctx)
return c.client.Version(ctx)
}
// Restore loads the containerd container.
@ -170,7 +166,7 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba
err = wrapError(err)
}()
ctr, err := c.getRemote().LoadContainer(ctx, id)
ctr, err := c.client.LoadContainer(ctx, id)
if err != nil {
return false, -1, errors.WithStack(wrapError(err))
}
@ -225,7 +221,7 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run
c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
cdCtr, err := c.getRemote().NewContainer(ctx, id,
cdCtr, err := c.client.NewContainer(ctx, id,
containerd.WithSpec(ociSpec),
// TODO(mlaventure): when containerd support lcow, revisit runtime value
containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
@ -268,7 +264,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
// remove the checkpoint when we're done
defer func() {
if cp != nil {
err := c.getRemote().ContentStore().Delete(context.Background(), cp.Digest)
err := c.client.ContentStore().Delete(context.Background(), cp.Digest)
if err != nil {
c.logger.WithError(err).WithFields(logrus.Fields{
"ref": checkpointDir,
@ -571,14 +567,14 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
}
// Whatever happens, delete the checkpoint from containerd
defer func() {
err := c.getRemote().ImageService().Delete(context.Background(), img.Name())
err := c.client.ImageService().Delete(context.Background(), img.Name())
if err != nil {
c.logger.WithError(err).WithField("digest", img.Target().Digest).
Warnf("failed to delete checkpoint image")
}
}()
b, err := content.ReadBlob(ctx, c.getRemote().ContentStore(), img.Target())
b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target())
if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
}
@ -598,7 +594,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
}
rat, err := c.getRemote().ContentStore().ReaderAt(ctx, *cpDesc)
rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc)
if err != nil {
return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
}
@ -735,7 +731,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
})
}
func (c *client) processEventStream(ctx context.Context) {
func (c *client) processEventStream(ctx context.Context, ns string) {
var (
err error
ev *events.Envelope
@ -746,9 +742,9 @@ func (c *client) processEventStream(ctx context.Context) {
// Filter on both namespace *and* topic. To create an "and" filter,
// this must be a single, comma-separated string
eventStream, errC := c.getRemote().EventService().Subscribe(ctx, "namespace=="+c.namespace+",topic~=|^/tasks/|")
eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|")
c.logger.WithField("namespace", c.namespace).Debug("processing event stream")
c.logger.Debug("processing event stream")
var oomKilled bool
for {
@ -758,7 +754,7 @@ func (c *client) processEventStream(ctx context.Context) {
errStatus, ok := status.FromError(err)
if !ok || errStatus.Code() != codes.Canceled {
c.logger.WithError(err).Error("failed to get event")
go c.processEventStream(ctx)
go c.processEventStream(ctx, ns)
} else {
c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
}
@ -858,7 +854,7 @@ func (c *client) processEventStream(ctx context.Context) {
}
func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
writer, err := c.getRemote().ContentStore().Writer(ctx, content.WithRef(ref))
writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}

View file

@ -71,6 +71,28 @@ const (
// of docker.
const defaultOwner = "docker"
type client struct {
sync.Mutex
stateDir string
backend Backend
logger *logrus.Entry
eventQ queue
containers map[string]*container
}
// NewClient creates a new local executor for windows
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b Backend) (Client, error) {
c := &client{
stateDir: stateDir,
backend: b,
logger: logrus.WithField("module", "libcontainerd").WithField("module", "libcontainerd").WithField("namespace", ns),
containers: make(map[string]*container),
}
return c, nil
}
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
return containerd.Version{}, errors.New("not implemented on Windows")
}

View file

@ -1,142 +0,0 @@
// +build !windows
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import "fmt"
// WithRemoteAddr sets the external containerd socket to connect to.
func WithRemoteAddr(addr string) RemoteOption {
return rpcAddr(addr)
}
type rpcAddr string
func (a rpcAddr) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.GRPC.Address = string(a)
return nil
}
return fmt.Errorf("WithRemoteAddr option not supported for this remote")
}
// WithRemoteAddrUser sets the uid and gid to create the RPC address with
func WithRemoteAddrUser(uid, gid int) RemoteOption {
return rpcUser{uid, gid}
}
type rpcUser struct {
uid int
gid int
}
func (u rpcUser) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.GRPC.UID = u.uid
remote.GRPC.GID = u.gid
return nil
}
return fmt.Errorf("WithRemoteAddr option not supported for this remote")
}
// WithStartDaemon defines if libcontainerd should also run containerd daemon.
func WithStartDaemon(start bool) RemoteOption {
return startDaemon(start)
}
type startDaemon bool
func (s startDaemon) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.startDaemon = bool(s)
return nil
}
return fmt.Errorf("WithStartDaemon option not supported for this remote")
}
// WithLogLevel defines which log level to starts containerd with.
// This only makes sense if WithStartDaemon() was set to true.
func WithLogLevel(lvl string) RemoteOption {
return logLevel(lvl)
}
type logLevel string
func (l logLevel) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.Debug.Level = string(l)
return nil
}
return fmt.Errorf("WithDebugLog option not supported for this remote")
}
// WithDebugAddress defines at which location the debug GRPC connection
// should be made
func WithDebugAddress(addr string) RemoteOption {
return debugAddress(addr)
}
type debugAddress string
func (d debugAddress) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.Debug.Address = string(d)
return nil
}
return fmt.Errorf("WithDebugAddress option not supported for this remote")
}
// WithMetricsAddress defines at which location the debug GRPC connection
// should be made
func WithMetricsAddress(addr string) RemoteOption {
return metricsAddress(addr)
}
type metricsAddress string
func (m metricsAddress) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.Metrics.Address = string(m)
return nil
}
return fmt.Errorf("WithMetricsAddress option not supported for this remote")
}
// WithSnapshotter defines snapshotter driver should be used
func WithSnapshotter(name string) RemoteOption {
return snapshotter(name)
}
type snapshotter string
func (s snapshotter) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.snapshotter = string(s)
return nil
}
return fmt.Errorf("WithSnapshotter option not supported for this remote")
}
// WithPlugin allow configuring a containerd plugin
// configuration values passed needs to be quoted if quotes are needed in
// the toml format.
// Setting the config to nil will disable a built-in plugin
func WithPlugin(name string, conf interface{}) RemoteOption {
return pluginConf{
name: name,
conf: conf,
}
}
type pluginConf struct {
// Name is the name of the plugin
name string
conf interface{}
}
func (p pluginConf) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.pluginConfs.Plugins[p.name] = p.conf
return nil
}
return fmt.Errorf("WithPlugin option not supported for this remote")
}

View file

@ -1,18 +0,0 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import "fmt"
// WithOOMScore defines the oom_score_adj to set for the containerd process.
func WithOOMScore(score int) RemoteOption {
return oomScore(score)
}
type oomScore int
func (o oomScore) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.OOMScore = int(o)
return nil
}
return fmt.Errorf("WithOOMScore option not supported for this remote")
}

View file

@ -1,59 +0,0 @@
// +build windows
package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"sync"
"github.com/sirupsen/logrus"
)
type remote struct {
sync.RWMutex
logger *logrus.Entry
clients []*client
// Options
rootDir string
stateDir string
}
// New creates a fresh instance of libcontainerd remote.
func New(rootDir, stateDir string, options ...RemoteOption) (Remote, error) {
return &remote{
logger: logrus.WithField("module", "libcontainerd"),
rootDir: rootDir,
stateDir: stateDir,
}, nil
}
type client struct {
sync.Mutex
rootDir string
stateDir string
backend Backend
logger *logrus.Entry
eventQ queue
containers map[string]*container
}
func (r *remote) NewClient(ns string, b Backend) (Client, error) {
c := &client{
rootDir: r.rootDir,
stateDir: r.stateDir,
backend: b,
logger: r.logger.WithField("namespace", ns),
containers: make(map[string]*container),
}
r.Lock()
r.clients = append(r.clients, c)
r.Unlock()
return c, nil
}
func (r *remote) Cleanup() {
// Nothing to do
}

View file

@ -1,6 +1,4 @@
// +build !windows
package libcontainerd // import "github.com/docker/docker/libcontainerd"
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
import (
"context"
@ -13,7 +11,6 @@ import (
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/BurntSushi/toml"
@ -28,6 +25,7 @@ const (
maxConnectionRetryCount = 3
healthCheckTimeout = 3 * time.Second
shutdownTimeout = 15 * time.Second
startupTimeout = 15 * time.Second
configFile = "containerd.toml"
binaryName = "docker-containerd"
pidFile = "docker-containerd.pid"
@ -44,28 +42,26 @@ type remote struct {
daemonPid int
logger *logrus.Entry
daemonWaitCh chan struct{}
clients []*client
shutdownContext context.Context
shutdownCancel context.CancelFunc
shutdown bool
daemonWaitCh chan struct{}
daemonStartCh chan struct{}
daemonStopCh chan struct{}
// Options
startDaemon bool
rootDir string
stateDir string
snapshotter string
pluginConfs pluginConfigs
}
// New creates a fresh instance of libcontainerd remote.
func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err error) {
defer func() {
if err != nil {
err = errors.Wrap(err, "Failed to connect to containerd")
}
}()
// Daemon represents a running containerd daemon
type Daemon interface {
WaitTimeout(time.Duration) error
Address() string
}
// DaemonOpt allows to configure parameters of container daemons
type DaemonOpt func(c *remote) error
// Start starts a containerd daemon and monitors it
func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Daemon, error) {
r := &remote{
rootDir: rootDir,
stateDir: stateDir,
@ -73,86 +69,47 @@ func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err err
Root: filepath.Join(rootDir, "daemon"),
State: filepath.Join(stateDir, "daemon"),
},
pluginConfs: pluginConfigs{make(map[string]interface{})},
daemonPid: -1,
logger: logrus.WithField("module", "libcontainerd"),
pluginConfs: pluginConfigs{make(map[string]interface{})},
daemonPid: -1,
logger: logrus.WithField("module", "libcontainerd"),
daemonStartCh: make(chan struct{}),
daemonStopCh: make(chan struct{}),
}
r.shutdownContext, r.shutdownCancel = context.WithCancel(context.Background())
rem = r
for _, option := range options {
if err = option.Apply(r); err != nil {
return
for _, opt := range opts {
if err := opt(r); err != nil {
return nil, err
}
}
r.setDefaults()
if err = system.MkdirAll(stateDir, 0700, ""); err != nil {
return
if err := system.MkdirAll(stateDir, 0700, ""); err != nil {
return nil, err
}
if r.startDaemon {
os.Remove(r.GRPC.Address)
if err = r.startContainerd(); err != nil {
return
}
defer func() {
if err != nil {
r.Cleanup()
}
}()
}
go r.monitorDaemon(ctx)
// This connection is just used to monitor the connection
client, err := containerd.New(r.GRPC.Address)
if err != nil {
return
select {
case <-time.After(startupTimeout):
return nil, errors.New("timeout waiting for containerd to start")
case <-r.daemonStartCh:
}
if _, err := client.Version(context.Background()); err != nil {
system.KillProcess(r.daemonPid)
return nil, errors.Wrapf(err, "unable to get containerd version")
}
go r.monitorConnection(client)
return r, nil
}
func (r *remote) NewClient(ns string, b Backend) (Client, error) {
c := &client{
stateDir: r.stateDir,
logger: r.logger.WithField("namespace", ns),
namespace: ns,
backend: b,
containers: make(map[string]*container),
func (r *remote) WaitTimeout(d time.Duration) error {
select {
case <-time.After(d):
return errors.New("timeout waiting for containerd to stop")
case <-r.daemonStopCh:
}
rclient, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(ns))
if err != nil {
return nil, err
}
c.remote = rclient
go c.processEventStream(r.shutdownContext)
r.Lock()
r.clients = append(r.clients, c)
r.Unlock()
return c, nil
return nil
}
func (r *remote) Cleanup() {
if r.daemonPid != -1 {
r.shutdownCancel()
r.stopDaemon()
}
// cleanup some files
os.Remove(filepath.Join(r.stateDir, pidFile))
r.platformCleanup()
func (r *remote) Address() string {
return r.GRPC.Address
}
func (r *remote) getContainerdPid() (int, error) {
pidFile := filepath.Join(r.stateDir, pidFile)
f, err := os.OpenFile(pidFile, os.O_RDWR, 0600)
@ -265,85 +222,90 @@ func (r *remote) startContainerd() error {
return nil
}
func (r *remote) monitorConnection(monitor *containerd.Client) {
var transientFailureCount = 0
func (r *remote) monitorDaemon(ctx context.Context) {
var (
transientFailureCount = 0
client *containerd.Client
err error
delay <-chan time.Time
started bool
)
defer func() {
if r.daemonPid != -1 {
r.stopDaemon()
}
// cleanup some files
os.Remove(filepath.Join(r.stateDir, pidFile))
r.platformCleanup()
close(r.daemonStopCh)
}()
for {
select {
case <-r.shutdownContext.Done():
case <-ctx.Done():
r.logger.Info("stopping healthcheck following graceful shutdown")
monitor.Close()
if client != nil {
client.Close()
}
return
case <-time.After(500 * time.Millisecond):
case <-delay:
default:
}
ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout)
_, err := monitor.IsServing(ctx)
if r.daemonPid == -1 {
if r.daemonWaitCh != nil {
<-r.daemonWaitCh
}
os.RemoveAll(r.GRPC.Address)
if err := r.startContainerd(); err != nil {
r.logger.WithError(err).Error("failed starting containerd")
delay = time.After(50 * time.Millisecond)
continue
}
client, err = containerd.New(r.GRPC.Address)
if err != nil {
r.logger.WithError(err).Error("failed connecting to containerd")
delay = time.After(100 * time.Millisecond)
continue
}
}
tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
_, err := client.IsServing(tctx)
cancel()
if err == nil {
transientFailureCount = 0
continue
}
if !started {
close(r.daemonStartCh)
started = true
}
select {
case <-r.shutdownContext.Done():
r.logger.Info("stopping healthcheck following graceful shutdown")
monitor.Close()
return
default:
transientFailureCount = 0
delay = time.After(500 * time.Millisecond)
continue
}
r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")
if r.daemonPid == -1 {
continue
}
transientFailureCount++
if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
delay = time.After(time.Duration(transientFailureCount) * 200 * time.Millisecond)
continue
}
transientFailureCount = 0
if system.IsProcessAlive(r.daemonPid) {
r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
// Try to get a stack trace
syscall.Kill(r.daemonPid, syscall.SIGUSR1)
<-time.After(100 * time.Millisecond)
system.KillProcess(r.daemonPid)
}
if r.daemonWaitCh != nil {
<-r.daemonWaitCh
r.killDaemon()
}
os.Remove(r.GRPC.Address)
if err := r.startContainerd(); err != nil {
r.logger.WithError(err).Error("failed restarting containerd")
continue
}
if err := monitor.Reconnect(); err != nil {
r.logger.WithError(err).Error("failed connect to containerd")
continue
}
var wg sync.WaitGroup
for _, c := range r.clients {
wg.Add(1)
go func(c *client) {
defer wg.Done()
c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
if err := c.reconnect(); err != nil {
r.logger.WithError(err).Error("failed to connect to containerd")
// TODO: Better way to handle this?
// This *shouldn't* happen, but this could wind up where the daemon
// is not able to communicate with an eventually up containerd
}
}(c)
wg.Wait()
}
client.Close()
r.daemonPid = -1
delay = nil
transientFailureCount = 0
}
}

View file

@ -1,4 +1,4 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
import (
"os"
@ -38,10 +38,6 @@ func (r *remote) setDefaults() {
delete(r.pluginConfs.Plugins, key)
}
}
if r.snapshotter == "" {
r.snapshotter = "overlay"
}
}
func (r *remote) stopDaemon() {
@ -61,6 +57,13 @@ func (r *remote) stopDaemon() {
}
}
func (r *remote) killDaemon() {
// Try to get a stack trace
syscall.Kill(r.daemonPid, syscall.SIGUSR1)
<-time.After(100 * time.Millisecond)
system.KillProcess(r.daemonPid)
}
func (r *remote) platformCleanup() {
os.Remove(filepath.Join(r.stateDir, sockFile))
}

View file

@ -0,0 +1,55 @@
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
// WithRemoteAddr sets the external containerd socket to connect to.
func WithRemoteAddr(addr string) DaemonOpt {
return func(r *remote) error {
r.GRPC.Address = addr
return nil
}
}
// WithRemoteAddrUser sets the uid and gid to create the RPC address with
func WithRemoteAddrUser(uid, gid int) DaemonOpt {
return func(r *remote) error {
r.GRPC.UID = uid
r.GRPC.GID = gid
return nil
}
}
// WithLogLevel defines which log level to starts containerd with.
// This only makes sense if WithStartDaemon() was set to true.
func WithLogLevel(lvl string) DaemonOpt {
return func(r *remote) error {
r.Debug.Level = lvl
return nil
}
}
// WithDebugAddress defines at which location the debug GRPC connection
// should be made
func WithDebugAddress(addr string) DaemonOpt {
return func(r *remote) error {
r.Debug.Address = addr
return nil
}
}
// WithMetricsAddress defines at which location the debug GRPC connection
// should be made
func WithMetricsAddress(addr string) DaemonOpt {
return func(r *remote) error {
r.Metrics.Address = addr
return nil
}
}
// WithPlugin allow configuring a containerd plugin
// configuration values passed needs to be quoted if quotes are needed in
// the toml format.
func WithPlugin(name string, conf interface{}) DaemonOpt {
return func(r *remote) error {
r.pluginConfs.Plugins[name] = conf
return nil
}
}

View file

@ -0,0 +1,9 @@
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
// WithOOMScore defines the oom_score_adj to set for the containerd process.
func WithOOMScore(score int) DaemonOpt {
return func(r *remote) error {
r.OOMScore = score
return nil
}
}

View file

@ -1,9 +1,9 @@
// +build remote_daemon
package libcontainerd // import "github.com/docker/docker/libcontainerd"
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
import (
"os"
"github.com/docker/docker/pkg/system"
)
const (
@ -18,9 +18,6 @@ func (r *remote) setDefaults() {
if r.Debug.Address == "" {
r.Debug.Address = debugPipeName
}
if r.snapshotter == "" {
r.snapshotter = "naive" // TODO(mlaventure): switch to "windows" once implemented
}
}
func (r *remote) stopDaemon() {
@ -42,6 +39,10 @@ func (r *remote) stopDaemon() {
}
}
func (r *remote) killDaemon() {
system.KillProcess(r.daemonPid)
}
func (r *remote) platformCleanup() {
// Nothing to do
}

View file

@ -1,4 +1,4 @@
package libcontainerd // import "github.com/docker/docker/libcontainerd"
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
import "syscall"

View file

@ -0,0 +1,9 @@
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
import "syscall"
// containerdSysProcAttr returns the SysProcAttr to use when exec'ing
// containerd
func containerdSysProcAttr() *syscall.SysProcAttr {
return nil
}

View file

@ -46,23 +46,6 @@ const (
StatusUnknown Status = "unknown"
)
// Remote on Linux defines the accesspoint to the containerd grpc API.
// Remote on Windows is largely an unimplemented interface as there is
// no remote containerd.
type Remote interface {
// Client returns a new Client instance connected with given Backend.
NewClient(namespace string, backend Backend) (Client, error)
// Cleanup stops containerd if it was started by libcontainerd.
// Note this is not used on Windows as there is no remote containerd.
Cleanup()
}
// RemoteOption allows to configure parameters of remotes.
// This is unused on Windows.
type RemoteOption interface {
Apply(Remote) error
}
// EventInfo contains the event info
type EventInfo struct {
ContainerID string

View file

@ -3,8 +3,6 @@ package libcontainerd // import "github.com/docker/docker/libcontainerd"
import (
"strings"
"syscall"
opengcs "github.com/Microsoft/opengcs/client"
)
@ -38,9 +36,3 @@ func (c *container) debugGCS() {
}
cfg.DebugGCS()
}
// containerdSysProcAttr returns the SysProcAttr to use when exec'ing
// containerd
func containerdSysProcAttr() *syscall.SysProcAttr {
return nil
}

View file

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/docker/docker/errdefs"
@ -16,8 +17,8 @@ import (
"github.com/sirupsen/logrus"
)
// pluginNamespace is the name used for the plugins namespace
const pluginNamespace = "plugins.moby"
// PluginNamespace is the name used for the plugins namespace
const PluginNamespace = "plugins.moby"
// ExitHandler represents an object that is called when the exit event is received from containerd
type ExitHandler interface {
@ -38,12 +39,13 @@ type Client interface {
}
// New creates a new containerd plugin executor
func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandler ExitHandler) (*Executor, error) {
e := &Executor{
rootDir: rootDir,
exitHandler: exitHandler,
}
client, err := remote.NewClient(pluginNamespace, e)
client, err := libcontainerd.NewClient(ctx, cli, rootDir, PluginNamespace, e)
if err != nil {
return nil, errors.Wrap(err, "error creating containerd exec client")
}