service_linux.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804
  1. package libnetwork
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "runtime"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "syscall"
  14. "github.com/docker/docker/libnetwork/iptables"
  15. "github.com/docker/docker/libnetwork/ns"
  16. "github.com/docker/docker/pkg/reexec"
  17. "github.com/gogo/protobuf/proto"
  18. "github.com/ishidawataru/sctp"
  19. "github.com/moby/ipvs"
  20. "github.com/sirupsen/logrus"
  21. "github.com/vishvananda/netlink/nl"
  22. "github.com/vishvananda/netns"
  23. )
  24. func init() {
  25. reexec.Register("fwmarker", fwMarker)
  26. reexec.Register("redirector", redirector)
  27. }
  28. // Populate all loadbalancers on the network that the passed endpoint
  29. // belongs to, into this sandbox.
  30. func (sb *sandbox) populateLoadBalancers(ep *endpoint) {
  31. // This is an interface less endpoint. Nothing to do.
  32. if ep.Iface() == nil {
  33. return
  34. }
  35. n := ep.getNetwork()
  36. eIP := ep.Iface().Address()
  37. if n.ingress {
  38. if err := addRedirectRules(sb.Key(), eIP, ep.ingressPorts); err != nil {
  39. logrus.Errorf("Failed to add redirect rules for ep %s (%.7s): %v", ep.Name(), ep.ID(), err)
  40. }
  41. }
  42. }
  43. func (n *network) findLBEndpointSandbox() (*endpoint, *sandbox, error) {
  44. // TODO: get endpoint from store? See EndpointInfo()
  45. var ep *endpoint
  46. // Find this node's LB sandbox endpoint: there should be exactly one
  47. for _, e := range n.Endpoints() {
  48. epi := e.Info()
  49. if epi != nil && epi.LoadBalancer() {
  50. ep = e.(*endpoint)
  51. break
  52. }
  53. }
  54. if ep == nil {
  55. return nil, nil, fmt.Errorf("Unable to find load balancing endpoint for network %s", n.ID())
  56. }
  57. // Get the load balancer sandbox itself as well
  58. sb, ok := ep.getSandbox()
  59. if !ok {
  60. return nil, nil, fmt.Errorf("Unable to get sandbox for %s(%s) in for %s", ep.Name(), ep.ID(), n.ID())
  61. }
  62. sep := sb.getEndpoint(ep.ID())
  63. if sep == nil {
  64. return nil, nil, fmt.Errorf("Load balancing endpoint %s(%s) removed from %s", ep.Name(), ep.ID(), n.ID())
  65. }
  66. return sep, sb, nil
  67. }
  68. // Searches the OS sandbox for the name of the endpoint interface
  69. // within the sandbox. This is required for adding/removing IP
  70. // aliases to the interface.
  71. func findIfaceDstName(sb *sandbox, ep *endpoint) string {
  72. srcName := ep.Iface().SrcName()
  73. for _, i := range sb.osSbox.Info().Interfaces() {
  74. if i.SrcName() == srcName {
  75. return i.DstName()
  76. }
  77. }
  78. return ""
  79. }
  80. // Add loadbalancer backend to the loadbalncer sandbox for the network.
  81. // If needed add the service as well.
  82. func (n *network) addLBBackend(ip net.IP, lb *loadBalancer) {
  83. if len(lb.vip) == 0 {
  84. return
  85. }
  86. ep, sb, err := n.findLBEndpointSandbox()
  87. if err != nil {
  88. logrus.Errorf("addLBBackend %s/%s: %v", n.ID(), n.Name(), err)
  89. return
  90. }
  91. if sb.osSbox == nil {
  92. return
  93. }
  94. eIP := ep.Iface().Address()
  95. i, err := ipvs.New(sb.Key())
  96. if err != nil {
  97. logrus.Errorf("Failed to create an ipvs handle for sbox %.7s (%.7s,%s) for lb addition: %v", sb.ID(), sb.ContainerID(), sb.Key(), err)
  98. return
  99. }
  100. defer i.Close()
  101. s := &ipvs.Service{
  102. AddressFamily: nl.FAMILY_V4,
  103. FWMark: lb.fwMark,
  104. SchedName: ipvs.RoundRobin,
  105. }
  106. if !i.IsServicePresent(s) {
  107. // Add IP alias for the VIP to the endpoint
  108. ifName := findIfaceDstName(sb, ep)
  109. if ifName == "" {
  110. logrus.Errorf("Failed find interface name for endpoint %s(%s) to create LB alias", ep.ID(), ep.Name())
  111. return
  112. }
  113. err := sb.osSbox.AddAliasIP(ifName, &net.IPNet{IP: lb.vip, Mask: net.CIDRMask(32, 32)})
  114. if err != nil {
  115. logrus.Errorf("Failed add IP alias %s to network %s LB endpoint interface %s: %v", lb.vip, n.ID(), ifName, err)
  116. return
  117. }
  118. if sb.ingress {
  119. var gwIP net.IP
  120. if ep := sb.getGatewayEndpoint(); ep != nil {
  121. gwIP = ep.Iface().Address().IP
  122. }
  123. if err := programIngress(gwIP, lb.service.ingressPorts, false); err != nil {
  124. logrus.Errorf("Failed to add ingress: %v", err)
  125. return
  126. }
  127. }
  128. logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v in sbox %.7s (%.7s)", lb.vip, lb.fwMark, lb.service.ingressPorts, sb.ID(), sb.ContainerID())
  129. if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, false, n.loadBalancerMode); err != nil {
  130. logrus.Errorf("Failed to add firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
  131. return
  132. }
  133. if err := i.NewService(s); err != nil && err != syscall.EEXIST {
  134. logrus.Errorf("Failed to create a new service for vip %s fwmark %d in sbox %.7s (%.7s): %v", lb.vip, lb.fwMark, sb.ID(), sb.ContainerID(), err)
  135. return
  136. }
  137. }
  138. d := &ipvs.Destination{
  139. AddressFamily: nl.FAMILY_V4,
  140. Address: ip,
  141. Weight: 1,
  142. }
  143. if n.loadBalancerMode == loadBalancerModeDSR {
  144. d.ConnectionFlags = ipvs.ConnFwdDirectRoute
  145. }
  146. // Remove the sched name before using the service to add
  147. // destination.
  148. s.SchedName = ""
  149. if err := i.NewDestination(s, d); err != nil && err != syscall.EEXIST {
  150. logrus.Errorf("Failed to create real server %s for vip %s fwmark %d in sbox %.7s (%.7s): %v", ip, lb.vip, lb.fwMark, sb.ID(), sb.ContainerID(), err)
  151. }
  152. }
  153. // Remove loadbalancer backend the load balancing endpoint for this
  154. // network. If 'rmService' is true, then remove the service entry as well.
  155. // If 'fullRemove' is true then completely remove the entry, otherwise
  156. // just deweight it for now.
  157. func (n *network) rmLBBackend(ip net.IP, lb *loadBalancer, rmService bool, fullRemove bool) {
  158. if len(lb.vip) == 0 {
  159. return
  160. }
  161. ep, sb, err := n.findLBEndpointSandbox()
  162. if err != nil {
  163. logrus.Debugf("rmLBBackend for %s/%s: %v -- probably transient state", n.ID(), n.Name(), err)
  164. return
  165. }
  166. if sb.osSbox == nil {
  167. return
  168. }
  169. eIP := ep.Iface().Address()
  170. i, err := ipvs.New(sb.Key())
  171. if err != nil {
  172. logrus.Errorf("Failed to create an ipvs handle for sbox %.7s (%.7s,%s) for lb removal: %v", sb.ID(), sb.ContainerID(), sb.Key(), err)
  173. return
  174. }
  175. defer i.Close()
  176. s := &ipvs.Service{
  177. AddressFamily: nl.FAMILY_V4,
  178. FWMark: lb.fwMark,
  179. }
  180. d := &ipvs.Destination{
  181. AddressFamily: nl.FAMILY_V4,
  182. Address: ip,
  183. Weight: 1,
  184. }
  185. if n.loadBalancerMode == loadBalancerModeDSR {
  186. d.ConnectionFlags = ipvs.ConnFwdDirectRoute
  187. }
  188. if fullRemove {
  189. if err := i.DelDestination(s, d); err != nil && err != syscall.ENOENT {
  190. logrus.Errorf("Failed to delete real server %s for vip %s fwmark %d in sbox %.7s (%.7s): %v", ip, lb.vip, lb.fwMark, sb.ID(), sb.ContainerID(), err)
  191. }
  192. } else {
  193. d.Weight = 0
  194. if err := i.UpdateDestination(s, d); err != nil && err != syscall.ENOENT {
  195. logrus.Errorf("Failed to set LB weight of real server %s to 0 for vip %s fwmark %d in sbox %.7s (%.7s): %v", ip, lb.vip, lb.fwMark, sb.ID(), sb.ContainerID(), err)
  196. }
  197. }
  198. if rmService {
  199. s.SchedName = ipvs.RoundRobin
  200. if err := i.DelService(s); err != nil && err != syscall.ENOENT {
  201. logrus.Errorf("Failed to delete service for vip %s fwmark %d in sbox %.7s (%.7s): %v", lb.vip, lb.fwMark, sb.ID(), sb.ContainerID(), err)
  202. }
  203. if sb.ingress {
  204. var gwIP net.IP
  205. if ep := sb.getGatewayEndpoint(); ep != nil {
  206. gwIP = ep.Iface().Address().IP
  207. }
  208. if err := programIngress(gwIP, lb.service.ingressPorts, true); err != nil {
  209. logrus.Errorf("Failed to delete ingress: %v", err)
  210. }
  211. }
  212. if err := invokeFWMarker(sb.Key(), lb.vip, lb.fwMark, lb.service.ingressPorts, eIP, true, n.loadBalancerMode); err != nil {
  213. logrus.Errorf("Failed to delete firewall mark rule in sbox %.7s (%.7s): %v", sb.ID(), sb.ContainerID(), err)
  214. }
  215. // Remove IP alias from the VIP to the endpoint
  216. ifName := findIfaceDstName(sb, ep)
  217. if ifName == "" {
  218. logrus.Errorf("Failed find interface name for endpoint %s(%s) to create LB alias", ep.ID(), ep.Name())
  219. return
  220. }
  221. err := sb.osSbox.RemoveAliasIP(ifName, &net.IPNet{IP: lb.vip, Mask: net.CIDRMask(32, 32)})
  222. if err != nil {
  223. logrus.Errorf("Failed add IP alias %s to network %s LB endpoint interface %s: %v", lb.vip, n.ID(), ifName, err)
  224. }
  225. }
  226. }
  227. const ingressChain = "DOCKER-INGRESS"
  228. var (
  229. ingressOnce sync.Once
  230. ingressMu sync.Mutex // lock for operations on ingress
  231. ingressProxyTbl = make(map[string]io.Closer)
  232. portConfigMu sync.Mutex
  233. portConfigTbl = make(map[PortConfig]int)
  234. )
  235. func filterPortConfigs(ingressPorts []*PortConfig, isDelete bool) []*PortConfig {
  236. portConfigMu.Lock()
  237. iPorts := make([]*PortConfig, 0, len(ingressPorts))
  238. for _, pc := range ingressPorts {
  239. if isDelete {
  240. if cnt, ok := portConfigTbl[*pc]; ok {
  241. // This is the last reference to this
  242. // port config. Delete the port config
  243. // and add it to filtered list to be
  244. // plumbed.
  245. if cnt == 1 {
  246. delete(portConfigTbl, *pc)
  247. iPorts = append(iPorts, pc)
  248. continue
  249. }
  250. portConfigTbl[*pc] = cnt - 1
  251. }
  252. continue
  253. }
  254. if cnt, ok := portConfigTbl[*pc]; ok {
  255. portConfigTbl[*pc] = cnt + 1
  256. continue
  257. }
  258. // We are adding it for the first time. Add it to the
  259. // filter list to be plumbed.
  260. portConfigTbl[*pc] = 1
  261. iPorts = append(iPorts, pc)
  262. }
  263. portConfigMu.Unlock()
  264. return iPorts
  265. }
  266. func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) error {
  267. // TODO IPv6 support
  268. iptable := iptables.GetIptable(iptables.IPv4)
  269. addDelOpt := "-I"
  270. rollbackAddDelOpt := "-D"
  271. if isDelete {
  272. addDelOpt = "-D"
  273. rollbackAddDelOpt = "-I"
  274. }
  275. ingressMu.Lock()
  276. defer ingressMu.Unlock()
  277. chainExists := iptable.ExistChain(ingressChain, iptables.Nat)
  278. filterChainExists := iptable.ExistChain(ingressChain, iptables.Filter)
  279. ingressOnce.Do(func() {
  280. // Flush nat table and filter table ingress chain rules during init if it
  281. // exists. It might contain stale rules from previous life.
  282. if chainExists {
  283. if err := iptable.RawCombinedOutput("-t", "nat", "-F", ingressChain); err != nil {
  284. logrus.Errorf("Could not flush nat table ingress chain rules during init: %v", err)
  285. }
  286. }
  287. if filterChainExists {
  288. if err := iptable.RawCombinedOutput("-F", ingressChain); err != nil {
  289. logrus.Errorf("Could not flush filter table ingress chain rules during init: %v", err)
  290. }
  291. }
  292. })
  293. if !isDelete {
  294. if !chainExists {
  295. if err := iptable.RawCombinedOutput("-t", "nat", "-N", ingressChain); err != nil {
  296. return fmt.Errorf("failed to create ingress chain: %v", err)
  297. }
  298. }
  299. if !filterChainExists {
  300. if err := iptable.RawCombinedOutput("-N", ingressChain); err != nil {
  301. return fmt.Errorf("failed to create filter table ingress chain: %v", err)
  302. }
  303. }
  304. if !iptable.Exists(iptables.Nat, ingressChain, "-j", "RETURN") {
  305. if err := iptable.RawCombinedOutput("-t", "nat", "-A", ingressChain, "-j", "RETURN"); err != nil {
  306. return fmt.Errorf("failed to add return rule in nat table ingress chain: %v", err)
  307. }
  308. }
  309. if !iptable.Exists(iptables.Filter, ingressChain, "-j", "RETURN") {
  310. if err := iptable.RawCombinedOutput("-A", ingressChain, "-j", "RETURN"); err != nil {
  311. return fmt.Errorf("failed to add return rule to filter table ingress chain: %v", err)
  312. }
  313. }
  314. for _, chain := range []string{"OUTPUT", "PREROUTING"} {
  315. if !iptable.Exists(iptables.Nat, chain, "-m", "addrtype", "--dst-type", "LOCAL", "-j", ingressChain) {
  316. if err := iptable.RawCombinedOutput("-t", "nat", "-I", chain, "-m", "addrtype", "--dst-type", "LOCAL", "-j", ingressChain); err != nil {
  317. return fmt.Errorf("failed to add jump rule in %s to ingress chain: %v", chain, err)
  318. }
  319. }
  320. }
  321. if !iptable.Exists(iptables.Filter, "FORWARD", "-j", ingressChain) {
  322. if err := iptable.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil {
  323. return fmt.Errorf("failed to add jump rule to %s in filter table forward chain: %v", ingressChain, err)
  324. }
  325. arrangeUserFilterRule()
  326. }
  327. oifName, err := findOIFName(gwIP)
  328. if err != nil {
  329. return fmt.Errorf("failed to find gateway bridge interface name for %s: %v", gwIP, err)
  330. }
  331. path := filepath.Join("/proc/sys/net/ipv4/conf", oifName, "route_localnet")
  332. if err := os.WriteFile(path, []byte{'1', '\n'}, 0644); err != nil { //nolint:gosec // gosec complains about perms here, which must be 0644 in this case
  333. return fmt.Errorf("could not write to %s: %v", path, err)
  334. }
  335. ruleArgs := strings.Fields(fmt.Sprintf("-m addrtype --src-type LOCAL -o %s -j MASQUERADE", oifName))
  336. if !iptable.Exists(iptables.Nat, "POSTROUTING", ruleArgs...) {
  337. if err := iptable.RawCombinedOutput(append([]string{"-t", "nat", "-I", "POSTROUTING"}, ruleArgs...)...); err != nil {
  338. return fmt.Errorf("failed to add ingress localhost POSTROUTING rule for %s: %v", oifName, err)
  339. }
  340. }
  341. }
  342. //Filter the ingress ports until port rules start to be added/deleted
  343. filteredPorts := filterPortConfigs(ingressPorts, isDelete)
  344. rollbackRules := make([][]string, 0, len(filteredPorts)*3)
  345. var portErr error
  346. defer func() {
  347. if portErr != nil && !isDelete {
  348. filterPortConfigs(filteredPorts, !isDelete)
  349. for _, rule := range rollbackRules {
  350. if err := iptable.RawCombinedOutput(rule...); err != nil {
  351. logrus.Warnf("roll back rule failed, %v: %v", rule, err)
  352. }
  353. }
  354. }
  355. }()
  356. for _, iPort := range filteredPorts {
  357. if iptable.ExistChain(ingressChain, iptables.Nat) {
  358. rule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d",
  359. addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, gwIP, iPort.PublishedPort))
  360. if portErr = iptable.RawCombinedOutput(rule...); portErr != nil {
  361. errStr := fmt.Sprintf("set up rule failed, %v: %v", rule, portErr)
  362. if !isDelete {
  363. return fmt.Errorf("%s", errStr)
  364. }
  365. logrus.Infof("%s", errStr)
  366. }
  367. rollbackRule := strings.Fields(fmt.Sprintf("-t nat %s %s -p %s --dport %d -j DNAT --to-destination %s:%d", rollbackAddDelOpt,
  368. ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, gwIP, iPort.PublishedPort))
  369. rollbackRules = append(rollbackRules, rollbackRule)
  370. }
  371. // Filter table rules to allow a published service to be accessible in the local node from..
  372. // 1) service tasks attached to other networks
  373. // 2) unmanaged containers on bridge networks
  374. rule := strings.Fields(fmt.Sprintf("%s %s -m state -p %s --sport %d --state ESTABLISHED,RELATED -j ACCEPT",
  375. addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort))
  376. if portErr = iptable.RawCombinedOutput(rule...); portErr != nil {
  377. errStr := fmt.Sprintf("set up rule failed, %v: %v", rule, portErr)
  378. if !isDelete {
  379. return fmt.Errorf("%s", errStr)
  380. }
  381. logrus.Warnf("%s", errStr)
  382. }
  383. rollbackRule := strings.Fields(fmt.Sprintf("%s %s -m state -p %s --sport %d --state ESTABLISHED,RELATED -j ACCEPT", rollbackAddDelOpt,
  384. ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort))
  385. rollbackRules = append(rollbackRules, rollbackRule)
  386. rule = strings.Fields(fmt.Sprintf("%s %s -p %s --dport %d -j ACCEPT",
  387. addDelOpt, ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort))
  388. if portErr = iptable.RawCombinedOutput(rule...); portErr != nil {
  389. errStr := fmt.Sprintf("set up rule failed, %v: %v", rule, portErr)
  390. if !isDelete {
  391. return fmt.Errorf("%s", errStr)
  392. }
  393. logrus.Warnf("%s", errStr)
  394. }
  395. rollbackRule = strings.Fields(fmt.Sprintf("%s %s -p %s --dport %d -j ACCEPT", rollbackAddDelOpt,
  396. ingressChain, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort))
  397. rollbackRules = append(rollbackRules, rollbackRule)
  398. if err := plumbProxy(iPort, isDelete); err != nil {
  399. logrus.Warnf("failed to create proxy for port %d: %v", iPort.PublishedPort, err)
  400. }
  401. }
  402. return nil
  403. }
  404. // In the filter table FORWARD chain the first rule should be to jump to
  405. // DOCKER-USER so the user is able to filter packet first.
  406. // The second rule should be jump to INGRESS-CHAIN.
  407. // This chain has the rules to allow access to the published ports for swarm tasks
  408. // from local bridge networks and docker_gwbridge (ie:taks on other swarm networks)
  409. func arrangeIngressFilterRule() {
  410. // TODO IPv6 support
  411. iptable := iptables.GetIptable(iptables.IPv4)
  412. if iptable.ExistChain(ingressChain, iptables.Filter) {
  413. if iptable.Exists(iptables.Filter, "FORWARD", "-j", ingressChain) {
  414. if err := iptable.RawCombinedOutput("-D", "FORWARD", "-j", ingressChain); err != nil {
  415. logrus.Warnf("failed to delete jump rule to ingressChain in filter table: %v", err)
  416. }
  417. }
  418. if err := iptable.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil {
  419. logrus.Warnf("failed to add jump rule to ingressChain in filter table: %v", err)
  420. }
  421. }
  422. }
  423. func findOIFName(ip net.IP) (string, error) {
  424. nlh := ns.NlHandle()
  425. routes, err := nlh.RouteGet(ip)
  426. if err != nil {
  427. return "", err
  428. }
  429. if len(routes) == 0 {
  430. return "", fmt.Errorf("no route to %s", ip)
  431. }
  432. // Pick the first route(typically there is only one route). We
  433. // don't support multipath.
  434. link, err := nlh.LinkByIndex(routes[0].LinkIndex)
  435. if err != nil {
  436. return "", err
  437. }
  438. return link.Attrs().Name, nil
  439. }
  440. func plumbProxy(iPort *PortConfig, isDelete bool) error {
  441. var (
  442. err error
  443. l io.Closer
  444. )
  445. portSpec := fmt.Sprintf("%d/%s", iPort.PublishedPort, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]))
  446. if isDelete {
  447. if listener, ok := ingressProxyTbl[portSpec]; ok {
  448. if listener != nil {
  449. listener.Close()
  450. }
  451. }
  452. return nil
  453. }
  454. switch iPort.Protocol {
  455. case ProtocolTCP:
  456. l, err = net.ListenTCP("tcp", &net.TCPAddr{Port: int(iPort.PublishedPort)})
  457. case ProtocolUDP:
  458. l, err = net.ListenUDP("udp", &net.UDPAddr{Port: int(iPort.PublishedPort)})
  459. case ProtocolSCTP:
  460. l, err = sctp.ListenSCTP("sctp", &sctp.SCTPAddr{Port: int(iPort.PublishedPort)})
  461. default:
  462. err = fmt.Errorf("unknown protocol %v", iPort.Protocol)
  463. }
  464. if err != nil {
  465. return err
  466. }
  467. ingressProxyTbl[portSpec] = l
  468. return nil
  469. }
  470. func writePortsToFile(ports []*PortConfig) (string, error) {
  471. f, err := os.CreateTemp("", "port_configs")
  472. if err != nil {
  473. return "", err
  474. }
  475. defer f.Close() //nolint:gosec
  476. buf, _ := proto.Marshal(&EndpointRecord{
  477. IngressPorts: ports,
  478. })
  479. n, err := f.Write(buf)
  480. if err != nil {
  481. return "", err
  482. }
  483. if n < len(buf) {
  484. return "", io.ErrShortWrite
  485. }
  486. return f.Name(), nil
  487. }
  488. func readPortsFromFile(fileName string) ([]*PortConfig, error) {
  489. buf, err := os.ReadFile(fileName)
  490. if err != nil {
  491. return nil, err
  492. }
  493. var epRec EndpointRecord
  494. err = proto.Unmarshal(buf, &epRec)
  495. if err != nil {
  496. return nil, err
  497. }
  498. return epRec.IngressPorts, nil
  499. }
  500. // Invoke fwmarker reexec routine to mark vip destined packets with
  501. // the passed firewall mark.
  502. func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool, lbMode string) error {
  503. var ingressPortsFile string
  504. if len(ingressPorts) != 0 {
  505. var err error
  506. ingressPortsFile, err = writePortsToFile(ingressPorts)
  507. if err != nil {
  508. return err
  509. }
  510. defer os.Remove(ingressPortsFile)
  511. }
  512. addDelOpt := "-A"
  513. if isDelete {
  514. addDelOpt = "-D"
  515. }
  516. cmd := &exec.Cmd{
  517. Path: reexec.Self(),
  518. Args: append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.String(), lbMode),
  519. Stdout: os.Stdout,
  520. Stderr: os.Stderr,
  521. }
  522. if err := cmd.Run(); err != nil {
  523. return fmt.Errorf("reexec failed: %v", err)
  524. }
  525. return nil
  526. }
  527. // Firewall marker reexec function.
  528. func fwMarker() {
  529. // TODO IPv6 support
  530. iptable := iptables.GetIptable(iptables.IPv4)
  531. runtime.LockOSThread()
  532. defer runtime.UnlockOSThread()
  533. if len(os.Args) < 8 {
  534. logrus.Error("invalid number of arguments..")
  535. os.Exit(1)
  536. }
  537. var ingressPorts []*PortConfig
  538. if os.Args[5] != "" {
  539. var err error
  540. ingressPorts, err = readPortsFromFile(os.Args[5])
  541. if err != nil {
  542. logrus.Errorf("Failed reading ingress ports file: %v", err)
  543. os.Exit(2)
  544. }
  545. }
  546. vip := os.Args[2]
  547. fwMark, err := strconv.ParseUint(os.Args[3], 10, 32)
  548. if err != nil {
  549. logrus.Errorf("bad fwmark value(%s) passed: %v", os.Args[3], err)
  550. os.Exit(3)
  551. }
  552. addDelOpt := os.Args[4]
  553. rules := [][]string{}
  554. for _, iPort := range ingressPorts {
  555. rule := strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
  556. addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, fwMark))
  557. rules = append(rules, rule)
  558. }
  559. ns, err := netns.GetFromPath(os.Args[1])
  560. if err != nil {
  561. logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
  562. os.Exit(4)
  563. }
  564. defer ns.Close()
  565. if err := netns.Set(ns); err != nil {
  566. logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
  567. os.Exit(5)
  568. }
  569. lbMode := os.Args[7]
  570. if addDelOpt == "-A" && lbMode == loadBalancerModeNAT {
  571. eIP, subnet, err := net.ParseCIDR(os.Args[6])
  572. if err != nil {
  573. logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[6], err)
  574. os.Exit(6)
  575. }
  576. ruleParams := strings.Fields(fmt.Sprintf("-m ipvs --ipvs -d %s -j SNAT --to-source %s", subnet, eIP))
  577. if !iptable.Exists("nat", "POSTROUTING", ruleParams...) {
  578. rule := append(strings.Fields("-t nat -A POSTROUTING"), ruleParams...)
  579. rules = append(rules, rule)
  580. err := os.WriteFile("/proc/sys/net/ipv4/vs/conntrack", []byte{'1', '\n'}, 0644)
  581. if err != nil {
  582. logrus.Errorf("Failed to write to /proc/sys/net/ipv4/vs/conntrack: %v", err)
  583. os.Exit(7)
  584. }
  585. }
  586. }
  587. rule := strings.Fields(fmt.Sprintf("-t mangle %s INPUT -d %s/32 -j MARK --set-mark %d", addDelOpt, vip, fwMark))
  588. rules = append(rules, rule)
  589. for _, rule := range rules {
  590. if err := iptable.RawCombinedOutputNative(rule...); err != nil {
  591. logrus.Errorf("set up rule failed, %v: %v", rule, err)
  592. os.Exit(8)
  593. }
  594. }
  595. }
  596. func addRedirectRules(path string, eIP *net.IPNet, ingressPorts []*PortConfig) error {
  597. var ingressPortsFile string
  598. if len(ingressPorts) != 0 {
  599. var err error
  600. ingressPortsFile, err = writePortsToFile(ingressPorts)
  601. if err != nil {
  602. return err
  603. }
  604. defer os.Remove(ingressPortsFile)
  605. }
  606. cmd := &exec.Cmd{
  607. Path: reexec.Self(),
  608. Args: append([]string{"redirector"}, path, eIP.String(), ingressPortsFile),
  609. Stdout: os.Stdout,
  610. Stderr: os.Stderr,
  611. }
  612. if err := cmd.Run(); err != nil {
  613. return fmt.Errorf("reexec failed: %v", err)
  614. }
  615. return nil
  616. }
  617. // Redirector reexec function.
  618. func redirector() {
  619. // TODO IPv6 support
  620. iptable := iptables.GetIptable(iptables.IPv4)
  621. runtime.LockOSThread()
  622. defer runtime.UnlockOSThread()
  623. if len(os.Args) < 4 {
  624. logrus.Error("invalid number of arguments..")
  625. os.Exit(1)
  626. }
  627. var ingressPorts []*PortConfig
  628. if os.Args[3] != "" {
  629. var err error
  630. ingressPorts, err = readPortsFromFile(os.Args[3])
  631. if err != nil {
  632. logrus.Errorf("Failed reading ingress ports file: %v", err)
  633. os.Exit(2)
  634. }
  635. }
  636. eIP, _, err := net.ParseCIDR(os.Args[2])
  637. if err != nil {
  638. logrus.Errorf("Failed to parse endpoint IP %s: %v", os.Args[2], err)
  639. os.Exit(3)
  640. }
  641. rules := [][]string{}
  642. for _, iPort := range ingressPorts {
  643. rule := strings.Fields(fmt.Sprintf("-t nat -A PREROUTING -d %s -p %s --dport %d -j REDIRECT --to-port %d",
  644. eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.PublishedPort, iPort.TargetPort))
  645. rules = append(rules, rule)
  646. // Allow only incoming connections to exposed ports
  647. iRule := strings.Fields(fmt.Sprintf("-I INPUT -d %s -p %s --dport %d -m conntrack --ctstate NEW,ESTABLISHED -j ACCEPT",
  648. eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.TargetPort))
  649. rules = append(rules, iRule)
  650. // Allow only outgoing connections from exposed ports
  651. oRule := strings.Fields(fmt.Sprintf("-I OUTPUT -s %s -p %s --sport %d -m conntrack --ctstate ESTABLISHED -j ACCEPT",
  652. eIP.String(), strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.TargetPort))
  653. rules = append(rules, oRule)
  654. }
  655. ns, err := netns.GetFromPath(os.Args[1])
  656. if err != nil {
  657. logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
  658. os.Exit(4)
  659. }
  660. defer ns.Close()
  661. if err := netns.Set(ns); err != nil {
  662. logrus.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
  663. os.Exit(5)
  664. }
  665. for _, rule := range rules {
  666. if err := iptable.RawCombinedOutputNative(rule...); err != nil {
  667. logrus.Errorf("set up rule failed, %v: %v", rule, err)
  668. os.Exit(6)
  669. }
  670. }
  671. if len(ingressPorts) == 0 {
  672. return
  673. }
  674. // Ensure blocking rules for anything else in/to ingress network
  675. for _, rule := range [][]string{
  676. {"-d", eIP.String(), "-p", "sctp", "-j", "DROP"},
  677. {"-d", eIP.String(), "-p", "udp", "-j", "DROP"},
  678. {"-d", eIP.String(), "-p", "tcp", "-j", "DROP"},
  679. } {
  680. if !iptable.ExistsNative(iptables.Filter, "INPUT", rule...) {
  681. if err := iptable.RawCombinedOutputNative(append([]string{"-A", "INPUT"}, rule...)...); err != nil {
  682. logrus.Errorf("set up rule failed, %v: %v", rule, err)
  683. os.Exit(7)
  684. }
  685. }
  686. rule[0] = "-s"
  687. if !iptable.ExistsNative(iptables.Filter, "OUTPUT", rule...) {
  688. if err := iptable.RawCombinedOutputNative(append([]string{"-A", "OUTPUT"}, rule...)...); err != nil {
  689. logrus.Errorf("set up rule failed, %v: %v", rule, err)
  690. os.Exit(8)
  691. }
  692. }
  693. }
  694. }