diff --git a/src/opuntia.erl b/src/opuntia.erl
index b6ba824..ac71681 100644
--- a/src/opuntia.erl
+++ b/src/opuntia.erl
@@ -2,7 +2,7 @@
%%
%% This module implements the token-bucket traffic-shaping algorithm.
%%
-%% The rate is given in tokens per millisecond and a bucket size.
+%% The rate is given in `t:tokens()' per `t:time_unit()' and a bucket size.
%% Resolution is in native unit times as described by `erlang:monotonic_time/0'.
%%
%% The delay is always returned in milliseconds unit,
@@ -13,9 +13,10 @@
-export([new/1, update/2]).
-ifdef(TEST).
--export([create/2, calculate/3]).
+-export([create/2, calculate/3, convert_time_unit/3]).
-else.
--compile({inline, [create/2, calculate/3, convert_native_to_ms/1]}).
+-compile({inline, [create/2, calculate/3, convert_time_unit/3, timediff_in_units/3,
+ unbounded_available_tokens/3, exactly_available_tokens/2, final_state/4]}).
-endif.
-include("opuntia.hrl").
@@ -24,7 +25,10 @@
%% Number of milliseconds that is advise to wait after a shaping update.
-type rate() :: non_neg_integer().
-%% Number of tokens accepted per millisecond.
+%% Number of tokens accepted per `t:time_unit()'.
+
+-type time_unit() :: second | millisecond | microsecond | nanosecond | native.
+%% Supported shaping time units.
-type bucket_size() :: non_neg_integer().
%% Maximum capacity of the bucket regardless of how much time passes.
@@ -34,34 +38,41 @@
-type config() :: 0 | #{bucket_size := bucket_size(),
rate := rate(),
+ time_unit := time_unit(),
start_full := boolean()}.
--type shape() :: {bucket_size(), rate()}.
+-type shape() :: {bucket_size(), rate(), time_unit()}.
%% See `new/1' for more details.
-type shaper() :: none | #token_bucket_shaper{}.
%% Shaper type
--export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, delay/0]).
+-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, time_unit/0, delay/0]).
--define(NON_NEG_INT(N), (is_integer(N) andalso N > 0)).
+-define(POS_INTEGER(N), (is_integer(N) andalso N > 0)).
+-define(TU(T), (second =:= T orelse
+ millisecond =:= T orelse
+ microsecond =:= T orelse
+ nanosecond =:= T orelse
+ native =:= T)).
%% @doc Creates a new shaper according to the configuration.
%%
%% If zero is given, no shaper in created and any update action will always return zero delay;
-%% to configure a shaper it will need a
+%% to configure a shaper it will need a config like
%% ```
-%% #{bucket_size => MaximumTokens, rate => Rate, start_full => Boolean},
+%% #{bucket_size => MaximumTokens, rate => Rate, time_unit => TimeUnit, start_full => Boolean}
%% '''
%% where
%%
-%% - `Rate' is the number of tokens per millisecond the bucket will grow with.
+%% - `TimeUnit' is the time unit of measurement as defined by `t:time_unit()'
+%% - `Rate' is the number of tokens per `TimeUnit' the bucket will grow with.
%% - `MaximumTokens' is the maximum number of tokens the bucket can grow.
%% - `StartFull' indicates if the shaper starts with the bucket full, or empty if not.
%%
%%
%% So, for example, if we configure a shaper with the following:
%% ```
-%% #{bucket_size => 60000, rate => 10, start_full => true}
+%% #{bucket_size => 60000, rate => 10, time_unit => millisecond, start_full => true}
%% '''
%% it means that the bucket will
%% allow `10' tokens per `millisecond', up to 60000 tokens, regardless of how long it is left
@@ -89,17 +100,19 @@ create(0, _) ->
none;
create(#{bucket_size := MaximumTokens,
rate := Rate,
+ time_unit := TimeUnit,
start_full := StartFull},
NativeNow)
- when ?NON_NEG_INT(MaximumTokens),
- ?NON_NEG_INT(Rate),
+ when ?POS_INTEGER(MaximumTokens),
+ ?POS_INTEGER(Rate),
MaximumTokens >= Rate,
+ ?TU(TimeUnit),
is_boolean(StartFull) ->
AvailableAtStart = case StartFull of
true -> MaximumTokens;
false -> 0
end,
- #token_bucket_shaper{shape = {MaximumTokens, Rate},
+ #token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = AvailableAtStart,
last_update = NativeNow,
debt = 0.0}.
@@ -109,21 +122,16 @@ calculate(none, _, _) ->
{none, 0};
calculate(Shaper, 0, _) ->
{Shaper, 0};
-calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
+calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = LastAvailableTokens,
last_update = NativeLastUpdate,
- debt = LastDebt} = Shaper, TokensNowUsed, NativeNow) ->
- NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,
+ debt = OverPenalisedInUnitsLastTime} = Shaper, TokensNowUsed, NativeNow) ->
- %% This is now a float and so will all below be, to preserve best rounding errors possible
- TimeSinceLastUpdate = convert_native_to_ms(NativeTimeSinceLastUpdate) + LastDebt,
+ TimeDiffInUnits = timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow),
- %% How much we might have recovered since last time
- AvailableAtGrowthRate = Rate * TimeSinceLastUpdate,
- UnboundedTokenGrowth = LastAvailableTokens + AvailableAtGrowthRate,
+ UnboundedTokens = unbounded_available_tokens(Rate, LastAvailableTokens, TimeDiffInUnits),
- %% Real recovery cannot grow higher than the actual rate in the window frame
- ExactlyAvailableNow = min(MaximumTokens, UnboundedTokenGrowth),
+ ExactlyAvailableNow = exactly_available_tokens(MaximumTokens, UnboundedTokens),
%% How many are available after using TokensNowUsed can't be smaller than zero
TokensAvailable = max(0, ExactlyAvailableNow - TokensNowUsed),
@@ -132,27 +140,81 @@ calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate},
TokensOverused = max(0, TokensNowUsed - ExactlyAvailableNow),
%% And then MaybeDelay will be zero if TokensOverused was zero
- MaybeDelayMs = TokensOverused / Rate,
+ OverUsedRateNow = TokensOverused / Rate,
+
+ NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable},
+ Punish = OverUsedRateNow - OverPenalisedInUnitsLastTime,
+ final_state(NewShaper, TimeUnit, Punish, NativeNow).
+
+-spec timediff_in_units(time_unit(), integer(), integer()) -> number().
+timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow) ->
+    %% Time difference between now and the last update, in native
+    NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,
+    %% Float from here on, except when TimeUnit is native: then no conversion applies and an integer is returned
+    convert_time_unit(NativeTimeSinceLastUpdate, native, TimeUnit).
- %% We penalise rounding up, the most important contract is that rate will never exceed that
- %% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
- %% don't promise at what time in the future they would arrive, nor we promise any upper bound
- %% to the limits of the shaper delay.
- RoundedDelayMs = ceil(MaybeDelayMs),
+%% Unbounded growth is calculated, with float precision, in the configured time units
+%%
+%% Note that it can be negative, if earlier we have penalised giving a larger `last_update' and now we
+%% update even before we have reached the point in time where the previous `last_update' was set
+%%
+%% If the growth was negative, that means that it has grown the debt instead
+-spec unbounded_available_tokens(rate(), tokens(), number()) -> number().
+unbounded_available_tokens(Rate, LastAvailableTokens, TimeDiffInUnits) ->
+    %% How much we might have recovered since last time
+    AvailableAtGrowthRate = Rate * TimeDiffInUnits,
+    %% Unbounded growth at rate since the last update
+    LastAvailableTokens + AvailableAtGrowthRate.
- NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
- last_update = NativeNow + RoundedDelayMs,
- debt = RoundedDelayMs - MaybeDelayMs},
- {NewShaper, RoundedDelayMs}.
+%% This is the real growth considering the maximum bucket size.
+%% This is the real growth considering the maximum bucket size.
+-spec exactly_available_tokens(bucket_size(), number()) -> number().
+exactly_available_tokens(MaximumTokens, UnboundedTokens) ->
+    %% Real recovery cannot grow higher than the actual rate in the window frame
+    ExactlyAvailableNow0 = min(MaximumTokens, UnboundedTokens),
+    %% But it can't be negative either which can happen if we were already in debt,
+    %% but this is a debt we will pay when we calculate the final punishment in final_state
+    max(+0.0, ExactlyAvailableNow0).
+
+%% We penalise rounding up, the most important contract is that rate will never exceed that
+%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
+%% don't promise at what time in the future they would arrive, nor we promise any upper bound
+%% to the limits of the shaper delay.
+%%
+%% Two cases, either:
+%% Punish is positive: even after paying the old debt you incur a new debt again
+%% Punish is negative: I overpenalised you last time, you get off now but with a future bill
+final_state(Shaper, TimeUnit, Punish, NativeNow) when Punish >= +0.0 ->
+    DelayMs = convert_time_unit(Punish, TimeUnit, millisecond),
+    RoundedDelayMs = ceil(DelayMs),
+    Debt = RoundedDelayMs - DelayMs,
+    DebtInUnits = convert_time_unit(Debt, millisecond, TimeUnit),
+    %% RoundedDelayMs is expressed in milliseconds, so convert from millisecond (not TimeUnit) to native
+    DelayNative = convert_time_unit(RoundedDelayMs, millisecond, native),
+    RoundedDelayNative = ceil(DelayNative),
+    NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow + RoundedDelayNative,
+                                           debt = DebtInUnits},
+    {NewShaper, RoundedDelayMs};
+final_state(Shaper, _TimeUnit, Punish, NativeNow) when Punish < +0.0 ->
+    NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow,
+                                           debt = -Punish}, %% Punish is already in TimeUnit, no conversion
+    {NewShaper, 0}.
%% Avoid rounding errors by using floats and float division,
-%% erlang:convert_native_to_ms works only with integers
--spec convert_native_to_ms(number()) -> float().
-convert_native_to_ms(Time) ->
- time_unit_multiplier(millisecond) * Time / time_unit_multiplier(native).
+%% erlang:convert_time_unit works only with integers
+-spec convert_time_unit(number(), time_unit(), time_unit()) -> number().
+convert_time_unit(Time, SameUnit, SameUnit) -> Time; %% identity clause: returns the input unchanged (may be an integer)
+convert_time_unit(Time, FromUnit, ToUnit) ->
+    time_unit_multiplier(ToUnit) * Time / time_unit_multiplier(FromUnit).
-compile({inline, [time_unit_multiplier/1]}).
+-spec time_unit_multiplier(time_unit()) -> pos_integer().
time_unit_multiplier(native) ->
erts_internal:time_unit();
+time_unit_multiplier(nanosecond) ->
+ 1000*1000*1000;
+time_unit_multiplier(microsecond) ->
+ 1000*1000;
time_unit_multiplier(millisecond) ->
- 1000.
+ 1000;
+time_unit_multiplier(second) ->
+ 1.
diff --git a/src/opuntia.hrl b/src/opuntia.hrl
index 94f89c3..83fe159 100644
--- a/src/opuntia.hrl
+++ b/src/opuntia.hrl
@@ -6,7 +6,6 @@
available_tokens :: opuntia:tokens(),
last_update :: integer(),
debt :: float() %% Always in the range [0.0, 1.0]
- %% Signifies the unnecesary number of milliseconds of penalisation
}).
-endif.
diff --git a/test/opuntia_SUITE.erl b/test/opuntia_SUITE.erl
index 7ad91df..fe85d69 100644
--- a/test/opuntia_SUITE.erl
+++ b/test/opuntia_SUITE.erl
@@ -30,7 +30,14 @@ groups() ->
run_shaper_without_consuming_does_not_delay,
run_basic_shaper_property,
run_stateful_server
- ]}
+ ]},
+ %% This group is purposefully left out because it is too slow to run on CI,
+    %% and uses `timer:tc/3` (with a time unit argument), only available since OTP 26
+ %% Here for the record, can be enabled locally and checked
+ {delays, [sequence],
+ [
+ run_with_delays
+ ]}
].
%%%===================================================================
@@ -71,8 +78,13 @@ keep_table() ->
%%%===================================================================
simple_test_no_delay_is_needed(_) ->
+ Units = [second, millisecond, microsecond, nanosecond, native],
+ [ simple_test_no_delay_is_needed_for_unit(Unit) || Unit <- Units ].
+
+simple_test_no_delay_is_needed_for_unit(Unit) ->
FoldFun = fun(N, ShIn) -> {ShOut, 0} = opuntia:update(ShIn, N), ShOut end,
- Shaper = opuntia:new(#{bucket_size => 100000, rate => 10000, start_full => true}),
+ Config = #{bucket_size => 100000, rate => 10000, time_unit => Unit, start_full => true},
+ Shaper = opuntia:new(Config),
lists:foldl(FoldFun, Shaper, lists:duplicate(10000, 1)).
run_shaper_with_zero_does_not_shape(_) ->
@@ -93,6 +105,25 @@ run_shaper_without_consuming_does_not_delay(_) ->
end),
run_prop(?FUNCTION_NAME, Prop, 1000, 2).
+run_with_delays(_) ->
+    S = "ToConsume ~p, Shape ~p, TimeItTook ~p, CalculatedDelay ~p ms, in range [~p, ~p]ms, ~nLastShaper ~p,~nHistory ~p",
+    Prop = ?FORALL(
+             {TokensToSpend, Shape},
+             {tokens(), shape()},
+             begin
+                 Shaper = opuntia:new(Shape),
+                 %% timer:tc/3 with a unit returns the elapsed time in that unit: native here, not microseconds
+                 {TimeItTookNative, {LastShaper, History, CalculatedDelay}} =
+                     timer:tc(fun run_with_sleeps/2, [Shaper, TokensToSpend], native),
+                 TimeItTookMs = opuntia:convert_time_unit(TimeItTookNative, native, millisecond),
+                 {CannotBeFasterThan, CannotBeSlowerThan} = should_take_in_range(Shape, TokensToSpend),
+                 AdjustCannotBeSlowerThan = CannotBeSlowerThan + 10,
+                 Val = value_in_range(TimeItTookMs, CannotBeFasterThan, AdjustCannotBeSlowerThan),
+                 P = [TokensToSpend, Shape, TimeItTookMs, CalculatedDelay, CannotBeFasterThan,
+                      AdjustCannotBeSlowerThan, LastShaper, History],
+                 success_or_log_and_return(Val andalso is_integer(CalculatedDelay), S, P)
+             end),
+
run_basic_shaper_property(_) ->
S = "ToConsume ~p, Shape ~p, CalculatedDelay ~p ms, in range [~p, ~p]ms, ~nLastShaper ~p,~nHistory ~p",
Prop = ?FORALL(
@@ -165,9 +196,10 @@ do_postcondition(State, Server, Key, Tokens, Res) ->
TokensNowConsumed = tokens_now_consumed(State, Key, Tokens),
{MinimumExpectedMs, _} = should_take_in_range(Shape, TokensNowConsumed),
Duration = Now - Start,
- ct:pal("For shape ~p, requested ~p, expected ~p and duration ~p~n",
- [Shape, Tokens, MinimumExpectedMs, Duration]),
- continue =:= Res andalso MinimumExpectedMs =< Duration.
+    %% ~B (not ~f): floor/1 returns an integer and ~f raises badarg for non-floats
+    S = "For shape ~p, consumed ~p, expected-min-time ~B and it took duration ~B~n",
+    P = [{maps:get(rate, Shape), maps:get(time_unit, Shape)}, Tokens, floor(MinimumExpectedMs), Duration],
+    Val = continue =:= Res andalso floor(MinimumExpectedMs) =< Duration,
next_state(_State, _Result, {call, ?MODULE, reset_shapers, [_Server]}) ->
#{};
@@ -201,25 +233,29 @@ key() ->
tokens() ->
integer(1, 99999).
+time_unit() ->
+ oneof([second, millisecond, microsecond, nanosecond, native]).
+
config() ->
union([shape_for_server(), function(0, shape_for_server())]).
shape_for_server() ->
- ShapeGen = {integer(1, 9999), integer(1, 9999)},
- ?LET({M, N}, ShapeGen,
- begin
- #{bucket_size => max(M, N),
- rate => min(M, N),
- start_full => true}
- end).
+ %% server is slower and proper struggles with bigger numbers, not critical
+ ShapeGen = {integer(1, 999), integer(1, 999), time_unit(), boolean()},
+ let_shape(ShapeGen).
shape() ->
- ShapeGen = {integer(1, 99999), integer(1, 99999), boolean()},
+ Int = integer(1, 99999),
+ ShapeGen = {Int, Int, time_unit(), boolean()},
+ let_shape(ShapeGen).
+
+let_shape(ShapeGen) ->
?LET(Shape, ShapeGen,
begin
- {M, N, StartFull} = Shape,
+ {M, N, TimeUnit, StartFull} = Shape,
#{bucket_size => max(M, N),
rate => min(M, N),
+ time_unit => TimeUnit,
start_full => StartFull}
end).
@@ -233,34 +269,58 @@ success_or_log_and_return(false, S, P) ->
ct:pal(S, P),
false.
-should_take_in_range(#{rate := Rate, start_full := false}, ToConsume) ->
- ExpectedMs = ToConsume / Rate,
- {ExpectedMs, ExpectedMs + 1};
+should_take_in_range(#{rate := Rate, time_unit := TimeUnit, start_full := false}, ToConsume) ->
+ Expected = ToConsume / Rate,
+ ExpectedMs = opuntia:convert_time_unit(Expected, TimeUnit, millisecond),
+ {ExpectedMs, ceil(ExpectedMs + 1)};
should_take_in_range(#{bucket_size := MaximumTokens,
rate := Rate,
+ time_unit := TimeUnit,
start_full := true},
ToConsume) ->
case ToConsume < MaximumTokens of
- true -> {0, 0};
+ true -> {0, 1};
false ->
ToThrottle = ToConsume - MaximumTokens,
- ExpectedMs = ToThrottle / Rate,
- {ExpectedMs, ExpectedMs + 1}
+ Expected = ToThrottle / Rate,
+ ExpectedMs = opuntia:convert_time_unit(Expected, TimeUnit, millisecond),
+ {ExpectedMs, ceil(ExpectedMs + 1)}
end.
+run_with_sleeps(Shaper, ToConsume) ->
+ run_with_sleeps(Shaper, [], 0, ToConsume).
+
+run_with_sleeps(Shaper, History, AccumulatedDelay, TokensLeft) when TokensLeft =< 0 ->
+ {Shaper, lists:reverse(History), AccumulatedDelay};
+run_with_sleeps(Shaper, History, AccumulatedDelay, TokensLeft) ->
+ ConsumeNow = rand:uniform(TokensLeft),
+ {NewShaper, DelayMs} = opuntia:update(Shaper, ConsumeNow),
+ timer:sleep(DelayMs),
+ NewEvent = #{consumed => ConsumeNow, proposed_delay => DelayMs, shaper => Shaper},
+ NewHistory = [NewEvent | History],
+ NewDelay = AccumulatedDelay + DelayMs,
+ run_with_sleeps(NewShaper, NewHistory, NewDelay, TokensLeft - ConsumeNow).
+
run_shaper(Shape, ToConsume) ->
- Now = erlang:monotonic_time(),
- Shaper = opuntia:create(Shape, Now),
- run_shaper(Shaper, Now, [], 0, ToConsume).
+ Shaper = opuntia:create(Shape, 0),
+ run_shaper(Shaper, [], 0, ToConsume).
-run_shaper(Shaper, _, History, AccumulatedDelay, 0) ->
+run_shaper(Shaper, History, AccumulatedDelay, 0) ->
{Shaper, lists:reverse(History), AccumulatedDelay};
-run_shaper(Shaper, Now, History, AccumulatedDelay, TokensLeft) ->
+run_shaper(Shaper, History, AccumulatedDelay, TokensLeft) ->
%% Uniform distributes in [1, N], and we want [0, N], so we generate [1, N+1] and subtract 1
ConsumeNow = rand:uniform(TokensLeft + 1) - 1,
- {NewShaper, DelayMs} = opuntia:calculate(Shaper, ConsumeNow, Now),
- true = DelayMs >= 0,
- run_shaper(NewShaper, Now, [{ConsumeNow, DelayMs} | History], AccumulatedDelay + DelayMs, TokensLeft - ConsumeNow).
+ {NewShaper, DelayMs} = opuntia:calculate(Shaper, ConsumeNow, 0),
+ NewEvent = #{consumed => ConsumeNow, proposed_delay => DelayMs, final_shaper => NewShaper},
+ NewHistory = [NewEvent | History],
+ NewDelay = AccumulatedDelay + DelayMs,
+ NewToConsume = TokensLeft - ConsumeNow,
+ case is_integer(DelayMs) andalso DelayMs >= 0 of
+ true ->
+ run_shaper(NewShaper, NewHistory, NewDelay, NewToConsume);
+ false ->
+ {NewShaper, NewHistory, bad_delay}
+ end.
run_prop(PropName, Property, NumTests, WorkersPerScheduler) ->
Opts = [quiet, noshrink, {start_size, 1}, {numtests, NumTests},