diff --git a/lib/inngest/error.ex b/lib/inngest/error.ex index 4e3505a..79ff1b5 100644 --- a/lib/inngest/error.ex +++ b/lib/inngest/error.ex @@ -21,3 +21,7 @@ end defmodule Inngest.ConcurrencyConfigError do defexception [:message] end + +defmodule Inngest.IdempotencyConfigError do + defexception [:message] +end diff --git a/lib/inngest/function.ex b/lib/inngest/function.ex index c391302..a660659 100644 --- a/lib/inngest/function.ex +++ b/lib/inngest/function.ex @@ -168,31 +168,27 @@ defmodule Inngest.Function do |> maybe_debounce() |> maybe_batch_events() |> maybe_rate_limit() + |> maybe_idempotency() |> maybe_concurrency() ] ++ handler end defp retries(), do: fn_opts() |> Map.get(:retries) - defp maybe_debounce(config) do - fn_opts() - |> Inngest.FnOpts.validate_debounce(config) - end + defp maybe_debounce(config), + do: fn_opts() |> Inngest.FnOpts.validate_debounce(config) - defp maybe_batch_events(config) do - fn_opts() - |> Inngest.FnOpts.validate_batch_events(config) - end + defp maybe_batch_events(config), + do: fn_opts() |> Inngest.FnOpts.validate_batch_events(config) - defp maybe_rate_limit(config) do - fn_opts() - |> Inngest.FnOpts.validate_rate_limit(config) - end + defp maybe_rate_limit(config), + do: fn_opts() |> Inngest.FnOpts.validate_rate_limit(config) - defp maybe_concurrency(config) do - fn_opts() - |> Inngest.FnOpts.validate_concurrency(config) - end + defp maybe_idempotency(config), + do: fn_opts() |> Inngest.FnOpts.validate_idempotency(config) + + defp maybe_concurrency(config), + do: fn_opts() |> Inngest.FnOpts.validate_concurrency(config) defp fn_opts() do case __MODULE__.__info__(:attributes) |> Keyword.get(:func) |> List.first() do diff --git a/lib/inngest/function/config.ex b/lib/inngest/function/config.ex index 22685ff..a45ae06 100644 --- a/lib/inngest/function/config.ex +++ b/lib/inngest/function/config.ex @@ -9,6 +9,7 @@ defmodule Inngest.FnOpts do :debounce, :batch_events, :rate_limit, + :idempotency, :concurrency, retries: 3 ] @@ -22,6 +23,7 @@ defmodule Inngest.FnOpts do debounce: debounce() | nil, batch_events: batch_events() | nil, rate_limit: rate_limit() | nil, + idempotency: idempotency() | nil, concurrency: concurrency() | nil } @@ -41,6 +43,8 @@ defmodule Inngest.FnOpts do key: binary() | nil } + @type idempotency() :: binary() + @type concurrency() :: number() | concurrency_option() @@ -54,7 +58,7 @@ defmodule Inngest.FnOpts do @concurrency_scopes ["fn", "env", "account"] @doc """ - Validate the debounce configuration + Validate the debounce settings """ @spec validate_debounce(t(), map()) :: map() def validate_debounce(fnopts, config) do @@ -86,7 +90,7 @@ defmodule Inngest.FnOpts do end @doc """ - Validate the event batch config + Validate the event batch settings """ @spec validate_batch_events(t(), map()) :: map() def validate_batch_events(fnopts, config) do @@ -121,7 +125,7 @@ defmodule Inngest.FnOpts do end @doc """ - Validate the rate limit config + Validate the rate limit settings """ @spec validate_rate_limit(t(), map()) :: map() def validate_rate_limit(fnopts, config) do @@ -155,7 +159,25 @@ defmodule Inngest.FnOpts do end @doc """ - Validate the concurrency config + Validate the idempotency settings + """ + def validate_idempotency(fnopts, config) do + # NOTE: nothing really to validate, just have this for the sake of consistency + case fnopts |> Map.get(:idempotency) do + nil -> + config + + setting -> + if !is_binary(setting) do + raise Inngest.IdempotencyConfigError, message: "idempotency must be a CEL string" + end + + Map.put(config, :idempotency, setting) + end + end + + @doc """ + Validate the concurrency settings """ @spec validate_concurrency(t(), map()) :: map() def validate_concurrency(fnopts, config) do diff --git a/test/inngest/function/cases/idempotency_test.exs b/test/inngest/function/cases/idempotency_test.exs new file mode 100644 index 0000000..93a8a01 --- /dev/null +++ b/test/inngest/function/cases/idempotency_test.exs @@ -0,0 +1,53 @@ +defmodule Inngest.Function.Cases.IdempotencyTest do + use ExUnit.Case, async: true + + alias Inngest.Test.DevServer + import Inngest.Test.Helper + + @default_sleep 3_000 + + @tag :integration + test "should only run 1 out of 10" do + event_ids = + Enum.map(1..10, fn _ -> send_test_event("test/plug.idempotency", %{foobar: false}) end) + + Process.sleep(@default_sleep) + + fn_runs = + event_ids + |> Enum.map(fn id -> + {:ok, %{"data" => data}} = DevServer.run_ids(id) + + if Enum.count(data) == 1 do + assert [ + %{ + "output" => "Done", + "status" => "Completed", + "run_id" => run_id + } + ] = data + + run_id + else + nil + end + end) + |> Enum.filter(&(!is_nil(&1))) + + assert Enum.count(fn_runs) == 1 + + # sending with a different value will run + other = send_test_event("test/plug.idempotency", %{foobar: "hello"}) + Process.sleep(@default_sleep) + + assert {:ok, + %{ + "data" => [ + %{ + "output" => "Done", + "status" => "Completed" + } + ] + }} = DevServer.run_ids(other) + end +end diff --git a/test/inngest/function/cases/rate_limit_test.exs b/test/inngest/function/cases/rate_limit_test.exs index f409223..64d046f 100644 --- a/test/inngest/function/cases/rate_limit_test.exs +++ b/test/inngest/function/cases/rate_limit_test.exs @@ -7,7 +7,7 @@ defmodule Inngest.Function.Cases.RateLimitTest do @default_sleep 10_000 @tag :integration - test "should only run 2 out of 10" do + test "should <= 2 out of 10" do event_ids = Enum.map(1..10, fn _ -> send_test_event("test/plug.ratelimit") end) Process.sleep(@default_sleep) @@ -25,6 +25,8 @@ defmodule Inngest.Function.Cases.RateLimitTest do "run_id" => run_id } ] = data + + run_id else nil end diff --git a/test/inngest/function/config_test.exs b/test/inngest/function/config_test.exs index 6e675dc..665c56d 100644 --- a/test/inngest/function/config_test.exs +++ b/test/inngest/function/config_test.exs @@ -167,6 +167,30 @@ defmodule Inngest.FnOptsTest do end end + describe "validate_idempotency/2" do + @fn_opts %FnOpts{ + id: "foobar", + name: "Foobar", + idempotency: "event.data.foobar" + } + + test "should succeed with valid settings" do + assert %{ + idempotency: "event.data.foobar" + } = FnOpts.validate_idempotency(@fn_opts, @config) + end + + test "should raise if value is not string" do + opts = %{@fn_opts | idempotency: false} + + assert_raise Inngest.IdempotencyConfigError, + "idempotency must be a CEL string", + fn -> + FnOpts.validate_idempotency(opts, @config) + end + end + end + describe "validate_concurrency/2" do @fn_opts %FnOpts{ id: "foobar", diff --git a/test/support/cases/concurrency_fn.ex b/test/support/cases/concurrency_fn.ex index 24fbf6f..c535029 100644 --- a/test/support/cases/concurrency_fn.ex +++ b/test/support/cases/concurrency_fn.ex @@ -14,7 +14,6 @@ defmodule Inngest.Test.Case.ConcurrencyFn do @trigger %Trigger{event: "test/plug.throttle"} @impl true - @spec exec(any, %{:step => atom, optional(any) => any}) :: {:ok, <<_::72>>} def exec(ctx, %{step: step} = _args) do _ = step.run(ctx, "wait", fn -> diff --git a/test/support/cases/idempotent_fn.ex b/test/support/cases/idempotent_fn.ex new file mode 100644 index 0000000..85a193d --- /dev/null +++ b/test/support/cases/idempotent_fn.ex @@ -0,0 +1,18 @@ +defmodule Inngest.Test.Case.IdempotentFn do + @moduledoc false + + use Inngest.Function + alias Inngest.{FnOpts, Trigger} + + @func %FnOpts{ + id: "idempotent-fn", + name: "Idempotent Function", + idempotency: "event.data.foobar" + } + @trigger %Trigger{event: "test/plug.idempotency"} + + @impl true + def exec(_ctx, _args) do + {:ok, "Done"} + end +end