Merge pull request #5343 from shykes/beamsh

beam + beam/data + dockerscript
This commit is contained in:
Victor Vieux 2014-04-22 17:01:05 -07:00
commit 817494d009
32 changed files with 3205 additions and 0 deletions

1
pkg/beam/MAINTAINERS Normal file
View file

@ -0,0 +1 @@
Solomon Hykes <solomon@docker.com>

135
pkg/beam/beam.go Normal file
View file

@ -0,0 +1,135 @@
package beam
import (
"fmt"
"io"
"os"
)
type Sender interface {
Send([]byte, *os.File) error
}
type Receiver interface {
Receive() ([]byte, *os.File, error)
}
type ReceiveCloser interface {
Receiver
Close() error
}
type SendCloser interface {
Sender
Close() error
}
type ReceiveSender interface {
Receiver
Sender
}
func SendPipe(dst Sender, data []byte) (*os.File, error) {
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
if err := dst.Send(data, r); err != nil {
r.Close()
w.Close()
return nil, err
}
return w, nil
}
func SendConn(dst Sender, data []byte) (conn *UnixConn, err error) {
local, remote, err := SocketPair()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
local.Close()
remote.Close()
}
}()
conn, err = FileConn(local)
if err != nil {
return nil, err
}
local.Close()
if err := dst.Send(data, remote); err != nil {
return nil, err
}
return conn, nil
}
func ReceiveConn(src Receiver) ([]byte, *UnixConn, error) {
for {
data, f, err := src.Receive()
if err != nil {
return nil, nil, err
}
if f == nil {
// Skip empty attachments
continue
}
conn, err := FileConn(f)
if err != nil {
// Skip beam attachments which are not connections
// (for example might be a regular file, directory etc)
continue
}
return data, conn, nil
}
panic("impossibru!")
return nil, nil, nil
}
func Copy(dst Sender, src Receiver) (int, error) {
var n int
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return n, nil
} else if err != nil {
return n, err
}
if err := dst.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return n, err
}
n++
}
panic("impossibru!")
return n, nil
}
// MsgDesc returns a human readable description of a beam message, usually
// for debugging purposes.
func MsgDesc(payload []byte, attachment *os.File) string {
var filedesc string = "<nil>"
if attachment != nil {
filedesc = fmt.Sprintf("%d", attachment.Fd())
}
return fmt.Sprintf("'%s'[%s]", payload, filedesc)
}
type devnull struct{}
func Devnull() ReceiveSender {
return devnull{}
}
func (d devnull) Send(p []byte, a *os.File) error {
if a != nil {
a.Close()
}
return nil
}
func (d devnull) Receive() ([]byte, *os.File, error) {
return nil, nil, io.EOF
}

39
pkg/beam/beam_test.go Normal file
View file

@ -0,0 +1,39 @@
package beam
import (
"github.com/dotcloud/docker/pkg/beam/data"
"testing"
)
func TestSendConn(t *testing.T) {
a, b, err := USocketPair()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
go func() {
conn, err := SendConn(a, data.Empty().Set("type", "connection").Bytes())
if err != nil {
t.Fatal(err)
}
if err := conn.Send(data.Empty().Set("foo", "bar").Bytes(), nil); err != nil {
t.Fatal(err)
}
conn.CloseWrite()
}()
payload, conn, err := ReceiveConn(b)
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(payload)).Get("type"); val == nil || val[0] != "connection" {
t.Fatalf("%v != %v\n", val, "connection")
}
msg, _, err := conn.Receive()
if err != nil {
t.Fatal(err)
}
if val := data.Message(string(msg)).Get("foo"); val == nil || val[0] != "bar" {
t.Fatalf("%v != %v\n", val, "bar")
}
}

115
pkg/beam/data/data.go Normal file
View file

@ -0,0 +1,115 @@
package data
import (
"fmt"
"strconv"
"strings"
)
func Encode(obj map[string][]string) string {
var msg string
msg += encodeHeader(0)
for k, values := range obj {
msg += encodeNamedList(k, values)
}
return msg
}
func encodeHeader(msgtype int) string {
return fmt.Sprintf("%03.3d;", msgtype)
}
func encodeString(s string) string {
return fmt.Sprintf("%d:%s,", len(s), s)
}
var EncodeString = encodeString
var DecodeString = decodeString
func encodeList(l []string) string {
values := make([]string, 0, len(l))
for _, s := range l {
values = append(values, encodeString(s))
}
return encodeString(strings.Join(values, ""))
}
func encodeNamedList(name string, l []string) string {
return encodeString(name) + encodeList(l)
}
func Decode(msg string) (map[string][]string, error) {
msgtype, skip, err := decodeHeader(msg)
if err != nil {
return nil, err
}
if msgtype != 0 {
// FIXME: use special error type so the caller can easily ignore
return nil, fmt.Errorf("unknown message type: %d", msgtype)
}
msg = msg[skip:]
obj := make(map[string][]string)
for len(msg) > 0 {
k, skip, err := decodeString(msg)
if err != nil {
return nil, err
}
msg = msg[skip:]
values, skip, err := decodeList(msg)
if err != nil {
return nil, err
}
msg = msg[skip:]
obj[k] = values
}
return obj, nil
}
func decodeList(msg string) ([]string, int, error) {
blob, skip, err := decodeString(msg)
if err != nil {
return nil, 0, err
}
var l []string
for len(blob) > 0 {
v, skipv, err := decodeString(blob)
if err != nil {
return nil, 0, err
}
l = append(l, v)
blob = blob[skipv:]
}
return l, skip, nil
}
func decodeString(msg string) (string, int, error) {
parts := strings.SplitN(msg, ":", 2)
if len(parts) != 2 {
return "", 0, fmt.Errorf("invalid format: no column")
}
var length int
if l, err := strconv.ParseUint(parts[0], 10, 64); err != nil {
return "", 0, err
} else {
length = int(l)
}
if len(parts[1]) < length+1 {
return "", 0, fmt.Errorf("message '%s' is %d bytes, expected at least %d", parts[1], len(parts[1]), length+1)
}
payload := parts[1][:length+1]
if payload[length] != ',' {
return "", 0, fmt.Errorf("message is not comma-terminated")
}
return payload[:length], len(parts[0]) + 1 + length + 1, nil
}
func decodeHeader(msg string) (int, int, error) {
if len(msg) < 4 {
return 0, 0, fmt.Errorf("message too small")
}
msgtype, err := strconv.ParseInt(msg[:3], 10, 32)
if err != nil {
return 0, 0, err
}
return int(msgtype), 4, nil
}

129
pkg/beam/data/data_test.go Normal file
View file

@ -0,0 +1,129 @@
package data
import (
"strings"
"testing"
)
func TestEncodeHelloWorld(t *testing.T) {
input := "hello world!"
output := encodeString(input)
expectedOutput := "12:hello world!,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyString(t *testing.T) {
input := ""
output := encodeString(input)
expectedOutput := "0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyList(t *testing.T) {
input := []string{}
output := encodeList(input)
expectedOutput := "0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyMap(t *testing.T) {
input := make(map[string][]string)
output := Encode(input)
expectedOutput := "000;"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncode1Key1Value(t *testing.T) {
input := make(map[string][]string)
input["hello"] = []string{"world"}
output := Encode(input)
expectedOutput := "000;5:hello,8:5:world,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncode1Key2Value(t *testing.T) {
input := make(map[string][]string)
input["hello"] = []string{"beautiful", "world"}
output := Encode(input)
expectedOutput := "000;5:hello,20:9:beautiful,5:world,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeEmptyValue(t *testing.T) {
input := make(map[string][]string)
input["foo"] = []string{}
output := Encode(input)
expectedOutput := "000;3:foo,0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeBinaryKey(t *testing.T) {
input := make(map[string][]string)
input["foo\x00bar\x7f"] = []string{}
output := Encode(input)
expectedOutput := "000;8:foo\x00bar\x7f,0:,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestEncodeBinaryValue(t *testing.T) {
input := make(map[string][]string)
input["foo\x00bar\x7f"] = []string{"\x01\x02\x03\x04"}
output := Encode(input)
expectedOutput := "000;8:foo\x00bar\x7f,7:4:\x01\x02\x03\x04,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}
func TestDecodeString(t *testing.T) {
validEncodedStrings := []struct {
input string
output string
skip int
}{
{"3:foo,", "foo", 6},
{"5:hello,", "hello", 8},
{"5:hello,5:world,", "hello", 8},
}
for _, sample := range validEncodedStrings {
output, skip, err := decodeString(sample.input)
if err != nil {
t.Fatalf("error decoding '%v': %v", sample.input, err)
}
if skip != sample.skip {
t.Fatalf("invalid skip: %v!=%v", skip, sample.skip)
}
if output != sample.output {
t.Fatalf("invalid output: %v!=%v", output, sample.output)
}
}
}
func TestDecode1Key1Value(t *testing.T) {
input := "000;3:foo,6:3:bar,,"
output, err := Decode(input)
if err != nil {
t.Fatal(err)
}
if v, exists := output["foo"]; !exists {
t.Fatalf("wrong output: %v\n", output)
} else if len(v) != 1 || strings.Join(v, "") != "bar" {
t.Fatalf("wrong output: %v\n", output)
}
}

93
pkg/beam/data/message.go Normal file
View file

@ -0,0 +1,93 @@
package data
import (
"fmt"
"strings"
)
type Message string
func Empty() Message {
return Message(Encode(nil))
}
func Parse(args []string) Message {
data := make(map[string][]string)
for _, word := range args {
if strings.Contains(word, "=") {
kv := strings.SplitN(word, "=", 2)
key := kv[0]
var val string
if len(kv) == 2 {
val = kv[1]
}
data[key] = []string{val}
}
}
return Message(Encode(data))
}
func (m Message) Add(k, v string) Message {
data, err := Decode(string(m))
if err != nil {
return m
}
if values, exists := data[k]; exists {
data[k] = append(values, v)
} else {
data[k] = []string{v}
}
return Message(Encode(data))
}
func (m Message) Set(k string, v ...string) Message {
data, err := Decode(string(m))
if err != nil {
panic(err)
return m
}
data[k] = v
return Message(Encode(data))
}
func (m Message) Del(k string) Message {
data, err := Decode(string(m))
if err != nil {
panic(err)
return m
}
delete(data, k)
return Message(Encode(data))
}
func (m Message) Get(k string) []string {
data, err := Decode(string(m))
if err != nil {
return nil
}
v, exists := data[k]
if !exists {
return nil
}
return v
}
func (m Message) Pretty() string {
data, err := Decode(string(m))
if err != nil {
return ""
}
entries := make([]string, 0, len(data))
for k, values := range data {
entries = append(entries, fmt.Sprintf("%s=%s", k, strings.Join(values, ",")))
}
return strings.Join(entries, " ")
}
func (m Message) String() string {
return string(m)
}
func (m Message) Bytes() []byte {
return []byte(m)
}

View file

@ -0,0 +1,53 @@
package data
import (
"testing"
)
func TestEmptyMessage(t *testing.T) {
m := Empty()
if m.String() != Encode(nil) {
t.Fatalf("%v != %v", m.String(), Encode(nil))
}
}
func TestSetMessage(t *testing.T) {
m := Empty().Set("foo", "bar")
output := m.String()
expectedOutput := "000;3:foo,6:3:bar,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
decodedOutput, err := Decode(output)
if err != nil {
t.Fatal(err)
}
if len(decodedOutput) != 1 {
t.Fatalf("wrong output data: %#v\n", decodedOutput)
}
}
func TestSetMessageTwice(t *testing.T) {
m := Empty().Set("foo", "bar").Set("ga", "bu")
output := m.String()
expectedOutput := "000;3:foo,6:3:bar,,2:ga,5:2:bu,,"
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
decodedOutput, err := Decode(output)
if err != nil {
t.Fatal(err)
}
if len(decodedOutput) != 2 {
t.Fatalf("wrong output data: %#v\n", decodedOutput)
}
}
func TestSetDelMessage(t *testing.T) {
m := Empty().Set("foo", "bar").Del("foo")
output := m.String()
expectedOutput := Encode(nil)
if output != expectedOutput {
t.Fatalf("'%v' != '%v'", output, expectedOutput)
}
}

View file

@ -0,0 +1,92 @@
##
## Netstrings spec copied as-is from http://cr.yp.to/proto/netstrings.txt
##
Netstrings
D. J. Bernstein, djb@pobox.com
19970201
1. Introduction
A netstring is a self-delimiting encoding of a string. Netstrings are
very easy to generate and to parse. Any string may be encoded as a
netstring; there are no restrictions on length or on allowed bytes.
Another virtue of a netstring is that it declares the string size up
front. Thus an application can check in advance whether it has enough
space to store the entire string.
Netstrings may be used as a basic building block for reliable network
protocols. Most high-level protocols, in effect, transmit a sequence
of strings; those strings may be encoded as netstrings and then
concatenated into a sequence of characters, which in turn may be
transmitted over a reliable stream protocol such as TCP.
Note that netstrings can be used recursively. The result of encoding
a sequence of strings is a single string. A series of those encoded
strings may in turn be encoded into a single string. And so on.
In this document, a string of 8-bit bytes may be written in two
different forms: as a series of hexadecimal numbers between angle
brackets, or as a sequence of ASCII characters between double quotes.
For example, <68 65 6c 6c 6f 20 77 6f 72 6c 64 21> is a string of
length 12; it is the same as the string "hello world!".
Although this document restricts attention to strings of 8-bit bytes,
netstrings could be used with any 6-bit-or-larger character set.
2. Definition
Any string of 8-bit bytes may be encoded as [len]":"[string]",".
Here [string] is the string and [len] is a nonempty sequence of ASCII
digits giving the length of [string] in decimal. The ASCII digits are
<30> for 0, <31> for 1, and so on up through <39> for 9. Extra zeros
at the front of [len] are prohibited: [len] begins with <30> exactly
when [string] is empty.
For example, the string "hello world!" is encoded as <31 32 3a 68
65 6c 6c 6f 20 77 6f 72 6c 64 21 2c>, i.e., "12:hello world!,". The
empty string is encoded as "0:,".
[len]":"[string]"," is called a netstring. [string] is called the
interpretation of the netstring.
3. Sample code
The following C code starts with a buffer buf of length len and
prints it as a netstring.
if (printf("%lu:",len) < 0) barf();
if (fwrite(buf,1,len,stdout) < len) barf();
if (putchar(',') < 0) barf();
The following C code reads a netstring and decodes it into a
dynamically allocated buffer buf of length len.
if (scanf("%9lu",&len) < 1) barf(); /* >999999999 bytes is bad */
if (getchar() != ':') barf();
buf = malloc(len + 1); /* malloc(0) is not portable */
if (!buf) barf();
if (fread(buf,1,len,stdin) < len) barf();
if (getchar() != ',') barf();
Both of these code fragments assume that the local character set is
ASCII, and that the relevant stdio streams are in binary mode.
4. Security considerations
The famous Finger security hole may be blamed on Finger's use of the
CRLF encoding. In that encoding, each string is simply terminated by
CRLF. This encoding has several problems. Most importantly, it does
not declare the string size in advance. This means that a correct
CRLF parser must be prepared to ask for more and more memory as it is
reading the string. In the case of Finger, a lazy implementor found
this to be too much trouble; instead he simply declared a fixed-size
buffer and used C's gets() function. The rest is history.
In contrast, as the above sample code shows, it is very easy to
handle netstrings without risking buffer overflow. Thus widespread
use of netstrings may improve network security.

BIN
pkg/beam/examples/beamsh/beamsh Executable file

Binary file not shown.

View file

@ -0,0 +1,542 @@
package main
import (
"bufio"
"flag"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
"github.com/dotcloud/docker/pkg/dockerscript"
"github.com/dotcloud/docker/pkg/term"
"io"
"net"
"net/url"
"os"
"path"
"strings"
"sync"
)
var rootPlugins = []string{
"stdio",
}
var (
flX bool
flPing bool
introspect beam.ReceiveSender = beam.Devnull()
)
func main() {
fd3 := os.NewFile(3, "beam-introspect")
if introsp, err := beam.FileConn(fd3); err == nil {
introspect = introsp
Logf("introspection enabled\n")
} else {
Logf("introspection disabled\n")
}
fd3.Close()
flag.BoolVar(&flX, "x", false, "print commands as they are being executed")
flag.Parse()
if flag.NArg() == 0 {
if term.IsTerminal(0) {
// No arguments, stdin is terminal --> interactive mode
input := bufio.NewScanner(os.Stdin)
for {
fmt.Printf("[%d] beamsh> ", os.Getpid())
if !input.Scan() {
break
}
line := input.Text()
if len(line) != 0 {
cmd, err := dockerscript.Parse(strings.NewReader(line))
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
continue
}
if err := executeRootScript(cmd); err != nil {
Fatal(err)
}
}
if err := input.Err(); err == io.EOF {
break
} else if err != nil {
Fatal(err)
}
}
} else {
// No arguments, stdin not terminal --> batch mode
script, err := dockerscript.Parse(os.Stdin)
if err != nil {
Fatal("parse error: %v\n", err)
}
if err := executeRootScript(script); err != nil {
Fatal(err)
}
}
} else {
// 1+ arguments: parse them as script files
for _, scriptpath := range flag.Args() {
f, err := os.Open(scriptpath)
if err != nil {
Fatal(err)
}
script, err := dockerscript.Parse(f)
if err != nil {
Fatal("parse error: %v\n", err)
}
if err := executeRootScript(script); err != nil {
Fatal(err)
}
}
}
}
func executeRootScript(script []*dockerscript.Command) error {
if len(rootPlugins) > 0 {
// If there are root plugins, wrap the script inside them
var (
rootCmd *dockerscript.Command
lastCmd *dockerscript.Command
)
for _, plugin := range rootPlugins {
pluginCmd := &dockerscript.Command{
Args: []string{plugin},
}
if rootCmd == nil {
rootCmd = pluginCmd
} else {
lastCmd.Children = []*dockerscript.Command{pluginCmd}
}
lastCmd = pluginCmd
}
lastCmd.Children = script
script = []*dockerscript.Command{rootCmd}
}
handlers, err := Handlers(introspect)
if err != nil {
return err
}
defer handlers.Close()
var tasks sync.WaitGroup
defer func() {
Debugf("Waiting for introspection...\n")
tasks.Wait()
Debugf("DONE Waiting for introspection\n")
}()
if introspect != nil {
tasks.Add(1)
go func() {
Debugf("starting introspection\n")
defer Debugf("done with introspection\n")
defer tasks.Done()
introspect.Send(data.Empty().Set("cmd", "log", "stdout").Set("message", "introspection worked!").Bytes(), nil)
Debugf("XXX starting reading introspection messages\n")
r := beam.NewRouter(handlers)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
Logf("[INTROSPECTION] %s\n", beam.MsgDesc(p, a))
return handlers.Send(p, a)
})
n, err := beam.Copy(r, introspect)
Debugf("XXX done reading %d introspection messages: %v\n", n, err)
}()
}
if err := executeScript(handlers, script); err != nil {
return err
}
return nil
}
func executeScript(out beam.Sender, script []*dockerscript.Command) error {
Debugf("executeScript(%s)\n", scriptString(script))
defer Debugf("executeScript(%s) DONE\n", scriptString(script))
var background sync.WaitGroup
defer background.Wait()
for _, cmd := range script {
if cmd.Background {
background.Add(1)
go func(out beam.Sender, cmd *dockerscript.Command) {
executeCommand(out, cmd)
background.Done()
}(out, cmd)
} else {
if err := executeCommand(out, cmd); err != nil {
return err
}
}
}
return nil
}
// 1) Find a handler for the command (if no handler, fail)
// 2) Attach new in & out pair to the handler
// 3) [in the background] Copy handler output to our own output
// 4) [in the background] Run the handler
// 5) Recursively executeScript() all children commands and wait for them to complete
// 6) Wait for handler to return and (shortly afterwards) output copy to complete
// 7) Profit
func executeCommand(out beam.Sender, cmd *dockerscript.Command) error {
if flX {
fmt.Printf("+ %v\n", strings.Replace(strings.TrimRight(cmd.String(), "\n"), "\n", "\n+ ", -1))
}
Debugf("executeCommand(%s)\n", strings.Join(cmd.Args, " "))
defer Debugf("executeCommand(%s) DONE\n", strings.Join(cmd.Args, " "))
if len(cmd.Args) == 0 {
return fmt.Errorf("empty command")
}
Debugf("[executeCommand] sending job '%s'\n", strings.Join(cmd.Args, " "))
job, err := beam.SendConn(out, data.Empty().Set("cmd", cmd.Args...).Set("type", "job").Bytes())
if err != nil {
return fmt.Errorf("%v\n", err)
}
var tasks sync.WaitGroup
tasks.Add(1)
Debugf("[executeCommand] spawning background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
go func() {
if out != nil {
Debugf("[executeCommand] background copy of the output of '%s'\n", strings.Join(cmd.Args, " "))
n, err := beam.Copy(out, job)
if err != nil {
Fatalf("[executeCommand] [%s] error during background copy: %v\n", strings.Join(cmd.Args, " "), err)
}
Debugf("[executeCommand] background copy done of the output of '%s': copied %d messages\n", strings.Join(cmd.Args, " "), n)
}
tasks.Done()
}()
// depth-first execution of children commands
// executeScript() blocks until all commands are completed
Debugf("[executeCommand] recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
executeScript(job, cmd.Children)
Debugf("[executeCommand] DONE recursively running children of '%s'\n", strings.Join(cmd.Args, " "))
job.CloseWrite()
Debugf("[executeCommand] closing the input of '%s' (all children are completed)\n", strings.Join(cmd.Args, " "))
Debugf("[executeCommand] waiting for background copy of '%s' to complete...\n", strings.Join(cmd.Args, " "))
tasks.Wait()
Debugf("[executeCommand] background copy of '%s' complete! This means the job completed.\n", strings.Join(cmd.Args, " "))
return nil
}
type Handler func([]string, io.Writer, io.Writer, beam.Receiver, beam.Sender)
func Handlers(sink beam.Sender) (*beam.UnixConn, error) {
var tasks sync.WaitGroup
pub, priv, err := beam.USocketPair()
if err != nil {
return nil, err
}
go func() {
defer func() {
Debugf("[handlers] closewrite() on endpoint\n")
// FIXME: this is not yet necessary but will be once
// there is synchronization over standard beam messages
priv.CloseWrite()
Debugf("[handlers] done closewrite() on endpoint\n")
}()
r := beam.NewRouter(sink)
r.NewRoute().HasAttachment().KeyIncludes("type", "job").Handler(func(payload []byte, attachment *os.File) error {
conn, err := beam.FileConn(attachment)
if err != nil {
attachment.Close()
return err
}
// attachment.Close()
tasks.Add(1)
go func() {
defer tasks.Done()
defer func() {
Debugf("[handlers] '%s' closewrite\n", payload)
conn.CloseWrite()
Debugf("[handlers] '%s' done closewrite\n", payload)
}()
cmd := data.Message(payload).Get("cmd")
Debugf("[handlers] received %s\n", strings.Join(cmd, " "))
if len(cmd) == 0 {
return
}
handler := GetHandler(cmd[0])
if handler == nil {
return
}
stdout, err := beam.SendPipe(conn, data.Empty().Set("cmd", "log", "stdout").Set("fromcmd", cmd...).Bytes())
if err != nil {
return
}
defer stdout.Close()
stderr, err := beam.SendPipe(conn, data.Empty().Set("cmd", "log", "stderr").Set("fromcmd", cmd...).Bytes())
if err != nil {
return
}
defer stderr.Close()
Debugf("[handlers] calling %s\n", strings.Join(cmd, " "))
handler(cmd, stdout, stderr, beam.Receiver(conn), beam.Sender(conn))
Debugf("[handlers] returned: %s\n", strings.Join(cmd, " "))
}()
return nil
})
beam.Copy(r, priv)
Debugf("[handlers] waiting for all tasks\n")
tasks.Wait()
Debugf("[handlers] all tasks returned\n")
}()
return pub, nil
}
func GetHandler(name string) Handler {
if name == "logger" {
return CmdLogger
} else if name == "render" {
return CmdRender
} else if name == "devnull" {
return CmdDevnull
} else if name == "prompt" {
return CmdPrompt
} else if name == "stdio" {
return CmdStdio
} else if name == "echo" {
return CmdEcho
} else if name == "pass" {
return CmdPass
} else if name == "in" {
return CmdIn
} else if name == "exec" {
return CmdExec
} else if name == "trace" {
return CmdTrace
} else if name == "emit" {
return CmdEmit
} else if name == "print" {
return CmdPrint
} else if name == "multiprint" {
return CmdMultiprint
} else if name == "listen" {
return CmdListen
} else if name == "beamsend" {
return CmdBeamsend
} else if name == "beamreceive" {
return CmdBeamreceive
} else if name == "connect" {
return CmdConnect
} else if name == "openfile" {
return CmdOpenfile
} else if name == "spawn" {
return CmdSpawn
} else if name == "chdir" {
return CmdChdir
}
return nil
}
// VARIOUS HELPER FUNCTIONS:
func connToFile(conn net.Conn) (f *os.File, err error) {
if connWithFile, ok := conn.(interface {
File() (*os.File, error)
}); !ok {
return nil, fmt.Errorf("no file descriptor available")
} else {
f, err = connWithFile.File()
if err != nil {
return nil, err
}
}
return f, err
}
type Msg struct {
payload []byte
attachment *os.File
}
func Logf(msg string, args ...interface{}) (int, error) {
if len(msg) == 0 || msg[len(msg)-1] != '\n' {
msg = msg + "\n"
}
msg = fmt.Sprintf("[%v] [%v] %s", os.Getpid(), path.Base(os.Args[0]), msg)
return fmt.Printf(msg, args...)
}
func Debugf(msg string, args ...interface{}) {
if os.Getenv("BEAMDEBUG") != "" {
Logf(msg, args...)
}
}
func Fatalf(msg string, args ...interface{}) {
Logf(msg, args...)
os.Exit(1)
}
func Fatal(args ...interface{}) {
Fatalf("%v", args[0])
}
func scriptString(script []*dockerscript.Command) string {
lines := make([]string, 0, len(script))
for _, cmd := range script {
line := strings.Join(cmd.Args, " ")
if len(cmd.Children) > 0 {
line += fmt.Sprintf(" { %s }", scriptString(cmd.Children))
} else {
line += " {}"
}
lines = append(lines, line)
}
return fmt.Sprintf("'%s'", strings.Join(lines, "; "))
}
func dialer(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
return
}
connections <- conn
}
}()
return connections, nil
}
func listener(addr string) (chan net.Conn, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, err
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
return nil, err
}
connections := make(chan net.Conn)
go func() {
defer close(connections)
for {
conn, err := l.Accept()
if err != nil {
return
}
Logf("new connection\n")
connections <- conn
}
}()
return connections, nil
}
func SendToConn(connections chan net.Conn, src beam.Receiver) error {
var tasks sync.WaitGroup
defer tasks.Wait()
for {
payload, attachment, err := src.Receive()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
conn, ok := <-connections
if !ok {
break
}
Logf("Sending %s\n", msgDesc(payload, attachment))
tasks.Add(1)
go func(payload []byte, attachment *os.File, conn net.Conn) {
defer tasks.Done()
if _, err := conn.Write([]byte(data.EncodeString(string(payload)))); err != nil {
return
}
if attachment == nil {
conn.Close()
return
}
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying the connection to [%d]\n", attachment.Fd())
io.Copy(attachment, conn)
attachment.Close()
Debugf("done copying the connection to [%d]\n", attachment.Fd())
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
Debugf("copying [%d] to the connection\n", attachment.Fd())
io.Copy(conn, attachment)
conn.Close()
Debugf("done copying [%d] to the connection\n", attachment.Fd())
}(attachment, conn)
iotasks.Wait()
}(payload, attachment, conn)
}
return nil
}
func msgDesc(payload []byte, attachment *os.File) string {
return beam.MsgDesc(payload, attachment)
}
func ReceiveFromConn(connections chan net.Conn, dst beam.Sender) error {
for conn := range connections {
err := func() error {
Logf("parsing message from network...\n")
defer Logf("done parsing message from network\n")
buf := make([]byte, 4098)
n, err := conn.Read(buf)
if n == 0 {
conn.Close()
if err == io.EOF {
return nil
} else {
return err
}
}
Logf("decoding message from '%s'\n", buf[:n])
header, skip, err := data.DecodeString(string(buf[:n]))
if err != nil {
conn.Close()
return err
}
pub, priv, err := beam.SocketPair()
if err != nil {
return err
}
Logf("decoded message: %s\n", data.Message(header).Pretty())
go func(skipped []byte, conn net.Conn, f *os.File) {
// this closes both conn and f
if len(skipped) > 0 {
if _, err := f.Write(skipped); err != nil {
Logf("ERROR: %v\n", err)
f.Close()
conn.Close()
return
}
}
bicopy(conn, f)
}(buf[skip:n], conn, pub)
if err := dst.Send([]byte(header), priv); err != nil {
return err
}
return nil
}()
if err != nil {
Logf("Error reading from connection: %v\n", err)
}
}
return nil
}
func bicopy(a, b io.ReadWriteCloser) {
var iotasks sync.WaitGroup
oneCopy := func(dst io.WriteCloser, src io.Reader) {
defer iotasks.Done()
io.Copy(dst, src)
dst.Close()
}
iotasks.Add(2)
go oneCopy(a, b)
go oneCopy(b, a)
iotasks.Wait()
}

View file

@ -0,0 +1,441 @@
package main
import (
"bufio"
"fmt"
"github.com/dotcloud/docker/pkg/beam"
"github.com/dotcloud/docker/pkg/beam/data"
"github.com/dotcloud/docker/pkg/term"
"github.com/dotcloud/docker/utils"
"io"
"net"
"net/url"
"os"
"os/exec"
"path"
"strings"
"sync"
"text/template"
)
func CmdLogger(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if err := os.MkdirAll("logs", 0700); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
var tasks sync.WaitGroup
defer tasks.Wait()
var n int = 1
r := beam.NewRouter(out)
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
tasks.Add(1)
go func(n int) {
defer tasks.Done()
defer attachment.Close()
var streamname string
if cmd := data.Message(payload).Get("cmd"); len(cmd) == 1 || cmd[1] == "stdout" {
streamname = "stdout"
} else {
streamname = cmd[1]
}
if fromcmd := data.Message(payload).Get("fromcmd"); len(fromcmd) != 0 {
streamname = fmt.Sprintf("%s-%s", strings.Replace(strings.Join(fromcmd, "_"), "/", "_", -1), streamname)
}
logfile, err := os.OpenFile(path.Join("logs", fmt.Sprintf("%d-%s", n, streamname)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0700)
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
defer logfile.Close()
io.Copy(logfile, attachment)
logfile.Sync()
}(n)
n++
return nil
}).Tee(out)
if _, err := beam.Copy(r, in); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdRender(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
fmt.Fprintf(stderr, "Usage: %s FORMAT\n", args[0])
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
txt := args[1]
if !strings.HasSuffix(txt, "\n") {
txt += "\n"
}
t := template.Must(template.New("render").Parse(txt))
for {
payload, attachment, err := in.Receive()
if err != nil {
return
}
msg, err := data.Decode(string(payload))
if err != nil {
fmt.Fprintf(stderr, "decode error: %v\n")
}
if err := t.Execute(stdout, msg); err != nil {
fmt.Fprintf(stderr, "rendering error: %v\n", err)
out.Send(data.Empty().Set("status", "1").Bytes(), nil)
return
}
if err := out.Send(payload, attachment); err != nil {
return
}
}
}
func CmdDevnull(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
_, attachment, err := in.Receive()
if err != nil {
return
}
if attachment != nil {
attachment.Close()
}
}
}
func CmdPrompt(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
fmt.Fprintf(stderr, "usage: %s PROMPT...\n", args[0])
return
}
if !term.IsTerminal(0) {
fmt.Fprintf(stderr, "can't prompt: no tty available...\n")
return
}
fmt.Printf("%s: ", strings.Join(args[1:], " "))
oldState, _ := term.SaveState(0)
term.DisableEcho(0, oldState)
line, _, err := bufio.NewReader(os.Stdin).ReadLine()
if err != nil {
fmt.Fprintln(stderr, err.Error())
return
}
val := string(line)
fmt.Printf("\n")
term.RestoreTerminal(0, oldState)
out.Send(data.Empty().Set("fromcmd", args...).Set("value", val).Bytes(), nil)
}
func CmdStdio(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
r := beam.NewRouter(out)
r.NewRoute().HasAttachment().KeyStartsWith("cmd", "log").Handler(func(payload []byte, attachment *os.File) error {
tasks.Add(1)
go func() {
defer tasks.Done()
defer attachment.Close()
io.Copy(os.Stdout, attachment)
attachment.Close()
}()
return nil
}).Tee(out)
if _, err := beam.Copy(r, in); err != nil {
Fatal(err)
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdEcho(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
fmt.Fprintln(stdout, strings.Join(args[1:], " "))
}
func CmdPass(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
payload, attachment, err := in.Receive()
if err != nil {
return
}
if err := out.Send(payload, attachment); err != nil {
if attachment != nil {
attachment.Close()
}
return
}
}
}
func CmdSpawn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
c := exec.Command(utils.SelfPath())
r, w, err := os.Pipe()
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
c.Stdin = r
c.Stdout = stdout
c.Stderr = stderr
go func() {
fmt.Fprintf(w, strings.Join(args[1:], " "))
w.Sync()
w.Close()
}()
if err := c.Run(); err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
}
func CmdIn(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
GetHandler("pass")([]string{"pass"}, stdout, stderr, in, out)
}
func CmdExec(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
cmd := exec.Command(args[1], args[2:]...)
cmd.Stdout = stdout
cmd.Stderr = stderr
//cmd.Stdin = os.Stdin
local, remote, err := beam.SocketPair()
if err != nil {
fmt.Fprintf(stderr, "%v\n", err)
return
}
child, err := beam.FileConn(local)
if err != nil {
local.Close()
remote.Close()
fmt.Fprintf(stderr, "%v\n", err)
return
}
local.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, remote)
var tasks sync.WaitGroup
tasks.Add(1)
go func() {
defer Debugf("done copying to child\n")
defer tasks.Done()
defer child.CloseWrite()
beam.Copy(child, in)
}()
tasks.Add(1)
go func() {
defer Debugf("done copying from child %d\n")
defer tasks.Done()
r := beam.NewRouter(out)
r.NewRoute().All().Handler(func(p []byte, a *os.File) error {
return out.Send(data.Message(p).Set("pid", fmt.Sprintf("%d", cmd.Process.Pid)).Bytes(), a)
})
beam.Copy(r, child)
}()
execErr := cmd.Run()
// We can close both ends of the socket without worrying about data stuck in the buffer,
// because unix socket writes are fully synchronous.
child.Close()
tasks.Wait()
var status string
if execErr != nil {
status = execErr.Error()
} else {
status = "ok"
}
out.Send(data.Empty().Set("status", status).Set("cmd", args...).Bytes(), nil)
}
func CmdTrace(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
r := beam.NewRouter(out)
r.NewRoute().All().Handler(func(payload []byte, attachment *os.File) error {
var sfd string = "nil"
if attachment != nil {
sfd = fmt.Sprintf("%d", attachment.Fd())
}
fmt.Printf("===> %s [%s]\n", data.Message(payload).Pretty(), sfd)
out.Send(payload, attachment)
return nil
})
beam.Copy(r, in)
}
func CmdEmit(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
out.Send(data.Parse(args[1:]).Bytes(), nil)
}
func CmdPrint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for {
payload, a, err := in.Receive()
if err != nil {
return
}
// Skip commands
if a != nil && data.Message(payload).Get("cmd") == nil {
dup, err := beam.SendPipe(out, payload)
if err != nil {
a.Close()
return
}
io.Copy(io.MultiWriter(os.Stdout, dup), a)
dup.Close()
} else {
if err := out.Send(payload, a); err != nil {
return
}
}
}
}
func CmdMultiprint(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
var tasks sync.WaitGroup
defer tasks.Wait()
r := beam.NewRouter(out)
multiprint := func(p []byte, a *os.File) error {
tasks.Add(1)
go func() {
defer tasks.Done()
defer a.Close()
msg := data.Message(string(p))
input := bufio.NewScanner(a)
for input.Scan() {
fmt.Printf("[%s] %s\n", msg.Pretty(), input.Text())
}
}()
return nil
}
r.NewRoute().KeyIncludes("type", "job").Passthrough(out)
r.NewRoute().HasAttachment().Handler(multiprint).Tee(out)
beam.Copy(r, in)
}
func CmdListen(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
l, err := net.Listen(u.Scheme, u.Host)
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
for {
conn, err := l.Accept()
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
f, err := connToFile(conn)
if err != nil {
conn.Close()
continue
}
out.Send(data.Empty().Set("type", "socket").Set("remoteaddr", conn.RemoteAddr().String()).Bytes(), f)
}
}
func CmdBeamsend(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) < 2 {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = dialer
connections, err := connector(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
SendToConn(connections, in)
}
func CmdBeamreceive(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
if err := out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil); err != nil {
Fatal(err)
}
return
}
var connector func(string) (chan net.Conn, error)
connector = listener
connections, err := connector(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
// Copy in to conn
ReceiveFromConn(connections, out)
}
func CmdConnect(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
if len(args) != 2 {
out.Send(data.Empty().Set("status", "1").Set("message", "wrong number of arguments").Bytes(), nil)
return
}
u, err := url.Parse(args[1])
if err != nil {
out.Send(data.Empty().Set("status", "1").Set("message", err.Error()).Bytes(), nil)
return
}
var tasks sync.WaitGroup
for {
_, attachment, err := in.Receive()
if err != nil {
break
}
if attachment == nil {
continue
}
Logf("connecting to %s/%s\n", u.Scheme, u.Host)
conn, err := net.Dial(u.Scheme, u.Host)
if err != nil {
out.Send(data.Empty().Set("cmd", "msg", "connect error: "+err.Error()).Bytes(), nil)
return
}
out.Send(data.Empty().Set("cmd", "msg", "connection established").Bytes(), nil)
tasks.Add(1)
go func(attachment *os.File, conn net.Conn) {
defer tasks.Done()
// even when successful, conn.File() returns a duplicate,
// so we must close the original
var iotasks sync.WaitGroup
iotasks.Add(2)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
io.Copy(attachment, conn)
}(attachment, conn)
go func(attachment *os.File, conn net.Conn) {
defer iotasks.Done()
io.Copy(conn, attachment)
}(attachment, conn)
iotasks.Wait()
conn.Close()
attachment.Close()
}(attachment, conn)
}
tasks.Wait()
}
func CmdOpenfile(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
for _, name := range args {
f, err := os.Open(name)
if err != nil {
continue
}
if err := out.Send(data.Empty().Set("path", name).Set("type", "file").Bytes(), f); err != nil {
f.Close()
}
}
}
func CmdChdir(args []string, stdout, stderr io.Writer, in beam.Receiver, out beam.Sender) {
os.Chdir(args[1])
}

View file

@ -0,0 +1,3 @@
#!/usr/bin/env beamsh
exec ls -l

View file

@ -0,0 +1,5 @@
#!/usr/bin/env beamsh
trace {
exec ls -l
}

View file

@ -0,0 +1,7 @@
#!/usr/bin/env beamsh
trace {
stdio {
exec ls -l
}
}

View file

@ -0,0 +1,10 @@
#!/usr/bin/env beamsh -x
trace outer {
# stdio fails
stdio {
trace inner {
exec ls -l
}
}
}

View file

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
stdio {
trace {
stdio {
exec ls -l
}
}
}

View file

@ -0,0 +1,6 @@
#!/usr/bin/env beamsh
stdio {
# exec fails
exec ls -l
}

View file

@ -0,0 +1,7 @@
#!/usr/bin/env beamsh
stdio {
trace {
echo hello
}
}

View file

@ -0,0 +1,6 @@
#!/usr/bin/env beamsh
stdio {
# exec fails
echo hello world
}

View file

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
devnull {
multiprint {
exec tail -f /var/log/system.log &
exec ls -l
exec ls ksdhfkjdshf jksdfhkjsdhf
}
}

View file

@ -0,0 +1,8 @@
#!/usr/bin/env beamsh
print {
trace {
emit msg=hello
emit msg=world
}
}

View file

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
trace {
log {
exec ls -l
exec ls /tmp/jhsdfkjhsdjkfhsdjkfhsdjkkhsdjkf
echo hello world
}
}

View file

@ -0,0 +1,9 @@
#!/usr/bin/env beamsh
multiprint {
trace {
listen tcp://localhost:7676 &
listen tcp://localhost:8787 &
}
}

184
pkg/beam/router.go Normal file
View file

@ -0,0 +1,184 @@
package beam
import (
"fmt"
"github.com/dotcloud/docker/pkg/beam/data"
"io"
"os"
)
type Router struct {
routes []*Route
sink Sender
}
func NewRouter(sink Sender) *Router {
return &Router{sink: sink}
}
func (r *Router) Send(payload []byte, attachment *os.File) (err error) {
//fmt.Printf("Router.Send(%s)\n", MsgDesc(payload, attachment))
defer func() {
//fmt.Printf("DONE Router.Send(%s) = %v\n", MsgDesc(payload, attachment), err)
}()
for _, route := range r.routes {
if route.Match(payload, attachment) {
return route.Handle(payload, attachment)
}
}
if r.sink != nil {
// fmt.Printf("[%d] [Router.Send] no match. sending %s to sink %#v\n", os.Getpid(), MsgDesc(payload, attachment), r.sink)
return r.sink.Send(payload, attachment)
}
//fmt.Printf("[Router.Send] no match. return error.\n")
return fmt.Errorf("no matching route")
}
func (r *Router) NewRoute() *Route {
route := &Route{}
r.routes = append(r.routes, route)
return route
}
type Route struct {
rules []func([]byte, *os.File) bool
handler func([]byte, *os.File) error
}
func (route *Route) Match(payload []byte, attachment *os.File) bool {
for _, rule := range route.rules {
if !rule(payload, attachment) {
return false
}
}
return true
}
func (route *Route) Handle(payload []byte, attachment *os.File) error {
if route.handler == nil {
return nil
}
return route.handler(payload, attachment)
}
func (r *Route) HasAttachment() *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
return attachment != nil
})
return r
}
func (route *Route) Tee(dst Sender) *Route {
inner := route.handler
route.handler = func(payload []byte, attachment *os.File) error {
if inner == nil {
return nil
}
if attachment == nil {
return inner(payload, attachment)
}
// Setup the tee
w, err := SendPipe(dst, payload)
if err != nil {
return err
}
teeR, teeW, err := os.Pipe()
if err != nil {
w.Close()
return err
}
go func() {
io.Copy(io.MultiWriter(teeW, w), attachment)
attachment.Close()
w.Close()
teeW.Close()
}()
return inner(payload, teeR)
}
return route
}
func (r *Route) Filter(f func([]byte, *os.File) bool) *Route {
r.rules = append(r.rules, f)
return r
}
func (r *Route) KeyStartsWith(k string, beginning ...string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
values := data.Message(payload).Get(k)
if values == nil {
return false
}
if len(values) < len(beginning) {
return false
}
for i, v := range beginning {
if v != values[i] {
return false
}
}
return true
})
return r
}
func (r *Route) KeyEquals(k string, full ...string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
values := data.Message(payload).Get(k)
if len(values) != len(full) {
return false
}
for i, v := range full {
if v != values[i] {
return false
}
}
return true
})
return r
}
func (r *Route) KeyIncludes(k, v string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
for _, val := range data.Message(payload).Get(k) {
if val == v {
return true
}
}
return false
})
return r
}
func (r *Route) NoKey(k string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
return len(data.Message(payload).Get(k)) == 0
})
return r
}
func (r *Route) KeyExists(k string) *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
return data.Message(payload).Get(k) != nil
})
return r
}
func (r *Route) Passthrough(dst Sender) *Route {
r.handler = func(payload []byte, attachment *os.File) error {
return dst.Send(payload, attachment)
}
return r
}
func (r *Route) All() *Route {
r.rules = append(r.rules, func(payload []byte, attachment *os.File) bool {
return true
})
return r
}
func (r *Route) Handler(h func([]byte, *os.File) error) *Route {
r.handler = h
return r
}

95
pkg/beam/router_test.go Normal file
View file

@ -0,0 +1,95 @@
package beam
import (
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
)
type msg struct {
payload []byte
attachment *os.File
}
func (m msg) String() string {
return MsgDesc(m.payload, m.attachment)
}
type mockReceiver []msg
func (r *mockReceiver) Send(p []byte, a *os.File) error {
(*r) = append((*r), msg{p, a})
return nil
}
func TestSendNoSinkNoRoute(t *testing.T) {
r := NewRouter(nil)
if err := r.Send([]byte("hello"), nil); err == nil {
t.Fatalf("error expected")
}
a, b, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
if err := r.Send([]byte("foo bar baz"), a); err == nil {
t.Fatalf("error expected")
}
}
func TestSendSinkNoRoute(t *testing.T) {
var sink mockReceiver
r := NewRouter(&sink)
if err := r.Send([]byte("hello"), nil); err != nil {
t.Fatal(err)
}
a, b, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer a.Close()
defer b.Close()
if err := r.Send([]byte("world"), a); err != nil {
t.Fatal(err)
}
if len(sink) != 2 {
t.Fatalf("%#v\n", sink)
}
if string(sink[0].payload) != "hello" {
t.Fatalf("%#v\n", sink)
}
if sink[0].attachment != nil {
t.Fatalf("%#v\n", sink)
}
if string(sink[1].payload) != "world" {
t.Fatalf("%#v\n", sink)
}
if sink[1].attachment == nil || sink[1].attachment.Fd() > 42 || sink[1].attachment.Fd() < 0 {
t.Fatalf("%v\n", sink)
}
var tasks sync.WaitGroup
tasks.Add(2)
go func() {
defer tasks.Done()
fmt.Printf("[%d] Reading from '%d'\n", os.Getpid(), sink[1].attachment.Fd())
data, err := ioutil.ReadAll(sink[1].attachment)
if err != nil {
t.Fatal(err)
}
if string(data) != "foo bar\n" {
t.Fatalf("%v\n", string(data))
}
}()
go func() {
defer tasks.Done()
fmt.Printf("[%d] writing to '%d'\n", os.Getpid(), a.Fd())
if _, err := fmt.Fprintf(b, "foo bar\n"); err != nil {
t.Fatal(err)
}
b.Close()
}()
tasks.Wait()
}

85
pkg/beam/service.go Normal file
View file

@ -0,0 +1,85 @@
package beam
import (
"net"
)
// Listen is a convenience interface for applications to create service endpoints
// which can be easily used with existing networking code.
//
// Listen registers a new service endpoint on the beam connection `conn`, using the
// service name `name`. It returns a listener which can be used in the usual
// way. Calling Accept() on the listener will block until a new connection is available
// on the service endpoint. The endpoint is then returned as a regular net.Conn and
// can be used as a regular network connection.
//
// Note that if the underlying file descriptor received in attachment is nil or does
// not point to a connection, that message will be skipped.
//
func Listen(conn Sender, name string) (net.Listener, error) {
endpoint, err := SendConn(conn, []byte(name))
if err != nil {
return nil, err
}
return &listener{
name: name,
endpoint: endpoint,
}, nil
}
func Connect(ctx *UnixConn, name string) (net.Conn, error) {
l, err := Listen(ctx, name)
if err != nil {
return nil, err
}
conn, err := l.Accept()
if err != nil {
return nil, err
}
return conn, nil
}
type listener struct {
name string
endpoint ReceiveCloser
}
func (l *listener) Accept() (net.Conn, error) {
for {
_, f, err := l.endpoint.Receive()
if err != nil {
return nil, err
}
if f == nil {
// Skip empty attachments
continue
}
conn, err := net.FileConn(f)
if err != nil {
// Skip beam attachments which are not connections
// (for example might be a regular file, directory etc)
continue
}
return conn, nil
}
panic("impossibru!")
return nil, nil
}
func (l *listener) Close() error {
return l.endpoint.Close()
}
func (l *listener) Addr() net.Addr {
return addr(l.name)
}
type addr string
func (a addr) Network() string {
return "beam"
}
func (a addr) String() string {
return string(a)
}

211
pkg/beam/unix.go Normal file
View file

@ -0,0 +1,211 @@
package beam
import (
"bufio"
"fmt"
"net"
"os"
"syscall"
)
func debugCheckpoint(msg string, args ...interface{}) {
if os.Getenv("DEBUG") == "" {
return
}
os.Stdout.Sync()
tty, _ := os.OpenFile("/dev/tty", os.O_RDWR, 0700)
fmt.Fprintf(tty, msg, args...)
bufio.NewScanner(tty).Scan()
tty.Close()
}
type UnixConn struct {
*net.UnixConn
}
func FileConn(f *os.File) (*UnixConn, error) {
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", f.Fd())
}
return &UnixConn{uconn}, nil
}
// Send sends a new message on conn with data and f as payload and
// attachment, respectively.
// On success, f is closed
func (conn *UnixConn) Send(data []byte, f *os.File) error {
{
var fd int = -1
if f != nil {
fd = int(f.Fd())
}
debugCheckpoint("===DEBUG=== about to send '%s'[%d]. Hit enter to confirm: ", data, fd)
}
var fds []int
if f != nil {
fds = append(fds, int(f.Fd()))
}
if err := sendUnix(conn.UnixConn, data, fds...); err != nil {
return err
}
if f != nil {
f.Close()
}
return nil
}
// Receive waits for a new message on conn, and receives its payload
// and attachment, or an error if any.
//
// If more than 1 file descriptor is sent in the message, they are all
// closed except for the first, which is the attachment.
// It is legal for a message to have no attachment or an empty payload.
func (conn *UnixConn) Receive() (rdata []byte, rf *os.File, rerr error) {
defer func() {
var fd int = -1
if rf != nil {
fd = int(rf.Fd())
}
debugCheckpoint("===DEBUG=== Receive() -> '%s'[%d]. Hit enter to continue.\n", rdata, fd)
}()
for {
data, fds, err := receiveUnix(conn.UnixConn)
if err != nil {
return nil, nil, err
}
var f *os.File
if len(fds) > 1 {
for _, fd := range fds[1:] {
syscall.Close(fd)
}
}
if len(fds) >= 1 {
f = os.NewFile(uintptr(fds[0]), "")
}
return data, f, nil
}
panic("impossibru")
return nil, nil, nil
}
func receiveUnix(conn *net.UnixConn) ([]byte, []int, error) {
buf := make([]byte, 4096)
oob := make([]byte, 4096)
bufn, oobn, _, _, err := conn.ReadMsgUnix(buf, oob)
if err != nil {
return nil, nil, err
}
return buf[:bufn], extractFds(oob[:oobn]), nil
}
func sendUnix(conn *net.UnixConn, data []byte, fds ...int) error {
_, _, err := conn.WriteMsgUnix(data, syscall.UnixRights(fds...), nil)
return err
}
func extractFds(oob []byte) (fds []int) {
// Grab forklock to make sure no forks accidentally inherit the new
// fds before they are made CLOEXEC
// There is a slight race condition between ReadMsgUnix returns and
// when we grap the lock, so this is not perfect. Unfortunately
// There is no way to pass MSG_CMSG_CLOEXEC to recvmsg() nor any
// way to implement non-blocking i/o in go, so this is hard to fix.
syscall.ForkLock.Lock()
defer syscall.ForkLock.Unlock()
scms, err := syscall.ParseSocketControlMessage(oob)
if err != nil {
return
}
for _, scm := range scms {
gotFds, err := syscall.ParseUnixRights(&scm)
if err != nil {
continue
}
fds = append(fds, gotFds...)
for _, fd := range fds {
syscall.CloseOnExec(fd)
}
}
return
}
func socketpair() ([2]int, error) {
return syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM|syscall.FD_CLOEXEC, 0)
}
// SocketPair is a convenience wrapper around the socketpair(2) syscall.
// It returns a unix socket of type SOCK_STREAM in the form of 2 file descriptors
// not bound to the underlying filesystem.
// Messages sent on one end are received on the other, and vice-versa.
// It is the caller's responsibility to close both ends.
func SocketPair() (a *os.File, b *os.File, err error) {
defer func() {
var (
fdA int = -1
fdB int = -1
)
if a != nil {
fdA = int(a.Fd())
}
if b != nil {
fdB = int(b.Fd())
}
debugCheckpoint("===DEBUG=== SocketPair() = [%d-%d]. Hit enter to confirm: ", fdA, fdB)
}()
pair, err := socketpair()
if err != nil {
return nil, nil, err
}
return os.NewFile(uintptr(pair[0]), ""), os.NewFile(uintptr(pair[1]), ""), nil
}
func USocketPair() (*UnixConn, *UnixConn, error) {
debugCheckpoint("===DEBUG=== USocketPair(). Hit enter to confirm: ")
defer debugCheckpoint("===DEBUG=== USocketPair() returned. Hit enter to confirm ")
a, b, err := SocketPair()
if err != nil {
return nil, nil, err
}
defer a.Close()
defer b.Close()
uA, err := FileConn(a)
if err != nil {
return nil, nil, err
}
uB, err := FileConn(b)
if err != nil {
uA.Close()
return nil, nil, err
}
return uA, uB, nil
}
// FdConn wraps a file descriptor in a standard *net.UnixConn object, or
// returns an error if the file descriptor does not point to a unix socket.
// This creates a duplicate file descriptor. It's the caller's responsibility
// to close both.
func FdConn(fd int) (n *net.UnixConn, err error) {
{
debugCheckpoint("===DEBUG=== FdConn([%d]) = (unknown fd). Hit enter to confirm: ", fd)
}
f := os.NewFile(uintptr(fd), fmt.Sprintf("%d", fd))
conn, err := net.FileConn(f)
if err != nil {
return nil, err
}
uconn, ok := conn.(*net.UnixConn)
if !ok {
conn.Close()
return nil, fmt.Errorf("%d: not a unix connection", fd)
}
return uconn, nil
}

86
pkg/beam/unix_test.go Normal file
View file

@ -0,0 +1,86 @@
package beam
import (
"fmt"
"io/ioutil"
"testing"
)
func TestSocketPair(t *testing.T) {
a, b, err := SocketPair()
if err != nil {
t.Fatal(err)
}
go func() {
a.Write([]byte("hello world!"))
fmt.Printf("done writing. closing\n")
a.Close()
fmt.Printf("done closing\n")
}()
data, err := ioutil.ReadAll(b)
if err != nil {
t.Fatal(err)
}
fmt.Printf("--> %s\n", data)
fmt.Printf("still open: %v\n", a.Fd())
}
func TestSendUnixSocket(t *testing.T) {
a1, a2, err := USocketPair()
if err != nil {
t.Fatal(err)
}
// defer a1.Close()
// defer a2.Close()
b1, b2, err := USocketPair()
if err != nil {
t.Fatal(err)
}
// defer b1.Close()
// defer b2.Close()
glueA, glueB, err := SocketPair()
if err != nil {
t.Fatal(err)
}
// defer glueA.Close()
// defer glueB.Close()
go func() {
err := b2.Send([]byte("a"), glueB)
if err != nil {
t.Fatal(err)
}
}()
go func() {
err := a2.Send([]byte("b"), glueA)
if err != nil {
t.Fatal(err)
}
}()
connAhdr, connA, err := a1.Receive()
if err != nil {
t.Fatal(err)
}
if string(connAhdr) != "b" {
t.Fatalf("unexpected: %s", connAhdr)
}
connBhdr, connB, err := b1.Receive()
if err != nil {
t.Fatal(err)
}
if string(connBhdr) != "a" {
t.Fatalf("unexpected: %s", connBhdr)
}
fmt.Printf("received both ends: %v <-> %v\n", connA.Fd(), connB.Fd())
go func() {
fmt.Printf("sending message on %v\n", connA.Fd())
connA.Write([]byte("hello world"))
connA.Sync()
fmt.Printf("closing %v\n", connA.Fd())
connA.Close()
}()
data, err := ioutil.ReadAll(connB)
if err != nil {
t.Fatal(err)
}
fmt.Printf("---> %s\n", data)
}

View file

@ -0,0 +1 @@
Solomon Hykes <solomon@docker.com>

View file

@ -0,0 +1,121 @@
package dockerscript
import (
"fmt"
"github.com/dotcloud/docker/pkg/dockerscript/scanner"
"io"
"strings"
)
type Command struct {
Args []string
Children []*Command
Background bool
}
type Scanner struct {
scanner.Scanner
commentLine bool
}
func Parse(src io.Reader) ([]*Command, error) {
s := &Scanner{}
s.Init(src)
s.Whitespace = 1<<'\t' | 1<<' '
s.Mode = scanner.ScanStrings | scanner.ScanRawStrings | scanner.ScanIdents
expr, err := parse(s, "")
if err != nil {
return nil, fmt.Errorf("line %d:%d: %v\n", s.Pos().Line, s.Pos().Column, err)
}
return expr, nil
}
func (cmd *Command) subString(depth int) string {
var prefix string
for i := 0; i < depth; i++ {
prefix += " "
}
s := prefix + strings.Join(cmd.Args, ", ")
if len(cmd.Children) > 0 {
s += " {\n"
for _, subcmd := range cmd.Children {
s += subcmd.subString(depth + 1)
}
s += prefix + "}"
}
s += "\n"
return s
}
func (cmd *Command) String() string {
return cmd.subString(0)
}
func parseArgs(s *Scanner) ([]string, rune, error) {
var parseError error
// FIXME: we overwrite previously set error
s.Error = func(s *scanner.Scanner, msg string) {
parseError = fmt.Errorf(msg)
// parseError = fmt.Errorf("line %d:%d: %s\n", s.Pos().Line, s.Pos().Column, msg)
}
var args []string
tok := s.Scan()
for tok != scanner.EOF {
if parseError != nil {
return args, tok, parseError
}
text := s.TokenText()
// Toggle line comment
if strings.HasPrefix(text, "#") {
s.commentLine = true
} else if text == "\n" || text == "\r" {
s.commentLine = false
return args, tok, nil
}
if !s.commentLine {
if text == "{" || text == "}" || text == "\n" || text == "\r" || text == ";" || text == "&" {
return args, tok, nil
}
args = append(args, text)
}
tok = s.Scan()
}
return args, tok, nil
}
func parse(s *Scanner, opener string) (expr []*Command, err error) {
/*
defer func() {
fmt.Printf("parse() returned %d commands:\n", len(expr))
for _, c := range expr {
fmt.Printf("\t----> %s\n", c)
}
}()
*/
for {
args, tok, err := parseArgs(s)
if err != nil {
return nil, err
}
cmd := &Command{Args: args}
afterArgs := s.TokenText()
if afterArgs == "{" {
children, err := parse(s, "{")
if err != nil {
return nil, err
}
cmd.Children = children
} else if afterArgs == "}" && opener != "{" {
return nil, fmt.Errorf("unexpected end of block '}'")
} else if afterArgs == "&" {
cmd.Background = true
}
if len(cmd.Args) > 0 || len(cmd.Children) > 0 {
expr = append(expr, cmd)
}
if tok == scanner.EOF || afterArgs == "}" {
break
}
}
return expr, nil
}

View file

@ -0,0 +1,21 @@
package scanner
import (
"strings"
"unicode"
)
// extra functions used to hijack the upstream text/scanner
func detectIdent(ch rune) bool {
if unicode.IsLetter(ch) {
return true
}
if unicode.IsDigit(ch) {
return true
}
if strings.ContainsRune("_:/+-@%^.!=", ch) {
return true
}
return false
}

View file

@ -0,0 +1,673 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package scanner provides a scanner and tokenizer for UTF-8-encoded text.
// It takes an io.Reader providing the source, which then can be tokenized
// through repeated calls to the Scan function. For compatibility with
// existing tools, the NUL character is not allowed. If the first character
// in the source is a UTF-8 encoded byte order mark (BOM), it is discarded.
//
// By default, a Scanner skips white space and Go comments and recognizes all
// literals as defined by the Go language specification. It may be
// customized to recognize only a subset of those literals and to recognize
// different white space characters.
//
// Basic usage pattern:
//
// var s scanner.Scanner
// s.Init(src)
// tok := s.Scan()
// for tok != scanner.EOF {
// // do something with tok
// tok = s.Scan()
// }
//
package scanner
import (
"bytes"
"fmt"
"io"
"os"
"unicode/utf8"
)
// TODO(gri): Consider changing this to use the new (token) Position package.
// A source position is represented by a Position value.
// A position is valid if Line > 0.
type Position struct {
Filename string // filename, if any
Offset int // byte offset, starting at 0
Line int // line number, starting at 1
Column int // column number, starting at 1 (character count per line)
}
// IsValid returns true if the position is valid.
func (pos *Position) IsValid() bool { return pos.Line > 0 }
func (pos Position) String() string {
s := pos.Filename
if pos.IsValid() {
if s != "" {
s += ":"
}
s += fmt.Sprintf("%d:%d", pos.Line, pos.Column)
}
if s == "" {
s = "???"
}
return s
}
// Predefined mode bits to control recognition of tokens. For instance,
// to configure a Scanner such that it only recognizes (Go) identifiers,
// integers, and skips comments, set the Scanner's Mode field to:
//
// ScanIdents | ScanInts | SkipComments
//
const (
ScanIdents = 1 << -Ident
ScanInts = 1 << -Int
ScanFloats = 1 << -Float // includes Ints
ScanChars = 1 << -Char
ScanStrings = 1 << -String
ScanRawStrings = 1 << -RawString
ScanComments = 1 << -Comment
SkipComments = 1 << -skipComment // if set with ScanComments, comments become white space
GoTokens = ScanIdents | ScanFloats | ScanChars | ScanStrings | ScanRawStrings | ScanComments | SkipComments
)
// The result of Scan is one of the following tokens or a Unicode character.
const (
EOF = -(iota + 1)
Ident
Int
Float
Char
String
RawString
Comment
skipComment
)
var tokenString = map[rune]string{
EOF: "EOF",
Ident: "Ident",
Int: "Int",
Float: "Float",
Char: "Char",
String: "String",
RawString: "RawString",
Comment: "Comment",
}
// TokenString returns a printable string for a token or Unicode character.
func TokenString(tok rune) string {
if s, found := tokenString[tok]; found {
return s
}
return fmt.Sprintf("%q", string(tok))
}
// GoWhitespace is the default value for the Scanner's Whitespace field.
// Its value selects Go's white space characters.
const GoWhitespace = 1<<'\t' | 1<<'\n' | 1<<'\r' | 1<<' '
const bufLen = 1024 // at least utf8.UTFMax
// A Scanner implements reading of Unicode characters and tokens from an io.Reader.
type Scanner struct {
// Input
src io.Reader
// Source buffer
srcBuf [bufLen + 1]byte // +1 for sentinel for common case of s.next()
srcPos int // reading position (srcBuf index)
srcEnd int // source end (srcBuf index)
// Source position
srcBufOffset int // byte offset of srcBuf[0] in source
line int // line count
column int // character count
lastLineLen int // length of last line in characters (for correct column reporting)
lastCharLen int // length of last character in bytes
// Token text buffer
// Typically, token text is stored completely in srcBuf, but in general
// the token text's head may be buffered in tokBuf while the token text's
// tail is stored in srcBuf.
tokBuf bytes.Buffer // token text head that is not in srcBuf anymore
tokPos int // token text tail position (srcBuf index); valid if >= 0
tokEnd int // token text tail end (srcBuf index)
// One character look-ahead
ch rune // character before current srcPos
// Error is called for each error encountered. If no Error
// function is set, the error is reported to os.Stderr.
Error func(s *Scanner, msg string)
// ErrorCount is incremented by one for each error encountered.
ErrorCount int
// The Mode field controls which tokens are recognized. For instance,
// to recognize Ints, set the ScanInts bit in Mode. The field may be
// changed at any time.
Mode uint
// The Whitespace field controls which characters are recognized
// as white space. To recognize a character ch <= ' ' as white space,
// set the ch'th bit in Whitespace (the Scanner's behavior is undefined
// for values ch > ' '). The field may be changed at any time.
Whitespace uint64
// Start position of most recently scanned token; set by Scan.
// Calling Init or Next invalidates the position (Line == 0).
// The Filename field is always left untouched by the Scanner.
// If an error is reported (via Error) and Position is invalid,
// the scanner is not inside a token. Call Pos to obtain an error
// position in that case.
Position
}
// Init initializes a Scanner with a new source and returns s.
// Error is set to nil, ErrorCount is set to 0, Mode is set to GoTokens,
// and Whitespace is set to GoWhitespace.
func (s *Scanner) Init(src io.Reader) *Scanner {
s.src = src
// initialize source buffer
// (the first call to next() will fill it by calling src.Read)
s.srcBuf[0] = utf8.RuneSelf // sentinel
s.srcPos = 0
s.srcEnd = 0
// initialize source position
s.srcBufOffset = 0
s.line = 1
s.column = 0
s.lastLineLen = 0
s.lastCharLen = 0
// initialize token text buffer
// (required for first call to next()).
s.tokPos = -1
// initialize one character look-ahead
s.ch = -1 // no char read yet
// initialize public fields
s.Error = nil
s.ErrorCount = 0
s.Mode = GoTokens
s.Whitespace = GoWhitespace
s.Line = 0 // invalidate token position
return s
}
// next reads and returns the next Unicode character. It is designed such
// that only a minimal amount of work needs to be done in the common ASCII
// case (one test to check for both ASCII and end-of-buffer, and one test
// to check for newlines).
func (s *Scanner) next() rune {
ch, width := rune(s.srcBuf[s.srcPos]), 1
if ch >= utf8.RuneSelf {
// uncommon case: not ASCII or not enough bytes
for s.srcPos+utf8.UTFMax > s.srcEnd && !utf8.FullRune(s.srcBuf[s.srcPos:s.srcEnd]) {
// not enough bytes: read some more, but first
// save away token text if any
if s.tokPos >= 0 {
s.tokBuf.Write(s.srcBuf[s.tokPos:s.srcPos])
s.tokPos = 0
// s.tokEnd is set by Scan()
}
// move unread bytes to beginning of buffer
copy(s.srcBuf[0:], s.srcBuf[s.srcPos:s.srcEnd])
s.srcBufOffset += s.srcPos
// read more bytes
// (an io.Reader must return io.EOF when it reaches
// the end of what it is reading - simply returning
// n == 0 will make this loop retry forever; but the
// error is in the reader implementation in that case)
i := s.srcEnd - s.srcPos
n, err := s.src.Read(s.srcBuf[i:bufLen])
s.srcPos = 0
s.srcEnd = i + n
s.srcBuf[s.srcEnd] = utf8.RuneSelf // sentinel
if err != nil {
if s.srcEnd == 0 {
if s.lastCharLen > 0 {
// previous character was not EOF
s.column++
}
s.lastCharLen = 0
return EOF
}
if err != io.EOF {
s.error(err.Error())
}
// If err == EOF, we won't be getting more
// bytes; break to avoid infinite loop. If
// err is something else, we don't know if
// we can get more bytes; thus also break.
break
}
}
// at least one byte
ch = rune(s.srcBuf[s.srcPos])
if ch >= utf8.RuneSelf {
// uncommon case: not ASCII
ch, width = utf8.DecodeRune(s.srcBuf[s.srcPos:s.srcEnd])
if ch == utf8.RuneError && width == 1 {
// advance for correct error position
s.srcPos += width
s.lastCharLen = width
s.column++
s.error("illegal UTF-8 encoding")
return ch
}
}
}
// advance
s.srcPos += width
s.lastCharLen = width
s.column++
// special situations
switch ch {
case 0:
// for compatibility with other tools
s.error("illegal character NUL")
case '\n':
s.line++
s.lastLineLen = s.column
s.column = 0
}
return ch
}
// Next reads and returns the next Unicode character.
// It returns EOF at the end of the source. It reports
// a read error by calling s.Error, if not nil; otherwise
// it prints an error message to os.Stderr. Next does not
// update the Scanner's Position field; use Pos() to
// get the current position.
func (s *Scanner) Next() rune {
s.tokPos = -1 // don't collect token text
s.Line = 0 // invalidate token position
ch := s.Peek()
s.ch = s.next()
return ch
}
// Peek returns the next Unicode character in the source without advancing
// the scanner. It returns EOF if the scanner's position is at the last
// character of the source.
func (s *Scanner) Peek() rune {
if s.ch < 0 {
// this code is only run for the very first character
s.ch = s.next()
if s.ch == '\uFEFF' {
s.ch = s.next() // ignore BOM
}
}
return s.ch
}
func (s *Scanner) error(msg string) {
s.ErrorCount++
if s.Error != nil {
s.Error(s, msg)
return
}
pos := s.Position
if !pos.IsValid() {
pos = s.Pos()
}
fmt.Fprintf(os.Stderr, "%s: %s\n", pos, msg)
}
func (s *Scanner) scanIdentifier() rune {
ch := s.next() // read character after first '_' or letter
for detectIdent(ch) {
ch = s.next()
}
return ch
}
func digitVal(ch rune) int {
switch {
case '0' <= ch && ch <= '9':
return int(ch - '0')
case 'a' <= ch && ch <= 'f':
return int(ch - 'a' + 10)
case 'A' <= ch && ch <= 'F':
return int(ch - 'A' + 10)
}
return 16 // larger than any legal digit val
}
func isDecimal(ch rune) bool { return '0' <= ch && ch <= '9' }
func (s *Scanner) scanMantissa(ch rune) rune {
for isDecimal(ch) {
ch = s.next()
}
return ch
}
func (s *Scanner) scanFraction(ch rune) rune {
if ch == '.' {
ch = s.scanMantissa(s.next())
}
return ch
}
func (s *Scanner) scanExponent(ch rune) rune {
if ch == 'e' || ch == 'E' {
ch = s.next()
if ch == '-' || ch == '+' {
ch = s.next()
}
ch = s.scanMantissa(ch)
}
return ch
}
func (s *Scanner) scanNumber(ch rune) (rune, rune) {
// isDecimal(ch)
if ch == '0' {
// int or float
ch = s.next()
if ch == 'x' || ch == 'X' {
// hexadecimal int
ch = s.next()
hasMantissa := false
for digitVal(ch) < 16 {
ch = s.next()
hasMantissa = true
}
if !hasMantissa {
s.error("illegal hexadecimal number")
}
} else {
// octal int or float
has8or9 := false
for isDecimal(ch) {
if ch > '7' {
has8or9 = true
}
ch = s.next()
}
if s.Mode&ScanFloats != 0 && (ch == '.' || ch == 'e' || ch == 'E') {
// float
ch = s.scanFraction(ch)
ch = s.scanExponent(ch)
return Float, ch
}
// octal int
if has8or9 {
s.error("illegal octal number")
}
}
return Int, ch
}
// decimal int or float
ch = s.scanMantissa(ch)
if s.Mode&ScanFloats != 0 && (ch == '.' || ch == 'e' || ch == 'E') {
// float
ch = s.scanFraction(ch)
ch = s.scanExponent(ch)
return Float, ch
}
return Int, ch
}
func (s *Scanner) scanDigits(ch rune, base, n int) rune {
for n > 0 && digitVal(ch) < base {
ch = s.next()
n--
}
if n > 0 {
s.error("illegal char escape")
}
return ch
}
func (s *Scanner) scanEscape(quote rune) rune {
ch := s.next() // read character after '/'
switch ch {
case 'a', 'b', 'f', 'n', 'r', 't', 'v', '\\', quote:
// nothing to do
ch = s.next()
case '0', '1', '2', '3', '4', '5', '6', '7':
ch = s.scanDigits(ch, 8, 3)
case 'x':
ch = s.scanDigits(s.next(), 16, 2)
case 'u':
ch = s.scanDigits(s.next(), 16, 4)
case 'U':
ch = s.scanDigits(s.next(), 16, 8)
default:
s.error("illegal char escape")
}
return ch
}
func (s *Scanner) scanString(quote rune) (n int) {
ch := s.next() // read character after quote
for ch != quote {
if ch == '\n' || ch < 0 {
s.error("literal not terminated")
return
}
if ch == '\\' {
ch = s.scanEscape(quote)
} else {
ch = s.next()
}
n++
}
return
}
func (s *Scanner) scanRawString() {
ch := s.next() // read character after '`'
for ch != '`' {
if ch < 0 {
s.error("literal not terminated")
return
}
ch = s.next()
}
}
func (s *Scanner) scanChar() {
if s.scanString('\'') != 1 {
s.error("illegal char literal")
}
}
func (s *Scanner) scanComment(ch rune) rune {
// ch == '/' || ch == '*'
if ch == '/' {
// line comment
ch = s.next() // read character after "//"
for ch != '\n' && ch >= 0 {
ch = s.next()
}
return ch
}
// general comment
ch = s.next() // read character after "/*"
for {
if ch < 0 {
s.error("comment not terminated")
break
}
ch0 := ch
ch = s.next()
if ch0 == '*' && ch == '/' {
ch = s.next()
break
}
}
return ch
}
// Scan reads the next token or Unicode character from source and returns it.
// It only recognizes tokens t for which the respective Mode bit (1<<-t) is set.
// It returns EOF at the end of the source. It reports scanner errors (read and
// token errors) by calling s.Error, if not nil; otherwise it prints an error
// message to os.Stderr.
func (s *Scanner) Scan() rune {
ch := s.Peek()
// reset token text position
s.tokPos = -1
s.Line = 0
redo:
// skip white space
for s.Whitespace&(1<<uint(ch)) != 0 {
ch = s.next()
}
// start collecting token text
s.tokBuf.Reset()
s.tokPos = s.srcPos - s.lastCharLen
// set token position
// (this is a slightly optimized version of the code in Pos())
s.Offset = s.srcBufOffset + s.tokPos
if s.column > 0 {
// common case: last character was not a '\n'
s.Line = s.line
s.Column = s.column
} else {
// last character was a '\n'
// (we cannot be at the beginning of the source
// since we have called next() at least once)
s.Line = s.line - 1
s.Column = s.lastLineLen
}
// determine token value
tok := ch
switch {
case detectIdent(ch):
if s.Mode&ScanIdents != 0 {
tok = Ident
ch = s.scanIdentifier()
} else {
ch = s.next()
}
case isDecimal(ch):
if s.Mode&(ScanInts|ScanFloats) != 0 {
tok, ch = s.scanNumber(ch)
} else {
ch = s.next()
}
default:
switch ch {
case '"':
if s.Mode&ScanStrings != 0 {
s.scanString('"')
tok = String
}
ch = s.next()
case '\'':
if s.Mode&ScanChars != 0 {
s.scanChar()
tok = Char
}
ch = s.next()
case '.':
ch = s.next()
if isDecimal(ch) && s.Mode&ScanFloats != 0 {
tok = Float
ch = s.scanMantissa(ch)
ch = s.scanExponent(ch)
}
case '/':
ch = s.next()
if (ch == '/' || ch == '*') && s.Mode&ScanComments != 0 {
if s.Mode&SkipComments != 0 {
s.tokPos = -1 // don't collect token text
ch = s.scanComment(ch)
goto redo
}
ch = s.scanComment(ch)
tok = Comment
}
case '`':
if s.Mode&ScanRawStrings != 0 {
s.scanRawString()
tok = String
}
ch = s.next()
default:
ch = s.next()
}
}
// end of token text
s.tokEnd = s.srcPos - s.lastCharLen
s.ch = ch
return tok
}
// Pos returns the position of the character immediately after
// the character or token returned by the last call to Next or Scan.
func (s *Scanner) Pos() (pos Position) {
pos.Filename = s.Filename
pos.Offset = s.srcBufOffset + s.srcPos - s.lastCharLen
switch {
case s.column > 0:
// common case: last character was not a '\n'
pos.Line = s.line
pos.Column = s.column
case s.lastLineLen > 0:
// last character was a '\n'
pos.Line = s.line - 1
pos.Column = s.lastLineLen
default:
// at the beginning of the source
pos.Line = 1
pos.Column = 1
}
return
}
// TokenText returns the string corresponding to the most recently scanned token.
// Valid after calling Scan().
func (s *Scanner) TokenText() string {
if s.tokPos < 0 {
// no token text
return ""
}
if s.tokEnd < 0 {
// if EOF was reached, s.tokEnd is set to -1 (s.srcPos == 0)
s.tokEnd = s.tokPos
}
if s.tokBuf.Len() == 0 {
// common case: the entire token text is still in srcBuf
return string(s.srcBuf[s.tokPos:s.tokEnd])
}
// part of the token text was saved in tokBuf: save the rest in
// tokBuf as well and return its content
s.tokBuf.Write(s.srcBuf[s.tokPos:s.tokEnd])
s.tokPos = s.tokEnd // ensure idempotency of TokenText() call
return s.tokBuf.String()
}