decoder.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941
  1. // Copyright 2019+ Klaus Post. All rights reserved.
  2. // License information can be found in the LICENSE file.
  3. // Based on work by Yann Collet, released under BSD License.
  4. package zstd
  5. import (
  6. "bytes"
  7. "context"
  8. "encoding/binary"
  9. "io"
  10. "sync"
  11. "github.com/klauspost/compress/zstd/internal/xxhash"
  12. )
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	o decoderOptions

	// Unreferenced decoders, ready for use.
	// Buffered pool of o.concurrent block decoders (filled by NewReader),
	// shared by DecodeAll and stream decoding. Set to nil by Close.
	decoders chan *blockDec

	// Current read position used for Reader functionality.
	current decoderState

	// sync stream decoding
	// State for single-goroutine decoding, used when o.concurrent == 1.
	syncStream struct {
		decodedFrame uint64 // bytes output so far for the current frame
		br           readerWrapper
		enabled      bool
		inFrame      bool // a frame is started and its last block not yet decoded
	}

	// frame is the frame decoder used for stream decoding (Reset);
	// DecodeAll uses the localFrame of each block decoder instead.
	frame *frameDec

	// Custom dictionaries.
	// Always uses copies.
	dicts map[uint32]dict

	// streamWg is the waitgroup for all streams
	streamWg sync.WaitGroup
}
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// Embedded: d.current.d, d.current.b and d.current.err are the fields
	// of this decodeOutput.
	decodeOutput

	// output in order to be written to stream.
	// Only created for async decoding (o.concurrent != 1); it is closed by
	// the stream goroutines when they finish.
	output chan decodeOutput

	// cancel remaining output.
	cancel context.CancelFunc

	// crc of current frame
	// Maintained by nextBlock in async mode.
	crc *xxhash.Digest

	// flushed reports that there is no pending output left to drain.
	flushed bool
}
  52. var (
  53. // Check the interfaces we want to support.
  54. _ = io.WriterTo(&Decoder{})
  55. _ = io.Reader(&Decoder{})
  56. )
  57. // NewReader creates a new decoder.
  58. // A nil Reader can be provided in which case Reset can be used to start a decode.
  59. //
  60. // A Decoder can be used in two modes:
  61. //
  62. // 1) As a stream, or
  63. // 2) For stateless decoding using DecodeAll.
  64. //
  65. // Only a single stream can be decoded concurrently, but the same decoder
  66. // can run multiple concurrent stateless decodes. It is even possible to
  67. // use stateless decodes while a stream is being decoded.
  68. //
  69. // The Reset function can be used to initiate a new stream, which is will considerably
  70. // reduce the allocations normally caused by NewReader.
  71. func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  72. initPredefined()
  73. var d Decoder
  74. d.o.setDefault()
  75. for _, o := range opts {
  76. err := o(&d.o)
  77. if err != nil {
  78. return nil, err
  79. }
  80. }
  81. d.current.crc = xxhash.New()
  82. d.current.flushed = true
  83. if r == nil {
  84. d.current.err = ErrDecoderNilInput
  85. }
  86. // Transfer option dicts.
  87. d.dicts = make(map[uint32]dict, len(d.o.dicts))
  88. for _, dc := range d.o.dicts {
  89. d.dicts[dc.id] = dc
  90. }
  91. d.o.dicts = nil
  92. // Create decoders
  93. d.decoders = make(chan *blockDec, d.o.concurrent)
  94. for i := 0; i < d.o.concurrent; i++ {
  95. dec := newBlockDec(d.o.lowMem)
  96. dec.localFrame = newFrameDec(d.o)
  97. d.decoders <- dec
  98. }
  99. if r == nil {
  100. return &d, nil
  101. }
  102. return &d, d.Reset(r)
  103. }
  104. // Read bytes from the decompressed stream into p.
  105. // Returns the number of bytes written and any error that occurred.
  106. // When the stream is done, io.EOF will be returned.
  107. func (d *Decoder) Read(p []byte) (int, error) {
  108. var n int
  109. for {
  110. if len(d.current.b) > 0 {
  111. filled := copy(p, d.current.b)
  112. p = p[filled:]
  113. d.current.b = d.current.b[filled:]
  114. n += filled
  115. }
  116. if len(p) == 0 {
  117. break
  118. }
  119. if len(d.current.b) == 0 {
  120. // We have an error and no more data
  121. if d.current.err != nil {
  122. break
  123. }
  124. if !d.nextBlock(n == 0) {
  125. return n, d.current.err
  126. }
  127. }
  128. }
  129. if len(d.current.b) > 0 {
  130. if debugDecoder {
  131. println("returning", n, "still bytes left:", len(d.current.b))
  132. }
  133. // Only return error at end of block
  134. return n, nil
  135. }
  136. if d.current.err != nil {
  137. d.drainOutput()
  138. }
  139. if debugDecoder {
  140. println("returning", n, d.current.err, len(d.decoders))
  141. }
  142. return n, d.current.err
  143. }
  144. // Reset will reset the decoder the supplied stream after the current has finished processing.
  145. // Note that this functionality cannot be used after Close has been called.
  146. // Reset can be called with a nil reader to release references to the previous reader.
  147. // After being called with a nil reader, no other operations than Reset or DecodeAll or Close
  148. // should be used.
  149. func (d *Decoder) Reset(r io.Reader) error {
  150. if d.current.err == ErrDecoderClosed {
  151. return d.current.err
  152. }
  153. d.drainOutput()
  154. d.syncStream.br.r = nil
  155. if r == nil {
  156. d.current.err = ErrDecoderNilInput
  157. if len(d.current.b) > 0 {
  158. d.current.b = d.current.b[:0]
  159. }
  160. d.current.flushed = true
  161. return nil
  162. }
  163. // If bytes buffer and < 5MB, do sync decoding anyway.
  164. if bb, ok := r.(byter); ok && bb.Len() < 5<<20 {
  165. bb2 := bb
  166. if debugDecoder {
  167. println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
  168. }
  169. b := bb2.Bytes()
  170. var dst []byte
  171. if cap(d.current.b) > 0 {
  172. dst = d.current.b
  173. }
  174. dst, err := d.DecodeAll(b, dst[:0])
  175. if err == nil {
  176. err = io.EOF
  177. }
  178. d.current.b = dst
  179. d.current.err = err
  180. d.current.flushed = true
  181. if debugDecoder {
  182. println("sync decode to", len(dst), "bytes, err:", err)
  183. }
  184. return nil
  185. }
  186. // Remove current block.
  187. d.stashDecoder()
  188. d.current.decodeOutput = decodeOutput{}
  189. d.current.err = nil
  190. d.current.flushed = false
  191. d.current.d = nil
  192. // Ensure no-one else is still running...
  193. d.streamWg.Wait()
  194. if d.frame == nil {
  195. d.frame = newFrameDec(d.o)
  196. }
  197. if d.o.concurrent == 1 {
  198. return d.startSyncDecoder(r)
  199. }
  200. d.current.output = make(chan decodeOutput, d.o.concurrent)
  201. ctx, cancel := context.WithCancel(context.Background())
  202. d.current.cancel = cancel
  203. d.streamWg.Add(1)
  204. go d.startStreamDecoder(ctx, r, d.current.output)
  205. return nil
  206. }
  207. // drainOutput will drain the output until errEndOfStream is sent.
  208. func (d *Decoder) drainOutput() {
  209. if d.current.cancel != nil {
  210. if debugDecoder {
  211. println("cancelling current")
  212. }
  213. d.current.cancel()
  214. d.current.cancel = nil
  215. }
  216. if d.current.d != nil {
  217. if debugDecoder {
  218. printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
  219. }
  220. d.decoders <- d.current.d
  221. d.current.d = nil
  222. d.current.b = nil
  223. }
  224. if d.current.output == nil || d.current.flushed {
  225. println("current already flushed")
  226. return
  227. }
  228. for v := range d.current.output {
  229. if v.d != nil {
  230. if debugDecoder {
  231. printf("re-adding decoder %p", v.d)
  232. }
  233. d.decoders <- v.d
  234. }
  235. }
  236. d.current.output = nil
  237. d.current.flushed = true
  238. }
  239. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  240. // The return value n is the number of bytes written.
  241. // Any error encountered during the write is also returned.
  242. func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
  243. var n int64
  244. for {
  245. if len(d.current.b) > 0 {
  246. n2, err2 := w.Write(d.current.b)
  247. n += int64(n2)
  248. if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
  249. d.current.err = err2
  250. } else if n2 != len(d.current.b) {
  251. d.current.err = io.ErrShortWrite
  252. }
  253. }
  254. if d.current.err != nil {
  255. break
  256. }
  257. d.nextBlock(true)
  258. }
  259. err := d.current.err
  260. if err != nil {
  261. d.drainOutput()
  262. }
  263. if err == io.EOF {
  264. err = nil
  265. }
  266. return n, err
  267. }
  268. // DecodeAll allows stateless decoding of a blob of bytes.
  269. // Output will be appended to dst, so if the destination size is known
  270. // you can pre-allocate the destination slice to avoid allocations.
  271. // DecodeAll can be used concurrently.
  272. // The Decoder concurrency limits will be respected.
  273. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
  274. if d.decoders == nil {
  275. return dst, ErrDecoderClosed
  276. }
  277. // Grab a block decoder and frame decoder.
  278. block := <-d.decoders
  279. frame := block.localFrame
  280. initialSize := len(dst)
  281. defer func() {
  282. if debugDecoder {
  283. printf("re-adding decoder: %p", block)
  284. }
  285. frame.rawInput = nil
  286. frame.bBuf = nil
  287. if frame.history.decoders.br != nil {
  288. frame.history.decoders.br.in = nil
  289. }
  290. d.decoders <- block
  291. }()
  292. frame.bBuf = input
  293. for {
  294. frame.history.reset()
  295. err := frame.reset(&frame.bBuf)
  296. if err != nil {
  297. if err == io.EOF {
  298. if debugDecoder {
  299. println("frame reset return EOF")
  300. }
  301. return dst, nil
  302. }
  303. return dst, err
  304. }
  305. if frame.DictionaryID != nil {
  306. dict, ok := d.dicts[*frame.DictionaryID]
  307. if !ok {
  308. return nil, ErrUnknownDictionary
  309. }
  310. if debugDecoder {
  311. println("setting dict", frame.DictionaryID)
  312. }
  313. frame.history.setDict(&dict)
  314. }
  315. if frame.WindowSize > d.o.maxWindowSize {
  316. if debugDecoder {
  317. println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
  318. }
  319. return dst, ErrWindowSizeExceeded
  320. }
  321. if frame.FrameContentSize != fcsUnknown {
  322. if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
  323. if debugDecoder {
  324. println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
  325. }
  326. return dst, ErrDecoderSizeExceeded
  327. }
  328. if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
  329. if debugDecoder {
  330. println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
  331. }
  332. return dst, ErrDecoderSizeExceeded
  333. }
  334. if cap(dst)-len(dst) < int(frame.FrameContentSize) {
  335. dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
  336. copy(dst2, dst)
  337. dst = dst2
  338. }
  339. }
  340. if cap(dst) == 0 && !d.o.limitToCap {
  341. // Allocate len(input) * 2 by default if nothing is provided
  342. // and we didn't get frame content size.
  343. size := len(input) * 2
  344. // Cap to 1 MB.
  345. if size > 1<<20 {
  346. size = 1 << 20
  347. }
  348. if uint64(size) > d.o.maxDecodedSize {
  349. size = int(d.o.maxDecodedSize)
  350. }
  351. dst = make([]byte, 0, size)
  352. }
  353. dst, err = frame.runDecoder(dst, block)
  354. if err != nil {
  355. return dst, err
  356. }
  357. if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
  358. return dst, ErrDecoderSizeExceeded
  359. }
  360. if len(frame.bBuf) == 0 {
  361. if debugDecoder {
  362. println("frame dbuf empty")
  363. }
  364. break
  365. }
  366. }
  367. return dst, nil
  368. }
  369. // nextBlock returns the next block.
  370. // If an error occurs d.err will be set.
  371. // Optionally the function can block for new output.
  372. // If non-blocking mode is used the returned boolean will be false
  373. // if no data was available without blocking.
  374. func (d *Decoder) nextBlock(blocking bool) (ok bool) {
  375. if d.current.err != nil {
  376. // Keep error state.
  377. return false
  378. }
  379. d.current.b = d.current.b[:0]
  380. // SYNC:
  381. if d.syncStream.enabled {
  382. if !blocking {
  383. return false
  384. }
  385. ok = d.nextBlockSync()
  386. if !ok {
  387. d.stashDecoder()
  388. }
  389. return ok
  390. }
  391. //ASYNC:
  392. d.stashDecoder()
  393. if blocking {
  394. d.current.decodeOutput, ok = <-d.current.output
  395. } else {
  396. select {
  397. case d.current.decodeOutput, ok = <-d.current.output:
  398. default:
  399. return false
  400. }
  401. }
  402. if !ok {
  403. // This should not happen, so signal error state...
  404. d.current.err = io.ErrUnexpectedEOF
  405. return false
  406. }
  407. next := d.current.decodeOutput
  408. if next.d != nil && next.d.async.newHist != nil {
  409. d.current.crc.Reset()
  410. }
  411. if debugDecoder {
  412. var tmp [4]byte
  413. binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
  414. println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
  415. }
  416. if !d.o.ignoreChecksum && len(next.b) > 0 {
  417. n, err := d.current.crc.Write(next.b)
  418. if err == nil {
  419. if n != len(next.b) {
  420. d.current.err = io.ErrShortWrite
  421. }
  422. }
  423. }
  424. if next.err == nil && next.d != nil && len(next.d.checkCRC) != 0 {
  425. got := d.current.crc.Sum64()
  426. var tmp [4]byte
  427. binary.LittleEndian.PutUint32(tmp[:], uint32(got))
  428. if !d.o.ignoreChecksum && !bytes.Equal(tmp[:], next.d.checkCRC) {
  429. if debugDecoder {
  430. println("CRC Check Failed:", tmp[:], " (got) !=", next.d.checkCRC, "(on stream)")
  431. }
  432. d.current.err = ErrCRCMismatch
  433. } else {
  434. if debugDecoder {
  435. println("CRC ok", tmp[:])
  436. }
  437. }
  438. }
  439. return true
  440. }
  441. func (d *Decoder) nextBlockSync() (ok bool) {
  442. if d.current.d == nil {
  443. d.current.d = <-d.decoders
  444. }
  445. for len(d.current.b) == 0 {
  446. if !d.syncStream.inFrame {
  447. d.frame.history.reset()
  448. d.current.err = d.frame.reset(&d.syncStream.br)
  449. if d.current.err != nil {
  450. return false
  451. }
  452. if d.frame.DictionaryID != nil {
  453. dict, ok := d.dicts[*d.frame.DictionaryID]
  454. if !ok {
  455. d.current.err = ErrUnknownDictionary
  456. return false
  457. } else {
  458. d.frame.history.setDict(&dict)
  459. }
  460. }
  461. if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
  462. d.current.err = ErrDecoderSizeExceeded
  463. return false
  464. }
  465. d.syncStream.decodedFrame = 0
  466. d.syncStream.inFrame = true
  467. }
  468. d.current.err = d.frame.next(d.current.d)
  469. if d.current.err != nil {
  470. return false
  471. }
  472. d.frame.history.ensureBlock()
  473. if debugDecoder {
  474. println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
  475. }
  476. histBefore := len(d.frame.history.b)
  477. d.current.err = d.current.d.decodeBuf(&d.frame.history)
  478. if d.current.err != nil {
  479. println("error after:", d.current.err)
  480. return false
  481. }
  482. d.current.b = d.frame.history.b[histBefore:]
  483. if debugDecoder {
  484. println("history after:", len(d.frame.history.b))
  485. }
  486. // Check frame size (before CRC)
  487. d.syncStream.decodedFrame += uint64(len(d.current.b))
  488. if d.syncStream.decodedFrame > d.frame.FrameContentSize {
  489. if debugDecoder {
  490. printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
  491. }
  492. d.current.err = ErrFrameSizeExceeded
  493. return false
  494. }
  495. // Check FCS
  496. if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
  497. if debugDecoder {
  498. printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
  499. }
  500. d.current.err = ErrFrameSizeMismatch
  501. return false
  502. }
  503. // Update/Check CRC
  504. if d.frame.HasCheckSum {
  505. if !d.o.ignoreChecksum {
  506. d.frame.crc.Write(d.current.b)
  507. }
  508. if d.current.d.Last {
  509. if !d.o.ignoreChecksum {
  510. d.current.err = d.frame.checkCRC()
  511. } else {
  512. d.current.err = d.frame.consumeCRC()
  513. }
  514. if d.current.err != nil {
  515. println("CRC error:", d.current.err)
  516. return false
  517. }
  518. }
  519. }
  520. d.syncStream.inFrame = !d.current.d.Last
  521. }
  522. return true
  523. }
  524. func (d *Decoder) stashDecoder() {
  525. if d.current.d != nil {
  526. if debugDecoder {
  527. printf("re-adding current decoder %p", d.current.d)
  528. }
  529. d.decoders <- d.current.d
  530. d.current.d = nil
  531. }
  532. }
  533. // Close will release all resources.
  534. // It is NOT possible to reuse the decoder after this.
  535. func (d *Decoder) Close() {
  536. if d.current.err == ErrDecoderClosed {
  537. return
  538. }
  539. d.drainOutput()
  540. if d.current.cancel != nil {
  541. d.current.cancel()
  542. d.streamWg.Wait()
  543. d.current.cancel = nil
  544. }
  545. if d.decoders != nil {
  546. close(d.decoders)
  547. for dec := range d.decoders {
  548. dec.Close()
  549. }
  550. d.decoders = nil
  551. }
  552. if d.current.d != nil {
  553. d.current.d.Close()
  554. d.current.d = nil
  555. }
  556. d.current.err = ErrDecoderClosed
  557. }
  558. // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
  559. // Any changes to the decoder will be reflected, so the returned ReadCloser
  560. // can be reused along with the decoder.
  561. // io.WriterTo is also supported by the returned ReadCloser.
  562. func (d *Decoder) IOReadCloser() io.ReadCloser {
  563. return closeWrapper{d: d}
  564. }
// closeWrapper wraps a *Decoder so it can be used as an io.ReadCloser;
// it also forwards WriteTo, so io.WriterTo is supported as well.
type closeWrapper struct {
	d *Decoder
}
  569. // WriteTo forwards WriteTo calls to the decoder.
  570. func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
  571. return c.d.WriteTo(w)
  572. }
  573. // Read forwards read calls to the decoder.
  574. func (c closeWrapper) Read(p []byte) (n int, err error) {
  575. return c.d.Read(p)
  576. }
  577. // Close closes the decoder.
  578. func (c closeWrapper) Close() error {
  579. c.d.Close()
  580. return nil
  581. }
// decodeOutput is one unit of decoded stream output: the block decoder that
// produced it (to be returned to the pool), the decoded bytes and any error.
type decodeOutput struct {
	d   *blockDec
	b   []byte
	err error
}
  587. func (d *Decoder) startSyncDecoder(r io.Reader) error {
  588. d.frame.history.reset()
  589. d.syncStream.br = readerWrapper{r: r}
  590. d.syncStream.inFrame = false
  591. d.syncStream.enabled = true
  592. d.syncStream.decodedFrame = 0
  593. return nil
  594. }
  595. // Create Decoder:
  596. // ASYNC:
  597. // Spawn 3 go routines.
  598. // 0: Read frames and decode block literals.
  599. // 1: Decode sequences.
  600. // 2: Execute sequences, send to output.
  601. func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
  602. defer d.streamWg.Done()
  603. br := readerWrapper{r: r}
  604. var seqDecode = make(chan *blockDec, d.o.concurrent)
  605. var seqExecute = make(chan *blockDec, d.o.concurrent)
  606. // Async 1: Decode sequences...
  607. go func() {
  608. var hist history
  609. var hasErr bool
  610. for block := range seqDecode {
  611. if hasErr {
  612. if block != nil {
  613. seqExecute <- block
  614. }
  615. continue
  616. }
  617. if block.async.newHist != nil {
  618. if debugDecoder {
  619. println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
  620. }
  621. hist.decoders = block.async.newHist.decoders
  622. hist.recentOffsets = block.async.newHist.recentOffsets
  623. hist.windowSize = block.async.newHist.windowSize
  624. if block.async.newHist.dict != nil {
  625. hist.setDict(block.async.newHist.dict)
  626. }
  627. }
  628. if block.err != nil || block.Type != blockTypeCompressed {
  629. hasErr = block.err != nil
  630. seqExecute <- block
  631. continue
  632. }
  633. hist.decoders.literals = block.async.literals
  634. block.err = block.prepareSequences(block.async.seqData, &hist)
  635. if debugDecoder && block.err != nil {
  636. println("prepareSequences returned:", block.err)
  637. }
  638. hasErr = block.err != nil
  639. if block.err == nil {
  640. block.err = block.decodeSequences(&hist)
  641. if debugDecoder && block.err != nil {
  642. println("decodeSequences returned:", block.err)
  643. }
  644. hasErr = block.err != nil
  645. // block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
  646. block.async.seqSize = hist.decoders.seqSize
  647. }
  648. seqExecute <- block
  649. }
  650. close(seqExecute)
  651. }()
  652. var wg sync.WaitGroup
  653. wg.Add(1)
  654. // Async 3: Execute sequences...
  655. frameHistCache := d.frame.history.b
  656. go func() {
  657. var hist history
  658. var decodedFrame uint64
  659. var fcs uint64
  660. var hasErr bool
  661. for block := range seqExecute {
  662. out := decodeOutput{err: block.err, d: block}
  663. if block.err != nil || hasErr {
  664. hasErr = true
  665. output <- out
  666. continue
  667. }
  668. if block.async.newHist != nil {
  669. if debugDecoder {
  670. println("Async 2: new history")
  671. }
  672. hist.windowSize = block.async.newHist.windowSize
  673. hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
  674. if block.async.newHist.dict != nil {
  675. hist.setDict(block.async.newHist.dict)
  676. }
  677. if cap(hist.b) < hist.allocFrameBuffer {
  678. if cap(frameHistCache) >= hist.allocFrameBuffer {
  679. hist.b = frameHistCache
  680. } else {
  681. hist.b = make([]byte, 0, hist.allocFrameBuffer)
  682. println("Alloc history sized", hist.allocFrameBuffer)
  683. }
  684. }
  685. hist.b = hist.b[:0]
  686. fcs = block.async.fcs
  687. decodedFrame = 0
  688. }
  689. do := decodeOutput{err: block.err, d: block}
  690. switch block.Type {
  691. case blockTypeRLE:
  692. if debugDecoder {
  693. println("add rle block length:", block.RLESize)
  694. }
  695. if cap(block.dst) < int(block.RLESize) {
  696. if block.lowMem {
  697. block.dst = make([]byte, block.RLESize)
  698. } else {
  699. block.dst = make([]byte, maxBlockSize)
  700. }
  701. }
  702. block.dst = block.dst[:block.RLESize]
  703. v := block.data[0]
  704. for i := range block.dst {
  705. block.dst[i] = v
  706. }
  707. hist.append(block.dst)
  708. do.b = block.dst
  709. case blockTypeRaw:
  710. if debugDecoder {
  711. println("add raw block length:", len(block.data))
  712. }
  713. hist.append(block.data)
  714. do.b = block.data
  715. case blockTypeCompressed:
  716. if debugDecoder {
  717. println("execute with history length:", len(hist.b), "window:", hist.windowSize)
  718. }
  719. hist.decoders.seqSize = block.async.seqSize
  720. hist.decoders.literals = block.async.literals
  721. do.err = block.executeSequences(&hist)
  722. hasErr = do.err != nil
  723. if debugDecoder && hasErr {
  724. println("executeSequences returned:", do.err)
  725. }
  726. do.b = block.dst
  727. }
  728. if !hasErr {
  729. decodedFrame += uint64(len(do.b))
  730. if decodedFrame > fcs {
  731. println("fcs exceeded", block.Last, fcs, decodedFrame)
  732. do.err = ErrFrameSizeExceeded
  733. hasErr = true
  734. } else if block.Last && fcs != fcsUnknown && decodedFrame != fcs {
  735. do.err = ErrFrameSizeMismatch
  736. hasErr = true
  737. } else {
  738. if debugDecoder {
  739. println("fcs ok", block.Last, fcs, decodedFrame)
  740. }
  741. }
  742. }
  743. output <- do
  744. }
  745. close(output)
  746. frameHistCache = hist.b
  747. wg.Done()
  748. if debugDecoder {
  749. println("decoder goroutines finished")
  750. }
  751. }()
  752. decodeStream:
  753. for {
  754. var hist history
  755. var hasErr bool
  756. decodeBlock := func(block *blockDec) {
  757. if hasErr {
  758. if block != nil {
  759. seqDecode <- block
  760. }
  761. return
  762. }
  763. if block.err != nil || block.Type != blockTypeCompressed {
  764. hasErr = block.err != nil
  765. seqDecode <- block
  766. return
  767. }
  768. remain, err := block.decodeLiterals(block.data, &hist)
  769. block.err = err
  770. hasErr = block.err != nil
  771. if err == nil {
  772. block.async.literals = hist.decoders.literals
  773. block.async.seqData = remain
  774. } else if debugDecoder {
  775. println("decodeLiterals error:", err)
  776. }
  777. seqDecode <- block
  778. }
  779. frame := d.frame
  780. if debugDecoder {
  781. println("New frame...")
  782. }
  783. var historySent bool
  784. frame.history.reset()
  785. err := frame.reset(&br)
  786. if debugDecoder && err != nil {
  787. println("Frame decoder returned", err)
  788. }
  789. if err == nil && frame.DictionaryID != nil {
  790. dict, ok := d.dicts[*frame.DictionaryID]
  791. if !ok {
  792. err = ErrUnknownDictionary
  793. } else {
  794. frame.history.setDict(&dict)
  795. }
  796. }
  797. if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
  798. if debugDecoder {
  799. println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
  800. }
  801. err = ErrDecoderSizeExceeded
  802. }
  803. if err != nil {
  804. select {
  805. case <-ctx.Done():
  806. case dec := <-d.decoders:
  807. dec.sendErr(err)
  808. decodeBlock(dec)
  809. }
  810. break decodeStream
  811. }
  812. // Go through all blocks of the frame.
  813. for {
  814. var dec *blockDec
  815. select {
  816. case <-ctx.Done():
  817. break decodeStream
  818. case dec = <-d.decoders:
  819. // Once we have a decoder, we MUST return it.
  820. }
  821. err := frame.next(dec)
  822. if !historySent {
  823. h := frame.history
  824. if debugDecoder {
  825. println("Alloc History:", h.allocFrameBuffer)
  826. }
  827. hist.reset()
  828. if h.dict != nil {
  829. hist.setDict(h.dict)
  830. }
  831. dec.async.newHist = &h
  832. dec.async.fcs = frame.FrameContentSize
  833. historySent = true
  834. } else {
  835. dec.async.newHist = nil
  836. }
  837. if debugDecoder && err != nil {
  838. println("next block returned error:", err)
  839. }
  840. dec.err = err
  841. dec.checkCRC = nil
  842. if dec.Last && frame.HasCheckSum && err == nil {
  843. crc, err := frame.rawInput.readSmall(4)
  844. if err != nil {
  845. println("CRC missing?", err)
  846. dec.err = err
  847. }
  848. var tmp [4]byte
  849. copy(tmp[:], crc)
  850. dec.checkCRC = tmp[:]
  851. if debugDecoder {
  852. println("found crc to check:", dec.checkCRC)
  853. }
  854. }
  855. err = dec.err
  856. last := dec.Last
  857. decodeBlock(dec)
  858. if err != nil {
  859. break decodeStream
  860. }
  861. if last {
  862. break
  863. }
  864. }
  865. }
  866. close(seqDecode)
  867. wg.Wait()
  868. d.frame.history.b = frameHistCache
  869. }