decoder.go 23 KB


  1. // Copyright 2019+ Klaus Post. All rights reserved.
  2. // License information can be found in the LICENSE file.
  3. // Based on work by Yann Collet, released under BSD License.
  4. package zstd
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/binary"
  9. "io"
  10. "sync"
  11. "github.com/klauspost/compress/zstd/internal/xxhash"
  12. )
  13. // Decoder provides decoding of zstandard streams.
  14. // The decoder has been designed to operate without allocations after a warmup.
  15. // This means that you should store the decoder for best performance.
  16. // To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
  17. // A decoder can safely be re-used even if the previous stream failed.
  18. // To release the resources, you must call the Close() function on a decoder.
  19. type Decoder struct {
  20. o decoderOptions
  21. // Unreferenced decoders, ready for use.
  22. decoders chan *blockDec
  23. // Current read position used for Reader functionality.
  24. current decoderState
  25. // sync stream decoding
  26. syncStream struct {
  27. decodedFrame uint64
  28. br readerWrapper
  29. enabled bool
  30. inFrame bool
  31. dstBuf []byte
  32. }
  33. frame *frameDec
  34. // Custom dictionaries.
  35. // Always uses copies.
  36. dicts map[uint32]dict
  37. // streamWg is the waitgroup for all streams
  38. streamWg sync.WaitGroup
  39. }
  40. // decoderState is used for maintaining state when the decoder
  41. // is used for streaming.
  42. type decoderState struct {
  43. // current block being written to stream.
  44. decodeOutput
  45. // output in order to be written to stream.
  46. output chan decodeOutput
  47. // cancel remaining output.
  48. cancel context.CancelFunc
  49. // crc of current frame
  50. crc *xxhash.Digest
  51. flushed bool
  52. }
  53. var (
  54. // Check the interfaces we want to support.
  55. _ = io.WriterTo(&Decoder{})
  56. _ = io.Reader(&Decoder{})
  57. )
  58. // NewReader creates a new decoder.
  59. // A nil Reader can be provided in which case Reset can be used to start a decode.
  60. //
  61. // A Decoder can be used in two modes:
  62. //
  63. // 1) As a stream, or
  64. // 2) For stateless decoding using DecodeAll.
  65. //
  66. // Only a single stream can be decoded concurrently, but the same decoder
  67. // can run multiple concurrent stateless decodes. It is even possible to
  68. // use stateless decodes while a stream is being decoded.
  69. //
  70. // The Reset function can be used to initiate a new stream, which is will considerably
  71. // reduce the allocations normally caused by NewReader.
  72. func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  73. initPredefined()
  74. var d Decoder
  75. d.o.setDefault()
  76. for _, o := range opts {
  77. err := o(&d.o)
  78. if err != nil {
  79. return nil, err
  80. }
  81. }
  82. d.current.crc = xxhash.New()
  83. d.current.flushed = true
  84. if r == nil {
  85. d.current.err = ErrDecoderNilInput
  86. }
  87. // Transfer option dicts.
  88. d.dicts = make(map[uint32]dict, len(d.o.dicts))
  89. for _, dc := range d.o.dicts {
  90. d.dicts[dc.id] = dc
  91. }
  92. d.o.dicts = nil
  93. // Create decoders
  94. d.decoders = make(chan *blockDec, d.o.concurrent)
  95. for i := 0; i < d.o.concurrent; i++ {
  96. dec := newBlockDec(d.o.lowMem)
  97. dec.localFrame = newFrameDec(d.o)
  98. d.decoders <- dec
  99. }
  100. if r == nil {
  101. return &d, nil
  102. }
  103. return &d, d.Reset(r)
  104. }
  105. // Read bytes from the decompressed stream into p.
  106. // Returns the number of bytes written and any error that occurred.
  107. // When the stream is done, io.EOF will be returned.
  108. func (d *Decoder) Read(p []byte) (int, error) {
  109. var n int
  110. for {
  111. if len(d.current.b) > 0 {
  112. filled := copy(p, d.current.b)
  113. p = p[filled:]
  114. d.current.b = d.current.b[filled:]
  115. n += filled
  116. }
  117. if len(p) == 0 {
  118. break
  119. }
  120. if len(d.current.b) == 0 {
  121. // We have an error and no more data
  122. if d.current.err != nil {
  123. break
  124. }
  125. if !d.nextBlock(n == 0) {
  126. return n, d.current.err
  127. }
  128. }
  129. }
  130. if len(d.current.b) > 0 {
  131. if debugDecoder {
  132. println("returning", n, "still bytes left:", len(d.current.b))
  133. }
  134. // Only return error at end of block
  135. return n, nil
  136. }
  137. if d.current.err != nil {
  138. d.drainOutput()
  139. }
  140. if debugDecoder {
  141. println("returning", n, d.current.err, len(d.decoders))
  142. }
  143. return n, d.current.err
  144. }
  145. // Reset will reset the decoder the supplied stream after the current has finished processing.
  146. // Note that this functionality cannot be used after Close has been called.
  147. // Reset can be called with a nil reader to release references to the previous reader.
  148. // After being called with a nil reader, no other operations than Reset or DecodeAll or Close
  149. // should be used.
  150. func (d *Decoder) Reset(r io.Reader) error {
  151. if d.current.err == ErrDecoderClosed {
  152. return d.current.err
  153. }
  154. d.drainOutput()
  155. d.syncStream.br.r = nil
  156. if r == nil {
  157. d.current.err = ErrDecoderNilInput
  158. if len(d.current.b) > 0 {
  159. d.current.b = d.current.b[:0]
  160. }
  161. d.current.flushed = true
  162. return nil
  163. }
  164. // If bytes buffer and < 5MB, do sync decoding anyway.
  165. if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap {
  166. bb2 := bb
  167. if debugDecoder {
  168. println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
  169. }
  170. b := bb2.Bytes()
  171. var dst []byte
  172. if cap(d.syncStream.dstBuf) > 0 {
  173. dst = d.syncStream.dstBuf[:0]
  174. }
  175. dst, err := d.DecodeAll(b, dst)
  176. if err == nil {
  177. err = io.EOF
  178. }
  179. // Save output buffer
  180. d.syncStream.dstBuf = dst
  181. d.current.b = dst
  182. d.current.err = err
  183. d.current.flushed = true
  184. if debugDecoder {
  185. println("sync decode to", len(dst), "bytes, err:", err)
  186. }
  187. return nil
  188. }
  189. // Remove current block.
  190. d.stashDecoder()
  191. d.current.decodeOutput = decodeOutput{}
  192. d.current.err = nil
  193. d.current.flushed = false
  194. d.current.d = nil
  195. d.syncStream.dstBuf = nil
  196. // Ensure no-one else is still running...
  197. d.streamWg.Wait()
  198. if d.frame == nil {
  199. d.frame = newFrameDec(d.o)
  200. }
  201. if d.o.concurrent == 1 {
  202. return d.startSyncDecoder(r)
  203. }
  204. d.current.output = make(chan decodeOutput, d.o.concurrent)
  205. ctx, cancel := context.WithCancel(context.Background())
  206. d.current.cancel = cancel
  207. d.streamWg.Add(1)
  208. go d.startStreamDecoder(ctx, r, d.current.output)
  209. return nil
  210. }
  211. // drainOutput will drain the output until errEndOfStream is sent.
  212. func (d *Decoder) drainOutput() {
  213. if d.current.cancel != nil {
  214. if debugDecoder {
  215. println("cancelling current")
  216. }
  217. d.current.cancel()
  218. d.current.cancel = nil
  219. }
  220. if d.current.d != nil {
  221. if debugDecoder {
  222. printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
  223. }
  224. d.decoders <- d.current.d
  225. d.current.d = nil
  226. d.current.b = nil
  227. }
  228. if d.current.output == nil || d.current.flushed {
  229. println("current already flushed")
  230. return
  231. }
  232. for v := range d.current.output {
  233. if v.d != nil {
  234. if debugDecoder {
  235. printf("re-adding decoder %p", v.d)
  236. }
  237. d.decoders <- v.d
  238. }
  239. }
  240. d.current.output = nil
  241. d.current.flushed = true
  242. }
  243. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  244. // The return value n is the number of bytes written.
  245. // Any error encountered during the write is also returned.
  246. func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
  247. var n int64
  248. for {
  249. if len(d.current.b) > 0 {
  250. n2, err2 := w.Write(d.current.b)
  251. n += int64(n2)
  252. if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
  253. d.current.err = err2
  254. } else if n2 != len(d.current.b) {
  255. d.current.err = io.ErrShortWrite
  256. }
  257. }
  258. if d.current.err != nil {
  259. break
  260. }
  261. d.nextBlock(true)
  262. }
  263. err := d.current.err
  264. if err != nil {
  265. d.drainOutput()
  266. }
  267. if err == io.EOF {
  268. err = nil
  269. }
  270. return n, err
  271. }
  272. // DecodeAll allows stateless decoding of a blob of bytes.
  273. // Output will be appended to dst, so if the destination size is known
  274. // you can pre-allocate the destination slice to avoid allocations.
  275. // DecodeAll can be used concurrently.
  276. // The Decoder concurrency limits will be respected.
  277. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
  278. if d.decoders == nil {
  279. return dst, ErrDecoderClosed
  280. }
  281. // Grab a block decoder and frame decoder.
  282. block := <-d.decoders
  283. frame := block.localFrame
  284. initialSize := len(dst)
  285. defer func() {
  286. if debugDecoder {
  287. printf("re-adding decoder: %p", block)
  288. }
  289. frame.rawInput = nil
  290. frame.bBuf = nil
  291. if frame.history.decoders.br != nil {
  292. frame.history.decoders.br.in = nil
  293. }
  294. d.decoders <- block
  295. }()
  296. frame.bBuf = input
  297. for {
  298. frame.history.reset()
  299. err := frame.reset(&frame.bBuf)
  300. if err != nil {
  301. if err == io.EOF {
  302. if debugDecoder {
  303. println("frame reset return EOF")
  304. }
  305. return dst, nil
  306. }
  307. return dst, err
  308. }
  309. if frame.DictionaryID != nil {
  310. dict, ok := d.dicts[*frame.DictionaryID]
  311. if !ok {
  312. return nil, ErrUnknownDictionary
  313. }
  314. if debugDecoder {
  315. println("setting dict", frame.DictionaryID)
  316. }
  317. frame.history.setDict(&dict)
  318. }
  319. if frame.WindowSize > d.o.maxWindowSize {
  320. if debugDecoder {
  321. println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
  322. }
  323. return dst, ErrWindowSizeExceeded
  324. }
  325. if frame.FrameContentSize != fcsUnknown {
  326. if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
  327. if debugDecoder {
  328. println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
  329. }
  330. return dst, ErrDecoderSizeExceeded
  331. }
  332. if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
  333. if debugDecoder {
  334. println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
  335. }
  336. return dst, ErrDecoderSizeExceeded
  337. }
  338. if cap(dst)-len(dst) < int(frame.FrameContentSize) {
  339. dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
  340. copy(dst2, dst)
  341. dst = dst2
  342. }
  343. }
  344. if cap(dst) == 0 && !d.o.limitToCap {
  345. // Allocate len(input) * 2 by default if nothing is provided
  346. // and we didn't get frame content size.
  347. size := len(input) * 2
  348. // Cap to 1 MB.
  349. if size > 1<<20 {
  350. size = 1 << 20
  351. }
  352. if uint64(size) > d.o.maxDecodedSize {
  353. size = int(d.o.maxDecodedSize)
  354. }
  355. dst = make([]byte, 0, size)
  356. }
  357. dst, err = frame.runDecoder(dst, block)
  358. if err != nil {
  359. return dst, err
  360. }
  361. if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
  362. return dst, ErrDecoderSizeExceeded
  363. }
  364. if len(frame.bBuf) == 0 {
  365. if debugDecoder {
  366. println("frame dbuf empty")
  367. }
  368. break
  369. }
  370. }
  371. return dst, nil
  372. }
  373. // nextBlock returns the next block.
  374. // If an error occurs d.err will be set.
  375. // Optionally the function can block for new output.
  376. // If non-blocking mode is used the returned boolean will be false
  377. // if no data was available without blocking.
  378. func (d *Decoder) nextBlock(blocking bool) (ok bool) {
  379. if d.current.err != nil {
  380. // Keep error state.
  381. return false
  382. }
  383. d.current.b = d.current.b[:0]
  384. // SYNC:
  385. if d.syncStream.enabled {
  386. if !blocking {
  387. return false
  388. }
  389. ok = d.nextBlockSync()
  390. if !ok {
  391. d.stashDecoder()
  392. }
  393. return ok
  394. }
  395. //ASYNC:
  396. d.stashDecoder()
  397. if blocking {
  398. d.current.decodeOutput, ok = <-d.current.output
  399. } else {
  400. select {
  401. case d.current.decodeOutput, ok = <-d.current.output:
  402. default:
  403. return false
  404. }
  405. }
  406. if !ok {
  407. // This should not happen, so signal error state...
  408. d.current.err = io.ErrUnexpectedEOF
  409. return false
  410. }
  411. next := d.current.decodeOutput
  412. if next.d != nil && next.d.async.newHist != nil {
  413. d.current.crc.Reset()
  414. }
  415. if debugDecoder {
  416. var tmp [4]byte
  417. binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
  418. println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
  419. }
  420. if !d.o.ignoreChecksum && len(next.b) > 0 {
  421. n, err := d.current.crc.Write(next.b)
  422. if err == nil {
  423. if n != len(next.b) {
  424. d.current.err = io.ErrShortWrite
  425. }
  426. }
  427. }
  428. if next.err == nil && next.d != nil && len(next.d.checkCRC) != 0 {
  429. got := d.current.crc.Sum64()
  430. var tmp [4]byte
  431. binary.LittleEndian.PutUint32(tmp[:], uint32(got))
  432. if !d.o.ignoreChecksum && !bytes.Equal(tmp[:], next.d.checkCRC) {
  433. if debugDecoder {
  434. println("CRC Check Failed:", tmp[:], " (got) !=", next.d.checkCRC, "(on stream)")
  435. }
  436. d.current.err = ErrCRCMismatch
  437. } else {
  438. if debugDecoder {
  439. println("CRC ok", tmp[:])
  440. }
  441. }
  442. }
  443. return true
  444. }
  445. func (d *Decoder) nextBlockSync() (ok bool) {
  446. if d.current.d == nil {
  447. d.current.d = <-d.decoders
  448. }
  449. for len(d.current.b) == 0 {
  450. if !d.syncStream.inFrame {
  451. d.frame.history.reset()
  452. d.current.err = d.frame.reset(&d.syncStream.br)
  453. if d.current.err != nil {
  454. return false
  455. }
  456. if d.frame.DictionaryID != nil {
  457. dict, ok := d.dicts[*d.frame.DictionaryID]
  458. if !ok {
  459. d.current.err = ErrUnknownDictionary
  460. return false
  461. } else {
  462. d.frame.history.setDict(&dict)
  463. }
  464. }
  465. if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
  466. d.current.err = ErrDecoderSizeExceeded
  467. return false
  468. }
  469. d.syncStream.decodedFrame = 0
  470. d.syncStream.inFrame = true
  471. }
  472. d.current.err = d.frame.next(d.current.d)
  473. if d.current.err != nil {
  474. return false
  475. }
  476. d.frame.history.ensureBlock()
  477. if debugDecoder {
  478. println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
  479. }
  480. histBefore := len(d.frame.history.b)
  481. d.current.err = d.current.d.decodeBuf(&d.frame.history)
  482. if d.current.err != nil {
  483. println("error after:", d.current.err)
  484. return false
  485. }
  486. d.current.b = d.frame.history.b[histBefore:]
  487. if debugDecoder {
  488. println("history after:", len(d.frame.history.b))
  489. }
  490. // Check frame size (before CRC)
  491. d.syncStream.decodedFrame += uint64(len(d.current.b))
  492. if d.syncStream.decodedFrame > d.frame.FrameContentSize {
  493. if debugDecoder {
  494. printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
  495. }
  496. d.current.err = ErrFrameSizeExceeded
  497. return false
  498. }
  499. // Check FCS
  500. if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
  501. if debugDecoder {
  502. printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
  503. }
  504. d.current.err = ErrFrameSizeMismatch
  505. return false
  506. }
  507. // Update/Check CRC
  508. if d.frame.HasCheckSum {
  509. if !d.o.ignoreChecksum {
  510. d.frame.crc.Write(d.current.b)
  511. }
  512. if d.current.d.Last {
  513. if !d.o.ignoreChecksum {
  514. d.current.err = d.frame.checkCRC()
  515. } else {
  516. d.current.err = d.frame.consumeCRC()
  517. }
  518. if d.current.err != nil {
  519. println("CRC error:", d.current.err)
  520. return false
  521. }
  522. }
  523. }
  524. d.syncStream.inFrame = !d.current.d.Last
  525. }
  526. return true
  527. }
  528. func (d *Decoder) stashDecoder() {
  529. if d.current.d != nil {
  530. if debugDecoder {
  531. printf("re-adding current decoder %p", d.current.d)
  532. }
  533. d.decoders <- d.current.d
  534. d.current.d = nil
  535. }
  536. }
  537. // Close will release all resources.
  538. // It is NOT possible to reuse the decoder after this.
  539. func (d *Decoder) Close() {
  540. if d.current.err == ErrDecoderClosed {
  541. return
  542. }
  543. d.drainOutput()
  544. if d.current.cancel != nil {
  545. d.current.cancel()
  546. d.streamWg.Wait()
  547. d.current.cancel = nil
  548. }
  549. if d.decoders != nil {
  550. close(d.decoders)
  551. for dec := range d.decoders {
  552. dec.Close()
  553. }
  554. d.decoders = nil
  555. }
  556. if d.current.d != nil {
  557. d.current.d.Close()
  558. d.current.d = nil
  559. }
  560. d.current.err = ErrDecoderClosed
  561. }
  562. // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
  563. // Any changes to the decoder will be reflected, so the returned ReadCloser
  564. // can be reused along with the decoder.
  565. // io.WriterTo is also supported by the returned ReadCloser.
  566. func (d *Decoder) IOReadCloser() io.ReadCloser {
  567. return closeWrapper{d: d}
  568. }
  569. // closeWrapper wraps a function call as a closer.
  570. type closeWrapper struct {
  571. d *Decoder
  572. }
  573. // WriteTo forwards WriteTo calls to the decoder.
  574. func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
  575. return c.d.WriteTo(w)
  576. }
  577. // Read forwards read calls to the decoder.
  578. func (c closeWrapper) Read(p []byte) (n int, err error) {
  579. return c.d.Read(p)
  580. }
  581. // Close closes the decoder.
  582. func (c closeWrapper) Close() error {
  583. c.d.Close()
  584. return nil
  585. }
  586. type decodeOutput struct {
  587. d *blockDec
  588. b []byte
  589. err error
  590. }
  591. func (d *Decoder) startSyncDecoder(r io.Reader) error {
  592. d.frame.history.reset()
  593. d.syncStream.br = readerWrapper{r: r}
  594. d.syncStream.inFrame = false
  595. d.syncStream.enabled = true
  596. d.syncStream.decodedFrame = 0
  597. return nil
  598. }
  599. // Create Decoder:
  600. // ASYNC:
  601. // Spawn 3 go routines.
  602. // 0: Read frames and decode block literals.
  603. // 1: Decode sequences.
  604. // 2: Execute sequences, send to output.
  605. func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
  606. defer d.streamWg.Done()
  607. br := readerWrapper{r: r}
  608. var seqDecode = make(chan *blockDec, d.o.concurrent)
  609. var seqExecute = make(chan *blockDec, d.o.concurrent)
  610. // Async 1: Decode sequences...
  611. go func() {
  612. var hist history
  613. var hasErr bool
  614. for block := range seqDecode {
  615. if hasErr {
  616. if block != nil {
  617. seqExecute <- block
  618. }
  619. continue
  620. }
  621. if block.async.newHist != nil {
  622. if debugDecoder {
  623. println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
  624. }
  625. hist.reset()
  626. hist.decoders = block.async.newHist.decoders
  627. hist.recentOffsets = block.async.newHist.recentOffsets
  628. hist.windowSize = block.async.newHist.windowSize
  629. if block.async.newHist.dict != nil {
  630. hist.setDict(block.async.newHist.dict)
  631. }
  632. }
  633. if block.err != nil || block.Type != blockTypeCompressed {
  634. hasErr = block.err != nil
  635. seqExecute <- block
  636. continue
  637. }
  638. hist.decoders.literals = block.async.literals
  639. block.err = block.prepareSequences(block.async.seqData, &hist)
  640. if debugDecoder && block.err != nil {
  641. println("prepareSequences returned:", block.err)
  642. }
  643. hasErr = block.err != nil
  644. if block.err == nil {
  645. block.err = block.decodeSequences(&hist)
  646. if debugDecoder && block.err != nil {
  647. println("decodeSequences returned:", block.err)
  648. }
  649. hasErr = block.err != nil
  650. // block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
  651. block.async.seqSize = hist.decoders.seqSize
  652. }
  653. seqExecute <- block
  654. }
  655. close(seqExecute)
  656. hist.reset()
  657. }()
  658. var wg sync.WaitGroup
  659. wg.Add(1)
  660. // Async 3: Execute sequences...
  661. frameHistCache := d.frame.history.b
  662. go func() {
  663. var hist history
  664. var decodedFrame uint64
  665. var fcs uint64
  666. var hasErr bool
  667. for block := range seqExecute {
  668. out := decodeOutput{err: block.err, d: block}
  669. if block.err != nil || hasErr {
  670. hasErr = true
  671. output <- out
  672. continue
  673. }
  674. if block.async.newHist != nil {
  675. if debugDecoder {
  676. println("Async 2: new history")
  677. }
  678. hist.reset()
  679. hist.windowSize = block.async.newHist.windowSize
  680. hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
  681. if block.async.newHist.dict != nil {
  682. hist.setDict(block.async.newHist.dict)
  683. }
  684. if cap(hist.b) < hist.allocFrameBuffer {
  685. if cap(frameHistCache) >= hist.allocFrameBuffer {
  686. hist.b = frameHistCache
  687. } else {
  688. hist.b = make([]byte, 0, hist.allocFrameBuffer)
  689. println("Alloc history sized", hist.allocFrameBuffer)
  690. }
  691. }
  692. hist.b = hist.b[:0]
  693. fcs = block.async.fcs
  694. decodedFrame = 0
  695. }
  696. do := decodeOutput{err: block.err, d: block}
  697. switch block.Type {
  698. case blockTypeRLE:
  699. if debugDecoder {
  700. println("add rle block length:", block.RLESize)
  701. }
  702. if cap(block.dst) < int(block.RLESize) {
  703. if block.lowMem {
  704. block.dst = make([]byte, block.RLESize)
  705. } else {
  706. block.dst = make([]byte, maxCompressedBlockSize)
  707. }
  708. }
  709. block.dst = block.dst[:block.RLESize]
  710. v := block.data[0]
  711. for i := range block.dst {
  712. block.dst[i] = v
  713. }
  714. hist.append(block.dst)
  715. do.b = block.dst
  716. case blockTypeRaw:
  717. if debugDecoder {
  718. println("add raw block length:", len(block.data))
  719. }
  720. hist.append(block.data)
  721. do.b = block.data
  722. case blockTypeCompressed:
  723. if debugDecoder {
  724. println("execute with history length:", len(hist.b), "window:", hist.windowSize)
  725. }
  726. hist.decoders.seqSize = block.async.seqSize
  727. hist.decoders.literals = block.async.literals
  728. do.err = block.executeSequences(&hist)
  729. hasErr = do.err != nil
  730. if debugDecoder && hasErr {
  731. println("executeSequences returned:", do.err)
  732. }
  733. do.b = block.dst
  734. }
  735. if !hasErr {
  736. decodedFrame += uint64(len(do.b))
  737. if decodedFrame > fcs {
  738. println("fcs exceeded", block.Last, fcs, decodedFrame)
  739. do.err = ErrFrameSizeExceeded
  740. hasErr = true
  741. } else if block.Last && fcs != fcsUnknown && decodedFrame != fcs {
  742. do.err = ErrFrameSizeMismatch
  743. hasErr = true
  744. } else {
  745. if debugDecoder {
  746. println("fcs ok", block.Last, fcs, decodedFrame)
  747. }
  748. }
  749. }
  750. output <- do
  751. }
  752. close(output)
  753. frameHistCache = hist.b
  754. wg.Done()
  755. if debugDecoder {
  756. println("decoder goroutines finished")
  757. }
  758. hist.reset()
  759. }()
  760. var hist history
  761. decodeStream:
  762. for {
  763. var hasErr bool
  764. hist.reset()
  765. decodeBlock := func(block *blockDec) {
  766. if hasErr {
  767. if block != nil {
  768. seqDecode <- block
  769. }
  770. return
  771. }
  772. if block.err != nil || block.Type != blockTypeCompressed {
  773. hasErr = block.err != nil
  774. seqDecode <- block
  775. return
  776. }
  777. remain, err := block.decodeLiterals(block.data, &hist)
  778. block.err = err
  779. hasErr = block.err != nil
  780. if err == nil {
  781. block.async.literals = hist.decoders.literals
  782. block.async.seqData = remain
  783. } else if debugDecoder {
  784. println("decodeLiterals error:", err)
  785. }
  786. seqDecode <- block
  787. }
  788. frame := d.frame
  789. if debugDecoder {
  790. println("New frame...")
  791. }
  792. var historySent bool
  793. frame.history.reset()
  794. err := frame.reset(&br)
  795. if debugDecoder && err != nil {
  796. println("Frame decoder returned", err)
  797. }
  798. if err == nil && frame.DictionaryID != nil {
  799. dict, ok := d.dicts[*frame.DictionaryID]
  800. if !ok {
  801. err = ErrUnknownDictionary
  802. } else {
  803. frame.history.setDict(&dict)
  804. }
  805. }
  806. if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
  807. if debugDecoder {
  808. println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
  809. }
  810. err = ErrDecoderSizeExceeded
  811. }
  812. if err != nil {
  813. select {
  814. case <-ctx.Done():
  815. case dec := <-d.decoders:
  816. dec.sendErr(err)
  817. decodeBlock(dec)
  818. }
  819. break decodeStream
  820. }
  821. // Go through all blocks of the frame.
  822. for {
  823. var dec *blockDec
  824. select {
  825. case <-ctx.Done():
  826. break decodeStream
  827. case dec = <-d.decoders:
  828. // Once we have a decoder, we MUST return it.
  829. }
  830. err := frame.next(dec)
  831. if !historySent {
  832. h := frame.history
  833. if debugDecoder {
  834. println("Alloc History:", h.allocFrameBuffer)
  835. }
  836. hist.reset()
  837. if h.dict != nil {
  838. hist.setDict(h.dict)
  839. }
  840. dec.async.newHist = &h
  841. dec.async.fcs = frame.FrameContentSize
  842. historySent = true
  843. } else {
  844. dec.async.newHist = nil
  845. }
  846. if debugDecoder && err != nil {
  847. println("next block returned error:", err)
  848. }
  849. dec.err = err
  850. dec.checkCRC = nil
  851. if dec.Last && frame.HasCheckSum && err == nil {
  852. crc, err := frame.rawInput.readSmall(4)
  853. if err != nil {
  854. println("CRC missing?", err)
  855. dec.err = err
  856. }
  857. var tmp [4]byte
  858. copy(tmp[:], crc)
  859. dec.checkCRC = tmp[:]
  860. if debugDecoder {
  861. println("found crc to check:", dec.checkCRC)
  862. }
  863. }
  864. err = dec.err
  865. last := dec.Last
  866. decodeBlock(dec)
  867. if err != nil {
  868. break decodeStream
  869. }
  870. if last {
  871. break
  872. }
  873. }
  874. }
  875. close(seqDecode)
  876. wg.Wait()
  877. hist.reset()
  878. d.frame.history.b = frameHistCache
  879. }