Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
cirospaciari committed Dec 31, 2024
1 parent d806aa3 commit b97919f
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 36 deletions.
109 changes: 74 additions & 35 deletions src/s3.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1487,17 +1487,18 @@ pub const AWSCredentials = struct {
pub fn resolve(result: S3UploadResult, self: *@This()) void {
const sink = self.sink;
defer self.deref();

if (sink.endPromise.globalObject()) |globalObject| {
switch (result) {
.success => sink.endPromise.resolve(globalObject, JSC.jsNumber(0)),
.failure => |err| {
if (!sink.done) {
sink.abort();
return;
}
sink.endPromise.rejectOnNextTick(globalObject, err.toJS(globalObject));
},
if (sink.endPromise.hasValue()) {
if (sink.endPromise.globalObject()) |globalObject| {
switch (result) {
.success => sink.endPromise.resolve(globalObject, JSC.jsNumber(0)),
.failure => |err| {
if (!sink.done) {
sink.abort();
return;
}
sink.endPromise.rejectOnNextTick(globalObject, err.toJS(globalObject));
},
}
}
}
if (self.callback) |callback| {
Expand Down Expand Up @@ -1576,6 +1577,7 @@ pub const AWSCredentials = struct {
.callback = @ptrCast(&S3UploadStreamWrapper.resolve),
.callback_context = undefined,
.globalThis = globalThis,
.state = .wait_stream_check,
.options = options,
.vm = JSC.VirtualMachine.get(),
});
Expand Down Expand Up @@ -1622,14 +1624,14 @@ pub const AWSCredentials = struct {
bun.assert(!signal.isDead());

if (assignment_result.toError()) |err| {
readable_stream.cancel(globalThis);
if (response_stream.sink.endPromise.hasValue()) {
response_stream.sink.endPromise.rejectOnNextTick(globalThis, err);
}
task.fail(.{
.code = "UnknownError",
.message = "ReadableStream ended with an error",
});
readable_stream.cancel(globalThis);
return endPromise;
}

Expand All @@ -1641,6 +1643,7 @@ pub const AWSCredentials = struct {
if (assignment_result.asAnyPromise()) |promise| {
switch (promise.status(globalThis.vm())) {
.pending => {
task.continueStream();
ctx.ref();
assignment_result.then(
globalThis,
Expand All @@ -1650,31 +1653,32 @@ pub const AWSCredentials = struct {
);
},
.fulfilled => {
readable_stream.done(globalThis);
task.continueStream();
if (response_stream.sink.endPromise.hasValue()) {
response_stream.sink.endPromise.resolve(globalThis, JSC.jsNumber(0));
}
readable_stream.done(globalThis);
},
.rejected => {
readable_stream.cancel(globalThis);
if (response_stream.sink.endPromise.hasValue()) {
response_stream.sink.endPromise.rejectOnNextTick(globalThis, promise.result(globalThis.vm()));
}
task.fail(.{
.code = "UnknownError",
.message = "ReadableStream ended with an error",
});
readable_stream.cancel(globalThis);
},
}
} else {
readable_stream.cancel(globalThis);
if (response_stream.sink.endPromise.hasValue()) {
response_stream.sink.endPromise.rejectOnNextTick(globalThis, assignment_result);
}
task.fail(.{
.code = "UnknownError",
.message = "ReadableStream ended with an error",
});
readable_stream.cancel(globalThis);
}
}
return endPromise;
Expand All @@ -1683,22 +1687,24 @@ pub const AWSCredentials = struct {
pub fn s3WritableStream(this: *@This(), path: []const u8, globalThis: *JSC.JSGlobalObject, options: MultiPartUpload.MultiPartUploadOptions, content_type: ?[]const u8, proxy: ?[]const u8) bun.JSError!JSC.JSValue {
const Wrapper = struct {
pub fn callback(result: S3UploadResult, sink: *JSC.WebCore.FetchTaskletChunkedRequestSink) void {
if (sink.endPromise.globalObject()) |globalObject| {
const event_loop = globalObject.bunVM().eventLoop();
event_loop.enter();
defer event_loop.exit();
switch (result) {
.success => {
sink.endPromise.resolve(globalObject, JSC.jsNumber(0));
},
.failure => |err| {
if (!sink.done) {
sink.abort();
return;
}
if (sink.endPromise.hasValue()) {
if (sink.endPromise.globalObject()) |globalObject| {
const event_loop = globalObject.bunVM().eventLoop();
event_loop.enter();
defer event_loop.exit();
switch (result) {
.success => {
sink.endPromise.resolve(globalObject, JSC.jsNumber(0));
},
.failure => |err| {
if (!sink.done) {
sink.abort();
return;
}

sink.endPromise.rejectOnNextTick(globalObject, err.toJS(globalObject));
},
sink.endPromise.rejectOnNextTick(globalObject, err.toJS(globalObject));
},
}
}
}
sink.finalize();
Expand Down Expand Up @@ -1777,6 +1783,7 @@ pub const MultiPartUpload = struct {
multipart_upload_list: bun.ByteList = .{},

state: enum {
wait_stream_check,
not_started,
multipart_started,
multipart_completed,
Expand Down Expand Up @@ -1829,7 +1836,7 @@ pub const MultiPartUpload = struct {
}

pub fn onPartResponse(result: AWS.S3PartResult, this: *@This()) void {
if (this.state == .canceled) {
if (this.state == .canceled or this.ctx.state == .finished) {
log("onPartResponse {} canceled", .{this.partNumber});
if (this.owns_data) bun.default_allocator.free(this.data);
this.ctx.deref();
Expand Down Expand Up @@ -1887,7 +1894,7 @@ pub const MultiPartUpload = struct {
}, .{ .part = @ptrCast(&onPartResponse) }, this);
}
pub fn start(this: *@This()) void {
if (this.state != .pending or this.ctx.state != .multipart_completed) return;
if (this.state != .pending or this.ctx.state != .multipart_completed or this.ctx.state == .finished) return;
this.ctx.ref();
this.state = .started;
this.perform();
Expand Down Expand Up @@ -1933,11 +1940,14 @@ pub const MultiPartUpload = struct {
}

pub fn singleSendUploadResponse(result: AWS.S3UploadResult, this: *@This()) void {
defer this.deref();
if (this.state == .finished) return;
switch (result) {
.failure => |err| {
if (this.options.retry > 0) {
log("singleSendUploadResponse {} retry", .{this.options.retry});
this.options.retry -= 1;
this.ref();
// retry failed
this.credentials.executeSimpleS3Request(.{
.path = this.path,
Expand Down Expand Up @@ -1998,6 +2008,9 @@ pub const MultiPartUpload = struct {
}

fn drainEnqueuedParts(this: *@This()) void {
if (this.state == .finished) {
return;
}
// check pending to start or transformed buffered ones into tasks
if (this.state == .multipart_completed) {
for (this.queue.items) |*part| {
Expand All @@ -2019,13 +2032,16 @@ pub const MultiPartUpload = struct {
}
pub fn fail(this: *@This(), _err: AWS.S3Error) void {
log("fail {s}:{s}", .{ _err.code, _err.message });
this.ended = true;
for (this.queue.items) |*task| {
task.cancel();
}
if (this.state != .finished) {
this.callback(.{ .failure = _err }, this.callback_context);
const old_state = this.state;
this.state = .finished;
if (this.state == .multipart_completed) {
this.callback(.{ .failure = _err }, this.callback_context);

if (old_state == .multipart_completed) {
// will deref after rollback
this.rollbackMultiPartRequest();
} else {
Expand Down Expand Up @@ -2057,6 +2073,8 @@ pub const MultiPartUpload = struct {
}
}
pub fn startMultiPartRequestResult(result: AWS.S3DownloadResult, this: *@This()) void {
defer this.deref();
if (this.state == .finished) return;
switch (result) {
.failure => |err| {
log("startMultiPartRequestResult {s} failed {s}: {s}", .{ this.path, err.message, err.message });
Expand Down Expand Up @@ -2094,6 +2112,10 @@ pub const MultiPartUpload = struct {

pub fn onCommitMultiPartRequest(result: AWS.S3CommitResult, this: *@This()) void {
log("onCommitMultiPartRequest {s}", .{this.upload_id});
if (this.state == .finished) {
this.deinit();
return;
}
switch (result) {
.failure => |err| {
if (this.options.retry > 0) {
Expand Down Expand Up @@ -2137,6 +2159,7 @@ pub const MultiPartUpload = struct {
const searchParams = std.fmt.bufPrint(&params_buffer, "?uploadId={s}", .{
this.upload_id,
}) catch unreachable;
this.ref();

this.credentials.executeSimpleS3Request(.{
.path = this.path,
Expand All @@ -2152,6 +2175,7 @@ pub const MultiPartUpload = struct {
const search_params = std.fmt.bufPrint(&params_buffer, "?uploadId={s}", .{
this.upload_id,
}) catch unreachable;
this.ref();

this.credentials.executeSimpleS3Request(.{
.path = this.path,
Expand All @@ -2167,6 +2191,7 @@ pub const MultiPartUpload = struct {
if (this.state == .not_started) {
// will auto start later
this.state = .multipart_started;
this.ref();
this.credentials.executeSimpleS3Request(.{
.path = this.path,
.method = .POST,
Expand Down Expand Up @@ -2211,6 +2236,7 @@ pub const MultiPartUpload = struct {
if (this.ended and this.buffered.items.len < this.partSizeInBytes() and this.state == .not_started) {
log("processBuffered {s} singlefile_started", .{this.path});
this.state = .singlefile_started;
this.ref();
// we can do only 1 request
this.credentials.executeSimpleS3Request(.{
.path = this.path,
Expand All @@ -2229,9 +2255,22 @@ pub const MultiPartUpload = struct {
return this.options.partSize * OneMiB;
}

pub fn continueStream(this: *@This()) void {
if (this.state == .wait_stream_check) {
this.state = .not_started;
if (this.ended) {
this.processBuffered(this.partSizeInBytes());
}
}
}

pub fn sendRequestData(this: *@This(), chunk: []const u8, is_last: bool) void {
if (this.ended) return;

if (this.state == .wait_stream_check and chunk.len == 0 and is_last) {
// we do this because stream will close if the file dont exists and we dont wanna to send an empty part in this case
this.ended = true;
return;
}
if (is_last) {
this.ended = true;
if (chunk.len > 0) {
Expand Down
21 changes: 20 additions & 1 deletion test/js/bun/s3/s3.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,26 @@ describe.skipIf(!s3Options.accessKeyId)("s3", () => {
});
});
}

describe("errors", () => {
it("Bun.write(s3file, file) should throw if the file does not exist", async () => {
try {
await Bun.write(s3("test.txt", s3Options), file("./do-not-exist.txt"));
expect.unreachable();
} catch (e: any) {
expect(e?.code).toBe("ENOENT");
expect(e?.path).toBe("./do-not-exist.txt");
expect(e?.syscall).toBe("open");
}
});
it("Bun.write(s3file, file) should throw if the file does not exist", async () => {
try {
await Bun.write(s3("test.txt", s3Options), s3("do-not-exist.txt", s3Options));
expect.unreachable();
} catch (e: any) {
expect(e?.code).toBe("NoSuchKey");
}
});
});
describe("credentials", () => {
it("should error with invalid access key id", async () => {
[s3, (...args) => new S3(...args), file].forEach(fn => {
Expand Down

0 comments on commit b97919f

Please sign in to comment.