service.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. package controlapi
  2. import (
  3. "errors"
  4. "path/filepath"
  5. "reflect"
  6. "regexp"
  7. "strconv"
  8. "strings"
  9. "github.com/docker/distribution/reference"
  10. "github.com/docker/swarmkit/api"
  11. "github.com/docker/swarmkit/identity"
  12. "github.com/docker/swarmkit/manager/constraint"
  13. "github.com/docker/swarmkit/manager/state/store"
  14. "github.com/docker/swarmkit/protobuf/ptypes"
  15. "github.com/docker/swarmkit/template"
  16. "golang.org/x/net/context"
  17. "google.golang.org/grpc"
  18. "google.golang.org/grpc/codes"
  19. )
  20. var (
  21. errNetworkUpdateNotSupported = errors.New("changing network in service is not supported")
  22. errRenameNotSupported = errors.New("renaming services is not supported")
  23. errModeChangeNotAllowed = errors.New("service mode change is not allowed")
  24. )
  25. // Regexp pattern for hostname to conform RFC 1123
  26. var hostnamePattern = regexp.MustCompile("^(([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])\\.)*([[:alnum:]]|[[:alnum:]][[:alnum:]\\-]*[[:alnum:]])$")
  27. func validateResources(r *api.Resources) error {
  28. if r == nil {
  29. return nil
  30. }
  31. if r.NanoCPUs != 0 && r.NanoCPUs < 1e6 {
  32. return grpc.Errorf(codes.InvalidArgument, "invalid cpu value %g: Must be at least %g", float64(r.NanoCPUs)/1e9, 1e6/1e9)
  33. }
  34. if r.MemoryBytes != 0 && r.MemoryBytes < 4*1024*1024 {
  35. return grpc.Errorf(codes.InvalidArgument, "invalid memory value %d: Must be at least 4MiB", r.MemoryBytes)
  36. }
  37. return nil
  38. }
  39. func validateResourceRequirements(r *api.ResourceRequirements) error {
  40. if r == nil {
  41. return nil
  42. }
  43. if err := validateResources(r.Limits); err != nil {
  44. return err
  45. }
  46. if err := validateResources(r.Reservations); err != nil {
  47. return err
  48. }
  49. return nil
  50. }
  51. func validateRestartPolicy(rp *api.RestartPolicy) error {
  52. if rp == nil {
  53. return nil
  54. }
  55. if rp.Delay != nil {
  56. delay, err := ptypes.Duration(rp.Delay)
  57. if err != nil {
  58. return err
  59. }
  60. if delay < 0 {
  61. return grpc.Errorf(codes.InvalidArgument, "TaskSpec: restart-delay cannot be negative")
  62. }
  63. }
  64. if rp.Window != nil {
  65. win, err := ptypes.Duration(rp.Window)
  66. if err != nil {
  67. return err
  68. }
  69. if win < 0 {
  70. return grpc.Errorf(codes.InvalidArgument, "TaskSpec: restart-window cannot be negative")
  71. }
  72. }
  73. return nil
  74. }
  75. func validatePlacement(placement *api.Placement) error {
  76. if placement == nil {
  77. return nil
  78. }
  79. _, err := constraint.Parse(placement.Constraints)
  80. return err
  81. }
  82. func validateUpdate(uc *api.UpdateConfig) error {
  83. if uc == nil {
  84. return nil
  85. }
  86. delay, err := ptypes.Duration(&uc.Delay)
  87. if err != nil {
  88. return err
  89. }
  90. if delay < 0 {
  91. return grpc.Errorf(codes.InvalidArgument, "TaskSpec: update-delay cannot be negative")
  92. }
  93. return nil
  94. }
  95. func validateContainerSpec(container *api.ContainerSpec) error {
  96. if container == nil {
  97. return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: missing in service spec")
  98. }
  99. if err := validateHostname(container.Hostname); err != nil {
  100. return err
  101. }
  102. if container.Image == "" {
  103. return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: image reference must be provided")
  104. }
  105. if _, err := reference.ParseNamed(container.Image); err != nil {
  106. return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %q is not a valid repository/tag", container.Image)
  107. }
  108. mountMap := make(map[string]bool)
  109. for _, mount := range container.Mounts {
  110. if _, exists := mountMap[mount.Target]; exists {
  111. return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: duplicate mount point: %s", mount.Target)
  112. }
  113. mountMap[mount.Target] = true
  114. }
  115. return nil
  116. }
  117. func validateHostname(hostname string) error {
  118. if hostname != "" {
  119. if len(hostname) > 63 || !hostnamePattern.MatchString(hostname) {
  120. return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %s is not valid hostname", hostname)
  121. }
  122. }
  123. return nil
  124. }
  125. func validateTask(taskSpec api.TaskSpec) error {
  126. if err := validateResourceRequirements(taskSpec.Resources); err != nil {
  127. return err
  128. }
  129. if err := validateRestartPolicy(taskSpec.Restart); err != nil {
  130. return err
  131. }
  132. if err := validatePlacement(taskSpec.Placement); err != nil {
  133. return err
  134. }
  135. if taskSpec.GetRuntime() == nil {
  136. return grpc.Errorf(codes.InvalidArgument, "TaskSpec: missing runtime")
  137. }
  138. _, ok := taskSpec.GetRuntime().(*api.TaskSpec_Container)
  139. if !ok {
  140. return grpc.Errorf(codes.Unimplemented, "RuntimeSpec: unimplemented runtime in service spec")
  141. }
  142. // Building a empty/dummy Task to validate the templating and
  143. // the resulting container spec as well. This is a *best effort*
  144. // validation.
  145. preparedSpec, err := template.ExpandContainerSpec(&api.Task{
  146. Spec: taskSpec,
  147. ServiceID: "serviceid",
  148. Slot: 1,
  149. NodeID: "nodeid",
  150. Networks: []*api.NetworkAttachment{},
  151. Annotations: api.Annotations{
  152. Name: "taskname",
  153. },
  154. ServiceAnnotations: api.Annotations{
  155. Name: "servicename",
  156. },
  157. Endpoint: &api.Endpoint{},
  158. LogDriver: taskSpec.LogDriver,
  159. })
  160. if err != nil {
  161. return grpc.Errorf(codes.InvalidArgument, err.Error())
  162. }
  163. if err := validateContainerSpec(preparedSpec); err != nil {
  164. return err
  165. }
  166. return nil
  167. }
  168. func validateEndpointSpec(epSpec *api.EndpointSpec) error {
  169. // Endpoint spec is optional
  170. if epSpec == nil {
  171. return nil
  172. }
  173. if len(epSpec.Ports) > 0 && epSpec.Mode == api.ResolutionModeDNSRoundRobin {
  174. return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: ports can't be used with dnsrr mode")
  175. }
  176. type portSpec struct {
  177. publishedPort uint32
  178. protocol api.PortConfig_Protocol
  179. }
  180. portSet := make(map[portSpec]struct{})
  181. for _, port := range epSpec.Ports {
  182. // If published port is not specified, it does not conflict
  183. // with any others.
  184. if port.PublishedPort == 0 {
  185. continue
  186. }
  187. portSpec := portSpec{publishedPort: port.PublishedPort, protocol: port.Protocol}
  188. if _, ok := portSet[portSpec]; ok {
  189. return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: duplicate published ports provided")
  190. }
  191. portSet[portSpec] = struct{}{}
  192. }
  193. return nil
  194. }
  195. // validateSecretRefsSpec finds if the secrets passed in spec are valid and have no
  196. // conflicting targets.
  197. func validateSecretRefsSpec(spec *api.ServiceSpec) error {
  198. container := spec.Task.GetContainer()
  199. if container == nil {
  200. return nil
  201. }
  202. // Keep a map to track all the targets that will be exposed
  203. // The string returned is only used for logging. It could as well be struct{}{}
  204. existingTargets := make(map[string]string)
  205. for _, secretRef := range container.Secrets {
  206. // SecretID and SecretName are mandatory, we have invalid references without them
  207. if secretRef.SecretID == "" || secretRef.SecretName == "" {
  208. return grpc.Errorf(codes.InvalidArgument, "malformed secret reference")
  209. }
  210. // Every secret referece requires a Target
  211. if secretRef.GetTarget() == nil {
  212. return grpc.Errorf(codes.InvalidArgument, "malformed secret reference, no target provided")
  213. }
  214. // If this is a file target, we will ensure filename uniqueness
  215. if secretRef.GetFile() != nil {
  216. fileName := secretRef.GetFile().Name
  217. // Validate the file name
  218. if fileName == "" || fileName != filepath.Base(filepath.Clean(fileName)) {
  219. return grpc.Errorf(codes.InvalidArgument, "malformed file secret reference, invalid target file name provided")
  220. }
  221. // If this target is already in use, we have conflicting targets
  222. if prevSecretName, ok := existingTargets[fileName]; ok {
  223. return grpc.Errorf(codes.InvalidArgument, "secret references '%s' and '%s' have a conflicting target: '%s'", prevSecretName, secretRef.SecretName, fileName)
  224. }
  225. existingTargets[fileName] = secretRef.SecretName
  226. }
  227. }
  228. return nil
  229. }
  230. func (s *Server) validateNetworks(networks []*api.NetworkAttachmentConfig) error {
  231. for _, na := range networks {
  232. var network *api.Network
  233. s.store.View(func(tx store.ReadTx) {
  234. network = store.GetNetwork(tx, na.Target)
  235. })
  236. if network == nil {
  237. continue
  238. }
  239. if _, ok := network.Spec.Annotations.Labels["com.docker.swarm.internal"]; ok {
  240. return grpc.Errorf(codes.InvalidArgument,
  241. "Service cannot be explicitly attached to %q network which is a swarm internal network",
  242. network.Spec.Annotations.Name)
  243. }
  244. }
  245. return nil
  246. }
  247. func validateServiceSpec(spec *api.ServiceSpec) error {
  248. if spec == nil {
  249. return grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  250. }
  251. if err := validateAnnotations(spec.Annotations); err != nil {
  252. return err
  253. }
  254. if err := validateTask(spec.Task); err != nil {
  255. return err
  256. }
  257. if err := validateUpdate(spec.Update); err != nil {
  258. return err
  259. }
  260. if err := validateEndpointSpec(spec.Endpoint); err != nil {
  261. return err
  262. }
  263. // Check to see if the Secret Reference portion of the spec is valid
  264. if err := validateSecretRefsSpec(spec); err != nil {
  265. return err
  266. }
  267. return nil
  268. }
  269. // checkPortConflicts does a best effort to find if the passed in spec has port
  270. // conflicts with existing services.
  271. // `serviceID string` is the service ID of the spec in service update. If
  272. // `serviceID` is not "", then conflicts check will be skipped against this
  273. // service (the service being updated).
  274. func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) error {
  275. if spec.Endpoint == nil {
  276. return nil
  277. }
  278. pcToString := func(pc *api.PortConfig) string {
  279. port := strconv.FormatUint(uint64(pc.PublishedPort), 10)
  280. return port + "/" + pc.Protocol.String()
  281. }
  282. reqPorts := make(map[string]bool)
  283. for _, pc := range spec.Endpoint.Ports {
  284. if pc.PublishedPort > 0 {
  285. reqPorts[pcToString(pc)] = true
  286. }
  287. }
  288. if len(reqPorts) == 0 {
  289. return nil
  290. }
  291. var (
  292. services []*api.Service
  293. err error
  294. )
  295. s.store.View(func(tx store.ReadTx) {
  296. services, err = store.FindServices(tx, store.All)
  297. })
  298. if err != nil {
  299. return err
  300. }
  301. for _, service := range services {
  302. // If service ID is the same (and not "") then this is an update
  303. if serviceID != "" && serviceID == service.ID {
  304. continue
  305. }
  306. if service.Spec.Endpoint != nil {
  307. for _, pc := range service.Spec.Endpoint.Ports {
  308. if reqPorts[pcToString(pc)] {
  309. return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
  310. }
  311. }
  312. }
  313. if service.Endpoint != nil {
  314. for _, pc := range service.Endpoint.Ports {
  315. if reqPorts[pcToString(pc)] {
  316. return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
  317. }
  318. }
  319. }
  320. }
  321. return nil
  322. }
  323. // checkSecretExistence finds if the secret exists
  324. func (s *Server) checkSecretExistence(tx store.Tx, spec *api.ServiceSpec) error {
  325. container := spec.Task.GetContainer()
  326. if container == nil {
  327. return nil
  328. }
  329. var failedSecrets []string
  330. for _, secretRef := range container.Secrets {
  331. secret := store.GetSecret(tx, secretRef.SecretID)
  332. // Check to see if the secret exists and secretRef.SecretName matches the actual secretName
  333. if secret == nil || secret.Spec.Annotations.Name != secretRef.SecretName {
  334. failedSecrets = append(failedSecrets, secretRef.SecretName)
  335. }
  336. }
  337. if len(failedSecrets) > 0 {
  338. secretStr := "secrets"
  339. if len(failedSecrets) == 1 {
  340. secretStr = "secret"
  341. }
  342. return grpc.Errorf(codes.InvalidArgument, "%s not found: %v", secretStr, strings.Join(failedSecrets, ", "))
  343. }
  344. return nil
  345. }
  346. // CreateService creates and return a Service based on the provided ServiceSpec.
  347. // - Returns `InvalidArgument` if the ServiceSpec is malformed.
  348. // - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
  349. // - Returns `AlreadyExists` if the ServiceID conflicts.
  350. // - Returns an error if the creation fails.
  351. func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRequest) (*api.CreateServiceResponse, error) {
  352. if err := validateServiceSpec(request.Spec); err != nil {
  353. return nil, err
  354. }
  355. if err := s.validateNetworks(request.Spec.Networks); err != nil {
  356. return nil, err
  357. }
  358. if err := s.checkPortConflicts(request.Spec, ""); err != nil {
  359. return nil, err
  360. }
  361. // TODO(aluzzardi): Consider using `Name` as a primary key to handle
  362. // duplicate creations. See #65
  363. service := &api.Service{
  364. ID: identity.NewID(),
  365. Spec: *request.Spec,
  366. }
  367. err := s.store.Update(func(tx store.Tx) error {
  368. // Check to see if all the secrets being added exist as objects
  369. // in our datastore
  370. err := s.checkSecretExistence(tx, request.Spec)
  371. if err != nil {
  372. return err
  373. }
  374. return store.CreateService(tx, service)
  375. })
  376. if err != nil {
  377. return nil, err
  378. }
  379. return &api.CreateServiceResponse{
  380. Service: service,
  381. }, nil
  382. }
  383. // GetService returns a Service given a ServiceID.
  384. // - Returns `InvalidArgument` if ServiceID is not provided.
  385. // - Returns `NotFound` if the Service is not found.
  386. func (s *Server) GetService(ctx context.Context, request *api.GetServiceRequest) (*api.GetServiceResponse, error) {
  387. if request.ServiceID == "" {
  388. return nil, grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  389. }
  390. var service *api.Service
  391. s.store.View(func(tx store.ReadTx) {
  392. service = store.GetService(tx, request.ServiceID)
  393. })
  394. if service == nil {
  395. return nil, grpc.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
  396. }
  397. return &api.GetServiceResponse{
  398. Service: service,
  399. }, nil
  400. }
  401. // UpdateService updates a Service referenced by ServiceID with the given ServiceSpec.
  402. // - Returns `NotFound` if the Service is not found.
  403. // - Returns `InvalidArgument` if the ServiceSpec is malformed.
  404. // - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
  405. // - Returns an error if the update fails.
  406. func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRequest) (*api.UpdateServiceResponse, error) {
  407. if request.ServiceID == "" || request.ServiceVersion == nil {
  408. return nil, grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  409. }
  410. if err := validateServiceSpec(request.Spec); err != nil {
  411. return nil, err
  412. }
  413. var service *api.Service
  414. s.store.View(func(tx store.ReadTx) {
  415. service = store.GetService(tx, request.ServiceID)
  416. })
  417. if service == nil {
  418. return nil, grpc.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
  419. }
  420. if request.Spec.Endpoint != nil && !reflect.DeepEqual(request.Spec.Endpoint, service.Spec.Endpoint) {
  421. if err := s.checkPortConflicts(request.Spec, request.ServiceID); err != nil {
  422. return nil, err
  423. }
  424. }
  425. err := s.store.Update(func(tx store.Tx) error {
  426. service = store.GetService(tx, request.ServiceID)
  427. if service == nil {
  428. return nil
  429. }
  430. // temporary disable network update
  431. requestSpecNetworks := request.Spec.Task.Networks
  432. if len(requestSpecNetworks) == 0 {
  433. requestSpecNetworks = request.Spec.Networks
  434. }
  435. specNetworks := service.Spec.Task.Networks
  436. if len(specNetworks) == 0 {
  437. specNetworks = service.Spec.Networks
  438. }
  439. if !reflect.DeepEqual(requestSpecNetworks, specNetworks) {
  440. return errNetworkUpdateNotSupported
  441. }
  442. // Check to see if all the secrets being added exist as objects
  443. // in our datastore
  444. err := s.checkSecretExistence(tx, request.Spec)
  445. if err != nil {
  446. return err
  447. }
  448. // orchestrator is designed to be stateless, so it should not deal
  449. // with service mode change (comparing current config with previous config).
  450. // proper way to change service mode is to delete and re-add.
  451. if reflect.TypeOf(service.Spec.Mode) != reflect.TypeOf(request.Spec.Mode) {
  452. return errModeChangeNotAllowed
  453. }
  454. if service.Spec.Annotations.Name != request.Spec.Annotations.Name {
  455. return errRenameNotSupported
  456. }
  457. service.Meta.Version = *request.ServiceVersion
  458. service.PreviousSpec = service.Spec.Copy()
  459. service.Spec = *request.Spec.Copy()
  460. // Reset update status
  461. service.UpdateStatus = nil
  462. return store.UpdateService(tx, service)
  463. })
  464. if err != nil {
  465. return nil, err
  466. }
  467. if service == nil {
  468. return nil, grpc.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
  469. }
  470. return &api.UpdateServiceResponse{
  471. Service: service,
  472. }, nil
  473. }
  474. // RemoveService removes a Service referenced by ServiceID.
  475. // - Returns `InvalidArgument` if ServiceID is not provided.
  476. // - Returns `NotFound` if the Service is not found.
  477. // - Returns an error if the deletion fails.
  478. func (s *Server) RemoveService(ctx context.Context, request *api.RemoveServiceRequest) (*api.RemoveServiceResponse, error) {
  479. if request.ServiceID == "" {
  480. return nil, grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
  481. }
  482. err := s.store.Update(func(tx store.Tx) error {
  483. return store.DeleteService(tx, request.ServiceID)
  484. })
  485. if err != nil {
  486. if err == store.ErrNotExist {
  487. return nil, grpc.Errorf(codes.NotFound, "service %s not found", request.ServiceID)
  488. }
  489. return nil, err
  490. }
  491. return &api.RemoveServiceResponse{}, nil
  492. }
  493. func filterServices(candidates []*api.Service, filters ...func(*api.Service) bool) []*api.Service {
  494. result := []*api.Service{}
  495. for _, c := range candidates {
  496. match := true
  497. for _, f := range filters {
  498. if !f(c) {
  499. match = false
  500. break
  501. }
  502. }
  503. if match {
  504. result = append(result, c)
  505. }
  506. }
  507. return result
  508. }
  509. // ListServices returns a list of all services.
  510. func (s *Server) ListServices(ctx context.Context, request *api.ListServicesRequest) (*api.ListServicesResponse, error) {
  511. var (
  512. services []*api.Service
  513. err error
  514. )
  515. s.store.View(func(tx store.ReadTx) {
  516. switch {
  517. case request.Filters != nil && len(request.Filters.Names) > 0:
  518. services, err = store.FindServices(tx, buildFilters(store.ByName, request.Filters.Names))
  519. case request.Filters != nil && len(request.Filters.NamePrefixes) > 0:
  520. services, err = store.FindServices(tx, buildFilters(store.ByNamePrefix, request.Filters.NamePrefixes))
  521. case request.Filters != nil && len(request.Filters.IDPrefixes) > 0:
  522. services, err = store.FindServices(tx, buildFilters(store.ByIDPrefix, request.Filters.IDPrefixes))
  523. default:
  524. services, err = store.FindServices(tx, store.All)
  525. }
  526. })
  527. if err != nil {
  528. return nil, err
  529. }
  530. if request.Filters != nil {
  531. services = filterServices(services,
  532. func(e *api.Service) bool {
  533. return filterContains(e.Spec.Annotations.Name, request.Filters.Names)
  534. },
  535. func(e *api.Service) bool {
  536. return filterContainsPrefix(e.Spec.Annotations.Name, request.Filters.NamePrefixes)
  537. },
  538. func(e *api.Service) bool {
  539. return filterContainsPrefix(e.ID, request.Filters.IDPrefixes)
  540. },
  541. func(e *api.Service) bool {
  542. return filterMatchLabels(e.Spec.Annotations.Labels, request.Filters.Labels)
  543. },
  544. )
  545. }
  546. return &api.ListServicesResponse{
  547. Services: services,
  548. }, nil
  549. }