Skip to content

Commit

Permalink
Add PushAsyncIterableIterator (#11)
Browse files Browse the repository at this point in the history
* Add PushAsyncIterableIterator

* Verbiage

* Fix next after close
  • Loading branch information
compulim authored Jun 8, 2024
1 parent c619924 commit 0a3d131
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added typed `Observable` from `core-js-pure`, in PR [#8](https://github.com/compulim/iter-fest/pull/8)
- Added `observableValues` to convert `Observable` into `AsyncIterableIterator` in PR [#8](https://github.com/compulim/iter-fest/pull/8)
- Added `observableFromAsync` to convert `AsyncIterable` into `Observable` in PR [#9](https://github.com/compulim/iter-fest/pull/9)
- Added `PushAsyncIterableIterator` into PR [#10](https://github.com/compulim/iter-fest/pull/10)

### Changed

Expand Down
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,33 @@ const next = value => console.log(value);
observable.subscribe({ next }); // Prints "1", "2", "3".
```

### Producer-consumer queue

`PushAsyncIterableIterator` is a simple push-based producer-consumer queue. The producer can push a new job at anytime. The consumer will wait for jobs to be available.

A push-based queue is easier to use than a pull-based queue. However, pull-based queue offers better flow control. For a full-featured producer-consumer queue that supports flow control, use `ReadableStream` instead.

```ts
const iterable = new PushAsyncIterableIterator();

(async function consumer() {
for await (const value of iterable) {
console.log(value);
}

console.log('Done');
})();

(async function producer() {
iterable.push(1);
iterable.push(2);
iterable.push(3);
iterable.close();
})();

// Prints "1", "2", "3", "Done".
```

## Behaviors

### How this compares to the TC39 proposals?
Expand Down
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 41 additions & 3 deletions packages/integration-test/importDefault.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import withResolvers from 'core-js-pure/full/promise/with-resolvers';
import {
Observable,
PushAsyncIterableIterator,
SymbolObservable,
iterableAt,
iterableConcat,
iterableEntries,
Expand All @@ -20,10 +24,8 @@ import {
iterableToSpliced,
iterableToString,
iteratorToIterable,
Observable,
observableFromAsync,
observableValues,
SymbolObservable
observableValues
} from 'iter-fest';

test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2));
Expand Down Expand Up @@ -141,6 +143,42 @@ test('observableValues should work', async () => {
expect(values).toEqual([1, 2, 3]);
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new PushAsyncIterableIterator();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

iterable.push(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

iterable.push(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

iterable.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('SymbolObservable should work', () => {
const observable = new Observable(() => {});

Expand Down
38 changes: 38 additions & 0 deletions packages/integration-test/importNamed.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import withResolvers from 'core-js-pure/full/promise/with-resolvers';
import { iterableAt } from 'iter-fest/iterableAt';
import { iterableConcat } from 'iter-fest/iterableConcat';
import { iterableEntries } from 'iter-fest/iterableEntries';
Expand All @@ -22,6 +23,7 @@ import { iteratorToIterable } from 'iter-fest/iteratorToIterable';
import { Observable } from 'iter-fest/observable';
import { observableFromAsync } from 'iter-fest/observableFromAsync';
import { observableValues } from 'iter-fest/observableValues';
import { PushAsyncIterableIterator } from 'iter-fest/pushAsyncIterableIterator';
import { SymbolObservable } from 'iter-fest/symbolObservable';

test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2));
Expand Down Expand Up @@ -139,6 +141,42 @@ test('observableValues should work', async () => {
expect(values).toEqual([1, 2, 3]);
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new PushAsyncIterableIterator();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

iterable.push(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

iterable.push(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

iterable.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('SymbolObservable should work', () => {
const observable = new Observable(() => {});

Expand Down
1 change: 1 addition & 0 deletions packages/integration-test/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"@babel/preset-typescript": "^7.24.6",
"@types/jest": "^29.5.12",
"babel-jest": "^29.7.0",
"core-js-pure": "^3.37.1",
"jest": "^29.7.0",
"jest-environment-jsdom": "^29.7.0"
},
Expand Down
39 changes: 39 additions & 0 deletions packages/integration-test/requireNamed.test.cjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const withResolvers = require('core-js-pure/full/promise/with-resolvers');

const { iterableAt } = require('iter-fest/iterableAt');
const { iterableConcat } = require('iter-fest/iterableConcat');
const { iterableEntries } = require('iter-fest/iterableEntries');
Expand All @@ -22,6 +24,7 @@ const { iteratorToIterable } = require('iter-fest/iteratorToIterable');
const { Observable } = require('iter-fest/observable');
const { observableFromAsync } = require('iter-fest/observableFromAsync');
const { observableValues } = require('iter-fest/observableValues');
const { PushAsyncIterableIterator } = require('iter-fest/pushAsyncIterableIterator');
const { SymbolObservable } = require('iter-fest/symbolObservable');

test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2));
Expand Down Expand Up @@ -139,6 +142,42 @@ test('observableValues should work', async () => {
expect(values).toEqual([1, 2, 3]);
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new PushAsyncIterableIterator();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

iterable.push(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

iterable.push(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

iterable.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('SymbolObservable should work', () => {
const observable = new Observable(() => {});

Expand Down
39 changes: 39 additions & 0 deletions packages/integration-test/requiredDefault.test.cjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const withResolvers = require('core-js-pure/full/promise/with-resolvers');

const {
iterableAt,
iterableConcat,
Expand All @@ -23,6 +25,7 @@ const {
Observable,
observableFromAsync,
observableValues,
PushAsyncIterableIterator,
SymbolObservable
} = require('iter-fest');

Expand Down Expand Up @@ -141,6 +144,42 @@ test('observableValues should work', async () => {
expect(values).toEqual([1, 2, 3]);
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
const iterable = new PushAsyncIterableIterator();
const values = [];

(async function () {
for await (const value of iterable) {
values.push(value);

deferred.resolve();
deferred = withResolvers();
}

done();
deferred.resolve();
})();

expect(values).toEqual([]);
expect(done).not.toHaveBeenCalled();

iterable.push(1);
await deferred.promise;
expect(values).toEqual([1]);
expect(done).not.toHaveBeenCalled();

iterable.push(2);
await deferred.promise;
expect(values).toEqual([1, 2]);
expect(done).not.toHaveBeenCalled();

iterable.close();
await deferred.promise;
expect(done).toHaveBeenCalledTimes(1);
});

test('SymbolObservable should work', () => {
const observable = new Observable(() => {});

Expand Down
10 changes: 10 additions & 0 deletions packages/iter-fest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,16 @@
"default": "./dist/iter-fest.observableValues.js"
}
},
"./pushAsyncIterableIterator": {
"import": {
"types": "./dist/iter-fest.pushAsyncIterableIterator.d.mts",
"default": "./dist/iter-fest.pushAsyncIterableIterator.mjs"
},
"require": {
"types": "./dist/iter-fest.pushAsyncIterableIterator.d.ts",
"default": "./dist/iter-fest.pushAsyncIterableIterator.js"
}
},
"./symbolObservable": {
"import": {
"types": "./dist/iter-fest.symbolObservable.d.mts",
Expand Down
53 changes: 53 additions & 0 deletions packages/iter-fest/src/PushAsyncIterableIterator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { PushAsyncIterableIterator } from './PushAsyncIterableIterator';
import type { JestMockOf } from './private/JestMockOf';

describe('comprehensive', () => {
let done: JestMockOf<() => void>;
let iterable: PushAsyncIterableIterator<number>;
let values: number[];

beforeEach(() => {
done = jest.fn();
iterable = new PushAsyncIterableIterator();
values = [];

(async () => {
for await (const value of iterable) {
values.push(value);
}

done();
})();
});

test('should receive no values', () => expect(values).toEqual([]));
test('should not completed the for-loop', () => expect(done).not.toHaveBeenCalled());

describe('when push(1) is called', () => {
beforeEach(() => iterable.push(1));

test('should receive a value', () => expect(values).toEqual([1]));

describe('when push(2) is called', () => {
beforeEach(() => iterable.push(2));

test('should receive a value', () => expect(values).toEqual([1, 2]));
});
});

describe('when close() is called', () => {
beforeEach(() => iterable.close());

test('should completed the for-loop', () => expect(done).toHaveBeenCalledTimes(1));
});
});

test('push after close should close the iterable', async () => {
const iterable = new PushAsyncIterableIterator();

iterable.close();
await expect(iterable.next()).resolves.toEqual({ done: true, value: undefined });

iterable.push(1);
await expect(iterable.next()).resolves.toEqual({ done: true, value: undefined });
});
34 changes: 34 additions & 0 deletions packages/iter-fest/src/PushAsyncIterableIterator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import withResolvers from './private/withResolvers';

const CLOSE = Symbol('close');

export class PushAsyncIterableIterator<T> implements AsyncIterableIterator<T> {
#closed: boolean = false;
#pushResolvers: PromiseWithResolvers<T | typeof CLOSE> = withResolvers();

[Symbol.asyncIterator]() {
return this;
}

close() {
this.#closed = true;
this.#pushResolvers.resolve(CLOSE);
}

async next(): Promise<IteratorResult<T>> {
const value = await this.#pushResolvers.promise;

if (value === CLOSE) {
return { done: true, value: undefined };
}

return { done: false, value };
}

push(value: T) {
if (!this.#closed) {
this.#pushResolvers.resolve(value);
this.#pushResolvers = withResolvers();
}
}
}
Loading

0 comments on commit 0a3d131

Please sign in to comment.