diff --git a/docker/docker.go b/docker/docker.go index 66311efac1..1c3c5a366b 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -2,6 +2,7 @@ package main import ( "github.com/dotcloud/docker/rcli" + "github.com/dotcloud/docker/future" "io" "log" "os" @@ -174,15 +175,25 @@ func main() { if err != nil { Fatal(err) } - go func() { - if _, err := io.Copy(os.Stdout, conn); err != nil { - Fatal(err) + receive_stdout := future.Go(func() error { + _, err := io.Copy(os.Stdout, conn) + return err + }) + send_stdin := future.Go(func() error { + _, err := io.Copy(conn, os.Stdin) + if err := conn.CloseWrite(); err != nil { + log.Printf("Couldn't send EOF: " + err.Error()) } - Restore(0, oldState) - os.Exit(0) - }() - if _, err := io.Copy(conn, os.Stdin); err != nil { + return err + }) + if err := <-receive_stdout; err != nil { Fatal(err) } - Restore(0, oldState) + if IsTerminal(0) { + Restore(0, oldState) + } else { + if err := <-send_stdin; err != nil { + Fatal(err) + } + } } diff --git a/dockerd/dockerd.go b/dockerd/dockerd.go index fe011b1752..6ee17bb45b 100644 --- a/dockerd/dockerd.go +++ b/dockerd/dockerd.go @@ -19,6 +19,7 @@ import ( "sort" "os" "time" + "net/http" ) @@ -138,9 +139,16 @@ func (docker *Docker) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...str if len(args) < 1 { return errors.New("Not enough arguments") } - time.Sleep(2 * time.Second) - layer := docker.addContainer(args[0], "download", 0) - fmt.Fprintln(stdout, layer.Id) + resp, err := http.Get(args[0]) + if err != nil { + return err + } + layer, err := docker.layers.AddLayer(resp.Body, stdout) + if err != nil { + return err + } + docker.addContainer(args[0], "download", 0) + fmt.Fprintln(stdout, layer.Id()) return nil } @@ -148,12 +156,17 @@ func (docker *Docker) CmdPut(stdin io.ReadCloser, stdout io.Writer, args ...stri if len(args) < 1 { return errors.New("Not enough arguments") } - time.Sleep(1 * time.Second) - layer := docker.addContainer(args[0], "upload", 0) - fmt.Fprintln(stdout, layer.Id) + fmt.Printf("Adding layer\n") + layer, err := docker.layers.AddLayer(stdin, stdout) + if err != nil { + return err + } + docker.addContainer(args[0], "upload", 0) + fmt.Fprintln(stdout, layer.Id()) return nil } + func (docker *Docker) CmdCommit(stdin io.ReadCloser, stdout io.Writer, args ...string) error { flags := rcli.Subcmd(stdout, "fork", "[OPTIONS] CONTAINER [DEST]", @@ -286,7 +299,7 @@ func (docker *Docker) addContainer(name string, source string, size uint) *Conta size = fake.RandomContainerSize() } c := &Container{ - Id: fake.RandomId(), + Id: future.RandomId(), Name: name, Created: time.Now(), Source: source, @@ -342,6 +355,7 @@ func (docker *Docker) CmdLogs(stdin io.ReadCloser, stdout io.Writer, args ...str func (docker *Docker) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) error { flags := rcli.Subcmd(stdout, "run", "[OPTIONS] CONTAINER COMMAND [ARG...]", "Run a command in a container") fl_attach := flags.Bool("a", false, "Attach stdin and stdout") + fl_tty := flags.Bool("t", false, "Allocate a pseudo-tty") if err := flags.Parse(args); err != nil { return nil } @@ -355,9 +369,9 @@ func (docker *Docker) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...stri return errors.New("Already running: " + name) } if *fl_attach { - return container.Run(cmd[0], cmd[1:], stdin, stdout) + return container.Run(cmd[0], cmd[1:], stdin, stdout, *fl_tty) } else { - go container.Run(cmd[0], cmd[1:], ioutil.NopCloser(new(bytes.Buffer)), ioutil.Discard) + go container.Run(cmd[0], cmd[1:], ioutil.NopCloser(new(bytes.Buffer)), ioutil.Discard, *fl_tty) fmt.Fprintln(stdout, container.Id) return nil } @@ -389,9 +403,12 @@ func startCommand(cmd *exec.Cmd, interactive bool) (io.WriteCloser, io.ReadClose func main() { - fake.Seed() + future.Seed() flag.Parse() - docker := New() + docker, err := New() + if err != nil { + log.Fatal(err) + } go func() { if err := rcli.ListenAndServeHTTP(":8080", docker); err != nil { log.Fatal(err) @@ -402,11 +419,19 @@ func main() { } } -func New() *Docker { +func New() (*Docker, error) { + store, err := future.NewStore("/var/lib/docker/layers") + if err != nil { + return nil, err + } + if err := store.Init(); err != nil { + return nil, err + } return &Docker{ containersByName: make(map[string]*ByDate), containers: make(map[string]*Container), - } + layers: store, + }, nil } @@ -450,17 +475,10 @@ func (docker *Docker) CmdWeb(stdin io.ReadCloser, stdout io.Writer, args ...stri } -func Go(f func() error) chan error { - ch := make(chan error) - go func() { - ch <- f() - }() - return ch -} - type Docker struct { containers map[string]*Container containersByName map[string]*ByDate + layers *future.Store } type Container struct { @@ -478,7 +496,7 @@ type Container struct { stdinLog *bytes.Buffer } -func (c *Container) Run(command string, args []string, stdin io.ReadCloser, stdout io.Writer) error { +func (c *Container) Run(command string, args []string, stdin io.ReadCloser, stdout io.Writer, tty bool) error { // Not thread-safe if c.Running { return errors.New("Already running") @@ -488,26 +506,26 @@ func (c *Container) Run(command string, args []string, stdin io.ReadCloser, stdo // Reset logs c.stdoutLog.Reset() c.stdinLog.Reset() - c.Running = true cmd := exec.Command(c.Cmd, c.Args...) - cmd_stdin, cmd_stdout, err := startCommand(cmd, true) - // ADD FAKE RANDOM CHANGES - c.FilesChanged = fake.RandomFilesChanged() - c.BytesChanged = fake.RandomBytesChanged() + cmd_stdin, cmd_stdout, err := startCommand(cmd, tty) if err != nil { return err } - copy_out := Go(func() error { + c.Running = true + // ADD FAKE RANDOM CHANGES + c.FilesChanged = fake.RandomFilesChanged() + c.BytesChanged = fake.RandomBytesChanged() + copy_out := future.Go(func() error { _, err := io.Copy(io.MultiWriter(stdout, c.stdoutLog), cmd_stdout) return err }) - Go(func() error { + future.Go(func() error { _, err := io.Copy(io.MultiWriter(cmd_stdin, c.stdinLog), stdin) cmd_stdin.Close() stdin.Close() return err }) - wait := Go(func() error { + wait := future.Go(func() error { err := cmd.Wait() c.Running = false return err diff --git a/fake/fake.go b/fake/fake.go index df2731a8a3..0e0de77507 100644 --- a/fake/fake.go +++ b/fake/fake.go @@ -1,22 +1,12 @@ package fake import ( - "github.com/dotcloud/docker/future" "bytes" "math/rand" - "time" "io" "archive/tar" - "fmt" ) -func Seed() { - rand.Seed(time.Now().UTC().UnixNano()) -} - -func randomBytes() io.Reader { - return bytes.NewBuffer([]byte(fmt.Sprintf("%x", rand.Int()))) -} func FakeTar() (io.Reader, error) { content := []byte("Hello world!\n") @@ -46,12 +36,6 @@ func WriteFakeTar(dst io.Writer) error { } -func RandomId() string { - id, _ := future.ComputeId(randomBytes()) // can't fail - return id -} - - func RandomBytesChanged() uint { return uint(rand.Int31n(24 * 1024 * 1024)) } diff --git a/future/future.go b/future/future.go index 3b33e7754d..a0efacc03c 100644 --- a/future/future.go +++ b/future/future.go @@ -5,8 +5,14 @@ import ( "io" "fmt" "time" + "bytes" + "math/rand" ) +func Seed() { + rand.Seed(time.Now().UTC().UnixNano()) +} + func ComputeId(content io.Reader) (string, error) { h := sha256.New() if _, err := io.Copy(h, content); err != nil { @@ -37,3 +43,21 @@ func HumanDuration(d time.Duration) string { } return fmt.Sprintf("%d years", d.Hours() / 24 / 365) } + +func randomBytes() io.Reader { + return bytes.NewBuffer([]byte(fmt.Sprintf("%x", rand.Int()))) +} + +func RandomId() string { + id, _ := ComputeId(randomBytes()) // can't fail + return id +} + +func Go(f func() error) chan error { + ch := make(chan error) + go func() { + ch <- f() + }() + return ch +} + diff --git a/future/layers.go b/future/layers.go new file mode 100644 index 0000000000..b85802dc4f --- /dev/null +++ b/future/layers.go @@ -0,0 +1,135 @@ +package future + +import ( + "errors" + "path" + "path/filepath" + "io" + "os" + "os/exec" +) + +type Store struct { + Root string +} + + +func NewStore(root string) (*Store, error) { + abspath, err := filepath.Abs(root) + if err != nil { + return nil, err + } + return &Store{ + Root: abspath, + }, nil +} + +func (store *Store) Get(id string) (*Layer, bool) { + layer := &Layer{Path: store.layerPath(id)} + if !layer.Exists() { + return nil, false + } + return layer, true +} + +func (store *Store) Exists() (bool, error) { + if stat, err := os.Stat(store.Root); err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } else if !stat.IsDir() { + return false, errors.New("Not a directory: " + store.Root) + } + return true, nil +} + +func (store *Store) Init() error { + if exists, err := store.Exists(); err != nil { + return err + } else if exists { + return nil + } + return os.Mkdir(store.Root, 0700) +} + + +func (store *Store) Mktemp() (string, error) { + tmpName := RandomId() + tmpPath := path.Join(store.Root, "tmp-" + tmpName) + if err := os.Mkdir(tmpPath, 0700); err != nil { + return "", err + } + return tmpPath, nil +} + +func (store *Store) layerPath(id string) string { + return path.Join(store.Root, id) +} + + +func (store *Store) AddLayer(archive io.Reader, stderr io.Writer) (*Layer, error) { + tmp, err := store.Mktemp() + defer os.RemoveAll(tmp) + if err != nil { + return nil, err + } + untarCmd := exec.Command("tar", "-C", tmp, "-x") + untarW, err := untarCmd.StdinPipe() + if err != nil { + return nil, err + } + untarStderr, err := untarCmd.StderrPipe() + if err != nil { + return nil, err + } + go io.Copy(stderr, untarStderr) + untarStdout, err := untarCmd.StdoutPipe() + if err != nil { + return nil, err + } + go io.Copy(stderr, untarStdout) + untarCmd.Start() + hashR, hashW := io.Pipe() + job_copy := Go(func() error { + _, err := io.Copy(io.MultiWriter(hashW, untarW), archive) + hashW.Close() + untarW.Close() + return err + }) + id, err := ComputeId(hashR) + if err != nil { + return nil, err + } + if err := untarCmd.Wait(); err != nil { + return nil, err + } + if err := <-job_copy; err != nil { + return nil, err + } + layer := &Layer{Path: store.layerPath(id)} + if !layer.Exists() { + if err := os.Rename(tmp, layer.Path); err != nil { + return nil, err + } + } + + return layer, nil +} + + +type Layer struct { + Path string +} + +func (layer *Layer) Exists() bool { + st, err := os.Stat(layer.Path) + if err != nil { + return false + } + return st.IsDir() +} + +func (layer *Layer) Id() string { + return path.Base(layer.Path) +} diff --git a/rcli/tcp.go b/rcli/tcp.go index 572dabb56c..c2703a712f 100644 --- a/rcli/tcp.go +++ b/rcli/tcp.go @@ -10,7 +10,7 @@ import ( "bufio" ) -func CallTCP(addr string, args ...string) (io.ReadWriteCloser, error) { +func CallTCP(addr string, args ...string) (*net.TCPConn, error) { cmd, err := json.Marshal(args) if err != nil { return nil, err @@ -22,7 +22,7 @@ func CallTCP(addr string, args ...string) (io.ReadWriteCloser, error) { if _, err := fmt.Fprintln(conn, string(cmd)); err != nil { return nil, err } - return conn, nil + return conn.(*net.TCPConn), nil } func ListenAndServeTCP(addr string, service Service) error {