123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- package engine
- import (
- "fmt"
- "github.com/dotcloud/docker/pkg/beam"
- "github.com/dotcloud/docker/pkg/beam/data"
- "io"
- "os"
- "strconv"
- "sync"
- )
- type Sender struct {
- beam.Sender
- }
- func NewSender(s beam.Sender) *Sender {
- return &Sender{s}
- }
- func (s *Sender) Install(eng *Engine) error {
- // FIXME: this doesn't exist yet.
- eng.RegisterCatchall(s.Handle)
- return nil
- }
- func (s *Sender) Handle(job *Job) Status {
- cmd := append([]string{job.Name}, job.Args...)
- env := data.Encode(job.Env().MultiMap())
- msg := data.Empty().Set("cmd", cmd...).Set("env", env)
- peer, err := beam.SendConn(s, msg.Bytes())
- if err != nil {
- return job.Errorf("beamsend: %v", err)
- }
- defer peer.Close()
- var tasks sync.WaitGroup
- defer tasks.Wait()
- r := beam.NewRouter(nil)
- r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error {
- tasks.Add(1)
- go func() {
- io.Copy(job.Stdout, stdout)
- stdout.Close()
- tasks.Done()
- }()
- return nil
- })
- r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error {
- tasks.Add(1)
- go func() {
- io.Copy(job.Stderr, stderr)
- stderr.Close()
- tasks.Done()
- }()
- return nil
- })
- r.NewRoute().KeyStartsWith("cmd", "log", "stdin").HasAttachment().Handler(func(p []byte, stdin *os.File) error {
- go func() {
- io.Copy(stdin, job.Stdin)
- stdin.Close()
- }()
- return nil
- })
- var status int
- r.NewRoute().KeyStartsWith("cmd", "status").Handler(func(p []byte, f *os.File) error {
- cmd := data.Message(p).Get("cmd")
- if len(cmd) != 2 {
- return fmt.Errorf("usage: %s <0-127>", cmd[0])
- }
- s, err := strconv.ParseUint(cmd[1], 10, 8)
- if err != nil {
- return fmt.Errorf("usage: %s <0-127>", cmd[0])
- }
- status = int(s)
- return nil
- })
- if _, err := beam.Copy(r, peer); err != nil {
- return job.Errorf("%v", err)
- }
- return Status(status)
- }
- type Receiver struct {
- *Engine
- peer beam.Receiver
- }
- func NewReceiver(peer beam.Receiver) *Receiver {
- return &Receiver{Engine: New(), peer: peer}
- }
- func (rcv *Receiver) Run() error {
- r := beam.NewRouter(nil)
- r.NewRoute().KeyExists("cmd").Handler(func(p []byte, f *os.File) error {
- // Use the attachment as a beam return channel
- peer, err := beam.FileConn(f)
- if err != nil {
- f.Close()
- return err
- }
- f.Close()
- defer peer.Close()
- msg := data.Message(p)
- cmd := msg.Get("cmd")
- job := rcv.Engine.Job(cmd[0], cmd[1:]...)
- // Decode env
- env, err := data.Decode(msg.GetOne("env"))
- if err != nil {
- return fmt.Errorf("error decoding 'env': %v", err)
- }
- job.Env().InitMultiMap(env)
- stdout, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes())
- if err != nil {
- return err
- }
- job.Stdout.Add(stdout)
- stderr, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stderr").Bytes())
- if err != nil {
- return err
- }
- job.Stderr.Add(stderr)
- stdin, err := beam.SendWPipe(peer, data.Empty().Set("cmd", "log", "stdin").Bytes())
- if err != nil {
- return err
- }
- job.Stdin.Add(stdin)
- // ignore error because we pass the raw status
- job.Run()
- err = peer.Send(data.Empty().Set("cmd", "status", fmt.Sprintf("%d", job.status)).Bytes(), nil)
- if err != nil {
- return err
- }
- return nil
- })
- _, err := beam.Copy(r, rcv.peer)
- return err
- }
|