Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support other units #1

Merged
merged 9 commits into from
Jan 3, 2024
134 changes: 98 additions & 36 deletions src/opuntia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%%
%% This module implements the token-bucket traffic-shaping algorithm.
%%
%% The rate is given in tokens per millisecond and a bucket size.
%% The rate is given in `t:tokens()' per `t:time_unit()' and a bucket size.
%% Resolution is in native unit times as described by `erlang:monotonic_time/0'.
%%
%% The delay is always returned in milliseconds unit,
Expand All @@ -13,9 +13,10 @@
-export([new/1, update/2]).

-ifdef(TEST).
-export([create/2, calculate/3]).
-export([create/2, calculate/3, convert_time_unit/3]).
-else.
-compile({inline, [create/2, calculate/3, convert_native_to_ms/1]}).
-compile({inline, [create/2, calculate/3, convert_time_unit/3, timediff_in_units/3,
unbounded_bucket_diff/3, exactly_available_now/2, final_state/4]}).
-endif.

-include("opuntia.hrl").
Expand All @@ -24,7 +25,10 @@
%% Number of milliseconds that is advise to wait after a shaping update.

-type rate() :: non_neg_integer().
%% Number of tokens accepted per millisecond.
%% Number of tokens accepted per `t:time_unit()'.

-type time_unit() :: second | millisecond | microsecond | nanosecond | native.
%% Supported shaping time units.

-type bucket_size() :: non_neg_integer().
%% Maximum capacity of the bucket regardless of how much time passes.
Expand All @@ -34,34 +38,41 @@

-type config() :: 0 | #{bucket_size := bucket_size(),
rate := rate(),
time_unit := time_unit(),
start_full := boolean()}.
-type shape() :: {bucket_size(), rate()}.
-type shape() :: {bucket_size(), rate(), time_unit()}.
%% See `new/1' for more details.

-type shaper() :: none | #token_bucket_shaper{}.
%% Shaper type

-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, delay/0]).
-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, time_unit/0, delay/0]).

-define(NON_NEG_INT(N), (is_integer(N) andalso N > 0)).
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved
-define(TU(T), (second =:= T orelse
millisecond =:= T orelse
microsecond =:= T orelse
nanosecond =:= T orelse
native =:= T)).

%% @doc Creates a new shaper according to the configuration.
%%
%% If zero is given, no shaper in created and any update action will always return zero delay;
%% to configure a shaper it will need a
%% to configure a shaper it will need a config like
%% ```
%% #{bucket_size => MaximumTokens, rate => Rate, start_full => Boolean},
%% #{bucket_size => MaximumTokens, rate => Rate, time_unit => TimeUnit, start_full => Boolean}
%% '''
%% where
%% <ul>
%% <li>`Rate' is the number of tokens per millisecond the bucket will grow with.</li>
%% <li>`TimeUnit' is the time unit of measurement as defined by `t:time_unit()'</li>
%% <li>`Rate' is the number of tokens per `TimeUnit' the bucket will grow with.</li>
%% <li>`MaximumTokens' is the maximum number of tokens the bucket can grow.</li>
%% <li>`StartFull' indicates if the shaper starts with the bucket full, or empty if not.</li>
%% </ul>
%%
%% So, for example, if we configure a shaper with the following:
%% ```
%% #{bucket_size => 60000, rate => 10, start_full => true}
%% #{bucket_size => 60000, rate => 10, time_unit => millisecond, start_full => true}
%% '''
%% it means that the bucket will
%% allow `10' tokens per `millisecond', up to 60000 tokens, regardless of how long it is left
Expand Down Expand Up @@ -89,17 +100,19 @@ create(0, _) ->
none;
create(#{bucket_size := MaximumTokens,
rate := Rate,
time_unit := TimeUnit,
start_full := StartFull},
NativeNow)
when ?NON_NEG_INT(MaximumTokens),
?NON_NEG_INT(Rate),
MaximumTokens >= Rate,
?TU(TimeUnit),
is_boolean(StartFull) ->
AvailableAtStart = case StartFull of
true -> MaximumTokens;
false -> 0
end,
#token_bucket_shaper{shape = {MaximumTokens, Rate},
#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = AvailableAtStart,
last_update = NativeNow,
debt = 0.0}.
Expand All @@ -109,21 +122,16 @@ calculate(none, _, _) ->
{none, 0};
calculate(Shaper, 0, _) ->
{Shaper, 0};
calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = LastAvailableTokens,
last_update = NativeLastUpdate,
debt = LastDebt} = Shaper, TokensNowUsed, NativeNow) ->
NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,
debt = OverPenalisedInUnitsLastTime} = Shaper, TokensNowUsed, NativeNow) ->

%% This is now a float and so will all below be, to preserve best rounding errors possible
TimeSinceLastUpdate = convert_native_to_ms(NativeTimeSinceLastUpdate) + LastDebt,
TimeDiffInUnits = timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow),

%% How much we might have recovered since last time
AvailableAtGrowthRate = Rate * TimeSinceLastUpdate,
UnboundedTokenGrowth = LastAvailableTokens + AvailableAtGrowthRate,
UnboundedTokenGrowth = unbounded_bucket_diff(Rate, LastAvailableTokens, TimeDiffInUnits),
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved

%% Real recovery cannot grow higher than the actual rate in the window frame
ExactlyAvailableNow = min(MaximumTokens, UnboundedTokenGrowth),
ExactlyAvailableNow = exactly_available_now(MaximumTokens, UnboundedTokenGrowth),

%% How many are available after using TokensNowUsed can't be smaller than zero
TokensAvailable = max(0, ExactlyAvailableNow - TokensNowUsed),
Expand All @@ -132,27 +140,81 @@ calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
TokensOverused = max(0, TokensNowUsed - ExactlyAvailableNow),

%% And then MaybeDelay will be zero if TokensOverused was zero
MaybeDelayMs = TokensOverused / Rate,
OverUsedRateNow = TokensOverused / Rate,

NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable},
Punish = OverUsedRateNow - OverPenalisedInUnitsLastTime,
final_state(NewShaper, TimeUnit, Punish, NativeNow).

-spec timediff_in_units(time_unit(), integer(), integer()) -> float().
timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow) ->
%% Time difference between now and the last update, in native
NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,
%% This is now a float and so will all below be, to preserve best rounding errors possible
convert_time_unit(NativeTimeSinceLastUpdate, native, TimeUnit).

%% We penalise rounding up, the most important contract is that rate will never exceed that
%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
%% don't promise at what time in the future they would arrive, nor we promise any upper bound
%% to the limits of the shaper delay.
RoundedDelayMs = ceil(MaybeDelayMs),
%% Unbounded growth is calculated, with float precision, in the configured time units
%%
%% Note that it can be negative, if earlier we have penalised giving a larger `last_update' and now we
%% update even before we have reach the point in time where the previous `last_update' was set
%%
%% If the growth was negative, that means that it has grown the debt instead
-spec unbounded_bucket_diff(rate(), tokens(), float()) -> float().
unbounded_bucket_diff(Rate, LastAvailableTokens, TimeDiffInUnits) ->
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved
%% How much we might have recovered since last time
AvailableAtGrowthRate = Rate * TimeDiffInUnits,
%% Unbounded growth at rate since the last update
LastAvailableTokens + AvailableAtGrowthRate.

NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
last_update = NativeNow + RoundedDelayMs,
debt = RoundedDelayMs - MaybeDelayMs},
{NewShaper, RoundedDelayMs}.
%% This is the real growth considering the maximum bucket size.
-spec exactly_available_now(bucket_size(), float()) -> float().
exactly_available_now(MaximumTokens, UnboundedTokenGrowth) ->
%% Real recovery cannot grow higher than the actual rate in the window frame
ExactlyAvailableNow0 = min(MaximumTokens, UnboundedTokenGrowth),
%% But it can't be negative either which can happen if we were already in debt,
%% but this is a debt we will pay when we calculate the final punishment in final_state
max(+0.0, ExactlyAvailableNow0).

%% We penalise rounding up, the most important contract is that rate will never exceed that
%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
%% don't promise at what time in the future they would arrive, nor we promise any upper bound
%% to the limits of the shaper delay.
%%
%% Two cases, either:
%% Punish is positive: even after paying the old debt you incurr a new debt again
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved
%% Punish is negative: I overpenalised you last time, you get off now but with a future bill
final_state(Shaper, TimeUnit, Punish, NativeNow) when Punish >= +0.0 ->
DelayMs = convert_time_unit(Punish, TimeUnit, millisecond),
RoundedDelayMs = ceil(DelayMs),
OverPenalisedNow = RoundedDelayMs - DelayMs,
OverUsedRateNowInUnits = convert_time_unit(OverPenalisedNow, millisecond, TimeUnit),
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved
DelayNative = convert_time_unit(RoundedDelayMs, TimeUnit, native),
RoundedDelayNative = ceil(DelayNative),
NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow + RoundedDelayNative,
debt = OverUsedRateNowInUnits},
{NewShaper, RoundedDelayMs};
final_state(Shaper, TimeUnit, Punish, NativeNow) when Punish < +0.0 ->
DebtInUnits = convert_time_unit(-Punish, millisecond, TimeUnit),
NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow,
debt = DebtInUnits},
{NewShaper, 0}.

%% Avoid rounding errors by using floats and float division,
%% erlang:convert_native_to_ms works only with integers
-spec convert_native_to_ms(number()) -> float().
convert_native_to_ms(Time) ->
time_unit_multiplier(millisecond) * Time / time_unit_multiplier(native).
%% erlang:convert_time_unit works only with integers
-spec convert_time_unit(number(), time_unit(), time_unit()) -> float().
convert_time_unit(Time, SameUnit, SameUnit) -> Time;
convert_time_unit(Time, FromUnit, ToUnit) ->
time_unit_multiplier(ToUnit) * Time / time_unit_multiplier(FromUnit).

-compile({inline, [time_unit_multiplier/1]}).
-spec time_unit_multiplier(time_unit()) -> pos_integer().
time_unit_multiplier(native) ->
erts_internal:time_unit();
time_unit_multiplier(nanosecond) ->
1000*1000*1000;
time_unit_multiplier(microsecond) ->
1000*1000;
time_unit_multiplier(millisecond) ->
1000.
1000;
time_unit_multiplier(second) ->
1.
1 change: 0 additions & 1 deletion src/opuntia.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
available_tokens :: opuntia:tokens(),
last_update :: integer(),
debt :: float() %% Always in the range [0.0, 1.0]
%% Signifies the unnecesary number of milliseconds of penalisation
}).

-endif.
Loading