فهرست منبع

Merge pull request #5422 from shykes/engine-spawn

engine/spawn: run an engine in a subprocess, remote-controlled by Beam
Solomon Hykes 11 سال پیش
والد
کامیت
d8332f433f
3فایلهای تغییر یافته به همراه182 افزوده شده و 2 حذف شده
  1. 2 2
      engine/remote.go
  2. 119 0
      engine/spawn/spawn.go
  3. 61 0
      engine/spawn/subengine/main.go

+ 2 - 2
engine/remote.go

@@ -55,10 +55,10 @@ func (s *Sender) Handle(job *Job) Status {
 	var status int
 	r.NewRoute().KeyStartsWith("cmd", "status").Handler(func(p []byte, f *os.File) error {
 		cmd := data.Message(p).Get("cmd")
-		if len(cmd) != 3 {
+		if len(cmd) != 2 {
 			return fmt.Errorf("usage: %s <0-127>", cmd[0])
 		}
-		s, err := strconv.ParseUint(cmd[2], 10, 8)
+		s, err := strconv.ParseUint(cmd[1], 10, 8)
 		if err != nil {
 			return fmt.Errorf("usage: %s <0-127>", cmd[0])
 		}

+ 119 - 0
engine/spawn/spawn.go

@@ -0,0 +1,119 @@
+package spawn
+
+import (
+	"fmt"
+	"github.com/dotcloud/docker/engine"
+	"github.com/dotcloud/docker/pkg/beam"
+	"github.com/dotcloud/docker/utils"
+	"os"
+	"os/exec"
+)
+
+var initCalled bool
+
+// Init checks if the current process has been created by Spawn.
+//
+// If no, it returns nil and the original program can continue
+// unmodified.
+//
+// If no, it hijacks the process to run as a child worker controlled
+// by its parent over a beam connection, with f exposed as a remote
+// service. In this case Init never returns.
+//
+// The hijacking process takes place as follows:
+//	- Open file descriptor 3 as a beam endpoint. If this fails,
+//	terminate the current process.
+//	- Start a new engine.
+//	- Call f.Install on the engine. Any handlers registered
+//	will be available for remote invocation by the parent.
+//	- Listen for beam messages from the parent and pass them to
+//	the handlers.
+//	- When the beam endpoint is closed by the parent, terminate
+//	the current process.
+//
+// NOTE: Init must be called at the beginning of the same program
+// calling Spawn. This is because Spawn approximates a "fork" by
+// re-executing the current binary - where it expects spawn.Init
+// to intercept the control flow and execute the worker code.
+func Init(f engine.Installer) error {
+	initCalled = true
+	if os.Getenv("ENGINESPAWN") != "1" {
+		return nil
+	}
+	fmt.Printf("[%d child]\n", os.Getpid())
+	// Hijack the process
+	childErr := func() error {
+		fd3 := os.NewFile(3, "beam-introspect")
+		introsp, err := beam.FileConn(fd3)
+		if err != nil {
+			return fmt.Errorf("beam introspection error: %v", err)
+		}
+		fd3.Close()
+		defer introsp.Close()
+		eng := engine.NewReceiver(introsp)
+		if err := f.Install(eng.Engine); err != nil {
+			return err
+		}
+		if err := eng.Run(); err != nil {
+			return err
+		}
+		return nil
+	}()
+	if childErr != nil {
+		os.Exit(1)
+	}
+	os.Exit(0)
+	return nil // Never reached
+}
+
+// Spawn starts a new Engine in a child process and returns
+// a proxy Engine through which it can be controlled.
+//
+// The commands available on the child engine are determined
+// by an earlier call to Init. It is important that Init be
+// called at the very beginning of the current program - this
+// allows it to be called as a re-execution hook in the child
+// process.
+//
+// Long story short, if you want to expose `myservice` in a child
+// process, do this:
+//
+// func main() {
+//     spawn.Init(myservice)
+//     [..]
+//     child, err := spawn.Spawn()
+//     [..]
+//     child.Job("dosomething").Run()
+// }
+func Spawn() (*engine.Engine, error) {
+	if !initCalled {
+		return nil, fmt.Errorf("spawn.Init must be called at the top of the main() function")
+	}
+	cmd := exec.Command(utils.SelfPath())
+	cmd.Env = append(cmd.Env, "ENGINESPAWN=1")
+	local, remote, err := beam.SocketPair()
+	if err != nil {
+		return nil, err
+	}
+	child, err := beam.FileConn(local)
+	if err != nil {
+		local.Close()
+		remote.Close()
+		return nil, err
+	}
+	local.Close()
+	cmd.ExtraFiles = append(cmd.ExtraFiles, remote)
+	// FIXME: the beam/engine glue has no way to inform the caller
+	// of the child's termination. The next call will simply return
+	// an error.
+	if err := cmd.Start(); err != nil {
+		child.Close()
+		return nil, err
+	}
+	eng := engine.New()
+	if err := engine.NewSender(child).Install(eng); err != nil {
+		child.Close()
+		return nil, err
+	}
+	return eng, nil
+}

+ 61 - 0
engine/spawn/subengine/main.go

@@ -0,0 +1,61 @@
+package main
+
+import (
+	"fmt"
+	"github.com/dotcloud/docker/engine"
+	"github.com/dotcloud/docker/engine/spawn"
+	"log"
+	"os"
+	"os/exec"
+	"strings"
+)
+
+func main() {
+	fmt.Printf("[%d] MAIN\n", os.Getpid())
+	spawn.Init(&Worker{})
+	fmt.Printf("[%d parent] spawning\n", os.Getpid())
+	eng, err := spawn.Spawn()
+	if err != nil {
+		log.Fatal(err)
+	}
+	fmt.Printf("[parent] spawned\n")
+	job := eng.Job(os.Args[1], os.Args[2:]...)
+	job.Stdout.Add(os.Stdout)
+	job.Stderr.Add(os.Stderr)
+	job.Run()
+	// FIXME: use the job's status code
+	os.Exit(0)
+}
+
+type Worker struct {
+}
+
+func (w *Worker) Install(eng *engine.Engine) error {
+	eng.Register("exec", w.Exec)
+	eng.Register("cd", w.Cd)
+	eng.Register("echo", w.Echo)
+	return nil
+}
+
+func (w *Worker) Exec(job *engine.Job) engine.Status {
+	fmt.Printf("--> %v\n", job.Args)
+	cmd := exec.Command(job.Args[0], job.Args[1:]...)
+	cmd.Stdout = job.Stdout
+	cmd.Stderr = os.Stderr
+	if err := cmd.Run(); err != nil {
+		return job.Errorf("%v\n", err)
+	}
+	return engine.StatusOK
+}
+
+func (w *Worker) Cd(job *engine.Job) engine.Status {
+	if err := os.Chdir(job.Args[0]); err != nil {
+		return job.Errorf("%v\n", err)
+	}
+	return engine.StatusOK
+}
+
+func (w *Worker) Echo(job *engine.Job) engine.Status {
+	fmt.Fprintf(job.Stdout, "%s\n", strings.Join(job.Args, " "))
+	return engine.StatusOK
+}