Skip to content

Commit

Permalink
Add MessageIO.readMessage and MessageIO.writeMessage.
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurschreiber committed Aug 27, 2024
1 parent 2ecdc11 commit e761fa1
Show file tree
Hide file tree
Showing 6 changed files with 747 additions and 3 deletions.
43 changes: 43 additions & 0 deletions benchmarks/message-io/incoming-message-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const { createBenchmark } = require('../common');
const { Readable } = require('stream');

const Debug = require('tedious/lib/debug');
const IncomingMessageStream = require('tedious/lib/incoming-message-stream');
const { Packet } = require('tedious/lib/packet');

const bench = createBenchmark(main, {
n: [100, 1000, 10000, 100000]
});

function main({ n }) {
const debug = new Debug();

const stream = Readable.from((async function*() {
for (let i = 0; i < n; i++) {
const packet = new Packet(2);
packet.last(true);
packet.addData(Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9]));

yield packet.buffer;
}
})());

const incoming = new IncomingMessageStream(debug);
stream.pipe(incoming);

bench.start();
console.profile('incoming-message-stream');

(async function() {
let total = 0;

for await (m of incoming) {
for await (const buf of m) {
total += buf.length;
}
}

console.profileEnd('incoming-message-stream');
bench.end(n);
})();
}
72 changes: 72 additions & 0 deletions benchmarks/message-io/outgoing-message-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
const { createBenchmark } = require('../common');
const { Duplex } = require('stream');

const Debug = require('../../lib/debug');
const OutgoingMessageStream = require('../../lib/outgoing-message-stream');
const Message = require('../../lib/message');

const bench = createBenchmark(main, {
n: [100, 1000, 10000, 100000]
});

function main({ n }) {
const debug = new Debug();

const stream = new Duplex({
read() {},
write(chunk, encoding, callback) {
// Just consume the data
callback();
}
});

const payload = [
Buffer.alloc(1024),
Buffer.alloc(1024),
Buffer.alloc(1024),
Buffer.alloc(256),
Buffer.alloc(256),
Buffer.alloc(256),
Buffer.alloc(256),
];

const out = new OutgoingMessageStream(debug, {
packetSize: 8 + 1024
});
out.pipe(stream);

bench.start();
console.profile('write-message');

function writeNextMessage(i) {
if (i == n) {
out.end();
out.once('finish', () => {
console.profileEnd('write-message');
bench.end(n);
});
return;
}

const m = new Message({ type: 2, resetConnection: false });
out.write(m);

for (const buf of payload) {
m.write(buf);
}

m.end();

if (out.needsDrain) {
out.once('drain', () => {
writeNextMessage(i + 1);
});
} else {
process.nextTick(() => {
writeNextMessage(i + 1);
});
}
}

writeNextMessage(0);
}
39 changes: 39 additions & 0 deletions benchmarks/message-io/read-message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
const { createBenchmark } = require('../common');
const { Readable } = require('stream');

const Debug = require('tedious/lib/debug');
const MessageIO = require('tedious/lib/message-io');
const { Packet } = require('tedious/lib/packet');

const bench = createBenchmark(main, {
n: [100, 1000, 10000, 100000]
});

function main({ n }) {
const debug = new Debug();

const stream = Readable.from((async function*() {
for (let i = 0; i < n; i++) {
const packet = new Packet(2);
packet.last(true);
packet.addData(Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9]));

yield packet.buffer;
}
})());

(async function() {
bench.start();
console.profile('read-message');

let total = 0;
for (let i = 0; i < n; i++) {
for await (const chunk of MessageIO.readMessage(stream, debug)) {
total += chunk.length;
}
}

console.profileEnd('read-message');
bench.end(n);
})();
}
43 changes: 43 additions & 0 deletions benchmarks/message-io/write-message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const { createBenchmark, createConnection } = require('../common');
const { Duplex } = require('stream');

const Debug = require('tedious/lib/debug');
const MessageIO = require('tedious/lib/message-io');

const bench = createBenchmark(main, {
n: [100, 1000, 10000, 100000]
});

function main({ n }) {
const debug = new Debug();

const stream = new Duplex({
read() {},
write(chunk, encoding, callback) {
// Just consume the data
callback();
}
});

const payload = [
Buffer.alloc(1024),
Buffer.alloc(1024),
Buffer.alloc(1024),
Buffer.alloc(256),
Buffer.alloc(256),
Buffer.alloc(256),
Buffer.alloc(256),
];

(async function() {
bench.start();
console.profile('write-message');

for (let i = 0; i <= n; i++) {
await MessageIO.writeMessage(stream, debug, 8 + 1024, 2, payload);
}

console.profileEnd('write-message');
bench.end(n);
})();
}
Loading

0 comments on commit e761fa1

Please sign in to comment.