Skip to content

Commit

Permalink
Add cancelOn configuration (#82)
Browse files Browse the repository at this point in the history
- followed similar pattern to `InngestFunctionTriggers` but didn't reuse
  that class for a few reasons
  - `if` concept is similar but for triggers it gets serialized as `expression` but is `if` for cancel
  - `timeout` is only used by cancel and not for triggers and vice versa with `cron`, so it seems the `InngestFunctionTrigger` class's fields would be sparsely populated for all the concrete cases and not provide too much value in sharing code
  - While I like the idea of calling the "thing" that sets off Cancel as "CancelTrigger" or "CancellationTrigger", I thought this could be confusing since [triggers are their own key under configuration](https://github.com/inngest/inngest/blame/0ac11f2c312c066e517e53749052dd89fd2926ba/docs/SDK_SPEC.md#L478) 
- `Cancellation` name avoided "trigger" for reason above and followed pattern in https://github.com/inngest/inngest-js/blob/0e51903d5968a7287dd7e518bd5cc8acec3e6f3e/packages/inngest/src/types.ts#L901
- I originally was going to implement with `match` as well per https://www.inngest.com/docs/reference/typescript/functions/cancel-on, but I saw later that [it's currently deprecated so I stuck to `if` only](https://github.com/inngest/inngest-js/blob/0e51903d5968a7287dd7e518bd5cc8acec3e6f3e/packages/inngest/src/types.ts#L946). 
- Per the flexibility of timeout in https://github.com/inngest/inngest/blob/0ac11f2c312c066e517e53749052dd89fd2926ba/docs/SDK_SPEC.md?plain=1#L580-L583 The type for timeout is either `Duration` or `Instant`.
  • Loading branch information
albertchae authored Sep 13, 2024
1 parent 37aabd7 commit ccddba0
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.FunctionContext;
import com.inngest.InngestFunction;
import com.inngest.InngestFunctionConfigBuilder;
import com.inngest.Step;
import org.jetbrains.annotations.NotNull;

public class CancelOnEventFunction extends InngestFunction {

@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("cancelable-fn")
.name("Cancelable Function")
.cancelOn("cancel/cancelable")
.triggerEvent("test/cancelable");
}

@Override
public String execute(FunctionContext ctx, Step step) {
step.waitForEvent("wait-forever",
"test/waiting-for-godot",
"10m",
null);

return "I didn't get canceled";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.inngest.springbootdemo.testfunctions;

import com.inngest.FunctionContext;
import com.inngest.InngestFunction;
import com.inngest.InngestFunctionConfigBuilder;
import com.inngest.Step;
import org.jetbrains.annotations.NotNull;


public class CancelOnMatchFunction extends InngestFunction {

@NotNull
@Override
public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) {
return builder
.id("cancel-on-match-fn")
.name("Cancel On Match Function")
.cancelOn("cancel/cancel-on-match", "event.data.userId == async.data.userId")
.triggerEvent("test/cancel-on-match");
}

@Override
public String execute(FunctionContext ctx, Step step) {
step.waitForEvent("wait-forever",
"test/waiting-for-godot",
"10m",
null);

return "I didn't get canceled";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.inngest.springbootdemo;

import com.inngest.Inngest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;

@IntegrationTest
@Execution(ExecutionMode.CONCURRENT)
class CancellationIntegrationTest {
@Autowired
private DevServerComponent devServer;

@Autowired
private Inngest client;

@Test
void testCancelOnEventOnly() throws Exception {
String event = InngestFunctionTestHelpers.sendEvent(client, "test/cancelable").getIds()[0];
Thread.sleep(1000);
InngestFunctionTestHelpers.sendEvent(client, "cancel/cancelable");
Thread.sleep(1000);

RunEntry<Object> run = devServer.runsByEvent(event).first();

assertEquals("Cancelled", run.getStatus());
}

@Test
void testCancelOnIf() throws Exception {
String user23Event = InngestFunctionTestHelpers.sendEvent(client, "test/cancel-on-match", Collections.singletonMap("userId", "23")).getIds()[0];
String user42Event = InngestFunctionTestHelpers.sendEvent(client, "test/cancel-on-match", Collections.singletonMap("userId", "42")).getIds()[0];
Thread.sleep(1000);
InngestFunctionTestHelpers.sendEvent(client, "cancel/cancel-on-match", Collections.singletonMap("userId", "42"));
Thread.sleep(1000);

// Only the event matching the if expression is canceled
assertEquals("Running", devServer.runsByEvent(user23Event).first().getStatus());
assertEquals("Cancelled", devServer.runsByEvent(user42Event).first().getStatus());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ protected HashMap<String, InngestFunction> functions() {
addInngestFunction(functions, new MultiplyMatrixFunction());
addInngestFunction(functions, new WithOnFailureFunction());
addInngestFunction(functions, new LoopFunction());
addInngestFunction(functions, new CancelOnEventFunction());
addInngestFunction(functions, new CancelOnMatchFunction());

return functions;
}
Expand Down
2 changes: 2 additions & 0 deletions inngest/src/main/kotlin/com/inngest/Function.kt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ internal class InternalFunctionConfig
@Json(serializeNull = false)
val idempotency: String? = null,
@Json(serializeNull = false)
val cancel: MutableList<Cancellation>? = null,
@Json(serializeNull = false)
val batchEvents: BatchEvents? = null,
val steps: Map<String, StepConfig>,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.beust.klaxon.Json
import com.beust.klaxon.JsonValue
import com.beust.klaxon.KlaxonException
import java.time.Duration
import java.time.Instant

// TODO: Throw illegal argument exception
class InngestFunctionConfigBuilder {
Expand All @@ -18,6 +19,7 @@ class InngestFunctionConfigBuilder {
private var debounce: Debounce? = null
private var priority: Priority? = null
private var idempotency: String? = null
private var cancel: MutableList<Cancellation>? = null
private var batchEvents: BatchEvents? = null

/**
Expand Down Expand Up @@ -84,6 +86,68 @@ class InngestFunctionConfigBuilder {
return this
}

/**
* Define events that can be used to cancel a running or sleeping function
*
* @param event The name of the event that should cancel the function run.
* @param if The CEL expression that must evaluate to true in order to cancel the function run. There
* are two variables available in this expression:
* - event, referencing the original function's event trigger
* - async, referencing the new cancel event.
* @param timeout An optional timeout specified as a Duration that the cancel is valid for. If this isn't
* specified, cancellation triggers are valid for up to a year or until the
* function ends.
*/
@JvmOverloads // Can only overload one of the cancelOn signatures because they would clash and not compile otherwise
fun cancelOn(
event: String,
`if`: String? = null,
timeout: Duration? = null,
): InngestFunctionConfigBuilder {
return cancelOn(
Cancellation(
event,
`if`,
timeout?.let { durationConverter.toJson(it) },
),
)
}

/**
* Define events that can be used to cancel a running or sleeping function
*
* @param event The name of the event that should cancel the function run.
* @param if The CEL expression that must evaluate to true in order to cancel the function run. There
* are two variables available in this expression:
* - event, referencing the original function's event trigger
* - async, referencing the new cancel event.
* @param timeout An optional timeout specified as an Instant that the cancel is valid until. If this isn't
* specified, cancellation triggers are valid for up to a year or until the
* function ends.
*/
fun cancelOn(
event: String,
`if`: String? = null,
timeout: Instant? = null,
): InngestFunctionConfigBuilder {
return cancelOn(
Cancellation(
event,
`if`,
timeout?.let { timeout.toString() },
),
)
}

internal fun cancelOn(cancellation: Cancellation): InngestFunctionConfigBuilder =
apply {
if (this.cancel == null) {
this.cancel = mutableListOf(cancellation)
} else {
this.cancel!!.add(cancellation)
}
}

/**
* Configure the function to be executed with batches of events (1 to n).
* Events will be added into a batch until the maxSize has been reached or
Expand Down Expand Up @@ -261,6 +325,7 @@ class InngestFunctionConfigBuilder {
debounce,
priority,
idempotency,
cancel,
batchEvents,
steps = buildSteps(serverUrl),
)
Expand Down Expand Up @@ -357,6 +422,16 @@ internal data class Priority
val run: String,
)

internal data class Cancellation
@JvmOverloads
constructor(
val event: String,
@Json(serializeNull = false)
val `if`: String? = null,
@Json(serializeNull = false)
val timeout: String? = null,
)

internal data class BatchEvents
@JvmOverloads
constructor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ abstract class InngestFunctionTrigger // or interface or data class
@Json(serializeNull = false) val event: String? = null,
@Json(serializeNull = false, name = "expression") val `if`: String? = null,
@Json(serializeNull = false) val cron: String? = null,
// IDEA - Add timeout and re-use for cancelOn?
)

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.inngest

import java.time.Duration
import java.time.Instant
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand Down Expand Up @@ -85,4 +87,23 @@ class InngestFunctionConfigBuilderTest {
.build("app-id", "https://mysite.com/api/inngest")
}
}

@Test
fun testCancelOnTimeout() {
val durationConfig =
InngestFunctionConfigBuilder()
.id("test-id")
.cancelOn("cancel", null, Duration.ofSeconds(6000))
.build("app-id", "https://mysite.com/api/inngest")

assertEquals<List<Cancellation>?>(listOf(Cancellation("cancel", null, "\"6000s\"")), durationConfig.cancel)

val instantConfig =
InngestFunctionConfigBuilder()
.id("test-id")
.cancelOn("cancel", null, Instant.ofEpochSecond(1726056053))
.build("app-id", "https://mysite.com/api/inngest")

assertEquals<List<Cancellation>?>(listOf(Cancellation("cancel", null, "2024-09-11T12:00:53Z")), instantConfig.cancel)
}
}

0 comments on commit ccddba0

Please sign in to comment.