Skip to content

Commit

Permalink
feat: Add bracket pipe function for resource management and `concur…
Browse files Browse the repository at this point in the history
…rently` function for parallel stream processing
  • Loading branch information
ThatScalaGuy committed Jan 8, 2025
1 parent 38ad631 commit 6ebbcf0
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog
All notable changes to this project will be documented in this file.

## [1.1.0-beta.1] - next
- Add `bracket` pipe function


## [1.0.0] - 7. Jan 2024

### Added
Expand Down
3 changes: 2 additions & 1 deletion gleam.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name = "gs"
version = "1.0.0"
version = "1.1.0-beta.1"

# Fill out these fields if you intend to generate HTML documentation or publish
# your project to the Hex package manager.
Expand All @@ -16,6 +16,7 @@ repository = { type = "github", user = "ThatScalaGuy", repo = "gs" }
gleam_stdlib = ">= 0.34.0 and < 2.0.0"
gleam_erlang = ">= 0.33.0 and < 1.0.0"
gleam_otp = ">= 0.16.0 and < 1.0.0"
glisten = ">= 7.0.0 and < 8.0.0"

[dev-dependencies]
gleeunit = ">= 1.0.0 and < 2.0.0"
4 changes: 4 additions & 0 deletions manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ packages = [
{ name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" },
{ name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" },
{ name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" },
{ name = "glisten", version = "7.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_otp", "gleam_stdlib", "logging", "telemetry"], otp_app = "glisten", source = "hex", outer_checksum = "028C0882EAC7ABEDEFBE92CE4D1FEDADE95FA81B1B1AB099C4F91C133BEF2C42" },
{ name = "logging", version = "1.3.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "logging", source = "hex", outer_checksum = "1098FBF10B54B44C2C7FDF0B01C1253CAFACDACABEFB4B0D027803246753E06D" },
{ name = "telemetry", version = "1.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "telemetry", source = "hex", outer_checksum = "7015FC8919DBE63764F4B4B87A95B7C0996BD539E0D499BE6EC9D7F3875B79E6" },
]

[requirements]
gleam_erlang = { version = ">= 0.33.0 and < 1.0.0" }
gleam_otp = { version = ">= 0.16.0 and < 1.0.0" }
gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" }
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
glisten = { version = ">= 7.0.0 and < 8.0.0" }
147 changes: 147 additions & 0 deletions src/gs.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,51 @@ pub fn find(stream: Stream(a), pred: fn(a) -> Bool) -> Stream(a) {
stream |> filter(pred) |> take(1)
}

/// Processes nested streams concurrently and discards all results.
///
/// ## Example
/// ```gleam
/// > [[1, 2], [3, 4], [5, 6]]
/// |> from_list
/// |> map(from_list)
/// |> concurrently
/// // Processes all streams in parallel, returns Stream(Nil)
/// ```
///
/// ## Visual Representation
/// ```
/// Input streams: [Stream1]-->[Stream2]-->[Stream3]-->|
/// | | |
/// v v v
/// Parallel processing: [1,2] [3,4] [5,6]
/// ↓ ↓ ↓ ↓ ↓ ↓
/// Nil Nil Nil Nil Nil Nil
/// ```
///
/// ## When to Use
/// - When you need to process multiple streams in parallel
/// - For implementing concurrent operations with side effects
/// - When processing order doesn't matter
/// - For maximizing throughput with independent streams
/// - When implementing parallel processing pipelines
///
/// ## Description
/// The `concurrently` function takes a stream of streams and processes each
/// inner stream concurrently using tasks. Results are discarded, making it
/// suitable for operations where only side effects matter. The function
/// returns a stream that yields Nil for each completed inner stream.
pub fn concurrently(streams: Stream(Stream(a))) -> Stream(Nil) {
Stream(pull: fn() {
case streams.pull() {
Some(#(stream, rest)) -> {
task.async(fn() { stream |> to_nil })
Some(#(Nil, concurrently(rest)))
}
None -> None
}
})
}

/// Drops (skips) the first `n` elements from a stream.
///
/// ## Example
Expand Down Expand Up @@ -1403,6 +1448,108 @@ pub fn zip(left: Stream(a), right: Stream(b)) -> Stream(#(a, b)) {
})
}

/// Creates a stream where each element is paired with a resource that is automatically
/// cleaned up when the stream ends.
///
/// ## Example
/// ```gleam
/// // Example using a file handle as a resource
/// let file_stream =
/// from_range(1, 3)
/// |> bracket(
/// acquire: fn() { open_file("log.txt") },
/// cleanup: fn(file) { close_file(file) }
/// )
/// |> map(fn(pair) {
/// let #(file, number) = pair
/// write_to_file(file, number)
/// number
/// })
/// |> to_list
/// // File is automatically closed after processing
/// ```
///
/// ## Visual Representation
/// ```
/// Input: [1]-------->[2]-------->[3]------->|
/// | | | |
/// acquire() use resource use resource cleanup()
/// | | | |
/// v v v v
/// #(res,1)--->#(res,2)--->#(res,3)---->|
///
/// Resource Lifecycle
/// +------------------------------------------------+
/// | |
/// | acquire -> use -> use -> use -> cleanup |
/// | ↑ ↑ ↑ ↑ ↑ |
/// | | | | | | |
/// +------------------------------------------------+
/// ```
/// Where:
/// - `|` represents the end of the stream
/// - `res` is the acquired resource
/// - Resource is shared across all elements
/// - Cleanup occurs at stream end
///
/// ## When to Use
/// - When working with resources that need proper cleanup (files, connections)
/// - For implementing resource-safe stream processing
/// - When ensuring resource cleanup regardless of stream completion
/// - For managing stateful resources across stream operations
/// - When implementing transactions or session-based processing
/// - For handling connection pools or shared resources
/// - When implementing resource-bound processing pipelines
///
/// ## Description
/// The `bracket` function creates a resource-managed stream by:
/// 1. Acquiring a resource using the provided `acquire` function
/// 2. Pairing each stream element with the resource
/// 3. Ensuring resource cleanup using the `cleanup` function when the stream ends
///
/// This pattern is particularly useful for:
/// - File operations (open/close)
/// - Database transactions (begin/commit/rollback)
/// - Network connections (connect/disconnect)
/// - System resources (allocate/deallocate)
///
/// The function guarantees that:
/// - Resource is acquired exactly once at start
/// - Resource is available for each element
/// - Cleanup occurs exactly once at end
/// - Cleanup happens even if stream is not fully consumed
/// - Resource management is handled automatically
///
/// This provides a safe way to work with resources in a streaming context,
/// ensuring proper cleanup and preventing resource leaks.
pub fn bracket(
stream: Stream(a),
acquire acquire: fn() -> resource,
cleanup cleanup: fn(resource) -> Nil,
) -> Stream(#(resource, a)) {
Stream(pull: fn() {
let resource = acquire()
bracket_loop(stream, resource, cleanup).pull()
})
}

fn bracket_loop(
stream: Stream(a),
resource: resource,
cleanup: fn(resource) -> Nil,
) -> Stream(#(resource, a)) {
Stream(pull: fn() {
case stream.pull() {
Some(#(value, next)) ->
Some(#(#(resource, value), bracket_loop(next, resource, cleanup)))
None -> {
cleanup(resource)
None
}
}
})
}

/// Zips two streams together using a function to combine their elements.
///
/// ## Example
Expand Down
61 changes: 61 additions & 0 deletions test/gs_test.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -604,3 +604,64 @@ pub fn buffer_test() {
|> gs.to_list
|> should.equal([1, 2, 3])
}

pub fn bracket_test() {
// Test counter for tracking resource cleanup
let counter = process.new_subject()

// Basic bracket usage with resource tracking
gs.from_list([1, 2, 3])
|> gs.bracket(
acquire: fn() {
process.send(counter, "acquired")
"resource"
},
cleanup: fn(_resource) {
process.send(counter, "cleaned")
Nil
},
)
|> gs.map(fn(pair) { pair.1 })
|> gs.to_list
|> should.equal([1, 2, 3])

// Verify resource lifecycle (acquire once, cleanup once)
process.receive(counter, 0)
|> should.equal(Ok("acquired"))
process.receive(counter, 0)
|> should.equal(Ok("cleaned"))

// Test with empty stream
gs.from_empty()
|> gs.bracket(
acquire: fn() {
process.send(counter, "acquired")
"resource"
},
cleanup: fn(_resource) {
process.send(counter, "cleaned")
Nil
},
)
|> gs.to_list
|> should.equal([])

// Verify resource cleanup for empty stream
process.receive(counter, 0)
|> should.equal(Ok("acquired"))
process.receive(counter, 0)
|> should.equal(Ok("cleaned"))

// Test with take operation
gs.from_counter(1)
|> gs.bracket(
acquire: fn() {
process.send(counter, "acquired")
"resource"
},
cleanup: fn(_resource) {
process.send(counter, "cleaned")
Nil
},
)
}

0 comments on commit 6ebbcf0

Please sign in to comment.