containerd.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package containerd
  2. import (
  3. "context"
  4. "io"
  5. "path/filepath"
  6. "sync"
  7. "github.com/containerd/containerd/cio"
  8. "github.com/containerd/containerd/linux/runctypes"
  9. "github.com/docker/docker/errdefs"
  10. "github.com/docker/docker/libcontainerd"
  11. "github.com/opencontainers/runtime-spec/specs-go"
  12. "github.com/pkg/errors"
  13. "github.com/sirupsen/logrus"
  14. )
  15. // PluginNamespace is the name used for the plugins namespace
  16. var PluginNamespace = "plugins.moby"
  17. // ExitHandler represents an object that is called when the exit event is received from containerd
  18. type ExitHandler interface {
  19. HandleExitEvent(id string) error
  20. }
  21. // New creates a new containerd plugin executor
  22. func New(rootDir string, remote libcontainerd.Remote, exitHandler ExitHandler) (*Executor, error) {
  23. e := &Executor{
  24. rootDir: rootDir,
  25. exitHandler: exitHandler,
  26. }
  27. client, err := remote.NewClient(PluginNamespace, e)
  28. if err != nil {
  29. return nil, errors.Wrap(err, "error creating containerd exec client")
  30. }
  31. e.client = client
  32. return e, nil
  33. }
  34. // Executor is the containerd client implementation of a plugin executor
  35. type Executor struct {
  36. rootDir string
  37. client libcontainerd.Client
  38. exitHandler ExitHandler
  39. }
  40. // Create creates a new container
  41. func (e *Executor) Create(id string, spec specs.Spec, stdout, stderr io.WriteCloser) error {
  42. opts := runctypes.RuncOptions{
  43. RuntimeRoot: filepath.Join(e.rootDir, "runtime-root"),
  44. }
  45. ctx := context.Background()
  46. err := e.client.Create(ctx, id, &spec, &opts)
  47. if err != nil {
  48. return err
  49. }
  50. _, err = e.client.Start(ctx, id, "", false, attachStreamsFunc(stdout, stderr))
  51. return err
  52. }
  53. // Restore restores a container
  54. func (e *Executor) Restore(id string, stdout, stderr io.WriteCloser) error {
  55. alive, _, err := e.client.Restore(context.Background(), id, attachStreamsFunc(stdout, stderr))
  56. if err != nil && !errdefs.IsNotFound(err) {
  57. return err
  58. }
  59. if !alive {
  60. _, _, err = e.client.DeleteTask(context.Background(), id)
  61. if err != nil && !errdefs.IsNotFound(err) {
  62. logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
  63. return err
  64. }
  65. err = e.client.Delete(context.Background(), id)
  66. if err != nil && !errdefs.IsNotFound(err) {
  67. logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
  68. return err
  69. }
  70. }
  71. return nil
  72. }
  73. // IsRunning returns if the container with the given id is running
  74. func (e *Executor) IsRunning(id string) (bool, error) {
  75. status, err := e.client.Status(context.Background(), id)
  76. return status == libcontainerd.StatusRunning, err
  77. }
  78. // Signal sends the specified signal to the container
  79. func (e *Executor) Signal(id string, signal int) error {
  80. return e.client.SignalProcess(context.Background(), id, libcontainerd.InitProcessName, signal)
  81. }
  82. // ProcessEvent handles events from containerd
  83. // All events are ignored except the exit event, which is sent of to the stored handler
  84. func (e *Executor) ProcessEvent(id string, et libcontainerd.EventType, ei libcontainerd.EventInfo) error {
  85. switch et {
  86. case libcontainerd.EventExit:
  87. // delete task and container
  88. if _, _, err := e.client.DeleteTask(context.Background(), id); err != nil {
  89. logrus.WithError(err).Errorf("failed to delete container plugin %s task from containerd", id)
  90. }
  91. if err := e.client.Delete(context.Background(), id); err != nil {
  92. logrus.WithError(err).Errorf("failed to delete container plugin %s from containerd", id)
  93. }
  94. return e.exitHandler.HandleExitEvent(ei.ContainerID)
  95. }
  96. return nil
  97. }
  98. type rio struct {
  99. cio.IO
  100. wg sync.WaitGroup
  101. }
  102. func (c *rio) Wait() {
  103. c.wg.Wait()
  104. c.IO.Wait()
  105. }
  106. func attachStreamsFunc(stdout, stderr io.WriteCloser) libcontainerd.StdioCallback {
  107. return func(iop *cio.DirectIO) (cio.IO, error) {
  108. if iop.Stdin != nil {
  109. iop.Stdin.Close()
  110. // closing stdin shouldn't be needed here, it should never be open
  111. panic("plugin stdin shouldn't have been created!")
  112. }
  113. rio := &rio{IO: iop}
  114. rio.wg.Add(2)
  115. go func() {
  116. io.Copy(stdout, iop.Stdout)
  117. stdout.Close()
  118. rio.wg.Done()
  119. }()
  120. go func() {
  121. io.Copy(stderr, iop.Stderr)
  122. stderr.Close()
  123. rio.wg.Done()
  124. }()
  125. return rio, nil
  126. }
  127. }