client.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833
  1. /*
  2. Copyright The containerd Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package containerd
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "net/http"
  20. "runtime"
  21. "strconv"
  22. "strings"
  23. "sync"
  24. "time"
  25. containersapi "github.com/containerd/containerd/api/services/containers/v1"
  26. contentapi "github.com/containerd/containerd/api/services/content/v1"
  27. diffapi "github.com/containerd/containerd/api/services/diff/v1"
  28. eventsapi "github.com/containerd/containerd/api/services/events/v1"
  29. imagesapi "github.com/containerd/containerd/api/services/images/v1"
  30. introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
  31. leasesapi "github.com/containerd/containerd/api/services/leases/v1"
  32. namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
  33. snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
  34. "github.com/containerd/containerd/api/services/tasks/v1"
  35. versionservice "github.com/containerd/containerd/api/services/version/v1"
  36. apitypes "github.com/containerd/containerd/api/types"
  37. "github.com/containerd/containerd/containers"
  38. "github.com/containerd/containerd/content"
  39. contentproxy "github.com/containerd/containerd/content/proxy"
  40. "github.com/containerd/containerd/defaults"
  41. "github.com/containerd/containerd/errdefs"
  42. "github.com/containerd/containerd/events"
  43. "github.com/containerd/containerd/images"
  44. "github.com/containerd/containerd/leases"
  45. leasesproxy "github.com/containerd/containerd/leases/proxy"
  46. "github.com/containerd/containerd/namespaces"
  47. "github.com/containerd/containerd/pkg/dialer"
  48. "github.com/containerd/containerd/platforms"
  49. "github.com/containerd/containerd/plugin"
  50. "github.com/containerd/containerd/remotes"
  51. "github.com/containerd/containerd/remotes/docker"
  52. "github.com/containerd/containerd/services/introspection"
  53. "github.com/containerd/containerd/snapshots"
  54. snproxy "github.com/containerd/containerd/snapshots/proxy"
  55. "github.com/containerd/typeurl"
  56. ptypes "github.com/gogo/protobuf/types"
  57. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  58. specs "github.com/opencontainers/runtime-spec/specs-go"
  59. "github.com/pkg/errors"
  60. "golang.org/x/sync/semaphore"
  61. "google.golang.org/grpc"
  62. "google.golang.org/grpc/backoff"
  63. "google.golang.org/grpc/health/grpc_health_v1"
  64. )
  65. func init() {
  66. const prefix = "types.containerd.io"
  67. // register TypeUrls for commonly marshaled external types
  68. major := strconv.Itoa(specs.VersionMajor)
  69. typeurl.Register(&specs.Spec{}, prefix, "opencontainers/runtime-spec", major, "Spec")
  70. typeurl.Register(&specs.Process{}, prefix, "opencontainers/runtime-spec", major, "Process")
  71. typeurl.Register(&specs.LinuxResources{}, prefix, "opencontainers/runtime-spec", major, "LinuxResources")
  72. typeurl.Register(&specs.WindowsResources{}, prefix, "opencontainers/runtime-spec", major, "WindowsResources")
  73. }
  74. // New returns a new containerd client that is connected to the containerd
  75. // instance provided by address
  76. func New(address string, opts ...ClientOpt) (*Client, error) {
  77. var copts clientOpts
  78. for _, o := range opts {
  79. if err := o(&copts); err != nil {
  80. return nil, err
  81. }
  82. }
  83. if copts.timeout == 0 {
  84. copts.timeout = 10 * time.Second
  85. }
  86. c := &Client{
  87. defaultns: copts.defaultns,
  88. }
  89. if copts.defaultRuntime != "" {
  90. c.runtime = copts.defaultRuntime
  91. } else {
  92. c.runtime = defaults.DefaultRuntime
  93. }
  94. if copts.defaultPlatform != nil {
  95. c.platform = copts.defaultPlatform
  96. } else {
  97. c.platform = platforms.Default()
  98. }
  99. if copts.services != nil {
  100. c.services = *copts.services
  101. }
  102. if address != "" {
  103. backoffConfig := backoff.DefaultConfig
  104. backoffConfig.MaxDelay = 3 * time.Second
  105. connParams := grpc.ConnectParams{
  106. Backoff: backoffConfig,
  107. }
  108. gopts := []grpc.DialOption{
  109. grpc.WithBlock(),
  110. grpc.WithInsecure(),
  111. grpc.FailOnNonTempDialError(true),
  112. grpc.WithConnectParams(connParams),
  113. grpc.WithContextDialer(dialer.ContextDialer),
  114. // TODO(stevvooe): We may need to allow configuration of this on the client.
  115. grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(defaults.DefaultMaxRecvMsgSize)),
  116. grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(defaults.DefaultMaxSendMsgSize)),
  117. }
  118. if len(copts.dialOptions) > 0 {
  119. gopts = copts.dialOptions
  120. }
  121. if copts.defaultns != "" {
  122. unary, stream := newNSInterceptors(copts.defaultns)
  123. gopts = append(gopts,
  124. grpc.WithUnaryInterceptor(unary),
  125. grpc.WithStreamInterceptor(stream),
  126. )
  127. }
  128. connector := func() (*grpc.ClientConn, error) {
  129. ctx, cancel := context.WithTimeout(context.Background(), copts.timeout)
  130. defer cancel()
  131. conn, err := grpc.DialContext(ctx, dialer.DialAddress(address), gopts...)
  132. if err != nil {
  133. return nil, errors.Wrapf(err, "failed to dial %q", address)
  134. }
  135. return conn, nil
  136. }
  137. conn, err := connector()
  138. if err != nil {
  139. return nil, err
  140. }
  141. c.conn, c.connector = conn, connector
  142. }
  143. if copts.services == nil && c.conn == nil {
  144. return nil, errors.Wrap(errdefs.ErrUnavailable, "no grpc connection or services is available")
  145. }
  146. // check namespace labels for default runtime
  147. if copts.defaultRuntime == "" && c.defaultns != "" {
  148. if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
  149. return nil, err
  150. } else if label != "" {
  151. c.runtime = label
  152. }
  153. }
  154. return c, nil
  155. }
  156. // NewWithConn returns a new containerd client that is connected to the containerd
  157. // instance provided by the connection
  158. func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) {
  159. var copts clientOpts
  160. for _, o := range opts {
  161. if err := o(&copts); err != nil {
  162. return nil, err
  163. }
  164. }
  165. c := &Client{
  166. defaultns: copts.defaultns,
  167. conn: conn,
  168. runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
  169. }
  170. // check namespace labels for default runtime
  171. if copts.defaultRuntime == "" && c.defaultns != "" {
  172. if label, err := c.GetLabel(context.Background(), defaults.DefaultRuntimeNSLabel); err != nil {
  173. return nil, err
  174. } else if label != "" {
  175. c.runtime = label
  176. }
  177. }
  178. if copts.services != nil {
  179. c.services = *copts.services
  180. }
  181. return c, nil
  182. }
  183. // Client is the client to interact with containerd and its various services
  184. // using a uniform interface
  185. type Client struct {
  186. services
  187. connMu sync.Mutex
  188. conn *grpc.ClientConn
  189. runtime string
  190. defaultns string
  191. platform platforms.MatchComparer
  192. connector func() (*grpc.ClientConn, error)
  193. }
  194. // Reconnect re-establishes the GRPC connection to the containerd daemon
  195. func (c *Client) Reconnect() error {
  196. if c.connector == nil {
  197. return errors.Wrap(errdefs.ErrUnavailable, "unable to reconnect to containerd, no connector available")
  198. }
  199. c.connMu.Lock()
  200. defer c.connMu.Unlock()
  201. c.conn.Close()
  202. conn, err := c.connector()
  203. if err != nil {
  204. return err
  205. }
  206. c.conn = conn
  207. return nil
  208. }
  209. // Runtime returns the name of the runtime being used
  210. func (c *Client) Runtime() string {
  211. return c.runtime
  212. }
  213. // IsServing returns true if the client can successfully connect to the
  214. // containerd daemon and the healthcheck service returns the SERVING
  215. // response.
  216. // This call will block if a transient error is encountered during
  217. // connection. A timeout can be set in the context to ensure it returns
  218. // early.
  219. func (c *Client) IsServing(ctx context.Context) (bool, error) {
  220. c.connMu.Lock()
  221. if c.conn == nil {
  222. c.connMu.Unlock()
  223. return false, errors.Wrap(errdefs.ErrUnavailable, "no grpc connection available")
  224. }
  225. c.connMu.Unlock()
  226. r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.WaitForReady(true))
  227. if err != nil {
  228. return false, err
  229. }
  230. return r.Status == grpc_health_v1.HealthCheckResponse_SERVING, nil
  231. }
  232. // Containers returns all containers created in containerd
  233. func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container, error) {
  234. r, err := c.ContainerService().List(ctx, filters...)
  235. if err != nil {
  236. return nil, err
  237. }
  238. var out []Container
  239. for _, container := range r {
  240. out = append(out, containerFromRecord(c, container))
  241. }
  242. return out, nil
  243. }
  244. // NewContainer will create a new container in container with the provided id
  245. // the id must be unique within the namespace
  246. func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
  247. ctx, done, err := c.WithLease(ctx)
  248. if err != nil {
  249. return nil, err
  250. }
  251. defer done(ctx)
  252. container := containers.Container{
  253. ID: id,
  254. Runtime: containers.RuntimeInfo{
  255. Name: c.runtime,
  256. },
  257. }
  258. for _, o := range opts {
  259. if err := o(ctx, c, &container); err != nil {
  260. return nil, err
  261. }
  262. }
  263. r, err := c.ContainerService().Create(ctx, container)
  264. if err != nil {
  265. return nil, err
  266. }
  267. return containerFromRecord(c, r), nil
  268. }
  269. // LoadContainer loads an existing container from metadata
  270. func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) {
  271. r, err := c.ContainerService().Get(ctx, id)
  272. if err != nil {
  273. return nil, err
  274. }
  275. return containerFromRecord(c, r), nil
  276. }
  277. // RemoteContext is used to configure object resolutions and transfers with
  278. // remote content stores and image providers.
  279. type RemoteContext struct {
  280. // Resolver is used to resolve names to objects, fetchers, and pushers.
  281. // If no resolver is provided, defaults to Docker registry resolver.
  282. Resolver remotes.Resolver
  283. // PlatformMatcher is used to match the platforms for an image
  284. // operation and define the preference when a single match is required
  285. // from multiple platforms.
  286. PlatformMatcher platforms.MatchComparer
  287. // Unpack is done after an image is pulled to extract into a snapshotter.
  288. // It is done simultaneously for schema 2 images when they are pulled.
  289. // If an image is not unpacked on pull, it can be unpacked any time
  290. // afterwards. Unpacking is required to run an image.
  291. Unpack bool
  292. // UnpackOpts handles options to the unpack call.
  293. UnpackOpts []UnpackOpt
  294. // Snapshotter used for unpacking
  295. Snapshotter string
  296. // SnapshotterOpts are additional options to be passed to a snapshotter during pull
  297. SnapshotterOpts []snapshots.Opt
  298. // Labels to be applied to the created image
  299. Labels map[string]string
  300. // BaseHandlers are a set of handlers which get are called on dispatch.
  301. // These handlers always get called before any operation specific
  302. // handlers.
  303. BaseHandlers []images.Handler
  304. // HandlerWrapper wraps the handler which gets sent to dispatch.
  305. // Unlike BaseHandlers, this can run before and after the built
  306. // in handlers, allowing operations to run on the descriptor
  307. // after it has completed transferring.
  308. HandlerWrapper func(images.Handler) images.Handler
  309. // ConvertSchema1 is whether to convert Docker registry schema 1
  310. // manifests. If this option is false then any image which resolves
  311. // to schema 1 will return an error since schema 1 is not supported.
  312. ConvertSchema1 bool
  313. // Platforms defines which platforms to handle when doing the image operation.
  314. // Platforms is ignored when a PlatformMatcher is set, otherwise the
  315. // platforms will be used to create a PlatformMatcher with no ordering
  316. // preference.
  317. Platforms []string
  318. // MaxConcurrentDownloads is the max concurrent content downloads for each pull.
  319. MaxConcurrentDownloads int
  320. // MaxConcurrentUploadedLayers is the max concurrent uploaded layers for each push.
  321. MaxConcurrentUploadedLayers int
  322. // AllMetadata downloads all manifests and known-configuration files
  323. AllMetadata bool
  324. // ChildLabelMap sets the labels used to reference child objects in the content
  325. // store. By default, all GC reference labels will be set for all fetched content.
  326. ChildLabelMap func(ocispec.Descriptor) []string
  327. }
  328. func defaultRemoteContext() *RemoteContext {
  329. return &RemoteContext{
  330. Resolver: docker.NewResolver(docker.ResolverOptions{
  331. Client: http.DefaultClient,
  332. }),
  333. }
  334. }
  335. // Fetch downloads the provided content into containerd's content store
  336. // and returns a non-platform specific image reference
  337. func (c *Client) Fetch(ctx context.Context, ref string, opts ...RemoteOpt) (images.Image, error) {
  338. fetchCtx := defaultRemoteContext()
  339. for _, o := range opts {
  340. if err := o(c, fetchCtx); err != nil {
  341. return images.Image{}, err
  342. }
  343. }
  344. if fetchCtx.Unpack {
  345. return images.Image{}, errors.Wrap(errdefs.ErrNotImplemented, "unpack on fetch not supported, try pull")
  346. }
  347. if fetchCtx.PlatformMatcher == nil {
  348. if len(fetchCtx.Platforms) == 0 {
  349. fetchCtx.PlatformMatcher = platforms.All
  350. } else {
  351. var ps []ocispec.Platform
  352. for _, s := range fetchCtx.Platforms {
  353. p, err := platforms.Parse(s)
  354. if err != nil {
  355. return images.Image{}, errors.Wrapf(err, "invalid platform %s", s)
  356. }
  357. ps = append(ps, p)
  358. }
  359. fetchCtx.PlatformMatcher = platforms.Any(ps...)
  360. }
  361. }
  362. ctx, done, err := c.WithLease(ctx)
  363. if err != nil {
  364. return images.Image{}, err
  365. }
  366. defer done(ctx)
  367. img, err := c.fetch(ctx, fetchCtx, ref, 0)
  368. if err != nil {
  369. return images.Image{}, err
  370. }
  371. return c.createNewImage(ctx, img)
  372. }
  373. // Push uploads the provided content to a remote resource
  374. func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error {
  375. pushCtx := defaultRemoteContext()
  376. for _, o := range opts {
  377. if err := o(c, pushCtx); err != nil {
  378. return err
  379. }
  380. }
  381. if pushCtx.PlatformMatcher == nil {
  382. if len(pushCtx.Platforms) > 0 {
  383. var ps []ocispec.Platform
  384. for _, platform := range pushCtx.Platforms {
  385. p, err := platforms.Parse(platform)
  386. if err != nil {
  387. return errors.Wrapf(err, "invalid platform %s", platform)
  388. }
  389. ps = append(ps, p)
  390. }
  391. pushCtx.PlatformMatcher = platforms.Any(ps...)
  392. } else {
  393. pushCtx.PlatformMatcher = platforms.All
  394. }
  395. }
  396. // Annotate ref with digest to push only push tag for single digest
  397. if !strings.Contains(ref, "@") {
  398. ref = ref + "@" + desc.Digest.String()
  399. }
  400. pusher, err := pushCtx.Resolver.Pusher(ctx, ref)
  401. if err != nil {
  402. return err
  403. }
  404. var wrapper func(images.Handler) images.Handler
  405. if len(pushCtx.BaseHandlers) > 0 {
  406. wrapper = func(h images.Handler) images.Handler {
  407. h = images.Handlers(append(pushCtx.BaseHandlers, h)...)
  408. if pushCtx.HandlerWrapper != nil {
  409. h = pushCtx.HandlerWrapper(h)
  410. }
  411. return h
  412. }
  413. } else if pushCtx.HandlerWrapper != nil {
  414. wrapper = pushCtx.HandlerWrapper
  415. }
  416. var limiter *semaphore.Weighted
  417. if pushCtx.MaxConcurrentUploadedLayers > 0 {
  418. limiter = semaphore.NewWeighted(int64(pushCtx.MaxConcurrentUploadedLayers))
  419. }
  420. return remotes.PushContent(ctx, pusher, desc, c.ContentStore(), limiter, pushCtx.PlatformMatcher, wrapper)
  421. }
  422. // GetImage returns an existing image
  423. func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
  424. i, err := c.ImageService().Get(ctx, ref)
  425. if err != nil {
  426. return nil, err
  427. }
  428. return NewImage(c, i), nil
  429. }
  430. // ListImages returns all existing images
  431. func (c *Client) ListImages(ctx context.Context, filters ...string) ([]Image, error) {
  432. imgs, err := c.ImageService().List(ctx, filters...)
  433. if err != nil {
  434. return nil, err
  435. }
  436. images := make([]Image, len(imgs))
  437. for i, img := range imgs {
  438. images[i] = NewImage(c, img)
  439. }
  440. return images, nil
  441. }
  442. // Restore restores a container from a checkpoint
  443. func (c *Client) Restore(ctx context.Context, id string, checkpoint Image, opts ...RestoreOpts) (Container, error) {
  444. store := c.ContentStore()
  445. index, err := decodeIndex(ctx, store, checkpoint.Target())
  446. if err != nil {
  447. return nil, err
  448. }
  449. ctx, done, err := c.WithLease(ctx)
  450. if err != nil {
  451. return nil, err
  452. }
  453. defer done(ctx)
  454. copts := []NewContainerOpts{}
  455. for _, o := range opts {
  456. copts = append(copts, o(ctx, id, c, checkpoint, index))
  457. }
  458. ctr, err := c.NewContainer(ctx, id, copts...)
  459. if err != nil {
  460. return nil, err
  461. }
  462. return ctr, nil
  463. }
  464. func writeIndex(ctx context.Context, index *ocispec.Index, client *Client, ref string) (d ocispec.Descriptor, err error) {
  465. labels := map[string]string{}
  466. for i, m := range index.Manifests {
  467. labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String()
  468. }
  469. data, err := json.Marshal(index)
  470. if err != nil {
  471. return ocispec.Descriptor{}, err
  472. }
  473. return writeContent(ctx, client.ContentStore(), ocispec.MediaTypeImageIndex, ref, bytes.NewReader(data), content.WithLabels(labels))
  474. }
  475. // GetLabel gets a label value from namespace store
  476. // If there is no default label, an empty string returned with nil error
  477. func (c *Client) GetLabel(ctx context.Context, label string) (string, error) {
  478. ns, err := namespaces.NamespaceRequired(ctx)
  479. if err != nil {
  480. if c.defaultns == "" {
  481. return "", err
  482. }
  483. ns = c.defaultns
  484. }
  485. srv := c.NamespaceService()
  486. labels, err := srv.Labels(ctx, ns)
  487. if err != nil {
  488. return "", err
  489. }
  490. value := labels[label]
  491. return value, nil
  492. }
  493. // Subscribe to events that match one or more of the provided filters.
  494. //
  495. // Callers should listen on both the envelope and errs channels. If the errs
  496. // channel returns nil or an error, the subscriber should terminate.
  497. //
  498. // The subscriber can stop receiving events by canceling the provided context.
  499. // The errs channel will be closed and return a nil error.
  500. func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *events.Envelope, errs <-chan error) {
  501. return c.EventService().Subscribe(ctx, filters...)
  502. }
  503. // Close closes the clients connection to containerd
  504. func (c *Client) Close() error {
  505. c.connMu.Lock()
  506. defer c.connMu.Unlock()
  507. if c.conn != nil {
  508. return c.conn.Close()
  509. }
  510. return nil
  511. }
  512. // NamespaceService returns the underlying Namespaces Store
  513. func (c *Client) NamespaceService() namespaces.Store {
  514. if c.namespaceStore != nil {
  515. return c.namespaceStore
  516. }
  517. c.connMu.Lock()
  518. defer c.connMu.Unlock()
  519. return NewNamespaceStoreFromClient(namespacesapi.NewNamespacesClient(c.conn))
  520. }
  521. // ContainerService returns the underlying container Store
  522. func (c *Client) ContainerService() containers.Store {
  523. if c.containerStore != nil {
  524. return c.containerStore
  525. }
  526. c.connMu.Lock()
  527. defer c.connMu.Unlock()
  528. return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
  529. }
  530. // ContentStore returns the underlying content Store
  531. func (c *Client) ContentStore() content.Store {
  532. if c.contentStore != nil {
  533. return c.contentStore
  534. }
  535. c.connMu.Lock()
  536. defer c.connMu.Unlock()
  537. return contentproxy.NewContentStore(contentapi.NewContentClient(c.conn))
  538. }
  539. // SnapshotService returns the underlying snapshotter for the provided snapshotter name
  540. func (c *Client) SnapshotService(snapshotterName string) snapshots.Snapshotter {
  541. snapshotterName, err := c.resolveSnapshotterName(context.Background(), snapshotterName)
  542. if err != nil {
  543. snapshotterName = DefaultSnapshotter
  544. }
  545. if c.snapshotters != nil {
  546. return c.snapshotters[snapshotterName]
  547. }
  548. c.connMu.Lock()
  549. defer c.connMu.Unlock()
  550. return snproxy.NewSnapshotter(snapshotsapi.NewSnapshotsClient(c.conn), snapshotterName)
  551. }
  552. // TaskService returns the underlying TasksClient
  553. func (c *Client) TaskService() tasks.TasksClient {
  554. if c.taskService != nil {
  555. return c.taskService
  556. }
  557. c.connMu.Lock()
  558. defer c.connMu.Unlock()
  559. return tasks.NewTasksClient(c.conn)
  560. }
  561. // ImageService returns the underlying image Store
  562. func (c *Client) ImageService() images.Store {
  563. if c.imageStore != nil {
  564. return c.imageStore
  565. }
  566. c.connMu.Lock()
  567. defer c.connMu.Unlock()
  568. return NewImageStoreFromClient(imagesapi.NewImagesClient(c.conn))
  569. }
  570. // DiffService returns the underlying Differ
  571. func (c *Client) DiffService() DiffService {
  572. if c.diffService != nil {
  573. return c.diffService
  574. }
  575. c.connMu.Lock()
  576. defer c.connMu.Unlock()
  577. return NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
  578. }
  579. // IntrospectionService returns the underlying Introspection Client
  580. func (c *Client) IntrospectionService() introspection.Service {
  581. if c.introspectionService != nil {
  582. return c.introspectionService
  583. }
  584. c.connMu.Lock()
  585. defer c.connMu.Unlock()
  586. return introspection.NewIntrospectionServiceFromClient(introspectionapi.NewIntrospectionClient(c.conn))
  587. }
  588. // LeasesService returns the underlying Leases Client
  589. func (c *Client) LeasesService() leases.Manager {
  590. if c.leasesService != nil {
  591. return c.leasesService
  592. }
  593. c.connMu.Lock()
  594. defer c.connMu.Unlock()
  595. return leasesproxy.NewLeaseManager(leasesapi.NewLeasesClient(c.conn))
  596. }
  597. // HealthService returns the underlying GRPC HealthClient
  598. func (c *Client) HealthService() grpc_health_v1.HealthClient {
  599. c.connMu.Lock()
  600. defer c.connMu.Unlock()
  601. return grpc_health_v1.NewHealthClient(c.conn)
  602. }
  603. // EventService returns the underlying event service
  604. func (c *Client) EventService() EventService {
  605. if c.eventService != nil {
  606. return c.eventService
  607. }
  608. c.connMu.Lock()
  609. defer c.connMu.Unlock()
  610. return NewEventServiceFromClient(eventsapi.NewEventsClient(c.conn))
  611. }
  612. // VersionService returns the underlying VersionClient
  613. func (c *Client) VersionService() versionservice.VersionClient {
  614. c.connMu.Lock()
  615. defer c.connMu.Unlock()
  616. return versionservice.NewVersionClient(c.conn)
  617. }
  618. // Conn returns the underlying GRPC connection object
  619. func (c *Client) Conn() *grpc.ClientConn {
  620. c.connMu.Lock()
  621. defer c.connMu.Unlock()
  622. return c.conn
  623. }
  // Version describes the containerd build a client is connected to.
  type Version struct {
  	// Version is the version number.
  	Version string
  	// Revision is the git revision that was built.
  	Revision string
  }
  631. // Version returns the version of containerd that the client is connected to
  632. func (c *Client) Version(ctx context.Context) (Version, error) {
  633. c.connMu.Lock()
  634. if c.conn == nil {
  635. c.connMu.Unlock()
  636. return Version{}, errors.Wrap(errdefs.ErrUnavailable, "no grpc connection available")
  637. }
  638. c.connMu.Unlock()
  639. response, err := c.VersionService().Version(ctx, &ptypes.Empty{})
  640. if err != nil {
  641. return Version{}, err
  642. }
  643. return Version{
  644. Version: response.Version,
  645. Revision: response.Revision,
  646. }, nil
  647. }
  // ServerInfo represents the introspected server information.
  type ServerInfo struct {
  	// UUID is the UUID reported by the introspection service.
  	UUID string
  }
  652. // Server returns server information from the introspection service
  653. func (c *Client) Server(ctx context.Context) (ServerInfo, error) {
  654. c.connMu.Lock()
  655. if c.conn == nil {
  656. c.connMu.Unlock()
  657. return ServerInfo{}, errors.Wrap(errdefs.ErrUnavailable, "no grpc connection available")
  658. }
  659. c.connMu.Unlock()
  660. response, err := c.IntrospectionService().Server(ctx, &ptypes.Empty{})
  661. if err != nil {
  662. return ServerInfo{}, err
  663. }
  664. return ServerInfo{
  665. UUID: response.UUID,
  666. }, nil
  667. }
  668. func (c *Client) resolveSnapshotterName(ctx context.Context, name string) (string, error) {
  669. if name == "" {
  670. label, err := c.GetLabel(ctx, defaults.DefaultSnapshotterNSLabel)
  671. if err != nil {
  672. return "", err
  673. }
  674. if label != "" {
  675. name = label
  676. } else {
  677. name = DefaultSnapshotter
  678. }
  679. }
  680. return name, nil
  681. }
  682. func (c *Client) getSnapshotter(ctx context.Context, name string) (snapshots.Snapshotter, error) {
  683. name, err := c.resolveSnapshotterName(ctx, name)
  684. if err != nil {
  685. return nil, err
  686. }
  687. s := c.SnapshotService(name)
  688. if s == nil {
  689. return nil, errors.Wrapf(errdefs.ErrNotFound, "snapshotter %s was not found", name)
  690. }
  691. return s, nil
  692. }
  // CheckRuntime returns true if the current runtime matches the expected
  // runtime. Both names are split on "." and compared part by part, so
  // expected may be a prefix of current: providing only some leading parts of
  // the runtime schema matches those parts of current.
  //
  // It returns false when expected has more parts than current, or when any
  // compared part differs.
  func CheckRuntime(current, expected string) bool {
  	cp := strings.Split(current, ".")
  	for i, p := range strings.Split(expected, ".") {
  		// i == len(cp) is already out of range, so the bound check must be
  		// >= rather than >; otherwise cp[i] panics when expected is longer.
  		if i >= len(cp) || p != cp[i] {
  			return false
  		}
  	}
  	return true
  }
  709. // GetSnapshotterSupportedPlatforms returns a platform matchers which represents the
  710. // supported platforms for the given snapshotters
  711. func (c *Client) GetSnapshotterSupportedPlatforms(ctx context.Context, snapshotterName string) (platforms.MatchComparer, error) {
  712. filters := []string{fmt.Sprintf("type==%s, id==%s", plugin.SnapshotPlugin, snapshotterName)}
  713. in := c.IntrospectionService()
  714. resp, err := in.Plugins(ctx, filters)
  715. if err != nil {
  716. return nil, err
  717. }
  718. if len(resp.Plugins) <= 0 {
  719. return nil, fmt.Errorf("inspection service could not find snapshotter %s plugin", snapshotterName)
  720. }
  721. sn := resp.Plugins[0]
  722. snPlatforms := toPlatforms(sn.Platforms)
  723. return platforms.Any(snPlatforms...), nil
  724. }
  725. func toPlatforms(pt []apitypes.Platform) []ocispec.Platform {
  726. platforms := make([]ocispec.Platform, len(pt))
  727. for i, p := range pt {
  728. platforms[i] = ocispec.Platform{
  729. Architecture: p.Architecture,
  730. OS: p.OS,
  731. Variant: p.Variant,
  732. }
  733. }
  734. return platforms
  735. }