client_linux.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619
  1. package libcontainerd
  2. import (
  3. "fmt"
  4. "os"
  5. "strings"
  6. "sync"
  7. "syscall"
  8. "time"
  9. "github.com/Sirupsen/logrus"
  10. containerd "github.com/containerd/containerd/api/grpc/types"
  11. containerd_runtime_types "github.com/containerd/containerd/runtime"
  12. "github.com/docker/docker/pkg/ioutils"
  13. "github.com/docker/docker/pkg/mount"
  14. "github.com/golang/protobuf/ptypes"
  15. "github.com/golang/protobuf/ptypes/timestamp"
  16. specs "github.com/opencontainers/runtime-spec/specs-go"
  17. "golang.org/x/net/context"
  18. )
  // client is the Linux implementation of the libcontainerd client. It embeds
  // clientCommon and adds the platform-specific state used by the methods in
  // this file to talk to containerd.
  type client struct {
  	clientCommon
  	// Platform specific properties below here.

  	// remote holds the containerd connection: every call in this file goes
  	// through remote.apiClient, and Restore takes remote's lock to
  	// synchronize with live events.
  	remote *remote
  	// q is not referenced in this part of the file.
  	// NOTE(review): presumably used by the event loop to order per-container
  	// work — confirm against the code that reads it.
  	q queue
  	// exitNotifiers maps container IDs to their exit notifiers. Accesses in
  	// this file (getExitNotifier, getOrCreateExitNotifier) are guarded by
  	// clnt.mapMutex.
  	exitNotifiers map[string]*exitNotifier
  	// liveRestore selects Restore's behavior for containers that are still
  	// running: true reattaches to them, false kills them.
  	liveRestore bool
  }
  27. // GetServerVersion returns the connected server version information
  28. func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
  29. resp, err := clnt.remote.apiClient.GetServerVersion(ctx, &containerd.GetServerVersionRequest{})
  30. if err != nil {
  31. return nil, err
  32. }
  33. sv := &ServerVersion{
  34. GetServerVersionResponse: *resp,
  35. }
  36. return sv, nil
  37. }
  // AddProcess is the handler for adding a process to an already running
  // container. It's called through docker exec. It returns the system pid of the
  // exec'd process.
  //
  // The new process starts from the container's own spec.Process, with Args and
  // Terminal taken from specp and Env, Cwd, User and Capabilities taken from
  // specp only when specp sets them. The process is started in containerd under
  // processFriendlyName, recorded in container.processes, and its stdio pipes
  // are handed to attachStdio. On failure it returns -1 and the error.
  func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (pid int, err error) {
  	clnt.lock(containerID)
  	defer clnt.unlock(containerID)
  	container, err := clnt.getContainer(containerID)
  	if err != nil {
  		return -1, err
  	}
  	spec, err := container.spec()
  	if err != nil {
  		return -1, err
  	}
  	// Start from the container's process spec and apply the exec overrides.
  	sp := spec.Process
  	sp.Args = specp.Args
  	sp.Terminal = specp.Terminal
  	if len(specp.Env) > 0 {
  		sp.Env = specp.Env
  	}
  	if specp.Cwd != nil {
  		sp.Cwd = *specp.Cwd
  	}
  	if specp.User != nil {
  		sp.User = specs.User{
  			UID:            specp.User.UID,
  			GID:            specp.User.GID,
  			AdditionalGids: specp.User.AdditionalGids,
  		}
  	}
  	if specp.Capabilities != nil {
  		// The same capability list is applied to all four capability sets.
  		// NOTE(review): if sp.Capabilities is a pointer type, this assumes the
  		// container spec always has one (nil would panic here) — confirm.
  		sp.Capabilities.Bounding = specp.Capabilities
  		sp.Capabilities.Effective = specp.Capabilities
  		sp.Capabilities.Inheritable = specp.Capabilities
  		sp.Capabilities.Permitted = specp.Capabilities
  	}
  	p := container.newProcess(processFriendlyName)
  	// Only the Effective capability set is forwarded to containerd.
  	r := &containerd.AddProcessRequest{
  		Args:     sp.Args,
  		Cwd:      sp.Cwd,
  		Terminal: sp.Terminal,
  		Id:       containerID,
  		Env:      sp.Env,
  		User: &containerd.User{
  			Uid:            sp.User.UID,
  			Gid:            sp.User.GID,
  			AdditionalGids: sp.User.AdditionalGids,
  		},
  		Pid:             processFriendlyName,
  		Stdin:           p.fifo(syscall.Stdin),
  		Stdout:          p.fifo(syscall.Stdout),
  		Stderr:          p.fifo(syscall.Stderr),
  		Capabilities:    sp.Capabilities.Effective,
  		ApparmorProfile: sp.ApparmorProfile,
  		SelinuxLabel:    sp.SelinuxLabel,
  		NoNewPrivileges: sp.NoNewPrivileges,
  		Rlimits:         convertRlimits(sp.Rlimits),
  	}
  	// The fifo context is cancelled only when AddProcess returns a non-nil
  	// (named) err; on success it is deliberately left uncancelled.
  	fifoCtx, cancel := context.WithCancel(context.Background())
  	defer func() {
  		if err != nil {
  			cancel()
  		}
  	}()
  	iopipe, err := p.openFifos(fifoCtx, sp.Terminal)
  	if err != nil {
  		return -1, err
  	}
  	resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
  	if err != nil {
  		p.closeFifos(iopipe)
  		return -1, err
  	}
  	// Make closing stdin idempotent. The first Close closes the fifo and calls
  	// p.sendCloseStdin, returning the first error of the two; later Close calls
  	// return nil.
  	var stdinOnce sync.Once
  	stdin := iopipe.Stdin
  	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
  		var err error
  		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
  			err = stdin.Close()
  			if err2 := p.sendCloseStdin(); err == nil {
  				err = err2
  			}
  		})
  		return err
  	})
  	// The process is recorded before attaching. If attachStdio fails, the fifos
  	// are closed but this map entry is left in place.
  	container.processes[processFriendlyName] = p
  	if err := attachStdio(*iopipe); err != nil {
  		p.closeFifos(iopipe)
  		return -1, err
  	}
  	return int(resp.SystemPid), nil
  }
  130. func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
  131. clnt.lock(containerID)
  132. defer clnt.unlock(containerID)
  133. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  134. Id: containerID,
  135. Pid: pid,
  136. Signal: uint32(sig),
  137. })
  138. return err
  139. }
  140. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  141. clnt.lock(containerID)
  142. defer clnt.unlock(containerID)
  143. if _, err := clnt.getContainer(containerID); err != nil {
  144. return err
  145. }
  146. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  147. Id: containerID,
  148. Pid: processFriendlyName,
  149. Width: uint32(width),
  150. Height: uint32(height),
  151. })
  152. return err
  153. }
  154. func (clnt *client) Pause(containerID string) error {
  155. return clnt.setState(containerID, StatePause)
  156. }
  // setState asks containerd to set the status of containerID's
  // InitFriendlyName process to "paused" (when state == StatePause) or
  // "running" (for any other state). It then waits on chstate, which is
  // registered with container.pauseMonitor under state.
  //
  // It fails if the container is unknown or has no system pid yet. The
  // per-container lock is released by hand on every path rather than with
  // defer, because it has to be dropped before the final blocking wait.
  // NOTE(review): presumably whoever signals chstate needs this same lock —
  // confirm before changing the unlock order.
  func (clnt *client) setState(containerID, state string) error {
  	clnt.lock(containerID)
  	container, err := clnt.getContainer(containerID)
  	if err != nil {
  		clnt.unlock(containerID)
  		return err
  	}
  	if container.systemPid == 0 {
  		clnt.unlock(containerID)
  		return fmt.Errorf("No active process for container %s", containerID)
  	}
  	st := "running"
  	if state == StatePause {
  		st = "paused"
  	}
  	chstate := make(chan struct{})
  	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  		Id:     containerID,
  		Pid:    InitFriendlyName,
  		Status: st,
  	})
  	if err != nil {
  		clnt.unlock(containerID)
  		return err
  	}
  	// Register the waiter while still holding the lock, then release it
  	// before blocking.
  	container.pauseMonitor.append(state, chstate)
  	clnt.unlock(containerID)
  	<-chstate
  	return nil
  }
  187. func (clnt *client) Resume(containerID string) error {
  188. return clnt.setState(containerID, StateResume)
  189. }
  190. func (clnt *client) Stats(containerID string) (*Stats, error) {
  191. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  192. if err != nil {
  193. return nil, err
  194. }
  195. return (*Stats)(resp), nil
  196. }
  197. // Take care of the old 1.11.0 behavior in case the version upgrade
  198. // happened without a clean daemon shutdown
  199. func (clnt *client) cleanupOldRootfs(containerID string) {
  200. // Unmount and delete the bundle folder
  201. if mts, err := mount.GetMounts(); err == nil {
  202. for _, mts := range mts {
  203. if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
  204. if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
  205. os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
  206. }
  207. break
  208. }
  209. }
  210. }
  211. }
  212. func (clnt *client) setExited(containerID string, exitCode uint32) error {
  213. clnt.lock(containerID)
  214. defer clnt.unlock(containerID)
  215. err := clnt.backend.StateChanged(containerID, StateInfo{
  216. CommonStateInfo: CommonStateInfo{
  217. State: StateExit,
  218. ExitCode: exitCode,
  219. }})
  220. clnt.cleanupOldRootfs(containerID)
  221. return err
  222. }
  223. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  224. cont, err := clnt.getContainerdContainer(containerID)
  225. if err != nil {
  226. return nil, err
  227. }
  228. pids := make([]int, len(cont.Pids))
  229. for i, p := range cont.Pids {
  230. pids[i] = int(p)
  231. }
  232. return pids, nil
  233. }
  234. // Summary returns a summary of the processes running in a container.
  235. // This is a no-op on Linux.
  236. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  237. return nil, nil
  238. }
  239. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  240. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  241. if err != nil {
  242. return nil, err
  243. }
  244. for _, cont := range resp.Containers {
  245. if cont.Id == containerID {
  246. return cont, nil
  247. }
  248. }
  249. return nil, fmt.Errorf("invalid state response")
  250. }
  251. func (clnt *client) UpdateResources(containerID string, resources Resources) error {
  252. clnt.lock(containerID)
  253. defer clnt.unlock(containerID)
  254. container, err := clnt.getContainer(containerID)
  255. if err != nil {
  256. return err
  257. }
  258. if container.systemPid == 0 {
  259. return fmt.Errorf("No active process for container %s", containerID)
  260. }
  261. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  262. Id: containerID,
  263. Pid: InitFriendlyName,
  264. Resources: (*containerd.UpdateResource)(&resources),
  265. })
  266. if err != nil {
  267. return err
  268. }
  269. return nil
  270. }
  271. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  272. clnt.mapMutex.RLock()
  273. defer clnt.mapMutex.RUnlock()
  274. return clnt.exitNotifiers[containerID]
  275. }
  276. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  277. clnt.mapMutex.Lock()
  278. w, ok := clnt.exitNotifiers[containerID]
  279. defer clnt.mapMutex.Unlock()
  280. if !ok {
  281. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  282. clnt.exitNotifiers[containerID] = w
  283. }
  284. return w
  285. }
  // restore re-registers cont, a container containerd still reports, with this
  // client and reattaches its stdio through attachStdio. It runs under the
  // per-container lock.
  //
  // It fails if the container is already active in this client. For any error
  // after that check, the deferred handlers call deleteContainer and cancel the
  // fifo-open context.
  //
  // If lastEvent is non-nil and is a pause or resume event, it is replayed to
  // the backend after the StateRestore notification, and the result of that
  // replay is returned. Any other event type is only logged.
  func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
  	clnt.lock(cont.Id)
  	defer clnt.unlock(cont.Id)
  	logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)
  	containerID := cont.Id
  	if _, err := clnt.getContainer(containerID); err == nil {
  		return fmt.Errorf("container %s is already active", containerID)
  	}
  	// Registered after the "already active" check, so that early return does
  	// not reach this deleteContainer call.
  	defer func() {
  		if err != nil {
  			clnt.deleteContainer(cont.Id)
  		}
  	}()
  	container := clnt.newContainer(cont.BundlePath, options...)
  	container.systemPid = systemPid(cont)
  	// Use the terminal setting of the init process, if containerd lists one.
  	var terminal bool
  	for _, p := range cont.Processes {
  		if p.Pid == InitFriendlyName {
  			terminal = p.Terminal
  		}
  	}
  	// Cancelled only when restore fails (non-nil named err).
  	fifoCtx, cancel := context.WithCancel(context.Background())
  	defer func() {
  		if err != nil {
  			cancel()
  		}
  	}()
  	iopipe, err := container.openFifos(fifoCtx, terminal)
  	if err != nil {
  		return err
  	}
  	// Make closing stdin idempotent: only the first Close reaches the fifo.
  	// Unlike AddProcess, no close-stdin request is sent to containerd here.
  	var stdinOnce sync.Once
  	stdin := iopipe.Stdin
  	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
  		var err error
  		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
  			err = stdin.Close()
  		})
  		return err
  	})
  	if err := attachStdio(*iopipe); err != nil {
  		container.closeFifos(iopipe)
  		return err
  	}
  	clnt.appendContainer(container)
  	err = clnt.backend.StateChanged(containerID, StateInfo{
  		CommonStateInfo: CommonStateInfo{
  			State: StateRestore,
  			Pid:   container.systemPid,
  		}})
  	if err != nil {
  		container.closeFifos(iopipe)
  		return err
  	}
  	if lastEvent != nil {
  		// This should only be a pause or resume event
  		if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
  			return clnt.backend.StateChanged(containerID, StateInfo{
  				CommonStateInfo: CommonStateInfo{
  					State: lastEvent.Type,
  					Pid:   container.systemPid,
  				}})
  		}
  		logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
  	}
  	return nil
  }
  353. func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
  354. er := &containerd.EventsRequest{
  355. Timestamp: tsp,
  356. StoredOnly: true,
  357. Id: id,
  358. }
  359. events, err := clnt.remote.apiClient.Events(context.Background(), er)
  360. if err != nil {
  361. logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
  362. return nil, err
  363. }
  364. var ev *containerd.Event
  365. for {
  366. e, err := events.Recv()
  367. if err != nil {
  368. if err.Error() == "EOF" {
  369. break
  370. }
  371. logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
  372. return nil, err
  373. }
  374. ev = e
  375. logrus.Debugf("libcontainerd: received past event %#v", ev)
  376. }
  377. return ev, nil
  378. }
  379. func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
  380. ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
  381. if err == nil && ev == nil {
  382. // If ev is nil and the container is running in containerd,
  383. // we already consumed all the event of the
  384. // container, included the "exit" one.
  385. // Thus, we request all events containerd has in memory for
  386. // this container in order to get the last one (which should
  387. // be an exit event)
  388. logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
  389. // Request all events since beginning of time
  390. t := time.Unix(0, 0)
  391. tsp, err := ptypes.TimestampProto(t)
  392. if err != nil {
  393. logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
  394. return nil, err
  395. }
  396. return clnt.getContainerLastEventSinceTime(id, tsp)
  397. }
  398. return ev, err
  399. }
  400. func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
  401. // Synchronize with live events
  402. clnt.remote.Lock()
  403. defer clnt.remote.Unlock()
  404. // Check that containerd still knows this container.
  405. //
  406. // In the unlikely event that Restore for this container process
  407. // the its past event before the main loop, the event will be
  408. // processed twice. However, this is not an issue as all those
  409. // events will do is change the state of the container to be
  410. // exactly the same.
  411. cont, err := clnt.getContainerdContainer(containerID)
  412. // Get its last event
  413. ev, eerr := clnt.getContainerLastEvent(containerID)
  414. if err != nil || containerd_runtime_types.State(cont.Status) == containerd_runtime_types.Stopped {
  415. if err != nil {
  416. logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err)
  417. }
  418. if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) {
  419. // Wait a while for the exit event
  420. timeout := time.NewTimer(10 * time.Second)
  421. tick := time.NewTicker(100 * time.Millisecond)
  422. stop:
  423. for {
  424. select {
  425. case <-timeout.C:
  426. break stop
  427. case <-tick.C:
  428. ev, eerr = clnt.getContainerLastEvent(containerID)
  429. if eerr != nil {
  430. break stop
  431. }
  432. if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
  433. break stop
  434. }
  435. }
  436. }
  437. timeout.Stop()
  438. tick.Stop()
  439. }
  440. // get the exit status for this container, if we don't have
  441. // one, indicate an error
  442. ec := uint32(255)
  443. if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
  444. ec = ev.Status
  445. }
  446. clnt.setExited(containerID, ec)
  447. return nil
  448. }
  449. // container is still alive
  450. if clnt.liveRestore {
  451. if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
  452. logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
  453. }
  454. return nil
  455. }
  456. // Kill the container if liveRestore == false
  457. w := clnt.getOrCreateExitNotifier(containerID)
  458. clnt.lock(cont.Id)
  459. container := clnt.newContainer(cont.BundlePath)
  460. container.systemPid = systemPid(cont)
  461. clnt.appendContainer(container)
  462. clnt.unlock(cont.Id)
  463. container.discardFifos()
  464. if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
  465. logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
  466. }
  467. // Let the main loop handle the exit event
  468. clnt.remote.Unlock()
  469. if ev != nil && ev.Type == StatePause {
  470. // resume container, it depends on the main loop, so we do it after Unlock()
  471. logrus.Debugf("libcontainerd: %s was paused, resuming it so it can die", containerID)
  472. if err := clnt.Resume(containerID); err != nil {
  473. return fmt.Errorf("failed to resume container: %v", err)
  474. }
  475. }
  476. select {
  477. case <-time.After(10 * time.Second):
  478. if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
  479. logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
  480. }
  481. select {
  482. case <-time.After(2 * time.Second):
  483. case <-w.wait():
  484. // relock because of the defer
  485. clnt.remote.Lock()
  486. return nil
  487. }
  488. case <-w.wait():
  489. // relock because of the defer
  490. clnt.remote.Lock()
  491. return nil
  492. }
  493. // relock because of the defer
  494. clnt.remote.Lock()
  495. clnt.deleteContainer(containerID)
  496. return clnt.setExited(containerID, uint32(255))
  497. }
  498. func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
  499. clnt.lock(containerID)
  500. defer clnt.unlock(containerID)
  501. if _, err := clnt.getContainer(containerID); err != nil {
  502. return err
  503. }
  504. _, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
  505. Id: containerID,
  506. Checkpoint: &containerd.Checkpoint{
  507. Name: checkpointID,
  508. Exit: exit,
  509. Tcp: true,
  510. UnixSockets: true,
  511. Shell: false,
  512. EmptyNS: []string{"network"},
  513. },
  514. CheckpointDir: checkpointDir,
  515. })
  516. return err
  517. }
  518. func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
  519. clnt.lock(containerID)
  520. defer clnt.unlock(containerID)
  521. if _, err := clnt.getContainer(containerID); err != nil {
  522. return err
  523. }
  524. _, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
  525. Id: containerID,
  526. Name: checkpointID,
  527. CheckpointDir: checkpointDir,
  528. })
  529. return err
  530. }
  531. func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
  532. clnt.lock(containerID)
  533. defer clnt.unlock(containerID)
  534. if _, err := clnt.getContainer(containerID); err != nil {
  535. return nil, err
  536. }
  537. resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
  538. Id: containerID,
  539. CheckpointDir: checkpointDir,
  540. })
  541. if err != nil {
  542. return nil, err
  543. }
  544. return (*Checkpoints)(resp), nil
  545. }