Update containerd to v1 beta3

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2017-11-10 15:44:10 -05:00
parent 97be2a0752
commit d3f934e304
70 changed files with 2925 additions and 559 deletions

View file

@ -4,7 +4,7 @@ TOMLV_COMMIT=9baf8a8a9f2ed20a8e54160840c492f937eeaf9a
# When updating RUNC_COMMIT, also update runc in vendor.conf accordingly
RUNC_COMMIT=0351df1c5a66838d0c392b4ac4cf9450de844e2d
CONTAINERD_COMMIT=992280e8e265f491f7a624ab82f3e238be086e49
CONTAINERD_COMMIT=v1.0.0-beta.3
TINI_COMMIT=949e6facb77383876aeff8a6944dde66b3089574
LIBNETWORK_COMMIT=7b2b1feb1de4817d522cc372af149ff48d25028e
VNDR_COMMIT=a6e196d8b4b0cbbdc29aebdb20c59ac6926bb384

View file

@ -31,8 +31,8 @@ type rpcUser struct {
func (u rpcUser) Apply(r Remote) error {
if remote, ok := r.(*remote); ok {
remote.GRPC.Uid = u.uid
remote.GRPC.Gid = u.gid
remote.GRPC.UID = u.uid
remote.GRPC.GID = u.gid
return nil
}
return fmt.Errorf("WithRemoteAddr option not supported for this remote")

View file

@ -103,7 +103,7 @@ github.com/googleapis/gax-go da06d194a00e19ce00d9011a13931c3f6f6887c7
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
# containerd
github.com/containerd/containerd 992280e8e265f491f7a624ab82f3e238be086e49
github.com/containerd/containerd v1.0.0-beta.3
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/containerd/continuity 35d55c5e8dd23b32037d56cf97174aff3efdfa83
github.com/containerd/cgroups f7dd103d3e4e696aa67152f6b4ddd1779a3455a9

View file

@ -1,4 +1,4 @@
Attribution-ShareAlike 4.0 International
Attribution 4.0 International
=======================================================================
@ -54,18 +54,16 @@ exhaustive, and do not form part of our licenses.
=======================================================================
Creative Commons Attribution-ShareAlike 4.0 International Public
License
Creative Commons Attribution 4.0 International Public License
By exercising the Licensed Rights (defined below), You accept and agree
to be bound by the terms and conditions of this Creative Commons
Attribution-ShareAlike 4.0 International Public License ("Public
License"). To the extent this Public License may be interpreted as a
contract, You are granted the Licensed Rights in consideration of Your
acceptance of these terms and conditions, and the Licensor grants You
such rights in consideration of benefits the Licensor receives from
making the Licensed Material available under these terms and
conditions.
Attribution 4.0 International Public License ("Public License"). To the
extent this Public License may be interpreted as a contract, You are
granted the Licensed Rights in consideration of Your acceptance of
these terms and conditions, and the Licensor grants You such rights in
consideration of benefits the Licensor receives from making the
Licensed Material available under these terms and conditions.
Section 1 -- Definitions.
@ -84,11 +82,7 @@ Section 1 -- Definitions.
and Similar Rights in Your contributions to Adapted Material in
accordance with the terms and conditions of this Public License.
c. BY-SA Compatible License means a license listed at
creativecommons.org/compatiblelicenses, approved by Creative
Commons as essentially the equivalent of this Public License.
d. Copyright and Similar Rights means copyright and/or similar rights
c. Copyright and Similar Rights means copyright and/or similar rights
closely related to copyright including, without limitation,
performance, broadcast, sound recording, and Sui Generis Database
Rights, without regard to how the rights are labeled or
@ -96,33 +90,29 @@ Section 1 -- Definitions.
specified in Section 2(b)(1)-(2) are not Copyright and Similar
Rights.
e. Effective Technological Measures means those measures that, in the
d. Effective Technological Measures means those measures that, in the
absence of proper authority, may not be circumvented under laws
fulfilling obligations under Article 11 of the WIPO Copyright
Treaty adopted on December 20, 1996, and/or similar international
agreements.
f. Exceptions and Limitations means fair use, fair dealing, and/or
e. Exceptions and Limitations means fair use, fair dealing, and/or
any other exception or limitation to Copyright and Similar Rights
that applies to Your use of the Licensed Material.
g. License Elements means the license attributes listed in the name
of a Creative Commons Public License. The License Elements of this
Public License are Attribution and ShareAlike.
h. Licensed Material means the artistic or literary work, database,
f. Licensed Material means the artistic or literary work, database,
or other material to which the Licensor applied this Public
License.
i. Licensed Rights means the rights granted to You subject to the
g. Licensed Rights means the rights granted to You subject to the
terms and conditions of this Public License, which are limited to
all Copyright and Similar Rights that apply to Your use of the
Licensed Material and that the Licensor has authority to license.
j. Licensor means the individual(s) or entity(ies) granting rights
h. Licensor means the individual(s) or entity(ies) granting rights
under this Public License.
k. Share means to provide material to the public by any means or
i. Share means to provide material to the public by any means or
process that requires permission under the Licensed Rights, such
as reproduction, public display, public performance, distribution,
dissemination, communication, or importation, and to make material
@ -130,13 +120,13 @@ Section 1 -- Definitions.
public may access the material from a place and at a time
individually chosen by them.
l. Sui Generis Database Rights means rights other than copyright
j. Sui Generis Database Rights means rights other than copyright
resulting from Directive 96/9/EC of the European Parliament and of
the Council of 11 March 1996 on the legal protection of databases,
as amended and/or succeeded, as well as other essentially
equivalent rights anywhere in the world.
m. You means the individual or entity exercising the Licensed Rights
k. You means the individual or entity exercising the Licensed Rights
under this Public License. Your has a corresponding meaning.
@ -182,13 +172,7 @@ Section 2 -- Scope.
Licensed Rights under the terms and conditions of this
Public License.
b. Additional offer from the Licensor -- Adapted Material.
Every recipient of Adapted Material from You
automatically receives an offer from the Licensor to
exercise the Licensed Rights in the Adapted Material
under the conditions of the Adapter's License You apply.
c. No downstream restrictions. You may not offer or impose
b. No downstream restrictions. You may not offer or impose
any additional or different terms or conditions on, or
apply any Effective Technological Measures to, the
Licensed Material if doing so restricts exercise of the
@ -270,24 +254,9 @@ following conditions.
information required by Section 3(a)(1)(A) to the extent
reasonably practicable.
b. ShareAlike.
In addition to the conditions in Section 3(a), if You Share
Adapted Material You produce, the following conditions also apply.
1. The Adapter's License You apply must be a Creative Commons
license with the same License Elements, this version or
later, or a BY-SA Compatible License.
2. You must include the text of, or the URI or hyperlink to, the
Adapter's License You apply. You may satisfy this condition
in any reasonable manner based on the medium, means, and
context in which You Share Adapted Material.
3. You may not offer or impose any additional or different terms
or conditions on, or apply any Effective Technological
Measures to, Adapted Material that restrict exercise of the
rights granted under the Adapter's License You apply.
4. If You Share Adapted Material You produce, the Adapter's
License You apply must not prevent recipients of the Adapted
Material from complying with this Public License.
Section 4 -- Sui Generis Database Rights.
@ -302,9 +271,8 @@ apply to Your use of the Licensed Material:
b. if You include all or a substantial portion of the database
contents in a database in which You have Sui Generis Database
Rights, then the database in which You have Sui Generis Database
Rights (but not its individual contents) is Adapted Material,
Rights (but not its individual contents) is Adapted Material; and
including for purposes of Section 3(b); and
c. You must comply with the conditions in Section 3(a) if You Share
all or a substantial portion of the contents of the database.
@ -407,11 +375,13 @@ Section 8 -- Interpretation.
=======================================================================
Creative Commons is not a party to its public licenses.
Notwithstanding, Creative Commons may elect to apply one of its public
licenses to material it publishes and in those instances will be
considered the "Licensor." Except for the limited purpose of indicating
that material is shared under a Creative Commons public license or as
Creative Commons is not a party to its public
licenses. Notwithstanding, Creative Commons may elect to apply one of
its public licenses to material it publishes and in those instances
will be considered the “Licensor.” The text of the Creative Commons
public licenses is dedicated to the public domain under the CC0 Public
Domain Dedication. Except for the limited purpose of indicating that
material is shared under a Creative Commons public license or as
otherwise permitted by the Creative Commons policies published at
creativecommons.org/policies, Creative Commons does not authorize the
use of the trademark "Creative Commons" or any other trademark or logo
@ -419,7 +389,7 @@ of Creative Commons without its prior written consent including,
without limitation, in connection with any unauthorized modifications
to any of its public licenses or any other arrangements,
understandings, or agreements concerning use of licensed material. For
the avoidance of doubt, this paragraph does not form part of the public
licenses.
the avoidance of doubt, this paragraph does not form part of the
public licenses.
Creative Commons may be contacted at creativecommons.org.

View file

@ -198,11 +198,10 @@ For sync communication we have a community slack with a #containerd channel that
__If you are reporting a security issue, please reach out discreetly at security@containerd.io__.
## Copyright and license
## Licenses
Copyright ©2016-2017 Docker, Inc. All rights reserved, except as follows. Code
is released under the Apache 2.0 license. The README.md file, and files in the
"docs" folder are licensed under the Creative Commons Attribution 4.0
International License under the terms and conditions set forth in the file
"LICENSE.docs". You may obtain a duplicate copy of the same license, titled
CC-BY-SA-4.0, at http://creativecommons.org/licenses/by/4.0/.
The containerd codebase is released under the [Apache 2.0 license](LICENSE.code).
The README.md file, and files in the "docs" folder are licensed under the
Creative Commons Attribution 4.0 International License under the terms and
conditions set forth in the file "[LICENSE.docs](LICENSE.docs)". You may obtain a duplicate
copy of the same license, titled CC-BY-4.0, at http://creativecommons.org/licenses/by/4.0/.

View file

@ -0,0 +1 @@
package leases

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,58 @@
syntax = "proto3";
package containerd.services.leases.v1;
import "gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/leases/v1;leases";
// Leases service manages resources leases within the metadata store.
service Leases {
// Create creates a new lease for managing changes to metadata. A lease
// can be used to protect objects from being removed.
rpc Create(CreateRequest) returns (CreateResponse);
// Delete deletes the lease and makes any unreferenced objects created
// during the lease eligible for garbage collection if not referenced
// or retained by other resources during the lease.
rpc Delete(DeleteRequest) returns (google.protobuf.Empty);
// ListTransactions lists all active leases, returning the full list of
// leases and optionally including the referenced resources.
rpc List(ListRequest) returns (ListResponse);
}
// Lease is an object which retains resources while it exists.
message Lease {
string id = 1;
google.protobuf.Timestamp created_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
map<string, string> labels = 3;
}
message CreateRequest {
// ID is used to identity the lease, when the id is not set the service
// generates a random identifier for the lease.
string id = 1;
map<string, string> labels = 3;
}
message CreateResponse {
Lease lease = 1;
}
message DeleteRequest {
string id = 1;
}
message ListRequest {
repeated string filters = 1;
}
message ListResponse {
repeated Lease leases = 1;
}

View file

@ -22,9 +22,11 @@ import (
versionservice "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/dialer"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/reference"
@ -34,6 +36,7 @@ import (
contentservice "github.com/containerd/containerd/services/content"
diffservice "github.com/containerd/containerd/services/diff"
imagesservice "github.com/containerd/containerd/services/images"
namespacesservice "github.com/containerd/containerd/services/namespaces"
snapshotservice "github.com/containerd/containerd/services/snapshot"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/typeurl"
@ -70,7 +73,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
grpc.WithTimeout(60 * time.Second),
grpc.FailOnNonTempDialError(true),
grpc.WithBackoffMaxDelay(3 * time.Second),
grpc.WithDialer(Dialer),
grpc.WithDialer(dialer.Dialer),
}
if len(copts.dialOptions) > 0 {
gopts = copts.dialOptions
@ -82,7 +85,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
grpc.WithStreamInterceptor(stream),
)
}
conn, err := grpc.Dial(DialAddress(address), gopts...)
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
@ -135,6 +138,12 @@ func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container
// NewContainer will create a new container in container with the provided id
// the id must be unique within the namespace
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
container := containers.Container{
ID: id,
Runtime: containers.RuntimeInfo{
@ -210,6 +219,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
}
store := c.ContentStore()
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
name, desc, err := pullCtx.Resolver.Resolve(ctx, ref)
if err != nil {
return nil, err
@ -228,7 +243,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
handler = images.Handlers(append(pullCtx.BaseHandlers, schema1Converter)...)
} else {
handler = images.Handlers(append(pullCtx.BaseHandlers,
remotes.FetchHandler(store, fetcher, desc),
remotes.FetchHandler(store, fetcher),
images.ChildrenHandler(store, platforms.Default()))...,
)
}
@ -265,11 +280,6 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
imgrec = created
}
// Remove root tag from manifest now that image refers to it
if _, err := store.Update(ctx, content.Info{Digest: desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
return nil, errors.Wrap(err, "failed to remove manifest root tag")
}
img := &image{
client: c,
i: imgrec,
@ -414,9 +424,9 @@ func (c *Client) Close() error {
return c.conn.Close()
}
// NamespaceService returns the underlying NamespacesClient
func (c *Client) NamespaceService() namespacesapi.NamespacesClient {
return namespacesapi.NewNamespacesClient(c.conn)
// NamespaceService returns the underlying Namespaces Store
func (c *Client) NamespaceService() namespaces.Store {
return namespacesservice.NewStoreFromClient(namespacesapi.NewNamespacesClient(c.conn))
}
// ContainerService returns the underlying container Store
@ -449,6 +459,7 @@ func (c *Client) DiffService() diff.Differ {
return diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
}
// IntrospectionService returns the underlying Introspection Client
func (c *Client) IntrospectionService() introspectionapi.IntrospectionClient {
return introspectionapi.NewIntrospectionClient(c.conn)
}
@ -580,6 +591,13 @@ func (c *Client) Import(ctx context.Context, ref string, reader io.Reader, opts
if err != nil {
return nil, err
}
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
switch iopts.format {
case ociImageFormat:
return c.importFromOCITar(ctx, ref, reader, iopts)

View file

@ -2,12 +2,10 @@ package containerd
import (
"context"
"time"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
"github.com/opencontainers/image-spec/identity"
@ -93,11 +91,8 @@ func WithNewSnapshot(id string, i Image) NewContainerOpts {
return err
}
setSnapshotterIfEmpty(c)
labels := map[string]string{
"containerd.io/gc.root": time.Now().String(),
}
parent := identity.ChainID(diffIDs).String()
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent); err != nil {
return err
}
c.SnapshotKey = id
@ -126,11 +121,8 @@ func WithNewSnapshotView(id string, i Image) NewContainerOpts {
return err
}
setSnapshotterIfEmpty(c)
labels := map[string]string{
"containerd.io/gc.root": time.Now().String(),
}
parent := identity.ChainID(diffIDs).String()
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent); err != nil {
return err
}
c.SnapshotKey = id

View file

@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
@ -27,6 +28,19 @@ var (
}
)
// LabelStore is used to store mutable labels for digests
type LabelStore interface {
// Get returns all the labels for the given digest
Get(digest.Digest) (map[string]string, error)
// Set sets all the labels for a given digest
Set(digest.Digest, map[string]string) error
// Update replaces the given labels for a digest,
// a key with an empty value removes a label.
Update(digest.Digest, map[string]string) (map[string]string, error)
}
// Store is digest-keyed store for content. All data written into the store is
// stored under a verifiable digest.
//
@ -34,16 +48,27 @@ var (
// including resumable ingest.
type store struct {
root string
ls LabelStore
}
// NewStore returns a local content store
func NewStore(root string) (content.Store, error) {
return NewLabeledStore(root, nil)
}
// NewLabeledStore returns a new content store using the provided label store
//
// Note: content stores which are used underneath a metadata store may not
// require labels and should use `NewStore`. `NewLabeledStore` is primarily
// useful for tests or standalone implementations.
func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
return nil, err
}
return &store{
root: root,
ls: ls,
}, nil
}
@ -57,16 +82,23 @@ func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, err
return content.Info{}, err
}
return s.info(dgst, fi), nil
var labels map[string]string
if s.ls != nil {
labels, err = s.ls.Get(dgst)
if err != nil {
return content.Info{}, err
}
}
return s.info(dgst, fi, labels), nil
}
func (s *store) info(dgst digest.Digest, fi os.FileInfo) content.Info {
func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info {
return content.Info{
Digest: dgst,
Size: fi.Size(),
CreatedAt: fi.ModTime(),
UpdatedAt: fi.ModTime(),
UpdatedAt: getATime(fi),
Labels: labels,
}
}
@ -111,8 +143,66 @@ func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
}
func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
// TODO: Support persisting and updating mutable content data
return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
if s.ls == nil {
return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
}
p := s.blobPath(info.Digest)
fi, err := os.Stat(p)
if err != nil {
if os.IsNotExist(err) {
err = errors.Wrapf(errdefs.ErrNotFound, "content %v", info.Digest)
}
return content.Info{}, err
}
var (
all bool
labels map[string]string
)
if len(fieldpaths) > 0 {
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if labels == nil {
labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
labels[key] = info.Labels[key]
continue
}
switch path {
case "labels":
all = true
labels = info.Labels
default:
return content.Info{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
}
}
} else {
all = true
labels = info.Labels
}
if all {
err = s.ls.Set(info.Digest, labels)
} else {
labels, err = s.ls.Update(info.Digest, labels)
}
if err != nil {
return content.Info{}, err
}
info = s.info(info.Digest, fi, labels)
info.UpdatedAt = time.Now()
if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil {
log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest)
}
return info, nil
}
func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
@ -154,7 +244,14 @@ func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string
// store or extra paths not expected previously.
}
return fn(s.info(dgst, fi))
var labels map[string]string
if s.ls != nil {
labels, err = s.ls.Get(dgst)
if err != nil {
return err
}
}
return fn(s.info(dgst, fi, labels))
})
}

View file

@ -18,3 +18,12 @@ func getStartTime(fi os.FileInfo) time.Time {
return fi.ModTime()
}
func getATime(fi os.FileInfo) time.Time {
if st, ok := fi.Sys().(*syscall.Stat_t); ok {
return time.Unix(int64(sys.StatAtime(st).Sec),
int64(sys.StatAtime(st).Nsec))
}
return fi.ModTime()
}

View file

@ -8,3 +8,7 @@ import (
func getStartTime(fi os.FileInfo) time.Time {
return fi.ModTime()
}
func getATime(fi os.FileInfo) time.Time {
return fi.ModTime()
}

View file

@ -56,6 +56,13 @@ func (w *writer) Write(p []byte) (n int, err error) {
}
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
}
}
if w.fp == nil {
return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot commit on closed writer")
}
@ -123,6 +130,12 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest,
w.fp = nil
unlock(w.ref)
if w.s.ls != nil && base.Labels != nil {
if err := w.s.ls.Set(dgst, base.Labels); err != nil {
return err
}
}
return nil
}

View file

@ -1,4 +1,4 @@
package containerd
package dialer
import (
"net"

View file

@ -1,6 +1,6 @@
// +build !windows
package containerd
package dialer
import (
"fmt"
@ -11,6 +11,12 @@ import (
"time"
)
// DialAddress returns the address with unix:// prepended to the
// provided address
func DialAddress(address string) string {
return fmt.Sprintf("unix://%s", address)
}
func isNoent(err error) bool {
if err != nil {
if nerr, ok := err.(*net.OpError); ok {
@ -28,9 +34,3 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}
// DialAddress returns the address with unix:// prepended to the
// provided address
func DialAddress(address string) string {
return fmt.Sprintf("unix://%s", address)
}

View file

@ -1,4 +1,4 @@
package containerd
package dialer
import (
"net"
@ -24,6 +24,7 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) {
return winio.DialPipe(address, &timeout)
}
// DialAddress returns the dial address
func DialAddress(address string) string {
return address
}

View file

@ -1,12 +1,13 @@
package events
package exchange
import (
"context"
"strings"
"time"
events "github.com/containerd/containerd/api/services/events/v1"
v1 "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
@ -34,7 +35,7 @@ func NewExchange() *Exchange {
//
// This is useful when an event is forwaded on behalf of another namespace or
// when the event is propagated on behalf of another publisher.
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) {
if err := validateEnvelope(envelope); err != nil {
return err
}
@ -59,11 +60,11 @@ func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err
// Publish packages and sends an event. The caller will be considered the
// initial publisher of the event. This means the timestamp will be calculated
// at this point and this method may read from the calling context.
func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) {
func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
var (
namespace string
encoded *types.Any
envelope events.Envelope
envelope v1.Envelope
)
namespace, err = namespaces.NamespaceRequired(ctx)
@ -108,9 +109,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err
// Zero or more filters may be provided as strings. Only events that match
// *any* of the provided filters will be sent on the channel. The filters use
// the standard containerd filters package syntax.
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) {
var (
evch = make(chan *events.Envelope)
evch = make(chan *v1.Envelope)
errq = make(chan error, 1)
channel = goevents.NewChannel(0)
queue = goevents.NewQueue(channel)
@ -150,7 +151,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even
for {
select {
case ev := <-channel.C:
env, ok := ev.(*events.Envelope)
env, ok := ev.(*v1.Envelope)
if !ok {
// TODO(stevvooe): For the most part, we are well protected
// from this condition. Both Forward and Publish protect
@ -204,7 +205,7 @@ func validateTopic(topic string) error {
return nil
}
func validateEnvelope(envelope *events.Envelope) error {
func validateEnvelope(envelope *v1.Envelope) error {
if err := namespaces.Validate(envelope.Namespace); err != nil {
return errors.Wrapf(err, "event envelope has invalid namespace")
}

View file

@ -10,7 +10,7 @@ import (
"sync"
)
// Resourcetype represents type of resource at a node
// ResourceType represents type of resource at a node
type ResourceType uint8
// Node presents a resource which has a type and key,
@ -145,10 +145,10 @@ func ConcurrentMark(ctx context.Context, root <-chan Node, refs func(context.Con
// Sweep removes all nodes returned through the channel which are not in
// the reachable set by calling the provided remove function.
func Sweep(reachable map[Node]struct{}, all <-chan Node, remove func(Node) error) error {
func Sweep(reachable map[Node]struct{}, all []Node, remove func(Node) error) error {
// All black objects are now reachable, and all white objects are
// unreachable. Free those that are white!
for node := range all {
for _, node := range all {
if _, ok := reachable[node]; !ok {
if err := remove(node); err != nil {
return err

View file

@ -3,9 +3,9 @@ package containerd
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/rootfs"
@ -30,6 +30,8 @@ type Image interface {
Size(ctx context.Context) (int64, error)
// Config descriptor for the image.
Config(ctx context.Context) (ocispec.Descriptor, error)
// IsUnpacked returns whether or not an image is unpacked.
IsUnpacked(context.Context, string) (bool, error)
}
var _ = (Image)(&image{})
@ -63,6 +65,26 @@ func (i *image) Config(ctx context.Context) (ocispec.Descriptor, error) {
return i.i.Config(ctx, provider, platforms.Default())
}
func (i *image) IsUnpacked(ctx context.Context, snapshotterName string) (bool, error) {
sn := i.client.SnapshotService(snapshotterName)
cs := i.client.ContentStore()
diffs, err := i.i.RootFS(ctx, cs, platforms.Default())
if err != nil {
return false, err
}
chainID := identity.ChainID(diffs)
_, err = sn.Stat(ctx, chainID.String())
if err == nil {
return true, nil
} else if !errdefs.IsNotFound(err) {
return false, err
}
return false, nil
}
func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
layers, err := i.getLayers(ctx, platforms.Default())
if err != nil {
@ -79,27 +101,14 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
)
for _, layer := range layers {
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
"containerd.io/uncompressed": layer.Diff.Digest.String(),
}
lastUnpacked := unpacked
unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshot.WithLabels(labels))
if err != nil {
return err
}
if lastUnpacked {
info := snapshot.Info{
Name: identity.ChainID(chain).String(),
}
// Remove previously created gc.root label
if _, err := sn.Update(ctx, info, "labels.containerd.io/gc.root"); err != nil {
return err
}
}
chain = append(chain, layer.Diff.Digest)
}
@ -120,15 +129,6 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
if _, err := cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", snapshotterName)); err != nil {
return err
}
sinfo := snapshot.Info{
Name: rootfs,
}
// Config now referenced snapshot, release root reference
if _, err := sn.Update(ctx, sinfo, "labels.containerd.io/gc.root"); err != nil {
return err
}
}
return nil

View file

@ -16,7 +16,7 @@ import (
// NewFifos returns a new set of fifos for the task
func NewFifos(id string) (*FIFOSet, error) {
root := filepath.Join(os.TempDir(), "containerd")
root := "/run/containerd/fifo"
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
}

91
vendor/github.com/containerd/containerd/lease.go generated vendored Normal file
View file

@ -0,0 +1,91 @@
package containerd
import (
"context"
"time"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/leases"
)
// Lease is used to hold a reference to active resources which have not been
// referenced by a root resource. This is useful for preventing garbage
// collection of resources while they are actively being updated.
type Lease struct {
id string
createdAt time.Time
client *Client
}
// CreateLease creates a new lease
func (c *Client) CreateLease(ctx context.Context) (Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{})
if err != nil {
return Lease{}, err
}
return Lease{
id: resp.Lease.ID,
client: c,
}, nil
}
// ListLeases lists active leases
func (c *Client) ListLeases(ctx context.Context) ([]Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
resp, err := lapi.List(ctx, &leasesapi.ListRequest{})
if err != nil {
return nil, err
}
leases := make([]Lease, len(resp.Leases))
for i := range resp.Leases {
leases[i] = Lease{
id: resp.Leases[i].ID,
createdAt: resp.Leases[i].CreatedAt,
client: c,
}
}
return leases, nil
}
func (c *Client) withLease(ctx context.Context) (context.Context, func() error, error) {
_, ok := leases.Lease(ctx)
if ok {
return ctx, func() error {
return nil
}, nil
}
l, err := c.CreateLease(ctx)
if err != nil {
return nil, nil, err
}
ctx = leases.WithLease(ctx, l.ID())
return ctx, func() error {
return l.Delete(ctx)
}, nil
}
// ID returns the lease ID
func (l Lease) ID() string {
return l.id
}
// CreatedAt returns the time at which the lease was created
func (l Lease) CreatedAt() time.Time {
return l.createdAt
}
// Delete deletes the lease, removing the reference to all resources created
// during the lease.
func (l Lease) Delete(ctx context.Context) error {
lapi := leasesapi.NewLeasesClient(l.client.conn)
_, err := lapi.Delete(ctx, &leasesapi.DeleteRequest{
ID: l.id,
})
return err
}

View file

@ -0,0 +1,24 @@
package leases
import "context"
type leaseKey struct{}
// WithLease sets a given lease on the context
func WithLease(ctx context.Context, lid string) context.Context {
ctx = context.WithValue(ctx, leaseKey{}, lid)
// also store on the grpc headers so it gets picked up by any clients that
// are using this.
return withGRPCLeaseHeader(ctx, lid)
}
// Lease returns the lease from the context.
func Lease(ctx context.Context) (string, bool) {
lid, ok := ctx.Value(leaseKey{}).(string)
if !ok {
return fromGRPCHeader(ctx)
}
return lid, ok
}

41
vendor/github.com/containerd/containerd/leases/grpc.go generated vendored Normal file
View file

@ -0,0 +1,41 @@
package leases
import (
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
const (
// GRPCHeader defines the header name for specifying a containerd lease.
GRPCHeader = "containerd-lease"
)
func withGRPCLeaseHeader(ctx context.Context, lid string) context.Context {
// also store on the grpc headers so it gets picked up by any clients
// that are using this.
txheader := metadata.Pairs(GRPCHeader, lid)
md, ok := metadata.FromOutgoingContext(ctx) // merge with outgoing context.
if !ok {
md = txheader
} else {
// order ensures the latest is first in this list.
md = metadata.Join(txheader, md)
}
return metadata.NewOutgoingContext(ctx, md)
}
func fromGRPCHeader(ctx context.Context) (string, bool) {
// try to extract for use in grpc servers.
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", false
}
values := md[GRPCHeader]
if len(values) == 0 {
return "", false
}
return values[0], true
}

View file

@ -9,9 +9,10 @@ import (
"os"
"path/filepath"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/linux/runcopts"
client "github.com/containerd/containerd/linux/shim"
"github.com/containerd/containerd/linux/shim"
"github.com/containerd/containerd/linux/shim/client"
"github.com/pkg/errors"
)
@ -70,32 +71,33 @@ type bundle struct {
workDir string
}
type shimOpt func(*bundle, string, *runcopts.RuncOptions) (client.Config, client.ClientOpt)
// ShimOpt specifies shim options for initialization and connection
type ShimOpt func(*bundle, string, *runcopts.RuncOptions) (shim.Config, client.Opt)
// ShimRemote is a shimOpt for connecting and starting a remote shim
func ShimRemote(shim, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) shimOpt {
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (client.Config, client.ClientOpt) {
// ShimRemote is a ShimOpt for connecting and starting a remote shim
func ShimRemote(shimBinary, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) ShimOpt {
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, ropts),
client.WithStart(shim, b.shimAddress(ns), daemonAddress, cgroup, nonewns, debug, exitHandler)
client.WithStart(shimBinary, b.shimAddress(ns), daemonAddress, cgroup, nonewns, debug, exitHandler)
}
}
// ShimLocal is a shimOpt for using an in process shim implementation
func ShimLocal(exchange *events.Exchange) shimOpt {
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (client.Config, client.ClientOpt) {
// ShimLocal is a ShimOpt for using an in process shim implementation
func ShimLocal(exchange *exchange.Exchange) ShimOpt {
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, ropts), client.WithLocal(exchange)
}
}
// ShimConnect is a shimOpt for connecting to an existing remote shim
func ShimConnect() shimOpt {
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (client.Config, client.ClientOpt) {
// ShimConnect is a ShimOpt for connecting to an existing remote shim
func ShimConnect() ShimOpt {
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, ropts), client.WithConnect(b.shimAddress(ns))
}
}
// NewShimClient connects to the shim managing the bundle and tasks creating it if needed
func (b *bundle) NewShimClient(ctx context.Context, namespace string, getClientOpts shimOpt, runcOpts *runcopts.RuncOptions) (*client.Client, error) {
func (b *bundle) NewShimClient(ctx context.Context, namespace string, getClientOpts ShimOpt, runcOpts *runcopts.RuncOptions) (*client.Client, error) {
cfg, opt := getClientOpts(b, namespace, runcOpts)
return client.New(ctx, cfg, opt)
}
@ -118,7 +120,7 @@ func (b *bundle) shimAddress(namespace string) string {
return filepath.Join(string(filepath.Separator), "containerd-shim", namespace, b.id, "shim.sock")
}
func (b *bundle) shimConfig(namespace string, runcOptions *runcopts.RuncOptions) client.Config {
func (b *bundle) shimConfig(namespace string, runcOptions *runcopts.RuncOptions) shim.Config {
var (
criuPath string
runtimeRoot string
@ -129,7 +131,7 @@ func (b *bundle) shimConfig(namespace string, runcOptions *runcopts.RuncOptions)
systemdCgroup = runcOptions.SystemdCgroup
runtimeRoot = runcOptions.RuntimeRoot
}
return client.Config{
return shim.Config{
Path: b.path,
WorkDir: b.workDir,
Namespace: namespace,

View file

@ -15,7 +15,7 @@ import (
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/linux/runcopts"
client "github.com/containerd/containerd/linux/shim"
@ -143,7 +143,7 @@ type Runtime struct {
monitor runtime.TaskMonitor
tasks *runtime.TaskList
db *metadata.DB
events *events.Exchange
events *exchange.Exchange
config *Config
}

View file

@ -1,6 +1,6 @@
// +build !windows
package shim
package client
import (
"context"
@ -20,19 +20,23 @@ import (
"github.com/sirupsen/logrus"
"github.com/containerd/containerd/events"
shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/linux/shim"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/sys"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
)
// ClientOpt is an option for a shim client configuration
type ClientOpt func(context.Context, Config) (shim.ShimClient, io.Closer, error)
var empty = &google_protobuf.Empty{}
// Opt is an option for a shim client configuration
type Opt func(context.Context, shim.Config) (shimapi.ShimClient, io.Closer, error)
// WithStart executes a new shim process
func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) ClientOpt {
return func(ctx context.Context, config Config) (_ shim.ShimClient, _ io.Closer, err error) {
func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) Opt {
return func(ctx context.Context, config shim.Config) (_ shimapi.ShimClient, _ io.Closer, err error) {
socket, err := newSocket(address)
if err != nil {
return nil, nil, err
@ -84,24 +88,24 @@ func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug boo
}
}
func newCommand(binary, daemonAddress string, nonewns, debug bool, config Config, socket *os.File) *exec.Cmd {
func newCommand(binary, daemonAddress string, nonewns, debug bool, config shim.Config, socket *os.File) *exec.Cmd {
args := []string{
"--namespace", config.Namespace,
"--workdir", config.WorkDir,
"--address", daemonAddress,
"-namespace", config.Namespace,
"-workdir", config.WorkDir,
"-address", daemonAddress,
}
if config.Criu != "" {
args = append(args, "--criu-path", config.Criu)
args = append(args, "-criu-path", config.Criu)
}
if config.RuntimeRoot != "" {
args = append(args, "--runtime-root", config.RuntimeRoot)
args = append(args, "-runtime-root", config.RuntimeRoot)
}
if config.SystemdCgroup {
args = append(args, "--systemd-cgroup")
args = append(args, "-systemd-cgroup")
}
if debug {
args = append(args, "--debug")
args = append(args, "-debug")
}
cmd := exec.Command(binary, args...)
@ -160,39 +164,29 @@ func dialAddress(address string) string {
}
// WithConnect connects to an existing shim
func WithConnect(address string) ClientOpt {
return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
func WithConnect(address string) Opt {
return func(ctx context.Context, config shim.Config) (shimapi.ShimClient, io.Closer, error) {
conn, err := connect(address, annonDialer)
if err != nil {
return nil, nil, err
}
return shim.NewShimClient(conn), conn, nil
return shimapi.NewShimClient(conn), conn, nil
}
}
// WithLocal uses an in process shim
func WithLocal(publisher events.Publisher) func(context.Context, Config) (shim.ShimClient, io.Closer, error) {
return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
service, err := NewService(config, publisher)
func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimClient, io.Closer, error) {
return func(ctx context.Context, config shim.Config) (shimapi.ShimClient, io.Closer, error) {
service, err := shim.NewService(config, publisher)
if err != nil {
return nil, nil, err
}
return NewLocal(service), nil, nil
return shim.NewLocal(service), nil, nil
}
}
// Config contains shim specific configuration
type Config struct {
Path string
Namespace string
WorkDir string
Criu string
RuntimeRoot string
SystemdCgroup bool
}
// New returns a new shim client
func New(ctx context.Context, config Config, opt ClientOpt) (*Client, error) {
func New(ctx context.Context, config shim.Config, opt Opt) (*Client, error) {
s, c, err := opt(ctx, config)
if err != nil {
return nil, err
@ -206,7 +200,7 @@ func New(ctx context.Context, config Config, opt ClientOpt) (*Client, error) {
// Client is a shim client containing the connection to a shim
type Client struct {
shim.ShimClient
shimapi.ShimClient
c io.Closer
exitCh chan struct{}

View file

@ -1,6 +1,6 @@
// +build linux
package shim
package client
import (
"os/exec"

View file

@ -1,6 +1,6 @@
// +build !linux,!windows
package shim
package client
import (
"os/exec"

View file

@ -98,12 +98,16 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
}
}
root := s.config.RuntimeRoot
if root == "" {
root = RuncRoot
}
runtime := &runc.Runc{
Command: r.Runtime,
Log: filepath.Join(s.config.Path, "log.json"),
LogFormat: runc.JSON,
PdeathSignal: syscall.SIGKILL,
Root: filepath.Join(s.config.RuntimeRoot, s.config.Namespace),
Root: filepath.Join(root, s.config.Namespace),
Criu: s.config.Criu,
SystemdCgroup: s.config.SystemdCgroup,
}

View file

@ -32,6 +32,16 @@ var empty = &google_protobuf.Empty{}
// RuncRoot is the path to the root runc state directory
const RuncRoot = "/run/containerd/runc"
// Config contains shim specific configuration
type Config struct {
Path string
Namespace string
WorkDir string
Criu string
RuntimeRoot string
SystemdCgroup bool
}
// NewService returns a new shim service that can be used via GRPC
func NewService(config Config, publisher events.Publisher) (*Service, error) {
if config.Namespace == "" {

View file

@ -11,7 +11,7 @@ import (
"github.com/containerd/cgroups"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
client "github.com/containerd/containerd/linux/shim"
"github.com/containerd/containerd/linux/shim/client"
shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/runtime"
"github.com/gogo/protobuf/types"

View file

@ -38,6 +38,7 @@ var (
bucketKeyObjectContent = []byte("content") // stores content references
bucketKeyObjectBlob = []byte("blob") // stores content links
bucketKeyObjectIngest = []byte("ingest") // stores ingest links
bucketKeyObjectLeases = []byte("leases") // stores leases
bucketKeyDigest = []byte("digest")
bucketKeyMediaType = []byte("mediatype")
@ -53,6 +54,7 @@ var (
bucketKeySnapshotter = []byte("snapshotter")
bucketKeyTarget = []byte("target")
bucketKeyExtensions = []byte("extensions")
bucketKeyCreatedAt = []byte("createdat")
)
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {

View file

@ -391,27 +391,31 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
return err
}
}
return nw.commit(ctx, tx, size, expected, opts...)
dgst, err := nw.commit(ctx, tx, size, expected, opts...)
if err != nil {
return err
}
return addContentLease(ctx, tx, dgst)
})
}
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error {
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) (digest.Digest, error) {
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
return "", err
}
}
if err := validateInfo(&base); err != nil {
return err
return "", err
}
status, err := nw.Writer.Status()
if err != nil {
return err
return "", err
}
if size != 0 && size != status.Offset {
return errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
@ -419,32 +423,32 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return err
return "", err
}
if getBlobBucket(tx, nw.namespace, actual) != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
}
bkt, err := createBlobBucket(tx, nw.namespace, actual)
if err != nil {
return err
return "", err
}
commitTime := time.Now().UTC()
sizeEncoded, err := encodeInt(size)
if err != nil {
return err
return "", err
}
if err := boltutil.WriteTimestamps(bkt, commitTime, commitTime); err != nil {
return err
return "", err
}
if err := boltutil.WriteLabels(bkt, base.Labels); err != nil {
return err
return "", err
}
return bkt.Put(bucketKeySize, sizeEncoded)
return actual, bkt.Put(bucketKeySize, sizeEncoded)
}
func (nw *namespacedWriter) Status() (content.Status, error) {
@ -566,7 +570,7 @@ func (cs *contentStore) garbageCollect(ctx context.Context) error {
return err
}
if err := cs.Store.Walk(ctx, func(info content.Info) error {
return cs.Store.Walk(ctx, func(info content.Info) error {
if _, ok := seen[info.Digest.String()]; !ok {
if err := cs.Store.Delete(ctx, info.Digest); err != nil {
return err
@ -574,9 +578,5 @@ func (cs *contentStore) garbageCollect(ctx context.Context) error {
log.G(ctx).WithField("digest", info.Digest).Debug("removed content")
}
return nil
}); err != nil {
return err
}
return nil
})
}

View file

@ -190,6 +190,7 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
return m.db.Update(fn)
}
// GarbageCollect starts garbage collection
func (m *DB) GarbageCollect(ctx context.Context) error {
lt1 := time.Now()
m.wlock.Lock()
@ -198,39 +199,8 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected")
}()
var marked map[gc.Node]struct{}
if err := m.db.View(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
roots := make(chan gc.Node)
errChan := make(chan error)
go func() {
defer close(errChan)
defer close(roots)
// Call roots
if err := scanRoots(ctx, tx, roots); err != nil {
cancel()
errChan <- err
}
}()
refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error {
return references(ctx, tx, n, fn)
}
reachable, err := gc.ConcurrentMark(ctx, roots, refs)
if rerr := <-errChan; rerr != nil {
return rerr
}
if err != nil {
return err
}
marked = reachable
return nil
}); err != nil {
marked, err := m.getMarked(ctx)
if err != nil {
return err
}
@ -241,15 +211,11 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodeC := make(chan gc.Node)
var scanErr error
rm := func(ctx context.Context, n gc.Node) error {
if _, ok := marked[n]; ok {
return nil
}
go func() {
defer close(nodeC)
scanErr = scanAll(ctx, tx, nodeC)
}()
rm := func(n gc.Node) error {
if n.Type == ResourceSnapshot {
if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
m.dirtySS[n.Key[:idx]] = struct{}{}
@ -260,12 +226,8 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
return remove(ctx, tx, n)
}
if err := gc.Sweep(marked, nodeC, rm); err != nil {
return errors.Wrap(err, "failed to sweep")
}
if scanErr != nil {
return errors.Wrap(scanErr, "failed to scan all")
if err := scanAll(ctx, tx, rm); err != nil {
return errors.Wrap(err, "failed to scan and remove")
}
return nil
@ -292,6 +254,54 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
return nil
}
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
var marked map[gc.Node]struct{}
if err := m.db.View(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var (
nodes []gc.Node
wg sync.WaitGroup
roots = make(chan gc.Node)
)
wg.Add(1)
go func() {
defer wg.Done()
for n := range roots {
nodes = append(nodes, n)
}
}()
// Call roots
if err := scanRoots(ctx, tx, roots); err != nil {
cancel()
return err
}
close(roots)
wg.Wait()
refs := func(n gc.Node) ([]gc.Node, error) {
var sn []gc.Node
if err := references(ctx, tx, n, func(nn gc.Node) {
sn = append(sn, nn)
}); err != nil {
return nil, err
}
return sn, nil
}
reachable, err := gc.Tricolor(nodes, refs)
if err != nil {
return err
}
marked = reachable
return nil
}); err != nil {
return nil, err
}
return marked, nil
}
func (m *DB) cleanupSnapshotter(name string) {
ctx := context.Background()
sn, ok := m.ss[name]

View file

@ -12,10 +12,15 @@ import (
)
const (
// ResourceUnknown specifies an unknown resource
ResourceUnknown gc.ResourceType = iota
// ResourceContent specifies a content resource
ResourceContent
// ResourceSnapshot specifies a snapshot resource
ResourceSnapshot
// ResourceContainer specifies a container resource
ResourceContainer
// ResourceTask specifies a task resource
ResourceTask
)
@ -41,6 +46,55 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
nbkt := v1bkt.Bucket(k)
ns := string(k)
lbkt := nbkt.Bucket(bucketKeyObjectLeases)
if lbkt != nil {
if err := lbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
libkt := lbkt.Bucket(k)
cbkt := libkt.Bucket(bucketKeyObjectContent)
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceContent, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
}); err != nil {
return err
}
}
sbkt := libkt.Bucket(bucketKeyObjectSnapshots)
if sbkt != nil {
if err := sbkt.ForEach(func(sk, sv []byte) error {
if sv != nil {
return nil
}
snbkt := sbkt.Bucket(sk)
return snbkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
}
ibkt := nbkt.Bucket(bucketKeyObjectImages)
if ibkt != nil {
if err := ibkt.ForEach(func(k, v []byte) error {
@ -174,7 +228,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
return nil
}
func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
@ -201,12 +255,8 @@ func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
if v != nil {
return nil
}
select {
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
node := gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k))
return fn(ctx, node)
})
}); err != nil {
return err
@ -222,12 +272,8 @@ func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
if v != nil {
return nil
}
select {
case nc <- gcnode(ResourceContent, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
node := gcnode(ResourceContent, ns, string(k))
return fn(ctx, node)
}); err != nil {
return err
}

View file

@ -0,0 +1,201 @@
package metadata
import (
"context"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// Lease retains resources to prevent garbage collection before
// the resources can be fully referenced.
type Lease struct {
ID string
CreatedAt time.Time
Labels map[string]string
Content []string
Snapshots map[string][]string
}
// LeaseManager manages the create/delete lifecyle of leases
// and also returns existing leases
type LeaseManager struct {
tx *bolt.Tx
}
// NewLeaseManager creates a new lease manager for managing leases using
// the provided database transaction.
func NewLeaseManager(tx *bolt.Tx) *LeaseManager {
return &LeaseManager{
tx: tx,
}
}
// Create creates a new lease using the provided lease
func (lm *LeaseManager) Create(ctx context.Context, lid string, labels map[string]string) (Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return Lease{}, err
}
topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if err != nil {
return Lease{}, err
}
txbkt, err := topbkt.CreateBucket([]byte(lid))
if err != nil {
if err == bolt.ErrBucketExists {
err = errdefs.ErrAlreadyExists
}
return Lease{}, err
}
t := time.Now().UTC()
createdAt, err := t.MarshalBinary()
if err != nil {
return Lease{}, err
}
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
return Lease{}, err
}
if labels != nil {
if err := boltutil.WriteLabels(txbkt, labels); err != nil {
return Lease{}, err
}
}
return Lease{
ID: lid,
CreatedAt: t,
Labels: labels,
}, nil
}
// Delete delets the lease with the provided lease ID
func (lm *LeaseManager) Delete(ctx context.Context, lid string) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return nil
}
if err := topbkt.DeleteBucket([]byte(lid)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
return nil
}
// List lists all active leases
func (lm *LeaseManager) List(ctx context.Context, includeResources bool, filter ...string) ([]Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var leases []Lease
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return leases, nil
}
if err := topbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
txbkt := topbkt.Bucket(k)
l := Lease{
ID: string(k),
}
if v := txbkt.Get(bucketKeyCreatedAt); v != nil {
t := &l.CreatedAt
if err := t.UnmarshalBinary(v); err != nil {
return err
}
}
labels, err := boltutil.ReadLabels(txbkt)
if err != nil {
return err
}
l.Labels = labels
// TODO: Read Snapshots
// TODO: Read Content
leases = append(leases, l)
return nil
}); err != nil {
return nil, err
}
return leases, nil
}
func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
lid, ok := leases.Lease(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectSnapshots)
if err != nil {
return err
}
bkt, err = bkt.CreateBucketIfNotExists([]byte(snapshotter))
if err != nil {
return err
}
return bkt.Put([]byte(key), nil)
}
func addContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
lid, ok := leases.Lease(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectContent)
if err != nil {
return err
}
return bkt.Put([]byte(dgst.String()), nil)
}

View file

@ -326,6 +326,10 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
return err
}
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
// TODO: Consider doing this outside of transaction to lessen
// metadata lock time
if readonly {

View file

@ -1,83 +0,0 @@
package mount
// On Solaris we can't invoke the mount system call directly. First,
// the mount system call takes more than 6 arguments, and go doesn't
// support invoking system calls that take more than 6 arguments. Past
// that, the mount system call is a private interfaces. For example,
// the arguments and data structures passed to the kernel to create an
// nfs mount are private and can change at any time. The only public
// and stable interface for creating mounts on Solaris is the mount.8
// command, so we'll invoke that here.
import (
"bytes"
"errors"
"fmt"
"os/exec"
"strings"
"golang.org/x/sys/unix"
)
const (
mountCmd = "/usr/sbin/mount"
)
func doMount(arg ...string) error {
cmd := exec.Command(mountCmd, arg...)
/* Setup Stdin, Stdout, and Stderr */
stderr := new(bytes.Buffer)
cmd.Stdin = nil
cmd.Stdout = nil
cmd.Stderr = stderr
/*
* Run the command. If the command fails create a new error
* object to return that includes stderr output.
*/
err := cmd.Start()
if err != nil {
return err
}
err = cmd.Wait()
if err != nil {
return errors.New(fmt.Sprintf("%v: %s", err, stderr.String()))
}
return nil
}
func (m *Mount) Mount(target string) error {
var err error
if len(m.Options) == 0 {
err = doMount("-F", m.Type, m.Source, target)
} else {
err = doMount("-F", m.Type, "-o", strings.Join(m.Options, ","),
m.Source, target)
}
return err
}
func Unmount(mount string, flags int) error {
return unix.Unmount(mount, flags)
}
// UnmountAll repeatedly unmounts the given mount point until there
// are no mounts remaining (EINVAL is returned by mount), which is
// useful for undoing a stack of mounts on the same mount point.
func UnmountAll(mount string, flags int) error {
for {
if err := Unmount(mount, flags); err != nil {
// EINVAL is returned if the target is not a
// mount point, indicating that we are
// done. It can also indicate a few other
// things (such as invalid flags) which we
// unfortunately end up squelching here too.
if err == unix.EINVAL {
return nil
}
return err
}
}
}

View file

@ -5,17 +5,21 @@ package mount
import "github.com/pkg/errors"
var (
// ErrNotImplementOnUnix is returned for methods that are not implemented
ErrNotImplementOnUnix = errors.New("not implemented under unix")
)
// Mount is not implemented on this platform
func (m *Mount) Mount(target string) error {
return ErrNotImplementOnUnix
}
// Unmount is not implemented on this platform
func Unmount(mount string, flags int) error {
return ErrNotImplementOnUnix
}
// UnmountAll is not implemented on this platform
func UnmountAll(mount string, flags int) error {
return ErrNotImplementOnUnix
}

View file

@ -3,17 +3,21 @@ package mount
import "github.com/pkg/errors"
var (
// ErrNotImplementOnWindows is returned when an action is not implemented for windows
ErrNotImplementOnWindows = errors.New("not implemented under windows")
)
// Mount to the provided target
func (m *Mount) Mount(target string) error {
return ErrNotImplementOnWindows
}
// Unmount the mount at the provided path
func Unmount(mount string, flags int) error {
return ErrNotImplementOnWindows
}
// UnmountAll mounts at the provided path
func UnmountAll(mount string, flags int) error {
return ErrNotImplementOnWindows
}

View file

@ -1,50 +0,0 @@
// +build solaris,cgo
package mount
/*
#include <stdio.h>
#include <stdlib.h>
#include <sys/mnttab.h>
*/
import "C"
import (
"fmt"
"unsafe"
)
// Self retrieves a list of mounts for the current running process.
func Self() ([]Info, error) {
path := C.CString(C.MNTTAB)
defer C.free(unsafe.Pointer(path))
mode := C.CString("r")
defer C.free(unsafe.Pointer(mode))
mnttab := C.fopen(path, mode)
if mnttab == nil {
return nil, fmt.Errorf("Failed to open %s", C.MNTTAB)
}
var out []Info
var mp C.struct_mnttab
ret := C.getmntent(mnttab, &mp)
for ret == 0 {
var mountinfo Info
mountinfo.Mountpoint = C.GoString(mp.mnt_mountp)
mountinfo.Source = C.GoString(mp.mnt_special)
mountinfo.FSType = C.GoString(mp.mnt_fstype)
mountinfo.Options = C.GoString(mp.mnt_mntopts)
out = append(out, mountinfo)
ret = C.getmntent(mnttab, &mp)
}
C.fclose(mnttab)
return out, nil
}
// PID collects the mounts for a specific process ID.
func PID(pid int) ([]Info, error) {
return nil, fmt.Errorf("mountinfo.PID is not implemented on solaris")
}

View file

@ -5,7 +5,7 @@ import (
"path/filepath"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@ -18,15 +18,15 @@ type InitContext struct {
State string
Config interface{}
Address string
Events *events.Exchange
Events *exchange.Exchange
Meta *Meta // plugins can fill in metadata at init.
plugins *PluginSet
plugins *Set
}
// NewContext returns a new plugin InitContext
func NewContext(ctx context.Context, r *Registration, plugins *PluginSet, root, state string) *InitContext {
func NewContext(ctx context.Context, r *Registration, plugins *Set, root, state string) *InitContext {
return &InitContext{
Context: log.WithModule(ctx, r.URI()),
Root: filepath.Join(root, r.URI()),
@ -61,32 +61,37 @@ type Plugin struct {
err error // will be set if there was an error initializing the plugin
}
// Err returns the errors during initialization.
// returns nil if not error was encountered
func (p *Plugin) Err() error {
return p.err
}
// Instance returns the instance and any initialization error of the plugin
func (p *Plugin) Instance() (interface{}, error) {
return p.instance, p.err
}
// PluginSet defines a plugin collection, used with InitContext.
// Set defines a plugin collection, used with InitContext.
//
// This maintains ordering and unique indexing over the set.
//
// After iteratively instantiating plugins, this set should represent, the
// ordered, initialization set of plugins for a containerd instance.
type PluginSet struct {
type Set struct {
ordered []*Plugin // order of initialization
byTypeAndID map[Type]map[string]*Plugin
}
func NewPluginSet() *PluginSet {
return &PluginSet{
// NewPluginSet returns an initialized plugin set
func NewPluginSet() *Set {
return &Set{
byTypeAndID: make(map[Type]map[string]*Plugin),
}
}
func (ps *PluginSet) Add(p *Plugin) error {
// Add a plugin to the set
func (ps *Set) Add(p *Plugin) error {
if byID, typeok := ps.byTypeAndID[p.Registration.Type]; !typeok {
ps.byTypeAndID[p.Registration.Type] = map[string]*Plugin{
p.Registration.ID: p,
@ -102,13 +107,14 @@ func (ps *PluginSet) Add(p *Plugin) error {
}
// Get returns the first plugin by its type
func (ps *PluginSet) Get(t Type) (interface{}, error) {
func (ps *Set) Get(t Type) (interface{}, error) {
for _, v := range ps.byTypeAndID[t] {
return v.Instance()
}
return nil, errors.Wrapf(errdefs.ErrNotFound, "no plugins registered for %s", t)
}
// GetAll plugins in the set
func (i *InitContext) GetAll() []*Plugin {
return i.plugins.ordered
}

View file

@ -58,9 +58,13 @@ const (
// Registration contains information for registering a plugin
type Registration struct {
Type Type
ID string
Config interface{}
// Type of the plugin
Type Type
// ID of the plugin
ID string
// Config specific to the plugin
Config interface{}
// Requires is a list of plugins that the registered plugin requires to be available
Requires []Type
// InitFn is called when initializing a plugin. The registration and
@ -69,6 +73,7 @@ type Registration struct {
InitFn func(*InitContext) (interface{}, error)
}
// Init the registered plugin
func (r *Registration) Init(ic *InitContext) *Plugin {
p, err := r.InitFn(ic)
return &Plugin{

View file

@ -9,6 +9,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"strings"
"sync"
"time"
@ -159,7 +160,6 @@ func (c *Converter) Convert(ctx context.Context) (ocispec.Descriptor, error) {
}
labels := map[string]string{}
labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
labels["containerd.io/gc.ref.content.0"] = manifest.Config.Digest.String()
for i, ch := range manifest.Layers {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String()
@ -175,12 +175,6 @@ func (c *Converter) Convert(ctx context.Context) (ocispec.Descriptor, error) {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
}
for _, ch := range manifest.Layers {
if _, err := c.contentStore.Update(ctx, content.Info{Digest: ch.Digest}, "labels.containerd.io/gc.root"); err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to remove blob root tag")
}
}
return desc, nil
}
@ -215,13 +209,26 @@ func (c *Converter) fetchManifest(ctx context.Context, desc ocispec.Descriptor)
func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch blob")
ref := remotes.MakeRefKey(ctx, desc)
calc := newBlobStateCalculator()
var (
ref = remotes.MakeRefKey(ctx, desc)
calc = newBlobStateCalculator()
retry = 16
)
tryit:
cw, err := c.contentStore.Writer(ctx, ref, desc.Size, desc.Digest)
if err != nil {
if !errdefs.IsAlreadyExists(err) {
if errdefs.IsUnavailable(err) {
select {
case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))):
if retry < 2048 {
retry = retry << 1
}
goto tryit
case <-ctx.Done():
return err
}
} else if !errdefs.IsAlreadyExists(err) {
return err
}
@ -270,10 +277,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
eg.Go(func() error {
defer pw.Close()
opt := content.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})
return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest, opt)
return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest)
})
if err := eg.Wait(); err != nil {

View file

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"time"
"github.com/containerd/containerd/content"
@ -44,7 +45,7 @@ func MakeRefKey(ctx context.Context, desc ocispec.Descriptor) string {
// FetchHandler returns a handler that will fetch all content into the ingester
// discovered in a call to Dispatch. Use with ChildrenHandler to do a full
// recursive fetch.
func FetchHandler(ingester content.Ingester, fetcher Fetcher, root ocispec.Descriptor) images.HandlerFunc {
func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
@ -56,13 +57,13 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher, root ocispec.Descr
case images.MediaTypeDockerSchema1Manifest:
return nil, fmt.Errorf("%v not supported", desc.MediaType)
default:
err := fetch(ctx, ingester, fetcher, desc, desc.Digest == root.Digest)
err := fetch(ctx, ingester, fetcher, desc)
return nil, err
}
}
}
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor, root bool) error {
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch")
var (
@ -84,7 +85,7 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
// of writer and abort if not updated recently.
select {
case <-time.After(time.Millisecond * time.Duration(retry)):
case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))):
if retry < 2048 {
retry = retry << 1
}
@ -104,13 +105,13 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
}
defer rc.Close()
r, opts := commitOpts(desc, rc, root)
r, opts := commitOpts(desc, rc)
return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
}
// commitOpts gets the appropriate content options to alter
// the content info on commit based on media type.
func commitOpts(desc ocispec.Descriptor, r io.Reader, root bool) (io.Reader, []content.Opt) {
func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) {
var childrenF func(r io.Reader) ([]ocispec.Descriptor, error)
switch desc.MediaType {
@ -162,13 +163,10 @@ func commitOpts(desc ocispec.Descriptor, r io.Reader, root bool) (io.Reader, []c
return errors.Wrap(err, "unable to get commit labels")
}
if len(children) > 0 || root {
if len(children) > 0 {
if info.Labels == nil {
info.Labels = map[string]string{}
}
if root {
info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
}
for i, ch := range children {
info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String()
}

View file

@ -7,6 +7,7 @@ import (
"github.com/gogo/protobuf/types"
)
// TaskInfo provides task specific information
type TaskInfo struct {
ID string
Runtime string
@ -14,6 +15,7 @@ type TaskInfo struct {
Namespace string
}
// Process is a runtime object for an executing process inside a container
type Process interface {
ID() string
// State returns the process state
@ -30,6 +32,7 @@ type Process interface {
Wait(context.Context) (*Exit, error)
}
// Task is the runtime object for an executing container
type Task interface {
Process
@ -55,27 +58,37 @@ type Task interface {
Metrics(context.Context) (interface{}, error)
}
// ExecOpts provides additional options for additional processes running in a task
type ExecOpts struct {
Spec *types.Any
IO IO
}
// ConsoleSize of a pty or windows terminal
type ConsoleSize struct {
Width uint32
Height uint32
}
// Status is the runtime status of a task and/or process
type Status int
const (
// CreatedStatus when a process has been created
CreatedStatus Status = iota + 1
// RunningStatus when a process is running
RunningStatus
// StoppedStatus when a process has stopped
StoppedStatus
// DeletedStatus when a process has been deleted
DeletedStatus
// PausedStatus when a process is paused
PausedStatus
// PausingStatus when a process is currently pausing
PausingStatus
)
// State information for a process
type State struct {
// Status is the current status of the container
Status Status
@ -93,6 +106,7 @@ type State struct {
Terminal bool
}
// ProcessInfo holds platform specific process information
type ProcessInfo struct {
// Pid is the process ID
Pid uint32

View file

@ -9,21 +9,26 @@ import (
)
var (
ErrTaskNotExists = errors.New("task does not exist")
// ErrTaskNotExists is returned when a task does not exist
ErrTaskNotExists = errors.New("task does not exist")
// ErrTaskAlreadyExists is returned when a task already exists
ErrTaskAlreadyExists = errors.New("task already exists")
)
// NewTaskList returns a new TaskList
func NewTaskList() *TaskList {
return &TaskList{
tasks: make(map[string]map[string]Task),
}
}
// TaskList holds and provides locking around tasks
type TaskList struct {
mu sync.Mutex
tasks map[string]map[string]Task
}
// Get a task
func (l *TaskList) Get(ctx context.Context, id string) (Task, error) {
l.mu.Lock()
defer l.mu.Unlock()
@ -42,6 +47,7 @@ func (l *TaskList) Get(ctx context.Context, id string) (Task, error) {
return t, nil
}
// GetAll tasks under a namespace
func (l *TaskList) GetAll(ctx context.Context) ([]Task, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
@ -58,6 +64,7 @@ func (l *TaskList) GetAll(ctx context.Context) ([]Task, error) {
return o, nil
}
// Add a task
func (l *TaskList) Add(ctx context.Context, t Task) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
@ -66,6 +73,7 @@ func (l *TaskList) Add(ctx context.Context, t Task) error {
return l.AddWithNamespace(namespace, t)
}
// AddWithNamespace adds a task with the provided namespace
func (l *TaskList) AddWithNamespace(namespace string, t Task) error {
l.mu.Lock()
defer l.mu.Unlock()
@ -81,6 +89,7 @@ func (l *TaskList) AddWithNamespace(namespace string, t Task) error {
return nil
}
// Delete a task
func (l *TaskList) Delete(ctx context.Context, t Task) {
l.mu.Lock()
defer l.mu.Unlock()

View file

@ -33,23 +33,27 @@ type Config struct {
md toml.MetaData
}
// GRPCConfig provides GRPC configuration for the socket
type GRPCConfig struct {
Address string `toml:"address"`
Uid int `toml:"uid"`
Gid int `toml:"gid"`
UID int `toml:"uid"`
GID int `toml:"gid"`
}
// Debug provides debug configuration
type Debug struct {
Address string `toml:"address"`
Uid int `toml:"uid"`
Gid int `toml:"gid"`
UID int `toml:"uid"`
GID int `toml:"gid"`
Level string `toml:"level"`
}
// MetricsConfig provides metrics configuration
type MetricsConfig struct {
Address string `toml:"address"`
}
// CgroupConfig provides cgroup configuration
type CgroupConfig struct {
Path string `toml:"path"`
}

View file

@ -16,13 +16,14 @@ import (
eventsapi "github.com/containerd/containerd/api/services/events/v1"
images "github.com/containerd/containerd/api/services/images/v1"
introspection "github.com/containerd/containerd/api/services/introspection/v1"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
namespaces "github.com/containerd/containerd/api/services/namespaces/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
version "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
@ -65,7 +66,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
services []plugin.Service
s = &Server{
rpc: rpc,
events: events.NewExchange(),
events: exchange.NewExchange(),
}
initialized = plugin.NewPluginSet()
)
@ -122,7 +123,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
// Server is the containerd main daemon
type Server struct {
rpc *grpc.Server
events *events.Exchange
events *exchange.Exchange
}
// ServeGRPC provides the containerd grpc APIs on the provided listener
@ -255,6 +256,8 @@ func interceptor(
ctx = log.WithModule(ctx, "events")
case introspection.IntrospectionServer:
ctx = log.WithModule(ctx, "introspection")
case leasesapi.LeasesServer:
ctx = log.WithModule(ctx, "leases")
default:
log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server)
}

View file

@ -10,19 +10,6 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"
)
const (
// DefaultRootDir is the default location used by containerd to store
// persistent data
DefaultRootDir = "/var/lib/containerd"
// DefaultStateDir is the default location used by containerd to store
// transient data
DefaultStateDir = "/run/containerd"
// DefaultAddress is the default unix socket address
DefaultAddress = "/run/containerd/containerd.sock"
// DefaultDebugAddress is the default unix socket address for pprof data
DefaultDebugAddress = "/run/containerd/debug.sock"
)
// apply sets config settings on the server process
func apply(ctx context.Context, config *Config) error {
if config.Subreaper {

View file

@ -44,6 +44,6 @@ func (ra *remoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
return n, nil
}
func (rr *remoteReaderAt) Close() error {
func (ra *remoteReaderAt) Close() error {
return nil
}

View file

@ -21,7 +21,7 @@ import (
"google.golang.org/grpc/codes"
)
type Service struct {
type service struct {
store content.Store
publisher events.Publisher
}
@ -32,7 +32,7 @@ var bufPool = sync.Pool{
},
}
var _ api.ContentServer = &Service{}
var _ api.ContentServer = &service{}
func init() {
plugin.Register(&plugin.Registration{
@ -53,19 +53,20 @@ func init() {
})
}
func NewService(cs content.Store, publisher events.Publisher) (*Service, error) {
return &Service{
// NewService returns the content GRPC server
func NewService(cs content.Store, publisher events.Publisher) (api.ContentServer, error) {
return &service{
store: cs,
publisher: publisher,
}, nil
}
func (s *Service) Register(server *grpc.Server) error {
func (s *service) Register(server *grpc.Server) error {
api.RegisterContentServer(server, s)
return nil
}
func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
func (s *service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
if err := req.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
}
@ -80,7 +81,7 @@ func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResp
}, nil
}
func (s *Service) Update(ctx context.Context, req *api.UpdateRequest) (*api.UpdateResponse, error) {
func (s *service) Update(ctx context.Context, req *api.UpdateRequest) (*api.UpdateResponse, error) {
if err := req.Info.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Info.Digest)
}
@ -95,7 +96,7 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateRequest) (*api.Upda
}, nil
}
func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServer) error {
func (s *service) List(req *api.ListContentRequest, session api.Content_ListServer) error {
var (
buffer []api.Info
sendBlock = func(block []api.Info) error {
@ -137,7 +138,7 @@ func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServ
return nil
}
func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*empty.Empty, error) {
func (s *service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*empty.Empty, error) {
if err := req.Digest.Validate(); err != nil {
return nil, grpc.Errorf(codes.InvalidArgument, err.Error())
}
@ -155,7 +156,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*e
return &empty.Empty{}, nil
}
func (s *Service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error {
func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error {
if err := req.Digest.Validate(); err != nil {
return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
}
@ -223,7 +224,7 @@ func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}
func (s *Service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) {
func (s *service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) {
status, err := s.store.Status(ctx, req.Ref)
if err != nil {
return nil, errdefs.ToGRPCf(err, "could not get status for ref %q", req.Ref)
@ -242,7 +243,7 @@ func (s *Service) Status(ctx context.Context, req *api.StatusRequest) (*api.Stat
return &resp, nil
}
func (s *Service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) {
func (s *service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) {
statuses, err := s.store.ListStatuses(ctx, req.Filters...)
if err != nil {
return nil, errdefs.ToGRPC(err)
@ -263,7 +264,7 @@ func (s *Service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest
return &resp, nil
}
func (s *Service) Write(session api.Content_WriteServer) (err error) {
func (s *service) Write(session api.Content_WriteServer) (err error) {
var (
ctx = session.Context()
msg api.WriteContentResponse
@ -283,7 +284,7 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
// identically across all GRPC methods.
//
// This is pretty noisy, so we can remove it but leave it for now.
log.G(ctx).WithError(err).Error("(*Service).Write failed")
log.G(ctx).WithError(err).Error("(*service).Write failed")
}
return
@ -319,7 +320,7 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields))
log.G(ctx).Debug("(*Service).Write started")
log.G(ctx).Debug("(*service).Write started")
// this action locks the writer for the session.
wr, err := s.store.Writer(ctx, ref, total, expected)
if err != nil {
@ -444,7 +445,7 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
}
}
func (s *Service) Abort(ctx context.Context, req *api.AbortRequest) (*empty.Empty, error) {
func (s *service) Abort(ctx context.Context, req *api.AbortRequest) (*empty.Empty, error) {
if err := s.store.Abort(ctx, req.Ref); err != nil {
return nil, errdefs.ToGRPC(err)
}

View file

@ -15,6 +15,7 @@ type remoteStore struct {
client contentapi.ContentClient
}
// NewStoreFromClient returns a new content store
func NewStoreFromClient(client contentapi.ContentClient) content.Store {
return &remoteStore{
client: client,

View file

@ -9,7 +9,7 @@ import (
"golang.org/x/net/context"
)
// NewApplierFromClient returns a new Applier which communicates
// NewDiffServiceFromClient returns a new diff service which communicates
// over a GRPC connection.
func NewDiffServiceFromClient(client diffapi.DiffClient) diff.Differ {
return &remote{

View file

@ -13,6 +13,7 @@ type remoteStore struct {
client imagesapi.ImagesClient
}
// NewStoreFromClient returns a new image store client
func NewStoreFromClient(client imagesapi.ImagesClient) images.Store {
return &remoteStore{
client: client,

View file

@ -34,24 +34,25 @@ func init() {
})
}
type Service struct {
type service struct {
db *metadata.DB
publisher events.Publisher
}
// NewService returns the GRPC image server
func NewService(db *metadata.DB, publisher events.Publisher) imagesapi.ImagesServer {
return &Service{
return &service{
db: db,
publisher: publisher,
}
}
func (s *Service) Register(server *grpc.Server) error {
func (s *service) Register(server *grpc.Server) error {
imagesapi.RegisterImagesServer(server, s)
return nil
}
func (s *Service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*imagesapi.GetImageResponse, error) {
func (s *service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*imagesapi.GetImageResponse, error) {
var resp imagesapi.GetImageResponse
return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error {
@ -65,7 +66,7 @@ func (s *Service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*ima
}))
}
func (s *Service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*imagesapi.ListImagesResponse, error) {
func (s *service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*imagesapi.ListImagesResponse, error) {
var resp imagesapi.ListImagesResponse
return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error {
@ -79,7 +80,7 @@ func (s *Service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*
}))
}
func (s *Service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) (*imagesapi.CreateImageResponse, error) {
func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) (*imagesapi.CreateImageResponse, error) {
if req.Image.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "Image.Name required")
}
@ -111,7 +112,7 @@ func (s *Service) Create(ctx context.Context, req *imagesapi.CreateImageRequest)
}
func (s *Service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) (*imagesapi.UpdateImageResponse, error) {
func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) (*imagesapi.UpdateImageResponse, error) {
if req.Image.Name == "" {
return nil, status.Errorf(codes.InvalidArgument, "Image.Name required")
}
@ -149,7 +150,7 @@ func (s *Service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest)
return &resp, nil
}
func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) (*empty.Empty, error) {
func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) (*empty.Empty, error) {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
return errdefs.ToGRPC(store.Delete(ctx, req.Name))
}); err != nil {
@ -169,14 +170,14 @@ func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest)
return &empty.Empty{}, nil
}
func (s *Service) withStore(ctx context.Context, fn func(ctx context.Context, store images.Store) error) func(tx *bolt.Tx) error {
func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store images.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewImageStore(tx)) }
}
func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
return s.db.View(s.withStore(ctx, fn))
}
func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}

View file

@ -0,0 +1,97 @@
package namespaces
import (
"context"
"strings"
api "github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces"
"github.com/gogo/protobuf/types"
)
// NewStoreFromClient returns a new namespace store
func NewStoreFromClient(client api.NamespacesClient) namespaces.Store {
return &remote{client: client}
}
type remote struct {
client api.NamespacesClient
}
func (r *remote) Create(ctx context.Context, namespace string, labels map[string]string) error {
var req api.CreateNamespaceRequest
req.Namespace = api.Namespace{
Name: namespace,
Labels: labels,
}
_, err := r.client.Create(ctx, &req)
if err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (r *remote) Labels(ctx context.Context, namespace string) (map[string]string, error) {
var req api.GetNamespaceRequest
req.Name = namespace
resp, err := r.client.Get(ctx, &req)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
return resp.Namespace.Labels, nil
}
func (r *remote) SetLabel(ctx context.Context, namespace, key, value string) error {
var req api.UpdateNamespaceRequest
req.Namespace = api.Namespace{
Name: namespace,
Labels: map[string]string{key: value},
}
req.UpdateMask = &types.FieldMask{
Paths: []string{strings.Join([]string{"labels", key}, ".")},
}
_, err := r.client.Update(ctx, &req)
if err != nil {
return errdefs.FromGRPC(err)
}
return nil
}
func (r *remote) List(ctx context.Context) ([]string, error) {
var req api.ListNamespacesRequest
resp, err := r.client.List(ctx, &req)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
var namespaces []string
for _, ns := range resp.Namespaces {
namespaces = append(namespaces, ns.Name)
}
return namespaces, nil
}
func (r *remote) Delete(ctx context.Context, namespace string) error {
var req api.DeleteNamespaceRequest
req.Name = namespace
_, err := r.client.Delete(ctx, &req)
if err != nil {
return errdefs.FromGRPC(err)
}
return nil
}

View file

@ -0,0 +1,212 @@
package namespaces
import (
"strings"
"github.com/boltdb/bolt"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
api "github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "namespaces",
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*metadata.DB), ic.Events), nil
},
})
}
type service struct {
db *metadata.DB
publisher events.Publisher
}
var _ api.NamespacesServer = &service{}
// NewService returns the GRPC namespaces server
func NewService(db *metadata.DB, publisher events.Publisher) api.NamespacesServer {
return &service{
db: db,
publisher: publisher,
}
}
func (s *service) Register(server *grpc.Server) error {
api.RegisterNamespacesServer(server, s)
return nil
}
func (s *service) Get(ctx context.Context, req *api.GetNamespaceRequest) (*api.GetNamespaceResponse, error) {
var resp api.GetNamespaceResponse
return &resp, s.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error {
labels, err := store.Labels(ctx, req.Name)
if err != nil {
return errdefs.ToGRPC(err)
}
resp.Namespace = api.Namespace{
Name: req.Name,
Labels: labels,
}
return nil
})
}
func (s *service) List(ctx context.Context, req *api.ListNamespacesRequest) (*api.ListNamespacesResponse, error) {
var resp api.ListNamespacesResponse
return &resp, s.withStoreView(ctx, func(ctx context.Context, store namespaces.Store) error {
namespaces, err := store.List(ctx)
if err != nil {
return err
}
for _, namespace := range namespaces {
labels, err := store.Labels(ctx, namespace)
if err != nil {
// In general, this should be unlikely, since we are holding a
// transaction to service this request.
return errdefs.ToGRPC(err)
}
resp.Namespaces = append(resp.Namespaces, api.Namespace{
Name: namespace,
Labels: labels,
})
}
return nil
})
}
func (s *service) Create(ctx context.Context, req *api.CreateNamespaceRequest) (*api.CreateNamespaceResponse, error) {
var resp api.CreateNamespaceResponse
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if err := store.Create(ctx, req.Namespace.Name, req.Namespace.Labels); err != nil {
return errdefs.ToGRPC(err)
}
for k, v := range req.Namespace.Labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil {
return err
}
}
resp.Namespace = req.Namespace
return nil
}); err != nil {
return &resp, err
}
if err := s.publisher.Publish(ctx, "/namespaces/create", &eventsapi.NamespaceCreate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (s *service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) (*api.UpdateNamespaceResponse, error) {
var resp api.UpdateNamespaceResponse
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
for _, path := range req.UpdateMask.Paths {
switch {
case strings.HasPrefix(path, "labels."):
key := strings.TrimPrefix(path, "labels.")
if err := store.SetLabel(ctx, req.Namespace.Name, key, req.Namespace.Labels[key]); err != nil {
return err
}
default:
return grpc.Errorf(codes.InvalidArgument, "cannot update %q field", path)
}
}
} else {
// clear out the existing labels and then set them to the incoming request.
// get current set of labels
labels, err := store.Labels(ctx, req.Namespace.Name)
if err != nil {
return errdefs.ToGRPC(err)
}
for k := range labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, ""); err != nil {
return err
}
}
for k, v := range req.Namespace.Labels {
if err := store.SetLabel(ctx, req.Namespace.Name, k, v); err != nil {
return err
}
}
}
return nil
}); err != nil {
return &resp, err
}
if err := s.publisher.Publish(ctx, "/namespaces/update", &eventsapi.NamespaceUpdate{
Name: req.Namespace.Name,
Labels: req.Namespace.Labels,
}); err != nil {
return &resp, err
}
return &resp, nil
}
func (s *service) Delete(ctx context.Context, req *api.DeleteNamespaceRequest) (*empty.Empty, error) {
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store namespaces.Store) error {
return errdefs.ToGRPC(store.Delete(ctx, req.Name))
}); err != nil {
return &empty.Empty{}, err
}
// set the namespace in the context before publishing the event
ctx = namespaces.WithNamespace(ctx, req.Name)
if err := s.publisher.Publish(ctx, "/namespaces/delete", &eventsapi.NamespaceDelete{
Name: req.Name,
}); err != nil {
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewNamespaceStore(tx)) }
}
func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return s.db.View(s.withStore(ctx, fn))
}
func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error {
return s.db.Update(s.withStore(ctx, fn))
}

View file

@ -20,6 +20,9 @@ const (
KindCommitted
)
// ParseKind parses the provided string into a Kind
//
// If the string cannot be parsed KindUnknown is returned
func ParseKind(s string) Kind {
s = strings.ToLower(s)
switch s {
@ -34,6 +37,7 @@ func ParseKind(s string) Kind {
return KindUnknown
}
// String returns the string representation of the Kind
func (k Kind) String() string {
switch k {
case KindView:
@ -47,10 +51,12 @@ func (k Kind) String() string {
return "Unknown"
}
// MarshalJSON the Kind to JSON
func (k Kind) MarshalJSON() ([]byte, error) {
return json.Marshal(k.String())
}
// UnmarshalJSON the Kind from JSON
func (k *Kind) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
@ -81,6 +87,7 @@ type Usage struct {
Size int64 // provides usage, in bytes, of snapshot
}
// Add the provided usage to the current usage
func (u *Usage) Add(other Usage) {
u.Size += other.Size

View file

@ -11,17 +11,16 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/fs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshot"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/runc/libcontainer/user"
@ -260,19 +259,17 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool
snapshotter = client.SnapshotService(c.Snapshotter)
parent = identity.ChainID(diffIDs).String()
usernsID = fmt.Sprintf("%s-%d-%d", parent, uid, gid)
opt = snapshot.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})
)
if _, err := snapshotter.Stat(ctx, usernsID); err == nil {
if _, err := snapshotter.Prepare(ctx, id, usernsID, opt); err != nil {
if _, err := snapshotter.Prepare(ctx, id, usernsID); err == nil {
c.SnapshotKey = id
c.Image = i.Name()
return nil
} else if !errdefs.IsNotFound(err) {
return err
}
c.SnapshotKey = id
c.Image = i.Name()
return nil
}
mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent, opt)
mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent)
if err != nil {
return err
}
@ -280,13 +277,13 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool
snapshotter.Remove(ctx, usernsID)
return err
}
if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap", opt); err != nil {
if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap"); err != nil {
return err
}
if readonly {
_, err = snapshotter.View(ctx, id, usernsID, opt)
_, err = snapshotter.View(ctx, id, usernsID)
} else {
_, err = snapshotter.Prepare(ctx, id, usernsID, opt)
_, err = snapshotter.Prepare(ctx, id, usernsID)
}
if err != nil {
return err

View file

@ -15,6 +15,7 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// WithImageConfig configures the spec to from the configuration of an Image
func WithImageConfig(i Image) SpecOpts {
return func(ctx context.Context, client *Client, _ *containers.Container, s *specs.Spec) error {
var (
@ -51,6 +52,8 @@ func WithImageConfig(i Image) SpecOpts {
}
}
// WithTTY sets the information on the spec as well as the environment variables for
// using a TTY
func WithTTY(width, height int) SpecOpts {
return func(_ context.Context, _ *Client, _ *containers.Container, s *specs.Spec) error {
s.Process.Terminal = true
@ -63,6 +66,7 @@ func WithTTY(width, height int) SpecOpts {
}
}
// WithResources sets the provided resources on the spec for task updates
func WithResources(resources *specs.WindowsResources) UpdateTaskOpts {
return func(ctx context.Context, client *Client, r *UpdateTaskInfo) error {
r.Resources = resources

View file

@ -151,6 +151,7 @@ func createDefaultSpec(ctx context.Context, id string) (*specs.Spec, error) {
"/proc/timer_stats",
"/proc/sched_debug",
"/sys/firmware",
"/proc/scsi",
},
ReadonlyPaths: []string{
"/proc/asound",

View file

@ -1,5 +1,8 @@
package sys
// SetOOMScore sets the oom score for the process
//
// Not implemented on Windows
func SetOOMScore(pid, score int) error {
return nil
}

View file

@ -1,19 +0,0 @@
// +build solaris
package sys
import (
"errors"
)
//Solaris TODO
// GetSubreaper returns the subreaper setting for the calling process
func GetSubreaper() (int, error) {
return 0, errors.New("osutils GetSubreaper not implemented on Solaris")
}
// SetSubreaper sets the value i as the subreaper setting for the calling process
func SetSubreaper(i int) error {
return errors.New("osutils SetSubreaper not implemented on Solaris")
}

View file

@ -6,14 +6,17 @@ import (
"syscall"
)
// StatAtime returns the access time from a stat struct
func StatAtime(st *syscall.Stat_t) syscall.Timespec {
return st.Atimespec
}
// StatCtime returns the created time from a stat struct
func StatCtime(st *syscall.Stat_t) syscall.Timespec {
return st.Ctimespec
}
// StatMtime returns the modified time from a stat struct
func StatMtime(st *syscall.Stat_t) syscall.Timespec {
return st.Mtimespec
}

View file

@ -6,14 +6,17 @@ import (
"syscall"
)
// StatAtime returns the Atim
func StatAtime(st *syscall.Stat_t) syscall.Timespec {
return st.Atim
}
// StatCtime returns the Ctim
func StatCtime(st *syscall.Stat_t) syscall.Timespec {
return st.Ctim
}
// StatMtime returns the Mtim
func StatMtime(st *syscall.Stat_t) syscall.Timespec {
return st.Mtim
}

View file

@ -18,7 +18,6 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/rootfs"
@ -26,7 +25,6 @@ import (
google_protobuf "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
@ -51,6 +49,7 @@ type Status struct {
ExitTime time.Time
}
// ProcessInfo provides platform specific process information
type ProcessInfo struct {
// Pid is the process ID
Pid uint32
@ -358,6 +357,12 @@ func (t *task) Resize(ctx context.Context, w, h uint32) error {
}
func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Image, error) {
ctx, done, err := t.client.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
request := &tasks.CheckpointTaskRequest{
ContainerID: t.id,
}
@ -391,15 +396,6 @@ func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Imag
index := v1.Index{
Annotations: make(map[string]string),
}
// make sure we clear the gc root labels reguardless of success
var clearRoots []ocispec.Descriptor
defer func() {
for _, r := range append(index.Manifests, clearRoots...) {
if err := clearRootGCLabel(ctx, t.client, r); err != nil {
log.G(ctx).WithError(err).WithField("dgst", r.Digest).Warnf("failed to remove root marker")
}
}
}()
if err := t.checkpointTask(ctx, &index, request); err != nil {
return nil, err
}
@ -418,7 +414,6 @@ func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Imag
if err != nil {
return nil, err
}
clearRoots = append(clearRoots, desc)
im := images.Image{
Name: i.Name,
Target: desc,
@ -534,9 +529,6 @@ func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tas
func (t *task) checkpointRWSnapshot(ctx context.Context, index *v1.Index, snapshotterName string, id string) error {
opts := []diff.Opt{
diff.WithReference(fmt.Sprintf("checkpoint-rw-%s", id)),
diff.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
}),
}
rw, err := rootfs.Diff(ctx, id, t.client.SnapshotService(snapshotterName), t.client.DiffService(), opts...)
if err != nil {
@ -563,9 +555,7 @@ func (t *task) checkpointImage(ctx context.Context, index *v1.Index, image strin
}
func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor, err error) {
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
}
labels := map[string]string{}
for i, m := range index.Manifests {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String()
}
@ -595,9 +585,3 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin
Size: size,
}, nil
}
func clearRootGCLabel(ctx context.Context, client *Client, desc ocispec.Descriptor) error {
info := content.Info{Digest: desc.Digest}
_, err := client.ContentStore().Update(ctx, info, "labels.containerd.io/gc.root")
return err
}

View file

@ -16,14 +16,14 @@ github.com/docker/go-units v0.3.1
github.com/gogo/protobuf d2e1ade2d719b78fe5b061b4c18a9f7111b5bdc8
github.com/golang/protobuf 5a0f697c9ed9d68fef0116532c6e05cfeae00e55
github.com/opencontainers/runtime-spec v1.0.0
github.com/opencontainers/runc 0351df1c5a66838d0c392b4ac4cf9450de844e2d
github.com/opencontainers/runc 74a17296470088de3805e138d3d87c62e613dfc4
github.com/sirupsen/logrus v1.0.0
github.com/containerd/btrfs cc52c4dea2ce11a44e6639e561bb5c2af9ada9e3
github.com/stretchr/testify v1.1.4
github.com/davecgh/go-spew v1.1.0
github.com/pmezard/go-difflib v1.0.0
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/urfave/cli 8ba6f23b6e36d03666a14bd9421f5e3efcb59aca
github.com/urfave/cli 7bc6a0acffa589f415f88aca16cc1de5ffd66f9c
golang.org/x/net 7dcfb8076726a3fdd9353b6b8a1f1b6be6811bd6
google.golang.org/grpc v1.3.0
github.com/pkg/errors v0.8.0

View file

@ -1,2 +1,2 @@
// hcsshimtypes holds the windows runtime specific types
// Package hcsshimtypes holds the windows runtime specific types
package hcsshimtypes