123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package client // import "github.com/docker/docker/client"
- import (
- "bufio"
- "context"
- "fmt"
- "net"
- "net/http"
- "net/http/httputil"
- "net/url"
- "time"
- "github.com/docker/docker/api/types"
- "github.com/docker/docker/api/types/versions"
- "github.com/pkg/errors"
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/codes"
- "go.opentelemetry.io/otel/propagation"
- "go.opentelemetry.io/otel/semconv/v1.17.0/httpconv"
- "go.opentelemetry.io/otel/trace"
- )
- // postHijacked sends a POST request and hijacks the connection.
- func (cli *Client) postHijacked(ctx context.Context, path string, query url.Values, body interface{}, headers map[string][]string) (types.HijackedResponse, error) {
- bodyEncoded, err := encodeData(body)
- if err != nil {
- return types.HijackedResponse{}, err
- }
- req, err := cli.buildRequest(ctx, http.MethodPost, cli.getAPIPath(ctx, path, query), bodyEncoded, headers)
- if err != nil {
- return types.HijackedResponse{}, err
- }
- conn, mediaType, err := cli.setupHijackConn(req, "tcp")
- if err != nil {
- return types.HijackedResponse{}, err
- }
- return types.NewHijackedResponse(conn, mediaType), err
- }
- // DialHijack returns a hijacked connection with negotiated protocol proto.
- func (cli *Client) DialHijack(ctx context.Context, url, proto string, meta map[string][]string) (net.Conn, error) {
- req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil)
- if err != nil {
- return nil, err
- }
- req = cli.addHeaders(req, meta)
- conn, _, err := cli.setupHijackConn(req, proto)
- return conn, err
- }
- func (cli *Client) setupHijackConn(req *http.Request, proto string) (_ net.Conn, _ string, retErr error) {
- ctx := req.Context()
- req.Header.Set("Connection", "Upgrade")
- req.Header.Set("Upgrade", proto)
- // We aren't using the configured RoundTripper here so manually inject the trace context
- tp := cli.tp
- if tp == nil {
- if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
- tp = span.TracerProvider()
- } else {
- tp = otel.GetTracerProvider()
- }
- }
- ctx, span := tp.Tracer("").Start(ctx, req.Method+" "+req.URL.Path, trace.WithSpanKind(trace.SpanKindClient))
- span.SetAttributes(httpconv.ClientRequest(req)...)
- defer func() {
- if retErr != nil {
- span.RecordError(retErr)
- span.SetStatus(codes.Error, retErr.Error())
- }
- span.End()
- }()
- otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
- dialer := cli.Dialer()
- conn, err := dialer(ctx)
- if err != nil {
- return nil, "", errors.Wrap(err, "cannot connect to the Docker daemon. Is 'docker daemon' running on this host?")
- }
- // When we set up a TCP connection for hijack, there could be long periods
- // of inactivity (a long running command with no output) that in certain
- // network setups may cause ECONNTIMEOUT, leaving the client in an unknown
- // state. Setting TCP KeepAlive on the socket connection will prohibit
- // ECONNTIMEOUT unless the socket connection truly is broken
- if tcpConn, ok := conn.(*net.TCPConn); ok {
- _ = tcpConn.SetKeepAlive(true)
- _ = tcpConn.SetKeepAlivePeriod(30 * time.Second)
- }
- clientconn := httputil.NewClientConn(conn, nil)
- defer clientconn.Close()
- // Server hijacks the connection, error 'connection closed' expected
- resp, err := clientconn.Do(req)
- if resp != nil {
- span.SetStatus(httpconv.ClientStatus(resp.StatusCode))
- }
- //nolint:staticcheck // ignore SA1019 for connecting to old (pre go1.8) daemons
- if err != httputil.ErrPersistEOF {
- if err != nil {
- return nil, "", err
- }
- if resp.StatusCode != http.StatusSwitchingProtocols {
- _ = resp.Body.Close()
- return nil, "", fmt.Errorf("unable to upgrade to %s, received %d", proto, resp.StatusCode)
- }
- }
- c, br := clientconn.Hijack()
- if br.Buffered() > 0 {
- // If there is buffered content, wrap the connection. We return an
- // object that implements CloseWrite if the underlying connection
- // implements it.
- if _, ok := c.(types.CloseWriter); ok {
- c = &hijackedConnCloseWriter{&hijackedConn{c, br}}
- } else {
- c = &hijackedConn{c, br}
- }
- } else {
- br.Reset(nil)
- }
- var mediaType string
- if versions.GreaterThanOrEqualTo(cli.ClientVersion(), "1.42") {
- // Prior to 1.42, Content-Type is always set to raw-stream and not relevant
- mediaType = resp.Header.Get("Content-Type")
- }
- return c, mediaType, nil
- }
- // hijackedConn wraps a net.Conn and is returned by setupHijackConn in the case
- // that a) there was already buffered data in the http layer when Hijack() was
- // called, and b) the underlying net.Conn does *not* implement CloseWrite().
- // hijackedConn does not implement CloseWrite() either.
- type hijackedConn struct {
- net.Conn
- r *bufio.Reader
- }
- func (c *hijackedConn) Read(b []byte) (int, error) {
- return c.r.Read(b)
- }
- // hijackedConnCloseWriter is a hijackedConn which additionally implements
- // CloseWrite(). It is returned by setupHijackConn in the case that a) there
- // was already buffered data in the http layer when Hijack() was called, and b)
- // the underlying net.Conn *does* implement CloseWrite().
- type hijackedConnCloseWriter struct {
- *hijackedConn
- }
- var _ types.CloseWriter = &hijackedConnCloseWriter{}
- func (c *hijackedConnCloseWriter) CloseWrite() error {
- conn := c.Conn.(types.CloseWriter)
- return conn.CloseWrite()
- }
|