Skip to content

Commit

Permalink
chore(test): don't use spinloop when testing
Browse files Browse the repository at this point in the history
GitHub Actions doesn't allocate enough resource to do spinloop

Signed-off-by: Wataru Ishida <wataru.ishid@gmail.com>
  • Loading branch information
ishidawataru committed Mar 26, 2024
1 parent 439a98d commit d60d65c
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ FROM optcast AS unittest

ENV RUST_LOG=info
ENV NCCL_SOCKET_IFNAME=lo
RUN cd reduction_server && cargo test --all -- --nocapture --test-threads=1
RUN cd reduction_server && RUSTFLAGS="--cfg no_spinloop" cargo test --all -- --nocapture --test-threads=1

FROM nvcr.io/nvidia/cuda:12.3.1-devel-ubuntu22.04 AS final

Expand Down
13 changes: 13 additions & 0 deletions reduction_server/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* See LICENSE for license information
*/

use std::hint;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
Expand Down Expand Up @@ -60,6 +61,12 @@ fn do_client<T: Float>(args: &Args, comms: Vec<(Comm, Comm)>) {
let start = std::time::Instant::now();

loop {
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}

for (i, req, sbuf, rbuf, mhs) in reqs.iter_mut() {
if req.is_none() && reqed < args.try_count {
*req = Some(
Expand All @@ -72,6 +79,12 @@ fn do_client<T: Float>(args: &Args, comms: Vec<(Comm, Comm)>) {
let mut rrequest: Option<Request> = None;

loop {
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}

if srequest.is_none() {
srequest = nccl_net::isend(
scomm,
Expand Down
78 changes: 64 additions & 14 deletions reduction_server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use half::{bf16, f16};
use log::{info, trace, warn, error};
use log::{error, info, trace, warn};

use crate::reduce::{Reduce, WorkingMemory};
use crate::utils::*;
Expand Down Expand Up @@ -103,7 +103,11 @@ fn reduce_loop<T: Float>(
trace!("rank({})/job({}) reduce wait recv", i, job_idx);

loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}
let send_ready = send_ready.load(std::sync::atomic::Ordering::Relaxed);
let send_expect = (1 << args.send_threads) - 1;
let recv_ready = recv_ready.load(std::sync::atomic::Ordering::Relaxed);
Expand Down Expand Up @@ -213,7 +217,11 @@ fn send_loop<T: Float>(
for (idx, (readys, send)) in sends.iter().enumerate().cycle() {
for ready in readys.iter() {
loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}
let ready = ready.load(std::sync::atomic::Ordering::Relaxed);
// trace!(
// "[send] rank({})/job({}) send ready: 0b{:016b}",
Expand All @@ -235,7 +243,11 @@ fn send_loop<T: Float>(

let mut reqs = vec_of_none(send.len());
loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}
if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank {
warn!("rank != nrank");
warn!("send thread({}) exit.", i);
Expand All @@ -260,7 +272,12 @@ fn send_loop<T: Float>(
let start = std::time::Instant::now();

loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}

if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank {
warn!("rank != nrank");
warn!("send thread({}) exit.", i);
Expand Down Expand Up @@ -360,7 +377,11 @@ fn recv_loop<T: Float>(
for (job_idx, (readys, recv)) in recvs.iter_mut().enumerate() {
for ready in readys.iter() {
loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}
let ready = ready.load(std::sync::atomic::Ordering::Relaxed);
// trace!(
// "[recv] rank({})/job({}) recv ready: 0b{:016b}",
Expand All @@ -382,7 +403,11 @@ fn recv_loop<T: Float>(

let mut reqs = vec_of_none(recv.len());
loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}
if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank {
warn!("rank != nrank");
warn!("recv thread({}) exit.", i);
Expand All @@ -408,7 +433,12 @@ fn recv_loop<T: Float>(
let start = std::time::Instant::now();

loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}

if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank {
warn!("rank != nrank");
warn!("recv thread({}) exit.", i);
Expand Down Expand Up @@ -522,7 +552,12 @@ fn upstream_loop<T: Float>(
for (idx, (send_ready, reduce_readys, buf)) in jobs.iter_mut().enumerate() {
for reduce_ready in reduce_readys.iter() {
loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}

let reduce_ready = reduce_ready.load(std::sync::atomic::Ordering::Relaxed);
if reduce_ready == 0 {
break;
Expand All @@ -536,7 +571,11 @@ fn upstream_loop<T: Float>(
}

loop {
hint::spin_loop();
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}
let send_ready = send_ready.load(std::sync::atomic::Ordering::Relaxed);
let send_expect = (1 << args.send_threads) - 1;
if send_ready == send_expect {
Expand All @@ -554,15 +593,20 @@ fn upstream_loop<T: Float>(
let mut rrequest: Option<Request> = None;

loop {
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}

if srequest.is_none() {
srequest = nccl_net::isend(&scomm, send_mh, buf.lock().as_ref(), tag).unwrap();
if srequest.is_some() {
trace!("upstream send : idx: {} start", idx);
}
}
if rrequest.is_none() {
rrequest =
nccl_net::irecv(&rcomm, recv_mh, buf.lock().as_mut(), tag).unwrap();
rrequest = nccl_net::irecv(&rcomm, recv_mh, buf.lock().as_mut(), tag).unwrap();
if srequest.is_some() {
trace!("upstream recv : idx: {} start", idx);
}
Expand All @@ -573,6 +617,12 @@ fn upstream_loop<T: Float>(
}

loop {
if cfg!(no_spinloop) {
std::thread::sleep(NO_SPINLOOP_INTERVAL);
} else {
hint::spin_loop();
}

if srequest.is_some() {
match nccl_net::test(&srequest.as_ref().unwrap()) {
Ok((send_done, _)) => {
Expand All @@ -583,7 +633,7 @@ fn upstream_loop<T: Float>(
}
Err(e) => {
error!("upstream send : idx: {} error: {:?}", idx, e);
return
return;
}
}
}
Expand All @@ -597,7 +647,7 @@ fn upstream_loop<T: Float>(
}
Err(e) => {
error!("upstream recv : idx: {} error: {:?}", idx, e);
return
return;
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions reduction_server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use std::fmt::Debug;
use std::time::Duration;

use clap::{Parser, ValueEnum};
use half::{f16, bf16};
use half::{bf16, f16};
use log::info;
use num_traits::FromPrimitive;

pub(crate) const NO_SPINLOOP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);

pub(crate) fn transpose<T>(v: Vec<Vec<T>>) -> Vec<Vec<T>> {
assert!(!v.is_empty());
let len = v[0].len();
Expand Down Expand Up @@ -140,8 +142,8 @@ pub(crate) fn vec_of_none<T>(n: usize) -> Vec<Option<T>> {

#[cfg(test)]
pub mod tests {
use std::sync::Once;
use crate::nccl_net;
use std::sync::Once;

static INIT: Once = Once::new();

Expand Down

0 comments on commit d60d65c

Please sign in to comment.