Skip to content

Commit

Permalink
@uppy/companion: fix double tus uploads (#4816)
Browse files Browse the repository at this point in the history
* fix double uploads

fixes #4815
also improve error logging when canceling

* don't close socket when pausing

because it prevented us from canceling an upload that is paused
note that we here sacrifice the ability to pause uploads and have other uploads take over the paused upload's rate limit queue spot.
this is sacrificed because we will soon remove the ability to pause/resume uploads anyways

* Update packages/@uppy/companion/src/server/Uploader.js

Co-authored-by: Merlijn Vos <merlijn@soverin.net>

* Revert "don't close socket when pausing"

This reverts commit e4dbe77.

---------

Co-authored-by: Merlijn Vos <merlijn@soverin.net>
  • Loading branch information
mifi and Murderlon authored Dec 12, 2023
1 parent de98701 commit df36e12
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions packages/@uppy/companion/src/server/Uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ function sanitizeMetadata(inputMetadata) {
return outputMetadata
}

class AbortError extends Error {
isAbortError = true
}

class ValidationError extends Error {
constructor(message) {
super(message)
Expand Down Expand Up @@ -139,6 +135,13 @@ function validateOptions(options) {
}
}

const states = {
idle: 'idle',
uploading: 'uploading',
paused: 'paused',
done: 'done',
}

class Uploader {
/**
* Uploads file to destination based on the supplied protocol (tus, s3-multipart, multipart)
Expand Down Expand Up @@ -176,10 +179,7 @@ class Uploader {
? this.options.metadata.name.substring(0, MAX_FILENAME_LENGTH)
: this.fileName

this.uploadStopped = false

this.storage = options.storage
this._paused = false

this.downloadedBytes = 0

Expand All @@ -188,15 +188,17 @@ class Uploader {
if (this.options.protocol === PROTOCOLS.tus) {
emitter().on(`pause:${this.token}`, () => {
logger.debug('Received from client: pause', 'uploader', this.shortToken)
this._paused = true
if (this.#uploadState !== states.uploading) return
this.#uploadState = states.paused
if (this.tus) {
this.tus.abort()
}
})

emitter().on(`resume:${this.token}`, () => {
logger.debug('Received from client: resume', 'uploader', this.shortToken)
this._paused = false
if (this.#uploadState !== states.paused) return
this.#uploadState = states.uploading
if (this.tus) {
this.tus.start()
}
Expand All @@ -205,17 +207,21 @@ class Uploader {

emitter().on(`cancel:${this.token}`, () => {
logger.debug('Received from client: cancel', 'uploader', this.shortToken)
this._paused = true
if (this.tus) {
const shouldTerminate = !!this.tus.url
this.tus.abort(shouldTerminate).catch(() => { })
}
this.abortReadStream(new AbortError())
this.#canceled = true
this.abortReadStream(new Error('Canceled'))
})
}

#uploadState = states.idle

#canceled = false

abortReadStream(err) {
this.uploadStopped = true
this.#uploadState = states.done
if (this.readStream) this.readStream.destroy(err)
}

Expand Down Expand Up @@ -244,7 +250,9 @@ class Uploader {

const onData = (chunk) => {
this.downloadedBytes += chunk.length
if (exceedsMaxFileSize(this.options.companionOptions.maxFileSize, this.downloadedBytes)) this.abortReadStream(new Error('maxFileSize exceeded'))
if (exceedsMaxFileSize(this.options.companionOptions.maxFileSize, this.downloadedBytes)) {
this.abortReadStream(new Error('maxFileSize exceeded'))
}
this.onProgress(0, undefined)
}

Expand All @@ -271,9 +279,11 @@ class Uploader {
*/
async uploadStream(stream) {
try {
if (this.uploadStopped) throw new Error('Cannot upload stream after upload stopped')
if (this.#uploadState !== states.idle) throw new Error('Can only start an upload in the idle state')
if (this.readStream) throw new Error('Already uploading')

this.#uploadState = states.uploading

this.readStream = stream
if (this._needDownloadFirst()) {
logger.debug('need to download the whole file first', 'controller.get.provider.size', this.shortToken)
Expand All @@ -282,7 +292,7 @@ class Uploader {
// The stream will then typically come from a "Transfer-Encoding: chunked" response
await this._downloadStreamAsFile(this.readStream)
}
if (this.uploadStopped) return undefined
if (this.#uploadState !== states.uploading) return undefined

const { url, extraData } = await Promise.race([
this._uploadByProtocol(),
Expand All @@ -291,6 +301,7 @@ class Uploader {
])
return { url, extraData }
} finally {
this.#uploadState = states.done
logger.debug('cleanup', this.shortToken)
if (this.readStream && !this.readStream.destroyed) this.readStream.destroy()
await this.tryDeleteTmpPath()
Expand All @@ -314,11 +325,10 @@ class Uploader {
const { url, extraData } = ret
this.#emitSuccess(url, extraData)
} catch (err) {
if (err?.isAbortError) {
if (this.#canceled) {
logger.error('Aborted upload', 'uploader.aborted', this.shortToken)
return
}
// console.log(err)
logger.error(err, 'uploader.error', this.shortToken)
this.#emitError(err)
} finally {
Expand Down Expand Up @@ -458,7 +468,7 @@ class Uploader {

const formattedPercentage = percentage.toFixed(2)

if (this._paused || this.uploadStopped) {
if (this.#uploadState !== states.uploading) {
return
}

Expand Down Expand Up @@ -519,7 +529,8 @@ class Uploader {
const chunkSize = this.options.chunkSize || (isFileStream ? Infinity : 50e6)

return new Promise((resolve, reject) => {
this.tus = new tus.Upload(stream, {

const tusOptions = {
endpoint: this.options.endpoint,
uploadUrl: this.options.uploadUrl,
uploadLengthDeferred: !isFileStream,
Expand Down Expand Up @@ -564,11 +575,11 @@ class Uploader {
onSuccess() {
resolve({ url: uploader.tus.url })
},
})

if (!this._paused) {
this.tus.start()
}

this.tus = new tus.Upload(stream, tusOptions)

this.tus.start()
})
}

Expand Down

0 comments on commit df36e12

Please sign in to comment.