Jelajahi Sumber

Initial commit

Andrea Luzzardi 12 tahun lalu
melakukan
a27b4b8cb8
10 mengubah file dengan 1146 tambahan dan 0 penghapusan
  1. 203 0
      container.go
  2. 186 0
      container_test.go
  3. 112 0
      docker.go
  4. 175 0
      docker_test.go
  5. 52 0
      filesystem.go
  6. 35 0
      filesystem_test.go
  7. 94 0
      lxc_template.go
  8. 48 0
      state.go
  9. 115 0
      utils.go
  10. 126 0
      utils_test.go

+ 203 - 0
container.go

@@ -0,0 +1,203 @@
+package docker
+
+import (
+	"encoding/json"
+	"errors"
+	"io"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"path"
+	"syscall"
+)
+
+type Container struct {
+	Name string
+	Root string
+	Path string
+	Args []string
+
+	*Config
+	*Filesystem
+	*State
+
+	lxcConfigPath string
+	cmd           *exec.Cmd
+	stdout        *writeBroadcaster
+	stderr        *writeBroadcaster
+}
+
+type Config struct {
+	Hostname string
+	Ram      int64
+}
+
+func createContainer(name string, root string, command string, args []string, layers []string, config *Config) (*Container, error) {
+	container := &Container{
+		Name:       name,
+		Root:       root,
+		Path:       command,
+		Args:       args,
+		Config:     config,
+		Filesystem: newFilesystem(path.Join(root, "rootfs"), path.Join(root, "rw"), layers),
+		State:      newState(),
+
+		lxcConfigPath: path.Join(root, "config.lxc"),
+		stdout:        newWriteBroadcaster(),
+		stderr:        newWriteBroadcaster(),
+	}
+
+	if err := os.Mkdir(root, 0700); err != nil {
+		return nil, err
+	}
+
+	if err := container.save(); err != nil {
+		return nil, err
+	}
+	if err := container.generateLXCConfig(); err != nil {
+		return nil, err
+	}
+	return container, nil
+}
+
+func loadContainer(containerPath string) (*Container, error) {
+	configPath := path.Join(containerPath, "config.json")
+	fi, err := os.Open(configPath)
+	if err != nil {
+		return nil, err
+	}
+	defer fi.Close()
+	enc := json.NewDecoder(fi)
+	container := &Container{}
+	if err := enc.Decode(container); err != nil {
+		return nil, err
+	}
+	return container, nil
+}
+
+func (container *Container) save() error {
+	configPath := path.Join(container.Root, "config.json")
+	fo, err := os.Create(configPath)
+	if err != nil {
+		return err
+	}
+	defer fo.Close()
+	enc := json.NewEncoder(fo)
+	if err := enc.Encode(container); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (container *Container) generateLXCConfig() error {
+	fo, err := os.Create(container.lxcConfigPath)
+	if err != nil {
+		return err
+	}
+	defer fo.Close()
+
+	if err := LxcTemplateCompiled.Execute(fo, container); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (container *Container) Start() error {
+	if err := container.Filesystem.Mount(); err != nil {
+		return err
+	}
+
+	params := []string{
+		"-n", container.Name,
+		"-f", container.lxcConfigPath,
+		"--",
+		container.Path,
+	}
+	params = append(params, container.Args...)
+
+	container.cmd = exec.Command("/usr/bin/lxc-start", params...)
+	container.cmd.Stdout = container.stdout
+	container.cmd.Stderr = container.stderr
+
+	if err := container.cmd.Start(); err != nil {
+		return err
+	}
+	container.State.setRunning(container.cmd.Process.Pid)
+	go container.monitor()
+
+	// Wait until we are out of the STARTING state before returning
+	//
+	// Even though lxc-wait blocks until the container reaches a given state,
+	// sometimes it returns an error code, which is why we have to retry.
+	//
+	// This is a rare race condition that happens for short lived programs
+	for retries := 0; retries < 3; retries++ {
+		err := exec.Command("/usr/bin/lxc-wait", "-n", container.Name, "-s", "RUNNING|STOPPED").Run()
+		if err == nil {
+			return nil
+		}
+	}
+	return errors.New("Container failed to start")
+}
+
+func (container *Container) Run() error {
+	if err := container.Start(); err != nil {
+		return err
+	}
+	container.Wait()
+	return nil
+}
+
+func (container *Container) Output() (output []byte, err error) {
+	pipe, err := container.StdoutPipe()
+	if err != nil {
+		return nil, err
+	}
+	defer pipe.Close()
+	if err := container.Start(); err != nil {
+		return nil, err
+	}
+	output, err = ioutil.ReadAll(pipe)
+	container.Wait()
+	return output, err
+}
+
+func (container *Container) StdoutPipe() (io.ReadCloser, error) {
+	reader, writer := io.Pipe()
+	container.stdout.AddWriter(writer)
+	return newBufReader(reader), nil
+}
+
+func (container *Container) StderrPipe() (io.ReadCloser, error) {
+	reader, writer := io.Pipe()
+	container.stderr.AddWriter(writer)
+	return newBufReader(reader), nil
+}
+
+func (container *Container) monitor() {
+	container.cmd.Wait()
+	container.stdout.Close()
+	container.stderr.Close()
+	container.State.setStopped(container.cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus())
+}
+
+func (container *Container) Stop() error {
+	if container.State.Running {
+		if err := exec.Command("/usr/bin/lxc-stop", "-n", container.Name).Run(); err != nil {
+			return err
+		}
+		//FIXME: We should lxc-wait for the container to stop
+	}
+
+	if err := container.Filesystem.Umount(); err != nil {
+		// FIXME: Do not abort, probably already umounted?
+		return nil
+	}
+	return nil
+}
+
+func (container *Container) Wait() {
+	for container.State.Running {
+		container.State.wait()
+	}
+}

+ 186 - 0
container_test.go

@@ -0,0 +1,186 @@
+package docker
+
+import (
+	"testing"
+)
+
+func TestStart(t *testing.T) {
+	docker, err := newTestDocker()
+	if err != nil {
+		t.Fatal(err)
+	}
+	container, err := docker.Create(
+		"start_test",
+		"ls",
+		[]string{"-al"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{
+			Ram: 33554432,
+		},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(container)
+
+	if container.State.Running {
+		t.Errorf("Container shouldn't be running")
+	}
+	if err := container.Start(); err != nil {
+		t.Fatal(err)
+	}
+	container.Wait()
+	if container.State.Running {
+		t.Errorf("Container shouldn't be running")
+	}
+	// We should be able to call Wait again
+	container.Wait()
+	if container.State.Running {
+		t.Errorf("Container shouldn't be running")
+	}
+}
+
+func TestRun(t *testing.T) {
+	docker, err := newTestDocker()
+	if err != nil {
+		t.Fatal(err)
+	}
+	container, err := docker.Create(
+		"run_test",
+		"ls",
+		[]string{"-al"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{
+			Ram: 33554432,
+		},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(container)
+
+	if container.State.Running {
+		t.Errorf("Container shouldn't be running")
+	}
+	if err := container.Run(); err != nil {
+		t.Fatal(err)
+	}
+	if container.State.Running {
+		t.Errorf("Container shouldn't be running")
+	}
+}
+
+func TestOutput(t *testing.T) {
+	docker, err := newTestDocker()
+	if err != nil {
+		t.Fatal(err)
+	}
+	container, err := docker.Create(
+		"output_test",
+		"echo",
+		[]string{"-n", "foobar"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(container)
+
+	pipe, err := container.StdoutPipe()
+	defer pipe.Close()
+	output, err := container.Output()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(output) != "foobar" {
+		t.Error(string(output))
+	}
+}
+
+func TestStop(t *testing.T) {
+	docker, err := newTestDocker()
+	if err != nil {
+		t.Fatal(err)
+	}
+	container, err := docker.Create(
+		"stop_test",
+		"sleep",
+		[]string{"300"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(container)
+
+	if container.State.Running {
+		t.Errorf("Container shouldn't be running")
+	}
+	if err := container.Start(); err != nil {
+		t.Fatal(err)
+	}
+	if !container.State.Running {
+		t.Errorf("Container should be running")
+	}
+	if err := container.Stop(); err != nil {
+		t.Fatal(err)
+	}
+	if container.State.Running {
+		t.Errorf("Container shouldn't be running")
+	}
+	container.Wait()
+	if container.State.Running {
+		t.Errorf("Container shouldn't be running")
+	}
+	// Try stopping twice
+	if err := container.Stop(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestExitCode(t *testing.T) {
+	docker, err := newTestDocker()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	trueContainer, err := docker.Create(
+		"exit_test_1",
+		"/bin/true",
+		[]string{""},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(trueContainer)
+	if err := trueContainer.Run(); err != nil {
+		t.Fatal(err)
+	}
+
+	falseContainer, err := docker.Create(
+		"exit_test_2",
+		"/bin/false",
+		[]string{""},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(falseContainer)
+	if err := falseContainer.Run(); err != nil {
+		t.Fatal(err)
+	}
+
+	if trueContainer.State.ExitCode != 0 {
+		t.Errorf("Unexpected exit code %v", trueContainer.State.ExitCode)
+	}
+
+	if falseContainer.State.ExitCode != 1 {
+		t.Errorf("Unexpected exit code %v", falseContainer.State.ExitCode)
+	}
+}

+ 112 - 0
docker.go

@@ -0,0 +1,112 @@
+package docker
+
+import (
+	"container/list"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path"
+)
+
+type Docker struct {
+	root       string
+	repository string
+	containers *list.List
+}
+
+func (docker *Docker) List() []*Container {
+	containers := []*Container{}
+	for e := docker.containers.Front(); e != nil; e = e.Next() {
+		containers = append(containers, e.Value.(*Container))
+	}
+	return containers
+}
+
+func (docker *Docker) getContainerElement(name string) *list.Element {
+	for e := docker.containers.Front(); e != nil; e = e.Next() {
+		container := e.Value.(*Container)
+		if container.Name == name {
+			return e
+		}
+	}
+	return nil
+}
+
+func (docker *Docker) Get(name string) *Container {
+	e := docker.getContainerElement(name)
+	if e == nil {
+		return nil
+	}
+	return e.Value.(*Container)
+}
+
+func (docker *Docker) Exists(name string) bool {
+	return docker.Get(name) != nil
+}
+
+func (docker *Docker) Create(name string, command string, args []string, layers []string, config *Config) (*Container, error) {
+	if docker.Exists(name) {
+		return nil, fmt.Errorf("Container %v already exists", name)
+	}
+	root := path.Join(docker.repository, name)
+	container, err := createContainer(name, root, command, args, layers, config)
+	if err != nil {
+		return nil, err
+	}
+	docker.containers.PushBack(container)
+	return container, nil
+}
+
+func (docker *Docker) Destroy(container *Container) error {
+	element := docker.getContainerElement(container.Name)
+	if element == nil {
+		return fmt.Errorf("Container %v not found - maybe it was already destroyed?", container.Name)
+	}
+
+	if err := container.Stop(); err != nil {
+		return err
+	}
+	if err := os.RemoveAll(container.Root); err != nil {
+		return err
+	}
+
+	docker.containers.Remove(element)
+	return nil
+}
+
+func (docker *Docker) restore() error {
+	dir, err := ioutil.ReadDir(docker.repository)
+	if err != nil {
+		return err
+	}
+	for _, v := range dir {
+		container, err := loadContainer(path.Join(docker.repository, v.Name()))
+		if err != nil {
+			fmt.Errorf("Failed to load %v: %v", v.Name(), err)
+			continue
+		}
+		docker.containers.PushBack(container)
+	}
+	return nil
+}
+
+func New() (*Docker, error) {
+	return NewFromDirectory("/var/lib/docker")
+}
+
+func NewFromDirectory(root string) (*Docker, error) {
+	docker := &Docker{
+		root:       root,
+		repository: path.Join(root, "containers"),
+		containers: list.New(),
+	}
+
+	if err := os.Mkdir(docker.repository, 0700); err != nil && !os.IsExist(err) {
+		return nil, err
+	}
+
+	if err := docker.restore(); err != nil {
+		return nil, err
+	}
+	return docker, nil
+}

+ 175 - 0
docker_test.go

@@ -0,0 +1,175 @@
+package docker
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+func newTestDocker() (*Docker, error) {
+	root, err := ioutil.TempDir("", "docker-test")
+	if err != nil {
+		return nil, err
+	}
+	docker, err := NewFromDirectory(root)
+	if err != nil {
+		return nil, err
+	}
+	return docker, nil
+}
+
+func TestCreate(t *testing.T) {
+	docker, err := newTestDocker()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Make sure we start we 0 containers
+	if len(docker.List()) != 0 {
+		t.Errorf("Expected 0 containers, %v found", len(docker.List()))
+	}
+	container, err := docker.Create(
+		"test_create",
+		"ls",
+		[]string{"-al"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer func() {
+		if err := docker.Destroy(container); err != nil {
+			t.Error(err)
+		}
+	}()
+
+	// Make sure we can find the newly created container with List()
+	if len(docker.List()) != 1 {
+		t.Errorf("Expected 1 container, %v found", len(docker.List()))
+	}
+
+	// Make sure the container List() returns is the right one
+	if docker.List()[0].Name != "test_create" {
+		t.Errorf("Unexpected container %v returned by List", docker.List()[0])
+	}
+
+	// Make sure we can get the container with Get()
+	if docker.Get("test_create") == nil {
+		t.Errorf("Unable to get newly created container")
+	}
+
+	// Make sure it is the right container
+	if docker.Get("test_create") != container {
+		t.Errorf("Get() returned the wrong container")
+	}
+
+	// Make sure Exists returns it as existing
+	if !docker.Exists("test_create") {
+		t.Errorf("Exists() returned false for a newly created container")
+	}
+}
+
+func TestDestroy(t *testing.T) {
+	docker, err := newTestDocker()
+	if err != nil {
+		t.Fatal(err)
+	}
+	container, err := docker.Create(
+		"test_destroy",
+		"ls",
+		[]string{"-al"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Destroy
+	if err := docker.Destroy(container); err != nil {
+		t.Error(err)
+	}
+
+	// Make sure docker.Exists() behaves correctly
+	if docker.Exists("test_destroy") {
+		t.Errorf("Exists() returned true")
+	}
+
+	// Make sure docker.List() doesn't list the destroyed container
+	if len(docker.List()) != 0 {
+		t.Errorf("Expected 0 container, %v found", len(docker.List()))
+	}
+
+	// Make sure docker.Get() refuses to return the unexisting container
+	if docker.Get("test_destroy") != nil {
+		t.Errorf("Unable to get newly created container")
+	}
+
+	// Make sure the container root directory does not exist anymore
+	_, err = os.Stat(container.Root)
+	if err == nil || !os.IsNotExist(err) {
+		t.Errorf("Container root directory still exists after destroy")
+	}
+
+	// Test double destroy
+	if err := docker.Destroy(container); err == nil {
+		// It should have failed
+		t.Errorf("Double destroy did not fail")
+	}
+}
+
+func TestGet(t *testing.T) {
+	docker, err := newTestDocker()
+	if err != nil {
+		t.Fatal(err)
+	}
+	container1, err := docker.Create(
+		"test1",
+		"ls",
+		[]string{"-al"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(container1)
+
+	container2, err := docker.Create(
+		"test2",
+		"ls",
+		[]string{"-al"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(container2)
+
+	container3, err := docker.Create(
+		"test3",
+		"ls",
+		[]string{"-al"},
+		[]string{"/var/lib/docker/images/ubuntu"},
+		&Config{},
+	)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer docker.Destroy(container3)
+
+	if docker.Get("test1") != container1 {
+		t.Errorf("Get(test1) returned %v while expecting %v", docker.Get("test1"), container1)
+	}
+
+	if docker.Get("test2") != container2 {
+		t.Errorf("Get(test2) returned %v while expecting %v", docker.Get("test2"), container2)
+	}
+
+	if docker.Get("test3") != container3 {
+		t.Errorf("Get(test3) returned %v while expecting %v", docker.Get("test3"), container3)
+	}
+
+}

+ 52 - 0
filesystem.go

@@ -0,0 +1,52 @@
+package docker
+
+import (
+	"fmt"
+	"os"
+	"os/exec"
+)
+
+type Filesystem struct {
+	RootFS string
+	RWPath string
+	Layers []string
+}
+
+func (fs *Filesystem) createMountPoints() error {
+	if err := os.Mkdir(fs.RootFS, 0700); err != nil && !os.IsExist(err) {
+		return err
+	}
+	if err := os.Mkdir(fs.RWPath, 0700); err != nil && !os.IsExist(err) {
+		return err
+	}
+	return nil
+}
+
+func (fs *Filesystem) Mount() error {
+	if err := fs.createMountPoints(); err != nil {
+		return err
+	}
+	rwBranch := fmt.Sprintf("%v=rw", fs.RWPath)
+	roBranches := ""
+	for _, layer := range fs.Layers {
+		roBranches += fmt.Sprintf("%v=ro:", layer)
+	}
+	branches := fmt.Sprintf("br:%v:%v", rwBranch, roBranches)
+	cmd := exec.Command("mount", "-t", "aufs", "-o", branches, "none", fs.RootFS)
+	if err := cmd.Run(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (fs *Filesystem) Umount() error {
+	return exec.Command("umount", fs.RootFS).Run()
+}
+
+func newFilesystem(rootfs string, rwpath string, layers []string) *Filesystem {
+	return &Filesystem{
+		RootFS: rootfs,
+		RWPath: rwpath,
+		Layers: layers,
+	}
+}

+ 35 - 0
filesystem_test.go

@@ -0,0 +1,35 @@
+package docker
+
+import (
+	"io/ioutil"
+	"testing"
+)
+
+func TestFilesystem(t *testing.T) {
+	rootfs, err := ioutil.TempDir("", "docker-test-root")
+	if err != nil {
+		t.Fatal(err)
+	}
+	rwpath, err := ioutil.TempDir("", "docker-test-rw")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	filesystem := newFilesystem(rootfs, rwpath, []string{"/var/lib/docker/images/ubuntu", "/var/lib/docker/images/test"})
+
+	if err := filesystem.Umount(); err == nil {
+		t.Errorf("Umount succeeded even though the filesystem was not mounted")
+	}
+
+	if err := filesystem.Mount(); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := filesystem.Umount(); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := filesystem.Umount(); err == nil {
+		t.Errorf("Umount succeeded even though the filesystem was already umounted")
+	}
+}

+ 94 - 0
lxc_template.go

@@ -0,0 +1,94 @@
+package docker
+
+import (
+	"text/template"
+)
+
+const LxcTemplate = `
+# hostname
+{{if .Config.Hostname}}
+lxc.utsname = {{.Config.Hostname}}
+{{else}}
+lxc.utsname = {{.Name}}
+{{end}}
+#lxc.aa_profile = unconfined
+
+# network configuration
+#lxc.network.type = veth
+#lxc.network.flags = up
+#lxc.network.link = br0
+#lxc.network.name = eth0  # Internal container network interface name
+#lxc.network.mtu = 1500
+#lxc.network.ipv4 = {ip_address}/{ip_prefix_len}
+
+# root filesystem
+lxc.rootfs = {{.Filesystem.RootFS}}
+
+# use a dedicated pts for the container (and limit the number of pseudo terminal
+# available)
+lxc.pts = 1024
+
+# disable the main console
+lxc.console = none
+
+# no controlling tty at all
+lxc.tty = 1
+
+# no implicit access to devices
+lxc.cgroup.devices.deny = a
+
+# /dev/null and zero
+lxc.cgroup.devices.allow = c 1:3 rwm
+lxc.cgroup.devices.allow = c 1:5 rwm
+
+# consoles
+lxc.cgroup.devices.allow = c 5:1 rwm
+lxc.cgroup.devices.allow = c 5:0 rwm
+lxc.cgroup.devices.allow = c 4:0 rwm
+lxc.cgroup.devices.allow = c 4:1 rwm
+
+# /dev/urandom,/dev/random
+lxc.cgroup.devices.allow = c 1:9 rwm
+lxc.cgroup.devices.allow = c 1:8 rwm
+
+# /dev/pts/* - pts namespaces are "coming soon"
+lxc.cgroup.devices.allow = c 136:* rwm
+lxc.cgroup.devices.allow = c 5:2 rwm
+
+# tuntap
+lxc.cgroup.devices.allow = c 10:200 rwm
+
+# fuse
+#lxc.cgroup.devices.allow = c 10:229 rwm
+
+# rtc
+#lxc.cgroup.devices.allow = c 254:0 rwm
+
+
+# standard mount point
+lxc.mount.entry = proc {{.Filesystem.RootFS}}/proc proc nosuid,nodev,noexec 0 0
+lxc.mount.entry = sysfs {{.Filesystem.RootFS}}/sys sysfs nosuid,nodev,noexec 0 0
+lxc.mount.entry = devpts {{.Filesystem.RootFS}}/dev/pts devpts newinstance,ptmxmode=0666,nosuid,noexec 0 0
+#lxc.mount.entry = varrun {{.Filesystem.RootFS}}/var/run tmpfs mode=755,size=4096k,nosuid,nodev,noexec 0 0
+#lxc.mount.entry = varlock {{.Filesystem.RootFS}}/var/lock tmpfs size=1024k,nosuid,nodev,noexec 0 0
+#lxc.mount.entry = shm {{.Filesystem.RootFS}}/dev/shm tmpfs size=65536k,nosuid,nodev,noexec 0 0
+
+
+# drop linux capabilities (apply mainly to the user root in the container)
+lxc.cap.drop = audit_control audit_write mac_admin mac_override mknod net_raw setfcap setpcap sys_admin sys_boot sys_module sys_nice sys_pacct sys_rawio sys_resource sys_time sys_tty_config
+
+# limits
+{{if .Config.Ram}}
+lxc.cgroup.memory.limit_in_bytes = {{.Config.Ram}}
+{{end}}
+`
+
+var LxcTemplateCompiled *template.Template
+
+func init() {
+	var err error
+	LxcTemplateCompiled, err = template.New("lxc").Parse(LxcTemplate)
+	if err != nil {
+		panic(err)
+	}
+}

+ 48 - 0
state.go

@@ -0,0 +1,48 @@
+package docker
+
+import (
+	"sync"
+)
+
+type State struct {
+	Running  bool
+	Pid      int
+	ExitCode int
+
+	stateChangeLock *sync.Mutex
+	stateChangeCond *sync.Cond
+}
+
+func newState() *State {
+	lock := new(sync.Mutex)
+	return &State{
+		stateChangeLock: lock,
+		stateChangeCond: sync.NewCond(lock),
+	}
+}
+
+func (s *State) setRunning(pid int) {
+	s.Running = true
+	s.ExitCode = 0
+	s.Pid = pid
+	s.broadcast()
+}
+
+func (s *State) setStopped(exitCode int) {
+	s.Running = false
+	s.Pid = 0
+	s.ExitCode = exitCode
+	s.broadcast()
+}
+
+func (s *State) broadcast() {
+	s.stateChangeLock.Lock()
+	s.stateChangeCond.Broadcast()
+	s.stateChangeLock.Unlock()
+}
+
+func (s *State) wait() {
+	s.stateChangeLock.Lock()
+	s.stateChangeCond.Wait()
+	s.stateChangeLock.Unlock()
+}

+ 115 - 0
utils.go

@@ -0,0 +1,115 @@
+package docker
+
+import (
+	"bytes"
+	"container/list"
+	"io"
+	"sync"
+)
+
+type bufReader struct {
+	buf    *bytes.Buffer
+	reader io.Reader
+	err    error
+	l      sync.Mutex
+	wait   sync.Cond
+}
+
+func newBufReader(r io.Reader) *bufReader {
+	reader := &bufReader{
+		buf:    &bytes.Buffer{},
+		reader: r,
+	}
+	reader.wait.L = &reader.l
+	go reader.drain()
+	return reader
+}
+
+func (r *bufReader) drain() {
+	buf := make([]byte, 1024)
+	for {
+		n, err := r.reader.Read(buf)
+		if err != nil {
+			r.err = err
+		} else {
+			r.buf.Write(buf[0:n])
+		}
+		r.l.Lock()
+		r.wait.Signal()
+		r.l.Unlock()
+		if err != nil {
+			break
+		}
+	}
+}
+
+func (r *bufReader) Read(p []byte) (n int, err error) {
+	for {
+		n, err = r.buf.Read(p)
+		if n > 0 {
+			return n, err
+		}
+		if r.err != nil {
+			return 0, r.err
+		}
+		r.l.Lock()
+		r.wait.Wait()
+		r.l.Unlock()
+	}
+	return
+}
+
+func (r *bufReader) Close() error {
+	closer, ok := r.reader.(io.ReadCloser)
+	if !ok {
+		return nil
+	}
+	return closer.Close()
+}
+
+type writeBroadcaster struct {
+	writers *list.List
+}
+
+func (w *writeBroadcaster) AddWriter(writer io.WriteCloser) {
+	w.writers.PushBack(writer)
+}
+
+func (w *writeBroadcaster) RemoveWriter(writer io.WriteCloser) {
+	for e := w.writers.Front(); e != nil; e = e.Next() {
+		v := e.Value.(io.Writer)
+		if v == writer {
+			w.writers.Remove(e)
+			return
+		}
+	}
+}
+
+func (w *writeBroadcaster) Write(p []byte) (n int, err error) {
+	failed := []*list.Element{}
+	for e := w.writers.Front(); e != nil; e = e.Next() {
+		writer := e.Value.(io.Writer)
+		if n, err := writer.Write(p); err != nil || n != len(p) {
+			// On error, evict the writer
+			failed = append(failed, e)
+		}
+	}
+	// We cannot remove while iterating, so it has to be done in
+	// a separate step
+	for _, e := range failed {
+		w.writers.Remove(e)
+	}
+	return len(p), nil
+}
+
+func (w *writeBroadcaster) Close() error {
+	for e := w.writers.Front(); e != nil; e = e.Next() {
+		writer := e.Value.(io.WriteCloser)
+		writer.Close()
+	}
+	return nil
+}
+
+func newWriteBroadcaster() *writeBroadcaster {
+	return &writeBroadcaster{list.New()}
+}

+ 126 - 0
utils_test.go

@@ -0,0 +1,126 @@
+package docker
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"io/ioutil"
+	"testing"
+)
+
+func TestBufReader(t *testing.T) {
+	reader, writer := io.Pipe()
+	bufreader := newBufReader(reader)
+
+	// Write everything down to a Pipe
+	// Usually, a pipe should block but because of the buffered reader,
+	// the writes will go through
+	done := make(chan bool)
+	go func() {
+		writer.Write([]byte("hello world"))
+		writer.Close()
+		done <- true
+	}()
+
+	// Drain the reader *after* everything has been written, just to verify
+	// it is indeed buffering
+	<-done
+	output, err := ioutil.ReadAll(bufreader)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(output, []byte("hello world")) {
+		t.Error(string(output))
+	}
+}
+
+type dummyWriter struct {
+	buffer      bytes.Buffer
+	failOnWrite bool
+}
+
+func (dw *dummyWriter) Write(p []byte) (n int, err error) {
+	if dw.failOnWrite {
+		return 0, errors.New("Fake fail")
+	}
+	return dw.buffer.Write(p)
+}
+
+func (dw *dummyWriter) String() string {
+	return dw.buffer.String()
+}
+
+func (dw *dummyWriter) Close() error {
+	return nil
+}
+
+func TestWriteBroadcaster(t *testing.T) {
+	writer := newWriteBroadcaster()
+
+	// Test 1: Both bufferA and bufferB should contain "foo"
+	bufferA := &dummyWriter{}
+	writer.AddWriter(bufferA)
+	bufferB := &dummyWriter{}
+	writer.AddWriter(bufferB)
+	writer.Write([]byte("foo"))
+
+	if bufferA.String() != "foo" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+
+	if bufferB.String() != "foo" {
+		t.Errorf("Buffer contains %v", bufferB.String())
+	}
+
+	// Test2: bufferA and bufferB should contain "foobar",
+	// while bufferC should only contain "bar"
+	bufferC := &dummyWriter{}
+	writer.AddWriter(bufferC)
+	writer.Write([]byte("bar"))
+
+	if bufferA.String() != "foobar" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+
+	if bufferB.String() != "foobar" {
+		t.Errorf("Buffer contains %v", bufferB.String())
+	}
+
+	if bufferC.String() != "bar" {
+		t.Errorf("Buffer contains %v", bufferC.String())
+	}
+
+	// Test3: Test removal
+	writer.RemoveWriter(bufferB)
+	writer.Write([]byte("42"))
+	if bufferA.String() != "foobar42" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+	if bufferB.String() != "foobar" {
+		t.Errorf("Buffer contains %v", bufferB.String())
+	}
+	if bufferC.String() != "bar42" {
+		t.Errorf("Buffer contains %v", bufferC.String())
+	}
+
+	// Test4: Test eviction on failure
+	bufferA.failOnWrite = true
+	writer.Write([]byte("fail"))
+	if bufferA.String() != "foobar42" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+	if bufferC.String() != "bar42fail" {
+		t.Errorf("Buffer contains %v", bufferC.String())
+	}
+	// Even though we reset the flag, no more writes should go in there
+	bufferA.failOnWrite = false
+	writer.Write([]byte("test"))
+	if bufferA.String() != "foobar42" {
+		t.Errorf("Buffer contains %v", bufferA.String())
+	}
+	if bufferC.String() != "bar42failtest" {
+		t.Errorf("Buffer contains %v", bufferC.String())
+	}
+
+	writer.Close()
+}