From d1cb2313509ced94fbc8855e787387adbc2d9a00 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 21:02:41 -0800 Subject: [PATCH 1/5] add idempotency to list of configs --- lib/inngest/function/config.ex | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/inngest/function/config.ex b/lib/inngest/function/config.ex index 22685ff..9929a25 100644 --- a/lib/inngest/function/config.ex +++ b/lib/inngest/function/config.ex @@ -9,6 +9,7 @@ defmodule Inngest.FnOpts do :debounce, :batch_events, :rate_limit, + :idempotency, :concurrency, retries: 3 ] @@ -22,6 +23,7 @@ defmodule Inngest.FnOpts do debounce: debounce() | nil, batch_events: batch_events() | nil, rate_limit: rate_limit() | nil, + idempotency: idempotency() | nil, concurrency: concurrency() | nil } @@ -41,6 +43,8 @@ defmodule Inngest.FnOpts do key: binary() | nil } + @type idempotency() :: binary() + @type concurrency() :: number() | concurrency_option() From f7509fe10b5528d475b2fa68e55a895dde8e2e30 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 21:12:25 -0800 Subject: [PATCH 2/5] add validation --- lib/inngest/error.ex | 4 ++++ lib/inngest/function.ex | 28 ++++++++++++--------------- lib/inngest/function/config.ex | 26 +++++++++++++++++++++---- test/inngest/function/config_test.exs | 24 +++++++++++++++++++++++ 4 files changed, 62 insertions(+), 20 deletions(-) diff --git a/lib/inngest/error.ex b/lib/inngest/error.ex index 4e3505a..79ff1b5 100644 --- a/lib/inngest/error.ex +++ b/lib/inngest/error.ex @@ -21,3 +21,7 @@ end defmodule Inngest.ConcurrencyConfigError do defexception [:message] end + +defmodule Inngest.IdempotencyConfigError do + defexception [:message] +end diff --git a/lib/inngest/function.ex b/lib/inngest/function.ex index c391302..a660659 100644 --- a/lib/inngest/function.ex +++ b/lib/inngest/function.ex @@ -168,31 +168,27 @@ defmodule Inngest.Function do |> maybe_debounce() |> maybe_batch_events() |> maybe_rate_limit() + |> maybe_idempotency() |> maybe_concurrency() ] ++ handler end defp retries(), do: fn_opts() |> Map.get(:retries) - defp maybe_debounce(config) do - fn_opts() - |> Inngest.FnOpts.validate_debounce(config) - end + defp maybe_debounce(config), + do: fn_opts() |> Inngest.FnOpts.validate_debounce(config) - defp maybe_batch_events(config) do - fn_opts() - |> Inngest.FnOpts.validate_batch_events(config) - end + defp maybe_batch_events(config), + do: fn_opts() |> Inngest.FnOpts.validate_batch_events(config) - defp maybe_rate_limit(config) do - fn_opts() - |> Inngest.FnOpts.validate_rate_limit(config) - end + defp maybe_rate_limit(config), + do: fn_opts() |> Inngest.FnOpts.validate_rate_limit(config) - defp maybe_concurrency(config) do - fn_opts() - |> Inngest.FnOpts.validate_concurrency(config) - end + defp maybe_idempotency(config), + do: fn_opts() |> Inngest.FnOpts.validate_idempotency(config) + + defp maybe_concurrency(config), + do: fn_opts() |> Inngest.FnOpts.validate_concurrency(config) defp fn_opts() do case __MODULE__.__info__(:attributes) |> Keyword.get(:func) |> List.first() do diff --git a/lib/inngest/function/config.ex b/lib/inngest/function/config.ex index 9929a25..a45ae06 100644 --- a/lib/inngest/function/config.ex +++ b/lib/inngest/function/config.ex @@ -58,7 +58,7 @@ defmodule Inngest.FnOpts do @concurrency_scopes ["fn", "env", "account"] @doc """ - Validate the debounce configuration + Validate the debounce settings """ @spec validate_debounce(t(), map()) :: map() def validate_debounce(fnopts, config) do @@ -90,7 +90,7 @@ defmodule Inngest.FnOpts do end @doc """ - Validate the event batch config + Validate the event batch settings """ @spec validate_batch_events(t(), map()) :: map() def validate_batch_events(fnopts, config) do @@ -125,7 +125,7 @@ defmodule Inngest.FnOpts do end @doc """ - Validate the rate limit config + Validate the rate limit settings """ @spec validate_rate_limit(t(), map()) :: map() def validate_rate_limit(fnopts, config) do @@ -159,7 +159,25 @@ defmodule Inngest.FnOpts do end @doc """ - Validate the concurrency config + Validate the idempotency settings + """ + def validate_idempotency(fnopts, config) do + # NOTE: nothing really to validate, just have this for the sake of consistency + case fnopts |> Map.get(:idempotency) do + nil -> + config + + setting -> + if !is_binary(setting) do + raise Inngest.IdempotencyConfigError, message: "idempotency must be a CEL string" + end + + Map.put(config, :idempotency, setting) + end + end + + @doc """ + Validate the concurrency settings """ @spec validate_concurrency(t(), map()) :: map() def validate_concurrency(fnopts, config) do diff --git a/test/inngest/function/config_test.exs b/test/inngest/function/config_test.exs index 6e675dc..665c56d 100644 --- a/test/inngest/function/config_test.exs +++ b/test/inngest/function/config_test.exs @@ -167,6 +167,30 @@ defmodule Inngest.FnOptsTest do end end + describe "validate_idempotency/2" do + @fn_opts %FnOpts{ + id: "foobar", + name: "Foobar", + idempotency: "event.data.foobar" + } + + test "should succeed with valid settings" do + assert %{ + idempotency: "event.data.foobar" + } = FnOpts.validate_idempotency(@fn_opts, @config) + end + + test "should raise if value is not string" do + opts = %{@fn_opts | idempotency: false} + + assert_raise Inngest.IdempotencyConfigError, + "idempotency must be a CEL string", + fn -> + FnOpts.validate_idempotency(opts, @config) + end + end + end + describe "validate_concurrency/2" do @fn_opts %FnOpts{ id: "foobar", From ab381cb3354c8d997697bf7e07c5802acba21876 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 21:18:10 -0800 Subject: [PATCH 3/5] add integration test for idempotency --- .../function/cases/idempotency_test.exs | 38 +++++++++++++++++++ .../function/cases/rate_limit_test.exs | 4 +- test/support/cases/concurrency_fn.ex | 1 - test/support/cases/idempotent_fn.ex | 18 +++++++++ 4 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 test/inngest/function/cases/idempotency_test.exs create mode 100644 test/support/cases/idempotent_fn.ex diff --git a/test/inngest/function/cases/idempotency_test.exs b/test/inngest/function/cases/idempotency_test.exs new file mode 100644 index 0000000..863db28 --- /dev/null +++ b/test/inngest/function/cases/idempotency_test.exs @@ -0,0 +1,38 @@ +defmodule Inngest.Function.Cases.IdempotencyTest do + use ExUnit.Case, async: true + + alias Inngest.Test.DevServer + import Inngest.Test.Helper + + @default_sleep 3_000 + + @tag :integration + test "should only run 1 out of 10" do + event_ids = Enum.map(1..10, fn _ -> send_test_event("test/plug.idempotency") end) + + Process.sleep(@default_sleep) + + fn_runs = + event_ids + |> Enum.map(fn id -> + {:ok, %{"data" => data}} = DevServer.run_ids(id) + + if Enum.count(data) == 1 do + assert [ + %{ + "output" => "Rate Limited", + "status" => "Completed", + "run_id" => run_id + } + ] = data + + run_id + else + nil + end + end) + |> Enum.filter(&(!is_nil(&1))) + + assert Enum.count(fn_runs) == 1 + end +end diff --git a/test/inngest/function/cases/rate_limit_test.exs b/test/inngest/function/cases/rate_limit_test.exs index f409223..64d046f 100644 --- a/test/inngest/function/cases/rate_limit_test.exs +++ b/test/inngest/function/cases/rate_limit_test.exs @@ -7,7 +7,7 @@ defmodule Inngest.Function.Cases.RateLimitTest do @default_sleep 10_000 @tag :integration - test "should only run 2 out of 10" do + test "should <= 2 out of 10" do event_ids = Enum.map(1..10, fn _ -> send_test_event("test/plug.ratelimit") end) Process.sleep(@default_sleep) @@ -25,6 +25,8 @@ defmodule Inngest.Function.Cases.RateLimitTest do "run_id" => run_id } ] = data + + run_id else nil end diff --git a/test/support/cases/concurrency_fn.ex b/test/support/cases/concurrency_fn.ex index 24fbf6f..c535029 100644 --- a/test/support/cases/concurrency_fn.ex +++ b/test/support/cases/concurrency_fn.ex @@ -14,7 +14,6 @@ defmodule Inngest.Test.Case.ConcurrencyFn do @trigger %Trigger{event: "test/plug.throttle"} @impl true - @spec exec(any, %{:step => atom, optional(any) => any}) :: {:ok, <<_::72>>} def exec(ctx, %{step: step} = _args) do _ = step.run(ctx, "wait", fn -> diff --git a/test/support/cases/idempotent_fn.ex b/test/support/cases/idempotent_fn.ex new file mode 100644 index 0000000..85a193d --- /dev/null +++ b/test/support/cases/idempotent_fn.ex @@ -0,0 +1,18 @@ +defmodule Inngest.Test.Case.IdempotentFn do + @moduledoc false + + use Inngest.Function + alias Inngest.{FnOpts, Trigger} + + @func %FnOpts{ + id: "idempotent-fn", + name: "Idempotent Function", + idempotency: "event.data.foobar" + } + @trigger %Trigger{event: "test/plug.idempotency"} + + @impl true + def exec(_ctx, _args) do + {:ok, "Done"} + end +end From 36db0b1f05bed0a9bcdcf0794a829b3d978ee1c4 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 21:18:50 -0800 Subject: [PATCH 4/5] fix output --- test/inngest/function/cases/idempotency_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/inngest/function/cases/idempotency_test.exs b/test/inngest/function/cases/idempotency_test.exs index 863db28..ddd2a96 100644 --- a/test/inngest/function/cases/idempotency_test.exs +++ b/test/inngest/function/cases/idempotency_test.exs @@ -20,7 +20,7 @@ defmodule Inngest.Function.Cases.IdempotencyTest do if Enum.count(data) == 1 do assert [ %{ - "output" => "Rate Limited", + "output" => "Done", "status" => "Completed", "run_id" => run_id } From dd6509681f94fc12cf6809793922aa33788aed56 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 21:23:57 -0800 Subject: [PATCH 5/5] expand test --- .../inngest/function/cases/idempotency_test.exs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/test/inngest/function/cases/idempotency_test.exs b/test/inngest/function/cases/idempotency_test.exs index ddd2a96..93a8a01 100644 --- a/test/inngest/function/cases/idempotency_test.exs +++ b/test/inngest/function/cases/idempotency_test.exs @@ -8,7 +8,8 @@ defmodule Inngest.Function.Cases.IdempotencyTest do @tag :integration test "should only run 1 out of 10" do - event_ids = Enum.map(1..10, fn _ -> send_test_event("test/plug.idempotency") end) + event_ids = + Enum.map(1..10, fn _ -> send_test_event("test/plug.idempotency", %{foobar: false}) end) Process.sleep(@default_sleep) @@ -34,5 +35,19 @@ defmodule Inngest.Function.Cases.IdempotencyTest do |> Enum.filter(&(!is_nil(&1))) assert Enum.count(fn_runs) == 1 + + # sending with a different value will run + other = send_test_event("test/plug.idempotency", %{foobar: "hello"}) + Process.sleep(@default_sleep) + + assert {:ok, + %{ + "data" => [ + %{ + "output" => "Done", + "status" => "Completed" + } + ] + }} = DevServer.run_ids(other) end end