services.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. package cluster
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/docker/distribution/reference"
  12. apitypes "github.com/docker/docker/api/types"
  13. "github.com/docker/docker/api/types/backend"
  14. types "github.com/docker/docker/api/types/swarm"
  15. timetypes "github.com/docker/docker/api/types/time"
  16. "github.com/docker/docker/daemon/cluster/convert"
  17. "github.com/docker/docker/errdefs"
  18. runconfigopts "github.com/docker/docker/runconfig/opts"
  19. swarmapi "github.com/docker/swarmkit/api"
  20. gogotypes "github.com/gogo/protobuf/types"
  21. "github.com/pkg/errors"
  22. "github.com/sirupsen/logrus"
  23. "golang.org/x/net/context"
  24. )
  25. // GetServices returns all services of a managed swarm cluster.
  26. func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
  27. c.mu.RLock()
  28. defer c.mu.RUnlock()
  29. state := c.currentNodeState()
  30. if !state.IsActiveManager() {
  31. return nil, c.errNoManager(state)
  32. }
  33. // We move the accepted filter check here as "mode" filter
  34. // is processed in the daemon, not in SwarmKit. So it might
  35. // be good to have accepted file check in the same file as
  36. // the filter processing (in the for loop below).
  37. accepted := map[string]bool{
  38. "name": true,
  39. "id": true,
  40. "label": true,
  41. "mode": true,
  42. "runtime": true,
  43. }
  44. if err := options.Filters.Validate(accepted); err != nil {
  45. return nil, err
  46. }
  47. if len(options.Filters.Get("runtime")) == 0 {
  48. // Default to using the container runtime filter
  49. options.Filters.Add("runtime", string(types.RuntimeContainer))
  50. }
  51. filters := &swarmapi.ListServicesRequest_Filters{
  52. NamePrefixes: options.Filters.Get("name"),
  53. IDPrefixes: options.Filters.Get("id"),
  54. Labels: runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")),
  55. Runtimes: options.Filters.Get("runtime"),
  56. }
  57. ctx, cancel := c.getRequestContext()
  58. defer cancel()
  59. r, err := state.controlClient.ListServices(
  60. ctx,
  61. &swarmapi.ListServicesRequest{Filters: filters})
  62. if err != nil {
  63. return nil, err
  64. }
  65. services := make([]types.Service, 0, len(r.Services))
  66. for _, service := range r.Services {
  67. if options.Filters.Contains("mode") {
  68. var mode string
  69. switch service.Spec.GetMode().(type) {
  70. case *swarmapi.ServiceSpec_Global:
  71. mode = "global"
  72. case *swarmapi.ServiceSpec_Replicated:
  73. mode = "replicated"
  74. }
  75. if !options.Filters.ExactMatch("mode", mode) {
  76. continue
  77. }
  78. }
  79. svcs, err := convert.ServiceFromGRPC(*service)
  80. if err != nil {
  81. return nil, err
  82. }
  83. services = append(services, svcs)
  84. }
  85. return services, nil
  86. }
  87. // GetService returns a service based on an ID or name.
  88. func (c *Cluster) GetService(input string, insertDefaults bool) (types.Service, error) {
  89. var service *swarmapi.Service
  90. if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  91. s, err := getService(ctx, state.controlClient, input, insertDefaults)
  92. if err != nil {
  93. return err
  94. }
  95. service = s
  96. return nil
  97. }); err != nil {
  98. return types.Service{}, err
  99. }
  100. svc, err := convert.ServiceFromGRPC(*service)
  101. if err != nil {
  102. return types.Service{}, err
  103. }
  104. return svc, nil
  105. }
  106. // CreateService creates a new service in a managed swarm cluster.
  107. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRegistry bool) (*apitypes.ServiceCreateResponse, error) {
  108. var resp *apitypes.ServiceCreateResponse
  109. err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  110. err := c.populateNetworkID(ctx, state.controlClient, &s)
  111. if err != nil {
  112. return err
  113. }
  114. serviceSpec, err := convert.ServiceSpecToGRPC(s)
  115. if err != nil {
  116. return errdefs.InvalidParameter(err)
  117. }
  118. resp = &apitypes.ServiceCreateResponse{}
  119. switch serviceSpec.Task.Runtime.(type) {
  120. // handle other runtimes here
  121. case *swarmapi.TaskSpec_Generic:
  122. switch serviceSpec.Task.GetGeneric().Kind {
  123. case string(types.RuntimePlugin):
  124. info, _ := c.config.Backend.SystemInfo()
  125. if !info.ExperimentalBuild {
  126. return fmt.Errorf("runtime type %q only supported in experimental", types.RuntimePlugin)
  127. }
  128. if s.TaskTemplate.PluginSpec == nil {
  129. return errors.New("plugin spec must be set")
  130. }
  131. default:
  132. return fmt.Errorf("unsupported runtime type: %q", serviceSpec.Task.GetGeneric().Kind)
  133. }
  134. r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  135. if err != nil {
  136. return err
  137. }
  138. resp.ID = r.Service.ID
  139. case *swarmapi.TaskSpec_Container:
  140. ctnr := serviceSpec.Task.GetContainer()
  141. if ctnr == nil {
  142. return errors.New("service does not use container tasks")
  143. }
  144. if encodedAuth != "" {
  145. ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  146. }
  147. // retrieve auth config from encoded auth
  148. authConfig := &apitypes.AuthConfig{}
  149. if encodedAuth != "" {
  150. authReader := strings.NewReader(encodedAuth)
  151. dec := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, authReader))
  152. if err := dec.Decode(authConfig); err != nil {
  153. logrus.Warnf("invalid authconfig: %v", err)
  154. }
  155. }
  156. // pin image by digest for API versions < 1.30
  157. // TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
  158. // should be removed in the future. Since integration tests only use the
  159. // latest API version, so this is no longer required.
  160. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
  161. digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
  162. if err != nil {
  163. logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
  164. // warning in the client response should be concise
  165. resp.Warnings = append(resp.Warnings, digestWarning(ctnr.Image))
  166. } else if ctnr.Image != digestImage {
  167. logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
  168. ctnr.Image = digestImage
  169. } else {
  170. logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
  171. }
  172. // Replace the context with a fresh one.
  173. // If we timed out while communicating with the
  174. // registry, then "ctx" will already be expired, which
  175. // would cause UpdateService below to fail. Reusing
  176. // "ctx" could make it impossible to create a service
  177. // if the registry is slow or unresponsive.
  178. var cancel func()
  179. ctx, cancel = c.getRequestContext()
  180. defer cancel()
  181. }
  182. r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
  183. if err != nil {
  184. return err
  185. }
  186. resp.ID = r.Service.ID
  187. }
  188. return nil
  189. })
  190. return resp, err
  191. }
  192. // UpdateService updates existing service to match new properties.
  193. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, flags apitypes.ServiceUpdateOptions, queryRegistry bool) (*apitypes.ServiceUpdateResponse, error) {
  194. var resp *apitypes.ServiceUpdateResponse
  195. err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  196. err := c.populateNetworkID(ctx, state.controlClient, &spec)
  197. if err != nil {
  198. return err
  199. }
  200. serviceSpec, err := convert.ServiceSpecToGRPC(spec)
  201. if err != nil {
  202. return errdefs.InvalidParameter(err)
  203. }
  204. currentService, err := getService(ctx, state.controlClient, serviceIDOrName, false)
  205. if err != nil {
  206. return err
  207. }
  208. resp = &apitypes.ServiceUpdateResponse{}
  209. switch serviceSpec.Task.Runtime.(type) {
  210. case *swarmapi.TaskSpec_Generic:
  211. switch serviceSpec.Task.GetGeneric().Kind {
  212. case string(types.RuntimePlugin):
  213. if spec.TaskTemplate.PluginSpec == nil {
  214. return errors.New("plugin spec must be set")
  215. }
  216. }
  217. case *swarmapi.TaskSpec_Container:
  218. newCtnr := serviceSpec.Task.GetContainer()
  219. if newCtnr == nil {
  220. return errors.New("service does not use container tasks")
  221. }
  222. encodedAuth := flags.EncodedRegistryAuth
  223. if encodedAuth != "" {
  224. newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
  225. } else {
  226. // this is needed because if the encodedAuth isn't being updated then we
  227. // shouldn't lose it, and continue to use the one that was already present
  228. var ctnr *swarmapi.ContainerSpec
  229. switch flags.RegistryAuthFrom {
  230. case apitypes.RegistryAuthFromSpec, "":
  231. ctnr = currentService.Spec.Task.GetContainer()
  232. case apitypes.RegistryAuthFromPreviousSpec:
  233. if currentService.PreviousSpec == nil {
  234. return errors.New("service does not have a previous spec")
  235. }
  236. ctnr = currentService.PreviousSpec.Task.GetContainer()
  237. default:
  238. return errors.New("unsupported registryAuthFrom value")
  239. }
  240. if ctnr == nil {
  241. return errors.New("service does not use container tasks")
  242. }
  243. newCtnr.PullOptions = ctnr.PullOptions
  244. // update encodedAuth so it can be used to pin image by digest
  245. if ctnr.PullOptions != nil {
  246. encodedAuth = ctnr.PullOptions.RegistryAuth
  247. }
  248. }
  249. // retrieve auth config from encoded auth
  250. authConfig := &apitypes.AuthConfig{}
  251. if encodedAuth != "" {
  252. if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
  253. logrus.Warnf("invalid authconfig: %v", err)
  254. }
  255. }
  256. // pin image by digest for API versions < 1.30
  257. // TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
  258. // should be removed in the future. Since integration tests only use the
  259. // latest API version, so this is no longer required.
  260. if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
  261. digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
  262. if err != nil {
  263. logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
  264. // warning in the client response should be concise
  265. resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
  266. } else if newCtnr.Image != digestImage {
  267. logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
  268. newCtnr.Image = digestImage
  269. } else {
  270. logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
  271. }
  272. // Replace the context with a fresh one.
  273. // If we timed out while communicating with the
  274. // registry, then "ctx" will already be expired, which
  275. // would cause UpdateService below to fail. Reusing
  276. // "ctx" could make it impossible to update a service
  277. // if the registry is slow or unresponsive.
  278. var cancel func()
  279. ctx, cancel = c.getRequestContext()
  280. defer cancel()
  281. }
  282. }
  283. var rollback swarmapi.UpdateServiceRequest_Rollback
  284. switch flags.Rollback {
  285. case "", "none":
  286. rollback = swarmapi.UpdateServiceRequest_NONE
  287. case "previous":
  288. rollback = swarmapi.UpdateServiceRequest_PREVIOUS
  289. default:
  290. return fmt.Errorf("unrecognized rollback option %s", flags.Rollback)
  291. }
  292. _, err = state.controlClient.UpdateService(
  293. ctx,
  294. &swarmapi.UpdateServiceRequest{
  295. ServiceID: currentService.ID,
  296. Spec: &serviceSpec,
  297. ServiceVersion: &swarmapi.Version{
  298. Index: version,
  299. },
  300. Rollback: rollback,
  301. },
  302. )
  303. return err
  304. })
  305. return resp, err
  306. }
  307. // RemoveService removes a service from a managed swarm cluster.
  308. func (c *Cluster) RemoveService(input string) error {
  309. return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
  310. service, err := getService(ctx, state.controlClient, input, false)
  311. if err != nil {
  312. return err
  313. }
  314. _, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID})
  315. return err
  316. })
  317. }
  318. // ServiceLogs collects service logs and writes them back to `config.OutStream`
  319. func (c *Cluster) ServiceLogs(ctx context.Context, selector *backend.LogSelector, config *apitypes.ContainerLogsOptions) (<-chan *backend.LogMessage, error) {
  320. c.mu.RLock()
  321. defer c.mu.RUnlock()
  322. state := c.currentNodeState()
  323. if !state.IsActiveManager() {
  324. return nil, c.errNoManager(state)
  325. }
  326. swarmSelector, err := convertSelector(ctx, state.controlClient, selector)
  327. if err != nil {
  328. return nil, errors.Wrap(err, "error making log selector")
  329. }
  330. // set the streams we'll use
  331. stdStreams := []swarmapi.LogStream{}
  332. if config.ShowStdout {
  333. stdStreams = append(stdStreams, swarmapi.LogStreamStdout)
  334. }
  335. if config.ShowStderr {
  336. stdStreams = append(stdStreams, swarmapi.LogStreamStderr)
  337. }
  338. // Get tail value squared away - the number of previous log lines we look at
  339. var tail int64
  340. // in ContainerLogs, if the tail value is ANYTHING non-integer, we just set
  341. // it to -1 (all). i don't agree with that, but i also think no tail value
  342. // should be legitimate. if you don't pass tail, we assume you want "all"
  343. if config.Tail == "all" || config.Tail == "" {
  344. // tail of 0 means send all logs on the swarmkit side
  345. tail = 0
  346. } else {
  347. t, err := strconv.Atoi(config.Tail)
  348. if err != nil {
  349. return nil, errors.New("tail value must be a positive integer or \"all\"")
  350. }
  351. if t < 0 {
  352. return nil, errors.New("negative tail values not supported")
  353. }
  354. // we actually use negative tail in swarmkit to represent messages
  355. // backwards starting from the beginning. also, -1 means no logs. so,
  356. // basically, for api compat with docker container logs, add one and
  357. // flip the sign. we error above if you try to negative tail, which
  358. // isn't supported by docker (and would error deeper in the stack
  359. // anyway)
  360. //
  361. // See the logs protobuf for more information
  362. tail = int64(-(t + 1))
  363. }
  364. // get the since value - the time in the past we're looking at logs starting from
  365. var sinceProto *gogotypes.Timestamp
  366. if config.Since != "" {
  367. s, n, err := timetypes.ParseTimestamps(config.Since, 0)
  368. if err != nil {
  369. return nil, errors.Wrap(err, "could not parse since timestamp")
  370. }
  371. since := time.Unix(s, n)
  372. sinceProto, err = gogotypes.TimestampProto(since)
  373. if err != nil {
  374. return nil, errors.Wrap(err, "could not parse timestamp to proto")
  375. }
  376. }
  377. stream, err := state.logsClient.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
  378. Selector: swarmSelector,
  379. Options: &swarmapi.LogSubscriptionOptions{
  380. Follow: config.Follow,
  381. Streams: stdStreams,
  382. Tail: tail,
  383. Since: sinceProto,
  384. },
  385. })
  386. if err != nil {
  387. return nil, err
  388. }
  389. messageChan := make(chan *backend.LogMessage, 1)
  390. go func() {
  391. defer close(messageChan)
  392. for {
  393. // Check the context before doing anything.
  394. select {
  395. case <-ctx.Done():
  396. return
  397. default:
  398. }
  399. subscribeMsg, err := stream.Recv()
  400. if err == io.EOF {
  401. return
  402. }
  403. // if we're not io.EOF, push the message in and return
  404. if err != nil {
  405. select {
  406. case <-ctx.Done():
  407. case messageChan <- &backend.LogMessage{Err: err}:
  408. }
  409. return
  410. }
  411. for _, msg := range subscribeMsg.Messages {
  412. // make a new message
  413. m := new(backend.LogMessage)
  414. m.Attrs = make([]backend.LogAttr, 0, len(msg.Attrs)+3)
  415. // add the timestamp, adding the error if it fails
  416. m.Timestamp, err = gogotypes.TimestampFromProto(msg.Timestamp)
  417. if err != nil {
  418. m.Err = err
  419. }
  420. nodeKey := contextPrefix + ".node.id"
  421. serviceKey := contextPrefix + ".service.id"
  422. taskKey := contextPrefix + ".task.id"
  423. // copy over all of the details
  424. for _, d := range msg.Attrs {
  425. switch d.Key {
  426. case nodeKey, serviceKey, taskKey:
  427. // we have the final say over context details (in case there
  428. // is a conflict (if the user added a detail with a context's
  429. // key for some reason))
  430. default:
  431. m.Attrs = append(m.Attrs, backend.LogAttr{Key: d.Key, Value: d.Value})
  432. }
  433. }
  434. m.Attrs = append(m.Attrs,
  435. backend.LogAttr{Key: nodeKey, Value: msg.Context.NodeID},
  436. backend.LogAttr{Key: serviceKey, Value: msg.Context.ServiceID},
  437. backend.LogAttr{Key: taskKey, Value: msg.Context.TaskID},
  438. )
  439. switch msg.Stream {
  440. case swarmapi.LogStreamStdout:
  441. m.Source = "stdout"
  442. case swarmapi.LogStreamStderr:
  443. m.Source = "stderr"
  444. }
  445. m.Line = msg.Data
  446. // there could be a case where the reader stops accepting
  447. // messages and the context is canceled. we need to check that
  448. // here, or otherwise we risk blocking forever on the message
  449. // send.
  450. select {
  451. case <-ctx.Done():
  452. return
  453. case messageChan <- m:
  454. }
  455. }
  456. }
  457. }()
  458. return messageChan, nil
  459. }
  460. // convertSelector takes a backend.LogSelector, which contains raw names that
  461. // may or may not be valid, and converts them to an api.LogSelector proto. It
  462. // returns an error if something fails
  463. func convertSelector(ctx context.Context, cc swarmapi.ControlClient, selector *backend.LogSelector) (*swarmapi.LogSelector, error) {
  464. // don't rely on swarmkit to resolve IDs, do it ourselves
  465. swarmSelector := &swarmapi.LogSelector{}
  466. for _, s := range selector.Services {
  467. service, err := getService(ctx, cc, s, false)
  468. if err != nil {
  469. return nil, err
  470. }
  471. c := service.Spec.Task.GetContainer()
  472. if c == nil {
  473. return nil, errors.New("logs only supported on container tasks")
  474. }
  475. swarmSelector.ServiceIDs = append(swarmSelector.ServiceIDs, service.ID)
  476. }
  477. for _, t := range selector.Tasks {
  478. task, err := getTask(ctx, cc, t)
  479. if err != nil {
  480. return nil, err
  481. }
  482. c := task.Spec.GetContainer()
  483. if c == nil {
  484. return nil, errors.New("logs only supported on container tasks")
  485. }
  486. swarmSelector.TaskIDs = append(swarmSelector.TaskIDs, task.ID)
  487. }
  488. return swarmSelector, nil
  489. }
  490. // imageWithDigestString takes an image such as name or name:tag
  491. // and returns the image pinned to a digest, such as name@sha256:34234
  492. func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
  493. ref, err := reference.ParseAnyReference(image)
  494. if err != nil {
  495. return "", err
  496. }
  497. namedRef, ok := ref.(reference.Named)
  498. if !ok {
  499. if _, ok := ref.(reference.Digested); ok {
  500. return image, nil
  501. }
  502. return "", errors.Errorf("unknown image reference format: %s", image)
  503. }
  504. // only query registry if not a canonical reference (i.e. with digest)
  505. if _, ok := namedRef.(reference.Canonical); !ok {
  506. namedRef = reference.TagNameOnly(namedRef)
  507. taggedRef, ok := namedRef.(reference.NamedTagged)
  508. if !ok {
  509. return "", errors.Errorf("image reference not tagged: %s", image)
  510. }
  511. repo, _, err := c.config.Backend.GetRepository(ctx, taggedRef, authConfig)
  512. if err != nil {
  513. return "", err
  514. }
  515. dscrptr, err := repo.Tags(ctx).Get(ctx, taggedRef.Tag())
  516. if err != nil {
  517. return "", err
  518. }
  519. namedDigestedRef, err := reference.WithDigest(taggedRef, dscrptr.Digest)
  520. if err != nil {
  521. return "", err
  522. }
  523. // return familiar form until interface updated to return type
  524. return reference.FamiliarString(namedDigestedRef), nil
  525. }
  526. // reference already contains a digest, so just return it
  527. return reference.FamiliarString(ref), nil
  528. }
  // digestWarning returns the client-facing warning used when image could not
// be resolved to a digest on a registry. The image name appears twice in the
// message. The wording is fixed for now and could be made smarter later.
func digestWarning(image string) string {
	const format = "image %[1]s could not be accessed on a registry to record\n" +
		"its digest. Each node will access %[1]s independently,\n" +
		"possibly leading to different nodes running different\n" +
		"versions of the image.\n"
	return fmt.Sprintf(format, image)
}