client.go 18 KB


  1. package containerd
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "runtime"
  8. "strconv"
  9. "sync"
  10. "time"
  11. containersapi "github.com/containerd/containerd/api/services/containers/v1"
  12. contentapi "github.com/containerd/containerd/api/services/content/v1"
  13. diffapi "github.com/containerd/containerd/api/services/diff/v1"
  14. eventsapi "github.com/containerd/containerd/api/services/events/v1"
  15. imagesapi "github.com/containerd/containerd/api/services/images/v1"
  16. introspectionapi "github.com/containerd/containerd/api/services/introspection/v1"
  17. namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
  18. snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
  19. "github.com/containerd/containerd/api/services/tasks/v1"
  20. versionservice "github.com/containerd/containerd/api/services/version/v1"
  21. "github.com/containerd/containerd/containers"
  22. "github.com/containerd/containerd/content"
  23. "github.com/containerd/containerd/dialer"
  24. "github.com/containerd/containerd/diff"
  25. "github.com/containerd/containerd/errdefs"
  26. "github.com/containerd/containerd/images"
  27. "github.com/containerd/containerd/namespaces"
  28. "github.com/containerd/containerd/platforms"
  29. "github.com/containerd/containerd/plugin"
  30. "github.com/containerd/containerd/reference"
  31. "github.com/containerd/containerd/remotes"
  32. "github.com/containerd/containerd/remotes/docker"
  33. "github.com/containerd/containerd/remotes/docker/schema1"
  34. contentservice "github.com/containerd/containerd/services/content"
  35. diffservice "github.com/containerd/containerd/services/diff"
  36. imagesservice "github.com/containerd/containerd/services/images"
  37. namespacesservice "github.com/containerd/containerd/services/namespaces"
  38. snapshotservice "github.com/containerd/containerd/services/snapshot"
  39. "github.com/containerd/containerd/snapshot"
  40. "github.com/containerd/typeurl"
  41. pempty "github.com/golang/protobuf/ptypes/empty"
  42. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  43. specs "github.com/opencontainers/runtime-spec/specs-go"
  44. "github.com/pkg/errors"
  45. "google.golang.org/grpc"
  46. "google.golang.org/grpc/health/grpc_health_v1"
  47. )
  48. func init() {
  49. const prefix = "types.containerd.io"
  50. // register TypeUrls for commonly marshaled external types
  51. major := strconv.Itoa(specs.VersionMajor)
  52. typeurl.Register(&specs.Spec{}, prefix, "opencontainers/runtime-spec", major, "Spec")
  53. typeurl.Register(&specs.Process{}, prefix, "opencontainers/runtime-spec", major, "Process")
  54. typeurl.Register(&specs.LinuxResources{}, prefix, "opencontainers/runtime-spec", major, "LinuxResources")
  55. typeurl.Register(&specs.WindowsResources{}, prefix, "opencontainers/runtime-spec", major, "WindowsResources")
  56. }
  57. // New returns a new containerd client that is connected to the containerd
  58. // instance provided by address
  59. func New(address string, opts ...ClientOpt) (*Client, error) {
  60. var copts clientOpts
  61. for _, o := range opts {
  62. if err := o(&copts); err != nil {
  63. return nil, err
  64. }
  65. }
  66. gopts := []grpc.DialOption{
  67. grpc.WithBlock(),
  68. grpc.WithInsecure(),
  69. grpc.WithTimeout(60 * time.Second),
  70. grpc.FailOnNonTempDialError(true),
  71. grpc.WithBackoffMaxDelay(3 * time.Second),
  72. grpc.WithDialer(dialer.Dialer),
  73. }
  74. if len(copts.dialOptions) > 0 {
  75. gopts = copts.dialOptions
  76. }
  77. if copts.defaultns != "" {
  78. unary, stream := newNSInterceptors(copts.defaultns)
  79. gopts = append(gopts,
  80. grpc.WithUnaryInterceptor(unary),
  81. grpc.WithStreamInterceptor(stream),
  82. )
  83. }
  84. conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
  85. if err != nil {
  86. return nil, errors.Wrapf(err, "failed to dial %q", address)
  87. }
  88. return NewWithConn(conn, opts...)
  89. }
  90. // NewWithConn returns a new containerd client that is connected to the containerd
  91. // instance provided by the connection
  92. func NewWithConn(conn *grpc.ClientConn, opts ...ClientOpt) (*Client, error) {
  93. return &Client{
  94. conn: conn,
  95. runtime: fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtime.GOOS),
  96. }, nil
  97. }
  98. // Client is the client to interact with containerd and its various services
  99. // using a uniform interface
  100. type Client struct {
  101. conn *grpc.ClientConn
  102. runtime string
  103. }
  104. // IsServing returns true if the client can successfully connect to the
  105. // containerd daemon and the healthcheck service returns the SERVING
  106. // response.
  107. // This call will block if a transient error is encountered during
  108. // connection. A timeout can be set in the context to ensure it returns
  109. // early.
  110. func (c *Client) IsServing(ctx context.Context) (bool, error) {
  111. r, err := c.HealthService().Check(ctx, &grpc_health_v1.HealthCheckRequest{}, grpc.FailFast(false))
  112. if err != nil {
  113. return false, err
  114. }
  115. return r.Status == grpc_health_v1.HealthCheckResponse_SERVING, nil
  116. }
  117. // Containers returns all containers created in containerd
  118. func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container, error) {
  119. r, err := c.ContainerService().List(ctx, filters...)
  120. if err != nil {
  121. return nil, err
  122. }
  123. var out []Container
  124. for _, container := range r {
  125. out = append(out, containerFromRecord(c, container))
  126. }
  127. return out, nil
  128. }
  129. // NewContainer will create a new container in container with the provided id
  130. // the id must be unique within the namespace
  131. func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
  132. ctx, done, err := c.withLease(ctx)
  133. if err != nil {
  134. return nil, err
  135. }
  136. defer done()
  137. container := containers.Container{
  138. ID: id,
  139. Runtime: containers.RuntimeInfo{
  140. Name: c.runtime,
  141. },
  142. }
  143. for _, o := range opts {
  144. if err := o(ctx, c, &container); err != nil {
  145. return nil, err
  146. }
  147. }
  148. r, err := c.ContainerService().Create(ctx, container)
  149. if err != nil {
  150. return nil, err
  151. }
  152. return containerFromRecord(c, r), nil
  153. }
  154. // LoadContainer loads an existing container from metadata
  155. func (c *Client) LoadContainer(ctx context.Context, id string) (Container, error) {
  156. r, err := c.ContainerService().Get(ctx, id)
  157. if err != nil {
  158. return nil, err
  159. }
  160. return containerFromRecord(c, r), nil
  161. }
  162. // RemoteContext is used to configure object resolutions and transfers with
  163. // remote content stores and image providers.
  164. type RemoteContext struct {
  165. // Resolver is used to resolve names to objects, fetchers, and pushers.
  166. // If no resolver is provided, defaults to Docker registry resolver.
  167. Resolver remotes.Resolver
  168. // Unpack is done after an image is pulled to extract into a snapshotter.
  169. // If an image is not unpacked on pull, it can be unpacked any time
  170. // afterwards. Unpacking is required to run an image.
  171. Unpack bool
  172. // Snapshotter used for unpacking
  173. Snapshotter string
  174. // Labels to be applied to the created image
  175. Labels map[string]string
  176. // BaseHandlers are a set of handlers which get are called on dispatch.
  177. // These handlers always get called before any operation specific
  178. // handlers.
  179. BaseHandlers []images.Handler
  180. // ConvertSchema1 is whether to convert Docker registry schema 1
  181. // manifests. If this option is false then any image which resolves
  182. // to schema 1 will return an error since schema 1 is not supported.
  183. ConvertSchema1 bool
  184. }
  185. func defaultRemoteContext() *RemoteContext {
  186. return &RemoteContext{
  187. Resolver: docker.NewResolver(docker.ResolverOptions{
  188. Client: http.DefaultClient,
  189. }),
  190. Snapshotter: DefaultSnapshotter,
  191. }
  192. }
  193. // Pull downloads the provided content into containerd's content store
  194. func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image, error) {
  195. pullCtx := defaultRemoteContext()
  196. for _, o := range opts {
  197. if err := o(c, pullCtx); err != nil {
  198. return nil, err
  199. }
  200. }
  201. store := c.ContentStore()
  202. ctx, done, err := c.withLease(ctx)
  203. if err != nil {
  204. return nil, err
  205. }
  206. defer done()
  207. name, desc, err := pullCtx.Resolver.Resolve(ctx, ref)
  208. if err != nil {
  209. return nil, err
  210. }
  211. fetcher, err := pullCtx.Resolver.Fetcher(ctx, name)
  212. if err != nil {
  213. return nil, err
  214. }
  215. var (
  216. schema1Converter *schema1.Converter
  217. handler images.Handler
  218. )
  219. if desc.MediaType == images.MediaTypeDockerSchema1Manifest && pullCtx.ConvertSchema1 {
  220. schema1Converter = schema1.NewConverter(store, fetcher)
  221. handler = images.Handlers(append(pullCtx.BaseHandlers, schema1Converter)...)
  222. } else {
  223. handler = images.Handlers(append(pullCtx.BaseHandlers,
  224. remotes.FetchHandler(store, fetcher),
  225. images.ChildrenHandler(store, platforms.Default()))...,
  226. )
  227. }
  228. if err := images.Dispatch(ctx, handler, desc); err != nil {
  229. return nil, err
  230. }
  231. if schema1Converter != nil {
  232. desc, err = schema1Converter.Convert(ctx)
  233. if err != nil {
  234. return nil, err
  235. }
  236. }
  237. imgrec := images.Image{
  238. Name: name,
  239. Target: desc,
  240. Labels: pullCtx.Labels,
  241. }
  242. is := c.ImageService()
  243. if created, err := is.Create(ctx, imgrec); err != nil {
  244. if !errdefs.IsAlreadyExists(err) {
  245. return nil, err
  246. }
  247. updated, err := is.Update(ctx, imgrec)
  248. if err != nil {
  249. return nil, err
  250. }
  251. imgrec = updated
  252. } else {
  253. imgrec = created
  254. }
  255. img := &image{
  256. client: c,
  257. i: imgrec,
  258. }
  259. if pullCtx.Unpack {
  260. if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil {
  261. return nil, err
  262. }
  263. }
  264. return img, nil
  265. }
  266. // Push uploads the provided content to a remote resource
  267. func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpt) error {
  268. pushCtx := defaultRemoteContext()
  269. for _, o := range opts {
  270. if err := o(c, pushCtx); err != nil {
  271. return err
  272. }
  273. }
  274. pusher, err := pushCtx.Resolver.Pusher(ctx, ref)
  275. if err != nil {
  276. return err
  277. }
  278. var m sync.Mutex
  279. manifestStack := []ocispec.Descriptor{}
  280. filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
  281. switch desc.MediaType {
  282. case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
  283. images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
  284. m.Lock()
  285. manifestStack = append(manifestStack, desc)
  286. m.Unlock()
  287. return nil, images.ErrStopHandler
  288. default:
  289. return nil, nil
  290. }
  291. })
  292. cs := c.ContentStore()
  293. pushHandler := remotes.PushHandler(cs, pusher)
  294. handlers := append(pushCtx.BaseHandlers,
  295. images.ChildrenHandler(cs, platforms.Default()),
  296. filterHandler,
  297. pushHandler,
  298. )
  299. if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
  300. return err
  301. }
  302. // Iterate in reverse order as seen, parent always uploaded after child
  303. for i := len(manifestStack) - 1; i >= 0; i-- {
  304. _, err := pushHandler(ctx, manifestStack[i])
  305. if err != nil {
  306. return err
  307. }
  308. }
  309. return nil
  310. }
  311. // GetImage returns an existing image
  312. func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
  313. i, err := c.ImageService().Get(ctx, ref)
  314. if err != nil {
  315. return nil, err
  316. }
  317. return &image{
  318. client: c,
  319. i: i,
  320. }, nil
  321. }
  322. // ListImages returns all existing images
  323. func (c *Client) ListImages(ctx context.Context, filters ...string) ([]Image, error) {
  324. imgs, err := c.ImageService().List(ctx, filters...)
  325. if err != nil {
  326. return nil, err
  327. }
  328. images := make([]Image, len(imgs))
  329. for i, img := range imgs {
  330. images[i] = &image{
  331. client: c,
  332. i: img,
  333. }
  334. }
  335. return images, nil
  336. }
  337. // Subscribe to events that match one or more of the provided filters.
  338. //
  339. // Callers should listen on both the envelope channel and errs channel. If the
  340. // errs channel returns nil or an error, the subscriber should terminate.
  341. //
  342. // To cancel shutdown reciept of events, cancel the provided context. The errs
  343. // channel will be closed and return a nil error.
  344. func (c *Client) Subscribe(ctx context.Context, filters ...string) (ch <-chan *eventsapi.Envelope, errs <-chan error) {
  345. var (
  346. evq = make(chan *eventsapi.Envelope)
  347. errq = make(chan error, 1)
  348. )
  349. errs = errq
  350. ch = evq
  351. session, err := c.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{
  352. Filters: filters,
  353. })
  354. if err != nil {
  355. errq <- err
  356. close(errq)
  357. return
  358. }
  359. go func() {
  360. defer close(errq)
  361. for {
  362. ev, err := session.Recv()
  363. if err != nil {
  364. errq <- err
  365. return
  366. }
  367. select {
  368. case evq <- ev:
  369. case <-ctx.Done():
  370. return
  371. }
  372. }
  373. }()
  374. return ch, errs
  375. }
  376. // Close closes the clients connection to containerd
  377. func (c *Client) Close() error {
  378. return c.conn.Close()
  379. }
  380. // NamespaceService returns the underlying Namespaces Store
  381. func (c *Client) NamespaceService() namespaces.Store {
  382. return namespacesservice.NewStoreFromClient(namespacesapi.NewNamespacesClient(c.conn))
  383. }
  384. // ContainerService returns the underlying container Store
  385. func (c *Client) ContainerService() containers.Store {
  386. return NewRemoteContainerStore(containersapi.NewContainersClient(c.conn))
  387. }
  388. // ContentStore returns the underlying content Store
  389. func (c *Client) ContentStore() content.Store {
  390. return contentservice.NewStoreFromClient(contentapi.NewContentClient(c.conn))
  391. }
  392. // SnapshotService returns the underlying snapshotter for the provided snapshotter name
  393. func (c *Client) SnapshotService(snapshotterName string) snapshot.Snapshotter {
  394. return snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotsClient(c.conn), snapshotterName)
  395. }
  396. // TaskService returns the underlying TasksClient
  397. func (c *Client) TaskService() tasks.TasksClient {
  398. return tasks.NewTasksClient(c.conn)
  399. }
  400. // ImageService returns the underlying image Store
  401. func (c *Client) ImageService() images.Store {
  402. return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(c.conn))
  403. }
  404. // DiffService returns the underlying Differ
  405. func (c *Client) DiffService() diff.Differ {
  406. return diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(c.conn))
  407. }
  408. // IntrospectionService returns the underlying Introspection Client
  409. func (c *Client) IntrospectionService() introspectionapi.IntrospectionClient {
  410. return introspectionapi.NewIntrospectionClient(c.conn)
  411. }
  412. // HealthService returns the underlying GRPC HealthClient
  413. func (c *Client) HealthService() grpc_health_v1.HealthClient {
  414. return grpc_health_v1.NewHealthClient(c.conn)
  415. }
  416. // EventService returns the underlying EventsClient
  417. func (c *Client) EventService() eventsapi.EventsClient {
  418. return eventsapi.NewEventsClient(c.conn)
  419. }
  420. // VersionService returns the underlying VersionClient
  421. func (c *Client) VersionService() versionservice.VersionClient {
  422. return versionservice.NewVersionClient(c.conn)
  423. }
  // Version describes the containerd build a client is talking to.
  type Version struct {
  	// Version is the version number.
  	Version string

  	// Revision is the git revision that was built.
  	Revision string
  }
  431. // Version returns the version of containerd that the client is connected to
  432. func (c *Client) Version(ctx context.Context) (Version, error) {
  433. response, err := c.VersionService().Version(ctx, &pempty.Empty{})
  434. if err != nil {
  435. return Version{}, err
  436. }
  437. return Version{
  438. Version: response.Version,
  439. Revision: response.Revision,
  440. }, nil
  441. }
  // imageFormat names an archive format used when importing or exporting
  // images.
  type imageFormat string

  // ociImageFormat identifies the OCI image format.
  const ociImageFormat imageFormat = "oci"
  446. type importOpts struct {
  447. format imageFormat
  448. refObject string
  449. labels map[string]string
  450. }
  451. // ImportOpt allows the caller to specify import specific options
  452. type ImportOpt func(c *importOpts) error
  453. // WithImportLabel sets a label to be associated with an imported image
  454. func WithImportLabel(key, value string) ImportOpt {
  455. return func(opts *importOpts) error {
  456. if opts.labels == nil {
  457. opts.labels = make(map[string]string)
  458. }
  459. opts.labels[key] = value
  460. return nil
  461. }
  462. }
  463. // WithImportLabels associates a set of labels to an imported image
  464. func WithImportLabels(labels map[string]string) ImportOpt {
  465. return func(opts *importOpts) error {
  466. if opts.labels == nil {
  467. opts.labels = make(map[string]string)
  468. }
  469. for k, v := range labels {
  470. opts.labels[k] = v
  471. }
  472. return nil
  473. }
  474. }
  475. // WithOCIImportFormat sets the import format for an OCI image format
  476. func WithOCIImportFormat() ImportOpt {
  477. return func(c *importOpts) error {
  478. if c.format != "" {
  479. return errors.New("format already set")
  480. }
  481. c.format = ociImageFormat
  482. return nil
  483. }
  484. }
  485. // WithRefObject specifies the ref object to import.
  486. // If refObject is empty, it is copied from the ref argument of Import().
  487. func WithRefObject(refObject string) ImportOpt {
  488. return func(c *importOpts) error {
  489. c.refObject = refObject
  490. return nil
  491. }
  492. }
  493. func resolveImportOpt(ref string, opts ...ImportOpt) (importOpts, error) {
  494. var iopts importOpts
  495. for _, o := range opts {
  496. if err := o(&iopts); err != nil {
  497. return iopts, err
  498. }
  499. }
  500. // use OCI as the default format
  501. if iopts.format == "" {
  502. iopts.format = ociImageFormat
  503. }
  504. // if refObject is not explicitly specified, use the one specified in ref
  505. if iopts.refObject == "" {
  506. refSpec, err := reference.Parse(ref)
  507. if err != nil {
  508. return iopts, err
  509. }
  510. iopts.refObject = refSpec.Object
  511. }
  512. return iopts, nil
  513. }
  514. // Import imports an image from a Tar stream using reader.
  515. // OCI format is assumed by default.
  516. //
  517. // Note that unreferenced blobs are imported to the content store as well.
  518. func (c *Client) Import(ctx context.Context, ref string, reader io.Reader, opts ...ImportOpt) (Image, error) {
  519. iopts, err := resolveImportOpt(ref, opts...)
  520. if err != nil {
  521. return nil, err
  522. }
  523. ctx, done, err := c.withLease(ctx)
  524. if err != nil {
  525. return nil, err
  526. }
  527. defer done()
  528. switch iopts.format {
  529. case ociImageFormat:
  530. return c.importFromOCITar(ctx, ref, reader, iopts)
  531. default:
  532. return nil, errors.Errorf("unsupported format: %s", iopts.format)
  533. }
  534. }
  535. type exportOpts struct {
  536. format imageFormat
  537. }
  538. // ExportOpt allows callers to set export options
  539. type ExportOpt func(c *exportOpts) error
  540. // WithOCIExportFormat sets the OCI image format as the export target
  541. func WithOCIExportFormat() ExportOpt {
  542. return func(c *exportOpts) error {
  543. if c.format != "" {
  544. return errors.New("format already set")
  545. }
  546. c.format = ociImageFormat
  547. return nil
  548. }
  549. }
  550. // TODO: add WithMediaTypeTranslation that transforms media types according to the format.
  551. // e.g. application/vnd.docker.image.rootfs.diff.tar.gzip
  552. // -> application/vnd.oci.image.layer.v1.tar+gzip
  553. // Export exports an image to a Tar stream.
  554. // OCI format is used by default.
  555. // It is up to caller to put "org.opencontainers.image.ref.name" annotation to desc.
  556. func (c *Client) Export(ctx context.Context, desc ocispec.Descriptor, opts ...ExportOpt) (io.ReadCloser, error) {
  557. var eopts exportOpts
  558. for _, o := range opts {
  559. if err := o(&eopts); err != nil {
  560. return nil, err
  561. }
  562. }
  563. // use OCI as the default format
  564. if eopts.format == "" {
  565. eopts.format = ociImageFormat
  566. }
  567. pr, pw := io.Pipe()
  568. switch eopts.format {
  569. case ociImageFormat:
  570. go func() {
  571. pw.CloseWithError(c.exportToOCITar(ctx, desc, pw, eopts))
  572. }()
  573. default:
  574. return nil, errors.Errorf("unsupported format: %s", eopts.format)
  575. }
  576. return pr, nil
  577. }