Skip to content

Commit

Permalink
feat: Add rate_limit_backoff function for exponential backoff stream …
Browse files Browse the repository at this point in the history
…processing
  • Loading branch information
ThatScalaGuy committed Jan 1, 2025
1 parent 8d66547 commit fb1bd1b
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ All notable changes to this project will be documented in this file.
- Initial release candidate 3
- Add `from_state_eval` source function
- Add `rate_limit_linear` pipe function
- Add `rate_limit_backoff` pipe function

### Changed
- None
Expand Down
80 changes: 80 additions & 0 deletions src/gs.gleam
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import gleam/dict.{type Dict}
import gleam/erlang/process.{type Subject}
import gleam/float
import gleam/int
import gleam/io
import gleam/list
import gleam/option.{type Option, None, Some}
Expand Down Expand Up @@ -1811,6 +1813,84 @@ pub fn flatten(stream: Stream(Stream(a))) -> Stream(a) {
})
}

/// Implements exponential backoff rate limiting for a stream by dynamically adjusting
/// delays between elements based on processing history.
///
/// ## Example
/// ```gleam
/// > from_range(1, 10)
/// |> rate_limit_backoff(
/// initial_delay_ms: 100, // Start with 100ms delay
/// max_delay_ms: 1000, // Cap at 1 second delay
/// factor: 2.0 // Double delay on each retry
/// )
/// |> to_list()
/// // Returns [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] with increasing delays
/// ```
///
/// ## Visual Representation
/// ```
/// Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->...
/// | | | | | |
/// 100ms 200ms 400ms 800ms 1s 1s (exponential delays)
/// | | | | | |
/// Output: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->...
/// ```
///
/// ## When to Use
/// - When implementing retry logic with increasing delays
/// - For handling rate-limited APIs with automatic backoff
/// - When implementing fault-tolerant stream processing
/// - For graceful degradation under load
/// - When implementing adaptive rate limiting
/// - For implementing congestion control mechanisms
///
/// ## Description
/// The `rate_limit_backoff` function creates a rate-limited stream that automatically
/// adjusts delays between elements using exponential backoff. Unlike linear rate
/// limiting, this approach increases delays exponentially up to a maximum value,
/// making it suitable for scenarios where adaptive rate limiting is needed.
///
/// Parameters:
/// - initial_delay_ms: Starting delay between elements
/// - max_delay_ms: Maximum delay cap
/// - factor: Multiplication factor for delay increase
///
/// The function ensures that:
/// - Delays start at initial_delay_ms
/// - Each delay is multiplied by factor
/// - Delays are capped at max_delay_ms
///
/// This is particularly useful for:
/// - API rate limit compliance
/// - Retry mechanisms
/// - Load balancing
/// - Error recovery
/// - System protection
pub fn rate_limit_backoff(
stream: Stream(a),
initial_delay_ms: Int,
max_delay_ms: Int,
factor: Float,
) -> Stream(a) {
let backoff_stream =
from_state_eval(initial_delay_ms, fn(current_delay) {
let next_delay =
int.min(
max_delay_ms,
float.truncate(float.floor(int.to_float(current_delay) *. factor)),
)
#(current_delay, next_delay)
})

stream
|> zip_with(backoff_stream, fn(value, delay) {
process.sleep(delay)
value
})
}

/// TODO: rate_limit
/// Implements linear rate limiting for a stream by controlling the number of elements processed per time interval.
///
/// ## Example
Expand Down

0 comments on commit fb1bd1b

Please sign in to comment.