diff --git a/src/query/ast/src/ast/format/syntax/query.rs b/src/query/ast/src/ast/format/syntax/query.rs index 8d60aee53b14..7266d1ce8e72 100644 --- a/src/query/ast/src/ast/format/syntax/query.rs +++ b/src/query/ast/src/ast/format/syntax/query.rs @@ -437,6 +437,9 @@ pub(crate) fn pretty_table(table: TableReference) -> RcDoc<'static> { JoinOperator::RightAnti => RcDoc::text("RIGHT ANTI JOIN"), JoinOperator::LeftSemi => RcDoc::text("LEFT SEMI JOIN"), JoinOperator::RightSemi => RcDoc::text("RIGHT SEMI JOIN"), + JoinOperator::Asof => RcDoc::text("ASOF JOIN"), + JoinOperator::LeftAsof => RcDoc::text("ASOF LEFT JOIN"), + JoinOperator::RightAsof => RcDoc::text("ASOF RIGHT JOIN"), }) .append(RcDoc::space().append(pretty_table(*join.right))) .append(match &join.condition { diff --git a/src/query/ast/src/ast/query.rs b/src/query/ast/src/ast/query.rs index 4b653453d786..1b8084756eaa 100644 --- a/src/query/ast/src/ast/query.rs +++ b/src/query/ast/src/ast/query.rs @@ -862,6 +862,15 @@ impl Display for TableReference { JoinOperator::CrossJoin => { write!(f, " CROSS JOIN")?; } + JoinOperator::Asof => { + write!(f, " ASOF JOIN")?; + } + JoinOperator::LeftAsof => { + write!(f, " ASOF LEFT JOIN")?; + } + JoinOperator::RightAsof => { + write!(f, " ASOF RIGHT JOIN")?; + } } write!(f, " {}", join.right)?; match &join.condition { @@ -934,6 +943,10 @@ pub enum JoinOperator { RightAnti, // CrossJoin can only work with `JoinCondition::None` CrossJoin, + // Asof + Asof, + LeftAsof, + RightAsof, } #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] diff --git a/src/query/ast/src/parser/query.rs b/src/query/ast/src/parser/query.rs index 4948b6f53609..f352600e7e08 100644 --- a/src/query/ast/src/parser/query.rs +++ b/src/query/ast/src/parser/query.rs @@ -626,6 +626,9 @@ pub fn join_operator(i: Input) -> IResult { value(JoinOperator::RightOuter, rule! { RIGHT ~ OUTER? }), value(JoinOperator::FullOuter, rule! { FULL ~ OUTER? }), value(JoinOperator::CrossJoin, rule! { CROSS }), + value(JoinOperator::LeftAsof, rule! { ASOF ~ LEFT }), + value(JoinOperator::RightAsof, rule! { ASOF ~ RIGHT }), + value(JoinOperator::Asof, rule! 
{ ASOF }), ))(i) } diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index 29e16192ff93..f0d400f3fc23 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -356,6 +356,8 @@ pub enum TokenKind { ARRAY, #[token("AS", ignore(ascii_case))] AS, + #[token("ASOF", ignore(ascii_case))] + ASOF, #[token("AST", ignore(ascii_case))] AST, #[token("AT", ignore(ascii_case))] @@ -1644,6 +1646,7 @@ impl TokenKind { | TokenKind::WHEN => true, | TokenKind::ARRAY | TokenKind::AS + | TokenKind::ASOF | TokenKind::BETWEEN | TokenKind::CREATE | TokenKind::ATTACH diff --git a/src/query/expression/src/types.rs b/src/query/expression/src/types.rs index 9c412c1affb8..583a3f8936d7 100755 --- a/src/query/expression/src/types.rs +++ b/src/query/expression/src/types.rs @@ -62,6 +62,10 @@ pub use self::string::StringType; pub use self::timestamp::TimestampType; pub use self::variant::VariantType; use crate::property::Domain; +use crate::types::date::DATE_MAX; +use crate::types::date::DATE_MIN; +use crate::types::timestamp::TIMESTAMP_MAX; +use crate::types::timestamp::TIMESTAMP_MIN; use crate::values::Column; use crate::values::Scalar; use crate::ColumnBuilder; @@ -174,6 +178,63 @@ impl DataType { } } + pub fn infinity(&self) -> Result { + match &self { + DataType::Timestamp => Ok(Scalar::Timestamp(TIMESTAMP_MAX)), + DataType::Date => Ok(Scalar::Date(DATE_MAX)), + DataType::Number(NumberDataType::Float32) => Ok(Scalar::Number(NumberScalar::Float32( + ordered_float::OrderedFloat(f32::INFINITY), + ))), + DataType::Number(NumberDataType::Int32) => { + Ok(Scalar::Number(NumberScalar::Int32(i32::MAX))) + } + DataType::Number(NumberDataType::Int16) => { + Ok(Scalar::Number(NumberScalar::Int16(i16::MAX))) + } + DataType::Number(NumberDataType::Int8) => { + Ok(Scalar::Number(NumberScalar::Int8(i8::MAX))) + } + DataType::Number(NumberDataType::Float64) => Ok(Scalar::Number(NumberScalar::Float64( + ordered_float::OrderedFloat(f64::INFINITY), + ))), + DataType::Number(NumberDataType::Int64) => { + Ok(Scalar::Number(NumberScalar::Int64(i64::MAX))) + } + _ => Result::Err(format!( + "only support numeric types and time types, but got {:?}", + self + )), + } + } + pub fn ninfinity(&self) -> Result { + match &self { + DataType::Timestamp => Ok(Scalar::Timestamp(TIMESTAMP_MIN)), + DataType::Date => Ok(Scalar::Date(DATE_MIN)), + DataType::Number(NumberDataType::Float32) => Ok(Scalar::Number(NumberScalar::Float32( + ordered_float::OrderedFloat(f32::NEG_INFINITY), + ))), + DataType::Number(NumberDataType::Int32) => { + Ok(Scalar::Number(NumberScalar::Int32(i32::MIN))) + } + DataType::Number(NumberDataType::Int16) => { + Ok(Scalar::Number(NumberScalar::Int16(i16::MIN))) + } + DataType::Number(NumberDataType::Int8) => { + Ok(Scalar::Number(NumberScalar::Int8(i8::MIN))) + } + DataType::Number(NumberDataType::Float64) => Ok(Scalar::Number(NumberScalar::Float64( + ordered_float::OrderedFloat(f64::NEG_INFINITY), + ))), + DataType::Number(NumberDataType::Int64) => { + Ok(Scalar::Number(NumberScalar::Int64(i64::MIN))) + } + _ => Result::Err(format!( + "only support numeric types and time types, but got {:?}", + self + )), + } + } + pub fn is_unsigned_numeric(&self) -> bool { match self { DataType::Number(ty) => ALL_UNSIGNED_INTEGER_TYPES.contains(ty), diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs index e6a790389e1b..617e59c36e40 100644 --- 
a/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/common.rs @@ -184,7 +184,7 @@ impl HashJoinState { } } -pub(crate) fn wrap_true_validity( +pub fn wrap_true_validity( column: &BlockEntry, num_rows: usize, true_validity: &Bitmap, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs index b1c8379530b5..1240be5b4fa4 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_probe_state.rs @@ -160,6 +160,9 @@ impl HashJoinProbeState { pub fn probe(&self, input: DataBlock, probe_state: &mut ProbeState) -> Result> { match self.hash_join_state.hash_join_desc.join_type { JoinType::Inner + | JoinType::Asof + | JoinType::LeftAsof + | JoinType::RightAsof | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::RightSemi diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs index 521c21e343ec..e4358271159c 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/mod.rs @@ -31,6 +31,7 @@ mod transform_hash_join_probe; mod util; pub use build_spill::BuildSpillState; +pub use common::wrap_true_validity; pub use desc::HashJoinDesc; pub use hash_join_build_state::HashJoinBuildState; pub use hash_join_probe_state::HashJoinProbeState; diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs index e2046c474920..697f3638c593 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::time::Duration; +use std::sync::atomic::Ordering; + use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::bitmap::MutableBitmap; use databend_common_catalog::table_context::TableContext; @@ -28,6 +31,7 @@ use databend_common_expression::DataBlock; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; +use databend_common_expression::Scalar; use databend_common_expression::ScalarRef; use databend_common_expression::SortColumnDescription; use databend_common_expression::Value; @@ -174,6 +178,22 @@ impl IEJoinState { impl RangeJoinState { pub fn ie_join(&self, task_id: usize) -> Result> { + let partition_count = self.partition_count.load(Ordering::SeqCst) as usize; + if task_id < partition_count { + let blocks = self.inner_join(task_id); + self.completed_pair.fetch_add(1, Ordering::SeqCst); + blocks + } else { + if !self.left_match.read().is_empty() { + return Ok(vec![self.fill_left_outer(task_id)?]); + } else if !self.right_match.read().is_empty() { + return Ok(vec![self.fill_right_outer(task_id)?]); + } + Ok(vec![DataBlock::empty()]) + } + } + + pub fn inner_join(&self, task_id: usize) -> Result> { let block_size = self.ctx.get_settings().get_max_block_size()? 
as usize; let tasks = self.tasks.read(); let (left_idx, right_idx) = tasks[task_id]; @@ -313,6 +333,8 @@ impl RangeJoinState { let mut right_buffer = Vec::with_capacity(block_size); let mut off1; let mut off2 = 0; + let mut left_match = self.left_match.write(); + let mut right_match = self.right_match.write(); for (idx, p) in p_array.iter().enumerate() { if let ScalarRef::Number(NumberScalar::Int64(val)) = unsafe { l1_index_column.index_unchecked(*p as usize) } @@ -365,27 +387,161 @@ impl RangeJoinState { let left_table = self.left_table.read(); let right_table = self.right_table.read(); let mut indices = Vec::with_capacity(left_buffer.len()); + let mut column_builder = + NumberColumnBuilder::with_capacity(&NumberDataType::UInt64, left_buffer.len()); for res in left_buffer.iter() { indices.push((0u32, *res as u32, 1usize)); + if !left_match.is_empty() { + column_builder.push(NumberScalar::UInt64((*res + left_offset) as u64)); + } } let mut left_result_block = DataBlock::take_blocks(&left_table[left_idx..left_idx + 1], &indices, indices.len()); indices.clear(); for res in right_buffer.iter() { indices.push((0u32, *res as u32, 1usize)); + if !right_match.is_empty() { + column_builder.push(NumberScalar::UInt64((*res + right_offset) as u64)); + } } let right_result_block = DataBlock::take_blocks( &right_table[right_idx..right_idx + 1], &indices, indices.len(), ); - // Merge left_result_block and right_result_block for col in right_result_block.columns() { left_result_block.add_column(col.clone()); } + if !left_match.is_empty() || !right_match.is_empty() { + left_result_block.add_column(BlockEntry::new( + DataType::Number(NumberDataType::UInt64), + Value::Column(Column::Number(column_builder.build())), + )); + } for filter in self.other_conditions.iter() { left_result_block = filter_block(left_result_block, filter)?; } + if !left_match.is_empty() || !right_match.is_empty() { + let column = &left_result_block + .columns() + .last() + .unwrap() + .value + .try_downcast::() + .unwrap(); + if let ValueRef::Column(col) = column.as_ref() { + for val in UInt64Type::iter_column(&col) { + if !left_match.is_empty() { + left_match.set(val as usize, true); + } + if !right_match.is_empty() { + right_match.set(val as usize, true); + } + } + } + left_result_block.pop_columns(1); + } + Ok(left_result_block) + } + + pub fn fill_left_outer(&self, task_id: usize) -> Result { + let partition_count = self.partition_count.load(Ordering::SeqCst) as usize; + let mut completed = self.completed_pair.load(Ordering::SeqCst) as usize; + while completed < partition_count { + std::thread::sleep(Duration::from_millis(10)); + completed = self.completed_pair.load(Ordering::SeqCst) as usize; + } + + let block_size = self.ctx.get_settings().get_max_block_size()? 
as usize; + let tasks = self.tasks.read(); + let (left_idx, right_idx) = tasks[task_id]; + let row_offset = self.row_offset.read(); + let (left_offset, _right_offset) = row_offset[task_id]; + let left_table = self.left_table.read(); + let right_table = self.right_table.read(); + let mut indices = Vec::with_capacity(block_size); + let left_match = self.left_match.read(); + + for (i, state) in left_match + .iter() + .enumerate() + .skip(left_offset) + .take(left_table[left_idx].num_rows()) + { + if !(state) { + indices.push((0u32, (i - left_offset) as u32, 1usize)); + } + } + if indices.is_empty() { + return Ok(DataBlock::empty()); + } + let mut left_result_block = + DataBlock::take_blocks(&left_table[left_idx..left_idx + 1], &indices, indices.len()); + let nullable_columns = right_table[right_idx] + .columns() + .iter() + .map(|c| BlockEntry { + value: Value::Scalar(Scalar::Null), + data_type: c.data_type.wrap_nullable(), + }) + .collect::>(); + let right_result_block = DataBlock::new(nullable_columns, indices.len()); + // Merge left_result_block and right_result_block + for col in right_result_block.columns() { + left_result_block.add_column(col.clone()); + } + Ok(left_result_block) + } + + pub fn fill_right_outer(&self, task_id: usize) -> Result { + let partition_count = self.partition_count.load(Ordering::SeqCst) as usize; + let mut completed = self.completed_pair.load(Ordering::SeqCst) as usize; + while completed < partition_count { + std::thread::sleep(Duration::from_millis(10)); + completed = self.completed_pair.load(Ordering::SeqCst) as usize; + } + + let block_size = self.ctx.get_settings().get_max_block_size()? as usize; + let tasks = self.tasks.read(); + let (left_idx, right_idx) = tasks[task_id]; + let row_offset = self.row_offset.read(); + let (_, right_offset) = row_offset[task_id]; + let left_table = self.left_table.read(); + let right_table = self.right_table.read(); + let mut indices = Vec::with_capacity(block_size); + let right_match = self.right_match.read(); + + for (i, state) in right_match + .iter() + .enumerate() + .skip(right_offset) + .take(right_table[right_idx].num_rows()) + { + if !(state) { + indices.push((0u32, (i - right_offset) as u32, 1usize)); + } + } + if indices.is_empty() { + return Ok(DataBlock::empty()); + } + let nullable_columns = left_table[left_idx] + .columns() + .iter() + .map(|c| BlockEntry { + value: Value::Scalar(Scalar::Null), + data_type: c.data_type.wrap_nullable(), + }) + .collect::>(); + let mut left_result_block = DataBlock::new(nullable_columns, indices.len()); + let right_result_block = DataBlock::take_blocks( + &right_table[right_idx..right_idx + 1], + &indices, + indices.len(), + ); + // Merge left_result_block and right_result_block + for col in right_result_block.columns() { + left_result_block.add_column(col.clone()); + } Ok(left_result_block) } } diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs index bdc79cd0df38..2f19f94135bf 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs @@ -16,6 +16,8 @@ use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use databend_common_arrow::arrow::bitmap::Bitmap; +use databend_common_arrow::arrow::bitmap::MutableBitmap; use databend_common_catalog::table_context::TableContext; use 
databend_common_exception::Result; use databend_common_expression::types::DataType; @@ -31,11 +33,13 @@ use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_sql::executor::physical_plans::RangeJoin; use databend_common_sql::executor::physical_plans::RangeJoinCondition; use databend_common_sql::executor::physical_plans::RangeJoinType; +use databend_common_sql::plans::JoinType; use parking_lot::Mutex; use parking_lot::RwLock; use crate::pipelines::executor::WatchNotify; use crate::pipelines::processors::transforms::range_join::IEJoinState; +use crate::pipelines::processors::transforms::wrap_true_validity; use crate::sessions::QueryContext; pub struct RangeJoinState { @@ -59,8 +63,15 @@ pub struct RangeJoinState { // Row index offset for left/right pub(crate) row_offset: RwLock>, pub(crate) finished_tasks: AtomicU64, + pub(crate) completed_pair: AtomicU64, // IEJoin state pub(crate) ie_join_state: Option, + pub(crate) join_type: JoinType, + // A bool indicating for tuple in the LHS if they found a match (used in full outer join) + pub(crate) left_match: RwLock, + // A bool indicating for tuple in the RHS if they found a match (used in full outer join) + pub(crate) right_match: RwLock, + pub(crate) partition_count: AtomicU64, } impl RangeJoinState { @@ -70,7 +81,6 @@ impl RangeJoinState { } else { None }; - Self { ctx, left_table: RwLock::new(vec![]), @@ -87,21 +97,46 @@ impl RangeJoinState { tasks: RwLock::new(vec![]), row_offset: RwLock::new(vec![]), finished_tasks: AtomicU64::new(0), + completed_pair: AtomicU64::new(0), ie_join_state, + join_type: range_join.join_type.clone(), + left_match: RwLock::new(MutableBitmap::new()), + right_match: RwLock::new(MutableBitmap::new()), + partition_count: AtomicU64::new(0), } } pub(crate) fn sink_right(&self, block: DataBlock) -> Result<()> { // Sink block to right table let mut right_table = self.right_table.write(); - right_table.push(block); + let mut right_block = block; + if matches!(self.join_type, JoinType::Left) { + let validity = Bitmap::new_constant(true, right_block.num_rows()); + let nullable_right_columns = right_block + .columns() + .iter() + .map(|c| wrap_true_validity(c, right_block.num_rows(), &validity)) + .collect::>(); + right_block = DataBlock::new(nullable_right_columns, right_block.num_rows()); + } + right_table.push(right_block); Ok(()) } pub(crate) fn sink_left(&self, block: DataBlock) -> Result<()> { // Sink block to left table let mut left_table = self.left_table.write(); - left_table.push(block); + let mut left_block = block; + if matches!(self.join_type, JoinType::Right) { + let validity = Bitmap::new_constant(true, left_block.num_rows()); + let nullable_left_columns = left_block + .columns() + .iter() + .map(|c| wrap_true_validity(c, left_block.num_rows(), &validity)) + .collect::>(); + left_block = DataBlock::new(nullable_left_columns, left_block.num_rows()); + } + left_table.push(left_block); Ok(()) } @@ -272,6 +307,29 @@ impl RangeJoinState { right_offset = 0; left_offset += left_block.num_rows(); } + self.partition_count + .fetch_add(tasks.len().try_into().unwrap(), atomic::Ordering::SeqCst); + // Add Fill task + left_offset = 0; + right_offset = 0; + if matches!(self.join_type, JoinType::Left) { + let mut left_match = self.left_match.write(); + for (left_idx, left_block) in left_sorted_blocks.iter().enumerate() { + row_offset.push((left_offset, 0)); + tasks.push((left_idx, 0)); + left_offset += left_block.num_rows(); + left_match.extend_constant(left_block.num_rows(), false); + } + } + if 
matches!(self.join_type, JoinType::Right) { + let mut right_match = self.right_match.write(); + for (right_idx, right_block) in right_sorted_blocks.iter().enumerate() { + row_offset.push((0, right_offset)); + tasks.push((0, right_idx)); + right_offset += right_block.num_rows(); + right_match.extend_constant(right_block.num_rows(), false); + } + } Ok(()) } } diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 2d342702d28e..2ca99add28e2 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -300,6 +300,7 @@ pub trait PhysicalPlanReplacer { other_conditions: plan.other_conditions.clone(), join_type: plan.join_type.clone(), range_join_type: plan.range_join_type.clone(), + output_schema: plan.output_schema.clone(), stat_info: plan.stat_info.clone(), })) } diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs index 1a372bc7d655..d175e063c8e0 100644 --- a/src/query/sql/src/executor/physical_plans/mod.rs +++ b/src/query/sql/src/executor/physical_plans/mod.rs @@ -104,6 +104,8 @@ mod physical_window_partition; pub use physical_table_scan::TableScan; pub use physical_udf::Udf; pub use physical_udf::UdfFunctionDesc; +mod physical_asof_join; +pub use physical_asof_join::*; pub use physical_union_all::UnionAll; pub use physical_window::*; pub use physical_window_partition::WindowPartition; diff --git a/src/query/sql/src/executor/physical_plans/physical_asof_join.rs b/src/query/sql/src/executor/physical_plans/physical_asof_join.rs new file mode 100644 index 000000000000..9f541a7694de --- /dev/null +++ b/src/query/sql/src/executor/physical_plans/physical_asof_join.rs @@ -0,0 +1,457 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
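+
+// A sketch of the rewrite this module performs (for illustration only; the table and
+// column names below are taken from the tests added in this PR, not from this code):
+//
+//     SELECT p.ts, e.value
+//     FROM probe p ASOF JOIN events e
+//       ON p.key = e.key AND p.ts >= e.begin
+//
+// is planned roughly as a window function plus a range join, where the LEAD default
+// is a +infinity value appropriate for the ordering column's type:
+//
+//     SELECT p.ts, e.value
+//     FROM probe p
+//     JOIN (SELECT key, value, begin,
+//                  LEAD(begin, 1, <+infinity>) OVER (PARTITION BY key ORDER BY begin ASC) AS ed
+//           FROM events) e
+//       ON p.key = e.key AND p.ts >= e.begin AND p.ts < e.ed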
+ +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberScalar; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::RemoteExpr; +use databend_common_expression::Scalar; + +use crate::binder::bind_window_function_info; +use crate::binder::ColumnBindingBuilder; +use crate::binder::WindowFunctionInfo; +use crate::binder::WindowOrderByInfo; +use crate::executor::explain::PlanStatsInfo; +use crate::executor::PhysicalPlan; +use crate::executor::PhysicalPlanBuilder; +use crate::optimizer::ColumnSet; +use crate::optimizer::RelExpr; +use crate::optimizer::SExpr; +use crate::plans::BoundColumnRef; +use crate::plans::ComparisonOp; +use crate::plans::ConstantExpr; +use crate::plans::FunctionCall; +use crate::plans::Join; +use crate::plans::JoinType; +use crate::plans::LagLeadFunction; +use crate::plans::ScalarExpr; +use crate::plans::ScalarItem; +use crate::plans::WindowFunc; +use crate::plans::WindowFuncFrame; +use crate::plans::WindowFuncFrameBound; +use crate::plans::WindowFuncFrameUnits; +use crate::plans::WindowFuncType; +use crate::plans::WindowOrderBy; +use crate::ColumnEntry; +use crate::DerivedColumn; +use crate::Visibility; + +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct AsofJoin { + // A unique id of operator in a `PhysicalPlan` tree, only used for display. + pub plan_id: u32, + pub left: Box, + pub right: Box, + pub non_equi_conditions: Vec, + // Now only support inner join, will support left/right join later + pub join_type: JoinType, + + pub output_schema: DataSchemaRef, + + // Only used for explain + pub stat_info: Option, +} + +impl AsofJoin { + pub fn output_schema(&self) -> Result { + Ok(self.output_schema.clone()) + } +} + +impl PhysicalPlanBuilder { + pub async fn build_asof_join( + &mut self, + join: &Join, + s_expr: &SExpr, + required: (ColumnSet, ColumnSet), + mut range_conditions: Vec, + mut other_conditions: Vec, + ) -> Result { + let mut window_index: usize = 0; + + if range_conditions.is_empty() { + return Err(ErrorCode::Internal("Missing inequality condition!")); + } + if range_conditions.len() > 1 { + return Err(ErrorCode::Internal("Multiple inequalities condition!")); + } + let (window_func, right_column, left_type) = + self.bind_window_func(join, s_expr, &range_conditions, &mut other_conditions)?; + let window_plan = self.build_window_plan(&window_func, s_expr, &mut window_index)?; + self.add_range_condition( + &window_func, + window_index, + &mut range_conditions, + right_column, + left_type, + )?; + let mut ss_expr = s_expr.clone(); + ss_expr.children[0] = Arc::new(window_plan); + ss_expr.children[1] = Arc::new(s_expr.child(0)?.clone()); + let join_type = match join.join_type { + JoinType::Asof => Ok(JoinType::Inner), + JoinType::LeftAsof => Ok(JoinType::Left), + JoinType::RightAsof => Ok(JoinType::Right), + _ => Err(ErrorCode::Internal("unsupported join type!")), + }?; + let left_prop = RelExpr::with_s_expr(ss_expr.child(0)?).derive_relational_prop()?; + let right_prop = RelExpr::with_s_expr(ss_expr.child(1)?).derive_relational_prop()?; + let left_required = required.0.union(&left_prop.used_columns).cloned().collect(); + let right_required = required + .1 + .union(&right_prop.used_columns) + .cloned() + .collect(); + self.build_range_join( + join_type, + &ss_expr, + left_required, + right_required, + range_conditions, + other_conditions, + ) + .await + } + + fn 
add_range_condition( + &mut self, + window_func: &WindowFunc, + window_index: usize, + range_conditions: &mut Vec, + right_column: ScalarExpr, + left_type: DataType, + ) -> Result { + let mut folded_args: Vec = Vec::with_capacity(2); + let mut func_name = String::from("eq"); + // Generate a ColumnBinding for each argument of aggregates + let column = ColumnBindingBuilder::new( + window_func.display_name.clone(), + window_index, + Box::new(left_type), + Visibility::Visible, + ) + .build(); + folded_args.push(right_column.clone()); + folded_args.push( + BoundColumnRef { + span: right_column.span(), + column, + } + .into(), + ); + for condition in range_conditions.iter() { + if let ScalarExpr::FunctionCall(func) = condition { + match ComparisonOp::try_from_func_name(func.func_name.as_str()).unwrap() { + ComparisonOp::GTE => { + func_name = String::from("lt"); + } + ComparisonOp::GT => { + func_name = String::from("lte"); + } + ComparisonOp::LT => { + func_name = String::from("gte"); + } + ComparisonOp::LTE => { + func_name = String::from("gt"); + } + _ => unreachable!("must be range condition!"), + } + } + } + range_conditions.push( + FunctionCall { + span: range_conditions[0].span(), + params: vec![], + arguments: folded_args, + func_name, + } + .into(), + ); + Ok(true) + } + + fn bind_window_func( + &mut self, + join: &Join, + s_expr: &SExpr, + range_conditions: &[ScalarExpr], + other_conditions: &mut Vec, + ) -> Result<(WindowFunc, ScalarExpr, DataType), ErrorCode> { + let right_prop = RelExpr::with_s_expr(s_expr.child(0)?).derive_relational_prop()?; + let left_prop = RelExpr::with_s_expr(s_expr.child(1)?).derive_relational_prop()?; + + let mut right_column = range_conditions[0].clone(); + let mut left_column = range_conditions[0].clone(); + let mut order_items: Vec = Vec::with_capacity(range_conditions.len()); + let mut constant_default = ConstantExpr { + span: right_column.span(), + value: Scalar::Null, + }; + for condition in range_conditions.iter() { + if let ScalarExpr::FunctionCall(func) = condition { + if func.arguments.len() == 2 { + for arg in func.arguments.iter() { + if let ScalarExpr::BoundColumnRef(_) = arg { + let asc = + match ComparisonOp::try_from_func_name(func.func_name.as_str()) + .unwrap() + { + ComparisonOp::GT | ComparisonOp::GTE => Ok(Some(true)), + ComparisonOp::LT | ComparisonOp::LTE => Ok(Some(false)), + _ => Err(ErrorCode::Internal("must be range condition!")), + }?; + if arg.used_columns().is_subset(&left_prop.output_columns) { + left_column = arg.clone(); + constant_default.span = left_column.span(); + constant_default.value = left_column + .data_type()? + .remove_nullable() + .infinity() + .unwrap(); + if let Some(false) = asc { + constant_default.value = left_column + .data_type()? 
+ .remove_nullable() + .ninfinity() + .unwrap(); + } + order_items.push(WindowOrderBy { + expr: arg.clone(), + asc, + nulls_first: Some(true), + }); + } + if arg.used_columns().is_subset(&right_prop.output_columns) { + right_column = arg.clone(); + } + } else { + return Err(ErrorCode::Internal( + "Cannot downcast Scalar to BoundColumnRef", + )); + } + } + } + } + } + + let mut partition_items: Vec = Vec::with_capacity(join.equi_conditions.len()); + let mut other_args: Vec = Vec::with_capacity(2); + for condition in join.equi_conditions.iter() { + if matches!(condition.right, ScalarExpr::BoundColumnRef(_)) + && matches!(condition.left, ScalarExpr::BoundColumnRef(_)) + { + partition_items.push(condition.right.clone()); + other_args.clear(); + other_args.push(condition.left.clone()); + other_args.push(condition.right.clone()); + other_conditions.push( + FunctionCall { + span: range_conditions[0].span(), + params: vec![], + arguments: other_args.clone(), + func_name: String::from("eq"), + } + .into(), + ); + } else { + return Err(ErrorCode::Internal( + "Cannot downcast Scalar to BoundColumnRef", + )); + } + } + let func_type = WindowFuncType::LagLead(LagLeadFunction { + is_lag: false, + arg: Box::new(left_column.clone()), + offset: 1, + default: Some(Box::new(constant_default.into())), + return_type: Box::new(left_column.data_type()?.clone()), + }); + let window_func = WindowFunc { + span: range_conditions[0].span(), + display_name: func_type.func_name(), + partition_by: partition_items, + func: func_type, + order_by: order_items, + frame: WindowFuncFrame { + units: WindowFuncFrameUnits::Rows, + start_bound: WindowFuncFrameBound::Following(Some(Scalar::Number( + NumberScalar::UInt64(1), + ))), + end_bound: WindowFuncFrameBound::Following(Some(Scalar::Number( + NumberScalar::UInt64(1), + ))), + }, + }; + Ok((window_func, right_column, left_column.data_type()?.clone())) + } + + fn build_window_plan( + &mut self, + window: &WindowFunc, + s_expr: &SExpr, + window_index: &mut usize, + ) -> Result { + let mut window_args = vec![]; + let window_func_name = window.func.func_name(); + let func = match &window.func { + WindowFuncType::LagLead(ll) => { + let (new_arg, new_default) = + self.replace_lag_lead_args(&mut window_args, &window_func_name, ll)?; + + WindowFuncType::LagLead(LagLeadFunction { + is_lag: ll.is_lag, + arg: Box::new(new_arg), + offset: ll.offset, + default: new_default, + return_type: ll.return_type.clone(), + }) + } + func => func.clone(), + }; + + // resolve partition by + let mut partition_by_items = vec![]; + for (i, part) in window.partition_by.iter().enumerate() { + let part = part.clone(); + let name = format!("{window_func_name}_part_{i}"); + let replaced_part = self.replace_expr(&name, &part)?; + partition_by_items.push(ScalarItem { + index: replaced_part.column.index, + scalar: part, + }); + } + + // resolve order by + let mut order_by_items = vec![]; + for (i, order) in window.order_by.iter().enumerate() { + let order_expr = order.expr.clone(); + let name = format!("{window_func_name}_order_{i}"); + let replaced_order = self.replace_expr(&name, &order_expr)?; + order_by_items.push(WindowOrderByInfo { + order_by_item: ScalarItem { + index: replaced_order.column.index, + scalar: order_expr, + }, + asc: order.asc, + nulls_first: order.nulls_first, + }); + } + + let index = self.metadata.write().add_derived_column( + window.display_name.clone(), + window.func.return_type(), + None, + ); + + *window_index = index; + + let window_info = WindowFunctionInfo { + span: window.span, + 
index, + partition_by_items, + func, + arguments: window_args, + order_by_items, + frame: window.frame.clone(), + }; + bind_window_function_info(&self.ctx, &window_info, s_expr.child(1)?.clone()) + } + + fn replace_lag_lead_args( + &mut self, + window_args: &mut Vec, + window_func_name: &String, + f: &LagLeadFunction, + ) -> Result<(ScalarExpr, Option>)> { + let arg = (*f.arg).clone(); + let name = format!("{window_func_name}_arg"); + let replaced_arg = self.replace_expr(&name, &arg)?; + window_args.push(ScalarItem { + scalar: arg, + index: replaced_arg.column.index, + }); + let new_default = match &f.default { + None => None, + Some(d) => { + let d = (**d).clone(); + let name = format!("{window_func_name}_default_value"); + let replaced_default = self.replace_expr(&name, &d)?; + window_args.push(ScalarItem { + scalar: d, + index: replaced_default.column.index, + }); + Some(Box::new(replaced_default.into())) + } + }; + + Ok((replaced_arg.into(), new_default)) + } + + fn replace_expr(&self, name: &str, arg: &ScalarExpr) -> Result { + if let ScalarExpr::BoundColumnRef(col) = &arg { + Ok(col.clone()) + } else { + for entry in self.metadata.read().columns() { + if let ColumnEntry::DerivedColumn(DerivedColumn { + scalar_expr, + alias, + column_index, + data_type, + .. + }) = entry + { + if scalar_expr.as_ref() == Some(arg) { + // Generate a ColumnBinding for each argument of aggregates + let column = ColumnBindingBuilder::new( + alias.to_string(), + *column_index, + Box::new(data_type.clone()), + Visibility::Visible, + ) + .build(); + + return Ok(BoundColumnRef { + span: arg.span(), + column, + }); + } + } + } + let ty = arg.data_type()?; + + let index = self.metadata.write().add_derived_column( + name.to_string(), + ty.clone(), + Some(arg.clone()), + ); + + // Generate a ColumnBinding for each argument of aggregates + let column = ColumnBindingBuilder::new( + name.to_string(), + index, + Box::new(ty), + Visibility::Visible, + ) + .build(); + Ok(BoundColumnRef { + span: arg.span(), + column, + }) + } + } +} diff --git a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs index 0d947551bd4b..2cf7a8146e02 100644 --- a/src/query/sql/src/executor/physical_plans/physical_hash_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_hash_join.rs @@ -487,6 +487,10 @@ impl PhysicalPlanBuilder { )); probe_fields } + JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof => unreachable!( + "Invalid join type {} during building physical hash join.", + join.join_type + ), }; let mut projections = ColumnSet::new(); let projected_schema = DataSchemaRefExt::create(merged_fields.clone()); diff --git a/src/query/sql/src/executor/physical_plans/physical_join.rs b/src/query/sql/src/executor/physical_plans/physical_join.rs index 9ef68c7a5301..a049667e4d26 100644 --- a/src/query/sql/src/executor/physical_plans/physical_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_join.rs @@ -32,11 +32,17 @@ pub enum PhysicalJoinType { Hash, // The first arg is range conditions, the second arg is other conditions RangeJoin(Vec, Vec), + AsofJoin(Vec, Vec), } // Choose physical join type by join conditions pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result { - if !join.equi_conditions.is_empty() { + if !join.equi_conditions.is_empty() + && !matches!( + join.join_type, + JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof + ) + { // Contain equi condition, use hash join return Ok(PhysicalJoinType::Hash); } @@ 
-59,14 +65,21 @@ pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result { &mut other_conditions, ) } - if !range_conditions.is_empty() && matches!(join.join_type, JoinType::Inner | JoinType::Cross) { return Ok(PhysicalJoinType::RangeJoin( range_conditions, other_conditions, )); } - + if matches!( + join.join_type, + JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof + ) { + return Ok(PhysicalJoinType::AsofJoin( + range_conditions, + other_conditions, + )); + } // Leverage hash join to execute nested loop join Ok(PhysicalJoinType::Hash) } @@ -164,10 +177,21 @@ impl PhysicalPlanBuilder { ) .await } - PhysicalJoinType::RangeJoin(range, other) => { - self.build_range_join(s_expr, left_required, right_required, range, other) + PhysicalJoinType::AsofJoin(range, other) => { + self.build_asof_join(join, s_expr, (left_required, right_required), range, other) .await } + PhysicalJoinType::RangeJoin(range, other) => { + self.build_range_join( + join.join_type.clone(), + s_expr, + left_required, + right_required, + range, + other, + ) + .await + } } } } diff --git a/src/query/sql/src/executor/physical_plans/physical_range_join.rs b/src/query/sql/src/executor/physical_plans/physical_range_join.rs index bcae6af95aaa..4985f75dee90 100644 --- a/src/query/sql/src/executor/physical_plans/physical_range_join.rs +++ b/src/query/sql/src/executor/physical_plans/physical_range_join.rs @@ -15,6 +15,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::type_check::common_super_type; +use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; use databend_common_expression::RemoteExpr; @@ -47,6 +48,7 @@ pub struct RangeJoin { // Now only support inner join, will support left/right join later pub join_type: JoinType, pub range_join_type: RangeJoinType, + pub output_schema: DataSchemaRef, // Only used for explain pub stat_info: Option, @@ -54,9 +56,7 @@ pub struct RangeJoin { impl RangeJoin { pub fn output_schema(&self) -> Result { - let mut fields = self.left.output_schema()?.fields().clone(); - fields.extend(self.right.output_schema()?.fields().clone()); - Ok(DataSchemaRefExt::create(fields)) + Ok(self.output_schema.clone()) } } @@ -77,6 +77,7 @@ pub struct RangeJoinCondition { impl PhysicalPlanBuilder { pub async fn build_range_join( &mut self, + join_type: JoinType, s_expr: &SExpr, left_required: ColumnSet, right_required: ColumnSet, @@ -102,15 +103,46 @@ impl PhysicalPlanBuilder { let left_side = self.build(s_expr.child(1)?, left_required).await?; let right_side = self.build(s_expr.child(0)?, right_required).await?; - let left_schema = left_side.output_schema()?; - let right_schema = right_side.output_schema()?; + let left_schema = match join_type { + JoinType::Right | JoinType::RightSingle | JoinType::Full => { + let left_schema = left_side.output_schema()?; + // Wrap nullable type for columns in build side. + let left_schema = DataSchemaRefExt::create( + left_schema + .fields() + .iter() + .map(|field| { + DataField::new(field.name(), field.data_type().wrap_nullable()) + }) + .collect::>(), + ); + left_schema + } + _ => left_side.output_schema()?, + }; + let right_schema = match join_type { + JoinType::Left | JoinType::LeftSingle | JoinType::Full => { + let right_schema = right_side.output_schema()?; + // Wrap nullable type for columns in build side. 
+ let right_schema = DataSchemaRefExt::create( + right_schema + .fields() + .iter() + .map(|field| { + DataField::new(field.name(), field.data_type().wrap_nullable()) + }) + .collect::>(), + ); + right_schema + } + _ => right_side.output_schema()?, + }; let merged_schema = DataSchemaRefExt::create( - left_side - .output_schema()? + left_schema .fields() .iter() - .chain(right_side.output_schema()?.fields()) + .chain(right_schema.fields()) .cloned() .collect::>(), ); @@ -135,8 +167,9 @@ impl PhysicalPlanBuilder { .iter() .map(|scalar| resolve_scalar(scalar, &merged_schema)) .collect::>()?, - join_type: JoinType::Inner, + join_type, range_join_type, + output_schema: merged_schema, stat_info: Some(self.build_plan_stat_info(s_expr)?), })) } diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs index 6a16eb1594f5..a55af2aff144 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_join.rs @@ -230,6 +230,13 @@ impl Binder { "Join conditions should be empty in cross join", )); } + if matches!( + join_type, + JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof + ) && non_equi_conditions.is_empty() + { + return Err(ErrorCode::SemanticError("Missing inequality condition!")); + } self.push_down_other_conditions( &join_type, &mut left_child, @@ -338,6 +345,9 @@ impl Binder { JoinPredicate::ALL(_) => match join_type { JoinType::Cross | JoinType::Inner + | JoinType::Asof + | JoinType::LeftAsof + | JoinType::RightAsof | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::RightSemi @@ -411,7 +421,11 @@ impl Binder { check_duplicate_join_tables(left_column_bindings, right_column_bindings)?; match join_op { - JoinOperator::LeftOuter | JoinOperator::RightOuter | JoinOperator::FullOuter + JoinOperator::LeftOuter + | JoinOperator::RightOuter + | JoinOperator::FullOuter + | JoinOperator::LeftAsof + | JoinOperator::RightAsof if join_condition == &JoinCondition::None => { return Err(ErrorCode::SemanticError( @@ -423,6 +437,11 @@ impl Binder { "cross join should not contain join conditions".to_string(), )); } + JoinOperator::Asof if join_condition == &JoinCondition::None => { + return Err(ErrorCode::SemanticError( + "asof join should contain join conditions".to_string(), + )); + } _ => (), }; @@ -438,7 +457,7 @@ fn wrap_nullable_for_column( bind_context: &mut BindContext, ) { match join_type { - JoinOperator::LeftOuter => { + JoinOperator::LeftOuter | JoinOperator::LeftAsof => { for column in left_column_bindings { bind_context.add_column_binding(column.clone()); } @@ -448,7 +467,7 @@ fn wrap_nullable_for_column( bind_context.add_column_binding(nullable_column); } } - JoinOperator::RightOuter => { + JoinOperator::RightOuter | JoinOperator::RightAsof => { for column in left_column_bindings { let mut nullable_column = column.clone(); nullable_column.data_type = Box::new(column.data_type.wrap_nullable()); @@ -916,6 +935,9 @@ fn join_type(join_type: &JoinOperator) -> JoinType { JoinOperator::RightSemi => JoinType::RightSemi, JoinOperator::LeftAnti => JoinType::LeftAnti, JoinOperator::RightAnti => JoinType::RightAnti, + JoinOperator::Asof => JoinType::Asof, + JoinOperator::LeftAsof => JoinType::LeftAsof, + JoinOperator::RightAsof => JoinType::RightAsof, } } diff --git a/src/query/sql/src/planner/binder/mod.rs b/src/query/sql/src/planner/binder/mod.rs index d1d528de2c5c..a1de5b9db7b4 100644 --- 
a/src/query/sql/src/planner/binder/mod.rs +++ b/src/query/sql/src/planner/binder/mod.rs @@ -76,4 +76,6 @@ pub use location::parse_uri_location; pub use scalar::ScalarBinder; pub use scalar_common::*; pub use stream_column_factory::STREAM_COLUMN_FACTORY; +pub use window::bind_window_function_info; +pub use window::WindowFunctionInfo; pub use window::WindowOrderByInfo; diff --git a/src/query/sql/src/planner/binder/window.rs b/src/query/sql/src/planner/binder/window.rs index e4d1d893223b..052a1b9c50a4 100644 --- a/src/query/sql/src/planner/binder/window.rs +++ b/src/query/sql/src/planner/binder/window.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use databend_common_ast::ast::WindowDefinition; use databend_common_ast::ast::WindowSpec; use databend_common_ast::Span; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -56,82 +57,7 @@ impl Binder { window_info: &WindowFunctionInfo, child: SExpr, ) -> Result { - let window_plan = Window { - span: window_info.span, - index: window_info.index, - function: window_info.func.clone(), - arguments: window_info.arguments.clone(), - partition_by: window_info.partition_by_items.clone(), - order_by: window_info.order_by_items.clone(), - frame: window_info.frame.clone(), - limit: None, - }; - - // eval scalars before sort - // Generate a `EvalScalar` as the input of `Window`. - let mut scalar_items: Vec = Vec::new(); - for arg in &window_plan.arguments { - scalar_items.push(arg.clone()); - } - for part in &window_plan.partition_by { - scalar_items.push(part.clone()); - } - for order in &window_plan.order_by { - scalar_items.push(order.order_by_item.clone()) - } - - let child = if !scalar_items.is_empty() { - let eval_scalar_plan = EvalScalar { - items: scalar_items, - }; - SExpr::create_unary(Arc::new(eval_scalar_plan.into()), Arc::new(child)) - } else { - child - }; - - let default_nulls_first = !self - .ctx - .get_settings() - .get_sql_dialect() - .unwrap() - .is_null_biggest(); - - let mut sort_items: Vec = vec![]; - if !window_plan.partition_by.is_empty() { - for part in window_plan.partition_by.iter() { - sort_items.push(SortItem { - index: part.index, - asc: true, - nulls_first: default_nulls_first, - }); - } - } - - for order in window_plan.order_by.iter() { - sort_items.push(SortItem { - index: order.order_by_item.index, - asc: order.asc.unwrap_or(true), - nulls_first: order.nulls_first.unwrap_or(default_nulls_first), - }); - } - - let child = if !sort_items.is_empty() { - let sort_plan = Sort { - items: sort_items, - limit: window_plan.limit, - after_exchange: None, - pre_projection: None, - window_partition: window_plan.partition_by.clone(), - }; - SExpr::create_unary(Arc::new(sort_plan.into()), Arc::new(child)) - } else { - child - }; - - Ok(SExpr::create_unary( - Arc::new(window_plan.into()), - Arc::new(child), - )) + bind_window_function_info(&self.ctx, window_info, child) } pub(super) fn analyze_window_definition( @@ -619,6 +545,88 @@ pub fn find_replaced_window_function( }) } +pub fn bind_window_function_info( + ctx: &Arc, + window_info: &WindowFunctionInfo, + child: SExpr, +) -> Result { + let window_plan = Window { + span: window_info.span, + index: window_info.index, + function: window_info.func.clone(), + arguments: window_info.arguments.clone(), + partition_by: window_info.partition_by_items.clone(), + order_by: window_info.order_by_items.clone(), + frame: window_info.frame.clone(), + limit: None, + }; + + // eval scalars before sort + // Generate a 
`EvalScalar` as the input of `Window`. + let mut scalar_items: Vec = Vec::new(); + for arg in &window_plan.arguments { + scalar_items.push(arg.clone()); + } + for part in &window_plan.partition_by { + scalar_items.push(part.clone()); + } + for order in &window_plan.order_by { + scalar_items.push(order.order_by_item.clone()) + } + + let child = if !scalar_items.is_empty() { + let eval_scalar_plan = EvalScalar { + items: scalar_items, + }; + SExpr::create_unary(Arc::new(eval_scalar_plan.into()), Arc::new(child)) + } else { + child + }; + + let default_nulls_first = !ctx + .get_settings() + .get_sql_dialect() + .unwrap() + .is_null_biggest(); + + let mut sort_items: Vec = vec![]; + if !window_plan.partition_by.is_empty() { + for part in window_plan.partition_by.iter() { + sort_items.push(SortItem { + index: part.index, + asc: true, + nulls_first: default_nulls_first, + }); + } + } + + for order in window_plan.order_by.iter() { + sort_items.push(SortItem { + index: order.order_by_item.index, + asc: order.asc.unwrap_or(true), + nulls_first: order.nulls_first.unwrap_or(default_nulls_first), + }); + } + + let child = if !sort_items.is_empty() { + let sort_plan = Sort { + items: sort_items, + limit: window_plan.limit, + after_exchange: None, + pre_projection: None, + window_partition: window_plan.partition_by.clone(), + }; + SExpr::create_unary(Arc::new(sort_plan.into()), Arc::new(child)) + } else { + child + }; + + Ok(SExpr::create_unary( + Arc::new(window_plan.into()), + Arc::new(child), + )) +} + impl Binder { /// Analyze windows in select clause, this will rewrite window functions. /// See [`WindowRewriter`] for more details. diff --git a/src/query/sql/src/planner/format/display_rel_operator.rs b/src/query/sql/src/planner/format/display_rel_operator.rs index 8abb5809546e..c39bb40f7d37 100644 --- a/src/query/sql/src/planner/format/display_rel_operator.rs +++ b/src/query/sql/src/planner/format/display_rel_operator.rs @@ -209,6 +209,9 @@ fn format_join(op: &Join) -> String { JoinType::RightMark => "RightMark".to_string(), JoinType::LeftSingle => "LeftSingle".to_string(), JoinType::RightSingle => "RightSingle".to_string(), + JoinType::Asof => "Asof".to_string(), + JoinType::LeftAsof => "LeftAsof".to_string(), + JoinType::RightAsof => "RightAsof".to_string(), }; format!("Join({})", join_type) diff --git a/src/query/sql/src/planner/plans/join.rs b/src/query/sql/src/planner/plans/join.rs index cfc93c846323..2a70d95e50ab 100644 --- a/src/query/sql/src/planner/plans/join.rs +++ b/src/query/sql/src/planner/plans/join.rs @@ -63,6 +63,10 @@ pub enum JoinType { /// Single Join is a special kind of join that is used to process correlated scalar subquery. 
     LeftSingle,
     RightSingle,
+    /// As-of join, used to speed up timestamp joins
+    Asof,
+    LeftAsof,
+    RightAsof,
 }

 impl JoinType {
@@ -78,6 +82,8 @@
             JoinType::RightAnti => JoinType::LeftAnti,
             JoinType::LeftMark => JoinType::RightMark,
             JoinType::RightMark => JoinType::LeftMark,
+            JoinType::RightAsof => JoinType::LeftAsof,
+            JoinType::LeftAsof => JoinType::RightAsof,
             _ => self.clone(),
         }
     }
@@ -90,6 +96,8 @@
                 | JoinType::Full
                 | JoinType::LeftSingle
                 | JoinType::RightSingle
+                | JoinType::LeftAsof
+                | JoinType::RightAsof
         )
     }
@@ -140,6 +148,15 @@
             JoinType::RightSingle => {
                 write!(f, "RIGHT SINGLE")
             }
+            JoinType::Asof => {
+                write!(f, "ASOF")
+            }
+            JoinType::LeftAsof => {
+                write!(f, "LEFT ASOF")
+            }
+            JoinType::RightAsof => {
+                write!(f, "RIGHT ASOF")
+            }
         }
     }
 }
@@ -536,8 +553,12 @@
         )?;
         let cardinality = match self.join_type {
             JoinType::Inner | JoinType::Cross => inner_join_cardinality,
-            JoinType::Left => f64::max(left_cardinality, inner_join_cardinality),
-            JoinType::Right => f64::max(right_cardinality, inner_join_cardinality),
+            JoinType::Left | JoinType::Asof | JoinType::LeftAsof => {
+                f64::max(left_cardinality, inner_join_cardinality)
+            }
+            JoinType::Right | JoinType::RightAsof => {
+                f64::max(right_cardinality, inner_join_cardinality)
+            }
             JoinType::Full => {
                 f64::max(left_cardinality, inner_join_cardinality)
                     + f64::max(right_cardinality, inner_join_cardinality)
diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join.test
new file mode 100644
index 000000000000..585a931b9772
--- /dev/null
+++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join.test
@@ -0,0 +1,59 @@
+# name: test/sql/join/asof/test_asof_join.test
+# description: Test As-Of join usage
+# group: [asof]
+
+# Use doubles for readable infinities
+statement ok
+CREATE TABLE events0 (begin DOUBLE, value INTEGER);
+
+statement ok
+INSERT INTO events0 VALUES
+    (1, 0),
+    (3, 1),
+    (6, 2),
+    (8, 3)
+;
+
+# Prevent optimiser from removing true inequalities
+statement ok
+create table prices(wh timestamp, symbol int, price int);
+
+statement ok
+insert into prices values ('2020-01-01 00:00:00', 1, 42);
+
+statement ok
+create table trades(wh timestamp, symbol int);
+
+statement ok
+insert into trades values ('2020-01-01 00:00:03', 1);
+
+query III
+SELECT t.*, p.price
+FROM trades t ASOF JOIN prices p
+  ON t.symbol = p.symbol AND t.wh >= p.wh;
+----
+2020-01-01 00:00:03.000000 1 42
+
+#
+# Errors
+#
+
+# Invalid ASOF JOIN comparison
+statement error (?s).*Missing inequality condition!
+SELECT p.ts, e.value
+FROM range(0,10) p(ts) ASOF JOIN events0 e
+ON p.ts <> e.begin
+ORDER BY p.ts ASC
+
+# Missing ASOF JOIN inequality
+statement error (?s).*Missing inequality condition!
+SELECT p.ts, e.value
+FROM range(0,10) p(ts) ASOF JOIN events0 e
+ON p.ts = e.begin
+ORDER BY p.ts ASC
+
+# Multiple ASOF JOIN inequalities
+statement error (?s).*Multiple inequalities condition!
+SELECT p.ts, e.value +FROM range(0,10) p(ts) ASOF JOIN events0 e +ON p.ts >= e.begin AND p.ts >= e.value +ORDER BY p.ts ASC diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_double.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_double.test new file mode 100644 index 000000000000..a3183986344c --- /dev/null +++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_double.test @@ -0,0 +1,58 @@ +# name: test/sql/join/asof/test_asof_join_doubles.test +# description: Test As-Of joins for floating point +# group: [asof] + +# +# Inequality only +# +# +# With equality +# + +statement ok +CREATE TABLE events (key INTEGER DEFAULT '0', begin DOUBLE NOT NULL DEFAULT '0', value INTEGER DEFAULT '0'); + +statement ok +INSERT INTO events VALUES + (1, 1, 0), + (1, 3, 1), + (1, 6, 2), + (1, 8, 3), + (2, 0, 10), + (2, 7, 20), + (2, 11, 30), + (3, 11, 30) +; + +statement ok +CREATE TABLE probes (key INTEGER DEFAULT '0', ts DOUBLE NOT NULL DEFAULT '0'); + +statement ok +INSERT INTO probes VALUES + (1, 1), + (1, 2), + (1, 3), + (2, 1), + (2, 2), + (2, 3) +; + +# INNER Window version +query III nosort inner_equality +SELECT p.key, p.ts, e.value +FROM + probes p +JOIN ( + SELECT key, value, begin, + LEAD(begin, 1, '999'::DOUBLE) OVER (PARTITION BY key ORDER BY begin ASC) AS ed + FROM events +) e +ON p.key = e.key AND p.ts >= e.begin AND p.ts < e.ed +ORDER BY 1, 2 ASC +---- +1 1.0 0 +1 2.0 0 +1 3.0 1 +2 1.0 10 +2 2.0 10 +2 3.0 10 diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_inequal.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_inequal.test new file mode 100644 index 000000000000..ffcad2af964e --- /dev/null +++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_inequal.test @@ -0,0 +1,69 @@ +# name: test/sql/join/asof/test_asof_join_inequalities.test +# description: Test As-Of joins for greater than and less thans +# group: [asof] + +# Join on a timestamp range + +statement ok +CREATE TABLE events0 AS + (SELECT add_hours('2023-03-21 13:00:00'::TIMESTAMP , v) AS begin, v AS value + FROM range(0, 4) vals(v) ); + +statement ok +CREATE TABLE probe0 AS + ( SELECT add_hours('2023-03-21 12:00:00'::TIMESTAMP, v) AS begin + FROM range(0,10) vals(v)); + +# Check results against IEJoin +# +# Strictly Greater Than +# + +# INNER +query III +SELECT p.begin, e.begin, e.value +FROM probe0 p ASOF JOIN events0 e +ON p.begin > e.begin +ORDER BY 1,2 ASC +---- +2023-03-21 14:00:00.000000 2023-03-21 13:00:00.000000 0 +2023-03-21 15:00:00.000000 2023-03-21 14:00:00.000000 1 +2023-03-21 16:00:00.000000 2023-03-21 15:00:00.000000 2 +2023-03-21 17:00:00.000000 2023-03-21 16:00:00.000000 3 +2023-03-21 18:00:00.000000 2023-03-21 16:00:00.000000 3 +2023-03-21 19:00:00.000000 2023-03-21 16:00:00.000000 3 +2023-03-21 20:00:00.000000 2023-03-21 16:00:00.000000 3 +2023-03-21 21:00:00.000000 2023-03-21 16:00:00.000000 3 + +# +# Less Than or Equal +# + +# INNER +query III +SELECT p.begin, e.begin, e.value +FROM probe0 p ASOF JOIN events0 e +ON p.begin <= e.begin +ORDER BY 1,2 ASC +---- +2023-03-21 12:00:00.000000 2023-03-21 13:00:00.000000 0 +2023-03-21 13:00:00.000000 2023-03-21 13:00:00.000000 0 +2023-03-21 14:00:00.000000 2023-03-21 14:00:00.000000 1 +2023-03-21 15:00:00.000000 2023-03-21 15:00:00.000000 2 +2023-03-21 16:00:00.000000 2023-03-21 16:00:00.000000 3 + +# +# Strictly Less Than +# + +# INNER +query III +SELECT p.begin, e.begin, e.value +FROM probe0 p ASOF JOIN events0 e +ON p.begin < e.begin +ORDER BY 1,2 ASC +---- +2023-03-21 
12:00:00.000000 2023-03-21 13:00:00.000000 0 +2023-03-21 13:00:00.000000 2023-03-21 14:00:00.000000 1 +2023-03-21 14:00:00.000000 2023-03-21 15:00:00.000000 2 +2023-03-21 15:00:00.000000 2023-03-21 16:00:00.000000 3 diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test new file mode 100644 index 000000000000..a364053f6ddd --- /dev/null +++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test @@ -0,0 +1,81 @@ +# name: test/sql/join/asof/test_asof_join_integers.test +# description: Test As-Of joins for integers +# group: [asof] + +# Join on a string range + +statement ok +CREATE TABLE events0 (begin INTEGER, value INTEGER); + +statement ok +INSERT INTO events0 VALUES + (NULL, -1), + (1, 0), + (3, 1), + (6, 2), + (8, 3), + (999999, 9) +; + +statement ok +CREATE TABLE probe0 AS + (SELECT v::INTEGER AS begin + FROM range(0,10) vals(v)) +; + +# This is not implemented yet because it requires a dedicated operator +# instead of LEAD(...infinity::INTEGER) + +# INNER ON inequality only +query II +SELECT p.begin, e.value +FROM probe0 p ASOF JOIN events0 e +ON p.begin >= e.begin +ORDER BY p.begin ASC +---- +1 0 +2 0 +3 1 +4 1 +5 1 +6 2 +7 2 +8 3 +9 3 + +# LEFT ON inequality only +query II +SELECT p.begin, e.value +FROM probe0 p ASOF LEFT JOIN events0 e +ON p.begin >= e.begin +ORDER BY p.begin ASC +---- +0 NULL +1 0 +2 0 +3 1 +4 1 +5 1 +6 2 +7 2 +8 3 +9 3 + +# RIGHT ON inequality only +query II +SELECT p.begin, e.value +FROM probe0 p ASOF RIGHT JOIN events0 e +ON p.begin >= e.begin +ORDER BY p.begin ASC, e.value ASC +---- +1 0 +2 0 +3 1 +4 1 +5 1 +6 2 +7 2 +8 3 +9 3 +NULL -1 +NULL 9 diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_miss.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_miss.test new file mode 100644 index 000000000000..4e20015dc833 --- /dev/null +++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_miss.test @@ -0,0 +1,126 @@ +# name: test/sql/join/asof/test_asof_join_missing.test_slow +# description: Test As-Of join with missing matches +# group: [asof] + + +# These test stress several aspects of the matching: +# * Probe inequality less than the minimum (no match) +# * Probe equality missing (no match) +# * More than 64 valid probe entries (mask => SV construction) +# * First radix bin empty. 
+# * First payload bin empty +# * Multiple scanned payload blocks + +# Check results against IEJoin + +# 10 dates, 5 keys +query I +WITH build AS ( + SELECT k, add_minutes('2001-01-01 00:00:00'::TIMESTAMP , v) AS t, v + FROM range(0,10) vals(v), range(0,5) keys(k) +), probe AS ( + SELECT k * 2 AS k, add_seconds(t, -30) AS t + FROM build +) +SELECT SUM(v) +FROM probe p ASOF JOIN build b on p.k=b.k and p.t>=b.t; +---- +108 + +# Coverage: Missing right side bin +query II +WITH build AS ( + SELECT k * 2 as k, add_minutes('2001-01-01 00:00:00'::TIMESTAMP , v) AS t, v + FROM range(0,10) vals(v), range(0,5) keys(k) +), probe AS ( + SELECT k / 2 AS k, add_seconds(t ,- 30) AS t + FROM build +) +SELECT SUM(v), COUNT(*) +FROM probe p ASOF JOIN build b on p.k=b.k and p.t>=b.t; +---- +108 27 + +# 20 dates, 5 keys +query I +WITH build AS ( + SELECT k, add_minutes('2001-01-01 00:00:00'::TIMESTAMP , v) AS t, v + FROM range(0,20) vals(v), range(0,5) keys(k) +), probe AS ( + SELECT k * 2 AS k, add_seconds(t ,- 30) AS t + FROM build +) +SELECT SUM(v) +FROM probe p ASOF JOIN build b on p.k=b.k and p.t>=b.t; +---- +513 + +# 30 dates, 5 keys +query I +WITH build AS ( + SELECT k, add_minutes('2001-01-01 00:00:00'::TIMESTAMP , v) AS t, v + FROM range(0,30) vals(v), range(0,5) keys(k) +), probe AS ( + SELECT k * 2 AS k, add_seconds(t ,- 30) AS t + FROM build +) +SELECT SUM(v) +FROM probe p ASOF JOIN build b on p.k=b.k and p.t>=b.t; +---- +1218 + +# 50 dates, 5 keys +query I +WITH build AS ( + SELECT k, add_minutes('2001-01-01 00:00:00'::TIMESTAMP , v) AS t, v + FROM range(0,50) vals(v), range(0,5) keys(k) +), probe AS ( + SELECT k * 2 AS k, add_seconds(t ,- 30) AS t + FROM build +) +SELECT SUM(v) +FROM probe p ASOF JOIN build b on p.k=b.k and p.t>=b.t; +---- +3528 + +# 100 dates, 5 keys +query I +WITH build AS ( + SELECT k, add_minutes('2001-01-01 00:00:00'::TIMESTAMP , v) AS t, v + FROM range(0,100) vals(v), range(0,5) keys(k) +), probe AS ( + SELECT k * 2 AS k, add_seconds(t ,- 30) AS t + FROM build +) +SELECT SUM(v) +FROM probe p ASOF JOIN build b on p.k=b.k and p.t>=b.t; +---- +14553 + +# 100 dates, 50 keys +query I +WITH build AS ( + SELECT k, add_minutes('2001-01-01 00:00:00'::TIMESTAMP , v) AS t, v + FROM range(0,100) vals(v), range(0,50) keys(k) +), probe AS ( + SELECT k * 2 AS k, add_seconds(t ,- 30) AS t + FROM build +) +SELECT SUM(v) +FROM probe p ASOF JOIN build b on p.k=b.k and p.t>=b.t; +---- +121275 + +# 1000 dates, 5 keys +query I +WITH build AS ( + SELECT k, add_minutes('2001-01-01 00:00:00'::TIMESTAMP , v) AS t, v + FROM range(0,1000) vals(v), range(0,5) keys(k) +), probe AS ( + SELECT k * 2 AS k, add_seconds(t ,- 30) AS t + FROM build +) +SELECT SUM(v) +FROM probe p ASOF JOIN build b on p.k=b.k and p.t>=b.t; +---- +1495503 diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_time.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_time.test new file mode 100644 index 000000000000..6d54c9fbcdfd --- /dev/null +++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_time.test @@ -0,0 +1,36 @@ +# name: test/sql/join/asof/test_asof_join_timestamps.test +# description: Test As-Of joins for timestamps +# group: [asof] + +# Join on a timestamp range + +statement ok +CREATE TABLE events0 AS ( + SELECT + add_hours('2023-03-21 13:00:00' :: TIMESTAMP, v) AS BEGIN, + v AS value + FROM + RANGE(0, 4) vals(v) +); + +statement ok +CREATE TABLE probe0 AS + ( SELECT add_hours('2023-03-21 12:00:00'::TIMESTAMP,v) AS BEGIN + FROM range(0,10) vals(v)); + +# INNER ON inequality 
only +query II nosort +SELECT p.begin, e.value +FROM probe0 p ASOF JOIN events0 e +ON p.begin >= e.begin +ORDER BY p.begin ASC +---- +2023-03-21 13:00:00.000000 0 +2023-03-21 14:00:00.000000 1 +2023-03-21 15:00:00.000000 2 +2023-03-21 16:00:00.000000 3 +2023-03-21 17:00:00.000000 3 +2023-03-21 18:00:00.000000 3 +2023-03-21 19:00:00.000000 3 +2023-03-21 20:00:00.000000 3 +2023-03-21 21:00:00.000000 3