controller.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
  1. package container
  2. import (
  3. "fmt"
  4. "os"
  5. "strconv"
  6. "strings"
  7. "time"
  8. "github.com/docker/docker/api/types"
  9. "github.com/docker/docker/api/types/events"
  10. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  11. "github.com/docker/go-connections/nat"
  12. "github.com/docker/libnetwork"
  13. "github.com/docker/swarmkit/agent/exec"
  14. "github.com/docker/swarmkit/api"
  15. "github.com/docker/swarmkit/log"
  16. gogotypes "github.com/gogo/protobuf/types"
  17. "github.com/pkg/errors"
  18. "golang.org/x/net/context"
  19. "golang.org/x/time/rate"
  20. )
  21. const defaultGossipConvergeDelay = 2 * time.Second
  22. // controller implements agent.Controller against docker's API.
  23. //
  24. // Most operations against docker's API are done through the container name,
  25. // which is unique to the task.
  26. type controller struct {
  27. task *api.Task
  28. adapter *containerAdapter
  29. closed chan struct{}
  30. err error
  31. pulled chan struct{} // closed after pull
  32. cancelPull func() // cancels pull context if not nil
  33. pullErr error // pull error, only read after pulled closed
  34. }
  35. var _ exec.Controller = &controller{}
  36. // NewController returns a docker exec runner for the provided task.
  37. func newController(b executorpkg.Backend, task *api.Task, dependencies exec.DependencyGetter) (*controller, error) {
  38. adapter, err := newContainerAdapter(b, task, dependencies)
  39. if err != nil {
  40. return nil, err
  41. }
  42. return &controller{
  43. task: task,
  44. adapter: adapter,
  45. closed: make(chan struct{}),
  46. }, nil
  47. }
  48. func (r *controller) Task() (*api.Task, error) {
  49. return r.task, nil
  50. }
  51. // ContainerStatus returns the container-specific status for the task.
  52. func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) {
  53. ctnr, err := r.adapter.inspect(ctx)
  54. if err != nil {
  55. if isUnknownContainer(err) {
  56. return nil, nil
  57. }
  58. return nil, err
  59. }
  60. return parseContainerStatus(ctnr)
  61. }
  62. func (r *controller) PortStatus(ctx context.Context) (*api.PortStatus, error) {
  63. ctnr, err := r.adapter.inspect(ctx)
  64. if err != nil {
  65. if isUnknownContainer(err) {
  66. return nil, nil
  67. }
  68. return nil, err
  69. }
  70. return parsePortStatus(ctnr)
  71. }
  72. // Update tasks a recent task update and applies it to the container.
  73. func (r *controller) Update(ctx context.Context, t *api.Task) error {
  74. // TODO(stevvooe): While assignment of tasks is idempotent, we do allow
  75. // updates of metadata, such as labelling, as well as any other properties
  76. // that make sense.
  77. return nil
  78. }
// Prepare creates a container and ensures the image is pulled.
//
// Steps, in order:
//  1. ensure the task's networks exist (adapter.createNetworks);
//  2. ensure the task's volumes exist (adapter.createVolumes);
//  3. unless DOCKER_SERVICE_PREFER_OFFLINE_IMAGE is "1", wait for an image
//     pull running in a background goroutine. A pull failure is only logged;
//  4. create the container.
//
// If ctx is done before the pull finishes, ctx.Err() is returned but the pull
// keeps running, because it uses its own context derived from
// context.Background(). A later call to Prepare waits on the same pull
// instead of starting a new one. The pull is cancelled via r.cancelPull
// (Shutdown, Terminate, Remove, Close).
//
// If the container has already been created (create reports a name conflict
// and inspect succeeds), exec.ErrTaskPrepared is returned.
//
// NOTE(review): r.pulled and r.cancelPull are written here without any lock;
// this assumes Prepare is not called concurrently with itself or with the
// methods that read r.cancelPull — confirm against the exec.Controller
// contract.
func (r *controller) Prepare(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// Make sure all the networks that the task needs are created.
	if err := r.adapter.createNetworks(ctx); err != nil {
		return err
	}

	// Make sure all the volumes that the task needs are created.
	if err := r.adapter.createVolumes(ctx); err != nil {
		return err
	}

	// Setting DOCKER_SERVICE_PREFER_OFFLINE_IMAGE=1 skips the pull entirely
	// and relies on whatever image is available locally.
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		// Only the first call starts a pull; later calls reuse r.pulled.
		if r.pulled == nil {
			// Fork the pull to a different context to allow pull to continue
			// on re-entrant calls to Prepare. This ensures that Prepare can be
			// idempotent and not incur the extra cost of pulling when
			// cancelled on updates.
			var pctx context.Context
			r.pulled = make(chan struct{})
			pctx, r.cancelPull = context.WithCancel(context.Background()) // TODO(stevvooe): Bind a context to the entire controller.

			go func() {
				defer close(r.pulled)
				r.pullErr = r.adapter.pullImage(pctx) // protected by closing r.pulled
			}()
		}

		// Wait for the pull, but give up (without cancelling it) if ctx ends.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-r.pulled:
			if r.pullErr != nil {
				// NOTE(stevvooe): We always try to pull the image to make sure we have
				// the most up to date version. This will return an error, but we only
				// log it. If the image truly doesn't exist, the create below will
				// error out.
				//
				// This gives us some nice behavior where we use up to date versions of
				// mutable tags, but will still run if the old image is available but a
				// registry is down.
				//
				// If you don't want this behavior, lock down your image to an
				// immutable tag or digest.
				log.G(ctx).WithError(r.pullErr).Error("pulling image failed")
			}
		}
	}

	if err := r.adapter.create(ctx); err != nil {
		// A name conflict means a container for this task may already exist;
		// confirm with inspect before reporting it as already prepared.
		if isContainerCreateNameConflict(err) {
			if _, err := r.adapter.inspect(ctx); err != nil {
				return err
			}

			// container is already created. success!
			return exec.ErrTaskPrepared
		}

		return err
	}

	return nil
}
// Start the container. An error will be returned if the container is already started.
//
// Start inspects the container and returns exec.ErrTaskStarted if its status
// is anything other than "created". If starting fails with
// libnetwork.ErrNoSuchNetwork, the networks are re-created and the start is
// retried. Any other start error is wrapped and returned.
//
// If the container has no usable healthcheck (no config, no healthcheck, an
// empty test, or a test of "NONE"), the service binding is activated right
// away and Start returns. Otherwise Start watches this container's events
// until one of the following happens:
//   - "health_status: healthy": activate the service binding, return nil;
//   - "health_status: unhealthy": shut the container down and keep waiting
//     for its "die" event, using ErrContainerUnhealthy as the exit cause;
//   - "die": return an *exitError for a non-zero exit code, otherwise nil;
//   - "destroy": return ErrContainerDestroyed;
//   - ctx is done (ctx.Err()) or the controller is closed (r.err).
//
// NOTE(review): the event subscription is opened only after the start
// succeeds, so an event emitted in between (e.g. a very fast "die") would not
// be seen and Start would block until ctx is done or the controller closes.
// Compare waitReady, which subscribes before inspecting.
func (r *controller) Start(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		return err
	}

	// Detect whether the container has *ever* been started. If so, we don't
	// issue the start.
	//
	// TODO(stevvooe): This is very racy. While reading inspect, another could
	// start the process and we could end up starting it twice.
	if ctnr.State.Status != "created" {
		return exec.ErrTaskStarted
	}

	// NOTE(review): this loop has no retry limit; it only ends when start
	// succeeds, start fails with a different error, or createNetworks fails.
	for {
		if err := r.adapter.start(ctx); err != nil {
			if _, ok := err.(libnetwork.ErrNoSuchNetwork); ok {
				// Retry network creation again if we
				// failed because some of the networks
				// were not found.
				if err := r.adapter.createNetworks(ctx); err != nil {
					return err
				}

				continue
			}

			return errors.Wrap(err, "starting container failed")
		}

		break
	}

	// no health check
	if ctnr.Config == nil || ctnr.Config.Healthcheck == nil || len(ctnr.Config.Healthcheck.Test) == 0 || ctnr.Config.Healthcheck.Test[0] == "NONE" {
		if err := r.adapter.activateServiceBinding(); err != nil {
			log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s which has no healthcheck config", r.adapter.container.name())
			return err
		}
		return nil
	}

	// wait for container to be healthy
	eventq := r.adapter.events(ctx)

	// healthErr becomes the exit cause if the container dies after being
	// reported unhealthy.
	var healthErr error
	for {
		select {
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "die": // exit on terminal events
				ctnr, err := r.adapter.inspect(ctx)
				if err != nil {
					return errors.Wrap(err, "die event received")
				} else if ctnr.State.ExitCode != 0 {
					return &exitError{code: ctnr.State.ExitCode, cause: healthErr}
				}

				return nil
			case "destroy":
				// If we get here, something has gone wrong but we want to exit
				// and report anyways.
				return ErrContainerDestroyed
			case "health_status: unhealthy":
				// in this case, we stop the container and report unhealthy status
				if err := r.Shutdown(ctx); err != nil {
					return errors.Wrap(err, "unhealthy container shutdown failed")
				}
				// set health check error, and wait for container to fully exit ("die" event)
				healthErr = ErrContainerUnhealthy
			case "health_status: healthy":
				if err := r.adapter.activateServiceBinding(); err != nil {
					log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s after healthy event", r.adapter.container.name())
					return err
				}
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.closed:
			return r.err
		}
	}
}
// Wait on the container to exit.
//
// Wait returns nil for a zero exit code and an *exitError otherwise. While
// waiting, a goroutine watches container events via checkHealth. If it sees
// "health_status: unhealthy", it records ErrContainerUnhealthy and shuts the
// container down. When the exit code is non-zero and that record is already
// available, it becomes the exitError's cause. Otherwise the cause is the
// error carried by the wait status, if any.
//
// The health goroutine stops when Wait returns: the deferred cancel cancels
// ctx, from which its event context is derived, and checkHealth returns on
// ctx.Done().
//
// NOTE(review): the receive from waitC does not select on ctx; this relies on
// adapter.wait delivering a status (or closing waitC) when ctx is cancelled
// — confirm.
func (r *controller) Wait(pctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(pctx)
	defer cancel()

	// Buffered so the health goroutine never blocks on the send, even if
	// Wait has already returned.
	healthErr := make(chan error, 1)
	go func() {
		ectx, cancel := context.WithCancel(ctx) // ectx is cancelled when this goroutine returns, ending its event subscription
		defer cancel()
		if err := r.checkHealth(ectx); err == ErrContainerUnhealthy {
			healthErr <- ErrContainerUnhealthy
			if err := r.Shutdown(ectx); err != nil {
				log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy")
			}
		}
	}()

	waitC, err := r.adapter.wait(ctx)
	if err != nil {
		return err
	}

	if status := <-waitC; status.ExitCode() != 0 {
		exitErr := &exitError{
			code: status.ExitCode(),
		}

		// Set the cause if it is knowable.
		// Non-blocking: an unhealthy result only counts if it was already
		// recorded when the exit status arrived.
		select {
		case e := <-healthErr:
			exitErr.cause = e
		default:
			if status.Err() != nil {
				exitErr.cause = status.Err()
			}
		}

		return exitErr
	}

	return nil
}
  262. func (r *controller) hasServiceBinding() bool {
  263. if r.task == nil {
  264. return false
  265. }
  266. // service is attached to a network besides the default bridge
  267. for _, na := range r.task.Networks {
  268. if na.Network == nil ||
  269. na.Network.DriverState == nil ||
  270. na.Network.DriverState.Name == "bridge" && na.Network.Spec.Annotations.Name == "bridge" {
  271. continue
  272. }
  273. return true
  274. }
  275. return false
  276. }
  277. // Shutdown the container cleanly.
  278. func (r *controller) Shutdown(ctx context.Context) error {
  279. if err := r.checkClosed(); err != nil {
  280. return err
  281. }
  282. if r.cancelPull != nil {
  283. r.cancelPull()
  284. }
  285. if r.hasServiceBinding() {
  286. // remove container from service binding
  287. if err := r.adapter.deactivateServiceBinding(); err != nil {
  288. log.G(ctx).WithError(err).Warningf("failed to deactivate service binding for container %s", r.adapter.container.name())
  289. // Don't return an error here, because failure to deactivate
  290. // the service binding is expected if the container was never
  291. // started.
  292. }
  293. // add a delay for gossip converge
  294. // TODO(dongluochen): this delay should be configurable to fit different cluster size and network delay.
  295. time.Sleep(defaultGossipConvergeDelay)
  296. }
  297. if err := r.adapter.shutdown(ctx); err != nil {
  298. if isUnknownContainer(err) || isStoppedContainer(err) {
  299. return nil
  300. }
  301. return err
  302. }
  303. return nil
  304. }
  305. // Terminate the container, with force.
  306. func (r *controller) Terminate(ctx context.Context) error {
  307. if err := r.checkClosed(); err != nil {
  308. return err
  309. }
  310. if r.cancelPull != nil {
  311. r.cancelPull()
  312. }
  313. if err := r.adapter.terminate(ctx); err != nil {
  314. if isUnknownContainer(err) {
  315. return nil
  316. }
  317. return err
  318. }
  319. return nil
  320. }
  321. // Remove the container and its resources.
  322. func (r *controller) Remove(ctx context.Context) error {
  323. if err := r.checkClosed(); err != nil {
  324. return err
  325. }
  326. if r.cancelPull != nil {
  327. r.cancelPull()
  328. }
  329. // It may be necessary to shut down the task before removing it.
  330. if err := r.Shutdown(ctx); err != nil {
  331. if isUnknownContainer(err) {
  332. return nil
  333. }
  334. // This may fail if the task was already shut down.
  335. log.G(ctx).WithError(err).Debug("shutdown failed on removal")
  336. }
  337. // Try removing networks referenced in this task in case this
  338. // task is the last one referencing it
  339. if err := r.adapter.removeNetworks(ctx); err != nil {
  340. if isUnknownContainer(err) {
  341. return nil
  342. }
  343. return err
  344. }
  345. if err := r.adapter.remove(ctx); err != nil {
  346. if isUnknownContainer(err) {
  347. return nil
  348. }
  349. return err
  350. }
  351. return nil
  352. }
  353. // waitReady waits for a container to be "ready".
  354. // Ready means it's past the started state.
  355. func (r *controller) waitReady(pctx context.Context) error {
  356. if err := r.checkClosed(); err != nil {
  357. return err
  358. }
  359. ctx, cancel := context.WithCancel(pctx)
  360. defer cancel()
  361. eventq := r.adapter.events(ctx)
  362. ctnr, err := r.adapter.inspect(ctx)
  363. if err != nil {
  364. if !isUnknownContainer(err) {
  365. return errors.Wrap(err, "inspect container failed")
  366. }
  367. } else {
  368. switch ctnr.State.Status {
  369. case "running", "exited", "dead":
  370. return nil
  371. }
  372. }
  373. for {
  374. select {
  375. case event := <-eventq:
  376. if !r.matchevent(event) {
  377. continue
  378. }
  379. switch event.Action {
  380. case "start":
  381. return nil
  382. }
  383. case <-ctx.Done():
  384. return ctx.Err()
  385. case <-r.closed:
  386. return r.err
  387. }
  388. }
  389. }
  390. func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
  391. if err := r.checkClosed(); err != nil {
  392. return err
  393. }
  394. // if we're following, wait for this container to be ready. there is a
  395. // problem here: if the container will never be ready (for example, it has
  396. // been totally deleted) then this will wait forever. however, this doesn't
  397. // actually cause any UI issues, and shouldn't be a problem. the stuck wait
  398. // will go away when the follow (context) is canceled.
  399. if options.Follow {
  400. if err := r.waitReady(ctx); err != nil {
  401. return errors.Wrap(err, "container not ready for logs")
  402. }
  403. }
  404. // if we're not following, we're not gonna wait for the container to be
  405. // ready. just call logs. if the container isn't ready, the call will fail
  406. // and return an error. no big deal, we don't care, we only want the logs
  407. // we can get RIGHT NOW with no follow
  408. logsContext, cancel := context.WithCancel(ctx)
  409. msgs, err := r.adapter.logs(logsContext, options)
  410. defer cancel()
  411. if err != nil {
  412. return errors.Wrap(err, "failed getting container logs")
  413. }
  414. var (
  415. // use a rate limiter to keep things under control but also provides some
  416. // ability coalesce messages.
  417. limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
  418. msgctx = api.LogContext{
  419. NodeID: r.task.NodeID,
  420. ServiceID: r.task.ServiceID,
  421. TaskID: r.task.ID,
  422. }
  423. )
  424. for {
  425. msg, ok := <-msgs
  426. if !ok {
  427. // we're done here, no more messages
  428. return nil
  429. }
  430. if msg.Err != nil {
  431. // the defered cancel closes the adapter's log stream
  432. return msg.Err
  433. }
  434. // wait here for the limiter to catch up
  435. if err := limiter.WaitN(ctx, len(msg.Line)); err != nil {
  436. return errors.Wrap(err, "failed rate limiter")
  437. }
  438. tsp, err := gogotypes.TimestampProto(msg.Timestamp)
  439. if err != nil {
  440. return errors.Wrap(err, "failed to convert timestamp")
  441. }
  442. var stream api.LogStream
  443. if msg.Source == "stdout" {
  444. stream = api.LogStreamStdout
  445. } else if msg.Source == "stderr" {
  446. stream = api.LogStreamStderr
  447. }
  448. // parse the details out of the Attrs map
  449. var attrs []api.LogAttr
  450. if len(msg.Attrs) != 0 {
  451. attrs = make([]api.LogAttr, 0, len(msg.Attrs))
  452. for _, attr := range msg.Attrs {
  453. attrs = append(attrs, api.LogAttr{Key: attr.Key, Value: attr.Value})
  454. }
  455. }
  456. if err := publisher.Publish(ctx, api.LogMessage{
  457. Context: msgctx,
  458. Timestamp: tsp,
  459. Stream: stream,
  460. Attrs: attrs,
  461. Data: msg.Line,
  462. }); err != nil {
  463. return errors.Wrap(err, "failed to publish log message")
  464. }
  465. }
  466. }
  467. // Close the runner and clean up any ephemeral resources.
  468. func (r *controller) Close() error {
  469. select {
  470. case <-r.closed:
  471. return r.err
  472. default:
  473. if r.cancelPull != nil {
  474. r.cancelPull()
  475. }
  476. r.err = exec.ErrControllerClosed
  477. close(r.closed)
  478. }
  479. return nil
  480. }
  481. func (r *controller) matchevent(event events.Message) bool {
  482. if event.Type != events.ContainerEventType {
  483. return false
  484. }
  485. // we can't filter using id since it will have huge chances to introduce a deadlock. see #33377.
  486. return event.Actor.Attributes["name"] == r.adapter.container.name()
  487. }
  488. func (r *controller) checkClosed() error {
  489. select {
  490. case <-r.closed:
  491. return r.err
  492. default:
  493. return nil
  494. }
  495. }
  496. func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) {
  497. status := &api.ContainerStatus{
  498. ContainerID: ctnr.ID,
  499. PID: int32(ctnr.State.Pid),
  500. ExitCode: int32(ctnr.State.ExitCode),
  501. }
  502. return status, nil
  503. }
  504. func parsePortStatus(ctnr types.ContainerJSON) (*api.PortStatus, error) {
  505. status := &api.PortStatus{}
  506. if ctnr.NetworkSettings != nil && len(ctnr.NetworkSettings.Ports) > 0 {
  507. exposedPorts, err := parsePortMap(ctnr.NetworkSettings.Ports)
  508. if err != nil {
  509. return nil, err
  510. }
  511. status.Ports = exposedPorts
  512. }
  513. return status, nil
  514. }
  515. func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) {
  516. exposedPorts := make([]*api.PortConfig, 0, len(portMap))
  517. for portProtocol, mapping := range portMap {
  518. parts := strings.SplitN(string(portProtocol), "/", 2)
  519. if len(parts) != 2 {
  520. return nil, fmt.Errorf("invalid port mapping: %s", portProtocol)
  521. }
  522. port, err := strconv.ParseUint(parts[0], 10, 16)
  523. if err != nil {
  524. return nil, err
  525. }
  526. protocol := api.ProtocolTCP
  527. switch strings.ToLower(parts[1]) {
  528. case "tcp":
  529. protocol = api.ProtocolTCP
  530. case "udp":
  531. protocol = api.ProtocolUDP
  532. default:
  533. return nil, fmt.Errorf("invalid protocol: %s", parts[1])
  534. }
  535. for _, binding := range mapping {
  536. hostPort, err := strconv.ParseUint(binding.HostPort, 10, 16)
  537. if err != nil {
  538. return nil, err
  539. }
  540. // TODO(aluzzardi): We're losing the port `name` here since
  541. // there's no way to retrieve it back from the Engine.
  542. exposedPorts = append(exposedPorts, &api.PortConfig{
  543. PublishMode: api.PublishModeHost,
  544. Protocol: protocol,
  545. TargetPort: uint32(port),
  546. PublishedPort: uint32(hostPort),
  547. })
  548. }
  549. }
  550. return exposedPorts, nil
  551. }
  552. type exitError struct {
  553. code int
  554. cause error
  555. }
  556. func (e *exitError) Error() string {
  557. if e.cause != nil {
  558. return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause)
  559. }
  560. return fmt.Sprintf("task: non-zero exit (%v)", e.code)
  561. }
  562. func (e *exitError) ExitCode() int {
  563. return int(e.code)
  564. }
  565. func (e *exitError) Cause() error {
  566. return e.cause
  567. }
  568. // checkHealth blocks until unhealthy container is detected or ctx exits
  569. func (r *controller) checkHealth(ctx context.Context) error {
  570. eventq := r.adapter.events(ctx)
  571. for {
  572. select {
  573. case <-ctx.Done():
  574. return nil
  575. case <-r.closed:
  576. return nil
  577. case event := <-eventq:
  578. if !r.matchevent(event) {
  579. continue
  580. }
  581. switch event.Action {
  582. case "health_status: unhealthy":
  583. return ErrContainerUnhealthy
  584. }
  585. }
  586. }
  587. }