Commit 059bb42: hilbert_clustering

zhyass committed Dec 30, 2024
1 parent 6560bf5

Showing 67 changed files with 1,543 additions and 577 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -176,6 +176,7 @@ databend-enterprise-attach-table = { path = "src/query/ee_features/attach_table" }
databend-enterprise-background-service = { path = "src/query/ee_features/background_service" }
databend-enterprise-data-mask-feature = { path = "src/query/ee_features/data_mask" }
databend-enterprise-fail-safe = { path = "src/query/ee_features/fail_safe" }
databend-enterprise-hilbert-clustering = { path = "src/query/ee_features/hilbert_clustering" }
databend-enterprise-inverted-index = { path = "src/query/ee_features/inverted_index" }
databend-enterprise-meta = { path = "src/meta/ee" }
databend-enterprise-query = { path = "src/query/ee" }
3 changes: 2 additions & 1 deletion src/common/exception/src/exception_code.rs
@@ -419,8 +419,9 @@ build_exceptions! {
// recluster error codes
NoNeedToRecluster(4011),
NoNeedToCompact(4012),
UnsupportedClusterType(4013),

- RefreshTableInfoFailure(4012),
+ RefreshTableInfoFailure(4021),
}

// Service errors [5001,6000].
14 changes: 12 additions & 2 deletions src/common/license/src/license.rs
@@ -73,6 +73,8 @@ pub enum Feature {
StorageQuota(StorageQuota),
#[serde(alias = "amend_table", alias = "AMEND_TABLE")]
AmendTable,
#[serde(alias = "hilbert_clustering", alias = "HILBERT_CLUSTERING")]
HilbertClustering,
#[serde(other)]
Unknown,
}
@@ -119,6 +121,7 @@ impl fmt::Display for Feature {
write!(f, ")")
}
Feature::AmendTable => write!(f, "amend_table"),
Feature::HilbertClustering => write!(f, "hilbert_clustering"),
Feature::Unknown => write!(f, "unknown"),
}
}
@@ -166,7 +169,8 @@ impl Feature {
| (Feature::InvertedIndex, Feature::InvertedIndex)
| (Feature::VirtualColumn, Feature::VirtualColumn)
| (Feature::AttacheTable, Feature::AttacheTable)
- | (Feature::StorageEncryption, Feature::StorageEncryption) => Ok(true),
+ | (Feature::StorageEncryption, Feature::StorageEncryption)
+ | (Feature::HilbertClustering, Feature::HilbertClustering) => Ok(true),
(_, _) => Ok(false),
}
}
@@ -334,6 +338,11 @@ mod tests {
serde_json::from_str::<Feature>("\"amend_table\"").unwrap()
);

assert_eq!(
Feature::HilbertClustering,
serde_json::from_str::<Feature>("\"hilbert_clustering\"").unwrap()
);

assert_eq!(
Feature::Unknown,
serde_json::from_str::<Feature>("\"ssss\"").unwrap()
@@ -367,11 +376,12 @@
storage_usage: Some(1),
}),
Feature::AmendTable,
Feature::HilbertClustering,
]),
};

assert_eq!(
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,background_service,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,inverted_index,license_info,storage_encryption,storage_quota(storage_usage: 1),stream,vacuum,virtual_column] }",
"LicenseInfo{ type: enterprise, org: databend, tenants: [databend_tenant,foo], features: [aggregate_index,amend_table,attach_table,background_service,compute_quota(threads_num: 1, memory_usage: 1),computed_column,data_mask,hilbert_clustering,inverted_index,license_info,storage_encryption,storage_quota(storage_usage: 1),stream,vacuum,virtual_column] }",
license_info.to_string()
);
}
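The serde aliases added above are what make both spellings of the feature name accepted when a license's feature list is deserialized, with anything unrecognized falling through to Unknown, as the tests in this hunk assert. A minimal self-contained illustration of the same mechanism (a standalone enum with serde and serde_json available, not the real Feature type):

use serde::Deserialize;

#[derive(Debug, PartialEq, Deserialize)]
enum Feature {
    #[serde(alias = "hilbert_clustering", alias = "HILBERT_CLUSTERING")]
    HilbertClustering,
    #[serde(other)]
    Unknown,
}

fn main() {
    // Both casings map to the same variant via the aliases.
    assert_eq!(
        serde_json::from_str::<Feature>("\"hilbert_clustering\"").unwrap(),
        Feature::HilbertClustering
    );
    assert_eq!(
        serde_json::from_str::<Feature>("\"HILBERT_CLUSTERING\"").unwrap(),
        Feature::HilbertClustering
    );
    // Unrecognized names fall through to Unknown via #[serde(other)].
    assert_eq!(
        serde_json::from_str::<Feature>("\"ssss\"").unwrap(),
        Feature::Unknown
    );
}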
8 changes: 8 additions & 0 deletions src/query/catalog/src/plan/partition.rs
@@ -391,3 +391,11 @@ impl ReclusterParts {
}
}
}

// TODO refine this
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
pub struct ReclusterInfoSideCar {
pub merged_blocks: Vec<Arc<BlockMeta>>,
pub removed_segment_indexes: Vec<usize>,
pub removed_statistics: Statistics,
}
25 changes: 25 additions & 0 deletions src/query/catalog/src/table.rs
@@ -19,6 +19,9 @@ use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use databend_common_ast::ast::Expr;
use databend_common_ast::parser::parse_comma_separated_exprs;
use databend_common_ast::parser::tokenize_sql;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
@@ -136,6 +139,28 @@ pub trait Table: Sync + Send {
Some(cluster_type)
}

fn resolve_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Option<Vec<Expr>> {
let Some((_, cluster_key_str)) = &self.cluster_key_meta() else {
return None;
};
let tokens = tokenize_sql(cluster_key_str).unwrap();
let sql_dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
let mut ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect).unwrap();
// unwrap tuple.
if ast_exprs.len() == 1 {
if let Expr::Tuple { exprs, .. } = &ast_exprs[0] {
ast_exprs = exprs.clone();
}
} else {
// Defensive check:
// `ast_exprs` should always contain one element which can be one of the following:
// 1. A tuple of composite cluster keys
// 2. A single cluster key
unreachable!("invalid cluster key ast expression, {:?}", ast_exprs);
}
Some(ast_exprs)
}

fn change_tracking_enabled(&self) -> bool {
false
}
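A note on resolve_cluster_keys above: the stored cluster key string is the raw CLUSTER BY text, so a composite key arrives as a single parenthesized tuple that the method unwraps. A minimal round-trip sketch using the parser API imported in this hunk (the key string and the Dialect choice are illustrative assumptions, not values from this commit):

use databend_common_ast::ast::Expr;
use databend_common_ast::parser::parse_comma_separated_exprs;
use databend_common_ast::parser::tokenize_sql;
use databend_common_ast::parser::Dialect;

fn main() {
    // A composite cluster key is stored as one tuple expression, e.g. "(id, name)".
    let tokens = tokenize_sql("(id, name)").unwrap();
    let exprs = parse_comma_separated_exprs(&tokens, Dialect::PostgreSQL).unwrap();
    // Parsing yields a single Expr::Tuple; resolve_cluster_keys clones its
    // elements out so callers see the individual key expressions.
    assert_eq!(exprs.len(), 1);
    if let Expr::Tuple { exprs: keys, .. } = &exprs[0] {
        assert_eq!(keys.len(), 2);
    }
}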
18 changes: 15 additions & 3 deletions src/query/catalog/src/table_context.rs
@@ -279,11 +279,23 @@ pub trait TableContext: Send + Sync {
max_files: Option<usize>,
) -> Result<FilteredCopyFiles>;

- fn add_segment_location(&self, segment_loc: Location) -> Result<()>;
+ fn add_inserted_segment_location(&self, segment_loc: Location) -> Result<()>;

- fn clear_segment_locations(&self) -> Result<()>;
+ fn clear_inserted_segment_locations(&self) -> Result<()>;

- fn get_segment_locations(&self) -> Result<Vec<Location>>;
+ fn get_inserted_segment_locations(&self) -> Result<Vec<Location>>;

fn add_selected_segment_location(&self, _segment_loc: Location) {
unimplemented!()
}

fn get_selected_segment_locations(&self) -> Vec<Location> {
unimplemented!()
}

fn clear_selected_segment_locations(&self) {
unimplemented!()
}

fn add_file_status(&self, file_path: &str, file_status: FileStatus) -> Result<()>;

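The three selected-segment methods above ship with unimplemented!() default bodies, so only the concrete context that drives Hilbert reclustering has to override them while every other TableContext implementor keeps compiling. A simplified, self-contained sketch of that pattern (the types and field names here are illustrative, not Databend's):

use std::sync::Mutex;

// Simplified stand-in for the real Location type.
type Location = (String, u64);

trait TableContext {
    // Default bodies let implementations that never recluster skip these.
    fn add_selected_segment_location(&self, _loc: Location) {
        unimplemented!()
    }
    fn get_selected_segment_locations(&self) -> Vec<Location> {
        unimplemented!()
    }
}

struct QueryContext {
    selected: Mutex<Vec<Location>>,
}

impl TableContext for QueryContext {
    fn add_selected_segment_location(&self, loc: Location) {
        self.selected.lock().unwrap().push(loc);
    }
    fn get_selected_segment_locations(&self) -> Vec<Location> {
        self.selected.lock().unwrap().clone()
    }
}

fn main() {
    let ctx = QueryContext { selected: Mutex::new(Vec::new()) };
    ctx.add_selected_segment_location(("1/seg_v4.mpk".to_string(), 4));
    assert_eq!(ctx.get_selected_segment_locations().len(), 1);
}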
1 change: 1 addition & 0 deletions src/query/ee/Cargo.toml
@@ -47,6 +47,7 @@ databend-enterprise-attach-table = { workspace = true }
databend-enterprise-background-service = { workspace = true }
databend-enterprise-data-mask-feature = { workspace = true }
databend-enterprise-fail-safe = { workspace = true }
databend-enterprise-hilbert-clustering = { workspace = true }
databend-enterprise-inverted-index = { workspace = true }
databend-enterprise-storage-encryption = { workspace = true }
databend-enterprise-storage-quota = { workspace = true }
2 changes: 2 additions & 0 deletions src/query/ee/src/enterprise_services.rs
@@ -21,6 +21,7 @@ use crate::attach_table::RealAttachTableHandler;
use crate::background_service::RealBackgroundService;
use crate::data_mask::RealDatamaskHandler;
use crate::fail_safe::RealFailSafeHandler;
use crate::hilbert_clustering::RealHilbertClusteringHandler;
use crate::inverted_index::RealInvertedIndexHandler;
use crate::license::license_mgr::RealLicenseManager;
use crate::storage_encryption::RealStorageEncryptionHandler;
@@ -45,6 +46,7 @@ impl EnterpriseServices {
RealInvertedIndexHandler::init()?;
RealStorageQuotaHandler::init(&cfg)?;
RealFailSafeHandler::init()?;
RealHilbertClusteringHandler::init()?;
Ok(())
}
}
178 changes: 178 additions & 0 deletions src/query/ee/src/hilbert_clustering/handler.rs
@@ -0,0 +1,178 @@
// Copyright 2023 Databend Cloud
//
// Licensed under the Elastic License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.elastic.co/licensing/elastic-license
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::GlobalInstance;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::plan::ReclusterInfoSideCar;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_storages_fuse::pruning::create_segment_location_vector;
use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::SegmentLocation;
use databend_common_storages_fuse::DEFAULT_BLOCK_PER_SEGMENT;
use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
use databend_enterprise_hilbert_clustering::HilbertClusteringHandler;
use databend_enterprise_hilbert_clustering::HilbertClusteringHandlerWrapper;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::Statistics;
use databend_storages_common_table_meta::meta::TableSnapshot;

pub struct RealHilbertClusteringHandler {}

#[async_trait::async_trait]
impl HilbertClusteringHandler for RealHilbertClusteringHandler {
#[async_backtrace::framed]
async fn do_hilbert_clustering(
&self,
table: Arc<dyn Table>,
ctx: Arc<dyn TableContext>,
push_downs: Option<PushDownInfo>,
) -> Result<Option<(ReclusterInfoSideCar, Arc<TableSnapshot>)>> {
let Some((cluster_key_id, _)) = table.cluster_key_meta() else {
return Ok(None);
};

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let Some(snapshot) = fuse_table.read_table_snapshot().await? else {
// no snapshot, no recluster.
return Ok(None);
};

let block_per_seg =
fuse_table.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT);
let block_thresholds = fuse_table.get_block_thresholds();
let thresholds = BlockThresholds {
max_rows_per_block: block_per_seg * block_thresholds.max_rows_per_block,
min_rows_per_block: block_per_seg * block_thresholds.min_rows_per_block,
max_bytes_per_block: block_per_seg * block_thresholds.max_bytes_per_block,
};
let segment_locations = snapshot.segments.clone();
let segment_locations = create_segment_location_vector(segment_locations, None);

let max_threads = ctx.get_settings().get_max_threads()? as usize;
let chunk_size = max_threads * 4;
let mut target_segments = vec![];
let mut total_rows = 0;
let mut total_size = 0;
let mut stable = false;
'FOR: for chunk in segment_locations.chunks(chunk_size) {
// read segments.
let compact_segments = FuseTable::segment_pruning(
&ctx,
fuse_table.schema_with_stream(),
fuse_table.get_operator(),
&push_downs,
fuse_table.get_storage_format(),
chunk.to_vec(),
)
.await?;

if compact_segments.is_empty() {
continue;
}

for (location, segment) in compact_segments.into_iter() {
total_rows += segment.summary.row_count as usize;
total_size += segment.summary.uncompressed_byte_size as usize;
if !thresholds.check_large_enough(total_rows, total_size) {
// totals < N
target_segments.push((location, segment.clone()));
continue;
}

if thresholds.check_for_compact(total_rows, total_size) {
// N <= totals < 2N
target_segments.push((location, segment.clone()));
} else {
// totals >= 2N
let new_target_segments = vec![(location, segment)];
if Self::check_for_recluster(&new_target_segments, cluster_key_id, stable) {
target_segments = new_target_segments;
stable = true;
break 'FOR;
}
};

if Self::check_for_recluster(&target_segments, cluster_key_id, stable) {
stable = true;
break 'FOR;
}
target_segments.clear();
total_rows = 0;
total_size = 0;
}
}

if !stable && !Self::check_for_recluster(&target_segments, cluster_key_id, stable) {
return Ok(None);
}

let rows_per_block = block_thresholds.calc_rows_per_block(total_size, total_rows) as u64;
let block_size = ctx.get_settings().get_max_block_size()?;
ctx.get_settings()
.set_max_block_size(rows_per_block.min(block_size))?;

let mut removed_statistics = Statistics::default();
let mut removed_segment_indexes = Vec::with_capacity(target_segments.len());
for (segment_loc, segment) in target_segments {
ctx.add_selected_segment_location(segment_loc.location);
removed_segment_indexes.push(segment_loc.segment_idx);
merge_statistics_mut(
&mut removed_statistics,
&segment.summary,
Some(cluster_key_id),
);
}

let recluster_info = ReclusterInfoSideCar {
merged_blocks: vec![],
removed_segment_indexes,
removed_statistics,
};
Ok(Some((recluster_info, snapshot)))
}
}

impl RealHilbertClusteringHandler {
pub fn init() -> Result<()> {
let handler = RealHilbertClusteringHandler {};
let wrapper = HilbertClusteringHandlerWrapper::new(Box::new(handler));
GlobalInstance::set(Arc::new(wrapper));
Ok(())
}

fn check_for_recluster(
segments: &[(SegmentLocation, Arc<CompactSegmentInfo>)],
default_cluster_id: u32,
stable: bool,
) -> bool {
match segments.len() {
0 => false,
1 => segments[0]
.1
.summary
.cluster_stats
.as_ref()
.map_or(true, |stats| {
stats.cluster_key_id != default_cluster_id || (stats.level != -1 && stable)
}),
_ => true,
}
}
}
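The selection loop in do_hilbert_clustering above buckets accumulated segments against a segment-level threshold N (block_per_seg times the per-block limits): totals below N keep accumulating, totals landing in [N, 2N) form a compact-sized batch worth reclustering, and a jump past 2N means the last segment alone is oversized and becomes the batch by itself. A self-contained sketch of that bucketing with illustrative numbers (not Databend's defaults, and simplified to row counts only, where the real code also checks byte sizes and cluster statistics):

fn main() {
    let block_per_seg = 1_000usize;
    let min_rows_per_block = 100_000usize;
    let n = block_per_seg * min_rows_per_block; // segment-batch threshold N

    let mut batch = Vec::new();
    let mut total_rows = 0usize;
    for (idx, seg_rows) in [40_000_000usize, 45_000_000, 30_000_000]
        .into_iter()
        .enumerate()
    {
        total_rows += seg_rows;
        if total_rows < n {
            batch.push(idx); // totals < N: keep accumulating
        } else if total_rows < 2 * n {
            batch.push(idx); // N <= totals < 2N: a compact-sized batch
            println!("recluster batch {:?}", batch);
            break;
        } else {
            // totals >= 2N: the segment that overshot forms a batch alone
            println!("recluster oversized segment {} by itself", idx);
            break;
        }
    }
}

With these numbers the three segments sum to 115M rows, which lands in [N, 2N), so all three are selected as one batch; the real handler then records their locations via ctx.add_selected_segment_location and returns them in the ReclusterInfoSideCar.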