diff --git a/README.md b/README.md index 87f2495..9ad5977 100644 --- a/README.md +++ b/README.md @@ -13,9 +13,10 @@ Originally forked from [promise-streams](https://github.com/spion/promise-stream - `FilterStream` similar to `Array.prototype.filter` Easy stream filtering of data - `ReduceStream` similar to `Array.prototype.reduce` but a stream that emits each step and `.promise()` resolves to the end result -- `bstream.wait(stream)` resolves when the stream finishes -- `bstream.collect(stream)` Concats strings and buffers, returns an array of objects. -- `bstream.pipe(source, target, [target,])` Returns a promise for when the last target stream finishes +- `bluestream.wait(stream)` resolves when the stream finishes +- `bluestream.collect(stream)` Concats strings and buffers, returns an array of objects. +- `bluestream.readAsync(stream, numberOfBytesOrObjects)` Reads a number of bytes or objects from a stream +- `bluestream.pipe(source, target, [target,])` Returns a promise for when the last target stream finishes # Examples @@ -142,7 +143,7 @@ Options: The other options are also passed to node's Write stream constructor. -#### ps.filter +#### filter `([opts:Options,] fn: async (data[, enc]) => boolean) => FilterStream` @@ -151,7 +152,7 @@ indicate whether the data value should pass to the next stream Options: Same as `ps.transform` -#### ps.reduce +#### reduce `([opts:Options,] fn: (acc, data[, enc]) => Promise) => ReduceStream` @@ -170,25 +171,31 @@ process.stdin.pipe(split()).pipe(es.reduce(function(acc, el) { }); ``` -#### ps.wait +#### wait `(s: Stream) => Promise` Wait for the stream to end. Also captures errors. -#### ps.pipe +#### pipe `(source: Stream, destination: Stream) => Promise` Pipes s1 to s2 and forwards all errors to the resulting promise. The promise is fulfilled without a value when the destination stream ends. -#### ps.collect +#### collect `(source: Stream) => Promise` Returns a Buffer, string or array of all the data events concatenated together. If no events null is returned. +#### readAsync + +`(source: Stream, count: Number) => Promise` + +Returns a count of bytes in a Buffer, characters in a string or objects in an array. If no data arrives before the stream ends `null` is returned. + #### PromiseStream.promise `() => Promise` diff --git a/lib/index.js b/lib/index.js index f00487a..3bd1392 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,34 +1,37 @@ +import { collect } from './collect' import { FilterStream, filter } from './filter' +import { pipe } from './pipe' +import { readAsync } from './readAsync' import { ReadStream, read } from './read' import { ReduceStream, reduce } from './reduce' import { TransformStream, transform, map } from './transform' -import { WriteStream, write } from './write' import { wait } from './utils' -import { pipe } from './pipe' -import { collect } from './collect' +import { WriteStream, write } from './write' +export { collect } from './collect' export { FilterStream, filter } from './filter' +export { pipe } from './pipe' +export { readAsync } from './readAsync' export { ReadStream, read } from './read' export { ReduceStream, reduce } from './reduce' export { TransformStream, transform, map } from './transform' -export { WriteStream, write } from './write' export { wait } from './utils' -export { pipe } from './pipe' -export { collect } from './collect' +export { WriteStream, write } from './write' export default { - FilterStream, + collect, filter, - ReadStream, + FilterStream, + map, + pipe, read, - ReduceStream, + readAsync, + ReadStream, reduce, - TransformStream, + ReduceStream, transform, - map, - WriteStream, - write, + TransformStream, wait, - pipe, - collect + write, + WriteStream } diff --git a/lib/readAsync.js b/lib/readAsync.js new file mode 100644 index 0000000..3961a12 --- /dev/null +++ b/lib/readAsync.js @@ -0,0 +1,57 @@ +import { defer } from './utils' + +const readOnceAsync = async (stream, count) => { + const data = stream.read(count) + if (data !== null) { + return data + } + return new Promise(resolve => { + stream.once('readable', () => { + const data = stream.read(count) + if (data === null) { + return resolve(stream.read()) + } + resolve(data) + }) + }) +} + +export const readAsync = async (stream, count) => { + if (!(stream && stream._readableState)) { + throw new TypeError('"stream" is not a readable stream') + } + if (stream._readableState.flowing) { + throw new TypeError('"stream" is in flowing mode, this is probably not what you want as data loss could occur. Please use stream.pause() to pause the stream before calling readAsync.') + } + + const objectMode = stream._readableState.objectMode + const { resolve, reject, promise } = defer() + + const cleanup = () => { + stream.removeListener('error', reject) + } + + stream.once('error', reject) + + if (objectMode) { + const objects = [] + for (let index = 0; index < (count || 1); index++) { + const obj = await readOnceAsync(stream) + if (obj === null) { + cleanup() + if (objects.length === 0) { + return null + } + return objects + } + objects.push(obj) + } + cleanup() + resolve(objects) + } else { + const data = await readOnceAsync(stream, count) + cleanup() + return data + } + return promise +} diff --git a/test/readAsync-test.js b/test/readAsync-test.js new file mode 100644 index 0000000..dffae17 --- /dev/null +++ b/test/readAsync-test.js @@ -0,0 +1,104 @@ +import fs from 'fs' +import path from 'path' +import { readAsync, read, write, collect } from '../lib' + +function nextTick (data) { + return new Promise(resolve => process.nextTick(() => resolve(data))) +} + +function bufferStream () { + return fs.createReadStream(path.join(__dirname, 'test.txt')) +} + +function stringStream () { + return fs.createReadStream(path.join(__dirname, 'test.txt'), 'utf8') +} + +function objectStream (arr = [1, 2, 3, 4, 5, 6]) { + return read(() => { + const value = arr.shift() + return (value ? { value } : null) + }) +} + +describe('#readAsync', () => { + it(`rejects if it's not a readable stream`, async () => { + const writeStream = write(() => {}) + await readAsync(writeStream, 1).then(() => { + assert.isTrue(false, 'The promise should have rejected') + }, err => { + assert.isNotNull(err) + assert.equal(writeStream._eventsCount, 1) + }) + }) + it('resolvers a buffer with a number bytes from a buffer stream', async () => { + const stream = bufferStream() + assert.deepEqual(await readAsync(stream, 4), Buffer.from('1\n2\n')) + assert.deepEqual(await readAsync(stream, 4), Buffer.from('3\n4\n')) + assert.equal(stream._eventsCount, 1) + }) + it('resolvers a string with a number characters from a string stream', async () => { + const stream = stringStream() + assert.equal(await readAsync(stream, 4), '1\n2\n') + assert.equal(await readAsync(stream, 4), '3\n4\n') + assert.equal(stream._eventsCount, 1) + }) + it('reads the number of objects from an object stream', async () => { + const stream = objectStream() + const objects = await readAsync(stream, 3) + assert.deepEqual(objects, [{ value: 1 }, { value: 2 }, { value: 3 }]) + assert.equal(stream._eventsCount, 1) + }) + it('resolvers early if the stream ends before there is enough bytes', async () => { + const file = await collect(bufferStream()) + const stream = bufferStream() + const readBytes = await readAsync(stream, 500) + assert.equal(readBytes.length, file.length) + assert.deepEqual(readBytes, file) + assert.equal(stream._eventsCount, 1) + }) + it('resolvers early if the stream ends before there is enough objects', async () => { + const stream = objectStream() + const objects = await readAsync(stream, 10) + assert.deepEqual(objects, [{ value: 1 }, { value: 2 }, { value: 3 }, { value: 4 }, { value: 5 }, { value: 6 }]) + assert.equal(stream._eventsCount, 1) + }) + it('resolves null if there was no data and the stream closed', async () => { + const stream = read({ objectMode: false }, () => null) + assert.isNull(await readAsync(stream, 5)) + assert.equal(stream._eventsCount, 1) + const stream2 = read(() => null) + assert.isNull(await readAsync(stream2, 5)) + assert.equal(stream2._eventsCount, 1) + }) + it('resolves null if the stream has already ended', async () => { + const stream = read(() => null) + stream.read() + stream.read() + assert.isNull(await readAsync(stream, 5)) + assert.equal(stream._eventsCount, 1) + }) + it('rejects if the stream errors', async () => { + const stream = read(() => 1) + const error = new Error('Foo!') + nextTick().then(() => stream.emit('error', error)) + await readAsync(stream, 5).then(() => { + assert.isTrue(false, 'The promise should have rejected') + }, err => { + assert.isNotNull(err) + assert.deepEqual(err, error) + assert.equal(stream._eventsCount, 1) + }) + }) + it('rejects if the stream is already in flowing mode', async () => { + const stream = read(() => nextTick(1)) + stream.resume() + await readAsync(stream, 1).then(() => { + assert.isTrue(false, 'The promise should have rejected') + }, err => { + stream.pause() + assert.isNotNull(err) + assert.equal(stream._eventsCount, 1) + }) + }) +}) diff --git a/types/index.d.ts b/types/index.d.ts index 27a38a8..d6a9346 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -56,6 +56,8 @@ export function pipe(...streams: (ReadStream|WriteStream|TransformStream)[]): Pr export function read(opts: StreamOptions, readFn: Function): ReadStream; +export function readAsync(stream: ReadableStream, count: Number): (Object[]|Buffer|String|null); + export function reduce(opts: StreamOptions, fn?: Function, initial?: any): ReduceStream; export function transform(opts: StreamOptions, fn?: Function): TransformStream;