client_linux.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. package libcontainerd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. containerd "github.com/docker/containerd/api/grpc/types"
  13. "github.com/docker/docker/pkg/idtools"
  14. "github.com/docker/docker/pkg/mount"
  15. specs "github.com/opencontainers/specs/specs-go"
  16. "golang.org/x/net/context"
  17. )
  18. type client struct {
  19. clientCommon
  20. // Platform specific properties below here.
  21. remote *remote
  22. q queue
  23. exitNotifiers map[string]*exitNotifier
  24. liveRestore bool
  25. }
  // AddProcess starts an additional process named processFriendlyName inside
// the registered container containerID.
//
// The new process starts from a copy of the container spec's Process section.
// Args and Terminal are always taken from specp. Env, Cwd, User and
// Capabilities override the spec only when set in specp. Stdio FIFOs are
// opened before the request is sent to containerd and closed again if
// containerd rejects it. On success the process is recorded on the container
// and its streams are handed to the backend.
//
// The per-container lock is held for the whole call, except around
// AttachStreams; it is always re-acquired before returning.
func (clnt *client) AddProcess(ctx context.Context, containerID, processFriendlyName string, specp Process) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	container, err := clnt.getContainer(containerID)
	if err != nil {
		return err
	}

	spec, err := container.spec()
	if err != nil {
		return err
	}
	sp := spec.Process
	sp.Args = specp.Args
	sp.Terminal = specp.Terminal
	if specp.Env != nil {
		sp.Env = specp.Env
	}
	if specp.Cwd != nil {
		sp.Cwd = *specp.Cwd
	}
	if specp.User != nil {
		sp.User = specs.User{
			UID:            specp.User.UID,
			GID:            specp.User.GID,
			AdditionalGids: specp.User.AdditionalGids,
		}
	}
	if specp.Capabilities != nil {
		sp.Capabilities = specp.Capabilities
	}

	p := container.newProcess(processFriendlyName)

	r := &containerd.AddProcessRequest{
		Args:     sp.Args,
		Cwd:      sp.Cwd,
		Terminal: sp.Terminal,
		Id:       containerID,
		Env:      sp.Env,
		User: &containerd.User{
			Uid:            sp.User.UID,
			Gid:            sp.User.GID,
			AdditionalGids: sp.User.AdditionalGids,
		},
		Pid:             processFriendlyName,
		Stdin:           p.fifo(syscall.Stdin),
		Stdout:          p.fifo(syscall.Stdout),
		Stderr:          p.fifo(syscall.Stderr),
		Capabilities:    sp.Capabilities,
		ApparmorProfile: sp.ApparmorProfile,
		SelinuxLabel:    sp.SelinuxLabel,
		NoNewPrivileges: sp.NoNewPrivileges,
		Rlimits:         convertRlimits(sp.Rlimits),
	}

	iopipe, err := p.openFifos(sp.Terminal)
	if err != nil {
		return err
	}

	if _, err := clnt.remote.apiClient.AddProcess(ctx, r); err != nil {
		p.closeFifos(iopipe)
		return err
	}

	container.processes[processFriendlyName] = p

	// The container lock is released while the backend attaches the streams
	// (NOTE(review): presumably so other operations on this container are not
	// blocked meanwhile). It must be re-acquired on EVERY path before
	// returning. Otherwise the deferred unlock above would release a lock
	// this goroutine no longer holds. Previously the error path returned
	// without re-locking.
	clnt.unlock(containerID)
	attachErr := clnt.backend.AttachStreams(processFriendlyName, *iopipe)
	clnt.lock(containerID)

	return attachErr
}
  // prepareBundleDir returns the directory under which container bundles are
// created for a container whose root user maps to uid:gid.
//
// For 0:0 this is the absolute containerd state directory. Otherwise each
// component of that path is examined in turn. A component that does not exist,
// or whose "other execute" bit (mode&1) is unset, is replaced by a sibling
// named "<component>.<uid>.<gid>". That sibling is created with mode 0700 and
// owned by uid:gid; an already existing one is accepted. The remaining
// components are then resolved beneath the replacement. NOTE(review):
// presumably this guarantees the remapped user can traverse to the bundle.
func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
	root, err := filepath.Abs(clnt.remote.stateDir)
	if err != nil {
		return "", err
	}
	if uid == 0 && gid == 0 {
		return root, nil
	}

	sep := string(filepath.Separator)
	components := strings.Split(root, sep)
	dir := sep
	for i := 1; i < len(components); i++ {
		dir = filepath.Join(dir, components[i])

		info, statErr := os.Stat(dir)
		missing := os.IsNotExist(statErr)
		if statErr != nil && !missing {
			return "", statErr
		}
		if !missing && info.Mode()&1 != 0 {
			// Traversable by other users already: keep this component.
			continue
		}

		dir = fmt.Sprintf("%s.%d.%d", dir, uid, gid)
		if mkErr := idtools.MkdirAs(dir, 0700, uid, gid); mkErr != nil && !os.IsExist(mkErr) {
			return "", mkErr
		}
	}
	return dir, nil
}
  // Create writes spec as the bundle configuration of containerID and starts
// the container.
//
// If containerID is already registered and marked as restarting, its pending
// restart is cancelled and its state cleaned first. If it is registered and
// not restarting, Create fails. The bundle directory is created under
// prepareBundleDir's result with ownership matching the spec's root uid/gid.
// If any later step fails, the new container is cleaned and removed from the
// client again.
func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	if ctr, err := clnt.getContainer(containerID); err == nil {
		if ctr.restarting {
			ctr.restartManager.Cancel()
			ctr.clean()
		} else {
			return fmt.Errorf("Container %s is already active", containerID)
		}
	}

	uid, gid, err := getRootIDs(specs.Spec(spec))
	if err != nil {
		return err
	}
	dir, err := clnt.prepareBundleDir(uid, gid)
	if err != nil {
		return err
	}

	container := clnt.newContainer(filepath.Join(dir, containerID), options...)
	if err := container.clean(); err != nil {
		return err
	}

	// From here on, any failure (err is the named result) rolls back the
	// partially created container.
	defer func() {
		if err != nil {
			container.clean()
			clnt.deleteContainer(containerID)
		}
	}()

	if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) {
		return err
	}

	f, err := os.Create(filepath.Join(container.dir, configFilename))
	if err != nil {
		return err
	}
	if err := json.NewEncoder(f).Encode(spec); err != nil {
		f.Close() // the encode error is the one worth reporting
		return err
	}
	// Close, and check Close, before the container is started. Close can
	// report deferred write errors, and previously the file stayed open (and
	// any Close error was lost) until after container.start() returned.
	if err := f.Close(); err != nil {
		return err
	}

	return container.start()
}
  160. func (clnt *client) Signal(containerID string, sig int) error {
  161. clnt.lock(containerID)
  162. defer clnt.unlock(containerID)
  163. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  164. Id: containerID,
  165. Pid: InitFriendlyName,
  166. Signal: uint32(sig),
  167. })
  168. return err
  169. }
  170. func (clnt *client) SignalProcess(containerID string, pid string, sig int) error {
  171. clnt.lock(containerID)
  172. defer clnt.unlock(containerID)
  173. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  174. Id: containerID,
  175. Pid: pid,
  176. Signal: uint32(sig),
  177. })
  178. return err
  179. }
  180. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  181. clnt.lock(containerID)
  182. defer clnt.unlock(containerID)
  183. if _, err := clnt.getContainer(containerID); err != nil {
  184. return err
  185. }
  186. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  187. Id: containerID,
  188. Pid: processFriendlyName,
  189. Width: uint32(width),
  190. Height: uint32(height),
  191. })
  192. return err
  193. }
  // Pause asks containerd to pause containerID and waits until the container's
// pauseMonitor signals the change.
func (clnt *client) Pause(containerID string) error {
	return clnt.setState(containerID, StatePause)
}

// setState asks containerd to set containerID's status to "paused" (when
// state is StatePause) or "running" (for any other state). It then waits until
// the container's pauseMonitor signals the channel registered for state.
//
// The lookup, the containerd request and the waiter registration all happen
// under the per-container lock. The wait itself happens after the lock has
// been released. NOTE(review): the wait has no timeout, so this blocks forever
// if the notification never arrives.
func (clnt *client) setState(containerID, state string) error {
	status := "running"
	if state == StatePause {
		status = "paused"
	}

	done, err := func() (chan struct{}, error) {
		clnt.lock(containerID)
		defer clnt.unlock(containerID)

		ctr, err := clnt.getContainer(containerID)
		if err != nil {
			return nil, err
		}
		if ctr.systemPid == 0 {
			return nil, fmt.Errorf("No active process for container %s", containerID)
		}

		req := &containerd.UpdateContainerRequest{
			Id:     containerID,
			Pid:    InitFriendlyName,
			Status: status,
		}
		if _, err := clnt.remote.apiClient.UpdateContainer(context.Background(), req); err != nil {
			return nil, err
		}

		ch := make(chan struct{})
		ctr.pauseMonitor.append(state, ch)
		return ch, nil
	}()
	if err != nil {
		return err
	}

	<-done
	return nil
}

// Resume asks containerd to set containerID back to running and waits until
// the container's pauseMonitor signals the change.
func (clnt *client) Resume(containerID string) error {
	return clnt.setState(containerID, StateResume)
}
  230. func (clnt *client) Stats(containerID string) (*Stats, error) {
  231. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  232. if err != nil {
  233. return nil, err
  234. }
  235. return (*Stats)(resp), nil
  236. }
  // cleanupOldRootfs takes care of the old 1.11.0 behavior, in case the version
// upgrade happened without a clean daemon shutdown. It looks for a mount point
// ending in "/<containerID>/rootfs". If one is found, it is detached with
// MNT_DETACH, and if that succeeds the enclosing "<containerID>" directory is
// removed. Only the first match is handled.
//
// This is best effort: failures to list mounts, unmount or remove the
// directory are ignored.
func (clnt *client) cleanupOldRootfs(containerID string) {
	if containerID == "" {
		// An empty ID would turn the suffix below into a match for any
		// rootfs mount, and the bundle directory would then be deleted.
		return
	}

	mounts, err := mount.GetMounts()
	if err != nil {
		return
	}

	// Anchor the ID at a path separator so a mount belonging to a container
	// whose ID merely ends with containerID is never unmounted and removed.
	suffix := "/" + containerID + "/rootfs"
	for _, m := range mounts {
		if !strings.HasSuffix(m.Mountpoint, suffix) {
			continue
		}
		if err := syscall.Unmount(m.Mountpoint, syscall.MNT_DETACH); err == nil {
			os.RemoveAll(strings.TrimSuffix(m.Mountpoint, "/rootfs"))
		}
		break
	}
}
  252. func (clnt *client) setExited(containerID string, exitCode uint32) error {
  253. clnt.lock(containerID)
  254. defer clnt.unlock(containerID)
  255. err := clnt.backend.StateChanged(containerID, StateInfo{
  256. CommonStateInfo: CommonStateInfo{
  257. State: StateExit,
  258. ExitCode: exitCode,
  259. }})
  260. clnt.cleanupOldRootfs(containerID)
  261. return err
  262. }
  263. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  264. cont, err := clnt.getContainerdContainer(containerID)
  265. if err != nil {
  266. return nil, err
  267. }
  268. pids := make([]int, len(cont.Pids))
  269. for i, p := range cont.Pids {
  270. pids[i] = int(p)
  271. }
  272. return pids, nil
  273. }
  274. // Summary returns a summary of the processes running in a container.
  275. // This is a no-op on Linux.
  276. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  277. return nil, nil
  278. }
  279. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  280. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  281. if err != nil {
  282. return nil, err
  283. }
  284. for _, cont := range resp.Containers {
  285. if cont.Id == containerID {
  286. return cont, nil
  287. }
  288. }
  289. return nil, fmt.Errorf("invalid state response")
  290. }
  // newContainer builds an in-memory container whose bundle directory is dir.
// The container ID is the last element of dir, and the init process is named
// InitFriendlyName. The options are applied in order; an option that fails is
// logged and otherwise ignored. This function does not call appendContainer.
func (clnt *client) newContainer(dir string, options ...CreateOption) *container {
	c := &container{
		containerCommon: containerCommon{
			processes: make(map[string]*process),
			process: process{
				processCommon: processCommon{
					friendlyName: InitFriendlyName,
					containerID:  filepath.Base(dir),
					client:       clnt,
				},
				dir: dir,
			},
		},
	}

	for _, opt := range options {
		if err := opt.Apply(c); err != nil {
			logrus.Errorf("libcontainerd: newContainer(): %v", err)
		}
	}
	return c
}
  // UpdateResources sends resources to containerd as the new resource settings
// of containerID's init process. It fails if the container is not registered
// or has no active process (systemPid == 0).
func (clnt *client) UpdateResources(containerID string, resources Resources) error {
	clnt.lock(containerID)
	defer clnt.unlock(containerID)

	ctr, err := clnt.getContainer(containerID)
	switch {
	case err != nil:
		return err
	case ctr.systemPid == 0:
		return fmt.Errorf("No active process for container %s", containerID)
	}

	// The pointer conversion compiles because Resources has the same
	// underlying type as containerd.UpdateResource.
	_, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
		Id:        containerID,
		Pid:       InitFriendlyName,
		Resources: (*containerd.UpdateResource)(&resources),
	})
	return err
}
  332. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  333. clnt.mapMutex.RLock()
  334. defer clnt.mapMutex.RUnlock()
  335. return clnt.exitNotifiers[containerID]
  336. }
  337. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  338. clnt.mapMutex.Lock()
  339. w, ok := clnt.exitNotifiers[containerID]
  340. defer clnt.mapMutex.Unlock()
  341. if !ok {
  342. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  343. clnt.exitNotifiers[containerID] = w
  344. }
  345. return w
  346. }
  347. func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) {
  348. clnt.lock(cont.Id)
  349. defer clnt.unlock(cont.Id)
  350. logrus.Debugf("libcontainerd: restore container %s state %s", cont.Id, cont.Status)
  351. containerID := cont.Id
  352. if _, err := clnt.getContainer(containerID); err == nil {
  353. return fmt.Errorf("container %s is already active", containerID)
  354. }
  355. defer func() {
  356. if err != nil {
  357. clnt.deleteContainer(cont.Id)
  358. }
  359. }()
  360. container := clnt.newContainer(cont.BundlePath, options...)
  361. container.systemPid = systemPid(cont)
  362. var terminal bool
  363. for _, p := range cont.Processes {
  364. if p.Pid == InitFriendlyName {
  365. terminal = p.Terminal
  366. }
  367. }
  368. iopipe, err := container.openFifos(terminal)
  369. if err != nil {
  370. return err
  371. }
  372. if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
  373. return err
  374. }
  375. clnt.appendContainer(container)
  376. err = clnt.backend.StateChanged(containerID, StateInfo{
  377. CommonStateInfo: CommonStateInfo{
  378. State: StateRestore,
  379. Pid: container.systemPid,
  380. }})
  381. if err != nil {
  382. return err
  383. }
  384. if lastEvent != nil {
  385. // This should only be a pause or resume event
  386. if lastEvent.Type == StatePause || lastEvent.Type == StateResume {
  387. return clnt.backend.StateChanged(containerID, StateInfo{
  388. CommonStateInfo: CommonStateInfo{
  389. State: lastEvent.Type,
  390. Pid: container.systemPid,
  391. }})
  392. }
  393. logrus.Warnf("libcontainerd: unexpected backlog event: %#v", lastEvent)
  394. }
  395. return nil
  396. }
  397. func (clnt *client) getContainerLastEvent(containerID string) (*containerd.Event, error) {
  398. er := &containerd.EventsRequest{
  399. Timestamp: clnt.remote.restoreFromTimestamp,
  400. StoredOnly: true,
  401. Id: containerID,
  402. }
  403. events, err := clnt.remote.apiClient.Events(context.Background(), er)
  404. if err != nil {
  405. logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err)
  406. return nil, err
  407. }
  408. var ev *containerd.Event
  409. for {
  410. e, err := events.Recv()
  411. if err != nil {
  412. if err.Error() == "EOF" {
  413. break
  414. }
  415. logrus.Errorf("libcontainerd: failed to get container event for %s: %q", containerID, err)
  416. return nil, err
  417. }
  418. logrus.Debugf("libcontainerd: received past event %#v", e)
  419. switch e.Type {
  420. case StateExit, StatePause, StateResume:
  421. ev = e
  422. }
  423. }
  424. return ev, nil
  425. }
  // Restore reconciles containerID with containerd after a daemon restart. It
// holds clnt.remote's lock to synchronize with live events, except while
// waiting for a container that is being stopped to exit.
//
//   - If containerd returns an error other than "container not found", it is
//     returned.
//   - If containerd does not know the container, or reports it as "Stopped",
//     the backend is told it exited. The exit code comes from the last stored
//     exit event when there is one. If there is no stored event and the
//     container is registered with this client, nothing is done.
//   - If it is still running and liveRestore is set, it is re-attached via
//     restore. Errors are logged, not returned.
//   - Otherwise it is registered and stopped. SIGTERM is sent; SIGKILL follows
//     after 10s without an exit notification. If 2s more pass without one,
//     the container is removed and reported as exited with code 255.
func (clnt *client) Restore(containerID string, options ...CreateOption) error {
	// Synchronize with live events
	clnt.remote.Lock()
	defer clnt.remote.Unlock()
	// Check that containerd still knows this container.
	//
	// In the unlikely event that Restore for this container processes
	// its past event before the main loop, the event will be
	// processed twice. However, this is not an issue as all those
	// events will do is change the state of the container to be
	// exactly the same.
	cont, err := clnt.getContainerdContainer(containerID)
	// Get its last event
	ev, eerr := clnt.getContainerLastEvent(containerID)
	if err != nil || cont.Status == "Stopped" {
		if err != nil && !strings.Contains(err.Error(), "container not found") {
			// Legitimate error
			return err
		}

		if ev == nil {
			if _, err := clnt.getContainer(containerID); err == nil {
				// If ev is nil and the container is running in containerd,
				// we already consumed all the event of the
				// container, included the "exit" one.
				// Thus we return to avoid overriding the Exit Code.
				logrus.Warnf("libcontainerd: restore was called on a fully synced container (%s)", containerID)
				return nil
			}
			// the container is not running so we need to fix the state within docker
			ev = &containerd.Event{
				Type:   StateExit,
				Status: 1,
			}
		}

		// get the exit status for this container
		// NOTE(review): when reading the event backlog failed (eerr != nil),
		// the reported exit code is 0 rather than the synthetic status 1 set
		// above. Confirm this is intended.
		ec := uint32(0)
		if eerr == nil && ev.Type == StateExit {
			ec = ev.Status
		}
		// Restore still reports success here, but the error from updating the
		// backend state is logged instead of being silently discarded.
		if err := clnt.setExited(containerID, ec); err != nil {
			logrus.Errorf("libcontainerd: error setting exit state for %s: %v", containerID, err)
		}
		return nil
	}

	// container is still alive
	if clnt.liveRestore {
		if err := clnt.restore(cont, ev, options...); err != nil {
			logrus.Errorf("libcontainerd: error restoring %s: %v", containerID, err)
		}
		return nil
	}

	// Kill the container if liveRestore == false
	w := clnt.getOrCreateExitNotifier(containerID)
	clnt.lock(cont.Id)
	container := clnt.newContainer(cont.BundlePath)
	container.systemPid = systemPid(cont)
	clnt.appendContainer(container)
	clnt.unlock(cont.Id)

	container.discardFifos()

	if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
		logrus.Errorf("libcontainerd: error sending sigterm to %v: %v", containerID, err)
	}
	// Let the main loop handle the exit event
	clnt.remote.Unlock()
	select {
	case <-time.After(10 * time.Second):
		if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
			logrus.Errorf("libcontainerd: error sending sigkill to %v: %v", containerID, err)
		}
		select {
		case <-time.After(2 * time.Second):
		case <-w.wait():
			// relock because of the defer
			clnt.remote.Lock()
			return nil
		}
	case <-w.wait():
		// relock because of the defer
		clnt.remote.Lock()
		return nil
	}

	// relock because of the defer
	clnt.remote.Lock()

	clnt.deleteContainer(containerID)
	return clnt.setExited(containerID, uint32(255))
}
  510. type exitNotifier struct {
  511. id string
  512. client *client
  513. c chan struct{}
  514. once sync.Once
  515. }
  516. func (en *exitNotifier) close() {
  517. en.once.Do(func() {
  518. close(en.c)
  519. en.client.mapMutex.Lock()
  520. if en == en.client.exitNotifiers[en.id] {
  521. delete(en.client.exitNotifiers, en.id)
  522. }
  523. en.client.mapMutex.Unlock()
  524. })
  525. }
  526. func (en *exitNotifier) wait() <-chan struct{} {
  527. return en.c
  528. }