Browse Source

Add BytesPipe datastructure to ioutils

Signed-off-by: Alexander Morozov <lk4d4@docker.com>
Alexander Morozov 9 years ago
parent
commit
24310b5b4a
2 changed files with 163 additions and 0 deletions
  1. 82 0
      pkg/ioutils/bytespipe.go
  2. 81 0
      pkg/ioutils/bytespipe_test.go

+ 82 - 0
pkg/ioutils/bytespipe.go

@@ -0,0 +1,82 @@
+package ioutils
+
+const maxCap = 10 * 1e6
+
+// BytesPipe is io.ReadWriter which works similary to pipe(queue).
+// All written data could be read only once. Also BytesPipe trying to adjust
+// internal []byte slice to current needs, so there won't be overgrown buffer
+// after highload peak.
+// BytesPipe isn't goroutine-safe, caller must synchronize it if needed.
+type BytesPipe struct {
+	buf      []byte
+	lastRead int
+}
+
+// NewBytesPipe creates new BytesPipe, initialized by specified slice.
+// If buf is nil, then it will be initialized with slice which cap is 64.
+// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
+func NewBytesPipe(buf []byte) *BytesPipe {
+	if cap(buf) == 0 {
+		buf = make([]byte, 0, 64)
+	}
+	return &BytesPipe{
+		buf: buf[:0],
+	}
+}
+
+func (bp *BytesPipe) grow(n int) {
+	if len(bp.buf)+n > cap(bp.buf) {
+		// not enough space
+		var buf []byte
+		remain := bp.len()
+		if remain+n <= cap(bp.buf)/2 {
+			// enough space in current buffer, just move data to head
+			copy(bp.buf, bp.buf[bp.lastRead:])
+			buf = bp.buf[:remain]
+		} else {
+			// reallocate buffer
+			buf = make([]byte, remain, 2*cap(bp.buf)+n)
+			copy(buf, bp.buf[bp.lastRead:])
+		}
+		bp.buf = buf
+		bp.lastRead = 0
+	}
+}
+
+// Write writes p to BytesPipe.
+// It can increase cap of internal []byte slice in a process of writing.
+func (bp *BytesPipe) Write(p []byte) (n int, err error) {
+	bp.grow(len(p))
+	bp.buf = append(bp.buf, p...)
+	return
+}
+
+func (bp *BytesPipe) len() int {
+	return len(bp.buf) - bp.lastRead
+}
+
+func (bp *BytesPipe) crop() {
+	// shortcut for empty buffer
+	if bp.lastRead == len(bp.buf) {
+		bp.lastRead = 0
+		bp.buf = bp.buf[:0]
+	}
+	r := bp.len()
+	// if we have too large buffer for too small data
+	if cap(bp.buf) > maxCap && r < cap(bp.buf)/10 {
+		copy(bp.buf, bp.buf[bp.lastRead:])
+		// will use same underlying slice until reach cap
+		bp.buf = bp.buf[:r : cap(bp.buf)/2]
+		bp.lastRead = 0
+	}
+}
+
+// Read reads bytes from BytesPipe.
+// Data could be read only once.
+// Internal []byte slice could be shrinked.
+func (bp *BytesPipe) Read(p []byte) (n int, err error) {
+	n = copy(p, bp.buf[bp.lastRead:])
+	bp.lastRead += n
+	bp.crop()
+	return
+}

+ 81 - 0
pkg/ioutils/bytespipe_test.go

@@ -0,0 +1,81 @@
+package ioutils
+
+import "testing"
+
+func TestBytesPipeRead(t *testing.T) {
+	buf := NewBytesPipe(nil)
+	buf.Write([]byte("12"))
+	buf.Write([]byte("34"))
+	buf.Write([]byte("56"))
+	buf.Write([]byte("78"))
+	buf.Write([]byte("90"))
+	rd := make([]byte, 4)
+	n, err := buf.Read(rd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n != 4 {
+		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
+	}
+	if string(rd) != "1234" {
+		t.Fatalf("Read %s, but must be %s", rd, "1234")
+	}
+	n, err = buf.Read(rd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n != 4 {
+		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 4)
+	}
+	if string(rd) != "5678" {
+		t.Fatalf("Read %s, but must be %s", rd, "5679")
+	}
+	n, err = buf.Read(rd)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if n != 2 {
+		t.Fatalf("Wrong number of bytes read: %d, should be %d", n, 2)
+	}
+	if string(rd[:n]) != "90" {
+		t.Fatalf("Read %s, but must be %s", rd, "90")
+	}
+}
+
+func TestBytesPipeWrite(t *testing.T) {
+	buf := NewBytesPipe(nil)
+	buf.Write([]byte("12"))
+	buf.Write([]byte("34"))
+	buf.Write([]byte("56"))
+	buf.Write([]byte("78"))
+	buf.Write([]byte("90"))
+	if string(buf.buf) != "1234567890" {
+		t.Fatalf("Buffer %s, must be %s", buf.buf, "1234567890")
+	}
+}
+
+func BenchmarkBytesPipeWrite(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		buf := NewBytesPipe(nil)
+		for j := 0; j < 1000; j++ {
+			buf.Write([]byte("pretty short line, because why not?"))
+		}
+	}
+}
+
+func BenchmarkBytesPipeRead(b *testing.B) {
+	rd := make([]byte, 1024)
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		buf := NewBytesPipe(nil)
+		for j := 0; j < 1000; j++ {
+			buf.Write(make([]byte, 1024))
+		}
+		b.StartTimer()
+		for j := 0; j < 1000; j++ {
+			if n, _ := buf.Read(rd); n != 1024 {
+				b.Fatalf("Wrong number of bytes: %d", n)
+			}
+		}
+	}
+}