123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- // +build linux
- package ipvs
- import (
- "bytes"
- "encoding/binary"
- "fmt"
- "net"
- "os/exec"
- "strings"
- "sync"
- "sync/atomic"
- "syscall"
- "unsafe"
- "github.com/sirupsen/logrus"
- "github.com/vishvananda/netlink/nl"
- "github.com/vishvananda/netns"
- )
// For quick reference, the format of IPVS-related netlink messages is described at the end of this file.
- var (
- native = nl.NativeEndian()
- ipvsFamily int
- ipvsOnce sync.Once
- )
// genlMsgHdr is the 4-byte generic netlink header (command, version,
// reserved) that precedes the attributes of every IPVS message.
type genlMsgHdr struct {
	cmd      uint8
	version  uint8
	reserved uint16
}

// ipvsFlags is the flags/mask pair carried by the service flags attribute.
type ipvsFlags struct {
	flags uint32
	mask  uint32
}

// deserializeGenlMsg reinterprets the leading bytes of b as a genlMsgHdr.
// No copy is made: the returned header points into b's backing array.
// The slice expression is bounds-checked against cap(b), not len(b), so
// callers should make sure b holds at least a full header.
func deserializeGenlMsg(b []byte) (hdr *genlMsgHdr) {
	head := b[0:unsafe.Sizeof(*hdr)]
	return (*genlMsgHdr)(unsafe.Pointer(&head[0]))
}

// Serialize returns the header's raw in-memory bytes. The slice aliases hdr
// rather than copying it.
func (hdr *genlMsgHdr) Serialize() []byte {
	const size = unsafe.Sizeof(genlMsgHdr{})
	raw := (*[size]byte)(unsafe.Pointer(hdr))
	return raw[:]
}

// Len reports the encoded size of a genlMsgHdr in bytes.
func (hdr *genlMsgHdr) Len() int {
	const size = unsafe.Sizeof(genlMsgHdr{})
	return int(size)
}

// Serialize returns the raw in-memory bytes of f (host byte order). The slice
// aliases f rather than copying it.
func (f *ipvsFlags) Serialize() []byte {
	const size = unsafe.Sizeof(ipvsFlags{})
	raw := (*[size]byte)(unsafe.Pointer(f))
	return raw[:]
}

// Len reports the encoded size of an ipvsFlags in bytes.
func (f *ipvsFlags) Len() int {
	return int(unsafe.Sizeof(ipvsFlags{}))
}
- func setup() {
- ipvsOnce.Do(func() {
- var err error
- if out, err := exec.Command("modprobe", "-va", "ip_vs").CombinedOutput(); err != nil {
- logrus.Warnf("Running modprobe ip_vs failed with message: `%s`, error: %v", strings.TrimSpace(string(out)), err)
- }
- ipvsFamily, err = getIPVSFamily()
- if err != nil {
- logrus.Error("Could not get ipvs family information from the kernel. It is possible that ipvs is not enabled in your kernel. Native loadbalancing will not work until this is fixed.")
- }
- })
- }
- func fillService(s *Service) nl.NetlinkRequestData {
- cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
- if s.FWMark != 0 {
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
- } else {
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))
- // Port needs to be in network byte order.
- portBuf := new(bytes.Buffer)
- binary.Write(portBuf, binary.BigEndian, s.Port)
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
- }
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
- if s.PEName != "" {
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
- }
- f := &ipvsFlags{
- flags: s.Flags,
- mask: 0xFFFFFFFF,
- }
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
- nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
- return cmdAttr
- }
- func fillDestinaton(d *Destination) nl.NetlinkRequestData {
- cmdAttr := nl.NewRtAttr(ipvsCmdAttrDest, nil)
- nl.NewRtAttrChild(cmdAttr, ipvsDestAttrAddress, rawIPData(d.Address))
- // Port needs to be in network byte order.
- portBuf := new(bytes.Buffer)
- binary.Write(portBuf, binary.BigEndian, d.Port)
- nl.NewRtAttrChild(cmdAttr, ipvsDestAttrPort, portBuf.Bytes())
- nl.NewRtAttrChild(cmdAttr, ipvsDestAttrForwardingMethod, nl.Uint32Attr(d.ConnectionFlags&ConnectionFlagFwdMask))
- nl.NewRtAttrChild(cmdAttr, ipvsDestAttrWeight, nl.Uint32Attr(uint32(d.Weight)))
- nl.NewRtAttrChild(cmdAttr, ipvsDestAttrUpperThreshold, nl.Uint32Attr(d.UpperThreshold))
- nl.NewRtAttrChild(cmdAttr, ipvsDestAttrLowerThreshold, nl.Uint32Attr(d.LowerThreshold))
- return cmdAttr
- }
- func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
- req := newIPVSRequest(cmd)
- req.Seq = atomic.AddUint32(&i.seq, 1)
- if s == nil {
- req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages
- req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
- } else {
- req.AddData(fillService(s))
- }
- if d == nil {
- if cmd == ipvsCmdGetDest {
- req.Flags |= syscall.NLM_F_DUMP
- }
- } else {
- req.AddData(fillDestinaton(d))
- }
- res, err := execute(i.sock, req, 0)
- if err != nil {
- return [][]byte{}, err
- }
- return res, nil
- }
- func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error {
- _, err := i.doCmdwithResponse(s, d, cmd)
- return err
- }
- func getIPVSFamily() (int, error) {
- sock, err := nl.GetNetlinkSocketAt(netns.None(), netns.None(), syscall.NETLINK_GENERIC)
- if err != nil {
- return 0, err
- }
- defer sock.Close()
- req := newGenlRequest(genlCtrlID, genlCtrlCmdGetFamily)
- req.AddData(nl.NewRtAttr(genlCtrlAttrFamilyName, nl.ZeroTerminated("IPVS")))
- msgs, err := execute(sock, req, 0)
- if err != nil {
- return 0, err
- }
- for _, m := range msgs {
- hdr := deserializeGenlMsg(m)
- attrs, err := nl.ParseRouteAttr(m[hdr.Len():])
- if err != nil {
- return 0, err
- }
- for _, attr := range attrs {
- switch int(attr.Attr.Type) {
- case genlCtrlAttrFamilyID:
- return int(native.Uint16(attr.Value[0:2])), nil
- }
- }
- }
- return 0, fmt.Errorf("no family id in the netlink response")
- }
- func rawIPData(ip net.IP) []byte {
- family := nl.GetIPFamily(ip)
- if family == nl.FAMILY_V4 {
- return ip.To4()
- }
- return ip
- }
- func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
- return newGenlRequest(ipvsFamily, cmd)
- }
- func newGenlRequest(familyID int, cmd uint8) *nl.NetlinkRequest {
- req := nl.NewNetlinkRequest(familyID, syscall.NLM_F_ACK)
- req.AddData(&genlMsgHdr{cmd: cmd, version: 1})
- return req
- }
- func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
- if err := s.Send(req); err != nil {
- return nil, err
- }
- pid, err := s.GetPid()
- if err != nil {
- return nil, err
- }
- var res [][]byte
- done:
- for {
- msgs, err := s.Receive()
- if err != nil {
- if s.GetFd() == -1 {
- return nil, fmt.Errorf("Socket got closed on receive")
- }
- if err == syscall.EAGAIN {
- // timeout fired
- continue
- }
- return nil, err
- }
- for _, m := range msgs {
- if m.Header.Seq != req.Seq {
- continue
- }
- if m.Header.Pid != pid {
- return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
- }
- if m.Header.Type == syscall.NLMSG_DONE {
- break done
- }
- if m.Header.Type == syscall.NLMSG_ERROR {
- error := int32(native.Uint32(m.Data[0:4]))
- if error == 0 {
- break done
- }
- return nil, syscall.Errno(-error)
- }
- if resType != 0 && m.Header.Type != resType {
- continue
- }
- res = append(res, m.Data)
- if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
- break done
- }
- }
- }
- return res, nil
- }
// parseIP converts a raw address attribute into a net.IP of the given address
// family, using the first 4 bytes for AF_INET and the first 16 for AF_INET6.
// The result is a copy, so it does not keep the caller's buffer alive or
// change if that buffer is reused.
//
// An error is returned for an unsupported family, or when ip is too short to
// hold an address of that family. Previously the slice was bounds-checked only
// against its capacity, so short input either panicked or silently read stale
// bytes past len(ip).
func parseIP(ip []byte, family uint16) (net.IP, error) {
	var size int
	switch family {
	case syscall.AF_INET:
		size = net.IPv4len
	case syscall.AF_INET6:
		size = net.IPv6len
	default:
		return nil, fmt.Errorf("parseIP Error ip=%v", ip)
	}
	if len(ip) < size {
		return nil, fmt.Errorf("parseIP Error ip=%v: %d bytes is too short for address family %d", ip, len(ip), family)
	}
	return append(net.IP(nil), ip[:size]...), nil
}
- // parseStats
- func assembleStats(msg []byte) (SvcStats, error) {
- var s SvcStats
- attrs, err := nl.ParseRouteAttr(msg)
- if err != nil {
- return s, err
- }
- for _, attr := range attrs {
- attrType := int(attr.Attr.Type)
- switch attrType {
- case ipvsSvcStatsConns:
- s.Connections = native.Uint32(attr.Value)
- case ipvsSvcStatsPktsIn:
- s.PacketsIn = native.Uint32(attr.Value)
- case ipvsSvcStatsPktsOut:
- s.PacketsOut = native.Uint32(attr.Value)
- case ipvsSvcStatsBytesIn:
- s.BytesIn = native.Uint64(attr.Value)
- case ipvsSvcStatsBytesOut:
- s.BytesOut = native.Uint64(attr.Value)
- case ipvsSvcStatsCPS:
- s.CPS = native.Uint32(attr.Value)
- case ipvsSvcStatsPPSIn:
- s.PPSIn = native.Uint32(attr.Value)
- case ipvsSvcStatsPPSOut:
- s.PPSOut = native.Uint32(attr.Value)
- case ipvsSvcStatsBPSIn:
- s.BPSIn = native.Uint32(attr.Value)
- case ipvsSvcStatsBPSOut:
- s.BPSOut = native.Uint32(attr.Value)
- }
- }
- return s, nil
- }
- // assembleService assembles a services back from a hain of netlink attributes
- func assembleService(attrs []syscall.NetlinkRouteAttr) (*Service, error) {
- var s Service
- for _, attr := range attrs {
- attrType := int(attr.Attr.Type)
- switch attrType {
- case ipvsSvcAttrAddressFamily:
- s.AddressFamily = native.Uint16(attr.Value)
- case ipvsSvcAttrProtocol:
- s.Protocol = native.Uint16(attr.Value)
- case ipvsSvcAttrAddress:
- ip, err := parseIP(attr.Value, s.AddressFamily)
- if err != nil {
- return nil, err
- }
- s.Address = ip
- case ipvsSvcAttrPort:
- s.Port = binary.BigEndian.Uint16(attr.Value)
- case ipvsSvcAttrFWMark:
- s.FWMark = native.Uint32(attr.Value)
- case ipvsSvcAttrSchedName:
- s.SchedName = nl.BytesToString(attr.Value)
- case ipvsSvcAttrFlags:
- s.Flags = native.Uint32(attr.Value)
- case ipvsSvcAttrTimeout:
- s.Timeout = native.Uint32(attr.Value)
- case ipvsSvcAttrNetmask:
- s.Netmask = native.Uint32(attr.Value)
- case ipvsSvcAttrStats:
- stats, err := assembleStats(attr.Value)
- if err != nil {
- return nil, err
- }
- s.Stats = stats
- }
- }
- return &s, nil
- }
- // parseService given a ipvs netlink response this function will respond with a valid service entry, an error otherwise
- func (i *Handle) parseService(msg []byte) (*Service, error) {
- var s *Service
- //Remove General header for this message and parse the NetLink message
- hdr := deserializeGenlMsg(msg)
- NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():])
- if err != nil {
- return nil, err
- }
- if len(NetLinkAttrs) == 0 {
- return nil, fmt.Errorf("error no valid netlink message found while parsing service record")
- }
- //Now Parse and get IPVS related attributes messages packed in this message.
- ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value)
- if err != nil {
- return nil, err
- }
- //Assemble all the IPVS related attribute messages and create a service record
- s, err = assembleService(ipvsAttrs)
- if err != nil {
- return nil, err
- }
- return s, nil
- }
- // doGetServicesCmd a wrapper which could be used commonly for both GetServices() and GetService(*Service)
- func (i *Handle) doGetServicesCmd(svc *Service) ([]*Service, error) {
- var res []*Service
- msgs, err := i.doCmdwithResponse(svc, nil, ipvsCmdGetService)
- if err != nil {
- return nil, err
- }
- for _, msg := range msgs {
- srv, err := i.parseService(msg)
- if err != nil {
- return nil, err
- }
- res = append(res, srv)
- }
- return res, nil
- }
- // doCmdWithoutAttr a simple wrapper of netlink socket execute command
- func (i *Handle) doCmdWithoutAttr(cmd uint8) ([][]byte, error) {
- req := newIPVSRequest(cmd)
- req.Seq = atomic.AddUint32(&i.seq, 1)
- return execute(i.sock, req, 0)
- }
- func assembleDestination(attrs []syscall.NetlinkRouteAttr) (*Destination, error) {
- var d Destination
- for _, attr := range attrs {
- attrType := int(attr.Attr.Type)
- switch attrType {
- case ipvsDestAttrAddress:
- ip, err := parseIP(attr.Value, syscall.AF_INET)
- if err != nil {
- return nil, err
- }
- d.Address = ip
- case ipvsDestAttrPort:
- d.Port = binary.BigEndian.Uint16(attr.Value)
- case ipvsDestAttrForwardingMethod:
- d.ConnectionFlags = native.Uint32(attr.Value)
- case ipvsDestAttrWeight:
- d.Weight = int(native.Uint16(attr.Value))
- case ipvsDestAttrUpperThreshold:
- d.UpperThreshold = native.Uint32(attr.Value)
- case ipvsDestAttrLowerThreshold:
- d.LowerThreshold = native.Uint32(attr.Value)
- case ipvsDestAttrAddressFamily:
- d.AddressFamily = native.Uint16(attr.Value)
- }
- }
- return &d, nil
- }
- // parseDestination given a ipvs netlink response this function will respond with a valid destination entry, an error otherwise
- func (i *Handle) parseDestination(msg []byte) (*Destination, error) {
- var dst *Destination
- //Remove General header for this message
- hdr := deserializeGenlMsg(msg)
- NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():])
- if err != nil {
- return nil, err
- }
- if len(NetLinkAttrs) == 0 {
- return nil, fmt.Errorf("error no valid netlink message found while parsing destination record")
- }
- //Now Parse and get IPVS related attributes messages packed in this message.
- ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value)
- if err != nil {
- return nil, err
- }
- //Assemble netlink attributes and create a Destination record
- dst, err = assembleDestination(ipvsAttrs)
- if err != nil {
- return nil, err
- }
- return dst, nil
- }
- // doGetDestinationsCmd a wrapper function to be used by GetDestinations and GetDestination(d) apis
- func (i *Handle) doGetDestinationsCmd(s *Service, d *Destination) ([]*Destination, error) {
- var res []*Destination
- msgs, err := i.doCmdwithResponse(s, d, ipvsCmdGetDest)
- if err != nil {
- return nil, err
- }
- for _, msg := range msgs {
- dest, err := i.parseDestination(msg)
- if err != nil {
- return res, err
- }
- res = append(res, dest)
- }
- return res, nil
- }
// IPVS related netlink message format explained

/* EACH NETLINK MSG has the format below; this is what execute() returns for
each reply. When there are multiple netlink objects to process (e.g. for
GetServices()), execute() returns a slice of such messages.

            NETLINK MSG
|-----------------------------------|
    0        1        2        3
|--------|--------|--------|--------| -
| CMD ID |  VER   |    RESERVED     | |==> General Message Header represented by genlMsgHdr
|-----------------------------------| -
|    ATTR LEN     |    ATTR TYPE    | |
|-----------------------------------| |
|                                   | |
|              VALUE                | |
|     []byte Array of IPVS MSG      | |==> Attribute Message represented by syscall.NetlinkRouteAttr
|    PADDED TO A 4-BYTE BOUNDARY    | |
|                                   | |
|-----------------------------------| -

Once we strip genlMsgHdr from the NETLINK MSG above, we parse the VALUE.
VALUE holds an array of netlink attributes (syscall.NetlinkRouteAttr), each of
which represents one field of a "Service" or "Destination" object. Assembling
these attributes reconstructs the Service or Destination.

            IPVS MSG
|-----------------------------------|
    0        1        2        3
|--------|--------|--------|--------|
|    ATTR LEN     |    ATTR TYPE    |
|-----------------------------------|
|                                   |
|       []byte IPVS ATTRIBUTE       |
|    PADDED TO A 4-BYTE BOUNDARY    |
|                                   |
|-----------------------------------|
          NEXT ATTRIBUTE
|-----------------------------------|
|    ATTR LEN     |    ATTR TYPE    |
|-----------------------------------|
|                                   |
|       []byte IPVS ATTRIBUTE       |
|    PADDED TO A 4-BYTE BOUNDARY    |
|                                   |
|-----------------------------------|
          NEXT ATTRIBUTE
|-----------------------------------|
|    ATTR LEN     |    ATTR TYPE    |
|-----------------------------------|
|                                   |
|       []byte IPVS ATTRIBUTE       |
|    PADDED TO A 4-BYTE BOUNDARY    |
|                                   |
|-----------------------------------|
*/
|