remote.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package engine
  2. import (
  3. "fmt"
  4. "github.com/dotcloud/docker/pkg/beam"
  5. "github.com/dotcloud/docker/pkg/beam/data"
  6. "io"
  7. "os"
  8. "strconv"
  9. "sync"
  10. )
  11. type Sender struct {
  12. beam.Sender
  13. }
  14. func NewSender(s beam.Sender) *Sender {
  15. return &Sender{s}
  16. }
  17. func (s *Sender) Install(eng *Engine) error {
  18. // FIXME: this doesn't exist yet.
  19. eng.RegisterCatchall(s.Handle)
  20. return nil
  21. }
  22. func (s *Sender) Handle(job *Job) Status {
  23. cmd := append([]string{job.Name}, job.Args...)
  24. env := data.Encode(job.Env().MultiMap())
  25. msg := data.Empty().Set("cmd", cmd...).Set("env", env)
  26. peer, err := beam.SendConn(s, msg.Bytes())
  27. if err != nil {
  28. return job.Errorf("beamsend: %v", err)
  29. }
  30. defer peer.Close()
  31. var tasks sync.WaitGroup
  32. defer tasks.Wait()
  33. r := beam.NewRouter(nil)
  34. r.NewRoute().KeyStartsWith("cmd", "log", "stdout").HasAttachment().Handler(func(p []byte, stdout *os.File) error {
  35. tasks.Add(1)
  36. go func() {
  37. io.Copy(job.Stdout, stdout)
  38. stdout.Close()
  39. tasks.Done()
  40. }()
  41. return nil
  42. })
  43. r.NewRoute().KeyStartsWith("cmd", "log", "stderr").HasAttachment().Handler(func(p []byte, stderr *os.File) error {
  44. tasks.Add(1)
  45. go func() {
  46. io.Copy(job.Stderr, stderr)
  47. stderr.Close()
  48. tasks.Done()
  49. }()
  50. return nil
  51. })
  52. r.NewRoute().KeyStartsWith("cmd", "log", "stdin").HasAttachment().Handler(func(p []byte, stdin *os.File) error {
  53. go func() {
  54. io.Copy(stdin, job.Stdin)
  55. stdin.Close()
  56. }()
  57. return nil
  58. })
  59. var status int
  60. r.NewRoute().KeyStartsWith("cmd", "status").Handler(func(p []byte, f *os.File) error {
  61. cmd := data.Message(p).Get("cmd")
  62. if len(cmd) != 2 {
  63. return fmt.Errorf("usage: %s <0-127>", cmd[0])
  64. }
  65. s, err := strconv.ParseUint(cmd[1], 10, 8)
  66. if err != nil {
  67. return fmt.Errorf("usage: %s <0-127>", cmd[0])
  68. }
  69. status = int(s)
  70. return nil
  71. })
  72. if _, err := beam.Copy(r, peer); err != nil {
  73. return job.Errorf("%v", err)
  74. }
  75. return Status(status)
  76. }
  77. type Receiver struct {
  78. *Engine
  79. peer beam.Receiver
  80. }
  81. func NewReceiver(peer beam.Receiver) *Receiver {
  82. return &Receiver{Engine: New(), peer: peer}
  83. }
  84. func (rcv *Receiver) Run() error {
  85. r := beam.NewRouter(nil)
  86. r.NewRoute().KeyExists("cmd").Handler(func(p []byte, f *os.File) error {
  87. // Use the attachment as a beam return channel
  88. peer, err := beam.FileConn(f)
  89. if err != nil {
  90. f.Close()
  91. return err
  92. }
  93. f.Close()
  94. defer peer.Close()
  95. msg := data.Message(p)
  96. cmd := msg.Get("cmd")
  97. job := rcv.Engine.Job(cmd[0], cmd[1:]...)
  98. // Decode env
  99. env, err := data.Decode(msg.GetOne("env"))
  100. if err != nil {
  101. return fmt.Errorf("error decoding 'env': %v", err)
  102. }
  103. job.Env().InitMultiMap(env)
  104. stdout, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes())
  105. if err != nil {
  106. return err
  107. }
  108. job.Stdout.Add(stdout)
  109. stderr, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stderr").Bytes())
  110. if err != nil {
  111. return err
  112. }
  113. job.Stderr.Add(stderr)
  114. stdin, err := beam.SendWPipe(peer, data.Empty().Set("cmd", "log", "stdin").Bytes())
  115. if err != nil {
  116. return err
  117. }
  118. job.Stdin.Add(stdin)
  119. // ignore error because we pass the raw status
  120. job.Run()
  121. err = peer.Send(data.Empty().Set("cmd", "status", fmt.Sprintf("%d", job.status)).Bytes(), nil)
  122. if err != nil {
  123. return err
  124. }
  125. return nil
  126. })
  127. _, err := beam.Copy(r, rcv.peer)
  128. return err
  129. }