From c743303ec4c7d71a3dc1506e23e3c155e5385547 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sat, 25 Nov 2023 14:25:06 -0800 Subject: [PATCH 1/7] INN-2381: Add batching support --- lib/inngest/error.ex | 4 ++ lib/inngest/function.ex | 53 ++++++++++++++++++- .../function/cases/batch_events_test.exs | 8 +++ test/support/cases/batching_fn.ex | 24 +++++++++ 4 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 test/inngest/function/cases/batch_events_test.exs create mode 100644 test/support/cases/batching_fn.ex diff --git a/lib/inngest/error.ex b/lib/inngest/error.ex index 952b2cf..36ccf81 100644 --- a/lib/inngest/error.ex +++ b/lib/inngest/error.ex @@ -9,3 +9,7 @@ end defmodule Inngest.InvalidDebounceConfigError do defexception message: "a 'period' must be set for debounce" end + +defmodule Inngest.InvalidBatchEventConfigError do + defexception [:message] +end diff --git a/lib/inngest/function.ex b/lib/inngest/function.ex index eb4ad27..5564971 100644 --- a/lib/inngest/function.ex +++ b/lib/inngest/function.ex @@ -172,6 +172,7 @@ defmodule Inngest.Function do } } |> maybe_debounce() + |> maybe_batch_events() ] ++ handler end @@ -202,6 +203,47 @@ defmodule Inngest.Function do end end + defp maybe_batch_events(config) do + case __MODULE__.__info__(:attributes) + |> Keyword.get(:func) + |> List.first() + |> Map.get(:batch_events) do + nil -> + config + + batch -> + max_size = Map.get(batch, :max_size) + timeout = Map.get(batch, :timeout) + + if is_nil(max_size) do + raise Inngest.InvalidBatchEventConfigError, + message: "'max_size' must be set for batch_events" + end + + if is_nil(timeout) do + raise Inngest.InvalidBatchEventConfigError, + message: "'timeout' must be set for batch_events" + end + + case Regex.run(~r/(\d+)s/i, timeout) do + nil -> + raise Inngest.InvalidBatchEventConfigError, + message: "invalid 'timeout' value: #{timeout}" + + match -> + dur = match |> Enum.at(1) |> String.to_integer() + + if dur < 1 || dur > 60 do + raise Inngest.InvalidBatchEventConfigError, + message: "'timeout' duration: #{dur}s, needs to be 1s - 60s" + end + end + + batch = batch |> Map.put(:maxSize, max_size) |> Map.drop([:max_size]) + Map.put(config, :batchEvents, batch) + end + end + defp failure_handler_defined?(mod) do mod.__info__(:functions) |> Keyword.get(:handle_failure) == 2 end @@ -254,18 +296,25 @@ defmodule Inngest.FnOpts do :id, :name, :retries, - :debounce + :debounce, + :batch_events ] @type t() :: %__MODULE__{ id: binary(), name: binary(), retries: number() | nil, - debounce: debounce() | nil + debounce: debounce() | nil, + batch_events: batch_events() | nil } @type debounce() :: %{ key: nil | binary(), period: binary() } + + @type batch_events() :: %{ + max_size: number(), + timeout: binary() + } end diff --git a/test/inngest/function/cases/batch_events_test.exs b/test/inngest/function/cases/batch_events_test.exs new file mode 100644 index 0000000..99d3020 --- /dev/null +++ b/test/inngest/function/cases/batch_events_test.exs @@ -0,0 +1,8 @@ +defmodule Inngest.Function.Cases.BatchEventsTest do + use ExUnit.Case, async: true + + alias Inngest.Test.DevServer + import Inngest.Test.Helper + + # TODO: Add test after moving batching logic to OSS +end diff --git a/test/support/cases/batching_fn.ex b/test/support/cases/batching_fn.ex new file mode 100644 index 0000000..8be896c --- /dev/null +++ b/test/support/cases/batching_fn.ex @@ -0,0 +1,24 @@ +defmodule Inngest.Test.Case.BatchFn do + @moduledoc false + + use Inngest.Function + alias Inngest.{FnOpts, Trigger} + + @func %FnOpts{ + id: "batch-fn", + name: "Batch Function", + batch_events: %{ + max_size: 5, + timeout: "5s" + } + } + @trigger %Trigger{event: "test/plug.batching"} + + @impl true + def exec(ctx, %{events: events, step: step} = _args) do + _ = step.run(ctx, "batch", fn -> ":+1" end) + count = Enum.count(events) + + {:ok, "Batched: #{count}"} + end +end From 8b194af867fd99dc5e37cfc2175f76b3f31802bd Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 13:44:59 -0800 Subject: [PATCH 2/7] refactor --- lib/inngest/function.ex | 49 +++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/lib/inngest/function.ex b/lib/inngest/function.ex index 5564971..dbebba4 100644 --- a/lib/inngest/function.ex +++ b/lib/inngest/function.ex @@ -87,22 +87,16 @@ defmodule Inngest.Function do ) @behaviour Inngest.Function - @default_retries 3 @impl true def slug() do - __MODULE__.__info__(:attributes) - |> Keyword.get(:func) - |> List.first() + fn_opts() |> Map.get(:id) end @impl true def name() do - case __MODULE__.__info__(:attributes) - |> Keyword.get(:func) - |> List.first() - |> Map.get(:name) do + case fn_opts() |> Map.get(:name) do nil -> slug() name -> name end @@ -116,13 +110,13 @@ defmodule Inngest.Function do end def slugs() do - failure = if failure_handler_defined?(__MODULE__), do: [failure_slug()], else: [] + failure = if failure_handler_defined?(), do: [failure_slug()], else: [] [slug()] ++ failure end def serve(path) do handler = - if failure_handler_defined?(__MODULE__) do + if failure_handler_defined?() do id = failure_slug() [ @@ -176,21 +170,10 @@ defmodule Inngest.Function do ] ++ handler end - defp retries() do - case __MODULE__.__info__(:attributes) - |> Keyword.get(:func) - |> List.first() - |> Map.get(:retries) do - nil -> @default_retries - retry -> retry - end - end + defp retries(), do: fn_opts() |> Map.get(:retries) defp maybe_debounce(config) do - case __MODULE__.__info__(:attributes) - |> Keyword.get(:func) - |> List.first() - |> Map.get(:debounce) do + case fn_opts() |> Map.get(:debounce) do nil -> config @@ -204,10 +187,7 @@ defmodule Inngest.Function do end defp maybe_batch_events(config) do - case __MODULE__.__info__(:attributes) - |> Keyword.get(:func) - |> List.first() - |> Map.get(:batch_events) do + case fn_opts() |> Map.get(:batch_events) do nil -> config @@ -244,8 +224,15 @@ defmodule Inngest.Function do end end - defp failure_handler_defined?(mod) do - mod.__info__(:functions) |> Keyword.get(:handle_failure) == 2 + defp fn_opts() do + case __MODULE__.__info__(:attributes) |> Keyword.get(:func) |> List.first() do + nil -> %Inngest.FnOpts{} + val -> val + end + end + + defp failure_handler_defined?() do + __MODULE__.__info__(:functions) |> Keyword.get(:handle_failure) == 2 end defp failure_slug(), do: "#{slug()}-failure" @@ -295,9 +282,9 @@ defmodule Inngest.FnOpts do defstruct [ :id, :name, - :retries, :debounce, - :batch_events + :batch_events, + retries: 3 ] @type t() :: %__MODULE__{ From e5788be6a983eddb20719c869e60c82cf786de29 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 13:52:30 -0800 Subject: [PATCH 3/7] set fail-fast to false to allow other matrix to finish --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8a098c7..b9397b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,6 +17,7 @@ jobs: name: Test (Elixir ${{ matrix.elixir }} / OTP ${{ matrix.otp }}) runs-on: ubuntu-latest strategy: + fail-fast: false matrix: # NOTE: best effort coverage # https://hexdocs.pm/elixir/compatibility-and-deprecations.html#compatibility-between-elixir-and-erlang-otp From c82634826f6a9b9ed219486af9ae83a628ffee3d Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 13:57:19 -0800 Subject: [PATCH 4/7] move module to separate file --- lib/inngest/function.ex | 30 ------------------------------ lib/inngest/function/config.ex | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 30 deletions(-) create mode 100644 lib/inngest/function/config.ex diff --git a/lib/inngest/function.ex b/lib/inngest/function.ex index dbebba4..28c24a6 100644 --- a/lib/inngest/function.ex +++ b/lib/inngest/function.ex @@ -275,33 +275,3 @@ defmodule Inngest.Function do def validate_datetime(_), do: {:error, "Expect valid DateTime formatted input"} end - -defmodule Inngest.FnOpts do - @moduledoc false - - defstruct [ - :id, - :name, - :debounce, - :batch_events, - retries: 3 - ] - - @type t() :: %__MODULE__{ - id: binary(), - name: binary(), - retries: number() | nil, - debounce: debounce() | nil, - batch_events: batch_events() | nil - } - - @type debounce() :: %{ - key: nil | binary(), - period: binary() - } - - @type batch_events() :: %{ - max_size: number(), - timeout: binary() - } -end diff --git a/lib/inngest/function/config.ex b/lib/inngest/function/config.ex new file mode 100644 index 0000000..8b89de4 --- /dev/null +++ b/lib/inngest/function/config.ex @@ -0,0 +1,33 @@ +defmodule Inngest.FnOpts do + @moduledoc false + + defstruct [ + :id, + :name, + :debounce, + :batch_events, + retries: 3 + ] + + @type t() :: %__MODULE__{ + id: binary(), + name: binary(), + retries: number() | nil, + debounce: debounce() | nil, + batch_events: batch_events() | nil + } + + @type debounce() :: %{ + key: nil | binary(), + period: binary() + } + + @type batch_events() :: %{ + max_size: number(), + timeout: binary() + } + + # @spec validate_debounce(t()) :: map() + # def validate_debounce(fnopts) do + # end +end From b8a360b7a64c042eb44c52fe409faa12bb438d7c Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 15:10:44 -0800 Subject: [PATCH 5/7] add duration parser helper --- lib/inngest/util.ex | 32 ++++++++++++++++++++++++++++++++ test/inngest/util_test.exs | 29 +++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 lib/inngest/util.ex create mode 100644 test/inngest/util_test.exs diff --git a/lib/inngest/util.ex b/lib/inngest/util.ex new file mode 100644 index 0000000..26fb587 --- /dev/null +++ b/lib/inngest/util.ex @@ -0,0 +1,32 @@ +defmodule Inngest.Util do + @moduledoc """ + Utility functions + """ + + @doc """ + Parse string duration that Inngest understands into seconds + """ + @spec parse_duration(binary()) :: {:ok, number()} | {:error, binary()} + def parse_duration(value) do + with [_, num, unit] <- Regex.run(~r/(\d+)(s|m|h|d)/i, value), + dur <- String.to_integer(num) do + case unit do + "d" -> {:ok, dur * day_in_seconds()} + "h" -> {:ok, dur * hour_in_seconds()} + "m" -> {:ok, dur * minute_in_seconds()} + "s" -> {:ok, dur} + _ -> {:error, "invalid time unit '#{unit}', must be d|h|m|s"} + end + else + nil -> + {:error, "invalid duration: '#{value}'"} + + _ -> + {:error, "unknow error occurred when parsing duration"} + end + end + + def day_in_seconds(), do: 60 * 60 * 24 + def hour_in_seconds(), do: 60 * 60 + def minute_in_seconds(), do: 60 +end diff --git a/test/inngest/util_test.exs b/test/inngest/util_test.exs new file mode 100644 index 0000000..06acfcf --- /dev/null +++ b/test/inngest/util_test.exs @@ -0,0 +1,29 @@ +defmodule Inngest.UtilTest do + use ExUnit.Case, async: true + + alias Inngest.Util + + describe "parse_duration/1" do + %{ + "10s" => 10, + "20m" => 20 * Util.minute_in_seconds(), + "2h" => 2 * Util.hour_in_seconds(), + "7d" => 7 * Util.day_in_seconds() + } + |> Enum.map(fn {key, val} -> {key, Macro.escape(val)} end) + |> Enum.each(fn {input, expected} -> + test "should succeed for #{input}" do + output = unquote(expected) + assert {:ok, ^output} = Util.parse_duration(unquote(input)) + end + end) + + test "should fail for invalid duration" do + assert {:error, "invalid duration: 'foobar'"} = Util.parse_duration("foobar") + end + + test "should fail for invalid time units" do + assert {:error, "invalid duration: '1y'"} = Util.parse_duration("1y") + end + end +end From a596436702eaada3b2b3cf034ffffdb760a3deb0 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 15:24:54 -0800 Subject: [PATCH 6/7] add debounce config validation --- lib/inngest/function.ex | 13 +----- lib/inngest/function/config.ex | 32 +++++++++++++-- test/inngest/function/config_test.exs | 57 +++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 14 deletions(-) create mode 100644 test/inngest/function/config_test.exs diff --git a/lib/inngest/function.ex b/lib/inngest/function.ex index 28c24a6..5184e2e 100644 --- a/lib/inngest/function.ex +++ b/lib/inngest/function.ex @@ -173,17 +173,8 @@ defmodule Inngest.Function do defp retries(), do: fn_opts() |> Map.get(:retries) defp maybe_debounce(config) do - case fn_opts() |> Map.get(:debounce) do - nil -> - config - - debounce -> - if Map.get(debounce, :period) == nil do - raise Inngest.InvalidDebounceConfigError - else - Map.put(config, :debounce, debounce) - end - end + fn_opts() + |> Inngest.FnOpts.validate_debounce(config) end defp maybe_batch_events(config) do diff --git a/lib/inngest/function/config.ex b/lib/inngest/function/config.ex index 8b89de4..5310835 100644 --- a/lib/inngest/function/config.ex +++ b/lib/inngest/function/config.ex @@ -9,6 +9,8 @@ defmodule Inngest.FnOpts do retries: 3 ] + alias Inngest.Util + @type t() :: %__MODULE__{ id: binary(), name: binary(), @@ -27,7 +29,31 @@ defmodule Inngest.FnOpts do timeout: binary() } - # @spec validate_debounce(t()) :: map() - # def validate_debounce(fnopts) do - # end + @spec validate_debounce(t(), map()) :: map() + def validate_debounce(fnopts, config) do + case fnopts |> Map.get(:debounce) do + nil -> + config + + debounce -> + period = Map.get(debounce, :period) + + if is_nil(period) do + raise Inngest.InvalidDebounceConfigError + end + + case Util.parse_duration(period) do + {:error, error} -> + raise Inngest.InvalidDebounceConfigError, message: error + + {:ok, seconds} -> + if seconds > 7 * Util.day_in_seconds() do + raise Inngest.InvalidDebounceConfigError, + message: "cannot specify period for more than 7 days" + end + end + + Map.put(config, :debounce, debounce) + end + end end diff --git a/test/inngest/function/config_test.exs b/test/inngest/function/config_test.exs new file mode 100644 index 0000000..b53d643 --- /dev/null +++ b/test/inngest/function/config_test.exs @@ -0,0 +1,57 @@ +defmodule Inngest.FnOptsTest do + use ExUnit.Case, async: true + + alias Inngest.FnOpts + + describe "validate_debounce/2" do + @fn_opts %FnOpts{ + id: "foobar", + name: "FooBar", + debounce: %{ + period: "5s" + } + } + + @config %{} + + test "should succeed with valid config" do + assert %{debounce: _} = FnOpts.validate_debounce(@fn_opts, @config) + end + + ## Invalid configs + test "should raise when period is missing" do + opts = drop_at(@fn_opts, [:debounce, :period]) + + assert_raise Inngest.InvalidDebounceConfigError, fn -> + FnOpts.validate_debounce(opts, @config) + end + end + + test "should raise with invalid period" do + opts = update_at(@fn_opts, [:debounce, :period], "yolo") + + assert_raise Inngest.InvalidDebounceConfigError, fn -> + FnOpts.validate_debounce(opts, @config) + end + end + + test "should raise with period longer than 7 days" do + opts = update_at(@fn_opts, [:debounce, :period], "8d") + + assert_raise Inngest.InvalidDebounceConfigError, fn -> + FnOpts.validate_debounce(opts, @config) + end + end + + # helper function to remove nested fields from a struct + defp drop_at(struct, path) do + access = Enum.map(path, fn f -> Access.key!(f) end) + pop_in(struct, access) |> elem(1) + end + + defp update_at(struct, path, new_value) do + access = Enum.map(path, fn f -> Access.key!(f) end) + update_in(struct, access, fn _ -> new_value end) + end + end +end From a918aaf88d48896615d48d7b0c545738642ad401 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Sun, 26 Nov 2023 15:43:12 -0800 Subject: [PATCH 7/7] move batch validation to fnopts module --- lib/inngest/function.ex | 37 +------------ lib/inngest/function/config.ex | 44 +++++++++++++++ test/inngest/function/config_test.exs | 80 +++++++++++++++++++++++---- 3 files changed, 116 insertions(+), 45 deletions(-) diff --git a/lib/inngest/function.ex b/lib/inngest/function.ex index 5184e2e..5accd17 100644 --- a/lib/inngest/function.ex +++ b/lib/inngest/function.ex @@ -178,41 +178,8 @@ defmodule Inngest.Function do end defp maybe_batch_events(config) do - case fn_opts() |> Map.get(:batch_events) do - nil -> - config - - batch -> - max_size = Map.get(batch, :max_size) - timeout = Map.get(batch, :timeout) - - if is_nil(max_size) do - raise Inngest.InvalidBatchEventConfigError, - message: "'max_size' must be set for batch_events" - end - - if is_nil(timeout) do - raise Inngest.InvalidBatchEventConfigError, - message: "'timeout' must be set for batch_events" - end - - case Regex.run(~r/(\d+)s/i, timeout) do - nil -> - raise Inngest.InvalidBatchEventConfigError, - message: "invalid 'timeout' value: #{timeout}" - - match -> - dur = match |> Enum.at(1) |> String.to_integer() - - if dur < 1 || dur > 60 do - raise Inngest.InvalidBatchEventConfigError, - message: "'timeout' duration: #{dur}s, needs to be 1s - 60s" - end - end - - batch = batch |> Map.put(:maxSize, max_size) |> Map.drop([:max_size]) - Map.put(config, :batchEvents, batch) - end + fn_opts() + |> Inngest.FnOpts.validate_batch_events(config) end defp fn_opts() do diff --git a/lib/inngest/function/config.ex b/lib/inngest/function/config.ex index 5310835..54168a5 100644 --- a/lib/inngest/function/config.ex +++ b/lib/inngest/function/config.ex @@ -29,6 +29,9 @@ defmodule Inngest.FnOpts do timeout: binary() } + @doc """ + Validate the debounce configuration + """ @spec validate_debounce(t(), map()) :: map() def validate_debounce(fnopts, config) do case fnopts |> Map.get(:debounce) do @@ -47,6 +50,7 @@ defmodule Inngest.FnOpts do raise Inngest.InvalidDebounceConfigError, message: error {:ok, seconds} -> + # credo:disable-for-next-line if seconds > 7 * Util.day_in_seconds() do raise Inngest.InvalidDebounceConfigError, message: "cannot specify period for more than 7 days" @@ -56,4 +60,44 @@ defmodule Inngest.FnOpts do Map.put(config, :debounce, debounce) end end + + @doc """ + Validate the event batch config + """ + @spec validate_batch_events(t(), map()) :: map() + def validate_batch_events(fnopts, config) do + case fnopts |> Map.get(:batch_events) do + nil -> + config + + batch -> + max_size = Map.get(batch, :max_size) + timeout = Map.get(batch, :timeout) + + if is_nil(max_size) do + raise Inngest.InvalidBatchEventConfigError, + message: "'max_size' must be set for batch_events" + end + + if is_nil(timeout) do + raise Inngest.InvalidBatchEventConfigError, + message: "'timeout' must be set for batch_events" + end + + case Util.parse_duration(timeout) do + {:error, error} -> + raise Inngest.InvalidBatchEventConfigError, message: error + + {:ok, seconds} -> + # credo:disable-for-next-line + if seconds < 1 || seconds > 60 do + raise Inngest.InvalidBatchEventConfigError, + message: "'timeout' duration set to '#{timeout}', needs to be 1s - 60s" + end + end + + batch = batch |> Map.put(:maxSize, max_size) |> Map.drop([:max_size]) + Map.put(config, :batchEvents, batch) + end + end end diff --git a/test/inngest/function/config_test.exs b/test/inngest/function/config_test.exs index b53d643..91cd447 100644 --- a/test/inngest/function/config_test.exs +++ b/test/inngest/function/config_test.exs @@ -3,6 +3,19 @@ defmodule Inngest.FnOptsTest do alias Inngest.FnOpts + @config %{} + + # helper function to remove nested fields from a struct + defp drop_at(struct, path) do + access = Enum.map(path, fn f -> Access.key!(f) end) + pop_in(struct, access) |> elem(1) + end + + defp update_at(struct, path, new_value) do + access = Enum.map(path, fn f -> Access.key!(f) end) + update_in(struct, access, fn _ -> new_value end) + end + describe "validate_debounce/2" do @fn_opts %FnOpts{ id: "foobar", @@ -12,10 +25,8 @@ defmodule Inngest.FnOptsTest do } } - @config %{} - test "should succeed with valid config" do - assert %{debounce: _} = FnOpts.validate_debounce(@fn_opts, @config) + assert %{debounce: %{period: "5s"}} = FnOpts.validate_debounce(@fn_opts, @config) end ## Invalid configs @@ -42,16 +53,65 @@ defmodule Inngest.FnOptsTest do FnOpts.validate_debounce(opts, @config) end end + end + + describe "validate_batch_events/2" do + @fn_opts %FnOpts{ + id: "foobar", + name: "Foobar", + batch_events: %{ + max_size: 10, + timeout: "5s" + } + } - # helper function to remove nested fields from a struct - defp drop_at(struct, path) do - access = Enum.map(path, fn f -> Access.key!(f) end) - pop_in(struct, access) |> elem(1) + test "should succeed with valid config" do + assert %{ + batchEvents: %{ + maxSize: 10, + timeout: "5s" + } + } = FnOpts.validate_batch_events(@fn_opts, @config) + end + + test "should raise if max_size is missing" do + opts = drop_at(@fn_opts, [:batch_events, :max_size]) + + assert_raise Inngest.InvalidBatchEventConfigError, + "'max_size' must be set for batch_events", + fn -> + FnOpts.validate_batch_events(opts, @config) + end end - defp update_at(struct, path, new_value) do - access = Enum.map(path, fn f -> Access.key!(f) end) - update_in(struct, access, fn _ -> new_value end) + test "should raise if timeout is missing" do + opts = drop_at(@fn_opts, [:batch_events, :timeout]) + + assert_raise Inngest.InvalidBatchEventConfigError, + "'timeout' must be set for batch_events", + fn -> + FnOpts.validate_batch_events(opts, @config) + end + end + + test "should raise if timeout is invalid" do + opts = update_at(@fn_opts, [:batch_events, :timeout], "hello") + + assert_raise Inngest.InvalidBatchEventConfigError, + "invalid duration: 'hello'", + fn -> + FnOpts.validate_batch_events(opts, @config) + end + end + + test "should raise if timeout is out of range" do + opts = update_at(@fn_opts, [:batch_events, :timeout], "2m") + + assert_raise Inngest.InvalidBatchEventConfigError, + "'timeout' duration set to '2m', needs to be 1s - 60s", + fn -> + FnOpts.validate_batch_events(opts, @config) + end end end end