From a6efb17f73564375cb039b61ba2c832aead98b2a Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Sun, 31 Mar 2024 20:45:47 -0700 Subject: [PATCH 1/9] Add new cluster structures to cache records --- lib/cachex/options.ex | 15 ++++++++++++++- lib/cachex/spec.ex | 21 +++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/lib/cachex/options.ex b/lib/cachex/options.ex index c145a43..7d6da3b 100644 --- a/lib/cachex/options.ex +++ b/lib/cachex/options.ex @@ -308,7 +308,20 @@ defmodule Cachex.Options do error(:invalid_nodes) true -> - cache(cache, nodes: nodes) + cache(cache, + nodes: nodes, + cluster: + cluster( + enabled: nodes != [node()], + router: fn key, nodes -> + key + |> :erlang.phash2() + |> Jumper.slot(length(nodes)) + end, + nodes: nodes + ) + ) + # coveralls-ignore-stop end end diff --git a/lib/cachex/spec.ex b/lib/cachex/spec.ex index 1540ac0..648b340 100644 --- a/lib/cachex/spec.ex +++ b/lib/cachex/spec.ex @@ -27,6 +27,7 @@ defmodule Cachex.Spec do @type cache :: record(:cache, name: atom, + cluster: cluster, commands: map, compressed: boolean, expiration: expiration, @@ -39,6 +40,14 @@ defmodule Cachex.Spec do warmers: [warmer] ) + # Record specification for a cluster instance + @type cluster :: + record(:cluster, + enabled: boolean, + router: (any, any -> atom), + nodes: any + ) + # Record specification for a command instance @type command :: record(:command, @@ -119,6 +128,7 @@ defmodule Cachex.Spec do """ defrecord :cache, name: nil, + cluster: nil, commands: %{}, compressed: false, expiration: nil, @@ -130,6 +140,11 @@ defmodule Cachex.Spec do transactions: false, warmers: [] + defrecord :cluster, + enabled: false, + router: nil, + nodes: nil + @doc """ Creates a command record from the provided values. @@ -273,6 +288,12 @@ defmodule Cachex.Spec do @spec cache(cache, Keyword.t()) :: cache defmacro cache(record, args) + @doc """ + Updates a cluster record from the provided values. + """ + @spec cluster(cluster, Keyword.t()) :: cluster + defmacro cluster(record, args) + @doc """ Updates a command record from the provided values. """ From 9d2ba79dfe385d76dbc3e6bd4912bb4d58eec43f Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Tue, 2 Apr 2024 00:12:48 -0700 Subject: [PATCH 2/9] Strip out routing macros and enable configurable routing --- lib/cachex/options.ex | 17 ++- lib/cachex/router.ex | 304 ++++++++++++++++++------------------------ lib/cachex/spec.ex | 6 +- 3 files changed, 139 insertions(+), 188 deletions(-) diff --git a/lib/cachex/options.ex b/lib/cachex/options.ex index 7d6da3b..8bdf0df 100644 --- a/lib/cachex/options.ex +++ b/lib/cachex/options.ex @@ -73,8 +73,9 @@ defmodule Cachex.Options do end) # wrap for compatibility - with cache() <- parsed, - do: {:ok, parsed} + with cache() <- parsed do + {:ok, parsed} + end end @doc """ @@ -309,16 +310,18 @@ defmodule Cachex.Options do true -> cache(cache, - nodes: nodes, cluster: cluster( enabled: nodes != [node()], router: fn key, nodes -> - key - |> :erlang.phash2() - |> Jumper.slot(length(nodes)) + slot = + key + |> :erlang.phash2() + |> Jumper.slot(length(nodes)) + + Enum.at(nodes, slot) end, - nodes: nodes + state: nodes ) ) diff --git a/lib/cachex/router.ex b/lib/cachex/router.ex index 888be1d..5f76fe9 100644 --- a/lib/cachex/router.ex +++ b/lib/cachex/router.ex @@ -27,7 +27,7 @@ defmodule Cachex.Router do Dispatches a call to an appropriate execution environment. This acts as a macro just to avoid the overhead of slicing up module - names are runtime, when they can be guaranteed at compile time much + names at runtime, when they can be guaranteed at compile time much more easily. """ defmacro call(cache, {action, _arguments} = call) do @@ -41,7 +41,11 @@ defmodule Cachex.Router do quote do Overseer.enforce unquote(cache) do - Router.execute(var!(cache), unquote(act_join), unquote(call)) + call = unquote(call) + cache = var!(cache) + module = unquote(act_join) + + Router.execute(cache, module, call) end end end @@ -52,23 +56,17 @@ defmodule Cachex.Router do This macro should not be called externally; the only reason it remains public is due to the code injected by the `dispatch/2` macro. """ - defmacro execute(cache, module, call) do - quote do - current = node() + @spec execute(Cachex.Spec.cache(), atom, {atom, [any]}) :: any + def execute(cache(cluster: cluster(enabled: false)) = cache, module, call), + do: route_local(cache, module, call) - case unquote(cache) do - cache(nodes: [^current]) -> - unquote(configure_local(cache, module, call)) + def execute(cache(cluster: cluster(enabled: true)) = cache, module, call), + do: route_cluster(cache, module, call) - cache(nodes: remote_nodes) -> - unquote( - configure_remote(cache, module, call, quote(do: remote_nodes)) - ) - end - end - end + ############### + # Private API # + ############### - @doc false # Results merging for distributed cache results. # # Follows these rules: @@ -80,16 +78,16 @@ defmodule Cachex.Router do # # This has to be public due to scopes, but we hide the docs # because we don't really care for anybody else calling it. - def result_merge(left, right) when is_list(left), + defp result_merge(left, right) when is_list(left), do: left ++ right - def result_merge(left, right) when is_number(left), + defp result_merge(left, right) when is_number(left), do: left + right - def result_merge(left, right) when is_boolean(left), + defp result_merge(left, right) when is_boolean(left), do: left && right - def result_merge(left, right) when is_map(left) do + defp result_merge(left, right) when is_map(left) do Map.merge(left, right, fn :creation_date, _left, right -> right @@ -102,10 +100,6 @@ defmodule Cachex.Router do end) end - ############### - # Private API # - ############### - # Provides handling for local actions on this node. # # This will provide handling of notifications across hooks before and after @@ -116,36 +110,29 @@ defmodule Cachex.Router do # simply executed as is. If `via` is provided, you can override the handle # passed to the hooks (useful for re-use of functions). An example of this # is `decr/4` which simply calls `incr/4` with `via: { :decr, arguments }`. - defp configure_local(cache, module, {_action, arguments} = call) do - quote do - call = unquote(call) - cache = unquote(cache) - module = unquote(module) - arguments = unquote(arguments) - - option = List.last(arguments) - notify = Keyword.get(option, :notify, true) - - message = - notify && - case option[:via] do - msg when not is_tuple(msg) -> call - msg -> msg - end - - notify && Informant.broadcast(cache, message) - result = apply(module, :execute, [cache | arguments]) - - if notify do - Informant.broadcast( - cache, - message, - Keyword.get(option, :hook_result, result) - ) - end + defp route_local(cache, module, {_action, arguments} = call) do + option = List.last(arguments) + notify = Keyword.get(option, :notify, true) + + message = + notify && + case option[:via] do + msg when not is_tuple(msg) -> call + msg -> msg + end - result + notify && Informant.broadcast(cache, message) + result = apply(module, :execute, [cache | arguments]) + + if notify do + Informant.broadcast( + cache, + message, + Keyword.get(option, :hook_result, result) + ) end + + result end # actions based on a key @@ -172,9 +159,11 @@ defmodule Cachex.Router do # the total number of slots available (i.e. the count of the nodes). If it comes # out to the local node, just execute the local code, otherwise RPC the base call # to the remote node, and just assume that it'll correctly handle it. - defp configure_remote(cache, module, {action, [key | _]} = call, nodes) - when action in @keyed_actions, - do: call_slot(cache, module, call, nodes, slot_key(key, nodes)) + defp route_cluster(cache, module, {action, [key | _]} = call) + when action in @keyed_actions do + cache(cluster: cluster(router: router, state: nodes)) = cache + route_node(cache, module, call, router.(key, nodes)) + end # actions which merge outputs @merge_actions [ @@ -196,53 +185,47 @@ defmodule Cachex.Router do # them with the results on the local node. The hooks will only be notified # on the local node, due to an annoying recursion issue when handling the # same across all nodes - seems to provide better logic though. - defp configure_remote(cache, module, {action, arguments} = call, nodes) + defp route_cluster(cache, module, {action, arguments} = call) when action in @merge_actions do - quote do - # :bind_quoted - call = unquote(call) - cache = unquote(cache) - nodes = unquote(nodes) - module = unquote(module) - arguments = unquote(arguments) - - # all calls have options we can use - options = List.last(arguments) - - # can force local node setting local: true - results = - case Keyword.get(options, :local) do - true -> - [] - - _any -> - # don't want to execute on the local node - other_nodes = List.delete(nodes, node()) - - # execute the call on all other nodes - {results, _} = - :rpc.multicall( - other_nodes, - module, - :execute, - [cache | arguments] - ) - - results - end + # fetch the nodes from the cluster state + cache(cluster: cluster(state: nodes)) = cache + + # all calls have options we can use + options = List.last(arguments) + + # can force local node setting local: true + results = + case Keyword.get(options, :local) do + true -> + [] + + _any -> + # don't want to execute on the local node + other_nodes = List.delete(nodes, node()) + + # execute the call on all other nodes + {results, _} = + :rpc.multicall( + other_nodes, + module, + :execute, + [cache | arguments] + ) - # execution on the local node, using the local macros and then unpack - {:ok, result} = unquote(configure_local(cache, module, call)) + results + end - # results merge - merge_result = - results - |> Enum.map(&elem(&1, 1)) - |> Enum.reduce(result, &Router.result_merge/2) + # execution on the local node, using the local macros and then unpack + {:ok, result} = route_local(cache, module, call) - # return after merge - {:ok, merge_result} - end + # results merge + merge_result = + results + |> Enum.map(&elem(&1, 1)) + |> Enum.reduce(result, &result_merge/2) + + # return after merge + {:ok, merge_result} end # actions which always run locally @@ -251,108 +234,75 @@ defmodule Cachex.Router do # Provides handling of `:inspect` operations. # # These operations are guaranteed to run on the local nodes. - defp configure_remote(cache, module, {action, _arguments} = call, _nodes) + defp route_cluster(cache, module, {action, _arguments} = call) when action in @local_actions, - do: configure_local(cache, module, call) + do: route_local(cache, module, call) # Provides handling of `:put_many` operations. # # These operations can only execute if their keys slot to the same remote nodes. - defp configure_remote(cache, module, {:put_many, _arguments} = call, nodes), - do: multi_call_slot(cache, module, call, nodes, quote(do: &elem(&1, 0))) + defp route_cluster(cache, module, {:put_many, _arguments} = call), + do: route_batch(cache, module, call, &elem(&1, 0)) # Provides handling of `:transaction` operations. # # These operations can only execute if their keys slot to the same remote nodes. - defp configure_remote(cache, module, {:transaction, [keys | _]} = call, nodes) do + defp route_cluster(cache, module, {:transaction, [keys | _]} = call) do case keys do - [] -> configure_local(cache, module, call) - _ -> multi_call_slot(cache, module, call, nodes, quote(do: & &1)) + [] -> route_local(cache, module, call) + _ -> route_batch(cache, module, call, & &1) end end # Any other actions are explicitly disabled in distributed environments. - defp configure_remote(_cache, _module, _call, _nodes), + defp route_cluster(_cache, _module, _call), do: error(:non_distributed) - # Calls a slot for the provided cache action. - # - # This will determine a local slot and delegate locally if so, bypassing - # any RPC calls required. This function currently assumes that there is - # a local variable available named "remote_nodes" and "slot", until I - # figure out how to better improve the macro scoping in use locally. - defp call_slot(cache, module, {action, arguments} = call, nodes, slot) do - quote do - slot = unquote(slot) - nodes = unquote(nodes) - action = unquote(action) - arguments = unquote(arguments) - cache(name: name) = unquote(cache) - - case Enum.at(nodes, slot) do - ^current -> - unquote(configure_local(cache, module, call)) - - targeted -> - result = - :rpc.call( - targeted, - Cachex, - action, - [name | arguments] - ) - - with {:badrpc, reason} <- result do - {:error, reason} - end - end - end - end - # Calls a slot for the provided cache action if all keys slot to the same node. # - # This is a delegate handler for `call_slot/5`, but ensures that all keys slot to the + # This is a delegate handler for `route_node/4`, but ensures that all keys slot to the # same node to avoid the case where we have to fork a call out internally. - defp multi_call_slot(cache, module, call, nodes, mapper) do - {_action, [keys | _]} = call - - quote do - # :bind_quoted - keys = unquote(keys) - mapper = unquote(mapper) - - # map all keys to a slot in the nodes list - slots = - Enum.map(keys, fn key -> - # basically just slot_key(mapper.(key), nodes) - unquote(slot_key(quote(do: mapper.(key)), nodes)) - end) - - # unique to avoid dups - case Enum.uniq(slots) do - # if there's a single slot it's safe to continue with the call to the remote - [slot] -> - unquote(call_slot(cache, module, call, nodes, quote(do: slot))) - - # otherwise, cross_slot errors! - _disable -> - error(:cross_slot) - end + defp route_batch(cache, module, {_action, [keys | _]} = call, mapper) do + # map all keys to a slot in the nodes list + cache(cluster: cluster(router: router, state: nodes)) = cache + slots = Enum.map(keys, &router.(mapper.(&1), nodes)) + + # unique to avoid dups + case Enum.uniq(slots) do + # if there's a single slot it's safe to continue with the call to the remote + [slot] -> + route_node(cache, module, call, slot) + + # otherwise, cross_slot errors! + _disable -> + error(:cross_slot) end end - # Slots a key into the list of provided nodes. + # Calls a node for the provided cache action. # - # This uses `:erlang.phash2/1` to hash the key to a numeric value, - # as keys can be basically any type - so others hashes would be - # more expensive due to the serialization costs. Note that the - # collision possibility isn't really relevant, as long as there's - # a uniformly random collision possibility. - defp slot_key(key, nodes) do - quote bind_quoted: [key: key, nodes: nodes] do - key - |> :erlang.phash2() - |> Jumper.slot(length(nodes)) + # This will determine a local slot and delegate locally if so, bypassing + # any RPC calls in order to gain a slight bit of performance. + defp route_node(cache, module, {action, arguments} = call, node) do + current = node() + cache(name: name) = cache + + case node do + ^current -> + route_local(cache, module, call) + + targeted -> + result = + :rpc.call( + targeted, + Cachex, + action, + [name | arguments] + ) + + with {:badrpc, reason} <- result do + {:error, reason} + end end end end diff --git a/lib/cachex/spec.ex b/lib/cachex/spec.ex index 648b340..423f6af 100644 --- a/lib/cachex/spec.ex +++ b/lib/cachex/spec.ex @@ -34,7 +34,6 @@ defmodule Cachex.Spec do fallback: fallback, hooks: hooks, limit: limit, - nodes: [atom], ordered: boolean, transactions: boolean, warmers: [warmer] @@ -45,7 +44,7 @@ defmodule Cachex.Spec do record(:cluster, enabled: boolean, router: (any, any -> atom), - nodes: any + state: any ) # Record specification for a command instance @@ -135,7 +134,6 @@ defmodule Cachex.Spec do fallback: nil, hooks: nil, limit: nil, - nodes: [], ordered: false, transactions: false, warmers: [] @@ -143,7 +141,7 @@ defmodule Cachex.Spec do defrecord :cluster, enabled: false, router: nil, - nodes: nil + state: nil @doc """ Creates a command record from the provided values. From 7732b1a5380408fdf2392fe8c756ada96fe6c713 Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Wed, 21 Aug 2024 16:43:18 -0700 Subject: [PATCH 3/9] Migrate to full routing behaviours and implementations --- lib/cachex.ex | 134 +++++++++---- lib/cachex/errors.ex | 4 + lib/cachex/options.ex | 49 +++-- lib/cachex/router.ex | 332 ++++--------------------------- lib/cachex/router/jump.ex | 44 ++++ lib/cachex/router/ring.ex | 14 ++ lib/cachex/services/conductor.ex | 311 +++++++++++++++++++++++++++++ lib/cachex/spec.ex | 53 +++-- lib/cachex/spec/validator.ex | 18 ++ mix.exs | 1 + test/cachex/errors_test.exs | 1 + 11 files changed, 599 insertions(+), 362 deletions(-) create mode 100644 lib/cachex/router/jump.ex create mode 100644 lib/cachex/router/ring.ex create mode 100644 lib/cachex/services/conductor.ex diff --git a/lib/cachex.ex b/lib/cachex.ex index 975f0fb..fafc281 100644 --- a/lib/cachex.ex +++ b/lib/cachex.ex @@ -41,15 +41,15 @@ defmodule Cachex do alias Cachex.ExecutionError alias Cachex.Options alias Cachex.Query - alias Cachex.Router alias Cachex.Services # alias any services + alias Services.Conductor alias Services.Overseer # import util macros + require Conductor require Overseer - require Router # avoid inspect clashes import Kernel, except: [inspect: 2] @@ -277,6 +277,23 @@ defmodule Cachex do iex> Cachex.start_link(:my_cache, [ ordered: true ]) { :ok, _pid } + * `:router` + + This option determines which module is used for cache routing inside distributed + caches. You can provide either a full `record` structure or simply a module name. + + iex> import Cachex.Spec + ...> + ...> Cachex.start_link(:my_cache, [ + ...> router: router( + ...> module: Cachex.Router.Jump, + ...> options: [] + ...> ) + ...> ]) + { :ok, _pid } + + Please see the `Cachex.Spec.router/1` documentation for further customization options. + * `:stats` This option can be used to toggle statistics gathering for a cache. This is a @@ -299,6 +316,33 @@ defmodule Cachex do iex> Cachex.start_link(:my_cache, [ transactions: true ]) { :ok, _pid } + * `:warmers` + + The `:warmers` option allows the user to attach a list of warming modules to + a cache. These cache warmers must implement the `Cachex.Warmer` behaviour + and are defined as `warmer` records. + + The only required value is the `:module` definition, although you can also + choose to provide a name and state to attach to the warmer process. The flag + `:required` is used to control whether the warmer must finish execution before + the cache supervision tree can be considered fully started. + + iex> import Cachex.Spec + ...> + ...> Cachex.start_link(:my_cache, [ + ...> warmers: [ + ...> warmer( + ...> required: true, + ...> module: MyProject.DatabaseWarmer, + ...> state: connection, + ...> name: MyProject.DatabaseWarmer + ...> ) + ...> ] + ...> ]) + { :ok, _pid } + + Please see the `Cachex.Spec.warmer/1` documentation for further customization options. + """ @spec start_link(atom | Keyword.t()) :: {atom, pid} def start_link(options) when is_list(options) do @@ -307,9 +351,10 @@ defmodule Cachex do {:ok, true} <- ensure_unused(name), {:ok, cache} <- Options.parse(name, options), {:ok, pid} = Supervisor.start_link(__MODULE__, cache, name: name), - {:ok, link} = Services.link(cache), - ^link <- Overseer.update(name, link), - :ok <- setup_warmers(link) do + {:ok, cache} = Services.link(cache), + {:ok, cache} <- setup_router(cache), + ^cache <- Overseer.update(name, cache), + :ok <- setup_warmers(cache) do {:ok, pid} end end @@ -378,7 +423,7 @@ defmodule Cachex do """ @spec clear(cache, Keyword.t()) :: {status, integer} def clear(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:clear, [options]}) + do: Conductor.route(cache, {:clear, [options]}) @doc """ Retrieves the number of unexpired records in a cache. @@ -399,7 +444,7 @@ defmodule Cachex do """ @spec count(cache, Keyword.t()) :: {status, number} def count(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:count, [options]}) + do: Conductor.route(cache, {:count, [options]}) @doc """ Decrements an entry in the cache. @@ -455,7 +500,7 @@ defmodule Cachex do """ @spec del(cache, any, Keyword.t()) :: {status, boolean} def del(cache, key, options \\ []) when is_list(options), - do: Router.call(cache, {:del, [key, options]}) + do: Conductor.route(cache, {:del, [key, options]}) @doc """ Serializes a cache to a location on a filesystem. @@ -489,7 +534,7 @@ defmodule Cachex do @spec dump(cache, binary, Keyword.t()) :: {status, any} def dump(cache, path, options \\ []) when is_binary(path) and is_list(options), - do: Router.call(cache, {:dump, [path, options]}) + do: Conductor.route(cache, {:dump, [path, options]}) @doc """ Determines whether a cache contains any entries. @@ -511,7 +556,7 @@ defmodule Cachex do """ @spec empty?(cache, Keyword.t()) :: {status, boolean} def empty?(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:empty?, [options]}) + do: Conductor.route(cache, {:empty?, [options]}) @doc """ Executes multiple functions in the context of a cache. @@ -565,7 +610,7 @@ defmodule Cachex do """ @spec exists?(cache, any, Keyword.t()) :: {status, boolean} def exists?(cache, key, options \\ []) when is_list(options), - do: Router.call(cache, {:exists?, [key, options]}) + do: Conductor.route(cache, {:exists?, [key, options]}) @doc """ Places an expiration time on an entry in a cache. @@ -589,7 +634,7 @@ defmodule Cachex do @spec expire(cache, any, number | nil, Keyword.t()) :: {status, boolean} def expire(cache, key, expiration, options \\ []) when (is_nil(expiration) or is_number(expiration)) and is_list(options), - do: Router.call(cache, {:expire, [key, expiration, options]}) + do: Conductor.route(cache, {:expire, [key, expiration, options]}) @doc """ Updates an entry in a cache to expire at a given time. @@ -634,7 +679,7 @@ defmodule Cachex do """ @spec export(cache, Keyword.t()) :: {status, [Cachex.Spec.entry()]} def export(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:export, [options]}) + do: Conductor.route(cache, {:export, [options]}) @doc """ Fetches an entry from a cache, generating a value on cache miss. @@ -701,7 +746,7 @@ defmodule Cachex do Overseer.enforce cache do case fallback || fallback(cache(cache, :fallback), :default) do val when is_function(val) -> - Router.call(cache, {:fetch, [key, val, options]}) + Conductor.route(cache, {:fetch, [key, val, options]}) _na -> error(:invalid_fallback) @@ -724,7 +769,7 @@ defmodule Cachex do """ @spec get(cache, any, Keyword.t()) :: {atom, any} def get(cache, key, options \\ []) when is_list(options), - do: Router.call(cache, {:get, [key, options]}) + do: Conductor.route(cache, {:get, [key, options]}) @doc """ Retrieves and updates an entry in a cache. @@ -754,9 +799,9 @@ defmodule Cachex do """ @spec get_and_update(cache, any, function, Keyword.t()) :: {:commit | :ignore, any} - def get_and_update(cache, key, update_function, options \\ []) - when is_function(update_function) and is_list(options), - do: Router.call(cache, {:get_and_update, [key, update_function, options]}) + def get_and_update(cache, key, updater, options \\ []) + when is_function(updater) and is_list(options), + do: Conductor.route(cache, {:get_and_update, [key, updater, options]}) @doc """ Retrieves a list of all entry keys from a cache. @@ -778,7 +823,7 @@ defmodule Cachex do """ @spec keys(cache, Keyword.t()) :: {status, [any]} def keys(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:keys, [options]}) + do: Conductor.route(cache, {:keys, [options]}) @doc """ Imports an export set into a cache. @@ -796,7 +841,7 @@ defmodule Cachex do @spec import(cache, [Cachex.Spec.entry()], Keyword.t()) :: {status, any} def import(cache, entries, options \\ []) when is_list(entries) and is_list(options), - do: Router.call(cache, {:import, [entries, options]}) + do: Conductor.route(cache, {:import, [entries, options]}) @doc """ Increments an entry in the cache. @@ -827,7 +872,7 @@ defmodule Cachex do @spec incr(cache, any, integer, Keyword.t()) :: {status, integer} def incr(cache, key, amount \\ 1, options \\ []) when is_integer(amount) and is_list(options), - do: Router.call(cache, {:incr, [key, amount, options]}) + do: Conductor.route(cache, {:incr, [key, amount, options]}) @doc """ Inspects various aspects of a cache. @@ -913,7 +958,7 @@ defmodule Cachex do """ @spec inspect(cache, atom | tuple, Keyword.t()) :: {status, any} def inspect(cache, option, options \\ []) when is_list(options), - do: Router.call(cache, {:inspect, [option, options]}) + do: Conductor.route(cache, {:inspect, [option, options]}) @doc """ Invokes a custom command against a cache entry. @@ -939,7 +984,7 @@ defmodule Cachex do """ @spec invoke(cache, atom, any, Keyword.t()) :: any def invoke(cache, cmd, key, options \\ []) when is_list(options), - do: Router.call(cache, {:invoke, [cmd, key, options]}) + do: Conductor.route(cache, {:invoke, [cmd, key, options]}) @doc """ Deserializes a cache from a location on a filesystem. @@ -982,7 +1027,7 @@ defmodule Cachex do @spec load(cache, binary, Keyword.t()) :: {status, any} def load(cache, path, options \\ []) when is_binary(path) and is_list(options), - do: Router.call(cache, {:load, [path, options]}) + do: Conductor.route(cache, {:load, [path, options]}) @doc """ Removes an expiration time from an entry in a cache. @@ -1017,7 +1062,7 @@ defmodule Cachex do """ @spec purge(cache, Keyword.t()) :: {status, number} def purge(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:purge, [options]}) + do: Conductor.route(cache, {:purge, [options]}) @doc """ Places an entry in a cache. @@ -1045,7 +1090,7 @@ defmodule Cachex do # TODO: maybe rename TTL to be expiration? @spec put(cache, any, any, Keyword.t()) :: {status, boolean} def put(cache, key, value, options \\ []) when is_list(options), - do: Router.call(cache, {:put, [key, value, options]}) + do: Conductor.route(cache, {:put, [key, value, options]}) @doc """ Places a batch of entries in a cache. @@ -1076,7 +1121,7 @@ defmodule Cachex do @spec put_many(cache, [{any, any}], Keyword.t()) :: {status, boolean} def put_many(cache, pairs, options \\ []) when is_list(pairs) and is_list(options), - do: Router.call(cache, {:put_many, [pairs, options]}) + do: Conductor.route(cache, {:put_many, [pairs, options]}) @doc """ Refreshes an expiration for an entry in a cache. @@ -1103,7 +1148,7 @@ defmodule Cachex do """ @spec refresh(cache, any, Keyword.t()) :: {status, boolean} def refresh(cache, key, options \\ []) when is_list(options), - do: Router.call(cache, {:refresh, [key, options]}) + do: Conductor.route(cache, {:refresh, [key, options]}) @doc """ Resets a cache by clearing the keyspace and restarting any hooks. @@ -1142,7 +1187,7 @@ defmodule Cachex do """ @spec reset(cache, Keyword.t()) :: {status, true} def reset(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:reset, [options]}) + do: Conductor.route(cache, {:reset, [options]}) @doc """ Retrieves the total size of a cache. @@ -1163,7 +1208,7 @@ defmodule Cachex do """ @spec size(cache, Keyword.t()) :: {status, number} def size(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:size, [options]}) + do: Conductor.route(cache, {:size, [options]}) @doc """ Retrieves statistics about a cache. @@ -1188,7 +1233,7 @@ defmodule Cachex do """ @spec stats(cache, Keyword.t()) :: {status, map()} def stats(cache, options \\ []) when is_list(options), - do: Router.call(cache, {:stats, [options]}) + do: Conductor.route(cache, {:stats, [options]}) @doc """ Creates a `Stream` of entries in a cache. @@ -1235,7 +1280,7 @@ defmodule Cachex do @spec stream(cache, any, Keyword.t()) :: {status, Enumerable.t()} def stream(cache, query \\ Query.create(true), options \\ []) when is_list(options), - do: Router.call(cache, {:stream, [query, options]}) + do: Conductor.route(cache, {:stream, [query, options]}) @doc """ Takes an entry from a cache. @@ -1258,7 +1303,7 @@ defmodule Cachex do """ @spec take(cache, any, Keyword.t()) :: {status, any} def take(cache, key, options \\ []) when is_list(options), - do: Router.call(cache, {:take, [key, options]}) + do: Conductor.route(cache, {:take, [key, options]}) @doc """ Updates the last write time on a cache entry. @@ -1268,7 +1313,7 @@ defmodule Cachex do """ @spec touch(cache, any, Keyword.t()) :: {status, boolean} def touch(cache, key, options \\ []) when is_list(options), - do: Router.call(cache, {:touch, [key, options]}) + do: Conductor.route(cache, {:touch, [key, options]}) @doc """ Executes multiple functions in the context of a transaction. @@ -1307,7 +1352,7 @@ defmodule Cachex do |> Overseer.update(&cache(&1, transactions: true)) end - Router.call(trans_cache, {:transaction, [keys, operation, options]}) + Conductor.route(trans_cache, {:transaction, [keys, operation, options]}) end end @@ -1332,7 +1377,7 @@ defmodule Cachex do """ @spec ttl(cache, any, Keyword.t()) :: {status, integer | nil} def ttl(cache, key, options \\ []) when is_list(options), - do: Router.call(cache, {:ttl, [key, options]}) + do: Conductor.route(cache, {:ttl, [key, options]}) @doc """ Updates an entry in a cache. @@ -1358,7 +1403,7 @@ defmodule Cachex do """ @spec update(cache, any, any, Keyword.t()) :: {status, any} def update(cache, key, value, options \\ []) when is_list(options), - do: Router.call(cache, {:update, [key, value, options]}) + do: Conductor.route(cache, {:update, [key, value, options]}) @doc """ Triggers a manual warming in a cache. @@ -1399,7 +1444,7 @@ defmodule Cachex do """ @spec warm(cache, Keyword.t()) :: {status, [atom()]} def warm(cache, options \\ []), - do: Router.call(cache, {:warm, [options]}) + do: Conductor.route(cache, {:warm, [options]}) ############### # Private API # @@ -1427,6 +1472,19 @@ defmodule Cachex do end end + # Initializes cache router on startup. + # + # This will initialize the base router state and attach all nodes + # provided at cache startup to the router state. + defp setup_router(cache(nodes: nodes, router: router) = cache) do + router(module: module, options: options) = router + + state = module.init(nodes, options) + local = module.nodes(state) == [node()] + + {:ok, cache(cache, router: router(router, state: state, enabled: !local))} + end + # Initializes cache warmers on startup. # # This will trigger the initial cache warming via `Cachex.warm/2` while diff --git a/lib/cachex/errors.ex b/lib/cachex/errors.ex index 9627cf6..8133e97 100644 --- a/lib/cachex/errors.ex +++ b/lib/cachex/errors.ex @@ -23,6 +23,7 @@ defmodule Cachex.Errors do :invalid_nodes, :invalid_option, :invalid_pairs, + :invalid_router, :invalid_warmer, :janitor_disabled, :no_cache, @@ -99,6 +100,9 @@ defmodule Cachex.Errors do def long_form(:invalid_pairs), do: "Invalid insertion pairs provided" + def long_form(:invalid_router), + do: "Invalid router definition provided" + def long_form(:invalid_warmer), do: "Invalid warmer definition provided" diff --git a/lib/cachex/options.ex b/lib/cachex/options.ex index 8bdf0df..05de160 100644 --- a/lib/cachex/options.ex +++ b/lib/cachex/options.ex @@ -20,6 +20,7 @@ defmodule Cachex.Options do :nodes, :limit, :hooks, + :router, :ordered, :commands, :fallback, @@ -309,22 +310,7 @@ defmodule Cachex.Options do error(:invalid_nodes) true -> - cache(cache, - cluster: - cluster( - enabled: nodes != [node()], - router: fn key, nodes -> - slot = - key - |> :erlang.phash2() - |> Jumper.slot(length(nodes)) - - Enum.at(nodes, slot) - end, - state: nodes - ) - ) - + cache(cache, nodes: nodes) # coveralls-ignore-stop end end @@ -339,6 +325,37 @@ defmodule Cachex.Options do ordered: get(options, :ordered, &is_boolean/1, false) ) + # Configures a cache based on router flags. + # + # This allows a user to provide a custom router for distributed + # caches, with the default being set to a default router record. + defp parse_type(:router, cache() = cache, options) do + router = + transform(options, :router, fn + # provided full record, woohoo! + router() = router -> + router + + # unset so default + nil -> + router() + + # shorthand router name + mod when is_atom(mod) -> + router(module: mod) + + # anything else, no thanks! + _invalid -> + nil + end) + + # validate using the spec validator + case Validator.valid?(:router, router) do + false -> error(:invalid_router) + true -> cache(cache, router: router) + end + end + # Configures a cache based on transaction flags. # # This will simply configure the `:transactions` field in the cache diff --git a/lib/cachex/router.ex b/lib/cachex/router.ex index 5f76fe9..41ca5fd 100644 --- a/lib/cachex/router.ex +++ b/lib/cachex/router.ex @@ -1,308 +1,64 @@ defmodule Cachex.Router do @moduledoc """ - Routing module to dispatch Cachex actions to their execution environment. + Module controlling routing behaviour definitions. - This module acts as the single source of dispatch within Cachex. In prior - versions the backing actions were called directly from the main interface - and were wrapped in macros, which was difficult to maintain and also quite - noisy. Now that all execution flows via the router, this is no longer an - issue and it also serves as a gateway to distribution in the future. + This module defines the router implementations for Cachex, allowing the user + to route commands between nodes in a cache cluster. This means that users + can provide their own routing and rebalancing logic without having to depend + on it being included in Cachex. """ - alias Cachex.Router - alias Cachex.Services - # add some service aliases - alias Services.Informant - alias Services.Overseer - - # import macro stuff - import Cachex.Errors - import Cachex.Spec - - ############## - # Public API # - ############## + ############# + # Behaviour # + ############# @doc """ - Dispatches a call to an appropriate execution environment. - - This acts as a macro just to avoid the overhead of slicing up module - names at runtime, when they can be guaranteed at compile time much - more easily. + Initializes a routing state using a list of nodes. """ - defmacro call(cache, {action, _arguments} = call) do - act_name = - action - |> Kernel.to_string() - |> String.replace_trailing("?", "") - |> Macro.camelize() - - act_join = :"Elixir.Cachex.Actions.#{act_name}" - - quote do - Overseer.enforce unquote(cache) do - call = unquote(call) - cache = var!(cache) - module = unquote(act_join) - - Router.execute(cache, module, call) - end - end - end + @callback init(nodes :: [atom], options :: Keyword.t()) :: any @doc """ - Executes a previously dispatched action. - - This macro should not be called externally; the only reason it remains - public is due to the code injected by the `dispatch/2` macro. + Retrieves the list of nodes from a routing state. """ - @spec execute(Cachex.Spec.cache(), atom, {atom, [any]}) :: any - def execute(cache(cluster: cluster(enabled: false)) = cache, module, call), - do: route_local(cache, module, call) - - def execute(cache(cluster: cluster(enabled: true)) = cache, module, call), - do: route_cluster(cache, module, call) - - ############### - # Private API # - ############### - - # Results merging for distributed cache results. - # - # Follows these rules: - # - # - Lists are always concatenated. - # - Numbers are always summed. - # - Booleans are always AND-ed. - # - Maps are always merged (recursively). - # - # This has to be public due to scopes, but we hide the docs - # because we don't really care for anybody else calling it. - defp result_merge(left, right) when is_list(left), - do: left ++ right - - defp result_merge(left, right) when is_number(left), - do: left + right - - defp result_merge(left, right) when is_boolean(left), - do: left && right - - defp result_merge(left, right) when is_map(left) do - Map.merge(left, right, fn - :creation_date, _left, right -> - right - - key, left, right when key in [:hit_rate, :miss_rate] -> - (left + right) / 2 - - _key, left, right -> - result_merge(left, right) - end) - end - - # Provides handling for local actions on this node. - # - # This will provide handling of notifications across hooks before and after - # the execution of an action. This is taken from code formerly in the old - # `Cachex.Actions` module, but has been moved here as it's more appropriate. - # - # If `notify` is set to false, notifications are disabled and the call is - # simply executed as is. If `via` is provided, you can override the handle - # passed to the hooks (useful for re-use of functions). An example of this - # is `decr/4` which simply calls `incr/4` with `via: { :decr, arguments }`. - defp route_local(cache, module, {_action, arguments} = call) do - option = List.last(arguments) - notify = Keyword.get(option, :notify, true) - - message = - notify && - case option[:via] do - msg when not is_tuple(msg) -> call - msg -> msg - end - - notify && Informant.broadcast(cache, message) - result = apply(module, :execute, [cache | arguments]) - - if notify do - Informant.broadcast( - cache, - message, - Keyword.get(option, :hook_result, result) - ) - end - - result - end - - # actions based on a key - @keyed_actions [ - :del, - :exists?, - :expire, - :fetch, - :get, - :get_and_update, - :incr, - :invoke, - :put, - :refresh, - :take, - :touch, - :ttl, - :update - ] - - # Provides handling to key-based actions distributed to remote nodes. - # - # The algorithm here is simple; hash the key and slot the value using JCH into - # the total number of slots available (i.e. the count of the nodes). If it comes - # out to the local node, just execute the local code, otherwise RPC the base call - # to the remote node, and just assume that it'll correctly handle it. - defp route_cluster(cache, module, {action, [key | _]} = call) - when action in @keyed_actions do - cache(cluster: cluster(router: router, state: nodes)) = cache - route_node(cache, module, call, router.(key, nodes)) - end - - # actions which merge outputs - @merge_actions [ - :clear, - :count, - :empty?, - :export, - :import, - :keys, - :purge, - :reset, - :size, - :stats - ] + @callback nodes(state :: any) :: [atom] - # Provides handling of cross-node actions distributed over remote nodes. - # - # This will do an RPC call across all nodes to fetch their results and merge - # them with the results on the local node. The hooks will only be notified - # on the local node, due to an annoying recursion issue when handling the - # same across all nodes - seems to provide better logic though. - defp route_cluster(cache, module, {action, arguments} = call) - when action in @merge_actions do - # fetch the nodes from the cluster state - cache(cluster: cluster(state: nodes)) = cache - - # all calls have options we can use - options = List.last(arguments) - - # can force local node setting local: true - results = - case Keyword.get(options, :local) do - true -> - [] - - _any -> - # don't want to execute on the local node - other_nodes = List.delete(nodes, node()) - - # execute the call on all other nodes - {results, _} = - :rpc.multicall( - other_nodes, - module, - :execute, - [cache | arguments] - ) - - results - end - - # execution on the local node, using the local macros and then unpack - {:ok, result} = route_local(cache, module, call) - - # results merge - merge_result = - results - |> Enum.map(&elem(&1, 1)) - |> Enum.reduce(result, &result_merge/2) - - # return after merge - {:ok, merge_result} - end - - # actions which always run locally - @local_actions [:dump, :inspect, :load, :warm] - - # Provides handling of `:inspect` operations. - # - # These operations are guaranteed to run on the local nodes. - defp route_cluster(cache, module, {action, _arguments} = call) - when action in @local_actions, - do: route_local(cache, module, call) - - # Provides handling of `:put_many` operations. - # - # These operations can only execute if their keys slot to the same remote nodes. - defp route_cluster(cache, module, {:put_many, _arguments} = call), - do: route_batch(cache, module, call, &elem(&1, 0)) - - # Provides handling of `:transaction` operations. - # - # These operations can only execute if their keys slot to the same remote nodes. - defp route_cluster(cache, module, {:transaction, [keys | _]} = call) do - case keys do - [] -> route_local(cache, module, call) - _ -> route_batch(cache, module, call, & &1) - end - end - - # Any other actions are explicitly disabled in distributed environments. - defp route_cluster(_cache, _module, _call), - do: error(:non_distributed) - - # Calls a slot for the provided cache action if all keys slot to the same node. - # - # This is a delegate handler for `route_node/4`, but ensures that all keys slot to the - # same node to avoid the case where we have to fork a call out internally. - defp route_batch(cache, module, {_action, [keys | _]} = call, mapper) do - # map all keys to a slot in the nodes list - cache(cluster: cluster(router: router, state: nodes)) = cache - slots = Enum.map(keys, &router.(mapper.(&1), nodes)) - - # unique to avoid dups - case Enum.uniq(slots) do - # if there's a single slot it's safe to continue with the call to the remote - [slot] -> - route_node(cache, module, call, slot) - - # otherwise, cross_slot errors! - _disable -> - error(:cross_slot) - end - end + @doc """ + Route a provided key to a node in a routing state. + """ + @callback route(state :: any, key :: any) :: atom - # Calls a node for the provided cache action. - # - # This will determine a local slot and delegate locally if so, bypassing - # any RPC calls in order to gain a slight bit of performance. - defp route_node(cache, module, {action, arguments} = call, node) do - current = node() - cache(name: name) = cache + @doc """ + Attach a new node to a routing state. + """ + @callback attach(state :: any, node :: atom) :: any - case node do - ^current -> - route_local(cache, module, call) + @doc """ + Detach an existing node from a routing state. + """ + @callback detach(state :: any, node :: atom) :: any + + @doc false + defmacro __using__(_) do + quote location: :keep, generated: true do + # inherit the behaviour + @behaviour Cachex.Router + + @doc false + def attach(state, node), + do: + raise(RuntimeError, + message: "Router does not support node addition" + ) - targeted -> - result = - :rpc.call( - targeted, - Cachex, - action, - [name | arguments] + @doc false + def detach(state, node), + do: + raise(RuntimeError, + message: "Router does not support node removal" ) - with {:badrpc, reason} <- result do - {:error, reason} - end + # state modifiers are overridable + defoverridable attach: 2, detach: 2 end end end diff --git a/lib/cachex/router/jump.ex b/lib/cachex/router/jump.ex new file mode 100644 index 0000000..6a3d9d3 --- /dev/null +++ b/lib/cachex/router/jump.ex @@ -0,0 +1,44 @@ +defmodule Cachex.Router.Jump do + @moduledoc """ + Basic routing implementation based on Jump Consistent Hash. + + This implementation backed Cachex's distribution in the v3.x lineage, + and is suitable for clusters of a static size. Attaching and detaching + nodes after initialization is not supported and will cause an error + if you attempt to do so. + + For more information on the algorithm backing this router, please + see the appropriate [publication](https://arxiv.org/pdf/1406.2294). + """ + use Cachex.Router + + @doc """ + Initializes a routing state using a list of nodes. + + In the case of this router the routing state is simply the list + of nodes being tracked, with duplicate entries removed. + """ + @spec init(nodes :: [atom], options :: Keyword.t()) :: [atom] + def init(nodes, _options), + do: Enum.uniq(nodes) + + @doc """ + Retrieves the list of nodes from a routing state. + """ + @spec nodes(nodes :: [atom]) :: [atom] + def nodes(nodes), + do: nodes + + @doc """ + Route a provided key to a node in a routing state. + """ + @spec route(nodes :: [atom], key :: any) :: atom + def route(nodes, key) do + slot = + key + |> :erlang.phash2() + |> Jumper.slot(length(nodes)) + + Enum.at(nodes, slot) + end +end diff --git a/lib/cachex/router/ring.ex b/lib/cachex/router/ring.ex new file mode 100644 index 0000000..3ffcb86 --- /dev/null +++ b/lib/cachex/router/ring.ex @@ -0,0 +1,14 @@ +# defmodule Cachex.Router.Ring do +# use Cachex.Router + +# def init(nodes, _options \\ []) do +# ring = HashRing.new() +# ring = HashRing.add_nodes(ring, nodes) +# ring +# end + +# defdelegate nodes(ring), to: HashRing, as: :nodes +# defdelegate route(ring, key), to: HashRing, as: :key_to_node +# defdelegate attach(ring, node), to: HashRing, as: :add_node +# defdelegate detach(ring, node), to: HashRing, as: :remove_node +# end diff --git a/lib/cachex/services/conductor.ex b/lib/cachex/services/conductor.ex new file mode 100644 index 0000000..a357676 --- /dev/null +++ b/lib/cachex/services/conductor.ex @@ -0,0 +1,311 @@ +defmodule Cachex.Services.Conductor do + @moduledoc """ + Routing module to dispatch Cachex actions to their execution environment. + + This module acts as the single source of dispatch within Cachex. In prior + versions the backing actions were called directly from the main interface + and were wrapped in macros, which was difficult to maintain and also quite + noisy. Now that all execution flows via the router, this is no longer an + issue and it also serves as a gateway to distribution in the future. + """ + alias Cachex.Services + + # add some service aliases + alias Services.Conductor + alias Services.Informant + alias Services.Overseer + + # import macro stuff + import Cachex.Errors + import Cachex.Spec + + ############## + # Public API # + ############## + + @doc """ + Executes a previously dispatched action. + + This macro should not be called externally; the only reason it remains + public is due to the code injected by the `dispatch/2` macro. + """ + @spec route(Cachex.Spec.cache(), atom, {atom, [any]}) :: any + def route(cache(router: router(enabled: false)) = cache, module, call), + do: route_local(cache, module, call) + + def route(cache(router: router(enabled: true)) = cache, module, call), + do: route_cluster(cache, module, call) + + @doc """ + Dispatches a call to an appropriate execution environment. + + This acts as a macro just to avoid the overhead of slicing up module + names at runtime, when they can be guaranteed at compile time much + more easily. + """ + defmacro route(cache, {action, _arguments} = call) do + act_name = + action + |> Kernel.to_string() + |> String.replace_trailing("?", "") + |> Macro.camelize() + + act_join = :"Elixir.Cachex.Actions.#{act_name}" + + quote do + Overseer.enforce unquote(cache) do + call = unquote(call) + cache = var!(cache) + module = unquote(act_join) + + Conductor.route(cache, module, call) + end + end + end + + ############### + # Private API # + ############### + + # Results merging for distributed cache results. + # + # Follows these rules: + # + # - Lists are always concatenated. + # - Numbers are always summed. + # - Booleans are always AND-ed. + # - Maps are always merged (recursively). + # + # This has to be public due to scopes, but we hide the docs + # because we don't really care for anybody else calling it. + defp result_merge(left, right) when is_list(left), + do: left ++ right + + defp result_merge(left, right) when is_number(left), + do: left + right + + defp result_merge(left, right) when is_boolean(left), + do: left && right + + defp result_merge(left, right) when is_map(left) do + Map.merge(left, right, fn + :creation_date, _left, right -> + right + + key, left, right when key in [:hit_rate, :miss_rate] -> + (left + right) / 2 + + _key, left, right -> + result_merge(left, right) + end) + end + + # Provides handling for local actions on this node. + # + # This will provide handling of notifications across hooks before and after + # the execution of an action. This is taken from code formerly in the old + # `Cachex.Actions` module, but has been moved here as it's more appropriate. + # + # If `notify` is set to false, notifications are disabled and the call is + # simply executed as is. If `via` is provided, you can override the handle + # passed to the hooks (useful for re-use of functions). An example of this + # is `decr/4` which simply calls `incr/4` with `via: { :decr, arguments }`. + defp route_local(cache, module, {_action, arguments} = call) do + option = List.last(arguments) + notify = Keyword.get(option, :notify, true) + + message = + notify && + case option[:via] do + msg when not is_tuple(msg) -> call + msg -> msg + end + + notify && Informant.broadcast(cache, message) + result = apply(module, :execute, [cache | arguments]) + + if notify do + Informant.broadcast( + cache, + message, + Keyword.get(option, :hook_result, result) + ) + end + + result + end + + # actions based on a key + @keyed_actions [ + :del, + :exists?, + :expire, + :fetch, + :get, + :get_and_update, + :incr, + :invoke, + :put, + :refresh, + :take, + :touch, + :ttl, + :update + ] + + # Provides handling to key-based actions distributed to remote nodes. + # + # The algorithm here is simple; hash the key and slot the value using JCH into + # the total number of slots available (i.e. the count of the nodes). If it comes + # out to the local node, just execute the local code, otherwise RPC the base call + # to the remote node, and just assume that it'll correctly handle it. + defp route_cluster(cache, module, {action, [key | _]} = call) + when action in @keyed_actions do + cache(router: router(module: router, state: nodes)) = cache + route_node(cache, module, call, router.route(nodes, key)) + end + + # actions which merge outputs + @merge_actions [ + :clear, + :count, + :empty?, + :export, + :import, + :keys, + :purge, + :reset, + :size, + :stats + ] + + # Provides handling of cross-node actions distributed over remote nodes. + # + # This will do an RPC call across all nodes to fetch their results and merge + # them with the results on the local node. The hooks will only be notified + # on the local node, due to an annoying recursion issue when handling the + # same across all nodes - seems to provide better logic though. + defp route_cluster(cache, module, {action, arguments} = call) + when action in @merge_actions do + # fetch the nodes from the cluster state + cache(router: router(module: router, state: state)) = cache + + # all calls have options we can use + options = List.last(arguments) + + # can force local node setting local: true + results = + case Keyword.get(options, :local) do + true -> + [] + + _any -> + # don't want to execute on the local node + other_nodes = + state + |> router.nodes() + |> List.delete(node()) + + # execute the call on all other nodes + {results, _} = + :rpc.multicall( + other_nodes, + module, + :execute, + [cache | arguments] + ) + + results + end + + # execution on the local node, using the local macros and then unpack + {:ok, result} = route_local(cache, module, call) + + # results merge + merge_result = + results + |> Enum.map(&elem(&1, 1)) + |> Enum.reduce(result, &result_merge/2) + + # return after merge + {:ok, merge_result} + end + + # actions which always run locally + @local_actions [:dump, :inspect, :load, :warm] + + # Provides handling of `:inspect` operations. + # + # These operations are guaranteed to run on the local nodes. + defp route_cluster(cache, module, {action, _arguments} = call) + when action in @local_actions, + do: route_local(cache, module, call) + + # Provides handling of `:put_many` operations. + # + # These operations can only execute if their keys slot to the same remote nodes. + defp route_cluster(cache, module, {:put_many, _arguments} = call), + do: route_batch(cache, module, call, &elem(&1, 0)) + + # Provides handling of `:transaction` operations. + # + # These operations can only execute if their keys slot to the same remote nodes. + defp route_cluster(cache, module, {:transaction, [keys | _]} = call) do + case keys do + [] -> route_local(cache, module, call) + _ -> route_batch(cache, module, call, & &1) + end + end + + # Any other actions are explicitly disabled in distributed environments. + defp route_cluster(_cache, _module, _call), + do: error(:non_distributed) + + # Calls a slot for the provided cache action if all keys slot to the same node. + # + # This is a delegate handler for `route_node/4`, but ensures that all keys slot to the + # same node to avoid the case where we have to fork a call out internally. + defp route_batch(cache, module, {_action, [keys | _]} = call, mapper) do + # map all keys to a slot in the nodes list + cache(router: router(module: router, state: state)) = cache + slots = Enum.map(keys, &router.route(state, mapper.(&1))) + + # unique to avoid dups + case Enum.uniq(slots) do + # if there's a single slot it's safe to continue with the call to the remote + [slot] -> + route_node(cache, module, call, slot) + + # otherwise, cross_slot errors! + _disable -> + error(:cross_slot) + end + end + + # Calls a node for the provided cache action. + # + # This will determine a local slot and delegate locally if so, bypassing + # any RPC calls in order to gain a slight bit of performance. + defp route_node(cache, module, {action, arguments} = call, node) do + current = node() + cache(name: name) = cache + + case node do + ^current -> + route_local(cache, module, call) + + targeted -> + result = + :rpc.call( + targeted, + Cachex, + action, + [name | arguments] + ) + + with {:badrpc, reason} <- result do + {:error, reason} + end + end + end +end diff --git a/lib/cachex/spec.ex b/lib/cachex/spec.ex index 282af69..5572f5d 100644 --- a/lib/cachex/spec.ex +++ b/lib/cachex/spec.ex @@ -27,26 +27,19 @@ defmodule Cachex.Spec do @type cache :: record(:cache, name: atom, - cluster: cluster, commands: map, compressed: boolean, expiration: expiration, fallback: fallback, hooks: hooks, limit: limit, + nodes: [atom], ordered: boolean, + router: router, transactions: boolean, warmers: [warmer] ) - # Record specification for a cluster instance - @type cluster :: - record(:cluster, - enabled: boolean, - router: (any, any -> atom), - state: any - ) - # Record specification for a command instance @type command :: record(:command, @@ -105,6 +98,14 @@ defmodule Cachex.Spec do options: Keyword.t() ) + # Record specification for a router instance + @type router :: + record(:router, + enabled: boolean, + module: atom, + state: any + ) + # Record specification for a cache warmer @type warmer :: record(:warmer, @@ -134,15 +135,12 @@ defmodule Cachex.Spec do fallback: nil, hooks: nil, limit: nil, + nodes: [], ordered: false, + router: nil, transactions: false, warmers: [] - defrecord :cluster, - enabled: false, - router: nil, - state: nil - @doc """ Creates a command record from the provided values. @@ -260,6 +258,21 @@ defmodule Cachex.Spec do reclaim: 0.1, options: [] + @doc """ + Creates a router record from the provided values. + + A router record reprsents routing within a distributed cache. Each router record should have a + valid routing module provided, which correct implements the behaviour defined in `Cachex.Router`. + + Options to be passed on router state initialization can also be provided, but note that all other + values inside this structure are for internal use and will be overwritten as needed. + """ + defrecord :router, + enabled: false, + options: [], + module: Cachex.Router.Jump, + state: nil + @doc """ Creates a warmer record from the provided values. @@ -286,12 +299,6 @@ defmodule Cachex.Spec do @spec cache(cache, Keyword.t()) :: cache defmacro cache(record, args) - @doc """ - Updates a cluster record from the provided values. - """ - @spec cluster(cluster, Keyword.t()) :: cluster - defmacro cluster(record, args) - @doc """ Updates a command record from the provided values. """ @@ -334,6 +341,12 @@ defmodule Cachex.Spec do @spec limit(limit, Keyword.t()) :: limit defmacro limit(record, args) + @doc """ + Updates a router record from the provided values. + """ + @spec router(router, Keyword.t()) :: router + defmacro router(record, args) + @doc """ Updates a warmer record from the provided values. """ diff --git a/lib/cachex/spec/validator.ex b/lib/cachex/spec/validator.ex index 60464dd..925be3d 100644 --- a/lib/cachex/spec/validator.ex +++ b/lib/cachex/spec/validator.ex @@ -131,6 +131,24 @@ defmodule Cachex.Spec.Validator do check6 end + # Validates a router specification record. + # + # This will validate the correctly implemented `Cachex.Router` behaviour + # and confirm that the provided options are a keyword list. + def valid?(:router, router() = router) do + router(options: options, module: module, enabled: enabled) = router + + check1 = behaviour?(module, Cachex.Router) + check2 = check1 and is_boolean(enabled) + check3 = check2 and {:init, 2} in module.__info__(:functions) + check4 = check3 and {:nodes, 1} in module.__info__(:functions) + check5 = check4 and {:route, 2} in module.__info__(:functions) + check6 = check5 and {:attach, 2} in module.__info__(:functions) + check7 = check6 and {:detach, 2} in module.__info__(:functions) + check8 = check7 and Keyword.keyword?(options) + check8 + end + # Validates a warmer specification record. # # This will validate that the provided module correctly implements the diff --git a/mix.exs b/mix.exs index f9468a8..3c59daa 100644 --- a/mix.exs +++ b/mix.exs @@ -95,6 +95,7 @@ defmodule Cachex.Mixfile do # Production dependencies {:eternal, "~> 1.2"}, {:jumper, "~> 1.0"}, + {:libring, "~> 1.6"}, {:sleeplocks, "~> 1.1"}, {:unsafe, "~> 1.0"}, # Testing dependencies diff --git a/test/cachex/errors_test.exs b/test/cachex/errors_test.exs index f6c4ae4..a5c598c 100644 --- a/test/cachex/errors_test.exs +++ b/test/cachex/errors_test.exs @@ -17,6 +17,7 @@ defmodule Cachex.ErrorsTest do invalid_nodes: "Invalid nodes list provided", invalid_option: "Invalid option syntax provided", invalid_pairs: "Invalid insertion pairs provided", + invalid_router: "Invalid router definition provided", invalid_warmer: "Invalid warmer definition provided", janitor_disabled: "Specified janitor process running", no_cache: "Specified cache not running", From 4b6a8a2b60b6d918b7199f56a909af99bea53b02 Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Wed, 21 Aug 2024 16:58:55 -0700 Subject: [PATCH 4/9] Fill out Cachex.Router.Ring documentation --- lib/cachex/router.ex | 4 +-- lib/cachex/router/jump.ex | 4 +-- lib/cachex/router/ring.ex | 54 +++++++++++++++++++++++++++++---------- 3 files changed, 44 insertions(+), 18 deletions(-) diff --git a/lib/cachex/router.ex b/lib/cachex/router.ex index 41ca5fd..8a8a77c 100644 --- a/lib/cachex/router.ex +++ b/lib/cachex/router.ex @@ -13,12 +13,12 @@ defmodule Cachex.Router do ############# @doc """ - Initializes a routing state using a list of nodes. + Initialize a routing state using a list of nodes. """ @callback init(nodes :: [atom], options :: Keyword.t()) :: any @doc """ - Retrieves the list of nodes from a routing state. + Retrieve the list of nodes from a routing state. """ @callback nodes(state :: any) :: [atom] diff --git a/lib/cachex/router/jump.ex b/lib/cachex/router/jump.ex index 6a3d9d3..bf3ac6a 100644 --- a/lib/cachex/router/jump.ex +++ b/lib/cachex/router/jump.ex @@ -13,7 +13,7 @@ defmodule Cachex.Router.Jump do use Cachex.Router @doc """ - Initializes a routing state using a list of nodes. + Initialize a routing state using a list of nodes. In the case of this router the routing state is simply the list of nodes being tracked, with duplicate entries removed. @@ -23,7 +23,7 @@ defmodule Cachex.Router.Jump do do: Enum.uniq(nodes) @doc """ - Retrieves the list of nodes from a routing state. + Retrieve the list of nodes from a routing state. """ @spec nodes(nodes :: [atom]) :: [atom] def nodes(nodes), diff --git a/lib/cachex/router/ring.ex b/lib/cachex/router/ring.ex index 3ffcb86..6793723 100644 --- a/lib/cachex/router/ring.ex +++ b/lib/cachex/router/ring.ex @@ -1,14 +1,40 @@ -# defmodule Cachex.Router.Ring do -# use Cachex.Router - -# def init(nodes, _options \\ []) do -# ring = HashRing.new() -# ring = HashRing.add_nodes(ring, nodes) -# ring -# end - -# defdelegate nodes(ring), to: HashRing, as: :nodes -# defdelegate route(ring, key), to: HashRing, as: :key_to_node -# defdelegate attach(ring, node), to: HashRing, as: :add_node -# defdelegate detach(ring, node), to: HashRing, as: :remove_node -# end +defmodule Cachex.Router.Ring do + @moduledoc """ + Simple routing implementation based on a consistent hash ring. + + This implementation makes use of a hashing ring to better enable + modification of the internal node listing. Cachex uses the library + [libring](https://github.com/bitwalker/libring) to do the heavy + lifting here. + """ + use Cachex.Router + + @doc """ + Initialize a ring using a list of nodes. + """ + def init(nodes, _options \\ []) do + ring = HashRing.new() + ring = HashRing.add_nodes(ring, nodes) + ring + end + + @doc """ + Retrieve the list of nodes from a ring. + """ + defdelegate nodes(ring), to: HashRing, as: :nodes + + @doc """ + Route a provided key to a node in a ring. + """ + defdelegate route(ring, key), to: HashRing, as: :key_to_node + + @doc """ + Attach a new node to a ring. + """ + defdelegate attach(ring, node), to: HashRing, as: :add_node + + @doc """ + Detach an existing node to a ring. + """ + defdelegate detach(ring, node), to: HashRing, as: :remove_node +end From a964b3143b1c5d39203605c86485d1c7439c2cbe Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Wed, 21 Aug 2024 18:49:47 -0700 Subject: [PATCH 5/9] Add some missing test coverage in routing --- coveralls.json | 1 - lib/cachex.ex | 2 +- lib/cachex/policy/lrw/evented.ex | 4 +- lib/cachex/router.ex | 2 +- lib/cachex/router/jump.ex | 9 ++-- lib/cachex/router/ring.ex | 23 ++++++-- lib/cachex/services/conductor.ex | 2 + lib/cachex/spec/validator.ex | 2 +- test/cachex/actions/transaction_test.exs | 3 +- test/cachex/options_test.exs | 68 ++++++++++++++++-------- test/cachex/policy/lrw/evented_test.exs | 14 ++++- test/cachex/router/jump_test.exs | 29 ++++++++++ test/cachex/router/ring_test.exs | 42 +++++++++++++++ test/cachex/router_test.exs | 35 ++++++++++++ test/lib/cachex_case.ex | 1 + 15 files changed, 200 insertions(+), 37 deletions(-) create mode 100644 test/cachex/router/jump_test.exs create mode 100644 test/cachex/router/ring_test.exs create mode 100644 test/cachex/router_test.exs diff --git a/coveralls.json b/coveralls.json index f774a66..f954291 100644 --- a/coveralls.json +++ b/coveralls.json @@ -3,7 +3,6 @@ "treat_no_relevant_lines_as_covered": true }, "skip_files": [ - "lib/cachex/router.ex", "lib/cachex/spec.ex", "mix.exs" ] diff --git a/lib/cachex.ex b/lib/cachex.ex index fafc281..3131b84 100644 --- a/lib/cachex.ex +++ b/lib/cachex.ex @@ -1479,7 +1479,7 @@ defmodule Cachex do defp setup_router(cache(nodes: nodes, router: router) = cache) do router(module: module, options: options) = router - state = module.init(nodes, options) + state = module.new(nodes, options) local = module.nodes(state) == [node()] {:ok, cache(cache, router: router(router, state: state, enabled: !local))} diff --git a/lib/cachex/policy/lrw/evented.ex b/lib/cachex/policy/lrw/evented.ex index 8968258..73f81f0 100644 --- a/lib/cachex/policy/lrw/evented.ex +++ b/lib/cachex/policy/lrw/evented.ex @@ -16,8 +16,8 @@ defmodule Cachex.Policy.LRW.Evented do # add internal aliases alias Cachex.Policy.LRW - # actions which didn't trigger a write - @ignored [:error, :ignored] + # actions which didn't trigger + @ignored [:error, :ignore] ###################### # Hook Configuration # diff --git a/lib/cachex/router.ex b/lib/cachex/router.ex index 8a8a77c..61c886d 100644 --- a/lib/cachex/router.ex +++ b/lib/cachex/router.ex @@ -15,7 +15,7 @@ defmodule Cachex.Router do @doc """ Initialize a routing state using a list of nodes. """ - @callback init(nodes :: [atom], options :: Keyword.t()) :: any + @callback new(nodes :: [atom], options :: Keyword.t()) :: any @doc """ Retrieve the list of nodes from a routing state. diff --git a/lib/cachex/router/jump.ex b/lib/cachex/router/jump.ex index bf3ac6a..f09cf75 100644 --- a/lib/cachex/router/jump.ex +++ b/lib/cachex/router/jump.ex @@ -18,9 +18,12 @@ defmodule Cachex.Router.Jump do In the case of this router the routing state is simply the list of nodes being tracked, with duplicate entries removed. """ - @spec init(nodes :: [atom], options :: Keyword.t()) :: [atom] - def init(nodes, _options), - do: Enum.uniq(nodes) + @spec new(nodes :: [atom], options :: Keyword.t()) :: [atom] + def new(nodes, _options \\ []) do + nodes + |> Enum.uniq() + |> Enum.sort() + end @doc """ Retrieve the list of nodes from a routing state. diff --git a/lib/cachex/router/ring.ex b/lib/cachex/router/ring.ex index 6793723..ed06d63 100644 --- a/lib/cachex/router/ring.ex +++ b/lib/cachex/router/ring.ex @@ -12,7 +12,8 @@ defmodule Cachex.Router.Ring do @doc """ Initialize a ring using a list of nodes. """ - def init(nodes, _options \\ []) do + @spec new(nodes :: [atom], options :: Keyword.t()) :: HashRing.t() + def new(nodes, _options \\ []) do ring = HashRing.new() ring = HashRing.add_nodes(ring, nodes) ring @@ -21,20 +22,32 @@ defmodule Cachex.Router.Ring do @doc """ Retrieve the list of nodes from a ring. """ - defdelegate nodes(ring), to: HashRing, as: :nodes + @spec nodes(ring :: HashRing.t()) :: [atom] + def nodes(ring) do + ring + |> HashRing.nodes() + |> Enum.uniq() + |> Enum.sort() + end @doc """ Route a provided key to a node in a ring. """ - defdelegate route(ring, key), to: HashRing, as: :key_to_node + @spec route(ring :: HashRing.t(), key :: any) :: atom + def route(ring, key), + do: HashRing.key_to_node(ring, key) @doc """ Attach a new node to a ring. """ - defdelegate attach(ring, node), to: HashRing, as: :add_node + @spec attach(ring :: HashRing.t(), node :: atom) :: HashRing.t() + def attach(ring, node), + do: HashRing.add_node(ring, node) @doc """ Detach an existing node to a ring. """ - defdelegate detach(ring, node), to: HashRing, as: :remove_node + @spec detach(ring :: HashRing.t(), node :: atom) :: HashRing.t() + def detach(ring, node), + do: HashRing.remove_node(ring, node) end diff --git a/lib/cachex/services/conductor.ex b/lib/cachex/services/conductor.ex index a357676..965ef2f 100644 --- a/lib/cachex/services/conductor.ex +++ b/lib/cachex/services/conductor.ex @@ -44,6 +44,7 @@ defmodule Cachex.Services.Conductor do more easily. """ defmacro route(cache, {action, _arguments} = call) do + # coveralls-ignore-start act_name = action |> Kernel.to_string() @@ -51,6 +52,7 @@ defmodule Cachex.Services.Conductor do |> Macro.camelize() act_join = :"Elixir.Cachex.Actions.#{act_name}" + # coveralls-ignore-stop quote do Overseer.enforce unquote(cache) do diff --git a/lib/cachex/spec/validator.ex b/lib/cachex/spec/validator.ex index 925be3d..80d0d13 100644 --- a/lib/cachex/spec/validator.ex +++ b/lib/cachex/spec/validator.ex @@ -140,7 +140,7 @@ defmodule Cachex.Spec.Validator do check1 = behaviour?(module, Cachex.Router) check2 = check1 and is_boolean(enabled) - check3 = check2 and {:init, 2} in module.__info__(:functions) + check3 = check2 and {:new, 2} in module.__info__(:functions) check4 = check3 and {:nodes, 1} in module.__info__(:functions) check5 = check4 and {:route, 2} in module.__info__(:functions) check6 = check5 and {:attach, 2} in module.__info__(:functions) diff --git a/test/cachex/actions/transaction_test.exs b/test/cachex/actions/transaction_test.exs index 8736104..559049e 100644 --- a/test/cachex/actions/transaction_test.exs +++ b/test/cachex/actions/transaction_test.exs @@ -88,7 +88,8 @@ defmodule Cachex.Actions.TransactionTest do {cache, _nodes} = Helper.create_cache_cluster(2) # we know that 2 & 3 hash to the same slots - {:ok, result} = Cachex.transaction(cache, [2, 3], &:erlang.phash2/1) + {:ok, result} = Cachex.transaction(cache, [], &:erlang.phash2/1) + {:ok, ^result} = Cachex.transaction(cache, [2, 3], &:erlang.phash2/1) # check the result phashed ok assert(result > 0 && is_integer(result)) diff --git a/test/cachex/options_test.exs b/test/cachex/options_test.exs index d064191..9e7a400 100644 --- a/test/cachex/options_test.exs +++ b/test/cachex/options_test.exs @@ -135,27 +135,6 @@ defmodule Cachex.OptionsTest do refute comp3 end - # This test will verify the parsing of compression flags to determine whether - # a cache has them enabled or disabled. This is simply checking whether the flag - # is set to true or false, and the default. - test "parsing :ordered flags" do - # grab a cache name - name = Helper.create_name() - - # parse our values as options - {:ok, cache(ordered: ordered1)} = - Cachex.Options.parse(name, ordered: true) - - {:ok, cache(ordered: ordered2)} = - Cachex.Options.parse(name, ordered: false) - - {:ok, cache(ordered: ordered3)} = Cachex.Options.parse(name, []) - - assert ordered1 - refute ordered2 - refute ordered3 - end - # This test verifies the parsing of TTL related flags. We have to test various # combinations of :ttl_interval and :default_ttl to verify each state correctly. test "parsing :expiration flags" do @@ -345,6 +324,53 @@ defmodule Cachex.OptionsTest do assert(msg == :invalid_limit) end + # This test will verify the parsing of compression flags to determine whether + # a cache has them enabled or disabled. This is simply checking whether the flag + # is set to true or false, and the default. + test "parsing :ordered flags" do + # grab a cache name + name = Helper.create_name() + + # parse our values as options + {:ok, cache(ordered: ordered1)} = + Cachex.Options.parse(name, ordered: true) + + {:ok, cache(ordered: ordered2)} = + Cachex.Options.parse(name, ordered: false) + + {:ok, cache(ordered: ordered3)} = Cachex.Options.parse(name, []) + + assert ordered1 + refute ordered2 + refute ordered3 + end + + # This test will ensure that we can parse router values successfully. Routers + # can be provided as either an atom module name, or a router struct. + test "parsing :router flags" do + # grab a cache name + name = Helper.create_name() + + # parse out valid router combinations + {:ok, cache(router: router1)} = Cachex.Options.parse(name, []) + + {:ok, cache(router: router2)} = + Cachex.Options.parse(name, router: Cachex.Router.Ring) + + # parse out invalid hook combinations + {:error, msg} = Cachex.Options.parse(name, router: "[router]") + {:error, ^msg} = Cachex.Options.parse(name, router: router(module: Missing)) + + # check the router for the first state and the default value + assert(router1 == router(module: Cachex.Router.Jump)) + + # check the router in the second state + assert(router2 == router(module: Cachex.Router.Ring)) + + # check the invalid router message + assert(msg == :invalid_router) + end + # This test will verify the ability to record stats in a state. This option # will just add the Cachex Stats hook to the list of hooks inside the cache. # We just need to verify that the hook is added after being parsed. diff --git a/test/cachex/policy/lrw/evented_test.exs b/test/cachex/policy/lrw/evented_test.exs index ec7c518..bcc9891 100644 --- a/test/cachex/policy/lrw/evented_test.exs +++ b/test/cachex/policy/lrw/evented_test.exs @@ -47,7 +47,7 @@ defmodule Cachex.Policy.LRW.EventedTest do # retrieve the cache state state = Services.Overseer.retrieve(cache) - # add 1000 keys to the cache + # add 100 keys to the cache for x <- 1..100 do # add the entry to the cache {:ok, true} = Cachex.put(state, x, x) @@ -65,6 +65,18 @@ defmodule Cachex.Policy.LRW.EventedTest do # flush all existing hook events Helper.flush() + # run a no-op fetch to verify no change + {:ignore, nil} = + Cachex.fetch(state, 101, fn -> + {:ignore, nil} + end) + + # retrieve the cache size + size2 = Cachex.size!(cache) + + # verify the cache size + assert(size2 == 100) + # add a new key to the cache to trigger evictions {:ok, true} = Cachex.put(state, 101, 101) diff --git a/test/cachex/router/jump_test.exs b/test/cachex/router/jump_test.exs new file mode 100644 index 0000000..3fcafb9 --- /dev/null +++ b/test/cachex/router/jump_test.exs @@ -0,0 +1,29 @@ +defmodule Cachex.Router.JumpTest do + use CachexCase + + test "routing keys within a jump router" do + # create a router from three node names + router = Router.Jump.new([:a, :b, :c]) + + # test that we can route to expected nodes + assert Router.Jump.nodes(router) == [:a, :b, :c] + assert Router.Jump.route(router, "elixir") == :b + assert Router.Jump.route(router, "erlang") == :c + end + + test "attaching a node causes an error" do + assert_raise(RuntimeError, "Router does not support node addition", fn -> + [node()] + |> Router.Jump.new() + |> Router.Jump.attach(node()) + end) + end + + test "detaching a node causes an error" do + assert_raise(RuntimeError, "Router does not support node removal", fn -> + [node()] + |> Router.Jump.new() + |> Router.Jump.detach(node()) + end) + end +end diff --git a/test/cachex/router/ring_test.exs b/test/cachex/router/ring_test.exs new file mode 100644 index 0000000..b103252 --- /dev/null +++ b/test/cachex/router/ring_test.exs @@ -0,0 +1,42 @@ +defmodule Cachex.Router.RingTest do + use CachexCase + + test "routing keys within a ring router" do + # create a router from three node names + router = Router.Ring.new([:a, :b, :c]) + + # test that we can route to expected nodes + assert Router.Ring.nodes(router) == [:a, :b, :c] + assert Router.Ring.route(router, "elixir") == :c + assert Router.Ring.route(router, "erlang") == :b + end + + test "attaching and detaching node in a ring router" do + # create a router from three node names + router = Router.Ring.new([:a, :b, :c]) + + # verify the routing of various keys + assert Router.Ring.nodes(router) == [:a, :b, :c] + assert Router.Ring.route(router, "elixir") == :c + assert Router.Ring.route(router, "erlang") == :b + assert Router.Ring.route(router, "fsharp") == :c + + # attach a new node :d to the router + router = Router.Ring.attach(router, :d) + + # route the same keys again, fsharp is resharded + assert Router.Ring.nodes(router) == [:a, :b, :c, :d] + assert Router.Ring.route(router, "elixir") == :c + assert Router.Ring.route(router, "erlang") == :b + assert Router.Ring.route(router, "fsharp") == :d + + # remove the node :d from the router + router = Router.Ring.detach(router, :d) + + # the key fsharp is routed back to the initial + assert Router.Ring.nodes(router) == [:a, :b, :c] + assert Router.Ring.route(router, "elixir") == :c + assert Router.Ring.route(router, "erlang") == :b + assert Router.Ring.route(router, "fsharp") == :c + end +end diff --git a/test/cachex/router_test.exs b/test/cachex/router_test.exs new file mode 100644 index 0000000..835d2d1 --- /dev/null +++ b/test/cachex/router_test.exs @@ -0,0 +1,35 @@ +defmodule Cachex.RouterTest do + use CachexCase + + test "default Router implementations" do + # create a router from three node names + router = __MODULE__.DefaultRouter.new([:a, :b, :c]) + + # check we can route and fetch the node list + assert __MODULE__.DefaultRouter.nodes(router) == [:a, :b, :c] + assert __MODULE__.DefaultRouter.route(router, "") == :a + + # verify that addition of a node is not applicable by default + assert_raise(RuntimeError, "Router does not support node addition", fn -> + __MODULE__.DefaultRouter.attach(router, node()) + end) + + # verify that removal of a node is not applicable by default + assert_raise(RuntimeError, "Router does not support node removal", fn -> + __MODULE__.DefaultRouter.detach(router, node()) + end) + end + + defmodule DefaultRouter do + use Cachex.Router + + def new(nodes, _opts \\ []), + do: nodes + + def nodes(nodes), + do: nodes + + def route(nodes, _key), + do: Enum.at(nodes, 0) + end +end diff --git a/test/lib/cachex_case.ex b/test/lib/cachex_case.ex index 7d8944b..8291b5a 100644 --- a/test/lib/cachex_case.ex +++ b/test/lib/cachex_case.ex @@ -7,6 +7,7 @@ defmodule CachexCase do alias CachexCase.ExecuteHook alias CachexCase.ForwardHook alias CachexCase.Helper + alias Cachex.Router alias Cachex.Services import Cachex.Spec From 77cccca5010802a97dd9cdd7809ab7dcc30d76e6 Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Thu, 22 Aug 2024 08:33:18 -0700 Subject: [PATCH 6/9] Improve reproducibility in fetch test --- test/cachex/actions/fetch_test.exs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/cachex/actions/fetch_test.exs b/test/cachex/actions/fetch_test.exs index d7791ab..e30257d 100644 --- a/test/cachex/actions/fetch_test.exs +++ b/test/cachex/actions/fetch_test.exs @@ -191,8 +191,8 @@ defmodule Cachex.Actions.FetchTest do # create a test agent to hold our test state {:ok, agent} = Agent.start_link(fn -> %{} end) - # execute 100 fetches - for idx <- 1..100 do + # execute 1000 fetches + for idx <- 1..1000 do # with a unique key key = "key_#{idx}" count = System.schedulers_online() * 2 @@ -224,6 +224,6 @@ defmodule Cachex.Actions.FetchTest do end) # all should have been called just once - assert %{1 => 100} == calls + assert %{1 => 1000} == calls end end From 45a03519270028a1bdedf19ee9c905d8dd8bf781 Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Thu, 22 Aug 2024 08:50:14 -0700 Subject: [PATCH 7/9] Add some missing validation coverage --- test/cachex/spec/validator_test.exs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/test/cachex/spec/validator_test.exs b/test/cachex/spec/validator_test.exs index 225c754..5b959f9 100644 --- a/test/cachex/spec/validator_test.exs +++ b/test/cachex/spec/validator_test.exs @@ -206,6 +206,32 @@ defmodule Cachex.Spec.ValidatorTest do refute Validator.valid?(:limit, limit8) end + test "validation of router records" do + # define some valid records + router1 = router(module: Router.Jump) + router2 = router(module: Router.Jump, options: []) + router3 = router(module: Router.Jump, enabled: true) + + # ensure all records are valid + assert Validator.valid?(:router, router1) + assert Validator.valid?(:router, router2) + assert Validator.valid?(:router, router3) + + # define some invalid records + router5 = router(module: " ") + router6 = router(module: :missing) + router7 = router(module: __MODULE__) + router8 = router(module: Router.Jump, options: "") + router9 = router(module: Router.Jump, enabled: "yes") + + # ensure all records are invalid + refute Validator.valid?(:router, router5) + refute Validator.valid?(:router, router6) + refute Validator.valid?(:router, router7) + refute Validator.valid?(:router, router8) + refute Validator.valid?(:router, router9) + end + test "validation of warmer records" do # create a warmer for validation Helper.create_warmer(:validator_warmer, 1000, fn _ -> From 5e5ad3499374c8f9743abffaca8f1831d79bc032 Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Thu, 22 Aug 2024 16:43:04 -0700 Subject: [PATCH 8/9] Enable skipping of tests via @tag --- test/test_helper.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_helper.exs b/test/test_helper.exs index 3c6c45a..5f97ddd 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -7,8 +7,8 @@ Application.ensure_all_started(:cachex) |> Enum.filter(&(!File.dir?(&1))) |> Enum.each(&Code.require_file/1) -# start ExUnit! -ExUnit.start() +# start ExUnit with skips +ExUnit.start(exclude: [:skip]) # internal module defmodule TestHelper do From b54a966ac82dbcb96d5bf205a3d094e92cfe7431 Mon Sep 17 00:00:00 2001 From: Isaac Whitfield Date: Sat, 24 Aug 2024 22:25:00 -0700 Subject: [PATCH 9/9] Support managed routers and reshape node options --- lib/cachex.ex | 24 ++------ lib/cachex/actions/transaction.ex | 5 +- lib/cachex/errors.ex | 4 -- lib/cachex/options.ex | 30 ---------- lib/cachex/router.ex | 48 +++++++++------- lib/cachex/router/jump.ex | 42 +++++++++----- lib/cachex/router/local.ex | 23 ++++++++ lib/cachex/router/mod.ex | 56 ++++++++++++++++++ lib/cachex/router/ring.ex | 73 +++++++++++++----------- lib/cachex/services.ex | 16 ++++++ lib/cachex/services/conductor.ex | 26 ++++++++- lib/cachex/spec.ex | 7 +-- lib/cachex/spec/validator.ex | 15 ++--- test/cachex/actions/transaction_test.exs | 4 +- test/cachex/errors_test.exs | 1 - test/cachex/options_test.exs | 6 +- test/cachex/router/jump_test.exs | 56 ++++++++++++------ test/cachex/router/local_test.exs | 19 ++++++ test/cachex/router/mod_test.exs | 47 +++++++++++++++ test/cachex/router/ring_test.exs | 71 ++++++++++++----------- test/cachex/router_test.exs | 32 ----------- test/cachex/services_test.exs | 2 + test/cachex/spec/validator_test.exs | 18 +++--- test/lib/cachex_case.ex | 1 - test/lib/cachex_case/helper.ex | 25 ++++---- 25 files changed, 402 insertions(+), 249 deletions(-) create mode 100644 lib/cachex/router/local.ex create mode 100644 lib/cachex/router/mod.ex create mode 100644 test/cachex/router/local_test.exs create mode 100644 test/cachex/router/mod_test.exs diff --git a/lib/cachex.ex b/lib/cachex.ex index 3131b84..c7951b5 100644 --- a/lib/cachex.ex +++ b/lib/cachex.ex @@ -254,19 +254,6 @@ defmodule Cachex do Please see the `Cachex.Spec.limit/1` documentation for further customization options. - * `:nodes` - - A list of nodes this cache will live on, to provide distributed behaviour across - physical nodes. This should be a list of node names, in the long form. - - iex> Cachex.start_link(:my_cache, [ - ...> nodes: [ - ...> :foo@localhost, - ...> :bar@localhost - ...> ] - ...> ]) - { :ok, _pid } - * `:ordered` This option will specify whether this cache should enable ETS ordering, which can @@ -1339,7 +1326,8 @@ defmodule Cachex do """ @spec transaction(cache, [any], function, Keyword.t()) :: {status, any} def transaction(cache, keys, operation, options \\ []) - when is_function(operation, 1) and is_list(keys) and is_list(options) do + when (is_function(operation) or is_function(operation, 1)) and + is_list(keys) and is_list(options) do Overseer.enforce cache do trans_cache = case cache(cache, :transactions) do @@ -1476,13 +1464,13 @@ defmodule Cachex do # # This will initialize the base router state and attach all nodes # provided at cache startup to the router state. - defp setup_router(cache(nodes: nodes, router: router) = cache) do + defp setup_router(cache(router: router) = cache) do router(module: module, options: options) = router - state = module.new(nodes, options) - local = module.nodes(state) == [node()] + state = module.init(cache, options) + route = router(router, state: state) - {:ok, cache(cache, router: router(router, state: state, enabled: !local))} + {:ok, cache(cache, router: route)} end # Initializes cache warmers on startup. diff --git a/lib/cachex/actions/transaction.ex b/lib/cachex/actions/transaction.ex index 9265c18..ffbb52e 100644 --- a/lib/cachex/actions/transaction.ex +++ b/lib/cachex/actions/transaction.ex @@ -24,7 +24,10 @@ defmodule Cachex.Actions.Transaction do """ def execute(cache() = cache, keys, operation, _options) do Locksmith.transaction(cache, keys, fn -> - {:ok, operation.(cache)} + case :erlang.fun_info(operation)[:arity] do + 0 -> {:ok, operation.()} + 1 -> {:ok, operation.(cache)} + end end) end end diff --git a/lib/cachex/errors.ex b/lib/cachex/errors.ex index 8133e97..661e8a6 100644 --- a/lib/cachex/errors.ex +++ b/lib/cachex/errors.ex @@ -20,7 +20,6 @@ defmodule Cachex.Errors do :invalid_limit, :invalid_match, :invalid_name, - :invalid_nodes, :invalid_option, :invalid_pairs, :invalid_router, @@ -91,9 +90,6 @@ defmodule Cachex.Errors do def long_form(:invalid_name), do: "Invalid cache name provided" - def long_form(:invalid_nodes), - do: "Invalid nodes list provided" - def long_form(:invalid_option), do: "Invalid option syntax provided" diff --git a/lib/cachex/options.ex b/lib/cachex/options.ex index 05de160..4c77107 100644 --- a/lib/cachex/options.ex +++ b/lib/cachex/options.ex @@ -17,7 +17,6 @@ defmodule Cachex.Options do # option parser order @option_parsers [ :name, - :nodes, :limit, :hooks, :router, @@ -286,35 +285,6 @@ defmodule Cachex.Options do defp parse_type(:name, name, _options), do: cache(name: name) - # Configures any nodes assigned to the cache. - # - # This will enforce a non-empty list of nodes, containing at least - # the current local node. The list will be deduplicated, and sorted - # to ensure a deterministic ordering across nodes. - defp parse_type(:nodes, cache, options) do - nodes = - options - |> Keyword.get(:nodes, []) - |> Enum.concat([node()]) - |> Enum.uniq() - |> Enum.sort() - - valid = - nodes - |> List.delete(node()) - |> Enum.all?(&Node.connect/1) - - case valid do - # coveralls-ignore-start - false -> - error(:invalid_nodes) - - true -> - cache(cache, nodes: nodes) - # coveralls-ignore-stop - end - end - # Configures a cache based on ordering flags. # # This will simply configure the `:ordered` field in the cache diff --git a/lib/cachex/router.ex b/lib/cachex/router.ex index 61c886d..d4b9c65 100644 --- a/lib/cachex/router.ex +++ b/lib/cachex/router.ex @@ -8,14 +8,27 @@ defmodule Cachex.Router do on it being included in Cachex. """ + ############## + # Public API # + ############## + + @doc """ + Retrieve all currently connected nodes (including this one). + """ + @spec connected() :: [atom] + def connected(), + do: [node() | :erlang.nodes(:connected)] + ############# # Behaviour # ############# @doc """ - Initialize a routing state using a list of nodes. + Initialize a routing state for a cache. + + Please see all child implementations for supported options. """ - @callback new(nodes :: [atom], options :: Keyword.t()) :: any + @callback init(cache :: Cachex.Spec.cache(), options :: Keyword.t()) :: any @doc """ Retrieve the list of nodes from a routing state. @@ -23,42 +36,35 @@ defmodule Cachex.Router do @callback nodes(state :: any) :: [atom] @doc """ - Route a provided key to a node in a routing state. + Route a key to a node in a routing state. """ @callback route(state :: any, key :: any) :: atom @doc """ - Attach a new node to a routing state. + Create a child specification to back a routing state. """ - @callback attach(state :: any, node :: atom) :: any + @callback spec(cache :: Cachex.Spec.cache(), options :: Keyword.t()) :: + Supervisor.child_spec() - @doc """ - Detach an existing node from a routing state. - """ - @callback detach(state :: any, node :: atom) :: any + ################## + # Implementation # + ################## @doc false defmacro __using__(_) do quote location: :keep, generated: true do - # inherit the behaviour @behaviour Cachex.Router @doc false - def attach(state, node), - do: - raise(RuntimeError, - message: "Router does not support node addition" - ) + def init(cache, options \\ []), + do: nil @doc false - def detach(state, node), - do: - raise(RuntimeError, - message: "Router does not support node removal" - ) + def spec(cache, options), + do: :ignore # state modifiers are overridable - defoverridable attach: 2, detach: 2 + defoverridable init: 2, spec: 2 end end end diff --git a/lib/cachex/router/jump.ex b/lib/cachex/router/jump.ex index f09cf75..e5945bf 100644 --- a/lib/cachex/router/jump.ex +++ b/lib/cachex/router/jump.ex @@ -1,39 +1,51 @@ defmodule Cachex.Router.Jump do @moduledoc """ - Basic routing implementation based on Jump Consistent Hash. + Routing implementation based on Jump Consistent Hash. - This implementation backed Cachex's distribution in the v3.x lineage, - and is suitable for clusters of a static size. Attaching and detaching - nodes after initialization is not supported and will cause an error - if you attempt to do so. + This implementation backed Cachex's distribution in the v3.x lineage, and is + suitable for clusters of a static size. Each key is hashed and then slotted + against a node in the cluster. Please note that the hash algorithm should + not be relied upon and is not considered part of the public API. - For more information on the algorithm backing this router, please - see the appropriate [publication](https://arxiv.org/pdf/1406.2294). + The initialization of this router accepts a `:nodes` option which enables + the user to define the nodes to route amongst. If this is not provided the + router will default to detecting a cluster via `Node.self/0` and `Node.list/2`. + + For more information on the algorithm backing this router, please see the + appropriate [publication](https://arxiv.org/pdf/1406.2294). """ use Cachex.Router + alias Cachex.Router @doc """ - Initialize a routing state using a list of nodes. + Initialize a jump hash routing state for a cache. + + ## Options + + * `:nodes` + + The `:nodes` option allows a user to provide a list of nodes to treat + as a cluster. If this is not provided, the cluster will be inferred + by using `Node.self/1` and `Node.list/2`. - In the case of this router the routing state is simply the list - of nodes being tracked, with duplicate entries removed. """ - @spec new(nodes :: [atom], options :: Keyword.t()) :: [atom] - def new(nodes, _options \\ []) do - nodes + @spec init(cache :: Cachex.Spec.cache(), options :: Keyword.t()) :: [atom] + def init(_cache, options) do + options + |> Keyword.get_lazy(:nodes, &Router.connected/0) |> Enum.uniq() |> Enum.sort() end @doc """ - Retrieve the list of nodes from a routing state. + Retrieve the list of nodes from a jump hash routing state. """ @spec nodes(nodes :: [atom]) :: [atom] def nodes(nodes), do: nodes @doc """ - Route a provided key to a node in a routing state. + Route a key to a node in a jump hash routing state. """ @spec route(nodes :: [atom], key :: any) :: atom def route(nodes, key) do diff --git a/lib/cachex/router/local.ex b/lib/cachex/router/local.ex new file mode 100644 index 0000000..fa012b4 --- /dev/null +++ b/lib/cachex/router/local.ex @@ -0,0 +1,23 @@ +defmodule Cachex.Router.Local do + @moduledoc """ + Routing implementation for the local node. + + This module acts as the base implementation for routing when *not* being + used in a distributed cache. All actions are routed to the current node. + """ + use Cachex.Router + + @doc """ + Retrieve the list of nodes from a local routing state. + """ + @spec nodes(state :: nil) :: [atom] + def nodes(_state), + do: [node()] + + @doc """ + Route a key to a node in a local routing state. + """ + @spec route(state :: nil, key :: any) :: atom + def route(_state, _key), + do: node() +end diff --git a/lib/cachex/router/mod.ex b/lib/cachex/router/mod.ex new file mode 100644 index 0000000..017042c --- /dev/null +++ b/lib/cachex/router/mod.ex @@ -0,0 +1,56 @@ +defmodule Cachex.Router.Mod do + @moduledoc """ + Routing implementation based on basic hashing. + + This router provides the simplest (and quickest!) implementation for + clusters of a static size. Provided keys are hashed and routed to a node + via the modulo operation. Please note that the hash algorithm should + not be relied upon and is not considered part of the public API. + + The initialization of this router accepts a `:nodes` option which enables + the user to define the nodes to route amongst. If this is not provided the + router will default to detecting a cluster via `Node.self/0` and `Node.list/2`. + """ + use Cachex.Router + alias Cachex.Router + + @doc """ + Initialize a modulo routing state for a cache. + + ## Options + + * `:nodes` + + The `:nodes` option allows a user to provide a list of nodes to treat + as a cluster. If this is not provided, the cluster will be inferred + by using `Node.self/1` and `Node.list/2`. + + """ + @spec init(cache :: Cachex.Spec.cache(), options :: Keyword.t()) :: [atom] + def init(_cache, options) do + options + |> Keyword.get_lazy(:nodes, &Router.connected/0) + |> Enum.uniq() + |> Enum.sort() + end + + @doc """ + Retrieve the list of nodes from a modulo routing state. + """ + @spec nodes(nodes :: [atom]) :: [atom] + def nodes(nodes), + do: Enum.sort(nodes) + + @doc """ + Route a key to a node in a modulo routing state. + """ + @spec route(nodes :: [atom], key :: any) :: atom + def route(nodes, key) do + slot = + key + |> :erlang.phash2() + |> rem(length(nodes)) + + Enum.at(nodes, slot) + end +end diff --git a/lib/cachex/router/ring.ex b/lib/cachex/router/ring.ex index ed06d63..687e6e3 100644 --- a/lib/cachex/router/ring.ex +++ b/lib/cachex/router/ring.ex @@ -1,53 +1,58 @@ defmodule Cachex.Router.Ring do @moduledoc """ - Simple routing implementation based on a consistent hash ring. + Routing implementation using a consistent hash ring. - This implementation makes use of a hashing ring to better enable - modification of the internal node listing. Cachex uses the library - [libring](https://github.com/bitwalker/libring) to do the heavy - lifting here. + This router provides the most resilient routing for a distributed cache, + due to being much more resilient to addition and removal of nodes in + the cluster. Most distributed caches will end up using this router if + they have the requirement to handle such cases. + + The implementation inside this router is entirely provided by the + [libring](https://github.com/bitwalker/libring) library. As such the + initialization of this router will accept all options available when + calling `HashRing.Managed.new/2`. + + The documentation (pinned at the version used by Cachex) can be found + [here](https://hexdocs.pm/libring/1.6.0/HashRing.Managed.html#new/2). """ use Cachex.Router + import Cachex.Spec @doc """ - Initialize a ring using a list of nodes. - """ - @spec new(nodes :: [atom], options :: Keyword.t()) :: HashRing.t() - def new(nodes, _options \\ []) do - ring = HashRing.new() - ring = HashRing.add_nodes(ring, nodes) - ring - end + Initialize a ring routing state for a cache. - @doc """ - Retrieve the list of nodes from a ring. + To see the list of options supported for this call, please visit the `libring` + [documentation](https://hexdocs.pm/libring/1.6.0/HashRing.Managed.html#new/2). """ - @spec nodes(ring :: HashRing.t()) :: [atom] - def nodes(ring) do - ring - |> HashRing.nodes() - |> Enum.uniq() - |> Enum.sort() - end + @spec init(cache :: Cachex.Spec.cache(), options :: Keyword.t()) :: + HashRing.Managed.ring() + def init(cache(name: name), _options), + do: name @doc """ - Route a provided key to a node in a ring. + Retrieve the list of nodes from a routing state. """ - @spec route(ring :: HashRing.t(), key :: any) :: atom - def route(ring, key), - do: HashRing.key_to_node(ring, key) + @spec nodes(ring :: HashRing.Managed.ring()) :: [atom] + defdelegate nodes(ring), to: HashRing.Managed @doc """ - Attach a new node to a ring. + Route a key to a node in a ring routing state. """ - @spec attach(ring :: HashRing.t(), node :: atom) :: HashRing.t() - def attach(ring, node), - do: HashRing.add_node(ring, node) + @spec route(ring :: HashRing.Managed.ring(), key :: any) :: atom + defdelegate route(ring, key), to: HashRing.Managed, as: :key_to_node @doc """ - Detach an existing node to a ring. + Create a child specification to back a ring routing state. """ - @spec detach(ring :: HashRing.t(), node :: atom) :: HashRing.t() - def detach(ring, node), - do: HashRing.remove_node(ring, node) + @spec spec(cache :: Cachex.Spec.cache(), options :: Keyword.t()) :: + Supervisor.child_spec() + def spec(cache(name: name), options), + do: [ + %{ + id: name, + type: :worker, + restart: :permanent, + start: {HashRing.Worker, :start_link, [[{:name, name} | options]]} + } + ] end diff --git a/lib/cachex/services.ex b/lib/cachex/services.ex index c3e7c69..d4d885f 100644 --- a/lib/cachex/services.ex +++ b/lib/cachex/services.ex @@ -56,6 +56,7 @@ defmodule Cachex.Services do |> Enum.concat(locksmith_spec(cache)) |> Enum.concat(informant_spec(cache)) |> Enum.concat(incubator_spec(cache)) + |> Enum.concat(conductor_spec(cache)) |> Enum.concat(courier_spec(cache)) |> Enum.concat(janitor_spec(cache)) end @@ -117,6 +118,21 @@ defmodule Cachex.Services do # Private API # ############### + # Creates a specification for the Conductor service. + # + # The Conductor service provides a way to dispatch cache calls between + # nodes in a distributed cluster. It's a little complicated because a + # Conductor's routing logic can be either a separate process or the + # same process to avoid unnecessary overhead. If this makes no sense + # when you come to read it, that's probably why. + defp conductor_spec(cache() = cache), + do: [ + %{ + id: Services.Conductor, + start: {Services.Conductor, :start_link, [cache]} + } + ] + # Creates a specification for the Courier service. # # The courier acts as a synchronised way to retrieve values computed via diff --git a/lib/cachex/services/conductor.ex b/lib/cachex/services/conductor.ex index 965ef2f..740b2aa 100644 --- a/lib/cachex/services/conductor.ex +++ b/lib/cachex/services/conductor.ex @@ -8,6 +8,7 @@ defmodule Cachex.Services.Conductor do noisy. Now that all execution flows via the router, this is no longer an issue and it also serves as a gateway to distribution in the future. """ + alias Cachex.Router alias Cachex.Services # add some service aliases @@ -23,6 +24,27 @@ defmodule Cachex.Services.Conductor do # Public API # ############## + @doc """ + Starts a new Conductor process for a cache. + """ + @spec start_link(Cachex.Spec.cache()) :: Supervisor.on_start() + def start_link(cache(router: router(module: mod, options: opts)) = cache) do + case mod.spec(cache, opts) do + :ignore -> + :ignore + + specification -> + Supervisor.start_link(specification, strategy: :one_for_one) + end + end + + @doc """ + Retrieve all routable nodes for a cache. + """ + @spec nodes(Cachex.Spec.cache()) :: {:ok, [atom]} + def nodes(cache(router: router(module: module, state: state))), + do: {:ok, module.nodes(state)} + @doc """ Executes a previously dispatched action. @@ -30,10 +52,10 @@ defmodule Cachex.Services.Conductor do public is due to the code injected by the `dispatch/2` macro. """ @spec route(Cachex.Spec.cache(), atom, {atom, [any]}) :: any - def route(cache(router: router(enabled: false)) = cache, module, call), + def route(cache(router: router(module: Router.Local)) = cache, module, call), do: route_local(cache, module, call) - def route(cache(router: router(enabled: true)) = cache, module, call), + def route(cache() = cache, module, call), do: route_cluster(cache, module, call) @doc """ diff --git a/lib/cachex/spec.ex b/lib/cachex/spec.ex index 5572f5d..bc7cb4b 100644 --- a/lib/cachex/spec.ex +++ b/lib/cachex/spec.ex @@ -33,7 +33,6 @@ defmodule Cachex.Spec do fallback: fallback, hooks: hooks, limit: limit, - nodes: [atom], ordered: boolean, router: router, transactions: boolean, @@ -101,7 +100,7 @@ defmodule Cachex.Spec do # Record specification for a router instance @type router :: record(:router, - enabled: boolean, + options: Keyword.t(), module: atom, state: any ) @@ -135,7 +134,6 @@ defmodule Cachex.Spec do fallback: nil, hooks: nil, limit: nil, - nodes: [], ordered: false, router: nil, transactions: false, @@ -268,9 +266,8 @@ defmodule Cachex.Spec do values inside this structure are for internal use and will be overwritten as needed. """ defrecord :router, - enabled: false, options: [], - module: Cachex.Router.Jump, + module: Cachex.Router.Local, state: nil @doc """ diff --git a/lib/cachex/spec/validator.ex b/lib/cachex/spec/validator.ex index 80d0d13..685e120 100644 --- a/lib/cachex/spec/validator.ex +++ b/lib/cachex/spec/validator.ex @@ -136,17 +136,14 @@ defmodule Cachex.Spec.Validator do # This will validate the correctly implemented `Cachex.Router` behaviour # and confirm that the provided options are a keyword list. def valid?(:router, router() = router) do - router(options: options, module: module, enabled: enabled) = router + router(options: options, module: module) = router check1 = behaviour?(module, Cachex.Router) - check2 = check1 and is_boolean(enabled) - check3 = check2 and {:new, 2} in module.__info__(:functions) - check4 = check3 and {:nodes, 1} in module.__info__(:functions) - check5 = check4 and {:route, 2} in module.__info__(:functions) - check6 = check5 and {:attach, 2} in module.__info__(:functions) - check7 = check6 and {:detach, 2} in module.__info__(:functions) - check8 = check7 and Keyword.keyword?(options) - check8 + check2 = check1 and {:init, 2} in module.__info__(:functions) + check3 = check2 and {:nodes, 1} in module.__info__(:functions) + check4 = check3 and {:route, 2} in module.__info__(:functions) + check5 = check4 and Keyword.keyword?(options) + check5 end # Validates a warmer specification record. diff --git a/test/cachex/actions/transaction_test.exs b/test/cachex/actions/transaction_test.exs index 559049e..11e9b6c 100644 --- a/test/cachex/actions/transaction_test.exs +++ b/test/cachex/actions/transaction_test.exs @@ -38,7 +38,7 @@ defmodule Cachex.Actions.TransactionTest do # execute a broken transaction result1 = - Cachex.transaction(cache, [], fn _state -> + Cachex.transaction(cache, [], fn -> raise ArgumentError, message: "Error message" end) @@ -47,7 +47,7 @@ defmodule Cachex.Actions.TransactionTest do # ensure a new transaction executes normally result2 = - Cachex.transaction(cache, [], fn _state -> + Cachex.transaction(cache, [], fn -> Cachex.Services.Locksmith.transaction?() end) diff --git a/test/cachex/errors_test.exs b/test/cachex/errors_test.exs index a5c598c..7a23c59 100644 --- a/test/cachex/errors_test.exs +++ b/test/cachex/errors_test.exs @@ -14,7 +14,6 @@ defmodule Cachex.ErrorsTest do invalid_limit: "Invalid limit fields provided", invalid_match: "Invalid match specification provided", invalid_name: "Invalid cache name provided", - invalid_nodes: "Invalid nodes list provided", invalid_option: "Invalid option syntax provided", invalid_pairs: "Invalid insertion pairs provided", invalid_router: "Invalid router definition provided", diff --git a/test/cachex/options_test.exs b/test/cachex/options_test.exs index 9e7a400..15869d9 100644 --- a/test/cachex/options_test.exs +++ b/test/cachex/options_test.exs @@ -355,17 +355,17 @@ defmodule Cachex.OptionsTest do {:ok, cache(router: router1)} = Cachex.Options.parse(name, []) {:ok, cache(router: router2)} = - Cachex.Options.parse(name, router: Cachex.Router.Ring) + Cachex.Options.parse(name, router: Cachex.Router.Mod) # parse out invalid hook combinations {:error, msg} = Cachex.Options.parse(name, router: "[router]") {:error, ^msg} = Cachex.Options.parse(name, router: router(module: Missing)) # check the router for the first state and the default value - assert(router1 == router(module: Cachex.Router.Jump)) + assert(router1 == router(module: Cachex.Router.Local)) # check the router in the second state - assert(router2 == router(module: Cachex.Router.Ring)) + assert(router2 == router(module: Cachex.Router.Mod)) # check the invalid router message assert(msg == :invalid_router) diff --git a/test/cachex/router/jump_test.exs b/test/cachex/router/jump_test.exs index 3fcafb9..efdef97 100644 --- a/test/cachex/router/jump_test.exs +++ b/test/cachex/router/jump_test.exs @@ -1,29 +1,47 @@ defmodule Cachex.Router.JumpTest do use CachexCase - test "routing keys within a jump router" do - # create a router from three node names - router = Router.Jump.new([:a, :b, :c]) + test "routing keys via a jump router" do + # create a test cache cluster for nodes + {cache, nodes} = + Helper.create_cache_cluster(3, + router: Cachex.Router.Jump + ) + + # convert the name to a cache and sort + cache = Services.Overseer.retrieve(cache) + nodes = Enum.sort(nodes) + + # fetch the router state after initialize + cache(router: router(state: state)) = cache # test that we can route to expected nodes - assert Router.Jump.nodes(router) == [:a, :b, :c] - assert Router.Jump.route(router, "elixir") == :b - assert Router.Jump.route(router, "erlang") == :c + assert Services.Conductor.nodes(cache) == {:ok, nodes} + assert Cachex.Router.Jump.route(state, "elixir") == Enum.at(nodes, 1) + assert Cachex.Router.Jump.route(state, "erlang") == Enum.at(nodes, 2) end - test "attaching a node causes an error" do - assert_raise(RuntimeError, "Router does not support node addition", fn -> - [node()] - |> Router.Jump.new() - |> Router.Jump.attach(node()) - end) - end + test "routing keys via a jump router with defined nodes" do + # create our nodes + nodes = [:a, :b, :c] + + # create router from nodes + router = + router( + module: Cachex.Router.Jump, + options: [nodes: nodes] + ) - test "detaching a node causes an error" do - assert_raise(RuntimeError, "Router does not support node removal", fn -> - [node()] - |> Router.Jump.new() - |> Router.Jump.detach(node()) - end) + # create a test cache and fetch back + cache = Helper.create_cache(router: router) + cache = Services.Overseer.retrieve(cache) + + # fetch the router state after initialize + cache(router: router(state: state)) = cache + + # test that we can route to expected nodes + assert Services.Conductor.nodes(cache) == {:ok, nodes} + assert Cachex.Router.Jump.route(state, "elixir") == Enum.at(nodes, 1) + assert Cachex.Router.Jump.route(state, "erlang") == Enum.at(nodes, 2) end end diff --git a/test/cachex/router/local_test.exs b/test/cachex/router/local_test.exs new file mode 100644 index 0000000..1ba0ae0 --- /dev/null +++ b/test/cachex/router/local_test.exs @@ -0,0 +1,19 @@ +defmodule Cachex.Router.LocalTest do + use CachexCase + + test "routing keys via a local router" do + # create a test cache + cache = Helper.create_cache(router: Cachex.Router.Local) + + # convert the name to a cache and sort + cache = Services.Overseer.retrieve(cache) + + # fetch the router state after initialize + cache(router: router(state: state)) = cache + + # test that we can route to expected nodes + assert Services.Conductor.nodes(cache) == {:ok, [node()]} + assert Cachex.Router.Local.route(state, "elixir") == node() + assert Cachex.Router.Local.route(state, "erlang") == node() + end +end diff --git a/test/cachex/router/mod_test.exs b/test/cachex/router/mod_test.exs new file mode 100644 index 0000000..dde6b92 --- /dev/null +++ b/test/cachex/router/mod_test.exs @@ -0,0 +1,47 @@ +defmodule Cachex.Router.ModTest do + use CachexCase + + test "routing keys via a modulo router" do + # create a test cache cluster for nodes + {cache, nodes} = + Helper.create_cache_cluster(3, + router: Cachex.Router.Mod + ) + + # convert the name to a cache and sort + cache = Services.Overseer.retrieve(cache) + nodes = Enum.sort(nodes) + + # fetch the router state after initialize + cache(router: router(state: state)) = cache + + # test that we can route to expected nodes + assert Services.Conductor.nodes(cache) == {:ok, nodes} + assert Cachex.Router.Mod.route(state, "elixir") == Enum.at(nodes, 1) + assert Cachex.Router.Mod.route(state, "erlang") == Enum.at(nodes, 0) + end + + test "routing keys via a modulo router with defined nodes" do + # create our nodes + nodes = [:a, :b, :c] + + # create router from nodes + router = + router( + module: Cachex.Router.Jump, + options: [nodes: nodes] + ) + + # create a test cache and fetch back + cache = Helper.create_cache(router: router) + cache = Services.Overseer.retrieve(cache) + + # fetch the router state after initialize + cache(router: router(state: state)) = cache + + # test that we can route to expected nodes + assert Services.Conductor.nodes(cache) == {:ok, nodes} + assert Cachex.Router.Mod.route(state, "elixir") == Enum.at(nodes, 1) + assert Cachex.Router.Mod.route(state, "erlang") == Enum.at(nodes, 0) + end +end diff --git a/test/cachex/router/ring_test.exs b/test/cachex/router/ring_test.exs index b103252..bff9506 100644 --- a/test/cachex/router/ring_test.exs +++ b/test/cachex/router/ring_test.exs @@ -1,42 +1,49 @@ defmodule Cachex.Router.RingTest do use CachexCase - test "routing keys within a ring router" do - # create a router from three node names - router = Router.Ring.new([:a, :b, :c]) + test "routing keys via a ring router" do + # create a test cache cluster for nodes + {cache, _nodes} = + Helper.create_cache_cluster(3, + router: + router( + module: Cachex.Router.Ring, + options: [ + nodes: [:a, :b, :c] + ] + ) + ) + + # convert the name to a cache and sort + cache = Services.Overseer.retrieve(cache) + + # fetch the router state after initialize + cache(router: router(state: state)) = cache # test that we can route to expected nodes - assert Router.Ring.nodes(router) == [:a, :b, :c] - assert Router.Ring.route(router, "elixir") == :c - assert Router.Ring.route(router, "erlang") == :b + assert Services.Conductor.nodes(cache) == {:ok, [:c, :b, :a]} + assert Cachex.Router.Ring.route(state, "elixir") == :c + assert Cachex.Router.Ring.route(state, "erlang") == :b end - test "attaching and detaching node in a ring router" do - # create a router from three node names - router = Router.Ring.new([:a, :b, :c]) + test "routing keys via a ring router with monitored nodes" do + # create a test cache cluster for nodes + {cache, nodes} = + Helper.create_cache_cluster(3, + router: + router( + module: Cachex.Router.Ring, + options: [ + monitor_nodes: true + ] + ) + ) + + # convert the name to a cache and sort + cache = Services.Overseer.retrieve(cache) + nodes = Enum.reverse(nodes) - # verify the routing of various keys - assert Router.Ring.nodes(router) == [:a, :b, :c] - assert Router.Ring.route(router, "elixir") == :c - assert Router.Ring.route(router, "erlang") == :b - assert Router.Ring.route(router, "fsharp") == :c - - # attach a new node :d to the router - router = Router.Ring.attach(router, :d) - - # route the same keys again, fsharp is resharded - assert Router.Ring.nodes(router) == [:a, :b, :c, :d] - assert Router.Ring.route(router, "elixir") == :c - assert Router.Ring.route(router, "erlang") == :b - assert Router.Ring.route(router, "fsharp") == :d - - # remove the node :d from the router - router = Router.Ring.detach(router, :d) - - # the key fsharp is routed back to the initial - assert Router.Ring.nodes(router) == [:a, :b, :c] - assert Router.Ring.route(router, "elixir") == :c - assert Router.Ring.route(router, "erlang") == :b - assert Router.Ring.route(router, "fsharp") == :c + # test that we can route to expected nodes + assert Services.Conductor.nodes(cache) == {:ok, nodes} end end diff --git a/test/cachex/router_test.exs b/test/cachex/router_test.exs index 835d2d1..e4bf477 100644 --- a/test/cachex/router_test.exs +++ b/test/cachex/router_test.exs @@ -1,35 +1,3 @@ defmodule Cachex.RouterTest do use CachexCase - - test "default Router implementations" do - # create a router from three node names - router = __MODULE__.DefaultRouter.new([:a, :b, :c]) - - # check we can route and fetch the node list - assert __MODULE__.DefaultRouter.nodes(router) == [:a, :b, :c] - assert __MODULE__.DefaultRouter.route(router, "") == :a - - # verify that addition of a node is not applicable by default - assert_raise(RuntimeError, "Router does not support node addition", fn -> - __MODULE__.DefaultRouter.attach(router, node()) - end) - - # verify that removal of a node is not applicable by default - assert_raise(RuntimeError, "Router does not support node removal", fn -> - __MODULE__.DefaultRouter.detach(router, node()) - end) - end - - defmodule DefaultRouter do - use Cachex.Router - - def new(nodes, _opts \\ []), - do: nodes - - def nodes(nodes), - do: nodes - - def route(nodes, _key), - do: Enum.at(nodes, 0) - end end diff --git a/test/cachex/services_test.exs b/test/cachex/services_test.exs index 7a7c8f8..c178484 100644 --- a/test/cachex/services_test.exs +++ b/test/cachex/services_test.exs @@ -22,6 +22,7 @@ defmodule Cachex.ServicesTest do }, %{id: Services.Informant, start: {Services.Informant, _, _}}, %{id: Services.Incubator, start: {Services.Incubator, _, _}}, + %{id: Services.Conductor, start: {Services.Conductor, _, _}}, %{id: Services.Courier, start: {Services.Courier, _, _}}, %{id: Services.Janitor, start: {Services.Janitor, _, _}} ] = Services.cache_spec(cache) @@ -41,6 +42,7 @@ defmodule Cachex.ServicesTest do }, %{id: Services.Informant, start: {Services.Informant, _, _}}, %{id: Services.Incubator, start: {Services.Incubator, _, _}}, + %{id: Services.Conductor, start: {Services.Conductor, _, _}}, %{id: Services.Courier, start: {Services.Courier, _, _}} ] = Services.cache_spec(cache) end diff --git a/test/cachex/spec/validator_test.exs b/test/cachex/spec/validator_test.exs index 5b959f9..1070791 100644 --- a/test/cachex/spec/validator_test.exs +++ b/test/cachex/spec/validator_test.exs @@ -208,9 +208,9 @@ defmodule Cachex.Spec.ValidatorTest do test "validation of router records" do # define some valid records - router1 = router(module: Router.Jump) - router2 = router(module: Router.Jump, options: []) - router3 = router(module: Router.Jump, enabled: true) + router1 = router() + router2 = router(module: Cachex.Router.Jump) + router3 = router(module: Cachex.Router.Jump, options: []) # ensure all records are valid assert Validator.valid?(:router, router1) @@ -218,18 +218,16 @@ defmodule Cachex.Spec.ValidatorTest do assert Validator.valid?(:router, router3) # define some invalid records - router5 = router(module: " ") - router6 = router(module: :missing) - router7 = router(module: __MODULE__) - router8 = router(module: Router.Jump, options: "") - router9 = router(module: Router.Jump, enabled: "yes") + router4 = router(module: " ") + router5 = router(module: :missing) + router6 = router(module: __MODULE__) + router7 = router(module: Cachex.Router.Jump, options: "") # ensure all records are invalid + refute Validator.valid?(:router, router4) refute Validator.valid?(:router, router5) refute Validator.valid?(:router, router6) refute Validator.valid?(:router, router7) - refute Validator.valid?(:router, router8) - refute Validator.valid?(:router, router9) end test "validation of warmer records" do diff --git a/test/lib/cachex_case.ex b/test/lib/cachex_case.ex index 8291b5a..7d8944b 100644 --- a/test/lib/cachex_case.ex +++ b/test/lib/cachex_case.ex @@ -7,7 +7,6 @@ defmodule CachexCase do alias CachexCase.ExecuteHook alias CachexCase.ForwardHook alias CachexCase.Helper - alias Cachex.Router alias Cachex.Services import Cachex.Spec diff --git a/test/lib/cachex_case/helper.ex b/test/lib/cachex_case/helper.ex index 1fb5171..1d6a188 100644 --- a/test/lib/cachex_case/helper.ex +++ b/test/lib/cachex_case/helper.ex @@ -37,14 +37,19 @@ defmodule CachexCase.Helper do nodes = [node() | LocalCluster.start_nodes(name, amount - 1)] # basic match to ensure that the result is as expected - {[{:ok, _pid1}, {:ok, _pid2}], []} = + {results, []} = :rpc.multicall( nodes, Cachex, :start, - [name, [nodes: nodes] ++ args] + [name, args ++ [router: router(module: Cachex.Router.Jump)]] ) + # double check all pids + for result <- results do + assert match?({:ok, pid} when is_pid(pid), result) + end + # cleanup the cache on exit TestHelper.delete_on_exit(name) @@ -53,6 +58,8 @@ defmodule CachexCase.Helper do nodes |> List.delete(node()) |> LocalCluster.stop_nodes() + + poll(250, [], fn -> Node.list(:connected) end) end) {name, nodes} @@ -82,13 +89,11 @@ defmodule CachexCase.Helper do @doc false # Creates a cache name. # - # These names are atoms of 8 random characters between the letters A - Z. This - # is used to generate random cache names for tests. - def create_name do - 8 - |> gen_rand_bytes - |> String.to_atom() - end + # These names start and end with _ with 8 A-Z letters in between. This is used + # to generate random cache names for tests. The underscores are to ensure we + # keep a guaranteed sorting order when using distributed clusters. + def create_name, + do: String.to_atom("_#{gen_rand_bytes(8)}_") @doc false # Triggers a cache to be deleted at the end of the test. @@ -127,7 +132,7 @@ defmodule CachexCase.Helper do # raise the last known assertion error to bubble back to ExUnit. def poll(timeout, expected, generator, start_time \\ now()) do try do - assert(generator.() == expected) + assert generator.() == expected rescue e -> unless start_time + timeout > now() do