diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e82c04..38283fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added typed `Observable` from `core-js-pure`, in PR [#8](https://github.com/compulim/iter-fest/pull/8) - Added `observableValues` to convert `Observable` into `AsyncIterableIterator` in PR [#8](https://github.com/compulim/iter-fest/pull/8) - Added `observableFromAsync` to convert `AsyncIterable` into `Observable` in PR [#9](https://github.com/compulim/iter-fest/pull/9) +- Added `PushAsyncIterableIterator` into PR [#10](https://github.com/compulim/iter-fest/pull/10) ### Changed diff --git a/README.md b/README.md index fa3acca..b714e3d 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,33 @@ const next = value => console.log(value); observable.subscribe({ next }); // Prints "1", "2", "3". ``` +### Producer-consumer queue + +`PushAsyncIterableIterator` is a simple push-based producer-consumer queue. The producer can push a new job at anytime. The consumer will wait for jobs to be available. + +A push-based queue is easier to use than a pull-based queue. However, pull-based queue offers better flow control. For a full-featured producer-consumer queue that supports flow control, use `ReadableStream` instead. + +```ts +const iterable = new PushAsyncIterableIterator(); + +(async function consumer() { + for await (const value of iterable) { + console.log(value); + } + + console.log('Done'); +})(); + +(async function producer() { + iterable.push(1); + iterable.push(2); + iterable.push(3); + iterable.close(); +})(); + +// Prints "1", "2", "3", "Done". +``` + ## Behaviors ### How this compares to the TC39 proposals? diff --git a/package-lock.json b/package-lock.json index 14bf5b9..eefd347 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10643,6 +10643,7 @@ "@babel/preset-typescript": "^7.24.6", "@types/jest": "^29.5.12", "babel-jest": "^29.7.0", + "core-js-pure": "^3.37.1", "jest": "^29.7.0", "jest-environment-jsdom": "^29.7.0" } diff --git a/packages/integration-test/importDefault.test.ts b/packages/integration-test/importDefault.test.ts index 6c612ad..09f5c8a 100644 --- a/packages/integration-test/importDefault.test.ts +++ b/packages/integration-test/importDefault.test.ts @@ -1,4 +1,8 @@ +import withResolvers from 'core-js-pure/full/promise/with-resolvers'; import { + Observable, + PushAsyncIterableIterator, + SymbolObservable, iterableAt, iterableConcat, iterableEntries, @@ -20,10 +24,8 @@ import { iterableToSpliced, iterableToString, iteratorToIterable, - Observable, observableFromAsync, - observableValues, - SymbolObservable + observableValues } from 'iter-fest'; test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2)); @@ -141,6 +143,42 @@ test('observableValues should work', async () => { expect(values).toEqual([1, 2, 3]); }); +test('PushAsyncIterableIterator should work', async () => { + let deferred = withResolvers(); + const done = jest.fn(); + const iterable = new PushAsyncIterableIterator(); + const values = []; + + (async function () { + for await (const value of iterable) { + values.push(value); + + deferred.resolve(); + deferred = withResolvers(); + } + + done(); + deferred.resolve(); + })(); + + expect(values).toEqual([]); + expect(done).not.toHaveBeenCalled(); + + iterable.push(1); + await deferred.promise; + expect(values).toEqual([1]); + expect(done).not.toHaveBeenCalled(); + + iterable.push(2); + await deferred.promise; + expect(values).toEqual([1, 2]); + expect(done).not.toHaveBeenCalled(); + + iterable.close(); + await deferred.promise; + expect(done).toHaveBeenCalledTimes(1); +}); + test('SymbolObservable should work', () => { const observable = new Observable(() => {}); diff --git a/packages/integration-test/importNamed.test.ts b/packages/integration-test/importNamed.test.ts index 1bbba19..f26d3da 100644 --- a/packages/integration-test/importNamed.test.ts +++ b/packages/integration-test/importNamed.test.ts @@ -1,3 +1,4 @@ +import withResolvers from 'core-js-pure/full/promise/with-resolvers'; import { iterableAt } from 'iter-fest/iterableAt'; import { iterableConcat } from 'iter-fest/iterableConcat'; import { iterableEntries } from 'iter-fest/iterableEntries'; @@ -22,6 +23,7 @@ import { iteratorToIterable } from 'iter-fest/iteratorToIterable'; import { Observable } from 'iter-fest/observable'; import { observableFromAsync } from 'iter-fest/observableFromAsync'; import { observableValues } from 'iter-fest/observableValues'; +import { PushAsyncIterableIterator } from 'iter-fest/pushAsyncIterableIterator'; import { SymbolObservable } from 'iter-fest/symbolObservable'; test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2)); @@ -139,6 +141,42 @@ test('observableValues should work', async () => { expect(values).toEqual([1, 2, 3]); }); +test('PushAsyncIterableIterator should work', async () => { + let deferred = withResolvers(); + const done = jest.fn(); + const iterable = new PushAsyncIterableIterator(); + const values = []; + + (async function () { + for await (const value of iterable) { + values.push(value); + + deferred.resolve(); + deferred = withResolvers(); + } + + done(); + deferred.resolve(); + })(); + + expect(values).toEqual([]); + expect(done).not.toHaveBeenCalled(); + + iterable.push(1); + await deferred.promise; + expect(values).toEqual([1]); + expect(done).not.toHaveBeenCalled(); + + iterable.push(2); + await deferred.promise; + expect(values).toEqual([1, 2]); + expect(done).not.toHaveBeenCalled(); + + iterable.close(); + await deferred.promise; + expect(done).toHaveBeenCalledTimes(1); +}); + test('SymbolObservable should work', () => { const observable = new Observable(() => {}); diff --git a/packages/integration-test/package.json b/packages/integration-test/package.json index 87980bf..d6f1d30 100644 --- a/packages/integration-test/package.json +++ b/packages/integration-test/package.json @@ -22,6 +22,7 @@ "@babel/preset-typescript": "^7.24.6", "@types/jest": "^29.5.12", "babel-jest": "^29.7.0", + "core-js-pure": "^3.37.1", "jest": "^29.7.0", "jest-environment-jsdom": "^29.7.0" }, diff --git a/packages/integration-test/requireNamed.test.cjs b/packages/integration-test/requireNamed.test.cjs index 11e17eb..acb4e47 100644 --- a/packages/integration-test/requireNamed.test.cjs +++ b/packages/integration-test/requireNamed.test.cjs @@ -1,3 +1,5 @@ +const withResolvers = require('core-js-pure/full/promise/with-resolvers'); + const { iterableAt } = require('iter-fest/iterableAt'); const { iterableConcat } = require('iter-fest/iterableConcat'); const { iterableEntries } = require('iter-fest/iterableEntries'); @@ -22,6 +24,7 @@ const { iteratorToIterable } = require('iter-fest/iteratorToIterable'); const { Observable } = require('iter-fest/observable'); const { observableFromAsync } = require('iter-fest/observableFromAsync'); const { observableValues } = require('iter-fest/observableValues'); +const { PushAsyncIterableIterator } = require('iter-fest/pushAsyncIterableIterator'); const { SymbolObservable } = require('iter-fest/symbolObservable'); test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2)); @@ -139,6 +142,42 @@ test('observableValues should work', async () => { expect(values).toEqual([1, 2, 3]); }); +test('PushAsyncIterableIterator should work', async () => { + let deferred = withResolvers(); + const done = jest.fn(); + const iterable = new PushAsyncIterableIterator(); + const values = []; + + (async function () { + for await (const value of iterable) { + values.push(value); + + deferred.resolve(); + deferred = withResolvers(); + } + + done(); + deferred.resolve(); + })(); + + expect(values).toEqual([]); + expect(done).not.toHaveBeenCalled(); + + iterable.push(1); + await deferred.promise; + expect(values).toEqual([1]); + expect(done).not.toHaveBeenCalled(); + + iterable.push(2); + await deferred.promise; + expect(values).toEqual([1, 2]); + expect(done).not.toHaveBeenCalled(); + + iterable.close(); + await deferred.promise; + expect(done).toHaveBeenCalledTimes(1); +}); + test('SymbolObservable should work', () => { const observable = new Observable(() => {}); diff --git a/packages/integration-test/requiredDefault.test.cjs b/packages/integration-test/requiredDefault.test.cjs index 9210e7e..9b366eb 100644 --- a/packages/integration-test/requiredDefault.test.cjs +++ b/packages/integration-test/requiredDefault.test.cjs @@ -1,3 +1,5 @@ +const withResolvers = require('core-js-pure/full/promise/with-resolvers'); + const { iterableAt, iterableConcat, @@ -23,6 +25,7 @@ const { Observable, observableFromAsync, observableValues, + PushAsyncIterableIterator, SymbolObservable } = require('iter-fest'); @@ -141,6 +144,42 @@ test('observableValues should work', async () => { expect(values).toEqual([1, 2, 3]); }); +test('PushAsyncIterableIterator should work', async () => { + let deferred = withResolvers(); + const done = jest.fn(); + const iterable = new PushAsyncIterableIterator(); + const values = []; + + (async function () { + for await (const value of iterable) { + values.push(value); + + deferred.resolve(); + deferred = withResolvers(); + } + + done(); + deferred.resolve(); + })(); + + expect(values).toEqual([]); + expect(done).not.toHaveBeenCalled(); + + iterable.push(1); + await deferred.promise; + expect(values).toEqual([1]); + expect(done).not.toHaveBeenCalled(); + + iterable.push(2); + await deferred.promise; + expect(values).toEqual([1, 2]); + expect(done).not.toHaveBeenCalled(); + + iterable.close(); + await deferred.promise; + expect(done).toHaveBeenCalledTimes(1); +}); + test('SymbolObservable should work', () => { const observable = new Observable(() => {}); diff --git a/packages/iter-fest/package.json b/packages/iter-fest/package.json index 78ef7ae..c0d7455 100644 --- a/packages/iter-fest/package.json +++ b/packages/iter-fest/package.json @@ -246,6 +246,16 @@ "default": "./dist/iter-fest.observableValues.js" } }, + "./pushAsyncIterableIterator": { + "import": { + "types": "./dist/iter-fest.pushAsyncIterableIterator.d.mts", + "default": "./dist/iter-fest.pushAsyncIterableIterator.mjs" + }, + "require": { + "types": "./dist/iter-fest.pushAsyncIterableIterator.d.ts", + "default": "./dist/iter-fest.pushAsyncIterableIterator.js" + } + }, "./symbolObservable": { "import": { "types": "./dist/iter-fest.symbolObservable.d.mts", diff --git a/packages/iter-fest/src/PushAsyncIterableIterator.spec.ts b/packages/iter-fest/src/PushAsyncIterableIterator.spec.ts new file mode 100644 index 0000000..5d07909 --- /dev/null +++ b/packages/iter-fest/src/PushAsyncIterableIterator.spec.ts @@ -0,0 +1,53 @@ +import { PushAsyncIterableIterator } from './PushAsyncIterableIterator'; +import type { JestMockOf } from './private/JestMockOf'; + +describe('comprehensive', () => { + let done: JestMockOf<() => void>; + let iterable: PushAsyncIterableIterator; + let values: number[]; + + beforeEach(() => { + done = jest.fn(); + iterable = new PushAsyncIterableIterator(); + values = []; + + (async () => { + for await (const value of iterable) { + values.push(value); + } + + done(); + })(); + }); + + test('should receive no values', () => expect(values).toEqual([])); + test('should not completed the for-loop', () => expect(done).not.toHaveBeenCalled()); + + describe('when push(1) is called', () => { + beforeEach(() => iterable.push(1)); + + test('should receive a value', () => expect(values).toEqual([1])); + + describe('when push(2) is called', () => { + beforeEach(() => iterable.push(2)); + + test('should receive a value', () => expect(values).toEqual([1, 2])); + }); + }); + + describe('when close() is called', () => { + beforeEach(() => iterable.close()); + + test('should completed the for-loop', () => expect(done).toHaveBeenCalledTimes(1)); + }); +}); + +test('push after close should close the iterable', async () => { + const iterable = new PushAsyncIterableIterator(); + + iterable.close(); + await expect(iterable.next()).resolves.toEqual({ done: true, value: undefined }); + + iterable.push(1); + await expect(iterable.next()).resolves.toEqual({ done: true, value: undefined }); +}); diff --git a/packages/iter-fest/src/PushAsyncIterableIterator.ts b/packages/iter-fest/src/PushAsyncIterableIterator.ts new file mode 100644 index 0000000..652c5a7 --- /dev/null +++ b/packages/iter-fest/src/PushAsyncIterableIterator.ts @@ -0,0 +1,34 @@ +import withResolvers from './private/withResolvers'; + +const CLOSE = Symbol('close'); + +export class PushAsyncIterableIterator implements AsyncIterableIterator { + #closed: boolean = false; + #pushResolvers: PromiseWithResolvers = withResolvers(); + + [Symbol.asyncIterator]() { + return this; + } + + close() { + this.#closed = true; + this.#pushResolvers.resolve(CLOSE); + } + + async next(): Promise> { + const value = await this.#pushResolvers.promise; + + if (value === CLOSE) { + return { done: true, value: undefined }; + } + + return { done: false, value }; + } + + push(value: T) { + if (!this.#closed) { + this.#pushResolvers.resolve(value); + this.#pushResolvers = withResolvers(); + } + } +} diff --git a/packages/iter-fest/src/index.ts b/packages/iter-fest/src/index.ts index f0ed73c..05b9cb3 100644 --- a/packages/iter-fest/src/index.ts +++ b/packages/iter-fest/src/index.ts @@ -1,4 +1,5 @@ export * from './Observable'; +export * from './PushAsyncIterableIterator'; export * from './SymbolObservable'; export * from './iterableAt'; export * from './iterableConcat'; diff --git a/packages/iter-fest/src/private/hasResolved.ts b/packages/iter-fest/src/private/hasResolved.ts new file mode 100644 index 0000000..3038411 --- /dev/null +++ b/packages/iter-fest/src/private/hasResolved.ts @@ -0,0 +1,3 @@ +export default function hasResolved(promise: Promise): Promise { + return Promise.race([promise.then(() => true), Promise.resolve(false)]); +} diff --git a/packages/iter-fest/tsup.config.ts b/packages/iter-fest/tsup.config.ts index 66706ad..7ff38a5 100644 --- a/packages/iter-fest/tsup.config.ts +++ b/packages/iter-fest/tsup.config.ts @@ -29,6 +29,7 @@ export default defineConfig([ 'iter-fest.observable': './src/Observable.ts', 'iter-fest.observableFromAsync': './src/observableFromAsync.ts', 'iter-fest.observableValues': './src/observableValues.ts', + 'iter-fest.pushAsyncIterableIterator': './src/PushAsyncIterableIterator.ts', 'iter-fest.symbolObservable': './src/SymbolObservable.ts' }, format: ['cjs', 'esm'],