Skip to content

Commit

Permalink
feat: Add rate_limit_linear function for controlled stream processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ThatScalaGuy committed Jan 1, 2025
1 parent c3cfb34 commit 8d66547
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file.
### Added
- Initial release candidate 3
- Add `from_state_eval` source function
- Add `rate_limit_linear` pipe function

### Changed
- None
Expand Down
66 changes: 66 additions & 0 deletions src/gs.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,72 @@ pub fn flatten(stream: Stream(Stream(a))) -> Stream(a) {
})
}

/// Implements linear rate limiting for a stream by controlling the number of elements processed per time interval.
///
/// ## Example
/// ```gleam
/// > from_range(1, 10)
/// |> rate_limit_linear(
/// rate: 2, // 2 elements
/// interval_ms: 1000 // per second
/// )
/// |> to_list()
/// // Returns [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] with constant delays
/// // Processes 2 elements per second
/// ```
///
/// ## Visual Representation
/// ```
/// Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->...
/// | | | | | |
/// 500ms 500ms 500ms 500ms 500ms 500ms (constant delays)
/// | | | | | |
/// Output: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->...
/// |<-1 s.-> | <- 1 s. -> |
/// (2 items) (2 items)
/// ```
///
/// ## When to Use
/// - When implementing consistent rate limiting for API calls
/// - For controlling throughput to external services
/// - When processing needs to match a specific rate requirement
/// - For implementing SLA compliance
/// - When preventing system overload
/// - For simulating time-constrained processing
/// - When implementing quota-based systems
///
/// ## Description
/// The `rate_limit_linear` function creates a rate-limited stream that processes
/// a specified number of elements within a given time interval. Unlike exponential
/// backoff, this maintains a constant rate of processing, making it suitable for
/// scenarios where consistent throughput is required.
///
/// Parameters:
/// - rate: Number of elements to process per interval
/// - interval_ms: Time interval in milliseconds
///
/// The function ensures that:
/// - Elements are processed at a constant rate
/// - Delays are evenly distributed across the interval
/// - Processing matches the specified throughput
/// - Rate limiting is precise and predictable
///
/// This is particularly useful for:
/// - API rate limiting compliance
/// - Resource utilization control
/// - Service level agreement implementation
/// - System load management
/// - Throughput optimization
/// - Quota enforcement
pub fn rate_limit_linear(
stream: Stream(a),
rate: Int,
interval_ms: Int,
) -> Stream(a) {
let delay = interval_ms / rate
from_tick(delay) |> zip(stream) |> map(fn(value) { value.1 })
}

/// Creates a stream that terminates when encountering a None value.
///
/// ## Example
Expand Down
53 changes: 34 additions & 19 deletions test/gs_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -422,44 +422,59 @@ pub fn split_test() {
|> gs.to_list
|> should.equal([])
}

pub fn from_state_eval_test() {
// Basic counter example
gs.from_state_eval(
0,
fn(state) { #(state, state + 1) },
)
gs.from_state_eval(0, fn(state) { #(state, state + 1) })
|> gs.take(3)
|> gs.to_list
|> should.equal([0, 1, 2])

// Fibonacci sequence example
gs.from_state_eval(
#(0, 1),
fn(state) {
let #(current, next) = state
#(current, #(next, current + next))
},
)
gs.from_state_eval(#(0, 1), fn(state) {
let #(current, next) = state
#(current, #(next, current + next))
})
|> gs.take(6)
|> gs.to_list
|> should.equal([0, 1, 1, 2, 3, 5])

// String state example
gs.from_state_eval(
"a",
fn(state) { #(state, state <> "a") },
)
gs.from_state_eval("a", fn(state) { #(state, state <> "a") })
|> gs.take(3)
|> gs.to_list
|> should.equal(["a", "aa", "aaa"])

// Empty stream if never pulled
gs.from_state_eval(
0,
fn(state) { #(state, state + 1) },
)
gs.from_state_eval(0, fn(state) { #(state, state + 1) })
|> gs.take(0)
|> gs.to_list
|> should.equal([])
}

pub fn rate_limit_linear_test() {
// Test basic functionality - should emit 2 elements per second
gs.from_list([1, 2, 3, 4])
|> gs.rate_limit_linear(2, 1000)
|> gs.take(4)
|> gs.to_list
|> should.equal([1, 2, 3, 4])

// Test empty stream
gs.from_empty()
|> gs.rate_limit_linear(2, 1000)
|> gs.to_list
|> should.equal([])

// Test single element
gs.from_pure(1)
|> gs.rate_limit_linear(1, 1000)
|> gs.to_list
|> should.equal([1])

// Test with faster rate (500ms interval)
gs.from_list([1, 2])
|> gs.rate_limit_linear(2, 500)
|> gs.to_list
|> should.equal([1, 2])
}

0 comments on commit 8d66547

Please sign in to comment.