Skip to content

Commit

Permalink
* Initial support for arrow-flight
Browse files Browse the repository at this point in the history
Re organise persistence into seperate crate, add arrow and schema production

Signed-off-by: Ryan <ryan.roberts@btp.works>
  • Loading branch information
ryan-s-roberts committed Mar 12, 2024
1 parent 382488d commit fb0a713
Show file tree
Hide file tree
Showing 66 changed files with 2,406 additions and 1,001 deletions.
631 changes: 455 additions & 176 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ members = [
"crates/chronicle-synth",
"crates/chronicle-signing",
"crates/chronicle-telemetry",
"crates/chronicle-persistence",
"crates/chronicle-arrow",
"crates/gq-subscribe",
"crates/id-provider",
"crates/pallet-chronicle",
Expand All @@ -26,11 +28,17 @@ members = [
"crates/protocol-substrate-chronicle",
"crates/protocol-substrate-opa",
"crates/chronicle-test-infrastructure",
"crates/chronicle-arrow",
"crates/chronicle-persistence",
]

[workspace.dependencies]
Inflector = "0.11.4"
anyhow = { version = "^1", features = ["backtrace"] }
arrow-array = { versipn = "^0.50" }
arrow-flight = { versipn = "^0.50" }
arrow-ipc = { versipn = "^0.50" }
arrow-schema = { versipn = "^0.50" }
assert_fs = "1.0"
async-graphql = "^7"
async-graphql-poem = "^7"
Expand Down Expand Up @@ -107,11 +115,11 @@ pin-project-lite = "0.2"
pinvec = "0.1.0"
pkcs8 = { version = "0.10", features = ["std", "alloc"] }
poem = { version = "^2", features = ["opentelemetry", "websocket"] }
poem-grpc = { version = "^0.3" }
portpicker = "0.1"
pow_of_2 = "0.1"
proptest = "1"
question = "0.2.2"
r2d2 = "0.8.9"
rand = { version = "0.8", features = ["getrandom"] }
rand_core = "0.6"
rdf-types = "0.14"
Expand Down
27 changes: 15 additions & 12 deletions crates/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,7 @@ async-trait = { workspace = true }
base64 = { workspace = true }
cached = { workspace = true }
cfg-if = { workspace = true }
chronicle-signing = { workspace = true }
chronicle-telemetry = { path = "../chronicle-telemetry" }
chrono = { workspace = true }
common = { path = "../common", features = [
"json-ld",
"diesel-bindings",
"graphql-bindings",
"std",
] }
custom_error = { workspace = true }
derivative = { workspace = true }
diesel = { workspace = true }
Expand All @@ -45,10 +37,6 @@ opa = { workspace = true }
opentelemetry = { workspace = true }
parking_lot = { workspace = true }
poem = { workspace = true }
protocol-substrate = { path = "../protocol-substrate" }
protocol-substrate-chronicle = { path = "../protocol-substrate-chronicle" }
protocol-substrate-opa = { path = "../protocol-substrate-opa" }
r2d2 = { workspace = true }
rand = { workspace = true }
rand_core = { workspace = true }
reqwest = { workspace = true }
Expand All @@ -63,6 +51,21 @@ tracing = { workspace = true }
url = { workspace = true }
user-error = { workspace = true }
uuid = { workspace = true }
#Local deps
chronicle-persistence = { path = "../chronicle-persistence" }
chronicle-signing = { workspace = true }
chronicle-telemetry = { path = "../chronicle-telemetry" }
common = { path = "../common", features = [
"json-ld",
"diesel-bindings",
"graphql-bindings",
"std",
] }
protocol-substrate = { path = "../protocol-substrate" }
protocol-substrate-chronicle = { path = "../protocol-substrate-chronicle" }
protocol-substrate-opa = { path = "../protocol-substrate-opa" }
r2d2 = { version = "^0.8.1" }


[dev-dependencies]
assert_fs = { workspace = true }
Expand Down
56 changes: 29 additions & 27 deletions crates/api/src/chronicle_graphql/activity.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::{Activity, Agent, Entity, Namespace, Store};
use async_graphql::Context;
use chronicle_persistence::{
queryable::{Activity, Agent, Entity, Namespace},
Store,
};
use common::prov::Role;
use diesel::prelude::*;
use std::collections::HashMap;
Expand All @@ -8,10 +11,10 @@ pub async fn namespace<'a>(
namespaceid: i32,
ctx: &Context<'a>,
) -> async_graphql::Result<Namespace> {
use crate::persistence::schema::namespace::{self, dsl};
use chronicle_persistence::schema::namespace::{self, dsl};
let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

Ok(namespace::table
.filter(dsl::id.eq(namespaceid))
Expand All @@ -22,7 +25,7 @@ pub async fn was_associated_with<'a>(
id: i32,
ctx: &Context<'a>,
) -> async_graphql::Result<Vec<(Agent, Option<Role>, Option<Agent>, Option<Role>)>> {
use crate::persistence::schema::{agent, association, delegation};
use chronicle_persistence::schema::{agent, association, delegation};

#[derive(Queryable)]
struct DelegationAgents {
Expand All @@ -32,7 +35,7 @@ pub async fn was_associated_with<'a>(
}

let store = ctx.data_unchecked::<Store>();
let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

let delegation_entries = delegation::table
.filter(delegation::dsl::activity_id.eq(id))
Expand Down Expand Up @@ -62,8 +65,8 @@ pub async fn was_associated_with<'a>(

let res = association::table
.filter(association::dsl::activity_id.eq(id))
.inner_join(crate::persistence::schema::agent::table)
.order(crate::persistence::schema::agent::external_id)
.inner_join(chronicle_persistence::schema::agent::table)
.order(chronicle_persistence::schema::agent::external_id)
.select((Agent::as_select(), association::role))
.load::<(Agent, Role)>(&mut connection)?
.into_iter()
Expand All @@ -89,16 +92,16 @@ pub async fn was_associated_with<'a>(
}

pub async fn used<'a>(id: i32, ctx: &Context<'a>) -> async_graphql::Result<Vec<Entity>> {
use crate::persistence::schema::usage::{self, dsl};
use chronicle_persistence::schema::usage::{self, dsl};

let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

let res = usage::table
.filter(dsl::activity_id.eq(id))
.inner_join(crate::persistence::schema::entity::table)
.order(crate::persistence::schema::entity::external_id)
.inner_join(chronicle_persistence::schema::entity::table)
.order(chronicle_persistence::schema::entity::external_id)
.select(Entity::as_select())
.load::<Entity>(&mut connection)?;

Expand All @@ -109,35 +112,34 @@ pub async fn was_informed_by<'a>(
id: i32,
ctx: &Context<'a>,
) -> async_graphql::Result<Vec<Activity>> {
use crate::persistence::schema::wasinformedby::{self, dsl};
use chronicle_persistence::schema::wasinformedby::{self, dsl};

let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

let res =
wasinformedby::table
.filter(dsl::activity_id.eq(id))
.inner_join(crate::persistence::schema::activity::table.on(
wasinformedby::informing_activity_id.eq(crate::persistence::schema::activity::id),
))
.order(crate::persistence::schema::activity::external_id)
.select(Activity::as_select())
.load::<Activity>(&mut connection)?;
let res = wasinformedby::table
.filter(dsl::activity_id.eq(id))
.inner_join(chronicle_persistence::schema::activity::table.on(
wasinformedby::informing_activity_id.eq(chronicle_persistence::schema::activity::id),
))
.order(chronicle_persistence::schema::activity::external_id)
.select(Activity::as_select())
.load::<Activity>(&mut connection)?;

Ok(res)
}

pub async fn generated<'a>(id: i32, ctx: &Context<'a>) -> async_graphql::Result<Vec<Entity>> {
use crate::persistence::schema::generation::{self, dsl};
use chronicle_persistence::schema::generation::{self, dsl};

let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

let res = generation::table
.filter(dsl::activity_id.eq(id))
.inner_join(crate::persistence::schema::entity::table)
.inner_join(chronicle_persistence::schema::entity::table)
.select(Entity::as_select())
.load::<Entity>(&mut connection)?;

Expand All @@ -149,11 +151,11 @@ pub async fn load_attribute<'a>(
external_id: &str,
ctx: &Context<'a>,
) -> async_graphql::Result<Option<serde_json::Value>> {
use crate::persistence::schema::activity_attribute;
use chronicle_persistence::schema::activity_attribute;

let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

Ok(activity_attribute::table
.filter(
Expand Down
42 changes: 12 additions & 30 deletions crates/api/src/chronicle_graphql/agent.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,37 @@
use crate::chronicle_graphql::Entity;

use super::{Agent, Identity, Namespace, Store};
use async_graphql::Context;
use chronicle_persistence::{
queryable::{Agent, Entity, Namespace},
Store,
};
use common::prov::Role;
use diesel::prelude::*;

pub async fn namespace<'a>(
namespace_id: i32,
ctx: &Context<'a>,
) -> async_graphql::Result<Namespace> {
use crate::persistence::schema::namespace::{self, dsl};
use chronicle_persistence::schema::namespace::{self, dsl};
let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

Ok(namespace::table
.filter(dsl::id.eq(namespace_id))
.first::<Namespace>(&mut connection)?)
}

pub async fn identity<'a>(
identity_id: Option<i32>,
ctx: &Context<'a>,
) -> async_graphql::Result<Option<Identity>> {
use crate::persistence::schema::identity::{self, dsl};
let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;

if let Some(identity_id) = identity_id {
Ok(identity::table
.filter(dsl::id.eq(identity_id))
.first::<Identity>(&mut connection)
.optional()?)
} else {
Ok(None)
}
}

pub async fn acted_on_behalf_of<'a>(
id: i32,
ctx: &Context<'a>,
) -> async_graphql::Result<Vec<(Agent, Option<Role>)>> {
use crate::persistence::schema::{
use chronicle_persistence::schema::{
agent as agentdsl,
delegation::{self, dsl},
};

let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

Ok(delegation::table
.filter(dsl::delegate_id.eq(id))
Expand All @@ -68,14 +50,14 @@ pub async fn attribution<'a>(
id: i32,
ctx: &Context<'a>,
) -> async_graphql::Result<Vec<(Entity, Option<Role>)>> {
use crate::persistence::schema::{
use chronicle_persistence::schema::{
attribution::{self, dsl},
entity as entity_dsl,
};

let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

Ok(attribution::table
.filter(dsl::agent_id.eq(id))
Expand All @@ -93,11 +75,11 @@ pub async fn load_attribute<'a>(
external_id: &str,
ctx: &Context<'a>,
) -> async_graphql::Result<Option<serde_json::Value>> {
use crate::persistence::schema::agent_attribute;
use chronicle_persistence::schema::agent_attribute;

let store = ctx.data_unchecked::<Store>();

let mut connection = store.pool.get()?;
let mut connection = store.connection()?;

Ok(agent_attribute::table
.filter(agent_attribute::agent_id.eq(id).and(agent_attribute::typename.eq(external_id)))
Expand Down
14 changes: 9 additions & 5 deletions crates/api/src/chronicle_graphql/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@ pub enum Error {
#[error("Base64 decoding failure: {0}", source)]
Base64 {
#[from]
#[source]
source: base64::DecodeError,
},
#[error("JSON decoding failure: {0}", source)]
Json {
#[from]
#[source]
source: serde_json::Error,
},
#[error("JWT validation failure: {0}", source)]
Jwks {
#[from]
#[source]
source: jwtk::Error,
},
#[error("web access failure: {0}", source)]
Reqwest {
#[from]
#[source]
source: reqwest::Error,
},
#[error("formatting error: {0}", message)]
Expand Down Expand Up @@ -76,15 +80,15 @@ impl TokenChecker {
// should respond with JSON web key set
if !status.is_success() {
tracing::warn!("{uri:?} returns {status}");
return Err(Error::UnexpectedResponse { server: format!("{uri:?}"), status })
return Err(Error::UnexpectedResponse { server: format!("{uri:?}"), status });
}
}
if let Some(uri) = &self.userinfo_uri {
let status = self.client.get(uri.full_uri()).send().await?.status();
// should require an authorization token
if !status.is_client_error() || status == StatusCode::NOT_FOUND {
tracing::warn!("{uri:?} without authorization token returns {status}");
return Err(Error::UnexpectedResponse { server: format!("{uri:?}"), status })
return Err(Error::UnexpectedResponse { server: format!("{uri:?}"), status });
}
}
Ok(())
Expand All @@ -98,7 +102,7 @@ impl TokenChecker {
if let Some(verifier) = &self.verifier {
verifier.verify::<Map<String, Value>>(token).await?;
} else {
return Err(Error::Format { message: "no JWKS endpoint configured".to_string() })
return Err(Error::Format { message: "no JWKS endpoint configured".to_string() });
}

// JWT is composed of three base64-encoded components
Expand All @@ -107,7 +111,7 @@ impl TokenChecker {
.map(|component| BASE64_ENGINE.decode(component))
.collect::<Result<Vec<Vec<u8>>, base64::DecodeError>>()?;
if components.len() != 3 {
return Err(Error::Format { message: format!("JWT has unexpected format: {token}") })
return Err(Error::Format { message: format!("JWT has unexpected format: {token}") });
};

if let Value::Object(claims) = serde_json::from_slice(components[1].as_slice())? {
Expand All @@ -133,7 +137,7 @@ impl TokenChecker {
},
_ => (),
}
return Err(Error::Jwks { source }) // abort on JWKS verifier failure
return Err(Error::Jwks { source }); // abort on JWKS verifier failure
},
Err(err) => error = Some(err), // could tolerate error from what may be opaque token
};
Expand Down
Loading

0 comments on commit fb0a713

Please sign in to comment.