client.go

package remote // import "github.com/docker/docker/libcontainerd/remote"

import (
	"context"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
	"github.com/containerd/containerd"
	apievents "github.com/containerd/containerd/api/events"
	"github.com/containerd/containerd/api/types"
	"github.com/containerd/containerd/archive"
	"github.com/containerd/containerd/cio"
	"github.com/containerd/containerd/content"
	containerderrors "github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/events"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/runtime/linux/runctypes"
	"github.com/containerd/typeurl"
	"github.com/docker/docker/errdefs"
	"github.com/docker/docker/libcontainerd/queue"
	libcontainerdtypes "github.com/docker/docker/libcontainerd/types"
	"github.com/docker/docker/pkg/ioutils"
	v1 "github.com/opencontainers/image-spec/specs-go/v1"
	specs "github.com/opencontainers/runtime-spec/specs-go"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// DockerContainerBundlePath is the label key pointing to the container's bundle path
const DockerContainerBundlePath = "com.docker/engine.bundle.path"
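
// client is the libcontainerd client backed by a remote containerd daemon.
// The oom map records containers that have received an OOM event; it is
// guarded by oomMu.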
type client struct {
	client   *containerd.Client
	stateDir string
	logger   *logrus.Entry
	ns       string
	backend  libcontainerdtypes.Backend
	eventQ   queue.Queue
	oomMu    sync.Mutex
	oom      map[string]bool
}

// NewClient creates a new libcontainerd client from a containerd client
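//
// A minimal usage sketch (illustrative only: the socket address, state
// directory, and namespace below are assumptions, and backend is any
// libcontainerdtypes.Backend implementation):
//
//	cli, err := containerd.New("/run/containerd/containerd.sock")
//	if err != nil {
//		return err
//	}
//	c, err := NewClient(ctx, cli, "/var/run/docker/libcontainerd", "moby", backend)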
func NewClient(ctx context.Context, cli *containerd.Client, stateDir, ns string, b libcontainerdtypes.Backend) (libcontainerdtypes.Client, error) {
	c := &client{
		client:   cli,
		stateDir: stateDir,
		logger:   logrus.WithField("module", "libcontainerd").WithField("namespace", ns),
		ns:       ns,
		backend:  b,
		oom:      make(map[string]bool),
	}
	go c.processEventStream(ctx, ns)
	return c, nil
}

func (c *client) Version(ctx context.Context) (containerd.Version, error) {
	return c.client.Version(ctx)
}

// Restore loads the containerd container.
// It should not be called concurrently with any other operation for the given ID.
func (c *client) Restore(ctx context.Context, id string, attachStdio libcontainerdtypes.StdioCallback) (alive bool, pid int, p libcontainerdtypes.Process, err error) {
	var dio *cio.DirectIO
	defer func() {
		if err != nil && dio != nil {
			dio.Cancel()
			dio.Close()
		}
		err = wrapError(err)
	}()
	ctr, err := c.client.LoadContainer(ctx, id)
	if err != nil {
		return false, -1, nil, errors.WithStack(wrapError(err))
	}
	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
		// dio must be assigned to the previously defined dio for the defer above
		// to handle cleanup
		dio, err = c.newDirectIO(ctx, fifos)
		if err != nil {
			return nil, err
		}
		return attachStdio(dio)
	}
	t, err := ctr.Task(ctx, attachIO)
	if err != nil && !containerderrors.IsNotFound(err) {
		return false, -1, nil, errors.Wrap(wrapError(err), "error getting containerd task for container")
	}
	if t != nil {
		s, err := t.Status(ctx)
		if err != nil {
			return false, -1, nil, errors.Wrap(wrapError(err), "error getting task status")
		}
		alive = s.Status != containerd.Stopped
		pid = int(t.Pid())
	}
	c.logger.WithFields(logrus.Fields{
		"container": id,
		"alive":     alive,
		"pid":       pid,
	}).Debug("restored container")
	return alive, pid, &restoredProcess{
		p: t,
	}, nil
}
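
// Create creates a new containerd container for the given id from the OCI
// spec. The bundle directory is recorded on the container (via the WithBundle
// option defined elsewhere in this package) so that later calls can look it up
// through the DockerContainerBundlePath label.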
func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}) error {
	bdir := c.bundleDir(id)
	c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
	_, err := c.client.NewContainer(ctx, id,
		containerd.WithSpec(ociSpec),
		containerd.WithRuntime(runtimeName, runtimeOptions),
		WithBundle(bdir, ociSpec),
	)
	if err != nil {
		if containerderrors.IsAlreadyExists(err) {
			return errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return wrapError(err)
	}
	return nil
}

// Start creates and starts a task for the specified containerd id
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
	ctr, err := c.getContainer(ctx, id)
	if err != nil {
		return -1, err
	}
	var (
		cp             *types.Descriptor
		t              containerd.Task
		rio            cio.IO
		stdinCloseSync = make(chan struct{})
	)
	if checkpointDir != "" {
		// write checkpoint to the content store
		tar := archive.Diff(ctx, "", checkpointDir)
		cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
		// remove the checkpoint when we're done
		defer func() {
			if cp != nil {
				err := c.client.ContentStore().Delete(context.Background(), cp.Digest)
				if err != nil {
					c.logger.WithError(err).WithFields(logrus.Fields{
						"ref":    checkpointDir,
						"digest": cp.Digest,
					}).Warnf("failed to delete temporary checkpoint entry")
				}
			}
		}()
		if err := tar.Close(); err != nil {
			return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
		}
		if err != nil {
			return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd")
		}
	}
	spec, err := ctr.Spec(ctx)
	if err != nil {
		return -1, errors.Wrap(err, "failed to retrieve spec")
	}
	labels, err := ctr.Labels(ctx)
	if err != nil {
		return -1, errors.Wrap(err, "failed to retrieve labels")
	}
	bundle := labels[DockerContainerBundlePath]
	uid, gid := getSpecUser(spec)
	t, err = ctr.NewTask(ctx,
		func(id string) (cio.IO, error) {
			fifos := newFIFOSet(bundle, libcontainerdtypes.InitProcessName, withStdin, spec.Process.Terminal)
			rio, err = c.createIO(fifos, id, libcontainerdtypes.InitProcessName, stdinCloseSync, attachStdio)
			return rio, err
		},
		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			info.Checkpoint = cp
			if runtime.GOOS != "windows" {
				info.Options = &runctypes.CreateOptions{
					IoUid:       uint32(uid),
					IoGid:       uint32(gid),
					NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "",
				}
			} else {
				// Make sure we set the runhcs options to debug if we are at debug level.
				if c.logger.Level == logrus.DebugLevel {
					info.Options = &options.Options{Debug: true}
				}
			}
			return nil
		})
	if err != nil {
		close(stdinCloseSync)
		if rio != nil {
			rio.Cancel()
			rio.Close()
		}
		return -1, wrapError(err)
	}
	// Signal c.createIO that it can call CloseIO
	close(stdinCloseSync)
	if err := t.Start(ctx); err != nil {
		if _, err := t.Delete(ctx); err != nil {
			c.logger.WithError(err).WithField("container", id).
				Error("failed to delete task after failed start")
		}
		return -1, wrapError(err)
	}
	return int(t.Pid()), nil
}

// Exec creates an exec process.
//
// The containerd client calls Exec to register the exec config on the shim side.
// When the client calls Start, the shim will create the stdin fifo if needed. But
// for the container's main process, the stdin fifo is created in Create, not in
// the Start call. The stdinCloseSync channel should be closed after the exec
// process has been started.
func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio libcontainerdtypes.StdioCallback) (int, error) {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return -1, err
	}
	t, err := ctr.Task(ctx, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return -1, errors.WithStack(errdefs.InvalidParameter(errors.New("container is not running")))
		}
		return -1, wrapError(err)
	}
	var (
		p              containerd.Process
		rio            cio.IO
		stdinCloseSync = make(chan struct{})
	)
	labels, err := ctr.Labels(ctx)
	if err != nil {
		return -1, wrapError(err)
	}
	fifos := newFIFOSet(labels[DockerContainerBundlePath], processID, withStdin, spec.Terminal)
	defer func() {
		if err != nil {
			if rio != nil {
				rio.Cancel()
				rio.Close()
			}
		}
	}()
	p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
		rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
		return rio, err
	})
	if err != nil {
		close(stdinCloseSync)
		if containerderrors.IsAlreadyExists(err) {
			return -1, errors.WithStack(errdefs.Conflict(errors.New("id already in use")))
		}
		return -1, wrapError(err)
	}
	// Signal c.createIO that it can call CloseIO
	//
	// the stdin of the exec process will be created after p.Start in containerd
	defer close(stdinCloseSync)
	if err = p.Start(ctx); err != nil {
		// use a new context for cleanup because the old one may be cancelled by the
		// user, but leave a timeout to make sure we are not waiting forever if
		// containerd is unresponsive or to work around fifo-cancelling issues in
		// older containerd-shim
		ctx, cancel := context.WithTimeout(context.Background(), 45*time.Second)
		defer cancel()
		p.Delete(ctx)
		return -1, wrapError(err)
	}
	return int(p.Pid()), nil
}

func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}
	return wrapError(p.Kill(ctx, syscall.Signal(signal)))
}

func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}
	return p.Resize(ctx, uint32(width), uint32(height))
}

func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
	p, err := c.getProcess(ctx, containerID, processID)
	if err != nil {
		return err
	}
	return p.CloseIO(ctx, containerd.WithStdinCloser)
}

func (c *client) Pause(ctx context.Context, containerID string) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}
	return wrapError(p.(containerd.Task).Pause(ctx))
}

func (c *client) Resume(ctx context.Context, containerID string) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}
	return p.(containerd.Task).Resume(ctx)
}
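
// Stats returns the metrics of the container's init task, decoding the
// typeurl-encoded payload returned by containerd.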
func (c *client) Stats(ctx context.Context, containerID string) (*libcontainerdtypes.Stats, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}
	m, err := p.(containerd.Task).Metrics(ctx)
	if err != nil {
		return nil, err
	}
	v, err := typeurl.UnmarshalAny(m.Data)
	if err != nil {
		return nil, err
	}
	return libcontainerdtypes.InterfaceToStats(m.Timestamp, v), nil
}
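
// ListPids returns the PIDs of all processes inside the container.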
func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}
	pis, err := p.(containerd.Task).Pids(ctx)
	if err != nil {
		return nil, err
	}
	var pids []uint32
	for _, i := range pis {
		pids = append(pids, i.Pid)
	}
	return pids, nil
}
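
// Summary returns per-process details for the container, decoded from the
// platform-specific info attached to each PID by containerd.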
func (c *client) Summary(ctx context.Context, containerID string) ([]libcontainerdtypes.Summary, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return nil, err
	}
	pis, err := p.(containerd.Task).Pids(ctx)
	if err != nil {
		return nil, err
	}
	var infos []libcontainerdtypes.Summary
	for _, pi := range pis {
		i, err := typeurl.UnmarshalAny(pi.Info)
		if err != nil {
			return nil, errors.Wrap(err, "unable to decode process details")
		}
		s, err := summaryFromInterface(i)
		if err != nil {
			return nil, err
		}
		infos = append(infos, *s)
	}
	return infos, nil
}
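
// restoredProcess wraps a task recovered by Restore. Delete tolerates a nil
// or already-deleted task and reports exit code 255 in that case.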
type restoredProcess struct {
	p containerd.Process
}

func (p *restoredProcess) Delete(ctx context.Context) (uint32, time.Time, error) {
	if p.p == nil {
		return 255, time.Now(), nil
	}
	status, err := p.p.Delete(ctx)
	if err != nil {
		return 255, time.Now(), nil
	}
	return status.ExitCode(), status.ExitTime(), nil
}
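
// DeleteTask deletes the container's init task and returns its exit code and
// exit time. Errors are swallowed and reported as exit code 255.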
func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return 255, time.Now(), nil
	}
	status, err := p.Delete(ctx)
	if err != nil {
		return 255, time.Now(), nil
	}
	return status.ExitCode(), status.ExitTime(), nil
}
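
// Delete removes the containerd container, clears the recorded OOM state and
// removes the bundle directory unless LIBCONTAINERD_NOCLEAN=1 is set.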
func (c *client) Delete(ctx context.Context, containerID string) error {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return err
	}
	labels, err := ctr.Labels(ctx)
	if err != nil {
		return err
	}
	bundle := labels[DockerContainerBundlePath]
	if err := ctr.Delete(ctx); err != nil {
		return wrapError(err)
	}
	c.oomMu.Lock()
	delete(c.oom, containerID)
	c.oomMu.Unlock()
	if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
		if err := os.RemoveAll(bundle); err != nil {
			c.logger.WithError(err).WithFields(logrus.Fields{
				"container": containerID,
				"bundle":    bundle,
			}).Error("failed to remove state dir")
		}
	}
	return nil
}
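
// Status returns the status of the container's init task.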
func (c *client) Status(ctx context.Context, containerID string) (containerd.ProcessStatus, error) {
	t, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return containerd.Unknown, err
	}
	s, err := t.Status(ctx)
	if err != nil {
		return containerd.Unknown, wrapError(err)
	}
	return s.Status, nil
}
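
// CreateCheckpoint checkpoints the container's init task and extracts the
// checkpoint data into checkpointDir. The intermediate checkpoint image is
// always deleted from containerd afterwards.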
func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
	p, err := c.getProcess(ctx, containerID, libcontainerdtypes.InitProcessName)
	if err != nil {
		return err
	}
	opts := []containerd.CheckpointTaskOpts{}
	if exit {
		opts = append(opts, func(r *containerd.CheckpointTaskInfo) error {
			if r.Options == nil {
				r.Options = &runctypes.CheckpointOptions{
					Exit: true,
				}
			} else {
				opts, _ := r.Options.(*runctypes.CheckpointOptions)
				opts.Exit = true
			}
			return nil
		})
	}
	img, err := p.(containerd.Task).Checkpoint(ctx, opts...)
	if err != nil {
		return wrapError(err)
	}
	// Whatever happens, delete the checkpoint from containerd
	defer func() {
		err := c.client.ImageService().Delete(context.Background(), img.Name())
		if err != nil {
			c.logger.WithError(err).WithField("digest", img.Target().Digest).
				Warnf("failed to delete checkpoint image")
		}
	}()
	b, err := content.ReadBlob(ctx, c.client.ContentStore(), img.Target())
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
	}
	var index v1.Index
	if err := json.Unmarshal(b, &index); err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
	}
	var cpDesc *v1.Descriptor
	for _, m := range index.Manifests {
		if m.MediaType == images.MediaTypeContainerd1Checkpoint {
			cpDesc = &m
			break
		}
	}
	if cpDesc == nil {
		// err is nil at this point; wrapping it would silently return success
		// for an invalid checkpoint, so construct a fresh error instead.
		return errdefs.System(errors.New("invalid checkpoint"))
	}
	rat, err := c.client.ContentStore().ReaderAt(ctx, *cpDesc)
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
	}
	defer rat.Close()
	_, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
	if err != nil {
		return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
	}
	return err
}
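
// getContainer loads the containerd container for id, translating a
// containerd "not found" error into a docker errdefs.NotFound error.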
func (c *client) getContainer(ctx context.Context, id string) (containerd.Container, error) {
	ctr, err := c.client.LoadContainer(ctx, id)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such container")))
		}
		return nil, wrapError(err)
	}
	return ctr, nil
}
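
// getProcess returns the containerd task for the init process or the exec
// process identified by processID.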
func (c *client) getProcess(ctx context.Context, containerID, processID string) (containerd.Process, error) {
	ctr, err := c.getContainer(ctx, containerID)
	if err != nil {
		return nil, err
	}
	t, err := ctr.Task(ctx, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("container is not running")))
		}
		return nil, wrapError(err)
	}
	if processID == libcontainerdtypes.InitProcessName {
		return t, nil
	}
	p, err := t.LoadProcess(ctx, processID, nil)
	if err != nil {
		if containerderrors.IsNotFound(err) {
			return nil, errors.WithStack(errdefs.NotFound(errors.New("no such exec")))
		}
		return nil, wrapError(err)
	}
	return p, nil
}

// createIO creates the io to be used by a process.
// This needs to get a pointer to interface as upon closure the process may not have yet been registered.
func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio libcontainerdtypes.StdioCallback) (cio.IO, error) {
	var (
		io  *cio.DirectIO
		err error
	)
	io, err = c.newDirectIO(context.Background(), fifos)
	if err != nil {
		return nil, err
	}
	if io.Stdin != nil {
		var (
			err       error
			stdinOnce sync.Once
		)
		pipe := io.Stdin
		io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
			stdinOnce.Do(func() {
				err = pipe.Close()
				// Do the rest in a new goroutine to avoid a deadlock if the
				// Exec/Start call failed.
				go func() {
					<-stdinCloseSync
					p, err := c.getProcess(context.Background(), containerID, processID)
					if err == nil {
						err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
						if err != nil && strings.Contains(err.Error(), "transport is closing") {
							err = nil
						}
					}
				}()
			})
			return err
		})
	}
	rio, err := attachStdio(io)
	if err != nil {
		io.Cancel()
		io.Close()
	}
	return rio, err
}
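
// processEvent queues the event for delivery to the backend. For the exit of
// an exec process it also deletes the process and closes its fifos.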
func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventType, ei libcontainerdtypes.EventInfo) {
	c.eventQ.Append(ei.ContainerID, func() {
		err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
		if err != nil {
			c.logger.WithError(err).WithFields(logrus.Fields{
				"container":  ei.ContainerID,
				"event":      et,
				"event-info": ei,
			}).Error("failed to process event")
		}
		if et == libcontainerdtypes.EventExit && ei.ProcessID != ei.ContainerID {
			p, err := c.getProcess(ctx, ei.ContainerID, ei.ProcessID)
			if err != nil {
				c.logger.WithError(errors.New("no such process")).
					WithFields(logrus.Fields{
						"error":     err,
						"container": ei.ContainerID,
						"process":   ei.ProcessID,
					}).Error("exit event")
				return
			}
			_, err = p.Delete(context.Background())
			if err != nil {
				c.logger.WithError(err).WithFields(logrus.Fields{
					"container": ei.ContainerID,
					"process":   ei.ProcessID,
				}).Warn("failed to delete process")
			}
			ctr, err := c.getContainer(ctx, ei.ContainerID)
			if err != nil {
				c.logger.WithFields(logrus.Fields{
					"container": ei.ContainerID,
					"error":     err,
				}).Error("failed to find container")
			} else {
				labels, err := ctr.Labels(ctx)
				if err != nil {
					c.logger.WithFields(logrus.Fields{
						"container": ei.ContainerID,
						"error":     err,
					}).Error("failed to find container")
					return
				}
				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
			}
		}
	})
}
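
// processEventStream subscribes to the containerd task events for the given
// namespace and translates them into libcontainerd events. It resubscribes
// itself if the stream fails with anything other than a cancellation.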
func (c *client) processEventStream(ctx context.Context, ns string) {
	var (
		err error
		ev  *events.Envelope
		et  libcontainerdtypes.EventType
		ei  libcontainerdtypes.EventInfo
	)
	// Filter on both namespace *and* topic. To create an "and" filter,
	// this must be a single, comma-separated string
	eventStream, errC := c.client.EventService().Subscribe(ctx, "namespace=="+ns+",topic~=|^/tasks/|")
	c.logger.Debug("processing event stream")
	for {
		var oomKilled bool
		select {
		case err = <-errC:
			if err != nil {
				errStatus, ok := status.FromError(err)
				if !ok || errStatus.Code() != codes.Canceled {
					c.logger.WithError(err).Error("failed to get event")
					go c.processEventStream(ctx, ns)
				} else {
					c.logger.WithError(ctx.Err()).Info("stopping event stream following graceful shutdown")
				}
			}
			return
		case ev = <-eventStream:
			if ev.Event == nil {
				c.logger.WithField("event", ev).Warn("invalid event")
				continue
			}
			v, err := typeurl.UnmarshalAny(ev.Event)
			if err != nil {
				c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
				continue
			}
			c.logger.WithField("topic", ev.Topic).Debug("event")
			switch t := v.(type) {
			case *apievents.TaskCreate:
				et = libcontainerdtypes.EventCreate
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				}
			case *apievents.TaskStart:
				et = libcontainerdtypes.EventStart
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ContainerID,
					Pid:         t.Pid,
				}
			case *apievents.TaskExit:
				et = libcontainerdtypes.EventExit
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ID,
					Pid:         t.Pid,
					ExitCode:    t.ExitStatus,
					ExitedAt:    t.ExitedAt,
				}
			case *apievents.TaskOOM:
				et = libcontainerdtypes.EventOOM
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					OOMKilled:   true,
				}
				oomKilled = true
			case *apievents.TaskExecAdded:
				et = libcontainerdtypes.EventExecAdded
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
				}
			case *apievents.TaskExecStarted:
				et = libcontainerdtypes.EventExecStarted
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
					ProcessID:   t.ExecID,
					Pid:         t.Pid,
				}
			case *apievents.TaskPaused:
				et = libcontainerdtypes.EventPaused
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				}
			case *apievents.TaskResumed:
				et = libcontainerdtypes.EventResumed
				ei = libcontainerdtypes.EventInfo{
					ContainerID: t.ContainerID,
				}
			default:
				c.logger.WithFields(logrus.Fields{
					"topic": ev.Topic,
					"type":  reflect.TypeOf(t),
				}).Info("ignoring event")
				continue
			}
			c.oomMu.Lock()
			if oomKilled {
				c.oom[ei.ContainerID] = true
			}
			ei.OOMKilled = c.oom[ei.ContainerID]
			c.oomMu.Unlock()
			c.processEvent(ctx, et, ei)
		}
	}
}
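
// writeContent streams r into the containerd content store under ref and
// returns a descriptor for the written blob.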
func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
	writer, err := c.client.ContentStore().Writer(ctx, content.WithRef(ref))
	if err != nil {
		return nil, err
	}
	defer writer.Close()
	size, err := io.Copy(writer, r)
	if err != nil {
		return nil, err
	}
	labels := map[string]string{
		"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
	}
	if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
		return nil, err
	}
	return &types.Descriptor{
		MediaType: mediaType,
		Digest:    writer.Digest(),
		Size_:     size,
	}, nil
}

func (c *client) bundleDir(id string) string {
	return filepath.Join(c.stateDir, id)
}
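
// wrapError maps containerd and known "not found" errors onto docker's
// errdefs types so callers can classify them.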
func wrapError(err error) error {
	switch {
	case err == nil:
		return nil
	case containerderrors.IsNotFound(err):
		return errdefs.NotFound(err)
	}
	msg := err.Error()
	for _, s := range []string{"container does not exist", "not found", "no such container"} {
		if strings.Contains(msg, s) {
			return errdefs.NotFound(err)
		}
	}
	return err
}