refactor: refactor merge into optimizer and fix unexpected distribution plan (#15507)

* refactor: refactor merge into optimizer and fix unexpected distribution plan

* fix get row id

* fix

* fix

* save

* fix

* fix pipeline
xudong963 authored May 27, 2024
1 parent 8b615a1 commit cdb2e25
Showing 19 changed files with 292 additions and 318 deletions.
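Editor's note: the core of this commit is that the condition `*distributed && !*change_join_order`, previously re-derived at many call sites in the merge-into interpreter and pipeline builder, is now computed once by the optimizer and carried on the plan as a single `enable_right_broadcast` flag. The sketch below is a minimal, self-contained illustration of that pattern under stated assumptions; it is not the actual Databend source, and the struct and function names are invented for the example.

// Minimal sketch (illustrative names only): the optimizer decides once whether
// the MERGE INTO plan uses a right-broadcast join, and downstream builders read
// that single flag instead of re-deriving it from `distributed` and
// `change_join_order`. The derivation shown mirrors the condition the old call
// sites checked; the real optimizer may compute the flag differently.
#[derive(Clone, Debug)]
struct MergeIntoPlanSketch {
    distributed: bool,
    change_join_order: bool,
    // Set once during optimization; previously each call site re-computed
    // `distributed && !change_join_order`.
    enable_right_broadcast: bool,
}

impl MergeIntoPlanSketch {
    fn new(distributed: bool, change_join_order: bool) -> Self {
        Self {
            distributed,
            change_join_order,
            enable_right_broadcast: distributed && !change_join_order,
        }
    }

    // A builder that needs the row_number column now asks one question
    // instead of repeating the two-flag check.
    fn needs_row_number(&self) -> bool {
        self.enable_right_broadcast
    }
}

fn main() {
    let plan = MergeIntoPlanSketch::new(true, false);
    assert!(plan.needs_row_number());
    println!("{:?}", plan);
}

Centralizing the decision this way keeps the builder branches (row_number handling, dummy pipe items, output schema) in sync, which is the pattern the diffs below follow.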
22 changes: 21 additions & 1 deletion src/query/service/src/interpreters/interpreter_explain.rs
@@ -30,6 +30,7 @@ use databend_common_pipeline_core::processors::PlanProfile;
use databend_common_sql::binder::ExplainConfig;
use databend_common_sql::optimizer::ColumnSet;
use databend_common_sql::plans::FunctionCall;
use databend_common_sql::plans::MergeInto;
use databend_common_sql::plans::UpdatePlan;
use databend_common_sql::BindContext;
use databend_common_sql::MetadataRef;
@@ -40,6 +41,7 @@ use databend_common_users::UserApiProvider;
use super::InsertMultiTableInterpreter;
use super::InterpreterFactory;
use super::UpdateInterpreter;
use crate::interpreters::interpreter_merge_into::MergeIntoInterpreter;
use crate::interpreters::Interpreter;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
@@ -191,7 +193,7 @@ impl Interpreter for ExplainInterpreter {
// todo:(JackTan25), we need to make all execute2() just do `build pipeline` work,
// don't take real actions. for now we fix #13657 like below.
let pipeline = match &self.plan {
Plan::Query { .. } => {
Plan::Query { .. } | Plan::MergeInto(_) => {
let interpter =
InterpreterFactory::get(self.ctx.clone(), &self.plan).await?;
interpter.execute2().await?
@@ -216,6 +218,7 @@
)
.await?
}
Plan::MergeInto(merge_into) => self.explain_merge_fragments(merge_into).await?,
Plan::Update(update) => self.explain_update_fragments(update.as_ref()).await?,
_ => {
return Err(ErrorCode::Unimplemented("Unsupported EXPLAIN statement"));
@@ -563,4 +566,21 @@ impl ExplainInterpreter {
}
Ok(vec![DataBlock::concat(&result)?])
}

async fn explain_merge_fragments(&self, merge_into: &MergeInto) -> Result<Vec<DataBlock>> {
let interpreter = MergeIntoInterpreter::try_create(self.ctx.clone(), merge_into.clone())?;
let (plan, _) = interpreter.build_physical_plan().await?;
let root_fragment = Fragmenter::try_create(self.ctx.clone())?.build_fragment(&plan)?;

let mut fragments_actions = QueryFragmentsActions::create(self.ctx.clone());
root_fragment.get_actions(self.ctx.clone(), &mut fragments_actions)?;

let display_string = fragments_actions
.display_indent(&merge_into.meta_data)
.to_string();

let line_split_result = display_string.lines().collect::<Vec<_>>();
let formatted_plan = StringType::from_data(line_split_result);
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
}
}
8 changes: 5 additions & 3 deletions src/query/service/src/interpreters/interpreter_factory.rs
@@ -339,9 +339,11 @@ impl InterpreterFactory {
Plan::Insert(insert) => InsertInterpreter::try_create(ctx, *insert.clone()),

Plan::Replace(replace) => ReplaceInterpreter::try_create(ctx, *replace.clone()),
Plan::MergeInto(merge_into) => {
MergeIntoInterpreter::try_create(ctx, *merge_into.clone())
}
Plan::MergeInto(merge_into) => Ok(Arc::new(MergeIntoInterpreter::try_create(
ctx,
*merge_into.clone(),
)?)),

Plan::Delete(delete) => Ok(Arc::new(DeleteInterpreter::try_create(
ctx,
*delete.clone(),
50 changes: 17 additions & 33 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
@@ -45,7 +45,6 @@ use databend_common_sql::executor::PhysicalPlan;
use databend_common_sql::executor::PhysicalPlanBuilder;
use databend_common_sql::plans;
use databend_common_sql::plans::MergeInto as MergePlan;
use databend_common_sql::plans::RelOperator;
use databend_common_sql::IndexType;
use databend_common_sql::ScalarExpr;
use databend_common_sql::TypeCheck;
@@ -60,7 +59,6 @@ use itertools::Itertools;
use crate::interpreters::common::dml_build_update_stream_req;
use crate::interpreters::HookOperator;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
@@ -74,8 +72,8 @@ pub struct MergeIntoInterpreter {
}

impl MergeIntoInterpreter {
pub fn try_create(ctx: Arc<QueryContext>, plan: MergePlan) -> Result<InterpreterPtr> {
Ok(Arc::new(MergeIntoInterpreter { ctx, plan }))
pub fn try_create(ctx: Arc<QueryContext>, plan: MergePlan) -> Result<MergeIntoInterpreter> {
Ok(MergeIntoInterpreter { ctx, plan })
}
}

@@ -125,7 +123,7 @@ impl Interpreter for MergeIntoInterpreter {
}

impl MergeIntoInterpreter {
async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> {
pub async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> {
let MergePlan {
bind_context,
input,
@@ -145,8 +143,10 @@
split_idx,
row_id_index,
can_try_update_column_only,
enable_right_broadcast,
..
} = &self.plan;
let enable_right_broadcast = *enable_right_broadcast;
let mut columns_set = columns_set.clone();
let table = self.ctx.get_table(catalog, database, table_name).await?;
let fuse_table = table.as_any().downcast_ref::<FuseTable>().ok_or_else(|| {
@@ -211,16 +211,8 @@
let table_name = table_name.clone();
let input = input.clone();

// we need to extract join plan, but we need to give this exchange
// back at last.
let (input, extract_exchange) = if let RelOperator::Exchange(_) = input.plan() {
(Box::new(input.child(0)?.clone()), true)
} else {
(input, false)
};

let mut builder = PhysicalPlanBuilder::new(meta_data.clone(), self.ctx.clone(), false);
let mut join_input = builder.build(&input, *columns_set.clone()).await?;
let join_input = builder.build(&input, *columns_set.clone()).await?;

// find row_id column index
let join_output_schema = join_input.output_schema()?;
@@ -265,7 +257,7 @@
}
}

if *distributed && !*change_join_order {
if enable_right_broadcast {
row_number_idx = Some(join_output_schema.index_of(ROW_NUMBER_COL_NAME)?);
}

@@ -276,7 +268,7 @@
));
}

if *distributed && row_number_idx.is_none() && !*change_join_order {
if enable_right_broadcast && row_number_idx.is_none() {
return Err(ErrorCode::InvalidRowIdIndex(
"can't get internal row_number_idx when running merge into",
));
@@ -285,17 +277,6 @@
let table_info = fuse_table.get_table_info().clone();
let catalog_ = self.ctx.get_catalog(catalog).await?;

if !*distributed && extract_exchange {
join_input = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(join_input),
kind: FragmentKind::Merge,
keys: vec![],
allow_adjust_parallelism: true,
ignore_exchange: false,
});
};

// transform unmatched for insert
// reference to func `build_eval_scalar`
// (DataSchemaRef, Option<RemoteExpr>, Vec<RemoteExpr>,Vec<usize>) => (source_schema, condition, value_exprs)
@@ -432,6 +413,7 @@
can_try_update_column_only: *can_try_update_column_only,
plan_id: u32::MAX,
merge_into_split_idx,
enable_right_broadcast,
}))
} else {
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
@@ -444,23 +426,25 @@
row_id_idx,
segments: segments.clone(),
distributed: true,
output_schema: match *change_join_order {
false => DataSchemaRef::new(DataSchema::new(vec![
join_output_schema.fields[row_number_idx.unwrap()].clone(),
])),
true => DataSchemaRef::new(DataSchema::new(vec![DataField::new(
output_schema: if let Some(row_number_idx) = row_number_idx {
DataSchemaRef::new(DataSchema::new(vec![
join_output_schema.fields[row_number_idx].clone(),
]))
} else {
DataSchemaRef::new(DataSchema::new(vec![DataField::new(
ROW_ID_COL_NAME,
databend_common_expression::types::DataType::Number(
databend_common_expression::types::NumberDataType::UInt64,
),
)])),
)]))
},
merge_type: merge_type.clone(),
change_join_order: *change_join_order,
target_build_optimization: false, // we don't support for distributed mode for now..
can_try_update_column_only: *can_try_update_column_only,
plan_id: u32::MAX,
merge_into_split_idx,
enable_right_broadcast,
}));
// if change_join_order = true, it means the target is build side,
// in this way, we will do matched operation and not matched operation
30 changes: 15 additions & 15 deletions src/query/service/src/pipelines/builders/builder_merge_into.rs
@@ -143,7 +143,7 @@ impl PipelineBuilder {
}
assert!(self.join_state.is_some());
assert!(self.merge_into_probe_data_fields.is_some());

self.main_pipeline.resize(1, false)?;
let join_state = self.join_state.clone().unwrap();
// split row_number and log
// output_port_row_number
@@ -291,6 +291,7 @@ impl PipelineBuilder {
// we will receive MutationLogs only without rowids.
return Ok(());
}
self.main_pipeline.resize(1, false)?;
// we will receive MutationLogs and rowids. So we should apply
// rowids firstly and then send all mutation logs to commit sink.
// we need to spilt rowid and mutationlogs, and we can get pipeitems:
@@ -366,9 +367,10 @@ impl PipelineBuilder {
change_join_order,
can_try_update_column_only,
merge_into_split_idx,
enable_right_broadcast,
..
} = merge_into;

let enable_right_broadcast = *enable_right_broadcast;
self.build_pipeline(input)?;

self.main_pipeline
@@ -457,10 +459,8 @@
}

if need_unmatch {
// distributed: false, standalone mode, we need to add insert processor
// (distributed,change join order):(true,true) target is build side, we
// need to support insert in local node.
if !*distributed || *change_join_order {
// If merge into doesn't contain right broadcast join, execute insert in local.
if !enable_right_broadcast {
let merge_into_not_matched_processor = MergeIntoNotMatchedProcessor::create(
unmatched.clone(),
input.output_schema()?,
@@ -613,7 +613,7 @@
}
} else if need_match && need_unmatch {
// remove first row_id port and last row_number_port
if !*change_join_order {
if enable_right_broadcast {
self.main_pipeline.output_len() - 2
} else {
// remove first row_id port
@@ -624,7 +624,7 @@
self.main_pipeline.output_len() - 1
} else {
// there are only row_number
if !*change_join_order {
if enable_right_broadcast {
0
} else {
// unmatched prot
@@ -659,7 +659,7 @@
// receive row_id
builder.add_items_prepend(vec![create_dummy_item()]);
}
if need_unmatch && !*change_join_order {
if enable_right_broadcast {
// receive row_number
builder.add_items(vec![create_dummy_item()]);
}
Expand Down Expand Up @@ -721,7 +721,7 @@ impl PipelineBuilder {
}

// need to receive row_number, we should give a dummy item here.
if *distributed && need_unmatch && !*change_join_order {
if enable_right_broadcast {
builder.add_items(vec![create_dummy_item()]);
}
self.main_pipeline.add_pipe(builder.finalize());
Expand Down Expand Up @@ -773,7 +773,7 @@ impl PipelineBuilder {
}

// need to receive row_number, we should give a dummy item here.
if *distributed && need_unmatch && !*change_join_order {
if enable_right_broadcast {
builder.add_items(vec![create_dummy_item()]);
}
self.main_pipeline.add_pipe(builder.finalize());
Expand Down Expand Up @@ -817,7 +817,7 @@ impl PipelineBuilder {
}

// receive row_number
if *distributed && need_unmatch && !*change_join_order {
if enable_right_broadcast {
pipe_items.push(create_dummy_item());
}

@@ -853,7 +853,7 @@
}

// with row_number
if *distributed && need_unmatch && !change_join_order {
if enable_right_broadcast {
ranges.push(vec![self.main_pipeline.output_len() - 1]);
}

Expand All @@ -877,7 +877,7 @@ impl PipelineBuilder {
vec.push(serialize_segment_transform.into_pipe_item());
}

if need_unmatch && !*change_join_order {
if enable_right_broadcast {
vec.push(create_dummy_item())
}
vec
@@ -911,7 +911,7 @@
));

// accumulate row_number
if *distributed && need_unmatch && !*change_join_order {
if enable_right_broadcast {
let pipe_items = if need_match {
vec![
create_dummy_item(),
4 changes: 0 additions & 4 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -162,9 +162,6 @@ impl PhysicalPlanReplacer for Fragmenter {

fn replace_merge_into(&mut self, plan: &MergeInto) -> Result<PhysicalPlan> {
let input = self.replace(&plan.input)?;
if !plan.change_join_order {
self.state = State::SelectLeaf;
}
Ok(PhysicalPlan::MergeInto(Box::new(MergeInto {
input: Box::new(input),
..plan.clone()
@@ -232,7 +229,6 @@
// Consume current fragments to prevent them being consumed by `probe_input`.
fragments.append(&mut self.fragments);
let probe_input = self.replace(plan.probe.as_ref())?;

fragments.append(&mut self.fragments);
self.fragments = fragments;

(Diffs for the remaining 14 changed files are not shown.)