stream.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. /*
  2. Copyright The containerd Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package diff
  14. import (
  15. "context"
  16. "io"
  17. "os"
  18. "github.com/containerd/containerd/archive/compression"
  19. "github.com/containerd/containerd/images"
  20. "github.com/gogo/protobuf/types"
  21. ocispec "github.com/opencontainers/image-spec/specs-go/v1"
  22. "github.com/pkg/errors"
  23. )
  24. var (
  25. handlers []Handler
  26. // ErrNoProcessor is returned when no stream processor is available for a media-type
  27. ErrNoProcessor = errors.New("no processor for media-type")
  28. )
  29. func init() {
  30. // register the default compression handler
  31. RegisterProcessor(compressedHandler)
  32. }
  33. // RegisterProcessor registers a stream processor for media-types
  34. func RegisterProcessor(handler Handler) {
  35. handlers = append(handlers, handler)
  36. }
  37. // GetProcessor returns the processor for a media-type
  38. func GetProcessor(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
  39. // reverse this list so that user configured handlers come up first
  40. for i := len(handlers) - 1; i >= 0; i-- {
  41. processor, ok := handlers[i](ctx, stream.MediaType())
  42. if ok {
  43. return processor(ctx, stream, payloads)
  44. }
  45. }
  46. return nil, ErrNoProcessor
  47. }
  48. // Handler checks a media-type and initializes the processor
  49. type Handler func(ctx context.Context, mediaType string) (StreamProcessorInit, bool)
  50. // StaticHandler returns the processor init func for a static media-type
  51. func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler {
  52. return func(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
  53. if mediaType == expectedMediaType {
  54. return fn, true
  55. }
  56. return nil, false
  57. }
  58. }
  59. // StreamProcessorInit returns the initialized stream processor
  60. type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error)
  61. // RawProcessor provides access to direct fd for processing
  62. type RawProcessor interface {
  63. // File returns the fd for the read stream of the underlying processor
  64. File() *os.File
  65. }
  66. // StreamProcessor handles processing a content stream and transforming it into a different media-type
  67. type StreamProcessor interface {
  68. io.ReadCloser
  69. // MediaType is the resulting media-type that the processor processes the stream into
  70. MediaType() string
  71. }
  72. func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
  73. compressed, err := images.DiffCompression(ctx, mediaType)
  74. if err != nil {
  75. return nil, false
  76. }
  77. if compressed != "" {
  78. return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
  79. ds, err := compression.DecompressStream(stream)
  80. if err != nil {
  81. return nil, err
  82. }
  83. return &compressedProcessor{
  84. rc: ds,
  85. }, nil
  86. }, true
  87. }
  88. return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
  89. return &stdProcessor{
  90. rc: stream,
  91. }, nil
  92. }, true
  93. }
  94. // NewProcessorChain initialized the root StreamProcessor
  95. func NewProcessorChain(mt string, r io.Reader) StreamProcessor {
  96. return &processorChain{
  97. mt: mt,
  98. rc: r,
  99. }
  100. }
  101. type processorChain struct {
  102. mt string
  103. rc io.Reader
  104. }
  105. func (c *processorChain) MediaType() string {
  106. return c.mt
  107. }
  108. func (c *processorChain) Read(p []byte) (int, error) {
  109. return c.rc.Read(p)
  110. }
  111. func (c *processorChain) Close() error {
  112. return nil
  113. }
  114. type stdProcessor struct {
  115. rc StreamProcessor
  116. }
  117. func (c *stdProcessor) MediaType() string {
  118. return ocispec.MediaTypeImageLayer
  119. }
  120. func (c *stdProcessor) Read(p []byte) (int, error) {
  121. return c.rc.Read(p)
  122. }
  123. func (c *stdProcessor) Close() error {
  124. return nil
  125. }
  126. type compressedProcessor struct {
  127. rc io.ReadCloser
  128. }
  129. func (c *compressedProcessor) MediaType() string {
  130. return ocispec.MediaTypeImageLayer
  131. }
  132. func (c *compressedProcessor) Read(p []byte) (int, error) {
  133. return c.rc.Read(p)
  134. }
  135. func (c *compressedProcessor) Close() error {
  136. return c.rc.Close()
  137. }
  138. func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args, env []string) Handler {
  139. set := make(map[string]struct{}, len(mediaTypes))
  140. for _, m := range mediaTypes {
  141. set[m] = struct{}{}
  142. }
  143. return func(_ context.Context, mediaType string) (StreamProcessorInit, bool) {
  144. if _, ok := set[mediaType]; ok {
  145. return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
  146. payload := payloads[id]
  147. return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, env, payload)
  148. }, true
  149. }
  150. return nil, false
  151. }
  152. }
  153. const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE"