client_linux.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. package libcontainerd
import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/Sirupsen/logrus"
	containerd "github.com/docker/containerd/api/grpc/types"
	"github.com/docker/docker/pkg/idtools"
	"github.com/docker/docker/pkg/mount"
	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/timestamp"
	specs "github.com/opencontainers/specs/specs-go"
	"golang.org/x/net/context"
)
// client is the Linux implementation of the libcontainerd client. Requests
// are issued to containerd through remote.apiClient using the containerd
// gRPC API types.
type client struct {
	clientCommon
	// Platform specific properties below here.
	remote *remote
	q      queue
	// exitNotifiers maps a container ID to the notifier Restore waits on for
	// that container's exit; accessed under clientCommon's mapMutex (see
	// getExitNotifier and getOrCreateExitNotifier).
	exitNotifiers map[string]*exitNotifier
	// liveRestore selects Restore's behavior for containers that are still
	// running: re-attach them when true, stop them when false.
	liveRestore bool
}
  28. func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
  29. clnt.lock(containerID)
  30. defer clnt.unlock(containerID)
  31. container, err := clnt.getContainer(containerID)
  32. if err != nil {
  33. return err
  34. }
  35. spec, err := container.spec()
  36. if err != nil {
  37. return err
  38. }
  39. sp := spec.Process
  40. sp.Args = specp.Args
  41. sp.Terminal = specp.Terminal
  42. if specp.Env != nil {
  43. sp.Env = specp.Env
  44. }
  45. if specp.Cwd != nil {
  46. sp.Cwd = *specp.Cwd
  47. }
  48. if specp.User != nil {
  49. sp.User = specs.User{
  50. UID: specp.User.UID,
  51. GID: specp.User.GID,
  52. AdditionalGids: specp.User.AdditionalGids,
  53. }
  54. }
  55. if specp.Capabilities != nil {
  56. sp.Capabilities = specp.Capabilities
  57. }
  58. p := container.newProcess(processFriendlyName)
  59. r := &containerd.AddProcessRequest{
  60. Args: sp.Args,
  61. Cwd: sp.Cwd,
  62. Terminal: sp.Terminal,
  63. Id: containerID,
  64. Env: sp.Env,
  65. User: &containerd.User{
  66. Uid: sp.User.UID,
  67. Gid: sp.User.GID,
  68. AdditionalGids: sp.User.AdditionalGids,
  69. },
  70. Pid: processFriendlyName,
  71. Stdin: p.fifo(syscall.Stdin),
  72. Stdout: p.fifo(syscall.Stdout),
  73. Stderr: p.fifo(syscall.Stderr),
  74. Capabilities: sp.Capabilities,
  75. ApparmorProfile: sp.ApparmorProfile,
  76. SelinuxLabel: sp.SelinuxLabel,
  77. NoNewPrivileges: sp.NoNewPrivileges,
  78. Rlimits: convertRlimits(sp.Rlimits),
  79. }
  80. iopipe, err := p.openFifos(sp.Terminal)
  81. if err != nil {
  82. return err
  83. }
  84. if _, err := clnt.remote.apiClient.AddProcess(ctx, r); err != nil {
  85. p.closeFifos(iopipe)
  86. return err
  87. }
  88. container.processes[processFriendlyName] = p
  89. clnt.unlock(containerID)
  90. if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
  91. return err
  92. }
  93. clnt.lock(containerID)
  94. return nil
  95. }
  96. func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
  97. root, err := filepath.Abs(clnt.remote.stateDir)
  98. if err != nil {
  99. return "", err
  100. }
  101. if uid == 0 && gid == 0 {
  102. return root, nil
  103. }
  104. p := string(filepath.Separator)
  105. for _, d := range strings.Split(root, string(filepath.Separator))[1:] {
  106. p = filepath.Join(p, d)
  107. fi, err := os.Stat(p)
  108. if err != nil && !os.IsNotExist(err) {
  109. return "", err
  110. }
  111. if os.IsNotExist(err) || fi.Mode()&1 == 0 {
  112. p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
  113. if err := idtools.MkdirAs(p, 0700, uid, gid); err != nil && !os.IsExist(err) {
  114. return "", err
  115. }
  116. }
  117. }
  118. return p, nil
  119. }
  120. func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
  121. clnt.lock(containerID)
  122. defer clnt.unlock(containerID)
  123. if ctr, err := clnt.getContainer(containerID); err == nil {
  124. if ctr.restarting {
  125. ctr.restartManager.Cancel()
  126. ctr.clean()
  127. } else {
  128. return fmt.Errorf("Container %s is already active", containerID)
  129. }
  130. }
  131. uid, gid, err := getRootIDs(specs.Spec(spec))
  132. if err != nil {
  133. return err
  134. }
  135. dir, err := clnt.prepareBundleDir(uid, gid)
  136. if err != nil {
  137. return err
  138. }
  139. container := clnt.newContainer(filepath.Join(dir, containerID), options...)
  140. if err := container.clean(); err != nil {
  141. return err
  142. }
  143. defer func() {
  144. if err != nil {
  145. container.clean()
  146. clnt.deleteContainer(containerID)
  147. }
  148. }()
  149. if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) {
  150. return err
  151. }
  152. f, err := os.Create(filepath.Join(container.dir, configFilename))
  153. if err != nil {
  154. return err
  155. }
  156. defer f.Close()
  157. if err := json.NewEncoder(f).Encode(spec); err != nil {
  158. return err
  159. }
  160. return container.start()
  161. }
  162. func (clnt *client) Signal(containerID string, sig int) error {
  163. clnt.lock(containerID)
  164. defer clnt.unlock(containerID)
  165. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  166. Id: containerID,
  167. Pid: InitFriendlyName,
  168. Signal: uint32(sig),
  169. })
  170. return err
  171. }
  172. func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
  173. clnt.lock(containerID)
  174. defer clnt.unlock(containerID)
  175. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  176. Id: containerID,
  177. Pid: pid,
  178. Signal: uint32(sig),
  179. })
  180. return err
  181. }
  182. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  183. clnt.lock(containerID)
  184. defer clnt.unlock(containerID)
  185. if _, err := clnt.getContainer(containerID); err != nil {
  186. return err
  187. }
  188. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  189. Id: containerID,
  190. Pid: processFriendlyName,
  191. Width: uint32(width),
  192. Height: uint32(height),
  193. })
  194. return err
  195. }
  196. func (clnt *client) Pause(containerID string) error {
  197. return clnt.setState(containerID, StatePause)
  198. }
  199. func (clnt *client) setState(containerID, state string) error {
  200. clnt.lock(containerID)
  201. container, err := clnt.getContainer(containerID)
  202. if err != nil {
  203. clnt.unlock(containerID)
  204. return err
  205. }
  206. if container.systemPid == 0 {
  207. clnt.unlock(containerID)
  208. return fmt.Errorf("No active process for container %s", containerID)
  209. }
  210. st := "running"
  211. if state == StatePause {
  212. st = "paused"
  213. }
  214. chstate := make(chan struct{})
  215. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  216. Id: containerID,
  217. Pid: InitFriendlyName,
  218. Status: st,
  219. })
  220. if err != nil {
  221. clnt.unlock(containerID)
  222. return err
  223. }
  224. container.pauseMonitor.append(state, chstate)
  225. clnt.unlock(containerID)
  226. <-chstate
  227. return nil
  228. }
  229. func (clnt *client) Resume(containerID string) error {
  230. return clnt.setState(containerID, StateResume)
  231. }
  232. func (clnt *client) Stats(containerID string) (*Stats, error) {
  233. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  234. if err != nil {
  235. return nil, err
  236. }
  237. return (*Stats)(resp), nil
  238. }
  239. // Take care of the old 1.11.0 behavior in case the version upgrade
  240. // happened without a clean daemon shutdown
  241. func (clnt *client) cleanupOldRootfs(containerID string) {
  242. // Unmount and delete the bundle folder
  243. if mts, err := mount.GetMounts(); err == nil {
  244. for _, mts := range mts {
  245. if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
  246. if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
  247. os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
  248. }
  249. break
  250. }
  251. }
  252. }
  253. }
  254. func (clnt *client) setExited(containerID string, exitCode uint32) error {
  255. clnt.lock(containerID)
  256. defer clnt.unlock(containerID)
  257. err := clnt.backend.StateChanged(containerID, StateInfo{
  258. CommonStateInfo: CommonStateInfo{
  259. State: StateExit,
  260. ExitCode: exitCode,
  261. }})
  262. clnt.cleanupOldRootfs(containerID)
  263. return err
  264. }
  265. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  266. cont, err := clnt.getContainerdContainer(containerID)
  267. if err != nil {
  268. return nil, err
  269. }
  270. pids := make([]int, len(cont.Pids))
  271. for i, p := range cont.Pids {
  272. pids[i] = int(p)
  273. }
  274. return pids, nil
  275. }
  276. // Summary returns a summary of the processes running in a container.
  277. // This is a no-op on Linux.
  278. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  279. return nil, nil
  280. }
  281. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  282. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  283. if err != nil {
  284. return nil, err
  285. }
  286. for _, cont := range resp.Containers {
  287. if cont.Id == containerID {
  288. return cont, nil
  289. }
  290. }
  291. return nil, fmt.Errorf("invalid state response")
  292. }
  293. func (clnt *client) newContainer(dir string, options ...CreateOption) *container {
  294. container := &container{
  295. containerCommon: containerCommon{
  296. process: process{
  297. dir: dir,
  298. processCommon: processCommon{
  299. containerID: filepath.Base(dir),
  300. client: clnt,
  301. friendlyName: InitFriendlyName,
  302. },
  303. },
  304. processes: make(map[string]*process),
  305. },
  306. }
  307. for _, option := range options {
  308. if err := option.Apply(container); err != nil {
  309. logrus.Errorf("libcontainerd: newContainer(): %v", err)
  310. }
  311. }
  312. return container
  313. }
  314. func (clnt *client) UpdateResources(containerID string, resources Resources) error {
  315. clnt.lock(containerID)
  316. defer clnt.unlock(containerID)
  317. container, err := clnt.getContainer(containerID)
  318. if err != nil {
  319. return err
  320. }
  321. if container.systemPid == 0 {
  322. return fmt.Errorf("No active process for container %s", containerID)
  323. }
  324. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  325. Id: containerID,
  326. Pid: InitFriendlyName,
  327. Resources: (*containerd.UpdateResource)(&resources),
  328. })
  329. if err != nil {
  330. return err
  331. }
  332. return nil
  333. }
  334. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  335. clnt.mapMutex.RLock()
  336. defer clnt.mapMutex.RUnlock()
  337. return clnt.exitNotifiers[containerID]
  338. }
  339. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  340. clnt.mapMutex.Lock()
  341. w, ok := clnt.exitNotifiers[containerID]
  342. defer clnt.mapMutex.Unlock()
  343. if !ok {
  344. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  345. clnt.exitNotifiers[containerID] = w
  346. }
  347. return w
  348. }
// restore re-attaches the client to a container that containerd still runs.
// Restore uses it when liveRestore is enabled.
//
// It does the following:
//   - builds a client-side container for cont (bundle cont.BundlePath, pid
//     from systemPid(cont));
//   - opens its fifos, using the Terminal flag of the InitFriendlyName
//     process, and passes them to the backend's AttachStreams;
//   - registers the container;
//   - reports StateRestore to the backend.
//
// If lastEvent is a pause or resume event, that state is then reported too.
// Any other lastEvent is only logged.
//
// It returns an error without side effects if the client already tracks the
// container. After that check, any error return removes containerID from the
// client through the deferred handler, which reads the named result err.
func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
	clnt.lock(cont.Id)
	defer clnt.unlock(cont.Id)

	logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)

	containerID := cont.Id
	if _, err := clnt.getContainer(containerID); err == nil {
		return fmt.Errorf("container %s is already active", containerID)
	}

	// Registered only after the duplicate check, so an "already active"
	// error does not delete the existing container.
	defer func() {
		if err != nil {
			clnt.deleteContainer(cont.Id)
		}
	}()

	container := clnt.newContainer(cont.BundlePath, options...)
	container.systemPid = systemPid(cont)

	// The fifos are opened in terminal mode iff the init process was
	// started with a terminal.
	var terminal bool
	for _, p := range cont.Processes {
		if p.Pid == InitFriendlyName {
			terminal = p.Terminal
		}
	}

	iopipe, err := container.openFifos(terminal)
	if err != nil {
		return err
	}

	// NOTE(review): unlike AddProcess, the fifos opened above are not closed
	// if AttachStreams fails — confirm whether that leaks them.
	if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
		return err
	}

	clnt.appendContainer(container)

	err = clnt.backend.StateChanged(containerID, StateInfo{
		CommonStateInfo: CommonStateInfo{
			State: StateRestore,
			Pid:   container.systemPid,
		}})

	if err != nil {
		return err
	}

	if lastEvent != nil {
		// This should only be a pause or resume event
		if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
			// An error here also triggers the deferred deleteContainer.
			return clnt.backend.StateChanged(containerID, StateInfo{
				CommonStateInfo: CommonStateInfo{
					State: lastEvent.Type,
					Pid:   container.systemPid,
				}})
		}

		logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
	}

	return nil
}
  399. func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
  400. er := &containerd.EventsRequest{
  401. Timestamp: tsp,
  402. StoredOnly: true,
  403. Id: id,
  404. }
  405. events, err := clnt.remote.apiClient.Events(context.Background(), er)
  406. if err != nil {
  407. logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
  408. return nil, err
  409. }
  410. var ev *containerd.Event
  411. for {
  412. e, err := events.Recv()
  413. if err != nil {
  414. if err.Error() == "EOF" {
  415. break
  416. }
  417. logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
  418. return nil, err
  419. }
  420. logrus.Debugf("libcontainerd: received past event %#v", e)
  421. switch e.Type {
  422. case StateExit, StatePause, StateResume:
  423. ev = e
  424. }
  425. }
  426. return ev, nil
  427. }
  428. func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
  429. ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
  430. if err == nil && ev == nil {
  431. // If ev is nil and the container is running in containerd,
  432. // we already consumed all the event of the
  433. // container, included the "exit" one.
  434. // Thus, we request all events containerd has in memory for
  435. // this container in order to get the last one (which should
  436. // be an exit event)
  437. logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
  438. // Request all events since beginning of time
  439. t := time.Unix(0, 0)
  440. tsp, err := ptypes.TimestampProto(t)
  441. if err != nil {
  442. logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
  443. return nil, err
  444. }
  445. return clnt.getContainerLastEventSinceTime(id, tsp)
  446. }
  447. return ev, err
  448. }
// Restore reconciles the client with containerd's view of containerID, for
// example after the daemon restarts. It holds clnt.remote's lock to
// synchronize with live event processing, except while waiting for the
// container to exit in the kill path below.
//
// Outcomes:
//   - containerd reports an error other than "container not found": the
//     error is returned.
//   - containerd does not know the container, or reports it "Stopped":
//       - If no relevant event was found and the client still tracks the
//         container, nothing is done, because its exit was already consumed.
//       - Otherwise the backend is told the container exited.
//       - The exit code is the Status of the last exit event. When no event
//         was found, an exit event with Status 1 is assumed. The code is 0
//         if the event lookup itself failed or the last event was a pause or
//         resume.
//   - the container is alive and liveRestore is set: it is re-attached via
//     restore; errors are logged, not returned.
//   - the container is alive and liveRestore is not set:
//       - It is sent SIGTERM, then SIGKILL after 10 seconds.
//       - If its exit notifier has not fired 2 seconds after that, the
//         container is removed from the client and reported as exited with
//         code 255.
func (clnt *client) Restore(containerID string, options ...CreateOption) error {
	// Synchronize with live events
	clnt.remote.Lock()
	defer clnt.remote.Unlock()
	// Check that containerd still knows this container.
	//
	// In the unlikely event that Restore processes this container's past
	// events before the main event loop does, those events will be processed
	// twice. However, this is not an issue, as all those events will do is
	// set the container to exactly the same state again.
	cont, err := clnt.getContainerdContainer(containerID)
	// Get its last event
	ev, eerr := clnt.getContainerLastEvent(containerID)
	// cont.Status is only evaluated when err == nil, so cont is non-nil there.
	if err != nil || cont.Status == "Stopped" {
		if err != nil && !strings.Contains(err.Error(), "container not found") {
			// Legitimate error
			return err
		}

		if ev == nil {
			if _, err := clnt.getContainer(containerID); err == nil {
				// If ev is nil and the client still tracks the container,
				// we already consumed all of the container's events,
				// including the "exit" one.
				// Thus we return to avoid overriding the Exit Code.
				logrus.Warnf("libcontainerd: restore was called on a fully synced container (%s)", containerID)
				return nil
			}
			// the container is not running so we need to fix the state within docker
			ev = &containerd.Event{
				Type:   StateExit,
				Status: 1,
			}
		}

		// get the exit status for this container
		ec := uint32(0)
		if eerr == nil && ev.Type == StateExit {
			ec = ev.Status
		}
		// NOTE(review): setExited's error is discarded here.
		clnt.setExited(containerID, ec)

		return nil
	}

	// container is still alive
	if clnt.liveRestore {
		if err := clnt.restore(cont, ev, options...); err != nil {
			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
		}
		return nil
	}

	// Kill the container if liveRestore == false
	// The notifier is obtained before signalling so that an exit observed
	// while we wait can be seen through w.wait().
	w := clnt.getOrCreateExitNotifier(containerID)
	clnt.lock(cont.Id)
	container := clnt.newContainer(cont.BundlePath)
	container.systemPid = systemPid(cont)
	clnt.appendContainer(container)
	clnt.unlock(cont.Id)

	container.discardFifos()

	if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
		logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
	}
	// Let the main loop handle the exit event
	// The remote lock is released while waiting; every return path below
	// re-locks it so the deferred Unlock stays balanced.
	clnt.remote.Unlock()
	select {
	case <-time.After(10 * time.Second):
		if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
			logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
		}
		select {
		case <-time.After(2 * time.Second):
		case <-w.wait():
			// relock because of the defer
			clnt.remote.Lock()
			return nil
		}
	case <-w.wait():
		// relock because of the defer
		clnt.remote.Lock()
		return nil
	}
	// relock because of the defer
	clnt.remote.Lock()

	// No exit was observed even after SIGKILL: drop the container and report
	// it as exited with 255.
	clnt.deleteContainer(containerID)

	return clnt.setExited(containerID, uint32(255))
}
// exitNotifier lets callers wait for a container to exit. wait returns a
// channel that close closes, at most once.
type exitNotifier struct {
	// id is the container ID under which this notifier is stored in
	// client.exitNotifiers; close uses it to remove its own entry.
	id     string
	client *client
	c      chan struct{}
	// once ensures c is closed, and the map entry removed, only once.
	once sync.Once
}
  539. func (en *exitNotifier) close() {
  540. en.once.Do(func() {
  541. close(en.c)
  542. en.client.mapMutex.Lock()
  543. if en == en.client.exitNotifiers[en.id] {
  544. delete(en.client.exitNotifiers, en.id)
  545. }
  546. en.client.mapMutex.Unlock()
  547. })
  548. }
  549. func (en *exitNotifier) wait() <-chan struct{} {
  550. return en.c
  551. }