From 9422451ac3f541a17daba0d5f6dc8b40a6edc9e9 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Tue, 22 Apr 2014 16:51:06 -0700 Subject: [PATCH 1/5] engine.Installer: a standard interface for "installable" services Installer is a standard interface for objects which can "install" themselves an engine by registering handlers. This can be used as an entrypoint for external plugins etc. Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- engine/engine.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/engine/engine.go b/engine/engine.go index 58c37ab933..aaf5c1f595 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -10,6 +10,13 @@ import ( "strings" ) +// Installer is a standard interface for objects which can "install" themselves +// on an engine by registering handlers. +// This can be used as an entrypoint for external plugins etc. +type Installer interface { + Install(*Engine) error +} + type Handler func(*Job) Status var globalHandlers map[string]Handler From 68d3e757503fab422fc96a00d511336a3fdfd619 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Thu, 24 Apr 2014 00:36:21 -0700 Subject: [PATCH 2/5] engine: allow registering a "catchall" handler which receives all commands Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- engine/engine.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index aaf5c1f595..dc1984ccb5 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -43,6 +43,7 @@ func unregister(name string) { // containers by executing *jobs*. type Engine struct { handlers map[string]Handler + catchall Handler hack Hack // data for temporary hackery (see hack.go) id string Stdout io.Writer @@ -60,6 +61,10 @@ func (eng *Engine) Register(name string, handler Handler) error { return nil } +func (eng *Engine) RegisterCatchall(catchall Handler) { + eng.catchall = catchall +} + // New initializes a new engine. func New() *Engine { eng := &Engine{ @@ -113,9 +118,13 @@ func (eng *Engine) Job(name string, args ...string) *Job { if eng.Logging { job.Stderr.Add(utils.NopWriteCloser(eng.Stderr)) } - handler, exists := eng.handlers[name] - if exists { - job.handler = handler + if eng.catchall != nil { + job.handler = eng.catchall + } else { + handler, exists := eng.handlers[name] + if exists { + job.handler = handler + } } return job } From 3c1d5ca33ecbd644d4e8d864ff59b389f4a4a555 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Thu, 24 Apr 2014 00:46:32 -0700 Subject: [PATCH 3/5] Remote communication between engines using beam Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- engine/remote.go | 109 ++++++++++++++++++++++++++++++++++++++++++ engine/remote_test.go | 3 ++ 2 files changed, 112 insertions(+) create mode 100644 engine/remote.go create mode 100644 engine/remote_test.go diff --git a/engine/remote.go b/engine/remote.go new file mode 100644 index 0000000000..1e8777a4b7 --- /dev/null +++ b/engine/remote.go @@ -0,0 +1,109 @@ +package engine + +import ( + "fmt" + "github.com/dotcloud/docker/pkg/beam" + "github.com/dotcloud/docker/pkg/beam/data" + "io" + "os" + "strconv" + "sync" +) + +type Sender struct { + beam.Sender +} + +func NewSender(s beam.Sender) *Sender { + return &Sender{s} +} + +func (s *Sender) Install(eng *Engine) error { + // FIXME: this doesn't exist yet. + eng.RegisterCatchall(s.Handle) + return nil +} + +func (s *Sender) Handle(job *Job) Status { + msg := data.Empty().Set("cmd", append([]string{job.Name}, job.Args...)...) + peer, err := beam.SendConn(s, msg.Bytes()) + if err != nil { + return job.Errorf("beamsend: %v", err) + } + defer peer.Close() + var tasks sync.WaitGroup + defer tasks.Wait() + r := beam.NewRouter(nil) + r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error { + tasks.Add(1) + io.Copy(job.Stdout, stdout) + tasks.Done() + return nil + }) + r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error { + tasks.Add(1) + io.Copy(job.Stderr, stderr) + tasks.Done() + return nil + }) + var status int + r.NewRoute().KeyStartsWith("cmd", "status").Handler(func(p []byte, f *os.File) error { + cmd := data.Message(p).Get("cmd") + if len(cmd) != 3 { + return fmt.Errorf("usage: %s <0-127>", cmd[0]) + } + s, err := strconv.ParseUint(cmd[2], 10, 8) + if err != nil { + return fmt.Errorf("usage: %s <0-127>", cmd[0]) + } + status = int(s) + return nil + + }) + if _, err := beam.Copy(r, peer); err != nil { + return job.Errorf("%v", err) + } + return Status(status) +} + +type Receiver struct { + *Engine + peer beam.Receiver +} + +func NewReceiver(peer beam.Receiver) *Receiver { + return &Receiver{Engine: New(), peer: peer} +} + +func (rcv *Receiver) Run() error { + r := beam.NewRouter(nil) + r.NewRoute().KeyExists("cmd").Handler(func(p []byte, f *os.File) error { + // Use the attachment as a beam return channel + peer, err := beam.FileConn(f) + if err != nil { + f.Close() + return err + } + cmd := data.Message(p).Get("cmd") + job := rcv.Engine.Job(cmd[0], cmd[1:]...) + stdout, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes()) + if err != nil { + return err + } + job.Stdout.Add(stdout) + stderr, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stderr").Bytes()) + if err != nil { + return err + } + job.Stderr.Add(stderr) + // ignore error because we pass the raw status + job.Run() + err = peer.Send(data.Empty().Set("cmd", "status", fmt.Sprintf("%d", job.status)).Bytes(), nil) + if err != nil { + return err + } + return nil + }) + _, err := beam.Copy(r, rcv.peer) + return err +} diff --git a/engine/remote_test.go b/engine/remote_test.go new file mode 100644 index 0000000000..54092ec934 --- /dev/null +++ b/engine/remote_test.go @@ -0,0 +1,3 @@ +package engine + +import () From b63b98ee2766321e2ca6f3b159c2bfb303870105 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Fri, 25 Apr 2014 16:47:03 -0700 Subject: [PATCH 4/5] engine.Sender and engine.Receiver support stdin Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- engine/remote.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/engine/remote.go b/engine/remote.go index 1e8777a4b7..48638e4383 100644 --- a/engine/remote.go +++ b/engine/remote.go @@ -46,6 +46,12 @@ func (s *Sender) Handle(job *Job) Status { tasks.Done() return nil }) + r.NewRoute().KeyStartsWith("cmd", "log", "stdin").HasAttachment().Handler(func(p []byte, stdin *os.File) error { + tasks.Add(1) + io.Copy(stdin, job.Stdin) + tasks.Done() + return nil + }) var status int r.NewRoute().KeyStartsWith("cmd", "status").Handler(func(p []byte, f *os.File) error { cmd := data.Message(p).Get("cmd") @@ -96,6 +102,11 @@ func (rcv *Receiver) Run() error { return err } job.Stderr.Add(stderr) + stdin, err := beam.SendPipe(peer, data.Empty().Set("cmd", "log", "stdin").Bytes()) + if err != nil { + return err + } + job.Stdin.Add(stdin) // ignore error because we pass the raw status job.Run() err = peer.Send(data.Empty().Set("cmd", "status", fmt.Sprintf("%d", job.status)).Bytes(), nil) From 7e3624a498b8b96a4e8a0f1d59fc2c50bf48efb3 Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Fri, 25 Apr 2014 16:48:16 -0700 Subject: [PATCH 5/5] engine: 'rengine' is a small command-line utility to debug remote engine Docker-DCO-1.1-Signed-off-by: Solomon Hykes (github: shykes) --- engine/rengine/main.go | 43 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 engine/rengine/main.go diff --git a/engine/rengine/main.go b/engine/rengine/main.go new file mode 100644 index 0000000000..b4fa01d39c --- /dev/null +++ b/engine/rengine/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "fmt" + "github.com/dotcloud/docker/engine" + "github.com/dotcloud/docker/pkg/beam" + "net" + "os" +) + +func main() { + eng := engine.New() + + c, err := net.Dial("unix", "beam.sock") + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + return + } + defer c.Close() + f, err := c.(*net.UnixConn).File() + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + return + } + + child, err := beam.FileConn(f) + if err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + return + } + defer child.Close() + + sender := engine.NewSender(child) + sender.Install(eng) + + cmd := eng.Job(os.Args[1], os.Args[2:]...) + cmd.Stdout.Add(os.Stdout) + cmd.Stderr.Add(os.Stderr) + if err := cmd.Run(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } +}