Skip to content

Commit

Permalink
fix: merge into split (#15644)
Browse files Browse the repository at this point in the history
* fix: merge into split logic

For a merge-into statement with both matched and not-matched branches, a right join
is used.
After the joined result set is obtained, one of its columns is chosen as
the "split" column:
rows whose split-column value is NULL are recognized as
not matched, and all other rows as matched.

Before this PR, an arbitrary column of the target table was picked as the
"split" column, which is unsafe, since
that column may originally contain NULL values, which may lead
to a matched row being incorrectly treated
as unmatched, and to unexpected results (data duplication).
This PR uses the target table's row_id column as the "split" column instead.

* disable target build

* adjust logic tests

* rebase & fix clippy
  • Loading branch information
dantengsky authored May 28, 2024
1 parent ae5b4f6 commit b3305ae
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 22 deletions.
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ impl MergeIntoInterpreter {
}
}

log::info!("target build optimization is {}", target_build_optimization);

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), meta_data).await?;

let table_name = table_name.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ impl PipelineBuilder {
PhysicalPlan::TableScan(scan) => match scan.table_index {
None | Some(databend_common_sql::DUMMY_TABLE_INDEX) => (false, false),
Some(table_index) => match need_reserve_block_info(self.ctx.clone(), table_index) {
(true, is_distributed) => (true, is_distributed),
// due to issue https://github.com/datafuselabs/databend/issues/15643,
// target build optimization of merge-into is disabled

//(true, is_distributed) => (true, is_distributed),
(true, is_distributed) => (false, is_distributed),
_ => (false, false),
},
},
Expand Down
17 changes: 2 additions & 15 deletions src/query/sql/src/planner/binder/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use crate::IndexType;
use crate::ScalarBinder;
use crate::ScalarExpr;
use crate::Visibility;
use crate::DUMMY_COLUMN_INDEX;

#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum MergeIntoType {
Expand Down Expand Up @@ -439,20 +438,8 @@ impl Binder {
.await?,
);
}
let mut split_idx = DUMMY_COLUMN_INDEX;
// find any target table column index for merge_into_split
for column in self
.metadata
.read()
.columns_by_table_index(table_index)
.iter()
{
if column.index() != row_id_index {
split_idx = column.index();
break;
}
}
assert!(split_idx != DUMMY_COLUMN_INDEX);

let split_idx = row_id_index;

Ok(MergeInto {
catalog: catalog_name.to_string(),
Expand Down
7 changes: 5 additions & 2 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ async fn optimize_merge_into(mut opt_ctx: OptimizerContext, plan: Box<MergeInto>
.table_ctx
.get_settings()
.get_enable_distributed_merge_into()?;
let mut new_columns_set = plan.columns_set.clone();
let new_columns_set = plan.columns_set.clone();
if change_join_order
&& matches!(plan.merge_type, MergeIntoType::FullOperation)
&& opt_ctx
Expand All @@ -461,7 +461,10 @@ async fn optimize_merge_into(mut opt_ctx: OptimizerContext, plan: Box<MergeInto>
== 0
&& flag
{
new_columns_set.remove(&plan.row_id_index);
// due to issue https://github.com/datafuselabs/databend/issues/15643,
// target build optimization of merge-into is disabled: here row_id column should be kept

// new_columns_set.remove(&plan.row_id_index);
opt_ctx.table_ctx.set_merge_into_join(MergeIntoJoin {
merge_into_join_type: MergeIntoJoinType::Left,
is_distributed: false,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Regression test for the merge-into "split" logic
# (see https://github.com/datafuselabs/databend/issues/15643).
# Target and source each hold one row whose only non-NULL column is the join key `k`.
statement ok
create or replace database m_test;

statement ok
use m_test;

statement ok
create table t(a string, b string, c string, d string, k string);

statement ok
create table s(a string, b string, c string, d string, k string);

# Only `k` is given a value, so columns a, b, c, d of the target row are NULL.
statement ok
insert into t(k) values('k');

# The source row has the same key, so it joins with the target row on t.k = s.k.
statement ok
insert into s(k) values('k');


# Full merge (matched + not-matched branches). The expected output is `0 1`
# (presumably the inserted and updated row counts -- confirm against the MERGE INTO docs).
query II
merge into t using s on t.k = s.k when matched then update * when not matched then insert *;
----
0 1

# The target must still contain exactly one row (no duplicated data).
query TTTTT
select * from t;
----
NULL NULL NULL NULL k
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ explain merge into target_build_optimization as t1 using source_optimization as
MergeInto:
target_table: default.default.target_build_optimization
├── distributed: false
├── target_build_optimization: true
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
└── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
Expand Down Expand Up @@ -135,7 +135,7 @@ explain merge into target_build_optimization as t1 using source_optimization as
MergeInto:
target_table: default.default.target_build_optimization
├── distributed: false
├── target_build_optimization: true
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
└── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ on t1.a = t2.a when matched then update set t1.b = t2.b when not matched then in
MergeInto:
target_table: default.default.column_only_optimization_target
├── distributed: false
├── target_build_optimization: true
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set b = t2.b (#1)]
└── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))]
Expand Down Expand Up @@ -211,7 +211,7 @@ on t1.a = t2.a when matched then update * when not matched then insert *;
MergeInto:
target_table: default.default.column_only_optimization_target
├── distributed: false
├── target_build_optimization: true
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = a (#0),b = b (#1)]
└── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))]
Expand Down

0 comments on commit b3305ae

Please sign in to comment.