adapter.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. package container
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "syscall"
  9. "time"
  10. "github.com/Sirupsen/logrus"
  11. "github.com/docker/docker/api/server/httputils"
  12. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  13. "github.com/docker/engine-api/types"
  14. "github.com/docker/engine-api/types/events"
  15. "github.com/docker/engine-api/types/versions"
  16. "github.com/docker/libnetwork"
  17. "github.com/docker/swarmkit/api"
  18. "github.com/docker/swarmkit/log"
  19. "golang.org/x/net/context"
  20. )
  21. // containerAdapter conducts remote operations for a container. All calls
  22. // are mostly naked calls to the client API, seeded with information from
  23. // containerConfig.
  24. type containerAdapter struct {
  25. backend executorpkg.Backend
  26. container *containerConfig
  27. }
  28. func newContainerAdapter(b executorpkg.Backend, task *api.Task) (*containerAdapter, error) {
  29. ctnr, err := newContainerConfig(task)
  30. if err != nil {
  31. return nil, err
  32. }
  33. return &containerAdapter{
  34. container: ctnr,
  35. backend: b,
  36. }, nil
  37. }
  38. func (c *containerAdapter) pullImage(ctx context.Context) error {
  39. spec := c.container.spec()
  40. // if the image needs to be pulled, the auth config will be retrieved and updated
  41. var encodedAuthConfig string
  42. if spec.PullOptions != nil {
  43. encodedAuthConfig = spec.PullOptions.RegistryAuth
  44. }
  45. authConfig := &types.AuthConfig{}
  46. if encodedAuthConfig != "" {
  47. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
  48. logrus.Warnf("invalid authconfig: %v", err)
  49. }
  50. }
  51. pr, pw := io.Pipe()
  52. metaHeaders := map[string][]string{}
  53. go func() {
  54. err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
  55. pw.CloseWithError(err)
  56. }()
  57. dec := json.NewDecoder(pr)
  58. m := map[string]interface{}{}
  59. for {
  60. if err := dec.Decode(&m); err != nil {
  61. if err == io.EOF {
  62. break
  63. }
  64. return err
  65. }
  66. // TODO(stevvooe): Report this status somewhere.
  67. logrus.Debugln("pull progress", m)
  68. }
  69. // if the final stream object contained an error, return it
  70. if errMsg, ok := m["error"]; ok {
  71. return fmt.Errorf("%v", errMsg)
  72. }
  73. return nil
  74. }
  75. func (c *containerAdapter) createNetworks(ctx context.Context) error {
  76. for _, network := range c.container.networks() {
  77. ncr, err := c.container.networkCreateRequest(network)
  78. if err != nil {
  79. return err
  80. }
  81. if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
  82. if _, ok := err.(libnetwork.NetworkNameError); ok {
  83. continue
  84. }
  85. return err
  86. }
  87. }
  88. return nil
  89. }
  90. func (c *containerAdapter) removeNetworks(ctx context.Context) error {
  91. for _, nid := range c.container.networks() {
  92. if err := c.backend.DeleteManagedNetwork(nid); err != nil {
  93. if _, ok := err.(*libnetwork.ActiveEndpointsError); ok {
  94. continue
  95. }
  96. log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
  97. return err
  98. }
  99. }
  100. return nil
  101. }
  102. func (c *containerAdapter) create(ctx context.Context, backend executorpkg.Backend) error {
  103. var cr types.ContainerCreateResponse
  104. var err error
  105. version := httputils.VersionFromContext(ctx)
  106. validateHostname := versions.GreaterThanOrEqualTo(version, "1.24")
  107. if cr, err = backend.CreateManagedContainer(types.ContainerCreateConfig{
  108. Name: c.container.name(),
  109. Config: c.container.config(),
  110. HostConfig: c.container.hostConfig(),
  111. // Use the first network in container create
  112. NetworkingConfig: c.container.createNetworkingConfig(),
  113. }, validateHostname); err != nil {
  114. return err
  115. }
  116. // Docker daemon currently doesn't support multiple networks in container create
  117. // Connect to all other networks
  118. nc := c.container.connectNetworkingConfig()
  119. if nc != nil {
  120. for n, ep := range nc.EndpointsConfig {
  121. if err := backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
  122. return err
  123. }
  124. }
  125. }
  126. if err := backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
  127. return err
  128. }
  129. return nil
  130. }
  131. func (c *containerAdapter) start(ctx context.Context) error {
  132. version := httputils.VersionFromContext(ctx)
  133. validateHostname := versions.GreaterThanOrEqualTo(version, "1.24")
  134. return c.backend.ContainerStart(c.container.name(), nil, validateHostname)
  135. }
  136. func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
  137. cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
  138. if ctx.Err() != nil {
  139. return types.ContainerJSON{}, ctx.Err()
  140. }
  141. if err != nil {
  142. return types.ContainerJSON{}, err
  143. }
  144. return *cs, nil
  145. }
  146. // events issues a call to the events API and returns a channel with all
  147. // events. The stream of events can be shutdown by cancelling the context.
  148. func (c *containerAdapter) events(ctx context.Context) <-chan events.Message {
  149. log.G(ctx).Debugf("waiting on events")
  150. buffer, l := c.backend.SubscribeToEvents(time.Time{}, time.Time{}, c.container.eventFilter())
  151. eventsq := make(chan events.Message, len(buffer))
  152. for _, event := range buffer {
  153. eventsq <- event
  154. }
  155. go func() {
  156. defer c.backend.UnsubscribeFromEvents(l)
  157. for {
  158. select {
  159. case ev := <-l:
  160. jev, ok := ev.(events.Message)
  161. if !ok {
  162. log.G(ctx).Warnf("unexpected event message: %q", ev)
  163. continue
  164. }
  165. select {
  166. case eventsq <- jev:
  167. case <-ctx.Done():
  168. return
  169. }
  170. case <-ctx.Done():
  171. return
  172. }
  173. }
  174. }()
  175. return eventsq
  176. }
  177. func (c *containerAdapter) wait(ctx context.Context) error {
  178. return c.backend.ContainerWaitWithContext(ctx, c.container.name())
  179. }
  180. func (c *containerAdapter) shutdown(ctx context.Context) error {
  181. // Default stop grace period to 10s.
  182. stopgrace := 10
  183. spec := c.container.spec()
  184. if spec.StopGracePeriod != nil {
  185. stopgrace = int(spec.StopGracePeriod.Seconds)
  186. }
  187. return c.backend.ContainerStop(c.container.name(), stopgrace)
  188. }
  189. func (c *containerAdapter) terminate(ctx context.Context) error {
  190. return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
  191. }
  192. func (c *containerAdapter) remove(ctx context.Context) error {
  193. return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
  194. RemoveVolume: true,
  195. ForceRemove: true,
  196. })
  197. }
  198. func (c *containerAdapter) createVolumes(ctx context.Context, backend executorpkg.Backend) error {
  199. // Create plugin volumes that are embedded inside a Mount
  200. for _, mount := range c.container.task.Spec.GetContainer().Mounts {
  201. if mount.Type != api.MountTypeVolume {
  202. continue
  203. }
  204. if mount.VolumeOptions == nil {
  205. continue
  206. }
  207. if mount.VolumeOptions.DriverConfig == nil {
  208. continue
  209. }
  210. req := c.container.volumeCreateRequest(&mount)
  211. // Check if this volume exists on the engine
  212. if _, err := backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
  213. // TODO(amitshukla): Today, volume create through the engine api does not return an error
  214. // when the named volume with the same parameters already exists.
  215. // It returns an error if the driver name is different - that is a valid error
  216. return err
  217. }
  218. }
  219. return nil
  220. }
  221. // todo: typed/wrapped errors
  222. func isContainerCreateNameConflict(err error) bool {
  223. return strings.Contains(err.Error(), "Conflict. The name")
  224. }
  225. func isUnknownContainer(err error) bool {
  226. return strings.Contains(err.Error(), "No such container:")
  227. }
  228. func isStoppedContainer(err error) bool {
  229. return strings.Contains(err.Error(), "is already stopped")
  230. }