Skip to content

Commit

Permalink
update: add option to require peer connectivity
Browse files Browse the repository at this point in the history
In response to arvidn/libtorrent#7708. We prefer our peers to be connectable. A significant majority of peers in prod (80+%) are already connectable. We'll see how quickly the remaining 20% bounce back.
  • Loading branch information
Roardom committed Jul 18, 2024
1 parent 3b1a642 commit 1d4466a
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 34 deletions.
7 changes: 7 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ IS_CONNECTIVITY_CHECK_ENABLED=false
# Default: 1800
CONNECTIVITY_CHECK_INTERVAL=1800

# When enabled, restrict peers to those with open ports. Peers with closed
# ports will receive empty peer lists and are not included in other returned
# peer lists. Requires `IS_CONNECTIVITY_CHECK_ENABLED` to be `true`.
#
# Default: false
REQUIRE_PEER_CONNECTIVITY=false

# Enable logging of all successful announces to the `announces` table for
# debugging. This will generate significant amounts of data. Do not
# enable if you do not know what you are doing.
Expand Down
40 changes: 24 additions & 16 deletions src/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,14 @@ pub async fn announce(
.get(&queries.info_hash)
.ok_or(InfoHashNotFound)?;

let is_connectable = check_connectivity(&tracker, client_ip, queries.port).await;

let mut warnings: Vec<AnnounceWarning> = Vec::new();

if !is_connectable && tracker.config.require_peer_connectivity {
warnings.push(AnnounceWarning::ConnectivityIssueDetected);
}

let (
upload_factor,
download_factor,
Expand Down Expand Up @@ -392,7 +400,6 @@ pub async fn announce(
let leecher_delta;
let times_completed_delta;
let is_visible;
let mut warnings: Vec<AnnounceWarning> = Vec::new();

if queries.event == Event::Stopped {
// Try and remove the peer
Expand All @@ -407,8 +414,8 @@ pub async fn announce(
uploaded_delta = queries.uploaded.saturating_sub(peer.uploaded);
downloaded_delta = queries.downloaded.saturating_sub(peer.downloaded);

leecher_delta = 0 - peer.is_included_in_leech_list() as i32;
seeder_delta = 0 - peer.is_included_in_seed_list() as i32;
leecher_delta = 0 - peer.is_included_in_leech_list(&tracker.config) as i32;
seeder_delta = 0 - peer.is_included_in_seed_list(&tracker.config) as i32;
} else {
// Some clients (namely transmission) will keep sending
// `stopped` events until a successful announce is received.
Expand Down Expand Up @@ -444,8 +451,9 @@ pub async fn announce(
peer.ip_address = client_ip;
peer.port = queries.port;
peer.is_seeder = queries.left == 0;
peer.is_visible =
peer.is_included_in_leech_list() || !has_hit_download_slot_limit;
peer.is_connectable = is_connectable;
peer.is_visible = peer.is_included_in_leech_list(&tracker.config)
|| !has_hit_download_slot_limit;
peer.is_active = true;
peer.updated_at = Utc::now();
peer.uploaded = queries.uploaded;
Expand All @@ -459,6 +467,7 @@ pub async fn announce(
is_seeder: queries.left == 0,
is_active: true,
is_visible: !has_hit_download_slot_limit,
is_connectable,
updated_at: Utc::now(),
uploaded: queries.uploaded,
downloaded: queries.downloaded,
Expand All @@ -475,10 +484,10 @@ pub async fn announce(
// in-memory db
match old_peer {
Some(old_peer) => {
leecher_delta = new_peer.is_included_in_leech_list() as i32
- old_peer.is_included_in_leech_list() as i32;
seeder_delta = new_peer.is_included_in_seed_list() as i32
- old_peer.is_included_in_seed_list() as i32;
leecher_delta = new_peer.is_included_in_leech_list(&tracker.config) as i32
- old_peer.is_included_in_leech_list(&tracker.config) as i32;
seeder_delta = new_peer.is_included_in_seed_list(&tracker.config) as i32
- old_peer.is_included_in_seed_list(&tracker.config) as i32;
times_completed_delta = (new_peer.is_seeder && !old_peer.is_seeder) as u32;

// Calculate change in upload and download compared to previous
Expand Down Expand Up @@ -512,7 +521,8 @@ pub async fn announce(
let mut peer_count = 0;

for &peer in torrent.peers.values() {
if peer.user_id == user_id && peer.is_included_in_peer_list() {
if peer.user_id == user_id && peer.is_included_in_peer_list(&tracker.config)
{
peer_count += 1;

if peer_count >= tracker.config.max_peers_per_torrent_per_user {
Expand All @@ -528,8 +538,8 @@ pub async fn announce(
}
}

leecher_delta = new_peer.is_included_in_leech_list() as i32;
seeder_delta = new_peer.is_included_in_seed_list() as i32;
leecher_delta = new_peer.is_included_in_leech_list(&tracker.config) as i32;
seeder_delta = new_peer.is_included_in_seed_list(&tracker.config) as i32;
times_completed_delta = 0;

// Calculate change in upload and download compared to previous
Expand Down Expand Up @@ -564,7 +574,7 @@ pub async fn announce(

// Don't return peers with the same user id or those that are marked as inactive
let valid_peers = torrent.peers.iter().filter(|(_index, peer)| {
peer.user_id != user_id && peer.is_included_in_peer_list()
peer.user_id != user_id && peer.is_included_in_peer_list(&tracker.config)
});

// Make sure leech peer lists are filled with seeds
Expand Down Expand Up @@ -743,8 +753,6 @@ pub async fn announce(
});
}

let connectable = check_connectivity(&tracker, client_ip, queries.port).await;

tracker.peer_updates.lock().upsert(PeerUpdate {
peer_id: queries.peer_id,
ip: client_ip,
Expand All @@ -759,7 +767,7 @@ pub async fn announce(
torrent_id,
user_id,
updated_at: Utc::now(),
connectable,
connectable: is_connectable,
});

tracker.history_updates.lock().upsert(HistoryUpdate {
Expand Down
10 changes: 10 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ pub struct Config {
/// The minimum number of seconds a socket's connectivity status is cached for
/// before rechecking the peer's connectivity. Use `-1` for no caching.
pub connectivity_check_interval: i64,
/// When enabled, restrict peers to those with open ports. Peers with closed
/// ports will receive empty peer lists and are not included in other returned
/// peer lists. Requires `IS_CONNECTIVITY_CHECK_ENABLED` to be `true`.
pub require_peer_connectivity: bool,
/// Enable logging of all successful announces to the `announces` table for
/// debugging. This will generate significant amounts of data. Do not
/// enable if you do not know what you are doing.
Expand Down Expand Up @@ -145,6 +149,11 @@ impl Config {
.parse()
.context("CONNECTIVITY_CHECK_INTERVAL must be a number between -(2^63) and 2^63 - 1")?;

let require_peer_connectivity = env::var("REQUIRE_PEER_CONNECTIVITY")
.context("REQUIRE_PEER_CONNECTIVITY not found in .env file.")?
.parse()
.context("REQUIRE_PEER_CONNECTIVITY must be either `true` or `false`")?;

let is_announce_logging_enabled = env::var("IS_ANNOUNCE_LOGGING_ENABLED")
.context("IS_ANNOUNCE_LOGGING_ENABLED not found in .env file.")?
.parse()
Expand Down Expand Up @@ -176,6 +185,7 @@ impl Config {
max_peers_per_torrent_per_user,
is_connectivity_check_enabled,
connectivity_check_interval,
require_peer_connectivity,
is_announce_logging_enabled,
reverse_proxy_client_ip_header_name,
})
Expand Down
2 changes: 1 addition & 1 deletion src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Tracker {
);

println!("Loading from database into memory: torrents...");
let torrents = torrent::Map::from_db(&pool).await?;
let torrents = torrent::Map::from_db(&pool, &config).await?;
println!("\x1B[1F\x1B[2KLoaded {:?} torrents", torrents.len());

println!("Loading from database into memory: infohash to torrent id mapping...");
Expand Down
7 changes: 1 addition & 6 deletions src/tracker/blacklisted_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub struct Set(IndexSet<u16>);
impl Default for Set {
#[rustfmt::skip]
fn default() -> Set {
let mut set = IndexSet::from([
let set = IndexSet::from([
// SSH Port
22,
// DNS queries
Expand Down Expand Up @@ -36,11 +36,6 @@ impl Default for Set {
6699,
]);

// Block system-reserved ports since 99.9% of the time they're fake and thus not connectable
for system_reserved_port in 0..1024 {
set.insert(system_reserved_port);
}

Set(set)
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/tracker/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use anyhow::{Context, Result};
pub mod peer_id;
pub use peer_id::PeerId;

use crate::config::Config;

#[derive(Clone, Serialize)]
pub struct Map(IndexMap<Index, Peer>);

Expand All @@ -31,6 +33,7 @@ pub struct Peer {
pub is_seeder: bool,
pub is_active: bool,
pub is_visible: bool,
pub is_connectable: bool,
#[serde(with = "ts_seconds")]
pub updated_at: DateTime<Utc>,
pub uploaded: u64,
Expand All @@ -40,20 +43,24 @@ pub struct Peer {
impl Peer {
/// Determines if the peer should be included in the peer list
#[inline(always)]
pub fn is_included_in_peer_list(&self) -> bool {
self.is_active && self.is_visible
pub fn is_included_in_peer_list(&self, config: &Config) -> bool {
if config.require_peer_connectivity {
self.is_active && self.is_visible && self.is_connectable
} else {
self.is_active && self.is_visible
}
}

/// Determines if the peer should be included in the list of seeds
#[inline(always)]
pub fn is_included_in_seed_list(&self) -> bool {
self.is_seeder && self.is_included_in_peer_list()
pub fn is_included_in_seed_list(&self, config: &Config) -> bool {
self.is_seeder && self.is_included_in_peer_list(config)
}

/// Determines if the peer should be included in the list of leeches
#[inline(always)]
pub fn is_included_in_leech_list(&self) -> bool {
!self.is_seeder && self.is_included_in_peer_list()
pub fn is_included_in_leech_list(&self, config: &Config) -> bool {
!self.is_seeder && self.is_included_in_peer_list(config)
}
}

Expand All @@ -73,6 +80,7 @@ impl Map {
peers.seeder as `is_seeder: bool`,
peers.active as `is_active: bool`,
peers.visible as `is_visible: bool`,
peers.connectable as `is_connectable: bool`,
peers.updated_at as `updated_at: DateTime<Utc>`,
peers.uploaded as `uploaded: u64`,
peers.downloaded as `downloaded: u64`,
Expand All @@ -97,6 +105,7 @@ impl Map {
is_seeder: row.is_seeder,
is_active: row.is_active,
is_visible: row.is_visible,
is_connectable: row.is_connectable,
updated_at: row
.updated_at
.expect("Peer with a null updated_at found in database."),
Expand Down
11 changes: 6 additions & 5 deletions src/tracker/torrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use sqlx::MySqlPool;

use crate::config::Config;
use crate::tracker::peer;

use anyhow::{Context, Result};
Expand All @@ -29,7 +30,7 @@ impl Map {
Map(IndexMap::new())
}

pub async fn from_db(db: &MySqlPool) -> Result<Map> {
pub async fn from_db(db: &MySqlPool, config: &Config) -> Result<Map> {
let peers = peer::Map::from_db(db).await?;

// First, group the peers by their torrent id.
Expand All @@ -47,17 +48,17 @@ impl Map {
.entry(peer.torrent_id)
.and_modify(|torrent| {
torrent.peers.insert(*index, *peer);
torrent.num_seeders += peer.is_included_in_seed_list() as u32;
torrent.num_leechers += peer.is_included_in_leech_list() as u32;
torrent.num_seeders += peer.is_included_in_seed_list(config) as u32;
torrent.num_leechers += peer.is_included_in_leech_list(config) as u32;
})
.or_insert_with(|| {
let mut peers = peer::Map::new();
peers.insert(*index, *peer);

GroupedPeer {
peers,
num_seeders: peer.is_included_in_seed_list() as u32,
num_leechers: peer.is_included_in_leech_list() as u32,
num_seeders: peer.is_included_in_seed_list(config) as u32,
num_leechers: peer.is_included_in_leech_list(config) as u32,
}
});
});
Expand Down
2 changes: 2 additions & 0 deletions src/warning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ pub enum AnnounceWarning {
RateLimitExceeded,
#[error("Download slot limit reached")]
HitDownloadSlotLimit,
#[error("Connectivity issue detected. Enable port-forwarding to resolve.")]
ConnectivityIssueDetected,
}

0 comments on commit 1d4466a

Please sign in to comment.