controller.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. package container
  2. import (
  3. "fmt"
  4. "os"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/docker/docker/api/types"
  9. "github.com/docker/docker/api/types/events"
  10. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  11. "github.com/docker/go-connections/nat"
  12. "github.com/docker/libnetwork"
  13. "github.com/docker/swarmkit/agent/exec"
  14. "github.com/docker/swarmkit/api"
  15. "github.com/docker/swarmkit/log"
  16. gogotypes "github.com/gogo/protobuf/types"
  17. "github.com/pkg/errors"
  18. "golang.org/x/net/context"
  19. "golang.org/x/time/rate"
  20. )
  21. // controller implements agent.Controller against docker's API.
  22. //
  23. // Most operations against docker's API are done through the container name,
  24. // which is unique to the task.
  25. type controller struct {
  26. task *api.Task
  27. adapter *containerAdapter
  28. closed chan struct{}
  29. err error
  30. pulled chan struct{} // closed after pull
  31. cancelPull func() // cancels pull context if not nil
  32. pullErr error // pull error, only read after pulled closed
  33. }
  34. var _ exec.Controller = &controller{}
  35. // NewController returns a docker exec runner for the provided task.
  36. func newController(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*controller, error) {
  37. adapter, err := newContainerAdapter(b, task, secrets)
  38. if err != nil {
  39. return nil, err
  40. }
  41. return &controller{
  42. task: task,
  43. adapter: adapter,
  44. closed: make(chan struct{}),
  45. }, nil
  46. }
  47. func (r *controller) Task() (*api.Task, error) {
  48. return r.task, nil
  49. }
  50. // ContainerStatus returns the container-specific status for the task.
  51. func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) {
  52. ctnr, err := r.adapter.inspect(ctx)
  53. if err != nil {
  54. if isUnknownContainer(err) {
  55. return nil, nil
  56. }
  57. return nil, err
  58. }
  59. return parseContainerStatus(ctnr)
  60. }
  61. func (r *controller) PortStatus(ctx context.Context) (*api.PortStatus, error) {
  62. ctnr, err := r.adapter.inspect(ctx)
  63. if err != nil {
  64. if isUnknownContainer(err) {
  65. return nil, nil
  66. }
  67. return nil, err
  68. }
  69. return parsePortStatus(ctnr)
  70. }
  71. // Update tasks a recent task update and applies it to the container.
  72. func (r *controller) Update(ctx context.Context, t *api.Task) error {
  73. // TODO(stevvooe): While assignment of tasks is idempotent, we do allow
  74. // updates of metadata, such as labelling, as well as any other properties
  75. // that make sense.
  76. return nil
  77. }
  78. // Prepare creates a container and ensures the image is pulled.
  79. //
  80. // If the container has already be created, exec.ErrTaskPrepared is returned.
  81. func (r *controller) Prepare(ctx context.Context) error {
  82. if err := r.checkClosed(); err != nil {
  83. return err
  84. }
  85. // Make sure all the networks that the task needs are created.
  86. if err := r.adapter.createNetworks(ctx); err != nil {
  87. return err
  88. }
  89. // Make sure all the volumes that the task needs are created.
  90. if err := r.adapter.createVolumes(ctx); err != nil {
  91. return err
  92. }
  93. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
  94. if r.pulled == nil {
  95. // Fork the pull to a different context to allow pull to continue
  96. // on re-entrant calls to Prepare. This ensures that Prepare can be
  97. // idempotent and not incur the extra cost of pulling when
  98. // cancelled on updates.
  99. var pctx context.Context
  100. r.pulled = make(chan struct{})
  101. pctx, r.cancelPull = context.WithCancel(context.Background()) // TODO(stevvooe): Bind a context to the entire controller.
  102. go func() {
  103. defer close(r.pulled)
  104. r.pullErr = r.adapter.pullImage(pctx) // protected by closing r.pulled
  105. }()
  106. }
  107. select {
  108. case <-ctx.Done():
  109. return ctx.Err()
  110. case <-r.pulled:
  111. if r.pullErr != nil {
  112. // NOTE(stevvooe): We always try to pull the image to make sure we have
  113. // the most up to date version. This will return an error, but we only
  114. // log it. If the image truly doesn't exist, the create below will
  115. // error out.
  116. //
  117. // This gives us some nice behavior where we use up to date versions of
  118. // mutable tags, but will still run if the old image is available but a
  119. // registry is down.
  120. //
  121. // If you don't want this behavior, lock down your image to an
  122. // immutable tag or digest.
  123. log.G(ctx).WithError(r.pullErr).Error("pulling image failed")
  124. }
  125. }
  126. }
  127. if err := r.adapter.create(ctx); err != nil {
  128. if isContainerCreateNameConflict(err) {
  129. if _, err := r.adapter.inspect(ctx); err != nil {
  130. return err
  131. }
  132. // container is already created. success!
  133. return exec.ErrTaskPrepared
  134. }
  135. return err
  136. }
  137. return nil
  138. }
  139. // Start the container. An error will be returned if the container is already started.
  140. func (r *controller) Start(ctx context.Context) error {
  141. if err := r.checkClosed(); err != nil {
  142. return err
  143. }
  144. ctnr, err := r.adapter.inspect(ctx)
  145. if err != nil {
  146. return err
  147. }
  148. // Detect whether the container has *ever* been started. If so, we don't
  149. // issue the start.
  150. //
  151. // TODO(stevvooe): This is very racy. While reading inspect, another could
  152. // start the process and we could end up starting it twice.
  153. if ctnr.State.Status != "created" {
  154. return exec.ErrTaskStarted
  155. }
  156. for {
  157. if err := r.adapter.start(ctx); err != nil {
  158. if _, ok := err.(libnetwork.ErrNoSuchNetwork); ok {
  159. // Retry network creation again if we
  160. // failed because some of the networks
  161. // were not found.
  162. if err := r.adapter.createNetworks(ctx); err != nil {
  163. return err
  164. }
  165. continue
  166. }
  167. return errors.Wrap(err, "starting container failed")
  168. }
  169. break
  170. }
  171. // no health check
  172. if ctnr.Config == nil || ctnr.Config.Healthcheck == nil || len(ctnr.Config.Healthcheck.Test) == 0 || ctnr.Config.Healthcheck.Test[0] == "NONE" {
  173. if err := r.adapter.activateServiceBinding(); err != nil {
  174. log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s which has no healthcheck config", r.adapter.container.name())
  175. return err
  176. }
  177. return nil
  178. }
  179. // wait for container to be healthy
  180. eventq := r.adapter.events(ctx)
  181. var healthErr error
  182. for {
  183. select {
  184. case event := <-eventq:
  185. if !r.matchevent(event) {
  186. continue
  187. }
  188. switch event.Action {
  189. case "die": // exit on terminal events
  190. ctnr, err := r.adapter.inspect(ctx)
  191. if err != nil {
  192. return errors.Wrap(err, "die event received")
  193. } else if ctnr.State.ExitCode != 0 {
  194. return &exitError{code: ctnr.State.ExitCode, cause: healthErr}
  195. }
  196. return nil
  197. case "destroy":
  198. // If we get here, something has gone wrong but we want to exit
  199. // and report anyways.
  200. return ErrContainerDestroyed
  201. case "health_status: unhealthy":
  202. // in this case, we stop the container and report unhealthy status
  203. if err := r.Shutdown(ctx); err != nil {
  204. return errors.Wrap(err, "unhealthy container shutdown failed")
  205. }
  206. // set health check error, and wait for container to fully exit ("die" event)
  207. healthErr = ErrContainerUnhealthy
  208. case "health_status: healthy":
  209. if err := r.adapter.activateServiceBinding(); err != nil {
  210. log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s after healthy event", r.adapter.container.name())
  211. return err
  212. }
  213. return nil
  214. }
  215. case <-ctx.Done():
  216. return ctx.Err()
  217. case <-r.closed:
  218. return r.err
  219. }
  220. }
  221. }
  222. // Wait on the container to exit.
  223. func (r *controller) Wait(pctx context.Context) error {
  224. if err := r.checkClosed(); err != nil {
  225. return err
  226. }
  227. ctx, cancel := context.WithCancel(pctx)
  228. defer cancel()
  229. healthErr := make(chan error, 1)
  230. go func() {
  231. ectx, cancel := context.WithCancel(ctx) // cancel event context on first event
  232. defer cancel()
  233. if err := r.checkHealth(ectx); err == ErrContainerUnhealthy {
  234. healthErr <- ErrContainerUnhealthy
  235. if err := r.Shutdown(ectx); err != nil {
  236. log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy")
  237. }
  238. }
  239. }()
  240. err := r.adapter.wait(ctx)
  241. if ctx.Err() != nil {
  242. return ctx.Err()
  243. }
  244. if err != nil {
  245. ee := &exitError{}
  246. if ec, ok := err.(exec.ExitCoder); ok {
  247. ee.code = ec.ExitCode()
  248. }
  249. select {
  250. case e := <-healthErr:
  251. ee.cause = e
  252. default:
  253. if err.Error() != "" {
  254. ee.cause = err
  255. }
  256. }
  257. return ee
  258. }
  259. return nil
  260. }
  261. // Shutdown the container cleanly.
  262. func (r *controller) Shutdown(ctx context.Context) error {
  263. if err := r.checkClosed(); err != nil {
  264. return err
  265. }
  266. if r.cancelPull != nil {
  267. r.cancelPull()
  268. }
  269. // remove container from service binding
  270. if err := r.adapter.deactivateServiceBinding(); err != nil {
  271. log.G(ctx).WithError(err).Warningf("failed to deactivate service binding for container %s", r.adapter.container.name())
  272. // Don't return an error here, because failure to deactivate
  273. // the service binding is expected if the container was never
  274. // started.
  275. }
  276. if err := r.adapter.shutdown(ctx); err != nil {
  277. if isUnknownContainer(err) || isStoppedContainer(err) {
  278. return nil
  279. }
  280. return err
  281. }
  282. return nil
  283. }
  284. // Terminate the container, with force.
  285. func (r *controller) Terminate(ctx context.Context) error {
  286. if err := r.checkClosed(); err != nil {
  287. return err
  288. }
  289. if r.cancelPull != nil {
  290. r.cancelPull()
  291. }
  292. if err := r.adapter.terminate(ctx); err != nil {
  293. if isUnknownContainer(err) {
  294. return nil
  295. }
  296. return err
  297. }
  298. return nil
  299. }
  300. // Remove the container and its resources.
  301. func (r *controller) Remove(ctx context.Context) error {
  302. if err := r.checkClosed(); err != nil {
  303. return err
  304. }
  305. if r.cancelPull != nil {
  306. r.cancelPull()
  307. }
  308. // It may be necessary to shut down the task before removing it.
  309. if err := r.Shutdown(ctx); err != nil {
  310. if isUnknownContainer(err) {
  311. return nil
  312. }
  313. // This may fail if the task was already shut down.
  314. log.G(ctx).WithError(err).Debug("shutdown failed on removal")
  315. }
  316. // Try removing networks referenced in this task in case this
  317. // task is the last one referencing it
  318. if err := r.adapter.removeNetworks(ctx); err != nil {
  319. if isUnknownContainer(err) {
  320. return nil
  321. }
  322. return err
  323. }
  324. if err := r.adapter.remove(ctx); err != nil {
  325. if isUnknownContainer(err) {
  326. return nil
  327. }
  328. return err
  329. }
  330. return nil
  331. }
  332. // waitReady waits for a container to be "ready".
  333. // Ready means it's past the started state.
  334. func (r *controller) waitReady(pctx context.Context) error {
  335. if err := r.checkClosed(); err != nil {
  336. return err
  337. }
  338. ctx, cancel := context.WithCancel(pctx)
  339. defer cancel()
  340. eventq := r.adapter.events(ctx)
  341. ctnr, err := r.adapter.inspect(ctx)
  342. if err != nil {
  343. if !isUnknownContainer(err) {
  344. return errors.Wrap(err, "inspect container failed")
  345. }
  346. } else {
  347. switch ctnr.State.Status {
  348. case "running", "exited", "dead":
  349. return nil
  350. }
  351. }
  352. for {
  353. select {
  354. case event := <-eventq:
  355. if !r.matchevent(event) {
  356. continue
  357. }
  358. switch event.Action {
  359. case "start":
  360. return nil
  361. }
  362. case <-ctx.Done():
  363. return ctx.Err()
  364. case <-r.closed:
  365. return r.err
  366. }
  367. }
  368. }
  369. func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
  370. if err := r.checkClosed(); err != nil {
  371. return err
  372. }
  373. if err := r.waitReady(ctx); err != nil {
  374. return errors.Wrap(err, "container not ready for logs")
  375. }
  376. logsContext, cancel := context.WithCancel(ctx)
  377. msgs, err := r.adapter.logs(logsContext, options)
  378. defer cancel()
  379. if err != nil {
  380. return errors.Wrap(err, "failed getting container logs")
  381. }
  382. var (
  383. // use a rate limiter to keep things under control but also provides some
  384. // ability coalesce messages.
  385. limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
  386. msgctx = api.LogContext{
  387. NodeID: r.task.NodeID,
  388. ServiceID: r.task.ServiceID,
  389. TaskID: r.task.ID,
  390. }
  391. )
  392. for {
  393. msg, ok := <-msgs
  394. if !ok {
  395. // we're done here, no more messages
  396. return nil
  397. }
  398. if msg.Err != nil {
  399. // the defered cancel closes the adapter's log stream
  400. return msg.Err
  401. }
  402. // wait here for the limiter to catch up
  403. if err := limiter.WaitN(ctx, len(msg.Line)); err != nil {
  404. return errors.Wrap(err, "failed rate limiter")
  405. }
  406. tsp, err := gogotypes.TimestampProto(msg.Timestamp)
  407. if err != nil {
  408. return errors.Wrap(err, "failed to convert timestamp")
  409. }
  410. var stream api.LogStream
  411. if msg.Source == "stdout" {
  412. stream = api.LogStreamStdout
  413. } else if msg.Source == "stderr" {
  414. stream = api.LogStreamStderr
  415. }
  416. if err := publisher.Publish(ctx, api.LogMessage{
  417. Context: msgctx,
  418. Timestamp: tsp,
  419. Stream: stream,
  420. Data: msg.Line,
  421. }); err != nil {
  422. return errors.Wrap(err, "failed to publish log message")
  423. }
  424. }
  425. }
  426. // Close the runner and clean up any ephemeral resources.
  427. func (r *controller) Close() error {
  428. select {
  429. case <-r.closed:
  430. return r.err
  431. default:
  432. if r.cancelPull != nil {
  433. r.cancelPull()
  434. }
  435. r.err = exec.ErrControllerClosed
  436. close(r.closed)
  437. }
  438. return nil
  439. }
  440. func (r *controller) matchevent(event events.Message) bool {
  441. if event.Type != events.ContainerEventType {
  442. return false
  443. }
  444. // TODO(stevvooe): Filter based on ID matching, in addition to name.
  445. // Make sure the events are for this container.
  446. if event.Actor.Attributes["name"] != r.adapter.container.name() {
  447. return false
  448. }
  449. return true
  450. }
  451. func (r *controller) checkClosed() error {
  452. select {
  453. case <-r.closed:
  454. return r.err
  455. default:
  456. return nil
  457. }
  458. }
  459. func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) {
  460. status := &api.ContainerStatus{
  461. ContainerID: ctnr.ID,
  462. PID: int32(ctnr.State.Pid),
  463. ExitCode: int32(ctnr.State.ExitCode),
  464. }
  465. return status, nil
  466. }
  467. func parsePortStatus(ctnr types.ContainerJSON) (*api.PortStatus, error) {
  468. status := &api.PortStatus{}
  469. if ctnr.NetworkSettings != nil && len(ctnr.NetworkSettings.Ports) > 0 {
  470. exposedPorts, err := parsePortMap(ctnr.NetworkSettings.Ports)
  471. if err != nil {
  472. return nil, err
  473. }
  474. status.Ports = exposedPorts
  475. }
  476. return status, nil
  477. }
  478. func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) {
  479. exposedPorts := make([]*api.PortConfig, 0, len(portMap))
  480. for portProtocol, mapping := range portMap {
  481. parts := strings.SplitN(string(portProtocol), "/", 2)
  482. if len(parts) != 2 {
  483. return nil, fmt.Errorf("invalid port mapping: %s", portProtocol)
  484. }
  485. port, err := strconv.ParseUint(parts[0], 10, 16)
  486. if err != nil {
  487. return nil, err
  488. }
  489. protocol := api.ProtocolTCP
  490. switch strings.ToLower(parts[1]) {
  491. case "tcp":
  492. protocol = api.ProtocolTCP
  493. case "udp":
  494. protocol = api.ProtocolUDP
  495. default:
  496. return nil, fmt.Errorf("invalid protocol: %s", parts[1])
  497. }
  498. for _, binding := range mapping {
  499. hostPort, err := strconv.ParseUint(binding.HostPort, 10, 16)
  500. if err != nil {
  501. return nil, err
  502. }
  503. // TODO(aluzzardi): We're losing the port `name` here since
  504. // there's no way to retrieve it back from the Engine.
  505. exposedPorts = append(exposedPorts, &api.PortConfig{
  506. PublishMode: api.PublishModeHost,
  507. Protocol: protocol,
  508. TargetPort: uint32(port),
  509. PublishedPort: uint32(hostPort),
  510. })
  511. }
  512. }
  513. return exposedPorts, nil
  514. }
  515. type exitError struct {
  516. code int
  517. cause error
  518. }
  519. func (e *exitError) Error() string {
  520. if e.cause != nil {
  521. return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause)
  522. }
  523. return fmt.Sprintf("task: non-zero exit (%v)", e.code)
  524. }
  525. func (e *exitError) ExitCode() int {
  526. return int(e.code)
  527. }
  528. func (e *exitError) Cause() error {
  529. return e.cause
  530. }
  531. // checkHealth blocks until unhealthy container is detected or ctx exits
  532. func (r *controller) checkHealth(ctx context.Context) error {
  533. eventq := r.adapter.events(ctx)
  534. for {
  535. select {
  536. case <-ctx.Done():
  537. return nil
  538. case <-r.closed:
  539. return nil
  540. case event := <-eventq:
  541. if !r.matchevent(event) {
  542. continue
  543. }
  544. switch event.Action {
  545. case "health_status: unhealthy":
  546. return ErrContainerUnhealthy
  547. }
  548. }
  549. }
  550. }