diff --git a/lib/logger.js b/lib/logger.js index 543b899..9a6f4c1 100644 --- a/lib/logger.js +++ b/lib/logger.js @@ -7,11 +7,7 @@ var stream = require('stream'); var util = require('util'); -var fs = require('fs'); - -var byline = require('byline'); -var through = require('through'); -var duplexer = require('duplexer'); +var StringDecoder = require('string_decoder').StringDecoder; module.exports = Logger; @@ -30,7 +26,7 @@ var formatters = { function Logger(options) { var defaults = JSON.parse(JSON.stringify(Logger.DEFAULTS)); options = util._extend(defaults, options || {}); - var catcher = new byline.LineStream; + var catcher = deLiner(); var emitter = catcher; var transforms = [ objectifier(), @@ -56,54 +52,97 @@ function Logger(options) { transforms.push(formatters[options.format](options)); - // restore line endings that were removed by byline + // restore line endings that were removed by line splitting transforms.push(reLiner()); for (var t in transforms) { emitter = emitter.pipe(transforms[t]); } - return duplexer(catcher, emitter); + return duplex(catcher, emitter); +} + +function deLiner() { + var decoder = new StringDecoder('utf8'); + var last = ''; + + return new stream.Transform({ + transform: transform, + flush: flush, + readableObjectMode: true, + }); + + function transform (chunk, enc, cb) { + last += decoder.write(chunk) + var list = last.split(/\r\n|[\n\v\f\r\x85\u2028\u2029]/g); + last = list.pop() + for (var i = 0; i < list.length; i++) { + // swallow empty lines + if (list[i]) { + this.push(list[i]) + } + } + cb() + } + + function flush (callback) { + // any incomplete UTF8 sequences will get dumped to the log as UTF8 + // replacement characters + last += decoder.end(); + if (last) { + this.push(last); + } + callback() + } } function reLiner() { - return through(appendNewline); + return new stream.Transform({ + transform: appendNewline + }); - function appendNewline(line) { - this.emit('data', line + '\n'); + function appendNewline(line, _enc, callback) { + this.push(line); + callback(null, Buffer.from('\n')); } } function objectifier() { - return through(objectify, null, {autoDestroy: false}); + return new stream.Transform({ + readableObjectMode: true, + transform: objectify, + }); - function objectify(line) { - this.emit('data', { - msg: line, - time: Date.now(), - }); + function objectify(chunk, encoding, callback) { + callback(null, {msg: chunk, time: Date.now()}); } } function staticTagger(tag) { - return through(tagger); + return new stream.Transform({ + objectMode: true, + transform: tagger, + }); - function tagger(logEvent) { + function tagger(logEvent, _enc, callback) { logEvent.tag = tag; - this.emit('data', logEvent); + callback(null, logEvent); } } function textFormatter(options) { - return through(textify); + return new stream.Transform({ + writableObjectMode: true, + transform: textify, + }); - function textify(logEvent) { + function textify(logEvent, _enc, callback) { var line = util.format('%s%s', textifyTags(logEvent.tag), logEvent.msg.toString()); if (options.timeStamp) { line = util.format('%s %s', new Date(logEvent.time).toISOString(), line); } - this.emit('data', line.replace(/\n/g, '\\n')); + callback(null, line.replace(/\n/g, '\\n')); } function textifyTags(tags) { @@ -120,28 +159,34 @@ function textFormatter(options) { } function jsonFormatter(options) { - return through(jsonify); + return new stream.Transform({ + writableObjectMode: true, + transform: jsonify, + }); - function jsonify(logEvent) { + function jsonify(logEvent, _enc, callback) { if (options.timeStamp) { logEvent.time = new Date(logEvent.time).toISOString(); } else { delete logEvent.time; } logEvent.msg = logEvent.msg.toString(); - this.emit('data', JSON.stringify(logEvent)); + callback(null, JSON.stringify(logEvent)); } } function lineMerger(host) { var previousLine = null; var flushTimer = null; - var stream = through(lineMergerWrite, lineMergerEnd); - var flush = _flush.bind(stream); + var merged = new stream.Transform({ + objectMode: true, + transform: lineMergerWrite, + flush: lineMergerEnd, + }); - return stream; + return merged; - function lineMergerWrite(line) { + function lineMergerWrite(line, _enc, callback) { if (/^\s+/.test(line.msg)) { if (previousLine) { previousLine.msg += '\n' + line.msg; @@ -149,23 +194,52 @@ function lineMerger(host) { previousLine = line; } } else { - flush(); + mergePrevious.call(this); previousLine = line; } // rolling timeout clearTimeout(flushTimer); - flushTimer = setTimeout(flush.bind(this), 10); + flushTimer = setTimeout(mergePrevious.bind(this), 10); + callback(); } - function _flush() { + function mergePrevious() { if (previousLine) { - this.emit('data', previousLine); + this.push(previousLine) previousLine = null; } } - function lineMergerEnd() { - flush.call(this); - this.emit('end'); + function lineMergerEnd(callback) { + mergePrevious.call(this); + callback(); + } +} + +function duplex(writable, readable) { + var dup = new stream.Duplex({ + write: writable.write.bind(writable), + read: readFromReadable, + }); + dup.on('finish', onFinish); + readable.on('end', onEnd); + + return dup; + + function readFromReadable(size) { + var dup = this; + readable.once('readable', function() { + var buf; + while ((buf = readable.read()) !== null) { + dup.push(buf); + } + }); + } + + function onFinish() { + writable.end(); + } + function onEnd() { + dup.push(null); } } diff --git a/package.json b/package.json index 9ef7d4e..77edec3 100644 --- a/package.json +++ b/package.json @@ -27,13 +27,10 @@ "test": "tap --100 test/test-*" }, "dependencies": { - "byline": "^5.0.0", - "duplexer": "^0.1.1", - "minimist": "^1.2.0", - "through": "^2.3.4" + "minimist": "^1.2.0" }, "devDependencies": { - "tap": "^12.0.1" + "tap": "^12.1.1" }, "engines": { "node": ">=4"