package container

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/events"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/go-connections/nat"
	"github.com/docker/libnetwork"
	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"golang.org/x/time/rate"
)
// controller implements agent.Controller against docker's API.
//
// Most operations against docker's API are done through the container name,
// which is unique to the task.
type controller struct {
	task       *api.Task
	adapter    *containerAdapter
	closed     chan struct{}
	err        error
	pulled     chan struct{} // closed after pull
	cancelPull func()        // cancels pull context if not nil
	pullErr    error         // pull error, only read after pulled closed
}
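
// compile-time assertion that controller satisfies the exec.Controller
// interface.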
var _ exec.Controller = &controller{}
// newController returns a docker exec runner for the provided task.
func newController(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*controller, error) {
	adapter, err := newContainerAdapter(b, task, secrets)
	if err != nil {
		return nil, err
	}

	return &controller{
		task:    task,
		adapter: adapter,
		closed:  make(chan struct{}),
	}, nil
}
// Task returns the task assigned to the controller.
func (r *controller) Task() (*api.Task, error) {
	return r.task, nil
}
// ContainerStatus returns the container-specific status for the task.
func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) {
	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if isUnknownContainer(err) {
			return nil, nil
		}
		return nil, err
	}
	return parseContainerStatus(ctnr)
}
// PortStatus returns the host-published ports for the task's container.
func (r *controller) PortStatus(ctx context.Context) (*api.PortStatus, error) {
	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if isUnknownContainer(err) {
			return nil, nil
		}
		return nil, err
	}

	return parsePortStatus(ctnr)
}
// Update takes a recent task update and applies it to the container.
func (r *controller) Update(ctx context.Context, t *api.Task) error {
	// TODO(stevvooe): While assignment of tasks is idempotent, we do allow
	// updates of metadata, such as labelling, as well as any other properties
	// that make sense.
	return nil
}
// Prepare creates a container and ensures the image is pulled.
//
// If the container has already been created, exec.ErrTaskPrepared is
// returned.
func (r *controller) Prepare(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// Make sure all the networks that the task needs are created.
	if err := r.adapter.createNetworks(ctx); err != nil {
		return err
	}

	// Make sure all the volumes that the task needs are created.
	if err := r.adapter.createVolumes(ctx); err != nil {
		return err
	}
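
	// DOCKER_SERVICE_PREFER_OFFLINE_IMAGE appears to act as an opt-out: when
	// set to "1", the pull below is skipped and a locally cached image is
	// used instead (inferred from the check that follows).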
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		if r.pulled == nil {
			// Fork the pull to a different context to allow pull to continue
			// on re-entrant calls to Prepare. This ensures that Prepare can
			// be idempotent and not incur the extra cost of pulling when
			// cancelled on updates.
			var pctx context.Context

			r.pulled = make(chan struct{})
			pctx, r.cancelPull = context.WithCancel(context.Background()) // TODO(stevvooe): Bind a context to the entire controller.

			go func() {
				defer close(r.pulled)
				r.pullErr = r.adapter.pullImage(pctx) // protected by closing r.pulled
			}()
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-r.pulled:
			if r.pullErr != nil {
				// NOTE(stevvooe): We always try to pull the image to make sure we have
				// the most up to date version. This will return an error, but we only
				// log it. If the image truly doesn't exist, the create below will
				// error out.
				//
				// This gives us some nice behavior where we use up to date versions of
				// mutable tags, but will still run if the old image is available but a
				// registry is down.
				//
				// If you don't want this behavior, lock down your image to an
				// immutable tag or digest.
				log.G(ctx).WithError(r.pullErr).Error("pulling image failed")
			}
		}
	}
	if err := r.adapter.create(ctx); err != nil {
		if isContainerCreateNameConflict(err) {
			if _, err := r.adapter.inspect(ctx); err != nil {
				return err
			}

			// container is already created. success!
			return exec.ErrTaskPrepared
		}

		return err
	}

	return nil
}
// Start the container. An error will be returned if the container is already
// started.
func (r *controller) Start(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		return err
	}

	// Detect whether the container has *ever* been started. If so, we don't
	// issue the start.
	//
	// TODO(stevvooe): This is very racy. While we're reading the inspect
	// result, another caller could start the container, and we could end up
	// starting it twice.
	if ctnr.State.Status != "created" {
		return exec.ErrTaskStarted
	}
	for {
		if err := r.adapter.start(ctx); err != nil {
			if _, ok := err.(libnetwork.ErrNoSuchNetwork); ok {
				// Retry network creation again if we
				// failed because some of the networks
				// were not found.
				if err := r.adapter.createNetworks(ctx); err != nil {
					return err
				}

				continue
			}

			return errors.Wrap(err, "starting container failed")
		}

		break
	}
	// no health check: activate the service binding immediately and return.
	if ctnr.Config == nil || ctnr.Config.Healthcheck == nil || len(ctnr.Config.Healthcheck.Test) == 0 || ctnr.Config.Healthcheck.Test[0] == "NONE" {
		if err := r.adapter.activateServiceBinding(); err != nil {
			log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s which has no healthcheck config", r.adapter.container.name())
			return err
		}
		return nil
	}
	// wait for container to be healthy
	eventq := r.adapter.events(ctx)

	var healthErr error
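
	// Docker delivers health state changes as container events whose Action
	// strings embed the status (e.g. "health_status: healthy"), which is why
	// the cases below match on those composite strings.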
	for {
		select {
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "die": // exit on terminal events
				ctnr, err := r.adapter.inspect(ctx)
				if err != nil {
					return errors.Wrap(err, "die event received")
				} else if ctnr.State.ExitCode != 0 {
					return &exitError{code: ctnr.State.ExitCode, cause: healthErr}
				}

				return nil
			case "destroy":
				// If we get here, something has gone wrong but we want to
				// exit and report anyway.
				return ErrContainerDestroyed
			case "health_status: unhealthy":
				// in this case, we stop the container and report unhealthy status
				if err := r.Shutdown(ctx); err != nil {
					return errors.Wrap(err, "unhealthy container shutdown failed")
				}
				// set health check error, and wait for container to fully exit ("die" event)
				healthErr = ErrContainerUnhealthy
			case "health_status: healthy":
				if err := r.adapter.activateServiceBinding(); err != nil {
					log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s after healthy event", r.adapter.container.name())
					return err
				}
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.closed:
			return r.err
		}
	}
}
// Wait on the container to exit.
func (r *controller) Wait(pctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(pctx)
	defer cancel()

	healthErr := make(chan error, 1)
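
	// The channel is buffered so the health-check goroutine below can report
	// and exit even if Wait has already stopped listening.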
	go func() {
		ectx, cancel := context.WithCancel(ctx) // cancel event context on first event
		defer cancel()
		if err := r.checkHealth(ectx); err == ErrContainerUnhealthy {
			healthErr <- ErrContainerUnhealthy
			if err := r.Shutdown(ectx); err != nil {
				log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy")
			}
		}
	}()

	err := r.adapter.wait(ctx)
	if ctx.Err() != nil {
		return ctx.Err()
	}

	if err != nil {
		ee := &exitError{}
		if ec, ok := err.(exec.ExitCoder); ok {
			ee.code = ec.ExitCode()
		}
		select {
		case e := <-healthErr:
			ee.cause = e
		default:
			if err.Error() != "" {
				ee.cause = err
			}
		}
		return ee
	}

	return nil
}
// Shutdown the container cleanly.
func (r *controller) Shutdown(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	// remove container from service binding
	if err := r.adapter.deactivateServiceBinding(); err != nil {
		log.G(ctx).WithError(err).Warningf("failed to deactivate service binding for container %s", r.adapter.container.name())
		// Don't return an error here, because failure to deactivate the
		// service binding is expected if the container was never started.
	}

	if err := r.adapter.shutdown(ctx); err != nil {
		if isUnknownContainer(err) || isStoppedContainer(err) {
			return nil
		}

		return err
	}

	return nil
}
// Terminate the container, with force.
func (r *controller) Terminate(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	if err := r.adapter.terminate(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		return err
	}

	return nil
}
// Remove the container and its resources.
func (r *controller) Remove(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	// It may be necessary to shut down the task before removing it.
	if err := r.Shutdown(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}
		// This may fail if the task was already shut down.
		log.G(ctx).WithError(err).Debug("shutdown failed on removal")
	}

	// Try removing the networks referenced in this task in case this task is
	// the last one referencing them.
	if err := r.adapter.removeNetworks(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		return err
	}

	if err := r.adapter.remove(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		return err
	}
	return nil
}
// waitReady waits for a container to be "ready".
// Ready means it's past the started state.
func (r *controller) waitReady(pctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(pctx)
	defer cancel()

	eventq := r.adapter.events(ctx)

	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if !isUnknownContainer(err) {
			return errors.Wrap(err, "inspect container failed")
		}
	} else {
		switch ctnr.State.Status {
		case "running", "exited", "dead":
			return nil
		}
	}

	for {
		select {
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "start":
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.closed:
			return r.err
		}
	}
}
// Logs streams the container's log output to the provided publisher.
func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if err := r.waitReady(ctx); err != nil {
		return errors.Wrap(err, "container not ready for logs")
	}

	rc, err := r.adapter.logs(ctx, options)
	if err != nil {
		return errors.Wrap(err, "failed getting container logs")
	}
	defer rc.Close()

	var (
		// use a rate limiter to keep things under control and also provide
		// some ability to coalesce messages.
		limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
		msgctx  = api.LogContext{
			NodeID:    r.task.NodeID,
			ServiceID: r.task.ServiceID,
			TaskID:    r.task.ID,
		}
	)
	brd := bufio.NewReader(rc)
	for {
		// so, message header is 8 bytes, treat as uint64, pull stream off MSB
		var header uint64
		if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
			if err == io.EOF {
				return nil
			}

			return errors.Wrap(err, "failed reading log header")
		}
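
		// The header follows Docker's stdcopy framing: byte 0 identifies the
		// stream (stdout/stderr), bytes 1-3 are padding, and bytes 4-7 carry
		// the big-endian payload size, so both fields can be masked out of
		// the uint64 read above.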
		stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3))

		// limit here to decrease allocation back pressure.
		if err := limiter.WaitN(ctx, int(size)); err != nil {
			return errors.Wrap(err, "failed rate limiter")
		}

		buf := make([]byte, size)
		_, err := io.ReadFull(brd, buf)
		if err != nil {
			return errors.Wrap(err, "failed reading buffer")
		}

		// Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish
		parts := bytes.SplitN(buf, []byte(" "), 2)
		if len(parts) != 2 {
			return fmt.Errorf("invalid timestamp in log message: %v", buf)
		}

		ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
		if err != nil {
			return errors.Wrap(err, "failed to parse timestamp")
		}

		tsp, err := gogotypes.TimestampProto(ts)
		if err != nil {
			return errors.Wrap(err, "failed to convert timestamp")
		}

		if err := publisher.Publish(ctx, api.LogMessage{
			Context:   msgctx,
			Timestamp: tsp,
			Stream:    api.LogStream(stream),
			Data:      parts[1],
		}); err != nil {
			return errors.Wrap(err, "failed to publish log message")
		}
	}
}
// Close the runner and clean up any ephemeral resources.
func (r *controller) Close() error {
	select {
	case <-r.closed:
		return r.err
	default:
		if r.cancelPull != nil {
			r.cancelPull()
		}

		r.err = exec.ErrControllerClosed
		close(r.closed)
	}
	return nil
}
// matchevent reports whether the event belongs to this task's container.
func (r *controller) matchevent(event events.Message) bool {
	if event.Type != events.ContainerEventType {
		return false
	}

	// TODO(stevvooe): Filter based on ID matching, in addition to name.

	// Make sure the events are for this container.
	if event.Actor.Attributes["name"] != r.adapter.container.name() {
		return false
	}

	return true
}
// checkClosed returns the close error if the controller has been closed.
func (r *controller) checkClosed() error {
	select {
	case <-r.closed:
		return r.err
	default:
		return nil
	}
}
func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) {
	status := &api.ContainerStatus{
		ContainerID: ctnr.ID,
		PID:         int32(ctnr.State.Pid),
		ExitCode:    int32(ctnr.State.ExitCode),
	}

	return status, nil
}
func parsePortStatus(ctnr types.ContainerJSON) (*api.PortStatus, error) {
	status := &api.PortStatus{}

	if ctnr.NetworkSettings != nil && len(ctnr.NetworkSettings.Ports) > 0 {
		exposedPorts, err := parsePortMap(ctnr.NetworkSettings.Ports)
		if err != nil {
			return nil, err
		}
		status.Ports = exposedPorts
	}

	return status, nil
}
func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) {
	exposedPorts := make([]*api.PortConfig, 0, len(portMap))

	for portProtocol, mapping := range portMap {
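		// nat.PortMap keys look like "8080/tcp"; split them into the port
		// number and the protocol.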
		parts := strings.SplitN(string(portProtocol), "/", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid port mapping: %s", portProtocol)
		}

		port, err := strconv.ParseUint(parts[0], 10, 16)
		if err != nil {
			return nil, err
		}

		protocol := api.ProtocolTCP
		switch strings.ToLower(parts[1]) {
		case "tcp":
			protocol = api.ProtocolTCP
		case "udp":
			protocol = api.ProtocolUDP
		default:
			return nil, fmt.Errorf("invalid protocol: %s", parts[1])
		}

		for _, binding := range mapping {
			hostPort, err := strconv.ParseUint(binding.HostPort, 10, 16)
			if err != nil {
				return nil, err
			}

			// TODO(aluzzardi): We're losing the port `name` here since
			// there's no way to retrieve it back from the Engine.
			exposedPorts = append(exposedPorts, &api.PortConfig{
				PublishMode:   api.PublishModeHost,
				Protocol:      protocol,
				TargetPort:    uint32(port),
				PublishedPort: uint32(hostPort),
			})
		}
	}

	return exposedPorts, nil
}
type exitError struct {
	code  int
	cause error
}

func (e *exitError) Error() string {
	if e.cause != nil {
		return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause)
	}

	return fmt.Sprintf("task: non-zero exit (%v)", e.code)
}

func (e *exitError) ExitCode() int {
	return e.code
}

func (e *exitError) Cause() error {
	return e.cause
}
// checkHealth blocks until an unhealthy container is detected or the context
// is done.
func (r *controller) checkHealth(ctx context.Context) error {
	eventq := r.adapter.events(ctx)

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-r.closed:
			return nil
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "health_status: unhealthy":
				return ErrContainerUnhealthy
			}
		}
	}
}