netlink.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. // +build linux
  2. package ipvs
  3. import (
  4. "bytes"
  5. "encoding/binary"
  6. "fmt"
  7. "net"
  8. "os/exec"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "syscall"
  13. "unsafe"
  14. "github.com/sirupsen/logrus"
  15. "github.com/vishvananda/netlink/nl"
  16. "github.com/vishvananda/netns"
  17. )
  // For quick reference, the format of IPVS-related netlink messages is described at the end of this file.
  19. var (
  20. native = nl.NativeEndian()
  21. ipvsFamily int
  22. ipvsOnce sync.Once
  23. )
  // genlMsgHdr is the 4-byte header that newGenlRequest places in front of the
  // attributes of every request. Serialize exposes the struct's memory as-is,
  // so the field order and widths below are the wire layout and must not change.
  type genlMsgHdr struct {
  	cmd, version uint8  // command ID, then header version (newGenlRequest sets 1)
  	reserved     uint16 // never assigned in this file; stays zero
  }
  // ipvsFlags is the payload of the service flags attribute. Serialize exposes
  // the struct's memory as-is, so the value is laid out as flags followed by
  // mask, four bytes each.
  type ipvsFlags struct {
  	flags, mask uint32 // flag bits, then the mask (fillService sends 0xFFFFFFFF)
  }
  33. func deserializeGenlMsg(b []byte) (hdr *genlMsgHdr) {
  34. return (*genlMsgHdr)(unsafe.Pointer(&b[0:unsafe.Sizeof(*hdr)][0]))
  35. }
  36. func (hdr *genlMsgHdr) Serialize() []byte {
  37. return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
  38. }
  39. func (hdr *genlMsgHdr) Len() int {
  40. return int(unsafe.Sizeof(*hdr))
  41. }
  42. func (f *ipvsFlags) Serialize() []byte {
  43. return (*(*[unsafe.Sizeof(*f)]byte)(unsafe.Pointer(f)))[:]
  44. }
  45. func (f *ipvsFlags) Len() int {
  46. return int(unsafe.Sizeof(*f))
  47. }
  48. func setup() {
  49. ipvsOnce.Do(func() {
  50. var err error
  51. if out, err := exec.Command("modprobe", "-va", "ip_vs").CombinedOutput(); err != nil {
  52. logrus.Warnf("Running modprobe ip_vs failed with message: `%s`, error: %v", strings.TrimSpace(string(out)), err)
  53. }
  54. ipvsFamily, err = getIPVSFamily()
  55. if err != nil {
  56. logrus.Error("Could not get ipvs family information from the kernel. It is possible that ipvs is not enabled in your kernel. Native loadbalancing will not work until this is fixed.")
  57. }
  58. })
  59. }
  60. func fillService(s *Service) nl.NetlinkRequestData {
  61. cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
  62. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
  63. if s.FWMark != 0 {
  64. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
  65. } else {
  66. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
  67. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))
  68. // Port needs to be in network byte order.
  69. portBuf := new(bytes.Buffer)
  70. binary.Write(portBuf, binary.BigEndian, s.Port)
  71. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
  72. }
  73. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
  74. if s.PEName != "" {
  75. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
  76. }
  77. f := &ipvsFlags{
  78. flags: s.Flags,
  79. mask: 0xFFFFFFFF,
  80. }
  81. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
  82. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
  83. nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
  84. return cmdAttr
  85. }
  86. func fillDestinaton(d *Destination) nl.NetlinkRequestData {
  87. cmdAttr := nl.NewRtAttr(ipvsCmdAttrDest, nil)
  88. nl.NewRtAttrChild(cmdAttr, ipvsDestAttrAddress, rawIPData(d.Address))
  89. // Port needs to be in network byte order.
  90. portBuf := new(bytes.Buffer)
  91. binary.Write(portBuf, binary.BigEndian, d.Port)
  92. nl.NewRtAttrChild(cmdAttr, ipvsDestAttrPort, portBuf.Bytes())
  93. nl.NewRtAttrChild(cmdAttr, ipvsDestAttrForwardingMethod, nl.Uint32Attr(d.ConnectionFlags&ConnectionFlagFwdMask))
  94. nl.NewRtAttrChild(cmdAttr, ipvsDestAttrWeight, nl.Uint32Attr(uint32(d.Weight)))
  95. nl.NewRtAttrChild(cmdAttr, ipvsDestAttrUpperThreshold, nl.Uint32Attr(d.UpperThreshold))
  96. nl.NewRtAttrChild(cmdAttr, ipvsDestAttrLowerThreshold, nl.Uint32Attr(d.LowerThreshold))
  97. return cmdAttr
  98. }
  99. func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
  100. req := newIPVSRequest(cmd)
  101. req.Seq = atomic.AddUint32(&i.seq, 1)
  102. if s == nil {
  103. req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages
  104. req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
  105. } else {
  106. req.AddData(fillService(s))
  107. }
  108. if d == nil {
  109. if cmd == ipvsCmdGetDest {
  110. req.Flags |= syscall.NLM_F_DUMP
  111. }
  112. } else {
  113. req.AddData(fillDestinaton(d))
  114. }
  115. res, err := execute(i.sock, req, 0)
  116. if err != nil {
  117. return [][]byte{}, err
  118. }
  119. return res, nil
  120. }
  121. func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error {
  122. _, err := i.doCmdwithResponse(s, d, cmd)
  123. return err
  124. }
  125. func getIPVSFamily() (int, error) {
  126. sock, err := nl.GetNetlinkSocketAt(netns.None(), netns.None(), syscall.NETLINK_GENERIC)
  127. if err != nil {
  128. return 0, err
  129. }
  130. defer sock.Close()
  131. req := newGenlRequest(genlCtrlID, genlCtrlCmdGetFamily)
  132. req.AddData(nl.NewRtAttr(genlCtrlAttrFamilyName, nl.ZeroTerminated("IPVS")))
  133. msgs, err := execute(sock, req, 0)
  134. if err != nil {
  135. return 0, err
  136. }
  137. for _, m := range msgs {
  138. hdr := deserializeGenlMsg(m)
  139. attrs, err := nl.ParseRouteAttr(m[hdr.Len():])
  140. if err != nil {
  141. return 0, err
  142. }
  143. for _, attr := range attrs {
  144. switch int(attr.Attr.Type) {
  145. case genlCtrlAttrFamilyID:
  146. return int(native.Uint16(attr.Value[0:2])), nil
  147. }
  148. }
  149. }
  150. return 0, fmt.Errorf("no family id in the netlink response")
  151. }
  152. func rawIPData(ip net.IP) []byte {
  153. family := nl.GetIPFamily(ip)
  154. if family == nl.FAMILY_V4 {
  155. return ip.To4()
  156. }
  157. return ip
  158. }
  159. func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
  160. return newGenlRequest(ipvsFamily, cmd)
  161. }
  162. func newGenlRequest(familyID int, cmd uint8) *nl.NetlinkRequest {
  163. req := nl.NewNetlinkRequest(familyID, syscall.NLM_F_ACK)
  164. req.AddData(&genlMsgHdr{cmd: cmd, version: 1})
  165. return req
  166. }
  167. func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
  168. if err := s.Send(req); err != nil {
  169. return nil, err
  170. }
  171. pid, err := s.GetPid()
  172. if err != nil {
  173. return nil, err
  174. }
  175. var res [][]byte
  176. done:
  177. for {
  178. msgs, err := s.Receive()
  179. if err != nil {
  180. if s.GetFd() == -1 {
  181. return nil, fmt.Errorf("Socket got closed on receive")
  182. }
  183. if err == syscall.EAGAIN {
  184. // timeout fired
  185. continue
  186. }
  187. return nil, err
  188. }
  189. for _, m := range msgs {
  190. if m.Header.Seq != req.Seq {
  191. continue
  192. }
  193. if m.Header.Pid != pid {
  194. return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
  195. }
  196. if m.Header.Type == syscall.NLMSG_DONE {
  197. break done
  198. }
  199. if m.Header.Type == syscall.NLMSG_ERROR {
  200. error := int32(native.Uint32(m.Data[0:4]))
  201. if error == 0 {
  202. break done
  203. }
  204. return nil, syscall.Errno(-error)
  205. }
  206. if resType != 0 && m.Header.Type != resType {
  207. continue
  208. }
  209. res = append(res, m.Data)
  210. if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
  211. break done
  212. }
  213. }
  214. }
  215. return res, nil
  216. }
  // parseIP interprets the leading bytes of ip as an address of the given
  // family: the first 4 bytes for AF_INET, the first 16 for AF_INET6.
  //
  // The returned net.IP aliases ip; nothing is copied. An error is returned
  // for any other family, and when ip holds fewer bytes than the family
  // requires (instead of panicking on the slice expression).
  func parseIP(ip []byte, family uint16) (net.IP, error) {
  	var size int
  	switch family {
  	case syscall.AF_INET:
  		size = net.IPv4len
  	case syscall.AF_INET6:
  		size = net.IPv6len
  	default:
  		return nil, fmt.Errorf("parseIP Error ip=%v", ip)
  	}

  	// Reject truncated payloads up front; slicing past len(ip) would either
  	// panic or silently read bytes beyond the attribute value.
  	if len(ip) < size {
  		return nil, fmt.Errorf("parseIP Error ip=%v: need %d bytes for address family %d, got %d", ip, size, family, len(ip))
  	}
  	return net.IP(ip[:size]), nil
  }
  229. // parseStats
  230. func assembleStats(msg []byte) (SvcStats, error) {
  231. var s SvcStats
  232. attrs, err := nl.ParseRouteAttr(msg)
  233. if err != nil {
  234. return s, err
  235. }
  236. for _, attr := range attrs {
  237. attrType := int(attr.Attr.Type)
  238. switch attrType {
  239. case ipvsSvcStatsConns:
  240. s.Connections = native.Uint32(attr.Value)
  241. case ipvsSvcStatsPktsIn:
  242. s.PacketsIn = native.Uint32(attr.Value)
  243. case ipvsSvcStatsPktsOut:
  244. s.PacketsOut = native.Uint32(attr.Value)
  245. case ipvsSvcStatsBytesIn:
  246. s.BytesIn = native.Uint64(attr.Value)
  247. case ipvsSvcStatsBytesOut:
  248. s.BytesOut = native.Uint64(attr.Value)
  249. case ipvsSvcStatsCPS:
  250. s.CPS = native.Uint32(attr.Value)
  251. case ipvsSvcStatsPPSIn:
  252. s.PPSIn = native.Uint32(attr.Value)
  253. case ipvsSvcStatsPPSOut:
  254. s.PPSOut = native.Uint32(attr.Value)
  255. case ipvsSvcStatsBPSIn:
  256. s.BPSIn = native.Uint32(attr.Value)
  257. case ipvsSvcStatsBPSOut:
  258. s.BPSOut = native.Uint32(attr.Value)
  259. }
  260. }
  261. return s, nil
  262. }
  263. // assembleService assembles a services back from a hain of netlink attributes
  264. func assembleService(attrs []syscall.NetlinkRouteAttr) (*Service, error) {
  265. var s Service
  266. for _, attr := range attrs {
  267. attrType := int(attr.Attr.Type)
  268. switch attrType {
  269. case ipvsSvcAttrAddressFamily:
  270. s.AddressFamily = native.Uint16(attr.Value)
  271. case ipvsSvcAttrProtocol:
  272. s.Protocol = native.Uint16(attr.Value)
  273. case ipvsSvcAttrAddress:
  274. ip, err := parseIP(attr.Value, s.AddressFamily)
  275. if err != nil {
  276. return nil, err
  277. }
  278. s.Address = ip
  279. case ipvsSvcAttrPort:
  280. s.Port = binary.BigEndian.Uint16(attr.Value)
  281. case ipvsSvcAttrFWMark:
  282. s.FWMark = native.Uint32(attr.Value)
  283. case ipvsSvcAttrSchedName:
  284. s.SchedName = nl.BytesToString(attr.Value)
  285. case ipvsSvcAttrFlags:
  286. s.Flags = native.Uint32(attr.Value)
  287. case ipvsSvcAttrTimeout:
  288. s.Timeout = native.Uint32(attr.Value)
  289. case ipvsSvcAttrNetmask:
  290. s.Netmask = native.Uint32(attr.Value)
  291. case ipvsSvcAttrStats:
  292. stats, err := assembleStats(attr.Value)
  293. if err != nil {
  294. return nil, err
  295. }
  296. s.Stats = stats
  297. }
  298. }
  299. return &s, nil
  300. }
  301. // parseService given a ipvs netlink response this function will respond with a valid service entry, an error otherwise
  302. func (i *Handle) parseService(msg []byte) (*Service, error) {
  303. var s *Service
  304. //Remove General header for this message and parse the NetLink message
  305. hdr := deserializeGenlMsg(msg)
  306. NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():])
  307. if err != nil {
  308. return nil, err
  309. }
  310. if len(NetLinkAttrs) == 0 {
  311. return nil, fmt.Errorf("error no valid netlink message found while parsing service record")
  312. }
  313. //Now Parse and get IPVS related attributes messages packed in this message.
  314. ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value)
  315. if err != nil {
  316. return nil, err
  317. }
  318. //Assemble all the IPVS related attribute messages and create a service record
  319. s, err = assembleService(ipvsAttrs)
  320. if err != nil {
  321. return nil, err
  322. }
  323. return s, nil
  324. }
  325. // doGetServicesCmd a wrapper which could be used commonly for both GetServices() and GetService(*Service)
  326. func (i *Handle) doGetServicesCmd(svc *Service) ([]*Service, error) {
  327. var res []*Service
  328. msgs, err := i.doCmdwithResponse(svc, nil, ipvsCmdGetService)
  329. if err != nil {
  330. return nil, err
  331. }
  332. for _, msg := range msgs {
  333. srv, err := i.parseService(msg)
  334. if err != nil {
  335. return nil, err
  336. }
  337. res = append(res, srv)
  338. }
  339. return res, nil
  340. }
  341. // doCmdWithoutAttr a simple wrapper of netlink socket execute command
  342. func (i *Handle) doCmdWithoutAttr(cmd uint8) ([][]byte, error) {
  343. req := newIPVSRequest(cmd)
  344. req.Seq = atomic.AddUint32(&i.seq, 1)
  345. return execute(i.sock, req, 0)
  346. }
  347. func assembleDestination(attrs []syscall.NetlinkRouteAttr) (*Destination, error) {
  348. var d Destination
  349. for _, attr := range attrs {
  350. attrType := int(attr.Attr.Type)
  351. switch attrType {
  352. case ipvsDestAttrAddress:
  353. ip, err := parseIP(attr.Value, syscall.AF_INET)
  354. if err != nil {
  355. return nil, err
  356. }
  357. d.Address = ip
  358. case ipvsDestAttrPort:
  359. d.Port = binary.BigEndian.Uint16(attr.Value)
  360. case ipvsDestAttrForwardingMethod:
  361. d.ConnectionFlags = native.Uint32(attr.Value)
  362. case ipvsDestAttrWeight:
  363. d.Weight = int(native.Uint16(attr.Value))
  364. case ipvsDestAttrUpperThreshold:
  365. d.UpperThreshold = native.Uint32(attr.Value)
  366. case ipvsDestAttrLowerThreshold:
  367. d.LowerThreshold = native.Uint32(attr.Value)
  368. case ipvsDestAttrAddressFamily:
  369. d.AddressFamily = native.Uint16(attr.Value)
  370. }
  371. }
  372. return &d, nil
  373. }
  374. // parseDestination given a ipvs netlink response this function will respond with a valid destination entry, an error otherwise
  375. func (i *Handle) parseDestination(msg []byte) (*Destination, error) {
  376. var dst *Destination
  377. //Remove General header for this message
  378. hdr := deserializeGenlMsg(msg)
  379. NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():])
  380. if err != nil {
  381. return nil, err
  382. }
  383. if len(NetLinkAttrs) == 0 {
  384. return nil, fmt.Errorf("error no valid netlink message found while parsing destination record")
  385. }
  386. //Now Parse and get IPVS related attributes messages packed in this message.
  387. ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value)
  388. if err != nil {
  389. return nil, err
  390. }
  391. //Assemble netlink attributes and create a Destination record
  392. dst, err = assembleDestination(ipvsAttrs)
  393. if err != nil {
  394. return nil, err
  395. }
  396. return dst, nil
  397. }
  398. // doGetDestinationsCmd a wrapper function to be used by GetDestinations and GetDestination(d) apis
  399. func (i *Handle) doGetDestinationsCmd(s *Service, d *Destination) ([]*Destination, error) {
  400. var res []*Destination
  401. msgs, err := i.doCmdwithResponse(s, d, ipvsCmdGetDest)
  402. if err != nil {
  403. return nil, err
  404. }
  405. for _, msg := range msgs {
  406. dest, err := i.parseDestination(msg)
  407. if err != nil {
  408. return res, err
  409. }
  410. res = append(res, dest)
  411. }
  412. return res, nil
  413. }
  // IPVS-related netlink message format explained

  /* EACH NETLINK MSG has the format below; this is what we receive from the execute() api.
     If there are multiple netlink objects to process, as for GetServices() etc., execute() will
     supply an array of the object below.

               NETLINK MSG
  |-----------------------------------|
      0        1        2        3
  |--------|--------|--------|--------| -
  | CMD ID |  VER   |    RESERVED     | |==> General Message Header represented by genlMsgHdr
  |-----------------------------------| -
  |    ATTR LEN     |   ATTR TYPE     | |
  |-----------------------------------| |
  |                                   | |
  |              VALUE                | |
  |     []byte Array of IPVS MSG      | |==> Attribute Message represented by syscall.NetlinkRouteAttr
  |         PADDED BY 4 BYTES         | |
  |                                   | |
  |-----------------------------------| -

   Once we strip genlMsgHdr from the above NETLINK MSG, we should parse the VALUE.
   VALUE holds an array of netlink attributes (syscall.NetlinkRouteAttr) such that each attribute
   represents one field of a "Service" or "Destination" object. By assembling these attributes we can
   construct the Service or Destination.

                IPVS MSG
  |-----------------------------------|
      0        1        2        3
  |--------|--------|--------|--------|
  |    ATTR LEN     |   ATTR TYPE     |
  |-----------------------------------|
  |                                   |
  |                                   |
  | []byte IPVS ATTRIBUTE  BY 4 BYTES |
  |                                   |
  |                                   |
  |-----------------------------------|
             NEXT ATTRIBUTE
  |-----------------------------------|
  |    ATTR LEN     |   ATTR TYPE     |
  |-----------------------------------|
  |                                   |
  |                                   |
  | []byte IPVS ATTRIBUTE  BY 4 BYTES |
  |                                   |
  |                                   |
  |-----------------------------------|
             NEXT ATTRIBUTE
  |-----------------------------------|
  |    ATTR LEN     |   ATTR TYPE     |
  |-----------------------------------|
  |                                   |
  |                                   |
  | []byte IPVS ATTRIBUTE  BY 4 BYTES |
  |                                   |
  |                                   |
  |-----------------------------------|
  */