Skip to content

Commit

Permalink
Add observableSubscribeAsReadable
Browse files Browse the repository at this point in the history
  • Loading branch information
compulim committed Jun 8, 2024
1 parent 22b897b commit ba0f601
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 7 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added typed `Observable` from `core-js-pure`, in PR [#8](https://github.com/compulim/iter-fest/pull/8)
- Added `observableValues` to convert `Observable` into `AsyncIterableIterator` in PR [#8](https://github.com/compulim/iter-fest/pull/8)
- Added `observableFromAsync` to convert `AsyncIterable` into `Observable` in PR [#9](https://github.com/compulim/iter-fest/pull/9)
- Added `PushAsyncIterableIterator` in PR [#10](https://github.com/compulim/iter-fest/pull/10)
- Added `readerToAsyncIterableIterator` in PR [#11](https://github.com/compulim/iter-fest/pull/11)
- Added `PushAsyncIterableIterator` in PR [#11](https://github.com/compulim/iter-fest/pull/11)
- Added `readerToAsyncIterableIterator` in PR [#12](https://github.com/compulim/iter-fest/pull/12)
- Added `observableSubscribeAsReadable` in PR [#13](https://github.com/compulim/iter-fest/pull/13)

### Changed

Expand Down
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,29 @@ const iterable = new PushAsyncIterableIterator();
`readerToAsyncIterableIterator` will convert default reader of `ReadableStream` into an `AsyncIterableIterator` to use in for-loop.

```ts
const readableStream = new ReadableStream({ start(controller) {
controller.enqueue(1);
controller.enqueue(2);
controller.close();
} });
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue(1);
controller.enqueue(2);
controller.close();
}
});

for await (const value of readerToAsyncIterableIterator(readableStream.getReader())) {
console.log(value); // Prints "1", "2", "3".
}
```

## Converts `Observable` into `ReadableStream`

```ts
const observable = Observable.from([1, 2, 3]);
const readable = observableSubscribeAsReadable(observable);
const reader = readable.getReader();

readable.pipeTo(stream.writable); // Will write 1, 2, 3.
```

## Behaviors

### How this compares to the TC39 proposals?
Expand Down
21 changes: 21 additions & 0 deletions packages/integration-test/importDefault.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
iterableToString,
iteratorToIterable,
observableFromAsync,
observableSubscribeAsReadable,
observableValues,
readerToAsyncIterableIterator
} from 'iter-fest';
Expand Down Expand Up @@ -133,6 +134,26 @@ test('observableFromAsync should work', async () => {
expect(next).toHaveBeenNthCalledWith(3, 3);
});

test('observableSubscribeAsReadable should work', async () => {
const stream = new TextDecoderStream();
const observable = Observable.from([65, 66, 67]);
const readable = observableSubscribeAsReadable(observable);
const numberToInt8Array = new TransformStream({
transform(chunk, controller) {
controller.enqueue(Int8Array.from([chunk]));
}
});

readable.pipeThrough(numberToInt8Array).pipeTo(stream.writable);

const reader = stream.readable.getReader();

await expect(reader.read()).resolves.toEqual({ done: false, value: 'A' });
await expect(reader.read()).resolves.toEqual({ done: false, value: 'B' });
await expect(reader.read()).resolves.toEqual({ done: false, value: 'C' });
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('observableValues should work', async () => {
const observable = Observable.from([1, 2, 3]);
const values = [];
Expand Down
21 changes: 21 additions & 0 deletions packages/integration-test/importNamed.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { iterableToString } from 'iter-fest/iterableToString';
import { iteratorToIterable } from 'iter-fest/iteratorToIterable';
import { Observable } from 'iter-fest/observable';
import { observableFromAsync } from 'iter-fest/observableFromAsync';
import { observableSubscribeAsReadable } from 'iter-fest/observableSubscribeAsReadable';
import { observableValues } from 'iter-fest/observableValues';
import { PushAsyncIterableIterator } from 'iter-fest/pushAsyncIterableIterator';
import { readerToAsyncIterableIterator } from 'iter-fest/readerToAsyncIterableIterator';
Expand Down Expand Up @@ -131,6 +132,26 @@ test('observableFromAsync should work', async () => {
expect(next).toHaveBeenNthCalledWith(3, 3);
});

test('observableSubscribeAsReadable should work', async () => {
const stream = new TextDecoderStream();
const observable = Observable.from([65, 66, 67]);
const readable = observableSubscribeAsReadable(observable);
const numberToInt8Array = new TransformStream({
transform(chunk, controller) {
controller.enqueue(Int8Array.from([chunk]));
}
});

readable.pipeThrough(numberToInt8Array).pipeTo(stream.writable);

const reader = stream.readable.getReader();

await expect(reader.read()).resolves.toEqual({ done: false, value: 'A' });
await expect(reader.read()).resolves.toEqual({ done: false, value: 'B' });
await expect(reader.read()).resolves.toEqual({ done: false, value: 'C' });
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('observableValues should work', async () => {
const observable = Observable.from([1, 2, 3]);
const values = [];
Expand Down
21 changes: 21 additions & 0 deletions packages/integration-test/requireNamed.test.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const { iterableToString } = require('iter-fest/iterableToString');
const { iteratorToIterable } = require('iter-fest/iteratorToIterable');
const { Observable } = require('iter-fest/observable');
const { observableFromAsync } = require('iter-fest/observableFromAsync');
const { observableSubscribeAsReadable } = require('iter-fest/observableSubscribeAsReadable');
const { observableValues } = require('iter-fest/observableValues');
const { PushAsyncIterableIterator } = require('iter-fest/pushAsyncIterableIterator');
const { readerToAsyncIterableIterator } = require('iter-fest/readerToAsyncIterableIterator');
Expand Down Expand Up @@ -132,6 +133,26 @@ test('observableFromAsync should work', async () => {
expect(next).toHaveBeenNthCalledWith(3, 3);
});

test('observableSubscribeAsReadable should work', async () => {
const stream = new TextDecoderStream();
const observable = Observable.from([65, 66, 67]);
const readable = observableSubscribeAsReadable(observable);
const numberToInt8Array = new TransformStream({
transform(chunk, controller) {
controller.enqueue(Int8Array.from([chunk]));
}
});

readable.pipeThrough(numberToInt8Array).pipeTo(stream.writable);

const reader = stream.readable.getReader();

await expect(reader.read()).resolves.toEqual({ done: false, value: 'A' });
await expect(reader.read()).resolves.toEqual({ done: false, value: 'B' });
await expect(reader.read()).resolves.toEqual({ done: false, value: 'C' });
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('observableValues should work', async () => {
const observable = Observable.from([1, 2, 3]);
const values = [];
Expand Down
21 changes: 21 additions & 0 deletions packages/integration-test/requiredDefault.test.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const {
iteratorToIterable,
Observable,
observableFromAsync,
observableSubscribeAsReadable,
observableValues,
PushAsyncIterableIterator,
readerToAsyncIterableIterator,
Expand Down Expand Up @@ -134,6 +135,26 @@ test('observableFromAsync should work', async () => {
expect(next).toHaveBeenNthCalledWith(3, 3);
});

test('observableSubscribeAsReadable should work', async () => {
const stream = new TextDecoderStream();
const observable = Observable.from([65, 66, 67]);
const readable = observableSubscribeAsReadable(observable);
const numberToInt8Array = new TransformStream({
transform(chunk, controller) {
controller.enqueue(Int8Array.from([chunk]));
}
});

readable.pipeThrough(numberToInt8Array).pipeTo(stream.writable);

const reader = stream.readable.getReader();

await expect(reader.read()).resolves.toEqual({ done: false, value: 'A' });
await expect(reader.read()).resolves.toEqual({ done: false, value: 'B' });
await expect(reader.read()).resolves.toEqual({ done: false, value: 'C' });
await expect(reader.read()).resolves.toEqual({ done: true });
});

test('observableValues should work', async () => {
const observable = Observable.from([1, 2, 3]);
const values = [];
Expand Down
10 changes: 10 additions & 0 deletions packages/iter-fest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,16 @@
"default": "./dist/iter-fest.observable.js"
}
},
"./observableSubscribeAsReadable": {
"import": {
"types": "./dist/iter-fest.observableSubscribeAsReadable.d.mts",
"default": "./dist/iter-fest.observableSubscribeAsReadable.mjs"
},
"require": {
"types": "./dist/iter-fest.observableSubscribeAsReadable.d.ts",
"default": "./dist/iter-fest.observableSubscribeAsReadable.js"
}
},
"./observableFromAsync": {
"import": {
"types": "./dist/iter-fest.observableFromAsync.d.mts",
Expand Down
1 change: 1 addition & 0 deletions packages/iter-fest/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ export * from './iterableToSpliced';
export * from './iterableToString';
export * from './iteratorToIterable';
export * from './observableFromAsync';
export * from './observableSubscribeAsReadable';
export * from './observableValues';
export * from './readerToAsyncIterableIterator';
70 changes: 70 additions & 0 deletions packages/iter-fest/src/observableSubscribeAsReadable.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Observable, type SubscriberFunction, type SubscriptionObserver } from './Observable';
import { observableSubscribeAsReadable } from './observableSubscribeAsReadable';
import type { JestMockOf } from './private/JestMockOf';
import hasResolved from './private/hasResolved';

describe('comprehensive', () => {
let observable: Observable<number>;
let readable: ReadableStream<number>;
let subscriberFunction: JestMockOf<SubscriberFunction<number>>;
let unsubscribeFunction: JestMockOf<() => void>;

beforeEach(() => {
unsubscribeFunction = jest.fn();
subscriberFunction = jest.fn().mockImplementation(() => unsubscribeFunction);
observable = new Observable(subscriberFunction);
readable = observableSubscribeAsReadable(observable);
});

test('should subscribe', () => expect(subscriberFunction).toHaveBeenCalledTimes(1));
test('should not call unsubscribe', () => expect(unsubscribeFunction).not.toHaveBeenCalled());

describe('when read()', () => {
let observer: SubscriptionObserver<number>;
let reader: ReadableStreamDefaultReader<number>;
let readPromise: Promise<ReadableStreamReadResult<number>>;

beforeEach(() => {
reader = readable.getReader();
readPromise = reader.read();
observer = subscriberFunction.mock.calls[0]![0];
});

test('should not resolve', () => expect(hasResolved(readPromise)).resolves.toBe(false));

describe('when complete()', () => {
beforeEach(() => observer.complete());

test('should receive done', () => expect(readPromise).resolves.toEqual({ done: true }));
});

describe('when error()', () => {
beforeEach(() => observer.error(new Error('artificial')));

test('should reject', () => expect(readPromise).rejects.toThrow('artificial'));
});

describe('when next(1)', () => {
beforeEach(() => observer.next(1));

test('should receive a value', () => expect(readPromise).resolves.toEqual({ done: false, value: 1 }));
});
});

describe('when cancel()', () => {
beforeEach(() => readable.cancel());

test('should unsubscribe', () => expect(unsubscribeFunction).toHaveBeenCalledTimes(1));
});
});

test('all at once as AsyncIterableIterator', async () => {
const observable = Observable.from([1, 2, 3]);
const readable = observableSubscribeAsReadable(observable);
const reader = readable.getReader();

await expect(reader.read()).resolves.toEqual({ done: false, value: 1 });
await expect(reader.read()).resolves.toEqual({ done: false, value: 2 });
await expect(reader.read()).resolves.toEqual({ done: false, value: 3 });
await expect(reader.read()).resolves.toEqual({ done: true });
});
24 changes: 24 additions & 0 deletions packages/iter-fest/src/observableSubscribeAsReadable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type { Observable, Subscription } from './Observable';

export function observableSubscribeAsReadable<T>(observable: Observable<T>): ReadableStream<T> {
let subscription: Subscription;

return new ReadableStream<T>({
cancel() {
subscription.unsubscribe();
},
start(controller) {
subscription = observable.subscribe({
complete() {
controller.close();
},
error(err: unknown) {
controller.error(err);
},
next(value) {
controller.enqueue(value);
}
});
}
});
}
1 change: 1 addition & 0 deletions packages/iter-fest/tsup.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export default defineConfig([
'iter-fest.iteratorToIterable': './src/iteratorToIterable.ts',
'iter-fest.observable': './src/Observable.ts',
'iter-fest.observableFromAsync': './src/observableFromAsync.ts',
'iter-fest.observableSubscribeAsReadable': './src/observableSubscribeAsReadable.ts',
'iter-fest.observableValues': './src/observableValues.ts',
'iter-fest.pushAsyncIterableIterator': './src/PushAsyncIterableIterator.ts',
'iter-fest.readerToAsyncIterableIterator': './src/readerToAsyncIterableIterator.ts',
Expand Down

0 comments on commit ba0f601

Please sign in to comment.