containerd.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package containerd // import "github.com/docker/docker/plugin/executor/containerd"
  2. import (
  3. "context"
  4. "io"
  5. "path/filepath"
  6. "sync"
  7. "time"
  8. "github.com/containerd/containerd"
  9. "github.com/containerd/containerd/cio"
  10. "github.com/containerd/containerd/runtime/linux/runctypes"
  11. "github.com/docker/docker/errdefs"
  12. "github.com/docker/docker/libcontainerd"
  13. libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
  14. "github.com/opencontainers/runtime-spec/specs-go"
  15. "github.com/pkg/errors"
  16. "github.com/sirupsen/logrus"
  17. )
  18. // PluginNamespace is the name used for the plugins namespace
  19. const PluginNamespace = "plugins.moby"
  20. // ExitHandler represents an object that is called when the exit event is received from containerd
  21. type ExitHandler interface {
  22. HandleExitEvent(id string) error
  23. }
  24. // Client is used by the exector to perform operations.
  25. // TODO(@cpuguy83): This should really just be based off the containerd client interface.
  26. // However right now this whole package is tied to github.com/docker/docker/libcontainerd
  27. type Client interface {
  28. Create(ctx context.Context, containerID string, spec *specs.Spec, runtimeOptions interface{}) error
  29. Restore(ctx context.Context, containerID string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, err error)
  30. Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error)
  31. Delete(ctx context.Context, containerID string) error
  32. DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error)
  33. Start(ctx context.Context, containerID, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (pid int, err error)
  34. SignalProcess(ctx context.Context, containerID, processID string, signal int) error
  35. }
  36. // New creates a new containerd plugin executor
  37. func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandler ExitHandler) (*Executor, error) {
  38. e := &Executor{
  39. rootDir: rootDir,
  40. exitHandler: exitHandler,
  41. }
  42. client, err := libcontainerd.NewClient(ctx, cli, rootDir, PluginNamespace, e)
  43. if err != nil {
  44. return nil, errors.Wrap(err, "error creating containerd exec client")
  45. }
  46. e.client = client
  47. return e, nil
  48. }
  49. // Executor is the containerd client implementation of a plugin executor
  50. type Executor struct {
  51. rootDir string
  52. client Client
  53. exitHandler ExitHandler
  54. }
  55. // deleteTaskAndContainer deletes plugin task and then plugin container from containerd
  56. func deleteTaskAndContainer(ctx context.Context, cli Client, id string) {
  57. _, _, err := cli.DeleteTask(ctx, id)
  58. if err != nil && !errdefs.IsNotFound(err) {
  59. logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
  60. }
  61. err = cli.Delete(ctx, id)
  62. if err != nil && !errdefs.IsNotFound(err) {
  63. logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd")
  64. }
  65. }
  66. // Create creates a new container
  67. func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
  68. opts := runctypes.RuncOptions{
  69. RuntimeRoot: filepath.Join(e.rootDir, "runtime-root"),
  70. }
  71. ctx := context.Background()
  72. err := e.client.Create(ctx, id, &spec, &opts)
  73. if err != nil {
  74. status, err2 := e.client.Status(ctx, id)
  75. if err2 != nil {
  76. if !errdefs.IsNotFound(err2) {
  77. logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
  78. }
  79. } else {
  80. if status != containerd.Running && status != containerd.Unknown {
  81. if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
  82. logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
  83. }
  84. err = e.client.Create(ctx, id, &spec, &opts)
  85. }
  86. }
  87. if err != nil {
  88. return errors.Wrap(err, "error creating containerd container")
  89. }
  90. }
  91. _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
  92. if err != nil {
  93. deleteTaskAndContainer(ctx, e.client, id)
  94. }
  95. return err
  96. }
  97. // Restore restores a container
  98. func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
  99. alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
  100. if err != nil && !errdefs.IsNotFound(err) {
  101. return false, err
  102. }
  103. if !alive {
  104. deleteTaskAndContainer(context.Background(), e.client, id)
  105. }
  106. return alive, nil
  107. }
  108. // IsRunning returns if the container with the given id is running
  109. func (e *Executor) IsRunning(id string) (bool, error) {
  110. status, err := e.client.Status(context.Background(), id)
  111. return status == containerd.Running, err
  112. }
  113. // Signal sends the specified signal to the container
  114. func (e *Executor) Signal(id string, signal int) error {
  115. return e.client.SignalProcess(context.Background(), id, libcontainerdtypes.InitProcessName, signal)
  116. }
  117. // ProcessEvent handles events from containerd
  118. // All events are ignored except the exit event, which is sent of to the stored handler
  119. func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
  120. switch et {
  121. case libcontainerdtypes.EventExit:
  122. deleteTaskAndContainer(context.Background(), e.client, id)
  123. return e.exitHandler.HandleExitEvent(ei.ContainerID)
  124. }
  125. return nil
  126. }
  127. type rio struct {
  128. cio.IO
  129. wg sync.WaitGroup
  130. }
  131. func (c *rio) Wait() {
  132. c.wg.Wait()
  133. c.IO.Wait()
  134. }
  135. func attachStreamsFunc(stdout, stderr io.WriteCloser) libcontainerdtypes.StdioCallback {
  136. return func(iop *cio.DirectIO) (cio.IO, error) {
  137. if iop.Stdin != nil {
  138. iop.Stdin.Close()
  139. // closing stdin shouldn't be needed here, it should never be open
  140. panic("plugin stdin shouldn't have been created!")
  141. }
  142. rio := &rio{IO: iop}
  143. rio.wg.Add(2)
  144. go func() {
  145. io.Copy(stdout, iop.Stdout)
  146. stdout.Close()
  147. rio.wg.Done()
  148. }()
  149. go func() {
  150. io.Copy(stderr, iop.Stderr)
  151. stderr.Close()
  152. rio.wg.Done()
  153. }()
  154. return rio, nil
  155. }
  156. }