Skip to content

Commit

Permalink
Put / get now round trips, additional relations to add
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan <ryan.roberts@btp.works>
  • Loading branch information
ryan-s-roberts committed Apr 12, 2024
1 parent e58e985 commit 0a5fac9
Show file tree
Hide file tree
Showing 37 changed files with 147,147 additions and 1,615 deletions.
1,397 changes: 831 additions & 566 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 7 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"crates/pallet-chronicle",
"crates/pallet-opa",
"crates/opactl",
"crates/chronicle-data",
"node/runtime-chronicle",
"node/node-chronicle",
"crates/embedded-substrate",
Expand All @@ -28,17 +29,13 @@ members = [
"crates/protocol-substrate-chronicle",
"crates/protocol-substrate-opa",
"crates/chronicle-test-infrastructure",
"crates/chronicle-arrow",
"crates/chronicle-persistence",
"crates/chronicle-data",
]

[workspace.dependencies]
Inflector = "0.11.4"
anyhow = { version = "^1", features = ["backtrace"] }
arrow-array = { versipn = "^0.50" }
arrow-flight = { versipn = "^0.50" }
arrow-ipc = { versipn = "^0.50" }
arrow-schema = { versipn = "^0.50" }
assert_fs = "1.0"
async-graphql = "^7"
async-graphql-poem = "^7"
Expand All @@ -56,7 +53,7 @@ clap_complete = "3.2.3"
clap_generate = "3.0.3"
collecting-hashmap = { version = "0.2" }
colored_json = "^3"
console-subscriber = "0.1"
console-subscriber = "0.2"
const_format = "0.2"
criterion = { version = "0.5.1", features = ["async_futures", "async_tokio"] }
crossbeam = "^0.8"
Expand Down Expand Up @@ -98,8 +95,8 @@ locspan = "0.7"
lru = "0.11"
macro-attr-2018 = "3.0.0"
maplit = "1.0.2"
metrics = "0.21.0"
metrics-exporter-prometheus = "0.12.1"
metrics = "^0.22"
metrics-exporter-prometheus = "^0.14"
mime = "0.3"
mobc = "0.8"
mockito = "1.1"
Expand All @@ -114,7 +111,7 @@ pin-project = "1.0.12"
pin-project-lite = "0.2"
pinvec = "0.1.0"
pkcs8 = { version = "0.10", features = ["std", "alloc"] }
poem = { version = "^2", features = ["opentelemetry", "websocket"] }
poem = { version = "^3", features = ["opentelemetry", "websocket"] }
poem-grpc = { version = "^0.3" }
portpicker = "0.1"
pow_of_2 = "0.1"
Expand All @@ -132,7 +129,7 @@ secret-vault = { version = "1.8", features = [] }
secret-vault-value = "0.3"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_json = { version = "^1.0", default-features = false }
serde_yaml = "0.9.14"
shellexpand = "3.0.0"
temp-dir = "0.1.11"
Expand Down
116 changes: 63 additions & 53 deletions crates/api/src/chronicle_graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use diesel::{
r2d2::{ConnectionManager, Pool, PooledConnection},
PgConnection, Queryable,
};
use futures::Stream;
use futures::{Future, Stream};
use lazy_static::lazy_static;
use poem::{
get, handler,
Expand Down Expand Up @@ -433,6 +433,7 @@ impl core::fmt::Debug for UserInfoUri {
}
}

#[derive(Clone, Debug)]
pub struct SecurityConf {
jwks_uri: Option<JwksUri>,
userinfo_uri: Option<UserInfoUri>,
Expand Down Expand Up @@ -462,7 +463,7 @@ pub trait ChronicleApiServer {
pool: Pool<ConnectionManager<PgConnection>>,
api: ApiDispatch,
addresses: Vec<SocketAddr>,
security_conf: SecurityConf,
security_conf: &SecurityConf,
serve_graphql: bool,
serve_data: bool,
) -> Result<(), ApiError>;
Expand Down Expand Up @@ -620,7 +621,6 @@ where
}
}

#[poem::async_trait]
impl<Q, M, S> Endpoint for QueryEndpoint<Q, M, S>
where
Q: ObjectType + 'static,
Expand All @@ -629,16 +629,18 @@ where
{
type Output = poem::Response;

async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(req, |api_req| {
if let Some(claims) = checked_claims {
api_req.0.data(claims)
} else {
api_req.0
}
})
.await
fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(req, |api_req| {
if let Some(claims) = checked_claims {
api_req.0.data(claims)
} else {
api_req.0
}
})
.await
}
}
}

Expand Down Expand Up @@ -673,7 +675,6 @@ where
}
}

#[poem::async_trait]
impl<Q, M, S> Endpoint for SubscriptionEndpoint<Q, M, S>
where
Q: ObjectType + 'static,
Expand All @@ -682,19 +683,21 @@ where
{
type Output = poem::Response;

async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(
req,
if let Some(claims) = checked_claims {
let mut data = async_graphql::Data::default();
data.insert(claims);
data
} else {
async_graphql::Data::default()
},
)
.await
fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = check_claims(&self.secconf, &req).await?;
self.respond(
req,
if let Some(claims) = checked_claims {
let mut data = async_graphql::Data::default();
data.insert(claims);
data
} else {
async_graphql::Data::default()
},
)
.await
}
}
}

Expand Down Expand Up @@ -752,8 +755,8 @@ impl IriEndpoint {
.body("failed to compact JSON response"))
},
},
Err(StoreError::Db(diesel::result::Error::NotFound)) |
Err(StoreError::RecordNotFound) => {
Err(StoreError::Db(diesel::result::Error::NotFound))
| Err(StoreError::RecordNotFound) => {
tracing::debug!("not found: {prov_type} {} in {ns}", id.external_id_part());
Ok(poem::Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -805,20 +808,21 @@ impl IriEndpoint {
let (req, mut body) = req.split();

let ns_iri: poem::Result<Path<NamespacedIri>> =
FromRequest::from_request(&req, &mut body).await;
poem::web::Path::from_request(&req, &mut body).await;

let ns_iri: NamespacedIri = match ns_iri {
Ok(Path(nsi)) => nsi,
Err(_) => {
let path: Path<Iri> = FromRequest::from_request(&req, &mut body).await?;
let path: Path<Iri> = poem::web::Path::from_request(&req, &mut body).await?;
path.0.into()
},
};

match ChronicleIri::from_str(&ns_iri.iri) {
Ok(iri) => Ok(Ok((ns_iri.ns.into(), iri))),
Err(error) =>
Ok(Err(Response::builder().status(StatusCode::NOT_FOUND).body(error.to_string()))),
Err(error) => {
Ok(Err(Response::builder().status(StatusCode::NOT_FOUND).body(error.to_string())))
},
}
}

Expand All @@ -829,21 +833,24 @@ impl IriEndpoint {
claims: Option<&JwtClaims>,
) -> poem::Result<poem::Response> {
match self.parse_ns_iri_from_uri_path(req).await? {
Ok((ns, ChronicleIri::Activity(id))) =>
Ok((ns, ChronicleIri::Activity(id))) => {
self.response_for_query(claims, "activity", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_activity_id(&mut conn, id, ns)
})
.await,
Ok((ns, ChronicleIri::Agent(id))) =>
.await
},
Ok((ns, ChronicleIri::Agent(id))) => {
self.response_for_query(claims, "agent", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_agent_id(&mut conn, id, ns)
})
.await,
Ok((ns, ChronicleIri::Entity(id))) =>
.await
},
Ok((ns, ChronicleIri::Entity(id))) => {
self.response_for_query(claims, "entity", &id, &ns, |mut conn, id, ns| {
self.store.prov_model_for_entity_id(&mut conn, id, ns)
})
.await,
.await
},
Ok(_) => Ok(poem::Response::builder()
.status(StatusCode::NOT_FOUND)
.body("may query only: activity, agent, entity")),
Expand All @@ -852,29 +859,31 @@ impl IriEndpoint {
}
}

#[poem::async_trait]
impl Endpoint for IriEndpoint {
type Output = poem::Response;

async fn call(&self, req: poem::Request) -> poem::Result<Self::Output> {
let checked_claims = if let Some(secconf) = &self.secconf {
check_claims(secconf, &req).await?
} else {
None
};
self.respond(req, checked_claims.as_ref()).await
fn call(&self, req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let checked_claims = if let Some(secconf) = &self.secconf {
check_claims(secconf, &req).await?
} else {
None
};
self.respond(req, checked_claims.as_ref()).await
}
}
}

struct LdContextEndpoint;

#[poem::async_trait]
impl Endpoint for LdContextEndpoint {
type Output = poem::Response;

async fn call(&self, _req: poem::Request) -> poem::Result<Self::Output> {
let context: &serde_json::Value = &common::context::PROV;
Ok(IntoResponse::into_response(poem::web::Json(context)))
fn call(&self, _req: poem::Request) -> impl Future<Output = poem::Result<Self::Output>> {
async move {
let context: &serde_json::Value = &common::context::PROV;
Ok(IntoResponse::into_response(poem::web::Json(context)))
}
}
}

Expand Down Expand Up @@ -918,7 +927,6 @@ impl async_graphql::extensions::Extension for AuthFromJwt {
}
}

#[async_trait::async_trait]
impl async_graphql::extensions::ExtensionFactory for AuthFromJwt {
fn create(&self) -> Arc<dyn async_graphql::extensions::Extension> {
Arc::new(AuthFromJwt {
Expand Down Expand Up @@ -1000,10 +1008,12 @@ where
pool: Pool<ConnectionManager<PgConnection>>,
api: ApiDispatch,
addresses: Vec<SocketAddr>,
sec: SecurityConf,
sec: &SecurityConf,
serve_graphql: bool,
serve_data: bool,
) -> Result<(), ApiError> {
tracing::info!("Serve graphql on {:?}", addresses);
let sec = sec.clone();
let claim_parser = sec
.id_claims
.map(|id_claims| AuthFromJwt { id_claims, allow_anonymous: sec.allow_anonymous });
Expand Down
10 changes: 4 additions & 6 deletions crates/api/src/chronicle_graphql/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ async fn transaction_context<'a>(
_ctx: &Context<'a>,
) -> async_graphql::Result<Submission> {
match res {
ApiResponse::Submission { subject, tx_id, .. } => {
Ok(Submission::from_submission(&subject, &tx_id))
},
ApiResponse::AlreadyRecorded { subject, .. } => {
Ok(Submission::from_already_recorded(&subject))
},
ApiResponse::Submission { subject, tx_id, .. } =>
Ok(Submission::from_submission(&subject, &tx_id)),
ApiResponse::AlreadyRecorded { subject, .. } =>
Ok(Submission::from_already_recorded(&subject)),
_ => unreachable!(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};

use chrono::{DateTime, Utc};
use futures::AsyncRead;
use futures::{AsyncRead, Future};

use serde::{Deserialize, Serialize};

Expand Down
18 changes: 9 additions & 9 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pub enum ApiError {
chronicle_persistence::StoreError,
),

#[error("Storage: {0:?}")]
ArrowService(#[source] anyhow::Error),

#[error("Transaction failed: {0}")]
Transaction(
#[from]
Expand Down Expand Up @@ -522,7 +525,6 @@ where
if commit.tx_id == tx_id {
let end_time = Instant::now();
let elapsed_time = end_time - start_time;

debug!(
"Depth charge transaction committed: {}",
commit.tx_id
Expand All @@ -531,10 +533,10 @@ where
"Depth charge round trip time: {:.2?}",
elapsed_time
);
histogram!(
"depth_charge_round_trip",
elapsed_time.as_millis() as f64
);
let hist = histogram!("depth_charge_round_trip",);

hist.record(elapsed_time.as_millis() as f64);

break;
}
},
Expand All @@ -557,10 +559,8 @@ where
Ok(dispatch)
}

/// Notify after a successful submission, for now this makes little
/// difference, but with the future introduction of a submission queue,
/// submission notifications will be decoupled from api invocation.
/// This is a measure to keep the api interface stable once this is introduced
/// Notify after a successful submission, depending on the consistency requirement TODO: set in
/// the transaction
fn submit_blocking(
&mut self,
tx: ChronicleTransaction,
Expand Down
Loading

0 comments on commit 0a5fac9

Please sign in to comment.