From 02a9f3e82af650c288272ab291a9a7225ddf933d Mon Sep 17 00:00:00 2001 From: yusufyian Date: Sat, 15 Jun 2024 06:21:35 +0000 Subject: [PATCH] chore: Add remote_loader module for loading accounts remotely --- accounts-db/Cargo.toml | 13 ++++++ accounts-db/src/accounts_cache.rs | 45 ++++++++++++++++++++- accounts-db/src/accounts_db.rs | 22 +++++++++- accounts-db/src/lib.rs | 1 + accounts-db/src/read_only_accounts_cache.rs | 16 ++++++++ 5 files changed, 94 insertions(+), 3 deletions(-) diff --git a/accounts-db/Cargo.toml b/accounts-db/Cargo.toml index 567a901..548e26c 100644 --- a/accounts-db/Cargo.toml +++ b/accounts-db/Cargo.toml @@ -10,6 +10,12 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +bs58 = { workspace = true } +base64 = { workspace = true } +zstd = { workspace = true } +tokio = { workspace = true } +reqwest = { workspace = true } +serde_json = { workspace = true } arrayref = { workspace = true } bincode = { workspace = true } blake3 = { workspace = true } @@ -19,6 +25,7 @@ byteorder = { workspace = true } bzip2 = { workspace = true } crossbeam-channel = { workspace = true } dashmap = { workspace = true, features = ["rayon", "raw-api"] } +dirs-next = { workspace = true } flate2 = { workspace = true } fnv = { workspace = true } im = { workspace = true, features = ["rayon", "serde"] } @@ -42,8 +49,11 @@ regex = { workspace = true } seqlock = { workspace = true } serde = { workspace = true, features = ["rc"] } serde_derive = { workspace = true } +serde_yaml = { workspace = true } +sha2 = { workspace = true } smallvec = { workspace = true } solana-bucket-map = { workspace = true } +solana-client = {workspace = true} solana-config-program = { workspace = true } solana-frozen-abi = { workspace = true } solana-frozen-abi-macro = { workspace = true } @@ -55,6 +65,7 @@ solana-rayon-threadlimit = { workspace = true } solana-sdk = { workspace = true } solana-stake-program = { workspace = true } solana-system-program = { workspace = true } +solana-version = { workspace = true } solana-vote-program = { workspace = true } static_assertions = { workspace = true } strum = { workspace = true, features = ["derive"] } @@ -62,6 +73,8 @@ strum_macros = { workspace = true } tar = { workspace = true } tempfile = { workspace = true } thiserror = { workspace = true } +sonic-printer = { workspace = true} +chrono = { workspace = true } [lib] crate-type = ["lib"] diff --git a/accounts-db/src/accounts_cache.rs b/accounts-db/src/accounts_cache.rs index e612ad7..91cf453 100644 --- a/accounts-db/src/accounts_cache.rs +++ b/accounts-db/src/accounts_cache.rs @@ -1,5 +1,6 @@ use { crate::{accounts_db::AccountsDb, accounts_hash::AccountHash}, + crate::remote_loader::RemoteAccountLoader, //Sonic: using RemoteAccountLoader dashmap::DashMap, seqlock::SeqLock, solana_sdk::{ @@ -162,6 +163,7 @@ pub struct AccountsCache { maybe_unflushed_roots: RwLock>, max_flushed_root: AtomicU64, total_size: Arc, + pub remote_loader: RemoteAccountLoader, //Sonic: using RemoteAccountLoader } impl AccountsCache { @@ -224,8 +226,47 @@ impl AccountsCache { } pub fn load(&self, slot: Slot, pubkey: &Pubkey) -> Option { - self.slot_cache(slot) - .and_then(|slot_cache| slot_cache.get_cloned(pubkey)) + // self.slot_cache(slot) + // .and_then(|slot_cache| slot_cache.get_cloned(pubkey)) + match self.slot_cache(slot) + .and_then(|slot_cache| slot_cache.get_cloned(pubkey)) { + Some(account) => { + Some(account) + }, + None => { + //Sonic: load from remote + let account = self.remote_loader.get_account(pubkey); + match account { + Some(acc) => { + //Sonic: store into cache + Some(self.store(slot, pubkey, acc)) + }, + None => None, + } + // None + }, + } + } + + //Sonic: check if account exists in remote + pub fn has_account_from_remote(&self, pubkey: &Pubkey) -> bool { + self.remote_loader.has_account(pubkey) + } + + //Sonic: load accounts from remote + pub fn load_accounts_from_remote(&self, pubkeys: Vec) { + pubkeys.iter().for_each(|pubkey| { + //Sonic: load from remote + self.remote_loader.load_account(pubkey); + }); + } + + //Sonic: load accounts from remote + pub fn deactivate_remote_accounts(&self, pubkeys: Vec) { + pubkeys.iter().for_each(|pubkey| { + //Sonic: deactivate account in cache + self.remote_loader.deactivate_account(&pubkey); + }); } pub fn remove_slot(&self, slot: Slot) -> Option { diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 71f215f..cffa0f1 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -5065,6 +5065,10 @@ impl AccountsDb { AccountIndexGetResult::Found(lock, index) => (lock, index), // we bail out pretty early for missing. AccountIndexGetResult::NotFound => { + // Sonic: check if the pubkey is from remote in cache. + if ancestors.len() > 1 && self.accounts_cache.has_account_from_remote(pubkey) { + return Some((0, StorageLocation::Cached, None)); //Sonic: return a dummy slot number + } return None; } }; @@ -5415,7 +5419,7 @@ impl AccountsDb { self.read_index_for_accessor_or_load_slow(ancestors, pubkey, max_root, false)?; // Notice the subtle `?` at previous line, we bail out pretty early if missing. - let in_write_cache = storage_location.is_cached(); + let in_write_cache: bool = storage_location.is_cached(); if !load_into_read_cache_only { if !in_write_cache { let result = self.read_only_accounts_cache.load(*pubkey, slot); @@ -5449,6 +5453,7 @@ impl AccountsDb { load_hint, )?; let loaded_account = account_accessor.check_and_get_loaded_account(); + let is_cached = loaded_account.is_cached(); let account = loaded_account.take_account(); if matches!(load_zero_lamports, LoadZeroLamports::None) && account.is_zero_lamport() { @@ -7713,6 +7718,17 @@ impl AccountsDb { .map(|d| d.as_ref().unwrap().get_cache_hash_data()) .collect::>(); + //Sonic: calculate the total lamports of remote accounts + let mut lamports: u64 = 0; + for chis in cache_hash_intermediates.clone() { + for item in chis { + if self.accounts_cache.has_account_from_remote(&item.pubkey) { + println!("==== remote key: {:?}", item); + lamports += item.lamports; + } + } + } + // turn raw data into merkle tree hashes and sum of lamports let (accounts_hash, capitalization) = accounts_hasher.rest_of_hash_calculation(&cache_hash_intermediates, &mut stats); @@ -7722,6 +7738,10 @@ impl AccountsDb { AccountsHashKind::Incremental(IncrementalAccountsHash(accounts_hash)) } }; + + //Sonic: subtract the lamports of remote accounts from the capitalization + let capitalization = capitalization - lamports; + info!("calculate_accounts_hash_from_storages: slot: {slot}, {accounts_hash:?}, capitalization: {capitalization}"); Ok((accounts_hash, capitalization)) }; diff --git a/accounts-db/src/lib.rs b/accounts-db/src/lib.rs index 74fdb86..a1026ee 100644 --- a/accounts-db/src/lib.rs +++ b/accounts-db/src/lib.rs @@ -49,6 +49,7 @@ pub mod transaction_results; pub mod utils; mod verify_accounts_hash_in_background; pub mod waitable_condvar; +pub mod remote_loader; #[macro_use] extern crate solana_metrics; diff --git a/accounts-db/src/read_only_accounts_cache.rs b/accounts-db/src/read_only_accounts_cache.rs index 27e0d84..088b8b4 100644 --- a/accounts-db/src/read_only_accounts_cache.rs +++ b/accounts-db/src/read_only_accounts_cache.rs @@ -1,6 +1,7 @@ //! ReadOnlyAccountsCache used to store accounts, such as executable accounts, //! which can be large, loaded many times, and rarely change. use { + // crate::remote_loader::RemoteAccountLoader, dashmap::{mapref::entry::Entry, DashMap}, index_list::{Index, IndexList}, solana_measure::measure_us, @@ -72,6 +73,7 @@ pub(crate) struct ReadOnlyAccountsCache { // Performance statistics stats: ReadOnlyCacheStats, + // remote_loader: RemoteAccountLoader, } impl ReadOnlyAccountsCache { @@ -83,6 +85,7 @@ impl ReadOnlyAccountsCache { data_size: AtomicUsize::default(), ms_to_skip_lru_update, stats: ReadOnlyCacheStats::default(), + // remote_loader: RemoteAccountLoader::default(), } } @@ -106,6 +109,14 @@ impl ReadOnlyAccountsCache { let Some(entry) = self.cache.get(&key) else { self.stats.misses.fetch_add(1, Ordering::Relaxed); return None; + // let account = self.remote_loader.get_account(&pubkey); + // if let Some(acc) = account { + // Some(self.store(pubkey, slot, acc.clone())); + // return Some(acc); + // } else { + // self.stats.misses.fetch_add(1, Ordering::Relaxed); + // return None; + // } }; // Move the entry to the end of the queue. // self.queue is modified while holding a reference to the cache entry; @@ -129,6 +140,11 @@ impl ReadOnlyAccountsCache { account } + // pub fn has_account_from_remote(&self, pubkey: &Pubkey) -> bool { + // // panic!("has_account_from_remote() not implemented"); + // self.remote_loader.has_account(pubkey) + // } + fn account_size(&self, account: &AccountSharedData) -> usize { CACHE_ENTRY_SIZE + account.data().len() }