Skip to content

Commit

Permalink
feat: Add count function for tracking elements in stream processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ThatScalaGuy committed Jan 3, 2025
1 parent 904c690 commit 522dcb4
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
- Add `from_state_eval` source function
- Add `rate_limit_linear` pipe function
- Add `rate_limit_backoff` pipe function
= Add `count` pipe function

### Changed
- None
Expand Down
56 changes: 56 additions & 0 deletions src/gs.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,62 @@ pub fn sleep(stream: Stream(a), delay_ms: Int) -> Stream(a) {
})
}

/// Counts elements as they pass through the stream, yielding tuples of elements and their count.
///
/// ## Example
/// ```gleam
/// > from_range(1, 3)
/// |> count()
/// |> to_list()
/// [#(1, 1), #(2, 2), #(3, 3)]
/// ```
///
/// ## Visual Representation
/// ```
/// Input: [1]---->[2]---->[3]-->|
/// | | |
/// count=1 count=2 count=3
/// | | |
/// v v v
/// Output: [(1,1)]->[(2,2)]->[(3,3)]-->|
/// ```
/// Where:
/// - `|` represents the end of the stream
/// - Each element is paired with its count
/// - Count increases monotonically
///
/// ## When to Use
/// - When you need to track how many elements have been processed
/// - For implementing progress tracking in stream processing
/// - When you need element indices in a stream
/// - For debugging or monitoring stream flow
/// - When implementing stateful stream operations
/// - For adding sequence numbers to stream elements
///
/// ## Description
/// The `count` function transforms a stream by pairing each element with a
/// running count of how many elements have been processed. It maintains an
/// internal counter that increments for each element, starting from 1.
///
/// Key characteristics:
/// - Preserves original elements
/// - Adds monotonic counting
/// - Processes elements lazily
/// - Count starts at 1
/// - Returns tuples of (element, count)
pub fn count(stream: Stream(a)) -> Stream(#(a, Int)) {
count_loop(stream, 1)
}

fn count_loop(stream: Stream(a), counter: Int) -> Stream(#(a, Int)) {
Stream(pull: fn() {
case stream.pull() {
Some(#(value, next)) -> Some(#(#(value, counter), count_loop(next, counter + 1)))
None -> None
}
})
}

/// Flattens a stream of streams into a single stream.
///
/// ## Example
Expand Down
26 changes: 26 additions & 0 deletions test/gs_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,29 @@ pub fn rate_limit_linear_test() {
|> gs.to_list
|> should.equal([1, 2])
}
pub fn count_test() {
// Basic counting
gs.from_list([1, 2, 3])
|> gs.count()
|> gs.to_list
|> should.equal([#(1, 1), #(2, 2), #(3, 3)])

// Empty stream
gs.from_empty()
|> gs.count()
|> gs.to_list
|> should.equal([])

// Single element
gs.from_pure(42)
|> gs.count()
|> gs.to_list
|> should.equal([#(42, 1)])

// Test with take
gs.from_counter(1)
|> gs.count()
|> gs.take(3)
|> gs.to_list
|> should.equal([#(1, 1), #(2, 2), #(3, 3)])
}

0 comments on commit 522dcb4

Please sign in to comment.