Skip to content

Commit

Permalink
Pagination for Tasks and Content (#653)
Browse files Browse the repository at this point in the history
* add pagination

* using the right iteration api

* add support for limit and start id with content

* wip

* passing the content_id filter to listing tasks

* renamed ListContent

* format fix

* support pagination with unknown row count

* support content pagination

* fix build

* update indexify ts client version

---------

Co-authored-by: Lucas Jackson <lucasjackson5815@gmail.com>
  • Loading branch information
diptanu and lucasjacks0n authored Jun 13, 2024
1 parent 624e49d commit 1c21867
Show file tree
Hide file tree
Showing 19 changed files with 538 additions and 324 deletions.
10 changes: 10 additions & 0 deletions crates/indexify_proto/src/indexify_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ pub struct ListTasksRequest {
pub namespace: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub extraction_policy: ::prost::alloc::string::String,
#[prost(string, tag = "3")]
pub content_id: ::prost::alloc::string::String,
#[prost(string, tag = "4")]
pub start_id: ::prost::alloc::string::String,
#[prost(uint64, tag = "5")]
pub limit: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -347,6 +353,10 @@ pub struct ListContentRequest {
::prost::alloc::string::String,
::prost_wkt_types::Value,
>,
#[prost(uint64, tag = "5")]
pub limit: u64,
#[prost(string, tag = "6")]
pub start_id: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
6 changes: 5 additions & 1 deletion protos/coordinator_service.proto
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
syntax = "proto3";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

package indexify_coordinator;

Expand Down Expand Up @@ -122,6 +121,9 @@ message ListStateChangesResponse {
message ListTasksRequest {
string namespace = 1;
string extraction_policy = 2;
string content_id = 3;
string start_id = 4;
uint64 limit = 5;
}

message ListTasksResponse {
Expand Down Expand Up @@ -305,6 +307,8 @@ message ListContentRequest {
string source = 2;
string parent_id = 3;
map<string, google.protobuf.Value> labels_eq = 4;
uint64 limit = 5;
string start_id = 6;
}

message ListContentResponse {
Expand Down
9 changes: 8 additions & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ impl From<metadata_storage::ExtractedMetadata> for ExtractedMetadata {
}

#[derive(Debug, Serialize, Deserialize, ToSchema, PartialEq, Clone)]
pub struct ListContentFilters {
pub struct ListContent {
#[serde(
deserialize_with = "api_utils::deserialize_none_to_empty_string",
default
Expand All @@ -344,6 +344,10 @@ pub struct ListContentFilters {
pub parent_id: String,
#[serde(default, deserialize_with = "api_utils::deserialize_labels_eq_filter")]
pub labels_eq: Option<HashMap<String, serde_json::Value>>,

pub limit: Option<u64>,

pub start_id: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, IntoParams, ToSchema)]
Expand Down Expand Up @@ -678,6 +682,9 @@ pub struct GetExtractedMetadataResponse {
#[derive(Debug, Serialize, Deserialize)]
pub struct ListTasks {
pub extraction_policy: Option<String>,
pub content_id: Option<String>,
pub start_id: Option<String>,
pub limit: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
Expand Down
30 changes: 19 additions & 11 deletions src/api_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,45 +296,49 @@ mod test_deserialize_labels_eq_filter {
use hyper::Uri;

use super::*;
use crate::api::ListContentFilters;
use crate::api::ListContent;

/// 1. ?source=foo&labels_eq=key:value
#[test]
fn test_key_value() {
let expected_query: Query<ListContentFilters> = Query(ListContentFilters {
let expected_query: Query<ListContent> = Query(ListContent {
source: "foo".to_string(),
parent_id: "".to_string(),
labels_eq: Some({
let mut labels_eq = HashMap::new();
labels_eq.insert("key".to_string(), serde_json::json!("value"));
labels_eq
}),
start_id: None,
limit: None,
});

let query_str: Uri = "http://example.com/path?source=foo&labels_eq=key:value"
.parse()
.unwrap();
let query: Query<ListContentFilters> = Query::try_from_uri(&query_str).unwrap();
let query: Query<ListContent> = Query::try_from_uri(&query_str).unwrap();
assert_eq!(query.0, expected_query.0);
}

/// 2. ?source=foo&labels_eq=key:
#[test]
fn test_key_empty_value() {
let expected_query: Query<ListContentFilters> = Query(ListContentFilters {
let expected_query: Query<ListContent> = Query(ListContent {
source: "foo".to_string(),
parent_id: "".to_string(),
labels_eq: Some({
let mut labels_eq = HashMap::new();
labels_eq.insert("key".to_string(), serde_json::json!(""));
labels_eq
}),
start_id: None,
limit: None,
});

let query_str: Uri = "http://example.com/path?source=foo&labels_eq=key:"
.parse()
.unwrap();
let query: Query<ListContentFilters> = Query::try_from_uri(&query_str).unwrap();
let query: Query<ListContent> = Query::try_from_uri(&query_str).unwrap();
assert_eq!(query.0, expected_query.0);
}

Expand All @@ -345,14 +349,14 @@ mod test_deserialize_labels_eq_filter {
let query_str: Uri = "http://example.com/path?source=foo&labels_eq="
.parse()
.unwrap();
let query: Result<Query<ListContentFilters>, _> = Query::try_from_uri(&query_str);
let query: Result<Query<ListContent>, _> = Query::try_from_uri(&query_str);
assert!(query.is_err(), "query should be invalid: \"labels_eq=\"");
}

/// 4. ?source=foo&labels_eq=key:value&labels_eq=key2:25
#[test]
fn test_multiple_key_value() {
let expected_query: Query<ListContentFilters> = Query(ListContentFilters {
let expected_query: Query<ListContent> = Query(ListContent {
source: "foo".to_string(),
parent_id: "".to_string(),
labels_eq: Some({
Expand All @@ -362,19 +366,21 @@ mod test_deserialize_labels_eq_filter {

labels_eq
}),
start_id: None,
limit: None,
});

let query_str: Uri = "http://example.com/path?source=foo&labels_eq=key:value,key2:25"
.parse()
.unwrap();
let query: Query<ListContentFilters> = Query::try_from_uri(&query_str).unwrap();
let query: Query<ListContent> = Query::try_from_uri(&query_str).unwrap();
assert_eq!(query.0, expected_query.0);
}

/// 5. ?source=foo&labels_eq=key:value&labels_eq=key2:
#[test]
fn test_multiple_key_value_key_empty_value() {
let expected_query: Query<ListContentFilters> = Query(ListContentFilters {
let expected_query: Query<ListContent> = Query(ListContent {
source: "foo".to_string(),
parent_id: "".to_string(),
labels_eq: Some({
Expand All @@ -383,12 +389,14 @@ mod test_deserialize_labels_eq_filter {
labels_eq.insert("key2".to_string(), serde_json::json!(""));
labels_eq
}),
start_id: None,
limit: None,
});

let query_str: Uri = "http://example.com/path?source=foo&labels_eq=key:value,key2:"
.parse()
.unwrap();
let query: Query<ListContentFilters> = Query::try_from_uri(&query_str).unwrap();
let query: Query<ListContent> = Query::try_from_uri(&query_str).unwrap();
assert_eq!(query.0, expected_query.0);
}

Expand All @@ -410,7 +418,7 @@ mod test_deserialize_labels_eq_filter {
.collect();

for query_str in invalid_query_params {
let query: Result<Query<ListContentFilters>, _> = Query::try_from_uri(&query_str);
let query: Result<Query<ListContent>, _> = Query::try_from_uri(&query_str);
assert!(query.is_err(), "query should be invalid: {}", query_str);
}
}
Expand Down
17 changes: 13 additions & 4 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,17 @@ impl Coordinator {
source: &str,
parent_id: &str,
labels_eq: &HashMap<String, serde_json::Value>,
start_id: Option<String>,
limit: Option<u64>,
) -> Result<Vec<internal_api::ContentMetadata>> {
self.shared_state
.list_content(namespace, parent_id, |c| {
content_filter(c, source, labels_eq)
})
.list_content(
namespace,
parent_id,
|c| content_filter(c, source, labels_eq),
start_id,
limit,
)
.await
}

Expand Down Expand Up @@ -318,10 +324,13 @@ impl Coordinator {
&self,
namespace: &str,
extraction_policy: Option<String>,
start_id: Option<String>,
limit: Option<u64>,
content_id: Option<String>,
) -> Result<Vec<indexify_coordinator::Task>> {
let tasks = self
.shared_state
.list_tasks(namespace, extraction_policy)
.list_tasks(namespace, extraction_policy, start_id, limit, content_id)
.await?;
let tasks = tasks
.into_iter()
Expand Down
33 changes: 31 additions & 2 deletions src/coordinator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,27 @@ impl CoordinatorService for CoordinatorServiceServer {
let labels_eq = internal_api::utils::convert_map_prost_to_serde_json(req.labels_eq)
.map_err(|e| tonic::Status::aborted(e.to_string()))?;

let start_id = if req.start_id.is_empty() {
None
} else {
Some(req.start_id)
};
let limit = if req.limit == 0 {
None
} else {
Some(req.limit)
};

let content_list = self
.coordinator
.list_content(&req.namespace, &req.source, &req.parent_id, &labels_eq)
.list_content(
&req.namespace,
&req.source,
&req.parent_id,
&labels_eq,
start_id,
limit,
)
.await
.map_err(|e| tonic::Status::aborted(e.to_string()))?;

Expand Down Expand Up @@ -927,9 +945,20 @@ impl CoordinatorService for CoordinatorServiceServer {
} else {
Some(req.extraction_policy)
};
let content_id = if req.content_id.is_empty() {
None
} else {
Some(req.content_id)
};
let tasks = self
.coordinator
.list_tasks(&req.namespace, extraction_policy)
.list_tasks(
&req.namespace,
extraction_policy,
Some(req.start_id),
Some(req.limit),
content_id,
)
.await
.map_err(|e| tonic::Status::aborted(e.to_string()))?;
Ok(Response::new(indexify_coordinator::ListTasksResponse {
Expand Down
4 changes: 4 additions & 0 deletions src/data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ impl DataManager {
source_filter: &str,
parent_id_filter: &str,
labels_eq_filter: Option<&HashMap<String, serde_json::Value>>,
start_id: String,
limit: u64,
) -> Result<Vec<api::ContentMetadata>> {
let default_labels_eq = HashMap::new();
let labels_eq = internal_api::utils::convert_map_serde_to_prost_json(
Expand All @@ -255,6 +257,8 @@ impl DataManager {
source: source_filter.to_string(),
parent_id: parent_id_filter.to_string(),
labels_eq,
start_id,
limit,
};
let response = self
.coordinator_client
Expand Down
7 changes: 6 additions & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ async fn update_labels(
async fn list_content(
Path(namespace): Path<String>,
State(state): State<NamespaceEndpointState>,
filter: Query<super::api::ListContentFilters>,
filter: Query<super::api::ListContent>,
) -> Result<Json<ListContentResponse>, IndexifyAPIError> {
let content_list = state
.data_manager
Expand All @@ -724,6 +724,8 @@ async fn list_content(
&filter.source,
&filter.parent_id,
filter.labels_eq.as_ref(),
filter.start_id.clone().unwrap_or_default(),
filter.limit.unwrap_or(10),
)
.await
.map_err(IndexifyAPIError::internal_error)?;
Expand Down Expand Up @@ -1239,6 +1241,9 @@ async fn list_tasks(
.list_tasks(ListTasksRequest {
namespace: namespace.clone(),
extraction_policy: query.extraction_policy.unwrap_or("".to_string()),
start_id: query.start_id.clone().unwrap_or_default(),
limit: query.limit.unwrap_or(10),
content_id: query.content_id.unwrap_or_default(),
})
.await
.map_err(|e| IndexifyAPIError::new(StatusCode::INTERNAL_SERVER_ERROR, e.message()))?
Expand Down
39 changes: 16 additions & 23 deletions src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,11 @@ impl App {
namespace: &str,
parent_id: &str,
predicate: impl Fn(&internal_api::ContentMetadata) -> bool,
start_id: Option<String>,
limit: Option<u64>,
) -> Result<Vec<internal_api::ContentMetadata>> {
self.state_machine
.list_content(namespace, parent_id, predicate)
.list_content(namespace, parent_id, predicate, start_id, limit)
}

pub async fn remove_executor(&self, executor_id: &str) -> Result<()> {
Expand Down Expand Up @@ -1086,26 +1088,13 @@ impl App {
&self,
namespace: &str,
extraction_policy: Option<String>,
start_id: Option<String>,
limit: Option<u64>,
content_id: Option<String>,
) -> Result<Vec<internal_api::Task>> {
let tasks: Vec<internal_api::Task> = self
.state_machine
.get_all_rows_from_cf::<internal_api::Task>(StateMachineColumns::Tasks)
.await?
.into_iter()
.map(|(_, value)| value)
.collect();
let filtered_tasks = tasks
.iter()
.filter(|task| task.namespace == namespace)
.filter(|task| {
extraction_policy
.as_ref()
.map(|eb| eb == &task.extraction_policy_id)
.unwrap_or(true)
})
.cloned()
.collect();
Ok(filtered_tasks)
self.state_machine
.list_tasks(namespace, extraction_policy, start_id, limit, content_id)
.await
}

pub async fn update_labels(
Expand Down Expand Up @@ -1671,9 +1660,13 @@ mod tests {

// Read the content back
let read_content = node
.list_content(&content_metadata_vec.first().unwrap().namespace, "", |_| {
true
})
.list_content(
&content_metadata_vec.first().unwrap().namespace,
"",
|_| true,
None,
None,
)
.await
.unwrap();
assert_eq!(read_content.len(), content_size);
Expand Down
Loading

0 comments on commit 1c21867

Please sign in to comment.