services.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. package cluster // import "github.com/docker/docker/daemon/cluster"
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/containerd/log"
  13. "github.com/distribution/reference"
  14. "github.com/docker/docker/api/types"
  15. "github.com/docker/docker/api/types/backend"
  16. "github.com/docker/docker/api/types/container"
  17. "github.com/docker/docker/api/types/registry"
  18. "github.com/docker/docker/api/types/swarm"
  19. timetypes "github.com/docker/docker/api/types/time"
  20. "github.com/docker/docker/daemon/cluster/convert"
  21. "github.com/docker/docker/errdefs"
  22. "github.com/docker/docker/internal/compatcontext"
  23. runconfigopts "github.com/docker/docker/runconfig/opts"
  24. gogotypes "github.com/gogo/protobuf/types"
  25. swarmapi "github.com/moby/swarmkit/v2/api"
  26. "github.com/pkg/errors"
  27. "google.golang.org/grpc"
  28. )
  29. // GetServices returns all services of a managed swarm cluster.
  30. func (c *Cluster) GetServices(options types.ServiceListOptions) ([]swarm.Service, error) {
  31. c.mu.RLock()
  32. defer c.mu.RUnlock()
  33. state := c.currentNodeState()
  34. if !state.IsActiveManager() {
  35. return nil, c.errNoManager(state)
  36. }
  37. // We move the accepted filter check here as "mode" filter
  38. // is processed in the daemon, not in SwarmKit. So it might
  39. // be good to have accepted file check in the same file as
  40. // the filter processing (in the for loop below).
  41. accepted := map[string]bool{
  42. "name": true,
  43. "id": true,
  44. "label": true,
  45. "mode": true,
  46. "runtime": true,
  47. }
  48. if err := options.Filters.Validate(accepted); err != nil {
  49. return nil, err
  50. }
  51. if len(options.Filters.Get("runtime")) == 0 {
  52. // Default to using the container runtime filter
  53. options.Filters.Add("runtime", string(swarm.RuntimeContainer))
  54. }
  55. filters := &swarmapi.ListServicesRequest_Filters{
  56. NamePrefixes: options.Filters.Get("name"),
  57. IDPrefixes: options.Filters.Get("id"),
  58. Labels: runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")),
  59. Runtimes: options.Filters.Get("runtime"),
  60. }
  61. ctx := context.TODO()
  62. ctx, cancel := c.getRequestContext(ctx)
  63. defer cancel()
  64. r, err := state.controlClient.ListServices(
  65. ctx,
  66. &swarmapi.ListServicesRequest{Filters: filters},
  67. grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
  68. )
  69. if err != nil {
  70. return nil, err
  71. }
  72. services := make([]swarm.Service, 0, len(r.Services))
  73. // if the user requests the service statuses, we'll store the IDs needed
  74. // in this slice
  75. var serviceIDs []string
  76. if options.Status {
  77. serviceIDs = make([]string, 0, len(r.Services))
  78. }
  79. for _, service := range r.Services {
  80. if options.Filters.Contains("mode") {
  81. var mode string
  82. switch service.Spec.GetMode().(type) {
  83. case *swarmapi.ServiceSpec_Global:
  84. mode = "global"
  85. case *swarmapi.ServiceSpec_Replicated:
  86. mode = "replicated"
  87. case *swarmapi.ServiceSpec_ReplicatedJob:
  88. mode = "replicated-job"
  89. case *swarmapi.ServiceSpec_GlobalJob:
  90. mode = "global-job"
  91. }
  92. if !options.Filters.ExactMatch("mode", mode) {
  93. continue
  94. }
  95. }
  96. if options.Status {
  97. serviceIDs = append(serviceIDs, service.ID)
  98. }
  99. svcs, err := convert.ServiceFromGRPC(*service)
  100. if err != nil {
  101. return nil, err
  102. }
  103. services = append(services, svcs)
  104. }
  105. if options.Status {
  106. // Listing service statuses is a separate call because, while it is the
  107. // most common UI operation, it is still just a UI operation, and it
  108. // would be improper to include this data in swarm's Service object.
  109. // We pay the cost with some complexity here, but this is still way
  110. // more efficient than marshalling and unmarshalling all the JSON
  111. // needed to list tasks and get this data otherwise client-side
  112. resp, err := state.controlClient.ListServiceStatuses(
  113. ctx,
  114. &swarmapi.ListServiceStatusesRequest{Services: serviceIDs},
  115. grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse),
  116. )
  117. if err != nil {
  118. return nil, err
  119. }
  120. // we'll need to match up statuses in the response with the services in
  121. // the list operation. if we did this by operating on two lists, the
  122. // result would be quadratic. instead, make a mapping of service IDs to
  123. // service statuses so that this is roughly linear. additionally,
  124. // convert the status response to an engine api service status here.
  125. serviceMap := map[string]*swarm.ServiceStatus{}
  126. for _, status := range resp.Statuses {
  127. serviceMap[status.ServiceID] = &swarm.ServiceStatus{
  128. RunningTasks: status.RunningTasks,
  129. DesiredTasks: status.DesiredTasks,
  130. CompletedTasks: status.CompletedTasks,
  131. }
  132. }
  133. // because this is a list of values and not pointers, make sure we
  134. // actually alter the value when iterating.
  135. for i, service := range services {
  136. // the return value of the ListServiceStatuses operation is
  137. // guaranteed to contain a value in the response for every argument
  138. // in the request, so we can safely do this assignment. and even if
  139. // it wasn't, and the service ID was for some reason absent from
  140. // this map, the resulting value of service.Status would just be
  141. // nil -- the same thing it was before
  142. service.ServiceStatus = serviceMap[service.ID]
  143. services[i] = service
  144. }
  145. }
  146. return services, nil
  147. }
  148. // GetService returns a service based on an ID or name.
  149. func (c *Cluster) GetService(input string, insertDefaults bool) (swarm.Service, error) {
  150. var service *swarmapi.Service
  151. if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  152. s, err := getService(ctx, state.controlClient, input, insertDefaults)
  153. if err != nil {
  154. return err
  155. }
  156. service = s
  157. return nil
  158. }); err != nil {
  159. return swarm.Service{}, err
  160. }
  161. svc, err := convert.ServiceFromGRPC(*service)
  162. if err != nil {
  163. return swarm.Service{}, err
  164. }
  165. return svc, nil
  166. }
  167. // CreateService creates a new service in a managed swarm cluster.
  168. func (c *Cluster) CreateService(s swarm.ServiceSpec, encodedAuth string, queryRegistry bool) (*swarm.ServiceCreateResponse, error) {
  169. var resp *swarm.ServiceCreateResponse
  170. err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  171. err := c.populateNetworkID(ctx, state.controlClient, &s)
  172. if err != nil {
  173. return err
  174. }
  175. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  176. if err != nil {
  177. return errdefs.InvalidParameter(err)
  178. }
  179. resp = &swarm.ServiceCreateResponse{}
  180. switch serviceSpec.Task.Runtime.(type) {
  181. case *swarmapi.TaskSpec_Attachment:
  182. return fmt.Errorf("invalid task spec: spec type %q not supported", swarm.RuntimeNetworkAttachment)
  183. // handle other runtimes here
  184. case *swarmapi.TaskSpec_Generic:
  185. switch serviceSpec.Task.GetGeneric().Kind {
  186. case string(swarm.RuntimePlugin):
  187. if !c.config.Backend.HasExperimental() {
  188. return fmt.Errorf("runtime type %q only supported in experimental", swarm.RuntimePlugin)
  189. }
  190. if s.TaskTemplate.PluginSpec == nil {
  191. return errors.New("plugin spec must be set")
  192. }
  193. default:
  194. return fmt.Errorf("unsupported runtime type: %q", serviceSpec.Task.GetGeneric().Kind)
  195. }
  196. r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  197. if err != nil {
  198. return err
  199. }
  200. resp.ID = r.Service.ID
  201. case *swarmapi.TaskSpec_Container:
  202. ctnr := serviceSpec.Task.GetContainer()
  203. if ctnr == nil {
  204. return errors.New("service does not use container tasks")
  205. }
  206. if encodedAuth != "" {
  207. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  208. }
  209. // retrieve auth config from encoded auth
  210. authConfig := &registry.AuthConfig{}
  211. if encodedAuth != "" {
  212. authReader := strings.NewReader(encodedAuth)
  213. dec := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, authReader))
  214. if err := dec.Decode(authConfig); err != nil {
  215. log.G(ctx).Warnf("invalid authconfig: %v", err)
  216. }
  217. }
  218. // pin image by digest for API versions < 1.30
  219. // TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
  220. // should be removed in the future. Since integration tests only use the
  221. // latest API version, so this is no longer required.
  222. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
  223. digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
  224. if err != nil {
  225. log.G(ctx).Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
  226. // warning in the client response should be concise
  227. resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
  228. } else if ctnr.Image != digestImage {
  229. log.G(ctx).Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
  230. ctnr.Image = digestImage
  231. } else {
  232. log.G(ctx).Debugf("creating service using supplied digest reference %s", ctnr.Image)
  233. }
  234. // Replace the context with a fresh one.
  235. // If we timed out while communicating with the
  236. // registry, then "ctx" will already be expired, which
  237. // would cause UpdateService below to fail. Reusing
  238. // "ctx" could make it impossible to create a service
  239. // if the registry is slow or unresponsive.
  240. var cancel func()
  241. ctx = compatcontext.WithoutCancel(ctx)
  242. ctx, cancel = c.getRequestContext(ctx)
  243. defer cancel()
  244. }
  245. r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  246. if err != nil {
  247. return err
  248. }
  249. resp.ID = r.Service.ID
  250. }
  251. return nil
  252. })
  253. return resp, err
  254. }
  255. // UpdateService updates existing service to match new properties.
  256. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec swarm.ServiceSpec, flags types.ServiceUpdateOptions, queryRegistry bool) (*swarm.ServiceUpdateResponse, error) {
  257. var resp *swarm.ServiceUpdateResponse
  258. err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  259. err := c.populateNetworkID(ctx, state.controlClient, &spec)
  260. if err != nil {
  261. return err
  262. }
  263. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  264. if err != nil {
  265. return errdefs.InvalidParameter(err)
  266. }
  267. currentService, err := getService(ctx, state.controlClient, serviceIDOrName, false)
  268. if err != nil {
  269. return err
  270. }
  271. resp = &swarm.ServiceUpdateResponse{}
  272. switch serviceSpec.Task.Runtime.(type) {
  273. case *swarmapi.TaskSpec_Attachment:
  274. return fmt.Errorf("invalid task spec: spec type %q not supported", swarm.RuntimeNetworkAttachment)
  275. case *swarmapi.TaskSpec_Generic:
  276. switch serviceSpec.Task.GetGeneric().Kind {
  277. case string(swarm.RuntimePlugin):
  278. if spec.TaskTemplate.PluginSpec == nil {
  279. return errors.New("plugin spec must be set")
  280. }
  281. }
  282. case *swarmapi.TaskSpec_Container:
  283. newCtnr := serviceSpec.Task.GetContainer()
  284. if newCtnr == nil {
  285. return errors.New("service does not use container tasks")
  286. }
  287. encodedAuth := flags.EncodedRegistryAuth
  288. if encodedAuth != "" {
  289. newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  290. } else {
  291. // this is needed because if the encodedAuth isn't being updated then we
  292. // shouldn't lose it, and continue to use the one that was already present
  293. var ctnr *swarmapi.ContainerSpec
  294. switch flags.RegistryAuthFrom {
  295. case types.RegistryAuthFromSpec, "":
  296. ctnr = currentService.Spec.Task.GetContainer()
  297. case types.RegistryAuthFromPreviousSpec:
  298. if currentService.PreviousSpec == nil {
  299. return errors.New("service does not have a previous spec")
  300. }
  301. ctnr = currentService.PreviousSpec.Task.GetContainer()
  302. default:
  303. return errors.New("unsupported registryAuthFrom value")
  304. }
  305. if ctnr == nil {
  306. return errors.New("service does not use container tasks")
  307. }
  308. newCtnr.PullOptions = ctnr.PullOptions
  309. // update encodedAuth so it can be used to pin image by digest
  310. if ctnr.PullOptions != nil {
  311. encodedAuth = ctnr.PullOptions.RegistryAuth
  312. }
  313. }
  314. // retrieve auth config from encoded auth
  315. authConfig := &registry.AuthConfig{}
  316. if encodedAuth != "" {
  317. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
  318. log.G(ctx).Warnf("invalid authconfig: %v", err)
  319. }
  320. }
  321. // pin image by digest for API versions < 1.30
  322. // TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
  323. // should be removed in the future. Since integration tests only use the
  324. // latest API version, so this is no longer required.
  325. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
  326. digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
  327. if err != nil {
  328. log.G(ctx).Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
  329. // warning in the client response should be concise
  330. resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
  331. } else if newCtnr.Image != digestImage {
  332. log.G(ctx).Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
  333. newCtnr.Image = digestImage
  334. } else {
  335. log.G(ctx).Debugf("updating service using supplied digest reference %s", newCtnr.Image)
  336. }
  337. // Replace the context with a fresh one.
  338. // If we timed out while communicating with the
  339. // registry, then "ctx" will already be expired, which
  340. // would cause UpdateService below to fail. Reusing
  341. // "ctx" could make it impossible to update a service
  342. // if the registry is slow or unresponsive.
  343. var cancel func()
  344. ctx = compatcontext.WithoutCancel(ctx)
  345. ctx, cancel = c.getRequestContext(ctx)
  346. defer cancel()
  347. }
  348. }
  349. var rollback swarmapi.UpdateServiceRequest_Rollback
  350. switch flags.Rollback {
  351. case "", "none":
  352. rollback = swarmapi.UpdateServiceRequest_NONE
  353. case "previous":
  354. rollback = swarmapi.UpdateServiceRequest_PREVIOUS
  355. default:
  356. return fmt.Errorf("unrecognized rollback option %s", flags.Rollback)
  357. }
  358. _, err = state.controlClient.UpdateService(
  359. ctx,
  360. &swarmapi.UpdateServiceRequest{
  361. ServiceID: currentService.ID,
  362. Spec: &serviceSpec,
  363. ServiceVersion: &swarmapi.Version{
  364. Index: version,
  365. },
  366. Rollback: rollback,
  367. },
  368. )
  369. return err
  370. })
  371. return resp, err
  372. }
  373. // RemoveService removes a service from a managed swarm cluster.
  374. func (c *Cluster) RemoveService(input string) error {
  375. return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  376. service, err := getService(ctx, state.controlClient, input, false)
  377. if err != nil {
  378. return err
  379. }
  380. _, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
  381. return err
  382. })
  383. }
  384. // ServiceLogs collects service logs and writes them back to `config.OutStream`
  385. func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *container.LogsOptions) (<-chan *backend.LogMessage, error) {
  386. c.mu.RLock()
  387. defer c.mu.RUnlock()
  388. state := c.currentNodeState()
  389. if !state.IsActiveManager() {
  390. return nil, c.errNoManager(state)
  391. }
  392. swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
  393. if err != nil {
  394. return nil, errors.Wrap(err, "error making log selector")
  395. }
  396. // set the streams we'll use
  397. stdStreams := []swarmapi.LogStream{}
  398. if config.ShowStdout {
  399. stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
  400. }
  401. if config.ShowStderr {
  402. stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
  403. }
  404. // Get tail value squared away - the number of previous log lines we look at
  405. var tail int64
  406. // in ContainerLogs, if the tail value is ANYTHING non-integer, we just set
  407. // it to -1 (all). i don't agree with that, but i also think no tail value
  408. // should be legitimate. if you don't pass tail, we assume you want "all"
  409. if config.Tail == "all" || config.Tail == "" {
  410. // tail of 0 means send all logs on the swarmkit side
  411. tail = 0
  412. } else {
  413. t, err := strconv.Atoi(config.Tail)
  414. if err != nil {
  415. return nil, errors.New(`tail value must be a positive integer or "all"`)
  416. }
  417. if t < 0 {
  418. return nil, errors.New("negative tail values not supported")
  419. }
  420. // we actually use negative tail in swarmkit to represent messages
  421. // backwards starting from the beginning. also, -1 means no logs. so,
  422. // basically, for api compat with docker container logs, add one and
  423. // flip the sign. we error above if you try to negative tail, which
  424. // isn't supported by docker (and would error deeper in the stack
  425. // anyway)
  426. //
  427. // See the logs protobuf for more information
  428. tail = int64(-(t + 1))
  429. }
  430. // get the since value - the time in the past we're looking at logs starting from
  431. var sinceProto *gogotypes.Timestamp
  432. if config.Since != "" {
  433. s, n, err := timetypes.ParseTimestamps(config.Since, 0)
  434. if err != nil {
  435. return nil, errors.Wrap(err, "could not parse since timestamp")
  436. }
  437. since := time.Unix(s, n)
  438. sinceProto, err = gogotypes.TimestampProto(since)
  439. if err != nil {
  440. return nil, errors.Wrap(err, "could not parse timestamp to proto")
  441. }
  442. }
  443. stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
  444. Selector: swarmSelector,
  445. Options: &swarmapi.LogSubscriptionOptions{
  446. Follow: config.Follow,
  447. Streams: stdStreams,
  448. Tail: tail,
  449. Since: sinceProto,
  450. },
  451. })
  452. if err != nil {
  453. return nil, err
  454. }
  455. messageChan := make(chan *backend.LogMessage, 1)
  456. go func() {
  457. defer close(messageChan)
  458. for {
  459. // Check the context before doing anything.
  460. select {
  461. case <-ctx.Done():
  462. return
  463. default:
  464. }
  465. subscribeMsg, err := stream.Recv()
  466. if err == io.EOF {
  467. return
  468. }
  469. // if we're not io.EOF, push the message in and return
  470. if err != nil {
  471. select {
  472. case <-ctx.Done():
  473. case messageChan <- &backend.LogMessage{Err: err}:
  474. }
  475. return
  476. }
  477. for _, msg := range subscribeMsg.Messages {
  478. // make a new message
  479. m := new(backend.LogMessage)
  480. m.Attrs = make([]backend.LogAttr, 0, len(msg.Attrs)+3)
  481. // add the timestamp, adding the error if it fails
  482. m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp)
  483. if err != nil {
  484. m.Err = err
  485. }
  486. nodeKey := contextPrefix + ".node.id"
  487. serviceKey := contextPrefix + ".service.id"
  488. taskKey := contextPrefix + ".task.id"
  489. // copy over all of the details
  490. for _, d := range msg.Attrs {
  491. switch d.Key {
  492. case nodeKey, serviceKey, taskKey:
  493. // we have the final say over context details (in case there
  494. // is a conflict (if the user added a detail with a context's
  495. // key for some reason))
  496. default:
  497. m.Attrs = append(m.Attrs, backend.LogAttr{Key: d.Key, Value: d.Value})
  498. }
  499. }
  500. m.Attrs = append(m.Attrs,
  501. backend.LogAttr{Key: nodeKey, Value: msg.Context.NodeID},
  502. backend.LogAttr{Key: serviceKey, Value: msg.Context.ServiceID},
  503. backend.LogAttr{Key: taskKey, Value: msg.Context.TaskID},
  504. )
  505. switch msg.Stream {
  506. case swarmapi.LogStreamStdout:
  507. m.Source = "stdout"
  508. case swarmapi.LogStreamStderr:
  509. m.Source = "stderr"
  510. }
  511. m.Line = msg.Data
  512. // there could be a case where the reader stops accepting
  513. // messages and the context is canceled. we need to check that
  514. // here, or otherwise we risk blocking forever on the message
  515. // send.
  516. select {
  517. case <-ctx.Done():
  518. return
  519. case messageChan <- m:
  520. }
  521. }
  522. }
  523. }()
  524. return messageChan, nil
  525. }
  526. // convertSelector takes a backend.LogSelector, which contains raw names that
  527. // may or may not be valid, and converts them to an api.LogSelector proto. It
  528. // returns an error if something fails
  529. func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) {
  530. // don't rely on swarmkit to resolve IDs, do it ourselves
  531. swarmSelector := &swarmapi.LogSelector{}
  532. for _, s := range selector.Services {
  533. service, err := getService(ctx, cc, s, false)
  534. if err != nil {
  535. return nil, err
  536. }
  537. c := service.Spec.Task.GetContainer()
  538. if c == nil {
  539. return nil, errors.New("logs only supported on container tasks")
  540. }
  541. swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID)
  542. }
  543. for _, t := range selector.Tasks {
  544. task, err := getTask(ctx, cc, t)
  545. if err != nil {
  546. return nil, err
  547. }
  548. c := task.Spec.GetContainer()
  549. if c == nil {
  550. return nil, errors.New("logs only supported on container tasks")
  551. }
  552. swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID)
  553. }
  554. return swarmSelector, nil
  555. }
  556. // imageWithDigestString takes an image such as name or name:tag
  557. // and returns the image pinned to a digest, such as name@sha256:34234
  558. func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *registry.AuthConfig) (string, error) {
  559. ref, err := reference.ParseAnyReference(image)
  560. if err != nil {
  561. return "", err
  562. }
  563. namedRef, ok := ref.(reference.Named)
  564. if !ok {
  565. if _, ok := ref.(reference.Digested); ok {
  566. return image, nil
  567. }
  568. return "", errors.Errorf("unknown image reference format: %s", image)
  569. }
  570. // only query registry if not a canonical reference (i.e. with digest)
  571. if _, ok := namedRef.(reference.Canonical); !ok {
  572. namedRef = reference.TagNameOnly(namedRef)
  573. taggedRef, ok := namedRef.(reference.NamedTagged)
  574. if !ok {
  575. return "", errors.Errorf("image reference not tagged: %s", image)
  576. }
  577. repo, err := c.config.ImageBackend.GetRepository(ctx, taggedRef, authConfig)
  578. if err != nil {
  579. return "", err
  580. }
  581. dscrptr, err := repo.Tags(ctx).Get(ctx, taggedRef.Tag())
  582. if err != nil {
  583. return "", err
  584. }
  585. namedDigestedRef, err := reference.WithDigest(taggedRef, dscrptr.Digest)
  586. if err != nil {
  587. return "", err
  588. }
  589. // return familiar form until interface updated to return type
  590. return reference.FamiliarString(namedDigestedRef), nil
  591. }
  592. // reference already contains a digest, so just return it
  593. return reference.FamiliarString(ref), nil
  594. }
  // digestWarning builds the warning text shown to the client when image could
  // not be pinned by digest. The wording is hardcoded, but could be made
  // smarter in the future.
  func digestWarning(image string) string {
  	const warningFormat = "image %s could not be accessed on a registry to record\n" +
  		"its digest. Each node will access %s independently,\n" +
  		"possibly leading to different nodes running different\n" +
  		"versions of the image.\n"

  	return fmt.Sprintf(warningFormat, image, image)
  }