adapter.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package container
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "strings"
  8. "syscall"
  9. "github.com/Sirupsen/logrus"
  10. executorpkg "github.com/docker/docker/daemon/cluster/executor"
  11. "github.com/docker/engine-api/types"
  12. "github.com/docker/libnetwork"
  13. "github.com/docker/swarmkit/api"
  14. "github.com/docker/swarmkit/log"
  15. "golang.org/x/net/context"
  16. )
  17. // containerAdapter conducts remote operations for a container. All calls
  18. // are mostly naked calls to the client API, seeded with information from
  19. // containerConfig.
  20. type containerAdapter struct {
  21. backend executorpkg.Backend
  22. container *containerConfig
  23. }
  24. func newContainerAdapter(b executorpkg.Backend, task *api.Task) (*containerAdapter, error) {
  25. ctnr, err := newContainerConfig(task)
  26. if err != nil {
  27. return nil, err
  28. }
  29. return &containerAdapter{
  30. container: ctnr,
  31. backend: b,
  32. }, nil
  33. }
  34. func (c *containerAdapter) pullImage(ctx context.Context) error {
  35. spec := c.container.spec()
  36. // if the image needs to be pulled, the auth config will be retrieved and updated
  37. var encodedAuthConfig string
  38. if spec.PullOptions != nil {
  39. encodedAuthConfig = spec.PullOptions.RegistryAuth
  40. }
  41. authConfig := &types.AuthConfig{}
  42. if encodedAuthConfig != "" {
  43. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
  44. logrus.Warnf("invalid authconfig: %v", err)
  45. }
  46. }
  47. pr, pw := io.Pipe()
  48. metaHeaders := map[string][]string{}
  49. go func() {
  50. err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
  51. pw.CloseWithError(err)
  52. }()
  53. dec := json.NewDecoder(pr)
  54. m := map[string]interface{}{}
  55. for {
  56. if err := dec.Decode(&m); err != nil {
  57. if err == io.EOF {
  58. break
  59. }
  60. return err
  61. }
  62. // TODO(stevvooe): Report this status somewhere.
  63. logrus.Debugln("pull progress", m)
  64. }
  65. // if the final stream object contained an error, return it
  66. if errMsg, ok := m["error"]; ok {
  67. return fmt.Errorf("%v", errMsg)
  68. }
  69. return nil
  70. }
  71. func (c *containerAdapter) createNetworks(ctx context.Context) error {
  72. for _, network := range c.container.networks() {
  73. ncr, err := c.container.networkCreateRequest(network)
  74. if err != nil {
  75. return err
  76. }
  77. if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing
  78. if _, ok := err.(libnetwork.NetworkNameError); ok {
  79. continue
  80. }
  81. return err
  82. }
  83. }
  84. return nil
  85. }
  86. func (c *containerAdapter) removeNetworks(ctx context.Context) error {
  87. for _, nid := range c.container.networks() {
  88. if err := c.backend.DeleteManagedNetwork(nid); err != nil {
  89. if _, ok := err.(*libnetwork.ActiveEndpointsError); ok {
  90. continue
  91. }
  92. log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
  93. return err
  94. }
  95. }
  96. return nil
  97. }
  98. func (c *containerAdapter) create(ctx context.Context, backend executorpkg.Backend) error {
  99. var cr types.ContainerCreateResponse
  100. var err error
  101. if cr, err = backend.CreateManagedContainer(types.ContainerCreateConfig{
  102. Name: c.container.name(),
  103. Config: c.container.config(),
  104. HostConfig: c.container.hostConfig(),
  105. // Use the first network in container create
  106. NetworkingConfig: c.container.createNetworkingConfig(),
  107. }); err != nil {
  108. return err
  109. }
  110. // Docker daemon currently doesn't support multiple networks in container create
  111. // Connect to all other networks
  112. nc := c.container.connectNetworkingConfig()
  113. if nc != nil {
  114. for n, ep := range nc.EndpointsConfig {
  115. if err := backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
  116. return err
  117. }
  118. }
  119. }
  120. if err := backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
  121. return err
  122. }
  123. return nil
  124. }
  125. func (c *containerAdapter) start(ctx context.Context) error {
  126. return c.backend.ContainerStart(c.container.name(), nil)
  127. }
  128. func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
  129. cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
  130. if ctx.Err() != nil {
  131. return types.ContainerJSON{}, ctx.Err()
  132. }
  133. if err != nil {
  134. return types.ContainerJSON{}, err
  135. }
  136. return *cs, nil
  137. }
  138. // events issues a call to the events API and returns a channel with all
  139. // events. The stream of events can be shutdown by cancelling the context.
  140. //
  141. // A chan struct{} is returned that will be closed if the event processing
  142. // fails and needs to be restarted.
  143. func (c *containerAdapter) wait(ctx context.Context) error {
  144. return c.backend.ContainerWaitWithContext(ctx, c.container.name())
  145. }
  146. func (c *containerAdapter) shutdown(ctx context.Context) error {
  147. // Default stop grace period to 10s.
  148. stopgrace := 10
  149. spec := c.container.spec()
  150. if spec.StopGracePeriod != nil {
  151. stopgrace = int(spec.StopGracePeriod.Seconds)
  152. }
  153. return c.backend.ContainerStop(c.container.name(), stopgrace)
  154. }
  155. func (c *containerAdapter) terminate(ctx context.Context) error {
  156. return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
  157. }
  158. func (c *containerAdapter) remove(ctx context.Context) error {
  159. return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{
  160. RemoveVolume: true,
  161. ForceRemove: true,
  162. })
  163. }
  164. func (c *containerAdapter) createVolumes(ctx context.Context, backend executorpkg.Backend) error {
  165. // Create plugin volumes that are embedded inside a Mount
  166. for _, mount := range c.container.task.Spec.GetContainer().Mounts {
  167. if mount.Type != api.MountTypeVolume {
  168. continue
  169. }
  170. if mount.VolumeOptions == nil {
  171. continue
  172. }
  173. if mount.VolumeOptions.DriverConfig == nil {
  174. continue
  175. }
  176. req := c.container.volumeCreateRequest(&mount)
  177. // Check if this volume exists on the engine
  178. if _, err := backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
  179. // TODO(amitshukla): Today, volume create through the engine api does not return an error
  180. // when the named volume with the same parameters already exists.
  181. // It returns an error if the driver name is different - that is a valid error
  182. return err
  183. }
  184. }
  185. return nil
  186. }
  187. // todo: typed/wrapped errors
  188. func isContainerCreateNameConflict(err error) bool {
  189. return strings.Contains(err.Error(), "Conflict. The name")
  190. }
  191. func isUnknownContainer(err error) bool {
  192. return strings.Contains(err.Error(), "No such container:")
  193. }
  194. func isStoppedContainer(err error) bool {
  195. return strings.Contains(err.Error(), "is already stopped")
  196. }