controller.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. package container
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "os"
  9. "time"
  10. "github.com/docker/docker/api/types"
  11. "github.com/docker/docker/api/types/events"
  12. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  13. "github.com/docker/libnetwork"
  14. "github.com/docker/swarmkit/agent/exec"
  15. "github.com/docker/swarmkit/api"
  16. "github.com/docker/swarmkit/log"
  17. "github.com/docker/swarmkit/protobuf/ptypes"
  18. "github.com/pkg/errors"
  19. "golang.org/x/net/context"
  20. "golang.org/x/time/rate"
  21. )
  22. // controller implements agent.Controller against docker's API.
  23. //
  24. // Most operations against docker's API are done through the container name,
  25. // which is unique to the task.
  26. type controller struct {
  27. task *api.Task
  28. adapter *containerAdapter
  29. closed chan struct{}
  30. err error
  31. pulled chan struct{} // closed after pull
  32. cancelPull func() // cancels pull context if not nil
  33. pullErr error // pull error, only read after pulled closed
  34. }
  35. var _ exec.Controller = &controller{}
  36. // NewController returns a docker exec runner for the provided task.
  37. func newController(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*controller, error) {
  38. adapter, err := newContainerAdapter(b, task, secrets)
  39. if err != nil {
  40. return nil, err
  41. }
  42. return &controller{
  43. task: task,
  44. adapter: adapter,
  45. closed: make(chan struct{}),
  46. }, nil
  47. }
  48. func (r *controller) Task() (*api.Task, error) {
  49. return r.task, nil
  50. }
  51. // ContainerStatus returns the container-specific status for the task.
  52. func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) {
  53. ctnr, err := r.adapter.inspect(ctx)
  54. if err != nil {
  55. if isUnknownContainer(err) {
  56. return nil, nil
  57. }
  58. return nil, err
  59. }
  60. return parseContainerStatus(ctnr)
  61. }
  62. // Update tasks a recent task update and applies it to the container.
  63. func (r *controller) Update(ctx context.Context, t *api.Task) error {
  64. // TODO(stevvooe): While assignment of tasks is idempotent, we do allow
  65. // updates of metadata, such as labelling, as well as any other properties
  66. // that make sense.
  67. return nil
  68. }
  69. // Prepare creates a container and ensures the image is pulled.
  70. //
  71. // If the container has already be created, exec.ErrTaskPrepared is returned.
  72. func (r *controller) Prepare(ctx context.Context) error {
  73. if err := r.checkClosed(); err != nil {
  74. return err
  75. }
  76. // Make sure all the networks that the task needs are created.
  77. if err := r.adapter.createNetworks(ctx); err != nil {
  78. return err
  79. }
  80. // Make sure all the volumes that the task needs are created.
  81. if err := r.adapter.createVolumes(ctx); err != nil {
  82. return err
  83. }
  84. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
  85. if r.pulled == nil {
  86. // Fork the pull to a different context to allow pull to continue
  87. // on re-entrant calls to Prepare. This ensures that Prepare can be
  88. // idempotent and not incur the extra cost of pulling when
  89. // cancelled on updates.
  90. var pctx context.Context
  91. r.pulled = make(chan struct{})
  92. pctx, r.cancelPull = context.WithCancel(context.Background()) // TODO(stevvooe): Bind a context to the entire controller.
  93. go func() {
  94. defer close(r.pulled)
  95. r.pullErr = r.adapter.pullImage(pctx) // protected by closing r.pulled
  96. }()
  97. }
  98. select {
  99. case <-ctx.Done():
  100. return ctx.Err()
  101. case <-r.pulled:
  102. if r.pullErr != nil {
  103. // NOTE(stevvooe): We always try to pull the image to make sure we have
  104. // the most up to date version. This will return an error, but we only
  105. // log it. If the image truly doesn't exist, the create below will
  106. // error out.
  107. //
  108. // This gives us some nice behavior where we use up to date versions of
  109. // mutable tags, but will still run if the old image is available but a
  110. // registry is down.
  111. //
  112. // If you don't want this behavior, lock down your image to an
  113. // immutable tag or digest.
  114. log.G(ctx).WithError(r.pullErr).Error("pulling image failed")
  115. }
  116. }
  117. }
  118. if err := r.adapter.create(ctx); err != nil {
  119. if isContainerCreateNameConflict(err) {
  120. if _, err := r.adapter.inspect(ctx); err != nil {
  121. return err
  122. }
  123. // container is already created. success!
  124. return exec.ErrTaskPrepared
  125. }
  126. return err
  127. }
  128. return nil
  129. }
  130. // Start the container. An error will be returned if the container is already started.
  131. func (r *controller) Start(ctx context.Context) error {
  132. if err := r.checkClosed(); err != nil {
  133. return err
  134. }
  135. ctnr, err := r.adapter.inspect(ctx)
  136. if err != nil {
  137. return err
  138. }
  139. // Detect whether the container has *ever* been started. If so, we don't
  140. // issue the start.
  141. //
  142. // TODO(stevvooe): This is very racy. While reading inspect, another could
  143. // start the process and we could end up starting it twice.
  144. if ctnr.State.Status != "created" {
  145. return exec.ErrTaskStarted
  146. }
  147. for {
  148. if err := r.adapter.start(ctx); err != nil {
  149. if _, ok := err.(libnetwork.ErrNoSuchNetwork); ok {
  150. // Retry network creation again if we
  151. // failed because some of the networks
  152. // were not found.
  153. if err := r.adapter.createNetworks(ctx); err != nil {
  154. return err
  155. }
  156. continue
  157. }
  158. return errors.Wrap(err, "starting container failed")
  159. }
  160. break
  161. }
  162. // no health check
  163. if ctnr.Config == nil || ctnr.Config.Healthcheck == nil {
  164. if err := r.adapter.activateServiceBinding(); err != nil {
  165. log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s which has no healthcheck config", r.adapter.container.name())
  166. return err
  167. }
  168. return nil
  169. }
  170. healthCmd := ctnr.Config.Healthcheck.Test
  171. if len(healthCmd) == 0 || healthCmd[0] == "NONE" {
  172. return nil
  173. }
  174. // wait for container to be healthy
  175. eventq := r.adapter.events(ctx)
  176. var healthErr error
  177. for {
  178. select {
  179. case event := <-eventq:
  180. if !r.matchevent(event) {
  181. continue
  182. }
  183. switch event.Action {
  184. case "die": // exit on terminal events
  185. ctnr, err := r.adapter.inspect(ctx)
  186. if err != nil {
  187. return errors.Wrap(err, "die event received")
  188. } else if ctnr.State.ExitCode != 0 {
  189. return &exitError{code: ctnr.State.ExitCode, cause: healthErr}
  190. }
  191. return nil
  192. case "destroy":
  193. // If we get here, something has gone wrong but we want to exit
  194. // and report anyways.
  195. return ErrContainerDestroyed
  196. case "health_status: unhealthy":
  197. // in this case, we stop the container and report unhealthy status
  198. if err := r.Shutdown(ctx); err != nil {
  199. return errors.Wrap(err, "unhealthy container shutdown failed")
  200. }
  201. // set health check error, and wait for container to fully exit ("die" event)
  202. healthErr = ErrContainerUnhealthy
  203. case "health_status: healthy":
  204. if err := r.adapter.activateServiceBinding(); err != nil {
  205. log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s after healthy event", r.adapter.container.name())
  206. return err
  207. }
  208. return nil
  209. }
  210. case <-ctx.Done():
  211. return ctx.Err()
  212. case <-r.closed:
  213. return r.err
  214. }
  215. }
  216. }
  217. // Wait on the container to exit.
  218. func (r *controller) Wait(pctx context.Context) error {
  219. if err := r.checkClosed(); err != nil {
  220. return err
  221. }
  222. ctx, cancel := context.WithCancel(pctx)
  223. defer cancel()
  224. healthErr := make(chan error, 1)
  225. go func() {
  226. ectx, cancel := context.WithCancel(ctx) // cancel event context on first event
  227. defer cancel()
  228. if err := r.checkHealth(ectx); err == ErrContainerUnhealthy {
  229. healthErr <- ErrContainerUnhealthy
  230. if err := r.Shutdown(ectx); err != nil {
  231. log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy")
  232. }
  233. }
  234. }()
  235. err := r.adapter.wait(ctx)
  236. if ctx.Err() != nil {
  237. return ctx.Err()
  238. }
  239. if err != nil {
  240. ee := &exitError{}
  241. if ec, ok := err.(exec.ExitCoder); ok {
  242. ee.code = ec.ExitCode()
  243. }
  244. select {
  245. case e := <-healthErr:
  246. ee.cause = e
  247. default:
  248. if err.Error() != "" {
  249. ee.cause = err
  250. }
  251. }
  252. return ee
  253. }
  254. return nil
  255. }
  256. // Shutdown the container cleanly.
  257. func (r *controller) Shutdown(ctx context.Context) error {
  258. if err := r.checkClosed(); err != nil {
  259. return err
  260. }
  261. if r.cancelPull != nil {
  262. r.cancelPull()
  263. }
  264. // remove container from service binding
  265. if err := r.adapter.deactivateServiceBinding(); err != nil {
  266. log.G(ctx).WithError(err).Errorf("failed to deactivate service binding for container %s", r.adapter.container.name())
  267. return err
  268. }
  269. if err := r.adapter.shutdown(ctx); err != nil {
  270. if isUnknownContainer(err) || isStoppedContainer(err) {
  271. return nil
  272. }
  273. return err
  274. }
  275. return nil
  276. }
  277. // Terminate the container, with force.
  278. func (r *controller) Terminate(ctx context.Context) error {
  279. if err := r.checkClosed(); err != nil {
  280. return err
  281. }
  282. if r.cancelPull != nil {
  283. r.cancelPull()
  284. }
  285. if err := r.adapter.terminate(ctx); err != nil {
  286. if isUnknownContainer(err) {
  287. return nil
  288. }
  289. return err
  290. }
  291. return nil
  292. }
  293. // Remove the container and its resources.
  294. func (r *controller) Remove(ctx context.Context) error {
  295. if err := r.checkClosed(); err != nil {
  296. return err
  297. }
  298. if r.cancelPull != nil {
  299. r.cancelPull()
  300. }
  301. // It may be necessary to shut down the task before removing it.
  302. if err := r.Shutdown(ctx); err != nil {
  303. if isUnknownContainer(err) {
  304. return nil
  305. }
  306. // This may fail if the task was already shut down.
  307. log.G(ctx).WithError(err).Debug("shutdown failed on removal")
  308. }
  309. // Try removing networks referenced in this task in case this
  310. // task is the last one referencing it
  311. if err := r.adapter.removeNetworks(ctx); err != nil {
  312. if isUnknownContainer(err) {
  313. return nil
  314. }
  315. return err
  316. }
  317. if err := r.adapter.remove(ctx); err != nil {
  318. if isUnknownContainer(err) {
  319. return nil
  320. }
  321. return err
  322. }
  323. return nil
  324. }
  325. // waitReady waits for a container to be "ready".
  326. // Ready means it's past the started state.
  327. func (r *controller) waitReady(pctx context.Context) error {
  328. if err := r.checkClosed(); err != nil {
  329. return err
  330. }
  331. ctx, cancel := context.WithCancel(pctx)
  332. defer cancel()
  333. eventq := r.adapter.events(ctx)
  334. ctnr, err := r.adapter.inspect(ctx)
  335. if err != nil {
  336. if !isUnknownContainer(err) {
  337. return errors.Wrap(err, "inspect container failed")
  338. }
  339. } else {
  340. switch ctnr.State.Status {
  341. case "running", "exited", "dead":
  342. return nil
  343. }
  344. }
  345. for {
  346. select {
  347. case event := <-eventq:
  348. if !r.matchevent(event) {
  349. continue
  350. }
  351. switch event.Action {
  352. case "start":
  353. return nil
  354. }
  355. case <-ctx.Done():
  356. return ctx.Err()
  357. case <-r.closed:
  358. return r.err
  359. }
  360. }
  361. }
  362. func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
  363. if err := r.checkClosed(); err != nil {
  364. return err
  365. }
  366. if err := r.waitReady(ctx); err != nil {
  367. return errors.Wrap(err, "container not ready for logs")
  368. }
  369. rc, err := r.adapter.logs(ctx, options)
  370. if err != nil {
  371. return errors.Wrap(err, "failed getting container logs")
  372. }
  373. defer rc.Close()
  374. var (
  375. // use a rate limiter to keep things under control but also provides some
  376. // ability coalesce messages.
  377. limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
  378. msgctx = api.LogContext{
  379. NodeID: r.task.NodeID,
  380. ServiceID: r.task.ServiceID,
  381. TaskID: r.task.ID,
  382. }
  383. )
  384. brd := bufio.NewReader(rc)
  385. for {
  386. // so, message header is 8 bytes, treat as uint64, pull stream off MSB
  387. var header uint64
  388. if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
  389. if err == io.EOF {
  390. return nil
  391. }
  392. return errors.Wrap(err, "failed reading log header")
  393. }
  394. stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3))
  395. // limit here to decrease allocation back pressure.
  396. if err := limiter.WaitN(ctx, int(size)); err != nil {
  397. return errors.Wrap(err, "failed rate limiter")
  398. }
  399. buf := make([]byte, size)
  400. _, err := io.ReadFull(brd, buf)
  401. if err != nil {
  402. return errors.Wrap(err, "failed reading buffer")
  403. }
  404. // Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish
  405. parts := bytes.SplitN(buf, []byte(" "), 2)
  406. if len(parts) != 2 {
  407. return fmt.Errorf("invalid timestamp in log message: %v", buf)
  408. }
  409. ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
  410. if err != nil {
  411. return errors.Wrap(err, "failed to parse timestamp")
  412. }
  413. tsp, err := ptypes.TimestampProto(ts)
  414. if err != nil {
  415. return errors.Wrap(err, "failed to convert timestamp")
  416. }
  417. if err := publisher.Publish(ctx, api.LogMessage{
  418. Context: msgctx,
  419. Timestamp: tsp,
  420. Stream: api.LogStream(stream),
  421. Data: parts[1],
  422. }); err != nil {
  423. return errors.Wrap(err, "failed to publish log message")
  424. }
  425. }
  426. }
  427. // Close the runner and clean up any ephemeral resources.
  428. func (r *controller) Close() error {
  429. select {
  430. case <-r.closed:
  431. return r.err
  432. default:
  433. if r.cancelPull != nil {
  434. r.cancelPull()
  435. }
  436. r.err = exec.ErrControllerClosed
  437. close(r.closed)
  438. }
  439. return nil
  440. }
  441. func (r *controller) matchevent(event events.Message) bool {
  442. if event.Type != events.ContainerEventType {
  443. return false
  444. }
  445. // TODO(stevvooe): Filter based on ID matching, in addition to name.
  446. // Make sure the events are for this container.
  447. if event.Actor.Attributes["name"] != r.adapter.container.name() {
  448. return false
  449. }
  450. return true
  451. }
  452. func (r *controller) checkClosed() error {
  453. select {
  454. case <-r.closed:
  455. return r.err
  456. default:
  457. return nil
  458. }
  459. }
  460. func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) {
  461. status := &api.ContainerStatus{
  462. ContainerID: ctnr.ID,
  463. PID: int32(ctnr.State.Pid),
  464. ExitCode: int32(ctnr.State.ExitCode),
  465. }
  466. return status, nil
  467. }
  // exitError describes a task exit: it carries the exit code and, optionally,
  // the error that caused the exit.
  type exitError struct {
  	code  int
  	cause error
  }

  // Error formats the exit code and, when one is set, the cause.
  func (e *exitError) Error() string {
  	if e.cause == nil {
  		return fmt.Sprintf("task: non-zero exit (%v)", e.code)
  	}
  	return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause)
  }

  // ExitCode returns the recorded exit code.
  func (e *exitError) ExitCode() int {
  	return e.code
  }

  // Cause returns the underlying error, or nil if none was recorded.
  func (e *exitError) Cause() error {
  	return e.cause
  }
  484. // checkHealth blocks until unhealthy container is detected or ctx exits
  485. func (r *controller) checkHealth(ctx context.Context) error {
  486. eventq := r.adapter.events(ctx)
  487. for {
  488. select {
  489. case <-ctx.Done():
  490. return nil
  491. case <-r.closed:
  492. return nil
  493. case event := <-eventq:
  494. if !r.matchevent(event) {
  495. continue
  496. }
  497. switch event.Action {
  498. case "health_status: unhealthy":
  499. return ErrContainerUnhealthy
  500. }
  501. }
  502. }
  503. }