From e6d6a0b8acb005bc39d0921a1241093c75850c3a Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sun, 10 Nov 2024 14:27:42 +0800 Subject: [PATCH 1/4] feat(services/redis): add support of list operation --- core/Cargo.toml | 2 +- core/src/services/redis/backend.rs | 88 +++++++++++++++++++++++++++--- core/src/services/redis/core.rs | 13 +++++ core/src/services/redis/docs.md | 2 +- 4 files changed, 94 insertions(+), 11 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 5ba079e964f0..86ef81840572 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -182,7 +182,7 @@ services-pcloud = [] services-persy = ["dep:persy", "internal-tokio-rt"] services-postgresql = ["dep:sqlx", "sqlx?/postgres"] services-redb = ["dep:redb", "internal-tokio-rt"] -services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"] +services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp", "dep:ouroboros"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"] services-s3 = [ diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index 19afe34c7655..5f906ec9862e 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -16,9 +16,13 @@ // under the License. use bb8::RunError; +use futures::Stream; +use futures::StreamExt; use http::Uri; +use ouroboros::self_referencing; use redis::cluster::ClusterClient; use redis::cluster::ClusterClientBuilder; +use redis::AsyncIter; use redis::Client; use redis::ConnectionAddr; use redis::ConnectionInfo; @@ -27,6 +31,9 @@ use redis::RedisConnectionInfo; use std::fmt::Debug; use std::fmt::Formatter; use std::path::PathBuf; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use std::time::Duration; use tokio::sync::OnceCell; @@ -291,7 +298,23 @@ impl Debug for Adapter { impl Adapter { async fn conn(&self) -> Result> { - let pool = self + let pool = self.pool().await?; + Adapter::conn_from_pool(pool).await + } + + async fn conn_from_pool( + pool: &bb8::Pool, + ) -> Result> { + pool.get().await.map_err(|err| match err { + RunError::TimedOut => { + Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary() + } + RunError::User(err) => err, + }) + } + + async fn pool(&self) -> Result<&bb8::Pool> { + Ok(self .conn .get_or_try_init(|| async { bb8::Pool::builder() @@ -302,13 +325,7 @@ impl Adapter { .set_source(err) }) }) - .await?; - pool.get().await.map_err(|err| match err { - RunError::TimedOut => { - Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary() - } - RunError::User(err) => err, - }) + .await?) } fn get_redis_connection_manager(&self) -> RedisConnectionManager { @@ -326,8 +343,43 @@ impl Adapter { } } +#[self_referencing] +struct RedisAsyncConnIter<'a> { + conn: bb8::PooledConnection<'a, RedisConnectionManager>, + + #[borrows(mut conn)] + #[not_covariant] + iter: AsyncIter<'this, String>, +} + +#[self_referencing] +pub struct RedisScanner { + pool: bb8::Pool, + path: String, + + #[borrows(pool, path)] + #[not_covariant] + inner: RedisAsyncConnIter<'this>, +} + +unsafe impl Sync for RedisScanner {} + +impl Stream for RedisScanner { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.with_inner_mut(|s| s.with_iter_mut(|v| v.poll_next_unpin(cx).map(|v| v.map(Ok)))) + } +} + +impl kv::Scan for RedisScanner { + async fn next(&mut self) -> Result> { + ::next(self).await.transpose() + } +} + impl kv::Adapter for Adapter { - type Scanner = (); + type Scanner = RedisScanner; fn info(&self) -> kv::Info { kv::Info::new( @@ -336,6 +388,7 @@ impl kv::Adapter for Adapter { Capability { read: true, write: true, + list: true, ..Default::default() }, @@ -366,4 +419,21 @@ impl kv::Adapter for Adapter { conn.append(key, value).await?; Ok(()) } + + async fn scan(&self, path: &str) -> Result { + let pool = self.pool().await?.clone(); + + Ok( + RedisScanner::try_new_async_send(pool, path.to_string(), |pool, path| { + Box::pin(async { + let conn = Adapter::conn_from_pool(pool).await?; + Ok(RedisAsyncConnIter::try_new_async_send(conn, |conn| { + Box::pin(async { conn.scan(path).await }) + }) + .await?) + }) + }) + .await?, + ) + } } diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs index 041ed87169b9..6c5ca8cad2ce 100644 --- a/core/src/services/redis/core.rs +++ b/core/src/services/redis/core.rs @@ -26,6 +26,7 @@ use redis::cluster::ClusterClient; use redis::cluster_async::ClusterConnection; use redis::from_redis_value; use redis::AsyncCommands; +use redis::AsyncIter; use redis::Client; use redis::RedisError; @@ -105,6 +106,18 @@ impl RedisConnection { } Ok(()) } + + pub async fn scan(&mut self, prefix: &str) -> crate::Result> { + let pattern = format!("{}*", prefix); + Ok(match self { + RedisConnection::Normal(ref mut conn) => { + conn.scan_match(pattern).await.map_err(format_redis_error)? + } + RedisConnection::Cluster(ref mut conn) => { + conn.scan_match(pattern).await.map_err(format_redis_error)? + } + }) + } } #[derive(Clone)] diff --git a/core/src/services/redis/docs.md b/core/src/services/redis/docs.md index 8d183ea74b8b..bea0ee0a1d80 100644 --- a/core/src/services/redis/docs.md +++ b/core/src/services/redis/docs.md @@ -9,7 +9,7 @@ This service can be used to: - [x] delete - [x] copy - [x] rename -- [ ] ~~list~~ +- [x] list - [ ] ~~presign~~ - [ ] blocking From f291fc8392b3f71f0242718cd0ab9555c67da9cf Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Sun, 10 Nov 2024 21:15:45 +0800 Subject: [PATCH 2/4] fix --- core/src/services/redis/backend.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index 5f906ec9862e..62203c1640a6 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -314,8 +314,7 @@ impl Adapter { } async fn pool(&self) -> Result<&bb8::Pool> { - Ok(self - .conn + self.conn .get_or_try_init(|| async { bb8::Pool::builder() .build(self.get_redis_connection_manager()) @@ -325,7 +324,7 @@ impl Adapter { .set_source(err) }) }) - .await?) + .await } fn get_redis_connection_manager(&self) -> RedisConnectionManager { @@ -423,17 +422,15 @@ impl kv::Adapter for Adapter { async fn scan(&self, path: &str) -> Result { let pool = self.pool().await?.clone(); - Ok( - RedisScanner::try_new_async_send(pool, path.to_string(), |pool, path| { - Box::pin(async { - let conn = Adapter::conn_from_pool(pool).await?; - Ok(RedisAsyncConnIter::try_new_async_send(conn, |conn| { - Box::pin(async { conn.scan(path).await }) - }) - .await?) + RedisScanner::try_new_async_send(pool, path.to_string(), |pool, path| { + Box::pin(async { + let conn = Adapter::conn_from_pool(pool).await?; + RedisAsyncConnIter::try_new_async_send(conn, |conn| { + Box::pin(async { conn.scan(path).await }) }) + .await }) - .await?, - ) + }) + .await } } From c5a733cd50b4c2f478c321c25b6b2692f03d694a Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Mon, 11 Nov 2024 12:30:32 +0800 Subject: [PATCH 3/4] update kvrocks to avoid cursor incompetible --- fixtures/redis/docker-compose-kvrocks.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fixtures/redis/docker-compose-kvrocks.yml b/fixtures/redis/docker-compose-kvrocks.yml index 25e55a15f52f..f4e24d76a9de 100644 --- a/fixtures/redis/docker-compose-kvrocks.yml +++ b/fixtures/redis/docker-compose-kvrocks.yml @@ -19,6 +19,6 @@ version: '3.8' services: redis: - image: apache/kvrocks:2.5.1 + image: apache/kvrocks:2.10.1 ports: - '6379:6666' From 8762b149f711a68b8b982c9c1100840a1ea71dba Mon Sep 17 00:00:00 2001 From: PragmaTwice Date: Mon, 11 Nov 2024 13:17:47 +0800 Subject: [PATCH 4/4] disable list on cluster mode --- core/src/services/redis/backend.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index 62203c1640a6..f5939d840b89 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -387,7 +387,12 @@ impl kv::Adapter for Adapter { Capability { read: true, write: true, - list: true, + // due to limitation of Redis itself, + // on cluster mode we cannot get full list of keys via SCAN, + // so here we disable it on cluster mode to avoid confusions. + // TODO: we can perform multiple SCAN on each cluster node + // and merge the result to simulate the behavior of list here. + list: self.cluster_client.is_none(), ..Default::default() },