Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: compact and recluster add table lock #15632

Merged
merged 3 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/query/catalog/src/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ pub trait Lock: Sync + Send {
fn tenant_name(&self) -> &str;

async fn try_lock(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>>;

async fn try_lock_no_retry(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>>;
}
4 changes: 1 addition & 3 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use databend_storages_common_table_meta::meta::SnapshotId;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::table::ChangeType;

use crate::lock::Lock;
use crate::plan::DataSourceInfo;
use crate::plan::DataSourcePlan;
use crate::plan::PartStatistics;
Expand Down Expand Up @@ -339,10 +338,9 @@ pub trait Table: Sync + Send {
async fn compact_segments(
&self,
ctx: Arc<dyn TableContext>,
lock: Arc<dyn Lock>,
limit: Option<usize>,
) -> Result<()> {
let (_, _, _) = (ctx, lock, limit);
let (_, _) = (ctx, limit);

Err(ErrorCode::Unimplemented(format!(
"The operation 'compact_segments' is not supported for the table '{}', which is using the '{}' engine.",
Expand Down
3 changes: 2 additions & 1 deletion src/query/ee/tests/it/inverted_index/index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_sql::plans::LockTableOption;
use databend_common_sql::plans::RefreshTableIndexPlan;
use databend_common_storages_fuse::io::read::InvertedIndexReader;
use databend_common_storages_fuse::io::MetaReaders;
Expand Down Expand Up @@ -86,7 +87,7 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
table: fixture.default_table_name(),
index_name: index_name.clone(),
segment_locs: None,
need_lock: true,
lock_opt: LockTableOption::LockWithRetry,
};
let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan)?;
let _ = interpreter.execute(ctx.clone()).await?;
Expand Down
3 changes: 2 additions & 1 deletion src/query/ee/tests/it/inverted_index/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use databend_common_expression::TableSchemaRefExt;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_sql::plans::CreateTablePlan;
use databend_common_sql::plans::LockTableOption;
use databend_common_sql::plans::RefreshTableIndexPlan;
use databend_common_sql::BloomIndexColumns;
use databend_common_storages_fuse::pruning::create_segment_location_vector;
Expand Down Expand Up @@ -525,7 +526,7 @@ async fn test_block_pruner() -> Result<()> {
table: test_tbl_name.to_string(),
index_name: index_name.clone(),
segment_locs: None,
need_lock: true,
lock_opt: LockTableOption::LockWithRetry,
};
let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan)?;
let _ = interpreter.execute(ctx.clone()).await?;
Expand Down
13 changes: 7 additions & 6 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::plans::LockTableOption;
use databend_common_sql::plans::OptimizeTableAction;
use databend_common_sql::plans::OptimizeTablePlan;
use log::info;
Expand Down Expand Up @@ -52,10 +53,10 @@ pub async fn hook_compact(
pipeline: &mut Pipeline,
compact_target: CompactTargetTableDescription,
trace_ctx: CompactHookTraceCtx,
need_lock: bool,
lock_opt: LockTableOption,
) {
let op_name = trace_ctx.operation_name.clone();
if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, need_lock).await {
if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, lock_opt).await {
info!("compact hook ({}) with error (ignored): {}", op_name, e);
}
}
Expand All @@ -66,7 +67,7 @@ async fn do_hook_compact(
pipeline: &mut Pipeline,
compact_target: CompactTargetTableDescription,
trace_ctx: CompactHookTraceCtx,
need_lock: bool,
lock_opt: LockTableOption,
) -> Result<()> {
if pipeline.is_empty() {
return Ok(());
Expand Down Expand Up @@ -103,7 +104,7 @@ async fn do_hook_compact(
if err.is_ok() {
info!("execute {op_name} finished successfully. running table optimization job.");
match GlobalIORuntime::instance().block_on({
compact_table(ctx, compact_target, compaction_limits, need_lock)
compact_table(ctx, compact_target, compaction_limits, lock_opt)
}) {
Ok(_) => {
info!("execute {op_name} finished successfully. table optimization job finished.");
Expand All @@ -126,7 +127,7 @@ async fn compact_table(
ctx: Arc<QueryContext>,
compact_target: CompactTargetTableDescription,
compaction_limits: CompactionLimits,
need_lock: bool,
lock_opt: LockTableOption,
) -> Result<()> {
// evict the table from cache
ctx.evict_table_from_cache(
Expand All @@ -143,7 +144,7 @@ async fn compact_table(
table: compact_target.table,
action: OptimizeTableAction::CompactBlocks(compaction_limits.block_limit),
limit: compaction_limits.segment_limit,
need_lock,
lock_opt,
})?;

let mut build_res = optimize_interpreter.execute2().await?;
Expand Down
17 changes: 12 additions & 5 deletions src/query/service/src/interpreters/hook/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Instant;
use databend_common_catalog::table_context::TableContext;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::plans::LockTableOption;
use log::info;
use log::warn;

Expand All @@ -35,7 +36,7 @@ pub struct HookOperator {
database: String,
table: String,
mutation_kind: MutationKind,
need_lock: bool,
lock_opt: LockTableOption,
}

impl HookOperator {
Expand All @@ -45,15 +46,15 @@ impl HookOperator {
database: String,
table: String,
mutation_kind: MutationKind,
need_lock: bool,
lock_opt: LockTableOption,
) -> Self {
Self {
ctx,
catalog,
database,
table,
mutation_kind,
need_lock,
lock_opt,
}
}

Expand Down Expand Up @@ -105,7 +106,7 @@ impl HookOperator {
pipeline,
compact_target,
trace_ctx,
self.need_lock,
self.lock_opt.clone(),
)
.await;
}
Expand All @@ -122,6 +123,12 @@ impl HookOperator {
table: self.table.to_owned(),
};

hook_refresh(self.ctx.clone(), pipeline, refresh_desc, self.need_lock).await;
hook_refresh(
self.ctx.clone(),
pipeline,
refresh_desc,
self.lock_opt.clone(),
)
.await;
}
}
17 changes: 11 additions & 6 deletions src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_meta_app::schema::ListIndexesByIdReq;
use databend_common_meta_app::schema::ListVirtualColumnsReq;
use databend_common_meta_types::MetaId;
use databend_common_pipeline_core::Pipeline;
use databend_common_sql::plans::LockTableOption;
use databend_common_sql::plans::Plan;
use databend_common_sql::plans::RefreshIndexPlan;
use databend_common_sql::plans::RefreshTableIndexPlan;
Expand Down Expand Up @@ -57,7 +58,7 @@ pub async fn hook_refresh(
ctx: Arc<QueryContext>,
pipeline: &mut Pipeline,
desc: RefreshDesc,
need_lock: bool,
lock_opt: LockTableOption,
) {
if pipeline.is_empty() {
return;
Expand All @@ -66,7 +67,7 @@ pub async fn hook_refresh(
pipeline.set_on_finished(move |(_profiles, err)| {
if err.is_ok() {
info!("execute pipeline finished successfully, starting run refresh job.");
match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, need_lock)) {
match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, lock_opt)) {
Ok(_) => {
info!("execute refresh job successfully.");
}
Expand All @@ -79,7 +80,11 @@ pub async fn hook_refresh(
});
}

async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc, need_lock: bool) -> Result<()> {
async fn do_refresh(
ctx: Arc<QueryContext>,
desc: RefreshDesc,
lock_opt: LockTableOption,
) -> Result<()> {
let table = ctx
.get_table(&desc.catalog, &desc.database, &desc.table)
.await?;
Expand All @@ -99,7 +104,7 @@ async fn do_refresh(ctx: Arc<QueryContext>, desc: RefreshDesc, need_lock: bool)

// Generate sync inverted indexes.
let inverted_index_plans =
generate_refresh_inverted_index_plan(ctx.clone(), &desc, table.clone(), need_lock).await?;
generate_refresh_inverted_index_plan(ctx.clone(), &desc, table.clone(), lock_opt).await?;
plans.extend_from_slice(&inverted_index_plans);

// Generate virtual columns.
Expand Down Expand Up @@ -263,7 +268,7 @@ async fn generate_refresh_inverted_index_plan(
ctx: Arc<QueryContext>,
desc: &RefreshDesc,
table: Arc<dyn Table>,
need_lock: bool,
lock_opt: LockTableOption,
) -> Result<Vec<Plan>> {
let segment_locs = ctx.get_segment_locations()?;
let mut plans = vec![];
Expand All @@ -279,7 +284,7 @@ async fn generate_refresh_inverted_index_plan(
table: desc.table.clone(),
index_name: index.name.clone(),
segment_locs: Some(segment_locs.clone()),
need_lock,
lock_opt: lock_opt.clone(),
};
plans.push(Plan::RefreshTableIndex(Box::new(plan)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::physical_plans::TableScan;
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use databend_common_sql::executor::PhysicalPlan;
use databend_common_sql::plans::LockTableOption;
use databend_common_storage::StageFileInfo;
use databend_common_storages_stage::StageTable;
use log::debug;
Expand Down Expand Up @@ -392,7 +393,7 @@ impl Interpreter for CopyIntoTableInterpreter {
self.plan.database_name.to_string(),
self.plan.table_name.to_string(),
MutationKind::Insert,
true,
LockTableOption::LockNoRetry,
);
hook_operator.execute(&mut build_res.main_pipeline).await;
}
Expand Down
25 changes: 12 additions & 13 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use databend_common_sql::plans::BoundColumnRef;
use databend_common_sql::plans::ConstantExpr;
use databend_common_sql::plans::EvalScalar;
use databend_common_sql::plans::FunctionCall;
use databend_common_sql::plans::LockTableOption;
use databend_common_sql::plans::RelOperator;
use databend_common_sql::plans::ScalarItem;
use databend_common_sql::plans::SubqueryDesc;
Expand All @@ -67,7 +68,6 @@ use crate::interpreters::common::create_push_down_filters;
use crate::interpreters::HookOperator;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::locks::LockManager;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelinePullingExecutor;
use crate::pipelines::PipelineBuildResult;
Expand Down Expand Up @@ -110,21 +110,21 @@ impl Interpreter for DeleteInterpreter {
let is_distributed = !self.ctx.get_cluster().is_empty();

let catalog_name = self.plan.catalog_name.as_str();
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();

let db_name = self.plan.database_name.as_str();
let tbl_name = self.plan.table_name.as_str();
let tbl = catalog
.get_table(&self.ctx.get_tenant(), db_name, tbl_name)
.await?;

// Add table lock.
let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?;
let lock_guard = table_lock.try_lock(self.ctx.clone()).await?;
let lock_guard = self
.ctx
.clone()
.acquire_table_lock(catalog_name, db_name, tbl_name)
.await?;

// refresh table.
let tbl = tbl.refresh(self.ctx.as_ref()).await?;
let catalog = self.ctx.get_catalog(catalog_name).await?;
let catalog_info = catalog.info();
let tbl = catalog
.get_table(&self.ctx.get_tenant(), db_name, tbl_name)
.await?;

// check mutability
tbl.check_mutable()?;
Expand Down Expand Up @@ -301,7 +301,7 @@ impl Interpreter for DeleteInterpreter {
tbl_name.to_string(),
MutationKind::Delete,
// table lock has been added, no need to check.
false,
LockTableOption::NoLock,
);
hook_operator
.execute_refresh(&mut build_res.main_pipeline)
Expand Down Expand Up @@ -354,7 +354,6 @@ impl DeleteInterpreter {
mutation_kind: MutationKind::Delete,
update_stream_meta: vec![],
merge_meta,
need_lock: false,
deduplicated_label: None,
plan_id: u32::MAX,
}));
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_common_sql::executor::PhysicalPlanBuilder;
use databend_common_sql::plans::insert::InsertValue;
use databend_common_sql::plans::Insert;
use databend_common_sql::plans::InsertInputSource;
use databend_common_sql::plans::LockTableOption;
use databend_common_sql::plans::Plan;
use databend_common_sql::NameResolutionContext;

Expand Down Expand Up @@ -276,7 +277,7 @@ impl Interpreter for InsertInterpreter {
self.plan.database.clone(),
self.plan.table.clone(),
MutationKind::Insert,
true,
LockTableOption::LockNoRetry,
);
hook_operator.execute(&mut build_res.main_pipeline).await;
}
Expand Down Expand Up @@ -311,7 +312,7 @@ impl Interpreter for InsertInterpreter {
self.plan.database.clone(),
self.plan.table.clone(),
MutationKind::Insert,
true,
LockTableOption::LockNoRetry,
);
hook_operator.execute(&mut build_res.main_pipeline).await;
}
Expand Down
Loading
Loading