diff --git a/Cargo.lock b/Cargo.lock index bcf694bfcbfc0..a60dd02877c28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4021,6 +4021,7 @@ dependencies = [ "databend-common-storages-view", "databend-common-users", "databend-enterprise-data-mask-feature", + "databend-enterprise-hilbert-clustering", "databend-storages-common-cache", "databend-storages-common-io", "databend-storages-common-session", @@ -4673,6 +4674,18 @@ dependencies = [ "databend-common-meta-app", ] +[[package]] +name = "databend-enterprise-hilbert-clustering" +version = "0.1.0" +dependencies = [ + "async-backtrace", + "async-trait", + "databend-common-base", + "databend-common-catalog", + "databend-common-exception", + "databend-storages-common-table-meta", +] + [[package]] name = "databend-enterprise-inverted-index" version = "0.1.0" @@ -4734,6 +4747,7 @@ dependencies = [ "databend-enterprise-background-service", "databend-enterprise-data-mask-feature", "databend-enterprise-fail-safe", + "databend-enterprise-hilbert-clustering", "databend-enterprise-inverted-index", "databend-enterprise-storage-encryption", "databend-enterprise-storage-quota", diff --git a/Cargo.toml b/Cargo.toml index 972b749ad0c0f..cf0d7e746f9c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -176,6 +176,7 @@ databend-enterprise-attach-table = { path = "src/query/ee_features/attach_table" databend-enterprise-background-service = { path = "src/query/ee_features/background_service" } databend-enterprise-data-mask-feature = { path = "src/query/ee_features/data_mask" } databend-enterprise-fail-safe = { path = "src/query/ee_features/fail_safe" } +databend-enterprise-hilbert-clustering = { path = "src/query/ee_features/hilbert_clustering" } databend-enterprise-inverted-index = { path = "src/query/ee_features/inverted_index" } databend-enterprise-meta = { path = "src/meta/ee" } databend-enterprise-query = { path = "src/query/ee" } diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index c5923dfd08eba..9f0bfaff4b13d 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -419,8 +419,9 @@ build_exceptions! { // recluster error codes NoNeedToRecluster(4011), NoNeedToCompact(4012), + UnsupportedClusterType(4013), - RefreshTableInfoFailure(4012), + RefreshTableInfoFailure(4021), } // Service errors [5001,6000]. 
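Side note on the error-code hunk above: `RefreshTableInfoFailure` previously shared code 4012 with `NoNeedToCompact`, so it moves to 4021, and 4013 is taken by the new `UnsupportedClusterType`. A minimal sketch of raising the new code, assuming the usual `ErrorCode::UnsupportedClusterType(msg)` constructor that `build_exceptions!` generates (the guard function itself is hypothetical, not part of this patch):

```rust
use databend_common_exception::{ErrorCode, Result};
use databend_storages_common_table_meta::table::ClusterType;

// Hypothetical guard: reject statements that require Hilbert clustering
// when the table is clustered some other way (or not clustered at all).
fn ensure_hilbert(cluster_type: Option<ClusterType>) -> Result<()> {
    match cluster_type {
        Some(ClusterType::Hilbert) => Ok(()),
        _ => Err(ErrorCode::UnsupportedClusterType(
            "this statement requires a Hilbert-clustered table",
        )),
    }
}
```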
diff --git a/src/common/license/src/license.rs b/src/common/license/src/license.rs index b2d9769e2ffca..5ef0828d71399 100644 --- a/src/common/license/src/license.rs +++ b/src/common/license/src/license.rs @@ -73,6 +73,8 @@ pub enum Feature { StorageQuota(StorageQuota), #[serde(alias = "amend_table", alias = "AMEND_TABLE")] AmendTable, + #[serde(alias = "hilbert_clustering", alias = "HILBERT_CLUSTERING")] + HilbertClustering, #[serde(other)] Unknown, } @@ -119,6 +121,7 @@ impl fmt::Display for Feature { write!(f, ")") } Feature::AmendTable => write!(f, "amend_table"), + Feature::HilbertClustering => write!(f, "hilbert_clustering"), Feature::Unknown => write!(f, "unknown"), } } @@ -166,7 +169,8 @@ impl Feature { | (Feature::InvertedIndex, Feature::InvertedIndex) | (Feature::VirtualColumn, Feature::VirtualColumn) | (Feature::AttacheTable, Feature::AttacheTable) - | (Feature::StorageEncryption, Feature::StorageEncryption) => Ok(true), + | (Feature::StorageEncryption, Feature::StorageEncryption) + | (Feature::HilbertClustering, Feature::HilbertClustering) => Ok(true), (_, _) => Ok(false), } } @@ -334,6 +338,11 @@ mod tests { serde_json::from_str::<Feature>("\"amend_table\"").unwrap() ); + assert_eq!( + Feature::HilbertClustering, + serde_json::from_str::<Feature>("\"hilbert_clustering\"").unwrap() + ); + assert_eq!( Feature::Unknown, serde_json::from_str::<Feature>("\"ssss\"").unwrap() @@ -367,11 +376,12 @@ mod tests { storage_usage: Some(1), }), Feature::AmendTable, + Feature::HilbertClustering, ]), }; assert_eq!( - "LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,background_service,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,inverted_index,license_info,storage_encryption,storage_quota(storage_usage: 1),stream,vacuum,virtual_column] }", + "LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,background_service,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,storage_encryption,storage_quota(storage_usage: 1),stream,vacuum,virtual_column] }", license_info.to_string() ); }
diff --git a/src/query/catalog/src/plan/partition.rs b/src/query/catalog/src/plan/partition.rs index 3278c22d3123b..d0d1f8a4d0202 100644 --- a/src/query/catalog/src/plan/partition.rs +++ b/src/query/catalog/src/plan/partition.rs @@ -391,3 +391,11 @@ impl ReclusterParts { } } } + +// TODO refine this +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] +pub struct ReclusterInfoSideCar { + pub merged_blocks: Vec<Arc<BlockMeta>>, + pub removed_segment_indexes: Vec<usize>, + pub removed_statistics: Statistics, +}
diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index d3e04f0686089..c58ecedb3f036 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -19,6 +19,9 @@ use std::sync::Arc; use chrono::DateTime; use chrono::Utc; +use databend_common_ast::ast::Expr; +use databend_common_ast::parser::parse_comma_separated_exprs; +use databend_common_ast::parser::tokenize_sql; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; @@ -136,6 +139,28 @@ pub trait Table: Sync + Send { Some(cluster_type) } + fn resolve_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Option<Vec<Expr>> { + let Some((_, cluster_key_str)) = &self.cluster_key_meta() else { + return None; + }; + let tokens =
tokenize_sql(cluster_key_str).unwrap(); + let sql_dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default(); + let mut ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect).unwrap(); + // unwrap tuple. + if ast_exprs.len() == 1 { + if let Expr::Tuple { exprs, .. } = &ast_exprs[0] { + ast_exprs = exprs.clone(); + } + } else { + // Defensive check: + // `ast_exprs` should always contain one element which can be one of the following: + // 1. A tuple of composite cluster keys + // 2. A single cluster key + unreachable!("invalid cluster key ast expression, {:?}", ast_exprs); + } + Some(ast_exprs) + } + fn change_tracking_enabled(&self) -> bool { false }
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index c7f45b5701847..9f9c0a832f54b 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -279,11 +279,23 @@ pub trait TableContext: Send + Sync { max_files: Option<usize>, ) -> Result; - fn add_segment_location(&self, segment_loc: Location) -> Result<()>; + fn add_inserted_segment_location(&self, segment_loc: Location) -> Result<()>; - fn clear_segment_locations(&self) -> Result<()>; + fn clear_inserted_segment_locations(&self) -> Result<()>; - fn get_segment_locations(&self) -> Result<Vec<Location>>; + fn get_inserted_segment_locations(&self) -> Result<Vec<Location>>; + + fn add_selected_segment_location(&self, _segment_loc: Location) { + unimplemented!() + } + + fn get_selected_segment_locations(&self) -> Vec<Location> { + unimplemented!() + } + + fn clear_selected_segment_locations(&self) { + unimplemented!() + } fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()>;
diff --git a/src/query/ee/Cargo.toml b/src/query/ee/Cargo.toml index def5aeb0bc62f..772711bfc6be5 100644 --- a/src/query/ee/Cargo.toml +++ b/src/query/ee/Cargo.toml @@ -47,6 +47,7 @@ databend-enterprise-attach-table = { workspace = true } databend-enterprise-background-service = { workspace = true } databend-enterprise-data-mask-feature = { workspace = true } databend-enterprise-fail-safe = { workspace = true } +databend-enterprise-hilbert-clustering = { workspace = true } databend-enterprise-inverted-index = { workspace = true } databend-enterprise-storage-encryption = { workspace = true } databend-enterprise-storage-quota = { workspace = true }
diff --git a/src/query/ee/src/enterprise_services.rs b/src/query/ee/src/enterprise_services.rs index 4d43c636a9b11..81fb0ed2cc30e 100644 --- a/src/query/ee/src/enterprise_services.rs +++ b/src/query/ee/src/enterprise_services.rs @@ -21,6 +21,7 @@ use crate::attach_table::RealAttachTableHandler; use crate::background_service::RealBackgroundService; use crate::data_mask::RealDatamaskHandler; use crate::fail_safe::RealFailSafeHandler; +use crate::hilbert_clustering::RealHilbertClusteringHandler; use crate::inverted_index::RealInvertedIndexHandler; use crate::license::license_mgr::RealLicenseManager; use crate::storage_encryption::RealStorageEncryptionHandler; @@ -45,6 +46,7 @@ impl EnterpriseServices { RealInvertedIndexHandler::init()?; RealStorageQuotaHandler::init(&cfg)?; RealFailSafeHandler::init()?; + RealHilbertClusteringHandler::init()?; Ok(()) } }
diff --git a/src/query/ee/src/hilbert_clustering/handler.rs b/src/query/ee/src/hilbert_clustering/handler.rs new file mode 100644 index 0000000000000..1d40e1fd8228c --- /dev/null +++ b/src/query/ee/src/hilbert_clustering/handler.rs @@ -0,0 +1,178 @@ +// Copyright 2023 Databend Cloud +// +// Licensed under the Elastic License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.elastic.co/licensing/elastic-license +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_base::base::GlobalInstance; +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::plan::ReclusterInfoSideCar; +use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::BlockThresholds; +use databend_common_storages_fuse::pruning::create_segment_location_vector; +use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut; +use databend_common_storages_fuse::FuseTable; +use databend_common_storages_fuse::SegmentLocation; +use databend_common_storages_fuse::DEFAULT_BLOCK_PER_SEGMENT; +use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; +use databend_enterprise_hilbert_clustering::HilbertClusteringHandler; +use databend_enterprise_hilbert_clustering::HilbertClusteringHandlerWrapper; +use databend_storages_common_table_meta::meta::CompactSegmentInfo; +use databend_storages_common_table_meta::meta::Statistics; +use databend_storages_common_table_meta::meta::TableSnapshot; + +pub struct RealHilbertClusteringHandler {} + +#[async_trait::async_trait] +impl HilbertClusteringHandler for RealHilbertClusteringHandler { + #[async_backtrace::framed] + async fn do_hilbert_clustering( + &self, + table: Arc, + ctx: Arc, + push_downs: Option, + ) -> Result)>> { + let Some((cluster_key_id, _)) = table.cluster_key_meta() else { + return Ok(None); + }; + + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let Some(snapshot) = fuse_table.read_table_snapshot().await? else { + // no snapshot, no recluster. + return Ok(None); + }; + + let block_per_seg = + fuse_table.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT); + let block_thresholds = fuse_table.get_block_thresholds(); + let thresholds = BlockThresholds { + max_rows_per_block: block_per_seg * block_thresholds.max_rows_per_block, + min_rows_per_block: block_per_seg * block_thresholds.min_rows_per_block, + max_bytes_per_block: block_per_seg * block_thresholds.max_bytes_per_block, + }; + let segment_locations = snapshot.segments.clone(); + let segment_locations = create_segment_location_vector(segment_locations, None); + + let max_threads = ctx.get_settings().get_max_threads()? as usize; + let chunk_size = max_threads * 4; + let mut target_segments = vec![]; + let mut total_rows = 0; + let mut total_size = 0; + let mut stable = false; + 'FOR: for chunk in segment_locations.chunks(chunk_size) { + // read segments. 
+ let compact_segments = FuseTable::segment_pruning( + &ctx, + fuse_table.schema_with_stream(), + fuse_table.get_operator(), + &push_downs, + fuse_table.get_storage_format(), + chunk.to_vec(), + ) + .await?; + + if compact_segments.is_empty() { + continue; + } + + for (location, segment) in compact_segments.into_iter() { + total_rows += segment.summary.row_count as usize; + total_size += segment.summary.uncompressed_byte_size as usize; + if !thresholds.check_large_enough(total_rows, total_size) { + // totals < N + target_segments.push((location, segment.clone())); + continue; + } + + if thresholds.check_for_compact(total_rows, total_size) { + // N <= totals < 2N + target_segments.push((location, segment.clone())); + } else { + // totals >= 2N + let new_target_segments = vec![(location, segment)]; + if Self::check_for_recluster(&new_target_segments, cluster_key_id, stable) { + target_segments = new_target_segments; + stable = true; + break 'FOR; + } + }; + + if Self::check_for_recluster(&target_segments, cluster_key_id, stable) { + stable = true; + break 'FOR; + } + target_segments.clear(); + total_rows = 0; + total_size = 0; + } + } + + if !stable && !Self::check_for_recluster(&target_segments, cluster_key_id, stable) { + return Ok(None); + } + + let rows_per_block = block_thresholds.calc_rows_per_block(total_size, total_rows) as u64; + let block_size = ctx.get_settings().get_max_block_size()?; + ctx.get_settings() + .set_max_block_size(rows_per_block.min(block_size))?; + + let mut removed_statistics = Statistics::default(); + let mut removed_segment_indexes = Vec::with_capacity(target_segments.len()); + for (segment_loc, segment) in target_segments { + ctx.add_selected_segment_location(segment_loc.location); + removed_segment_indexes.push(segment_loc.segment_idx); + merge_statistics_mut( + &mut removed_statistics, + &segment.summary, + Some(cluster_key_id), + ); + } + + let recluster_info = ReclusterInfoSideCar { + merged_blocks: vec![], + removed_segment_indexes, + removed_statistics, + }; + Ok(Some((recluster_info, snapshot))) + } +} + +impl RealHilbertClusteringHandler { + pub fn init() -> Result<()> { + let handler = RealHilbertClusteringHandler {}; + let wrapper = HilbertClusteringHandlerWrapper::new(Box::new(handler)); + GlobalInstance::set(Arc::new(wrapper)); + Ok(()) + } + + fn check_for_recluster( + segments: &[(SegmentLocation, Arc)], + default_cluster_id: u32, + stable: bool, + ) -> bool { + match segments.len() { + 0 => false, + 1 => segments[0] + .1 + .summary + .cluster_stats + .as_ref() + .map_or(true, |stats| { + stats.cluster_key_id != default_cluster_id || (stats.level != -1 && stable) + }), + _ => true, + } + } +} diff --git a/src/query/ee/src/hilbert_clustering/mod.rs b/src/query/ee/src/hilbert_clustering/mod.rs new file mode 100644 index 0000000000000..fa357368f0f4d --- /dev/null +++ b/src/query/ee/src/hilbert_clustering/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Databend Cloud +// +// Licensed under the Elastic License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.elastic.co/licensing/elastic-license +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod handler; + +pub use handler::RealHilbertClusteringHandler; diff --git a/src/query/ee/src/lib.rs b/src/query/ee/src/lib.rs index 7bf07a60abe3c..4a45297dd40c8 100644 --- a/src/query/ee/src/lib.rs +++ b/src/query/ee/src/lib.rs @@ -18,6 +18,7 @@ pub mod background_service; pub mod data_mask; pub mod enterprise_services; pub mod fail_safe; +pub mod hilbert_clustering; pub mod inverted_index; pub mod license; pub mod storage_encryption; diff --git a/src/query/ee_features/hilbert_clustering/Cargo.toml b/src/query/ee_features/hilbert_clustering/Cargo.toml new file mode 100644 index 0000000000000..0f856e1c45143 --- /dev/null +++ b/src/query/ee_features/hilbert_clustering/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "databend-enterprise-hilbert-clustering" +version = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +edition = { workspace = true } + +[lib] +doctest = false +test = true + +[dependencies] +async-backtrace = { workspace = true } +async-trait = { workspace = true } +databend-common-base = { workspace = true } +databend-common-catalog = { workspace = true } +databend-common-exception = { workspace = true } +databend-storages-common-table-meta = { workspace = true } + +[build-dependencies] + +[lints] +workspace = true diff --git a/src/query/ee_features/hilbert_clustering/src/handler.rs b/src/query/ee_features/hilbert_clustering/src/handler.rs new file mode 100644 index 0000000000000..a0104079d7026 --- /dev/null +++ b/src/query/ee_features/hilbert_clustering/src/handler.rs @@ -0,0 +1,59 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use databend_common_base::base::GlobalInstance; +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::plan::ReclusterInfoSideCar; +use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_storages_common_table_meta::meta::TableSnapshot; + +#[async_trait::async_trait] +pub trait HilbertClusteringHandler: Sync + Send { + async fn do_hilbert_clustering( + &self, + table: Arc<dyn Table>, + ctx: Arc<dyn TableContext>, + push_downs: Option<PushDownInfo>, + ) -> Result<Option<(ReclusterInfoSideCar, Arc<TableSnapshot>)>>; +} + +pub struct HilbertClusteringHandlerWrapper { + handler: Box<dyn HilbertClusteringHandler>, +} + +impl HilbertClusteringHandlerWrapper { + pub fn new(handler: Box<dyn HilbertClusteringHandler>) -> Self { + Self { handler } + } + + #[async_backtrace::framed] + pub async fn do_hilbert_clustering( + &self, + table: Arc<dyn Table>, + ctx: Arc<dyn TableContext>, + push_downs: Option<PushDownInfo>, + ) -> Result<Option<(ReclusterInfoSideCar, Arc<TableSnapshot>)>> { + self.handler + .do_hilbert_clustering(table, ctx, push_downs) + .await + } +} + +pub fn get_hilbert_clustering_handler() -> Arc<HilbertClusteringHandlerWrapper> { + GlobalInstance::get() +}
diff --git a/src/query/ee_features/hilbert_clustering/src/lib.rs b/src/query/ee_features/hilbert_clustering/src/lib.rs new file mode 100644 index 0000000000000..ff2070f19983b --- /dev/null +++ b/src/query/ee_features/hilbert_clustering/src/lib.rs @@ -0,0 +1,18 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +#[allow(unused)] +mod handler; + +pub use handler::*;
diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs index 91bc7ebff2992..02a6c1fcb579d 100644 --- a/src/query/expression/src/kernels/sort.rs +++ b/src/query/expression/src/kernels/sort.rs @@ -16,6 +16,7 @@ use databend_common_exception::Result; use crate::types::DataType; use crate::visitor::ValueVisitor; +use crate::Column; use crate::ColumnBuilder; use crate::DataBlock; use crate::Scalar; @@ -126,8 +127,11 @@ pub fn compare_scalars(rows: Vec<Vec<Scalar>>, data_types: &[DataType]) -> Resul .into_iter() .map(|builder| builder.build()) .collect::<Vec<_>>(); + compare_columns(order_columns, length) +} - let descriptions = order_columns +pub fn compare_columns(columns: Vec<Column>, length: usize) -> Result<Vec<u32>> { + let descriptions = columns .iter() .enumerate() .map(|(idx, _)| SortColumnDescription { @@ -139,7 +143,7 @@ pub fn compare_scalars(rows: Vec<Vec<Scalar>>, data_types: &[DataType]) -> Resul let mut sort_compare = SortCompare::new(descriptions, length, LimitType::None); - for array in order_columns { + for array in columns { sort_compare.visit_value(Value::Column(array))?; sort_compare.increment_column_index(); }
diff --git a/src/query/functions/src/aggregates/aggregate_range_bound.rs b/src/query/functions/src/aggregates/aggregate_range_bound.rs new file mode 100644 index 0000000000000..d2835570f6b9b --- /dev/null +++ b/src/query/functions/src/aggregates/aggregate_range_bound.rs @@ -0,0 +1,359 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +use std::any::Any; +use std::sync::Arc; + +use borsh::BorshDeserialize; +use borsh::BorshSerialize; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::compare_columns; +use databend_common_expression::types::array::ArrayColumnBuilder; +use databend_common_expression::types::Bitmap; +use databend_common_expression::types::*; +use databend_common_expression::with_number_mapped_type; +use databend_common_expression::AggregateFunctionRef; +use databend_common_expression::Scalar; +use ethnum::i256; +use rand::prelude::SliceRandom; +use rand::rngs::SmallRng; +use rand::thread_rng; +use rand::Rng; +use rand::SeedableRng; + +use super::assert_unary_arguments; +use super::AggregateUnaryFunction; +use super::FunctionData; +use super::UnaryState; +use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription; +use crate::with_simple_no_number_mapped_type; + +struct RangeBoundData { + partitions: usize, + sample_size: usize, +} + +impl FunctionData for RangeBoundData { + fn as_any(&self) -> &dyn Any { + self + } +} + +#[derive(BorshSerialize, BorshDeserialize)] +pub struct RangeBoundState +where + T: ValueType, + T::Scalar: BorshSerialize + BorshDeserialize, +{ + values: Vec<(u64, Vec)>, + total_rows: usize, + total_samples: usize, +} + +impl Default for RangeBoundState +where + T: ValueType, + T::Scalar: BorshSerialize + BorshDeserialize, +{ + fn default() -> Self { + RangeBoundState:: { + values: vec![], + total_rows: 0, + total_samples: 0, + } + } +} + +impl UnaryState> for RangeBoundState +where + T: ArgType + Sync + Send, + T::Scalar: Ord + Sync + Send + BorshSerialize + BorshDeserialize, +{ + fn add( + &mut self, + other: T::ScalarRef<'_>, + function_data: Option<&dyn FunctionData>, + ) -> Result<()> { + let range_bound_data = unsafe { + function_data + .unwrap() + .as_any() + .downcast_ref_unchecked::() + }; + + let total_sample_size = std::cmp::min( + range_bound_data.sample_size * range_bound_data.partitions, + 10_000, + ); + + if self.values.is_empty() { + self.values.push((0, vec![])); + } + let (total_rows, samples) = &mut self.values[0]; + *total_rows += 1; + self.total_rows += 1; + if samples.len() < total_sample_size { + self.total_samples += 1; + samples.push(T::to_owned_scalar(other)); + } else { + let mut rng = thread_rng(); + let replacement_index = rng.gen_range(0..*total_rows) as usize; + if replacement_index < total_sample_size { + self.total_samples += 1; + samples[replacement_index] = T::to_owned_scalar(other); + } + } + Ok(()) + } + + fn add_batch( + &mut self, + other: T::Column, + validity: Option<&Bitmap>, + function_data: Option<&dyn FunctionData>, + ) -> Result<()> { + let column_len = T::column_len(&other); + let unset_bits = validity.map_or(0, |v| v.null_count()); + if unset_bits == column_len { + return Ok(()); + } + + let valid_size = column_len - unset_bits; + let range_bound_data = unsafe { + function_data + .unwrap() + .as_any() + .downcast_ref_unchecked::() + }; + let sample_size = std::cmp::max(valid_size / 100, range_bound_data.sample_size); + + let mut indices = validity.map_or_else( + || (0..column_len).collect::>(), + |v| { + v.iter() + .enumerate() + .filter_map(|(i, v)| v.then_some(i)) + .collect() + }, + ); + + let sampled_indices = if valid_size > sample_size { + let mut rng = SmallRng::from_entropy(); + indices.shuffle(&mut rng); + &indices[..sample_size] + } else { + &indices + }; + + let sample_values = sampled_indices + .iter() + .map(|i| T::to_owned_scalar(unsafe { 
T::index_column_unchecked(&other, *i) })) + .collect::>(); + + self.total_rows += valid_size; + self.total_samples += sample_values.len(); + self.values.push((valid_size as u64, sample_values)); + Ok(()) + } + + fn merge(&mut self, rhs: &Self) -> Result<()> { + self.values.extend_from_slice(&rhs.values); + self.total_rows += rhs.total_rows; + self.total_samples += rhs.total_samples; + Ok(()) + } + + fn merge_result( + &mut self, + builder: &mut ArrayColumnBuilder, + function_data: Option<&dyn FunctionData>, + ) -> Result<()> { + let range_bound_data = unsafe { + function_data + .unwrap() + .as_any() + .downcast_ref_unchecked::() + }; + let step = self.total_rows as f64 / range_bound_data.partitions as f64; + + let values = std::mem::take(&mut self.values); + let mut data = Vec::with_capacity(self.total_samples); + let mut weights = Vec::with_capacity(self.total_samples); + for (num, values) in values.into_iter() { + let weight = num as f64 / values.len() as f64; + values.into_iter().for_each(|v| { + data.push(v); + weights.push(weight); + }); + } + let col = T::upcast_column(T::column_from_vec(data.clone(), &[])); + let indices = compare_columns(vec![col], self.total_samples)?; + + let mut cum_weight = 0.0; + let mut target = step; + let mut bounds = Vec::with_capacity(range_bound_data.partitions - 1); + let mut previous_bound = None; + + let mut i = 0; + let mut j = 0; + while i < self.total_samples && j < range_bound_data.partitions - 1 { + let idx = indices[i] as usize; + let weight = weights[idx]; + cum_weight += weight; + if cum_weight >= target { + let data = &data[idx]; + if previous_bound.as_ref().map_or(true, |prev| data > prev) { + bounds.push(data.clone()); + target += step; + j += 1; + previous_bound = Some(data.clone()); + } + } + i += 1; + } + + let col = T::column_from_vec(bounds, &[]); + builder.push(col); + Ok(()) + } +} + +pub fn try_create_aggregate_range_bound_function( + display_name: &str, + params: Vec, + arguments: Vec, +) -> Result { + assert_unary_arguments(display_name, arguments.len())?; + let data_type = arguments[0].clone().remove_nullable(); + let function_data = get_partitions(¶ms, display_name)?; + let return_type = DataType::Array(Box::new(data_type.clone())); + + with_simple_no_number_mapped_type!(|T| match data_type { + DataType::T => { + let func = AggregateUnaryFunction::, T, ArrayType>::try_create( + display_name, + return_type, + params, + arguments[0].clone(), + ) + .with_function_data(Box::new(function_data)) + .with_need_drop(true); + Ok(Arc::new(func)) + } + DataType::Number(num_type) => { + with_number_mapped_type!(|NUM| match num_type { + NumberDataType::NUM => { + let func = AggregateUnaryFunction::< + RangeBoundState>, + NumberType, + ArrayType>, + >::try_create( + display_name, return_type, params, arguments[0].clone() + ) + .with_function_data(Box::new(function_data)) + .with_need_drop(true); + Ok(Arc::new(func)) + } + }) + } + DataType::Decimal(DecimalDataType::Decimal128(_)) => { + let func = AggregateUnaryFunction::< + RangeBoundState>, + DecimalType, + ArrayType>, + >::try_create( + display_name, return_type, params, arguments[0].clone() + ) + .with_function_data(Box::new(function_data)) + .with_need_drop(true); + Ok(Arc::new(func)) + } + DataType::Decimal(DecimalDataType::Decimal256(_)) => { + let func = AggregateUnaryFunction::< + RangeBoundState>, + DecimalType, + ArrayType>, + >::try_create( + display_name, return_type, params, arguments[0].clone() + ) + .with_function_data(Box::new(function_data)) + .with_need_drop(true); + 
Ok(Arc::new(func)) + } + DataType::Binary => { + let func = AggregateUnaryFunction::< + RangeBoundState, + BinaryType, + ArrayType, + >::try_create( + display_name, return_type, params, arguments[0].clone() + ) + .with_function_data(Box::new(function_data)) + .with_need_drop(true); + Ok(Arc::new(func)) + } + _ => Err(ErrorCode::BadDataValueType(format!( + "{} does not support type '{:?}'", + display_name, data_type + ))), + }) +} +pub fn aggregate_range_bound_function_desc() -> AggregateFunctionDescription { + AggregateFunctionDescription::creator(Box::new( + crate::aggregates::try_create_aggregate_range_bound_function, + )) +} + +fn get_partitions(params: &[Scalar], display_name: &str) -> Result { + match params.len() { + 0 => Ok(RangeBoundData { + partitions: 1024, + sample_size: 100, + }), + 1 => { + let partitions = get_positive_integer(¶ms[0], display_name)?; + Ok(RangeBoundData { + partitions, + sample_size: 100, + }) + } + 2 => { + let partitions = get_positive_integer(¶ms[0], display_name)?; + let sample_size = get_positive_integer(¶ms[1], display_name)?; + Ok(RangeBoundData { + partitions, + sample_size, + }) + } + _ => Err(ErrorCode::BadArguments(format!( + "The number of arguments in aggregate function {} must be [0, 1, 2]", + display_name, + ))), + } +} + +fn get_positive_integer(val: &Scalar, display_name: &str) -> Result { + if let Scalar::Number(number) = val { + if let Some(number) = number.integer_to_i128() { + if number > 0 { + return Ok(number as usize); + } + } + } + Err(ErrorCode::BadDataValueType(format!( + "The argument of aggregate function {} must be positive int", + display_name + ))) +} diff --git a/src/query/functions/src/aggregates/aggregator.rs b/src/query/functions/src/aggregates/aggregator.rs index 4c9c88a6e9b40..a8141c310940d 100644 --- a/src/query/functions/src/aggregates/aggregator.rs +++ b/src/query/functions/src/aggregates/aggregator.rs @@ -52,6 +52,7 @@ use crate::aggregates::aggregate_quantile_cont_function_desc; use crate::aggregates::aggregate_quantile_disc_function_desc; use crate::aggregates::aggregate_quantile_tdigest_function_desc; use crate::aggregates::aggregate_quantile_tdigest_weighted_function_desc; +use crate::aggregates::aggregate_range_bound_function_desc; use crate::aggregates::aggregate_retention_function_desc; use crate::aggregates::aggregate_skewness_function_desc; use crate::aggregates::aggregate_st_collect_function_desc; @@ -119,6 +120,8 @@ impl Aggregators { factory.register("skewness", aggregate_skewness_function_desc()); factory.register("string_agg", aggregate_string_agg_function_desc()); + factory.register("range_bound", aggregate_range_bound_function_desc()); + factory.register( "bitmap_and_count", aggregate_bitmap_and_count_function_desc(), diff --git a/src/query/functions/src/aggregates/mod.rs b/src/query/functions/src/aggregates/mod.rs index a24d642ff76f2..8f21e14d41074 100644 --- a/src/query/functions/src/aggregates/mod.rs +++ b/src/query/functions/src/aggregates/mod.rs @@ -40,6 +40,7 @@ mod aggregate_quantile_cont; mod aggregate_quantile_disc; mod aggregate_quantile_tdigest; mod aggregate_quantile_tdigest_weighted; +mod aggregate_range_bound; mod aggregate_retention; mod aggregate_scalar_state; mod aggregate_skewness; @@ -74,6 +75,7 @@ pub use aggregate_quantile_cont::*; pub use aggregate_quantile_disc::*; pub use aggregate_quantile_tdigest::*; pub use aggregate_quantile_tdigest_weighted::*; +pub use aggregate_range_bound::*; pub use aggregate_retention::*; pub use aggregate_skewness::*; pub use aggregate_st_collect::*; 
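To make the `range_bound` weighting above concrete: each per-block batch contributes `rows / samples` weight per sampled value, and `merge_result` walks the samples in sorted order, emitting a bound every time the cumulative weight crosses `total_rows / partitions`, so the resulting bounds split the table into roughly equal-row ranges. A self-contained sketch of that selection step over plain integers (simplified illustration only, not the registered aggregate itself):

```rust
/// Simplified bound selection: `samples` holds (value, weight) pairs, where a
/// block sampled by `k` of its `n` rows contributes weight n/k per value.
/// Returns at most `partitions - 1` strictly increasing bounds.
fn pick_bounds(mut samples: Vec<(i64, f64)>, total_rows: f64, partitions: usize) -> Vec<i64> {
    samples.sort_by_key(|(v, _)| *v);
    let step = total_rows / partitions as f64;
    let mut target = step;
    let mut cum = 0.0;
    let mut bounds = Vec::new();
    let mut prev: Option<i64> = None;
    for (value, weight) in samples {
        cum += weight;
        if cum >= target && bounds.len() + 1 < partitions && prev.map_or(true, |p| value > p) {
            bounds.push(value);
            prev = Some(value);
            target += step;
        }
    }
    bounds
}

fn main() {
    // Two "blocks": 100 rows represented by 2 samples, 300 rows by 3 samples.
    let samples = vec![(10, 50.0), (40, 50.0), (5, 100.0), (20, 100.0), (60, 100.0)];
    // Split ~400 rows into 4 ranges => up to 3 bounds, e.g. [5, 20, 40].
    println!("{:?}", pick_bounds(samples, 400.0, 4));
}
```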
diff --git a/src/query/functions/src/scalars/hilbert.rs b/src/query/functions/src/scalars/hilbert.rs index 67f426e6ffb19..b8a6e9a78f551 100644 --- a/src/query/functions/src/scalars/hilbert.rs +++ b/src/query/functions/src/scalars/hilbert.rs @@ -15,6 +15,8 @@ use databend_common_expression::hilbert_index; use databend_common_expression::types::ArrayType; use databend_common_expression::types::BinaryType; +use databend_common_expression::types::GenericType; +use databend_common_expression::types::NullableType; use databend_common_expression::types::NumberDataType; use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; @@ -56,12 +58,19 @@ pub fn register(registry: &mut FunctionRegistry) { }) } - registry.register_passthrough_nullable_2_arg::<ArrayType<BinaryType>, NumberType<u64>, BinaryType, _, _>( + registry.register_combine_nullable_2_arg::<ArrayType<NullableType<BinaryType>>, NumberType<u64>, BinaryType, _, _>( "hilbert_index", |_, _, _| FunctionDomain::Full, - vectorize_with_builder_2_arg::<ArrayType<BinaryType>, NumberType<u64>, BinaryType>( + vectorize_with_builder_2_arg::<ArrayType<NullableType<BinaryType>>, NumberType<u64>, NullableType<BinaryType>>( |val, len, builder, ctx| { - let points = val.iter().collect::<Vec<_>>(); + let mut points = Vec::with_capacity(val.len()); + for a in val.iter() { + if a.is_none() { + builder.push_null(); + return; + } + points.push(a.unwrap()); + } let dimension = points.len(); if std::intrinsics::unlikely(len > 64) { @@ -70,11 +79,28 @@ pub fn register(registry: &mut FunctionRegistry) { ctx.set_error(builder.len(), "Dimension must between 2 and 5"); } else { let slice = hilbert_index(&points, len as usize); - builder.put_slice(&slice); + builder.push(&slice); } - - builder.commit_row(); }, ), ); + + registry.register_passthrough_nullable_2_arg::<GenericType<0>, ArrayType<GenericType<0>>, NumberType<u64>, _, _>( + "range_partition_id", + |_, _, _| FunctionDomain::Full, + vectorize_with_builder_2_arg::<GenericType<0>, ArrayType<GenericType<0>>, NumberType<u64>>(|val, arr, builder, _| { + let mut low = 0; + let mut high = arr.len(); + while low < high { + let mid = (high + low) / 2; + let bound = unsafe {arr.index_unchecked(mid)}; + if val > bound { + low = mid + 1; + } else { + high = mid; + } + } + builder.push(low as u64); + }), + ); }
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs index 13a8786660c25..85215b0b58b6f 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs @@ -27,7 +27,7 @@ pub fn build_compact_block_no_split_pipeline( thresholds: BlockThresholds, max_threads: usize, ) -> Result<()> { - // has been resize 1.
+ pipeline.try_resize(1)?; pipeline.add_accumulating_transformer(|| BlockCompactNoSplitBuilder::new(thresholds)); pipeline.try_resize(max_threads)?; pipeline.add_block_meta_transformer(TransformCompactBlock::default); diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 61e1a7c5bcaaf..960a227c12b5a 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -27,6 +27,7 @@ use databend_common_sql::optimizer::SExpr; use databend_common_sql::plans::OptimizeCompactBlock; use databend_common_sql::plans::Recluster; use databend_common_sql::plans::RelOperator; +use databend_storages_common_table_meta::table::ClusterType; use log::info; use crate::interpreters::common::metrics_inc_compact_hook_compact_time_ms; @@ -185,7 +186,7 @@ async fn compact_table( PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; // Clears previously generated segment locations to avoid duplicate data in the refresh phase - ctx.clear_segment_locations()?; + ctx.clear_inserted_segment_locations()?; ctx.set_executor(complete_executor.get_inner())?; complete_executor.execute()?; drop(complete_executor); @@ -194,21 +195,26 @@ async fn compact_table( { // do recluster. - if table.cluster_key_meta().is_some() { - let recluster = RelOperator::Recluster(Recluster { - catalog: compact_target.catalog, - database: compact_target.database, - table: compact_target.table, - filters: None, - limit: Some(settings.get_auto_compaction_segments_limit()? as usize), - }); - let s_expr = SExpr::create_leaf(Arc::new(recluster)); - let recluster_interpreter = - ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?; - // Recluster will be done in `ReclusterTableInterpreter::execute2` directly, - // we do not need to use `PipelineCompleteExecutor` to execute it. - let build_res = recluster_interpreter.execute2().await?; - assert!(build_res.main_pipeline.is_empty()); + if let Some(cluster_type) = table.cluster_type() { + if cluster_type == ClusterType::Linear { + let recluster = RelOperator::Recluster(Recluster { + catalog: compact_target.catalog, + database: compact_target.database, + table: compact_target.table, + filters: None, + limit: Some(settings.get_auto_compaction_segments_limit()? as usize), + cluster_type: ClusterType::Linear, + metadata: Default::default(), + bind_context: Default::default(), + }); + let s_expr = SExpr::create_leaf(Arc::new(recluster)); + let recluster_interpreter = + ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?; + // Recluster will be done in `ReclusterTableInterpreter::execute2` directly, + // we do not need to use `PipelineCompleteExecutor` to execute it. 
+ let build_res = recluster_interpreter.execute2().await?; + assert!(build_res.main_pipeline.is_empty()); + } } } diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index ecdcbab1e9fbf..5860e1c7ead59 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -209,7 +209,7 @@ async fn generate_refresh_index_plan( catalog: &str, table_id: MetaId, ) -> Result> { - let segment_locs = ctx.get_segment_locations()?; + let segment_locs = ctx.get_inserted_segment_locations()?; let catalog = ctx.get_catalog(catalog).await?; let mut plans = vec![]; let indexes = catalog @@ -272,7 +272,7 @@ async fn generate_refresh_inverted_index_plan( desc: &RefreshDesc, table: Arc, ) -> Result> { - let segment_locs = ctx.get_segment_locations()?; + let segment_locs = ctx.get_inserted_segment_locations()?; let mut plans = vec![]; let table_meta = &table.get_table_info().meta; @@ -296,7 +296,7 @@ async fn generate_refresh_virtual_column_plan( ctx: Arc, desc: &RefreshDesc, ) -> Result> { - let segment_locs = ctx.get_segment_locations()?; + let segment_locs = ctx.get_inserted_segment_locations()?; let table_info = ctx .get_table(&desc.catalog, &desc.database, &desc.table) diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index ffb4329203726..8e58d8bad911d 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; @@ -23,7 +22,6 @@ use databend_common_exception::Result; use databend_common_sql::executor::PhysicalPlanBuilder; use databend_common_sql::optimizer::SExpr; use databend_common_sql::plans::Recluster; -use databend_common_sql::MetadataRef; use log::error; use log::warn; @@ -143,15 +141,18 @@ impl ReclusterTableInterpreter { let start = SystemTime::now(); let plan: Recluster = self.s_expr.plan().clone().try_into()?; - // try add lock table. + // try to add lock table. let lock_guard = self .ctx .clone() .acquire_table_lock(&plan.catalog, &plan.database, &plan.table, &self.lock_opt) .await?; - let mut builder = PhysicalPlanBuilder::new(MetadataRef::default(), self.ctx.clone(), false); - let physical_plan = match builder.build(&self.s_expr, HashSet::new()).await { + let mut builder = PhysicalPlanBuilder::new(plan.metadata.clone(), self.ctx.clone(), false); + let physical_plan = match builder + .build(&self.s_expr, plan.bind_context.column_set()) + .await + { Ok(res) => res, Err(e) => { return if e.code() == ErrorCode::NO_NEED_TO_RECLUSTER { @@ -176,7 +177,7 @@ impl ReclusterTableInterpreter { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; - self.ctx.clear_segment_locations()?; + self.ctx.clear_inserted_segment_locations()?; self.ctx.set_executor(complete_executor.get_inner())?; complete_executor.execute()?; // make sure the executor is dropped before the next loop. 
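For orientation on how the two recluster paths diverge after this change: linear-clustered tables keep flowing through the `Recluster` plan as above, while a Hilbert-clustered table is expected to go through the enterprise handler registered earlier. A rough sketch of that call, assuming the caller already holds an `Arc<dyn Table>` and a table context (license gating and the actual plan construction are omitted; the wrapper function is hypothetical):

```rust
use std::sync::Arc;

use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;

// Hypothetical helper: ask the EE handler which segments to rewrite.
async fn plan_hilbert_recluster(
    table: Arc<dyn Table>,
    ctx: Arc<dyn TableContext>,
) -> Result<()> {
    let handler = get_hilbert_clustering_handler();
    if let Some((recluster_info, snapshot)) = handler
        .do_hilbert_clustering(table, ctx.clone(), None)
        .await?
    {
        // The handler recorded the chosen segments on the context; the removed
        // segment indexes and statistics travel in the side car.
        let _selected = ctx.get_selected_segment_locations();
        let _ = (recluster_info, snapshot);
    }
    Ok(())
}
```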
diff --git a/src/query/service/src/pipelines/builders/builder_column_mutation.rs b/src/query/service/src/pipelines/builders/builder_column_mutation.rs index 225fef28c1380..1e5ea0acdead0 100644 --- a/src/query/service/src/pipelines/builders/builder_column_mutation.rs +++ b/src/query/service/src/pipelines/builders/builder_column_mutation.rs @@ -19,7 +19,6 @@ use databend_common_exception::Result; use databend_common_expression::RemoteExpr; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; -use databend_common_sql::binder::MutationType; use databend_common_sql::evaluator::BlockOperator; use databend_common_sql::evaluator::CompoundBlockOperator; use databend_common_sql::executor::physical_plans::ColumnMutation; @@ -47,45 +46,28 @@ impl PipelineBuilder { .build_table_by_table_info(&column_mutation.table_info, None)?; let table = FuseTable::try_from_table(table.as_ref())?; - if column_mutation.mutation_type == MutationType::Delete { - let cluster_stats_gen = table.get_cluster_stats_gen( - self.ctx.clone(), - 0, - table.get_block_thresholds(), - None, - )?; - self.main_pipeline.add_transform(|input, output| { - let proc = TransformSerializeBlock::try_create( - self.ctx.clone(), - input, - output, - table, - cluster_stats_gen.clone(), - MutationKind::Delete, - )?; - proc.into_processor() - })?; + let block_thresholds = table.get_block_thresholds(); + let cluster_stats_gen = if matches!(column_mutation.mutation_kind, MutationKind::Delete) { + table.get_cluster_stats_gen(self.ctx.clone(), 0, block_thresholds, None)? } else { - let block_thresholds = table.get_block_thresholds(); - let cluster_stats_gen = table.cluster_gen_for_append( + table.cluster_gen_for_append( self.ctx.clone(), &mut self.main_pipeline, block_thresholds, None, + )? + }; + self.main_pipeline.add_transform(|input, output| { + let proc = TransformSerializeBlock::try_create( + self.ctx.clone(), + input, + output, + table, + cluster_stats_gen.clone(), + column_mutation.mutation_kind, )?; - self.main_pipeline.add_transform(|input, output| { - let proc = TransformSerializeBlock::try_create( - self.ctx.clone(), - input, - output, - table, - cluster_stats_gen.clone(), - MutationKind::Update, - )?; - proc.into_processor() - })?; - } - + proc.into_processor() + })?; Ok(()) } diff --git a/src/query/service/src/pipelines/builders/builder_commit.rs b/src/query/service/src/pipelines/builders/builder_commit.rs index e4becfd76f551..53bb53140e7bd 100644 --- a/src/query/service/src/pipelines/builders/builder_commit.rs +++ b/src/query/service/src/pipelines/builders/builder_commit.rs @@ -40,7 +40,7 @@ impl PipelineBuilder { self.main_pipeline.add_async_accumulating_transformer(|| { let base_segments = if matches!( plan.mutation_kind, - MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster + MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster(_) ) { vec![] } else { diff --git a/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs b/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs new file mode 100644 index 0000000000000..b9bb69cca7f6f --- /dev/null +++ b/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs @@ -0,0 +1,57 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_pipeline_transforms::processors::build_compact_block_no_split_pipeline; +use databend_common_sql::executor::physical_plans::HilbertSerialize; +use databend_common_sql::executor::physical_plans::MutationKind; +use databend_common_storages_factory::Table; +use databend_common_storages_fuse::operations::TransformSerializeBlock; +use databend_common_storages_fuse::statistics::ClusterStatsGenerator; +use databend_common_storages_fuse::FuseTable; +use databend_common_storages_fuse::TableContext; +use databend_storages_common_table_meta::table::ClusterType; + +use crate::pipelines::PipelineBuilder; + +impl PipelineBuilder { + pub(crate) fn build_hilbert_serialize(&mut self, serialize: &HilbertSerialize) -> Result<()> { + self.build_pipeline(&serialize.input)?; + let table = self + .ctx + .build_table_by_table_info(&serialize.table_info, None)?; + let table = FuseTable::try_from_table(table.as_ref())?; + + let block_thresholds = table.get_block_thresholds(); + let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + build_compact_block_no_split_pipeline( + &mut self.main_pipeline, + block_thresholds, + max_threads, + )?; + + self.main_pipeline + .add_transform(|transform_input_port, transform_output_port| { + let proc = TransformSerializeBlock::try_create( + self.ctx.clone(), + transform_input_port, + transform_output_port, + table, + ClusterStatsGenerator::default(), + MutationKind::Recluster(ClusterType::Hilbert), + )?; + proc.into_processor() + }) + } +} diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index dfc414490f9bb..82c5938c6f621 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -34,12 +34,33 @@ use databend_common_storages_factory::Table; use databend_common_storages_fuse::operations::TransformSerializeBlock; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::TableContext; +use databend_storages_common_table_meta::table::ClusterType; use crate::pipelines::builders::SortPipelineBuilder; use crate::pipelines::processors::TransformAddStreamColumns; use crate::pipelines::PipelineBuilder; impl PipelineBuilder { + /// The flow of Pipeline is as follows: + // ┌──────────┐ ┌───────────────┐ ┌─────────┐ + // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┐ + // └──────────┘ └───────────────┘ └─────────┘ │ + // ┌──────────┐ ┌───────────────┐ ┌─────────┐ │ ┌──────────────┐ ┌─────────┐ + // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┤────►│MultiSortMerge├────►│Resize(N)├───┐ + // └──────────┘ └───────────────┘ └─────────┘ │ └──────────────┘ └─────────┘ │ + // ┌──────────┐ ┌───────────────┐ ┌─────────┐ │ │ + // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┘ │ + // └──────────┘ └───────────────┘ └─────────┘ │ + // ┌──────────────────────────────────────────────────────────────────────────────────────────────┘ + // │ ┌──────────────┐ + // │ ┌───►│SerializeBlock├───┐ + 
// │ │ └──────────────┘ │ + // │ │ ┌──────────────┐ │ ┌─────────┐ ┌────────────────┐ ┌─────────────┐ ┌──────────┐ + // └───►│───►│SerializeBlock├───┤───►│Resize(1)├───►│SerializeSegment├────►│ReclusterAggr├────►│CommitSink│ + // │ └──────────────┘ │ └─────────┘ └────────────────┘ └─────────────┘ └──────────┘ + // │ ┌──────────────┐ │ + // └───►│SerializeBlock├───┘ + // └──────────────┘ pub(crate) fn build_recluster(&mut self, recluster: &Recluster) -> Result<()> { match recluster.tasks.len() { 0 => self.main_pipeline.add_source(EmptySource::create, 1), @@ -157,7 +178,7 @@ impl PipelineBuilder { transform_output_port, table, cluster_stats_gen.clone(), - MutationKind::Recluster, + MutationKind::Recluster(ClusterType::Linear), )?; proc.into_processor() }) diff --git a/src/query/service/src/pipelines/builders/builder_replace_into.rs b/src/query/service/src/pipelines/builders/builder_replace_into.rs index ad3fd8854904c..e8e1f08d3228e 100644 --- a/src/query/service/src/pipelines/builders/builder_replace_into.rs +++ b/src/query/service/src/pipelines/builders/builder_replace_into.rs @@ -359,7 +359,7 @@ impl PipelineBuilder { } else { None }; - let cluster_keys = table.cluster_keys(self.ctx.clone()); + let cluster_keys = table.linear_cluster_keys(self.ctx.clone()); if *need_insert { let replace_into_processor = ReplaceIntoProcessor::create( self.ctx.clone(), diff --git a/src/query/service/src/pipelines/builders/mod.rs b/src/query/service/src/pipelines/builders/mod.rs index a35c735954af7..707ed7c7e4b58 100644 --- a/src/query/service/src/pipelines/builders/mod.rs +++ b/src/query/service/src/pipelines/builders/mod.rs @@ -25,6 +25,7 @@ mod builder_distributed_insert_select; mod builder_exchange; mod builder_fill_missing_columns; mod builder_filter; +mod builder_hilbert_serialize; mod builder_insert_multi_table; mod builder_join; mod builder_limit; diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 8a87ce1e7eb7c..6508dcca83b6c 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -227,6 +227,7 @@ impl PipelineBuilder { // Recluster. 
PhysicalPlan::Recluster(recluster) => self.build_recluster(recluster), + PhysicalPlan::HilbertSerialize(serialize) => self.build_hilbert_serialize(serialize), PhysicalPlan::Duplicate(duplicate) => self.build_duplicate(duplicate), PhysicalPlan::Shuffle(shuffle) => self.build_shuffle(shuffle), diff --git a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs index 4fd297786eae6..b8285d3061c24 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_recursive_cte_source.rs @@ -339,6 +339,7 @@ async fn create_memory_table_for_cte_scan( | PhysicalPlan::CompactSource(_) | PhysicalPlan::CommitSink(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::HilbertSerialize(_) | PhysicalPlan::Duplicate(_) | PhysicalPlan::ChunkFilter(_) | PhysicalPlan::ChunkEvalScalar(_) diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index f08482c90175e..d6ebd30d4fe41 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -152,6 +152,7 @@ pub struct QueryContext { fragment_id: Arc, // Used by synchronized generate aggregating indexes when new data written. inserted_segment_locs: Arc>>, + selected_segment_locs: Arc>>, // Temp table for materialized CTE, first string is the database_name, second string is the table_name // All temp tables' catalog is `CATALOG_DEFAULT`, so we don't need to store it. m_cte_temp_table: Arc>>, @@ -178,9 +179,10 @@ impl QueryContext { shared, query_settings, fragment_id: Arc::new(AtomicUsize::new(0)), - inserted_segment_locs: Arc::new(RwLock::new(HashSet::new())), - block_threshold: Arc::new(RwLock::new(BlockThresholds::default())), + inserted_segment_locs: Default::default(), + block_threshold: Default::default(), m_cte_temp_table: Default::default(), + selected_segment_locs: Default::default(), }) } @@ -1204,19 +1206,19 @@ impl TableContext for QueryContext { }) } - fn add_segment_location(&self, segment_loc: Location) -> Result<()> { + fn add_inserted_segment_location(&self, segment_loc: Location) -> Result<()> { let mut segment_locations = self.inserted_segment_locs.write(); segment_locations.insert(segment_loc); Ok(()) } - fn clear_segment_locations(&self) -> Result<()> { + fn clear_inserted_segment_locations(&self) -> Result<()> { let mut segment_locations = self.inserted_segment_locs.write(); segment_locations.clear(); Ok(()) } - fn get_segment_locations(&self) -> Result> { + fn get_inserted_segment_locations(&self) -> Result> { Ok(self .inserted_segment_locs .read() @@ -1225,6 +1227,20 @@ impl TableContext for QueryContext { .collect::>()) } + fn add_selected_segment_location(&self, segment_loc: Location) { + let mut segment_locations = self.selected_segment_locs.write(); + segment_locations.insert(segment_loc); + } + + fn get_selected_segment_locations(&self) -> Vec { + self.selected_segment_locs.read().iter().cloned().collect() + } + + fn clear_selected_segment_locations(&self) { + let mut segment_locations = self.selected_segment_locs.write(); + segment_locations.clear(); + } + fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()> { if matches!(self.get_query_kind(), QueryKind::CopyIntoTable) { self.shared.copy_status.add_chunk(file_path, file_status); diff --git 
a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index e8679f31baf22..1532ed129dfc3 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -842,15 +842,15 @@ impl TableContext for CtxDelegation { todo!() } - fn add_segment_location(&self, _segment_loc: Location) -> Result<()> { + fn add_inserted_segment_location(&self, _segment_loc: Location) -> Result<()> { todo!() } - fn clear_segment_locations(&self) -> Result<()> { + fn clear_inserted_segment_locations(&self) -> Result<()> { todo!() } - fn get_segment_locations(&self) -> Result> { + fn get_inserted_segment_locations(&self) -> Result> { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index a88975171c57e..94c359fdc501f 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -730,15 +730,15 @@ impl TableContext for CtxDelegation { HashMap::new() } - fn add_segment_location(&self, _segment_loc: Location) -> Result<()> { + fn add_inserted_segment_location(&self, _segment_loc: Location) -> Result<()> { todo!() } - fn clear_segment_locations(&self) -> Result<()> { + fn clear_inserted_segment_locations(&self) -> Result<()> { todo!() } - fn get_segment_locations(&self) -> Result> { + fn get_inserted_segment_locations(&self) -> Result> { todo!() } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index b895d7ffd65d7..d199506c1b384 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1151,6 +1151,20 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=u64::MAX)), }), + ("hilbert_num_range_ids", DefaultSettingValue { + value: UserSettingValue::UInt64(1024), + desc: "Specifies the domain of range IDs in Hilbert clustering. A larger value provides finer granularity, but may incur a performance cost.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(1..=65536)), + }), + ("hilbert_sample_size_per_block", DefaultSettingValue { + value: UserSettingValue::UInt64(1000), + desc: "Specifies the number of sample points per block used in Hilbert clustering.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(1..=u64::MAX)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 21d12dbb56fcb..6ccd7ea9d2e97 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -181,6 +181,11 @@ impl Settings { self.try_get_u64("max_block_size") } + // Set max_block_size. 
+ pub fn set_max_block_size(&self, val: u64) -> Result<()> { + self.try_set_u64("max_block_size", val) + } + // Max block size for parquet reader pub fn get_parquet_max_block_size(&self) -> Result { self.try_get_u64("parquet_max_block_size") @@ -848,4 +853,12 @@ impl Settings { let v = self.try_get_u64("stream_consume_batch_size_hint")?; Ok(if v == 0 { None } else { Some(v) }) } + + pub fn get_hilbert_num_range_ids(&self) -> Result { + self.try_get_u64("hilbert_num_range_ids") + } + + pub fn get_hilbert_sample_size_per_block(&self) -> Result { + self.try_get_u64("hilbert_sample_size_per_block") + } } diff --git a/src/query/sql/Cargo.toml b/src/query/sql/Cargo.toml index ba707dc16a72b..0cdb7cf0e5a88 100644 --- a/src/query/sql/Cargo.toml +++ b/src/query/sql/Cargo.toml @@ -35,6 +35,7 @@ databend-common-storages-result-cache = { workspace = true } databend-common-storages-view = { workspace = true } databend-common-users = { workspace = true } databend-enterprise-data-mask-feature = { workspace = true } +databend-enterprise-hilbert-clustering = { workspace = true } databend-storages-common-cache = { workspace = true } databend-storages-common-io = { workspace = true } databend-storages-common-session = { workspace = true } diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index a671c196405ab..233c0c408340e 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -350,6 +350,9 @@ fn to_format_tree( distributed_insert_to_format_tree(plan.as_ref(), metadata, profs) } PhysicalPlan::Recluster(_) => Ok(FormatTreeNode::new("Recluster".to_string())), + PhysicalPlan::HilbertSerialize(_) => { + Ok(FormatTreeNode::new("HilbertSerialize".to_string())) + } PhysicalPlan::CompactSource(_) => Ok(FormatTreeNode::new("CompactSource".to_string())), PhysicalPlan::CommitSink(plan) => commit_sink_to_format_tree(plan, metadata, profs), PhysicalPlan::ProjectSet(plan) => project_set_to_format_tree(plan, metadata, profs), diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 8888cfb8e9f92..292438a290e06 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -23,6 +23,7 @@ use enum_as_inner::EnumAsInner; use itertools::Itertools; use super::physical_plans::AddStreamColumn; +use super::physical_plans::HilbertSerialize; use super::physical_plans::MutationManipulate; use super::physical_plans::MutationOrganize; use super::physical_plans::MutationSource; @@ -131,6 +132,7 @@ pub enum PhysicalPlan { /// Recluster Recluster(Box), + HilbertSerialize(Box), /// Multi table insert Duplicate(Box), @@ -348,6 +350,10 @@ impl PhysicalPlan { plan.plan_id = *next_id; *next_id += 1; } + PhysicalPlan::HilbertSerialize(plan) => { + plan.plan_id = *next_id; + *next_id += 1; + } PhysicalPlan::Duplicate(plan) => { plan.plan_id = *next_id; *next_id += 1; @@ -438,6 +444,7 @@ impl PhysicalPlan { PhysicalPlan::ReplaceInto(v) => v.plan_id, PhysicalPlan::CompactSource(v) => v.plan_id, PhysicalPlan::Recluster(v) => v.plan_id, + PhysicalPlan::HilbertSerialize(v) => v.plan_id, PhysicalPlan::Duplicate(v) => v.plan_id, PhysicalPlan::Shuffle(v) => v.plan_id, PhysicalPlan::ChunkFilter(v) => v.plan_id, @@ -492,7 +499,8 @@ impl PhysicalPlan { | PhysicalPlan::CompactSource(_) | PhysicalPlan::CommitSink(_) | PhysicalPlan::DistributedInsertSelect(_) - | PhysicalPlan::Recluster(_) => Ok(DataSchemaRef::default()), + | PhysicalPlan::Recluster(_) + | 
PhysicalPlan::HilbertSerialize(_) => Ok(DataSchemaRef::default()), PhysicalPlan::Duplicate(plan) => plan.input.output_schema(), PhysicalPlan::Shuffle(plan) => plan.input.output_schema(), PhysicalPlan::ChunkFilter(plan) => plan.input.output_schema(), @@ -552,6 +560,7 @@ impl PhysicalPlan { PhysicalPlan::ExpressionScan(_) => "ExpressionScan".to_string(), PhysicalPlan::CacheScan(_) => "CacheScan".to_string(), PhysicalPlan::Recluster(_) => "Recluster".to_string(), + PhysicalPlan::HilbertSerialize(_) => "HilbertSerialize".to_string(), PhysicalPlan::Udf(_) => "Udf".to_string(), PhysicalPlan::Duplicate(_) => "Duplicate".to_string(), PhysicalPlan::Shuffle(_) => "Shuffle".to_string(), @@ -574,6 +583,7 @@ impl PhysicalPlan { | PhysicalPlan::CompactSource(_) | PhysicalPlan::ReplaceAsyncSourcer(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::HilbertSerialize(_) | PhysicalPlan::RecursiveCteScan(_) => Box::new(std::iter::empty()), PhysicalPlan::Filter(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::EvalScalar(plan) => Box::new(std::iter::once(plan.input.as_ref())), @@ -679,6 +689,7 @@ impl PhysicalPlan { | PhysicalPlan::CacheScan(_) | PhysicalPlan::RecursiveCteScan(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::HilbertSerialize(_) | PhysicalPlan::Duplicate(_) | PhysicalPlan::Shuffle(_) | PhysicalPlan::ChunkFilter(_) diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs index 1b28a32d5f87e..45f3d4c93f4d1 100644 --- a/src/query/sql/src/executor/physical_plan_builder.rs +++ b/src/query/sql/src/executor/physical_plan_builder.rs @@ -126,7 +126,9 @@ impl PhysicalPlanBuilder { RelOperator::MutationSource(mutation_source) => { self.build_mutation_source(mutation_source).await } - RelOperator::Recluster(recluster) => self.build_recluster(recluster).await, + RelOperator::Recluster(recluster) => { + self.build_recluster(s_expr, recluster, required).await + } RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await, } } diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 7f53a14b515fd..5cdccd69f9e91 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -17,6 +17,7 @@ use databend_common_exception::Result; use super::physical_plans::AddStreamColumn; use super::physical_plans::CacheScan; use super::physical_plans::ExpressionScan; +use super::physical_plans::HilbertSerialize; use super::physical_plans::MutationManipulate; use super::physical_plans::MutationOrganize; use super::physical_plans::MutationSplit; @@ -107,6 +108,7 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::ExpressionScan(plan) => self.replace_expression_scan(plan), PhysicalPlan::CacheScan(plan) => self.replace_cache_scan(plan), PhysicalPlan::Recluster(plan) => self.replace_recluster(plan), + PhysicalPlan::HilbertSerialize(plan) => self.replace_hilbert_serialize(plan), PhysicalPlan::Udf(plan) => self.replace_udf(plan), PhysicalPlan::AsyncFunction(plan) => self.replace_async_function(plan), PhysicalPlan::Duplicate(plan) => self.replace_duplicate(plan), @@ -125,6 +127,10 @@ pub trait PhysicalPlanReplacer { Ok(PhysicalPlan::Recluster(Box::new(plan.clone()))) } + fn replace_hilbert_serialize(&mut self, plan: &HilbertSerialize) -> Result { + Ok(PhysicalPlan::HilbertSerialize(Box::new(plan.clone()))) + } + fn replace_table_scan(&mut self, plan: &TableScan) -> Result { 
Ok(PhysicalPlan::TableScan(plan.clone())) } @@ -639,6 +645,7 @@ impl PhysicalPlan { | PhysicalPlan::ExpressionScan(_) | PhysicalPlan::CacheScan(_) | PhysicalPlan::Recluster(_) + | PhysicalPlan::HilbertSerialize(_) | PhysicalPlan::ExchangeSource(_) | PhysicalPlan::CompactSource(_) | PhysicalPlan::MutationSource(_) => {} diff --git a/src/query/sql/src/executor/physical_plans/common.rs b/src/query/sql/src/executor/physical_plans/common.rs index 230b5f2af50db..3dbc09ba2daaf 100644 --- a/src/query/sql/src/executor/physical_plans/common.rs +++ b/src/query/sql/src/executor/physical_plans/common.rs @@ -17,6 +17,7 @@ use std::fmt::Formatter; use databend_common_expression::types::DataType; use databend_common_expression::Scalar; +use databend_storages_common_table_meta::table::ClusterType; use crate::plans::UDFField; use crate::plans::UDFType; @@ -70,7 +71,7 @@ pub enum MutationKind { Delete, Update, Replace, - Recluster, + Recluster(ClusterType), Insert, Compact, MergeInto, @@ -81,7 +82,10 @@ impl Display for MutationKind { match self { MutationKind::Delete => write!(f, "Delete"), MutationKind::Insert => write!(f, "Insert"), - MutationKind::Recluster => write!(f, "Recluster"), + MutationKind::Recluster(cluster_type) => match cluster_type { + ClusterType::Linear => write!(f, "Recluster"), + ClusterType::Hilbert => write!(f, "Hilbert Recluster"), + }, MutationKind::Update => write!(f, "Update"), MutationKind::Replace => write!(f, "Replace"), MutationKind::Compact => write!(f, "Compact"), diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs index 2bb277f10a73c..0caee74d6bec5 100644 --- a/src/query/sql/src/executor/physical_plans/mod.rs +++ b/src/query/sql/src/executor/physical_plans/mod.rs @@ -67,7 +67,6 @@ pub use physical_async_func::AsyncFunctionDesc; pub use physical_cache_scan::CacheScan; pub use physical_column_mutation::ColumnMutation; pub use physical_commit_sink::CommitSink; -pub use physical_commit_sink::ReclusterInfoSideCar; pub use physical_compact_source::CompactSource; pub use physical_constant_table_scan::ConstantTableScan; pub use physical_copy_into_location::CopyIntoLocation; @@ -91,6 +90,7 @@ pub use physical_mutation_source::*; pub use physical_project_set::ProjectSet; pub use physical_r_cte_scan::RecursiveCteScan; pub use physical_range_join::*; +pub use physical_recluster::HilbertSerialize; pub use physical_recluster::Recluster; pub use physical_refresh_index::RefreshIndex; pub use physical_replace_async_source::ReplaceAsyncSourcer; diff --git a/src/query/sql/src/executor/physical_plans/physical_column_mutation.rs b/src/query/sql/src/executor/physical_plans/physical_column_mutation.rs index b3e167a802dd1..68f50af6e3117 100644 --- a/src/query/sql/src/executor/physical_plans/physical_column_mutation.rs +++ b/src/query/sql/src/executor/physical_plans/physical_column_mutation.rs @@ -17,8 +17,8 @@ use std::collections::HashMap; use databend_common_expression::RemoteExpr; use databend_common_meta_app::schema::TableInfo; -use crate::binder::MutationType; use crate::executor::physical_plan::PhysicalPlan; +use crate::executor::physical_plans::MutationKind; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct ColumnMutation { @@ -27,7 +27,7 @@ pub struct ColumnMutation { pub table_info: TableInfo, pub mutation_expr: Option>, pub computed_expr: Option>, - pub mutation_type: MutationType, + pub mutation_kind: MutationKind, pub field_id_to_schema_index: HashMap, pub input_num_columns: usize, pub has_filter_column: 
bool, diff --git a/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs b/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs index 33bd4f06f7ca5..572a4944b1d1e 100644 --- a/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs +++ b/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs @@ -14,10 +14,9 @@ use std::sync::Arc; +use databend_common_catalog::plan::ReclusterInfoSideCar; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::UpdateStreamMetaReq; -use databend_storages_common_table_meta::meta::BlockMeta; -use databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::TableSnapshot; use crate::executor::physical_plans::common::MutationKind; @@ -39,11 +38,3 @@ pub struct CommitSink { // Used for recluster. pub recluster_info: Option, } - -// TODO refine this -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] -pub struct ReclusterInfoSideCar { - pub merged_blocks: Vec>, - pub removed_segment_indexes: Vec, - pub removed_statistics: Statistics, -} diff --git a/src/query/sql/src/executor/physical_plans/physical_mutation.rs b/src/query/sql/src/executor/physical_plans/physical_mutation.rs index df8573418c554..d928bb43fe724 100644 --- a/src/query/sql/src/executor/physical_plans/physical_mutation.rs +++ b/src/query/sql/src/executor/physical_plans/physical_mutation.rs @@ -186,7 +186,7 @@ impl PhysicalPlanBuilder { table_info: mutation_build_info.table_info.clone(), mutation_expr, computed_expr, - mutation_type: mutation_type.clone(), + mutation_kind, field_id_to_schema_index, input_num_columns: mutation_input_schema.fields().len(), has_filter_column: predicate_column_index.is_some(), diff --git a/src/query/sql/src/executor/physical_plans/physical_recluster.rs b/src/query/sql/src/executor/physical_plans/physical_recluster.rs index 6fdb94eb97c9d..bd3f82081bc02 100644 --- a/src/query/sql/src/executor/physical_plans/physical_recluster.rs +++ b/src/query/sql/src/executor/physical_plans/physical_recluster.rs @@ -14,14 +14,16 @@ use databend_common_catalog::plan::PartInfoType; use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::plan::ReclusterInfoSideCar; use databend_common_catalog::plan::ReclusterParts; use databend_common_catalog::plan::ReclusterTask; use databend_common_catalog::table::TableExt; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::schema::TableInfo; +use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler; +use databend_storages_common_table_meta::table::ClusterType; -use crate::executor::physical_plans::physical_commit_sink::ReclusterInfoSideCar; use crate::executor::physical_plans::CommitSink; use crate::executor::physical_plans::CompactSource; use crate::executor::physical_plans::Exchange; @@ -29,6 +31,8 @@ use crate::executor::physical_plans::FragmentKind; use crate::executor::physical_plans::MutationKind; use crate::executor::PhysicalPlan; use crate::executor::PhysicalPlanBuilder; +use crate::optimizer::ColumnSet; +use crate::optimizer::SExpr; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Recluster { @@ -37,30 +41,19 @@ pub struct Recluster { pub table_info: TableInfo, } +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct HilbertSerialize { + pub plan_id: u32, + pub input: Box, + pub table_info: TableInfo, +} + impl PhysicalPlanBuilder { - /// The flow of Pipeline is as 
follows: - // ┌──────────┐ ┌───────────────┐ ┌─────────┐ - // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┐ - // └──────────┘ └───────────────┘ └─────────┘ │ - // ┌──────────┐ ┌───────────────┐ ┌─────────┐ │ ┌──────────────┐ ┌─────────┐ - // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┤────►│MultiSortMerge├────►│Resize(N)├───┐ - // └──────────┘ └───────────────┘ └─────────┘ │ └──────────────┘ └─────────┘ │ - // ┌──────────┐ ┌───────────────┐ ┌─────────┐ │ │ - // │FuseSource├────►│CompoundBlockOp├────►│SortMerge├────┘ │ - // └──────────┘ └───────────────┘ └─────────┘ │ - // ┌──────────────────────────────────────────────────────────────────────────────────────────────┘ - // │ ┌──────────────┐ - // │ ┌───►│SerializeBlock├───┐ - // │ │ └──────────────┘ │ - // │ │ ┌──────────────┐ │ ┌─────────┐ ┌────────────────┐ ┌─────────────┐ ┌──────────┐ - // └───►│───►│SerializeBlock├───┤───►│Resize(1)├───►│SerializeSegment├────►│ReclusterAggr├────►│CommitSink│ - // │ └──────────────┘ │ └─────────┘ └────────────────┘ └─────────────┘ └──────────┘ - // │ ┌──────────────┐ │ - // └───►│SerializeBlock├───┘ - // └──────────────┘ pub async fn build_recluster( &mut self, + s_expr: &SExpr, recluster: &crate::plans::Recluster, + required: ColumnSet, ) -> Result { let crate::plans::Recluster { catalog, @@ -68,6 +61,8 @@ impl PhysicalPlanBuilder { table, filters, limit, + cluster_type, + .. } = recluster; let tenant = self.ctx.get_tenant(); @@ -75,95 +70,129 @@ impl PhysicalPlanBuilder { let tbl = catalog.get_table(&tenant, database, table).await?; // check mutability tbl.check_mutable()?; - let push_downs = filters.clone().map(|v| PushDownInfo { filters: Some(v), ..PushDownInfo::default() }); - let Some((parts, snapshot)) = tbl.recluster(self.ctx.clone(), push_downs, *limit).await? - else { - return Err(ErrorCode::NoNeedToRecluster(format!( - "No need to do recluster for '{database}'.'{table}'" - ))); - }; - if parts.is_empty() { - return Err(ErrorCode::NoNeedToRecluster(format!( - "No need to do recluster for '{database}'.'{table}'" - ))); - } - - let is_distributed = parts.is_distributed(self.ctx.clone()); let table_info = tbl.get_table_info().clone(); - let mut plan = match parts { - ReclusterParts::Recluster { - tasks, - remained_blocks, - removed_segment_indexes, - removed_segment_summary, - } => { - let mut root = PhysicalPlan::Recluster(Box::new(Recluster { - tasks, + + let mut plan = match cluster_type { + ClusterType::Hilbert => { + let handler = get_hilbert_clustering_handler(); + let Some((recluster_info, snapshot)) = handler + .do_hilbert_clustering(tbl.clone(), self.ctx.clone(), push_downs) + .await? 
+ else { + return Err(ErrorCode::NoNeedToRecluster(format!( + "No need to do recluster for '{database}'.'{table}'" + ))); + }; + let plan = self.build(s_expr.child(0)?, required).await?; + + let plan = PhysicalPlan::HilbertSerialize(Box::new(HilbertSerialize { + plan_id: 0, + input: Box::new(plan), table_info: table_info.clone(), - plan_id: u32::MAX, })); - - if is_distributed { - root = PhysicalPlan::Exchange(Exchange { - plan_id: 0, - input: Box::new(root), - kind: FragmentKind::Merge, - keys: vec![], - allow_adjust_parallelism: true, - ignore_exchange: false, - }); - } PhysicalPlan::CommitSink(Box::new(CommitSink { - input: Box::new(root), + input: Box::new(plan), table_info, snapshot: Some(snapshot), - mutation_kind: MutationKind::Recluster, + mutation_kind: MutationKind::Recluster(ClusterType::Hilbert), update_stream_meta: vec![], merge_meta: false, deduplicated_label: None, plan_id: u32::MAX, - recluster_info: Some(ReclusterInfoSideCar { - merged_blocks: remained_blocks, - removed_segment_indexes, - removed_statistics: removed_segment_summary, - }), + recluster_info: Some(recluster_info), })) } - ReclusterParts::Compact(parts) => { - let merge_meta = parts.partitions_type() == PartInfoType::LazyLevel; - let mut root = PhysicalPlan::CompactSource(Box::new(CompactSource { - parts, - table_info: table_info.clone(), - column_ids: snapshot.schema.to_leaf_column_id_set(), - plan_id: u32::MAX, - })); - - if is_distributed { - root = PhysicalPlan::Exchange(Exchange { - plan_id: 0, - input: Box::new(root), - kind: FragmentKind::Merge, - keys: vec![], - allow_adjust_parallelism: true, - ignore_exchange: false, - }); + ClusterType::Linear => { + let Some((parts, snapshot)) = + tbl.recluster(self.ctx.clone(), push_downs, *limit).await? + else { + return Err(ErrorCode::NoNeedToRecluster(format!( + "No need to do recluster for '{database}'.'{table}'" + ))); + }; + if parts.is_empty() { + return Err(ErrorCode::NoNeedToRecluster(format!( + "No need to do recluster for '{database}'.'{table}'" + ))); } - PhysicalPlan::CommitSink(Box::new(CommitSink { - input: Box::new(root), - table_info, - snapshot: Some(snapshot), - mutation_kind: MutationKind::Compact, - update_stream_meta: vec![], - merge_meta, - deduplicated_label: None, - plan_id: u32::MAX, - recluster_info: None, - })) + let is_distributed = parts.is_distributed(self.ctx.clone()); + match parts { + ReclusterParts::Recluster { + tasks, + remained_blocks, + removed_segment_indexes, + removed_segment_summary, + } => { + let mut root = PhysicalPlan::Recluster(Box::new(Recluster { + tasks, + table_info: table_info.clone(), + plan_id: u32::MAX, + })); + + if is_distributed { + root = PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: Box::new(root), + kind: FragmentKind::Merge, + keys: vec![], + allow_adjust_parallelism: true, + ignore_exchange: false, + }); + } + PhysicalPlan::CommitSink(Box::new(CommitSink { + input: Box::new(root), + table_info, + snapshot: Some(snapshot), + mutation_kind: MutationKind::Recluster(ClusterType::Linear), + update_stream_meta: vec![], + merge_meta: false, + deduplicated_label: None, + plan_id: u32::MAX, + recluster_info: Some(ReclusterInfoSideCar { + merged_blocks: remained_blocks, + removed_segment_indexes, + removed_statistics: removed_segment_summary, + }), + })) + } + ReclusterParts::Compact(parts) => { + let merge_meta = parts.partitions_type() == PartInfoType::LazyLevel; + let mut root = PhysicalPlan::CompactSource(Box::new(CompactSource { + parts, + table_info: table_info.clone(), + column_ids: 
snapshot.schema.to_leaf_column_id_set(), + plan_id: u32::MAX, + })); + + if is_distributed { + root = PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: Box::new(root), + kind: FragmentKind::Merge, + keys: vec![], + allow_adjust_parallelism: true, + ignore_exchange: false, + }); + } + + PhysicalPlan::CommitSink(Box::new(CommitSink { + input: Box::new(root), + table_info, + snapshot: Some(snapshot), + mutation_kind: MutationKind::Compact, + update_stream_meta: vec![], + merge_meta, + deduplicated_label: None, + plan_id: u32::MAX, + recluster_info: None, + })) + } + } } }; plan.adjust_plan_id(&mut 0); diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 2f72347d8ee0e..3837ebec501db 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -22,7 +22,7 @@ use databend_common_ast::ast::AlterTableStmt; use databend_common_ast::ast::AnalyzeTableStmt; use databend_common_ast::ast::AttachTableStmt; use databend_common_ast::ast::ClusterOption; -use databend_common_ast::ast::ClusterType; +use databend_common_ast::ast::ClusterType as AstClusterType; use databend_common_ast::ast::ColumnDefinition; use databend_common_ast::ast::ColumnExpr; use databend_common_ast::ast::CompactTarget; @@ -32,6 +32,7 @@ use databend_common_ast::ast::DescribeTableStmt; use databend_common_ast::ast::DropTableStmt; use databend_common_ast::ast::Engine; use databend_common_ast::ast::ExistsTableStmt; +use databend_common_ast::ast::Expr; use databend_common_ast::ast::Identifier; use databend_common_ast::ast::InvertedIndexDefinition; use databend_common_ast::ast::ModifyColumnAction; @@ -78,6 +79,8 @@ use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; use databend_common_expression::TableSchemaRefExt; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_license::license::Feature; +use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::schema::TableIndex; use databend_common_meta_app::storage::StorageParams; @@ -85,6 +88,7 @@ use databend_common_storage::DataOperator; use databend_common_storages_view::view_table::QUERY; use databend_common_storages_view::view_table::VIEW_ENGINE; use databend_storages_common_table_meta::table::is_reserved_opt_key; +use databend_storages_common_table_meta::table::ClusterType; use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE; use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; use databend_storages_common_table_meta::table::OPT_KEY_ENGINE_META; @@ -97,6 +101,7 @@ use derive_visitor::DriveMut; use log::debug; use opendal::Operator; +use crate::bind_table; use crate::binder::get_storage_params_from_options; use crate::binder::parse_storage_params_from_uri; use crate::binder::scalar::ScalarBinder; @@ -144,8 +149,10 @@ use crate::plans::VacuumTableOption; use crate::plans::VacuumTablePlan; use crate::plans::VacuumTemporaryFilesPlan; use crate::BindContext; +use crate::NameResolutionContext; use crate::Planner; use crate::SelectBuilder; +use crate::TypeChecker; impl Binder { #[async_backtrace::framed] @@ -468,7 +475,7 @@ impl Binder { Some(self.ctx.as_ref()), "when create TABLE with external location", ) - .await?; + .await?; // create a temporary op to check if params is correct let data_operator = DataOperator::try_create(&sp).await?; @@ -1020,50 +1027,15 @@ impl Binder { selection, limit, } => 
{ - let filters = if let Some(expr) = selection { - let (_, mut context) = - self.bind_table_reference(bind_context, table_reference)?; - - let mut scalar_binder = ScalarBinder::new( - &mut context, - self.ctx.clone(), - &self.name_resolution_ctx, - self.metadata.clone(), - &[], - ); - scalar_binder.forbid_udf(); - let (scalar, _) = scalar_binder.bind(expr)?; - - // prepare the filter expression - let filter = cast_expr_to_non_null_boolean( - scalar - .as_expr()? - .project_column_ref(|col| col.column_name.clone()), - )?; - // prepare the inverse filter expression - let inverted_filter = - check_function(None, "not", &[], &[filter.clone()], &BUILTIN_FUNCTIONS)?; - - Some(Filters { - filter: filter.as_remote_expr(), - inverted_filter: inverted_filter.as_remote_expr(), - }) - } else { - None - }; - - let recluster = RelOperator::Recluster(Recluster { + self.bind_recluster_table( catalog, database, table, - filters, - limit: limit.map(|v| v as usize), - }); - let s_expr = SExpr::create_leaf(Arc::new(recluster)); - Ok(Plan::ReclusterTable { - s_expr: Box::new(s_expr), - is_final: *is_final, - }) + limit.map(|v| v as usize), + selection, + *is_final, + ) + .await } AlterTableAction::FlashbackTo { point } => { let point = self.resolve_data_travel_point(bind_context, point)?; @@ -1094,6 +1066,157 @@ impl Binder { } } + #[async_backtrace::framed] + pub async fn bind_recluster_table( + &mut self, + catalog: String, + database: String, + table: String, + limit: Option, + selection: &Option, + is_final: bool, + ) -> Result { + let tbl = self.ctx.get_table(&catalog, &database, &table).await?; + let Some(cluster_type) = tbl.cluster_type() else { + return Err(ErrorCode::UnclusteredTable(format!( + "Unclustered table '{}.{}'", + database, table, + ))); + }; + + let filters = if let Some(expr) = selection { + let (mut context, metadata) = bind_table(tbl.clone())?; + let mut type_checker = TypeChecker::try_create( + &mut context, + self.ctx.clone(), + &self.name_resolution_ctx, + metadata, + &[], + true, + )?; + let (scalar, _) = *type_checker.resolve(expr)?; + + // prepare the filter expression + let filter = cast_expr_to_non_null_boolean( + scalar + .as_expr()? 
+ .project_column_ref(|col| col.column_name.clone()), + )?; + // prepare the inverse filter expression + let inverted_filter = + check_function(None, "not", &[], &[filter.clone()], &BUILTIN_FUNCTIONS)?; + + Some(Filters { + filter: filter.as_remote_expr(), + inverted_filter: inverted_filter.as_remote_expr(), + }) + } else { + None + }; + + let s_expr = match cluster_type { + ClusterType::Linear => { + let recluster = RelOperator::Recluster(Recluster { + catalog, + database, + table, + limit, + filters, + cluster_type: ClusterType::Linear, + metadata: self.metadata.clone(), + bind_context: Default::default(), + }); + SExpr::create_leaf(Arc::new(recluster)) + } + ClusterType::Hilbert => { + LicenseManagerSwitch::instance().check_enterprise_enabled( + self.ctx.get_license_key(), + Feature::HilbertClustering, + )?; + let ast_exprs = tbl.resolve_cluster_keys(self.ctx.clone()).unwrap(); + let cluster_keys_len = ast_exprs.len(); + let settings = self.ctx.get_settings(); + let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let cluster_key_strs = ast_exprs.into_iter().fold( + Vec::with_capacity(cluster_keys_len), + |mut acc, mut ast| { + let mut normalizer = IdentifierNormalizer { + ctx: &name_resolution_ctx, + }; + ast.drive_mut(&mut normalizer); + acc.push(format!("{:#}", &ast)); + acc + }, + ); + + let partitions = settings.get_hilbert_num_range_ids()?; + let sample_size = settings.get_hilbert_sample_size_per_block()?; + let keys_bounds_str = cluster_key_strs + .iter() + .map(|s| format!("range_bound({partitions}, {sample_size})({s}) AS {s}_bound")) + .collect::>() + .join(", "); + let hilbert_keys_str = cluster_key_strs + .iter() + .map(|s| { + format!( + "hilbert_key(cast(ifnull(range_partition_id({table}.{s}, _keys_bound.{s}_bound), {}) as uint16))", + partitions - 1 + ) + }) + .collect::>() + .join(", "); + let query = format!( + "WITH _keys_bound AS ( \ + SELECT \ + {keys_bounds_str} \ + FROM {database}.{table} \ + ), \ + _source_data AS ( \ + SELECT \ + {table}.*, \ + hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \ + FROM {database}.{table}, _keys_bound \ + ) \ + SELECT \ + * EXCLUDE(_hilbert_index) \ + FROM \ + _source_data \ + ORDER BY \ + _hilbert_index" + ); + let tokens = tokenize_sql(query.as_str())?; + let (stmt, _) = parse_sql(&tokens, self.dialect)?; + let Statement::Query(query) = &stmt else { + unreachable!() + }; + let mut bind_context = BindContext::new(); + let (mut s_expr, new_bind_context) = self.bind_query(&mut bind_context, query)?; + if tbl.change_tracking_enabled() { + s_expr = set_update_stream_columns(&s_expr)?; + } + SExpr::create_unary( + Arc::new(RelOperator::Recluster(Recluster { + catalog, + database, + table, + limit, + filters, + cluster_type: ClusterType::Hilbert, + metadata: self.metadata.clone(), + bind_context: Box::new(new_bind_context), + })), + Arc::new(s_expr), + ) + } + }; + + Ok(Plan::ReclusterTable { + s_expr: Box::new(s_expr), + is_final, + }) + } + #[async_backtrace::framed] pub(in crate::planner::binder) async fn bind_rename_table( &mut self, @@ -1643,10 +1766,15 @@ impl Binder { } = cluster_opt; let expr_len = cluster_exprs.len(); - if matches!(cluster_type, ClusterType::Hilbert) && !(2..=5).contains(&expr_len) { - return Err(ErrorCode::InvalidClusterKeys( - "Hilbert clustering requires the dimension to be between 2 and 5", - )); + if matches!(cluster_type, AstClusterType::Hilbert) { + LicenseManagerSwitch::instance() + .check_enterprise_enabled(self.ctx.get_license_key(), Feature::HilbertClustering)?; + + 
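For reference, the query template assembled in `bind_recluster_table` above expands, for a hypothetical table `default.t` clustered by `hilbert(a, b)` under the default settings (1024 range IDs, 1000 samples per block), into roughly the following; the database, table and column names are assumptions for illustration only:

WITH _keys_bound AS (
    SELECT range_bound(1024, 1000)(a) AS a_bound,
           range_bound(1024, 1000)(b) AS b_bound
    FROM default.t
),
_source_data AS (
    SELECT t.*,
           hilbert_index([
               hilbert_key(cast(ifnull(range_partition_id(t.a, _keys_bound.a_bound), 1023) as uint16)),
               hilbert_key(cast(ifnull(range_partition_id(t.b, _keys_bound.b_bound), 1023) as uint16))
           ], 2) AS _hilbert_index
    FROM default.t, _keys_bound
)
SELECT * EXCLUDE(_hilbert_index)
FROM _source_data
ORDER BY _hilbert_index;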
if !(2..=5).contains(&expr_len) { + return Err(ErrorCode::InvalidClusterKeys( + "Hilbert clustering requires the dimension to be between 2 and 5", + )); + } } // Build a temporary BindContext to resolve the expr @@ -1731,6 +1859,24 @@ impl Binder { } } +fn set_update_stream_columns(s_expr: &SExpr) -> Result { + match s_expr.plan() { + RelOperator::Scan(scan) if scan.table_index == 0 => { + let mut scan = scan.clone(); + scan.set_update_stream_columns(true); + Ok(SExpr::create_leaf(Arc::new(scan.into()))) + } + _ => { + let mut children = Vec::with_capacity(s_expr.arity()); + for child in s_expr.children() { + let child = set_update_stream_columns(child)?; + children.push(Arc::new(child)); + } + Ok(s_expr.replace_children(children)) + } + } +} + const VERIFICATION_KEY: &str = "_v_d77aa11285c22e0e1d4593a035c98c0d"; const VERIFICATION_KEY_DEL: &str = "_v_d77aa11285c22e0e1d4593a035c98c0d_del"; diff --git a/src/query/sql/src/planner/expression_parser.rs b/src/query/sql/src/planner/expression_parser.rs index aa098e43e5d63..dce84a0d74425 100644 --- a/src/query/sql/src/planner/expression_parser.rs +++ b/src/query/sql/src/planner/expression_parser.rs @@ -28,7 +28,6 @@ use databend_common_expression::type_check::check_cast; use databend_common_expression::type_check::check_function; use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; -use databend_common_expression::types::NumberScalar; use databend_common_expression::ConstantFolder; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; @@ -114,6 +113,17 @@ pub fn parse_exprs( ctx: Arc, table_meta: Arc, sql: &str, +) -> Result> { + let sql_dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default(); + let tokens = tokenize_sql(sql)?; + let ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect)?; + parse_ast_exprs(ctx, table_meta, ast_exprs) +} + +fn parse_ast_exprs( + ctx: Arc, + table_meta: Arc, + ast_exprs: Vec, ) -> Result> { let (mut bind_context, metadata) = bind_table(table_meta)?; let settings = ctx.get_settings(); @@ -127,9 +137,6 @@ pub fn parse_exprs( false, )?; - let tokens = tokenize_sql(sql)?; - let sql_dialect = settings.get_sql_dialect().unwrap_or_default(); - let ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect)?; let exprs = ast_exprs .iter() .map(|ast| { @@ -386,41 +393,11 @@ pub fn parse_lambda_expr( pub fn parse_cluster_keys( ctx: Arc, table_meta: Arc, - cluster_key_str: &str, + ast_exprs: Vec, ) -> Result> { - let (mut bind_context, metadata) = bind_table(table_meta)?; - let settings = ctx.get_settings(); - let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; - let mut type_checker = TypeChecker::try_create( - &mut bind_context, - ctx, - &name_resolution_ctx, - metadata, - &[], - true, - )?; - - let tokens = tokenize_sql(cluster_key_str)?; - let sql_dialect = settings.get_sql_dialect().unwrap_or_default(); - let mut ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect)?; - // unwrap tuple. - if ast_exprs.len() == 1 { - if let AExpr::Tuple { exprs, .. } = &ast_exprs[0] { - ast_exprs = exprs.clone(); - } - } else { - // Defensive check: - // `ast_exprs` should always contain one element which can be one of the following: - // 1. A tuple of composite cluster keys - // 2. 
A single cluster key - unreachable!("invalid cluster key ast expression, {:?}", ast_exprs); - } - - let mut exprs = Vec::with_capacity(ast_exprs.len()); - for ast in ast_exprs { - let (scalar, _) = *type_checker.resolve(&ast)?; - let expr = scalar.as_expr()?.project_column_ref(|col| col.index); - + let exprs = parse_ast_exprs(ctx, table_meta, ast_exprs)?; + let mut res = Vec::with_capacity(exprs.len()); + for expr in exprs { let inner_type = expr.data_type().remove_nullable(); let mut should_wrapper = false; if inner_type == DataType::String { @@ -455,153 +432,9 @@ pub fn parse_cluster_keys( } else { expr }; - exprs.push(expr); - } - Ok(exprs) -} - -pub fn parse_hilbert_cluster_key( - ctx: Arc, - table_meta: Arc, - cluster_key_str: &str, -) -> Result> { - let (mut bind_context, metadata) = bind_table(table_meta)?; - let settings = ctx.get_settings(); - let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; - let mut type_checker = TypeChecker::try_create( - &mut bind_context, - ctx, - &name_resolution_ctx, - metadata, - &[], - true, - )?; - - let tokens = tokenize_sql(cluster_key_str)?; - let sql_dialect = settings.get_sql_dialect().unwrap_or_default(); - let mut ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect)?; - // unwrap tuple. - if ast_exprs.len() == 1 { - if let AExpr::Tuple { exprs, .. } = &ast_exprs[0] { - ast_exprs = exprs.clone(); - } - } else { - unreachable!("invalid cluster key ast expression, {:?}", ast_exprs); - } - - let expr_len = ast_exprs.len(); - if !(2..=5).contains(&expr_len) { - return Err(ErrorCode::InvalidClusterKeys( - "Hilbert clustering requires the dimension to be between 2 and 5", - )); - } - - let mut max_size = 0; - let mut byte_sizes = Vec::with_capacity(expr_len); - let mut exprs = Vec::with_capacity(expr_len); - for ast in ast_exprs { - let (scalar, _) = *type_checker.resolve(&ast)?; - let expr = scalar.as_expr()?.project_column_ref(|col| col.index); - let byte_size = hilbert_byte_size(expr.data_type())?; - max_size = max_size.max(byte_size); - byte_sizes.push(byte_size); - exprs.push(expr); - } - - let max_size = max_size.min(8); - let common_cast = match max_size { - 1 => "to_int8", - 2 => "to_int16", - 4 => "to_int32", - 8 => "to_int64", - _ => unreachable!(), - }; - let max_val = Expr::Constant { - span: None, - scalar: Scalar::Binary(vec![0xFF; max_size]), - data_type: DataType::Binary, - }; - - for (expr, byte_size) in exprs.iter_mut().zip(byte_sizes.into_iter()) { - let inner_type = expr.data_type().remove_nullable(); - let cast_str = match inner_type { - DataType::Date | DataType::Timestamp | DataType::Boolean => Some(common_cast), - DataType::Decimal(_) => Some("to_float64"), - DataType::Number(t) if max_size > byte_size => { - if matches!(t, NumberDataType::Float32) { - Some("to_float64") - } else { - Some(common_cast) - } - } - _ => None, - }; - *expr = if let Some(cast) = cast_str { - check_function(None, cast, &[], &[expr.clone()], &BUILTIN_FUNCTIONS)? 
- } else { - expr.clone() - }; - *expr = check_function( - None, - "hilbert_key", - &[], - &[expr.clone()], - &BUILTIN_FUNCTIONS, - )?; - let data_type = expr.data_type(); - let is_nullable = data_type.is_nullable(); - if is_nullable { - let is_not_null_expr = check_function( - None, - "is_not_null", - &[], - &[expr.clone()], - &BUILTIN_FUNCTIONS, - )?; - - let assume_not_null_expr = check_function( - None, - "assume_not_null", - &[], - &[expr.clone()], - &BUILTIN_FUNCTIONS, - )?; - - *expr = check_function( - None, - "if", - &[], - &[is_not_null_expr, assume_not_null_expr, max_val.clone()], - &BUILTIN_FUNCTIONS, - )?; - } - } - - let array = check_function(None, "array", &[], &exprs, &BUILTIN_FUNCTIONS)?; - let result = check_function( - None, - "hilbert_index", - &[], - &[array, Expr::Constant { - span: None, - scalar: Scalar::Number(NumberScalar::UInt64(max_size as u64)), - data_type: DataType::Number(NumberDataType::UInt64), - }], - &BUILTIN_FUNCTIONS, - )?; - Ok(vec![result]) -} - -fn hilbert_byte_size(data_type: &DataType) -> Result { - match data_type { - DataType::Nullable(inner) => hilbert_byte_size(inner), - DataType::Number(_) | DataType::Date | DataType::Timestamp | DataType::Decimal(_) => { - Ok(data_type.numeric_byte_size().unwrap()) - } - DataType::Boolean => Ok(1), - DataType::String => Ok(24), - _ => Err(ErrorCode::Internal("unsupported data type for hilbert")), + res.push(expr); } + Ok(res) } pub fn analyze_cluster_keys( diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs index de1ea299a3fdd..3b82f7d6149ee 100644 --- a/src/query/sql/src/planner/optimizer/optimizer.rs +++ b/src/query/sql/src/planner/optimizer/optimizer.rs @@ -357,6 +357,16 @@ pub async fn optimize(mut opt_ctx: OptimizerContext, plan: Plan) -> Result Ok(Plan::Set(plan)) } + Plan::ReclusterTable { s_expr, is_final } => { + let s_expr = if s_expr.children.is_empty() { + s_expr + } else { + let input_s_expr = optimize_query(&mut opt_ctx, s_expr.child(0)?.clone()).await?; + Box::new(s_expr.replace_children(vec![Arc::new(input_s_expr)])) + }; + Ok(Plan::ReclusterTable { s_expr, is_final }) + } + // Already done in binder // Plan::RefreshIndex(mut plan) => { // // use fresh index @@ -507,7 +517,7 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu if let &RelOperator::Exchange(_) = input_s_expr.plan() { input_s_expr = input_s_expr.child(0)?.clone(); } - // If there still exists a Exchange::Merge operator, we should disable distributed optimization and + // If there still exists an Exchange::Merge operator, we should disable distributed optimization and // optimize the input plan again. if input_s_expr.has_merge_exchange() { opt_ctx = opt_ctx.with_enable_distributed_optimization(false); diff --git a/src/query/sql/src/planner/plans/recluster.rs b/src/query/sql/src/planner/plans/recluster.rs index 4cd9fbb6c63d2..0aa7fb0e1d3ee 100644 --- a/src/query/sql/src/planner/plans/recluster.rs +++ b/src/query/sql/src/planner/plans/recluster.rs @@ -13,24 +13,30 @@ // limitations under the License. 
use databend_common_catalog::plan::Filters; +use databend_storages_common_table_meta::table::ClusterType; +use educe::Educe; use crate::plans::Operator; use crate::plans::RelOp; +use crate::BindContext; +use crate::MetadataRef; -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Educe)] +#[educe(Eq, PartialEq, Hash)] pub struct Recluster { pub catalog: String, pub database: String, pub table: String, pub limit: Option, + #[educe(Eq(ignore), Hash(ignore))] pub filters: Option, -} - -impl std::hash::Hash for Recluster { - fn hash(&self, state: &mut H) { - self.table.hash(state); - } + #[educe(Hash(ignore))] + pub cluster_type: ClusterType, + #[educe(Eq(ignore), PartialEq(ignore), Hash(ignore))] + pub bind_context: Box, + #[educe(Eq(ignore), PartialEq(ignore), Hash(ignore))] + pub metadata: MetadataRef, } impl Operator for Recluster { diff --git a/src/query/storages/common/table_meta/src/table/table_keys.rs b/src/query/storages/common/table_meta/src/table/table_keys.rs index 0e4296fc3c8bd..e4202d5c05d51 100644 --- a/src/query/storages/common/table_meta/src/table/table_keys.rs +++ b/src/query/storages/common/table_meta/src/table/table_keys.rs @@ -87,7 +87,7 @@ pub fn is_internal_opt_key>(opt_key: S) -> bool { INTERNAL_TABLE_OPTION_KEYS.contains(opt_key.as_ref().to_lowercase().as_str()) } -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq, Copy)] pub enum ClusterType { Linear, Hilbert, diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 2d9203154a3b7..3c665b06c21a2 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -64,7 +64,6 @@ use databend_common_meta_app::schema::UpsertTableCopiedFileReq; use databend_common_pipeline_core::Pipeline; use databend_common_sql::binder::STREAM_COLUMN_FACTORY; use databend_common_sql::parse_cluster_keys; -use databend_common_sql::parse_hilbert_cluster_key; use databend_common_sql::BloomIndexColumns; use databend_common_storage::init_operator; use databend_common_storage::DataOperator; @@ -403,28 +402,25 @@ impl FuseTable { self.cluster_key_meta.clone().map(|v| v.0) } - pub fn cluster_keys(&self, ctx: Arc) -> Vec> { + pub fn linear_cluster_keys(&self, ctx: Arc) -> Vec> { + let Some(cluster_type) = self.cluster_type() else { + return vec![]; + }; + if matches!(cluster_type, ClusterType::Hilbert) { + return vec![]; + } + let table_meta = Arc::new(self.clone()); - if let Some((_, order)) = &self.cluster_key_meta { - let cluster_type = self.get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear); - let cluster_keys = match cluster_type { - ClusterType::Linear => parse_cluster_keys(ctx, table_meta.clone(), order), - ClusterType::Hilbert => parse_hilbert_cluster_key(ctx, table_meta.clone(), order), - } - .unwrap(); - - let cluster_keys = cluster_keys - .iter() - .map(|k| { - k.project_column_ref(|index| { - table_meta.schema().field(*index).name().to_string() - }) + let cluster_key_exprs = self.resolve_cluster_keys(ctx.clone()).unwrap(); + let exprs = parse_cluster_keys(ctx, table_meta.clone(), cluster_key_exprs).unwrap(); + let cluster_keys = exprs + .iter() + .map(|k| { + k.project_column_ref(|index| table_meta.schema().field(*index).name().to_string()) .as_remote_expr() - }) - .collect(); - return cluster_keys; - } - vec![] + }) + .collect(); + cluster_keys } pub fn bloom_index_cols(&self) -> BloomIndexColumns { @@ -439,7 +435,7 @@ impl FuseTable { } pub fn cluster_key_types(&self, ctx: 
Arc) -> Vec { - let Some((_, cluster_key_str)) = &self.cluster_key_meta else { + let Some(ast_exprs) = self.resolve_cluster_keys(ctx.clone()) else { return vec![]; }; let cluster_type = self.get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear); @@ -447,7 +443,7 @@ impl FuseTable { ClusterType::Hilbert => vec![DataType::Binary], ClusterType::Linear => { let cluster_keys = - parse_cluster_keys(ctx, Arc::new(self.clone()), cluster_key_str).unwrap(); + parse_cluster_keys(ctx, Arc::new(self.clone()), ast_exprs).unwrap(); cluster_keys .into_iter() .map(|v| v.data_type().clone()) diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index 0588a64b4c77a..6f80b8865a1aa 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ -33,6 +33,7 @@ use databend_common_pipeline_transforms::processors::TransformSortPartial; use databend_common_sql::evaluator::BlockOperator; use databend_common_sql::evaluator::CompoundBlockOperator; use databend_common_sql::executor::physical_plans::MutationKind; +use databend_storages_common_table_meta::table::ClusterType; use crate::operations::common::TransformSerializeBlock; use crate::statistics::ClusterStatsGenerator; @@ -169,8 +170,8 @@ impl FuseTable { block_thresholds: BlockThresholds, modified_schema: Option>, ) -> Result { - let cluster_keys = self.cluster_keys(ctx.clone()); - if cluster_keys.is_empty() { + let cluster_type = self.cluster_type(); + if cluster_type.is_none_or(|v| v == ClusterType::Hilbert) { return Ok(ClusterStatsGenerator::default()); } @@ -178,6 +179,7 @@ impl FuseTable { modified_schema.unwrap_or(DataSchema::from(self.schema_with_stream()).into()); let mut merged = input_schema.fields().clone(); + let cluster_keys = self.linear_cluster_keys(ctx.clone()); let mut cluster_key_index = Vec::with_capacity(cluster_keys.len()); let mut extra_key_num = 0; diff --git a/src/query/storages/fuse/src/operations/changes.rs b/src/query/storages/fuse/src/operations/changes.rs index 9c8f013c2270c..a049eced60ba6 100644 --- a/src/query/storages/fuse/src/operations/changes.rs +++ b/src/query/storages/fuse/src/operations/changes.rs @@ -292,7 +292,10 @@ impl FuseTable { if !self.is_native() || self.cluster_key_meta().is_none() { (vec![], None) } else { - (self.cluster_keys(ctx.clone()), self.cluster_key_meta()) + ( + self.linear_cluster_keys(ctx.clone()), + self.cluster_key_meta(), + ) }; let bloom_index_cols = self.bloom_index_cols(); let mut pruner = FusePruner::create_with_pages( diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs index 2235d87c5e4f7..a249c1a08f14b 100644 --- a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs @@ -101,7 +101,7 @@ impl SnapshotGenerator for MutationGenerator { if matches!( self.mutation_kind, - MutationKind::Compact | MutationKind::Recluster + MutationKind::Compact | MutationKind::Recluster(_) ) { // for compaction, a basic but very important verification: // the number of rows should be the same diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 413a6c1170c44..6c84bd7450361 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ 
b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -442,7 +442,7 @@ where F: SnapshotGenerator + Send + 'static metrics_inc_commit_copied_files(files.file_info.len() as u64); } for segment_loc in std::mem::take(&mut self.new_segment_locs).into_iter() { - self.ctx.add_segment_location(segment_loc)?; + self.ctx.add_inserted_segment_location(segment_loc)?; } let target_descriptions = { diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index 3a42880d3b31c..db26f5f330afa 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -30,10 +30,12 @@ use databend_common_metrics::storage::metrics_inc_recluster_write_block_nums; use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransform; use databend_common_sql::executor::physical_plans::MutationKind; use databend_storages_common_table_meta::meta::BlockMeta; +use databend_storages_common_table_meta::meta::ClusterStatistics; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::Versioned; +use databend_storages_common_table_meta::table::ClusterType; use itertools::Itertools; use log::debug; use log::info; @@ -113,7 +115,7 @@ impl AsyncAccumulatingTransform for TableMutationAggregator { }, self.schema.clone(), )), - MutationKind::Recluster => self.apply_recluster(&mut new_segment_locs).await?, + MutationKind::Recluster(_) => self.apply_recluster(&mut new_segment_locs).await?, _ => self.apply_mutation(&mut new_segment_locs).await?, }; @@ -125,6 +127,7 @@ impl AsyncAccumulatingTransform for TableMutationAggregator { } impl TableMutationAggregator { + #[allow(clippy::too_many_arguments)] pub fn create( table: &FuseTable, ctx: Arc, @@ -258,6 +261,8 @@ impl TableMutationAggregator { let chunk_size = merged_blocks.len().div_ceil(segments_num); let default_cluster_key = Some(default_cluster_key_id); let thresholds = self.thresholds; + let block_per_seg = self.block_per_seg; + let kind = self.kind; for chunk in &merged_blocks.into_iter().chunks(chunk_size) { let new_blocks = chunk.collect::>(); let all_perfect = new_blocks.len() > 1; @@ -272,7 +277,8 @@ impl TableMutationAggregator { thresholds, default_cluster_key, all_perfect, - MutationKind::Recluster, + block_per_seg, + kind, ) .await }); @@ -297,7 +303,7 @@ impl TableMutationAggregator { let mut appended_segments = Vec::new(); let mut replaced_segments = HashMap::with_capacity(replaced_segments_len); if new_segments_len > removed_segments_len { - // The remain new segments will be append. + // The remain new segments will be appended. 
let appended = new_segments.split_off(removed_segments_len); for (location, stats) in appended.into_iter().rev() { let segment_loc = (location, SegmentInfo::VERSION); @@ -406,6 +412,7 @@ impl TableMutationAggregator { ) -> Result> { let thresholds = self.thresholds; let default_cluster_key_id = self.default_cluster_key_id; + let block_per_seg = self.block_per_seg; let mut tasks = Vec::with_capacity(segment_indices.len()); for index in segment_indices { let segment_mutation = self.mutations.remove(&index).unwrap(); @@ -469,6 +476,7 @@ impl TableMutationAggregator { thresholds, default_cluster_key_id, all_perfect, + block_per_seg, kind, ) .await?; @@ -541,6 +549,7 @@ async fn write_segment( thresholds: BlockThresholds, default_cluster_key: Option, all_perfect: bool, + block_per_seg: usize, kind: MutationKind, ) -> Result<(String, Statistics)> { let location = location_gen.gen_segment_info_location(); @@ -555,6 +564,21 @@ async fn write_segment( new_summary.perfect_block_count = new_summary.block_count; } } + if matches!(kind, MutationKind::Recluster(ClusterType::Hilbert)) { + assert!(new_summary.cluster_stats.is_none()); + let level = if new_summary.block_count >= block_per_seg as u64 { + -1 + } else { + 0 + }; + new_summary.cluster_stats = Some(ClusterStatistics { + cluster_key_id: default_cluster_key.unwrap(), + min: vec![], + max: vec![], + level, + pages: None, + }); + } // create new segment info let new_segment = SegmentInfo::new(blocks, new_summary.clone()); new_segment diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs index 5064f96e729c8..ff60336ac00c5 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs @@ -336,7 +336,7 @@ impl Processor for TransformSerializeBlock { } } - if matches!(self.kind, MutationKind::Recluster) { + if matches!(self.kind, MutationKind::Recluster(_)) { Self::mutation_logs(MutationLogEntry::ReclusterAppendBlock { block_meta: Arc::new(block_meta), }) diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index 4e487b41d4873..b0c159fdde7b1 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -349,7 +349,7 @@ impl SegmentCompactChecker { return vec![]; } - if self.total_block_count > 2 * self.block_threshold { + if self.total_block_count >= 2 * self.block_threshold { self.total_block_count = 0; let trivial = vec![(idx, segment)]; if self.segments.is_empty() { @@ -458,7 +458,7 @@ impl CompactTaskBuilder { self.blocks.push(block.clone()); (false, true) } else { - // blocks > 2N + // blocks >= 2N (true, !self.blocks.is_empty()) } } diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index bc3730e9a7b03..79484d4619cb8 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -49,6 +49,7 @@ use databend_storages_common_pruner::TopNPrunner; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ColumnStatistics; use 
databend_storages_common_table_meta::table::ChangeType; +use databend_storages_common_table_meta::table::ClusterType; use log::info; use opendal::Operator; use sha2::Digest; @@ -104,6 +105,14 @@ impl FuseTable { ); match snapshot { Some(snapshot) => { + let selected_segment = ctx.get_selected_segment_locations(); + let segment_locs = if !selected_segment.is_empty() { + selected_segment + } else { + snapshot.segments.clone() + }; + let segment_len = segment_locs.len(); + let snapshot_loc = self .meta_location_generator .snapshot_location_from_uuid(&snapshot.snapshot_id, snapshot.format_version)?; @@ -115,10 +124,10 @@ impl FuseTable { nodes_num = cluster.nodes.len(); } - if !dry_run && snapshot.segments.len() > nodes_num && distributed_pruning { - let mut segments = Vec::with_capacity(snapshot.segments.len()); - for (idx, segment_location) in snapshot.segments.iter().enumerate() { - segments.push(FuseLazyPartInfo::create(idx, segment_location.clone())) + if !dry_run && segment_len > nodes_num && distributed_pruning { + let mut segments = Vec::with_capacity(segment_locs.len()); + for (idx, segment_location) in segment_locs.into_iter().enumerate() { + segments.push(FuseLazyPartInfo::create(idx, segment_location)) } return Ok(( @@ -126,8 +135,8 @@ impl FuseTable { Some(snapshot_loc), snapshot.summary.row_count as usize, snapshot.summary.compressed_byte_size as usize, - snapshot.segments.len(), - snapshot.segments.len(), + segment_len, + segment_len, ), Partitions::create(PartitionsShuffleKind::Mod, segments), )); @@ -136,8 +145,7 @@ impl FuseTable { let snapshot_loc = Some(snapshot_loc); let table_schema = self.schema_with_stream(); let summary = snapshot.summary.block_count as usize; - let segments_location = - create_segment_location_vector(snapshot.segments.clone(), snapshot_loc); + let segments_location = create_segment_location_vector(segment_locs, snapshot_loc); self.prune_snapshot_blocks( ctx.clone(), @@ -471,31 +479,32 @@ impl FuseTable { None }; - let pruner = if !self.is_native() || self.cluster_key_meta.is_none() { - FusePruner::create( - &ctx, - dal, - table_schema.clone(), - &push_downs, - self.bloom_index_cols(), - bloom_index_builder, - self.get_storage_format(), - )? - } else { - let cluster_keys = self.cluster_keys(ctx.clone()); - - FusePruner::create_with_pages( - &ctx, - dal, - table_schema, - &push_downs, - self.cluster_key_meta.clone(), - cluster_keys, - self.bloom_index_cols(), - bloom_index_builder, - self.get_storage_format(), - )? - }; + let pruner = + if !self.is_native() || self.cluster_type().is_none_or(|v| v != ClusterType::Linear) { + FusePruner::create( + &ctx, + dal, + table_schema.clone(), + &push_downs, + self.bloom_index_cols(), + bloom_index_builder, + self.get_storage_format(), + )? + } else { + let cluster_keys = self.linear_cluster_keys(ctx.clone()); + + FusePruner::create_with_pages( + &ctx, + dal, + table_schema, + &push_downs, + self.cluster_key_meta.clone(), + cluster_keys, + self.bloom_index_cols(), + bloom_index_builder, + self.get_storage_format(), + )? 
+ }; Ok(pruner) } diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index f28e351c34b10..b4956c81c97a0 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -32,6 +32,7 @@ use databend_common_metrics::storage::metrics_inc_recluster_segment_nums_schedul use databend_common_sql::BloomIndexColumns; use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::TableSnapshot; +use databend_storages_common_table_meta::table::ClusterType; use log::warn; use opendal::Operator; @@ -61,14 +62,12 @@ impl FuseTable { ctx.set_status_info(status); } - if self.cluster_key_meta.is_none() { + let cluster_type = self.cluster_type(); + if cluster_type.is_none_or(|v| v != ClusterType::Linear) { return Ok(None); } - let snapshot_opt = self.read_table_snapshot().await?; - let snapshot = if let Some(val) = snapshot_opt { - val - } else { + let Some(snapshot) = self.read_table_snapshot().await? else { // no snapshot, no recluster. return Ok(None); }; diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index 1e52bc3578c47..f834f13055c22 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -508,9 +508,6 @@ impl FusePruner { let res = worker?; metas.extend(res); } - // Todo:: for now, all operation (contains other mutation other than delete, like select,update etc.) - // will get here, we can prevent other mutations like update and so on. - // TopN pruner. self.topn_pruning(metas) } diff --git a/src/query/storages/fuse/src/table_functions/clustering_information.rs b/src/query/storages/fuse/src/table_functions/clustering_information.rs index 1f153b2e1b1aa..ceb3e1748f810 100644 --- a/src/query/storages/fuse/src/table_functions/clustering_information.rs +++ b/src/query/storages/fuse/src/table_functions/clustering_information.rs @@ -189,7 +189,7 @@ impl<'a> ClusteringInformation<'a> { (cluster_key, exprs) } (Some(a), None) => { - let exprs = self.table.cluster_keys(self.ctx.clone()); + let exprs = self.table.linear_cluster_keys(self.ctx.clone()); let exprs = exprs .iter() .map(|k| k.as_expr(&BUILTIN_FUNCTIONS)) @@ -205,13 +205,17 @@ impl<'a> ClusteringInformation<'a> { } }; - let cluster_type = if default_cluster_key_id.is_some() { - self.table - .get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear) - .to_string() - } else { - "linear".to_string() - }; + if default_cluster_key_id.is_some() { + let typ = self + .table + .get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear); + if matches!(typ, ClusterType::Hilbert) { + return Err(ErrorCode::UnsupportedClusterType( + "Unsupported 'hilbert' type, Please use 'hilbert_clustering_information' instead" + )); + } + } + let cluster_type = "linear".to_string(); let snapshot = self.table.read_table_snapshot().await?; let now = Utc::now(); diff --git a/tests/sqllogictests/suites/mode/standalone/ee/explain_hilbert_clustering.test b/tests/sqllogictests/suites/mode/standalone/ee/explain_hilbert_clustering.test new file mode 100644 index 0000000000000..12fbf29fe9c89 --- /dev/null +++ b/tests/sqllogictests/suites/mode/standalone/ee/explain_hilbert_clustering.test @@ -0,0 +1,67 @@ +## Copyright 2023 Databend Cloud +## +## Licensed under the Elastic License, Version 2.0 (the "License"); +## you may not use this file except in compliance with the License. 
+## You may obtain a copy of the License at
+##
+##     https://www.elastic.co/licensing/elastic-license
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+
+statement ok
+CREATE OR REPLACE TABLE test_hilbert(a int, b int) cluster by hilbert(a,b) row_per_block = 2;
+
+statement ok
+INSERT INTO test_hilbert VALUES(1, 1), (1, 2);
+
+statement ok
+INSERT INTO test_hilbert VALUES(2, 1), (2, 2);
+
+statement ok
+ALTER TABLE test_hilbert RECLUSTER FINAL;
+
+statement error 4013
+select * from clustering_information('default','test_hilbert')
+
+query T
+EXPLAIN SELECT * FROM test_hilbert WHERE a = 1;
+----
+Filter
+├── output columns: [test_hilbert.a (#0), test_hilbert.b (#1)]
+├── filters: [is_true(test_hilbert.a (#0) = 1)]
+├── estimated rows: 2.00
+└── TableScan
+    ├── table: default.default.test_hilbert
+    ├── output columns: [a (#0), b (#1)]
+    ├── read rows: 4
+    ├── read size: < 1 KiB
+    ├── partitions total: 2
+    ├── partitions scanned: 2
+    ├── pruning stats: [segments: , blocks: ]
+    ├── push downs: [filters: [is_true(test_hilbert.a (#0) = 1)], limit: NONE]
+    └── estimated rows: 4.00
+
+query T
+EXPLAIN SELECT * FROM test_hilbert WHERE b = 1;
+----
+Filter
+├── output columns: [test_hilbert.a (#0), test_hilbert.b (#1)]
+├── filters: [is_true(test_hilbert.b (#1) = 1)]
+├── estimated rows: 2.00
+└── TableScan
+    ├── table: default.default.test_hilbert
+    ├── output columns: [a (#0), b (#1)]
+    ├── read rows: 2
+    ├── read size: < 1 KiB
+    ├── partitions total: 2
+    ├── partitions scanned: 1
+    ├── pruning stats: [segments: , blocks: ]
+    ├── push downs: [filters: [is_true(test_hilbert.b (#1) = 1)], limit: NONE]
+    └── estimated rows: 4.00
+
+statement ok
+DROP TABLE test_hilbert ALL;
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/clustering.test b/tests/sqllogictests/suites/mode/standalone/explain/clustering.test
index 0a20fe6dc65a4..12f8069645f34 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/clustering.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/clustering.test
@@ -54,61 +54,5 @@ Filter
     ├── push downs: [filters: [is_true(test_linear.b (#1) = 1)], limit: NONE]
     └── estimated rows: 4.00
 
-statement ok
-CREATE OR REPLACE TABLE test_hilbert(a int, b int) cluster by hilbert(a,b) row_per_block = 2;
-
-statement ok
-INSERT INTO test_hilbert VALUES(1, 1), (1, 2);
-
-statement ok
-INSERT INTO test_hilbert VALUES(2, 1), (2, 2);
-
-statement ok
-ALTER TABLE test_hilbert RECLUSTER FINAL;
-
-query TTIFFT
-select cluster_key, type, total_block_count, average_overlaps, average_depth, block_depth_histogram from clustering_information('default','test_hilbert')
-----
-(a, b) hilbert 2 0.0 1.0 {"00001":2}
-
-query T
-EXPLAIN SELECT * FROM test_hilbert WHERE a = 1;
-----
-Filter
-├── output columns: [test_hilbert.a (#0), test_hilbert.b (#1)]
-├── filters: [is_true(test_hilbert.a (#0) = 1)]
-├── estimated rows: 2.00
-└── TableScan
-    ├── table: default.default.test_hilbert
-    ├── output columns: [a (#0), b (#1)]
-    ├── read rows: 4
-    ├── read size: < 1 KiB
-    ├── partitions total: 2
-    ├── partitions scanned: 2
-    ├── pruning stats: [segments: , blocks: ]
-    ├── push downs: [filters: [is_true(test_hilbert.a (#0) = 1)], limit: NONE]
-    └── estimated rows: 4.00
-
-query T
-EXPLAIN SELECT * FROM test_hilbert WHERE b = 1;
-----
-Filter
-├── output columns: [test_hilbert.a (#0), test_hilbert.b (#1)]
-├── filters: [is_true(test_hilbert.b (#1) = 1)]
-├── estimated rows: 2.00
-└── TableScan
-    ├── table: default.default.test_hilbert
-    ├── output columns: [a (#0), b (#1)]
-    ├── read rows: 2
-    ├── read size: < 1 KiB
-    ├── partitions total: 2
-    ├── partitions scanned: 1
-    ├── pruning stats: [segments: , blocks: ]
-    ├── push downs: [filters: [is_true(test_hilbert.b (#1) = 1)], limit: NONE]
-    └── estimated rows: 4.00
-
 statement ok
 DROP TABLE test_linear ALL;
-
-statement ok
-DROP TABLE test_hilbert ALL;
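
The pruner hunks above are easier to follow with a standalone sketch. The following is a minimal sketch, not the Databend API: it only illustrates the dispatch rule that the patch encodes, namely that page-aware pruning with cluster keys is reserved for native-format tables whose cluster type is Linear, while Hilbert-clustered (or unclustered) tables fall back to the plain block pruner. `ClusterType`, `PrunerKind`, and `choose_pruner` are illustrative stand-ins; `Option::is_none_or` requires Rust 1.82+.

```rust
// Hypothetical stand-ins for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClusterType {
    Linear,
    Hilbert,
}

#[derive(Debug, PartialEq, Eq)]
enum PrunerKind {
    Plain,     // FusePruner::create in the diff
    WithPages, // FusePruner::create_with_pages in the diff
}

fn choose_pruner(is_native: bool, cluster_type: Option<ClusterType>) -> PrunerKind {
    // Mirrors `!self.is_native() || self.cluster_type().is_none_or(|v| v != ClusterType::Linear)`.
    if !is_native || cluster_type.is_none_or(|v| v != ClusterType::Linear) {
        PrunerKind::Plain
    } else {
        PrunerKind::WithPages
    }
}

fn main() {
    assert_eq!(choose_pruner(true, Some(ClusterType::Linear)), PrunerKind::WithPages);
    assert_eq!(choose_pruner(true, Some(ClusterType::Hilbert)), PrunerKind::Plain);
    assert_eq!(choose_pruner(true, None), PrunerKind::Plain);
    assert_eq!(choose_pruner(false, Some(ClusterType::Linear)), PrunerKind::Plain);
    println!("pruner dispatch behaves as expected");
}
```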
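The read_partitions hunk also changes which segments are scanned: when the context carries a pre-selected list of segment locations, only that subset is read; otherwise the full segment list from the snapshot is used. The sketch below is illustrative only; the `Location` alias and `effective_segments` helper are assumptions, not Databend types.

```rust
// Hypothetical segment-location type: (path, format version).
type Location = (String, u64);

// Mirrors the `if !selected_segment.is_empty() { selected_segment } else { snapshot.segments.clone() }` logic.
fn effective_segments(selected: Vec<Location>, snapshot_segments: &[Location]) -> Vec<Location> {
    if !selected.is_empty() {
        selected
    } else {
        snapshot_segments.to_vec()
    }
}

fn main() {
    let snapshot_segments = vec![
        ("1/seg_a".to_string(), 4),
        ("1/seg_b".to_string(), 4),
    ];

    // No explicit selection: fall back to every segment in the snapshot.
    assert_eq!(effective_segments(vec![], &snapshot_segments).len(), 2);

    // With a selection (e.g. a subset picked for a targeted operation), only that subset is read.
    let picked = vec![("1/seg_b".to_string(), 4)];
    assert_eq!(effective_segments(picked, &snapshot_segments).len(), 1);
}
```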
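The recluster.rs hunk also replaces an `if let ... else` dance with a single `let ... else` binding when reading the snapshot. A minimal sketch of that idiom, with `TableSnapshot` and the helpers as placeholders rather than the real async Databend API:

```rust
// Placeholder snapshot type for illustration.
struct TableSnapshot {
    segment_count: usize,
}

// Stand-in for the (async) snapshot read; pretend the table has one snapshot.
fn read_table_snapshot() -> Option<TableSnapshot> {
    Some(TableSnapshot { segment_count: 2 })
}

fn recluster_segment_count() -> Option<usize> {
    // let-else: bind the snapshot or bail out early, as in the patched recluster().
    let Some(snapshot) = read_table_snapshot() else {
        // no snapshot, no recluster.
        return None;
    };
    Some(snapshot.segment_count)
}

fn main() {
    assert_eq!(recluster_segment_count(), Some(2));
}
```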
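Finally, the `statement error 4013` expectation in the new test corresponds to the guard added to the `clustering_information` table function: Hilbert-clustered tables are rejected with the new `UnsupportedClusterType` code and pointed at `hilbert_clustering_information`. A minimal sketch of that check, assuming illustrative `ClusterType` and `ClusteringError` stand-ins rather than Databend's `ErrorCode`:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClusterType {
    Linear,
    Hilbert,
}

// Simplified error carrier; Databend uses ErrorCode::UnsupportedClusterType instead.
#[derive(Debug)]
struct ClusteringError {
    code: u16,
    message: String,
}

fn check_cluster_type(typ: ClusterType) -> Result<(), ClusteringError> {
    if typ == ClusterType::Hilbert {
        return Err(ClusteringError {
            code: 4013, // UnsupportedClusterType in the exception-code table
            message: "Unsupported 'hilbert' type. Please use 'hilbert_clustering_information' instead"
                .to_string(),
        });
    }
    Ok(())
}

fn main() {
    assert!(check_cluster_type(ClusterType::Linear).is_ok());
    let err = check_cluster_type(ClusterType::Hilbert).unwrap_err();
    assert_eq!(err.code, 4013);
    println!("hilbert tables are rejected: {}", err.message);
}
```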