container.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. package container
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "strings"
  7. "time"
  8. "github.com/Sirupsen/logrus"
  9. "github.com/docker/docker/api/types"
  10. enginecontainer "github.com/docker/docker/api/types/container"
  11. "github.com/docker/docker/api/types/events"
  12. "github.com/docker/docker/api/types/filters"
  13. "github.com/docker/docker/api/types/network"
  14. clustertypes "github.com/docker/docker/daemon/cluster/provider"
  15. "github.com/docker/docker/reference"
  16. "github.com/docker/swarmkit/agent/exec"
  17. "github.com/docker/swarmkit/api"
  18. )
  const (
	// systemLabelPrefix is the reserved namespace for system labels. Every
	// swarm-managed label written by labels() has its key prefixed with
	// this value and a ".".
	systemLabelPrefix = "com.docker.swarm"

	// cpuQuotaPeriod is the CFS bandwidth period used when turning a
	// NanoCPUs limit into a CPU quota. It is set explicitly to the kernel's
	// default of 100ms.
	// https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
	cpuQuotaPeriod = 100 * time.Millisecond
)
  26. // containerConfig converts task properties into docker container compatible
  27. // components.
  28. type containerConfig struct {
  29. task *api.Task
  30. networksAttachments map[string]*api.NetworkAttachment
  31. }
  32. // newContainerConfig returns a validated container config. No methods should
  33. // return an error if this function returns without error.
  34. func newContainerConfig(t *api.Task) (*containerConfig, error) {
  35. var c containerConfig
  36. return &c, c.setTask(t)
  37. }
  38. func (c *containerConfig) setTask(t *api.Task) error {
  39. if t.Spec.GetContainer() == nil && t.Spec.GetAttachment() == nil {
  40. return exec.ErrRuntimeUnsupported
  41. }
  42. container := t.Spec.GetContainer()
  43. if container != nil {
  44. if container.Image == "" {
  45. return ErrImageRequired
  46. }
  47. if err := validateMounts(container.Mounts); err != nil {
  48. return err
  49. }
  50. }
  51. // index the networks by name
  52. c.networksAttachments = make(map[string]*api.NetworkAttachment, len(t.Networks))
  53. for _, attachment := range t.Networks {
  54. c.networksAttachments[attachment.Network.Spec.Annotations.Name] = attachment
  55. }
  56. c.task = t
  57. return nil
  58. }
  59. func (c *containerConfig) id() string {
  60. attachment := c.task.Spec.GetAttachment()
  61. if attachment == nil {
  62. return ""
  63. }
  64. return attachment.ContainerID
  65. }
  66. func (c *containerConfig) taskID() string {
  67. return c.task.ID
  68. }
  69. func (c *containerConfig) endpoint() *api.Endpoint {
  70. return c.task.Endpoint
  71. }
  72. func (c *containerConfig) spec() *api.ContainerSpec {
  73. return c.task.Spec.GetContainer()
  74. }
  75. func (c *containerConfig) nameOrID() string {
  76. if c.task.Spec.GetContainer() != nil {
  77. return c.name()
  78. }
  79. return c.id()
  80. }
  81. func (c *containerConfig) name() string {
  82. if c.task.Annotations.Name != "" {
  83. // if set, use the container Annotations.Name field, set in the orchestrator.
  84. return c.task.Annotations.Name
  85. }
  86. // fallback to service.slot.id.
  87. return strings.Join([]string{c.task.ServiceAnnotations.Name, fmt.Sprint(c.task.Slot), c.task.ID}, ".")
  88. }
  89. func (c *containerConfig) image() string {
  90. raw := c.spec().Image
  91. ref, err := reference.ParseNamed(raw)
  92. if err != nil {
  93. return raw
  94. }
  95. return reference.WithDefaultTag(ref).String()
  96. }
  97. func (c *containerConfig) config() *enginecontainer.Config {
  98. config := &enginecontainer.Config{
  99. Labels: c.labels(),
  100. User: c.spec().User,
  101. Env: c.spec().Env,
  102. WorkingDir: c.spec().Dir,
  103. Image: c.image(),
  104. Volumes: c.volumes(),
  105. }
  106. if len(c.spec().Command) > 0 {
  107. // If Command is provided, we replace the whole invocation with Command
  108. // by replacing Entrypoint and specifying Cmd. Args is ignored in this
  109. // case.
  110. config.Entrypoint = append(config.Entrypoint, c.spec().Command...)
  111. config.Cmd = append(config.Cmd, c.spec().Args...)
  112. } else if len(c.spec().Args) > 0 {
  113. // In this case, we assume the image has an Entrypoint and Args
  114. // specifies the arguments for that entrypoint.
  115. config.Cmd = c.spec().Args
  116. }
  117. return config
  118. }
  119. func (c *containerConfig) labels() map[string]string {
  120. taskName := c.task.Annotations.Name
  121. if taskName == "" {
  122. if c.task.Slot != 0 {
  123. taskName = fmt.Sprintf("%v.%v.%v", c.task.ServiceAnnotations.Name, c.task.Slot, c.task.ID)
  124. } else {
  125. taskName = fmt.Sprintf("%v.%v.%v", c.task.ServiceAnnotations.Name, c.task.NodeID, c.task.ID)
  126. }
  127. }
  128. var (
  129. system = map[string]string{
  130. "task": "", // mark as cluster task
  131. "task.id": c.task.ID,
  132. "task.name": taskName,
  133. "node.id": c.task.NodeID,
  134. "service.id": c.task.ServiceID,
  135. "service.name": c.task.ServiceAnnotations.Name,
  136. }
  137. labels = make(map[string]string)
  138. )
  139. // base labels are those defined in the spec.
  140. for k, v := range c.spec().Labels {
  141. labels[k] = v
  142. }
  143. // we then apply the overrides from the task, which may be set via the
  144. // orchestrator.
  145. for k, v := range c.task.Annotations.Labels {
  146. labels[k] = v
  147. }
  148. // finally, we apply the system labels, which override all labels.
  149. for k, v := range system {
  150. labels[strings.Join([]string{systemLabelPrefix, k}, ".")] = v
  151. }
  152. return labels
  153. }
  154. // volumes gets placed into the Volumes field on the containerConfig.
  155. func (c *containerConfig) volumes() map[string]struct{} {
  156. r := make(map[string]struct{})
  157. // Volumes *only* creates anonymous volumes. The rest is mixed in with
  158. // binds, which aren't actually binds. Basically, any volume that
  159. // results in a single component must be added here.
  160. //
  161. // This is reversed engineered from the behavior of the engine API.
  162. for _, mount := range c.spec().Mounts {
  163. if mount.Type == api.MountTypeVolume && mount.Source == "" {
  164. r[mount.Target] = struct{}{}
  165. }
  166. }
  167. return r
  168. }
  169. func (c *containerConfig) tmpfs() map[string]string {
  170. r := make(map[string]string)
  171. for _, spec := range c.spec().Mounts {
  172. if spec.Type != api.MountTypeTmpfs {
  173. continue
  174. }
  175. r[spec.Target] = getMountMask(&spec)
  176. }
  177. return r
  178. }
  179. func (c *containerConfig) binds() []string {
  180. var r []string
  181. for _, mount := range c.spec().Mounts {
  182. if mount.Type == api.MountTypeBind || (mount.Type == api.MountTypeVolume && mount.Source != "") {
  183. spec := fmt.Sprintf("%s:%s", mount.Source, mount.Target)
  184. mask := getMountMask(&mount)
  185. if mask != "" {
  186. spec = fmt.Sprintf("%s:%s", spec, mask)
  187. }
  188. r = append(r, spec)
  189. }
  190. }
  191. return r
  192. }
  193. func getMountMask(m *api.Mount) string {
  194. var maskOpts []string
  195. if m.ReadOnly {
  196. maskOpts = append(maskOpts, "ro")
  197. }
  198. switch m.Type {
  199. case api.MountTypeVolume:
  200. if m.VolumeOptions != nil && m.VolumeOptions.NoCopy {
  201. maskOpts = append(maskOpts, "nocopy")
  202. }
  203. case api.MountTypeBind:
  204. if m.BindOptions == nil {
  205. break
  206. }
  207. switch m.BindOptions.Propagation {
  208. case api.MountPropagationPrivate:
  209. maskOpts = append(maskOpts, "private")
  210. case api.MountPropagationRPrivate:
  211. maskOpts = append(maskOpts, "rprivate")
  212. case api.MountPropagationShared:
  213. maskOpts = append(maskOpts, "shared")
  214. case api.MountPropagationRShared:
  215. maskOpts = append(maskOpts, "rshared")
  216. case api.MountPropagationSlave:
  217. maskOpts = append(maskOpts, "slave")
  218. case api.MountPropagationRSlave:
  219. maskOpts = append(maskOpts, "rslave")
  220. }
  221. case api.MountTypeTmpfs:
  222. if m.TmpfsOptions == nil {
  223. break
  224. }
  225. if m.TmpfsOptions.Mode != 0 {
  226. maskOpts = append(maskOpts, fmt.Sprintf("mode=%o", m.TmpfsOptions.Mode))
  227. }
  228. if m.TmpfsOptions.SizeBytes != 0 {
  229. // calculate suffix here, making this linux specific, but that is
  230. // okay, since API is that way anyways.
  231. // we do this by finding the suffix that divides evenly into the
  232. // value, returing the value itself, with no suffix, if it fails.
  233. //
  234. // For the most part, we don't enforce any semantic to this values.
  235. // The operating system will usually align this and enforce minimum
  236. // and maximums.
  237. var (
  238. size = m.TmpfsOptions.SizeBytes
  239. suffix string
  240. )
  241. for _, r := range []struct {
  242. suffix string
  243. divisor int64
  244. }{
  245. {"g", 1 << 30},
  246. {"m", 1 << 20},
  247. {"k", 1 << 10},
  248. } {
  249. if size%r.divisor == 0 {
  250. size = size / r.divisor
  251. suffix = r.suffix
  252. break
  253. }
  254. }
  255. maskOpts = append(maskOpts, fmt.Sprintf("size=%d%s", size, suffix))
  256. }
  257. }
  258. return strings.Join(maskOpts, ",")
  259. }
  260. func (c *containerConfig) hostConfig() *enginecontainer.HostConfig {
  261. hc := &enginecontainer.HostConfig{
  262. Resources: c.resources(),
  263. Binds: c.binds(),
  264. Tmpfs: c.tmpfs(),
  265. GroupAdd: c.spec().Groups,
  266. }
  267. if c.task.LogDriver != nil {
  268. hc.LogConfig = enginecontainer.LogConfig{
  269. Type: c.task.LogDriver.Name,
  270. Config: c.task.LogDriver.Options,
  271. }
  272. }
  273. return hc
  274. }
  275. // This handles the case of volumes that are defined inside a service Mount
  276. func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *types.VolumeCreateRequest {
  277. var (
  278. driverName string
  279. driverOpts map[string]string
  280. labels map[string]string
  281. )
  282. if mount.VolumeOptions != nil && mount.VolumeOptions.DriverConfig != nil {
  283. driverName = mount.VolumeOptions.DriverConfig.Name
  284. driverOpts = mount.VolumeOptions.DriverConfig.Options
  285. labels = mount.VolumeOptions.Labels
  286. }
  287. if mount.VolumeOptions != nil {
  288. return &types.VolumeCreateRequest{
  289. Name: mount.Source,
  290. Driver: driverName,
  291. DriverOpts: driverOpts,
  292. Labels: labels,
  293. }
  294. }
  295. return nil
  296. }
  297. func (c *containerConfig) resources() enginecontainer.Resources {
  298. resources := enginecontainer.Resources{}
  299. // If no limits are specified let the engine use its defaults.
  300. //
  301. // TODO(aluzzardi): We might want to set some limits anyway otherwise
  302. // "unlimited" tasks will step over the reservation of other tasks.
  303. r := c.task.Spec.Resources
  304. if r == nil || r.Limits == nil {
  305. return resources
  306. }
  307. if r.Limits.MemoryBytes > 0 {
  308. resources.Memory = r.Limits.MemoryBytes
  309. }
  310. if r.Limits.NanoCPUs > 0 {
  311. // CPU Period must be set in microseconds.
  312. resources.CPUPeriod = int64(cpuQuotaPeriod / time.Microsecond)
  313. resources.CPUQuota = r.Limits.NanoCPUs * resources.CPUPeriod / 1e9
  314. }
  315. return resources
  316. }
  317. // Docker daemon supports just 1 network during container create.
  318. func (c *containerConfig) createNetworkingConfig() *network.NetworkingConfig {
  319. var networks []*api.NetworkAttachment
  320. if c.task.Spec.GetContainer() != nil || c.task.Spec.GetAttachment() != nil {
  321. networks = c.task.Networks
  322. }
  323. epConfig := make(map[string]*network.EndpointSettings)
  324. if len(networks) > 0 {
  325. epConfig[networks[0].Network.Spec.Annotations.Name] = getEndpointConfig(networks[0])
  326. }
  327. return &network.NetworkingConfig{EndpointsConfig: epConfig}
  328. }
  329. // TODO: Merge this function with createNetworkingConfig after daemon supports multiple networks in container create
  330. func (c *containerConfig) connectNetworkingConfig() *network.NetworkingConfig {
  331. var networks []*api.NetworkAttachment
  332. if c.task.Spec.GetContainer() != nil {
  333. networks = c.task.Networks
  334. }
  335. // First network is used during container create. Other networks are used in "docker network connect"
  336. if len(networks) < 2 {
  337. return nil
  338. }
  339. epConfig := make(map[string]*network.EndpointSettings)
  340. for _, na := range networks[1:] {
  341. epConfig[na.Network.Spec.Annotations.Name] = getEndpointConfig(na)
  342. }
  343. return &network.NetworkingConfig{EndpointsConfig: epConfig}
  344. }
  345. func getEndpointConfig(na *api.NetworkAttachment) *network.EndpointSettings {
  346. var ipv4, ipv6 string
  347. for _, addr := range na.Addresses {
  348. ip, _, err := net.ParseCIDR(addr)
  349. if err != nil {
  350. continue
  351. }
  352. if ip.To4() != nil {
  353. ipv4 = ip.String()
  354. continue
  355. }
  356. if ip.To16() != nil {
  357. ipv6 = ip.String()
  358. }
  359. }
  360. return &network.EndpointSettings{
  361. NetworkID: na.Network.ID,
  362. IPAMConfig: &network.EndpointIPAMConfig{
  363. IPv4Address: ipv4,
  364. IPv6Address: ipv6,
  365. },
  366. }
  367. }
  368. func (c *containerConfig) virtualIP(networkID string) string {
  369. if c.task.Endpoint == nil {
  370. return ""
  371. }
  372. for _, eVip := range c.task.Endpoint.VirtualIPs {
  373. // We only support IPv4 VIPs for now.
  374. if eVip.NetworkID == networkID {
  375. vip, _, err := net.ParseCIDR(eVip.Addr)
  376. if err != nil {
  377. return ""
  378. }
  379. return vip.String()
  380. }
  381. }
  382. return ""
  383. }
  384. func (c *containerConfig) serviceConfig() *clustertypes.ServiceConfig {
  385. if len(c.task.Networks) == 0 {
  386. return nil
  387. }
  388. logrus.Debugf("Creating service config in agent for t = %+v", c.task)
  389. svcCfg := &clustertypes.ServiceConfig{
  390. Name: c.task.ServiceAnnotations.Name,
  391. Aliases: make(map[string][]string),
  392. ID: c.task.ServiceID,
  393. VirtualAddresses: make(map[string]*clustertypes.VirtualAddress),
  394. }
  395. for _, na := range c.task.Networks {
  396. svcCfg.VirtualAddresses[na.Network.ID] = &clustertypes.VirtualAddress{
  397. // We support only IPv4 virtual IP for now.
  398. IPv4: c.virtualIP(na.Network.ID),
  399. }
  400. if len(na.Aliases) > 0 {
  401. svcCfg.Aliases[na.Network.ID] = na.Aliases
  402. }
  403. }
  404. if c.task.Endpoint != nil {
  405. for _, ePort := range c.task.Endpoint.Ports {
  406. svcCfg.ExposedPorts = append(svcCfg.ExposedPorts, &clustertypes.PortConfig{
  407. Name: ePort.Name,
  408. Protocol: int32(ePort.Protocol),
  409. TargetPort: ePort.TargetPort,
  410. PublishedPort: ePort.PublishedPort,
  411. })
  412. }
  413. }
  414. return svcCfg
  415. }
  416. // networks returns a list of network names attached to the container. The
  417. // returned name can be used to lookup the corresponding network create
  418. // options.
  419. func (c *containerConfig) networks() []string {
  420. var networks []string
  421. for name := range c.networksAttachments {
  422. networks = append(networks, name)
  423. }
  424. return networks
  425. }
  426. func (c *containerConfig) networkCreateRequest(name string) (clustertypes.NetworkCreateRequest, error) {
  427. na, ok := c.networksAttachments[name]
  428. if !ok {
  429. return clustertypes.NetworkCreateRequest{}, errors.New("container: unknown network referenced")
  430. }
  431. options := types.NetworkCreate{
  432. // ID: na.Network.ID,
  433. Driver: na.Network.DriverState.Name,
  434. IPAM: &network.IPAM{
  435. Driver: na.Network.IPAM.Driver.Name,
  436. },
  437. Options: na.Network.DriverState.Options,
  438. Labels: na.Network.Spec.Annotations.Labels,
  439. Internal: na.Network.Spec.Internal,
  440. EnableIPv6: na.Network.Spec.Ipv6Enabled,
  441. CheckDuplicate: true,
  442. }
  443. for _, ic := range na.Network.IPAM.Configs {
  444. c := network.IPAMConfig{
  445. Subnet: ic.Subnet,
  446. IPRange: ic.Range,
  447. Gateway: ic.Gateway,
  448. }
  449. options.IPAM.Config = append(options.IPAM.Config, c)
  450. }
  451. return clustertypes.NetworkCreateRequest{na.Network.ID, types.NetworkCreateRequest{Name: name, NetworkCreate: options}}, nil
  452. }
  453. func (c containerConfig) eventFilter() filters.Args {
  454. filter := filters.NewArgs()
  455. filter.Add("type", events.ContainerEventType)
  456. filter.Add("name", c.name())
  457. filter.Add("label", fmt.Sprintf("%v.task.id=%v", systemLabelPrefix, c.task.ID))
  458. return filter
  459. }