Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support batching #66

Merged
merged 7 commits into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
name: Test (Elixir ${{ matrix.elixir }} / OTP ${{ matrix.otp }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# NOTE: best effort coverage
# https://hexdocs.pm/elixir/compatibility-and-deprecations.html#compatibility-between-elixir-and-erlang-otp
Expand Down
4 changes: 4 additions & 0 deletions lib/inngest/error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ end
defmodule Inngest.InvalidDebounceConfigError do
defexception message: "a 'period' must be set for debounce"
end

defmodule Inngest.InvalidBatchEventConfigError do
defexception [:message]
end
78 changes: 21 additions & 57 deletions lib/inngest/function.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,16 @@ defmodule Inngest.Function do
)

@behaviour Inngest.Function
@default_retries 3

@impl true
def slug() do
__MODULE__.__info__(:attributes)
|> Keyword.get(:func)
|> List.first()
fn_opts()
|> Map.get(:id)
end

@impl true
def name() do
case __MODULE__.__info__(:attributes)
|> Keyword.get(:func)
|> List.first()
|> Map.get(:name) do
case fn_opts() |> Map.get(:name) do
nil -> slug()
name -> name
end
Expand All @@ -116,13 +110,13 @@ defmodule Inngest.Function do
end

def slugs() do
failure = if failure_handler_defined?(__MODULE__), do: [failure_slug()], else: []
failure = if failure_handler_defined?(), do: [failure_slug()], else: []
[slug()] ++ failure
end

def serve(path) do
handler =
if failure_handler_defined?(__MODULE__) do
if failure_handler_defined?() do
id = failure_slug()

[
Expand Down Expand Up @@ -172,38 +166,31 @@ defmodule Inngest.Function do
}
}
|> maybe_debounce()
|> maybe_batch_events()
] ++ handler
end

defp retries() do
case __MODULE__.__info__(:attributes)
|> Keyword.get(:func)
|> List.first()
|> Map.get(:retries) do
nil -> @default_retries
retry -> retry
end
end
defp retries(), do: fn_opts() |> Map.get(:retries)

defp maybe_debounce(config) do
case __MODULE__.__info__(:attributes)
|> Keyword.get(:func)
|> List.first()
|> Map.get(:debounce) do
nil ->
config

debounce ->
if Map.get(debounce, :period) == nil do
raise Inngest.InvalidDebounceConfigError
else
Map.put(config, :debounce, debounce)
end
fn_opts()
|> Inngest.FnOpts.validate_debounce(config)
end

defp maybe_batch_events(config) do
fn_opts()
|> Inngest.FnOpts.validate_batch_events(config)
end

defp fn_opts() do
case __MODULE__.__info__(:attributes) |> Keyword.get(:func) |> List.first() do
nil -> %Inngest.FnOpts{}
val -> val
end
end

defp failure_handler_defined?(mod) do
mod.__info__(:functions) |> Keyword.get(:handle_failure) == 2
defp failure_handler_defined?() do
__MODULE__.__info__(:functions) |> Keyword.get(:handle_failure) == 2
end

defp failure_slug(), do: "#{slug()}-failure"
Expand Down Expand Up @@ -246,26 +233,3 @@ defmodule Inngest.Function do

def validate_datetime(_), do: {:error, "Expect valid DateTime formatted input"}
end

defmodule Inngest.FnOpts do
@moduledoc false

defstruct [
:id,
:name,
:retries,
:debounce
]

@type t() :: %__MODULE__{
id: binary(),
name: binary(),
retries: number() | nil,
debounce: debounce() | nil
}

@type debounce() :: %{
key: nil | binary(),
period: binary()
}
end
103 changes: 103 additions & 0 deletions lib/inngest/function/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
defmodule Inngest.FnOpts do
@moduledoc false

defstruct [
:id,
:name,
:debounce,
:batch_events,
retries: 3
]

alias Inngest.Util

@type t() :: %__MODULE__{
id: binary(),
name: binary(),
retries: number() | nil,
debounce: debounce() | nil,
batch_events: batch_events() | nil
}

@type debounce() :: %{
key: nil | binary(),
period: binary()
}

@type batch_events() :: %{
max_size: number(),
timeout: binary()
}

@doc """
Validate the debounce configuration
"""
@spec validate_debounce(t(), map()) :: map()
def validate_debounce(fnopts, config) do
case fnopts |> Map.get(:debounce) do
nil ->
config

debounce ->
period = Map.get(debounce, :period)

if is_nil(period) do
raise Inngest.InvalidDebounceConfigError
end

case Util.parse_duration(period) do
{:error, error} ->
raise Inngest.InvalidDebounceConfigError, message: error

{:ok, seconds} ->
# credo:disable-for-next-line
if seconds > 7 * Util.day_in_seconds() do
raise Inngest.InvalidDebounceConfigError,
message: "cannot specify period for more than 7 days"
end
end

Map.put(config, :debounce, debounce)
end
end

@doc """
Validate the event batch config
"""
@spec validate_batch_events(t(), map()) :: map()
def validate_batch_events(fnopts, config) do
case fnopts |> Map.get(:batch_events) do
nil ->
config

batch ->
max_size = Map.get(batch, :max_size)
timeout = Map.get(batch, :timeout)

if is_nil(max_size) do
raise Inngest.InvalidBatchEventConfigError,
message: "'max_size' must be set for batch_events"
end

if is_nil(timeout) do
raise Inngest.InvalidBatchEventConfigError,
message: "'timeout' must be set for batch_events"
end

case Util.parse_duration(timeout) do
{:error, error} ->
raise Inngest.InvalidBatchEventConfigError, message: error

{:ok, seconds} ->
# credo:disable-for-next-line
if seconds < 1 || seconds > 60 do
raise Inngest.InvalidBatchEventConfigError,
message: "'timeout' duration set to '#{timeout}', needs to be 1s - 60s"
end
end

batch = batch |> Map.put(:maxSize, max_size) |> Map.drop([:max_size])
Map.put(config, :batchEvents, batch)
end
end
end
32 changes: 32 additions & 0 deletions lib/inngest/util.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Inngest.Util do
@moduledoc """
Utility functions
"""

@doc """
Parse string duration that Inngest understands into seconds
"""
@spec parse_duration(binary()) :: {:ok, number()} | {:error, binary()}
def parse_duration(value) do
with [_, num, unit] <- Regex.run(~r/(\d+)(s|m|h|d)/i, value),
dur <- String.to_integer(num) do
case unit do
"d" -> {:ok, dur * day_in_seconds()}
"h" -> {:ok, dur * hour_in_seconds()}
"m" -> {:ok, dur * minute_in_seconds()}
"s" -> {:ok, dur}
_ -> {:error, "invalid time unit '#{unit}', must be d|h|m|s"}

Check warning on line 18 in lib/inngest/util.ex

View check run for this annotation

Codecov / codecov/patch

lib/inngest/util.ex#L18

Added line #L18 was not covered by tests
end
else
nil ->
{:error, "invalid duration: '#{value}'"}

_ ->
{:error, "unknow error occurred when parsing duration"}
end
end

def day_in_seconds(), do: 60 * 60 * 24
def hour_in_seconds(), do: 60 * 60
def minute_in_seconds(), do: 60
end
8 changes: 8 additions & 0 deletions test/inngest/function/cases/batch_events_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
defmodule Inngest.Function.Cases.BatchEventsTest do
use ExUnit.Case, async: true

alias Inngest.Test.DevServer

Check warning on line 4 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.14 / OTP 24.3)

unused alias DevServer

Check warning on line 4 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.14 / OTP 25.3)

unused alias DevServer

Check warning on line 4 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15 / OTP 24.3)

unused alias DevServer

Check warning on line 4 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15 / OTP 25.3)

unused alias DevServer
import Inngest.Test.Helper

Check warning on line 5 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.14 / OTP 24.3)

unused import Inngest.Test.Helper

Check warning on line 5 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.14 / OTP 25.3)

unused import Inngest.Test.Helper

Check warning on line 5 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15 / OTP 24.3)

unused import Inngest.Test.Helper

Check warning on line 5 in test/inngest/function/cases/batch_events_test.exs

View workflow job for this annotation

GitHub Actions / Test (Elixir 1.15 / OTP 25.3)

unused import Inngest.Test.Helper

# TODO: Add test after moving batching logic to OSS
end
Loading
Loading