service_linux.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958
  1. package libnetwork
  2. import (
  3. "fmt"
  4. "io"
  5. "io/ioutil"
  6. "net"
  7. "os"
  8. "os/exec"
  9. "path/filepath"
  10. "runtime"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "github.com/Sirupsen/logrus"
  16. "github.com/docker/docker/pkg/reexec"
  17. "github.com/docker/libnetwork/iptables"
  18. "github.com/docker/libnetwork/ipvs"
  19. "github.com/docker/libnetwork/ns"
  20. "github.com/gogo/protobuf/proto"
  21. "github.com/vishvananda/netlink/nl"
  22. "github.com/vishvananda/netns"
  23. )
  24. func init() {
  25. reexec.Register("fwmarker", fwMarker)
  26. reexec.Register("redirecter", redirecter)
  27. }
  28. func newService(name string, id string, ingressPorts []*PortConfig, aliases []string) *service {
  29. return &service{
  30. name: name,
  31. id: id,
  32. ingressPorts: ingressPorts,
  33. loadBalancers: make(map[string]*loadBalancer),
  34. aliases: aliases,
  35. }
  36. }
  37. func (c *controller) cleanupServiceBindings(cleanupNID string) {
  38. var cleanupFuncs []func()
  39. c.Lock()
  40. services := make([]*service, 0, len(c.serviceBindings))
  41. for _, s := range c.serviceBindings {
  42. services = append(services, s)
  43. }
  44. c.Unlock()
  45. for _, s := range services {
  46. s.Lock()
  47. for nid, lb := range s.loadBalancers {
  48. if cleanupNID != "" && nid != cleanupNID {
  49. continue
  50. }
  51. for eid, ip := range lb.backEnds {
  52. service := s
  53. loadBalancer := lb
  54. networkID := nid
  55. epID := eid
  56. epIP := ip
  57. cleanupFuncs = append(cleanupFuncs, func() {
  58. if err := c.rmServiceBinding(service.name, service.id, networkID, epID, loadBalancer.vip,
  59. service.ingressPorts, service.aliases, epIP); err != nil {
  60. logrus.Errorf("Failed to remove service bindings for service %s network %s endpoint %s while cleanup: %v",
  61. service.id, networkID, epID, err)
  62. }
  63. })
  64. }
  65. }
  66. s.Unlock()
  67. }
  68. for _, f := range cleanupFuncs {
  69. f()
  70. }
  71. }
  72. func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
  73. var (
  74. s *service
  75. addService bool
  76. )
  77. n, err := c.NetworkByID(nid)
  78. if err != nil {
  79. return err
  80. }
  81. skey := serviceKey{
  82. id: sid,
  83. ports: portConfigs(ingressPorts).String(),
  84. }
  85. c.Lock()
  86. s, ok := c.serviceBindings[skey]
  87. if !ok {
  88. // Create a new service if we are seeing this service
  89. // for the first time.
  90. s = newService(name, sid, ingressPorts, aliases)
  91. c.serviceBindings[skey] = s
  92. }
  93. c.Unlock()
  94. // Add endpoint IP to special "tasks.svc_name" so that the
  95. // applications have access to DNS RR.
  96. n.(*network).addSvcRecords("tasks."+name, ip, nil, false)
  97. for _, alias := range aliases {
  98. n.(*network).addSvcRecords("tasks."+alias, ip, nil, false)
  99. }
  100. // Add service name to vip in DNS, if vip is valid. Otherwise resort to DNS RR
  101. svcIP := vip
  102. if len(svcIP) == 0 {
  103. svcIP = ip
  104. }
  105. n.(*network).addSvcRecords(name, svcIP, nil, false)
  106. for _, alias := range aliases {
  107. n.(*network).addSvcRecords(alias, svcIP, nil, false)
  108. }
  109. s.Lock()
  110. defer s.Unlock()
  111. lb, ok := s.loadBalancers[nid]
  112. if !ok {
  113. // Create a new load balancer if we are seeing this
  114. // network attachment on the service for the first
  115. // time.
  116. lb = &loadBalancer{
  117. vip: vip,
  118. fwMark: fwMarkCtr,
  119. backEnds: make(map[string]net.IP),
  120. service: s,
  121. }
  122. fwMarkCtrMu.Lock()
  123. fwMarkCtr++
  124. fwMarkCtrMu.Unlock()
  125. s.loadBalancers[nid] = lb
  126. // Since we just created this load balancer make sure
  127. // we add a new service service in IPVS rules.
  128. addService = true
  129. }
  130. lb.backEnds[eid] = ip
  131. // Add loadbalancer service and backend in all sandboxes in
  132. // the network only if vip is valid.
  133. if len(vip) != 0 {
  134. n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts, addService)
  135. }
  136. return nil
  137. }
  138. func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, aliases []string, ip net.IP) error {
  139. var rmService bool
  140. n, err := c.NetworkByID(nid)
  141. if err != nil {
  142. return err
  143. }
  144. skey := serviceKey{
  145. id: sid,
  146. ports: portConfigs(ingressPorts).String(),
  147. }
  148. c.Lock()
  149. s, ok := c.serviceBindings[skey]
  150. if !ok {
  151. c.Unlock()
  152. return nil
  153. }
  154. c.Unlock()
  155. s.Lock()
  156. lb, ok := s.loadBalancers[nid]
  157. if !ok {
  158. s.Unlock()
  159. return nil
  160. }
  161. _, ok = lb.backEnds[eid]
  162. if !ok {
  163. s.Unlock()
  164. return nil
  165. }
  166. delete(lb.backEnds, eid)
  167. if len(lb.backEnds) == 0 {
  168. // All the backends for this service have been
  169. // removed. Time to remove the load balancer and also
  170. // remove the service entry in IPVS.
  171. rmService = true
  172. delete(s.loadBalancers, nid)
  173. }
  174. if len(s.loadBalancers) == 0 {
  175. // All loadbalancers for the service removed. Time to
  176. // remove the service itself.
  177. delete(c.serviceBindings, skey)
  178. }
  179. // Remove loadbalancer service(if needed) and backend in all
  180. // sandboxes in the network only if the vip is valid.
  181. if len(vip) != 0 {
  182. n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService)
  183. }
  184. s.Unlock()
  185. // Delete the special "tasks.svc_name" backend record.
  186. n.(*network).deleteSvcRecords("tasks."+name, ip, nil, false)
  187. for _, alias := range aliases {
  188. n.(*network).deleteSvcRecords("tasks."+alias, ip, nil, false)
  189. }
  190. // If we are doing DNS RR add the endpoint IP to DNS record
  191. // right away.
  192. if len(vip) == 0 {
  193. n.(*network).deleteSvcRecords(name, ip, nil, false)
  194. for _, alias := range aliases {
  195. n.(*network).deleteSvcRecords(alias, ip, nil, false)
  196. }
  197. }
  198. // Remove the DNS record for VIP only if we are removing the service
  199. if rmService && len(vip) != 0 {
  200. n.(*network).deleteSvcRecords(name, vip, nil, false)
  201. for _, alias := range aliases {
  202. n.(*network).deleteSvcRecords(alias, vip, nil, false)
  203. }
  204. }
  205. return nil
  206. }
  207. // Get all loadbalancers on this network that is currently discovered
  208. // on this node.
  209. func (n *network) connectedLoadbalancers() []*loadBalancer {
  210. c := n.getController()
  211. serviceBindings := make([]*service, 0, len(c.serviceBindings))
  212. c.Lock()
  213. for _, s := range c.serviceBindings {
  214. serviceBindings = append(serviceBindings, s)
  215. }
  216. c.Unlock()
  217. var lbs []*loadBalancer
  218. for _, s := range serviceBindings {
  219. s.Lock()
  220. if lb, ok := s.loadBalancers[n.ID()]; ok {
  221. lbs = append(lbs, lb)
  222. }
  223. s.Unlock()
  224. }
  225. return lbs
  226. }
  227. // Populate all loadbalancers on the network that the passed endpoint
  228. // belongs to, into this sandbox.
  229. func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
  230. var gwIP net.IP
  231. // This is an interface less endpoint. Nothing to do.
  232. if ep.Iface() == nil {
  233. return
  234. }
  235. n := ep.getNetwork()
  236. eIP := ep.Iface().Address()
  237. if n.ingress {
  238. if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
  239. logrus.Errorf("Failed to add redirect rules for ep %s: %v", ep.Name(), err)
  240. }
  241. }
  242. if sb.ingress {
  243. // For the ingress sandbox if this is not gateway
  244. // endpoint do nothing.
  245. if ep != sb.getGatewayEndpoint() {
  246. return
  247. }
  248. // This is the gateway endpoint. Now get the ingress
  249. // network and plumb the loadbalancers.
  250. gwIP = ep.Iface().Address().IP
  251. for _, ep := range sb.getConnectedEndpoints() {
  252. if !ep.endpointInGWNetwork() {
  253. n = ep.getNetwork()
  254. eIP = ep.Iface().Address()
  255. }
  256. }
  257. }
  258. for _, lb := range n.connectedLoadbalancers() {
  259. // Skip if vip is not valid.
  260. if len(lb.vip) == 0 {
  261. continue
  262. }
  263. lb.service.Lock()
  264. addService := true
  265. for _, ip := range lb.backEnds {
  266. sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts,
  267. eIP, gwIP, addService, n.ingress)
  268. addService = false
  269. }
  270. lb.service.Unlock()
  271. }
  272. }
  273. // Add loadbalancer backend to all sandboxes which has a connection to
  274. // this network. If needed add the service as well, as specified by
  275. // the addService bool.
  276. func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, addService bool) {
  277. n.WalkEndpoints(func(e Endpoint) bool {
  278. ep := e.(*endpoint)
  279. if sb, ok := ep.getSandbox(); ok {
  280. if !sb.isEndpointPopulated(ep) {
  281. return false
  282. }
  283. var gwIP net.IP
  284. if ep := sb.getGatewayEndpoint(); ep != nil {
  285. gwIP = ep.Iface().Address().IP
  286. }
  287. sb.addLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, addService, n.ingress)
  288. }
  289. return false
  290. })
  291. }
  292. // Remove loadbalancer backend from all sandboxes which has a
  293. // connection to this network. If needed remove the service entry as
  294. // well, as specified by the rmService bool.
  295. func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) {
  296. n.WalkEndpoints(func(e Endpoint) bool {
  297. ep := e.(*endpoint)
  298. if sb, ok := ep.getSandbox(); ok {
  299. if !sb.isEndpointPopulated(ep) {
  300. return false
  301. }
  302. var gwIP net.IP
  303. if ep := sb.getGatewayEndpoint(); ep != nil {
  304. gwIP = ep.Iface().Address().IP
  305. }
  306. sb.rmLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService, n.ingress)
  307. }
  308. return false
  309. })
  310. }
  311. // Add loadbalancer backend into one connected sandbox.
  312. func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, addService bool, isIngressNetwork bool) {
  313. if sb.osSbox == nil {
  314. return
  315. }
  316. if isIngressNetwork && !sb.ingress {
  317. return
  318. }
  319. i, err := ipvs.New(sb.Key())
  320. if err != nil {
  321. logrus.Errorf("Failed to create an ipvs handle for sbox %s: %v", sb.Key(), err)
  322. return
  323. }
  324. defer i.Close()
  325. s := &ipvs.Service{
  326. AddressFamily: nl.FAMILY_V4,
  327. FWMark: fwMark,
  328. SchedName: ipvs.RoundRobin,
  329. }
  330. if addService {
  331. var filteredPorts []*PortConfig
  332. if sb.ingress {
  333. filteredPorts = filterPortConfigs(ingressPorts, false)
  334. if err := programIngress(gwIP, filteredPorts, false); err != nil {
  335. logrus.Errorf("Failed to add ingress: %v", err)
  336. return
  337. }
  338. }
  339. logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, ingressPorts)
  340. if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, false); err != nil {
  341. logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
  342. return
  343. }
  344. if err := i.NewService(s); err != nil {
  345. logrus.Errorf("Failed to create a new service for vip %s fwmark %d: %v", vip, fwMark, err)
  346. return
  347. }
  348. }
  349. d := &ipvs.Destination{
  350. AddressFamily: nl.FAMILY_V4,
  351. Address: ip,
  352. Weight: 1,
  353. }
  354. // Remove the sched name before using the service to add
  355. // destination.
  356. s.SchedName = ""
  357. if err := i.NewDestination(s, d); err != nil && err != syscall.EEXIST {
  358. logrus.Errorf("Failed to create real server %s for vip %s fwmark %d in sb %s: %v", ip, vip, fwMark, sb.containerID, err)
  359. }
  360. }
  361. // Remove loadbalancer backend from one connected sandbox.
  362. func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, rmService bool, isIngressNetwork bool) {
  363. if sb.osSbox == nil {
  364. return
  365. }
  366. if isIngressNetwork && !sb.ingress {
  367. return
  368. }
  369. i, err := ipvs.New(sb.Key())
  370. if err != nil {
  371. logrus.Errorf("Failed to create an ipvs handle for sbox %s: %v", sb.Key(), err)
  372. return
  373. }
  374. defer i.Close()
  375. s := &ipvs.Service{
  376. AddressFamily: nl.FAMILY_V4,
  377. FWMark: fwMark,
  378. }
  379. d := &ipvs.Destination{
  380. AddressFamily: nl.FAMILY_V4,
  381. Address: ip,
  382. Weight: 1,
  383. }
  384. if err := i.DelDestination(s, d); err != nil {
  385. logrus.Infof("Failed to delete real server %s for vip %s fwmark %d: %v", ip, vip, fwMark, err)
  386. }
  387. if rmService {
  388. s.SchedName = ipvs.RoundRobin
  389. if err := i.DelService(s); err != nil {
  390. logrus.Errorf("Failed to delete a new service for vip %s fwmark %d: %v", vip, fwMark, err)
  391. }
  392. var filteredPorts []*PortConfig
  393. if sb.ingress {
  394. filteredPorts = filterPortConfigs(ingressPorts, true)
  395. if err := programIngress(gwIP, filteredPorts, true); err != nil {
  396. logrus.Errorf("Failed to delete ingress: %v", err)
  397. }
  398. }
  399. if err := invokeFWMarker(sb.Key(), vip, fwMark, ingressPorts, eIP, true); err != nil {
  400. logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
  401. }
  402. }
  403. }
// ingressChain is the name of the iptables chain, kept in both the nat and
// the filter table by programIngress, that holds the ingress rules.
const ingressChain = "DOCKER-INGRESS"

var (
	// ingressOnce makes programIngress flush pre-existing ingress chain
	// rules a single time.
	ingressOnce sync.Once
	// ingressProxyMu guards ingressProxyTbl.
	ingressProxyMu sync.Mutex
	// ingressProxyTbl maps a "<published port>/<protocol>" spec to the
	// listener plumbProxy opened for it.
	ingressProxyTbl = make(map[string]io.Closer)
	// portConfigMu guards portConfigTbl.
	portConfigMu sync.Mutex
	// portConfigTbl holds the reference count filterPortConfigs keeps for
	// each port config.
	portConfigTbl = make(map[PortConfig]int)
)
  412. func filterPortConfigs(ingressPorts []*PortConfig, isDelete bool) []*PortConfig {
  413. portConfigMu.Lock()
  414. iPorts := make([]*PortConfig, 0, len(ingressPorts))
  415. for _, pc := range ingressPorts {
  416. if isDelete {
  417. if cnt, ok := portConfigTbl[*pc]; ok {
  418. // This is the last reference to this
  419. // port config. Delete the port config
  420. // and add it to filtered list to be
  421. // plumbed.
  422. if cnt == 1 {
  423. delete(portConfigTbl, *pc)
  424. iPorts = append(iPorts, pc)
  425. continue
  426. }
  427. portConfigTbl[*pc] = cnt - 1
  428. }
  429. continue
  430. }
  431. if cnt, ok := portConfigTbl[*pc]; ok {
  432. portConfigTbl[*pc] = cnt + 1
  433. continue
  434. }
  435. // We are adding it for the first time. Add it to the
  436. // filter list to be plumbed.
  437. portConfigTbl[*pc] = 1
  438. iPorts = append(iPorts, pc)
  439. }
  440. portConfigMu.Unlock()
  441. return iPorts
  442. }
// programIngress adds (isDelete == false) or removes (isDelete == true) the
// host-side iptables rules and proxy listeners for the given ingress ports.
//
// On add it first makes sure the ingress chain exists in the nat and filter
// tables, ends with a RETURN rule, and is jumped to from the nat
// OUTPUT/PREROUTING chains and the filter FORWARD chain; it also enables
// route_localnet on the interface that routes to gwIP and masquerades
// locally sourced traffic leaving through it. Then, for every port, it
// inserts or deletes a DNAT rule towards gwIP plus two ACCEPT rules in the
// filter table, and opens or closes the proxy listener.
//
// On add the first failing rule aborts with an error. On delete, rule
// failures are only logged and nil is returned.
func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) error {
	// Per-port rules are inserted at the head of the chain or deleted.
	addDelOpt := "-I"
	if isDelete {
		addDelOpt = "-D"
	}

	// Chain existence is sampled once, before the flush below and before
	// any chain creation.
	chainExists := iptables.ExistChain(ingressChain, iptables.Nat)
	filterChainExists := iptables.ExistChain(ingressChain, iptables.Filter)

	ingressOnce.Do(func() {
		// Flush nat table and filter table ingress chain rules during init if it
		// exists. It might contain stale rules from previous life.
		if chainExists {
			if err := iptables.RawCombinedOutput("-t", "nat", "-F", ingressChain); err != nil {
				logrus.Errorf("Could not flush nat table ingress chain rules during init: %v", err)
			}
		}
		if filterChainExists {
			if err := iptables.RawCombinedOutput("-F", ingressChain); err != nil {
				logrus.Errorf("Could not flush filter table ingress chain rules during init: %v", err)
			}
		}
	})

	if !isDelete {
		// Create the ingress chain in whichever table lacks it.
		if !chainExists {
			if err := iptables.RawCombinedOutput("-t", "nat", "-N", ingressChain); err != nil {
				return fmt.Errorf("failed to create ingress chain: %v", err)
			}
		}
		if !filterChainExists {
			if err := iptables.RawCombinedOutput("-N", ingressChain); err != nil {
				return fmt.Errorf("failed to create filter table ingress chain: %v", err)
			}
		}

		// Both chains end with a RETURN rule; per-port rules are
		// inserted ahead of it with -I.
		if !iptables.Exists(iptables.Nat, ingressChain, "-j", "RETURN") {
			if err := iptables.RawCombinedOutput("-t", "nat", "-A", ingressChain, "-j", "RETURN"); err != nil {
				return fmt.Errorf("failed to add return rule in nat table ingress chain: %v", err)
			}
		}

		if !iptables.Exists(iptables.Filter, ingressChain, "-j", "RETURN") {
			if err := iptables.RawCombinedOutput("-A", ingressChain, "-j", "RETURN"); err != nil {
				return fmt.Errorf("failed to add return rule to filter table ingress chain: %v", err)
			}
		}

		// Send locally destined traffic from the nat OUTPUT and
		// PREROUTING chains through the ingress chain.
		for _, chain := range []string{"OUTPUT", "PREROUTING"} {
			if !iptables.Exists(iptables.Nat, chain, "-m", "addrtype", "--dst-type", "LOCAL", "-j", ingressChain) {
				if err := iptables.RawCombinedOutput("-t", "nat", "-I", chain, "-m", "addrtype", "--dst-type", "LOCAL", "-j", ingressChain); err != nil {
					return fmt.Errorf("failed to add jump rule in %s to ingress chain: %v", chain, err)
				}
			}
		}

		// Send forwarded traffic through the filter table ingress chain.
		if !iptables.Exists(iptables.Filter, "FORWARD", "-j", ingressChain) {
			if err := iptables.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil {
				return fmt.Errorf("failed to add jump rule to %s in filter table forward chain: %v", ingressChain, err)
			}
		}

		// Resolve the interface that routes to the gateway IP.
		oifName, err := findOIFName(gwIP)
		if err != nil {
			return fmt.Errorf("failed to find gateway bridge interface name for %s: %v", gwIP, err)
		}

		// Enable route_localnet on that interface.
		path := filepath.Join("/proc/sys/net/ipv4/conf", oifName, "route_localnet")
		if err := ioutil.WriteFile(path, []byte{'1', '\n'}, 0644); err != nil {
			return fmt.Errorf("could not write to %s: %v", path, err)
		}

		// Masquerade locally sourced traffic leaving via that interface.
		ruleArgs := strings.Fields(fmt.Sprintf("-m addrtype --src-type LOCAL -o %s -j MASQUERADE", oifName))
		if !iptables.Exists(iptables.Nat, "POSTROUTING", ruleArgs...) {
			if err := iptables.RawCombinedOutput(append([]string{"-t", "nat", "-I", "POSTROUTING"}, ruleArgs...)...); err != nil {
				return fmt.Errorf("failed to add ingress localhost POSTROUTING rule for %s: %v", oifName, err)
			}
		}
	}

	for _, iPort := range ingressPorts {
		// DNAT the published port to the same port on the gateway IP.
		// Skipped when the nat ingress chain does not exist.
		if iptables.ExistChain(ingressChain, iptables.Nat) {
			rule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d",
				addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, gwIP, iPort.PublishedPort))
			if err := iptables.RawCombinedOutput(rule...); err != nil {
				errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err)
				if !isDelete {
					return fmt.Errorf("%s", errStr)
				}

				// Deletion is best effort.
				logrus.Infof("%s", errStr)
			}
		}

		// Filter table rules to allow a published service to be accessible in the local node from..
		// 1) service tasks attached to other networks
		// 2) unmanaged containers on bridge networks
		rule := strings.Fields(fmt.Sprintf("%s %s -m state -p %s --sport %d --state ESTABLISHED,RELATED -j ACCEPT",
			addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort))
		if err := iptables.RawCombinedOutput(rule...); err != nil {
			errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err)
			if !isDelete {
				return fmt.Errorf("%s", errStr)
			}
			logrus.Warnf("%s", errStr)
		}

		rule = strings.Fields(fmt.Sprintf("%s %s -p %s --dport %d -j ACCEPT",
			addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort))
		if err := iptables.RawCombinedOutput(rule...); err != nil {
			errStr := fmt.Sprintf("setting up rule failed, %v: %v", rule, err)
			if !isDelete {
				return fmt.Errorf("%s", errStr)
			}
			logrus.Warnf("%s", errStr)
		}

		// Open (or close, on delete) the listener on the published
		// port. A failure here is only logged.
		if err := plumbProxy(iPort, isDelete); err != nil {
			logrus.Warnf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
		}
	}

	return nil
}
  551. // In the filter table FORWARD chain first rule should be to jump to INGRESS-CHAIN
  552. // This chain has the rules to allow access to the published ports for swarm tasks
  553. // from local bridge networks and docker_gwbridge (ie:taks on other swarm netwroks)
  554. func arrangeIngressFilterRule() {
  555. if iptables.ExistChain(ingressChain, iptables.Filter) {
  556. if iptables.Exists(iptables.Filter, "FORWARD", "-j", ingressChain) {
  557. if err := iptables.RawCombinedOutput("-D", "FORWARD", "-j", ingressChain); err != nil {
  558. logrus.Warnf("failed to delete jump rule to ingressChain in filter table: %v", err)
  559. }
  560. }
  561. if err := iptables.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil {
  562. logrus.Warnf("failed to add jump rule to ingressChain in filter table: %v", err)
  563. }
  564. }
  565. }
  566. func findOIFName(ip net.IP) (string, error) {
  567. nlh := ns.NlHandle()
  568. routes, err := nlh.RouteGet(ip)
  569. if err != nil {
  570. return "", err
  571. }
  572. if len(routes) == 0 {
  573. return "", fmt.Errorf("no route to %s", ip)
  574. }
  575. // Pick the first route(typically there is only one route). We
  576. // don't support multipath.
  577. link, err := nlh.LinkByIndex(routes[0].LinkIndex)
  578. if err != nil {
  579. return "", err
  580. }
  581. return link.Attrs().Name, nil
  582. }
  583. func plumbProxy(iPort *PortConfig, isDelete bool) error {
  584. var (
  585. err error
  586. l io.Closer
  587. )
  588. portSpec := fmt.Sprintf("%d/%s", iPort.PublishedPort, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]))
  589. if isDelete {
  590. ingressProxyMu.Lock()
  591. if listener, ok := ingressProxyTbl[portSpec]; ok {
  592. if listener != nil {
  593. listener.Close()
  594. }
  595. }
  596. ingressProxyMu.Unlock()
  597. return nil
  598. }
  599. switch iPort.Protocol {
  600. case ProtocolTCP:
  601. l, err = net.ListenTCP("tcp", &net.TCPAddr{Port: int(iPort.PublishedPort)})
  602. case ProtocolUDP:
  603. l, err = net.ListenUDP("udp", &net.UDPAddr{Port: int(iPort.PublishedPort)})
  604. }
  605. if err != nil {
  606. return err
  607. }
  608. ingressProxyMu.Lock()
  609. ingressProxyTbl[portSpec] = l
  610. ingressProxyMu.Unlock()
  611. return nil
  612. }
  613. func writePortsToFile(ports []*PortConfig) (string, error) {
  614. f, err := ioutil.TempFile("", "port_configs")
  615. if err != nil {
  616. return "", err
  617. }
  618. defer f.Close()
  619. buf, err := proto.Marshal(&EndpointRecord{
  620. IngressPorts: ports,
  621. })
  622. n, err := f.Write(buf)
  623. if err != nil {
  624. return "", err
  625. }
  626. if n < len(buf) {
  627. return "", io.ErrShortWrite
  628. }
  629. return f.Name(), nil
  630. }
  631. func readPortsFromFile(fileName string) ([]*PortConfig, error) {
  632. buf, err := ioutil.ReadFile(fileName)
  633. if err != nil {
  634. return nil, err
  635. }
  636. var epRec EndpointRecord
  637. err = proto.Unmarshal(buf, &epRec)
  638. if err != nil {
  639. return nil, err
  640. }
  641. return epRec.IngressPorts, nil
  642. }
  643. // Invoke fwmarker reexec routine to mark vip destined packets with
  644. // the passed firewall mark.
  645. func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
  646. var ingressPortsFile string
  647. if len(ingressPorts) != 0 {
  648. var err error
  649. ingressPortsFile, err = writePortsToFile(ingressPorts)
  650. if err != nil {
  651. return err
  652. }
  653. defer os.Remove(ingressPortsFile)
  654. }
  655. addDelOpt := "-A"
  656. if isDelete {
  657. addDelOpt = "-D"
  658. }
  659. cmd := &exec.Cmd{
  660. Path: reexec.Self(),
  661. Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String()),
  662. Stdout: os.Stdout,
  663. Stderr: os.Stderr,
  664. }
  665. if err := cmd.Run(); err != nil {
  666. return fmt.Errorf("reexec failed: %v", err)
  667. }
  668. return nil
  669. }
  670. // Firewall marker reexec function.
  671. func fwMarker() {
  672. runtime.LockOSThread()
  673. defer runtime.UnlockOSThread()
  674. if len(os.Args) < 7 {
  675. logrus.Error("invalid number of arguments..")
  676. os.Exit(1)
  677. }
  678. var ingressPorts []*PortConfig
  679. if os.Args[5] != "" {
  680. var err error
  681. ingressPorts, err = readPortsFromFile(os.Args[5])
  682. if err != nil {
  683. logrus.Errorf("Failed reading ingress ports file: %v", err)
  684. os.Exit(6)
  685. }
  686. }
  687. vip := os.Args[2]
  688. fwMark, err := strconv.ParseUint(os.Args[3], 10, 32)
  689. if err != nil {
  690. logrus.Errorf("bad fwmark value(%s) passed: %v", os.Args[3], err)
  691. os.Exit(2)
  692. }
  693. addDelOpt := os.Args[4]
  694. rules := [][]string{}
  695. for _, iPort := range ingressPorts {
  696. rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
  697. addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
  698. rules = append(rules, rule)
  699. }
  700. ns, err := netns.GetFromPath(os.Args[1])
  701. if err != nil {
  702. logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
  703. os.Exit(3)
  704. }
  705. defer ns.Close()
  706. if err := netns.Set(ns); err != nil {
  707. logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
  708. os.Exit(4)
  709. }
  710. if addDelOpt == "-A" {
  711. eIP, subnet, err := net.ParseCIDR(os.Args[6])
  712. if err != nil {
  713. logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
  714. os.Exit(9)
  715. }
  716. ruleParams := strings.Fields(fmt.Sprintf("-m ipvs --ipvs -d %s -j SNAT --to-source %s", subnet, eIP))
  717. if !iptables.Exists("nat", "POSTROUTING", ruleParams...) {
  718. rule := append(strings.Fields("-t nat -A POSTROUTING"), ruleParams...)
  719. rules = append(rules, rule)
  720. err := ioutil.WriteFile("/proc/sys/net/ipv4/vs/conntrack", []byte{'1', '\n'}, 0644)
  721. if err != nil {
  722. logrus.Errorf("Failed to write to /proc/sys/net/ipv4/vs/conntrack: %v", err)
  723. os.Exit(8)
  724. }
  725. }
  726. }
  727. rule := strings.Fields(fmt.Sprintf("-t mangle %s OUTPUT -d %s/32 -j MARK --set-mark %d", addDelOpt, vip, fwMark))
  728. rules = append(rules, rule)
  729. for _, rule := range rules {
  730. if err := iptables.RawCombinedOutputNative(rule...); err != nil {
  731. logrus.Errorf("setting up rule failed, %v: %v", rule, err)
  732. os.Exit(5)
  733. }
  734. }
  735. }
  736. func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
  737. var ingressPortsFile string
  738. if len(ingressPorts) != 0 {
  739. var err error
  740. ingressPortsFile, err = writePortsToFile(ingressPorts)
  741. if err != nil {
  742. return err
  743. }
  744. defer os.Remove(ingressPortsFile)
  745. }
  746. cmd := &exec.Cmd{
  747. Path: reexec.Self(),
  748. Args: append([]string{"redirecter"}, path, eIP.String(), ingressPortsFile),
  749. Stdout: os.Stdout,
  750. Stderr: os.Stderr,
  751. }
  752. if err := cmd.Run(); err != nil {
  753. return fmt.Errorf("reexec failed: %v", err)
  754. }
  755. return nil
  756. }
  757. // Redirecter reexec function.
  758. func redirecter() {
  759. runtime.LockOSThread()
  760. defer runtime.UnlockOSThread()
  761. if len(os.Args) < 4 {
  762. logrus.Error("invalid number of arguments..")
  763. os.Exit(1)
  764. }
  765. var ingressPorts []*PortConfig
  766. if os.Args[3] != "" {
  767. var err error
  768. ingressPorts, err = readPortsFromFile(os.Args[3])
  769. if err != nil {
  770. logrus.Errorf("Failed reading ingress ports file: %v", err)
  771. os.Exit(2)
  772. }
  773. }
  774. eIP, _, err := net.ParseCIDR(os.Args[2])
  775. if err != nil {
  776. logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
  777. os.Exit(3)
  778. }
  779. rules := [][]string{}
  780. for _, iPort := range ingressPorts {
  781. rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
  782. eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
  783. rules = append(rules, rule)
  784. }
  785. ns, err := netns.GetFromPath(os.Args[1])
  786. if err != nil {
  787. logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
  788. os.Exit(4)
  789. }
  790. defer ns.Close()
  791. if err := netns.Set(ns); err != nil {
  792. logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
  793. os.Exit(5)
  794. }
  795. for _, rule := range rules {
  796. if err := iptables.RawCombinedOutputNative(rule...); err != nil {
  797. logrus.Errorf("setting up rule failed, %v: %v", rule, err)
  798. os.Exit(5)
  799. }
  800. }
  801. }