Skip to content

Commit

Permalink
Merge pull request #378 from near/cherry
Browse files Browse the repository at this point in the history
Cherry-pick necessary changes and avoid updating nearcore
  • Loading branch information
kobayurii authored Nov 25, 2024
2 parents a82b8d5 + 8ecf775 commit 940ded2
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 35 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased](https://github.com/near/read-rpc/compare/main...develop)

### What's Changed
* Corrected state size calculation logic.
* Integrated cargo_pkg_version metric to reflect the current server version.
* Deleted unnecessary debug logs about updating blocks by finality

## [0.3.1](https://github.com/near/read-rpc/releases/tag/v0.3.1)

### Supported Nearcore Version
- nearcore v2.3.0
- rust v1.81.0

### What's Changed
* Improved bulk insertion of state_changes, reducing database requests from hundreds to a maximum of 7 per block.
* Configuration improvement. Create a default config.toml on application start to load parameters from environment variables.
Expand Down
6 changes: 6 additions & 0 deletions database/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,10 @@ lazy_static! {
&["method_name", "table_name"]
)
.unwrap();
// Per-account counter of database read queries, labelled by account_id,
// shard_id, method_name and table_name.
// NOTE(review): "ACCOUTS" looks like a typo for "ACCOUNTS"; renaming would
// require updating every call site that references this static, so it is
// flagged rather than changed here.
// NOTE(review): the help string mentions only method_name and table_name,
// but the metric also carries account_id and shard_id labels. account_id is
// potentially high-cardinality for Prometheus — TODO confirm this is intended.
pub(crate) static ref ACCOUTS_DATABASE_READ_QUERIES: IntCounterVec = register_int_counter_vec(
"account_database_read_queries_counter",
"Total number of accounts database reads queries by method_name and table_name",
&["account_id", "shard_id", "method_name", "table_name"]
)
.unwrap();
}
64 changes: 64 additions & 0 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_data",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_data",
])
.inc();
let page_state = if let Some(page_state_token) = page_token {
borsh::from_slice::<crate::postgres::PageState>(&hex::decode(page_state_token)?)?
} else {
Expand Down Expand Up @@ -150,6 +158,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_data",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_data",
])
.inc();
let mut items = std::collections::HashMap::new();
let mut stream = sqlx::query_as::<_, (String, Vec<u8>)>(
"
Expand Down Expand Up @@ -207,6 +223,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_data",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_data",
])
.inc();
let mut items = std::collections::HashMap::new();
let mut stream = sqlx::query_as::<_, (String, Vec<u8>)>(
"
Expand Down Expand Up @@ -264,6 +288,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_data",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_data",
])
.inc();
let (data_value,): (Vec<u8>,) = sqlx::query_as(
"
SELECT data_value
Expand Down Expand Up @@ -297,6 +329,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_account",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_account",
])
.inc();
let (block_height, block_hash, data_value): (bigdecimal::BigDecimal, String, Vec<u8>) =
sqlx::query_as(
"
Expand Down Expand Up @@ -334,6 +374,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_contract",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_contract",
])
.inc();
let (block_height, block_hash, contract_code): (bigdecimal::BigDecimal, String, Vec<u8>) =
sqlx::query_as(
"
Expand Down Expand Up @@ -372,6 +420,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_access_key",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_access_key",
])
.inc();
let key_data = borsh::to_vec(&public_key)?;
let (block_height, block_hash, data_value): (bigdecimal::BigDecimal, String, Vec<u8>) =
sqlx::query_as(
Expand Down Expand Up @@ -412,6 +468,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
"state_changes_access_key",
])
.inc();
crate::metrics::ACCOUTS_DATABASE_READ_QUERIES
.with_label_values(&[
account_id.as_ref(),
&shard_id_pool.shard_id.to_string(),
method_name,
"state_changes_access_key",
])
.inc();
let mut access_keys = vec![];
let mut stream = sqlx::query_as::<_, (String, Vec<u8>, bigdecimal::BigDecimal)>(
"
Expand Down
4 changes: 4 additions & 0 deletions rpc-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ impl ServerContext {
let compiled_contract_code_cache =
std::sync::Arc::new(CompiledCodeCache::new(contract_code_cache_size_in_bytes));

crate::metrics::CARGO_PKG_VERSION
.with_label_values(&[NEARD_VERSION])
.inc();

Ok(Self {
s3_client,
db_manager: std::sync::Arc::new(Box::new(db_manager)),
Expand Down
6 changes: 6 additions & 0 deletions rpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ async fn rpc_handler(
})
.await
}
"EXPERIMENTAL_congestion_level" => {
process_method_call(request, |params| {
modules::blocks::methods::congestion_level(data, params)
})
.await
}
"EXPERIMENTAL_genesis_config" => {
process_method_call(request, |_: ()| {
modules::network::methods::genesis_config(data)
Expand Down
15 changes: 8 additions & 7 deletions rpc-server/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_web::{get, Responder};
use prometheus::{Encoder, IntCounterVec, IntGauge, IntGaugeVec, Opts};
use prometheus::{CounterVec, Encoder, IntCounterVec, IntGauge, IntGaugeVec, Opts};

type Result<T, E> = std::result::Result<T, E>;

Expand Down Expand Up @@ -113,12 +113,13 @@ lazy_static! {
"Optimistic updating status. 0: working, 1: not working",
).unwrap();

pub(crate) static ref LEGACY_DATABASE_TX_DETAILS: IntCounterVec = register_int_counter_vec(
"legacy_database_tx_details",
"Total number of calls to the legacy database for transaction details",
// This declares a label named `lookup_type` to differentiate "finished" and "in_progress" transaction lookups
&["lookup_type"]
).unwrap();
// Counter whose single label carries the running server's version string;
// incremented once at startup (see ServerContext::new) so the current
// version is visible in Prometheus as `cargo_pkg_version{version="..."}`.
// NOTE(review): the `version` label is declared twice — once via
// `Opts::variable_label("version")` and again via the `&["version"]` slice
// passed to `CounterVec::new`. Confirm against the prometheus crate docs
// that this does not produce a duplicated variable label at registration.
pub(crate) static ref CARGO_PKG_VERSION: CounterVec = {
let opts = Opts::new("cargo_pkg_version", "Cargo package version. This is used to track the version of the running server.")
.variable_label("version");
let counter_vec = CounterVec::new(opts, &["version"]).expect("metric can be created");
prometheus::register(Box::new(counter_vec.clone())).unwrap();
counter_vec
};

// Error metrics
// 0: ReadRPC success, NEAR RPC success"
Expand Down
63 changes: 58 additions & 5 deletions rpc-server/src/modules/blocks/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn chunk(
near_jsonrpc::primitives::types::chunks::RpcChunkError,
> {
tracing::debug!("`chunk` called with parameters: {:?}", request_data);
let chunk_result = fetch_chunk(&data, request_data.chunk_reference.clone()).await;
let chunk_result = fetch_chunk(&data, request_data.chunk_reference.clone(), "chunk").await;
#[cfg(feature = "shadow-data-consistency")]
{
crate::utils::shadow_compare_results_handler(
Expand All @@ -67,6 +67,58 @@ pub async fn chunk(
chunk_result
}

// EXPERIMENTAL_congestion_level rpc method implementation
//
// Resolves the referenced chunk, looks up the protocol config for the block
// the chunk was included in, and derives the congestion level from the
// chunk header's congestion info.
#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(data)))]
pub async fn congestion_level(
data: Data<ServerContext>,
request_data: near_jsonrpc::primitives::types::congestion::RpcCongestionLevelRequest,
) -> Result<
near_jsonrpc::primitives::types::congestion::RpcCongestionLevelResponse,
near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError,
> {
tracing::debug!(
"`EXPERIMENTAL_congestion_level` called with parameters: {:?}",
request_data
);
// Fetch the chunk first: its header carries both the congestion info and
// the inclusion height needed to select the matching protocol config.
let chunk = fetch_chunk(
&data,
request_data.chunk_reference.clone(),
"EXPERIMENTAL_congestion_level",
)
.await?
.chunk_view;
let block_reference = near_primitives::types::BlockReference::BlockId(
near_primitives::types::BlockId::Height(chunk.header.height_included),
);
// Translate protocol-config errors into the congestion-level error space.
let config = match crate::modules::network::methods::protocol_config_call(
    &data,
    block_reference,
    "EXPERIMENTAL_congestion_level",
)
.await
{
    Ok(config) => config,
    Err(near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock {
        error_message,
    }) => {
        return Err(
            near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError::UnknownBlock {
                error_message,
            },
        )
    }
    Err(near_jsonrpc::primitives::types::config::RpcProtocolConfigError::InternalError {
        error_message,
    }) => {
        return Err(
            near_jsonrpc::primitives::types::congestion::RpcCongestionLevelError::InternalError {
                error_message,
            },
        )
    }
};
// Chunks produced before congestion control have no congestion info;
// report zero congestion in that case (same as the previous behavior).
let congestion_level = match chunk.header.congestion_info {
    Some(info) => info.congestion_level(config.runtime_config.congestion_control_config),
    None => 0.0,
};
Ok(
near_jsonrpc::primitives::types::congestion::RpcCongestionLevelResponse { congestion_level },
)
}

/// `EXPERIMENTAL_changes` rpc method implementation
/// calls proxy_rpc_call to get `EXPERIMENTAL_changes` from near-rpc if request parameters not supported by read-rpc
/// as example: BlockReference for Finality::None is not supported by read-rpc
Expand Down Expand Up @@ -379,6 +431,7 @@ pub async fn fetch_block(
pub async fn fetch_chunk(
data: &Data<ServerContext>,
chunk_reference: near_jsonrpc::primitives::types::chunks::ChunkReference,
method_name: &str,
) -> Result<
near_jsonrpc::primitives::types::chunks::RpcChunkResponse,
near_jsonrpc::primitives::types::chunks::RpcChunkError,
Expand All @@ -400,7 +453,7 @@ pub async fn fetch_chunk(
}
near_primitives::types::BlockId::Hash(block_hash) => data
.db_manager
.get_block_height_by_hash(block_hash, "chunk")
.get_block_height_by_hash(block_hash, method_name)
.await
.map_err(|err| {
tracing::error!("Failed to fetch block by hash: {}", err);
Expand All @@ -412,7 +465,7 @@ pub async fn fetch_chunk(
// Check if the chunk stored in block with the given height
if let Ok(block_height_shard_id) = data
.db_manager
.get_block_by_height_and_shard_id(block_height, shard_id, "chunk")
.get_block_by_height_and_shard_id(block_height, shard_id, method_name)
.await
{
(block_height_shard_id.0, block_height_shard_id.1)
Expand All @@ -422,7 +475,7 @@ pub async fn fetch_chunk(
}
near_jsonrpc::primitives::types::chunks::ChunkReference::ChunkHash { chunk_id } => data
.db_manager
.get_block_by_chunk_hash(chunk_id, "chunk")
.get_block_by_chunk_hash(chunk_id, method_name)
.await
.map_err(
|_err| near_jsonrpc::primitives::types::chunks::RpcChunkError::UnknownChunk {
Expand All @@ -444,7 +497,7 @@ pub async fn fetch_chunk(
&near_primitives::types::BlockReference::BlockId(near_primitives::types::BlockId::Height(
block_height,
)),
"chunk",
method_name,
Some(block_height),
)
.await;
Expand Down
9 changes: 0 additions & 9 deletions rpc-server/src/modules/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,6 @@ impl BlocksInfoByFinality {
// Update final block info in the cache.
// Executes every second.
pub async fn update_final_block(&self, block_info: BlockInfo) {
tracing::debug!(
"Update final block info: {:?}",
block_info.block_cache.block_height
);
let mut final_block_lock = self.final_block.write().await;
final_block_lock.block_cache = block_info.block_cache;
final_block_lock.block_view = block_info.block_view;
Expand All @@ -348,11 +344,6 @@ impl BlocksInfoByFinality {
// Update optimistic block changes and optimistic block info in the cache.
// Executes every second.
pub async fn update_optimistic_block(&self, block_info: BlockInfo) {
tracing::debug!(
"Update optimistic block info: {:?}",
block_info.block_cache.block_height
);

let mut optimistic_changes_lock = self.optimistic_changes.write().await;
optimistic_changes_lock.account_changes = block_info.changes_in_block_account_map().await;

Expand Down
25 changes: 15 additions & 10 deletions rpc-server/src/modules/network/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,12 @@ pub async fn protocol_config(
request_data
);

let config_view = protocol_config_call(&data, request_data.block_reference.clone()).await;
let config_view = protocol_config_call(
&data,
request_data.block_reference.clone(),
"EXPERIMENTAL_protocol_config",
)
.await;

#[cfg(feature = "shadow-data-consistency")]
{
Expand Down Expand Up @@ -366,21 +371,21 @@ async fn validators_call(
Ok(validators.validators_info)
}

async fn protocol_config_call(
pub async fn protocol_config_call(
data: &Data<ServerContext>,
block_reference: near_primitives::types::BlockReference,
method_name: &str,
) -> Result<
near_chain_configs::ProtocolConfigView,
near_jsonrpc::primitives::types::config::RpcProtocolConfigError,
> {
let protocol_version =
get_protocol_version(data, block_reference, "EXPERIMENTAL_protocol_config")
.await
.map_err(|err| {
near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock {
error_message: err.to_string(),
}
})?;
let protocol_version = get_protocol_version(data, block_reference, method_name)
.await
.map_err(|err| {
near_jsonrpc::primitives::types::config::RpcProtocolConfigError::UnknownBlock {
error_message: err.to_string(),
}
})?;
// Stores runtime config for each protocol version
// Create store of runtime configs for the given chain id.
//
Expand Down
Loading

0 comments on commit 940ded2

Please sign in to comment.