Skip to content

Commit

Permalink
Additional relation support:
Browse files Browse the repository at this point in the history
* derivation
* agent attribution

Signed-off-by: Ryan <ryan.roberts@btp.works>
  • Loading branch information
ryan-s-roberts committed Apr 25, 2024
1 parent 2a6ba18 commit ffa7a03
Show file tree
Hide file tree
Showing 16 changed files with 945 additions and 254 deletions.
16 changes: 10 additions & 6 deletions crates/api/src/chronicle_graphql/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ pub enum Error {
UnexpectedResponse { server: String, status: StatusCode },
}

#[derive(Clone)]
pub struct TokenChecker {
client: reqwest::Client,
verifier: Option<RemoteJwksVerifier>,
verifier: Option<Arc<RemoteJwksVerifier>>,
jwks_uri: Option<JwksUri>,
userinfo_uri: Option<UserInfoUri>,
userinfo_cache: Arc<Mutex<TimedCache<String, Map<String, Value>>>>,
Expand All @@ -60,11 +61,14 @@ impl TokenChecker {
Self {
client: reqwest::Client::new(),
verifier: jwks_uri.map(|uri| {
RemoteJwksVerifier::new(
uri.full_uri(),
None,
Duration::from_secs(cache_expiry_seconds.into()),
)
{
RemoteJwksVerifier::new(
uri.full_uri(),
None,
Duration::from_secs(cache_expiry_seconds.into()),
)
}
.into()
}),
jwks_uri: jwks_uri.cloned(),
userinfo_uri: userinfo_uri.cloned(),
Expand Down
23 changes: 18 additions & 5 deletions crates/api/src/chronicle_graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::{ApiDispatch, ApiError, StoreError};
#[macro_use]
pub mod activity;
pub mod agent;
mod authorization;
pub mod authorization;
mod cursor_project;
pub mod entity;
pub mod mutation;
Expand Down Expand Up @@ -453,6 +453,18 @@ impl SecurityConf {
) -> Self {
Self { jwks_uri, userinfo_uri, id_claims, jwt_must_claim, allow_anonymous, opa }
}

pub fn as_endpoint_conf(&self, cache_expiry_seconds: u32) -> EndpointSecurityConfiguration {
EndpointSecurityConfiguration::new(
TokenChecker::new(
self.jwks_uri.as_ref(),
self.userinfo_uri.as_ref(),
cache_expiry_seconds,
),
self.jwt_must_claim.clone(),
self.allow_anonymous,
)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -576,14 +588,15 @@ async fn execute_opa_check(
}
}

struct EndpointSecurityConfiguration {
#[derive(Clone)]
pub struct EndpointSecurityConfiguration {
checker: TokenChecker,
must_claim: HashMap<String, String>,
allow_anonymous: bool,
pub must_claim: HashMap<String, String>,
pub allow_anonymous: bool,
}

impl EndpointSecurityConfiguration {
fn new(
pub fn new(
checker: TokenChecker,
must_claim: HashMap<String, String>,
allow_anonymous: bool,
Expand Down
516 changes: 429 additions & 87 deletions crates/chronicle-arrow/src/lib.rs

Large diffs are not rendered by default.

35 changes: 25 additions & 10 deletions crates/chronicle-arrow/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,35 @@ pub fn attribution_struct() -> arrow_schema::DataType {
pub fn derivation_struct() -> arrow_schema::DataType {
arrow_schema::DataType::Struct(
vec![
arrow_schema::Field::new("target", arrow_schema::DataType::Utf8, false),
arrow_schema::Field::new("source", arrow_schema::DataType::Utf8, false),
arrow_schema::Field::new("activity", arrow_schema::DataType::Utf8, false),
]
.into(),
)
}

pub fn qualified_agent_struct() -> arrow_schema::DataType {
arrow_schema::DataType::Struct(
vec![
arrow_schema::Field::new("agent", arrow_schema::DataType::Utf8, false),
arrow_schema::Field::new("role", arrow_schema::DataType::Utf8, true),
]
.into(),
)
}

pub fn association_struct() -> arrow_schema::DataType {
arrow_schema::DataType::Struct(
vec![
arrow_schema::Field::new("responsible_agent", arrow_schema::DataType::Utf8, false),
arrow_schema::Field::new("responsible_role", arrow_schema::DataType::Utf8, true),
arrow_schema::Field::new("delegate_agent", arrow_schema::DataType::Utf8, true),
arrow_schema::Field::new("delegate_role", arrow_schema::DataType::Utf8, true),
arrow_schema::Field::new("responsible", qualified_agent_struct(), false),
arrow_schema::Field::new(
"delegated",
arrow_schema::DataType::new_list(
qualified_agent_struct(),
true, // Set the List type as non-nullable
),
false,
),
]
.into(),
)
Expand All @@ -63,7 +78,7 @@ pub fn agent_delegation_struct() -> arrow_schema::DataType {
arrow_schema::DataType::Struct(
vec![
arrow_schema::Field::new("agent", arrow_schema::DataType::Utf8, false),
arrow_schema::Field::new("activity", arrow_schema::DataType::Utf8, true),
arrow_schema::Field::new("activity", arrow_schema::DataType::Utf8, false),
arrow_schema::Field::new("role", arrow_schema::DataType::Utf8, true),
]
.into(),
Expand Down Expand Up @@ -164,13 +179,13 @@ pub fn schema_for_activity(activity: &ActivityDef) -> Schema {
builder.push(arrow_schema::Field::new(
"used",
arrow_schema::DataType::new_list(arrow_schema::DataType::Utf8, false),
true,
false,
));

builder.push(arrow_schema::Field::new(
"generated",
arrow_schema::DataType::new_list(arrow_schema::DataType::Utf8, false),
true,
false,
));

builder.push(arrow_schema::Field::new(
Expand All @@ -181,7 +196,7 @@ pub fn schema_for_activity(activity: &ActivityDef) -> Schema {

builder.push(arrow_schema::Field::new(
"was_associated_with",
arrow_schema::DataType::new_list(association_struct(), false),
arrow_schema::DataType::new_list(association_struct(), true),
false,
));

Expand All @@ -203,7 +218,7 @@ pub fn schema_for_agent(agent: &AgentDef) -> Schema {
builder.push(arrow_schema::Field::new(
"acted_on_behalf_of",
arrow_schema::DataType::new_list(agent_delegation_struct(), false),
true,
false,
));

builder.push(arrow_schema::Field::new(
Expand Down
93 changes: 90 additions & 3 deletions crates/chronicle-arrow/src/operations/activity.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use arrow_array::{Array, RecordBatch};

use common::{
attributes::Attributes,
prov::{
operations::{ChronicleOperation, SetAttributes},
ActivityId, EntityId, NamespaceId,
ActivityId, AgentId, EntityId, NamespaceId, Role,
},
};

use futures::StreamExt;

use crate::ChronicleArrowError;
use crate::{
query::{ActivityAssociationRef, AgentInteraction},
ChronicleArrowError,
};

use super::{string_list_column, with_implied};

Expand Down Expand Up @@ -72,6 +74,70 @@ fn get_ended(
opt_time_column(record_batch, "ended", row_index)
}

fn get_was_associated_with(
record_batch: &RecordBatch,
row_index: usize,
) -> Result<Vec<ActivityAssociationRef>, ChronicleArrowError> {
use arrow_array::{ListArray, StructArray, StringArray};


let column_index = record_batch.schema().index_of("was_associated_with")
.map_err(|_| ChronicleArrowError::MissingColumn("was_associated_with".to_string()))?;
let column = record_batch.column(column_index);
let list_array = column.as_any().downcast_ref::<ListArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected ListArray".to_string()))?;
let binding = list_array.value(row_index);
let struct_array = binding.as_any().downcast_ref::<StructArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected StructArray".to_string()))?;

let mut associations = Vec::new();
for i in 0..struct_array.len() {
let responsible_struct_array = struct_array.column(0).as_any().downcast_ref::<StructArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected StructArray for responsible".to_string()))?;

let agent_array = responsible_struct_array.column(0).as_any().downcast_ref::<StringArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected StringArray for agent".to_string()))?;
let role_array = responsible_struct_array.column(1).as_any().downcast_ref::<StringArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected StringArray for role".to_string()))?;

let agent = agent_array.value(i).to_string();
let role = Some(role_array.value(i).to_string());

// Handling the delegated field, which is a ListArray of StructArray
let delegated_list_array = struct_array.column(1).as_any().downcast_ref::<ListArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected ListArray for delegated".to_string()))?;
let delegated_binding = delegated_list_array.value(i);
let delegated_struct_array = delegated_binding.as_any().downcast_ref::<StructArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected StructArray for delegated".to_string()))?;

let mut delegated_agents = Vec::new();
for j in 0..delegated_struct_array.len() {
let delegated_agent_array = delegated_struct_array.column(0).as_any().downcast_ref::<StringArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected StringArray for delegated agent".to_string()))?;
let delegated_role_array = delegated_struct_array.column(1).as_any().downcast_ref::<StringArray>()
.ok_or(ChronicleArrowError::ColumnTypeMismatch("Expected StringArray for delegated role".to_string()))?;

let delegated_agent = delegated_agent_array.value(j).to_string();
let delegated_role = Some(delegated_role_array.value(j).to_string());

delegated_agents.push(AgentInteraction {
agent: delegated_agent,
role: delegated_role,
});
}

associations.push(ActivityAssociationRef {
responsible: AgentInteraction {
agent,
role,
},
delegated: delegated_agents,
});
}

Ok(associations)
}

pub fn activity_operations(
ns: &NamespaceId,
id: &str,
Expand Down Expand Up @@ -138,5 +204,26 @@ pub fn activity_operations(
));
}

let was_associated_with_refs = get_was_associated_with(record_batch, row_index)?;

for association_ref in was_associated_with_refs {
operations.push(ChronicleOperation::was_associated_with(
ns.clone(),
ActivityId::from_external_id(id),
AgentId::from_external_id(&association_ref.responsible.agent),
association_ref.responsible.role.map(Role),
));

for delegated in &association_ref.delegated {
operations.push(ChronicleOperation::agent_acts_on_behalf_of(
ns.clone(),
AgentId::from_external_id(id),
AgentId::from_external_id(&association_ref.responsible.agent),
Some(ActivityId::from_external_id(id)),
delegated.role.as_ref().map(|role| Role(role.clone())),
));
}
}

Ok(with_implied(operations))
}
6 changes: 3 additions & 3 deletions crates/chronicle-arrow/src/operations/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ fn get_acted_on_behalf_of(
"acted_on_behalf_of",
row_index,
"agent",
"role",
"activity",
"role",
)?
.into_iter()
.map(|(agent, role, activity)| ActedOnBehalfOfRef { agent, role, activity })
.map(|(agent, activity, role)| ActedOnBehalfOfRef { agent, role, activity })
.collect())
}

Expand Down Expand Up @@ -81,7 +81,7 @@ pub fn agent_operations(
ns.clone(),
AgentId::from_external_id(id),
AgentId::from_external_id(acted_on_behalf_of_ref.agent),
acted_on_behalf_of_ref.activity.map(ActivityId::from_external_id),
Some(ActivityId::from_external_id(acted_on_behalf_of_ref.activity)),
acted_on_behalf_of_ref.role.map(Role::from),
));
}
Expand Down
12 changes: 6 additions & 6 deletions crates/chronicle-arrow/src/operations/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ fn get_derivation(
record_batch: &RecordBatch,
row_index: usize,
) -> Result<Vec<DerivationRef>, ChronicleArrowError> {
Ok(struct_2_list_column(record_batch, column_name, row_index, "target", "activity")?
Ok(struct_2_list_column(record_batch, column_name, row_index, "source", "activity")?
.into_iter()
.map(|(target, activity)| DerivationRef { target, activity })
.map(|(target, activity)| DerivationRef { source: target, activity })
.collect())
}

Expand Down Expand Up @@ -92,8 +92,8 @@ pub fn entity_operations(
for was_derived_from_ref in was_derived_from_refs {
operations.push(ChronicleOperation::entity_derive(
ns.clone(),
EntityId::from_external_id(was_derived_from_ref.source),
EntityId::from_external_id(id),
EntityId::from_external_id(was_derived_from_ref.target),
Some(ActivityId::from_external_id(was_derived_from_ref.activity)),
DerivationType::None,
))
Expand All @@ -104,8 +104,8 @@ pub fn entity_operations(
for had_primary_source_ref in had_primary_source_refs {
operations.push(ChronicleOperation::entity_derive(
ns.clone(),
EntityId::from_external_id(had_primary_source_ref.source),
EntityId::from_external_id(id),
EntityId::from_external_id(had_primary_source_ref.target),
Some(ActivityId::from_external_id(had_primary_source_ref.activity)),
DerivationType::PrimarySource,
))
Expand All @@ -116,8 +116,8 @@ pub fn entity_operations(
for was_quoted_from_ref in was_quoted_from_refs {
operations.push(ChronicleOperation::entity_derive(
ns.clone(),
EntityId::from_external_id(was_quoted_from_ref.source),
EntityId::from_external_id(id),
EntityId::from_external_id(was_quoted_from_ref.target),
Some(ActivityId::from_external_id(was_quoted_from_ref.activity)),
DerivationType::Quotation,
))
Expand All @@ -128,8 +128,8 @@ pub fn entity_operations(
for was_revision_of_ref in was_revision_of_refs {
operations.push(ChronicleOperation::entity_derive(
ns.clone(),
EntityId::from_external_id(was_revision_of_ref.source),
EntityId::from_external_id(id),
EntityId::from_external_id(was_revision_of_ref.target),
Some(ActivityId::from_external_id(was_revision_of_ref.activity)),
DerivationType::Revision,
))
Expand Down
Loading

0 comments on commit ffa7a03

Please sign in to comment.