From 439a98d6febddd79bce1017f97d0ca35db131552 Mon Sep 17 00:00:00 2001 From: Wataru Ishida Date: Tue, 26 Mar 2024 07:00:24 +0000 Subject: [PATCH 1/2] feat: initial aggregation tree support use `--upstream` to specify a parent in the tree Signed-off-by: Wataru Ishida --- reduction_server/src/server.rs | 279 ++++++++++++++++++++++++++++++++- reduction_server/src/utils.rs | 3 + 2 files changed, 278 insertions(+), 4 deletions(-) diff --git a/reduction_server/src/server.rs b/reduction_server/src/server.rs index 0cc8f14..654e407 100644 --- a/reduction_server/src/server.rs +++ b/reduction_server/src/server.rs @@ -7,18 +7,18 @@ use std::collections::HashMap; use std::hint; use std::io::{Read, Write}; -use std::net::TcpListener; +use std::net::{TcpListener, TcpStream}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use half::{bf16, f16}; -use log::{info, trace, warn}; +use log::{info, trace, warn, error}; use crate::reduce::{Reduce, WorkingMemory}; use crate::utils::*; use crate::nccl_net; -use crate::nccl_net::Comm; +use crate::nccl_net::{Comm, Request}; use crate::partitioned_vec::PartitionedVec; @@ -446,6 +446,178 @@ fn recv_loop( } } +fn upstream_loop( + args: &Args, + rank: &AtomicUsize, + mut jobs: Vec<( + Arc, + Vec>, + Arc>, + )>, +) { + let nrank = args.nrank; + + info!("connecting to upstream {}", args.upstream); + let mut stream = loop { + let res = TcpStream::connect(&args.upstream); + if res.is_ok() { + break res.unwrap(); + } + // sleep 1s + std::thread::sleep(std::time::Duration::from_secs(1)); + }; + + let mut buffer = [0u8; 4]; + stream.read(buffer.as_mut()).unwrap(); + let size = u32::from_le_bytes(buffer); + let mut handle = vec![0u8; size as usize]; + stream.read(handle.as_mut()).unwrap(); + + let (lcomm, lhandle) = nccl_net::listen().unwrap(); + + // send size of handle + let size = lhandle.len() as u32; + stream.write_all(&size.to_le_bytes()).unwrap(); + // send handle + stream.write_all(&lhandle).unwrap(); + + let mut scomm: Option = None; + let mut rcomm: Option = None; + + loop { + if scomm.is_none() { + scomm = nccl_net::connect(handle.as_slice()).unwrap(); + } + if rcomm.is_none() { + rcomm = nccl_net::accept(&lcomm).unwrap(); + } + if scomm.is_some() && rcomm.is_some() { + break; + } + } + + let scomm = scomm.unwrap(); + let rcomm = rcomm.unwrap(); + + let mhs = jobs + .iter() + .map(|(_, _, buf)| { + let send_mh = nccl_net::reg_mr(&scomm, &buf.lock()).unwrap(); + let recv_mh = nccl_net::reg_mr(&rcomm, &buf.lock()).unwrap(); + (send_mh, recv_mh) + }) + .collect::>(); + + loop { + if rank.load(std::sync::atomic::Ordering::Relaxed) == args.nrank { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + info!("upstream connected"); + let tag = 0x69; + + loop { + for (idx, (send_ready, reduce_readys, buf)) in jobs.iter_mut().enumerate() { + for reduce_ready in reduce_readys.iter() { + loop { + hint::spin_loop(); + let reduce_ready = reduce_ready.load(std::sync::atomic::Ordering::Relaxed); + if reduce_ready == 0 { + break; + } + if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank { + warn!("rank != nrank"); + warn!("upstream thread({}) exit.", 0); + return; + } + } + } + + loop { + hint::spin_loop(); + let send_ready = send_ready.load(std::sync::atomic::Ordering::Relaxed); + let send_expect = (1 << args.send_threads) - 1; + if send_ready == send_expect { + break; + } + if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank { + warn!("rank != nrank"); + warn!("upstream thread({}) exit.", 0); + return; + } + } + + let (send_mh, recv_mh) = &mhs[idx]; + let mut srequest: Option = None; + let mut rrequest: Option = None; + + loop { + if srequest.is_none() { + srequest = nccl_net::isend(&scomm, send_mh, buf.lock().as_ref(), tag).unwrap(); + if srequest.is_some() { + trace!("upstream send : idx: {} start", idx); + } + } + if rrequest.is_none() { + rrequest = + nccl_net::irecv(&rcomm, recv_mh, buf.lock().as_mut(), tag).unwrap(); + if srequest.is_some() { + trace!("upstream recv : idx: {} start", idx); + } + } + if srequest.is_some() && rrequest.is_some() { + break; + } + } + + loop { + if srequest.is_some() { + match nccl_net::test(&srequest.as_ref().unwrap()) { + Ok((send_done, _)) => { + if send_done { + trace!("upstream send : idx: {} done", idx); + srequest = None; + } + } + Err(e) => { + error!("upstream send : idx: {} error: {:?}", idx, e); + return + } + } + } + if rrequest.is_some() { + match nccl_net::test(&rrequest.as_ref().unwrap()) { + Ok((recv_done, _)) => { + if recv_done { + trace!("upstream recv : idx: {} done", idx); + rrequest = None; + } + } + Err(e) => { + error!("upstream recv : idx: {} error: {:?}", idx, e); + return + } + } + } + if srequest.is_none() && rrequest.is_none() { + break; + } + } + + for reduce_ready in reduce_readys.iter_mut() { + reduce_ready.store( + (1 << args.send_threads) - 1, + std::sync::atomic::Ordering::Relaxed, + ); + } + + send_ready.store(0, std::sync::atomic::Ordering::Relaxed); + } + } +} + fn do_server(args: Args) { let mut args = args; @@ -514,7 +686,7 @@ fn do_server(args: Args) { }) .collect::>(); - // transpose readys[job][thread] + // transpose readys[reduce_threads][reduce_jobs] to readys[reduce_jobs][reduce_threads] let (send_readys, recv_readys): (Vec<_>, Vec<_>) = (0..args.reduce_jobs) .map(|i| { (0..args.reduce_threads) @@ -523,6 +695,31 @@ fn do_server(args: Args) { }) .unzip(); + let send_readys = if args.upstream.is_empty() { + send_readys + } else { + // launch upstream thread + let args = Arc::clone(&args); + let jobs = send_readys + .into_iter() + .enumerate() + .map(|(i, send_ready)| { + let ready = Arc::new(AtomicUsize::new((1 << args.send_threads) - 1)); + let (sbuf, _) = &bufs[i]; + (ready, send_ready, Arc::clone(sbuf)) + }) + .collect::>(); + + let readys = jobs + .iter() + .map(|(ready, _, _)| vec![Arc::clone(ready)]) + .collect::>(); + + let rank = Arc::clone(&rank); + std::thread::spawn(move || upstream_loop(&args, &rank, jobs)); + readys + }; + // launch send threads let send_chs = (0..args.send_threads) .map(|send_idx| { @@ -646,6 +843,80 @@ mod tests { server.join().unwrap(); } + fn do_test_upstream(dt: &str) { + initialize(); + let nrank = 2; + let root = { + let dt = dt.to_string(); + std::thread::spawn(move || { + let nrank = format!("{}", nrank); + let args = Args::parse_from([ + "--verbose", // doesn't work without specifying a flag that doesn't take an argument + "--port", + "8080", + "--data-type", + &dt, + "--nrank", + &nrank, + ]); + server(args); + }) + }; + (0..nrank) + .map(|i| { + let port = format!("{}", 8081 + i); + let parent = { + let dt = dt.to_string(); + let port = port.to_string(); + std::thread::spawn(move || { + let nrank = format!("{}", nrank); + let args = Args::parse_from([ + "--verbose", // doesn't work without specifying a flag that doesn't take an argument + "--upstream", + "localhost:8080", + "--port", + &port, + "--data-type", + &dt, + "--nrank", + &nrank, + ]); + server(args); + }) + }; + let children = (0..nrank).map(move |_| { + let dt = dt.to_string(); + let port = port.to_string(); + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(100)); + let address = format!("127.0.0.1:{}", port); + let args = Args::parse_from([ + "--client", + "--address", + &address, + "--data-type", + &dt, + "--nreq", + "1", // when using socket plugin, concurrent recv/send requests doesn't work + ]); + client(args); + }) + }); + vec![parent].into_iter().chain(children) + }) + .flatten() + .collect::>() + .into_iter() + .for_each(|h| h.join().unwrap()); + + root.join().unwrap(); + } + + #[test] + fn test_server_with_upstream_f32() { + do_test_upstream("f32"); + } + #[test] fn test_server_f32() { do_test("f32"); diff --git a/reduction_server/src/utils.rs b/reduction_server/src/utils.rs index 829c4a8..dc4ab7d 100644 --- a/reduction_server/src/utils.rs +++ b/reduction_server/src/utils.rs @@ -50,6 +50,9 @@ pub(crate) struct Args { #[arg(short, long, default_value = "0.0.0.0")] pub address: String, + #[arg(long, default_value = "")] + pub upstream: String, + #[arg(long, default_value = "1048576")] pub count: usize, From e77c072b1fbfdab21f86d4499a78989c8b34d598 Mon Sep 17 00:00:00 2001 From: Wataru Ishida Date: Tue, 26 Mar 2024 07:50:13 +0000 Subject: [PATCH 2/2] chore(test): don't use spinloop when testing GitHub Actions doesn't allocate enough resource to do spinloop Signed-off-by: Wataru Ishida --- Dockerfile | 1 + reduction_server/src/client.rs | 13 ++++++ reduction_server/src/server.rs | 78 ++++++++++++++++++++++++++++------ reduction_server/src/utils.rs | 6 ++- 4 files changed, 82 insertions(+), 16 deletions(-) diff --git a/Dockerfile b/Dockerfile index 7544194..c554b6a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -48,6 +48,7 @@ FROM optcast AS unittest ENV RUST_LOG=info ENV NCCL_SOCKET_IFNAME=lo +ENV RUSTFLAGS="--cfg no_spinloop" RUN cd reduction_server && cargo test --all -- --nocapture --test-threads=1 FROM nvcr.io/nvidia/cuda:12.3.1-devel-ubuntu22.04 AS final diff --git a/reduction_server/src/client.rs b/reduction_server/src/client.rs index c24b22d..e3d3b68 100644 --- a/reduction_server/src/client.rs +++ b/reduction_server/src/client.rs @@ -4,6 +4,7 @@ * See LICENSE for license information */ +use std::hint; use std::io::{Read, Write}; use std::net::{TcpListener, TcpStream}; use std::sync::Arc; @@ -60,6 +61,12 @@ fn do_client(args: &Args, comms: Vec<(Comm, Comm)>) { let start = std::time::Instant::now(); loop { + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } + for (i, req, sbuf, rbuf, mhs) in reqs.iter_mut() { if req.is_none() && reqed < args.try_count { *req = Some( @@ -72,6 +79,12 @@ fn do_client(args: &Args, comms: Vec<(Comm, Comm)>) { let mut rrequest: Option = None; loop { + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } + if srequest.is_none() { srequest = nccl_net::isend( scomm, diff --git a/reduction_server/src/server.rs b/reduction_server/src/server.rs index 654e407..ba59864 100644 --- a/reduction_server/src/server.rs +++ b/reduction_server/src/server.rs @@ -12,7 +12,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::Arc; use half::{bf16, f16}; -use log::{info, trace, warn, error}; +use log::{error, info, trace, warn}; use crate::reduce::{Reduce, WorkingMemory}; use crate::utils::*; @@ -103,7 +103,11 @@ fn reduce_loop( trace!("rank({})/job({}) reduce wait recv", i, job_idx); loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } let send_ready = send_ready.load(std::sync::atomic::Ordering::Relaxed); let send_expect = (1 << args.send_threads) - 1; let recv_ready = recv_ready.load(std::sync::atomic::Ordering::Relaxed); @@ -213,7 +217,11 @@ fn send_loop( for (idx, (readys, send)) in sends.iter().enumerate().cycle() { for ready in readys.iter() { loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } let ready = ready.load(std::sync::atomic::Ordering::Relaxed); // trace!( // "[send] rank({})/job({}) send ready: 0b{:016b}", @@ -235,7 +243,11 @@ fn send_loop( let mut reqs = vec_of_none(send.len()); loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank { warn!("rank != nrank"); warn!("send thread({}) exit.", i); @@ -260,7 +272,12 @@ fn send_loop( let start = std::time::Instant::now(); loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } + if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank { warn!("rank != nrank"); warn!("send thread({}) exit.", i); @@ -360,7 +377,11 @@ fn recv_loop( for (job_idx, (readys, recv)) in recvs.iter_mut().enumerate() { for ready in readys.iter() { loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } let ready = ready.load(std::sync::atomic::Ordering::Relaxed); // trace!( // "[recv] rank({})/job({}) recv ready: 0b{:016b}", @@ -382,7 +403,11 @@ fn recv_loop( let mut reqs = vec_of_none(recv.len()); loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank { warn!("rank != nrank"); warn!("recv thread({}) exit.", i); @@ -408,7 +433,12 @@ fn recv_loop( let start = std::time::Instant::now(); loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } + if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank { warn!("rank != nrank"); warn!("recv thread({}) exit.", i); @@ -522,7 +552,12 @@ fn upstream_loop( for (idx, (send_ready, reduce_readys, buf)) in jobs.iter_mut().enumerate() { for reduce_ready in reduce_readys.iter() { loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } + let reduce_ready = reduce_ready.load(std::sync::atomic::Ordering::Relaxed); if reduce_ready == 0 { break; @@ -536,7 +571,11 @@ fn upstream_loop( } loop { - hint::spin_loop(); + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } let send_ready = send_ready.load(std::sync::atomic::Ordering::Relaxed); let send_expect = (1 << args.send_threads) - 1; if send_ready == send_expect { @@ -554,6 +593,12 @@ fn upstream_loop( let mut rrequest: Option = None; loop { + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } + if srequest.is_none() { srequest = nccl_net::isend(&scomm, send_mh, buf.lock().as_ref(), tag).unwrap(); if srequest.is_some() { @@ -561,8 +606,7 @@ fn upstream_loop( } } if rrequest.is_none() { - rrequest = - nccl_net::irecv(&rcomm, recv_mh, buf.lock().as_mut(), tag).unwrap(); + rrequest = nccl_net::irecv(&rcomm, recv_mh, buf.lock().as_mut(), tag).unwrap(); if srequest.is_some() { trace!("upstream recv : idx: {} start", idx); } @@ -573,6 +617,12 @@ fn upstream_loop( } loop { + if cfg!(no_spinloop) { + std::thread::sleep(NO_SPINLOOP_INTERVAL); + } else { + hint::spin_loop(); + } + if srequest.is_some() { match nccl_net::test(&srequest.as_ref().unwrap()) { Ok((send_done, _)) => { @@ -583,7 +633,7 @@ fn upstream_loop( } Err(e) => { error!("upstream send : idx: {} error: {:?}", idx, e); - return + return; } } } @@ -597,7 +647,7 @@ fn upstream_loop( } Err(e) => { error!("upstream recv : idx: {} error: {:?}", idx, e); - return + return; } } } diff --git a/reduction_server/src/utils.rs b/reduction_server/src/utils.rs index dc4ab7d..bf068fd 100644 --- a/reduction_server/src/utils.rs +++ b/reduction_server/src/utils.rs @@ -8,10 +8,12 @@ use std::fmt::Debug; use std::time::Duration; use clap::{Parser, ValueEnum}; -use half::{f16, bf16}; +use half::{bf16, f16}; use log::info; use num_traits::FromPrimitive; +pub(crate) const NO_SPINLOOP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100); + pub(crate) fn transpose(v: Vec>) -> Vec> { assert!(!v.is_empty()); let len = v[0].len(); @@ -140,8 +142,8 @@ pub(crate) fn vec_of_none(n: usize) -> Vec> { #[cfg(test)] pub mod tests { - use std::sync::Once; use crate::nccl_net; + use std::sync::Once; static INIT: Once = Once::new();