// remote_daemon.go (8.8 KB)

  1. package supervisor // import "github.com/docker/docker/libcontainerd/supervisor"
  2. import (
  3. "context"
  4. "io"
  5. "os"
  6. "os/exec"
  7. "path/filepath"
  8. "runtime"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/containerd/containerd"
  13. "github.com/containerd/containerd/services/server/config"
  14. "github.com/containerd/containerd/sys"
  15. "github.com/docker/docker/pkg/system"
  16. "github.com/pelletier/go-toml"
  17. "github.com/pkg/errors"
  18. "github.com/sirupsen/logrus"
  19. )
  // Timing and retry limits used while supervising the containerd daemon.
  const (
  	// startupTimeout bounds how long Start waits for the daemon to report
  	// that it is up.
  	startupTimeout = 15 * time.Second

  	// shutdownTimeout is not referenced in this part of the file; presumably
  	// it bounds the platform-specific daemon shutdown (confirm against the
  	// stopDaemon implementation).
  	shutdownTimeout = 15 * time.Second

  	// healthCheckTimeout is the deadline applied to every IsServing probe
  	// issued by the monitor loop.
  	healthCheckTimeout = 3 * time.Second

  	// maxConnectionRetryCount is the number of consecutive failed probes after
  	// which a daemon that is no longer alive gets restarted.
  	maxConnectionRetryCount = 3
  )

  // Names of the daemon binary and of the files kept in the state directory.
  const (
  	// binaryName is the executable launched to start the daemon.
  	binaryName = "containerd"

  	// configFile is the base name of the generated containerd configuration.
  	configFile = "containerd.toml"

  	// pidFile is the base name of the file recording the daemon's PID.
  	pidFile = "containerd.pid"
  )
  29. type remote struct {
  30. config.Config
  31. // configFile is the location where the generated containerd configuration
  32. // file is saved.
  33. configFile string
  34. daemonPid int
  35. pidFile string
  36. logger *logrus.Entry
  37. daemonWaitCh chan struct{}
  38. daemonStartCh chan error
  39. daemonStopCh chan struct{}
  40. stateDir string
  41. // oomScore adjusts the OOM score for the containerd process.
  42. oomScore int
  43. // logLevel overrides the containerd logging-level through the --log-level
  44. // command-line option.
  45. logLevel string
  46. }
  // Daemon represents a running containerd daemon.
  type Daemon interface {
  	// WaitTimeout blocks until the daemon has stopped, returning an error if
  	// that does not happen within the given duration.
  	WaitTimeout(timeout time.Duration) error

  	// Address returns the address on which the daemon can be reached.
  	Address() string
  }
  52. // DaemonOpt allows to configure parameters of container daemons
  53. type DaemonOpt func(c *remote) error
  54. // Start starts a containerd daemon and monitors it
  55. func Start(ctx context.Context, rootDir, stateDir string, opts ...DaemonOpt) (Daemon, error) {
  56. r := &remote{
  57. stateDir: stateDir,
  58. Config: config.Config{
  59. Version: 2,
  60. Root: filepath.Join(rootDir, "daemon"),
  61. State: filepath.Join(stateDir, "daemon"),
  62. },
  63. configFile: filepath.Join(stateDir, configFile),
  64. daemonPid: -1,
  65. pidFile: filepath.Join(stateDir, pidFile),
  66. logger: logrus.WithField("module", "libcontainerd"),
  67. daemonStartCh: make(chan error, 1),
  68. daemonStopCh: make(chan struct{}),
  69. }
  70. for _, opt := range opts {
  71. if err := opt(r); err != nil {
  72. return nil, err
  73. }
  74. }
  75. r.setDefaults()
  76. if err := system.MkdirAll(stateDir, 0700); err != nil {
  77. return nil, err
  78. }
  79. go r.monitorDaemon(ctx)
  80. timeout := time.NewTimer(startupTimeout)
  81. defer timeout.Stop()
  82. select {
  83. case <-timeout.C:
  84. return nil, errors.New("timeout waiting for containerd to start")
  85. case err := <-r.daemonStartCh:
  86. if err != nil {
  87. return nil, err
  88. }
  89. }
  90. return r, nil
  91. }
  92. func (r *remote) WaitTimeout(d time.Duration) error {
  93. timeout := time.NewTimer(d)
  94. defer timeout.Stop()
  95. select {
  96. case <-timeout.C:
  97. return errors.New("timeout waiting for containerd to stop")
  98. case <-r.daemonStopCh:
  99. }
  100. return nil
  101. }
  102. func (r *remote) Address() string {
  103. return r.GRPC.Address
  104. }
  105. func (r *remote) getContainerdPid() (int, error) {
  106. f, err := os.OpenFile(r.pidFile, os.O_RDWR, 0600)
  107. if err != nil {
  108. if os.IsNotExist(err) {
  109. return -1, nil
  110. }
  111. return -1, err
  112. }
  113. defer f.Close()
  114. b := make([]byte, 8)
  115. n, err := f.Read(b)
  116. if err != nil && err != io.EOF {
  117. return -1, err
  118. }
  119. if n > 0 {
  120. pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
  121. if err != nil {
  122. return -1, err
  123. }
  124. if system.IsProcessAlive(int(pid)) {
  125. return int(pid), nil
  126. }
  127. }
  128. return -1, nil
  129. }
  130. func (r *remote) getContainerdConfig() (string, error) {
  131. f, err := os.OpenFile(r.configFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
  132. if err != nil {
  133. return "", errors.Wrapf(err, "failed to open containerd config file (%s)", r.configFile)
  134. }
  135. defer f.Close()
  136. if err := toml.NewEncoder(f).Encode(r); err != nil {
  137. return "", errors.Wrapf(err, "failed to write containerd config file (%s)", r.configFile)
  138. }
  139. return r.configFile, nil
  140. }
  141. func (r *remote) startContainerd() error {
  142. pid, err := r.getContainerdPid()
  143. if err != nil {
  144. return err
  145. }
  146. if pid != -1 {
  147. r.daemonPid = pid
  148. r.logger.WithField("pid", pid).Infof("%s is still running", binaryName)
  149. return nil
  150. }
  151. configFile, err := r.getContainerdConfig()
  152. if err != nil {
  153. return err
  154. }
  155. args := []string{"--config", configFile}
  156. if r.logLevel != "" {
  157. args = append(args, "--log-level", r.logLevel)
  158. }
  159. cmd := exec.Command(binaryName, args...)
  160. // redirect containerd logs to docker logs
  161. cmd.Stdout = os.Stdout
  162. cmd.Stderr = os.Stderr
  163. cmd.SysProcAttr = containerdSysProcAttr()
  164. // clear the NOTIFY_SOCKET from the env when starting containerd
  165. cmd.Env = nil
  166. for _, e := range os.Environ() {
  167. if !strings.HasPrefix(e, "NOTIFY_SOCKET") {
  168. cmd.Env = append(cmd.Env, e)
  169. }
  170. }
  171. startedCh := make(chan error)
  172. go func() {
  173. // On Linux, when cmd.SysProcAttr.Pdeathsig is set,
  174. // the signal is sent to the subprocess when the creating thread
  175. // terminates. The runtime terminates a thread if a goroutine
  176. // exits while locked to it. Prevent the containerd process
  177. // from getting killed prematurely by ensuring that the thread
  178. // used to start it remains alive until it or the daemon process
  179. // exits. See https://go.dev/issue/27505 for more details.
  180. runtime.LockOSThread()
  181. defer runtime.UnlockOSThread()
  182. err := cmd.Start()
  183. startedCh <- err
  184. if err != nil {
  185. return
  186. }
  187. r.daemonWaitCh = make(chan struct{})
  188. // Reap our child when needed
  189. if err := cmd.Wait(); err != nil {
  190. r.logger.WithError(err).Errorf("containerd did not exit successfully")
  191. }
  192. close(r.daemonWaitCh)
  193. }()
  194. if err := <-startedCh; err != nil {
  195. return err
  196. }
  197. r.daemonPid = cmd.Process.Pid
  198. if err := r.adjustOOMScore(); err != nil {
  199. r.logger.WithError(err).Warn("failed to adjust OOM score")
  200. }
  201. err = os.WriteFile(r.pidFile, []byte(strconv.Itoa(r.daemonPid)), 0660)
  202. if err != nil {
  203. system.KillProcess(r.daemonPid)
  204. return errors.Wrap(err, "libcontainerd: failed to save daemon pid to disk")
  205. }
  206. r.logger.WithField("pid", r.daemonPid).WithField("address", r.Address()).Infof("started new %s process", binaryName)
  207. return nil
  208. }
  209. func (r *remote) adjustOOMScore() error {
  210. if r.oomScore == 0 || r.daemonPid <= 1 {
  211. // no score configured, or daemonPid contains an invalid PID (we don't
  212. // expect containerd to be running as PID 1 :)).
  213. return nil
  214. }
  215. if err := sys.SetOOMScore(r.daemonPid, r.oomScore); err != nil {
  216. return errors.Wrap(err, "failed to adjust OOM score for containerd process")
  217. }
  218. return nil
  219. }
  220. func (r *remote) monitorDaemon(ctx context.Context) {
  221. var (
  222. transientFailureCount = 0
  223. client *containerd.Client
  224. err error
  225. delay time.Duration
  226. timer = time.NewTimer(0)
  227. started bool
  228. )
  229. defer func() {
  230. if r.daemonPid != -1 {
  231. r.stopDaemon()
  232. }
  233. // cleanup some files
  234. _ = os.Remove(r.pidFile)
  235. r.platformCleanup()
  236. close(r.daemonStopCh)
  237. timer.Stop()
  238. }()
  239. // ensure no races on sending to timer.C even though there is a 0 duration.
  240. if !timer.Stop() {
  241. <-timer.C
  242. }
  243. for {
  244. timer.Reset(delay)
  245. select {
  246. case <-ctx.Done():
  247. r.logger.Info("stopping healthcheck following graceful shutdown")
  248. if client != nil {
  249. client.Close()
  250. }
  251. return
  252. case <-timer.C:
  253. }
  254. if r.daemonPid == -1 {
  255. if r.daemonWaitCh != nil {
  256. select {
  257. case <-ctx.Done():
  258. r.logger.Info("stopping containerd startup following graceful shutdown")
  259. return
  260. case <-r.daemonWaitCh:
  261. }
  262. }
  263. os.RemoveAll(r.GRPC.Address)
  264. if err := r.startContainerd(); err != nil {
  265. if !started {
  266. r.daemonStartCh <- err
  267. return
  268. }
  269. r.logger.WithError(err).Error("failed restarting containerd")
  270. delay = 50 * time.Millisecond
  271. continue
  272. }
  273. client, err = containerd.New(r.GRPC.Address, containerd.WithTimeout(60*time.Second))
  274. if err != nil {
  275. r.logger.WithError(err).Error("failed connecting to containerd")
  276. delay = 100 * time.Millisecond
  277. continue
  278. }
  279. r.logger.WithField("address", r.GRPC.Address).Debug("created containerd monitoring client")
  280. }
  281. if client != nil {
  282. tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
  283. _, err := client.IsServing(tctx)
  284. cancel()
  285. if err == nil {
  286. if !started {
  287. close(r.daemonStartCh)
  288. started = true
  289. }
  290. transientFailureCount = 0
  291. select {
  292. case <-r.daemonWaitCh:
  293. case <-ctx.Done():
  294. }
  295. // Set a small delay in case there is a recurring failure (or bug in this code)
  296. // to ensure we don't end up in a super tight loop.
  297. delay = 500 * time.Millisecond
  298. continue
  299. }
  300. r.logger.WithError(err).WithField("binary", binaryName).Debug("daemon is not responding")
  301. transientFailureCount++
  302. if transientFailureCount < maxConnectionRetryCount || system.IsProcessAlive(r.daemonPid) {
  303. delay = time.Duration(transientFailureCount) * 200 * time.Millisecond
  304. continue
  305. }
  306. client.Close()
  307. client = nil
  308. }
  309. if system.IsProcessAlive(r.daemonPid) {
  310. r.logger.WithField("pid", r.daemonPid).Info("killing and restarting containerd")
  311. r.killDaemon()
  312. }
  313. r.daemonPid = -1
  314. delay = 0
  315. transientFailureCount = 0
  316. }
  317. }