diff --git a/rebar.config b/rebar.config index 6a94c0b..62dc8a9 100644 --- a/rebar.config +++ b/rebar.config @@ -11,7 +11,7 @@ "-no_auto_compile -dir ebin -logdir log/ct --erl_args -smp enable -boot start_sasl"}, {cover_enabled, true}, {cover_export_enabled, true}, - {cover_opts, [verbose]}, + {cover_opts, [verbose, {min_coverage, 92}]}, {ct_opts, [{verbose, true}]}, {deps, [{katana, "1.0.0"}, {mixer, "1.2.0", {pkg, inaka_mixer}}, {meck, "0.9.2"}]}, {dialyzer, diff --git a/src/wpool.erl b/src/wpool.erl index 7803a76..adca5f3 100644 --- a/src/wpool.erl +++ b/src/wpool.erl @@ -146,7 +146,7 @@ %% %% Defaults to `5'. See {@link wpool_pool} for more details. --type queue_type() :: wpool_queue_manager:queue_type(). +-type queue_type() :: fifo | lifo. %% Order in which requests will be stored and handled by workers. %% %% This option can take values `lifo' or `fifo'. Defaults to `fifo'. @@ -164,6 +164,27 @@ %% Callbacks can be added and removed later by `wpool_pool:add_callback_module/2' and %% `wpool_pool:remove_callback_module/2'. +-type run(Result) :: fun((name() | pid(), timeout()) -> Result). +%% A function to run with a given worker. +%% +%% It can be used to enable APIs that hide the gen_server behind a complex logic +%% that might for example curate parameters or run side-effects, for example, `supervisor'. +%% +%% For example: +%% ``` +%% Opts = +%% #{workers => 3, +%% worker_shutdown => infinity, +%% worker => {supervisor, {Name, ModuleCallback, Args}}}, +%% %% Note that the supervisor's `init/1' callback takes such 3-tuple. +%% {ok, Pid} = wpool:start_sup_pool(pool_of_supervisors, Opts), +%% +%% ... +%% +%% Run = fun(Sup, _) -> supervisor:start_child(Sup, Params) end, +%% {ok, Pid} = wpool:run(pool_of_supervisors, Run, next_worker), +%% ''' + -type name() :: atom(). %% Name of the pool @@ -273,13 +294,15 @@ {workers, [{pos_integer(), worker_stats()}]}]. %% Statistics about a given live pool. --export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0, worker_stats/0, stats/0]). +-export_type([name/0, option/0, options/0, custom_strategy/0, strategy/0, + queue_type/0, run/1, worker_stats/0, stats/0]). -export([start/0, start/2, stop/0, stop/1]). -export([child_spec/2, start_pool/1, start_pool/2, start_sup_pool/1, start_sup_pool/2]). -export([stop_pool/1, stop_sup_pool/1]). --export([call/2, cast/2, call/3, cast/3, call/4, broadcall/3, broadcast/2]). --export([send_request/2, send_request/3, send_request/4]). +-export([call/2, call/3, call/4, cast/2, cast/3, + run/2, run/3, run/4, broadcall/3, broadcast/2, + send_request/2, send_request/3, send_request/4]). -export([stats/0, stats/1, get_workers/1]). -export([default_strategy/0]). @@ -369,6 +392,38 @@ default_strategy() -> Strategy end. +%% @equiv run(Sup, Run, default_strategy()) +-spec run(name(), run(Result)) -> Result. +run(Sup, Run) -> + run(Sup, Run, default_strategy()). + +%% @equiv run(Sup, Run, Strategy, 5000) +-spec run(name(), run(Result), strategy()) -> Result. +run(Sup, Run, Strategy) -> + run(Sup, Run, Strategy, 5000). + +%% @doc Picks a server and issues the run to it. +%% +%% For all strategies except available_worker, Timeout applies only to the +%% time spent on the actual run to the worker, because time spent finding +%% the worker in other strategies is negligible. +%% For available_worker the time used choosing a worker is also considered +-spec run(name(), run(Result), strategy(), timeout()) -> Result. +run(Sup, Run, available_worker, Timeout) -> + wpool_pool:run_with_available_worker(Sup, Run, Timeout); +run(Sup, Run, next_available_worker, Timeout) -> + wpool_process:run(wpool_pool:next_available_worker(Sup), Run, Timeout); +run(Sup, Run, next_worker, Timeout) -> + wpool_process:run(wpool_pool:next_worker(Sup), Run, Timeout); +run(Sup, Run, random_worker, Timeout) -> + wpool_process:run(wpool_pool:random_worker(Sup), Run, Timeout); +run(Sup, Run, best_worker, Timeout) -> + wpool_process:run(wpool_pool:best_worker(Sup), Run, Timeout); +run(Sup, Run, {hash_worker, HashKey}, Timeout) -> + wpool_process:run(wpool_pool:hash_worker(Sup, HashKey), Run, Timeout); +run(Sup, Run, Fun, Timeout) when is_function(Fun, 1) -> + wpool_process:run(Fun(Sup), Run, Timeout). + %% @equiv call(Sup, Call, default_strategy()) -spec call(name(), term()) -> term(). call(Sup, Call) -> @@ -380,10 +435,11 @@ call(Sup, Call, Strategy) -> call(Sup, Call, Strategy, 5000). %% @doc Picks a server and issues the call to it. -%% For all strategies except available_worker, Timeout applies only to the -%% time spent on the actual call to the worker, because time spent finding -%% the worker in other strategies is negligible. -%% For available_worker the time used choosing a worker is also considered +%% +%% For all strategies except available_worker, Timeout applies only to the +%% time spent on the actual call to the worker, because time spent finding +%% the worker in other strategies is negligible. +%% For available_worker the time used choosing a worker is also considered -spec call(name(), term(), strategy(), timeout()) -> term(). call(Sup, Call, available_worker, Timeout) -> wpool_pool:call_available_worker(Sup, Call, Timeout); @@ -434,7 +490,8 @@ send_request(Sup, Call, Strategy) -> send_request(Sup, Call, Strategy, 5000). %% @doc Picks a server and issues the call to it. -%% Timeout applies only for the time used choosing a worker in the available_worker strategy +%% +%% Timeout applies only for the time used choosing a worker in the available_worker strategy -spec send_request(name(), term(), strategy(), timeout()) -> noproc | timeout | gen_server:request_id(). send_request(Sup, Call, available_worker, Timeout) -> @@ -486,6 +543,7 @@ stats(Sup) -> wpool_pool:stats(Sup). %% @doc Retrieves the list of worker registered names. +%% %% This can be useful to manually inspect the workers or do custom work on them. -spec get_workers(name()) -> [atom()]. get_workers(Sup) -> diff --git a/src/wpool_pool.erl b/src/wpool_pool.erl index 4f64998..f8a6ae5 100644 --- a/src/wpool_pool.erl +++ b/src/wpool_pool.erl @@ -28,7 +28,8 @@ %% API -export([start_link/2]). -export([best_worker/1, random_worker/1, next_worker/1, hash_worker/2, - next_available_worker/1, send_request_available_worker/3, call_available_worker/3]). + next_available_worker/1, send_request_available_worker/3, call_available_worker/3, + run_with_available_worker/3]). -export([cast_to_available_worker/2, broadcast/2, broadcall/3]). -export([stats/0, stats/1, get_workers/1]). -export([worker_name/2, find_wpool/1]). @@ -112,19 +113,44 @@ next_available_worker(Name) -> end end. +%% @doc Picks the first available worker and sends the call to it. +%% The timeout provided includes the time it takes to get a worker +%% and for it to process the call. +%% @throws no_workers | timeout +-spec run_with_available_worker(wpool:name(), wpool:run(Result), timeout()) -> Result. +run_with_available_worker(Name, Run, Timeout) -> + case find_wpool(Name) of + undefined -> + exit(no_workers); + #wpool{qmanager = QManager} -> + case wpool_queue_manager:run_with_available_worker(QManager, Run, Timeout) of + noproc -> + exit(no_workers); + timeout -> + exit(timeout); + Result -> + Result + end + end. + %% @doc Picks the first available worker and sends the call to it. %% The timeout provided includes the time it takes to get a worker %% and for it to process the call. %% @throws no_workers | timeout -spec call_available_worker(wpool:name(), any(), timeout()) -> any(). call_available_worker(Name, Call, Timeout) -> - case wpool_queue_manager:call_available_worker(queue_manager_name(Name), Call, Timeout) of - noproc -> + case find_wpool(Name) of + undefined -> exit(no_workers); - timeout -> - exit(timeout); - Result -> - Result + #wpool{qmanager = QManager} -> + case wpool_queue_manager:call_available_worker(QManager, Call, Timeout) of + noproc -> + exit(no_workers); + timeout -> + exit(timeout); + Result -> + Result + end end. %% @doc Picks the first available worker and sends the request to it. diff --git a/src/wpool_process.erl b/src/wpool_process.erl index c28c756..d752911 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -65,7 +65,7 @@ -export_type([next_step/0]). %% api --export([start_link/4, call/3, cast/2, send_request/2]). +-export([start_link/4, run/3, call/3, cast/2, send_request/2]). -ifdef(TEST). @@ -91,6 +91,11 @@ start_link(Name, Module, InitArgs, Options) -> {Name, Module, InitArgs, FullOpts}, WorkerOpt). +%% @doc Runs a function that takes as a parameter the given process +-spec run(wpool:name() | pid(), wpool:run(Result), timeout()) -> Result. +run(Process, Run, Timeout) -> + Run(Process, Timeout). + %% @equiv gen_server:call(Process, Call, Timeout) -spec call(wpool:name() | pid(), term(), timeout()) -> term(). call(Process, Call, Timeout) -> diff --git a/src/wpool_queue_manager.erl b/src/wpool_queue_manager.erl index 7540ab6..4efe464 100644 --- a/src/wpool_queue_manager.erl +++ b/src/wpool_queue_manager.erl @@ -18,8 +18,9 @@ %% api -export([start_link/2, start_link/3]). --export([call_available_worker/3, cast_to_available_worker/2, new_worker/2, worker_dead/2, - send_request_available_worker/3, worker_ready/2, worker_busy/2, pending_task_count/1]). +-export([run_with_available_worker/3, call_available_worker/3, cast_to_available_worker/2, + new_worker/2, worker_dead/2, send_request_available_worker/3, worker_ready/2, + worker_busy/2, pending_task_count/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). @@ -28,7 +29,7 @@ clients :: queue:queue({cast | {pid(), _}, term()}), workers :: gb_sets:set(atom()), monitors :: #{atom() := monitored_from()}, - queue_type :: queue_type()}). + queue_type :: wpool:queue_type()}). -opaque state() :: #state{}. @@ -50,7 +51,6 @@ -type arg() :: option() | pool. -type queue_mgr() :: atom(). --type queue_type() :: fifo | lifo. -type worker_event() :: new_worker | worker_dead | worker_busy | worker_ready. -export_type([worker_event/0]). @@ -58,7 +58,7 @@ -type call_request() :: {available_worker, infinity | pos_integer()} | pending_task_count. -export_type([call_request/0]). --export_type([queue_mgr/0, queue_type/0]). +-export_type([queue_mgr/0]). %%%=================================================================== %%% API @@ -71,13 +71,27 @@ start_link(WPool, Name) -> start_link(WPool, Name, Options) -> gen_server:start_link({local, Name}, ?MODULE, [{pool, WPool} | Options], []). +%% @doc returns the first available worker in the pool +-spec run_with_available_worker(queue_mgr(), wpool:run(Result), timeout()) -> + noproc | timeout | Result. +run_with_available_worker(QueueManager, Call, Timeout) -> + case get_available_worker(QueueManager, Call, Timeout) of + {ok, Worker, TimeLeft} when TimeLeft > 0 -> + wpool_process:run(Worker, Call, TimeLeft); + {ok, Worker, _} -> + worker_ready(QueueManager, Worker), + timeout; + Other -> + Other + end. + %% @doc returns the first available worker in the pool -spec call_available_worker(queue_mgr(), any(), timeout()) -> noproc | timeout | any(). call_available_worker(QueueManager, Call, Timeout) -> case get_available_worker(QueueManager, Call, Timeout) of - {ok, TimeLeft, Worker} when TimeLeft > 0 -> + {ok, Worker, TimeLeft} when TimeLeft > 0 -> wpool_process:call(Worker, Call, TimeLeft); - {ok, _, Worker} -> + {ok, Worker, _} -> worker_ready(QueueManager, Worker), timeout; Other -> @@ -97,7 +111,7 @@ cast_to_available_worker(QueueManager, Cast) -> noproc | timeout | gen_server:request_id(). send_request_available_worker(QueueManager, Call, Timeout) -> case get_available_worker(QueueManager, Call, Timeout) of - {ok, _TimeLeft, Worker} -> + {ok, Worker, _} -> wpool_process:send_request(Worker, Call); Other -> Other @@ -237,10 +251,9 @@ handle_info(_Info, State) -> %%% private %%%=================================================================== -spec get_available_worker(queue_mgr(), any(), timeout()) -> - noproc | timeout | {ok, timeout(), any()}. + noproc | timeout | {ok, atom(), timeout()}. get_available_worker(QueueManager, Call, Timeout) -> - Start = now_in_milliseconds(), - ExpiresAt = expires(Timeout, Start), + ExpiresAt = expires(Timeout), try gen_server:call(QueueManager, {available_worker, ExpiresAt}, Timeout) of {'EXIT', _, noproc} -> noproc; @@ -248,7 +261,7 @@ get_available_worker(QueueManager, Call, Timeout) -> exit({Exit, {gen_server, call, [Worker, Call, Timeout]}}); {ok, Worker} -> TimeLeft = time_left(ExpiresAt), - {ok, TimeLeft, Worker} + {ok, Worker, TimeLeft} catch _:{noproc, {gen_server, call, _}} -> noproc; @@ -268,11 +281,11 @@ inc(Key) -> dec(Key) -> put(Key, get(Key) - 1). --spec expires(timeout(), integer()) -> timeout(). -expires(infinity, _) -> +-spec expires(timeout()) -> timeout(). +expires(infinity) -> infinity; -expires(Timeout, NowMs) -> - NowMs + Timeout. +expires(Timeout) -> + now_in_milliseconds() + Timeout. -spec time_left(timeout()) -> timeout(). time_left(infinity) -> @@ -280,7 +293,7 @@ time_left(infinity) -> time_left(ExpiresAt) -> ExpiresAt - now_in_milliseconds(). --spec is_expired(integer()) -> boolean(). +-spec is_expired(timeout()) -> boolean(). is_expired(ExpiresAt) -> ExpiresAt > now_in_milliseconds(). diff --git a/test/echo_server.erl b/test/echo_server.erl index 82d5012..d3b0d18 100644 --- a/test/echo_server.erl +++ b/test/echo_server.erl @@ -17,6 +17,7 @@ -behaviour(gen_server). %% gen_server callbacks +-export([start_link/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_continue/2, format_status/1]). @@ -26,6 +27,10 @@ -export_type([from/0]). +-spec start_link(term()) -> gen_server:start_ret(). +start_link(Something) -> + gen_server:start_link(?MODULE, Something, []). + %%%=================================================================== %%% callbacks %%%=================================================================== diff --git a/test/echo_supervisor.erl b/test/echo_supervisor.erl new file mode 100644 index 0000000..66b3df7 --- /dev/null +++ b/test/echo_supervisor.erl @@ -0,0 +1,23 @@ +-module(echo_supervisor). + +-behaviour(supervisor). + +-export([start_link/0]). +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, noargs). + +init(noargs) -> + Children = + #{id => undefined, + start => {echo_server, start_link, []}, + restart => transient, + shutdown => 5000, + type => worker, + modules => [echo_server]}, + Strategy = + #{strategy => simple_one_for_one, + intensity => 5, + period => 60}, + {ok, {Strategy, [Children]}}. diff --git a/test/wpool_SUITE.erl b/test/wpool_SUITE.erl index 07a1ae1..e2febe3 100644 --- a/test/wpool_SUITE.erl +++ b/test/wpool_SUITE.erl @@ -28,7 +28,8 @@ -export([stats/1, stop_pool/1, non_brutal_shutdown/1, brutal_worker_shutdown/1, overrun/1, kill_on_overrun/1, too_much_overrun/1, default_strategy/1, overrun_handler1/1, overrun_handler2/1, default_options/1, complete_coverage/1, child_spec/1, broadcall/1, - broadcast/1, send_request/1, worker_killed_stats/1, accepts_maps_and_lists_as_opts/1]). + broadcast/1, send_request/1, worker_killed_stats/1, accepts_maps_and_lists_as_opts/1, + pool_of_supervisors/1]). -elvis([{elvis_style, no_block_expressions, disable}]). @@ -51,7 +52,8 @@ all() -> send_request, kill_on_overrun, worker_killed_stats, - accepts_maps_and_lists_as_opts]. + accepts_maps_and_lists_as_opts, + pool_of_supervisors]. -spec init_per_suite(config()) -> config(). init_per_suite(Config) -> @@ -515,6 +517,30 @@ accepts_maps_and_lists_as_opts(_Config) -> {comment, []}. +-spec pool_of_supervisors(config()) -> {comment, string()}. +pool_of_supervisors(_Config) -> + Opts = + #{workers => 3, + worker_shutdown => infinity, + worker => {supervisor, {echo_supervisor, echo_supervisor, noargs}}}, + + {ok, Pid} = wpool:start_sup_pool(pool_of_supervisors, Opts), + true = erlang:is_process_alive(Pid), + + Run = fun(Sup, _) -> supervisor:start_child(Sup, [{ok, #{}}]) end, + ForEach = + fun(_) -> + {ok, EchoServer} = wpool:run(pool_of_supervisors, Run, next_worker), + true = erlang:is_process_alive(EchoServer) + end, + lists:foreach(ForEach, lists:seq(1, 9)), + + Supervisors = wpool:get_workers(pool_of_supervisors), + [3 = proplists:get_value(active, supervisor:count_children(Supervisor)) + || Supervisor <- Supervisors], + + {comment, "Nicely load-balanced childrens across supervisors"}. + %% ============================================================================= %% Helpers %% ============================================================================= diff --git a/test/wpool_pool_SUITE.erl b/test/wpool_pool_SUITE.erl index 7036d20..ff98ae6 100644 --- a/test/wpool_pool_SUITE.erl +++ b/test/wpool_pool_SUITE.erl @@ -77,14 +77,25 @@ stop_worker(_Config) -> -spec available_worker(config()) -> {comment, []}. available_worker(_Config) -> Pool = available_worker, + Run = fun(Worker, Timeout) -> gen_server:call(Worker, {erlang, self, []}, Timeout) end, try wpool:call(not_a_pool, x) of - Result -> - no_result = Result + no_result -> + no_result + catch + _:no_workers -> + ok + end, + + try wpool:run(not_a_pool, Run) of + no_result -> + no_result catch _:no_workers -> ok end, + {ok, _} = wpool:run(Pool, Run, available_worker), + ct:log("Put them all to work, each request should go to a different worker"), [wpool:cast(Pool, {timer, sleep, [5000]}) || _ <- lists:seq(1, ?WORKERS)], @@ -107,11 +118,19 @@ available_worker(_Config) -> ct:log("If we can't wait we get no workers"), try wpool:call(Pool, {erlang, self, []}, available_worker, 100) of - R -> - should_fail = R + should_fail -> + should_fail + catch + _:timeout -> + timeout + end, + + try wpool:run(Pool, Run, available_worker, 100) of + should_fail -> + should_fail catch - _:Error -> - timeout = Error + _:timeout -> + timeout end, ct:log("Let's wait until all workers are free"), @@ -155,6 +174,9 @@ best_worker(_Config) -> ok end, + Run = fun(Worker, Timeout) -> gen_server:call(Worker, {erlang, self, []}, Timeout) end, + {ok, _} = wpool:run(Pool, Run, best_worker), + Req = wpool:send_request(Pool, {erlang, self, []}, best_worker), {reply, {ok, _}} = gen_server:wait_response(Req, 5000), @@ -185,6 +207,9 @@ next_available_worker(_Config) -> ok end, + Run = fun(Worker, Timeout) -> gen_server:call(Worker, {erlang, self, []}, Timeout) end, + {ok, _} = wpool:run(Pool, Run, next_available_worker), + ct:log("Put them all to work..."), [wpool:cast(Pool, {timer, sleep, [1500 + I]}, next_available_worker) || I <- lists:seq(0, (?WORKERS - 1) * 60000, 60000)], @@ -261,6 +286,9 @@ next_worker(_Config) -> Req = wpool:send_request(Pool, {erlang, self, []}, next_worker), {reply, {ok, _}} = gen_server:wait_response(Req, 5000), + Run = fun(Worker, Timeout) -> gen_server:call(Worker, {erlang, self, []}, Timeout) end, + {ok, _} = wpool:run(Pool, Run, next_worker), + {comment, []}. -spec random_worker(config()) -> {comment, []}. @@ -275,6 +303,9 @@ random_worker(_Config) -> ok end, + Run = fun(Worker, Timeout) -> gen_server:call(Worker, {erlang, self, []}, Timeout) end, + {ok, _} = wpool:run(Pool, Run, random_worker), + %% Ask for a random worker's identity 20x more than the number of workers %% and expect to get an answer from every worker at least once. Serial = @@ -342,6 +373,9 @@ hash_worker(_Config) -> sets:size( sets:from_list(Spread)), + Run = fun(Worker, Timeout) -> gen_server:call(Worker, {erlang, self, []}, Timeout) end, + [{ok, _} = wpool:run(Pool, Run, {hash_worker, I}) || I <- lists:seq(1, 20 * ?WORKERS)], + %% Fill up their message queues... [wpool:cast(Pool, {timer, sleep, [60000]}, {hash_worker, I}) || I <- lists:seq(1, 20 * ?WORKERS)], @@ -393,6 +427,9 @@ custom_worker(_Config) -> Req = wpool:send_request(Pool, {erlang, self, []}, Strategy), {reply, {ok, _}} = gen_server:wait_response(Req, 5000), + Run = fun(Worker, Timeout) -> gen_server:call(Worker, {erlang, self, []}, Timeout) end, + {ok, _} = wpool:run(Pool, Run, Strategy), + {comment, []}. -spec manager_crash(config()) -> {comment, []}.