diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs
index 65084b30ac97..18c1e6eed442 100644
--- a/src/query/service/src/interpreters/interpreter_replace.rs
+++ b/src/query/service/src/interpreters/interpreter_replace.rs
@@ -47,6 +47,7 @@ use databend_common_storage::StageFileInfo;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::FuseTable;
 use databend_storages_common_table_meta::readers::snapshot_reader::TableSnapshotAccessor;
+use databend_storages_common_table_meta::table::ClusterType;
 use parking_lot::RwLock;

 use crate::interpreters::common::check_deduplicate_label;
@@ -290,7 +291,10 @@ impl ReplaceInterpreter {
             .ctx
             .get_settings()
             .get_replace_into_bloom_pruning_max_column_number()?;
-        let bloom_filter_column_indexes = if table.cluster_key_meta().is_some() {
+        let bloom_filter_column_indexes = if table
+            .cluster_type()
+            .is_some_and(|v| v == ClusterType::Linear)
+        {
             fuse_table
                 .choose_bloom_filter_columns(
                     self.ctx.clone(),
diff --git a/src/query/service/src/pipelines/builders/builder_commit.rs b/src/query/service/src/pipelines/builders/builder_commit.rs
index 53bb53140e7b..e4becfd76f55 100644
--- a/src/query/service/src/pipelines/builders/builder_commit.rs
+++ b/src/query/service/src/pipelines/builders/builder_commit.rs
@@ -40,7 +40,7 @@ impl PipelineBuilder {
         self.main_pipeline.add_async_accumulating_transformer(|| {
             let base_segments = if matches!(
                 plan.mutation_kind,
-                MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster(_)
+                MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster
             ) {
                 vec![]
             } else {
diff --git a/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs b/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs
index b9bb69cca7f6..d931544037aa 100644
--- a/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs
+++ b/src/query/service/src/pipelines/builders/builder_hilbert_serialize.rs
@@ -21,7 +21,6 @@ use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
-use databend_storages_common_table_meta::table::ClusterType;

 use crate::pipelines::PipelineBuilder;

@@ -49,7 +48,7 @@ impl PipelineBuilder {
                 transform_output_port,
                 table,
                 ClusterStatsGenerator::default(),
-                MutationKind::Recluster(ClusterType::Hilbert),
+                MutationKind::Recluster,
             )?;
             proc.into_processor()
         })
diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs
index 82c5938c6f62..f57e149feab6 100644
--- a/src/query/service/src/pipelines/builders/builder_recluster.rs
+++ b/src/query/service/src/pipelines/builders/builder_recluster.rs
@@ -34,7 +34,6 @@ use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
-use databend_storages_common_table_meta::table::ClusterType;

 use crate::pipelines::builders::SortPipelineBuilder;
 use crate::pipelines::processors::TransformAddStreamColumns;
@@ -178,7 +177,7 @@ impl PipelineBuilder {
                         transform_output_port,
                         table,
                         cluster_stats_gen.clone(),
-                        MutationKind::Recluster(ClusterType::Linear),
+                        MutationKind::Recluster,
                     )?;
                     proc.into_processor()
                 })
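The interpreter_replace.rs change above narrows the REPLACE INTO bloom-pruning gate from "the table has any cluster key" to "the table has a linear cluster key", so Hilbert-clustered tables no longer derive bloom-filter columns from their cluster key. A minimal, compilable sketch of the new predicate, using trimmed-down stand-ins for databend's types (`TableStub` and `prune_bloom_columns_by_cluster_key` are hypothetical names, not part of the patch):

```rust
// Stand-ins for databend_storages_common_table_meta::table::ClusterType
// and the table's cluster_type() accessor.
#[derive(Clone, Copy, PartialEq)]
enum ClusterType {
    Linear,
    Hilbert,
}

struct TableStub {
    cluster_type: Option<ClusterType>,
}

impl TableStub {
    fn cluster_type(&self) -> Option<ClusterType> {
        self.cluster_type
    }
}

// Cluster-key-driven bloom column selection now requires a *linear* key;
// Hilbert-clustered and unclustered tables both take the other branch.
fn prune_bloom_columns_by_cluster_key(table: &TableStub) -> bool {
    table
        .cluster_type()
        .is_some_and(|v| v == ClusterType::Linear)
}

fn main() {
    let linear = TableStub { cluster_type: Some(ClusterType::Linear) };
    let hilbert = TableStub { cluster_type: Some(ClusterType::Hilbert) };
    let none = TableStub { cluster_type: None };
    assert!(prune_bloom_columns_by_cluster_key(&linear));
    assert!(!prune_bloom_columns_by_cluster_key(&hilbert)); // previously true
    assert!(!prune_bloom_columns_by_cluster_key(&none));
}
```

The old `cluster_key_meta().is_some()` test also passed for Hilbert tables; the `// previously true` case is the behavioral difference this hunk introduces for them.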
diff --git a/src/query/sql/src/executor/physical_plans/common.rs b/src/query/sql/src/executor/physical_plans/common.rs
index 3dbc09ba2daa..230b5f2af50d 100644
--- a/src/query/sql/src/executor/physical_plans/common.rs
+++ b/src/query/sql/src/executor/physical_plans/common.rs
@@ -17,7 +17,6 @@ use std::fmt::Formatter;

 use databend_common_expression::types::DataType;
 use databend_common_expression::Scalar;
-use databend_storages_common_table_meta::table::ClusterType;

 use crate::plans::UDFField;
 use crate::plans::UDFType;
@@ -71,7 +70,7 @@ pub enum MutationKind {
     Delete,
     Update,
     Replace,
-    Recluster(ClusterType),
+    Recluster,
     Insert,
     Compact,
     MergeInto,
@@ -82,10 +81,7 @@ impl Display for MutationKind {
         match self {
             MutationKind::Delete => write!(f, "Delete"),
             MutationKind::Insert => write!(f, "Insert"),
-            MutationKind::Recluster(cluster_type) => match cluster_type {
-                ClusterType::Linear => write!(f, "Recluster"),
-                ClusterType::Hilbert => write!(f, "Hilbert Recluster"),
-            },
+            MutationKind::Recluster => write!(f, "Recluster"),
             MutationKind::Update => write!(f, "Update"),
             MutationKind::Replace => write!(f, "Replace"),
             MutationKind::Compact => write!(f, "Compact"),
diff --git a/src/query/sql/src/executor/physical_plans/physical_recluster.rs b/src/query/sql/src/executor/physical_plans/physical_recluster.rs
index 75b1d07815eb..a82d2882d2a0 100644
--- a/src/query/sql/src/executor/physical_plans/physical_recluster.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_recluster.rs
@@ -21,7 +21,6 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_meta_app::schema::TableInfo;
 use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
-use databend_storages_common_table_meta::table::ClusterType;

 use crate::executor::physical_plans::CommitSink;
 use crate::executor::physical_plans::CompactSource;
@@ -90,7 +89,7 @@ impl PhysicalPlanBuilder {
                 input: Box::new(plan),
                 table_info,
                 snapshot: Some(snapshot),
-                mutation_kind: MutationKind::Recluster(ClusterType::Hilbert),
+                mutation_kind: MutationKind::Recluster,
                 update_stream_meta: vec![],
                 merge_meta: false,
                 deduplicated_label: None,
@@ -139,7 +138,7 @@ impl PhysicalPlanBuilder {
                 input: Box::new(root),
                 table_info,
                 snapshot: Some(snapshot),
-                mutation_kind: MutationKind::Recluster(ClusterType::Linear),
+                mutation_kind: MutationKind::Recluster,
                 update_stream_meta: vec![],
                 merge_meta: false,
                 deduplicated_label: None,
diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs
index a249c1a08f14..2235d87c5e4f 100644
--- a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs
+++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs
@@ -101,7 +101,7 @@ impl SnapshotGenerator for MutationGenerator {

                 if matches!(
                     self.mutation_kind,
-                    MutationKind::Compact | MutationKind::Recluster(_)
+                    MutationKind::Compact | MutationKind::Recluster
                 ) {
                     // for compaction, a basic but very important verification:
                     // the number of rows should be the same
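The common.rs hunk above is the pivot of this refactor: with the `ClusterType` payload gone, `MutationKind::Recluster` becomes a unit variant, every `Recluster(_)` / `Recluster(ClusterType::…)` pattern in the surrounding files collapses to the bare name, and the special "Hilbert Recluster" display label disappears. A compilable sketch of the simplified enum (the `Debug`-based fallback arm is shorthand for brevity; the actual patch keeps one explicit `write!` arm per variant):

```rust
use std::fmt::{Display, Formatter};

// Mirror of the post-patch enum: Recluster no longer carries a ClusterType.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug)]
enum MutationKind {
    Delete,
    Update,
    Replace,
    Recluster, // formerly Recluster(ClusterType)
    Insert,
    Compact,
    MergeInto,
}

impl Display for MutationKind {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            // Linear and Hilbert reclustering now share a single label.
            MutationKind::Recluster => write!(f, "Recluster"),
            other => write!(f, "{other:?}"),
        }
    }
}

fn main() {
    let kind = MutationKind::Recluster;
    // Call sites match the bare variant; no binding or wildcard needed.
    assert!(matches!(kind, MutationKind::Compact | MutationKind::Recluster));
    assert_eq!(kind.to_string(), "Recluster");
}
```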
diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs
index db26f5f330af..7e44a6573e46 100644
--- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs
+++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs
@@ -63,16 +63,17 @@ use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 pub struct TableMutationAggregator {
     ctx: Arc<dyn TableContext>,
     schema: TableSchemaRef,
+    table_id: u64,
     dal: Operator,
     location_gen: TableMetaLocationGenerator,
-
     thresholds: BlockThresholds,
     block_per_seg: usize,
-    default_cluster_key_id: Option<u64>,
+    default_cluster_key_id: Option<u32>,
     base_segments: Vec<Location>,

     // Used for recluster.
     recluster_merged_blocks: Vec<Arc<BlockMeta>>,
+    set_hilbert_level: bool,

     mutations: HashMap<SegmentIndex, BlockMutations>,
     appended_segments: Vec<Location>,
@@ -83,7 +84,6 @@ pub struct TableMutationAggregator {
     kind: MutationKind,
     start_time: Instant,
     finished_tasks: usize,
-    table_id: u64,
 }

 // takes in table mutation logs and aggregates them (former mutation_transform)
@@ -115,7 +115,7 @@ impl AsyncAccumulatingTransform for TableMutationAggregator {
                 },
                 self.schema.clone(),
             )),
-            MutationKind::Recluster(_) => self.apply_recluster(&mut new_segment_locs).await?,
+            MutationKind::Recluster => self.apply_recluster(&mut new_segment_locs).await?,
             _ => self.apply_mutation(&mut new_segment_locs).await?,
         };

@@ -137,6 +137,16 @@ impl TableMutationAggregator {
         removed_statistics: Statistics,
         kind: MutationKind,
     ) -> Self {
+        let set_hilbert_level = table
+            .cluster_type()
+            .is_some_and(|v| matches!(v, ClusterType::Hilbert))
+            && matches!(
+                kind,
+                MutationKind::Delete
+                    | MutationKind::MergeInto
+                    | MutationKind::Replace
+                    | MutationKind::Recluster
+            );
         TableMutationAggregator {
             ctx,
             schema: table.schema(),
@@ -144,6 +154,7 @@ impl TableMutationAggregator {
             location_gen: table.meta_location_generator().clone(),
             thresholds: table.get_block_thresholds(),
             default_cluster_key_id: table.cluster_key_id(),
+            set_hilbert_level,
             block_per_seg: table
                 .get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT),
             mutations: HashMap::new(),
@@ -262,6 +273,7 @@ impl TableMutationAggregator {
         let default_cluster_key = Some(default_cluster_key_id);
         let thresholds = self.thresholds;
         let block_per_seg = self.block_per_seg;
+        let set_hilbert_level = self.set_hilbert_level;
         let kind = self.kind;
         for chunk in &merged_blocks.into_iter().chunks(chunk_size) {
             let new_blocks = chunk.collect::<Vec<_>>();
@@ -279,6 +291,7 @@ impl TableMutationAggregator {
                     all_perfect,
                     block_per_seg,
                     kind,
+                    set_hilbert_level,
                 )
                 .await
             });
@@ -413,6 +426,8 @@ impl TableMutationAggregator {
         let thresholds = self.thresholds;
         let default_cluster_key_id = self.default_cluster_key_id;
         let block_per_seg = self.block_per_seg;
+        let kind = self.kind;
+        let set_hilbert_level = self.set_hilbert_level;
         let mut tasks = Vec::with_capacity(segment_indices.len());
         for index in segment_indices {
             let segment_mutation = self.mutations.remove(&index).unwrap();
@@ -420,10 +435,10 @@ impl TableMutationAggregator {
             let schema = self.schema.clone();
             let op = self.dal.clone();
             let location_gen = self.location_gen.clone();
-            let kind = self.kind;
-            let mut all_perfect = false;

             tasks.push(async move {
+                let mut all_perfect = false;
+                let mut set_level = false;
                 let (new_blocks, origin_summary) = if let Some(loc) = location {
                     // read the old segment
                     let compact_segment_info =
@@ -453,6 +468,12 @@ impl TableMutationAggregator {

                     // assign back the mutated blocks to segment
                     let new_blocks = block_editor.into_values().collect::<Vec<_>>();
+                    set_level = set_hilbert_level
+                        && segment_info
+                            .summary
+                            .cluster_stats
+                            .as_ref()
+                            .is_some_and(|v| v.cluster_key_id == default_cluster_key_id.unwrap());
                     (new_blocks, Some(segment_info.summary))
                 } else {
                     // use by compact.
@@ -478,6 +499,7 @@ impl TableMutationAggregator {
                     all_perfect,
                     block_per_seg,
                     kind,
+                    set_level,
                 )
                 .await?;

@@ -551,6 +573,7 @@ async fn write_segment(
     all_perfect: bool,
     block_per_seg: usize,
     kind: MutationKind,
+    set_hilbert_level: bool,
 ) -> Result<(String, Statistics)> {
     let location = location_gen.gen_segment_info_location();
     let mut new_summary = reduce_block_metas(&blocks, thresholds, default_cluster_key);
@@ -564,9 +587,13 @@ async fn write_segment(
             new_summary.perfect_block_count = new_summary.block_count;
         }
     }
-    if matches!(kind, MutationKind::Recluster(ClusterType::Hilbert)) {
-        assert!(new_summary.cluster_stats.is_none());
-        let level = if new_summary.block_count >= block_per_seg as u64 {
+    if set_hilbert_level {
+        debug_assert!(new_summary.cluster_stats.is_none());
+        let level = if new_summary.block_count >= block_per_seg as u64
+            && (new_summary.row_count as usize >= block_per_seg * thresholds.min_rows_per_block
+                || new_summary.uncompressed_byte_size as usize
+                    >= block_per_seg * thresholds.max_bytes_per_block)
+        {
             -1
         } else {
             0
diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
index ff60336ac00c..5064f96e729c 100644
--- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
+++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs
@@ -336,7 +336,7 @@ impl Processor for TransformSerializeBlock {
             }
         }

-        if matches!(self.kind, MutationKind::Recluster(_)) {
+        if matches!(self.kind, MutationKind::Recluster) {
             Self::mutation_logs(MutationLogEntry::ReclusterAppendBlock {
                 block_meta: Arc::new(block_meta),
             })
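Taken together, the transform_mutation_aggregator.rs hunks relocate the "should this segment carry a Hilbert level?" decision from the `MutationKind` payload to the table itself, in two stages: the constructor computes `set_hilbert_level` once (Hilbert cluster type plus a level-preserving mutation kind), the mutation path additionally requires that a segment's existing cluster stats match the current cluster key (the recluster path passes the flag through as-is), and `write_segment` now grades a segment `-1` only when it is full by block count *and* by row or byte volume, where the old code looked at block count alone. A self-contained sketch of the two predicates with stand-in types (`BlockThresholds` fields and the numbers in `main` are illustrative, not databend defaults):

```rust
#[derive(Clone, Copy, PartialEq)]
enum ClusterType { Linear, Hilbert }

#[allow(dead_code)]
#[derive(Clone, Copy)]
enum MutationKind { Delete, Update, Replace, Recluster, Insert, Compact, MergeInto }

struct BlockThresholds {
    min_rows_per_block: usize,
    max_bytes_per_block: usize,
}

// Stage 1 (constructor): only Hilbert tables, and only mutations that must
// preserve clustering information, opt in to level maintenance.
fn set_hilbert_level(cluster_type: Option<ClusterType>, kind: MutationKind) -> bool {
    cluster_type == Some(ClusterType::Hilbert)
        && matches!(
            kind,
            MutationKind::Delete
                | MutationKind::MergeInto
                | MutationKind::Replace
                | MutationKind::Recluster
        )
}

// Stage 2 (write_segment): -1 marks a segment that reached full capacity by
// block count and by rows or bytes; anything smaller stays at level 0.
fn hilbert_level(
    block_count: u64,
    row_count: usize,
    uncompressed_bytes: usize,
    block_per_seg: usize,
    thresholds: &BlockThresholds,
) -> i32 {
    if block_count >= block_per_seg as u64
        && (row_count >= block_per_seg * thresholds.min_rows_per_block
            || uncompressed_bytes >= block_per_seg * thresholds.max_bytes_per_block)
    {
        -1
    } else {
        0
    }
}

fn main() {
    assert!(set_hilbert_level(Some(ClusterType::Hilbert), MutationKind::Replace));
    assert!(!set_hilbert_level(Some(ClusterType::Linear), MutationKind::Replace));
    assert!(!set_hilbert_level(Some(ClusterType::Hilbert), MutationKind::Compact));

    let t = BlockThresholds { min_rows_per_block: 1_000, max_bytes_per_block: 1 << 20 };
    // Enough blocks, but far too few rows and bytes: keep level 0 so the
    // segment remains a candidate for further Hilbert reclustering.
    assert_eq!(hilbert_level(100, 5_000, 4 << 10, 100, &t), 0);
    // Full by blocks and by rows: graded -1.
    assert_eq!(hilbert_level(100, 200_000, 4 << 10, 100, &t), -1);
}
```

The extra row/byte clause is the substantive fix here: under the old block-count-only test, a segment assembled from many undersized blocks would be graded `-1` as if it were full, despite holding far less than a full segment's worth of data.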