Skip to content

Commit

Permalink
Fix stream runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
lxsmnsyc committed Dec 6, 2023
1 parent f42763c commit 2c1d3ab
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 110 deletions.
29 changes: 16 additions & 13 deletions packages/seroval/src/core/context/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -947,8 +947,8 @@ export default abstract class BaseSerializerContext implements PluginAccessOptio
let result = '';
let initialized = false;
if (node.f.t !== SerovalNodeType.IndexedValue) {
result = '(' + this.serialize(node.f) + ',';
this.markRef(node.f.i);
result = '(' + this.serialize(node.f) + ',';
initialized = true;
}
result += this.assignIndexedValue(
Expand All @@ -960,7 +960,7 @@ export default abstract class BaseSerializerContext implements PluginAccessOptio
'(i=0,t={[' + this.getRefParam(node.f.i) + ']:' + this.createFunction([], 't') + ',next:'
+ this.createEffectfulFunction(
[],
'if(i>s.d)return{done:!0,value:void 0};c=i++,d=s.v[c];if(c===s.t)throw d;return{done:c===s.d,value:d}',
'if(i>s.d)return{done:!0,value:void 0};if(d=s.v[c=i++],c===s.t)throw d;return{done:c===s.d,value:d}',
) + '})',
),
),
Expand All @@ -984,12 +984,12 @@ export default abstract class BaseSerializerContext implements PluginAccessOptio
let result = '';

if (promise.t !== SerovalNodeType.IndexedValue) {
result += '(' + this.serialize(promise);
this.markRef(promise.i);
result += '(' + this.serialize(promise);
}
if (symbol.t !== SerovalNodeType.IndexedValue) {
result += (result ? ',' : '(') + this.serialize(symbol);
this.markRef(symbol.i);
result += (result ? ',' : '(') + this.serialize(symbol);
}
if (result) {
result += ',';
Expand All @@ -1000,19 +1000,22 @@ export default abstract class BaseSerializerContext implements PluginAccessOptio
this.createFunction(
['s'],
this.createFunction(
['b', 'c', 'p', 't'],
'(b=[],c=0,p=void 0,s.on({next:' + this.createEffectfulFunction(
['v'],
'p&&p.s({done:!1,value:v});b.push([0,v])',
['b', 'c', 'p', 'd', 'e', 't', 'f'],
'(b=[],c=0,p=[],d=-1,e=!1,f=' + this.createEffectfulFunction(
['i', 'l'],
'for(i=0,l=p.length;i<l;i++)p[i].s({done:!0,value:void 0})',
) + ',s.on({next:' + this.createEffectfulFunction(
['v', 't'],
'if(t=p.shift())t.s({done:!1,value:v});b.push(v)',
) + ',throw:' + this.createEffectfulFunction(
['v'],
'p&&p.f(v);b.push([1,v])',
['v', 't'],
'if(t=p.shift())t.f(v);f(),d=b.length,e=!0,b.push(v)',
) + ',return:' + this.createEffectfulFunction(
['v'],
'p&&p.s({done:!0,value:v});b.push([2,v])',
['v', 't'],
'if(t=p.shift())t.s({done:!0,value:v});f(),d=b.length,b.push(v)',
) + '}),t={[' + this.getRefParam(symbol.i) + ']:' + this.createFunction([], 't') + ',next:' + this.createEffectfulFunction(
['i', 't', 'v'],
'i=c++;if(i>b.length)return ' + this.getRefParam(promise.i) + '();t=b[i][0],v=b[i][1];if(t===1)throw v;return{done:t===2,value:v}',
'if(d===-1){return((i=c++)>=b.length)?(p.push(t=' + this.getRefParam(promise.i) + '()),t):{done:!0,value:b[i]}}if(c>d)return{done:!0,value:void 0};if(v=b[i=c++],c!==d)return{done:!1,value:v};if(e)throw v;return{done:!0,value:v}',
) + '})',
),
),
Expand Down
74 changes: 46 additions & 28 deletions packages/seroval/src/core/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,28 +128,44 @@ export function streamToAsyncIterable<T>(
stream: Stream<T>,
): () => AsyncIterableIterator<T> {
return (): AsyncIterableIterator<T> => {
const buffer: [type: 0 | 1 | 2, value: T][] = [];
const buffer: T[] = [];
const pending: Deferred[] = [];
let count = 0;
let pending: Deferred | undefined;
let doneAt = -1;
let isThrow = false;

function resolveAll(): void {
for (let i = 0, len = pending.length; i < len; i++) {
pending[i].resolve({ done: true, value: undefined });
}
}

stream.on({
next(value) {
if (pending) {
pending.resolve({ done: false, value });
const current = pending.shift();
if (current) {
current.resolve({ done: false, value });
}
buffer.push([0, value]);
buffer.push(value);
},
throw(value) {
if (pending) {
pending.reject(value);
const current = pending.shift();
if (current) {
current.reject(value);
}
buffer.push([1, value as T]);
resolveAll();
doneAt = buffer.length;
buffer.push(value as T);
isThrow = true;
},
return(value) {
if (pending) {
pending.resolve({ done: true, value });
const current = pending.shift();
if (current) {
current.resolve({ done: true, value });
}
buffer.push([2, value]);
resolveAll();
doneAt = buffer.length;
buffer.push(value);
},
});

Expand All @@ -158,25 +174,27 @@ export function streamToAsyncIterable<T>(
return this;
},
async next(): Promise<IteratorResult<T>> {
const current = count++;
if (current < buffer.length) {
const [type, value] = buffer[current];
if (type === 1) {
throw value;
if (doneAt === -1) {
const current = count++;
if (current >= buffer.length) {
const deferred = createDeferred();
pending.push(deferred);
return deferred.promise as Promise<IteratorResult<T>>;
}
if (type === 0) {
return {
done: false,
value,
};
}
return {
done: true,
value,
};
return { done: false, value: buffer[current] };
}
if (count > doneAt) {
return { done: true, value: undefined };
}
const current = count++;
const value = buffer[current];
if (count !== doneAt) {
return { done: false, value };
}
if (isThrow) {
throw value;
}
pending = createDeferred();
return pending.promise as Promise<IteratorResult<T>>;
return { done: true, value };
},
};
};
Expand Down
Loading

0 comments on commit 2c1d3ab

Please sign in to comment.