client_daemon.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877
  1. // +build !windows
  2. package libcontainerd // import "github.com/docker/docker/libcontainerd"
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "reflect"
  11. "runtime"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/codes"
  18. "google.golang.org/grpc/status"
  19. "github.com/containerd/containerd"
  20. "github.com/containerd/containerd/api/events"
  21. eventsapi "github.com/containerd/containerd/api/services/events/v1"
  22. "github.com/containerd/containerd/api/types"
  23. "github.com/containerd/containerd/archive"
  24. "github.com/containerd/containerd/cio"
  25. "github.com/containerd/containerd/content"
  26. containerderrors "github.com/containerd/containerd/errdefs"
  27. "github.com/containerd/containerd/images"
  28. "github.com/containerd/containerd/linux/runctypes"
  29. "github.com/containerd/typeurl"
  30. "github.com/docker/docker/errdefs"
  31. "github.com/docker/docker/pkg/ioutils"
  32. "github.com/opencontainers/image-spec/specs-go/v1"
  33. specs "github.com/opencontainers/runtime-spec/specs-go"
  34. "github.com/pkg/errors"
  35. "github.com/sirupsen/logrus"
  36. )
  37. // InitProcessName is the name given to the first process of a
  38. // container
  39. const InitProcessName = "init"
// container is this client's record of a single containerd container.
// mu guards task, execs and oomKilled; access those through the accessor
// methods below.
type container struct {
	// mu guards task, execs and oomKilled.
	mu sync.Mutex

	// bundleDir is the container's directory under the client's state
	// directory; it holds the bundle (prepareBundleDir) and the FIFOs created
	// by newFIFOSet for the init and exec processes.
	bundleDir string
	// ctr is the containerd handle for the container.
	ctr containerd.Container
	// task is the container's init task, or nil if none has been created or
	// it has been deleted.
	task containerd.Task
	// execs maps exec process IDs to their containerd processes; it is
	// allocated lazily by addProcess.
	execs map[string]containerd.Process
	// oomKilled records that an OOM event has been seen for this container.
	oomKilled bool
}
  48. func (c *container) setTask(t containerd.Task) {
  49. c.mu.Lock()
  50. c.task = t
  51. c.mu.Unlock()
  52. }
  53. func (c *container) getTask() containerd.Task {
  54. c.mu.Lock()
  55. t := c.task
  56. c.mu.Unlock()
  57. return t
  58. }
  59. func (c *container) addProcess(id string, p containerd.Process) {
  60. c.mu.Lock()
  61. if c.execs == nil {
  62. c.execs = make(map[string]containerd.Process)
  63. }
  64. c.execs[id] = p
  65. c.mu.Unlock()
  66. }
  67. func (c *container) deleteProcess(id string) {
  68. c.mu.Lock()
  69. delete(c.execs, id)
  70. c.mu.Unlock()
  71. }
  72. func (c *container) getProcess(id string) containerd.Process {
  73. c.mu.Lock()
  74. p := c.execs[id]
  75. c.mu.Unlock()
  76. return p
  77. }
  78. func (c *container) setOOMKilled(killed bool) {
  79. c.mu.Lock()
  80. c.oomKilled = killed
  81. c.mu.Unlock()
  82. }
  83. func (c *container) getOOMKilled() bool {
  84. c.mu.Lock()
  85. killed := c.oomKilled
  86. c.mu.Unlock()
  87. return killed
  88. }
// client is the containerd-backed libcontainerd client. It talks to
// containerd through remote and keeps a local record of the containers it
// manages.
type client struct {
	// The embedded lock guards the containers map; setRemote/getRemote also
	// use it for remote.
	sync.RWMutex // protects containers map

	// remote is the containerd connection. Read it via getRemote; Restore
	// reads it directly because it already holds the write lock.
	remote *containerd.Client
	// stateDir is the parent of the per-container bundle directories
	// (filepath.Join(stateDir, id)).
	stateDir string
	// logger is used for this client's diagnostics.
	logger *logrus.Entry
	// namespace is the containerd namespace whose task events are
	// subscribed to in processEventStream.
	namespace string
	// backend receives the translated events (see processEvent).
	backend Backend
	// eventQ queues event handlers keyed by container ID.
	eventQ queue
	// containers holds the tracked containers, keyed by container ID.
	containers map[string]*container
}
  99. func (c *client) setRemote(remote *containerd.Client) {
  100. c.Lock()
  101. c.remote = remote
  102. c.Unlock()
  103. }
  104. func (c *client) getRemote() *containerd.Client {
  105. c.RLock()
  106. remote := c.remote
  107. c.RUnlock()
  108. return remote
  109. }
  110. func (c *client) Version(ctx context.Context) (containerd.Version, error) {
  111. return c.getRemote().Version(ctx)
  112. }
// Restore loads the existing containerd container id and starts tracking it
// in c.containers.
//
// If containerd has a task for the container, its stdio is re-attached via
// attachStdio, and alive/pid report the task's state. A missing task
// (containerd NotFound) is not an error: the container is still registered,
// alive is false and pid is 0. On failure pid is -1 and any DirectIO opened
// by the attach callback is cancelled and closed. All returned errors pass
// through wrapError.
func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallback) (alive bool, pid int, err error) {
	// The write lock is held for the whole restore. c.remote is therefore
	// read directly below: getRemote would RLock a mutex this goroutine
	// already holds for writing, and sync.RWMutex is not reentrant.
	c.Lock()
	defer c.Unlock()

	var dio *cio.DirectIO
	// Runs after the named results are set by return, so it sees the final
	// err and can release the IO opened by attachIO.
	defer func() {
		if err != nil && dio != nil {
			dio.Cancel()
			dio.Close()
		}
		err = wrapError(err)
	}()

	ctr, err := c.remote.LoadContainer(ctx, id)
	if err != nil {
		return false, -1, errors.WithStack(err)
	}

	attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
		// dio must be assigned to the previously defined dio for the defer above
		// to handle cleanup
		dio, err = cio.NewDirectIO(ctx, fifos)
		if err != nil {
			return nil, err
		}
		return attachStdio(dio)
	}
	// NotFound means the container has no task; it is restored as not alive.
	t, err := ctr.Task(ctx, attachIO)
	if err != nil && !containerderrors.IsNotFound(err) {
		return false, -1, err
	}

	if t != nil {
		s, err := t.Status(ctx)
		if err != nil {
			return false, -1, err
		}

		alive = s.Status != containerd.Stopped
		pid = int(t.Pid())
	}

	c.containers[id] = &container{
		bundleDir: filepath.Join(c.stateDir, id),
		ctr:       ctr,
		task:      t,
		// TODO(mlaventure): load execs
	}

	c.logger.WithFields(logrus.Fields{
		"container": id,
		"alive":     alive,
		"pid":       pid,
	}).Debug("restored container")

	return alive, pid, nil
}
  162. func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}) error {
  163. if ctr := c.getContainer(id); ctr != nil {
  164. return errors.WithStack(newConflictError("id already in use"))
  165. }
  166. bdir, err := prepareBundleDir(filepath.Join(c.stateDir, id), ociSpec)
  167. if err != nil {
  168. return errdefs.System(errors.Wrap(err, "prepare bundle dir failed"))
  169. }
  170. c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
  171. cdCtr, err := c.getRemote().NewContainer(ctx, id,
  172. containerd.WithSpec(ociSpec),
  173. // TODO(mlaventure): when containerd support lcow, revisit runtime value
  174. containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
  175. if err != nil {
  176. return err
  177. }
  178. c.Lock()
  179. c.containers[id] = &container{
  180. bundleDir: bdir,
  181. ctr: cdCtr,
  182. }
  183. c.Unlock()
  184. return nil
  185. }
// Start creates and starts a task for the specified containerd id and
// returns the pid of the started task.
//
// If checkpointDir is non-empty, its contents are written to the containerd
// content store and handed to the new task as its checkpoint; that temporary
// content entry is deleted again when Start returns. withStdin and the
// spec's terminal setting are passed to newFIFOSet to build the init
// process' FIFOs, which createIO/attachStdio then wire up. If the task fails
// to start it is deleted and the container's task is reset to nil.
func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio StdioCallback) (int, error) {
	ctr := c.getContainer(id)
	if ctr == nil {
		return -1, errors.WithStack(newNotFoundError("no such container"))
	}
	if t := ctr.getTask(); t != nil {
		return -1, errors.WithStack(newConflictError("container already started"))
	}

	var (
		cp             *types.Descriptor
		t              containerd.Task
		rio            cio.IO
		err            error
		stdinCloseSync = make(chan struct{})
	)

	if checkpointDir != "" {
		// write checkpoint to the content store
		tar := archive.Diff(ctx, "", checkpointDir)
		cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
		// remove the checkpoint when we're done
		defer func() {
			if cp != nil {
				err := c.getRemote().ContentStore().Delete(context.Background(), cp.Digest)
				if err != nil {
					c.logger.WithError(err).WithFields(logrus.Fields{
						"ref":    checkpointDir,
						"digest": cp.Digest,
					}).Warnf("failed to delete temporary checkpoint entry")
				}
			}
		}()
		// The tar stream is closed before the upload error is checked, so it
		// is released whether or not writeContent succeeded. The err checked
		// afterwards is the outer one set by writeContent.
		if err := tar.Close(); err != nil {
			return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
		}
		if err != nil {
			return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd")
		}
	}

	spec, err := ctr.ctr.Spec(ctx)
	if err != nil {
		return -1, errors.Wrap(err, "failed to retrieve spec")
	}
	uid, gid := getSpecUser(spec)
	t, err = ctr.ctr.NewTask(ctx,
		// IO creator: the id parameter here is whatever containerd passes to
		// the callback and shadows Start's id. rio is captured so the error
		// path below can release it.
		func(id string) (cio.IO, error) {
			fifos := newFIFOSet(ctr.bundleDir, InitProcessName, withStdin, spec.Process.Terminal)
			rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio)
			return rio, err
		},
		// Task options: checkpoint (may be nil) and runc create options.
		func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
			info.Checkpoint = cp
			info.Options = &runctypes.CreateOptions{
				IoUid:       uint32(uid),
				IoGid:       uint32(gid),
				NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "",
			}
			return nil
		})
	if err != nil {
		close(stdinCloseSync)
		if rio != nil {
			rio.Cancel()
			rio.Close()
		}
		return -1, err
	}

	// The task must be registered before stdinCloseSync is closed, so that
	// the goroutine in createIO can find it through getProcess.
	ctr.setTask(t)

	// Signal c.createIO that it can call CloseIO
	close(stdinCloseSync)

	if err := t.Start(ctx); err != nil {
		if _, err := t.Delete(ctx); err != nil {
			c.logger.WithError(err).WithField("container", id).
				Error("failed to delete task after fail start")
		}
		ctr.setTask(nil)
		return -1, err
	}

	return int(t.Pid()), nil
}
  266. func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
  267. ctr := c.getContainer(containerID)
  268. if ctr == nil {
  269. return -1, errors.WithStack(newNotFoundError("no such container"))
  270. }
  271. t := ctr.getTask()
  272. if t == nil {
  273. return -1, errors.WithStack(newInvalidParameterError("container is not running"))
  274. }
  275. if p := ctr.getProcess(processID); p != nil {
  276. return -1, errors.WithStack(newConflictError("id already in use"))
  277. }
  278. var (
  279. p containerd.Process
  280. rio cio.IO
  281. err error
  282. stdinCloseSync = make(chan struct{})
  283. )
  284. fifos := newFIFOSet(ctr.bundleDir, processID, withStdin, spec.Terminal)
  285. defer func() {
  286. if err != nil {
  287. if rio != nil {
  288. rio.Cancel()
  289. rio.Close()
  290. }
  291. }
  292. }()
  293. p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
  294. rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
  295. return rio, err
  296. })
  297. if err != nil {
  298. close(stdinCloseSync)
  299. return -1, err
  300. }
  301. ctr.addProcess(processID, p)
  302. // Signal c.createIO that it can call CloseIO
  303. close(stdinCloseSync)
  304. if err = p.Start(ctx); err != nil {
  305. p.Delete(context.Background())
  306. ctr.deleteProcess(processID)
  307. return -1, err
  308. }
  309. return int(p.Pid()), nil
  310. }
  311. func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
  312. p, err := c.getProcess(containerID, processID)
  313. if err != nil {
  314. return err
  315. }
  316. return wrapError(p.Kill(ctx, syscall.Signal(signal)))
  317. }
  318. func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
  319. p, err := c.getProcess(containerID, processID)
  320. if err != nil {
  321. return err
  322. }
  323. return p.Resize(ctx, uint32(width), uint32(height))
  324. }
  325. func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
  326. p, err := c.getProcess(containerID, processID)
  327. if err != nil {
  328. return err
  329. }
  330. return p.CloseIO(ctx, containerd.WithStdinCloser)
  331. }
  332. func (c *client) Pause(ctx context.Context, containerID string) error {
  333. p, err := c.getProcess(containerID, InitProcessName)
  334. if err != nil {
  335. return err
  336. }
  337. return p.(containerd.Task).Pause(ctx)
  338. }
  339. func (c *client) Resume(ctx context.Context, containerID string) error {
  340. p, err := c.getProcess(containerID, InitProcessName)
  341. if err != nil {
  342. return err
  343. }
  344. return p.(containerd.Task).Resume(ctx)
  345. }
  346. func (c *client) Stats(ctx context.Context, containerID string) (*Stats, error) {
  347. p, err := c.getProcess(containerID, InitProcessName)
  348. if err != nil {
  349. return nil, err
  350. }
  351. m, err := p.(containerd.Task).Metrics(ctx)
  352. if err != nil {
  353. return nil, err
  354. }
  355. v, err := typeurl.UnmarshalAny(m.Data)
  356. if err != nil {
  357. return nil, err
  358. }
  359. return interfaceToStats(m.Timestamp, v), nil
  360. }
  361. func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
  362. p, err := c.getProcess(containerID, InitProcessName)
  363. if err != nil {
  364. return nil, err
  365. }
  366. pis, err := p.(containerd.Task).Pids(ctx)
  367. if err != nil {
  368. return nil, err
  369. }
  370. var pids []uint32
  371. for _, i := range pis {
  372. pids = append(pids, i.Pid)
  373. }
  374. return pids, nil
  375. }
  376. func (c *client) Summary(ctx context.Context, containerID string) ([]Summary, error) {
  377. p, err := c.getProcess(containerID, InitProcessName)
  378. if err != nil {
  379. return nil, err
  380. }
  381. pis, err := p.(containerd.Task).Pids(ctx)
  382. if err != nil {
  383. return nil, err
  384. }
  385. var infos []Summary
  386. for _, pi := range pis {
  387. i, err := typeurl.UnmarshalAny(pi.Info)
  388. if err != nil {
  389. return nil, errors.Wrap(err, "unable to decode process details")
  390. }
  391. s, err := summaryFromInterface(i)
  392. if err != nil {
  393. return nil, err
  394. }
  395. infos = append(infos, *s)
  396. }
  397. return infos, nil
  398. }
  399. func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
  400. p, err := c.getProcess(containerID, InitProcessName)
  401. if err != nil {
  402. return 255, time.Now(), nil
  403. }
  404. status, err := p.(containerd.Task).Delete(ctx)
  405. if err != nil {
  406. return 255, time.Now(), nil
  407. }
  408. if ctr := c.getContainer(containerID); ctr != nil {
  409. ctr.setTask(nil)
  410. }
  411. return status.ExitCode(), status.ExitTime(), nil
  412. }
  413. func (c *client) Delete(ctx context.Context, containerID string) error {
  414. ctr := c.getContainer(containerID)
  415. if ctr == nil {
  416. return errors.WithStack(newNotFoundError("no such container"))
  417. }
  418. if err := ctr.ctr.Delete(ctx); err != nil {
  419. return err
  420. }
  421. if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
  422. if err := os.RemoveAll(ctr.bundleDir); err != nil {
  423. c.logger.WithError(err).WithFields(logrus.Fields{
  424. "container": containerID,
  425. "bundle": ctr.bundleDir,
  426. }).Error("failed to remove state dir")
  427. }
  428. }
  429. c.removeContainer(containerID)
  430. return nil
  431. }
  432. func (c *client) Status(ctx context.Context, containerID string) (Status, error) {
  433. ctr := c.getContainer(containerID)
  434. if ctr == nil {
  435. return StatusUnknown, errors.WithStack(newNotFoundError("no such container"))
  436. }
  437. t := ctr.getTask()
  438. if t == nil {
  439. return StatusUnknown, errors.WithStack(newNotFoundError("no such task"))
  440. }
  441. s, err := t.Status(ctx)
  442. if err != nil {
  443. return StatusUnknown, err
  444. }
  445. return Status(s.Status), nil
  446. }
  447. func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
  448. p, err := c.getProcess(containerID, InitProcessName)
  449. if err != nil {
  450. return err
  451. }
  452. img, err := p.(containerd.Task).Checkpoint(ctx)
  453. if err != nil {
  454. return err
  455. }
  456. // Whatever happens, delete the checkpoint from containerd
  457. defer func() {
  458. err := c.getRemote().ImageService().Delete(context.Background(), img.Name())
  459. if err != nil {
  460. c.logger.WithError(err).WithField("digest", img.Target().Digest).
  461. Warnf("failed to delete checkpoint image")
  462. }
  463. }()
  464. b, err := content.ReadBlob(ctx, c.getRemote().ContentStore(), img.Target().Digest)
  465. if err != nil {
  466. return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
  467. }
  468. var index v1.Index
  469. if err := json.Unmarshal(b, &index); err != nil {
  470. return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
  471. }
  472. var cpDesc *v1.Descriptor
  473. for _, m := range index.Manifests {
  474. if m.MediaType == images.MediaTypeContainerd1Checkpoint {
  475. cpDesc = &m
  476. break
  477. }
  478. }
  479. if cpDesc == nil {
  480. return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
  481. }
  482. rat, err := c.getRemote().ContentStore().ReaderAt(ctx, cpDesc.Digest)
  483. if err != nil {
  484. return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
  485. }
  486. defer rat.Close()
  487. _, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
  488. if err != nil {
  489. return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
  490. }
  491. return err
  492. }
  493. func (c *client) getContainer(id string) *container {
  494. c.RLock()
  495. ctr := c.containers[id]
  496. c.RUnlock()
  497. return ctr
  498. }
  499. func (c *client) removeContainer(id string) {
  500. c.Lock()
  501. delete(c.containers, id)
  502. c.Unlock()
  503. }
  504. func (c *client) getProcess(containerID, processID string) (containerd.Process, error) {
  505. ctr := c.getContainer(containerID)
  506. if ctr == nil {
  507. return nil, errors.WithStack(newNotFoundError("no such container"))
  508. }
  509. t := ctr.getTask()
  510. if t == nil {
  511. return nil, errors.WithStack(newNotFoundError("container is not running"))
  512. }
  513. if processID == InitProcessName {
  514. return t, nil
  515. }
  516. p := ctr.getProcess(processID)
  517. if p == nil {
  518. return nil, errors.WithStack(newNotFoundError("no such exec"))
  519. }
  520. return p, nil
  521. }
  522. // createIO creates the io to be used by a process
  523. // This needs to get a pointer to interface as upon closure the process may not have yet been registered
  524. func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) {
  525. io, err := cio.NewDirectIO(context.Background(), fifos)
  526. if err != nil {
  527. return nil, err
  528. }
  529. if io.Stdin != nil {
  530. var (
  531. err error
  532. stdinOnce sync.Once
  533. )
  534. pipe := io.Stdin
  535. io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
  536. stdinOnce.Do(func() {
  537. err = pipe.Close()
  538. // Do the rest in a new routine to avoid a deadlock if the
  539. // Exec/Start call failed.
  540. go func() {
  541. <-stdinCloseSync
  542. p, err := c.getProcess(containerID, processID)
  543. if err == nil {
  544. err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
  545. if err != nil && strings.Contains(err.Error(), "transport is closing") {
  546. err = nil
  547. }
  548. }
  549. }()
  550. })
  551. return err
  552. })
  553. }
  554. rio, err := attachStdio(io)
  555. if err != nil {
  556. io.Cancel()
  557. io.Close()
  558. }
  559. return rio, err
  560. }
  561. func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
  562. c.eventQ.append(ei.ContainerID, func() {
  563. err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
  564. if err != nil {
  565. c.logger.WithError(err).WithFields(logrus.Fields{
  566. "container": ei.ContainerID,
  567. "event": et,
  568. "event-info": ei,
  569. }).Error("failed to process event")
  570. }
  571. if et == EventExit && ei.ProcessID != ei.ContainerID {
  572. p := ctr.getProcess(ei.ProcessID)
  573. if p == nil {
  574. c.logger.WithError(errors.New("no such process")).
  575. WithFields(logrus.Fields{
  576. "container": ei.ContainerID,
  577. "process": ei.ProcessID,
  578. }).Error("exit event")
  579. return
  580. }
  581. _, err = p.Delete(context.Background())
  582. if err != nil {
  583. c.logger.WithError(err).WithFields(logrus.Fields{
  584. "container": ei.ContainerID,
  585. "process": ei.ProcessID,
  586. }).Warn("failed to delete process")
  587. }
  588. ctr.deleteProcess(ei.ProcessID)
  589. ctr := c.getContainer(ei.ContainerID)
  590. if ctr == nil {
  591. c.logger.WithFields(logrus.Fields{
  592. "container": ei.ContainerID,
  593. }).Error("failed to find container")
  594. } else {
  595. newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false).Close()
  596. }
  597. }
  598. })
  599. }
  600. func (c *client) processEventStream(ctx context.Context) {
  601. var (
  602. err error
  603. eventStream eventsapi.Events_SubscribeClient
  604. ev *eventsapi.Envelope
  605. et EventType
  606. ei EventInfo
  607. ctr *container
  608. )
  609. defer func() {
  610. if err != nil {
  611. select {
  612. case <-ctx.Done():
  613. c.logger.WithError(ctx.Err()).
  614. Info("stopping event stream following graceful shutdown")
  615. default:
  616. go c.processEventStream(ctx)
  617. }
  618. }
  619. }()
  620. eventStream, err = c.getRemote().EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
  621. Filters: []string{
  622. // Filter on both namespace *and* topic. To create an "and" filter,
  623. // this must be a single, comma-separated string
  624. "namespace==" + c.namespace + ",topic~=|^/tasks/|",
  625. },
  626. }, grpc.FailFast(false))
  627. if err != nil {
  628. return
  629. }
  630. c.logger.WithField("namespace", c.namespace).Debug("processing event stream")
  631. var oomKilled bool
  632. for {
  633. ev, err = eventStream.Recv()
  634. if err != nil {
  635. errStatus, ok := status.FromError(err)
  636. if !ok || errStatus.Code() != codes.Canceled {
  637. c.logger.WithError(err).Error("failed to get event")
  638. }
  639. return
  640. }
  641. if ev.Event == nil {
  642. c.logger.WithField("event", ev).Warn("invalid event")
  643. continue
  644. }
  645. v, err := typeurl.UnmarshalAny(ev.Event)
  646. if err != nil {
  647. c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
  648. continue
  649. }
  650. c.logger.WithField("topic", ev.Topic).Debug("event")
  651. switch t := v.(type) {
  652. case *events.TaskCreate:
  653. et = EventCreate
  654. ei = EventInfo{
  655. ContainerID: t.ContainerID,
  656. ProcessID: t.ContainerID,
  657. Pid: t.Pid,
  658. }
  659. case *events.TaskStart:
  660. et = EventStart
  661. ei = EventInfo{
  662. ContainerID: t.ContainerID,
  663. ProcessID: t.ContainerID,
  664. Pid: t.Pid,
  665. }
  666. case *events.TaskExit:
  667. et = EventExit
  668. ei = EventInfo{
  669. ContainerID: t.ContainerID,
  670. ProcessID: t.ID,
  671. Pid: t.Pid,
  672. ExitCode: t.ExitStatus,
  673. ExitedAt: t.ExitedAt,
  674. }
  675. case *events.TaskOOM:
  676. et = EventOOM
  677. ei = EventInfo{
  678. ContainerID: t.ContainerID,
  679. OOMKilled: true,
  680. }
  681. oomKilled = true
  682. case *events.TaskExecAdded:
  683. et = EventExecAdded
  684. ei = EventInfo{
  685. ContainerID: t.ContainerID,
  686. ProcessID: t.ExecID,
  687. }
  688. case *events.TaskExecStarted:
  689. et = EventExecStarted
  690. ei = EventInfo{
  691. ContainerID: t.ContainerID,
  692. ProcessID: t.ExecID,
  693. Pid: t.Pid,
  694. }
  695. case *events.TaskPaused:
  696. et = EventPaused
  697. ei = EventInfo{
  698. ContainerID: t.ContainerID,
  699. }
  700. case *events.TaskResumed:
  701. et = EventResumed
  702. ei = EventInfo{
  703. ContainerID: t.ContainerID,
  704. }
  705. default:
  706. c.logger.WithFields(logrus.Fields{
  707. "topic": ev.Topic,
  708. "type": reflect.TypeOf(t)},
  709. ).Info("ignoring event")
  710. continue
  711. }
  712. ctr = c.getContainer(ei.ContainerID)
  713. if ctr == nil {
  714. c.logger.WithField("container", ei.ContainerID).Warn("unknown container")
  715. continue
  716. }
  717. if oomKilled {
  718. ctr.setOOMKilled(true)
  719. oomKilled = false
  720. }
  721. ei.OOMKilled = ctr.getOOMKilled()
  722. c.processEvent(ctr, et, ei)
  723. }
  724. }
  725. func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
  726. writer, err := c.getRemote().ContentStore().Writer(ctx, ref, 0, "")
  727. if err != nil {
  728. return nil, err
  729. }
  730. defer writer.Close()
  731. size, err := io.Copy(writer, r)
  732. if err != nil {
  733. return nil, err
  734. }
  735. labels := map[string]string{
  736. "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
  737. }
  738. if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
  739. return nil, err
  740. }
  741. return &types.Descriptor{
  742. MediaType: mediaType,
  743. Digest: writer.Digest(),
  744. Size_: size,
  745. }, nil
  746. }
  747. func wrapError(err error) error {
  748. switch {
  749. case err == nil:
  750. return nil
  751. case containerderrors.IsNotFound(err):
  752. return errdefs.NotFound(err)
  753. }
  754. msg := err.Error()
  755. for _, s := range []string{"container does not exist", "not found", "no such container"} {
  756. if strings.Contains(msg, s) {
  757. return errdefs.NotFound(err)
  758. }
  759. }
  760. return err
  761. }