Skip to content

Commit

Permalink
use timed-map crate instead of internal ExpirableMap type
Browse files Browse the repository at this point in the history
  • Loading branch information
shamardy committed Oct 21, 2024
1 parent ad83236 commit e2b814e
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 181 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ sha3 = "0.9"
utxo_signer = { path = "utxo_signer" }
# using the same version as cosmrs
tendermint-rpc = { version = "0.34", default-features = false }
timed-map = "0.1.0"
tokio-tungstenite-wasm = { git = "https://github.com/KomodoPlatform/tokio-tungstenite-wasm", rev = "d20abdb", features = ["rustls-tls-native-roots"]}
url = { version = "2.2.2", features = ["serde"] }
uuid = { version = "1.2.2", features = ["fast-rng", "serde", "v4"] }
Expand Down
10 changes: 5 additions & 5 deletions mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::eth::web3_transport::Web3SendOut;
use crate::eth::{EthCoin, RpcTransportEventHandlerShared};
use crate::{MmCoin, RpcTransportEventHandler};
use common::executor::{AbortSettings, SpawnAbortable, Timer};
use common::expirable_map::ExpirableMap;
use common::log;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot;
Expand All @@ -25,6 +24,7 @@ use proxy_signature::{ProxySign, RawMessage};
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::{AtomicUsize, Ordering},
Arc};
use timed_map::{StdClock, TimedMap};
use tokio_tungstenite_wasm::WebSocketStream;
use web3::error::{Error, TransportError};
use web3::helpers::to_string;
Expand Down Expand Up @@ -137,15 +137,15 @@ impl WebsocketTransport {
&self,
request: Option<ControllerMessage>,
wsocket: &mut WebSocketStream,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
response_notifiers: &mut TimedMap<StdClock, usize, oneshot::Sender<Vec<u8>>>,
) -> OuterAction {
match request {
Some(ControllerMessage::Request(WsRequest {
request_id,
serialized_request,
response_notifier,
})) => {
response_notifiers.insert(
response_notifiers.insert_expirable(
request_id,
response_notifier,
// Since request will be cancelled when timeout occurs, we are free to drop its state.
Expand Down Expand Up @@ -188,7 +188,7 @@ impl WebsocketTransport {
async fn handle_response(
&self,
message: Option<Result<tokio_tungstenite_wasm::Message, tokio_tungstenite_wasm::Error>>,
response_notifiers: &mut ExpirableMap<usize, oneshot::Sender<Vec<u8>>>,
response_notifiers: &mut TimedMap<StdClock, usize, oneshot::Sender<Vec<u8>>>,
) -> OuterAction {
match message {
Some(Ok(tokio_tungstenite_wasm::Message::Text(inc_event))) => {
Expand Down Expand Up @@ -249,7 +249,7 @@ impl WebsocketTransport {
let _guard = self.connection_guard.lock().await;

// List of awaiting requests
let mut response_notifiers: ExpirableMap<RequestId, oneshot::Sender<Vec<u8>>> = ExpirableMap::default();
let mut response_notifiers: TimedMap<StdClock, RequestId, oneshot::Sender<Vec<u8>>> = TimedMap::default();

let mut wsocket = match self
.attempt_to_establish_socket_connection(MAX_ATTEMPTS, SLEEP_DURATION)
Expand Down
2 changes: 1 addition & 1 deletion mm2src/common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ pub mod crash_reports;
pub mod custom_futures;
pub mod custom_iter;
#[path = "executor/mod.rs"] pub mod executor;
pub mod expirable_map;
pub mod expirable_entry;
pub mod number_type_casting;
pub mod password_policy;
pub mod seri;
Expand Down
34 changes: 34 additions & 0 deletions mm2src/common/expirable_entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! This module provides a cross-compatible map that associates values with keys and supports expiring entries.
//!
//! Designed for performance-oriented use-cases utilizing `FxHashMap` under the hood,
//! and is not suitable for cryptographic purposes.
use instant::{Duration, Instant};
#[derive(Clone, Debug)]
pub struct ExpirableEntry<V> {
pub(crate) value: V,
pub(crate) expires_at: Instant,
}

impl<V> ExpirableEntry<V> {
#[inline(always)]
pub fn new(v: V, exp: Duration) -> Self {
Self {
expires_at: Instant::now() + exp,
value: v,
}
}

#[inline(always)]
pub fn get_element(&self) -> &V { &self.value }

#[inline(always)]
pub fn update_value(&mut self, v: V) { self.value = v }

#[inline(always)]
pub fn update_expiration(&mut self, expires_at: Instant) { self.expires_at = expires_at }

/// Checks whether entry has longer ttl than the given one.
#[inline(always)]
pub fn has_longer_life_than(&self, min_ttl: Duration) -> bool { self.expires_at > Instant::now() + min_ttl }
}
163 changes: 0 additions & 163 deletions mm2src/common/expirable_map.rs

This file was deleted.

2 changes: 1 addition & 1 deletion mm2src/common/time_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::collections::hash_map::{self,
use std::collections::VecDeque;
use std::time::Duration;

use crate::expirable_map::ExpirableEntry;
use crate::expirable_entry::ExpirableEntry;

#[derive(Debug)]
pub struct TimeCache<Key, Value> {
Expand Down
1 change: 1 addition & 0 deletions mm2src/mm2_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ser_error = { path = "../derives/ser_error" }
ser_error_derive = { path = "../derives/ser_error_derive" }
serde_json = { version = "1", features = ["preserve_order", "raw_value"] }
shared_ref_counter = { path = "../common/shared_ref_counter" }
timed-map = "0.1.0"
uuid = { version = "1.2.2", features = ["fast-rng", "serde", "v4"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
6 changes: 3 additions & 3 deletions mm2src/mm2_core/src/data_asker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use common::expirable_map::ExpirableMap;
use common::{HttpStatusCode, StatusCode};
use derive_more::Display;
use futures::channel::oneshot;
Expand All @@ -12,6 +11,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use timed_map::{StdClock, TimedMap};

use crate::mm_ctx::{MmArc, MmCtx};

Expand All @@ -20,7 +20,7 @@ const EVENT_NAME: &str = "DATA_NEEDED";
#[derive(Clone, Debug, Default)]
pub struct DataAsker {
data_id: Arc<AtomicUsize>,
awaiting_asks: Arc<AsyncMutex<ExpirableMap<usize, oneshot::Sender<serde_json::Value>>>>,
awaiting_asks: Arc<AsyncMutex<TimedMap<StdClock, usize, oneshot::Sender<serde_json::Value>>>>,
}

#[derive(Debug, Display)]
Expand Down Expand Up @@ -59,7 +59,7 @@ impl MmCtx {
.awaiting_asks
.lock()
.await
.insert(data_id, sender, timeout);
.insert_expirable(data_id, sender, timeout);
}

let input = json!({
Expand Down
10 changes: 5 additions & 5 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#[cfg(feature = "track-ctx-pointer")]
use common::executor::Timer;
use common::executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture};
use common::log::{self, LogLevel, LogOnError, LogState};
use common::{cfg_native, cfg_wasm32, small_rng};
use common::{executor::{abortable_queue::{AbortableQueue, WeakSpawner},
graceful_shutdown, AbortSettings, AbortableSystem, SpawnAbortable, SpawnFuture},
expirable_map::ExpirableMap};
use futures::channel::oneshot;
use futures::lock::Mutex as AsyncMutex;
use gstuff::{try_s, Constructible, ERR, ERRL};
Expand All @@ -23,6 +22,7 @@ use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use timed_map::{StdClock, TimedMap};

use crate::data_asker::DataAsker;

Expand Down Expand Up @@ -146,7 +146,7 @@ pub struct MmCtx {
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
/// Links the RPC context to the P2P context to handle health check responses.
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<PeerAddress, oneshot::Sender<()>>>,
pub healthcheck_response_handler: AsyncMutex<TimedMap<StdClock, PeerAddress, oneshot::Sender<()>>>,
}

impl MmCtx {
Expand Down Expand Up @@ -196,7 +196,7 @@ impl MmCtx {
nft_ctx: Mutex::new(None),
#[cfg(not(target_arch = "wasm32"))]
async_sqlite_connection: Constructible::default(),
healthcheck_response_handler: AsyncMutex::new(ExpirableMap::default()),
healthcheck_response_handler: AsyncMutex::new(TimedMap::default()),
}
}

Expand Down
Loading

0 comments on commit e2b814e

Please sign in to comment.