daemon: support other containerd runtimes (MVP)
Contrary to popular belief, the OCI Runtime specification does not specify the command-line API for runtimes. Looking at containerd's architecture from the lens of the OCI Runtime spec, the _shim_ is the OCI Runtime and runC is "just" an implementation detail of the io.containerd.runc.v2 runtime. When one configures a non-default runtime in Docker, what they're really doing is instructing Docker to create containers using the io.containerd.runc.v2 runtime with a configuration option telling the runtime that the runC binary is at some non-default path. Consequently, only OCI runtimes which are compatible with the io.containerd.runc.v2 shim, such as crun, can be used in this manner. Other OCI runtimes, including kata-containers v2, come with their own containerd shim and are not compatible with io.containerd.runc.v2. As Docker has not historically provided a way to select a non-default runtime which requires its own shim, runtimes such as kata-containers v2 could not be used with Docker. Allow other containerd shims to be used with Docker; no daemon configuration required. If the daemon is instructed to create a container with a runtime name which does not match any of the configured or stock runtimes, it passes the name along to containerd verbatim. A user can start a container with the kata-containers runtime, for example, simply by calling docker run --runtime io.containerd.kata.v2 Runtime names which containerd would interpret as a path to an arbitrary binary are disallowed. While handy for development and testing it is not strictly necessary and would allow anyone with Engine API access to trivially execute any binary on the host as root, so we have decided it would be safest for our users if it was not allowed. It is not yet possible to set an alternative containerd shim as the default runtime; it can only be configured per-container. Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
parent
7624f8aeb1
commit
547da0d575
29 changed files with 10504 additions and 16 deletions
|
@ -707,8 +707,8 @@ func verifyPlatformContainerSettings(daemon *Daemon, hostConfig *containertypes.
|
|||
hostConfig.Runtime = daemon.configStore.GetDefaultRuntimeName()
|
||||
}
|
||||
|
||||
if rt := daemon.configStore.GetRuntime(hostConfig.Runtime); rt == nil {
|
||||
return warnings, fmt.Errorf("Unknown runtime specified %s", hostConfig.Runtime)
|
||||
if _, err := daemon.getRuntime(hostConfig.Runtime); err != nil {
|
||||
return warnings, err
|
||||
}
|
||||
|
||||
parser := volumemounts.NewParser()
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"strings"
|
||||
|
||||
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
|
||||
"github.com/containerd/containerd/runtime/v2/shim"
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/daemon/config"
|
||||
"github.com/docker/docker/errdefs"
|
||||
|
@ -116,7 +117,10 @@ func (daemon *Daemon) rewriteRuntimePath(name, p string, args []string) (string,
|
|||
func (daemon *Daemon) getRuntime(name string) (*types.Runtime, error) {
|
||||
rt := daemon.configStore.GetRuntime(name)
|
||||
if rt == nil {
|
||||
return nil, errdefs.InvalidParameter(errors.Errorf("runtime not found in config: %s", name))
|
||||
if !isPermissibleC8dRuntimeName(name) {
|
||||
return nil, errdefs.InvalidParameter(errors.Errorf("unknown or invalid runtime name: %s", name))
|
||||
}
|
||||
return &types.Runtime{Shim: &types.ShimConfig{Binary: name}}, nil
|
||||
}
|
||||
|
||||
if len(rt.Args) > 0 {
|
||||
|
@ -134,3 +138,37 @@ func (daemon *Daemon) getRuntime(name string) (*types.Runtime, error) {
|
|||
|
||||
return rt, nil
|
||||
}
|
||||
|
||||
// isPermissibleC8dRuntimeName tests whether name is safe to pass into
|
||||
// containerd as a runtime name, and whether the name is well-formed.
|
||||
// It does not check if the runtime is installed.
|
||||
//
|
||||
// A runtime name containing slash characters is interpreted by containerd as
|
||||
// the path to a runtime binary. If we allowed this, anyone with Engine API
|
||||
// access could get containerd to execute an arbitrary binary as root. Although
|
||||
// Engine API access is already equivalent to root on the host, the runtime name
|
||||
// has not historically been a vector to run arbitrary code as root so users are
|
||||
// not expecting it to become one.
|
||||
//
|
||||
// This restriction is not configurable. There are viable workarounds for
|
||||
// legitimate use cases: administrators and runtime developers can make runtimes
|
||||
// available for use with Docker by installing them onto PATH following the
|
||||
// [binary naming convention] for containerd Runtime v2.
|
||||
//
|
||||
// [binary naming convention]: https://github.com/containerd/containerd/blob/main/runtime/v2/README.md#binary-naming
|
||||
func isPermissibleC8dRuntimeName(name string) bool {
|
||||
// containerd uses a rather permissive test to validate runtime names:
|
||||
//
|
||||
// - Any name for which filepath.IsAbs(name) is interpreted as the absolute
|
||||
// path to a shim binary. We want to block this behaviour.
|
||||
// - Any name which contains at least one '.' character and no '/' characters
|
||||
// and does not begin with a '.' character is a valid runtime name. The shim
|
||||
// binary name is derived from the final two components of the name and
|
||||
// searched for on the PATH. The name "a.." is technically valid per
|
||||
// containerd's implementation: it would resolve to a binary named
|
||||
// "containerd-shim---".
|
||||
//
|
||||
// https://github.com/containerd/containerd/blob/11ded166c15f92450958078cd13c6d87131ec563/runtime/v2/manager.go#L297-L317
|
||||
// https://github.com/containerd/containerd/blob/11ded166c15f92450958078cd13c6d87131ec563/runtime/v2/shim/util.go#L83-L93
|
||||
return !filepath.IsAbs(name) && !strings.ContainsRune(name, '/') && shim.BinaryName(name) != ""
|
||||
}
|
||||
|
|
107
daemon/runtime_unix_test.go
Normal file
107
daemon/runtime_unix_test.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
v2runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
|
||||
"gotest.tools/v3/assert"
|
||||
is "gotest.tools/v3/assert/cmp"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/daemon/config"
|
||||
"github.com/docker/docker/errdefs"
|
||||
)
|
||||
|
||||
func TestGetRuntime(t *testing.T) {
|
||||
// Configured runtimes can have any arbitrary name, including names
|
||||
// which would not be allowed as implicit runtime names. Explicit takes
|
||||
// precedence over implicit.
|
||||
const configuredRtName = "my/custom.shim.v1"
|
||||
configuredRuntime := types.Runtime{Path: "/bin/true"}
|
||||
|
||||
d := &Daemon{configStore: config.New()}
|
||||
d.configStore.Root = t.TempDir()
|
||||
assert.Assert(t, os.Mkdir(filepath.Join(d.configStore.Root, "runtimes"), 0700))
|
||||
d.configStore.Runtimes = map[string]types.Runtime{
|
||||
configuredRtName: configuredRuntime,
|
||||
}
|
||||
configureRuntimes(d.configStore)
|
||||
assert.Assert(t, d.loadRuntimes())
|
||||
|
||||
stockRuntime, ok := d.configStore.Runtimes[config.StockRuntimeName]
|
||||
assert.Assert(t, ok, "stock runtime could not be found (test needs to be updated)")
|
||||
|
||||
configdOpts := *stockRuntime.Shim.Opts.(*v2runcoptions.Options)
|
||||
configdOpts.BinaryName = configuredRuntime.Path
|
||||
wantConfigdRuntime := configuredRuntime
|
||||
wantConfigdRuntime.Shim = &types.ShimConfig{
|
||||
Binary: stockRuntime.Shim.Binary,
|
||||
Opts: &configdOpts,
|
||||
}
|
||||
|
||||
for _, tt := range []struct {
|
||||
name, runtime string
|
||||
want *types.Runtime
|
||||
}{
|
||||
{
|
||||
name: "StockRuntime",
|
||||
runtime: config.StockRuntimeName,
|
||||
want: &stockRuntime,
|
||||
},
|
||||
{
|
||||
name: "ShimName",
|
||||
runtime: "io.containerd.my-shim.v42",
|
||||
want: &types.Runtime{Shim: &types.ShimConfig{Binary: "io.containerd.my-shim.v42"}},
|
||||
},
|
||||
{
|
||||
// containerd is pretty loose about the format of runtime names. Perhaps too
|
||||
// loose. The only requirements are that the name contain a dot and (depending
|
||||
// on the containerd version) not start with a dot. It does not enforce any
|
||||
// particular format of the dot-delimited components of the name.
|
||||
name: "VersionlessShimName",
|
||||
runtime: "io.containerd.my-shim",
|
||||
want: &types.Runtime{Shim: &types.ShimConfig{Binary: "io.containerd.my-shim"}},
|
||||
},
|
||||
{
|
||||
name: "IllformedShimName",
|
||||
runtime: "myshim",
|
||||
},
|
||||
{
|
||||
name: "EmptyString",
|
||||
runtime: "",
|
||||
},
|
||||
{
|
||||
name: "PathToShim",
|
||||
runtime: "/path/to/runc",
|
||||
},
|
||||
{
|
||||
name: "PathToShimName",
|
||||
runtime: "/path/to/io.containerd.runc.v2",
|
||||
},
|
||||
{
|
||||
name: "RelPathToShim",
|
||||
runtime: "my/io.containerd.runc.v2",
|
||||
},
|
||||
{
|
||||
name: "ConfiguredRuntime",
|
||||
runtime: configuredRtName,
|
||||
want: &wantConfigdRuntime,
|
||||
},
|
||||
} {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := d.getRuntime(tt.runtime)
|
||||
assert.Check(t, is.DeepEqual(got, tt.want))
|
||||
if tt.want != nil {
|
||||
assert.Check(t, err)
|
||||
} else {
|
||||
assert.Check(t, errdefs.IsInvalidParameter(err))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -39,6 +39,7 @@ import (
|
|||
"github.com/moby/sys/mount"
|
||||
"golang.org/x/sys/unix"
|
||||
"gotest.tools/v3/assert"
|
||||
is "gotest.tools/v3/assert/cmp"
|
||||
"gotest.tools/v3/icmd"
|
||||
"gotest.tools/v3/poll"
|
||||
)
|
||||
|
@ -2384,7 +2385,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
|
|||
// Run with "vm"
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=vm", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
// Reset config to only have the default
|
||||
config = `
|
||||
{
|
||||
|
@ -2404,11 +2405,11 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
|
|||
// Run with "oci"
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=oci", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci"))
|
||||
assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
|
||||
// Start previously created container with oci
|
||||
out, err = s.d.Cmd("start", "oci-runtime-ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci"))
|
||||
assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
|
||||
// Check that we can't override the default runtime
|
||||
config = `
|
||||
{
|
||||
|
@ -2426,7 +2427,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
|
|||
|
||||
content, err := s.d.ReadLogFile()
|
||||
assert.NilError(c, err)
|
||||
assert.Assert(c, strings.Contains(string(content), `file configuration validation failed: runtime name 'runc' is reserved`))
|
||||
assert.Assert(c, is.Contains(string(content), `file configuration validation failed: runtime name 'runc' is reserved`))
|
||||
// Check that we can select a default runtime
|
||||
config = `
|
||||
{
|
||||
|
@ -2451,7 +2452,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromConfigFile(c *testing.T) {
|
|||
|
||||
out, err = s.d.Cmd("run", "--rm", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
// Run with default runtime explicitly
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls")
|
||||
assert.NilError(c, err, out)
|
||||
|
@ -2475,7 +2476,7 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromCommandLine(c *testing.T) {
|
|||
// Run with "vm"
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=vm", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
// Start a daemon without any extra runtimes
|
||||
s.d.Stop(c)
|
||||
s.d.StartWithBusybox(c)
|
||||
|
@ -2487,25 +2488,25 @@ func (s *DockerDaemonSuite) TestRunWithRuntimeFromCommandLine(c *testing.T) {
|
|||
// Run with "oci"
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=oci", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci"))
|
||||
assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
|
||||
// Start previously created container with oci
|
||||
out, err = s.d.Cmd("start", "oci-runtime-ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "Unknown runtime specified oci"))
|
||||
assert.Assert(c, is.Contains(out, "unknown or invalid runtime name: oci"))
|
||||
// Check that we can't override the default runtime
|
||||
s.d.Stop(c)
|
||||
assert.Assert(c, s.d.StartWithError("--add-runtime", "runc=my-runc") != nil)
|
||||
|
||||
content, err := s.d.ReadLogFile()
|
||||
assert.NilError(c, err)
|
||||
assert.Assert(c, strings.Contains(string(content), `runtime name 'runc' is reserved`))
|
||||
assert.Assert(c, is.Contains(string(content), `runtime name 'runc' is reserved`))
|
||||
// Check that we can select a default runtime
|
||||
s.d.Stop(c)
|
||||
s.d.StartWithBusybox(c, "--default-runtime=vm", "--add-runtime", "oci=runc", "--add-runtime", "vm=/usr/local/bin/vm-manager")
|
||||
|
||||
out, err = s.d.Cmd("run", "--rm", "busybox", "ls")
|
||||
assert.ErrorContains(c, err, "", out)
|
||||
assert.Assert(c, strings.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
assert.Assert(c, is.Contains(out, "/usr/local/bin/vm-manager: no such file or directory"))
|
||||
// Run with default runtime explicitly
|
||||
out, err = s.d.Cmd("run", "--rm", "--runtime=runc", "busybox", "ls")
|
||||
assert.NilError(c, err, out)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -15,7 +16,9 @@ import (
|
|||
"github.com/docker/docker/api/types/versions"
|
||||
"github.com/docker/docker/integration/internal/container"
|
||||
net "github.com/docker/docker/integration/internal/network"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"github.com/docker/docker/pkg/system"
|
||||
"github.com/docker/docker/testutil/daemon"
|
||||
"golang.org/x/sys/unix"
|
||||
"gotest.tools/v3/assert"
|
||||
is "gotest.tools/v3/assert/cmp"
|
||||
|
@ -214,3 +217,57 @@ func TestRunConsoleSize(t *testing.T) {
|
|||
|
||||
assert.Equal(t, strings.TrimSpace(b.String()), "123 57")
|
||||
}
|
||||
|
||||
func TestRunWithAlternativeContainerdShim(t *testing.T) {
|
||||
skip.If(t, testEnv.IsRemoteDaemon)
|
||||
skip.If(t, testEnv.DaemonInfo.OSType != "linux")
|
||||
|
||||
realShimPath, err := exec.LookPath("containerd-shim-runc-v2")
|
||||
assert.Assert(t, err)
|
||||
realShimPath, err = filepath.Abs(realShimPath)
|
||||
assert.Assert(t, err)
|
||||
|
||||
// t.TempDir() can't be used here as the temporary directory returned by
|
||||
// that function cannot be accessed by the fake-root user for rootless
|
||||
// Docker. It creates a nested hierarchy of directories where the
|
||||
// outermost has permission 0700.
|
||||
shimDir, err := os.MkdirTemp("", t.Name())
|
||||
assert.Assert(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := os.RemoveAll(shimDir); err != nil {
|
||||
t.Errorf("shimDir RemoveAll cleanup: %v", err)
|
||||
}
|
||||
})
|
||||
assert.Assert(t, os.Chmod(shimDir, 0777))
|
||||
shimDir, err = filepath.Abs(shimDir)
|
||||
assert.Assert(t, err)
|
||||
assert.Assert(t, os.Symlink(realShimPath, filepath.Join(shimDir, "containerd-shim-realfake-v42")))
|
||||
|
||||
d := daemon.New(t,
|
||||
daemon.WithEnvVars("PATH="+shimDir+":"+os.Getenv("PATH")),
|
||||
daemon.WithContainerdSocket(""), // A new containerd instance needs to be started which inherits the PATH env var defined above.
|
||||
)
|
||||
d.StartWithBusybox(t)
|
||||
defer d.Stop(t)
|
||||
|
||||
client := d.NewClientT(t)
|
||||
ctx := context.Background()
|
||||
|
||||
cID := container.Run(ctx, t, client,
|
||||
container.WithImage("busybox"),
|
||||
container.WithCmd("sh", "-c", `echo 'Hello, world!'`),
|
||||
container.WithRuntime("io.containerd.realfake.v42"),
|
||||
)
|
||||
|
||||
poll.WaitOn(t, container.IsStopped(ctx, client, cID), poll.WithDelay(100*time.Millisecond))
|
||||
|
||||
out, err := client.ContainerLogs(ctx, cID, types.ContainerLogsOptions{ShowStdout: true})
|
||||
assert.NilError(t, err)
|
||||
defer out.Close()
|
||||
|
||||
var b bytes.Buffer
|
||||
_, err = stdcopy.StdCopy(&b, io.Discard, out)
|
||||
assert.NilError(t, err)
|
||||
|
||||
assert.Equal(t, strings.TrimSpace(b.String()), "Hello, world!")
|
||||
}
|
||||
|
|
|
@ -234,3 +234,10 @@ func WithConsoleSize(width, height uint) func(*TestContainerConfig) {
|
|||
c.HostConfig.ConsoleSize = [2]uint{height, width}
|
||||
}
|
||||
}
|
||||
|
||||
// WithRuntime sets the runtime to use to start the container
|
||||
func WithRuntime(name string) func(*TestContainerConfig) {
|
||||
return func(c *TestContainerConfig) {
|
||||
c.HostConfig.Runtime = name
|
||||
}
|
||||
}
|
||||
|
|
|
@ -76,6 +76,7 @@ type Daemon struct {
|
|||
log LogT
|
||||
pidFile string
|
||||
args []string
|
||||
extraEnv []string
|
||||
containerdSocket string
|
||||
rootlessUser *user.User
|
||||
rootlessXDGRuntimeDir string
|
||||
|
@ -334,9 +335,10 @@ func (d *Daemon) StartWithLogFile(out *os.File, providedArgs ...string) error {
|
|||
dockerdBinary = "sudo"
|
||||
d.args = append(d.args,
|
||||
"-u", d.rootlessUser.Username,
|
||||
"-E", "XDG_RUNTIME_DIR="+d.rootlessXDGRuntimeDir,
|
||||
"-E", "HOME="+d.rootlessUser.HomeDir,
|
||||
"-E", "PATH="+os.Getenv("PATH"),
|
||||
"--preserve-env",
|
||||
"--preserve-env=PATH", // Pass through PATH, overriding secure_path.
|
||||
"XDG_RUNTIME_DIR="+d.rootlessXDGRuntimeDir,
|
||||
"HOME="+d.rootlessUser.HomeDir,
|
||||
"--",
|
||||
defaultDockerdRootlessBinary,
|
||||
)
|
||||
|
@ -392,6 +394,7 @@ func (d *Daemon) StartWithLogFile(out *os.File, providedArgs ...string) error {
|
|||
d.args = append(d.args, providedArgs...)
|
||||
d.cmd = exec.Command(dockerdBinary, d.args...)
|
||||
d.cmd.Env = append(os.Environ(), "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE=1")
|
||||
d.cmd.Env = append(d.cmd.Env, d.extraEnv...)
|
||||
d.cmd.Stdout = out
|
||||
d.cmd.Stderr = out
|
||||
d.logFile = out
|
||||
|
|
|
@ -122,3 +122,10 @@ func WithOOMScoreAdjust(score int) Option {
|
|||
d.OOMScoreAdjust = score
|
||||
}
|
||||
}
|
||||
|
||||
// WithEnvVars sets additional environment variables for the daemon
|
||||
func WithEnvVars(vars ...string) Option {
|
||||
return func(d *Daemon) {
|
||||
d.extraEnv = append(d.extraEnv, vars...)
|
||||
}
|
||||
}
|
||||
|
|
18
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/doc.go
generated
vendored
Normal file
18
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/doc.go
generated
vendored
Normal file
|
@ -0,0 +1,18 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package events defines the ttrpc event service.
|
||||
package events
|
760
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.pb.go
generated
vendored
Normal file
760
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.pb.go
generated
vendored
Normal file
|
@ -0,0 +1,760 @@
|
|||
// Code generated by protoc-gen-gogo. DO NOT EDIT.
|
||||
// source: github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
context "context"
|
||||
fmt "fmt"
|
||||
github_com_containerd_ttrpc "github.com/containerd/ttrpc"
|
||||
github_com_containerd_typeurl "github.com/containerd/typeurl"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
|
||||
types "github.com/gogo/protobuf/types"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
reflect "reflect"
|
||||
strings "strings"
|
||||
time "time"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
var _ = time.Kitchen
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
|
||||
|
||||
type ForwardRequest struct {
|
||||
Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope,proto3" json:"envelope,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ForwardRequest) Reset() { *m = ForwardRequest{} }
|
||||
func (*ForwardRequest) ProtoMessage() {}
|
||||
func (*ForwardRequest) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_19f98672016720b5, []int{0}
|
||||
}
|
||||
func (m *ForwardRequest) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *ForwardRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_ForwardRequest.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *ForwardRequest) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_ForwardRequest.Merge(m, src)
|
||||
}
|
||||
func (m *ForwardRequest) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *ForwardRequest) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_ForwardRequest.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_ForwardRequest proto.InternalMessageInfo
|
||||
|
||||
type Envelope struct {
|
||||
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
|
||||
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
|
||||
Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
|
||||
Event *types.Any `protobuf:"bytes,4,opt,name=event,proto3" json:"event,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Envelope) Reset() { *m = Envelope{} }
|
||||
func (*Envelope) ProtoMessage() {}
|
||||
func (*Envelope) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_19f98672016720b5, []int{1}
|
||||
}
|
||||
func (m *Envelope) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *Envelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_Envelope.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *Envelope) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_Envelope.Merge(m, src)
|
||||
}
|
||||
func (m *Envelope) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *Envelope) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_Envelope.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_Envelope proto.InternalMessageInfo
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.ttrpc.v1.ForwardRequest")
|
||||
proto.RegisterType((*Envelope)(nil), "containerd.services.events.ttrpc.v1.Envelope")
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterFile("github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto", fileDescriptor_19f98672016720b5)
|
||||
}
|
||||
|
||||
var fileDescriptor_19f98672016720b5 = []byte{
|
||||
// 396 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x52, 0xc1, 0x8e, 0xd3, 0x30,
|
||||
0x10, 0x8d, 0x61, 0x77, 0x69, 0x8d, 0xc4, 0xc1, 0xaa, 0x50, 0x08, 0x28, 0x59, 0x2d, 0x97, 0x15,
|
||||
0x12, 0xb6, 0x76, 0xf7, 0x06, 0x17, 0xa8, 0x28, 0x12, 0x1c, 0x23, 0x84, 0x2a, 0x90, 0x10, 0x6e,
|
||||
0x3a, 0x4d, 0x2d, 0x25, 0xb6, 0x49, 0x9c, 0xa0, 0xde, 0xfa, 0x09, 0x7c, 0x0c, 0x17, 0xfe, 0xa0,
|
||||
0x47, 0x8e, 0x9c, 0x80, 0xe6, 0x4b, 0x50, 0x9d, 0xa4, 0x81, 0xf6, 0x40, 0xa5, 0xbd, 0xbd, 0xcc,
|
||||
0x7b, 0x6f, 0xde, 0xcc, 0xc4, 0xf8, 0x75, 0x2c, 0xcc, 0xbc, 0x98, 0xd0, 0x48, 0xa5, 0x2c, 0x52,
|
||||
0xd2, 0x70, 0x21, 0x21, 0x9b, 0xfe, 0x0d, 0xb9, 0x16, 0x2c, 0x87, 0xac, 0x14, 0x11, 0xe4, 0xcc,
|
||||
0x98, 0x4c, 0x47, 0x0c, 0x4a, 0x90, 0x26, 0x67, 0xe5, 0x45, 0x83, 0xa8, 0xce, 0x94, 0x51, 0xe4,
|
||||
0x61, 0xe7, 0xa2, 0xad, 0x83, 0x36, 0x0a, 0x6b, 0xa4, 0xe5, 0x85, 0xf7, 0xec, 0xbf, 0x81, 0xb6,
|
||||
0xd9, 0xa4, 0x98, 0x31, 0x9d, 0x14, 0xb1, 0x90, 0x6c, 0x26, 0x20, 0x99, 0x6a, 0x6e, 0xe6, 0x75,
|
||||
0x8c, 0x37, 0x88, 0x55, 0xac, 0x2c, 0x64, 0x1b, 0xd4, 0x54, 0xef, 0xc5, 0x4a, 0xc5, 0x09, 0x74,
|
||||
0x6e, 0x2e, 0x17, 0x0d, 0x75, 0x7f, 0x97, 0x82, 0x54, 0x9b, 0x96, 0x0c, 0x76, 0x49, 0x23, 0x52,
|
||||
0xc8, 0x0d, 0x4f, 0x75, 0x2d, 0x38, 0x7b, 0x8f, 0xef, 0xbc, 0x54, 0xd9, 0x67, 0x9e, 0x4d, 0x43,
|
||||
0xf8, 0x54, 0x40, 0x6e, 0xc8, 0x2b, 0xdc, 0x03, 0x59, 0x42, 0xa2, 0x34, 0xb8, 0xe8, 0x14, 0x9d,
|
||||
0xdf, 0xbe, 0x7c, 0x4c, 0x0f, 0x58, 0x9d, 0x8e, 0x1a, 0x53, 0xb8, 0xb5, 0x9f, 0x7d, 0x45, 0xb8,
|
||||
0xd7, 0x96, 0xc9, 0x10, 0xf7, 0xb7, 0xe1, 0x4d, 0x63, 0x8f, 0xd6, 0xe3, 0xd1, 0x76, 0x3c, 0xfa,
|
||||
0xa6, 0x55, 0x0c, 0x7b, 0xab, 0x9f, 0x81, 0xf3, 0xe5, 0x57, 0x80, 0xc2, 0xce, 0x46, 0x1e, 0xe0,
|
||||
0xbe, 0xe4, 0x29, 0xe4, 0x9a, 0x47, 0xe0, 0xde, 0x38, 0x45, 0xe7, 0xfd, 0xb0, 0x2b, 0x90, 0x01,
|
||||
0x3e, 0x36, 0x4a, 0x8b, 0xc8, 0xbd, 0x69, 0x99, 0xfa, 0x83, 0x3c, 0xc2, 0xc7, 0x76, 0x54, 0xf7,
|
||||
0xc8, 0x66, 0x0e, 0xf6, 0x32, 0x9f, 0xcb, 0x45, 0x58, 0x4b, 0x9e, 0x1c, 0x2d, 0xbf, 0x05, 0xe8,
|
||||
0xf2, 0x23, 0x3e, 0x19, 0xd9, 0xe5, 0xc8, 0x5b, 0x7c, 0xab, 0xb9, 0x0e, 0xb9, 0x3a, 0xe8, 0x08,
|
||||
0xff, 0xde, 0xd2, 0xbb, 0xbb, 0x17, 0x36, 0xda, 0xfc, 0x9c, 0xe1, 0x87, 0xd5, 0xda, 0x77, 0x7e,
|
||||
0xac, 0x7d, 0x67, 0x59, 0xf9, 0x68, 0x55, 0xf9, 0xe8, 0x7b, 0xe5, 0xa3, 0xdf, 0x95, 0x8f, 0xde,
|
||||
0xbd, 0xb8, 0xd6, 0x8b, 0x7d, 0x5a, 0xa3, 0xb1, 0x33, 0x46, 0x93, 0x13, 0x9b, 0x79, 0xf5, 0x27,
|
||||
0x00, 0x00, 0xff, 0xff, 0xd4, 0x90, 0xbd, 0x09, 0x04, 0x03, 0x00, 0x00,
|
||||
}
|
||||
|
||||
// Field returns the value for the given fieldpath as a string, if defined.
|
||||
// If the value is not defined, the second value will be false.
|
||||
func (m *Envelope) Field(fieldpath []string) (string, bool) {
|
||||
if len(fieldpath) == 0 {
|
||||
return "", false
|
||||
}
|
||||
|
||||
switch fieldpath[0] {
|
||||
// unhandled: timestamp
|
||||
case "namespace":
|
||||
return string(m.Namespace), len(m.Namespace) > 0
|
||||
case "topic":
|
||||
return string(m.Topic), len(m.Topic) > 0
|
||||
case "event":
|
||||
decoded, err := github_com_containerd_typeurl.UnmarshalAny(m.Event)
|
||||
if err != nil {
|
||||
return "", false
|
||||
}
|
||||
|
||||
adaptor, ok := decoded.(interface{ Field([]string) (string, bool) })
|
||||
if !ok {
|
||||
return "", false
|
||||
}
|
||||
return adaptor.Field(fieldpath[1:])
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
func (m *ForwardRequest) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ForwardRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.Envelope != nil {
|
||||
{
|
||||
size, err := m.Envelope.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintEvents(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *Envelope) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Envelope) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.XXX_unrecognized != nil {
|
||||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.Event != nil {
|
||||
{
|
||||
size, err := m.Event.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintEvents(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0x22
|
||||
}
|
||||
if len(m.Topic) > 0 {
|
||||
i -= len(m.Topic)
|
||||
copy(dAtA[i:], m.Topic)
|
||||
i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic)))
|
||||
i--
|
||||
dAtA[i] = 0x1a
|
||||
}
|
||||
if len(m.Namespace) > 0 {
|
||||
i -= len(m.Namespace)
|
||||
copy(dAtA[i:], m.Namespace)
|
||||
i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace)))
|
||||
i--
|
||||
dAtA[i] = 0x12
|
||||
}
|
||||
n3, err3 := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i-github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp):])
|
||||
if err3 != nil {
|
||||
return 0, err3
|
||||
}
|
||||
i -= n3
|
||||
i = encodeVarintEvents(dAtA, i, uint64(n3))
|
||||
i--
|
||||
dAtA[i] = 0xa
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func encodeVarintEvents(dAtA []byte, offset int, v uint64) int {
|
||||
offset -= sovEvents(v)
|
||||
base := offset
|
||||
for v >= 1<<7 {
|
||||
dAtA[offset] = uint8(v&0x7f | 0x80)
|
||||
v >>= 7
|
||||
offset++
|
||||
}
|
||||
dAtA[offset] = uint8(v)
|
||||
return base
|
||||
}
|
||||
func (m *ForwardRequest) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.Envelope != nil {
|
||||
l = m.Envelope.Size()
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *Envelope) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
l = len(m.Namespace)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
}
|
||||
l = len(m.Topic)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
}
|
||||
if m.Event != nil {
|
||||
l = m.Event.Size()
|
||||
n += 1 + l + sovEvents(uint64(l))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovEvents(x uint64) (n int) {
|
||||
return (math_bits.Len64(x|1) + 6) / 7
|
||||
}
|
||||
func sozEvents(x uint64) (n int) {
|
||||
return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
||||
}
|
||||
func (this *ForwardRequest) String() string {
|
||||
if this == nil {
|
||||
return "nil"
|
||||
}
|
||||
s := strings.Join([]string{`&ForwardRequest{`,
|
||||
`Envelope:` + strings.Replace(this.Envelope.String(), "Envelope", "Envelope", 1) + `,`,
|
||||
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
|
||||
`}`,
|
||||
}, "")
|
||||
return s
|
||||
}
|
||||
func (this *Envelope) String() string {
|
||||
if this == nil {
|
||||
return "nil"
|
||||
}
|
||||
s := strings.Join([]string{`&Envelope{`,
|
||||
`Timestamp:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Timestamp), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
|
||||
`Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`,
|
||||
`Topic:` + fmt.Sprintf("%v", this.Topic) + `,`,
|
||||
`Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`,
|
||||
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
|
||||
`}`,
|
||||
}, "")
|
||||
return s
|
||||
}
|
||||
func valueToStringEvents(v interface{}) string {
|
||||
rv := reflect.ValueOf(v)
|
||||
if rv.IsNil() {
|
||||
return "nil"
|
||||
}
|
||||
pv := reflect.Indirect(rv).Interface()
|
||||
return fmt.Sprintf("*%v", pv)
|
||||
}
|
||||
|
||||
type EventsService interface {
|
||||
Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error)
|
||||
}
|
||||
|
||||
func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) {
|
||||
srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{
|
||||
"Forward": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
|
||||
var req ForwardRequest
|
||||
if err := unmarshal(&req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return svc.Forward(ctx, &req)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type eventsClient struct {
|
||||
client *github_com_containerd_ttrpc.Client
|
||||
}
|
||||
|
||||
func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService {
|
||||
return &eventsClient{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *eventsClient) Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) {
|
||||
var resp types.Empty
|
||||
if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Forward", req, &resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
func (m *ForwardRequest) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if m.Envelope == nil {
|
||||
m.Envelope = &Envelope{}
|
||||
}
|
||||
if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipEvents(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *Envelope) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: Envelope: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: Envelope: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType)
|
||||
}
|
||||
var stringLen uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
stringLen |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
intStringLen := int(stringLen)
|
||||
if intStringLen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + intStringLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Namespace = string(dAtA[iNdEx:postIndex])
|
||||
iNdEx = postIndex
|
||||
case 3:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
|
||||
}
|
||||
var stringLen uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
stringLen |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
intStringLen := int(stringLen)
|
||||
if intStringLen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + intStringLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Topic = string(dAtA[iNdEx:postIndex])
|
||||
iNdEx = postIndex
|
||||
case 4:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if m.Event == nil {
|
||||
m.Event = &types.Any{}
|
||||
}
|
||||
if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipEvents(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthEvents
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func skipEvents(dAtA []byte) (n int, err error) {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
depth := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
wireType := int(wire & 0x7)
|
||||
switch wireType {
|
||||
case 0:
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx++
|
||||
if dAtA[iNdEx-1] < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 1:
|
||||
iNdEx += 8
|
||||
case 2:
|
||||
var length int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowEvents
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
length |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if length < 0 {
|
||||
return 0, ErrInvalidLengthEvents
|
||||
}
|
||||
iNdEx += length
|
||||
case 3:
|
||||
depth++
|
||||
case 4:
|
||||
if depth == 0 {
|
||||
return 0, ErrUnexpectedEndOfGroupEvents
|
||||
}
|
||||
depth--
|
||||
case 5:
|
||||
iNdEx += 4
|
||||
default:
|
||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||
}
|
||||
if iNdEx < 0 {
|
||||
return 0, ErrInvalidLengthEvents
|
||||
}
|
||||
if depth == 0 {
|
||||
return iNdEx, nil
|
||||
}
|
||||
}
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidLengthEvents = fmt.Errorf("proto: negative length found during unmarshaling")
|
||||
ErrIntOverflowEvents = fmt.Errorf("proto: integer overflow")
|
||||
ErrUnexpectedEndOfGroupEvents = fmt.Errorf("proto: unexpected end of group")
|
||||
)
|
48
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto
generated
vendored
Normal file
48
vendor/github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto
generated
vendored
Normal file
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package containerd.services.events.ttrpc.v1;
|
||||
|
||||
import weak "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto";
|
||||
import weak "gogoproto/gogo.proto";
|
||||
import "google/protobuf/any.proto";
|
||||
import "google/protobuf/empty.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events";
|
||||
|
||||
service Events {
|
||||
// Forward sends an event that has already been packaged into an envelope
|
||||
// with a timestamp and namespace.
|
||||
//
|
||||
// This is useful if earlier timestamping is required or when forwarding on
|
||||
// behalf of another component, namespace or publisher.
|
||||
rpc Forward(ForwardRequest) returns (google.protobuf.Empty);
|
||||
}
|
||||
|
||||
message ForwardRequest {
|
||||
Envelope envelope = 1;
|
||||
}
|
||||
|
||||
message Envelope {
|
||||
option (containerd.plugin.fieldpath) = true;
|
||||
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
string namespace = 2;
|
||||
string topic = 3;
|
||||
google.protobuf.Any event = 4;
|
||||
}
|
109
vendor/github.com/containerd/containerd/pkg/shutdown/shutdown.go
generated
vendored
Normal file
109
vendor/github.com/containerd/containerd/pkg/shutdown/shutdown.go
generated
vendored
Normal file
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shutdown
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// ErrShutdown is the error condition when a context has been fully shutdown
|
||||
var ErrShutdown = errors.New("shutdown")
|
||||
|
||||
// Service is used to facilitate shutdown by through callback
|
||||
// registration and shutdown initiation
|
||||
type Service interface {
|
||||
// Shutdown initiates shutdown
|
||||
Shutdown()
|
||||
// RegisterCallback registers functions to be called on shutdown and before
|
||||
// the shutdown channel is closed. A callback error will propagate to the
|
||||
// context error
|
||||
RegisterCallback(func(context.Context) error)
|
||||
}
|
||||
|
||||
// WithShutdown returns a context which is similar to a cancel context, but
|
||||
// with callbacks which can propagate to the context error. Unlike a cancel
|
||||
// context, the shutdown context cannot be canceled from the parent context.
|
||||
// However, future child contexes will be canceled upon shutdown.
|
||||
func WithShutdown(ctx context.Context) (context.Context, Service) {
|
||||
ss := &shutdownService{
|
||||
Context: ctx,
|
||||
doneC: make(chan struct{}),
|
||||
timeout: 30 * time.Second,
|
||||
}
|
||||
return ss, ss
|
||||
}
|
||||
|
||||
type shutdownService struct {
|
||||
context.Context
|
||||
|
||||
mu sync.Mutex
|
||||
isShutdown bool
|
||||
callbacks []func(context.Context) error
|
||||
doneC chan struct{}
|
||||
err error
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func (s *shutdownService) Shutdown() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.isShutdown {
|
||||
return
|
||||
}
|
||||
s.isShutdown = true
|
||||
|
||||
go func(callbacks []func(context.Context) error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.timeout)
|
||||
defer cancel()
|
||||
grp, ctx := errgroup.WithContext(ctx)
|
||||
for i := range callbacks {
|
||||
fn := callbacks[i]
|
||||
grp.Go(func() error { return fn(ctx) })
|
||||
}
|
||||
err := grp.Wait()
|
||||
if err == nil {
|
||||
err = ErrShutdown
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.err = err
|
||||
close(s.doneC)
|
||||
s.mu.Unlock()
|
||||
}(s.callbacks)
|
||||
}
|
||||
|
||||
func (s *shutdownService) Done() <-chan struct{} {
|
||||
return s.doneC
|
||||
}
|
||||
|
||||
func (s *shutdownService) Err() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.err
|
||||
}
|
||||
func (s *shutdownService) RegisterCallback(fn func(context.Context) error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.callbacks == nil {
|
||||
s.callbacks = []func(context.Context) error{}
|
||||
}
|
||||
s.callbacks = append(s.callbacks, fn)
|
||||
}
|
120
vendor/github.com/containerd/containerd/pkg/ttrpcutil/client.go
generated
vendored
Normal file
120
vendor/github.com/containerd/containerd/pkg/ttrpcutil/client.go
generated
vendored
Normal file
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ttrpcutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
|
||||
"github.com/containerd/containerd/pkg/dialer"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
const ttrpcDialTimeout = 5 * time.Second
|
||||
|
||||
type ttrpcConnector func() (*ttrpc.Client, error)
|
||||
|
||||
// Client is the client to interact with TTRPC part of containerd server (plugins, events)
|
||||
type Client struct {
|
||||
mu sync.Mutex
|
||||
connector ttrpcConnector
|
||||
client *ttrpc.Client
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewClient returns a new containerd TTRPC client that is connected to the containerd instance provided by address
|
||||
func NewClient(address string, opts ...ttrpc.ClientOpts) (*Client, error) {
|
||||
connector := func() (*ttrpc.Client, error) {
|
||||
conn, err := dialer.Dialer(address, ttrpcDialTimeout)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
|
||||
client := ttrpc.NewClient(conn, opts...)
|
||||
return client, nil
|
||||
}
|
||||
|
||||
return &Client{
|
||||
connector: connector,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Reconnect re-establishes the TTRPC connection to the containerd daemon
|
||||
func (c *Client) Reconnect() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if c.connector == nil {
|
||||
return errors.New("unable to reconnect to containerd, no connector available")
|
||||
}
|
||||
|
||||
if c.closed {
|
||||
return errors.New("client is closed")
|
||||
}
|
||||
|
||||
if c.client != nil {
|
||||
if err := c.client.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
client, err := c.connector()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.client = client
|
||||
return nil
|
||||
}
|
||||
|
||||
// EventsService creates an EventsService client
|
||||
func (c *Client) EventsService() (v1.EventsService, error) {
|
||||
client, err := c.Client()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return v1.NewEventsClient(client), nil
|
||||
}
|
||||
|
||||
// Client returns the underlying TTRPC client object
|
||||
func (c *Client) Client() (*ttrpc.Client, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.client == nil {
|
||||
client, err := c.connector()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.client = client
|
||||
}
|
||||
return c.client, nil
|
||||
}
|
||||
|
||||
// Close closes the clients TTRPC connection to containerd
|
||||
func (c *Client) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
c.closed = true
|
||||
if c.client != nil {
|
||||
return c.client.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
169
vendor/github.com/containerd/containerd/runtime/v2/shim/publisher.go
generated
vendored
Normal file
169
vendor/github.com/containerd/containerd/runtime/v2/shim/publisher.go
generated
vendored
Normal file
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/pkg/ttrpcutil"
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
queueSize = 2048
|
||||
maxRequeue = 5
|
||||
)
|
||||
|
||||
type item struct {
|
||||
ev *v1.Envelope
|
||||
ctx context.Context
|
||||
count int
|
||||
}
|
||||
|
||||
// NewPublisher creates a new remote events publisher
|
||||
func NewPublisher(address string) (*RemoteEventsPublisher, error) {
|
||||
client, err := ttrpcutil.NewClient(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l := &RemoteEventsPublisher{
|
||||
client: client,
|
||||
closed: make(chan struct{}),
|
||||
requeue: make(chan *item, queueSize),
|
||||
}
|
||||
|
||||
go l.processQueue()
|
||||
return l, nil
|
||||
}
|
||||
|
||||
// RemoteEventsPublisher forwards events to a ttrpc server
|
||||
type RemoteEventsPublisher struct {
|
||||
client *ttrpcutil.Client
|
||||
closed chan struct{}
|
||||
closer sync.Once
|
||||
requeue chan *item
|
||||
}
|
||||
|
||||
// Done returns a channel which closes when done
|
||||
func (l *RemoteEventsPublisher) Done() <-chan struct{} {
|
||||
return l.closed
|
||||
}
|
||||
|
||||
// Close closes the remote connection and closes the done channel
|
||||
func (l *RemoteEventsPublisher) Close() (err error) {
|
||||
err = l.client.Close()
|
||||
l.closer.Do(func() {
|
||||
close(l.closed)
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *RemoteEventsPublisher) processQueue() {
|
||||
for i := range l.requeue {
|
||||
if i.count > maxRequeue {
|
||||
logrus.Errorf("evicting %s from queue because of retry count", i.ev.Topic)
|
||||
// drop the event
|
||||
continue
|
||||
}
|
||||
|
||||
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
|
||||
logrus.WithError(err).Error("forward event")
|
||||
l.queue(i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *RemoteEventsPublisher) queue(i *item) {
|
||||
go func() {
|
||||
i.count++
|
||||
// re-queue after a short delay
|
||||
time.Sleep(time.Duration(1*i.count) * time.Second)
|
||||
l.requeue <- i
|
||||
}()
|
||||
}
|
||||
|
||||
// Publish publishes the event by forwarding it to the configured ttrpc server
|
||||
func (l *RemoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
any, err := typeurl.MarshalAny(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i := &item{
|
||||
ev: &v1.Envelope{
|
||||
Timestamp: time.Now(),
|
||||
Namespace: ns,
|
||||
Topic: topic,
|
||||
Event: any,
|
||||
},
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
if err := l.forwardRequest(i.ctx, &v1.ForwardRequest{Envelope: i.ev}); err != nil {
|
||||
l.queue(i)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *RemoteEventsPublisher) forwardRequest(ctx context.Context, req *v1.ForwardRequest) error {
|
||||
service, err := l.client.EventsService()
|
||||
if err == nil {
|
||||
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
_, err = service.Forward(fCtx, req)
|
||||
cancel()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if err != ttrpc.ErrClosed {
|
||||
return err
|
||||
}
|
||||
|
||||
// Reconnect and retry request
|
||||
if err = l.client.Reconnect(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service, err = l.client.EventsService()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// try again with a fresh context, otherwise we may get a context timeout unexpectedly.
|
||||
fCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
_, err = service.Forward(fCtx, req)
|
||||
cancel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
501
vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go
generated
vendored
Normal file
501
vendor/github.com/containerd/containerd/runtime/v2/shim/shim.go
generated
vendored
Normal file
|
@ -0,0 +1,501 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/events"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/pkg/shutdown"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
shimapi "github.com/containerd/containerd/runtime/v2/task"
|
||||
"github.com/containerd/containerd/version"
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Publisher for events
|
||||
type Publisher interface {
|
||||
events.Publisher
|
||||
io.Closer
|
||||
}
|
||||
|
||||
// StartOpts describes shim start configuration received from containerd
|
||||
type StartOpts struct {
|
||||
ID string // TODO(2.0): Remove ID, passed directly to start for call symmetry
|
||||
ContainerdBinary string
|
||||
Address string
|
||||
TTRPCAddress string
|
||||
}
|
||||
|
||||
type StopStatus struct {
|
||||
Pid int
|
||||
ExitStatus int
|
||||
ExitedAt time.Time
|
||||
}
|
||||
|
||||
// Init func for the creation of a shim server
|
||||
// TODO(2.0): Remove init function
|
||||
type Init func(context.Context, string, Publisher, func()) (Shim, error)
|
||||
|
||||
// Shim server interface
|
||||
// TODO(2.0): Remove unified shim interface
|
||||
type Shim interface {
|
||||
shimapi.TaskService
|
||||
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
|
||||
StartShim(ctx context.Context, opts StartOpts) (string, error)
|
||||
}
|
||||
|
||||
// Manager is the interface which manages the shim process
|
||||
type Manager interface {
|
||||
Name() string
|
||||
Start(ctx context.Context, id string, opts StartOpts) (string, error)
|
||||
Stop(ctx context.Context, id string) (StopStatus, error)
|
||||
}
|
||||
|
||||
// OptsKey is the context key for the Opts value.
|
||||
type OptsKey struct{}
|
||||
|
||||
// Opts are context options associated with the shim invocation.
|
||||
type Opts struct {
|
||||
BundlePath string
|
||||
Debug bool
|
||||
}
|
||||
|
||||
// BinaryOpts allows the configuration of a shims binary setup
|
||||
type BinaryOpts func(*Config)
|
||||
|
||||
// Config of shim binary options provided by shim implementations
|
||||
type Config struct {
|
||||
// NoSubreaper disables setting the shim as a child subreaper
|
||||
NoSubreaper bool
|
||||
// NoReaper disables the shim binary from reaping any child process implicitly
|
||||
NoReaper bool
|
||||
// NoSetupLogger disables automatic configuration of logrus to use the shim FIFO
|
||||
NoSetupLogger bool
|
||||
}
|
||||
|
||||
type ttrpcService interface {
|
||||
RegisterTTRPC(*ttrpc.Server) error
|
||||
}
|
||||
|
||||
type taskService struct {
|
||||
shimapi.TaskService
|
||||
}
|
||||
|
||||
func (t taskService) RegisterTTRPC(server *ttrpc.Server) error {
|
||||
shimapi.RegisterTaskService(server, t.TaskService)
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
debugFlag bool
|
||||
versionFlag bool
|
||||
id string
|
||||
namespaceFlag string
|
||||
socketFlag string
|
||||
bundlePath string
|
||||
addressFlag string
|
||||
containerdBinaryFlag string
|
||||
action string
|
||||
)
|
||||
|
||||
const (
|
||||
ttrpcAddressEnv = "TTRPC_ADDRESS"
|
||||
)
|
||||
|
||||
func parseFlags() {
|
||||
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
|
||||
flag.BoolVar(&versionFlag, "v", false, "show the shim version and exit")
|
||||
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
|
||||
flag.StringVar(&id, "id", "", "id of the task")
|
||||
flag.StringVar(&socketFlag, "socket", "", "socket path to serve")
|
||||
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
|
||||
|
||||
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
|
||||
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
|
||||
|
||||
flag.Parse()
|
||||
action = flag.Arg(0)
|
||||
}
|
||||
|
||||
func setRuntime() {
|
||||
debug.SetGCPercent(40)
|
||||
go func() {
|
||||
for range time.Tick(30 * time.Second) {
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
}()
|
||||
if os.Getenv("GOMAXPROCS") == "" {
|
||||
// If GOMAXPROCS hasn't been set, we default to a value of 2 to reduce
|
||||
// the number of Go stacks present in the shim.
|
||||
runtime.GOMAXPROCS(2)
|
||||
}
|
||||
}
|
||||
|
||||
func setLogger(ctx context.Context, id string) (context.Context, error) {
|
||||
l := log.G(ctx)
|
||||
l.Logger.SetFormatter(&logrus.TextFormatter{
|
||||
TimestampFormat: log.RFC3339NanoFixed,
|
||||
FullTimestamp: true,
|
||||
})
|
||||
if debugFlag {
|
||||
l.Logger.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
f, err := openLog(ctx, id)
|
||||
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return ctx, err
|
||||
}
|
||||
l.Logger.SetOutput(f)
|
||||
return log.WithLogger(ctx, l), nil
|
||||
}
|
||||
|
||||
// Run initializes and runs a shim server
|
||||
// TODO(2.0): Remove function
|
||||
func Run(name string, initFunc Init, opts ...BinaryOpts) {
|
||||
var config Config
|
||||
for _, o := range opts {
|
||||
o(&config)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", name))
|
||||
|
||||
if err := run(ctx, nil, initFunc, name, config); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%s: %s", name, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(2.0): Remove this type
|
||||
type shimToManager struct {
|
||||
shim Shim
|
||||
name string
|
||||
}
|
||||
|
||||
func (stm shimToManager) Name() string {
|
||||
return stm.name
|
||||
}
|
||||
|
||||
func (stm shimToManager) Start(ctx context.Context, id string, opts StartOpts) (string, error) {
|
||||
opts.ID = id
|
||||
return stm.shim.StartShim(ctx, opts)
|
||||
}
|
||||
|
||||
func (stm shimToManager) Stop(ctx context.Context, id string) (StopStatus, error) {
|
||||
// shim must already have id
|
||||
dr, err := stm.shim.Cleanup(ctx)
|
||||
if err != nil {
|
||||
return StopStatus{}, err
|
||||
}
|
||||
return StopStatus{
|
||||
Pid: int(dr.Pid),
|
||||
ExitStatus: int(dr.ExitStatus),
|
||||
ExitedAt: dr.ExitedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RunManager initialzes and runs a shim server
|
||||
// TODO(2.0): Rename to Run
|
||||
func RunManager(ctx context.Context, manager Manager, opts ...BinaryOpts) {
|
||||
var config Config
|
||||
for _, o := range opts {
|
||||
o(&config)
|
||||
}
|
||||
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", manager.Name()))
|
||||
|
||||
if err := run(ctx, manager, nil, "", config); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%s: %s", manager.Name(), err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func run(ctx context.Context, manager Manager, initFunc Init, name string, config Config) error {
|
||||
parseFlags()
|
||||
if versionFlag {
|
||||
fmt.Printf("%s:\n", os.Args[0])
|
||||
fmt.Println(" Version: ", version.Version)
|
||||
fmt.Println(" Revision:", version.Revision)
|
||||
fmt.Println(" Go version:", version.GoVersion)
|
||||
fmt.Println("")
|
||||
return nil
|
||||
}
|
||||
|
||||
if namespaceFlag == "" {
|
||||
return fmt.Errorf("shim namespace cannot be empty")
|
||||
}
|
||||
|
||||
setRuntime()
|
||||
|
||||
signals, err := setupSignals(config)
|
||||
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return err
|
||||
}
|
||||
|
||||
if !config.NoSubreaper {
|
||||
if err := subreaper(); err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ttrpcAddress := os.Getenv(ttrpcAddressEnv)
|
||||
publisher, err := NewPublisher(ttrpcAddress)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer publisher.Close()
|
||||
|
||||
ctx = namespaces.WithNamespace(ctx, namespaceFlag)
|
||||
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
|
||||
ctx, sd := shutdown.WithShutdown(ctx)
|
||||
defer sd.Shutdown()
|
||||
|
||||
if manager == nil {
|
||||
service, err := initFunc(ctx, id, publisher, sd.Shutdown)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
plugin.Register(&plugin.Registration{
|
||||
Type: plugin.TTRPCPlugin,
|
||||
ID: "task",
|
||||
Requires: []plugin.Type{
|
||||
plugin.EventPlugin,
|
||||
},
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
return taskService{service}, nil
|
||||
},
|
||||
})
|
||||
manager = shimToManager{
|
||||
shim: service,
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
// Handle explicit actions
|
||||
switch action {
|
||||
case "delete":
|
||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||
"pid": os.Getpid(),
|
||||
"namespace": namespaceFlag,
|
||||
})
|
||||
go reap(ctx, logger, signals)
|
||||
ss, err := manager.Stop(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := proto.Marshal(&shimapi.DeleteResponse{
|
||||
Pid: uint32(ss.Pid),
|
||||
ExitStatus: uint32(ss.ExitStatus),
|
||||
ExitedAt: ss.ExitedAt,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := os.Stdout.Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
case "start":
|
||||
opts := StartOpts{
|
||||
ContainerdBinary: containerdBinaryFlag,
|
||||
Address: addressFlag,
|
||||
TTRPCAddress: ttrpcAddress,
|
||||
}
|
||||
|
||||
address, err := manager.Start(ctx, id, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := os.Stdout.WriteString(address); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if !config.NoSetupLogger {
|
||||
ctx, err = setLogger(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
plugin.Register(&plugin.Registration{
|
||||
Type: plugin.InternalPlugin,
|
||||
ID: "shutdown",
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
return sd, nil
|
||||
},
|
||||
})
|
||||
|
||||
// Register event plugin
|
||||
plugin.Register(&plugin.Registration{
|
||||
Type: plugin.EventPlugin,
|
||||
ID: "publisher",
|
||||
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
return publisher, nil
|
||||
},
|
||||
})
|
||||
|
||||
var (
|
||||
initialized = plugin.NewPluginSet()
|
||||
ttrpcServices = []ttrpcService{}
|
||||
)
|
||||
plugins := plugin.Graph(func(*plugin.Registration) bool { return false })
|
||||
for _, p := range plugins {
|
||||
id := p.URI()
|
||||
log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
|
||||
|
||||
initContext := plugin.NewContext(
|
||||
ctx,
|
||||
p,
|
||||
initialized,
|
||||
// NOTE: Root is empty since the shim does not support persistent storage,
|
||||
// shim plugins should make use state directory for writing files to disk.
|
||||
// The state directory will be destroyed when the shim if cleaned up or
|
||||
// on reboot
|
||||
"",
|
||||
bundlePath,
|
||||
)
|
||||
initContext.Address = addressFlag
|
||||
initContext.TTRPCAddress = ttrpcAddress
|
||||
|
||||
// load the plugin specific configuration if it is provided
|
||||
//TODO: Read configuration passed into shim, or from state directory?
|
||||
//if p.Config != nil {
|
||||
// pc, err := config.Decode(p)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// initContext.Config = pc
|
||||
//}
|
||||
|
||||
result := p.Init(initContext)
|
||||
if err := initialized.Add(result); err != nil {
|
||||
return fmt.Errorf("could not add plugin result to plugin set: %w", err)
|
||||
}
|
||||
|
||||
instance, err := result.Instance()
|
||||
if err != nil {
|
||||
if plugin.IsSkipPlugin(err) {
|
||||
log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
|
||||
} else {
|
||||
log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if src, ok := instance.(ttrpcService); ok {
|
||||
logrus.WithField("id", id).Debug("registering ttrpc service")
|
||||
ttrpcServices = append(ttrpcServices, src)
|
||||
}
|
||||
}
|
||||
|
||||
server, err := newServer()
|
||||
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return fmt.Errorf("failed creating server: %w", err)
|
||||
}
|
||||
|
||||
for _, srv := range ttrpcServices {
|
||||
if err := srv.RegisterTTRPC(server); err != nil {
|
||||
return fmt.Errorf("failed to register service: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := serve(ctx, server, signals, sd.Shutdown); err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
if err != shutdown.ErrShutdown {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: If the shim server is down(like oom killer), the address
|
||||
// socket might be leaking.
|
||||
if address, err := ReadAddress("address"); err == nil {
|
||||
_ = RemoveSocket(address)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-publisher.Done():
|
||||
return nil
|
||||
case <-time.After(5 * time.Second):
|
||||
return errors.New("publisher not closed")
|
||||
}
|
||||
}
|
||||
|
||||
// serve serves the ttrpc API over a unix socket in the current working directory
|
||||
// and blocks until the context is canceled
|
||||
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal, shutdown func()) error {
|
||||
dump := make(chan os.Signal, 32)
|
||||
setupDumpStacks(dump)
|
||||
|
||||
path, err := os.Getwd()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l, err := serveListener(socketFlag)
|
||||
if err != nil { //nolint:staticcheck // Ignore SA4023 as some platforms always return error
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
defer l.Close()
|
||||
if err := server.Serve(ctx, l); err != nil &&
|
||||
!strings.Contains(err.Error(), "use of closed network connection") {
|
||||
log.G(ctx).WithError(err).Fatal("containerd-shim: ttrpc server failure")
|
||||
}
|
||||
}()
|
||||
logger := log.G(ctx).WithFields(logrus.Fields{
|
||||
"pid": os.Getpid(),
|
||||
"path": path,
|
||||
"namespace": namespaceFlag,
|
||||
})
|
||||
go func() {
|
||||
for range dump {
|
||||
dumpStacks(logger)
|
||||
}
|
||||
}()
|
||||
|
||||
go handleExitSignals(ctx, logger, shutdown)
|
||||
return reap(ctx, logger, signals)
|
||||
}
|
||||
|
||||
func dumpStacks(logger *logrus.Entry) {
|
||||
var (
|
||||
buf []byte
|
||||
stackSize int
|
||||
)
|
||||
bufferLen := 16384
|
||||
for stackSize == len(buf) {
|
||||
buf = make([]byte, bufferLen)
|
||||
stackSize = runtime.Stack(buf, true)
|
||||
bufferLen *= 2
|
||||
}
|
||||
buf = buf[:stackSize]
|
||||
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
|
||||
}
|
27
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_darwin.go
generated
vendored
Normal file
27
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_darwin.go
generated
vendored
Normal file
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import "github.com/containerd/ttrpc"
|
||||
|
||||
func newServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer()
|
||||
}
|
||||
|
||||
func subreaper() error {
|
||||
return nil
|
||||
}
|
27
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_freebsd.go
generated
vendored
Normal file
27
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_freebsd.go
generated
vendored
Normal file
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import "github.com/containerd/ttrpc"
|
||||
|
||||
func newServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer()
|
||||
}
|
||||
|
||||
func subreaper() error {
|
||||
return nil
|
||||
}
|
30
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_linux.go
generated
vendored
Normal file
30
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_linux.go
generated
vendored
Normal file
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"github.com/containerd/containerd/sys/reaper"
|
||||
"github.com/containerd/ttrpc"
|
||||
)
|
||||
|
||||
func newServer() (*ttrpc.Server, error) {
|
||||
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
|
||||
}
|
||||
|
||||
func subreaper() error {
|
||||
return reaper.SetSubreaper(1)
|
||||
}
|
113
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go
generated
vendored
Normal file
113
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_unix.go
generated
vendored
Normal file
|
@ -0,0 +1,113 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/containerd/containerd/sys/reaper"
|
||||
"github.com/containerd/fifo"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// setupSignals creates a new signal handler for all signals and sets the shim as a
|
||||
// sub-reaper so that the container processes are reparented
|
||||
func setupSignals(config Config) (chan os.Signal, error) {
|
||||
signals := make(chan os.Signal, 32)
|
||||
smp := []os.Signal{unix.SIGTERM, unix.SIGINT, unix.SIGPIPE}
|
||||
if !config.NoReaper {
|
||||
smp = append(smp, unix.SIGCHLD)
|
||||
}
|
||||
signal.Notify(signals, smp...)
|
||||
return signals, nil
|
||||
}
|
||||
|
||||
func setupDumpStacks(dump chan<- os.Signal) {
|
||||
signal.Notify(dump, syscall.SIGUSR1)
|
||||
}
|
||||
|
||||
func serveListener(path string) (net.Listener, error) {
|
||||
var (
|
||||
l net.Listener
|
||||
err error
|
||||
)
|
||||
if path == "" {
|
||||
l, err = net.FileListener(os.NewFile(3, "socket"))
|
||||
path = "[inherited from parent]"
|
||||
} else {
|
||||
if len(path) > socketPathLimit {
|
||||
return nil, fmt.Errorf("%q: unix socket path too long (> %d)", path, socketPathLimit)
|
||||
}
|
||||
l, err = net.Listen("unix", path)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logrus.WithField("socket", path).Debug("serving api on socket")
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func reap(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
|
||||
logger.Info("starting signal loop")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case s := <-signals:
|
||||
// Exit signals are handled separately from this loop
|
||||
// They get registered with this channel so that we can ignore such signals for short-running actions (e.g. `delete`)
|
||||
switch s {
|
||||
case unix.SIGCHLD:
|
||||
if err := reaper.Reap(); err != nil {
|
||||
logger.WithError(err).Error("reap exit status")
|
||||
}
|
||||
case unix.SIGPIPE:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleExitSignals(ctx context.Context, logger *logrus.Entry, cancel context.CancelFunc) {
|
||||
ch := make(chan os.Signal, 32)
|
||||
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
for {
|
||||
select {
|
||||
case s := <-ch:
|
||||
logger.WithField("signal", s).Debugf("Caught exit signal")
|
||||
cancel()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func openLog(ctx context.Context, _ string) (io.Writer, error) {
|
||||
return fifo.OpenFifoDup2(ctx, "log", unix.O_WRONLY, 0700, int(os.Stderr.Fd()))
|
||||
}
|
58
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go
generated
vendored
Normal file
58
vendor/github.com/containerd/containerd/runtime/v2/shim/shim_windows.go
generated
vendored
Normal file
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"github.com/containerd/ttrpc"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func setupSignals(config Config) (chan os.Signal, error) {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
|
||||
func newServer() (*ttrpc.Server, error) {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
|
||||
func subreaper() error {
|
||||
return errors.New("not supported")
|
||||
}
|
||||
|
||||
func setupDumpStacks(dump chan<- os.Signal) {
|
||||
}
|
||||
|
||||
func serveListener(path string) (net.Listener, error) {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
|
||||
func reap(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
|
||||
return errors.New("not supported")
|
||||
}
|
||||
|
||||
func handleExitSignals(ctx context.Context, logger *logrus.Entry, cancel context.CancelFunc) {
|
||||
}
|
||||
|
||||
func openLog(ctx context.Context, _ string) (io.Writer, error) {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
169
vendor/github.com/containerd/containerd/runtime/v2/shim/util.go
generated
vendored
Normal file
169
vendor/github.com/containerd/containerd/runtime/v2/shim/util.go
generated
vendored
Normal file
|
@ -0,0 +1,169 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/gogo/protobuf/types"
|
||||
exec "golang.org/x/sys/execabs"
|
||||
)
|
||||
|
||||
type CommandConfig struct {
|
||||
Runtime string
|
||||
Address string
|
||||
TTRPCAddress string
|
||||
Path string
|
||||
SchedCore bool
|
||||
Args []string
|
||||
Opts *types.Any
|
||||
}
|
||||
|
||||
// Command returns the shim command with the provided args and configuration
|
||||
func Command(ctx context.Context, config *CommandConfig) (*exec.Cmd, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
self, err := os.Executable()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args := []string{
|
||||
"-namespace", ns,
|
||||
"-address", config.Address,
|
||||
"-publish-binary", self,
|
||||
}
|
||||
args = append(args, config.Args...)
|
||||
cmd := exec.CommandContext(ctx, config.Runtime, args...)
|
||||
cmd.Dir = config.Path
|
||||
cmd.Env = append(
|
||||
os.Environ(),
|
||||
"GOMAXPROCS=2",
|
||||
fmt.Sprintf("%s=%s", ttrpcAddressEnv, config.TTRPCAddress),
|
||||
)
|
||||
if config.SchedCore {
|
||||
cmd.Env = append(cmd.Env, "SCHED_CORE=1")
|
||||
}
|
||||
cmd.SysProcAttr = getSysProcAttr()
|
||||
if config.Opts != nil {
|
||||
d, err := proto.Marshal(config.Opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cmd.Stdin = bytes.NewReader(d)
|
||||
}
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
// BinaryName returns the shim binary name from the runtime name,
|
||||
// empty string returns means runtime name is invalid
|
||||
func BinaryName(runtime string) string {
|
||||
// runtime name should format like $prefix.name.version
|
||||
parts := strings.Split(runtime, ".")
|
||||
if len(parts) < 2 {
|
||||
return ""
|
||||
}
|
||||
|
||||
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
|
||||
}
|
||||
|
||||
// BinaryPath returns the full path for the shim binary from the runtime name,
|
||||
// empty string returns means runtime name is invalid
|
||||
func BinaryPath(runtime string) string {
|
||||
dir := filepath.Dir(runtime)
|
||||
binary := BinaryName(runtime)
|
||||
|
||||
path, err := filepath.Abs(filepath.Join(dir, binary))
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
// Connect to the provided address
|
||||
func Connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
|
||||
return d(address, 100*time.Second)
|
||||
}
|
||||
|
||||
// WritePidFile writes a pid file atomically
|
||||
func WritePidFile(path string, pid int) error {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
|
||||
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = fmt.Fprintf(f, "%d", pid)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, path)
|
||||
}
|
||||
|
||||
// WriteAddress writes a address file atomically
|
||||
func WriteAddress(path, address string) error {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
|
||||
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.WriteString(address)
|
||||
f.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, path)
|
||||
}
|
||||
|
||||
// ErrNoAddress is returned when the address file has no content
|
||||
var ErrNoAddress = errors.New("no shim address")
|
||||
|
||||
// ReadAddress returns the shim's socket address from the path
|
||||
func ReadAddress(path string) (string, error) {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return "", ErrNoAddress
|
||||
}
|
||||
return string(data), nil
|
||||
}
|
173
vendor/github.com/containerd/containerd/runtime/v2/shim/util_unix.go
generated
vendored
Normal file
173
vendor/github.com/containerd/containerd/runtime/v2/shim/util_unix.go
generated
vendored
Normal file
|
@ -0,0 +1,173 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/defaults"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/sys"
|
||||
)
|
||||
|
||||
const (
|
||||
shimBinaryFormat = "containerd-shim-%s-%s"
|
||||
socketPathLimit = 106
|
||||
)
|
||||
|
||||
func getSysProcAttr() *syscall.SysProcAttr {
|
||||
return &syscall.SysProcAttr{
|
||||
Setpgid: true,
|
||||
}
|
||||
}
|
||||
|
||||
// AdjustOOMScore sets the OOM score for the process to the parents OOM score +1
|
||||
// to ensure that they parent has a lower* score than the shim
|
||||
// if not already at the maximum OOM Score
|
||||
func AdjustOOMScore(pid int) error {
|
||||
parent := os.Getppid()
|
||||
score, err := sys.GetOOMScoreAdj(parent)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get parent OOM score: %w", err)
|
||||
}
|
||||
shimScore := score + 1
|
||||
if err := sys.AdjustOOMScore(pid, shimScore); err != nil {
|
||||
return fmt.Errorf("set shim OOM score: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const socketRoot = defaults.DefaultStateDir
|
||||
|
||||
// SocketAddress returns a socket address
|
||||
func SocketAddress(ctx context.Context, socketPath, id string) (string, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
d := sha256.Sum256([]byte(filepath.Join(socketPath, ns, id)))
|
||||
return fmt.Sprintf("unix://%s/%x", filepath.Join(socketRoot, "s"), d), nil
|
||||
}
|
||||
|
||||
// AnonDialer returns a dialer for a socket
|
||||
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||
return net.DialTimeout("unix", socket(address).path(), timeout)
|
||||
}
|
||||
|
||||
// AnonReconnectDialer returns a dialer for an existing socket on reconnection
|
||||
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||
return AnonDialer(address, timeout)
|
||||
}
|
||||
|
||||
// NewSocket returns a new socket
|
||||
func NewSocket(address string) (*net.UnixListener, error) {
|
||||
var (
|
||||
sock = socket(address)
|
||||
path = sock.path()
|
||||
)
|
||||
|
||||
isAbstract := sock.isAbstract()
|
||||
|
||||
if !isAbstract {
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0600); err != nil {
|
||||
return nil, fmt.Errorf("%s: %w", path, err)
|
||||
}
|
||||
}
|
||||
l, err := net.Listen("unix", path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !isAbstract {
|
||||
if err := os.Chmod(path, 0600); err != nil {
|
||||
os.Remove(sock.path())
|
||||
l.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return l.(*net.UnixListener), nil
|
||||
}
|
||||
|
||||
const abstractSocketPrefix = "\x00"
|
||||
|
||||
type socket string
|
||||
|
||||
func (s socket) isAbstract() bool {
|
||||
return !strings.HasPrefix(string(s), "unix://")
|
||||
}
|
||||
|
||||
func (s socket) path() string {
|
||||
path := strings.TrimPrefix(string(s), "unix://")
|
||||
// if there was no trim performed, we assume an abstract socket
|
||||
if len(path) == len(s) {
|
||||
path = abstractSocketPrefix + path
|
||||
}
|
||||
return path
|
||||
}
|
||||
|
||||
// RemoveSocket removes the socket at the specified address if
|
||||
// it exists on the filesystem
|
||||
func RemoveSocket(address string) error {
|
||||
sock := socket(address)
|
||||
if !sock.isAbstract() {
|
||||
return os.Remove(sock.path())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SocketEaddrinuse returns true if the provided error is caused by the
|
||||
// EADDRINUSE error number
|
||||
func SocketEaddrinuse(err error) bool {
|
||||
netErr, ok := err.(*net.OpError)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
if netErr.Op != "listen" {
|
||||
return false
|
||||
}
|
||||
syscallErr, ok := netErr.Err.(*os.SyscallError)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
errno, ok := syscallErr.Err.(syscall.Errno)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return errno == syscall.EADDRINUSE
|
||||
}
|
||||
|
||||
// CanConnect returns true if the socket provided at the address
|
||||
// is accepting new connections
|
||||
func CanConnect(address string) bool {
|
||||
conn, err := AnonDialer(address, 100*time.Millisecond)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
conn.Close()
|
||||
return true
|
||||
}
|
87
vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go
generated
vendored
Normal file
87
vendor/github.com/containerd/containerd/runtime/v2/shim/util_windows.go
generated
vendored
Normal file
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package shim
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
winio "github.com/Microsoft/go-winio"
|
||||
)
|
||||
|
||||
const shimBinaryFormat = "containerd-shim-%s-%s.exe"
|
||||
|
||||
func getSysProcAttr() *syscall.SysProcAttr {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AnonReconnectDialer returns a dialer for an existing npipe on containerd reconnection
|
||||
func AnonReconnectDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
c, err := winio.DialPipeContext(ctx, address)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("npipe not found on reconnect: %w", os.ErrNotExist)
|
||||
} else if err == context.DeadlineExceeded {
|
||||
return nil, fmt.Errorf("timed out waiting for npipe %s: %w", address, err)
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// AnonDialer returns a dialer for a npipe
|
||||
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
// If there is nobody serving the pipe we limit the timeout for this case to
|
||||
// 5 seconds because any shim that would serve this endpoint should serve it
|
||||
// within 5 seconds.
|
||||
serveTimer := time.NewTimer(5 * time.Second)
|
||||
defer serveTimer.Stop()
|
||||
for {
|
||||
c, err := winio.DialPipeContext(ctx, address)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
select {
|
||||
case <-serveTimer.C:
|
||||
return nil, fmt.Errorf("pipe not found before timeout: %w", os.ErrNotExist)
|
||||
default:
|
||||
// Wait 10ms for the shim to serve and try again.
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
} else if err == context.DeadlineExceeded {
|
||||
return nil, fmt.Errorf("timed out waiting for npipe %s: %w", address, err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
}
|
||||
|
||||
// RemoveSocket removes the socket at the specified address if
|
||||
// it exists on the filesystem
|
||||
func RemoveSocket(address string) error {
|
||||
return nil
|
||||
}
|
17
vendor/github.com/containerd/containerd/runtime/v2/task/doc.go
generated
vendored
Normal file
17
vendor/github.com/containerd/containerd/runtime/v2/task/doc.go
generated
vendored
Normal file
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package task
|
7312
vendor/github.com/containerd/containerd/runtime/v2/task/shim.pb.go
generated
vendored
Normal file
7312
vendor/github.com/containerd/containerd/runtime/v2/task/shim.pb.go
generated
vendored
Normal file
File diff suppressed because it is too large
Load diff
202
vendor/github.com/containerd/containerd/runtime/v2/task/shim.proto
generated
vendored
Normal file
202
vendor/github.com/containerd/containerd/runtime/v2/task/shim.proto
generated
vendored
Normal file
|
@ -0,0 +1,202 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
syntax = "proto3";
|
||||
|
||||
package containerd.task.v2;
|
||||
|
||||
import "google/protobuf/any.proto";
|
||||
import "google/protobuf/empty.proto";
|
||||
import weak "gogoproto/gogo.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
import "github.com/containerd/containerd/api/types/mount.proto";
|
||||
import "github.com/containerd/containerd/api/types/task/task.proto";
|
||||
|
||||
option go_package = "github.com/containerd/containerd/runtime/v2/task;task";
|
||||
|
||||
// Shim service is launched for each container and is responsible for owning the IO
|
||||
// for the container and its additional processes. The shim is also the parent of
|
||||
// each container and allows reattaching to the IO and receiving the exit status
|
||||
// for the container processes.
|
||||
service Task {
|
||||
rpc State(StateRequest) returns (StateResponse);
|
||||
rpc Create(CreateTaskRequest) returns (CreateTaskResponse);
|
||||
rpc Start(StartRequest) returns (StartResponse);
|
||||
rpc Delete(DeleteRequest) returns (DeleteResponse);
|
||||
rpc Pids(PidsRequest) returns (PidsResponse);
|
||||
rpc Pause(PauseRequest) returns (google.protobuf.Empty);
|
||||
rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
|
||||
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
|
||||
rpc Kill(KillRequest) returns (google.protobuf.Empty);
|
||||
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
|
||||
rpc ResizePty(ResizePtyRequest) returns (google.protobuf.Empty);
|
||||
rpc CloseIO(CloseIORequest) returns (google.protobuf.Empty);
|
||||
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
|
||||
rpc Wait(WaitRequest) returns (WaitResponse);
|
||||
rpc Stats(StatsRequest) returns (StatsResponse);
|
||||
rpc Connect(ConnectRequest) returns (ConnectResponse);
|
||||
rpc Shutdown(ShutdownRequest) returns (google.protobuf.Empty);
|
||||
}
|
||||
|
||||
message CreateTaskRequest {
|
||||
string id = 1;
|
||||
string bundle = 2;
|
||||
repeated containerd.types.Mount rootfs = 3;
|
||||
bool terminal = 4;
|
||||
string stdin = 5;
|
||||
string stdout = 6;
|
||||
string stderr = 7;
|
||||
string checkpoint = 8;
|
||||
string parent_checkpoint = 9;
|
||||
google.protobuf.Any options = 10;
|
||||
}
|
||||
|
||||
message CreateTaskResponse {
|
||||
uint32 pid = 1;
|
||||
}
|
||||
|
||||
message DeleteRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
}
|
||||
|
||||
message DeleteResponse {
|
||||
uint32 pid = 1;
|
||||
uint32 exit_status = 2;
|
||||
google.protobuf.Timestamp exited_at = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
message ExecProcessRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
bool terminal = 3;
|
||||
string stdin = 4;
|
||||
string stdout = 5;
|
||||
string stderr = 6;
|
||||
google.protobuf.Any spec = 7;
|
||||
}
|
||||
|
||||
message ExecProcessResponse {
|
||||
}
|
||||
|
||||
message ResizePtyRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
uint32 width = 3;
|
||||
uint32 height = 4;
|
||||
}
|
||||
|
||||
message StateRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
}
|
||||
|
||||
message StateResponse {
|
||||
string id = 1;
|
||||
string bundle = 2;
|
||||
uint32 pid = 3;
|
||||
containerd.v1.types.Status status = 4;
|
||||
string stdin = 5;
|
||||
string stdout = 6;
|
||||
string stderr = 7;
|
||||
bool terminal = 8;
|
||||
uint32 exit_status = 9;
|
||||
google.protobuf.Timestamp exited_at = 10 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
string exec_id = 11;
|
||||
}
|
||||
|
||||
message KillRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
uint32 signal = 3;
|
||||
bool all = 4;
|
||||
}
|
||||
|
||||
message CloseIORequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
bool stdin = 3;
|
||||
}
|
||||
|
||||
message PidsRequest {
|
||||
string id = 1;
|
||||
}
|
||||
|
||||
message PidsResponse {
|
||||
repeated containerd.v1.types.ProcessInfo processes = 1;
|
||||
}
|
||||
|
||||
message CheckpointTaskRequest {
|
||||
string id = 1;
|
||||
string path = 2;
|
||||
google.protobuf.Any options = 3;
|
||||
}
|
||||
|
||||
message UpdateTaskRequest {
|
||||
string id = 1;
|
||||
google.protobuf.Any resources = 2;
|
||||
map<string, string> annotations = 3;
|
||||
}
|
||||
|
||||
message StartRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
}
|
||||
|
||||
message StartResponse {
|
||||
uint32 pid = 1;
|
||||
}
|
||||
|
||||
message WaitRequest {
|
||||
string id = 1;
|
||||
string exec_id = 2;
|
||||
}
|
||||
|
||||
message WaitResponse {
|
||||
uint32 exit_status = 1;
|
||||
google.protobuf.Timestamp exited_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
|
||||
}
|
||||
|
||||
message StatsRequest {
|
||||
string id = 1;
|
||||
}
|
||||
|
||||
message StatsResponse {
|
||||
google.protobuf.Any stats = 1;
|
||||
}
|
||||
|
||||
message ConnectRequest {
|
||||
string id = 1;
|
||||
}
|
||||
|
||||
message ConnectResponse {
|
||||
uint32 shim_pid = 1;
|
||||
uint32 task_pid = 2;
|
||||
string version = 3;
|
||||
}
|
||||
|
||||
message ShutdownRequest {
|
||||
string id = 1;
|
||||
bool now = 2;
|
||||
}
|
||||
|
||||
message PauseRequest {
|
||||
string id = 1;
|
||||
}
|
||||
|
||||
message ResumeRequest {
|
||||
string id = 1;
|
||||
}
|
283
vendor/github.com/containerd/containerd/sys/reaper/reaper_unix.go
generated
vendored
Normal file
283
vendor/github.com/containerd/containerd/sys/reaper/reaper_unix.go
generated
vendored
Normal file
|
@ -0,0 +1,283 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package reaper
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
runc "github.com/containerd/go-runc"
|
||||
exec "golang.org/x/sys/execabs"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// ErrNoSuchProcess is returned when the process no longer exists
|
||||
var ErrNoSuchProcess = errors.New("no such process")
|
||||
|
||||
const bufferSize = 32
|
||||
|
||||
type subscriber struct {
|
||||
sync.Mutex
|
||||
c chan runc.Exit
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (s *subscriber) close() {
|
||||
s.Lock()
|
||||
if s.closed {
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
close(s.c)
|
||||
s.closed = true
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *subscriber) do(fn func()) {
|
||||
s.Lock()
|
||||
fn()
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
// Reap should be called when the process receives an SIGCHLD. Reap will reap
|
||||
// all exited processes and close their wait channels
|
||||
func Reap() error {
|
||||
now := time.Now()
|
||||
exits, err := reap(false)
|
||||
for _, e := range exits {
|
||||
done := Default.notify(runc.Exit{
|
||||
Timestamp: now,
|
||||
Pid: e.Pid,
|
||||
Status: e.Status,
|
||||
})
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Default is the default monitor initialized for the package
|
||||
var Default = &Monitor{
|
||||
subscribers: make(map[chan runc.Exit]*subscriber),
|
||||
}
|
||||
|
||||
// Monitor monitors the underlying system for process status changes
|
||||
type Monitor struct {
|
||||
sync.Mutex
|
||||
|
||||
subscribers map[chan runc.Exit]*subscriber
|
||||
}
|
||||
|
||||
// Start starts the command a registers the process with the reaper
|
||||
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
|
||||
ec := m.Subscribe()
|
||||
if err := c.Start(); err != nil {
|
||||
m.Unsubscribe(ec)
|
||||
return nil, err
|
||||
}
|
||||
return ec, nil
|
||||
}
|
||||
|
||||
// Wait blocks until a process is signal as dead.
|
||||
// User should rely on the value of the exit status to determine if the
|
||||
// command was successful or not.
|
||||
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
|
||||
for e := range ec {
|
||||
if e.Pid == c.Process.Pid {
|
||||
// make sure we flush all IO
|
||||
c.Wait()
|
||||
m.Unsubscribe(ec)
|
||||
return e.Status, nil
|
||||
}
|
||||
}
|
||||
// return no such process if the ec channel is closed and no more exit
|
||||
// events will be sent
|
||||
return -1, ErrNoSuchProcess
|
||||
}
|
||||
|
||||
// WaitTimeout is used to skip the blocked command and kill the left process.
|
||||
func (m *Monitor) WaitTimeout(c *exec.Cmd, ec chan runc.Exit, timeout time.Duration) (int, error) {
|
||||
type exitStatusWrapper struct {
|
||||
status int
|
||||
err error
|
||||
}
|
||||
|
||||
// capacity can make sure that the following goroutine will not be
|
||||
// blocked if there is no receiver when timeout.
|
||||
waitCh := make(chan *exitStatusWrapper, 1)
|
||||
go func() {
|
||||
defer close(waitCh)
|
||||
|
||||
status, err := m.Wait(c, ec)
|
||||
waitCh <- &exitStatusWrapper{
|
||||
status: status,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-timer.C:
|
||||
syscall.Kill(c.Process.Pid, syscall.SIGKILL)
|
||||
return 0, fmt.Errorf("timeout %v for cmd(pid=%d): %s, %s", timeout, c.Process.Pid, c.Path, c.Args)
|
||||
case res := <-waitCh:
|
||||
return res.status, res.err
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to process exit changes
|
||||
func (m *Monitor) Subscribe() chan runc.Exit {
|
||||
c := make(chan runc.Exit, bufferSize)
|
||||
m.Lock()
|
||||
m.subscribers[c] = &subscriber{
|
||||
c: c,
|
||||
}
|
||||
m.Unlock()
|
||||
return c
|
||||
}
|
||||
|
||||
// Unsubscribe to process exit changes
|
||||
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
|
||||
m.Lock()
|
||||
s, ok := m.subscribers[c]
|
||||
if !ok {
|
||||
m.Unlock()
|
||||
return
|
||||
}
|
||||
s.close()
|
||||
delete(m.subscribers, c)
|
||||
m.Unlock()
|
||||
}
|
||||
|
||||
func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber {
|
||||
out := make(map[chan runc.Exit]*subscriber)
|
||||
m.Lock()
|
||||
for k, v := range m.subscribers {
|
||||
out[k] = v
|
||||
}
|
||||
m.Unlock()
|
||||
return out
|
||||
}
|
||||
|
||||
func (m *Monitor) notify(e runc.Exit) chan struct{} {
|
||||
const timeout = 1 * time.Millisecond
|
||||
var (
|
||||
done = make(chan struct{}, 1)
|
||||
timer = time.NewTimer(timeout)
|
||||
success = make(map[chan runc.Exit]struct{})
|
||||
)
|
||||
stop(timer, true)
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
|
||||
for {
|
||||
var (
|
||||
failed int
|
||||
subscribers = m.getSubscribers()
|
||||
)
|
||||
for _, s := range subscribers {
|
||||
s.do(func() {
|
||||
if s.closed {
|
||||
return
|
||||
}
|
||||
if _, ok := success[s.c]; ok {
|
||||
return
|
||||
}
|
||||
timer.Reset(timeout)
|
||||
recv := true
|
||||
select {
|
||||
case s.c <- e:
|
||||
success[s.c] = struct{}{}
|
||||
case <-timer.C:
|
||||
recv = false
|
||||
failed++
|
||||
}
|
||||
stop(timer, recv)
|
||||
})
|
||||
}
|
||||
// all subscribers received the message
|
||||
if failed == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return done
|
||||
}
|
||||
|
||||
func stop(timer *time.Timer, recv bool) {
|
||||
if !timer.Stop() && recv {
|
||||
<-timer.C
|
||||
}
|
||||
}
|
||||
|
||||
// exit is the wait4 information from an exited process
|
||||
type exit struct {
|
||||
Pid int
|
||||
Status int
|
||||
}
|
||||
|
||||
// reap reaps all child processes for the calling process and returns their
|
||||
// exit information
|
||||
func reap(wait bool) (exits []exit, err error) {
|
||||
var (
|
||||
ws unix.WaitStatus
|
||||
rus unix.Rusage
|
||||
)
|
||||
flag := unix.WNOHANG
|
||||
if wait {
|
||||
flag = 0
|
||||
}
|
||||
for {
|
||||
pid, err := unix.Wait4(-1, &ws, flag, &rus)
|
||||
if err != nil {
|
||||
if err == unix.ECHILD {
|
||||
return exits, nil
|
||||
}
|
||||
return exits, err
|
||||
}
|
||||
if pid <= 0 {
|
||||
return exits, nil
|
||||
}
|
||||
exits = append(exits, exit{
|
||||
Pid: pid,
|
||||
Status: exitStatus(ws),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const exitSignalOffset = 128
|
||||
|
||||
// exitStatus returns the correct exit status for a process based on if it
|
||||
// was signaled or exited cleanly
|
||||
func exitStatus(status unix.WaitStatus) int {
|
||||
if status.Signaled() {
|
||||
return exitSignalOffset + int(status.Signal())
|
||||
}
|
||||
return status.ExitStatus()
|
||||
}
|
39
vendor/github.com/containerd/containerd/sys/reaper/reaper_utils_linux.go
generated
vendored
Normal file
39
vendor/github.com/containerd/containerd/sys/reaper/reaper_utils_linux.go
generated
vendored
Normal file
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package reaper
|
||||
|
||||
import (
|
||||
"unsafe"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
// SetSubreaper sets the value i as the subreaper setting for the calling process
|
||||
func SetSubreaper(i int) error {
|
||||
return unix.Prctl(unix.PR_SET_CHILD_SUBREAPER, uintptr(i), 0, 0, 0)
|
||||
}
|
||||
|
||||
// GetSubreaper returns the subreaper setting for the calling process
|
||||
func GetSubreaper() (int, error) {
|
||||
var i uintptr
|
||||
|
||||
if err := unix.Prctl(unix.PR_GET_CHILD_SUBREAPER, uintptr(unsafe.Pointer(&i)), 0, 0, 0); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return int(i), nil
|
||||
}
|
6
vendor/modules.txt
vendored
6
vendor/modules.txt
vendored
|
@ -167,6 +167,7 @@ github.com/containerd/containerd/api/services/leases/v1
|
|||
github.com/containerd/containerd/api/services/namespaces/v1
|
||||
github.com/containerd/containerd/api/services/snapshots/v1
|
||||
github.com/containerd/containerd/api/services/tasks/v1
|
||||
github.com/containerd/containerd/api/services/ttrpc/events/v1
|
||||
github.com/containerd/containerd/api/services/version/v1
|
||||
github.com/containerd/containerd/api/types
|
||||
github.com/containerd/containerd/api/types/task
|
||||
|
@ -204,6 +205,8 @@ github.com/containerd/containerd/pkg/cap
|
|||
github.com/containerd/containerd/pkg/dialer
|
||||
github.com/containerd/containerd/pkg/kmutex
|
||||
github.com/containerd/containerd/pkg/seccomp
|
||||
github.com/containerd/containerd/pkg/shutdown
|
||||
github.com/containerd/containerd/pkg/ttrpcutil
|
||||
github.com/containerd/containerd/pkg/userns
|
||||
github.com/containerd/containerd/platforms
|
||||
github.com/containerd/containerd/plugin
|
||||
|
@ -217,6 +220,8 @@ github.com/containerd/containerd/remotes/errors
|
|||
github.com/containerd/containerd/rootfs
|
||||
github.com/containerd/containerd/runtime/linux/runctypes
|
||||
github.com/containerd/containerd/runtime/v2/runc/options
|
||||
github.com/containerd/containerd/runtime/v2/shim
|
||||
github.com/containerd/containerd/runtime/v2/task
|
||||
github.com/containerd/containerd/services
|
||||
github.com/containerd/containerd/services/content/contentserver
|
||||
github.com/containerd/containerd/services/introspection
|
||||
|
@ -224,6 +229,7 @@ github.com/containerd/containerd/services/server/config
|
|||
github.com/containerd/containerd/snapshots
|
||||
github.com/containerd/containerd/snapshots/proxy
|
||||
github.com/containerd/containerd/sys
|
||||
github.com/containerd/containerd/sys/reaper
|
||||
github.com/containerd/containerd/version
|
||||
# github.com/containerd/continuity v0.3.0
|
||||
## explicit; go 1.17
|
||||
|
|
Loading…
Reference in a new issue