Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mem-leak): running_swap never shrinks #2301

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 30 additions & 38 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ use std::convert::TryFrom;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, Weak};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use uuid::Uuid;

Expand Down Expand Up @@ -518,7 +518,7 @@ struct LockedAmountInfo {
}

struct SwapsContext {
running_swaps: Mutex<Vec<Weak<dyn AtomicSwap>>>,
running_swaps: Mutex<HashMap<Uuid, Arc<dyn AtomicSwap>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we intentionally chose a weak reference instead of Arc as it might avoid cyclic references (similar to how it's done for MmArc with MmWeak in coins) 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not that i can trivially find the AtomicSwap pointing back to it self (points to mmctx tho:

)
you see, such questions are really hard to answer in such a codebase and require digging xD
it's much easier/managable to avoid the arc/weak model and use our abortable systems to make sure cyclic refs aren't problematic.

i went for arc here tho since we already manually remove the swap after it finishes or after stopping it. since we drop the thing manually we don't need to complicate it with weak refs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's worth to test that change (by using test_mm2_stops_immediately test) to make sure that's not the case.

active_swaps_v2_infos: Mutex<HashMap<Uuid, ActiveSwapV2Info>>,
banned_pubkeys: Mutex<HashMap<H256Json, BanReason>>,
swap_msgs: Mutex<HashMap<Uuid, SwapMsgStore>>,
Expand All @@ -534,7 +534,7 @@ impl SwapsContext {
fn from_ctx(ctx: &MmArc) -> Result<Arc<SwapsContext>, String> {
Ok(try_s!(from_ctx(&ctx.swaps_ctx, move || {
Ok(SwapsContext {
running_swaps: Mutex::new(vec![]),
running_swaps: Mutex::new(HashMap::new()),
active_swaps_v2_infos: Mutex::new(HashMap::new()),
banned_pubkeys: Mutex::new(HashMap::new()),
swap_msgs: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -619,21 +619,21 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

let mut locked = swap_lock
.iter()
.filter_map(|swap| swap.upgrade())
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
}
if let Some(trade_fee) = locked.trade_fee {
if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol {
total_amount += trade_fee.amount;
let mut locked =
swap_lock
.values()
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
if locked.coin == coin {
total_amount += locked.amount;
}
}
total_amount
});
if let Some(trade_fee) = locked.trade_fee {
if trade_fee.coin == coin && !trade_fee.paid_from_trading_vol {
total_amount += trade_fee.amount;
}
}
total_amount
});
drop(swap_lock);

let locked_amounts = swap_ctx.locked_amounts.lock().unwrap();
Expand All @@ -657,11 +657,8 @@ pub fn get_locked_amount(ctx: &MmArc, coin: &str) -> MmNumber {
/// Get number of currently running swaps
pub fn running_swaps_num(ctx: &MmArc) -> u64 {
let swap_ctx = SwapsContext::from_ctx(ctx).unwrap();
let swaps = swap_ctx.running_swaps.lock().unwrap();
swaps.iter().fold(0, |total, swap| match swap.upgrade() {
Some(_) => total + 1,
None => total,
})
let count = swap_ctx.running_swaps.lock().unwrap().len();
count as u64
}

/// Get total amount of selected coin locked by all currently ongoing swaps except the one with selected uuid
Expand All @@ -670,8 +667,7 @@ fn get_locked_amount_by_other_swaps(ctx: &MmArc, except_uuid: &Uuid, coin: &str)
let swap_lock = swap_ctx.running_swaps.lock().unwrap();

swap_lock
.iter()
.filter_map(|swap| swap.upgrade())
.values()
.filter(|swap| swap.uuid() != except_uuid)
.flat_map(|swap| swap.locked_amount())
.fold(MmNumber::from(0), |mut total_amount, locked| {
Expand All @@ -691,11 +687,9 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = try_s!(swap_ctx.running_swaps.lock());
let mut uuids = vec![];
for swap in swaps.iter() {
if let Some(swap) = swap.upgrade() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
for swap in swaps.values() {
if coins.contains(&swap.maker_coin().to_string()) || coins.contains(&swap.taker_coin().to_string()) {
uuids.push(*swap.uuid())
}
}
drop(swaps);
Expand All @@ -711,15 +705,13 @@ pub fn active_swaps_using_coins(ctx: &MmArc, coins: &HashSet<String>) -> Result<

pub fn active_swaps(ctx: &MmArc) -> Result<Vec<(Uuid, u8)>, String> {
let swap_ctx = try_s!(SwapsContext::from_ctx(ctx));
let swaps = swap_ctx.running_swaps.lock().unwrap();
let mut uuids = vec![];
for swap in swaps.iter() {
if let Some(swap) = swap.upgrade() {
uuids.push((*swap.uuid(), LEGACY_SWAP_TYPE))
}
}

drop(swaps);
let mut uuids: Vec<_> = swap_ctx
.running_swaps
.lock()
.unwrap()
.keys()
.map(|uuid| (*uuid, LEGACY_SWAP_TYPE))
.collect();

let swaps_v2 = swap_ctx.active_swaps_v2_infos.lock().unwrap();
uuids.extend(swaps_v2.iter().map(|(uuid, info)| (*uuid, info.swap_type)));
Expand Down
10 changes: 8 additions & 2 deletions mm2src/mm2_main/src/lp_swap/maker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2095,10 +2095,14 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
};
}
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.taker);
swap_ctx.running_swaps.lock().unwrap().push(weak_ref);
// Register the swap in the running swaps map.
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, running_swap.clone());
let mut swap_fut = Box::pin(
async move {
let mut events;
Expand Down Expand Up @@ -2162,6 +2166,8 @@ pub async fn run_maker_swap(swap: RunMakerSwapInput, ctx: MmArc) {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
};
// Remove the swap from the running swaps map.
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

pub struct MakerSwapPreparedParams {
Expand Down
25 changes: 15 additions & 10 deletions mm2src/mm2_main/src/lp_swap/taker_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,17 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
let ctx = swap.ctx.clone();
subscribe_to_topic(&ctx, swap_topic(&swap.uuid));
let mut status = ctx.log.status_handle();
let uuid = swap.uuid.to_string();
let uuid_str = uuid.to_string();
let to_broadcast = !(swap.maker_coin.is_privacy() || swap.taker_coin.is_privacy());
let running_swap = Arc::new(swap);
let weak_ref = Arc::downgrade(&running_swap);
let swap_ctx = SwapsContext::from_ctx(&ctx).unwrap();
swap_ctx.init_msg_store(running_swap.uuid, running_swap.maker);
swap_ctx.running_swaps.lock().unwrap().push(weak_ref);

// Register the swap in the running swaps map.
swap_ctx
.running_swaps
.lock()
.unwrap()
.insert(uuid, running_swap.clone());
let mut swap_fut = Box::pin(
async move {
let mut events;
Expand All @@ -491,10 +494,10 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
}

if event.is_error() {
error!("[swap uuid={uuid}] {event:?}");
error!("[swap uuid={uuid_str}] {event:?}");
}

status.status(&[&"swap", &("uuid", uuid.as_str())], &event.status_str());
status.status(&[&"swap", &("uuid", uuid_str.as_str())], &event.status_str());
running_swap.apply_event(event);
}
match res.0 {
Expand All @@ -503,12 +506,12 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
},
None => {
if let Err(e) = mark_swap_as_finished(ctx.clone(), running_swap.uuid).await {
error!("!mark_swap_finished({}): {}", uuid, e);
error!("!mark_swap_finished({}): {}", uuid_str, e);
}

if to_broadcast {
if let Err(e) = broadcast_my_swap_status(&ctx, running_swap.uuid).await {
error!("!broadcast_my_swap_status({}): {}", uuid, e);
error!("!broadcast_my_swap_status({}): {}", uuid_str, e);
}
}
break;
Expand All @@ -522,6 +525,8 @@ pub async fn run_taker_swap(swap: RunTakerSwapInput, ctx: MmArc) {
_swap = swap_fut => (), // swap finished normally
_touch = touch_loop => unreachable!("Touch loop can not stop!"),
};
// Remove the swap from the running swaps map.
swap_ctx.running_swaps.lock().unwrap().remove(&uuid);
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
Expand Down Expand Up @@ -3276,8 +3281,8 @@ mod taker_swap_tests {
.unwrap();
let swaps_ctx = SwapsContext::from_ctx(&ctx).unwrap();
let arc = Arc::new(swap);
let weak_ref = Arc::downgrade(&arc);
swaps_ctx.running_swaps.lock().unwrap().push(weak_ref);
// Create a dummy abort handle as if it was a running swap.
swaps_ctx.running_swaps.lock().unwrap().insert(arc.uuid, arc);

let actual = get_locked_amount(&ctx, "RICK");
assert_eq!(actual, MmNumber::from(0));
Expand Down
Loading