containerd.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package containerd // import "github.com/docker/docker/plugin/executor/containerd"
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "github.com/containerd/containerd"
  7. "github.com/containerd/containerd/cio"
  8. "github.com/docker/docker/api/types"
  9. "github.com/docker/docker/errdefs"
  10. "github.com/docker/docker/libcontainerd"
  11. libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
  12. specs "github.com/opencontainers/runtime-spec/specs-go"
  13. "github.com/pkg/errors"
  14. "github.com/sirupsen/logrus"
  15. )
  // PluginNamespace is the name of the namespace used for plugins.
  const PluginNamespace = "plugins.moby"
  // ExitHandler is implemented by objects that want to be notified when an
  // exit event is received from containerd.
  type ExitHandler interface {
  	// HandleExitEvent is invoked with the ID carried by the exit event.
  	HandleExitEvent(id string) error
  }
  22. // New creates a new containerd plugin executor
  23. func New(ctx context.Context, rootDir string, cli *containerd.Client, ns string, exitHandler ExitHandler, runtime types.Runtime) (*Executor, error) {
  24. e := &Executor{
  25. rootDir: rootDir,
  26. exitHandler: exitHandler,
  27. runtime: runtime,
  28. }
  29. client, err := libcontainerd.NewClient(ctx, cli, rootDir, ns, e)
  30. if err != nil {
  31. return nil, errors.Wrap(err, "error creating containerd exec client")
  32. }
  33. e.client = client
  34. return e, nil
  35. }
  36. // Executor is the containerd client implementation of a plugin executor
  37. type Executor struct {
  38. rootDir string
  39. client libcontainerdtypes.Client
  40. exitHandler ExitHandler
  41. runtime types.Runtime
  42. }
  43. // deleteTaskAndContainer deletes plugin task and then plugin container from containerd
  44. func deleteTaskAndContainer(ctx context.Context, cli libcontainerdtypes.Client, id string, p libcontainerdtypes.Process) {
  45. if p != nil {
  46. if _, _, err := p.Delete(ctx); err != nil && !errdefs.IsNotFound(err) {
  47. logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
  48. }
  49. } else {
  50. if _, _, err := cli.DeleteTask(ctx, id); err != nil && !errdefs.IsNotFound(err) {
  51. logrus.WithError(err).WithField("id", id).Error("failed to delete plugin task from containerd")
  52. }
  53. }
  54. if err := cli.Delete(ctx, id); err != nil && !errdefs.IsNotFound(err) {
  55. logrus.WithError(err).WithField("id", id).Error("failed to delete plugin container from containerd")
  56. }
  57. }
  58. // Create creates a new container
  59. func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
  60. ctx := context.Background()
  61. err := e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
  62. if err != nil {
  63. status, err2 := e.client.Status(ctx, id)
  64. if err2 != nil {
  65. if !errdefs.IsNotFound(err2) {
  66. logrus.WithError(err2).WithField("id", id).Warn("Received an error while attempting to read plugin status")
  67. }
  68. } else {
  69. if status != containerd.Running && status != containerd.Unknown {
  70. if err2 := e.client.Delete(ctx, id); err2 != nil && !errdefs.IsNotFound(err2) {
  71. logrus.WithError(err2).WithField("plugin", id).Error("Error cleaning up containerd container")
  72. }
  73. err = e.client.Create(ctx, id, &spec, e.runtime.Shim.Binary, e.runtime.Shim.Opts)
  74. }
  75. }
  76. if err != nil {
  77. return errors.Wrap(err, "error creating containerd container")
  78. }
  79. }
  80. _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
  81. if err != nil {
  82. deleteTaskAndContainer(ctx, e.client, id, nil)
  83. }
  84. return err
  85. }
  86. // Restore restores a container
  87. func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) (bool, error) {
  88. alive, _, p, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
  89. if err != nil && !errdefs.IsNotFound(err) {
  90. return false, err
  91. }
  92. if !alive {
  93. deleteTaskAndContainer(context.Background(), e.client, id, p)
  94. }
  95. return alive, nil
  96. }
  97. // IsRunning returns if the container with the given id is running
  98. func (e *Executor) IsRunning(id string) (bool, error) {
  99. status, err := e.client.Status(context.Background(), id)
  100. return status == containerd.Running, err
  101. }
  102. // Signal sends the specified signal to the container
  103. func (e *Executor) Signal(id string, signal int) error {
  104. return e.client.SignalProcess(context.Background(), id, libcontainerdtypes.InitProcessName, signal)
  105. }
  106. // ProcessEvent handles events from containerd
  107. // All events are ignored except the exit event, which is sent of to the stored handler
  108. func (e *Executor) ProcessEvent(id string, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) error {
  109. switch et {
  110. case libcontainerdtypes.EventExit:
  111. deleteTaskAndContainer(context.Background(), e.client, id, nil)
  112. return e.exitHandler.HandleExitEvent(ei.ContainerID)
  113. }
  114. return nil
  115. }
  116. type rio struct {
  117. cio.IO
  118. wg sync.WaitGroup
  119. }
  120. func (c *rio) Wait() {
  121. c.wg.Wait()
  122. c.IO.Wait()
  123. }
  124. func attachStreamsFunc(stdout, stderr io.WriteCloser) libcontainerdtypes.StdioCallback {
  125. return func(iop *cio.DirectIO) (cio.IO, error) {
  126. if iop.Stdin != nil {
  127. iop.Stdin.Close()
  128. // closing stdin shouldn't be needed here, it should never be open
  129. panic("plugin stdin shouldn't have been created!")
  130. }
  131. rio := &rio{IO: iop}
  132. rio.wg.Add(2)
  133. go func() {
  134. io.Copy(stdout, iop.Stdout)
  135. stdout.Close()
  136. rio.wg.Done()
  137. }()
  138. go func() {
  139. io.Copy(stderr, iop.Stderr)
  140. stderr.Close()
  141. rio.wg.Done()
  142. }()
  143. return rio, nil
  144. }
  145. }