context.df.Task.all throws as soon as one task fails so one can't get the result of other tasks #477

Open
GP4cK opened this issue Feb 13, 2023 · 14 comments
Labels: Enhancement (New feature or request), P2 (Priority 2 item)

@GP4cK

GP4cK commented Feb 13, 2023

Describe the bug
context.df.Task.all throws as soon as one task fails so one can't get the result of other tasks. I've made a minimum reproducible repo here: https://github.com/GP4cK/azure-durable-task-all-bug.

Investigative information

  • Durable Functions extension version: [3.*, 4.0.0)
  • durable-functions npm module version: 2.1.1
  • Language (JavaScript/TypeScript) and version: Typescript
  • Node.js version: v16.19.0

To Reproduce
Steps to reproduce the behavior:

  1. Clone https://github.com/GP4cK/azure-durable-task-all-bug
  2. Follow the steps in the README

Expected behavior
I expect to be able to loop over the tasks and, for each task, have either task.isCompleted or task.isFaulted be true and task.result not be undefined (if the Activity/SubOrchestration returns something).

Actual behavior
When the first task fails, the second task is neither completed nor faulted and its result is undefined.

Known workarounds
I guess the workaround is to not use the fan-out/fan-in pattern and use the chaining pattern instead.

@ghost ghost added the Needs: Triage 🔍 label Feb 13, 2023
@davidmrdavid
Collaborator

@GP4cK: I noticed that the reproducer has what looks like an explicit storage account connection string. You may want to remove it if so.

Looking at this issue shortly

@davidmrdavid
Collaborator

davidmrdavid commented Feb 13, 2023

@GP4cK:

Ah, you've stumbled upon a bit of a low level detail in the implementation. Let me try to clarify what's going on and how to achieve the result you want.

When you schedule tasks with Task.all, all of those tasks get scheduled and are guaranteed to produce a result. However, the semantics of Task.all only need a single failing task in order to error out. Therefore, as soon as a single task results in an exception (and that exception becomes available to your orchestrator), your orchestrator will stop waiting for the other results and move on to the next line of your code.

That's pretty much what's happening here: your orchestrator is no longer waiting for the other results. It should be noted that this is the intended behavior. Put simply, Task.all only waits for all results if all tasks succeed. However, those tasks are still running in the background and will produce a result, so obtaining them is possible, but it requires a few extra steps.

Given today's API, to obtain the result of your ActivitySuccess task, you would need to explicitly yield it after the Task.all operation produces its exception. That tells the framework that you want to "materialize" the result of that task, and the framework will then fetch its result from the orchestration history. If it's available, it will be returned; if it's not available yet, the orchestrator will suspend until the task produces an output.

To summarize: you can use Task.all to schedule many tasks in parallel. However, if any of them fail, Task.all will short-circuit waiting for all results and produce an exception. After they've been scheduled, you can force a task's result by yield'ing it explicitly. Since you have a collection of tasks whose results you want to obtain, you could just iterate over them (assuming you have them saved in a list) via a .map operation or something equivalent, roughly as in the sketch below.
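For illustration, here is a minimal sketch of that pattern using durable-functions v2.x syntax (the activity names and the shape of the result list are illustrative, not taken from the reproducer):

import * as df from "durable-functions";

export default df.orchestrator(function* (context) {
    // Schedule everything up front so the tasks run in parallel.
    const tasks = [
        context.df.callActivity("ActivitySuccess"),
        context.df.callActivity("ActivityFailure"),
    ];

    try {
        yield context.df.Task.all(tasks);
    } catch {
        // Task.all short-circuits on the first failure; the remaining
        // tasks are still running and will still produce results.
    }

    const results = [];
    for (const task of tasks) {
        try {
            yield task; // materializes this task's outcome from the history
            results.push(task.result);
        } catch {
            results.push(undefined); // this task faulted
        }
    }
    return results;
});

Because the tasks were already scheduled by the first Task.all, re-yielding them does not schedule new work; it only reads (or waits for) their results.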

Please let me know if that clarifies your question. Thanks!

FYI - @hossam-nasr. This is an interesting case. Perhaps something we can facilitate in future releases.

@davidmrdavid davidmrdavid added Needs: Author Feedback Waiting for the author of the issue to respond to a question and removed Needs: Triage 🔍 labels Feb 13, 2023
@GP4cK
Author

GP4cK commented Feb 13, 2023

@GP4cK: I noticed that the reproducer has what looks like an explicit storage account connection string. You may want to remove it if so.

That's the default connection string to Azurite so it's ok.

Thanks for the explanation. So if I translate it to code, that should be it, right?

try {
  yield context.df.Task.all(tasks);
} catch (err) {
  // One task failed
}

for (let task of tasks) {
  try { // Could yield task throw? Do I need to use try/catch?
    yield task;
    // Do something with task.result
  } catch (err) {
  }
}

Or should I filter the tasks that haven't completed yet and call Task.all recursively like:

const pendingTasks = tasks.filter((task) => !task.isCompleted && !task.isFaulted);
if (pendingTasks.length === 0) return;
try {
  yield context.df.Task.all(pendingTasks);
} catch (err) {
  // check again if there are pending tasks
}

@ghost ghost added Needs: Attention 👋 and removed Needs: Author Feedback Waiting for the author of the issue to respond to a question labels Feb 13, 2023
@davidmrdavid
Collaborator

I recommend option 1 as it guarantees that, at the end of the loop, all your tasks have completed. With your second snippet, you have to check again and again on different subsets of "pendingTasks", as you noted.

@davidmrdavid davidmrdavid added Needs: Author Feedback Waiting for the author of the issue to respond to a question and removed Needs: Attention 👋 labels Feb 14, 2023
@GP4cK
Author

GP4cK commented Feb 14, 2023

I see. But performance-wise it would be roughly the same, since the tasks were already scheduled by the orchestrator initially, right?

@ghost ghost added Needs: Attention 👋 and removed Needs: Author Feedback Waiting for the author of the issue to respond to a question labels Feb 14, 2023
@davidmrdavid
Collaborator

Yes, that's correct. Only the first Task.all should be needed to schedule all tasks. Performance-wise, this shouldn't be an issue.

@davidmrdavid davidmrdavid added Needs: Author Feedback Waiting for the author of the issue to respond to a question and removed Needs: Attention 👋 labels Feb 14, 2023
@GP4cK
Author

GP4cK commented Feb 14, 2023

Thank you very much for your help. It would be really nice if this were documented, because there are other issues that are confusing. Ex: #66 (comment)

@ghost ghost added Needs: Attention 👋 and removed Needs: Author Feedback Waiting for the author of the issue to respond to a question labels Feb 14, 2023
@ejizba
Contributor

ejizba commented Feb 14, 2023

It appears that Task.all is consistent with the Node.js Promise.all method in terms of throwing on the first error. However, Node.js also has a Promise.allSettled method, which I believe is consistent with the requested behavior. Perhaps durable should add a Task.allSettled method to match.
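For reference, here is what the Promise.allSettled contract looks like in plain Node.js (illustrative values only); a hypothetical Task.allSettled would presumably resolve to a similar array of settled outcomes instead of throwing on the first failure:

async function demo() {
    const results = await Promise.allSettled([
        Promise.resolve(42),
        Promise.reject(new Error("boom")),
    ]);
    // Unlike Promise.all, this never rejects; every outcome is reported:
    // [ { status: "fulfilled", value: 42 },
    //   { status: "rejected", reason: Error: boom } ]
    console.log(results);
}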

@danieltskv

danieltskv commented Feb 23, 2023

Hi @davidmrdavid,

I have a similar scenario where I want to schedule multiple parallel activity functions, where some can fail or throw an error, and that's fine, as I will only use the results of the ones that succeeded.

The issue I'm noticing with the proposed solution to @GP4cK's code is that some activity functions that eventually succeed are being called multiple times.

I was assuming the later loop that iterates over the tasks simply continues or fetches the result, but it seems that when the replay happens on the orchestrator function, some activity functions are run again. I noticed this by putting breakpoints and logs inside the activity functions. I can provide sample code if needed.

Is this expected behavior? Is there anything I can do to prevent re-calling the activity functions, i.e., only run each activity function once, regardless of its result?

Thank you

@davidmrdavid
Collaborator

Hi @danieltskv,

That is definitely not expected behavior. Do you think you could open a new issue, link this ticket in it, and provide some reproducer code? That would help us get to the bottom of this. Thanks!

@danieltskv

@davidmrdavid
Done: #485 Thanks!

@IDrumsey

+1 for the Task.allSettled function as mentioned by @ejizba. Would be helpful.

@IDrumsey

I recommend option 1 as it guarantees that, at the end of the loop, all your tasks have completed. With your second snippet, you have to check again and again on different subsets of "pendingTasks", as you noted.

@davidmrdavid, I have attempted to apply solution 1 (from @GP4cK) in Python. It may work for Node.js, but it doesn't seem to work for Python. Here is the code I'm running; I can move this to a new issue in https://github.com/Azure/azure-functions-durable-python/issues if requested.

orchestrator

import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    tasks = [
        context.call_activity('Hello1'),
        context.call_activity('Hello1'),
        context.call_activity('Hello1')
    ]

    try:
        allResults = yield context.task_all(tasks)
    except Exception:
        # One of the tasks threw an exception, so the orchestrator is no longer
        # waiting for the tasks to all complete. Re-yield each task to wait for it.
        # https://github.com/Azure/azure-functions-durable-js/issues/477#issuecomment-1428850454
        for task in tasks:
            try:
                result = yield task
            except Exception:
                pass

    return 'done'


main = df.Orchestrator.create(orchestrator_function)

Hello1 activity function

def main(name: str) -> str:

    raise Exception('test error')

After running the orchestrator function, I receive the following response

Orchestrator function 'test1' failed: 'AtomicTask' object has no attribute 'append'

Pretty sure this relates to this comment of yours. That issue may resolve my issue as well? It looks like you said it would take a couple of weeks a while ago and it hasn't been resolved; is there a timeline for this?

Is there a workaround for this currently? I want to have the results of all successful tasks even if some of the tasks raised an exception. The only way I can think of currently is to run some non-orchestrator code that externally calls the activity function for each task and tracks each task individually. That way the exception handling would be per call rather than around task_all, and I likely wouldn't run into the AtomicTask bug. What are your thoughts on this whole situation? Again, I think @ejizba's suggestion would be a good long-term solution.

@davidmrdavid
Collaborator

Hi @IDrumsey:

Yes, it would be great if you could re-open that request in the Python SDK repo (https://github.com/Azure/azure-functions-durable-python/issues) and attach a .zip'ed reproducer that I can use to quickly identify the issue. Thanks!

@hossam-nasr hossam-nasr added Enhancement New feature or request and removed Needs: Attention 👋 labels Apr 10, 2023
@lilyjma lilyjma self-assigned this Sep 21, 2023
@lilyjma lilyjma added the P2 Priority 2 item label Jan 5, 2024