Skip to content

Commit

Permalink
(state-indexer-logic): improvement to bulk insert state changes (#362)
Browse files Browse the repository at this point in the history
* improvement to bulk insert state changes

* fmt

* update changelog
  • Loading branch information
kobayurii authored Oct 9, 2024
1 parent 1303566 commit edd4414
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 127 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/near/read-rpc/compare/main...develop)

## [0.3.0](https://github.com/near/read-rpc/releases/tag/v0.2.17)
### What's Changed
* Improved bulk insertion of state_changes, reducing database requests from hundreds to a maximum of 7 per block.

## [0.3.0](https://github.com/near/read-rpc/releases/tag/v0.3.0)

### BREAKING CHANGES

Expand Down
8 changes: 4 additions & 4 deletions database/src/postgres/state_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
_ => {}
}
});
query_builder.push(" ON CONFLICT (account_id, data_key, block_height) DO UPDATE SET data_value = EXCLUDED.data_value;");
query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder
.build()
.execute(self.shards_pool.get(&shard_id).ok_or(anyhow::anyhow!(
Expand Down Expand Up @@ -320,7 +320,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
_ => {}
}
});
query_builder.push(" ON CONFLICT (account_id, data_key, block_height) DO UPDATE SET data_value = EXCLUDED.data_value;");
query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder
.build()
.execute(self.shards_pool.get(&shard_id).ok_or(anyhow::anyhow!(
Expand Down Expand Up @@ -374,7 +374,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
_ => {}
}
});
query_builder.push(" ON CONFLICT (account_id, block_height) DO UPDATE SET data_value = EXCLUDED.data_value;");
query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder
.build()
.execute(self.shards_pool.get(&shard_id).ok_or(anyhow::anyhow!(
Expand Down Expand Up @@ -428,7 +428,7 @@ impl crate::StateIndexerDbManager for crate::PostgresDBManager {
_ => {}
}
});
query_builder.push(" ON CONFLICT (account_id, block_height) DO UPDATE SET data_value = EXCLUDED.data_value;");
query_builder.push(" ON CONFLICT DO NOTHING;");
query_builder
.build()
.execute(self.shards_pool.get(&shard_id).ok_or(anyhow::anyhow!(
Expand Down
203 changes: 81 additions & 122 deletions logic-state-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::HashMap;

pub use near_client::{NearClient, NearJsonRpc};
use near_indexer_primitives::views::StateChangeValueView;
use near_indexer_primitives::CryptoHash;
use near_primitives::types::ShardId;
use std::collections::HashMap;

use futures::FutureExt;
use itertools::Itertools;

#[macro_use]
extern crate lazy_static;
Expand All @@ -22,14 +21,26 @@ const SAVE_ATTEMPTS: usize = 20;
// Target for tracing logs
pub const INDEXER: &str = "state_indexer";

pub type AccountId = String;
pub type AccountIdStateChangeKey = String;

#[derive(Debug)]
#[derive(Debug, Default)]
struct StateChangesToStore {
data: HashMap<AccountId, ShardedStateChangesWithCause>,
access_key: HashMap<AccountId, ShardedStateChangesWithCause>,
contract: HashMap<AccountId, ShardedStateChangesWithCause>,
account: HashMap<AccountId, ShardedStateChangesWithCause>,
data: HashMap<
ShardId,
HashMap<AccountIdStateChangeKey, near_indexer_primitives::views::StateChangeWithCauseView>,
>,
access_key: HashMap<
ShardId,
HashMap<AccountIdStateChangeKey, near_indexer_primitives::views::StateChangeWithCauseView>,
>,
contract: HashMap<
ShardId,
HashMap<AccountIdStateChangeKey, near_indexer_primitives::views::StateChangeWithCauseView>,
>,
account: HashMap<
ShardId,
HashMap<AccountIdStateChangeKey, near_indexer_primitives::views::StateChangeWithCauseView>,
>,
}

impl StateChangesToStore {
Expand All @@ -42,23 +53,14 @@ impl StateChangesToStore {
block_hash: CryptoHash,
) -> anyhow::Result<()> {
if !self.data.is_empty() {
let futures: Vec<_> = self
.data
.values()
.chunk_by(|sharded_state_change| sharded_state_change.shard_id)
.into_iter()
.map(|(shard_id, sharded_state_changes)| {
let state_changes = sharded_state_changes
.map(|sharded_state_change| sharded_state_change.state_change.clone())
.collect();
db_manager.save_state_changes_data(
shard_id,
state_changes,
block_height,
block_hash,
)
})
.collect();
let futures = self.data.iter().map(|(shard_id, state_changes)| {
db_manager.save_state_changes_data(
*shard_id,
state_changes.values().cloned().collect(),
block_height,
block_hash,
)
});
futures::future::join_all(futures)
.await
.into_iter()
Expand All @@ -76,24 +78,14 @@ impl StateChangesToStore {
block_hash: CryptoHash,
) -> anyhow::Result<()> {
if !self.access_key.is_empty() {
let futures: Vec<_> = self
.access_key
.values()
.chunk_by(|sharded_state_change| sharded_state_change.shard_id)
.into_iter()
.map(|(shard_id, sharded_state_changes)| {
let state_changes = sharded_state_changes
.map(|sharded_state_change| sharded_state_change.state_change.clone())
.collect();
db_manager.save_state_changes_access_key(
shard_id,
state_changes,
block_height,
block_hash,
)
})
.collect();

let futures = self.access_key.iter().map(|(shard_id, state_changes)| {
db_manager.save_state_changes_access_key(
*shard_id,
state_changes.values().cloned().collect(),
block_height,
block_hash,
)
});
futures::future::join_all(futures)
.await
.into_iter()
Expand All @@ -111,24 +103,14 @@ impl StateChangesToStore {
block_hash: CryptoHash,
) -> anyhow::Result<()> {
if !self.contract.is_empty() {
let futures: Vec<_> = self
.contract
.values()
.chunk_by(|sharded_state_change| sharded_state_change.shard_id)
.into_iter()
.map(|(shard_id, sharded_state_changes)| {
let state_changes = sharded_state_changes
.map(|sharded_state_change| sharded_state_change.state_change.clone())
.collect();
db_manager.save_state_changes_contract(
shard_id,
state_changes,
block_height,
block_hash,
)
})
.collect();

let futures = self.contract.iter().map(|(shard_id, state_changes)| {
db_manager.save_state_changes_contract(
*shard_id,
state_changes.values().cloned().collect(),
block_height,
block_hash,
)
});
futures::future::join_all(futures)
.await
.into_iter()
Expand All @@ -146,24 +128,14 @@ impl StateChangesToStore {
block_hash: CryptoHash,
) -> anyhow::Result<()> {
if !self.account.is_empty() {
let futures: Vec<_> = self
.account
.values()
.chunk_by(|sharded_state_change| sharded_state_change.shard_id)
.into_iter()
.map(|(shard_id, sharded_state_changes)| {
let state_changes = sharded_state_changes
.map(|sharded_state_change| sharded_state_change.state_change.clone())
.collect();
db_manager.save_state_changes_account(
shard_id,
state_changes,
block_height,
block_hash,
)
})
.collect();

let futures = self.account.iter().map(|(shard_id, state_changes)| {
db_manager.save_state_changes_account(
*shard_id,
state_changes.values().cloned().collect(),
block_height,
block_hash,
)
});
futures::future::join_all(futures)
.await
.into_iter()
Expand Down Expand Up @@ -197,14 +169,6 @@ impl StateChangesToStore {
}
}

/// Wrapper around Statechanges with cause to store shard_id
/// based on the account_id and ShardLayout settings
#[derive(Debug)]
struct ShardedStateChangesWithCause {
shard_id: u64,
state_change: near_indexer_primitives::views::StateChangeWithCauseView,
}

#[cfg_attr(
feature = "tracing-instrumentation",
tracing::instrument(skip(streamer_message, db_manager))
Expand Down Expand Up @@ -379,12 +343,7 @@ async fn handle_state_changes(
indexer_config: &(impl configuration::RightsizingConfig + std::fmt::Debug),
shard_layout: &near_primitives::shard_layout::ShardLayout,
) -> anyhow::Result<()> {
let mut state_changes_to_store = StateChangesToStore {
data: HashMap::new(),
access_key: HashMap::new(),
contract: HashMap::new(),
account: HashMap::new(),
};
let mut state_changes_to_store = StateChangesToStore::default();

let initial_state_changes = streamer_message
.shards
Expand All @@ -409,13 +368,13 @@ async fn handle_state_changes(
let data_key: &[u8] = key.as_ref();
let key = format!("{}_data_{}", account_id.as_str(), hex::encode(data_key));
// This will override the previous record for this account_id + state change kind + suffix
state_changes_to_store.data.insert(
key,
ShardedStateChangesWithCause {
shard_id,
state_change,
},
);
state_changes_to_store
.data
.entry(shard_id)
.and_modify(|sharded_state_changes| {
sharded_state_changes.insert(key.clone(), state_change.clone());
})
.or_insert(HashMap::from([(key, state_change)]));
}
StateChangeValueView::AccessKeyUpdate {
account_id,
Expand All @@ -436,13 +395,13 @@ async fn handle_state_changes(
hex::encode(borsh::to_vec(&public_key)?)
);
// This will override the previous record for this account_id + state change kind + suffix
state_changes_to_store.access_key.insert(
key,
ShardedStateChangesWithCause {
shard_id,
state_change,
},
);
state_changes_to_store
.access_key
.entry(shard_id)
.and_modify(|sharded_state_changes| {
sharded_state_changes.insert(key.clone(), state_change.clone());
})
.or_insert(HashMap::from([(key, state_change)]));
}
// ContractCode and Account changes are not separable by any key, so we can omit the suffix
StateChangeValueView::ContractCodeUpdate { account_id, .. }
Expand All @@ -451,27 +410,27 @@ async fn handle_state_changes(
near_primitives::shard_layout::account_id_to_shard_id(account_id, shard_layout);
let key = format!("{}_contract", account_id.as_str());
// This will override the previous record for this account_id + state change kind + suffix
state_changes_to_store.contract.insert(
key,
ShardedStateChangesWithCause {
shard_id,
state_change,
},
);
state_changes_to_store
.contract
.entry(shard_id)
.and_modify(|sharded_state_changes| {
sharded_state_changes.insert(key.clone(), state_change.clone());
})
.or_insert(HashMap::from([(key, state_change)]));
}
StateChangeValueView::AccountUpdate { account_id, .. }
| StateChangeValueView::AccountDeletion { account_id } => {
let shard_id =
near_primitives::shard_layout::account_id_to_shard_id(account_id, shard_layout);
let key = format!("{}_account", account_id.as_str());
// This will override the previous record for this account_id + state change kind + suffix
state_changes_to_store.account.insert(
key,
ShardedStateChangesWithCause {
shard_id,
state_change,
},
);
state_changes_to_store
.account
.entry(shard_id)
.and_modify(|sharded_state_changes| {
sharded_state_changes.insert(key.clone(), state_change.clone());
})
.or_insert(HashMap::from([(key, state_change)]));
}
};
}
Expand Down

0 comments on commit edd4414

Please sign in to comment.