Skip to content

Commit

Permalink
COPDS-2331 (#136)
Browse files Browse the repository at this point in the history
* add FrozenFile

* inherit from FileIO

* add test

* cleanup duplicated files

* add test

* cleanup

* rename
  • Loading branch information
malmans2 authored Jan 10, 2025
1 parent c3df28e commit 5ae1aa5
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 13 deletions.
10 changes: 6 additions & 4 deletions cacholote/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,19 +256,21 @@ def delete_cache_files(
return

entries_to_delete = []
files_to_delete: set[str] = set()
self.logger.info("getting cache entries to delete")
with config.get().instantiated_sessionmaker() as session:
for cache_entry in session.scalars(
sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
):
files = _get_files_from_cache_entry(cache_entry, key="file:size")
if any(file.startswith(self.urldir) for file in files):
if (
not self.stop_cleaning(maxsize)
and any(file.startswith(self.urldir) for file in files)
) or (set(files) & files_to_delete):
entries_to_delete.append(cache_entry)
for file in files:
self.pop_file_size(file)

if self.stop_cleaning(maxsize):
break
files_to_delete.add(file)

if entries_to_delete:
self.logger.info(
Expand Down
28 changes: 20 additions & 8 deletions cacholote/extra_encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ def _logging_timer(event: str, **kwargs: Any) -> Generator[float, None, None]:
context.upload_log(f"end {event}. {_kwargs_to_str(**kwargs)}")


class InPlaceFile(io.FileIO):
    """Marker file type for cache entries that must stay at their original path.

    Behaves exactly like ``io.FileIO``; the subclass only exists so the
    encoder can detect it (via ``isinstance``) and reference the file
    in place instead of copying it into the cache files directory.
    """


class FileInfoModel(pydantic.BaseModel):
type: str
href: str
Expand Down Expand Up @@ -389,27 +393,35 @@ def _store_io_object(

def dictify_io_object(obj: _UNION_IO_TYPES) -> dict[str, Any]:
"""Encode a file object to JSON deserialized data (``dict``)."""
is_in_place = isinstance(obj, InPlaceFile)
settings = config.get()

cache_files_urlpath = settings.cache_files_urlpath
fs_out, *_ = fsspec.get_fs_token_paths(
cache_files_urlpath,
storage_options=settings.cache_files_storage_options,
)

if urlpath_in := getattr(obj, "path", getattr(obj, "name", "")):
fs_in = getattr(obj, "fs", fsspec.filesystem("file"))
root = f"{fs_in.checksum(urlpath_in):x}"
ext = pathlib.Path(urlpath_in).suffix
urlpath_out = posixpath.join(cache_files_urlpath, f"{root}{ext}")
if is_in_place:
urlpath_out = urlpath_in
else:
root = f"{fs_in.checksum(urlpath_in):x}"
ext = pathlib.Path(urlpath_in).suffix
urlpath_out = posixpath.join(cache_files_urlpath, f"{root}{ext}")
else:
root = hashlib.md5(f"{hash(obj)}".encode()).hexdigest() # fsspec uses md5
urlpath_out = posixpath.join(cache_files_urlpath, root)

if is_in_place:
fs_out = fs_in
else:
fs_out, *_ = fsspec.get_fs_token_paths(
cache_files_urlpath,
storage_options=settings.cache_files_storage_options,
)

with utils.FileLock(
fs_out, urlpath_out, timeout=settings.lock_timeout
) as file_exists:
if not file_exists:
if not (file_exists or is_in_place):
if urlpath_in:
_store_file_object(fs_in, urlpath_in, fs_out, urlpath_out)
else:
Expand Down
14 changes: 14 additions & 0 deletions tests/test_50_io_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Any

import fsspec
import fsspec.implementations.local
import pytest
import pytest_httpserver
import pytest_structlog
Expand Down Expand Up @@ -235,3 +236,16 @@ def test_io_logging(
},
]
assert log.events == expected


def test_io_in_place_file(tmp_path: pathlib.Path) -> None:
    """A cached ``InPlaceFile`` must round-trip to its original path.

    The returned object points at the source file itself (no copy into
    the cache files directory), opened through the local filesystem.
    """
    source = tmp_path / "test.txt"
    fsspec.filesystem("file").touch(source)

    @cache.cacheable
    def cached_in_place_open(path: str) -> io.FileIO:
        return extra_encoders.InPlaceFile(path, "rb")

    result = cached_in_place_open(str(source))
    # The opened file must still live at the original location.
    assert result.name == str(source)
    assert isinstance(result, fsspec.implementations.local.LocalFileOpener)
30 changes: 29 additions & 1 deletion tests/test_60_clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
import pathlib
import time
from typing import Any, Literal
from typing import Any, BinaryIO, Literal

import fsspec
import pydantic
Expand Down Expand Up @@ -411,3 +411,31 @@ def test_clean_multiple_urlpaths(tmp_path: pathlib.Path, use_database: bool) ->
clean.clean_cache_files(maxsize=0, use_database=use_database, depth=2)
assert not cached_file1.exists()
assert cached_file2.exists()


def test_clean_duplicates(tmp_path: pathlib.Path) -> None:
    """Cleaning must remove every cache entry that shares a deleted file.

    Two distinct cacheable functions return the same underlying file, so
    deleting that file must purge both database entries, not just one.
    """
    connection = config.get().engine.raw_connection()
    cursor = connection.cursor()

    # Create file
    source = tmp_path / "file.txt"
    fsspec.filesystem("file").pipe_file(source, ONE_BYTE)

    @cache.cacheable
    def func1(path: pathlib.Path) -> BinaryIO:
        return path.open("rb")

    @cache.cacheable
    def func2(path: pathlib.Path) -> BinaryIO:
        return path.open("rb")

    first = func1(source)
    second = func2(source)
    # Both entries reference the very same cached file on disk.
    assert first.name == second.name

    cursor.execute("SELECT COUNT(*) FROM cache_entries", ())
    assert cursor.fetchone() == (2,)

    clean.clean_cache_files(maxsize=0)
    cursor.execute("SELECT COUNT(*) FROM cache_entries", ())
    assert cursor.fetchone() == (0,)

0 comments on commit 5ae1aa5

Please sign in to comment.