Use lithops to parallelize open_mfdataset #9932
base: main
@@ -1365,6 +1365,9 @@ def open_groups(
     return groups


+import warnings
+
+
 def open_mfdataset(
     paths: str
     | os.PathLike
@@ -1480,9 +1483,10 @@ def open_mfdataset(
           those corresponding to other dimensions.
         * list of str: The listed coordinate variables will be concatenated,
           in addition the "minimal" coordinates.
-    parallel : bool, default: False
-        If True, the open and preprocess steps of this function will be
-        performed in parallel using ``dask.delayed``. Default is False.
+    parallel : 'dask', 'lithops', or False
+        Specify whether the open and preprocess steps of this function will be
+        performed in parallel using ``dask.delayed``, in parallel using ``lithops.map``, or in serial.
+        Default is False. Passing True is now a deprecated alias for passing 'dask'.
     join : {"outer", "inner", "left", "right", "exact", "override"}, default: "outer"
         String indicating how to combine differing indexes
         (excluding concat_dim) in objects
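For context, a minimal usage sketch of the new keyword. The bucket paths and the ``combine``/``concat_dim`` arguments below are illustrative assumptions, not part of this diff:

import xarray as xr

# hypothetical listing of netCDF files; any collection of paths that
# open_mfdataset accepts works here
paths = [f"s3://my-bucket/data_{i}.nc" for i in range(10)]

# open (and optionally preprocess) each file in a separate lithops worker
ds = xr.open_mfdataset(paths, combine="nested", concat_dim="time", parallel="lithops")

# the old boolean form still works but now emits a PendingDeprecationWarning
ds = xr.open_mfdataset(paths, parallel=True)  # prefer parallel="dask"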
@@ -1596,27 +1600,67 @@ def open_mfdataset(

     open_kwargs = dict(engine=engine, chunks=chunks or {}, **kwargs)

-    if parallel:
+    if parallel is True:
+        warnings.warn(
+            "Passing ``parallel=True`` is deprecated, instead please pass ``parallel='dask'`` explicitly",
+            PendingDeprecationWarning,
+            stacklevel=2,
+        )
+        parallel = "dask"
+
+    if parallel == "dask":
         import dask

         # wrap the open_dataset, getattr, and preprocess with delayed
         open_ = dask.delayed(open_dataset)
         getattr_ = dask.delayed(getattr)
         if preprocess is not None:
             preprocess = dask.delayed(preprocess)
-    else:
+    elif parallel == "lithops":
+        import lithops
+
+        # TODO use RetryingFunctionExecutor instead?
+        fn_exec = lithops.FunctionExecutor()
+
+        # lithops doesn't have a delayed primitive
+        open_ = open_dataset
+        # TODO I don't know how best to chain this with the getattr
+        # getattr_ = getattr
+    elif parallel is False:
         open_ = open_dataset
         getattr_ = getattr
+    else:
+        raise ValueError(
+            f"{parallel} is an invalid option for the keyword argument ``parallel``"
+        )

-    datasets = [open_(p, **open_kwargs) for p in paths1d]
-    closers = [getattr_(ds, "_close") for ds in datasets]
-    if preprocess is not None:
-        datasets = [preprocess(ds) for ds in datasets]
+    if parallel == "dask":
+        datasets = [open_(p, **open_kwargs) for p in paths1d]
+        closers = [getattr_(ds, "_close") for ds in datasets]
+        if preprocess is not None:
+            datasets = [preprocess(ds) for ds in datasets]

-    if parallel:
         # calling compute here will return the datasets/file_objs lists,
         # the underlying datasets will still be stored as dask arrays
         datasets, closers = dask.compute(datasets, closers)
+    elif parallel == "lithops":
+
+        def generate_lazy_ds(path):
+            # allows passing the open_dataset function to lithops without evaluating it
+            ds = open_(path, **open_kwargs)
+            return ds
+
+        futures = fn_exec.map(generate_lazy_ds, paths1d)
+
+        # wait for all the serverless workers to finish, and send their resulting lazy datasets back to the client
+        # TODO do we need download_results?
+        completed_futures, _ = fn_exec.wait(futures, download_results=True)
+        datasets = completed_futures.get_result()
Review comment on lines +1653 to +1658:

I wonder if we can find an abstraction that works for both this (which is kinda like …). For example, maybe we can use … (But I guess if we refactor the dask code as well we don't really need that idea.)
+    elif parallel is False:
+        datasets = [open_(p, **open_kwargs) for p in paths1d]
+        closers = [getattr_(ds, "_close") for ds in datasets]
+        if preprocess is not None:
+            datasets = [preprocess(ds) for ds in datasets]

     # Combine all datasets, closing them in case of a ValueError
     try:
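One possible reading of the review comment above, sketched under the assumption that the per-file opens are driven through any ``concurrent.futures``-style executor; the ``open_paths`` helper and its name are hypothetical, not part of this PR:

from concurrent.futures import Executor, ThreadPoolExecutor

def open_paths(executor: Executor, open_fn, paths):
    # anything exposing a concurrent.futures-style ``map`` can drive the
    # per-file opens; lithops would need a thin wrapper here, since
    # FunctionExecutor.map returns futures rather than results
    return list(executor.map(open_fn, paths))

# local stand-in for the serverless case
with ThreadPoolExecutor(max_workers=8) as pool:
    datasets = open_paths(pool, open_dataset, paths1d)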
@@ -1654,7 +1698,9 @@ def open_mfdataset(
             ds.close()
         raise

-    combined.set_close(partial(_multi_file_closer, closers))
+    # TODO remove if once closers added above
+    if parallel != "lithops":
+        combined.set_close(partial(_multi_file_closer, closers))

     # read global attributes from the attrs_file or from the first dataset
     if attrs_file is not None:
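For reference, ``_multi_file_closer`` just invokes each stored closer in turn (a sketch of the existing xarray helper, not new code in this PR):

def _multi_file_closer(closers):
    # calling the combined dataset's .close() walks every per-file closer
    for closer in closers:
        closer()

So until closers are also collected on the lithops path (the TODO above), ``combined.close()`` will not release the per-file handles opened by the workers.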
Review comment:

this looks potentially like a ``functools.partial`` with ``**kwargs``?
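A sketch of what that suggestion could look like applied to ``generate_lazy_ds``, assuming lithops can serialize the partial (untested):

from functools import partial

# equivalent to the inline closure: each worker calls
# open_dataset(path, **open_kwargs)
open_ = partial(open_dataset, **open_kwargs)
futures = fn_exec.map(open_, paths1d)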