Commit 6ba61fd

fix
zhyass committed Jan 2, 2025
1 parent 595cff9 commit 6ba61fd
Showing 8 changed files with 138 additions and 154 deletions.
14 changes: 9 additions & 5 deletions src/query/service/src/interpreters/hook/compact_hook.rs
@@ -197,23 +197,27 @@ async fn compact_table(
     // do recluster.
     if let Some(cluster_type) = table.cluster_type() {
         if cluster_type == ClusterType::Linear {
+            // evict the table from cache
+            ctx.evict_table_from_cache(
+                &compact_target.catalog,
+                &compact_target.database,
+                &compact_target.table,
+            )?;
             let recluster = RelOperator::Recluster(Recluster {
                 catalog: compact_target.catalog,
                 database: compact_target.database,
                 table: compact_target.table,
-                filters: None,
                 limit: Some(settings.get_auto_compaction_segments_limit()? as usize),
-                cluster_type: ClusterType::Linear,
-                metadata: Default::default(),
-                bind_context: Default::default(),
+                filters: None,
+                hilbert_stmt: None,
             });
             let s_expr = SExpr::create_leaf(Arc::new(recluster));
             let recluster_interpreter =
                 ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?;
             // Recluster will be done in `ReclusterTableInterpreter::execute2` directly,
             // we do not need to use `PipelineCompleteExecutor` to execute it.
             let build_res = recluster_interpreter.execute2().await?;
-            assert!(build_res.main_pipeline.is_empty());
+            debug_assert!(build_res.main_pipeline.is_empty());
         }
     }
 }
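For readers skimming the diff, the hunk's shape is: evict stale table metadata from cache, build a Recluster plan operator as a single leaf expression, and let the interpreter run it to completion by itself. The sketch below mirrors that shape with simplified stand-in types (this Recluster, RelOperator, and SExpr are hypothetical minimal versions, not Databend's real definitions):

```rust
use std::sync::Arc;

// Stand-in for the plan struct after this commit: cluster_type/metadata/
// bind_context are gone; hilbert_stmt: None selects the linear path.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct Recluster {
    catalog: String,
    database: String,
    table: String,
    limit: Option<usize>,
    hilbert_stmt: Option<String>,
}

#[derive(Debug)]
enum RelOperator {
    Recluster(Recluster),
}

// A leaf SExpr is just an operator with no children.
struct SExpr {
    plan: Arc<RelOperator>,
    children: Vec<SExpr>,
}

impl SExpr {
    fn create_leaf(plan: Arc<RelOperator>) -> Self {
        SExpr { plan, children: vec![] }
    }
}

fn main() {
    let recluster = RelOperator::Recluster(Recluster {
        catalog: "default".to_string(),
        database: "db".to_string(),
        table: "t".to_string(),
        limit: Some(16),
        hilbert_stmt: None, // linear recluster, as in the auto-compaction hook
    });
    let s_expr = SExpr::create_leaf(Arc::new(recluster));
    assert!(s_expr.children.is_empty());
    println!("built leaf plan: {:?}", s_expr.plan);
}
```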
21 changes: 12 additions & 9 deletions src/query/service/src/interpreters/interpreter_table_recluster.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
 use std::sync::Arc;
 use std::time::Duration;
 use std::time::SystemTime;
@@ -22,6 +23,7 @@ use databend_common_exception::Result;
 use databend_common_sql::executor::PhysicalPlanBuilder;
 use databend_common_sql::optimizer::SExpr;
 use databend_common_sql::plans::Recluster;
+use databend_common_sql::MetadataRef;
 use log::error;
 use log::warn;
 
Expand Down Expand Up @@ -77,6 +79,7 @@ impl Interpreter for ReclusterTableInterpreter {
let mut times = 0;
let start = SystemTime::now();
let timeout = Duration::from_secs(recluster_timeout_secs);
let plan: Recluster = self.s_expr.plan().clone().try_into()?;
loop {
if let Err(err) = ctx.check_aborting() {
error!(
@@ -85,7 +88,7 @@ impl Interpreter for ReclusterTableInterpreter {
                 return Err(err.with_context("failed to execute"));
             }
 
-            let res = self.execute_recluster().await;
+            let res = self.execute_recluster(plan.clone()).await;
 
             match res {
                 Ok(is_break) => {
Expand Down Expand Up @@ -132,16 +135,19 @@ impl Interpreter for ReclusterTableInterpreter {
);
break;
}

self.ctx.clear_selected_segment_locations();
self.ctx
.evict_table_from_cache(&plan.catalog, &plan.database, &plan.table)?;
}

Ok(PipelineBuildResult::create())
}
}

impl ReclusterTableInterpreter {
async fn execute_recluster(&self) -> Result<bool> {
async fn execute_recluster(&self, plan: Recluster) -> Result<bool> {
let start = SystemTime::now();
let plan: Recluster = self.s_expr.plan().clone().try_into()?;

// try to add lock table.
let lock_guard = self
@@ -150,11 +156,8 @@ impl ReclusterTableInterpreter {
             .acquire_table_lock(&plan.catalog, &plan.database, &plan.table, &self.lock_opt)
             .await?;
 
-        let mut builder = PhysicalPlanBuilder::new(plan.metadata.clone(), self.ctx.clone(), false);
-        let physical_plan = match builder
-            .build(&self.s_expr, plan.bind_context.column_set())
-            .await
-        {
+        let mut builder = PhysicalPlanBuilder::new(MetadataRef::default(), self.ctx.clone(), false);
+        let physical_plan = match builder.build(&self.s_expr, HashSet::new()).await {
             Ok(res) => res,
             Err(e) => {
                 return if e.code() == ErrorCode::NO_NEED_TO_RECLUSTER {
@@ -167,7 +170,7 @@ impl ReclusterTableInterpreter {
 
         let mut build_res =
             build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
-        assert!(build_res.main_pipeline.is_complete_pipeline()?);
+        debug_assert!(build_res.main_pipeline.is_complete_pipeline()?);
 
         let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
         build_res.set_max_threads(max_threads);
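The net effect of these hunks: the Recluster plan is extracted once before the retry loop, execute_recluster receives it by value, and each iteration ends by clearing selected segment locations and evicting the table from cache so the next attempt plans against a fresh snapshot. A minimal, runnable sketch of that loop, with stand-in functions in place of Databend's context and executor APIs:

```rust
use std::time::{Duration, SystemTime};

// Stand-in for ReclusterTableInterpreter::execute_recluster, which returns
// Ok(true) once there is nothing left to recluster.
fn execute_recluster(attempt: u32) -> Result<bool, String> {
    Ok(attempt >= 3) // pretend the third retry finds no more work
}

fn main() -> Result<(), String> {
    let start = SystemTime::now();
    let timeout = Duration::from_secs(60);
    let mut times = 0u32;
    // In the real code the plan is extracted once here, before the loop,
    // and cloned into each attempt.
    loop {
        let finished = execute_recluster(times)?;
        if finished {
            break;
        }
        times += 1;
        if start.elapsed().unwrap_or_default() >= timeout {
            println!("recluster timed out after {times} attempts");
            break;
        }
        // Stand-in for clear_selected_segment_locations() +
        // evict_table_from_cache(): drop cached state so the next attempt
        // sees a fresh table snapshot.
        println!("attempt {times}: refreshing cached table state");
    }
    Ok(())
}
```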
8 changes: 5 additions & 3 deletions src/query/sql/src/executor/physical_plan_builder.rs
@@ -126,16 +126,18 @@ impl PhysicalPlanBuilder {
             RelOperator::MutationSource(mutation_source) => {
                 self.build_mutation_source(mutation_source).await
             }
-            RelOperator::Recluster(recluster) => {
-                self.build_recluster(s_expr, recluster, required).await
-            }
+            RelOperator::Recluster(recluster) => self.build_recluster(recluster).await,
             RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await,
         }
     }
 
     pub fn set_mutation_build_info(&mut self, mutation_build_info: MutationBuildInfo) {
         self.mutation_build_info = Some(mutation_build_info);
     }
+
+    pub fn set_metadata(&mut self, metadata: MetadataRef) {
+        self.metadata = metadata;
+    }
 }
 
 #[derive(Clone)]
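build_recluster now runs its own planning pass (see the next file), so the builder gains set_metadata to adopt the metadata that pass produces after the builder was constructed. A toy sketch of the setter pattern, with a hypothetical MetadataRef stand-in:

```rust
// Stand-in for databend_common_sql::MetadataRef.
#[allow(dead_code)]
#[derive(Clone, Default, Debug)]
struct MetadataRef(String);

struct PhysicalPlanBuilder {
    metadata: MetadataRef,
}

impl PhysicalPlanBuilder {
    fn new(metadata: MetadataRef) -> Self {
        Self { metadata }
    }

    // Mirrors the setter added in this commit: swap in metadata produced by
    // a planning pass that happens after construction.
    fn set_metadata(&mut self, metadata: MetadataRef) {
        self.metadata = metadata;
    }
}

fn main() {
    // The interpreter now constructs the builder with empty metadata ...
    let mut builder = PhysicalPlanBuilder::new(MetadataRef::default());
    // ... and the recluster path installs the real metadata once the hilbert
    // statement has been planned.
    builder.set_metadata(MetadataRef("metadata from planned stmt".into()));
    println!("{:?}", builder.metadata);
}
```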
42 changes: 25 additions & 17 deletions src/query/sql/src/executor/physical_plans/physical_recluster.rs
@@ -17,7 +17,6 @@ use databend_common_catalog::plan::PushDownInfo;
 use databend_common_catalog::plan::ReclusterInfoSideCar;
 use databend_common_catalog::plan::ReclusterParts;
 use databend_common_catalog::plan::ReclusterTask;
-use databend_common_catalog::table::TableExt;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_meta_app::schema::TableInfo;
@@ -31,8 +30,9 @@ use crate::executor::physical_plans::FragmentKind;
 use crate::executor::physical_plans::MutationKind;
 use crate::executor::PhysicalPlan;
 use crate::executor::PhysicalPlanBuilder;
-use crate::optimizer::ColumnSet;
-use crate::optimizer::SExpr;
+use crate::plans::set_update_stream_columns;
+use crate::plans::Plan;
+use crate::Planner;
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 pub struct Recluster {
@@ -51,33 +51,25 @@ pub struct HilbertSerialize {
 impl PhysicalPlanBuilder {
     pub async fn build_recluster(
         &mut self,
-        s_expr: &SExpr,
         recluster: &crate::plans::Recluster,
-        required: ColumnSet,
     ) -> Result<PhysicalPlan> {
         let crate::plans::Recluster {
             catalog,
             database,
             table,
             filters,
             limit,
-            cluster_type,
-            ..
+            hilbert_stmt,
         } = recluster;
 
-        let tenant = self.ctx.get_tenant();
-        let catalog = self.ctx.get_catalog(catalog).await?;
-        let tbl = catalog.get_table(&tenant, database, table).await?;
-        // check mutability
-        tbl.check_mutable()?;
+        let tbl = self.ctx.get_table(catalog, database, table).await?;
         let push_downs = filters.clone().map(|v| PushDownInfo {
             filters: Some(v),
             ..PushDownInfo::default()
         });
         let table_info = tbl.get_table_info().clone();
 
-        let mut plan = match cluster_type {
-            ClusterType::Hilbert => {
+        let mut plan = match hilbert_stmt {
+            Some(stmt) => {
                 let handler = get_hilbert_clustering_handler();
                 let Some((recluster_info, snapshot)) = handler
                     .do_hilbert_clustering(tbl.clone(), self.ctx.clone(), push_downs)
@@ -87,7 +79,23 @@ impl PhysicalPlanBuilder {
                         "No need to do recluster for '{database}'.'{table}'"
                     )));
                 };
-                let plan = self.build(s_expr.child(0)?, required).await?;
+
+                let mut planner = Planner::new(self.ctx.clone());
+                let plan = planner.plan_stmt(stmt).await?;
+                let (mut s_expr, metadata, bind_context) = match plan {
+                    Plan::Query {
+                        s_expr,
+                        metadata,
+                        bind_context,
+                        ..
+                    } => (s_expr, metadata, bind_context),
+                    v => unreachable!("Input plan must be Query, but it's {}", v),
+                };
+                self.set_metadata(metadata);
+                if tbl.change_tracking_enabled() {
+                    *s_expr = set_update_stream_columns(&s_expr)?;
+                }
+                let plan = self.build(&s_expr, bind_context.column_set()).await?;
 
                 let plan = PhysicalPlan::HilbertSerialize(Box::new(HilbertSerialize {
                     plan_id: 0,
@@ -106,7 +114,7 @@ impl PhysicalPlanBuilder {
                     recluster_info: Some(recluster_info),
                 }))
             }
-            ClusterType::Linear => {
+            None => {
                 let Some((parts, snapshot)) =
                     tbl.recluster(self.ctx.clone(), push_downs, *limit).await?
                 else {
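The heart of the change: for Hilbert clustering, build_recluster no longer receives a pre-planned child via s_expr.child(0). It plans the stored statement itself, insists the result is Plan::Query, and destructures out the expression tree, metadata, and bind context. A self-contained sketch of that destructuring pattern, using simplified stand-in types rather than Databend's real Plan enum:

```rust
// Hypothetical mini-versions of databend_common_sql::plans::Plan and the planner.
#[allow(dead_code)]
#[derive(Debug)]
enum Plan {
    Query {
        s_expr: Box<String>, // stands in for Box<SExpr>
        metadata: String,
        bind_context: String,
    },
    Explain,
}

// Stand-in for Planner::plan_stmt: always yields a Query here.
fn plan_stmt(_stmt: &str) -> Plan {
    Plan::Query {
        s_expr: Box::new("select ... for hilbert keys".into()),
        metadata: "metadata".into(),
        bind_context: "bind_context".into(),
    }
}

fn main() {
    let plan = plan_stmt("SELECT ...");
    // Pull out the pieces build_recluster needs; any other variant is a bug.
    let (s_expr, metadata, bind_context) = match plan {
        Plan::Query { s_expr, metadata, bind_context } => (s_expr, metadata, bind_context),
        v => unreachable!("Input plan must be Query, but it's {:?}", v),
    };
    println!("{s_expr} | {metadata} | {bind_context}");
}
```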