task.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. // +build linux
  2. /*
  3. Copyright The containerd Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package linux
  15. import (
  16. "context"
  17. "sync"
  18. "github.com/containerd/cgroups"
  19. eventstypes "github.com/containerd/containerd/api/events"
  20. "github.com/containerd/containerd/api/types/task"
  21. "github.com/containerd/containerd/errdefs"
  22. "github.com/containerd/containerd/events/exchange"
  23. "github.com/containerd/containerd/identifiers"
  24. "github.com/containerd/containerd/log"
  25. "github.com/containerd/containerd/runtime"
  26. "github.com/containerd/containerd/runtime/v1/shim/client"
  27. "github.com/containerd/containerd/runtime/v1/shim/v1"
  28. "github.com/containerd/ttrpc"
  29. "github.com/containerd/typeurl"
  30. "github.com/gogo/protobuf/types"
  31. "github.com/pkg/errors"
  32. )
  33. // Task on a linux based system
  34. type Task struct {
  35. mu sync.Mutex
  36. id string
  37. pid int
  38. shim *client.Client
  39. namespace string
  40. cg cgroups.Cgroup
  41. events *exchange.Exchange
  42. tasks *runtime.TaskList
  43. bundle *bundle
  44. }
  45. func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.TaskList, bundle *bundle) (*Task, error) {
  46. var (
  47. err error
  48. cg cgroups.Cgroup
  49. )
  50. if pid > 0 {
  51. cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
  52. if err != nil && err != cgroups.ErrCgroupDeleted {
  53. return nil, err
  54. }
  55. }
  56. return &Task{
  57. id: id,
  58. pid: pid,
  59. shim: shim,
  60. namespace: namespace,
  61. cg: cg,
  62. events: events,
  63. tasks: list,
  64. bundle: bundle,
  65. }, nil
  66. }
  67. // ID of the task
  68. func (t *Task) ID() string {
  69. return t.id
  70. }
  71. // Namespace of the task
  72. func (t *Task) Namespace() string {
  73. return t.namespace
  74. }
  75. // PID of the task
  76. func (t *Task) PID() uint32 {
  77. return uint32(t.pid)
  78. }
  79. // Delete the task and return the exit status
  80. func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
  81. rsp, shimErr := t.shim.Delete(ctx, empty)
  82. if shimErr != nil {
  83. shimErr = errdefs.FromGRPC(shimErr)
  84. if !errdefs.IsNotFound(shimErr) {
  85. return nil, shimErr
  86. }
  87. }
  88. t.tasks.Delete(ctx, t.id)
  89. if err := t.shim.KillShim(ctx); err != nil {
  90. log.G(ctx).WithError(err).Error("failed to kill shim")
  91. }
  92. if err := t.bundle.Delete(); err != nil {
  93. log.G(ctx).WithError(err).Error("failed to delete bundle")
  94. }
  95. if shimErr != nil {
  96. return nil, shimErr
  97. }
  98. t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
  99. ContainerID: t.id,
  100. ExitStatus: rsp.ExitStatus,
  101. ExitedAt: rsp.ExitedAt,
  102. Pid: rsp.Pid,
  103. })
  104. return &runtime.Exit{
  105. Status: rsp.ExitStatus,
  106. Timestamp: rsp.ExitedAt,
  107. Pid: rsp.Pid,
  108. }, nil
  109. }
  110. // Start the task
  111. func (t *Task) Start(ctx context.Context) error {
  112. t.mu.Lock()
  113. hasCgroup := t.cg != nil
  114. t.mu.Unlock()
  115. r, err := t.shim.Start(ctx, &shim.StartRequest{
  116. ID: t.id,
  117. })
  118. if err != nil {
  119. return errdefs.FromGRPC(err)
  120. }
  121. t.pid = int(r.Pid)
  122. if !hasCgroup {
  123. cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid))
  124. if err != nil && err != cgroups.ErrCgroupDeleted {
  125. return err
  126. }
  127. t.mu.Lock()
  128. if err == cgroups.ErrCgroupDeleted {
  129. t.cg = nil
  130. } else {
  131. t.cg = cg
  132. }
  133. t.mu.Unlock()
  134. }
  135. t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
  136. ContainerID: t.id,
  137. Pid: uint32(t.pid),
  138. })
  139. return nil
  140. }
  141. // State returns runtime information for the task
  142. func (t *Task) State(ctx context.Context) (runtime.State, error) {
  143. response, err := t.shim.State(ctx, &shim.StateRequest{
  144. ID: t.id,
  145. })
  146. if err != nil {
  147. if errors.Cause(err) != ttrpc.ErrClosed {
  148. return runtime.State{}, errdefs.FromGRPC(err)
  149. }
  150. return runtime.State{}, errdefs.ErrNotFound
  151. }
  152. var status runtime.Status
  153. switch response.Status {
  154. case task.StatusCreated:
  155. status = runtime.CreatedStatus
  156. case task.StatusRunning:
  157. status = runtime.RunningStatus
  158. case task.StatusStopped:
  159. status = runtime.StoppedStatus
  160. case task.StatusPaused:
  161. status = runtime.PausedStatus
  162. case task.StatusPausing:
  163. status = runtime.PausingStatus
  164. }
  165. return runtime.State{
  166. Pid: response.Pid,
  167. Status: status,
  168. Stdin: response.Stdin,
  169. Stdout: response.Stdout,
  170. Stderr: response.Stderr,
  171. Terminal: response.Terminal,
  172. ExitStatus: response.ExitStatus,
  173. ExitedAt: response.ExitedAt,
  174. }, nil
  175. }
  176. // Pause the task and all processes
  177. func (t *Task) Pause(ctx context.Context) error {
  178. if _, err := t.shim.Pause(ctx, empty); err != nil {
  179. return errdefs.FromGRPC(err)
  180. }
  181. t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
  182. ContainerID: t.id,
  183. })
  184. return nil
  185. }
  186. // Resume the task and all processes
  187. func (t *Task) Resume(ctx context.Context) error {
  188. if _, err := t.shim.Resume(ctx, empty); err != nil {
  189. return errdefs.FromGRPC(err)
  190. }
  191. t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
  192. ContainerID: t.id,
  193. })
  194. return nil
  195. }
  196. // Kill the task using the provided signal
  197. //
  198. // Optionally send the signal to all processes that are a child of the task
  199. func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
  200. if _, err := t.shim.Kill(ctx, &shim.KillRequest{
  201. ID: t.id,
  202. Signal: signal,
  203. All: all,
  204. }); err != nil {
  205. return errdefs.FromGRPC(err)
  206. }
  207. return nil
  208. }
  209. // Exec creates a new process inside the task
  210. func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
  211. if err := identifiers.Validate(id); err != nil {
  212. return nil, errors.Wrapf(err, "invalid exec id")
  213. }
  214. request := &shim.ExecProcessRequest{
  215. ID: id,
  216. Stdin: opts.IO.Stdin,
  217. Stdout: opts.IO.Stdout,
  218. Stderr: opts.IO.Stderr,
  219. Terminal: opts.IO.Terminal,
  220. Spec: opts.Spec,
  221. }
  222. if _, err := t.shim.Exec(ctx, request); err != nil {
  223. return nil, errdefs.FromGRPC(err)
  224. }
  225. return &Process{
  226. id: id,
  227. t: t,
  228. }, nil
  229. }
  230. // Pids returns all system level process ids running inside the task
  231. func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
  232. resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{
  233. ID: t.id,
  234. })
  235. if err != nil {
  236. return nil, errdefs.FromGRPC(err)
  237. }
  238. var processList []runtime.ProcessInfo
  239. for _, p := range resp.Processes {
  240. processList = append(processList, runtime.ProcessInfo{
  241. Pid: p.Pid,
  242. Info: p.Info,
  243. })
  244. }
  245. return processList, nil
  246. }
  247. // ResizePty changes the side of the task's PTY to the provided width and height
  248. func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
  249. _, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
  250. ID: t.id,
  251. Width: size.Width,
  252. Height: size.Height,
  253. })
  254. if err != nil {
  255. err = errdefs.FromGRPC(err)
  256. }
  257. return err
  258. }
  259. // CloseIO closes the provided IO on the task
  260. func (t *Task) CloseIO(ctx context.Context) error {
  261. _, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{
  262. ID: t.id,
  263. Stdin: true,
  264. })
  265. if err != nil {
  266. err = errdefs.FromGRPC(err)
  267. }
  268. return err
  269. }
  270. // Checkpoint creates a system level dump of the task and process information that can be later restored
  271. func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error {
  272. r := &shim.CheckpointTaskRequest{
  273. Path: path,
  274. Options: options,
  275. }
  276. if _, err := t.shim.Checkpoint(ctx, r); err != nil {
  277. return errdefs.FromGRPC(err)
  278. }
  279. t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
  280. ContainerID: t.id,
  281. })
  282. return nil
  283. }
  284. // Update changes runtime information of a running task
  285. func (t *Task) Update(ctx context.Context, resources *types.Any) error {
  286. if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{
  287. Resources: resources,
  288. }); err != nil {
  289. return errdefs.FromGRPC(err)
  290. }
  291. return nil
  292. }
  293. // Process returns a specific process inside the task by the process id
  294. func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
  295. p := &Process{
  296. id: id,
  297. t: t,
  298. }
  299. if _, err := p.State(ctx); err != nil {
  300. return nil, err
  301. }
  302. return p, nil
  303. }
  304. // Stats returns runtime specific system level metric information for the task
  305. func (t *Task) Stats(ctx context.Context) (*types.Any, error) {
  306. t.mu.Lock()
  307. defer t.mu.Unlock()
  308. if t.cg == nil {
  309. return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
  310. }
  311. stats, err := t.cg.Stat(cgroups.IgnoreNotExist)
  312. if err != nil {
  313. return nil, err
  314. }
  315. return typeurl.MarshalAny(stats)
  316. }
  317. // Cgroup returns the underlying cgroup for a linux task
  318. func (t *Task) Cgroup() (cgroups.Cgroup, error) {
  319. t.mu.Lock()
  320. defer t.mu.Unlock()
  321. if t.cg == nil {
  322. return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
  323. }
  324. return t.cg, nil
  325. }
  326. // Wait for the task to exit returning the status and timestamp
  327. func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) {
  328. r, err := t.shim.Wait(ctx, &shim.WaitRequest{
  329. ID: t.id,
  330. })
  331. if err != nil {
  332. return nil, err
  333. }
  334. return &runtime.Exit{
  335. Timestamp: r.ExitedAt,
  336. Status: r.ExitStatus,
  337. }, nil
  338. }