Merge pull request #43887 from thaJeztah/22.06_backport_implicit_runtime_config
[22.06 backport] daemon: support other containerd runtimes (MVP)
This commit is contained in:
commit
4f057d8bb6
29 changed files with 10504 additions and 16 deletions
|
@ -707,8 +707,8 @@ func verifyPlatformContainerSettings(daemon *Daemon, hostConfig *containertypes.
|
|||
hostConfig.Runtime = daemon.configStore.GetDefaultRuntimeName()
|
||||
}
|
||||
|
||||
if rt := daemon.configStore.GetRuntime(hostConfig.Runtime); rt == nil {
|
||||
return warnings, fmt.Errorf("Unknown runtime specified %s", hostConfig.Runtime)
|
||||
if _, err := daemon.getRuntime(hostConfig.Runtime); err != nil {
|
||||
return warnings, err
|
||||
}
|
||||
|
||||
parser := volumemounts.NewParser()
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"strings"
|
||||
|
||||
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
|
||||
"github.com/containerd/containerd/runtime/v2/shim"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/daemon/config"
|
||||
"github.com/docker/docker/errdefs"
|
||||
|
@ -116,7 +117,10 @@ func (daemon *Daemon) rewriteRuntimePath(name, p string, args []string) (string,
|
|||
func (daemon *Daemon) getRuntime(name string) (*types.Runtime, error) {
|
||||
rt := daemon.configStore.GetRuntime(name)
|
||||
if rt == nil {
|
||||
return nil, errdefs.InvalidParameter(errors.Errorf("runtime not found in config: %s", name))
|
||||
if !isPermissibleC8dRuntimeName(name) {
|
||||
return nil, errdefs.InvalidParameter(errors.Errorf("unknown or invalid runtime name: %s", name))
|
||||
}
|
||||
return &types.Runtime{Shim: &types.ShimConfig{Binary: name}}, nil
|
||||
}
|
||||
|
||||
if len(rt.Args) > 0 {
|
||||
|
@ -134,3 +138,37 @@ func (daemon *Daemon) getRuntime(name string) (*types.Runtime, error) {
|
|||
|
||||
return rt, nil
|
||||
}
|
||||
|
||||
// isPermissibleC8dRuntimeName tests whether name is safe to pass into
|
||||
// containerd as a runtime name, and whether the name is well-formed.
|
||||
// It does not check if the runtime is installed.
|
||||
//
|
||||
// A runtime name containing slash characters is interpreted by containerd as
|
||||
// the path to a runtime binary. If we allowed this, anyone with Engine API
|
||||
// access could get containerd to execute an arbitrary binary as root. Although
|
||||
// Engine API access is already equivalent to root on the host, the runtime name
|
||||
// has not historically been a vector to run arbitrary code as root so users are
|
||||
// not expecting it to become one.
|
||||
//
|
||||
// This restriction is not configurable. There are viable workarounds for
|
||||
// legitimate use cases: administrators and runtime developers can make runtimes
|
||||
// available for use with Docker by installing them onto PATH following the
|
||||
// [binary naming convention] for containerd Runtime v2.
|
||||
//
|
||||
// [binary naming convention]: https://github.com/containerd/containerd/blob/main/runtime/v2/README.md#binary-naming
|
||||
func isPermissibleC8dRuntimeName(name string) bool {
|
||||
// containerd uses a rather permissive test to validate runtime names:
|
||||
//
|
||||
// - Any name for which filepath.IsAbs(name) is interpreted as the absolute
|
||||
// path to a shim binary. We want to block this behaviour.
|
||||
// - Any name which contains at least one '.' character and no '/' characters
|
||||
// and does not begin with a '.' character is a valid runtime name. The shim
|
||||
// binary name is derived from the final two components of the name and
|
||||
// searched for on the PATH. The name "a.." is technically valid per
|
||||
// containerd's implementation: it would resolve to a binary named
|
||||
// "containerd-shim---".
|
||||
//
|
||||
// https://github.com/containerd/containerd/blob/11ded166c15f92450958078cd13c6d87131ec563/runtime/v2/manager.go#L297-L317
|
||||
// https://github.com/containerd/containerd/blob/11ded166c15f92450958078cd13c6d87131ec563/runtime/v2/shim/util.go#L83-L93
|
||||
return !filepath.IsAbs(name) && !strings.ContainsRune(name, '/') && shim.BinaryName(name) != ""
|
||||
}
|
||||
|
|
107
daemon/runtime_unix_test.go
Normal file
107
daemon/runtime_unix_test.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
|
||||
"gotest.tools/v3/assert"
|
||||
is "gotest.tools/v3/assert/cmp"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/daemon/config"
|
||||
"github.com/docker/docker/errdefs"
|
||||
)
|
||||
|
||||
func TestGetRuntime(t *testing.T) {
|
||||
// Configured runtimes can have any arbitrary name, including names
|
||||
// which would not be allowed as implicit runtime names. Explicit takes
|
||||
// precedence over implicit.
|
||||
const configuredRtName = "my/custom.shim.v1"
|
||||
configuredRuntime := types.Runtime{Path: "/bin/true"}
|
||||
|
||||
d := &Daemon{configStore: config.New()}
|
||||
d.configStore.Root = t.TempDir()
|
||||
assert.Assert(t, os.Mkdir(filepath.Join(d.configStore.Root, "runtimes"), 0700))
|
||||
d.configStore.Runtimes = map[string]types.Runtime{
|
||||
configuredRtName: configuredRuntime,
|
||||
}
|
||||
configureRuntimes(d.configStore)
|
||||
assert.Assert(t, d.loadRuntimes())
|
||||
|
||||
stockRuntime, ok := d.configStore.Runtimes[config.StockRuntimeName]
|
||||
assert.Assert(t, ok, "stock runtime could not be found (test needs to be updated)")
|
||||
|
||||
configdOpts := *stockRuntime.Shim.Opts.(*v2runcoptions.Options)
|
||||
configdOpts.BinaryName = configuredRuntime.Path
|
||||
wantConfigdRuntime := configuredRuntime
|
||||
wantConfigdRuntime.Shim = &types.ShimConfig{
|
||||
Binary: stockRuntime.Shim.Binary,
|
||||
Opts: &configdOpts,
|
||||
}
|
||||
|
||||
for _, tt := range []struct {
|
||||
name, runtime string
|
||||
want *types.Runtime
|
||||
}{
|
||||
{
|
||||
name: "StockRuntime",
|
||||
runtime: config.StockRuntimeName,
|
||||
want: &stockRuntime,
|
||||
},
|
||||
{
|
||||
name: "ShimName",
|
||||
runtime: "io.containerd.my-shim.v42",
|
||||
want: &types.Runtime{Shim: &types.ShimConfig{Binary: "io.containerd.my-shim.v42"}},
|
||||
},
|
||||
{
|
||||
// containerd is pretty loose about the format of runtime names. Perhaps too
|
||||
// loose. The only requirements are that the name contain a dot and (depending
|
||||
// on the containerd version) not start with a dot. It does not enforce any
|
||||
// particular format of the dot-delimited components of the name.
|
||||
name: "VersionlessShimName",
|
||||
runtime: "io.containerd.my-shim",
|
||||
want: &types.Runtime{Shim: &types.ShimConfig{Binary: "io.containerd.my-shim"}},
|
||||
},
|
||||
{
|
||||
name: "IllformedShimName",
|
||||
runtime: "myshim",
|
||||
},
|
||||
{
|
||||
name: "EmptyString",
|
||||
runtime: "",
|
||||
},
|
||||
{
|
||||
name: "PathToShim",
|
||||
runtime: "/path/to/runc",
|
||||
},
|
||||
{
|
||||
name: "PathToShimName",
|
||||
runtime: "/path/to/io.containerd.runc.v2",
|
||||
},
|
||||
{
|
||||
name: "RelPathToShim",
|
||||
runtime: "my/io.containerd.runc.v2",
|
||||
},
|
||||
{
|
||||
name: "ConfiguredRuntime",
|
||||
runtime: configuredRtName,
|
||||
want: &wantConfigdRuntime,
|
||||
},
|
||||
} {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := d.getRuntime(tt.runtime)
|
||||
assert.Check(t, is.DeepEqual(got, tt.want))
|
||||
if tt.want != nil {
|
||||
assert.Check(t, err)
|
||||
} else {
|
||||
assert.Check(t, errdefs.IsInvalidParameter(err))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -39,6 +39,7 @@ import (
|
|||
"github.com/moby/sys/mount"
|
||||
"golang.org/x/sys/unix"
|
||||
"gotest.tools/v3/assert"
|
||||
is "gotest.tools/v3/assert/cmp"
|
||||
"gotest.tools/v3/icmd"
|
||||
"gotest.tools/v3/poll"
|
||||
)
|
||||
|
@ -2384,7 +2385,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
|
|||
// Run with "vm"
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=vm", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
// Reset config to only have the default
|
||||
config = `
|
||||
{
|
||||
|
@ -2404,11 +2405,11 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
|
|||
// Run with "oci"
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=oci", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci"))
|
||||
assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
|
||||
// Start previously created container with oci
|
||||
out, err = s.d.Cmd("start", "oci-runtime-ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci"))
|
||||
assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
|
||||
// Check that we can't override the default runtime
|
||||
config = `
|
||||
{
|
||||
|
@ -2426,7 +2427,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
|
|||
|
||||
content, err := s.d.ReadLogFile()
|
||||
assert.NilError(c, err)
|
||||
assert.Assert(c, strings.Contains(string(content), `file configuration validation failed: runtime name 'runc' is reserved`))
|
||||
assert.Assert(c, is.Contains(string(content), `file configuration validation failed: runtime name 'runc' is reserved`))
|
||||
// Check that we can select a default runtime
|
||||
config = `
|
||||
{
|
||||
|
@ -2451,7 +2452,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
|
|||
|
||||
out, err = s.d.Cmd("run", "--rm", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
// Run with default runtime explicitly
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls")
|
||||
assert.NilError(c, err, out)
|
||||
|
@ -2475,7 +2476,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromCommandLine(c *testing.T) {
|
|||
// Run with "vm"
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=vm", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
// Start a daemon without any extra runtimes
|
||||
s.d.Stop(c)
|
||||
s.d.StartWithBusybox(c)
|
||||
|
@ -2487,25 +2488,25 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromCommandLine(c *testing.T) {
|
|||
// Run with "oci"
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=oci", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci"))
|
||||
assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
|
||||
// Start previously created container with oci
|
||||
out, err = s.d.Cmd("start", "oci-runtime-ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci"))
|
||||
assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
|
||||
// Check that we can't override the default runtime
|
||||
s.d.Stop(c)
|
||||
assert.Assert(c, s.d.StartWithError("--add-runtime", "runc=my-runc") != nil)
|
||||
|
||||
content, err := s.d.ReadLogFile()
|
||||
assert.NilError(c, err)
|
||||
assert.Assert(c, strings.Contains(string(content), `runtime name 'runc' is reserved`))
|
||||
assert.Assert(c, is.Contains(string(content), `runtime name 'runc' is reserved`))
|
||||
// Check that we can select a default runtime
|
||||
s.d.Stop(c)
|
||||
s.d.StartWithBusybox(c, "--default-runtime=vm", "--add-runtime", "oci=runc", "--add-runtime", "vm=/usr/local/bin/vm-manager")
|
||||
|
||||
out, err = s.d.Cmd("run", "--rm", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
// Run with default runtime explicitly
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls")
|
||||
assert.NilError(c, err, out)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -15,7 +16,9 @@ import (
|
|||
"github.com/docker/docker/api/types/versions"
|
||||
"github.com/docker/docker/integration/internal/container"
|
||||
net "github.com/docker/docker/integration/internal/network"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"github.com/docker/docker/pkg/system"
|
||||
"github.com/docker/docker/testutil/daemon"
|
||||
"golang.org/x/sys/unix"
|
||||
"gotest.tools/v3/assert"
|
||||
is "gotest.tools/v3/assert/cmp"
|
||||
|
@ -214,3 +217,57 @@ func TestRunConsoleSize(t *testing.T) {
|
|||
|
||||
assert.Equal(t, strings.TrimSpace(b.String()), "123 57")
|
||||
}
|
||||
|
||||
func TestRunWithAlternativeContainerdShim(t *testing.T) {
|
||||
skip.If(t, testEnv.IsRemoteDaemon)
|
||||
skip.If(t, testEnv.DaemonInfo.OSType != "linux")
|
||||
|
||||
realShimPath, err := exec.LookPath("containerd-shim-runc-v2")
|
||||
assert.Assert(t, err)
|
||||
realShimPath, err = filepath.Abs(realShimPath)
|
||||
assert.Assert(t, err)
|
||||
|
||||
// t.TempDir() can't be used here as the temporary directory returned by
|
||||
// that function cannot be accessed by the fake-root user for rootless
|
||||
// Docker. It creates a nested hierarchy of directories where the
|
||||
// outermost has permission 0700.
|
||||
shimDir, err := os.MkdirTemp("", t.Name())
|
||||
assert.Assert(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := os.RemoveAll(shimDir); err != nil {
|
||||
t.Errorf("shimDir RemoveAll cleanup: %v", err)
|
||||
}
|
||||
})
|
||||
assert.Assert(t, os.Chmod(shimDir, 0777))
|
||||
shimDir, err = filepath.Abs(shimDir)
|
||||
assert.Assert(t, err)
|
||||
assert.Assert(t, os.Symlink(realShimPath, filepath.Join(shimDir, "containerd-shim-realfake-v42")))
|
||||
|
||||
d := daemon.New(t,
|
||||
daemon.WithEnvVars("PATH="+shimDir+":"+os.Getenv("PATH")),
|
||||
daemon.WithContainerdSocket(""), // A new containerd instance needs to be started which inherits the PATH env var defined above.
|
||||
)
|
||||
d.StartWithBusybox(t)
|
||||
defer d.Stop(t)
|
||||
|
||||
client := d.NewClientT(t)
|
||||
ctx := context.Background()
|
||||
|
||||
cID := container.Run(ctx, t, client,
|
||||
container.WithImage("busybox"),
|
||||
container.WithCmd("sh", "-c", `echo 'Hello, world!'`),
|
||||
container.WithRuntime("io.containerd.realfake.v42"),
|
||||
)
|
||||
|
||||
poll.WaitOn(t, container.IsStopped(ctx, client, cID), poll.WithDelay(100*time.Millisecond))
|
||||
|
||||
out, err := client.ContainerLogs(ctx, cID, types.ContainerLogsOptions{ShowStdout: true})
|
||||
assert.NilError(t, err)
|
||||
defer out.Close()
|
||||
|
||||
var b bytes.Buffer
|
||||
_, err = stdcopy.StdCopy(&b, io.Discard, out)
|
||||
assert.NilError(t, err)
|
||||
|
||||
assert.Equal(t, strings.TrimSpace(b.String()), "Hello, world!")
|
||||
}
|
||||
|
|
|
@ -234,3 +234,10 @@ func WithConsoleSize(width, height uint) func(*TestContainerConfig) {
|
|||
c.HostConfig.ConsoleSize = [2]uint{height, width}
|
||||
}
|
||||
}
|
||||
|
||||
// WithRuntime sets the runtime to use to start the container
|
||||
func WithRuntime(name string) func(*TestContainerConfig) {
|
||||
return func(c *TestContainerConfig) {
|
||||
c.HostConfig.Runtime = name
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,7 @@ type Daemon struct {
|
|||
log LogT
|
||||
pidFile string
|
||||
args []string
|
||||
extraEnv []string
|
||||
containerdSocket string
|
||||
rootlessUser *user.User
|
||||
rootlessXDGRuntimeDir string
|
||||
|
@ -334,9 +335,10 @@ func (d *Daemon) StartWithLogFile(out *os.File, providedArgs ...string) error {
|
|||
dockerdBinary = "sudo"
|
||||
d.args = append(d.args,
|
||||
"-u", d.rootlessUser.Username,
|
||||
"-E", "XDG_RUNTIME_DIR="+d.rootlessXDGRuntimeDir,
|
||||
"-E", "HOME="+d.rootlessUser.HomeDir,
|
||||
"-E", "PATH="+os.Getenv("PATH"),
|
||||
"--preserve-env",
|
||||
"--preserve-env=PATH", // Pass through PATH, overriding secure_path.
|
||||
"XDG_RUNTIME_DIR="+d.rootlessXDGRuntimeDir,
|
||||
"HOME="+d.rootlessUser.HomeDir,
|
||||
"--",
|
||||
defaultDockerdRootlessBinary,
|
||||
)
|
||||
|
@ -392,6 +394,7 @@ func (d *Daemon) StartWithLogFile(out *os.File, providedArgs ...string) error {
|
|||
d.args = append(d.args, providedArgs...)
|
||||
d.cmd = exec.Command(dockerdBinary, d.args...)
|
||||
d.cmd.Env = append(os.Environ(), "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE=1")
|
||||
d.cmd.Env = append(d.cmd.Env, d.extraEnv...)
|
||||
d.cmd.Stdout = out
|
||||
d.cmd.Stderr = out
|
||||
d.logFile = out
|
||||
|
|
|
@ -122,3 +122,10 @@ func WithOOMScoreAdjust(score int) Option {
|
|||
d.OOMScoreAdjust = score
|
||||
}
|
||||
}
|
||||
|
||||
// WithEnvVars sets additional environment variables for the daemon
|
||||
func WithEnvVars(vars ...string) Option {
|
||||
return func(d *Daemon) {
|
||||
d.extraEnv = append(d.extraEnv, vars...)
|
||||
}
|
||||
}
|
||||
|
|
18
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/doc.go
generated
vendored
Normal file
18
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/doc.go
generated
vendored
Normal file
|
@ -0,0 +1,18 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package events defines the ttrpc event service.
|
||||
package events
|
760
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.pb.go
generated
vendored
Normal file
760
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.pb.go
generated
vendored
Normal file
|
@ -0,0 +1,760 @@
|
|||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||
// source: github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
context "context"
|
||||
fmt "fmt"
|
||||
github_com_containerd_ttrpc "github.com/containerd/ttrpc"
|
||||
github_com_containerd_typeurl "github.com/containerd/typeurl"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
|
||||
types "github.com/gogo/protobuf/types"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
reflect "reflect"
|
||||
strings "strings"
|
||||
time "time"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
var _ = time.Kitchen
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
type ForwardRequest struct {
|
||||
Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope,proto3" json:"envelope,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ForwardRequest) Reset() { *m = ForwardRequest{} }
|
||||
func (*ForwardRequest) ProtoMessage() {}
|
||||
func (*ForwardRequest) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_19f98672016720b5, []int{0}
|
||||
}
|
||||
func (m *ForwardRequest) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *ForwardRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_ForwardRequest.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *ForwardRequest) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_ForwardRequest.Merge(m, src)
|
||||
}
|
||||
func (m *ForwardRequest) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *ForwardRequest) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_ForwardRequest.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_ForwardRequest proto.InternalMessageInfo
|
||||
|
||||
type Envelope struct {
|
||||
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
|
||||
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
|
||||
Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
|
||||
Event *types.Any `protobuf:"bytes,4,opt,name=event,proto3" json:"event,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Envelope) Reset() { *m = Envelope{} }
|
||||
func (*Envelope) ProtoMessage() {}
|
||||
func (*Envelope) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_19f98672016720b5, []int{1}
|
||||
}
|
||||
func (m *Envelope) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *Envelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_Envelope.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *Envelope) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_Envelope.Merge(m, src)
|
||||
}
|
||||
func (m *Envelope) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *Envelope) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_Envelope.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_Envelope proto.InternalMessageInfo
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.ttrpc.v1.ForwardRequest")
|
||||
proto.RegisterType((*Envelope)(nil), "containerd.services.events.ttrpc.v1.Envelope")
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterFile("github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto", fileDescriptor_19f98672016720b5)
|
||||
}
|
||||
|
||||
var fileDescriptor_19f98672016720b5 = []byte{
|
||||
// 396 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x52, 0xc1, 0x8e, 0xd3, 0x30,
|
||||
0x10, 0x8d, 0x61, 0x77, 0x69, 0x8d, 0xc4, 0xc1, 0xaa, 0x50, 0x08, 0x28, 0x59, 0x2d, 0x97, 0x15,
|
||||
0x12, 0xb6, 0x76, 0xf7, 0x06, 0x17, 0xa8, 0x28, 0x12, 0x1c, 0x23, 0x84, 0x2a, 0x90, 0x10, 0x6e,
|
||||
0x3a, 0x4d, 0x2d, 0x25, 0xb6, 0x49, 0x9c, 0xa0, 0xde, 0xfa, 0x09, 0x7c, 0x0c, 0x17, 0xfe, 0xa0,
|
||||
0x47, 0x8e, 0x9c, 0x80, 0xe6, 0x4b, 0x50, 0x9d, 0xa4, 0x81, 0xf6, 0x40, 0xa5, 0xbd, 0xbd, 0xcc,
|
||||
0x7b, 0x6f, 0xde, 0xcc, 0xc4, 0xf8, 0x75, 0x2c, 0xcc, 0xbc, 0x98, 0xd0, 0x48, 0xa5, 0x2c, 0x52,
|
||||
0xd2, 0x70, 0x21, 0x21, 0x9b, 0xfe, 0x0d, 0xb9, 0x16, 0x2c, 0x87, 0xac, 0x14, 0x11, 0xe4, 0xcc,
|
||||
0x98, 0x4c, 0x47, 0x0c, 0x4a, 0x90, 0x26, 0x67, 0xe5, 0x45, 0x83, 0xa8, 0xce, 0x94, 0x51, 0xe4,
|
||||
0x61, 0xe7, 0xa2, 0xad, 0x83, 0x36, 0x0a, 0x6b, 0xa4, 0xe5, 0x85, 0xf7, 0xec, 0xbf, 0x81, 0xb6,
|
||||
0xd9, 0xa4, 0x98, 0x31, 0x9d, 0x14, 0xb1, 0x90, 0x6c, 0x26, 0x20, 0x99, 0x6a, 0x6e, 0xe6, 0x75,
|
||||
0x8c, 0x37, 0x88, 0x55, 0xac, 0x2c, 0x64, 0x1b, 0xd4, 0x54, 0xef, 0xc5, 0x4a, 0xc5, 0x09, 0x74,
|
||||
0x6e, 0x2e, 0x17, 0x0d, 0x75, 0x7f, 0x97, 0x82, 0x54, 0x9b, 0x96, 0x0c, 0x76, 0x49, 0x23, 0x52,
|
||||
0xc8, 0x0d, 0x4f, 0x75, 0x2d, 0x38, 0x7b, 0x8f, 0xef, 0xbc, 0x54, 0xd9, 0x67, 0x9e, 0x4d, 0x43,
|
||||
0xf8, 0x54, 0x40, 0x6e, 0xc8, 0x2b, 0xdc, 0x03, 0x59, 0x42, 0xa2, 0x34, 0xb8, 0xe8, 0x14, 0x9d,
|
||||
0xdf, 0xbe, 0x7c, 0x4c, 0x0f, 0x58, 0x9d, 0x8e, 0x1a, 0x53, 0xb8, 0xb5, 0x9f, 0x7d, 0x45, 0xb8,
|
||||
0xd7, 0x96, 0xc9, 0x10, 0xf7, 0xb7, 0xe1, 0x4d, 0x63, 0x8f, 0xd6, 0xe3, 0xd1, 0x76, 0x3c, 0xfa,
|
||||
0xa6, 0x55, 0x0c, 0x7b, 0xab, 0x9f, 0x81, 0xf3, 0xe5, 0x57, 0x80, 0xc2, 0xce, 0x46, 0x1e, 0xe0,
|
||||
0xbe, 0xe4, 0x29, 0xe4, 0x9a, 0x47, 0xe0, 0xde, 0x38, 0x45, 0xe7, 0xfd, 0xb0, 0x2b, 0x90, 0x01,
|
||||
0x3e, 0x36, 0x4a, 0x8b, 0xc8, 0xbd, 0x69, 0x99, 0xfa, 0x83, 0x3c, 0xc2, 0xc7, 0x76, 0x54, 0xf7,
|
||||
0xc8, 0x66, 0x0e, 0xf6, 0x32, 0x9f, 0xcb, 0x45, 0x58, 0x4b, 0x9e, 0x1c, 0x2d, 0xbf, 0x05, 0xe8,
|
||||
0xf2, 0x23, 0x3e, 0x19, 0xd9, 0xe5, 0xc8, 0x5b, 0x7c, 0xab, 0xb9, 0x0e, 0xb9, 0x3a, 0xe8, 0x08,
|
||||
0xff, 0xde, 0xd2, 0xbb, 0xbb, 0x17, 0x36, 0xda, 0xfc, 0x9c, 0xe1, 0x87, 0xd5, 0xda, 0x77, 0x7e,
|
||||
0xac, 0x7d, 0x67, 0x59, 0xf9, 0x68, 0x55, 0xf9, 0xe8, 0x7b, 0xe5, 0xa3, 0xdf, 0x95, 0x8f, 0xde,
|
||||
0xbd, 0xb8, 0xd6, 0x8b, 0x7d, 0x5a, 0xa3, 0xb1, 0x33, 0x46, 0x93, 0x13, 0x9b, 0x79, 0xf5, 0x27,
|
||||
0x00, 0x00, 0xff, 0xff, 0xd4, 0x90, 0xbd, 0x09, 0x04, 0x03, 0x00, 0x00,
|
||||
}
|
||||
|
||||
// Field returns the value for the given fieldpath as a string, if defined.
|
||||
// If the value is not defined, the second value will be false.
|
||||
func (m *Envelope) Field(fieldpath []string) (string, bool) {
|
||||
if len(fieldpath) == 0 {
|
||||
return "", false
|
||||
}
|
||||
|
||||
switch fieldpath[0] {
|
||||
// unhandled: timestamp
|
||||
case "namespace":
|
||||
return string(m.Namespace), len(m.Namespace) > 0
|
||||
case "topic":
|
||||
return string(m.Topic), len(m.Topic) > 0
|
||||
case "event":
|
||||
decoded, err := github_com_containerd_typeurl.UnmarshalAny(m.Event)
|
||||
if err != nil {
|
||||
return "", false
|
||||
}
|
||||
|
||||
adaptor, ok := decoded.(interface{ Field([]string) (string, bool) })
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
return adaptor.Field(fieldpath[1:])
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
func (m *ForwardRequest) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ForwardRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.Envelope != nil {
|
||||
{
|
||||
size, err := m.Envelope.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintEvents(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *Envelope) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Envelope) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.Event != nil {
|
||||
{
|
||||
size, err := m.Event.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintEvents(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0x22
|
||||
}
|
||||
if len(m.Topic) > 0 {
|
||||
i -= len(m.Topic)
|
||||
copy(dAtA[i:], m.Topic)
|
||||
i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic)))
|
||||
i--
|
||||
dAtA[i] = 0x1a
|
||||
}
|
||||
if len(m.Namespace) > 0 {
|
||||
i -= len(m.Namespace)
|
||||
copy(dAtA[i:], m.Namespace)
|
||||
i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace)))
|
||||
i--
|
||||
dAtA[i] = 0x12
|
||||
}
|
||||
n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):])
|
||||
if err3 != nil {
|
||||
return 0, err3
|
||||
}
|
||||
i -= n3
|
||||
i = encodeVarintEvents(dAtA, i, uint64(n3))
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func encodeVarintEvents(dAtA []byte, offset int, v uint64) int {
|
||||
offset -= sovEvents(v)
|
||||
base := offset
|
||||
for v >= 1<<7 {
|
||||
dAtA[offset] = uint8(v&0x7f | 0x80)
|
||||
v >>= 7
|
||||
offset++
|
||||
}
|
||||
dAtA[offset] = uint8(v)
|
||||
return base
|
||||
}
|
||||
func (m *ForwardRequest) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.Envelope != nil {
|
||||
l = m.Envelope.Size()
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *Envelope) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
l = len(m.Namespace)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
}
|
||||
l = len(m.Topic)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
}
|
||||
if m.Event != nil {
|
||||
l = m.Event.Size()
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovEvents(x uint64) (n int) {
|
||||
return (math_bits.Len64(x|1) + 6) / 7
|
||||
}
|
||||
func sozEvents(x uint64) (n int) {
|
||||
return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
||||
}
|
||||
func (this *ForwardRequest) String() string {
|
||||
if this == nil {
|
||||
return "nil"
|
||||
}
|
||||
s := strings.Join([]string{`&ForwardRequest{`,
|
||||
`Envelope:` + strings.Replace(this.Envelope.String(), "Envelope", "Envelope", 1) + `,`,
|
||||
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
|
||||
`}`,
|
||||
}, "")
|
||||
return s
|
||||
}
|
||||
func (this *Envelope) String() string {
|
||||
if this == nil {
|
||||
return "nil"
|
||||
}
|
||||
s := strings.Join([]string{`&Envelope{`,
|
||||
`Timestamp:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timestamp), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
|
||||
`Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`,
|
||||
`Topic:` + fmt.Sprintf("%v", this.Topic) + `,`,
|
||||
`Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`,
|
||||
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
|
||||
`}`,
|
||||
}, "")
|
||||
return s
|
||||
}
|
||||
func valueToStringEvents(v interface{}) string {
|
||||
rv := reflect.ValueOf(v)
|
||||
if rv.IsNil() {
|
||||
return "nil"
|
||||
}
|
||||
pv := reflect.Indirect(rv).Interface()
|
||||
return fmt.Sprintf("*%v", pv)
|
||||
}
|
||||
|
||||
type EventsService interface {
|
||||
Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error)
|
||||
}
|
||||
|
||||
func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) {
|
||||
srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{
|
||||
"Forward": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
|
||||
var req ForwardRequest
|
||||
if err := unmarshal(&req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return svc.Forward(ctx, &req)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type eventsClient struct {
|
||||
client *github_com_containerd_ttrpc.Client
|
||||
}
|
||||
|
||||
func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService {
|
||||
return &eventsClient{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *eventsClient) Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) {
|
||||
var resp types.Empty
|
||||
if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Forward", req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
func (m *ForwardRequest) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if m.Envelope == nil {
|
||||
m.Envelope = &Envelope{}
|
||||
}
|
||||
if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipEvents(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *Envelope) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: Envelope: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: Envelope: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType)
|
||||
}
|
||||
var stringLen uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
stringLen |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
intStringLen := int(stringLen)
|
||||
if intStringLen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + intStringLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Namespace = string(dAtA[iNdEx:postIndex])
|
||||
iNdEx = postIndex
|
||||
case 3:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
|
||||
}
|
||||
var stringLen uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
stringLen |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
intStringLen := int(stringLen)
|
||||
if intStringLen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + intStringLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Topic = string(dAtA[iNdEx:postIndex])
|
||||
iNdEx = postIndex
|
||||
case 4:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if m.Event == nil {
|
||||
m.Event = &types.Any{}
|
||||
}
|
||||
if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipEvents(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func skipEvents(dAtA []byte) (n int, err error) {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
depth := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
wireType := int(wire & 0x7)
|
||||
switch wireType {
|
||||
case 0:
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx++
|
||||
if dAtA[iNdEx-1] < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 1:
|
||||
iNdEx += 8
|
||||
case 2:
|
||||
var length int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
length |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if length < 0 {
|
||||
return 0, ErrInvalidLengthEvents
|
||||
}
|
||||
iNdEx += length
|
||||
case 3:
|
||||
depth++
|
||||
case 4:
|
||||
if depth == 0 {
|
||||
return 0, ErrUnexpectedEndOfGroupEvents
|
||||
}
|
||||
depth--
|
||||
case 5:
|
||||
iNdEx += 4
|
||||
default:
|
||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||
}
|
||||
if iNdEx < 0 {
|
||||
return 0, ErrInvalidLengthEvents
|
||||
}
|
||||
if depth == 0 {
|
||||
return iNdEx, nil
|
||||
}
|
||||
}
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidLengthEvents = fmt.Errorf("proto: negative length found during unmarshaling")
|
||||
ErrIntOverflowEvents = fmt.Errorf("proto: integer overflow")
|
||||
ErrUnexpectedEndOfGroupEvents = fmt.Errorf("proto: unexpected end of group")
|
||||
)
|
48
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto
generated
vendored
Normal file
48
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto
generated
vendored
Normal file
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package containerd.services.events.ttrpc.v1;
|
||||
|
||||
import weak "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto";
|
||||
import weak "gogoproto/gogo.proto";
|
||||
import "google/protobuf/any.proto";
|
||||
import "google/protobuf/empty.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events";
|
||||
|
||||
service Events {
|
||||
// Forward sends an event that has already been packaged into an envelope
|
||||
// with a timestamp and namespace.
|
||||
//
|
||||
// This is useful if earlier timestamping is required or when forwarding on
|
||||
// behalf of another component, namespace or publisher.
|
||||
rpc Forward(ForwardRequest) returns (google.protobuf.Empty);
|
||||
}
|
||||
|
||||
message ForwardRequest {
|
||||
Envelope envelope = 1;
|
||||
}
|
||||
|
||||
message Envelope {
|
||||
option (containerd.plugin.fieldpath) = true;
|
||||
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
string namespace = 2;
|
||||
string topic = 3;
|
||||
google.protobuf.Any event = 4;
|
||||
}
|
109
vendor/github.com/containerd/containerd/pkg/shutdown/shutdown.go
generated
vendored
Normal file
109
vendor/github.com/containerd/containerd/pkg/shutdown/shutdown.go
generated
vendored
Normal file
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shutdown
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// ErrShutdown is the error condition when a context has been fully shutdown
|
||||
var ErrShutdown = errors.New("shutdown")
|
||||
|
||||
// Service is used to facilitate shutdown by through callback
|
||||
// registration and shutdown initiation
|
||||
type Service interface {
|
||||
// Shutdown initiates shutdown
|
||||
Shutdown()
|
||||
// RegisterCallback registers functions to be called on shutdown and before
|
||||
// the shutdown channel is closed. A callback error will propagate to the
|
||||
// context error
|
||||
RegisterCallback(func(context.Context) error)
|
||||
}
|
||||
|
||||
// WithShutdown returns a context which is similar to a cancel context, but
|
||||
// with callbacks which can propagate to the context error. Unlike a cancel
|
||||
// context, the shutdown context cannot be canceled from the parent context.
|
||||
// However, future child contexes will be canceled upon shutdown.
|
||||
func WithShutdown(ctx context.Context) (context.Context, Service) {
|
||||
ss := &shutdownService{
|
||||
Context: ctx,
|
||||
doneC: make(chan struct{}),
|
||||
timeout: 30 * time.Second,
|
||||
}
|
||||
return ss, ss
|
||||
}
|
||||
|
||||
type shutdownService struct {
|
||||
context.Context
|
||||
|
||||
mu sync.Mutex
|
||||
isShutdown bool
|
||||
callbacks []func(context.Context) error
|
||||
doneC chan struct{}
|
||||
err error
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func (s *shutdownService) Shutdown() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.isShutdown {
|
||||
return
|
||||
}
|
||||
s.isShutdown = true
|
||||
|
||||
go func(callbacks []func(context.Context) error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
|
||||
defer cancel()
|
||||
grp, ctx := errgroup.WithContext(ctx)
|
||||
for i := range callbacks {
|
||||
fn := callbacks[i]
|
||||
grp.Go(func() error { return fn(ctx) })
|
||||
}
|
||||
err := grp.Wait()
|
||||
if err == nil {
|
||||
err = ErrShutdown
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.err = err
|
||||
close(s.doneC)
|
||||
s.mu.Unlock()
|
||||
}(s.callbacks)
|
||||
}
|
||||
|
||||
func (s *shutdownService) Done() <-chan struct{} {
|
||||
return s.doneC
|
||||
}
|
||||
|
||||
func (s *shutdownService) Err() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.err
|
||||
}
|
||||
func (s *shutdownService) RegisterCallback(fn func(context.Context) error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.callbacks == nil {
|
||||
s.callbacks = []func(context.Context) error{}
|
||||
}
|
||||
s.callbacks = append(s.callbacks, fn)
|
||||
}
|
120
vendor/github.com/containerd/containerd/pkg/ttrpcutil/client.go
generated
vendored
Normal file
120
vendor/github.com/containerd/containerd/pkg/ttrpcutil/client.go
generated
vendored
Normal file
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ttrpcutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
|
||||
"github.com/containerd/containerd/pkg/dialer"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
const ttrpcDialTimeout = 5 * time.Second
|
||||
|
||||
type ttrpcConnector func() (*ttrpc.Client, error)
|
||||
|
||||
// Client is the client to interact with TTRPC part of containerd server (plugins, events)
|
||||
type Client struct {
|
||||
mu sync.Mutex
|
||||
connector ttrpcConnector
|
||||
client *ttrpc.Client
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewClient returns a new containerd TTRPC client that is connected to the containerd instance provided by address
|
||||
func NewClient(address string, opts ...ttrpc.ClientOpts) (*Client, error) {
|
||||
connector := func() (*ttrpc.Client, error) {
|
||||
conn, err := dialer.Dialer(address, ttrpcDialTimeout)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
|
||||
client := ttrpc.NewClient(conn, opts...)
|
||||
return client, nil
|
||||
}
|
||||
|
||||
return &Client{
|
||||
connector: connector,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Reconnect re-establishes the TTRPC connection to the containerd daemon
|
||||
func (c *Client) Reconnect() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.connector == nil {
|
||||
return errors.New("unable to reconnect to containerd, no connector available")
|
||||
}
|
||||
|
||||
if c.closed {
|
||||
return errors.New("client is closed")
|
||||
}
|
||||
|
||||
if c.client != nil {
|
||||
if err := c.client.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
client, err := c.connector()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.client = client
|
||||
return nil
|
||||
}
|
||||
|
||||
// EventsService creates an EventsService client
|
||||
func (c *Client) EventsService() (v1.EventsService, error) {
|
||||
client, err := c.Client()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1.NewEventsClient(client), nil
|
||||
}
|
||||
|
||||
// Client returns the underlying TTRPC client object
|
||||
func (c *Client) Client() (*ttrpc.Client, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.client == nil {
|
||||
client, err := c.connector()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.client = client
|
||||
}
|
||||
return c.client, nil
|
||||
}
|
||||
|
||||
// Close closes the clients TTRPC connection to containerd
|
||||
func (c *Client) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.closed = true
|
||||
if c.client != nil {
|
||||
return c.client.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
169
vendor/github.com/containerd/containerd/runtime/v2/shim/publisher.go
generated
vendored
Normal file
169
vendor/github.com/containerd/containerd/runtime/v2/shim/publisher.go
generated
vendored
Normal file
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/pkg/ttrpcutil"
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
queueSize = 2048
|
||||
maxRequeue = 5
|
||||
)
|
||||
|
||||
type item struct {
|
||||
ev *v1.Envelope
|
||||
ctx context.Context
|
||||
count int
|
||||
}
|
||||
|
||||
// NewPublisher creates a new remote events publisher
|
||||
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
|
||||
client, err := ttrpcutil.NewClient(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l := &RemoteEventsPublisher{
|
||||
client: client,
|
||||
closed: make(chan struct{}),
|
||||
requeue: make(chan *item, queueSize),
|
||||
}
|
||||
|
||||
go l.processQueue()
|
||||
return l, nil
|
||||
}
|
||||
|
||||
// RemoteEventsPublisher forwards events to a ttrpc server
|
||||
type RemoteEventsPublisher struct {
|
||||
client *ttrpcutil.Client
|
||||
closed chan struct{}
|
||||
closer sync.Once
|
||||
requeue chan *item
|
||||
}
|
||||
|
||||
// Done returns a channel which closes when done
|
||||
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
|
||||
return l.closed
|
||||
}
|
||||
|
||||
// Close closes the remote connection and closes the done channel
|
||||
func (l *RemoteEventsPublisher) Close() (err error) {
|
||||
err = l.client.Close()
|
||||
l.closer.Do(func() {
|
||||
close(l.closed)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *RemoteEventsPublisher) processQueue() {
|
||||
for i := range l.requeue {
|
||||
if i.count > maxRequeue {
|
||||
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
|
||||
// drop the event
|
||||
continue
|
||||
}
|
||||
|
||||
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
|
||||
logrus.WithError(err).Error("forward event")
|
||||
l.queue(i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *RemoteEventsPublisher) queue(i *item) {
|
||||
go func() {
|
||||
i.count++
|
||||
// re-queue after a short delay
|
||||
time.Sleep(time.Duration(1*i.count) * time.Second)
|
||||
l.requeue <- i
|
||||
}()
|
||||
}
|
||||
|
||||
// Publish publishes the event by forwarding it to the configured ttrpc server
|
||||
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
any, err := typeurl.MarshalAny(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i := &item{
|
||||
ev: &v1.Envelope{
|
||||
Timestamp: time.Now(),
|
||||
Namespace: ns,
|
||||
Topic: topic,
|
||||
Event: any,
|
||||
},
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
|
||||
l.queue(i)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
|
||||
service, err := l.client.EventsService()
|
||||
if err == nil {
|
||||
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
_, err = service.Forward(fCtx, req)
|
||||
cancel()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if err != ttrpc.ErrClosed {
|
||||
return err
|
||||
}
|
||||
|
||||
// Reconnect and retry request
|
||||
if err = l.client.Reconnect(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service, err = l.client.EventsService()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// try again with a fresh context, otherwise we may get a context timeout unexpectedly.
|
||||
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
_, err = service.Forward(fCtx, req)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
501
vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go
generated
vendored
Normal file
501
vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go
generated
vendored
Normal file
|
@ -0,0 +1,501 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/pkg/shutdown"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
shimapi "github.com/containerd/containerd/runtime/v2/task"
|
||||
"github.com/containerd/containerd/version"
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Publisher for events
|
||||
type Publisher interface {
|
||||
events.Publisher
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// StartOpts describes shim start configuration received from containerd
|
||||
type StartOpts struct {
|
||||
ID string // TODO(2.0): Remove ID, passed directly to start for call symmetry
|
||||
ContainerdBinary string
|
||||
Address string
|
||||
TTRPCAddress string
|
||||
}
|
||||
|
||||
type StopStatus struct {
|
||||
Pid int
|
||||
ExitStatus int
|
||||
ExitedAt time.Time
|
||||
}
|
||||
|
||||
// Init func for the creation of a shim server
|
||||
// TODO(2.0): Remove init function
|
||||
type Init func(context.Context, string, Publisher, func()) (Shim, error)
|
||||
|
||||
// Shim server interface
|
||||
// TODO(2.0): Remove unified shim interface
|
||||
type Shim interface {
|
||||
shimapi.TaskService
|
||||
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
|
||||
StartShim(ctx context.Context, opts StartOpts) (string, error)
|
||||
}
|
||||
|
||||
// Manager is the interface which manages the shim process
|
||||
type Manager interface {
|
||||
Name() string
|
||||
Start(ctx context.Context, id string, opts StartOpts) (string, error)
|
||||
Stop(ctx context.Context, id string) (StopStatus, error)
|
||||
}
|
||||
|
||||
// OptsKey is the context key for the Opts value.
|
||||
type OptsKey struct{}
|
||||
|
||||
// Opts are context options associated with the shim invocation.
|
||||
type Opts struct {
|
||||
BundlePath string
|
||||
Debug bool
|
||||
}
|
||||
|
||||
// BinaryOpts allows the configuration of a shims binary setup
|
||||
type BinaryOpts func(*Config)
|
||||
|
||||
// Config of shim binary options provided by shim implementations
|
||||
type Config struct {
|
||||
// NoSubreaper disables setting the shim as a child subreaper
|
||||
NoSubreaper bool
|
||||
// NoReaper disables the shim binary from reaping any child process implicitly
|
||||
NoReaper bool
|
||||
// NoSetupLogger disables automatic configuration of logrus to use the shim FIFO
|
||||
NoSetupLogger bool
|
||||
}
|
||||
|
||||
type ttrpcService interface {
|
||||
RegisterTTRPC(*ttrpc.Server) error
|
||||
}
|
||||
|
||||
type taskService struct {
|
||||
shimapi.TaskService
|
||||
}
|
||||
|
||||
func (t taskService) RegisterTTRPC(server *ttrpc.Server) error {
|
||||
shimapi.RegisterTaskService(server, t.TaskService)
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
debugFlag bool
|
||||
versionFlag bool
|
||||
id string
|
||||
namespaceFlag string
|
||||
socketFlag string
|
||||
bundlePath string
|
||||
addressFlag string
|
||||
containerdBinaryFlag string
|
||||
action string
|
||||
)
|
||||
|
||||
const (
|
||||
ttrpcAddressEnv = "TTRPC_ADDRESS"
|
||||
)
|
||||
|
||||
func parseFlags() {
|
||||
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
|
||||
flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit")
|
||||
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
|
||||
flag.StringVar(&id, "id", "", "id of the task")
|
||||
flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
|
||||
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
|
||||
|
||||
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
|
||||
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
|
||||
|
||||
flag.Parse()
|
||||
action = flag.Arg(0)
|
||||
}
|
||||
|
||||
func setRuntime() {
|
||||
debug.SetGCPercent(40)
|
||||
go func() {
|
||||
for range time.Tick(30 * time.Second) {
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
}()
|
||||
if os.Getenv("GOMAXPROCS") == "" {
|
||||
// If GOMAXPROCS hasn't been set, we default to a value of 2 to reduce
|
||||
// the number of Go stacks present in the shim.
|
||||
runtime.GOMAXPROCS(2)
|
||||
}
|
||||
}
|
||||
|
||||
func setLogger(ctx context.Context, id string) (context.Context, error) {
|
||||
l := log.G(ctx)
|
||||
l.Logger.SetFormatter(&logrus.TextFormatter{
|
||||
TimestampFormat: log.RFC3339NanoFixed,
|
||||
FullTimestamp: true,
|
||||
})
|
||||
if debugFlag {
|
||||
l.Logger.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
f, err := openLog(ctx, id)
|
||||
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return ctx, err
|
||||
}
|
||||
l.Logger.SetOutput(f)
|
||||
return log.WithLogger(ctx, l), nil
|
||||
}
|
||||
|
||||
// Run initializes and runs a shim server
|
||||
// TODO(2.0): Remove function
|
||||
func Run(name string, initFunc Init, opts ...BinaryOpts) {
|
||||
var config Config
|
||||
for _, o := range opts {
|
||||
o(&config)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", name))
|
||||
|
||||
if err := run(ctx, nil, initFunc, name, config); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%s: %s", name, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(2.0): Remove this type
|
||||
type shimToManager struct {
|
||||
shim Shim
|
||||
name string
|
||||
}
|
||||
|
||||
func (stm shimToManager) Name() string {
|
||||
return stm.name
|
||||
}
|
||||
|
||||
func (stm shimToManager) Start(ctx context.Context, id string, opts StartOpts) (string, error) {
|
||||
opts.ID = id
|
||||
return stm.shim.StartShim(ctx, opts)
|
||||
}
|
||||
|
||||
func (stm shimToManager) Stop(ctx context.Context, id string) (StopStatus, error) {
|
||||
// shim must already have id
|
||||
dr, err := stm.shim.Cleanup(ctx)
|
||||
if err != nil {
|
||||
return StopStatus{}, err
|
||||
}
|
||||
return StopStatus{
|
||||
Pid: int(dr.Pid),
|
||||
ExitStatus: int(dr.ExitStatus),
|
||||
ExitedAt: dr.ExitedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RunManager initialzes and runs a shim server
|
||||
// TODO(2.0): Rename to Run
|
||||
func RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) {
|
||||
var config Config
|
||||
for _, o := range opts {
|
||||
o(&config)
|
||||
}
|
||||
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name()))
|
||||
|
||||
if err := run(ctx, manager, nil, "", config); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%s: %s", manager.Name(), err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func run(ctx context.Context, manager Manager, initFunc Init, name string, config Config) error {
|
||||
parseFlags()
|
||||
if versionFlag {
|
||||
fmt.Printf("%s:\n", os.Args[0])
|
||||
fmt.Println(" Version: ", version.Version)
|
||||
fmt.Println(" Revision:", version.Revision)
|
||||
fmt.Println(" Go version:", version.GoVersion)
|
||||
fmt.Println("")
|
||||
return nil
|
||||
}
|
||||
|
||||
if namespaceFlag == "" {
|
||||
return fmt.Errorf("shim namespace cannot be empty")
|
||||
}
|
||||
|
||||
setRuntime()
|
||||
|
||||
signals, err := setupSignals(config)
|
||||
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return err
|
||||
}
|
||||
|
||||
if !config.NoSubreaper {
|
||||
if err := subreaper(); err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
||||
publisher, err := NewPublisher(ttrpcAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer publisher.Close()
|
||||
|
||||
ctx = namespaces.WithNamespace(ctx, namespaceFlag)
|
||||
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
|
||||
ctx, sd := shutdown.WithShutdown(ctx)
|
||||
defer sd.Shutdown()
|
||||
|
||||
if manager == nil {
|
||||
service, err := initFunc(ctx, id, publisher, sd.Shutdown)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
plugin.Register(&plugin.Registration{
|
||||
Type: plugin.TTRPCPlugin,
|
||||
ID: "task",
|
||||
Requires: []plugin.Type{
|
||||
plugin.EventPlugin,
|
||||
},
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
return taskService{service}, nil
|
||||
},
|
||||
})
|
||||
manager = shimToManager{
|
||||
shim: service,
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
// Handle explicit actions
|
||||
switch action {
|
||||
case "delete":
|
||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||
"pid": os.Getpid(),
|
||||
"namespace": namespaceFlag,
|
||||
})
|
||||
go reap(ctx, logger, signals)
|
||||
ss, err := manager.Stop(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := proto.Marshal(&shimapi.DeleteResponse{
|
||||
Pid: uint32(ss.Pid),
|
||||
ExitStatus: uint32(ss.ExitStatus),
|
||||
ExitedAt: ss.ExitedAt,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := os.Stdout.Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
case "start":
|
||||
opts := StartOpts{
|
||||
ContainerdBinary: containerdBinaryFlag,
|
||||
Address: addressFlag,
|
||||
TTRPCAddress: ttrpcAddress,
|
||||
}
|
||||
|
||||
address, err := manager.Start(ctx, id, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := os.Stdout.WriteString(address); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if !config.NoSetupLogger {
|
||||
ctx, err = setLogger(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
plugin.Register(&plugin.Registration{
|
||||
Type: plugin.InternalPlugin,
|
||||
ID: "shutdown",
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
return sd, nil
|
||||
},
|
||||
})
|
||||
|
||||
// Register event plugin
|
||||
plugin.Register(&plugin.Registration{
|
||||
Type: plugin.EventPlugin,
|
||||
ID: "publisher",
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
return publisher, nil
|
||||
},
|
||||
})
|
||||
|
||||
var (
|
||||
initialized = plugin.NewPluginSet()
|
||||
ttrpcServices = []ttrpcService{}
|
||||
)
|
||||
plugins := plugin.Graph(func(*plugin.Registration) bool { return false })
|
||||
for _, p := range plugins {
|
||||
id := p.URI()
|
||||
log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
|
||||
|
||||
initContext := plugin.NewContext(
|
||||
ctx,
|
||||
p,
|
||||
initialized,
|
||||
// NOTE: Root is empty since the shim does not support persistent storage,
|
||||
// shim plugins should make use state directory for writing files to disk.
|
||||
// The state directory will be destroyed when the shim if cleaned up or
|
||||
// on reboot
|
||||
"",
|
||||
bundlePath,
|
||||
)
|
||||
initContext.Address = addressFlag
|
||||
initContext.TTRPCAddress = ttrpcAddress
|
||||
|
||||
// load the plugin specific configuration if it is provided
|
||||
//TODO: Read configuration passed into shim, or from state directory?
|
||||
//if p.Config != nil {
|
||||
// pc, err := config.Decode(p)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// initContext.Config = pc
|
||||
//}
|
||||
|
||||
result := p.Init(initContext)
|
||||
if err := initialized.Add(result); err != nil {
|
||||
return fmt.Errorf("could not add plugin result to plugin set: %w", err)
|
||||
}
|
||||
|
||||
instance, err := result.Instance()
|
||||
if err != nil {
|
||||
if plugin.IsSkipPlugin(err) {
|
||||
log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
|
||||
} else {
|
||||
log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if src, ok := instance.(ttrpcService); ok {
|
||||
logrus.WithField("id", id).Debug("registering ttrpc service")
|
||||
ttrpcServices = append(ttrpcServices, src)
|
||||
}
|
||||
}
|
||||
|
||||
server, err := newServer()
|
||||
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return fmt.Errorf("failed creating server: %w", err)
|
||||
}
|
||||
|
||||
for _, srv := range ttrpcServices {
|
||||
if err := srv.RegisterTTRPC(server); err != nil {
|
||||
return fmt.Errorf("failed to register service: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := serve(ctx, server, signals, sd.Shutdown); err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
if err != shutdown.ErrShutdown {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: If the shim server is down(like oom killer), the address
|
||||
// socket might be leaking.
|
||||
if address, err := ReadAddress("address"); err == nil {
|
||||
_ = RemoveSocket(address)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-publisher.Done():
|
||||
return nil
|
||||
case <-time.After(5 * time.Second):
|
||||
return errors.New("publisher not closed")
|
||||
}
|
||||
}
|
||||
|
||||
// serve serves the ttrpc API over a unix socket in the current working directory
|
||||
// and blocks until the context is canceled
|
||||
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, shutdown func()) error {
|
||||
dump := make(chan os.Signal, 32)
|
||||
setupDumpStacks(dump)
|
||||
|
||||
path, err := os.Getwd()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l, err := serveListener(socketFlag)
|
||||
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
defer l.Close()
|
||||
if err := server.Serve(ctx, l); err != nil &&
|
||||
!strings.Contains(err.Error(), "use of closed network connection") {
|
||||
log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure")
|
||||
}
|
||||
}()
|
||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||
"pid": os.Getpid(),
|
||||
"path": path,
|
||||
"namespace": namespaceFlag,
|
||||
})
|
||||
go func() {
|
||||
for range dump {
|
||||
dumpStacks(logger)
|
||||
}
|
||||
}()
|
||||
|
||||
go handleExitSignals(ctx, logger, shutdown)
|
||||
return reap(ctx, logger, signals)
|
||||
}
|
||||
|
||||
func dumpStacks(logger *logrus.Entry) {
|
||||
var (
|
||||
buf []byte
|
||||
stackSize int
|
||||
)
|
||||
bufferLen := 16384
|
||||
for stackSize == len(buf) {
|
||||
buf = make([]byte, bufferLen)
|
||||
stackSize = runtime.Stack(buf, true)
|
||||
bufferLen *= 2
|
||||
}
|
||||
buf = buf[:stackSize]
|
||||
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
|
||||
}
|
27
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_darwin.go
generated
vendored
Normal file
27
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_darwin.go
generated
vendored
Normal file
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import "github.com/containerd/ttrpc"
|
||||
|
||||
func newServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer()
|
||||
}
|
||||
|
||||
func subreaper() error {
|
||||
return nil
|
||||
}
|
27
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_freebsd.go
generated
vendored
Normal file
27
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_freebsd.go
generated
vendored
Normal file
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import "github.com/containerd/ttrpc"
|
||||
|
||||
func newServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer()
|
||||
}
|
||||
|
||||
func subreaper() error {
|
||||
return nil
|
||||
}
|
30
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_linux.go
generated
vendored
Normal file
30
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_linux.go
generated
vendored
Normal file
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"github.com/containerd/containerd/sys/reaper"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
func newServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
|
||||
}
|
||||
|
||||
func subreaper() error {
|
||||
return reaper.SetSubreaper(1)
|
||||
}
|
113
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go
generated
vendored
Normal file
113
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go
generated
vendored
Normal file
|
@ -0,0 +1,113 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/containerd/containerd/sys/reaper"
|
||||
"github.com/containerd/fifo"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// setupSignals creates a new signal handler for all signals and sets the shim as a
|
||||
// sub-reaper so that the container processes are reparented
|
||||
func setupSignals(config Config) (chan os.Signal, error) {
|
||||
signals := make(chan os.Signal, 32)
|
||||
smp := []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGPIPE}
|
||||
if !config.NoReaper {
|
||||
smp = append(smp, unix.SIGCHLD)
|
||||
}
|
||||
signal.Notify(signals, smp...)
|
||||
return signals, nil
|
||||
}
|
||||
|
||||
func setupDumpStacks(dump chan<- os.Signal) {
|
||||
signal.Notify(dump, syscall.SIGUSR1)
|
||||
}
|
||||
|
||||
func serveListener(path string) (net.Listener, error) {
|
||||
var (
|
||||
l net.Listener
|
||||
err error
|
||||
)
|
||||
if path == "" {
|
||||
l, err = net.FileListener(os.NewFile(3, "socket"))
|
||||
path = "[inherited from parent]"
|
||||
} else {
|
||||
if len(path) > socketPathLimit {
|
||||
return nil, fmt.Errorf("%q: unix socket path too long (> %d)", path, socketPathLimit)
|
||||
}
|
||||
l, err = net.Listen("unix", path)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logrus.WithField("socket", path).Debug("serving api on socket")
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func reap(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
|
||||
logger.Info("starting signal loop")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case s := <-signals:
|
||||
// Exit signals are handled separately from this loop
|
||||
// They get registered with this channel so that we can ignore such signals for short-running actions (e.g. `delete`)
|
||||
switch s {
|
||||
case unix.SIGCHLD:
|
||||
if err := reaper.Reap(); err != nil {
|
||||
logger.WithError(err).Error("reap exit status")
|
||||
}
|
||||
case unix.SIGPIPE:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleExitSignals(ctx context.Context, logger *logrus.Entry, cancel context.CancelFunc) {
|
||||
ch := make(chan os.Signal, 32)
|
||||
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
for {
|
||||
select {
|
||||
case s := <-ch:
|
||||
logger.WithField("signal", s).Debugf("Caught exit signal")
|
||||
cancel()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func openLog(ctx context.Context, _ string) (io.Writer, error) {
|
||||
return fifo.OpenFifoDup2(ctx, "log", unix.O_WRONLY, 0700, int(os.Stderr.Fd()))
|
||||
}
|
58
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go
generated
vendored
Normal file
58
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go
generated
vendored
Normal file
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func setupSignals(config Config) (chan os.Signal, error) {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
|
||||
func newServer() (*ttrpc.Server, error) {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
|
||||
func subreaper() error {
|
||||
return errors.New("not supported")
|
||||
}
|
||||
|
||||
func setupDumpStacks(dump chan<- os.Signal) {
|
||||
}
|
||||
|
||||
func serveListener(path string) (net.Listener, error) {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
|
||||
func reap(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
|
||||
return errors.New("not supported")
|
||||
}
|
||||
|
||||
func handleExitSignals(ctx context.Context, logger *logrus.Entry, cancel context.CancelFunc) {
|
||||
}
|
||||
|
||||
func openLog(ctx context.Context, _ string) (io.Writer, error) {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
169
vendor/github.com/containerd/containerd/runtime/v2/shim/util.go
generated
vendored
Normal file
169
vendor/github.com/containerd/containerd/runtime/v2/shim/util.go
generated
vendored
Normal file
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/gogo/protobuf/types"
|
||||
exec "golang.org/x/sys/execabs"
|
||||
)
|
||||
|
||||
type CommandConfig struct {
|
||||
Runtime string
|
||||
Address string
|
||||
TTRPCAddress string
|
||||
Path string
|
||||
SchedCore bool
|
||||
Args []string
|
||||
Opts *types.Any
|
||||
}
|
||||
|
||||
// Command returns the shim command with the provided args and configuration
|
||||
func Command(ctx context.Context, config *CommandConfig) (*exec.Cmd, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
self, err := os.Executable()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args := []string{
|
||||
"-namespace", ns,
|
||||
"-address", config.Address,
|
||||
"-publish-binary", self,
|
||||
}
|
||||
args = append(args, config.Args...)
|
||||
cmd := exec.CommandContext(ctx, config.Runtime, args...)
|
||||
cmd.Dir = config.Path
|
||||
cmd.Env = append(
|
||||
os.Environ(),
|
||||
"GOMAXPROCS=2",
|
||||
fmt.Sprintf("%s=%s", ttrpcAddressEnv, config.TTRPCAddress),
|
||||
)
|
||||
if config.SchedCore {
|
||||
cmd.Env = append(cmd.Env, "SCHED_CORE=1")
|
||||
}
|
||||
cmd.SysProcAttr = getSysProcAttr()
|
||||
if config.Opts != nil {
|
||||
d, err := proto.Marshal(config.Opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cmd.Stdin = bytes.NewReader(d)
|
||||
}
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
// BinaryName returns the shim binary name from the runtime name,
|
||||
// empty string returns means runtime name is invalid
|
||||
func BinaryName(runtime string) string {
|
||||
// runtime name should format like $prefix.name.version
|
||||
parts := strings.Split(runtime, ".")
|
||||
if len(parts) < 2 {
|
||||
return ""
|
||||
}
|
||||
|
||||
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
|
||||
}
|
||||
|
||||
// BinaryPath returns the full path for the shim binary from the runtime name,
|
||||
// empty string returns means runtime name is invalid
|
||||
func BinaryPath(runtime string) string {
|
||||
dir := filepath.Dir(runtime)
|
||||
binary := BinaryName(runtime)
|
||||
|
||||
path, err := filepath.Abs(filepath.Join(dir, binary))
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
// Connect to the provided address
|
||||
func Connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
|
||||
return d(address, 100*time.Second)
|
||||
}
|
||||
|
||||
// WritePidFile writes a pid file atomically
|
||||
func WritePidFile(path string, pid int) error {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
|
||||
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = fmt.Fprintf(f, "%d", pid)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, path)
|
||||
}
|
||||
|
||||
// WriteAddress writes a address file atomically
|
||||
func WriteAddress(path, address string) error {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
|
||||
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.WriteString(address)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, path)
|
||||
}
|
||||
|
||||
// ErrNoAddress is returned when the address file has no content
|
||||
var ErrNoAddress = errors.New("no shim address")
|
||||
|
||||
// ReadAddress returns the shim's socket address from the path
|
||||
func ReadAddress(path string) (string, error) {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return "", ErrNoAddress
|
||||
}
|
||||
return string(data), nil
|
||||
}
|
173
vendor/github.com/containerd/containerd/runtime/v2/shim/util_unix.go
generated
vendored
Normal file
173
vendor/github.com/containerd/containerd/runtime/v2/shim/util_unix.go
generated
vendored
Normal file
|
@ -0,0 +1,173 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/defaults"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/sys"
|
||||
)
|
||||
|
||||
const (
|
||||
shimBinaryFormat = "containerd-shim-%s-%s"
|
||||
socketPathLimit = 106
|
||||
)
|
||||
|
||||
func getSysProcAttr() *syscall.SysProcAttr {
|
||||
return &syscall.SysProcAttr{
|
||||
Setpgid: true,
|
||||
}
|
||||
}
|
||||
|
||||
// AdjustOOMScore sets the OOM score for the process to the parents OOM score +1
|
||||
// to ensure that they parent has a lower* score than the shim
|
||||
// if not already at the maximum OOM Score
|
||||
func AdjustOOMScore(pid int) error {
|
||||
parent := os.Getppid()
|
||||
score, err := sys.GetOOMScoreAdj(parent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get parent OOM score: %w", err)
|
||||
}
|
||||
shimScore := score + 1
|
||||
if err := sys.AdjustOOMScore(pid, shimScore); err != nil {
|
||||
return fmt.Errorf("set shim OOM score: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const socketRoot = defaults.DefaultStateDir
|
||||
|
||||
// SocketAddress returns a socket address
|
||||
func SocketAddress(ctx context.Context, socketPath, id string) (string, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
d := sha256.Sum256([]byte(filepath.Join(socketPath, ns, id)))
|
||||
return fmt.Sprintf("unix://%s/%x", filepath.Join(socketRoot, "s"), d), nil
|
||||
}
|
||||
|
||||
// AnonDialer returns a dialer for a socket
|
||||
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||
return net.DialTimeout("unix", socket(address).path(), timeout)
|
||||
}
|
||||
|
||||
// AnonReconnectDialer returns a dialer for an existing socket on reconnection
|
||||
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||
return AnonDialer(address, timeout)
|
||||
}
|
||||
|
||||
// NewSocket returns a new socket
|
||||
func NewSocket(address string) (*net.UnixListener, error) {
|
||||
var (
|
||||
sock = socket(address)
|
||||
path = sock.path()
|
||||
)
|
||||
|
||||
isAbstract := sock.isAbstract()
|
||||
|
||||
if !isAbstract {
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
|
||||
return nil, fmt.Errorf("%s: %w", path, err)
|
||||
}
|
||||
}
|
||||
l, err := net.Listen("unix", path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !isAbstract {
|
||||
if err := os.Chmod(path, 0600); err != nil {
|
||||
os.Remove(sock.path())
|
||||
l.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return l.(*net.UnixListener), nil
|
||||
}
|
||||
|
||||
const abstractSocketPrefix = "\x00"
|
||||
|
||||
type socket string
|
||||
|
||||
func (s socket) isAbstract() bool {
|
||||
return !strings.HasPrefix(string(s), "unix://")
|
||||
}
|
||||
|
||||
func (s socket) path() string {
|
||||
path := strings.TrimPrefix(string(s), "unix://")
|
||||
// if there was no trim performed, we assume an abstract socket
|
||||
if len(path) == len(s) {
|
||||
path = abstractSocketPrefix + path
|
||||
}
|
||||
return path
|
||||
}
|
||||
|
||||
// RemoveSocket removes the socket at the specified address if
|
||||
// it exists on the filesystem
|
||||
func RemoveSocket(address string) error {
|
||||
sock := socket(address)
|
||||
if !sock.isAbstract() {
|
||||
return os.Remove(sock.path())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SocketEaddrinuse returns true if the provided error is caused by the
|
||||
// EADDRINUSE error number
|
||||
func SocketEaddrinuse(err error) bool {
|
||||
netErr, ok := err.(*net.OpError)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if netErr.Op != "listen" {
|
||||
return false
|
||||
}
|
||||
syscallErr, ok := netErr.Err.(*os.SyscallError)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
errno, ok := syscallErr.Err.(syscall.Errno)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return errno == syscall.EADDRINUSE
|
||||
}
|
||||
|
||||
// CanConnect returns true if the socket provided at the address
|
||||
// is accepting new connections
|
||||
func CanConnect(address string) bool {
|
||||
conn, err := AnonDialer(address, 100*time.Millisecond)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
conn.Close()
|
||||
return true
|
||||
}
|
87
vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go
generated
vendored
Normal file
87
vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go
generated
vendored
Normal file
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
winio "github.com/Microsoft/go-winio"
|
||||
)
|
||||
|
||||
const shimBinaryFormat = "containerd-shim-%s-%s.exe"
|
||||
|
||||
func getSysProcAttr() *syscall.SysProcAttr {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AnonReconnectDialer returns a dialer for an existing npipe on containerd reconnection
|
||||
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
c, err := winio.DialPipeContext(ctx, address)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("npipe not found on reconnect: %w", os.ErrNotExist)
|
||||
} else if err == context.DeadlineExceeded {
|
||||
return nil, fmt.Errorf("timed out waiting for npipe %s: %w", address, err)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// AnonDialer returns a dialer for a npipe
|
||||
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
// If there is nobody serving the pipe we limit the timeout for this case to
|
||||
// 5 seconds because any shim that would serve this endpoint should serve it
|
||||
// within 5 seconds.
|
||||
serveTimer := time.NewTimer(5 * time.Second)
|
||||
defer serveTimer.Stop()
|
||||
for {
|
||||
c, err := winio.DialPipeContext(ctx, address)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
select {
|
||||
case <-serveTimer.C:
|
||||
return nil, fmt.Errorf("pipe not found before timeout: %w", os.ErrNotExist)
|
||||
default:
|
||||
// Wait 10ms for the shim to serve and try again.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
} else if err == context.DeadlineExceeded {
|
||||
return nil, fmt.Errorf("timed out waiting for npipe %s: %w", address, err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveSocket removes the socket at the specified address if
|
||||
// it exists on the filesystem
|
||||
func RemoveSocket(address string) error {
|
||||
return nil
|
||||
}
|
17
vendor/github.com/containerd/containerd/runtime/v2/task/doc.go
generated
vendored
Normal file
17
vendor/github.com/containerd/containerd/runtime/v2/task/doc.go
generated
vendored
Normal file
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package task
|
7312
vendor/github.com/containerd/containerd/runtime/v2/task/shim.pb.go
generated
vendored
Normal file
7312
vendor/github.com/containerd/containerd/runtime/v2/task/shim.pb.go
generated
vendored
Normal file
File diff suppressed because it is too large
Load diff
202
vendor/github.com/containerd/containerd/runtime/v2/task/shim.proto
generated
vendored
Normal file
202
vendor/github.com/containerd/containerd/runtime/v2/task/shim.proto
generated
vendored
Normal file
|
@ -0,0 +1,202 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package containerd.task.v2;
|
||||
|
||||
import "google/protobuf/any.proto";
|
||||
import "google/protobuf/empty.proto";
|
||||
import weak "gogoproto/gogo.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "github.com/containerd/containerd/api/types/mount.proto";
|
||||
import "github.com/containerd/containerd/api/types/task/task.proto";
|
||||
|
||||
option go_package = "github.com/containerd/containerd/runtime/v2/task;task";
|
||||
|
||||
// Shim service is launched for each container and is responsible for owning the IO
|
||||
// for the container and its additional processes. The shim is also the parent of
|
||||
// each container and allows reattaching to the IO and receiving the exit status
|
||||
// for the container processes.
|
||||
service Task {
|
||||
rpc State(StateRequest) returns (StateResponse);
|
||||
rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
|
||||
rpc Start(StartRequest) returns (StartResponse);
|
||||
rpc Delete(DeleteRequest) returns (DeleteResponse);
|
||||
rpc Pids(PidsRequest) returns (PidsResponse);
|
||||
rpc Pause(PauseRequest) returns (google.protobuf.Empty);
|
||||
rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
|
||||
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
|
||||
rpc Kill(KillRequest) returns (google.protobuf.Empty);
|
||||
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
|
||||
rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
|
||||
rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
|
||||
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
|
||||
rpc Wait(WaitRequest) returns (WaitResponse);
|
||||
rpc Stats(StatsRequest) returns (StatsResponse);
|
||||
rpc Connect(ConnectRequest) returns (ConnectResponse);
|
||||
rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty);
|
||||
}
|
||||
|
||||
message CreateTaskRequest {
|
||||
string id = 1;
|
||||
string bundle = 2;
|
||||
repeated containerd.types.Mount rootfs = 3;
|
||||
bool terminal = 4;
|
||||
string stdin = 5;
|
||||
string stdout = 6;
|
||||
string stderr = 7;
|
||||
string checkpoint = 8;
|
||||
string parent_checkpoint = 9;
|
||||
google.protobuf.Any options = 10;
|
||||
}
|
||||
|
||||
message CreateTaskResponse {
|
||||
uint32 pid = 1;
|
||||
}
|
||||
|
||||
message DeleteRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
}
|
||||
|
||||
message DeleteResponse {
|
||||
uint32 pid = 1;
|
||||
uint32 exit_status = 2;
|
||||
google.protobuf.Timestamp exited_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
message ExecProcessRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
bool terminal = 3;
|
||||
string stdin = 4;
|
||||
string stdout = 5;
|
||||
string stderr = 6;
|
||||
google.protobuf.Any spec = 7;
|
||||
}
|
||||
|
||||
message ExecProcessResponse {
|
||||
}
|
||||
|
||||
message ResizePtyRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
uint32 width = 3;
|
||||
uint32 height = 4;
|
||||
}
|
||||
|
||||
message StateRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
}
|
||||
|
||||
message StateResponse {
|
||||
string id = 1;
|
||||
string bundle = 2;
|
||||
uint32 pid = 3;
|
||||
containerd.v1.types.Status status = 4;
|
||||
string stdin = 5;
|
||||
string stdout = 6;
|
||||
string stderr = 7;
|
||||
bool terminal = 8;
|
||||
uint32 exit_status = 9;
|
||||
google.protobuf.Timestamp exited_at = 10 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
string exec_id = 11;
|
||||
}
|
||||
|
||||
message KillRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
uint32 signal = 3;
|
||||
bool all = 4;
|
||||
}
|
||||
|
||||
message CloseIORequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
bool stdin = 3;
|
||||
}
|
||||
|
||||
message PidsRequest {
|
||||
string id = 1;
|
||||
}
|
||||
|
||||
message PidsResponse {
|
||||
repeated containerd.v1.types.ProcessInfo processes = 1;
|
||||
}
|
||||
|
||||
message CheckpointTaskRequest {
|
||||
string id = 1;
|
||||
string path = 2;
|
||||
google.protobuf.Any options = 3;
|
||||
}
|
||||
|
||||
message UpdateTaskRequest {
|
||||
string id = 1;
|
||||
google.protobuf.Any resources = 2;
|
||||
map<string, string> annotations = 3;
|
||||
}
|
||||
|
||||
message StartRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
}
|
||||
|
||||
message StartResponse {
|
||||
uint32 pid = 1;
|
||||
}
|
||||
|
||||
message WaitRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
}
|
||||
|
||||
message WaitResponse {
|
||||
uint32 exit_status = 1;
|
||||
google.protobuf.Timestamp exited_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
message StatsRequest {
|
||||
string id = 1;
|
||||
}
|
||||
|
||||
message StatsResponse {
|
||||
google.protobuf.Any stats = 1;
|
||||
}
|
||||
|
||||
message ConnectRequest {
|
||||
string id = 1;
|
||||
}
|
||||
|
||||
message ConnectResponse {
|
||||
uint32 shim_pid = 1;
|
||||
uint32 task_pid = 2;
|
||||
string version = 3;
|
||||
}
|
||||
|
||||
message ShutdownRequest {
|
||||
string id = 1;
|
||||
bool now = 2;
|
||||
}
|
||||
|
||||
message PauseRequest {
|
||||
string id = 1;
|
||||
}
|
||||
|
||||
message ResumeRequest {
|
||||
string id = 1;
|
||||
}
|
283
vendor/github.com/containerd/containerd/sys/reaper/reaper_unix.go
generated
vendored
Normal file
283
vendor/github.com/containerd/containerd/sys/reaper/reaper_unix.go
generated
vendored
Normal file
|
@ -0,0 +1,283 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package reaper
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
runc "github.com/containerd/go-runc"
|
||||
exec "golang.org/x/sys/execabs"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// ErrNoSuchProcess is returned when the process no longer exists
|
||||
var ErrNoSuchProcess = errors.New("no such process")
|
||||
|
||||
const bufferSize = 32
|
||||
|
||||
type subscriber struct {
|
||||
sync.Mutex
|
||||
c chan runc.Exit
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (s *subscriber) close() {
|
||||
s.Lock()
|
||||
if s.closed {
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
close(s.c)
|
||||
s.closed = true
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *subscriber) do(fn func()) {
|
||||
s.Lock()
|
||||
fn()
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
// Reap should be called when the process receives an SIGCHLD. Reap will reap
|
||||
// all exited processes and close their wait channels
|
||||
func Reap() error {
|
||||
now := time.Now()
|
||||
exits, err := reap(false)
|
||||
for _, e := range exits {
|
||||
done := Default.notify(runc.Exit{
|
||||
Timestamp: now,
|
||||
Pid: e.Pid,
|
||||
Status: e.Status,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Default is the default monitor initialized for the package
|
||||
var Default = &Monitor{
|
||||
subscribers: make(map[chan runc.Exit]*subscriber),
|
||||
}
|
||||
|
||||
// Monitor monitors the underlying system for process status changes
|
||||
type Monitor struct {
|
||||
sync.Mutex
|
||||
|
||||
subscribers map[chan runc.Exit]*subscriber
|
||||
}
|
||||
|
||||
// Start starts the command a registers the process with the reaper
|
||||
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
|
||||
ec := m.Subscribe()
|
||||
if err := c.Start(); err != nil {
|
||||
m.Unsubscribe(ec)
|
||||
return nil, err
|
||||
}
|
||||
return ec, nil
|
||||
}
|
||||
|
||||
// Wait blocks until a process is signal as dead.
|
||||
// User should rely on the value of the exit status to determine if the
|
||||
// command was successful or not.
|
||||
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
|
||||
for e := range ec {
|
||||
if e.Pid == c.Process.Pid {
|
||||
// make sure we flush all IO
|
||||
c.Wait()
|
||||
m.Unsubscribe(ec)
|
||||
return e.Status, nil
|
||||
}
|
||||
}
|
||||
// return no such process if the ec channel is closed and no more exit
|
||||
// events will be sent
|
||||
return -1, ErrNoSuchProcess
|
||||
}
|
||||
|
||||
// WaitTimeout is used to skip the blocked command and kill the left process.
|
||||
func (m *Monitor) WaitTimeout(c *exec.Cmd, ec chan runc.Exit, timeout time.Duration) (int, error) {
|
||||
type exitStatusWrapper struct {
|
||||
status int
|
||||
err error
|
||||
}
|
||||
|
||||
// capacity can make sure that the following goroutine will not be
|
||||
// blocked if there is no receiver when timeout.
|
||||
waitCh := make(chan *exitStatusWrapper, 1)
|
||||
go func() {
|
||||
defer close(waitCh)
|
||||
|
||||
status, err := m.Wait(c, ec)
|
||||
waitCh <- &exitStatusWrapper{
|
||||
status: status,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
syscall.Kill(c.Process.Pid, syscall.SIGKILL)
|
||||
return 0, fmt.Errorf("timeout %v for cmd(pid=%d): %s, %s", timeout, c.Process.Pid, c.Path, c.Args)
|
||||
case res := <-waitCh:
|
||||
return res.status, res.err
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to process exit changes
|
||||
func (m *Monitor) Subscribe() chan runc.Exit {
|
||||
c := make(chan runc.Exit, bufferSize)
|
||||
m.Lock()
|
||||
m.subscribers[c] = &subscriber{
|
||||
c: c,
|
||||
}
|
||||
m.Unlock()
|
||||
return c
|
||||
}
|
||||
|
||||
// Unsubscribe to process exit changes
|
||||
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
|
||||
m.Lock()
|
||||
s, ok := m.subscribers[c]
|
||||
if !ok {
|
||||
m.Unlock()
|
||||
return
|
||||
}
|
||||
s.close()
|
||||
delete(m.subscribers, c)
|
||||
m.Unlock()
|
||||
}
|
||||
|
||||
func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber {
|
||||
out := make(map[chan runc.Exit]*subscriber)
|
||||
m.Lock()
|
||||
for k, v := range m.subscribers {
|
||||
out[k] = v
|
||||
}
|
||||
m.Unlock()
|
||||
return out
|
||||
}
|
||||
|
||||
func (m *Monitor) notify(e runc.Exit) chan struct{} {
|
||||
const timeout = 1 * time.Millisecond
|
||||
var (
|
||||
done = make(chan struct{}, 1)
|
||||
timer = time.NewTimer(timeout)
|
||||
success = make(map[chan runc.Exit]struct{})
|
||||
)
|
||||
stop(timer, true)
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
|
||||
for {
|
||||
var (
|
||||
failed int
|
||||
subscribers = m.getSubscribers()
|
||||
)
|
||||
for _, s := range subscribers {
|
||||
s.do(func() {
|
||||
if s.closed {
|
||||
return
|
||||
}
|
||||
if _, ok := success[s.c]; ok {
|
||||
return
|
||||
}
|
||||
timer.Reset(timeout)
|
||||
recv := true
|
||||
select {
|
||||
case s.c <- e:
|
||||
success[s.c] = struct{}{}
|
||||
case <-timer.C:
|
||||
recv = false
|
||||
failed++
|
||||
}
|
||||
stop(timer, recv)
|
||||
})
|
||||
}
|
||||
// all subscribers received the message
|
||||
if failed == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return done
|
||||
}
|
||||
|
||||
func stop(timer *time.Timer, recv bool) {
|
||||
if !timer.Stop() && recv {
|
||||
<-timer.C
|
||||
}
|
||||
}
|
||||
|
||||
// exit is the wait4 information from an exited process
|
||||
type exit struct {
|
||||
Pid int
|
||||
Status int
|
||||
}
|
||||
|
||||
// reap reaps all child processes for the calling process and returns their
|
||||
// exit information
|
||||
func reap(wait bool) (exits []exit, err error) {
|
||||
var (
|
||||
ws unix.WaitStatus
|
||||
rus unix.Rusage
|
||||
)
|
||||
flag := unix.WNOHANG
|
||||
if wait {
|
||||
flag = 0
|
||||
}
|
||||
for {
|
||||
pid, err := unix.Wait4(-1, &ws, flag, &rus)
|
||||
if err != nil {
|
||||
if err == unix.ECHILD {
|
||||
return exits, nil
|
||||
}
|
||||
return exits, err
|
||||
}
|
||||
if pid <= 0 {
|
||||
return exits, nil
|
||||
}
|
||||
exits = append(exits, exit{
|
||||
Pid: pid,
|
||||
Status: exitStatus(ws),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const exitSignalOffset = 128
|
||||
|
||||
// exitStatus returns the correct exit status for a process based on if it
|
||||
// was signaled or exited cleanly
|
||||
func exitStatus(status unix.WaitStatus) int {
|
||||
if status.Signaled() {
|
||||
return exitSignalOffset + int(status.Signal())
|
||||
}
|
||||
return status.ExitStatus()
|
||||
}
|
39
vendor/github.com/containerd/containerd/sys/reaper/reaper_utils_linux.go
generated
vendored
Normal file
39
vendor/github.com/containerd/containerd/sys/reaper/reaper_utils_linux.go
generated
vendored
Normal file
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package reaper
|
||||
|
||||
import (
|
||||
"unsafe"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// SetSubreaper sets the value i as the subreaper setting for the calling process
|
||||
func SetSubreaper(i int) error {
|
||||
return unix.Prctl(unix.PR_SET_CHILD_SUBREAPER, uintptr(i), 0, 0, 0)
|
||||
}
|
||||
|
||||
// GetSubreaper returns the subreaper setting for the calling process
|
||||
func GetSubreaper() (int, error) {
|
||||
var i uintptr
|
||||
|
||||
if err := unix.Prctl(unix.PR_GET_CHILD_SUBREAPER, uintptr(unsafe.Pointer(&i)), 0, 0, 0); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return int(i), nil
|
||||
}
|
6
vendor/modules.txt
vendored
6
vendor/modules.txt
vendored
|
@ -167,6 +167,7 @@ github.com/containerd/containerd/api/services/leases/v1
|
|||
github.com/containerd/containerd/api/services/namespaces/v1
|
||||
github.com/containerd/containerd/api/services/snapshots/v1
|
||||
github.com/containerd/containerd/api/services/tasks/v1
|
||||
github.com/containerd/containerd/api/services/ttrpc/events/v1
|
||||
github.com/containerd/containerd/api/services/version/v1
|
||||
github.com/containerd/containerd/api/types
|
||||
github.com/containerd/containerd/api/types/task
|
||||
|
@ -204,6 +205,8 @@ github.com/containerd/containerd/pkg/cap
|
|||
github.com/containerd/containerd/pkg/dialer
|
||||
github.com/containerd/containerd/pkg/kmutex
|
||||
github.com/containerd/containerd/pkg/seccomp
|
||||
github.com/containerd/containerd/pkg/shutdown
|
||||
github.com/containerd/containerd/pkg/ttrpcutil
|
||||
github.com/containerd/containerd/pkg/userns
|
||||
github.com/containerd/containerd/platforms
|
||||
github.com/containerd/containerd/plugin
|
||||
|
@ -217,6 +220,8 @@ github.com/containerd/containerd/remotes/errors
|
|||
github.com/containerd/containerd/rootfs
|
||||
github.com/containerd/containerd/runtime/linux/runctypes
|
||||
github.com/containerd/containerd/runtime/v2/runc/options
|
||||
github.com/containerd/containerd/runtime/v2/shim
|
||||
github.com/containerd/containerd/runtime/v2/task
|
||||
github.com/containerd/containerd/services
|
||||
github.com/containerd/containerd/services/content/contentserver
|
||||
github.com/containerd/containerd/services/introspection
|
||||
|
@ -224,6 +229,7 @@ github.com/containerd/containerd/services/server/config
|
|||
github.com/containerd/containerd/snapshots
|
||||
github.com/containerd/containerd/snapshots/proxy
|
||||
github.com/containerd/containerd/sys
|
||||
github.com/containerd/containerd/sys/reaper
|
||||
github.com/containerd/containerd/version
|
||||
# github.com/containerd/continuity v0.3.0
|
||||
## explicit; go 1.17
|
||||
|
|
Loading…
Reference in a new issue