Skip to content

Commit

Permalink
fix(storage): Only allow 1 batch to run at a time (#5704)
Browse files Browse the repository at this point in the history
  • Loading branch information
tyllark authored Nov 25, 2024
1 parent bc314f6 commit 371e0a4
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,11 +547,15 @@ class S3UploadTask {
}
}

bool _isNextBatchWaiting = false;
Future<void> _startNextUploadPartsBatch({
bool resumingFromPause = false,
}) async {
// await for previous batching to complete (if any)
if (_isNextBatchWaiting) return;
_isNextBatchWaiting = true;
await _uploadPartBatchingCompleted;
_isNextBatchWaiting = false;

if (_state != StorageTransferState.inProgress) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

import 'dart:async';
import 'dart:math';
import 'dart:typed_data';

import 'package:amplify_core/amplify_core.dart';
Expand Down Expand Up @@ -1238,6 +1239,108 @@ void main() {
expect(finalState, StorageTransferState.failure);
});

test('should handle async gaps when reading from Multipart file',
() async {
late StorageTransferState finalState;

//completeMultipartUploadSmithyOperation
final testCompleteMultipartUploadOutput =
s3.CompleteMultipartUploadOutput();
final completeMultipartUploadSmithyOperation =
MockSmithyOperation<s3.CompleteMultipartUploadOutput>();
when(
() => completeMultipartUploadSmithyOperation.result,
).thenAnswer(
(_) async => testCompleteMultipartUploadOutput,
);

//uploadPartSmithyOperation
final testUploadPartOutput = s3.UploadPartOutput(eTag: 'eTag-part-1');
final uploadPartSmithyOperation =
MockSmithyOperation<s3.UploadPartOutput>();
when(
() => uploadPartSmithyOperation.result,
).thenAnswer(
(_) async => testUploadPartOutput,
);

//createMultipartUploadSmithyOperation
final testCreateMultipartUploadOutput = s3.CreateMultipartUploadOutput(
uploadId: 'uploadId', // response should always contain valid uploadId
);
final createMultipartUploadSmithyOperation =
MockSmithyOperation<s3.CreateMultipartUploadOutput>();
when(
() => createMultipartUploadSmithyOperation.result,
).thenAnswer(
(_) async => testCreateMultipartUploadOutput,
);

//s3Client
when(
() => s3Client.completeMultipartUpload(any()),
).thenAnswer((_) => completeMultipartUploadSmithyOperation);
when(
() => s3Client.uploadPart(
any(),
s3ClientConfig: any(named: 's3ClientConfig'),
),
).thenAnswer(
(_) => uploadPartSmithyOperation,
);
when(
() => s3Client.createMultipartUpload(any()),
).thenAnswer(
(_) => createMultipartUploadSmithyOperation,
);

//transferDatabase
when(
() => transferDatabase.insertTransferRecord(any<TransferRecord>()),
).thenAnswer(
(_) async => '1',
);
when(
() => transferDatabase.deleteTransferRecords(any()),
).thenAnswer(
(_) async => 1,
);

final bytes = List<int>.filled(
(32 * pow(2, 20)).toInt(),
0,
);
final mockFile = AWSFile.fromStream(
Stream.value(bytes),
size: bytes.length,
contentType: 'image/jpeg',
);

final uploadTask = S3UploadTask.fromAWSFile(
mockFile,
s3Client: s3Client,
defaultS3ClientConfig: defaultS3ClientConfig,
pathResolver: pathResolver,
bucket: testBucket,
path: const StoragePath.fromString(testKey),
options: testUploadDataOptions,
logger: logger,
transferDatabase: transferDatabase,
onProgress: (progress) {
finalState = progress.state;
},
);

unawaited(uploadTask.start());

await uploadTask.result;

expect(
finalState,
StorageTransferState.success,
);
});

test(
'should complete with StorageAccessDeniedException when CreateMultipartUploadRequest'
' returned UnknownSmithyHttpException with status code 403',
Expand Down

0 comments on commit 371e0a4

Please sign in to comment.