Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: idempotency support #69

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/inngest/error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ end
defmodule Inngest.ConcurrencyConfigError do
defexception [:message]
end

defmodule Inngest.IdempotencyConfigError do
defexception [:message]
end
28 changes: 12 additions & 16 deletions lib/inngest/function.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,31 +168,27 @@ defmodule Inngest.Function do
|> maybe_debounce()
|> maybe_batch_events()
|> maybe_rate_limit()
|> maybe_idempotency()
|> maybe_concurrency()
] ++ handler
end

defp retries(), do: fn_opts() |> Map.get(:retries)

defp maybe_debounce(config) do
fn_opts()
|> Inngest.FnOpts.validate_debounce(config)
end
defp maybe_debounce(config),
do: fn_opts() |> Inngest.FnOpts.validate_debounce(config)

defp maybe_batch_events(config) do
fn_opts()
|> Inngest.FnOpts.validate_batch_events(config)
end
defp maybe_batch_events(config),
do: fn_opts() |> Inngest.FnOpts.validate_batch_events(config)

defp maybe_rate_limit(config) do
fn_opts()
|> Inngest.FnOpts.validate_rate_limit(config)
end
defp maybe_rate_limit(config),
do: fn_opts() |> Inngest.FnOpts.validate_rate_limit(config)

defp maybe_concurrency(config) do
fn_opts()
|> Inngest.FnOpts.validate_concurrency(config)
end
defp maybe_idempotency(config),
do: fn_opts() |> Inngest.FnOpts.validate_idempotency(config)

defp maybe_concurrency(config),
do: fn_opts() |> Inngest.FnOpts.validate_concurrency(config)

defp fn_opts() do
case __MODULE__.__info__(:attributes) |> Keyword.get(:func) |> List.first() do
Expand Down
30 changes: 26 additions & 4 deletions lib/inngest/function/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Inngest.FnOpts do
:debounce,
:batch_events,
:rate_limit,
:idempotency,
:concurrency,
retries: 3
]
Expand All @@ -22,6 +23,7 @@ defmodule Inngest.FnOpts do
debounce: debounce() | nil,
batch_events: batch_events() | nil,
rate_limit: rate_limit() | nil,
idempotency: idempotency() | nil,
concurrency: concurrency() | nil
}

Expand All @@ -41,6 +43,8 @@ defmodule Inngest.FnOpts do
key: binary() | nil
}

@type idempotency() :: binary()

@type concurrency() ::
number()
| concurrency_option()
Expand All @@ -54,7 +58,7 @@ defmodule Inngest.FnOpts do
@concurrency_scopes ["fn", "env", "account"]

@doc """
Validate the debounce configuration
Validate the debounce settings
"""
@spec validate_debounce(t(), map()) :: map()
def validate_debounce(fnopts, config) do
Expand Down Expand Up @@ -86,7 +90,7 @@ defmodule Inngest.FnOpts do
end

@doc """
Validate the event batch config
Validate the event batch settings
"""
@spec validate_batch_events(t(), map()) :: map()
def validate_batch_events(fnopts, config) do
Expand Down Expand Up @@ -121,7 +125,7 @@ defmodule Inngest.FnOpts do
end

@doc """
Validate the rate limit config
Validate the rate limit settings
"""
@spec validate_rate_limit(t(), map()) :: map()
def validate_rate_limit(fnopts, config) do
Expand Down Expand Up @@ -155,7 +159,25 @@ defmodule Inngest.FnOpts do
end

@doc """
Validate the concurrency config
Validate the idempotency settings
"""
def validate_idempotency(fnopts, config) do
# NOTE: nothing really to validate, just have this for the sake of consistency
case fnopts |> Map.get(:idempotency) do
nil ->
config

setting ->
if !is_binary(setting) do
raise Inngest.IdempotencyConfigError, message: "idempotency must be a CEL string"
end

Map.put(config, :idempotency, setting)
end
end

@doc """
Validate the concurrency settings
"""
@spec validate_concurrency(t(), map()) :: map()
def validate_concurrency(fnopts, config) do
Expand Down
53 changes: 53 additions & 0 deletions test/inngest/function/cases/idempotency_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
defmodule Inngest.Function.Cases.IdempotencyTest do
use ExUnit.Case, async: true

alias Inngest.Test.DevServer
import Inngest.Test.Helper

@default_sleep 3_000

@tag :integration
test "should only run 1 out of 10" do
event_ids =
Enum.map(1..10, fn _ -> send_test_event("test/plug.idempotency", %{foobar: false}) end)

Process.sleep(@default_sleep)

fn_runs =
event_ids
|> Enum.map(fn id ->
{:ok, %{"data" => data}} = DevServer.run_ids(id)

if Enum.count(data) == 1 do
assert [
%{
"output" => "Done",
"status" => "Completed",
"run_id" => run_id
}
] = data

run_id
else
nil
end
end)
|> Enum.filter(&(!is_nil(&1)))

assert Enum.count(fn_runs) == 1

# sending with a different value will run
other = send_test_event("test/plug.idempotency", %{foobar: "hello"})
Process.sleep(@default_sleep)

assert {:ok,
%{
"data" => [
%{
"output" => "Done",
"status" => "Completed"
}
]
}} = DevServer.run_ids(other)
end
end
4 changes: 3 additions & 1 deletion test/inngest/function/cases/rate_limit_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Inngest.Function.Cases.RateLimitTest do
@default_sleep 10_000

@tag :integration
test "should only run 2 out of 10" do
test "should <= 2 out of 10" do
event_ids = Enum.map(1..10, fn _ -> send_test_event("test/plug.ratelimit") end)

Process.sleep(@default_sleep)
Expand All @@ -25,6 +25,8 @@ defmodule Inngest.Function.Cases.RateLimitTest do
"run_id" => run_id
}
] = data

run_id
else
nil
end
Expand Down
24 changes: 24 additions & 0 deletions test/inngest/function/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,30 @@ defmodule Inngest.FnOptsTest do
end
end

describe "validate_idempotency/2" do
@fn_opts %FnOpts{
id: "foobar",
name: "Foobar",
idempotency: "event.data.foobar"
}

test "should succeed with valid settings" do
assert %{
idempotency: "event.data.foobar"
} = FnOpts.validate_idempotency(@fn_opts, @config)
end

test "should raise if value is not string" do
opts = %{@fn_opts | idempotency: false}

assert_raise Inngest.IdempotencyConfigError,
"idempotency must be a CEL string",
fn ->
FnOpts.validate_idempotency(opts, @config)
end
end
end

describe "validate_concurrency/2" do
@fn_opts %FnOpts{
id: "foobar",
Expand Down
1 change: 0 additions & 1 deletion test/support/cases/concurrency_fn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ defmodule Inngest.Test.Case.ConcurrencyFn do
@trigger %Trigger{event: "test/plug.throttle"}

@impl true
@spec exec(any, %{:step => atom, optional(any) => any}) :: {:ok, <<_::72>>}
def exec(ctx, %{step: step} = _args) do
_ =
step.run(ctx, "wait", fn ->
Expand Down
18 changes: 18 additions & 0 deletions test/support/cases/idempotent_fn.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Inngest.Test.Case.IdempotentFn do
@moduledoc false

use Inngest.Function
alias Inngest.{FnOpts, Trigger}

@func %FnOpts{
id: "idempotent-fn",
name: "Idempotent Function",
idempotency: "event.data.foobar"
}
@trigger %Trigger{event: "test/plug.idempotency"}

@impl true
def exec(_ctx, _args) do
{:ok, "Done"}
end
end
Loading