Skip to content

Commit

Permalink
Rename to iterableWritableStream from PushAsyncIterableIterator (#23
Browse files Browse the repository at this point in the history
)

* Rename to `iterableWritableStream` from `PushAsyncIterableIterator`

* Verbiage
  • Loading branch information
compulim authored Jun 15, 2024
1 parent f54555b commit e1f1fd3
Show file tree
Hide file tree
Showing 13 changed files with 335 additions and 256 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Added typed `Observable` from `core-js-pure`, in PR [#8](https://github.com/compulim/iter-fest/pull/8)
- Added `observableFromAsync` to convert `AsyncIterable` into `Observable` in PR [#9](https://github.com/compulim/iter-fest/pull/9)
- Added `PushAsyncIterableIterator` in PR [#11](https://github.com/compulim/iter-fest/pull/11)
- Added `IterableWritableStream` in PR [#11](https://github.com/compulim/iter-fest/pull/11) and [#23](https://github.com/compulim/iter-fest/pull/23)
- Added `readerValues` in PR [#12](https://github.com/compulim/iter-fest/pull/12) and [#14](https://github.com/compulim/iter-fest/pull/14)
- Added `observableSubscribeAsReadable` in PR [#13](https://github.com/compulim/iter-fest/pull/13)
- Added `readableStreamFrom` in PR [#15](https://github.com/compulim/iter-fest/pull/15) and [#22](https://github.com/compulim/iter-fest/pull/22)
Expand Down
20 changes: 12 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,18 @@ Note: `readableStreamFrom()` will call `[Symbol.iterator]()` initially to restar

### Producer-consumer queue

`PushAsyncIterableIterator` is a simple push-based producer-consumer queue and designed to decouple the flow between a producer and consumer. The producer can push a new job at anytime. The consumer can pull a job at its own convenience via for-loop.
`IterableWritableStream` is a push-based producer-consumer queue designed to decouple the flow between a producer and multiple consumers. The producer can push a new job at anytime. The consumer can pull a job at its own convenience via for-loop.

Compare to pull-based queue, a push-based queue is very easy to use. However, pull-based queue offers better flow control as it will produce a job only if there is a consumer ready to consume.
`IterableWritableStream` supports multiple consumers and continuation:

For a full-featured producer-consumer queue that supports flow control, use `ReadableStream` instead.
- Multiple consumers: when 2 or more consumers are active at the same time, jobs will be distributed across all consumers in a round robin fashion when possible
- Continuation: when the last consumer disconnected while producer keep pushing new jobs, the next consumer will pick up where the last consumer left

Compare to pull-based queue, a push-based queue is easy to use. However, pull-based queue offers better flow control as it will produce a job only if there is a consumer ready to consume.

```ts
const iterable = new PushAsyncIterableIterator();
const iterable = new IterableWritableStream();
const writer = iterable.getWriter();

(async function consumer() {
for await (const value of iterable) {
Expand All @@ -173,10 +177,10 @@ const iterable = new PushAsyncIterableIterator();
})();

(async function producer() {
iterable.push(1);
iterable.push(2);
iterable.push(3);
iterable.close();
writer.write(1);
writer.write(2);
writer.write(3);
writer.close();
})();

// Prints "1", "2", "3", "Done".
Expand Down
75 changes: 38 additions & 37 deletions packages/integration-test/importDefault.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import withResolvers from 'core-js-pure/full/promise/with-resolvers';
import {
IterableWritableStream,
Observable,
PushAsyncIterableIterator,
SymbolObservable,
asyncGeneratorWithLastValue,
asyncIteratorToAsyncIterable,
Expand Down Expand Up @@ -141,6 +141,43 @@ test('iterableToSpliced should work', () =>

test('iterableToString should work', () => expect(iterableToString([1, 2, 3])).toBe('1,2,3'));

test('IterableWritableStream should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new IterableWritableStream();
const writer = iterable.getWriter();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

writer.write(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

writer.write(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

writer.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('iteratorToIterable should work', () =>
expect(
Array.from(
Expand Down Expand Up @@ -209,42 +246,6 @@ test('observableSubscribeAsReadable should work', async () => {
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new PushAsyncIterableIterator();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

iterable.push(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

iterable.push(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

iterable.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('readableStreamFrom should work', async () => {
const iterable = [1, 2, 3].values();

Expand Down
75 changes: 38 additions & 37 deletions packages/integration-test/importNamed.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import { iterableSlice } from 'iter-fest/iterableSlice';
import { iterableSome } from 'iter-fest/iterableSome';
import { iterableToSpliced } from 'iter-fest/iterableToSpliced';
import { iterableToString } from 'iter-fest/iterableToString';
import { IterableWritableStream } from 'iter-fest/iterableWritableStream';
import { iteratorToIterable } from 'iter-fest/iteratorToIterable';
import { Observable } from 'iter-fest/observable';
import { observableFromAsync } from 'iter-fest/observableFromAsync';
import { observableSubscribeAsReadable } from 'iter-fest/observableSubscribeAsReadable';
import { PushAsyncIterableIterator } from 'iter-fest/pushAsyncIterableIterator';
import { readableStreamFrom } from 'iter-fest/readableStreamFrom';
import { readerValues } from 'iter-fest/readerValues';
import { SymbolObservable } from 'iter-fest/symbolObservable';
Expand Down Expand Up @@ -139,6 +139,43 @@ test('iterableToSpliced should work', () =>

test('iterableToString should work', () => expect(iterableToString([1, 2, 3])).toBe('1,2,3'));

test('IterableWritableStream should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new IterableWritableStream();
const writer = iterable.getWriter();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

writer.write(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

writer.write(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

writer.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('iteratorToIterable should work', () =>
expect(
Array.from(
Expand Down Expand Up @@ -207,42 +244,6 @@ test('observableSubscribeAsReadable should work', async () => {
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new PushAsyncIterableIterator();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

iterable.push(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

iterable.push(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

iterable.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('readableStreamFrom should work', async () => {
const iterable = [1, 2, 3].values();

Expand Down
75 changes: 38 additions & 37 deletions packages/integration-test/requireNamed.test.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ const { iterableSlice } = require('iter-fest/iterableSlice');
const { iterableSome } = require('iter-fest/iterableSome');
const { iterableToSpliced } = require('iter-fest/iterableToSpliced');
const { iterableToString } = require('iter-fest/iterableToString');
const { IterableWritableStream } = require('iter-fest/iterableWritableStream');
const { iteratorToIterable } = require('iter-fest/iteratorToIterable');
const { Observable } = require('iter-fest/observable');
const { observableFromAsync } = require('iter-fest/observableFromAsync');
const { observableSubscribeAsReadable } = require('iter-fest/observableSubscribeAsReadable');
const { PushAsyncIterableIterator } = require('iter-fest/pushAsyncIterableIterator');
const { readableStreamFrom } = require('iter-fest/readableStreamFrom');
const { readerValues } = require('iter-fest/readerValues');
const { SymbolObservable } = require('iter-fest/symbolObservable');
Expand Down Expand Up @@ -140,6 +140,43 @@ test('iterableToSpliced should work', () =>

test('iterableToString should work', () => expect(iterableToString([1, 2, 3])).toBe('1,2,3'));

test('IterableWritableStream should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new IterableWritableStream();
const writer = iterable.getWriter();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

writer.write(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

writer.write(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

writer.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('iteratorToIterable should work', () =>
expect(
Array.from(
Expand Down Expand Up @@ -208,42 +245,6 @@ test('observableSubscribeAsReadable should work', async () => {
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new PushAsyncIterableIterator();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

iterable.push(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

iterable.push(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

iterable.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('readableStreamFrom should work', async () => {
const iterable = [1, 2, 3].values();

Expand Down
Loading

0 comments on commit e1f1fd3

Please sign in to comment.