diff --git a/Makefile b/Makefile index b29d21746e..d358678223 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ DOCKER_MOUNT := $(if $(BINDDIR),-v "$(CURDIR)/$(BINDDIR):/go/src/github.com/dotc DOCKER_RUN_DOCKER := docker run --rm -it --privileged -e TESTFLAGS -e DOCKER_GRAPHDRIVER -e DOCKER_EXECDRIVER $(DOCKER_MOUNT) "$(DOCKER_IMAGE)" # to allow `make DOCSDIR=docs docs-shell` -DOCKER_RUN_DOCS := docker run --rm -it -p $(if $(DOCSPORT),$(DOCSPORT):)8000 $(if $(DOCSDIR),-v $(CURDIR)/$(DOCSDIR):/$(DOCSDIR)) -e AWS_S3_BUCKET +DOCKER_RUN_DOCS := docker run --rm -it $(if $(DOCSDIR),-v $(CURDIR)/$(DOCSDIR):/$(DOCSDIR)) -e AWS_S3_BUCKET default: binary @@ -26,10 +26,10 @@ cross: build $(DOCKER_RUN_DOCKER) hack/make.sh binary cross docs: docs-build - $(DOCKER_RUN_DOCS) "$(DOCKER_DOCS_IMAGE)" mkdocs serve + $(DOCKER_RUN_DOCS) -p $(if $(DOCSPORT),$(DOCSPORT):)8000 "$(DOCKER_DOCS_IMAGE)" mkdocs serve docs-shell: docs-build - $(DOCKER_RUN_DOCS) "$(DOCKER_DOCS_IMAGE)" bash + $(DOCKER_RUN_DOCS) -p $(if $(DOCSPORT),$(DOCSPORT):)8000 "$(DOCKER_DOCS_IMAGE)" bash docs-release: docs-build $(DOCKER_RUN_DOCS) "$(DOCKER_DOCS_IMAGE)" ./release.sh diff --git a/pkg/beam/MAINTAINERS b/pkg/beam/MAINTAINERS new file mode 100644 index 0000000000..db33365bcd --- /dev/null +++ b/pkg/beam/MAINTAINERS @@ -0,0 +1 @@ +Solomon Hykes diff --git a/pkg/beam/beam.go b/pkg/beam/beam.go new file mode 100644 index 0000000000..b1e4667a3f --- /dev/null +++ b/pkg/beam/beam.go @@ -0,0 +1,135 @@ +package beam + +import ( + "fmt" + "io" + "os" +) + +type Sender interface { + Send([]byte, *os.File) error +} + +type Receiver interface { + Receive() ([]byte, *os.File, error) +} + +type ReceiveCloser interface { + Receiver + Close() error +} + +type SendCloser interface { + Sender + Close() error +} + +type ReceiveSender interface { + Receiver + Sender +} + +func SendPipe(dst Sender, data []byte) (*os.File, error) { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + if err := dst.Send(data, r); err != nil { + r.Close() + w.Close() + return nil, err + } + return w, nil +} + +func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) { + local, remote, err := SocketPair() + if err != nil { + return nil, err + } + defer func() { + if err != nil { + local.Close() + remote.Close() + } + }() + conn, err = FileConn(local) + if err != nil { + return nil, err + } + local.Close() + if err := dst.Send(data, remote); err != nil { + return nil, err + } + return conn, nil +} + +func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) { + for { + data, f, err := src.Receive() + if err != nil { + return nil, nil, err + } + if f == nil { + // Skip empty attachments + continue + } + conn, err := FileConn(f) + if err != nil { + // Skip beam attachments which are not connections + // (for example might be a regular file, directory etc) + continue + } + return data, conn, nil + } + panic("impossibru!") + return nil, nil, nil +} + +func Copy(dst Sender, src Receiver) (int, error) { + var n int + for { + payload, attachment, err := src.Receive() + if err == io.EOF { + return n, nil + } else if err != nil { + return n, err + } + if err := dst.Send(payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return n, err + } + n++ + } + panic("impossibru!") + return n, nil +} + +// MsgDesc returns a human readable description of a beam message, usually +// for debugging purposes. +func MsgDesc(payload []byte, attachment *os.File) string { + var filedesc string = "" + if attachment != nil { + filedesc = fmt.Sprintf("%d", attachment.Fd()) + } + return fmt.Sprintf("'%s'[%s]", payload, filedesc) +} + +type devnull struct{} + +func Devnull() ReceiveSender { + return devnull{} +} + +func (d devnull) Send(p []byte, a *os.File) error { + if a != nil { + a.Close() + } + return nil +} + +func (d devnull) Receive() ([]byte, *os.File, error) { + return nil, nil, io.EOF +} diff --git a/pkg/beam/beam_test.go b/pkg/beam/beam_test.go new file mode 100644 index 0000000000..2822861a37 --- /dev/null +++ b/pkg/beam/beam_test.go @@ -0,0 +1,39 @@ +package beam + +import ( + "github.com/dotcloud/docker/pkg/beam/data" + "testing" +) + +func TestSendConn(t *testing.T) { + a, b, err := USocketPair() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + go func() { + conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes()) + if err != nil { + t.Fatal(err) + } + if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil { + t.Fatal(err) + } + conn.CloseWrite() + }() + payload, conn, err := ReceiveConn(b) + if err != nil { + t.Fatal(err) + } + if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" { + t.Fatalf("%v != %v\n", val, "connection") + } + msg, _, err := conn.Receive() + if err != nil { + t.Fatal(err) + } + if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" { + t.Fatalf("%v != %v\n", val, "bar") + } +} diff --git a/pkg/beam/data/data.go b/pkg/beam/data/data.go new file mode 100644 index 0000000000..e205fe43f4 --- /dev/null +++ b/pkg/beam/data/data.go @@ -0,0 +1,115 @@ +package data + +import ( + "fmt" + "strconv" + "strings" +) + +func Encode(obj map[string][]string) string { + var msg string + msg += encodeHeader(0) + for k, values := range obj { + msg += encodeNamedList(k, values) + } + return msg +} + +func encodeHeader(msgtype int) string { + return fmt.Sprintf("%03.3d;", msgtype) +} + +func encodeString(s string) string { + return fmt.Sprintf("%d:%s,", len(s), s) +} + +var EncodeString = encodeString +var DecodeString = decodeString + +func encodeList(l []string) string { + values := make([]string, 0, len(l)) + for _, s := range l { + values = append(values, encodeString(s)) + } + return encodeString(strings.Join(values, "")) +} + +func encodeNamedList(name string, l []string) string { + return encodeString(name) + encodeList(l) +} + +func Decode(msg string) (map[string][]string, error) { + msgtype, skip, err := decodeHeader(msg) + if err != nil { + return nil, err + } + if msgtype != 0 { + // FIXME: use special error type so the caller can easily ignore + return nil, fmt.Errorf("unknown message type: %d", msgtype) + } + msg = msg[skip:] + obj := make(map[string][]string) + for len(msg) > 0 { + k, skip, err := decodeString(msg) + if err != nil { + return nil, err + } + msg = msg[skip:] + values, skip, err := decodeList(msg) + if err != nil { + return nil, err + } + msg = msg[skip:] + obj[k] = values + } + return obj, nil +} + +func decodeList(msg string) ([]string, int, error) { + blob, skip, err := decodeString(msg) + if err != nil { + return nil, 0, err + } + var l []string + for len(blob) > 0 { + v, skipv, err := decodeString(blob) + if err != nil { + return nil, 0, err + } + l = append(l, v) + blob = blob[skipv:] + } + return l, skip, nil +} + +func decodeString(msg string) (string, int, error) { + parts := strings.SplitN(msg, ":", 2) + if len(parts) != 2 { + return "", 0, fmt.Errorf("invalid format: no column") + } + var length int + if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil { + return "", 0, err + } else { + length = int(l) + } + if len(parts[1]) < length+1 { + return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1) + } + payload := parts[1][:length+1] + if payload[length] != ',' { + return "", 0, fmt.Errorf("message is not comma-terminated") + } + return payload[:length], len(parts[0]) + 1 + length + 1, nil +} + +func decodeHeader(msg string) (int, int, error) { + if len(msg) < 4 { + return 0, 0, fmt.Errorf("message too small") + } + msgtype, err := strconv.ParseInt(msg[:3], 10, 32) + if err != nil { + return 0, 0, err + } + return int(msgtype), 4, nil +} diff --git a/pkg/beam/data/data_test.go b/pkg/beam/data/data_test.go new file mode 100644 index 0000000000..9059922b3b --- /dev/null +++ b/pkg/beam/data/data_test.go @@ -0,0 +1,129 @@ +package data + +import ( + "strings" + "testing" +) + +func TestEncodeHelloWorld(t *testing.T) { + input := "hello world!" + output := encodeString(input) + expectedOutput := "12:hello world!," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeEmptyString(t *testing.T) { + input := "" + output := encodeString(input) + expectedOutput := "0:," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeEmptyList(t *testing.T) { + input := []string{} + output := encodeList(input) + expectedOutput := "0:," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeEmptyMap(t *testing.T) { + input := make(map[string][]string) + output := Encode(input) + expectedOutput := "000;" + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncode1Key1Value(t *testing.T) { + input := make(map[string][]string) + input["hello"] = []string{"world"} + output := Encode(input) + expectedOutput := "000;5:hello,8:5:world,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncode1Key2Value(t *testing.T) { + input := make(map[string][]string) + input["hello"] = []string{"beautiful", "world"} + output := Encode(input) + expectedOutput := "000;5:hello,20:9:beautiful,5:world,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeEmptyValue(t *testing.T) { + input := make(map[string][]string) + input["foo"] = []string{} + output := Encode(input) + expectedOutput := "000;3:foo,0:," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeBinaryKey(t *testing.T) { + input := make(map[string][]string) + input["foo\x00bar\x7f"] = []string{} + output := Encode(input) + expectedOutput := "000;8:foo\x00bar\x7f,0:," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestEncodeBinaryValue(t *testing.T) { + input := make(map[string][]string) + input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"} + output := Encode(input) + expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} + +func TestDecodeString(t *testing.T) { + validEncodedStrings := []struct { + input string + output string + skip int + }{ + {"3:foo,", "foo", 6}, + {"5:hello,", "hello", 8}, + {"5:hello,5:world,", "hello", 8}, + } + for _, sample := range validEncodedStrings { + output, skip, err := decodeString(sample.input) + if err != nil { + t.Fatalf("error decoding '%v': %v", sample.input, err) + } + if skip != sample.skip { + t.Fatalf("invalid skip: %v!=%v", skip, sample.skip) + } + if output != sample.output { + t.Fatalf("invalid output: %v!=%v", output, sample.output) + } + } +} + +func TestDecode1Key1Value(t *testing.T) { + input := "000;3:foo,6:3:bar,," + output, err := Decode(input) + if err != nil { + t.Fatal(err) + } + if v, exists := output["foo"]; !exists { + t.Fatalf("wrong output: %v\n", output) + } else if len(v) != 1 || strings.Join(v, "") != "bar" { + t.Fatalf("wrong output: %v\n", output) + } +} diff --git a/pkg/beam/data/message.go b/pkg/beam/data/message.go new file mode 100644 index 0000000000..193fb7b241 --- /dev/null +++ b/pkg/beam/data/message.go @@ -0,0 +1,93 @@ +package data + +import ( + "fmt" + "strings" +) + +type Message string + +func Empty() Message { + return Message(Encode(nil)) +} + +func Parse(args []string) Message { + data := make(map[string][]string) + for _, word := range args { + if strings.Contains(word, "=") { + kv := strings.SplitN(word, "=", 2) + key := kv[0] + var val string + if len(kv) == 2 { + val = kv[1] + } + data[key] = []string{val} + } + } + return Message(Encode(data)) +} + +func (m Message) Add(k, v string) Message { + data, err := Decode(string(m)) + if err != nil { + return m + } + if values, exists := data[k]; exists { + data[k] = append(values, v) + } else { + data[k] = []string{v} + } + return Message(Encode(data)) +} + +func (m Message) Set(k string, v ...string) Message { + data, err := Decode(string(m)) + if err != nil { + panic(err) + return m + } + data[k] = v + return Message(Encode(data)) +} + +func (m Message) Del(k string) Message { + data, err := Decode(string(m)) + if err != nil { + panic(err) + return m + } + delete(data, k) + return Message(Encode(data)) +} + +func (m Message) Get(k string) []string { + data, err := Decode(string(m)) + if err != nil { + return nil + } + v, exists := data[k] + if !exists { + return nil + } + return v +} + +func (m Message) Pretty() string { + data, err := Decode(string(m)) + if err != nil { + return "" + } + entries := make([]string, 0, len(data)) + for k, values := range data { + entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ","))) + } + return strings.Join(entries, " ") +} + +func (m Message) String() string { + return string(m) +} + +func (m Message) Bytes() []byte { + return []byte(m) +} diff --git a/pkg/beam/data/message_test.go b/pkg/beam/data/message_test.go new file mode 100644 index 0000000000..7685769069 --- /dev/null +++ b/pkg/beam/data/message_test.go @@ -0,0 +1,53 @@ +package data + +import ( + "testing" +) + +func TestEmptyMessage(t *testing.T) { + m := Empty() + if m.String() != Encode(nil) { + t.Fatalf("%v != %v", m.String(), Encode(nil)) + } +} + +func TestSetMessage(t *testing.T) { + m := Empty().Set("foo", "bar") + output := m.String() + expectedOutput := "000;3:foo,6:3:bar,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } + decodedOutput, err := Decode(output) + if err != nil { + t.Fatal(err) + } + if len(decodedOutput) != 1 { + t.Fatalf("wrong output data: %#v\n", decodedOutput) + } +} + +func TestSetMessageTwice(t *testing.T) { + m := Empty().Set("foo", "bar").Set("ga", "bu") + output := m.String() + expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,," + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } + decodedOutput, err := Decode(output) + if err != nil { + t.Fatal(err) + } + if len(decodedOutput) != 2 { + t.Fatalf("wrong output data: %#v\n", decodedOutput) + } +} + +func TestSetDelMessage(t *testing.T) { + m := Empty().Set("foo", "bar").Del("foo") + output := m.String() + expectedOutput := Encode(nil) + if output != expectedOutput { + t.Fatalf("'%v' != '%v'", output, expectedOutput) + } +} diff --git a/pkg/beam/data/netstring.txt b/pkg/beam/data/netstring.txt new file mode 100644 index 0000000000..17560929b6 --- /dev/null +++ b/pkg/beam/data/netstring.txt @@ -0,0 +1,92 @@ +## +## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt +## + +Netstrings +D. J. Bernstein, djb@pobox.com +19970201 + + +1. Introduction + + A netstring is a self-delimiting encoding of a string. Netstrings are + very easy to generate and to parse. Any string may be encoded as a + netstring; there are no restrictions on length or on allowed bytes. + Another virtue of a netstring is that it declares the string size up + front. Thus an application can check in advance whether it has enough + space to store the entire string. + + Netstrings may be used as a basic building block for reliable network + protocols. Most high-level protocols, in effect, transmit a sequence + of strings; those strings may be encoded as netstrings and then + concatenated into a sequence of characters, which in turn may be + transmitted over a reliable stream protocol such as TCP. + + Note that netstrings can be used recursively. The result of encoding + a sequence of strings is a single string. A series of those encoded + strings may in turn be encoded into a single string. And so on. + + In this document, a string of 8-bit bytes may be written in two + different forms: as a series of hexadecimal numbers between angle + brackets, or as a sequence of ASCII characters between double quotes. + For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of + length 12; it is the same as the string "hello world!". + + Although this document restricts attention to strings of 8-bit bytes, + netstrings could be used with any 6-bit-or-larger character set. + + +2. Definition + + Any string of 8-bit bytes may be encoded as [len]":"[string]",". + Here [string] is the string and [len] is a nonempty sequence of ASCII + digits giving the length of [string] in decimal. The ASCII digits are + <30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros + at the front of [len] are prohibited: [len] begins with <30> exactly + when [string] is empty. + + For example, the string "hello world!" is encoded as <31 32 3a 68 + 65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The + empty string is encoded as "0:,". + + [len]":"[string]"," is called a netstring. [string] is called the + interpretation of the netstring. + + +3. Sample code + + The following C code starts with a buffer buf of length len and + prints it as a netstring. + + if (printf("%lu:",len) < 0) barf(); + if (fwrite(buf,1,len,stdout) < len) barf(); + if (putchar(',') < 0) barf(); + + The following C code reads a netstring and decodes it into a + dynamically allocated buffer buf of length len. + + if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */ + if (getchar() != ':') barf(); + buf = malloc(len + 1); /* malloc(0) is not portable */ + if (!buf) barf(); + if (fread(buf,1,len,stdin) < len) barf(); + if (getchar() != ',') barf(); + + Both of these code fragments assume that the local character set is + ASCII, and that the relevant stdio streams are in binary mode. + + +4. Security considerations + + The famous Finger security hole may be blamed on Finger's use of the + CRLF encoding. In that encoding, each string is simply terminated by + CRLF. This encoding has several problems. Most importantly, it does + not declare the string size in advance. This means that a correct + CRLF parser must be prepared to ask for more and more memory as it is + reading the string. In the case of Finger, a lazy implementor found + this to be too much trouble; instead he simply declared a fixed-size + buffer and used C's gets() function. The rest is history. + + In contrast, as the above sample code shows, it is very easy to + handle netstrings without risking buffer overflow. Thus widespread + use of netstrings may improve network security. diff --git a/pkg/beam/examples/beamsh/beamsh b/pkg/beam/examples/beamsh/beamsh new file mode 100755 index 0000000000..9bfe78ef4a Binary files /dev/null and b/pkg/beam/examples/beamsh/beamsh differ diff --git a/pkg/beam/examples/beamsh/beamsh.go b/pkg/beam/examples/beamsh/beamsh.go new file mode 100644 index 0000000000..3f258de332 --- /dev/null +++ b/pkg/beam/examples/beamsh/beamsh.go @@ -0,0 +1,542 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "github.com/dotcloud/docker/pkg/beam" + "github.com/dotcloud/docker/pkg/beam/data" + "github.com/dotcloud/docker/pkg/dockerscript" + "github.com/dotcloud/docker/pkg/term" + "io" + "net" + "net/url" + "os" + "path" + "strings" + "sync" +) + +var rootPlugins = []string{ + "stdio", +} + +var ( + flX bool + flPing bool + introspect beam.ReceiveSender = beam.Devnull() +) + +func main() { + fd3 := os.NewFile(3, "beam-introspect") + if introsp, err := beam.FileConn(fd3); err == nil { + introspect = introsp + Logf("introspection enabled\n") + } else { + Logf("introspection disabled\n") + } + fd3.Close() + flag.BoolVar(&flX, "x", false, "print commands as they are being executed") + flag.Parse() + if flag.NArg() == 0 { + if term.IsTerminal(0) { + // No arguments, stdin is terminal --> interactive mode + input := bufio.NewScanner(os.Stdin) + for { + fmt.Printf("[%d] beamsh> ", os.Getpid()) + if !input.Scan() { + break + } + line := input.Text() + if len(line) != 0 { + cmd, err := dockerscript.Parse(strings.NewReader(line)) + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + continue + } + if err := executeRootScript(cmd); err != nil { + Fatal(err) + } + } + if err := input.Err(); err == io.EOF { + break + } else if err != nil { + Fatal(err) + } + } + } else { + // No arguments, stdin not terminal --> batch mode + script, err := dockerscript.Parse(os.Stdin) + if err != nil { + Fatal("parse error: %v\n", err) + } + if err := executeRootScript(script); err != nil { + Fatal(err) + } + } + } else { + // 1+ arguments: parse them as script files + for _, scriptpath := range flag.Args() { + f, err := os.Open(scriptpath) + if err != nil { + Fatal(err) + } + script, err := dockerscript.Parse(f) + if err != nil { + Fatal("parse error: %v\n", err) + } + if err := executeRootScript(script); err != nil { + Fatal(err) + } + } + } +} + +func executeRootScript(script []*dockerscript.Command) error { + if len(rootPlugins) > 0 { + // If there are root plugins, wrap the script inside them + var ( + rootCmd *dockerscript.Command + lastCmd *dockerscript.Command + ) + for _, plugin := range rootPlugins { + pluginCmd := &dockerscript.Command{ + Args: []string{plugin}, + } + if rootCmd == nil { + rootCmd = pluginCmd + } else { + lastCmd.Children = []*dockerscript.Command{pluginCmd} + } + lastCmd = pluginCmd + } + lastCmd.Children = script + script = []*dockerscript.Command{rootCmd} + } + handlers, err := Handlers(introspect) + if err != nil { + return err + } + defer handlers.Close() + var tasks sync.WaitGroup + defer func() { + Debugf("Waiting for introspection...\n") + tasks.Wait() + Debugf("DONE Waiting for introspection\n") + }() + if introspect != nil { + tasks.Add(1) + go func() { + Debugf("starting introspection\n") + defer Debugf("done with introspection\n") + defer tasks.Done() + introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil) + Debugf("XXX starting reading introspection messages\n") + r := beam.NewRouter(handlers) + r.NewRoute().All().Handler(func(p []byte, a *os.File) error { + Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a)) + return handlers.Send(p, a) + }) + n, err := beam.Copy(r, introspect) + Debugf("XXX done reading %d introspection messages: %v\n", n, err) + }() + } + if err := executeScript(handlers, script); err != nil { + return err + } + return nil +} + +func executeScript(out beam.Sender, script []*dockerscript.Command) error { + Debugf("executeScript(%s)\n", scriptString(script)) + defer Debugf("executeScript(%s) DONE\n", scriptString(script)) + var background sync.WaitGroup + defer background.Wait() + for _, cmd := range script { + if cmd.Background { + background.Add(1) + go func(out beam.Sender, cmd *dockerscript.Command) { + executeCommand(out, cmd) + background.Done() + }(out, cmd) + } else { + if err := executeCommand(out, cmd); err != nil { + return err + } + } + } + return nil +} + +// 1) Find a handler for the command (if no handler, fail) +// 2) Attach new in & out pair to the handler +// 3) [in the background] Copy handler output to our own output +// 4) [in the background] Run the handler +// 5) Recursively executeScript() all children commands and wait for them to complete +// 6) Wait for handler to return and (shortly afterwards) output copy to complete +// 7) Profit +func executeCommand(out beam.Sender, cmd *dockerscript.Command) error { + if flX { + fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1)) + } + Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " ")) + defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " ")) + if len(cmd.Args) == 0 { + return fmt.Errorf("empty command") + } + Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " ")) + job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes()) + if err != nil { + return fmt.Errorf("%v\n", err) + } + var tasks sync.WaitGroup + tasks.Add(1) + Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) + go func() { + if out != nil { + Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " ")) + n, err := beam.Copy(out, job) + if err != nil { + Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err) + } + Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n) + } + tasks.Done() + }() + // depth-first execution of children commands + // executeScript() blocks until all commands are completed + Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) + executeScript(job, cmd.Children) + Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " ")) + job.CloseWrite() + Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " ")) + Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " ")) + tasks.Wait() + Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " ")) + return nil +} + +type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender) + +func Handlers(sink beam.Sender) (*beam.UnixConn, error) { + var tasks sync.WaitGroup + pub, priv, err := beam.USocketPair() + if err != nil { + return nil, err + } + go func() { + defer func() { + Debugf("[handlers] closewrite() on endpoint\n") + // FIXME: this is not yet necessary but will be once + // there is synchronization over standard beam messages + priv.CloseWrite() + Debugf("[handlers] done closewrite() on endpoint\n") + }() + r := beam.NewRouter(sink) + r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error { + conn, err := beam.FileConn(attachment) + if err != nil { + attachment.Close() + return err + } + // attachment.Close() + tasks.Add(1) + go func() { + defer tasks.Done() + defer func() { + Debugf("[handlers] '%s' closewrite\n", payload) + conn.CloseWrite() + Debugf("[handlers] '%s' done closewrite\n", payload) + }() + cmd := data.Message(payload).Get("cmd") + Debugf("[handlers] received %s\n", strings.Join(cmd, " ")) + if len(cmd) == 0 { + return + } + handler := GetHandler(cmd[0]) + if handler == nil { + return + } + stdout, err := beam.SendPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes()) + if err != nil { + return + } + defer stdout.Close() + stderr, err := beam.SendPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes()) + if err != nil { + return + } + defer stderr.Close() + Debugf("[handlers] calling %s\n", strings.Join(cmd, " ")) + handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn)) + Debugf("[handlers] returned: %s\n", strings.Join(cmd, " ")) + }() + return nil + }) + beam.Copy(r, priv) + Debugf("[handlers] waiting for all tasks\n") + tasks.Wait() + Debugf("[handlers] all tasks returned\n") + }() + return pub, nil +} + +func GetHandler(name string) Handler { + if name == "logger" { + return CmdLogger + } else if name == "render" { + return CmdRender + } else if name == "devnull" { + return CmdDevnull + } else if name == "prompt" { + return CmdPrompt + } else if name == "stdio" { + return CmdStdio + } else if name == "echo" { + return CmdEcho + } else if name == "pass" { + return CmdPass + } else if name == "in" { + return CmdIn + } else if name == "exec" { + return CmdExec + } else if name == "trace" { + return CmdTrace + } else if name == "emit" { + return CmdEmit + } else if name == "print" { + return CmdPrint + } else if name == "multiprint" { + return CmdMultiprint + } else if name == "listen" { + return CmdListen + } else if name == "beamsend" { + return CmdBeamsend + } else if name == "beamreceive" { + return CmdBeamreceive + } else if name == "connect" { + return CmdConnect + } else if name == "openfile" { + return CmdOpenfile + } else if name == "spawn" { + return CmdSpawn + } else if name == "chdir" { + return CmdChdir + } + return nil +} + +// VARIOUS HELPER FUNCTIONS: + +func connToFile(conn net.Conn) (f *os.File, err error) { + if connWithFile, ok := conn.(interface { + File() (*os.File, error) + }); !ok { + return nil, fmt.Errorf("no file descriptor available") + } else { + f, err = connWithFile.File() + if err != nil { + return nil, err + } + } + return f, err +} + +type Msg struct { + payload []byte + attachment *os.File +} + +func Logf(msg string, args ...interface{}) (int, error) { + if len(msg) == 0 || msg[len(msg)-1] != '\n' { + msg = msg + "\n" + } + msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg) + return fmt.Printf(msg, args...) +} + +func Debugf(msg string, args ...interface{}) { + if os.Getenv("BEAMDEBUG") != "" { + Logf(msg, args...) + } +} + +func Fatalf(msg string, args ...interface{}) { + Logf(msg, args...) + os.Exit(1) +} + +func Fatal(args ...interface{}) { + Fatalf("%v", args[0]) +} + +func scriptString(script []*dockerscript.Command) string { + lines := make([]string, 0, len(script)) + for _, cmd := range script { + line := strings.Join(cmd.Args, " ") + if len(cmd.Children) > 0 { + line += fmt.Sprintf(" { %s }", scriptString(cmd.Children)) + } else { + line += " {}" + } + lines = append(lines, line) + } + return fmt.Sprintf("'%s'", strings.Join(lines, "; ")) +} + +func dialer(addr string) (chan net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + connections := make(chan net.Conn) + go func() { + defer close(connections) + for { + conn, err := net.Dial(u.Scheme, u.Host) + if err != nil { + return + } + connections <- conn + } + }() + return connections, nil +} + +func listener(addr string) (chan net.Conn, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, err + } + l, err := net.Listen(u.Scheme, u.Host) + if err != nil { + return nil, err + } + connections := make(chan net.Conn) + go func() { + defer close(connections) + for { + conn, err := l.Accept() + if err != nil { + return + } + Logf("new connection\n") + connections <- conn + } + }() + return connections, nil +} + +func SendToConn(connections chan net.Conn, src beam.Receiver) error { + var tasks sync.WaitGroup + defer tasks.Wait() + for { + payload, attachment, err := src.Receive() + if err == io.EOF { + return nil + } else if err != nil { + return err + } + conn, ok := <-connections + if !ok { + break + } + Logf("Sending %s\n", msgDesc(payload, attachment)) + tasks.Add(1) + go func(payload []byte, attachment *os.File, conn net.Conn) { + defer tasks.Done() + if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil { + return + } + if attachment == nil { + conn.Close() + return + } + var iotasks sync.WaitGroup + iotasks.Add(2) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying the connection to [%d]\n", attachment.Fd()) + io.Copy(attachment, conn) + attachment.Close() + Debugf("done copying the connection to [%d]\n", attachment.Fd()) + }(attachment, conn) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + Debugf("copying [%d] to the connection\n", attachment.Fd()) + io.Copy(conn, attachment) + conn.Close() + Debugf("done copying [%d] to the connection\n", attachment.Fd()) + }(attachment, conn) + iotasks.Wait() + }(payload, attachment, conn) + } + return nil +} + +func msgDesc(payload []byte, attachment *os.File) string { + return beam.MsgDesc(payload, attachment) +} + +func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error { + for conn := range connections { + err := func() error { + Logf("parsing message from network...\n") + defer Logf("done parsing message from network\n") + buf := make([]byte, 4098) + n, err := conn.Read(buf) + if n == 0 { + conn.Close() + if err == io.EOF { + return nil + } else { + return err + } + } + Logf("decoding message from '%s'\n", buf[:n]) + header, skip, err := data.DecodeString(string(buf[:n])) + if err != nil { + conn.Close() + return err + } + pub, priv, err := beam.SocketPair() + if err != nil { + return err + } + Logf("decoded message: %s\n", data.Message(header).Pretty()) + go func(skipped []byte, conn net.Conn, f *os.File) { + // this closes both conn and f + if len(skipped) > 0 { + if _, err := f.Write(skipped); err != nil { + Logf("ERROR: %v\n", err) + f.Close() + conn.Close() + return + } + } + bicopy(conn, f) + }(buf[skip:n], conn, pub) + if err := dst.Send([]byte(header), priv); err != nil { + return err + } + return nil + }() + if err != nil { + Logf("Error reading from connection: %v\n", err) + } + } + return nil +} + +func bicopy(a, b io.ReadWriteCloser) { + var iotasks sync.WaitGroup + oneCopy := func(dst io.WriteCloser, src io.Reader) { + defer iotasks.Done() + io.Copy(dst, src) + dst.Close() + } + iotasks.Add(2) + go oneCopy(a, b) + go oneCopy(b, a) + iotasks.Wait() +} diff --git a/pkg/beam/examples/beamsh/builtins.go b/pkg/beam/examples/beamsh/builtins.go new file mode 100644 index 0000000000..cc94d2b5fb --- /dev/null +++ b/pkg/beam/examples/beamsh/builtins.go @@ -0,0 +1,441 @@ +package main + +import ( + "bufio" + "fmt" + "github.com/dotcloud/docker/pkg/beam" + "github.com/dotcloud/docker/pkg/beam/data" + "github.com/dotcloud/docker/pkg/term" + "github.com/dotcloud/docker/utils" + "io" + "net" + "net/url" + "os" + "os/exec" + "path" + "strings" + "sync" + "text/template" +) + +func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if err := os.MkdirAll("logs", 0700); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + var tasks sync.WaitGroup + defer tasks.Wait() + var n int = 1 + r := beam.NewRouter(out) + r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { + tasks.Add(1) + go func(n int) { + defer tasks.Done() + defer attachment.Close() + var streamname string + if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" { + streamname = "stdout" + } else { + streamname = cmd[1] + } + if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 { + streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname) + } + logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700) + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + defer logfile.Close() + io.Copy(logfile, attachment) + logfile.Sync() + }(n) + n++ + return nil + }).Tee(out) + if _, err := beam.Copy(r, in); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + +func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0]) + out.Send(data.Empty().Set("status", "1").Bytes(), nil) + return + } + txt := args[1] + if !strings.HasSuffix(txt, "\n") { + txt += "\n" + } + t := template.Must(template.New("render").Parse(txt)) + for { + payload, attachment, err := in.Receive() + if err != nil { + return + } + msg, err := data.Decode(string(payload)) + if err != nil { + fmt.Fprintf(stderr, "decode error: %v\n") + } + if err := t.Execute(stdout, msg); err != nil { + fmt.Fprintf(stderr, "rendering error: %v\n", err) + out.Send(data.Empty().Set("status", "1").Bytes(), nil) + return + } + if err := out.Send(payload, attachment); err != nil { + return + } + } +} + +func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + _, attachment, err := in.Receive() + if err != nil { + return + } + if attachment != nil { + attachment.Close() + } + } +} + +func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) < 2 { + fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0]) + return + } + if !term.IsTerminal(0) { + fmt.Fprintf(stderr, "can't prompt: no tty available...\n") + return + } + fmt.Printf("%s: ", strings.Join(args[1:], " ")) + oldState, _ := term.SaveState(0) + term.DisableEcho(0, oldState) + line, _, err := bufio.NewReader(os.Stdin).ReadLine() + if err != nil { + fmt.Fprintln(stderr, err.Error()) + return + } + val := string(line) + fmt.Printf("\n") + term.RestoreTerminal(0, oldState) + out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil) +} + +func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + var tasks sync.WaitGroup + defer tasks.Wait() + + r := beam.NewRouter(out) + r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error { + tasks.Add(1) + go func() { + defer tasks.Done() + defer attachment.Close() + io.Copy(os.Stdout, attachment) + attachment.Close() + }() + return nil + }).Tee(out) + + if _, err := beam.Copy(r, in); err != nil { + Fatal(err) + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + +func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + fmt.Fprintln(stdout, strings.Join(args[1:], " ")) +} + +func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + payload, attachment, err := in.Receive() + if err != nil { + return + } + if err := out.Send(payload, attachment); err != nil { + if attachment != nil { + attachment.Close() + } + return + } + } +} + +func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + c := exec.Command(utils.SelfPath()) + r, w, err := os.Pipe() + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + c.Stdin = r + c.Stdout = stdout + c.Stderr = stderr + go func() { + fmt.Fprintf(w, strings.Join(args[1:], " ")) + w.Sync() + w.Close() + }() + if err := c.Run(); err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } +} + +func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + os.Chdir(args[1]) + GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out) +} + +func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + cmd := exec.Command(args[1], args[2:]...) + cmd.Stdout = stdout + cmd.Stderr = stderr + //cmd.Stdin = os.Stdin + local, remote, err := beam.SocketPair() + if err != nil { + fmt.Fprintf(stderr, "%v\n", err) + return + } + child, err := beam.FileConn(local) + if err != nil { + local.Close() + remote.Close() + fmt.Fprintf(stderr, "%v\n", err) + return + } + local.Close() + cmd.ExtraFiles = append(cmd.ExtraFiles, remote) + + var tasks sync.WaitGroup + tasks.Add(1) + go func() { + defer Debugf("done copying to child\n") + defer tasks.Done() + defer child.CloseWrite() + beam.Copy(child, in) + }() + + tasks.Add(1) + go func() { + defer Debugf("done copying from child %d\n") + defer tasks.Done() + r := beam.NewRouter(out) + r.NewRoute().All().Handler(func(p []byte, a *os.File) error { + return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a) + }) + beam.Copy(r, child) + }() + execErr := cmd.Run() + // We can close both ends of the socket without worrying about data stuck in the buffer, + // because unix socket writes are fully synchronous. + child.Close() + tasks.Wait() + var status string + if execErr != nil { + status = execErr.Error() + } else { + status = "ok" + } + out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil) +} + +func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + r := beam.NewRouter(out) + r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error { + var sfd string = "nil" + if attachment != nil { + sfd = fmt.Sprintf("%d", attachment.Fd()) + } + fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd) + out.Send(payload, attachment) + return nil + }) + beam.Copy(r, in) +} + +func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + out.Send(data.Parse(args[1:]).Bytes(), nil) +} + +func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for { + payload, a, err := in.Receive() + if err != nil { + return + } + // Skip commands + if a != nil && data.Message(payload).Get("cmd") == nil { + dup, err := beam.SendPipe(out, payload) + if err != nil { + a.Close() + return + } + io.Copy(io.MultiWriter(os.Stdout, dup), a) + dup.Close() + } else { + if err := out.Send(payload, a); err != nil { + return + } + } + } +} + +func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + var tasks sync.WaitGroup + defer tasks.Wait() + r := beam.NewRouter(out) + multiprint := func(p []byte, a *os.File) error { + tasks.Add(1) + go func() { + defer tasks.Done() + defer a.Close() + msg := data.Message(string(p)) + input := bufio.NewScanner(a) + for input.Scan() { + fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text()) + } + }() + return nil + } + r.NewRoute().KeyIncludes("type", "job").Passthrough(out) + r.NewRoute().HasAttachment().Handler(multiprint).Tee(out) + beam.Copy(r, in) +} + +func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) + return + } + u, err := url.Parse(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + l, err := net.Listen(u.Scheme, u.Host) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + for { + conn, err := l.Accept() + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + f, err := connToFile(conn) + if err != nil { + conn.Close() + continue + } + out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f) + } +} + +func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) < 2 { + if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { + Fatal(err) + } + return + } + var connector func(string) (chan net.Conn, error) + connector = dialer + connections, err := connector(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + // Copy in to conn + SendToConn(connections, in) +} + +func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil { + Fatal(err) + } + return + } + var connector func(string) (chan net.Conn, error) + connector = listener + connections, err := connector(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + // Copy in to conn + ReceiveFromConn(connections, out) +} + +func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + if len(args) != 2 { + out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil) + return + } + u, err := url.Parse(args[1]) + if err != nil { + out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil) + return + } + var tasks sync.WaitGroup + for { + _, attachment, err := in.Receive() + if err != nil { + break + } + if attachment == nil { + continue + } + Logf("connecting to %s/%s\n", u.Scheme, u.Host) + conn, err := net.Dial(u.Scheme, u.Host) + if err != nil { + out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil) + return + } + out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil) + tasks.Add(1) + go func(attachment *os.File, conn net.Conn) { + defer tasks.Done() + // even when successful, conn.File() returns a duplicate, + // so we must close the original + var iotasks sync.WaitGroup + iotasks.Add(2) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + io.Copy(attachment, conn) + }(attachment, conn) + go func(attachment *os.File, conn net.Conn) { + defer iotasks.Done() + io.Copy(conn, attachment) + }(attachment, conn) + iotasks.Wait() + conn.Close() + attachment.Close() + }(attachment, conn) + } + tasks.Wait() +} + +func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + for _, name := range args { + f, err := os.Open(name) + if err != nil { + continue + } + if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil { + f.Close() + } + } +} + +func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) { + os.Chdir(args[1]) +} diff --git a/pkg/beam/examples/beamsh/scripts/bug0.ds b/pkg/beam/examples/beamsh/scripts/bug0.ds new file mode 100755 index 0000000000..89b75230be --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/bug0.ds @@ -0,0 +1,3 @@ +#!/usr/bin/env beamsh + +exec ls -l diff --git a/pkg/beam/examples/beamsh/scripts/bug1.ds b/pkg/beam/examples/beamsh/scripts/bug1.ds new file mode 100755 index 0000000000..2d8a9e2ed9 --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/bug1.ds @@ -0,0 +1,5 @@ +#!/usr/bin/env beamsh + +trace { + exec ls -l +} diff --git a/pkg/beam/examples/beamsh/scripts/bug2.ds b/pkg/beam/examples/beamsh/scripts/bug2.ds new file mode 100755 index 0000000000..08f0431f68 --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/bug2.ds @@ -0,0 +1,7 @@ +#!/usr/bin/env beamsh + +trace { + stdio { + exec ls -l + } +} diff --git a/pkg/beam/examples/beamsh/scripts/bug3.ds b/pkg/beam/examples/beamsh/scripts/bug3.ds new file mode 100755 index 0000000000..7bb8694d49 --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/bug3.ds @@ -0,0 +1,10 @@ +#!/usr/bin/env beamsh -x + +trace outer { + # stdio fails + stdio { + trace inner { + exec ls -l + } + } +} diff --git a/pkg/beam/examples/beamsh/scripts/bug4.ds b/pkg/beam/examples/beamsh/scripts/bug4.ds new file mode 100755 index 0000000000..b7beedbae2 --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/bug4.ds @@ -0,0 +1,9 @@ +#!/usr/bin/env beamsh + +stdio { + trace { + stdio { + exec ls -l + } + } +} diff --git a/pkg/beam/examples/beamsh/scripts/bug5.ds b/pkg/beam/examples/beamsh/scripts/bug5.ds new file mode 100755 index 0000000000..9f9a85515d --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/bug5.ds @@ -0,0 +1,6 @@ +#!/usr/bin/env beamsh + +stdio { + # exec fails + exec ls -l +} diff --git a/pkg/beam/examples/beamsh/scripts/bug6.ds b/pkg/beam/examples/beamsh/scripts/bug6.ds new file mode 100755 index 0000000000..90281401cd --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/bug6.ds @@ -0,0 +1,7 @@ +#!/usr/bin/env beamsh + +stdio { + trace { + echo hello + } +} diff --git a/pkg/beam/examples/beamsh/scripts/bug7.ds b/pkg/beam/examples/beamsh/scripts/bug7.ds new file mode 100755 index 0000000000..b6e7bd9201 --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/bug7.ds @@ -0,0 +1,6 @@ +#!/usr/bin/env beamsh + +stdio { + # exec fails + echo hello world +} diff --git a/pkg/beam/examples/beamsh/scripts/demo1.ds b/pkg/beam/examples/beamsh/scripts/demo1.ds new file mode 100755 index 0000000000..20a3359f3a --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/demo1.ds @@ -0,0 +1,9 @@ +#!/usr/bin/env beamsh + +devnull { + multiprint { + exec tail -f /var/log/system.log & + exec ls -l + exec ls ksdhfkjdshf jksdfhkjsdhf + } +} diff --git a/pkg/beam/examples/beamsh/scripts/helloworld.ds b/pkg/beam/examples/beamsh/scripts/helloworld.ds new file mode 100755 index 0000000000..32e59b062e --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/helloworld.ds @@ -0,0 +1,8 @@ +#!/usr/bin/env beamsh + +print { + trace { + emit msg=hello + emit msg=world + } +} diff --git a/pkg/beam/examples/beamsh/scripts/logdemo.ds b/pkg/beam/examples/beamsh/scripts/logdemo.ds new file mode 100755 index 0000000000..8b729a966f --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/logdemo.ds @@ -0,0 +1,9 @@ +#!/usr/bin/env beamsh + +trace { + log { + exec ls -l + exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf + echo hello world + } +} diff --git a/pkg/beam/examples/beamsh/scripts/miniserver.ds b/pkg/beam/examples/beamsh/scripts/miniserver.ds new file mode 100755 index 0000000000..9707477ee0 --- /dev/null +++ b/pkg/beam/examples/beamsh/scripts/miniserver.ds @@ -0,0 +1,9 @@ +#!/usr/bin/env beamsh + +multiprint { + trace { + listen tcp://localhost:7676 & + listen tcp://localhost:8787 & + } +} + diff --git a/pkg/beam/router.go b/pkg/beam/router.go new file mode 100644 index 0000000000..fc41a8991b --- /dev/null +++ b/pkg/beam/router.go @@ -0,0 +1,184 @@ +package beam + +import ( + "fmt" + "github.com/dotcloud/docker/pkg/beam/data" + "io" + "os" +) + +type Router struct { + routes []*Route + sink Sender +} + +func NewRouter(sink Sender) *Router { + return &Router{sink: sink} +} + +func (r *Router) Send(payload []byte, attachment *os.File) (err error) { + //fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment)) + defer func() { + //fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err) + }() + for _, route := range r.routes { + if route.Match(payload, attachment) { + return route.Handle(payload, attachment) + } + } + if r.sink != nil { + // fmt.Printf("[%d] [Router.Send] no match. sending %s to sink %#v\n", os.Getpid(), MsgDesc(payload, attachment), r.sink) + return r.sink.Send(payload, attachment) + } + //fmt.Printf("[Router.Send] no match. return error.\n") + return fmt.Errorf("no matching route") +} + +func (r *Router) NewRoute() *Route { + route := &Route{} + r.routes = append(r.routes, route) + return route +} + +type Route struct { + rules []func([]byte, *os.File) bool + handler func([]byte, *os.File) error +} + +func (route *Route) Match(payload []byte, attachment *os.File) bool { + for _, rule := range route.rules { + if !rule(payload, attachment) { + return false + } + } + return true +} + +func (route *Route) Handle(payload []byte, attachment *os.File) error { + if route.handler == nil { + return nil + } + return route.handler(payload, attachment) +} + +func (r *Route) HasAttachment() *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + return attachment != nil + }) + return r +} + +func (route *Route) Tee(dst Sender) *Route { + inner := route.handler + route.handler = func(payload []byte, attachment *os.File) error { + if inner == nil { + return nil + } + if attachment == nil { + return inner(payload, attachment) + } + // Setup the tee + w, err := SendPipe(dst, payload) + if err != nil { + return err + } + teeR, teeW, err := os.Pipe() + if err != nil { + w.Close() + return err + } + go func() { + io.Copy(io.MultiWriter(teeW, w), attachment) + attachment.Close() + w.Close() + teeW.Close() + }() + return inner(payload, teeR) + } + return route +} + +func (r *Route) Filter(f func([]byte, *os.File) bool) *Route { + r.rules = append(r.rules, f) + return r +} + +func (r *Route) KeyStartsWith(k string, beginning ...string) *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + values := data.Message(payload).Get(k) + if values == nil { + return false + } + if len(values) < len(beginning) { + return false + } + for i, v := range beginning { + if v != values[i] { + return false + } + } + return true + }) + return r +} + +func (r *Route) KeyEquals(k string, full ...string) *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + values := data.Message(payload).Get(k) + if len(values) != len(full) { + return false + } + for i, v := range full { + if v != values[i] { + return false + } + } + return true + }) + return r +} + +func (r *Route) KeyIncludes(k, v string) *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + for _, val := range data.Message(payload).Get(k) { + if val == v { + return true + } + } + return false + }) + return r +} + +func (r *Route) NoKey(k string) *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + return len(data.Message(payload).Get(k)) == 0 + }) + return r +} + +func (r *Route) KeyExists(k string) *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + return data.Message(payload).Get(k) != nil + }) + return r +} + +func (r *Route) Passthrough(dst Sender) *Route { + r.handler = func(payload []byte, attachment *os.File) error { + return dst.Send(payload, attachment) + } + return r +} + +func (r *Route) All() *Route { + r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool { + return true + }) + return r +} + +func (r *Route) Handler(h func([]byte, *os.File) error) *Route { + r.handler = h + return r +} diff --git a/pkg/beam/router_test.go b/pkg/beam/router_test.go new file mode 100644 index 0000000000..f7f7bf1d2d --- /dev/null +++ b/pkg/beam/router_test.go @@ -0,0 +1,95 @@ +package beam + +import ( + "fmt" + "io/ioutil" + "os" + "sync" + "testing" +) + +type msg struct { + payload []byte + attachment *os.File +} + +func (m msg) String() string { + return MsgDesc(m.payload, m.attachment) +} + +type mockReceiver []msg + +func (r *mockReceiver) Send(p []byte, a *os.File) error { + (*r) = append((*r), msg{p, a}) + return nil +} + +func TestSendNoSinkNoRoute(t *testing.T) { + r := NewRouter(nil) + if err := r.Send([]byte("hello"), nil); err == nil { + t.Fatalf("error expected") + } + a, b, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + if err := r.Send([]byte("foo bar baz"), a); err == nil { + t.Fatalf("error expected") + } +} + +func TestSendSinkNoRoute(t *testing.T) { + var sink mockReceiver + r := NewRouter(&sink) + if err := r.Send([]byte("hello"), nil); err != nil { + t.Fatal(err) + } + a, b, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer a.Close() + defer b.Close() + if err := r.Send([]byte("world"), a); err != nil { + t.Fatal(err) + } + if len(sink) != 2 { + t.Fatalf("%#v\n", sink) + } + if string(sink[0].payload) != "hello" { + t.Fatalf("%#v\n", sink) + } + if sink[0].attachment != nil { + t.Fatalf("%#v\n", sink) + } + if string(sink[1].payload) != "world" { + t.Fatalf("%#v\n", sink) + } + if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 { + t.Fatalf("%v\n", sink) + } + var tasks sync.WaitGroup + tasks.Add(2) + go func() { + defer tasks.Done() + fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd()) + data, err := ioutil.ReadAll(sink[1].attachment) + if err != nil { + t.Fatal(err) + } + if string(data) != "foo bar\n" { + t.Fatalf("%v\n", string(data)) + } + }() + go func() { + defer tasks.Done() + fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd()) + if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil { + t.Fatal(err) + } + b.Close() + }() + tasks.Wait() +} diff --git a/pkg/beam/service.go b/pkg/beam/service.go new file mode 100644 index 0000000000..8e117059cb --- /dev/null +++ b/pkg/beam/service.go @@ -0,0 +1,85 @@ +package beam + +import ( + "net" +) + +// Listen is a convenience interface for applications to create service endpoints +// which can be easily used with existing networking code. +// +// Listen registers a new service endpoint on the beam connection `conn`, using the +// service name `name`. It returns a listener which can be used in the usual +// way. Calling Accept() on the listener will block until a new connection is available +// on the service endpoint. The endpoint is then returned as a regular net.Conn and +// can be used as a regular network connection. +// +// Note that if the underlying file descriptor received in attachment is nil or does +// not point to a connection, that message will be skipped. +// +func Listen(conn Sender, name string) (net.Listener, error) { + endpoint, err := SendConn(conn, []byte(name)) + if err != nil { + return nil, err + } + return &listener{ + name: name, + endpoint: endpoint, + }, nil +} + +func Connect(ctx *UnixConn, name string) (net.Conn, error) { + l, err := Listen(ctx, name) + if err != nil { + return nil, err + } + conn, err := l.Accept() + if err != nil { + return nil, err + } + return conn, nil +} + +type listener struct { + name string + endpoint ReceiveCloser +} + +func (l *listener) Accept() (net.Conn, error) { + for { + _, f, err := l.endpoint.Receive() + if err != nil { + return nil, err + } + if f == nil { + // Skip empty attachments + continue + } + conn, err := net.FileConn(f) + if err != nil { + // Skip beam attachments which are not connections + // (for example might be a regular file, directory etc) + continue + } + return conn, nil + } + panic("impossibru!") + return nil, nil +} + +func (l *listener) Close() error { + return l.endpoint.Close() +} + +func (l *listener) Addr() net.Addr { + return addr(l.name) +} + +type addr string + +func (a addr) Network() string { + return "beam" +} + +func (a addr) String() string { + return string(a) +} diff --git a/pkg/beam/unix.go b/pkg/beam/unix.go new file mode 100644 index 0000000000..b480c47eb9 --- /dev/null +++ b/pkg/beam/unix.go @@ -0,0 +1,211 @@ +package beam + +import ( + "bufio" + "fmt" + "net" + "os" + "syscall" +) + +func debugCheckpoint(msg string, args ...interface{}) { + if os.Getenv("DEBUG") == "" { + return + } + os.Stdout.Sync() + tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700) + fmt.Fprintf(tty, msg, args...) + bufio.NewScanner(tty).Scan() + tty.Close() +} + +type UnixConn struct { + *net.UnixConn +} + +func FileConn(f *os.File) (*UnixConn, error) { + conn, err := net.FileConn(f) + if err != nil { + return nil, err + } + uconn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + return nil, fmt.Errorf("%d: not a unix connection", f.Fd()) + } + return &UnixConn{uconn}, nil + +} + +// Send sends a new message on conn with data and f as payload and +// attachment, respectively. +// On success, f is closed +func (conn *UnixConn) Send(data []byte, f *os.File) error { + { + var fd int = -1 + if f != nil { + fd = int(f.Fd()) + } + debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd) + } + var fds []int + if f != nil { + fds = append(fds, int(f.Fd())) + } + if err := sendUnix(conn.UnixConn, data, fds...); err != nil { + return err + } + + if f != nil { + f.Close() + } + return nil +} + +// Receive waits for a new message on conn, and receives its payload +// and attachment, or an error if any. +// +// If more than 1 file descriptor is sent in the message, they are all +// closed except for the first, which is the attachment. +// It is legal for a message to have no attachment or an empty payload. +func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) { + defer func() { + var fd int = -1 + if rf != nil { + fd = int(rf.Fd()) + } + debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd) + }() + for { + data, fds, err := receiveUnix(conn.UnixConn) + if err != nil { + return nil, nil, err + } + var f *os.File + if len(fds) > 1 { + for _, fd := range fds[1:] { + syscall.Close(fd) + } + } + if len(fds) >= 1 { + f = os.NewFile(uintptr(fds[0]), "") + } + return data, f, nil + } + panic("impossibru") + return nil, nil, nil +} + +func receiveUnix(conn *net.UnixConn) ([]byte, []int, error) { + buf := make([]byte, 4096) + oob := make([]byte, 4096) + bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob) + if err != nil { + return nil, nil, err + } + return buf[:bufn], extractFds(oob[:oobn]), nil +} + +func sendUnix(conn *net.UnixConn, data []byte, fds ...int) error { + _, _, err := conn.WriteMsgUnix(data, syscall.UnixRights(fds...), nil) + return err +} + +func extractFds(oob []byte) (fds []int) { + // Grab forklock to make sure no forks accidentally inherit the new + // fds before they are made CLOEXEC + // There is a slight race condition between ReadMsgUnix returns and + // when we grap the lock, so this is not perfect. Unfortunately + // There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any + // way to implement non-blocking i/o in go, so this is hard to fix. + syscall.ForkLock.Lock() + defer syscall.ForkLock.Unlock() + scms, err := syscall.ParseSocketControlMessage(oob) + if err != nil { + return + } + for _, scm := range scms { + gotFds, err := syscall.ParseUnixRights(&scm) + if err != nil { + continue + } + fds = append(fds, gotFds...) + + for _, fd := range fds { + syscall.CloseOnExec(fd) + } + } + return +} + +func socketpair() ([2]int, error) { + return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0) +} + +// SocketPair is a convenience wrapper around the socketpair(2) syscall. +// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors +// not bound to the underlying filesystem. +// Messages sent on one end are received on the other, and vice-versa. +// It is the caller's responsibility to close both ends. +func SocketPair() (a *os.File, b *os.File, err error) { + defer func() { + var ( + fdA int = -1 + fdB int = -1 + ) + if a != nil { + fdA = int(a.Fd()) + } + if b != nil { + fdB = int(b.Fd()) + } + debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB) + }() + pair, err := socketpair() + if err != nil { + return nil, nil, err + } + return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil +} + +func USocketPair() (*UnixConn, *UnixConn, error) { + debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ") + defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ") + a, b, err := SocketPair() + if err != nil { + return nil, nil, err + } + defer a.Close() + defer b.Close() + uA, err := FileConn(a) + if err != nil { + return nil, nil, err + } + uB, err := FileConn(b) + if err != nil { + uA.Close() + return nil, nil, err + } + return uA, uB, nil +} + +// FdConn wraps a file descriptor in a standard *net.UnixConn object, or +// returns an error if the file descriptor does not point to a unix socket. +// This creates a duplicate file descriptor. It's the caller's responsibility +// to close both. +func FdConn(fd int) (n *net.UnixConn, err error) { + { + debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd) + } + f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd)) + conn, err := net.FileConn(f) + if err != nil { + return nil, err + } + uconn, ok := conn.(*net.UnixConn) + if !ok { + conn.Close() + return nil, fmt.Errorf("%d: not a unix connection", fd) + } + return uconn, nil +} diff --git a/pkg/beam/unix_test.go b/pkg/beam/unix_test.go new file mode 100644 index 0000000000..09815aa0d6 --- /dev/null +++ b/pkg/beam/unix_test.go @@ -0,0 +1,86 @@ +package beam + +import ( + "fmt" + "io/ioutil" + "testing" +) + +func TestSocketPair(t *testing.T) { + a, b, err := SocketPair() + if err != nil { + t.Fatal(err) + } + go func() { + a.Write([]byte("hello world!")) + fmt.Printf("done writing. closing\n") + a.Close() + fmt.Printf("done closing\n") + }() + data, err := ioutil.ReadAll(b) + if err != nil { + t.Fatal(err) + } + fmt.Printf("--> %s\n", data) + fmt.Printf("still open: %v\n", a.Fd()) +} + +func TestSendUnixSocket(t *testing.T) { + a1, a2, err := USocketPair() + if err != nil { + t.Fatal(err) + } + // defer a1.Close() + // defer a2.Close() + b1, b2, err := USocketPair() + if err != nil { + t.Fatal(err) + } + // defer b1.Close() + // defer b2.Close() + glueA, glueB, err := SocketPair() + if err != nil { + t.Fatal(err) + } + // defer glueA.Close() + // defer glueB.Close() + go func() { + err := b2.Send([]byte("a"), glueB) + if err != nil { + t.Fatal(err) + } + }() + go func() { + err := a2.Send([]byte("b"), glueA) + if err != nil { + t.Fatal(err) + } + }() + connAhdr, connA, err := a1.Receive() + if err != nil { + t.Fatal(err) + } + if string(connAhdr) != "b" { + t.Fatalf("unexpected: %s", connAhdr) + } + connBhdr, connB, err := b1.Receive() + if err != nil { + t.Fatal(err) + } + if string(connBhdr) != "a" { + t.Fatalf("unexpected: %s", connBhdr) + } + fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd()) + go func() { + fmt.Printf("sending message on %v\n", connA.Fd()) + connA.Write([]byte("hello world")) + connA.Sync() + fmt.Printf("closing %v\n", connA.Fd()) + connA.Close() + }() + data, err := ioutil.ReadAll(connB) + if err != nil { + t.Fatal(err) + } + fmt.Printf("---> %s\n", data) +} diff --git a/pkg/dockerscript/MAINTAINERS b/pkg/dockerscript/MAINTAINERS new file mode 100644 index 0000000000..db33365bcd --- /dev/null +++ b/pkg/dockerscript/MAINTAINERS @@ -0,0 +1 @@ +Solomon Hykes diff --git a/pkg/dockerscript/dockerscript.go b/pkg/dockerscript/dockerscript.go new file mode 100644 index 0000000000..e7ec5d1286 --- /dev/null +++ b/pkg/dockerscript/dockerscript.go @@ -0,0 +1,121 @@ +package dockerscript + +import ( + "fmt" + "github.com/dotcloud/docker/pkg/dockerscript/scanner" + "io" + "strings" +) + +type Command struct { + Args []string + Children []*Command + Background bool +} + +type Scanner struct { + scanner.Scanner + commentLine bool +} + +func Parse(src io.Reader) ([]*Command, error) { + s := &Scanner{} + s.Init(src) + s.Whitespace = 1<<'\t' | 1<<' ' + s.Mode = scanner.ScanStrings | scanner.ScanRawStrings | scanner.ScanIdents + expr, err := parse(s, "") + if err != nil { + return nil, fmt.Errorf("line %d:%d: %v\n", s.Pos().Line, s.Pos().Column, err) + } + return expr, nil +} + +func (cmd *Command) subString(depth int) string { + var prefix string + for i := 0; i < depth; i++ { + prefix += " " + } + s := prefix + strings.Join(cmd.Args, ", ") + if len(cmd.Children) > 0 { + s += " {\n" + for _, subcmd := range cmd.Children { + s += subcmd.subString(depth + 1) + } + s += prefix + "}" + } + s += "\n" + return s +} + +func (cmd *Command) String() string { + return cmd.subString(0) +} + +func parseArgs(s *Scanner) ([]string, rune, error) { + var parseError error + // FIXME: we overwrite previously set error + s.Error = func(s *scanner.Scanner, msg string) { + parseError = fmt.Errorf(msg) + // parseError = fmt.Errorf("line %d:%d: %s\n", s.Pos().Line, s.Pos().Column, msg) + } + var args []string + tok := s.Scan() + for tok != scanner.EOF { + if parseError != nil { + return args, tok, parseError + } + text := s.TokenText() + // Toggle line comment + if strings.HasPrefix(text, "#") { + s.commentLine = true + } else if text == "\n" || text == "\r" { + s.commentLine = false + return args, tok, nil + } + if !s.commentLine { + if text == "{" || text == "}" || text == "\n" || text == "\r" || text == ";" || text == "&" { + return args, tok, nil + } + args = append(args, text) + } + tok = s.Scan() + } + return args, tok, nil +} + +func parse(s *Scanner, opener string) (expr []*Command, err error) { + /* + defer func() { + fmt.Printf("parse() returned %d commands:\n", len(expr)) + for _, c := range expr { + fmt.Printf("\t----> %s\n", c) + } + }() + */ + for { + args, tok, err := parseArgs(s) + if err != nil { + return nil, err + } + cmd := &Command{Args: args} + afterArgs := s.TokenText() + if afterArgs == "{" { + children, err := parse(s, "{") + if err != nil { + return nil, err + } + cmd.Children = children + } else if afterArgs == "}" && opener != "{" { + return nil, fmt.Errorf("unexpected end of block '}'") + } else if afterArgs == "&" { + cmd.Background = true + } + if len(cmd.Args) > 0 || len(cmd.Children) > 0 { + expr = append(expr, cmd) + } + if tok == scanner.EOF || afterArgs == "}" { + break + } + } + return expr, nil +} diff --git a/pkg/dockerscript/scanner/extra.go b/pkg/dockerscript/scanner/extra.go new file mode 100644 index 0000000000..05c17e247e --- /dev/null +++ b/pkg/dockerscript/scanner/extra.go @@ -0,0 +1,21 @@ +package scanner + +import ( + "strings" + "unicode" +) + +// extra functions used to hijack the upstream text/scanner + +func detectIdent(ch rune) bool { + if unicode.IsLetter(ch) { + return true + } + if unicode.IsDigit(ch) { + return true + } + if strings.ContainsRune("_:/+-@%^.!=", ch) { + return true + } + return false +} diff --git a/pkg/dockerscript/scanner/scanner.go b/pkg/dockerscript/scanner/scanner.go new file mode 100644 index 0000000000..b208fc7810 --- /dev/null +++ b/pkg/dockerscript/scanner/scanner.go @@ -0,0 +1,673 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package scanner provides a scanner and tokenizer for UTF-8-encoded text. +// It takes an io.Reader providing the source, which then can be tokenized +// through repeated calls to the Scan function. For compatibility with +// existing tools, the NUL character is not allowed. If the first character +// in the source is a UTF-8 encoded byte order mark (BOM), it is discarded. +// +// By default, a Scanner skips white space and Go comments and recognizes all +// literals as defined by the Go language specification. It may be +// customized to recognize only a subset of those literals and to recognize +// different white space characters. +// +// Basic usage pattern: +// +// var s scanner.Scanner +// s.Init(src) +// tok := s.Scan() +// for tok != scanner.EOF { +// // do something with tok +// tok = s.Scan() +// } +// +package scanner + +import ( + "bytes" + "fmt" + "io" + "os" + "unicode/utf8" +) + +// TODO(gri): Consider changing this to use the new (token) Position package. + +// A source position is represented by a Position value. +// A position is valid if Line > 0. +type Position struct { + Filename string // filename, if any + Offset int // byte offset, starting at 0 + Line int // line number, starting at 1 + Column int // column number, starting at 1 (character count per line) +} + +// IsValid returns true if the position is valid. +func (pos *Position) IsValid() bool { return pos.Line > 0 } + +func (pos Position) String() string { + s := pos.Filename + if pos.IsValid() { + if s != "" { + s += ":" + } + s += fmt.Sprintf("%d:%d", pos.Line, pos.Column) + } + if s == "" { + s = "???" + } + return s +} + +// Predefined mode bits to control recognition of tokens. For instance, +// to configure a Scanner such that it only recognizes (Go) identifiers, +// integers, and skips comments, set the Scanner's Mode field to: +// +// ScanIdents | ScanInts | SkipComments +// +const ( + ScanIdents = 1 << -Ident + ScanInts = 1 << -Int + ScanFloats = 1 << -Float // includes Ints + ScanChars = 1 << -Char + ScanStrings = 1 << -String + ScanRawStrings = 1 << -RawString + ScanComments = 1 << -Comment + SkipComments = 1 << -skipComment // if set with ScanComments, comments become white space + GoTokens = ScanIdents | ScanFloats | ScanChars | ScanStrings | ScanRawStrings | ScanComments | SkipComments +) + +// The result of Scan is one of the following tokens or a Unicode character. +const ( + EOF = -(iota + 1) + Ident + Int + Float + Char + String + RawString + Comment + skipComment +) + +var tokenString = map[rune]string{ + EOF: "EOF", + Ident: "Ident", + Int: "Int", + Float: "Float", + Char: "Char", + String: "String", + RawString: "RawString", + Comment: "Comment", +} + +// TokenString returns a printable string for a token or Unicode character. +func TokenString(tok rune) string { + if s, found := tokenString[tok]; found { + return s + } + return fmt.Sprintf("%q", string(tok)) +} + +// GoWhitespace is the default value for the Scanner's Whitespace field. +// Its value selects Go's white space characters. +const GoWhitespace = 1<<'\t' | 1<<'\n' | 1<<'\r' | 1<<' ' + +const bufLen = 1024 // at least utf8.UTFMax + +// A Scanner implements reading of Unicode characters and tokens from an io.Reader. +type Scanner struct { + // Input + src io.Reader + + // Source buffer + srcBuf [bufLen + 1]byte // +1 for sentinel for common case of s.next() + srcPos int // reading position (srcBuf index) + srcEnd int // source end (srcBuf index) + + // Source position + srcBufOffset int // byte offset of srcBuf[0] in source + line int // line count + column int // character count + lastLineLen int // length of last line in characters (for correct column reporting) + lastCharLen int // length of last character in bytes + + // Token text buffer + // Typically, token text is stored completely in srcBuf, but in general + // the token text's head may be buffered in tokBuf while the token text's + // tail is stored in srcBuf. + tokBuf bytes.Buffer // token text head that is not in srcBuf anymore + tokPos int // token text tail position (srcBuf index); valid if >= 0 + tokEnd int // token text tail end (srcBuf index) + + // One character look-ahead + ch rune // character before current srcPos + + // Error is called for each error encountered. If no Error + // function is set, the error is reported to os.Stderr. + Error func(s *Scanner, msg string) + + // ErrorCount is incremented by one for each error encountered. + ErrorCount int + + // The Mode field controls which tokens are recognized. For instance, + // to recognize Ints, set the ScanInts bit in Mode. The field may be + // changed at any time. + Mode uint + + // The Whitespace field controls which characters are recognized + // as white space. To recognize a character ch <= ' ' as white space, + // set the ch'th bit in Whitespace (the Scanner's behavior is undefined + // for values ch > ' '). The field may be changed at any time. + Whitespace uint64 + + // Start position of most recently scanned token; set by Scan. + // Calling Init or Next invalidates the position (Line == 0). + // The Filename field is always left untouched by the Scanner. + // If an error is reported (via Error) and Position is invalid, + // the scanner is not inside a token. Call Pos to obtain an error + // position in that case. + Position +} + +// Init initializes a Scanner with a new source and returns s. +// Error is set to nil, ErrorCount is set to 0, Mode is set to GoTokens, +// and Whitespace is set to GoWhitespace. +func (s *Scanner) Init(src io.Reader) *Scanner { + s.src = src + + // initialize source buffer + // (the first call to next() will fill it by calling src.Read) + s.srcBuf[0] = utf8.RuneSelf // sentinel + s.srcPos = 0 + s.srcEnd = 0 + + // initialize source position + s.srcBufOffset = 0 + s.line = 1 + s.column = 0 + s.lastLineLen = 0 + s.lastCharLen = 0 + + // initialize token text buffer + // (required for first call to next()). + s.tokPos = -1 + + // initialize one character look-ahead + s.ch = -1 // no char read yet + + // initialize public fields + s.Error = nil + s.ErrorCount = 0 + s.Mode = GoTokens + s.Whitespace = GoWhitespace + s.Line = 0 // invalidate token position + + return s +} + +// next reads and returns the next Unicode character. It is designed such +// that only a minimal amount of work needs to be done in the common ASCII +// case (one test to check for both ASCII and end-of-buffer, and one test +// to check for newlines). +func (s *Scanner) next() rune { + ch, width := rune(s.srcBuf[s.srcPos]), 1 + + if ch >= utf8.RuneSelf { + // uncommon case: not ASCII or not enough bytes + for s.srcPos+utf8.UTFMax > s.srcEnd && !utf8.FullRune(s.srcBuf[s.srcPos:s.srcEnd]) { + // not enough bytes: read some more, but first + // save away token text if any + if s.tokPos >= 0 { + s.tokBuf.Write(s.srcBuf[s.tokPos:s.srcPos]) + s.tokPos = 0 + // s.tokEnd is set by Scan() + } + // move unread bytes to beginning of buffer + copy(s.srcBuf[0:], s.srcBuf[s.srcPos:s.srcEnd]) + s.srcBufOffset += s.srcPos + // read more bytes + // (an io.Reader must return io.EOF when it reaches + // the end of what it is reading - simply returning + // n == 0 will make this loop retry forever; but the + // error is in the reader implementation in that case) + i := s.srcEnd - s.srcPos + n, err := s.src.Read(s.srcBuf[i:bufLen]) + s.srcPos = 0 + s.srcEnd = i + n + s.srcBuf[s.srcEnd] = utf8.RuneSelf // sentinel + if err != nil { + if s.srcEnd == 0 { + if s.lastCharLen > 0 { + // previous character was not EOF + s.column++ + } + s.lastCharLen = 0 + return EOF + } + if err != io.EOF { + s.error(err.Error()) + } + // If err == EOF, we won't be getting more + // bytes; break to avoid infinite loop. If + // err is something else, we don't know if + // we can get more bytes; thus also break. + break + } + } + // at least one byte + ch = rune(s.srcBuf[s.srcPos]) + if ch >= utf8.RuneSelf { + // uncommon case: not ASCII + ch, width = utf8.DecodeRune(s.srcBuf[s.srcPos:s.srcEnd]) + if ch == utf8.RuneError && width == 1 { + // advance for correct error position + s.srcPos += width + s.lastCharLen = width + s.column++ + s.error("illegal UTF-8 encoding") + return ch + } + } + } + + // advance + s.srcPos += width + s.lastCharLen = width + s.column++ + + // special situations + switch ch { + case 0: + // for compatibility with other tools + s.error("illegal character NUL") + case '\n': + s.line++ + s.lastLineLen = s.column + s.column = 0 + } + + return ch +} + +// Next reads and returns the next Unicode character. +// It returns EOF at the end of the source. It reports +// a read error by calling s.Error, if not nil; otherwise +// it prints an error message to os.Stderr. Next does not +// update the Scanner's Position field; use Pos() to +// get the current position. +func (s *Scanner) Next() rune { + s.tokPos = -1 // don't collect token text + s.Line = 0 // invalidate token position + ch := s.Peek() + s.ch = s.next() + return ch +} + +// Peek returns the next Unicode character in the source without advancing +// the scanner. It returns EOF if the scanner's position is at the last +// character of the source. +func (s *Scanner) Peek() rune { + if s.ch < 0 { + // this code is only run for the very first character + s.ch = s.next() + if s.ch == '\uFEFF' { + s.ch = s.next() // ignore BOM + } + } + return s.ch +} + +func (s *Scanner) error(msg string) { + s.ErrorCount++ + if s.Error != nil { + s.Error(s, msg) + return + } + pos := s.Position + if !pos.IsValid() { + pos = s.Pos() + } + fmt.Fprintf(os.Stderr, "%s: %s\n", pos, msg) +} + +func (s *Scanner) scanIdentifier() rune { + ch := s.next() // read character after first '_' or letter + for detectIdent(ch) { + ch = s.next() + } + return ch +} + +func digitVal(ch rune) int { + switch { + case '0' <= ch && ch <= '9': + return int(ch - '0') + case 'a' <= ch && ch <= 'f': + return int(ch - 'a' + 10) + case 'A' <= ch && ch <= 'F': + return int(ch - 'A' + 10) + } + return 16 // larger than any legal digit val +} + +func isDecimal(ch rune) bool { return '0' <= ch && ch <= '9' } + +func (s *Scanner) scanMantissa(ch rune) rune { + for isDecimal(ch) { + ch = s.next() + } + return ch +} + +func (s *Scanner) scanFraction(ch rune) rune { + if ch == '.' { + ch = s.scanMantissa(s.next()) + } + return ch +} + +func (s *Scanner) scanExponent(ch rune) rune { + if ch == 'e' || ch == 'E' { + ch = s.next() + if ch == '-' || ch == '+' { + ch = s.next() + } + ch = s.scanMantissa(ch) + } + return ch +} + +func (s *Scanner) scanNumber(ch rune) (rune, rune) { + // isDecimal(ch) + if ch == '0' { + // int or float + ch = s.next() + if ch == 'x' || ch == 'X' { + // hexadecimal int + ch = s.next() + hasMantissa := false + for digitVal(ch) < 16 { + ch = s.next() + hasMantissa = true + } + if !hasMantissa { + s.error("illegal hexadecimal number") + } + } else { + // octal int or float + has8or9 := false + for isDecimal(ch) { + if ch > '7' { + has8or9 = true + } + ch = s.next() + } + if s.Mode&ScanFloats != 0 && (ch == '.' || ch == 'e' || ch == 'E') { + // float + ch = s.scanFraction(ch) + ch = s.scanExponent(ch) + return Float, ch + } + // octal int + if has8or9 { + s.error("illegal octal number") + } + } + return Int, ch + } + // decimal int or float + ch = s.scanMantissa(ch) + if s.Mode&ScanFloats != 0 && (ch == '.' || ch == 'e' || ch == 'E') { + // float + ch = s.scanFraction(ch) + ch = s.scanExponent(ch) + return Float, ch + } + return Int, ch +} + +func (s *Scanner) scanDigits(ch rune, base, n int) rune { + for n > 0 && digitVal(ch) < base { + ch = s.next() + n-- + } + if n > 0 { + s.error("illegal char escape") + } + return ch +} + +func (s *Scanner) scanEscape(quote rune) rune { + ch := s.next() // read character after '/' + switch ch { + case 'a', 'b', 'f', 'n', 'r', 't', 'v', '\\', quote: + // nothing to do + ch = s.next() + case '0', '1', '2', '3', '4', '5', '6', '7': + ch = s.scanDigits(ch, 8, 3) + case 'x': + ch = s.scanDigits(s.next(), 16, 2) + case 'u': + ch = s.scanDigits(s.next(), 16, 4) + case 'U': + ch = s.scanDigits(s.next(), 16, 8) + default: + s.error("illegal char escape") + } + return ch +} + +func (s *Scanner) scanString(quote rune) (n int) { + ch := s.next() // read character after quote + for ch != quote { + if ch == '\n' || ch < 0 { + s.error("literal not terminated") + return + } + if ch == '\\' { + ch = s.scanEscape(quote) + } else { + ch = s.next() + } + n++ + } + return +} + +func (s *Scanner) scanRawString() { + ch := s.next() // read character after '`' + for ch != '`' { + if ch < 0 { + s.error("literal not terminated") + return + } + ch = s.next() + } +} + +func (s *Scanner) scanChar() { + if s.scanString('\'') != 1 { + s.error("illegal char literal") + } +} + +func (s *Scanner) scanComment(ch rune) rune { + // ch == '/' || ch == '*' + if ch == '/' { + // line comment + ch = s.next() // read character after "//" + for ch != '\n' && ch >= 0 { + ch = s.next() + } + return ch + } + + // general comment + ch = s.next() // read character after "/*" + for { + if ch < 0 { + s.error("comment not terminated") + break + } + ch0 := ch + ch = s.next() + if ch0 == '*' && ch == '/' { + ch = s.next() + break + } + } + return ch +} + +// Scan reads the next token or Unicode character from source and returns it. +// It only recognizes tokens t for which the respective Mode bit (1<<-t) is set. +// It returns EOF at the end of the source. It reports scanner errors (read and +// token errors) by calling s.Error, if not nil; otherwise it prints an error +// message to os.Stderr. +func (s *Scanner) Scan() rune { + ch := s.Peek() + + // reset token text position + s.tokPos = -1 + s.Line = 0 + +redo: + // skip white space + for s.Whitespace&(1< 0 { + // common case: last character was not a '\n' + s.Line = s.line + s.Column = s.column + } else { + // last character was a '\n' + // (we cannot be at the beginning of the source + // since we have called next() at least once) + s.Line = s.line - 1 + s.Column = s.lastLineLen + } + + // determine token value + tok := ch + switch { + case detectIdent(ch): + if s.Mode&ScanIdents != 0 { + tok = Ident + ch = s.scanIdentifier() + } else { + ch = s.next() + } + case isDecimal(ch): + if s.Mode&(ScanInts|ScanFloats) != 0 { + tok, ch = s.scanNumber(ch) + } else { + ch = s.next() + } + default: + switch ch { + case '"': + if s.Mode&ScanStrings != 0 { + s.scanString('"') + tok = String + } + ch = s.next() + case '\'': + if s.Mode&ScanChars != 0 { + s.scanChar() + tok = Char + } + ch = s.next() + case '.': + ch = s.next() + if isDecimal(ch) && s.Mode&ScanFloats != 0 { + tok = Float + ch = s.scanMantissa(ch) + ch = s.scanExponent(ch) + } + case '/': + ch = s.next() + if (ch == '/' || ch == '*') && s.Mode&ScanComments != 0 { + if s.Mode&SkipComments != 0 { + s.tokPos = -1 // don't collect token text + ch = s.scanComment(ch) + goto redo + } + ch = s.scanComment(ch) + tok = Comment + } + case '`': + if s.Mode&ScanRawStrings != 0 { + s.scanRawString() + tok = String + } + ch = s.next() + default: + ch = s.next() + } + } + + // end of token text + s.tokEnd = s.srcPos - s.lastCharLen + + s.ch = ch + return tok +} + +// Pos returns the position of the character immediately after +// the character or token returned by the last call to Next or Scan. +func (s *Scanner) Pos() (pos Position) { + pos.Filename = s.Filename + pos.Offset = s.srcBufOffset + s.srcPos - s.lastCharLen + switch { + case s.column > 0: + // common case: last character was not a '\n' + pos.Line = s.line + pos.Column = s.column + case s.lastLineLen > 0: + // last character was a '\n' + pos.Line = s.line - 1 + pos.Column = s.lastLineLen + default: + // at the beginning of the source + pos.Line = 1 + pos.Column = 1 + } + return +} + +// TokenText returns the string corresponding to the most recently scanned token. +// Valid after calling Scan(). +func (s *Scanner) TokenText() string { + if s.tokPos < 0 { + // no token text + return "" + } + + if s.tokEnd < 0 { + // if EOF was reached, s.tokEnd is set to -1 (s.srcPos == 0) + s.tokEnd = s.tokPos + } + + if s.tokBuf.Len() == 0 { + // common case: the entire token text is still in srcBuf + return string(s.srcBuf[s.tokPos:s.tokEnd]) + } + + // part of the token text was saved in tokBuf: save the rest in + // tokBuf as well and return its content + s.tokBuf.Write(s.srcBuf[s.tokPos:s.tokEnd]) + s.tokPos = s.tokEnd // ensure idempotency of TokenText() call + return s.tokBuf.String() +}