task.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. // +build linux
  2. /*
  3. Copyright The containerd Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package linux
  15. import (
  16. "context"
  17. "sync"
  18. "github.com/containerd/cgroups"
  19. eventstypes "github.com/containerd/containerd/api/events"
  20. "github.com/containerd/containerd/api/types/task"
  21. "github.com/containerd/containerd/errdefs"
  22. "github.com/containerd/containerd/events/exchange"
  23. "github.com/containerd/containerd/identifiers"
  24. "github.com/containerd/containerd/log"
  25. "github.com/containerd/containerd/runtime"
  26. "github.com/containerd/containerd/runtime/v1/shim/client"
  27. "github.com/containerd/containerd/runtime/v1/shim/v1"
  28. "github.com/containerd/ttrpc"
  29. "github.com/containerd/typeurl"
  30. "github.com/gogo/protobuf/types"
  31. "github.com/pkg/errors"
  32. )
  33. // Task on a linux based system
  34. type Task struct {
  35. mu sync.Mutex
  36. id string
  37. pid int
  38. shim *client.Client
  39. namespace string
  40. cg cgroups.Cgroup
  41. events *exchange.Exchange
  42. tasks *runtime.TaskList
  43. bundle *bundle
  44. }
  45. func newTask(id, namespace string, pid int, shim *client.Client, events *exchange.Exchange, list *runtime.TaskList, bundle *bundle) (*Task, error) {
  46. var (
  47. err error
  48. cg cgroups.Cgroup
  49. )
  50. if pid > 0 {
  51. cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
  52. if err != nil && err != cgroups.ErrCgroupDeleted {
  53. return nil, err
  54. }
  55. }
  56. return &Task{
  57. id: id,
  58. pid: pid,
  59. shim: shim,
  60. namespace: namespace,
  61. cg: cg,
  62. events: events,
  63. tasks: list,
  64. bundle: bundle,
  65. }, nil
  66. }
  67. // ID of the task
  68. func (t *Task) ID() string {
  69. return t.id
  70. }
  71. // Namespace of the task
  72. func (t *Task) Namespace() string {
  73. return t.namespace
  74. }
  75. // Delete the task and return the exit status
  76. func (t *Task) Delete(ctx context.Context) (*runtime.Exit, error) {
  77. rsp, err := t.shim.Delete(ctx, empty)
  78. if err != nil {
  79. return nil, errdefs.FromGRPC(err)
  80. }
  81. t.tasks.Delete(ctx, t.id)
  82. if err := t.shim.KillShim(ctx); err != nil {
  83. log.G(ctx).WithError(err).Error("failed to kill shim")
  84. }
  85. if err := t.bundle.Delete(); err != nil {
  86. log.G(ctx).WithError(err).Error("failed to delete bundle")
  87. }
  88. t.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
  89. ContainerID: t.id,
  90. ExitStatus: rsp.ExitStatus,
  91. ExitedAt: rsp.ExitedAt,
  92. Pid: rsp.Pid,
  93. })
  94. return &runtime.Exit{
  95. Status: rsp.ExitStatus,
  96. Timestamp: rsp.ExitedAt,
  97. Pid: rsp.Pid,
  98. }, nil
  99. }
  100. // Start the task
  101. func (t *Task) Start(ctx context.Context) error {
  102. t.mu.Lock()
  103. hasCgroup := t.cg != nil
  104. t.mu.Unlock()
  105. r, err := t.shim.Start(ctx, &shim.StartRequest{
  106. ID: t.id,
  107. })
  108. if err != nil {
  109. return errdefs.FromGRPC(err)
  110. }
  111. t.pid = int(r.Pid)
  112. if !hasCgroup {
  113. cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid))
  114. if err != nil {
  115. return err
  116. }
  117. t.mu.Lock()
  118. t.cg = cg
  119. t.mu.Unlock()
  120. }
  121. t.events.Publish(ctx, runtime.TaskStartEventTopic, &eventstypes.TaskStart{
  122. ContainerID: t.id,
  123. Pid: uint32(t.pid),
  124. })
  125. return nil
  126. }
  127. // State returns runtime information for the task
  128. func (t *Task) State(ctx context.Context) (runtime.State, error) {
  129. response, err := t.shim.State(ctx, &shim.StateRequest{
  130. ID: t.id,
  131. })
  132. if err != nil {
  133. if errors.Cause(err) != ttrpc.ErrClosed {
  134. return runtime.State{}, errdefs.FromGRPC(err)
  135. }
  136. return runtime.State{}, errdefs.ErrNotFound
  137. }
  138. var status runtime.Status
  139. switch response.Status {
  140. case task.StatusCreated:
  141. status = runtime.CreatedStatus
  142. case task.StatusRunning:
  143. status = runtime.RunningStatus
  144. case task.StatusStopped:
  145. status = runtime.StoppedStatus
  146. case task.StatusPaused:
  147. status = runtime.PausedStatus
  148. case task.StatusPausing:
  149. status = runtime.PausingStatus
  150. }
  151. return runtime.State{
  152. Pid: response.Pid,
  153. Status: status,
  154. Stdin: response.Stdin,
  155. Stdout: response.Stdout,
  156. Stderr: response.Stderr,
  157. Terminal: response.Terminal,
  158. ExitStatus: response.ExitStatus,
  159. ExitedAt: response.ExitedAt,
  160. }, nil
  161. }
  162. // Pause the task and all processes
  163. func (t *Task) Pause(ctx context.Context) error {
  164. if _, err := t.shim.Pause(ctx, empty); err != nil {
  165. return errdefs.FromGRPC(err)
  166. }
  167. t.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
  168. ContainerID: t.id,
  169. })
  170. return nil
  171. }
  172. // Resume the task and all processes
  173. func (t *Task) Resume(ctx context.Context) error {
  174. if _, err := t.shim.Resume(ctx, empty); err != nil {
  175. return errdefs.FromGRPC(err)
  176. }
  177. t.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
  178. ContainerID: t.id,
  179. })
  180. return nil
  181. }
  182. // Kill the task using the provided signal
  183. //
  184. // Optionally send the signal to all processes that are a child of the task
  185. func (t *Task) Kill(ctx context.Context, signal uint32, all bool) error {
  186. if _, err := t.shim.Kill(ctx, &shim.KillRequest{
  187. ID: t.id,
  188. Signal: signal,
  189. All: all,
  190. }); err != nil {
  191. return errdefs.FromGRPC(err)
  192. }
  193. return nil
  194. }
  195. // Exec creates a new process inside the task
  196. func (t *Task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
  197. if err := identifiers.Validate(id); err != nil {
  198. return nil, errors.Wrapf(err, "invalid exec id")
  199. }
  200. request := &shim.ExecProcessRequest{
  201. ID: id,
  202. Stdin: opts.IO.Stdin,
  203. Stdout: opts.IO.Stdout,
  204. Stderr: opts.IO.Stderr,
  205. Terminal: opts.IO.Terminal,
  206. Spec: opts.Spec,
  207. }
  208. if _, err := t.shim.Exec(ctx, request); err != nil {
  209. return nil, errdefs.FromGRPC(err)
  210. }
  211. return &Process{
  212. id: id,
  213. t: t,
  214. }, nil
  215. }
  216. // Pids returns all system level process ids running inside the task
  217. func (t *Task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
  218. resp, err := t.shim.ListPids(ctx, &shim.ListPidsRequest{
  219. ID: t.id,
  220. })
  221. if err != nil {
  222. return nil, errdefs.FromGRPC(err)
  223. }
  224. var processList []runtime.ProcessInfo
  225. for _, p := range resp.Processes {
  226. processList = append(processList, runtime.ProcessInfo{
  227. Pid: p.Pid,
  228. Info: p.Info,
  229. })
  230. }
  231. return processList, nil
  232. }
  233. // ResizePty changes the side of the task's PTY to the provided width and height
  234. func (t *Task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
  235. _, err := t.shim.ResizePty(ctx, &shim.ResizePtyRequest{
  236. ID: t.id,
  237. Width: size.Width,
  238. Height: size.Height,
  239. })
  240. if err != nil {
  241. err = errdefs.FromGRPC(err)
  242. }
  243. return err
  244. }
  245. // CloseIO closes the provided IO on the task
  246. func (t *Task) CloseIO(ctx context.Context) error {
  247. _, err := t.shim.CloseIO(ctx, &shim.CloseIORequest{
  248. ID: t.id,
  249. Stdin: true,
  250. })
  251. if err != nil {
  252. err = errdefs.FromGRPC(err)
  253. }
  254. return err
  255. }
  256. // Checkpoint creates a system level dump of the task and process information that can be later restored
  257. func (t *Task) Checkpoint(ctx context.Context, path string, options *types.Any) error {
  258. r := &shim.CheckpointTaskRequest{
  259. Path: path,
  260. Options: options,
  261. }
  262. if _, err := t.shim.Checkpoint(ctx, r); err != nil {
  263. return errdefs.FromGRPC(err)
  264. }
  265. t.events.Publish(ctx, runtime.TaskCheckpointedEventTopic, &eventstypes.TaskCheckpointed{
  266. ContainerID: t.id,
  267. })
  268. return nil
  269. }
  270. // Update changes runtime information of a running task
  271. func (t *Task) Update(ctx context.Context, resources *types.Any) error {
  272. if _, err := t.shim.Update(ctx, &shim.UpdateTaskRequest{
  273. Resources: resources,
  274. }); err != nil {
  275. return errdefs.FromGRPC(err)
  276. }
  277. return nil
  278. }
  279. // Process returns a specific process inside the task by the process id
  280. func (t *Task) Process(ctx context.Context, id string) (runtime.Process, error) {
  281. p := &Process{
  282. id: id,
  283. t: t,
  284. }
  285. if _, err := p.State(ctx); err != nil {
  286. return nil, err
  287. }
  288. return p, nil
  289. }
  290. // Stats returns runtime specific system level metric information for the task
  291. func (t *Task) Stats(ctx context.Context) (*types.Any, error) {
  292. t.mu.Lock()
  293. defer t.mu.Unlock()
  294. if t.cg == nil {
  295. return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
  296. }
  297. stats, err := t.cg.Stat(cgroups.IgnoreNotExist)
  298. if err != nil {
  299. return nil, err
  300. }
  301. return typeurl.MarshalAny(stats)
  302. }
  303. // Cgroup returns the underlying cgroup for a linux task
  304. func (t *Task) Cgroup() (cgroups.Cgroup, error) {
  305. t.mu.Lock()
  306. defer t.mu.Unlock()
  307. if t.cg == nil {
  308. return nil, errors.Wrap(errdefs.ErrNotFound, "cgroup does not exist")
  309. }
  310. return t.cg, nil
  311. }
  312. // Wait for the task to exit returning the status and timestamp
  313. func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) {
  314. r, err := t.shim.Wait(ctx, &shim.WaitRequest{
  315. ID: t.id,
  316. })
  317. if err != nil {
  318. return nil, err
  319. }
  320. return &runtime.Exit{
  321. Timestamp: r.ExitedAt,
  322. Status: r.ExitStatus,
  323. }, nil
  324. }