diff --git a/vendor.mod b/vendor.mod index 14ddf2d1c5..e063f18152 100644 --- a/vendor.mod +++ b/vendor.mod @@ -130,7 +130,7 @@ require ( github.com/containerd/go-runc v1.1.0 // indirect github.com/containerd/nydus-snapshotter v0.3.1 // indirect github.com/containerd/stargz-snapshotter/estargz v0.13.0 // indirect - github.com/containerd/ttrpc v1.1.1 // indirect + github.com/containerd/ttrpc v1.2.2 // indirect github.com/containerd/typeurl v1.0.2 // indirect github.com/containernetworking/cni v1.1.2 // indirect github.com/cyphar/filepath-securejoin v0.2.3 // indirect diff --git a/vendor.sum b/vendor.sum index 666d732661..5c1fcb070b 100644 --- a/vendor.sum +++ b/vendor.sum @@ -411,8 +411,8 @@ github.com/containerd/ttrpc v0.0.0-20191028202541-4f1b8fe65a5c/go.mod h1:LPm1u0x github.com/containerd/ttrpc v1.0.1/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= -github.com/containerd/ttrpc v1.1.1 h1:NoRHS/z8UiHhpY1w0xcOqoJDGf2DHyzXrF0H4l5AE8c= -github.com/containerd/ttrpc v1.1.1/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= +github.com/containerd/ttrpc v1.2.2 h1:9vqZr0pxwOF5koz6N0N3kJ0zDHokrcPxIR/ZR2YFtOs= +github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= github.com/containerd/typeurl v0.0.0-20180627222232-a93fcdb778cd/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc= github.com/containerd/typeurl v0.0.0-20190911142611-5eb25027c9fd/go.mod h1:GeKYzf2pQcqv7tJ0AoCuuhtnqhva5LNU3U+OyKxxJpk= github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg= @@ -1787,6 +1787,7 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= diff --git a/vendor/github.com/containerd/ttrpc/.gitattributes b/vendor/github.com/containerd/ttrpc/.gitattributes new file mode 100644 index 0000000000..d207b1802b --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/.gitattributes @@ -0,0 +1 @@ +*.go text eol=lf diff --git a/vendor/github.com/containerd/ttrpc/.gitignore b/vendor/github.com/containerd/ttrpc/.gitignore index ea58090bd2..88ceb2764b 100644 --- a/vendor/github.com/containerd/ttrpc/.gitignore +++ b/vendor/github.com/containerd/ttrpc/.gitignore @@ -1,4 +1,5 @@ # Binaries for programs and plugins +/bin/ *.exe *.dll *.so @@ -9,3 +10,4 @@ # Output of the go coverage tool, specifically when used with LiteIDE *.out +coverage.txt diff --git a/vendor/github.com/containerd/ttrpc/.golangci.yml b/vendor/github.com/containerd/ttrpc/.golangci.yml new file mode 100644 index 0000000000..6462e52f66 --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/.golangci.yml @@ -0,0 +1,52 @@ +linters: + enable: + - staticcheck + - unconvert + - gofmt + - goimports + - revive + - ineffassign + - vet + - unused + - misspell + disable: + - errcheck + +linters-settings: + revive: + ignore-generated-headers: true + rules: + - name: blank-imports + - name: context-as-argument + - name: context-keys-type + - name: dot-imports + - name: error-return + - name: error-strings + - name: error-naming + - name: exported + - name: if-return + - name: increment-decrement + - name: var-naming + arguments: [["UID", "GID"], []] + - name: var-declaration + - name: package-comments + - name: range + - name: receiver-naming + - name: time-naming + - name: unexported-return + - name: indent-error-flow + - name: errorf + - name: empty-block + - name: superfluous-else + - name: unused-parameter + - name: unreachable-code + - name: redefines-builtin-id + +issues: + include: + - EXC0002 + +run: + timeout: 8m + skip-dirs: + - example diff --git a/vendor/github.com/containerd/ttrpc/Makefile b/vendor/github.com/containerd/ttrpc/Makefile new file mode 100644 index 0000000000..c3a497dcac --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/Makefile @@ -0,0 +1,180 @@ +# Copyright The containerd Authors. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Go command to use for build +GO ?= go +INSTALL ?= install + +# Root directory of the project (absolute path). +ROOTDIR=$(dir $(abspath $(lastword $(MAKEFILE_LIST)))) + +WHALE = "🇩" +ONI = "👹" + +# Project binaries. +COMMANDS=protoc-gen-go-ttrpc protoc-gen-gogottrpc + +ifdef BUILDTAGS + GO_BUILDTAGS = ${BUILDTAGS} +endif +GO_BUILDTAGS ?= +GO_TAGS=$(if $(GO_BUILDTAGS),-tags "$(strip $(GO_BUILDTAGS))",) + +# Project packages. +PACKAGES=$(shell $(GO) list ${GO_TAGS} ./... | grep -v /example) +TESTPACKAGES=$(shell $(GO) list ${GO_TAGS} ./... | grep -v /cmd | grep -v /integration | grep -v /example) +BINPACKAGES=$(addprefix ./cmd/,$(COMMANDS)) + +#Replaces ":" (*nix), ";" (windows) with newline for easy parsing +GOPATHS=$(shell echo ${GOPATH} | tr ":" "\n" | tr ";" "\n") + +TESTFLAGS_RACE= +GO_BUILD_FLAGS= +# See Golang issue re: '-trimpath': https://github.com/golang/go/issues/13809 +GO_GCFLAGS=$(shell \ + set -- ${GOPATHS}; \ + echo "-gcflags=-trimpath=$${1}/src"; \ + ) + +BINARIES=$(addprefix bin/,$(COMMANDS)) + +# Flags passed to `go test` +TESTFLAGS ?= $(TESTFLAGS_RACE) $(EXTRA_TESTFLAGS) +TESTFLAGS_PARALLEL ?= 8 + +# Use this to replace `go test` with, for instance, `gotestsum` +GOTEST ?= $(GO) test + +.PHONY: clean all AUTHORS build binaries test integration generate protos check-protos coverage ci check help install vendor install-protobuf install-protobuild +.DEFAULT: default + +# Forcibly set the default goal to all, in case an include above brought in a rule definition. +.DEFAULT_GOAL := all + +all: binaries + +check: proto-fmt ## run all linters + @echo "$(WHALE) $@" + GOGC=75 golangci-lint run + +ci: check binaries check-protos coverage # coverage-integration ## to be used by the CI + +AUTHORS: .mailmap .git/HEAD + git log --format='%aN <%aE>' | sort -fu > $@ + +generate: protos + @echo "$(WHALE) $@" + @PATH="${ROOTDIR}/bin:${PATH}" $(GO) generate -x ${PACKAGES} + +protos: bin/protoc-gen-gogottrpc bin/protoc-gen-go-ttrpc ## generate protobuf + @echo "$(WHALE) $@" + @(PATH="${ROOTDIR}/bin:${PATH}" protobuild --quiet ${PACKAGES}) + +check-protos: protos ## check if protobufs needs to be generated again + @echo "$(WHALE) $@" + @test -z "$$(git status --short | grep ".pb.go" | tee /dev/stderr)" || \ + ((git diff | cat) && \ + (echo "$(ONI) please run 'make protos' when making changes to proto files" && false)) + +check-api-descriptors: protos ## check that protobuf changes aren't present. + @echo "$(WHALE) $@" + @test -z "$$(git status --short | grep ".pb.txt" | tee /dev/stderr)" || \ + ((git diff $$(find . -name '*.pb.txt') | cat) && \ + (echo "$(ONI) please run 'make protos' when making changes to proto files and check-in the generated descriptor file changes" && false)) + +proto-fmt: ## check format of proto files + @echo "$(WHALE) $@" + @test -z "$$(find . -name '*.proto' -type f -exec grep -Hn -e "^ " {} \; | tee /dev/stderr)" || \ + (echo "$(ONI) please indent proto files with tabs only" && false) + @test -z "$$(find . -name '*.proto' -type f -exec grep -Hn "Meta meta = " {} \; | grep -v '(gogoproto.nullable) = false' | tee /dev/stderr)" || \ + (echo "$(ONI) meta fields in proto files must have option (gogoproto.nullable) = false" && false) + +build: ## build the go packages + @echo "$(WHALE) $@" + @$(GO) build ${DEBUG_GO_GCFLAGS} ${GO_GCFLAGS} ${GO_BUILD_FLAGS} ${EXTRA_FLAGS} ${PACKAGES} + +test: ## run tests, except integration tests and tests that require root + @echo "$(WHALE) $@" + @$(GOTEST) ${TESTFLAGS} ${TESTPACKAGES} + +integration: ## run integration tests + @echo "$(WHALE) $@" + @cd "${ROOTDIR}/integration" && $(GOTEST) -v ${TESTFLAGS} -parallel ${TESTFLAGS_PARALLEL} . + +benchmark: ## run benchmarks tests + @echo "$(WHALE) $@" + @$(GO) test ${TESTFLAGS} -bench . -run Benchmark + +FORCE: + +define BUILD_BINARY +@echo "$(WHALE) $@" +@$(GO) build ${DEBUG_GO_GCFLAGS} ${GO_GCFLAGS} ${GO_BUILD_FLAGS} -o $@ ${GO_TAGS} ./$< +endef + +# Build a binary from a cmd. +bin/%: cmd/% FORCE + $(call BUILD_BINARY) + +binaries: $(BINARIES) ## build binaries + @echo "$(WHALE) $@" + +clean: ## clean up binaries + @echo "$(WHALE) $@" + @rm -f $(BINARIES) + +install: ## install binaries + @echo "$(WHALE) $@ $(BINPACKAGES)" + @$(GO) install $(BINPACKAGES) + +install-protobuf: + @echo "$(WHALE) $@" + @script/install-protobuf + +install-protobuild: + @echo "$(WHALE) $@" + @$(GO) install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1 + @$(GO) install github.com/containerd/protobuild@14832ccc41429f5c4f81028e5af08aa233a219cf + +coverage: ## generate coverprofiles from the unit tests, except tests that require root + @echo "$(WHALE) $@" + @rm -f coverage.txt + @$(GO) test ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null + @( for pkg in ${PACKAGES}; do \ + $(GO) test ${TESTFLAGS} \ + -cover \ + -coverprofile=profile.out \ + -covermode=atomic $$pkg || exit; \ + if [ -f profile.out ]; then \ + cat profile.out >> coverage.txt; \ + rm profile.out; \ + fi; \ + done ) + +vendor: ## ensure all the go.mod/go.sum files are up-to-date + @echo "$(WHALE) $@" + @$(GO) mod tidy + @$(GO) mod verify + +verify-vendor: ## verify if all the go.mod/go.sum files are up-to-date + @echo "$(WHALE) $@" + @$(GO) mod tidy + @$(GO) mod verify + @test -z "$$(git status --short | grep "go.sum" | tee /dev/stderr)" || \ + ((git diff | cat) && \ + (echo "$(ONI) make sure to checkin changes after go mod tidy" && false)) + +help: ## this help + @awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) | sort diff --git a/vendor/github.com/containerd/ttrpc/PROTOCOL.md b/vendor/github.com/containerd/ttrpc/PROTOCOL.md new file mode 100644 index 0000000000..12b43f6bd6 --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/PROTOCOL.md @@ -0,0 +1,240 @@ +# Protocol Specification + +The ttrpc protocol is client/server protocol to support multiple request streams +over a single connection with lightweight framing. The client represents the +process which initiated the underlying connection and the server is the process +which accepted the connection. The protocol is currently defined as +asymmetrical, with clients sending requests and servers sending responses. Both +clients and servers are able to send stream data. The roles are also used in +determining the stream identifiers, with client initiated streams using odd +number identifiers and server initiated using even number. The protocol may be +extended in the future to support server initiated streams, that is not +supported in the latest version. + +## Purpose + +The ttrpc protocol is designed to be lightweight and optimized for low latency +and reliable connections between processes on the same host. The protocol does +not include features for handling unreliable connections such as handshakes, +resets, pings, or flow control. The protocol is designed to make low-overhead +implementations as simple as possible. It is not intended as a suitable +replacement for HTTP2/3 over the network. + +## Message Frame + +Each Message Frame consists of a 10-byte message header followed +by message data. The data length and stream ID are both big-endian +4-byte unsigned integers. The message type is an unsigned 1-byte +integer. The flags are also an unsigned 1-byte integer and +use is defined by the message type. + + +---------------------------------------------------------------+ + | Data Length (32) | + +---------------------------------------------------------------+ + | Stream ID (32) | + +---------------+-----------------------------------------------+ + | Msg Type (8) | + +---------------+ + | Flags (8) | + +---------------+-----------------------------------------------+ + | Data (*) | + +---------------------------------------------------------------+ + +The Data Length field represents the number of bytes in the Data field. The +total frame size will always be Data Length + 10 bytes. The maximum data length +is 4MB and any larger size should be rejected. Due to the maximum data size +being less than 16MB, the first frame byte should always be zero. This first +byte should be considered reserved for future use. + +The Stream ID must be odd for client initiated streams and even for server +initiated streams. Server initiated streams are not currently supported. + +## Mesage Types + +| Message Type | Name | Description | +|--------------|----------|----------------------------------| +| 0x01 | Request | Initiates stream | +| 0x02 | Response | Final stream data and terminates | +| 0x03 | Data | Stream data | + +### Request + +The request message is used to initiate stream and send along request data for +properly routing and handling the stream. The stream may indicate unary without +any inbound or outbound stream data with only a response is expected on the +stream. The request may also indicate the stream is still open for more data and +no response is expected until data is finished. If the remote indicates the +stream is closed, the request may be considered non-unary but without anymore +stream data sent. In the case of `remote closed`, the remote still expects to +receive a response or stream data. For compatibility with non streaming clients, +a request with empty flags indicates a unary request. + +#### Request Flags + +| Flag | Name | Description | +|------|-----------------|--------------------------------------------------| +| 0x01 | `remote closed` | Non-unary, but no more data expected from remote | +| 0x02 | `remote open` | Non-unary, remote is still sending data | + +### Response + +The response message is used to end a stream with data, an empty response, or +an error. A response message is the only expected message after a unary request. +A non-unary request does not require a response message if the server is sending +back stream data. A non-unary stream may return a single response message but no +other stream data may follow. + +#### Response Flags + +No response flags are defined at this time, flags should be empty. + +### Data + +The data message is used to send data on an already initialized stream. Either +client or server may send data. A data message is not allowed on a unary stream. +A data message should not be sent after indicating `remote closed` to the peer. +The last data message on a stream must set the `remote closed` flag. + +The `no data` flag is used to indicate that the data message does not include +any data. This is normally used with the `remote closed` flag to indicate the +stream is now closed without transmitting any data. Since ttrpc normally +transmits a single object per message, a zero length data message may be +interpreted as an empty object. For example, transmitting the number zero as a +protobuf message ends up with a data length of zero, but the message is still +considered data and should be processed. + +#### Data Flags + +| Flag | Name | Description | +|------|-----------------|-----------------------------------| +| 0x01 | `remote closed` | No more data expected from remote | +| 0x04 | `no data` | This message does not have data | + +## Streaming + +All ttrpc requests use streams to transfer data. Unary streams will only have +two messages sent per stream, a request from a client and a response from the +server. Non-unary streams, however, may send any numbers of messages from the +client and the server. This makes stream management more complicated than unary +streams since both client and server need to track additional state. To keep +this management as simple as possible, ttrpc minimizes the number of states and +uses two flags instead of control frames. Each stream has two states while a +stream is still alive: `local closed` and `remote closed`. Each peer considers +local and remote from their own perspective and sets flags from the other peer's +perspective. For example, if a client sends a data frame with the +`remote closed` flag, that is indicating that the client is now `local closed` +and the server will be `remote closed`. A unary operation does not need to send +these flags since each received message always indicates `remote closed`. Once a +peer is both `local closed` and `remote closed`, the stream is considered +finished and may be cleaned up. + +Due to the asymmetric nature of the current protocol, a client should +always be in the `local closed` state before `remote closed` and a server should +always be in the `remote closed` state before `local closed`. This happens +because the client is always initiating requests and a client always expects a +final response back from a server to indicate the initiated request has been +fulfilled. This may mean server sends a final empty response to finish a stream +even after it has already completed sending data before the client. + +### Unary State Diagram + + +--------+ +--------+ + | Client | | Server | + +---+----+ +----+---+ + | +---------+ | + local >---------------+ Request +--------------------> remote + closed | +---------+ | closed + | | + | +----------+ | + finished <--------------+ Response +--------------------< finished + | +----------+ | + | | + +### Non-Unary State Diagrams + +RC: `remote closed` flag +RO: `remote open` flag + + +--------+ +--------+ + | Client | | Server | + +---+----+ +----+---+ + | +--------------+ | + >-------------+ Request [RO] +-----------------> + | +--------------+ | + | | + | +------+ | + >-----------------+ Data +---------------------> + | +------+ | + | | + | +-----------+ | + local >---------------+ Data [RC] +------------------> remote + closed | +-----------+ | closed + | | + | +----------+ | + finished <--------------+ Response +--------------------< finished + | +----------+ | + | | + + +--------+ +--------+ + | Client | | Server | + +---+----+ +----+---+ + | +--------------+ | + local >-------------+ Request [RC] +-----------------> remote + closed | +--------------+ | closed + | | + | +------+ | + <-----------------+ Data +---------------------< + | +------+ | + | | + | +-----------+ | + finished <---------------+ Data [RC] +------------------< finished + | +-----------+ | + | | + + +--------+ +--------+ + | Client | | Server | + +---+----+ +----+---+ + | +--------------+ | + >-------------+ Request [RO] +-----------------> + | +--------------+ | + | | + | +------+ | + >-----------------+ Data +---------------------> + | +------+ | + | | + | +------+ | + <-----------------+ Data +---------------------< + | +------+ | + | | + | +------+ | + >-----------------+ Data +---------------------> + | +------+ | + | | + | +-----------+ | + local >---------------+ Data [RC] +------------------> remote + closed | +-----------+ | closed + | | + | +------+ | + <-----------------+ Data +---------------------< + | +------+ | + | | + | +-----------+ | + finished <---------------+ Data [RC] +------------------< finished + | +-----------+ | + | | + +## RPC + +While this protocol is defined primarily to support Remote Procedure Calls, the +protocol does not define the request and response types beyond the messages +defined in the protocol. The implementation provides a default protobuf +definition of request and response which may be used for cross language rpc. +All implementations should at least define a request type which support +routing by procedure name and a response type which supports call status. + +## Version History + +| Version | Features | +|---------|---------------------| +| 1.0 | Unary requests only | +| 1.2 | Streaming support | diff --git a/vendor/github.com/containerd/ttrpc/Protobuild.toml b/vendor/github.com/containerd/ttrpc/Protobuild.toml new file mode 100644 index 0000000000..0f6ccbd1e8 --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/Protobuild.toml @@ -0,0 +1,28 @@ +version = "2" +generators = ["go"] + +# Control protoc include paths. Below are usually some good defaults, but feel +# free to try it without them if it works for your project. +[includes] + # Include paths that will be added before all others. Typically, you want to + # treat the root of the project as an include, but this may not be necessary. + before = ["."] + + # Paths that will be added untouched to the end of the includes. We use + # `/usr/local/include` to pickup the common install location of protobuf. + # This is the default. + after = ["/usr/local/include"] + +# This section maps protobuf imports to Go packages. These will become +# `-M` directives in the call to the go protobuf generator. +[packages] + "google/protobuf/any.proto" = "github.com/gogo/protobuf/types" + "proto/status.proto" = "google.golang.org/genproto/googleapis/rpc/status" + +[[overrides]] +# enable ttrpc and disable fieldpath and grpc for the shim +prefixes = ["github.com/containerd/ttrpc/integration/streaming"] +generators = ["go", "go-ttrpc"] + +[overrides.parameters.go-ttrpc] +prefix = "TTRPC" diff --git a/vendor/github.com/containerd/ttrpc/README.md b/vendor/github.com/containerd/ttrpc/README.md index 547a1297df..675a5179ef 100644 --- a/vendor/github.com/containerd/ttrpc/README.md +++ b/vendor/github.com/containerd/ttrpc/README.md @@ -1,7 +1,6 @@ # ttrpc [![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI) -[![codecov](https://codecov.io/gh/containerd/ttrpc/branch/main/graph/badge.svg)](https://codecov.io/gh/containerd/ttrpc) GRPC for low-memory environments. @@ -20,13 +19,17 @@ Please note that while this project supports generating either end of the protocol, the generated service definitions will be incompatible with regular GRPC services, as they do not speak the same protocol. +# Protocol + +See the [protocol specification](./PROTOCOL.md). + # Usage Create a gogo vanity binary (see [`cmd/protoc-gen-gogottrpc/main.go`](cmd/protoc-gen-gogottrpc/main.go) for an example with the ttrpc plugin enabled. -It's recommended to use [`protobuild`](https://github.com//stevvooe/protobuild) +It's recommended to use [`protobuild`](https://github.com/containerd/protobuild) to build the protobufs for this project, but this will work with protoc directly, if required. @@ -37,13 +40,11 @@ directly, if required. - The client and server interface are identical whereas in GRPC there is a client and server interface that are different. - The Go stdlib context package is used instead. -- No support for streams yet. # Status TODO: -- [ ] Document protocol layout - [ ] Add testing under concurrent load to ensure - [ ] Verify connection error handling diff --git a/vendor/github.com/containerd/ttrpc/channel.go b/vendor/github.com/containerd/ttrpc/channel.go index 81116a5e23..feafd9a6b5 100644 --- a/vendor/github.com/containerd/ttrpc/channel.go +++ b/vendor/github.com/containerd/ttrpc/channel.go @@ -38,6 +38,26 @@ type messageType uint8 const ( messageTypeRequest messageType = 0x1 messageTypeResponse messageType = 0x2 + messageTypeData messageType = 0x3 +) + +func (mt messageType) String() string { + switch mt { + case messageTypeRequest: + return "request" + case messageTypeResponse: + return "response" + case messageTypeData: + return "data" + default: + return "unknown" + } +} + +const ( + flagRemoteClosed uint8 = 0x1 + flagRemoteOpen uint8 = 0x2 + flagNoData uint8 = 0x4 ) // messageHeader represents the fixed-length message header of 10 bytes sent @@ -46,7 +66,7 @@ type messageHeader struct { Length uint32 // length excluding this header. b[:4] StreamID uint32 // identifies which request stream message is a part of. b[4:8] Type messageType // message type b[8] - Flags uint8 // reserved b[9] + Flags uint8 // type specific flags b[9] } func readMessageHeader(p []byte, r io.Reader) (messageHeader, error) { @@ -111,22 +131,31 @@ func (ch *channel) recv() (messageHeader, []byte, error) { return mh, nil, status.Errorf(codes.ResourceExhausted, "message length %v exceed maximum message size of %v", mh.Length, messageLengthMax) } - p := ch.getmbuf(int(mh.Length)) - if _, err := io.ReadFull(ch.br, p); err != nil { - return messageHeader{}, nil, fmt.Errorf("failed reading message: %w", err) + var p []byte + if mh.Length > 0 { + p = ch.getmbuf(int(mh.Length)) + if _, err := io.ReadFull(ch.br, p); err != nil { + return messageHeader{}, nil, fmt.Errorf("failed reading message: %w", err) + } } return mh, p, nil } -func (ch *channel) send(streamID uint32, t messageType, p []byte) error { - if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t}); err != nil { +func (ch *channel) send(streamID uint32, t messageType, flags uint8, p []byte) error { + // TODO: Error on send rather than on recv + //if len(p) > messageLengthMax { + // return status.Errorf(codes.InvalidArgument, "refusing to send, message length %v exceed maximum message size of %v", len(p), messageLengthMax) + //} + if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t, Flags: flags}); err != nil { return err } - _, err := ch.bw.Write(p) - if err != nil { - return err + if len(p) > 0 { + _, err := ch.bw.Write(p) + if err != nil { + return err + } } return ch.bw.Flush() diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index 26c3dd2a98..4b1e1e709b 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -19,30 +19,30 @@ package ttrpc import ( "context" "errors" + "fmt" "io" "net" - "os" "strings" "sync" "syscall" "time" - "github.com/gogo/protobuf/proto" "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) -// ErrClosed is returned by client methods when the underlying connection is -// closed. -var ErrClosed = errors.New("ttrpc: closed") - // Client for a ttrpc server type Client struct { codec codec conn net.Conn channel *channel - calls chan *callRequest + + streamLock sync.RWMutex + streams map[streamID]*stream + nextStreamID streamID + sendLock sync.Mutex ctx context.Context closed func() @@ -51,8 +51,6 @@ type Client struct { userCloseFunc func() userCloseWaitCh chan struct{} - errOnce sync.Once - err error interceptor UnaryClientInterceptor } @@ -73,13 +71,16 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts { } } +// NewClient creates a new ttrpc client using the given connection func NewClient(conn net.Conn, opts ...ClientOpts) *Client { ctx, cancel := context.WithCancel(context.Background()) + channel := newChannel(conn) c := &Client{ codec: codec{}, conn: conn, - channel: newChannel(conn), - calls: make(chan *callRequest), + channel: channel, + streams: make(map[streamID]*stream), + nextStreamID: 1, closed: cancel, ctx: ctx, userCloseFunc: func() {}, @@ -95,13 +96,13 @@ func NewClient(conn net.Conn, opts ...ClientOpts) *Client { return c } -type callRequest struct { - ctx context.Context - req *Request - resp *Response // response will be written back here - errs chan error // error written here on completion +func (c *Client) send(sid uint32, mt messageType, flags uint8, b []byte) error { + c.sendLock.Lock() + defer c.sendLock.Unlock() + return c.channel.send(sid, mt, flags, b) } +// Call makes a unary request and returns with response func (c *Client) Call(ctx context.Context, service, method string, req, resp interface{}) error { payload, err := c.codec.Marshal(req) if err != nil { @@ -113,6 +114,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int Service: service, Method: method, Payload: payload, + // TODO: metadata from context } cresp = &Response{} @@ -123,7 +125,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int } if dl, ok := ctx.Deadline(); ok { - creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds() + creq.TimeoutNano = time.Until(dl).Nanoseconds() } info := &UnaryClientInfo{ @@ -143,36 +145,143 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int return nil } -func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error { - errs := make(chan error, 1) - call := &callRequest{ - ctx: ctx, - req: req, - resp: resp, - errs: errs, - } - - select { - case <-ctx.Done(): - return ctx.Err() - case c.calls <- call: - case <-c.ctx.Done(): - return c.error() - } - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-errs: - return filterCloseErr(err) - case <-c.ctx.Done(): - return c.error() - } +// StreamDesc describes the stream properties, whether the stream has +// a streaming client, a streaming server, or both +type StreamDesc struct { + StreamingClient bool + StreamingServer bool } +// ClientStream is used to send or recv messages on the underlying stream +type ClientStream interface { + CloseSend() error + SendMsg(m interface{}) error + RecvMsg(m interface{}) error +} + +type clientStream struct { + ctx context.Context + s *stream + c *Client + desc *StreamDesc + localClosed bool + remoteClosed bool +} + +func (cs *clientStream) CloseSend() error { + if !cs.desc.StreamingClient { + return fmt.Errorf("%w: cannot close non-streaming client", ErrProtocol) + } + if cs.localClosed { + return ErrStreamClosed + } + err := cs.s.send(messageTypeData, flagRemoteClosed|flagNoData, nil) + if err != nil { + return filterCloseErr(err) + } + cs.localClosed = true + return nil +} + +func (cs *clientStream) SendMsg(m interface{}) error { + if !cs.desc.StreamingClient { + return fmt.Errorf("%w: cannot send data from non-streaming client", ErrProtocol) + } + if cs.localClosed { + return ErrStreamClosed + } + + var ( + payload []byte + err error + ) + if m != nil { + payload, err = cs.c.codec.Marshal(m) + if err != nil { + return err + } + } + + err = cs.s.send(messageTypeData, 0, payload) + if err != nil { + return filterCloseErr(err) + } + + return nil +} + +func (cs *clientStream) RecvMsg(m interface{}) error { + if cs.remoteClosed { + return io.EOF + } + + var msg *streamMessage + select { + case <-cs.ctx.Done(): + return cs.ctx.Err() + case <-cs.s.recvClose: + // If recv has a pending message, process that first + select { + case msg = <-cs.s.recv: + default: + return cs.s.recvErr + } + case msg = <-cs.s.recv: + } + + if msg.header.Type == messageTypeResponse { + resp := &Response{} + err := proto.Unmarshal(msg.payload[:msg.header.Length], resp) + // return the payload buffer for reuse + cs.c.channel.putmbuf(msg.payload) + if err != nil { + return err + } + + if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil { + return err + } + + if resp.Status != nil && resp.Status.Code != int32(codes.OK) { + return status.ErrorProto(resp.Status) + } + + cs.c.deleteStream(cs.s) + cs.remoteClosed = true + + return nil + } else if msg.header.Type == messageTypeData { + if !cs.desc.StreamingServer { + cs.c.deleteStream(cs.s) + cs.remoteClosed = true + return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol) + } + if msg.header.Flags&flagRemoteClosed == flagRemoteClosed { + cs.c.deleteStream(cs.s) + cs.remoteClosed = true + + if msg.header.Flags&flagNoData == flagNoData { + return io.EOF + } + } + + err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m) + cs.c.channel.putmbuf(msg.payload) + if err != nil { + return err + } + return nil + } + + return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) +} + +// Close closes the ttrpc connection and underlying connection func (c *Client) Close() error { c.closeOnce.Do(func() { c.closed() + + c.conn.Close() }) return nil } @@ -188,194 +297,105 @@ func (c *Client) UserOnCloseWait(ctx context.Context) error { } } -type message struct { - messageHeader - p []byte - err error -} - -// callMap provides access to a map of active calls, guarded by a mutex. -type callMap struct { - m sync.Mutex - activeCalls map[uint32]*callRequest - closeErr error -} - -// newCallMap returns a new callMap with an empty set of active calls. -func newCallMap() *callMap { - return &callMap{ - activeCalls: make(map[uint32]*callRequest), - } -} - -// set adds a call entry to the map with the given streamID key. -func (cm *callMap) set(streamID uint32, cr *callRequest) error { - cm.m.Lock() - defer cm.m.Unlock() - if cm.closeErr != nil { - return cm.closeErr - } - cm.activeCalls[streamID] = cr - return nil -} - -// get looks up the call entry for the given streamID key, then removes it -// from the map and returns it. -func (cm *callMap) get(streamID uint32) (cr *callRequest, ok bool, err error) { - cm.m.Lock() - defer cm.m.Unlock() - if cm.closeErr != nil { - return nil, false, cm.closeErr - } - cr, ok = cm.activeCalls[streamID] - if ok { - delete(cm.activeCalls, streamID) - } - return -} - -// abort sends the given error to each active call, and clears the map. -// Once abort has been called, any subsequent calls to the callMap will return the error passed to abort. -func (cm *callMap) abort(err error) error { - cm.m.Lock() - defer cm.m.Unlock() - if cm.closeErr != nil { - return cm.closeErr - } - for streamID, call := range cm.activeCalls { - call.errs <- err - delete(cm.activeCalls, streamID) - } - cm.closeErr = err - return nil -} - func (c *Client) run() { - var ( - waiters = newCallMap() - receiverDone = make(chan struct{}) - ) + err := c.receiveLoop() + c.Close() + c.cleanupStreams(err) - // Sender goroutine - // Receives calls from dispatch, adds them to the set of active calls, and sends them - // to the server. - go func() { - var streamID uint32 = 1 - for { - select { - case <-c.ctx.Done(): - return - case call := <-c.calls: - id := streamID - streamID += 2 // enforce odd client initiated request ids - if err := waiters.set(id, call); err != nil { - call.errs <- err // errs is buffered so should not block. - continue - } - if err := c.send(id, messageTypeRequest, call.req); err != nil { - call.errs <- err // errs is buffered so should not block. - waiters.get(id) // remove from waiters set - } - } - } - }() - - // Receiver goroutine - // Receives responses from the server, looks up the call info in the set of active calls, - // and notifies the caller of the response. - go func() { - defer close(receiverDone) - for { - select { - case <-c.ctx.Done(): - c.setError(c.ctx.Err()) - return - default: - mh, p, err := c.channel.recv() - if err != nil { - _, ok := status.FromError(err) - if !ok { - // treat all errors that are not an rpc status as terminal. - // all others poison the connection. - c.setError(filterCloseErr(err)) - return - } - } - msg := &message{ - messageHeader: mh, - p: p[:mh.Length], - err: err, - } - call, ok, err := waiters.get(mh.StreamID) - if err != nil { - logrus.Errorf("ttrpc: failed to look up active call: %s", err) - continue - } - if !ok { - logrus.Errorf("ttrpc: received message for unknown channel %v", mh.StreamID) - continue - } - call.errs <- c.recv(call.resp, msg) - } - } - }() - - defer func() { - c.conn.Close() - c.userCloseFunc() - close(c.userCloseWaitCh) - }() + c.userCloseFunc() + close(c.userCloseWaitCh) +} +func (c *Client) receiveLoop() error { for { select { - case <-receiverDone: - // The receiver has exited. - // don't return out, let the close of the context trigger the abort of waiters - c.Close() case <-c.ctx.Done(): - // Abort all active calls. This will also prevent any new calls from being added - // to waiters. - waiters.abort(c.error()) - return + return ErrClosed + default: + var ( + msg = &streamMessage{} + err error + ) + + msg.header, msg.payload, err = c.channel.recv() + if err != nil { + _, ok := status.FromError(err) + if !ok { + // treat all errors that are not an rpc status as terminal. + // all others poison the connection. + return filterCloseErr(err) + } + } + sid := streamID(msg.header.StreamID) + s := c.getStream(sid) + if s == nil { + logrus.WithField("stream", sid).Errorf("ttrpc: received message on inactive stream") + continue + } + + if err != nil { + s.closeWithError(err) + } else { + if err := s.receive(c.ctx, msg); err != nil { + logrus.WithError(err).WithField("stream", sid).Errorf("ttrpc: failed to handle message") + } + } } } } -func (c *Client) error() error { - c.errOnce.Do(func() { - if c.err == nil { - c.err = ErrClosed - } - }) - return c.err -} +// createStream creates a new stream and registers it with the client +// Introduce stream types for multiple or single response +func (c *Client) createStream(flags uint8, b []byte) (*stream, error) { + c.streamLock.Lock() -func (c *Client) setError(err error) { - c.errOnce.Do(func() { - c.err = err - }) -} - -func (c *Client) send(streamID uint32, mtype messageType, msg interface{}) error { - p, err := c.codec.Marshal(msg) - if err != nil { - return err + // Check if closed since lock acquired to prevent adding + // anything after cleanup completes + select { + case <-c.ctx.Done(): + c.streamLock.Unlock() + return nil, ErrClosed + default: } - return c.channel.send(streamID, mtype, p) + // Stream ID should be allocated at same time + s := newStream(c.nextStreamID, c) + c.streams[s.id] = s + c.nextStreamID = c.nextStreamID + 2 + + c.sendLock.Lock() + defer c.sendLock.Unlock() + c.streamLock.Unlock() + + if err := c.channel.send(uint32(s.id), messageTypeRequest, flags, b); err != nil { + return s, filterCloseErr(err) + } + + return s, nil } -func (c *Client) recv(resp *Response, msg *message) error { - if msg.err != nil { - return msg.err - } +func (c *Client) deleteStream(s *stream) { + c.streamLock.Lock() + delete(c.streams, s.id) + c.streamLock.Unlock() + s.closeWithError(nil) +} - if msg.Type != messageTypeResponse { - return errors.New("unknown message type received") - } +func (c *Client) getStream(sid streamID) *stream { + c.streamLock.RLock() + s := c.streams[sid] + c.streamLock.RUnlock() + return s +} - defer c.channel.putmbuf(msg.p) - return proto.Unmarshal(msg.p, resp) +func (c *Client) cleanupStreams(err error) { + c.streamLock.Lock() + defer c.streamLock.Unlock() + + for sid, s := range c.streams { + s.closeWithError(err) + delete(c.streams, sid) + } } // filterCloseErr rewrites EOF and EPIPE errors to ErrClosed. Use when @@ -388,6 +408,8 @@ func filterCloseErr(err error) error { return nil case err == io.EOF: return ErrClosed + case errors.Is(err, io.ErrClosedPipe): + return ErrClosed case errors.Is(err, io.EOF): return ErrClosed case strings.Contains(err.Error(), "use of closed network connection"): @@ -395,11 +417,9 @@ func filterCloseErr(err error) error { default: // if we have an epipe on a write or econnreset on a read , we cast to errclosed var oerr *net.OpError - if errors.As(err, &oerr) && (oerr.Op == "write" || oerr.Op == "read") { - serr, sok := oerr.Err.(*os.SyscallError) - if sok && ((serr.Err == syscall.EPIPE && oerr.Op == "write") || - (serr.Err == syscall.ECONNRESET && oerr.Op == "read")) { - + if errors.As(err, &oerr) { + if (oerr.Op == "write" && errors.Is(err, syscall.EPIPE)) || + (oerr.Op == "read" && errors.Is(err, syscall.ECONNRESET)) { return ErrClosed } } @@ -407,3 +427,86 @@ func filterCloseErr(err error) error { return err } + +// NewStream creates a new stream with the given stream descriptor to the +// specified service and method. If not a streaming client, the request object +// may be provided. +func (c *Client) NewStream(ctx context.Context, desc *StreamDesc, service, method string, req interface{}) (ClientStream, error) { + var payload []byte + if req != nil { + var err error + payload, err = c.codec.Marshal(req) + if err != nil { + return nil, err + } + } + + request := &Request{ + Service: service, + Method: method, + Payload: payload, + // TODO: metadata from context + } + p, err := c.codec.Marshal(request) + if err != nil { + return nil, err + } + + var flags uint8 + if desc.StreamingClient { + flags = flagRemoteOpen + } else { + flags = flagRemoteClosed + } + s, err := c.createStream(flags, p) + if err != nil { + return nil, err + } + + return &clientStream{ + ctx: ctx, + s: s, + c: c, + desc: desc, + }, nil +} + +func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error { + p, err := c.codec.Marshal(req) + if err != nil { + return err + } + + s, err := c.createStream(0, p) + if err != nil { + return err + } + defer c.deleteStream(s) + + var msg *streamMessage + select { + case <-ctx.Done(): + return ctx.Err() + case <-c.ctx.Done(): + return ErrClosed + case <-s.recvClose: + // If recv has a pending message, process that first + select { + case msg = <-s.recv: + default: + return s.recvErr + } + case msg = <-s.recv: + } + + if msg.header.Type == messageTypeResponse { + err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) + } else { + err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) + } + + // return the payload buffer for reuse + c.channel.putmbuf(msg.payload) + + return err +} diff --git a/vendor/github.com/containerd/ttrpc/codec.go b/vendor/github.com/containerd/ttrpc/codec.go index 880634c27e..3e82722a42 100644 --- a/vendor/github.com/containerd/ttrpc/codec.go +++ b/vendor/github.com/containerd/ttrpc/codec.go @@ -19,7 +19,7 @@ package ttrpc import ( "fmt" - "github.com/gogo/protobuf/proto" + "google.golang.org/protobuf/proto" ) type codec struct{} diff --git a/vendor/github.com/containerd/ttrpc/doc.go b/vendor/github.com/containerd/ttrpc/doc.go new file mode 100644 index 0000000000..d80cd424cc --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/doc.go @@ -0,0 +1,23 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/* +package ttrpc defines and implements a low level simple transfer protocol +optimized for low latency and reliable connections between processes on the same +host. The protocol uses simple framing for sending requests, responses, and data +using multiple streams. +*/ +package ttrpc diff --git a/vendor/github.com/containerd/ttrpc/errors.go b/vendor/github.com/containerd/ttrpc/errors.go new file mode 100644 index 0000000000..ec14b7952b --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/errors.go @@ -0,0 +1,34 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ttrpc + +import "errors" + +var ( + // ErrProtocol is a general error in the handling the protocol. + ErrProtocol = errors.New("protocol error") + + // ErrClosed is returned by client methods when the underlying connection is + // closed. + ErrClosed = errors.New("ttrpc: closed") + + // ErrServerClosed is returned when the Server has closed its connection. + ErrServerClosed = errors.New("ttrpc: server closed") + + // ErrStreamClosed is when the streaming connection is closed. + ErrStreamClosed = errors.New("ttrpc: stream closed") +) diff --git a/vendor/github.com/containerd/ttrpc/handshake.go b/vendor/github.com/containerd/ttrpc/handshake.go index a424b67a49..3c6b610d35 100644 --- a/vendor/github.com/containerd/ttrpc/handshake.go +++ b/vendor/github.com/containerd/ttrpc/handshake.go @@ -45,6 +45,6 @@ func (fn handshakerFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn return fn(ctx, conn) } -func noopHandshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) { +func noopHandshake(_ context.Context, conn net.Conn) (net.Conn, interface{}, error) { return conn, nil, nil } diff --git a/vendor/github.com/containerd/ttrpc/interceptor.go b/vendor/github.com/containerd/ttrpc/interceptor.go index c1219dac65..7ff5e9d33f 100644 --- a/vendor/github.com/containerd/ttrpc/interceptor.go +++ b/vendor/github.com/containerd/ttrpc/interceptor.go @@ -28,6 +28,13 @@ type UnaryClientInfo struct { FullMethod string } +// StreamServerInfo provides information about the server request +type StreamServerInfo struct { + FullMethod string + StreamingClient bool + StreamingServer bool +} + // Unmarshaler contains the server request data and allows it to be unmarshaled // into a concrete type type Unmarshaler func(interface{}) error @@ -41,10 +48,18 @@ type UnaryServerInterceptor func(context.Context, Unmarshaler, *UnaryServerInfo, // UnaryClientInterceptor specifies the interceptor function for client request/response type UnaryClientInterceptor func(context.Context, *Request, *Response, *UnaryClientInfo, Invoker) error -func defaultServerInterceptor(ctx context.Context, unmarshal Unmarshaler, info *UnaryServerInfo, method Method) (interface{}, error) { +func defaultServerInterceptor(ctx context.Context, unmarshal Unmarshaler, _ *UnaryServerInfo, method Method) (interface{}, error) { return method(ctx, unmarshal) } func defaultClientInterceptor(ctx context.Context, req *Request, resp *Response, _ *UnaryClientInfo, invoker Invoker) error { return invoker(ctx, req, resp) } + +type StreamServerInterceptor func(context.Context, StreamServer, *StreamServerInfo, StreamHandler) (interface{}, error) + +func defaultStreamServerInterceptor(ctx context.Context, ss StreamServer, _ *StreamServerInfo, stream StreamHandler) (interface{}, error) { + return stream(ctx, ss) +} + +type StreamClientInterceptor func(context.Context) diff --git a/vendor/github.com/containerd/ttrpc/request.pb.go b/vendor/github.com/containerd/ttrpc/request.pb.go new file mode 100644 index 0000000000..3921ae5a35 --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/request.pb.go @@ -0,0 +1,396 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.20.1 +// source: github.com/containerd/ttrpc/request.proto + +package ttrpc + +import ( + status "google.golang.org/genproto/googleapis/rpc/status" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + Method string `protobuf:"bytes,2,opt,name=method,proto3" json:"method,omitempty"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` + TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,json=timeoutNano,proto3" json:"timeout_nano,omitempty"` + Metadata []*KeyValue `protobuf:"bytes,5,rep,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (x *Request) Reset() { + *x = Request{} + if protoimpl.UnsafeEnabled { + mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Request) ProtoMessage() {} + +func (x *Request) ProtoReflect() protoreflect.Message { + mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { + return file_github_com_containerd_ttrpc_request_proto_rawDescGZIP(), []int{0} +} + +func (x *Request) GetService() string { + if x != nil { + return x.Service + } + return "" +} + +func (x *Request) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + +func (x *Request) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *Request) GetTimeoutNano() int64 { + if x != nil { + return x.TimeoutNano + } + return 0 +} + +func (x *Request) GetMetadata() []*KeyValue { + if x != nil { + return x.Metadata + } + return nil +} + +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status *status.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_github_com_containerd_ttrpc_request_proto_rawDescGZIP(), []int{1} +} + +func (x *Response) GetStatus() *status.Status { + if x != nil { + return x.Status + } + return nil +} + +func (x *Response) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +type StringList struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + List []string `protobuf:"bytes,1,rep,name=list,proto3" json:"list,omitempty"` +} + +func (x *StringList) Reset() { + *x = StringList{} + if protoimpl.UnsafeEnabled { + mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StringList) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StringList) ProtoMessage() {} + +func (x *StringList) ProtoReflect() protoreflect.Message { + mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StringList.ProtoReflect.Descriptor instead. +func (*StringList) Descriptor() ([]byte, []int) { + return file_github_com_containerd_ttrpc_request_proto_rawDescGZIP(), []int{2} +} + +func (x *StringList) GetList() []string { + if x != nil { + return x.List + } + return nil +} + +type KeyValue struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *KeyValue) Reset() { + *x = KeyValue{} + if protoimpl.UnsafeEnabled { + mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *KeyValue) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KeyValue) ProtoMessage() {} + +func (x *KeyValue) ProtoReflect() protoreflect.Message { + mi := &file_github_com_containerd_ttrpc_request_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KeyValue.ProtoReflect.Descriptor instead. +func (*KeyValue) Descriptor() ([]byte, []int) { + return file_github_com_containerd_ttrpc_request_proto_rawDescGZIP(), []int{3} +} + +func (x *KeyValue) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *KeyValue) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +var File_github_com_containerd_ttrpc_request_proto protoreflect.FileDescriptor + +var file_github_com_containerd_ttrpc_request_proto_rawDesc = []byte{ + 0x0a, 0x29, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x74, 0x72, + 0x70, 0x63, 0x1a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa5, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x16, 0x0a, 0x06, + 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, + 0x74, 0x68, 0x6f, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x21, + 0x0a, 0x0c, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x6e, 0x61, 0x6e, 0x6f, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x4e, 0x61, 0x6e, + 0x6f, 0x12, 0x2b, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x74, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x4b, 0x65, 0x79, 0x56, + 0x61, 0x6c, 0x75, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x22, 0x45, + 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x06, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x07, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x20, 0x0a, 0x0a, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x4c, + 0x69, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6c, 0x69, 0x73, 0x74, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x04, 0x6c, 0x69, 0x73, 0x74, 0x22, 0x32, 0x0a, 0x08, 0x4b, 0x65, 0x79, 0x56, 0x61, + 0x6c, 0x75, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x1d, 0x5a, 0x1b, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, + 0x6e, 0x65, 0x72, 0x64, 0x2f, 0x74, 0x74, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_github_com_containerd_ttrpc_request_proto_rawDescOnce sync.Once + file_github_com_containerd_ttrpc_request_proto_rawDescData = file_github_com_containerd_ttrpc_request_proto_rawDesc +) + +func file_github_com_containerd_ttrpc_request_proto_rawDescGZIP() []byte { + file_github_com_containerd_ttrpc_request_proto_rawDescOnce.Do(func() { + file_github_com_containerd_ttrpc_request_proto_rawDescData = protoimpl.X.CompressGZIP(file_github_com_containerd_ttrpc_request_proto_rawDescData) + }) + return file_github_com_containerd_ttrpc_request_proto_rawDescData +} + +var file_github_com_containerd_ttrpc_request_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_github_com_containerd_ttrpc_request_proto_goTypes = []interface{}{ + (*Request)(nil), // 0: ttrpc.Request + (*Response)(nil), // 1: ttrpc.Response + (*StringList)(nil), // 2: ttrpc.StringList + (*KeyValue)(nil), // 3: ttrpc.KeyValue + (*status.Status)(nil), // 4: Status +} +var file_github_com_containerd_ttrpc_request_proto_depIdxs = []int32{ + 3, // 0: ttrpc.Request.metadata:type_name -> ttrpc.KeyValue + 4, // 1: ttrpc.Response.status:type_name -> Status + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_github_com_containerd_ttrpc_request_proto_init() } +func file_github_com_containerd_ttrpc_request_proto_init() { + if File_github_com_containerd_ttrpc_request_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_github_com_containerd_ttrpc_request_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_github_com_containerd_ttrpc_request_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_github_com_containerd_ttrpc_request_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StringList); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_github_com_containerd_ttrpc_request_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*KeyValue); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_github_com_containerd_ttrpc_request_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_github_com_containerd_ttrpc_request_proto_goTypes, + DependencyIndexes: file_github_com_containerd_ttrpc_request_proto_depIdxs, + MessageInfos: file_github_com_containerd_ttrpc_request_proto_msgTypes, + }.Build() + File_github_com_containerd_ttrpc_request_proto = out.File + file_github_com_containerd_ttrpc_request_proto_rawDesc = nil + file_github_com_containerd_ttrpc_request_proto_goTypes = nil + file_github_com_containerd_ttrpc_request_proto_depIdxs = nil +} diff --git a/vendor/github.com/containerd/ttrpc/request.proto b/vendor/github.com/containerd/ttrpc/request.proto new file mode 100644 index 0000000000..37da334fc2 --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/request.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package ttrpc; + +import "proto/status.proto"; + +option go_package = "github.com/containerd/ttrpc"; + +message Request { + string service = 1; + string method = 2; + bytes payload = 3; + int64 timeout_nano = 4; + repeated KeyValue metadata = 5; +} + +message Response { + Status status = 1; + bytes payload = 2; +} + +message StringList { + repeated string list = 1; +} + +message KeyValue { + string key = 1; + string value = 2; +} diff --git a/vendor/github.com/containerd/ttrpc/server.go b/vendor/github.com/containerd/ttrpc/server.go index e4c07b60fb..7af59f828e 100644 --- a/vendor/github.com/containerd/ttrpc/server.go +++ b/vendor/github.com/containerd/ttrpc/server.go @@ -32,10 +32,6 @@ import ( "google.golang.org/grpc/status" ) -var ( - ErrServerClosed = errors.New("ttrpc: server closed") -) - type Server struct { config *serverConfig services *serviceSet @@ -67,8 +63,14 @@ func NewServer(opts ...ServerOpt) (*Server, error) { }, nil } +// Register registers a map of methods to method handlers +// TODO: Remove in 2.0, does not support streams func (s *Server) Register(name string, methods map[string]Method) { - s.services.register(name, methods) + s.services.register(name, &ServiceDesc{Methods: methods}) +} + +func (s *Server) RegisterService(name string, desc *ServiceDesc) { + s.services.register(name, desc) } func (s *Server) Serve(ctx context.Context, l net.Listener) error { @@ -119,12 +121,18 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error { approved, handshake, err := handshaker.Handshake(ctx, conn) if err != nil { - logrus.WithError(err).Errorf("ttrpc: refusing connection after handshake") + logrus.WithError(err).Error("ttrpc: refusing connection after handshake") + conn.Close() + continue + } + + sc, err := s.newConn(approved, handshake) + if err != nil { + logrus.WithError(err).Error("ttrpc: create connection failed") conn.Close() continue } - sc := s.newConn(approved, handshake) go sc.run(ctx) } } @@ -143,15 +151,20 @@ func (s *Server) Shutdown(ctx context.Context) error { ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() for { - if s.closeIdleConns() { - return lnerr + s.closeIdleConns() + + if s.countConnection() == 0 { + break } + select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: } } + + return lnerr } // Close the server without waiting for active connections. @@ -203,11 +216,18 @@ func (s *Server) closeListeners() error { return err } -func (s *Server) addConnection(c *serverConn) { +func (s *Server) addConnection(c *serverConn) error { s.mu.Lock() defer s.mu.Unlock() + select { + case <-s.done: + return ErrServerClosed + default: + } + s.connections[c] = struct{}{} + return nil } func (s *Server) delConnection(c *serverConn) { @@ -224,20 +244,17 @@ func (s *Server) countConnection() int { return len(s.connections) } -func (s *Server) closeIdleConns() bool { +func (s *Server) closeIdleConns() { s.mu.Lock() defer s.mu.Unlock() - quiescent := true + for c := range s.connections { - st, ok := c.getState() - if !ok || st != connStateIdle { - quiescent = false + if st, ok := c.getState(); !ok || st == connStateActive { continue } c.close() delete(s.connections, c) } - return quiescent } type connState int @@ -261,7 +278,7 @@ func (cs connState) String() string { } } -func (s *Server) newConn(conn net.Conn, handshake interface{}) *serverConn { +func (s *Server) newConn(conn net.Conn, handshake interface{}) (*serverConn, error) { c := &serverConn{ server: s, conn: conn, @@ -269,8 +286,11 @@ func (s *Server) newConn(conn net.Conn, handshake interface{}) *serverConn { shutdown: make(chan struct{}), } c.setState(connStateIdle) - s.addConnection(c) - return c + if err := s.addConnection(c); err != nil { + c.close() + return nil, err + } + return c, nil } type serverConn struct { @@ -302,27 +322,25 @@ func (c *serverConn) close() error { func (c *serverConn) run(sctx context.Context) { type ( - request struct { - id uint32 - req *Request - } - response struct { - id uint32 - resp *Response + id uint32 + status *status.Status + data []byte + closeStream bool + streaming bool } ) var ( - ch = newChannel(c.conn) - ctx, cancel = context.WithCancel(sctx) - active int - state connState = connStateIdle - responses = make(chan response) - requests = make(chan request) - recvErr = make(chan error, 1) - shutdown = c.shutdown - done = make(chan struct{}) + ch = newChannel(c.conn) + ctx, cancel = context.WithCancel(sctx) + state connState = connStateIdle + responses = make(chan response) + recvErr = make(chan error, 1) + done = make(chan struct{}) + streams = sync.Map{} + active int32 + lastStreamID uint32 ) defer c.conn.Close() @@ -330,27 +348,26 @@ func (c *serverConn) run(sctx context.Context) { defer close(done) defer c.server.delConnection(c) + sendStatus := func(id uint32, st *status.Status) bool { + select { + case responses <- response{ + // even though we've had an invalid stream id, we send it + // back on the same stream id so the client knows which + // stream id was bad. + id: id, + status: st, + closeStream: true, + }: + return true + case <-c.shutdown: + return false + case <-done: + return false + } + } + go func(recvErr chan error) { defer close(recvErr) - sendImmediate := func(id uint32, st *status.Status) bool { - select { - case responses <- response{ - // even though we've had an invalid stream id, we send it - // back on the same stream id so the client knows which - // stream id was bad. - id: id, - resp: &Response{ - Status: st.Proto(), - }, - }: - return true - case <-c.shutdown: - return false - case <-done: - return false - } - } - for { select { case <-c.shutdown: @@ -370,110 +387,173 @@ func (c *serverConn) run(sctx context.Context) { // in this case, we send an error for that particular message // when the status is defined. - if !sendImmediate(mh.StreamID, status) { + if !sendStatus(mh.StreamID, status) { return } continue } - if mh.Type != messageTypeRequest { - // we must ignore this for future compat. - continue - } - - var req Request - if err := c.server.codec.Unmarshal(p, &req); err != nil { - ch.putmbuf(p) - if !sendImmediate(mh.StreamID, status.Newf(codes.InvalidArgument, "unmarshal request error: %v", err)) { - return - } - continue - } - ch.putmbuf(p) - if mh.StreamID%2 != 1 { // enforce odd client initiated identifiers. - if !sendImmediate(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID must be odd for client initiated streams")) { + if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID must be odd for client initiated streams")) { return } continue } - // Forward the request to the main loop. We don't wait on s.done - // because we have already accepted the client request. - select { - case requests <- request{ - id: mh.StreamID, - req: &req, - }: - case <-done: - return + if mh.Type == messageTypeData { + i, ok := streams.Load(mh.StreamID) + if !ok { + if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID is no longer active")) { + return + } + } + sh := i.(*streamHandler) + if mh.Flags&flagNoData != flagNoData { + unmarshal := func(obj interface{}) error { + err := protoUnmarshal(p, obj) + ch.putmbuf(p) + return err + } + + if err := sh.data(unmarshal); err != nil { + if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "data handling error: %v", err)) { + return + } + } + } + + if mh.Flags&flagRemoteClosed == flagRemoteClosed { + sh.closeSend() + if len(p) > 0 { + if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "data close message cannot include data")) { + return + } + } + } + } else if mh.Type == messageTypeRequest { + if mh.StreamID <= lastStreamID { + // enforce odd client initiated identifiers. + if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID cannot be re-used and must increment")) { + return + } + continue + + } + lastStreamID = mh.StreamID + + // TODO: Make request type configurable + // Unmarshaller which takes in a byte array and returns an interface? + var req Request + if err := c.server.codec.Unmarshal(p, &req); err != nil { + ch.putmbuf(p) + if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "unmarshal request error: %v", err)) { + return + } + continue + } + ch.putmbuf(p) + + id := mh.StreamID + respond := func(status *status.Status, data []byte, streaming, closeStream bool) error { + select { + case responses <- response{ + id: id, + status: status, + data: data, + closeStream: closeStream, + streaming: streaming, + }: + case <-done: + return ErrClosed + } + return nil + } + sh, err := c.server.services.handle(ctx, &req, respond) + if err != nil { + status, _ := status.FromError(err) + if !sendStatus(mh.StreamID, status) { + return + } + continue + } + + streams.Store(id, sh) + atomic.AddInt32(&active, 1) } + // TODO: else we must ignore this for future compat. log this? } }(recvErr) for { - newstate := state - switch { - case active > 0: + var ( + newstate connState + shutdown chan struct{} + ) + + activeN := atomic.LoadInt32(&active) + if activeN > 0 { newstate = connStateActive shutdown = nil - case active == 0: + } else { newstate = connStateIdle shutdown = c.shutdown // only enable this branch in idle mode } - if newstate != state { c.setState(newstate) state = newstate } select { - case request := <-requests: - active++ - go func(id uint32) { - ctx, cancel := getRequestContext(ctx, request.req) - defer cancel() - - p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload) - resp := &Response{ - Status: status.Proto(), - Payload: p, - } - - select { - case responses <- response{ - id: id, - resp: resp, - }: - case <-done: - } - }(request.id) case response := <-responses: - p, err := c.server.codec.Marshal(response.resp) - if err != nil { - logrus.WithError(err).Error("failed marshaling response") - return + if !response.streaming || response.status.Code() != codes.OK { + p, err := c.server.codec.Marshal(&Response{ + Status: response.status.Proto(), + Payload: response.data, + }) + if err != nil { + logrus.WithError(err).Error("failed marshaling response") + return + } + + if err := ch.send(response.id, messageTypeResponse, 0, p); err != nil { + logrus.WithError(err).Error("failed sending message on channel") + return + } + } else { + var flags uint8 + if response.closeStream { + flags = flagRemoteClosed + } + if response.data == nil { + flags = flags | flagNoData + } + if err := ch.send(response.id, messageTypeData, flags, response.data); err != nil { + logrus.WithError(err).Error("failed sending message on channel") + return + } } - if err := ch.send(response.id, messageTypeResponse, p); err != nil { - logrus.WithError(err).Error("failed sending message on channel") - return + if response.closeStream { + // The ttrpc protocol currently does not support the case where + // the server is localClosed but not remoteClosed. Once the server + // is closing, the whole stream may be considered finished + streams.Delete(response.id) + atomic.AddInt32(&active, -1) } - - active-- case err := <-recvErr: // TODO(stevvooe): Not wildly clear what we should do in this // branch. Basically, it means that we are no longer receiving // requests due to a terminal error. recvErr = nil // connection is now "closing" - if err == io.EOF || err == io.ErrUnexpectedEOF || errors.Is(err, syscall.ECONNRESET) { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) { // The client went away and we should stop processing // requests, so that the client connection is closed return } logrus.WithError(err).Error("error receiving message") + // else, initiate shutdown case <-shutdown: return } diff --git a/vendor/github.com/containerd/ttrpc/services.go b/vendor/github.com/containerd/ttrpc/services.go index f359e9611f..6aabfbb4d1 100644 --- a/vendor/github.com/containerd/ttrpc/services.go +++ b/vendor/github.com/containerd/ttrpc/services.go @@ -25,43 +25,62 @@ import ( "path" "unsafe" - "github.com/gogo/protobuf/proto" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" ) type Method func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) +type StreamHandler func(context.Context, StreamServer) (interface{}, error) + +type Stream struct { + Handler StreamHandler + StreamingClient bool + StreamingServer bool +} + type ServiceDesc struct { Methods map[string]Method - - // TODO(stevvooe): Add stream support. + Streams map[string]Stream } type serviceSet struct { - services map[string]ServiceDesc - interceptor UnaryServerInterceptor + services map[string]*ServiceDesc + unaryInterceptor UnaryServerInterceptor + streamInterceptor StreamServerInterceptor } func newServiceSet(interceptor UnaryServerInterceptor) *serviceSet { return &serviceSet{ - services: make(map[string]ServiceDesc), - interceptor: interceptor, + services: make(map[string]*ServiceDesc), + unaryInterceptor: interceptor, + streamInterceptor: defaultStreamServerInterceptor, } } -func (s *serviceSet) register(name string, methods map[string]Method) { +func (s *serviceSet) register(name string, desc *ServiceDesc) { if _, ok := s.services[name]; ok { panic(fmt.Errorf("duplicate service %v registered", name)) } - s.services[name] = ServiceDesc{ - Methods: methods, - } + s.services[name] = desc } -func (s *serviceSet) call(ctx context.Context, serviceName, methodName string, p []byte) ([]byte, *status.Status) { - p, err := s.dispatch(ctx, serviceName, methodName, p) +func (s *serviceSet) unaryCall(ctx context.Context, method Method, info *UnaryServerInfo, data []byte) (p []byte, st *status.Status) { + unmarshal := func(obj interface{}) error { + return protoUnmarshal(data, obj) + } + + resp, err := s.unaryInterceptor(ctx, unmarshal, info, method) + if err == nil { + if isNil(resp) { + err = errors.New("ttrpc: marshal called with nil") + } else { + p, err = protoMarshal(resp) + } + } + st, ok := status.FromError(err) if !ok { st = status.New(convertCode(err), err.Error()) @@ -70,38 +89,142 @@ func (s *serviceSet) call(ctx context.Context, serviceName, methodName string, p return p, st } -func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName string, p []byte) ([]byte, error) { - method, err := s.resolve(serviceName, methodName) - if err != nil { - return nil, err +func (s *serviceSet) streamCall(ctx context.Context, stream StreamHandler, info *StreamServerInfo, ss StreamServer) (p []byte, st *status.Status) { + resp, err := s.streamInterceptor(ctx, ss, info, stream) + if err == nil { + p, err = protoMarshal(resp) + } + st, ok := status.FromError(err) + if !ok { + st = status.New(convertCode(err), err.Error()) + } + return +} + +func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*status.Status, []byte, bool, bool) error) (*streamHandler, error) { + srv, ok := s.services[req.Service] + if !ok { + return nil, status.Errorf(codes.Unimplemented, "service %v", req.Service) } - unmarshal := func(obj interface{}) error { - switch v := obj.(type) { - case proto.Message: - if err := proto.Unmarshal(p, v); err != nil { - return status.Errorf(codes.Internal, "ttrpc: error unmarshalling payload: %v", err.Error()) + if method, ok := srv.Methods[req.Method]; ok { + go func() { + ctx, cancel := getRequestContext(ctx, req) + defer cancel() + + info := &UnaryServerInfo{ + FullMethod: fullPath(req.Service, req.Method), } - default: - return status.Errorf(codes.Internal, "ttrpc: error unsupported request type: %T", v) + p, st := s.unaryCall(ctx, method, info, req.Payload) + + respond(st, p, false, true) + }() + return nil, nil + } + if stream, ok := srv.Streams[req.Method]; ok { + ctx, cancel := getRequestContext(ctx, req) + info := &StreamServerInfo{ + FullMethod: fullPath(req.Service, req.Method), + StreamingClient: stream.StreamingClient, + StreamingServer: stream.StreamingServer, } + sh := &streamHandler{ + ctx: ctx, + respond: respond, + recv: make(chan Unmarshaler, 5), + info: info, + } + go func() { + defer cancel() + p, st := s.streamCall(ctx, stream.Handler, info, sh) + respond(st, p, stream.StreamingServer, true) + }() + + if req.Payload != nil { + unmarshal := func(obj interface{}) error { + return protoUnmarshal(req.Payload, obj) + } + if err := sh.data(unmarshal); err != nil { + return nil, err + } + } + + return sh, nil + } + return nil, status.Errorf(codes.Unimplemented, "method %v", req.Method) +} + +type streamHandler struct { + ctx context.Context + respond func(*status.Status, []byte, bool, bool) error + recv chan Unmarshaler + info *StreamServerInfo + + remoteClosed bool + localClosed bool +} + +func (s *streamHandler) closeSend() { + if !s.remoteClosed { + s.remoteClosed = true + close(s.recv) + } +} + +func (s *streamHandler) data(unmarshal Unmarshaler) error { + if s.remoteClosed { + return ErrStreamClosed + } + select { + case s.recv <- unmarshal: return nil + case <-s.ctx.Done(): + return s.ctx.Err() } +} - info := &UnaryServerInfo{ - FullMethod: fullPath(serviceName, methodName), +func (s *streamHandler) SendMsg(m interface{}) error { + if s.localClosed { + return ErrStreamClosed } - - resp, err := s.interceptor(ctx, unmarshal, info, method) + p, err := protoMarshal(m) if err != nil { - return nil, err + return err + } + return s.respond(nil, p, true, false) +} + +func (s *streamHandler) RecvMsg(m interface{}) error { + select { + case unmarshal, ok := <-s.recv: + if !ok { + return io.EOF + } + return unmarshal(m) + case <-s.ctx.Done(): + return s.ctx.Err() + + } +} + +func protoUnmarshal(p []byte, obj interface{}) error { + switch v := obj.(type) { + case proto.Message: + if err := proto.Unmarshal(p, v); err != nil { + return status.Errorf(codes.Internal, "ttrpc: error unmarshalling payload: %v", err.Error()) + } + default: + return status.Errorf(codes.Internal, "ttrpc: error unsupported request type: %T", v) + } + return nil +} + +func protoMarshal(obj interface{}) ([]byte, error) { + if obj == nil { + return nil, nil } - if isNil(resp) { - return nil, errors.New("ttrpc: marshal called with nil") - } - - switch v := resp.(type) { + switch v := obj.(type) { case proto.Message: r, err := proto.Marshal(v) if err != nil { @@ -114,20 +237,6 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin } } -func (s *serviceSet) resolve(service, method string) (Method, error) { - srv, ok := s.services[service] - if !ok { - return nil, status.Errorf(codes.Unimplemented, "service %v", service) - } - - mthd, ok := srv.Methods[method] - if !ok { - return nil, status.Errorf(codes.Unimplemented, "method %v", method) - } - - return mthd, nil -} - // convertCode maps stdlib go errors into grpc space. // // This is ripped from the grpc-go code base. diff --git a/vendor/github.com/containerd/ttrpc/stream.go b/vendor/github.com/containerd/ttrpc/stream.go new file mode 100644 index 0000000000..739a4c9675 --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/stream.go @@ -0,0 +1,84 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ttrpc + +import ( + "context" + "sync" +) + +type streamID uint32 + +type streamMessage struct { + header messageHeader + payload []byte +} + +type stream struct { + id streamID + sender sender + recv chan *streamMessage + + closeOnce sync.Once + recvErr error + recvClose chan struct{} +} + +func newStream(id streamID, send sender) *stream { + return &stream{ + id: id, + sender: send, + recv: make(chan *streamMessage, 1), + recvClose: make(chan struct{}), + } +} + +func (s *stream) closeWithError(err error) error { + s.closeOnce.Do(func() { + if err != nil { + s.recvErr = err + } else { + s.recvErr = ErrClosed + } + close(s.recvClose) + }) + return nil +} + +func (s *stream) send(mt messageType, flags uint8, b []byte) error { + return s.sender.send(uint32(s.id), mt, flags, b) +} + +func (s *stream) receive(ctx context.Context, msg *streamMessage) error { + select { + case <-s.recvClose: + return s.recvErr + default: + } + select { + case <-s.recvClose: + return s.recvErr + case s.recv <- msg: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +type sender interface { + send(uint32, messageType, uint8, []byte) error +} diff --git a/vendor/github.com/containerd/ttrpc/stream_server.go b/vendor/github.com/containerd/ttrpc/stream_server.go new file mode 100644 index 0000000000..b6d1ba720a --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/stream_server.go @@ -0,0 +1,22 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ttrpc + +type StreamServer interface { + SendMsg(m interface{}) error + RecvMsg(m interface{}) error +} diff --git a/vendor/github.com/containerd/ttrpc/test.proto b/vendor/github.com/containerd/ttrpc/test.proto new file mode 100644 index 0000000000..0e114d5568 --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/test.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package ttrpc; + +option go_package = "github.com/containerd/ttrpc/internal"; + +message TestPayload { + string foo = 1; + int64 deadline = 2; + string metadata = 3; +} + +message EchoPayload { + int64 seq = 1; + string msg = 2; +} diff --git a/vendor/github.com/containerd/ttrpc/types.go b/vendor/github.com/containerd/ttrpc/types.go deleted file mode 100644 index 9a1c19a723..0000000000 --- a/vendor/github.com/containerd/ttrpc/types.go +++ /dev/null @@ -1,63 +0,0 @@ -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package ttrpc - -import ( - "fmt" - - spb "google.golang.org/genproto/googleapis/rpc/status" -) - -type Request struct { - Service string `protobuf:"bytes,1,opt,name=service,proto3"` - Method string `protobuf:"bytes,2,opt,name=method,proto3"` - Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` - TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"` - Metadata []*KeyValue `protobuf:"bytes,5,rep,name=metadata,proto3"` -} - -func (r *Request) Reset() { *r = Request{} } -func (r *Request) String() string { return fmt.Sprintf("%+#v", r) } -func (r *Request) ProtoMessage() {} - -type Response struct { - Status *spb.Status `protobuf:"bytes,1,opt,name=status,proto3"` - Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3"` -} - -func (r *Response) Reset() { *r = Response{} } -func (r *Response) String() string { return fmt.Sprintf("%+#v", r) } -func (r *Response) ProtoMessage() {} - -type StringList struct { - List []string `protobuf:"bytes,1,rep,name=list,proto3"` -} - -func (r *StringList) Reset() { *r = StringList{} } -func (r *StringList) String() string { return fmt.Sprintf("%+#v", r) } -func (r *StringList) ProtoMessage() {} - -func makeStringList(item ...string) StringList { return StringList{List: item} } - -type KeyValue struct { - Key string `protobuf:"bytes,1,opt,name=key,proto3"` - Value string `protobuf:"bytes,2,opt,name=value,proto3"` -} - -func (m *KeyValue) Reset() { *m = KeyValue{} } -func (*KeyValue) ProtoMessage() {} -func (m *KeyValue) String() string { return fmt.Sprintf("%+#v", m) } diff --git a/vendor/github.com/containerd/ttrpc/unixcreds_linux.go b/vendor/github.com/containerd/ttrpc/unixcreds_linux.go index a59dad60cd..c82c9f9d4c 100644 --- a/vendor/github.com/containerd/ttrpc/unixcreds_linux.go +++ b/vendor/github.com/containerd/ttrpc/unixcreds_linux.go @@ -29,7 +29,7 @@ import ( type UnixCredentialsFunc func(*unix.Ucred) error -func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) { +func (fn UnixCredentialsFunc) Handshake(_ context.Context, conn net.Conn) (net.Conn, interface{}, error) { uc, err := requireUnixSocket(conn) if err != nil { return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: require unix socket: %w", err) @@ -50,7 +50,7 @@ func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net } if ucredErr != nil { - return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials: %w", err) + return nil, nil, fmt.Errorf("ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials: %w", ucredErr) } if err := fn(ucred); err != nil { @@ -88,10 +88,6 @@ func UnixSocketRequireSameUser() UnixCredentialsFunc { return UnixSocketRequireUidGid(euid, egid) } -func requireRoot(ucred *unix.Ucred) error { - return requireUidGid(ucred, 0, 0) -} - func requireUidGid(ucred *unix.Ucred, uid, gid int) error { if (uid != -1 && uint32(uid) != ucred.Uid) || (gid != -1 && uint32(gid) != ucred.Gid) { return fmt.Errorf("ttrpc: invalid credentials: %v", syscall.EPERM) diff --git a/vendor/modules.txt b/vendor/modules.txt index c94dc0c11f..b6c8b283b4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -340,7 +340,7 @@ github.com/containerd/nydus-snapshotter/pkg/errdefs ## explicit; go 1.16 github.com/containerd/stargz-snapshotter/estargz github.com/containerd/stargz-snapshotter/estargz/errorutil -# github.com/containerd/ttrpc v1.1.1 +# github.com/containerd/ttrpc v1.2.2 ## explicit; go 1.13 github.com/containerd/ttrpc # github.com/containerd/typeurl v1.0.2