client_linux.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677
  1. package libcontainerd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. containerd "github.com/docker/containerd/api/grpc/types"
  13. "github.com/docker/docker/pkg/idtools"
  14. "github.com/docker/docker/pkg/mount"
  15. "github.com/golang/protobuf/ptypes"
  16. "github.com/golang/protobuf/ptypes/timestamp"
  17. specs "github.com/opencontainers/runtime-spec/specs-go"
  18. "golang.org/x/net/context"
  19. )
  20. type client struct {
  21. clientCommon
  22. // Platform specific properties below here.
  23. remote *remote
  24. q queue
  25. exitNotifiers map[string]*exitNotifier
  26. liveRestore bool
  27. }
  28. func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
  29. clnt.lock(containerID)
  30. defer clnt.unlock(containerID)
  31. container, err := clnt.getContainer(containerID)
  32. if err != nil {
  33. return err
  34. }
  35. spec, err := container.spec()
  36. if err != nil {
  37. return err
  38. }
  39. sp := spec.Process
  40. sp.Args = specp.Args
  41. sp.Terminal = specp.Terminal
  42. if specp.Env != nil {
  43. sp.Env = specp.Env
  44. }
  45. if specp.Cwd != nil {
  46. sp.Cwd = *specp.Cwd
  47. }
  48. if specp.User != nil {
  49. sp.User = specs.User{
  50. UID: specp.User.UID,
  51. GID: specp.User.GID,
  52. AdditionalGids: specp.User.AdditionalGids,
  53. }
  54. }
  55. if specp.Capabilities != nil {
  56. sp.Capabilities = specp.Capabilities
  57. }
  58. p := container.newProcess(processFriendlyName)
  59. r := &containerd.AddProcessRequest{
  60. Args: sp.Args,
  61. Cwd: sp.Cwd,
  62. Terminal: sp.Terminal,
  63. Id: containerID,
  64. Env: sp.Env,
  65. User: &containerd.User{
  66. Uid: sp.User.UID,
  67. Gid: sp.User.GID,
  68. AdditionalGids: sp.User.AdditionalGids,
  69. },
  70. Pid: processFriendlyName,
  71. Stdin: p.fifo(syscall.Stdin),
  72. Stdout: p.fifo(syscall.Stdout),
  73. Stderr: p.fifo(syscall.Stderr),
  74. Capabilities: sp.Capabilities,
  75. ApparmorProfile: sp.ApparmorProfile,
  76. SelinuxLabel: sp.SelinuxLabel,
  77. NoNewPrivileges: sp.NoNewPrivileges,
  78. Rlimits: convertRlimits(sp.Rlimits),
  79. }
  80. iopipe, err := p.openFifos(sp.Terminal)
  81. if err != nil {
  82. return err
  83. }
  84. if _, err := clnt.remote.apiClient.AddProcess(ctx, r); err != nil {
  85. p.closeFifos(iopipe)
  86. return err
  87. }
  88. container.processes[processFriendlyName] = p
  89. clnt.unlock(containerID)
  90. if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
  91. clnt.lock(containerID)
  92. return err
  93. }
  94. clnt.lock(containerID)
  95. return nil
  96. }
  97. func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
  98. root, err := filepath.Abs(clnt.remote.stateDir)
  99. if err != nil {
  100. return "", err
  101. }
  102. if uid == 0 && gid == 0 {
  103. return root, nil
  104. }
  105. p := string(filepath.Separator)
  106. for _, d := range strings.Split(root, string(filepath.Separator))[1:] {
  107. p = filepath.Join(p, d)
  108. fi, err := os.Stat(p)
  109. if err != nil && !os.IsNotExist(err) {
  110. return "", err
  111. }
  112. if os.IsNotExist(err) || fi.Mode()&1 == 0 {
  113. p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
  114. if err := idtools.MkdirAs(p, 0700, uid, gid); err != nil && !os.IsExist(err) {
  115. return "", err
  116. }
  117. }
  118. }
  119. return p, nil
  120. }
  121. func (clnt *client) Create(containerID string, checkpoint string, checkpointDir string, spec specs.Spec, options ...CreateOption) (err error) {
  122. clnt.lock(containerID)
  123. defer clnt.unlock(containerID)
  124. if _, err := clnt.getContainer(containerID); err == nil {
  125. return fmt.Errorf("Container %s is already active", containerID)
  126. }
  127. uid, gid, err := getRootIDs(specs.Spec(spec))
  128. if err != nil {
  129. return err
  130. }
  131. dir, err := clnt.prepareBundleDir(uid, gid)
  132. if err != nil {
  133. return err
  134. }
  135. container := clnt.newContainer(filepath.Join(dir, containerID), options...)
  136. if err := container.clean(); err != nil {
  137. return err
  138. }
  139. defer func() {
  140. if err != nil {
  141. container.clean()
  142. clnt.deleteContainer(containerID)
  143. }
  144. }()
  145. if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) {
  146. return err
  147. }
  148. f, err := os.Create(filepath.Join(container.dir, configFilename))
  149. if err != nil {
  150. return err
  151. }
  152. defer f.Close()
  153. if err := json.NewEncoder(f).Encode(spec); err != nil {
  154. return err
  155. }
  156. return container.start(checkpoint, checkpointDir)
  157. }
  158. func (clnt *client) Signal(containerID string, sig int) error {
  159. clnt.lock(containerID)
  160. defer clnt.unlock(containerID)
  161. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  162. Id: containerID,
  163. Pid: InitFriendlyName,
  164. Signal: uint32(sig),
  165. })
  166. return err
  167. }
  168. func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
  169. clnt.lock(containerID)
  170. defer clnt.unlock(containerID)
  171. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  172. Id: containerID,
  173. Pid: pid,
  174. Signal: uint32(sig),
  175. })
  176. return err
  177. }
  178. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  179. clnt.lock(containerID)
  180. defer clnt.unlock(containerID)
  181. if _, err := clnt.getContainer(containerID); err != nil {
  182. return err
  183. }
  184. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  185. Id: containerID,
  186. Pid: processFriendlyName,
  187. Width: uint32(width),
  188. Height: uint32(height),
  189. })
  190. return err
  191. }
  192. func (clnt *client) Pause(containerID string) error {
  193. return clnt.setState(containerID, StatePause)
  194. }
  195. func (clnt *client) setState(containerID, state string) error {
  196. clnt.lock(containerID)
  197. container, err := clnt.getContainer(containerID)
  198. if err != nil {
  199. clnt.unlock(containerID)
  200. return err
  201. }
  202. if container.systemPid == 0 {
  203. clnt.unlock(containerID)
  204. return fmt.Errorf("No active process for container %s", containerID)
  205. }
  206. st := "running"
  207. if state == StatePause {
  208. st = "paused"
  209. }
  210. chstate := make(chan struct{})
  211. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  212. Id: containerID,
  213. Pid: InitFriendlyName,
  214. Status: st,
  215. })
  216. if err != nil {
  217. clnt.unlock(containerID)
  218. return err
  219. }
  220. container.pauseMonitor.append(state, chstate)
  221. clnt.unlock(containerID)
  222. <-chstate
  223. return nil
  224. }
  225. func (clnt *client) Resume(containerID string) error {
  226. return clnt.setState(containerID, StateResume)
  227. }
  228. func (clnt *client) Stats(containerID string) (*Stats, error) {
  229. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  230. if err != nil {
  231. return nil, err
  232. }
  233. return (*Stats)(resp), nil
  234. }
  235. // Take care of the old 1.11.0 behavior in case the version upgrade
  236. // happened without a clean daemon shutdown
  237. func (clnt *client) cleanupOldRootfs(containerID string) {
  238. // Unmount and delete the bundle folder
  239. if mts, err := mount.GetMounts(); err == nil {
  240. for _, mts := range mts {
  241. if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
  242. if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
  243. os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
  244. }
  245. break
  246. }
  247. }
  248. }
  249. }
  250. func (clnt *client) setExited(containerID string, exitCode uint32) error {
  251. clnt.lock(containerID)
  252. defer clnt.unlock(containerID)
  253. err := clnt.backend.StateChanged(containerID, StateInfo{
  254. CommonStateInfo: CommonStateInfo{
  255. State: StateExit,
  256. ExitCode: exitCode,
  257. }})
  258. clnt.cleanupOldRootfs(containerID)
  259. return err
  260. }
  261. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  262. cont, err := clnt.getContainerdContainer(containerID)
  263. if err != nil {
  264. return nil, err
  265. }
  266. pids := make([]int, len(cont.Pids))
  267. for i, p := range cont.Pids {
  268. pids[i] = int(p)
  269. }
  270. return pids, nil
  271. }
  272. // Summary returns a summary of the processes running in a container.
  273. // This is a no-op on Linux.
  274. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  275. return nil, nil
  276. }
  277. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  278. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  279. if err != nil {
  280. return nil, err
  281. }
  282. for _, cont := range resp.Containers {
  283. if cont.Id == containerID {
  284. return cont, nil
  285. }
  286. }
  287. return nil, fmt.Errorf("invalid state response")
  288. }
  289. func (clnt *client) newContainer(dir string, options ...CreateOption) *container {
  290. container := &container{
  291. containerCommon: containerCommon{
  292. process: process{
  293. dir: dir,
  294. processCommon: processCommon{
  295. containerID: filepath.Base(dir),
  296. client: clnt,
  297. friendlyName: InitFriendlyName,
  298. },
  299. },
  300. processes: make(map[string]*process),
  301. },
  302. }
  303. for _, option := range options {
  304. if err := option.Apply(container); err != nil {
  305. logrus.Errorf("libcontainerd: newContainer(): %v", err)
  306. }
  307. }
  308. return container
  309. }
  310. func (clnt *client) UpdateResources(containerID string, resources Resources) error {
  311. clnt.lock(containerID)
  312. defer clnt.unlock(containerID)
  313. container, err := clnt.getContainer(containerID)
  314. if err != nil {
  315. return err
  316. }
  317. if container.systemPid == 0 {
  318. return fmt.Errorf("No active process for container %s", containerID)
  319. }
  320. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  321. Id: containerID,
  322. Pid: InitFriendlyName,
  323. Resources: (*containerd.UpdateResource)(&resources),
  324. })
  325. if err != nil {
  326. return err
  327. }
  328. return nil
  329. }
  330. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  331. clnt.mapMutex.RLock()
  332. defer clnt.mapMutex.RUnlock()
  333. return clnt.exitNotifiers[containerID]
  334. }
  335. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  336. clnt.mapMutex.Lock()
  337. w, ok := clnt.exitNotifiers[containerID]
  338. defer clnt.mapMutex.Unlock()
  339. if !ok {
  340. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  341. clnt.exitNotifiers[containerID] = w
  342. }
  343. return w
  344. }
  345. func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
  346. clnt.lock(cont.Id)
  347. defer clnt.unlock(cont.Id)
  348. logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)
  349. containerID := cont.Id
  350. if _, err := clnt.getContainer(containerID); err == nil {
  351. return fmt.Errorf("container %s is already active", containerID)
  352. }
  353. defer func() {
  354. if err != nil {
  355. clnt.deleteContainer(cont.Id)
  356. }
  357. }()
  358. container := clnt.newContainer(cont.BundlePath, options...)
  359. container.systemPid = systemPid(cont)
  360. var terminal bool
  361. for _, p := range cont.Processes {
  362. if p.Pid == InitFriendlyName {
  363. terminal = p.Terminal
  364. }
  365. }
  366. iopipe, err := container.openFifos(terminal)
  367. if err != nil {
  368. return err
  369. }
  370. if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
  371. return err
  372. }
  373. clnt.appendContainer(container)
  374. err = clnt.backend.StateChanged(containerID, StateInfo{
  375. CommonStateInfo: CommonStateInfo{
  376. State: StateRestore,
  377. Pid: container.systemPid,
  378. }})
  379. if err != nil {
  380. return err
  381. }
  382. if lastEvent != nil {
  383. // This should only be a pause or resume event
  384. if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
  385. return clnt.backend.StateChanged(containerID, StateInfo{
  386. CommonStateInfo: CommonStateInfo{
  387. State: lastEvent.Type,
  388. Pid: container.systemPid,
  389. }})
  390. }
  391. logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
  392. }
  393. return nil
  394. }
  395. func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
  396. er := &containerd.EventsRequest{
  397. Timestamp: tsp,
  398. StoredOnly: true,
  399. Id: id,
  400. }
  401. events, err := clnt.remote.apiClient.Events(context.Background(), er)
  402. if err != nil {
  403. logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
  404. return nil, err
  405. }
  406. var ev *containerd.Event
  407. for {
  408. e, err := events.Recv()
  409. if err != nil {
  410. if err.Error() == "EOF" {
  411. break
  412. }
  413. logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
  414. return nil, err
  415. }
  416. logrus.Debugf("libcontainerd: received past event %#v", e)
  417. switch e.Type {
  418. case StateExit, StatePause, StateResume:
  419. ev = e
  420. }
  421. }
  422. return ev, nil
  423. }
  424. func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
  425. ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
  426. if err == nil && ev == nil {
  427. // If ev is nil and the container is running in containerd,
  428. // we already consumed all the event of the
  429. // container, included the "exit" one.
  430. // Thus, we request all events containerd has in memory for
  431. // this container in order to get the last one (which should
  432. // be an exit event)
  433. logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
  434. // Request all events since beginning of time
  435. t := time.Unix(0, 0)
  436. tsp, err := ptypes.TimestampProto(t)
  437. if err != nil {
  438. logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
  439. return nil, err
  440. }
  441. return clnt.getContainerLastEventSinceTime(id, tsp)
  442. }
  443. return ev, err
  444. }
  445. func (clnt *client) Restore(containerID string, options ...CreateOption) error {
  446. // Synchronize with live events
  447. clnt.remote.Lock()
  448. defer clnt.remote.Unlock()
  449. // Check that containerd still knows this container.
  450. //
  451. // In the unlikely event that Restore for this container process
  452. // the its past event before the main loop, the event will be
  453. // processed twice. However, this is not an issue as all those
  454. // events will do is change the state of the container to be
  455. // exactly the same.
  456. cont, err := clnt.getContainerdContainer(containerID)
  457. // Get its last event
  458. ev, eerr := clnt.getContainerLastEvent(containerID)
  459. if err != nil || cont.Status == "Stopped" {
  460. if err != nil && !strings.Contains(err.Error(), "container not found") {
  461. // Legitimate error
  462. return err
  463. }
  464. if ev == nil {
  465. if _, err := clnt.getContainer(containerID); err == nil {
  466. // If ev is nil and the container is running in containerd,
  467. // we already consumed all the event of the
  468. // container, included the "exit" one.
  469. // Thus we return to avoid overriding the Exit Code.
  470. logrus.Warnf("libcontainerd: restore was called on a fully synced container (%s)", containerID)
  471. return nil
  472. }
  473. // the container is not running so we need to fix the state within docker
  474. ev = &containerd.Event{
  475. Type: StateExit,
  476. Status: 1,
  477. }
  478. }
  479. // get the exit status for this container
  480. ec := uint32(0)
  481. if eerr == nil && ev.Type == StateExit {
  482. ec = ev.Status
  483. }
  484. clnt.setExited(containerID, ec)
  485. return nil
  486. }
  487. // container is still alive
  488. if clnt.liveRestore {
  489. if err := clnt.restore(cont, ev, options...); err != nil {
  490. logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
  491. }
  492. return nil
  493. }
  494. // Kill the container if liveRestore == false
  495. w := clnt.getOrCreateExitNotifier(containerID)
  496. clnt.lock(cont.Id)
  497. container := clnt.newContainer(cont.BundlePath)
  498. container.systemPid = systemPid(cont)
  499. clnt.appendContainer(container)
  500. clnt.unlock(cont.Id)
  501. container.discardFifos()
  502. if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
  503. logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
  504. }
  505. // Let the main loop handle the exit event
  506. clnt.remote.Unlock()
  507. select {
  508. case <-time.After(10 * time.Second):
  509. if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
  510. logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
  511. }
  512. select {
  513. case <-time.After(2 * time.Second):
  514. case <-w.wait():
  515. // relock because of the defer
  516. clnt.remote.Lock()
  517. return nil
  518. }
  519. case <-w.wait():
  520. // relock because of the defer
  521. clnt.remote.Lock()
  522. return nil
  523. }
  524. // relock because of the defer
  525. clnt.remote.Lock()
  526. clnt.deleteContainer(containerID)
  527. return clnt.setExited(containerID, uint32(255))
  528. }
  529. type exitNotifier struct {
  530. id string
  531. client *client
  532. c chan struct{}
  533. once sync.Once
  534. }
  535. func (en *exitNotifier) close() {
  536. en.once.Do(func() {
  537. close(en.c)
  538. en.client.mapMutex.Lock()
  539. if en == en.client.exitNotifiers[en.id] {
  540. delete(en.client.exitNotifiers, en.id)
  541. }
  542. en.client.mapMutex.Unlock()
  543. })
  544. }
  545. func (en *exitNotifier) wait() <-chan struct{} {
  546. return en.c
  547. }
  548. func (clnt *client) CreateCheckpoint(containerID string, checkpointID string, checkpointDir string, exit bool) error {
  549. clnt.lock(containerID)
  550. defer clnt.unlock(containerID)
  551. if _, err := clnt.getContainer(containerID); err != nil {
  552. return err
  553. }
  554. _, err := clnt.remote.apiClient.CreateCheckpoint(context.Background(), &containerd.CreateCheckpointRequest{
  555. Id: containerID,
  556. Checkpoint: &containerd.Checkpoint{
  557. Name: checkpointID,
  558. Exit: exit,
  559. Tcp: true,
  560. UnixSockets: true,
  561. Shell: false,
  562. EmptyNS: []string{"network"},
  563. },
  564. CheckpointDir: checkpointDir,
  565. })
  566. return err
  567. }
  568. func (clnt *client) DeleteCheckpoint(containerID string, checkpointID string, checkpointDir string) error {
  569. clnt.lock(containerID)
  570. defer clnt.unlock(containerID)
  571. if _, err := clnt.getContainer(containerID); err != nil {
  572. return err
  573. }
  574. _, err := clnt.remote.apiClient.DeleteCheckpoint(context.Background(), &containerd.DeleteCheckpointRequest{
  575. Id: containerID,
  576. Name: checkpointID,
  577. CheckpointDir: checkpointDir,
  578. })
  579. return err
  580. }
  581. func (clnt *client) ListCheckpoints(containerID string, checkpointDir string) (*Checkpoints, error) {
  582. clnt.lock(containerID)
  583. defer clnt.unlock(containerID)
  584. if _, err := clnt.getContainer(containerID); err != nil {
  585. return nil, err
  586. }
  587. resp, err := clnt.remote.apiClient.ListCheckpoint(context.Background(), &containerd.ListCheckpointRequest{
  588. Id: containerID,
  589. CheckpointDir: checkpointDir,
  590. })
  591. if err != nil {
  592. return nil, err
  593. }
  594. return (*Checkpoints)(resp), nil
  595. }