client_daemon.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825
  1. // +build !windows
  2. package libcontainerd
  3. import (
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "path/filepath"
  10. "reflect"
  11. "runtime"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/codes"
  18. "google.golang.org/grpc/status"
  19. "github.com/containerd/containerd"
  20. "github.com/containerd/containerd/api/events"
  21. eventsapi "github.com/containerd/containerd/api/services/events/v1"
  22. "github.com/containerd/containerd/api/types"
  23. "github.com/containerd/containerd/archive"
  24. "github.com/containerd/containerd/cio"
  25. "github.com/containerd/containerd/content"
  26. "github.com/containerd/containerd/images"
  27. "github.com/containerd/containerd/linux/runctypes"
  28. "github.com/containerd/typeurl"
  29. "github.com/docker/docker/pkg/ioutils"
  30. "github.com/opencontainers/image-spec/specs-go/v1"
  31. specs "github.com/opencontainers/runtime-spec/specs-go"
  32. "github.com/pkg/errors"
  33. "github.com/sirupsen/logrus"
  34. )
  // InitProcessName is the process ID used throughout this file to refer to
  // the first (init) process of a container, as opposed to an exec'd process.
  const InitProcessName = "init"
  38. type container struct {
  39. sync.Mutex
  40. bundleDir string
  41. ctr containerd.Container
  42. task containerd.Task
  43. execs map[string]containerd.Process
  44. oomKilled bool
  45. }
  46. type client struct {
  47. sync.RWMutex // protects containers map
  48. remote *containerd.Client
  49. stateDir string
  50. logger *logrus.Entry
  51. namespace string
  52. backend Backend
  53. eventQ queue
  54. containers map[string]*container
  55. }
  56. func (c *client) Version(ctx context.Context) (containerd.Version, error) {
  57. return c.remote.Version(ctx)
  58. }
  59. func (c *client) Restore(ctx context.Context, id string, attachStdio StdioCallback) (alive bool, pid int, err error) {
  60. c.Lock()
  61. defer c.Unlock()
  62. var rio cio.IO
  63. defer func() {
  64. err = wrapError(err)
  65. }()
  66. ctr, err := c.remote.LoadContainer(ctx, id)
  67. if err != nil {
  68. return false, -1, errors.WithStack(err)
  69. }
  70. defer func() {
  71. if err != nil && rio != nil {
  72. rio.Cancel()
  73. rio.Close()
  74. }
  75. }()
  76. t, err := ctr.Task(ctx, func(fifos *cio.FIFOSet) (cio.IO, error) {
  77. io, err := newIOPipe(fifos)
  78. if err != nil {
  79. return nil, err
  80. }
  81. rio, err = attachStdio(io)
  82. return rio, err
  83. })
  84. if err != nil && !strings.Contains(err.Error(), "no running task found") {
  85. return false, -1, err
  86. }
  87. if t != nil {
  88. s, err := t.Status(ctx)
  89. if err != nil {
  90. return false, -1, err
  91. }
  92. alive = s.Status != containerd.Stopped
  93. pid = int(t.Pid())
  94. }
  95. c.containers[id] = &container{
  96. bundleDir: filepath.Join(c.stateDir, id),
  97. ctr: ctr,
  98. task: t,
  99. // TODO(mlaventure): load execs
  100. }
  101. c.logger.WithFields(logrus.Fields{
  102. "container": id,
  103. "alive": alive,
  104. "pid": pid,
  105. }).Debug("restored container")
  106. return alive, pid, nil
  107. }
  108. func (c *client) Create(ctx context.Context, id string, ociSpec *specs.Spec, runtimeOptions interface{}) error {
  109. if ctr := c.getContainer(id); ctr != nil {
  110. return errors.WithStack(newConflictError("id already in use"))
  111. }
  112. bdir, err := prepareBundleDir(filepath.Join(c.stateDir, id), ociSpec)
  113. if err != nil {
  114. return wrapSystemError(errors.Wrap(err, "prepare bundle dir failed"))
  115. }
  116. c.logger.WithField("bundle", bdir).WithField("root", ociSpec.Root.Path).Debug("bundle dir created")
  117. cdCtr, err := c.remote.NewContainer(ctx, id,
  118. containerd.WithSpec(ociSpec),
  119. // TODO(mlaventure): when containerd support lcow, revisit runtime value
  120. containerd.WithRuntime(fmt.Sprintf("io.containerd.runtime.v1.%s", runtime.GOOS), runtimeOptions))
  121. if err != nil {
  122. return err
  123. }
  124. c.Lock()
  125. c.containers[id] = &container{
  126. bundleDir: bdir,
  127. ctr: cdCtr,
  128. }
  129. c.Unlock()
  130. return nil
  131. }
  132. // Start create and start a task for the specified containerd id
  133. func (c *client) Start(ctx context.Context, id, checkpointDir string, withStdin bool, attachStdio StdioCallback) (int, error) {
  134. ctr := c.getContainer(id)
  135. switch {
  136. case ctr == nil:
  137. return -1, errors.WithStack(newNotFoundError("no such container"))
  138. case ctr.task != nil:
  139. return -1, errors.WithStack(newConflictError("container already started"))
  140. }
  141. var (
  142. cp *types.Descriptor
  143. t containerd.Task
  144. rio cio.IO
  145. err error
  146. stdinCloseSync = make(chan struct{})
  147. )
  148. if checkpointDir != "" {
  149. // write checkpoint to the content store
  150. tar := archive.Diff(ctx, "", checkpointDir)
  151. cp, err = c.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, checkpointDir, tar)
  152. // remove the checkpoint when we're done
  153. defer func() {
  154. if cp != nil {
  155. err := c.remote.ContentStore().Delete(context.Background(), cp.Digest)
  156. if err != nil {
  157. c.logger.WithError(err).WithFields(logrus.Fields{
  158. "ref": checkpointDir,
  159. "digest": cp.Digest,
  160. }).Warnf("failed to delete temporary checkpoint entry")
  161. }
  162. }
  163. }()
  164. if err := tar.Close(); err != nil {
  165. return -1, errors.Wrap(err, "failed to close checkpoint tar stream")
  166. }
  167. if err != nil {
  168. return -1, errors.Wrapf(err, "failed to upload checkpoint to containerd")
  169. }
  170. }
  171. spec, err := ctr.ctr.Spec(ctx)
  172. if err != nil {
  173. return -1, errors.Wrap(err, "failed to retrieve spec")
  174. }
  175. uid, gid := getSpecUser(spec)
  176. t, err = ctr.ctr.NewTask(ctx,
  177. func(id string) (cio.IO, error) {
  178. fifos := newFIFOSet(ctr.bundleDir, id, InitProcessName, withStdin, spec.Process.Terminal)
  179. rio, err = c.createIO(fifos, id, InitProcessName, stdinCloseSync, attachStdio)
  180. return rio, err
  181. },
  182. func(_ context.Context, _ *containerd.Client, info *containerd.TaskInfo) error {
  183. info.Checkpoint = cp
  184. info.Options = &runctypes.CreateOptions{
  185. IoUid: uint32(uid),
  186. IoGid: uint32(gid),
  187. }
  188. return nil
  189. })
  190. if err != nil {
  191. close(stdinCloseSync)
  192. if rio != nil {
  193. rio.Cancel()
  194. rio.Close()
  195. }
  196. return -1, err
  197. }
  198. c.Lock()
  199. c.containers[id].task = t
  200. c.Unlock()
  201. // Signal c.createIO that it can call CloseIO
  202. close(stdinCloseSync)
  203. if err := t.Start(ctx); err != nil {
  204. if _, err := t.Delete(ctx); err != nil {
  205. c.logger.WithError(err).WithField("container", id).
  206. Error("failed to delete task after fail start")
  207. }
  208. c.Lock()
  209. c.containers[id].task = nil
  210. c.Unlock()
  211. return -1, err
  212. }
  213. return int(t.Pid()), nil
  214. }
  215. func (c *client) Exec(ctx context.Context, containerID, processID string, spec *specs.Process, withStdin bool, attachStdio StdioCallback) (int, error) {
  216. ctr := c.getContainer(containerID)
  217. switch {
  218. case ctr == nil:
  219. return -1, errors.WithStack(newNotFoundError("no such container"))
  220. case ctr.task == nil:
  221. return -1, errors.WithStack(newInvalidParameterError("container is not running"))
  222. case ctr.execs != nil && ctr.execs[processID] != nil:
  223. return -1, errors.WithStack(newConflictError("id already in use"))
  224. }
  225. var (
  226. p containerd.Process
  227. rio cio.IO
  228. err error
  229. stdinCloseSync = make(chan struct{})
  230. )
  231. fifos := newFIFOSet(ctr.bundleDir, containerID, processID, withStdin, spec.Terminal)
  232. defer func() {
  233. if err != nil {
  234. if rio != nil {
  235. rio.Cancel()
  236. rio.Close()
  237. }
  238. rmFIFOSet(fifos)
  239. }
  240. }()
  241. p, err = ctr.task.Exec(ctx, processID, spec, func(id string) (cio.IO, error) {
  242. rio, err = c.createIO(fifos, containerID, processID, stdinCloseSync, attachStdio)
  243. return rio, err
  244. })
  245. if err != nil {
  246. close(stdinCloseSync)
  247. if rio != nil {
  248. rio.Cancel()
  249. rio.Close()
  250. }
  251. return -1, err
  252. }
  253. ctr.Lock()
  254. if ctr.execs == nil {
  255. ctr.execs = make(map[string]containerd.Process)
  256. }
  257. ctr.execs[processID] = p
  258. ctr.Unlock()
  259. // Signal c.createIO that it can call CloseIO
  260. close(stdinCloseSync)
  261. if err = p.Start(ctx); err != nil {
  262. p.Delete(context.Background())
  263. ctr.Lock()
  264. delete(ctr.execs, processID)
  265. ctr.Unlock()
  266. return -1, err
  267. }
  268. return int(p.Pid()), nil
  269. }
  270. func (c *client) SignalProcess(ctx context.Context, containerID, processID string, signal int) error {
  271. p, err := c.getProcess(containerID, processID)
  272. if err != nil {
  273. return err
  274. }
  275. return p.Kill(ctx, syscall.Signal(signal))
  276. }
  277. func (c *client) ResizeTerminal(ctx context.Context, containerID, processID string, width, height int) error {
  278. p, err := c.getProcess(containerID, processID)
  279. if err != nil {
  280. return err
  281. }
  282. return p.Resize(ctx, uint32(width), uint32(height))
  283. }
  284. func (c *client) CloseStdin(ctx context.Context, containerID, processID string) error {
  285. p, err := c.getProcess(containerID, processID)
  286. if err != nil {
  287. return err
  288. }
  289. return p.CloseIO(ctx, containerd.WithStdinCloser)
  290. }
  291. func (c *client) Pause(ctx context.Context, containerID string) error {
  292. p, err := c.getProcess(containerID, InitProcessName)
  293. if err != nil {
  294. return err
  295. }
  296. return p.(containerd.Task).Pause(ctx)
  297. }
  298. func (c *client) Resume(ctx context.Context, containerID string) error {
  299. p, err := c.getProcess(containerID, InitProcessName)
  300. if err != nil {
  301. return err
  302. }
  303. return p.(containerd.Task).Resume(ctx)
  304. }
  305. func (c *client) Stats(ctx context.Context, containerID string) (*Stats, error) {
  306. p, err := c.getProcess(containerID, InitProcessName)
  307. if err != nil {
  308. return nil, err
  309. }
  310. m, err := p.(containerd.Task).Metrics(ctx)
  311. if err != nil {
  312. return nil, err
  313. }
  314. v, err := typeurl.UnmarshalAny(m.Data)
  315. if err != nil {
  316. return nil, err
  317. }
  318. return interfaceToStats(m.Timestamp, v), nil
  319. }
  320. func (c *client) ListPids(ctx context.Context, containerID string) ([]uint32, error) {
  321. p, err := c.getProcess(containerID, InitProcessName)
  322. if err != nil {
  323. return nil, err
  324. }
  325. pis, err := p.(containerd.Task).Pids(ctx)
  326. if err != nil {
  327. return nil, err
  328. }
  329. var pids []uint32
  330. for _, i := range pis {
  331. pids = append(pids, i.Pid)
  332. }
  333. return pids, nil
  334. }
  335. func (c *client) Summary(ctx context.Context, containerID string) ([]Summary, error) {
  336. p, err := c.getProcess(containerID, InitProcessName)
  337. if err != nil {
  338. return nil, err
  339. }
  340. pis, err := p.(containerd.Task).Pids(ctx)
  341. if err != nil {
  342. return nil, err
  343. }
  344. var infos []Summary
  345. for _, pi := range pis {
  346. i, err := typeurl.UnmarshalAny(pi.Info)
  347. if err != nil {
  348. return nil, errors.Wrap(err, "unable to decode process details")
  349. }
  350. s, err := summaryFromInterface(i)
  351. if err != nil {
  352. return nil, err
  353. }
  354. infos = append(infos, *s)
  355. }
  356. return infos, nil
  357. }
  358. func (c *client) DeleteTask(ctx context.Context, containerID string) (uint32, time.Time, error) {
  359. p, err := c.getProcess(containerID, InitProcessName)
  360. if err != nil {
  361. return 255, time.Now(), nil
  362. }
  363. status, err := p.(containerd.Task).Delete(ctx)
  364. if err != nil {
  365. return 255, time.Now(), nil
  366. }
  367. c.Lock()
  368. if ctr, ok := c.containers[containerID]; ok {
  369. ctr.task = nil
  370. }
  371. c.Unlock()
  372. return status.ExitCode(), status.ExitTime(), nil
  373. }
  374. func (c *client) Delete(ctx context.Context, containerID string) error {
  375. ctr := c.getContainer(containerID)
  376. if ctr == nil {
  377. return errors.WithStack(newNotFoundError("no such container"))
  378. }
  379. if err := ctr.ctr.Delete(ctx); err != nil {
  380. return err
  381. }
  382. if os.Getenv("LIBCONTAINERD_NOCLEAN") != "1" {
  383. if err := os.RemoveAll(ctr.bundleDir); err != nil {
  384. c.logger.WithError(err).WithFields(logrus.Fields{
  385. "container": containerID,
  386. "bundle": ctr.bundleDir,
  387. }).Error("failed to remove state dir")
  388. }
  389. }
  390. c.removeContainer(containerID)
  391. return nil
  392. }
  393. func (c *client) Status(ctx context.Context, containerID string) (Status, error) {
  394. ctr := c.getContainer(containerID)
  395. if ctr == nil {
  396. return StatusUnknown, errors.WithStack(newNotFoundError("no such container"))
  397. }
  398. s, err := ctr.task.Status(ctx)
  399. if err != nil {
  400. return StatusUnknown, err
  401. }
  402. return Status(s.Status), nil
  403. }
  404. func (c *client) CreateCheckpoint(ctx context.Context, containerID, checkpointDir string, exit bool) error {
  405. p, err := c.getProcess(containerID, InitProcessName)
  406. if err != nil {
  407. return err
  408. }
  409. img, err := p.(containerd.Task).Checkpoint(ctx)
  410. if err != nil {
  411. return err
  412. }
  413. // Whatever happens, delete the checkpoint from containerd
  414. defer func() {
  415. err := c.remote.ImageService().Delete(context.Background(), img.Name())
  416. if err != nil {
  417. c.logger.WithError(err).WithField("digest", img.Target().Digest).
  418. Warnf("failed to delete checkpoint image")
  419. }
  420. }()
  421. b, err := content.ReadBlob(ctx, c.remote.ContentStore(), img.Target().Digest)
  422. if err != nil {
  423. return wrapSystemError(errors.Wrapf(err, "failed to retrieve checkpoint data"))
  424. }
  425. var index v1.Index
  426. if err := json.Unmarshal(b, &index); err != nil {
  427. return wrapSystemError(errors.Wrapf(err, "failed to decode checkpoint data"))
  428. }
  429. var cpDesc *v1.Descriptor
  430. for _, m := range index.Manifests {
  431. if m.MediaType == images.MediaTypeContainerd1Checkpoint {
  432. cpDesc = &m
  433. break
  434. }
  435. }
  436. if cpDesc == nil {
  437. return wrapSystemError(errors.Wrapf(err, "invalid checkpoint"))
  438. }
  439. rat, err := c.remote.ContentStore().ReaderAt(ctx, cpDesc.Digest)
  440. if err != nil {
  441. return wrapSystemError(errors.Wrapf(err, "failed to get checkpoint reader"))
  442. }
  443. defer rat.Close()
  444. _, err = archive.Apply(ctx, checkpointDir, content.NewReader(rat))
  445. if err != nil {
  446. return wrapSystemError(errors.Wrapf(err, "failed to read checkpoint reader"))
  447. }
  448. return err
  449. }
  450. func (c *client) getContainer(id string) *container {
  451. c.RLock()
  452. ctr := c.containers[id]
  453. c.RUnlock()
  454. return ctr
  455. }
  456. func (c *client) removeContainer(id string) {
  457. c.Lock()
  458. delete(c.containers, id)
  459. c.Unlock()
  460. }
  461. func (c *client) getProcess(containerID, processID string) (containerd.Process, error) {
  462. ctr := c.getContainer(containerID)
  463. switch {
  464. case ctr == nil:
  465. return nil, errors.WithStack(newNotFoundError("no such container"))
  466. case ctr.task == nil:
  467. return nil, errors.WithStack(newNotFoundError("container is not running"))
  468. case processID == InitProcessName:
  469. return ctr.task, nil
  470. default:
  471. ctr.Lock()
  472. defer ctr.Unlock()
  473. if ctr.execs == nil {
  474. return nil, errors.WithStack(newNotFoundError("no execs"))
  475. }
  476. }
  477. p := ctr.execs[processID]
  478. if p == nil {
  479. return nil, errors.WithStack(newNotFoundError("no such exec"))
  480. }
  481. return p, nil
  482. }
  483. // createIO creates the io to be used by a process
  484. // This needs to get a pointer to interface as upon closure the process may not have yet been registered
  485. func (c *client) createIO(fifos *cio.FIFOSet, containerID, processID string, stdinCloseSync chan struct{}, attachStdio StdioCallback) (cio.IO, error) {
  486. io, err := newIOPipe(fifos)
  487. if err != nil {
  488. return nil, err
  489. }
  490. if io.Stdin != nil {
  491. var (
  492. err error
  493. stdinOnce sync.Once
  494. )
  495. pipe := io.Stdin
  496. io.Stdin = ioutils.NewWriteCloserWrapper(pipe, func() error {
  497. stdinOnce.Do(func() {
  498. err = pipe.Close()
  499. // Do the rest in a new routine to avoid a deadlock if the
  500. // Exec/Start call failed.
  501. go func() {
  502. <-stdinCloseSync
  503. p, err := c.getProcess(containerID, processID)
  504. if err == nil {
  505. err = p.CloseIO(context.Background(), containerd.WithStdinCloser)
  506. if err != nil && strings.Contains(err.Error(), "transport is closing") {
  507. err = nil
  508. }
  509. }
  510. }()
  511. })
  512. return err
  513. })
  514. }
  515. rio, err := attachStdio(io)
  516. if err != nil {
  517. io.Cancel()
  518. io.Close()
  519. }
  520. return rio, err
  521. }
  522. func (c *client) processEvent(ctr *container, et EventType, ei EventInfo) {
  523. c.eventQ.append(ei.ContainerID, func() {
  524. err := c.backend.ProcessEvent(ei.ContainerID, et, ei)
  525. if err != nil {
  526. c.logger.WithError(err).WithFields(logrus.Fields{
  527. "container": ei.ContainerID,
  528. "event": et,
  529. "event-info": ei,
  530. }).Error("failed to process event")
  531. }
  532. if et == EventExit && ei.ProcessID != ei.ContainerID {
  533. var p containerd.Process
  534. ctr.Lock()
  535. if ctr.execs != nil {
  536. p = ctr.execs[ei.ProcessID]
  537. }
  538. ctr.Unlock()
  539. if p == nil {
  540. c.logger.WithError(errors.New("no such process")).
  541. WithFields(logrus.Fields{
  542. "container": ei.ContainerID,
  543. "process": ei.ProcessID,
  544. }).Error("exit event")
  545. return
  546. }
  547. _, err = p.Delete(context.Background())
  548. if err != nil {
  549. c.logger.WithError(err).WithFields(logrus.Fields{
  550. "container": ei.ContainerID,
  551. "process": ei.ProcessID,
  552. }).Warn("failed to delete process")
  553. }
  554. c.Lock()
  555. delete(ctr.execs, ei.ProcessID)
  556. c.Unlock()
  557. ctr := c.getContainer(ei.ContainerID)
  558. if ctr == nil {
  559. c.logger.WithFields(logrus.Fields{
  560. "container": ei.ContainerID,
  561. }).Error("failed to find container")
  562. } else {
  563. rmFIFOSet(newFIFOSet(ctr.bundleDir, ei.ContainerID, ei.ProcessID, true, false))
  564. }
  565. }
  566. })
  567. }
  568. func (c *client) processEventStream(ctx context.Context) {
  569. var (
  570. err error
  571. eventStream eventsapi.Events_SubscribeClient
  572. ev *eventsapi.Envelope
  573. et EventType
  574. ei EventInfo
  575. ctr *container
  576. )
  577. defer func() {
  578. if err != nil {
  579. select {
  580. case <-ctx.Done():
  581. c.logger.WithError(ctx.Err()).
  582. Info("stopping event stream following graceful shutdown")
  583. default:
  584. go c.processEventStream(ctx)
  585. }
  586. }
  587. }()
  588. eventStream, err = c.remote.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
  589. Filters: []string{"namespace==" + c.namespace + ",topic~=/tasks/.+"},
  590. }, grpc.FailFast(false))
  591. if err != nil {
  592. return
  593. }
  594. var oomKilled bool
  595. for {
  596. ev, err = eventStream.Recv()
  597. if err != nil {
  598. errStatus, ok := status.FromError(err)
  599. if !ok || errStatus.Code() != codes.Canceled {
  600. c.logger.WithError(err).Error("failed to get event")
  601. }
  602. return
  603. }
  604. if ev.Event == nil {
  605. c.logger.WithField("event", ev).Warn("invalid event")
  606. continue
  607. }
  608. v, err := typeurl.UnmarshalAny(ev.Event)
  609. if err != nil {
  610. c.logger.WithError(err).WithField("event", ev).Warn("failed to unmarshal event")
  611. continue
  612. }
  613. c.logger.WithField("topic", ev.Topic).Debug("event")
  614. switch t := v.(type) {
  615. case *events.TaskCreate:
  616. et = EventCreate
  617. ei = EventInfo{
  618. ContainerID: t.ContainerID,
  619. ProcessID: t.ContainerID,
  620. Pid: t.Pid,
  621. }
  622. case *events.TaskStart:
  623. et = EventStart
  624. ei = EventInfo{
  625. ContainerID: t.ContainerID,
  626. ProcessID: t.ContainerID,
  627. Pid: t.Pid,
  628. }
  629. case *events.TaskExit:
  630. et = EventExit
  631. ei = EventInfo{
  632. ContainerID: t.ContainerID,
  633. ProcessID: t.ID,
  634. Pid: t.Pid,
  635. ExitCode: t.ExitStatus,
  636. ExitedAt: t.ExitedAt,
  637. }
  638. case *events.TaskOOM:
  639. et = EventOOM
  640. ei = EventInfo{
  641. ContainerID: t.ContainerID,
  642. OOMKilled: true,
  643. }
  644. oomKilled = true
  645. case *events.TaskExecAdded:
  646. et = EventExecAdded
  647. ei = EventInfo{
  648. ContainerID: t.ContainerID,
  649. ProcessID: t.ExecID,
  650. }
  651. case *events.TaskExecStarted:
  652. et = EventExecStarted
  653. ei = EventInfo{
  654. ContainerID: t.ContainerID,
  655. ProcessID: t.ExecID,
  656. Pid: t.Pid,
  657. }
  658. case *events.TaskPaused:
  659. et = EventPaused
  660. ei = EventInfo{
  661. ContainerID: t.ContainerID,
  662. }
  663. case *events.TaskResumed:
  664. et = EventResumed
  665. ei = EventInfo{
  666. ContainerID: t.ContainerID,
  667. }
  668. default:
  669. c.logger.WithFields(logrus.Fields{
  670. "topic": ev.Topic,
  671. "type": reflect.TypeOf(t)},
  672. ).Info("ignoring event")
  673. continue
  674. }
  675. ctr = c.getContainer(ei.ContainerID)
  676. if ctr == nil {
  677. c.logger.WithField("container", ei.ContainerID).Warn("unknown container")
  678. continue
  679. }
  680. if oomKilled {
  681. ctr.oomKilled = true
  682. oomKilled = false
  683. }
  684. ei.OOMKilled = ctr.oomKilled
  685. c.processEvent(ctr, et, ei)
  686. }
  687. }
  688. func (c *client) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*types.Descriptor, error) {
  689. writer, err := c.remote.ContentStore().Writer(ctx, ref, 0, "")
  690. if err != nil {
  691. return nil, err
  692. }
  693. defer writer.Close()
  694. size, err := io.Copy(writer, r)
  695. if err != nil {
  696. return nil, err
  697. }
  698. labels := map[string]string{
  699. "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
  700. }
  701. if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
  702. return nil, err
  703. }
  704. return &types.Descriptor{
  705. MediaType: mediaType,
  706. Digest: writer.Digest(),
  707. Size_: size,
  708. }, nil
  709. }
  710. func wrapError(err error) error {
  711. if err != nil {
  712. msg := err.Error()
  713. for _, s := range []string{"container does not exist", "not found", "no such container"} {
  714. if strings.Contains(msg, s) {
  715. return wrapNotFoundError(err)
  716. }
  717. }
  718. }
  719. return err
  720. }