diff --git a/.env.example b/.env.example index cf50fee..ef40b06 100644 --- a/.env.example +++ b/.env.example @@ -22,9 +22,6 @@ REDIS_URL="redis://redis:6379" # Skips environment validation in ~/src/env.js Used for builds #SKIP_ENV_VALIDATION=true -# Disables workflow instrumentation -#NO_WORKER=true - #CONFIGS # Sentry diff --git a/README.md b/README.md index 93dbdd9..8684855 100644 --- a/README.md +++ b/README.md @@ -114,7 +114,6 @@ To get started with Flowify, follow these steps: | SPOTIFY_CLIENT_ID | Spotify API client ID | | SPOTIFY_CLIENT_SECRET | Spotify API client secret | | REDIS_URL | Redis connection URL | - | NO_WORKER | Disables built-in workflow runner | 6. **Push the schema to your database:** diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..48ff643 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,33 @@ +services: + flowify: + build: + context: . + dockerfile: Dockerfile + ports: + - "3000:3000" + environment: + - NODE_ENV=production + - DATABASE_URL=${DATABASE_URL} + - REDIS_URL=${REDIS_URL} + - SPOTIFY_CLIENT_ID=${SPOTIFY_CLIENT_ID} + - SPOTIFY_CLIENT_SECRET=${SPOTIFY_CLIENT_SECRET} + - SKIP_ENV_VALIDATION=true + - NEXT_AUTH_SECRET=${NEXT_AUTH_SECRET} + - NEXTAUTH_URL=${NEXTAUTH_URL} + - TZ=Asia/Jakarta + env_file: + - .env + + flowify-worker: + build: + context: . + dockerfile: worker/Dockerfile + environment: + - NODE_ENV=production + - DATABASE_URL=${DATABASE_URL} + - REDIS_URL=${REDIS_URL} + - SPOTIFY_CLIENT_ID=${SPOTIFY_CLIENT_ID} + - SPOTIFY_CLIENT_SECRET=${SPOTIFY_CLIENT_SECRET} + - TZ=Asia/Jakarta + env_file: + - .env \ No newline at end of file diff --git a/next.config.js b/next.config.js index c44dfea..ed99799 100644 --- a/next.config.js +++ b/next.config.js @@ -5,7 +5,7 @@ import { withSentryConfig } from "@sentry/nextjs"; const config = { experimental: { - instrumentationHook: process.env.NO_WORKER ? false : true, + instrumentationHook: true }, output: process.env.STANDALONE_OUTPUT ? "standalone" : undefined, images: { diff --git a/src/app/playground/page.tsx b/src/app/playground/page.tsx deleted file mode 100644 index 0f4851c..0000000 --- a/src/app/playground/page.tsx +++ /dev/null @@ -1,140 +0,0 @@ -"use client"; - -import { Button } from "@/components/ui/button"; -import { - Card, - CardContent, - CardFooter, - CardHeader, - CardTitle, -} from "@/components/ui/card"; -import { Input } from "@/components/ui/input"; -import { Label } from "@/components/ui/label"; -import { useEffect, useState } from "react"; - -const CronExpressionGenerator = () => { - const [schedule, setSchedule] = useState("daily"); - const [hour, setHour] = useState("0"); - const [minute, setMinute] = useState("0"); - const [dayOfWeek, setDayOfWeek] = useState("1"); - const [dayOfMonth, setDayOfMonth] = useState("1"); - const [cronExpression, setCronExpression] = useState(""); - - const generateCronExpression = () => { - let expression = ""; - switch (schedule) { - case "daily": - expression = `${minute} ${hour} * * *`; - break; - case "weekly": - expression = `${minute} ${hour} * * ${dayOfWeek}`; - break; - case "monthly": - expression = `${minute} ${hour} ${dayOfMonth} * *`; - break; - default: - expression = "* * * * *"; - } - setCronExpression(expression); - }; - - useEffect(() => { - generateCronExpression(); - }, [schedule, hour, minute, dayOfWeek, dayOfMonth]); - - return ( - - - Cron Expression Generator - - -
-
- - -
- -
-
- - setHour(e.target.value)} - className="w-full mt-1" - /> -
-
- - setMinute(e.target.value)} - className="w-full mt-1" - /> -
-
- - {schedule === "weekly" && ( -
- - setDayOfWeek(e.target.value)} - className="w-full mt-1" - /> -
- )} - - {schedule === "monthly" && ( -
- - setDayOfMonth(e.target.value)} - className="w-full mt-1" - /> -
- )} -
-
- - - - - -
- ); -}; - -export default CronExpressionGenerator; diff --git a/src/instrumentation.ts b/src/instrumentation.ts index 3586aff..10aaace 100644 --- a/src/instrumentation.ts +++ b/src/instrumentation.ts @@ -1,62 +1,9 @@ -import { v4 as uuid } from "uuid"; -import { env } from "~/env"; -import { Runner } from "~/lib/workflow/Workflow"; -import { getAccessTokenFromUserId } from "~/server/db/helper"; - -export const register = async () => { - //This if statement is important, read here: https://nextjs.org/docs/app/building-your-application/optimizing/instrumentation - if (process.env.NEXT_RUNTIME === "nodejs" && !process.env.NO_WORKER) { - const hostname = (await import("os")).hostname(); - const WORKER_ID = - hostname + - "-" + - `${process.env.WORKER_ID ?? `instrumentation-${uuid()}`}`; - console.info("Registering worker"); - console.info("Worker ID", WORKER_ID); - const { Worker } = await import("bullmq"); - const Redis = (await import("ioredis")).default; - const updateWorkflowRun = ( - await import("~/lib/workflow/utils/workflowQueue") - ).updateWorkflowRun; - const connection = new Redis(env.REDIS_URL, { - maxRetriesPerRequest: null, - }); +export async function register() { + if (process.env.NEXT_RUNTIME === "nodejs") { + await import("../sentry.server.config"); + } - new Worker( - "workflowQueue", - async (job) => { - const data = job?.data; - await updateWorkflowRun(job.id!, "active", WORKER_ID); - if (!data) { - throw new Error("No data found in job"); - } - const accessToken = await getAccessTokenFromUserId( - data.userId as string, - ); - const runner = new Runner({ - slug: data.userId, - access_token: accessToken, - }); - const workflow = data.workflow as Workflow.WorkflowObject; - let res: any; - try { - res = await runner.runWorkflow(workflow); - } catch (e) { - await updateWorkflowRun(job.id!, "failed", WORKER_ID); - console.error("Error running workflow", e); - throw e; - } - await updateWorkflowRun(job.id!, "completed", WORKER_ID); - return res.map((obj: any) => obj.track.id); - }, - { - connection, - concurrency: 5, - removeOnComplete: { count: 1000 }, - removeOnFail: { count: 5000 }, - }, - ); - return; + if (process.env.NEXT_RUNTIME === "edge") { + await import("../sentry.edge.config"); } - console.info("Not registering worker"); -}; +} diff --git a/src/lib/cron.ts b/src/lib/cron.ts new file mode 100644 index 0000000..6236752 --- /dev/null +++ b/src/lib/cron.ts @@ -0,0 +1,63 @@ +import parser from "cron-parser"; + +/** + * Parses a cron expression and returns the schedule details. + * + * @param cronExpression - The cron expression to parse. + * @returns An object containing the schedule details. + */ +export const getScheduleFromCronExpression = (cronExpression) => { + if (!cronExpression || cronExpression === "unset") { + return { + interval: "unset", + scheduleTime: new Date(), + dayOfWeek: "*", + dayOfMonth: "*", + }; + } + + const [minute, hour, dayOfMonth, month, dayOfWeek] = + cronExpression.split(" "); + let interval = + dayOfWeek !== "*" && dayOfMonth === "*" + ? "weekly" + : dayOfMonth !== "*" && dayOfWeek === "*" + ? "monthly" + : dayOfMonth === "1" && month === "1" + ? "yearly" + : "daily"; + const scheduleTime = new Date(); + scheduleTime.setHours(parseInt(hour, 10)); + scheduleTime.setMinutes(parseInt(minute, 10)); + + return { + interval, + scheduleTime, + dayOfWeek: dayOfWeek === "*" ? "0" : dayOfWeek, + dayOfMonth: dayOfMonth === "*" ? "1" : dayOfMonth, + }; +}; + +/** + * Gets the next run time and the minutes until the next run based on a cron expression. + * @param cronExpression - The cron expression to parse. + * @returns An object containing the next run time and the minutes until the next run. + */ +export function getNextRunInfo(cronExpression: string) { + try { + const interval = parser.parseExpression(cronExpression); + const nextRun = interval.next().toDate(); + const now = new Date(); + const minutesUntilNextRun = Math.ceil( + (nextRun.getTime() - now.getTime()) / 60000, + ); + + return { + nextRun, + minutesUntilNextRun, + }; + } catch (err) { + console.error("Error parsing cron expression:", err); + return null; + } +} diff --git a/src/lib/workflow/utils/workflowQueue.ts b/src/lib/workflow/utils/workflowQueue.ts index 757a59e..a52fe1d 100644 --- a/src/lib/workflow/utils/workflowQueue.ts +++ b/src/lib/workflow/utils/workflowQueue.ts @@ -5,6 +5,7 @@ import { eq } from "drizzle-orm"; import Redis from "ioredis"; import { v4 as uuidv4 } from "uuid"; import { env } from "~/env"; +import { getNextRunInfo } from "~/lib/cron"; import { db } from "~/server/db"; import { workflowJobs, @@ -29,6 +30,79 @@ export const workflowQueue = new Queue("workflowQueue", { }, }); +async function syncRepeatable(job: any, userId: string) { + const workflow = job.data.workflow; + + const userPlan = await db.query.users.findFirst({ + where: (users, { eq }) => eq(users.id, userId), + columns: { + id: true, + planId: true, + }, + with: { + plan: { + columns: { + maxExecutionTime: true, + maxOperations: true, + }, + }, + }, + }); + + if (!userPlan) { + throw new Error("User has no plan"); + } + + if (userPlan.plan?.maxExecutionTime === 0) { + throw new Error("Plan does not allow execution"); + } + + if (workflow.operations.length > (userPlan.plan?.maxOperations || 0)) { + throw new Error("Plan does not allow this many operations"); + } + + try { + await workflowQueue.removeRepeatableByKey(job.data.workflow.id); + log.info("Deleted repeatable job", job.data.workflow.id); + } catch (err) { + log.info("Error deleting repeatable job", err); + } + + try { + const newJob = await workflowQueue.add( + "workflowQueue", + { workflow, userId, maxExecutionTime: userPlan?.plan?.maxExecutionTime }, + { + jobId: job.data.workflow.id, + repeat: { + pattern: job.data.cron, + key: job.data.workflow.id, + }, + }, + ); + + const runInfo = getNextRunInfo(job.data.cron); + if (runInfo) { + const { nextRun, minutesUntilNextRun } = runInfo; + log.info("Created repeatable job", { + jobId: newJob.id, + cron: job.data.cron, + nextRun, + now: new Date(), + minutesUntilNextRun, + }); + } else { + log.info("Created repeatable job", { + jobId: newJob.id, + cron: job.data.cron, + }); + } + } catch (error) { + log.error("Error creating repeatable job", error); + throw error; + } +} + /** * Stores a workflow job in the database. * @param userId - The ID of the user associated with the job. @@ -47,6 +121,9 @@ export async function storeWorkflowJob(userId: string, job: any) { const res = await db.query.workflowJobs.findFirst({ where: (workflowJobs, { eq }) => eq(workflowJobs.id, job.id as string), }); + if (job.data.cron && job.data.cron !== "unset") { + await syncRepeatable(job, userId); + } return res; } @@ -56,7 +133,7 @@ export async function storeWorkflowJob(userId: string, job: any) { * @param job - The job object containing the updated workflow job information. * @returns A Promise that resolves to the updated workflow job. */ -export async function updateWorkflowJob(_userId: string, job: any) { +export async function updateWorkflowJob(userId: string, job: any) { log.info("Updating workflow job", job); await db .update(workflowJobs) @@ -69,6 +146,9 @@ export async function updateWorkflowJob(_userId: string, job: any) { const res = await db.query.workflowJobs.findFirst({ where: (workflowJobs, { eq }) => eq(workflowJobs.id, job.id as string), }); + if (job.data.cron && job.data.cron !== "unset") { + await syncRepeatable(job, userId); + } return res; } @@ -256,39 +336,43 @@ export async function updateWorkflowRun( throw new Error("Job not found"); } + // Determine status if not provided if (!status) { - if (job.finishedOn) { - status = "completed"; - } else if (job.stacktrace) { - status = "failed"; - } else if (job.processedOn) { - status = "active"; - } else if (job.delay) { - status = "delayed"; - } + status = job.finishedOn + ? "completed" + : job.stacktrace + ? "failed" + : job.processedOn + ? "active" + : job.delay + ? "delayed" + : status; } - const finished = ["completed", "failed", "cancelled"].includes(status!); - - let completedAt; - if (finished) { - completedAt = new Date(); - } + const finished = ["completed", "failed", "cancelled"].includes( + status ?? "", + ); + const completedAt = finished ? new Date() : null; + // Compress return values if present if (returnValues?.length > 0) { returnValues = compressReturnValues(returnValues); } + const updateData = { + status: status ?? null, + error: job.failedReason ?? null, + completedAt, + workerId: workerId ?? null, + prevState: prevState ? JSON.stringify(prevState) : null, + returnValues: returnValues ? JSON.stringify(returnValues) : null, + }; + + log.info("Updating workflow run with data:", updateData); + await db .update(workflowRuns) - .set({ - status: status, - error: job.failedReason, - completedAt: completedAt, - workerId: workerId, - prevState: JSON.stringify(prevState), - returnValues: JSON.stringify(returnValues), - }) + .set(updateData) .where(eq(workflowRuns.id, jobId)); return "updated"; } catch (err) { diff --git a/worker/Dockerfile b/worker/Dockerfile index 3f14cea..9daaa3c 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -3,7 +3,7 @@ WORKDIR /app ENV NODE_ENV="production" FROM base as deps -RUN bun add postgres radash bullmq drizzle-orm ioredis spotify-web-api-node @t3-oss/env-nextjs lodash +RUN bun add postgres radash bullmq drizzle-orm ioredis spotify-web-api-node @t3-oss/env-nextjs lodash cron-parser FROM base as build COPY --from=deps /app/node_modules ./node_modules diff --git a/worker/worker.ts b/worker/worker.ts index 6097a45..f99660a 100644 --- a/worker/worker.ts +++ b/worker/worker.ts @@ -13,7 +13,7 @@ import { Logger } from "@lib/log"; import { db } from "@/server/db"; import { workerPool } from "~/server/db/schema"; import { eq } from "drizzle-orm"; -import { Base } from '../src/lib/workflow/Base'; +import { Base } from "../src/lib/workflow/Base"; const log = new Logger("worker"); @@ -47,39 +47,60 @@ const worker = new Worker( "workflowQueue", async (job, done) => { const data = job?.data; - const maxExecutionTime = data.maxExecutionTime || 60000; - await reportWorking(); - await updateWorkflowRun(job.id!, "active", WORKER_ID); + log.info("Running job", { jobId: job.id, data }); + if (!data) { throw new Error("No data found in job"); } + const maxExecutionTime = data.maxExecutionTime || 60000; + await reportWorking(); + + if (!job.id) { + throw new Error("Job ID is undefined"); + } + if (!WORKER_ID) { + throw new Error("Worker ID is undefined"); + } + + await updateWorkflowRun(job.id, "active", WORKER_ID); + const accessToken = await getAccessTokenFromUserId(data.userId as string); if (!accessToken) { await reportIdle(); - throw new Error("no access token"); + throw new Error("No access token"); } + const runner = new Runner({ slug: data.userId, access_token: accessToken, }); + const workflow = data.workflow as Workflow.WorkflowObject; - const operationCallback = createOperationCallback(job.id!); + const operationCallback = createOperationCallback(job.id); let res: any; + try { - log.info("Running workflow..." , { + log.info("Running workflow...", { workflowId: workflow.id, job: job.id, userId: data.userId, maxExecutionTime, numOfOperations: workflow.operations.length, }); - res = await runner.runWorkflow(workflow, maxExecutionTime, operationCallback); + + res = await runner.runWorkflow( + workflow, + maxExecutionTime, + operationCallback, + ); + res = compressReturnValues(res); } catch (e) { log.error("Error running workflow", e); throw e; } + log.info("Workflow executed successfully"); return res; }, @@ -92,6 +113,7 @@ const worker = new Worker( }, ); +// eslint-disable-next-line @typescript-eslint/no-misused-promises worker.on("completed", async (job) => { log.info(`Job ${job.id} completed`); await updateWorkflowRun(job.id!, "completed", WORKER_ID); @@ -102,6 +124,7 @@ worker.on("drained", () => { reportIdle(); }); +// eslint-disable-next-line @typescript-eslint/no-misused-promises worker.on("failed", async (job, err) => { log.error(`Job ${job?.id} failed`, err); if (job) { @@ -123,27 +146,21 @@ function createOperationCallback(workflowRunId: string) { async function reportInit() { log.info("Worker started"); - let i = 0; - await db - .insert(workerPool) - .values({ - deviceHash: WORKER_ID, - concurrency: CONCURRENCY, - threads: os.cpus().length, - status: "idle", - endpoint: WORKER_ENDPOINT, - }) - .onConflictDoUpdate({ - target: workerPool.deviceHash, - set: { - deviceHash: WORKER_ID, - concurrency: CONCURRENCY, - threads: os.cpus().length, - status: "idle", - endpoint: WORKER_ENDPOINT, - }, - }); + const workerData = { + deviceHash: WORKER_ID, + concurrency: CONCURRENCY, + threads: os.cpus().length, + status: "idle", + endpoint: WORKER_ENDPOINT, + }; + + log.info("Worker data for initialization:", workerData); + + await db.insert(workerPool).values(workerData).onConflictDoUpdate({ + target: workerPool.deviceHash, + set: workerData, + }); const updatedWorker = await db.query.workerPool.findFirst({ where: (workerPool, { eq }) => eq(workerPool.deviceHash, WORKER_ID), @@ -154,33 +171,45 @@ async function reportInit() { } async function reportExit() { + const workerData = { + status: "sleeping", + endpoint: WORKER_ENDPOINT, + }; + + log.info("Worker data for exit:", workerData); + await db .update(workerPool) - .set({ - status: "sleeping", - endpoint: WORKER_ENDPOINT, - }) + .set(workerData) .where(eq(workerPool.deviceHash, WORKER_ID)); log.info("Worker exited"); } async function reportWorking() { + const workerData = { + status: "working", + endpoint: WORKER_ENDPOINT, + }; + + log.info("Worker data for working:", workerData); + await db .update(workerPool) - .set({ - status: "working", - endpoint: WORKER_ENDPOINT, - }) + .set(workerData) .where(eq(workerPool.deviceHash, WORKER_ID)); } async function reportIdle() { + const workerData = { + status: "idle", + endpoint: WORKER_ENDPOINT, + }; + + log.info("Worker data for idle:", workerData); + await db .update(workerPool) - .set({ - status: "idle", - endpoint: WORKER_ENDPOINT, - }) + .set(workerData) .where(eq(workerPool.deviceHash, WORKER_ID)); }