services.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. package cluster
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/distribution/reference"
  13. apierrors "github.com/docker/docker/api/errors"
  14. apitypes "github.com/docker/docker/api/types"
  15. "github.com/docker/docker/api/types/backend"
  16. types "github.com/docker/docker/api/types/swarm"
  17. timetypes "github.com/docker/docker/api/types/time"
  18. "github.com/docker/docker/daemon/cluster/convert"
  19. runconfigopts "github.com/docker/docker/runconfig/opts"
  20. swarmapi "github.com/docker/swarmkit/api"
  21. gogotypes "github.com/gogo/protobuf/types"
  22. "github.com/pkg/errors"
  23. "golang.org/x/net/context"
  24. )
  25. // GetServices returns all services of a managed swarm cluster.
  26. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  27. c.mu.RLock()
  28. defer c.mu.RUnlock()
  29. state := c.currentNodeState()
  30. if !state.IsActiveManager() {
  31. return nil, c.errNoManager(state)
  32. }
  33. // We move the accepted filter check here as "mode" filter
  34. // is processed in the daemon, not in SwarmKit. So it might
  35. // be good to have accepted file check in the same file as
  36. // the filter processing (in the for loop below).
  37. accepted := map[string]bool{
  38. "name": true,
  39. "id": true,
  40. "label": true,
  41. "mode": true,
  42. "runtime": true,
  43. }
  44. if err := options.Filters.Validate(accepted); err != nil {
  45. return nil, err
  46. }
  47. filters := &swarmapi.ListServicesRequest_Filters{
  48. NamePrefixes: options.Filters.Get("name"),
  49. IDPrefixes: options.Filters.Get("id"),
  50. Labels: runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")),
  51. // (ehazlett): hardcode runtime for now. eventually we will
  52. // be able to filter for the desired runtimes once more
  53. // are supported.
  54. Runtimes: []string{string(types.RuntimeContainer)},
  55. }
  56. ctx, cancel := c.getRequestContext()
  57. defer cancel()
  58. r, err := state.controlClient.ListServices(
  59. ctx,
  60. &swarmapi.ListServicesRequest{Filters: filters})
  61. if err != nil {
  62. return nil, err
  63. }
  64. services := make([]types.Service, 0, len(r.Services))
  65. for _, service := range r.Services {
  66. if options.Filters.Include("mode") {
  67. var mode string
  68. switch service.Spec.GetMode().(type) {
  69. case *swarmapi.ServiceSpec_Global:
  70. mode = "global"
  71. case *swarmapi.ServiceSpec_Replicated:
  72. mode = "replicated"
  73. }
  74. if !options.Filters.ExactMatch("mode", mode) {
  75. continue
  76. }
  77. }
  78. svcs, err := convert.ServiceFromGRPC(*service)
  79. if err != nil {
  80. return nil, err
  81. }
  82. services = append(services, svcs)
  83. }
  84. return services, nil
  85. }
  // GetService looks up a single service by ID or name and converts it to the
// API representation.
//
// insertDefaults is passed straight through to getService. The lookup runs
// under lockedManagerAction, so it fails when this node is not an active
// manager.
func (c *Cluster) GetService(input string, insertDefaults bool) (types.Service, error) {
	var found *swarmapi.Service

	lookup := func(ctx context.Context, state nodeState) error {
		s, err := getService(ctx, state.controlClient, input, insertDefaults)
		if err != nil {
			return err
		}
		found = s
		return nil
	}
	if err := c.lockedManagerAction(lookup); err != nil {
		return types.Service{}, err
	}

	converted, err := convert.ServiceFromGRPC(*found)
	if err != nil {
		return types.Service{}, err
	}
	return converted, nil
}
  // CreateService creates a new service in a managed swarm cluster.
//
// encodedAuth is the client-supplied registry auth, as base64url-encoded JSON
// of an AuthConfig. When it is non-empty it is stored in the container spec's
// pull options and decoded to authenticate the digest lookup. A value that
// fails to decode is only logged as a warning.
//
// The image is pinned by digest when queryRegistry is true and the
// DOCKER_SERVICE_PREFER_OFFLINE_IMAGE environment variable is not "1". The
// in-code note ties this to API versions < 1.30, so callers presumably set
// queryRegistry accordingly. A failed lookup is not fatal: the service is
// created with the unpinned image and a warning is added to the response.
//
// Only container task runtimes are handled. Any other runtime is rejected
// with an error, rather than returning a response with an empty service ID.
func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRegistry bool) (*apitypes.ServiceCreateResponse, error) {
	var resp *apitypes.ServiceCreateResponse
	err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		err := c.populateNetworkID(ctx, state.controlClient, &s)
		if err != nil {
			return err
		}

		serviceSpec, err := convert.ServiceSpecToGRPC(s)
		if err != nil {
			return apierrors.NewBadRequestError(err)
		}

		resp = &apitypes.ServiceCreateResponse{}

		switch serviceSpec.Task.Runtime.(type) {
		case *swarmapi.TaskSpec_Container:
			ctnr := serviceSpec.Task.GetContainer()
			if ctnr == nil {
				return errors.New("service does not use container tasks")
			}
			if encodedAuth != "" {
				ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
			}

			// retrieve auth config from encoded auth
			authConfig := &apitypes.AuthConfig{}
			if encodedAuth != "" {
				if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
					logrus.Warnf("invalid authconfig: %v", err)
				}
			}

			// pin image by digest for API versions < 1.30
			// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
			// should be removed in the future. Since integration tests only use the
			// latest API version, so this is no longer required.
			if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
				digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
				if err != nil {
					logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
					// warning in the client response should be concise
					resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
				} else if ctnr.Image != digestImage {
					logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
					ctnr.Image = digestImage
				} else {
					logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
				}

				// Replace the context with a fresh one.
				// If we timed out while communicating with the
				// registry, then "ctx" will already be expired, which
				// would cause CreateService below to fail. Reusing
				// "ctx" could make it impossible to create a service
				// if the registry is slow or unresponsive.
				var cancel func()
				ctx, cancel = c.getRequestContext()
				defer cancel()
			}

			r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
			if err != nil {
				return err
			}
			resp.ID = r.Service.ID
		default:
			// Other runtimes need dedicated handling here. Without this
			// case the call would report success although no service was
			// created (resp.ID would be empty).
			return fmt.Errorf("unsupported runtime type: %T", serviceSpec.Task.Runtime)
		}
		return nil
	})
	return resp, err
}
  // UpdateService updates existing service to match new properties.
//
// serviceIDOrName is resolved with getService. version is sent as the
// ServiceVersion of the update request; presumably this lets SwarmKit reject
// updates based on a stale read, which should be confirmed against SwarmKit.
// flags.Rollback selects a normal update ("" or "none") or a rollback to the
// previous spec ("previous"). Any other value is rejected before the manager
// or a registry is contacted.
//
// If flags.EncodedRegistryAuth is empty, the existing pull options are kept.
// flags.RegistryAuthFrom chooses where they come from: the current spec
// (RegistryAuthFromSpec or "") or the previous spec
// (RegistryAuthFromPreviousSpec).
//
// When queryRegistry is true and DOCKER_SERVICE_PREFER_OFFLINE_IMAGE is not
// "1", the image is pinned by digest. A failed lookup only adds a warning to
// the response.
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, flags apitypes.ServiceUpdateOptions, queryRegistry bool) (*apitypes.ServiceUpdateResponse, error) {
	// The rollback option depends only on flags, so validate it before any
	// remote work (manager lookups, registry digest query) is done.
	var rollback swarmapi.UpdateServiceRequest_Rollback
	switch flags.Rollback {
	case "", "none":
		rollback = swarmapi.UpdateServiceRequest_NONE
	case "previous":
		rollback = swarmapi.UpdateServiceRequest_PREVIOUS
	default:
		return nil, fmt.Errorf("unrecognized rollback option %s", flags.Rollback)
	}

	var resp *apitypes.ServiceUpdateResponse
	err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		err := c.populateNetworkID(ctx, state.controlClient, &spec)
		if err != nil {
			return err
		}

		serviceSpec, err := convert.ServiceSpecToGRPC(spec)
		if err != nil {
			return apierrors.NewBadRequestError(err)
		}

		currentService, err := getService(ctx, state.controlClient, serviceIDOrName, false)
		if err != nil {
			return err
		}

		newCtnr := serviceSpec.Task.GetContainer()
		if newCtnr == nil {
			return errors.New("service does not use container tasks")
		}

		encodedAuth := flags.EncodedRegistryAuth
		if encodedAuth != "" {
			newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
		} else {
			// this is needed because if the encodedAuth isn't being updated then we
			// shouldn't lose it, and continue to use the one that was already present
			var ctnr *swarmapi.ContainerSpec
			switch flags.RegistryAuthFrom {
			case apitypes.RegistryAuthFromSpec, "":
				ctnr = currentService.Spec.Task.GetContainer()
			case apitypes.RegistryAuthFromPreviousSpec:
				if currentService.PreviousSpec == nil {
					return errors.New("service does not have a previous spec")
				}
				ctnr = currentService.PreviousSpec.Task.GetContainer()
			default:
				return errors.New("unsupported registryAuthFrom value")
			}
			if ctnr == nil {
				return errors.New("service does not use container tasks")
			}
			newCtnr.PullOptions = ctnr.PullOptions
			// update encodedAuth so it can be used to pin image by digest
			if ctnr.PullOptions != nil {
				encodedAuth = ctnr.PullOptions.RegistryAuth
			}
		}

		// retrieve auth config from encoded auth
		authConfig := &apitypes.AuthConfig{}
		if encodedAuth != "" {
			if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
				logrus.Warnf("invalid authconfig: %v", err)
			}
		}

		resp = &apitypes.ServiceUpdateResponse{}

		// pin image by digest for API versions < 1.30
		// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
		// should be removed in the future. Since integration tests only use the
		// latest API version, so this is no longer required.
		if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
			digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
			if err != nil {
				logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
				// warning in the client response should be concise
				resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
			} else if newCtnr.Image != digestImage {
				logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
				newCtnr.Image = digestImage
			} else {
				logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
			}

			// Replace the context with a fresh one.
			// If we timed out while communicating with the
			// registry, then "ctx" will already be expired, which
			// would cause UpdateService below to fail. Reusing
			// "ctx" could make it impossible to update a service
			// if the registry is slow or unresponsive.
			var cancel func()
			ctx, cancel = c.getRequestContext()
			defer cancel()
		}

		_, err = state.controlClient.UpdateService(
			ctx,
			&swarmapi.UpdateServiceRequest{
				ServiceID: currentService.ID,
				Spec:      &serviceSpec,
				ServiceVersion: &swarmapi.Version{
					Index: version,
				},
				Rollback: rollback,
			},
		)
		return err
	})
	return resp, err
}
  // RemoveService deletes the service identified by input (an ID or name)
// from a managed swarm cluster.
//
// The reference is resolved with getService first, so the removal request
// always carries the concrete service ID.
func (c *Cluster) RemoveService(input string) error {
	remove := func(ctx context.Context, state nodeState) error {
		svc, err := getService(ctx, state.controlClient, input, false)
		if err != nil {
			return err
		}
		req := &swarmapi.RemoveServiceRequest{ServiceID: svc.ID}
		_, err = state.controlClient.RemoveService(ctx, req)
		return err
	}
	return c.lockedManagerAction(remove)
}
  // ServiceLogs subscribes to the logs of the services and tasks named in
// selector. It returns a channel on which the log messages are delivered.
//
// config controls the subscription:
//   - ShowStdout and ShowStderr select the streams.
//   - Tail is the number of previous lines to send: "all", "" or a
//     non-negative integer.
//   - Since is the earliest timestamp of interest.
//   - Follow keeps following new output.
//
// The channel is closed in three cases:
//   - the log stream ends (io.EOF);
//   - ctx is cancelled;
//   - a receive error has just been delivered as a LogMessage with Err set.
//
// A message whose timestamp cannot be converted is still delivered, with Err
// set. The producing goroutine blocks on each send until the message is
// received or ctx is cancelled, so callers must drain the channel or cancel
// ctx.
func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *apitypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
	if err != nil {
		return nil, errors.Wrap(err, "error making log selector")
	}

	// set the streams we'll use
	stdStreams := []swarmapi.LogStream{}
	if config.ShowStdout {
		stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
	}
	if config.ShowStderr {
		stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
	}

	// Translate the Docker-style tail (number of previous lines) into
	// SwarmKit's encoding. An empty value or "all" means every line, which
	// SwarmKit expresses as a tail of 0.
	var tail int64
	if config.Tail == "all" || config.Tail == "" {
		tail = 0
	} else {
		// Parse directly as int64. strconv.Atoi yields a platform-sized
		// int, which would make the accepted range depend on the platform.
		t, err := strconv.ParseInt(config.Tail, 10, 64)
		if err != nil {
			return nil, errors.New("tail value must be a positive integer or \"all\"")
		}
		if t < 0 {
			return nil, errors.New("negative tail values not supported")
		}
		// SwarmKit uses negative tail values for a count of previous
		// messages, and -1 means no logs. For API compatibility with docker
		// container logs, add one and flip the sign: a Docker tail of N
		// becomes -(N+1). Negative Docker tails are rejected above.
		//
		// See the logs protobuf for more information
		tail = -(t + 1)
	}

	// get the since value - the time in the past we're looking at logs starting from
	var sinceProto *gogotypes.Timestamp
	if config.Since != "" {
		s, n, err := timetypes.ParseTimestamps(config.Since, 0)
		if err != nil {
			return nil, errors.Wrap(err, "could not parse since timestamp")
		}
		since := time.Unix(s, n)
		sinceProto, err = gogotypes.TimestampProto(since)
		if err != nil {
			return nil, errors.Wrap(err, "could not parse timestamp to proto")
		}
	}

	stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
		Selector: swarmSelector,
		Options: &swarmapi.LogSubscriptionOptions{
			Follow:  config.Follow,
			Streams: stdStreams,
			Tail:    tail,
			Since:   sinceProto,
		},
	})
	if err != nil {
		return nil, err
	}

	messageChan := make(chan *backend.LogMessage, 1)
	go func() {
		defer close(messageChan)
		for {
			// Check the context before doing anything.
			select {
			case <-ctx.Done():
				return
			default:
			}
			subscribeMsg, err := stream.Recv()
			if err == io.EOF {
				return
			}
			// if we're not io.EOF, push the message in and return
			if err != nil {
				select {
				case <-ctx.Done():
				case messageChan <- &backend.LogMessage{Err: err}:
				}
				return
			}

			for _, msg := range subscribeMsg.Messages {
				// make a new message
				m := new(backend.LogMessage)
				m.Attrs = make(backend.LogAttributes)
				// add the timestamp, adding the error if it fails
				m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp)
				if err != nil {
					m.Err = err
				}
				// copy over all of the details
				for _, d := range msg.Attrs {
					m.Attrs[d.Key] = d.Value
				}
				// We have the final say over the context attributes. They
				// overwrite any user attribute that happens to reuse the
				// same key.
				m.Attrs[contextPrefix+".node.id"] = msg.Context.NodeID
				m.Attrs[contextPrefix+".service.id"] = msg.Context.ServiceID
				m.Attrs[contextPrefix+".task.id"] = msg.Context.TaskID

				switch msg.Stream {
				case swarmapi.LogStreamStdout:
					m.Source = "stdout"
				case swarmapi.LogStreamStderr:
					m.Source = "stderr"
				}
				m.Line = msg.Data

				// The reader may stop accepting messages and cancel the
				// context. Checking ctx here avoids blocking forever on
				// the send.
				select {
				case <-ctx.Done():
					return
				case messageChan <- m:
				}
			}
		}
	}()
	return messageChan, nil
}
  // convertSelector turns a backend.LogSelector, whose service and task
// references are raw names or IDs that may not be valid, into a swarmkit
// api.LogSelector.
//
// IDs are resolved here with getService and getTask rather than left to
// SwarmKit. The first reference that cannot be resolved, or that does not
// use container tasks, aborts the conversion with an error.
func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) {
	out := &swarmapi.LogSelector{}

	for _, ref := range selector.Services {
		svc, err := getService(ctx, cc, ref, false)
		if err != nil {
			return nil, err
		}
		if svc.Spec.Task.GetContainer() == nil {
			return nil, errors.New("logs only supported on container tasks")
		}
		out.ServiceIDs = append(out.ServiceIDs, svc.ID)
	}

	for _, ref := range selector.Tasks {
		task, err := getTask(ctx, cc, ref)
		if err != nil {
			return nil, err
		}
		if task.Spec.GetContainer() == nil {
			return nil, errors.New("logs only supported on container tasks")
		}
		out.TaskIDs = append(out.TaskIDs, task.ID)
	}

	return out, nil
}
  // imageWithDigestString takes an image reference such as "name" or
// "name:tag" and returns it pinned to a digest (e.g. "name@sha256:34234..."),
// in familiar form.
//
// No registry is contacted in two cases:
//   - a digest-only reference is returned unchanged;
//   - a named reference that already carries a digest is returned in
//     familiar form.
//
// Otherwise the reference is normalised with reference.TagNameOnly, and the
// tag is resolved through the backend repository using authConfig.
func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
	ref, err := reference.ParseAnyReference(image)
	if err != nil {
		return "", err
	}

	named, isNamed := ref.(reference.Named)
	if !isNamed {
		if _, isDigested := ref.(reference.Digested); isDigested {
			return image, nil
		}
		return "", errors.Errorf("unknown image reference format: %s", image)
	}

	if _, isCanonical := named.(reference.Canonical); isCanonical {
		// reference already contains a digest, so just return it
		return reference.FamiliarString(ref), nil
	}

	// Not canonical yet: ask the registry which digest the tag points to.
	tagged, isTagged := reference.TagNameOnly(named).(reference.NamedTagged)
	if !isTagged {
		return "", errors.Errorf("image reference not tagged: %s", image)
	}

	repo, _, err := c.config.Backend.GetRepository(ctx, tagged, authConfig)
	if err != nil {
		return "", err
	}
	desc, err := repo.Tags(ctx).Get(ctx, tagged.Tag())
	if err != nil {
		return "", err
	}
	pinned, err := reference.WithDigest(tagged, desc.Digest)
	if err != nil {
		return "", err
	}

	// return familiar form until interface updated to return type
	return reference.FamiliarString(pinned), nil
}
  489. // digestWarning constructs a formatted warning string
  490. // using the image name that could not be pinned by digest. The
  491. // formatting is hardcoded, but could me made smarter in the future
  492. func digestWarning(image string) string {
  493. return fmt.Sprintf("image %s could not be accessed on a registry to record\nits digest. Each node will access %s independently,\npossibly leading to different nodes running different\nversions of the image.\n", image, image)
  494. }