Browse Source

Merge pull request #1208 from mrjana/lb

Add ingress load balancer
Madhu Venugopal 9 years ago
parent
commit
79c0292f53

+ 22 - 9
libnetwork/agent.go

@@ -167,18 +167,25 @@ func (ep *endpoint) addToCluster() error {
 
 	c := n.getController()
 	if !ep.isAnonymous() && ep.Iface().Address() != nil {
+		var ingressPorts []*PortConfig
 		if ep.svcID != "" {
-			if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ep.Iface().Address().IP); err != nil {
+			// Gossip ingress ports only in ingress network.
+			if n.ingress {
+				ingressPorts = ep.ingressPorts
+			}
+
+			if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.Iface().Address().IP); err != nil {
 				return err
 			}
 		}
 
 		buf, err := proto.Marshal(&EndpointRecord{
-			Name:        ep.Name(),
-			ServiceName: ep.svcName,
-			ServiceID:   ep.svcID,
-			VirtualIP:   ep.virtualIP.String(),
-			EndpointIP:  ep.Iface().Address().IP.String(),
+			Name:         ep.Name(),
+			ServiceName:  ep.svcName,
+			ServiceID:    ep.svcID,
+			VirtualIP:    ep.virtualIP.String(),
+			IngressPorts: ingressPorts,
+			EndpointIP:   ep.Iface().Address().IP.String(),
 		})
 
 		if err != nil {
@@ -208,7 +215,12 @@ func (ep *endpoint) deleteFromCluster() error {
 	c := n.getController()
 	if !ep.isAnonymous() {
 		if ep.svcID != "" && ep.Iface().Address() != nil {
-			if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ep.Iface().Address().IP); err != nil {
+			var ingressPorts []*PortConfig
+			if n.ingress {
+				ingressPorts = ep.ingressPorts
+			}
+
+			if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.Iface().Address().IP); err != nil {
 				return err
 			}
 		}
@@ -362,6 +374,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
 	svcID := epRec.ServiceID
 	vip := net.ParseIP(epRec.VirtualIP)
 	ip := net.ParseIP(epRec.EndpointIP)
+	ingressPorts := epRec.IngressPorts
 
 	if name == "" || ip == nil {
 		logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
@@ -370,7 +383,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
 
 	if isAdd {
 		if svcID != "" {
-			if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ip); err != nil {
+			if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, ip); err != nil {
 				logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
 				return
 			}
@@ -379,7 +392,7 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
 		n.addSvcRecords(name, ip, nil, true)
 	} else {
 		if svcID != "" {
-			if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ip); err != nil {
+			if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, ip); err != nil {
 				logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
 				return
 			}

+ 369 - 21
libnetwork/agent.pb.go

@@ -10,6 +10,7 @@
 
 	It has these top-level messages:
 		EndpointRecord
+		PortConfig
 */
 package libnetwork
 
@@ -35,32 +36,113 @@ var _ = math.Inf
 // is compatible with the proto package it is being compiled against.
 const _ = proto.GoGoProtoPackageIsVersion1
 
+type PortConfig_Protocol int32
+
+const (
+	ProtocolTCP PortConfig_Protocol = 0
+	ProtocolUDP PortConfig_Protocol = 1
+)
+
+var PortConfig_Protocol_name = map[int32]string{
+	0: "TCP",
+	1: "UDP",
+}
+var PortConfig_Protocol_value = map[string]int32{
+	"TCP": 0,
+	"UDP": 1,
+}
+
+func (x PortConfig_Protocol) String() string {
+	return proto.EnumName(PortConfig_Protocol_name, int32(x))
+}
+func (PortConfig_Protocol) EnumDescriptor() ([]byte, []int) { return fileDescriptorAgent, []int{1, 0} }
+
+// EndpointRecord specifies all the endpoint specific information that
+// needs to gossiped to nodes participating in the network.
 type EndpointRecord struct {
-	Name        string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	// Name of the endpoint
+	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	// Service name of the service to which this endpoint belongs.
 	ServiceName string `protobuf:"bytes,2,opt,name=service_name,json=serviceName,proto3" json:"service_name,omitempty"`
-	ServiceID   string `protobuf:"bytes,3,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"`
-	VirtualIP   string `protobuf:"bytes,4,opt,name=virtual_ip,json=virtualIp,proto3" json:"virtual_ip,omitempty"`
-	EndpointIP  string `protobuf:"bytes,5,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"`
+	// Service ID of the service to which this endpoint belongs.
+	ServiceID string `protobuf:"bytes,3,opt,name=service_id,json=serviceId,proto3" json:"service_id,omitempty"`
+	// Virtual IP of the service to which this endpoint belongs.
+	VirtualIP string `protobuf:"bytes,4,opt,name=virtual_ip,json=virtualIp,proto3" json:"virtual_ip,omitempty"`
+	// IP assigned to this endpoint.
+	EndpointIP string `protobuf:"bytes,5,opt,name=endpoint_ip,json=endpointIp,proto3" json:"endpoint_ip,omitempty"`
+	// IngressPorts exposed by the service to which this endpoint belongs.
+	IngressPorts []*PortConfig `protobuf:"bytes,6,rep,name=ingress_ports,json=ingressPorts" json:"ingress_ports,omitempty"`
 }
 
 func (m *EndpointRecord) Reset()                    { *m = EndpointRecord{} }
 func (*EndpointRecord) ProtoMessage()               {}
 func (*EndpointRecord) Descriptor() ([]byte, []int) { return fileDescriptorAgent, []int{0} }
 
+func (m *EndpointRecord) GetIngressPorts() []*PortConfig {
+	if m != nil {
+		return m.IngressPorts
+	}
+	return nil
+}
+
+// PortConfig specifies an exposed port which can be
+// addressed using the given name. This can be later queried
+// using a service discovery api or a DNS SRV query. The node
+// port specifies a port that can be used to address this
+// service external to the cluster by sending a connection
+// request to this port to any node on the cluster.
+type PortConfig struct {
+	// Name for the port. If provided the port information can
+	// be queried using the name as in a DNS SRV query.
+	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	// Protocol for the port which is exposed.
+	Protocol PortConfig_Protocol `protobuf:"varint,2,opt,name=protocol,proto3,enum=libnetwork.PortConfig_Protocol" json:"protocol,omitempty"`
+	// The port which the application is exposing and is bound to.
+	Port uint32 `protobuf:"varint,3,opt,name=port,proto3" json:"port,omitempty"`
+	// NodePort specifies the port on which the service is
+	// exposed on all nodes on the cluster. If not specified an
+	// arbitrary port in the node port range is allocated by the
+	// system. If specified it should be within the node port
+	// range and it should be available.
+	NodePort uint32 `protobuf:"varint,4,opt,name=node_port,json=nodePort,proto3" json:"node_port,omitempty"`
+}
+
+func (m *PortConfig) Reset()                    { *m = PortConfig{} }
+func (*PortConfig) ProtoMessage()               {}
+func (*PortConfig) Descriptor() ([]byte, []int) { return fileDescriptorAgent, []int{1} }
+
 func init() {
 	proto.RegisterType((*EndpointRecord)(nil), "libnetwork.EndpointRecord")
+	proto.RegisterType((*PortConfig)(nil), "libnetwork.PortConfig")
+	proto.RegisterEnum("libnetwork.PortConfig_Protocol", PortConfig_Protocol_name, PortConfig_Protocol_value)
 }
 func (this *EndpointRecord) GoString() string {
 	if this == nil {
 		return "nil"
 	}
-	s := make([]string, 0, 9)
+	s := make([]string, 0, 10)
 	s = append(s, "&libnetwork.EndpointRecord{")
 	s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
 	s = append(s, "ServiceName: "+fmt.Sprintf("%#v", this.ServiceName)+",\n")
 	s = append(s, "ServiceID: "+fmt.Sprintf("%#v", this.ServiceID)+",\n")
 	s = append(s, "VirtualIP: "+fmt.Sprintf("%#v", this.VirtualIP)+",\n")
 	s = append(s, "EndpointIP: "+fmt.Sprintf("%#v", this.EndpointIP)+",\n")
+	if this.IngressPorts != nil {
+		s = append(s, "IngressPorts: "+fmt.Sprintf("%#v", this.IngressPorts)+",\n")
+	}
+	s = append(s, "}")
+	return strings.Join(s, "")
+}
+func (this *PortConfig) GoString() string {
+	if this == nil {
+		return "nil"
+	}
+	s := make([]string, 0, 8)
+	s = append(s, "&libnetwork.PortConfig{")
+	s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n")
+	s = append(s, "Protocol: "+fmt.Sprintf("%#v", this.Protocol)+",\n")
+	s = append(s, "Port: "+fmt.Sprintf("%#v", this.Port)+",\n")
+	s = append(s, "NodePort: "+fmt.Sprintf("%#v", this.NodePort)+",\n")
 	s = append(s, "}")
 	return strings.Join(s, "")
 }
@@ -134,6 +216,57 @@ func (m *EndpointRecord) MarshalTo(data []byte) (int, error) {
 		i = encodeVarintAgent(data, i, uint64(len(m.EndpointIP)))
 		i += copy(data[i:], m.EndpointIP)
 	}
+	if len(m.IngressPorts) > 0 {
+		for _, msg := range m.IngressPorts {
+			data[i] = 0x32
+			i++
+			i = encodeVarintAgent(data, i, uint64(msg.Size()))
+			n, err := msg.MarshalTo(data[i:])
+			if err != nil {
+				return 0, err
+			}
+			i += n
+		}
+	}
+	return i, nil
+}
+
+func (m *PortConfig) Marshal() (data []byte, err error) {
+	size := m.Size()
+	data = make([]byte, size)
+	n, err := m.MarshalTo(data)
+	if err != nil {
+		return nil, err
+	}
+	return data[:n], nil
+}
+
+func (m *PortConfig) MarshalTo(data []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if len(m.Name) > 0 {
+		data[i] = 0xa
+		i++
+		i = encodeVarintAgent(data, i, uint64(len(m.Name)))
+		i += copy(data[i:], m.Name)
+	}
+	if m.Protocol != 0 {
+		data[i] = 0x10
+		i++
+		i = encodeVarintAgent(data, i, uint64(m.Protocol))
+	}
+	if m.Port != 0 {
+		data[i] = 0x18
+		i++
+		i = encodeVarintAgent(data, i, uint64(m.Port))
+	}
+	if m.NodePort != 0 {
+		data[i] = 0x20
+		i++
+		i = encodeVarintAgent(data, i, uint64(m.NodePort))
+	}
 	return i, nil
 }
 
@@ -187,6 +320,31 @@ func (m *EndpointRecord) Size() (n int) {
 	if l > 0 {
 		n += 1 + l + sovAgent(uint64(l))
 	}
+	if len(m.IngressPorts) > 0 {
+		for _, e := range m.IngressPorts {
+			l = e.Size()
+			n += 1 + l + sovAgent(uint64(l))
+		}
+	}
+	return n
+}
+
+func (m *PortConfig) Size() (n int) {
+	var l int
+	_ = l
+	l = len(m.Name)
+	if l > 0 {
+		n += 1 + l + sovAgent(uint64(l))
+	}
+	if m.Protocol != 0 {
+		n += 1 + sovAgent(uint64(m.Protocol))
+	}
+	if m.Port != 0 {
+		n += 1 + sovAgent(uint64(m.Port))
+	}
+	if m.NodePort != 0 {
+		n += 1 + sovAgent(uint64(m.NodePort))
+	}
 	return n
 }
 
@@ -213,6 +371,20 @@ func (this *EndpointRecord) String() string {
 		`ServiceID:` + fmt.Sprintf("%v", this.ServiceID) + `,`,
 		`VirtualIP:` + fmt.Sprintf("%v", this.VirtualIP) + `,`,
 		`EndpointIP:` + fmt.Sprintf("%v", this.EndpointIP) + `,`,
+		`IngressPorts:` + strings.Replace(fmt.Sprintf("%v", this.IngressPorts), "PortConfig", "PortConfig", 1) + `,`,
+		`}`,
+	}, "")
+	return s
+}
+func (this *PortConfig) String() string {
+	if this == nil {
+		return "nil"
+	}
+	s := strings.Join([]string{`&PortConfig{`,
+		`Name:` + fmt.Sprintf("%v", this.Name) + `,`,
+		`Protocol:` + fmt.Sprintf("%v", this.Protocol) + `,`,
+		`Port:` + fmt.Sprintf("%v", this.Port) + `,`,
+		`NodePort:` + fmt.Sprintf("%v", this.NodePort) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -399,6 +571,173 @@ func (m *EndpointRecord) Unmarshal(data []byte) error {
 			}
 			m.EndpointIP = string(data[iNdEx:postIndex])
 			iNdEx = postIndex
+		case 6:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field IngressPorts", wireType)
+			}
+			var msglen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				msglen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if msglen < 0 {
+				return ErrInvalidLengthAgent
+			}
+			postIndex := iNdEx + msglen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.IngressPorts = append(m.IngressPorts, &PortConfig{})
+			if err := m.IngressPorts[len(m.IngressPorts)-1].Unmarshal(data[iNdEx:postIndex]); err != nil {
+				return err
+			}
+			iNdEx = postIndex
+		default:
+			iNdEx = preIndex
+			skippy, err := skipAgent(data[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthAgent
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func (m *PortConfig) Unmarshal(data []byte) error {
+	l := len(data)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowAgent
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := data[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: PortConfig: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: PortConfig: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthAgent
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Name = string(data[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Protocol", wireType)
+			}
+			m.Protocol = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.Protocol |= (PortConfig_Protocol(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Port", wireType)
+			}
+			m.Port = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.Port |= (uint32(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 4:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field NodePort", wireType)
+			}
+			m.NodePort = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowAgent
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := data[iNdEx]
+				iNdEx++
+				m.NodePort |= (uint32(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
 		default:
 			iNdEx = preIndex
 			skippy, err := skipAgent(data[iNdEx:])
@@ -526,20 +865,29 @@ var (
 )
 
 var fileDescriptorAgent = []byte{
-	// 228 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0x4c, 0x4f, 0xcd,
-	0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0xc9, 0x4c, 0xca, 0x4b, 0x2d, 0x29,
-	0xcf, 0x2f, 0xca, 0x96, 0x12, 0x49, 0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xeb, 0x83, 0x58, 0x10, 0x15,
-	0x4a, 0x57, 0x18, 0xb9, 0xf8, 0x5c, 0xf3, 0x52, 0x0a, 0xf2, 0x33, 0xf3, 0x4a, 0x82, 0x52, 0x93,
-	0xf3, 0x8b, 0x52, 0x84, 0x84, 0xb8, 0x58, 0xf2, 0x12, 0x73, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35,
-	0x38, 0x83, 0xc0, 0x6c, 0x21, 0x45, 0x2e, 0x9e, 0xe2, 0xd4, 0xa2, 0xb2, 0xcc, 0xe4, 0xd4, 0x78,
-	0xb0, 0x1c, 0x13, 0x58, 0x8e, 0x1b, 0x2a, 0xe6, 0x07, 0x52, 0xa2, 0xc3, 0xc5, 0x05, 0x53, 0x92,
-	0x99, 0x22, 0xc1, 0x0c, 0x52, 0xe0, 0xc4, 0xfb, 0xe8, 0x9e, 0x3c, 0x67, 0x30, 0x44, 0xd4, 0xd3,
-	0x25, 0x88, 0x13, 0xaa, 0xc0, 0x33, 0x05, 0xa4, 0xba, 0x2c, 0xb3, 0xa8, 0xa4, 0x34, 0x31, 0x27,
-	0x3e, 0xb3, 0x40, 0x82, 0x05, 0xa1, 0x3a, 0x0c, 0x22, 0xea, 0x19, 0x10, 0xc4, 0x09, 0x55, 0xe0,
-	0x59, 0x20, 0xa4, 0xcf, 0xc5, 0x9d, 0x0a, 0x75, 0x24, 0x48, 0x39, 0x2b, 0x58, 0x39, 0x1f, 0x50,
-	0x39, 0x17, 0xcc, 0xed, 0x40, 0xf5, 0x5c, 0x30, 0x25, 0x9e, 0x05, 0x4e, 0x12, 0x37, 0x1e, 0xca,
-	0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0xf1, 0x04, 0x10, 0x5f, 0x00, 0xe2, 0x07,
-	0x40, 0x9c, 0xc4, 0x06, 0xf6, 0xb7, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xae, 0x11, 0xc5, 0x8d,
-	0x28, 0x01, 0x00, 0x00,
+	// 370 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x6c, 0x90, 0xbf, 0x4e, 0x32, 0x41,
+	0x14, 0xc5, 0x59, 0xe0, 0x23, 0xec, 0x5d, 0x96, 0x8f, 0x4c, 0x8c, 0xd9, 0x60, 0xb2, 0x20, 0x15,
+	0x85, 0x59, 0x12, 0x2c, 0xe9, 0x00, 0x8b, 0x6d, 0xcc, 0x66, 0xfc, 0xd3, 0x12, 0x60, 0xc7, 0xcd,
+	0x44, 0x9c, 0xd9, 0xcc, 0xae, 0xd8, 0x5a, 0x1a, 0x3b, 0x1f, 0xc0, 0xca, 0x97, 0xb1, 0xb4, 0xb0,
+	0xb0, 0x32, 0xc2, 0x13, 0xf8, 0x08, 0xce, 0x0c, 0xbb, 0x12, 0x13, 0x8a, 0x9b, 0xdc, 0x9c, 0xf3,
+	0xbb, 0x37, 0x27, 0x07, 0xac, 0x69, 0x44, 0x58, 0xea, 0xc5, 0x82, 0xa7, 0x1c, 0xc1, 0x82, 0xce,
+	0x18, 0x49, 0xef, 0xb8, 0xb8, 0x6e, 0xee, 0x45, 0x3c, 0xe2, 0x5a, 0xee, 0xa9, 0x6d, 0x43, 0x74,
+	0x9e, 0x8a, 0x50, 0x3f, 0x61, 0x61, 0xcc, 0x29, 0x4b, 0x31, 0x99, 0x73, 0x11, 0x22, 0x04, 0x65,
+	0x36, 0xbd, 0x21, 0x8e, 0xd1, 0x36, 0xba, 0x26, 0xd6, 0x3b, 0x3a, 0x84, 0x5a, 0x42, 0xc4, 0x92,
+	0xce, 0xc9, 0x44, 0x7b, 0x45, 0xed, 0x59, 0x99, 0x76, 0xaa, 0x90, 0x23, 0x80, 0x1c, 0xa1, 0xa1,
+	0x53, 0x52, 0xc0, 0xd0, 0x5e, 0x7f, 0xb6, 0xcc, 0xb3, 0x8d, 0xea, 0x8f, 0xb1, 0x99, 0x01, 0x7e,
+	0xa8, 0xe8, 0x25, 0x15, 0xe9, 0xed, 0x74, 0x31, 0xa1, 0xb1, 0x53, 0xde, 0xd2, 0x97, 0x1b, 0xd5,
+	0x0f, 0xb0, 0x99, 0x01, 0x7e, 0x8c, 0x7a, 0x60, 0x91, 0x2c, 0xa4, 0xc2, 0xff, 0x69, 0xbc, 0x2e,
+	0x71, 0xc8, 0xb3, 0x4b, 0x1e, 0x72, 0x44, 0x1e, 0x0c, 0xc0, 0xa6, 0x2c, 0x12, 0x24, 0x49, 0x26,
+	0x31, 0x17, 0x69, 0xe2, 0x54, 0xda, 0xa5, 0xae, 0xd5, 0xdf, 0xf7, 0xb6, 0x85, 0x78, 0x81, 0x34,
+	0x46, 0x9c, 0x5d, 0xd1, 0x08, 0xd7, 0x32, 0x58, 0x49, 0x49, 0xe7, 0xdd, 0x00, 0xd8, 0x9a, 0x3b,
+	0xfb, 0x18, 0x40, 0x55, 0xf7, 0x37, 0xe7, 0x0b, 0xdd, 0x45, 0xbd, 0xdf, 0xda, 0xfd, 0xda, 0x0b,
+	0x32, 0x0c, 0xff, 0x1e, 0xa8, 0x87, 0x2a, 0x94, 0xee, 0xc8, 0xc6, 0x7a, 0x47, 0x07, 0x60, 0x32,
+	0x1e, 0x12, 0x9d, 0x56, 0xd7, 0x61, 0xe3, 0xaa, 0x12, 0xd4, 0xa7, 0xce, 0x18, 0xaa, 0xf9, 0x1b,
+	0xe4, 0x40, 0xe9, 0x7c, 0x14, 0x34, 0x0a, 0xcd, 0xff, 0x8f, 0xcf, 0x6d, 0x2b, 0x97, 0xa5, 0xa4,
+	0x9c, 0x8b, 0x71, 0xd0, 0x30, 0xfe, 0x3a, 0x52, 0x6a, 0x96, 0x1f, 0x5e, 0xdc, 0xc2, 0xd0, 0xf9,
+	0x58, 0xb9, 0x85, 0xef, 0x95, 0x6b, 0xdc, 0xaf, 0x5d, 0xe3, 0x55, 0xce, 0x9b, 0x9c, 0x2f, 0x39,
+	0xb3, 0x8a, 0x8e, 0x76, 0xfc, 0x13, 0x00, 0x00, 0xff, 0xff, 0xca, 0xbb, 0xca, 0xdf, 0x3c, 0x02,
+	0x00, 0x00,
 }

+ 47 - 1
libnetwork/agent.proto

@@ -11,10 +11,56 @@ option (gogoproto.gostring_all) = true;
 option (gogoproto.sizer_all) = true;
 option (gogoproto.goproto_stringer_all) = false;
 
+// EndpointRecord specifies all the endpoint specific information that
+// needs to gossiped to nodes participating in the network.
 message EndpointRecord {
+	// Name of the endpoint
 	string name = 1;
+
+	// Service name of the service to which this endpoint belongs.
 	string service_name = 2;
+
+	// Service ID of the service to which this endpoint belongs.
 	string service_id = 3 [(gogoproto.customname) = "ServiceID"];
+
+	// Virtual IP of the service to which this endpoint belongs.
 	string virtual_ip = 4 [(gogoproto.customname) = "VirtualIP"];
+
+	// IP assigned to this endpoint.
 	string endpoint_ip = 5 [(gogoproto.customname) = "EndpointIP"];
-}
+
+	// IngressPorts exposed by the service to which this endpoint belongs.
+	repeated PortConfig ingress_ports = 6;
+}
+
+// PortConfig specifies an exposed port which can be
+// addressed using the given name. This can be later queried
+// using a service discovery api or a DNS SRV query. The node
+// port specifies a port that can be used to address this
+// service external to the cluster by sending a connection
+// request to this port to any node on the cluster.
+message PortConfig {
+	enum Protocol {
+		option (gogoproto.goproto_enum_prefix) = false;
+
+		TCP = 0 [(gogoproto.enumvalue_customname) = "ProtocolTCP"];
+		UDP = 1 [(gogoproto.enumvalue_customname) = "ProtocolUDP"];
+	}
+
+	// Name for the port. If provided the port information can
+	// be queried using the name as in a DNS SRV query.
+	string name = 1;
+
+	// Protocol for the port which is exposed.
+	Protocol protocol = 2;
+
+	// The port which the application is exposing and is bound to.
+	uint32 port = 3;
+
+	// NodePort specifies the port on which the service is
+	// exposed on all nodes on the cluster. If not specified an
+	// arbitrary port in the node port range is allocated by the
+	// system. If specified it should be within the node port
+	// range and it should be available.
+	uint32 node_port = 4;
+}

+ 20 - 3
libnetwork/controller.go

@@ -136,6 +136,7 @@ type controller struct {
 	nmap            map[string]*netWatch
 	serviceBindings map[string]*service
 	defOsSbox       osl.Sandbox
+	ingressSandbox  *sandbox
 	sboxOnce        sync.Once
 	agent           *agent
 	sync.Mutex
@@ -671,9 +672,7 @@ func (c *controller) NetworkByID(id string) (Network, error) {
 }
 
 // NewSandbox creates a new sandbox for the passed container id
-func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (Sandbox, error) {
-	var err error
-
+func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (sBox Sandbox, err error) {
 	if containerID == "" {
 		return nil, types.BadRequestErrorf("invalid container ID")
 	}
@@ -710,11 +709,29 @@ func (c *controller) NewSandbox(containerID string, options ...SandboxOption) (S
 			controller:  c,
 		}
 	}
+	sBox = sb
 
 	heap.Init(&sb.endpoints)
 
 	sb.processOptions(options...)
 
+	c.Lock()
+	if sb.ingress && c.ingressSandbox != nil {
+		return nil, fmt.Errorf("ingress sandbox already present")
+	}
+
+	c.ingressSandbox = sb
+	c.Unlock()
+	defer func() {
+		if err != nil {
+			c.Lock()
+			if sb.ingress {
+				c.ingressSandbox = nil
+			}
+			c.Unlock()
+		}
+	}()
+
 	if err = sb.setupResolutionFiles(); err != nil {
 		return nil, err
 	}

+ 12 - 1
libnetwork/endpoint.go

@@ -70,6 +70,7 @@ type endpoint struct {
 	svcID             string
 	svcName           string
 	virtualIP         net.IP
+	ingressPorts      []*PortConfig
 	dbIndex           uint64
 	dbExists          bool
 	sync.Mutex
@@ -95,6 +96,7 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) {
 	epMap["svcName"] = ep.svcName
 	epMap["svcID"] = ep.svcID
 	epMap["virtualIP"] = ep.virtualIP.String()
+	epMap["ingressPorts"] = ep.ingressPorts
 
 	return json.Marshal(epMap)
 }
@@ -192,6 +194,11 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
 		ep.virtualIP = net.ParseIP(vip.(string))
 	}
 
+	pc, _ := json.Marshal(epMap["ingressPorts"])
+	var ingressPorts []*PortConfig
+	json.Unmarshal(pc, &ingressPorts)
+	ep.ingressPorts = ingressPorts
+
 	ma, _ := json.Marshal(epMap["myAliases"])
 	var myAliases []string
 	json.Unmarshal(ma, &myAliases)
@@ -220,6 +227,9 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error {
 	dstEp.svcID = ep.svcID
 	dstEp.virtualIP = ep.virtualIP
 
+	dstEp.ingressPorts = make([]*PortConfig, len(ep.ingressPorts))
+	copy(dstEp.ingressPorts, ep.ingressPorts)
+
 	if ep.iface != nil {
 		dstEp.iface = &endpointInterface{}
 		ep.iface.CopyTo(dstEp.iface)
@@ -902,11 +912,12 @@ func CreateOptionAlias(name string, alias string) EndpointOption {
 }
 
 // CreateOptionService function returns an option setter for setting service binding configuration
-func CreateOptionService(name, id string, vip net.IP) EndpointOption {
+func CreateOptionService(name, id string, vip net.IP, ingressPorts []*PortConfig) EndpointOption {
 	return func(ep *endpoint) {
 		ep.svcName = name
 		ep.svcID = id
 		ep.virtualIP = vip
+		ep.ingressPorts = ingressPorts
 	}
 }
 

+ 7 - 0
libnetwork/ipvs/netlink.go

@@ -7,10 +7,13 @@ import (
 	"encoding/binary"
 	"fmt"
 	"net"
+	"os/exec"
+	"strings"
 	"sync"
 	"syscall"
 	"unsafe"
 
+	"github.com/Sirupsen/logrus"
 	"github.com/vishvananda/netlink/nl"
 	"github.com/vishvananda/netns"
 )
@@ -55,6 +58,10 @@ func (f *ipvsFlags) Len() int {
 func setup() {
 	ipvsOnce.Do(func() {
 		var err error
+		if out, err := exec.Command("modprobe", "-va", "ip_vs").CombinedOutput(); err != nil {
+			logrus.Warnf("Running modprobe nf_nat failed with message: `%s`, error: %v", strings.TrimSpace(string(out)), err)
+		}
+
 		ipvsFamily, err = getIPVSFamily()
 		if err != nil {
 			panic("could not get ipvs family")

+ 14 - 0
libnetwork/network.go

@@ -185,6 +185,7 @@ type network struct {
 	drvOnce      *sync.Once
 	internal     bool
 	inDelete     bool
+	ingress      bool
 	driverTables []string
 	sync.Mutex
 }
@@ -326,6 +327,7 @@ func (n *network) CopyTo(o datastore.KVObject) error {
 	dstN.drvOnce = n.drvOnce
 	dstN.internal = n.internal
 	dstN.inDelete = n.inDelete
+	dstN.ingress = n.ingress
 
 	// copy labels
 	if dstN.labels == nil {
@@ -432,6 +434,7 @@ func (n *network) MarshalJSON() ([]byte, error) {
 	}
 	netMap["internal"] = n.internal
 	netMap["inDelete"] = n.inDelete
+	netMap["ingress"] = n.ingress
 	return json.Marshal(netMap)
 }
 
@@ -522,6 +525,9 @@ func (n *network) UnmarshalJSON(b []byte) (err error) {
 	if v, ok := netMap["inDelete"]; ok {
 		n.inDelete = v.(bool)
 	}
+	if v, ok := netMap["ingress"]; ok {
+		n.ingress = v.(bool)
+	}
 	// Reconcile old networks with the recently added `--ipv6` flag
 	if !n.enableIPv6 {
 		n.enableIPv6 = len(n.ipamV6Info) > 0
@@ -553,6 +559,14 @@ func NetworkOptionGeneric(generic map[string]interface{}) NetworkOption {
 	}
 }
 
+// NetworkOptionIngress returns an option setter to indicate if a network is
+// an ingress network.
+func NetworkOptionIngress() NetworkOption {
+	return func(n *network) {
+		n.ingress = true
+	}
+}
+
 // NetworkOptionPersist returns an option setter to set persistence policy for a network
 func NetworkOptionPersist(persist bool) NetworkOption {
 	return func(n *network) {

+ 9 - 0
libnetwork/sandbox.go

@@ -84,6 +84,7 @@ type sandbox struct {
 	dbExists      bool
 	isStub        bool
 	inDelete      bool
+	ingress       bool
 	sync.Mutex
 }
 
@@ -1013,6 +1014,14 @@ func OptionPortMapping(portBindings []types.PortBinding) SandboxOption {
 	}
 }
 
+// OptionIngress function returns an option setter for marking a
+// sandbox as the controller's ingress sandbox.
+func OptionIngress() SandboxOption {
+	return func(sb *sandbox) {
+		sb.ingress = true
+	}
+}
+
 func (eh epHeap) Len() int { return len(eh) }
 
 func (eh epHeap) Less(i, j int) bool {

+ 7 - 0
libnetwork/service.go

@@ -19,6 +19,10 @@ type service struct {
 	// Map of loadbalancers for the service one-per attached
 	// network. It is keyed with network ID.
 	loadBalancers map[string]*loadBalancer
+
+	// List of ingress ports exposed by the service
+	ingressPorts []*PortConfig
+
 	sync.Mutex
 }
 
@@ -29,4 +33,7 @@ type loadBalancer struct {
 	// Map of backend IPs backing this loadbalancer on this
 	// network. It is keyed with endpoint ID.
 	backEnds map[string]net.IP
+
+	// Back pointer to service to which the loadbalancer belongs.
+	service *service
 }

+ 171 - 23
libnetwork/service_linux.go

@@ -2,6 +2,8 @@ package libnetwork
 
 import (
 	"fmt"
+	"io"
+	"io/ioutil"
 	"net"
 	"os"
 	"os/exec"
@@ -13,6 +15,7 @@ import (
 	"github.com/docker/docker/pkg/reexec"
 	"github.com/docker/libnetwork/iptables"
 	"github.com/docker/libnetwork/ipvs"
+	"github.com/gogo/protobuf/proto"
 	"github.com/vishvananda/netlink/nl"
 	"github.com/vishvananda/netns"
 )
@@ -21,15 +24,16 @@ func init() {
 	reexec.Register("fwmarker", fwMarker)
 }
 
-func newService(name string, id string) *service {
+func newService(name string, id string, ingressPorts []*PortConfig) *service {
 	return &service{
 		name:          name,
 		id:            id,
+		ingressPorts:  ingressPorts,
 		loadBalancers: make(map[string]*loadBalancer),
 	}
 }
 
-func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
+func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, ip net.IP) error {
 	var (
 		s          *service
 		addService bool
@@ -45,7 +49,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
 	if !ok {
 		// Create a new service if we are seeing this service
 		// for the first time.
-		s = newService(name, sid)
+		s = newService(name, sid, ingressPorts)
 		c.serviceBindings[sid] = s
 	}
 	c.Unlock()
@@ -60,6 +64,7 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
 			vip:      vip,
 			fwMark:   fwMarkCtr,
 			backEnds: make(map[string]net.IP),
+			service:  s,
 		}
 
 		fwMarkCtrMu.Lock()
@@ -91,13 +96,13 @@ func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, i
 	// Add loadbalancer service and backend in all sandboxes in
 	// the network only if vip is valid.
 	if len(vip) != 0 {
-		n.(*network).addLBBackend(ip, vip, lb.fwMark, addService)
+		n.(*network).addLBBackend(ip, vip, lb.fwMark, ingressPorts, addService)
 	}
 
 	return nil
 }
 
-func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
+func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, ip net.IP) error {
 	var rmService bool
 
 	n, err := c.NetworkByID(nid)
@@ -151,14 +156,14 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip
 	// Remove loadbalancer service(if needed) and backend in all
 	// sandboxes in the network only if the vip is valid.
 	if len(vip) != 0 {
-		n.(*network).rmLBBackend(ip, vip, lb.fwMark, rmService)
+		n.(*network).rmLBBackend(ip, vip, lb.fwMark, ingressPorts, rmService)
 	}
 
 	return nil
 }
 
 // Get all loadbalancers on this network that is currently discovered
-// on this node..
+// on this node.
 func (n *network) connectedLoadbalancers() []*loadBalancer {
 	c := n.getController()
 
@@ -178,7 +183,29 @@ func (n *network) connectedLoadbalancers() []*loadBalancer {
 // Populate all loadbalancers on the network that the passed endpoint
 // belongs to, into this sandbox.
 func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
+	var gwIP net.IP
+
 	n := ep.getNetwork()
+	eIP := ep.Iface().Address()
+
+	if sb.ingress {
+		// For the ingress sandbox if this is not gateway
+		// endpoint do nothing.
+		if ep != sb.getGatewayEndpoint() {
+			return
+		}
+
+		// This is the gateway endpoint. Now get the ingress
+		// network and plumb the loadbalancers.
+		gwIP = ep.Iface().Address().IP
+		for _, ep := range sb.getConnectedEndpoints() {
+			if !ep.endpointInGWNetwork() {
+				n = ep.getNetwork()
+				eIP = ep.Iface().Address()
+			}
+		}
+	}
+
 	for _, lb := range n.connectedLoadbalancers() {
 		// Skip if vip is not valid.
 		if len(lb.vip) == 0 {
@@ -187,7 +214,8 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
 
 		addService := true
 		for _, ip := range lb.backEnds {
-			sb.addLBBackend(ip, lb.vip, lb.fwMark, addService)
+			sb.addLBBackend(ip, lb.vip, lb.fwMark, lb.service.ingressPorts,
+				eIP, gwIP, addService)
 			addService = false
 		}
 	}
@@ -196,11 +224,16 @@ func (sb *sandbox) populateLoadbalancers(ep *endpoint) {
 // Add loadbalancer backend to all sandboxes which has a connection to
 // this network. If needed add the service as well, as specified by
 // the addService bool.
-func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) {
+func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, addService bool) {
 	n.WalkEndpoints(func(e Endpoint) bool {
 		ep := e.(*endpoint)
 		if sb, ok := ep.getSandbox(); ok {
-			sb.addLBBackend(ip, vip, fwMark, addService)
+			var gwIP net.IP
+			if ep := sb.getGatewayEndpoint(); ep != nil {
+				gwIP = ep.Iface().Address().IP
+			}
+
+			sb.addLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, addService)
 		}
 
 		return false
@@ -210,11 +243,16 @@ func (n *network) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) {
 // Remove loadbalancer backend from all sandboxes which has a
 // connection to this network. If needed remove the service entry as
 // well, as specified by the rmService bool.
-func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) {
+func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, rmService bool) {
 	n.WalkEndpoints(func(e Endpoint) bool {
 		ep := e.(*endpoint)
 		if sb, ok := ep.getSandbox(); ok {
-			sb.rmLBBackend(ip, vip, fwMark, rmService)
+			var gwIP net.IP
+			if ep := sb.getGatewayEndpoint(); ep != nil {
+				gwIP = ep.Iface().Address().IP
+			}
+
+			sb.rmLBBackend(ip, vip, fwMark, ingressPorts, ep.Iface().Address(), gwIP, rmService)
 		}
 
 		return false
@@ -222,7 +260,7 @@ func (n *network) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) {
 }
 
 // Add loadbalancer backend into one connected sandbox.
-func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool) {
+func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, addService bool) {
 	i, err := ipvs.New(sb.Key())
 	if err != nil {
 		logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err)
@@ -237,8 +275,17 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool)
 	}
 
 	if addService {
-		logrus.Debugf("Creating service for vip %s fwMark %d", vip, fwMark)
-		if err := invokeFWMarker(sb.Key(), vip, fwMark, false); err != nil {
+		var iPorts []*PortConfig
+		if sb.ingress {
+			iPorts = ingressPorts
+			if err := programIngress(gwIP, iPorts, false); err != nil {
+				logrus.Errorf("Failed to add ingress: %v", err)
+				return
+			}
+		}
+
+		logrus.Debugf("Creating service for vip %s fwMark %d ingressPorts %#v", vip, fwMark, iPorts)
+		if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, false); err != nil {
 			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
 			return
 		}
@@ -264,7 +311,7 @@ func (sb *sandbox) addLBBackend(ip, vip net.IP, fwMark uint32, addService bool)
 }
 
 // Remove loadbalancer backend from one connected sandbox.
-func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) {
+func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, gwIP net.IP, rmService bool) {
 	i, err := ipvs.New(sb.Key())
 	if err != nil {
 		logrus.Errorf("Failed to create a ipvs handle for sbox %s: %v", sb.Key(), err)
@@ -295,16 +342,68 @@ func (sb *sandbox) rmLBBackend(ip, vip net.IP, fwMark uint32, rmService bool) {
 			return
 		}
 
-		if err := invokeFWMarker(sb.Key(), vip, fwMark, true); err != nil {
+		var iPorts []*PortConfig
+		if sb.ingress {
+			iPorts = ingressPorts
+			if err := programIngress(gwIP, iPorts, true); err != nil {
+				logrus.Errorf("Failed to delete ingress: %v", err)
+				return
+			}
+		}
+
+		if err := invokeFWMarker(sb.Key(), vip, fwMark, iPorts, eIP, true); err != nil {
 			logrus.Errorf("Failed to add firewall mark rule in sbox %s: %v", sb.Key(), err)
 			return
 		}
 	}
 }
 
+func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) error {
+	addDelOpt := "-A"
+	if isDelete {
+		addDelOpt = "-D"
+	}
+
+	for _, iPort := range ingressPorts {
+		rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j DNAT --to-destination %s:%d",
+			addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.NodePort, gwIP, iPort.NodePort))
+		if err := iptables.RawCombinedOutput(rule...); err != nil {
+			return fmt.Errorf("setting up rule failed, %v: %v", rule, err)
+		}
+	}
+
+	return nil
+}
+
 // Invoke fwmarker reexec routine to mark vip destined packets with
 // the passed firewall mark.
-func invokeFWMarker(path string, vip net.IP, fwMark uint32, isDelete bool) error {
+func invokeFWMarker(path string, vip net.IP, fwMark uint32, ingressPorts []*PortConfig, eIP *net.IPNet, isDelete bool) error {
+	var ingressPortsFile string
+	if len(ingressPorts) != 0 {
+		f, err := ioutil.TempFile("", "port_configs")
+		if err != nil {
+			return err
+		}
+
+		buf, err := proto.Marshal(&EndpointRecord{
+			IngressPorts: ingressPorts,
+		})
+
+		n, err := f.Write(buf)
+		if err != nil {
+			f.Close()
+			return err
+		}
+
+		if n < len(buf) {
+			f.Close()
+			return io.ErrShortWrite
+		}
+
+		ingressPortsFile = f.Name()
+		f.Close()
+	}
+
 	addDelOpt := "-A"
 	if isDelete {
 		addDelOpt = "-D"
@@ -312,13 +411,15 @@ func invokeFWMarker(path string, vip net.IP, fwMark uint32, isDelete bool) error
 
 	cmd := &exec.Cmd{
 		Path:   reexec.Self(),
-		Args:   append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt),
+		Args:   append([]string{"fwmarker"}, path, vip.String(), fmt.Sprintf("%d", fwMark), addDelOpt, ingressPortsFile, eIP.IP.String()),
 		Stdout: os.Stdout,
 		Stderr: os.Stderr,
 	}
+
 	if err := cmd.Run(); err != nil {
 		return fmt.Errorf("reexec failed: %v", err)
 	}
+
 	return nil
 }
 
@@ -327,11 +428,29 @@ func fwMarker() {
 	runtime.LockOSThread()
 	defer runtime.UnlockOSThread()
 
-	if len(os.Args) < 5 {
+	if len(os.Args) < 7 {
 		logrus.Error("invalid number of arguments..")
 		os.Exit(1)
 	}
 
+	var ingressPorts []*PortConfig
+	if os.Args[5] != "" {
+		buf, err := ioutil.ReadFile(os.Args[5])
+		if err != nil {
+			logrus.Errorf("Failed to read ports config file: %v", err)
+			os.Exit(6)
+		}
+
+		var epRec EndpointRecord
+		err = proto.Unmarshal(buf, &epRec)
+		if err != nil {
+			logrus.Errorf("Failed to unmarshal ports config data: %v", err)
+			os.Exit(7)
+		}
+
+		ingressPorts = epRec.IngressPorts
+	}
+
 	vip := os.Args[2]
 	fwMark, err := strconv.ParseUint(os.Args[3], 10, 32)
 	if err != nil {
@@ -340,6 +459,17 @@ func fwMarker() {
 	}
 	addDelOpt := os.Args[4]
 
+	rules := [][]string{}
+	for _, iPort := range ingressPorts {
+		rule := strings.Fields(fmt.Sprintf("-t nat %s PREROUTING -p %s --dport %d -j REDIRECT --to-port %d",
+			addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.NodePort, iPort.Port))
+		rules = append(rules, rule)
+
+		rule = strings.Fields(fmt.Sprintf("-t mangle %s PREROUTING -p %s --dport %d -j MARK --set-mark %d",
+			addDelOpt, strings.ToLower(PortConfig_Protocol_name[int32(iPort.Protocol)]), iPort.NodePort, fwMark))
+		rules = append(rules, rule)
+	}
+
 	ns, err := netns.GetFromPath(os.Args[1])
 	if err != nil {
 		logrus.Errorf("failed get network namespace %q: %v", os.Args[1], err)
@@ -352,9 +482,27 @@ func fwMarker() {
 		os.Exit(4)
 	}
 
+	if len(ingressPorts) != 0 && addDelOpt == "-A" {
+		ruleParams := strings.Fields(fmt.Sprintf("-m ipvs --ipvs -j SNAT --to-source %s", os.Args[6]))
+		if !iptables.Exists("nat", "POSTROUTING", ruleParams...) {
+			rule := append(strings.Fields("-t nat -A POSTROUTING"), ruleParams...)
+			rules = append(rules, rule)
+
+			err := ioutil.WriteFile("/proc/sys/net/ipv4/vs/conntrack", []byte{'1', '\n'}, 0644)
+			if err != nil {
+				logrus.Errorf("Failed to write to /proc/sys/net/ipv4/vs/conntrack: %v", err)
+				os.Exit(8)
+			}
+		}
+	}
+
 	rule := strings.Fields(fmt.Sprintf("-t mangle %s OUTPUT -d %s/32 -j MARK --set-mark %d", addDelOpt, vip, fwMark))
-	if err := iptables.RawCombinedOutputNative(rule...); err != nil {
-		logrus.Errorf("setting up rule failed, %v: %v", rule, err)
-		os.Exit(5)
+	rules = append(rules, rule)
+
+	for _, rule := range rules {
+		if err := iptables.RawCombinedOutputNative(rule...); err != nil {
+			logrus.Errorf("setting up rule failed, %v: %v", rule, err)
+			os.Exit(5)
+		}
 	}
 }

+ 2 - 2
libnetwork/service_unsupported.go

@@ -7,11 +7,11 @@ import (
 	"net"
 )
 
-func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
+func (c *controller) addServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, ip net.IP) error {
 	return fmt.Errorf("not supported")
 }
 
-func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ip net.IP) error {
+func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, ingressPorts []*PortConfig, ip net.IP) error {
 	return fmt.Errorf("not supported")
 }