Skip to content

Commit

Permalink
Fsspec update (#105)
Browse files Browse the repository at this point in the history
* compatibility with fsspec>=2023.10

* fsspec is typed

* template update

* restore fsspec in pyproject.toml
  • Loading branch information
malmans2 authored Oct 23, 2023
1 parent fc891c4 commit 8b4b3de
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .cruft.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"template": "https://github.com/ecmwf-projects/cookiecutter-conda-package",
"commit": "2efb5584faa681782a41bf2bb437295c5c0f4045",
"commit": "519dfc3783111cd3fde95aebc84101297152c45e",
"checkout": null,
"context": {
"cookiecutter": {
Expand Down
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repos:
- id: debug-statements
- id: mixed-line-ending
- repo: https://github.com/psf/black
rev: 23.9.1
rev: 23.10.0
hooks:
- id: black
- repo: https://github.com/keewis/blackdoc
Expand All @@ -20,7 +20,7 @@ repos:
- id: blackdoc
additional_dependencies: [black==22.3.0]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.0.292
rev: v0.1.1
hooks:
- id: ruff
args: [--fix, --show-fixes]
Expand Down
12 changes: 8 additions & 4 deletions cacholote/extra_encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
return cast(F, wrapper)


def _filesystem_is_local(fs: fsspec.AbstractFileSystem) -> bool:
return isinstance(fs, fsspec.get_filesystem_class("file"))


@contextlib.contextmanager
def _logging_timer(event: str, **kwargs: Any) -> Generator[float, None, None]:
logger = config.get().logger
Expand Down Expand Up @@ -188,11 +192,11 @@ def decode_xr_dataset(
if file_json["type"] == "application/vnd+zarr":
filename_or_obj = fs.get_mapper(urlpath)
else:
if fs.protocol == "file":
if _filesystem_is_local(fs):
filename_or_obj = urlpath
else:
# Download local copy
protocols = [fs.protocol] if isinstance(fs.protocol, str) else fs.protocol
protocols = (fs.protocol,) if isinstance(fs.protocol, str) else fs.protocol
with fsspec.open(
f"filecache::{urlpath}",
filecache={"same_names": True},
Expand Down Expand Up @@ -241,7 +245,7 @@ def _maybe_store_xr_dataset(
raise ValueError(f"type {filetype!r} is NOT supported.")

_maybe_store_file_object(
fs if fs.protocol == "file" else fsspec.filesystem("file"),
fs if _filesystem_is_local(fs) else fsspec.filesystem("file"),
tmpfilename,
fs,
urlpath,
Expand Down Expand Up @@ -305,7 +309,7 @@ def _maybe_store_file_object(
fs_in.mv(urlpath_in, urlpath_out, **kwargs)
else:
fs_in.cp(urlpath_in, urlpath_out, **kwargs)
elif fs_in.protocol == "file":
elif _filesystem_is_local(fs_in):
fs_out.put(urlpath_in, urlpath_out, **kwargs)
else:
with fs_in.open(urlpath_in, "rb") as f_in:
Expand Down
1 change: 1 addition & 0 deletions ci/environment-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ dependencies:
- requests
- s3fs
- sqlalchemy[mypy]
- types-requests
- xarray>=2022.6.0
- zarr
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ strict = true
[[tool.mypy.overrides]]
ignore_missing_imports = true
module = [
"botocore.*",
"cfgrib.*",
"fsspec.*"
"fsspec.*",
"moto.*"
]

[tool.ruff]
Expand Down
85 changes: 29 additions & 56 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,57 @@
import contextlib
import os
import pathlib
import shlex
import subprocess
import time
from typing import Any, Dict, Generator
from typing import Any, Generator, Iterator

import botocore.session
import psycopg
import pytest
import requests
from moto.moto_server.threaded_moto_server import ThreadedMotoServer

from cacholote import config


def wait_s3_up(
endpoint_url: str, max_sleep: float = 1.0, min_sleep: float = 0.1
) -> None:
requests = pytest.importorskip("requests")
while max_sleep > 0:
time.sleep(min_sleep)
max_sleep -= min_sleep
try:
r = requests.get(endpoint_url)
if r.ok:
break
except requests.exceptions.ConnectionError:
pass


@contextlib.contextmanager
def initialize_s3() -> Generator[Dict[str, str], None, None]:
pytest.importorskip("boto3")
@pytest.fixture(scope="session")
def s3_base() -> Iterator[ThreadedMotoServer]:
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
os.environ["AWS_SECRET_ACCESS_KEY"] = "foo"
if "AWS_ACCESS_KEY_ID" not in os.environ:
os.environ["AWS_ACCESS_KEY_ID"] = "foo"
port = 5555
proc = subprocess.Popen(
shlex.split(f"moto_server s3 -p {port}"),
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
)
endpoint_url = f"http://127.0.0.1:{port}/"
wait_s3_up(endpoint_url)
try:
yield {"endpoint_url": endpoint_url}
finally:
proc.terminate()
proc.wait()
server = ThreadedMotoServer(ip_address="127.0.0.1", port=5555)
server.start()
yield server
server.stop()


@pytest.fixture(autouse=True)
def set_cache(
tmpdir: pathlib.Path,
tmp_path: pathlib.Path,
postgresql: psycopg.Connection[Any],
request: pytest.FixtureRequest,
s3_base: ThreadedMotoServer,
) -> Generator[str, None, None]:
if not hasattr(request, "param") or request.param == "file":
with config.set(
cache_db_urlpath="sqlite:///" + str(tmpdir / "cacholote.db"),
cache_files_urlpath=str(tmpdir / "cache_files"),
cache_db_urlpath="sqlite:///" + str(tmp_path / "cacholote.db"),
cache_files_urlpath=str(tmp_path / "cache_files"),
):
yield "file"
elif request.param == "cads":
pytest.importorskip("s3fs")
botocore_session = pytest.importorskip("botocore.session")

endpoint_url = f"http://{s3_base._ip_address}:{s3_base._port}/"
client_kwargs = {"endpoint_url": endpoint_url}
test_bucket_name = "test-bucket"
with initialize_s3() as client_kwargs:
session = botocore_session.Session()
client = session.create_client("s3", **client_kwargs)
client.create_bucket(Bucket=test_bucket_name)
with config.set(
cache_db_urlpath=(
f"postgresql+psycopg2://{postgresql.info.user}:@{postgresql.info.host}:"
f"{postgresql.info.port}/{postgresql.info.dbname}"
),
cache_files_urlpath=f"s3://{test_bucket_name}",
cache_files_storage_options=dict(client_kwargs=client_kwargs),
):
yield request.param
requests.post(f"{endpoint_url}moto-api/reset")
session = botocore.session.Session()
client = session.create_client("s3", **client_kwargs)
client.create_bucket(Bucket=test_bucket_name)
with config.set(
cache_db_urlpath=(
f"postgresql+psycopg2://{postgresql.info.user}:@{postgresql.info.host}:"
f"{postgresql.info.port}/{postgresql.info.dbname}"
),
cache_files_urlpath=f"s3://{test_bucket_name}",
cache_files_storage_options={"client_kwargs": client_kwargs},
):
yield request.param
else:
raise ValueError
2 changes: 1 addition & 1 deletion tests/test_40_xarray_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def test_xr_corrupted_files(


def test_xr_logging(capsys: pytest.CaptureFixture[str]) -> None:
config.set(logger=structlog.get_logger())
config.set(logger=structlog.get_logger(), raise_all_encoding_errors=True)

# Cache dataset
cfunc = cache.cacheable(get_grib_ds)
Expand Down

0 comments on commit 8b4b3de

Please sign in to comment.