Selaa lähdekoodia

Convert endpoint gossip to use protobuf

Endpoint gossip will use protobuf so that we can make changes in a
backward compatible way.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Jana Radhakrishnan 9 vuotta sitten
vanhempi
commit
b1e5178bc3
3 muutettua tiedostoa jossa 546 lisäystä ja 13 poistoa
  1. 26 13
      libnetwork/agent.go
  2. 501 0
      libnetwork/agent.pb.go
  3. 19 0
      libnetwork/agent.proto

+ 26 - 13
libnetwork/agent.go

@@ -1,10 +1,11 @@
 package libnetwork
 
+//go:generate protoc -I.:Godeps/_workspace/src/github.com/gogo/protobuf  --gogo_out=import_path=github.com/docker/libnetwork,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. agent.proto
+
 import (
 	"fmt"
 	"net"
 	"os"
-	"strings"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/go-events"
@@ -12,6 +13,7 @@ import (
 	"github.com/docker/libnetwork/discoverapi"
 	"github.com/docker/libnetwork/driverapi"
 	"github.com/docker/libnetwork/networkdb"
+	"github.com/gogo/protobuf/proto"
 )
 
 type agent struct {
@@ -169,8 +171,18 @@ func (ep *endpoint) addToCluster() error {
 			return err
 		}
 
-		if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s,%s,%s,%s", ep.Name(), ep.svcName,
-			ep.svcID, ep.Iface().Address().IP))); err != nil {
+		buf, err := proto.Marshal(&EndpointRecord{
+			Name:        ep.Name(),
+			ServiceName: ep.svcName,
+			ServiceID:   ep.svcID,
+			EndpointIP:  ep.Iface().Address().IP.String(),
+		})
+
+		if err != nil {
+			return err
+		}
+
+		if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
 			return err
 		}
 	}
@@ -310,20 +322,21 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
 	var (
 		nid   string
 		eid   string
-		value string
+		value []byte
 		isAdd bool
+		epRec EndpointRecord
 	)
 
 	switch event := ev.(type) {
 	case networkdb.CreateEvent:
 		nid = event.NetworkID
 		eid = event.Key
-		value = string(event.Value)
+		value = event.Value
 		isAdd = true
 	case networkdb.DeleteEvent:
 		nid = event.NetworkID
 		eid = event.Key
-		value = string(event.Value)
+		value = event.Value
 	case networkdb.UpdateEvent:
 		logrus.Errorf("Unexpected update service table event = %#v", event)
 	}
@@ -335,16 +348,16 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
 	}
 	n := nw.(*network)
 
-	vals := strings.Split(value, ",")
-	if len(vals) < 4 {
-		logrus.Errorf("Incorrect service table value = %s", value)
+	err = proto.Unmarshal(value, &epRec)
+	if err != nil {
+		logrus.Errorf("Failed to unmarshal service table value: %v", err)
 		return
 	}
 
-	name := vals[0]
-	svcName := vals[1]
-	svcID := vals[2]
-	ip := net.ParseIP(vals[3])
+	name := epRec.Name
+	svcName := epRec.ServiceName
+	svcID := epRec.ServiceID
+	ip := net.ParseIP(epRec.EndpointIP)
 
 	if name == "" || ip == nil {
 		logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)

+ 501 - 0
libnetwork/agent.pb.go

@@ -0,0 +1,501 @@
+// Code generated by protoc-gen-gogo.
+// source: agent.proto
+// DO NOT EDIT!
+
+/*
+	Package libnetwork is a generated protocol buffer package.
+
+	It is generated from these files:
+		agent.proto
+
+	It has these top-level messages:
+		EndpointRecord
+*/
+package libnetwork
+
+import proto "github.com/gogo/protobuf/proto"
+import fmt "fmt"
+import math "math"
+import _ "github.com/gogo/protobuf/gogoproto"
+
+import strings "strings"
+import github_com_gogo_protobuf_proto "github.com/gogo/protobuf/proto"
+import sort "sort"
+import strconv "strconv"
+import reflect "reflect"
+
+import io "io"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+const _ = proto.GoGoProtoPackageIsVersion1
+
+type EndpointRecord struct {
+	Name        string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"`
+	ServiceID   string `protobuf:"bytes,3,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"`
+	EndpointIP  string `protobuf:"bytes,4,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"`
+}
+
+func (m *EndpointRecord) Reset()                    { *m = EndpointRecord{} }
+func (*EndpointRecord) ProtoMessage()               {}
+func (*EndpointRecord) Descriptor() ([]byte, []int) { return fileDescriptorAgent, []int{0} }
+
+func init() {
+	proto.RegisterType((*EndpointRecord)(nil), "libnetwork.EndpointRecord")
+}
+func (this *EndpointRecord) GoString() string {
+	if this == nil {
+		return "nil"
+	}
+	s := make([]string, 0, 8)
+	s = append(s, "&libnetwork.EndpointRecord{")
+	s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
+	s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n")
+	s = append(s, "ServiceID: "+fmt.Sprintf("%#v", this.ServiceID)+",\n")
+	s = append(s, "EndpointIP: "+fmt.Sprintf("%#v", this.EndpointIP)+",\n")
+	s = append(s, "}")
+	return strings.Join(s, "")
+}
+func valueToGoStringAgent(v interface{}, typ string) string {
+	rv := reflect.ValueOf(v)
+	if rv.IsNil() {
+		return "nil"
+	}
+	pv := reflect.Indirect(rv).Interface()
+	return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
+}
+func extensionToGoStringAgent(e map[int32]github_com_gogo_protobuf_proto.Extension) string {
+	if e == nil {
+		return "nil"
+	}
+	s := "map[int32]proto.Extension{"
+	keys := make([]int, 0, len(e))
+	for k := range e {
+		keys = append(keys, int(k))
+	}
+	sort.Ints(keys)
+	ss := []string{}
+	for _, k := range keys {
+		ss = append(ss, strconv.Itoa(k)+": "+e[int32(k)].GoString())
+	}
+	s += strings.Join(ss, ",") + "}"
+	return s
+}
+func (m *EndpointRecord) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *EndpointRecord) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if len(m.Name) > 0 {
+		data[i] = 0xa
+		i++
+		i = encodeVarintAgent(data, i, uint64(len(m.Name)))
+		i += copy(data[i:], m.Name)
+	}
+	if len(m.ServiceName) > 0 {
+		data[i] = 0x12
+		i++
+		i = encodeVarintAgent(data, i, uint64(len(m.ServiceName)))
+		i += copy(data[i:], m.ServiceName)
+	}
+	if len(m.ServiceID) > 0 {
+		data[i] = 0x1a
+		i++
+		i = encodeVarintAgent(data, i, uint64(len(m.ServiceID)))
+		i += copy(data[i:], m.ServiceID)
+	}
+	if len(m.EndpointIP) > 0 {
+		data[i] = 0x22
+		i++
+		i = encodeVarintAgent(data, i, uint64(len(m.EndpointIP)))
+		i += copy(data[i:], m.EndpointIP)
+	}
+	return i, nil
+}
+
+func encodeFixed64Agent(data []byte, offset int, v uint64) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	data[offset+4] = uint8(v >> 32)
+	data[offset+5] = uint8(v >> 40)
+	data[offset+6] = uint8(v >> 48)
+	data[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32Agent(data []byte, offset int, v uint32) int {
+	data[offset] = uint8(v)
+	data[offset+1] = uint8(v >> 8)
+	data[offset+2] = uint8(v >> 16)
+	data[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintAgent(data []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		data[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	data[offset] = uint8(v)
+	return offset + 1
+}
+func (m *EndpointRecord) Size() (n int) {
+	var l int
+	_ = l
+	l = len(m.Name)
+	if l > 0 {
+		n += 1 + l + sovAgent(uint64(l))
+	}
+	l = len(m.ServiceName)
+	if l > 0 {
+		n += 1 + l + sovAgent(uint64(l))
+	}
+	l = len(m.ServiceID)
+	if l > 0 {
+		n += 1 + l + sovAgent(uint64(l))
+	}
+	l = len(m.EndpointIP)
+	if l > 0 {
+		n += 1 + l + sovAgent(uint64(l))
+	}
+	return n
+}
+
+func sovAgent(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozAgent(x uint64) (n int) {
+	return sovAgent(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (this *EndpointRecord) String() string {
+	if this == nil {
+		return "nil"
+	}
+	s := strings.Join([]string{`&EndpointRecord{`,
+		`Name:` + fmt.Sprintf("%v", this.Name) + `,`,
+		`ServiceName:` + fmt.Sprintf("%v", this.ServiceName) + `,`,
+		`ServiceID:` + fmt.Sprintf("%v", this.ServiceID) + `,`,
+		`EndpointIP:` + fmt.Sprintf("%v", this.EndpointIP) + `,`,
+		`}`,
+	}, "")
+	return s
+}
+func valueToStringAgent(v interface{}) string {
+	rv := reflect.ValueOf(v)
+	if rv.IsNil() {
+		return "nil"
+	}
+	pv := reflect.Indirect(rv).Interface()
+	return fmt.Sprintf("*%v", pv)
+}
+func (m *EndpointRecord) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowAgent
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: EndpointRecord: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: EndpointRecord: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthAgent
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Name = string(data[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 2:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field ServiceName", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthAgent
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.ServiceName = string(data[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 3:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field ServiceID", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthAgent
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.ServiceID = string(data[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 4:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field EndpointIP", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthAgent
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.EndpointIP = string(data[iNdEx:postIndex])
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipAgent(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthAgent
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func skipAgent(data []byte) (n int, err error) {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return 0, ErrIntOverflowAgent
+			}
+			if iNdEx >= l {
+				return 0, io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		wireType := int(wire & 0x7)
+		switch wireType {
+		case 0:
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return 0, ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				iNdEx++
+				if data[iNdEx-1] < 0x80 {
+					break
+				}
+			}
+			return iNdEx, nil
+		case 1:
+			iNdEx += 8
+			return iNdEx, nil
+		case 2:
+			var length int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return 0, ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				length |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			iNdEx += length
+			if length < 0 {
+				return 0, ErrInvalidLengthAgent
+			}
+			return iNdEx, nil
+		case 3:
+			for {
+				var innerWire uint64
+				var start int = iNdEx
+				for shift := uint(0); ; shift += 7 {
+					if shift >= 64 {
+						return 0, ErrIntOverflowAgent
+					}
+					if iNdEx >= l {
+						return 0, io.ErrUnexpectedEOF
+					}
+					b := data[iNdEx]
+					iNdEx++
+					innerWire |= (uint64(b) & 0x7F) << shift
+					if b < 0x80 {
+						break
+					}
+				}
+				innerWireType := int(innerWire & 0x7)
+				if innerWireType == 4 {
+					break
+				}
+				next, err := skipAgent(data[start:])
+				if err != nil {
+					return 0, err
+				}
+				iNdEx = start + next
+			}
+			return iNdEx, nil
+		case 4:
+			return iNdEx, nil
+		case 5:
+			iNdEx += 4
+			return iNdEx, nil
+		default:
+			return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+		}
+	}
+	panic("unreachable")
+}
+
+var (
+	ErrInvalidLengthAgent = fmt.Errorf("proto: negative length found during unmarshaling")
+	ErrIntOverflowAgent   = fmt.Errorf("proto: integer overflow")
+)
+
+var fileDescriptorAgent = []byte{
+	// 204 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0x4c, 0x4f, 0xcd,
+	0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0xc9, 0x4c, 0xca, 0x4b, 0x2d, 0x29,
+	0xcf, 0x2f, 0xca, 0x96, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xeb, 0x83, 0x58, 0x10, 0x15,
+	0x4a, 0xcb, 0x18, 0xb9, 0xf8, 0x5c, 0xf3, 0x52, 0x0a, 0xf2, 0x33, 0xf3, 0x4a, 0x82, 0x52, 0x93,
+	0xf3, 0x8b, 0x52, 0x84, 0x84, 0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35,
+	0x38, 0x83, 0xc0, 0x6c, 0x21, 0x45, 0x2e, 0x9e, 0xe2, 0xd4, 0xa2, 0xb2, 0xcc, 0xe4, 0xd4, 0x78,
+	0xb0, 0x1c, 0x13, 0x58, 0x8e, 0x1b, 0x2a, 0xe6, 0x07, 0x52, 0xa2, 0xc3, 0xc5, 0x05, 0x53, 0x92,
+	0x99, 0x22, 0xc1, 0x0c, 0x52, 0xe0, 0xc4, 0xfb, 0xe8, 0x9e, 0x3c, 0x67, 0x30, 0x44, 0xd4, 0xd3,
+	0x25, 0x88, 0x13, 0xaa, 0xc0, 0x33, 0x45, 0x48, 0x9f, 0x8b, 0x3b, 0x15, 0x6a, 0x6d, 0x7c, 0x66,
+	0x81, 0x04, 0x0b, 0x58, 0x39, 0x1f, 0x50, 0x39, 0x17, 0xcc, 0x35, 0x9e, 0x01, 0x41, 0x5c, 0x30,
+	0x25, 0x9e, 0x05, 0x4e, 0x12, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48,
+	0x8e, 0xf1, 0x04, 0x10, 0x5f, 0x00, 0xe2, 0x07, 0x40, 0x9c, 0xc4, 0x06, 0xf6, 0x89, 0x31, 0x20,
+	0x00, 0x00, 0xff, 0xff, 0x94, 0x78, 0x3e, 0xce, 0xfa, 0x00, 0x00, 0x00,
+}

+ 19 - 0
libnetwork/agent.proto

@@ -0,0 +1,19 @@
+syntax = "proto3";
+
+import "gogoproto/gogo.proto";
+
+package libnetwork;
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.stringer_all) = true;
+option (gogoproto.gostring_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.goproto_stringer_all) = false;
+
+message EndpointRecord {
+	string name = 1;
+	string service_name = 2;
+	string service_id = 3 [(gogoproto.customname) = "ServiceID"];
+	string endpoint_ip = 4 [(gogoproto.customname) = "EndpointIP"];
+}