compression.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. /*
  2. Copyright The containerd Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compression
  14. import (
  15. "bufio"
  16. "bytes"
  17. "compress/gzip"
  18. "context"
  19. "encoding/binary"
  20. "fmt"
  21. "io"
  22. "os"
  23. "strconv"
  24. "sync"
  25. "github.com/containerd/containerd/log"
  26. "github.com/klauspost/compress/zstd"
  27. exec "golang.org/x/sys/execabs"
  28. )
  29. type (
  30. // Compression is the state represents if compressed or not.
  31. Compression int
  32. )
  33. const (
  34. // Uncompressed represents the uncompressed.
  35. Uncompressed Compression = iota
  36. // Gzip is gzip compression algorithm.
  37. Gzip
  38. // Zstd is zstd compression algorithm.
  39. Zstd
  40. )
  41. const disablePigzEnv = "CONTAINERD_DISABLE_PIGZ"
  42. var (
  43. initPigz sync.Once
  44. unpigzPath string
  45. )
  46. var (
  47. bufioReader32KPool = &sync.Pool{
  48. New: func() interface{} { return bufio.NewReaderSize(nil, 32*1024) },
  49. }
  50. )
  51. // DecompressReadCloser include the stream after decompress and the compress method detected.
  52. type DecompressReadCloser interface {
  53. io.ReadCloser
  54. // GetCompression returns the compress method which is used before decompressing
  55. GetCompression() Compression
  56. }
  57. type readCloserWrapper struct {
  58. io.Reader
  59. compression Compression
  60. closer func() error
  61. }
  62. func (r *readCloserWrapper) Close() error {
  63. if r.closer != nil {
  64. return r.closer()
  65. }
  66. return nil
  67. }
  68. func (r *readCloserWrapper) GetCompression() Compression {
  69. return r.compression
  70. }
  71. type writeCloserWrapper struct {
  72. io.Writer
  73. closer func() error
  74. }
  75. func (w *writeCloserWrapper) Close() error {
  76. if w.closer != nil {
  77. w.closer()
  78. }
  79. return nil
  80. }
  81. type bufferedReader struct {
  82. buf *bufio.Reader
  83. }
  84. func newBufferedReader(r io.Reader) *bufferedReader {
  85. buf := bufioReader32KPool.Get().(*bufio.Reader)
  86. buf.Reset(r)
  87. return &bufferedReader{buf}
  88. }
  89. func (r *bufferedReader) Read(p []byte) (n int, err error) {
  90. if r.buf == nil {
  91. return 0, io.EOF
  92. }
  93. n, err = r.buf.Read(p)
  94. if err == io.EOF {
  95. r.buf.Reset(nil)
  96. bufioReader32KPool.Put(r.buf)
  97. r.buf = nil
  98. }
  99. return
  100. }
  101. func (r *bufferedReader) Peek(n int) ([]byte, error) {
  102. if r.buf == nil {
  103. return nil, io.EOF
  104. }
  105. return r.buf.Peek(n)
  106. }
  107. const (
  108. zstdMagicSkippableStart = 0x184D2A50
  109. zstdMagicSkippableMask = 0xFFFFFFF0
  110. )
  111. var (
  112. gzipMagic = []byte{0x1F, 0x8B, 0x08}
  113. zstdMagic = []byte{0x28, 0xb5, 0x2f, 0xfd}
  114. )
  115. type matcher = func([]byte) bool
  116. func magicNumberMatcher(m []byte) matcher {
  117. return func(source []byte) bool {
  118. return bytes.HasPrefix(source, m)
  119. }
  120. }
  121. // zstdMatcher detects zstd compression algorithm.
  122. // There are two frame formats defined by Zstandard: Zstandard frames and Skippable frames.
  123. // See https://tools.ietf.org/id/draft-kucherawy-dispatch-zstd-00.html#rfc.section.2 for more details.
  124. func zstdMatcher() matcher {
  125. return func(source []byte) bool {
  126. if bytes.HasPrefix(source, zstdMagic) {
  127. // Zstandard frame
  128. return true
  129. }
  130. // skippable frame
  131. if len(source) < 8 {
  132. return false
  133. }
  134. // magic number from 0x184D2A50 to 0x184D2A5F.
  135. if binary.LittleEndian.Uint32(source[:4])&zstdMagicSkippableMask == zstdMagicSkippableStart {
  136. return true
  137. }
  138. return false
  139. }
  140. }
  141. // DetectCompression detects the compression algorithm of the source.
  142. func DetectCompression(source []byte) Compression {
  143. for compression, fn := range map[Compression]matcher{
  144. Gzip: magicNumberMatcher(gzipMagic),
  145. Zstd: zstdMatcher(),
  146. } {
  147. if fn(source) {
  148. return compression
  149. }
  150. }
  151. return Uncompressed
  152. }
  153. // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
  154. func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
  155. buf := newBufferedReader(archive)
  156. bs, err := buf.Peek(10)
  157. if err != nil && err != io.EOF {
  158. // Note: we'll ignore any io.EOF error because there are some odd
  159. // cases where the layer.tar file will be empty (zero bytes) and
  160. // that results in an io.EOF from the Peek() call. So, in those
  161. // cases we'll just treat it as a non-compressed stream and
  162. // that means just create an empty layer.
  163. // See Issue docker/docker#18170
  164. return nil, err
  165. }
  166. switch compression := DetectCompression(bs); compression {
  167. case Uncompressed:
  168. return &readCloserWrapper{
  169. Reader: buf,
  170. compression: compression,
  171. }, nil
  172. case Gzip:
  173. ctx, cancel := context.WithCancel(context.Background())
  174. gzReader, err := gzipDecompress(ctx, buf)
  175. if err != nil {
  176. cancel()
  177. return nil, err
  178. }
  179. return &readCloserWrapper{
  180. Reader: gzReader,
  181. compression: compression,
  182. closer: func() error {
  183. cancel()
  184. return gzReader.Close()
  185. },
  186. }, nil
  187. case Zstd:
  188. zstdReader, err := zstd.NewReader(buf)
  189. if err != nil {
  190. return nil, err
  191. }
  192. return &readCloserWrapper{
  193. Reader: zstdReader,
  194. compression: compression,
  195. closer: func() error {
  196. zstdReader.Close()
  197. return nil
  198. },
  199. }, nil
  200. default:
  201. return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
  202. }
  203. }
  204. // CompressStream compresses the dest with specified compression algorithm.
  205. func CompressStream(dest io.Writer, compression Compression) (io.WriteCloser, error) {
  206. switch compression {
  207. case Uncompressed:
  208. return &writeCloserWrapper{dest, nil}, nil
  209. case Gzip:
  210. return gzip.NewWriter(dest), nil
  211. case Zstd:
  212. return zstd.NewWriter(dest)
  213. default:
  214. return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
  215. }
  216. }
  217. // Extension returns the extension of a file that uses the specified compression algorithm.
  218. func (compression *Compression) Extension() string {
  219. switch *compression {
  220. case Gzip:
  221. return "gz"
  222. case Zstd:
  223. return "zst"
  224. }
  225. return ""
  226. }
  227. func gzipDecompress(ctx context.Context, buf io.Reader) (io.ReadCloser, error) {
  228. initPigz.Do(func() {
  229. if unpigzPath = detectPigz(); unpigzPath != "" {
  230. log.L.Debug("using pigz for decompression")
  231. }
  232. })
  233. if unpigzPath == "" {
  234. return gzip.NewReader(buf)
  235. }
  236. return cmdStream(exec.CommandContext(ctx, unpigzPath, "-d", "-c"), buf)
  237. }
  238. func cmdStream(cmd *exec.Cmd, in io.Reader) (io.ReadCloser, error) {
  239. reader, writer := io.Pipe()
  240. cmd.Stdin = in
  241. cmd.Stdout = writer
  242. var errBuf bytes.Buffer
  243. cmd.Stderr = &errBuf
  244. if err := cmd.Start(); err != nil {
  245. return nil, err
  246. }
  247. go func() {
  248. if err := cmd.Wait(); err != nil {
  249. writer.CloseWithError(fmt.Errorf("%s: %s", err, errBuf.String()))
  250. } else {
  251. writer.Close()
  252. }
  253. }()
  254. return reader, nil
  255. }
  256. func detectPigz() string {
  257. path, err := exec.LookPath("unpigz")
  258. if err != nil {
  259. log.L.WithError(err).Debug("unpigz not found, falling back to go gzip")
  260. return ""
  261. }
  262. // Check if pigz disabled via CONTAINERD_DISABLE_PIGZ env variable
  263. value := os.Getenv(disablePigzEnv)
  264. if value == "" {
  265. return path
  266. }
  267. disable, err := strconv.ParseBool(value)
  268. if err != nil {
  269. log.L.WithError(err).Warnf("could not parse %s: %s", disablePigzEnv, value)
  270. return path
  271. }
  272. if disable {
  273. return ""
  274. }
  275. return path
  276. }