فهرست منبع

Merge pull request #2781 from shykes/engine-status-int

Engine: integer status, better streaming, lots of tests
Guillaume J. Charmes 11 سال پیش
والد
کامیت
2dac7b5209
14فایلهای تغییر یافته به همراه691 افزوده شده و 154 حذف شده
  1. 10 2
      api.go
  2. 6 4
      engine/engine.go
  3. 51 3
      engine/engine_test.go
  4. 2 16
      engine/helpers_test.go
  5. 47 98
      engine/job.go
  6. 80 0
      engine/job_test.go
  7. 166 0
      engine/streams.go
  8. 274 0
      engine/streams_test.go
  9. 4 0
      integration/api_test.go
  10. 2 2
      integration/container_test.go
  11. 1 1
      integration/runtime_test.go
  12. 1 1
      integration/server_test.go
  13. 1 1
      integration/utils_test.go
  14. 46 26
      server.go

+ 10 - 2
api.go

@@ -1,6 +1,8 @@
 package docker
 
 import (
+	"bufio"
+	"bytes"
 	"code.google.com/p/go.net/websocket"
 	"encoding/base64"
 	"encoding/json"
@@ -565,12 +567,18 @@ func postContainersCreate(srv *Server, version float64, w http.ResponseWriter, r
 		job.SetenvList("Dns", defaultDns)
 	}
 	// Read container ID from the first line of stdout
-	job.StdoutParseString(&out.ID)
+	job.Stdout.AddString(&out.ID)
 	// Read warnings from stderr
-	job.StderrParseLines(&out.Warnings, 0)
+	warnings := &bytes.Buffer{}
+	job.Stderr.Add(warnings)
 	if err := job.Run(); err != nil {
 		return err
 	}
+	// Parse warnings from stderr
+	scanner := bufio.NewScanner(warnings)
+	for scanner.Scan() {
+		out.Warnings = append(out.Warnings, scanner.Text())
+	}
 	if job.GetenvInt("Memory") > 0 && !srv.runtime.capabilities.MemoryLimit {
 		log.Println("WARNING: Your kernel does not support memory limit capabilities. Limitation discarded.")
 		out.Warnings = append(out.Warnings, "Your kernel does not support memory limit capabilities. Limitation discarded.")

+ 6 - 4
engine/engine.go

@@ -9,7 +9,7 @@ import (
 	"strings"
 )
 
-type Handler func(*Job) string
+type Handler func(*Job) Status
 
 var globalHandlers map[string]Handler
 
@@ -99,10 +99,12 @@ func (eng *Engine) Job(name string, args ...string) *Job {
 		Eng:    eng,
 		Name:   name,
 		Args:   args,
-		Stdin:  os.Stdin,
-		Stdout: os.Stdout,
-		Stderr: os.Stderr,
+		Stdin:  NewInput(),
+		Stdout: NewOutput(),
+		Stderr: NewOutput(),
 	}
+	job.Stdout.Add(utils.NopWriteCloser(os.Stdout))
+	job.Stderr.Add(utils.NopWriteCloser(os.Stderr))
 	handler, exists := eng.handlers[name]
 	if exists {
 		job.handler = handler

+ 51 - 3
engine/engine_test.go

@@ -1,6 +1,9 @@
 package engine
 
 import (
+	"io/ioutil"
+	"os"
+	"path"
 	"testing"
 )
 
@@ -38,8 +41,9 @@ func TestJob(t *testing.T) {
 		t.Fatalf("job1.handler should be empty")
 	}
 
-	h := func(j *Job) string {
-		return j.Name
+	h := func(j *Job) Status {
+		j.Printf("%s\n", j.Name)
+		return 42
 	}
 
 	eng.Register("dummy2", h)
@@ -49,7 +53,51 @@ func TestJob(t *testing.T) {
 		t.Fatalf("job2.handler shouldn't be nil")
 	}
 
-	if job2.handler(job2) != job2.Name {
+	if job2.handler(job2) != 42 {
 		t.Fatalf("handler dummy2 was not found in job2")
 	}
 }
+
+func TestEngineRoot(t *testing.T) {
+	tmp, err := ioutil.TempDir("", "docker-test-TestEngineCreateDir")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tmp)
+	dir := path.Join(tmp, "dir")
+	eng, err := New(dir)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if st, err := os.Stat(dir); err != nil {
+		t.Fatal(err)
+	} else if !st.IsDir() {
+		t.Fatalf("engine.New() created something other than a directory at %s", dir)
+	}
+	if r := eng.Root(); r != dir {
+		t.Fatalf("Expected: %v\nReceived: %v", dir, r)
+	}
+}
+
+func TestEngineString(t *testing.T) {
+	eng1 := newTestEngine(t)
+	defer os.RemoveAll(eng1.Root())
+	eng2 := newTestEngine(t)
+	defer os.RemoveAll(eng2.Root())
+	s1 := eng1.String()
+	s2 := eng2.String()
+	if eng1 == eng2 {
+		t.Fatalf("Different engines should have different names (%v == %v)", s1, s2)
+	}
+}
+
+func TestEngineLogf(t *testing.T) {
+	eng := newTestEngine(t)
+	defer os.RemoveAll(eng.Root())
+	input := "Test log line"
+	if n, err := eng.Logf("%s\n", input); err != nil {
+		t.Fatal(err)
+	} else if n < len(input) {
+		t.Fatalf("Test: Logf() should print at least as much as the input\ninput=%d\nprinted=%d", len(input), n)
+	}
+}

+ 2 - 16
engine/helpers_test.go

@@ -1,32 +1,18 @@
 package engine
 
 import (
-	"fmt"
 	"github.com/dotcloud/docker/utils"
-	"io/ioutil"
-	"runtime"
-	"strings"
 	"testing"
 )
 
 var globalTestID string
 
 func newTestEngine(t *testing.T) *Engine {
-	// Use the caller function name as a prefix.
-	// This helps trace temp directories back to their test.
-	pc, _, _, _ := runtime.Caller(1)
-	callerLongName := runtime.FuncForPC(pc).Name()
-	parts := strings.Split(callerLongName, ".")
-	callerShortName := parts[len(parts)-1]
-	if globalTestID == "" {
-		globalTestID = utils.RandomString()[:4]
-	}
-	prefix := fmt.Sprintf("docker-test%s-%s-", globalTestID, callerShortName)
-	root, err := ioutil.TempDir("", prefix)
+	tmp, err := utils.TestDirectory("")
 	if err != nil {
 		t.Fatal(err)
 	}
-	eng, err := New(root)
+	eng, err := New(tmp)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 47 - 98
engine/job.go

@@ -1,16 +1,13 @@
 package engine
 
 import (
-	"bufio"
 	"bytes"
 	"encoding/json"
 	"fmt"
 	"io"
-	"io/ioutil"
-	"os"
 	"strconv"
 	"strings"
-	"sync"
+	"time"
 )
 
 // A job is the fundamental unit of work in the docker engine.
@@ -31,126 +28,75 @@ type Job struct {
 	Name    string
 	Args    []string
 	env     []string
-	Stdin   io.Reader
-	Stdout  io.Writer
-	Stderr  io.Writer
-	handler func(*Job) string
-	status  string
+	Stdout  *Output
+	Stderr  *Output
+	Stdin   *Input
+	handler Handler
+	status  Status
+	end     time.Time
 	onExit  []func()
 }
 
+type Status int
+
+const (
+	StatusOK       Status = 0
+	StatusErr      Status = 1
+	StatusNotFound Status = 127
+)
+
 // Run executes the job and blocks until the job completes.
 // If the job returns a failure status, an error is returned
 // which includes the status.
 func (job *Job) Run() error {
-	defer func() {
-		var wg sync.WaitGroup
-		for _, f := range job.onExit {
-			wg.Add(1)
-			go func(f func()) {
-				f()
-				wg.Done()
-			}(f)
-		}
-		wg.Wait()
-	}()
-	if job.Stdout != nil && job.Stdout != os.Stdout {
-		job.Stdout = io.MultiWriter(job.Stdout, os.Stdout)
-	}
-	if job.Stderr != nil && job.Stderr != os.Stderr {
-		job.Stderr = io.MultiWriter(job.Stderr, os.Stderr)
+	// FIXME: make this thread-safe
+	// FIXME: implement wait
+	if !job.end.IsZero() {
+		return fmt.Errorf("%s: job has already completed", job.Name)
 	}
+	// Log beginning and end of the job
 	job.Eng.Logf("+job %s", job.CallString())
 	defer func() {
 		job.Eng.Logf("-job %s%s", job.CallString(), job.StatusString())
 	}()
+	var errorMessage string
+	job.Stderr.AddString(&errorMessage)
 	if job.handler == nil {
-		job.status = "command not found"
+		job.Errorf("%s: command not found", job.Name)
+		job.status = 127
 	} else {
 		job.status = job.handler(job)
+		job.end = time.Now()
 	}
-	if job.status != "0" {
-		return fmt.Errorf("%s: %s", job.Name, job.status)
+	// Wait for all background tasks to complete
+	if err := job.Stdout.Close(); err != nil {
+		return err
+	}
+	if err := job.Stderr.Close(); err != nil {
+		return err
+	}
+	if job.status != 0 {
+		return fmt.Errorf("%s: %s", job.Name, errorMessage)
 	}
 	return nil
 }
 
-func (job *Job) StdoutParseLines(dst *[]string, limit int) {
-	job.parseLines(job.StdoutPipe(), dst, limit)
-}
-
-func (job *Job) StderrParseLines(dst *[]string, limit int) {
-	job.parseLines(job.StderrPipe(), dst, limit)
-}
-
-func (job *Job) parseLines(src io.Reader, dst *[]string, limit int) {
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		scanner := bufio.NewScanner(src)
-		for scanner.Scan() {
-			// If the limit is reached, flush the rest of the source and return
-			if limit > 0 && len(*dst) >= limit {
-				io.Copy(ioutil.Discard, src)
-				return
-			}
-			line := scanner.Text()
-			// Append the line (with delimitor removed)
-			*dst = append(*dst, line)
-		}
-	}()
-	job.onExit = append(job.onExit, wg.Wait)
-}
-
-func (job *Job) StdoutParseString(dst *string) {
-	lines := make([]string, 0, 1)
-	job.StdoutParseLines(&lines, 1)
-	job.onExit = append(job.onExit, func() {
-		if len(lines) >= 1 {
-			*dst = lines[0]
-		}
-	})
-}
-
-func (job *Job) StderrParseString(dst *string) {
-	lines := make([]string, 0, 1)
-	job.StderrParseLines(&lines, 1)
-	job.onExit = append(job.onExit, func() { *dst = lines[0] })
-}
-
-func (job *Job) StdoutPipe() io.ReadCloser {
-	r, w := io.Pipe()
-	job.Stdout = w
-	job.onExit = append(job.onExit, func() { w.Close() })
-	return r
-}
-
-func (job *Job) StderrPipe() io.ReadCloser {
-	r, w := io.Pipe()
-	job.Stderr = w
-	job.onExit = append(job.onExit, func() { w.Close() })
-	return r
-}
-
 func (job *Job) CallString() string {
 	return fmt.Sprintf("%s(%s)", job.Name, strings.Join(job.Args, ", "))
 }
 
 func (job *Job) StatusString() string {
-	// FIXME: if a job returns the empty string, it will be printed
-	// as not having returned.
-	// (this only affects String which is a convenience function).
-	if job.status != "" {
-		var okerr string
-		if job.status == "0" {
-			okerr = "OK"
-		} else {
-			okerr = "ERR"
-		}
-		return fmt.Sprintf(" = %s (%s)", okerr, job.status)
+	// If the job hasn't completed, status string is empty
+	if job.end.IsZero() {
+		return ""
+	}
+	var okerr string
+	if job.status == StatusOK {
+		okerr = "OK"
+	} else {
+		okerr = "ERR"
 	}
-	return ""
+	return fmt.Sprintf(" = %s (%d)", okerr, job.status)
 }
 
 // String returns a human-readable description of `job`
@@ -338,5 +284,8 @@ func (job *Job) Printf(format string, args ...interface{}) (n int, err error) {
 
 func (job *Job) Errorf(format string, args ...interface{}) (n int, err error) {
 	return fmt.Fprintf(job.Stderr, format, args...)
+}
 
+func (job *Job) Error(err error) (int, error) {
+	return fmt.Fprintf(job.Stderr, "%s", err)
 }

+ 80 - 0
engine/job_test.go

@@ -0,0 +1,80 @@
+package engine
+
+import (
+	"os"
+	"testing"
+)
+
+func TestJobStatusOK(t *testing.T) {
+	eng := newTestEngine(t)
+	defer os.RemoveAll(eng.Root())
+	eng.Register("return_ok", func(job *Job) Status { return StatusOK })
+	err := eng.Job("return_ok").Run()
+	if err != nil {
+		t.Fatalf("Expected: err=%v\nReceived: err=%v", nil, err)
+	}
+}
+
+func TestJobStatusErr(t *testing.T) {
+	eng := newTestEngine(t)
+	defer os.RemoveAll(eng.Root())
+	eng.Register("return_err", func(job *Job) Status { return StatusErr })
+	err := eng.Job("return_err").Run()
+	if err == nil {
+		t.Fatalf("When a job returns StatusErr, Run() should return an error")
+	}
+}
+
+func TestJobStatusNotFound(t *testing.T) {
+	eng := newTestEngine(t)
+	defer os.RemoveAll(eng.Root())
+	eng.Register("return_not_found", func(job *Job) Status { return StatusNotFound })
+	err := eng.Job("return_not_found").Run()
+	if err == nil {
+		t.Fatalf("When a job returns StatusNotFound, Run() should return an error")
+	}
+}
+
+func TestJobStdoutString(t *testing.T) {
+	eng := newTestEngine(t)
+	defer os.RemoveAll(eng.Root())
+	// FIXME: test multiple combinations of output and status
+	eng.Register("say_something_in_stdout", func(job *Job) Status {
+		job.Printf("Hello world\n")
+		return StatusOK
+	})
+
+	job := eng.Job("say_something_in_stdout")
+	var output string
+	if err := job.Stdout.AddString(&output); err != nil {
+		t.Fatal(err)
+	}
+	if err := job.Run(); err != nil {
+		t.Fatal(err)
+	}
+	if expectedOutput := "Hello world"; output != expectedOutput {
+		t.Fatalf("Stdout last line:\nExpected: %v\nReceived: %v", expectedOutput, output)
+	}
+}
+
+func TestJobStderrString(t *testing.T) {
+	eng := newTestEngine(t)
+	defer os.RemoveAll(eng.Root())
+	// FIXME: test multiple combinations of output and status
+	eng.Register("say_something_in_stderr", func(job *Job) Status {
+		job.Errorf("Warning, something might happen\nHere it comes!\nOh no...\nSomething happened\n")
+		return StatusOK
+	})
+
+	job := eng.Job("say_something_in_stderr")
+	var output string
+	if err := job.Stderr.AddString(&output); err != nil {
+		t.Fatal(err)
+	}
+	if err := job.Run(); err != nil {
+		t.Fatal(err)
+	}
+	if expectedOutput := "Something happened"; output != expectedOutput {
+		t.Fatalf("Stderr last line:\nExpected: %v\nReceived: %v", expectedOutput, output)
+	}
+}

+ 166 - 0
engine/streams.go

@@ -0,0 +1,166 @@
+package engine
+
+import (
+	"bufio"
+	"container/ring"
+	"fmt"
+	"io"
+	"sync"
+)
+
+type Output struct {
+	sync.Mutex
+	dests []io.Writer
+	tasks sync.WaitGroup
+}
+
+// NewOutput returns a new Output object with no destinations attached.
+// Writing to an empty Output will cause the written data to be discarded.
+func NewOutput() *Output {
+	return &Output{}
+}
+
+// Add attaches a new destination to the Output. Any data subsequently written
+// to the output will be written to the new destination in addition to all the others.
+// This method is thread-safe.
+// FIXME: Add cannot fail
+func (o *Output) Add(dst io.Writer) error {
+	o.Mutex.Lock()
+	defer o.Mutex.Unlock()
+	o.dests = append(o.dests, dst)
+	return nil
+}
+
+// AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
+// and returns its reading end for consumption by the caller.
+// This is a rough equivalent similar to Cmd.StdoutPipe() in the standard os/exec package.
+// This method is thread-safe.
+func (o *Output) AddPipe() (io.Reader, error) {
+	r, w := io.Pipe()
+	o.Add(w)
+	return r, nil
+}
+
+// AddTail starts a new goroutine which will read all subsequent data written to the output,
+// line by line, and append the last `n` lines to `dst`.
+func (o *Output) AddTail(dst *[]string, n int) error {
+	src, err := o.AddPipe()
+	if err != nil {
+		return err
+	}
+	o.tasks.Add(1)
+	go func() {
+		defer o.tasks.Done()
+		Tail(src, n, dst)
+	}()
+	return nil
+}
+
+// AddString starts a new goroutine which will read all subsequent data written to the output,
+// line by line, and store the last line into `dst`.
+func (o *Output) AddString(dst *string) error {
+	src, err := o.AddPipe()
+	if err != nil {
+		return err
+	}
+	o.tasks.Add(1)
+	go func() {
+		defer o.tasks.Done()
+		lines := make([]string, 0, 1)
+		Tail(src, 1, &lines)
+		if len(lines) == 0 {
+			*dst = ""
+		} else {
+			*dst = lines[0]
+		}
+	}()
+	return nil
+}
+
+// Write writes the same data to all registered destinations.
+// This method is thread-safe.
+func (o *Output) Write(p []byte) (n int, err error) {
+	o.Mutex.Lock()
+	defer o.Mutex.Unlock()
+	var firstErr error
+	for _, dst := range o.dests {
+		_, err := dst.Write(p)
+		if err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	return len(p), firstErr
+}
+
+// Close unregisters all destinations and waits for all background
+// AddTail and AddString tasks to complete.
+// The Close method of each destination is called if it exists.
+func (o *Output) Close() error {
+	o.Mutex.Lock()
+	defer o.Mutex.Unlock()
+	var firstErr error
+	for _, dst := range o.dests {
+		if closer, ok := dst.(io.WriteCloser); ok {
+			err := closer.Close()
+			if err != nil && firstErr == nil {
+				firstErr = err
+			}
+		}
+	}
+	o.tasks.Wait()
+	return firstErr
+}
+
+type Input struct {
+	src io.Reader
+	sync.Mutex
+}
+
+// NewInput returns a new Input object with no source attached.
+// Reading to an empty Input will return io.EOF.
+func NewInput() *Input {
+	return &Input{}
+}
+
+// Read reads from the input in a thread-safe way.
+func (i *Input) Read(p []byte) (n int, err error) {
+	i.Mutex.Lock()
+	defer i.Mutex.Unlock()
+	if i.src == nil {
+		return 0, io.EOF
+	}
+	return i.src.Read(p)
+}
+
+// Add attaches a new source to the input.
+// Add can only be called once per input. Subsequent calls will
+// return an error.
+func (i *Input) Add(src io.Reader) error {
+	i.Mutex.Lock()
+	defer i.Mutex.Unlock()
+	if i.src != nil {
+		return fmt.Errorf("Maximum number of sources reached: 1")
+	}
+	i.src = src
+	return nil
+}
+
+// Tail reads from `src` line per line, and returns the last `n` lines as an array.
+// A ring buffer is used to only store `n` lines at any time.
+func Tail(src io.Reader, n int, dst *[]string) {
+	scanner := bufio.NewScanner(src)
+	r := ring.New(n)
+	for scanner.Scan() {
+		if n == 0 {
+			continue
+		}
+		r.Value = scanner.Text()
+		r = r.Next()
+	}
+	r.Do(func(v interface{}) {
+		if v == nil {
+			return
+		}
+		*dst = append(*dst, v.(string))
+	})
+}

+ 274 - 0
engine/streams_test.go

@@ -0,0 +1,274 @@
+package engine
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"strings"
+	"testing"
+)
+
+func TestOutputAddString(t *testing.T) {
+	var testInputs = [][2]string{
+		{
+			"hello, world!",
+			"hello, world!",
+		},
+
+		{
+			"One\nTwo\nThree",
+			"Three",
+		},
+
+		{
+			"",
+			"",
+		},
+
+		{
+			"A line\nThen another nl-terminated line\n",
+			"Then another nl-terminated line",
+		},
+
+		{
+			"A line followed by an empty line\n\n",
+			"",
+		},
+	}
+	for _, testData := range testInputs {
+		input := testData[0]
+		expectedOutput := testData[1]
+		o := NewOutput()
+		var output string
+		if err := o.AddString(&output); err != nil {
+			t.Error(err)
+		}
+		if n, err := o.Write([]byte(input)); err != nil {
+			t.Error(err)
+		} else if n != len(input) {
+			t.Errorf("Expected %d, got %d", len(input), n)
+		}
+		o.Close()
+		if output != expectedOutput {
+			t.Errorf("Last line is not stored as return string.\nInput:   '%s'\nExpected: '%s'\nGot:       '%s'", input, expectedOutput, output)
+		}
+	}
+}
+
+type sentinelWriteCloser struct {
+	calledWrite bool
+	calledClose bool
+}
+
+func (w *sentinelWriteCloser) Write(p []byte) (int, error) {
+	w.calledWrite = true
+	return len(p), nil
+}
+
+func (w *sentinelWriteCloser) Close() error {
+	w.calledClose = true
+	return nil
+}
+
+func TestOutputAddClose(t *testing.T) {
+	o := NewOutput()
+	var s sentinelWriteCloser
+	if err := o.Add(&s); err != nil {
+		t.Fatal(err)
+	}
+	if err := o.Close(); err != nil {
+		t.Fatal(err)
+	}
+	// Write data after the output is closed.
+	// Write should succeed, but no destination should receive it.
+	if _, err := o.Write([]byte("foo bar")); err != nil {
+		t.Fatal(err)
+	}
+	if !s.calledClose {
+		t.Fatal("Output.Close() didn't close the destination")
+	}
+}
+
+func TestOutputAddPipe(t *testing.T) {
+	var testInputs = []string{
+		"hello, world!",
+		"One\nTwo\nThree",
+		"",
+		"A line\nThen another nl-terminated line\n",
+		"A line followed by an empty line\n\n",
+	}
+	for _, input := range testInputs {
+		expectedOutput := input
+		o := NewOutput()
+		r, err := o.AddPipe()
+		if err != nil {
+			t.Fatal(err)
+		}
+		go func(o *Output) {
+			if n, err := o.Write([]byte(input)); err != nil {
+				t.Error(err)
+			} else if n != len(input) {
+				t.Errorf("Expected %d, got %d", len(input), n)
+			}
+			if err := o.Close(); err != nil {
+				t.Error(err)
+			}
+		}(o)
+		output, err := ioutil.ReadAll(r)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if string(output) != expectedOutput {
+			t.Errorf("Last line is not stored as return string.\nExpected: '%s'\nGot:       '%s'", expectedOutput, output)
+		}
+	}
+}
+
+func TestTail(t *testing.T) {
+	var tests = make(map[string][][]string)
+	tests["hello, world!"] = [][]string{
+		{},
+		{"hello, world!"},
+		{"hello, world!"},
+		{"hello, world!"},
+	}
+	tests["One\nTwo\nThree"] = [][]string{
+		{},
+		{"Three"},
+		{"Two", "Three"},
+		{"One", "Two", "Three"},
+	}
+	for input, outputs := range tests {
+		for n, expectedOutput := range outputs {
+			var output []string
+			Tail(strings.NewReader(input), n, &output)
+			if fmt.Sprintf("%v", output) != fmt.Sprintf("%v", expectedOutput) {
+				t.Errorf("Tail n=%d returned wrong result.\nExpected: '%s'\nGot     : '%s'", expectedOutput, output)
+			}
+		}
+	}
+}
+
+func TestOutputAddTail(t *testing.T) {
+	var tests = make(map[string][][]string)
+	tests["hello, world!"] = [][]string{
+		{},
+		{"hello, world!"},
+		{"hello, world!"},
+		{"hello, world!"},
+	}
+	tests["One\nTwo\nThree"] = [][]string{
+		{},
+		{"Three"},
+		{"Two", "Three"},
+		{"One", "Two", "Three"},
+	}
+	for input, outputs := range tests {
+		for n, expectedOutput := range outputs {
+			o := NewOutput()
+			var output []string
+			if err := o.AddTail(&output, n); err != nil {
+				t.Error(err)
+			}
+			if n, err := o.Write([]byte(input)); err != nil {
+				t.Error(err)
+			} else if n != len(input) {
+				t.Errorf("Expected %d, got %d", len(input), n)
+			}
+			o.Close()
+			if fmt.Sprintf("%v", output) != fmt.Sprintf("%v", expectedOutput) {
+				t.Errorf("Tail(%d) returned wrong result.\nExpected: %v\nGot:      %v", n, expectedOutput, output)
+			}
+		}
+	}
+}
+
+func lastLine(txt string) string {
+	scanner := bufio.NewScanner(strings.NewReader(txt))
+	var lastLine string
+	for scanner.Scan() {
+		lastLine = scanner.Text()
+	}
+	return lastLine
+}
+
+func TestOutputAdd(t *testing.T) {
+	o := NewOutput()
+	b := &bytes.Buffer{}
+	o.Add(b)
+	input := "hello, world!"
+	if n, err := o.Write([]byte(input)); err != nil {
+		t.Fatal(err)
+	} else if n != len(input) {
+		t.Fatalf("Expected %d, got %d", len(input), n)
+	}
+	if output := b.String(); output != input {
+		t.Fatal("Received wrong data from Add.\nExpected: '%s'\nGot:     '%s'", input, output)
+	}
+}
+
+func TestOutputWriteError(t *testing.T) {
+	o := NewOutput()
+	buf := &bytes.Buffer{}
+	o.Add(buf)
+	r, w := io.Pipe()
+	input := "Hello there"
+	expectedErr := fmt.Errorf("This is an error")
+	r.CloseWithError(expectedErr)
+	o.Add(w)
+	n, err := o.Write([]byte(input))
+	if err != expectedErr {
+		t.Fatalf("Output.Write() should return the first error encountered, if any")
+	}
+	if buf.String() != input {
+		t.Fatalf("Output.Write() should attempt write on all destinations, even after encountering an error")
+	}
+	if n != len(input) {
+		t.Fatalf("Output.Write() should return the size of the input if it successfully writes to at least one destination")
+	}
+}
+
+func TestInputAddEmpty(t *testing.T) {
+	i := NewInput()
+	var b bytes.Buffer
+	if err := i.Add(&b); err != nil {
+		t.Fatal(err)
+	}
+	data, err := ioutil.ReadAll(i)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(data) > 0 {
+		t.Fatalf("Read from empty input shoul yield no data")
+	}
+}
+
+func TestInputAddTwo(t *testing.T) {
+	i := NewInput()
+	var b1 bytes.Buffer
+	// First add should succeed
+	if err := i.Add(&b1); err != nil {
+		t.Fatal(err)
+	}
+	var b2 bytes.Buffer
+	// Second add should fail
+	if err := i.Add(&b2); err == nil {
+		t.Fatalf("Adding a second source should return an error")
+	}
+}
+
+func TestInputAddNotEmpty(t *testing.T) {
+	i := NewInput()
+	b := bytes.NewBufferString("hello world\nabc")
+	expectedResult := b.String()
+	i.Add(b)
+	result, err := ioutil.ReadAll(i)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(result) != expectedResult {
+		t.Fatalf("Expected: %v\nReceived: %v", expectedResult, result)
+	}
+}

+ 4 - 0
integration/api_test.go

@@ -304,6 +304,10 @@ func TestGetContainersJSON(t *testing.T) {
 		Cmd:   []string{"echo", "test"},
 	}, t)
 
+	if containerID == "" {
+		t.Fatalf("Received empty container ID")
+	}
+
 	req, err := http.NewRequest("GET", "/containers/json?all=1", nil)
 	if err != nil {
 		t.Fatal(err)

+ 2 - 2
integration/container_test.go

@@ -499,7 +499,7 @@ func TestCreateVolume(t *testing.T) {
 		t.Fatal(err)
 	}
 	var id string
-	jobCreate.StdoutParseString(&id)
+	jobCreate.Stdout.AddString(&id)
 	if err := jobCreate.Run(); err != nil {
 		t.Fatal(err)
 	}
@@ -1502,7 +1502,7 @@ func TestOnlyLoopbackExistsWhenUsingDisableNetworkOption(t *testing.T) {
 		t.Fatal(err)
 	}
 	var id string
-	jobCreate.StdoutParseString(&id)
+	jobCreate.Stdout.AddString(&id)
 	if err := jobCreate.Run(); err != nil {
 		t.Fatal(err)
 	}

+ 1 - 1
integration/runtime_test.go

@@ -390,7 +390,7 @@ func startEchoServerContainer(t *testing.T, proto string) (*docker.Runtime, *doc
 		jobCreate.SetenvList("Cmd", []string{"sh", "-c", cmd})
 		jobCreate.SetenvList("PortSpecs", []string{fmt.Sprintf("%s/%s", strPort, proto)})
 		jobCreate.SetenvJson("ExposedPorts", ep)
-		jobCreate.StdoutParseString(&id)
+		jobCreate.Stdout.AddString(&id)
 		if err := jobCreate.Run(); err != nil {
 			t.Fatal(err)
 		}

+ 1 - 1
integration/server_test.go

@@ -224,7 +224,7 @@ func TestRunWithTooLowMemoryLimit(t *testing.T) {
 	job.Setenv("CpuShares", "1000")
 	job.SetenvList("Cmd", []string{"/bin/cat"})
 	var id string
-	job.StdoutParseString(&id)
+	job.Stdout.AddString(&id)
 	if err := job.Run(); err == nil {
 		t.Errorf("Memory limit is smaller than the allowed limit. Container creation should've failed!")
 	}

+ 1 - 1
integration/utils_test.go

@@ -46,7 +46,7 @@ func createNamedTestContainer(eng *engine.Engine, config *docker.Config, f utils
 	if err := job.ImportEnv(config); err != nil {
 		f.Fatal(err)
 	}
-	job.StdoutParseString(&shortId)
+	job.Stdout.AddString(&shortId)
 	if err := job.Run(); err != nil {
 		f.Fatal(err)
 	}

+ 46 - 26
server.go

@@ -39,15 +39,18 @@ func init() {
 // jobInitApi runs the remote api server `srv` as a daemon,
 // Only one api server can run at the same time - this is enforced by a pidfile.
 // The signals SIGINT and SIGTERM are intercepted for cleanup.
-func jobInitApi(job *engine.Job) string {
+func jobInitApi(job *engine.Job) engine.Status {
 	job.Logf("Creating server")
+	// FIXME: ImportEnv deprecates ConfigFromJob
 	srv, err := NewServer(job.Eng, ConfigFromJob(job))
 	if err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	if srv.runtime.config.Pidfile != "" {
 		job.Logf("Creating pidfile")
 		if err := utils.CreatePidFile(srv.runtime.config.Pidfile); err != nil {
+			// FIXME: do we need fatal here instead of returning a job error?
 			log.Fatal(err)
 		}
 	}
@@ -68,18 +71,21 @@ func jobInitApi(job *engine.Job) string {
 		job.Eng.Hack_SetGlobalVar("httpapi.bridgeIP", srv.runtime.networkManager.bridgeNetwork.IP)
 	}
 	if err := job.Eng.Register("create", srv.ContainerCreate); err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	if err := job.Eng.Register("start", srv.ContainerStart); err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	if err := job.Eng.Register("serveapi", srv.ListenAndServe); err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
-	return "0"
+	return engine.StatusOK
 }
 
-func (srv *Server) ListenAndServe(job *engine.Job) string {
+func (srv *Server) ListenAndServe(job *engine.Job) engine.Status {
 	protoAddrs := job.Args
 	chErrors := make(chan error, len(protoAddrs))
 	for _, protoAddr := range protoAddrs {
@@ -94,7 +100,8 @@ func (srv *Server) ListenAndServe(job *engine.Job) string {
 				log.Println("/!\\ DON'T BIND ON ANOTHER IP ADDRESS THAN 127.0.0.1 IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
 			}
 		default:
-			return "Invalid protocol format."
+			job.Errorf("Invalid protocol format.")
+			return engine.StatusErr
 		}
 		go func() {
 			// FIXME: merge Server.ListenAndServe with ListenAndServe
@@ -104,10 +111,11 @@ func (srv *Server) ListenAndServe(job *engine.Job) string {
 	for i := 0; i < len(protoAddrs); i += 1 {
 		err := <-chErrors
 		if err != nil {
-			return err.Error()
+			job.Error(err)
+			return engine.StatusErr
 		}
 	}
-	return "0"
+	return engine.StatusOK
 }
 
 func (srv *Server) DockerVersion() APIVersion {
@@ -1260,19 +1268,22 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
 	return nil
 }
 
-func (srv *Server) ContainerCreate(job *engine.Job) string {
+func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {
 	var name string
 	if len(job.Args) == 1 {
 		name = job.Args[0]
 	} else if len(job.Args) > 1 {
-		return fmt.Sprintf("Usage: %s ", job.Name)
+		job.Printf("Usage: %s", job.Name)
+		return engine.StatusErr
 	}
 	var config Config
 	if err := job.ExportEnv(&config); err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	if config.Memory != 0 && config.Memory < 524288 {
-		return "Minimum memory limit allowed is 512k"
+		job.Errorf("Minimum memory limit allowed is 512k")
+		return engine.StatusErr
 	}
 	if config.Memory > 0 && !srv.runtime.capabilities.MemoryLimit {
 		config.Memory = 0
@@ -1287,9 +1298,11 @@ func (srv *Server) ContainerCreate(job *engine.Job) string {
 			if tag == "" {
 				tag = DEFAULTTAG
 			}
-			return fmt.Sprintf("No such image: %s (tag: %s)", config.Image, tag)
+			job.Errorf("No such image: %s (tag: %s)", config.Image, tag)
+			return engine.StatusErr
 		}
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	srv.LogEvent("create", container.ID, srv.runtime.repositories.ImageName(container.Image))
 	// FIXME: this is necessary because runtime.Create might return a nil container
@@ -1301,7 +1314,7 @@ func (srv *Server) ContainerCreate(job *engine.Job) string {
 	for _, warning := range buildWarnings {
 		job.Errorf("%s\n", warning)
 	}
-	return "0"
+	return engine.StatusOK
 }
 
 func (srv *Server) ContainerRestart(name string, t int) error {
@@ -1619,22 +1632,25 @@ func (srv *Server) RegisterLinks(name string, hostConfig *HostConfig) error {
 	return nil
 }
 
-func (srv *Server) ContainerStart(job *engine.Job) string {
+func (srv *Server) ContainerStart(job *engine.Job) engine.Status {
 	if len(job.Args) < 1 {
-		return fmt.Sprintf("Usage: %s container_id", job.Name)
+		job.Errorf("Usage: %s container_id", job.Name)
+		return engine.StatusErr
 	}
 	name := job.Args[0]
 	runtime := srv.runtime
 	container := runtime.Get(name)
 
 	if container == nil {
-		return fmt.Sprintf("No such container: %s", name)
+		job.Errorf("No such container: %s", name)
+		return engine.StatusErr
 	}
 	// If no environment was set, then no hostconfig was passed.
 	if len(job.Environ()) > 0 {
 		var hostConfig HostConfig
 		if err := job.ExportEnv(&hostConfig); err != nil {
-			return err.Error()
+			job.Error(err)
+			return engine.StatusErr
 		}
 		// Validate the HostConfig binds. Make sure that:
 		// 1) the source of a bind mount isn't /
@@ -1647,29 +1663,33 @@ func (srv *Server) ContainerStart(job *engine.Job) string {
 
 			// refuse to bind mount "/" to the container
 			if source == "/" {
-				return fmt.Sprintf("Invalid bind mount '%s' : source can't be '/'", bind)
+				job.Errorf("Invalid bind mount '%s' : source can't be '/'", bind)
+				return engine.StatusErr
 			}
 
 			// ensure the source exists on the host
 			_, err := os.Stat(source)
 			if err != nil && os.IsNotExist(err) {
-				return fmt.Sprintf("Invalid bind mount '%s' : source doesn't exist", bind)
+				job.Errorf("Invalid bind mount '%s' : source doesn't exist", bind)
+				return engine.StatusErr
 			}
 		}
 		// Register any links from the host config before starting the container
 		// FIXME: we could just pass the container here, no need to lookup by name again.
 		if err := srv.RegisterLinks(name, &hostConfig); err != nil {
-			return err.Error()
+			job.Error(err)
+			return engine.StatusErr
 		}
 		container.hostConfig = &hostConfig
 		container.ToDisk()
 	}
 	if err := container.Start(); err != nil {
-		return fmt.Sprintf("Cannot start container %s: %s", name, err)
+		job.Errorf("Cannot start container %s: %s", name, err)
+		return engine.StatusErr
 	}
 	srv.LogEvent("start", container.ID, runtime.repositories.ImageName(container.Image))
 
-	return "0"
+	return engine.StatusOK
 }
 
 func (srv *Server) ContainerStop(name string, t int) error {