123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- /*
- Copyright The containerd Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package diff
- import (
- "context"
- "io"
- "os"
- "github.com/containerd/containerd/archive/compression"
- "github.com/containerd/containerd/images"
- "github.com/gogo/protobuf/types"
- ocispec "github.com/opencontainers/image-spec/specs-go/v1"
- "github.com/pkg/errors"
- )
- var (
- handlers []Handler
- // ErrNoProcessor is returned when no stream processor is available for a media-type
- ErrNoProcessor = errors.New("no processor for media-type")
- )
- func init() {
- // register the default compression handler
- RegisterProcessor(compressedHandler)
- }
- // RegisterProcessor registers a stream processor for media-types
- func RegisterProcessor(handler Handler) {
- handlers = append(handlers, handler)
- }
- // GetProcessor returns the processor for a media-type
- func GetProcessor(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
- // reverse this list so that user configured handlers come up first
- for i := len(handlers) - 1; i >= 0; i-- {
- processor, ok := handlers[i](ctx, stream.MediaType())
- if ok {
- return processor(ctx, stream, payloads)
- }
- }
- return nil, ErrNoProcessor
- }
- // Handler checks a media-type and initializes the processor
- type Handler func(ctx context.Context, mediaType string) (StreamProcessorInit, bool)
- // StaticHandler returns the processor init func for a static media-type
- func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler {
- return func(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
- if mediaType == expectedMediaType {
- return fn, true
- }
- return nil, false
- }
- }
- // StreamProcessorInit returns the initialized stream processor
- type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error)
- // RawProcessor provides access to direct fd for processing
- type RawProcessor interface {
- // File returns the fd for the read stream of the underlying processor
- File() *os.File
- }
- // StreamProcessor handles processing a content stream and transforming it into a different media-type
- type StreamProcessor interface {
- io.ReadCloser
- // MediaType is the resulting media-type that the processor processes the stream into
- MediaType() string
- }
- func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
- compressed, err := images.DiffCompression(ctx, mediaType)
- if err != nil {
- return nil, false
- }
- if compressed != "" {
- return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
- ds, err := compression.DecompressStream(stream)
- if err != nil {
- return nil, err
- }
- return &compressedProcessor{
- rc: ds,
- }, nil
- }, true
- }
- return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
- return &stdProcessor{
- rc: stream,
- }, nil
- }, true
- }
- // NewProcessorChain initialized the root StreamProcessor
- func NewProcessorChain(mt string, r io.Reader) StreamProcessor {
- return &processorChain{
- mt: mt,
- rc: r,
- }
- }
- type processorChain struct {
- mt string
- rc io.Reader
- }
- func (c *processorChain) MediaType() string {
- return c.mt
- }
- func (c *processorChain) Read(p []byte) (int, error) {
- return c.rc.Read(p)
- }
- func (c *processorChain) Close() error {
- return nil
- }
- type stdProcessor struct {
- rc StreamProcessor
- }
- func (c *stdProcessor) MediaType() string {
- return ocispec.MediaTypeImageLayer
- }
- func (c *stdProcessor) Read(p []byte) (int, error) {
- return c.rc.Read(p)
- }
- func (c *stdProcessor) Close() error {
- return nil
- }
- type compressedProcessor struct {
- rc io.ReadCloser
- }
- func (c *compressedProcessor) MediaType() string {
- return ocispec.MediaTypeImageLayer
- }
- func (c *compressedProcessor) Read(p []byte) (int, error) {
- return c.rc.Read(p)
- }
- func (c *compressedProcessor) Close() error {
- return c.rc.Close()
- }
- func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args, env []string) Handler {
- set := make(map[string]struct{}, len(mediaTypes))
- for _, m := range mediaTypes {
- set[m] = struct{}{}
- }
- return func(_ context.Context, mediaType string) (StreamProcessorInit, bool) {
- if _, ok := set[mediaType]; ok {
- return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
- payload := payloads[id]
- return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, env, payload)
- }, true
- }
- return nil, false
- }
- }
- const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE"
|