client_linux.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. package libcontainerd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. containerd "github.com/docker/containerd/api/grpc/types"
  13. "github.com/docker/docker/pkg/idtools"
  14. "github.com/docker/docker/pkg/ioutils"
  15. "github.com/docker/docker/pkg/mount"
  16. "github.com/golang/protobuf/ptypes"
  17. "github.com/golang/protobuf/ptypes/timestamp"
  18. specs "github.com/opencontainers/specs/specs-go"
  19. "golang.org/x/net/context"
  20. )
  21. type client struct {
  22. clientCommon
  23. // Platform specific properties below here.
  24. remote *remote
  25. q queue
  26. exitNotifiers map[string]*exitNotifier
  27. liveRestore bool
  28. }
  29. func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
  30. clnt.lock(containerID)
  31. defer clnt.unlock(containerID)
  32. container, err := clnt.getContainer(containerID)
  33. if err != nil {
  34. return err
  35. }
  36. spec, err := container.spec()
  37. if err != nil {
  38. return err
  39. }
  40. sp := spec.Process
  41. sp.Args = specp.Args
  42. sp.Terminal = specp.Terminal
  43. if specp.Env != nil {
  44. sp.Env = specp.Env
  45. }
  46. if specp.Cwd != nil {
  47. sp.Cwd = *specp.Cwd
  48. }
  49. if specp.User != nil {
  50. sp.User = specs.User{
  51. UID: specp.User.UID,
  52. GID: specp.User.GID,
  53. AdditionalGids: specp.User.AdditionalGids,
  54. }
  55. }
  56. if specp.Capabilities != nil {
  57. sp.Capabilities = specp.Capabilities
  58. }
  59. p := container.newProcess(processFriendlyName)
  60. r := &containerd.AddProcessRequest{
  61. Args: sp.Args,
  62. Cwd: sp.Cwd,
  63. Terminal: sp.Terminal,
  64. Id: containerID,
  65. Env: sp.Env,
  66. User: &containerd.User{
  67. Uid: sp.User.UID,
  68. Gid: sp.User.GID,
  69. AdditionalGids: sp.User.AdditionalGids,
  70. },
  71. Pid: processFriendlyName,
  72. Stdin: p.fifo(syscall.Stdin),
  73. Stdout: p.fifo(syscall.Stdout),
  74. Stderr: p.fifo(syscall.Stderr),
  75. Capabilities: sp.Capabilities,
  76. ApparmorProfile: sp.ApparmorProfile,
  77. SelinuxLabel: sp.SelinuxLabel,
  78. NoNewPrivileges: sp.NoNewPrivileges,
  79. Rlimits: convertRlimits(sp.Rlimits),
  80. }
  81. iopipe, err := p.openFifos(sp.Terminal)
  82. if err != nil {
  83. return err
  84. }
  85. if _, err := clnt.remote.apiClient.AddProcess(ctx, r); err != nil {
  86. p.closeFifos(iopipe)
  87. return err
  88. }
  89. var stdinOnce sync.Once
  90. stdin := iopipe.Stdin
  91. iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
  92. var err error
  93. stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
  94. err = stdin.Close()
  95. if err2 := p.sendCloseStdin(); err == nil {
  96. err = err2
  97. }
  98. })
  99. return err
  100. })
  101. container.processes[processFriendlyName] = p
  102. clnt.unlock(containerID)
  103. if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
  104. clnt.lock(containerID)
  105. p.closeFifos(iopipe)
  106. return err
  107. }
  108. clnt.lock(containerID)
  109. return nil
  110. }
  111. func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
  112. root, err := filepath.Abs(clnt.remote.stateDir)
  113. if err != nil {
  114. return "", err
  115. }
  116. if uid == 0 && gid == 0 {
  117. return root, nil
  118. }
  119. p := string(filepath.Separator)
  120. for _, d := range strings.Split(root, string(filepath.Separator))[1:] {
  121. p = filepath.Join(p, d)
  122. fi, err := os.Stat(p)
  123. if err != nil && !os.IsNotExist(err) {
  124. return "", err
  125. }
  126. if os.IsNotExist(err) || fi.Mode()&1 == 0 {
  127. p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
  128. if err := idtools.MkdirAs(p, 0700, uid, gid); err != nil && !os.IsExist(err) {
  129. return "", err
  130. }
  131. }
  132. }
  133. return p, nil
  134. }
  135. func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
  136. clnt.lock(containerID)
  137. defer clnt.unlock(containerID)
  138. if ctr, err := clnt.getContainer(containerID); err == nil {
  139. if ctr.restarting {
  140. ctr.restartManager.Cancel()
  141. ctr.clean()
  142. } else {
  143. return fmt.Errorf("Container %s is already active", containerID)
  144. }
  145. }
  146. uid, gid, err := getRootIDs(specs.Spec(spec))
  147. if err != nil {
  148. return err
  149. }
  150. dir, err := clnt.prepareBundleDir(uid, gid)
  151. if err != nil {
  152. return err
  153. }
  154. container := clnt.newContainer(filepath.Join(dir, containerID), options...)
  155. if err := container.clean(); err != nil {
  156. return err
  157. }
  158. defer func() {
  159. if err != nil {
  160. container.clean()
  161. clnt.deleteContainer(containerID)
  162. }
  163. }()
  164. if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) {
  165. return err
  166. }
  167. f, err := os.Create(filepath.Join(container.dir, configFilename))
  168. if err != nil {
  169. return err
  170. }
  171. defer f.Close()
  172. if err := json.NewEncoder(f).Encode(spec); err != nil {
  173. return err
  174. }
  175. return container.start()
  176. }
  177. func (clnt *client) Signal(containerID string, sig int) error {
  178. clnt.lock(containerID)
  179. defer clnt.unlock(containerID)
  180. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  181. Id: containerID,
  182. Pid: InitFriendlyName,
  183. Signal: uint32(sig),
  184. })
  185. return err
  186. }
  187. func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
  188. clnt.lock(containerID)
  189. defer clnt.unlock(containerID)
  190. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  191. Id: containerID,
  192. Pid: pid,
  193. Signal: uint32(sig),
  194. })
  195. return err
  196. }
  197. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  198. clnt.lock(containerID)
  199. defer clnt.unlock(containerID)
  200. if _, err := clnt.getContainer(containerID); err != nil {
  201. return err
  202. }
  203. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  204. Id: containerID,
  205. Pid: processFriendlyName,
  206. Width: uint32(width),
  207. Height: uint32(height),
  208. })
  209. return err
  210. }
  211. func (clnt *client) Pause(containerID string) error {
  212. return clnt.setState(containerID, StatePause)
  213. }
  214. func (clnt *client) setState(containerID, state string) error {
  215. clnt.lock(containerID)
  216. container, err := clnt.getContainer(containerID)
  217. if err != nil {
  218. clnt.unlock(containerID)
  219. return err
  220. }
  221. if container.systemPid == 0 {
  222. clnt.unlock(containerID)
  223. return fmt.Errorf("No active process for container %s", containerID)
  224. }
  225. st := "running"
  226. if state == StatePause {
  227. st = "paused"
  228. }
  229. chstate := make(chan struct{})
  230. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  231. Id: containerID,
  232. Pid: InitFriendlyName,
  233. Status: st,
  234. })
  235. if err != nil {
  236. clnt.unlock(containerID)
  237. return err
  238. }
  239. container.pauseMonitor.append(state, chstate)
  240. clnt.unlock(containerID)
  241. <-chstate
  242. return nil
  243. }
  244. func (clnt *client) Resume(containerID string) error {
  245. return clnt.setState(containerID, StateResume)
  246. }
  247. func (clnt *client) Stats(containerID string) (*Stats, error) {
  248. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  249. if err != nil {
  250. return nil, err
  251. }
  252. return (*Stats)(resp), nil
  253. }
  254. // Take care of the old 1.11.0 behavior in case the version upgrade
  255. // happened without a clean daemon shutdown
  256. func (clnt *client) cleanupOldRootfs(containerID string) {
  257. // Unmount and delete the bundle folder
  258. if mts, err := mount.GetMounts(); err == nil {
  259. for _, mts := range mts {
  260. if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
  261. if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
  262. os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
  263. }
  264. break
  265. }
  266. }
  267. }
  268. }
  269. func (clnt *client) setExited(containerID string, exitCode uint32) error {
  270. clnt.lock(containerID)
  271. defer clnt.unlock(containerID)
  272. err := clnt.backend.StateChanged(containerID, StateInfo{
  273. CommonStateInfo: CommonStateInfo{
  274. State: StateExit,
  275. ExitCode: exitCode,
  276. }})
  277. clnt.cleanupOldRootfs(containerID)
  278. return err
  279. }
  280. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  281. cont, err := clnt.getContainerdContainer(containerID)
  282. if err != nil {
  283. return nil, err
  284. }
  285. pids := make([]int, len(cont.Pids))
  286. for i, p := range cont.Pids {
  287. pids[i] = int(p)
  288. }
  289. return pids, nil
  290. }
  291. // Summary returns a summary of the processes running in a container.
  292. // This is a no-op on Linux.
  293. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  294. return nil, nil
  295. }
  296. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  297. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  298. if err != nil {
  299. return nil, err
  300. }
  301. for _, cont := range resp.Containers {
  302. if cont.Id == containerID {
  303. return cont, nil
  304. }
  305. }
  306. return nil, fmt.Errorf("invalid state response")
  307. }
  308. func (clnt *client) newContainer(dir string, options ...CreateOption) *container {
  309. container := &container{
  310. containerCommon: containerCommon{
  311. process: process{
  312. dir: dir,
  313. processCommon: processCommon{
  314. containerID: filepath.Base(dir),
  315. client: clnt,
  316. friendlyName: InitFriendlyName,
  317. },
  318. },
  319. processes: make(map[string]*process),
  320. },
  321. }
  322. for _, option := range options {
  323. if err := option.Apply(container); err != nil {
  324. logrus.Errorf("libcontainerd: newContainer(): %v", err)
  325. }
  326. }
  327. return container
  328. }
  329. func (clnt *client) UpdateResources(containerID string, resources Resources) error {
  330. clnt.lock(containerID)
  331. defer clnt.unlock(containerID)
  332. container, err := clnt.getContainer(containerID)
  333. if err != nil {
  334. return err
  335. }
  336. if container.systemPid == 0 {
  337. return fmt.Errorf("No active process for container %s", containerID)
  338. }
  339. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  340. Id: containerID,
  341. Pid: InitFriendlyName,
  342. Resources: (*containerd.UpdateResource)(&resources),
  343. })
  344. if err != nil {
  345. return err
  346. }
  347. return nil
  348. }
  349. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  350. clnt.mapMutex.RLock()
  351. defer clnt.mapMutex.RUnlock()
  352. return clnt.exitNotifiers[containerID]
  353. }
  354. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  355. clnt.mapMutex.Lock()
  356. w, ok := clnt.exitNotifiers[containerID]
  357. defer clnt.mapMutex.Unlock()
  358. if !ok {
  359. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  360. clnt.exitNotifiers[containerID] = w
  361. }
  362. return w
  363. }
  364. func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
  365. clnt.lock(cont.Id)
  366. defer clnt.unlock(cont.Id)
  367. logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)
  368. containerID := cont.Id
  369. if _, err := clnt.getContainer(containerID); err == nil {
  370. return fmt.Errorf("container %s is already active", containerID)
  371. }
  372. defer func() {
  373. if err != nil {
  374. clnt.deleteContainer(cont.Id)
  375. }
  376. }()
  377. container := clnt.newContainer(cont.BundlePath, options...)
  378. container.systemPid = systemPid(cont)
  379. var terminal bool
  380. for _, p := range cont.Processes {
  381. if p.Pid == InitFriendlyName {
  382. terminal = p.Terminal
  383. }
  384. }
  385. iopipe, err := container.openFifos(terminal)
  386. if err != nil {
  387. return err
  388. }
  389. var stdinOnce sync.Once
  390. stdin := iopipe.Stdin
  391. iopipe.Stdin = ioutils.NewWriteCloserWrapper(stdin, func() error {
  392. var err error
  393. stdinOnce.Do(func() { // on error from attach we don't know if stdin was already closed
  394. err = stdin.Close()
  395. })
  396. return err
  397. })
  398. if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
  399. container.closeFifos(iopipe)
  400. return err
  401. }
  402. clnt.appendContainer(container)
  403. err = clnt.backend.StateChanged(containerID, StateInfo{
  404. CommonStateInfo: CommonStateInfo{
  405. State: StateRestore,
  406. Pid: container.systemPid,
  407. }})
  408. if err != nil {
  409. container.closeFifos(iopipe)
  410. return err
  411. }
  412. if lastEvent != nil {
  413. // This should only be a pause or resume event
  414. if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
  415. return clnt.backend.StateChanged(containerID, StateInfo{
  416. CommonStateInfo: CommonStateInfo{
  417. State: lastEvent.Type,
  418. Pid: container.systemPid,
  419. }})
  420. }
  421. logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
  422. }
  423. return nil
  424. }
  425. func (clnt *client) getContainerLastEventSinceTime(id string, tsp *timestamp.Timestamp) (*containerd.Event, error) {
  426. er := &containerd.EventsRequest{
  427. Timestamp: tsp,
  428. StoredOnly: true,
  429. Id: id,
  430. }
  431. events, err := clnt.remote.apiClient.Events(context.Background(), er)
  432. if err != nil {
  433. logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
  434. return nil, err
  435. }
  436. var ev *containerd.Event
  437. for {
  438. e, err := events.Recv()
  439. if err != nil {
  440. if err.Error() == "EOF" {
  441. break
  442. }
  443. logrus.Errorf("libcontainerd: failed to get container event for %s: %q", id, err)
  444. return nil, err
  445. }
  446. logrus.Debugf("libcontainerd: received past event %#v", e)
  447. switch e.Type {
  448. case StateExit, StatePause, StateResume:
  449. ev = e
  450. }
  451. }
  452. return ev, nil
  453. }
  454. func (clnt *client) getContainerLastEvent(id string) (*containerd.Event, error) {
  455. ev, err := clnt.getContainerLastEventSinceTime(id, clnt.remote.restoreFromTimestamp)
  456. if err == nil && ev == nil {
  457. // If ev is nil and the container is running in containerd,
  458. // we already consumed all the event of the
  459. // container, included the "exit" one.
  460. // Thus, we request all events containerd has in memory for
  461. // this container in order to get the last one (which should
  462. // be an exit event)
  463. logrus.Warnf("libcontainerd: client is out of sync, restore was called on a fully synced container (%s).", id)
  464. // Request all events since beginning of time
  465. t := time.Unix(0, 0)
  466. tsp, err := ptypes.TimestampProto(t)
  467. if err != nil {
  468. logrus.Errorf("libcontainerd: getLastEventSinceTime() failed to convert timestamp: %q", err)
  469. return nil, err
  470. }
  471. return clnt.getContainerLastEventSinceTime(id, tsp)
  472. }
  473. return ev, err
  474. }
  475. func (clnt *client) Restore(containerID string, options ...CreateOption) error {
  476. // Synchronize with live events
  477. clnt.remote.Lock()
  478. defer clnt.remote.Unlock()
  479. // Check that containerd still knows this container.
  480. //
  481. // In the unlikely event that Restore for this container process
  482. // the its past event before the main loop, the event will be
  483. // processed twice. However, this is not an issue as all those
  484. // events will do is change the state of the container to be
  485. // exactly the same.
  486. cont, err := clnt.getContainerdContainer(containerID)
  487. // Get its last event
  488. ev, eerr := clnt.getContainerLastEvent(containerID)
  489. if err != nil || cont.Status == "Stopped" {
  490. if err != nil && !strings.Contains(err.Error(), "container not found") {
  491. // Legitimate error
  492. return err
  493. }
  494. if ev == nil {
  495. if _, err := clnt.getContainer(containerID); err == nil {
  496. // If ev is nil and the container is running in containerd,
  497. // we already consumed all the event of the
  498. // container, included the "exit" one.
  499. // Thus we return to avoid overriding the Exit Code.
  500. logrus.Warnf("libcontainerd: restore was called on a fully synced container (%s)", containerID)
  501. return nil
  502. }
  503. // the container is not running so we need to fix the state within docker
  504. ev = &containerd.Event{
  505. Type: StateExit,
  506. Status: 1,
  507. }
  508. }
  509. // get the exit status for this container
  510. ec := uint32(0)
  511. if eerr == nil && ev.Type == StateExit {
  512. ec = ev.Status
  513. }
  514. clnt.setExited(containerID, ec)
  515. return nil
  516. }
  517. // container is still alive
  518. if clnt.liveRestore {
  519. if err := clnt.restore(cont, ev, options...); err != nil {
  520. logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
  521. }
  522. return nil
  523. }
  524. // Kill the container if liveRestore == false
  525. w := clnt.getOrCreateExitNotifier(containerID)
  526. clnt.lock(cont.Id)
  527. container := clnt.newContainer(cont.BundlePath)
  528. container.systemPid = systemPid(cont)
  529. clnt.appendContainer(container)
  530. clnt.unlock(cont.Id)
  531. container.discardFifos()
  532. if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
  533. logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
  534. }
  535. // Let the main loop handle the exit event
  536. clnt.remote.Unlock()
  537. select {
  538. case <-time.After(10 * time.Second):
  539. if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
  540. logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
  541. }
  542. select {
  543. case <-time.After(2 * time.Second):
  544. case <-w.wait():
  545. // relock because of the defer
  546. clnt.remote.Lock()
  547. return nil
  548. }
  549. case <-w.wait():
  550. // relock because of the defer
  551. clnt.remote.Lock()
  552. return nil
  553. }
  554. // relock because of the defer
  555. clnt.remote.Lock()
  556. clnt.deleteContainer(containerID)
  557. return clnt.setExited(containerID, uint32(255))
  558. }
  559. type exitNotifier struct {
  560. id string
  561. client *client
  562. c chan struct{}
  563. once sync.Once
  564. }
  565. func (en *exitNotifier) close() {
  566. en.once.Do(func() {
  567. close(en.c)
  568. en.client.mapMutex.Lock()
  569. if en == en.client.exitNotifiers[en.id] {
  570. delete(en.client.exitNotifiers, en.id)
  571. }
  572. en.client.mapMutex.Unlock()
  573. })
  574. }
  575. func (en *exitNotifier) wait() <-chan struct{} {
  576. return en.c
  577. }