diff --git a/cacholote/clean.py b/cacholote/clean.py
index 173b49e..5ce704c 100644
--- a/cacholote/clean.py
+++ b/cacholote/clean.py
@@ -256,19 +256,21 @@ def delete_cache_files(
             return
 
         entries_to_delete = []
+        files_to_delete: set[str] = set()
         self.logger.info("getting cache entries to delete")
         with config.get().instantiated_sessionmaker() as session:
             for cache_entry in session.scalars(
                 sa.select(database.CacheEntry).filter(*filters).order_by(*sorters)
             ):
                 files = _get_files_from_cache_entry(cache_entry, key="file:size")
-                if any(file.startswith(self.urldir) for file in files):
+                if (
+                    not self.stop_cleaning(maxsize)
+                    and any(file.startswith(self.urldir) for file in files)
+                ) or (set(files) & files_to_delete):
                     entries_to_delete.append(cache_entry)
                     for file in files:
                         self.pop_file_size(file)
-
-                if self.stop_cleaning(maxsize):
-                    break
+                        files_to_delete.add(file)
 
         if entries_to_delete:
             self.logger.info(
diff --git a/cacholote/extra_encoders.py b/cacholote/extra_encoders.py
index 2509951..603a0f1 100644
--- a/cacholote/extra_encoders.py
+++ b/cacholote/extra_encoders.py
@@ -129,6 +129,10 @@ def _logging_timer(event: str, **kwargs: Any) -> Generator[float, None, None]:
         context.upload_log(f"end {event}. {_kwargs_to_str(**kwargs)}")
 
 
+class InPlaceFile(io.FileIO):
+    pass
+
+
 class FileInfoModel(pydantic.BaseModel):
     type: str
     href: str
@@ -389,27 +393,35 @@ def _store_io_object(
 
 
 def dictify_io_object(obj: _UNION_IO_TYPES) -> dict[str, Any]:
     """Encode a file object to JSON deserialized data (``dict``)."""
+    is_in_place = isinstance(obj, InPlaceFile)
     settings = config.get()
     cache_files_urlpath = settings.cache_files_urlpath
 
-    fs_out, *_ = fsspec.get_fs_token_paths(
-        cache_files_urlpath,
-        storage_options=settings.cache_files_storage_options,
-    )
     if urlpath_in := getattr(obj, "path", getattr(obj, "name", "")):
         fs_in = getattr(obj, "fs", fsspec.filesystem("file"))
-        root = f"{fs_in.checksum(urlpath_in):x}"
-        ext = pathlib.Path(urlpath_in).suffix
-        urlpath_out = posixpath.join(cache_files_urlpath, f"{root}{ext}")
+        if is_in_place:
+            urlpath_out = urlpath_in
+        else:
+            root = f"{fs_in.checksum(urlpath_in):x}"
+            ext = pathlib.Path(urlpath_in).suffix
+            urlpath_out = posixpath.join(cache_files_urlpath, f"{root}{ext}")
     else:
         root = hashlib.md5(f"{hash(obj)}".encode()).hexdigest()  # fsspec uses md5
         urlpath_out = posixpath.join(cache_files_urlpath, root)
 
+    if is_in_place:
+        fs_out = fs_in
+    else:
+        fs_out, *_ = fsspec.get_fs_token_paths(
+            cache_files_urlpath,
+            storage_options=settings.cache_files_storage_options,
+        )
+
     with utils.FileLock(
         fs_out, urlpath_out, timeout=settings.lock_timeout
     ) as file_exists:
-        if not file_exists:
+        if not (file_exists or is_in_place):
             if urlpath_in:
                 _store_file_object(fs_in, urlpath_in, fs_out, urlpath_out)
             else:
diff --git a/tests/test_50_io_encoder.py b/tests/test_50_io_encoder.py
index d1af15f..b3c40ea 100644
--- a/tests/test_50_io_encoder.py
+++ b/tests/test_50_io_encoder.py
@@ -9,6 +9,7 @@
 from typing import Any
 
 import fsspec
+import fsspec.implementations.local
 import pytest
 import pytest_httpserver
 import pytest_structlog
@@ -235,3 +236,16 @@
         },
     ]
     assert log.events == expected
+
+
+def test_io_in_place_file(tmp_path: pathlib.Path) -> None:
+    @cache.cacheable
+    def cached_in_place_open(path: str) -> io.FileIO:
+        return extra_encoders.InPlaceFile(path, "rb")
+
+    tmpfile = tmp_path / "test.txt"
+    fsspec.filesystem("file").touch(tmpfile)
+
+    output = cached_in_place_open(str(tmpfile))
+    assert isinstance(output, fsspec.implementations.local.LocalFileOpener)
+    assert output.name == str(tmpfile)
diff --git a/tests/test_60_clean.py b/tests/test_60_clean.py
index 688b7b8..3efe013 100644
--- a/tests/test_60_clean.py
+++ b/tests/test_60_clean.py
@@ -5,7 +5,7 @@
 import os
 import pathlib
 import time
-from typing import Any, Literal
+from typing import Any, BinaryIO, Literal
 
 import fsspec
 import pydantic
@@ -411,3 +411,31 @@ def test_clean_multiple_urlpaths(tmp_path: pathlib.Path, use_database: bool) ->
     clean.clean_cache_files(maxsize=0, use_database=use_database, depth=2)
     assert not cached_file1.exists()
     assert cached_file2.exists()
+
+
+def test_clean_duplicates(tmp_path: pathlib.Path) -> None:
+    con = config.get().engine.raw_connection()
+    cur = con.cursor()
+
+    # Create file
+    tmpfile = tmp_path / "file.txt"
+    fsspec.filesystem("file").pipe_file(tmpfile, ONE_BYTE)
+
+    @cache.cacheable
+    def func1(path: pathlib.Path) -> BinaryIO:
+        return path.open("rb")
+
+    @cache.cacheable
+    def func2(path: pathlib.Path) -> BinaryIO:
+        return path.open("rb")
+
+    fp1 = func1(tmpfile)
+    fp2 = func2(tmpfile)
+    assert fp1.name == fp2.name
+
+    cur.execute("SELECT COUNT(*) FROM cache_entries", ())
+    assert cur.fetchone() == (2,)
+
+    clean.clean_cache_files(maxsize=0)
+    cur.execute("SELECT COUNT(*) FROM cache_entries", ())
+    assert cur.fetchone() == (0,)
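
Usage sketch (not part of the patch): with the changes above, a cacheable function can return an InPlaceFile to have its output registered at its existing location; dictify_io_object then keeps urlpath_out equal to urlpath_in and skips the checksum-and-copy into cache_files_urlpath. The function and file names below are illustrative, and the snippet assumes cacholote's default configuration:

# Minimal sketch mirroring test_io_in_place_file above.
import pathlib
import tempfile

from cacholote import cache, extra_encoders


@cache.cacheable
def open_in_place(path: str) -> extra_encoders.InPlaceFile:
    # Returning an InPlaceFile tells the encoder to reference `path` in place.
    return extra_encoders.InPlaceFile(path, "rb")


with tempfile.TemporaryDirectory() as tmpdir:
    target = pathlib.Path(tmpdir) / "data.bin"
    target.write_bytes(b"payload")

    fp = open_in_place(str(target))
    # The decoded result points at the original path, not at a cache copy.
    assert fp.name == str(target)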