From e1f1fd3a5470c741f30cd3f99e3bb8207328812f Mon Sep 17 00:00:00 2001 From: William Wong Date: Sat, 15 Jun 2024 01:25:37 -0700 Subject: [PATCH] Rename to `iterableWritableStream` from `PushAsyncIterableIterator` (#23) * Rename to `iterableWritableStream` from `PushAsyncIterableIterator` * Verbiage --- CHANGELOG.md | 2 +- README.md | 20 ++-- .../integration-test/importDefault.test.ts | 75 ++++++------ packages/integration-test/importNamed.test.ts | 75 ++++++------ .../integration-test/requireNamed.test.cjs | 75 ++++++------ .../integration-test/requiredDefault.test.cjs | 75 ++++++------ packages/iter-fest/package.json | 20 ++-- .../src/PushAsyncIterableIterator.spec.ts | 53 --------- .../src/PushAsyncIterableIterator.ts | 34 ------ packages/iter-fest/src/index.ts | 2 +- .../src/iterableWritableStream.spec.ts | 112 ++++++++++++++++++ .../iter-fest/src/iterableWritableStream.ts | 46 +++++++ packages/iter-fest/tsup.config.ts | 2 +- 13 files changed, 335 insertions(+), 256 deletions(-) delete mode 100644 packages/iter-fest/src/PushAsyncIterableIterator.spec.ts delete mode 100644 packages/iter-fest/src/PushAsyncIterableIterator.ts create mode 100644 packages/iter-fest/src/iterableWritableStream.spec.ts create mode 100644 packages/iter-fest/src/iterableWritableStream.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index b0f3686..4b52815 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added typed `Observable` from `core-js-pure`, in PR [#8](https://github.com/compulim/iter-fest/pull/8) - Added `observableFromAsync` to convert `AsyncIterable` into `Observable` in PR [#9](https://github.com/compulim/iter-fest/pull/9) -- Added `PushAsyncIterableIterator` in PR [#11](https://github.com/compulim/iter-fest/pull/11) +- Added `IterableWritableStream` in PR [#11](https://github.com/compulim/iter-fest/pull/11) and [#23](https://github.com/compulim/iter-fest/pull/23) - Added `readerValues` in PR [#12](https://github.com/compulim/iter-fest/pull/12) and [#14](https://github.com/compulim/iter-fest/pull/14) - Added `observableSubscribeAsReadable` in PR [#13](https://github.com/compulim/iter-fest/pull/13) - Added `readableStreamFrom` in PR [#15](https://github.com/compulim/iter-fest/pull/15) and [#22](https://github.com/compulim/iter-fest/pull/22) diff --git a/README.md b/README.md index ea68071..dd772d3 100644 --- a/README.md +++ b/README.md @@ -155,14 +155,18 @@ Note: `readableStreamFrom()` will call `[Symbol.iterator]()` initially to restar ### Producer-consumer queue -`PushAsyncIterableIterator` is a simple push-based producer-consumer queue and designed to decouple the flow between a producer and consumer. The producer can push a new job at anytime. The consumer can pull a job at its own convenience via for-loop. +`IterableWritableStream` is a push-based producer-consumer queue designed to decouple the flow between a producer and multiple consumers. The producer can push a new job at anytime. The consumer can pull a job at its own convenience via for-loop. -Compare to pull-based queue, a push-based queue is very easy to use. However, pull-based queue offers better flow control as it will produce a job only if there is a consumer ready to consume. +`IterableWritableStream` supports multiple consumers and continuation: -For a full-featured producer-consumer queue that supports flow control, use `ReadableStream` instead. +- Multiple consumers: when 2 or more consumers are active at the same time, jobs will be distributed across all consumers in a round robin fashion when possible +- Continuation: when the last consumer disconnected while producer keep pushing new jobs, the next consumer will pick up where the last consumer left + +Compare to pull-based queue, a push-based queue is easy to use. However, pull-based queue offers better flow control as it will produce a job only if there is a consumer ready to consume. ```ts -const iterable = new PushAsyncIterableIterator(); +const iterable = new IterableWritableStream(); +const writer = iterable.getWriter(); (async function consumer() { for await (const value of iterable) { @@ -173,10 +177,10 @@ const iterable = new PushAsyncIterableIterator(); })(); (async function producer() { - iterable.push(1); - iterable.push(2); - iterable.push(3); - iterable.close(); + writer.write(1); + writer.write(2); + writer.write(3); + writer.close(); })(); // Prints "1", "2", "3", "Done". diff --git a/packages/integration-test/importDefault.test.ts b/packages/integration-test/importDefault.test.ts index 56be0e6..3789f6a 100644 --- a/packages/integration-test/importDefault.test.ts +++ b/packages/integration-test/importDefault.test.ts @@ -1,7 +1,7 @@ import withResolvers from 'core-js-pure/full/promise/with-resolvers'; import { + IterableWritableStream, Observable, - PushAsyncIterableIterator, SymbolObservable, asyncGeneratorWithLastValue, asyncIteratorToAsyncIterable, @@ -141,6 +141,43 @@ test('iterableToSpliced should work', () => test('iterableToString should work', () => expect(iterableToString([1, 2, 3])).toBe('1,2,3')); +test('IterableWritableStream should work', async () => { + let deferred = withResolvers(); + const done = jest.fn(); + const iterable = new IterableWritableStream(); + const writer = iterable.getWriter(); + const values = []; + + (async function () { + for await (const value of iterable) { + values.push(value); + + deferred.resolve(); + deferred = withResolvers(); + } + + done(); + deferred.resolve(); + })(); + + expect(values).toEqual([]); + expect(done).not.toHaveBeenCalled(); + + writer.write(1); + await deferred.promise; + expect(values).toEqual([1]); + expect(done).not.toHaveBeenCalled(); + + writer.write(2); + await deferred.promise; + expect(values).toEqual([1, 2]); + expect(done).not.toHaveBeenCalled(); + + writer.close(); + await deferred.promise; + expect(done).toHaveBeenCalledTimes(1); +}); + test('iteratorToIterable should work', () => expect( Array.from( @@ -209,42 +246,6 @@ test('observableSubscribeAsReadable should work', async () => { await expect(reader.read()).resolves.toEqual({ done: true }); }); -test('PushAsyncIterableIterator should work', async () => { - let deferred = withResolvers(); - const done = jest.fn(); - const iterable = new PushAsyncIterableIterator(); - const values = []; - - (async function () { - for await (const value of iterable) { - values.push(value); - - deferred.resolve(); - deferred = withResolvers(); - } - - done(); - deferred.resolve(); - })(); - - expect(values).toEqual([]); - expect(done).not.toHaveBeenCalled(); - - iterable.push(1); - await deferred.promise; - expect(values).toEqual([1]); - expect(done).not.toHaveBeenCalled(); - - iterable.push(2); - await deferred.promise; - expect(values).toEqual([1, 2]); - expect(done).not.toHaveBeenCalled(); - - iterable.close(); - await deferred.promise; - expect(done).toHaveBeenCalledTimes(1); -}); - test('readableStreamFrom should work', async () => { const iterable = [1, 2, 3].values(); diff --git a/packages/integration-test/importNamed.test.ts b/packages/integration-test/importNamed.test.ts index 6c00a46..879a610 100644 --- a/packages/integration-test/importNamed.test.ts +++ b/packages/integration-test/importNamed.test.ts @@ -22,11 +22,11 @@ import { iterableSlice } from 'iter-fest/iterableSlice'; import { iterableSome } from 'iter-fest/iterableSome'; import { iterableToSpliced } from 'iter-fest/iterableToSpliced'; import { iterableToString } from 'iter-fest/iterableToString'; +import { IterableWritableStream } from 'iter-fest/iterableWritableStream'; import { iteratorToIterable } from 'iter-fest/iteratorToIterable'; import { Observable } from 'iter-fest/observable'; import { observableFromAsync } from 'iter-fest/observableFromAsync'; import { observableSubscribeAsReadable } from 'iter-fest/observableSubscribeAsReadable'; -import { PushAsyncIterableIterator } from 'iter-fest/pushAsyncIterableIterator'; import { readableStreamFrom } from 'iter-fest/readableStreamFrom'; import { readerValues } from 'iter-fest/readerValues'; import { SymbolObservable } from 'iter-fest/symbolObservable'; @@ -139,6 +139,43 @@ test('iterableToSpliced should work', () => test('iterableToString should work', () => expect(iterableToString([1, 2, 3])).toBe('1,2,3')); +test('IterableWritableStream should work', async () => { + let deferred = withResolvers(); + const done = jest.fn(); + const iterable = new IterableWritableStream(); + const writer = iterable.getWriter(); + const values = []; + + (async function () { + for await (const value of iterable) { + values.push(value); + + deferred.resolve(); + deferred = withResolvers(); + } + + done(); + deferred.resolve(); + })(); + + expect(values).toEqual([]); + expect(done).not.toHaveBeenCalled(); + + writer.write(1); + await deferred.promise; + expect(values).toEqual([1]); + expect(done).not.toHaveBeenCalled(); + + writer.write(2); + await deferred.promise; + expect(values).toEqual([1, 2]); + expect(done).not.toHaveBeenCalled(); + + writer.close(); + await deferred.promise; + expect(done).toHaveBeenCalledTimes(1); +}); + test('iteratorToIterable should work', () => expect( Array.from( @@ -207,42 +244,6 @@ test('observableSubscribeAsReadable should work', async () => { await expect(reader.read()).resolves.toEqual({ done: true }); }); -test('PushAsyncIterableIterator should work', async () => { - let deferred = withResolvers(); - const done = jest.fn(); - const iterable = new PushAsyncIterableIterator(); - const values = []; - - (async function () { - for await (const value of iterable) { - values.push(value); - - deferred.resolve(); - deferred = withResolvers(); - } - - done(); - deferred.resolve(); - })(); - - expect(values).toEqual([]); - expect(done).not.toHaveBeenCalled(); - - iterable.push(1); - await deferred.promise; - expect(values).toEqual([1]); - expect(done).not.toHaveBeenCalled(); - - iterable.push(2); - await deferred.promise; - expect(values).toEqual([1, 2]); - expect(done).not.toHaveBeenCalled(); - - iterable.close(); - await deferred.promise; - expect(done).toHaveBeenCalledTimes(1); -}); - test('readableStreamFrom should work', async () => { const iterable = [1, 2, 3].values(); diff --git a/packages/integration-test/requireNamed.test.cjs b/packages/integration-test/requireNamed.test.cjs index 17d8513..9b34e96 100644 --- a/packages/integration-test/requireNamed.test.cjs +++ b/packages/integration-test/requireNamed.test.cjs @@ -23,11 +23,11 @@ const { iterableSlice } = require('iter-fest/iterableSlice'); const { iterableSome } = require('iter-fest/iterableSome'); const { iterableToSpliced } = require('iter-fest/iterableToSpliced'); const { iterableToString } = require('iter-fest/iterableToString'); +const { IterableWritableStream } = require('iter-fest/iterableWritableStream'); const { iteratorToIterable } = require('iter-fest/iteratorToIterable'); const { Observable } = require('iter-fest/observable'); const { observableFromAsync } = require('iter-fest/observableFromAsync'); const { observableSubscribeAsReadable } = require('iter-fest/observableSubscribeAsReadable'); -const { PushAsyncIterableIterator } = require('iter-fest/pushAsyncIterableIterator'); const { readableStreamFrom } = require('iter-fest/readableStreamFrom'); const { readerValues } = require('iter-fest/readerValues'); const { SymbolObservable } = require('iter-fest/symbolObservable'); @@ -140,6 +140,43 @@ test('iterableToSpliced should work', () => test('iterableToString should work', () => expect(iterableToString([1, 2, 3])).toBe('1,2,3')); +test('IterableWritableStream should work', async () => { + let deferred = withResolvers(); + const done = jest.fn(); + const iterable = new IterableWritableStream(); + const writer = iterable.getWriter(); + const values = []; + + (async function () { + for await (const value of iterable) { + values.push(value); + + deferred.resolve(); + deferred = withResolvers(); + } + + done(); + deferred.resolve(); + })(); + + expect(values).toEqual([]); + expect(done).not.toHaveBeenCalled(); + + writer.write(1); + await deferred.promise; + expect(values).toEqual([1]); + expect(done).not.toHaveBeenCalled(); + + writer.write(2); + await deferred.promise; + expect(values).toEqual([1, 2]); + expect(done).not.toHaveBeenCalled(); + + writer.close(); + await deferred.promise; + expect(done).toHaveBeenCalledTimes(1); +}); + test('iteratorToIterable should work', () => expect( Array.from( @@ -208,42 +245,6 @@ test('observableSubscribeAsReadable should work', async () => { await expect(reader.read()).resolves.toEqual({ done: true }); }); -test('PushAsyncIterableIterator should work', async () => { - let deferred = withResolvers(); - const done = jest.fn(); - const iterable = new PushAsyncIterableIterator(); - const values = []; - - (async function () { - for await (const value of iterable) { - values.push(value); - - deferred.resolve(); - deferred = withResolvers(); - } - - done(); - deferred.resolve(); - })(); - - expect(values).toEqual([]); - expect(done).not.toHaveBeenCalled(); - - iterable.push(1); - await deferred.promise; - expect(values).toEqual([1]); - expect(done).not.toHaveBeenCalled(); - - iterable.push(2); - await deferred.promise; - expect(values).toEqual([1, 2]); - expect(done).not.toHaveBeenCalled(); - - iterable.close(); - await deferred.promise; - expect(done).toHaveBeenCalledTimes(1); -}); - test('readableStreamFrom should work', async () => { const iterable = [1, 2, 3].values(); diff --git a/packages/integration-test/requiredDefault.test.cjs b/packages/integration-test/requiredDefault.test.cjs index fbcc75a..aeb81d6 100644 --- a/packages/integration-test/requiredDefault.test.cjs +++ b/packages/integration-test/requiredDefault.test.cjs @@ -24,11 +24,11 @@ const { iterableSome, iterableToSpliced, iterableToString, + IterableWritableStream, iteratorToIterable, Observable, observableFromAsync, observableSubscribeAsReadable, - PushAsyncIterableIterator, readableStreamFrom, readerValues, SymbolObservable @@ -142,6 +142,43 @@ test('iterableToSpliced should work', () => test('iterableToString should work', () => expect(iterableToString([1, 2, 3])).toBe('1,2,3')); +test('IterableWritableStream should work', async () => { + let deferred = withResolvers(); + const done = jest.fn(); + const iterable = new IterableWritableStream(); + const writer = iterable.getWriter(); + const values = []; + + (async function () { + for await (const value of iterable) { + values.push(value); + + deferred.resolve(); + deferred = withResolvers(); + } + + done(); + deferred.resolve(); + })(); + + expect(values).toEqual([]); + expect(done).not.toHaveBeenCalled(); + + writer.write(1); + await deferred.promise; + expect(values).toEqual([1]); + expect(done).not.toHaveBeenCalled(); + + writer.write(2); + await deferred.promise; + expect(values).toEqual([1, 2]); + expect(done).not.toHaveBeenCalled(); + + writer.close(); + await deferred.promise; + expect(done).toHaveBeenCalledTimes(1); +}); + test('iteratorToIterable should work', () => expect( Array.from( @@ -210,42 +247,6 @@ test('observableSubscribeAsReadable should work', async () => { await expect(reader.read()).resolves.toEqual({ done: true }); }); -test('PushAsyncIterableIterator should work', async () => { - let deferred = withResolvers(); - const done = jest.fn(); - const iterable = new PushAsyncIterableIterator(); - const values = []; - - (async function () { - for await (const value of iterable) { - values.push(value); - - deferred.resolve(); - deferred = withResolvers(); - } - - done(); - deferred.resolve(); - })(); - - expect(values).toEqual([]); - expect(done).not.toHaveBeenCalled(); - - iterable.push(1); - await deferred.promise; - expect(values).toEqual([1]); - expect(done).not.toHaveBeenCalled(); - - iterable.push(2); - await deferred.promise; - expect(values).toEqual([1, 2]); - expect(done).not.toHaveBeenCalled(); - - iterable.close(); - await deferred.promise; - expect(done).toHaveBeenCalledTimes(1); -}); - test('readableStreamFrom should work', async () => { const iterable = [1, 2, 3].values(); diff --git a/packages/iter-fest/package.json b/packages/iter-fest/package.json index 3869863..ab29539 100644 --- a/packages/iter-fest/package.json +++ b/packages/iter-fest/package.json @@ -236,6 +236,16 @@ "default": "./dist/iter-fest.iterableToString.js" } }, + "./iterableWritableStream": { + "import": { + "types": "./dist/iter-fest.iterableWritableStream.d.mts", + "default": "./dist/iter-fest.iterableWritableStream.mjs" + }, + "require": { + "types": "./dist/iter-fest.iterableWritableStream.d.ts", + "default": "./dist/iter-fest.iterableWritableStream.js" + } + }, "./iteratorToIterable": { "import": { "types": "./dist/iter-fest.iteratorToIterable.d.mts", @@ -276,16 +286,6 @@ "default": "./dist/iter-fest.observableSubscribeAsReadable.js" } }, - "./pushAsyncIterableIterator": { - "import": { - "types": "./dist/iter-fest.pushAsyncIterableIterator.d.mts", - "default": "./dist/iter-fest.pushAsyncIterableIterator.mjs" - }, - "require": { - "types": "./dist/iter-fest.pushAsyncIterableIterator.d.ts", - "default": "./dist/iter-fest.pushAsyncIterableIterator.js" - } - }, "./readableStreamFrom": { "import": { "types": "./dist/iter-fest.readableStreamFrom.d.mts", diff --git a/packages/iter-fest/src/PushAsyncIterableIterator.spec.ts b/packages/iter-fest/src/PushAsyncIterableIterator.spec.ts deleted file mode 100644 index 5d07909..0000000 --- a/packages/iter-fest/src/PushAsyncIterableIterator.spec.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { PushAsyncIterableIterator } from './PushAsyncIterableIterator'; -import type { JestMockOf } from './private/JestMockOf'; - -describe('comprehensive', () => { - let done: JestMockOf<() => void>; - let iterable: PushAsyncIterableIterator; - let values: number[]; - - beforeEach(() => { - done = jest.fn(); - iterable = new PushAsyncIterableIterator(); - values = []; - - (async () => { - for await (const value of iterable) { - values.push(value); - } - - done(); - })(); - }); - - test('should receive no values', () => expect(values).toEqual([])); - test('should not completed the for-loop', () => expect(done).not.toHaveBeenCalled()); - - describe('when push(1) is called', () => { - beforeEach(() => iterable.push(1)); - - test('should receive a value', () => expect(values).toEqual([1])); - - describe('when push(2) is called', () => { - beforeEach(() => iterable.push(2)); - - test('should receive a value', () => expect(values).toEqual([1, 2])); - }); - }); - - describe('when close() is called', () => { - beforeEach(() => iterable.close()); - - test('should completed the for-loop', () => expect(done).toHaveBeenCalledTimes(1)); - }); -}); - -test('push after close should close the iterable', async () => { - const iterable = new PushAsyncIterableIterator(); - - iterable.close(); - await expect(iterable.next()).resolves.toEqual({ done: true, value: undefined }); - - iterable.push(1); - await expect(iterable.next()).resolves.toEqual({ done: true, value: undefined }); -}); diff --git a/packages/iter-fest/src/PushAsyncIterableIterator.ts b/packages/iter-fest/src/PushAsyncIterableIterator.ts deleted file mode 100644 index 652c5a7..0000000 --- a/packages/iter-fest/src/PushAsyncIterableIterator.ts +++ /dev/null @@ -1,34 +0,0 @@ -import withResolvers from './private/withResolvers'; - -const CLOSE = Symbol('close'); - -export class PushAsyncIterableIterator implements AsyncIterableIterator { - #closed: boolean = false; - #pushResolvers: PromiseWithResolvers = withResolvers(); - - [Symbol.asyncIterator]() { - return this; - } - - close() { - this.#closed = true; - this.#pushResolvers.resolve(CLOSE); - } - - async next(): Promise> { - const value = await this.#pushResolvers.promise; - - if (value === CLOSE) { - return { done: true, value: undefined }; - } - - return { done: false, value }; - } - - push(value: T) { - if (!this.#closed) { - this.#pushResolvers.resolve(value); - this.#pushResolvers = withResolvers(); - } - } -} diff --git a/packages/iter-fest/src/index.ts b/packages/iter-fest/src/index.ts index 04cdd59..c3e8445 100644 --- a/packages/iter-fest/src/index.ts +++ b/packages/iter-fest/src/index.ts @@ -1,5 +1,4 @@ export * from './Observable'; -export * from './PushAsyncIterableIterator'; export * from './SymbolObservable'; export * from './asyncGeneratorWithLastValue'; export * from './asyncIteratorToAsyncIterable'; @@ -24,6 +23,7 @@ export * from './iterableSlice'; export * from './iterableSome'; export * from './iterableToSpliced'; export * from './iterableToString'; +export * from './iterableWritableStream'; export * from './iteratorToIterable'; export * from './observableFromAsync'; export * from './observableSubscribeAsReadable'; diff --git a/packages/iter-fest/src/iterableWritableStream.spec.ts b/packages/iter-fest/src/iterableWritableStream.spec.ts new file mode 100644 index 0000000..87f6fdf --- /dev/null +++ b/packages/iter-fest/src/iterableWritableStream.spec.ts @@ -0,0 +1,112 @@ +import { IterableWritableStream } from './iterableWritableStream'; +import type { JestMockOf } from './private/JestMockOf'; + +describe('comprehensive', () => { + let done: JestMockOf<() => void>; + let values: number[]; + let writer: WritableStreamDefaultWriter; + + beforeEach(() => { + done = jest.fn(); + + const iterable = new IterableWritableStream(); + + writer = iterable.getWriter(); + values = []; + + (async () => { + for await (const value of iterable) { + values.push(value); + } + + done(); + })(); + }); + + test('should receive no values', () => expect(values).toEqual([])); + test('should not completed the for-loop', () => expect(done).not.toHaveBeenCalled()); + + describe('when push(1) is called', () => { + beforeEach(() => writer.write(1)); + + test('should receive a value', () => expect(values).toEqual([1])); + + describe('when push(2) is called', () => { + beforeEach(() => writer.write(2)); + + test('should receive a value', () => expect(values).toEqual([1, 2])); + }); + }); + + describe('when close() is called', () => { + beforeEach(() => writer.close()); + + test('should completed the for-loop', () => expect(done).toHaveBeenCalledTimes(1)); + }); +}); + +test('two serial iterations should continue where it left', async () => { + const iterable = new IterableWritableStream(); + const writer = iterable.getWriter(); + const getValuesUntil = async (until?: number | undefined) => { + const values: number[] = []; + + for await (const value of iterable) { + values.push(value); + + if (typeof until !== 'undefined' && value === until) { + break; + } + } + + return values; + }; + + const promise1 = getValuesUntil(2); + + writer.write(1); + writer.write(2); + writer.write(3); + writer.write(4); + writer.close(); + + await expect(promise1).resolves.toEqual([1, 2]); + + // Make sure there are enough time to idle before the next consumer begin. + await undefined; + + const promise2 = getValuesUntil(); + + await expect(promise2).resolves.toEqual([3, 4]); +}); + +test('two parallel iterations should round robin', async () => { + const iterable = new IterableWritableStream(); + const writer = iterable.getWriter(); + const getValues = async () => { + const values: number[] = []; + + for await (const value of iterable) { + values.push(value); + + if (value === 3) { + break; + } + } + + return values; + }; + + const promise1 = getValues(); + const promise2 = getValues(); + + writer.write(1); + writer.write(2); + writer.write(3); + writer.write(4); + writer.write(5); + writer.close(); + + await expect(promise1).resolves.toEqual([1, 3]); + await expect(promise2).resolves.toEqual([2, 4, 5]); +}); diff --git a/packages/iter-fest/src/iterableWritableStream.ts b/packages/iter-fest/src/iterableWritableStream.ts new file mode 100644 index 0000000..b73032d --- /dev/null +++ b/packages/iter-fest/src/iterableWritableStream.ts @@ -0,0 +1,46 @@ +import withResolvers from './private/withResolvers'; + +const CLOSE = Symbol(); + +export class IterableWritableStream extends WritableStream { + constructor() { + super({ + close: () => { + this.#buffer.push(CLOSE); + this.#writeResolvers.resolve(); + }, + write: (chunk: T) => { + this.#buffer.push(chunk); + this.#writeResolvers.resolve(); + this.#writeResolvers = withResolvers(); + } + }); + } + + #buffer: (T | typeof CLOSE)[] = []; + #writeResolvers: PromiseWithResolvers = withResolvers(); + + [Symbol.asyncIterator](): AsyncIterator { + return { + next: async () => { + while (!this.#buffer.length) { + await this.#writeResolvers.promise; + } + + const value = this.#buffer[0] as T | typeof CLOSE; + + if (value === CLOSE) { + return { done: true, value: undefined }; + } + + // If it is not CLOSE, remove it from the queue. + this.#buffer.shift(); + + // Idle here so concurrent iteration has a chance to pick up the next value in a round robin fashion. + await undefined; + + return { done: false, value }; + } + }; + } +} diff --git a/packages/iter-fest/tsup.config.ts b/packages/iter-fest/tsup.config.ts index 1c8914e..4cc55dd 100644 --- a/packages/iter-fest/tsup.config.ts +++ b/packages/iter-fest/tsup.config.ts @@ -28,11 +28,11 @@ export default defineConfig([ 'iter-fest.iterableSome': './src/iterableSome.ts', 'iter-fest.iterableToSpliced': './src/iterableToSpliced.ts', 'iter-fest.iterableToString': './src/iterableToString.ts', + 'iter-fest.iterableWritableStream': './src/iterableWritableStream.ts', 'iter-fest.iteratorToIterable': './src/iteratorToIterable.ts', 'iter-fest.observable': './src/Observable.ts', 'iter-fest.observableFromAsync': './src/observableFromAsync.ts', 'iter-fest.observableSubscribeAsReadable': './src/observableSubscribeAsReadable.ts', - 'iter-fest.pushAsyncIterableIterator': './src/PushAsyncIterableIterator.ts', 'iter-fest.readableStreamFrom': './src/readableStreamFrom.ts', 'iter-fest.readerValues': './src/readerValues.ts', 'iter-fest.symbolObservable': './src/SymbolObservable.ts'