Skip to content

Commit

Permalink
feat: Add failure handler (#64)
Browse files Browse the repository at this point in the history
Clean up some useless code and provide a way to call user provided
failure handler

---------

Co-authored-by: Darwin D Wu <darwin67@users.noreply.github.com>
  • Loading branch information
darwin67 and darwin67 authored Nov 24, 2023
1 parent f3792b6 commit 8555fa7
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 124 deletions.
89 changes: 61 additions & 28 deletions lib/inngest/function.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ defmodule Inngest.Function do
"""
@callback exec(Context.t(), Input.t()) :: {:ok, any()} | {:error, any()}

@doc """
The method to be callbed when the Inngest function fails
"""
@callback on_failure(Context.t(), Input.t()) :: {:ok, any()} | {:error, any()}

defmacro __using__(_opts) do
quote location: :keep do
alias Inngest.{Client, Trigger}
Expand Down Expand Up @@ -120,31 +115,63 @@ defmodule Inngest.Function do
|> List.first()
end

@impl true
def on_failure(_ctx, _input), do: {:ok, "noop"}

def step(path),
do: %{
step: %Step{
id: :step,
name: "step",
runtime: %Step.RunTime{
url: "#{Config.app_host() <> path}?fnId=#{slug()}&step=step"
},
retries: %Step.Retry{
attempts: retries()
}
}
}
def slugs() do
failure = if failure_handler_defined?(__MODULE__), do: [failure_slug()], else: []
[slug()] ++ failure
end

def serve(path) do
%{
id: slug(),
name: name(),
triggers: [trigger()],
steps: step(path),
mod: __MODULE__
}
handler =
if failure_handler_defined?(__MODULE__) do
id = failure_slug()

[
%{
id: id,
name: "#{name()} (failure)",
triggers: [
%Trigger{
event: "inngest/function.failed",
expression: "event.data.function_id == \"#{slug()}\""
}
],
steps: %{
step: %Step{
id: :step,
name: "step",
runtime: %Step.RunTime{
url: "#{Config.app_host() <> path}?fnId=#{id}&step=step"
},
retries: %Step.Retry{
attempts: 0
}
}
}
}
]
else
[]
end

[
%{
id: slug(),
name: name(),
triggers: [trigger()],
steps: %{
step: %Step{
id: :step,
name: "step",
runtime: %Step.RunTime{
url: "#{Config.app_host() <> path}?fnId=#{slug()}&step=step"
},
retries: %Step.Retry{
attempts: retries()
}
}
}
}
] ++ handler
end

defp retries() do
Expand All @@ -156,6 +183,12 @@ defmodule Inngest.Function do
retry -> retry
end
end

defp failure_handler_defined?(mod) do
mod.__info__(:functions) |> Keyword.get(:handle_failure) == 2
end

defp failure_slug(), do: "#{slug()}-failure"
end
end

Expand Down
9 changes: 0 additions & 9 deletions lib/inngest/router/helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,6 @@ defmodule Inngest.Router.Helper do

alias Inngest.Config

@spec func_map(list(), binary()) :: map()
def func_map(funcs, path) do
funcs
|> Enum.reduce(%{}, fn func, x ->
slug = func.slug()
Map.put(x, slug, func.serve(path))
end)
end

def load_functions(params) do
if Config.path_runtime_eval() do
%{funcs: funcs} = load_functions_from_path(params)
Expand Down
16 changes: 12 additions & 4 deletions lib/inngest/router/invoke.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ defmodule Inngest.Router.Invoke do
end

defp exec(
%{request_path: path, private: %{raw_body: [body]}} = conn,
%{private: %{raw_body: [body]}} = conn,
%{"event" => event, "events" => events, "ctx" => ctx, "fnId" => fn_slug} = params
) do
func =
params
|> load_functions()
|> func_map(path)
|> Map.get(fn_slug)
|> Enum.find(fn func ->
Enum.member?(func.slugs(), fn_slug)
end)

ctx = %Inngest.Function.Context{
attempt: Map.get(ctx, "attempt", 0),
Expand Down Expand Up @@ -83,7 +84,11 @@ defmodule Inngest.Router.Invoke do

defp invoke(func, ctx, input) do
try do
func.mod.exec(ctx, input) |> SdkResponse.from_result([])
if is_failure?(input) do
func.handle_failure(ctx, input) |> SdkResponse.from_result([])
else
func.exec(ctx, input) |> SdkResponse.from_result([])
end
rescue
non_retry in Inngest.NonRetriableError ->
SdkResponse.from_result({:error, non_retry}, retry: false, stacktrace: __STACKTRACE__)
Expand Down Expand Up @@ -140,4 +145,7 @@ defmodule Inngest.Router.Invoke do
{:error, error}
end
end

defp is_failure?(%{event: %{name: "inngest/function.failed"}} = _input), do: true
defp is_failure?(_), do: false
end
4 changes: 2 additions & 2 deletions lib/inngest/router/register.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Inngest.Router.Register do
funcs =
params
|> load_functions()
|> func_map(path)
|> Enum.flat_map(& &1.serve(path))

{status, resp} =
case register(path, funcs, framework: framework) do
Expand Down Expand Up @@ -54,7 +54,7 @@ defmodule Inngest.Router.Register do
sdk: Config.sdk_version(),
framework: framework,
appName: Config.app_name(),
functions: functions |> Enum.map(fn {_, v} -> v.mod.serve(path) end)
functions: functions
}

key = Inngest.Signature.hashed_signing_key(Config.signing_key())
Expand Down
51 changes: 51 additions & 0 deletions test/inngest/function/cases/failure_handler_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Inngest.Function.Cases.RetriableTest do
use ExUnit.Case, async: true

alias Inngest.Test.DevServer
import Inngest.Test.Helper

@default_sleep 10_000
@event_name "test/plug.retriable"

@tag :integration
test "should fail after retrying and failure is handled" do
event_id = send_test_event(@event_name)
Process.sleep(@default_sleep)

assert {:ok,
%{
"data" => [
%{
"run_id" => _run_id,
"output" => %{
"error" => "invalid status code: 500",
"message" => stacktrace
},
"status" => "Failed"
}
]
}} = DevServer.run_ids(event_id)

assert stacktrace =~ "YOLO!!!"

{:ok, %{"data" => events}} = DevServer.list_events()

assert %{"id" => failed_id} =
events
|> Enum.find(fn evt ->
Map.get(evt, "name") == "inngest/function.failed" &&
get_in(evt, ["data", "event", "name"]) == @event_name
end)

assert {:ok,
%{
"data" => [
%{
"run_id" => _,
"output" => "error handled",
"status" => "Completed"
}
]
}} = DevServer.run_ids(failed_id)
end
end
30 changes: 0 additions & 30 deletions test/inngest/function/cases/retriable_test.exs

This file was deleted.

52 changes: 28 additions & 24 deletions test/inngest/function_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,40 @@ defmodule Inngest.FunctionTest do

describe "serve/1" do
test "event function should return approprivate map" do
assert %{
id: "test-event",
name: "App / Email: Awesome Event Func",
triggers: [
%Trigger{event: "my/awesome.event"}
],
steps: %{
step: %{
id: _,
name: _,
runtime: %{
type: "http",
url: _
},
retries: %{
attempts: _
assert [
%{
id: "test-event",
name: "App / Email: Awesome Event Func",
triggers: [
%Trigger{event: "my/awesome.event"}
],
steps: %{
step: %{
id: _,
name: _,
runtime: %{
type: "http",
url: _
},
retries: %{
attempts: _
}
}
}
}
} = TestEventFn.serve("/api/inngest")
] = TestEventFn.serve("/api/inngest")
end

test "cron function should return appropriate map" do
assert %{
id: "test-cron",
name: "Awesome Cron Func",
triggers: [
%Trigger{cron: "TZ=America/Los_Angeles * * * * *"}
]
} = TestCronFn.serve("/api/inngest")
assert [
%{
id: "test-cron",
name: "Awesome Cron Func",
triggers: [
%Trigger{cron: "TZ=America/Los_Angeles * * * * *"}
]
}
] = TestCronFn.serve("/api/inngest")
end
end

Expand Down
27 changes: 0 additions & 27 deletions test/inngest/router/helper_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,6 @@ defmodule Inngest.Router.HelperTest do

alias Inngest.Router.Helper

describe "func_map/2" do
test "should return a function map" do
path = "/api/inngest"
funcs = [Inngest.TestEventFn]

assert %{
"test-event" => %{
id: "test-event",
mod: Inngest.TestEventFn,
steps: %{
step: %Inngest.Function.Step{
id: :step,
name: "step"
}
},
triggers: [
%Inngest.Trigger{
event: "my/awesome.event",
expression: nil,
cron: nil
}
]
}
} = Helper.func_map(funcs, path)
end
end

describe "load_functions_from_path/1" do
@path "test/support/**/*.ex"
@paths ["dev/**/*.ex", @path]
Expand Down
9 changes: 9 additions & 0 deletions test/support/cases/retriable_error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,13 @@ defmodule Inngest.Test.Case.RetriableError do

{:ok, "completed"}
end

def handle_failure(ctx, %{step: step} = _args) do
_ =
step.run(ctx, "handle-failure", fn ->
"CATCH ERROR!!!"
end)

{:ok, "error handled"}
end
end
6 changes: 6 additions & 0 deletions test/support/dev_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ defmodule Inngest.Test.DevServer do
|> parse_resp()
end

def list_events() do
client()
|> Tesla.get("/v1/events")
|> parse_resp()
end

defp client() do
middleware = [
{Tesla.Middleware.BaseUrl, @base_url},
Expand Down

0 comments on commit 8555fa7

Please sign in to comment.