diff --git a/lib/read.js b/lib/read.js index 14381ad..43d1d53 100644 --- a/lib/read.js +++ b/lib/read.js @@ -26,20 +26,15 @@ async function readHandler (bytes) { } } -function wrapRead (thisStream) { - const read = thisStream._read.bind(thisStream) - thisStream._asyncRead = async bytes => read(bytes) - thisStream._read = readHandler -} - export class ReadStream extends Readable { - constructor (opts, readCb) { + constructor (opts = {}, fn) { if (typeof opts === 'function') { - readCb = opts + fn = opts opts = {} } opts = { + ...fn && { read: fn }, ...opts } @@ -48,10 +43,6 @@ export class ReadStream extends Readable { opts.objectMode = true } - if (typeof readCb === 'function') { - opts.read = readCb - } - super(opts) this._handlingErrors = false this._reading = false @@ -59,7 +50,11 @@ export class ReadStream extends Readable { this._ending = false this._streamDeferred = defer() this._asyncQueue = new Set() - wrapRead(this) + + const read = this._read.bind(this) + this._asyncRead = async bytes => read(bytes) + this._read = readHandler + this.once('end', () => this._streamDeferred.resolve()) } diff --git a/lib/transform.js b/lib/transform.js index 2867967..1dc6cfc 100644 --- a/lib/transform.js +++ b/lib/transform.js @@ -19,45 +19,24 @@ async function transformHandler (data, encoding, done) { Promise.race(this._queue).then(() => done(), e => this.emitError(e)) } -function wrapTransform (thisStream) { - const transform = thisStream._transform.bind(thisStream) - thisStream._asyncTransform = async (data, encoding) => transform(data, encoding) - thisStream._transform = transformHandler -} - function flushHandler (done) { Promise.all(this._queue) .then(() => { - this._streamEnd.resolve() - if (this._asyncFlush) { - return this._asyncFlush() - } + return this._asyncFlush() }) .then(data => done(null, data), done) } -function wrapFlush (thisStream) { - thisStream._asyncFlush = thisStream._flush - thisStream._flush = flushHandler -} - export class TransformStream extends Transform { - constructor (opts, fn) { + constructor (opts = {}, fn) { if (typeof opts === 'function') { fn = opts opts = {} } - if (!opts) { - opts = {} - } - - if (!opts.transform && fn) { - opts.transform = fn - } - opts = { concurrent: 1, + ...fn && { transform: fn }, ...opts } @@ -71,12 +50,22 @@ export class TransformStream extends Transform { } super(opts) - wrapTransform(this) - wrapFlush(this) + + const transform = this._transform.bind(this) + this._asyncTransform = async (data, encoding) => transform(data, encoding) + this._transform = transformHandler + + if (this._flush) { + const flush = this._flush.bind(this) + this._asyncFlush = async () => flush() + this._flush = flushHandler + } + this._streamEnd = defer() this._handlingErrors = false this._concurrent = opts.concurrent this._queue = new Set() + this.once('finish', () => this._streamEnd.resolve()) } async end (chunk, encoding, cb) { diff --git a/lib/write.js b/lib/write.js index 22dc7f8..792a38e 100644 --- a/lib/write.js +++ b/lib/write.js @@ -16,31 +16,19 @@ async function writeHandler (data, encoding, done) { Promise.race(this._queue).then(() => done(), e => this.emitError(e)) } -function wrapWrite (thisStream) { - const write = thisStream._write.bind(thisStream) - thisStream._asyncWrite = async (data, encoding) => write(data, encoding) - thisStream._write = writeHandler -} - export class WriteStream extends Writable { - constructor (opts, fn) { + constructor (opts = {}, fn) { if (typeof opts === 'function') { fn = opts opts = {} } - if (!opts) { - opts = {} - } - - if (!opts.write && fn) { - opts.write = fn + opts = { + concurrent: 1, + ...fn && { write: fn }, + ...opts } - opts = Object.assign({ - concurrent: 1 - }, opts) - // only if the user hasn't suggested anything about object mode do we default to object mode if (!('objectMode' in opts) && !('writableObjectMode' in opts)) { opts.objectMode = true @@ -51,11 +39,15 @@ export class WriteStream extends Writable { } super(opts) - wrapWrite(this) this._streamEnd = defer() this._handlingErrors = false this._concurrent = opts.concurrent this._queue = new Set() + + const write = this._write.bind(this) + this._asyncWrite = async (data, encoding) => write(data, encoding) + this._write = writeHandler + this.once('finish', () => this._streamEnd.resolve()) } diff --git a/package.json b/package.json index 15b8da4..1ed918f 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "promise", "stream", "readable", + "transform", "writable", "transform streams", "bluebird"