Merge pull request #181 from brews/fix_validate_memory
Hack fix to validate OOM errors
brews authored Feb 18, 2022
2 parents ae0e87d + 22357f9 commit 696938a
Showing 3 changed files with 62 additions and 68 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 ### Added
 - Add basic CI/CD test and build status badges to README. (PR #182, @brews)
+### Fixed
+- Fix dodola validate-dataset OOM on small workers without dask-distributed. (PR #181, @brews)
 
 ## [0.17.0] - 2022-02-17
 ### Changed
72 changes: 7 additions & 65 deletions dodola/core.py
@@ -6,7 +6,6 @@
 
 import warnings
 import logging
-import dask
 import numpy as np
 import xarray as xr
 from xclim import sdba, set_options
@@ -637,71 +636,14 @@ def apply_precip_ceiling(ds, ceiling):
     return ds_corrected
 
 
-def validate_dataset(ds, var, data_type, time_period="future"):
-    """
-    Validate a Dataset. Valid for CMIP6, bias corrected and downscaled.
-
-    Raises AssertionError when validation fails.
-
-    Parameters
-    ----------
-    ds : xr.Dataset
-    var : {"tasmax", "tasmin", "dtr", "pr"}
-        Variable in Dataset to validate.
-    data_type : {"cmip6", "bias_corrected", "downscaled"}
-        Type of data output to validate.
-    time_period : {"historical", "future"}
-        Time period of data that will be validated.
-    """
-    # This is pretty rough but works to communicate the idea.
-    # Consider having failed tests raise something like ValidationError rather
-    # than AssertionErrors.
-
-    # These only read in Zarr Store metadata -- not memory intensive.
-    _test_variable_names(ds, var)
-    _test_timesteps(ds, data_type, time_period)
-
-    # Other test are done on annual selections with dask.delayed to
-    # avoid large memory errors. xr.map_blocks had trouble with this.
-    @dask.delayed
-    def memory_intensive_tests(ds, v, t):
-        d = ds.sel(time=str(t))
-
-        _test_for_nans(d, v)
-
-        if v == "tasmin":
-            _test_temp_range(d, v)
-        elif v == "tasmax":
-            _test_temp_range(d, v)
-        elif v == "dtr":
-            _test_dtr_range(d, v, data_type)
-            _test_negative_values(d, v)
-        elif v == "pr":
-            _test_negative_values(d, v)
-            _test_maximum_precip(d, v)
-        else:
-            raise ValueError(f"Argument {v=} not recognized")
-
-        # Assumes error thrown if had problem before this.
-        return True
-
-    results = []
-    for t in np.unique(ds["time"].dt.year.data):
-        logger.debug(f"Validating year {t}")
-        results.append(memory_intensive_tests(ds, var, t))
-    results = dask.compute(*results)
-    assert all(results)  # Likely don't need this
-    return True
-
-
-def _test_for_nans(ds, var):
+def test_for_nans(ds, var):
     """
     Tests for presence of NaNs
     """
     assert ds[var].isnull().sum() == 0, "there are nans!"
 
 
-def _test_timesteps(ds, data_type, time_period):
+def test_timesteps(ds, data_type, time_period):
     """
     Tests that Dataset contains the correct number of timesteps (number of days on a noleap calendar)
     for the data_type/time_period combination.
@@ -763,14 +705,14 @@ def _test_timesteps(ds, data_type, time_period):
     )
 
 
-def _test_variable_names(ds, var):
+def test_variable_names(ds, var):
     """
     Test that the correct variable name exists in the file
     """
     assert var in ds.var(), "{} not in Dataset".format(var)
 
 
-def _test_temp_range(ds, var):
+def test_temp_range(ds, var):
     """
     Ensure temperature values are in a valid range
     """
@@ -781,7 +723,7 @@ def _test_temp_range(ds, var):
     ), "{} values are invalid".format(var)
 
 
-def _test_dtr_range(ds, var, data_type):
+def test_dtr_range(ds, var, data_type):
     """
     Ensure DTR values are in a valid range
     Test polar values separately since some polar values can be much higher post-bias correction.
@@ -830,7 +772,7 @@ def _test_dtr_range(ds, var, data_type):
     ), "diurnal temperature range max is {} for non-polar regions".format(non_polar_max)
 
 
-def _test_negative_values(ds, var):
+def test_negative_values(ds, var):
     """
     Tests for presence of negative values
     """
@@ -839,7 +781,7 @@ def _test_negative_values(ds, var):
     assert neg_values == 0, "there are {} negative values!".format(neg_values)
 
 
-def _test_maximum_precip(ds, var):
+def test_maximum_precip(ds, var):
    """
     Tests that max precip is reasonable
     """
56 changes: 53 additions & 3 deletions dodola/services.py
@@ -3,6 +3,7 @@
 from functools import wraps
 import json
 import logging
+import dask
 from dodola.core import (
     xesmf_regrid,
     standardize_gcm,
@@ -12,12 +13,18 @@
     adjust_quantiledeltamapping,
     train_analogdownscaling,
     adjust_analogdownscaling,
-    validate_dataset,
     dtr_floor,
     non_polar_dtr_ceiling,
     apply_precip_ceiling,
     xclim_units_any2pint,
     xclim_units_pint2cf,
+    test_for_nans,
+    test_variable_names,
+    test_timesteps,
+    test_temp_range,
+    test_dtr_range,
+    test_negative_values,
+    test_maximum_precip,
 )
 import dodola.repository as storage
 
@@ -701,6 +708,12 @@ def adjust_maximum_precipitation(x, out, threshold=3000.0):
 def validate(x, var, data_type, time_period):
     """Performs validation on an input dataset
 
+    Valid for CMIP6, bias corrected and downscaled. Raises AssertionError when
+    validation fails.
+
+    This function performs more memory-intensive tests by reading input data
+    and subsetting to each year in the "time" dimension.
+
     Parameters
     ----------
     x : str
@@ -714,6 +727,43 @@ def validate(x, var, data_type, time_period):
         Time period that input data should cover, used in validating the number of timesteps
         in conjunction with the data type.
     """
-
+    # This is pretty rough but works to communicate the idea.
+    # Consider having failed tests raise something like ValidationError rather
+    # than AssertionErrors.
     ds = storage.read(x)
-    validate_dataset(ds, var, data_type, time_period)
+
+    # These only read in Zarr Store metadata -- not memory intensive.
+    test_variable_names(ds, var)
+    test_timesteps(ds, data_type, time_period)
+
+    # Other tests are done on annual selections with dask.delayed to
+    # avoid large memory errors.
+    # Doing all this here because this involves storage and I/O logic.
+    @dask.delayed
+    def memory_intensive_tests(f, v, t):
+        d = storage.read(f).sel(time=str(t))
+
+        test_for_nans(d, v)
+
+        if v == "tasmin":
+            test_temp_range(d, v)
+        elif v == "tasmax":
+            test_temp_range(d, v)
+        elif v == "dtr":
+            test_dtr_range(d, v, data_type)
+            test_negative_values(d, v)
+        elif v == "pr":
+            test_negative_values(d, v)
+            test_maximum_precip(d, v)
+        else:
+            raise ValueError(f"Argument {v=} not recognized")
+
+        # Assumes error thrown if had problem before this.
+        return True
+
+    tasks = []
+    for t in set(ds["time"].dt.year.data):
+        logger.debug(f"Validating year {t}")
+        tasks.append(memory_intensive_tests(x, var, t))
+    tasks = dask.compute(*tasks)
+    assert all(tasks)  # Likely don't need this

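Note on the pattern: the fix moves the memory-hungry checks into per-year dask.delayed tasks, so only one year of data is materialized at a time. The following is a minimal, self-contained sketch of that pattern; the toy dataset and the validate_year helper are illustrative assumptions, not part of dodola (the real service re-reads the Zarr store inside each task through dodola.repository):

import dask
import numpy as np
import pandas as pd
import xarray as xr

# Toy two-year daily dataset standing in for a large on-disk Zarr store.
time = pd.date_range("2020-01-01", "2021-12-31", freq="D")
ds = xr.Dataset(
    {"tasmax": ("time", 250.0 + 30.0 * np.random.rand(len(time)))},
    coords={"time": time},
)


def test_for_nans(d, v):
    # Same check as dodola.core.test_for_nans.
    assert d[v].isnull().sum() == 0, "there are nans!"


@dask.delayed
def validate_year(dataset, var, year):
    # Hypothetical helper: subsetting to one year bounds peak memory per
    # task. dodola's version re-reads storage inside the task instead of
    # closing over an already-open Dataset.
    d = dataset.sel(time=str(year))
    test_for_nans(d, var)
    return True


tasks = [validate_year(ds, "tasmax", y) for y in set(ds["time"].dt.year.data)]
assert all(dask.compute(*tasks))

Because each task touches a single year, peak memory stays bounded even on small workers without a dask-distributed cluster, which is the failure mode this PR targets.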