Skip to content

Commit

Permalink
fix(cursor): return NotEnoughData
Browse files Browse the repository at this point in the history
It fixes the regression introduced in #169.
Previously, the cursor checked if unhandled bytes were left.
Return this behaviour to detect schema mismatch.

Closes #185
  • Loading branch information
loyd committed Dec 8, 2024
1 parent f3771b0 commit 955e819
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- [Variant data type](https://clickhouse.com/docs/en/sql-reference/data-types/variant) support ([#170]).

### Fixed
- query/cursor: return `NotEnoughData` if a row is unparsed when the stream ends ([#185]).

[#170]: https://github.com/ClickHouse/clickhouse-rs/pull/170
[#185]: https://github.com/ClickHouse/clickhouse-rs/pull/185

## [0.13.1] - 2024-10-21
### Added
Expand Down
5 changes: 5 additions & 0 deletions src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ impl<T> RowCursor<T> {

match self.raw.next().await? {
Some(chunk) => self.bytes.extend(chunk),
None if self.bytes.remaining() > 0 => {
// If some data is left, we have an incomplete row in the buffer.
// This is usually a schema mismatch on the client side.
return Err(Error::NotEnoughData);
}
None => return Ok(None),
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/rowbinary/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::{convert::TryFrom, mem, str};

use crate::error::{Error, Result};
use bytes::Buf;
use serde::de::{EnumAccess, VariantAccess};
use serde::{
de::{DeserializeSeed, Deserializer, SeqAccess, Visitor},
de::{DeserializeSeed, Deserializer, EnumAccess, SeqAccess, VariantAccess, Visitor},
Deserialize,
};

Expand Down
41 changes: 40 additions & 1 deletion tests/it/cursor_error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use clickhouse::{Client, Compression};
use serde::Deserialize;

use clickhouse::{error::Error, Client, Compression, Row};

#[tokio::test]
async fn deferred() {
Expand Down Expand Up @@ -96,3 +98,40 @@ async fn deferred_lz4() {
assert_ne!(i, 0); // we're interested only in errors during processing
assert!(err.to_string().contains("TIMEOUT_EXCEEDED"));
}

// See #185.
#[tokio::test]
async fn invalid_schema() {
#[derive(Debug, Row, Deserialize)]
#[allow(dead_code)]
struct MyRow {
no: u32,
dec: Option<String>, // valid schema: u64-based types
}

let client = prepare_database!();

client
.query(
"CREATE TABLE test(no UInt32, dec Nullable(Decimal64(4)))
ENGINE = MergeTree
ORDER BY no",
)
.execute()
.await
.unwrap();

client
.query("INSERT INTO test VALUES (1, 1.1), (2, 2.2), (3, 3.3)")
.execute()
.await
.unwrap();

let err = client
.query("SELECT ?fields FROM test")
.fetch_all::<MyRow>()
.await
.unwrap_err();

assert!(matches!(err, Error::NotEnoughData));
}

0 comments on commit 955e819

Please sign in to comment.