Merge pull request #37149 from dmcgowan/split-libcontainerd
libcontainerd: split client and daemon supervision
This commit is contained in:
commit
4d62192646
19 changed files with 339 additions and 475 deletions
|
@ -36,7 +36,7 @@ import (
|
|||
"github.com/docker/docker/daemon/config"
|
||||
"github.com/docker/docker/daemon/listeners"
|
||||
"github.com/docker/docker/dockerversion"
|
||||
"github.com/docker/docker/libcontainerd"
|
||||
"github.com/docker/docker/libcontainerd/supervisor"
|
||||
dopts "github.com/docker/docker/opts"
|
||||
"github.com/docker/docker/pkg/authorization"
|
||||
"github.com/docker/docker/pkg/jsonmessage"
|
||||
|
@ -45,7 +45,6 @@ import (
|
|||
"github.com/docker/docker/pkg/signal"
|
||||
"github.com/docker/docker/pkg/system"
|
||||
"github.com/docker/docker/plugin"
|
||||
"github.com/docker/docker/registry"
|
||||
"github.com/docker/docker/runconfig"
|
||||
"github.com/docker/go-connections/tlsconfig"
|
||||
swarmapi "github.com/docker/swarmkit/api"
|
||||
|
@ -112,6 +111,10 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := system.MkdirAll(cli.Config.ExecRoot, 0700, ""); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cli.Pidfile != "" {
|
||||
pf, err := pidfile.New(cli.Pidfile)
|
||||
if err != nil {
|
||||
|
@ -135,19 +138,27 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
|
|||
return fmt.Errorf("Failed to load listeners: %v", err)
|
||||
}
|
||||
|
||||
registryService, err := registry.NewService(cli.Config.ServiceOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
if cli.Config.ContainerdAddr == "" && runtime.GOOS != "windows" {
|
||||
opts, err := cli.getContainerdDaemonOpts()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("Failed to generate containerd options: %v", err)
|
||||
}
|
||||
|
||||
rOpts, err := cli.getRemoteOptions()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to generate containerd options: %v", err)
|
||||
}
|
||||
containerdRemote, err := libcontainerd.New(filepath.Join(cli.Config.Root, "containerd"), filepath.Join(cli.Config.ExecRoot, "containerd"), rOpts...)
|
||||
if err != nil {
|
||||
return err
|
||||
r, err := supervisor.Start(ctx, filepath.Join(cli.Config.Root, "containerd"), filepath.Join(cli.Config.ExecRoot, "containerd"), opts...)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("Failed to start containerd: %v", err)
|
||||
}
|
||||
|
||||
cli.Config.ContainerdAddr = r.Address()
|
||||
|
||||
// Try to wait for containerd to shutdown
|
||||
defer r.WaitTimeout(10 * time.Second)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
signal.Trap(func() {
|
||||
cli.stop()
|
||||
<-stopc // wait for daemonCli.start() to return
|
||||
|
@ -162,7 +173,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
|
|||
logrus.Fatalf("Error creating middlewares: %v", err)
|
||||
}
|
||||
|
||||
d, err := daemon.NewDaemon(cli.Config, registryService, containerdRemote, pluginStore)
|
||||
d, err := daemon.NewDaemon(ctx, cli.Config, pluginStore)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error starting daemon: %v", err)
|
||||
}
|
||||
|
@ -207,10 +218,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
|
|||
|
||||
initRouter(routerOptions)
|
||||
|
||||
// process cluster change notifications
|
||||
watchCtx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go d.ProcessClusterNotifications(watchCtx, c.GetWatchStream())
|
||||
go d.ProcessClusterNotifications(ctx, c.GetWatchStream())
|
||||
|
||||
cli.setupConfigReloadTrap()
|
||||
|
||||
|
@ -227,8 +235,12 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
|
|||
// Wait for serve API to complete
|
||||
errAPI := <-serveAPIWait
|
||||
c.Cleanup()
|
||||
|
||||
shutdownDaemon(d)
|
||||
containerdRemote.Cleanup()
|
||||
|
||||
// Stop notification processing and any background processes
|
||||
cancel()
|
||||
|
||||
if errAPI != nil {
|
||||
return fmt.Errorf("Shutting down due to ServeAPI error: %v", errAPI)
|
||||
}
|
||||
|
@ -511,14 +523,22 @@ func (cli *DaemonCli) initMiddlewares(s *apiserver.Server, cfg *apiserver.Config
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cli *DaemonCli) getRemoteOptions() ([]libcontainerd.RemoteOption, error) {
|
||||
opts := []libcontainerd.RemoteOption{}
|
||||
|
||||
pOpts, err := cli.getPlatformRemoteOptions()
|
||||
func (cli *DaemonCli) getContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) {
|
||||
opts, err := cli.getPlatformContainerdDaemonOpts()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opts = append(opts, pOpts...)
|
||||
|
||||
if cli.Config.Debug {
|
||||
opts = append(opts, supervisor.WithLogLevel("debug"))
|
||||
} else if cli.Config.LogLevel != "" {
|
||||
opts = append(opts, supervisor.WithLogLevel(cli.Config.LogLevel))
|
||||
}
|
||||
|
||||
if !cli.Config.CriContainerd {
|
||||
opts = append(opts, supervisor.WithPlugin("cri", nil))
|
||||
}
|
||||
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
"github.com/containerd/containerd/runtime/linux"
|
||||
"github.com/docker/docker/cmd/dockerd/hack"
|
||||
"github.com/docker/docker/daemon"
|
||||
"github.com/docker/docker/libcontainerd"
|
||||
"github.com/docker/docker/libcontainerd/supervisor"
|
||||
"github.com/docker/libnetwork/portallocator"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
@ -36,29 +36,16 @@ func getDaemonConfDir(_ string) string {
|
|||
return "/etc/docker"
|
||||
}
|
||||
|
||||
func (cli *DaemonCli) getPlatformRemoteOptions() ([]libcontainerd.RemoteOption, error) {
|
||||
opts := []libcontainerd.RemoteOption{
|
||||
libcontainerd.WithOOMScore(cli.Config.OOMScoreAdjust),
|
||||
libcontainerd.WithPlugin("linux", &linux.Config{
|
||||
func (cli *DaemonCli) getPlatformContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) {
|
||||
opts := []supervisor.DaemonOpt{
|
||||
supervisor.WithOOMScore(cli.Config.OOMScoreAdjust),
|
||||
supervisor.WithPlugin("linux", &linux.Config{
|
||||
Shim: daemon.DefaultShimBinary,
|
||||
Runtime: daemon.DefaultRuntimeBinary,
|
||||
RuntimeRoot: filepath.Join(cli.Config.Root, "runc"),
|
||||
ShimDebug: cli.Config.Debug,
|
||||
}),
|
||||
}
|
||||
if cli.Config.Debug {
|
||||
opts = append(opts, libcontainerd.WithLogLevel("debug"))
|
||||
} else if cli.Config.LogLevel != "" {
|
||||
opts = append(opts, libcontainerd.WithLogLevel(cli.Config.LogLevel))
|
||||
}
|
||||
if cli.Config.ContainerdAddr != "" {
|
||||
opts = append(opts, libcontainerd.WithRemoteAddr(cli.Config.ContainerdAddr))
|
||||
} else {
|
||||
opts = append(opts, libcontainerd.WithStartDaemon(true))
|
||||
}
|
||||
if !cli.Config.CriContainerd {
|
||||
opts = append(opts, libcontainerd.WithPlugin("cri", nil))
|
||||
}
|
||||
|
||||
return opts, nil
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/docker/docker/libcontainerd"
|
||||
"github.com/docker/docker/libcontainerd/supervisor"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
@ -48,7 +48,7 @@ func notifyShutdown(err error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (cli *DaemonCli) getPlatformRemoteOptions() ([]libcontainerd.RemoteOption, error) {
|
||||
func (cli *DaemonCli) getPlatformContainerdDaemonOpts() ([]supervisor.DaemonOpt, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,11 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/defaults"
|
||||
"github.com/containerd/containerd/pkg/dialer"
|
||||
"github.com/docker/docker/api/types"
|
||||
containertypes "github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
|
@ -94,6 +99,7 @@ type Daemon struct {
|
|||
PluginStore *plugin.Store // todo: remove
|
||||
pluginManager *plugin.Manager
|
||||
linkIndex *linkIndex
|
||||
containerdCli *containerd.Client
|
||||
containerd libcontainerd.Client
|
||||
defaultIsolation containertypes.Isolation // Default isolation mode on Windows
|
||||
clusterProvider cluster.Provider
|
||||
|
@ -565,9 +571,14 @@ func (daemon *Daemon) IsSwarmCompatible() error {
|
|||
|
||||
// NewDaemon sets up everything for the daemon to be able to service
|
||||
// requests from the webserver.
|
||||
func NewDaemon(config *config.Config, registryService registry.Service, containerdRemote libcontainerd.Remote, pluginStore *plugin.Store) (daemon *Daemon, err error) {
|
||||
func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.Store) (daemon *Daemon, err error) {
|
||||
setDefaultMtu(config)
|
||||
|
||||
registryService, err := registry.NewService(config.ServiceOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Ensure that we have a correct root key limit for launching containers.
|
||||
if err := ModifyRootKeyLimit(); err != nil {
|
||||
logrus.Warnf("unable to modify root key limit, number of containers could be limited by this quota: %v", err)
|
||||
|
@ -720,8 +731,35 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
|
|||
}
|
||||
registerMetricsPluginCallback(d.PluginStore, metricsSockPath)
|
||||
|
||||
gopts := []grpc.DialOption{
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBackoffMaxDelay(3 * time.Second),
|
||||
grpc.WithDialer(dialer.Dialer),
|
||||
|
||||
// TODO(stevvooe): We may need to allow configuration of this on the client.
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
|
||||
}
|
||||
if config.ContainerdAddr != "" {
|
||||
d.containerdCli, err = containerd.New(config.ContainerdAddr, containerd.WithDefaultNamespace(ContainersNamespace), containerd.WithDialOpts(gopts))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to dial %q", config.ContainerdAddr)
|
||||
}
|
||||
}
|
||||
|
||||
createPluginExec := func(m *plugin.Manager) (plugin.Executor, error) {
|
||||
return pluginexec.New(getPluginExecRoot(config.Root), containerdRemote, m)
|
||||
var pluginCli *containerd.Client
|
||||
|
||||
// Windows is not currently using containerd, keep the
|
||||
// client as nil
|
||||
if config.ContainerdAddr != "" {
|
||||
pluginCli, err = containerd.New(config.ContainerdAddr, containerd.WithDefaultNamespace(pluginexec.PluginNamespace), containerd.WithDialOpts(gopts))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to dial %q", config.ContainerdAddr)
|
||||
}
|
||||
}
|
||||
|
||||
return pluginexec.New(ctx, getPluginExecRoot(config.Root), pluginCli, m)
|
||||
}
|
||||
|
||||
// Plugin system initialization should happen before restore. Do not change order.
|
||||
|
@ -880,7 +918,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
|
|||
|
||||
go d.execCommandGC()
|
||||
|
||||
d.containerd, err = containerdRemote.NewClient(ContainersNamespace, d)
|
||||
d.containerd, err = libcontainerd.NewClient(ctx, d.containerdCli, filepath.Join(config.ExecRoot, "containerd"), ContainersNamespace, d)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1037,6 +1075,10 @@ func (daemon *Daemon) Shutdown() error {
|
|||
daemon.netController.Stop()
|
||||
}
|
||||
|
||||
if daemon.containerdCli != nil {
|
||||
daemon.containerdCli.Close()
|
||||
}
|
||||
|
||||
return daemon.cleanupMounts()
|
||||
}
|
||||
|
||||
|
|
|
@ -102,38 +102,34 @@ func (c *container) getOOMKilled() bool {
|
|||
type client struct {
|
||||
sync.RWMutex // protects containers map
|
||||
|
||||
remote *containerd.Client
|
||||
client *containerd.Client
|
||||
stateDir string
|
||||
logger *logrus.Entry
|
||||
ns string
|
||||
|
||||
namespace string
|
||||
backend Backend
|
||||
eventQ queue
|
||||
containers map[string]*container
|
||||
}
|
||||
|
||||
func (c *client) reconnect() error {
|
||||
c.Lock()
|
||||
err := c.remote.Reconnect()
|
||||
c.Unlock()
|
||||
return err
|
||||
}
|
||||
// NewClient creates a new libcontainerd client from a containerd client
|
||||
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b Backend) (Client, error) {
|
||||
c := &client{
|
||||
client: cli,
|
||||
stateDir: stateDir,
|
||||
logger: logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
|
||||
ns: ns,
|
||||
backend: b,
|
||||
containers: make(map[string]*container),
|
||||
}
|
||||
|
||||
func (c *client) setRemote(remote *containerd.Client) {
|
||||
c.Lock()
|
||||
c.remote = remote
|
||||
c.Unlock()
|
||||
}
|
||||
go c.processEventStream(ctx, ns)
|
||||
|
||||
func (c *client) getRemote() *containerd.Client {
|
||||
c.RLock()
|
||||
remote := c.remote
|
||||
c.RUnlock()
|
||||
return remote
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
|
||||
return c.getRemote().Version(ctx)
|
||||
return c.client.Version(ctx)
|
||||
}
|
||||
|
||||
// Restore loads the containerd container.
|
||||
|
@ -170,7 +166,7 @@ func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallba
|
|||
err = wrapError(err)
|
||||
}()
|
||||
|
||||
ctr, err := c.getRemote().LoadContainer(ctx, id)
|
||||
ctr, err := c.client.LoadContainer(ctx, id)
|
||||
if err != nil {
|
||||
return false, -1, errors.WithStack(wrapError(err))
|
||||
}
|
||||
|
@ -225,7 +221,7 @@ func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, run
|
|||
|
||||
c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
|
||||
|
||||
cdCtr, err := c.getRemote().NewContainer(ctx, id,
|
||||
cdCtr, err := c.client.NewContainer(ctx, id,
|
||||
containerd.WithSpec(ociSpec),
|
||||
// TODO(mlaventure): when containerd support lcow, revisit runtime value
|
||||
containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
|
||||
|
@ -268,7 +264,7 @@ func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin
|
|||
// remove the checkpoint when we're done
|
||||
defer func() {
|
||||
if cp != nil {
|
||||
err := c.getRemote().ContentStore().Delete(context.Background(), cp.Digest)
|
||||
err := c.client.ContentStore().Delete(context.Background(), cp.Digest)
|
||||
if err != nil {
|
||||
c.logger.WithError(err).WithFields(logrus.Fields{
|
||||
"ref": checkpointDir,
|
||||
|
@ -571,14 +567,14 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
|
|||
}
|
||||
// Whatever happens, delete the checkpoint from containerd
|
||||
defer func() {
|
||||
err := c.getRemote().ImageService().Delete(context.Background(), img.Name())
|
||||
err := c.client.ImageService().Delete(context.Background(), img.Name())
|
||||
if err != nil {
|
||||
c.logger.WithError(err).WithField("digest", img.Target().Digest).
|
||||
Warnf("failed to delete checkpoint image")
|
||||
}
|
||||
}()
|
||||
|
||||
b, err := content.ReadBlob(ctx, c.getRemote().ContentStore(), img.Target())
|
||||
b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target())
|
||||
if err != nil {
|
||||
return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
|
||||
}
|
||||
|
@ -598,7 +594,7 @@ func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDi
|
|||
return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
|
||||
}
|
||||
|
||||
rat, err := c.getRemote().ContentStore().ReaderAt(ctx, *cpDesc)
|
||||
rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc)
|
||||
if err != nil {
|
||||
return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
|
||||
}
|
||||
|
@ -735,7 +731,7 @@ func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
|
|||
})
|
||||
}
|
||||
|
||||
func (c *client) processEventStream(ctx context.Context) {
|
||||
func (c *client) processEventStream(ctx context.Context, ns string) {
|
||||
var (
|
||||
err error
|
||||
ev *events.Envelope
|
||||
|
@ -746,9 +742,9 @@ func (c *client) processEventStream(ctx context.Context) {
|
|||
|
||||
// Filter on both namespace *and* topic. To create an "and" filter,
|
||||
// this must be a single, comma-separated string
|
||||
eventStream, errC := c.getRemote().EventService().Subscribe(ctx, "namespace=="+c.namespace+",topic~=|^/tasks/|")
|
||||
eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|")
|
||||
|
||||
c.logger.WithField("namespace", c.namespace).Debug("processing event stream")
|
||||
c.logger.Debug("processing event stream")
|
||||
|
||||
var oomKilled bool
|
||||
for {
|
||||
|
@ -758,7 +754,7 @@ func (c *client) processEventStream(ctx context.Context) {
|
|||
errStatus, ok := status.FromError(err)
|
||||
if !ok || errStatus.Code() != codes.Canceled {
|
||||
c.logger.WithError(err).Error("failed to get event")
|
||||
go c.processEventStream(ctx)
|
||||
go c.processEventStream(ctx, ns)
|
||||
} else {
|
||||
c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
|
||||
}
|
||||
|
@ -858,7 +854,7 @@ func (c *client) processEventStream(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
|
||||
writer, err := c.getRemote().ContentStore().Writer(ctx, content.WithRef(ref))
|
||||
writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -70,6 +70,28 @@ const (
|
|||
// of docker.
|
||||
const defaultOwner = "docker"
|
||||
|
||||
type client struct {
|
||||
sync.Mutex
|
||||
|
||||
stateDir string
|
||||
backend Backend
|
||||
logger *logrus.Entry
|
||||
eventQ queue
|
||||
containers map[string]*container
|
||||
}
|
||||
|
||||
// NewClient creates a new local executor for windows
|
||||
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b Backend) (Client, error) {
|
||||
c := &client{
|
||||
stateDir: stateDir,
|
||||
backend: b,
|
||||
logger: logrus.WithField("module", "libcontainerd").WithField("module", "libcontainerd").WithField("namespace", ns),
|
||||
containers: make(map[string]*container),
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *client) Version(ctx context.Context) (containerd.Version, error) {
|
||||
return containerd.Version{}, errors.New("not implemented on Windows")
|
||||
}
|
||||
|
|
|
@ -1,142 +0,0 @@
|
|||
// +build !windows
|
||||
|
||||
package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
||||
|
||||
import "fmt"
|
||||
|
||||
// WithRemoteAddr sets the external containerd socket to connect to.
|
||||
func WithRemoteAddr(addr string) RemoteOption {
|
||||
return rpcAddr(addr)
|
||||
}
|
||||
|
||||
type rpcAddr string
|
||||
|
||||
func (a rpcAddr) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.GRPC.Address = string(a)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithRemoteAddr option not supported for this remote")
|
||||
}
|
||||
|
||||
// WithRemoteAddrUser sets the uid and gid to create the RPC address with
|
||||
func WithRemoteAddrUser(uid, gid int) RemoteOption {
|
||||
return rpcUser{uid, gid}
|
||||
}
|
||||
|
||||
type rpcUser struct {
|
||||
uid int
|
||||
gid int
|
||||
}
|
||||
|
||||
func (u rpcUser) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.GRPC.UID = u.uid
|
||||
remote.GRPC.GID = u.gid
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithRemoteAddr option not supported for this remote")
|
||||
}
|
||||
|
||||
// WithStartDaemon defines if libcontainerd should also run containerd daemon.
|
||||
func WithStartDaemon(start bool) RemoteOption {
|
||||
return startDaemon(start)
|
||||
}
|
||||
|
||||
type startDaemon bool
|
||||
|
||||
func (s startDaemon) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.startDaemon = bool(s)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithStartDaemon option not supported for this remote")
|
||||
}
|
||||
|
||||
// WithLogLevel defines which log level to starts containerd with.
|
||||
// This only makes sense if WithStartDaemon() was set to true.
|
||||
func WithLogLevel(lvl string) RemoteOption {
|
||||
return logLevel(lvl)
|
||||
}
|
||||
|
||||
type logLevel string
|
||||
|
||||
func (l logLevel) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.Debug.Level = string(l)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithDebugLog option not supported for this remote")
|
||||
}
|
||||
|
||||
// WithDebugAddress defines at which location the debug GRPC connection
|
||||
// should be made
|
||||
func WithDebugAddress(addr string) RemoteOption {
|
||||
return debugAddress(addr)
|
||||
}
|
||||
|
||||
type debugAddress string
|
||||
|
||||
func (d debugAddress) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.Debug.Address = string(d)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithDebugAddress option not supported for this remote")
|
||||
}
|
||||
|
||||
// WithMetricsAddress defines at which location the debug GRPC connection
|
||||
// should be made
|
||||
func WithMetricsAddress(addr string) RemoteOption {
|
||||
return metricsAddress(addr)
|
||||
}
|
||||
|
||||
type metricsAddress string
|
||||
|
||||
func (m metricsAddress) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.Metrics.Address = string(m)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithMetricsAddress option not supported for this remote")
|
||||
}
|
||||
|
||||
// WithSnapshotter defines snapshotter driver should be used
|
||||
func WithSnapshotter(name string) RemoteOption {
|
||||
return snapshotter(name)
|
||||
}
|
||||
|
||||
type snapshotter string
|
||||
|
||||
func (s snapshotter) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.snapshotter = string(s)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithSnapshotter option not supported for this remote")
|
||||
}
|
||||
|
||||
// WithPlugin allow configuring a containerd plugin
|
||||
// configuration values passed needs to be quoted if quotes are needed in
|
||||
// the toml format.
|
||||
// Setting the config to nil will disable a built-in plugin
|
||||
func WithPlugin(name string, conf interface{}) RemoteOption {
|
||||
return pluginConf{
|
||||
name: name,
|
||||
conf: conf,
|
||||
}
|
||||
}
|
||||
|
||||
type pluginConf struct {
|
||||
// Name is the name of the plugin
|
||||
name string
|
||||
conf interface{}
|
||||
}
|
||||
|
||||
func (p pluginConf) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.pluginConfs.Plugins[p.name] = p.conf
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithPlugin option not supported for this remote")
|
||||
}
|
|
@ -1,18 +0,0 @@
|
|||
package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
||||
|
||||
import "fmt"
|
||||
|
||||
// WithOOMScore defines the oom_score_adj to set for the containerd process.
|
||||
func WithOOMScore(score int) RemoteOption {
|
||||
return oomScore(score)
|
||||
}
|
||||
|
||||
type oomScore int
|
||||
|
||||
func (o oomScore) Apply(r Remote) error {
|
||||
if remote, ok := r.(*remote); ok {
|
||||
remote.OOMScore = int(o)
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("WithOOMScore option not supported for this remote")
|
||||
}
|
|
@ -1,59 +0,0 @@
|
|||
// +build windows
|
||||
|
||||
package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type remote struct {
|
||||
sync.RWMutex
|
||||
|
||||
logger *logrus.Entry
|
||||
clients []*client
|
||||
|
||||
// Options
|
||||
rootDir string
|
||||
stateDir string
|
||||
}
|
||||
|
||||
// New creates a fresh instance of libcontainerd remote.
|
||||
func New(rootDir, stateDir string, options ...RemoteOption) (Remote, error) {
|
||||
return &remote{
|
||||
logger: logrus.WithField("module", "libcontainerd"),
|
||||
rootDir: rootDir,
|
||||
stateDir: stateDir,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type client struct {
|
||||
sync.Mutex
|
||||
|
||||
rootDir string
|
||||
stateDir string
|
||||
backend Backend
|
||||
logger *logrus.Entry
|
||||
eventQ queue
|
||||
containers map[string]*container
|
||||
}
|
||||
|
||||
func (r *remote) NewClient(ns string, b Backend) (Client, error) {
|
||||
c := &client{
|
||||
rootDir: r.rootDir,
|
||||
stateDir: r.stateDir,
|
||||
backend: b,
|
||||
logger: r.logger.WithField("namespace", ns),
|
||||
containers: make(map[string]*container),
|
||||
}
|
||||
r.Lock()
|
||||
r.clients = append(r.clients, c)
|
||||
r.Unlock()
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (r *remote) Cleanup() {
|
||||
// Nothing to do
|
||||
}
|
|
@ -1,6 +1,4 @@
|
|||
// +build !windows
|
||||
|
||||
package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
||||
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -13,7 +11,6 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
|
@ -28,6 +25,7 @@ const (
|
|||
maxConnectionRetryCount = 3
|
||||
healthCheckTimeout = 3 * time.Second
|
||||
shutdownTimeout = 15 * time.Second
|
||||
startupTimeout = 15 * time.Second
|
||||
configFile = "containerd.toml"
|
||||
binaryName = "docker-containerd"
|
||||
pidFile = "docker-containerd.pid"
|
||||
|
@ -44,28 +42,26 @@ type remote struct {
|
|||
daemonPid int
|
||||
logger *logrus.Entry
|
||||
|
||||
daemonWaitCh chan struct{}
|
||||
clients []*client
|
||||
shutdownContext context.Context
|
||||
shutdownCancel context.CancelFunc
|
||||
shutdown bool
|
||||
daemonWaitCh chan struct{}
|
||||
daemonStartCh chan struct{}
|
||||
daemonStopCh chan struct{}
|
||||
|
||||
// Options
|
||||
startDaemon bool
|
||||
rootDir string
|
||||
stateDir string
|
||||
snapshotter string
|
||||
pluginConfs pluginConfigs
|
||||
}
|
||||
|
||||
// New creates a fresh instance of libcontainerd remote.
|
||||
func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "Failed to connect to containerd")
|
||||
}
|
||||
}()
|
||||
// Daemon represents a running containerd daemon
|
||||
type Daemon interface {
|
||||
WaitTimeout(time.Duration) error
|
||||
Address() string
|
||||
}
|
||||
|
||||
// DaemonOpt allows to configure parameters of container daemons
|
||||
type DaemonOpt func(c *remote) error
|
||||
|
||||
// Start starts a containerd daemon and monitors it
|
||||
func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Daemon, error) {
|
||||
r := &remote{
|
||||
rootDir: rootDir,
|
||||
stateDir: stateDir,
|
||||
|
@ -73,86 +69,47 @@ func New(rootDir, stateDir string, options ...RemoteOption) (rem Remote, err err
|
|||
Root: filepath.Join(rootDir, "daemon"),
|
||||
State: filepath.Join(stateDir, "daemon"),
|
||||
},
|
||||
pluginConfs: pluginConfigs{make(map[string]interface{})},
|
||||
daemonPid: -1,
|
||||
logger: logrus.WithField("module", "libcontainerd"),
|
||||
pluginConfs: pluginConfigs{make(map[string]interface{})},
|
||||
daemonPid: -1,
|
||||
logger: logrus.WithField("module", "libcontainerd"),
|
||||
daemonStartCh: make(chan struct{}),
|
||||
daemonStopCh: make(chan struct{}),
|
||||
}
|
||||
r.shutdownContext, r.shutdownCancel = context.WithCancel(context.Background())
|
||||
|
||||
rem = r
|
||||
for _, option := range options {
|
||||
if err = option.Apply(r); err != nil {
|
||||
return
|
||||
for _, opt := range opts {
|
||||
if err := opt(r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
r.setDefaults()
|
||||
|
||||
if err = system.MkdirAll(stateDir, 0700, ""); err != nil {
|
||||
return
|
||||
if err := system.MkdirAll(stateDir, 0700, ""); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if r.startDaemon {
|
||||
os.Remove(r.GRPC.Address)
|
||||
if err = r.startContainerd(); err != nil {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
r.Cleanup()
|
||||
}
|
||||
}()
|
||||
}
|
||||
go r.monitorDaemon(ctx)
|
||||
|
||||
// This connection is just used to monitor the connection
|
||||
client, err := containerd.New(r.GRPC.Address)
|
||||
if err != nil {
|
||||
return
|
||||
select {
|
||||
case <-time.After(startupTimeout):
|
||||
return nil, errors.New("timeout waiting for containerd to start")
|
||||
case <-r.daemonStartCh:
|
||||
}
|
||||
if _, err := client.Version(context.Background()); err != nil {
|
||||
system.KillProcess(r.daemonPid)
|
||||
return nil, errors.Wrapf(err, "unable to get containerd version")
|
||||
}
|
||||
|
||||
go r.monitorConnection(client)
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (r *remote) NewClient(ns string, b Backend) (Client, error) {
|
||||
c := &client{
|
||||
stateDir: r.stateDir,
|
||||
logger: r.logger.WithField("namespace", ns),
|
||||
namespace: ns,
|
||||
backend: b,
|
||||
containers: make(map[string]*container),
|
||||
func (r *remote) WaitTimeout(d time.Duration) error {
|
||||
select {
|
||||
case <-time.After(d):
|
||||
return errors.New("timeout waiting for containerd to stop")
|
||||
case <-r.daemonStopCh:
|
||||
}
|
||||
|
||||
rclient, err := containerd.New(r.GRPC.Address, containerd.WithDefaultNamespace(ns))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.remote = rclient
|
||||
|
||||
go c.processEventStream(r.shutdownContext)
|
||||
|
||||
r.Lock()
|
||||
r.clients = append(r.clients, c)
|
||||
r.Unlock()
|
||||
return c, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *remote) Cleanup() {
|
||||
if r.daemonPid != -1 {
|
||||
r.shutdownCancel()
|
||||
r.stopDaemon()
|
||||
}
|
||||
|
||||
// cleanup some files
|
||||
os.Remove(filepath.Join(r.stateDir, pidFile))
|
||||
|
||||
r.platformCleanup()
|
||||
func (r *remote) Address() string {
|
||||
return r.GRPC.Address
|
||||
}
|
||||
|
||||
func (r *remote) getContainerdPid() (int, error) {
|
||||
pidFile := filepath.Join(r.stateDir, pidFile)
|
||||
f, err := os.OpenFile(pidFile, os.O_RDWR, 0600)
|
||||
|
@ -265,85 +222,90 @@ func (r *remote) startContainerd() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *remote) monitorConnection(monitor *containerd.Client) {
|
||||
var transientFailureCount = 0
|
||||
func (r *remote) monitorDaemon(ctx context.Context) {
|
||||
var (
|
||||
transientFailureCount = 0
|
||||
client *containerd.Client
|
||||
err error
|
||||
delay <-chan time.Time
|
||||
started bool
|
||||
)
|
||||
|
||||
defer func() {
|
||||
if r.daemonPid != -1 {
|
||||
r.stopDaemon()
|
||||
}
|
||||
|
||||
// cleanup some files
|
||||
os.Remove(filepath.Join(r.stateDir, pidFile))
|
||||
|
||||
r.platformCleanup()
|
||||
|
||||
close(r.daemonStopCh)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.shutdownContext.Done():
|
||||
case <-ctx.Done():
|
||||
r.logger.Info("stopping healthcheck following graceful shutdown")
|
||||
monitor.Close()
|
||||
if client != nil {
|
||||
client.Close()
|
||||
}
|
||||
return
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
case <-delay:
|
||||
default:
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.shutdownContext, healthCheckTimeout)
|
||||
_, err := monitor.IsServing(ctx)
|
||||
if r.daemonPid == -1 {
|
||||
if r.daemonWaitCh != nil {
|
||||
<-r.daemonWaitCh
|
||||
}
|
||||
|
||||
os.RemoveAll(r.GRPC.Address)
|
||||
if err := r.startContainerd(); err != nil {
|
||||
r.logger.WithError(err).Error("failed starting containerd")
|
||||
delay = time.After(50 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
client, err = containerd.New(r.GRPC.Address)
|
||||
if err != nil {
|
||||
r.logger.WithError(err).Error("failed connecting to containerd")
|
||||
delay = time.After(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
|
||||
_, err := client.IsServing(tctx)
|
||||
cancel()
|
||||
if err == nil {
|
||||
transientFailureCount = 0
|
||||
continue
|
||||
}
|
||||
if !started {
|
||||
close(r.daemonStartCh)
|
||||
started = true
|
||||
}
|
||||
|
||||
select {
|
||||
case <-r.shutdownContext.Done():
|
||||
r.logger.Info("stopping healthcheck following graceful shutdown")
|
||||
monitor.Close()
|
||||
return
|
||||
default:
|
||||
transientFailureCount = 0
|
||||
delay = time.After(500 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")
|
||||
|
||||
if r.daemonPid == -1 {
|
||||
continue
|
||||
}
|
||||
|
||||
transientFailureCount++
|
||||
if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
|
||||
delay = time.After(time.Duration(transientFailureCount) * 200 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
transientFailureCount = 0
|
||||
if system.IsProcessAlive(r.daemonPid) {
|
||||
r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
|
||||
// Try to get a stack trace
|
||||
syscall.Kill(r.daemonPid, syscall.SIGUSR1)
|
||||
<-time.After(100 * time.Millisecond)
|
||||
system.KillProcess(r.daemonPid)
|
||||
}
|
||||
if r.daemonWaitCh != nil {
|
||||
<-r.daemonWaitCh
|
||||
r.killDaemon()
|
||||
}
|
||||
|
||||
os.Remove(r.GRPC.Address)
|
||||
if err := r.startContainerd(); err != nil {
|
||||
r.logger.WithError(err).Error("failed restarting containerd")
|
||||
continue
|
||||
}
|
||||
|
||||
if err := monitor.Reconnect(); err != nil {
|
||||
r.logger.WithError(err).Error("failed connect to containerd")
|
||||
continue
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, c := range r.clients {
|
||||
wg.Add(1)
|
||||
|
||||
go func(c *client) {
|
||||
defer wg.Done()
|
||||
c.logger.WithField("namespace", c.namespace).Debug("creating new containerd remote client")
|
||||
if err := c.reconnect(); err != nil {
|
||||
r.logger.WithError(err).Error("failed to connect to containerd")
|
||||
// TODO: Better way to handle this?
|
||||
// This *shouldn't* happen, but this could wind up where the daemon
|
||||
// is not able to communicate with an eventually up containerd
|
||||
}
|
||||
}(c)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
client.Close()
|
||||
r.daemonPid = -1
|
||||
delay = nil
|
||||
transientFailureCount = 0
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
||||
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
@ -38,10 +38,6 @@ func (r *remote) setDefaults() {
|
|||
delete(r.pluginConfs.Plugins, key)
|
||||
}
|
||||
}
|
||||
|
||||
if r.snapshotter == "" {
|
||||
r.snapshotter = "overlay"
|
||||
}
|
||||
}
|
||||
|
||||
func (r *remote) stopDaemon() {
|
||||
|
@ -61,6 +57,13 @@ func (r *remote) stopDaemon() {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *remote) killDaemon() {
|
||||
// Try to get a stack trace
|
||||
syscall.Kill(r.daemonPid, syscall.SIGUSR1)
|
||||
<-time.After(100 * time.Millisecond)
|
||||
system.KillProcess(r.daemonPid)
|
||||
}
|
||||
|
||||
func (r *remote) platformCleanup() {
|
||||
os.Remove(filepath.Join(r.stateDir, sockFile))
|
||||
}
|
55
libcontainerd/supervisor/remote_daemon_options.go
Normal file
55
libcontainerd/supervisor/remote_daemon_options.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
|
||||
|
||||
// WithRemoteAddr sets the external containerd socket to connect to.
|
||||
func WithRemoteAddr(addr string) DaemonOpt {
|
||||
return func(r *remote) error {
|
||||
r.GRPC.Address = addr
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithRemoteAddrUser sets the uid and gid to create the RPC address with
|
||||
func WithRemoteAddrUser(uid, gid int) DaemonOpt {
|
||||
return func(r *remote) error {
|
||||
r.GRPC.UID = uid
|
||||
r.GRPC.GID = gid
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogLevel defines which log level to starts containerd with.
|
||||
// This only makes sense if WithStartDaemon() was set to true.
|
||||
func WithLogLevel(lvl string) DaemonOpt {
|
||||
return func(r *remote) error {
|
||||
r.Debug.Level = lvl
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithDebugAddress defines at which location the debug GRPC connection
|
||||
// should be made
|
||||
func WithDebugAddress(addr string) DaemonOpt {
|
||||
return func(r *remote) error {
|
||||
r.Debug.Address = addr
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithMetricsAddress defines at which location the debug GRPC connection
|
||||
// should be made
|
||||
func WithMetricsAddress(addr string) DaemonOpt {
|
||||
return func(r *remote) error {
|
||||
r.Metrics.Address = addr
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPlugin allow configuring a containerd plugin
|
||||
// configuration values passed needs to be quoted if quotes are needed in
|
||||
// the toml format.
|
||||
func WithPlugin(name string, conf interface{}) DaemonOpt {
|
||||
return func(r *remote) error {
|
||||
r.pluginConfs.Plugins[name] = conf
|
||||
return nil
|
||||
}
|
||||
}
|
9
libcontainerd/supervisor/remote_daemon_options_linux.go
Normal file
9
libcontainerd/supervisor/remote_daemon_options_linux.go
Normal file
|
@ -0,0 +1,9 @@
|
|||
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
|
||||
|
||||
// WithOOMScore defines the oom_score_adj to set for the containerd process.
|
||||
func WithOOMScore(score int) DaemonOpt {
|
||||
return func(r *remote) error {
|
||||
r.OOMScore = score
|
||||
return nil
|
||||
}
|
||||
}
|
|
@ -1,9 +1,9 @@
|
|||
// +build remote_daemon
|
||||
|
||||
package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
||||
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/docker/docker/pkg/system"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -18,9 +18,6 @@ func (r *remote) setDefaults() {
|
|||
if r.Debug.Address == "" {
|
||||
r.Debug.Address = debugPipeName
|
||||
}
|
||||
if r.snapshotter == "" {
|
||||
r.snapshotter = "naive" // TODO(mlaventure): switch to "windows" once implemented
|
||||
}
|
||||
}
|
||||
|
||||
func (r *remote) stopDaemon() {
|
||||
|
@ -42,6 +39,10 @@ func (r *remote) stopDaemon() {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *remote) killDaemon() {
|
||||
system.KillProcess(r.daemonPid)
|
||||
}
|
||||
|
||||
func (r *remote) platformCleanup() {
|
||||
// Nothing to do
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
||||
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
|
||||
|
||||
import "syscall"
|
||||
|
9
libcontainerd/supervisor/utils_windows.go
Normal file
9
libcontainerd/supervisor/utils_windows.go
Normal file
|
@ -0,0 +1,9 @@
|
|||
package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
|
||||
|
||||
import "syscall"
|
||||
|
||||
// containerdSysProcAttr returns the SysProcAttr to use when exec'ing
|
||||
// containerd
|
||||
func containerdSysProcAttr() *syscall.SysProcAttr {
|
||||
return nil
|
||||
}
|
|
@ -46,23 +46,6 @@ const (
|
|||
StatusUnknown Status = "unknown"
|
||||
)
|
||||
|
||||
// Remote on Linux defines the accesspoint to the containerd grpc API.
|
||||
// Remote on Windows is largely an unimplemented interface as there is
|
||||
// no remote containerd.
|
||||
type Remote interface {
|
||||
// Client returns a new Client instance connected with given Backend.
|
||||
NewClient(namespace string, backend Backend) (Client, error)
|
||||
// Cleanup stops containerd if it was started by libcontainerd.
|
||||
// Note this is not used on Windows as there is no remote containerd.
|
||||
Cleanup()
|
||||
}
|
||||
|
||||
// RemoteOption allows to configure parameters of remotes.
|
||||
// This is unused on Windows.
|
||||
type RemoteOption interface {
|
||||
Apply(Remote) error
|
||||
}
|
||||
|
||||
// EventInfo contains the event info
|
||||
type EventInfo struct {
|
||||
ContainerID string
|
||||
|
|
|
@ -3,8 +3,6 @@ package libcontainerd // import "github.com/docker/docker/libcontainerd"
|
|||
import (
|
||||
"strings"
|
||||
|
||||
"syscall"
|
||||
|
||||
opengcs "github.com/Microsoft/opengcs/client"
|
||||
)
|
||||
|
||||
|
@ -38,9 +36,3 @@ func (c *container) debugGCS() {
|
|||
}
|
||||
cfg.DebugGCS()
|
||||
}
|
||||
|
||||
// containerdSysProcAttr returns the SysProcAttr to use when exec'ing
|
||||
// containerd
|
||||
func containerdSysProcAttr() *syscall.SysProcAttr {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/cio"
|
||||
"github.com/containerd/containerd/runtime/linux/runctypes"
|
||||
"github.com/docker/docker/errdefs"
|
||||
|
@ -16,8 +17,8 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// pluginNamespace is the name used for the plugins namespace
|
||||
const pluginNamespace = "plugins.moby"
|
||||
// PluginNamespace is the name used for the plugins namespace
|
||||
const PluginNamespace = "plugins.moby"
|
||||
|
||||
// ExitHandler represents an object that is called when the exit event is received from containerd
|
||||
type ExitHandler interface {
|
||||
|
@ -38,12 +39,13 @@ type Client interface {
|
|||
}
|
||||
|
||||
// New creates a new containerd plugin executor
|
||||
func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
|
||||
func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandler ExitHandler) (*Executor, error) {
|
||||
e := &Executor{
|
||||
rootDir: rootDir,
|
||||
exitHandler: exitHandler,
|
||||
}
|
||||
client, err := remote.NewClient(pluginNamespace, e)
|
||||
|
||||
client, err := libcontainerd.NewClient(ctx, cli, rootDir, PluginNamespace, e)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error creating containerd exec client")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue