Commit c107c1e: dask parallel needs more checking; to keep it simple, we set parallel to False when opening mfdataset, and we simplify the Caravan cache-dataset method.
OuyangWenyu committed Oct 19, 2023
1 parent ec6e5fd commit c107c1e
Showing 1 changed file with 24 additions and 11 deletions.

hydrodataset/caravan.py (24 additions, 11 deletions)
```diff
@@ -619,12 +619,31 @@ def cache_attributes_xrdataset(self):
             ds[var_name].attrs["units"] = converted_units[var_name]
         return ds
 
-    def cache_xrdataset(self):
-        """Save all attr data in a netcdf file in the cache directory, ts data are already nc format"""
+    def cache_xrdataset(self, checkregion="hysets"):
+        """
+        Save all attr data in a netcdf file in the cache directory;
+        ts data are already in nc format
+
+        Parameters
+        ----------
+        checkregion : str, optional
+            in my experience, the damaged files are in hysets, by default "hysets"
+        """
         warnings.warn("Check your units of all variables")
         if self.region != "Global":
             raise ValueError("Only Global region is supported.")
-        pbar = tqdm(self.region_data_name, desc="Start Checking Data...")
+        ds_attr = self.cache_attributes_xrdataset()
+        ds_attr.to_netcdf(
+            os.path.join(
+                self.data_source_description["ATTR_DIR"],
+                "caravan_attributes.nc",
+            )
+        )
+        if checkregion is not None:
+            regions = [checkregion]
+        else:
+            regions = self.region_data_name
+        pbar = tqdm(regions, desc="Start Checking Data...")
         for region in pbar:
             pbar.set_description(f"Processing Region-{region}")
             # all files are too large to read in memory, hence we read them region by region
```
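The new checkregion parameter narrows the file-integrity check to a single region. A minimal usage sketch, assuming the dataset class in caravan.py is named Caravan and the constructor arguments below are illustrative, not the actual signature:

```python
from hydrodataset.caravan import Caravan

# hypothetical construction; actual arguments depend on your local data layout
caravan = Caravan("path/to/caravan", region="Global")

# check only the hysets files (the ones observed to be damaged), then cache
caravan.cache_xrdataset(checkregion="hysets")

# or pass None to check every region listed in region_data_name
caravan.cache_xrdataset(checkregion=None)
```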
```diff
@@ -686,13 +705,6 @@ def cache_xrdataset(self):
                     os.path.dirname(file_path), f"{gage_id}.nc"
                 )
                 the_ncfile_data.to_netcdf(the_ncfile)
-        ds_attr = self.cache_attributes_xrdataset()
-        ds_attr.to_netcdf(
-            os.path.join(
-                self.data_source_description["ATTR_DIR"],
-                "caravan_attributes.nc",
-            )
-        )
 
     def read_attr_xrdataset(self, gage_id_lst=None, var_lst=None, **kwargs):
         # Define the path to the attributes file
```
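The attribute cache moved above is what read_attr_xrdataset serves. A sketch of a call against the signature visible in this hunk; the basin ID and attribute name are placeholders, not values from this commit:

```python
# placeholder basin ID and attribute name; consult the Caravan
# attribute tables for real values
attrs = caravan.read_attr_xrdataset(
    gage_id_lst=["camels_01013500"],
    var_lst=["area"],
)
```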
```diff
@@ -727,8 +739,9 @@ def read_ts_xrdataset(self, gage_id_lst, t_range, var_lst, **kwargs):
             file_paths.append(file_path)
 
         # Open the dataset in a lazy manner using dask
+        parallel = kwargs.get("parallel", False)
         combined_ds = xr.open_mfdataset(
-            file_paths, combine="nested", concat_dim="basin", parallel=True
+            file_paths, combine="nested", concat_dim="basin", parallel=parallel
         )
 
         def extract_unit(variable_name, units_string):
```
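After this change xr.open_mfdataset runs serially unless the caller opts in, which sidesteps the dask-parallel failures the commit message mentions. A sketch of both call styles; the gage IDs, time range, and variable names are placeholders:

```python
# serial file opening, the new default
ds = caravan.read_ts_xrdataset(
    gage_id_lst=["camels_01013500"],
    t_range=["2000-01-01", "2010-12-31"],
    var_lst=["streamflow"],
)

# opt back into dask-parallel opening once your dask setup is verified
ds_par = caravan.read_ts_xrdataset(
    gage_id_lst=["camels_01013500"],
    t_range=["2000-01-01", "2010-12-31"],
    var_lst=["streamflow"],
    parallel=True,
)
```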
