adapter.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. package container
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strings"
  10. "syscall"
  11. "time"
  12. "github.com/Sirupsen/logrus"
  13. "github.com/docker/distribution/reference"
  14. "github.com/docker/docker/api/types"
  15. "github.com/docker/docker/api/types/backend"
  16. containertypes "github.com/docker/docker/api/types/container"
  17. "github.com/docker/docker/api/types/events"
  18. "github.com/docker/docker/daemon/cluster/convert"
  19. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  20. "github.com/docker/libnetwork"
  21. "github.com/docker/swarmkit/agent/exec"
  22. "github.com/docker/swarmkit/api"
  23. "github.com/docker/swarmkit/log"
  24. gogotypes "github.com/gogo/protobuf/types"
  25. "github.com/opencontainers/go-digest"
  26. "golang.org/x/net/context"
  27. "golang.org/x/time/rate"
  28. )
  29. // containerAdapter conducts remote operations for a container. All calls
  30. // are mostly naked calls to the client API, seeded with information from
  31. // containerConfig.
  32. type containerAdapter struct {
  33. backend executorpkg.Backend
  34. container *containerConfig
  35. secrets exec.SecretGetter
  36. }
  37. func newContainerAdapter(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*containerAdapter, error) {
  38. ctnr, err := newContainerConfig(task)
  39. if err != nil {
  40. return nil, err
  41. }
  42. return &containerAdapter{
  43. container: ctnr,
  44. backend: b,
  45. secrets: secrets,
  46. }, nil
  47. }
  48. func (c *containerAdapter) pullImage(ctx context.Context) error {
  49. spec := c.container.spec()
  50. // Skip pulling if the image is referenced by image ID.
  51. if _, err := digest.Parse(spec.Image); err == nil {
  52. return nil
  53. }
  54. // Skip pulling if the image is referenced by digest and already
  55. // exists locally.
  56. named, err := reference.ParseNormalizedNamed(spec.Image)
  57. if err == nil {
  58. if _, ok := named.(reference.Canonical); ok {
  59. _, err := c.backend.LookupImage(spec.Image)
  60. if err == nil {
  61. return nil
  62. }
  63. }
  64. }
  65. // if the image needs to be pulled, the auth config will be retrieved and updated
  66. var encodedAuthConfig string
  67. if spec.PullOptions != nil {
  68. encodedAuthConfig = spec.PullOptions.RegistryAuth
  69. }
  70. authConfig := &types.AuthConfig{}
  71. if encodedAuthConfig != "" {
  72. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
  73. logrus.Warnf("invalid authconfig: %v", err)
  74. }
  75. }
  76. pr, pw := io.Pipe()
  77. metaHeaders := map[string][]string{}
  78. go func() {
  79. err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
  80. pw.CloseWithError(err)
  81. }()
  82. dec := json.NewDecoder(pr)
  83. dec.UseNumber()
  84. m := map[string]interface{}{}
  85. spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)
  86. lastStatus := ""
  87. for {
  88. if err := dec.Decode(&m); err != nil {
  89. if err == io.EOF {
  90. break
  91. }
  92. return err
  93. }
  94. l := log.G(ctx)
  95. // limit pull progress logs unless the status changes
  96. if spamLimiter.Allow() || lastStatus != m["status"] {
  97. // if we have progress details, we have everything we need
  98. if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
  99. // first, log the image and status
  100. l = l.WithFields(logrus.Fields{
  101. "image": c.container.image(),
  102. "status": m["status"],
  103. })
  104. // then, if we have progress, log the progress
  105. if progress["current"] != nil && progress["total"] != nil {
  106. l = l.WithFields(logrus.Fields{
  107. "current": progress["current"],
  108. "total": progress["total"],
  109. })
  110. }
  111. }
  112. l.Debug("pull in progress")
  113. }
  114. // sometimes, we get no useful information at all, and add no fields
  115. if status, ok := m["status"].(string); ok {
  116. lastStatus = status
  117. }
  118. }
  119. // if the final stream object contained an error, return it
  120. if errMsg, ok := m["error"]; ok {
  121. return fmt.Errorf("%v", errMsg)
  122. }
  123. return nil
  124. }
  125. func (c *containerAdapter) createNetworks(ctx context.Context) error {
  126. for _, network := range c.container.networks() {
  127. ncr, err := c.container.networkCreateRequest(network)
  128. if err != nil {
  129. return err
  130. }
  131. if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
  132. if _, ok := err.(libnetwork.NetworkNameError); ok {
  133. continue
  134. }
  135. return err
  136. }
  137. }
  138. return nil
  139. }
  140. func (c *containerAdapter) removeNetworks(ctx context.Context) error {
  141. for _, nid := range c.container.networks() {
  142. if err := c.backend.DeleteManagedNetwork(nid); err != nil {
  143. switch err.(type) {
  144. case *libnetwork.ActiveEndpointsError:
  145. continue
  146. case libnetwork.ErrNoSuchNetwork:
  147. continue
  148. default:
  149. log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
  150. return err
  151. }
  152. }
  153. }
  154. return nil
  155. }
  156. func (c *containerAdapter) networkAttach(ctx context.Context) error {
  157. config := c.container.createNetworkingConfig()
  158. var (
  159. networkName string
  160. networkID string
  161. )
  162. if config != nil {
  163. for n, epConfig := range config.EndpointsConfig {
  164. networkName = n
  165. networkID = epConfig.NetworkID
  166. break
  167. }
  168. }
  169. return c.backend.UpdateAttachment(networkName, networkID, c.container.id(), config)
  170. }
  171. func (c *containerAdapter) waitForDetach(ctx context.Context) error {
  172. config := c.container.createNetworkingConfig()
  173. var (
  174. networkName string
  175. networkID string
  176. )
  177. if config != nil {
  178. for n, epConfig := range config.EndpointsConfig {
  179. networkName = n
  180. networkID = epConfig.NetworkID
  181. break
  182. }
  183. }
  184. return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.id())
  185. }
  186. func (c *containerAdapter) create(ctx context.Context) error {
  187. var cr containertypes.ContainerCreateCreatedBody
  188. var err error
  189. if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
  190. Name: c.container.name(),
  191. Config: c.container.config(),
  192. HostConfig: c.container.hostConfig(),
  193. // Use the first network in container create
  194. NetworkingConfig: c.container.createNetworkingConfig(),
  195. }); err != nil {
  196. return err
  197. }
  198. // Docker daemon currently doesn't support multiple networks in container create
  199. // Connect to all other networks
  200. nc := c.container.connectNetworkingConfig()
  201. if nc != nil {
  202. for n, ep := range nc.EndpointsConfig {
  203. if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
  204. return err
  205. }
  206. }
  207. }
  208. container := c.container.task.Spec.GetContainer()
  209. if container == nil {
  210. return errors.New("unable to get container from task spec")
  211. }
  212. // configure secrets
  213. if err := c.backend.SetContainerSecretStore(cr.ID, c.secrets); err != nil {
  214. return err
  215. }
  216. refs := convert.SecretReferencesFromGRPC(container.Secrets)
  217. if err := c.backend.SetContainerSecretReferences(cr.ID, refs); err != nil {
  218. return err
  219. }
  220. if err := c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
  221. return err
  222. }
  223. return nil
  224. }
  225. // checkMounts ensures that the provided mounts won't have any host-specific
  226. // problems at start up. For example, we disallow bind mounts without an
  227. // existing path, which slightly different from the container API.
  228. func (c *containerAdapter) checkMounts() error {
  229. spec := c.container.spec()
  230. for _, mount := range spec.Mounts {
  231. switch mount.Type {
  232. case api.MountTypeBind:
  233. if _, err := os.Stat(mount.Source); os.IsNotExist(err) {
  234. return fmt.Errorf("invalid bind mount source, source path not found: %s", mount.Source)
  235. }
  236. }
  237. }
  238. return nil
  239. }
  240. func (c *containerAdapter) start(ctx context.Context) error {
  241. if err := c.checkMounts(); err != nil {
  242. return err
  243. }
  244. return c.backend.ContainerStart(c.container.name(), nil, "", "")
  245. }
  246. func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
  247. cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
  248. if ctx.Err() != nil {
  249. return types.ContainerJSON{}, ctx.Err()
  250. }
  251. if err != nil {
  252. return types.ContainerJSON{}, err
  253. }
  254. return *cs, nil
  255. }
  256. // events issues a call to the events API and returns a channel with all
  257. // events. The stream of events can be shutdown by cancelling the context.
  258. func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
  259. log.G(ctx).Debugf("waiting on events")
  260. buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
  261. eventsq := make(chan events.Message, len(buffer))
  262. for _, event := range buffer {
  263. eventsq <- event
  264. }
  265. go func() {
  266. defer c.backend.UnsubscribeFromEvents(l)
  267. for {
  268. select {
  269. case ev := <-l:
  270. jev, ok := ev.(events.Message)
  271. if !ok {
  272. log.G(ctx).Warnf("unexpected event message: %q", ev)
  273. continue
  274. }
  275. select {
  276. case eventsq <- jev:
  277. case <-ctx.Done():
  278. return
  279. }
  280. case <-ctx.Done():
  281. return
  282. }
  283. }
  284. }()
  285. return eventsq
  286. }
  287. func (c *containerAdapter) wait(ctx context.Context) error {
  288. return c.backend.ContainerWaitWithContext(ctx, c.container.nameOrID())
  289. }
  290. func (c *containerAdapter) shutdown(ctx context.Context) error {
  291. // Default stop grace period to nil (daemon will use the stopTimeout of the container)
  292. var stopgrace *int
  293. spec := c.container.spec()
  294. if spec.StopGracePeriod != nil {
  295. stopgraceValue := int(spec.StopGracePeriod.Seconds)
  296. stopgrace = &stopgraceValue
  297. }
  298. return c.backend.ContainerStop(c.container.name(), stopgrace)
  299. }
  300. func (c *containerAdapter) terminate(ctx context.Context) error {
  301. return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
  302. }
  303. func (c *containerAdapter) remove(ctx context.Context) error {
  304. return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
  305. RemoveVolume: true,
  306. ForceRemove: true,
  307. })
  308. }
  309. func (c *containerAdapter) createVolumes(ctx context.Context) error {
  310. // Create plugin volumes that are embedded inside a Mount
  311. for _, mount := range c.container.task.Spec.GetContainer().Mounts {
  312. if mount.Type != api.MountTypeVolume {
  313. continue
  314. }
  315. if mount.VolumeOptions == nil {
  316. continue
  317. }
  318. if mount.VolumeOptions.DriverConfig == nil {
  319. continue
  320. }
  321. req := c.container.volumeCreateRequest(&mount)
  322. // Check if this volume exists on the engine
  323. if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
  324. // TODO(amitshukla): Today, volume create through the engine api does not return an error
  325. // when the named volume with the same parameters already exists.
  326. // It returns an error if the driver name is different - that is a valid error
  327. return err
  328. }
  329. }
  330. return nil
  331. }
  332. func (c *containerAdapter) activateServiceBinding() error {
  333. return c.backend.ActivateContainerServiceBinding(c.container.name())
  334. }
  335. func (c *containerAdapter) deactivateServiceBinding() error {
  336. return c.backend.DeactivateContainerServiceBinding(c.container.name())
  337. }
  338. func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
  339. reader, writer := io.Pipe()
  340. apiOptions := &backend.ContainerLogsConfig{
  341. ContainerLogsOptions: types.ContainerLogsOptions{
  342. Follow: options.Follow,
  343. // TODO(stevvooe): Parse timestamp out of message. This
  344. // absolutely needs to be done before going to production with
  345. // this, at it is completely redundant.
  346. Timestamps: true,
  347. Details: false, // no clue what to do with this, let's just deprecate it.
  348. },
  349. OutStream: writer,
  350. }
  351. if options.Since != nil {
  352. since, err := gogotypes.TimestampFromProto(options.Since)
  353. if err != nil {
  354. return nil, err
  355. }
  356. apiOptions.Since = since.Format(time.RFC3339Nano)
  357. }
  358. if options.Tail < 0 {
  359. // See protobuf documentation for details of how this works.
  360. apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
  361. } else if options.Tail > 0 {
  362. return nil, errors.New("tail relative to start of logs not supported via docker API")
  363. }
  364. if len(options.Streams) == 0 {
  365. // empty == all
  366. apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
  367. } else {
  368. for _, stream := range options.Streams {
  369. switch stream {
  370. case api.LogStreamStdout:
  371. apiOptions.ShowStdout = true
  372. case api.LogStreamStderr:
  373. apiOptions.ShowStderr = true
  374. }
  375. }
  376. }
  377. chStarted := make(chan struct{})
  378. go func() {
  379. defer writer.Close()
  380. c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted)
  381. }()
  382. return reader, nil
  383. }
  // todo: typed/wrapped errors

  // isContainerCreateNameConflict reports whether err's message contains the
  // text "Conflict. The name". Matching is done on the error string only. A
  // nil err is reported as false instead of panicking.
  func isContainerCreateNameConflict(err error) bool {
  	if err == nil {
  		return false
  	}
  	return strings.Contains(err.Error(), "Conflict. The name")
  }
  // isUnknownContainer reports whether err's message contains the text
  // "No such container:". Matching is done on the error string only. A nil
  // err is reported as false instead of panicking.
  func isUnknownContainer(err error) bool {
  	if err == nil {
  		return false
  	}
  	return strings.Contains(err.Error(), "No such container:")
  }
  // isStoppedContainer reports whether err's message contains the text
  // "is already stopped". Matching is done on the error string only. A nil
  // err is reported as false instead of panicking.
  func isStoppedContainer(err error) bool {
  	if err == nil {
  		return false
  	}
  	return strings.Contains(err.Error(), "is already stopped")
  }