123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534 |
- // +build linux
- /*
- Copyright The containerd Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package linux
- import (
- "context"
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "path/filepath"
- "time"
- eventstypes "github.com/containerd/containerd/api/events"
- "github.com/containerd/containerd/api/types"
- "github.com/containerd/containerd/containers"
- "github.com/containerd/containerd/errdefs"
- "github.com/containerd/containerd/events/exchange"
- "github.com/containerd/containerd/identifiers"
- "github.com/containerd/containerd/log"
- "github.com/containerd/containerd/metadata"
- "github.com/containerd/containerd/mount"
- "github.com/containerd/containerd/namespaces"
- "github.com/containerd/containerd/pkg/process"
- "github.com/containerd/containerd/platforms"
- "github.com/containerd/containerd/plugin"
- "github.com/containerd/containerd/runtime"
- "github.com/containerd/containerd/runtime/linux/runctypes"
- v1 "github.com/containerd/containerd/runtime/v1"
- shim "github.com/containerd/containerd/runtime/v1/shim/v1"
- runc "github.com/containerd/go-runc"
- "github.com/containerd/typeurl"
- ptypes "github.com/gogo/protobuf/types"
- ocispec "github.com/opencontainers/image-spec/specs-go/v1"
- "github.com/pkg/errors"
- "github.com/sirupsen/logrus"
- "golang.org/x/sys/unix"
- )
- var (
- pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
- empty = &ptypes.Empty{}
- )
- const (
- configFilename = "config.json"
- defaultRuntime = "runc"
- defaultShim = "containerd-shim"
- // cleanupTimeout is default timeout for cleanup operations
- cleanupTimeout = 1 * time.Minute
- )
- func init() {
- plugin.Register(&plugin.Registration{
- Type: plugin.RuntimePlugin,
- ID: "linux",
- InitFn: New,
- Requires: []plugin.Type{
- plugin.MetadataPlugin,
- },
- Config: &Config{
- Shim: defaultShim,
- Runtime: defaultRuntime,
- },
- })
- }
- var _ = (runtime.PlatformRuntime)(&Runtime{})
- // Config options for the runtime
- type Config struct {
- // Shim is a path or name of binary implementing the Shim GRPC API
- Shim string `toml:"shim"`
- // Runtime is a path or name of an OCI runtime used by the shim
- Runtime string `toml:"runtime"`
- // RuntimeRoot is the path that shall be used by the OCI runtime for its data
- RuntimeRoot string `toml:"runtime_root"`
- // NoShim calls runc directly from within the pkg
- NoShim bool `toml:"no_shim"`
- // Debug enable debug on the shim
- ShimDebug bool `toml:"shim_debug"`
- }
- // New returns a configured runtime
- func New(ic *plugin.InitContext) (interface{}, error) {
- ic.Meta.Platforms = []ocispec.Platform{platforms.DefaultSpec()}
- if err := os.MkdirAll(ic.Root, 0711); err != nil {
- return nil, err
- }
- if err := os.MkdirAll(ic.State, 0711); err != nil {
- return nil, err
- }
- m, err := ic.Get(plugin.MetadataPlugin)
- if err != nil {
- return nil, err
- }
- cfg := ic.Config.(*Config)
- r := &Runtime{
- root: ic.Root,
- state: ic.State,
- tasks: runtime.NewTaskList(),
- containers: metadata.NewContainerStore(m.(*metadata.DB)),
- address: ic.Address,
- events: ic.Events,
- config: cfg,
- }
- tasks, err := r.restoreTasks(ic.Context)
- if err != nil {
- return nil, err
- }
- for _, t := range tasks {
- if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil {
- return nil, err
- }
- }
- return r, nil
- }
- // Runtime for a linux based system
- type Runtime struct {
- root string
- state string
- address string
- tasks *runtime.TaskList
- containers containers.Store
- events *exchange.Exchange
- config *Config
- }
- // ID of the runtime
- func (r *Runtime) ID() string {
- return pluginID
- }
- // Create a new task
- func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
- namespace, err := namespaces.NamespaceRequired(ctx)
- if err != nil {
- return nil, err
- }
- if err := identifiers.Validate(id); err != nil {
- return nil, errors.Wrapf(err, "invalid task id")
- }
- ropts, err := r.getRuncOptions(ctx, id)
- if err != nil {
- return nil, err
- }
- bundle, err := newBundle(id,
- filepath.Join(r.state, namespace),
- filepath.Join(r.root, namespace),
- opts.Spec.Value)
- if err != nil {
- return nil, err
- }
- defer func() {
- if err != nil {
- bundle.Delete()
- }
- }()
- shimopt := ShimLocal(r.config, r.events)
- if !r.config.NoShim {
- var cgroup string
- if opts.TaskOptions != nil {
- v, err := typeurl.UnmarshalAny(opts.TaskOptions)
- if err != nil {
- return nil, err
- }
- cgroup = v.(*runctypes.CreateOptions).ShimCgroup
- }
- exitHandler := func() {
- log.G(ctx).WithField("id", id).Info("shim reaped")
- if _, err := r.tasks.Get(ctx, id); err != nil {
- // Task was never started or was already successfully deleted
- return
- }
- if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id); err != nil {
- log.G(ctx).WithError(err).WithFields(logrus.Fields{
- "id": id,
- "namespace": namespace,
- }).Warn("failed to clean up after killed shim")
- }
- }
- shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler)
- }
- s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts)
- if err != nil {
- return nil, err
- }
- defer func() {
- if err != nil {
- deferCtx, deferCancel := context.WithTimeout(
- namespaces.WithNamespace(context.TODO(), namespace), cleanupTimeout)
- defer deferCancel()
- if kerr := s.KillShim(deferCtx); kerr != nil {
- log.G(ctx).WithError(err).Error("failed to kill shim")
- }
- }
- }()
- rt := r.config.Runtime
- if ropts != nil && ropts.Runtime != "" {
- rt = ropts.Runtime
- }
- sopts := &shim.CreateTaskRequest{
- ID: id,
- Bundle: bundle.path,
- Runtime: rt,
- Stdin: opts.IO.Stdin,
- Stdout: opts.IO.Stdout,
- Stderr: opts.IO.Stderr,
- Terminal: opts.IO.Terminal,
- Checkpoint: opts.Checkpoint,
- Options: opts.TaskOptions,
- }
- for _, m := range opts.Rootfs {
- sopts.Rootfs = append(sopts.Rootfs, &types.Mount{
- Type: m.Type,
- Source: m.Source,
- Options: m.Options,
- })
- }
- cr, err := s.Create(ctx, sopts)
- if err != nil {
- return nil, errdefs.FromGRPC(err)
- }
- t, err := newTask(id, namespace, int(cr.Pid), s, r.events, r.tasks, bundle)
- if err != nil {
- return nil, err
- }
- if err := r.tasks.Add(ctx, t); err != nil {
- return nil, err
- }
- r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventstypes.TaskCreate{
- ContainerID: sopts.ID,
- Bundle: sopts.Bundle,
- Rootfs: sopts.Rootfs,
- IO: &eventstypes.TaskIO{
- Stdin: sopts.Stdin,
- Stdout: sopts.Stdout,
- Stderr: sopts.Stderr,
- Terminal: sopts.Terminal,
- },
- Checkpoint: sopts.Checkpoint,
- Pid: uint32(t.pid),
- })
- return t, nil
- }
- // Tasks returns all tasks known to the runtime
- func (r *Runtime) Tasks(ctx context.Context, all bool) ([]runtime.Task, error) {
- return r.tasks.GetAll(ctx, all)
- }
- func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
- dir, err := ioutil.ReadDir(r.state)
- if err != nil {
- return nil, err
- }
- var o []*Task
- for _, namespace := range dir {
- if !namespace.IsDir() {
- continue
- }
- name := namespace.Name()
- // skip hidden directories
- if len(name) > 0 && name[0] == '.' {
- continue
- }
- log.G(ctx).WithField("namespace", name).Debug("loading tasks in namespace")
- tasks, err := r.loadTasks(ctx, name)
- if err != nil {
- return nil, err
- }
- o = append(o, tasks...)
- }
- return o, nil
- }
- // Get a specific task by task id
- func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
- return r.tasks.Get(ctx, id)
- }
- // Add a runtime task
- func (r *Runtime) Add(ctx context.Context, task runtime.Task) error {
- return r.tasks.Add(ctx, task)
- }
- // Delete a runtime task
- func (r *Runtime) Delete(ctx context.Context, id string) {
- r.tasks.Delete(ctx, id)
- }
- func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
- dir, err := ioutil.ReadDir(filepath.Join(r.state, ns))
- if err != nil {
- return nil, err
- }
- var o []*Task
- for _, path := range dir {
- if !path.IsDir() {
- continue
- }
- id := path.Name()
- // skip hidden directories
- if len(id) > 0 && id[0] == '.' {
- continue
- }
- bundle := loadBundle(
- id,
- filepath.Join(r.state, ns, id),
- filepath.Join(r.root, ns, id),
- )
- ctx = namespaces.WithNamespace(ctx, ns)
- pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
- shimExit := make(chan struct{})
- s, err := bundle.NewShimClient(ctx, ns, ShimConnect(r.config, func() {
- defer close(shimExit)
- if _, err := r.tasks.Get(ctx, id); err != nil {
- // Task was never started or was already successfully deleted
- return
- }
- if err := r.cleanupAfterDeadShim(ctx, bundle, ns, id); err != nil {
- log.G(ctx).WithError(err).WithField("bundle", bundle.path).
- Error("cleaning up after dead shim")
- }
- }), nil)
- if err != nil {
- log.G(ctx).WithError(err).WithFields(logrus.Fields{
- "id": id,
- "namespace": ns,
- }).Error("connecting to shim")
- err := r.cleanupAfterDeadShim(ctx, bundle, ns, id)
- if err != nil {
- log.G(ctx).WithError(err).WithField("bundle", bundle.path).
- Error("cleaning up after dead shim")
- }
- continue
- }
- logDirPath := filepath.Join(r.root, ns, id)
- copyAndClose := func(dst io.Writer, src io.ReadWriteCloser) {
- copyDone := make(chan struct{})
- go func() {
- io.Copy(dst, src)
- close(copyDone)
- }()
- select {
- case <-shimExit:
- case <-copyDone:
- }
- src.Close()
- }
- shimStdoutLog, err := v1.OpenShimStdoutLog(ctx, logDirPath)
- if err != nil {
- log.G(ctx).WithError(err).WithFields(logrus.Fields{
- "id": id,
- "namespace": ns,
- "logDirPath": logDirPath,
- }).Error("opening shim stdout log pipe")
- continue
- }
- if r.config.ShimDebug {
- go copyAndClose(os.Stdout, shimStdoutLog)
- } else {
- go copyAndClose(ioutil.Discard, shimStdoutLog)
- }
- shimStderrLog, err := v1.OpenShimStderrLog(ctx, logDirPath)
- if err != nil {
- log.G(ctx).WithError(err).WithFields(logrus.Fields{
- "id": id,
- "namespace": ns,
- "logDirPath": logDirPath,
- }).Error("opening shim stderr log pipe")
- continue
- }
- if r.config.ShimDebug {
- go copyAndClose(os.Stderr, shimStderrLog)
- } else {
- go copyAndClose(ioutil.Discard, shimStderrLog)
- }
- t, err := newTask(id, ns, pid, s, r.events, r.tasks, bundle)
- if err != nil {
- log.G(ctx).WithError(err).Error("loading task type")
- continue
- }
- o = append(o, t)
- }
- return o, nil
- }
- func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string) error {
- log.G(ctx).WithFields(logrus.Fields{
- "id": id,
- "namespace": ns,
- }).Warn("cleaning up after shim dead")
- pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, process.InitPidFile))
- ctx = namespaces.WithNamespace(ctx, ns)
- if err := r.terminate(ctx, bundle, ns, id); err != nil {
- if r.config.ShimDebug {
- return errors.Wrap(err, "failed to terminate task, leaving bundle for debugging")
- }
- log.G(ctx).WithError(err).Warn("failed to terminate task")
- }
- // Notify Client
- exitedAt := time.Now().UTC()
- r.events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
- ContainerID: id,
- ID: id,
- Pid: uint32(pid),
- ExitStatus: 128 + uint32(unix.SIGKILL),
- ExitedAt: exitedAt,
- })
- r.tasks.Delete(ctx, id)
- if err := bundle.Delete(); err != nil {
- log.G(ctx).WithError(err).Error("delete bundle")
- }
- // kill shim
- if shimPid, err := runc.ReadPidFile(filepath.Join(bundle.path, "shim.pid")); err == nil && shimPid > 0 {
- unix.Kill(shimPid, unix.SIGKILL)
- }
- r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
- ContainerID: id,
- Pid: uint32(pid),
- ExitStatus: 128 + uint32(unix.SIGKILL),
- ExitedAt: exitedAt,
- })
- return nil
- }
- func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error {
- rt, err := r.getRuntime(ctx, ns, id)
- if err != nil {
- return err
- }
- if err := rt.Delete(ctx, id, &runc.DeleteOpts{
- Force: true,
- }); err != nil {
- log.G(ctx).WithError(err).Warnf("delete runtime state %s", id)
- }
- if err := mount.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil {
- log.G(ctx).WithError(err).WithFields(logrus.Fields{
- "path": bundle.path,
- "id": id,
- }).Warnf("unmount task rootfs")
- }
- return nil
- }
- func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, error) {
- ropts, err := r.getRuncOptions(ctx, id)
- if err != nil {
- return nil, err
- }
- var (
- cmd = r.config.Runtime
- root = process.RuncRoot
- )
- if ropts != nil {
- if ropts.Runtime != "" {
- cmd = ropts.Runtime
- }
- if ropts.RuntimeRoot != "" {
- root = ropts.RuntimeRoot
- }
- }
- return &runc.Runc{
- Command: cmd,
- LogFormat: runc.JSON,
- PdeathSignal: unix.SIGKILL,
- Root: filepath.Join(root, ns),
- Debug: r.config.ShimDebug,
- }, nil
- }
- func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) {
- container, err := r.containers.Get(ctx, id)
- if err != nil {
- return nil, err
- }
- if container.Runtime.Options != nil {
- v, err := typeurl.UnmarshalAny(container.Runtime.Options)
- if err != nil {
- return nil, err
- }
- ropts, ok := v.(*runctypes.RuncOptions)
- if !ok {
- return nil, errors.New("invalid runtime options format")
- }
- return ropts, nil
- }
- return &runctypes.RuncOptions{}, nil
- }
|