remote_linux.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. package libcontainerd
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "log"
  7. "net"
  8. "os"
  9. "os/exec"
  10. "path/filepath"
  11. "strconv"
  12. "sync"
  13. "syscall"
  14. "time"
  15. "github.com/Sirupsen/logrus"
  16. containerd "github.com/docker/containerd/api/grpc/types"
  17. sysinfo "github.com/docker/docker/pkg/system"
  18. "github.com/docker/docker/utils"
  19. "golang.org/x/net/context"
  20. "google.golang.org/grpc"
  21. "google.golang.org/grpc/grpclog"
  22. )
  // Tuning values for supervising the containerd daemon.
const (
	// Number of transient connection failures after which handleConnectionChange
	// restarts a containerd it manages, and the pause between counted failures.
	maxConnectionRetryCount = 3
	connectionRetryDelay    = 3 * time.Second

	// How long Cleanup waits after SIGTERM before sending SIGKILL.
	containerdShutdownTimeout = 15 * time.Second
)

// The containerd executable name, plus the names of the files this remote
// keeps inside its state directory.
const (
	containerdBinary       = "docker-containerd"
	containerdPidFilename  = "docker-containerd.pid"
	containerdSockFilename = "docker-containerd.sock"
	eventTimestampFilename = "event.ts"
)
  32. type remote struct {
  33. sync.RWMutex
  34. apiClient containerd.APIClient
  35. daemonPid int
  36. stateDir string
  37. rpcAddr string
  38. startDaemon bool
  39. debugLog bool
  40. rpcConn *grpc.ClientConn
  41. clients []*client
  42. eventTsPath string
  43. pastEvents map[string]*containerd.Event
  44. runtimeArgs []string
  45. }
  46. // New creates a fresh instance of libcontainerd remote.
  47. func New(stateDir string, options ...RemoteOption) (_ Remote, err error) {
  48. defer func() {
  49. if err != nil {
  50. err = fmt.Errorf("Failed to connect to containerd. Please make sure containerd is installed in your PATH or you have specificed the correct address. Got error: %v", err)
  51. }
  52. }()
  53. r := &remote{
  54. stateDir: stateDir,
  55. daemonPid: -1,
  56. eventTsPath: filepath.Join(stateDir, eventTimestampFilename),
  57. pastEvents: make(map[string]*containerd.Event),
  58. }
  59. for _, option := range options {
  60. if err := option.Apply(r); err != nil {
  61. return nil, err
  62. }
  63. }
  64. if err := sysinfo.MkdirAll(stateDir, 0700); err != nil {
  65. return nil, err
  66. }
  67. if r.rpcAddr == "" {
  68. r.rpcAddr = filepath.Join(stateDir, containerdSockFilename)
  69. }
  70. if r.startDaemon {
  71. if err := r.runContainerdDaemon(); err != nil {
  72. return nil, err
  73. }
  74. }
  75. // don't output the grpc reconnect logging
  76. grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
  77. dialOpts := append([]grpc.DialOption{grpc.WithInsecure()},
  78. grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
  79. return net.DialTimeout("unix", addr, timeout)
  80. }),
  81. )
  82. conn, err := grpc.Dial(r.rpcAddr, dialOpts...)
  83. if err != nil {
  84. return nil, fmt.Errorf("error connecting to containerd: %v", err)
  85. }
  86. r.rpcConn = conn
  87. r.apiClient = containerd.NewAPIClient(conn)
  88. go r.handleConnectionChange()
  89. if err := r.startEventsMonitor(); err != nil {
  90. return nil, err
  91. }
  92. return r, nil
  93. }
  94. func (r *remote) handleConnectionChange() {
  95. var transientFailureCount = 0
  96. state := grpc.Idle
  97. for {
  98. s, err := r.rpcConn.WaitForStateChange(context.Background(), state)
  99. if err != nil {
  100. break
  101. }
  102. state = s
  103. logrus.Debugf("containerd connection state change: %v", s)
  104. if r.daemonPid != -1 {
  105. switch state {
  106. case grpc.TransientFailure:
  107. // Reset state to be notified of next failure
  108. transientFailureCount++
  109. if transientFailureCount >= maxConnectionRetryCount {
  110. transientFailureCount = 0
  111. if utils.IsProcessAlive(r.daemonPid) {
  112. utils.KillProcess(r.daemonPid)
  113. }
  114. if err := r.runContainerdDaemon(); err != nil { //FIXME: Handle error
  115. logrus.Errorf("error restarting containerd: %v", err)
  116. }
  117. } else {
  118. state = grpc.Idle
  119. time.Sleep(connectionRetryDelay)
  120. }
  121. case grpc.Shutdown:
  122. // Well, we asked for it to stop, just return
  123. return
  124. }
  125. }
  126. }
  127. }
  128. func (r *remote) Cleanup() {
  129. if r.daemonPid == -1 {
  130. return
  131. }
  132. r.rpcConn.Close()
  133. // Ask the daemon to quit
  134. syscall.Kill(r.daemonPid, syscall.SIGTERM)
  135. // Wait up to 15secs for it to stop
  136. for i := time.Duration(0); i < containerdShutdownTimeout; i += time.Second {
  137. if !utils.IsProcessAlive(r.daemonPid) {
  138. break
  139. }
  140. time.Sleep(time.Second)
  141. }
  142. if utils.IsProcessAlive(r.daemonPid) {
  143. logrus.Warnf("libcontainerd: containerd (%d) didn't stop within 15 secs, killing it\n", r.daemonPid)
  144. syscall.Kill(r.daemonPid, syscall.SIGKILL)
  145. }
  146. // cleanup some files
  147. os.Remove(filepath.Join(r.stateDir, containerdPidFilename))
  148. os.Remove(filepath.Join(r.stateDir, containerdSockFilename))
  149. }
  150. func (r *remote) Client(b Backend) (Client, error) {
  151. c := &client{
  152. clientCommon: clientCommon{
  153. backend: b,
  154. containerMutexes: make(map[string]*sync.Mutex),
  155. containers: make(map[string]*container),
  156. },
  157. remote: r,
  158. exitNotifiers: make(map[string]*exitNotifier),
  159. }
  160. r.Lock()
  161. r.clients = append(r.clients, c)
  162. r.Unlock()
  163. return c, nil
  164. }
  165. func (r *remote) updateEventTimestamp(t time.Time) {
  166. f, err := os.OpenFile(r.eventTsPath, syscall.O_CREAT|syscall.O_WRONLY|syscall.O_TRUNC, 0600)
  167. defer f.Close()
  168. if err != nil {
  169. logrus.Warnf("libcontainerd: failed to open event timestamp file: %v", err)
  170. return
  171. }
  172. b, err := t.MarshalText()
  173. if err != nil {
  174. logrus.Warnf("libcontainerd: failed to encode timestamp: %v", err)
  175. return
  176. }
  177. n, err := f.Write(b)
  178. if err != nil || n != len(b) {
  179. logrus.Warnf("libcontainerd: failed to update event timestamp file: %v", err)
  180. f.Truncate(0)
  181. return
  182. }
  183. }
  184. func (r *remote) getLastEventTimestamp() int64 {
  185. t := time.Now()
  186. fi, err := os.Stat(r.eventTsPath)
  187. if os.IsNotExist(err) {
  188. return t.Unix()
  189. }
  190. f, err := os.Open(r.eventTsPath)
  191. defer f.Close()
  192. if err != nil {
  193. logrus.Warn("libcontainerd: Unable to access last event ts: %v", err)
  194. return t.Unix()
  195. }
  196. b := make([]byte, fi.Size())
  197. n, err := f.Read(b)
  198. if err != nil || n != len(b) {
  199. logrus.Warn("libcontainerd: Unable to read last event ts: %v", err)
  200. return t.Unix()
  201. }
  202. t.UnmarshalText(b)
  203. return t.Unix()
  204. }
  205. func (r *remote) startEventsMonitor() error {
  206. // First, get past events
  207. er := &containerd.EventsRequest{
  208. Timestamp: uint64(r.getLastEventTimestamp()),
  209. }
  210. events, err := r.apiClient.Events(context.Background(), er)
  211. if err != nil {
  212. return err
  213. }
  214. go r.handleEventStream(events)
  215. return nil
  216. }
  217. func (r *remote) handleEventStream(events containerd.API_EventsClient) {
  218. live := false
  219. for {
  220. e, err := events.Recv()
  221. if err != nil {
  222. logrus.Errorf("failed to receive event from containerd: %v", err)
  223. go r.startEventsMonitor()
  224. return
  225. }
  226. if live == false {
  227. logrus.Debugf("received past containerd event: %#v", e)
  228. // Pause/Resume events should never happens after exit one
  229. switch e.Type {
  230. case StateExit:
  231. r.pastEvents[e.Id] = e
  232. case StatePause:
  233. r.pastEvents[e.Id] = e
  234. case StateResume:
  235. r.pastEvents[e.Id] = e
  236. case stateLive:
  237. live = true
  238. r.updateEventTimestamp(time.Unix(int64(e.Timestamp), 0))
  239. }
  240. } else {
  241. logrus.Debugf("received containerd event: %#v", e)
  242. var container *container
  243. var c *client
  244. r.RLock()
  245. for _, c = range r.clients {
  246. container, err = c.getContainer(e.Id)
  247. if err == nil {
  248. break
  249. }
  250. }
  251. r.RUnlock()
  252. if container == nil {
  253. logrus.Errorf("no state for container: %q", err)
  254. continue
  255. }
  256. if err := container.handleEvent(e); err != nil {
  257. logrus.Errorf("error processing state change for %s: %v", e.Id, err)
  258. }
  259. r.updateEventTimestamp(time.Unix(int64(e.Timestamp), 0))
  260. }
  261. }
  262. }
  263. func (r *remote) runContainerdDaemon() error {
  264. pidFilename := filepath.Join(r.stateDir, containerdPidFilename)
  265. f, err := os.OpenFile(pidFilename, os.O_RDWR|os.O_CREATE, 0600)
  266. defer f.Close()
  267. if err != nil {
  268. return err
  269. }
  270. // File exist, check if the daemon is alive
  271. b := make([]byte, 8)
  272. n, err := f.Read(b)
  273. if err != nil && err != io.EOF {
  274. return err
  275. }
  276. if n > 0 {
  277. pid, err := strconv.ParseUint(string(b[:n]), 10, 64)
  278. if err != nil {
  279. return err
  280. }
  281. if utils.IsProcessAlive(int(pid)) {
  282. logrus.Infof("previous instance of containerd still alive (%d)", pid)
  283. r.daemonPid = int(pid)
  284. return nil
  285. }
  286. }
  287. // rewind the file
  288. _, err = f.Seek(0, os.SEEK_SET)
  289. if err != nil {
  290. return err
  291. }
  292. // Truncate it
  293. err = f.Truncate(0)
  294. if err != nil {
  295. return err
  296. }
  297. // Start a new instance
  298. args := []string{"-l", r.rpcAddr, "--runtime", "docker-runc"}
  299. if r.debugLog {
  300. args = append(args, "--debug")
  301. }
  302. if len(r.runtimeArgs) > 0 {
  303. for _, v := range r.runtimeArgs {
  304. args = append(args, "--runtime-args")
  305. args = append(args, v)
  306. }
  307. logrus.Debugf("runContainerdDaemon: runtimeArgs: %s", args)
  308. }
  309. cmd := exec.Command(containerdBinary, args...)
  310. // TODO: store logs?
  311. cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
  312. if err := cmd.Start(); err != nil {
  313. return err
  314. }
  315. logrus.Infof("New containerd process, pid: %d\n", cmd.Process.Pid)
  316. if _, err := f.WriteString(fmt.Sprintf("%d", cmd.Process.Pid)); err != nil {
  317. utils.KillProcess(cmd.Process.Pid)
  318. return err
  319. }
  320. go cmd.Wait() // Reap our child when needed
  321. r.daemonPid = cmd.Process.Pid
  322. return nil
  323. }
  324. // WithRemoteAddr sets the external containerd socket to connect to.
  325. func WithRemoteAddr(addr string) RemoteOption {
  326. return rpcAddr(addr)
  327. }
  328. type rpcAddr string
  329. func (a rpcAddr) Apply(r Remote) error {
  330. if remote, ok := r.(*remote); ok {
  331. remote.rpcAddr = string(a)
  332. return nil
  333. }
  334. return fmt.Errorf("WithRemoteAddr option not supported for this remote")
  335. }
  336. // WithRuntimeArgs sets the list of runtime args passed to containerd
  337. func WithRuntimeArgs(args []string) RemoteOption {
  338. return runtimeArgs(args)
  339. }
  340. type runtimeArgs []string
  341. func (rt runtimeArgs) Apply(r Remote) error {
  342. if remote, ok := r.(*remote); ok {
  343. remote.runtimeArgs = rt
  344. return nil
  345. }
  346. return fmt.Errorf("WithRuntimeArgs option not supported for this remote")
  347. }
  348. // WithStartDaemon defines if libcontainerd should also run containerd daemon.
  349. func WithStartDaemon(start bool) RemoteOption {
  350. return startDaemon(start)
  351. }
  352. type startDaemon bool
  353. func (s startDaemon) Apply(r Remote) error {
  354. if remote, ok := r.(*remote); ok {
  355. remote.startDaemon = bool(s)
  356. return nil
  357. }
  358. return fmt.Errorf("WithStartDaemon option not supported for this remote")
  359. }
  360. // WithDebugLog defines if containerd debug logs will be enabled for daemon.
  361. func WithDebugLog(debug bool) RemoteOption {
  362. return debugLog(debug)
  363. }
  364. type debugLog bool
  365. func (d debugLog) Apply(r Remote) error {
  366. if remote, ok := r.(*remote); ok {
  367. remote.debugLog = bool(d)
  368. return nil
  369. }
  370. return fmt.Errorf("WithDebugLog option not supported for this remote")
  371. }