vendor: github.com/containerd/ttrpc v1.2.2

full diff: https://github.com/containerd/ttrpc/compare/v1.1.1...v1.2.2

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2023-07-19 17:49:35 +02:00
parent 4f4dd2d995
commit 30916df3d3
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
26 changed files with 1848 additions and 470 deletions

View file

@ -130,7 +130,7 @@ require (
github.com/containerd/go-runc v1.1.0 // indirect
github.com/containerd/nydus-snapshotter v0.3.1 // indirect
github.com/containerd/stargz-snapshotter/estargz v0.13.0 // indirect
github.com/containerd/ttrpc v1.1.1 // indirect
github.com/containerd/ttrpc v1.2.2 // indirect
github.com/containerd/typeurl v1.0.2 // indirect
github.com/containernetworking/cni v1.1.2 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect

View file

@ -411,8 +411,8 @@ github.com/containerd/ttrpc v0.0.0-20191028202541-4f1b8fe65a5c/go.mod h1:LPm1u0x
github.com/containerd/ttrpc v1.0.1/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y=
github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y=
github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ=
github.com/containerd/ttrpc v1.1.1 h1:NoRHS/z8UiHhpY1w0xcOqoJDGf2DHyzXrF0H4l5AE8c=
github.com/containerd/ttrpc v1.1.1/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ=
github.com/containerd/ttrpc v1.2.2 h1:9vqZr0pxwOF5koz6N0N3kJ0zDHokrcPxIR/ZR2YFtOs=
github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak=
github.com/containerd/typeurl v0.0.0-20180627222232-a93fcdb778cd/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc=
github.com/containerd/typeurl v0.0.0-20190911142611-5eb25027c9fd/go.mod h1:GeKYzf2pQcqv7tJ0AoCuuhtnqhva5LNU3U+OyKxxJpk=
github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg=
@ -1787,6 +1787,7 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=

1
vendor/github.com/containerd/ttrpc/.gitattributes generated vendored Normal file
View file

@ -0,0 +1 @@
*.go text eol=lf

View file

@ -1,4 +1,5 @@
# Binaries for programs and plugins
/bin/
*.exe
*.dll
*.so
@ -9,3 +10,4 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
coverage.txt

52
vendor/github.com/containerd/ttrpc/.golangci.yml generated vendored Normal file
View file

@ -0,0 +1,52 @@
linters:
enable:
- staticcheck
- unconvert
- gofmt
- goimports
- revive
- ineffassign
- vet
- unused
- misspell
disable:
- errcheck
linters-settings:
revive:
ignore-generated-headers: true
rules:
- name: blank-imports
- name: context-as-argument
- name: context-keys-type
- name: dot-imports
- name: error-return
- name: error-strings
- name: error-naming
- name: exported
- name: if-return
- name: increment-decrement
- name: var-naming
arguments: [["UID", "GID"], []]
- name: var-declaration
- name: package-comments
- name: range
- name: receiver-naming
- name: time-naming
- name: unexported-return
- name: indent-error-flow
- name: errorf
- name: empty-block
- name: superfluous-else
- name: unused-parameter
- name: unreachable-code
- name: redefines-builtin-id
issues:
include:
- EXC0002
run:
timeout: 8m
skip-dirs:
- example

180
vendor/github.com/containerd/ttrpc/Makefile generated vendored Normal file
View file

@ -0,0 +1,180 @@
# Copyright The containerd Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Go command to use for build
GO ?= go
INSTALL ?= install
# Root directory of the project (absolute path).
ROOTDIR=$(dir $(abspath $(lastword $(MAKEFILE_LIST))))
WHALE = "🇩"
ONI = "👹"
# Project binaries.
COMMANDS=protoc-gen-go-ttrpc protoc-gen-gogottrpc
ifdef BUILDTAGS
GO_BUILDTAGS = ${BUILDTAGS}
endif
GO_BUILDTAGS ?=
GO_TAGS=$(if $(GO_BUILDTAGS),-tags "$(strip $(GO_BUILDTAGS))",)
# Project packages.
PACKAGES=$(shell $(GO) list ${GO_TAGS} ./... | grep -v /example)
TESTPACKAGES=$(shell $(GO) list ${GO_TAGS} ./... | grep -v /cmd | grep -v /integration | grep -v /example)
BINPACKAGES=$(addprefix ./cmd/,$(COMMANDS))
#Replaces ":" (*nix), ";" (windows) with newline for easy parsing
GOPATHS=$(shell echo ${GOPATH} | tr ":" "\n" | tr ";" "\n")
TESTFLAGS_RACE=
GO_BUILD_FLAGS=
# See Golang issue re: '-trimpath': https://github.com/golang/go/issues/13809
GO_GCFLAGS=$(shell \
set -- ${GOPATHS}; \
echo "-gcflags=-trimpath=$${1}/src"; \
)
BINARIES=$(addprefix bin/,$(COMMANDS))
# Flags passed to `go test`
TESTFLAGS ?= $(TESTFLAGS_RACE) $(EXTRA_TESTFLAGS)
TESTFLAGS_PARALLEL ?= 8
# Use this to replace `go test` with, for instance, `gotestsum`
GOTEST ?= $(GO) test
.PHONY: clean all AUTHORS build binaries test integration generate protos check-protos coverage ci check help install vendor install-protobuf install-protobuild
.DEFAULT: default
# Forcibly set the default goal to all, in case an include above brought in a rule definition.
.DEFAULT_GOAL := all
all: binaries
check: proto-fmt ## run all linters
@echo "$(WHALE) $@"
GOGC=75 golangci-lint run
ci: check binaries check-protos coverage # coverage-integration ## to be used by the CI
AUTHORS: .mailmap .git/HEAD
git log --format='%aN <%aE>' | sort -fu > $@
generate: protos
@echo "$(WHALE) $@"
@PATH="${ROOTDIR}/bin:${PATH}" $(GO) generate -x ${PACKAGES}
protos: bin/protoc-gen-gogottrpc bin/protoc-gen-go-ttrpc ## generate protobuf
@echo "$(WHALE) $@"
@(PATH="${ROOTDIR}/bin:${PATH}" protobuild --quiet ${PACKAGES})
check-protos: protos ## check if protobufs needs to be generated again
@echo "$(WHALE) $@"
@test -z "$$(git status --short | grep ".pb.go" | tee /dev/stderr)" || \
((git diff | cat) && \
(echo "$(ONI) please run 'make protos' when making changes to proto files" && false))
check-api-descriptors: protos ## check that protobuf changes aren't present.
@echo "$(WHALE) $@"
@test -z "$$(git status --short | grep ".pb.txt" | tee /dev/stderr)" || \
((git diff $$(find . -name '*.pb.txt') | cat) && \
(echo "$(ONI) please run 'make protos' when making changes to proto files and check-in the generated descriptor file changes" && false))
proto-fmt: ## check format of proto files
@echo "$(WHALE) $@"
@test -z "$$(find . -name '*.proto' -type f -exec grep -Hn -e "^ " {} \; | tee /dev/stderr)" || \
(echo "$(ONI) please indent proto files with tabs only" && false)
@test -z "$$(find . -name '*.proto' -type f -exec grep -Hn "Meta meta = " {} \; | grep -v '(gogoproto.nullable) = false' | tee /dev/stderr)" || \
(echo "$(ONI) meta fields in proto files must have option (gogoproto.nullable) = false" && false)
build: ## build the go packages
@echo "$(WHALE) $@"
@$(GO) build ${DEBUG_GO_GCFLAGS} ${GO_GCFLAGS} ${GO_BUILD_FLAGS} ${EXTRA_FLAGS} ${PACKAGES}
test: ## run tests, except integration tests and tests that require root
@echo "$(WHALE) $@"
@$(GOTEST) ${TESTFLAGS} ${TESTPACKAGES}
integration: ## run integration tests
@echo "$(WHALE) $@"
@cd "${ROOTDIR}/integration" && $(GOTEST) -v ${TESTFLAGS} -parallel ${TESTFLAGS_PARALLEL} .
benchmark: ## run benchmarks tests
@echo "$(WHALE) $@"
@$(GO) test ${TESTFLAGS} -bench . -run Benchmark
FORCE:
define BUILD_BINARY
@echo "$(WHALE) $@"
@$(GO) build ${DEBUG_GO_GCFLAGS} ${GO_GCFLAGS} ${GO_BUILD_FLAGS} -o $@ ${GO_TAGS} ./$<
endef
# Build a binary from a cmd.
bin/%: cmd/% FORCE
$(call BUILD_BINARY)
binaries: $(BINARIES) ## build binaries
@echo "$(WHALE) $@"
clean: ## clean up binaries
@echo "$(WHALE) $@"
@rm -f $(BINARIES)
install: ## install binaries
@echo "$(WHALE) $@ $(BINPACKAGES)"
@$(GO) install $(BINPACKAGES)
install-protobuf:
@echo "$(WHALE) $@"
@script/install-protobuf
install-protobuild:
@echo "$(WHALE) $@"
@$(GO) install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1
@$(GO) install github.com/containerd/protobuild@14832ccc41429f5c4f81028e5af08aa233a219cf
coverage: ## generate coverprofiles from the unit tests, except tests that require root
@echo "$(WHALE) $@"
@rm -f coverage.txt
@$(GO) test ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null
@( for pkg in ${PACKAGES}; do \
$(GO) test ${TESTFLAGS} \
-cover \
-coverprofile=profile.out \
-covermode=atomic $$pkg || exit; \
if [ -f profile.out ]; then \
cat profile.out >> coverage.txt; \
rm profile.out; \
fi; \
done )
vendor: ## ensure all the go.mod/go.sum files are up-to-date
@echo "$(WHALE) $@"
@$(GO) mod tidy
@$(GO) mod verify
verify-vendor: ## verify if all the go.mod/go.sum files are up-to-date
@echo "$(WHALE) $@"
@$(GO) mod tidy
@$(GO) mod verify
@test -z "$$(git status --short | grep "go.sum" | tee /dev/stderr)" || \
((git diff | cat) && \
(echo "$(ONI) make sure to checkin changes after go mod tidy" && false))
help: ## this help
@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort

240
vendor/github.com/containerd/ttrpc/PROTOCOL.md generated vendored Normal file
View file

@ -0,0 +1,240 @@
# Protocol Specification
The ttrpc protocol is client/server protocol to support multiple request streams
over a single connection with lightweight framing. The client represents the
process which initiated the underlying connection and the server is the process
which accepted the connection. The protocol is currently defined as
asymmetrical, with clients sending requests and servers sending responses. Both
clients and servers are able to send stream data. The roles are also used in
determining the stream identifiers, with client initiated streams using odd
number identifiers and server initiated using even number. The protocol may be
extended in the future to support server initiated streams, that is not
supported in the latest version.
## Purpose
The ttrpc protocol is designed to be lightweight and optimized for low latency
and reliable connections between processes on the same host. The protocol does
not include features for handling unreliable connections such as handshakes,
resets, pings, or flow control. The protocol is designed to make low-overhead
implementations as simple as possible. It is not intended as a suitable
replacement for HTTP2/3 over the network.
## Message Frame
Each Message Frame consists of a 10-byte message header followed
by message data. The data length and stream ID are both big-endian
4-byte unsigned integers. The message type is an unsigned 1-byte
integer. The flags are also an unsigned 1-byte integer and
use is defined by the message type.
+---------------------------------------------------------------+
| Data Length (32) |
+---------------------------------------------------------------+
| Stream ID (32) |
+---------------+-----------------------------------------------+
| Msg Type (8) |
+---------------+
| Flags (8) |
+---------------+-----------------------------------------------+
| Data (*) |
+---------------------------------------------------------------+
The Data Length field represents the number of bytes in the Data field. The
total frame size will always be Data Length + 10 bytes. The maximum data length
is 4MB and any larger size should be rejected. Due to the maximum data size
being less than 16MB, the first frame byte should always be zero. This first
byte should be considered reserved for future use.
The Stream ID must be odd for client initiated streams and even for server
initiated streams. Server initiated streams are not currently supported.
## Mesage Types
| Message Type | Name | Description |
|--------------|----------|----------------------------------|
| 0x01 | Request | Initiates stream |
| 0x02 | Response | Final stream data and terminates |
| 0x03 | Data | Stream data |
### Request
The request message is used to initiate stream and send along request data for
properly routing and handling the stream. The stream may indicate unary without
any inbound or outbound stream data with only a response is expected on the
stream. The request may also indicate the stream is still open for more data and
no response is expected until data is finished. If the remote indicates the
stream is closed, the request may be considered non-unary but without anymore
stream data sent. In the case of `remote closed`, the remote still expects to
receive a response or stream data. For compatibility with non streaming clients,
a request with empty flags indicates a unary request.
#### Request Flags
| Flag | Name | Description |
|------|-----------------|--------------------------------------------------|
| 0x01 | `remote closed` | Non-unary, but no more data expected from remote |
| 0x02 | `remote open` | Non-unary, remote is still sending data |
### Response
The response message is used to end a stream with data, an empty response, or
an error. A response message is the only expected message after a unary request.
A non-unary request does not require a response message if the server is sending
back stream data. A non-unary stream may return a single response message but no
other stream data may follow.
#### Response Flags
No response flags are defined at this time, flags should be empty.
### Data
The data message is used to send data on an already initialized stream. Either
client or server may send data. A data message is not allowed on a unary stream.
A data message should not be sent after indicating `remote closed` to the peer.
The last data message on a stream must set the `remote closed` flag.
The `no data` flag is used to indicate that the data message does not include
any data. This is normally used with the `remote closed` flag to indicate the
stream is now closed without transmitting any data. Since ttrpc normally
transmits a single object per message, a zero length data message may be
interpreted as an empty object. For example, transmitting the number zero as a
protobuf message ends up with a data length of zero, but the message is still
considered data and should be processed.
#### Data Flags
| Flag | Name | Description |
|------|-----------------|-----------------------------------|
| 0x01 | `remote closed` | No more data expected from remote |
| 0x04 | `no data` | This message does not have data |
## Streaming
All ttrpc requests use streams to transfer data. Unary streams will only have
two messages sent per stream, a request from a client and a response from the
server. Non-unary streams, however, may send any numbers of messages from the
client and the server. This makes stream management more complicated than unary
streams since both client and server need to track additional state. To keep
this management as simple as possible, ttrpc minimizes the number of states and
uses two flags instead of control frames. Each stream has two states while a
stream is still alive: `local closed` and `remote closed`. Each peer considers
local and remote from their own perspective and sets flags from the other peer's
perspective. For example, if a client sends a data frame with the
`remote closed` flag, that is indicating that the client is now `local closed`
and the server will be `remote closed`. A unary operation does not need to send
these flags since each received message always indicates `remote closed`. Once a
peer is both `local closed` and `remote closed`, the stream is considered
finished and may be cleaned up.
Due to the asymmetric nature of the current protocol, a client should
always be in the `local closed` state before `remote closed` and a server should
always be in the `remote closed` state before `local closed`. This happens
because the client is always initiating requests and a client always expects a
final response back from a server to indicate the initiated request has been
fulfilled. This may mean server sends a final empty response to finish a stream
even after it has already completed sending data before the client.
### Unary State Diagram
+--------+ +--------+
| Client | | Server |
+---+----+ +----+---+
| +---------+ |
local >---------------+ Request +--------------------> remote
closed | +---------+ | closed
| |
| +----------+ |
finished <--------------+ Response +--------------------< finished
| +----------+ |
| |
### Non-Unary State Diagrams
RC: `remote closed` flag
RO: `remote open` flag
+--------+ +--------+
| Client | | Server |
+---+----+ +----+---+
| +--------------+ |
>-------------+ Request [RO] +----------------->
| +--------------+ |
| |
| +------+ |
>-----------------+ Data +--------------------->
| +------+ |
| |
| +-----------+ |
local >---------------+ Data [RC] +------------------> remote
closed | +-----------+ | closed
| |
| +----------+ |
finished <--------------+ Response +--------------------< finished
| +----------+ |
| |
+--------+ +--------+
| Client | | Server |
+---+----+ +----+---+
| +--------------+ |
local >-------------+ Request [RC] +-----------------> remote
closed | +--------------+ | closed
| |
| +------+ |
<-----------------+ Data +---------------------<
| +------+ |
| |
| +-----------+ |
finished <---------------+ Data [RC] +------------------< finished
| +-----------+ |
| |
+--------+ +--------+
| Client | | Server |
+---+----+ +----+---+
| +--------------+ |
>-------------+ Request [RO] +----------------->
| +--------------+ |
| |
| +------+ |
>-----------------+ Data +--------------------->
| +------+ |
| |
| +------+ |
<-----------------+ Data +---------------------<
| +------+ |
| |
| +------+ |
>-----------------+ Data +--------------------->
| +------+ |
| |
| +-----------+ |
local >---------------+ Data [RC] +------------------> remote
closed | +-----------+ | closed
| |
| +------+ |
<-----------------+ Data +---------------------<
| +------+ |
| |
| +-----------+ |
finished <---------------+ Data [RC] +------------------< finished
| +-----------+ |
| |
## RPC
While this protocol is defined primarily to support Remote Procedure Calls, the
protocol does not define the request and response types beyond the messages
defined in the protocol. The implementation provides a default protobuf
definition of request and response which may be used for cross language rpc.
All implementations should at least define a request type which support
routing by procedure name and a response type which supports call status.
## Version History
| Version | Features |
|---------|---------------------|
| 1.0 | Unary requests only |
| 1.2 | Streaming support |

28
vendor/github.com/containerd/ttrpc/Protobuild.toml generated vendored Normal file
View file

@ -0,0 +1,28 @@
version = "2"
generators = ["go"]
# Control protoc include paths. Below are usually some good defaults, but feel
# free to try it without them if it works for your project.
[includes]
# Include paths that will be added before all others. Typically, you want to
# treat the root of the project as an include, but this may not be necessary.
before = ["."]
# Paths that will be added untouched to the end of the includes. We use
# `/usr/local/include` to pickup the common install location of protobuf.
# This is the default.
after = ["/usr/local/include"]
# This section maps protobuf imports to Go packages. These will become
# `-M` directives in the call to the go protobuf generator.
[packages]
"google/protobuf/any.proto" = "github.com/gogo/protobuf/types"
"proto/status.proto" = "google.golang.org/genproto/googleapis/rpc/status"
[[overrides]]
# enable ttrpc and disable fieldpath and grpc for the shim
prefixes = ["github.com/containerd/ttrpc/integration/streaming"]
generators = ["go", "go-ttrpc"]
[overrides.parameters.go-ttrpc]
prefix = "TTRPC"

View file

@ -1,7 +1,6 @@
# ttrpc
[![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI)
[![codecov](https://codecov.io/gh/containerd/ttrpc/branch/main/graph/badge.svg)](https://codecov.io/gh/containerd/ttrpc)
GRPC for low-memory environments.
@ -20,13 +19,17 @@ Please note that while this project supports generating either end of the
protocol, the generated service definitions will be incompatible with regular
GRPC services, as they do not speak the same protocol.
# Protocol
See the [protocol specification](./PROTOCOL.md).
# Usage
Create a gogo vanity binary (see
[`cmd/protoc-gen-gogottrpc/main.go`](cmd/protoc-gen-gogottrpc/main.go) for an
example with the ttrpc plugin enabled.
It's recommended to use [`protobuild`](https://github.com//stevvooe/protobuild)
It's recommended to use [`protobuild`](https://github.com/containerd/protobuild)
to build the protobufs for this project, but this will work with protoc
directly, if required.
@ -37,13 +40,11 @@ directly, if required.
- The client and server interface are identical whereas in GRPC there is a
client and server interface that are different.
- The Go stdlib context package is used instead.
- No support for streams yet.
# Status
TODO:
- [ ] Document protocol layout
- [ ] Add testing under concurrent load to ensure
- [ ] Verify connection error handling

View file

@ -38,6 +38,26 @@ type messageType uint8
const (
messageTypeRequest messageType = 0x1
messageTypeResponse messageType = 0x2
messageTypeData messageType = 0x3
)
func (mt messageType) String() string {
switch mt {
case messageTypeRequest:
return "request"
case messageTypeResponse:
return "response"
case messageTypeData:
return "data"
default:
return "unknown"
}
}
const (
flagRemoteClosed uint8 = 0x1
flagRemoteOpen uint8 = 0x2
flagNoData uint8 = 0x4
)
// messageHeader represents the fixed-length message header of 10 bytes sent
@ -46,7 +66,7 @@ type messageHeader struct {
Length uint32 // length excluding this header. b[:4]
StreamID uint32 // identifies which request stream message is a part of. b[4:8]
Type messageType // message type b[8]
Flags uint8 // reserved b[9]
Flags uint8 // type specific flags b[9]
}
func readMessageHeader(p []byte, r io.Reader) (messageHeader, error) {
@ -111,22 +131,31 @@ func (ch *channel) recv() (messageHeader, []byte, error) {
return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax)
}
p := ch.getmbuf(int(mh.Length))
if _, err := io.ReadFull(ch.br, p); err != nil {
return messageHeader{}, nil, fmt.Errorf("failed reading message: %w", err)
var p []byte
if mh.Length > 0 {
p = ch.getmbuf(int(mh.Length))
if _, err := io.ReadFull(ch.br, p); err != nil {
return messageHeader{}, nil, fmt.Errorf("failed reading message: %w", err)
}
}
return mh, p, nil
}
func (ch *channel) send(streamID uint32, t messageType, p []byte) error {
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t}); err != nil {
func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error {
// TODO: Error on send rather than on recv
//if len(p) > messageLengthMax {
// return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax)
//}
if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil {
return err
}
_, err := ch.bw.Write(p)
if err != nil {
return err
if len(p) > 0 {
_, err := ch.bw.Write(p)
if err != nil {
return err
}
}
return ch.bw.Flush()

View file

@ -19,30 +19,30 @@ package ttrpc
import (
"context"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"syscall"
"time"
"github.com/gogo/protobuf/proto"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// ErrClosed is returned by client methods when the underlying connection is
// closed.
var ErrClosed = errors.New("ttrpc: closed")
// Client for a ttrpc server
type Client struct {
codec codec
conn net.Conn
channel *channel
calls chan *callRequest
streamLock sync.RWMutex
streams map[streamID]*stream
nextStreamID streamID
sendLock sync.Mutex
ctx context.Context
closed func()
@ -51,8 +51,6 @@ type Client struct {
userCloseFunc func()
userCloseWaitCh chan struct{}
errOnce sync.Once
err error
interceptor UnaryClientInterceptor
}
@ -73,13 +71,16 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
}
}
// NewClient creates a new ttrpc client using the given connection
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx, cancel := context.WithCancel(context.Background())
channel := newChannel(conn)
c := &Client{
codec: codec{},
conn: conn,
channel: newChannel(conn),
calls: make(chan *callRequest),
channel: channel,
streams: make(map[streamID]*stream),
nextStreamID: 1,
closed: cancel,
ctx: ctx,
userCloseFunc: func() {},
@ -95,13 +96,13 @@ func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
return c
}
type callRequest struct {
ctx context.Context
req *Request
resp *Response // response will be written back here
errs chan error // error written here on completion
func (c *Client) send(sid uint32, mt messageType, flags uint8, b []byte) error {
c.sendLock.Lock()
defer c.sendLock.Unlock()
return c.channel.send(sid, mt, flags, b)
}
// Call makes a unary request and returns with response
func (c *Client) Call(ctx context.Context, service, method string, req, resp interface{}) error {
payload, err := c.codec.Marshal(req)
if err != nil {
@ -113,6 +114,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
Service: service,
Method: method,
Payload: payload,
// TODO: metadata from context
}
cresp = &Response{}
@ -123,7 +125,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
}
if dl, ok := ctx.Deadline(); ok {
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
creq.TimeoutNano = time.Until(dl).Nanoseconds()
}
info := &UnaryClientInfo{
@ -143,36 +145,143 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
return nil
}
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
errs := make(chan error, 1)
call := &callRequest{
ctx: ctx,
req: req,
resp: resp,
errs: errs,
}
select {
case <-ctx.Done():
return ctx.Err()
case c.calls <- call:
case <-c.ctx.Done():
return c.error()
}
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errs:
return filterCloseErr(err)
case <-c.ctx.Done():
return c.error()
}
// StreamDesc describes the stream properties, whether the stream has
// a streaming client, a streaming server, or both
type StreamDesc struct {
StreamingClient bool
StreamingServer bool
}
// ClientStream is used to send or recv messages on the underlying stream
type ClientStream interface {
CloseSend() error
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
}
type clientStream struct {
ctx context.Context
s *stream
c *Client
desc *StreamDesc
localClosed bool
remoteClosed bool
}
func (cs *clientStream) CloseSend() error {
if !cs.desc.StreamingClient {
return fmt.Errorf("%w: cannot close non-streaming client", ErrProtocol)
}
if cs.localClosed {
return ErrStreamClosed
}
err := cs.s.send(messageTypeData, flagRemoteClosed|flagNoData, nil)
if err != nil {
return filterCloseErr(err)
}
cs.localClosed = true
return nil
}
func (cs *clientStream) SendMsg(m interface{}) error {
if !cs.desc.StreamingClient {
return fmt.Errorf("%w: cannot send data from non-streaming client", ErrProtocol)
}
if cs.localClosed {
return ErrStreamClosed
}
var (
payload []byte
err error
)
if m != nil {
payload, err = cs.c.codec.Marshal(m)
if err != nil {
return err
}
}
err = cs.s.send(messageTypeData, 0, payload)
if err != nil {
return filterCloseErr(err)
}
return nil
}
func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.remoteClosed {
return io.EOF
}
var msg *streamMessage
select {
case <-cs.ctx.Done():
return cs.ctx.Err()
case <-cs.s.recvClose:
// If recv has a pending message, process that first
select {
case msg = <-cs.s.recv:
default:
return cs.s.recvErr
}
case msg = <-cs.s.recv:
}
if msg.header.Type == messageTypeResponse {
resp := &Response{}
err := proto.Unmarshal(msg.payload[:msg.header.Length], resp)
// return the payload buffer for reuse
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil {
return err
}
if resp.Status != nil && resp.Status.Code != int32(codes.OK) {
return status.ErrorProto(resp.Status)
}
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return nil
} else if msg.header.Type == messageTypeData {
if !cs.desc.StreamingServer {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol)
}
if msg.header.Flags&flagRemoteClosed == flagRemoteClosed {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
if msg.header.Flags&flagNoData == flagNoData {
return io.EOF
}
}
err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m)
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
return nil
}
return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// Close closes the ttrpc connection and underlying connection
func (c *Client) Close() error {
c.closeOnce.Do(func() {
c.closed()
c.conn.Close()
})
return nil
}
@ -188,194 +297,105 @@ func (c *Client) UserOnCloseWait(ctx context.Context) error {
}
}
type message struct {
messageHeader
p []byte
err error
}
// callMap provides access to a map of active calls, guarded by a mutex.
type callMap struct {
m sync.Mutex
activeCalls map[uint32]*callRequest
closeErr error
}
// newCallMap returns a new callMap with an empty set of active calls.
func newCallMap() *callMap {
return &callMap{
activeCalls: make(map[uint32]*callRequest),
}
}
// set adds a call entry to the map with the given streamID key.
func (cm *callMap) set(streamID uint32, cr *callRequest) error {
cm.m.Lock()
defer cm.m.Unlock()
if cm.closeErr != nil {
return cm.closeErr
}
cm.activeCalls[streamID] = cr
return nil
}
// get looks up the call entry for the given streamID key, then removes it
// from the map and returns it.
func (cm *callMap) get(streamID uint32) (cr *callRequest, ok bool, err error) {
cm.m.Lock()
defer cm.m.Unlock()
if cm.closeErr != nil {
return nil, false, cm.closeErr
}
cr, ok = cm.activeCalls[streamID]
if ok {
delete(cm.activeCalls, streamID)
}
return
}
// abort sends the given error to each active call, and clears the map.
// Once abort has been called, any subsequent calls to the callMap will return the error passed to abort.
func (cm *callMap) abort(err error) error {
cm.m.Lock()
defer cm.m.Unlock()
if cm.closeErr != nil {
return cm.closeErr
}
for streamID, call := range cm.activeCalls {
call.errs <- err
delete(cm.activeCalls, streamID)
}
cm.closeErr = err
return nil
}
func (c *Client) run() {
var (
waiters = newCallMap()
receiverDone = make(chan struct{})
)
err := c.receiveLoop()
c.Close()
c.cleanupStreams(err)
// Sender goroutine
// Receives calls from dispatch, adds them to the set of active calls, and sends them
// to the server.
go func() {
var streamID uint32 = 1
for {
select {
case <-c.ctx.Done():
return
case call := <-c.calls:
id := streamID
streamID += 2 // enforce odd client initiated request ids
if err := waiters.set(id, call); err != nil {
call.errs <- err // errs is buffered so should not block.
continue
}
if err := c.send(id, messageTypeRequest, call.req); err != nil {
call.errs <- err // errs is buffered so should not block.
waiters.get(id) // remove from waiters set
}
}
}
}()
// Receiver goroutine
// Receives responses from the server, looks up the call info in the set of active calls,
// and notifies the caller of the response.
go func() {
defer close(receiverDone)
for {
select {
case <-c.ctx.Done():
c.setError(c.ctx.Err())
return
default:
mh, p, err := c.channel.recv()
if err != nil {
_, ok := status.FromError(err)
if !ok {
// treat all errors that are not an rpc status as terminal.
// all others poison the connection.
c.setError(filterCloseErr(err))
return
}
}
msg := &message{
messageHeader: mh,
p: p[:mh.Length],
err: err,
}
call, ok, err := waiters.get(mh.StreamID)
if err != nil {
logrus.Errorf("ttrpc: failed to look up active call: %s", err)
continue
}
if !ok {
logrus.Errorf("ttrpc: received message for unknown channel %v", mh.StreamID)
continue
}
call.errs <- c.recv(call.resp, msg)
}
}
}()
defer func() {
c.conn.Close()
c.userCloseFunc()
close(c.userCloseWaitCh)
}()
c.userCloseFunc()
close(c.userCloseWaitCh)
}
func (c *Client) receiveLoop() error {
for {
select {
case <-receiverDone:
// The receiver has exited.
// don't return out, let the close of the context trigger the abort of waiters
c.Close()
case <-c.ctx.Done():
// Abort all active calls. This will also prevent any new calls from being added
// to waiters.
waiters.abort(c.error())
return
return ErrClosed
default:
var (
msg = &streamMessage{}
err error
)
msg.header, msg.payload, err = c.channel.recv()
if err != nil {
_, ok := status.FromError(err)
if !ok {
// treat all errors that are not an rpc status as terminal.
// all others poison the connection.
return filterCloseErr(err)
}
}
sid := streamID(msg.header.StreamID)
s := c.getStream(sid)
if s == nil {
logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream")
continue
}
if err != nil {
s.closeWithError(err)
} else {
if err := s.receive(c.ctx, msg); err != nil {
logrus.WithError(err).WithField("stream", sid).Errorf("ttrpc: failed to handle message")
}
}
}
}
}
func (c *Client) error() error {
c.errOnce.Do(func() {
if c.err == nil {
c.err = ErrClosed
}
})
return c.err
}
// createStream creates a new stream and registers it with the client
// Introduce stream types for multiple or single response
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
c.streamLock.Lock()
func (c *Client) setError(err error) {
c.errOnce.Do(func() {
c.err = err
})
}
func (c *Client) send(streamID uint32, mtype messageType, msg interface{}) error {
p, err := c.codec.Marshal(msg)
if err != nil {
return err
// Check if closed since lock acquired to prevent adding
// anything after cleanup completes
select {
case <-c.ctx.Done():
c.streamLock.Unlock()
return nil, ErrClosed
default:
}
return c.channel.send(streamID, mtype, p)
// Stream ID should be allocated at same time
s := newStream(c.nextStreamID, c)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2
c.sendLock.Lock()
defer c.sendLock.Unlock()
c.streamLock.Unlock()
if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil {
return s, filterCloseErr(err)
}
return s, nil
}
func (c *Client) recv(resp *Response, msg *message) error {
if msg.err != nil {
return msg.err
}
func (c *Client) deleteStream(s *stream) {
c.streamLock.Lock()
delete(c.streams, s.id)
c.streamLock.Unlock()
s.closeWithError(nil)
}
if msg.Type != messageTypeResponse {
return errors.New("unknown message type received")
}
func (c *Client) getStream(sid streamID) *stream {
c.streamLock.RLock()
s := c.streams[sid]
c.streamLock.RUnlock()
return s
}
defer c.channel.putmbuf(msg.p)
return proto.Unmarshal(msg.p, resp)
func (c *Client) cleanupStreams(err error) {
c.streamLock.Lock()
defer c.streamLock.Unlock()
for sid, s := range c.streams {
s.closeWithError(err)
delete(c.streams, sid)
}
}
// filterCloseErr rewrites EOF and EPIPE errors to ErrClosed. Use when
@ -388,6 +408,8 @@ func filterCloseErr(err error) error {
return nil
case err == io.EOF:
return ErrClosed
case errors.Is(err, io.ErrClosedPipe):
return ErrClosed
case errors.Is(err, io.EOF):
return ErrClosed
case strings.Contains(err.Error(), "use of closed network connection"):
@ -395,11 +417,9 @@ func filterCloseErr(err error) error {
default:
// if we have an epipe on a write or econnreset on a read , we cast to errclosed
var oerr *net.OpError
if errors.As(err, &oerr) && (oerr.Op == "write" || oerr.Op == "read") {
serr, sok := oerr.Err.(*os.SyscallError)
if sok && ((serr.Err == syscall.EPIPE && oerr.Op == "write") ||
(serr.Err == syscall.ECONNRESET && oerr.Op == "read")) {
if errors.As(err, &oerr) {
if (oerr.Op == "write" && errors.Is(err, syscall.EPIPE)) ||
(oerr.Op == "read" && errors.Is(err, syscall.ECONNRESET)) {
return ErrClosed
}
}
@ -407,3 +427,86 @@ func filterCloseErr(err error) error {
return err
}
// NewStream creates a new stream with the given stream descriptor to the
// specified service and method. If not a streaming client, the request object
// may be provided.
func (c *Client) NewStream(ctx context.Context, desc *StreamDesc, service, method string, req interface{}) (ClientStream, error) {
var payload []byte
if req != nil {
var err error
payload, err = c.codec.Marshal(req)
if err != nil {
return nil, err
}
}
request := &Request{
Service: service,
Method: method,
Payload: payload,
// TODO: metadata from context
}
p, err := c.codec.Marshal(request)
if err != nil {
return nil, err
}
var flags uint8
if desc.StreamingClient {
flags = flagRemoteOpen
} else {
flags = flagRemoteClosed
}
s, err := c.createStream(flags, p)
if err != nil {
return nil, err
}
return &clientStream{
ctx: ctx,
s: s,
c: c,
desc: desc,
}, nil
}
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
p, err := c.codec.Marshal(req)
if err != nil {
return err
}
s, err := c.createStream(0, p)
if err != nil {
return err
}
defer c.deleteStream(s)
var msg *streamMessage
select {
case <-ctx.Done():
return ctx.Err()
case <-c.ctx.Done():
return ErrClosed
case <-s.recvClose:
// If recv has a pending message, process that first
select {
case msg = <-s.recv:
default:
return s.recvErr
}
case msg = <-s.recv:
}
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
}

View file

@ -19,7 +19,7 @@ package ttrpc
import (
"fmt"
"github.com/gogo/protobuf/proto"
"google.golang.org/protobuf/proto"
)
type codec struct{}

23
vendor/github.com/containerd/ttrpc/doc.go generated vendored Normal file
View file

@ -0,0 +1,23 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
package ttrpc defines and implements a low level simple transfer protocol
optimized for low latency and reliable connections between processes on the same
host. The protocol uses simple framing for sending requests, responses, and data
using multiple streams.
*/
package ttrpc

34
vendor/github.com/containerd/ttrpc/errors.go generated vendored Normal file
View file

@ -0,0 +1,34 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
import "errors"
var (
// ErrProtocol is a general error in the handling the protocol.
ErrProtocol = errors.New("protocol error")
// ErrClosed is returned by client methods when the underlying connection is
// closed.
ErrClosed = errors.New("ttrpc: closed")
// ErrServerClosed is returned when the Server has closed its connection.
ErrServerClosed = errors.New("ttrpc: server closed")
// ErrStreamClosed is when the streaming connection is closed.
ErrStreamClosed = errors.New("ttrpc: stream closed")
)

View file

@ -45,6 +45,6 @@ func (fn handshakerFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn
return fn(ctx, conn)
}
func noopHandshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) {
func noopHandshake(_ context.Context, conn net.Conn) (net.Conn, interface{}, error) {
return conn, nil, nil
}

View file

@ -28,6 +28,13 @@ type UnaryClientInfo struct {
FullMethod string
}
// StreamServerInfo provides information about the server request
type StreamServerInfo struct {
FullMethod string
StreamingClient bool
StreamingServer bool
}
// Unmarshaler contains the server request data and allows it to be unmarshaled
// into a concrete type
type Unmarshaler func(interface{}) error
@ -41,10 +48,18 @@ type UnaryServerInterceptor func(context.Context, Unmarshaler, *UnaryServerInfo,
// UnaryClientInterceptor specifies the interceptor function for client request/response
type UnaryClientInterceptor func(context.Context, *Request, *Response, *UnaryClientInfo, Invoker) error
func defaultServerInterceptor(ctx context.Context, unmarshal Unmarshaler, info *UnaryServerInfo, method Method) (interface{}, error) {
func defaultServerInterceptor(ctx context.Context, unmarshal Unmarshaler, _ *UnaryServerInfo, method Method) (interface{}, error) {
return method(ctx, unmarshal)
}
func defaultClientInterceptor(ctx context.Context, req *Request, resp *Response, _ *UnaryClientInfo, invoker Invoker) error {
return invoker(ctx, req, resp)
}
type StreamServerInterceptor func(context.Context, StreamServer, *StreamServerInfo, StreamHandler) (interface{}, error)
func defaultStreamServerInterceptor(ctx context.Context, ss StreamServer, _ *StreamServerInfo, stream StreamHandler) (interface{}, error) {
return stream(ctx, ss)
}
type StreamClientInterceptor func(context.Context)

396
vendor/github.com/containerd/ttrpc/request.pb.go generated vendored Normal file
View file

@ -0,0 +1,396 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.20.1
// source: github.com/containerd/ttrpc/request.proto
package ttrpc
import (
status "google.golang.org/genproto/googleapis/rpc/status"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Request struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
Method string `protobuf:"bytes,2,opt,name=method,proto3" json:"method,omitempty"`
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"`
TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,json=timeoutNano,proto3" json:"timeout_nano,omitempty"`
Metadata []*KeyValue `protobuf:"bytes,5,rep,name=metadata,proto3" json:"metadata,omitempty"`
}
func (x *Request) Reset() {
*x = Request{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Request) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Request) ProtoMessage() {}
func (x *Request) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Request.ProtoReflect.Descriptor instead.
func (*Request) Descriptor() ([]byte, []int) {
return file_github_com_containerd_ttrpc_request_proto_rawDescGZIP(), []int{0}
}
func (x *Request) GetService() string {
if x != nil {
return x.Service
}
return ""
}
func (x *Request) GetMethod() string {
if x != nil {
return x.Method
}
return ""
}
func (x *Request) GetPayload() []byte {
if x != nil {
return x.Payload
}
return nil
}
func (x *Request) GetTimeoutNano() int64 {
if x != nil {
return x.TimeoutNano
}
return 0
}
func (x *Request) GetMetadata() []*KeyValue {
if x != nil {
return x.Metadata
}
return nil
}
type Response struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Status *status.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
}
func (x *Response) Reset() {
*x = Response{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Response) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Response) ProtoMessage() {}
func (x *Response) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Response.ProtoReflect.Descriptor instead.
func (*Response) Descriptor() ([]byte, []int) {
return file_github_com_containerd_ttrpc_request_proto_rawDescGZIP(), []int{1}
}
func (x *Response) GetStatus() *status.Status {
if x != nil {
return x.Status
}
return nil
}
func (x *Response) GetPayload() []byte {
if x != nil {
return x.Payload
}
return nil
}
type StringList struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
List []string `protobuf:"bytes,1,rep,name=list,proto3" json:"list,omitempty"`
}
func (x *StringList) Reset() {
*x = StringList{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StringList) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StringList) ProtoMessage() {}
func (x *StringList) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StringList.ProtoReflect.Descriptor instead.
func (*StringList) Descriptor() ([]byte, []int) {
return file_github_com_containerd_ttrpc_request_proto_rawDescGZIP(), []int{2}
}
func (x *StringList) GetList() []string {
if x != nil {
return x.List
}
return nil
}
type KeyValue struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}
func (x *KeyValue) Reset() {
*x = KeyValue{}
if protoimpl.UnsafeEnabled {
mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *KeyValue) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*KeyValue) ProtoMessage() {}
func (x *KeyValue) ProtoReflect() protoreflect.Message {
mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use KeyValue.ProtoReflect.Descriptor instead.
func (*KeyValue) Descriptor() ([]byte, []int) {
return file_github_com_containerd_ttrpc_request_proto_rawDescGZIP(), []int{3}
}
func (x *KeyValue) GetKey() string {
if x != nil {
return x.Key
}
return ""
}
func (x *KeyValue) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
var File_github_com_containerd_ttrpc_request_proto protoreflect.FileDescriptor
var file_github_com_containerd_ttrpc_request_proto_rawDesc = []byte{
0x0a, 0x29, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e,
0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x74, 0x72,
0x70, 0x63, 0x1a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa5, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x16, 0x0a, 0x06,
0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65,
0x74, 0x68, 0x6f, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18,
0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x21,
0x0a, 0x0c, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x6e, 0x61, 0x6e, 0x6f, 0x18, 0x04,
0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4e, 0x61, 0x6e,
0x6f, 0x12, 0x2b, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x4b, 0x65, 0x79, 0x56,
0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x45,
0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x06, 0x73, 0x74,
0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x53, 0x74, 0x61,
0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70,
0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x20, 0x0a, 0x0a, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x4c,
0x69, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x69, 0x73, 0x74, 0x18, 0x01, 0x20, 0x03, 0x28,
0x09, 0x52, 0x04, 0x6c, 0x69, 0x73, 0x74, 0x22, 0x32, 0x0a, 0x08, 0x4b, 0x65, 0x79, 0x56, 0x61,
0x6c, 0x75, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x1d, 0x5a, 0x1b, 0x67,
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69,
0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
file_github_com_containerd_ttrpc_request_proto_rawDescOnce sync.Once
file_github_com_containerd_ttrpc_request_proto_rawDescData = file_github_com_containerd_ttrpc_request_proto_rawDesc
)
func file_github_com_containerd_ttrpc_request_proto_rawDescGZIP() []byte {
file_github_com_containerd_ttrpc_request_proto_rawDescOnce.Do(func() {
file_github_com_containerd_ttrpc_request_proto_rawDescData = protoimpl.X.CompressGZIP(file_github_com_containerd_ttrpc_request_proto_rawDescData)
})
return file_github_com_containerd_ttrpc_request_proto_rawDescData
}
var file_github_com_containerd_ttrpc_request_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_github_com_containerd_ttrpc_request_proto_goTypes = []interface{}{
(*Request)(nil), // 0: ttrpc.Request
(*Response)(nil), // 1: ttrpc.Response
(*StringList)(nil), // 2: ttrpc.StringList
(*KeyValue)(nil), // 3: ttrpc.KeyValue
(*status.Status)(nil), // 4: Status
}
var file_github_com_containerd_ttrpc_request_proto_depIdxs = []int32{
3, // 0: ttrpc.Request.metadata:type_name -> ttrpc.KeyValue
4, // 1: ttrpc.Response.status:type_name -> Status
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_github_com_containerd_ttrpc_request_proto_init() }
func file_github_com_containerd_ttrpc_request_proto_init() {
if File_github_com_containerd_ttrpc_request_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_github_com_containerd_ttrpc_request_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Request); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_github_com_containerd_ttrpc_request_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Response); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_github_com_containerd_ttrpc_request_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StringList); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_github_com_containerd_ttrpc_request_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*KeyValue); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_github_com_containerd_ttrpc_request_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_github_com_containerd_ttrpc_request_proto_goTypes,
DependencyIndexes: file_github_com_containerd_ttrpc_request_proto_depIdxs,
MessageInfos: file_github_com_containerd_ttrpc_request_proto_msgTypes,
}.Build()
File_github_com_containerd_ttrpc_request_proto = out.File
file_github_com_containerd_ttrpc_request_proto_rawDesc = nil
file_github_com_containerd_ttrpc_request_proto_goTypes = nil
file_github_com_containerd_ttrpc_request_proto_depIdxs = nil
}

29
vendor/github.com/containerd/ttrpc/request.proto generated vendored Normal file
View file

@ -0,0 +1,29 @@
syntax = "proto3";
package ttrpc;
import "proto/status.proto";
option go_package = "github.com/containerd/ttrpc";
message Request {
string service = 1;
string method = 2;
bytes payload = 3;
int64 timeout_nano = 4;
repeated KeyValue metadata = 5;
}
message Response {
Status status = 1;
bytes payload = 2;
}
message StringList {
repeated string list = 1;
}
message KeyValue {
string key = 1;
string value = 2;
}

View file

@ -32,10 +32,6 @@ import (
"google.golang.org/grpc/status"
)
var (
ErrServerClosed = errors.New("ttrpc: server closed")
)
type Server struct {
config *serverConfig
services *serviceSet
@ -67,8 +63,14 @@ func NewServer(opts ...ServerOpt) (*Server, error) {
}, nil
}
// Register registers a map of methods to method handlers
// TODO: Remove in 2.0, does not support streams
func (s *Server) Register(name string, methods map[string]Method) {
s.services.register(name, methods)
s.services.register(name, &ServiceDesc{Methods: methods})
}
func (s *Server) RegisterService(name string, desc *ServiceDesc) {
s.services.register(name, desc)
}
func (s *Server) Serve(ctx context.Context, l net.Listener) error {
@ -119,12 +121,18 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
approved, handshake, err := handshaker.Handshake(ctx, conn)
if err != nil {
logrus.WithError(err).Errorf("ttrpc: refusing connection after handshake")
logrus.WithError(err).Error("ttrpc: refusing connection after handshake")
conn.Close()
continue
}
sc, err := s.newConn(approved, handshake)
if err != nil {
logrus.WithError(err).Error("ttrpc: create connection failed")
conn.Close()
continue
}
sc := s.newConn(approved, handshake)
go sc.run(ctx)
}
}
@ -143,15 +151,20 @@ func (s *Server) Shutdown(ctx context.Context) error {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
if s.closeIdleConns() {
return lnerr
s.closeIdleConns()
if s.countConnection() == 0 {
break
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
return lnerr
}
// Close the server without waiting for active connections.
@ -203,11 +216,18 @@ func (s *Server) closeListeners() error {
return err
}
func (s *Server) addConnection(c *serverConn) {
func (s *Server) addConnection(c *serverConn) error {
s.mu.Lock()
defer s.mu.Unlock()
select {
case <-s.done:
return ErrServerClosed
default:
}
s.connections[c] = struct{}{}
return nil
}
func (s *Server) delConnection(c *serverConn) {
@ -224,20 +244,17 @@ func (s *Server) countConnection() int {
return len(s.connections)
}
func (s *Server) closeIdleConns() bool {
func (s *Server) closeIdleConns() {
s.mu.Lock()
defer s.mu.Unlock()
quiescent := true
for c := range s.connections {
st, ok := c.getState()
if !ok || st != connStateIdle {
quiescent = false
if st, ok := c.getState(); !ok || st == connStateActive {
continue
}
c.close()
delete(s.connections, c)
}
return quiescent
}
type connState int
@ -261,7 +278,7 @@ func (cs connState) String() string {
}
}
func (s *Server) newConn(conn net.Conn, handshake interface{}) *serverConn {
func (s *Server) newConn(conn net.Conn, handshake interface{}) (*serverConn, error) {
c := &serverConn{
server: s,
conn: conn,
@ -269,8 +286,11 @@ func (s *Server) newConn(conn net.Conn, handshake interface{}) *serverConn {
shutdown: make(chan struct{}),
}
c.setState(connStateIdle)
s.addConnection(c)
return c
if err := s.addConnection(c); err != nil {
c.close()
return nil, err
}
return c, nil
}
type serverConn struct {
@ -302,27 +322,25 @@ func (c *serverConn) close() error {
func (c *serverConn) run(sctx context.Context) {
type (
request struct {
id uint32
req *Request
}
response struct {
id uint32
resp *Response
id uint32
status *status.Status
data []byte
closeStream bool
streaming bool
}
)
var (
ch = newChannel(c.conn)
ctx, cancel = context.WithCancel(sctx)
active int
state connState = connStateIdle
responses = make(chan response)
requests = make(chan request)
recvErr = make(chan error, 1)
shutdown = c.shutdown
done = make(chan struct{})
ch = newChannel(c.conn)
ctx, cancel = context.WithCancel(sctx)
state connState = connStateIdle
responses = make(chan response)
recvErr = make(chan error, 1)
done = make(chan struct{})
streams = sync.Map{}
active int32
lastStreamID uint32
)
defer c.conn.Close()
@ -330,27 +348,26 @@ func (c *serverConn) run(sctx context.Context) {
defer close(done)
defer c.server.delConnection(c)
sendStatus := func(id uint32, st *status.Status) bool {
select {
case responses <- response{
// even though we've had an invalid stream id, we send it
// back on the same stream id so the client knows which
// stream id was bad.
id: id,
status: st,
closeStream: true,
}:
return true
case <-c.shutdown:
return false
case <-done:
return false
}
}
go func(recvErr chan error) {
defer close(recvErr)
sendImmediate := func(id uint32, st *status.Status) bool {
select {
case responses <- response{
// even though we've had an invalid stream id, we send it
// back on the same stream id so the client knows which
// stream id was bad.
id: id,
resp: &Response{
Status: st.Proto(),
},
}:
return true
case <-c.shutdown:
return false
case <-done:
return false
}
}
for {
select {
case <-c.shutdown:
@ -370,110 +387,173 @@ func (c *serverConn) run(sctx context.Context) {
// in this case, we send an error for that particular message
// when the status is defined.
if !sendImmediate(mh.StreamID, status) {
if !sendStatus(mh.StreamID, status) {
return
}
continue
}
if mh.Type != messageTypeRequest {
// we must ignore this for future compat.
continue
}
var req Request
if err := c.server.codec.Unmarshal(p, &req); err != nil {
ch.putmbuf(p)
if !sendImmediate(mh.StreamID, status.Newf(codes.InvalidArgument, "unmarshal request error: %v", err)) {
return
}
continue
}
ch.putmbuf(p)
if mh.StreamID%2 != 1 {
// enforce odd client initiated identifiers.
if !sendImmediate(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID must be odd for client initiated streams")) {
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID must be odd for client initiated streams")) {
return
}
continue
}
// Forward the request to the main loop. We don't wait on s.done
// because we have already accepted the client request.
select {
case requests <- request{
id: mh.StreamID,
req: &req,
}:
case <-done:
return
if mh.Type == messageTypeData {
i, ok := streams.Load(mh.StreamID)
if !ok {
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID is no longer active")) {
return
}
}
sh := i.(*streamHandler)
if mh.Flags&flagNoData != flagNoData {
unmarshal := func(obj interface{}) error {
err := protoUnmarshal(p, obj)
ch.putmbuf(p)
return err
}
if err := sh.data(unmarshal); err != nil {
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "data handling error: %v", err)) {
return
}
}
}
if mh.Flags&flagRemoteClosed == flagRemoteClosed {
sh.closeSend()
if len(p) > 0 {
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "data close message cannot include data")) {
return
}
}
}
} else if mh.Type == messageTypeRequest {
if mh.StreamID <= lastStreamID {
// enforce odd client initiated identifiers.
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID cannot be re-used and must increment")) {
return
}
continue
}
lastStreamID = mh.StreamID
// TODO: Make request type configurable
// Unmarshaller which takes in a byte array and returns an interface?
var req Request
if err := c.server.codec.Unmarshal(p, &req); err != nil {
ch.putmbuf(p)
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "unmarshal request error: %v", err)) {
return
}
continue
}
ch.putmbuf(p)
id := mh.StreamID
respond := func(status *status.Status, data []byte, streaming, closeStream bool) error {
select {
case responses <- response{
id: id,
status: status,
data: data,
closeStream: closeStream,
streaming: streaming,
}:
case <-done:
return ErrClosed
}
return nil
}
sh, err := c.server.services.handle(ctx, &req, respond)
if err != nil {
status, _ := status.FromError(err)
if !sendStatus(mh.StreamID, status) {
return
}
continue
}
streams.Store(id, sh)
atomic.AddInt32(&active, 1)
}
// TODO: else we must ignore this for future compat. log this?
}
}(recvErr)
for {
newstate := state
switch {
case active > 0:
var (
newstate connState
shutdown chan struct{}
)
activeN := atomic.LoadInt32(&active)
if activeN > 0 {
newstate = connStateActive
shutdown = nil
case active == 0:
} else {
newstate = connStateIdle
shutdown = c.shutdown // only enable this branch in idle mode
}
if newstate != state {
c.setState(newstate)
state = newstate
}
select {
case request := <-requests:
active++
go func(id uint32) {
ctx, cancel := getRequestContext(ctx, request.req)
defer cancel()
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
resp := &Response{
Status: status.Proto(),
Payload: p,
}
select {
case responses <- response{
id: id,
resp: resp,
}:
case <-done:
}
}(request.id)
case response := <-responses:
p, err := c.server.codec.Marshal(response.resp)
if err != nil {
logrus.WithError(err).Error("failed marshaling response")
return
if !response.streaming || response.status.Code() != codes.OK {
p, err := c.server.codec.Marshal(&Response{
Status: response.status.Proto(),
Payload: response.data,
})
if err != nil {
logrus.WithError(err).Error("failed marshaling response")
return
}
if err := ch.send(response.id, messageTypeResponse, 0, p); err != nil {
logrus.WithError(err).Error("failed sending message on channel")
return
}
} else {
var flags uint8
if response.closeStream {
flags = flagRemoteClosed
}
if response.data == nil {
flags = flags | flagNoData
}
if err := ch.send(response.id, messageTypeData, flags, response.data); err != nil {
logrus.WithError(err).Error("failed sending message on channel")
return
}
}
if err := ch.send(response.id, messageTypeResponse, p); err != nil {
logrus.WithError(err).Error("failed sending message on channel")
return
if response.closeStream {
// The ttrpc protocol currently does not support the case where
// the server is localClosed but not remoteClosed. Once the server
// is closing, the whole stream may be considered finished
streams.Delete(response.id)
atomic.AddInt32(&active, -1)
}
active--
case err := <-recvErr:
// TODO(stevvooe): Not wildly clear what we should do in this
// branch. Basically, it means that we are no longer receiving
// requests due to a terminal error.
recvErr = nil // connection is now "closing"
if err == io.EOF || err == io.ErrUnexpectedEOF || errors.Is(err, syscall.ECONNRESET) {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) {
// The client went away and we should stop processing
// requests, so that the client connection is closed
return
}
logrus.WithError(err).Error("error receiving message")
// else, initiate shutdown
case <-shutdown:
return
}

View file

@ -25,43 +25,62 @@ import (
"path"
"unsafe"
"github.com/gogo/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
type Method func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error)
type StreamHandler func(context.Context, StreamServer) (interface{}, error)
type Stream struct {
Handler StreamHandler
StreamingClient bool
StreamingServer bool
}
type ServiceDesc struct {
Methods map[string]Method
// TODO(stevvooe): Add stream support.
Streams map[string]Stream
}
type serviceSet struct {
services map[string]ServiceDesc
interceptor UnaryServerInterceptor
services map[string]*ServiceDesc
unaryInterceptor UnaryServerInterceptor
streamInterceptor StreamServerInterceptor
}
func newServiceSet(interceptor UnaryServerInterceptor) *serviceSet {
return &serviceSet{
services: make(map[string]ServiceDesc),
interceptor: interceptor,
services: make(map[string]*ServiceDesc),
unaryInterceptor: interceptor,
streamInterceptor: defaultStreamServerInterceptor,
}
}
func (s *serviceSet) register(name string, methods map[string]Method) {
func (s *serviceSet) register(name string, desc *ServiceDesc) {
if _, ok := s.services[name]; ok {
panic(fmt.Errorf("duplicate service %v registered", name))
}
s.services[name] = ServiceDesc{
Methods: methods,
}
s.services[name] = desc
}
func (s *serviceSet) call(ctx context.Context, serviceName, methodName string, p []byte) ([]byte, *status.Status) {
p, err := s.dispatch(ctx, serviceName, methodName, p)
func (s *serviceSet) unaryCall(ctx context.Context, method Method, info *UnaryServerInfo, data []byte) (p []byte, st *status.Status) {
unmarshal := func(obj interface{}) error {
return protoUnmarshal(data, obj)
}
resp, err := s.unaryInterceptor(ctx, unmarshal, info, method)
if err == nil {
if isNil(resp) {
err = errors.New("ttrpc: marshal called with nil")
} else {
p, err = protoMarshal(resp)
}
}
st, ok := status.FromError(err)
if !ok {
st = status.New(convertCode(err), err.Error())
@ -70,38 +89,142 @@ func (s *serviceSet) call(ctx context.Context, serviceName, methodName string, p
return p, st
}
func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName string, p []byte) ([]byte, error) {
method, err := s.resolve(serviceName, methodName)
if err != nil {
return nil, err
func (s *serviceSet) streamCall(ctx context.Context, stream StreamHandler, info *StreamServerInfo, ss StreamServer) (p []byte, st *status.Status) {
resp, err := s.streamInterceptor(ctx, ss, info, stream)
if err == nil {
p, err = protoMarshal(resp)
}
st, ok := status.FromError(err)
if !ok {
st = status.New(convertCode(err), err.Error())
}
return
}
func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*status.Status, []byte, bool, bool) error) (*streamHandler, error) {
srv, ok := s.services[req.Service]
if !ok {
return nil, status.Errorf(codes.Unimplemented, "service %v", req.Service)
}
unmarshal := func(obj interface{}) error {
switch v := obj.(type) {
case proto.Message:
if err := proto.Unmarshal(p, v); err != nil {
return status.Errorf(codes.Internal, "ttrpc: error unmarshalling payload: %v", err.Error())
if method, ok := srv.Methods[req.Method]; ok {
go func() {
ctx, cancel := getRequestContext(ctx, req)
defer cancel()
info := &UnaryServerInfo{
FullMethod: fullPath(req.Service, req.Method),
}
default:
return status.Errorf(codes.Internal, "ttrpc: error unsupported request type: %T", v)
p, st := s.unaryCall(ctx, method, info, req.Payload)
respond(st, p, false, true)
}()
return nil, nil
}
if stream, ok := srv.Streams[req.Method]; ok {
ctx, cancel := getRequestContext(ctx, req)
info := &StreamServerInfo{
FullMethod: fullPath(req.Service, req.Method),
StreamingClient: stream.StreamingClient,
StreamingServer: stream.StreamingServer,
}
sh := &streamHandler{
ctx: ctx,
respond: respond,
recv: make(chan Unmarshaler, 5),
info: info,
}
go func() {
defer cancel()
p, st := s.streamCall(ctx, stream.Handler, info, sh)
respond(st, p, stream.StreamingServer, true)
}()
if req.Payload != nil {
unmarshal := func(obj interface{}) error {
return protoUnmarshal(req.Payload, obj)
}
if err := sh.data(unmarshal); err != nil {
return nil, err
}
}
return sh, nil
}
return nil, status.Errorf(codes.Unimplemented, "method %v", req.Method)
}
type streamHandler struct {
ctx context.Context
respond func(*status.Status, []byte, bool, bool) error
recv chan Unmarshaler
info *StreamServerInfo
remoteClosed bool
localClosed bool
}
func (s *streamHandler) closeSend() {
if !s.remoteClosed {
s.remoteClosed = true
close(s.recv)
}
}
func (s *streamHandler) data(unmarshal Unmarshaler) error {
if s.remoteClosed {
return ErrStreamClosed
}
select {
case s.recv <- unmarshal:
return nil
case <-s.ctx.Done():
return s.ctx.Err()
}
}
info := &UnaryServerInfo{
FullMethod: fullPath(serviceName, methodName),
func (s *streamHandler) SendMsg(m interface{}) error {
if s.localClosed {
return ErrStreamClosed
}
resp, err := s.interceptor(ctx, unmarshal, info, method)
p, err := protoMarshal(m)
if err != nil {
return nil, err
return err
}
return s.respond(nil, p, true, false)
}
func (s *streamHandler) RecvMsg(m interface{}) error {
select {
case unmarshal, ok := <-s.recv:
if !ok {
return io.EOF
}
return unmarshal(m)
case <-s.ctx.Done():
return s.ctx.Err()
}
}
func protoUnmarshal(p []byte, obj interface{}) error {
switch v := obj.(type) {
case proto.Message:
if err := proto.Unmarshal(p, v); err != nil {
return status.Errorf(codes.Internal, "ttrpc: error unmarshalling payload: %v", err.Error())
}
default:
return status.Errorf(codes.Internal, "ttrpc: error unsupported request type: %T", v)
}
return nil
}
func protoMarshal(obj interface{}) ([]byte, error) {
if obj == nil {
return nil, nil
}
if isNil(resp) {
return nil, errors.New("ttrpc: marshal called with nil")
}
switch v := resp.(type) {
switch v := obj.(type) {
case proto.Message:
r, err := proto.Marshal(v)
if err != nil {
@ -114,20 +237,6 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin
}
}
func (s *serviceSet) resolve(service, method string) (Method, error) {
srv, ok := s.services[service]
if !ok {
return nil, status.Errorf(codes.Unimplemented, "service %v", service)
}
mthd, ok := srv.Methods[method]
if !ok {
return nil, status.Errorf(codes.Unimplemented, "method %v", method)
}
return mthd, nil
}
// convertCode maps stdlib go errors into grpc space.
//
// This is ripped from the grpc-go code base.

84
vendor/github.com/containerd/ttrpc/stream.go generated vendored Normal file
View file

@ -0,0 +1,84 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
import (
"context"
"sync"
)
type streamID uint32
type streamMessage struct {
header messageHeader
payload []byte
}
type stream struct {
id streamID
sender sender
recv chan *streamMessage
closeOnce sync.Once
recvErr error
recvClose chan struct{}
}
func newStream(id streamID, send sender) *stream {
return &stream{
id: id,
sender: send,
recv: make(chan *streamMessage, 1),
recvClose: make(chan struct{}),
}
}
func (s *stream) closeWithError(err error) error {
s.closeOnce.Do(func() {
if err != nil {
s.recvErr = err
} else {
s.recvErr = ErrClosed
}
close(s.recvClose)
})
return nil
}
func (s *stream) send(mt messageType, flags uint8, b []byte) error {
return s.sender.send(uint32(s.id), mt, flags, b)
}
func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
select {
case <-s.recvClose:
return s.recvErr
default:
}
select {
case <-s.recvClose:
return s.recvErr
case s.recv <- msg:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
type sender interface {
send(uint32, messageType, uint8, []byte) error
}

22
vendor/github.com/containerd/ttrpc/stream_server.go generated vendored Normal file
View file

@ -0,0 +1,22 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
type StreamServer interface {
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
}

16
vendor/github.com/containerd/ttrpc/test.proto generated vendored Normal file
View file

@ -0,0 +1,16 @@
syntax = "proto3";
package ttrpc;
option go_package = "github.com/containerd/ttrpc/internal";
message TestPayload {
string foo = 1;
int64 deadline = 2;
string metadata = 3;
}
message EchoPayload {
int64 seq = 1;
string msg = 2;
}

View file

@ -1,63 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttrpc
import (
"fmt"
spb "google.golang.org/genproto/googleapis/rpc/status"
)
type Request struct {
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"`
Metadata []*KeyValue `protobuf:"bytes,5,rep,name=metadata,proto3"`
}
func (r *Request) Reset() { *r = Request{} }
func (r *Request) String() string { return fmt.Sprintf("%+#v", r) }
func (r *Request) ProtoMessage() {}
type Response struct {
Status *spb.Status `protobuf:"bytes,1,opt,name=status,proto3"`
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3"`
}
func (r *Response) Reset() { *r = Response{} }
func (r *Response) String() string { return fmt.Sprintf("%+#v", r) }
func (r *Response) ProtoMessage() {}
type StringList struct {
List []string `protobuf:"bytes,1,rep,name=list,proto3"`
}
func (r *StringList) Reset() { *r = StringList{} }
func (r *StringList) String() string { return fmt.Sprintf("%+#v", r) }
func (r *StringList) ProtoMessage() {}
func makeStringList(item ...string) StringList { return StringList{List: item} }
type KeyValue struct {
Key string `protobuf:"bytes,1,opt,name=key,proto3"`
Value string `protobuf:"bytes,2,opt,name=value,proto3"`
}
func (m *KeyValue) Reset() { *m = KeyValue{} }
func (*KeyValue) ProtoMessage() {}
func (m *KeyValue) String() string { return fmt.Sprintf("%+#v", m) }

View file

@ -29,7 +29,7 @@ import (
type UnixCredentialsFunc func(*unix.Ucred) error
func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) {
func (fn UnixCredentialsFunc) Handshake(_ context.Context, conn net.Conn) (net.Conn, interface{}, error) {
uc, err := requireUnixSocket(conn)
if err != nil {
return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: require unix socket: %w", err)
@ -50,7 +50,7 @@ func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net
}
if ucredErr != nil {
return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials: %w", err)
return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials: %w", ucredErr)
}
if err := fn(ucred); err != nil {
@ -88,10 +88,6 @@ func UnixSocketRequireSameUser() UnixCredentialsFunc {
return UnixSocketRequireUidGid(euid, egid)
}
func requireRoot(ucred *unix.Ucred) error {
return requireUidGid(ucred, 0, 0)
}
func requireUidGid(ucred *unix.Ucred, uid, gid int) error {
if (uid != -1 && uint32(uid) != ucred.Uid) || (gid != -1 && uint32(gid) != ucred.Gid) {
return fmt.Errorf("ttrpc: invalid credentials: %v", syscall.EPERM)

2
vendor/modules.txt vendored
View file

@ -340,7 +340,7 @@ github.com/containerd/nydus-snapshotter/pkg/errdefs
## explicit; go 1.16
github.com/containerd/stargz-snapshotter/estargz
github.com/containerd/stargz-snapshotter/estargz/errorutil
# github.com/containerd/ttrpc v1.1.1
# github.com/containerd/ttrpc v1.2.2
## explicit; go 1.13
github.com/containerd/ttrpc
# github.com/containerd/typeurl v1.0.2