client_linux.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. package libcontainerd
  import (
  	"fmt"
  	"io"
  	"os"
  	"strings"
  	"sync"
  	"time"

  	containerd "github.com/containerd/containerd/api/grpc/types"
  	containerd_runtime_types "github.com/containerd/containerd/runtime"
  	"github.com/docker/docker/pkg/ioutils"
  	"github.com/docker/docker/pkg/mount"
  	"github.com/golang/protobuf/ptypes"
  	"github.com/golang/protobuf/ptypes/timestamp"
  	specs "github.com/opencontainers/runtime-spec/specs-go"
  	"github.com/sirupsen/logrus"
  	"golang.org/x/net/context"
  	"golang.org/x/sys/unix"
  )
  19. type client struct {
  20. clientCommon
  21. // Platform specific properties below here.
  22. remote *remote
  23. q queue
  24. exitNotifiers map[string]*exitNotifier
  25. liveRestore bool
  26. }
  27. // GetServerVersion returns the connected server version information
  28. func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
  29. resp, err := clnt.remote.apiClient.GetServerVersion(ctx, &containerd.GetServerVersionRequest{})
  30. if err != nil {
  31. return nil, err
  32. }
  33. sv := &ServerVersion{
  34. GetServerVersionResponse: *resp,
  35. }
  36. return sv, nil
  37. }
  38. // AddProcess is the handler for adding a process to an already running
  39. // container. It's called through docker exec. It returns the system pid of the
  40. // exec'd process.
  41. func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (pid int, err error) {
  42. clnt.lock(containerID)
  43. defer clnt.unlock(containerID)
  44. container, err := clnt.getContainer(containerID)
  45. if err != nil {
  46. return -1, err
  47. }
  48. spec, err := container.spec()
  49. if err != nil {
  50. return -1, err
  51. }
  52. sp := spec.Process
  53. sp.Args = specp.Args
  54. sp.Terminal = specp.Terminal
  55. if len(specp.Env) > 0 {
  56. sp.Env = specp.Env
  57. }
  58. if specp.Cwd != nil {
  59. sp.Cwd = *specp.Cwd
  60. }
  61. if specp.User != nil {
  62. sp.User = specs.User{
  63. UID: specp.User.UID,
  64. GID: specp.User.GID,
  65. AdditionalGids: specp.User.AdditionalGids,
  66. }
  67. }
  68. if specp.Capabilities != nil {
  69. sp.Capabilities.Bounding = specp.Capabilities
  70. sp.Capabilities.Effective = specp.Capabilities
  71. sp.Capabilities.Inheritable = specp.Capabilities
  72. sp.Capabilities.Permitted = specp.Capabilities
  73. }
  74. p := container.newProcess(processFriendlyName)
  75. r := &containerd.AddProcessRequest{
  76. Args: sp.Args,
  77. Cwd: sp.Cwd,
  78. Terminal: sp.Terminal,
  79. Id: containerID,
  80. Env: sp.Env,
  81. User: &containerd.User{
  82. Uid: sp.User.UID,
  83. Gid: sp.User.GID,
  84. AdditionalGids: sp.User.AdditionalGids,
  85. },
  86. Pid: processFriendlyName,
  87. Stdin: p.fifo(unix.Stdin),
  88. Stdout: p.fifo(unix.Stdout),
  89. Stderr: p.fifo(unix.Stderr),
  90. Capabilities: sp.Capabilities.Effective,
  91. ApparmorProfile: sp.ApparmorProfile,
  92. SelinuxLabel: sp.SelinuxLabel,
  93. NoNewPrivileges: sp.NoNewPrivileges,
  94. Rlimits: convertRlimits(sp.Rlimits),
  95. }
  96. fifoCtx, cancel := context.WithCancel(context.Background())
  97. defer func() {
  98. if err != nil {
  99. cancel()
  100. }
  101. }()
  102. iopipe, err := p.openFifos(fifoCtx, sp.Terminal)
  103. if err != nil {
  104. return -1, err
  105. }
  106. resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
  107. if err != nil {
  108. p.closeFifos(iopipe)
  109. return -1, err
  110. }
  111. var stdinOnce sync.Once
  112. stdin := iopipe.Stdin
  113. iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
  114. var err error
  115. stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
  116. err = stdin.Close()
  117. if err2 := p.sendCloseStdin(); err == nil {
  118. err = err2
  119. }
  120. })
  121. return err
  122. })
  123. container.processes[processFriendlyName] = p
  124. if err := attachStdio(*iopipe); err != nil {
  125. p.closeFifos(iopipe)
  126. return -1, err
  127. }
  128. return int(resp.SystemPid), nil
  129. }
  130. func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
  131. clnt.lock(containerID)
  132. defer clnt.unlock(containerID)
  133. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  134. Id: containerID,
  135. Pid: pid,
  136. Signal: uint32(sig),
  137. })
  138. return err
  139. }
  140. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  141. clnt.lock(containerID)
  142. defer clnt.unlock(containerID)
  143. if _, err := clnt.getContainer(containerID); err != nil {
  144. return err
  145. }
  146. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  147. Id: containerID,
  148. Pid: processFriendlyName,
  149. Width: uint32(width),
  150. Height: uint32(height),
  151. })
  152. return err
  153. }
  154. func (clnt *client) Pause(containerID string) error {
  155. return clnt.setState(containerID, StatePause)
  156. }
  157. func (clnt *client) setState(containerID, state string) error {
  158. clnt.lock(containerID)
  159. container, err := clnt.getContainer(containerID)
  160. if err != nil {
  161. clnt.unlock(containerID)
  162. return err
  163. }
  164. if container.systemPid == 0 {
  165. clnt.unlock(containerID)
  166. return fmt.Errorf("No active process for container %s", containerID)
  167. }
  168. st := "running"
  169. if state == StatePause {
  170. st = "paused"
  171. }
  172. chstate := make(chan struct{})
  173. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  174. Id: containerID,
  175. Pid: InitFriendlyName,
  176. Status: st,
  177. })
  178. if err != nil {
  179. clnt.unlock(containerID)
  180. return err
  181. }
  182. container.pauseMonitor.append(state, chstate)
  183. clnt.unlock(containerID)
  184. <-chstate
  185. return nil
  186. }
  187. func (clnt *client) Resume(containerID string) error {
  188. return clnt.setState(containerID, StateResume)
  189. }
  190. func (clnt *client) Stats(containerID string) (*Stats, error) {
  191. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  192. if err != nil {
  193. return nil, err
  194. }
  195. return (*Stats)(resp), nil
  196. }
  197. // Take care of the old 1.11.0 behavior in case the version upgrade
  198. // happened without a clean daemon shutdown
  199. func (clnt *client) cleanupOldRootfs(containerID string) {
  200. // Unmount and delete the bundle folder
  201. if mts, err := mount.GetMounts(); err == nil {
  202. for _, mts := range mts {
  203. if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
  204. if err := unix.Unmount(mts.Mountpoint, unix.MNT_DETACH); err == nil {
  205. os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
  206. }
  207. break
  208. }
  209. }
  210. }
  211. }
  212. func (clnt *client) setExited(containerID string, exitCode uint32) error {
  213. clnt.lock(containerID)
  214. defer clnt.unlock(containerID)
  215. err := clnt.backend.StateChanged(containerID, StateInfo{
  216. CommonStateInfo: CommonStateInfo{
  217. State: StateExit,
  218. ExitCode: exitCode,
  219. }})
  220. clnt.cleanupOldRootfs(containerID)
  221. return err
  222. }
  223. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  224. cont, err := clnt.getContainerdContainer(containerID)
  225. if err != nil {
  226. return nil, err
  227. }
  228. pids := make([]int, len(cont.Pids))
  229. for i, p := range cont.Pids {
  230. pids[i] = int(p)
  231. }
  232. return pids, nil
  233. }
  234. // Summary returns a summary of the processes running in a container.
  235. // This is a no-op on Linux.
  236. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  237. return nil, nil
  238. }
  239. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  240. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  241. if err != nil {
  242. return nil, err
  243. }
  244. for _, cont := range resp.Containers {
  245. if cont.Id == containerID {
  246. return cont, nil
  247. }
  248. }
  249. return nil, fmt.Errorf("invalid state response")
  250. }
  251. func (clnt *client) UpdateResources(containerID string, resources Resources) error {
  252. clnt.lock(containerID)
  253. defer clnt.unlock(containerID)
  254. container, err := clnt.getContainer(containerID)
  255. if err != nil {
  256. return err
  257. }
  258. if container.systemPid == 0 {
  259. return fmt.Errorf("No active process for container %s", containerID)
  260. }
  261. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  262. Id: containerID,
  263. Pid: InitFriendlyName,
  264. Resources: (*containerd.UpdateResource)(&resources),
  265. })
  266. return err
  267. }
  268. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  269. clnt.mapMutex.RLock()
  270. defer clnt.mapMutex.RUnlock()
  271. return clnt.exitNotifiers[containerID]
  272. }
  273. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  274. clnt.mapMutex.Lock()
  275. w, ok := clnt.exitNotifiers[containerID]
  276. defer clnt.mapMutex.Unlock()
  277. if !ok {
  278. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  279. clnt.exitNotifiers[containerID] = w
  280. }
  281. return w
  282. }
  283. func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
  284. clnt.lock(cont.Id)
  285. defer clnt.unlock(cont.Id)
  286. logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)
  287. containerID := cont.Id
  288. if _, err := clnt.getContainer(containerID); err == nil {
  289. return fmt.Errorf("container %s is already active", containerID)
  290. }
  291. defer func() {
  292. if err != nil {
  293. clnt.deleteContainer(cont.Id)
  294. }
  295. }()
  296. container := clnt.newContainer(cont.BundlePath, options...)
  297. container.systemPid = systemPid(cont)
  298. var terminal bool
  299. for _, p := range cont.Processes {
  300. if p.Pid == InitFriendlyName {
  301. terminal = p.Terminal
  302. }
  303. }
  304. fifoCtx, cancel := context.WithCancel(context.Background())
  305. defer func() {
  306. if err != nil {
  307. cancel()
  308. }
  309. }()
  310. iopipe, err := container.openFifos(fifoCtx, terminal)
  311. if err != nil {
  312. return err
  313. }
  314. var stdinOnce sync.Once
  315. stdin := iopipe.Stdin
  316. iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
  317. var err error
  318. stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
  319. err = stdin.Close()
  320. })
  321. return err
  322. })
  323. if err := attachStdio(*iopipe); err != nil {
  324. container.closeFifos(iopipe)
  325. return err
  326. }
  327. clnt.appendContainer(container)
  328. err = clnt.backend.StateChanged(containerID, StateInfo{
  329. CommonStateInfo: CommonStateInfo{
  330. State: StateRestore,
  331. Pid: container.systemPid,
  332. }})
  333. if err != nil {
  334. container.closeFifos(iopipe)
  335. return err
  336. }
  337. if lastEvent != nil {
  338. // This should only be a pause or resume event
  339. if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
  340. return clnt.backend.StateChanged(containerID, StateInfo{
  341. CommonStateInfo: CommonStateInfo{
  342. State: lastEvent.Type,
  343. Pid: container.systemPid,
  344. }})
  345. }
  346. logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
  347. }
  348. return nil
  349. }
  350. func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
  351. er := &containerd.EventsRequest{
  352. Timestamp: tsp,
  353. StoredOnly: true,
  354. Id: id,
  355. }
  356. events, err := clnt.remote.apiClient.Events(context.Background(), er)
  357. if err != nil {
  358. logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
  359. return nil, err
  360. }
  361. var ev *containerd.Event
  362. for {
  363. e, err := events.Recv()
  364. if err != nil {
  365. if err.Error() == "EOF" {
  366. break
  367. }
  368. logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
  369. return nil, err
  370. }
  371. ev = e
  372. logrus.Debugf("libcontainerd: received past event %#v", ev)
  373. }
  374. return ev, nil
  375. }
  376. func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
  377. ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
  378. if err == nil && ev == nil {
  379. // If ev is nil and the container is running in containerd,
  380. // we already consumed all the event of the
  381. // container, included the "exit" one.
  382. // Thus, we request all events containerd has in memory for
  383. // this container in order to get the last one (which should
  384. // be an exit event)
  385. logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
  386. // Request all events since beginning of time
  387. t := time.Unix(0, 0)
  388. tsp, err := ptypes.TimestampProto(t)
  389. if err != nil {
  390. logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
  391. return nil, err
  392. }
  393. return clnt.getContainerLastEventSinceTime(id, tsp)
  394. }
  395. return ev, err
  396. }
  397. func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
  398. // Synchronize with live events
  399. clnt.remote.Lock()
  400. defer clnt.remote.Unlock()
  401. // Check that containerd still knows this container.
  402. //
  403. // In the unlikely event that Restore for this container process
  404. // the its past event before the main loop, the event will be
  405. // processed twice. However, this is not an issue as all those
  406. // events will do is change the state of the container to be
  407. // exactly the same.
  408. cont, err := clnt.getContainerdContainer(containerID)
  409. // Get its last event
  410. ev, eerr := clnt.getContainerLastEvent(containerID)
  411. if err != nil || containerd_runtime_types.State(cont.Status) == containerd_runtime_types.Stopped {
  412. if err != nil {
  413. logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err)
  414. }
  415. if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) {
  416. // Wait a while for the exit event
  417. timeout := time.NewTimer(10 * time.Second)
  418. tick := time.NewTicker(100 * time.Millisecond)
  419. stop:
  420. for {
  421. select {
  422. case <-timeout.C:
  423. break stop
  424. case <-tick.C:
  425. ev, eerr = clnt.getContainerLastEvent(containerID)
  426. if eerr != nil {
  427. break stop
  428. }
  429. if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
  430. break stop
  431. }
  432. }
  433. }
  434. timeout.Stop()
  435. tick.Stop()
  436. }
  437. // get the exit status for this container, if we don't have
  438. // one, indicate an error
  439. ec := uint32(255)
  440. if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
  441. ec = ev.Status
  442. }
  443. clnt.setExited(containerID, ec)
  444. return nil
  445. }
  446. // container is still alive
  447. if clnt.liveRestore {
  448. if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
  449. logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
  450. }
  451. return nil
  452. }
  453. // Kill the container if liveRestore == false
  454. w := clnt.getOrCreateExitNotifier(containerID)
  455. clnt.lock(cont.Id)
  456. container := clnt.newContainer(cont.BundlePath)
  457. container.systemPid = systemPid(cont)
  458. clnt.appendContainer(container)
  459. clnt.unlock(cont.Id)
  460. container.discardFifos()
  461. if err := clnt.Signal(containerID, int(unix.SIGTERM)); err != nil {
  462. logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
  463. }
  464. // Let the main loop handle the exit event
  465. clnt.remote.Unlock()
  466. if ev != nil && ev.Type == StatePause {
  467. // resume container, it depends on the main loop, so we do it after Unlock()
  468. logrus.Debugf("libcontainerd: %s was paused, resuming it so it can die", containerID)
  469. if err := clnt.Resume(containerID); err != nil {
  470. return fmt.Errorf("failed to resume container: %v", err)
  471. }
  472. }
  473. select {
  474. case <-time.After(10 * time.Second):
  475. if err := clnt.Signal(containerID, int(unix.SIGKILL)); err != nil {
  476. logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
  477. }
  478. select {
  479. case <-time.After(2 * time.Second):
  480. case <-w.wait():
  481. // relock because of the defer
  482. clnt.remote.Lock()
  483. return nil
  484. }
  485. case <-w.wait():
  486. // relock because of the defer
  487. clnt.remote.Lock()
  488. return nil
  489. }
  490. // relock because of the defer
  491. clnt.remote.Lock()
  492. clnt.deleteContainer(containerID)
  493. return clnt.setExited(containerID, uint32(255))
  494. }
  495. func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
  496. clnt.lock(containerID)
  497. defer clnt.unlock(containerID)
  498. if _, err := clnt.getContainer(containerID); err != nil {
  499. return err
  500. }
  501. _, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
  502. Id: containerID,
  503. Checkpoint: &containerd.Checkpoint{
  504. Name: checkpointID,
  505. Exit: exit,
  506. Tcp: true,
  507. UnixSockets: true,
  508. Shell: false,
  509. EmptyNS: []string{"network"},
  510. },
  511. CheckpointDir: checkpointDir,
  512. })
  513. return err
  514. }
  515. func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
  516. clnt.lock(containerID)
  517. defer clnt.unlock(containerID)
  518. if _, err := clnt.getContainer(containerID); err != nil {
  519. return err
  520. }
  521. _, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
  522. Id: containerID,
  523. Name: checkpointID,
  524. CheckpointDir: checkpointDir,
  525. })
  526. return err
  527. }
  528. func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
  529. clnt.lock(containerID)
  530. defer clnt.unlock(containerID)
  531. if _, err := clnt.getContainer(containerID); err != nil {
  532. return nil, err
  533. }
  534. resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
  535. Id: containerID,
  536. CheckpointDir: checkpointDir,
  537. })
  538. if err != nil {
  539. return nil, err
  540. }
  541. return (*Checkpoints)(resp), nil
  542. }