Skip to content

Commit

Permalink
feat(core): Implement list with deleted for s3 service (#5498)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Jan 2, 2025
1 parent 7a83013 commit 0146a12
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 127 deletions.
20 changes: 20 additions & 0 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ pub struct OpList {
///
/// Default to `false`
versions: bool,
/// The deleted is used to control whether the deleted objects should be returned.
///
/// - If `false`, list operation will not return with deleted objects
/// - If `true`, list operation will return with deleted objects if object versioning is supported
/// by the underlying service
///
/// Default to `false`
deleted: bool,
}

impl Default for OpList {
Expand All @@ -122,6 +130,7 @@ impl Default for OpList {
recursive: false,
concurrent: 1,
versions: false,
deleted: false,
}
}
}
Expand Down Expand Up @@ -206,6 +215,17 @@ impl OpList {
pub fn versions(&self) -> bool {
self.versions
}

/// Change the deleted of this list operation
pub fn with_deleted(mut self, deleted: bool) -> Self {
self.deleted = deleted;
self
}

/// Get the deleted of this list operation
pub fn deleted(&self) -> bool {
self.deleted
}
}

/// Args for `presign` operation.
Expand Down
11 changes: 4 additions & 7 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ impl Access for S3Backend {
list_with_start_after: true,
list_with_recursive: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,
list_has_etag: true,
list_has_content_md5: true,
list_has_content_length: true,
Expand Down Expand Up @@ -1060,21 +1061,17 @@ impl Access for S3Backend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = if args.versions() {
let l = if args.versions() || args.deleted() {
TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
args,
)))
} else {
TwoWays::One(PageLister::new(S3Lister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
args,
)))
};

Expand Down
31 changes: 30 additions & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,6 @@ impl S3Core {
write!(url, "&max-keys={limit}").expect("write into string must succeed");
}
if let Some(start_after) = start_after {
let start_after = build_abs_path(&self.root, &start_after);
write!(url, "&start-after={}", percent_encode_path(&start_after))
.expect("write into string must succeed");
}
Expand Down Expand Up @@ -994,6 +993,7 @@ pub struct ListObjectVersionsOutput {
pub next_version_id_marker: Option<String>,
pub common_prefixes: Vec<OutputCommonPrefix>,
pub version: Vec<ListObjectVersionsOutputVersion>,
pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
Expand All @@ -1008,6 +1008,15 @@ pub struct ListObjectVersionsOutputVersion {
pub etag: Option<String>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputDeleteMarker {
pub key: String,
pub version_id: String,
pub is_latest: bool,
pub last_modified: String,
}

pub enum ChecksumAlgorithm {
Crc32c,
}
Expand Down Expand Up @@ -1284,6 +1293,16 @@ mod tests {
<CommonPrefixes>
<Prefix>videos/</Prefix>
</CommonPrefixes>
<DeleteMarker>
<Key>my-third-image.jpg</Key>
<VersionId>03jpff543dhffds434rfdsFDN943fdsFkdmqnh892</VersionId>
<IsLatest>true</IsLatest>
<LastModified>2009-10-15T17:50:30.000Z</LastModified>
<Owner>
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
<DisplayName>mtd@amazon.com</DisplayName>
</Owner>
</DeleteMarker>
</ListVersionsResult>"#,
);

Expand Down Expand Up @@ -1329,5 +1348,15 @@ mod tests {
}
]
);

assert_eq!(
output.delete_marker,
vec![ListObjectVersionsOutputDeleteMarker {
key: "my-third-image.jpg".to_owned(),
version_id: "03jpff543dhffds434rfdsFDN943fdsFkdmqnh892".to_owned(),
is_latest: true,
last_modified: "2009-10-15T17:50:30.000Z".to_owned(),
},]
);
}
}
117 changes: 65 additions & 52 deletions core/src/services/s3/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,26 @@ pub struct S3Lister {
core: Arc<S3Core>,

path: String,
delimiter: &'static str,
limit: Option<usize>,
args: OpList,

/// Amazon S3 starts listing **after** this specified key
start_after: Option<String>,
delimiter: &'static str,
abs_start_after: Option<String>,
}

impl S3Lister {
pub fn new(
core: Arc<S3Core>,
path: &str,
recursive: bool,
limit: Option<usize>,
start_after: Option<&str>,
) -> Self {
let delimiter = if recursive { "" } else { "/" };
pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
let delimiter = if args.recursive() { "" } else { "/" };
let abs_start_after = args
.start_after()
.map(|start_after| build_abs_path(&core.root, start_after));

Self {
core,

path: path.to_string(),
args,
delimiter,
limit,
start_after: start_after.map(String::from),
abs_start_after,
}
}
}
Expand All @@ -70,10 +67,10 @@ impl oio::PageList for S3Lister {
&self.path,
&ctx.token,
self.delimiter,
self.limit,
self.args.limit(),
// start after should only be set for the first page.
if ctx.token.is_empty() {
self.start_after.clone()
self.abs_start_after.clone()
} else {
None
},
Expand Down Expand Up @@ -143,35 +140,29 @@ impl oio::PageList for S3Lister {
}
}

// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
/// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
pub struct S3ObjectVersionsLister {
core: Arc<S3Core>,

prefix: String,
args: OpList,

delimiter: &'static str,
limit: Option<usize>,
start_after: String,
abs_start_after: String,
abs_start_after: Option<String>,
}

impl S3ObjectVersionsLister {
pub fn new(
core: Arc<S3Core>,
path: &str,
recursive: bool,
limit: Option<usize>,
start_after: Option<&str>,
) -> Self {
let delimiter = if recursive { "" } else { "/" };
let start_after = start_after.unwrap_or_default().to_owned();
let abs_start_after = build_abs_path(core.root.as_str(), start_after.as_str());
pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
let delimiter = if args.recursive() { "" } else { "/" };
let abs_start_after = args
.start_after()
.map(|start_after| build_abs_path(&core.root, start_after));

Self {
core,
prefix: path.to_string(),
args,
delimiter,
limit,
start_after,
abs_start_after,
}
}
Expand All @@ -182,8 +173,8 @@ impl oio::PageList for S3ObjectVersionsLister {
let markers = ctx.token.rsplit_once(" ");
let (key_marker, version_id_marker) = if let Some(data) = markers {
data
} else if !self.start_after.is_empty() {
(self.abs_start_after.as_str(), "")
} else if let Some(start_after) = &self.abs_start_after {
(start_after.as_str(), "")
} else {
("", "")
};
Expand All @@ -193,7 +184,7 @@ impl oio::PageList for S3ObjectVersionsLister {
.s3_list_object_versions(
&self.prefix,
self.delimiter,
self.limit,
self.args.limit(),
key_marker,
version_id_marker,
)
Expand Down Expand Up @@ -231,26 +222,48 @@ impl oio::PageList for S3ObjectVersionsLister {
ctx.entries.push_back(de);
}

for version_object in output.version {
let mut path = build_rel_path(&self.core.root, &version_object.key);
if path.is_empty() {
path = "/".to_owned();
if self.args.versions() {
for version_object in output.version {
let mut path = build_rel_path(&self.core.root, &version_object.key);
if path.is_empty() {
path = "/".to_owned();
}

let mut meta = Metadata::new(EntryMode::from_path(&path));
meta.set_version(&version_object.version_id);
meta.set_is_current(version_object.is_latest);
meta.set_content_length(version_object.size);
meta.set_last_modified(parse_datetime_from_rfc3339(
version_object.last_modified.as_str(),
)?);
if let Some(etag) = version_object.etag {
meta.set_etag(&etag);
meta.set_content_md5(etag.trim_matches('"'));
}

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}
}

let mut meta = Metadata::new(EntryMode::from_path(&path));
meta.set_version(&version_object.version_id);
meta.set_is_current(version_object.is_latest);
meta.set_content_length(version_object.size);
meta.set_last_modified(parse_datetime_from_rfc3339(
version_object.last_modified.as_str(),
)?);
if let Some(etag) = version_object.etag {
meta.set_etag(&etag);
meta.set_content_md5(etag.trim_matches('"'));
if self.args.deleted() {
for delete_marker in output.delete_marker {
let mut path = build_rel_path(&self.core.root, &delete_marker.key);
if path.is_empty() {
path = "/".to_owned();
}

let mut meta = Metadata::new(EntryMode::FILE);
meta.set_version(&delete_marker.version_id);
meta.set_is_deleted(true);
meta.set_is_current(delete_marker.is_latest);
meta.set_last_modified(parse_datetime_from_rfc3339(
delete_marker.last_modified.as_str(),
)?);

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ pub struct Capability {
/// Indicates if versions listing is supported.
#[deprecated(since = "0.51.1", note = "use with_versions instead")]
pub list_with_version: bool,
/// Indicates if versions listing is supported.
/// Indicates if listing with versions included is supported.
pub list_with_versions: bool,
/// Indicates if listing with deleted files included is supported.
pub list_with_deleted: bool,
/// Indicates whether cache control information is available in list response
pub list_has_cache_control: bool,
/// Indicates whether content disposition information is available in list response
Expand Down
Loading

0 comments on commit 0146a12

Please sign in to comment.