controller.go

package container

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/events"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/go-connections/nat"
	"github.com/docker/libnetwork"
	"github.com/docker/swarmkit/agent/exec"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/log"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"golang.org/x/time/rate"
)

const defaultGossipConvergeDelay = 2 * time.Second

// controller implements agent.Controller against docker's API.
//
// Most operations against docker's API are done through the container name,
// which is unique to the task.
type controller struct {
	task       *api.Task
	adapter    *containerAdapter
	closed     chan struct{}
	err        error
	pulled     chan struct{} // closed after pull
	cancelPull func()        // cancels pull context if not nil
	pullErr    error         // pull error, only read after pulled closed
}

var _ exec.Controller = &controller{}

// newController returns a docker exec runner for the provided task.
func newController(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*controller, error) {
	adapter, err := newContainerAdapter(b, task, secrets)
	if err != nil {
		return nil, err
	}

	return &controller{
		task:    task,
		adapter: adapter,
		closed:  make(chan struct{}),
	}, nil
}
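
// Illustrative sketch (not authoritative): the swarmkit agent drives a task's
// controller through the exec.Controller lifecycle implemented below, roughly:
//
//	ctlr, _ := newController(backend, task, secrets)
//	_ = ctlr.Prepare(ctx)  // pull image, create networks, volumes and the container
//	_ = ctlr.Start(ctx)    // start the container; wait for healthy if a healthcheck is configured
//	_ = ctlr.Wait(ctx)     // block until the container exits
//	_ = ctlr.Shutdown(ctx) // stop the container cleanly
//	_ = ctlr.Remove(ctx)   // remove the container and its resources
//	_ = ctlr.Close()       // release the controller
//
// Exact sequencing and error handling are owned by the agent, not by this file;
// the snippet only names the methods defined here.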

func (r *controller) Task() (*api.Task, error) {
	return r.task, nil
}

// ContainerStatus returns the container-specific status for the task.
func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) {
	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if isUnknownContainer(err) {
			return nil, nil
		}
		return nil, err
	}

	return parseContainerStatus(ctnr)
}

func (r *controller) PortStatus(ctx context.Context) (*api.PortStatus, error) {
	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if isUnknownContainer(err) {
			return nil, nil
		}
		return nil, err
	}

	return parsePortStatus(ctnr)
}

// Update takes a recent task update and applies it to the container.
func (r *controller) Update(ctx context.Context, t *api.Task) error {
	// TODO(stevvooe): While assignment of tasks is idempotent, we do allow
	// updates of metadata, such as labelling, as well as any other properties
	// that make sense.
	return nil
}

// Prepare creates a container and ensures the image is pulled.
//
// If the container has already been created, exec.ErrTaskPrepared is returned.
func (r *controller) Prepare(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// Make sure all the networks that the task needs are created.
	if err := r.adapter.createNetworks(ctx); err != nil {
		return err
	}

	// Make sure all the volumes that the task needs are created.
	if err := r.adapter.createVolumes(ctx); err != nil {
		return err
	}

	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		if r.pulled == nil {
			// Fork the pull to a different context to allow pull to continue
			// on re-entrant calls to Prepare. This ensures that Prepare can be
			// idempotent and not incur the extra cost of pulling when
			// cancelled on updates.
			var pctx context.Context

			r.pulled = make(chan struct{})
			pctx, r.cancelPull = context.WithCancel(context.Background()) // TODO(stevvooe): Bind a context to the entire controller.

			go func() {
				defer close(r.pulled)
				r.pullErr = r.adapter.pullImage(pctx) // protected by closing r.pulled
			}()
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-r.pulled:
			if r.pullErr != nil {
				// NOTE(stevvooe): We always try to pull the image to make sure we have
				// the most up to date version. This will return an error, but we only
				// log it. If the image truly doesn't exist, the create below will
				// error out.
				//
				// This gives us some nice behavior where we use up to date versions of
				// mutable tags, but will still run if the old image is available but a
				// registry is down.
				//
				// If you don't want this behavior, lock down your image to an
				// immutable tag or digest.
				log.G(ctx).WithError(r.pullErr).Error("pulling image failed")
			}
		}
	}

	if err := r.adapter.create(ctx); err != nil {
		if isContainerCreateNameConflict(err) {
			if _, err := r.adapter.inspect(ctx); err != nil {
				return err
			}

			// container is already created. success!
			return exec.ErrTaskPrepared
		}

		return err
	}

	return nil
}
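
// Note (illustrative): Prepare is written to be safely re-entrant. If it runs
// again after the container already exists, the create call fails with a name
// conflict and exec.ErrTaskPrepared is returned, which callers can treat as
// "already prepared" rather than a failure, e.g.:
//
//	if err := ctlr.Prepare(ctx); err != nil && err != exec.ErrTaskPrepared {
//		// only unexpected errors reach here
//	}
//
// The image pull is likewise forked onto its own context so a cancelled
// Prepare does not throw away pull progress.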

// Start the container. An error will be returned if the container is already started.
func (r *controller) Start(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		return err
	}

	// Detect whether the container has *ever* been started. If so, we don't
	// issue the start.
	//
	// TODO(stevvooe): This is very racy. While reading inspect, another could
	// start the process and we could end up starting it twice.
	if ctnr.State.Status != "created" {
		return exec.ErrTaskStarted
	}

	for {
		if err := r.adapter.start(ctx); err != nil {
			if _, ok := err.(libnetwork.ErrNoSuchNetwork); ok {
				// Retry network creation again if we
				// failed because some of the networks
				// were not found.
				if err := r.adapter.createNetworks(ctx); err != nil {
					return err
				}

				continue
			}

			return errors.Wrap(err, "starting container failed")
		}

		break
	}

	// no health check
	if ctnr.Config == nil || ctnr.Config.Healthcheck == nil || len(ctnr.Config.Healthcheck.Test) == 0 || ctnr.Config.Healthcheck.Test[0] == "NONE" {
		if err := r.adapter.activateServiceBinding(); err != nil {
			log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s which has no healthcheck config", r.adapter.container.name())
			return err
		}
		return nil
	}

	// wait for container to be healthy
	eventq := r.adapter.events(ctx)

	var healthErr error
	for {
		select {
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "die": // exit on terminal events
				ctnr, err := r.adapter.inspect(ctx)
				if err != nil {
					return errors.Wrap(err, "die event received")
				} else if ctnr.State.ExitCode != 0 {
					return &exitError{code: ctnr.State.ExitCode, cause: healthErr}
				}

				return nil
			case "destroy":
				// If we get here, something has gone wrong but we want to exit
				// and report anyways.
				return ErrContainerDestroyed
			case "health_status: unhealthy":
				// in this case, we stop the container and report unhealthy status
				if err := r.Shutdown(ctx); err != nil {
					return errors.Wrap(err, "unhealthy container shutdown failed")
				}

				// set health check error, and wait for container to fully exit ("die" event)
				healthErr = ErrContainerUnhealthy
			case "health_status: healthy":
				if err := r.adapter.activateServiceBinding(); err != nil {
					log.G(ctx).WithError(err).Errorf("failed to activate service binding for container %s after healthy event", r.adapter.container.name())
					return err
				}

				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.closed:
			return r.err
		}
	}
}
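
// Note (descriptive, not exhaustive): the action strings matched above —
// "die", "destroy", "health_status: healthy" and "health_status: unhealthy" —
// are container event actions emitted by the Docker daemon; the health ones
// only appear when the image or service defines a healthcheck. A rough way to
// observe the same stream outside this code path is:
//
//	docker events --filter type=container --filter container=<name>
//
// where <name> is the task's container name used throughout this controller.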

// Wait on the container to exit.
func (r *controller) Wait(pctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(pctx)
	defer cancel()

	healthErr := make(chan error, 1)
	go func() {
		ectx, cancel := context.WithCancel(ctx) // cancel event context on first event
		defer cancel()
		if err := r.checkHealth(ectx); err == ErrContainerUnhealthy {
			healthErr <- ErrContainerUnhealthy
			if err := r.Shutdown(ectx); err != nil {
				log.G(ectx).WithError(err).Debug("shutdown failed on unhealthy")
			}
		}
	}()

	err := r.adapter.wait(ctx)
	if ctx.Err() != nil {
		return ctx.Err()
	}

	if err != nil {
		ee := &exitError{}
		if ec, ok := err.(exec.ExitCoder); ok {
			ee.code = ec.ExitCode()
		}
		select {
		case e := <-healthErr:
			ee.cause = e
		default:
			if err.Error() != "" {
				ee.cause = err
			}
		}

		return ee
	}

	return nil
}

func (r *controller) hasServiceBinding() bool {
	if r.task == nil {
		return false
	}

	// service is attached to a network besides the default bridge
	for _, na := range r.task.Networks {
		if na.Network == nil ||
			na.Network.DriverState == nil ||
			na.Network.DriverState.Name == "bridge" && na.Network.Spec.Annotations.Name == "bridge" {
			continue
		}

		return true
	}

	return false
}
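
// Precedence note: in the condition above, && binds tighter than ||, so the
// last clause only skips attachments whose driver is named "bridge" and whose
// network is itself named "bridge" — i.e. the default bridge network — while
// any other attachment counts as a service binding.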

// Shutdown the container cleanly.
func (r *controller) Shutdown(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	if r.hasServiceBinding() {
		// remove container from service binding
		if err := r.adapter.deactivateServiceBinding(); err != nil {
			log.G(ctx).WithError(err).Warningf("failed to deactivate service binding for container %s", r.adapter.container.name())
			// Don't return an error here, because failure to deactivate
			// the service binding is expected if the container was never
			// started.
		}

		// add a delay for gossip converge
		// TODO(dongluochen): this delay should be configurable to fit different cluster size and network delay.
		time.Sleep(defaultGossipConvergeDelay)
	}

	if err := r.adapter.shutdown(ctx); err != nil {
		if isUnknownContainer(err) || isStoppedContainer(err) {
			return nil
		}

		return err
	}

	return nil
}

// Terminate the container, with force.
func (r *controller) Terminate(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	if err := r.adapter.terminate(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		return err
	}

	return nil
}

// Remove the container and its resources.
func (r *controller) Remove(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if r.cancelPull != nil {
		r.cancelPull()
	}

	// It may be necessary to shut down the task before removing it.
	if err := r.Shutdown(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		// This may fail if the task was already shut down.
		log.G(ctx).WithError(err).Debug("shutdown failed on removal")
	}

	// Try removing the networks referenced in this task in case this
	// task is the last one referencing them.
	if err := r.adapter.removeNetworks(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		return err
	}

	if err := r.adapter.remove(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}

		return err
	}

	return nil
}

// waitReady waits for a container to be "ready".
// Ready means it's past the started state.
func (r *controller) waitReady(pctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctx, cancel := context.WithCancel(pctx)
	defer cancel()

	eventq := r.adapter.events(ctx)

	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		if !isUnknownContainer(err) {
			return errors.Wrap(err, "inspect container failed")
		}
	} else {
		switch ctnr.State.Status {
		case "running", "exited", "dead":
			return nil
		}
	}

	for {
		select {
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "start":
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		case <-r.closed:
			return r.err
		}
	}
}

func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	if err := r.waitReady(ctx); err != nil {
		return errors.Wrap(err, "container not ready for logs")
	}

	rc, err := r.adapter.logs(ctx, options)
	if err != nil {
		return errors.Wrap(err, "failed getting container logs")
	}
	defer rc.Close()

	var (
		// use a rate limiter to keep things under control but also provide some
		// ability to coalesce messages.
		limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s
		msgctx  = api.LogContext{
			NodeID:    r.task.NodeID,
			ServiceID: r.task.ServiceID,
			TaskID:    r.task.ID,
		}
	)

	brd := bufio.NewReader(rc)
	for {
		// The log stream prefixes each message with an 8-byte header: read it as
		// a big-endian uint64, pull the stream descriptor off the most significant
		// byte and the payload size off the remaining bytes.
		var header uint64
		if err := binary.Read(brd, binary.BigEndian, &header); err != nil {
			if err == io.EOF {
				return nil
			}

			return errors.Wrap(err, "failed reading log header")
		}

		stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3))

		// limit here to decrease allocation back pressure.
		if err := limiter.WaitN(ctx, int(size)); err != nil {
			return errors.Wrap(err, "failed rate limiter")
		}

		buf := make([]byte, size)
		_, err := io.ReadFull(brd, buf)
		if err != nil {
			return errors.Wrap(err, "failed reading buffer")
		}

		// The timestamp is RFC3339Nano followed by a single space. Split it off,
		// parse it, then publish the remainder as the message body.
		parts := bytes.SplitN(buf, []byte(" "), 2)
		if len(parts) != 2 {
			return fmt.Errorf("invalid timestamp in log message: %v", buf)
		}

		ts, err := time.Parse(time.RFC3339Nano, string(parts[0]))
		if err != nil {
			return errors.Wrap(err, "failed to parse timestamp")
		}

		tsp, err := gogotypes.TimestampProto(ts)
		if err != nil {
			return errors.Wrap(err, "failed to convert timestamp")
		}

		if err := publisher.Publish(ctx, api.LogMessage{
			Context:   msgctx,
			Timestamp: tsp,
			Stream:    api.LogStream(stream),
			Data:      parts[1],
		}); err != nil {
			return errors.Wrap(err, "failed to publish log message")
		}
	}
}
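
// Worked example (illustrative) of the 8-byte header decoding above: a header
// of bytes {0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1C} read as a
// big-endian uint64 is 0x010000000000001C, so
//
//	stream = (header >> 56) & 0xFF          // 0x01, i.e. the stdout stream
//	size   = header &^ (uint64(0xFF) << 56) // 0x1C = 28 payload bytes
//
// The 7<<3 in the code is simply 56, the shift that isolates the most
// significant byte.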

// Close the runner and clean up any ephemeral resources.
func (r *controller) Close() error {
	select {
	case <-r.closed:
		return r.err
	default:
		if r.cancelPull != nil {
			r.cancelPull()
		}

		r.err = exec.ErrControllerClosed
		close(r.closed)
	}

	return nil
}

func (r *controller) matchevent(event events.Message) bool {
	if event.Type != events.ContainerEventType {
		return false
	}

	// TODO(stevvooe): Filter based on ID matching, in addition to name.

	// Make sure the events are for this container.
	if event.Actor.Attributes["name"] != r.adapter.container.name() {
		return false
	}

	return true
}

func (r *controller) checkClosed() error {
	select {
	case <-r.closed:
		return r.err
	default:
		return nil
	}
}

func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) {
	status := &api.ContainerStatus{
		ContainerID: ctnr.ID,
		PID:         int32(ctnr.State.Pid),
		ExitCode:    int32(ctnr.State.ExitCode),
	}

	return status, nil
}

func parsePortStatus(ctnr types.ContainerJSON) (*api.PortStatus, error) {
	status := &api.PortStatus{}

	if ctnr.NetworkSettings != nil && len(ctnr.NetworkSettings.Ports) > 0 {
		exposedPorts, err := parsePortMap(ctnr.NetworkSettings.Ports)
		if err != nil {
			return nil, err
		}
		status.Ports = exposedPorts
	}

	return status, nil
}

func parsePortMap(portMap nat.PortMap) ([]*api.PortConfig, error) {
	exposedPorts := make([]*api.PortConfig, 0, len(portMap))

	for portProtocol, mapping := range portMap {
		parts := strings.SplitN(string(portProtocol), "/", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid port mapping: %s", portProtocol)
		}

		port, err := strconv.ParseUint(parts[0], 10, 16)
		if err != nil {
			return nil, err
		}

		protocol := api.ProtocolTCP
		switch strings.ToLower(parts[1]) {
		case "tcp":
			protocol = api.ProtocolTCP
		case "udp":
			protocol = api.ProtocolUDP
		default:
			return nil, fmt.Errorf("invalid protocol: %s", parts[1])
		}

		for _, binding := range mapping {
			hostPort, err := strconv.ParseUint(binding.HostPort, 10, 16)
			if err != nil {
				return nil, err
			}

			// TODO(aluzzardi): We're losing the port `name` here since
			// there's no way to retrieve it back from the Engine.
			exposedPorts = append(exposedPorts, &api.PortConfig{
				PublishMode:   api.PublishModeHost,
				Protocol:      protocol,
				TargetPort:    uint32(port),
				PublishedPort: uint32(hostPort),
			})
		}
	}

	return exposedPorts, nil
}
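
// Example (illustrative, hypothetical values): a nat.PortMap entry such as
//
//	"8080/tcp": []nat.PortBinding{{HostIP: "0.0.0.0", HostPort: "32768"}}
//
// would be reported by parsePortMap as a single PortConfig with
// PublishMode=host, Protocol=tcp, TargetPort=8080, PublishedPort=32768; one
// PortConfig is emitted per host binding of each exposed port.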

type exitError struct {
	code  int
	cause error
}

func (e *exitError) Error() string {
	if e.cause != nil {
		return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause)
	}

	return fmt.Sprintf("task: non-zero exit (%v)", e.code)
}

func (e *exitError) ExitCode() int {
	return int(e.code)
}

func (e *exitError) Cause() error {
	return e.cause
}
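
// exitError carries the container's exit code back out of Wait: it provides
// the same ExitCode() int method that Wait itself asserts via exec.ExitCoder,
// so errors returned from Wait expose the exit code through that interface,
// with Cause preserving any underlying health-check or adapter error.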

// checkHealth blocks until an unhealthy container is detected or the context exits.
func (r *controller) checkHealth(ctx context.Context) error {
	eventq := r.adapter.events(ctx)

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-r.closed:
			return nil
		case event := <-eventq:
			if !r.matchevent(event) {
				continue
			}

			switch event.Action {
			case "health_status: unhealthy":
				return ErrContainerUnhealthy
			}
		}
	}
}