client_linux.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591
  1. package libcontainerd
  2. import (
  3. "fmt"
  4. "os"
  5. "strings"
  6. "sync"
  7. "syscall"
  8. "time"
  9. "github.com/Sirupsen/logrus"
  10. containerd "github.com/docker/containerd/api/grpc/types"
  11. "github.com/docker/docker/pkg/ioutils"
  12. "github.com/docker/docker/pkg/mount"
  13. "github.com/golang/protobuf/ptypes"
  14. "github.com/golang/protobuf/ptypes/timestamp"
  15. specs "github.com/opencontainers/runtime-spec/specs-go"
  16. "golang.org/x/net/context"
  17. )
  // client is the Linux implementation of the libcontainerd client. All
// container operations are forwarded to containerd through remote.apiClient.
type client struct {
	clientCommon
	// Platform specific properties below here.

	// remote is the containerd connection: apiClient is used for every RPC,
	// its lock is taken by Restore to synchronize with live events, and
	// restoreFromTimestamp bounds the event replay done during restore.
	remote *remote
	// q is not referenced in this part of the file.
	// NOTE(review): presumably a work queue used by event handling elsewhere — confirm.
	q queue
	// exitNotifiers maps a container ID to the notifier Restore waits on for
	// that container's exit. Guarded by clientCommon's mapMutex.
	exitNotifiers map[string]*exitNotifier
	// liveRestore decides what Restore does with a container that containerd
	// still reports as alive: re-attach to it (true) or signal it to stop (false).
	liveRestore bool
}
  26. // GetServerVersion returns the connected server version information
  27. func (clnt *client) GetServerVersion(ctx context.Context) (*ServerVersion, error) {
  28. resp, err := clnt.remote.apiClient.GetServerVersion(ctx, &containerd.GetServerVersionRequest{})
  29. if err != nil {
  30. return nil, err
  31. }
  32. sv := &ServerVersion{
  33. GetServerVersionResponse: *resp,
  34. }
  35. return sv, nil
  36. }
  // AddProcess is the handler for adding a process to an already running
// container. It's called through docker exec. It returns the system pid of the
// exec'd process, or -1 together with an error.
//
// The new process spec starts as a copy of the container's spec.Process.
// Args and Terminal always come from specp. Env, Cwd, User and Capabilities
// replace the container's values only when specp sets them (non-empty Env,
// non-nil Cwd/User/Capabilities). On success the process is registered in
// container.processes under processFriendlyName and its stdio fifos are
// passed to attachStdio. The container lock is held for the whole call.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process, attachStdio StdioCallback) (int, error) {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)
	container, err := clnt.getContainer(containerID)
	if err != nil {
		return -1, err
	}

	spec, err := container.spec()
	if err != nil {
		return -1, err
	}
	// Overlay the exec options on top of the container's own process spec.
	sp := spec.Process
	sp.Args = specp.Args
	sp.Terminal = specp.Terminal
	if len(specp.Env) > 0 {
		sp.Env = specp.Env
	}
	if specp.Cwd != nil {
		sp.Cwd = *specp.Cwd
	}
	if specp.User != nil {
		sp.User = specs.User{
			UID:            specp.User.UID,
			GID:            specp.User.GID,
			AdditionalGids: specp.User.AdditionalGids,
		}
	}
	if specp.Capabilities != nil {
		sp.Capabilities = specp.Capabilities
	}

	p := container.newProcess(processFriendlyName)

	// The request names the per-process fifo paths for stdin/stdout/stderr.
	r := &containerd.AddProcessRequest{
		Args:     sp.Args,
		Cwd:      sp.Cwd,
		Terminal: sp.Terminal,
		Id:       containerID,
		Env:      sp.Env,
		User: &containerd.User{
			Uid:            sp.User.UID,
			Gid:            sp.User.GID,
			AdditionalGids: sp.User.AdditionalGids,
		},
		Pid:             processFriendlyName,
		Stdin:           p.fifo(syscall.Stdin),
		Stdout:          p.fifo(syscall.Stdout),
		Stderr:          p.fifo(syscall.Stderr),
		Capabilities:    sp.Capabilities,
		ApparmorProfile: sp.ApparmorProfile,
		SelinuxLabel:    sp.SelinuxLabel,
		NoNewPrivileges: sp.NoNewPrivileges,
		Rlimits:         convertRlimits(sp.Rlimits),
	}

	// The fifos are opened before the AddProcess RPC is issued.
	// NOTE(review): presumably containerd opens the other ends — confirm.
	iopipe, err := p.openFifos(sp.Terminal)
	if err != nil {
		return -1, err
	}

	resp, err := clnt.remote.apiClient.AddProcess(ctx, r)
	if err != nil {
		p.closeFifos(iopipe)
		return -1, err
	}

	// Make closing stdin idempotent. The first Close also calls
	// sendCloseStdin; stdin.Close's error takes precedence over
	// sendCloseStdin's, and later Close calls return nil.
	var stdinOnce sync.Once
	stdin := iopipe.Stdin
	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
		var err error
		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
			err = stdin.Close()
			if err2 := p.sendCloseStdin(); err == nil {
				err = err2
			}
		})
		return err
	})

	container.processes[processFriendlyName] = p

	// NOTE(review): if attach fails, the process has already been started by
	// containerd and stays registered in container.processes; only the fifos
	// are closed here.
	if err := attachStdio(*iopipe); err != nil {
		p.closeFifos(iopipe)
		return -1, err
	}

	return int(resp.SystemPid), nil
}
  120. func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
  121. clnt.lock(containerID)
  122. defer clnt.unlock(containerID)
  123. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  124. Id: containerID,
  125. Pid: pid,
  126. Signal: uint32(sig),
  127. })
  128. return err
  129. }
  130. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  131. clnt.lock(containerID)
  132. defer clnt.unlock(containerID)
  133. if _, err := clnt.getContainer(containerID); err != nil {
  134. return err
  135. }
  136. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  137. Id: containerID,
  138. Pid: processFriendlyName,
  139. Width: uint32(width),
  140. Height: uint32(height),
  141. })
  142. return err
  143. }
  144. func (clnt *client) Pause(containerID string) error {
  145. return clnt.setState(containerID, StatePause)
  146. }
  // setState asks containerd to set the status of container containerID's init
// process to "paused" (when state == StatePause) or "running" (any other
// state). It then registers chstate with the container's pauseMonitor and
// blocks until chstate is signalled.
// NOTE(review): presumably the pause monitor signals chstate once the matching
// event has been handled — confirm.
//
// The container lock is released by hand on every path instead of being
// deferred, so it is NOT held while waiting on chstate.
// NOTE(review): there is no timeout; if chstate is never signalled this blocks forever.
func (clnt *client) setState(containerID, state string) error {
	clnt.lock(containerID)
	container, err := clnt.getContainer(containerID)
	if err != nil {
		clnt.unlock(containerID)
		return err
	}
	if container.systemPid == 0 {
		clnt.unlock(containerID)
		return fmt.Errorf("No active process for container %s", containerID)
	}
	st := "running"
	if state == StatePause {
		st = "paused"
	}
	chstate := make(chan struct{})
	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
		Id:     containerID,
		Pid:    InitFriendlyName,
		Status: st,
	})
	if err != nil {
		clnt.unlock(containerID)
		return err
	}
	container.pauseMonitor.append(state, chstate)
	clnt.unlock(containerID)
	// Wait outside the lock.
	<-chstate
	return nil
}
  177. func (clnt *client) Resume(containerID string) error {
  178. return clnt.setState(containerID, StateResume)
  179. }
  180. func (clnt *client) Stats(containerID string) (*Stats, error) {
  181. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  182. if err != nil {
  183. return nil, err
  184. }
  185. return (*Stats)(resp), nil
  186. }
  187. // Take care of the old 1.11.0 behavior in case the version upgrade
  188. // happened without a clean daemon shutdown
  189. func (clnt *client) cleanupOldRootfs(containerID string) {
  190. // Unmount and delete the bundle folder
  191. if mts, err := mount.GetMounts(); err == nil {
  192. for _, mts := range mts {
  193. if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
  194. if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
  195. os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
  196. }
  197. break
  198. }
  199. }
  200. }
  201. }
  202. func (clnt *client) setExited(containerID string, exitCode uint32) error {
  203. clnt.lock(containerID)
  204. defer clnt.unlock(containerID)
  205. err := clnt.backend.StateChanged(containerID, StateInfo{
  206. CommonStateInfo: CommonStateInfo{
  207. State: StateExit,
  208. ExitCode: exitCode,
  209. }})
  210. clnt.cleanupOldRootfs(containerID)
  211. return err
  212. }
  213. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  214. cont, err := clnt.getContainerdContainer(containerID)
  215. if err != nil {
  216. return nil, err
  217. }
  218. pids := make([]int, len(cont.Pids))
  219. for i, p := range cont.Pids {
  220. pids[i] = int(p)
  221. }
  222. return pids, nil
  223. }
  224. // Summary returns a summary of the processes running in a container.
  225. // This is a no-op on Linux.
  226. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  227. return nil, nil
  228. }
  229. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  230. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  231. if err != nil {
  232. return nil, err
  233. }
  234. for _, cont := range resp.Containers {
  235. if cont.Id == containerID {
  236. return cont, nil
  237. }
  238. }
  239. return nil, fmt.Errorf("invalid state response")
  240. }
  241. func (clnt *client) UpdateResources(containerID string, resources Resources) error {
  242. clnt.lock(containerID)
  243. defer clnt.unlock(containerID)
  244. container, err := clnt.getContainer(containerID)
  245. if err != nil {
  246. return err
  247. }
  248. if container.systemPid == 0 {
  249. return fmt.Errorf("No active process for container %s", containerID)
  250. }
  251. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  252. Id: containerID,
  253. Pid: InitFriendlyName,
  254. Resources: (*containerd.UpdateResource)(&resources),
  255. })
  256. if err != nil {
  257. return err
  258. }
  259. return nil
  260. }
  261. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  262. clnt.mapMutex.RLock()
  263. defer clnt.mapMutex.RUnlock()
  264. return clnt.exitNotifiers[containerID]
  265. }
  266. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  267. clnt.mapMutex.Lock()
  268. w, ok := clnt.exitNotifiers[containerID]
  269. defer clnt.mapMutex.Unlock()
  270. if !ok {
  271. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  272. clnt.exitNotifiers[containerID] = w
  273. }
  274. return w
  275. }
  // restore re-attaches to container cont. Restore uses it for a container that
// containerd still reports as alive when live restore is enabled.
//
// Steps:
//   - Recreate the local container object.
//   - Reopen its stdio fifos, with a terminal if the init process had one.
//   - Pass the fifos to attachStdio and register the container.
//   - Report StateRestore to the backend.
//
// If lastEvent is a pause or resume event, that state is reported to the
// backend as well. Any other non-nil lastEvent is only logged.
//
// err is a named result so the deferred cleanup can see the final error. Any
// failure after the "already active" check removes the container via
// deleteContainer. That defer runs before the deferred unlock, i.e. while the
// container lock is still held.
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, attachStdio StdioCallback, options ...CreateOption) (err error) {
	clnt.lock(cont.Id)
	defer clnt.unlock(cont.Id)

	logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)

	containerID := cont.Id
	// Refuse to restore over a container that is already tracked; the cleanup
	// defer is not registered yet, so the existing entry is left alone.
	if _, err := clnt.getContainer(containerID); err == nil {
		return fmt.Errorf("container %s is already active", containerID)
	}

	defer func() {
		if err != nil {
			clnt.deleteContainer(cont.Id)
		}
	}()

	container := clnt.newContainer(cont.BundlePath, options...)
	container.systemPid = systemPid(cont)

	// The init process decides whether the fifos are opened for a terminal.
	var terminal bool
	for _, p := range cont.Processes {
		if p.Pid == InitFriendlyName {
			terminal = p.Terminal
		}
	}

	iopipe, err := container.openFifos(terminal)
	if err != nil {
		return err
	}
	// Make closing stdin idempotent. Unlike AddProcess, this wrapper does not
	// call sendCloseStdin.
	var stdinOnce sync.Once
	stdin := iopipe.Stdin
	iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
		var err error
		stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
			err = stdin.Close()
		})
		return err
	})

	// `return err` below assigns the named result, so the deferred cleanup fires.
	if err := attachStdio(*iopipe); err != nil {
		container.closeFifos(iopipe)
		return err
	}

	clnt.appendContainer(container)

	err = clnt.backend.StateChanged(containerID, StateInfo{
		CommonStateInfo: CommonStateInfo{
			State: StateRestore,
			Pid:   container.systemPid,
		}})

	if err != nil {
		container.closeFifos(iopipe)
		return err
	}

	if lastEvent != nil {
		// This should only be a pause or resume event
		// NOTE(review): if this StateChanged fails, the container is deleted by
		// the defer but its fifos are not closed.
		if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
			return clnt.backend.StateChanged(containerID, StateInfo{
				CommonStateInfo: CommonStateInfo{
					State: lastEvent.Type,
					Pid:   container.systemPid,
				}})
		}

		logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
	}

	return nil
}
  337. func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
  338. er := &containerd.EventsRequest{
  339. Timestamp: tsp,
  340. StoredOnly: true,
  341. Id: id,
  342. }
  343. events, err := clnt.remote.apiClient.Events(context.Background(), er)
  344. if err != nil {
  345. logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
  346. return nil, err
  347. }
  348. var ev *containerd.Event
  349. for {
  350. e, err := events.Recv()
  351. if err != nil {
  352. if err.Error() == "EOF" {
  353. break
  354. }
  355. logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
  356. return nil, err
  357. }
  358. ev = e
  359. logrus.Debugf("libcontainerd: received past event %#v", ev)
  360. }
  361. return ev, nil
  362. }
  363. func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
  364. ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
  365. if err == nil && ev == nil {
  366. // If ev is nil and the container is running in containerd,
  367. // we already consumed all the event of the
  368. // container, included the "exit" one.
  369. // Thus, we request all events containerd has in memory for
  370. // this container in order to get the last one (which should
  371. // be an exit event)
  372. logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
  373. // Request all events since beginning of time
  374. t := time.Unix(0, 0)
  375. tsp, err := ptypes.TimestampProto(t)
  376. if err != nil {
  377. logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
  378. return nil, err
  379. }
  380. return clnt.getContainerLastEventSinceTime(id, tsp)
  381. }
  382. return ev, err
  383. }
  // Restore reconciles container containerID with what containerd reports. It
// holds clnt.remote's lock to synchronize with live event processing (except
// while waiting for an exit, see below). It looks the container up in
// containerd, fetches its last event, and then takes one of three paths:
//
//   - Lookup failed, or containerd reports "Stopped": if the last event is
//     not the init process's exit, poll every 100ms for up to 10s for that
//     event. Then mark the container exited with the event's status, or 255
//     if none was found. Returns nil.
//   - Still alive and liveRestore is set: re-attach via restore. Restore
//     errors are only logged. Returns nil.
//   - Still alive and liveRestore is unset: register the container, send
//     SIGTERM, and wait up to 10s for its exit notifier. If it has not fired,
//     send SIGKILL and wait up to 2s more. If an exit was observed, return nil.
//     Otherwise delete the container and mark it exited with status 255.
func (clnt *client) Restore(containerID string, attachStdio StdioCallback, options ...CreateOption) error {
	// Synchronize with live events
	clnt.remote.Lock()
	defer clnt.remote.Unlock()
	// Check that containerd still knows this container.
	//
	// In the unlikely event that Restore processes this container's past
	// events before the main loop does, those events will be processed
	// twice. However, this is not an issue as all those events will do is
	// change the state of the container to be exactly the same.
	cont, err := clnt.getContainerdContainer(containerID)
	// Get its last event
	ev, eerr := clnt.getContainerLastEvent(containerID)
	if err != nil || cont.Status == "Stopped" {
		if err != nil {
			logrus.Warnf("libcontainerd: failed to retrieve container %s state: %v", containerID, err)
		}
		if ev != nil && (ev.Pid != InitFriendlyName || ev.Type != StateExit) {
			// Wait a while for the exit event
			timeout := time.NewTimer(10 * time.Second)
			tick := time.NewTicker(100 * time.Millisecond)
		stop:
			for {
				select {
				case <-timeout.C:
					break stop
				case <-tick.C:
					ev, eerr = clnt.getContainerLastEvent(containerID)
					if eerr != nil {
						break stop
					}
					if ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
						break stop
					}
				}
			}
			timeout.Stop()
			tick.Stop()
		}

		// get the exit status for this container, if we don't have
		// one, indicate an error
		ec := uint32(255)
		if eerr == nil && ev != nil && ev.Pid == InitFriendlyName && ev.Type == StateExit {
			ec = ev.Status
		}
		// NOTE(review): setExited's error is discarded on this path.
		clnt.setExited(containerID, ec)

		return nil
	}

	// container is still alive
	if clnt.liveRestore {
		if err := clnt.restore(cont, ev, attachStdio, options...); err != nil {
			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
		}
		return nil
	}

	// Kill the container if liveRestore == false
	// The notifier is obtained before signalling so the exit cannot be missed.
	w := clnt.getOrCreateExitNotifier(containerID)
	clnt.lock(cont.Id)
	container := clnt.newContainer(cont.BundlePath)
	container.systemPid = systemPid(cont)
	clnt.appendContainer(container)
	clnt.unlock(cont.Id)

	container.discardFifos()

	if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
		logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
	}
	// Let the main loop handle the exit event
	// The remote lock is released while waiting and re-taken on every exit
	// path below, because the deferred Unlock expects it to be held.
	clnt.remote.Unlock()
	select {
	case <-time.After(10 * time.Second):
		if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
			logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
		}
		select {
		case <-time.After(2 * time.Second):
		case <-w.wait():
			// relock because of the defer
			clnt.remote.Lock()
			return nil
		}
	case <-w.wait():
		// relock because of the defer
		clnt.remote.Lock()
		return nil
	}
	// relock because of the defer
	clnt.remote.Lock()

	// No exit was observed: drop the container and report a failure exit code.
	clnt.deleteContainer(containerID)
	return clnt.setExited(containerID, uint32(255))
}
  475. func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
  476. clnt.lock(containerID)
  477. defer clnt.unlock(containerID)
  478. if _, err := clnt.getContainer(containerID); err != nil {
  479. return err
  480. }
  481. _, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
  482. Id: containerID,
  483. Checkpoint: &containerd.Checkpoint{
  484. Name: checkpointID,
  485. Exit: exit,
  486. Tcp: true,
  487. UnixSockets: true,
  488. Shell: false,
  489. EmptyNS: []string{"network"},
  490. },
  491. CheckpointDir: checkpointDir,
  492. })
  493. return err
  494. }
  495. func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
  496. clnt.lock(containerID)
  497. defer clnt.unlock(containerID)
  498. if _, err := clnt.getContainer(containerID); err != nil {
  499. return err
  500. }
  501. _, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
  502. Id: containerID,
  503. Name: checkpointID,
  504. CheckpointDir: checkpointDir,
  505. })
  506. return err
  507. }
  508. func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
  509. clnt.lock(containerID)
  510. defer clnt.unlock(containerID)
  511. if _, err := clnt.getContainer(containerID); err != nil {
  512. return nil, err
  513. }
  514. resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
  515. Id: containerID,
  516. CheckpointDir: checkpointDir,
  517. })
  518. if err != nil {
  519. return nil, err
  520. }
  521. return (*Checkpoints)(resp), nil
  522. }