builtins.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "github.com/dotcloud/docker/pkg/beam"
  6. "github.com/dotcloud/docker/pkg/beam/data"
  7. "github.com/dotcloud/docker/pkg/term"
  8. "github.com/dotcloud/docker/utils"
  9. "io"
  10. "net"
  11. "net/url"
  12. "os"
  13. "os/exec"
  14. "path"
  15. "strings"
  16. "sync"
  17. "text/template"
  18. )
  19. func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  20. if err := os.MkdirAll("logs", 0700); err != nil {
  21. fmt.Fprintf(stderr, "%v\n", err)
  22. return
  23. }
  24. var tasks sync.WaitGroup
  25. defer tasks.Wait()
  26. var n int = 1
  27. r := beam.NewRouter(out)
  28. r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
  29. tasks.Add(1)
  30. go func(n int) {
  31. defer tasks.Done()
  32. defer attachment.Close()
  33. var streamname string
  34. if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" {
  35. streamname = "stdout"
  36. } else {
  37. streamname = cmd[1]
  38. }
  39. if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 {
  40. streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname)
  41. }
  42. logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700)
  43. if err != nil {
  44. fmt.Fprintf(stderr, "%v\n", err)
  45. return
  46. }
  47. defer logfile.Close()
  48. io.Copy(logfile, attachment)
  49. logfile.Sync()
  50. }(n)
  51. n++
  52. return nil
  53. }).Tee(out)
  54. if _, err := beam.Copy(r, in); err != nil {
  55. fmt.Fprintf(stderr, "%v\n", err)
  56. return
  57. }
  58. }
  59. func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  60. if len(args) != 2 {
  61. fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0])
  62. out.Send(data.Empty().Set("status", "1").Bytes(), nil)
  63. return
  64. }
  65. txt := args[1]
  66. if !strings.HasSuffix(txt, "\n") {
  67. txt += "\n"
  68. }
  69. t := template.Must(template.New("render").Parse(txt))
  70. for {
  71. payload, attachment, err := in.Receive()
  72. if err != nil {
  73. return
  74. }
  75. msg, err := data.Decode(string(payload))
  76. if err != nil {
  77. fmt.Fprintf(stderr, "decode error: %v\n")
  78. }
  79. if err := t.Execute(stdout, msg); err != nil {
  80. fmt.Fprintf(stderr, "rendering error: %v\n", err)
  81. out.Send(data.Empty().Set("status", "1").Bytes(), nil)
  82. return
  83. }
  84. if err := out.Send(payload, attachment); err != nil {
  85. return
  86. }
  87. }
  88. }
  89. func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  90. for {
  91. _, attachment, err := in.Receive()
  92. if err != nil {
  93. return
  94. }
  95. if attachment != nil {
  96. attachment.Close()
  97. }
  98. }
  99. }
  100. func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  101. if len(args) < 2 {
  102. fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0])
  103. return
  104. }
  105. if !term.IsTerminal(0) {
  106. fmt.Fprintf(stderr, "can't prompt: no tty available...\n")
  107. return
  108. }
  109. fmt.Printf("%s: ", strings.Join(args[1:], " "))
  110. oldState, _ := term.SaveState(0)
  111. term.DisableEcho(0, oldState)
  112. line, _, err := bufio.NewReader(os.Stdin).ReadLine()
  113. if err != nil {
  114. fmt.Fprintln(stderr, err.Error())
  115. return
  116. }
  117. val := string(line)
  118. fmt.Printf("\n")
  119. term.RestoreTerminal(0, oldState)
  120. out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil)
  121. }
  122. func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  123. var tasks sync.WaitGroup
  124. defer tasks.Wait()
  125. r := beam.NewRouter(out)
  126. r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
  127. tasks.Add(1)
  128. go func() {
  129. defer tasks.Done()
  130. defer attachment.Close()
  131. io.Copy(os.Stdout, attachment)
  132. attachment.Close()
  133. }()
  134. return nil
  135. }).Tee(out)
  136. if _, err := beam.Copy(r, in); err != nil {
  137. Fatal(err)
  138. fmt.Fprintf(stderr, "%v\n", err)
  139. return
  140. }
  141. }
  142. func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  143. fmt.Fprintln(stdout, strings.Join(args[1:], " "))
  144. }
  145. func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  146. for {
  147. payload, attachment, err := in.Receive()
  148. if err != nil {
  149. return
  150. }
  151. if err := out.Send(payload, attachment); err != nil {
  152. if attachment != nil {
  153. attachment.Close()
  154. }
  155. return
  156. }
  157. }
  158. }
  159. func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  160. c := exec.Command(utils.SelfPath())
  161. r, w, err := os.Pipe()
  162. if err != nil {
  163. fmt.Fprintf(stderr, "%v\n", err)
  164. return
  165. }
  166. c.Stdin = r
  167. c.Stdout = stdout
  168. c.Stderr = stderr
  169. go func() {
  170. fmt.Fprintf(w, strings.Join(args[1:], " "))
  171. w.Sync()
  172. w.Close()
  173. }()
  174. if err := c.Run(); err != nil {
  175. fmt.Fprintf(stderr, "%v\n", err)
  176. return
  177. }
  178. }
  179. func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  180. os.Chdir(args[1])
  181. GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out)
  182. }
  183. func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  184. cmd := exec.Command(args[1], args[2:]...)
  185. cmd.Stdout = stdout
  186. cmd.Stderr = stderr
  187. //cmd.Stdin = os.Stdin
  188. local, remote, err := beam.SocketPair()
  189. if err != nil {
  190. fmt.Fprintf(stderr, "%v\n", err)
  191. return
  192. }
  193. child, err := beam.FileConn(local)
  194. if err != nil {
  195. local.Close()
  196. remote.Close()
  197. fmt.Fprintf(stderr, "%v\n", err)
  198. return
  199. }
  200. local.Close()
  201. cmd.ExtraFiles = append(cmd.ExtraFiles, remote)
  202. var tasks sync.WaitGroup
  203. tasks.Add(1)
  204. go func() {
  205. defer Debugf("done copying to child\n")
  206. defer tasks.Done()
  207. defer child.CloseWrite()
  208. beam.Copy(child, in)
  209. }()
  210. tasks.Add(1)
  211. go func() {
  212. defer Debugf("done copying from child %d\n")
  213. defer tasks.Done()
  214. r := beam.NewRouter(out)
  215. r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
  216. return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a)
  217. })
  218. beam.Copy(r, child)
  219. }()
  220. execErr := cmd.Run()
  221. // We can close both ends of the socket without worrying about data stuck in the buffer,
  222. // because unix socket writes are fully synchronous.
  223. child.Close()
  224. tasks.Wait()
  225. var status string
  226. if execErr != nil {
  227. status = execErr.Error()
  228. } else {
  229. status = "ok"
  230. }
  231. out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil)
  232. }
  233. func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  234. r := beam.NewRouter(out)
  235. r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error {
  236. var sfd string = "nil"
  237. if attachment != nil {
  238. sfd = fmt.Sprintf("%d", attachment.Fd())
  239. }
  240. fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd)
  241. out.Send(payload, attachment)
  242. return nil
  243. })
  244. beam.Copy(r, in)
  245. }
  246. func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  247. out.Send(data.Parse(args[1:]).Bytes(), nil)
  248. }
  249. func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  250. for {
  251. payload, a, err := in.Receive()
  252. if err != nil {
  253. return
  254. }
  255. // Skip commands
  256. if a != nil && data.Message(payload).Get("cmd") == nil {
  257. dup, err := beam.SendPipe(out, payload)
  258. if err != nil {
  259. a.Close()
  260. return
  261. }
  262. io.Copy(io.MultiWriter(os.Stdout, dup), a)
  263. dup.Close()
  264. } else {
  265. if err := out.Send(payload, a); err != nil {
  266. return
  267. }
  268. }
  269. }
  270. }
  271. func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  272. var tasks sync.WaitGroup
  273. defer tasks.Wait()
  274. r := beam.NewRouter(out)
  275. multiprint := func(p []byte, a *os.File) error {
  276. tasks.Add(1)
  277. go func() {
  278. defer tasks.Done()
  279. defer a.Close()
  280. msg := data.Message(string(p))
  281. input := bufio.NewScanner(a)
  282. for input.Scan() {
  283. fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text())
  284. }
  285. }()
  286. return nil
  287. }
  288. r.NewRoute().KeyIncludes("type", "job").Passthrough(out)
  289. r.NewRoute().HasAttachment().Handler(multiprint).Tee(out)
  290. beam.Copy(r, in)
  291. }
  292. func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  293. if len(args) != 2 {
  294. out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
  295. return
  296. }
  297. u, err := url.Parse(args[1])
  298. if err != nil {
  299. out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
  300. return
  301. }
  302. l, err := net.Listen(u.Scheme, u.Host)
  303. if err != nil {
  304. out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
  305. return
  306. }
  307. for {
  308. conn, err := l.Accept()
  309. if err != nil {
  310. out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
  311. return
  312. }
  313. f, err := connToFile(conn)
  314. if err != nil {
  315. conn.Close()
  316. continue
  317. }
  318. out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
  319. }
  320. }
  321. func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  322. if len(args) < 2 {
  323. if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
  324. Fatal(err)
  325. }
  326. return
  327. }
  328. var connector func(string) (chan net.Conn, error)
  329. connector = dialer
  330. connections, err := connector(args[1])
  331. if err != nil {
  332. out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
  333. return
  334. }
  335. // Copy in to conn
  336. SendToConn(connections, in)
  337. }
  338. func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  339. if len(args) != 2 {
  340. if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
  341. Fatal(err)
  342. }
  343. return
  344. }
  345. var connector func(string) (chan net.Conn, error)
  346. connector = listener
  347. connections, err := connector(args[1])
  348. if err != nil {
  349. out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
  350. return
  351. }
  352. // Copy in to conn
  353. ReceiveFromConn(connections, out)
  354. }
  355. func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  356. if len(args) != 2 {
  357. out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
  358. return
  359. }
  360. u, err := url.Parse(args[1])
  361. if err != nil {
  362. out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
  363. return
  364. }
  365. var tasks sync.WaitGroup
  366. for {
  367. _, attachment, err := in.Receive()
  368. if err != nil {
  369. break
  370. }
  371. if attachment == nil {
  372. continue
  373. }
  374. Logf("connecting to %s/%s\n", u.Scheme, u.Host)
  375. conn, err := net.Dial(u.Scheme, u.Host)
  376. if err != nil {
  377. out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil)
  378. return
  379. }
  380. out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
  381. tasks.Add(1)
  382. go func(attachment *os.File, conn net.Conn) {
  383. defer tasks.Done()
  384. // even when successful, conn.File() returns a duplicate,
  385. // so we must close the original
  386. var iotasks sync.WaitGroup
  387. iotasks.Add(2)
  388. go func(attachment *os.File, conn net.Conn) {
  389. defer iotasks.Done()
  390. io.Copy(attachment, conn)
  391. }(attachment, conn)
  392. go func(attachment *os.File, conn net.Conn) {
  393. defer iotasks.Done()
  394. io.Copy(conn, attachment)
  395. }(attachment, conn)
  396. iotasks.Wait()
  397. conn.Close()
  398. attachment.Close()
  399. }(attachment, conn)
  400. }
  401. tasks.Wait()
  402. }
  403. func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  404. for _, name := range args {
  405. f, err := os.Open(name)
  406. if err != nil {
  407. continue
  408. }
  409. if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
  410. f.Close()
  411. }
  412. }
  413. }
  414. func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
  415. os.Chdir(args[1])
  416. }