Skip to content

Commit

Permalink
api: add Insert::witten_bytes function
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcgee-jump committed Dec 13, 2024
1 parent 2803c8e commit d9ebfcb
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
13 changes: 11 additions & 2 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const_assert!(BUFFER_SIZE.is_power_of_two()); // to use the whole buffer's capac
pub struct Insert<T> {
state: InsertState,
buffer: BytesMut,
written_bytes: usize,
#[cfg(feature = "lz4")]
compression: Compression,
send_timeout: Option<Duration>,
Expand Down Expand Up @@ -137,6 +138,7 @@ impl<T> Insert<T> {
sql,
},
buffer: BytesMut::with_capacity(BUFFER_SIZE),
written_bytes: 0,
#[cfg(feature = "lz4")]
compression: client.compression,
send_timeout: None,
Expand Down Expand Up @@ -222,7 +224,7 @@ impl<T> Insert<T> {
}

#[inline(always)]
pub(crate) fn do_write(&mut self, row: &T) -> Result<usize>
pub(crate) fn do_write(&mut self, row: &T) -> Result<()>
where
T: Serialize,
{
Expand All @@ -240,7 +242,8 @@ impl<T> Insert<T> {
self.abort();
}

result.and(Ok(written))
self.written_bytes += written;
result.and(Ok(()))
}

/// Ends `INSERT`, the server starts processing the data.
Expand All @@ -256,6 +259,12 @@ impl<T> Insert<T> {
self.state.terminated();
self.wait_handle().await
}

/// Returns the number of serialized bytes that have been written because of
/// write operations.
pub fn witten_bytes(&self) -> usize {
self.written_bytes
}

async fn send_chunk(&mut self) -> Result<()> {
debug_assert!(matches!(self.state, InsertState::Active { .. }));
Expand Down
4 changes: 2 additions & 2 deletions src/inserter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ where
}

match self.insert.as_mut().unwrap().do_write(row) {
Ok(bytes) => {
self.pending.bytes += bytes as u64;
Ok(()) => {
self.pending_bytes = self.insert.written_bytes() as u64;
self.pending.rows += 1;

if !self.in_transaction {
Expand Down

0 comments on commit d9ebfcb

Please sign in to comment.