client.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. // +build !windows
  2. /*
  3. Copyright The containerd Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package client
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "net"
  21. "os"
  22. "os/exec"
  23. "path/filepath"
  24. "strconv"
  25. "strings"
  26. "sync"
  27. "syscall"
  28. "time"
  29. "golang.org/x/sys/unix"
  30. "github.com/containerd/ttrpc"
  31. "github.com/pkg/errors"
  32. "github.com/sirupsen/logrus"
  33. "github.com/containerd/containerd/events"
  34. "github.com/containerd/containerd/log"
  35. "github.com/containerd/containerd/pkg/dialer"
  36. v1 "github.com/containerd/containerd/runtime/v1"
  37. "github.com/containerd/containerd/runtime/v1/shim"
  38. shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
  39. "github.com/containerd/containerd/sys"
  40. ptypes "github.com/gogo/protobuf/types"
  41. )
  42. var empty = &ptypes.Empty{}
// Opt is an option for a shim client configuration.
//
// An Opt is called by New with the caller's context and the shim
// configuration. It returns the shim service to talk to, an io.Closer for the
// underlying connection (WithLocal returns a nil closer), and an error.
type Opt func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error)
  45. // WithStart executes a new shim process
  46. func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHandler func()) Opt {
  47. return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) {
  48. socket, err := newSocket(address)
  49. if err != nil {
  50. if !eaddrinuse(err) {
  51. return nil, nil, err
  52. }
  53. if err := RemoveSocket(address); err != nil {
  54. return nil, nil, errors.Wrap(err, "remove already used socket")
  55. }
  56. if socket, err = newSocket(address); err != nil {
  57. return nil, nil, err
  58. }
  59. }
  60. f, err := socket.File()
  61. if err != nil {
  62. return nil, nil, errors.Wrapf(err, "failed to get fd for socket %s", address)
  63. }
  64. defer f.Close()
  65. stdoutCopy := ioutil.Discard
  66. stderrCopy := ioutil.Discard
  67. stdoutLog, err := v1.OpenShimStdoutLog(ctx, config.WorkDir)
  68. if err != nil {
  69. return nil, nil, errors.Wrapf(err, "failed to create stdout log")
  70. }
  71. stderrLog, err := v1.OpenShimStderrLog(ctx, config.WorkDir)
  72. if err != nil {
  73. return nil, nil, errors.Wrapf(err, "failed to create stderr log")
  74. }
  75. if debug {
  76. stdoutCopy = os.Stdout
  77. stderrCopy = os.Stderr
  78. }
  79. go io.Copy(stdoutCopy, stdoutLog)
  80. go io.Copy(stderrCopy, stderrLog)
  81. cmd, err := newCommand(binary, daemonAddress, debug, config, f, stdoutLog, stderrLog)
  82. if err != nil {
  83. return nil, nil, err
  84. }
  85. if err := cmd.Start(); err != nil {
  86. return nil, nil, errors.Wrapf(err, "failed to start shim")
  87. }
  88. defer func() {
  89. if err != nil {
  90. cmd.Process.Kill()
  91. }
  92. }()
  93. go func() {
  94. cmd.Wait()
  95. exitHandler()
  96. if stdoutLog != nil {
  97. stdoutLog.Close()
  98. }
  99. if stderrLog != nil {
  100. stderrLog.Close()
  101. }
  102. socket.Close()
  103. RemoveSocket(address)
  104. }()
  105. log.G(ctx).WithFields(logrus.Fields{
  106. "pid": cmd.Process.Pid,
  107. "address": address,
  108. "debug": debug,
  109. }).Infof("shim %s started", binary)
  110. if err := writeFile(filepath.Join(config.Path, "address"), address); err != nil {
  111. return nil, nil, err
  112. }
  113. if err := writeFile(filepath.Join(config.Path, "shim.pid"), strconv.Itoa(cmd.Process.Pid)); err != nil {
  114. return nil, nil, err
  115. }
  116. // set shim in cgroup if it is provided
  117. if cgroup != "" {
  118. if err := setCgroup(cgroup, cmd); err != nil {
  119. return nil, nil, err
  120. }
  121. log.G(ctx).WithFields(logrus.Fields{
  122. "pid": cmd.Process.Pid,
  123. "address": address,
  124. }).Infof("shim placed in cgroup %s", cgroup)
  125. }
  126. if err = setupOOMScore(cmd.Process.Pid); err != nil {
  127. return nil, nil, err
  128. }
  129. c, clo, err := WithConnect(address, func() {})(ctx, config)
  130. if err != nil {
  131. return nil, nil, errors.Wrap(err, "failed to connect")
  132. }
  133. return c, clo, nil
  134. }
  135. }
  136. func eaddrinuse(err error) bool {
  137. cause := errors.Cause(err)
  138. netErr, ok := cause.(*net.OpError)
  139. if !ok {
  140. return false
  141. }
  142. if netErr.Op != "listen" {
  143. return false
  144. }
  145. syscallErr, ok := netErr.Err.(*os.SyscallError)
  146. if !ok {
  147. return false
  148. }
  149. errno, ok := syscallErr.Err.(syscall.Errno)
  150. if !ok {
  151. return false
  152. }
  153. return errno == syscall.EADDRINUSE
  154. }
  155. // setupOOMScore gets containerd's oom score and adds +1 to it
  156. // to ensure a shim has a lower* score than the daemons
  157. func setupOOMScore(shimPid int) error {
  158. pid := os.Getpid()
  159. score, err := sys.GetOOMScoreAdj(pid)
  160. if err != nil {
  161. return errors.Wrap(err, "get daemon OOM score")
  162. }
  163. shimScore := score + 1
  164. if err := sys.SetOOMScore(shimPid, shimScore); err != nil {
  165. return errors.Wrap(err, "set shim OOM score")
  166. }
  167. return nil
  168. }
  169. func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File, stdout, stderr io.Writer) (*exec.Cmd, error) {
  170. selfExe, err := os.Executable()
  171. if err != nil {
  172. return nil, err
  173. }
  174. args := []string{
  175. "-namespace", config.Namespace,
  176. "-workdir", config.WorkDir,
  177. "-address", daemonAddress,
  178. "-containerd-binary", selfExe,
  179. }
  180. if config.Criu != "" {
  181. args = append(args, "-criu-path", config.Criu)
  182. }
  183. if config.RuntimeRoot != "" {
  184. args = append(args, "-runtime-root", config.RuntimeRoot)
  185. }
  186. if config.SystemdCgroup {
  187. args = append(args, "-systemd-cgroup")
  188. }
  189. if debug {
  190. args = append(args, "-debug")
  191. }
  192. cmd := exec.Command(binary, args...)
  193. cmd.Dir = config.Path
  194. // make sure the shim can be re-parented to system init
  195. // and is cloned in a new mount namespace because the overlay/filesystems
  196. // will be mounted by the shim
  197. cmd.SysProcAttr = getSysProcAttr()
  198. cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
  199. cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
  200. cmd.Stdout = stdout
  201. cmd.Stderr = stderr
  202. return cmd, nil
  203. }
  204. // writeFile writes a address file atomically
  205. func writeFile(path, address string) error {
  206. path, err := filepath.Abs(path)
  207. if err != nil {
  208. return err
  209. }
  210. tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
  211. f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
  212. if err != nil {
  213. return err
  214. }
  215. _, err = f.WriteString(address)
  216. f.Close()
  217. if err != nil {
  218. return err
  219. }
  220. return os.Rename(tempPath, path)
  221. }
  222. const (
  223. abstractSocketPrefix = "\x00"
  224. socketPathLimit = 106
  225. )
  226. type socket string
  227. func (s socket) isAbstract() bool {
  228. return !strings.HasPrefix(string(s), "unix://")
  229. }
  230. func (s socket) path() string {
  231. path := strings.TrimPrefix(string(s), "unix://")
  232. // if there was no trim performed, we assume an abstract socket
  233. if len(path) == len(s) {
  234. path = abstractSocketPrefix + path
  235. }
  236. return path
  237. }
  238. func newSocket(address string) (*net.UnixListener, error) {
  239. if len(address) > socketPathLimit {
  240. return nil, errors.Errorf("%q: unix socket path too long (> %d)", address, socketPathLimit)
  241. }
  242. var (
  243. sock = socket(address)
  244. path = sock.path()
  245. )
  246. if !sock.isAbstract() {
  247. if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
  248. return nil, errors.Wrapf(err, "%s", path)
  249. }
  250. }
  251. l, err := net.Listen("unix", path)
  252. if err != nil {
  253. return nil, errors.Wrapf(err, "failed to listen to unix socket %q (abstract: %t)", address, sock.isAbstract())
  254. }
  255. if err := os.Chmod(path, 0600); err != nil {
  256. l.Close()
  257. return nil, err
  258. }
  259. return l.(*net.UnixListener), nil
  260. }
  261. // RemoveSocket removes the socket at the specified address if
  262. // it exists on the filesystem
  263. func RemoveSocket(address string) error {
  264. sock := socket(address)
  265. if !sock.isAbstract() {
  266. return os.Remove(sock.path())
  267. }
  268. return nil
  269. }
  270. func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
  271. return d(address, 100*time.Second)
  272. }
  273. func anonDialer(address string, timeout time.Duration) (net.Conn, error) {
  274. return dialer.Dialer(socket(address).path(), timeout)
  275. }
  276. // WithConnect connects to an existing shim
  277. func WithConnect(address string, onClose func()) Opt {
  278. return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
  279. conn, err := connect(address, anonDialer)
  280. if err != nil {
  281. return nil, nil, err
  282. }
  283. client := ttrpc.NewClient(conn, ttrpc.WithOnClose(onClose))
  284. return shimapi.NewShimClient(client), conn, nil
  285. }
  286. }
  287. // WithLocal uses an in process shim
  288. func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error) {
  289. return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
  290. service, err := shim.NewService(config, publisher)
  291. if err != nil {
  292. return nil, nil, err
  293. }
  294. return shim.NewLocal(service), nil, nil
  295. }
  296. }
  297. // New returns a new shim client
  298. func New(ctx context.Context, config shim.Config, opt Opt) (*Client, error) {
  299. s, c, err := opt(ctx, config)
  300. if err != nil {
  301. return nil, err
  302. }
  303. return &Client{
  304. ShimService: s,
  305. c: c,
  306. exitCh: make(chan struct{}),
  307. }, nil
  308. }
// Client is a shim client containing the connection to a shim
type Client struct {
	// ShimService is the embedded shim API; its methods are promoted onto
	// Client.
	shimapi.ShimService
	// c closes the underlying connection. It may be nil, in which case Close
	// is a no-op.
	c io.Closer
	// exitCh and exitOnce hold exit-notification state for the shim process;
	// exitCh is allocated by New.
	exitCh   chan struct{}
	exitOnce sync.Once
}
  316. // IsAlive returns true if the shim can be contacted.
  317. // NOTE: a negative answer doesn't mean that the process is gone.
  318. func (c *Client) IsAlive(ctx context.Context) (bool, error) {
  319. _, err := c.ShimInfo(ctx, empty)
  320. if err != nil {
  321. // TODO(stevvooe): There are some error conditions that need to be
  322. // handle with unix sockets existence to give the right answer here.
  323. return false, err
  324. }
  325. return true, nil
  326. }
// StopShim signals the shim to exit and waits for the process to disappear.
// It delegates to signalShim with SIGTERM and returns that call's error.
func (c *Client) StopShim(ctx context.Context) error {
	return c.signalShim(ctx, unix.SIGTERM)
}
// KillShim kills the shim forcefully and waits for the process to disappear.
// It delegates to signalShim with SIGKILL and returns that call's error.
func (c *Client) KillShim(ctx context.Context) error {
	return c.signalShim(ctx, unix.SIGKILL)
}
  335. // Close the client connection
  336. func (c *Client) Close() error {
  337. if c.c == nil {
  338. return nil
  339. }
  340. return c.c.Close()
  341. }
  342. func (c *Client) signalShim(ctx context.Context, sig syscall.Signal) error {
  343. info, err := c.ShimInfo(ctx, empty)
  344. if err != nil {
  345. return err
  346. }
  347. pid := int(info.ShimPid)
  348. // make sure we don't kill ourselves if we are running a local shim
  349. if os.Getpid() == pid {
  350. return nil
  351. }
  352. if err := unix.Kill(pid, sig); err != nil && err != unix.ESRCH {
  353. return err
  354. }
  355. // wait for shim to die after being signaled
  356. select {
  357. case <-ctx.Done():
  358. return ctx.Err()
  359. case <-c.waitForExit(ctx, pid):
  360. return nil
  361. }
  362. }
  363. func (c *Client) waitForExit(ctx context.Context, pid int) <-chan struct{} {
  364. go c.exitOnce.Do(func() {
  365. defer close(c.exitCh)
  366. ticker := time.NewTicker(10 * time.Millisecond)
  367. defer ticker.Stop()
  368. for {
  369. // use kill(pid, 0) here because the shim could have been reparented
  370. // and we are no longer able to waitpid(pid, ...) on the shim
  371. if err := unix.Kill(pid, 0); err == unix.ESRCH {
  372. return
  373. }
  374. select {
  375. case <-ticker.C:
  376. case <-ctx.Done():
  377. log.G(ctx).WithField("pid", pid).Warn("timed out while waiting for shim to exit")
  378. return
  379. }
  380. }
  381. })
  382. return c.exitCh
  383. }