Brian Goff 2024-04-19 20:57:08 -07:00 committed by GitHub
commit 3aad06af98
9 changed files with 302 additions and 13 deletions
api
container/stream
daemon
integration/container
internal/unix_noeintr


@@ -775,7 +775,7 @@ func (s *containerRouter) postContainersAttach(ctx context.Context, w http.Respo
}
contentType := types.MediaTypeRawStream
setupStreams := func(multiplexed bool) (io.ReadCloser, io.Writer, io.Writer, error) {
setupStreams := func(multiplexed bool, cancel func()) (io.ReadCloser, io.Writer, io.Writer, error) {
conn, _, err := hijacker.Hijack()
if err != nil {
return nil, nil, nil, err
@@ -793,6 +793,8 @@ func (s *containerRouter) postContainersAttach(ctx context.Context, w http.Respo
fmt.Fprintf(conn, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
}
go notifyClosed(ctx, conn, cancel)
closer := func() error {
httputils.CloseStreams(conn)
return nil
@@ -841,7 +843,7 @@ func (s *containerRouter) wsContainersAttach(ctx context.Context, w http.Respons
version := httputils.VersionFromContext(ctx)
setupStreams := func(multiplexed bool) (io.ReadCloser, io.Writer, io.Writer, error) {
setupStreams := func(multiplexed bool, cancel func()) (io.ReadCloser, io.Writer, io.Writer, error) {
wsChan := make(chan *websocket.Conn)
h := func(conn *websocket.Conn) {
wsChan <- conn
@@ -860,6 +862,8 @@ func (s *containerRouter) wsContainersAttach(ctx context.Context, w http.Respons
if versions.GreaterThanOrEqualTo(version, "1.28") {
conn.PayloadType = websocket.BinaryFrame
}
// TODO: Close notifications
return conn, conn, conn, nil
}
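
Not part of the commit: a minimal sketch of the wiring these two hunks describe, assuming a hijackable http.ResponseWriter and the notifyClosed helper added later in this commit. The package name, attachHandler, and the final blank assignments are illustrative only.

package attachsketch

import (
	"context"
	"io"
	"net"
	"net/http"
)

// notifyClosed is assumed to block until the peer closes conn and then call
// cancel; the epoll-based implementation appears further down in this commit.
func notifyClosed(ctx context.Context, conn net.Conn, cancel func()) { /* sketch */ }

func attachHandler(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	setupStreams := func(multiplexed bool, cancel func()) (io.ReadCloser, io.Writer, io.Writer, error) {
		hj, ok := w.(http.Hijacker)
		if !ok {
			return nil, nil, nil, http.ErrNotSupported
		}
		conn, _, err := hj.Hijack()
		if err != nil {
			return nil, nil, nil, err
		}
		// When the client drops the connection, cancel the attach so the
		// daemon can release the container's stream copiers.
		go notifyClosed(ctx, conn, cancel)
		return conn, conn, conn, nil
	}

	// In the real flow this callback is handed to the backend through
	// backend.ContainerAttachConfig.GetStreams (see the hunk further down).
	_ = setupStreams
}

The websocket path gets the same signature change but, per the TODO above, no close notification is wired up there yet.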


@@ -0,0 +1,54 @@
package container
import (
"context"
"net"
"syscall"
"github.com/containerd/log"
"github.com/docker/docker/internal/unix_noeintr"
"golang.org/x/sys/unix"
)
func notifyClosed(ctx context.Context, conn net.Conn, notify func()) {
sc, ok := conn.(syscall.Conn)
if !ok {
log.G(ctx).Debug("notifyClosed: conn does not support close notifications")
return
}
rc, err := sc.SyscallConn()
if err != nil {
log.G(ctx).WithError(err).Warn("notifyClosed: failed get raw conn for close notifications")
return
}
epFd, err := unix_noeintr.EpollCreate()
if err != nil {
log.G(ctx).WithError(err).Warn("notifyClosed: failed to create epoll fd")
return
}
defer unix.Close(epFd)
err = rc.Control(func(fd uintptr) {
err := unix_noeintr.EpollCtl(epFd, unix.EPOLL_CTL_ADD, int(fd), &unix.EpollEvent{
Events: unix.EPOLLHUP,
Fd: int32(fd),
})
if err != nil {
log.G(ctx).WithError(err).Warn("notifyClosed: failed to register fd for close notifications")
return
}
events := make([]unix.EpollEvent, 1)
if _, err := unix_noeintr.EpollWait(epFd, events, -1); err != nil {
log.G(ctx).WithError(err).Warn("notifyClosed: failed to wait for close notifications")
return
}
notify()
})
if err != nil {
log.G(ctx).WithError(err).Warn("notifyClosed: failed to register for close notifications")
return
}
}
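
Not part of the commit: a short usage sketch, assuming it sits in the same package as notifyClosed above; demoNotifyClosed is an illustrative name. Note that per epoll_ctl(2), EPOLLHUP is reported by epoll_wait even when it is the only event registered, so the watcher wakes only when the connection is torn down, never on ordinary traffic.

// demoNotifyClosed shows the intended call pattern: tie a context to the
// lifetime of a connection and block until the peer hangs up.
func demoNotifyClosed(conn net.Conn) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// notifyClosed calls cancel when epoll reports the peer hung up.
	go notifyClosed(ctx, conn, cancel)

	<-ctx.Done() // returns once the connection is torn down and cancel fires
}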


@@ -0,0 +1,10 @@
//go:build !linux
package container
import (
"context"
"net"
)
func notifyClosed(ctx context.Context, conn net.Conn, notify func()) {}


@@ -30,7 +30,7 @@ type ContainerRmConfig struct {
// ContainerAttachConfig holds the streams to use when connecting to a container to view logs.
type ContainerAttachConfig struct {
GetStreams func(multiplexed bool) (io.ReadCloser, io.Writer, io.Writer, error)
GetStreams func(multiplexed bool, cancel func()) (io.ReadCloser, io.Writer, io.Writer, error)
UseStdin bool
UseStdout bool
UseStderr bool


@@ -150,6 +150,14 @@ func (c *Config) CopyStreams(ctx context.Context, cfg *AttachConfig) <-chan erro
cfg.CStderr.Close()
}
if cfg.Stdin != nil {
// In this case, `cfg.Stdin` is a stream from the client.
// `io.Copy` may stay blocked reading from `cfg.Stdin` even after the container has exited,
// which would keep that copy (and therefore `group.Wait()`) from ever returning.
// Closing `cfg.Stdin` unblocks the `io.Copy` so that `group.Wait()` can return.
cfg.Stdin.Close()
}
// Now with these closed, wait should return.
if err := group.Wait(); err != nil {
errs <- err
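
Not part of the commit: a self-contained sketch of the behaviour the new comment describes, using io.Pipe to stand in for a client stdin stream that never produces data. Closing the reader is what lets the blocked io.Copy return.

package main

import (
	"fmt"
	"io"
	"sync"
)

func main() {
	stdin, _ := io.Pipe() // a reader that never produces data, like an idle client stdin

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Stand-in for the stdin copy goroutine started by CopyStreams.
		_, err := io.Copy(io.Discard, stdin)
		fmt.Println("copy returned:", err) // io: read/write on closed pipe
	}()

	// Without this Close the goroutine above would block in Read forever,
	// exactly like the attach stdin copy after the container exits.
	stdin.Close()
	wg.Wait()
}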


@@ -0,0 +1,102 @@
package stream
import (
"context"
"io"
"testing"
"time"
"gotest.tools/v3/assert"
)
// Make sure that when there is no I/O on a stream the copy goroutines do not stay blocked after the container exits.
func TestAttachNoIO(t *testing.T) {
t.Run("stdin only", func(t *testing.T) {
stdinR, _ := io.Pipe()
defer stdinR.Close()
testStreamCopy(t, stdinR, nil, nil)
})
t.Run("stdout only", func(t *testing.T) {
_, w := io.Pipe()
defer w.Close()
testStreamCopy(t, nil, w, nil)
})
t.Run("stderr only", func(t *testing.T) {
_, w := io.Pipe()
defer w.Close()
testStreamCopy(t, nil, nil, w)
})
t.Run("stdout+stderr", func(t *testing.T) {
_, stdoutW := io.Pipe()
defer stdoutW.Close()
_, stderrW := io.Pipe()
defer stderrW.Close()
testStreamCopy(t, nil, stdoutW, stderrW)
})
t.Run("stdin+stdout", func(t *testing.T) {
stdin, _ := io.Pipe()
defer stdin.Close()
_, stdout := io.Pipe()
defer stdout.Close()
testStreamCopy(t, stdin, stdout, nil)
})
t.Run("stdin+stderr", func(t *testing.T) {
stdin, _ := io.Pipe()
defer stdin.Close()
_, stderr := io.Pipe()
defer stderr.Close()
testStreamCopy(t, stdin, nil, stderr)
})
t.Run("stdin+stdout+stderr", func(t *testing.T) {
stdinR, _ := io.Pipe()
defer stdinR.Close()
stdoutR, stdoutW := io.Pipe()
defer stdoutR.Close()
stderrR, stderrW := io.Pipe()
defer stderrR.Close()
testStreamCopy(t, stdinR, stdoutW, stderrW)
})
}
func testStreamCopy(t *testing.T, stdin io.ReadCloser, stdout, stderr io.WriteCloser) {
cfg := AttachConfig{
UseStdin: stdin != nil,
UseStdout: stdout != nil,
UseStderr: stderr != nil,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
}
sc := NewConfig()
sc.AttachStreams(&cfg)
defer sc.CloseStreams()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
chErr := sc.CopyStreams(ctx, &cfg)
select {
case err := <-chErr:
assert.NilError(t, err)
default:
}
cancel()
select {
case err := <-chErr:
assert.ErrorIs(t, err, context.Canceled)
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for CopyStreams to exit")
}
}


@@ -18,13 +18,13 @@ import (
)
// ContainerAttach attaches to logs according to the config passed in. See ContainerAttachConfig.
func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerAttachConfig) error {
func (daemon *Daemon) ContainerAttach(prefixOrName string, req *backend.ContainerAttachConfig) error {
keys := []byte{}
var err error
if c.DetachKeys != "" {
keys, err = term.ToBytes(c.DetachKeys)
if req.DetachKeys != "" {
keys, err = term.ToBytes(req.DetachKeys)
if err != nil {
return errdefs.InvalidParameter(errors.Errorf("Invalid detach keys (%s) provided", c.DetachKeys))
return errdefs.InvalidParameter(errors.Errorf("Invalid detach keys (%s) provided", req.DetachKeys))
}
}
@@ -42,20 +42,37 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
}
cfg := stream.AttachConfig{
UseStdin: c.UseStdin,
UseStdout: c.UseStdout,
UseStderr: c.UseStderr,
UseStdin: req.UseStdin,
UseStdout: req.UseStdout,
UseStderr: req.UseStderr,
TTY: ctr.Config.Tty,
CloseStdin: ctr.Config.StdinOnce,
DetachKeys: keys,
}
ctr.StreamConfig.AttachStreams(&cfg)
multiplexed := !ctr.Config.Tty && c.MuxStreams
inStream, outStream, errStream, err := c.GetStreams(multiplexed)
multiplexed := !ctr.Config.Tty && req.MuxStreams
clientCtx, closeNotify := context.WithCancel(context.Background())
defer closeNotify()
go func() {
<-clientCtx.Done()
// The client has disconnected
// In this case we need to close the container's output streams so that the goroutines used to copy
// to the client streams are unblocked and can exit.
if cfg.CStdout != nil {
cfg.CStdout.Close()
}
if cfg.CStderr != nil {
cfg.CStderr.Close()
}
}()
inStream, outStream, errStream, err := req.GetStreams(multiplexed, closeNotify)
if err != nil {
return err
}
defer inStream.Close()
if multiplexed {
@@ -73,7 +90,7 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
cfg.Stderr = errStream
}
if err := daemon.containerAttach(ctr, &cfg, c.Logs, c.Stream); err != nil {
if err := daemon.containerAttach(ctr, &cfg, req.Logs, req.Stream); err != nil {
fmt.Fprintf(outStream, "Error attaching: %s\n", err)
}
return nil
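
Not part of the commit: the watcher goroutine added above boils down to a reusable pattern of tying blocking copies to a context by closing their endpoints on cancellation. A minimal sketch, where closeOnDone is an illustrative helper and not a daemon API:

package attachsketch

import (
	"context"
	"io"
)

// closeOnDone closes the given streams once ctx is cancelled, unblocking any
// io.Copy that is still reading from or writing to them.
func closeOnDone(ctx context.Context, streams ...io.Closer) {
	go func() {
		<-ctx.Done()
		for _, s := range streams {
			if s != nil {
				s.Close()
			}
		}
	}()
}

In the hunk above the same idea is written inline: closeNotify is handed to GetStreams so the HTTP layer can fire it when the client disconnects, and the goroutine then closes cfg.CStdout and cfg.CStderr to unblock the copy goroutines.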


@@ -1,14 +1,18 @@
package container // import "github.com/docker/docker/integration/container"
import (
"context"
"testing"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/system"
"github.com/docker/docker/testutil"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
"gotest.tools/v3/skip"
)
func TestAttach(t *testing.T) {
@@ -59,3 +63,56 @@ func TestAttach(t *testing.T) {
})
}
}
// Regression test for #37182
func TestAttachDisconnectLeak(t *testing.T) {
skip.If(t, testEnv.DaemonInfo.OSType != "linux", "Bug still exists on Windows")
setupTest(t)
client := testEnv.APIClient()
resp, err := client.ContainerCreate(context.Background(),
&container.Config{
Image: "busybox",
Cmd: []string{"/bin/sh", "-c", "while true; do usleep 100000; done"},
},
&container.HostConfig{},
&network.NetworkingConfig{},
nil,
"",
)
assert.NilError(t, err)
cID := resp.ID
defer client.ContainerRemove(context.Background(), cID, container.RemoveOptions{
Force: true,
})
info, err := client.Info(context.Background())
assert.NilError(t, err)
assert.Assert(t, info.NGoroutines > 1)
attach, err := client.ContainerAttach(context.Background(), cID, container.AttachOptions{
Stdout: true,
})
assert.NilError(t, err)
defer attach.Close()
infoAttach, err := client.Info(context.Background())
assert.NilError(t, err)
assert.Assert(t, infoAttach.NGoroutines > info.NGoroutines)
attach.Close()
var info2 system.Info
for i := 0; i < 10; i++ {
info2, err = client.Info(context.Background())
assert.NilError(t, err)
if info2.NGoroutines > info.NGoroutines {
time.Sleep(time.Second)
continue
}
return
}
t.Fatalf("goroutine leak: %d -> %d", info.NGoroutines, info2.NGoroutines)
}


@@ -0,0 +1,37 @@
package unix_noeintr
import (
"errors"
"golang.org/x/sys/unix"
)
func EpollCreate() (int, error) {
for {
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if errors.Is(err, unix.EINTR) {
continue
}
return fd, err
}
}
func EpollCtl(epFd int, op int, fd int, event *unix.EpollEvent) error {
for {
err := unix.EpollCtl(epFd, op, fd, event)
if errors.Is(err, unix.EINTR) {
continue
}
return err
}
}
func EpollWait(epFd int, events []unix.EpollEvent, msec int) (int, error) {
for {
n, err := unix.EpollWait(epFd, events, msec)
if errors.Is(err, unix.EINTR) {
continue
}
return n, err
}
}
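
Not part of the commit: the three wrappers repeat the same retry-on-EINTR loop, and with Go generics that loop could be factored out once. A sketch under that assumption; the package name, retry, and this alternate EpollWait are illustrative only.

package unix_noeintr_sketch

import (
	"errors"

	"golang.org/x/sys/unix"
)

// retry re-runs f until it returns anything other than EINTR, which the kernel
// uses to report that a blocking syscall was interrupted by a signal.
func retry[T any](f func() (T, error)) (T, error) {
	for {
		v, err := f()
		if errors.Is(err, unix.EINTR) {
			continue
		}
		return v, err
	}
}

// EpollWait expressed through retry; EpollCreate fits the same mold, while
// EpollCtl, which returns only an error, would need a small error-only variant.
func EpollWait(epFd int, events []unix.EpollEvent, msec int) (int, error) {
	return retry(func() (int, error) {
		return unix.EpollWait(epFd, events, msec)
	})
}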