Skip to content

Commit

Permalink
arrayQueue vs linkQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
samthor committed Dec 18, 2024
1 parent c2acf2a commit db75990
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 111 deletions.
340 changes: 254 additions & 86 deletions src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,26 @@ export class WorkQueue<T> {
}

/**
* Link-list based queue.
* Array-based queue. Faster thank {@link LinkQueue}, but cannot be garbage-collected.
*/
export interface LinkQueue<X> {
export interface Queue<X> {
/**
* Adds more events to the queue. Returns `true` if any listeners were directly woken up.
*/
push(...all: X[]): boolean;

/**
* Returns a listener that provides all events passed with `push` after this call completes.
*
* If the signal is cancelled, the listener becomes invalid and returns undefined values.
*/
join(signal: AbortSignal): Listener<X>;
}

/**
* Link-list based queue.
*/
export interface LinkQueue<X> extends Queue<X> {
/**
* Returns a listener that provides all events passed with `push` after this call completes.
*
Expand Down Expand Up @@ -159,6 +171,12 @@ export interface Listener<X> {
* If the array has zero length, the listener was aborted.
*/
batch(): Promise<X[]>;

/**
* Waits for and returns only the last pending event. This can be useful if the event is purely a
* "high water mark".
*/
last(): Promise<X | undefined>;
}

/**
Expand All @@ -183,101 +201,251 @@ type QueueRef<X> = {
};
const emptyQueueRef: QueueRef<unknown> = {};

/**
* Builds a listener that never has any values in it, and resolves immediately. For aborted signals.
*/
export function buildEmptyListener<X = any>(): Listener<X> {
return {
next() {
return Promise.resolve(undefined);
},
peek() {
return undefined;
},
batch() {
return Promise.resolve([]);
},
};
return new EmptyListenerImpl();
}

/**
* Builds a {@link LinkQueue}.
*/
export function buildLinkQueue<X>(): LinkQueue<X> {
let head: QueueRef<X> = {};
let p: ReturnType<typeof promiseWithResolvers<void>> | undefined;
abstract class ListenerImpl<X> implements Listener<X> {
abstract next();

abstract peek();

return {
push(...all) {
if (!all.length) {
return false; // no events to broadcast
async batch(): Promise<X[]> {
const next = await this.next();
if (next === undefined) {
return [];
}
const out = [next];

while (this.peek()) {
const next = await this.next();
if (next === undefined) {
break;
}
out.push(next);
}

for (const each of all) {
const prev = head;
head = {};
prev.value = each;
prev.next = head;
return out;
}

async last() {
const out = await this.batch();
return out.at(-1);
}
}

class EmptyListenerImpl extends ListenerImpl<never> {
next() {
return Promise.resolve(undefined);
}
peek() {
return undefined;
}
}

class LinkQueueImpl<X> implements LinkQueue<X> {
head: QueueRef<X> = {};
p: ReturnType<typeof promiseWithResolvers<void>> | undefined;

push(...all) {
if (!all.length) {
return false; // no events to broadcast
}

for (const each of all) {
const prev = this.head;
this.head = {};
prev.value = each;
prev.next = this.head;
}

// wake up listeners; we only have any if p exists
if (!this.p) {
return false;
}
const { resolve } = this.p;
this.p = undefined;
resolve();
return true;
}

join(signal) {
let waitNext: () => Promise<void>;
let ref: QueueRef<X> = this.head;

if (!signal) {
// no signal, just wait normally
waitNext = () => this.p!.promise;
} else if (signal.aborted) {
// aborted signal, fail immediately
waitNext = () => Promise.resolve();
ref = emptyQueueRef as QueueRef<X>;
} else {
// normal signal
const signalPromise = promiseForSignal(signal, undefined);
waitNext = () => Promise.race([signalPromise, this.p!.promise]);
}

const outer = this;

return new (class extends ListenerImpl<X> {
peek() {
return ref.value;
}
async next() {
let { value } = ref;
if (value !== undefined) {
ref = ref.next!;
return value;
}

// wake up listeners; we only have any if p exists
if (!p) {
return false;
outer.p ??= promiseWithResolvers();
await waitNext();
return this.next();
}
const { resolve } = p;
p = undefined;
resolve();
return true;
},
join(signal) {
let waitNext: () => Promise<void>;
let ref: QueueRef<X> = head;

if (!signal) {
// no signal, just wait normally
waitNext = () => p!.promise;
} else if (signal.aborted) {
// aborted signal, fail immediately
waitNext = () => Promise.resolve();
ref = emptyQueueRef as QueueRef<X>;
} else {
// normal signal
const signalPromise = promiseForSignal(signal, undefined);
waitNext = () => Promise.race([signalPromise, p!.promise]);
async last() {
let value = await this.next();
if (value === undefined) {
return undefined;
}
while (ref.value !== undefined) {
value = ref.value;
ref = ref.next!;
}
return value;
}
})();
}
}

class ArrayQueueImpl<X> implements Queue<X> {
head: number = 0;
data: X[] = [];
subs = new Map<Listener<X>, number>();
p: ReturnType<typeof promiseWithResolvers<void>> | undefined;
trimTask = false;

push(...all: X[]): boolean {
if (all.length === 0) {
return false;
}

this.head += all.length;
if (this.subs.size === 0) {
this.data.splice(0, this.data.length);
return false;
}

this.data.push(...all);

// wake up listeners; we only have any if p exists
if (!this.p) {
return false;
}
const { resolve } = this.p;
this.p = undefined;
resolve();
return true;
}

join(signal: AbortSignal): Listener<X> {
const outer = this;
const signalPromise = promiseForSignal(signal, undefined);

const waitFor = async (cb: (avail: number) => number): Promise<X[]> => {
for (;;) {
const last = outer.subs.get(l);
if (last === undefined) {
console.info('aborted', []);
return [];
} else if (last < outer.head) {
const start = outer.head - outer.data.length;
const skip = last - start;

const avail = outer.data.length - skip;
const toReturn = cb(avail);

return {
peek() {
return ref.value;
},
async next() {
let { value } = ref;
if (value !== undefined) {
ref = ref.next!;
return value;
}

if (!p) {
p = promiseWithResolvers();
}
await waitNext();
return this.next();
},
async batch() {
const next = await this.next();
if (next === undefined) {
return [];
}
const out = [next];

while (this.peek()) {
const next = await this.next();
if (next === undefined) {
break;
}
out.push(next);
}
const out = outer.data.slice(skip, skip + toReturn);

outer.subs.set(l, last + out.length);
outer.queueTrimEvents();
return out;
},
};
},
};
}

outer.p ??= promiseWithResolvers();
await Promise.race([signalPromise, outer.p!.promise]);
}
};

const l = new (class extends ListenerImpl<X> {
peek() {
const last = outer.subs.get(l);
if (last === undefined || last >= outer.head) {
return undefined;
}

const start = outer.head - outer.data.length;
const skip = last - start;
return outer.data[skip];
}

async next() {
const out = await waitFor(() => 1);
return out[0]!;
}

async batch() {
return waitFor((avail) => avail);
}
})();

if (!signal.aborted) {
this.subs.set(l, this.head);
signal.addEventListener('abort', () => {
this.subs.delete(l);
this.queueTrimEvents();
});
}

return l;
}

queueTrimEvents() {
if (this.trimTask) {
return;
}
this.trimTask = true;
setTimeout(() => {
this.trimTask = false;

const min = Math.min(...this.subs.values());
if (min === this.head) {
if (this.data.length) {
this.data.splice(0, this.data.length);
}
return;
}

const start = this.head - this.data.length;
const strip = min - start;
this.data.splice(0, strip);
}, 250);
}
}

/**
* Builds a {@link LinkQueue}.
*/
export function buildLinkQueue<X>(): LinkQueue<X> {
return new LinkQueueImpl();
}

/**
* Builds a {@link Queue}.
*/
export function buildArrayQueue<X>(): Queue<X> {
return new ArrayQueueImpl();
}
Loading

0 comments on commit db75990

Please sign in to comment.