From 4decbaa8064a7e2bd2fd7fc95b15c105f6fe6b6b Mon Sep 17 00:00:00 2001 From: William Wong Date: Sat, 8 Jun 2024 16:01:59 -0700 Subject: [PATCH] Add `observableSubscribeAsReadable` (#13) * Add `observableSubscribeAsReadable` * Add description --- CHANGELOG.md | 5 +- README.md | 24 +++++-- .../integration-test/importDefault.test.ts | 21 ++++++ packages/integration-test/importNamed.test.ts | 21 ++++++ .../integration-test/requireNamed.test.cjs | 21 ++++++ .../integration-test/requiredDefault.test.cjs | 21 ++++++ packages/iter-fest/package.json | 10 +++ packages/iter-fest/src/index.ts | 1 + .../src/observableSubscribeAsReadable.spec.ts | 70 +++++++++++++++++++ .../src/observableSubscribeAsReadable.ts | 24 +++++++ packages/iter-fest/tsup.config.ts | 1 + 11 files changed, 212 insertions(+), 7 deletions(-) create mode 100644 packages/iter-fest/src/observableSubscribeAsReadable.spec.ts create mode 100644 packages/iter-fest/src/observableSubscribeAsReadable.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index d7ccc85..4ab35c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,8 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added typed `Observable` from `core-js-pure`, in PR [#8](https://github.com/compulim/iter-fest/pull/8) - Added `observableValues` to convert `Observable` into `AsyncIterableIterator` in PR [#8](https://github.com/compulim/iter-fest/pull/8) - Added `observableFromAsync` to convert `AsyncIterable` into `Observable` in PR [#9](https://github.com/compulim/iter-fest/pull/9) -- Added `PushAsyncIterableIterator` in PR [#10](https://github.com/compulim/iter-fest/pull/10) -- Added `readerToAsyncIterableIterator` in PR [#11](https://github.com/compulim/iter-fest/pull/11) +- Added `PushAsyncIterableIterator` in PR [#11](https://github.com/compulim/iter-fest/pull/11) +- Added `readerToAsyncIterableIterator` in PR [#12](https://github.com/compulim/iter-fest/pull/12) +- Added `observableSubscribeAsReadable` in PR [#13](https://github.com/compulim/iter-fest/pull/13) ### Changed diff --git a/README.md b/README.md index 2d51948..3153600 100644 --- a/README.md +++ b/README.md @@ -126,17 +126,31 @@ const iterable = new PushAsyncIterableIterator(); `readerToAsyncIterableIterator` will convert default reader of `ReadableStream` into an `AsyncIterableIterator` to use in for-loop. ```ts -const readableStream = new ReadableStream({ start(controller) { - controller.enqueue(1); - controller.enqueue(2); - controller.close(); -} }); +const readableStream = new ReadableStream({ + start(controller) { + controller.enqueue(1); + controller.enqueue(2); + controller.close(); + } +}); for await (const value of readerToAsyncIterableIterator(readableStream.getReader())) { console.log(value); // Prints "1", "2", "3". } ``` +## Converts `Observable` into `ReadableStream` + +`ReadableStream` is powerful for transforming and piping stream of data. It can be formed using data from both push-based and pull-based source with backpressuree. + +```ts +const observable = Observable.from([1, 2, 3]); +const readable = observableSubscribeAsReadable(observable); +const reader = readable.getReader(); + +readable.pipeTo(stream.writable); // Will write 1, 2, 3. +``` + ## Behaviors ### How this compares to the TC39 proposals? diff --git a/packages/integration-test/importDefault.test.ts b/packages/integration-test/importDefault.test.ts index 73d24f4..aad993a 100644 --- a/packages/integration-test/importDefault.test.ts +++ b/packages/integration-test/importDefault.test.ts @@ -25,6 +25,7 @@ import { iterableToString, iteratorToIterable, observableFromAsync, + observableSubscribeAsReadable, observableValues, readerToAsyncIterableIterator } from 'iter-fest'; @@ -133,6 +134,26 @@ test('observableFromAsync should work', async () => { expect(next).toHaveBeenNthCalledWith(3, 3); }); +test('observableSubscribeAsReadable should work', async () => { + const stream = new TextDecoderStream(); + const observable = Observable.from([65, 66, 67]); + const readable = observableSubscribeAsReadable(observable); + const numberToInt8Array = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(Int8Array.from([chunk])); + } + }); + + readable.pipeThrough(numberToInt8Array).pipeTo(stream.writable); + + const reader = stream.readable.getReader(); + + await expect(reader.read()).resolves.toEqual({ done: false, value: 'A' }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 'B' }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 'C' }); + await expect(reader.read()).resolves.toEqual({ done: true }); +}); + test('observableValues should work', async () => { const observable = Observable.from([1, 2, 3]); const values = []; diff --git a/packages/integration-test/importNamed.test.ts b/packages/integration-test/importNamed.test.ts index 0fbca9f..c7ed2ca 100644 --- a/packages/integration-test/importNamed.test.ts +++ b/packages/integration-test/importNamed.test.ts @@ -22,6 +22,7 @@ import { iterableToString } from 'iter-fest/iterableToString'; import { iteratorToIterable } from 'iter-fest/iteratorToIterable'; import { Observable } from 'iter-fest/observable'; import { observableFromAsync } from 'iter-fest/observableFromAsync'; +import { observableSubscribeAsReadable } from 'iter-fest/observableSubscribeAsReadable'; import { observableValues } from 'iter-fest/observableValues'; import { PushAsyncIterableIterator } from 'iter-fest/pushAsyncIterableIterator'; import { readerToAsyncIterableIterator } from 'iter-fest/readerToAsyncIterableIterator'; @@ -131,6 +132,26 @@ test('observableFromAsync should work', async () => { expect(next).toHaveBeenNthCalledWith(3, 3); }); +test('observableSubscribeAsReadable should work', async () => { + const stream = new TextDecoderStream(); + const observable = Observable.from([65, 66, 67]); + const readable = observableSubscribeAsReadable(observable); + const numberToInt8Array = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(Int8Array.from([chunk])); + } + }); + + readable.pipeThrough(numberToInt8Array).pipeTo(stream.writable); + + const reader = stream.readable.getReader(); + + await expect(reader.read()).resolves.toEqual({ done: false, value: 'A' }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 'B' }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 'C' }); + await expect(reader.read()).resolves.toEqual({ done: true }); +}); + test('observableValues should work', async () => { const observable = Observable.from([1, 2, 3]); const values = []; diff --git a/packages/integration-test/requireNamed.test.cjs b/packages/integration-test/requireNamed.test.cjs index 96e3dbf..c124835 100644 --- a/packages/integration-test/requireNamed.test.cjs +++ b/packages/integration-test/requireNamed.test.cjs @@ -23,6 +23,7 @@ const { iterableToString } = require('iter-fest/iterableToString'); const { iteratorToIterable } = require('iter-fest/iteratorToIterable'); const { Observable } = require('iter-fest/observable'); const { observableFromAsync } = require('iter-fest/observableFromAsync'); +const { observableSubscribeAsReadable } = require('iter-fest/observableSubscribeAsReadable'); const { observableValues } = require('iter-fest/observableValues'); const { PushAsyncIterableIterator } = require('iter-fest/pushAsyncIterableIterator'); const { readerToAsyncIterableIterator } = require('iter-fest/readerToAsyncIterableIterator'); @@ -132,6 +133,26 @@ test('observableFromAsync should work', async () => { expect(next).toHaveBeenNthCalledWith(3, 3); }); +test('observableSubscribeAsReadable should work', async () => { + const stream = new TextDecoderStream(); + const observable = Observable.from([65, 66, 67]); + const readable = observableSubscribeAsReadable(observable); + const numberToInt8Array = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(Int8Array.from([chunk])); + } + }); + + readable.pipeThrough(numberToInt8Array).pipeTo(stream.writable); + + const reader = stream.readable.getReader(); + + await expect(reader.read()).resolves.toEqual({ done: false, value: 'A' }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 'B' }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 'C' }); + await expect(reader.read()).resolves.toEqual({ done: true }); +}); + test('observableValues should work', async () => { const observable = Observable.from([1, 2, 3]); const values = []; diff --git a/packages/integration-test/requiredDefault.test.cjs b/packages/integration-test/requiredDefault.test.cjs index 721a864..ea37bbe 100644 --- a/packages/integration-test/requiredDefault.test.cjs +++ b/packages/integration-test/requiredDefault.test.cjs @@ -24,6 +24,7 @@ const { iteratorToIterable, Observable, observableFromAsync, + observableSubscribeAsReadable, observableValues, PushAsyncIterableIterator, readerToAsyncIterableIterator, @@ -134,6 +135,26 @@ test('observableFromAsync should work', async () => { expect(next).toHaveBeenNthCalledWith(3, 3); }); +test('observableSubscribeAsReadable should work', async () => { + const stream = new TextDecoderStream(); + const observable = Observable.from([65, 66, 67]); + const readable = observableSubscribeAsReadable(observable); + const numberToInt8Array = new TransformStream({ + transform(chunk, controller) { + controller.enqueue(Int8Array.from([chunk])); + } + }); + + readable.pipeThrough(numberToInt8Array).pipeTo(stream.writable); + + const reader = stream.readable.getReader(); + + await expect(reader.read()).resolves.toEqual({ done: false, value: 'A' }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 'B' }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 'C' }); + await expect(reader.read()).resolves.toEqual({ done: true }); +}); + test('observableValues should work', async () => { const observable = Observable.from([1, 2, 3]); const values = []; diff --git a/packages/iter-fest/package.json b/packages/iter-fest/package.json index 4ea0310..657fd37 100644 --- a/packages/iter-fest/package.json +++ b/packages/iter-fest/package.json @@ -226,6 +226,16 @@ "default": "./dist/iter-fest.observable.js" } }, + "./observableSubscribeAsReadable": { + "import": { + "types": "./dist/iter-fest.observableSubscribeAsReadable.d.mts", + "default": "./dist/iter-fest.observableSubscribeAsReadable.mjs" + }, + "require": { + "types": "./dist/iter-fest.observableSubscribeAsReadable.d.ts", + "default": "./dist/iter-fest.observableSubscribeAsReadable.js" + } + }, "./observableFromAsync": { "import": { "types": "./dist/iter-fest.observableFromAsync.d.mts", diff --git a/packages/iter-fest/src/index.ts b/packages/iter-fest/src/index.ts index 7a50bf1..8c4e807 100644 --- a/packages/iter-fest/src/index.ts +++ b/packages/iter-fest/src/index.ts @@ -23,5 +23,6 @@ export * from './iterableToSpliced'; export * from './iterableToString'; export * from './iteratorToIterable'; export * from './observableFromAsync'; +export * from './observableSubscribeAsReadable'; export * from './observableValues'; export * from './readerToAsyncIterableIterator'; diff --git a/packages/iter-fest/src/observableSubscribeAsReadable.spec.ts b/packages/iter-fest/src/observableSubscribeAsReadable.spec.ts new file mode 100644 index 0000000..76997ea --- /dev/null +++ b/packages/iter-fest/src/observableSubscribeAsReadable.spec.ts @@ -0,0 +1,70 @@ +import { Observable, type SubscriberFunction, type SubscriptionObserver } from './Observable'; +import { observableSubscribeAsReadable } from './observableSubscribeAsReadable'; +import type { JestMockOf } from './private/JestMockOf'; +import hasResolved from './private/hasResolved'; + +describe('comprehensive', () => { + let observable: Observable; + let readable: ReadableStream; + let subscriberFunction: JestMockOf>; + let unsubscribeFunction: JestMockOf<() => void>; + + beforeEach(() => { + unsubscribeFunction = jest.fn(); + subscriberFunction = jest.fn().mockImplementation(() => unsubscribeFunction); + observable = new Observable(subscriberFunction); + readable = observableSubscribeAsReadable(observable); + }); + + test('should subscribe', () => expect(subscriberFunction).toHaveBeenCalledTimes(1)); + test('should not call unsubscribe', () => expect(unsubscribeFunction).not.toHaveBeenCalled()); + + describe('when read()', () => { + let observer: SubscriptionObserver; + let reader: ReadableStreamDefaultReader; + let readPromise: Promise>; + + beforeEach(() => { + reader = readable.getReader(); + readPromise = reader.read(); + observer = subscriberFunction.mock.calls[0]![0]; + }); + + test('should not resolve', () => expect(hasResolved(readPromise)).resolves.toBe(false)); + + describe('when complete()', () => { + beforeEach(() => observer.complete()); + + test('should receive done', () => expect(readPromise).resolves.toEqual({ done: true })); + }); + + describe('when error()', () => { + beforeEach(() => observer.error(new Error('artificial'))); + + test('should reject', () => expect(readPromise).rejects.toThrow('artificial')); + }); + + describe('when next(1)', () => { + beforeEach(() => observer.next(1)); + + test('should receive a value', () => expect(readPromise).resolves.toEqual({ done: false, value: 1 })); + }); + }); + + describe('when cancel()', () => { + beforeEach(() => readable.cancel()); + + test('should unsubscribe', () => expect(unsubscribeFunction).toHaveBeenCalledTimes(1)); + }); +}); + +test('all at once as AsyncIterableIterator', async () => { + const observable = Observable.from([1, 2, 3]); + const readable = observableSubscribeAsReadable(observable); + const reader = readable.getReader(); + + await expect(reader.read()).resolves.toEqual({ done: false, value: 1 }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 2 }); + await expect(reader.read()).resolves.toEqual({ done: false, value: 3 }); + await expect(reader.read()).resolves.toEqual({ done: true }); +}); diff --git a/packages/iter-fest/src/observableSubscribeAsReadable.ts b/packages/iter-fest/src/observableSubscribeAsReadable.ts new file mode 100644 index 0000000..486b8e3 --- /dev/null +++ b/packages/iter-fest/src/observableSubscribeAsReadable.ts @@ -0,0 +1,24 @@ +import type { Observable, Subscription } from './Observable'; + +export function observableSubscribeAsReadable(observable: Observable): ReadableStream { + let subscription: Subscription; + + return new ReadableStream({ + cancel() { + subscription.unsubscribe(); + }, + start(controller) { + subscription = observable.subscribe({ + complete() { + controller.close(); + }, + error(err: unknown) { + controller.error(err); + }, + next(value) { + controller.enqueue(value); + } + }); + } + }); +} diff --git a/packages/iter-fest/tsup.config.ts b/packages/iter-fest/tsup.config.ts index 047e907..c6b7836 100644 --- a/packages/iter-fest/tsup.config.ts +++ b/packages/iter-fest/tsup.config.ts @@ -28,6 +28,7 @@ export default defineConfig([ 'iter-fest.iteratorToIterable': './src/iteratorToIterable.ts', 'iter-fest.observable': './src/Observable.ts', 'iter-fest.observableFromAsync': './src/observableFromAsync.ts', + 'iter-fest.observableSubscribeAsReadable': './src/observableSubscribeAsReadable.ts', 'iter-fest.observableValues': './src/observableValues.ts', 'iter-fest.pushAsyncIterableIterator': './src/PushAsyncIterableIterator.ts', 'iter-fest.readerToAsyncIterableIterator': './src/readerToAsyncIterableIterator.ts',