Browse Source

Timestamps for docker logs.

Fixes #1165
Docker-DCO-1.1-Signed-off-by: Alexandr Morozov <lk4d4math@gmail.com> (github: LK4D4)
Alexandr Morozov 11 năm trước cách đây
mục cha
commit
d1297feef8

+ 6 - 3
api/client/commands.go

@@ -1583,6 +1583,7 @@ func (cli *DockerCli) CmdDiff(args ...string) error {
 func (cli *DockerCli) CmdLogs(args ...string) error {
 func (cli *DockerCli) CmdLogs(args ...string) error {
 	cmd := cli.Subcmd("logs", "CONTAINER", "Fetch the logs of a container")
 	cmd := cli.Subcmd("logs", "CONTAINER", "Fetch the logs of a container")
 	follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
 	follow := cmd.Bool([]string{"f", "-follow"}, false, "Follow log output")
+	times := cmd.Bool([]string{"t", "-timestamps"}, false, "Show timestamps")
 	if err := cmd.Parse(args); err != nil {
 	if err := cmd.Parse(args); err != nil {
 		return nil
 		return nil
 	}
 	}
@@ -1603,14 +1604,16 @@ func (cli *DockerCli) CmdLogs(args ...string) error {
 	}
 	}
 
 
 	v := url.Values{}
 	v := url.Values{}
-	v.Set("logs", "1")
 	v.Set("stdout", "1")
 	v.Set("stdout", "1")
 	v.Set("stderr", "1")
 	v.Set("stderr", "1")
+	if *times {
+		v.Set("timestamps", "1")
+	}
 	if *follow && container.State.Running {
 	if *follow && container.State.Running {
-		v.Set("stream", "1")
+		v.Set("follow", "1")
 	}
 	}
 
 
-	if err := cli.hijack("POST", "/containers/"+name+"/attach?"+v.Encode(), container.Config.Tty, nil, cli.out, cli.err, nil); err != nil {
+	if err := cli.streamHelper("GET", "/containers/"+name+"/logs?"+v.Encode(), container.Config.Tty, nil, cli.out, cli.err, nil); err != nil {
 		return err
 		return err
 	}
 	}
 	return nil
 	return nil

+ 13 - 2
api/client/utils.go

@@ -130,6 +130,10 @@ func (cli *DockerCli) call(method, path string, data interface{}, passAuthInfo b
 }
 }
 
 
 func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer, headers map[string][]string) error {
 func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer, headers map[string][]string) error {
+	return cli.streamHelper(method, path, true, in, out, nil, headers)
+}
+
+func (cli *DockerCli) streamHelper(method, path string, setRawTerminal bool, in io.Reader, stdout, stderr io.Writer, headers map[string][]string) error {
 	if (method == "POST" || method == "PUT") && in == nil {
 	if (method == "POST" || method == "PUT") && in == nil {
 		in = bytes.NewReader([]byte{})
 		in = bytes.NewReader([]byte{})
 	}
 	}
@@ -184,9 +188,16 @@ func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer, h
 	}
 	}
 
 
 	if api.MatchesContentType(resp.Header.Get("Content-Type"), "application/json") {
 	if api.MatchesContentType(resp.Header.Get("Content-Type"), "application/json") {
-		return utils.DisplayJSONMessagesStream(resp.Body, out, cli.terminalFd, cli.isTerminal)
+		return utils.DisplayJSONMessagesStream(resp.Body, stdout, cli.terminalFd, cli.isTerminal)
 	}
 	}
-	if _, err := io.Copy(out, resp.Body); err != nil {
+	if stdout != nil || stderr != nil {
+		// When TTY is ON, use regular copy
+		if setRawTerminal {
+			_, err = io.Copy(stdout, resp.Body)
+		} else {
+			_, err = utils.StdCopy(stdout, stderr, resp.Body)
+		}
+		utils.Debugf("[stream] End of stdout")
 		return err
 		return err
 	}
 	}
 	return nil
 	return nil

+ 43 - 0
api/server/server.go

@@ -328,6 +328,48 @@ func getContainersJSON(eng *engine.Engine, version version.Version, w http.Respo
 	return nil
 	return nil
 }
 }
 
 
+func getContainersLogs(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
+	if err := parseForm(r); err != nil {
+		return err
+	}
+	if vars == nil {
+		return fmt.Errorf("Missing parameter")
+	}
+
+	var (
+		job    = eng.Job("inspect", vars["name"], "container")
+		c, err = job.Stdout.AddEnv()
+	)
+	if err != nil {
+		return err
+	}
+	if err = job.Run(); err != nil {
+		return err
+	}
+
+	var outStream, errStream io.Writer
+	outStream = utils.NewWriteFlusher(w)
+
+	if c.GetSubEnv("Config") != nil && !c.GetSubEnv("Config").GetBool("Tty") && version.GreaterThanOrEqualTo("1.6") {
+		errStream = utils.NewStdWriter(outStream, utils.Stderr)
+		outStream = utils.NewStdWriter(outStream, utils.Stdout)
+	} else {
+		errStream = outStream
+	}
+
+	job = eng.Job("logs", vars["name"])
+	job.Setenv("follow", r.Form.Get("follow"))
+	job.Setenv("stdout", r.Form.Get("stdout"))
+	job.Setenv("stderr", r.Form.Get("stderr"))
+	job.Setenv("timestamps", r.Form.Get("timestamps"))
+	job.Stdout.Add(outStream)
+	job.Stderr.Set(errStream)
+	if err := job.Run(); err != nil {
+		fmt.Fprintf(outStream, "Error: %s\n", err)
+	}
+	return nil
+}
+
 func postImagesTag(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 func postImagesTag(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
 	if err := parseForm(r); err != nil {
 	if err := parseForm(r); err != nil {
 		return err
 		return err
@@ -1017,6 +1059,7 @@ func createRouter(eng *engine.Engine, logging, enableCors bool, dockerVersion st
 			"/containers/{name:.*}/changes":   getContainersChanges,
 			"/containers/{name:.*}/changes":   getContainersChanges,
 			"/containers/{name:.*}/json":      getContainersByName,
 			"/containers/{name:.*}/json":      getContainersByName,
 			"/containers/{name:.*}/top":       getContainersTop,
 			"/containers/{name:.*}/top":       getContainersTop,
+			"/containers/{name:.*}/logs":      getContainersLogs,
 			"/containers/{name:.*}/attach/ws": wsContainersAttach,
 			"/containers/{name:.*}/attach/ws": wsContainersAttach,
 		},
 		},
 		"POST": {
 		"POST": {

+ 12 - 0
daemon/container.go

@@ -473,6 +473,18 @@ func (container *Container) StderrPipe() (io.ReadCloser, error) {
 	return utils.NewBufReader(reader), nil
 	return utils.NewBufReader(reader), nil
 }
 }
 
 
+func (container *Container) StdoutLogPipe() io.ReadCloser {
+	reader, writer := io.Pipe()
+	container.stdout.AddWriter(writer, "stdout")
+	return utils.NewBufReader(reader)
+}
+
+func (container *Container) StderrLogPipe() io.ReadCloser {
+	reader, writer := io.Pipe()
+	container.stderr.AddWriter(writer, "stderr")
+	return utils.NewBufReader(reader)
+}
+
 func (container *Container) buildHostnameAndHostsFiles(IP string) {
 func (container *Container) buildHostnameAndHostsFiles(IP string) {
 	container.HostnamePath = path.Join(container.root, "hostname")
 	container.HostnamePath = path.Join(container.root, "hostname")
 	ioutil.WriteFile(container.HostnamePath, []byte(container.Config.Hostname+"\n"), 0644)
 	ioutil.WriteFile(container.HostnamePath, []byte(container.Config.Hostname+"\n"), 0644)

+ 4 - 0
docs/sources/reference/api/docker_remote_api.md

@@ -45,6 +45,10 @@ You can still call an old version of the api using
 You can now use the `-until` parameter to close connection
 You can now use the `-until` parameter to close connection
 after timestamp.
 after timestamp.
 
 
+`GET /containers/(id)/logs`
+
+This url is prefered method for getting container logs now.
+
 ### v1.10
 ### v1.10
 
 
 #### Full Documentation
 #### Full Documentation

+ 36 - 0
docs/sources/reference/api/docker_remote_api_v1.11.md

@@ -300,6 +300,42 @@ List processes running inside the container `id`
     -   **404** – no such container
     -   **404** – no such container
     -   **500** – server error
     -   **500** – server error
 
 
+### Get container logs
+
+`GET /containers/(id)/logs`
+
+Get stdout and stderr logs from the container ``id``
+
+    **Example request**:
+
+       GET /containers/4fa6e0f0c678/logs?stderr=1&stdout=1&timestamps=1&follow=1 HTTP/1.1
+
+    **Example response**:
+
+       HTTP/1.1 200 OK
+       Content-Type: application/vnd.docker.raw-stream
+
+       {{ STREAM }}
+
+    Query Parameters:
+
+     
+
+    -   **follow** – 1/True/true or 0/False/false, return stream.
+        Default false
+    -   **stdout** – 1/True/true or 0/False/false, if logs=true, return
+        stdout log. Default false
+    -   **stderr** – 1/True/true or 0/False/false, if logs=true, return
+        stderr log. Default false
+    -   **timestamps** – 1/True/true or 0/False/false, if logs=true, print
+        timestamps for every log line. Default false
+
+    Status Codes:
+
+    -   **200** – no error
+    -   **404** – no such container
+    -   **500** – server error
+
 ### Inspect changes on a container's filesystem
 ### Inspect changes on a container's filesystem
 
 
 `GET /containers/(id)/changes`
 `GET /containers/(id)/changes`

+ 4 - 3
docs/sources/reference/commandline/cli.md

@@ -649,13 +649,14 @@ Fetch the logs of a container
     Usage: docker logs [OPTIONS] CONTAINER
     Usage: docker logs [OPTIONS] CONTAINER
 
 
     -f, --follow=false: Follow log output
     -f, --follow=false: Follow log output
+    -t, --timestamps=false: Show timestamps
 
 
 The `docker logs` command batch-retrieves all logs
 The `docker logs` command batch-retrieves all logs
 present at the time of execution.
 present at the time of execution.
 
 
-The `docker logs --follow` command combines `docker logs` and `docker
-attach`: it will first return all logs from the beginning and then
-continue streaming new output from the container'sstdout and stderr.
+The ``docker logs --follow`` command will first return all logs from the
+beginning and then continue streaming new output from the container's stdout
+and stderr.
 
 
 ## port
 ## port
 
 

+ 95 - 0
integration-cli/docker_cli_logs_test.go

@@ -3,7 +3,10 @@ package main
 import (
 import (
 	"fmt"
 	"fmt"
 	"os/exec"
 	"os/exec"
+	"regexp"
+	"strings"
 	"testing"
 	"testing"
+	"time"
 )
 )
 
 
 // This used to work, it test a log of PageSize-1 (gh#4851)
 // This used to work, it test a log of PageSize-1 (gh#4851)
@@ -74,3 +77,95 @@ func TestLogsContainerMuchBiggerThanPage(t *testing.T) {
 
 
 	logDone("logs - logs container running echo much bigger than page size")
 	logDone("logs - logs container running echo much bigger than page size")
 }
 }
+
+func TestLogsTimestamps(t *testing.T) {
+	testLen := 100
+	runCmd := exec.Command(dockerBinary, "run", "-d", "busybox", "sh", "-c", fmt.Sprintf("for i in $(seq 1 %d); do echo =; done;", testLen))
+
+	out, _, _, err := runCommandWithStdoutStderr(runCmd)
+	errorOut(err, t, fmt.Sprintf("run failed with errors: %v", err))
+
+	cleanedContainerID := stripTrailingCharacters(out)
+	exec.Command(dockerBinary, "wait", cleanedContainerID).Run()
+
+	logsCmd := exec.Command(dockerBinary, "logs", "-t", cleanedContainerID)
+	out, _, _, err = runCommandWithStdoutStderr(logsCmd)
+	errorOut(err, t, fmt.Sprintf("failed to log container: %v %v", out, err))
+
+	lines := strings.Split(out, "\n")
+
+	if len(lines) != testLen+1 {
+		t.Fatalf("Expected log %d lines, received %d\n", testLen+1, len(lines))
+	}
+
+	ts := regexp.MustCompile(`^\[.*?\]`)
+
+	for _, l := range lines {
+		if l != "" {
+			_, err := time.Parse("["+time.StampMilli+"]", ts.FindString(l))
+			if err != nil {
+				t.Fatalf("Failed to parse timestamp from %v: %v", l, err)
+			}
+		}
+	}
+
+	deleteContainer(cleanedContainerID)
+
+	logDone("logs - logs with timestamps")
+}
+
+func TestLogsSeparateStderr(t *testing.T) {
+	msg := "stderr_log"
+	runCmd := exec.Command(dockerBinary, "run", "-d", "busybox", "sh", "-c", fmt.Sprintf("echo %s 1>&2", msg))
+
+	out, _, _, err := runCommandWithStdoutStderr(runCmd)
+	errorOut(err, t, fmt.Sprintf("run failed with errors: %v", err))
+
+	cleanedContainerID := stripTrailingCharacters(out)
+	exec.Command(dockerBinary, "wait", cleanedContainerID).Run()
+
+	logsCmd := exec.Command(dockerBinary, "logs", cleanedContainerID)
+	stdout, stderr, _, err := runCommandWithStdoutStderr(logsCmd)
+	errorOut(err, t, fmt.Sprintf("failed to log container: %v %v", out, err))
+
+	if stdout != "" {
+		t.Fatalf("Expected empty stdout stream, got %v", stdout)
+	}
+
+	stderr = strings.TrimSpace(stderr)
+	if stderr != msg {
+		t.Fatalf("Expected %v in stderr stream, got %v", msg, stderr)
+	}
+
+	deleteContainer(cleanedContainerID)
+
+	logDone("logs - separate stderr (without pseudo-tty)")
+}
+
+func TestLogsStderrInStdout(t *testing.T) {
+	msg := "stderr_log"
+	runCmd := exec.Command(dockerBinary, "run", "-d", "-t", "busybox", "sh", "-c", fmt.Sprintf("echo %s 1>&2", msg))
+
+	out, _, _, err := runCommandWithStdoutStderr(runCmd)
+	errorOut(err, t, fmt.Sprintf("run failed with errors: %v", err))
+
+	cleanedContainerID := stripTrailingCharacters(out)
+	exec.Command(dockerBinary, "wait", cleanedContainerID).Run()
+
+	logsCmd := exec.Command(dockerBinary, "logs", cleanedContainerID)
+	stdout, stderr, _, err := runCommandWithStdoutStderr(logsCmd)
+	errorOut(err, t, fmt.Sprintf("failed to log container: %v %v", out, err))
+
+	if stderr != "" {
+		t.Fatalf("Expected empty stderr stream, got %v", stdout)
+	}
+
+	stdout = strings.TrimSpace(stdout)
+	if stdout != msg {
+		t.Fatalf("Expected %v in stdout stream, got %v", msg, stdout)
+	}
+
+	deleteContainer(cleanedContainerID)
+
+	logDone("logs - stderr in stdout (with pseudo-tty)")
+}

+ 91 - 0
server/server.go

@@ -124,6 +124,7 @@ func InitServer(job *engine.Job) engine.Status {
 		"container_copy":   srv.ContainerCopy,
 		"container_copy":   srv.ContainerCopy,
 		"insert":           srv.ImageInsert,
 		"insert":           srv.ImageInsert,
 		"attach":           srv.ContainerAttach,
 		"attach":           srv.ContainerAttach,
+		"logs":             srv.ContainerLogs,
 		"search":           srv.ImagesSearch,
 		"search":           srv.ImagesSearch,
 		"changes":          srv.ContainerChanges,
 		"changes":          srv.ContainerChanges,
 		"top":              srv.ContainerTop,
 		"top":              srv.ContainerTop,
@@ -2252,6 +2253,96 @@ func (srv *Server) ContainerResize(job *engine.Job) engine.Status {
 	return job.Errorf("No such container: %s", name)
 	return job.Errorf("No such container: %s", name)
 }
 }
 
 
+func (srv *Server) ContainerLogs(job *engine.Job) engine.Status {
+	if len(job.Args) != 1 {
+		return job.Errorf("Usage: %s CONTAINER\n", job.Name)
+	}
+
+	var (
+		name   = job.Args[0]
+		stdout = job.GetenvBool("stdout")
+		stderr = job.GetenvBool("stderr")
+		follow = job.GetenvBool("follow")
+		times  = job.GetenvBool("timestamps")
+		format string
+	)
+	if !(stdout || stderr) {
+		return job.Errorf("You must choose at least one stream")
+	}
+	if times {
+		format = time.StampMilli
+	}
+	container := srv.daemon.Get(name)
+	if container == nil {
+		return job.Errorf("No such container: %s", name)
+	}
+	cLog, err := container.ReadLog("json")
+	if err != nil && os.IsNotExist(err) {
+		// Legacy logs
+		utils.Debugf("Old logs format")
+		if stdout {
+			cLog, err := container.ReadLog("stdout")
+			if err != nil {
+				utils.Errorf("Error reading logs (stdout): %s", err)
+			} else if _, err := io.Copy(job.Stdout, cLog); err != nil {
+				utils.Errorf("Error streaming logs (stdout): %s", err)
+			}
+		}
+		if stderr {
+			cLog, err := container.ReadLog("stderr")
+			if err != nil {
+				utils.Errorf("Error reading logs (stderr): %s", err)
+			} else if _, err := io.Copy(job.Stderr, cLog); err != nil {
+				utils.Errorf("Error streaming logs (stderr): %s", err)
+			}
+		}
+	} else if err != nil {
+		utils.Errorf("Error reading logs (json): %s", err)
+	} else {
+		dec := json.NewDecoder(cLog)
+		for {
+			l := &utils.JSONLog{}
+
+			if err := dec.Decode(l); err == io.EOF {
+				break
+			} else if err != nil {
+				utils.Errorf("Error streaming logs: %s", err)
+				break
+			}
+			logLine := l.Log
+			if times {
+				logLine = fmt.Sprintf("[%s] %s", l.Created.Format(format), logLine)
+			}
+			if l.Stream == "stdout" && stdout {
+				fmt.Fprintf(job.Stdout, "%s", logLine)
+			}
+			if l.Stream == "stderr" && stderr {
+				fmt.Fprintf(job.Stderr, "%s", logLine)
+			}
+		}
+	}
+	if follow {
+		errors := make(chan error, 2)
+		if stdout {
+			stdoutPipe := container.StdoutLogPipe()
+			go func() {
+				errors <- utils.WriteLog(stdoutPipe, job.Stdout, format)
+			}()
+		}
+		if stderr {
+			stderrPipe := container.StderrLogPipe()
+			go func() {
+				errors <- utils.WriteLog(stderrPipe, job.Stderr, format)
+			}()
+		}
+		err := <-errors
+		if err != nil {
+			utils.Errorf("%s", err)
+		}
+	}
+	return engine.StatusOK
+}
+
 func (srv *Server) ContainerAttach(job *engine.Job) engine.Status {
 func (srv *Server) ContainerAttach(job *engine.Job) engine.Status {
 	if len(job.Args) != 1 {
 	if len(job.Args) != 1 {
 		return job.Errorf("Usage: %s CONTAINER\n", job.Name)
 		return job.Errorf("Usage: %s CONTAINER\n", job.Name)

+ 82 - 30
utils/utils.go

@@ -341,18 +341,15 @@ func (r *bufReader) Close() error {
 type WriteBroadcaster struct {
 type WriteBroadcaster struct {
 	sync.Mutex
 	sync.Mutex
 	buf     *bytes.Buffer
 	buf     *bytes.Buffer
-	writers map[StreamWriter]bool
-}
-
-type StreamWriter struct {
-	wc     io.WriteCloser
-	stream string
+	streams map[string](map[io.WriteCloser]struct{})
 }
 }
 
 
 func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) {
 func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) {
 	w.Lock()
 	w.Lock()
-	sw := StreamWriter{wc: writer, stream: stream}
-	w.writers[sw] = true
+	if _, ok := w.streams[stream]; !ok {
+		w.streams[stream] = make(map[io.WriteCloser]struct{})
+	}
+	w.streams[stream][writer] = struct{}{}
 	w.Unlock()
 	w.Unlock()
 }
 }
 
 
@@ -362,33 +359,83 @@ type JSONLog struct {
 	Created time.Time `json:"time"`
 	Created time.Time `json:"time"`
 }
 }
 
 
+func (jl *JSONLog) Format(format string) (string, error) {
+	if format == "" {
+		return jl.Log, nil
+	}
+	if format == "json" {
+		m, err := json.Marshal(jl)
+		return string(m), err
+	}
+	return fmt.Sprintf("[%s] %s", jl.Created.Format(format), jl.Log), nil
+}
+
+func WriteLog(src io.Reader, dst io.WriteCloser, format string) error {
+	dec := json.NewDecoder(src)
+	for {
+		l := &JSONLog{}
+
+		if err := dec.Decode(l); err == io.EOF {
+			return nil
+		} else if err != nil {
+			Errorf("Error streaming logs: %s", err)
+			return err
+		}
+		line, err := l.Format(format)
+		if err != nil {
+			return err
+		}
+		fmt.Fprintf(dst, "%s", line)
+	}
+}
+
+type LogFormatter struct {
+	wc         io.WriteCloser
+	timeFormat string
+}
+
 func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
 func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
+	created := time.Now().UTC()
 	w.Lock()
 	w.Lock()
 	defer w.Unlock()
 	defer w.Unlock()
+	if writers, ok := w.streams[""]; ok {
+		for sw := range writers {
+			if n, err := sw.Write(p); err != nil || n != len(p) {
+				// On error, evict the writer
+				delete(writers, sw)
+			}
+		}
+	}
 	w.buf.Write(p)
 	w.buf.Write(p)
-	for sw := range w.writers {
-		lp := p
-		if sw.stream != "" {
-			lp = nil
-			for {
-				line, err := w.buf.ReadString('\n')
-				if err != nil {
-					w.buf.Write([]byte(line))
-					break
-				}
-				b, err := json.Marshal(&JSONLog{Log: line, Stream: sw.stream, Created: time.Now().UTC()})
+	lines := []string{}
+	for {
+		line, err := w.buf.ReadString('\n')
+		if err != nil {
+			w.buf.Write([]byte(line))
+			break
+		}
+		lines = append(lines, line)
+	}
+
+	if len(lines) != 0 {
+		for stream, writers := range w.streams {
+			if stream == "" {
+				continue
+			}
+			var lp []byte
+			for _, line := range lines {
+				b, err := json.Marshal(&JSONLog{Log: line, Stream: stream, Created: created})
 				if err != nil {
 				if err != nil {
-					// On error, evict the writer
-					delete(w.writers, sw)
-					continue
+					Errorf("Error making JSON log line: %s", err)
 				}
 				}
 				lp = append(lp, b...)
 				lp = append(lp, b...)
 				lp = append(lp, '\n')
 				lp = append(lp, '\n')
 			}
 			}
-		}
-		if n, err := sw.wc.Write(lp); err != nil || n != len(lp) {
-			// On error, evict the writer
-			delete(w.writers, sw)
+			for sw := range writers {
+				if _, err := sw.Write(lp); err != nil {
+					delete(writers, sw)
+				}
+			}
 		}
 		}
 	}
 	}
 	return len(p), nil
 	return len(p), nil
@@ -397,15 +444,20 @@ func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
 func (w *WriteBroadcaster) CloseWriters() error {
 func (w *WriteBroadcaster) CloseWriters() error {
 	w.Lock()
 	w.Lock()
 	defer w.Unlock()
 	defer w.Unlock()
-	for sw := range w.writers {
-		sw.wc.Close()
+	for _, writers := range w.streams {
+		for w := range writers {
+			w.Close()
+		}
 	}
 	}
-	w.writers = make(map[StreamWriter]bool)
+	w.streams = make(map[string](map[io.WriteCloser]struct{}))
 	return nil
 	return nil
 }
 }
 
 
 func NewWriteBroadcaster() *WriteBroadcaster {
 func NewWriteBroadcaster() *WriteBroadcaster {
-	return &WriteBroadcaster{writers: make(map[StreamWriter]bool), buf: bytes.NewBuffer(nil)}
+	return &WriteBroadcaster{
+		streams: make(map[string](map[io.WriteCloser]struct{})),
+		buf:     bytes.NewBuffer(nil),
+	}
 }
 }
 
 
 func GetTotalUsedFds() int {
 func GetTotalUsedFds() int {