Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jan 5, 2025
1 parent 1771fbd commit 8536f92
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 26 deletions.
6 changes: 5 additions & 1 deletion src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use databend_common_storage::StageFileInfo;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_table_meta::readers::snapshot_reader::TableSnapshotAccessor;
use databend_storages_common_table_meta::table::ClusterType;
use parking_lot::RwLock;

use crate::interpreters::common::check_deduplicate_label;
Expand Down Expand Up @@ -290,7 +291,10 @@ impl ReplaceInterpreter {
.ctx
.get_settings()
.get_replace_into_bloom_pruning_max_column_number()?;
let bloom_filter_column_indexes = if table.cluster_key_meta().is_some() {
let bloom_filter_column_indexes = if table
.cluster_type()
.is_some_and(|v| v == ClusterType::Linear)
{
fuse_table
.choose_bloom_filter_columns(
self.ctx.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/pipelines/builders/builder_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl PipelineBuilder {
self.main_pipeline.add_async_accumulating_transformer(|| {
let base_segments = if matches!(
plan.mutation_kind,
MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster(_)
MutationKind::Compact | MutationKind::Insert | MutationKind::Recluster
) {
vec![]
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use databend_common_storages_fuse::operations::TransformSerializeBlock;
use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_storages_common_table_meta::table::ClusterType;

use crate::pipelines::PipelineBuilder;

Expand Down Expand Up @@ -49,7 +48,7 @@ impl PipelineBuilder {
transform_output_port,
table,
ClusterStatsGenerator::default(),
MutationKind::Recluster(ClusterType::Hilbert),
MutationKind::Recluster,
)?;
proc.into_processor()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use databend_common_storages_factory::Table;
use databend_common_storages_fuse::operations::TransformSerializeBlock;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;
use databend_storages_common_table_meta::table::ClusterType;

use crate::pipelines::builders::SortPipelineBuilder;
use crate::pipelines::processors::TransformAddStreamColumns;
Expand Down Expand Up @@ -178,7 +177,7 @@ impl PipelineBuilder {
transform_output_port,
table,
cluster_stats_gen.clone(),
MutationKind::Recluster(ClusterType::Linear),
MutationKind::Recluster,
)?;
proc.into_processor()
})
Expand Down
8 changes: 2 additions & 6 deletions src/query/sql/src/executor/physical_plans/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::fmt::Formatter;

use databend_common_expression::types::DataType;
use databend_common_expression::Scalar;
use databend_storages_common_table_meta::table::ClusterType;

use crate::plans::UDFField;
use crate::plans::UDFType;
Expand Down Expand Up @@ -71,7 +70,7 @@ pub enum MutationKind {
Delete,
Update,
Replace,
Recluster(ClusterType),
Recluster,
Insert,
Compact,
MergeInto,
Expand All @@ -82,10 +81,7 @@ impl Display for MutationKind {
match self {
MutationKind::Delete => write!(f, "Delete"),
MutationKind::Insert => write!(f, "Insert"),
MutationKind::Recluster(cluster_type) => match cluster_type {
ClusterType::Linear => write!(f, "Recluster"),
ClusterType::Hilbert => write!(f, "Hilbert Recluster"),
},
MutationKind::Recluster => write!(f, "Recluster"),
MutationKind::Update => write!(f, "Update"),
MutationKind::Replace => write!(f, "Replace"),
MutationKind::Compact => write!(f, "Compact"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::TableInfo;
use databend_enterprise_hilbert_clustering::get_hilbert_clustering_handler;
use databend_storages_common_table_meta::table::ClusterType;

use crate::executor::physical_plans::CommitSink;
use crate::executor::physical_plans::CompactSource;
Expand Down Expand Up @@ -90,7 +89,7 @@ impl PhysicalPlanBuilder {
input: Box::new(plan),
table_info,
snapshot: Some(snapshot),
mutation_kind: MutationKind::Recluster(ClusterType::Hilbert),
mutation_kind: MutationKind::Recluster,
update_stream_meta: vec![],
merge_meta: false,
deduplicated_label: None,
Expand Down Expand Up @@ -139,7 +138,7 @@ impl PhysicalPlanBuilder {
input: Box::new(root),
table_info,
snapshot: Some(snapshot),
mutation_kind: MutationKind::Recluster(ClusterType::Linear),
mutation_kind: MutationKind::Recluster,
update_stream_meta: vec![],
merge_meta: false,
deduplicated_label: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl SnapshotGenerator for MutationGenerator {

if matches!(
self.mutation_kind,
MutationKind::Compact | MutationKind::Recluster(_)
MutationKind::Compact | MutationKind::Recluster
) {
// for compaction, a basic but very important verification:
// the number of rows should be the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
pub struct TableMutationAggregator {
ctx: Arc<dyn TableContext>,
schema: TableSchemaRef,
table_id: u64,
dal: Operator,
location_gen: TableMetaLocationGenerator,

thresholds: BlockThresholds,
block_per_seg: usize,
default_cluster_key_id: Option<u32>,

default_cluster_key_id: Option<u32>,
base_segments: Vec<Location>,
// Used for recluster.
recluster_merged_blocks: Vec<Arc<BlockMeta>>,
set_hilbert_level: bool,

mutations: HashMap<SegmentIndex, BlockMutations>,
appended_segments: Vec<Location>,
Expand All @@ -83,7 +84,6 @@ pub struct TableMutationAggregator {
kind: MutationKind,
start_time: Instant,
finished_tasks: usize,
table_id: u64,
}

// takes in table mutation logs and aggregates them (former mutation_transform)
Expand Down Expand Up @@ -115,7 +115,7 @@ impl AsyncAccumulatingTransform for TableMutationAggregator {
},
self.schema.clone(),
)),
MutationKind::Recluster(_) => self.apply_recluster(&mut new_segment_locs).await?,
MutationKind::Recluster => self.apply_recluster(&mut new_segment_locs).await?,
_ => self.apply_mutation(&mut new_segment_locs).await?,
};

Expand All @@ -137,13 +137,24 @@ impl TableMutationAggregator {
removed_statistics: Statistics,
kind: MutationKind,
) -> Self {
let set_hilbert_level = table
.cluster_type()
.is_some_and(|v| matches!(v, ClusterType::Hilbert))
&& matches!(
kind,
MutationKind::Delete
| MutationKind::MergeInto
| MutationKind::Replace
| MutationKind::Recluster
);
TableMutationAggregator {
ctx,
schema: table.schema(),
dal: table.get_operator(),
location_gen: table.meta_location_generator().clone(),
thresholds: table.get_block_thresholds(),
default_cluster_key_id: table.cluster_key_id(),
set_hilbert_level,
block_per_seg: table
.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT),
mutations: HashMap::new(),
Expand Down Expand Up @@ -262,6 +273,7 @@ impl TableMutationAggregator {
let default_cluster_key = Some(default_cluster_key_id);
let thresholds = self.thresholds;
let block_per_seg = self.block_per_seg;
let set_hilbert_level = self.set_hilbert_level;
let kind = self.kind;
for chunk in &merged_blocks.into_iter().chunks(chunk_size) {
let new_blocks = chunk.collect::<Vec<_>>();
Expand All @@ -279,6 +291,7 @@ impl TableMutationAggregator {
all_perfect,
block_per_seg,
kind,
set_hilbert_level,
)
.await
});
Expand Down Expand Up @@ -413,17 +426,19 @@ impl TableMutationAggregator {
let thresholds = self.thresholds;
let default_cluster_key_id = self.default_cluster_key_id;
let block_per_seg = self.block_per_seg;
let kind = self.kind;
let set_hilbert_level = self.set_hilbert_level;
let mut tasks = Vec::with_capacity(segment_indices.len());
for index in segment_indices {
let segment_mutation = self.mutations.remove(&index).unwrap();
let location = self.base_segments.get(index).cloned();
let schema = self.schema.clone();
let op = self.dal.clone();
let location_gen = self.location_gen.clone();
let kind = self.kind;

let mut all_perfect = false;
tasks.push(async move {
let mut all_perfect = false;
let mut set_level = false;
let (new_blocks, origin_summary) = if let Some(loc) = location {
// read the old segment
let compact_segment_info =
Expand Down Expand Up @@ -453,6 +468,12 @@ impl TableMutationAggregator {

// assign back the mutated blocks to segment
let new_blocks = block_editor.into_values().collect::<Vec<_>>();
set_level = set_hilbert_level
&& segment_info
.summary
.cluster_stats
.as_ref()
.is_some_and(|v| v.cluster_key_id == default_cluster_key_id.unwrap());
(new_blocks, Some(segment_info.summary))
} else {
// use by compact.
Expand All @@ -478,6 +499,7 @@ impl TableMutationAggregator {
all_perfect,
block_per_seg,
kind,
set_level,
)
.await?;

Expand Down Expand Up @@ -551,6 +573,7 @@ async fn write_segment(
all_perfect: bool,
block_per_seg: usize,
kind: MutationKind,
set_hilbert_level: bool,
) -> Result<(String, Statistics)> {
let location = location_gen.gen_segment_info_location();
let mut new_summary = reduce_block_metas(&blocks, thresholds, default_cluster_key);
Expand All @@ -564,9 +587,13 @@ async fn write_segment(
new_summary.perfect_block_count = new_summary.block_count;
}
}
if matches!(kind, MutationKind::Recluster(ClusterType::Hilbert)) {
assert!(new_summary.cluster_stats.is_none());
let level = if new_summary.block_count >= block_per_seg as u64 {
if set_hilbert_level {
debug_assert!(new_summary.cluster_stats.is_none());
let level = if new_summary.block_count >= block_per_seg as u64
&& (new_summary.row_count as usize >= block_per_seg * thresholds.min_rows_per_block
|| new_summary.uncompressed_byte_size as usize
>= block_per_seg * thresholds.max_bytes_per_block)
{
-1
} else {
0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl Processor for TransformSerializeBlock {
}
}

if matches!(self.kind, MutationKind::Recluster(_)) {
if matches!(self.kind, MutationKind::Recluster) {
Self::mutation_logs(MutationLogEntry::ReclusterAppendBlock {
block_meta: Arc::new(block_meta),
})
Expand Down

0 comments on commit 8536f92

Please sign in to comment.