diff --git a/vendor.conf b/vendor.conf index 03a1659cbd..f4d11ff446 100644 --- a/vendor.conf +++ b/vendor.conf @@ -134,7 +134,7 @@ github.com/containerd/cgroups b9de8a2212026c07cec67baf3323 github.com/containerd/console 2f1e3d2b6afd18e8b2077816c711205a0b4d8769 # v1.0.2 github.com/containerd/go-runc 16b287bc67d069a60fa48db15f330b790b74365b # v1.0.0 github.com/containerd/typeurl 5e43fb8b75ed2f2305fc04e6918c8d10636771bc # v1.0.2 -github.com/containerd/ttrpc bfba540dc45464586c106b1f31c8547933c1eb41 # v1.0.2 +github.com/containerd/ttrpc 0247db16a1f98bb76731a12ad72b8d49705b38b3 # v1.1.0 github.com/gogo/googleapis 01e0f9cca9b92166042241267ee2a5cdf5cff46c # v1.3.2 github.com/cilium/ebpf ca492085341e0e917f48ec30704d5054c5d42ca8 # v0.6.2 github.com/klauspost/compress a3b7545c88eea469c2246bee0e6c130525d56190 # v1.11.13 diff --git a/vendor/github.com/containerd/ttrpc/README.md b/vendor/github.com/containerd/ttrpc/README.md index c345c844e6..547a1297df 100644 --- a/vendor/github.com/containerd/ttrpc/README.md +++ b/vendor/github.com/containerd/ttrpc/README.md @@ -1,6 +1,7 @@ # ttrpc -[![Build Status](https://travis-ci.org/containerd/ttrpc.svg?branch=master)](https://travis-ci.org/containerd/ttrpc) +[![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI) +[![codecov](https://codecov.io/gh/containerd/ttrpc/branch/main/graph/badge.svg)](https://codecov.io/gh/containerd/ttrpc) GRPC for low-memory environments. @@ -40,13 +41,8 @@ directly, if required. # Status -Very new. YMMV. - TODO: -- [X] Plumb error codes and GRPC status -- [X] Remove use of any type and dependency on typeurl package -- [X] Ensure that protocol can support streaming in the future - [ ] Document protocol layout - [ ] Add testing under concurrent load to ensure - [ ] Verify connection error handling @@ -55,8 +51,8 @@ TODO: ttrpc is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE). As a containerd sub-project, you will find the: - * [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md), - * [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS), - * and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md) + * [Project governance](https://github.com/containerd/project/blob/main/GOVERNANCE.md), + * [Maintainers](https://github.com/containerd/project/blob/main/MAINTAINERS), + * and [Contributing guidelines](https://github.com/containerd/project/blob/main/CONTRIBUTING.md) information in our [`containerd/project`](https://github.com/containerd/project) repository. diff --git a/vendor/github.com/containerd/ttrpc/channel.go b/vendor/github.com/containerd/ttrpc/channel.go index aa8c9541cf..81116a5e23 100644 --- a/vendor/github.com/containerd/ttrpc/channel.go +++ b/vendor/github.com/containerd/ttrpc/channel.go @@ -19,11 +19,11 @@ package ttrpc import ( "bufio" "encoding/binary" + "fmt" "io" "net" "sync" - "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -105,7 +105,7 @@ func (ch *channel) recv() (messageHeader, []byte, error) { if mh.Length > uint32(messageLengthMax) { if _, err := ch.br.Discard(int(mh.Length)); err != nil { - return mh, nil, errors.Wrapf(err, "failed to discard after receiving oversized message") + return mh, nil, fmt.Errorf("failed to discard after receiving oversized message: %w", err) } return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax) @@ -113,7 +113,7 @@ func (ch *channel) recv() (messageHeader, []byte, error) { p := ch.getmbuf(int(mh.Length)) if _, err := io.ReadFull(ch.br, p); err != nil { - return messageHeader{}, nil, errors.Wrapf(err, "failed reading message") + return messageHeader{}, nil, fmt.Errorf("failed reading message: %w", err) } return mh, p, nil diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index 30c9b73f32..26c3dd2a98 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -18,6 +18,7 @@ package ttrpc import ( "context" + "errors" "io" "net" "os" @@ -27,7 +28,6 @@ import ( "time" "github.com/gogo/protobuf/proto" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -194,72 +194,131 @@ type message struct { err error } -type receiver struct { - wg *sync.WaitGroup - messages chan *message - err error +// callMap provides access to a map of active calls, guarded by a mutex. +type callMap struct { + m sync.Mutex + activeCalls map[uint32]*callRequest + closeErr error } -func (r *receiver) run(ctx context.Context, c *channel) { - defer r.wg.Done() - - for { - select { - case <-ctx.Done(): - r.err = ctx.Err() - return - default: - mh, p, err := c.recv() - if err != nil { - _, ok := status.FromError(err) - if !ok { - // treat all errors that are not an rpc status as terminal. - // all others poison the connection. - r.err = filterCloseErr(err) - return - } - } - select { - case r.messages <- &message{ - messageHeader: mh, - p: p[:mh.Length], - err: err, - }: - case <-ctx.Done(): - r.err = ctx.Err() - return - } - } +// newCallMap returns a new callMap with an empty set of active calls. +func newCallMap() *callMap { + return &callMap{ + activeCalls: make(map[uint32]*callRequest), } } +// set adds a call entry to the map with the given streamID key. +func (cm *callMap) set(streamID uint32, cr *callRequest) error { + cm.m.Lock() + defer cm.m.Unlock() + if cm.closeErr != nil { + return cm.closeErr + } + cm.activeCalls[streamID] = cr + return nil +} + +// get looks up the call entry for the given streamID key, then removes it +// from the map and returns it. +func (cm *callMap) get(streamID uint32) (cr *callRequest, ok bool, err error) { + cm.m.Lock() + defer cm.m.Unlock() + if cm.closeErr != nil { + return nil, false, cm.closeErr + } + cr, ok = cm.activeCalls[streamID] + if ok { + delete(cm.activeCalls, streamID) + } + return +} + +// abort sends the given error to each active call, and clears the map. +// Once abort has been called, any subsequent calls to the callMap will return the error passed to abort. +func (cm *callMap) abort(err error) error { + cm.m.Lock() + defer cm.m.Unlock() + if cm.closeErr != nil { + return cm.closeErr + } + for streamID, call := range cm.activeCalls { + call.errs <- err + delete(cm.activeCalls, streamID) + } + cm.closeErr = err + return nil +} + func (c *Client) run() { var ( - streamID uint32 = 1 - waiters = make(map[uint32]*callRequest) - calls = c.calls - incoming = make(chan *message) - receiversDone = make(chan struct{}) - wg sync.WaitGroup + waiters = newCallMap() + receiverDone = make(chan struct{}) ) - // broadcast the shutdown error to the remaining waiters. - abortWaiters := func(wErr error) { - for _, waiter := range waiters { - waiter.errs <- wErr - } - } - recv := &receiver{ - wg: &wg, - messages: incoming, - } - wg.Add(1) - + // Sender goroutine + // Receives calls from dispatch, adds them to the set of active calls, and sends them + // to the server. go func() { - wg.Wait() - close(receiversDone) + var streamID uint32 = 1 + for { + select { + case <-c.ctx.Done(): + return + case call := <-c.calls: + id := streamID + streamID += 2 // enforce odd client initiated request ids + if err := waiters.set(id, call); err != nil { + call.errs <- err // errs is buffered so should not block. + continue + } + if err := c.send(id, messageTypeRequest, call.req); err != nil { + call.errs <- err // errs is buffered so should not block. + waiters.get(id) // remove from waiters set + } + } + } + }() + + // Receiver goroutine + // Receives responses from the server, looks up the call info in the set of active calls, + // and notifies the caller of the response. + go func() { + defer close(receiverDone) + for { + select { + case <-c.ctx.Done(): + c.setError(c.ctx.Err()) + return + default: + mh, p, err := c.channel.recv() + if err != nil { + _, ok := status.FromError(err) + if !ok { + // treat all errors that are not an rpc status as terminal. + // all others poison the connection. + c.setError(filterCloseErr(err)) + return + } + } + msg := &message{ + messageHeader: mh, + p: p[:mh.Length], + err: err, + } + call, ok, err := waiters.get(mh.StreamID) + if err != nil { + logrus.Errorf("ttrpc: failed to look up active call: %s", err) + continue + } + if !ok { + logrus.Errorf("ttrpc: received message for unknown channel %v", mh.StreamID) + continue + } + call.errs <- c.recv(call.resp, msg) + } + } }() - go recv.run(c.ctx, c.channel) defer func() { c.conn.Close() @@ -269,32 +328,14 @@ func (c *Client) run() { for { select { - case call := <-calls: - if err := c.send(streamID, messageTypeRequest, call.req); err != nil { - call.errs <- err - continue - } - - waiters[streamID] = call - streamID += 2 // enforce odd client initiated request ids - case msg := <-incoming: - call, ok := waiters[msg.StreamID] - if !ok { - logrus.Errorf("ttrpc: received message for unknown channel %v", msg.StreamID) - continue - } - - call.errs <- c.recv(call.resp, msg) - delete(waiters, msg.StreamID) - case <-receiversDone: - // all the receivers have exited - if recv.err != nil { - c.setError(recv.err) - } + case <-receiverDone: + // The receiver has exited. // don't return out, let the close of the context trigger the abort of waiters c.Close() case <-c.ctx.Done(): - abortWaiters(c.error()) + // Abort all active calls. This will also prevent any new calls from being added + // to waiters. + waiters.abort(c.error()) return } } @@ -347,7 +388,7 @@ func filterCloseErr(err error) error { return nil case err == io.EOF: return ErrClosed - case errors.Cause(err) == io.EOF: + case errors.Is(err, io.EOF): return ErrClosed case strings.Contains(err.Error(), "use of closed network connection"): return ErrClosed diff --git a/vendor/github.com/containerd/ttrpc/codec.go b/vendor/github.com/containerd/ttrpc/codec.go index b4aac2fac0..880634c27e 100644 --- a/vendor/github.com/containerd/ttrpc/codec.go +++ b/vendor/github.com/containerd/ttrpc/codec.go @@ -17,8 +17,9 @@ package ttrpc import ( + "fmt" + "github.com/gogo/protobuf/proto" - "github.com/pkg/errors" ) type codec struct{} @@ -28,7 +29,7 @@ func (c codec) Marshal(msg interface{}) ([]byte, error) { case proto.Message: return proto.Marshal(v) default: - return nil, errors.Errorf("ttrpc: cannot marshal unknown type: %T", msg) + return nil, fmt.Errorf("ttrpc: cannot marshal unknown type: %T", msg) } } @@ -37,6 +38,6 @@ func (c codec) Unmarshal(p []byte, msg interface{}) error { case proto.Message: return proto.Unmarshal(p, v) default: - return errors.Errorf("ttrpc: cannot unmarshal into unknown type: %T", msg) + return fmt.Errorf("ttrpc: cannot unmarshal into unknown type: %T", msg) } } diff --git a/vendor/github.com/containerd/ttrpc/config.go b/vendor/github.com/containerd/ttrpc/config.go index 6a53c112b7..097419635c 100644 --- a/vendor/github.com/containerd/ttrpc/config.go +++ b/vendor/github.com/containerd/ttrpc/config.go @@ -16,7 +16,7 @@ package ttrpc -import "github.com/pkg/errors" +import "errors" type serverConfig struct { handshaker Handshaker diff --git a/vendor/github.com/containerd/ttrpc/go.mod b/vendor/github.com/containerd/ttrpc/go.mod index 4ed7512f91..efc00860c6 100644 --- a/vendor/github.com/containerd/ttrpc/go.mod +++ b/vendor/github.com/containerd/ttrpc/go.mod @@ -3,12 +3,11 @@ module github.com/containerd/ttrpc go 1.13 require ( - github.com/gogo/protobuf v1.3.1 - github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect - github.com/pkg/errors v0.9.1 - github.com/prometheus/procfs v0.0.0-20190522114515-bc1a522cf7b1 - github.com/sirupsen/logrus v1.4.2 - golang.org/x/sys v0.0.0-20200120151820-655fe14d7479 - google.golang.org/genproto v0.0.0-20200117163144-32f20d992d24 - google.golang.org/grpc v1.26.0 + github.com/gogo/protobuf v1.3.2 + github.com/prometheus/procfs v0.6.0 + github.com/sirupsen/logrus v1.8.1 + golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c + google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63 + google.golang.org/grpc v1.27.1 + google.golang.org/protobuf v1.27.1 ) diff --git a/vendor/github.com/containerd/ttrpc/server.go b/vendor/github.com/containerd/ttrpc/server.go index c18b4e43b2..b0e48073e4 100644 --- a/vendor/github.com/containerd/ttrpc/server.go +++ b/vendor/github.com/containerd/ttrpc/server.go @@ -18,6 +18,7 @@ package ttrpc import ( "context" + "errors" "io" "math/rand" "net" @@ -25,7 +26,6 @@ import ( "sync/atomic" "time" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/vendor/github.com/containerd/ttrpc/services.go b/vendor/github.com/containerd/ttrpc/services.go index 2a83ba88ae..f359e9611f 100644 --- a/vendor/github.com/containerd/ttrpc/services.go +++ b/vendor/github.com/containerd/ttrpc/services.go @@ -18,13 +18,14 @@ package ttrpc import ( "context" + "errors" + "fmt" "io" "os" "path" "unsafe" "github.com/gogo/protobuf/proto" - "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -51,7 +52,7 @@ func newServiceSet(interceptor UnaryServerInterceptor) *serviceSet { func (s *serviceSet) register(name string, methods map[string]Method) { if _, ok := s.services[name]; ok { - panic(errors.Errorf("duplicate service %v registered", name)) + panic(fmt.Errorf("duplicate service %v registered", name)) } s.services[name] = ServiceDesc{ @@ -116,12 +117,12 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin func (s *serviceSet) resolve(service, method string) (Method, error) { srv, ok := s.services[service] if !ok { - return nil, status.Errorf(codes.NotFound, "service %v", service) + return nil, status.Errorf(codes.Unimplemented, "service %v", service) } mthd, ok := srv.Methods[method] if !ok { - return nil, status.Errorf(codes.NotFound, "method %v", method) + return nil, status.Errorf(codes.Unimplemented, "method %v", method) } return mthd, nil diff --git a/vendor/github.com/containerd/ttrpc/unixcreds_linux.go b/vendor/github.com/containerd/ttrpc/unixcreds_linux.go index a471bd365c..a59dad60cd 100644 --- a/vendor/github.com/containerd/ttrpc/unixcreds_linux.go +++ b/vendor/github.com/containerd/ttrpc/unixcreds_linux.go @@ -18,11 +18,12 @@ package ttrpc import ( "context" + "errors" + "fmt" "net" "os" "syscall" - "github.com/pkg/errors" "golang.org/x/sys/unix" ) @@ -31,12 +32,12 @@ type UnixCredentialsFunc func(*unix.Ucred) error func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) { uc, err := requireUnixSocket(conn) if err != nil { - return nil, nil, errors.Wrap(err, "ttrpc.UnixCredentialsFunc: require unix socket") + return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: require unix socket: %w", err) } rs, err := uc.SyscallConn() if err != nil { - return nil, nil, errors.Wrap(err, "ttrpc.UnixCredentialsFunc: (net.UnixConn).SyscallConn failed") + return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: (net.UnixConn).SyscallConn failed: %w", err) } var ( ucred *unix.Ucred @@ -45,15 +46,15 @@ func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net if err := rs.Control(func(fd uintptr) { ucred, ucredErr = unix.GetsockoptUcred(int(fd), unix.SOL_SOCKET, unix.SO_PEERCRED) }); err != nil { - return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: (*syscall.RawConn).Control failed") + return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: (*syscall.RawConn).Control failed: %w", err) } if ucredErr != nil { - return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials") + return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials: %w", err) } if err := fn(ucred); err != nil { - return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: credential check failed") + return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: credential check failed: %w", err) } return uc, ucred, nil @@ -93,7 +94,7 @@ func requireRoot(ucred *unix.Ucred) error { func requireUidGid(ucred *unix.Ucred, uid, gid int) error { if (uid != -1 && uint32(uid) != ucred.Uid) || (gid != -1 && uint32(gid) != ucred.Gid) { - return errors.Wrap(syscall.EPERM, "ttrpc: invalid credentials") + return fmt.Errorf("ttrpc: invalid credentials: %v", syscall.EPERM) } return nil }