client_linux.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. package libcontainerd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "sync"
  9. "syscall"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. containerd "github.com/docker/containerd/api/grpc/types"
  13. "github.com/docker/docker/pkg/idtools"
  14. "github.com/docker/docker/pkg/mount"
  15. specs "github.com/opencontainers/specs/specs-go"
  16. "golang.org/x/net/context"
  17. )
  18. type client struct {
  19. clientCommon
  20. // Platform specific properties below here.
  21. remote *remote
  22. q queue
  23. exitNotifiers map[string]*exitNotifier
  24. liveRestore bool
  25. }
  26. func (clnt *client) AddProcess(containerID, processFriendlyName string, specp Process) error {
  27. clnt.lock(containerID)
  28. defer clnt.unlock(containerID)
  29. container, err := clnt.getContainer(containerID)
  30. if err != nil {
  31. return err
  32. }
  33. spec, err := container.spec()
  34. if err != nil {
  35. return err
  36. }
  37. sp := spec.Process
  38. sp.Args = specp.Args
  39. sp.Terminal = specp.Terminal
  40. if specp.Env != nil {
  41. sp.Env = specp.Env
  42. }
  43. if specp.Cwd != nil {
  44. sp.Cwd = *specp.Cwd
  45. }
  46. if specp.User != nil {
  47. sp.User = specs.User{
  48. UID: specp.User.UID,
  49. GID: specp.User.GID,
  50. AdditionalGids: specp.User.AdditionalGids,
  51. }
  52. }
  53. if specp.Capabilities != nil {
  54. sp.Capabilities = specp.Capabilities
  55. }
  56. p := container.newProcess(processFriendlyName)
  57. r := &containerd.AddProcessRequest{
  58. Args: sp.Args,
  59. Cwd: sp.Cwd,
  60. Terminal: sp.Terminal,
  61. Id: containerID,
  62. Env: sp.Env,
  63. User: &containerd.User{
  64. Uid: sp.User.UID,
  65. Gid: sp.User.GID,
  66. AdditionalGids: sp.User.AdditionalGids,
  67. },
  68. Pid: processFriendlyName,
  69. Stdin: p.fifo(syscall.Stdin),
  70. Stdout: p.fifo(syscall.Stdout),
  71. Stderr: p.fifo(syscall.Stderr),
  72. Capabilities: sp.Capabilities,
  73. ApparmorProfile: sp.ApparmorProfile,
  74. SelinuxLabel: sp.SelinuxLabel,
  75. NoNewPrivileges: sp.NoNewPrivileges,
  76. Rlimits: convertRlimits(sp.Rlimits),
  77. }
  78. iopipe, err := p.openFifos(sp.Terminal)
  79. if err != nil {
  80. return err
  81. }
  82. if _, err := clnt.remote.apiClient.AddProcess(context.Background(), r); err != nil {
  83. p.closeFifos(iopipe)
  84. return err
  85. }
  86. container.processes[processFriendlyName] = p
  87. clnt.unlock(containerID)
  88. if err := clnt.backend.AttachStreams(processFriendlyName, *iopipe); err != nil {
  89. return err
  90. }
  91. clnt.lock(containerID)
  92. return nil
  93. }
  94. func (clnt *client) prepareBundleDir(uid, gid int) (string, error) {
  95. root, err := filepath.Abs(clnt.remote.stateDir)
  96. if err != nil {
  97. return "", err
  98. }
  99. if uid == 0 && gid == 0 {
  100. return root, nil
  101. }
  102. p := string(filepath.Separator)
  103. for _, d := range strings.Split(root, string(filepath.Separator))[1:] {
  104. p = filepath.Join(p, d)
  105. fi, err := os.Stat(p)
  106. if err != nil && !os.IsNotExist(err) {
  107. return "", err
  108. }
  109. if os.IsNotExist(err) || fi.Mode()&1 == 0 {
  110. p = fmt.Sprintf("%s.%d.%d", p, uid, gid)
  111. if err := idtools.MkdirAs(p, 0700, uid, gid); err != nil && !os.IsExist(err) {
  112. return "", err
  113. }
  114. }
  115. }
  116. return p, nil
  117. }
  118. func (clnt *client) Create(containerID string, spec Spec, options ...CreateOption) (err error) {
  119. clnt.lock(containerID)
  120. defer clnt.unlock(containerID)
  121. if ctr, err := clnt.getContainer(containerID); err == nil {
  122. if ctr.restarting {
  123. ctr.restartManager.Cancel()
  124. ctr.clean()
  125. } else {
  126. return fmt.Errorf("Container %s is already active", containerID)
  127. }
  128. }
  129. uid, gid, err := getRootIDs(specs.Spec(spec))
  130. if err != nil {
  131. return err
  132. }
  133. dir, err := clnt.prepareBundleDir(uid, gid)
  134. if err != nil {
  135. return err
  136. }
  137. container := clnt.newContainer(filepath.Join(dir, containerID), options...)
  138. if err := container.clean(); err != nil {
  139. return err
  140. }
  141. defer func() {
  142. if err != nil {
  143. container.clean()
  144. clnt.deleteContainer(containerID)
  145. }
  146. }()
  147. if err := idtools.MkdirAllAs(container.dir, 0700, uid, gid); err != nil && !os.IsExist(err) {
  148. return err
  149. }
  150. f, err := os.Create(filepath.Join(container.dir, configFilename))
  151. if err != nil {
  152. return err
  153. }
  154. defer f.Close()
  155. if err := json.NewEncoder(f).Encode(spec); err != nil {
  156. return err
  157. }
  158. return container.start()
  159. }
  160. func (clnt *client) Signal(containerID string, sig int) error {
  161. clnt.lock(containerID)
  162. defer clnt.unlock(containerID)
  163. _, err := clnt.remote.apiClient.Signal(context.Background(), &containerd.SignalRequest{
  164. Id: containerID,
  165. Pid: InitFriendlyName,
  166. Signal: uint32(sig),
  167. })
  168. return err
  169. }
  170. func (clnt *client) Resize(containerID, processFriendlyName string, width, height int) error {
  171. clnt.lock(containerID)
  172. defer clnt.unlock(containerID)
  173. if _, err := clnt.getContainer(containerID); err != nil {
  174. return err
  175. }
  176. _, err := clnt.remote.apiClient.UpdateProcess(context.Background(), &containerd.UpdateProcessRequest{
  177. Id: containerID,
  178. Pid: processFriendlyName,
  179. Width: uint32(width),
  180. Height: uint32(height),
  181. })
  182. return err
  183. }
  184. func (clnt *client) Pause(containerID string) error {
  185. return clnt.setState(containerID, StatePause)
  186. }
  187. func (clnt *client) setState(containerID, state string) error {
  188. clnt.lock(containerID)
  189. container, err := clnt.getContainer(containerID)
  190. if err != nil {
  191. clnt.unlock(containerID)
  192. return err
  193. }
  194. if container.systemPid == 0 {
  195. clnt.unlock(containerID)
  196. return fmt.Errorf("No active process for container %s", containerID)
  197. }
  198. st := "running"
  199. if state == StatePause {
  200. st = "paused"
  201. }
  202. chstate := make(chan struct{})
  203. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  204. Id: containerID,
  205. Pid: InitFriendlyName,
  206. Status: st,
  207. })
  208. if err != nil {
  209. clnt.unlock(containerID)
  210. return err
  211. }
  212. container.pauseMonitor.append(state, chstate)
  213. clnt.unlock(containerID)
  214. <-chstate
  215. return nil
  216. }
  217. func (clnt *client) Resume(containerID string) error {
  218. return clnt.setState(containerID, StateResume)
  219. }
  220. func (clnt *client) Stats(containerID string) (*Stats, error) {
  221. resp, err := clnt.remote.apiClient.Stats(context.Background(), &containerd.StatsRequest{containerID})
  222. if err != nil {
  223. return nil, err
  224. }
  225. return (*Stats)(resp), nil
  226. }
  227. // Take care of the old 1.11.0 behavior in case the version upgrade
  228. // happenned without a clean daemon shutdown
  229. func (clnt *client) cleanupOldRootfs(containerID string) {
  230. // Unmount and delete the bundle folder
  231. if mts, err := mount.GetMounts(); err == nil {
  232. for _, mts := range mts {
  233. if strings.HasSuffix(mts.Mountpoint, containerID+"/rootfs") {
  234. if err := syscall.Unmount(mts.Mountpoint, syscall.MNT_DETACH); err == nil {
  235. os.RemoveAll(strings.TrimSuffix(mts.Mountpoint, "/rootfs"))
  236. }
  237. break
  238. }
  239. }
  240. }
  241. }
  242. func (clnt *client) setExited(containerID string) error {
  243. clnt.lock(containerID)
  244. defer clnt.unlock(containerID)
  245. var exitCode uint32
  246. if event, ok := clnt.remote.pastEvents[containerID]; ok {
  247. exitCode = event.Status
  248. delete(clnt.remote.pastEvents, containerID)
  249. }
  250. err := clnt.backend.StateChanged(containerID, StateInfo{
  251. CommonStateInfo: CommonStateInfo{
  252. State: StateExit,
  253. ExitCode: exitCode,
  254. }})
  255. clnt.cleanupOldRootfs(containerID)
  256. return err
  257. }
  258. func (clnt *client) GetPidsForContainer(containerID string) ([]int, error) {
  259. cont, err := clnt.getContainerdContainer(containerID)
  260. if err != nil {
  261. return nil, err
  262. }
  263. pids := make([]int, len(cont.Pids))
  264. for i, p := range cont.Pids {
  265. pids[i] = int(p)
  266. }
  267. return pids, nil
  268. }
  269. // Summary returns a summary of the processes running in a container.
  270. // This is a no-op on Linux.
  271. func (clnt *client) Summary(containerID string) ([]Summary, error) {
  272. return nil, nil
  273. }
  274. func (clnt *client) getContainerdContainer(containerID string) (*containerd.Container, error) {
  275. resp, err := clnt.remote.apiClient.State(context.Background(), &containerd.StateRequest{Id: containerID})
  276. if err != nil {
  277. return nil, err
  278. }
  279. for _, cont := range resp.Containers {
  280. if cont.Id == containerID {
  281. return cont, nil
  282. }
  283. }
  284. return nil, fmt.Errorf("invalid state response")
  285. }
  286. func (clnt *client) newContainer(dir string, options ...CreateOption) *container {
  287. container := &container{
  288. containerCommon: containerCommon{
  289. process: process{
  290. dir: dir,
  291. processCommon: processCommon{
  292. containerID: filepath.Base(dir),
  293. client: clnt,
  294. friendlyName: InitFriendlyName,
  295. },
  296. },
  297. processes: make(map[string]*process),
  298. },
  299. }
  300. for _, option := range options {
  301. if err := option.Apply(container); err != nil {
  302. logrus.Error(err)
  303. }
  304. }
  305. return container
  306. }
  307. func (clnt *client) UpdateResources(containerID string, resources Resources) error {
  308. clnt.lock(containerID)
  309. defer clnt.unlock(containerID)
  310. container, err := clnt.getContainer(containerID)
  311. if err != nil {
  312. return err
  313. }
  314. if container.systemPid == 0 {
  315. return fmt.Errorf("No active process for container %s", containerID)
  316. }
  317. _, err = clnt.remote.apiClient.UpdateContainer(context.Background(), &containerd.UpdateContainerRequest{
  318. Id: containerID,
  319. Pid: InitFriendlyName,
  320. Resources: (*containerd.UpdateResource)(&resources),
  321. })
  322. if err != nil {
  323. return err
  324. }
  325. return nil
  326. }
  327. func (clnt *client) getExitNotifier(containerID string) *exitNotifier {
  328. clnt.mapMutex.RLock()
  329. defer clnt.mapMutex.RUnlock()
  330. return clnt.exitNotifiers[containerID]
  331. }
  332. func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier {
  333. clnt.mapMutex.Lock()
  334. w, ok := clnt.exitNotifiers[containerID]
  335. defer clnt.mapMutex.Unlock()
  336. if !ok {
  337. w = &exitNotifier{c: make(chan struct{}), client: clnt}
  338. clnt.exitNotifiers[containerID] = w
  339. }
  340. return w
  341. }
  342. func (clnt *client) restore(cont *containerd.Container, options ...CreateOption) (err error) {
  343. clnt.lock(cont.Id)
  344. defer clnt.unlock(cont.Id)
  345. logrus.Debugf("restore container %s state %s", cont.Id, cont.Status)
  346. containerID := cont.Id
  347. if _, err := clnt.getContainer(containerID); err == nil {
  348. return fmt.Errorf("container %s is already active", containerID)
  349. }
  350. defer func() {
  351. if err != nil {
  352. clnt.deleteContainer(cont.Id)
  353. }
  354. }()
  355. container := clnt.newContainer(cont.BundlePath, options...)
  356. container.systemPid = systemPid(cont)
  357. var terminal bool
  358. for _, p := range cont.Processes {
  359. if p.Pid == InitFriendlyName {
  360. terminal = p.Terminal
  361. }
  362. }
  363. iopipe, err := container.openFifos(terminal)
  364. if err != nil {
  365. return err
  366. }
  367. if err := clnt.backend.AttachStreams(containerID, *iopipe); err != nil {
  368. return err
  369. }
  370. clnt.appendContainer(container)
  371. err = clnt.backend.StateChanged(containerID, StateInfo{
  372. CommonStateInfo: CommonStateInfo{
  373. State: StateRestore,
  374. Pid: container.systemPid,
  375. }})
  376. if err != nil {
  377. return err
  378. }
  379. if event, ok := clnt.remote.pastEvents[containerID]; ok {
  380. // This should only be a pause or resume event
  381. if event.Type == StatePause || event.Type == StateResume {
  382. return clnt.backend.StateChanged(containerID, StateInfo{
  383. CommonStateInfo: CommonStateInfo{
  384. State: event.Type,
  385. Pid: container.systemPid,
  386. }})
  387. }
  388. logrus.Warnf("unexpected backlog event: %#v", event)
  389. }
  390. return nil
  391. }
  392. func (clnt *client) Restore(containerID string, options ...CreateOption) error {
  393. if clnt.liveRestore {
  394. cont, err := clnt.getContainerdContainer(containerID)
  395. if err == nil && cont.Status != "stopped" {
  396. if err := clnt.restore(cont, options...); err != nil {
  397. logrus.Errorf("error restoring %s: %v", containerID, err)
  398. }
  399. return nil
  400. }
  401. return clnt.setExited(containerID)
  402. }
  403. cont, err := clnt.getContainerdContainer(containerID)
  404. if err == nil && cont.Status != "stopped" {
  405. w := clnt.getOrCreateExitNotifier(containerID)
  406. clnt.lock(cont.Id)
  407. container := clnt.newContainer(cont.BundlePath)
  408. container.systemPid = systemPid(cont)
  409. clnt.appendContainer(container)
  410. clnt.unlock(cont.Id)
  411. container.discardFifos()
  412. if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil {
  413. logrus.Errorf("error sending sigterm to %v: %v", containerID, err)
  414. }
  415. select {
  416. case <-time.After(10 * time.Second):
  417. if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil {
  418. logrus.Errorf("error sending sigkill to %v: %v", containerID, err)
  419. }
  420. select {
  421. case <-time.After(2 * time.Second):
  422. case <-w.wait():
  423. return nil
  424. }
  425. case <-w.wait():
  426. return nil
  427. }
  428. }
  429. clnt.deleteContainer(containerID)
  430. return clnt.setExited(containerID)
  431. }
  432. type exitNotifier struct {
  433. id string
  434. client *client
  435. c chan struct{}
  436. once sync.Once
  437. }
  438. func (en *exitNotifier) close() {
  439. en.once.Do(func() {
  440. close(en.c)
  441. en.client.mapMutex.Lock()
  442. if en == en.client.exitNotifiers[en.id] {
  443. delete(en.client.exitNotifiers, en.id)
  444. }
  445. en.client.mapMutex.Unlock()
  446. })
  447. }
  448. func (en *exitNotifier) wait() <-chan struct{} {
  449. return en.c
  450. }