Skip to content

Commit

Permalink
feat(query): add async retry transform (#15647)
Browse files Browse the repository at this point in the history
* feat(query): add async retry transform

* feat(query): add async retry transform

* fix(query): fix lint

* fix(query): fix lint
  • Loading branch information
sundy-li authored Jun 3, 2024
1 parent 6fdec82 commit 01cf07d
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/query/expression/src/utils/udf_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use crate::DataSchema;
const UDF_TCP_KEEP_ALIVE_SEC: u64 = 30;
const UDF_HTTP2_KEEP_ALIVE_INTERVAL_SEC: u64 = 60;
const UDF_KEEP_ALIVE_TIMEOUT_SEC: u64 = 20;
// 4MB by default, we use 16G
// max_encoding_message_size is usize::max by default
const MAX_DECODING_MESSAGE_SIZE: usize = 16 * 1024 * 1024 * 1024;

#[derive(Debug, Clone)]
Expand Down
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async-trait = { workspace = true }
jsonb = { workspace = true }
match-template = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
typetag = { workspace = true }

[dev-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod transform_dummy;
mod transform_multi_sort_merge;
mod transform_sort_merge_base;

mod transform_retry_async;
mod transform_sort_merge;
mod transform_sort_merge_limit;
pub mod transform_sort_partial;
Expand All @@ -38,6 +39,7 @@ pub use transform_blocking::*;
pub use transform_compact::*;
pub use transform_dummy::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_retry_async::*;
pub use transform_sort_merge::sort_merge;
pub use transform_sort_merge::*;
pub use transform_sort_merge_base::*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_expression::DataBlock;

use super::AsyncTransform;

pub trait AsyncRetry: AsyncTransform {
fn retry_on(&self, err: &databend_common_exception::ErrorCode) -> bool;
fn retry_strategy(&self) -> RetryStrategy;
}

#[derive(Clone)]
pub struct RetryStrategy {
pub retry_times: usize,
pub retry_sleep_duration: Option<tokio::time::Duration>,
}

pub struct AsyncRetryWrapper<T: AsyncRetry + 'static> {
t: T,
}

impl<T: AsyncRetry + 'static> AsyncRetryWrapper<T> {
pub fn create(inner: T) -> Self {
Self { t: inner }
}
}

#[async_trait::async_trait]
impl<T: AsyncRetry + 'static> AsyncTransform for AsyncRetryWrapper<T> {
const NAME: &'static str = T::NAME;

async fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
let strategy = self.t.retry_strategy();
for _ in 0..strategy.retry_times {
match self.t.transform(data.clone()).await {
Ok(v) => return Ok(v),
Err(e) => {
if !self.t.retry_on(&e) {
return Err(e);
}
if let Some(duration) = strategy.retry_sleep_duration {
tokio::time::sleep(duration).await;
}
}
}
}
self.t.transform(data.clone()).await
}

fn name(&self) -> String {
Self::NAME.to_string()
}

async fn on_start(&mut self) -> Result<()> {
self.t.on_start().await
}

async fn on_finish(&mut self) -> Result<()> {
self.t.on_finish().await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_expression::FunctionContext;
use databend_common_pipeline_transforms::processors::AsyncRetry;
use databend_common_pipeline_transforms::processors::AsyncRetryWrapper;
use databend_common_pipeline_transforms::processors::AsyncTransform;
use databend_common_pipeline_transforms::processors::AsyncTransformer;
use databend_common_pipeline_transforms::processors::RetryStrategy;
use databend_common_sql::executor::physical_plans::UdfFunctionDesc;

use crate::pipelines::processors::InputPort;
Expand All @@ -44,10 +47,22 @@ impl TransformUdfServer {
input: Arc<InputPort>,
output: Arc<OutputPort>,
) -> Result<Box<dyn Processor>> {
Ok(AsyncTransformer::create(input, output, Self {
func_ctx,
funcs,
}))
let s = Self { func_ctx, funcs };
let retry_wrapper = AsyncRetryWrapper::create(s);
Ok(AsyncTransformer::create(input, output, retry_wrapper))
}
}

impl AsyncRetry for TransformUdfServer {
fn retry_on(&self, _err: &databend_common_exception::ErrorCode) -> bool {
true
}

fn retry_strategy(&self) -> RetryStrategy {
RetryStrategy {
retry_times: 64,
retry_sleep_duration: Some(tokio::time::Duration::from_millis(500)),
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions tests/sqllogictests/suites/udf_server/udf_server_test.test
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ DROP FUNCTION IF EXISTS bool_select;
statement ok
DROP FUNCTION IF EXISTS gcd;

statement ok
DROP FUNCTION IF EXISTS gcd_error;

statement ok
DROP FUNCTION IF EXISTS decimal_div;

Expand Down Expand Up @@ -86,6 +89,9 @@ CREATE OR REPLACE FUNCTION bool_select (BOOLEAN, BIGINT, BIGINT) RETURNS BIGINT
statement ok
CREATE OR REPLACE FUNCTION gcd (INT, INT) RETURNS INT LANGUAGE python HANDLER = 'gcd' ADDRESS = 'http://0.0.0.0:8815';

statement ok
CREATE OR REPLACE FUNCTION gcd_error (INT, INT) RETURNS INT LANGUAGE python HANDLER = 'gcd_error' ADDRESS = 'http://0.0.0.0:8815';

statement ok
CREATE OR REPLACE FUNCTION split_and_join (VARCHAR, VARCHAR, VARCHAR) RETURNS VARCHAR LANGUAGE python HANDLER = 'split_and_join' ADDRESS = 'http://0.0.0.0:8815';

Expand Down Expand Up @@ -177,6 +183,10 @@ SELECT gcd(a,b) d from (select number + 1 a, a * 2 b from numbers(3) where numb
2
3

query I
SELECT sum(gcd_error(a,b)) from (select number + 1 a, a * 2 b from numbers(3000))
----
4501500

statement ok
create or replace table gcd_target(id int);
Expand Down
17 changes: 17 additions & 0 deletions tests/udf/udf_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ def gcd(x: int, y: int) -> int:
(x, y) = (y, x % y)
return x

gcd_error_cnt = 0
@udf(
name="gcd_error",
input_types=["INT", "INT"],
result_type="INT",
skip_null=True,
)
def gcd_error(x: int, y: int) -> int:
global gcd_error_cnt
if y % 2 == 0 and gcd_error_cnt <= 3:
gcd_error_cnt += 1
raise ValueError("gcd_error")
while y != 0:
(x, y) = (y, x % y)
return x


@udf(input_types=["VARCHAR", "VARCHAR", "VARCHAR"], result_type="VARCHAR")
def split_and_join(s: str, split_s: str, join_s: str) -> str:
Expand Down Expand Up @@ -310,6 +326,7 @@ def wait_concurrent(x):
udf_server.add_function(binary_reverse)
udf_server.add_function(bool_select)
udf_server.add_function(gcd)
udf_server.add_function(gcd_error)
udf_server.add_function(split_and_join)
udf_server.add_function(decimal_div)
udf_server.add_function(hex_to_dec)
Expand Down

0 comments on commit 01cf07d

Please sign in to comment.