adapter.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. package container
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "syscall"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/docker/api/server/httputils"
  12. "github.com/docker/docker/api/types"
  13. "github.com/docker/docker/api/types/backend"
  14. containertypes "github.com/docker/docker/api/types/container"
  15. "github.com/docker/docker/api/types/events"
  16. "github.com/docker/docker/api/types/versions"
  17. "github.com/docker/docker/daemon/cluster/convert"
  18. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  19. "github.com/docker/docker/reference"
  20. "github.com/docker/libnetwork"
  21. "github.com/docker/swarmkit/agent/exec"
  22. "github.com/docker/swarmkit/api"
  23. "github.com/docker/swarmkit/log"
  24. "github.com/docker/swarmkit/protobuf/ptypes"
  25. "golang.org/x/net/context"
  26. "golang.org/x/time/rate"
  27. )
  28. // containerAdapter conducts remote operations for a container. All calls
  29. // are mostly naked calls to the client API, seeded with information from
  30. // containerConfig.
  31. type containerAdapter struct {
  32. backend executorpkg.Backend
  33. container *containerConfig
  34. secrets exec.SecretGetter
  35. }
  36. func newContainerAdapter(b executorpkg.Backend, task *api.Task, secrets exec.SecretGetter) (*containerAdapter, error) {
  37. ctnr, err := newContainerConfig(task)
  38. if err != nil {
  39. return nil, err
  40. }
  41. return &containerAdapter{
  42. container: ctnr,
  43. backend: b,
  44. secrets: secrets,
  45. }, nil
  46. }
  47. func (c *containerAdapter) pullImage(ctx context.Context) error {
  48. spec := c.container.spec()
  49. // Skip pulling if the image is referenced by digest and already
  50. // exists locally.
  51. named, err := reference.ParseNamed(spec.Image)
  52. if err == nil {
  53. if _, ok := named.(reference.Canonical); ok {
  54. _, err := c.backend.LookupImage(spec.Image)
  55. if err == nil {
  56. return nil
  57. }
  58. }
  59. }
  60. // if the image needs to be pulled, the auth config will be retrieved and updated
  61. var encodedAuthConfig string
  62. if spec.PullOptions != nil {
  63. encodedAuthConfig = spec.PullOptions.RegistryAuth
  64. }
  65. authConfig := &types.AuthConfig{}
  66. if encodedAuthConfig != "" {
  67. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
  68. logrus.Warnf("invalid authconfig: %v", err)
  69. }
  70. }
  71. pr, pw := io.Pipe()
  72. metaHeaders := map[string][]string{}
  73. go func() {
  74. err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
  75. pw.CloseWithError(err)
  76. }()
  77. dec := json.NewDecoder(pr)
  78. dec.UseNumber()
  79. m := map[string]interface{}{}
  80. spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)
  81. lastStatus := ""
  82. for {
  83. if err := dec.Decode(&m); err != nil {
  84. if err == io.EOF {
  85. break
  86. }
  87. return err
  88. }
  89. l := log.G(ctx)
  90. // limit pull progress logs unless the status changes
  91. if spamLimiter.Allow() || lastStatus != m["status"] {
  92. // if we have progress details, we have everything we need
  93. if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
  94. // first, log the image and status
  95. l = l.WithFields(logrus.Fields{
  96. "image": c.container.image(),
  97. "status": m["status"],
  98. })
  99. // then, if we have progress, log the progress
  100. if progress["current"] != nil && progress["total"] != nil {
  101. l = l.WithFields(logrus.Fields{
  102. "current": progress["current"],
  103. "total": progress["total"],
  104. })
  105. }
  106. }
  107. l.Debug("pull in progress")
  108. }
  109. // sometimes, we get no useful information at all, and add no fields
  110. if status, ok := m["status"].(string); ok {
  111. lastStatus = status
  112. }
  113. }
  114. // if the final stream object contained an error, return it
  115. if errMsg, ok := m["error"]; ok {
  116. return fmt.Errorf("%v", errMsg)
  117. }
  118. return nil
  119. }
  120. func (c *containerAdapter) createNetworks(ctx context.Context) error {
  121. for _, network := range c.container.networks() {
  122. ncr, err := c.container.networkCreateRequest(network)
  123. if err != nil {
  124. return err
  125. }
  126. if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
  127. if _, ok := err.(libnetwork.NetworkNameError); ok {
  128. continue
  129. }
  130. return err
  131. }
  132. }
  133. return nil
  134. }
  135. func (c *containerAdapter) removeNetworks(ctx context.Context) error {
  136. for _, nid := range c.container.networks() {
  137. if err := c.backend.DeleteManagedNetwork(nid); err != nil {
  138. switch err.(type) {
  139. case *libnetwork.ActiveEndpointsError:
  140. continue
  141. case libnetwork.ErrNoSuchNetwork:
  142. continue
  143. default:
  144. log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
  145. return err
  146. }
  147. }
  148. }
  149. return nil
  150. }
  151. func (c *containerAdapter) networkAttach(ctx context.Context) error {
  152. config := c.container.createNetworkingConfig()
  153. var (
  154. networkName string
  155. networkID string
  156. )
  157. if config != nil {
  158. for n, epConfig := range config.EndpointsConfig {
  159. networkName = n
  160. networkID = epConfig.NetworkID
  161. break
  162. }
  163. }
  164. return c.backend.UpdateAttachment(networkName, networkID, c.container.id(), config)
  165. }
  166. func (c *containerAdapter) waitForDetach(ctx context.Context) error {
  167. config := c.container.createNetworkingConfig()
  168. var (
  169. networkName string
  170. networkID string
  171. )
  172. if config != nil {
  173. for n, epConfig := range config.EndpointsConfig {
  174. networkName = n
  175. networkID = epConfig.NetworkID
  176. break
  177. }
  178. }
  179. return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.id())
  180. }
  181. func (c *containerAdapter) create(ctx context.Context) error {
  182. var cr containertypes.ContainerCreateCreatedBody
  183. var err error
  184. version := httputils.VersionFromContext(ctx)
  185. validateHostname := versions.GreaterThanOrEqualTo(version, "1.24")
  186. if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
  187. Name: c.container.name(),
  188. Config: c.container.config(),
  189. HostConfig: c.container.hostConfig(),
  190. // Use the first network in container create
  191. NetworkingConfig: c.container.createNetworkingConfig(),
  192. }, validateHostname); err != nil {
  193. return err
  194. }
  195. // Docker daemon currently doesn't support multiple networks in container create
  196. // Connect to all other networks
  197. nc := c.container.connectNetworkingConfig()
  198. if nc != nil {
  199. for n, ep := range nc.EndpointsConfig {
  200. if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
  201. return err
  202. }
  203. }
  204. }
  205. container := c.container.task.Spec.GetContainer()
  206. if container == nil {
  207. return fmt.Errorf("unable to get container from task spec")
  208. }
  209. // configure secrets
  210. if err := c.backend.SetContainerSecretStore(cr.ID, c.secrets); err != nil {
  211. return err
  212. }
  213. refs := convert.SecretReferencesFromGRPC(container.Secrets)
  214. if err := c.backend.SetContainerSecretReferences(cr.ID, refs); err != nil {
  215. return err
  216. }
  217. if err := c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
  218. return err
  219. }
  220. return nil
  221. }
  222. func (c *containerAdapter) start(ctx context.Context) error {
  223. version := httputils.VersionFromContext(ctx)
  224. validateHostname := versions.GreaterThanOrEqualTo(version, "1.24")
  225. return c.backend.ContainerStart(c.container.name(), nil, validateHostname, "", "")
  226. }
  227. func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
  228. cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
  229. if ctx.Err() != nil {
  230. return types.ContainerJSON{}, ctx.Err()
  231. }
  232. if err != nil {
  233. return types.ContainerJSON{}, err
  234. }
  235. return *cs, nil
  236. }
  237. // events issues a call to the events API and returns a channel with all
  238. // events. The stream of events can be shutdown by cancelling the context.
  239. func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
  240. log.G(ctx).Debugf("waiting on events")
  241. buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
  242. eventsq := make(chan events.Message, len(buffer))
  243. for _, event := range buffer {
  244. eventsq <- event
  245. }
  246. go func() {
  247. defer c.backend.UnsubscribeFromEvents(l)
  248. for {
  249. select {
  250. case ev := <-l:
  251. jev, ok := ev.(events.Message)
  252. if !ok {
  253. log.G(ctx).Warnf("unexpected event message: %q", ev)
  254. continue
  255. }
  256. select {
  257. case eventsq <- jev:
  258. case <-ctx.Done():
  259. return
  260. }
  261. case <-ctx.Done():
  262. return
  263. }
  264. }
  265. }()
  266. return eventsq
  267. }
  268. func (c *containerAdapter) wait(ctx context.Context) error {
  269. return c.backend.ContainerWaitWithContext(ctx, c.container.nameOrID())
  270. }
  271. func (c *containerAdapter) shutdown(ctx context.Context) error {
  272. // Default stop grace period to nil (daemon will use the stopTimeout of the container)
  273. var stopgrace *int
  274. spec := c.container.spec()
  275. if spec.StopGracePeriod != nil {
  276. stopgraceValue := int(spec.StopGracePeriod.Seconds)
  277. stopgrace = &stopgraceValue
  278. }
  279. return c.backend.ContainerStop(c.container.name(), stopgrace)
  280. }
  281. func (c *containerAdapter) terminate(ctx context.Context) error {
  282. return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
  283. }
  284. func (c *containerAdapter) remove(ctx context.Context) error {
  285. return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
  286. RemoveVolume: true,
  287. ForceRemove: true,
  288. })
  289. }
  290. func (c *containerAdapter) createVolumes(ctx context.Context) error {
  291. // Create plugin volumes that are embedded inside a Mount
  292. for _, mount := range c.container.task.Spec.GetContainer().Mounts {
  293. if mount.Type != api.MountTypeVolume {
  294. continue
  295. }
  296. if mount.VolumeOptions == nil {
  297. continue
  298. }
  299. if mount.VolumeOptions.DriverConfig == nil {
  300. continue
  301. }
  302. req := c.container.volumeCreateRequest(&mount)
  303. // Check if this volume exists on the engine
  304. if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
  305. // TODO(amitshukla): Today, volume create through the engine api does not return an error
  306. // when the named volume with the same parameters already exists.
  307. // It returns an error if the driver name is different - that is a valid error
  308. return err
  309. }
  310. }
  311. return nil
  312. }
  313. func (c *containerAdapter) activateServiceBinding() error {
  314. return c.backend.ActivateContainerServiceBinding(c.container.name())
  315. }
  316. func (c *containerAdapter) deactivateServiceBinding() error {
  317. return c.backend.DeactivateContainerServiceBinding(c.container.name())
  318. }
  319. func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) {
  320. reader, writer := io.Pipe()
  321. apiOptions := &backend.ContainerLogsConfig{
  322. ContainerLogsOptions: types.ContainerLogsOptions{
  323. Follow: options.Follow,
  324. // TODO(stevvooe): Parse timestamp out of message. This
  325. // absolutely needs to be done before going to production with
  326. // this, at it is completely redundant.
  327. Timestamps: true,
  328. Details: false, // no clue what to do with this, let's just deprecate it.
  329. },
  330. OutStream: writer,
  331. }
  332. if options.Since != nil {
  333. since, err := ptypes.Timestamp(options.Since)
  334. if err != nil {
  335. return nil, err
  336. }
  337. apiOptions.Since = since.Format(time.RFC3339Nano)
  338. }
  339. if options.Tail < 0 {
  340. // See protobuf documentation for details of how this works.
  341. apiOptions.Tail = fmt.Sprint(-options.Tail - 1)
  342. } else if options.Tail > 0 {
  343. return nil, fmt.Errorf("tail relative to start of logs not supported via docker API")
  344. }
  345. if len(options.Streams) == 0 {
  346. // empty == all
  347. apiOptions.ShowStdout, apiOptions.ShowStderr = true, true
  348. } else {
  349. for _, stream := range options.Streams {
  350. switch stream {
  351. case api.LogStreamStdout:
  352. apiOptions.ShowStdout = true
  353. case api.LogStreamStderr:
  354. apiOptions.ShowStderr = true
  355. }
  356. }
  357. }
  358. chStarted := make(chan struct{})
  359. go c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted)
  360. return reader, nil
  361. }
  // todo: typed/wrapped errors

  // isContainerCreateNameConflict reports whether err's message contains
  // "Conflict. The name". A nil err yields false.
  func isContainerCreateNameConflict(err error) bool {
  	return err != nil && strings.Contains(err.Error(), "Conflict. The name")
  }
  // isUnknownContainer reports whether err's message contains
  // "No such container:". A nil err yields false.
  func isUnknownContainer(err error) bool {
  	return err != nil && strings.Contains(err.Error(), "No such container:")
  }
  // isStoppedContainer reports whether err's message contains
  // "is already stopped". A nil err yields false.
  func isStoppedContainer(err error) bool {
  	return err != nil && strings.Contains(err.Error(), "is already stopped")
  }