
context.df.Task.any launches tasks even if the task was already launched and sometimes return result from a different Task #574

Open
Ayzrian opened this issue Jan 24, 2024 · 6 comments
Labels
Needs: Investigation 🔍 A deeper investigation needs to be done by the project maintainers. P2 Priority 2 item question Further information is requested

Comments

@Ayzrian

Ayzrian commented Jan 24, 2024

Describe the bug

First I would like to explain the idea I am trying to achieve with Durable Functions. I have two activities:

  • activity-get-next-products-batch - resolves a batch of products from a data source and places it into a blob storage file
  • activity-process-products - processes a batch of products produced by the previous activity. In the real world this can run for a long time

I want to achieve a result where processing of each batch starts as soon as possible, while at the same time we keep producing the next batch.

So when we resolve the batch, we do two actions

  • Start next activity-get-next-products-batch
  • Start activity-process-products to process the batch; this task is pushed into the array of processing tasks to be awaited.

Then we do Task.any to wait for any of the jobs to finish (keep in mind that ideally there can be multiple activity-process-products instances running).

In this set-up Task.any can return two tasks:

  • activity-process-products - then we filter the finished job out of the list of tasks
  • activity-get-next-products-batch - then we schedule both jobs again, and keep waiting for previous unfinished jobs with a new call to Task.any

My expectation was that if a job is already launched it won't be launched again; we would simply keep waiting for it to finish. What happens in reality is that the activity-get-next-products-batch invocation runs longer than activity-process-products, and when the code just wants to wait for the batch to arrive, the runtime schedules another job rather than waiting for the already scheduled one. See the Gantt chart obtained from Durable Functions Monitor.

[screenshot: Gantt chart from Durable Functions Monitor]
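For contrast, the expected "wait without relaunching" semantics can be illustrated with plain Node promises. This is only an analogy (durable Tasks are not Promises, and `demo`/`getNextBatch` below are made-up names): racing the same in-flight promise a second time never re-runs the work it wraps.

```typescript
// Plain-Promise analogy (not the Durable Functions API): awaiting a
// race that contains an already-created promise must not re-run the
// underlying work. The reported bug is that Task.any re-schedules
// the activity instead of waiting on the in-flight one.
async function demo(): Promise<{ launches: number; batch: string }> {
  let launches = 0;
  const getNextBatch = (): Promise<string> => {
    launches++; // the underlying work starts here
    return new Promise((resolve) => setTimeout(() => resolve("batch"), 50));
  };

  const batchPromise = getNextBatch();              // launched exactly once
  const fastTask = Promise.resolve("processed");

  await Promise.race([batchPromise, fastTask]);     // first wait: fastTask wins
  const batch = await Promise.race([batchPromise]); // wait again on the SAME promise

  return { launches, batch };                       // launches is still 1
}

demo().then((r) => console.log(r.launches, r.batch)); // prints: 1 batch
```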

And from the logs here: we start the job with TaskEventId 2, and it runs longer than another job started for processing of products.

[screenshot: logs showing the job started with TaskEventId 2]

Then, when processing is finished, we just wait for the next batch to arrive, but the runtime schedules the task again rather than waiting for the already launched task... Note that the input is the same, but TaskEventId is 3 this time. The code is attached below.

[screenshot: logs showing the same input scheduled again with TaskEventId 3]


Another issue I see is that when both tasks passed to Task.any finish around the same time, the result is returned from the wrong Task, even though the code checks that the returned Task === the task that was scheduled to get the batch.

On the screenshot below you can see that both tasks finished but have different outputs. Then we see the log line Get Products Task Finished; the corresponding code is

    const finishedTask: Task = yield context.df.Task.any(tasksToWait);

    if (getProductsTask === finishedTask) {

But what happens is that Task.any actually returned the result from a completely different task.

[screenshot: both tasks completed with different outputs]
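As a defensive measure while this is unresolved, the result could be shape-checked before the cast, so a mixed-up payload fails loudly instead of silently flowing on. The type guard below is hypothetical and not part of the SDK; the field names follow the orchestrator code in this issue.

```typescript
// Hypothetical type guard: since Task.any appeared to hand back the
// result of a different task, validating the payload shape before the
// `as GetNextProductsBatchResult` cast turns a silent mix-up into a
// detectable failure.
interface GetNextProductsBatchResult {
  lastCommerceId: string | null;
  filePath: string;
  hasMoreProducts: boolean;
}

function isBatchResult(value: unknown): value is GetNextProductsBatchResult {
  const v = value as Record<string, unknown>;
  return (
    typeof v === "object" &&
    v !== null &&
    typeof v.filePath === "string" &&
    typeof v.hasMoreProducts === "boolean"
  );
}
```

In the orchestrator, `if (!isBatchResult(finishedTask.result)) { /* log and fail */ }` would catch the mismatch before `lastCommerceId` is read from the wrong payload.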

Investigative information

  • Durable Functions extension version: 2.1.12
  • durable-functions npm module version: 2.1.3
  • Language (JavaScript/TypeScript) and version: TypeScript 4.9.3
  • Node.js version: ~18.12.1 (Azure Function Apps ~18)

If deployed to Azure App Service

  • Timeframe issue observed: 2024-01-24T11:12:47.3924557Z - 2024-01-24T11:13:08.2962719Z
  • Function App name: fa-cms-dev-ne-002
  • Function name(s): bloomreach-full-feed-orchestrator
  • Region: NE
  • Orchestration instance ID(s): en-2024-01-24-asypevhui4i6dyu5njd3c

To Reproduce
Steps to reproduce the behavior:

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

The code of orchestrator.

import * as df from 'durable-functions';
import { RetryOptions } from 'durable-functions';
import { Task } from 'durable-functions/lib/src/task';
import { Logger } from '../../common/logger';
import { BloomreachFullFeedOrchestratorParams, GetNextProductsBatchResult, ProcessProductsResult } from './types';

export default df.orchestrator(function* (context) {
  const logger = new Logger(context.log, {
    correlationId: context.df.newGuid(context.df.instanceId),
  });
  logger.setContext('bloomreach-full-feed-orchestrator');
  if (!context.df.isReplaying) {
    logger.info(`isReplaying=` + String(context.df.isReplaying));
  }

  if (!context.df.isReplaying) {
    logger.info(`Input payload:`, {
      input: JSON.stringify(context.df.getInput()),
    });
  }

  const params: BloomreachFullFeedOrchestratorParams = context.df.getInput();

  let lastCommerceId = null;
  let batchNumber = 1;

  if (!context.df.isReplaying) {
    logger.info('Scheduled first task to get products.');
  }

  let getProductsTask: Task | null = context.df.callActivityWithRetry(
    'activity-get-next-products-batch',
    new RetryOptions(1000, 1),
    {
      lastCommerceId,
      batchNumber,
      orchestratorId: context.bindingData.instanceId,
    }
  );
  let processingTasks: Task[] = [];
  const processingResults: ProcessProductsResult[] = [];

  while (processingTasks.length > 0 || getProductsTask !== null) {
    const tasksToWait = [...processingTasks];

    if (getProductsTask !== null) {
      tasksToWait.push(getProductsTask);
    }

    if (!context.df.isReplaying) {
      logger.info(`Waiting for any of the tasks to finish.`);
    }

    const finishedTask: Task = yield context.df.Task.any(tasksToWait);

    if (getProductsTask === finishedTask) {
      if (!context.df.isReplaying) {
        logger.info('Get Products Task Finished.');
      }

      if (!finishedTask.isCompleted) {
        if (!context.df.isReplaying) {
          logger.warn(`Get Products Task Failed...`);
        }
      }

      const result = finishedTask.result as GetNextProductsBatchResult;

      if (!context.df.isReplaying) {
        logger.info(`Result=${JSON.stringify(result)}`);
      }

      lastCommerceId = result.lastCommerceId;

      if (!context.df.isReplaying) {
        logger.info('Starting new processing job activity.');
      }

      processingTasks.push(
        context.df.callActivityWithRetry('activity-process-products', new RetryOptions(1000, 1), {
          locale: params.locale,
          inputFilePath: result.filePath,
        })
      );

      if (result.hasMoreProducts) {
        if (!context.df.isReplaying) {
          logger.info('Starting getting new batch activity.');
        }

        batchNumber++;
        getProductsTask = context.df.callActivity('activity-get-next-products-batch', {
          lastCommerceId,
          batchNumber,
          orchestratorId: context.bindingData.instanceId,
        });
      } else {
        getProductsTask = null;
        if (!context.df.isReplaying) {
          logger.info('Finished products downloading from the Commercetools.');
        }
      }
    } else {
      if (!context.df.isReplaying) {
        logger.info(
          `Batch processing activity finished. Removing from the active list. Length=${processingTasks.length}`
        );
      }

      processingTasks = processingTasks.filter((task) => finishedTask !== task);

      if (!context.df.isReplaying) {
        logger.info(`LengthAfterFilter=${processingTasks.length}`);
      }

      processingResults.push(finishedTask.result as ProcessProductsResult);
    }
  }

  if (!context.df.isReplaying) {
    logger.info(`Finished processing :)`);
  }

  // then use processing results to merge the files together and do
});

Expected behavior

a) Task.any doesn't schedule a new task for an already launched task.
b) Task.any returns the valid result for the task rather than a result from a different task.

Actual behavior

a) Task.any schedules a new task for an already launched task.
b) Task.any returns an invalid result for the task.

Screenshots

See above in the bug description

Known workarounds
Maybe this could be worked around by treating each activity as a sub-orchestrator in "singleton" mode, where each of them has an ID, and then the parent somehow does similar logic with different sub-orchestrators...
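A minimal sketch of that workaround, assuming a deterministic instance-ID scheme derived from the parent orchestration. The ID format and the sub-orchestrator name below are made up for illustration, not SDK behavior.

```typescript
// Sketch of the "singleton sub-orchestrator" idea: give each batch
// fetch its own deterministic instance ID, so the parent can refer to
// a specific instance instead of implicitly starting a new one. The
// ID scheme and names are assumptions for this sketch.
function batchSubOrchestratorId(parentInstanceId: string, batchNumber: number): string {
  return `${parentInstanceId}:get-batch:${batchNumber}`;
}

// Inside the orchestrator this would look roughly like:
//   const subId = batchSubOrchestratorId(context.df.instanceId, batchNumber);
//   getProductsTask = context.df.callSubOrchestrator(
//     'sub-get-next-products-batch',
//     { lastCommerceId, batchNumber },
//     subId
//   );
```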

@Ayzrian
Author

Ayzrian commented Jan 24, 2024

Hey, @castrodd, could you please take a look?

Sorry for pinging you, but I see some recent activity from you on this repository.

Thanks.

@Ayzrian
Author

Ayzrian commented Jan 24, 2024

Or maybe @davidmrdavid , you could take a look at this one?

P.S. Sorry for pinging you :)

Thanks

@bachuv bachuv added question Further information is requested P2 Priority 2 item and removed Needs: Triage 🔍 labels Jan 30, 2024
@castrodd
Member

castrodd commented Feb 5, 2024

@Ayzrian I will take a look soon and get back to you. Thank you for your patience!

@Ayzrian
Author

Ayzrian commented Feb 22, 2024

Hey, @castrodd I wonder if you have any updates for it?

@castrodd castrodd self-assigned this Mar 28, 2024
@castrodd castrodd added the Needs: Investigation 🔍 A deeper investigation needs to be done by the project maintainers. label Mar 28, 2024
@Ayzrian
Author

Ayzrian commented Apr 1, 2024

Hey team,

It has been more than a month already, do we have any updates for this issue?

Best Regards

@davidmrdavid
Collaborator

Hi @Ayzrian - apologies for the delay here, it's been busy. I'll coordinate internally with @castrodd to help debug this one.

There's a lot to grok here, but I think I recognize, from the Python SDK, the first behavior you mentioned: that a task inside a Task.any may be launched twice under certain conditions, for example when provided again in a newly constructed Task.any, which seems to be what your code is doing.

This bug is in part a technical limitation of the legacy protocol the JS SDK is using to communicate with the C# Durable Functions code - the inter-process/out-of-process protocol does not have a notion of "Task ID", meaning the C# Durable Functions code is not able to filter out repeated sub-tasks in a TaskAny and instead believes all sub-tasks are brand new Task scheduling requests.

The latest out-of-process protocol handles this edge case better, but incorporating it will take substantial effort, so that is not a short-term fix. One way to fix this is to filter out, on the SDK side, any Task.any sub-tasks that have already been scheduled from being sent over the protocol (as otherwise they'll be re-scheduled), but that creates other edge cases. Still, from our experience in the Python SDK, I've come to believe that performing this SDK-side filtering is the better trade-off, and that the new edge cases it introduces are less likely to be encountered by users. I'll work with @castrodd to develop that fix.
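The SDK-side filtering described here could be sketched as a dedup step before tasks are sent over the wire. The shapes and field names below are purely illustrative; the real SDK internals differ.

```typescript
// Hypothetical sketch of the SDK-side mitigation: before sending a
// Task.any over the legacy protocol, filter out sub-tasks that were
// already scheduled in an earlier yield, so the host does not treat
// them as brand-new scheduling requests. Field names are illustrative.
interface SubTask {
  id: number;
  alreadyScheduled: boolean;
}

function tasksToSchedule(subTasks: SubTask[]): SubTask[] {
  // Only never-before-seen tasks go over the wire; in-flight tasks
  // are awaited locally instead of being re-sent (and re-launched).
  return subTasks.filter((t) => !t.alreadyScheduled);
}
```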

All that said, I don't quite understand your second bug report:

b) Task.any returns an invalid result for the task

Can you break this down further? A small example with concrete inputs and outputs would help me understand. Unfortunately, I can't download the attached pictures, and zooming in with the browser distorts them, so I can't review the logs you shared.

Thanks!

@castrodd castrodd removed their assignment Aug 11, 2024