services.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. package cluster // import "github.com/docker/docker/daemon/cluster"
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/docker/distribution/reference"
  13. apitypes "github.com/docker/docker/api/types"
  14. "github.com/docker/docker/api/types/backend"
  15. types "github.com/docker/docker/api/types/swarm"
  16. timetypes "github.com/docker/docker/api/types/time"
  17. "github.com/docker/docker/daemon/cluster/convert"
  18. "github.com/docker/docker/errdefs"
  19. runconfigopts "github.com/docker/docker/runconfig/opts"
  20. swarmapi "github.com/docker/swarmkit/api"
  21. gogotypes "github.com/gogo/protobuf/types"
  22. "github.com/pkg/errors"
  23. "github.com/sirupsen/logrus"
  24. "google.golang.org/grpc"
  25. )
// GetServices returns all services of a managed swarm cluster.
//
// The supplied filters are validated and translated into a SwarmKit
// ListServicesRequest; the "mode" filter is applied locally (below) because
// SwarmKit does not understand it. When options.Status is set, a second
// control-API call fetches per-service task counts and attaches them to the
// returned services.
func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// Only an active manager can answer list requests.
	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	// We move the accepted filter check here as "mode" filter
	// is processed in the daemon, not in SwarmKit. So it might
	// be good to have accepted file check in the same file as
	// the filter processing (in the for loop below).
	accepted := map[string]bool{
		"name":    true,
		"id":      true,
		"label":   true,
		"mode":    true,
		"runtime": true,
	}
	if err := options.Filters.Validate(accepted); err != nil {
		return nil, err
	}

	if len(options.Filters.Get("runtime")) == 0 {
		// Default to using the container runtime filter
		options.Filters.Add("runtime", string(types.RuntimeContainer))
	}

	// Everything except "mode" is handled server-side by SwarmKit.
	filters := &swarmapi.ListServicesRequest_Filters{
		NamePrefixes: options.Filters.Get("name"),
		IDPrefixes:   options.Filters.Get("id"),
		Labels:       runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")),
		Runtimes:     options.Filters.Get("runtime"),
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	// List responses can be large, so raise the gRPC receive-size limit.
	r, err := state.controlClient.ListServices(
		ctx,
		&swarmapi.ListServicesRequest{Filters: filters},
		grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
	)
	if err != nil {
		return nil, err
	}

	services := make([]types.Service, 0, len(r.Services))

	// if the user requests the service statuses, we'll store the IDs needed
	// in this slice
	var serviceIDs []string
	if options.Status {
		serviceIDs = make([]string, 0, len(r.Services))
	}
	for _, service := range r.Services {
		// Apply the daemon-side "mode" filter: map the spec's mode oneof to
		// its API string form and require an exact match.
		if options.Filters.Contains("mode") {
			var mode string
			switch service.Spec.GetMode().(type) {
			case *swarmapi.ServiceSpec_Global:
				mode = "global"
			case *swarmapi.ServiceSpec_Replicated:
				mode = "replicated"
			case *swarmapi.ServiceSpec_ReplicatedJob:
				mode = "replicated-job"
			case *swarmapi.ServiceSpec_GlobalJob:
				mode = "global-job"
			}

			if !options.Filters.ExactMatch("mode", mode) {
				continue
			}
		}
		if options.Status {
			serviceIDs = append(serviceIDs, service.ID)
		}
		svcs, err := convert.ServiceFromGRPC(*service)
		if err != nil {
			return nil, err
		}
		services = append(services, svcs)
	}

	if options.Status {
		// Listing service statuses is a separate call because, while it is the
		// most common UI operation, it is still just a UI operation, and it
		// would be improper to include this data in swarm's Service object.
		// We pay the cost with some complexity here, but this is still way
		// more efficient than marshalling and unmarshalling all the JSON
		// needed to list tasks and get this data otherwise client-side
		resp, err := state.controlClient.ListServiceStatuses(
			ctx,
			&swarmapi.ListServiceStatusesRequest{Services: serviceIDs},
			grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
		)
		if err != nil {
			return nil, err
		}

		// we'll need to match up statuses in the response with the services in
		// the list operation. if we did this by operating on two lists, the
		// result would be quadratic. instead, make a mapping of service IDs to
		// service statuses so that this is roughly linear. additionally,
		// convert the status response to an engine api service status here.
		serviceMap := map[string]*types.ServiceStatus{}
		for _, status := range resp.Statuses {
			serviceMap[status.ServiceID] = &types.ServiceStatus{
				RunningTasks:   status.RunningTasks,
				DesiredTasks:   status.DesiredTasks,
				CompletedTasks: status.CompletedTasks,
			}
		}

		// because this is a list of values and not pointers, make sure we
		// actually alter the value when iterating.
		for i, service := range services {
			// the return value of the ListServiceStatuses operation is
			// guaranteed to contain a value in the response for every argument
			// in the request, so we can safely do this assignment. and even if
			// it wasn't, and the service ID was for some reason absent from
			// this map, the resulting value of service.Status would just be
			// nil -- the same thing it was before
			service.ServiceStatus = serviceMap[service.ID]
			services[i] = service
		}
	}

	return services, nil
}
  144. // GetService returns a service based on an ID or name.
  145. func (c *Cluster) GetService(input string, insertDefaults bool) (types.Service, error) {
  146. var service *swarmapi.Service
  147. if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  148. s, err := getService(ctx, state.controlClient, input, insertDefaults)
  149. if err != nil {
  150. return err
  151. }
  152. service = s
  153. return nil
  154. }); err != nil {
  155. return types.Service{}, err
  156. }
  157. svc, err := convert.ServiceFromGRPC(*service)
  158. if err != nil {
  159. return types.Service{}, err
  160. }
  161. return svc, nil
  162. }
// CreateService creates a new service in a managed swarm cluster.
//
// encodedAuth, when non-empty, is a base64-URL-encoded JSON auth config that
// is stored on the container spec's pull options and used to query the
// registry. queryRegistry controls whether the image is pinned to a digest
// via a registry lookup before the service is created.
func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRegistry bool) (*apitypes.ServiceCreateResponse, error) {
	var resp *apitypes.ServiceCreateResponse
	err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		// Resolve network names in the spec to IDs before conversion.
		err := c.populateNetworkID(ctx, state.controlClient, &s)
		if err != nil {
			return err
		}

		serviceSpec, err := convert.ServiceSpecToGRPC(s)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}

		resp = &apitypes.ServiceCreateResponse{}

		switch serviceSpec.Task.Runtime.(type) {
		case *swarmapi.TaskSpec_Attachment:
			// Attachment tasks cannot be created directly through the API.
			return fmt.Errorf("invalid task spec: spec type %q not supported", types.RuntimeNetworkAttachment)
		// handle other runtimes here
		case *swarmapi.TaskSpec_Generic:
			switch serviceSpec.Task.GetGeneric().Kind {
			case string(types.RuntimePlugin):
				// Plugin services are gated behind experimental mode.
				if !c.config.Backend.HasExperimental() {
					return fmt.Errorf("runtime type %q only supported in experimental", types.RuntimePlugin)
				}
				if s.TaskTemplate.PluginSpec == nil {
					return errors.New("plugin spec must be set")
				}
			default:
				return fmt.Errorf("unsupported runtime type: %q", serviceSpec.Task.GetGeneric().Kind)
			}

			r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
			if err != nil {
				return err
			}

			resp.ID = r.Service.ID
		case *swarmapi.TaskSpec_Container:
			ctnr := serviceSpec.Task.GetContainer()
			if ctnr == nil {
				return errors.New("service does not use container tasks")
			}
			if encodedAuth != "" {
				ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
			}

			// retrieve auth config from encoded auth
			authConfig := &apitypes.AuthConfig{}
			if encodedAuth != "" {
				authReader := strings.NewReader(encodedAuth)
				dec := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, authReader))
				if err := dec.Decode(authConfig); err != nil {
					// A bad auth config is not fatal; the registry query below
					// may still succeed anonymously.
					logrus.Warnf("invalid authconfig: %v", err)
				}
			}

			// pin image by digest for API versions < 1.30
			// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
			// should be removed in the future. Since integration tests only use the
			// latest API version, so this is no longer required.
			if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
				digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
				if err != nil {
					logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
					// warning in the client response should be concise
					resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
				} else if ctnr.Image != digestImage {
					logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
					ctnr.Image = digestImage
				} else {
					logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
				}

				// Replace the context with a fresh one.
				// If we timed out while communicating with the
				// registry, then "ctx" will already be expired, which
				// would cause UpdateService below to fail. Reusing
				// "ctx" could make it impossible to create a service
				// if the registry is slow or unresponsive.
				var cancel func()
				ctx, cancel = c.getRequestContext()
				defer cancel()
			}

			r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
			if err != nil {
				return err
			}

			resp.ID = r.Service.ID
		}
		return nil
	})

	return resp, err
}
// UpdateService updates existing service to match new properties.
//
// version is the swarm object version the caller last observed; SwarmKit uses
// it for optimistic concurrency control. flags carries the registry auth to
// use (or where to inherit it from) and the rollback behavior. queryRegistry
// controls whether the image is pinned to a digest before the update.
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, flags apitypes.ServiceUpdateOptions, queryRegistry bool) (*apitypes.ServiceUpdateResponse, error) {
	var resp *apitypes.ServiceUpdateResponse

	err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		// Resolve network names in the spec to IDs before conversion.
		err := c.populateNetworkID(ctx, state.controlClient, &spec)
		if err != nil {
			return err
		}

		serviceSpec, err := convert.ServiceSpecToGRPC(spec)
		if err != nil {
			return errdefs.InvalidParameter(err)
		}

		// The existing service is needed both for its ID and for inheriting
		// registry auth when none is supplied.
		currentService, err := getService(ctx, state.controlClient, serviceIDOrName, false)
		if err != nil {
			return err
		}

		resp = &apitypes.ServiceUpdateResponse{}

		switch serviceSpec.Task.Runtime.(type) {
		case *swarmapi.TaskSpec_Attachment:
			return fmt.Errorf("invalid task spec: spec type %q not supported", types.RuntimeNetworkAttachment)
		case *swarmapi.TaskSpec_Generic:
			switch serviceSpec.Task.GetGeneric().Kind {
			case string(types.RuntimePlugin):
				if spec.TaskTemplate.PluginSpec == nil {
					return errors.New("plugin spec must be set")
				}
			}
		case *swarmapi.TaskSpec_Container:
			newCtnr := serviceSpec.Task.GetContainer()
			if newCtnr == nil {
				return errors.New("service does not use container tasks")
			}

			encodedAuth := flags.EncodedRegistryAuth
			if encodedAuth != "" {
				newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
			} else {
				// this is needed because if the encodedAuth isn't being updated then we
				// shouldn't lose it, and continue to use the one that was already present
				var ctnr *swarmapi.ContainerSpec
				switch flags.RegistryAuthFrom {
				case apitypes.RegistryAuthFromSpec, "":
					ctnr = currentService.Spec.Task.GetContainer()
				case apitypes.RegistryAuthFromPreviousSpec:
					if currentService.PreviousSpec == nil {
						return errors.New("service does not have a previous spec")
					}
					ctnr = currentService.PreviousSpec.Task.GetContainer()
				default:
					return errors.New("unsupported registryAuthFrom value")
				}
				if ctnr == nil {
					return errors.New("service does not use container tasks")
				}
				newCtnr.PullOptions = ctnr.PullOptions

				// update encodedAuth so it can be used to pin image by digest
				if ctnr.PullOptions != nil {
					encodedAuth = ctnr.PullOptions.RegistryAuth
				}
			}

			// retrieve auth config from encoded auth
			authConfig := &apitypes.AuthConfig{}
			if encodedAuth != "" {
				if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
					// A bad auth config is not fatal; the registry query below
					// may still succeed anonymously.
					logrus.Warnf("invalid authconfig: %v", err)
				}
			}

			// pin image by digest for API versions < 1.30
			// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
			// should be removed in the future. Since integration tests only use the
			// latest API version, so this is no longer required.
			if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
				digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
				if err != nil {
					logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
					// warning in the client response should be concise
					resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
				} else if newCtnr.Image != digestImage {
					logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
					newCtnr.Image = digestImage
				} else {
					logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
				}

				// Replace the context with a fresh one.
				// If we timed out while communicating with the
				// registry, then "ctx" will already be expired, which
				// would cause UpdateService below to fail. Reusing
				// "ctx" could make it impossible to update a service
				// if the registry is slow or unresponsive.
				var cancel func()
				ctx, cancel = c.getRequestContext()
				defer cancel()
			}
		}

		// Translate the API rollback option into the SwarmKit enum.
		var rollback swarmapi.UpdateServiceRequest_Rollback
		switch flags.Rollback {
		case "", "none":
			rollback = swarmapi.UpdateServiceRequest_NONE
		case "previous":
			rollback = swarmapi.UpdateServiceRequest_PREVIOUS
		default:
			return fmt.Errorf("unrecognized rollback option %s", flags.Rollback)
		}

		_, err = state.controlClient.UpdateService(
			ctx,
			&swarmapi.UpdateServiceRequest{
				ServiceID: currentService.ID,
				Spec:      &serviceSpec,
				ServiceVersion: &swarmapi.Version{
					Index: version,
				},
				Rollback: rollback,
			},
		)
		return err
	})
	return resp, err
}
  367. // RemoveService removes a service from a managed swarm cluster.
  368. func (c *Cluster) RemoveService(input string) error {
  369. return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  370. service, err := getService(ctx, state.controlClient, input, false)
  371. if err != nil {
  372. return err
  373. }
  374. _, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
  375. return err
  376. })
  377. }
// ServiceLogs collects service logs and writes them back to `config.OutStream`
//
// It subscribes to SwarmKit's log stream for the services/tasks in selector
// and returns a channel of engine-API log messages. The channel is closed when
// the stream ends or ctx is canceled; stream errors are delivered on the
// channel as a LogMessage with Err set.
func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *apitypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	if !state.IsActiveManager() {
		return nil, c.errNoManager(state)
	}

	// Resolve raw names in the selector to concrete service/task IDs.
	swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
	if err != nil {
		return nil, errors.Wrap(err, "error making log selector")
	}

	// set the streams we'll use
	stdStreams := []swarmapi.LogStream{}
	if config.ShowStdout {
		stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
	}
	if config.ShowStderr {
		stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
	}

	// Get tail value squared away - the number of previous log lines we look at
	var tail int64
	// in ContainerLogs, if the tail value is ANYTHING non-integer, we just set
	// it to -1 (all). i don't agree with that, but i also think no tail value
	// should be legitimate. if you don't pass tail, we assume you want "all"
	if config.Tail == "all" || config.Tail == "" {
		// tail of 0 means send all logs on the swarmkit side
		tail = 0
	} else {
		t, err := strconv.Atoi(config.Tail)
		if err != nil {
			return nil, errors.New("tail value must be a positive integer or \"all\"")
		}
		if t < 0 {
			return nil, errors.New("negative tail values not supported")
		}
		// we actually use negative tail in swarmkit to represent messages
		// backwards starting from the beginning. also, -1 means no logs. so,
		// basically, for api compat with docker container logs, add one and
		// flip the sign. we error above if you try to negative tail, which
		// isn't supported by docker (and would error deeper in the stack
		// anyway)
		//
		// See the logs protobuf for more information
		tail = int64(-(t + 1))
	}

	// get the since value - the time in the past we're looking at logs starting from
	var sinceProto *gogotypes.Timestamp
	if config.Since != "" {
		s, n, err := timetypes.ParseTimestamps(config.Since, 0)
		if err != nil {
			return nil, errors.Wrap(err, "could not parse since timestamp")
		}
		since := time.Unix(s, n)
		sinceProto, err = gogotypes.TimestampProto(since)
		if err != nil {
			return nil, errors.Wrap(err, "could not parse timestamp to proto")
		}
	}

	stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
		Selector: swarmSelector,
		Options: &swarmapi.LogSubscriptionOptions{
			Follow:  config.Follow,
			Streams: stdStreams,
			Tail:    tail,
			Since:   sinceProto,
		},
	})
	if err != nil {
		return nil, err
	}

	// Pump messages from the gRPC stream into the returned channel. The
	// goroutine exits (closing the channel) on EOF, stream error, or context
	// cancellation.
	messageChan := make(chan *backend.LogMessage, 1)
	go func() {
		defer close(messageChan)
		for {
			// Check the context before doing anything.
			select {
			case <-ctx.Done():
				return
			default:
			}
			subscribeMsg, err := stream.Recv()
			if err == io.EOF {
				return
			}
			// if we're not io.EOF, push the message in and return
			if err != nil {
				select {
				case <-ctx.Done():
				case messageChan <- &backend.LogMessage{Err: err}:
				}
				return
			}

			for _, msg := range subscribeMsg.Messages {
				// make a new message
				m := new(backend.LogMessage)
				m.Attrs = make([]backend.LogAttr, 0, len(msg.Attrs)+3)
				// add the timestamp, adding the error if it fails
				m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp)
				if err != nil {
					m.Err = err
				}

				// Attribute keys for the swarm context details we always attach.
				nodeKey := contextPrefix + ".node.id"
				serviceKey := contextPrefix + ".service.id"
				taskKey := contextPrefix + ".task.id"

				// copy over all of the details
				for _, d := range msg.Attrs {
					switch d.Key {
					case nodeKey, serviceKey, taskKey:
						// we have the final say over context details (in case there
						// is a conflict (if the user added a detail with a context's
						// key for some reason))
					default:
						m.Attrs = append(m.Attrs, backend.LogAttr{Key: d.Key, Value: d.Value})
					}
				}
				m.Attrs = append(m.Attrs,
					backend.LogAttr{Key: nodeKey, Value: msg.Context.NodeID},
					backend.LogAttr{Key: serviceKey, Value: msg.Context.ServiceID},
					backend.LogAttr{Key: taskKey, Value: msg.Context.TaskID},
				)

				switch msg.Stream {
				case swarmapi.LogStreamStdout:
					m.Source = "stdout"
				case swarmapi.LogStreamStderr:
					m.Source = "stderr"
				}
				m.Line = msg.Data

				// there could be a case where the reader stops accepting
				// messages and the context is canceled. we need to check that
				// here, or otherwise we risk blocking forever on the message
				// send.
				select {
				case <-ctx.Done():
					return
				case messageChan <- m:
				}
			}
		}
	}()
	return messageChan, nil
}
  520. // convertSelector takes a backend.LogSelector, which contains raw names that
  521. // may or may not be valid, and converts them to an api.LogSelector proto. It
  522. // returns an error if something fails
  523. func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) {
  524. // don't rely on swarmkit to resolve IDs, do it ourselves
  525. swarmSelector := &swarmapi.LogSelector{}
  526. for _, s := range selector.Services {
  527. service, err := getService(ctx, cc, s, false)
  528. if err != nil {
  529. return nil, err
  530. }
  531. c := service.Spec.Task.GetContainer()
  532. if c == nil {
  533. return nil, errors.New("logs only supported on container tasks")
  534. }
  535. swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID)
  536. }
  537. for _, t := range selector.Tasks {
  538. task, err := getTask(ctx, cc, t)
  539. if err != nil {
  540. return nil, err
  541. }
  542. c := task.Spec.GetContainer()
  543. if c == nil {
  544. return nil, errors.New("logs only supported on container tasks")
  545. }
  546. swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID)
  547. }
  548. return swarmSelector, nil
  549. }
  550. // imageWithDigestString takes an image such as name or name:tag
  551. // and returns the image pinned to a digest, such as name@sha256:34234
  552. func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
  553. ref, err := reference.ParseAnyReference(image)
  554. if err != nil {
  555. return "", err
  556. }
  557. namedRef, ok := ref.(reference.Named)
  558. if !ok {
  559. if _, ok := ref.(reference.Digested); ok {
  560. return image, nil
  561. }
  562. return "", errors.Errorf("unknown image reference format: %s", image)
  563. }
  564. // only query registry if not a canonical reference (i.e. with digest)
  565. if _, ok := namedRef.(reference.Canonical); !ok {
  566. namedRef = reference.TagNameOnly(namedRef)
  567. taggedRef, ok := namedRef.(reference.NamedTagged)
  568. if !ok {
  569. return "", errors.Errorf("image reference not tagged: %s", image)
  570. }
  571. repo, err := c.config.ImageBackend.GetRepository(ctx, taggedRef, authConfig)
  572. if err != nil {
  573. return "", err
  574. }
  575. dscrptr, err := repo.Tags(ctx).Get(ctx, taggedRef.Tag())
  576. if err != nil {
  577. return "", err
  578. }
  579. namedDigestedRef, err := reference.WithDigest(taggedRef, dscrptr.Digest)
  580. if err != nil {
  581. return "", err
  582. }
  583. // return familiar form until interface updated to return type
  584. return reference.FamiliarString(namedDigestedRef), nil
  585. }
  586. // reference already contains a digest, so just return it
  587. return reference.FamiliarString(ref), nil
  588. }
  589. // digestWarning constructs a formatted warning string
  590. // using the image name that could not be pinned by digest. The
  591. // formatting is hardcoded, but could me made smarter in the future
  592. func digestWarning(image string) string {
  593. return fmt.Sprintf("image %s could not be accessed on a registry to record\nits digest. Each node will access %s independently,\npossibly leading to different nodes running different\nversions of the image.\n", image, image)
  594. }