Skip to content

Commit

Permalink
refactor: more explicit error confirmation for http query route error.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed May 27, 2024
1 parent 8b615a1 commit 08ca16f
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 55 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions scripts/ci/deploy/config/databend-query-node-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ auth_type = "no_password"
# name = "admin"
# auth_type = "no_password"

# [[query.users]]
# name = "databend"
# auth_type = "double_sha1_password"
# # echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum
# auth_string = "3081f32caef285c232d066033c89a78d88a6d8a5"
[[query.users]]
name = "databend"
auth_type = "double_sha1_password"
# echo -n "databend" | sha1sum | cut -d' ' -f1 | xxd -r -p | sha1sum
auth_string = "3081f32caef285c232d066033c89a78d88a6d8a5"

# [[query.users]]
# name = "datafuselabs"
Expand Down
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ rustls = "0.22"
rustls-pemfile = "2"
rustls-pki-types = "1"
rustyline = "14"
semver = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_urlencoded = "0.7.1"
Expand Down
78 changes: 42 additions & 36 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,20 @@ const HEADER_QUERY_ID: &str = "X-DATABEND-QUERY-ID";
const HEADER_QUERY_STATE: &str = "X-DATABEND-QUERY-STATE";
const HEADER_QUERY_PAGE_ROWS: &str = "X-DATABEND-QUERY-PAGE-ROWS";

pub fn make_page_uri(query_id: &str, page_no: usize) -> String {
format!("/v1/query/{}/page/{}", query_id, page_no)
pub fn make_page_uri(query_id: &str, node_id: &str, page_no: usize) -> String {
format!("/v1/query/{}/{}/page/{}", query_id, node_id, page_no)
}

pub fn make_state_uri(query_id: &str) -> String {
format!("/v1/query/{}", query_id)
pub fn make_state_uri(query_id: &str, node_id: &str) -> String {
format!("/v1/query/{}/{}", query_id, node_id)
}

pub fn make_final_uri(query_id: &str) -> String {
format!("/v1/query/{}/final", query_id)
pub fn make_final_uri(query_id: &str, node_id: &str) -> String {
format!("/v1/query/{}/{}/final", query_id, node_id)
}

pub fn make_kill_uri(query_id: &str) -> String {
format!("/v1/query/{}/kill", query_id)
pub fn make_kill_uri(query_id: &str, node_id: &str) -> String {
format!("/v1/query/{}/{}/kill", query_id, node_id)
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -146,28 +146,31 @@ impl QueryResponse {
r: HttpQueryResponseInternal,
is_final: bool,
) -> impl IntoResponse {
let node_id = r.node_id.clone();
let state = r.state.clone();
let (data, next_uri) = if is_final {
(JsonBlock::empty(), None)
} else {
match state.state {
ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data {
None => (JsonBlock::empty(), Some(make_state_uri(&id))),
None => (JsonBlock::empty(), Some(make_state_uri(&id, &r.node_id))),
Some(d) => {
let uri = match d.next_page_no {
Some(n) => Some(make_page_uri(&id, n)),
None => Some(make_state_uri(&id)),
Some(n) => Some(make_page_uri(&id, &r.node_id, n)),
None => Some(make_state_uri(&id, &r.node_id)),
};
(d.page.data, uri)
}
},
ExecuteStateKind::Failed => (JsonBlock::empty(), Some(make_final_uri(&id))),
ExecuteStateKind::Failed => {
(JsonBlock::empty(), Some(make_final_uri(&id, &r.node_id)))
}
ExecuteStateKind::Succeeded => match r.data {
None => (JsonBlock::empty(), Some(make_final_uri(&id))),
None => (JsonBlock::empty(), Some(make_final_uri(&id, &r.node_id))),
Some(d) => {
let uri = match d.next_page_no {
Some(n) => Some(make_page_uri(&id, n)),
None => Some(make_final_uri(&id)),
Some(n) => Some(make_page_uri(&id, &r.node_id, n)),
None => Some(make_final_uri(&id, &r.node_id)),
};
(d.page.data, uri)
}
Expand Down Expand Up @@ -198,9 +201,9 @@ impl QueryResponse {
warnings: r.state.warnings,
id: id.clone(),
next_uri,
stats_uri: Some(make_state_uri(&id)),
final_uri: Some(make_final_uri(&id)),
kill_uri: Some(make_kill_uri(&id)),
stats_uri: Some(make_state_uri(&id, &node_id)),
final_uri: Some(make_final_uri(&id, &node_id)),
kill_uri: Some(make_kill_uri(&id, &node_id)),
error: r.state.error.map(QueryError::from_error_code),
has_result_set: r.state.has_result_set,
})
Expand All @@ -223,15 +226,16 @@ impl QueryResponse {
#[poem::handler]
async fn query_final_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
Path((query_id, node_id)): Path<(String, String)>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&node_id, &query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
info!(
"{}: got /v1/query/{}/final request, this query is going to be finally completed.",
query_id, query_id
"{}: got {} request, this query is going to be finally completed.",
query_id,
make_final_uri(&query_id, &node_id)
);
let http_query_manager = HttpQueryManager::instance();
match http_query_manager
Expand Down Expand Up @@ -260,15 +264,16 @@ async fn query_final_handler(
#[poem::handler]
async fn query_cancel_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
Path((query_id, node_id)): Path<(String, String)>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&node_id, &query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
info!(
"{}: got /v1/query/{}/kill request, cancel the query",
query_id, query_id
"{}: got {} request, cancel the query",
query_id,
make_kill_uri(&query_id, &node_id)
);
let http_query_manager = HttpQueryManager::instance();
match http_query_manager
Expand All @@ -290,10 +295,10 @@ async fn query_cancel_handler(
#[poem::handler]
async fn query_state_handler(
ctx: &HttpQueryContext,
Path(query_id): Path<String>,
Path((query_id, node_id)): Path<(String, String)>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&node_id, &query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);

async {
let http_query_manager = HttpQueryManager::instance();
match http_query_manager.get_query(&query_id) {
Expand All @@ -315,11 +320,11 @@ async fn query_state_handler(
#[poem::handler]
async fn query_page_handler(
ctx: &HttpQueryContext,
Path((query_id, page_no)): Path<(String, usize)>,
Path((query_id, node_id, page_no)): Path<(String, String, usize)>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&node_id, &query_id)?;
let root = get_http_tracing_span(full_name!(), ctx, &query_id);
let _t = SlowRequestLogTracker::new(ctx);

async {
let http_query_manager = HttpQueryManager::instance();
match http_query_manager.get_query(&query_id) {
Expand Down Expand Up @@ -352,7 +357,8 @@ pub(crate) async fn query_handler(
let _t = SlowRequestLogTracker::new(ctx);

async {
info!("http query new request: {:}", mask_connection_info(&format!("{:?}", req)));
let agent = ctx.user_agent.as_ref().map(|s|(format!("(from {s})"))).unwrap_or("".to_string());
info!("http query new request{}: {:}", agent, mask_connection_info(&format!("{:?}", req)));
let http_query_manager = HttpQueryManager::instance();
let sql = req.sql.clone();

Expand Down Expand Up @@ -397,14 +403,14 @@ pub fn query_route() -> Route {
// Note: endpoints except /v1/query may change without notice, use uris in response instead
let rules = [
("/", post(query_handler)),
("/:id", get(query_state_handler)),
("/:id/page/:page_no", get(query_page_handler)),
("/:query_id/:node_id", get(query_state_handler)),
("/:query_id/:node_id/page/:page_no", get(query_page_handler)),
(
"/:id/kill",
"/:query_id/:node_id/kill",
get(query_cancel_handler).post(query_cancel_handler),
),
(
"/:id/final",
"/:query_id/:node_id/final",
get(query_final_handler).post(query_final_handler),
),
];
Expand Down Expand Up @@ -436,7 +442,7 @@ fn query_id_to_trace_id(query_id: &str) -> TraceId {
}

/// The HTTP query endpoints are expected to be responses within 60 seconds.
/// If it exceeds far of 60 seconds, there might be something wrong, we should
/// If it exceeds far from 60 seconds, there might be something wrong, we should
/// log it.
struct SlowRequestLogTracker {
started_at: std::time::Instant,
Expand Down
22 changes: 20 additions & 2 deletions src/query/service/src/servers/http/v1/query/http_query_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use http::StatusCode;
use log::warn;
use poem::FromRequest;
use poem::Request;
use poem::RequestBody;
use poem::Result as PoemResult;
use time::Instant;

use crate::servers::http::v1::HttpQueryManager;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;
Expand Down Expand Up @@ -101,11 +104,26 @@ impl HttpQueryContext {
pub fn set_fail(&self) {
self.session.txn_mgr().lock().set_fail();
}

pub fn check_node_id(&self, node_id: &str, query_id: &str) -> poem::Result<()> {
if node_id != self.node_id {
let manager = HttpQueryManager::instance();
let start_time = manager.server_info.start_time.clone();
let uptime = (Instant::now() - manager.start_instant).as_seconds_f32();
let msg = format!(
"route error: query {query_id} SHOULD be on server {node_id}, but current server is {}, which started at {start_time}({uptime} secs ago)",
self.node_id
);
warn!("{msg}");
return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND));
}
Ok(())
}
}

impl<'a> FromRequest<'a> for &'a HttpQueryContext {
#[async_backtrace::framed]
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult<Self> {
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result<Self> {
Ok(req.extensions().get::<HttpQueryContext>().expect(
"To use the `HttpQueryContext` extractor, the `HTTPSessionMiddleware` is required",
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_storages_common_txn::TxnManagerRef;
use parking_lot::Mutex;
use time::Instant;
use tokio::task;

use super::expiring_map::ExpiringMap;
Expand Down Expand Up @@ -77,6 +78,7 @@ impl<T> LimitedQueue<T> {
}

pub struct HttpQueryManager {
pub(crate) start_instant: Instant,
pub(crate) server_info: ServerInfo,
#[allow(clippy::type_complexity)]
pub(crate) queries: Arc<DashMap<String, Arc<HttpQuery>>>,
Expand All @@ -90,6 +92,7 @@ impl HttpQueryManager {
#[async_backtrace::framed]
pub async fn init(cfg: &InnerConfig) -> Result<()> {
GlobalInstance::set(Arc::new(HttpQueryManager {
start_instant: Instant::now(),
server_info: ServerInfo {
id: cfg.query.node_id.clone(),
start_time: chrono::Local::now().to_rfc3339_opts(SecondsFormat::Nanos, false),
Expand Down
Loading

0 comments on commit 08ca16f

Please sign in to comment.