diff --git a/mm2src/mm2_main/src/lp_native_dex.rs b/mm2src/mm2_main/src/lp_native_dex.rs index 1e5f7feff0..3af6d231f1 100644 --- a/mm2src/mm2_main/src/lp_native_dex.rs +++ b/mm2src/mm2_main/src/lp_native_dex.rs @@ -52,7 +52,8 @@ use crate::lp_message_service::{init_message_service, InitMessageServiceError}; use crate::lp_network::{lp_network_ports, p2p_event_process_loop, subscribe_to_topic, NetIdError}; use crate::lp_ordermatch::{broadcast_maker_orders_keep_alive_loop, clean_memory_loop, init_ordermatch_context, lp_ordermatch_loop, orders_kick_start, BalanceUpdateOrdermatchHandler, OrdermatchInitError}; -use crate::lp_swap::{running_swaps_num, swap_kick_starts}; +use crate::lp_swap; +use crate::lp_swap::swap_kick_starts; use crate::lp_wallet::{initialize_wallet_passphrase, WalletInitError}; use crate::rpc::spawn_rpc; @@ -535,14 +536,8 @@ pub async fn lp_init(ctx: MmArc, version: String, datetime: String) -> MmInitRes }; Timer::sleep(0.2).await } + lp_swap::clear_running_swaps(&ctx); - // wait for swaps to stop - loop { - if running_swaps_num(&ctx) == 0 { - break; - }; - Timer::sleep(0.2).await - } Ok(()) } diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 507b9a6f51..d2990e6e25 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -88,7 +88,7 @@ use std::convert::TryFrom; use std::num::NonZeroUsize; use std::path::PathBuf; use std::str::FromStr; -use std::sync::{Arc, Mutex, Weak}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use uuid::Uuid; @@ -518,7 +518,7 @@ struct LockedAmountInfo { } struct SwapsContext { - running_swaps: Mutex>>, + running_swaps: Mutex>>, active_swaps_v2_infos: Mutex>, banned_pubkeys: Mutex>, swap_msgs: Mutex>, @@ -534,7 +534,7 @@ impl SwapsContext { fn from_ctx(ctx: &MmArc) -> Result, String> { Ok(try_s!(from_ctx(&ctx.swaps_ctx, move || { Ok(SwapsContext { - running_swaps: Mutex::new(vec![]), + running_swaps: Mutex::new(HashMap::new()), active_swaps_v2_infos: Mutex::new(HashMap::new()), banned_pubkeys: Mutex::new(HashMap::new()), swap_msgs: Mutex::new(HashMap::new()), @@ -619,21 +619,21 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { let swap_ctx = SwapsContext::from_ctx(ctx).unwrap(); let swap_lock = swap_ctx.running_swaps.lock().unwrap(); - let mut locked = swap_lock - .iter() - .filter_map(|swap| swap.upgrade()) - .flat_map(|swap| swap.locked_amount()) - .fold(MmNumber::from(0), |mut total_amount, locked| { - if locked.coin == coin { - total_amount += locked.amount; - } - if let Some(trade_fee) = locked.trade_fee { - if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol { - total_amount += trade_fee.amount; + let mut locked = + swap_lock + .values() + .flat_map(|swap| swap.locked_amount()) + .fold(MmNumber::from(0), |mut total_amount, locked| { + if locked.coin == coin { + total_amount += locked.amount; } - } - total_amount - }); + if let Some(trade_fee) = locked.trade_fee { + if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol { + total_amount += trade_fee.amount; + } + } + total_amount + }); drop(swap_lock); let locked_amounts = swap_ctx.locked_amounts.lock().unwrap(); @@ -655,13 +655,9 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber { } /// Get number of currently running swaps -pub fn running_swaps_num(ctx: &MmArc) -> u64 { +pub fn clear_running_swaps(ctx: &MmArc) { let swap_ctx = SwapsContext::from_ctx(ctx).unwrap(); - let swaps = swap_ctx.running_swaps.lock().unwrap(); - swaps.iter().fold(0, |total, swap| match swap.upgrade() { - Some(_) => total + 1, - None => total, - }) + swap_ctx.running_swaps.lock().unwrap().clear(); } /// Get total amount of selected coin locked by all currently ongoing swaps except the one with selected uuid @@ -670,8 +666,7 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str) let swap_lock = swap_ctx.running_swaps.lock().unwrap(); swap_lock - .iter() - .filter_map(|swap| swap.upgrade()) + .values() .filter(|swap| swap.uuid() != except_uuid) .flat_map(|swap| swap.locked_amount()) .fold(MmNumber::from(0), |mut total_amount, locked| { @@ -691,11 +686,9 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet) -> Result< let swap_ctx = try_s!(SwapsContext::from_ctx(ctx)); let swaps = try_s!(swap_ctx.running_swaps.lock()); let mut uuids = vec![]; - for swap in swaps.iter() { - if let Some(swap) = swap.upgrade() { - if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) { - uuids.push(*swap.uuid()) - } + for swap in swaps.values() { + if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) { + uuids.push(*swap.uuid()) } } drop(swaps); @@ -711,15 +704,13 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet) -> Result< pub fn active_swaps(ctx: &MmArc) -> Result, String> { let swap_ctx = try_s!(SwapsContext::from_ctx(ctx)); - let swaps = swap_ctx.running_swaps.lock().unwrap(); - let mut uuids = vec![]; - for swap in swaps.iter() { - if let Some(swap) = swap.upgrade() { - uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE)) - } - } - - drop(swaps); + let mut uuids: Vec<_> = swap_ctx + .running_swaps + .lock() + .unwrap() + .keys() + .map(|uuid| (*uuid, LEGACY_SWAP_TYPE)) + .collect(); let swaps_v2 = swap_ctx.active_swaps_v2_infos.lock().unwrap(); uuids.extend(swaps_v2.iter().map(|(uuid, info)| (*uuid, info.swap_type))); diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index 0eb72b8a71..b720fd6b98 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -2095,10 +2095,14 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { }; } let running_swap = Arc::new(swap); - let weak_ref = Arc::downgrade(&running_swap); let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap(); swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker); - swap_ctx.running_swaps.lock().unwrap().push(weak_ref); + // Register the swap in the running swaps map. + swap_ctx + .running_swaps + .lock() + .unwrap() + .insert(uuid, running_swap.clone()); let mut swap_fut = Box::pin( async move { let mut events; @@ -2162,6 +2166,8 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) { _swap = swap_fut => (), // swap finished normally _touch = touch_loop => unreachable!("Touch loop can not stop!"), }; + // Remove the swap from the running swaps map. + swap_ctx.running_swaps.lock().unwrap().remove(&uuid); } pub struct MakerSwapPreparedParams { diff --git a/mm2src/mm2_main/src/lp_swap/taker_swap.rs b/mm2src/mm2_main/src/lp_swap/taker_swap.rs index c7b1cf59a9..1b199508aa 100644 --- a/mm2src/mm2_main/src/lp_swap/taker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/taker_swap.rs @@ -458,14 +458,17 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { let ctx = swap.ctx.clone(); subscribe_to_topic(&ctx, swap_topic(&swap.uuid)); let mut status = ctx.log.status_handle(); - let uuid = swap.uuid.to_string(); + let uuid_str = uuid.to_string(); let to_broadcast = !(swap.maker_coin.is_privacy() || swap.taker_coin.is_privacy()); let running_swap = Arc::new(swap); - let weak_ref = Arc::downgrade(&running_swap); let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap(); swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker); - swap_ctx.running_swaps.lock().unwrap().push(weak_ref); - + // Register the swap in the running swaps map. + swap_ctx + .running_swaps + .lock() + .unwrap() + .insert(uuid, running_swap.clone()); let mut swap_fut = Box::pin( async move { let mut events; @@ -491,10 +494,10 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { } if event.is_error() { - error!("[swap uuid={uuid}] {event:?}"); + error!("[swap uuid={uuid_str}] {event:?}"); } - status.status(&[&"swap", &("uuid", uuid.as_str())], &event.status_str()); + status.status(&[&"swap", &("uuid", uuid_str.as_str())], &event.status_str()); running_swap.apply_event(event); } match res.0 { @@ -503,12 +506,12 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { }, None => { if let Err(e) = mark_swap_as_finished(ctx.clone(), running_swap.uuid).await { - error!("!mark_swap_finished({}): {}", uuid, e); + error!("!mark_swap_finished({}): {}", uuid_str, e); } if to_broadcast { if let Err(e) = broadcast_my_swap_status(&ctx, running_swap.uuid).await { - error!("!broadcast_my_swap_status({}): {}", uuid, e); + error!("!broadcast_my_swap_status({}): {}", uuid_str, e); } } break; @@ -522,6 +525,8 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) { _swap = swap_fut => (), // swap finished normally _touch = touch_loop => unreachable!("Touch loop can not stop!"), }; + // Remove the swap from the running swaps map. + swap_ctx.running_swaps.lock().unwrap().remove(&uuid); } #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] @@ -3276,8 +3281,7 @@ mod taker_swap_tests { .unwrap(); let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap(); let arc = Arc::new(swap); - let weak_ref = Arc::downgrade(&arc); - swaps_ctx.running_swaps.lock().unwrap().push(weak_ref); + swaps_ctx.running_swaps.lock().unwrap().insert(arc.uuid, arc); let actual = get_locked_amount(&ctx, "RICK"); assert_eq!(actual, MmNumber::from(0));