From cdb2e2506d82d7905c8cee34bece43d54e31a9db Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 27 May 2024 21:22:36 +0800 Subject: [PATCH] refactor: refactor merge into optimizer and fix unexpected distribution plan (#15507) * refactor: refactor merge into optimizer and fix unexpected distribution plan * fix get row id * fix * fix * save * fix * fix pipeline --- .../src/interpreters/interpreter_explain.rs | 22 ++- .../src/interpreters/interpreter_factory.rs | 8 +- .../interpreters/interpreter_merge_into.rs | 50 ++--- .../pipelines/builders/builder_merge_into.rs | 30 +-- .../src/schedulers/fragments/fragmenter.rs | 4 - src/query/sql/src/executor/format.rs | 49 ++++- .../physical_plans/physical_merge_into.rs | 1 + .../sql/src/planner/binder/merge_into.rs | 1 + .../distributed/distributed_merge.rs | 54 ++---- .../sql/src/planner/optimizer/optimizer.rs | 181 ++++-------------- src/query/sql/src/planner/optimizer/s_expr.rs | 7 + src/query/sql/src/planner/plans/merge_into.rs | 1 + .../merge_into/mutator/matched_mutator.rs | 35 ++-- ...distributed_merge_into_without_enable.test | 9 + ...34_pr13848_without_distributed_enable.test | 9 + ...e_pipeline_without_distributed_enable.test | 11 +- ...merge_into_without_distributed_enable.test | 27 +++ ...merge_into_without_distributed_enable.test | 9 + .../merge_into_non_equal_distributed.test | 102 +++++----- 19 files changed, 292 insertions(+), 318 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_explain.rs b/src/query/service/src/interpreters/interpreter_explain.rs index a292262af5cd..5e23c961e1ee 100644 --- a/src/query/service/src/interpreters/interpreter_explain.rs +++ b/src/query/service/src/interpreters/interpreter_explain.rs @@ -30,6 +30,7 @@ use databend_common_pipeline_core::processors::PlanProfile; use databend_common_sql::binder::ExplainConfig; use databend_common_sql::optimizer::ColumnSet; use databend_common_sql::plans::FunctionCall; +use databend_common_sql::plans::MergeInto; use databend_common_sql::plans::UpdatePlan; use databend_common_sql::BindContext; use databend_common_sql::MetadataRef; @@ -40,6 +41,7 @@ use databend_common_users::UserApiProvider; use super::InsertMultiTableInterpreter; use super::InterpreterFactory; use super::UpdateInterpreter; +use crate::interpreters::interpreter_merge_into::MergeIntoInterpreter; use crate::interpreters::Interpreter; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; @@ -191,7 +193,7 @@ impl Interpreter for ExplainInterpreter { // todo:(JackTan25), we need to make all execute2() just do `build pipeline` work, // don't take real actions. for now we fix #13657 like below. let pipeline = match &self.plan { - Plan::Query { .. } => { + Plan::Query { .. } | Plan::MergeInto(_) => { let interpter = InterpreterFactory::get(self.ctx.clone(), &self.plan).await?; interpter.execute2().await? @@ -216,6 +218,7 @@ impl Interpreter for ExplainInterpreter { ) .await? 
} + Plan::MergeInto(merge_into) => self.explain_merge_fragments(merge_into).await?, Plan::Update(update) => self.explain_update_fragments(update.as_ref()).await?, _ => { return Err(ErrorCode::Unimplemented("Unsupported EXPLAIN statement")); @@ -563,4 +566,21 @@ impl ExplainInterpreter { } Ok(vec![DataBlock::concat(&result)?]) } + + async fn explain_merge_fragments(&self, merge_into: &MergeInto) -> Result> { + let interpreter = MergeIntoInterpreter::try_create(self.ctx.clone(), merge_into.clone())?; + let (plan, _) = interpreter.build_physical_plan().await?; + let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?; + + let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone()); + root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?; + + let display_string = fragments_actions + .display_indent(&merge_into.meta_data) + .to_string(); + + let line_split_result = display_string.lines().collect::>(); + let formatted_plan = StringType::from_data(line_split_result); + Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])]) + } } diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs index d5e2494292e3..3cb94513e079 100644 --- a/src/query/service/src/interpreters/interpreter_factory.rs +++ b/src/query/service/src/interpreters/interpreter_factory.rs @@ -339,9 +339,11 @@ impl InterpreterFactory { Plan::Insert(insert) => InsertInterpreter::try_create(ctx, *insert.clone()), Plan::Replace(replace) => ReplaceInterpreter::try_create(ctx, *replace.clone()), - Plan::MergeInto(merge_into) => { - MergeIntoInterpreter::try_create(ctx, *merge_into.clone()) - } + Plan::MergeInto(merge_into) => Ok(Arc::new(MergeIntoInterpreter::try_create( + ctx, + *merge_into.clone(), + )?)), + Plan::Delete(delete) => Ok(Arc::new(DeleteInterpreter::try_create( ctx, *delete.clone(), diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index b10d0fe80b11..5548cdb75a7c 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -45,7 +45,6 @@ use databend_common_sql::executor::PhysicalPlan; use databend_common_sql::executor::PhysicalPlanBuilder; use databend_common_sql::plans; use databend_common_sql::plans::MergeInto as MergePlan; -use databend_common_sql::plans::RelOperator; use databend_common_sql::IndexType; use databend_common_sql::ScalarExpr; use databend_common_sql::TypeCheck; @@ -60,7 +59,6 @@ use itertools::Itertools; use crate::interpreters::common::dml_build_update_stream_req; use crate::interpreters::HookOperator; use crate::interpreters::Interpreter; -use crate::interpreters::InterpreterPtr; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; @@ -74,8 +72,8 @@ pub struct MergeIntoInterpreter { } impl MergeIntoInterpreter { - pub fn try_create(ctx: Arc, plan: MergePlan) -> Result { - Ok(Arc::new(MergeIntoInterpreter { ctx, plan })) + pub fn try_create(ctx: Arc, plan: MergePlan) -> Result { + Ok(MergeIntoInterpreter { ctx, plan }) } } @@ -125,7 +123,7 @@ impl Interpreter for MergeIntoInterpreter { } impl MergeIntoInterpreter { - async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> { + pub async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> { let MergePlan { bind_context, input, @@ 
-145,8 +143,10 @@ impl MergeIntoInterpreter { split_idx, row_id_index, can_try_update_column_only, + enable_right_broadcast, .. } = &self.plan; + let enable_right_broadcast = *enable_right_broadcast; let mut columns_set = columns_set.clone(); let table = self.ctx.get_table(catalog, database, table_name).await?; let fuse_table = table.as_any().downcast_ref::().ok_or_else(|| { @@ -211,16 +211,8 @@ impl MergeIntoInterpreter { let table_name = table_name.clone(); let input = input.clone(); - // we need to extract join plan, but we need to give this exchange - // back at last. - let (input, extract_exchange) = if let RelOperator::Exchange(_) = input.plan() { - (Box::new(input.child(0)?.clone()), true) - } else { - (input, false) - }; - let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false); - let mut join_input = builder.build(&input, *columns_set.clone()).await?; + let join_input = builder.build(&input, *columns_set.clone()).await?; // find row_id column index let join_output_schema = join_input.output_schema()?; @@ -265,7 +257,7 @@ impl MergeIntoInterpreter { } } - if *distributed && !*change_join_order { + if enable_right_broadcast { row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?); } @@ -276,7 +268,7 @@ impl MergeIntoInterpreter { )); } - if *distributed && row_number_idx.is_none() && !*change_join_order { + if enable_right_broadcast && row_number_idx.is_none() { return Err(ErrorCode::InvalidRowIdIndex( "can't get internal row_number_idx when running merge into", )); @@ -285,17 +277,6 @@ impl MergeIntoInterpreter { let table_info = fuse_table.get_table_info().clone(); let catalog_ = self.ctx.get_catalog(catalog).await?; - if !*distributed && extract_exchange { - join_input = PhysicalPlan::Exchange(Exchange { - plan_id: 0, - input: Box::new(join_input), - kind: FragmentKind::Merge, - keys: vec![], - allow_adjust_parallelism: true, - ignore_exchange: false, - }); - }; - // transform unmatched for insert // reference to func `build_eval_scalar` // (DataSchemaRef, Option, Vec,Vec) => (source_schema, condition, value_exprs) @@ -432,6 +413,7 @@ impl MergeIntoInterpreter { can_try_update_column_only: *can_try_update_column_only, plan_id: u32::MAX, merge_into_split_idx, + enable_right_broadcast, })) } else { let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto { @@ -444,16 +426,17 @@ impl MergeIntoInterpreter { row_id_idx, segments: segments.clone(), distributed: true, - output_schema: match *change_join_order { - false => DataSchemaRef::new(DataSchema::new(vec![ - join_output_schema.fields[row_number_idx.unwrap()].clone(), - ])), - true => DataSchemaRef::new(DataSchema::new(vec![DataField::new( + output_schema: if let Some(row_number_idx) = row_number_idx { + DataSchemaRef::new(DataSchema::new(vec![ + join_output_schema.fields[row_number_idx].clone(), + ])) + } else { + DataSchemaRef::new(DataSchema::new(vec![DataField::new( ROW_ID_COL_NAME, databend_common_expression::types::DataType::Number( databend_common_expression::types::NumberDataType::UInt64, ), - )])), + )])) }, merge_type: merge_type.clone(), change_join_order: *change_join_order, @@ -461,6 +444,7 @@ impl MergeIntoInterpreter { can_try_update_column_only: *can_try_update_column_only, plan_id: u32::MAX, merge_into_split_idx, + enable_right_broadcast, })); // if change_join_order = true, it means the target is build side, // in this way, we will do matched operation and not matched operation diff --git a/src/query/service/src/pipelines/builders/builder_merge_into.rs 
b/src/query/service/src/pipelines/builders/builder_merge_into.rs index 9a952f926ac9..ff7db2510b75 100644 --- a/src/query/service/src/pipelines/builders/builder_merge_into.rs +++ b/src/query/service/src/pipelines/builders/builder_merge_into.rs @@ -143,7 +143,7 @@ impl PipelineBuilder { } assert!(self.join_state.is_some()); assert!(self.merge_into_probe_data_fields.is_some()); - + self.main_pipeline.resize(1, false)?; let join_state = self.join_state.clone().unwrap(); // split row_number and log // output_port_row_number @@ -291,6 +291,7 @@ impl PipelineBuilder { // we will receive MutationLogs only without rowids. return Ok(()); } + self.main_pipeline.resize(1, false)?; // we will receive MutationLogs and rowids. So we should apply // rowids firstly and then send all mutation logs to commit sink. // we need to spilt rowid and mutationlogs, and we can get pipeitems: @@ -366,9 +367,10 @@ impl PipelineBuilder { change_join_order, can_try_update_column_only, merge_into_split_idx, + enable_right_broadcast, .. } = merge_into; - + let enable_right_broadcast = *enable_right_broadcast; self.build_pipeline(input)?; self.main_pipeline @@ -457,10 +459,8 @@ impl PipelineBuilder { } if need_unmatch { - // distributed: false, standalone mode, we need to add insert processor - // (distributed,change join order):(true,true) target is build side, we - // need to support insert in local node. - if !*distributed || *change_join_order { + // If merge into doesn't contain right broadcast join, execute insert in local. + if !enable_right_broadcast { let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create( unmatched.clone(), input.output_schema()?, @@ -613,7 +613,7 @@ impl PipelineBuilder { } } else if need_match && need_unmatch { // remove first row_id port and last row_number_port - if !*change_join_order { + if enable_right_broadcast { self.main_pipeline.output_len() - 2 } else { // remove first row_id port @@ -624,7 +624,7 @@ impl PipelineBuilder { self.main_pipeline.output_len() - 1 } else { // there are only row_number - if !*change_join_order { + if enable_right_broadcast { 0 } else { // unmatched prot @@ -659,7 +659,7 @@ impl PipelineBuilder { // receive row_id builder.add_items_prepend(vec![create_dummy_item()]); } - if need_unmatch && !*change_join_order { + if enable_right_broadcast { // receive row_number builder.add_items(vec![create_dummy_item()]); } @@ -721,7 +721,7 @@ impl PipelineBuilder { } // need to receive row_number, we should give a dummy item here. - if *distributed && need_unmatch && !*change_join_order { + if enable_right_broadcast { builder.add_items(vec![create_dummy_item()]); } self.main_pipeline.add_pipe(builder.finalize()); @@ -773,7 +773,7 @@ impl PipelineBuilder { } // need to receive row_number, we should give a dummy item here. 
- if *distributed && need_unmatch && !*change_join_order { + if enable_right_broadcast { builder.add_items(vec![create_dummy_item()]); } self.main_pipeline.add_pipe(builder.finalize()); @@ -817,7 +817,7 @@ impl PipelineBuilder { } // receive row_number - if *distributed && need_unmatch && !*change_join_order { + if enable_right_broadcast { pipe_items.push(create_dummy_item()); } @@ -853,7 +853,7 @@ impl PipelineBuilder { } // with row_number - if *distributed && need_unmatch && !change_join_order { + if enable_right_broadcast { ranges.push(vec![self.main_pipeline.output_len() - 1]); } @@ -877,7 +877,7 @@ impl PipelineBuilder { vec.push(serialize_segment_transform.into_pipe_item()); } - if need_unmatch && !*change_join_order { + if enable_right_broadcast { vec.push(create_dummy_item()) } vec @@ -911,7 +911,7 @@ impl PipelineBuilder { )); // accumulate row_number - if *distributed && need_unmatch && !*change_join_order { + if enable_right_broadcast { let pipe_items = if need_match { vec![ create_dummy_item(), diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index b647457b71b3..3c157dd62964 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -162,9 +162,6 @@ impl PhysicalPlanReplacer for Fragmenter { fn replace_merge_into(&mut self, plan: &MergeInto) -> Result { let input = self.replace(&plan.input)?; - if !plan.change_join_order { - self.state = State::SelectLeaf; - } Ok(PhysicalPlan::MergeInto(Box::new(MergeInto { input: Box::new(input), ..plan.clone() @@ -232,7 +229,6 @@ impl PhysicalPlanReplacer for Fragmenter { // Consume current fragments to prevent them being consumed by `probe_input`. 
fragments.append(&mut self.fragments); let probe_input = self.replace(plan.probe.as_ref())?; - fragments.append(&mut self.fragments); self.fragments = fragments; diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index ec7466ce7b80..eca2d21eb683 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -27,6 +27,9 @@ use itertools::Itertools; use super::physical_plans::AsyncFunction; use super::physical_plans::CacheScan; use super::physical_plans::ExpressionScan; +use super::physical_plans::MergeInto; +use super::physical_plans::MergeIntoAddRowNumber; +use super::physical_plans::MergeIntoAppendNotMatched; use crate::executor::explain::PlanStatsInfo; use crate::executor::physical_plans::AggregateExpand; use crate::executor::physical_plans::AggregateFinal; @@ -241,12 +244,12 @@ fn to_format_tree( Ok(FormatTreeNode::new("ReplaceDeduplicate".to_string())) } PhysicalPlan::ReplaceInto(_) => Ok(FormatTreeNode::new("Replace".to_string())), - PhysicalPlan::MergeInto(_) => Ok(FormatTreeNode::new("MergeInto".to_string())), - PhysicalPlan::MergeIntoAddRowNumber(_) => { - Ok(FormatTreeNode::new("MergeIntoAddRowNumber".to_string())) + PhysicalPlan::MergeInto(plan) => format_merge_into(plan, metadata, profs), + PhysicalPlan::MergeIntoAddRowNumber(plan) => { + format_merge_into_add_row_number(plan, metadata, profs) } - PhysicalPlan::MergeIntoAppendNotMatched(_) => { - Ok(FormatTreeNode::new("MergeIntoAppendNotMatched".to_string())) + PhysicalPlan::MergeIntoAppendNotMatched(plan) => { + format_merge_into_append_not_matched(plan, metadata, profs) } PhysicalPlan::CteScan(plan) => cte_scan_to_format_tree(plan), PhysicalPlan::MaterializedCte(plan) => { @@ -363,6 +366,42 @@ fn append_profile_info( } } +fn format_merge_into( + plan: &MergeInto, + metadata: &Metadata, + profs: &HashMap, +) -> Result> { + let child = to_format_tree(&plan.input, metadata, profs)?; + Ok(FormatTreeNode::with_children( + "MergeInto".to_string(), + vec![child], + )) +} + +fn format_merge_into_add_row_number( + plan: &MergeIntoAddRowNumber, + metadata: &Metadata, + profs: &HashMap, +) -> Result> { + let child = to_format_tree(&plan.input, metadata, profs)?; + Ok(FormatTreeNode::with_children( + "MergeIntoAddRowNumber".to_string(), + vec![child], + )) +} + +fn format_merge_into_append_not_matched( + plan: &MergeIntoAppendNotMatched, + metadata: &Metadata, + profs: &HashMap, +) -> Result> { + let child = to_format_tree(&plan.input, metadata, profs)?; + Ok(FormatTreeNode::with_children( + "MergeIntoAppendNotMatched".to_string(), + vec![child], + )) +} + fn copy_into_table(plan: &CopyIntoTable) -> Result> { Ok(FormatTreeNode::new(format!( "CopyIntoTable: {}", diff --git a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs index c920d568a202..c6858bc8d0f3 100644 --- a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs +++ b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs @@ -48,6 +48,7 @@ pub struct MergeInto { pub target_build_optimization: bool, pub can_try_update_column_only: bool, pub merge_into_split_idx: Option, + pub enable_right_broadcast: bool, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] diff --git a/src/query/sql/src/planner/binder/merge_into.rs b/src/query/sql/src/planner/binder/merge_into.rs index a5229ea3dfa0..91cb7a59671e 100644 --- a/src/query/sql/src/planner/binder/merge_into.rs +++ 
b/src/query/sql/src/planner/binder/merge_into.rs @@ -474,6 +474,7 @@ impl Binder { row_id_index, split_idx, can_try_update_column_only: self.can_try_update_column_only(&matched_clauses), + enable_right_broadcast: false, }) } diff --git a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs index 3ad4c230d99e..3097ef86c214 100644 --- a/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs +++ b/src/query/sql/src/planner/optimizer/distributed/distributed_merge.rs @@ -34,24 +34,13 @@ impl MergeSourceOptimizer { } } - // rewrite plan: - // 1. if use right join, and the default - // distributed right join will use shuffle hash join, but its - // performance is very slow and poor. So we need to rewrite it. - // In new distributed plan, target partitions will be shuffled - // to query nodes, and source will be broadcasted to all nodes - // and build hashtable. It means all nodes hold the same hashtable. - // 2. if use left outer join, we will just use distributed optimizer's - // result, so we won't arrive here, because the it doesn't take place - // change join order. pub fn optimize(&self, s_expr: &SExpr) -> Result { - let join_s_expr = s_expr.child(0)?; - let left_exchange = join_s_expr.child(0)?; - assert!(left_exchange.children.len() == 1); + let left_exchange = s_expr.child(0)?; + assert_eq!(left_exchange.children.len(), 1); let left_exchange_input = left_exchange.child(0)?; - let right_exchange = join_s_expr.child(1)?; - assert!(right_exchange.children.len() == 1); + let right_exchange = s_expr.child(1)?; + assert_eq!(right_exchange.children.len(), 1); let right_exchange_input = right_exchange.child(0)?; // source is build side @@ -66,18 +55,16 @@ impl MergeSourceOptimizer { )), ]; - let mut join: Join = join_s_expr.plan().clone().try_into()?; + let mut join: Join = s_expr.plan().clone().try_into()?; join.need_hold_hash_table = true; - let mut join_s_expr = join_s_expr.replace_plan(Arc::new(RelOperator::Join(join))); + let mut join_s_expr = s_expr.replace_plan(Arc::new(RelOperator::Join(join))); join_s_expr = join_s_expr.replace_children(new_join_children); - Ok(s_expr.replace_children(vec![Arc::new(join_s_expr)])) + Ok(join_s_expr) } // for right outer join (source as build) fn merge_source_matcher() -> Matcher { // Input: - // Exchange(Merge) - // | // Join // / \ // / \ @@ -86,8 +73,6 @@ impl MergeSourceOptimizer { // * * // source is build we will get below: // Output: - // Exchange - // | // Join // / \ // / \ @@ -97,20 +82,17 @@ impl MergeSourceOptimizer { // | | // * * Matcher::MatchOp { - op_type: RelOp::Exchange, - children: vec![Matcher::MatchOp { - op_type: RelOp::Join, - children: vec![ - Matcher::MatchOp { - op_type: RelOp::Exchange, - children: vec![Matcher::Leaf], - }, - Matcher::MatchOp { - op_type: RelOp::Exchange, - children: vec![Matcher::Leaf], - }, - ], - }], + op_type: RelOp::Join, + children: vec![ + Matcher::MatchOp { + op_type: RelOp::Exchange, + children: vec![Matcher::Leaf], + }, + Matcher::MatchOp { + op_type: RelOp::Exchange, + children: vec![Matcher::Leaf], + }, + ], } } } diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index 2506f353b1d7..ab43ddf9151e 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashSet; use std::sync::Arc; use async_recursion::async_recursion; @@ -46,8 +45,8 @@ use crate::optimizer::RuleID; use crate::optimizer::SExpr; use crate::optimizer::DEFAULT_REWRITE_RULES; use crate::plans::CopyIntoLocationPlan; -use crate::plans::Exchange; use crate::plans::Join; +use crate::plans::JoinType; use crate::plans::MergeInto; use crate::plans::Plan; use crate::plans::RelOperator; @@ -234,7 +233,7 @@ pub async fn optimize(opt_ctx: OptimizerContext, plan: Plan) -> Result { } Ok(Plan::CopyIntoTable(plan)) } - Plan::MergeInto(plan) => optimize_merge_into(opt_ctx.clone(), plan).await, + Plan::MergeInto(plan) => optimize_merge_into(opt_ctx, plan).await, // distributed insert will be optimized in `physical_plan_builder` Plan::Insert(mut plan) => { @@ -421,57 +420,25 @@ async fn get_optimized_memo(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Res Ok(cascades.memo) } -async fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box) -> Result { - // optimize source :fix issue #13733 - // reason: if there is subquery,windowfunc exprs etc. see - // src/planner/semantic/lowering.rs `as_raw_expr()`, we will - // get dummy index. So we need to use optimizer to solve this. - let mut right_source = optimize_query(opt_ctx.clone(), plan.input.child(1)?.clone()).await?; - - // if it's not distributed execution, we should reserve - // exchange to merge source data. - if opt_ctx.enable_distributed_optimization - && opt_ctx - .table_ctx - .get_settings() - .get_enable_distributed_merge_into()? - { - // we need to remove exchange of right_source, because it's - // not an end query. - if let RelOperator::Exchange(_) = right_source.plan.as_ref() { - right_source = right_source.child(0)?.clone(); - } +async fn optimize_merge_into(mut opt_ctx: OptimizerContext, plan: Box) -> Result { + let enable_distributed_merge_into = opt_ctx + .table_ctx + .get_settings() + .get_enable_distributed_merge_into()?; + if opt_ctx.enable_distributed_optimization { + opt_ctx = opt_ctx.with_enable_distributed_optimization(enable_distributed_merge_into); } - // replace right source - let mut join_sexpr = plan.input.clone(); - let left_target = optimize_query(opt_ctx.clone(), join_sexpr.child(0)?.clone()).await?; - join_sexpr = - Box::new(join_sexpr.replace_children(vec![Arc::new(left_target), Arc::new(right_source)])); - - let join_op = Join::try_from(join_sexpr.plan().clone())?; - let non_equal_join = join_op.right_conditions.is_empty() && join_op.left_conditions.is_empty(); - // before, we think source table is always the small table. - // 1. for matched only, we use inner join - // 2. for insert only, we use right anti join - // 3. for full merge into, we use right outer join - // for now, let's import the statistic info to determine left join or right join - // we just do optimization for the top join (target and source),won't do recursive optimization. - let change_join_order = if opt_ctx.enable_merge_into_join_reorder { - let rule = RuleFactory::create_rule(RuleID::CommuteJoin, plan.meta_data.clone())?; - let mut state = TransformResult::new(); - // we will reorder the join order according to the cardinality of target and source. - rule.apply(&join_sexpr, &mut state)?; - assert!(state.results().len() <= 1); - // we need to check whether we do swap left and right. 
- if state.results().len() == 1 { - join_sexpr = Box::new(state.results()[0].clone()); - true - } else { - false - } - } else { - false - }; + let old_left_conditions = Join::try_from(plan.input.plan().clone())?.left_conditions; + let mut join_s_expr = optimize_query(opt_ctx.clone(), *plan.input.clone()).await?; + if let &RelOperator::Exchange(_) = join_s_expr.plan() { + join_s_expr = join_s_expr.child(0)?.clone(); + } + let left_conditions = Join::try_from(join_s_expr.plan().clone())?.left_conditions; + let mut change_join_order = false; + if old_left_conditions != left_conditions { + change_join_order = true; + } + let join_op = Join::try_from(join_s_expr.plan().clone())?; // we just support left join to use MergeIntoBlockInfoHashTable, we // don't support spill for now, and we need the matched clauses' count @@ -502,115 +469,35 @@ async fn optimize_merge_into(opt_ctx: OptimizerContext, plan: Box) -> }) } - // try to optimize distributed join, only if - // - distributed optimization is enabled - // - no local table scan - // - distributed merge-into is enabled - if opt_ctx.enable_distributed_optimization - && !contains_local_table_scan(&join_sexpr, &opt_ctx.metadata) - && opt_ctx - .table_ctx - .get_settings() - .get_enable_distributed_merge_into()? - { - // distributed execution stargeties: - // I. change join order is true, we use the `optimize_distributed_query`'s result. - // II. change join order is false and match_pattern and not enable spill and not non-equal-join, we use right outer join with rownumber distributed strategies. - // III otherwise, use `merge_into_join_sexpr` as standalone execution(so if change join order is false,but doesn't match_pattern, we don't support distributed,in fact. case I - // can take this at most time, if that's a hash shuffle, the I can take it. We think source is always very small). - // input is a Join_SExpr - let mut merge_into_join_sexpr = - optimize_distributed_query(opt_ctx.table_ctx.clone(), &join_sexpr)?; + if opt_ctx.enable_distributed_optimization { let merge_source_optimizer = MergeSourceOptimizer::create(); - // II. - // - join spilling is disabled - let (optimized_distributed_merge_into_join_sexpr, distributed) = if opt_ctx - .table_ctx - .get_settings() - .get_join_spilling_memory_ratio()? - == 0 - && !change_join_order + // Inner join shouldn't add `RowNumber` node. + let mut enable_right_broadcast = false; + if matches!(join_op.join_type, JoinType::RightAnti | JoinType::Right) && merge_source_optimizer .merge_source_matcher - .matches(&merge_into_join_sexpr) - && !non_equal_join + .matches(&join_s_expr) { - ( - merge_source_optimizer.optimize(&merge_into_join_sexpr)?, - true, - ) - } else if change_join_order { - // I - // we need to judge whether it'a broadcast join to support runtime filter. - merge_into_join_sexpr = try_to_change_as_broadcast_join( - merge_into_join_sexpr, - change_join_order, - opt_ctx.table_ctx.clone(), - plan.as_ref(), - false, // we will open it, but for now we don't support distributed - new_columns_set.as_mut(), - )?; - ( - merge_into_join_sexpr.clone(), - matches!( - merge_into_join_sexpr.plan.as_ref(), - RelOperator::Exchange(_) - ), - ) - } else { - // III. - (merge_into_join_sexpr.clone(), false) - }; - + // Todo(xudong): should consider the cost of shuffle and broadcast. 
+ // Current behavior is to always use broadcast join.(source table is usually small) + join_s_expr = merge_source_optimizer.optimize(&join_s_expr)?; + enable_right_broadcast = true; + } + let distributed = !join_s_expr.has_merge_exchange(); Ok(Plan::MergeInto(Box::new(MergeInto { - input: Box::new(optimized_distributed_merge_into_join_sexpr), + input: Box::new(join_s_expr), distributed, change_join_order, columns_set: new_columns_set.clone(), + enable_right_broadcast, ..*plan }))) } else { Ok(Plan::MergeInto(Box::new(MergeInto { - input: join_sexpr, + input: Box::new(join_s_expr), change_join_order, columns_set: new_columns_set, ..*plan }))) } } - -fn try_to_change_as_broadcast_join( - merge_into_join_sexpr: SExpr, - _change_join_order: bool, - _table_ctx: Arc, - _plan: &MergeInto, - _only_one_matched_clause: bool, - _new_columns_set: &mut HashSet, -) -> Result { - if let RelOperator::Exchange(Exchange::Merge) = merge_into_join_sexpr.plan.as_ref() { - let right_exchange = merge_into_join_sexpr.child(0)?.child(1)?; - if let RelOperator::Exchange(Exchange::Broadcast) = right_exchange.plan.as_ref() { - let join: Join = merge_into_join_sexpr.child(0)?.plan().clone().try_into()?; - let join_s_expr = merge_into_join_sexpr - .child(0)? - .replace_plan(Arc::new(RelOperator::Join(join))); - // for now, when we use target table as build side and it's a broadcast join, - // we will use merge_into_block_info_hashtable to reduce i/o operations. - // Todo(JackTan25): we don't support in distributed mod for target build optimization for now. we will enable in next pr. - // if change_join_order - // && matches!(plan.merge_type, MergeIntoType::FullOperation) - // && only_one_matched_clause - // { - // remove rowid - // new_columns_set.remove(&plan.row_id_index); - // table_ctx.set_merge_into_join(MergeIntoJoin { - // merge_into_join_type: MergeIntoJoinType::Left, - // is_distributed: true, - // target_tbl_idx: plan.target_table_idx, - // }) - // } - return Ok(merge_into_join_sexpr.replace_children(vec![Arc::new(join_s_expr)])); - } - } - Ok(merge_into_join_sexpr) -} diff --git a/src/query/sql/src/planner/optimizer/s_expr.rs b/src/query/sql/src/planner/optimizer/s_expr.rs index 0f63a49a7a1f..0b7c74e2e27b 100644 --- a/src/query/sql/src/planner/optimizer/s_expr.rs +++ b/src/query/sql/src/planner/optimizer/s_expr.rs @@ -394,6 +394,13 @@ impl SExpr { .collect::>(); self.children = children; } + + pub fn has_merge_exchange(&self) -> bool { + if let RelOperator::Exchange(Exchange::Merge) = self.plan.as_ref() { + return true; + } + self.children.iter().any(|child| child.has_merge_exchange()) + } } fn find_subquery(rel_op: &RelOperator) -> bool { diff --git a/src/query/sql/src/planner/plans/merge_into.rs b/src/query/sql/src/planner/plans/merge_into.rs index 623f99a38598..707f7874b55f 100644 --- a/src/query/sql/src/planner/plans/merge_into.rs +++ b/src/query/sql/src/planner/plans/merge_into.rs @@ -76,6 +76,7 @@ pub struct MergeInto { // `update *`` or `update set t1.a = t2.a ...`, the right expr on the `=` must be only a column, // we don't support complex expressions. 
pub can_try_update_column_only: bool, + pub enable_right_broadcast: bool, } impl std::fmt::Debug for MergeInto { diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs index 32bbb4f4cfa0..3758a4629057 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -20,7 +20,6 @@ use std::time::Instant; use ahash::AHashMap; use databend_common_arrow::arrow::bitmap::MutableBitmap; -use databend_common_arrow::arrow::buffer::Buffer; use databend_common_base::base::tokio::sync::Semaphore; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::GlobalIORuntime; @@ -34,9 +33,9 @@ use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; -use databend_common_expression::types::NumberColumn; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberDataType; use databend_common_expression::BlockMetaInfoDowncast; -use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_metrics::storage::*; use databend_common_sql::StreamContext; @@ -182,12 +181,19 @@ impl MatchedAggregator { let start = Instant::now(); // data_block is from matched_split, so there is only one column. // that's row_id - let row_ids = get_row_id(&data_block, 0)?; + let row_id_col = data_block.get_by_offset(0); + debug_assert!( + row_id_col.data_type.remove_nullable() == DataType::Number(NumberDataType::UInt64) + ); + let row_ids = row_id_col + .value + .convert_to_full_column(&row_id_col.data_type, data_block.num_rows()); let row_id_kind = RowIdKind::downcast_ref_from(data_block.get_meta().unwrap()).unwrap(); match row_id_kind { RowIdKind::Update => { - for row_id in row_ids { - let (prefix, offset) = split_row_id(row_id); + for row_id in row_ids.iter() { + let (prefix, offset) = + split_row_id(row_id.as_number().unwrap().into_u_int64().unwrap()); if !self .block_mutation_row_offset .entry(prefix) @@ -202,8 +208,9 @@ impl MatchedAggregator { } } RowIdKind::Delete => { - for row_id in row_ids { - let (prefix, offset) = split_row_id(row_id); + for row_id in row_ids.iter() { + let (prefix, offset) = + split_row_id(row_id.as_number().unwrap().into_u_int64().unwrap()); let value = self.block_mutation_row_offset.get(&prefix); if value.is_none() { self.ctx.add_merge_status(MergeStatus { @@ -477,15 +484,3 @@ impl AggregationContext { Ok(Some(mutation)) } } - -pub(crate) fn get_row_id(data_block: &DataBlock, row_id_idx: usize) -> Result> { - let row_id_col = data_block.get_by_offset(row_id_idx); - match row_id_col.value.as_column() { - Some(Column::Nullable(boxed)) => match &boxed.column { - Column::Number(NumberColumn::UInt64(data)) => Ok(data.clone()), - _ => Err(ErrorCode::BadArguments("row id is not uint64")), - }, - Some(Column::Number(NumberColumn::UInt64(data))) => Ok(data.clone()), - _ => Err(ErrorCode::BadArguments("row id is not uint64")), - } -} diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0033_distributed_merge_into_without_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0033_distributed_merge_into_without_enable.test index 7999d8cda0d1..f855da0d8a13 100644 --- 
a/tests/sqllogictests/suites/base/09_fuse_engine/09_0033_distributed_merge_into_without_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0033_distributed_merge_into_without_enable.test @@ -1,3 +1,12 @@ +statement ok +drop database if exists db; + +statement ok +create database db; + +statement ok +use db; + ## we need to test without enable_distributed_merge_into in cluster environment. statement ok set enable_experimental_merge_into = 1; diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0034_pr13848_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0034_pr13848_without_distributed_enable.test index bf24c8887fd8..3680a2300fcf 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0034_pr13848_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0034_pr13848_without_distributed_enable.test @@ -1,3 +1,12 @@ +statement ok +drop database if exists db; + +statement ok +create database db; + +statement ok +use db; + ## test window,agg,join's correctess statement ok set enable_experimental_merge_into = 1; diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test index b33c004a4c94..ce527c7ca714 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0035_merge_into_separate_pipeline_without_distributed_enable.test @@ -1,3 +1,12 @@ +statement ok +drop database if exists db; + +statement ok +create database db; + +statement ok +use db; + statement ok set enable_experimental_merge_into = 1; @@ -140,7 +149,7 @@ merge into t1_separate as t1 using (select * from t2_separate) as t2 on t1.a = t 2 4 query T -select count(*) from fuse_block('default','t1_separate'); +select count(*) from fuse_block('db','t1_separate'); ---- 1 diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test index e6dd6b1668e8..9a6f74340b39 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0036_merge_into_without_distributed_enable.test @@ -1,6 +1,15 @@ statement ok set enable_experimental_merge_into = 1; +statement ok +drop database if exists db; + +statement ok +create database db; + +statement ok +use db; + statement ok drop table if exists t1; @@ -476,6 +485,9 @@ drop table if exists salaries; statement ok CREATE TABLE employees (employee_id INT, employee_name VARCHAR(255),department VARCHAR(255)) cluster by(employee_id,employee_name); +statement ok +drop table if exists salaries; + statement ok CREATE TABLE salaries (employee_id INT,salary DECIMAL(10, 2)) cluster by(employee_id,salary); @@ -498,6 +510,9 @@ select * from salaries order by employee_id; 3 55000.00 4 55000.00 +statement ok +drop table if exists t1_target; + ## null cast bug fix statement ok drop table if exists t1_target; @@ -508,6 +523,9 @@ drop table if exists t2_source; statement ok create table t1_target(a int not null) cluster by(a); +statement ok +drop table if exists t2_source; + statement ok create table t2_source(a int not null) cluster by(a); @@ -528,6 +546,9 @@ select * 
from t1_target order by a; 1 2 +statement ok +drop table if exists cluster_target; + ## cluster table test statement ok drop table if exists cluster_target; @@ -538,6 +559,9 @@ drop table if exists cluster_source; statement ok create table cluster_target(a int,b string,c int) cluster by(a,b); +statement ok +drop table if exists cluster_source; + statement ok create table cluster_source(a int,b string,c int); @@ -588,6 +612,9 @@ create table source_test(a int,b string,delete_flag bool) cluster by(a,b); statement ok insert into source_test values(1,'d',true),(2,'e',true),(3,'f',false),(4,'e',true),(5,'f',false); +statement ok +drop stage if exists source_parquet; + statement ok create stage source_parquet file_format = (type = parquet); diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0037_target_build_merge_into_without_distributed_enable.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0037_target_build_merge_into_without_distributed_enable.test index 22be2a2a44fd..5ca50b6b4a3c 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0037_target_build_merge_into_without_distributed_enable.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0037_target_build_merge_into_without_distributed_enable.test @@ -1,3 +1,12 @@ +statement ok +drop database if exists db; + +statement ok +create database db; + +statement ok +use db; + statement ok set enable_experimental_merge_into = 1; diff --git a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test index a71e2bed9df7..cb7316c3aa8a 100644 --- a/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test +++ b/tests/sqllogictests/suites/mode/cluster/merge_into_non_equal_distributed.test @@ -58,44 +58,42 @@ explain merge into t1 using t2 on t1.a < t2.a when matched then update * when no ---- MergeInto: target_table: default.default.t1 -├── distributed: false +├── distributed: true ├── target_build_optimization: false ├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = a (#0)] └── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))] HashJoin -├── output columns: [t1.a (#1), t1._row_id (#2), t2.a (#0)] +├── output columns: [t1.a (#1), t1._row_id (#2), t2.a (#0), #_row_number] ├── join type: RIGHT OUTER ├── build keys: [] ├── probe keys: [] ├── filters: [t1.a (#1) < t2.a (#0)] ├── estimated rows: 15.00 ├── Exchange(Build) -│ ├── output columns: [t2.a (#0)] -│ ├── exchange type: Merge -│ └── TableScan -│ ├── table: default.default.t2 -│ ├── output columns: [a (#0)] -│ ├── read rows: 1 -│ ├── read size: < 1 KiB -│ ├── partitions total: 1 -│ ├── partitions scanned: 1 -│ ├── pruning stats: [segments: , blocks: ] -│ ├── push downs: [filters: [], limit: NONE] -│ └── estimated rows: 1.00 -└── Exchange(Probe) - ├── output columns: [t1.a (#1), t1._row_id (#2)] - ├── exchange type: Merge - └── TableScan - ├── table: default.default.t1 - ├── output columns: [a (#1), _row_id (#2)] - ├── read rows: 15 - ├── read size: < 1 KiB - ├── partitions total: 3 - ├── partitions scanned: 3 - ├── pruning stats: [segments: , blocks: ] - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 15.00 +│ ├── output columns: [t2.a (#0), #_row_number] +│ ├── exchange type: Broadcast +│ └── MergeIntoAddRowNumber +│ └── TableScan +│ ├── table: default.default.t2 +│ ├── output columns: [a (#0)] +│ ├── read rows: 1 +│ ├── read size: < 1 KiB +│ ├── 
partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── pruning stats: [segments: , blocks: ] +│ ├── push downs: [filters: [], limit: NONE] +│ └── estimated rows: 1.00 +└── TableScan(Probe) + ├── table: default.default.t1 + ├── output columns: [a (#1), _row_id (#2)] + ├── read rows: 15 + ├── read size: < 1 KiB + ├── partitions total: 3 + ├── partitions scanned: 3 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 15.00 query TT merge into t1 using t2 on t1.a < t2.a when matched then update * when not matched then insert *; @@ -157,43 +155,41 @@ explain merge into t1 using (select $1 as a from @ss) as t2 on t1.a = t2.a when ---- MergeInto: target_table: default.default.t1 -├── distributed: false +├── distributed: true ├── target_build_optimization: false ├── can_try_update_column_only: true ├── matched update: [condition: None,update set a = a (#0)] └── unmatched insert: [condition: None,insert into (a) values(CAST(a (#0) AS Int32 NULL))] HashJoin -├── output columns: [t1.a (#1), t1._row_id (#2), stage._$1 (#0)] +├── output columns: [t1.a (#1), t1._row_id (#2), stage._$1 (#0), #_row_number] ├── join type: RIGHT OUTER ├── build keys: [CAST(t2.a (#0) AS Int64 NULL)] ├── probe keys: [CAST(t1.a (#1) AS Int64 NULL)] ├── filters: [] ├── estimated rows: 0.00 ├── Exchange(Build) -│ ├── output columns: [stage._$1 (#0)] -│ ├── exchange type: Merge -│ └── TableScan -│ ├── table: default.system.stage -│ ├── output columns: [_$1 (#0)] -│ ├── read rows: 6 -│ ├── read size: < 1 KiB -│ ├── partitions total: 1 -│ ├── partitions scanned: 1 -│ ├── push downs: [filters: [], limit: NONE] -│ └── estimated rows: 0.00 -└── Exchange(Probe) - ├── output columns: [t1.a (#1), t1._row_id (#2)] - ├── exchange type: Merge - └── TableScan - ├── table: default.default.t1 - ├── output columns: [a (#1), _row_id (#2)] - ├── read rows: 2 - ├── read size: < 1 KiB - ├── partitions total: 1 - ├── partitions scanned: 1 - ├── pruning stats: [segments: , blocks: ] - ├── push downs: [filters: [], limit: NONE] - └── estimated rows: 2.00 +│ ├── output columns: [stage._$1 (#0), #_row_number] +│ ├── exchange type: Broadcast +│ └── MergeIntoAddRowNumber +│ └── TableScan +│ ├── table: default.system.stage +│ ├── output columns: [_$1 (#0)] +│ ├── read rows: 6 +│ ├── read size: < 1 KiB +│ ├── partitions total: 1 +│ ├── partitions scanned: 1 +│ ├── push downs: [filters: [], limit: NONE] +│ └── estimated rows: 0.00 +└── TableScan(Probe) + ├── table: default.default.t1 + ├── output columns: [a (#1), _row_id (#2)] + ├── read rows: 2 + ├── read size: < 1 KiB + ├── partitions total: 1 + ├── partitions scanned: 1 + ├── pruning stats: [segments: , blocks: ] + ├── push downs: [filters: [], limit: NONE] + └── estimated rows: 2.00 query TT merge into t1 using (select $1 as a from @ss) as t2 on t1.a = t2.a when matched then update * when not matched then insert *;
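
The core behavioral change sits in `optimize_merge_into`: instead of peeling the top `Exchange` off the join and re-attaching it later, the optimizer now optimizes the whole join `SExpr`, detects a join-order swap by comparing the join's `left_conditions` before and after `optimize_query`, enables the right-broadcast path (which inserts the `MergeIntoAddRowNumber` node) only for RIGHT / RIGHT ANTI joins whose shape matches `merge_source_matcher`, and derives `distributed` from the new `SExpr::has_merge_exchange` walk rather than threading flags through the pipeline builder. The snippet below is only a minimal sketch of that decision shape under simplified assumptions: `Rel`, `JoinType`, and `ExchangeKind` are hypothetical stand-ins for Databend's `SExpr` / `RelOperator`, and `enable_right_broadcast` is written here as a plain structural check instead of the matcher used in the patch.

```rust
// A minimal sketch of the distribution decision introduced by this patch,
// using simplified stand-in types (`Rel`, `JoinType`, `ExchangeKind`) rather
// than Databend's real `SExpr` / `RelOperator`.

#[allow(dead_code)]
enum JoinType {
    Inner,
    Right,
    RightAnti,
}

enum ExchangeKind {
    Merge,
    Broadcast,
    Hash,
}

enum Rel {
    Join {
        join_type: JoinType,
        children: Vec<Rel>,
    },
    Exchange {
        kind: ExchangeKind,
        children: Vec<Rel>,
    },
    Scan,
}

impl Rel {
    /// Mirrors `SExpr::has_merge_exchange`: a Merge exchange anywhere in the
    /// join tree funnels rows back to a single node, so the merge-into sink
    /// has to run standalone (`distributed = !has_merge_exchange()`).
    fn has_merge_exchange(&self) -> bool {
        match self {
            Rel::Exchange { kind: ExchangeKind::Merge, .. } => true,
            Rel::Exchange { children, .. } | Rel::Join { children, .. } => {
                children.iter().any(Rel::has_merge_exchange)
            }
            Rel::Scan => false,
        }
    }

    /// The right-broadcast path (which appends the `MergeIntoAddRowNumber`
    /// node) only applies when the source sits on the build side of a RIGHT /
    /// RIGHT ANTI join and both join inputs are exchanges -- the shape
    /// `merge_source_matcher` looks for. Inner joins never take this path.
    fn enable_right_broadcast(&self) -> bool {
        match self {
            Rel::Join { join_type, children }
                if matches!(join_type, JoinType::Right | JoinType::RightAnti) =>
            {
                children.iter().all(|c| matches!(c, Rel::Exchange { .. }))
            }
            _ => false,
        }
    }
}

fn main() {
    // Distributed case: RIGHT join over two exchanges (the build side becomes
    // Broadcast after MergeSourceOptimizer::optimize), no Merge exchange.
    let distributed_plan = Rel::Join {
        join_type: JoinType::Right,
        children: vec![
            Rel::Exchange { kind: ExchangeKind::Hash, children: vec![Rel::Scan] },
            Rel::Exchange { kind: ExchangeKind::Broadcast, children: vec![Rel::Scan] },
        ],
    };
    assert!(distributed_plan.enable_right_broadcast());
    assert!(!distributed_plan.has_merge_exchange()); // distributed = true

    // Standalone case: a Merge exchange inside the join tree means the plan
    // already collapses to one node, so distributed stays false.
    let standalone_plan = Rel::Join {
        join_type: JoinType::Inner,
        children: vec![
            Rel::Exchange { kind: ExchangeKind::Merge, children: vec![Rel::Scan] },
            Rel::Scan,
        ],
    };
    assert!(!standalone_plan.enable_right_broadcast());
    assert!(standalone_plan.has_merge_exchange()); // distributed = false
}
```

Deriving `distributed` from the plan shape keeps the decision in one place; the pipeline builder then only consumes the single `enable_right_broadcast` flag where it previously re-derived the same behavior from `distributed && !change_join_order` combinations.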