Skip to content

Commit

Permalink
feat: Add example project with parallel processing and update .gitign…
Browse files Browse the repository at this point in the history
…ore files
  • Loading branch information
ThatScalaGuy committed Jan 13, 2025
1 parent 84a525e commit c271ccb
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 42 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
*.ez
/build
erl_crash.dump
INTERNAL.md
INTERNAL.md
.vscode
4 changes: 2 additions & 2 deletions examples/05-pi/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
packages = [
{ name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" },
{ name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" },
{ name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" },
{ name = "gleam_stdlib", version = "0.52.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "50703862DF26453B277688FFCDBE9DD4AC45B3BD9742C0B370DB62BC1629A07D" },
{ name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" },
{ name = "gs", version = "0.0.9", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_otp", "gleam_stdlib"], source = "local", path = "../.." },
{ name = "gs", version = "1.1.0-beta.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_otp", "gleam_stdlib"], source = "local", path = "../.." },
]

[requirements]
Expand Down
4 changes: 4 additions & 0 deletions examples/06-par/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*.beam
*.ez
/build
erl_crash.dump
10 changes: 10 additions & 0 deletions examples/06-par/gleam.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
name = "example"
version = "1.0.0"
description = "Gleam Stream wxample"

[dependencies]
gleam_stdlib = ">= 0.34.0 and < 2.0.0"
gs = { path = "../.." }

[dev-dependencies]
gleeunit = ">= 1.0.0 and < 2.0.0"
15 changes: 15 additions & 0 deletions examples/06-par/manifest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# This file was generated by Gleam
# You typically do not need to edit this file

packages = [
{ name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" },
{ name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" },
{ name = "gleam_stdlib", version = "0.52.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "50703862DF26453B277688FFCDBE9DD4AC45B3BD9742C0B370DB62BC1629A07D" },
{ name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" },
{ name = "gs", version = "1.1.0-beta.1", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_otp", "gleam_stdlib"], source = "local", path = "../.." },
]

[requirements]
gleam_stdlib = { version = ">= 0.34.0 and < 2.0.0" }
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
gs = { path = "../.." }
63 changes: 63 additions & 0 deletions examples/06-par/src/example.gleam
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import gleam/erlang
import gleam/erlang/process
import gleam/float
import gleam/int
import gleam/io
import gs
import gs/par

pub fn measure(label: String, f: fn() -> a) -> a {
let start = erlang.system_time(erlang.Millisecond)
let result = f()
let end = erlang.system_time(erlang.Millisecond)
let duration = end - start

io.println(label <> " took " <> int.to_string(duration) <> "ms")

result
}

fn new_point() -> #(Float, Float) {
#(float.random(), float.random())
}

fn is_inside_circle(point: #(Float, Float)) -> Bool {
let #(x, y) = point
x *. x +. y *. y <=. 1.0
}

pub fn main() {
let num = 10

measure("Parallel", fn() {
let a =
gs.from_counter(1)
|> par.par_map(6, fn(num) { #(num, new_point()) })
|> gs.filter(fn(num_point) { is_inside_circle(num_point.1) })
|> gs.count()
|> gs.map(fn(e) { #(e.1, e.0.0) })
|> par.par_map(6, fn(element) {
4.0 *. int.to_float(element.0) /. int.to_float(element.1)
})
|> gs.take(num)
|> gs.to_last

io.debug(a)
})

measure("Sequential", fn() {
let b =
gs.from_counter(1)
|> gs.map(fn(num) { #(num, new_point()) })
|> gs.filter(fn(num_point) { is_inside_circle(num_point.1) })
|> gs.count()
|> gs.map(fn(e) { #(e.1, e.0.0) })
|> gs.map(fn(element) {
4.0 *. int.to_float(element.0) /. int.to_float(element.1)
})
|> gs.take(num)
|> gs.to_last

io.debug(b)
})
}
2 changes: 1 addition & 1 deletion manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
packages = [
{ name = "gleam_erlang", version = "0.33.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "A1D26B80F01901B59AABEE3475DD4C18D27D58FA5C897D922FCB9B099749C064" },
{ name = "gleam_otp", version = "0.16.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "FA0EB761339749B4E82D63016C6A18C4E6662DA05BAB6F1346F9AF2E679E301A" },
{ name = "gleam_stdlib", version = "0.51.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "14AFA8D3DDD7045203D422715DBB822D1725992A31DF35A08D97389014B74B68" },
{ name = "gleam_stdlib", version = "0.52.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "50703862DF26453B277688FFCDBE9DD4AC45B3BD9742C0B370DB62BC1629A07D" },
{ name = "gleeunit", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "F7A7228925D3EE7D0813C922E062BFD6D7E9310F0BEE585D3A42F3307E3CFD13" },
]

Expand Down
83 changes: 64 additions & 19 deletions src/gs.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -1772,13 +1772,8 @@ pub fn zip_with(
pub fn buffer(stream: Stream(a), size: Int, mode: OverflowStrategy) -> Stream(a) {
Stream(pull: fn() {
let assert Ok(pid) = queue.start(size)
case stream.pull() {
Some(#(value, next)) -> {
task.async(fn() { fill_buffer(next, mode, pid) |> to_nil() })
Some(#(value, read_buffer(pid)))
}
None -> None
}
task.async(fn() { fill_buffer(stream, mode, pid) |> to_nil() })
read_buffer(pid).pull()
})
}

Expand All @@ -1804,7 +1799,10 @@ fn fill_buffer(
True -> fill_buffer(next, mode, pid).pull()
False ->
case mode {
Wait -> fill_buffer(stream, mode, pid).pull()
Wait -> {
let current = Stream(pull: fn() { Some(#(value, next)) })
fill_buffer(current, mode, pid).pull()
}
Drop -> fill_buffer(next, mode, pid).pull()
Stop -> None
Panic -> panic as "Buffer overflow"
Expand Down Expand Up @@ -2858,15 +2856,14 @@ pub fn to_option(stream: Stream(a)) -> Option(a) {
///
/// ## Example
/// ```gleam
/// > let #(subject, task) =
/// > let subject = process.new_subject()
/// > from_range(1, 3)
/// > |> to_subject()
/// > |> to_subject(subject)
/// >
/// > process.receive(subject, 100) // -> Ok(Some(1))
/// > process.receive(subject, 100) // -> Ok(Some(2))
/// > process.receive(subject, 100) // -> Ok(Some(3))
/// > process.receive(subject, 100) // -> Ok(None)
/// > task.await(task)
/// ```
///
/// ## Visual Representation
Expand Down Expand Up @@ -2914,14 +2911,9 @@ pub fn to_option(stream: Stream(a)) -> Option(a) {
/// The returned task ensures proper cleanup and should be awaited when the
/// stream processing is complete. Recipients can receive values from the
/// subject using `process.receive` or similar functions.
pub fn to_subject(stream: Stream(a)) -> #(Subject(Option(a)), task.Task(Nil)) {
let subject = process.new_subject()
let task =
task.async(fn() {
to_fold(stream, Nil, fn(_, x) { process.send(subject, Some(x)) })
process.send(subject, None)
})
#(subject, task)
pub fn to_subject(stream: Stream(a), subject: Subject(Option(a))) -> Nil {
to_fold(stream, Nil, fn(_, x) { process.send(subject, Some(x)) })
process.send(subject, None)
}

/// Processes a stream of Options until encountering None, discarding all values and returning Nil.
Expand Down Expand Up @@ -3032,3 +3024,56 @@ pub fn to_nil_none_terminated(stream: Stream(Option(a))) -> Nil {
pub fn to_nil_error_terminated(stream: Stream(Result(a, e))) -> Nil {
stream |> error_terminated |> to_nil
}

/// Returns the last element of a stream as an Option.
///
/// ## Example
/// ```gleam
/// > from_range(1, 5)
/// |> to_last()
/// Some(5)
///
/// > from_empty()
/// |> to_last()
/// None
/// ```
///
/// ## Visual Representation
/// ```
/// Input: [1]-->[2]-->[3]-->[4]-->[5]-->|
/// | | | | |
/// keep keep keep keep keep
/// ↓ ↓ ↓ ↓ ↓
/// Last: Some(1)->Some(2)->Some(3)->Some(4)->Some(5)
/// ^
/// Result
/// ```
/// Where:
/// - `|` represents the end of the stream
/// - Each element replaces the previous in Last
/// - Only the final element is returned
///
/// ## When to Use
/// - When you only need the final element of a stream
/// - For finding the latest value in a sequence
/// - When implementing reductions that only care about final state
/// - For getting the most recent element in time-series data
/// - When you need to know the terminal value of a sequence
/// - For implementing "latest value" semantics
///
/// ## Description
/// The `to_last` function is a terminal operation that processes an entire stream
/// and returns an Option containing the last element encountered. For empty
/// streams, it returns None. The function maintains only the most recently seen
/// value in memory, making it memory efficient regardless of stream length.
///
/// Key characteristics:
/// - Returns Some(value) with the last element for non-empty streams
/// - Returns None for empty streams
/// - Processes all elements sequentially
/// - Memory efficient (only stores one element)
/// - Terminal operation (ends stream processing)
/// - Suitable for both finite and infinite streams (with proper termination)
pub fn to_last(stream: Stream(a)) -> Option(a) {
to_fold(stream, None, fn(_, x) { Some(x) })
}
39 changes: 39 additions & 0 deletions src/gs/internal/par_map_actor.gleam
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import gleam/erlang/process.{type Subject}
import gleam/option.{type Option, None, Some}
import gleam/otp/actor

pub type Message(a, b) {
Dispatch(Subject(Bool), a)
GetResult(Subject(b))
}

type State(b) {
State(result: Option(b))
}

pub fn start(f: fn(a) -> b) {
actor.start(State(result: None), handle_message(f))
}

fn handle_message(f: fn(a) -> b) {
fn(msg: Message(a, b), state: State(b)) -> actor.Next(Message(a, b), State(b)) {
case msg {
Dispatch(reply_with, item) -> {
process.send(reply_with, True)
actor.continue(State(result: Some(f(item))))
}

GetResult(reply_with) -> {
case state.result {
Some(result) -> {
process.send(reply_with, result)
actor.Stop(process.Normal)
}
None -> {
actor.continue(state)
}
}
}
}
}
}
21 changes: 15 additions & 6 deletions src/gs/internal/queue.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ pub type Message(a) {
}

type State(a) {
State(items: List(a), max_size: Int)
State(items: List(a), current_size: Int, max_size: Int)
}

pub fn start(max_size: Int) {
actor.start(State(items: [], max_size: max_size), handle_message)
actor.start(
State(items: [], current_size: 0, max_size: max_size),
handle_message,
)
}

fn handle_message(
Expand All @@ -24,11 +27,15 @@ fn handle_message(
) -> actor.Next(Message(a), State(a)) {
case msg {
Push(reply_with, item) -> {
case list.length(state.items) < state.max_size {
case state.current_size < state.max_size {
True -> {
process.send(reply_with, True)
actor.continue(
State(..state, items: list.append(state.items, [item])),
State(
..state,
current_size: state.current_size + 1,
items: list.append(state.items, [item]),
),
)
}
False -> {
Expand All @@ -46,7 +53,9 @@ fn handle_message(
}
[first, ..rest] -> {
process.send(reply_with, Some(first))
actor.continue(State(..state, items: rest))
actor.continue(
State(..state, current_size: state.current_size - 1, items: rest),
)
}
}
}
Expand All @@ -60,7 +69,7 @@ fn handle_message(
}

Size(reply_with) -> {
process.send(reply_with, list.length(state.items))
process.send(reply_with, state.current_size)
actor.continue(state)
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/gs/par.gleam
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import gleam/list
import gleam/otp/task
import gleam/erlang/process
import gs.{type Stream}
import gs/internal/par_map_actor

/// Experimental parallel map operation over a stream.
///
Expand All @@ -23,11 +23,11 @@ pub fn par_map(
with mapper: fn(a) -> b,
) -> Stream(b) {
stream
|> gs.chunks(workers)
|> gs.flat_map(fn(elms) {
elms
|> list.map(fn(elm) { task.async(fn() { mapper(elm) }) })
|> list.map(fn(t) { task.await_forever(t) })
|> gs.from_list
|> gs.map(fn(ele) {
let assert Ok(pid) = par_map_actor.start(mapper)
process.call_forever(pid, fn(s) { par_map_actor.Dispatch(s, ele) })
pid
})
|> gs.buffer(workers, gs.Wait)
|> gs.map(fn(pid) { process.call_forever(pid, par_map_actor.GetResult) })
}
6 changes: 1 addition & 5 deletions test/par_test.gleam
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import gleam/erlang/process
import gleeunit
import gleeunit/should
import gs
Expand All @@ -15,10 +14,7 @@ pub fn par_map_test() {

let result =
stream
|> par.par_map(6, fn(x) {
process.sleep(1000)
x * 2
})
|> par.par_map(3, fn(x) { x * 2 })
|> gs.to_list

result
Expand Down

0 comments on commit c271ccb

Please sign in to comment.