Przeglądaj źródła

Merge pull request #28403 from cpuguy83/logging_plugins

Implement plugins for logging drivers
Sebastiaan van Stijn 8 lat temu
rodzic
commit
28334c1d82

+ 449 - 0
api/types/plugins/logdriver/entry.pb.go

@@ -0,0 +1,449 @@
+// Code generated by protoc-gen-gogo.
+// source: entry.proto
+// DO NOT EDIT!
+
+/*
+	Package logdriver is a generated protocol buffer package.
+
+	It is generated from these files:
+		entry.proto
+
+	It has these top-level messages:
+		LogEntry
+*/
+package logdriver
+
+import proto "github.com/gogo/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+import io "io"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
+
+type LogEntry struct {
+	Source   string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"`
+	TimeNano int64  `protobuf:"varint,2,opt,name=time_nano,json=timeNano,proto3" json:"time_nano,omitempty"`
+	Line     []byte `protobuf:"bytes,3,opt,name=line,proto3" json:"line,omitempty"`
+	Partial  bool   `protobuf:"varint,4,opt,name=partial,proto3" json:"partial,omitempty"`
+}
+
+func (m *LogEntry) Reset()                    { *m = LogEntry{} }
+func (m *LogEntry) String() string            { return proto.CompactTextString(m) }
+func (*LogEntry) ProtoMessage()               {}
+func (*LogEntry) Descriptor() ([]byte, []int) { return fileDescriptorEntry, []int{0} }
+
+func (m *LogEntry) GetSource() string {
+	if m != nil {
+		return m.Source
+	}
+	return ""
+}
+
+func (m *LogEntry) GetTimeNano() int64 {
+	if m != nil {
+		return m.TimeNano
+	}
+	return 0
+}
+
+func (m *LogEntry) GetLine() []byte {
+	if m != nil {
+		return m.Line
+	}
+	return nil
+}
+
+func (m *LogEntry) GetPartial() bool {
+	if m != nil {
+		return m.Partial
+	}
+	return false
+}
+
+func init() {
+	proto.RegisterType((*LogEntry)(nil), "LogEntry")
+}
+func (m *LogEntry) Marshal() (dAtA []byte, err error) {
+	size := m.Size()
+	dAtA = make([]byte, size)
+	n, err := m.MarshalTo(dAtA)
+	if err != nil {
+		return nil, err
+	}
+	return dAtA[:n], nil
+}
+
+func (m *LogEntry) MarshalTo(dAtA []byte) (int, error) {
+	var i int
+	_ = i
+	var l int
+	_ = l
+	if len(m.Source) > 0 {
+		dAtA[i] = 0xa
+		i++
+		i = encodeVarintEntry(dAtA, i, uint64(len(m.Source)))
+		i += copy(dAtA[i:], m.Source)
+	}
+	if m.TimeNano != 0 {
+		dAtA[i] = 0x10
+		i++
+		i = encodeVarintEntry(dAtA, i, uint64(m.TimeNano))
+	}
+	if len(m.Line) > 0 {
+		dAtA[i] = 0x1a
+		i++
+		i = encodeVarintEntry(dAtA, i, uint64(len(m.Line)))
+		i += copy(dAtA[i:], m.Line)
+	}
+	if m.Partial {
+		dAtA[i] = 0x20
+		i++
+		if m.Partial {
+			dAtA[i] = 1
+		} else {
+			dAtA[i] = 0
+		}
+		i++
+	}
+	return i, nil
+}
+
+func encodeFixed64Entry(dAtA []byte, offset int, v uint64) int {
+	dAtA[offset] = uint8(v)
+	dAtA[offset+1] = uint8(v >> 8)
+	dAtA[offset+2] = uint8(v >> 16)
+	dAtA[offset+3] = uint8(v >> 24)
+	dAtA[offset+4] = uint8(v >> 32)
+	dAtA[offset+5] = uint8(v >> 40)
+	dAtA[offset+6] = uint8(v >> 48)
+	dAtA[offset+7] = uint8(v >> 56)
+	return offset + 8
+}
+func encodeFixed32Entry(dAtA []byte, offset int, v uint32) int {
+	dAtA[offset] = uint8(v)
+	dAtA[offset+1] = uint8(v >> 8)
+	dAtA[offset+2] = uint8(v >> 16)
+	dAtA[offset+3] = uint8(v >> 24)
+	return offset + 4
+}
+func encodeVarintEntry(dAtA []byte, offset int, v uint64) int {
+	for v >= 1<<7 {
+		dAtA[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	dAtA[offset] = uint8(v)
+	return offset + 1
+}
+func (m *LogEntry) Size() (n int) {
+	var l int
+	_ = l
+	l = len(m.Source)
+	if l > 0 {
+		n += 1 + l + sovEntry(uint64(l))
+	}
+	if m.TimeNano != 0 {
+		n += 1 + sovEntry(uint64(m.TimeNano))
+	}
+	l = len(m.Line)
+	if l > 0 {
+		n += 1 + l + sovEntry(uint64(l))
+	}
+	if m.Partial {
+		n += 2
+	}
+	return n
+}
+
+func sovEntry(x uint64) (n int) {
+	for {
+		n++
+		x >>= 7
+		if x == 0 {
+			break
+		}
+	}
+	return n
+}
+func sozEntry(x uint64) (n int) {
+	return sovEntry(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *LogEntry) Unmarshal(dAtA []byte) error {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		preIndex := iNdEx
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return ErrIntOverflowEntry
+			}
+			if iNdEx >= l {
+				return io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		fieldNum := int32(wire >> 3)
+		wireType := int(wire & 0x7)
+		if wireType == 4 {
+			return fmt.Errorf("proto: LogEntry: wiretype end group for non-group")
+		}
+		if fieldNum <= 0 {
+			return fmt.Errorf("proto: LogEntry: illegal tag %d (wire type %d)", fieldNum, wire)
+		}
+		switch fieldNum {
+		case 1:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Source", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowEntry
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= (uint64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthEntry
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Source = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 2:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field TimeNano", wireType)
+			}
+			m.TimeNano = 0
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowEntry
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				m.TimeNano |= (int64(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+		case 3:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType)
+			}
+			var byteLen int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowEntry
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				byteLen |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			if byteLen < 0 {
+				return ErrInvalidLengthEntry
+			}
+			postIndex := iNdEx + byteLen
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.Line = append(m.Line[:0], dAtA[iNdEx:postIndex]...)
+			if m.Line == nil {
+				m.Line = []byte{}
+			}
+			iNdEx = postIndex
+		case 4:
+			if wireType != 0 {
+				return fmt.Errorf("proto: wrong wireType = %d for field Partial", wireType)
+			}
+			var v int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowEntry
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				v |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			m.Partial = bool(v != 0)
+		default:
+			iNdEx = preIndex
+			skippy, err := skipEntry(dAtA[iNdEx:])
+			if err != nil {
+				return err
+			}
+			if skippy < 0 {
+				return ErrInvalidLengthEntry
+			}
+			if (iNdEx + skippy) > l {
+				return io.ErrUnexpectedEOF
+			}
+			iNdEx += skippy
+		}
+	}
+
+	if iNdEx > l {
+		return io.ErrUnexpectedEOF
+	}
+	return nil
+}
+func skipEntry(dAtA []byte) (n int, err error) {
+	l := len(dAtA)
+	iNdEx := 0
+	for iNdEx < l {
+		var wire uint64
+		for shift := uint(0); ; shift += 7 {
+			if shift >= 64 {
+				return 0, ErrIntOverflowEntry
+			}
+			if iNdEx >= l {
+				return 0, io.ErrUnexpectedEOF
+			}
+			b := dAtA[iNdEx]
+			iNdEx++
+			wire |= (uint64(b) & 0x7F) << shift
+			if b < 0x80 {
+				break
+			}
+		}
+		wireType := int(wire & 0x7)
+		switch wireType {
+		case 0:
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return 0, ErrIntOverflowEntry
+				}
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				iNdEx++
+				if dAtA[iNdEx-1] < 0x80 {
+					break
+				}
+			}
+			return iNdEx, nil
+		case 1:
+			iNdEx += 8
+			return iNdEx, nil
+		case 2:
+			var length int
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return 0, ErrIntOverflowEntry
+				}
+				if iNdEx >= l {
+					return 0, io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				length |= (int(b) & 0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			iNdEx += length
+			if length < 0 {
+				return 0, ErrInvalidLengthEntry
+			}
+			return iNdEx, nil
+		case 3:
+			for {
+				var innerWire uint64
+				var start int = iNdEx
+				for shift := uint(0); ; shift += 7 {
+					if shift >= 64 {
+						return 0, ErrIntOverflowEntry
+					}
+					if iNdEx >= l {
+						return 0, io.ErrUnexpectedEOF
+					}
+					b := dAtA[iNdEx]
+					iNdEx++
+					innerWire |= (uint64(b) & 0x7F) << shift
+					if b < 0x80 {
+						break
+					}
+				}
+				innerWireType := int(innerWire & 0x7)
+				if innerWireType == 4 {
+					break
+				}
+				next, err := skipEntry(dAtA[start:])
+				if err != nil {
+					return 0, err
+				}
+				iNdEx = start + next
+			}
+			return iNdEx, nil
+		case 4:
+			return iNdEx, nil
+		case 5:
+			iNdEx += 4
+			return iNdEx, nil
+		default:
+			return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+		}
+	}
+	panic("unreachable")
+}
+
+var (
+	ErrInvalidLengthEntry = fmt.Errorf("proto: negative length found during unmarshaling")
+	ErrIntOverflowEntry   = fmt.Errorf("proto: integer overflow")
+)
+
+func init() { proto.RegisterFile("entry.proto", fileDescriptorEntry) }
+
+var fileDescriptorEntry = []byte{
+	// 149 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0xcd, 0x2b, 0x29,
+	0xaa, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0xca, 0xe5, 0xe2, 0xf0, 0xc9, 0x4f, 0x77, 0x05,
+	0x89, 0x08, 0x89, 0x71, 0xb1, 0x15, 0xe7, 0x97, 0x16, 0x25, 0xa7, 0x4a, 0x30, 0x2a, 0x30, 0x6a,
+	0x70, 0x06, 0x41, 0x79, 0x42, 0xd2, 0x5c, 0x9c, 0x25, 0x99, 0xb9, 0xa9, 0xf1, 0x79, 0x89, 0x79,
+	0xf9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x1c, 0x20, 0x01, 0xbf, 0xc4, 0xbc, 0x7c, 0x21,
+	0x21, 0x2e, 0x96, 0x9c, 0xcc, 0xbc, 0x54, 0x09, 0x66, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x30, 0x5b,
+	0x48, 0x82, 0x8b, 0xbd, 0x20, 0xb1, 0xa8, 0x24, 0x33, 0x31, 0x47, 0x82, 0x45, 0x81, 0x51, 0x83,
+	0x23, 0x08, 0xc6, 0x75, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f,
+	0xe4, 0x18, 0x93, 0xd8, 0xc0, 0x6e, 0x30, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x2d, 0x24, 0x5a,
+	0xd4, 0x92, 0x00, 0x00, 0x00,
+}

+ 8 - 0
api/types/plugins/logdriver/entry.proto

@@ -0,0 +1,8 @@
+syntax = "proto3";
+
+message LogEntry {
+	string source = 1;
+	int64 time_nano = 2;
+	bytes line = 3;
+	bool partial = 4;
+}

+ 3 - 0
api/types/plugins/logdriver/gen.go

@@ -0,0 +1,3 @@
+//go:generate protoc --gogofast_out=import_path=github.com/docker/docker/api/types/plugins/logdriver:. entry.proto
+
+package logdriver

+ 87 - 0
api/types/plugins/logdriver/io.go

@@ -0,0 +1,87 @@
+package logdriver
+
+import (
+	"encoding/binary"
+	"io"
+)
+
+const binaryEncodeLen = 4
+
+// LogEntryEncoder encodes a LogEntry to a protobuf stream
+// The stream should look like:
+//
+// [uint32 binary encoded message size][protobuf message]
+//
+// To decode an entry, read the first 4 bytes to get the size of the entry,
+// then read `size` bytes from the stream.
+type LogEntryEncoder interface {
+	Encode(*LogEntry) error
+}
+
+// NewLogEntryEncoder creates a protobuf stream encoder for log entries.
+// This is used to write out  log entries to a stream.
+func NewLogEntryEncoder(w io.Writer) LogEntryEncoder {
+	return &logEntryEncoder{
+		w:   w,
+		buf: make([]byte, 1024),
+	}
+}
+
+type logEntryEncoder struct {
+	buf []byte
+	w   io.Writer
+}
+
+func (e *logEntryEncoder) Encode(l *LogEntry) error {
+	n := l.Size()
+
+	total := n + binaryEncodeLen
+	if total > len(e.buf) {
+		e.buf = make([]byte, total)
+	}
+	binary.BigEndian.PutUint32(e.buf, uint32(n))
+
+	if _, err := l.MarshalTo(e.buf[binaryEncodeLen:]); err != nil {
+		return err
+	}
+	_, err := e.w.Write(e.buf[:total])
+	return err
+}
+
+// LogEntryDecoder decodes log entries from a stream
+// It is expected that the wire format is as defined by LogEntryEncoder.
+type LogEntryDecoder interface {
+	Decode(*LogEntry) error
+}
+
+// NewLogEntryDecoder creates a new stream decoder for log entries
+func NewLogEntryDecoder(r io.Reader) LogEntryDecoder {
+	return &logEntryDecoder{
+		lenBuf: make([]byte, binaryEncodeLen),
+		buf:    make([]byte, 1024),
+		r:      r,
+	}
+}
+
+type logEntryDecoder struct {
+	r      io.Reader
+	lenBuf []byte
+	buf    []byte
+}
+
+func (d *logEntryDecoder) Decode(l *LogEntry) error {
+	_, err := io.ReadFull(d.r, d.lenBuf)
+	if err != nil {
+		return err
+	}
+
+	size := int(binary.BigEndian.Uint32(d.lenBuf))
+	if len(d.buf) < size {
+		d.buf = make([]byte, size)
+	}
+
+	if _, err := io.ReadFull(d.r, d.buf[:size]); err != nil {
+		return err
+	}
+	return l.Unmarshal(d.buf[:size])
+}

+ 2 - 0
daemon/daemon.go

@@ -27,6 +27,7 @@ import (
 	"github.com/docker/docker/daemon/discovery"
 	"github.com/docker/docker/daemon/discovery"
 	"github.com/docker/docker/daemon/events"
 	"github.com/docker/docker/daemon/events"
 	"github.com/docker/docker/daemon/exec"
 	"github.com/docker/docker/daemon/exec"
+	"github.com/docker/docker/daemon/logger"
 	// register graph drivers
 	// register graph drivers
 	_ "github.com/docker/docker/daemon/graphdriver/register"
 	_ "github.com/docker/docker/daemon/graphdriver/register"
 	"github.com/docker/docker/daemon/initlayer"
 	"github.com/docker/docker/daemon/initlayer"
@@ -586,6 +587,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe
 
 
 	d.RegistryService = registryService
 	d.RegistryService = registryService
 	d.PluginStore = pluginStore
 	d.PluginStore = pluginStore
+	logger.RegisterPluginGetter(d.PluginStore)
 
 
 	// Plugin system initialization should happen before restore. Do not change order.
 	// Plugin system initialization should happen before restore. Do not change order.
 	d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{
 	d.pluginManager, err = plugin.NewManager(plugin.ManagerConfig{

+ 135 - 0
daemon/logger/adapter.go

@@ -0,0 +1,135 @@
+package logger
+
+import (
+	"io"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/api/types/plugins/logdriver"
+	"github.com/docker/docker/pkg/plugingetter"
+	"github.com/pkg/errors"
+)
+
+// pluginAdapter takes a plugin and implements the Logger interface for logger
+// instances
+type pluginAdapter struct {
+	driverName   string
+	id           string
+	plugin       logPlugin
+	fifoPath     string
+	capabilities Capability
+	logInfo      Info
+
+	// synchronize access to the log stream and shared buffer
+	mu     sync.Mutex
+	enc    logdriver.LogEntryEncoder
+	stream io.WriteCloser
+	// buf is shared for each `Log()` call to reduce allocations.
+	// buf must be protected by mutex
+	buf logdriver.LogEntry
+}
+
+func (a *pluginAdapter) Log(msg *Message) error {
+	a.mu.Lock()
+
+	a.buf.Line = msg.Line
+	a.buf.TimeNano = msg.Timestamp.UnixNano()
+	a.buf.Partial = msg.Partial
+	a.buf.Source = msg.Source
+
+	err := a.enc.Encode(&a.buf)
+	a.buf.Reset()
+
+	a.mu.Unlock()
+
+	PutMessage(msg)
+	return err
+}
+
+func (a *pluginAdapter) Name() string {
+	return a.driverName
+}
+
+func (a *pluginAdapter) Close() error {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+
+	if err := a.plugin.StopLogging(a.fifoPath); err != nil {
+		return err
+	}
+
+	if err := a.stream.Close(); err != nil {
+		logrus.WithError(err).Error("error closing plugin fifo")
+	}
+	if err := os.Remove(a.fifoPath); err != nil && !os.IsNotExist(err) {
+		logrus.WithError(err).Error("error cleaning up plugin fifo")
+	}
+
+	// may be nil, especially for unit tests
+	if pluginGetter != nil {
+		pluginGetter.Get(a.Name(), extName, plugingetter.Release)
+	}
+	return nil
+}
+
+type pluginAdapterWithRead struct {
+	*pluginAdapter
+}
+
+func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
+	watcher := NewLogWatcher()
+
+	go func() {
+		defer close(watcher.Msg)
+		stream, err := a.plugin.ReadLogs(a.logInfo, config)
+		if err != nil {
+			watcher.Err <- errors.Wrap(err, "error getting log reader")
+			return
+		}
+		defer stream.Close()
+
+		dec := logdriver.NewLogEntryDecoder(stream)
+		for {
+			select {
+			case <-watcher.WatchClose():
+				return
+			default:
+			}
+
+			var buf logdriver.LogEntry
+			if err := dec.Decode(&buf); err != nil {
+				if err == io.EOF {
+					return
+				}
+				select {
+				case watcher.Err <- errors.Wrap(err, "error decoding log message"):
+				case <-watcher.WatchClose():
+				}
+				return
+			}
+
+			msg := &Message{
+				Timestamp: time.Unix(0, buf.TimeNano),
+				Line:      buf.Line,
+				Source:    buf.Source,
+			}
+
+			// plugin should handle this, but check just in case
+			if !config.Since.IsZero() && msg.Timestamp.Before(config.Since) {
+				continue
+			}
+
+			select {
+			case watcher.Msg <- msg:
+			case <-watcher.WatchClose():
+				// make sure the message we consumed is sent
+				watcher.Msg <- msg
+				return
+			}
+		}
+	}()
+
+	return watcher
+}

+ 208 - 0
daemon/logger/adapter_test.go

@@ -0,0 +1,208 @@
+package logger
+
+import (
+	"bytes"
+	"encoding/binary"
+	"io"
+	"io/ioutil"
+	"os"
+	"runtime"
+	"testing"
+	"time"
+
+	"github.com/docker/docker/api/types/plugins/logdriver"
+	protoio "github.com/gogo/protobuf/io"
+)
+
+// mockLoggingPlugin implements the loggingPlugin interface for testing purposes
+// it only supports a single log stream
+type mockLoggingPlugin struct {
+	inStream io.ReadCloser
+	f        *os.File
+	closed   chan struct{}
+	t        *testing.T
+}
+
+func (l *mockLoggingPlugin) StartLogging(file string, info Info) error {
+	go func() {
+		io.Copy(l.f, l.inStream)
+		close(l.closed)
+	}()
+	return nil
+}
+
+func (l *mockLoggingPlugin) StopLogging(file string) error {
+	l.inStream.Close()
+	l.f.Close()
+	os.Remove(l.f.Name())
+	return nil
+}
+
+func (l *mockLoggingPlugin) Capabilities() (cap Capability, err error) {
+	return Capability{ReadLogs: true}, nil
+}
+
+func (l *mockLoggingPlugin) ReadLogs(info Info, config ReadConfig) (io.ReadCloser, error) {
+	r, w := io.Pipe()
+	f, err := os.Open(l.f.Name())
+	if err != nil {
+		return nil, err
+	}
+	go func() {
+		defer f.Close()
+		dec := protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
+		enc := logdriver.NewLogEntryEncoder(w)
+
+		for {
+			select {
+			case <-l.closed:
+				w.Close()
+				return
+			default:
+			}
+
+			var msg logdriver.LogEntry
+			if err := dec.ReadMsg(&msg); err != nil {
+				if err == io.EOF {
+					if !config.Follow {
+						w.Close()
+						return
+					}
+					dec = protoio.NewUint32DelimitedReader(f, binary.BigEndian, 1e6)
+					continue
+				}
+
+				l.t.Fatal(err)
+				continue
+			}
+
+			if err := enc.Encode(&msg); err != nil {
+				w.CloseWithError(err)
+				return
+			}
+		}
+	}()
+
+	return r, nil
+}
+
+func newMockPluginAdapter(t *testing.T) Logger {
+	r, w := io.Pipe()
+	f, err := ioutil.TempFile("", "mock-plugin-adapter")
+	if err != nil {
+		t.Fatal(err)
+	}
+	enc := logdriver.NewLogEntryEncoder(w)
+	a := &pluginAdapterWithRead{
+		&pluginAdapter{
+			plugin: &mockLoggingPlugin{
+				inStream: r,
+				f:        f,
+				closed:   make(chan struct{}),
+				t:        t,
+			},
+			stream: w,
+			enc:    enc,
+		},
+	}
+	a.plugin.StartLogging("", Info{})
+	return a
+}
+
+func TestAdapterReadLogs(t *testing.T) {
+	l := newMockPluginAdapter(t)
+
+	testMsg := []Message{
+		{Line: []byte("Are you the keymaker?"), Timestamp: time.Now()},
+		{Line: []byte("Follow the white rabbit"), Timestamp: time.Now()},
+	}
+	for _, msg := range testMsg {
+		m := msg.copy()
+		if err := l.Log(m); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	lr, ok := l.(LogReader)
+	if !ok {
+		t.Fatal("expected log reader")
+	}
+
+	lw := lr.ReadLogs(ReadConfig{})
+
+	for _, x := range testMsg {
+		select {
+		case msg := <-lw.Msg:
+			testMessageEqual(t, &x, msg)
+		case <-time.After(10 * time.Millisecond):
+			t.Fatal("timeout reading logs")
+		}
+	}
+
+	select {
+	case _, ok := <-lw.Msg:
+		if ok {
+			t.Fatal("expected message channel to be closed")
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for message channel to close")
+
+	}
+	lw.Close()
+
+	lw = lr.ReadLogs(ReadConfig{Follow: true})
+	for _, x := range testMsg {
+		select {
+		case msg := <-lw.Msg:
+			testMessageEqual(t, &x, msg)
+		case <-time.After(10 * time.Second):
+			t.Fatal("timeout reading logs")
+		}
+	}
+
+	x := Message{Line: []byte("Too infinity and beyond!"), Timestamp: time.Now()}
+
+	if err := l.Log(x.copy()); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case msg, ok := <-lw.Msg:
+		if !ok {
+			t.Fatal("message channel unexpectedly closed")
+		}
+		testMessageEqual(t, &x, msg)
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout reading logs")
+	}
+
+	l.Close()
+	select {
+	case msg, ok := <-lw.Msg:
+		if ok {
+			t.Fatal("expected message channel to be closed")
+		}
+		if msg != nil {
+			t.Fatal("expected nil message")
+		}
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for logger to close")
+	}
+}
+
+func testMessageEqual(t *testing.T, a, b *Message) {
+	_, _, n, _ := runtime.Caller(1)
+	errFmt := "line %d: expected same messages:\nwant: %+v\nhave: %+v"
+
+	if !bytes.Equal(a.Line, b.Line) {
+		t.Fatalf(errFmt, n, *a, *b)
+	}
+
+	if a.Timestamp.UnixNano() != b.Timestamp.UnixNano() {
+		t.Fatalf(errFmt, n, *a, *b)
+	}
+
+	if a.Source != b.Source {
+		t.Fatalf(errFmt, n, *a, *b)
+	}
+}

+ 13 - 3
daemon/logger/factory.go

@@ -5,6 +5,7 @@ import (
 	"sync"
 	"sync"
 
 
 	containertypes "github.com/docker/docker/api/types/container"
 	containertypes "github.com/docker/docker/api/types/container"
+	"github.com/docker/docker/pkg/plugingetter"
 	units "github.com/docker/go-units"
 	units "github.com/docker/go-units"
 	"github.com/pkg/errors"
 	"github.com/pkg/errors"
 )
 )
@@ -37,6 +38,13 @@ func (lf *logdriverFactory) driverRegistered(name string) bool {
 	lf.m.Lock()
 	lf.m.Lock()
 	_, ok := lf.registry[name]
 	_, ok := lf.registry[name]
 	lf.m.Unlock()
 	lf.m.Unlock()
+	if !ok {
+		if pluginGetter != nil { // this can be nil when the init functions are running
+			if l, _ := getPlugin(name, plugingetter.Lookup); l != nil {
+				return true
+			}
+		}
+	}
 	return ok
 	return ok
 }
 }
 
 
@@ -56,10 +64,12 @@ func (lf *logdriverFactory) get(name string) (Creator, error) {
 	defer lf.m.Unlock()
 	defer lf.m.Unlock()
 
 
 	c, ok := lf.registry[name]
 	c, ok := lf.registry[name]
-	if !ok {
-		return c, fmt.Errorf("logger: no log driver named '%s' is registered", name)
+	if ok {
+		return c, nil
 	}
 	}
-	return c, nil
+
+	c, err := getPlugin(name, plugingetter.Acquire)
+	return c, errors.Wrapf(err, "logger: no log driver named '%s' is registered", name)
 }
 }
 
 
 func (lf *logdriverFactory) getLogOptValidator(name string) LogOptValidator {
 func (lf *logdriverFactory) getLogOptValidator(name string) LogOptValidator {

+ 8 - 0
daemon/logger/logger.go

@@ -126,3 +126,11 @@ func (w *LogWatcher) Close() {
 func (w *LogWatcher) WatchClose() <-chan struct{} {
 func (w *LogWatcher) WatchClose() <-chan struct{} {
 	return w.closeNotifier
 	return w.closeNotifier
 }
 }
+
+// Capability defines the list of capabilties that a driver can implement
+// These capabilities are not required to be a logging driver, however do
+// determine how a logging driver can be used
+type Capability struct {
+	// Determines if a log driver can read back logs
+	ReadLogs bool
+}

+ 19 - 0
daemon/logger/logger_test.go

@@ -0,0 +1,19 @@
+package logger
+
+func (m *Message) copy() *Message {
+	msg := &Message{
+		Source:    m.Source,
+		Partial:   m.Partial,
+		Timestamp: m.Timestamp,
+	}
+
+	if m.Attrs != nil {
+		msg.Attrs = make(map[string]string, len(m.Attrs))
+		for k, v := range m.Attrs {
+			msg.Attrs[k] = v
+		}
+	}
+
+	msg.Line = append(make([]byte, 0, len(m.Line)), m.Line...)
+	return msg
+}

+ 89 - 0
daemon/logger/plugin.go

@@ -0,0 +1,89 @@
+package logger
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/docker/docker/api/types/plugins/logdriver"
+	getter "github.com/docker/docker/pkg/plugingetter"
+	"github.com/docker/docker/pkg/stringid"
+	"github.com/pkg/errors"
+)
+
+var pluginGetter getter.PluginGetter
+
+const extName = "LogDriver"
+
+// logPlugin defines the available functions that logging plugins must implement.
+type logPlugin interface {
+	StartLogging(streamPath string, info Info) (err error)
+	StopLogging(streamPath string) (err error)
+	Capabilities() (cap Capability, err error)
+	ReadLogs(info Info, config ReadConfig) (stream io.ReadCloser, err error)
+}
+
+// RegisterPluginGetter sets the plugingetter
+func RegisterPluginGetter(plugingetter getter.PluginGetter) {
+	pluginGetter = plugingetter
+}
+
+// GetDriver returns a logging driver by its name.
+// If the driver is empty, it looks for the local driver.
+func getPlugin(name string, mode int) (Creator, error) {
+	p, err := pluginGetter.Get(name, extName, mode)
+	if err != nil {
+		return nil, fmt.Errorf("error looking up logging plugin %s: %v", name, err)
+	}
+
+	d := &logPluginProxy{p.Client()}
+	return makePluginCreator(name, d, p.BasePath()), nil
+}
+
+func makePluginCreator(name string, l *logPluginProxy, basePath string) Creator {
+	return func(logCtx Info) (logger Logger, err error) {
+		defer func() {
+			if err != nil {
+				pluginGetter.Get(name, extName, getter.Release)
+			}
+		}()
+		root := filepath.Join(basePath, "run", "docker", "logging")
+		if err := os.MkdirAll(root, 0700); err != nil {
+			return nil, err
+		}
+
+		id := stringid.GenerateNonCryptoID()
+		a := &pluginAdapter{
+			driverName: name,
+			id:         id,
+			plugin:     l,
+			fifoPath:   filepath.Join(root, id),
+			logInfo:    logCtx,
+		}
+
+		cap, err := a.plugin.Capabilities()
+		if err == nil {
+			a.capabilities = cap
+		}
+
+		stream, err := openPluginStream(a)
+		if err != nil {
+			return nil, err
+		}
+
+		a.stream = stream
+		a.enc = logdriver.NewLogEntryEncoder(a.stream)
+
+		if err := l.StartLogging(strings.TrimPrefix(a.fifoPath, basePath), logCtx); err != nil {
+			return nil, errors.Wrapf(err, "error creating logger")
+		}
+
+		if cap.ReadLogs {
+			return &pluginAdapterWithRead{a}, nil
+		}
+
+		return a, nil
+	}
+}

+ 20 - 0
daemon/logger/plugin_unix.go

@@ -0,0 +1,20 @@
+// +build linux solaris freebsd
+
+package logger
+
+import (
+	"context"
+	"io"
+
+	"github.com/pkg/errors"
+	"github.com/tonistiigi/fifo"
+	"golang.org/x/sys/unix"
+)
+
+func openPluginStream(a *pluginAdapter) (io.WriteCloser, error) {
+	f, err := fifo.OpenFifo(context.Background(), a.fifoPath, unix.O_WRONLY|unix.O_CREAT|unix.O_NONBLOCK, 0700)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error creating i/o pipe for log plugin: %s", a.Name())
+	}
+	return f, nil
+}

+ 12 - 0
daemon/logger/plugin_unsupported.go

@@ -0,0 +1,12 @@
+// +build !linux,!solaris,!freebsd
+
+package logger
+
+import (
+	"errors"
+	"io"
+)
+
+func openPluginStream(a *pluginAdapter) (io.WriteCloser, error) {
+	return nil, errors.New("log plugin not supported")
+}

+ 107 - 0
daemon/logger/proxy.go

@@ -0,0 +1,107 @@
+package logger
+
+import (
+	"errors"
+	"io"
+)
+
+type client interface {
+	Call(string, interface{}, interface{}) error
+	Stream(string, interface{}) (io.ReadCloser, error)
+}
+
+type logPluginProxy struct {
+	client
+}
+
+type logPluginProxyStartLoggingRequest struct {
+	File string
+	Info Info
+}
+
+type logPluginProxyStartLoggingResponse struct {
+	Err string
+}
+
+func (pp *logPluginProxy) StartLogging(file string, info Info) (err error) {
+	var (
+		req logPluginProxyStartLoggingRequest
+		ret logPluginProxyStartLoggingResponse
+	)
+
+	req.File = file
+	req.Info = info
+	if err = pp.Call("LogDriver.StartLogging", req, &ret); err != nil {
+		return
+	}
+
+	if ret.Err != "" {
+		err = errors.New(ret.Err)
+	}
+
+	return
+}
+
+type logPluginProxyStopLoggingRequest struct {
+	File string
+}
+
+type logPluginProxyStopLoggingResponse struct {
+	Err string
+}
+
+func (pp *logPluginProxy) StopLogging(file string) (err error) {
+	var (
+		req logPluginProxyStopLoggingRequest
+		ret logPluginProxyStopLoggingResponse
+	)
+
+	req.File = file
+	if err = pp.Call("LogDriver.StopLogging", req, &ret); err != nil {
+		return
+	}
+
+	if ret.Err != "" {
+		err = errors.New(ret.Err)
+	}
+
+	return
+}
+
+type logPluginProxyCapabilitiesResponse struct {
+	Cap Capability
+	Err string
+}
+
+func (pp *logPluginProxy) Capabilities() (cap Capability, err error) {
+	var (
+		ret logPluginProxyCapabilitiesResponse
+	)
+
+	if err = pp.Call("LogDriver.Capabilities", nil, &ret); err != nil {
+		return
+	}
+
+	cap = ret.Cap
+
+	if ret.Err != "" {
+		err = errors.New(ret.Err)
+	}
+
+	return
+}
+
+type logPluginProxyReadLogsRequest struct {
+	Info   Info
+	Config ReadConfig
+}
+
+func (pp *logPluginProxy) ReadLogs(info Info, config ReadConfig) (stream io.ReadCloser, err error) {
+	var (
+		req logPluginProxyReadLogsRequest
+	)
+
+	req.Info = info
+	req.Config = config
+	return pp.Stream("LogDriver.ReadLogs", req)
+}

+ 2 - 0
docs/extend/config.md

@@ -59,6 +59,8 @@ Config provides the base accessible fields for working with V0 plugin format
 
 
         - **docker.authz/1.0**
         - **docker.authz/1.0**
 
 
+        - **docker.logdriver/1.0**
+
     - **`socket`** *string*
     - **`socket`** *string*
 
 
       socket is the name of the socket the engine should use to communicate with the plugins.
       socket is the name of the socket the engine should use to communicate with the plugins.

+ 220 - 0
docs/extend/plugins_logging.md

@@ -0,0 +1,220 @@
+---
+title: "Docker log driver plugins"
+description: "Log driver plugins."
+keywords: "Examples, Usage, plugins, docker, documentation, user guide, logging"
+---
+
+<!-- This file is maintained within the docker/docker Github
+     repository at https://github.com/docker/docker/. Make all
+     pull requests against that repo. If you see this file in
+     another repository, consider it read-only there, as it will
+     periodically be overwritten by the definitive file. Pull
+     requests which include edits to this file in other repositories
+     will be rejected.
+-->
+
+# Logging driver plugins
+
+This document describes logging driver plugins for Docker.
+
+Logging drivers enables users to forward container logs to another service for
+processing. Docker includes several logging drivers as built-ins, however can
+never hope to support all use-cases with built-in drivers. Plugins allow Docker
+to support a wide range of logging services without requiring to embed client
+libraries for these services in the main Docker codebase. See the
+[plugin documentation](legacy_plugins.md) for more information.
+
+## Create a logging plugin
+
+The main interface for logging plugins uses the same JSON+HTTP RPC protocol used
+by other plugin types. See the
+[example](https://github.com/cpuguy83/docker-log-driver-test) plugin for a
+reference implementation of a logging plugin. The example wraps the built-in
+`jsonfilelog` log driver.
+
+## LogDriver protocol
+
+Logging plugins must register as a `LogDriver` during plugin activation. Once
+activated users can specify the plugin as a log driver.
+
+There are two HTTP endpoints that logging plugins must implement:
+
+### `/LogDriver.StartLogging`
+
+Signals to the plugin that a container is starting that the plugin should start
+receiving logs for.
+
+Logs will be streamed over the defined file in the request. On Linux this file
+is a FIFO. Logging plugins are not currently supported on Windows.
+
+**Request**:
+```json
+{
+		"File": "/path/to/file/stream",
+		"Info": {
+			"ContainerID": "123456"
+		}
+}
+```
+
+`File` is the path to the log stream that needs to be consumed. Each call to
+`StartLogging` should provide a different file path, even if it's a container
+that the plugin has already received logs for prior. The file is created by
+docker with a randomly generated name.
+
+`Info` is details about the container that's being logged. This is fairly
+free-form, but is defined by the following struct definition:
+
+```go
+type Info struct {
+	Config              map[string]string
+	ContainerID         string
+	ContainerName       string
+	ContainerEntrypoint string
+	ContainerArgs       []string
+	ContainerImageID    string
+	ContainerImageName  string
+	ContainerCreated    time.Time
+	ContainerEnv        []string
+	ContainerLabels     map[string]string
+	LogPath             string
+	DaemonName          string
+}
+```
+
+
+`ContainerID` will always be supplied with this struct, but other fields may be
+empty or missing.
+
+**Response**
+```json
+{
+	"Err": ""
+}
+```
+
+If an error occurred during this request, add an error message to the `Err` field
+in the response. If no error then you can either send an empty response (`{}`)
+or an empty value for the `Err` field.
+
+The driver should at this point be consuming log messages from the passed in file.
+If messages are unconsumed, it may cause the contaier to block while trying to
+write to its stdio streams.
+
+Log stream messages are encoded as protocol buffers. The protobuf definitions are
+in the
+[docker repository](https://github.com/docker/docker/blob/master/api/types/plugins/logdriver/entry.proto).
+
+Since protocol buffers are not self-delimited you must decode them from the stream
+using the following stream format:
+
+```
+[size][message]
+```
+
+Where `size` is a 4-byte big endian binary encoded uint32. `size` in this case
+defines the size of the next message. `message` is the actual log entry.
+
+A reference golang implementation of a stream encoder/decoder can be found
+[here](https://github.com/docker/docker/blob/master/api/types/plugins/logdriver/io.go)
+
+### `/LogDriver.StopLogging`
+
+Signals to the plugin to stop collecting logs from the defined file.
+Once a response is received, the file will be removed by Docker. You must make
+sure to collect all logs on the stream before responding to this request or risk
+losing log data.
+
+Requests on this endpoint does not mean that the container has been removed
+only that it has stopped.
+
+**Request**:
+```json
+{
+		"File": "/path/to/file/stream"
+}
+```
+
+**Response**:
+```json
+{
+	"Err": ""
+}
+```
+
+If an error occurred during this request, add an error message to the `Err` field
+in the response. If no error then you can either send an empty response (`{}`)
+or an empty value for the `Err` field.
+
+## Optional endpoints
+
+Logging plugins can implement two extra logging endpoints:
+
+### `/LogDriver.Capabilities`
+
+Defines the capabilities of the log driver. You must implement this endpoint for
+Docker to be able to take advantage of any of the defined capabilities.
+
+**Request**:
+```json
+{}
+```
+
+**Response**:
+```json
+{
+	"ReadLogs": true
+}
+```
+
+Supported capabilities:
+
+- `ReadLogs` - this tells Docker that the plugin is capable of reading back logs
+to clients. Plugins that report that they support `ReadLogs` must implement the
+`/LogDriver.ReadLogs` endpoint
+
+### `/LogDriver.ReadLogs`
+
+Reads back logs to the client. This is used when `docker logs <container>` is
+called.
+
+In order for Docker to use this endpoint, the plugin must specify as much when
+`/LogDriver.Capabilities` is called.
+
+
+**Request**:
+```json
+{
+	"ReadConfig": {},
+	"Info": {
+		"ContainerID": "123456"
+	}
+}
+```
+
+`ReadConfig` is the list of options for reading, it is defined with the following
+golang struct:
+
+```go
+type ReadConfig struct {
+	Since  time.Time
+	Tail   int
+	Follow bool
+}
+```
+
+- `Since` defines the oldest log that should be sent.
+- `Tail` defines the number of lines to read (e.g. like the command `tail -n 10`)
+- `Follow` signals that the client wants to stay attached to receive new log messages
+as they come in once the existing logs have been read.
+
+`Info` is the same type defined in `/LogDriver.StartLogging`. It should be used
+to determine what set of logs to read.
+
+**Response**:
+```
+{{ log stream }}
+```
+
+The response should be the encoded log message using the same format as the
+messages that the plugin consumed from Docker.

+ 1 - 1
hack/validate/lint

@@ -4,7 +4,7 @@ export SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 source "${SCRIPTDIR}/.validate"
 source "${SCRIPTDIR}/.validate"
 
 
 IFS=$'\n'
 IFS=$'\n'
-files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '^cli/compose/schema/bindata.go' || true) )
+files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '^cli/compose/schema/bindata.go' | grep -v '^api/types/plugins/logdriver/entry.pb.go' || true) )
 unset IFS
 unset IFS
 
 
 errors=()
 errors=()

+ 27 - 0
integration-cli/docker_cli_plugins_logdriver_test.go

@@ -0,0 +1,27 @@
+package main
+
+import (
+	"strings"
+
+	"github.com/docker/docker/integration-cli/checker"
+	"github.com/go-check/check"
+)
+
+func (s *DockerSuite) TestPluginLogDriver(c *check.C) {
+	testRequires(c, IsAmd64, DaemonIsLinux)
+
+	pluginName := "cpuguy83/docker-logdriver-test:latest"
+
+	dockerCmd(c, "plugin", "install", pluginName)
+	dockerCmd(c, "run", "--log-driver", pluginName, "--name=test", "busybox", "echo", "hello")
+	out, _ := dockerCmd(c, "logs", "test")
+	c.Assert(strings.TrimSpace(out), checker.Equals, "hello")
+
+	dockerCmd(c, "start", "-a", "test")
+	out, _ = dockerCmd(c, "logs", "test")
+	c.Assert(strings.TrimSpace(out), checker.Equals, "hello\nhello")
+
+	dockerCmd(c, "rm", "test")
+	dockerCmd(c, "plugin", "disable", pluginName)
+	dockerCmd(c, "plugin", "rm", pluginName)
+}

+ 102 - 0
vendor/github.com/gogo/protobuf/io/full.go

@@ -0,0 +1,102 @@
+// Protocol Buffers for Go with Gadgets
+//
+// Copyright (c) 2013, The GoGo Authors. All rights reserved.
+// http://github.com/gogo/protobuf
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+package io
+
+import (
+	"github.com/gogo/protobuf/proto"
+	"io"
+)
+
+func NewFullWriter(w io.Writer) WriteCloser {
+	return &fullWriter{w, nil}
+}
+
+type fullWriter struct {
+	w      io.Writer
+	buffer []byte
+}
+
+func (this *fullWriter) WriteMsg(msg proto.Message) (err error) {
+	var data []byte
+	if m, ok := msg.(marshaler); ok {
+		n, ok := getSize(m)
+		if !ok {
+			data, err = proto.Marshal(msg)
+			if err != nil {
+				return err
+			}
+		}
+		if n >= len(this.buffer) {
+			this.buffer = make([]byte, n)
+		}
+		_, err = m.MarshalTo(this.buffer)
+		if err != nil {
+			return err
+		}
+		data = this.buffer[:n]
+	} else {
+		data, err = proto.Marshal(msg)
+		if err != nil {
+			return err
+		}
+	}
+	_, err = this.w.Write(data)
+	return err
+}
+
+func (this *fullWriter) Close() error {
+	if closer, ok := this.w.(io.Closer); ok {
+		return closer.Close()
+	}
+	return nil
+}
+
+type fullReader struct {
+	r   io.Reader
+	buf []byte
+}
+
+func NewFullReader(r io.Reader, maxSize int) ReadCloser {
+	return &fullReader{r, make([]byte, maxSize)}
+}
+
+func (this *fullReader) ReadMsg(msg proto.Message) error {
+	length, err := this.r.Read(this.buf)
+	if err != nil {
+		return err
+	}
+	return proto.Unmarshal(this.buf[:length], msg)
+}
+
+func (this *fullReader) Close() error {
+	if closer, ok := this.r.(io.Closer); ok {
+		return closer.Close()
+	}
+	return nil
+}

+ 70 - 0
vendor/github.com/gogo/protobuf/io/io.go

@@ -0,0 +1,70 @@
+// Protocol Buffers for Go with Gadgets
+//
+// Copyright (c) 2013, The GoGo Authors. All rights reserved.
+// http://github.com/gogo/protobuf
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+package io
+
+import (
+	"github.com/gogo/protobuf/proto"
+	"io"
+)
+
+type Writer interface {
+	WriteMsg(proto.Message) error
+}
+
+type WriteCloser interface {
+	Writer
+	io.Closer
+}
+
+type Reader interface {
+	ReadMsg(msg proto.Message) error
+}
+
+type ReadCloser interface {
+	Reader
+	io.Closer
+}
+
+type marshaler interface {
+	MarshalTo(data []byte) (n int, err error)
+}
+
+func getSize(v interface{}) (int, bool) {
+	if sz, ok := v.(interface {
+		Size() (n int)
+	}); ok {
+		return sz.Size(), true
+	} else if sz, ok := v.(interface {
+		ProtoSize() (n int)
+	}); ok {
+		return sz.ProtoSize(), true
+	} else {
+		return 0, false
+	}
+}

+ 126 - 0
vendor/github.com/gogo/protobuf/io/uint32.go

@@ -0,0 +1,126 @@
+// Protocol Buffers for Go with Gadgets
+//
+// Copyright (c) 2013, The GoGo Authors. All rights reserved.
+// http://github.com/gogo/protobuf
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+package io
+
+import (
+	"encoding/binary"
+	"github.com/gogo/protobuf/proto"
+	"io"
+)
+
+func NewUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder) WriteCloser {
+	return &uint32Writer{w, byteOrder, nil}
+}
+
+func NewSizeUint32DelimitedWriter(w io.Writer, byteOrder binary.ByteOrder, size int) WriteCloser {
+	return &uint32Writer{w, byteOrder, make([]byte, size)}
+}
+
+type uint32Writer struct {
+	w         io.Writer
+	byteOrder binary.ByteOrder
+	buffer    []byte
+}
+
+func (this *uint32Writer) WriteMsg(msg proto.Message) (err error) {
+	var data []byte
+	if m, ok := msg.(marshaler); ok {
+		n, ok := getSize(m)
+		if !ok {
+			data, err = proto.Marshal(msg)
+			if err != nil {
+				return err
+			}
+		}
+		if n >= len(this.buffer) {
+			this.buffer = make([]byte, n)
+		}
+		_, err = m.MarshalTo(this.buffer)
+		if err != nil {
+			return err
+		}
+		data = this.buffer[:n]
+	} else {
+		data, err = proto.Marshal(msg)
+		if err != nil {
+			return err
+		}
+	}
+	length := uint32(len(data))
+	if err = binary.Write(this.w, this.byteOrder, &length); err != nil {
+		return err
+	}
+	_, err = this.w.Write(data)
+	return err
+}
+
+func (this *uint32Writer) Close() error {
+	if closer, ok := this.w.(io.Closer); ok {
+		return closer.Close()
+	}
+	return nil
+}
+
+type uint32Reader struct {
+	r         io.Reader
+	byteOrder binary.ByteOrder
+	lenBuf    []byte
+	buf       []byte
+	maxSize   int
+}
+
+func NewUint32DelimitedReader(r io.Reader, byteOrder binary.ByteOrder, maxSize int) ReadCloser {
+	return &uint32Reader{r, byteOrder, make([]byte, 4), nil, maxSize}
+}
+
+func (this *uint32Reader) ReadMsg(msg proto.Message) error {
+	if _, err := io.ReadFull(this.r, this.lenBuf); err != nil {
+		return err
+	}
+	length32 := this.byteOrder.Uint32(this.lenBuf)
+	length := int(length32)
+	if length < 0 || length > this.maxSize {
+		return io.ErrShortBuffer
+	}
+	if length >= len(this.buf) {
+		this.buf = make([]byte, length)
+	}
+	_, err := io.ReadFull(this.r, this.buf[:length])
+	if err != nil {
+		return err
+	}
+	return proto.Unmarshal(this.buf[:length], msg)
+}
+
+func (this *uint32Reader) Close() error {
+	if closer, ok := this.r.(io.Closer); ok {
+		return closer.Close()
+	}
+	return nil
+}

+ 134 - 0
vendor/github.com/gogo/protobuf/io/varint.go

@@ -0,0 +1,134 @@
+// Protocol Buffers for Go with Gadgets
+//
+// Copyright (c) 2013, The GoGo Authors. All rights reserved.
+// http://github.com/gogo/protobuf
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+package io
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"github.com/gogo/protobuf/proto"
+	"io"
+)
+
+var (
+	errSmallBuffer = errors.New("Buffer Too Small")
+	errLargeValue  = errors.New("Value is Larger than 64 bits")
+)
+
+func NewDelimitedWriter(w io.Writer) WriteCloser {
+	return &varintWriter{w, make([]byte, 10), nil}
+}
+
+type varintWriter struct {
+	w      io.Writer
+	lenBuf []byte
+	buffer []byte
+}
+
+func (this *varintWriter) WriteMsg(msg proto.Message) (err error) {
+	var data []byte
+	if m, ok := msg.(marshaler); ok {
+		n, ok := getSize(m)
+		if !ok {
+			data, err = proto.Marshal(msg)
+			if err != nil {
+				return err
+			}
+		}
+		if n >= len(this.buffer) {
+			this.buffer = make([]byte, n)
+		}
+		_, err = m.MarshalTo(this.buffer)
+		if err != nil {
+			return err
+		}
+		data = this.buffer[:n]
+	} else {
+		data, err = proto.Marshal(msg)
+		if err != nil {
+			return err
+		}
+	}
+	length := uint64(len(data))
+	n := binary.PutUvarint(this.lenBuf, length)
+	_, err = this.w.Write(this.lenBuf[:n])
+	if err != nil {
+		return err
+	}
+	_, err = this.w.Write(data)
+	return err
+}
+
+func (this *varintWriter) Close() error {
+	if closer, ok := this.w.(io.Closer); ok {
+		return closer.Close()
+	}
+	return nil
+}
+
+func NewDelimitedReader(r io.Reader, maxSize int) ReadCloser {
+	var closer io.Closer
+	if c, ok := r.(io.Closer); ok {
+		closer = c
+	}
+	return &varintReader{bufio.NewReader(r), nil, maxSize, closer}
+}
+
+type varintReader struct {
+	r       *bufio.Reader
+	buf     []byte
+	maxSize int
+	closer  io.Closer
+}
+
+func (this *varintReader) ReadMsg(msg proto.Message) error {
+	length64, err := binary.ReadUvarint(this.r)
+	if err != nil {
+		return err
+	}
+	length := int(length64)
+	if length < 0 || length > this.maxSize {
+		return io.ErrShortBuffer
+	}
+	if len(this.buf) < length {
+		this.buf = make([]byte, length)
+	}
+	buf := this.buf[:length]
+	if _, err := io.ReadFull(this.r, buf); err != nil {
+		return err
+	}
+	return proto.Unmarshal(buf, msg)
+}
+
+func (this *varintReader) Close() error {
+	if this.closer != nil {
+		return this.closer.Close()
+	}
+	return nil
+}