diff --git a/libnetwork/ipvs/constants.go b/libnetwork/ipvs/constants.go index 103e71a37c..d36bec0e80 100644 --- a/libnetwork/ipvs/constants.go +++ b/libnetwork/ipvs/constants.go @@ -85,6 +85,23 @@ const ( ipvsDestAttrInactiveConnections ipvsDestAttrPersistentConnections ipvsDestAttrStats + ipvsDestAttrAddressFamily +) + +// IPVS Svc Statistics constancs + +const ( + ipvsSvcStatsUnspec int = iota + ipvsSvcStatsConns + ipvsSvcStatsPktsIn + ipvsSvcStatsPktsOut + ipvsSvcStatsBytesIn + ipvsSvcStatsBytesOut + ipvsSvcStatsCPS + ipvsSvcStatsPPSIn + ipvsSvcStatsPPSOut + ipvsSvcStatsBPSIn + ipvsSvcStatsBPSOut ) // Destination forwarding methods diff --git a/libnetwork/ipvs/ipvs.go b/libnetwork/ipvs/ipvs.go index 266cc24dbe..a285e102e3 100644 --- a/libnetwork/ipvs/ipvs.go +++ b/libnetwork/ipvs/ipvs.go @@ -6,6 +6,7 @@ import ( "net" "syscall" + "fmt" "github.com/vishvananda/netlink/nl" "github.com/vishvananda/netns" ) @@ -25,6 +26,21 @@ type Service struct { Netmask uint32 AddressFamily uint16 PEName string + Stats SvcStats +} + +// SvcStats defines an IPVS service statistics +type SvcStats struct { + Connections uint32 + PacketsIn uint32 + PacketsOut uint32 + BytesIn uint64 + BytesOut uint64 + CPS uint32 + BPSOut uint32 + PPSIn uint32 + PPSOut uint32 + BPSIn uint32 } // Destination defines an IPVS destination (real server) in its @@ -117,3 +133,29 @@ func (i *Handle) UpdateDestination(s *Service, d *Destination) error { func (i *Handle) DelDestination(s *Service, d *Destination) error { return i.doCmd(s, d, ipvsCmdDelDest) } + +// GetServices returns an array of services configured on the Node +func (i *Handle) GetServices() ([]*Service, error) { + return i.doGetServicesCmd(nil) +} + +// GetDestinations returns an array of Destinations configured for this Service +func (i *Handle) GetDestinations(s *Service) ([]*Destination, error) { + return i.doGetDestinationsCmd(s, nil) +} + +// GetService gets details of a specific IPVS services, useful in updating statisics etc., +func (i *Handle) GetService(s *Service) (*Service, error) { + + res, err := i.doGetServicesCmd(s) + if err != nil { + return nil, err + } + + // We are looking for exactly one service otherwise error out + if len(res) != 1 { + return nil, fmt.Errorf("Expected only one service obtained=%d", len(res)) + } + + return res[0], nil +} diff --git a/libnetwork/ipvs/ipvs_test.go b/libnetwork/ipvs/ipvs_test.go index 62c9a8f9cf..50b5c532e9 100644 --- a/libnetwork/ipvs/ipvs_test.go +++ b/libnetwork/ipvs/ipvs_test.go @@ -3,10 +3,7 @@ package ipvs import ( - "fmt" "net" - "os/exec" - "strings" "syscall" "testing" @@ -44,75 +41,73 @@ var ( } ) -func checkDestination(t *testing.T, checkPresent bool, protocol, serviceAddress, realAddress, fwdMethod string) { - var ( - realServerStart bool - realServers []string - ) +func lookupFwMethod(fwMethod uint32) string { - out, err := exec.Command("ipvsadm", "-Ln").CombinedOutput() - require.NoError(t, err) - - for _, o := range strings.Split(string(out), "\n") { - cmpStr := serviceAddress - if protocol == "FWM" { - cmpStr = " " + cmpStr - } - - if strings.Contains(o, cmpStr) { - realServerStart = true - continue - } - - if realServerStart { - if !strings.Contains(o, "->") { - break - } - - realServers = append(realServers, o) - } - } - - for _, r := range realServers { - if strings.Contains(r, realAddress) { - parts := strings.Fields(r) - assert.Equal(t, fwdMethod, parts[2]) - return - } - } - - if checkPresent { - t.Fatalf("Did not find the destination %s fwdMethod %s in ipvs output", realAddress, fwdMethod) + switch fwMethod { + case ConnectionFlagMasq: + return fwdMethodStrings[0] + case ConnectionFlagTunnel: + return fwdMethodStrings[1] + case ConnectionFlagDirectRoute: + return fwdMethodStrings[2] } + return "" } -func checkService(t *testing.T, checkPresent bool, protocol, schedMethod, serviceAddress string) { - out, err := exec.Command("ipvsadm", "-Ln").CombinedOutput() +func checkDestination(t *testing.T, i *Handle, s *Service, d *Destination, checkPresent bool) { + var dstFound bool + + dstArray, err := i.GetDestinations(s) require.NoError(t, err) - for _, o := range strings.Split(string(out), "\n") { - cmpStr := serviceAddress - if protocol == "FWM" { - cmpStr = " " + cmpStr - } - - if strings.Contains(o, cmpStr) { - parts := strings.Split(o, " ") - assert.Equal(t, protocol, parts[0]) - assert.Equal(t, serviceAddress, parts[2]) - assert.Equal(t, schedMethod, parts[3]) - - if !checkPresent { - t.Fatalf("Did not expect the service %s in ipvs output", serviceAddress) - } - - return + for _, dst := range dstArray { + if dst.Address.Equal(d.Address) && dst.Port == d.Port && lookupFwMethod(dst.ConnectionFlags) == lookupFwMethod(d.ConnectionFlags) { + dstFound = true + break } } - if checkPresent { - t.Fatalf("Did not find the service %s in ipvs output", serviceAddress) + switch checkPresent { + case true: //The test expects the service to be present + if !dstFound { + + t.Fatalf("Did not find the service %s in ipvs output", d.Address.String()) + } + case false: //The test expects that the service should not be present + if dstFound { + t.Fatalf("Did not find the destination %s fwdMethod %s in ipvs output", d.Address.String(), lookupFwMethod(d.ConnectionFlags)) + } } + +} + +func checkService(t *testing.T, i *Handle, s *Service, checkPresent bool) { + + svcArray, err := i.GetServices() + require.NoError(t, err) + + var svcFound bool + + for _, svc := range svcArray { + + if svc.Protocol == s.Protocol && svc.Address.String() == s.Address.String() && svc.Port == s.Port { + svcFound = true + break + } + } + + switch checkPresent { + case true: //The test expects the service to be present + if !svcFound { + + t.Fatalf("Did not find the service %s in ipvs output", s.Address.String()) + } + case false: //The test expects that the service should not be present + if svcFound { + t.Fatalf("Did not expect the service %s in ipvs output", s.Address.String()) + } + } + } func TestGetFamily(t *testing.T) { @@ -137,7 +132,6 @@ func TestService(t *testing.T) { for _, protocol := range protocols { for _, schedMethod := range schedMethods { - var serviceAddress string s := Service{ AddressFamily: nl.FAMILY_V4, @@ -147,24 +141,20 @@ func TestService(t *testing.T) { switch protocol { case "FWM": s.FWMark = 1234 - serviceAddress = fmt.Sprintf("%d", 1234) case "TCP": s.Protocol = syscall.IPPROTO_TCP s.Port = 80 s.Address = net.ParseIP("1.2.3.4") s.Netmask = 0xFFFFFFFF - serviceAddress = "1.2.3.4:80" case "UDP": s.Protocol = syscall.IPPROTO_UDP s.Port = 53 s.Address = net.ParseIP("2.3.4.5") - serviceAddress = "2.3.4.5:53" } err := i.NewService(&s) assert.NoError(t, err) - checkService(t, true, protocol, schedMethod, serviceAddress) - var lastMethod string + checkService(t, i, &s, true) for _, updateSchedMethod := range schedMethods { if updateSchedMethod == schedMethod { continue @@ -173,13 +163,18 @@ func TestService(t *testing.T) { s.SchedName = updateSchedMethod err = i.UpdateService(&s) assert.NoError(t, err) - checkService(t, true, protocol, updateSchedMethod, serviceAddress) - lastMethod = updateSchedMethod + checkService(t, i, &s, true) + + scopy, err := i.GetService(&s) + assert.NoError(t, err) + assert.Equal(t, (*scopy).Address.String(), s.Address.String()) + assert.Equal(t, (*scopy).Port, s.Port) + assert.Equal(t, (*scopy).Protocol, s.Protocol) } err = i.DelService(&s) assert.NoError(t, err) - checkService(t, false, protocol, lastMethod, serviceAddress) + checkService(t, i, &s, false) } } @@ -220,7 +215,6 @@ func TestDestination(t *testing.T) { require.NoError(t, err) for _, protocol := range protocols { - var serviceAddress string s := Service{ AddressFamily: nl.FAMILY_V4, @@ -230,26 +224,23 @@ func TestDestination(t *testing.T) { switch protocol { case "FWM": s.FWMark = 1234 - serviceAddress = fmt.Sprintf("%d", 1234) case "TCP": s.Protocol = syscall.IPPROTO_TCP s.Port = 80 s.Address = net.ParseIP("1.2.3.4") s.Netmask = 0xFFFFFFFF - serviceAddress = "1.2.3.4:80" case "UDP": s.Protocol = syscall.IPPROTO_UDP s.Port = 53 s.Address = net.ParseIP("2.3.4.5") - serviceAddress = "2.3.4.5:53" } err := i.NewService(&s) assert.NoError(t, err) - checkService(t, true, protocol, RoundRobin, serviceAddress) + checkService(t, i, &s, true) s.SchedName = "" - for j, fwdMethod := range fwdMethods { + for _, fwdMethod := range fwdMethods { d1 := Destination{ AddressFamily: nl.FAMILY_V4, Address: net.ParseIP("10.1.1.2"), @@ -258,10 +249,9 @@ func TestDestination(t *testing.T) { ConnectionFlags: fwdMethod, } - realAddress := "10.1.1.2:5000" err := i.NewDestination(&s, &d1) assert.NoError(t, err) - checkDestination(t, true, protocol, serviceAddress, realAddress, fwdMethodStrings[j]) + checkDestination(t, i, &s, &d1, true) d2 := Destination{ AddressFamily: nl.FAMILY_V4, Address: net.ParseIP("10.1.1.3"), @@ -270,10 +260,9 @@ func TestDestination(t *testing.T) { ConnectionFlags: fwdMethod, } - realAddress = "10.1.1.3:5000" err = i.NewDestination(&s, &d2) assert.NoError(t, err) - checkDestination(t, true, protocol, serviceAddress, realAddress, fwdMethodStrings[j]) + checkDestination(t, i, &s, &d2, true) d3 := Destination{ AddressFamily: nl.FAMILY_V4, @@ -283,32 +272,28 @@ func TestDestination(t *testing.T) { ConnectionFlags: fwdMethod, } - realAddress = "10.1.1.4:5000" err = i.NewDestination(&s, &d3) assert.NoError(t, err) - checkDestination(t, true, protocol, serviceAddress, realAddress, fwdMethodStrings[j]) + checkDestination(t, i, &s, &d3, true) - for m, updateFwdMethod := range fwdMethods { + for _, updateFwdMethod := range fwdMethods { if updateFwdMethod == fwdMethod { continue } d1.ConnectionFlags = updateFwdMethod - realAddress = "10.1.1.2:5000" err = i.UpdateDestination(&s, &d1) assert.NoError(t, err) - checkDestination(t, true, protocol, serviceAddress, realAddress, fwdMethodStrings[m]) + checkDestination(t, i, &s, &d1, true) d2.ConnectionFlags = updateFwdMethod - realAddress = "10.1.1.3:5000" err = i.UpdateDestination(&s, &d2) assert.NoError(t, err) - checkDestination(t, true, protocol, serviceAddress, realAddress, fwdMethodStrings[m]) + checkDestination(t, i, &s, &d2, true) d3.ConnectionFlags = updateFwdMethod - realAddress = "10.1.1.4:5000" err = i.UpdateDestination(&s, &d3) assert.NoError(t, err) - checkDestination(t, true, protocol, serviceAddress, realAddress, fwdMethodStrings[m]) + checkDestination(t, i, &s, &d3, true) } err = i.DelDestination(&s, &d1) @@ -317,6 +302,8 @@ func TestDestination(t *testing.T) { assert.NoError(t, err) err = i.DelDestination(&s, &d3) assert.NoError(t, err) + checkDestination(t, i, &s, &d3, false) + } } } diff --git a/libnetwork/ipvs/netlink.go b/libnetwork/ipvs/netlink.go index 635606dacd..5450679c3b 100644 --- a/libnetwork/ipvs/netlink.go +++ b/libnetwork/ipvs/netlink.go @@ -19,6 +19,7 @@ import ( "github.com/vishvananda/netns" ) +// For Quick Reference IPVS related netlink message is described at the end of this file. var ( native = nl.NativeEndian() ipvsFamily int @@ -89,7 +90,6 @@ func fillService(s *Service) nl.NetlinkRequestData { if s.PEName != "" { nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName)) } - f := &ipvsFlags{ flags: s.Flags, mask: 0xFFFFFFFF, @@ -117,20 +117,38 @@ func fillDestinaton(d *Destination) nl.NetlinkRequestData { return cmdAttr } -func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error { +func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) { req := newIPVSRequest(cmd) req.Seq = atomic.AddUint32(&i.seq, 1) - req.AddData(fillService(s)) - if d != nil { + if s == nil { + req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages + req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute + } else { + req.AddData(fillService(s)) + } + + if d == nil { + if cmd == ipvsCmdGetDest { + req.Flags |= syscall.NLM_F_DUMP + } + + } else { req.AddData(fillDestinaton(d)) } - if _, err := execute(i.sock, req, 0); err != nil { - return err + res, err := execute(i.sock, req, 0) + if err != nil { + return [][]byte{}, err } - return nil + return res, nil +} + +func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error { + _, err := i.doCmdwithResponse(s, d, cmd) + + return err } func getIPVSFamily() (int, error) { @@ -171,7 +189,6 @@ func rawIPData(ip net.IP) []byte { if family == nl.FAMILY_V4 { return ip.To4() } - return ip } @@ -235,3 +252,295 @@ done: } return res, nil } + +func parseIP(ip []byte, family uint16) (net.IP, error) { + + var resIP net.IP + + switch family { + case syscall.AF_INET: + resIP = (net.IP)(ip[:4]) + case syscall.AF_INET6: + resIP = (net.IP)(ip[:16]) + default: + return nil, fmt.Errorf("parseIP Error ip=%v", ip) + + } + return resIP, nil +} + +// parseStats +func assembleStats(msg []byte) (SvcStats, error) { + + var s SvcStats + + attrs, err := nl.ParseRouteAttr(msg) + if err != nil { + return s, err + } + + for _, attr := range attrs { + attrType := int(attr.Attr.Type) + switch attrType { + case ipvsSvcStatsConns: + s.Connections = native.Uint32(attr.Value) + case ipvsSvcStatsPktsIn: + s.PacketsIn = native.Uint32(attr.Value) + case ipvsSvcStatsPktsOut: + s.PacketsOut = native.Uint32(attr.Value) + case ipvsSvcStatsBytesIn: + s.BytesIn = native.Uint64(attr.Value) + case ipvsSvcStatsBytesOut: + s.BytesOut = native.Uint64(attr.Value) + case ipvsSvcStatsCPS: + s.CPS = native.Uint32(attr.Value) + case ipvsSvcStatsPPSIn: + s.PPSIn = native.Uint32(attr.Value) + case ipvsSvcStatsPPSOut: + s.PPSOut = native.Uint32(attr.Value) + case ipvsSvcStatsBPSIn: + s.BPSIn = native.Uint32(attr.Value) + case ipvsSvcStatsBPSOut: + s.BPSOut = native.Uint32(attr.Value) + } + } + return s, nil +} + +// assembleService assembles a services back from a hain of netlink attributes +func assembleService(attrs []syscall.NetlinkRouteAttr) (*Service, error) { + + var s Service + + for _, attr := range attrs { + + attrType := int(attr.Attr.Type) + + switch attrType { + + case ipvsSvcAttrAddressFamily: + s.AddressFamily = native.Uint16(attr.Value) + case ipvsSvcAttrProtocol: + s.Protocol = native.Uint16(attr.Value) + case ipvsSvcAttrAddress: + ip, err := parseIP(attr.Value, s.AddressFamily) + if err != nil { + return nil, err + } + s.Address = ip + case ipvsSvcAttrPort: + s.Port = binary.BigEndian.Uint16(attr.Value) + case ipvsSvcAttrFWMark: + s.FWMark = native.Uint32(attr.Value) + case ipvsSvcAttrSchedName: + s.SchedName = nl.BytesToString(attr.Value) + case ipvsSvcAttrFlags: + s.Flags = native.Uint32(attr.Value) + case ipvsSvcAttrTimeout: + s.Timeout = native.Uint32(attr.Value) + case ipvsSvcAttrNetmask: + s.Netmask = native.Uint32(attr.Value) + case ipvsSvcAttrStats: + stats, err := assembleStats(attr.Value) + if err != nil { + return nil, err + } + s.Stats = stats + } + + } + return &s, nil +} + +// parseService given a ipvs netlink response this function will respond with a valid service entry, an error otherwise +func (i *Handle) parseService(msg []byte) (*Service, error) { + + var s *Service + + //Remove General header for this message and parse the NetLink message + hdr := deserializeGenlMsg(msg) + NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():]) + if err != nil { + return nil, err + } + if len(NetLinkAttrs) == 0 { + return nil, fmt.Errorf("error no valid netlink message found while parsing service record") + } + + //Now Parse and get IPVS related attributes messages packed in this message. + ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value) + if err != nil { + return nil, err + } + + //Assemble all the IPVS related attribute messages and create a service record + s, err = assembleService(ipvsAttrs) + if err != nil { + return nil, err + } + + return s, nil +} + +// doGetServicesCmd a wrapper which could be used commonly for both GetServices() and GetService(*Service) +func (i *Handle) doGetServicesCmd(svc *Service) ([]*Service, error) { + var res []*Service + + msgs, err := i.doCmdwithResponse(svc, nil, ipvsCmdGetService) + if err != nil { + return nil, err + } + + for _, msg := range msgs { + srv, err := i.parseService(msg) + if err != nil { + return nil, err + } + res = append(res, srv) + } + + return res, nil +} + +func assembleDestination(attrs []syscall.NetlinkRouteAttr) (*Destination, error) { + + var d Destination + + for _, attr := range attrs { + + attrType := int(attr.Attr.Type) + + switch attrType { + case ipvsDestAttrAddress: + ip, err := parseIP(attr.Value, syscall.AF_INET) + if err != nil { + return nil, err + } + d.Address = ip + case ipvsDestAttrPort: + d.Port = binary.BigEndian.Uint16(attr.Value) + case ipvsDestAttrForwardingMethod: + d.ConnectionFlags = native.Uint32(attr.Value) + case ipvsDestAttrWeight: + d.Weight = int(native.Uint16(attr.Value)) + case ipvsDestAttrUpperThreshold: + d.UpperThreshold = native.Uint32(attr.Value) + case ipvsDestAttrLowerThreshold: + d.LowerThreshold = native.Uint32(attr.Value) + case ipvsDestAttrAddressFamily: + d.AddressFamily = native.Uint16(attr.Value) + } + } + return &d, nil +} + +// parseDestination given a ipvs netlink response this function will respond with a valid destination entry, an error otherwise +func (i *Handle) parseDestination(msg []byte) (*Destination, error) { + var dst *Destination + + //Remove General header for this message + hdr := deserializeGenlMsg(msg) + NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():]) + if err != nil { + return nil, err + } + if len(NetLinkAttrs) == 0 { + return nil, fmt.Errorf("error no valid netlink message found while parsing destination record") + } + + //Now Parse and get IPVS related attributes messages packed in this message. + ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value) + if err != nil { + return nil, err + } + + //Assemble netlink attributes and create a Destination record + dst, err = assembleDestination(ipvsAttrs) + if err != nil { + return nil, err + } + + return dst, nil +} + +// doGetDestinationsCmd a wrapper function to be used by GetDestinations and GetDestination(d) apis +func (i *Handle) doGetDestinationsCmd(s *Service, d *Destination) ([]*Destination, error) { + + var res []*Destination + + msgs, err := i.doCmdwithResponse(s, d, ipvsCmdGetDest) + if err != nil { + return nil, err + } + + for _, msg := range msgs { + dest, err := i.parseDestination(msg) + if err != nil { + return res, err + } + res = append(res, dest) + } + return res, nil +} + +// IPVS related netlink message format explained + +/* EACH NETLINK MSG is of the below format, this is what we will receive from execute() api. + If we have multiple netlink objects to process like GetServices() etc., execute() will + supply an array of this below object + + NETLINK MSG +|-----------------------------------| + 0 1 2 3 +|--------|--------|--------|--------| - +| CMD ID | VER | RESERVED | |==> General Message Header represented by genlMsgHdr +|-----------------------------------| - +| ATTR LEN | ATTR TYPE | | +|-----------------------------------| | +| | | +| VALUE | | +| []byte Array of IPVS MSG | |==> Attribute Message represented by syscall.NetlinkRouteAttr +| PADDED BY 4 BYTES | | +| | | +|-----------------------------------| - + + + Once We strip genlMsgHdr from above NETLINK MSG, we should parse the VALUE. + VALUE will have an array of netlink attributes (syscall.NetlinkRouteAttr) such that each attribute will + represent a "Service" or "Destination" object's field. If we assemble these attributes we can construct + Service or Destination. + + IPVS MSG +|-----------------------------------| + 0 1 2 3 +|--------|--------|--------|--------| +| ATTR LEN | ATTR TYPE | +|-----------------------------------| +| | +| | +| []byte IPVS ATTRIBUTE BY 4 BYTES | +| | +| | +|-----------------------------------| + NEXT ATTRIBUTE +|-----------------------------------| +| ATTR LEN | ATTR TYPE | +|-----------------------------------| +| | +| | +| []byte IPVS ATTRIBUTE BY 4 BYTES | +| | +| | +|-----------------------------------| + NEXT ATTRIBUTE +|-----------------------------------| +| ATTR LEN | ATTR TYPE | +|-----------------------------------| +| | +| | +| []byte IPVS ATTRIBUTE BY 4 BYTES | +| | +| | +|-----------------------------------| + +*/