Skip to content

Commit

Permalink
chore: tweak attach table refreshing (#17019)
Browse files Browse the repository at this point in the history
* chore: rework attach table refreshing

* remove unused crates

* refresh table schema

* remove unused code

* fix: fuse table type should be inferred correctly

* Update src/query/storages/fuse/src/fuse_table.rs

Co-authored-by: TCeason <33082201+TCeason@users.noreply.github.com>

---------

Co-authored-by: TCeason <33082201+TCeason@users.noreply.github.com>
  • Loading branch information
dantengsky and TCeason authored Dec 10, 2024
1 parent ece1166 commit 603fbf0
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 157 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/query/catalog/src/statistics/data_cache_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use serde::Serialize;

#[derive(Default)]
pub struct DataCacheMetrics {
bytes_from_remote_disk: AtomicUsize,
pub bytes_from_remote_disk: AtomicUsize,
bytes_from_local_disk: AtomicUsize,
bytes_from_memory: AtomicUsize,
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl Catalog for MutableCatalog {

fn get_table_by_info(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>> {
let storage = self.ctx.storage_factory.clone();
storage.get_table(table_info)
storage.get_table(table_info, self.disable_table_info_refresh)
}

#[async_backtrace::framed]
Expand Down Expand Up @@ -566,7 +566,7 @@ impl Catalog for MutableCatalog {
self.info(),
DatabaseType::NormalDB,
);
tables.push(storage.get_table(&table_info)?);
tables.push(storage.get_table(&table_info, ctx.disable_table_info_refresh)?);
}
Ok((tables, drop_ids))
}
Expand Down
32 changes: 2 additions & 30 deletions src/query/service/src/databases/default/default_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,7 @@ impl DefaultDatabase {
})
.collect::<Vec<_>>();

if self.ctx.disable_table_info_refresh {
Ok(table_infos)
} else {
let mut refreshed = Vec::with_capacity(table_infos.len());
for table_info in table_infos {
refreshed.push(
self.ctx
.storage_factory
.refresh_table_info(table_info.clone())
.await
.map_err(|err| {
err.add_message_back(format!(
"(while refresh table info on {})",
table_info.name
))
})?,
);
}
Ok(refreshed)
}
Ok(table_infos)
}
}
#[async_trait::async_trait]
Expand All @@ -133,7 +114,7 @@ impl Database for DefaultDatabase {

fn get_table_by_info(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>> {
let storage = &self.ctx.storage_factory;
storage.get_table(table_info)
storage.get_table(table_info, self.ctx.disable_table_info_refresh)
}

// Get one table by db and table name.
Expand Down Expand Up @@ -166,15 +147,6 @@ impl Database for DefaultDatabase {

let table_info = Arc::new(table_info);

let table_info = if self.ctx.disable_table_info_refresh {
table_info
} else {
self.ctx
.storage_factory
.refresh_table_info(table_info)
.await?
};

self.get_table_by_info(table_info.as_ref())
}

Expand Down
2 changes: 0 additions & 2 deletions src/query/storages/factory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ doctest = false
test = true

[dependencies]
async-trait = { workspace = true }
dashmap = { workspace = true }
databend-common-catalog = { workspace = true }
databend-common-config = { workspace = true }
Expand All @@ -26,7 +25,6 @@ databend-common-storages-random = { workspace = true }
databend-common-storages-stream = { workspace = true }
databend-common-storages-view = { workspace = true }
databend-storages-common-index = { workspace = true }
tokio = { workspace = true }

[lints]
workspace = true
88 changes: 18 additions & 70 deletions src/query/storages/factory/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::sync::Arc;

use dashmap::mapref::one::Ref;
use dashmap::DashMap;
pub use databend_common_catalog::catalog::StorageDescription;
use databend_common_config::InnerConfig;
Expand All @@ -31,37 +32,25 @@ use databend_common_storages_view::view_table::ViewTable;
use crate::fuse::FuseTable;
use crate::Table;

// default schema refreshing timeout is 5 seconds.
const DEFAULT_SCHEMA_REFRESHING_TIMEOUT_MS: Duration = Duration::from_secs(5);

pub trait StorageCreator: Send + Sync {
fn try_create(&self, table_info: TableInfo) -> Result<Box<dyn Table>>;
fn try_create(&self, table_info: TableInfo, disable_refresh: bool) -> Result<Box<dyn Table>>;
}

impl<T> StorageCreator for T
where
T: Fn(TableInfo) -> Result<Box<dyn Table>>,
T: Send + Sync,
{
fn try_create(&self, table_info: TableInfo) -> Result<Box<dyn Table>> {
fn try_create(&self, table_info: TableInfo, _need_refresh: bool) -> Result<Box<dyn Table>> {
self(table_info)
}
}

use std::future::Future;
use std::time::Duration;

use dashmap::mapref::one::Ref;
pub struct FuseTableCreator {}

#[async_trait::async_trait]
impl<F, Fut> TableInfoRefresher for F
where
F: Fn(Arc<TableInfo>) -> Fut,
Fut: Future<Output = Result<Arc<TableInfo>>> + Send,
F: Send + Sync,
{
async fn refresh(&self, table_info: Arc<TableInfo>) -> Result<Arc<TableInfo>> {
self(table_info).await
impl StorageCreator for FuseTableCreator {
fn try_create(&self, table_info: TableInfo, disable_refresh: bool) -> Result<Box<dyn Table>> {
FuseTable::try_create_ext(table_info, disable_refresh)
}
}

Expand All @@ -79,24 +68,14 @@ where
}
}

// Table engines that need to refresh schema should provide implementation of this
// trait. For example, FUSE table engine that attaches to a remote storage may
// need to refresh schema to get the latest schema.
#[async_trait::async_trait]
pub trait TableInfoRefresher: Send + Sync {
async fn refresh(&self, table_info: Arc<TableInfo>) -> Result<Arc<TableInfo>>;
}

pub struct Storage {
creator: Arc<dyn StorageCreator>,
descriptor: Arc<dyn StorageDescriptor>,
table_info_refresher: Option<Arc<dyn TableInfoRefresher>>,
}

#[derive(Default)]
pub struct StorageFactory {
storages: DashMap<String, Storage>,
schema_refreshing_timeout: Duration,
}

impl StorageFactory {
Expand All @@ -108,98 +87,67 @@ impl StorageFactory {
creators.insert("MEMORY".to_string(), Storage {
creator: Arc::new(MemoryTable::try_create),
descriptor: Arc::new(MemoryTable::description),
table_info_refresher: None,
});
}

// Register NULL table engine.
creators.insert("NULL".to_string(), Storage {
creator: Arc::new(NullTable::try_create),
descriptor: Arc::new(NullTable::description),
table_info_refresher: None,
});

// Register FUSE table engine.
creators.insert("FUSE".to_string(), Storage {
creator: Arc::new(FuseTable::try_create),
creator: Arc::new(FuseTableCreator {}),
descriptor: Arc::new(FuseTable::description),
table_info_refresher: Some(Arc::new(FuseTable::refresh_schema)),
});

// Register VIEW table engine
creators.insert("VIEW".to_string(), Storage {
creator: Arc::new(ViewTable::try_create),
descriptor: Arc::new(ViewTable::description),
table_info_refresher: None,
});

// Register RANDOM table engine
creators.insert("RANDOM".to_string(), Storage {
creator: Arc::new(RandomTable::try_create),
descriptor: Arc::new(RandomTable::description),
table_info_refresher: None,
});

// Register STREAM table engine
creators.insert("STREAM".to_string(), Storage {
creator: Arc::new(StreamTable::try_create),
descriptor: Arc::new(StreamTable::description),
table_info_refresher: None,
});

// Register ICEBERG table engine
creators.insert("ICEBERG".to_string(), Storage {
creator: Arc::new(IcebergTable::try_create),
descriptor: Arc::new(IcebergTable::description),
table_info_refresher: None,
});

// Register DELTA table engine
creators.insert("DELTA".to_string(), Storage {
creator: Arc::new(DeltaTable::try_create),
descriptor: Arc::new(DeltaTable::description),
table_info_refresher: None,
});

StorageFactory {
storages: creators,
schema_refreshing_timeout: DEFAULT_SCHEMA_REFRESHING_TIMEOUT_MS,
}
StorageFactory { storages: creators }
}

pub fn get_table(&self, table_info: &TableInfo) -> Result<Arc<dyn Table>> {
pub fn get_table(
&self,
table_info: &TableInfo,
disable_refresh: bool,
) -> Result<Arc<dyn Table>> {
let factory = self.get_storage_factory(table_info)?;
let table: Arc<dyn Table> = factory.creator.try_create(table_info.clone())?.into();
let table: Arc<dyn Table> = factory
.creator
.try_create(table_info.clone(), disable_refresh)?
.into();
Ok(table)
}

pub async fn refresh_table_info(&self, table_info: Arc<TableInfo>) -> Result<Arc<TableInfo>> {
let factory = self.get_storage_factory(table_info.as_ref())?;
match factory.table_info_refresher.as_ref() {
None => Ok(table_info),
Some(refresher) => {
let table_description = table_info.desc.clone();
tokio::time::timeout(
self.schema_refreshing_timeout,
refresher.refresh(table_info),
)
.await
.map_err(|elapsed| {
ErrorCode::RefreshTableInfoFailure(format!(
"failed to refresh table meta {} in time. Elapsed: {}",
table_description, elapsed
))
})
.map_err(|e| {
ErrorCode::RefreshTableInfoFailure(format!(
"failed to refresh table meta {} : {}",
table_description, e
))
})?
}
}
}

fn get_storage_factory(&self, table_info: &TableInfo) -> Result<Ref<String, Storage>> {
let engine = table_info.engine().to_uppercase();
self.storages.get(&engine).ok_or_else(|| {
Expand Down
Loading

0 comments on commit 603fbf0

Please sign in to comment.