client_daemon.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862
  1. // +build !windows
  2. package libcontainerd
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "reflect"
  11. "runtime"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/codes"
  18. "google.golang.org/grpc/status"
  19. "github.com/containerd/containerd"
  20. "github.com/containerd/containerd/api/events"
  21. eventsapi "github.com/containerd/containerd/api/services/events/v1"
  22. "github.com/containerd/containerd/api/types"
  23. "github.com/containerd/containerd/archive"
  24. "github.com/containerd/containerd/cio"
  25. "github.com/containerd/containerd/content"
  26. containerderrors "github.com/containerd/containerd/errdefs"
  27. "github.com/containerd/containerd/images"
  28. "github.com/containerd/containerd/linux/runctypes"
  29. "github.com/containerd/typeurl"
  30. "github.com/docker/docker/errdefs"
  31. "github.com/docker/docker/pkg/ioutils"
  32. "github.com/opencontainers/image-spec/specs-go/v1"
  33. specs "github.com/opencontainers/runtime-spec/specs-go"
  34. "github.com/pkg/errors"
  35. "github.com/sirupsen/logrus"
  36. )
  37. // InitProcessName is the name given to the first process of a
  38. // container
  39. const InitProcessName = "init"
  40. type container struct {
  41. mu sync.Mutex
  42. bundleDir string
  43. ctr containerd.Container
  44. task containerd.Task
  45. execs map[string]containerd.Process
  46. oomKilled bool
  47. }
  48. func (c *container) setTask(t containerd.Task) {
  49. c.mu.Lock()
  50. c.task = t
  51. c.mu.Unlock()
  52. }
  53. func (c *container) getTask() containerd.Task {
  54. c.mu.Lock()
  55. t := c.task
  56. c.mu.Unlock()
  57. return t
  58. }
  59. func (c *container) addProcess(id string, p containerd.Process) {
  60. c.mu.Lock()
  61. if c.execs == nil {
  62. c.execs = make(map[string]containerd.Process)
  63. }
  64. c.execs[id] = p
  65. c.mu.Unlock()
  66. }
  67. func (c *container) deleteProcess(id string) {
  68. c.mu.Lock()
  69. delete(c.execs, id)
  70. c.mu.Unlock()
  71. }
  72. func (c *container) getProcess(id string) containerd.Process {
  73. c.mu.Lock()
  74. p := c.execs[id]
  75. c.mu.Unlock()
  76. return p
  77. }
  78. func (c *container) setOOMKilled(killed bool) {
  79. c.mu.Lock()
  80. c.oomKilled = killed
  81. c.mu.Unlock()
  82. }
  83. func (c *container) getOOMKilled() bool {
  84. c.mu.Lock()
  85. killed := c.oomKilled
  86. c.mu.Unlock()
  87. return killed
  88. }
  89. type client struct {
  90. sync.RWMutex // protects containers map
  91. remote *containerd.Client
  92. stateDir string
  93. logger *logrus.Entry
  94. namespace string
  95. backend Backend
  96. eventQ queue
  97. containers map[string]*container
  98. }
  99. func (c *client) Version(ctx context.Context) (containerd.Version, error) {
  100. return c.remote.Version(ctx)
  101. }
  102. func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallback) (alive bool, pid int, err error) {
  103. c.Lock()
  104. defer c.Unlock()
  105. var dio *cio.DirectIO
  106. defer func() {
  107. if err != nil && dio != nil {
  108. dio.Cancel()
  109. dio.Close()
  110. }
  111. err = wrapError(err)
  112. }()
  113. ctr, err := c.remote.LoadContainer(ctx, id)
  114. if err != nil {
  115. return false, -1, errors.WithStack(err)
  116. }
  117. attachIO := func(fifos *cio.FIFOSet) (cio.IO, error) {
  118. // dio must be assigned to the previously defined dio for the defer above
  119. // to handle cleanup
  120. dio, err = cio.NewDirectIO(ctx, fifos)
  121. if err != nil {
  122. return nil, err
  123. }
  124. return attachStdio(dio)
  125. }
  126. t, err := ctr.Task(ctx, attachIO)
  127. if err != nil && !errdefs.IsNotFound(errors.Cause(err)) {
  128. return false, -1, err
  129. }
  130. if t != nil {
  131. s, err := t.Status(ctx)
  132. if err != nil {
  133. return false, -1, err
  134. }
  135. alive = s.Status != containerd.Stopped
  136. pid = int(t.Pid())
  137. }
  138. c.containers[id] = &container{
  139. bundleDir: filepath.Join(c.stateDir, id),
  140. ctr: ctr,
  141. task: t,
  142. // TODO(mlaventure): load execs
  143. }
  144. c.logger.WithFields(logrus.Fields{
  145. "container": id,
  146. "alive": alive,
  147. "pid": pid,
  148. }).Debug("restored container")
  149. return alive, pid, nil
  150. }
  151. func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}) error {
  152. if ctr := c.getContainer(id); ctr != nil {
  153. return errors.WithStack(newConflictError("id already in use"))
  154. }
  155. bdir, err := prepareBundleDir(filepath.Join(c.stateDir, id), ociSpec)
  156. if err != nil {
  157. return errdefs.System(errors.Wrap(err, "prepare bundle dir failed"))
  158. }
  159. c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
  160. cdCtr, err := c.remote.NewContainer(ctx, id,
  161. containerd.WithSpec(ociSpec),
  162. // TODO(mlaventure): when containerd support lcow, revisit runtime value
  163. containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
  164. if err != nil {
  165. return err
  166. }
  167. c.Lock()
  168. c.containers[id] = &container{
  169. bundleDir: bdir,
  170. ctr: cdCtr,
  171. }
  172. c.Unlock()
  173. return nil
  174. }
  175. // Start create and start a task for the specified containerd id
  176. func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio StdioCallback) (int, error) {
  177. ctr := c.getContainer(id)
  178. if ctr == nil {
  179. return -1, errors.WithStack(newNotFoundError("no such container"))
  180. }
  181. if t := ctr.getTask(); t != nil {
  182. return -1, errors.WithStack(newConflictError("container already started"))
  183. }
  184. var (
  185. cp *types.Descriptor
  186. t containerd.Task
  187. rio cio.IO
  188. err error
  189. stdinCloseSync = make(chan struct{})
  190. )
  191. if checkpointDir != "" {
  192. // write checkpoint to the content store
  193. tar := archive.Diff(ctx, "", checkpointDir)
  194. cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
  195. // remove the checkpoint when we're done
  196. defer func() {
  197. if cp != nil {
  198. err := c.remote.ContentStore().Delete(context.Background(), cp.Digest)
  199. if err != nil {
  200. c.logger.WithError(err).WithFields(logrus.Fields{
  201. "ref": checkpointDir,
  202. "digest": cp.Digest,
  203. }).Warnf("failed to delete temporary checkpoint entry")
  204. }
  205. }
  206. }()
  207. if err := tar.Close(); err != nil {
  208. return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
  209. }
  210. if err != nil {
  211. return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd")
  212. }
  213. }
  214. spec, err := ctr.ctr.Spec(ctx)
  215. if err != nil {
  216. return -1, errors.Wrap(err, "failed to retrieve spec")
  217. }
  218. uid, gid := getSpecUser(spec)
  219. t, err = ctr.ctr.NewTask(ctx,
  220. func(id string) (cio.IO, error) {
  221. fifos := newFIFOSet(ctr.bundleDir, InitProcessName, withStdin, spec.Process.Terminal)
  222. rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio)
  223. return rio, err
  224. },
  225. func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
  226. info.Checkpoint = cp
  227. info.Options = &runctypes.CreateOptions{
  228. IoUid: uint32(uid),
  229. IoGid: uint32(gid),
  230. NoPivotRoot: os.Getenv("DOCKER_RAMDISK") != "",
  231. }
  232. return nil
  233. })
  234. if err != nil {
  235. close(stdinCloseSync)
  236. if rio != nil {
  237. rio.Cancel()
  238. rio.Close()
  239. }
  240. return -1, err
  241. }
  242. ctr.setTask(t)
  243. // Signal c.createIO that it can call CloseIO
  244. close(stdinCloseSync)
  245. if err := t.Start(ctx); err != nil {
  246. if _, err := t.Delete(ctx); err != nil {
  247. c.logger.WithError(err).WithField("container", id).
  248. Error("failed to delete task after fail start")
  249. }
  250. ctr.setTask(nil)
  251. return -1, err
  252. }
  253. return int(t.Pid()), nil
  254. }
  255. func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
  256. ctr := c.getContainer(containerID)
  257. if ctr == nil {
  258. return -1, errors.WithStack(newNotFoundError("no such container"))
  259. }
  260. t := ctr.getTask()
  261. if t == nil {
  262. return -1, errors.WithStack(newInvalidParameterError("container is not running"))
  263. }
  264. if p := ctr.getProcess(processID); p != nil {
  265. return -1, errors.WithStack(newConflictError("id already in use"))
  266. }
  267. var (
  268. p containerd.Process
  269. rio cio.IO
  270. err error
  271. stdinCloseSync = make(chan struct{})
  272. )
  273. fifos := newFIFOSet(ctr.bundleDir, processID, withStdin, spec.Terminal)
  274. defer func() {
  275. if err != nil {
  276. if rio != nil {
  277. rio.Cancel()
  278. rio.Close()
  279. }
  280. }
  281. }()
  282. p, err = t.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
  283. rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
  284. return rio, err
  285. })
  286. if err != nil {
  287. close(stdinCloseSync)
  288. return -1, err
  289. }
  290. ctr.addProcess(processID, p)
  291. // Signal c.createIO that it can call CloseIO
  292. close(stdinCloseSync)
  293. if err = p.Start(ctx); err != nil {
  294. p.Delete(context.Background())
  295. ctr.deleteProcess(processID)
  296. return -1, err
  297. }
  298. return int(p.Pid()), nil
  299. }
  300. func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
  301. p, err := c.getProcess(containerID, processID)
  302. if err != nil {
  303. return err
  304. }
  305. return wrapError(p.Kill(ctx, syscall.Signal(signal)))
  306. }
  307. func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
  308. p, err := c.getProcess(containerID, processID)
  309. if err != nil {
  310. return err
  311. }
  312. return p.Resize(ctx, uint32(width), uint32(height))
  313. }
  314. func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
  315. p, err := c.getProcess(containerID, processID)
  316. if err != nil {
  317. return err
  318. }
  319. return p.CloseIO(ctx, containerd.WithStdinCloser)
  320. }
  321. func (c *client) Pause(ctx context.Context, containerID string) error {
  322. p, err := c.getProcess(containerID, InitProcessName)
  323. if err != nil {
  324. return err
  325. }
  326. return p.(containerd.Task).Pause(ctx)
  327. }
  328. func (c *client) Resume(ctx context.Context, containerID string) error {
  329. p, err := c.getProcess(containerID, InitProcessName)
  330. if err != nil {
  331. return err
  332. }
  333. return p.(containerd.Task).Resume(ctx)
  334. }
  335. func (c *client) Stats(ctx context.Context, containerID string) (*Stats, error) {
  336. p, err := c.getProcess(containerID, InitProcessName)
  337. if err != nil {
  338. return nil, err
  339. }
  340. m, err := p.(containerd.Task).Metrics(ctx)
  341. if err != nil {
  342. return nil, err
  343. }
  344. v, err := typeurl.UnmarshalAny(m.Data)
  345. if err != nil {
  346. return nil, err
  347. }
  348. return interfaceToStats(m.Timestamp, v), nil
  349. }
  350. func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
  351. p, err := c.getProcess(containerID, InitProcessName)
  352. if err != nil {
  353. return nil, err
  354. }
  355. pis, err := p.(containerd.Task).Pids(ctx)
  356. if err != nil {
  357. return nil, err
  358. }
  359. var pids []uint32
  360. for _, i := range pis {
  361. pids = append(pids, i.Pid)
  362. }
  363. return pids, nil
  364. }
  365. func (c *client) Summary(ctx context.Context, containerID string) ([]Summary, error) {
  366. p, err := c.getProcess(containerID, InitProcessName)
  367. if err != nil {
  368. return nil, err
  369. }
  370. pis, err := p.(containerd.Task).Pids(ctx)
  371. if err != nil {
  372. return nil, err
  373. }
  374. var infos []Summary
  375. for _, pi := range pis {
  376. i, err := typeurl.UnmarshalAny(pi.Info)
  377. if err != nil {
  378. return nil, errors.Wrap(err, "unable to decode process details")
  379. }
  380. s, err := summaryFromInterface(i)
  381. if err != nil {
  382. return nil, err
  383. }
  384. infos = append(infos, *s)
  385. }
  386. return infos, nil
  387. }
  388. func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
  389. p, err := c.getProcess(containerID, InitProcessName)
  390. if err != nil {
  391. return 255, time.Now(), nil
  392. }
  393. status, err := p.(containerd.Task).Delete(ctx)
  394. if err != nil {
  395. return 255, time.Now(), nil
  396. }
  397. if ctr := c.getContainer(containerID); ctr != nil {
  398. ctr.setTask(nil)
  399. }
  400. return status.ExitCode(), status.ExitTime(), nil
  401. }
  402. func (c *client) Delete(ctx context.Context, containerID string) error {
  403. ctr := c.getContainer(containerID)
  404. if ctr == nil {
  405. return errors.WithStack(newNotFoundError("no such container"))
  406. }
  407. if err := ctr.ctr.Delete(ctx); err != nil {
  408. return err
  409. }
  410. if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
  411. if err := os.RemoveAll(ctr.bundleDir); err != nil {
  412. c.logger.WithError(err).WithFields(logrus.Fields{
  413. "container": containerID,
  414. "bundle": ctr.bundleDir,
  415. }).Error("failed to remove state dir")
  416. }
  417. }
  418. c.removeContainer(containerID)
  419. return nil
  420. }
  421. func (c *client) Status(ctx context.Context, containerID string) (Status, error) {
  422. ctr := c.getContainer(containerID)
  423. if ctr == nil {
  424. return StatusUnknown, errors.WithStack(newNotFoundError("no such container"))
  425. }
  426. t := ctr.getTask()
  427. if t == nil {
  428. return StatusUnknown, errors.WithStack(newNotFoundError("no such task"))
  429. }
  430. s, err := t.Status(ctx)
  431. if err != nil {
  432. return StatusUnknown, err
  433. }
  434. return Status(s.Status), nil
  435. }
  436. func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
  437. p, err := c.getProcess(containerID, InitProcessName)
  438. if err != nil {
  439. return err
  440. }
  441. img, err := p.(containerd.Task).Checkpoint(ctx)
  442. if err != nil {
  443. return err
  444. }
  445. // Whatever happens, delete the checkpoint from containerd
  446. defer func() {
  447. err := c.remote.ImageService().Delete(context.Background(), img.Name())
  448. if err != nil {
  449. c.logger.WithError(err).WithField("digest", img.Target().Digest).
  450. Warnf("failed to delete checkpoint image")
  451. }
  452. }()
  453. b, err := content.ReadBlob(ctx, c.remote.ContentStore(), img.Target().Digest)
  454. if err != nil {
  455. return errdefs.System(errors.Wrapf(err, "failed to retrieve checkpoint data"))
  456. }
  457. var index v1.Index
  458. if err := json.Unmarshal(b, &index); err != nil {
  459. return errdefs.System(errors.Wrapf(err, "failed to decode checkpoint data"))
  460. }
  461. var cpDesc *v1.Descriptor
  462. for _, m := range index.Manifests {
  463. if m.MediaType == images.MediaTypeContainerd1Checkpoint {
  464. cpDesc = &m
  465. break
  466. }
  467. }
  468. if cpDesc == nil {
  469. return errdefs.System(errors.Wrapf(err, "invalid checkpoint"))
  470. }
  471. rat, err := c.remote.ContentStore().ReaderAt(ctx, cpDesc.Digest)
  472. if err != nil {
  473. return errdefs.System(errors.Wrapf(err, "failed to get checkpoint reader"))
  474. }
  475. defer rat.Close()
  476. _, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
  477. if err != nil {
  478. return errdefs.System(errors.Wrapf(err, "failed to read checkpoint reader"))
  479. }
  480. return err
  481. }
  482. func (c *client) getContainer(id string) *container {
  483. c.RLock()
  484. ctr := c.containers[id]
  485. c.RUnlock()
  486. return ctr
  487. }
  488. func (c *client) removeContainer(id string) {
  489. c.Lock()
  490. delete(c.containers, id)
  491. c.Unlock()
  492. }
  493. func (c *client) getProcess(containerID, processID string) (containerd.Process, error) {
  494. ctr := c.getContainer(containerID)
  495. if ctr == nil {
  496. return nil, errors.WithStack(newNotFoundError("no such container"))
  497. }
  498. t := ctr.getTask()
  499. if t == nil {
  500. return nil, errors.WithStack(newNotFoundError("container is not running"))
  501. }
  502. if processID == InitProcessName {
  503. return t, nil
  504. }
  505. p := ctr.getProcess(processID)
  506. if p == nil {
  507. return nil, errors.WithStack(newNotFoundError("no such exec"))
  508. }
  509. return p, nil
  510. }
  511. // createIO creates the io to be used by a process
  512. // This needs to get a pointer to interface as upon closure the process may not have yet been registered
  513. func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) {
  514. io, err := cio.NewDirectIO(context.Background(), fifos)
  515. if err != nil {
  516. return nil, err
  517. }
  518. if io.Stdin != nil {
  519. var (
  520. err error
  521. stdinOnce sync.Once
  522. )
  523. pipe := io.Stdin
  524. io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
  525. stdinOnce.Do(func() {
  526. err = pipe.Close()
  527. // Do the rest in a new routine to avoid a deadlock if the
  528. // Exec/Start call failed.
  529. go func() {
  530. <-stdinCloseSync
  531. p, err := c.getProcess(containerID, processID)
  532. if err == nil {
  533. err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
  534. if err != nil && strings.Contains(err.Error(), "transport is closing") {
  535. err = nil
  536. }
  537. }
  538. }()
  539. })
  540. return err
  541. })
  542. }
  543. rio, err := attachStdio(io)
  544. if err != nil {
  545. io.Cancel()
  546. io.Close()
  547. }
  548. return rio, err
  549. }
  550. func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
  551. c.eventQ.append(ei.ContainerID, func() {
  552. err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
  553. if err != nil {
  554. c.logger.WithError(err).WithFields(logrus.Fields{
  555. "container": ei.ContainerID,
  556. "event": et,
  557. "event-info": ei,
  558. }).Error("failed to process event")
  559. }
  560. if et == EventExit && ei.ProcessID != ei.ContainerID {
  561. p := ctr.getProcess(ei.ProcessID)
  562. if p == nil {
  563. c.logger.WithError(errors.New("no such process")).
  564. WithFields(logrus.Fields{
  565. "container": ei.ContainerID,
  566. "process": ei.ProcessID,
  567. }).Error("exit event")
  568. return
  569. }
  570. _, err = p.Delete(context.Background())
  571. if err != nil {
  572. c.logger.WithError(err).WithFields(logrus.Fields{
  573. "container": ei.ContainerID,
  574. "process": ei.ProcessID,
  575. }).Warn("failed to delete process")
  576. }
  577. ctr.deleteProcess(ei.ProcessID)
  578. ctr := c.getContainer(ei.ContainerID)
  579. if ctr == nil {
  580. c.logger.WithFields(logrus.Fields{
  581. "container": ei.ContainerID,
  582. }).Error("failed to find container")
  583. } else {
  584. newFIFOSet(ctr.bundleDir, ei.ProcessID, true, false).Close()
  585. }
  586. }
  587. })
  588. }
  589. func (c *client) processEventStream(ctx context.Context) {
  590. var (
  591. err error
  592. eventStream eventsapi.Events_SubscribeClient
  593. ev *eventsapi.Envelope
  594. et EventType
  595. ei EventInfo
  596. ctr *container
  597. )
  598. defer func() {
  599. if err != nil {
  600. select {
  601. case <-ctx.Done():
  602. c.logger.WithError(ctx.Err()).
  603. Info("stopping event stream following graceful shutdown")
  604. default:
  605. go c.processEventStream(ctx)
  606. }
  607. }
  608. }()
  609. eventStream, err = c.remote.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
  610. Filters: []string{
  611. // Filter on both namespace *and* topic. To create an "and" filter,
  612. // this must be a single, comma-separated string
  613. "namespace==" + c.namespace + ",topic~=|^/tasks/|",
  614. },
  615. }, grpc.FailFast(false))
  616. if err != nil {
  617. return
  618. }
  619. var oomKilled bool
  620. for {
  621. ev, err = eventStream.Recv()
  622. if err != nil {
  623. errStatus, ok := status.FromError(err)
  624. if !ok || errStatus.Code() != codes.Canceled {
  625. c.logger.WithError(err).Error("failed to get event")
  626. }
  627. return
  628. }
  629. if ev.Event == nil {
  630. c.logger.WithField("event", ev).Warn("invalid event")
  631. continue
  632. }
  633. v, err := typeurl.UnmarshalAny(ev.Event)
  634. if err != nil {
  635. c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
  636. continue
  637. }
  638. c.logger.WithField("topic", ev.Topic).Debug("event")
  639. switch t := v.(type) {
  640. case *events.TaskCreate:
  641. et = EventCreate
  642. ei = EventInfo{
  643. ContainerID: t.ContainerID,
  644. ProcessID: t.ContainerID,
  645. Pid: t.Pid,
  646. }
  647. case *events.TaskStart:
  648. et = EventStart
  649. ei = EventInfo{
  650. ContainerID: t.ContainerID,
  651. ProcessID: t.ContainerID,
  652. Pid: t.Pid,
  653. }
  654. case *events.TaskExit:
  655. et = EventExit
  656. ei = EventInfo{
  657. ContainerID: t.ContainerID,
  658. ProcessID: t.ID,
  659. Pid: t.Pid,
  660. ExitCode: t.ExitStatus,
  661. ExitedAt: t.ExitedAt,
  662. }
  663. case *events.TaskOOM:
  664. et = EventOOM
  665. ei = EventInfo{
  666. ContainerID: t.ContainerID,
  667. OOMKilled: true,
  668. }
  669. oomKilled = true
  670. case *events.TaskExecAdded:
  671. et = EventExecAdded
  672. ei = EventInfo{
  673. ContainerID: t.ContainerID,
  674. ProcessID: t.ExecID,
  675. }
  676. case *events.TaskExecStarted:
  677. et = EventExecStarted
  678. ei = EventInfo{
  679. ContainerID: t.ContainerID,
  680. ProcessID: t.ExecID,
  681. Pid: t.Pid,
  682. }
  683. case *events.TaskPaused:
  684. et = EventPaused
  685. ei = EventInfo{
  686. ContainerID: t.ContainerID,
  687. }
  688. case *events.TaskResumed:
  689. et = EventResumed
  690. ei = EventInfo{
  691. ContainerID: t.ContainerID,
  692. }
  693. default:
  694. c.logger.WithFields(logrus.Fields{
  695. "topic": ev.Topic,
  696. "type": reflect.TypeOf(t)},
  697. ).Info("ignoring event")
  698. continue
  699. }
  700. ctr = c.getContainer(ei.ContainerID)
  701. if ctr == nil {
  702. c.logger.WithField("container", ei.ContainerID).Warn("unknown container")
  703. continue
  704. }
  705. if oomKilled {
  706. ctr.setOOMKilled(true)
  707. oomKilled = false
  708. }
  709. ei.OOMKilled = ctr.getOOMKilled()
  710. c.processEvent(ctr, et, ei)
  711. }
  712. }
  713. func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
  714. writer, err := c.remote.ContentStore().Writer(ctx, ref, 0, "")
  715. if err != nil {
  716. return nil, err
  717. }
  718. defer writer.Close()
  719. size, err := io.Copy(writer, r)
  720. if err != nil {
  721. return nil, err
  722. }
  723. labels := map[string]string{
  724. "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
  725. }
  726. if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
  727. return nil, err
  728. }
  729. return &types.Descriptor{
  730. MediaType: mediaType,
  731. Digest: writer.Digest(),
  732. Size_: size,
  733. }, nil
  734. }
  735. func wrapError(err error) error {
  736. switch {
  737. case err == nil:
  738. return nil
  739. case containerderrors.IsNotFound(err):
  740. return errdefs.NotFound(err)
  741. }
  742. msg := err.Error()
  743. for _, s := range []string{"container does not exist", "not found", "no such container"} {
  744. if strings.Contains(msg, s) {
  745. return errdefs.NotFound(err)
  746. }
  747. }
  748. return err
  749. }