Merge branch 'master' of github.com:dotcloud/docker

This commit is contained in:
Andrea Luzzardi 2013-01-25 14:09:54 -08:00
commit 1df9819b25
6 changed files with 228 additions and 56 deletions

View file

@ -2,6 +2,7 @@ package main
import (
"github.com/dotcloud/docker/rcli"
"github.com/dotcloud/docker/future"
"io"
"log"
"os"
@ -174,15 +175,25 @@ func main() {
if err != nil {
Fatal(err)
}
go func() {
if _, err := io.Copy(os.Stdout, conn); err != nil {
Fatal(err)
receive_stdout := future.Go(func() error {
_, err := io.Copy(os.Stdout, conn)
return err
})
send_stdin := future.Go(func() error {
_, err := io.Copy(conn, os.Stdin)
if err := conn.CloseWrite(); err != nil {
log.Printf("Couldn't send EOF: " + err.Error())
}
Restore(0, oldState)
os.Exit(0)
}()
if _, err := io.Copy(conn, os.Stdin); err != nil {
return err
})
if err := <-receive_stdout; err != nil {
Fatal(err)
}
Restore(0, oldState)
if IsTerminal(0) {
Restore(0, oldState)
} else {
if err := <-send_stdin; err != nil {
Fatal(err)
}
}
}

View file

@ -19,6 +19,7 @@ import (
"sort"
"os"
"time"
"net/http"
)
@ -138,9 +139,16 @@ func (docker *Docker) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...str
if len(args) < 1 {
return errors.New("Not enough arguments")
}
time.Sleep(2 * time.Second)
layer := docker.addContainer(args[0], "download", 0)
fmt.Fprintln(stdout, layer.Id)
resp, err := http.Get(args[0])
if err != nil {
return err
}
layer, err := docker.layers.AddLayer(resp.Body, stdout)
if err != nil {
return err
}
docker.addContainer(args[0], "download", 0)
fmt.Fprintln(stdout, layer.Id())
return nil
}
@ -148,12 +156,17 @@ func (docker *Docker) CmdPut(stdin io.ReadCloser, stdout io.Writer, args ...stri
if len(args) < 1 {
return errors.New("Not enough arguments")
}
time.Sleep(1 * time.Second)
layer := docker.addContainer(args[0], "upload", 0)
fmt.Fprintln(stdout, layer.Id)
fmt.Printf("Adding layer\n")
layer, err := docker.layers.AddLayer(stdin, stdout)
if err != nil {
return err
}
docker.addContainer(args[0], "upload", 0)
fmt.Fprintln(stdout, layer.Id())
return nil
}
func (docker *Docker) CmdCommit(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
flags := rcli.Subcmd(stdout,
"fork", "[OPTIONS] CONTAINER [DEST]",
@ -286,7 +299,7 @@ func (docker *Docker) addContainer(name string, source string, size uint) *Conta
size = fake.RandomContainerSize()
}
c := &Container{
Id: fake.RandomId(),
Id: future.RandomId(),
Name: name,
Created: time.Now(),
Source: source,
@ -342,6 +355,7 @@ func (docker *Docker) CmdLogs(stdin io.ReadCloser, stdout io.Writer, args ...str
func (docker *Docker) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) error {
flags := rcli.Subcmd(stdout, "run", "[OPTIONS] CONTAINER COMMAND [ARG...]", "Run a command in a container")
fl_attach := flags.Bool("a", false, "Attach stdin and stdout")
fl_tty := flags.Bool("t", false, "Allocate a pseudo-tty")
if err := flags.Parse(args); err != nil {
return nil
}
@ -355,9 +369,9 @@ func (docker *Docker) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...stri
return errors.New("Already running: " + name)
}
if *fl_attach {
return container.Run(cmd[0], cmd[1:], stdin, stdout)
return container.Run(cmd[0], cmd[1:], stdin, stdout, *fl_tty)
} else {
go container.Run(cmd[0], cmd[1:], ioutil.NopCloser(new(bytes.Buffer)), ioutil.Discard)
go container.Run(cmd[0], cmd[1:], ioutil.NopCloser(new(bytes.Buffer)), ioutil.Discard, *fl_tty)
fmt.Fprintln(stdout, container.Id)
return nil
}
@ -389,9 +403,12 @@ func startCommand(cmd *exec.Cmd, interactive bool) (io.WriteCloser, io.ReadClose
func main() {
fake.Seed()
future.Seed()
flag.Parse()
docker := New()
docker, err := New()
if err != nil {
log.Fatal(err)
}
go func() {
if err := rcli.ListenAndServeHTTP(":8080", docker); err != nil {
log.Fatal(err)
@ -402,11 +419,19 @@ func main() {
}
}
func New() *Docker {
func New() (*Docker, error) {
store, err := future.NewStore("/var/lib/docker/layers")
if err != nil {
return nil, err
}
if err := store.Init(); err != nil {
return nil, err
}
return &Docker{
containersByName: make(map[string]*ByDate),
containers: make(map[string]*Container),
}
layers: store,
}, nil
}
@ -450,17 +475,10 @@ func (docker *Docker) CmdWeb(stdin io.ReadCloser, stdout io.Writer, args ...stri
}
func Go(f func() error) chan error {
ch := make(chan error)
go func() {
ch <- f()
}()
return ch
}
type Docker struct {
containers map[string]*Container
containersByName map[string]*ByDate
layers *future.Store
}
type Container struct {
@ -478,7 +496,7 @@ type Container struct {
stdinLog *bytes.Buffer
}
func (c *Container) Run(command string, args []string, stdin io.ReadCloser, stdout io.Writer) error {
func (c *Container) Run(command string, args []string, stdin io.ReadCloser, stdout io.Writer, tty bool) error {
// Not thread-safe
if c.Running {
return errors.New("Already running")
@ -488,26 +506,26 @@ func (c *Container) Run(command string, args []string, stdin io.ReadCloser, stdo
// Reset logs
c.stdoutLog.Reset()
c.stdinLog.Reset()
c.Running = true
cmd := exec.Command(c.Cmd, c.Args...)
cmd_stdin, cmd_stdout, err := startCommand(cmd, true)
// ADD FAKE RANDOM CHANGES
c.FilesChanged = fake.RandomFilesChanged()
c.BytesChanged = fake.RandomBytesChanged()
cmd_stdin, cmd_stdout, err := startCommand(cmd, tty)
if err != nil {
return err
}
copy_out := Go(func() error {
c.Running = true
// ADD FAKE RANDOM CHANGES
c.FilesChanged = fake.RandomFilesChanged()
c.BytesChanged = fake.RandomBytesChanged()
copy_out := future.Go(func() error {
_, err := io.Copy(io.MultiWriter(stdout, c.stdoutLog), cmd_stdout)
return err
})
Go(func() error {
future.Go(func() error {
_, err := io.Copy(io.MultiWriter(cmd_stdin, c.stdinLog), stdin)
cmd_stdin.Close()
stdin.Close()
return err
})
wait := Go(func() error {
wait := future.Go(func() error {
err := cmd.Wait()
c.Running = false
return err

View file

@ -1,22 +1,12 @@
package fake
import (
"github.com/dotcloud/docker/future"
"bytes"
"math/rand"
"time"
"io"
"archive/tar"
"fmt"
)
func Seed() {
rand.Seed(time.Now().UTC().UnixNano())
}
func randomBytes() io.Reader {
return bytes.NewBuffer([]byte(fmt.Sprintf("%x", rand.Int())))
}
func FakeTar() (io.Reader, error) {
content := []byte("Hello world!\n")
@ -46,12 +36,6 @@ func WriteFakeTar(dst io.Writer) error {
}
func RandomId() string {
id, _ := future.ComputeId(randomBytes()) // can't fail
return id
}
func RandomBytesChanged() uint {
return uint(rand.Int31n(24 * 1024 * 1024))
}

View file

@ -5,8 +5,14 @@ import (
"io"
"fmt"
"time"
"bytes"
"math/rand"
)
func Seed() {
rand.Seed(time.Now().UTC().UnixNano())
}
func ComputeId(content io.Reader) (string, error) {
h := sha256.New()
if _, err := io.Copy(h, content); err != nil {
@ -37,3 +43,21 @@ func HumanDuration(d time.Duration) string {
}
return fmt.Sprintf("%d years", d.Hours() / 24 / 365)
}
func randomBytes() io.Reader {
return bytes.NewBuffer([]byte(fmt.Sprintf("%x", rand.Int())))
}
func RandomId() string {
id, _ := ComputeId(randomBytes()) // can't fail
return id
}
func Go(f func() error) chan error {
ch := make(chan error)
go func() {
ch <- f()
}()
return ch
}

135
future/layers.go Normal file
View file

@ -0,0 +1,135 @@
package future
import (
"errors"
"path"
"path/filepath"
"io"
"os"
"os/exec"
)
type Store struct {
Root string
}
func NewStore(root string) (*Store, error) {
abspath, err := filepath.Abs(root)
if err != nil {
return nil, err
}
return &Store{
Root: abspath,
}, nil
}
func (store *Store) Get(id string) (*Layer, bool) {
layer := &Layer{Path: store.layerPath(id)}
if !layer.Exists() {
return nil, false
}
return layer, true
}
func (store *Store) Exists() (bool, error) {
if stat, err := os.Stat(store.Root); err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
} else if !stat.IsDir() {
return false, errors.New("Not a directory: " + store.Root)
}
return true, nil
}
func (store *Store) Init() error {
if exists, err := store.Exists(); err != nil {
return err
} else if exists {
return nil
}
return os.Mkdir(store.Root, 0700)
}
func (store *Store) Mktemp() (string, error) {
tmpName := RandomId()
tmpPath := path.Join(store.Root, "tmp-" + tmpName)
if err := os.Mkdir(tmpPath, 0700); err != nil {
return "", err
}
return tmpPath, nil
}
func (store *Store) layerPath(id string) string {
return path.Join(store.Root, id)
}
func (store *Store) AddLayer(archive io.Reader, stderr io.Writer) (*Layer, error) {
tmp, err := store.Mktemp()
defer os.RemoveAll(tmp)
if err != nil {
return nil, err
}
untarCmd := exec.Command("tar", "-C", tmp, "-x")
untarW, err := untarCmd.StdinPipe()
if err != nil {
return nil, err
}
untarStderr, err := untarCmd.StderrPipe()
if err != nil {
return nil, err
}
go io.Copy(stderr, untarStderr)
untarStdout, err := untarCmd.StdoutPipe()
if err != nil {
return nil, err
}
go io.Copy(stderr, untarStdout)
untarCmd.Start()
hashR, hashW := io.Pipe()
job_copy := Go(func() error {
_, err := io.Copy(io.MultiWriter(hashW, untarW), archive)
hashW.Close()
untarW.Close()
return err
})
id, err := ComputeId(hashR)
if err != nil {
return nil, err
}
if err := untarCmd.Wait(); err != nil {
return nil, err
}
if err := <-job_copy; err != nil {
return nil, err
}
layer := &Layer{Path: store.layerPath(id)}
if !layer.Exists() {
if err := os.Rename(tmp, layer.Path); err != nil {
return nil, err
}
}
return layer, nil
}
type Layer struct {
Path string
}
func (layer *Layer) Exists() bool {
st, err := os.Stat(layer.Path)
if err != nil {
return false
}
return st.IsDir()
}
func (layer *Layer) Id() string {
return path.Base(layer.Path)
}

View file

@ -10,7 +10,7 @@ import (
"bufio"
)
func CallTCP(addr string, args ...string) (io.ReadWriteCloser, error) {
func CallTCP(addr string, args ...string) (*net.TCPConn, error) {
cmd, err := json.Marshal(args)
if err != nil {
return nil, err
@ -22,7 +22,7 @@ func CallTCP(addr string, args ...string) (io.ReadWriteCloser, error) {
if _, err := fmt.Fprintln(conn, string(cmd)); err != nil {
return nil, err
}
return conn, nil
return conn.(*net.TCPConn), nil
}
func ListenAndServeTCP(addr string, service Service) error {