Skip to content

Commit

Permalink
Implement and use File.valid_filename function
Browse files Browse the repository at this point in the history
Avoids to upload or rename files with invalid filenames.
  • Loading branch information
phorward committed Mar 4, 2024
1 parent 5e71bc8 commit 60d4ba0
Showing 1 changed file with 84 additions and 64 deletions.
148 changes: 84 additions & 64 deletions src/viur/core/modules/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import json
import logging
import PIL
import re
import requests
import string
import typing as t
Expand All @@ -26,13 +27,20 @@

# Globals for connectivity

VALID_FILENAME_REGEX = re.compile(
# || MAY NOT BE THE NAME | MADE OF SPECIAL CHARS | SPECIAL CHARS + `. `|`
r"^(?!^(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])$)[^\x00-\x1F<>:\"\/\\|?*]*[^\x00-\x1F<>:\"\/\\|?*. ]$",
re.IGNORECASE
)

_credentials, __project_id = google.auth.default()
client = storage.Client(__project_id, _credentials)
bucket = client.lookup_bucket(f"""{__project_id}.appspot.com""")

# FilePath is a descriptor for ViUR file components
FilePath = namedtuple("FilePath", ("dlkey", "is_derived", "filename"))


def importBlobFromViur2(dlKey, fileName):
if not conf.viur2import_blobsource:
return False
Expand Down Expand Up @@ -174,11 +182,11 @@ def getsignedurl():
authRequest = google.auth.transport.requests.Request()
expiresAt = datetime.datetime.now() + datetime.timedelta(seconds=60)
signing_credentials = google.auth.compute_engine.IDTokenCredentials(authRequest, "")
contentDisposition = f"""filename={fileSkel["name"]}"""
content_disposition = f"""filename={fileSkel["name"]}"""
signedUrl = blob.generate_signed_url(
expiresAt,
credentials=signing_credentials,
response_disposition=contentDisposition,
response_disposition=content_disposition,
version="v4")
return signedUrl

Expand Down Expand Up @@ -292,7 +300,8 @@ class FileLeafSkel(TreeSkel):
name = StringBone(
descr="Filename",
caseSensitive=False,
searchable=True
searchable=True,
vfunc=lambda val: None if File.valid_filename(val) else "Invalid name provided"
)

mimetype = StringBone(
Expand Down Expand Up @@ -403,15 +412,15 @@ class File(Tree):
# Helper functions currently resist here

@staticmethod
def sanitize_filename(filename: str) -> str:
def valid_filename(filename: str) -> bool:
"""
Sanitize the filename so it can be safely downloaded or be embedded into html
Verifies for a valid filename.
The filename should be valid on Linux, Mac OS and Windows.
Rule set: https://stackoverflow.com/a/31976060/3749896
Regex test: https://regex101.com/r/iBYpoC/1
"""
filename = filename[:100] # Limit to 100 Chars max
filename = "".join(ch for ch in filename if ch not in "\0'\"<>\n;$&?#:;/\\") # Remove invalid Chars
filename = filename.strip(".") # Ensure the filename does not start or end with a dot
filename = urlquote(filename) # Finally quote any non-ASCII characters
return filename
return bool(re.match(VALID_FILENAME_REGEX, filename))

@staticmethod
def hmac_sign(data: t.Any) -> str:
Expand Down Expand Up @@ -451,11 +460,16 @@ def create_download_url(
# Undo escaping on ()= performed on fileNames
filename = filename.replace("&#040;", "(").replace("&#041;", ")").replace("&#061;", "=")
filepath = f"""{dlkey}/{"derived" if derived else "source"}/{filename}"""
download_filename = File.sanitize_filename(download_filename) if download_filename else ""

if download_filename:
if not File.valid_filename(download_filename):
raise errors.UnprocessableEntity(f"Invalid download_filename {download_filename!r} provided")

download_filename = urlquote(download_filename)

expires = (datetime.datetime.now() + expires).strftime("%Y%m%d%H%M") if expires else 0

data = base64.urlsafe_b64encode(f"{filepath}\0{expires}\0{download_filename}".encode("UTF-8"))
data = base64.urlsafe_b64encode(f"""{filepath}\0{expires}\0{download_filename or ""}""".encode("UTF-8"))
sig = File.hmac_sign(data)

return f"""{File.DOWNLOAD_URL_PREFIX}{data.decode("ASCII")}?sig={sig}"""
Expand Down Expand Up @@ -583,6 +597,9 @@ def write(self, filename: str, content: t.Any, mimetype: str = "text/plain", wid
:return: Returns the key of the file object written. This can be associated e.g. with a FileBone.
"""
if not File.valid_filename(filename):
raise ValueError(f"{filename=} is invalid")

dl_key = utils.string.random()

blob = bucket.blob(f"{dl_key}/source/{filename}")
Expand Down Expand Up @@ -651,19 +668,10 @@ def getUploadURL(
authData: t.Optional[str] = None,
authSig: t.Optional[str] = None
):
# Validate the filename from the client seems legit
# This applies the rules defined by sanitize_filename(),
# to allow for "secure" filenames only.
filename = fileName.strip()
filename = fileName.strip() # VIUR4 FIXME: just for compatiblity of the parameter names

if not (
filename
and len(filename) <= 100
and all(ch not in filename for ch in "\0'\"<>\n;$&?#:;/\\")
and filename[0] != "."
and filename[-1] != "."
):
raise errors.UnprocessableEntity(f"Invalid filename provided")
if not File.valid_filename(filename):
raise errors.UnprocessableEntity(f"Invalid filename {filename!r} provided")

# Validate the mimetype from the client seems legit
mimetype = mimeType.strip().lower()
Expand Down Expand Up @@ -761,14 +769,21 @@ def getUploadURL(
})

@exposed
def download(self, blobKey: str, fileName: str = "", download: str = "", sig: str = "", *args, **kwargs):
def download(self, blobKey: str, fileName: str = "", download: bool = False, sig: str = "", *args, **kwargs):
"""
Download a file.
:param blobKey: The unique blob key of the file.
:param fileName: Optional filename to provide in the header.
:param download: Set header to attachment retrival, set explictly to "1" if download is wanted.
"""
global _credentials, bucket

if filename := fileName.strip():
if not File.valid_filename(filename):
raise errors.UnprocessableEntityf(f"The provided filename {filename!r} is invalid!")

download_filename = ""

if not sig:
# Check if the current user has the right to download *any* blob present in this application.
# blobKey is then the path inside cloudstore - not a base64 encoded tuple
Expand All @@ -778,60 +793,68 @@ def download(self, blobKey: str, fileName: str = "", download: str = "", sig: st
raise errors.Forbidden()
validUntil = "-1" # Prevent this from being cached down below
blob = bucket.get_blob(blobKey)
downloadFilename = ""

else:
# We got an request including a signature (probably a guest or a user without file-view access)
# First, validate the signature, otherwise we don't need to proceed any further
if not self.hmac_verify(blobKey, sig):
raise errors.Forbidden()
# Split the blobKey into the individual fields it should contain
try:
dlPath, validUntil, downloadFilename = base64.urlsafe_b64decode(blobKey).decode("UTF-8").split("\0")
dlPath, validUntil, download_filename = base64.urlsafe_b64decode(blobKey).decode("UTF-8").split("\0")
except: # It's the old format, without an downloadFileName
dlPath, validUntil = base64.urlsafe_b64decode(blobKey).decode("UTF-8").split("\0")
downloadFilename = ""

if validUntil != "0" and datetime.datetime.strptime(validUntil, "%Y%m%d%H%M") < datetime.datetime.now():
raise errors.Gone()
blob = bucket.get_blob(dlPath)
blob = None
else:
blob = bucket.get_blob(dlPath)

if not blob:
raise errors.Gone()
if downloadFilename:
contentDisposition = f"attachment; filename={downloadFilename}"
elif download:
fileName = self.sanitize_filename(blob.name.split("/")[-1])
contentDisposition = f"attachment; filename={fileName}"
else:
fileName = self.sanitize_filename(blob.name.split("/")[-1])
contentDisposition = f"filename={fileName}"
raise errors.Gone("The requested blob has expired.")

if not filename:
filename = download_filename or urlquote(blob.name.split("/")[-1])

content_disposition = "; ".join(
item for item in (
"attachment" if download else None,
f"filename={filename}" if filename else None,
) if item
)

if isinstance(_credentials, ServiceAccountCredentials):
expiresAt = datetime.datetime.now() + datetime.timedelta(seconds=60)
signedUrl = blob.generate_signed_url(expiresAt, response_disposition=contentDisposition, version="v4")
signedUrl = blob.generate_signed_url(expiresAt, response_disposition=content_disposition, version="v4")
raise errors.Redirect(signedUrl)

elif conf.instance.is_dev_server: # No Service-Account to sign with - Serve everything directly
response = current.request.get().response
response.headers["Content-Type"] = blob.content_type
if contentDisposition:
response.headers["Content-Disposition"] = contentDisposition
if content_disposition:
response.headers["Content-Disposition"] = content_disposition
return blob.download_as_bytes()
else: # We are inside the appengine
if validUntil == "0": # Its an indefinitely valid URL
if blob.size < 5 * 1024 * 1024: # Less than 5 MB - Serve directly and push it into the ede caches
response = current.request.get().response
response.headers["Content-Type"] = blob.content_type
response.headers["Cache-Control"] = "public, max-age=604800" # 7 Days
if contentDisposition:
response.headers["Content-Disposition"] = contentDisposition
return blob.download_as_bytes()
# Default fallback - create a signed URL and redirect
authRequest = google.auth.transport.requests.Request()
expiresAt = datetime.datetime.now() + datetime.timedelta(seconds=60)
signing_credentials = google.auth.compute_engine.IDTokenCredentials(authRequest, "")
signedUrl = blob.generate_signed_url(
expiresAt,
credentials=signing_credentials,
response_disposition=contentDisposition,
version="v4")
raise errors.Redirect(signedUrl)

if validUntil == "0": # Its an indefinitely valid URL
if blob.size < 5 * 1024 * 1024: # Less than 5 MB - Serve directly and push it into the ede caches
response = current.request.get().response
response.headers["Content-Type"] = blob.content_type
response.headers["Cache-Control"] = "public, max-age=604800" # 7 Days
if content_disposition:
response.headers["Content-Disposition"] = content_disposition
return blob.download_as_bytes()

# Default fallback - create a signed URL and redirect
authRequest = google.auth.transport.requests.Request()
expiresAt = datetime.datetime.now() + datetime.timedelta(seconds=60)
signing_credentials = google.auth.compute_engine.IDTokenCredentials(authRequest, "")
signedUrl = blob.generate_signed_url(
expiresAt,
credentials=signing_credentials,
response_disposition=content_disposition,
version="v4")

raise errors.Redirect(signedUrl)

@exposed
@force_ssl
Expand Down Expand Up @@ -909,9 +932,6 @@ def onEdit(self, skelType: SkelType, skel: SkeletonInstance):
bucket.copy_blob(old_blob, bucket, new_path, if_generation_match=0)
bucket.delete_blob(old_path)

def onItemUploaded(self, skel):
pass

def mark_for_deletion(self, dlkey: str) -> None:
"""
Adds a marker to the datastore that the file specified as *dlkey* can be deleted.
Expand Down

0 comments on commit 60d4ba0

Please sign in to comment.