Skip to content

Commit

Permalink
Merge pull request #192 from inaka/small_perf_improvements
Browse files Browse the repository at this point in the history
Small perf improvements
  • Loading branch information
elbrujohalcon authored Apr 26, 2022
2 parents 9ecceb5 + a56fc38 commit d117503
Showing 1 changed file with 35 additions and 26 deletions.
61 changes: 35 additions & 26 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ random_worker(Name) ->
case find_wpool(Name) of
undefined ->
exit(no_workers);
Wpool = #wpool{size = WpoolSize} ->
WorkerNumber = rand:uniform(WpoolSize),
worker_name(Wpool, WorkerNumber)
Wpool = #wpool{size = Size} ->
WorkerNumber = fast_rand_uniform(Size),
nth_worker_name(Wpool, WorkerNumber)
end.

%% @doc Picks the next worker in a round robin fashion
Expand All @@ -87,7 +87,7 @@ next_worker(Name) ->
Index = atomics:get(Atomic, 1),
NextIndex = next_to_check(Index, Size),
_ = atomics:compare_exchange(Atomic, 1, Index, NextIndex),
worker_name(Wpool, Index)
nth_worker_name(Wpool, Index)
end.

%% @doc Picks the first available worker, if any
Expand Down Expand Up @@ -141,7 +141,7 @@ hash_worker(Name, HashKey) ->
exit(no_workers);
Wpool = #wpool{size = WpoolSize} ->
Index = 1 + erlang:phash2(HashKey, WpoolSize),
worker_name(Wpool, Index)
nth_worker_name(Wpool, Index)
end.

%% @doc Casts a message to the first available worker.
Expand Down Expand Up @@ -220,7 +220,7 @@ stats(Wpool, Name) ->
{workers, WorkerStats}].

worker_info(Wpool, N, Info) ->
case erlang:whereis(worker_name(Wpool, N)) of
case erlang:whereis(nth_worker_name(Wpool, N)) of
undefined ->
undefined;
Worker ->
Expand Down Expand Up @@ -350,9 +350,11 @@ init({Name, Options}) ->
{ok, {SupStrategy, Children}}.

%% @private
-spec worker_name(wpool() | wpool:name(), pos_integer()) -> atom().
worker_name(#wpool{workers = Workers}, I) ->
element(I, Workers);
-spec nth_worker_name(wpool(), pos_integer()) -> atom().
nth_worker_name(#wpool{workers = Workers}, I) ->
element(I, Workers).

-spec worker_name(wpool:name(), pos_integer()) -> atom().
worker_name(Name, I) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Name)] ++ [$- | integer_to_list(I)]).

Expand All @@ -368,18 +370,17 @@ queue_manager_name(Name) ->
event_manager_name(Name) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Name)] ++ "-event-manager").

worker_with_no_task(Wpool) ->
worker_with_no_task(#wpool{size = Size} = Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
%% do not always start asking for process_info to the processes that are most
%% likely to have bigger message queues
Size = Wpool#wpool.size,
First = rand:uniform(Size),
First = fast_rand_uniform(Size),
worker_with_no_task(0, Size, First, Wpool).

worker_with_no_task(Size, Size, _, _) ->
undefined;
worker_with_no_task(Step, Size, ToCheck, Wpool) ->
Worker = worker_name(Wpool, ToCheck),
Worker = nth_worker_name(Wpool, ToCheck),
case try_process_info(whereis(Worker), [message_queue_len, dictionary]) of
[{message_queue_len, 0}, {dictionary, Dictionary}] ->
case proplists:get_value(wpool_task, Dictionary) of
Expand All @@ -397,29 +398,37 @@ try_process_info(undefined, _) ->
try_process_info(Pid, Keys) ->
erlang:process_info(Pid, Keys).

min_message_queue(Wpool) ->
min_message_queue(#wpool{size = Size} = Wpool) ->
%% Moving the beginning of the list to a random point to ensure that clients
%% do not always start asking for process_info to the processes that are most
%% likely to have bigger message queues
Size = Wpool#wpool.size,
First = rand:uniform(Size),
min_message_queue(0, Size, First, Wpool, []).
First = fast_rand_uniform(Size),
Worker = nth_worker_name(Wpool, First),
QLength = queue_length(whereis(Worker)),
min_message_queue(0, Size, First, Wpool, QLength, Worker).

min_message_queue(Size, Size, _, _, Found) ->
{_, Worker} = lists:min(Found),
min_message_queue(_, _, _, _, 0, Worker) ->
Worker;
min_message_queue(Size, Size, _, _, _QLength, Worker) ->
Worker;
min_message_queue(Step, Size, ToCheck, Wpool, Found) ->
Worker = worker_name(Wpool, ToCheck),
min_message_queue(Step, Size, ToCheck, Wpool, CurrentQLength, CurrentWorker) ->
Worker = nth_worker_name(Wpool, ToCheck),
QLength = queue_length(whereis(Worker)),
min_message_queue(Step + 1,
Size,
next_to_check(ToCheck, Size),
Wpool,
[{QLength, Worker} | Found]).
Next = next_to_check(ToCheck, Size),
case QLength < CurrentQLength of
true ->
min_message_queue(Step + 1, Size, Next, Wpool, QLength, Worker);
false ->
min_message_queue(Step + 1, Size, Next, Wpool, CurrentQLength, CurrentWorker)
end.

next_to_check(Next, Size) ->
Next rem Size + 1.

fast_rand_uniform(Range) ->
UI = erlang:unique_integer(),
1 + erlang:phash2(UI, Range).

queue_length(undefined) ->
infinity;
queue_length(Pid) when is_pid(Pid) ->
Expand Down

0 comments on commit d117503

Please sign in to comment.