Skip to content

Commit

Permalink
feat: scheduled workflow run worker, remove builtin worker (instrumen…
Browse files Browse the repository at this point in the history
…tation)
  • Loading branch information
radityaharya authored Aug 3, 2024
1 parent 630c751 commit d4b9099
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 270 deletions.
3 changes: 0 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ REDIS_URL="redis://redis:6379"
# Skips environment validation in ~/src/env.js Used for builds
#SKIP_ENV_VALIDATION=true

# Disables workflow instrumentation
#NO_WORKER=true

#CONFIGS

# Sentry
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ To get started with Flowify, follow these steps:
| SPOTIFY_CLIENT_ID | Spotify API client ID |
| SPOTIFY_CLIENT_SECRET | Spotify API client secret |
| REDIS_URL | Redis connection URL |
| NO_WORKER | Disables built-in workflow runner |

6. **Push the schema to your database:**

Expand Down
33 changes: 33 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
services:
flowify:
build:
context: .
dockerfile: Dockerfile
ports:
- "3000:3000"
environment:
- NODE_ENV=production
- DATABASE_URL=${DATABASE_URL}
- REDIS_URL=${REDIS_URL}
- SPOTIFY_CLIENT_ID=${SPOTIFY_CLIENT_ID}
- SPOTIFY_CLIENT_SECRET=${SPOTIFY_CLIENT_SECRET}
- SKIP_ENV_VALIDATION=true
- NEXT_AUTH_SECRET=${NEXT_AUTH_SECRET}
- NEXTAUTH_URL=${NEXTAUTH_URL}
- TZ=Asia/Jakarta
env_file:
- .env

flowify-worker:
build:
context: .
dockerfile: worker/Dockerfile
environment:
- NODE_ENV=production
- DATABASE_URL=${DATABASE_URL}
- REDIS_URL=${REDIS_URL}
- SPOTIFY_CLIENT_ID=${SPOTIFY_CLIENT_ID}
- SPOTIFY_CLIENT_SECRET=${SPOTIFY_CLIENT_SECRET}
- TZ=Asia/Jakarta
env_file:
- .env
2 changes: 1 addition & 1 deletion next.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { withSentryConfig } from "@sentry/nextjs";

const config = {
experimental: {
instrumentationHook: process.env.NO_WORKER ? false : true,
instrumentationHook: true
},
output: process.env.STANDALONE_OUTPUT ? "standalone" : undefined,
images: {
Expand Down
140 changes: 0 additions & 140 deletions src/app/playground/page.tsx

This file was deleted.

67 changes: 7 additions & 60 deletions src/instrumentation.ts
Original file line number Diff line number Diff line change
@@ -1,62 +1,9 @@
import { v4 as uuid } from "uuid";
import { env } from "~/env";
import { Runner } from "~/lib/workflow/Workflow";
import { getAccessTokenFromUserId } from "~/server/db/helper";

export const register = async () => {
//This if statement is important, read here: https://nextjs.org/docs/app/building-your-application/optimizing/instrumentation
if (process.env.NEXT_RUNTIME === "nodejs" && !process.env.NO_WORKER) {
const hostname = (await import("os")).hostname();
const WORKER_ID =
hostname +
"-" +
`${process.env.WORKER_ID ?? `instrumentation-${uuid()}`}`;
console.info("Registering worker");
console.info("Worker ID", WORKER_ID);
const { Worker } = await import("bullmq");
const Redis = (await import("ioredis")).default;
const updateWorkflowRun = (
await import("~/lib/workflow/utils/workflowQueue")
).updateWorkflowRun;
const connection = new Redis(env.REDIS_URL, {
maxRetriesPerRequest: null,
});
export async function register() {
if (process.env.NEXT_RUNTIME === "nodejs") {
await import("../sentry.server.config");
}

new Worker(
"workflowQueue",
async (job) => {
const data = job?.data;
await updateWorkflowRun(job.id!, "active", WORKER_ID);
if (!data) {
throw new Error("No data found in job");
}
const accessToken = await getAccessTokenFromUserId(
data.userId as string,
);
const runner = new Runner({
slug: data.userId,
access_token: accessToken,
});
const workflow = data.workflow as Workflow.WorkflowObject;
let res: any;
try {
res = await runner.runWorkflow(workflow);
} catch (e) {
await updateWorkflowRun(job.id!, "failed", WORKER_ID);
console.error("Error running workflow", e);
throw e;
}
await updateWorkflowRun(job.id!, "completed", WORKER_ID);
return res.map((obj: any) => obj.track.id);
},
{
connection,
concurrency: 5,
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
},
);
return;
if (process.env.NEXT_RUNTIME === "edge") {
await import("../sentry.edge.config");
}
console.info("Not registering worker");
};
}
63 changes: 63 additions & 0 deletions src/lib/cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import parser from "cron-parser";

/**
* Parses a cron expression and returns the schedule details.
*
* @param cronExpression - The cron expression to parse.
* @returns An object containing the schedule details.
*/
export const getScheduleFromCronExpression = (cronExpression) => {
if (!cronExpression || cronExpression === "unset") {
return {
interval: "unset",
scheduleTime: new Date(),
dayOfWeek: "*",
dayOfMonth: "*",
};
}

const [minute, hour, dayOfMonth, month, dayOfWeek] =
cronExpression.split(" ");
let interval =
dayOfWeek !== "*" && dayOfMonth === "*"
? "weekly"
: dayOfMonth !== "*" && dayOfWeek === "*"
? "monthly"
: dayOfMonth === "1" && month === "1"
? "yearly"
: "daily";
const scheduleTime = new Date();
scheduleTime.setHours(parseInt(hour, 10));
scheduleTime.setMinutes(parseInt(minute, 10));

return {
interval,
scheduleTime,
dayOfWeek: dayOfWeek === "*" ? "0" : dayOfWeek,
dayOfMonth: dayOfMonth === "*" ? "1" : dayOfMonth,
};
};

/**
* Gets the next run time and the minutes until the next run based on a cron expression.
* @param cronExpression - The cron expression to parse.
* @returns An object containing the next run time and the minutes until the next run.
*/
export function getNextRunInfo(cronExpression: string) {
try {
const interval = parser.parseExpression(cronExpression);
const nextRun = interval.next().toDate();
const now = new Date();
const minutesUntilNextRun = Math.ceil(
(nextRun.getTime() - now.getTime()) / 60000,
);

return {
nextRun,
minutesUntilNextRun,
};
} catch (err) {
console.error("Error parsing cron expression:", err);
return null;
}
}
Loading

0 comments on commit d4b9099

Please sign in to comment.