diff --git a/src/opuntia.erl b/src/opuntia.erl index b6ba824..ac71681 100644 --- a/src/opuntia.erl +++ b/src/opuntia.erl @@ -2,7 +2,7 @@ %% %% This module implements the token-bucket traffic-shaping algorithm. %% -%% The rate is given in tokens per millisecond and a bucket size. +%% The rate is given in `t:tokens()' per `t:time_unit()' and a bucket size. %% Resolution is in native unit times as described by `erlang:monotonic_time/0'. %% %% The delay is always returned in milliseconds unit, @@ -13,9 +13,10 @@ -export([new/1, update/2]). -ifdef(TEST). --export([create/2, calculate/3]). +-export([create/2, calculate/3, convert_time_unit/3]). -else. --compile({inline, [create/2, calculate/3, convert_native_to_ms/1]}). +-compile({inline, [create/2, calculate/3, convert_time_unit/3, timediff_in_units/3, + unbounded_available_tokens/3, exactly_available_tokens/2, final_state/4]}). -endif. -include("opuntia.hrl"). @@ -24,7 +25,10 @@ %% Number of milliseconds that is advise to wait after a shaping update. -type rate() :: non_neg_integer(). -%% Number of tokens accepted per millisecond. +%% Number of tokens accepted per `t:time_unit()'. + +-type time_unit() :: second | millisecond | microsecond | nanosecond | native. +%% Supported shaping time units. -type bucket_size() :: non_neg_integer(). %% Maximum capacity of the bucket regardless of how much time passes. @@ -34,34 +38,41 @@ -type config() :: 0 | #{bucket_size := bucket_size(), rate := rate(), + time_unit := time_unit(), start_full := boolean()}. --type shape() :: {bucket_size(), rate()}. +-type shape() :: {bucket_size(), rate(), time_unit()}. %% See `new/1' for more details. -type shaper() :: none | #token_bucket_shaper{}. %% Shaper type --export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, delay/0]). +-export_type([shaper/0, shape/0, tokens/0, bucket_size/0, rate/0, time_unit/0, delay/0]). --define(NON_NEG_INT(N), (is_integer(N) andalso N > 0)). +-define(POS_INTEGER(N), (is_integer(N) andalso N > 0)). 
+-define(TU(T), (second =:= T orelse + millisecond =:= T orelse + microsecond =:= T orelse + nanosecond =:= T orelse + native =:= T)). %% @doc Creates a new shaper according to the configuration. %% %% If zero is given, no shaper in created and any update action will always return zero delay; -%% to configure a shaper it will need a +%% to configure a shaper it will need a config like %% ``` -%% #{bucket_size => MaximumTokens, rate => Rate, start_full => Boolean}, +%% #{bucket_size => MaximumTokens, rate => Rate, time_unit => TimeUnit, start_full => Boolean} %% ''' %% where %% %% %% So, for example, if we configure a shaper with the following: %% ``` -%% #{bucket_size => 60000, rate => 10, start_full => true} +%% #{bucket_size => 60000, rate => 10, time_unit => millisecond, start_full => true} %% ''' %% it means that the bucket will %% allow `10' tokens per `millisecond', up to 60000 tokens, regardless of how long it is left @@ -89,17 +100,19 @@ create(0, _) -> none; create(#{bucket_size := MaximumTokens, rate := Rate, + time_unit := TimeUnit, start_full := StartFull}, NativeNow) - when ?NON_NEG_INT(MaximumTokens), - ?NON_NEG_INT(Rate), + when ?POS_INTEGER(MaximumTokens), + ?POS_INTEGER(Rate), MaximumTokens >= Rate, + ?TU(TimeUnit), is_boolean(StartFull) -> AvailableAtStart = case StartFull of true -> MaximumTokens; false -> 0 end, - #token_bucket_shaper{shape = {MaximumTokens, Rate}, + #token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit}, available_tokens = AvailableAtStart, last_update = NativeNow, debt = 0.0}. 
@@ -109,21 +122,16 @@ calculate(none, _, _) -> {none, 0}; calculate(Shaper, 0, _) -> {Shaper, 0}; -calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate}, +calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit}, available_tokens = LastAvailableTokens, last_update = NativeLastUpdate, - debt = LastDebt} = Shaper, TokensNowUsed, NativeNow) -> - NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate, + debt = OverPenalisedInUnitsLastTime} = Shaper, TokensNowUsed, NativeNow) -> - %% This is now a float and so will all below be, to preserve best rounding errors possible - TimeSinceLastUpdate = convert_native_to_ms(NativeTimeSinceLastUpdate) + LastDebt, + TimeDiffInUnits = timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow), - %% How much we might have recovered since last time - AvailableAtGrowthRate = Rate * TimeSinceLastUpdate, - UnboundedTokenGrowth = LastAvailableTokens + AvailableAtGrowthRate, + UnboundedTokens = unbounded_available_tokens(Rate, LastAvailableTokens, TimeDiffInUnits), - %% Real recovery cannot grow higher than the actual rate in the window frame - ExactlyAvailableNow = min(MaximumTokens, UnboundedTokenGrowth), + ExactlyAvailableNow = exactly_available_tokens(MaximumTokens, UnboundedTokens), %% How many are available after using TokensNowUsed can't be smaller than zero TokensAvailable = max(0, ExactlyAvailableNow - TokensNowUsed), @@ -132,27 +140,81 @@ calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate}, TokensOverused = max(0, TokensNowUsed - ExactlyAvailableNow), %% And then MaybeDelay will be zero if TokensOverused was zero - MaybeDelayMs = TokensOverused / Rate, + OverUsedRateNow = TokensOverused / Rate, + + NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable}, + Punish = OverUsedRateNow - OverPenalisedInUnitsLastTime, + final_state(NewShaper, TimeUnit, Punish, NativeNow). + +-spec timediff_in_units(time_unit(), integer(), integer()) -> float(). 
+timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow) -> + %% Time difference between now and the last update, in native + NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate, + %% This is now a float and so will all below be, to preserve best rounding errors possible + convert_time_unit(NativeTimeSinceLastUpdate, native, TimeUnit). - %% We penalise rounding up, the most important contract is that rate will never exceed that - %% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but - %% don't promise at what time in the future they would arrive, nor we promise any upper bound - %% to the limits of the shaper delay. - RoundedDelayMs = ceil(MaybeDelayMs), +%% Unbounded growth is calculated, with float precision, in the configured time units +%% +%% Note that it can be negative, if earlier we have penalised giving a larger `last_update' and now we +%% update even before we have reach the point in time where the previous `last_update' was set +%% +%% If the growth was negative, that means that it has grown the debt instead +-spec unbounded_available_tokens(rate(), tokens(), float()) -> float(). +unbounded_available_tokens(Rate, LastAvailableTokens, TimeDiffInUnits) -> + %% How much we might have recovered since last time + AvailableAtGrowthRate = Rate * TimeDiffInUnits, + %% Unbounded growth at rate since the last update + LastAvailableTokens + AvailableAtGrowthRate. - NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable, - last_update = NativeNow + RoundedDelayMs, - debt = RoundedDelayMs - MaybeDelayMs}, - {NewShaper, RoundedDelayMs}. +%% This is the real growth considering the maximum bucket size. +-spec exactly_available_tokens(bucket_size(), float()) -> float(). 
+exactly_available_tokens(MaximumTokens, UnboundedTokens) -> + %% Real recovery cannot grow higher than the actual rate in the window frame + ExactlyAvailableNow0 = min(MaximumTokens, UnboundedTokens), + %% But it can't be negative either which can happen if we were already in debt, + %% but this is a debt we will pay when we calculate the final punishment in final_state + max(+0.0, ExactlyAvailableNow0). + +%% We penalise rounding up, the most important contract is that rate will never exceed that +%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but +%% don't promise at what time in the future they would arrive, nor we promise any upper bound +%% to the limits of the shaper delay. +%% +%% Two cases, either: +%% Punish is positive: even after paying the old debt you incur a new debt again +%% Punish is negative: I overpenalised you last time, you get off now but with a future bill +final_state(Shaper, TimeUnit, Punish, NativeNow) when Punish >= +0.0 -> + DelayMs = convert_time_unit(Punish, TimeUnit, millisecond), + RoundedDelayMs = ceil(DelayMs), + Debt = RoundedDelayMs - DelayMs, + DebtInUnits = convert_time_unit(Debt, millisecond, TimeUnit), + %% RoundedDelayMs is in milliseconds, so convert from millisecond, not TimeUnit + DelayNative = convert_time_unit(RoundedDelayMs, millisecond, native), + RoundedDelayNative = ceil(DelayNative), + NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow + RoundedDelayNative, + debt = DebtInUnits}, + {NewShaper, RoundedDelayMs}; +final_state(Shaper, _TimeUnit, Punish, NativeNow) when Punish < +0.0 -> + %% Punish is already expressed in the configured time unit, no conversion needed + DebtInUnits = -Punish, + NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow, + debt = DebtInUnits}, + {NewShaper, 0}. %% Avoid rounding errors by using floats and float division, -%% erlang:convert_native_to_ms works only with integers --spec convert_native_to_ms(number()) -> float(). -convert_native_to_ms(Time) -> - time_unit_multiplier(millisecond) * Time / time_unit_multiplier(native). 
+%% erlang:convert_time_unit works only with integers +-spec convert_time_unit(number(), time_unit(), time_unit()) -> float(). +convert_time_unit(Time, SameUnit, SameUnit) -> Time; +convert_time_unit(Time, FromUnit, ToUnit) -> + time_unit_multiplier(ToUnit) * Time / time_unit_multiplier(FromUnit). -compile({inline, [time_unit_multiplier/1]}). +-spec time_unit_multiplier(time_unit()) -> pos_integer(). time_unit_multiplier(native) -> erts_internal:time_unit(); +time_unit_multiplier(nanosecond) -> + 1000*1000*1000; +time_unit_multiplier(microsecond) -> + 1000*1000; time_unit_multiplier(millisecond) -> - 1000. + 1000; +time_unit_multiplier(second) -> + 1. diff --git a/src/opuntia.hrl b/src/opuntia.hrl index 94f89c3..83fe159 100644 --- a/src/opuntia.hrl +++ b/src/opuntia.hrl @@ -6,7 +6,6 @@ available_tokens :: opuntia:tokens(), last_update :: integer(), debt :: float() %% Always in the range [0.0, 1.0] - %% Signifies the unnecesary number of milliseconds of penalisation }). -endif. diff --git a/test/opuntia_SUITE.erl b/test/opuntia_SUITE.erl index 7ad91df..fe85d69 100644 --- a/test/opuntia_SUITE.erl +++ b/test/opuntia_SUITE.erl @@ -30,7 +30,14 @@ groups() -> run_shaper_without_consuming_does_not_delay, run_basic_shaper_property, run_stateful_server - ]} + ]}, + %% This group is purposefully left out because it is too slow to run on CI, + %% and uses a `timer:tc/4` only available since OTP26 + %% Here for the record, can be enabled locally and checked + {delays, [sequence], + [ + run_with_delays + ]} ]. %%%=================================================================== @@ -71,8 +78,13 @@ keep_table() -> %%%=================================================================== simple_test_no_delay_is_needed(_) -> + Units = [second, millisecond, microsecond, nanosecond, native], + [ simple_test_no_delay_is_needed_for_unit(Unit) || Unit <- Units ]. 
+ +simple_test_no_delay_is_needed_for_unit(Unit) -> FoldFun = fun(N, ShIn) -> {ShOut, 0} = opuntia:update(ShIn, N), ShOut end, - Shaper = opuntia:new(#{bucket_size => 100000, rate => 10000, start_full => true}), + Config = #{bucket_size => 100000, rate => 10000, time_unit => Unit, start_full => true}, + Shaper = opuntia:new(Config), lists:foldl(FoldFun, Shaper, lists:duplicate(10000, 1)). run_shaper_with_zero_does_not_shape(_) -> @@ -93,6 +105,25 @@ run_shaper_without_consuming_does_not_delay(_) -> end), run_prop(?FUNCTION_NAME, Prop, 1000, 2). +run_with_delays(_) -> + S = "ToConsume ~p, Shape ~p, TimeItTook ~p, CalculatedDelay ~p ms, in range [~p, ~p]ms, ~nLastShaper ~p,~nHistory ~p", + Prop = ?FORALL( + {TokensToSpend, Shape}, + {tokens(), shape()}, + begin + Shaper = opuntia:new(Shape), + {TimeItTookUs, {LastShaper, History, CalculatedDelay}} = + timer:tc(fun run_with_sleeps/2, [Shaper, TokensToSpend], native), + TimeItTookMs = opuntia:convert_time_unit(TimeItTookUs, native, millisecond), + {CannotBeFasterThan, CannotBeSlowerThan} = should_take_in_range(Shape, TokensToSpend), + AdjustCannotBeSlowerThan = CannotBeSlowerThan + 10, + Val = value_in_range(TimeItTookMs, CannotBeFasterThan, AdjustCannotBeSlowerThan), + P = [TokensToSpend, Shape, TimeItTookMs, CalculatedDelay, CannotBeFasterThan, + AdjustCannotBeSlowerThan, LastShaper, History], + success_or_log_and_return(Val andalso is_integer(CalculatedDelay), S, P) + end), + run_prop(?FUNCTION_NAME, Prop, 100, 1). 
+ run_basic_shaper_property(_) -> S = "ToConsume ~p, Shape ~p, CalculatedDelay ~p ms, in range [~p, ~p]ms, ~nLastShaper ~p,~nHistory ~p", Prop = ?FORALL( @@ -165,9 +196,10 @@ do_postcondition(State, Server, Key, Tokens, Res) -> TokensNowConsumed = tokens_now_consumed(State, Key, Tokens), {MinimumExpectedMs, _} = should_take_in_range(Shape, TokensNowConsumed), Duration = Now - Start, - ct:pal("For shape ~p, requested ~p, expected ~p and duration ~p~n", - [Shape, Tokens, MinimumExpectedMs, Duration]), - continue =:= Res andalso MinimumExpectedMs =< Duration. + S = "For shape ~p, consumed ~p, expected-min-time ~B and it took duration ~B~n", + P = [{maps:get(rate, Shape), maps:get(time_unit, Shape)}, Tokens, floor(MinimumExpectedMs), Duration], + Val = continue =:= Res andalso floor(MinimumExpectedMs) =< Duration, + success_or_log_and_return(Val, S, P). next_state(_State, _Result, {call, ?MODULE, reset_shapers, [_Server]}) -> #{}; @@ -201,25 +233,29 @@ key() -> tokens() -> integer(1, 99999). +time_unit() -> + oneof([second, millisecond, microsecond, nanosecond, native]). + config() -> union([shape_for_server(), function(0, shape_for_server())]). shape_for_server() -> - ShapeGen = {integer(1, 9999), integer(1, 9999)}, - ?LET({M, N}, ShapeGen, - begin - #{bucket_size => max(M, N), - rate => min(M, N), - start_full => true} - end). + %% server is slower and proper struggles with bigger numbers, not critical + ShapeGen = {integer(1, 999), integer(1, 999), time_unit(), boolean()}, + let_shape(ShapeGen). shape() -> - ShapeGen = {integer(1, 99999), integer(1, 99999), boolean()}, + Int = integer(1, 99999), + ShapeGen = {Int, Int, time_unit(), boolean()}, + let_shape(ShapeGen). + +let_shape(ShapeGen) -> ?LET(Shape, ShapeGen, begin - {M, N, StartFull} = Shape, + {M, N, TimeUnit, StartFull} = Shape, #{bucket_size => max(M, N), rate => min(M, N), + time_unit => TimeUnit, start_full => StartFull} end). 
@@ -233,34 +269,58 @@ success_or_log_and_return(false, S, P) -> ct:pal(S, P), false. -should_take_in_range(#{rate := Rate, start_full := false}, ToConsume) -> - ExpectedMs = ToConsume / Rate, - {ExpectedMs, ExpectedMs + 1}; +should_take_in_range(#{rate := Rate, time_unit := TimeUnit, start_full := false}, ToConsume) -> + Expected = ToConsume / Rate, + ExpectedMs = opuntia:convert_time_unit(Expected, TimeUnit, millisecond), + {ExpectedMs, ceil(ExpectedMs + 1)}; should_take_in_range(#{bucket_size := MaximumTokens, rate := Rate, + time_unit := TimeUnit, start_full := true}, ToConsume) -> case ToConsume < MaximumTokens of - true -> {0, 0}; + true -> {0, 1}; false -> ToThrottle = ToConsume - MaximumTokens, - ExpectedMs = ToThrottle / Rate, - {ExpectedMs, ExpectedMs + 1} + Expected = ToThrottle / Rate, + ExpectedMs = opuntia:convert_time_unit(Expected, TimeUnit, millisecond), + {ExpectedMs, ceil(ExpectedMs + 1)} end. +run_with_sleeps(Shaper, ToConsume) -> + run_with_sleeps(Shaper, [], 0, ToConsume). + +run_with_sleeps(Shaper, History, AccumulatedDelay, TokensLeft) when TokensLeft =< 0 -> + {Shaper, lists:reverse(History), AccumulatedDelay}; +run_with_sleeps(Shaper, History, AccumulatedDelay, TokensLeft) -> + ConsumeNow = rand:uniform(TokensLeft), + {NewShaper, DelayMs} = opuntia:update(Shaper, ConsumeNow), + timer:sleep(DelayMs), + NewEvent = #{consumed => ConsumeNow, proposed_delay => DelayMs, shaper => Shaper}, + NewHistory = [NewEvent | History], + NewDelay = AccumulatedDelay + DelayMs, + run_with_sleeps(NewShaper, NewHistory, NewDelay, TokensLeft - ConsumeNow). + run_shaper(Shape, ToConsume) -> - Now = erlang:monotonic_time(), - Shaper = opuntia:create(Shape, Now), - run_shaper(Shaper, Now, [], 0, ToConsume). + Shaper = opuntia:create(Shape, 0), + run_shaper(Shaper, [], 0, ToConsume). 
-run_shaper(Shaper, _, History, AccumulatedDelay, 0) -> +run_shaper(Shaper, History, AccumulatedDelay, 0) -> {Shaper, lists:reverse(History), AccumulatedDelay}; -run_shaper(Shaper, Now, History, AccumulatedDelay, TokensLeft) -> +run_shaper(Shaper, History, AccumulatedDelay, TokensLeft) -> %% Uniform distributes in [1, N], and we want [0, N], so we generate [1, N+1] and subtract 1 ConsumeNow = rand:uniform(TokensLeft + 1) - 1, - {NewShaper, DelayMs} = opuntia:calculate(Shaper, ConsumeNow, Now), - true = DelayMs >= 0, - run_shaper(NewShaper, Now, [{ConsumeNow, DelayMs} | History], AccumulatedDelay + DelayMs, TokensLeft - ConsumeNow). + {NewShaper, DelayMs} = opuntia:calculate(Shaper, ConsumeNow, 0), + NewEvent = #{consumed => ConsumeNow, proposed_delay => DelayMs, final_shaper => NewShaper}, + NewHistory = [NewEvent | History], + NewDelay = AccumulatedDelay + DelayMs, + NewToConsume = TokensLeft - ConsumeNow, + case is_integer(DelayMs) andalso DelayMs >= 0 of + true -> + run_shaper(NewShaper, NewHistory, NewDelay, NewToConsume); + false -> + {NewShaper, NewHistory, bad_delay} + end. run_prop(PropName, Property, NumTests, WorkersPerScheduler) -> Opts = [quiet, noshrink, {start_size, 1}, {numtests, NumTests},