|
@@ -35,6 +35,7 @@ type Decoder struct {
|
|
|
br readerWrapper
|
|
|
enabled bool
|
|
|
inFrame bool
|
|
|
+ dstBuf []byte
|
|
|
}
|
|
|
|
|
|
frame *frameDec
|
|
@@ -187,21 +188,23 @@ func (d *Decoder) Reset(r io.Reader) error {
|
|
|
}
|
|
|
|
|
|
// If bytes buffer and < 5MB, do sync decoding anyway.
|
|
|
- if bb, ok := r.(byter); ok && bb.Len() < 5<<20 {
|
|
|
+ if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap {
|
|
|
bb2 := bb
|
|
|
if debugDecoder {
|
|
|
println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
|
|
|
}
|
|
|
b := bb2.Bytes()
|
|
|
var dst []byte
|
|
|
- if cap(d.current.b) > 0 {
|
|
|
- dst = d.current.b
|
|
|
+ if cap(d.syncStream.dstBuf) > 0 {
|
|
|
+ dst = d.syncStream.dstBuf[:0]
|
|
|
}
|
|
|
|
|
|
- dst, err := d.DecodeAll(b, dst[:0])
|
|
|
+ dst, err := d.DecodeAll(b, dst)
|
|
|
if err == nil {
|
|
|
err = io.EOF
|
|
|
}
|
|
|
+ // Save output buffer
|
|
|
+ d.syncStream.dstBuf = dst
|
|
|
d.current.b = dst
|
|
|
d.current.err = err
|
|
|
d.current.flushed = true
|
|
@@ -216,6 +219,7 @@ func (d *Decoder) Reset(r io.Reader) error {
|
|
|
d.current.err = nil
|
|
|
d.current.flushed = false
|
|
|
d.current.d = nil
|
|
|
+ d.syncStream.dstBuf = nil
|
|
|
|
|
|
// Ensure no-one else is still running...
|
|
|
d.streamWg.Wait()
|
|
@@ -312,6 +316,7 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
|
|
|
// Grab a block decoder and frame decoder.
|
|
|
block := <-d.decoders
|
|
|
frame := block.localFrame
|
|
|
+ initialSize := len(dst)
|
|
|
defer func() {
|
|
|
if debugDecoder {
|
|
|
printf("re-adding decoder: %p", block)
|
|
@@ -354,7 +359,16 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
|
|
|
return dst, ErrWindowSizeExceeded
|
|
|
}
|
|
|
if frame.FrameContentSize != fcsUnknown {
|
|
|
- if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)) {
|
|
|
+ if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
|
|
|
+ if debugDecoder {
|
|
|
+ println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
|
|
|
+ }
|
|
|
+ return dst, ErrDecoderSizeExceeded
|
|
|
+ }
|
|
|
+ if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
|
|
|
+ if debugDecoder {
|
|
|
+ println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
|
|
|
+ }
|
|
|
return dst, ErrDecoderSizeExceeded
|
|
|
}
|
|
|
if cap(dst)-len(dst) < int(frame.FrameContentSize) {
|
|
@@ -364,7 +378,7 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if cap(dst) == 0 {
|
|
|
+ if cap(dst) == 0 && !d.o.limitToCap {
|
|
|
// Allocate len(input) * 2 by default if nothing is provided
|
|
|
// and we didn't get frame content size.
|
|
|
size := len(input) * 2
|
|
@@ -382,6 +396,9 @@ func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
|
|
|
if err != nil {
|
|
|
return dst, err
|
|
|
}
|
|
|
+ if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
|
|
|
+ return dst, ErrDecoderSizeExceeded
|
|
|
+ }
|
|
|
if len(frame.bBuf) == 0 {
|
|
|
if debugDecoder {
|
|
|
println("frame dbuf empty")
|
|
@@ -667,6 +684,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
|
|
|
if debugDecoder {
|
|
|
println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
|
|
|
}
|
|
|
+ hist.reset()
|
|
|
hist.decoders = block.async.newHist.decoders
|
|
|
hist.recentOffsets = block.async.newHist.recentOffsets
|
|
|
hist.windowSize = block.async.newHist.windowSize
|
|
@@ -698,6 +716,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
|
|
|
seqExecute <- block
|
|
|
}
|
|
|
close(seqExecute)
|
|
|
+ hist.reset()
|
|
|
}()
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
@@ -721,6 +740,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
|
|
|
if debugDecoder {
|
|
|
println("Async 2: new history")
|
|
|
}
|
|
|
+ hist.reset()
|
|
|
hist.windowSize = block.async.newHist.windowSize
|
|
|
hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
|
|
|
if block.async.newHist.dict != nil {
|
|
@@ -750,7 +770,7 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
|
|
|
if block.lowMem {
|
|
|
block.dst = make([]byte, block.RLESize)
|
|
|
} else {
|
|
|
- block.dst = make([]byte, maxBlockSize)
|
|
|
+ block.dst = make([]byte, maxCompressedBlockSize)
|
|
|
}
|
|
|
}
|
|
|
block.dst = block.dst[:block.RLESize]
|
|
@@ -802,13 +822,14 @@ func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output ch
|
|
|
if debugDecoder {
|
|
|
println("decoder goroutines finished")
|
|
|
}
|
|
|
+ hist.reset()
|
|
|
}()
|
|
|
|
|
|
+ var hist history
|
|
|
decodeStream:
|
|
|
for {
|
|
|
- var hist history
|
|
|
var hasErr bool
|
|
|
-
|
|
|
+ hist.reset()
|
|
|
decodeBlock := func(block *blockDec) {
|
|
|
if hasErr {
|
|
|
if block != nil {
|
|
@@ -852,6 +873,10 @@ decodeStream:
|
|
|
}
|
|
|
}
|
|
|
if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
|
|
|
+ if debugDecoder {
|
|
|
+ println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
|
|
|
+ }
|
|
|
+
|
|
|
err = ErrDecoderSizeExceeded
|
|
|
}
|
|
|
if err != nil {
|
|
@@ -920,5 +945,6 @@ decodeStream:
|
|
|
}
|
|
|
close(seqDecode)
|
|
|
wg.Wait()
|
|
|
+ hist.reset()
|
|
|
d.frame.history.b = frameHistCache
|
|
|
}
|