Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): add async retry transform #15647

Merged
merged 7 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/query/expression/src/utils/udf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use crate::DataSchema;
const UDF_TCP_KEEP_ALIVE_SEC: u64 = 30;
const UDF_HTTP2_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
const UDF_KEEP_ALIVE_TIMEOUT_SEC: u64 = 20;
// 4MB by default, we use 16G
// max_encoding_message_size is usize::max by default
const MAX_DECODING_MESSAGE_SIZE: usize = 16 * 1024 * 1024 * 1024;

#[derive(Debug, Clone)]
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async-trait = { workspace = true }
jsonb = { workspace = true }
match-template = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
typetag = { workspace = true }

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod transform_dummy;
mod transform_multi_sort_merge;
mod transform_sort_merge_base;

mod transform_retry_async;
mod transform_sort_merge;
mod transform_sort_merge_limit;
pub mod transform_sort_partial;
Expand All @@ -38,6 +39,7 @@ pub use transform_blocking::*;
pub use transform_compact::*;
pub use transform_dummy::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_retry_async::*;
pub use transform_sort_merge::sort_merge;
pub use transform_sort_merge::*;
pub use transform_sort_merge_base::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_expression::DataBlock;

use super::AsyncTransform;

pub trait AsyncRetry: AsyncTransform {
fn retry_on(&self, err: &databend_common_exception::ErrorCode) -> bool;
fn retry_strategy(&self) -> RetryStrategy;
}

#[derive(Clone)]
pub struct RetryStrategy {
pub retry_times: usize,
pub retry_sleep_duration: Option<tokio::time::Duration>,
}

pub struct AsyncRetryWrapper<T: AsyncRetry + 'static> {
t: T,
}

impl<T: AsyncRetry + 'static> AsyncRetryWrapper<T> {
pub fn create(inner: T) -> Self {
Self { t: inner }
}
}

#[async_trait::async_trait]
impl<T: AsyncRetry + 'static> AsyncTransform for AsyncRetryWrapper<T> {
const NAME: &'static str = T::NAME;

async fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
let strategy = self.t.retry_strategy();
for _ in 0..strategy.retry_times {
match self.t.transform(data.clone()).await {
Ok(v) => return Ok(v),
Err(e) => {
if !self.t.retry_on(&e) {
return Err(e);
}
if let Some(duration) = strategy.retry_sleep_duration {
tokio::time::sleep(duration).await;
}
}
}
}
self.t.transform(data.clone()).await
}

fn name(&self) -> String {
Self::NAME.to_string()
}

async fn on_start(&mut self) -> Result<()> {
self.t.on_start().await
}

async fn on_finish(&mut self) -> Result<()> {
self.t.on_finish().await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_expression::FunctionContext;
use databend_common_pipeline_transforms::processors::AsyncRetry;
use databend_common_pipeline_transforms::processors::AsyncRetryWrapper;
use databend_common_pipeline_transforms::processors::AsyncTransform;
use databend_common_pipeline_transforms::processors::AsyncTransformer;
use databend_common_pipeline_transforms::processors::RetryStrategy;
use databend_common_sql::executor::physical_plans::UdfFunctionDesc;

use crate::pipelines::processors::InputPort;
Expand All @@ -44,10 +47,22 @@ impl TransformUdfServer {
input: Arc<InputPort>,
output: Arc<OutputPort>,
) -> Result<Box<dyn Processor>> {
Ok(AsyncTransformer::create(input, output, Self {
func_ctx,
funcs,
}))
let s = Self { func_ctx, funcs };
let retry_wrapper = AsyncRetryWrapper::create(s);
Ok(AsyncTransformer::create(input, output, retry_wrapper))
}
}

impl AsyncRetry for TransformUdfServer {
fn retry_on(&self, _err: &databend_common_exception::ErrorCode) -> bool {
true
}

fn retry_strategy(&self) -> RetryStrategy {
RetryStrategy {
retry_times: 64,
retry_sleep_duration: Some(tokio::time::Duration::from_millis(500)),
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions tests/sqllogictests/suites/udf_server/udf_server_test.test
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ DROP FUNCTION IF EXISTS bool_select;
statement ok
DROP FUNCTION IF EXISTS gcd;

statement ok
DROP FUNCTION IF EXISTS gcd_error;

statement ok
DROP FUNCTION IF EXISTS decimal_div;

Expand Down Expand Up @@ -86,6 +89,9 @@ CREATE OR REPLACE FUNCTION bool_select (BOOLEAN, BIGINT, BIGINT) RETURNS BIGINT
statement ok
CREATE OR REPLACE FUNCTION gcd (INT, INT) RETURNS INT LANGUAGE python HANDLER = 'gcd' ADDRESS = 'http://0.0.0.0:8815';

statement ok
CREATE OR REPLACE FUNCTION gcd_error (INT, INT) RETURNS INT LANGUAGE python HANDLER = 'gcd_error' ADDRESS = 'http://0.0.0.0:8815';

statement ok
CREATE OR REPLACE FUNCTION split_and_join (VARCHAR, VARCHAR, VARCHAR) RETURNS VARCHAR LANGUAGE python HANDLER = 'split_and_join' ADDRESS = 'http://0.0.0.0:8815';

Expand Down Expand Up @@ -177,6 +183,10 @@ SELECT gcd(a,b) d from (select number + 1 a, a * 2 b from numbers(3) where numb
2
3

query I
SELECT sum(gcd_error(a,b)) from (select number + 1 a, a * 2 b from numbers(3000))
----
4501500

statement ok
create or replace table gcd_target(id int);
Expand Down
17 changes: 17 additions & 0 deletions tests/udf/udf_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ def gcd(x: int, y: int) -> int:
(x, y) = (y, x % y)
return x

gcd_error_cnt = 0
@udf(
name="gcd_error",
input_types=["INT", "INT"],
result_type="INT",
skip_null=True,
)
def gcd_error(x: int, y: int) -> int:
global gcd_error_cnt
if y % 2 == 0 and gcd_error_cnt <= 3:
gcd_error_cnt += 1
raise ValueError("gcd_error")
while y != 0:
(x, y) = (y, x % y)
return x


@udf(input_types=["VARCHAR", "VARCHAR", "VARCHAR"], result_type="VARCHAR")
def split_and_join(s: str, split_s: str, join_s: str) -> str:
Expand Down Expand Up @@ -310,6 +326,7 @@ def wait_concurrent(x):
udf_server.add_function(binary_reverse)
udf_server.add_function(bool_select)
udf_server.add_function(gcd)
udf_server.add_function(gcd_error)
udf_server.add_function(split_and_join)
udf_server.add_function(decimal_div)
udf_server.add_function(hex_to_dec)
Expand Down
Loading