client_linux.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605
  1. package libcontainerd
  import (
  	"fmt"
  	"io"
  	"os"
  	"strings"
  	"sync"
  	"syscall"
  	"time"

  	"github.com/Sirupsen/logrus"
  	containerd "github.com/docker/containerd/api/grpc/types"
  	"github.com/docker/docker/pkg/ioutils"
  	"github.com/docker/docker/pkg/mount"
  	"github.com/golang/protobuf/ptypes"
  	"github.com/golang/protobuf/ptypes/timestamp"
  	specs "github.com/opencontainers/runtime-spec/specs-go"
  	"golang.org/x/net/context"
  )
  18. type client struct {
  19. clientCommon
  20. // Platform specific properties below here.
  21. remote *remote
  22. q queue
  23. exitNotifiers map[string]*exitNotifier
  24. liveRestore bool
  25. }
  26. // GetServerVersion returns the connected server version information
  27. func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
  28. resp, err := clnt.remote.apiClient.GetServerVersion(ctx, &containerd.GetServerVersionRequest{})
  29. if err != nil {
  30. return nil, err
  31. }
  32. sv := &ServerVersion{
  33. GetServerVersionResponse: *resp,
  34. }
  35. return sv, nil
  36. }
  37. // AddProcess is the handler for adding a process to an already running
  38. // container. It's called through docker exec. It returns the system pid of the
  39. // exec'd process.
  40. func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (pid int, err error) {
  41. clnt.lock(containerID)
  42. defer clnt.unlock(containerID)
  43. container, err := clnt.getContainer(containerID)
  44. if err != nil {
  45. return -1, err
  46. }
  47. spec, err := container.spec()
  48. if err != nil {
  49. return -1, err
  50. }
  51. sp := spec.Process
  52. sp.Args = specp.Args
  53. sp.Terminal = specp.Terminal
  54. if len(specp.Env) > 0 {
  55. sp.Env = specp.Env
  56. }
  57. if specp.Cwd != nil {
  58. sp.Cwd = *specp.Cwd
  59. }
  60. if specp.User != nil {
  61. sp.User = specs.User{
  62. UID: specp.User.UID,
  63. GID: specp.User.GID,
  64. AdditionalGids: specp.User.AdditionalGids,
  65. }
  66. }
  67. if specp.Capabilities != nil {
  68. sp.Capabilities = specp.Capabilities
  69. }
  70. p := container.newProcess(processFriendlyName)
  71. r := &containerd.AddProcessRequest{
  72. Args: sp.Args,
  73. Cwd: sp.Cwd,
  74. Terminal: sp.Terminal,
  75. Id: containerID,
  76. Env: sp.Env,
  77. User: &containerd.User{
  78. Uid: sp.User.UID,
  79. Gid: sp.User.GID,
  80. AdditionalGids: sp.User.AdditionalGids,
  81. },
  82. Pid: processFriendlyName,
  83. Stdin: p.fifo(syscall.Stdin),
  84. Stdout: p.fifo(syscall.Stdout),
  85. Stderr: p.fifo(syscall.Stderr),
  86. Capabilities: sp.Capabilities,
  87. ApparmorProfile: sp.ApparmorProfile,
  88. SelinuxLabel: sp.SelinuxLabel,
  89. NoNewPrivileges: sp.NoNewPrivileges,
  90. Rlimits: convertRlimits(sp.Rlimits),
  91. }
  92. fifoCtx, cancel := context.WithCancel(context.Background())
  93. defer func() {
  94. if err != nil {
  95. cancel()
  96. }
  97. }()
  98. iopipe, err := p.openFifos(fifoCtx, sp.Terminal)
  99. if err != nil {
  100. return -1, err
  101. }
  102. resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
  103. if err != nil {
  104. p.closeFifos(iopipe)
  105. return -1, err
  106. }
  107. var stdinOnce sync.Once
  108. stdin := iopipe.Stdin
  109. iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
  110. var err error
  111. stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
  112. err = stdin.Close()
  113. if err2 := p.sendCloseStdin(); err == nil {
  114. err = err2
  115. }
  116. })
  117. return err
  118. })
  119. container.processes[processFriendlyName] = p
  120. if err := attachStdio(*iopipe); err != nil {
  121. p.closeFifos(iopipe)
  122. return -1, err
  123. }
  124. return int(resp.SystemPid), nil
  125. }
  126. func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
  127. clnt.lock(containerID)
  128. defer clnt.unlock(containerID)
  129. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  130. Id: containerID,
  131. Pid: pid,
  132. Signal: uint32(sig),
  133. })
  134. return err
  135. }
  136. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  137. clnt.lock(containerID)
  138. defer clnt.unlock(containerID)
  139. if _, err := clnt.getContainer(containerID); err != nil {
  140. return err
  141. }
  142. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  143. Id: containerID,
  144. Pid: processFriendlyName,
  145. Width: uint32(width),
  146. Height: uint32(height),
  147. })
  148. return err
  149. }
  150. func (clnt *client) Pause(containerID string) error {
  151. return clnt.setState(containerID, StatePause)
  152. }
  153. func (clnt *client) setState(containerID, state string) error {
  154. clnt.lock(containerID)
  155. container, err := clnt.getContainer(containerID)
  156. if err != nil {
  157. clnt.unlock(containerID)
  158. return err
  159. }
  160. if container.systemPid == 0 {
  161. clnt.unlock(containerID)
  162. return fmt.Errorf("No active process for container %s", containerID)
  163. }
  164. st := "running"
  165. if state == StatePause {
  166. st = "paused"
  167. }
  168. chstate := make(chan struct{})
  169. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  170. Id: containerID,
  171. Pid: InitFriendlyName,
  172. Status: st,
  173. })
  174. if err != nil {
  175. clnt.unlock(containerID)
  176. return err
  177. }
  178. container.pauseMonitor.append(state, chstate)
  179. clnt.unlock(containerID)
  180. <-chstate
  181. return nil
  182. }
  183. func (clnt *client) Resume(containerID string) error {
  184. return clnt.setState(containerID, StateResume)
  185. }
  186. func (clnt *client) Stats(containerID string) (*Stats, error) {
  187. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  188. if err != nil {
  189. return nil, err
  190. }
  191. return (*Stats)(resp), nil
  192. }
  193. // Take care of the old 1.11.0 behavior in case the version upgrade
  194. // happened without a clean daemon shutdown
  195. func (clnt *client) cleanupOldRootfs(containerID string) {
  196. // Unmount and delete the bundle folder
  197. if mts, err := mount.GetMounts(); err == nil {
  198. for _, mts := range mts {
  199. if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
  200. if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
  201. os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
  202. }
  203. break
  204. }
  205. }
  206. }
  207. }
  208. func (clnt *client) setExited(containerID string, exitCode uint32) error {
  209. clnt.lock(containerID)
  210. defer clnt.unlock(containerID)
  211. err := clnt.backend.StateChanged(containerID, StateInfo{
  212. CommonStateInfo: CommonStateInfo{
  213. State: StateExit,
  214. ExitCode: exitCode,
  215. }})
  216. clnt.cleanupOldRootfs(containerID)
  217. return err
  218. }
  219. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  220. cont, err := clnt.getContainerdContainer(containerID)
  221. if err != nil {
  222. return nil, err
  223. }
  224. pids := make([]int, len(cont.Pids))
  225. for i, p := range cont.Pids {
  226. pids[i] = int(p)
  227. }
  228. return pids, nil
  229. }
  230. // Summary returns a summary of the processes running in a container.
  231. // This is a no-op on Linux.
  232. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  233. return nil, nil
  234. }
  235. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  236. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  237. if err != nil {
  238. return nil, err
  239. }
  240. for _, cont := range resp.Containers {
  241. if cont.Id == containerID {
  242. return cont, nil
  243. }
  244. }
  245. return nil, fmt.Errorf("invalid state response")
  246. }
  247. func (clnt *client) UpdateResources(containerID string, resources Resources) error {
  248. clnt.lock(containerID)
  249. defer clnt.unlock(containerID)
  250. container, err := clnt.getContainer(containerID)
  251. if err != nil {
  252. return err
  253. }
  254. if container.systemPid == 0 {
  255. return fmt.Errorf("No active process for container %s", containerID)
  256. }
  257. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  258. Id: containerID,
  259. Pid: InitFriendlyName,
  260. Resources: (*containerd.UpdateResource)(&resources),
  261. })
  262. if err != nil {
  263. return err
  264. }
  265. return nil
  266. }
  267. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  268. clnt.mapMutex.RLock()
  269. defer clnt.mapMutex.RUnlock()
  270. return clnt.exitNotifiers[containerID]
  271. }
  272. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  273. clnt.mapMutex.Lock()
  274. w, ok := clnt.exitNotifiers[containerID]
  275. defer clnt.mapMutex.Unlock()
  276. if !ok {
  277. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  278. clnt.exitNotifiers[containerID] = w
  279. }
  280. return w
  281. }
  282. func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
  283. clnt.lock(cont.Id)
  284. defer clnt.unlock(cont.Id)
  285. logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)
  286. containerID := cont.Id
  287. if _, err := clnt.getContainer(containerID); err == nil {
  288. return fmt.Errorf("container %s is already active", containerID)
  289. }
  290. defer func() {
  291. if err != nil {
  292. clnt.deleteContainer(cont.Id)
  293. }
  294. }()
  295. container := clnt.newContainer(cont.BundlePath, options...)
  296. container.systemPid = systemPid(cont)
  297. var terminal bool
  298. for _, p := range cont.Processes {
  299. if p.Pid == InitFriendlyName {
  300. terminal = p.Terminal
  301. }
  302. }
  303. fifoCtx, cancel := context.WithCancel(context.Background())
  304. defer func() {
  305. if err != nil {
  306. cancel()
  307. }
  308. }()
  309. iopipe, err := container.openFifos(fifoCtx, terminal)
  310. if err != nil {
  311. return err
  312. }
  313. var stdinOnce sync.Once
  314. stdin := iopipe.Stdin
  315. iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
  316. var err error
  317. stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
  318. err = stdin.Close()
  319. })
  320. return err
  321. })
  322. if err := attachStdio(*iopipe); err != nil {
  323. container.closeFifos(iopipe)
  324. return err
  325. }
  326. clnt.appendContainer(container)
  327. err = clnt.backend.StateChanged(containerID, StateInfo{
  328. CommonStateInfo: CommonStateInfo{
  329. State: StateRestore,
  330. Pid: container.systemPid,
  331. }})
  332. if err != nil {
  333. container.closeFifos(iopipe)
  334. return err
  335. }
  336. if lastEvent != nil {
  337. // This should only be a pause or resume event
  338. if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
  339. return clnt.backend.StateChanged(containerID, StateInfo{
  340. CommonStateInfo: CommonStateInfo{
  341. State: lastEvent.Type,
  342. Pid: container.systemPid,
  343. }})
  344. }
  345. logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
  346. }
  347. return nil
  348. }
  349. func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
  350. er := &containerd.EventsRequest{
  351. Timestamp: tsp,
  352. StoredOnly: true,
  353. Id: id,
  354. }
  355. events, err := clnt.remote.apiClient.Events(context.Background(), er)
  356. if err != nil {
  357. logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
  358. return nil, err
  359. }
  360. var ev *containerd.Event
  361. for {
  362. e, err := events.Recv()
  363. if err != nil {
  364. if err.Error() == "EOF" {
  365. break
  366. }
  367. logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
  368. return nil, err
  369. }
  370. ev = e
  371. logrus.Debugf("libcontainerd: received past event %#v", ev)
  372. }
  373. return ev, nil
  374. }
  375. func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
  376. ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
  377. if err == nil && ev == nil {
  378. // If ev is nil and the container is running in containerd,
  379. // we already consumed all the event of the
  380. // container, included the "exit" one.
  381. // Thus, we request all events containerd has in memory for
  382. // this container in order to get the last one (which should
  383. // be an exit event)
  384. logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
  385. // Request all events since beginning of time
  386. t := time.Unix(0, 0)
  387. tsp, err := ptypes.TimestampProto(t)
  388. if err != nil {
  389. logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
  390. return nil, err
  391. }
  392. return clnt.getContainerLastEventSinceTime(id, tsp)
  393. }
  394. return ev, err
  395. }
  396. func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
  397. // Synchronize with live events
  398. clnt.remote.Lock()
  399. defer clnt.remote.Unlock()
  400. // Check that containerd still knows this container.
  401. //
  402. // In the unlikely event that Restore for this container process
  403. // the its past event before the main loop, the event will be
  404. // processed twice. However, this is not an issue as all those
  405. // events will do is change the state of the container to be
  406. // exactly the same.
  407. cont, err := clnt.getContainerdContainer(containerID)
  408. // Get its last event
  409. ev, eerr := clnt.getContainerLastEvent(containerID)
  410. if err != nil || cont.Status == "Stopped" {
  411. if err != nil {
  412. logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err)
  413. }
  414. if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) {
  415. // Wait a while for the exit event
  416. timeout := time.NewTimer(10 * time.Second)
  417. tick := time.NewTicker(100 * time.Millisecond)
  418. stop:
  419. for {
  420. select {
  421. case <-timeout.C:
  422. break stop
  423. case <-tick.C:
  424. ev, eerr = clnt.getContainerLastEvent(containerID)
  425. if eerr != nil {
  426. break stop
  427. }
  428. if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
  429. break stop
  430. }
  431. }
  432. }
  433. timeout.Stop()
  434. tick.Stop()
  435. }
  436. // get the exit status for this container, if we don't have
  437. // one, indicate an error
  438. ec := uint32(255)
  439. if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
  440. ec = ev.Status
  441. }
  442. clnt.setExited(containerID, ec)
  443. return nil
  444. }
  445. // container is still alive
  446. if clnt.liveRestore {
  447. if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
  448. logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
  449. }
  450. return nil
  451. }
  452. // Kill the container if liveRestore == false
  453. w := clnt.getOrCreateExitNotifier(containerID)
  454. clnt.lock(cont.Id)
  455. container := clnt.newContainer(cont.BundlePath)
  456. container.systemPid = systemPid(cont)
  457. clnt.appendContainer(container)
  458. clnt.unlock(cont.Id)
  459. container.discardFifos()
  460. if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
  461. logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
  462. }
  463. // Let the main loop handle the exit event
  464. clnt.remote.Unlock()
  465. select {
  466. case <-time.After(10 * time.Second):
  467. if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
  468. logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
  469. }
  470. select {
  471. case <-time.After(2 * time.Second):
  472. case <-w.wait():
  473. // relock because of the defer
  474. clnt.remote.Lock()
  475. return nil
  476. }
  477. case <-w.wait():
  478. // relock because of the defer
  479. clnt.remote.Lock()
  480. return nil
  481. }
  482. // relock because of the defer
  483. clnt.remote.Lock()
  484. clnt.deleteContainer(containerID)
  485. return clnt.setExited(containerID, uint32(255))
  486. }
  487. func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
  488. clnt.lock(containerID)
  489. defer clnt.unlock(containerID)
  490. if _, err := clnt.getContainer(containerID); err != nil {
  491. return err
  492. }
  493. _, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
  494. Id: containerID,
  495. Checkpoint: &containerd.Checkpoint{
  496. Name: checkpointID,
  497. Exit: exit,
  498. Tcp: true,
  499. UnixSockets: true,
  500. Shell: false,
  501. EmptyNS: []string{"network"},
  502. },
  503. CheckpointDir: checkpointDir,
  504. })
  505. return err
  506. }
  507. func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
  508. clnt.lock(containerID)
  509. defer clnt.unlock(containerID)
  510. if _, err := clnt.getContainer(containerID); err != nil {
  511. return err
  512. }
  513. _, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
  514. Id: containerID,
  515. Name: checkpointID,
  516. CheckpointDir: checkpointDir,
  517. })
  518. return err
  519. }
  520. func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
  521. clnt.lock(containerID)
  522. defer clnt.unlock(containerID)
  523. if _, err := clnt.getContainer(containerID); err != nil {
  524. return nil, err
  525. }
  526. resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
  527. Id: containerID,
  528. CheckpointDir: checkpointDir,
  529. })
  530. if err != nil {
  531. return nil, err
  532. }
  533. return (*Checkpoints)(resp), nil
  534. }