From 1d4466ad952b5f6df767852045c8775c55d5b582 Mon Sep 17 00:00:00 2001 From: Roardom Date: Thu, 18 Jul 2024 16:04:34 +0000 Subject: [PATCH] update: add option to require peer connectivity In response to https://github.com/arvidn/libtorrent/pull/7708. We prefer our peers to be connectable. A significant majority of peers in prod (80+%) are already connectable. We'll see how quickly the remaining 20% bounce back. --- .env.example | 7 ++++++ src/announce.rs | 40 ++++++++++++++++++++------------- src/config.rs | 10 +++++++++ src/tracker.rs | 2 +- src/tracker/blacklisted_port.rs | 7 +----- src/tracker/peer.rs | 21 ++++++++++++----- src/tracker/torrent.rs | 11 ++++----- src/warning.rs | 2 ++ 8 files changed, 66 insertions(+), 34 deletions(-) diff --git a/.env.example b/.env.example index 1814eb6..91b73db 100644 --- a/.env.example +++ b/.env.example @@ -104,6 +104,13 @@ IS_CONNECTIVITY_CHECK_ENABLED=false # Default: 1800 CONNECTIVITY_CHECK_INTERVAL=1800 +# When enabled, restrict peers to those with open ports. Peers with closed +# ports will receive empty peer lists and are not included in other returned +# peer lists. Requires `IS_CONNECTIVITY_CHECK_ENABLED` to be `true`. +# +# Default: false +REQUIRE_PEER_CONNECTIVITY=false + # Enable logging of all successful announces to the `announces` table for # debugging. This will generate significant amounts of data. Do not # enable if you do not know what you are doing. diff --git a/src/announce.rs b/src/announce.rs index 8e6e88b..7f2813e 100644 --- a/src/announce.rs +++ b/src/announce.rs @@ -329,6 +329,14 @@ pub async fn announce( .get(&queries.info_hash) .ok_or(InfoHashNotFound)?; + let is_connectable = check_connectivity(&tracker, client_ip, queries.port).await; + + let mut warnings: Vec = Vec::new(); + + if !is_connectable && tracker.config.require_peer_connectivity { + warnings.push(AnnounceWarning::ConnectivityIssueDetected); + } + let ( upload_factor, download_factor, @@ -392,7 +400,6 @@ pub async fn announce( let leecher_delta; let times_completed_delta; let is_visible; - let mut warnings: Vec = Vec::new(); if queries.event == Event::Stopped { // Try and remove the peer @@ -407,8 +414,8 @@ pub async fn announce( uploaded_delta = queries.uploaded.saturating_sub(peer.uploaded); downloaded_delta = queries.downloaded.saturating_sub(peer.downloaded); - leecher_delta = 0 - peer.is_included_in_leech_list() as i32; - seeder_delta = 0 - peer.is_included_in_seed_list() as i32; + leecher_delta = 0 - peer.is_included_in_leech_list(&tracker.config) as i32; + seeder_delta = 0 - peer.is_included_in_seed_list(&tracker.config) as i32; } else { // Some clients (namely transmission) will keep sending // `stopped` events until a successful announce is received. @@ -444,8 +451,9 @@ pub async fn announce( peer.ip_address = client_ip; peer.port = queries.port; peer.is_seeder = queries.left == 0; - peer.is_visible = - peer.is_included_in_leech_list() || !has_hit_download_slot_limit; + peer.is_connectable = is_connectable; + peer.is_visible = peer.is_included_in_leech_list(&tracker.config) + || !has_hit_download_slot_limit; peer.is_active = true; peer.updated_at = Utc::now(); peer.uploaded = queries.uploaded; @@ -459,6 +467,7 @@ pub async fn announce( is_seeder: queries.left == 0, is_active: true, is_visible: !has_hit_download_slot_limit, + is_connectable, updated_at: Utc::now(), uploaded: queries.uploaded, downloaded: queries.downloaded, @@ -475,10 +484,10 @@ pub async fn announce( // in-memory db match old_peer { Some(old_peer) => { - leecher_delta = new_peer.is_included_in_leech_list() as i32 - - old_peer.is_included_in_leech_list() as i32; - seeder_delta = new_peer.is_included_in_seed_list() as i32 - - old_peer.is_included_in_seed_list() as i32; + leecher_delta = new_peer.is_included_in_leech_list(&tracker.config) as i32 + - old_peer.is_included_in_leech_list(&tracker.config) as i32; + seeder_delta = new_peer.is_included_in_seed_list(&tracker.config) as i32 + - old_peer.is_included_in_seed_list(&tracker.config) as i32; times_completed_delta = (new_peer.is_seeder && !old_peer.is_seeder) as u32; // Calculate change in upload and download compared to previous @@ -512,7 +521,8 @@ pub async fn announce( let mut peer_count = 0; for &peer in torrent.peers.values() { - if peer.user_id == user_id && peer.is_included_in_peer_list() { + if peer.user_id == user_id && peer.is_included_in_peer_list(&tracker.config) + { peer_count += 1; if peer_count >= tracker.config.max_peers_per_torrent_per_user { @@ -528,8 +538,8 @@ pub async fn announce( } } - leecher_delta = new_peer.is_included_in_leech_list() as i32; - seeder_delta = new_peer.is_included_in_seed_list() as i32; + leecher_delta = new_peer.is_included_in_leech_list(&tracker.config) as i32; + seeder_delta = new_peer.is_included_in_seed_list(&tracker.config) as i32; times_completed_delta = 0; // Calculate change in upload and download compared to previous @@ -564,7 +574,7 @@ pub async fn announce( // Don't return peers with the same user id or those that are marked as inactive let valid_peers = torrent.peers.iter().filter(|(_index, peer)| { - peer.user_id != user_id && peer.is_included_in_peer_list() + peer.user_id != user_id && peer.is_included_in_peer_list(&tracker.config) }); // Make sure leech peer lists are filled with seeds @@ -743,8 +753,6 @@ pub async fn announce( }); } - let connectable = check_connectivity(&tracker, client_ip, queries.port).await; - tracker.peer_updates.lock().upsert(PeerUpdate { peer_id: queries.peer_id, ip: client_ip, @@ -759,7 +767,7 @@ pub async fn announce( torrent_id, user_id, updated_at: Utc::now(), - connectable, + connectable: is_connectable, }); tracker.history_updates.lock().upsert(HistoryUpdate { diff --git a/src/config.rs b/src/config.rs index e5fb7a1..f2ec098 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,6 +57,10 @@ pub struct Config { /// The minimum number of seconds a socket's connectivity status is cached for /// before rechecking the peer's connectivity. Use `-1` for no caching. pub connectivity_check_interval: i64, + /// When enabled, restrict peers to those with open ports. Peers with closed + /// ports will receive empty peer lists and are not included in other returned + /// peer lists. Requires `IS_CONNECTIVITY_CHECK_ENABLED` to be `true`. + pub require_peer_connectivity: bool, /// Enable logging of all successful announces to the `announces` table for /// debugging. This will generate significant amounts of data. Do not /// enable if you do not know what you are doing. @@ -145,6 +149,11 @@ impl Config { .parse() .context("CONNECTIVITY_CHECK_INTERVAL must be a number between -(2^63) and 2^63 - 1")?; + let require_peer_connectivity = env::var("REQUIRE_PEER_CONNECTIVITY") + .context("REQUIRE_PEER_CONNECTIVITY not found in .env file.")? + .parse() + .context("REQUIRE_PEER_CONNECTIVITY must be either `true` or `false`")?; + let is_announce_logging_enabled = env::var("IS_ANNOUNCE_LOGGING_ENABLED") .context("IS_ANNOUNCE_LOGGING_ENABLED not found in .env file.")? .parse() @@ -176,6 +185,7 @@ impl Config { max_peers_per_torrent_per_user, is_connectivity_check_enabled, connectivity_check_interval, + require_peer_connectivity, is_announce_logging_enabled, reverse_proxy_client_ip_header_name, }) diff --git a/src/tracker.rs b/src/tracker.rs index 43fcaba..6190a25 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -86,7 +86,7 @@ impl Tracker { ); println!("Loading from database into memory: torrents..."); - let torrents = torrent::Map::from_db(&pool).await?; + let torrents = torrent::Map::from_db(&pool, &config).await?; println!("\x1B[1F\x1B[2KLoaded {:?} torrents", torrents.len()); println!("Loading from database into memory: infohash to torrent id mapping..."); diff --git a/src/tracker/blacklisted_port.rs b/src/tracker/blacklisted_port.rs index 66a3b39..2866495 100644 --- a/src/tracker/blacklisted_port.rs +++ b/src/tracker/blacklisted_port.rs @@ -7,7 +7,7 @@ pub struct Set(IndexSet); impl Default for Set { #[rustfmt::skip] fn default() -> Set { - let mut set = IndexSet::from([ + let set = IndexSet::from([ // SSH Port 22, // DNS queries @@ -36,11 +36,6 @@ impl Default for Set { 6699, ]); - // Block system-reserved ports since 99.9% of the time they're fake and thus not connectable - for system_reserved_port in 0..1024 { - set.insert(system_reserved_port); - } - Set(set) } } diff --git a/src/tracker/peer.rs b/src/tracker/peer.rs index 662d87d..f479b61 100644 --- a/src/tracker/peer.rs +++ b/src/tracker/peer.rs @@ -13,6 +13,8 @@ use anyhow::{Context, Result}; pub mod peer_id; pub use peer_id::PeerId; +use crate::config::Config; + #[derive(Clone, Serialize)] pub struct Map(IndexMap); @@ -31,6 +33,7 @@ pub struct Peer { pub is_seeder: bool, pub is_active: bool, pub is_visible: bool, + pub is_connectable: bool, #[serde(with = "ts_seconds")] pub updated_at: DateTime, pub uploaded: u64, @@ -40,20 +43,24 @@ pub struct Peer { impl Peer { /// Determines if the peer should be included in the peer list #[inline(always)] - pub fn is_included_in_peer_list(&self) -> bool { - self.is_active && self.is_visible + pub fn is_included_in_peer_list(&self, config: &Config) -> bool { + if config.require_peer_connectivity { + self.is_active && self.is_visible && self.is_connectable + } else { + self.is_active && self.is_visible + } } /// Determines if the peer should be included in the list of seeds #[inline(always)] - pub fn is_included_in_seed_list(&self) -> bool { - self.is_seeder && self.is_included_in_peer_list() + pub fn is_included_in_seed_list(&self, config: &Config) -> bool { + self.is_seeder && self.is_included_in_peer_list(config) } /// Determines if the peer should be included in the list of leeches #[inline(always)] - pub fn is_included_in_leech_list(&self) -> bool { - !self.is_seeder && self.is_included_in_peer_list() + pub fn is_included_in_leech_list(&self, config: &Config) -> bool { + !self.is_seeder && self.is_included_in_peer_list(config) } } @@ -73,6 +80,7 @@ impl Map { peers.seeder as `is_seeder: bool`, peers.active as `is_active: bool`, peers.visible as `is_visible: bool`, + peers.connectable as `is_connectable: bool`, peers.updated_at as `updated_at: DateTime`, peers.uploaded as `uploaded: u64`, peers.downloaded as `downloaded: u64`, @@ -97,6 +105,7 @@ impl Map { is_seeder: row.is_seeder, is_active: row.is_active, is_visible: row.is_visible, + is_connectable: row.is_connectable, updated_at: row .updated_at .expect("Peer with a null updated_at found in database."), diff --git a/src/tracker/torrent.rs b/src/tracker/torrent.rs index a5e2af6..d0bcfad 100644 --- a/src/tracker/torrent.rs +++ b/src/tracker/torrent.rs @@ -8,6 +8,7 @@ use indexmap::IndexMap; use serde::{Deserialize, Serialize}; use sqlx::MySqlPool; +use crate::config::Config; use crate::tracker::peer; use anyhow::{Context, Result}; @@ -29,7 +30,7 @@ impl Map { Map(IndexMap::new()) } - pub async fn from_db(db: &MySqlPool) -> Result { + pub async fn from_db(db: &MySqlPool, config: &Config) -> Result { let peers = peer::Map::from_db(db).await?; // First, group the peers by their torrent id. @@ -47,8 +48,8 @@ impl Map { .entry(peer.torrent_id) .and_modify(|torrent| { torrent.peers.insert(*index, *peer); - torrent.num_seeders += peer.is_included_in_seed_list() as u32; - torrent.num_leechers += peer.is_included_in_leech_list() as u32; + torrent.num_seeders += peer.is_included_in_seed_list(config) as u32; + torrent.num_leechers += peer.is_included_in_leech_list(config) as u32; }) .or_insert_with(|| { let mut peers = peer::Map::new(); @@ -56,8 +57,8 @@ impl Map { GroupedPeer { peers, - num_seeders: peer.is_included_in_seed_list() as u32, - num_leechers: peer.is_included_in_leech_list() as u32, + num_seeders: peer.is_included_in_seed_list(config) as u32, + num_leechers: peer.is_included_in_leech_list(config) as u32, } }); }); diff --git a/src/warning.rs b/src/warning.rs index c9692f2..af6ae8d 100644 --- a/src/warning.rs +++ b/src/warning.rs @@ -8,4 +8,6 @@ pub enum AnnounceWarning { RateLimitExceeded, #[error("Download slot limit reached")] HitDownloadSlotLimit, + #[error("Connectivity issue detected. Enable port-forwarding to resolve.")] + ConnectivityIssueDetected, }