Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/andersea/slurry
Browse files Browse the repository at this point in the history
  • Loading branch information
andersea committed Jan 26, 2024
2 parents cd7f296 + c2efbc2 commit 9d862bb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
7 changes: 5 additions & 2 deletions slurry/sections/_producers.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class InsertValue(TrioSection):
"""Inserts a single user supplied value into the pipeline on startup and then
passes through any further received items unmodified.
If no input is used, the single value will be sent, and InsertValue will close.
:param value: Item to send on startup.
:type value: Any
"""
Expand All @@ -127,5 +129,6 @@ def __init__(self, value: Any) -> None:

async def refine(self, input, output):
await output(self.value)
async for item in input:
await output(item)
if input:
async for item in input:
await output(item)
33 changes: 24 additions & 9 deletions tests/test_producers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import trio

from slurry import Pipeline
from slurry.sections import Repeat, Metronome, InsertValue
from slurry.sections import Repeat, Metronome, InsertValue, _producers

from .fixtures import produce_alphabet

Expand Down Expand Up @@ -51,19 +51,27 @@ async def test_repeat_input(autojump_clock):
break
assert results == [('a', 1), ('a', 2), ('b', 2.5), ('b', 3.5), ('c', 4)]

async def test_metronome():
async def test_metronome(autojump_clock, monkeypatch):
monkeypatch.setattr(_producers, "time", trio.current_time)
async with Pipeline.create(
produce_alphabet(5, max=3),
produce_alphabet(5, max=6, delay=1),
Metronome(5)
) as pipeline, pipeline.tap() as aiter:
results = []
start_time = trio.current_time()
async for item in aiter:
results.append((item, trio.current_time() - start_time))
if len(results) == 2:
break
assert [x[0] for x in results] == ['a', 'b']
assert 5 - results[1][1] + results[0][1] < 0.1
results.append((item, trio.current_time()))
assert results == [(letter, 5.0 * (i + 1)) for i, letter in enumerate("abcde")]

async def test_metronome_no_input(autojump_clock, monkeypatch):
monkeypatch.setattr(_producers, "time", trio.current_time)
async with Pipeline.create(
Metronome(5, "a")
) as pipeline, pipeline.tap() as aiter:
results = []
for _ in range(5):
item = await aiter.__anext__()
results.append((item, trio.current_time()))
assert results == [("a", 5.0 * (i + 1)) for i in range(5)]

async def test_insert_value(autojump_clock):
async with Pipeline.create(
Expand All @@ -73,3 +81,10 @@ async def test_insert_value(autojump_clock):
start_time = trio.current_time()
results = [(v, trio.current_time() - start_time) async for v in aiter]
assert results == [('n', 0), ('a', 1), ('b', 2), ('c', 3)]

async def test_insert_value_no_input(autojump_clock):
async with Pipeline.create(
InsertValue('n')
) as pipeline, pipeline.tap() as aiter:
results = [v async for v in aiter]
assert results == ['n']

0 comments on commit 9d862bb

Please sign in to comment.