diff --git a/CHANGELOG.md b/CHANGELOG.md
index 00174cf..efefa51 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 ### Added
 - Add basic CI/CD test and build status badges to README. (PR #182, @brews)
+### Fixed
+- Fix dodola validate-dataset OOM on small workers without dask-distributed. (PR #181, @brews)
 
 ## [0.17.0] - 2022-02-17
 ### Changed
diff --git a/dodola/core.py b/dodola/core.py
index 3eca0c5..d9c331f 100644
--- a/dodola/core.py
+++ b/dodola/core.py
@@ -6,7 +6,6 @@
 import warnings
 import logging
 
-import dask
 import numpy as np
 import xarray as xr
 from xclim import sdba, set_options
@@ -637,71 +636,14 @@ def apply_precip_ceiling(ds, ceiling):
     return ds_corrected
 
 
-def validate_dataset(ds, var, data_type, time_period="future"):
-    """
-    Validate a Dataset. Valid for CMIP6, bias corrected and downscaled.
-
-    Raises AssertionError when validation fails.
-
-    Parameters
-    ----------
-    ds : xr.Dataset
-    var : {"tasmax", "tasmin", "dtr", "pr"}
-        Variable in Dataset to validate.
-    data_type : {"cmip6", "bias_corrected", "downscaled"}
-        Type of data output to validate.
-    time_period : {"historical", "future"}
-        Time period of data that will be validated.
-    """
-    # This is pretty rough but works to communicate the idea.
-    # Consider having failed tests raise something like ValidationError rather
-    # than AssertionErrors.
-
-    # These only read in Zarr Store metadata -- not memory intensive.
-    _test_variable_names(ds, var)
-    _test_timesteps(ds, data_type, time_period)
-
-    # Other test are done on annual selections with dask.delayed to
-    # avoid large memory errors. xr.map_blocks had trouble with this.
-    @dask.delayed
-    def memory_intensive_tests(ds, v, t):
-        d = ds.sel(time=str(t))
-
-        _test_for_nans(d, v)
-
-        if v == "tasmin":
-            _test_temp_range(d, v)
-        elif v == "tasmax":
-            _test_temp_range(d, v)
-        elif v == "dtr":
-            _test_dtr_range(d, v, data_type)
-            _test_negative_values(d, v)
-        elif v == "pr":
-            _test_negative_values(d, v)
-            _test_maximum_precip(d, v)
-        else:
-            raise ValueError(f"Argument {v=} not recognized")
-
-        # Assumes error thrown if had problem before this.
-        return True
-
-    results = []
-    for t in np.unique(ds["time"].dt.year.data):
-        logger.debug(f"Validating year {t}")
-        results.append(memory_intensive_tests(ds, var, t))
-    results = dask.compute(*results)
-    assert all(results)  # Likely don't need this
-    return True
-
-
-def _test_for_nans(ds, var):
+def test_for_nans(ds, var):
     """
     Tests for presence of NaNs
     """
     assert ds[var].isnull().sum() == 0, "there are nans!"
 
 
-def _test_timesteps(ds, data_type, time_period):
+def test_timesteps(ds, data_type, time_period):
     """
     Tests that Dataset contains the correct number of timesteps (number of days
     on a noleap calendar) for the data_type/time_period combination.
@@ -763,14 +705,14 @@ def _test_timesteps(ds, data_type, time_period):
     )
 
 
-def _test_variable_names(ds, var):
+def test_variable_names(ds, var):
     """
     Test that the correct variable name exists in the file
     """
     assert var in ds.var(), "{} not in Dataset".format(var)
 
 
-def _test_temp_range(ds, var):
+def test_temp_range(ds, var):
     """
     Ensure temperature values are in a valid range
     """
@@ -781,7 +723,7 @@ def _test_temp_range(ds, var):
     ), "{} values are invalid".format(var)
 
 
-def _test_dtr_range(ds, var, data_type):
+def test_dtr_range(ds, var, data_type):
     """
     Ensure DTR values are in a valid range
     Test polar values separately since some polar values can be much higher post-bias correction.
@@ -830,7 +772,7 @@ def _test_dtr_range(ds, var, data_type):
     ), "diurnal temperature range max is {} for non-polar regions".format(non_polar_max)
 
 
-def _test_negative_values(ds, var):
+def test_negative_values(ds, var):
     """
     Tests for presence of negative values
     """
@@ -839,7 +781,7 @@ def _test_negative_values(ds, var):
     assert neg_values == 0, "there are {} negative values!".format(neg_values)
 
 
-def _test_maximum_precip(ds, var):
+def test_maximum_precip(ds, var):
     """
     Tests that max precip is reasonable
     """
diff --git a/dodola/services.py b/dodola/services.py
index 8416542..69af83b 100644
--- a/dodola/services.py
+++ b/dodola/services.py
@@ -3,6 +3,7 @@
 from functools import wraps
 import json
 import logging
+import dask
 from dodola.core import (
     xesmf_regrid,
     standardize_gcm,
@@ -12,12 +13,18 @@
     adjust_quantiledeltamapping,
     train_analogdownscaling,
     adjust_analogdownscaling,
-    validate_dataset,
     dtr_floor,
     non_polar_dtr_ceiling,
     apply_precip_ceiling,
     xclim_units_any2pint,
     xclim_units_pint2cf,
+    test_for_nans,
+    test_variable_names,
+    test_timesteps,
+    test_temp_range,
+    test_dtr_range,
+    test_negative_values,
+    test_maximum_precip,
 )
 import dodola.repository as storage
 
@@ -701,6 +708,12 @@ def adjust_maximum_precipitation(x, out, threshold=3000.0):
 def validate(x, var, data_type, time_period):
     """Performs validation on an input dataset
 
+    Valid for CMIP6, bias-corrected, and downscaled data. Raises
+    AssertionError when validation fails.
+
+    This function performs more memory-intensive tests by reading input data
+    and subsetting to each year in the "time" dimension.
+
     Parameters
     ----------
     x : str
@@ -714,6 +727,43 @@
         Time period that input data should cover, used in validating the number
         of timesteps in conjunction with the data type.
     """
-
+    # This is pretty rough but works to communicate the idea.
+    # Consider having failed tests raise something like ValidationError rather
+    # than AssertionErrors.
     ds = storage.read(x)
-    validate_dataset(ds, var, data_type, time_period)
+
+    # These only read in Zarr Store metadata -- not memory intensive.
+    test_variable_names(ds, var)
+    test_timesteps(ds, data_type, time_period)
+
+    # Other tests are done on annual selections with dask.delayed to
+    # avoid large memory errors.
+    # Doing all this here because it involves storage and I/O logic.
+    @dask.delayed
+    def memory_intensive_tests(f, v, t):
+        d = storage.read(f).sel(time=str(t))
+
+        test_for_nans(d, v)
+
+        if v == "tasmin":
+            test_temp_range(d, v)
+        elif v == "tasmax":
+            test_temp_range(d, v)
+        elif v == "dtr":
+            test_dtr_range(d, v, data_type)
+            test_negative_values(d, v)
+        elif v == "pr":
+            test_negative_values(d, v)
+            test_maximum_precip(d, v)
+        else:
+            raise ValueError(f"Argument {v=} not recognized")
+
+        # Assumes an error was raised if a test failed before this point.
+        return True
+
+    tasks = []
+    for t in set(ds["time"].dt.year.data):
+        logger.debug(f"Validating year {t}")
+        tasks.append(memory_intensive_tests(x, var, t))
+    tasks = dask.compute(*tasks)
+    assert all(tasks)  # Likely don't need this
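
The gist of the change: the memory-intensive checks now run as per-year dask.delayed tasks that each re-read the store and select a single year, so no worker ever materializes the full dataset, which is what lets validation run on small workers without dask-distributed. The pattern can be sketched standalone roughly as follows (a minimal sketch, not dodola's actual code: `xr.open_zarr` stands in for `dodola.repository.read`, and `check_one_year` is a hypothetical placeholder for the `test_*` checks imported above):

```python
import dask
import numpy as np
import xarray as xr


def check_one_year(ds, var):
    # Hypothetical placeholder for the per-year test_* checks.
    assert ds[var].isnull().sum() == 0, "there are nans!"


@dask.delayed
def validate_year(store, var, year):
    # Re-open the store inside the task and select one year, so a worker
    # only ever holds a single year of data in memory.
    d = xr.open_zarr(store).sel(time=str(year))
    check_one_year(d, var)
    return True


def validate_by_year(store, var):
    # Reading the "time" coordinate here touches only metadata; the heavy
    # reads happen inside the delayed tasks.
    years = np.unique(xr.open_zarr(store)["time"].dt.year.data)
    tasks = [validate_year(store, var, int(y)) for y in years]
    return all(dask.compute(*tasks))
```

Passing the store reference into each task, rather than closing over the parent Dataset, keeps the task graph small and bounds memory to roughly one year of data at a time, even on the default scheduler.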