runtime.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. // +build linux
  2. /*
  3. Copyright The containerd Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package linux
  15. import (
  16. "context"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path/filepath"
  21. "time"
  22. "github.com/boltdb/bolt"
  23. eventstypes "github.com/containerd/containerd/api/events"
  24. "github.com/containerd/containerd/api/types"
  25. "github.com/containerd/containerd/containers"
  26. "github.com/containerd/containerd/errdefs"
  27. "github.com/containerd/containerd/events/exchange"
  28. "github.com/containerd/containerd/identifiers"
  29. "github.com/containerd/containerd/log"
  30. "github.com/containerd/containerd/metadata"
  31. "github.com/containerd/containerd/mount"
  32. "github.com/containerd/containerd/namespaces"
  33. "github.com/containerd/containerd/platforms"
  34. "github.com/containerd/containerd/plugin"
  35. "github.com/containerd/containerd/runtime"
  36. "github.com/containerd/containerd/runtime/linux/proc"
  37. "github.com/containerd/containerd/runtime/linux/runctypes"
  38. shim "github.com/containerd/containerd/runtime/shim/v1"
  39. runc "github.com/containerd/go-runc"
  40. "github.com/containerd/typeurl"
  41. ptypes "github.com/gogo/protobuf/types"
  42. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  43. "github.com/pkg/errors"
  44. "github.com/sirupsen/logrus"
  45. "golang.org/x/sys/unix"
  46. )
  47. var (
  48. pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, "linux")
  49. empty = &ptypes.Empty{}
  50. )
  51. const (
  52. configFilename = "config.json"
  53. defaultRuntime = "runc"
  54. defaultShim = "containerd-shim"
  55. )
  56. func init() {
  57. plugin.Register(&plugin.Registration{
  58. Type: plugin.RuntimePlugin,
  59. ID: "linux",
  60. InitFn: New,
  61. Requires: []plugin.Type{
  62. plugin.TaskMonitorPlugin,
  63. plugin.MetadataPlugin,
  64. },
  65. Config: &Config{
  66. Shim: defaultShim,
  67. Runtime: defaultRuntime,
  68. },
  69. })
  70. }
  71. var _ = (runtime.PlatformRuntime)(&Runtime{})
// Config options for the runtime
type Config struct {
	// Shim is a path or name of binary implementing the Shim GRPC API
	Shim string `toml:"shim"`
	// Runtime is a path or name of an OCI runtime used by the shim
	Runtime string `toml:"runtime"`
	// RuntimeRoot is the path that shall be used by the OCI runtime for its data
	RuntimeRoot string `toml:"runtime_root"`
	// NoShim calls runc directly from within the pkg
	NoShim bool `toml:"no_shim"`
	// ShimDebug enables debug on the shim. It is also used as the Debug flag
	// of the runc client built by getRuntime, and when set a failed terminate
	// in cleanupAfterDeadShim leaves the bundle in place for debugging.
	ShimDebug bool `toml:"shim_debug"`
}
  85. // New returns a configured runtime
  86. func New(ic *plugin.InitContext) (interface{}, error) {
  87. ic.Meta.Platforms = []ocispec.Platform{platforms.DefaultSpec()}
  88. if err := os.MkdirAll(ic.Root, 0711); err != nil {
  89. return nil, err
  90. }
  91. if err := os.MkdirAll(ic.State, 0711); err != nil {
  92. return nil, err
  93. }
  94. monitor, err := ic.Get(plugin.TaskMonitorPlugin)
  95. if err != nil {
  96. return nil, err
  97. }
  98. m, err := ic.Get(plugin.MetadataPlugin)
  99. if err != nil {
  100. return nil, err
  101. }
  102. cfg := ic.Config.(*Config)
  103. r := &Runtime{
  104. root: ic.Root,
  105. state: ic.State,
  106. monitor: monitor.(runtime.TaskMonitor),
  107. tasks: runtime.NewTaskList(),
  108. db: m.(*metadata.DB),
  109. address: ic.Address,
  110. events: ic.Events,
  111. config: cfg,
  112. }
  113. tasks, err := r.restoreTasks(ic.Context)
  114. if err != nil {
  115. return nil, err
  116. }
  117. // TODO: need to add the tasks to the monitor
  118. for _, t := range tasks {
  119. if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil {
  120. return nil, err
  121. }
  122. }
  123. return r, nil
  124. }
// Runtime for a linux based system
type Runtime struct {
	// root and state are base directories; each is joined with a namespace
	// (and a task id) to locate a task's bundle. state is also the directory
	// scanned at startup to restore tasks.
	root  string
	state string
	// address is passed to ShimRemote when creating a task.
	// NOTE(review): presumably the containerd GRPC address the shim connects
	// back to; confirm against the shim client code.
	address string

	// monitor is told to monitor tasks that have a cgroup and to stop
	// monitoring on delete or shim exit.
	monitor runtime.TaskMonitor
	// tasks holds the tasks currently known to this runtime.
	tasks *runtime.TaskList
	// db is the metadata database used to look up a container's runtime
	// options.
	db *metadata.DB
	// events is the exchange that task create/exit/delete events are
	// published on.
	events *exchange.Exchange

	// config is the plugin configuration.
	config *Config
}
// ID of the runtime. It returns the package-level pluginID.
func (r *Runtime) ID() string {
	return pluginID
}
  140. // Create a new task
  141. func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (_ runtime.Task, err error) {
  142. namespace, err := namespaces.NamespaceRequired(ctx)
  143. if err != nil {
  144. return nil, err
  145. }
  146. if err := identifiers.Validate(id); err != nil {
  147. return nil, errors.Wrapf(err, "invalid task id")
  148. }
  149. ropts, err := r.getRuncOptions(ctx, id)
  150. if err != nil {
  151. return nil, err
  152. }
  153. bundle, err := newBundle(id,
  154. filepath.Join(r.state, namespace),
  155. filepath.Join(r.root, namespace),
  156. opts.Spec.Value)
  157. if err != nil {
  158. return nil, err
  159. }
  160. defer func() {
  161. if err != nil {
  162. bundle.Delete()
  163. }
  164. }()
  165. shimopt := ShimLocal(r.config, r.events)
  166. if !r.config.NoShim {
  167. var cgroup string
  168. if opts.Options != nil {
  169. v, err := typeurl.UnmarshalAny(opts.Options)
  170. if err != nil {
  171. return nil, err
  172. }
  173. cgroup = v.(*runctypes.CreateOptions).ShimCgroup
  174. }
  175. exitHandler := func() {
  176. log.G(ctx).WithField("id", id).Info("shim reaped")
  177. t, err := r.tasks.Get(ctx, id)
  178. if err != nil {
  179. // Task was never started or was already successfully deleted
  180. return
  181. }
  182. lc := t.(*Task)
  183. // Stop the monitor
  184. if err := r.monitor.Stop(lc); err != nil {
  185. log.G(ctx).WithError(err).WithFields(logrus.Fields{
  186. "id": id,
  187. "namespace": namespace,
  188. }).Warn("failed to stop monitor")
  189. }
  190. log.G(ctx).WithFields(logrus.Fields{
  191. "id": id,
  192. "namespace": namespace,
  193. }).Warn("cleaning up after killed shim")
  194. if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid); err != nil {
  195. log.G(ctx).WithError(err).WithFields(logrus.Fields{
  196. "id": id,
  197. "namespace": namespace,
  198. }).Warn("failed to clen up after killed shim")
  199. }
  200. }
  201. shimopt = ShimRemote(r.config, r.address, cgroup, exitHandler)
  202. }
  203. s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts)
  204. if err != nil {
  205. return nil, err
  206. }
  207. defer func() {
  208. if err != nil {
  209. if kerr := s.KillShim(ctx); kerr != nil {
  210. log.G(ctx).WithError(err).Error("failed to kill shim")
  211. }
  212. }
  213. }()
  214. rt := r.config.Runtime
  215. if ropts != nil && ropts.Runtime != "" {
  216. rt = ropts.Runtime
  217. }
  218. sopts := &shim.CreateTaskRequest{
  219. ID: id,
  220. Bundle: bundle.path,
  221. Runtime: rt,
  222. Stdin: opts.IO.Stdin,
  223. Stdout: opts.IO.Stdout,
  224. Stderr: opts.IO.Stderr,
  225. Terminal: opts.IO.Terminal,
  226. Checkpoint: opts.Checkpoint,
  227. Options: opts.Options,
  228. }
  229. for _, m := range opts.Rootfs {
  230. sopts.Rootfs = append(sopts.Rootfs, &types.Mount{
  231. Type: m.Type,
  232. Source: m.Source,
  233. Options: m.Options,
  234. })
  235. }
  236. cr, err := s.Create(ctx, sopts)
  237. if err != nil {
  238. return nil, errdefs.FromGRPC(err)
  239. }
  240. t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor, r.events,
  241. proc.NewRunc(ropts.RuntimeRoot, sopts.Bundle, namespace, rt, ropts.CriuPath, ropts.SystemdCgroup))
  242. if err != nil {
  243. return nil, err
  244. }
  245. if err := r.tasks.Add(ctx, t); err != nil {
  246. return nil, err
  247. }
  248. // after the task is created, add it to the monitor if it has a cgroup
  249. // this can be different on a checkpoint/restore
  250. if t.cg != nil {
  251. if err = r.monitor.Monitor(t); err != nil {
  252. if _, err := r.Delete(ctx, t); err != nil {
  253. log.G(ctx).WithError(err).Error("deleting task after failed monitor")
  254. }
  255. return nil, err
  256. }
  257. }
  258. r.events.Publish(ctx, runtime.TaskCreateEventTopic, &eventstypes.TaskCreate{
  259. ContainerID: sopts.ID,
  260. Bundle: sopts.Bundle,
  261. Rootfs: sopts.Rootfs,
  262. IO: &eventstypes.TaskIO{
  263. Stdin: sopts.Stdin,
  264. Stdout: sopts.Stdout,
  265. Stderr: sopts.Stderr,
  266. Terminal: sopts.Terminal,
  267. },
  268. Checkpoint: sopts.Checkpoint,
  269. Pid: uint32(t.pid),
  270. })
  271. return t, nil
  272. }
  273. // Delete a task removing all on disk state
  274. func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, error) {
  275. namespace, err := namespaces.NamespaceRequired(ctx)
  276. if err != nil {
  277. return nil, err
  278. }
  279. lc, ok := c.(*Task)
  280. if !ok {
  281. return nil, fmt.Errorf("task cannot be cast as *linux.Task")
  282. }
  283. if err := r.monitor.Stop(lc); err != nil {
  284. return nil, err
  285. }
  286. bundle := loadBundle(
  287. lc.id,
  288. filepath.Join(r.state, namespace, lc.id),
  289. filepath.Join(r.root, namespace, lc.id),
  290. )
  291. rsp, err := lc.shim.Delete(ctx, empty)
  292. if err != nil {
  293. if cerr := r.cleanupAfterDeadShim(ctx, bundle, namespace, c.ID(), lc.pid); cerr != nil {
  294. log.G(ctx).WithError(err).Error("unable to cleanup task")
  295. }
  296. return nil, errdefs.FromGRPC(err)
  297. }
  298. r.tasks.Delete(ctx, lc.id)
  299. if err := lc.shim.KillShim(ctx); err != nil {
  300. log.G(ctx).WithError(err).Error("failed to kill shim")
  301. }
  302. if err := bundle.Delete(); err != nil {
  303. log.G(ctx).WithError(err).Error("failed to delete bundle")
  304. }
  305. r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
  306. ContainerID: lc.id,
  307. ExitStatus: rsp.ExitStatus,
  308. ExitedAt: rsp.ExitedAt,
  309. Pid: rsp.Pid,
  310. })
  311. return &runtime.Exit{
  312. Status: rsp.ExitStatus,
  313. Timestamp: rsp.ExitedAt,
  314. Pid: rsp.Pid,
  315. }, nil
  316. }
// Tasks returns all tasks known to the runtime. The lookup is delegated to
// the runtime's task list using ctx.
func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) {
	return r.tasks.GetAll(ctx)
}
  321. func (r *Runtime) restoreTasks(ctx context.Context) ([]*Task, error) {
  322. dir, err := ioutil.ReadDir(r.state)
  323. if err != nil {
  324. return nil, err
  325. }
  326. var o []*Task
  327. for _, namespace := range dir {
  328. if !namespace.IsDir() {
  329. continue
  330. }
  331. name := namespace.Name()
  332. log.G(ctx).WithField("namespace", name).Debug("loading tasks in namespace")
  333. tasks, err := r.loadTasks(ctx, name)
  334. if err != nil {
  335. return nil, err
  336. }
  337. o = append(o, tasks...)
  338. }
  339. return o, nil
  340. }
// Get a specific task by task id. The lookup is delegated to the runtime's
// task list using ctx.
func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) {
	return r.tasks.Get(ctx, id)
}
  345. func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
  346. dir, err := ioutil.ReadDir(filepath.Join(r.state, ns))
  347. if err != nil {
  348. return nil, err
  349. }
  350. var o []*Task
  351. for _, path := range dir {
  352. if !path.IsDir() {
  353. continue
  354. }
  355. id := path.Name()
  356. bundle := loadBundle(
  357. id,
  358. filepath.Join(r.state, ns, id),
  359. filepath.Join(r.root, ns, id),
  360. )
  361. ctx = namespaces.WithNamespace(ctx, ns)
  362. pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile))
  363. s, err := bundle.NewShimClient(ctx, ns, ShimConnect(r.config, func() {
  364. err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid)
  365. if err != nil {
  366. log.G(ctx).WithError(err).WithField("bundle", bundle.path).
  367. Error("cleaning up after dead shim")
  368. }
  369. }), nil)
  370. if err != nil {
  371. log.G(ctx).WithError(err).WithFields(logrus.Fields{
  372. "id": id,
  373. "namespace": ns,
  374. }).Error("connecting to shim")
  375. err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid)
  376. if err != nil {
  377. log.G(ctx).WithError(err).WithField("bundle", bundle.path).
  378. Error("cleaning up after dead shim")
  379. }
  380. continue
  381. }
  382. ropts, err := r.getRuncOptions(ctx, id)
  383. if err != nil {
  384. log.G(ctx).WithError(err).WithField("id", id).
  385. Error("get runtime options")
  386. continue
  387. }
  388. t, err := newTask(id, ns, pid, s, r.monitor, r.events,
  389. proc.NewRunc(ropts.RuntimeRoot, bundle.path, ns, ropts.Runtime, ropts.CriuPath, ropts.SystemdCgroup))
  390. if err != nil {
  391. log.G(ctx).WithError(err).Error("loading task type")
  392. continue
  393. }
  394. o = append(o, t)
  395. }
  396. return o, nil
  397. }
  398. func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int) error {
  399. ctx = namespaces.WithNamespace(ctx, ns)
  400. if err := r.terminate(ctx, bundle, ns, id); err != nil {
  401. if r.config.ShimDebug {
  402. return errors.Wrap(err, "failed to terminate task, leaving bundle for debugging")
  403. }
  404. log.G(ctx).WithError(err).Warn("failed to terminate task")
  405. }
  406. // Notify Client
  407. exitedAt := time.Now().UTC()
  408. r.events.Publish(ctx, runtime.TaskExitEventTopic, &eventstypes.TaskExit{
  409. ContainerID: id,
  410. ID: id,
  411. Pid: uint32(pid),
  412. ExitStatus: 128 + uint32(unix.SIGKILL),
  413. ExitedAt: exitedAt,
  414. })
  415. r.tasks.Delete(ctx, id)
  416. if err := bundle.Delete(); err != nil {
  417. log.G(ctx).WithError(err).Error("delete bundle")
  418. }
  419. r.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
  420. ContainerID: id,
  421. Pid: uint32(pid),
  422. ExitStatus: 128 + uint32(unix.SIGKILL),
  423. ExitedAt: exitedAt,
  424. })
  425. return nil
  426. }
  427. func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error {
  428. rt, err := r.getRuntime(ctx, ns, id)
  429. if err != nil {
  430. return err
  431. }
  432. if err := rt.Delete(ctx, id, &runc.DeleteOpts{
  433. Force: true,
  434. }); err != nil {
  435. log.G(ctx).WithError(err).Warnf("delete runtime state %s", id)
  436. }
  437. if err := mount.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil {
  438. log.G(ctx).WithError(err).WithFields(logrus.Fields{
  439. "path": bundle.path,
  440. "id": id,
  441. }).Warnf("unmount task rootfs")
  442. }
  443. return nil
  444. }
  445. func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, error) {
  446. ropts, err := r.getRuncOptions(ctx, id)
  447. if err != nil {
  448. return nil, err
  449. }
  450. var (
  451. cmd = r.config.Runtime
  452. root = proc.RuncRoot
  453. )
  454. if ropts != nil {
  455. if ropts.Runtime != "" {
  456. cmd = ropts.Runtime
  457. }
  458. if ropts.RuntimeRoot != "" {
  459. root = ropts.RuntimeRoot
  460. }
  461. }
  462. return &runc.Runc{
  463. Command: cmd,
  464. LogFormat: runc.JSON,
  465. PdeathSignal: unix.SIGKILL,
  466. Root: filepath.Join(root, ns),
  467. Debug: r.config.ShimDebug,
  468. }, nil
  469. }
  470. func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) {
  471. var container containers.Container
  472. if err := r.db.View(func(tx *bolt.Tx) error {
  473. store := metadata.NewContainerStore(tx)
  474. var err error
  475. container, err = store.Get(ctx, id)
  476. return err
  477. }); err != nil {
  478. return nil, err
  479. }
  480. if container.Runtime.Options != nil {
  481. v, err := typeurl.UnmarshalAny(container.Runtime.Options)
  482. if err != nil {
  483. return nil, err
  484. }
  485. ropts, ok := v.(*runctypes.RuncOptions)
  486. if !ok {
  487. return nil, errors.New("invalid runtime options format")
  488. }
  489. return ropts, nil
  490. }
  491. return &runctypes.RuncOptions{}, nil
  492. }