adapter.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. package container
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "syscall"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/distribution/digest"
  12. "github.com/docker/docker/api/types"
  13. "github.com/docker/docker/api/types/backend"
  14. containertypes "github.com/docker/docker/api/types/container"
  15. "github.com/docker/docker/api/types/events"
  16. "github.com/docker/docker/daemon/cluster/convert"
  17. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  18. "github.com/docker/docker/reference"
  19. "github.com/docker/libnetwork"
  20. "github.com/docker/swarmkit/agent/exec"
  21. "github.com/docker/swarmkit/api"
  22. "github.com/docker/swarmkit/log"
  23. "github.com/docker/swarmkit/protobuf/ptypes"
  24. "golang.org/x/net/context"
  25. "golang.org/x/time/rate"
  26. )
// containerAdapter conducts remote operations for a container. All calls
// are mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
	// backend is the executor backend that every adapter method calls into.
	backend executorpkg.Backend
	// container is the task-derived configuration built by newContainerConfig.
	container *containerConfig
	// secrets is handed to the backend as the container's secret store
	// when the container is created.
	secrets exec.SecretGetter
}
  35. func newContainerAdapter(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*containerAdapter, error) {
  36. ctnr, err := newContainerConfig(task)
  37. if err != nil {
  38. return nil, err
  39. }
  40. return &containerAdapter{
  41. container: ctnr,
  42. backend: b,
  43. secrets: secrets,
  44. }, nil
  45. }
  46. func (c *containerAdapter) pullImage(ctx context.Context) error {
  47. spec := c.container.spec()
  48. // Skip pulling if the image is referenced by image ID.
  49. if _, err := digest.ParseDigest(spec.Image); err == nil {
  50. return nil
  51. }
  52. // Skip pulling if the image is referenced by digest and already
  53. // exists locally.
  54. named, err := reference.ParseNamed(spec.Image)
  55. if err == nil {
  56. if _, ok := named.(reference.Canonical); ok {
  57. _, err := c.backend.LookupImage(spec.Image)
  58. if err == nil {
  59. return nil
  60. }
  61. }
  62. }
  63. // if the image needs to be pulled, the auth config will be retrieved and updated
  64. var encodedAuthConfig string
  65. if spec.PullOptions != nil {
  66. encodedAuthConfig = spec.PullOptions.RegistryAuth
  67. }
  68. authConfig := &types.AuthConfig{}
  69. if encodedAuthConfig != "" {
  70. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
  71. logrus.Warnf("invalid authconfig: %v", err)
  72. }
  73. }
  74. pr, pw := io.Pipe()
  75. metaHeaders := map[string][]string{}
  76. go func() {
  77. err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
  78. pw.CloseWithError(err)
  79. }()
  80. dec := json.NewDecoder(pr)
  81. dec.UseNumber()
  82. m := map[string]interface{}{}
  83. spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)
  84. lastStatus := ""
  85. for {
  86. if err := dec.Decode(&m); err != nil {
  87. if err == io.EOF {
  88. break
  89. }
  90. return err
  91. }
  92. l := log.G(ctx)
  93. // limit pull progress logs unless the status changes
  94. if spamLimiter.Allow() || lastStatus != m["status"] {
  95. // if we have progress details, we have everything we need
  96. if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
  97. // first, log the image and status
  98. l = l.WithFields(logrus.Fields{
  99. "image": c.container.image(),
  100. "status": m["status"],
  101. })
  102. // then, if we have progress, log the progress
  103. if progress["current"] != nil && progress["total"] != nil {
  104. l = l.WithFields(logrus.Fields{
  105. "current": progress["current"],
  106. "total": progress["total"],
  107. })
  108. }
  109. }
  110. l.Debug("pull in progress")
  111. }
  112. // sometimes, we get no useful information at all, and add no fields
  113. if status, ok := m["status"].(string); ok {
  114. lastStatus = status
  115. }
  116. }
  117. // if the final stream object contained an error, return it
  118. if errMsg, ok := m["error"]; ok {
  119. return fmt.Errorf("%v", errMsg)
  120. }
  121. return nil
  122. }
  123. func (c *containerAdapter) createNetworks(ctx context.Context) error {
  124. for _, network := range c.container.networks() {
  125. ncr, err := c.container.networkCreateRequest(network)
  126. if err != nil {
  127. return err
  128. }
  129. if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
  130. if _, ok := err.(libnetwork.NetworkNameError); ok {
  131. continue
  132. }
  133. return err
  134. }
  135. }
  136. return nil
  137. }
  138. func (c *containerAdapter) removeNetworks(ctx context.Context) error {
  139. for _, nid := range c.container.networks() {
  140. if err := c.backend.DeleteManagedNetwork(nid); err != nil {
  141. switch err.(type) {
  142. case *libnetwork.ActiveEndpointsError:
  143. continue
  144. case libnetwork.ErrNoSuchNetwork:
  145. continue
  146. default:
  147. log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
  148. return err
  149. }
  150. }
  151. }
  152. return nil
  153. }
  154. func (c *containerAdapter) networkAttach(ctx context.Context) error {
  155. config := c.container.createNetworkingConfig()
  156. var (
  157. networkName string
  158. networkID string
  159. )
  160. if config != nil {
  161. for n, epConfig := range config.EndpointsConfig {
  162. networkName = n
  163. networkID = epConfig.NetworkID
  164. break
  165. }
  166. }
  167. return c.backend.UpdateAttachment(networkName, networkID, c.container.id(), config)
  168. }
  169. func (c *containerAdapter) waitForDetach(ctx context.Context) error {
  170. config := c.container.createNetworkingConfig()
  171. var (
  172. networkName string
  173. networkID string
  174. )
  175. if config != nil {
  176. for n, epConfig := range config.EndpointsConfig {
  177. networkName = n
  178. networkID = epConfig.NetworkID
  179. break
  180. }
  181. }
  182. return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.id())
  183. }
  184. func (c *containerAdapter) create(ctx context.Context) error {
  185. var cr containertypes.ContainerCreateCreatedBody
  186. var err error
  187. if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
  188. Name: c.container.name(),
  189. Config: c.container.config(),
  190. HostConfig: c.container.hostConfig(),
  191. // Use the first network in container create
  192. NetworkingConfig: c.container.createNetworkingConfig(),
  193. }); err != nil {
  194. return err
  195. }
  196. // Docker daemon currently doesn't support multiple networks in container create
  197. // Connect to all other networks
  198. nc := c.container.connectNetworkingConfig()
  199. if nc != nil {
  200. for n, ep := range nc.EndpointsConfig {
  201. if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
  202. return err
  203. }
  204. }
  205. }
  206. container := c.container.task.Spec.GetContainer()
  207. if container == nil {
  208. return fmt.Errorf("unable to get container from task spec")
  209. }
  210. // configure secrets
  211. if err := c.backend.SetContainerSecretStore(cr.ID, c.secrets); err != nil {
  212. return err
  213. }
  214. refs := convert.SecretReferencesFromGRPC(container.Secrets)
  215. if err := c.backend.SetContainerSecretReferences(cr.ID, refs); err != nil {
  216. return err
  217. }
  218. if err := c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
  219. return err
  220. }
  221. return nil
  222. }
  223. func (c *containerAdapter) start(ctx context.Context) error {
  224. return c.backend.ContainerStart(c.container.name(), nil, "", "")
  225. }
  226. func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
  227. cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
  228. if ctx.Err() != nil {
  229. return types.ContainerJSON{}, ctx.Err()
  230. }
  231. if err != nil {
  232. return types.ContainerJSON{}, err
  233. }
  234. return *cs, nil
  235. }
  236. // events issues a call to the events API and returns a channel with all
  237. // events. The stream of events can be shutdown by cancelling the context.
  238. func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
  239. log.G(ctx).Debugf("waiting on events")
  240. buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
  241. eventsq := make(chan events.Message, len(buffer))
  242. for _, event := range buffer {
  243. eventsq <- event
  244. }
  245. go func() {
  246. defer c.backend.UnsubscribeFromEvents(l)
  247. for {
  248. select {
  249. case ev := <-l:
  250. jev, ok := ev.(events.Message)
  251. if !ok {
  252. log.G(ctx).Warnf("unexpected event message: %q", ev)
  253. continue
  254. }
  255. select {
  256. case eventsq <- jev:
  257. case <-ctx.Done():
  258. return
  259. }
  260. case <-ctx.Done():
  261. return
  262. }
  263. }
  264. }()
  265. return eventsq
  266. }
  267. func (c *containerAdapter) wait(ctx context.Context) error {
  268. return c.backend.ContainerWaitWithContext(ctx, c.container.nameOrID())
  269. }
  270. func (c *containerAdapter) shutdown(ctx context.Context) error {
  271. // Default stop grace period to nil (daemon will use the stopTimeout of the container)
  272. var stopgrace *int
  273. spec := c.container.spec()
  274. if spec.StopGracePeriod != nil {
  275. stopgraceValue := int(spec.StopGracePeriod.Seconds)
  276. stopgrace = &stopgraceValue
  277. }
  278. return c.backend.ContainerStop(c.container.name(), stopgrace)
  279. }
  280. func (c *containerAdapter) terminate(ctx context.Context) error {
  281. return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
  282. }
  283. func (c *containerAdapter) remove(ctx context.Context) error {
  284. return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
  285. RemoveVolume: true,
  286. ForceRemove: true,
  287. })
  288. }
  289. func (c *containerAdapter) createVolumes(ctx context.Context) error {
  290. // Create plugin volumes that are embedded inside a Mount
  291. for _, mount := range c.container.task.Spec.GetContainer().Mounts {
  292. if mount.Type != api.MountTypeVolume {
  293. continue
  294. }
  295. if mount.VolumeOptions == nil {
  296. continue
  297. }
  298. if mount.VolumeOptions.DriverConfig == nil {
  299. continue
  300. }
  301. req := c.container.volumeCreateRequest(&mount)
  302. // Check if this volume exists on the engine
  303. if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
  304. // TODO(amitshukla): Today, volume create through the engine api does not return an error
  305. // when the named volume with the same parameters already exists.
  306. // It returns an error if the driver name is different - that is a valid error
  307. return err
  308. }
  309. }
  310. return nil
  311. }
  312. func (c *containerAdapter) activateServiceBinding() error {
  313. return c.backend.ActivateContainerServiceBinding(c.container.name())
  314. }
  315. func (c *containerAdapter) deactivateServiceBinding() error {
  316. return c.backend.DeactivateContainerServiceBinding(c.container.name())
  317. }
  318. func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
  319. reader, writer := io.Pipe()
  320. apiOptions := &backend.ContainerLogsConfig{
  321. ContainerLogsOptions: types.ContainerLogsOptions{
  322. Follow: options.Follow,
  323. // TODO(stevvooe): Parse timestamp out of message. This
  324. // absolutely needs to be done before going to production with
  325. // this, at it is completely redundant.
  326. Timestamps: true,
  327. Details: false, // no clue what to do with this, let's just deprecate it.
  328. },
  329. OutStream: writer,
  330. }
  331. if options.Since != nil {
  332. since, err := ptypes.Timestamp(options.Since)
  333. if err != nil {
  334. return nil, err
  335. }
  336. apiOptions.Since = since.Format(time.RFC3339Nano)
  337. }
  338. if options.Tail < 0 {
  339. // See protobuf documentation for details of how this works.
  340. apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
  341. } else if options.Tail > 0 {
  342. return nil, fmt.Errorf("tail relative to start of logs not supported via docker API")
  343. }
  344. if len(options.Streams) == 0 {
  345. // empty == all
  346. apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
  347. } else {
  348. for _, stream := range options.Streams {
  349. switch stream {
  350. case api.LogStreamStdout:
  351. apiOptions.ShowStdout = true
  352. case api.LogStreamStderr:
  353. apiOptions.ShowStderr = true
  354. }
  355. }
  356. }
  357. chStarted := make(chan struct{})
  358. go func() {
  359. defer writer.Close()
  360. c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted)
  361. }()
  362. return reader, nil
  363. }
  364. // todo: typed/wrapped errors
  365. func isContainerCreateNameConflict(err error) bool {
  366. return strings.Contains(err.Error(), "Conflict. The name")
  367. }
  368. func isUnknownContainer(err error) bool {
  369. return strings.Contains(err.Error(), "No such container:")
  370. }
  371. func isStoppedContainer(err error) bool {
  372. return strings.Contains(err.Error(), "is already stopped")
  373. }