Kaynağa Gözat

vendor: github.com/containerd/ttrpc v1.1.0

full diff: https://github.com/containerd/ttrpc/compare/v1.0.2...v1.1.0

- client: Handle sending/receiving in separate goroutines
- Return Unimplemented when services or methods are not implemented
- go.mod: sirupsen/logrus v1.7.0
- go.mod: update dependencies
  - go.mod: github.com/gogo/protobuf v1.3.2
  - go.mod: google.golang.org/grpc v1.27.1
  - go.mod: google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63
  - go.mod: github.com/prometheus/procfs v0.6.0
- replace pkg/errors
- Rename branch from master to main
- Use GitHub Actions for CI
- Make "go test" and "go build" work on macOS
- Add protoc-gen-go-ttrpc

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Sebastiaan van Stijn 3 yıl önce
ebeveyn
işleme
cbb4aed0b4

+ 1 - 1
vendor.conf

@@ -134,7 +134,7 @@ github.com/containerd/cgroups                       b9de8a2212026c07cec67baf3323
 github.com/containerd/console                       2f1e3d2b6afd18e8b2077816c711205a0b4d8769 # v1.0.2
 github.com/containerd/go-runc                       16b287bc67d069a60fa48db15f330b790b74365b # v1.0.0
 github.com/containerd/typeurl                       5e43fb8b75ed2f2305fc04e6918c8d10636771bc # v1.0.2
-github.com/containerd/ttrpc                         bfba540dc45464586c106b1f31c8547933c1eb41 # v1.0.2
+github.com/containerd/ttrpc                         0247db16a1f98bb76731a12ad72b8d49705b38b3 # v1.1.0
 github.com/gogo/googleapis                          01e0f9cca9b92166042241267ee2a5cdf5cff46c # v1.3.2
 github.com/cilium/ebpf                              ca492085341e0e917f48ec30704d5054c5d42ca8 # v0.6.2
 github.com/klauspost/compress                       a3b7545c88eea469c2246bee0e6c130525d56190 # v1.11.13

+ 5 - 9
vendor/github.com/containerd/ttrpc/README.md

@@ -1,6 +1,7 @@
 # ttrpc
 
-[![Build Status](https://travis-ci.org/containerd/ttrpc.svg?branch=master)](https://travis-ci.org/containerd/ttrpc)
+[![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI)
+[![codecov](https://codecov.io/gh/containerd/ttrpc/branch/main/graph/badge.svg)](https://codecov.io/gh/containerd/ttrpc)
 
 GRPC for low-memory environments.
 
@@ -40,13 +41,8 @@ directly, if required.
 
 # Status
 
-Very new. YMMV.
-
 TODO:
 
-- [X] Plumb error codes and GRPC status
-- [X] Remove use of any type and dependency on typeurl package
-- [X] Ensure that protocol can support streaming in the future
 - [ ] Document protocol layout
 - [ ] Add testing under concurrent load to ensure
 - [ ] Verify connection error handling
@@ -55,8 +51,8 @@ TODO:
 
 ttrpc is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
 As a containerd sub-project, you will find the:
- * [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md),
- * [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS),
- * and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md)
+ * [Project governance](https://github.com/containerd/project/blob/main/GOVERNANCE.md),
+ * [Maintainers](https://github.com/containerd/project/blob/main/MAINTAINERS),
+ * and [Contributing guidelines](https://github.com/containerd/project/blob/main/CONTRIBUTING.md)
 
 information in our [`containerd/project`](https://github.com/containerd/project) repository.

+ 3 - 3
vendor/github.com/containerd/ttrpc/channel.go

@@ -19,11 +19,11 @@ package ttrpc
 import (
 	"bufio"
 	"encoding/binary"
+	"fmt"
 	"io"
 	"net"
 	"sync"
 
-	"github.com/pkg/errors"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -105,7 +105,7 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
 
 	if mh.Length > uint32(messageLengthMax) {
 		if _, err := ch.br.Discard(int(mh.Length)); err != nil {
-			return mh, nil, errors.Wrapf(err, "failed to discard after receiving oversized message")
+			return mh, nil, fmt.Errorf("failed to discard after receiving oversized message: %w", err)
 		}
 
 		return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax)
@@ -113,7 +113,7 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
 
 	p := ch.getmbuf(int(mh.Length))
 	if _, err := io.ReadFull(ch.br, p); err != nil {
-		return messageHeader{}, nil, errors.Wrapf(err, "failed reading message")
+		return messageHeader{}, nil, fmt.Errorf("failed reading message: %w", err)
 	}
 
 	return mh, p, nil

+ 118 - 77
vendor/github.com/containerd/ttrpc/client.go

@@ -18,6 +18,7 @@ package ttrpc
 
 import (
 	"context"
+	"errors"
 	"io"
 	"net"
 	"os"
@@ -27,7 +28,6 @@ import (
 	"time"
 
 	"github.com/gogo/protobuf/proto"
-	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -194,72 +194,131 @@ type message struct {
 	err error
 }
 
-type receiver struct {
-	wg       *sync.WaitGroup
-	messages chan *message
-	err      error
+// callMap provides access to a map of active calls, guarded by a mutex.
+type callMap struct {
+	m           sync.Mutex
+	activeCalls map[uint32]*callRequest
+	closeErr    error
 }
 
-func (r *receiver) run(ctx context.Context, c *channel) {
-	defer r.wg.Done()
+// newCallMap returns a new callMap with an empty set of active calls.
+func newCallMap() *callMap {
+	return &callMap{
+		activeCalls: make(map[uint32]*callRequest),
+	}
+}
 
-	for {
-		select {
-		case <-ctx.Done():
-			r.err = ctx.Err()
-			return
-		default:
-			mh, p, err := c.recv()
-			if err != nil {
-				_, ok := status.FromError(err)
-				if !ok {
-					// treat all errors that are not an rpc status as terminal.
-					// all others poison the connection.
-					r.err = filterCloseErr(err)
-					return
-				}
-			}
-			select {
-			case r.messages <- &message{
-				messageHeader: mh,
-				p:             p[:mh.Length],
-				err:           err,
-			}:
-			case <-ctx.Done():
-				r.err = ctx.Err()
-				return
-			}
-		}
+// set adds a call entry to the map with the given streamID key.
+func (cm *callMap) set(streamID uint32, cr *callRequest) error {
+	cm.m.Lock()
+	defer cm.m.Unlock()
+	if cm.closeErr != nil {
+		return cm.closeErr
 	}
+	cm.activeCalls[streamID] = cr
+	return nil
+}
+
+// get looks up the call entry for the given streamID key, then removes it
+// from the map and returns it.
+func (cm *callMap) get(streamID uint32) (cr *callRequest, ok bool, err error) {
+	cm.m.Lock()
+	defer cm.m.Unlock()
+	if cm.closeErr != nil {
+		return nil, false, cm.closeErr
+	}
+	cr, ok = cm.activeCalls[streamID]
+	if ok {
+		delete(cm.activeCalls, streamID)
+	}
+	return
+}
+
+// abort sends the given error to each active call, and clears the map.
+// Once abort has been called, any subsequent calls to the callMap will return the error passed to abort.
+func (cm *callMap) abort(err error) error {
+	cm.m.Lock()
+	defer cm.m.Unlock()
+	if cm.closeErr != nil {
+		return cm.closeErr
+	}
+	for streamID, call := range cm.activeCalls {
+		call.errs <- err
+		delete(cm.activeCalls, streamID)
+	}
+	cm.closeErr = err
+	return nil
 }
 
 func (c *Client) run() {
 	var (
-		streamID      uint32 = 1
-		waiters              = make(map[uint32]*callRequest)
-		calls                = c.calls
-		incoming             = make(chan *message)
-		receiversDone        = make(chan struct{})
-		wg            sync.WaitGroup
+		waiters      = newCallMap()
+		receiverDone = make(chan struct{})
 	)
 
-	// broadcast the shutdown error to the remaining waiters.
-	abortWaiters := func(wErr error) {
-		for _, waiter := range waiters {
-			waiter.errs <- wErr
+	// Sender goroutine
+	// Receives calls from dispatch, adds them to the set of active calls, and sends them
+	// to the server.
+	go func() {
+		var streamID uint32 = 1
+		for {
+			select {
+			case <-c.ctx.Done():
+				return
+			case call := <-c.calls:
+				id := streamID
+				streamID += 2 // enforce odd client initiated request ids
+				if err := waiters.set(id, call); err != nil {
+					call.errs <- err // errs is buffered so should not block.
+					continue
+				}
+				if err := c.send(id, messageTypeRequest, call.req); err != nil {
+					call.errs <- err // errs is buffered so should not block.
+					waiters.get(id)  // remove from waiters set
+				}
+			}
 		}
-	}
-	recv := &receiver{
-		wg:       &wg,
-		messages: incoming,
-	}
-	wg.Add(1)
+	}()
 
+	// Receiver goroutine
+	// Receives responses from the server, looks up the call info in the set of active calls,
+	// and notifies the caller of the response.
 	go func() {
-		wg.Wait()
-		close(receiversDone)
+		defer close(receiverDone)
+		for {
+			select {
+			case <-c.ctx.Done():
+				c.setError(c.ctx.Err())
+				return
+			default:
+				mh, p, err := c.channel.recv()
+				if err != nil {
+					_, ok := status.FromError(err)
+					if !ok {
+						// treat all errors that are not an rpc status as terminal.
+						// all others poison the connection.
+						c.setError(filterCloseErr(err))
+						return
+					}
+				}
+				msg := &message{
+					messageHeader: mh,
+					p:             p[:mh.Length],
+					err:           err,
+				}
+				call, ok, err := waiters.get(mh.StreamID)
+				if err != nil {
+					logrus.Errorf("ttrpc: failed to look up active call: %s", err)
+					continue
+				}
+				if !ok {
+					logrus.Errorf("ttrpc: received message for unknown channel %v", mh.StreamID)
+					continue
+				}
+				call.errs <- c.recv(call.resp, msg)
+			}
+		}
 	}()
-	go recv.run(c.ctx, c.channel)
 
 	defer func() {
 		c.conn.Close()
@@ -269,32 +328,14 @@ func (c *Client) run() {
 
 	for {
 		select {
-		case call := <-calls:
-			if err := c.send(streamID, messageTypeRequest, call.req); err != nil {
-				call.errs <- err
-				continue
-			}
-
-			waiters[streamID] = call
-			streamID += 2 // enforce odd client initiated request ids
-		case msg := <-incoming:
-			call, ok := waiters[msg.StreamID]
-			if !ok {
-				logrus.Errorf("ttrpc: received message for unknown channel %v", msg.StreamID)
-				continue
-			}
-
-			call.errs <- c.recv(call.resp, msg)
-			delete(waiters, msg.StreamID)
-		case <-receiversDone:
-			// all the receivers have exited
-			if recv.err != nil {
-				c.setError(recv.err)
-			}
+		case <-receiverDone:
+			// The receiver has exited.
 			// don't return out, let the close of the context trigger the abort of waiters
 			c.Close()
 		case <-c.ctx.Done():
-			abortWaiters(c.error())
+			// Abort all active calls. This will also prevent any new calls from being added
+			// to waiters.
+			waiters.abort(c.error())
 			return
 		}
 	}
@@ -347,7 +388,7 @@ func filterCloseErr(err error) error {
 		return nil
 	case err == io.EOF:
 		return ErrClosed
-	case errors.Cause(err) == io.EOF:
+	case errors.Is(err, io.EOF):
 		return ErrClosed
 	case strings.Contains(err.Error(), "use of closed network connection"):
 		return ErrClosed

+ 4 - 3
vendor/github.com/containerd/ttrpc/codec.go

@@ -17,8 +17,9 @@
 package ttrpc
 
 import (
+	"fmt"
+
 	"github.com/gogo/protobuf/proto"
-	"github.com/pkg/errors"
 )
 
 type codec struct{}
@@ -28,7 +29,7 @@ func (c codec) Marshal(msg interface{}) ([]byte, error) {
 	case proto.Message:
 		return proto.Marshal(v)
 	default:
-		return nil, errors.Errorf("ttrpc: cannot marshal unknown type: %T", msg)
+		return nil, fmt.Errorf("ttrpc: cannot marshal unknown type: %T", msg)
 	}
 }
 
@@ -37,6 +38,6 @@ func (c codec) Unmarshal(p []byte, msg interface{}) error {
 	case proto.Message:
 		return proto.Unmarshal(p, v)
 	default:
-		return errors.Errorf("ttrpc: cannot unmarshal into unknown type: %T", msg)
+		return fmt.Errorf("ttrpc: cannot unmarshal into unknown type: %T", msg)
 	}
 }

+ 1 - 1
vendor/github.com/containerd/ttrpc/config.go

@@ -16,7 +16,7 @@
 
 package ttrpc
 
-import "github.com/pkg/errors"
+import "errors"
 
 type serverConfig struct {
 	handshaker  Handshaker

+ 7 - 8
vendor/github.com/containerd/ttrpc/go.mod

@@ -3,12 +3,11 @@ module github.com/containerd/ttrpc
 go 1.13
 
 require (
-	github.com/gogo/protobuf v1.3.1
-	github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
-	github.com/pkg/errors v0.9.1
-	github.com/prometheus/procfs v0.0.0-20190522114515-bc1a522cf7b1
-	github.com/sirupsen/logrus v1.4.2
-	golang.org/x/sys v0.0.0-20200120151820-655fe14d7479
-	google.golang.org/genproto v0.0.0-20200117163144-32f20d992d24
-	google.golang.org/grpc v1.26.0
+	github.com/gogo/protobuf v1.3.2
+	github.com/prometheus/procfs v0.6.0
+	github.com/sirupsen/logrus v1.8.1
+	golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
+	google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63
+	google.golang.org/grpc v1.27.1
+	google.golang.org/protobuf v1.27.1
 )

+ 1 - 1
vendor/github.com/containerd/ttrpc/server.go

@@ -18,6 +18,7 @@ package ttrpc
 
 import (
 	"context"
+	"errors"
 	"io"
 	"math/rand"
 	"net"
@@ -25,7 +26,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"

+ 5 - 4
vendor/github.com/containerd/ttrpc/services.go

@@ -18,13 +18,14 @@ package ttrpc
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"io"
 	"os"
 	"path"
 	"unsafe"
 
 	"github.com/gogo/protobuf/proto"
-	"github.com/pkg/errors"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -51,7 +52,7 @@ func newServiceSet(interceptor UnaryServerInterceptor) *serviceSet {
 
 func (s *serviceSet) register(name string, methods map[string]Method) {
 	if _, ok := s.services[name]; ok {
-		panic(errors.Errorf("duplicate service %v registered", name))
+		panic(fmt.Errorf("duplicate service %v registered", name))
 	}
 
 	s.services[name] = ServiceDesc{
@@ -116,12 +117,12 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin
 func (s *serviceSet) resolve(service, method string) (Method, error) {
 	srv, ok := s.services[service]
 	if !ok {
-		return nil, status.Errorf(codes.NotFound, "service %v", service)
+		return nil, status.Errorf(codes.Unimplemented, "service %v", service)
 	}
 
 	mthd, ok := srv.Methods[method]
 	if !ok {
-		return nil, status.Errorf(codes.NotFound, "method %v", method)
+		return nil, status.Errorf(codes.Unimplemented, "method %v", method)
 	}
 
 	return mthd, nil

+ 8 - 7
vendor/github.com/containerd/ttrpc/unixcreds_linux.go

@@ -18,11 +18,12 @@ package ttrpc
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"net"
 	"os"
 	"syscall"
 
-	"github.com/pkg/errors"
 	"golang.org/x/sys/unix"
 )
 
@@ -31,12 +32,12 @@ type UnixCredentialsFunc func(*unix.Ucred) error
 func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) {
 	uc, err := requireUnixSocket(conn)
 	if err != nil {
-		return nil, nil, errors.Wrap(err, "ttrpc.UnixCredentialsFunc: require unix socket")
+		return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: require unix socket: %w", err)
 	}
 
 	rs, err := uc.SyscallConn()
 	if err != nil {
-		return nil, nil, errors.Wrap(err, "ttrpc.UnixCredentialsFunc: (net.UnixConn).SyscallConn failed")
+		return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: (net.UnixConn).SyscallConn failed: %w", err)
 	}
 	var (
 		ucred    *unix.Ucred
@@ -45,15 +46,15 @@ func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net
 	if err := rs.Control(func(fd uintptr) {
 		ucred, ucredErr = unix.GetsockoptUcred(int(fd), unix.SOL_SOCKET, unix.SO_PEERCRED)
 	}); err != nil {
-		return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: (*syscall.RawConn).Control failed")
+		return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: (*syscall.RawConn).Control failed: %w", err)
 	}
 
 	if ucredErr != nil {
-		return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials")
+		return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials: %w", err)
 	}
 
 	if err := fn(ucred); err != nil {
-		return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: credential check failed")
+		return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: credential check failed: %w", err)
 	}
 
 	return uc, ucred, nil
@@ -93,7 +94,7 @@ func requireRoot(ucred *unix.Ucred) error {
 
 func requireUidGid(ucred *unix.Ucred, uid, gid int) error {
 	if (uid != -1 && uint32(uid) != ucred.Uid) || (gid != -1 && uint32(gid) != ucred.Gid) {
-		return errors.Wrap(syscall.EPERM, "ttrpc: invalid credentials")
+		return fmt.Errorf("ttrpc: invalid credentials: %v", syscall.EPERM)
 	}
 	return nil
 }