Skip to content

Commit

Permalink
Support multiple time units
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 20, 2023
1 parent adbd133 commit 600ff08
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 28 deletions.
90 changes: 63 additions & 27 deletions src/opuntia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%%
%% This module implements the token-bucket traffic-shaping algorithm.
%%
%% The rate is given in tokens per millisecond and a bucket size.
%% The rate is given in `t:tokens()' per `t:time_unit()' and a bucket size.
%% Resolution is in native unit times as described by `erlang:monotonic_time/0'.
%%
%% The delay is always returned in milliseconds unit,
Expand All @@ -13,9 +13,9 @@
-export([new/1, update/2]).

-ifdef(TEST).
-export([create/2, calculate/3]).
-export([create/2, calculate/3, convert_time_unit/3]).
-else.
-compile({inline, [create/2, calculate/3, convert_native_to_ms/1]}).
-compile({inline, [create/2, calculate/3, convert_time_unit/3]}).
-endif.

-include("opuntia.hrl").
Expand All @@ -24,7 +24,10 @@
%% Number of milliseconds that is advise to wait after a shaping update.

-type rate() :: non_neg_integer().
%% Number of tokens accepted per millisecond.
%% Number of tokens accepted per `t:time_unit()'.

-type time_unit() :: second | millisecond | microsecond | nanosecond | native.
%% Supported shaping time units.

-type bucket_size() :: non_neg_integer().
%% Maximum capacity of the bucket regardless of how much time passes.
Expand All @@ -34,34 +37,41 @@

-type config() :: 0 | #{bucket_size := bucket_size(),
rate := rate(),
time_unit := time_unit(),
start_full := boolean()}.
-type shape() :: {bucket_size(), rate()}.
-type shape() :: {bucket_size(), rate(), time_unit()}.
%% See `new/1' for more details.

-type shaper() :: none | #token_bucket_shaper{}.
%% Shaper type

-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, delay/0]).
-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, time_unit/0, delay/0]).

-define(NON_NEG_INT(N), (is_integer(N) andalso N > 0)).
-define(TU(T), (second =:= T orelse
millisecond =:= T orelse
microsecond =:= T orelse
nanosecond =:= T orelse
native =:= T)).

%% @doc Creates a new shaper according to the configuration.
%%
%% If zero is given, no shaper in created and any update action will always return zero delay;
%% to configure a shaper it will need a
%% to configure a shaper it will need a config like
%% ```
%% #{bucket_size => MaximumTokens, rate => Rate, start_full => Boolean},
%% #{bucket_size => MaximumTokens, rate => Rate, time_unit => TimeUnit, start_full => Boolean}
%% '''
%% where
%% <ul>
%% <li>`Rate' is the number of tokens per millisecond the bucket will grow with.</li>
%% <li>`TimeUnit' is the time unit of measurement as defined by `t:time_unit()'</li>
%% <li>`Rate' is the number of tokens per `TimeUnit' the bucket will grow with.</li>
%% <li>`MaximumTokens' is the maximum number of tokens the bucket can grow.</li>
%% <li>`StartFull' indicates if the shaper starts with the bucket full, or empty if not.</li>
%% </ul>
%%
%% So, for example, if we configure a shaper with the following:
%% ```
%% #{bucket_size => 60000, rate => 10, start_full => true}
%% #{bucket_size => 60000, rate => 10, time_unit => millisecond, start_full => true}
%% '''
%% it means that the bucket will
%% allow `10' tokens per `millisecond', up to 60000 tokens, regardless of how long it is left
Expand Down Expand Up @@ -89,17 +99,19 @@ create(0, _) ->
none;
create(#{bucket_size := MaximumTokens,
rate := Rate,
time_unit := TimeUnit,
start_full := StartFull},
NativeNow)
when ?NON_NEG_INT(MaximumTokens),
?NON_NEG_INT(Rate),
MaximumTokens >= Rate,
?TU(TimeUnit),
is_boolean(StartFull) ->
AvailableAtStart = case StartFull of
true -> MaximumTokens;
false -> 0
end,
#token_bucket_shaper{shape = {MaximumTokens, Rate},
#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = AvailableAtStart,
last_update = NativeNow,
debt = 0.0}.
Expand All @@ -109,21 +121,23 @@ calculate(none, _, _) ->
{none, 0};
calculate(Shaper, 0, _) ->
{Shaper, 0};
calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = LastAvailableTokens,
last_update = NativeLastUpdate,
debt = LastDebt} = Shaper, TokensNowUsed, NativeNow) ->
debt = OverPenalisedInUnitsLastTime} = Shaper, TokensNowUsed, NativeNow) ->
NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,

%% This is now a float and so will all below be, to preserve best rounding errors possible
TimeSinceLastUpdate = convert_native_to_ms(NativeTimeSinceLastUpdate) + LastDebt,
TimeSinceLastUpdate = convert_time_unit(NativeTimeSinceLastUpdate, native, TimeUnit),

%% How much we might have recovered since last time
AvailableAtGrowthRate = Rate * TimeSinceLastUpdate,
UnboundedTokenGrowth = LastAvailableTokens + AvailableAtGrowthRate,

%% Real recovery cannot grow higher than the actual rate in the window frame
ExactlyAvailableNow = min(MaximumTokens, UnboundedTokenGrowth),
ExactlyAvailableNow0 = min(MaximumTokens, UnboundedTokenGrowth),
%% But it can't be negative either (which can happen if we were already in debt)
ExactlyAvailableNow = max(0, ExactlyAvailableNow0),

%% How many are available after using TokensNowUsed can't be smaller than zero
TokensAvailable = max(0, ExactlyAvailableNow - TokensNowUsed),
Expand All @@ -132,27 +146,49 @@ calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
TokensOverused = max(0, TokensNowUsed - ExactlyAvailableNow),

%% And then MaybeDelay will be zero if TokensOverused was zero
MaybeDelayMs = TokensOverused / Rate,
OverUsedRateNow = TokensOverused / Rate,

%% We penalise rounding up, the most important contract is that rate will never exceed that
%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
%% don't promise at what time in the future they would arrive, nor we promise any upper bound
%% to the limits of the shaper delay.
RoundedDelayMs = ceil(MaybeDelayMs),

NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
last_update = NativeNow + RoundedDelayMs,
debt = RoundedDelayMs - MaybeDelayMs},
{NewShaper, RoundedDelayMs}.
case OverUsedRateNow - OverPenalisedInUnitsLastTime of
%% Even after paying the old debt you incurr a new debt again
Punish when Punish >= +0.0 ->
DelayMs = convert_time_unit(Punish, TimeUnit, millisecond),
RoundedDelayMs = ceil(DelayMs),
OverPenalisedNow = RoundedDelayMs - DelayMs,
OverUsedRateNowInUnits = convert_time_unit(OverPenalisedNow, millisecond, TimeUnit),
DelayNative = convert_time_unit(RoundedDelayMs, TimeUnit, native),
RoundedDelayNative = ceil(DelayNative),
NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
last_update = NativeNow + RoundedDelayNative,
debt = OverUsedRateNowInUnits},
{NewShaper, RoundedDelayMs};
%% I penalised you too much last time, you get off now but with a future bill
Punish when Punish =< +0.0 ->
NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
last_update = NativeNow,
debt = -Punish},
{NewShaper, 0}
end.

%% Avoid rounding errors by using floats and float division,
%% erlang:convert_native_to_ms works only with integers
-spec convert_native_to_ms(number()) -> float().
convert_native_to_ms(Time) ->
time_unit_multiplier(millisecond) * Time / time_unit_multiplier(native).
%% erlang:convert_time_unit works only with integers
-spec convert_time_unit(number(), time_unit(), time_unit()) -> float().
convert_time_unit(Time, SameUnit, SameUnit) -> Time;
convert_time_unit(Time, FromUnit, ToUnit) ->
time_unit_multiplier(ToUnit) * Time / time_unit_multiplier(FromUnit).

-compile({inline, [time_unit_multiplier/1]}).
-spec time_unit_multiplier(time_unit()) -> pos_integer().
time_unit_multiplier(native) ->
erts_internal:time_unit();
time_unit_multiplier(nanosecond) ->
1000*1000*1000;
time_unit_multiplier(microsecond) ->
1000*1000;
time_unit_multiplier(millisecond) ->
1000.
1000;
time_unit_multiplier(second) ->
1.
1 change: 0 additions & 1 deletion src/opuntia.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
available_tokens :: opuntia:tokens(),
last_update :: integer(),
debt :: float() %% Always in the range [0.0, 1.0]
%% Signifies the unnecesary number of milliseconds of penalisation
}).

-endif.

0 comments on commit 600ff08

Please sign in to comment.