moby/client/hijack.go
Brian Goff 642e9917ff Add otel support
This uses otel standard environment variables to configure tracing in
the daemon.
It also adds support for propagating trace contexts in the client and
reading those from the API server.

See
https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/
for details on otel environment variables.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
2023-09-07 18:38:19 +00:00

164 lines
5.1 KiB
Go

package client // import "github.com/docker/docker/client"
import (
"bufio"
"context"
"fmt"
"net"
"net/http"
"net/http/httputil"
"net/url"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/versions"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
"go.opentelemetry.io/otel/trace"
)
// postHijacked sends a POST request and hijacks the connection.
func (cli *Client) postHijacked(ctx context.Context, path string, query url.Values, body interface{}, headers map[string][]string) (types.HijackedResponse, error) {
bodyEncoded, err := encodeData(body)
if err != nil {
return types.HijackedResponse{}, err
}
req, err := cli.buildRequest(ctx, http.MethodPost, cli.getAPIPath(ctx, path, query), bodyEncoded, headers)
if err != nil {
return types.HijackedResponse{}, err
}
conn, mediaType, err := cli.setupHijackConn(req, "tcp")
if err != nil {
return types.HijackedResponse{}, err
}
return types.NewHijackedResponse(conn, mediaType), err
}
// DialHijack returns a hijacked connection with negotiated protocol proto.
func (cli *Client) DialHijack(ctx context.Context, url, proto string, meta map[string][]string) (net.Conn, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
if err != nil {
return nil, err
}
req = cli.addHeaders(req, meta)
conn, _, err := cli.setupHijackConn(req, proto)
return conn, err
}
func (cli *Client) setupHijackConn(req *http.Request, proto string) (_ net.Conn, _ string, retErr error) {
ctx := req.Context()
req.Header.Set("Connection", "Upgrade")
req.Header.Set("Upgrade", proto)
// We aren't using the configured RoundTripper here so manually inject the trace context
tp := cli.tp
if tp == nil {
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
tp = span.TracerProvider()
} else {
tp = otel.GetTracerProvider()
}
}
ctx, span := tp.Tracer("").Start(ctx, req.Method+" "+req.URL.Path)
span.SetAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req)...)
defer func() {
if retErr != nil {
span.RecordError(retErr)
span.SetStatus(codes.Error, retErr.Error())
}
span.End()
}()
otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
dialer := cli.Dialer()
conn, err := dialer(ctx)
if err != nil {
return nil, "", errors.Wrap(err, "cannot connect to the Docker daemon. Is 'docker daemon' running on this host?")
}
// When we set up a TCP connection for hijack, there could be long periods
// of inactivity (a long running command with no output) that in certain
// network setups may cause ECONNTIMEOUT, leaving the client in an unknown
// state. Setting TCP KeepAlive on the socket connection will prohibit
// ECONNTIMEOUT unless the socket connection truly is broken
if tcpConn, ok := conn.(*net.TCPConn); ok {
_ = tcpConn.SetKeepAlive(true)
_ = tcpConn.SetKeepAlivePeriod(30 * time.Second)
}
clientconn := httputil.NewClientConn(conn, nil)
defer clientconn.Close()
// Server hijacks the connection, error 'connection closed' expected
resp, err := clientconn.Do(req)
if resp != nil {
span.SetStatus(semconv.SpanStatusFromHTTPStatusCode(resp.StatusCode))
}
//nolint:staticcheck // ignore SA1019 for connecting to old (pre go1.8) daemons
if err != httputil.ErrPersistEOF {
if err != nil {
return nil, "", err
}
if resp.StatusCode != http.StatusSwitchingProtocols {
_ = resp.Body.Close()
return nil, "", fmt.Errorf("unable to upgrade to %s, received %d", proto, resp.StatusCode)
}
}
c, br := clientconn.Hijack()
if br.Buffered() > 0 {
// If there is buffered content, wrap the connection. We return an
// object that implements CloseWrite if the underlying connection
// implements it.
if _, ok := c.(types.CloseWriter); ok {
c = &hijackedConnCloseWriter{&hijackedConn{c, br}}
} else {
c = &hijackedConn{c, br}
}
} else {
br.Reset(nil)
}
var mediaType string
if versions.GreaterThanOrEqualTo(cli.ClientVersion(), "1.42") {
// Prior to 1.42, Content-Type is always set to raw-stream and not relevant
mediaType = resp.Header.Get("Content-Type")
}
return c, mediaType, nil
}
// hijackedConn wraps a net.Conn and is returned by setupHijackConn in the case
// that a) there was already buffered data in the http layer when Hijack() was
// called, and b) the underlying net.Conn does *not* implement CloseWrite().
// hijackedConn does not implement CloseWrite() either.
type hijackedConn struct {
net.Conn
r *bufio.Reader
}
func (c *hijackedConn) Read(b []byte) (int, error) {
return c.r.Read(b)
}
// hijackedConnCloseWriter is a hijackedConn which additionally implements
// CloseWrite(). It is returned by setupHijackConn in the case that a) there
// was already buffered data in the http layer when Hijack() was called, and b)
// the underlying net.Conn *does* implement CloseWrite().
type hijackedConnCloseWriter struct {
*hijackedConn
}
var _ types.CloseWriter = &hijackedConnCloseWriter{}
func (c *hijackedConnCloseWriter) CloseWrite() error {
conn := c.Conn.(types.CloseWriter)
return conn.CloseWrite()
}