package container // import "github.com/docker/docker/daemon/cluster/executor/container"

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
	"syscall"
	"time"

	"github.com/docker/distribution/reference"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	containertypes "github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/events"
	imagetypes "github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/registry"
	containerpkg "github.com/docker/docker/container"
	"github.com/docker/docker/daemon"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/libnetwork"
	volumeopts "github.com/docker/docker/volume/service/opts"
	gogotypes "github.com/gogo/protobuf/types"
	"github.com/moby/swarmkit/v2/agent/exec"
	"github.com/moby/swarmkit/v2/api"
	"github.com/moby/swarmkit/v2/log"
	"github.com/opencontainers/go-digest"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/time/rate"
)

// nodeAttachmentReadyInterval is the interval at which to poll for a node's
// network attachments to become ready.
const nodeAttachmentReadyInterval = 100 * time.Millisecond

// containerAdapter conducts remote operations for a container. Calls are
// mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
	backend       executorpkg.Backend
	imageBackend  executorpkg.ImageBackend
	volumeBackend executorpkg.VolumeBackend
	container     *containerConfig
	dependencies  exec.DependencyGetter
}

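// newContainerAdapter builds the containerConfig for the given task and node
// and returns a containerAdapter bound to the provided backends and
// dependency getter.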
func newContainerAdapter(b executorpkg.Backend, i executorpkg.ImageBackend, v executorpkg.VolumeBackend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
	ctnr, err := newContainerConfig(task, node)
	if err != nil {
		return nil, err
	}

	return &containerAdapter{
		container:     ctnr,
		backend:       b,
		imageBackend:  i,
		volumeBackend: v,
		dependencies:  dependencies,
	}, nil
}

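// pullImage pulls the image for the task's container spec. The pull is
// skipped when the image is referenced by ID, or referenced by digest and
// already present locally. Pull progress is logged, rate-limited to roughly
// one entry per second unless the reported status changes.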
func (c *containerAdapter) pullImage(ctx context.Context) error {
	spec := c.container.spec()

	// Skip pulling if the image is referenced by image ID.
	if _, err := digest.Parse(spec.Image); err == nil {
		return nil
	}

	// Skip pulling if the image is referenced by digest and already
	// exists locally.
	named, err := reference.ParseNormalizedNamed(spec.Image)
	if err == nil {
		if _, ok := named.(reference.Canonical); ok {
			_, err := c.imageBackend.GetImage(ctx, spec.Image, imagetypes.GetImageOpts{})
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return err
			}
			if err == nil {
				return nil
			}
		}
	}

	// if the image needs to be pulled, the auth config will be retrieved and updated
	var encodedAuthConfig string
	if spec.PullOptions != nil {
		encodedAuthConfig = spec.PullOptions.RegistryAuth
	}

	authConfig := &registry.AuthConfig{}
	if encodedAuthConfig != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
			log.G(ctx).Warnf("invalid authconfig: %v", err)
		}
	}

	pr, pw := io.Pipe()
	metaHeaders := map[string][]string{}
	go func() {
		// TODO LCOW Support: This will need revisiting as
		// the stack is built up to include LCOW support for swarm.
		err := c.imageBackend.PullImage(ctx, c.container.image(), "", nil, metaHeaders, authConfig, pw)
		pw.CloseWithError(err)
	}()

	dec := json.NewDecoder(pr)
	dec.UseNumber()
	m := map[string]interface{}{}
	spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)

	lastStatus := ""
	for {
		if err := dec.Decode(&m); err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		l := log.G(ctx)
		// limit pull progress logs unless the status changes
		if spamLimiter.Allow() || lastStatus != m["status"] {
			// if we have progress details, we have everything we need
			if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
				// first, log the image and status
				l = l.WithFields(logrus.Fields{
					"image":  c.container.image(),
					"status": m["status"],
				})
				// then, if we have progress, log the progress
				if progress["current"] != nil && progress["total"] != nil {
					l = l.WithFields(logrus.Fields{
						"current": progress["current"],
						"total":   progress["total"],
					})
				}
			}
			l.Debug("pull in progress")
		}
		// sometimes, we get no useful information at all, and add no fields
		if status, ok := m["status"].(string); ok {
			lastStatus = status
		}
	}

	// if the final stream object contained an error, return it
	if errMsg, ok := m["error"]; ok {
		return fmt.Errorf("%v", errMsg)
	}
	return nil
}

// waitNodeAttachments validates that NetworkAttachments exist on this node
// for every network in use by this task. It blocks until the network
// attachments are ready, or the context times out. If it returns nil, then
// the node's network attachments are all there.
func (c *containerAdapter) waitNodeAttachments(ctx context.Context) error {
	// to do this, we're going to get the attachment store and try getting the
	// IP address for each network. if any network comes back not existing,
	// we'll wait and try again.
	attachmentStore := c.backend.GetAttachmentStore()
	if attachmentStore == nil {
		return fmt.Errorf("error getting attachment store")
	}

	// essentially, we're long-polling here. this is really sub-optimal, but a
	// better solution based off signaling channels would require a more
	// substantial rearchitecture and probably not be worth our time in terms
	// of performance gains.
	poll := time.NewTicker(nodeAttachmentReadyInterval)
	defer poll.Stop()

	for {
		// set a flag ready to true. if we try to get a network IP that doesn't
		// exist yet, we will set this flag to "false"
		ready := true
		for _, attachment := range c.container.networksAttachments {
			// we only need node attachments (IP address) for overlay networks
			// TODO(dperny): unsure if this will work with other network
			// drivers, but i also don't think other network drivers use the
			// node attachment IP address.
			if attachment.Network.DriverState.Name == "overlay" {
				if _, exists := attachmentStore.GetIPForNetwork(attachment.Network.ID); !exists {
					ready = false
				}
			}
		}

		// if everything is ready here, then we can just return no error
		if ready {
			return nil
		}

		// otherwise, try polling again, or wait for context canceled.
		select {
		case <-ctx.Done():
			return fmt.Errorf("node is missing network attachments, ip addresses may be exhausted")
		case <-poll.C:
		}
	}
}

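// createNetworks creates the managed networks for the task's network
// attachments. Networks that already exist, and predefined networks, are
// skipped rather than treated as errors.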
func (c *containerAdapter) createNetworks(ctx context.Context) error {
	for name := range c.container.networksAttachments {
		ncr, err := c.container.networkCreateRequest(name)
		if err != nil {
			return err
		}

		if err := c.backend.CreateManagedNetwork(ncr); err != nil { // TODO: name missing
			if _, ok := err.(libnetwork.NetworkNameError); ok {
				continue
			}
			// We continue if CreateManagedNetwork returns a PredefinedNetworkError.
			// Other callers can still treat it as an error.
			if _, ok := err.(daemon.PredefinedNetworkError); ok {
				continue
			}
			return err
		}
	}

	return nil
}

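// removeNetworks deletes the managed networks for the task's network
// attachments. Networks that still have active endpoints, or that no longer
// exist, are skipped.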
func (c *containerAdapter) removeNetworks(ctx context.Context) error {
	var (
		activeEndpointsError *libnetwork.ActiveEndpointsError
		errNoSuchNetwork     libnetwork.ErrNoSuchNetwork
	)

	for name, v := range c.container.networksAttachments {
		if err := c.backend.DeleteManagedNetwork(v.Network.ID); err != nil {
			switch {
			case errors.As(err, &activeEndpointsError):
				continue
			case errors.As(err, &errNoSuchNetwork):
				continue
			default:
				log.G(ctx).Errorf("network %s remove failed: %v", name, err)
				return err
			}
		}
	}

	return nil
}

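// networkAttach updates the network attachment for the attachment container,
// using the first network found in the container's networking config.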
func (c *containerAdapter) networkAttach(ctx context.Context) error {
	config := c.container.createNetworkingConfig(c.backend)

	var (
		networkName string
		networkID   string
	)

	if config != nil {
		for n, epConfig := range config.EndpointsConfig {
			networkName = n
			networkID = epConfig.NetworkID
			break
		}
	}

	return c.backend.UpdateAttachment(networkName, networkID, c.container.networkAttachmentContainerID(), config)
}

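// waitForDetach blocks until the backend reports that the attachment
// container has detached from the network, or the context is canceled.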
func (c *containerAdapter) waitForDetach(ctx context.Context) error {
	config := c.container.createNetworkingConfig(c.backend)

	var (
		networkName string
		networkID   string
	)

	if config != nil {
		for n, epConfig := range config.EndpointsConfig {
			networkName = n
			networkID = epConfig.NetworkID
			break
		}
	}

	return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.networkAttachmentContainerID())
}

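// create creates the managed container for the task, connects it to any
// additional networks (the create API accepts a single network), and then
// wires up the container's dependency store and its secret, config, and
// service-config references.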
func (c *containerAdapter) create(ctx context.Context) error {
	var cr containertypes.CreateResponse
	var err error
	if cr, err = c.backend.CreateManagedContainer(ctx, types.ContainerCreateConfig{
		Name:       c.container.name(),
		Config:     c.container.config(),
		HostConfig: c.container.hostConfig(c.dependencies.Volumes()),
		// Use the first network in container create
		NetworkingConfig: c.container.createNetworkingConfig(c.backend),
	}); err != nil {
		return err
	}

	// Docker daemon currently doesn't support multiple networks in container create
	// Connect to all other networks
	nc := c.container.connectNetworkingConfig(c.backend)
	if nc != nil {
		for n, ep := range nc.EndpointsConfig {
			if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
				return err
			}
		}
	}

	container := c.container.task.Spec.GetContainer()
	if container == nil {
		return errors.New("unable to get container from task spec")
	}

	if err := c.backend.SetContainerDependencyStore(cr.ID, c.dependencies); err != nil {
		return err
	}

	// configure secrets
	secretRefs := convert.SecretReferencesFromGRPC(container.Secrets)
	if err := c.backend.SetContainerSecretReferences(cr.ID, secretRefs); err != nil {
		return err
	}

	configRefs := convert.ConfigReferencesFromGRPC(container.Configs)
	if err := c.backend.SetContainerConfigReferences(cr.ID, configRefs); err != nil {
		return err
	}

	return c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig())
}

// checkMounts ensures that the provided mounts won't have any host-specific
// problems at start up. For example, we disallow bind mounts without an
// existing path, which is slightly different from the container API.
func (c *containerAdapter) checkMounts() error {
	spec := c.container.spec()
	for _, mount := range spec.Mounts {
		switch mount.Type {
		case api.MountTypeBind:
			if _, err := os.Stat(mount.Source); os.IsNotExist(err) {
				return fmt.Errorf("invalid bind mount source, source path not found: %s", mount.Source)
			}
		}
	}

	return nil
}

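// start checks the task's mounts and then starts the container.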
func (c *containerAdapter) start(ctx context.Context) error {
	if err := c.checkMounts(); err != nil {
		return err
	}

	return c.backend.ContainerStart(ctx, c.container.name(), nil, "", "")
}

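// inspect returns the current state of the task's container. If the context
// was canceled during the call, the context error is returned in preference
// to any inspect error.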
func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
	cs, err := c.backend.ContainerInspectCurrent(ctx, c.container.name(), false)
	if ctx.Err() != nil {
		return types.ContainerJSON{}, ctx.Err()
	}
	if err != nil {
		return types.ContainerJSON{}, err
	}
	return *cs, nil
}

// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shut down by cancelling the context.
func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
	log.G(ctx).Debugf("waiting on events")
	buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
	eventsq := make(chan events.Message, len(buffer))

	for _, event := range buffer {
		eventsq <- event
	}

	go func() {
		defer c.backend.UnsubscribeFromEvents(l)

		for {
			select {
			case ev := <-l:
				jev, ok := ev.(events.Message)
				if !ok {
					log.G(ctx).Warnf("unexpected event message: %q", ev)
					continue
				}
				select {
				case eventsq <- jev:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return eventsq
}

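// wait returns a channel that receives the container's state once the
// container is no longer running.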
func (c *containerAdapter) wait(ctx context.Context) (<-chan containerpkg.StateStatus, error) {
	return c.backend.ContainerWait(ctx, c.container.nameOrID(), containerpkg.WaitConditionNotRunning)
}

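// shutdown stops the container. If the task spec sets a stop grace period,
// it is used as the stop timeout; otherwise the daemon falls back to the
// container's own stop timeout.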
func (c *containerAdapter) shutdown(ctx context.Context) error {
	options := containertypes.StopOptions{}
	// Default stop grace period to nil (daemon will use the stopTimeout of the container)
	if spec := c.container.spec(); spec.StopGracePeriod != nil {
		timeout := int(spec.StopGracePeriod.Seconds)
		options.Timeout = &timeout
	}
	return c.backend.ContainerStop(ctx, c.container.name(), options)
}

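// terminate kills the container with SIGKILL.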
func (c *containerAdapter) terminate(ctx context.Context) error {
	return c.backend.ContainerKill(c.container.name(), syscall.SIGKILL.String())
}

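// remove force-removes the container along with its volumes.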
func (c *containerAdapter) remove(ctx context.Context) error {
	return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
		RemoveVolume: true,
		ForceRemove:  true,
	})
}

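// createVolumes creates the plugin volumes embedded inside the task's
// mounts. Volume mounts without volume options or a driver config are
// skipped.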
func (c *containerAdapter) createVolumes(ctx context.Context) error {
	// Create plugin volumes that are embedded inside a Mount
	for _, mount := range c.container.task.Spec.GetContainer().Mounts {
		mount := mount
		if mount.Type != api.MountTypeVolume {
			continue
		}

		if mount.VolumeOptions == nil {
			continue
		}

		if mount.VolumeOptions.DriverConfig == nil {
			continue
		}

		req := c.container.volumeCreateRequest(&mount)

		// Check if this volume exists on the engine
		if _, err := c.volumeBackend.Create(ctx, req.Name, req.Driver,
			volumeopts.WithCreateOptions(req.DriverOpts),
			volumeopts.WithCreateLabels(req.Labels),
		); err != nil {
			// TODO(amitshukla): Today, volume create through the engine api does not return an error
			// when the named volume with the same parameters already exists.
			// It returns an error if the driver name is different - that is a valid error
			return err
		}
	}

	return nil
}

// waitClusterVolumes blocks until the VolumeGetter returns a path for each
// cluster volume in use by this task.
func (c *containerAdapter) waitClusterVolumes(ctx context.Context) error {
	for _, attached := range c.container.task.Volumes {
		// for every attachment, try until we succeed or until the context
		// is canceled.
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				// continue through the code.
			}
			path, err := c.dependencies.Volumes().Get(attached.ID)
			if err == nil && path != "" {
				// break out of the inner-most loop
				break
			}
		}
	}
	log.G(ctx).Debug("volumes ready")
	return nil
}

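// activateServiceBinding enables the container's service binding on the
// backend.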
func (c *containerAdapter) activateServiceBinding() error {
	return c.backend.ActivateContainerServiceBinding(c.container.name())
}

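// deactivateServiceBinding disables the container's service binding on the
// backend.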
func (c *containerAdapter) deactivateServiceBinding() error {
	return c.backend.DeactivateContainerServiceBinding(c.container.name())
}

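// logs subscribes to the container's log stream, translating the swarm log
// subscription options into the daemon's container-logs options.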
func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
	apiOptions := &types.ContainerLogsOptions{
		Follow: options.Follow,

		// Always say yes to Timestamps and Details. we make the decision
		// of whether to return these to the user or not way higher up the
		// stack.
		Timestamps: true,
		Details:    true,
	}

	if options.Since != nil {
		since, err := gogotypes.TimestampFromProto(options.Since)
		if err != nil {
			return nil, err
		}
		// print since as this formatted string because the docker container
		// logs interface expects it like this.
		// see github.com/docker/docker/api/types/time.ParseTimestamps
		apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond()))
	}

	if options.Tail < 0 {
		// See protobuf documentation for details of how this works.
		apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
	} else if options.Tail > 0 {
		return nil, errors.New("tail relative to start of logs not supported via docker API")
	}

	if len(options.Streams) == 0 {
		// empty == all
		apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
	} else {
		for _, stream := range options.Streams {
			switch stream {
			case api.LogStreamStdout:
				apiOptions.ShowStdout = true
			case api.LogStreamStderr:
				apiOptions.ShowStderr = true
			}
		}
	}
	msgs, _, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
	if err != nil {
		return nil, err
	}
	return msgs, nil
}

// TODO: typed/wrapped errors
func isContainerCreateNameConflict(err error) bool {
	return strings.Contains(err.Error(), "Conflict. The name")
}

func isUnknownContainer(err error) bool {
	return strings.Contains(err.Error(), "No such container:")
}

func isStoppedContainer(err error) bool {
	return strings.Contains(err.Error(), "is already stopped")
}