diff --git a/src/streaming_decoder.rs b/src/streaming_decoder.rs index d033fbca..f345a897 100644 --- a/src/streaming_decoder.rs +++ b/src/streaming_decoder.rs @@ -1,3 +1,5 @@ +use core::borrow::BorrowMut; + use crate::frame_decoder::{BlockDecodingStrategy, FrameDecoder, FrameDecoderError}; use crate::io::{Error, ErrorKind, Read}; @@ -6,22 +8,26 @@ use crate::io::{Error, ErrorKind, Read}; /// /// The lower level FrameDecoder by comparison allows for finer grained control but need sto have it's decode_blocks method called continously /// to decode the zstd-frame. -pub struct StreamingDecoder { - pub decoder: FrameDecoder, +pub struct StreamingDecoder> { + pub decoder: DEC, source: READ, } -impl StreamingDecoder { - pub fn new(mut source: READ) -> Result, FrameDecoderError> { - let mut decoder = FrameDecoder::new(); - decoder.init(&mut source)?; +impl> StreamingDecoder { + pub fn new_with_decoder( + mut source: READ, + mut decoder: DEC, + ) -> Result, FrameDecoderError> { + decoder.borrow_mut().init(&mut source)?; Ok(StreamingDecoder { decoder, source }) } +} - pub fn new_with_decoder( +impl StreamingDecoder { + pub fn new( mut source: READ, - mut decoder: FrameDecoder, - ) -> Result, FrameDecoderError> { + ) -> Result, FrameDecoderError> { + let mut decoder = FrameDecoder::new(); decoder.init(&mut source)?; Ok(StreamingDecoder { decoder, source }) } @@ -31,9 +37,10 @@ impl StreamingDecoder { } } -impl Read for StreamingDecoder { +impl> Read for StreamingDecoder { fn read(&mut self, buf: &mut [u8]) -> Result { - if self.decoder.is_finished() && self.decoder.can_collect() == 0 { + let decoder = self.decoder.borrow_mut(); + if decoder.is_finished() && decoder.can_collect() == 0 { //No more bytes can ever be decoded return Ok(0); } @@ -43,10 +50,10 @@ impl Read for StreamingDecoder { // So we need to call this until we can actually collect enough bytes // TODO add BlockDecodingStrategy::UntilCollectable(usize) that pushes this logic into the decode_blocks function - while self.decoder.can_collect() < buf.len() && !self.decoder.is_finished() { + while decoder.can_collect() < buf.len() && !decoder.is_finished() { //More bytes can be decoded - let additional_bytes_needed = buf.len() - self.decoder.can_collect(); - match self.decoder.decode_blocks( + let additional_bytes_needed = buf.len() - decoder.can_collect(); + match decoder.decode_blocks( &mut self.source, BlockDecodingStrategy::UptoBytes(additional_bytes_needed), ) { @@ -61,6 +68,6 @@ impl Read for StreamingDecoder { } } - self.decoder.read(buf) + decoder.read(buf) } }