Prechádzať zdrojové kódy

Merge pull request #5716 from shykes/pr_out_engine_receiver_and_sender_preserve_job_env

Solomon Hykes 11 rokov pred
rodič
commit
5877ae2462

+ 24 - 0
engine/env.go

@@ -250,3 +250,27 @@ func (env *Env) Map() map[string]string {
 	}
 	}
 	return m
 	return m
 }
 }
+
+// MultiMap returns a representation of env as a
+// map of string arrays, keyed by string.
+// This is the same structure as http headers for example,
+// which allow each key to have multiple values.
+func (env *Env) MultiMap() map[string][]string {
+	m := make(map[string][]string)
+	for _, kv := range *env {
+		parts := strings.SplitN(kv, "=", 2)
+		m[parts[0]] = append(m[parts[0]], parts[1])
+	}
+	return m
+}
+
+// InitMultiMap removes all values in env, then initializes
+// new values from the contents of m.
+func (env *Env) InitMultiMap(m map[string][]string) {
+	(*env) = make([]string, 0, len(m))
+	for k, vals := range m {
+		for _, v := range vals {
+			env.Set(k, v)
+		}
+	}
+}

+ 20 - 0
engine/env_test.go

@@ -123,3 +123,23 @@ func TestEnviron(t *testing.T) {
 		t.Fatalf("bar not found in the environ")
 		t.Fatalf("bar not found in the environ")
 	}
 	}
 }
 }
+
+func TestMultiMap(t *testing.T) {
+	e := &Env{}
+	e.Set("foo", "bar")
+	e.Set("bar", "baz")
+	e.Set("hello", "world")
+	m := e.MultiMap()
+	e2 := &Env{}
+	e2.Set("old_key", "something something something")
+	e2.InitMultiMap(m)
+	if v := e2.Get("old_key"); v != "" {
+		t.Fatalf("%#v", v)
+	}
+	if v := e2.Get("bar"); v != "baz" {
+		t.Fatalf("%#v", v)
+	}
+	if v := e2.Get("hello"); v != "world" {
+		t.Fatalf("%#v", v)
+	}
+}

+ 11 - 2
engine/remote.go

@@ -25,7 +25,9 @@ func (s *Sender) Install(eng *Engine) error {
 }
 }
 
 
 func (s *Sender) Handle(job *Job) Status {
 func (s *Sender) Handle(job *Job) Status {
-	msg := data.Empty().Set("cmd", append([]string{job.Name}, job.Args...)...)
+	cmd := append([]string{job.Name}, job.Args...)
+	env := data.Encode(job.Env().MultiMap())
+	msg := data.Empty().Set("cmd", cmd...).Set("env", env)
 	peer, err := beam.SendConn(s, msg.Bytes())
 	peer, err := beam.SendConn(s, msg.Bytes())
 	if err != nil {
 	if err != nil {
 		return job.Errorf("beamsend: %v", err)
 		return job.Errorf("beamsend: %v", err)
@@ -99,8 +101,15 @@ func (rcv *Receiver) Run() error {
 		}
 		}
 		f.Close()
 		f.Close()
 		defer peer.Close()
 		defer peer.Close()
-		cmd := data.Message(p).Get("cmd")
+		msg := data.Message(p)
+		cmd := msg.Get("cmd")
 		job := rcv.Engine.Job(cmd[0], cmd[1:]...)
 		job := rcv.Engine.Job(cmd[0], cmd[1:]...)
+		// Decode env
+		env, err := data.Decode(msg.GetOne("env"))
+		if err != nil {
+			return fmt.Errorf("error decoding 'env': %v", err)
+		}
+		job.Env().InitMultiMap(env)
 		stdout, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes())
 		stdout, err := beam.SendRPipe(peer, data.Empty().Set("cmd", "log", "stdout").Bytes())
 		if err != nil {
 		if err != nil {
 			return err
 			return err

+ 41 - 0
engine/remote_test.go

@@ -82,6 +82,47 @@ func TestStdin(t *testing.T) {
 	)
 	)
 }
 }
 
 
+func TestEnv(t *testing.T) {
+	var (
+		foo          string
+		answer       int
+		shadok_words []string
+	)
+	testRemote(t,
+
+		func(eng *Engine) {
+			job := eng.Job("sendenv")
+			job.Env().Set("foo", "bar")
+			job.Env().SetInt("answer", 42)
+			job.Env().SetList("shadok_words", []string{"ga", "bu", "zo", "meu"})
+			if err := job.Run(); err != nil {
+				t.Fatal(err)
+			}
+		},
+
+		func(eng *Engine) {
+			eng.Register("sendenv", func(job *Job) Status {
+				foo = job.Env().Get("foo")
+				answer = job.Env().GetInt("answer")
+				shadok_words = job.Env().GetList("shadok_words")
+				return StatusOK
+			})
+		},
+	)
+	// Check for results here rather than inside the job handler,
+	// otherwise the tests may incorrectly pass if the handler is not
+	// called.
+	if foo != "bar" {
+		t.Fatalf("%#v", foo)
+	}
+	if answer != 42 {
+		t.Fatalf("%#v", answer)
+	}
+	if strings.Join(shadok_words, ", ") != "ga, bu, zo, meu" {
+		t.Fatalf("%#v", shadok_words)
+	}
+}
+
 // Helpers
 // Helpers
 
 
 func testRemote(t *testing.T, senderSide, receiverSide func(*Engine)) {
 func testRemote(t *testing.T, senderSide, receiverSide func(*Engine)) {

+ 10 - 0
pkg/beam/data/message.go

@@ -72,6 +72,16 @@ func (m Message) Get(k string) []string {
 	return v
 	return v
 }
 }
 
 
+// GetOne returns the last value added at the key k,
+// or an empty string if there is no value.
+func (m Message) GetOne(k string) string {
+	var v string
+	if vals := m.Get(k); len(vals) > 0 {
+		v = vals[len(vals)-1]
+	}
+	return v
+}
+
 func (m Message) Pretty() string {
 func (m Message) Pretty() string {
 	data, err := Decode(string(m))
 	data, err := Decode(string(m))
 	if err != nil {
 	if err != nil {

+ 8 - 0
pkg/beam/data/message_test.go

@@ -51,3 +51,11 @@ func TestSetDelMessage(t *testing.T) {
 		t.Fatalf("'%v' != '%v'", output, expectedOutput)
 		t.Fatalf("'%v' != '%v'", output, expectedOutput)
 	}
 	}
 }
 }
+
+func TestGetOne(t *testing.T) {
+	m := Empty().Set("shadok words", "ga", "bu", "zo", "meu")
+	val := m.GetOne("shadok words")
+	if val != "meu" {
+		t.Fatalf("%#v", val)
+	}
+}