Replies: 11 comments 21 replies
-
@cjao @venkatBala (In response to the setup and teardown methods) How about adding a resource class instead of adding setup and teardown methods to the Executor class? Does it make sense to keep infrastructure provisioning and electron execution separate? For example, let's consider the AWS Batch executor: maybe we can add an AWSBatchResource class to the plugin.
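For illustration, a minimal sketch of what that separation might look like; the class and method names (AWSBatchResource, provision, release, run) are hypothetical, not existing plugin API:

```python
# Hypothetical sketch only: provisioning lives in a resource object,
# while the executor only submits work against it.

class AWSBatchResource:
    """Owns the lifetime of the compute environment / job queue."""

    def __init__(self, vcpus: int = 2, memory_mb: int = 2048):
        self.vcpus = vcpus
        self.memory_mb = memory_mb
        self.job_queue = None

    def provision(self):
        # Placeholder for creating/looking up the compute environment and queue
        self.job_queue = "covalent-batch-queue"

    def release(self):
        self.job_queue = None


class AWSBatchExecutorSketch:
    """Submits tasks only; infrastructure lifetime is managed by the resource."""

    def __init__(self, resource: AWSBatchResource):
        self.resource = resource

    def run(self, fn, *args, **kwargs):
        if self.resource.job_queue is None:
            raise RuntimeError("resource not provisioned")
        return fn(*args, **kwargs)  # stand-in for a real Batch job submission
```

With this split, several executor instances could point at one provisioned resource, and the resource's release() call could be deferred until the last electron using it completes.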
-
Tagging @Emmanuel289 as well.
-
@FyzHsn, @cjao the proposed UX is given here: https://www.notion.so/Sharable-common-Executers-45634f68265d4972a81e63cb796f1e91. Quoting it, the UX could be something like:

```python
executer = ct.executers.AWSEC2Executer(ncores=2)
e1 = executer.get_shared_instance()
e2 = executer.get_shared_instance()

@ct.electron(executer=e1)
def a1(): ...

@ct.electron(executer=e1)
def a2(): ...

@ct.electron(executer=executer)
def a3(): ...

@ct.electron(executer=e2)
def a4(): ...

@ct.electron(executer=e2)
def a5(): ...

@ct.lattice
def workflow():
    r = a1()
    r = a3(r)
    r = a2(r)
    r = a3(r)
    r = a4(r)
    r = a5(r)
    return r
```

Even though a2 happens after a3, since a1 and a2 share an instance of the EC2 executor, the same EC2 instance that is created for a1 is kept alive until the node containing a2 is reached. Note that this applies to all executors; for executors where there is no concept of spin-up or shutdown (SSH executor, local executor, etc.), there is simply no notion of "needing to wait for all electrons to complete". @FyzHsn this would alleviate your concern about forcing things to run serially. We want the user to specify whether they want to run things in the same instance or in a different one, and we do this by giving them the option to pass typed instances of executors.
-
A slightly unrelated benefit of this is that we will be able to use the same Dask client for multiple different electrons. The code there will therefore simplify, and we will be able to get rid of the global client dictionary.
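For illustration, here is what reusing a single dask.distributed.Client across several task submissions looks like; this is a generic sketch, not the actual DaskExecutor code:

```python
from dask.distributed import Client, LocalCluster

# One cluster and one client, reused for every submission,
# instead of a module-level dictionary of clients keyed by scheduler address.
cluster = LocalCluster(n_workers=2)
client = Client(cluster.scheduler_address)

def square(x):
    return x * x

# Several "electrons" submitted through the same client.
futures = [client.submit(square, i) for i in range(3)]
print(client.gather(futures))  # [0, 1, 4]

client.close()
cluster.close()
```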
-
The above ideas are mocked up in this branch. Toy example:

```python
import covalent as ct
from covalent.executor import DaskExecutor
from dask.distributed import LocalCluster

lc = LocalCluster()  # a local Dask cluster; `lc` was used below but its definition was missing from the snippet
dask_exec = DaskExecutor(lc.scheduler_address)

@ct.electron(executor=dask_exec)
def task(x):
    return x

@ct.electron(executor="local")
def local_task(x):
    return x

@ct.lattice(workflow_executor=dask_exec)
def workflow(x):
    res1 = task(x)
    res2 = local_task(res1)
    res3 = task(res2)
    return 1
```

Running this from a Jupyter notebook, the logs show that
-
Some UX questions that boil down to the appropriate choice of defaults:
This tightly couples an executor instance to the computational resource it represents: one executor instance, one resource. So, for example, there would be a one-to-one correspondence between instances of the EC2 executor plugin and EC2 instances. One concern raised by @santoshkumarradha is that in the common scenario where users do want to segregate resources between electrons, they would need to construct a new executor plugin instance, which could be unwieldy when many parameters are required to initialize the plugin. I have tried to mitigate this by introducing a clone() method:

```python
e1 = ct.executers.AWSEC2Executer(ncores=2)
e2 = e1.clone()  # Same credentials and machine specifications as e1 but different instance_id
```

One can also decorate electrons using:

```python
@ct.electron(executor=e1.clone())
def task_1():
    ...

@ct.electron(executor=e1.clone())
def task_2():
    ...
```

On the other hand, the alternative UX

```python
executer = ct.executers.AWSEC2Executer(ncores=2)
e1 = executer.get_shared_instance()
e2 = executer.get_shared_instance()
```

makes it easier to allocate exclusive hardware resources to each electron. In this case, two electrons that specify the same executor instance would not share hardware unless the instance is a "shared instance" obtained through get_shared_instance().
-
As discussed in Slack, binding an executor plugin instance to a hardware instance would fail to handle the use case where the same electron is to be run multiple times in parallel in different hardware instances. Accordingly, the earlier toy example becomes:

```python
import covalent as ct
from covalent.executor import DaskExecutor
from dask.distributed import LocalCluster

lc = LocalCluster()  # as before, `lc` was referenced but not defined in the original snippet
dask_exec = DaskExecutor(lc.scheduler_address).get_shared_instance()

@ct.electron(executor=dask_exec)
def task(x):
    return x

@ct.electron(executor="local")
def local_task(x):
    return x

@ct.lattice(workflow_executor=dask_exec)
def workflow(x):
    res1 = task(x)
    res2 = local_task(res1)
    res3 = task(res2)
    return 1
```

which yields similar results.

The special value

Moreover, here is a revised conceptual framework for thinking about executor plugins, and the hardware resources they represent, from the client's point of view:
-
One potential issue is that the semantics of In the AWSLambda executor, however, The EC2 executor fares better. There,
-
Here are some more detailed design notes, by domain.

Executor -- workflow construction:

Executor has some new public attributes:
When an executor instance is passed to an electron decorator, its

Example (with fake instance_ids):

```python
ex1 = EC2Executor()              # instance_id = 1
ex2 = ex1.get_shared_instance()  # instance_id = 2

@ct.electron(executor=ex1)
def task_1():
    pass

@ct.electron(executor=ex2)
def task_2():
    pass

@ct.lattice
def workflow():
    res11 = task_1()
    res12 = task_1()
    res21 = task_2()
    res22 = task_2()

workflow.build_graph()
```

This transport graph has four nodes with the following executor assignments: the two task_1 nodes share the executor with instance_id 1, and the two task_2 nodes share the executor with instance_id 2.
In addition, collection and dunder tasks (@FyzHsn @wjcunningham7) are automatically assigned the same executor instance as the main task. Specifically, when Covalent generates either a collection or dunder task, it sets the main task's executor to

Example:

```python
from typing import List  # needed for the List annotation

ex = EC2Executor()  # instance_id = 1

@ct.electron(executor=ex)
def collect(arr: List):
    return list(arr)

@ct.lattice
def workflow():
    collection_1 = collect([1, 2])

workflow.build_graph()
```

Here, the collection node generated for the arguments of collect is assigned the same executor instance (instance_id 1) as the main task.

Executor -- backend:
```python
async def cancel(self, dispatch_id, node_id):
    fut = self.get_task_data(dispatch_id, node_id, "future")
    await fut.cancel()
```
Dispatcher

The dispatcher runs all tasks that reference the same executor instance on that one instance.

Changes to the dispatcher:

ExecutorCache: tracks executor instances during workflow execution. When the dispatcher attempts to retrieve the executor for a task, it first looks in the ExecutorCache (a sketch follows at the end of these notes).

Result Object

Unplanned tasks: The

Executor plugin support

To make an executor plugin work with these semantics, a few simple rules must be observed:
Limitations

The benefits of sharing tasks in this manner will vary by executor. They will be largest for executors where Covalent has full control of the resource allocation and task scheduling, such as the Dask executor.
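To make the ExecutorCache idea above concrete, here is a minimal sketch under the semantics described in these notes; the class shape and method names (get_or_create, task_finished) are illustrative assumptions, not the actual implementation:

```python
from typing import Dict

# Illustrative sketch of an executor cache keyed by instance_id.
class ExecutorCache:
    def __init__(self):
        self._executors: Dict[str, object] = {}  # instance_id -> executor instance
        self._refcounts: Dict[str, int] = {}     # instance_id -> tasks not yet finished

    def get_or_create(self, instance_id: str, factory):
        # Reuse the cached instance so all tasks with the same instance_id
        # run against the same underlying resources.
        if instance_id not in self._executors:
            self._executors[instance_id] = factory()
            self._refcounts[instance_id] = 0
        self._refcounts[instance_id] += 1
        return self._executors[instance_id]

    def task_finished(self, instance_id: str) -> bool:
        # Returns True when the last assigned task has completed, which is the
        # point where any teardown/cleanup hook could be invoked.
        self._refcounts[instance_id] -= 1
        return self._refcounts[instance_id] == 0
```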
-
@cjao @wjcunningham7 I want to bring up another point in resource sharing:
Thoughts?
-
Notion planning page (see also here)
Remote executors often have significant startup delays, whether it is to spin up a virtual machine in the cloud or boot a bare-metal Slurm compute node. One would like the ability to not only allocate multiple tasks to the same executor instance, but also keep the underlying hardware resources alive until the last assigned task completes. This would amortize the provisioning costs over multiple task runs.
Note: the present situation is actually slightly worse than before PR #754. The SDK now sends a JSON representation of each instance, and the server constructs a new executor plugin instance for each task from the JSON data. Previously, users could assign the same executor plugin instance to multiple electrons, and the instances themselves would be pickled over to the dispatcher. This at least allowed the same plugin instance to be shared by multiple tasks, although there was still no mechanism to reserve the executor's underlying resources for multiple tasks.
Assumption:
When users specify an executor plugin instance during workflow construction (as opposed to a mere "short name" like "dask" or "local"), they intend for all electrons decorated with that instance to use the same resources exposed by that instance (e.g. the same set of Slurm nodes).

Proposed changes:

- Add setup() and teardown/cleanup() methods (@venkatBala :) ). The setup() method is invoked upon instantiation to provision resources, such as by spinning up an EC2 instance or reserving Slurm nodes.
- execute() decrements the counter before returning; when the counter reaches 0, the cleanup() method is invoked to release the resources (a rough sketch follows below).
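A minimal sketch of the counter-based lifecycle described above, assuming the counter starts at the number of electrons assigned to the instance; the class and its fields are illustrative, not existing Covalent API:

```python
# Illustrative sketch of setup()/cleanup() with a task reference counter.
class RefCountedExecutor:
    def __init__(self, num_assigned_tasks: int):
        # Assumed: the dispatcher knows up front how many electrons use this instance.
        self._remaining = num_assigned_tasks
        self._resource = None
        self.setup()

    def setup(self):
        # Provision resources once, e.g. spin up an EC2 instance or reserve Slurm nodes.
        self._resource = "provisioned-resource-handle"

    def execute(self, fn, *args, **kwargs):
        try:
            return fn(*args, **kwargs)
        finally:
            # Decrement before returning; release everything after the last task.
            self._remaining -= 1
            if self._remaining == 0:
                self.cleanup()

    def cleanup(self):
        self._resource = None  # release the provisioned resources


ex = RefCountedExecutor(num_assigned_tasks=2)
ex.execute(lambda x: x + 1, 1)
ex.execute(lambda x: x * 2, 2)  # cleanup() runs after this call
```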