containerd.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package containerd // import "github.com/docker/docker/plugin/executor/containerd"
  2. import (
  3. "context"
  4. "io"
  5. "path/filepath"
  6. "sync"
  7. "github.com/containerd/containerd"
  8. "github.com/containerd/containerd/cio"
  9. "github.com/containerd/containerd/runtime/linux/runctypes"
  10. "github.com/docker/docker/errdefs"
  11. "github.com/docker/docker/libcontainerd"
  12. libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
  13. "github.com/opencontainers/runtime-spec/specs-go"
  14. "github.com/pkg/errors"
  15. "github.com/sirupsen/logrus"
  16. )
  17. // PluginNamespace is the name used for the plugins namespace
  18. const PluginNamespace = "plugins.moby"
  19. // ExitHandler represents an object that is called when the exit event is received from containerd
  20. type ExitHandler interface {
  21. HandleExitEvent(id string) error
  22. }
  23. // New creates a new containerd plugin executor
  24. func New(ctx context.Context, rootDir string, cli *containerd.Client, exitHandler ExitHandler) (*Executor, error) {
  25. e := &Executor{
  26. rootDir: rootDir,
  27. exitHandler: exitHandler,
  28. }
  29. client, err := libcontainerd.NewClient(ctx, cli, rootDir, PluginNamespace, e)
  30. if err != nil {
  31. return nil, errors.Wrap(err, "error creating containerd exec client")
  32. }
  33. e.client = client
  34. return e, nil
  35. }
  36. // Executor is the containerd client implementation of a plugin executor
  37. type Executor struct {
  38. rootDir string
  39. client libcontainerdtypes.Client
  40. exitHandler ExitHandler
  41. }
  42. // deleteTaskAndContainer deletes plugin task and then plugin container from containerd
  43. func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) {
  44. if p != nil {
  45. if _, _, err := p.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
  46. logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
  47. }
  48. } else {
  49. if _, _, err := cli.DeleteTask(ctx, id); err != nil && !errdefs.IsNotFound(err) {
  50. logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
  51. }
  52. }
  53. if err := cli.Delete(ctx, id); err != nil && !errdefs.IsNotFound(err) {
  54. logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd")
  55. }
  56. }
  57. // Create creates a new container
  58. func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
  59. opts := runctypes.RuncOptions{
  60. RuntimeRoot: filepath.Join(e.rootDir, "runtime-root"),
  61. }
  62. ctx := context.Background()
  63. err := e.client.Create(ctx, id, &spec, &opts)
  64. if err != nil {
  65. status, err2 := e.client.Status(ctx, id)
  66. if err2 != nil {
  67. if !errdefs.IsNotFound(err2) {
  68. logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
  69. }
  70. } else {
  71. if status != containerd.Running && status != containerd.Unknown {
  72. if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
  73. logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
  74. }
  75. err = e.client.Create(ctx, id, &spec, &opts)
  76. }
  77. }
  78. if err != nil {
  79. return errors.Wrap(err, "error creating containerd container")
  80. }
  81. }
  82. _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
  83. if err != nil {
  84. deleteTaskAndContainer(ctx, e.client, id, nil)
  85. }
  86. return err
  87. }
  88. // Restore restores a container
  89. func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
  90. alive, _, p, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
  91. if err != nil && !errdefs.IsNotFound(err) {
  92. return false, err
  93. }
  94. if !alive {
  95. deleteTaskAndContainer(context.Background(), e.client, id, p)
  96. }
  97. return alive, nil
  98. }
  99. // IsRunning returns if the container with the given id is running
  100. func (e *Executor) IsRunning(id string) (bool, error) {
  101. status, err := e.client.Status(context.Background(), id)
  102. return status == containerd.Running, err
  103. }
  104. // Signal sends the specified signal to the container
  105. func (e *Executor) Signal(id string, signal int) error {
  106. return e.client.SignalProcess(context.Background(), id, libcontainerdtypes.InitProcessName, signal)
  107. }
  108. // ProcessEvent handles events from containerd
  109. // All events are ignored except the exit event, which is sent of to the stored handler
  110. func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
  111. switch et {
  112. case libcontainerdtypes.EventExit:
  113. deleteTaskAndContainer(context.Background(), e.client, id, nil)
  114. return e.exitHandler.HandleExitEvent(ei.ContainerID)
  115. }
  116. return nil
  117. }
  118. type rio struct {
  119. cio.IO
  120. wg sync.WaitGroup
  121. }
  122. func (c *rio) Wait() {
  123. c.wg.Wait()
  124. c.IO.Wait()
  125. }
  126. func attachStreamsFunc(stdout, stderr io.WriteCloser) libcontainerdtypes.StdioCallback {
  127. return func(iop *cio.DirectIO) (cio.IO, error) {
  128. if iop.Stdin != nil {
  129. iop.Stdin.Close()
  130. // closing stdin shouldn't be needed here, it should never be open
  131. panic("plugin stdin shouldn't have been created!")
  132. }
  133. rio := &rio{IO: iop}
  134. rio.wg.Add(2)
  135. go func() {
  136. io.Copy(stdout, iop.Stdout)
  137. stdout.Close()
  138. rio.wg.Done()
  139. }()
  140. go func() {
  141. io.Copy(stderr, iop.Stderr)
  142. stderr.Close()
  143. rio.wg.Done()
  144. }()
  145. return rio, nil
  146. }
  147. }