[NO-ISSUE] feat/handle graceful shutdown for k8s (#154)
* feat: handle graceful shutdown for k8s

* refactor: use 2 config env vars

* refactor: avoid waiting 30s in local dev
GuillaumeDecMeetsMore authored Dec 2, 2024
1 parent 2bae6a5 commit 4d8d008
Showing 7 changed files with 142 additions and 8 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

24 changes: 23 additions & 1 deletion bins/nittei/src/main.rs
@@ -5,6 +5,8 @@ use nittei_infra::setup_context;
use telemetry::init_subscriber;
#[cfg(all(target_env = "musl", target_pointer_width = "64"))]
use tikv_jemallocator::Jemalloc;
use tokio::signal;
use tracing::{error, info};

// Use Jemalloc only for musl-64 bits platforms
// The default MUSL allocator is known to be slower than Jemalloc
@@ -22,7 +24,27 @@ async fn main() -> anyhow::Result<()> {
init_subscriber()?;

let context = setup_context().await?;
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

let app = Application::new(context).await?;
app.start().await

// Listen for SIGINT (Ctrl+C) to shut down the service
// This sends a message on the channel, which triggers a graceful shutdown of the server
// Once triggered, the status API endpoint starts reporting failure (useful for k8s readiness probes)
// The server then waits a configurable number of seconds (so the readiness probe has time to fail)
// before finishing in-flight requests and shutting down
tokio::spawn(async move {
if let Err(e) = signal::ctrl_c().await {
error!("[main] Failed to listen for SIGINT: {}", e);
}
info!("[shutdown] Received SIGINT, sending event on channel...");
let _ = tx.send(());
});

// Start the application and block until it finishes
app.start(rx).await?;

info!("[shutdown] shutdown complete");

Ok(())
}
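For readers wiring this pattern up outside the repo, here is a minimal, self-contained sketch of the same idea: a `tokio::sync::oneshot` channel bridges the Ctrl+C listener and the server task. This is an illustration only — `run_server` is a placeholder standing in for `Application::start(rx)`, and it assumes `tokio` (with the `full` feature set) and `tracing` as dependencies.

```rust
use tokio::{signal, sync::oneshot};
use tracing::{error, info};

// Placeholder for the real server future (`Application::start(rx)` in this repo).
async fn run_server(mut shutdown: oneshot::Receiver<()>) {
    loop {
        tokio::select! {
            // Resolve when the shutdown signal is forwarded on the channel
            _ = &mut shutdown => {
                info!("[server] shutdown requested, stopping");
                break;
            }
            // Stand-in for serving requests
            _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<()>();

    // Forward SIGINT (Ctrl+C) onto the oneshot channel, exactly once
    tokio::spawn(async move {
        if let Err(e) = signal::ctrl_c().await {
            error!("failed to listen for SIGINT: {}", e);
        }
        let _ = tx.send(());
    });

    run_server(rx).await;
}
```

Keeping the signal listener in the binary and passing only the receiver into the server crate, as the commit does, keeps the library free of process-level signal handling.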
4 changes: 3 additions & 1 deletion bins/nittei/tests/helpers/setup.rs
@@ -22,12 +22,14 @@ pub async fn spawn_app() -> (TestApp, NitteiSDK, String) {

let address = format!("http://localhost:{}", application.port());

let (_, rx) = tokio::sync::oneshot::channel::<()>();

// Allow the underscore future because it needs to run in the background
// If we `await` it, the tests will hang
#[allow(clippy::let_underscore_future)]
let _ = actix_web::rt::spawn(async move {
application
.start()
.start(rx)
.await
.expect("Expected application to start");
});
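A detail worth noting in the test setup: the sender half of the channel is bound to `_` and dropped immediately, so the receiver resolves to an error rather than a value, and the shutdown handler in `crates/api/src/lib.rs` only logs that error — the test server therefore never initiates a shutdown. A tiny sketch of that oneshot behaviour (an illustration, not code from the commit):

```rust
#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
    // Dropping the sender makes the receiver complete with Err(RecvError)
    drop(tx);
    assert!(rx.await.is_err());
}
```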
1 change: 1 addition & 0 deletions crates/api/Cargo.toml
@@ -28,6 +28,7 @@ chrono-tz = "0.8.1"
anyhow = "1.0"
jsonwebtoken = "7"
thiserror = "1.0"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1.25"
tracing-actix-web = { version = "0.7.11", features = ["opentelemetry_0_23"] }
tracing-futures = "0.2.5"
89 changes: 85 additions & 4 deletions crates/api/src/lib.rs
@@ -10,7 +10,7 @@ mod shared;
mod status;
mod user;

use std::net::TcpListener;
use std::{net::TcpListener, sync::Arc};

use actix_cors::Cors;
use actix_web::{
@@ -20,6 +20,7 @@ use actix_web::{
App,
HttpServer,
};
use futures::lock::Mutex;
use http_logger::NitteiTracingRootSpanBuilder;
use job_schedulers::{start_reminder_generation_job, start_send_reminders_job};
use nittei_domain::{
@@ -54,18 +55,36 @@ pub struct Application {
port: u16,
/// The application context (database connections, etc.)
context: NitteiContext,

/// Shutdown data
/// Shared state of the server
shared_state: ServerSharedState,
}

/// Struct for storing the shared state of the server
/// Mainly useful for sharing the shutdown flag between the binary crate and the status endpoint
#[derive(Clone)]
pub struct ServerSharedState {
/// Flag to indicate if the application is shutting down
pub is_shutting_down: Arc<Mutex<bool>>,
}

impl Application {
pub async fn new(context: NitteiContext) -> anyhow::Result<Self> {
let (server, port) = Application::configure_server(context.clone()).await?;
let shared_state = ServerSharedState {
is_shutting_down: Arc::new(Mutex::new(false)),
};

let (server, port) =
Application::configure_server(context.clone(), shared_state.clone()).await?;

Application::start_jobs(context.clone());

Ok(Self {
server,
port,
context,
shared_state,
})
}

@@ -89,7 +108,10 @@ impl Application {
/// - CORS (permissive)
/// - Compression
/// - Tracing logger
async fn configure_server(context: NitteiContext) -> anyhow::Result<(Server, u16)> {
async fn configure_server(
context: NitteiContext,
shared_state: ServerSharedState,
) -> anyhow::Result<(Server, u16)> {
let port = context.config.port;
let address = nittei_utils::config::APP_CONFIG.http_host.clone();
let address_and_port = format!("{}:{}", address, port);
@@ -99,14 +121,22 @@ impl Application {

let server = HttpServer::new(move || {
let ctx = context.clone();
let shared_state = shared_state.clone();

App::new()
.wrap(Cors::permissive())
.wrap(middleware::Compress::default())
.wrap(TracingLogger::<NitteiTracingRootSpanBuilder>::new())
.app_data(Data::new(ctx))
.app_data(Data::new(shared_state))
.service(web::scope("/api/v1").configure(configure_server_api))
})
// Disable Actix's built-in signal handling to avoid conflicts
// Shutdown is driven instead by the signal handler in the binary and the `setup_shutdown_handler` function below
.disable_signals()
// Set the shutdown timeout (time to wait for the server to finish processing requests)
// Default is 30 seconds
.shutdown_timeout(nittei_utils::config::APP_CONFIG.server_shutdown_timeout)
.listen(listener)?
.workers(4)
.run();
@@ -115,7 +145,14 @@ impl Application {
}

/// Init the default account and start the Actix server
pub async fn start(self) -> anyhow::Result<()> {
///
/// It also sets up the shutdown handler
pub async fn start(
self,
shutdown_channel: tokio::sync::oneshot::Receiver<()>,
) -> anyhow::Result<()> {
self.setup_shutdown_handler(shutdown_channel);

self.init_default_account().await?;
self.server.await.map_err(|e| anyhow::anyhow!(e))
}
Expand Down Expand Up @@ -274,4 +311,48 @@ impl Application {
};
Ok(())
}

/// Setup the shutdown handler
fn setup_shutdown_handler(&self, shutdown_channel: tokio::sync::oneshot::Receiver<()>) {
let server_handle = self.server.handle();
let shared_state = self.shared_state.clone();

// Listen to shutdown channel
tokio::spawn(async move {
// Wait for the shutdown channel to receive a message
if let Err(e) = shutdown_channel.await {
error!("[server] Failed to listen for shutdown channel: {}", e);
} else {
info!("[server] Received shutdown signal",);

if cfg!(debug_assertions) {
// In debug mode, stop the server immediately
info!("[server] Stopping server...");
server_handle.stop(true).await;
info!("[server] Server stopped");
} else {
// In production, do the whole graceful shutdown process

// Update flag
*shared_state.is_shutting_down.lock().await = true;

info!("[server] is_shutting_down flag is now true");

let duration = nittei_utils::config::APP_CONFIG.server_shutdown_sleep;

info!("[server] Waiting {}s before stopping", duration);

// Wait for the timeout
tokio::time::sleep(std::time::Duration::from_secs(duration)).await;

info!("[server] Stopping server...");

// Shutdown the server
server_handle.stop(true).await;

info!("[server] Server stopped");
}
}
});
}
}
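In release builds the handler above performs the graceful sequence in three steps: flip the shared flag (so the status endpoint starts failing and the Kubernetes readiness probe takes the pod out of rotation), sleep for `server_shutdown_sleep` seconds, then ask Actix to stop, which itself waits up to `server_shutdown_timeout` for in-flight requests. With the defaults (25 s + 5 s) the worst case is about 30 s, which lines up with Kubernetes' default `terminationGracePeriodSeconds` of 30 s — that interpretation is an inference, not stated in the commit. A condensed, standalone sketch of the sequence (placeholder types; not the crate's code):

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;

/// Sketch of the production shutdown sequence (uses tokio's Mutex here;
/// the crate itself uses `futures::lock::Mutex`).
async fn graceful_shutdown(
    is_shutting_down: Arc<Mutex<bool>>,
    sleep_secs: u64,
    stop_server: impl std::future::Future<Output = ()>,
) {
    // 1. Make the status endpoint report failure so the readiness probe fails
    *is_shutting_down.lock().await = true;

    // 2. Give Kubernetes time to observe the failing probe and stop routing traffic
    tokio::time::sleep(Duration::from_secs(sleep_secs)).await;

    // 3. Stop the HTTP server; in the crate this is `server_handle.stop(true)`,
    //    bounded by the `.shutdown_timeout(...)` configured on the HttpServer
    stop_server.await;
}
```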
15 changes: 14 additions & 1 deletion crates/api/src/status/mod.rs
@@ -2,8 +2,21 @@ use actix_web::{web, HttpResponse};
use nittei_api_structs::get_service_health::*;
use nittei_infra::NitteiContext;

use crate::ServerSharedState;

/// Get the status of the service
async fn status(ctx: web::Data<NitteiContext>) -> HttpResponse {
async fn status(
ctx: web::Data<NitteiContext>,
shared_state: web::Data<ServerSharedState>,
) -> HttpResponse {
let is_shutting_down = shared_state.is_shutting_down.lock().await;

if *is_shutting_down {
return HttpResponse::ServiceUnavailable().json(APIResponse {
message: "Service is shutting down".into(),
});
}

match ctx.repos.status.check_connection().await {
Ok(_) => HttpResponse::Ok().json(APIResponse {
message: "Ok!\r\n".into(),
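Once the flag is set, the endpoint answers 503 instead of 200, which is what a Kubernetes readiness probe keys off. A hypothetical client-side check, assuming `reqwest` and `tokio` — the route path and port below are placeholders, since the actual route registration is not shown in this diff:

```rust
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Placeholder URL; substitute the real status route of the running service
    let resp = reqwest::get("http://localhost:5000/api/v1/healthcheck").await?;

    // During the shutdown window the service should answer 503 Service Unavailable
    if resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE {
        println!("service is shutting down: {}", resp.text().await?);
    } else {
        println!("service is up: {}", resp.status());
    }
    Ok(())
}
```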
14 changes: 14 additions & 0 deletions crates/utils/src/config.rs
@@ -17,6 +17,16 @@ pub struct AppConfig {
/// Env var: NITTEI__HTTP_PORT
pub http_port: usize,

/// The sleep time for the HTTP server shutdown (in seconds)
/// Default is 25 seconds
/// Env var: NITTEI__SERVER_SHUTDOWN_SLEEP
pub server_shutdown_sleep: u64,

/// The shutdown timeout for the HTTP server (in seconds)
/// Default is 5 seconds
/// Env var: NITTEI__SERVER_SHUTDOWN_TIMEOUT
pub server_shutdown_timeout: u64,

/// The database URL
/// Default is postgresql://postgres:postgres@localhost:45432/nittei
/// Env var: NITTEI__DATABASE_URL
@@ -139,6 +149,10 @@ fn parse_config() -> AppConfig {
.expect("Failed to set default host")
.set_default("http_port", "5000")
.expect("Failed to set default port")
.set_default("server_shutdown_sleep", "25")
.expect("Failed to set default server_shutdown_sleep")
.set_default("server_shutdown_timeout", "5")
.expect("Failed to set default server_shutdown_timeout")
.set_default("skip_db_migrations", false)
.expect("Failed to set default skip_db_migrations")
.set_default("enable_reminders_job", false)
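The two new knobs are plain config fields with defaults, so they can be overridden per environment (for example, setting `NITTEI__SERVER_SHUTDOWN_SLEEP=0` locally is what "avoid waiting 30s in local dev" refers to). A hedged sketch of how such fields are typically wired with the `config` and `serde` crates — the exact builder in `crates/utils/src/config.rs` is only partially visible in this diff, so the `Environment` source below is an assumption:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ShutdownConfig {
    server_shutdown_sleep: u64,
    server_shutdown_timeout: u64,
}

fn main() -> Result<(), config::ConfigError> {
    let cfg: ShutdownConfig = config::Config::builder()
        // Defaults mirror the ones added in parse_config()
        .set_default("server_shutdown_sleep", "25")?
        .set_default("server_shutdown_timeout", "5")?
        // Assumed wiring: NITTEI__SERVER_SHUTDOWN_SLEEP -> server_shutdown_sleep
        .add_source(config::Environment::with_prefix("NITTEI").separator("__"))
        .build()?
        .try_deserialize()?;

    println!("{:?}", cfg);
    Ok(())
}
```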
