Skip to content

Commit

Permalink
Clean up (#14)
Browse files Browse the repository at this point in the history
* Clean up

* Removed `observableValues`

* Update entries

* Rename to `readerValues`

* Add pull-based test

* Added mixed mode test

* Renamed to `readerValues`
  • Loading branch information
compulim authored Jun 9, 2024
1 parent 4decbaa commit f9dfd27
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 204 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Added typed `Observable` from `core-js-pure`, in PR [#8](https://github.com/compulim/iter-fest/pull/8)
- Added `observableValues` to convert `Observable` into `AsyncIterableIterator` in PR [#8](https://github.com/compulim/iter-fest/pull/8)
- Added `observableFromAsync` to convert `AsyncIterable` into `Observable` in PR [#9](https://github.com/compulim/iter-fest/pull/9)
- Added `PushAsyncIterableIterator` in PR [#11](https://github.com/compulim/iter-fest/pull/11)
- Added `readerToAsyncIterableIterator` in PR [#12](https://github.com/compulim/iter-fest/pull/12)
- Added `readerValues` in PR [#12](https://github.com/compulim/iter-fest/pull/12) and [#14](https://github.com/compulim/iter-fest/pull/14)
- Added `observableSubscribeAsReadable` in PR [#13](https://github.com/compulim/iter-fest/pull/13)

### Changed
Expand Down
95 changes: 58 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,20 @@ console.log(iterableSome(iterable, value => value % 2)); // Prints "true".

List of ported functions: [`at`](https://tc39.es/ecma262/#sec-array.prototype.at), [`concat`](https://tc39.es/ecma262/#sec-array.prototype.concat), [`entries`](https://tc39.es/ecma262/#sec-array.prototype.entries), [`every`](https://tc39.es/ecma262/#sec-array.prototype.every), [`filter`](https://tc39.es/ecma262/#sec-array.prototype.filter), [`find`](https://tc39.es/ecma262/#sec-array.prototype.find), [`findIndex`](https://tc39.es/ecma262/#sec-array.prototype.findindex), [`findLast`](https://tc39.es/ecma262/#sec-array.prototype.findlast), [`findLastIndex`](https://tc39.es/ecma262/#sec-array.prototype.findlastindex), [`forEach`](https://tc39.es/ecma262/#sec-array.prototype.foreach), [`includes`](https://tc39.es/ecma262/#sec-array.prototype.includes), [`indexOf`](https://tc39.es/ecma262/#sec-array.prototype.indexof), [`join`](https://tc39.es/ecma262/#sec-array.prototype.join), [`keys`](https://tc39.es/ecma262/#sec-array.prototype.keys), [`map`](https://tc39.es/ecma262/#sec-array.prototype.map), [`reduce`](https://tc39.es/ecma262/#sec-array.prototype.reduce), [`slice`](https://tc39.es/ecma262/#sec-array.prototype.slice), [`some`](https://tc39.es/ecma262/#sec-array.prototype.some), [`toSpliced`](https://tc39.es/ecma262/#sec-array.prototype.tospliced), and [`toString`](https://tc39.es/ecma262/#sec-array.prototype.tostring).

## Conversions

| From | To | Function signature |
| ----------------------------- | ----------------------- | ------------------------------------------------------------------------------------------------------------------------------- |
| `Iterator` | `IterableIterator` | [`iteratorToIterable<T>(iterator: Iterator<T>): IterableIterator<T>`](#converting-an-iterator-to-iterable) |
| `Observable` | `ReadableStream` | [`observableSubscribeAsReadable<T>(observable: Observable<T>): ReadableStream<T>`](#converting-an-observable-to-readablestream) |
| `ReadableStreamDefaultReader` | `AsyncIterableIterator` | [`readerValues`<T>(reader: ReadableStreamDefaultReader<T>): AsyncIterableIterator<T>`](#iterating-readablestreamdefaultreader) |
| `AsyncIterable` | `Observable` | [`observableFromAsync<T>(iterator: AsyncIterableIterator<T>): Observable<T>`](#converting-an-asynciterable-to-observable) |

To convert `Observable` to `AsyncIterableIterator`, [use `ReadableStream` as intermediate format](#converting-an-observable-to-asynciterableiterator).

### Converting an iterator to iterable

`iteratorToIterable` converts a pure iterator to `IterableIterator` and enable for-loop iteration.
`iteratorToIterable` enable a [pure iterator](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Iterator) to be iterable using a for-loop statement.

```ts
const iterate = (): Iterator<number> => {
Expand All @@ -44,7 +55,7 @@ const iterate = (): Iterator<number> => {
return {
next: () => {
if (++value <= 3) {
return { done: false, value };
return { value };
}

return { done: true, value: undefined };
Expand All @@ -57,20 +68,17 @@ for (const value of iteratorToIterable(iterate())) {
}
```

### Typed `Observable`

`Observable` and `Symbol.observable` is re-exported from `core-js-pure` with proper type definitions.

### Converting an `Observable` to `AsyncIterableIterator`

`observableValues` subscribes to an `Observable` and return as `AsyncIterableIterator`.
`ReadableStream` can be used to when converting an `Observable` to `AsyncIterableIterator`.

`Observable` is push-based and `AsyncIterableIterator` is pull-based. Values from `Observable` may push continuously and will be buffered internally. When for-loop break or complete, the iterator will unsubscribe from the `Observable`.
Note: `Observable` is push-based and it does not support flow control. When converting to `AsyncIterableIteratorrr`, the internal buffer of `ReadableStream` could build up quickly.

```ts
const observable = Observable.from([1, 2, 3]);
const readable = observableSubscribeAsReadable(observable);

for await (const value of observableValues(observable)) {
for await (const value of readerValues(readable.getReader())) {
console.log(value); // Prints "1", "2", "3".
}
```
Expand All @@ -94,61 +102,74 @@ const next = value => console.log(value);
observable.subscribe({ next }); // Prints "1", "2", "3".
```

### Producer-consumer queue
## Converting an `Observable` to `ReadableStream`

`PushAsyncIterableIterator` is a simple push-based producer-consumer queue. The producer can push a new job at anytime. The consumer will wait for jobs to be available.
`ReadableStream` is powerful for transforming and piping stream of data. It can be formed using data from both push-based and pull-based source with backpressuree.

A push-based queue is easier to use than a pull-based queue. However, pull-based queue offers better flow control. For a full-featured producer-consumer queue that supports flow control, use `ReadableStream` instead.
Note: `Observable` is push-based and it does not support flow control. When converting to `ReadableStream`, the internal buffer could build up quickly.

```ts
const iterable = new PushAsyncIterableIterator();

(async function consumer() {
for await (const value of iterable) {
console.log(value);
}

console.log('Done');
})();

(async function producer() {
iterable.push(1);
iterable.push(2);
iterable.push(3);
iterable.close();
})();
const observable = Observable.from([1, 2, 3]);
const readable = observableSubscribeAsReadable(observable);
const reader = readable.getReader();

// Prints "1", "2", "3", "Done".
readable.pipeTo(stream.writable); // Will write 1, 2, 3.
```

### Iterating `ReadableStreamDefaultReader`

`readerToAsyncIterableIterator` will convert default reader of `ReadableStream` into an `AsyncIterableIterator` to use in for-loop.
`readerValues` will convert default reader of `ReadableStream` into an `AsyncIterableIterator` to use in for-loop.

```ts
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue(1);
controller.enqueue(2);
},
pull(controller) {
controller.enqueue(3);
controller.close();
}
});

for await (const value of readerToAsyncIterableIterator(readableStream.getReader())) {
for await (const value of readerValues(readableStream.getReader())) {
console.log(value); // Prints "1", "2", "3".
}
```

## Converts `Observable` into `ReadableStream`
## Others

`ReadableStream` is powerful for transforming and piping stream of data. It can be formed using data from both push-based and pull-based source with backpressuree.
### Typed `Observable`

`Observable` and `Symbol.observable` is re-exported from `core-js-pure` with proper type definitions.

### Producer-consumer queue

`PushAsyncIterableIterator` is a simple push-based producer-consumer queue and designed to decouple the flow between a producer and consumer. The producer can push a new job at anytime. The consumer can pull a job at its own convenience.

Compare to pull-based queue, a push-based queue is easier to use. However, pull-based queue offers better flow control as it will produce a job only if there is a consumer ready to consume.

For a full-featured producer-consumer queue that supports flow control, use `ReadableStream` instead.

```ts
const observable = Observable.from([1, 2, 3]);
const readable = observableSubscribeAsReadable(observable);
const reader = readable.getReader();
const iterable = new PushAsyncIterableIterator();

readable.pipeTo(stream.writable); // Will write 1, 2, 3.
(async function consumer() {
for await (const value of iterable) {
console.log(value);
}

console.log('Done');
})();

(async function producer() {
iterable.push(1);
iterable.push(2);
iterable.push(3);
iterable.close();
})();

// Prints "1", "2", "3", "Done".
```

## Behaviors
Expand Down
18 changes: 3 additions & 15 deletions packages/integration-test/importDefault.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import {
iteratorToIterable,
observableFromAsync,
observableSubscribeAsReadable,
observableValues,
readerToAsyncIterableIterator
readerValues
} from 'iter-fest';

test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2));
Expand Down Expand Up @@ -154,17 +153,6 @@ test('observableSubscribeAsReadable should work', async () => {
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('observableValues should work', async () => {
const observable = Observable.from([1, 2, 3]);
const values = [];

for await (const value of observableValues(observable)) {
values.push(value);
}

expect(values).toEqual([1, 2, 3]);
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
Expand Down Expand Up @@ -201,7 +189,7 @@ test('PushAsyncIterableIterator should work', async () => {
expect(done).toHaveBeenCalledTimes(1);
});

test('readerToAsyncIterableIterator should work', async () => {
test('readerValues should work', async () => {
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue(1);
Expand All @@ -212,7 +200,7 @@ test('readerToAsyncIterableIterator should work', async () => {

const values = [];

for await (const value of readerToAsyncIterableIterator(readableStream.getReader())) {
for await (const value of readerValues(readableStream.getReader())) {
values.push(value);
}

Expand Down
18 changes: 3 additions & 15 deletions packages/integration-test/importNamed.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import { iteratorToIterable } from 'iter-fest/iteratorToIterable';
import { Observable } from 'iter-fest/observable';
import { observableFromAsync } from 'iter-fest/observableFromAsync';
import { observableSubscribeAsReadable } from 'iter-fest/observableSubscribeAsReadable';
import { observableValues } from 'iter-fest/observableValues';
import { PushAsyncIterableIterator } from 'iter-fest/pushAsyncIterableIterator';
import { readerToAsyncIterableIterator } from 'iter-fest/readerToAsyncIterableIterator';
import { readerValues } from 'iter-fest/readerValues';
import { SymbolObservable } from 'iter-fest/symbolObservable';

test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2));
Expand Down Expand Up @@ -152,17 +151,6 @@ test('observableSubscribeAsReadable should work', async () => {
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('observableValues should work', async () => {
const observable = Observable.from([1, 2, 3]);
const values = [];

for await (const value of observableValues(observable)) {
values.push(value);
}

expect(values).toEqual([1, 2, 3]);
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
Expand Down Expand Up @@ -199,7 +187,7 @@ test('PushAsyncIterableIterator should work', async () => {
expect(done).toHaveBeenCalledTimes(1);
});

test('readerToAsyncIterableIterator should work', async () => {
test('readerValues should work', async () => {
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue(1);
Expand All @@ -210,7 +198,7 @@ test('readerToAsyncIterableIterator should work', async () => {

const values = [];

for await (const value of readerToAsyncIterableIterator(readableStream.getReader())) {
for await (const value of readerValues(readableStream.getReader())) {
values.push(value);
}

Expand Down
18 changes: 3 additions & 15 deletions packages/integration-test/requireNamed.test.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ const { iteratorToIterable } = require('iter-fest/iteratorToIterable');
const { Observable } = require('iter-fest/observable');
const { observableFromAsync } = require('iter-fest/observableFromAsync');
const { observableSubscribeAsReadable } = require('iter-fest/observableSubscribeAsReadable');
const { observableValues } = require('iter-fest/observableValues');
const { PushAsyncIterableIterator } = require('iter-fest/pushAsyncIterableIterator');
const { readerToAsyncIterableIterator } = require('iter-fest/readerToAsyncIterableIterator');
const { readerValues } = require('iter-fest/readerValues');
const { SymbolObservable } = require('iter-fest/symbolObservable');

test('iterableAt should work', () => expect(iterableAt([1, 2, 3].values(), 1)).toBe(2));
Expand Down Expand Up @@ -153,17 +152,6 @@ test('observableSubscribeAsReadable should work', async () => {
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('observableValues should work', async () => {
const observable = Observable.from([1, 2, 3]);
const values = [];

for await (const value of observableValues(observable)) {
values.push(value);
}

expect(values).toEqual([1, 2, 3]);
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
Expand Down Expand Up @@ -200,7 +188,7 @@ test('PushAsyncIterableIterator should work', async () => {
expect(done).toHaveBeenCalledTimes(1);
});

test('readerToAsyncIterableIterator should work', async () => {
test('readerValues should work', async () => {
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue(1);
Expand All @@ -211,7 +199,7 @@ test('readerToAsyncIterableIterator should work', async () => {

const values = [];

for await (const value of readerToAsyncIterableIterator(readableStream.getReader())) {
for await (const value of readerValues(readableStream.getReader())) {
values.push(value);
}

Expand Down
18 changes: 3 additions & 15 deletions packages/integration-test/requiredDefault.test.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ const {
Observable,
observableFromAsync,
observableSubscribeAsReadable,
observableValues,
PushAsyncIterableIterator,
readerToAsyncIterableIterator,
readerValues,
SymbolObservable
} = require('iter-fest');

Expand Down Expand Up @@ -155,17 +154,6 @@ test('observableSubscribeAsReadable should work', async () => {
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('observableValues should work', async () => {
const observable = Observable.from([1, 2, 3]);
const values = [];

for await (const value of observableValues(observable)) {
values.push(value);
}

expect(values).toEqual([1, 2, 3]);
});

test('PushAsyncIterableIterator should work', async () => {
let deferred = withResolvers();
const done = jest.fn();
Expand Down Expand Up @@ -202,7 +190,7 @@ test('PushAsyncIterableIterator should work', async () => {
expect(done).toHaveBeenCalledTimes(1);
});

test('readerToAsyncIterableIterator should work', async () => {
test('readerValues should work', async () => {
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue(1);
Expand All @@ -213,7 +201,7 @@ test('readerToAsyncIterableIterator should work', async () => {

const values = [];

for await (const value of readerToAsyncIterableIterator(readableStream.getReader())) {
for await (const value of readerValues(readableStream.getReader())) {
values.push(value);
}

Expand Down
Loading

0 comments on commit f9dfd27

Please sign in to comment.