Skip to content

Commit

Permalink
fix(transform): Minor refactor and transform now resolves after flush (
Browse files Browse the repository at this point in the history
…#24)

In general less, more similar code between the streams.

Change when transform streams `resolve()` we're now listening the to readable `finish` event instead of doing it before the call to `_flush()`. This should allow us to wait for flush to do whatever it needs to do before ending.
  • Loading branch information
reconbot authored Nov 24, 2017
1 parent 184e183 commit 0c951df
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 57 deletions.
21 changes: 8 additions & 13 deletions lib/read.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,15 @@ async function readHandler (bytes) {
}
}

function wrapRead (thisStream) {
const read = thisStream._read.bind(thisStream)
thisStream._asyncRead = async bytes => read(bytes)
thisStream._read = readHandler
}

export class ReadStream extends Readable {
constructor (opts, readCb) {
constructor (opts = {}, fn) {
if (typeof opts === 'function') {
readCb = opts
fn = opts
opts = {}
}

opts = {
...fn && { read: fn },
...opts
}

Expand All @@ -48,18 +43,18 @@ export class ReadStream extends Readable {
opts.objectMode = true
}

if (typeof readCb === 'function') {
opts.read = readCb
}

super(opts)
this._handlingErrors = false
this._reading = false
this._keepReading = true
this._ending = false
this._streamDeferred = defer()
this._asyncQueue = new Set()
wrapRead(this)

const read = this._read.bind(this)
this._asyncRead = async bytes => read(bytes)
this._read = readHandler

this.once('end', () => this._streamDeferred.resolve())
}

Expand Down
41 changes: 15 additions & 26 deletions lib/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,24 @@ async function transformHandler (data, encoding, done) {
Promise.race(this._queue).then(() => done(), e => this.emitError(e))
}

function wrapTransform (thisStream) {
const transform = thisStream._transform.bind(thisStream)
thisStream._asyncTransform = async (data, encoding) => transform(data, encoding)
thisStream._transform = transformHandler
}

function flushHandler (done) {
Promise.all(this._queue)
.then(() => {
this._streamEnd.resolve()
if (this._asyncFlush) {
return this._asyncFlush()
}
return this._asyncFlush()
})
.then(data => done(null, data), done)
}

function wrapFlush (thisStream) {
thisStream._asyncFlush = thisStream._flush
thisStream._flush = flushHandler
}

export class TransformStream extends Transform {
constructor (opts, fn) {
constructor (opts = {}, fn) {
if (typeof opts === 'function') {
fn = opts
opts = {}
}

if (!opts) {
opts = {}
}

if (!opts.transform && fn) {
opts.transform = fn
}

opts = {
concurrent: 1,
...fn && { transform: fn },
...opts
}

Expand All @@ -71,12 +50,22 @@ export class TransformStream extends Transform {
}

super(opts)
wrapTransform(this)
wrapFlush(this)

const transform = this._transform.bind(this)
this._asyncTransform = async (data, encoding) => transform(data, encoding)
this._transform = transformHandler

if (this._flush) {
const flush = this._flush.bind(this)
this._asyncFlush = async () => flush()
this._flush = flushHandler
}

this._streamEnd = defer()
this._handlingErrors = false
this._concurrent = opts.concurrent
this._queue = new Set()
this.once('finish', () => this._streamEnd.resolve())
}

async end (chunk, encoding, cb) {
Expand Down
28 changes: 10 additions & 18 deletions lib/write.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,19 @@ async function writeHandler (data, encoding, done) {
Promise.race(this._queue).then(() => done(), e => this.emitError(e))
}

function wrapWrite (thisStream) {
const write = thisStream._write.bind(thisStream)
thisStream._asyncWrite = async (data, encoding) => write(data, encoding)
thisStream._write = writeHandler
}

export class WriteStream extends Writable {
constructor (opts, fn) {
constructor (opts = {}, fn) {
if (typeof opts === 'function') {
fn = opts
opts = {}
}

if (!opts) {
opts = {}
}

if (!opts.write && fn) {
opts.write = fn
opts = {
concurrent: 1,
...fn && { write: fn },
...opts
}

opts = Object.assign({
concurrent: 1
}, opts)

// only if the user hasn't suggested anything about object mode do we default to object mode
if (!('objectMode' in opts) && !('writableObjectMode' in opts)) {
opts.objectMode = true
Expand All @@ -51,11 +39,15 @@ export class WriteStream extends Writable {
}

super(opts)
wrapWrite(this)
this._streamEnd = defer()
this._handlingErrors = false
this._concurrent = opts.concurrent
this._queue = new Set()

const write = this._write.bind(this)
this._asyncWrite = async (data, encoding) => write(data, encoding)
this._write = writeHandler

this.once('finish', () => this._streamEnd.resolve())
}

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"promise",
"stream",
"readable",
"transform",
"writable",
"transform streams",
"bluebird"
Expand Down

0 comments on commit 0c951df

Please sign in to comment.