فهرست منبع

Merge pull request #20002 from twistlock/19575_authz_plugin_support_events

Fix 19575: Docker events doesn't work with authorization plugin
Alexander Morozov 9 سال پیش
والد
کامیت
862f073694
4فایلهای تغییر یافته به همراه146 افزوده شده و 9 حذف شده
  1. 70 0
      integration-cli/docker_cli_authz_unix_test.go
  2. 1 1
      pkg/authorization/authz.go
  3. 2 2
      pkg/authorization/authz_test.go
  4. 73 6
      pkg/authorization/response.go

+ 70 - 0
integration-cli/docker_cli_authz_unix_test.go

@@ -11,10 +11,15 @@ import (
 	"os"
 	"strings"
 
+	"bufio"
+	"bytes"
 	"github.com/docker/docker/pkg/authorization"
 	"github.com/docker/docker/pkg/integration/checker"
 	"github.com/docker/docker/pkg/plugins"
 	"github.com/go-check/check"
+	"os/exec"
+	"strconv"
+	"time"
 )
 
 const (
@@ -221,6 +226,71 @@ func (s *DockerAuthzSuite) TestAuthZPluginDenyResponse(c *check.C) {
 	c.Assert(res, check.Equals, fmt.Sprintf("Error response from daemon: authorization denied by plugin %s: %s\n", testAuthZPlugin, unauthorizedMessage))
 }
 
+// TestAuthZPluginAllowEventStream verifies event stream propogates correctly after request pass through by the authorization plugin
+func (s *DockerAuthzSuite) TestAuthZPluginAllowEventStream(c *check.C) {
+	testRequires(c, DaemonIsLinux)
+
+	// Start the authorization plugin
+	err := s.d.Start("--authorization-plugin=" + testAuthZPlugin)
+	c.Assert(err, check.IsNil)
+	s.ctrl.reqRes.Allow = true
+	s.ctrl.resRes.Allow = true
+
+	startTime := strconv.FormatInt(daemonTime(c).Unix(), 10)
+	// Add another command to to enable event pipelining
+	eventsCmd := exec.Command(s.d.cmd.Path, "--host", s.d.sock(), "events", "--since", startTime)
+	stdout, err := eventsCmd.StdoutPipe()
+	if err != nil {
+		c.Assert(err, check.IsNil)
+	}
+
+	observer := eventObserver{
+		buffer:    new(bytes.Buffer),
+		command:   eventsCmd,
+		scanner:   bufio.NewScanner(stdout),
+		startTime: startTime,
+	}
+
+	err = observer.Start()
+	c.Assert(err, checker.IsNil)
+	defer observer.Stop()
+
+	// Create a container and wait for the creation events
+	_, err = s.d.Cmd("pull", "busybox")
+	c.Assert(err, check.IsNil)
+	out, err := s.d.Cmd("run", "-d", "busybox", "top")
+	c.Assert(err, check.IsNil)
+
+	containerID := strings.TrimSpace(out)
+
+	events := map[string]chan bool{
+		"create": make(chan bool),
+		"start":  make(chan bool),
+	}
+
+	matcher := matchEventLine(containerID, "container", events)
+	processor := processEventMatch(events)
+	go observer.Match(matcher, processor)
+
+	// Ensure all events are received
+	for event, eventChannel := range events {
+
+		select {
+		case <-time.After(5 * time.Second):
+			// Fail the test
+			observer.CheckEventError(c, containerID, event, matcher)
+			c.FailNow()
+		case <-eventChannel:
+			// Ignore, event received
+		}
+	}
+
+	// Ensure both events and container endpoints are passed to the authorization plugin
+	assertURIRecorded(c, s.ctrl.requestsURIs, "/events")
+	assertURIRecorded(c, s.ctrl.requestsURIs, "/containers/create")
+	assertURIRecorded(c, s.ctrl.requestsURIs, fmt.Sprintf("/containers/%s/start", containerID))
+}
+
 func (s *DockerAuthzSuite) TestAuthZPluginErrorResponse(c *check.C) {
 	err := s.d.Start("--authorization-plugin=" + testAuthZPlugin)
 	c.Assert(err, check.IsNil)

+ 1 - 1
pkg/authorization/authz.go

@@ -116,7 +116,7 @@ func (ctx *Ctx) AuthZResponse(rm ResponseModifier, r *http.Request) error {
 		}
 	}
 
-	rm.Flush()
+	rm.FlushAll()
 
 	return nil
 }

+ 2 - 2
pkg/authorization/authz_test.go

@@ -118,7 +118,7 @@ func TestResponseModifier(t *testing.T) {
 	m.Write([]byte("body"))
 	m.WriteHeader(500)
 
-	m.Flush()
+	m.FlushAll()
 	if r.Header().Get("h1") != "v1" {
 		t.Fatalf("Header value must exists %s", r.Header().Get("h1"))
 	}
@@ -147,7 +147,7 @@ func TestResponseModifierOverride(t *testing.T) {
 	m.OverrideHeader(overrideHeaderBytes)
 	m.OverrideBody([]byte("override body"))
 	m.OverrideStatusCode(404)
-	m.Flush()
+	m.FlushAll()
 	if r.Header().Get("h1") != "v2" {
 		t.Fatalf("Header value must exists %s", r.Header().Get("h1"))
 	}

+ 73 - 6
pkg/authorization/response.go

@@ -5,6 +5,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/Sirupsen/logrus"
 	"net"
 	"net/http"
 )
@@ -12,6 +13,8 @@ import (
 // ResponseModifier allows authorization plugins to read and modify the content of the http.response
 type ResponseModifier interface {
 	http.ResponseWriter
+	http.Flusher
+	http.CloseNotifier
 
 	// RawBody returns the current http content
 	RawBody() []byte
@@ -32,7 +35,10 @@ type ResponseModifier interface {
 	OverrideStatusCode(statusCode int)
 
 	// Flush flushes all data to the HTTP response
-	Flush() error
+	FlushAll() error
+
+	// Hijacked indicates the response has been hijacked by the Docker daemon
+	Hijacked() bool
 }
 
 // NewResponseModifier creates a wrapper to an http.ResponseWriter to allow inspecting and modifying the content
@@ -44,7 +50,10 @@ func NewResponseModifier(rw http.ResponseWriter) ResponseModifier {
 // the http request/response from docker daemon
 type responseModifier struct {
 	// The original response writer
-	rw     http.ResponseWriter
+	rw http.ResponseWriter
+
+	r *http.Request
+
 	status int
 	// body holds the response body
 	body []byte
@@ -52,15 +61,34 @@ type responseModifier struct {
 	header http.Header
 	// statusCode holds the response status code
 	statusCode int
+	// hijacked indicates the request has been hijacked
+	hijacked bool
+}
+
+func (rm *responseModifier) Hijacked() bool {
+	return rm.hijacked
 }
 
 // WriterHeader stores the http status code
 func (rm *responseModifier) WriteHeader(s int) {
+
+	// Use original request if hijacked
+	if rm.hijacked {
+		rm.rw.WriteHeader(s)
+		return
+	}
+
 	rm.statusCode = s
 }
 
 // Header returns the internal http header
 func (rm *responseModifier) Header() http.Header {
+
+	// Use original header if hijacked
+	if rm.hijacked {
+		return rm.rw.Header()
+	}
+
 	return rm.header
 }
 
@@ -90,6 +118,11 @@ func (rm *responseModifier) OverrideHeader(b []byte) error {
 
 // Write stores the byte array inside content
 func (rm *responseModifier) Write(b []byte) (int, error) {
+
+	if rm.hijacked {
+		return rm.rw.Write(b)
+	}
+
 	rm.body = append(rm.body, b...)
 	return len(b), nil
 }
@@ -109,6 +142,10 @@ func (rm *responseModifier) RawHeaders() ([]byte, error) {
 
 // Hijack returns the internal connection of the wrapped http.ResponseWriter
 func (rm *responseModifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {
+
+	rm.hijacked = true
+	rm.FlushAll()
+
 	hijacker, ok := rm.rw.(http.Hijacker)
 	if !ok {
 		return nil, nil, fmt.Errorf("Internal reponse writer doesn't support the Hijacker interface")
@@ -116,8 +153,30 @@ func (rm *responseModifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {
 	return hijacker.Hijack()
 }
 
-// Flush flushes all data to the HTTP response
-func (rm *responseModifier) Flush() error {
+// CloseNotify uses the internal close notify API of the wrapped http.ResponseWriter
+func (rm *responseModifier) CloseNotify() <-chan bool {
+	closeNotifier, ok := rm.rw.(http.CloseNotifier)
+	if !ok {
+		logrus.Errorf("Internal reponse writer doesn't support the CloseNotifier interface")
+		return nil
+	}
+	return closeNotifier.CloseNotify()
+}
+
+// Flush uses the internal flush API of the wrapped http.ResponseWriter
+func (rm *responseModifier) Flush() {
+	flusher, ok := rm.rw.(http.Flusher)
+	if !ok {
+		logrus.Errorf("Internal reponse writer doesn't support the Flusher interface")
+		return
+	}
+
+	rm.FlushAll()
+	flusher.Flush()
+}
+
+// FlushAll flushes all data to the HTTP response
+func (rm *responseModifier) FlushAll() error {
 	// Copy the status code
 	if rm.statusCode > 0 {
 		rm.rw.WriteHeader(rm.statusCode)
@@ -130,7 +189,15 @@ func (rm *responseModifier) Flush() error {
 		}
 	}
 
-	// Write body
-	_, err := rm.rw.Write(rm.body)
+	var err error
+	if len(rm.body) > 0 {
+		// Write body
+		_, err = rm.rw.Write(rm.body)
+	}
+
+	// Clean previous data
+	rm.body = nil
+	rm.statusCode = 0
+	rm.header = http.Header{}
 	return err
 }