// Copyright 2019+ Klaus Post. All rights reserved.
// License information can be found in the LICENSE file.
// Based on work by Yann Collet, released under BSD License.
  4. package zstd
  5. import (
  6. "errors"
  7. "io"
  8. "sync"
  9. )
  10. // Decoder provides decoding of zstandard streams.
  11. // The decoder has been designed to operate without allocations after a warmup.
  12. // This means that you should store the decoder for best performance.
  13. // To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
  14. // A decoder can safely be re-used even if the previous stream failed.
  15. // To release the resources, you must call the Close() function on a decoder.
  16. type Decoder struct {
  17. o decoderOptions
  18. // Unreferenced decoders, ready for use.
  19. decoders chan *blockDec
  20. // Streams ready to be decoded.
  21. stream chan decodeStream
  22. // Current read position used for Reader functionality.
  23. current decoderState
  24. // Custom dictionaries.
  25. // Always uses copies.
  26. dicts map[uint32]dict
  27. // streamWg is the waitgroup for all streams
  28. streamWg sync.WaitGroup
  29. }
  30. // decoderState is used for maintaining state when the decoder
  31. // is used for streaming.
  32. type decoderState struct {
  33. // current block being written to stream.
  34. decodeOutput
  35. // output in order to be written to stream.
  36. output chan decodeOutput
  37. // cancel remaining output.
  38. cancel chan struct{}
  39. flushed bool
  40. }
  41. var (
  42. // Check the interfaces we want to support.
  43. _ = io.WriterTo(&Decoder{})
  44. _ = io.Reader(&Decoder{})
  45. )
  46. // NewReader creates a new decoder.
  47. // A nil Reader can be provided in which case Reset can be used to start a decode.
  48. //
  49. // A Decoder can be used in two modes:
  50. //
  51. // 1) As a stream, or
  52. // 2) For stateless decoding using DecodeAll.
  53. //
  54. // Only a single stream can be decoded concurrently, but the same decoder
  55. // can run multiple concurrent stateless decodes. It is even possible to
  56. // use stateless decodes while a stream is being decoded.
  57. //
  58. // The Reset function can be used to initiate a new stream, which is will considerably
  59. // reduce the allocations normally caused by NewReader.
  60. func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  61. initPredefined()
  62. var d Decoder
  63. d.o.setDefault()
  64. for _, o := range opts {
  65. err := o(&d.o)
  66. if err != nil {
  67. return nil, err
  68. }
  69. }
  70. d.current.output = make(chan decodeOutput, d.o.concurrent)
  71. d.current.flushed = true
  72. if r == nil {
  73. d.current.err = ErrDecoderNilInput
  74. }
  75. // Transfer option dicts.
  76. d.dicts = make(map[uint32]dict, len(d.o.dicts))
  77. for _, dc := range d.o.dicts {
  78. d.dicts[dc.id] = dc
  79. }
  80. d.o.dicts = nil
  81. // Create decoders
  82. d.decoders = make(chan *blockDec, d.o.concurrent)
  83. for i := 0; i < d.o.concurrent; i++ {
  84. dec := newBlockDec(d.o.lowMem)
  85. dec.localFrame = newFrameDec(d.o)
  86. d.decoders <- dec
  87. }
  88. if r == nil {
  89. return &d, nil
  90. }
  91. return &d, d.Reset(r)
  92. }
  93. // Read bytes from the decompressed stream into p.
  94. // Returns the number of bytes written and any error that occurred.
  95. // When the stream is done, io.EOF will be returned.
  96. func (d *Decoder) Read(p []byte) (int, error) {
  97. if d.stream == nil {
  98. return 0, ErrDecoderNilInput
  99. }
  100. var n int
  101. for {
  102. if len(d.current.b) > 0 {
  103. filled := copy(p, d.current.b)
  104. p = p[filled:]
  105. d.current.b = d.current.b[filled:]
  106. n += filled
  107. }
  108. if len(p) == 0 {
  109. break
  110. }
  111. if len(d.current.b) == 0 {
  112. // We have an error and no more data
  113. if d.current.err != nil {
  114. break
  115. }
  116. if !d.nextBlock(n == 0) {
  117. return n, nil
  118. }
  119. }
  120. }
  121. if len(d.current.b) > 0 {
  122. if debug {
  123. println("returning", n, "still bytes left:", len(d.current.b))
  124. }
  125. // Only return error at end of block
  126. return n, nil
  127. }
  128. if d.current.err != nil {
  129. d.drainOutput()
  130. }
  131. if debug {
  132. println("returning", n, d.current.err, len(d.decoders))
  133. }
  134. return n, d.current.err
  135. }
  136. // Reset will reset the decoder the supplied stream after the current has finished processing.
  137. // Note that this functionality cannot be used after Close has been called.
  138. // Reset can be called with a nil reader to release references to the previous reader.
  139. // After being called with a nil reader, no other operations than Reset or DecodeAll or Close
  140. // should be used.
  141. func (d *Decoder) Reset(r io.Reader) error {
  142. if d.current.err == ErrDecoderClosed {
  143. return d.current.err
  144. }
  145. d.drainOutput()
  146. if r == nil {
  147. d.current.err = ErrDecoderNilInput
  148. d.current.flushed = true
  149. return nil
  150. }
  151. if d.stream == nil {
  152. d.stream = make(chan decodeStream, 1)
  153. d.streamWg.Add(1)
  154. go d.startStreamDecoder(d.stream)
  155. }
  156. // If bytes buffer and < 1MB, do sync decoding anyway.
  157. if bb, ok := r.(byter); ok && bb.Len() < 1<<20 {
  158. bb2 := bb
  159. if debug {
  160. println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
  161. }
  162. b := bb2.Bytes()
  163. var dst []byte
  164. if cap(d.current.b) > 0 {
  165. dst = d.current.b
  166. }
  167. dst, err := d.DecodeAll(b, dst[:0])
  168. if err == nil {
  169. err = io.EOF
  170. }
  171. d.current.b = dst
  172. d.current.err = err
  173. d.current.flushed = true
  174. if debug {
  175. println("sync decode to", len(dst), "bytes, err:", err)
  176. }
  177. return nil
  178. }
  179. // Remove current block.
  180. d.current.decodeOutput = decodeOutput{}
  181. d.current.err = nil
  182. d.current.cancel = make(chan struct{})
  183. d.current.flushed = false
  184. d.current.d = nil
  185. d.stream <- decodeStream{
  186. r: r,
  187. output: d.current.output,
  188. cancel: d.current.cancel,
  189. }
  190. return nil
  191. }
  192. // drainOutput will drain the output until errEndOfStream is sent.
  193. func (d *Decoder) drainOutput() {
  194. if d.current.cancel != nil {
  195. println("cancelling current")
  196. close(d.current.cancel)
  197. d.current.cancel = nil
  198. }
  199. if d.current.d != nil {
  200. if debug {
  201. printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
  202. }
  203. d.decoders <- d.current.d
  204. d.current.d = nil
  205. d.current.b = nil
  206. }
  207. if d.current.output == nil || d.current.flushed {
  208. println("current already flushed")
  209. return
  210. }
  211. for v := range d.current.output {
  212. if v.d != nil {
  213. if debug {
  214. printf("re-adding decoder %p", v.d)
  215. }
  216. d.decoders <- v.d
  217. }
  218. if v.err == errEndOfStream {
  219. println("current flushed")
  220. d.current.flushed = true
  221. return
  222. }
  223. }
  224. }
  225. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  226. // The return value n is the number of bytes written.
  227. // Any error encountered during the write is also returned.
  228. func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
  229. if d.stream == nil {
  230. return 0, ErrDecoderNilInput
  231. }
  232. var n int64
  233. for {
  234. if len(d.current.b) > 0 {
  235. n2, err2 := w.Write(d.current.b)
  236. n += int64(n2)
  237. if err2 != nil && d.current.err == nil {
  238. d.current.err = err2
  239. break
  240. }
  241. }
  242. if d.current.err != nil {
  243. break
  244. }
  245. d.nextBlock(true)
  246. }
  247. err := d.current.err
  248. if err != nil {
  249. d.drainOutput()
  250. }
  251. if err == io.EOF {
  252. err = nil
  253. }
  254. return n, err
  255. }
  256. // DecodeAll allows stateless decoding of a blob of bytes.
  257. // Output will be appended to dst, so if the destination size is known
  258. // you can pre-allocate the destination slice to avoid allocations.
  259. // DecodeAll can be used concurrently.
  260. // The Decoder concurrency limits will be respected.
  261. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
  262. if d.current.err == ErrDecoderClosed {
  263. return dst, ErrDecoderClosed
  264. }
  265. // Grab a block decoder and frame decoder.
  266. block := <-d.decoders
  267. frame := block.localFrame
  268. defer func() {
  269. if debug {
  270. printf("re-adding decoder: %p", block)
  271. }
  272. frame.rawInput = nil
  273. frame.bBuf = nil
  274. d.decoders <- block
  275. }()
  276. frame.bBuf = input
  277. for {
  278. frame.history.reset()
  279. err := frame.reset(&frame.bBuf)
  280. if err == io.EOF {
  281. if debug {
  282. println("frame reset return EOF")
  283. }
  284. return dst, nil
  285. }
  286. if frame.DictionaryID != nil {
  287. dict, ok := d.dicts[*frame.DictionaryID]
  288. if !ok {
  289. return nil, ErrUnknownDictionary
  290. }
  291. frame.history.setDict(&dict)
  292. }
  293. if err != nil {
  294. return dst, err
  295. }
  296. if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
  297. return dst, ErrDecoderSizeExceeded
  298. }
  299. if frame.FrameContentSize > 0 && frame.FrameContentSize < 1<<30 {
  300. // Never preallocate moe than 1 GB up front.
  301. if cap(dst)-len(dst) < int(frame.FrameContentSize) {
  302. dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize))
  303. copy(dst2, dst)
  304. dst = dst2
  305. }
  306. }
  307. if cap(dst) == 0 {
  308. // Allocate len(input) * 2 by default if nothing is provided
  309. // and we didn't get frame content size.
  310. size := len(input) * 2
  311. // Cap to 1 MB.
  312. if size > 1<<20 {
  313. size = 1 << 20
  314. }
  315. if uint64(size) > d.o.maxDecodedSize {
  316. size = int(d.o.maxDecodedSize)
  317. }
  318. dst = make([]byte, 0, size)
  319. }
  320. dst, err = frame.runDecoder(dst, block)
  321. if err != nil {
  322. return dst, err
  323. }
  324. if len(frame.bBuf) == 0 {
  325. if debug {
  326. println("frame dbuf empty")
  327. }
  328. break
  329. }
  330. }
  331. return dst, nil
  332. }
  333. // nextBlock returns the next block.
  334. // If an error occurs d.err will be set.
  335. // Optionally the function can block for new output.
  336. // If non-blocking mode is used the returned boolean will be false
  337. // if no data was available without blocking.
  338. func (d *Decoder) nextBlock(blocking bool) (ok bool) {
  339. if d.current.d != nil {
  340. if debug {
  341. printf("re-adding current decoder %p", d.current.d)
  342. }
  343. d.decoders <- d.current.d
  344. d.current.d = nil
  345. }
  346. if d.current.err != nil {
  347. // Keep error state.
  348. return blocking
  349. }
  350. if blocking {
  351. d.current.decodeOutput = <-d.current.output
  352. } else {
  353. select {
  354. case d.current.decodeOutput = <-d.current.output:
  355. default:
  356. return false
  357. }
  358. }
  359. if debug {
  360. println("got", len(d.current.b), "bytes, error:", d.current.err)
  361. }
  362. return true
  363. }
  364. // Close will release all resources.
  365. // It is NOT possible to reuse the decoder after this.
  366. func (d *Decoder) Close() {
  367. if d.current.err == ErrDecoderClosed {
  368. return
  369. }
  370. d.drainOutput()
  371. if d.stream != nil {
  372. close(d.stream)
  373. d.streamWg.Wait()
  374. d.stream = nil
  375. }
  376. if d.decoders != nil {
  377. close(d.decoders)
  378. for dec := range d.decoders {
  379. dec.Close()
  380. }
  381. d.decoders = nil
  382. }
  383. if d.current.d != nil {
  384. d.current.d.Close()
  385. d.current.d = nil
  386. }
  387. d.current.err = ErrDecoderClosed
  388. }
  389. // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
  390. // Any changes to the decoder will be reflected, so the returned ReadCloser
  391. // can be reused along with the decoder.
  392. // io.WriterTo is also supported by the returned ReadCloser.
  393. func (d *Decoder) IOReadCloser() io.ReadCloser {
  394. return closeWrapper{d: d}
  395. }
  396. // closeWrapper wraps a function call as a closer.
  397. type closeWrapper struct {
  398. d *Decoder
  399. }
  400. // WriteTo forwards WriteTo calls to the decoder.
  401. func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
  402. return c.d.WriteTo(w)
  403. }
  404. // Read forwards read calls to the decoder.
  405. func (c closeWrapper) Read(p []byte) (n int, err error) {
  406. return c.d.Read(p)
  407. }
  408. // Close closes the decoder.
  409. func (c closeWrapper) Close() error {
  410. c.d.Close()
  411. return nil
  412. }
  413. type decodeOutput struct {
  414. d *blockDec
  415. b []byte
  416. err error
  417. }
  418. type decodeStream struct {
  419. r io.Reader
  420. // Blocks ready to be written to output.
  421. output chan decodeOutput
  422. // cancel reading from the input
  423. cancel chan struct{}
  424. }
  425. // errEndOfStream indicates that everything from the stream was read.
  426. var errEndOfStream = errors.New("end-of-stream")
  427. // Create Decoder:
  428. // Spawn n block decoders. These accept tasks to decode a block.
  429. // Create goroutine that handles stream processing, this will send history to decoders as they are available.
  430. // Decoders update the history as they decode.
  431. // When a block is returned:
  432. // a) history is sent to the next decoder,
  433. // b) content written to CRC.
  434. // c) return data to WRITER.
  435. // d) wait for next block to return data.
  436. // Once WRITTEN, the decoders reused by the writer frame decoder for re-use.
  437. func (d *Decoder) startStreamDecoder(inStream chan decodeStream) {
  438. defer d.streamWg.Done()
  439. frame := newFrameDec(d.o)
  440. for stream := range inStream {
  441. if debug {
  442. println("got new stream")
  443. }
  444. br := readerWrapper{r: stream.r}
  445. decodeStream:
  446. for {
  447. frame.history.reset()
  448. err := frame.reset(&br)
  449. if debug && err != nil {
  450. println("Frame decoder returned", err)
  451. }
  452. if err == nil && frame.DictionaryID != nil {
  453. dict, ok := d.dicts[*frame.DictionaryID]
  454. if !ok {
  455. err = ErrUnknownDictionary
  456. } else {
  457. frame.history.setDict(&dict)
  458. }
  459. }
  460. if err != nil {
  461. stream.output <- decodeOutput{
  462. err: err,
  463. }
  464. break
  465. }
  466. if debug {
  467. println("starting frame decoder")
  468. }
  469. // This goroutine will forward history between frames.
  470. frame.frameDone.Add(1)
  471. frame.initAsync()
  472. go frame.startDecoder(stream.output)
  473. decodeFrame:
  474. // Go through all blocks of the frame.
  475. for {
  476. dec := <-d.decoders
  477. select {
  478. case <-stream.cancel:
  479. if !frame.sendErr(dec, io.EOF) {
  480. // To not let the decoder dangle, send it back.
  481. stream.output <- decodeOutput{d: dec}
  482. }
  483. break decodeStream
  484. default:
  485. }
  486. err := frame.next(dec)
  487. switch err {
  488. case io.EOF:
  489. // End of current frame, no error
  490. println("EOF on next block")
  491. break decodeFrame
  492. case nil:
  493. continue
  494. default:
  495. println("block decoder returned", err)
  496. break decodeStream
  497. }
  498. }
  499. // All blocks have started decoding, check if there are more frames.
  500. println("waiting for done")
  501. frame.frameDone.Wait()
  502. println("done waiting...")
  503. }
  504. frame.frameDone.Wait()
  505. println("Sending EOS")
  506. stream.output <- decodeOutput{err: errEndOfStream}
  507. }
  508. }