adapter.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package container
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "syscall"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/docker/api/server/httputils"
  12. "github.com/docker/docker/api/types"
  13. "github.com/docker/docker/api/types/events"
  14. "github.com/docker/docker/api/types/versions"
  15. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  16. "github.com/docker/libnetwork"
  17. "github.com/docker/swarmkit/api"
  18. "github.com/docker/swarmkit/log"
  19. "golang.org/x/net/context"
  20. "golang.org/x/time/rate"
  21. )
  22. // containerAdapter conducts remote operations for a container. All calls
  23. // are mostly naked calls to the client API, seeded with information from
  24. // containerConfig.
  25. type containerAdapter struct {
  26. backend executorpkg.Backend
  27. container *containerConfig
  28. }
  29. func newContainerAdapter(b executorpkg.Backend, task *api.Task) (*containerAdapter, error) {
  30. ctnr, err := newContainerConfig(task)
  31. if err != nil {
  32. return nil, err
  33. }
  34. return &containerAdapter{
  35. container: ctnr,
  36. backend: b,
  37. }, nil
  38. }
  39. func (c *containerAdapter) pullImage(ctx context.Context) error {
  40. spec := c.container.spec()
  41. // if the image needs to be pulled, the auth config will be retrieved and updated
  42. var encodedAuthConfig string
  43. if spec.PullOptions != nil {
  44. encodedAuthConfig = spec.PullOptions.RegistryAuth
  45. }
  46. authConfig := &types.AuthConfig{}
  47. if encodedAuthConfig != "" {
  48. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
  49. logrus.Warnf("invalid authconfig: %v", err)
  50. }
  51. }
  52. pr, pw := io.Pipe()
  53. metaHeaders := map[string][]string{}
  54. go func() {
  55. err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
  56. pw.CloseWithError(err)
  57. }()
  58. dec := json.NewDecoder(pr)
  59. dec.UseNumber()
  60. m := map[string]interface{}{}
  61. spamLimiter := rate.NewLimiter(rate.Every(time.Second), 1)
  62. lastStatus := ""
  63. for {
  64. if err := dec.Decode(&m); err != nil {
  65. if err == io.EOF {
  66. break
  67. }
  68. return err
  69. }
  70. l := log.G(ctx)
  71. // limit pull progress logs unless the status changes
  72. if spamLimiter.Allow() || lastStatus != m["status"] {
  73. // if we have progress details, we have everything we need
  74. if progress, ok := m["progressDetail"].(map[string]interface{}); ok {
  75. // first, log the image and status
  76. l = l.WithFields(logrus.Fields{
  77. "image": c.container.image(),
  78. "status": m["status"],
  79. })
  80. // then, if we have progress, log the progress
  81. if progress["current"] != nil && progress["total"] != nil {
  82. l = l.WithFields(logrus.Fields{
  83. "current": progress["current"],
  84. "total": progress["total"],
  85. })
  86. }
  87. }
  88. l.Debug("pull in progress")
  89. }
  90. // sometimes, we get no useful information at all, and add no fields
  91. if status, ok := m["status"].(string); ok {
  92. lastStatus = status
  93. }
  94. }
  95. // if the final stream object contained an error, return it
  96. if errMsg, ok := m["error"]; ok {
  97. return fmt.Errorf("%v", errMsg)
  98. }
  99. return nil
  100. }
  101. func (c *containerAdapter) createNetworks(ctx context.Context) error {
  102. for _, network := range c.container.networks() {
  103. ncr, err := c.container.networkCreateRequest(network)
  104. if err != nil {
  105. return err
  106. }
  107. if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
  108. if _, ok := err.(libnetwork.NetworkNameError); ok {
  109. continue
  110. }
  111. return err
  112. }
  113. }
  114. return nil
  115. }
  116. func (c *containerAdapter) removeNetworks(ctx context.Context) error {
  117. for _, nid := range c.container.networks() {
  118. if err := c.backend.DeleteManagedNetwork(nid); err != nil {
  119. switch err.(type) {
  120. case *libnetwork.ActiveEndpointsError:
  121. continue
  122. case libnetwork.ErrNoSuchNetwork:
  123. continue
  124. default:
  125. log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
  126. return err
  127. }
  128. }
  129. }
  130. return nil
  131. }
  132. func (c *containerAdapter) networkAttach(ctx context.Context) error {
  133. config := c.container.createNetworkingConfig()
  134. var (
  135. networkName string
  136. networkID string
  137. )
  138. if config != nil {
  139. for n, epConfig := range config.EndpointsConfig {
  140. networkName = n
  141. networkID = epConfig.NetworkID
  142. break
  143. }
  144. }
  145. return c.backend.UpdateAttachment(networkName, networkID, c.container.id(), config)
  146. }
  147. func (c *containerAdapter) waitForDetach(ctx context.Context) error {
  148. config := c.container.createNetworkingConfig()
  149. var (
  150. networkName string
  151. networkID string
  152. )
  153. if config != nil {
  154. for n, epConfig := range config.EndpointsConfig {
  155. networkName = n
  156. networkID = epConfig.NetworkID
  157. break
  158. }
  159. }
  160. return c.backend.WaitForDetachment(ctx, networkName, networkID, c.container.taskID(), c.container.id())
  161. }
  162. func (c *containerAdapter) create(ctx context.Context) error {
  163. var cr types.ContainerCreateResponse
  164. var err error
  165. version := httputils.VersionFromContext(ctx)
  166. validateHostname := versions.GreaterThanOrEqualTo(version, "1.24")
  167. if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{
  168. Name: c.container.name(),
  169. Config: c.container.config(),
  170. HostConfig: c.container.hostConfig(),
  171. // Use the first network in container create
  172. NetworkingConfig: c.container.createNetworkingConfig(),
  173. }, validateHostname); err != nil {
  174. return err
  175. }
  176. // Docker daemon currently doesn't support multiple networks in container create
  177. // Connect to all other networks
  178. nc := c.container.connectNetworkingConfig()
  179. if nc != nil {
  180. for n, ep := range nc.EndpointsConfig {
  181. if err := c.backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
  182. return err
  183. }
  184. }
  185. }
  186. if err := c.backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
  187. return err
  188. }
  189. return nil
  190. }
  191. func (c *containerAdapter) start(ctx context.Context) error {
  192. version := httputils.VersionFromContext(ctx)
  193. validateHostname := versions.GreaterThanOrEqualTo(version, "1.24")
  194. return c.backend.ContainerStart(c.container.name(), nil, validateHostname, "")
  195. }
  196. func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
  197. cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
  198. if ctx.Err() != nil {
  199. return types.ContainerJSON{}, ctx.Err()
  200. }
  201. if err != nil {
  202. return types.ContainerJSON{}, err
  203. }
  204. return *cs, nil
  205. }
  206. // events issues a call to the events API and returns a channel with all
  207. // events. The stream of events can be shutdown by cancelling the context.
  208. func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
  209. log.G(ctx).Debugf("waiting on events")
  210. buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
  211. eventsq := make(chan events.Message, len(buffer))
  212. for _, event := range buffer {
  213. eventsq <- event
  214. }
  215. go func() {
  216. defer c.backend.UnsubscribeFromEvents(l)
  217. for {
  218. select {
  219. case ev := <-l:
  220. jev, ok := ev.(events.Message)
  221. if !ok {
  222. log.G(ctx).Warnf("unexpected event message: %q", ev)
  223. continue
  224. }
  225. select {
  226. case eventsq <- jev:
  227. case <-ctx.Done():
  228. return
  229. }
  230. case <-ctx.Done():
  231. return
  232. }
  233. }
  234. }()
  235. return eventsq
  236. }
  237. func (c *containerAdapter) wait(ctx context.Context) error {
  238. return c.backend.ContainerWaitWithContext(ctx, c.container.nameOrID())
  239. }
  240. func (c *containerAdapter) shutdown(ctx context.Context) error {
  241. // Default stop grace period to 10s.
  242. stopgrace := 10
  243. spec := c.container.spec()
  244. if spec.StopGracePeriod != nil {
  245. stopgrace = int(spec.StopGracePeriod.Seconds)
  246. }
  247. return c.backend.ContainerStop(c.container.name(), stopgrace)
  248. }
  249. func (c *containerAdapter) terminate(ctx context.Context) error {
  250. return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
  251. }
  252. func (c *containerAdapter) remove(ctx context.Context) error {
  253. return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
  254. RemoveVolume: true,
  255. ForceRemove: true,
  256. })
  257. }
  258. func (c *containerAdapter) createVolumes(ctx context.Context) error {
  259. // Create plugin volumes that are embedded inside a Mount
  260. for _, mount := range c.container.task.Spec.GetContainer().Mounts {
  261. if mount.Type != api.MountTypeVolume {
  262. continue
  263. }
  264. if mount.VolumeOptions == nil {
  265. continue
  266. }
  267. if mount.VolumeOptions.DriverConfig == nil {
  268. continue
  269. }
  270. req := c.container.volumeCreateRequest(&mount)
  271. // Check if this volume exists on the engine
  272. if _, err := c.backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
  273. // TODO(amitshukla): Today, volume create through the engine api does not return an error
  274. // when the named volume with the same parameters already exists.
  275. // It returns an error if the driver name is different - that is a valid error
  276. return err
  277. }
  278. }
  279. return nil
  280. }
  281. // todo: typed/wrapped errors
  282. func isContainerCreateNameConflict(err error) bool {
  283. return strings.Contains(err.Error(), "Conflict. The name")
  284. }
  285. func isUnknownContainer(err error) bool {
  286. return strings.Contains(err.Error(), "No such container:")
  287. }
  288. func isStoppedContainer(err error) bool {
  289. return strings.Contains(err.Error(), "is already stopped")
  290. }