From 60d4ba06eb204bb22aa3d825953ef295f62bc7b5 Mon Sep 17 00:00:00 2001 From: Jan Max Meyer Date: Mon, 4 Mar 2024 17:03:58 +0100 Subject: [PATCH] Implement and use File.valid_filename function Avoids to upload or rename files with invalid filenames. --- src/viur/core/modules/file.py | 148 +++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 64 deletions(-) diff --git a/src/viur/core/modules/file.py b/src/viur/core/modules/file.py index a886f1afc..8ee638f7d 100644 --- a/src/viur/core/modules/file.py +++ b/src/viur/core/modules/file.py @@ -8,6 +8,7 @@ import json import logging import PIL +import re import requests import string import typing as t @@ -26,6 +27,12 @@ # Globals for connectivity +VALID_FILENAME_REGEX = re.compile( + # || MAY NOT BE THE NAME | MADE OF SPECIAL CHARS | SPECIAL CHARS + `. `|` + r"^(?!^(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])$)[^\x00-\x1F<>:\"\/\\|?*]*[^\x00-\x1F<>:\"\/\\|?*. ]$", + re.IGNORECASE +) + _credentials, __project_id = google.auth.default() client = storage.Client(__project_id, _credentials) bucket = client.lookup_bucket(f"""{__project_id}.appspot.com""") @@ -33,6 +40,7 @@ # FilePath is a descriptor for ViUR file components FilePath = namedtuple("FilePath", ("dlkey", "is_derived", "filename")) + def importBlobFromViur2(dlKey, fileName): if not conf.viur2import_blobsource: return False @@ -174,11 +182,11 @@ def getsignedurl(): authRequest = google.auth.transport.requests.Request() expiresAt = datetime.datetime.now() + datetime.timedelta(seconds=60) signing_credentials = google.auth.compute_engine.IDTokenCredentials(authRequest, "") - contentDisposition = f"""filename={fileSkel["name"]}""" + content_disposition = f"""filename={fileSkel["name"]}""" signedUrl = blob.generate_signed_url( expiresAt, credentials=signing_credentials, - response_disposition=contentDisposition, + response_disposition=content_disposition, version="v4") return signedUrl @@ -292,7 +300,8 @@ class FileLeafSkel(TreeSkel): name = StringBone( descr="Filename", caseSensitive=False, - searchable=True + searchable=True, + vfunc=lambda val: None if File.valid_filename(val) else "Invalid name provided" ) mimetype = StringBone( @@ -403,15 +412,15 @@ class File(Tree): # Helper functions currently resist here @staticmethod - def sanitize_filename(filename: str) -> str: + def valid_filename(filename: str) -> bool: """ - Sanitize the filename so it can be safely downloaded or be embedded into html + Verifies for a valid filename. + The filename should be valid on Linux, Mac OS and Windows. + + Rule set: https://stackoverflow.com/a/31976060/3749896 + Regex test: https://regex101.com/r/iBYpoC/1 """ - filename = filename[:100] # Limit to 100 Chars max - filename = "".join(ch for ch in filename if ch not in "\0'\"<>\n;$&?#:;/\\") # Remove invalid Chars - filename = filename.strip(".") # Ensure the filename does not start or end with a dot - filename = urlquote(filename) # Finally quote any non-ASCII characters - return filename + return bool(re.match(VALID_FILENAME_REGEX, filename)) @staticmethod def hmac_sign(data: t.Any) -> str: @@ -451,11 +460,16 @@ def create_download_url( # Undo escaping on ()= performed on fileNames filename = filename.replace("(", "(").replace(")", ")").replace("=", "=") filepath = f"""{dlkey}/{"derived" if derived else "source"}/{filename}""" - download_filename = File.sanitize_filename(download_filename) if download_filename else "" + + if download_filename: + if not File.valid_filename(download_filename): + raise errors.UnprocessableEntity(f"Invalid download_filename {download_filename!r} provided") + + download_filename = urlquote(download_filename) expires = (datetime.datetime.now() + expires).strftime("%Y%m%d%H%M") if expires else 0 - data = base64.urlsafe_b64encode(f"{filepath}\0{expires}\0{download_filename}".encode("UTF-8")) + data = base64.urlsafe_b64encode(f"""{filepath}\0{expires}\0{download_filename or ""}""".encode("UTF-8")) sig = File.hmac_sign(data) return f"""{File.DOWNLOAD_URL_PREFIX}{data.decode("ASCII")}?sig={sig}""" @@ -583,6 +597,9 @@ def write(self, filename: str, content: t.Any, mimetype: str = "text/plain", wid :return: Returns the key of the file object written. This can be associated e.g. with a FileBone. """ + if not File.valid_filename(filename): + raise ValueError(f"{filename=} is invalid") + dl_key = utils.string.random() blob = bucket.blob(f"{dl_key}/source/{filename}") @@ -651,19 +668,10 @@ def getUploadURL( authData: t.Optional[str] = None, authSig: t.Optional[str] = None ): - # Validate the filename from the client seems legit - # This applies the rules defined by sanitize_filename(), - # to allow for "secure" filenames only. - filename = fileName.strip() + filename = fileName.strip() # VIUR4 FIXME: just for compatiblity of the parameter names - if not ( - filename - and len(filename) <= 100 - and all(ch not in filename for ch in "\0'\"<>\n;$&?#:;/\\") - and filename[0] != "." - and filename[-1] != "." - ): - raise errors.UnprocessableEntity(f"Invalid filename provided") + if not File.valid_filename(filename): + raise errors.UnprocessableEntity(f"Invalid filename {filename!r} provided") # Validate the mimetype from the client seems legit mimetype = mimeType.strip().lower() @@ -761,7 +769,7 @@ def getUploadURL( }) @exposed - def download(self, blobKey: str, fileName: str = "", download: str = "", sig: str = "", *args, **kwargs): + def download(self, blobKey: str, fileName: str = "", download: bool = False, sig: str = "", *args, **kwargs): """ Download a file. :param blobKey: The unique blob key of the file. @@ -769,6 +777,13 @@ def download(self, blobKey: str, fileName: str = "", download: str = "", sig: st :param download: Set header to attachment retrival, set explictly to "1" if download is wanted. """ global _credentials, bucket + + if filename := fileName.strip(): + if not File.valid_filename(filename): + raise errors.UnprocessableEntityf(f"The provided filename {filename!r} is invalid!") + + download_filename = "" + if not sig: # Check if the current user has the right to download *any* blob present in this application. # blobKey is then the path inside cloudstore - not a base64 encoded tuple @@ -778,7 +793,7 @@ def download(self, blobKey: str, fileName: str = "", download: str = "", sig: st raise errors.Forbidden() validUntil = "-1" # Prevent this from being cached down below blob = bucket.get_blob(blobKey) - downloadFilename = "" + else: # We got an request including a signature (probably a guest or a user without file-view access) # First, validate the signature, otherwise we don't need to proceed any further @@ -786,52 +801,60 @@ def download(self, blobKey: str, fileName: str = "", download: str = "", sig: st raise errors.Forbidden() # Split the blobKey into the individual fields it should contain try: - dlPath, validUntil, downloadFilename = base64.urlsafe_b64decode(blobKey).decode("UTF-8").split("\0") + dlPath, validUntil, download_filename = base64.urlsafe_b64decode(blobKey).decode("UTF-8").split("\0") except: # It's the old format, without an downloadFileName dlPath, validUntil = base64.urlsafe_b64decode(blobKey).decode("UTF-8").split("\0") - downloadFilename = "" + if validUntil != "0" and datetime.datetime.strptime(validUntil, "%Y%m%d%H%M") < datetime.datetime.now(): - raise errors.Gone() - blob = bucket.get_blob(dlPath) + blob = None + else: + blob = bucket.get_blob(dlPath) + if not blob: - raise errors.Gone() - if downloadFilename: - contentDisposition = f"attachment; filename={downloadFilename}" - elif download: - fileName = self.sanitize_filename(blob.name.split("/")[-1]) - contentDisposition = f"attachment; filename={fileName}" - else: - fileName = self.sanitize_filename(blob.name.split("/")[-1]) - contentDisposition = f"filename={fileName}" + raise errors.Gone("The requested blob has expired.") + + if not filename: + filename = download_filename or urlquote(blob.name.split("/")[-1]) + + content_disposition = "; ".join( + item for item in ( + "attachment" if download else None, + f"filename={filename}" if filename else None, + ) if item + ) + if isinstance(_credentials, ServiceAccountCredentials): expiresAt = datetime.datetime.now() + datetime.timedelta(seconds=60) - signedUrl = blob.generate_signed_url(expiresAt, response_disposition=contentDisposition, version="v4") + signedUrl = blob.generate_signed_url(expiresAt, response_disposition=content_disposition, version="v4") raise errors.Redirect(signedUrl) + elif conf.instance.is_dev_server: # No Service-Account to sign with - Serve everything directly response = current.request.get().response response.headers["Content-Type"] = blob.content_type - if contentDisposition: - response.headers["Content-Disposition"] = contentDisposition + if content_disposition: + response.headers["Content-Disposition"] = content_disposition return blob.download_as_bytes() - else: # We are inside the appengine - if validUntil == "0": # Its an indefinitely valid URL - if blob.size < 5 * 1024 * 1024: # Less than 5 MB - Serve directly and push it into the ede caches - response = current.request.get().response - response.headers["Content-Type"] = blob.content_type - response.headers["Cache-Control"] = "public, max-age=604800" # 7 Days - if contentDisposition: - response.headers["Content-Disposition"] = contentDisposition - return blob.download_as_bytes() - # Default fallback - create a signed URL and redirect - authRequest = google.auth.transport.requests.Request() - expiresAt = datetime.datetime.now() + datetime.timedelta(seconds=60) - signing_credentials = google.auth.compute_engine.IDTokenCredentials(authRequest, "") - signedUrl = blob.generate_signed_url( - expiresAt, - credentials=signing_credentials, - response_disposition=contentDisposition, - version="v4") - raise errors.Redirect(signedUrl) + + if validUntil == "0": # Its an indefinitely valid URL + if blob.size < 5 * 1024 * 1024: # Less than 5 MB - Serve directly and push it into the ede caches + response = current.request.get().response + response.headers["Content-Type"] = blob.content_type + response.headers["Cache-Control"] = "public, max-age=604800" # 7 Days + if content_disposition: + response.headers["Content-Disposition"] = content_disposition + return blob.download_as_bytes() + + # Default fallback - create a signed URL and redirect + authRequest = google.auth.transport.requests.Request() + expiresAt = datetime.datetime.now() + datetime.timedelta(seconds=60) + signing_credentials = google.auth.compute_engine.IDTokenCredentials(authRequest, "") + signedUrl = blob.generate_signed_url( + expiresAt, + credentials=signing_credentials, + response_disposition=content_disposition, + version="v4") + + raise errors.Redirect(signedUrl) @exposed @force_ssl @@ -909,9 +932,6 @@ def onEdit(self, skelType: SkelType, skel: SkeletonInstance): bucket.copy_blob(old_blob, bucket, new_path, if_generation_match=0) bucket.delete_blob(old_path) - def onItemUploaded(self, skel): - pass - def mark_for_deletion(self, dlkey: str) -> None: """ Adds a marker to the datastore that the file specified as *dlkey* can be deleted.