adapter.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. package container
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "runtime"
  10. "strings"
  11. "syscall"
  12. "time"
  13. "github.com/docker/distribution/reference"
  14. "github.com/docker/docker/api/types"
  15. "github.com/docker/docker/api/types/backend"
  16. containertypes "github.com/docker/docker/api/types/container"
  17. "github.com/docker/docker/api/types/events"
  18. containerpkg "github.com/docker/docker/container"
  19. "github.com/docker/docker/daemon/cluster/convert"
  20. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  21. "github.com/docker/docker/pkg/system"
  22. "github.com/docker/libnetwork"
  23. "github.com/docker/swarmkit/agent/exec"
  24. "github.com/docker/swarmkit/api"
  25. "github.com/docker/swarmkit/log"
  26. gogotypes "github.com/gogo/protobuf/types"
  27. "github.com/opencontainers/go-digest"
  28. "github.com/sirupsen/logrus"
  29. "golang.org/x/net/context"
  30. "golang.org/x/time/rate"
  31. )
  32. // containerAdapter conducts remote operations for a container. All calls
  33. // are mostly naked calls to the client API, seeded with information from
  34. // containerConfig.
  35. type containerAdapter struct {
  36. backend executorpkg.Backend
  37. container *containerConfig
  38. dependencies exec.DependencyGetter
  39. }
  40. func newContainerAdapter(b executorpkg.Backend, task *api.Task, node *api.NodeDescription, dependencies exec.DependencyGetter) (*containerAdapter, error) {
  41. ctnr, err := newContainerConfig(task, node)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return &containerAdapter{
  46. container: ctnr,
  47. backend: b,
  48. dependencies: dependencies,
  49. }, nil
  50. }
  51. func (c *containerAdapter) pullImage(ctx context.Context) error {
  52. spec := c.container.spec()
  53. // Skip pulling if the image is referenced by image ID.
  54. if _, err := digest.Parse(spec.Image); err == nil {
  55. return nil
  56. }
  57. // Skip pulling if the image is referenced by digest and already
  58. // exists locally.
  59. named, err := reference.ParseNormalizedNamed(spec.Image)
  60. if err == nil {
  61. if _, ok := named.(reference.Canonical); ok {
  62. _, err := c.backend.LookupImage(spec.Image)
  63. if err == nil {
  64. return nil
  65. }
  66. }
  67. }
  68. // if the image needs to be pulled, the auth config will be retrieved and updated
  69. var encodedAuthConfig string
  70. if spec.PullOptions != nil {
  71. encodedAuthConfig = spec.PullOptions.RegistryAuth
  72. }
  73. authConfig := &types.AuthConfig{}
  74. if encodedAuthConfig != "" {
  75. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
  76. logrus.Warnf("invalid authconfig: %v", err)
  77. }
  78. }
  79. pr, pw := io.Pipe()
  80. metaHeaders := map[string][]string{}
  81. go func() {
  82. // TODO @jhowardmsft LCOW Support: This will need revisiting as
  83. // the stack is built up to include LCOW support for swarm.
  84. platform := runtime.GOOS
  85. if system.LCOWSupported() {
  86. platform = "linux"
  87. }
  88. err := c.backend.PullImage(ctx, c.container.image(), "", platform, metaHeaders, authConfig, pw)
  89. pw.CloseWithError(err)
  90. }()
  91. dec := json.NewDecoder(pr)
  92. dec.UseNumber()
  93. m := map[string]interface{}{}
  94. spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)
  95. lastStatus := ""
  96. for {
  97. if err := dec.Decode(&m); err != nil {
  98. if err == io.EOF {
  99. break
  100. }
  101. return err
  102. }
  103. l := log.G(ctx)
  104. // limit pull progress logs unless the status changes
  105. if spamLimiter.Allow() || lastStatus != m["status"] {
  106. // if we have progress details, we have everything we need
  107. if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
  108. // first, log the image and status
  109. l = l.WithFields(logrus.Fields{
  110. "image": c.container.image(),
  111. "status": m["status"],
  112. })
  113. // then, if we have progress, log the progress
  114. if progress["current"] != nil && progress["total"] != nil {
  115. l = l.WithFields(logrus.Fields{
  116. "current": progress["current"],
  117. "total": progress["total"],
  118. })
  119. }
  120. }
  121. l.Debug("pull in progress")
  122. }
  123. // sometimes, we get no useful information at all, and add no fields
  124. if status, ok := m["status"].(string); ok {
  125. lastStatus = status
  126. }
  127. }
  128. // if the final stream object contained an error, return it
  129. if errMsg, ok := m["error"]; ok {
  130. return fmt.Errorf("%v", errMsg)
  131. }
  132. return nil
  133. }
  134. func (c *containerAdapter) createNetworks(ctx context.Context) error {
  135. for _, network := range c.container.networks() {
  136. ncr, err := c.container.networkCreateRequest(network)
  137. if err != nil {
  138. return err
  139. }
  140. if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
  141. if _, ok := err.(libnetwork.NetworkNameError); ok {
  142. continue
  143. }
  144. return err
  145. }
  146. }
  147. return nil
  148. }
  149. func (c *containerAdapter) removeNetworks(ctx context.Context) error {
  150. for _, nid := range c.container.networks() {
  151. if err := c.backend.DeleteManagedNetwork(nid); err != nil {
  152. switch err.(type) {
  153. case *libnetwork.ActiveEndpointsError:
  154. continue
  155. case libnetwork.ErrNoSuchNetwork:
  156. continue
  157. default:
  158. log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
  159. return err
  160. }
  161. }
  162. }
  163. return nil
  164. }
  165. func (c *containerAdapter) networkAttach(ctx context.Context) error {
  166. config := c.container.createNetworkingConfig(c.backend)
  167. var (
  168. networkName string
  169. networkID string
  170. )
  171. if config != nil {
  172. for n, epConfig := range config.EndpointsConfig {
  173. networkName = n
  174. networkID = epConfig.NetworkID
  175. break
  176. }
  177. }
  178. return c.backend.UpdateAttachment(networkName, networkID, c.container.networkAttachmentContainerID(), config)
  179. }
  180. func (c *containerAdapter) waitForDetach(ctx context.Context) error {
  181. config := c.container.createNetworkingConfig(c.backend)
  182. var (
  183. networkName string
  184. networkID string
  185. )
  186. if config != nil {
  187. for n, epConfig := range config.EndpointsConfig {
  188. networkName = n
  189. networkID = epConfig.NetworkID
  190. break
  191. }
  192. }
  193. return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.networkAttachmentContainerID())
  194. }
  195. func (c *containerAdapter) create(ctx context.Context) error {
  196. var cr containertypes.ContainerCreateCreatedBody
  197. var err error
  198. if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
  199. Name: c.container.name(),
  200. Config: c.container.config(),
  201. HostConfig: c.container.hostConfig(),
  202. // Use the first network in container create
  203. NetworkingConfig: c.container.createNetworkingConfig(c.backend),
  204. }); err != nil {
  205. return err
  206. }
  207. // Docker daemon currently doesn't support multiple networks in container create
  208. // Connect to all other networks
  209. nc := c.container.connectNetworkingConfig(c.backend)
  210. if nc != nil {
  211. for n, ep := range nc.EndpointsConfig {
  212. if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
  213. return err
  214. }
  215. }
  216. }
  217. container := c.container.task.Spec.GetContainer()
  218. if container == nil {
  219. return errors.New("unable to get container from task spec")
  220. }
  221. if err := c.backend.SetContainerDependencyStore(cr.ID, c.dependencies); err != nil {
  222. return err
  223. }
  224. // configure secrets
  225. secretRefs := convert.SecretReferencesFromGRPC(container.Secrets)
  226. if err := c.backend.SetContainerSecretReferences(cr.ID, secretRefs); err != nil {
  227. return err
  228. }
  229. configRefs := convert.ConfigReferencesFromGRPC(container.Configs)
  230. if err := c.backend.SetContainerConfigReferences(cr.ID, configRefs); err != nil {
  231. return err
  232. }
  233. if err := c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
  234. return err
  235. }
  236. return nil
  237. }
  238. // checkMounts ensures that the provided mounts won't have any host-specific
  239. // problems at start up. For example, we disallow bind mounts without an
  240. // existing path, which slightly different from the container API.
  241. func (c *containerAdapter) checkMounts() error {
  242. spec := c.container.spec()
  243. for _, mount := range spec.Mounts {
  244. switch mount.Type {
  245. case api.MountTypeBind:
  246. if _, err := os.Stat(mount.Source); os.IsNotExist(err) {
  247. return fmt.Errorf("invalid bind mount source, source path not found: %s", mount.Source)
  248. }
  249. }
  250. }
  251. return nil
  252. }
  253. func (c *containerAdapter) start(ctx context.Context) error {
  254. if err := c.checkMounts(); err != nil {
  255. return err
  256. }
  257. return c.backend.ContainerStart(c.container.name(), nil, "", "")
  258. }
  259. func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
  260. cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
  261. if ctx.Err() != nil {
  262. return types.ContainerJSON{}, ctx.Err()
  263. }
  264. if err != nil {
  265. return types.ContainerJSON{}, err
  266. }
  267. return *cs, nil
  268. }
  269. // events issues a call to the events API and returns a channel with all
  270. // events. The stream of events can be shutdown by cancelling the context.
  271. func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
  272. log.G(ctx).Debugf("waiting on events")
  273. buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
  274. eventsq := make(chan events.Message, len(buffer))
  275. for _, event := range buffer {
  276. eventsq <- event
  277. }
  278. go func() {
  279. defer c.backend.UnsubscribeFromEvents(l)
  280. for {
  281. select {
  282. case ev := <-l:
  283. jev, ok := ev.(events.Message)
  284. if !ok {
  285. log.G(ctx).Warnf("unexpected event message: %q", ev)
  286. continue
  287. }
  288. select {
  289. case eventsq <- jev:
  290. case <-ctx.Done():
  291. return
  292. }
  293. case <-ctx.Done():
  294. return
  295. }
  296. }
  297. }()
  298. return eventsq
  299. }
  300. func (c *containerAdapter) wait(ctx context.Context) (<-chan containerpkg.StateStatus, error) {
  301. return c.backend.ContainerWait(ctx, c.container.nameOrID(), containerpkg.WaitConditionNotRunning)
  302. }
  303. func (c *containerAdapter) shutdown(ctx context.Context) error {
  304. // Default stop grace period to nil (daemon will use the stopTimeout of the container)
  305. var stopgrace *int
  306. spec := c.container.spec()
  307. if spec.StopGracePeriod != nil {
  308. stopgraceValue := int(spec.StopGracePeriod.Seconds)
  309. stopgrace = &stopgraceValue
  310. }
  311. return c.backend.ContainerStop(c.container.name(), stopgrace)
  312. }
  313. func (c *containerAdapter) terminate(ctx context.Context) error {
  314. return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
  315. }
  316. func (c *containerAdapter) remove(ctx context.Context) error {
  317. return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
  318. RemoveVolume: true,
  319. ForceRemove: true,
  320. })
  321. }
  322. func (c *containerAdapter) createVolumes(ctx context.Context) error {
  323. // Create plugin volumes that are embedded inside a Mount
  324. for _, mount := range c.container.task.Spec.GetContainer().Mounts {
  325. if mount.Type != api.MountTypeVolume {
  326. continue
  327. }
  328. if mount.VolumeOptions == nil {
  329. continue
  330. }
  331. if mount.VolumeOptions.DriverConfig == nil {
  332. continue
  333. }
  334. req := c.container.volumeCreateRequest(&mount)
  335. // Check if this volume exists on the engine
  336. if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
  337. // TODO(amitshukla): Today, volume create through the engine api does not return an error
  338. // when the named volume with the same parameters already exists.
  339. // It returns an error if the driver name is different - that is a valid error
  340. return err
  341. }
  342. }
  343. return nil
  344. }
  345. func (c *containerAdapter) activateServiceBinding() error {
  346. return c.backend.ActivateContainerServiceBinding(c.container.name())
  347. }
  348. func (c *containerAdapter) deactivateServiceBinding() error {
  349. return c.backend.DeactivateContainerServiceBinding(c.container.name())
  350. }
  351. func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (<-chan *backend.LogMessage, error) {
  352. apiOptions := &types.ContainerLogsOptions{
  353. Follow: options.Follow,
  354. // Always say yes to Timestamps and Details. we make the decision
  355. // of whether to return these to the user or not way higher up the
  356. // stack.
  357. Timestamps: true,
  358. Details: true,
  359. }
  360. if options.Since != nil {
  361. since, err := gogotypes.TimestampFromProto(options.Since)
  362. if err != nil {
  363. return nil, err
  364. }
  365. // print since as this formatted string because the docker container
  366. // logs interface expects it like this.
  367. // see github.com/docker/docker/api/types/time.ParseTimestamps
  368. apiOptions.Since = fmt.Sprintf("%d.%09d", since.Unix(), int64(since.Nanosecond()))
  369. }
  370. if options.Tail < 0 {
  371. // See protobuf documentation for details of how this works.
  372. apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
  373. } else if options.Tail > 0 {
  374. return nil, errors.New("tail relative to start of logs not supported via docker API")
  375. }
  376. if len(options.Streams) == 0 {
  377. // empty == all
  378. apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
  379. } else {
  380. for _, stream := range options.Streams {
  381. switch stream {
  382. case api.LogStreamStdout:
  383. apiOptions.ShowStdout = true
  384. case api.LogStreamStderr:
  385. apiOptions.ShowStderr = true
  386. }
  387. }
  388. }
  389. msgs, _, err := c.backend.ContainerLogs(ctx, c.container.name(), apiOptions)
  390. if err != nil {
  391. return nil, err
  392. }
  393. return msgs, nil
  394. }
  395. // todo: typed/wrapped errors
  396. func isContainerCreateNameConflict(err error) bool {
  397. return strings.Contains(err.Error(), "Conflict. The name")
  398. }
  399. func isUnknownContainer(err error) bool {
  400. return strings.Contains(err.Error(), "No such container:")
  401. }
  402. func isStoppedContainer(err error) bool {
  403. return strings.Contains(err.Error(), "is already stopped")
  404. }