From 603fbf003de200d02efc447353e091f549ba3c02 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 10 Dec 2024 15:06:13 +0800 Subject: [PATCH] chore: tweak attach table refreshing (#17019) * chore: refresh attach table refresh * remove unused crates * refresh table schema * remove unused code * fix: fuse table type should be inferred correctly * Update src/query/storages/fuse/src/fuse_table.rs Co-authored-by: TCeason <33082201+TCeason@users.noreply.github.com> --------- Co-authored-by: TCeason <33082201+TCeason@users.noreply.github.com> --- Cargo.lock | 2 - .../src/statistics/data_cache_statistics.rs | 2 +- .../src/catalogs/default/mutable_catalog.rs | 4 +- .../src/databases/default/default_database.rs | 32 +--- src/query/storages/factory/Cargo.toml | 2 - .../storages/factory/src/storage_factory.rs | 88 +++-------- src/query/storages/fuse/src/fuse_table.rs | 138 +++++++++++------- 7 files changed, 111 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b135d630e2d2..a59d6d0c6251 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4130,7 +4130,6 @@ dependencies = [ name = "databend-common-storages-factory" version = "0.1.0" dependencies = [ - "async-trait", "dashmap 6.1.0", "databend-common-catalog", "databend-common-config", @@ -4145,7 +4144,6 @@ dependencies = [ "databend-common-storages-stream", "databend-common-storages-view", "databend-storages-common-index", - "tokio", ] [[package]] diff --git a/src/query/catalog/src/statistics/data_cache_statistics.rs b/src/query/catalog/src/statistics/data_cache_statistics.rs index 6097715d3794..04ae435204eb 100644 --- a/src/query/catalog/src/statistics/data_cache_statistics.rs +++ b/src/query/catalog/src/statistics/data_cache_statistics.rs @@ -19,7 +19,7 @@ use serde::Serialize; #[derive(Default)] pub struct DataCacheMetrics { - bytes_from_remote_disk: AtomicUsize, + pub bytes_from_remote_disk: AtomicUsize, bytes_from_local_disk: AtomicUsize, bytes_from_memory: AtomicUsize, } diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 4db4ef1e160c..d492b6b2b3f0 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -434,7 +434,7 @@ impl Catalog for MutableCatalog { fn get_table_by_info(&self, table_info: &TableInfo) -> Result> { let storage = self.ctx.storage_factory.clone(); - storage.get_table(table_info) + storage.get_table(table_info, self.disable_table_info_refresh) } #[async_backtrace::framed] @@ -566,7 +566,7 @@ impl Catalog for MutableCatalog { self.info(), DatabaseType::NormalDB, ); - tables.push(storage.get_table(&table_info)?); + tables.push(storage.get_table(&table_info, ctx.disable_table_info_refresh)?); } Ok((tables, drop_ids)) } diff --git a/src/query/service/src/databases/default/default_database.rs b/src/query/service/src/databases/default/default_database.rs index efcc273c34a2..a445b5733b83 100644 --- a/src/query/service/src/databases/default/default_database.rs +++ b/src/query/service/src/databases/default/default_database.rs @@ -99,26 +99,7 @@ impl DefaultDatabase { }) .collect::>(); - if self.ctx.disable_table_info_refresh { - Ok(table_infos) - } else { - let mut refreshed = Vec::with_capacity(table_infos.len()); - for table_info in table_infos { - refreshed.push( - self.ctx - .storage_factory - .refresh_table_info(table_info.clone()) - .await - .map_err(|err| { - err.add_message_back(format!( - "(while refresh table info on {})", - table_info.name - )) - })?, - ); - } - Ok(refreshed) - } + Ok(table_infos) } } #[async_trait::async_trait] @@ -133,7 +114,7 @@ impl Database for DefaultDatabase { fn get_table_by_info(&self, table_info: &TableInfo) -> Result> { let storage = &self.ctx.storage_factory; - storage.get_table(table_info) + storage.get_table(table_info, self.ctx.disable_table_info_refresh) } // Get one table by db and table name. @@ -166,15 +147,6 @@ impl Database for DefaultDatabase { let table_info = Arc::new(table_info); - let table_info = if self.ctx.disable_table_info_refresh { - table_info - } else { - self.ctx - .storage_factory - .refresh_table_info(table_info) - .await? - }; - self.get_table_by_info(table_info.as_ref()) } diff --git a/src/query/storages/factory/Cargo.toml b/src/query/storages/factory/Cargo.toml index a2ec632b75b4..b6a2864df4c9 100644 --- a/src/query/storages/factory/Cargo.toml +++ b/src/query/storages/factory/Cargo.toml @@ -11,7 +11,6 @@ doctest = false test = true [dependencies] -async-trait = { workspace = true } dashmap = { workspace = true } databend-common-catalog = { workspace = true } databend-common-config = { workspace = true } @@ -26,7 +25,6 @@ databend-common-storages-random = { workspace = true } databend-common-storages-stream = { workspace = true } databend-common-storages-view = { workspace = true } databend-storages-common-index = { workspace = true } -tokio = { workspace = true } [lints] workspace = true diff --git a/src/query/storages/factory/src/storage_factory.rs b/src/query/storages/factory/src/storage_factory.rs index 4a91c9a8764e..5d487696f615 100644 --- a/src/query/storages/factory/src/storage_factory.rs +++ b/src/query/storages/factory/src/storage_factory.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use dashmap::mapref::one::Ref; use dashmap::DashMap; pub use databend_common_catalog::catalog::StorageDescription; use databend_common_config::InnerConfig; @@ -31,11 +32,8 @@ use databend_common_storages_view::view_table::ViewTable; use crate::fuse::FuseTable; use crate::Table; -// default schema refreshing timeout is 5 seconds. -const DEFAULT_SCHEMA_REFRESHING_TIMEOUT_MS: Duration = Duration::from_secs(5); - pub trait StorageCreator: Send + Sync { - fn try_create(&self, table_info: TableInfo) -> Result>; + fn try_create(&self, table_info: TableInfo, disable_refresh: bool) -> Result>; } impl StorageCreator for T @@ -43,25 +41,16 @@ where T: Fn(TableInfo) -> Result>, T: Send + Sync, { - fn try_create(&self, table_info: TableInfo) -> Result> { + fn try_create(&self, table_info: TableInfo, _need_refresh: bool) -> Result> { self(table_info) } } -use std::future::Future; -use std::time::Duration; - -use dashmap::mapref::one::Ref; +pub struct FuseTableCreator {} -#[async_trait::async_trait] -impl TableInfoRefresher for F -where - F: Fn(Arc) -> Fut, - Fut: Future>> + Send, - F: Send + Sync, -{ - async fn refresh(&self, table_info: Arc) -> Result> { - self(table_info).await +impl StorageCreator for FuseTableCreator { + fn try_create(&self, table_info: TableInfo, disable_refresh: bool) -> Result> { + FuseTable::try_create_ext(table_info, disable_refresh) } } @@ -79,24 +68,14 @@ where } } -// Table engines that need to refresh schema should provide implementation of this -// trait. For example, FUSE table engine that attaches to a remote storage may -// need to refresh schema to get the latest schema. -#[async_trait::async_trait] -pub trait TableInfoRefresher: Send + Sync { - async fn refresh(&self, table_info: Arc) -> Result>; -} - pub struct Storage { creator: Arc, descriptor: Arc, - table_info_refresher: Option>, } #[derive(Default)] pub struct StorageFactory { storages: DashMap, - schema_refreshing_timeout: Duration, } impl StorageFactory { @@ -108,7 +87,6 @@ impl StorageFactory { creators.insert("MEMORY".to_string(), Storage { creator: Arc::new(MemoryTable::try_create), descriptor: Arc::new(MemoryTable::description), - table_info_refresher: None, }); } @@ -116,90 +94,60 @@ impl StorageFactory { creators.insert("NULL".to_string(), Storage { creator: Arc::new(NullTable::try_create), descriptor: Arc::new(NullTable::description), - table_info_refresher: None, }); // Register FUSE table engine. creators.insert("FUSE".to_string(), Storage { - creator: Arc::new(FuseTable::try_create), + creator: Arc::new(FuseTableCreator {}), descriptor: Arc::new(FuseTable::description), - table_info_refresher: Some(Arc::new(FuseTable::refresh_schema)), }); // Register VIEW table engine creators.insert("VIEW".to_string(), Storage { creator: Arc::new(ViewTable::try_create), descriptor: Arc::new(ViewTable::description), - table_info_refresher: None, }); // Register RANDOM table engine creators.insert("RANDOM".to_string(), Storage { creator: Arc::new(RandomTable::try_create), descriptor: Arc::new(RandomTable::description), - table_info_refresher: None, }); // Register STREAM table engine creators.insert("STREAM".to_string(), Storage { creator: Arc::new(StreamTable::try_create), descriptor: Arc::new(StreamTable::description), - table_info_refresher: None, }); // Register ICEBERG table engine creators.insert("ICEBERG".to_string(), Storage { creator: Arc::new(IcebergTable::try_create), descriptor: Arc::new(IcebergTable::description), - table_info_refresher: None, }); // Register DELTA table engine creators.insert("DELTA".to_string(), Storage { creator: Arc::new(DeltaTable::try_create), descriptor: Arc::new(DeltaTable::description), - table_info_refresher: None, }); - StorageFactory { - storages: creators, - schema_refreshing_timeout: DEFAULT_SCHEMA_REFRESHING_TIMEOUT_MS, - } + StorageFactory { storages: creators } } - pub fn get_table(&self, table_info: &TableInfo) -> Result> { + pub fn get_table( + &self, + table_info: &TableInfo, + disable_refresh: bool, + ) -> Result> { let factory = self.get_storage_factory(table_info)?; - let table: Arc = factory.creator.try_create(table_info.clone())?.into(); + let table: Arc = factory + .creator + .try_create(table_info.clone(), disable_refresh)? + .into(); Ok(table) } - pub async fn refresh_table_info(&self, table_info: Arc) -> Result> { - let factory = self.get_storage_factory(table_info.as_ref())?; - match factory.table_info_refresher.as_ref() { - None => Ok(table_info), - Some(refresher) => { - let table_description = table_info.desc.clone(); - tokio::time::timeout( - self.schema_refreshing_timeout, - refresher.refresh(table_info), - ) - .await - .map_err(|elapsed| { - ErrorCode::RefreshTableInfoFailure(format!( - "failed to refresh table meta {} in time. Elapsed: {}", - table_description, elapsed - )) - }) - .map_err(|e| { - ErrorCode::RefreshTableInfoFailure(format!( - "failed to refresh table meta {} : {}", - table_description, e - )) - })? - } - } - } - fn get_storage_factory(&self, table_info: &TableInfo) -> Result> { let engine = table_info.engine().to_uppercase(); self.storages.get(&engine).ok_or_else(|| { diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index a6f7054ee5db..f4607cca52da 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -25,6 +25,7 @@ use std::time::Instant; use async_channel::Receiver; use chrono::Duration; use chrono::TimeDelta; +use databend_common_base::base::tokio; use databend_common_base::runtime::GlobalIORuntime; use databend_common_catalog::catalog::StorageDescription; use databend_common_catalog::plan::DataSourcePlan; @@ -48,6 +49,7 @@ use databend_common_expression::types::DataType; use databend_common_expression::BlockThresholds; use databend_common_expression::ColumnId; use databend_common_expression::RemoteExpr; +use databend_common_expression::TableSchema; use databend_common_expression::ORIGIN_BLOCK_ID_COL_NAME; use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COL_NAME; use databend_common_expression::ORIGIN_VERSION_COL_NAME; @@ -143,41 +145,26 @@ pub struct FuseTable { type PartInfoReceiver = Option>>; +// default schema refreshing timeout is 5 seconds. +const DEFAULT_SCHEMA_REFRESHING_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + impl FuseTable { pub fn try_create(table_info: TableInfo) -> Result> { - Ok(Self::do_create(table_info)?) + Ok(Self::do_create_table_ext(table_info, false)?) } - pub async fn refresh_schema(table_info: Arc) -> Result> { - // check if table is AttachedReadOnly in a lighter way - let need_refresh_schema = match table_info.db_type { - DatabaseType::NormalDB => { - table_info.meta.storage_params.is_some() - && Self::is_table_attached(&table_info.meta.options) - } - }; + pub fn do_create(table_info: TableInfo) -> Result> { + Self::do_create_table_ext(table_info, true) + } - if need_refresh_schema { - info!("refreshing table schema {}", table_info.desc); - let table = Self::do_create(table_info.as_ref().clone())?; - let snapshot = table.read_table_snapshot().await?; - let schema = snapshot - .ok_or_else(|| { - ErrorCode::ShareStorageError( - "Failed to load snapshot of read_only attach table".to_string(), - ) - })? - .schema - .clone(); - let mut table_info = table_info.as_ref().clone(); - table_info.meta.schema = Arc::new(schema); - Ok(Arc::new(table_info)) - } else { - Ok(table_info) - } + pub fn try_create_ext(table_info: TableInfo, disable_refresh: bool) -> Result> { + Ok(Self::do_create_table_ext(table_info, disable_refresh)?) } - pub fn do_create(mut table_info: TableInfo) -> Result> { + pub fn do_create_table_ext( + mut table_info: TableInfo, + disable_refresh: bool, + ) -> Result> { let storage_prefix = Self::parse_storage_prefix_from_table_info(&table_info)?; let cluster_key_meta = table_info.meta.cluster_key(); @@ -190,11 +177,13 @@ impl FuseTable { let table_meta_options = &table_info.meta.options; let operator = init_operator(&sp)?; let table_type = if Self::is_table_attached(table_meta_options) { - Self::tweak_attach_table_snapshot_location( - &mut table_info, - &operator, - &storage_prefix, - )?; + if !disable_refresh { + Self::refresh_table_info( + &mut table_info, + &operator, + &storage_prefix, + )?; + } FuseTableType::Attached } else { FuseTableType::External @@ -333,24 +322,28 @@ impl FuseTable { #[async_backtrace::framed] pub async fn read_table_snapshot(&self) -> Result>> { let reader = MetaReaders::table_snapshot_reader(self.get_operator()); - self.read_table_snapshot_with_reader(reader).await + let loc = self.snapshot_loc(); + let ver = self.snapshot_format_version(loc.clone())?; + Self::read_table_snapshot_with_reader(reader, loc, ver).await } #[fastrace::trace] #[async_backtrace::framed] pub async fn read_table_snapshot_without_cache(&self) -> Result>> { let reader = MetaReaders::table_snapshot_reader_without_cache(self.get_operator()); - self.read_table_snapshot_with_reader(reader).await + let loc = self.snapshot_loc(); + let ver = self.snapshot_format_version(loc.clone())?; + Self::read_table_snapshot_with_reader(reader, loc, ver).await } async fn read_table_snapshot_with_reader( - &self, reader: TableSnapshotReader, + snapshot_location: Option, + ver: u64, ) -> Result>> { - if let Some(loc) = self.snapshot_loc() { - let ver = self.snapshot_format_version(Some(loc.clone()))?; + if let Some(location) = snapshot_location { let params = LoadParams { - location: loc, + location, len_hint: None, ver, put_cache: true, @@ -469,11 +462,12 @@ impl FuseTable { self.meta_location_generator.prefix() } - fn load_snapshot_location_from_hint( + fn refresh_schema_from_hint( operator: &Operator, storage_prefix: &str, - ) -> Result> { - GlobalIORuntime::instance().block_on(async { + table_description: &str, + ) -> Result> { + let refresh_task = async { let hint_file_path = format!("{}/{}", storage_prefix, FUSE_TBL_LAST_SNAPSHOT_HINT); let begin_load_hint = Instant::now(); let maybe_hint_content = operator.read(&hint_file_path).await; @@ -488,9 +482,30 @@ impl FuseTable { let hint_content = buf.to_vec(); let snapshot_full_path = String::from_utf8(hint_content)?; let operator_info = operator.info(); - Ok::<_, ErrorCode>(Some( + + let loc = snapshot_full_path[operator_info.root().len()..].to_string(); + + // refresh table schema by loading the snapshot + let begin = Instant::now(); + let reader = MetaReaders::table_snapshot_reader_without_cache(operator.clone()); + let ver = TableMetaLocationGenerator::snapshot_version(loc.as_str()); + let snapshot = + Self::read_table_snapshot_with_reader(reader, Some(loc), ver).await?; + info!("table snapshot refreshed, time used {:?}", begin.elapsed()); + + let schema = snapshot + .ok_or_else(|| { + ErrorCode::ShareStorageError( + "Failed to load snapshot of read_only attach table".to_string(), + ) + })? + .schema + .clone(); + + Ok::<_, ErrorCode>(Some(( snapshot_full_path[operator_info.root().len()..].to_string(), - )) + schema, + ))) } Err(e) if e.kind() == opendal::ErrorKind::NotFound => { // Table be attached has not last snapshot hint file, treat it as empty table @@ -498,10 +513,29 @@ impl FuseTable { } Err(e) => Err(e.into()), } - }) + }; + + let refresh_task_with_timeout = async { + tokio::time::timeout(DEFAULT_SCHEMA_REFRESHING_TIMEOUT, refresh_task) + .await + .map_err(|_e| { + ErrorCode::RefreshTableInfoFailure(format!( + "failed to refresh table info {} in time", + table_description + )) + }) + .map_err(|e| { + ErrorCode::RefreshTableInfoFailure(format!( + "failed to refresh table info {} : {}", + table_description, e + )) + })? + }; + + GlobalIORuntime::instance().block_on(refresh_task_with_timeout) } - fn tweak_attach_table_snapshot_location( + fn refresh_table_info( table_info: &mut TableInfo, operator: &Operator, storage_prefix: &str, @@ -515,22 +549,26 @@ impl FuseTable { return Ok(()); } - let location = Self::load_snapshot_location_from_hint(operator, storage_prefix)?; + let refreshed = Self::refresh_schema_from_hint(operator, storage_prefix, &table_info.desc)?; info!( "extracted snapshot location [{:?}] of table {}, with id {:?} from the last snapshot hint file.", - location, table_info.desc, table_info.ident + refreshed.as_ref().map(|(location, _)| location), + table_info.desc, + table_info.ident ); // Adjust snapshot location to the values extracted from the last snapshot hint - match location { + match refreshed { None => { table_info.options_mut().remove(OPT_KEY_SNAPSHOT_LOCATION); } - Some(location) => { + Some((location, schema)) => { table_info .options_mut() .insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), location); + + table_info.meta.schema = Arc::new(schema); } }