# minipass

A _very_ minimal implementation of a [PassThrough
stream](https://nodejs.org/api/stream.html#stream_class_stream_passthrough)

[It's very
fast](https://docs.google.com/spreadsheets/d/1K_HR5oh3r80b8WVMWCPPjfuWXUgfkmhlX7FGI6JJ8tY/edit?usp=sharing)
for objects, strings, and buffers.

Supports `pipe()`ing (including multi-`pipe()` and backpressure
transmission), buffering data until either a `data` event handler
or `pipe()` is added (so you don't lose the first chunk), and
most other cases where PassThrough is a good idea.

There is a `read()` method, but it's much more efficient to
consume data from this stream via `'data'` events or by calling
`pipe()` into some other stream. Calling `read()` requires the
buffer to be flattened in some cases, which requires copying
memory.

If you set `objectMode: true` in the options, then whatever is
written will be emitted. Otherwise, it'll do a minimal amount of
Buffer copying to ensure proper Streams semantics when `read(n)`
is called.

`objectMode` can only be set at instantiation. Attempting to
write something other than a String or Buffer without having set
`objectMode` in the options will throw an error.

This is not a `through` or `through2` stream. It doesn't
transform the data, it just passes it right through. If you want
to transform the data, extend the class, and override the
`write()` method. Once you're done transforming the data however
you want, call `super.write()` with the transform output.

For some examples of streams that extend Minipass in various
ways, check out:

- [minizlib](http://npm.im/minizlib)
- [fs-minipass](http://npm.im/fs-minipass)
- [tar](http://npm.im/tar)
- [minipass-collect](http://npm.im/minipass-collect)
- [minipass-flush](http://npm.im/minipass-flush)
- [minipass-pipeline](http://npm.im/minipass-pipeline)
- [tap](http://npm.im/tap)
- [tap-parser](http://npm.im/tap-parser)
- [treport](http://npm.im/treport)
- [minipass-fetch](http://npm.im/minipass-fetch)
- [pacote](http://npm.im/pacote)
- [make-fetch-happen](http://npm.im/make-fetch-happen)
- [cacache](http://npm.im/cacache)
- [ssri](http://npm.im/ssri)
- [npm-registry-fetch](http://npm.im/npm-registry-fetch)
- [minipass-json-stream](http://npm.im/minipass-json-stream)
- [minipass-sized](http://npm.im/minipass-sized)

## Usage in TypeScript

The `Minipass` class takes three type parameters:

- `RType` the type being read, which defaults to `Buffer`. If
  `RType` is `string`, then the constructor _must_ get an options
  object specifying either an `encoding` or `objectMode: true`.
  If it's anything other than `string` or `Buffer`, then it
  _must_ get an options object specifying `objectMode: true`.
- `WType` the type being written. If `RType` is `Buffer` or
  `string`, then this defaults to `ContiguousData` (Buffer,
  string, ArrayBuffer, or ArrayBufferView). Otherwise, it
  defaults to `RType`.
- `Events` type mapping event names to the arguments emitted
  with that event, which extends `Minipass.Events`.

To declare types for custom events in subclasses, extend the
third parameter with your own event signatures. For example:

```ts
import { Minipass } from 'minipass'

// an NDJSON stream that emits 'jsonError' when it can't stringify
export interface Events extends Minipass.Events {
  jsonError: [e: Error]
}

export class NDJSONStream extends Minipass<string, any, Events> {
  constructor() {
    super({ objectMode: true })
  }

  // data is type `any` because that's WType
  write(data, encoding, cb) {
    try {
      const json = JSON.stringify(data)
      return super.write(json + '\n', encoding, cb)
    } catch (er) {
      // parenthesized: `!er instanceof Error` would negate `er` first
      if (!(er instanceof Error)) {
        er = Object.assign(new Error('json stringify failed'), {
          cause: er,
        })
      }
      // trying to emit with something OTHER than an error will
      // fail, because we declared the event arguments type.
      this.emit('jsonError', er)
    }
  }
}

const s = new NDJSONStream()
s.on('jsonError', e => {
  // here, TS knows that e is an Error
})
```

Emitting/handling events that aren't declared in this way is
fine, but the arguments will be typed as `unknown`.

## Differences from Node.js Streams

There are several things that make Minipass streams different
from (and in some ways superior to) Node.js core streams.

Please read these caveats if you are familiar with node-core
streams and intend to use Minipass streams in your programs.

You can avoid most of these differences entirely (for a very
small performance penalty) by setting `{async: true}` in the
constructor options.

### Timing

Minipass streams are designed to support synchronous use-cases.
Thus, data is emitted as soon as it is available, always. It is
buffered until read, but no longer. Another way to look at it is
that Minipass streams are exactly as synchronous as the logic
that writes into them.

This can be surprising if your code relies on
`PassThrough.write()` always providing data on the next tick
rather than the current one, or being able to call `resume()` and
not have the entire buffer disappear immediately.

However, without this synchronicity guarantee, there would be no
way for Minipass to achieve the speeds it does, or support the
synchronous use cases that it does. Simply put, waiting takes
time.

This non-deferring approach makes Minipass streams much easier to
reason about, especially in the context of Promises and other
flow-control mechanisms.

Example:

```js
// hybrid module, either works
import { Minipass } from 'minipass'
// or:
// const { Minipass } = require('minipass')

const stream = new Minipass()
stream.on('data', () => console.log('data event'))
console.log('before write')
stream.write('hello')
console.log('after write')
// output:
// before write
// data event
// after write
```

### Exception: Async Opt-In

If you wish to have a Minipass stream with behavior that more
closely mimics Node.js core streams, you can set the stream in
async mode either by setting `async: true` in the constructor
options, or by setting `stream.async = true` later on.

```js
// hybrid module, either works
import { Minipass } from 'minipass'
// or:
// const { Minipass } = require('minipass')

const asyncStream = new Minipass({ async: true })
asyncStream.on('data', () => console.log('data event'))
console.log('before write')
asyncStream.write('hello')
console.log('after write')
// output:
// before write
// after write
// data event <-- this is deferred until the next tick
```

Switching _out_ of async mode is unsafe, as it could cause data
corruption, and so is not supported. Example:

```js
import { Minipass } from 'minipass'
const stream = new Minipass({ encoding: 'utf8' })
stream.on('data', chunk => console.log(chunk))
stream.async = true
console.log('before writes')
stream.write('hello')
setStreamSyncAgainSomehow(stream) // <-- this doesn't actually exist!
stream.write('world')
console.log('after writes')
// hypothetical output would be:
// before writes
// world
// after writes
// hello
// NOT GOOD!
```

To avoid this problem, once set into async mode, any attempt to
make the stream sync again will be ignored.

```js
const { Minipass } = require('minipass')
const stream = new Minipass({ encoding: 'utf8' })
stream.on('data', chunk => console.log(chunk))
stream.async = true
console.log('before writes')
stream.write('hello')
stream.async = false // <-- no-op, stream already async
stream.write('world')
console.log('after writes')
// actual output:
// before writes
// after writes
// hello
// world
```

### No High/Low Water Marks

Node.js core streams will optimistically fill up a buffer,
returning `true` on all writes until the limit is hit, even if
the data has nowhere to go. Then, they will not attempt to draw
more data in until the buffer size dips below a minimum value.

Minipass streams are much simpler. The `write()` method will
return `true` if the data has somewhere to go (which is to say,
given the timing guarantees, that the data is already there by
the time `write()` returns).

If the data has nowhere to go, then `write()` returns `false`, and
the data sits in a buffer, to be drained out immediately as soon
as anyone consumes it.

Since nothing is ever buffered unnecessarily, there is much less
copying data, and less bookkeeping about buffer capacity levels.

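The core-stream half of this comparison can be observed with a small sketch that uses only Node's standard library. The `stalled` sink below is a hypothetical `Writable` whose `write` implementation never calls back, standing in for a destination where the data has nowhere to go.

```js
const { Writable } = require('node:stream')

// Hypothetical stalled sink: it accepts chunks but never signals completion.
const stalled = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    // intentionally never calls callback()
  },
})

const results = []
for (let i = 0; i < 4; i++) {
  results.push(stalled.write(Buffer.alloc(1)))
}

// write() keeps returning true until the buffered length reaches the mark.
console.log(results) // → [ true, true, true, false ]
console.log(stalled.writableLength) // → 4
```

The first three writes report `true` even though nothing was consumed, which is the optimistic buffering that Minipass avoids.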
### Hazards of Buffering (or: Why Minipass Is So Fast)

Since data written to a Minipass stream is immediately written
all the way through the pipeline, and `write()` always returns
true/false based on whether the data was fully flushed,
backpressure is communicated immediately to the upstream caller.
This minimizes buffering.

Consider this case:

```js
const { PassThrough } = require('stream')
const p1 = new PassThrough({ highWaterMark: 1024 })
const p2 = new PassThrough({ highWaterMark: 1024 })
const p3 = new PassThrough({ highWaterMark: 1024 })
const p4 = new PassThrough({ highWaterMark: 1024 })

p1.pipe(p2).pipe(p3).pipe(p4)
p4.on('data', () => console.log('made it through'))

// this returns false and buffers, then writes to p2 on next tick (1)
// p2 returns false and buffers, pausing p1, then writes to p3 on next tick (2)
// p3 returns false and buffers, pausing p2, then writes to p4 on next tick (3)
// p4 returns false and buffers, pausing p3, then emits 'data' and 'drain'
// on next tick (4)
// p3 sees p4's 'drain' event, and calls resume(), emitting 'resume' and
// 'drain' on next tick (5)
// p2 sees p3's 'drain', calls resume(), emits 'resume' and 'drain' on next tick (6)
// p1 sees p2's 'drain', calls resume(), emits 'resume' and 'drain' on next
// tick (7)

p1.write(Buffer.alloc(2048)) // returns false
```

Along the way, the data was buffered and deferred at each stage,
and multiple event deferrals happened, for an unblocked pipeline
where it was perfectly safe to write all the way through!

Furthermore, setting a `highWaterMark` of `1024` might lead
someone reading the code to think an advisory maximum of 1KiB is
being set for the pipeline. However, the actual advisory
buffering level is the _sum_ of `highWaterMark` values, since
each one has its own bucket.

Consider the Minipass case:

```js
const { Minipass } = require('minipass')
const m1 = new Minipass()
const m2 = new Minipass()
const m3 = new Minipass()
const m4 = new Minipass()

m1.pipe(m2).pipe(m3).pipe(m4)
m4.on('data', () => console.log('made it through'))

// m1 is flowing, so it writes the data to m2 immediately
// m2 is flowing, so it writes the data to m3 immediately
// m3 is flowing, so it writes the data to m4 immediately
// m4 is flowing, so it fires the 'data' event immediately, returns true
// m4's write returned true, so m3 is still flowing, returns true
// m3's write returned true, so m2 is still flowing, returns true
// m2's write returned true, so m1 is still flowing, returns true
// No event deferrals or buffering along the way!

m1.write(Buffer.alloc(2048)) // returns true
```

It is extremely unlikely that you would want written data to go
unbuffered when it has nowhere to go, or to be buffered when it
could be flushed all the way through. Neither node-core streams
nor Minipass ever fail to buffer written data, but node-core
streams do a lot of unnecessary buffering and pausing.

As always, the faster implementation is the one that does less
stuff and waits less time to do it.

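The point about summed `highWaterMark` values can be checked with a short sketch against Node's core streams. The `stages` array is just an illustrative way to hold the four `PassThrough` instances from the example above. Note that each `PassThrough` carries the mark on both its writable and its readable side.

```js
const { PassThrough } = require('node:stream')

// Four stages, each configured with what looks like a 1 KiB limit.
const stages = Array.from(
  { length: 4 },
  () => new PassThrough({ highWaterMark: 1024 })
)

// Each stage has its own bucket, so the advisory limits add up.
const writableTotal = stages.reduce(
  (sum, s) => sum + s.writableHighWaterMark,
  0
)
const readableTotal = stages.reduce(
  (sum, s) => sum + s.readableHighWaterMark,
  0
)

console.log(writableTotal, readableTotal) // → 4096 4096
```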
### Immediately emit `end` for empty streams (when not paused)

If a stream is not paused, and `end()` is called before writing
any data into it, then it will emit `end` immediately.

If you have logic that occurs on the `end` event which you don't
want to potentially happen immediately (for example, closing file
descriptors, moving on to the next entry in an archive parse
stream, etc.) then be sure to call `stream.pause()` on creation,
and then `stream.resume()` once you are ready to respond to the
`end` event.

However, this is _usually_ not a problem because:

### Emit `end` When Asked

One hazard of immediately emitting `'end'` is that you may not
yet have had a chance to add a listener. In order to avoid this
hazard, Minipass streams safely re-emit the `'end'` event if a
new listener is added after `'end'` has been emitted.

That is, if you do `stream.on('end', someFunction)`, and the stream
has already emitted `end`, then it will call the handler right
away. (You can think of this somewhat like attaching a new
`.then(fn)` to a previously-resolved Promise.)

To avoid calling handlers more than once when they do not expect
multiple `'end'` events, all listeners are removed from the
`'end'` event whenever it is emitted.

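The re-emit rule can be illustrated with a toy model built on Node's `EventEmitter`. This is a sketch of the behavior described above, not Minipass's actual source, and the class name `ReplayEnd` is hypothetical.

```js
const { EventEmitter } = require('node:events')

// Toy model only: remembers that 'end' happened and replays it for
// listeners that arrive late, clearing 'end' listeners after each emit.
class ReplayEnd extends EventEmitter {
  #ended = false

  emit(ev, ...args) {
    if (ev !== 'end') return super.emit(ev, ...args)
    this.#ended = true
    const handled = super.emit('end', ...args)
    // drop listeners so none of them can see a second 'end'
    this.removeAllListeners('end')
    return handled
  }

  on(ev, fn) {
    super.on(ev, fn)
    // a late 'end' listener is called right away
    if (ev === 'end' && this.#ended) this.emit('end')
    return this
  }
}

const calls = []
const s = new ReplayEnd()
s.on('end', () => calls.push('early'))
s.emit('end')
s.on('end', () => calls.push('late'))
console.log(calls) // → [ 'early', 'late' ]
```

The early handler runs once even though `'end'` is emitted twice, because it was removed after the first emit.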
### Emit `error` When Asked

The most recent error object passed to the `'error'` event is
stored on the stream. If a new `'error'` event handler is added,
and an error was previously emitted, then the event handler will
be called immediately (or on `process.nextTick` in the case of
async streams).

This makes it much more difficult to end up trying to interact
with a broken stream, if the error handler is added after an
error was previously emitted.

### Impact of "immediate flow" on Tee-streams

A "tee stream" is a stream piping to multiple destinations:

```js
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
tee.write('foo') // goes to both destinations
```

Since Minipass streams _immediately_ process any pending data
through the pipeline when a new pipe destination is added, this
can have surprising effects, especially when a stream comes in
from some other function and may or may not have data in its
buffer.

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
src.pipe(dest2) // gets nothing!
```

One solution is to create a dedicated tee-stream junction that
pipes to both locations, and then pipe to _that_ instead.

```js
// Safe example: tee to both places
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.pipe(dest1)
tee.pipe(dest2)
src.pipe(tee) // tee gets 'foo', pipes to both locations
```

The same caveat applies to `on('data')` event listeners. The
first one added will _immediately_ receive all of the data,
leaving nothing for the second:

```js
// WARNING! WILL LOSE DATA!
const src = new Minipass()
src.write('foo')
src.on('data', handler1) // receives 'foo' right away
src.on('data', handler2) // nothing to see here!
```

A dedicated tee-stream can be used in this case as well:

```js
// Safe example: tee to both data handlers
const src = new Minipass()
src.write('foo')
const tee = new Minipass()
tee.on('data', handler1)
tee.on('data', handler2)
src.pipe(tee)
```

All of the hazards in this section are avoided by setting `{
async: true }` in the Minipass constructor, or by setting
`stream.async = true` afterwards. Note that this does add some
overhead, so should only be done in cases where you are willing
to lose a bit of performance in order to avoid having to refactor
program logic.

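The data-loss hazard and the junction fix can be reproduced with a toy model that needs nothing beyond Node's `EventEmitter`. The `TinyFlow` class is hypothetical and far simpler than Minipass. It assumes only the rule described above: buffered chunks drain to whichever listeners exist at the moment the first one attaches.

```js
const { EventEmitter } = require('node:events')

// Toy model only: buffers until a 'data' listener appears, then drains
// the whole buffer to the listeners present at that moment.
class TinyFlow extends EventEmitter {
  #buffer = []

  write(chunk) {
    if (this.listenerCount('data') > 0) this.emit('data', chunk)
    else this.#buffer.push(chunk)
  }

  on(ev, fn) {
    super.on(ev, fn)
    if (ev === 'data') {
      for (const chunk of this.#buffer.splice(0)) this.emit('data', chunk)
    }
    return this
  }
}

// Hazard: the first listener drains the buffer before the second exists.
const first = []
const second = []
const src = new TinyFlow()
src.write('foo')
src.on('data', c => first.push(c))
src.on('data', c => second.push(c))
console.log(first, second) // → [ 'foo' ] []

// Fix: attach both listeners to a junction, then connect the source.
const a = []
const b = []
const src2 = new TinyFlow()
src2.write('foo')
const tee = new TinyFlow()
tee.on('data', c => a.push(c))
tee.on('data', c => b.push(c))
src2.on('data', c => tee.write(c))
console.log(a, b) // → [ 'foo' ] [ 'foo' ]
```

The order of attachment is the whole fix: the junction has all its consumers before any data reaches it.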
## USAGE

It's a stream! Use it like a stream and it'll most likely do what
you want.

```js
import { Minipass } from 'minipass'
const mp = new Minipass(options) // options is optional
mp.write('foo')
mp.pipe(someOtherStream)
mp.end('bar')
```

### OPTIONS

- `encoding` How would you like the data coming _out_ of the
  stream to be encoded? Accepts any values that can be passed to
  `Buffer.toString()`.
- `objectMode` Emit data exactly as it comes in. This can only be
  set at instantiation; writing something other than a string or
  Buffer without it will throw an error. Setting
  `objectMode: true` will prevent setting any encoding value.
- `async` Defaults to `false`. Set to `true` to defer data
  emission until next tick. This reduces performance slightly,
  but makes Minipass streams use timing behavior closer to Node
  core streams. See [Timing](#timing) for more details.
- `signal` An `AbortSignal` that will cause the stream to unhook
  itself from everything and become as inert as possible. Note
  that providing a `signal` parameter will make `'error'` events
  no longer throw if they are unhandled, but they will still be
  emitted to handlers if any are attached.

### API

Implements the user-facing portions of Node.js's `Readable` and
`Writable` streams.

### Methods

- `write(chunk, [encoding], [callback])` - Put data in. (Note
  that, in the base Minipass class, the same data will come out.)
  Returns `false` if the stream will buffer the next write, or
  `true` if it's still in "flowing" mode.
- `end([chunk, [encoding]], [callback])` - Signal that you have
  no more data to write. This will queue an `end` event to be
  fired when all the data has been consumed.
- `pause()` - No more data for a while, please. This also
  prevents `end` from being emitted for empty streams until the
  stream is resumed.
- `resume()` - Resume the stream. If there's data in the buffer,
  it is all discarded. Any buffered events are immediately
  emitted.
- `pipe(dest, [options])` - Send all output to the stream provided.
  When data is emitted, it is immediately written to any and all
  pipe destinations. (Or written on next tick in `async` mode.)
  - `options.end` - Boolean, end the destination stream when the
    source stream ends. Default `true`.
  - `options.proxyErrors` - Boolean, proxy `error` events from
    the source stream to the destination stream. Note that errors
    are _not_ proxied after the pipeline terminates, either due
    to the source emitting `'end'` or manually unpiping with
    `src.unpipe(dest)`. Default `false`.
- `unpipe(dest)` - Stop piping to the destination stream. This is
  immediate, meaning that any asynchronously queued data will
  _not_ make it to the destination when running in `async` mode.
- `on(ev, fn)`, `emit(ev, ...data)` - Minipass streams are
  EventEmitters. Some events are given special treatment,
  however. (See below under "events".)
- `promise()` - Returns a Promise that resolves when the stream
  emits `end`, or rejects if the stream emits `error`.
- `collect()` - Return a Promise that resolves on `end` with an
  array containing each chunk of data that was emitted, or
  rejects if the stream emits `error`. Note that this consumes
  the stream data.
- `concat()` - Same as `collect()`, but concatenates the data
  into a single Buffer object. Will reject the returned promise
  if the stream is in objectMode.
- `read(n)` - Consume `n` bytes of data out of the buffer. If `n`
  is not provided, then consume all of it. If `n` bytes are not
  available, then it returns null. **Note** consuming streams in
  this way is less efficient, and can lead to unnecessary Buffer
  copying.
- `destroy([er])` - Destroy the stream. If an error is provided,
  then an `'error'` event is emitted. If the stream has a
  `close()` method, and has not emitted a `'close'` event yet,
  then `stream.close()` will be called. Any Promises returned by
  `.promise()`, `.collect()` or `.concat()` will be rejected.
  After being destroyed, writing to the stream will emit an
  error. No more data will be emitted if the stream is destroyed,
  even if it was previously buffered.

### Properties

- `bufferLength` Read-only. Total number of bytes buffered, or in
  the case of objectMode, the total number of objects.
- `encoding` Read-only. The encoding that has been set.
- `flowing` Read-only. Boolean indicating whether a chunk written
  to the stream will be immediately emitted.
- `emittedEnd` Read-only. Boolean indicating whether the end-ish
  events (ie, `end`, `prefinish`, `finish`) have been emitted.
  Note that listening on any end-ish event will immediately
  re-emit it if it has already been emitted.
- `writable` Whether the stream is writable. Default `true`. Set
  to `false` when `end()` is called.
- `readable` Whether the stream is readable. Default `true`.
- `pipes` An array of Pipe objects referencing streams that this
  stream is piping into.
- `destroyed` A getter that indicates whether the stream was
  destroyed.
- `paused` True if the stream has been explicitly paused,
  otherwise false.
- `objectMode` Indicates whether the stream is in `objectMode`.
- `aborted` Read-only property set when the `AbortSignal`
  dispatches an `abort` event.

### Events

- `data` Emitted when there's data to read. Argument is the data
  to read. This is never emitted while not flowing. If a listener
  is attached, that will resume the stream.
- `end` Emitted when there's no more data to read. This will be
  emitted immediately for empty streams when `end()` is called.
  If a listener is attached, and `end` was already emitted, then
  it will be emitted again. All listeners are removed when `end`
  is emitted.
- `prefinish` An end-ish event that follows the same logic as
  `end` and is emitted in the same conditions where `end` is
  emitted. Emitted after `'end'`.
- `finish` An end-ish event that follows the same logic as `end`
  and is emitted in the same conditions where `end` is emitted.
  Emitted after `'prefinish'`.
- `close` An indication that an underlying resource has been
  released. Minipass does not emit this event, but will defer it
  until after `end` has been emitted, since it throws off some
  stream libraries otherwise.
- `drain` Emitted when the internal buffer empties, and it is
  again suitable to `write()` into the stream.
- `readable` Emitted when data is buffered and ready to be read
  by a consumer.
- `resume` Emitted when stream changes state from buffering to
  flowing mode. (Ie, when `resume` is called, `pipe` is called,
  or a `data` event listener is added.)

### Static Methods

- `Minipass.isStream(stream)` Returns `true` if the argument is a
  stream, and false otherwise. To be considered a stream, the
  object must be either an instance of Minipass, or an
  EventEmitter that has either a `pipe()` method, or both
  `write()` and `end()` methods. (Pretty much any stream in
  node-land will return `true` for this.)

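The rule in that description can be written down as a small predicate. This is a sketch of the stated conditions only: the name `isStreamLike` is hypothetical, the `instanceof Minipass` branch is left out so the snippet needs nothing beyond Node's standard library, and the real `Minipass.isStream` may apply further checks.

```js
const { EventEmitter } = require('node:events')
const { PassThrough } = require('node:stream')

// An EventEmitter with pipe(), or with both write() and end().
const isStreamLike = s =>
  s instanceof EventEmitter &&
  (typeof s.pipe === 'function' ||
    (typeof s.write === 'function' && typeof s.end === 'function'))

console.log(isStreamLike(new PassThrough())) // → true
console.log(isStreamLike(new EventEmitter())) // → false
console.log(isStreamLike({ pipe() {} })) // → false (not an EventEmitter)
```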
## EXAMPLES

Here are some examples of things you can do with Minipass
streams.

### simple "are you done yet" promise

```js
mp.promise().then(
  () => {
    // stream is finished
  },
  er => {
    // stream emitted an error
  }
)
```

### collecting

```js
mp.collect().then(all => {
  // all is an array of all the data emitted
  // encoding is supported in this case, so
  // the result will be a collection of strings if
  // an encoding is specified, or buffers/objects if not.
  //
  // In an async function, you may do
  // const data = await stream.collect()
})
```

### collecting into a single blob

This is a bit slower because it concatenates the data into one
chunk for you, but if you're going to do it yourself anyway, it's
convenient this way:

```js
mp.concat().then(onebigchunk => {
  // onebigchunk is a string if the stream
  // had an encoding set, or a buffer otherwise.
})
```

### iteration

You can iterate over streams synchronously or asynchronously in
platforms that support it.

Synchronous iteration will end when the currently available data
is consumed, even if the `end` event has not been reached. In
string and buffer mode, the data is concatenated, so unless
multiple writes are occurring in the same tick as the `read()`,
sync iteration loops will generally only have a single iteration.

To consume chunks in this way exactly as they have been written,
with no flattening, create the stream with the `{ objectMode:
true }` option.

```js
const mp = new Minipass({ objectMode: true })
mp.write('a')
mp.write('b')
for (let letter of mp) {
  console.log(letter) // a, b
}
mp.write('c')
mp.write('d')
for (let letter of mp) {
  console.log(letter) // c, d
}
mp.write('e')
mp.end()
for (let letter of mp) {
  console.log(letter) // e
}
for (let letter of mp) {
  console.log(letter) // nothing
}
```

Asynchronous iteration will continue until the end event is reached,
consuming all of the data.

```js
const mp = new Minipass({ encoding: 'utf8' })

// some source of some data
let i = 5
const inter = setInterval(() => {
  if (i-- > 0) mp.write(Buffer.from('foo\n', 'utf8'))
  else {
    mp.end()
    clearInterval(inter)
  }
}, 100)

// consume the data with asynchronous iteration
async function consume() {
  for await (let chunk of mp) {
    console.log(chunk)
  }
  return 'ok'
}

consume().then(res => console.log(res))
// logs `foo\n` 5 times, and then `ok`
```

### subclass that `console.log()`s everything written into it

```js
class Logger extends Minipass {
  write(chunk, encoding, callback) {
    console.log('WRITE', chunk, encoding)
    return super.write(chunk, encoding, callback)
  }
  end(chunk, encoding, callback) {
    console.log('END', chunk, encoding)
    return super.end(chunk, encoding, callback)
  }
}

someSource.pipe(new Logger()).pipe(someDest)
```

### same thing, but using an inline anonymous class

```js
// js classes are fun
someSource
  .pipe(
    new (class extends Minipass {
      emit(ev, ...data) {
        // let's also log events, because debugging some weird thing
        console.log('EMIT', ev)
        return super.emit(ev, ...data)
      }
      write(chunk, encoding, callback) {
        console.log('WRITE', chunk, encoding)
        return super.write(chunk, encoding, callback)
      }
      end(chunk, encoding, callback) {
        console.log('END', chunk, encoding)
        return super.end(chunk, encoding, callback)
      }
    })()
  )
  .pipe(someDest)
```

### subclass that defers 'end' for some reason

```js
class SlowEnd extends Minipass {
  emit(ev, ...args) {
    if (ev === 'end') {
      console.log('going to end, hold on a sec')
      setTimeout(() => {
        console.log('ok, ready to end now')
        super.emit('end', ...args)
      }, 100)
      return true
    } else {
      return super.emit(ev, ...args)
    }
  }
}
```

### transform that creates newline-delimited JSON

```js
class NDJSONEncode extends Minipass {
  write(obj, cb) {
    try {
      // JSON.stringify can throw, emit an error on that
      return super.write(JSON.stringify(obj) + '\n', 'utf8', cb)
    } catch (er) {
      this.emit('error', er)
    }
  }
  end(obj, cb) {
    if (typeof obj === 'function') {
      cb = obj
      obj = undefined
    }
    if (obj !== undefined) {
      this.write(obj)
    }
    return super.end(cb)
  }
}
```

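The `try`/`catch` in the encoder matters because `JSON.stringify` throws on some inputs, such as objects with circular references. A minimal standalone sketch of that failure mode follows. The helper name `toNDJSONLine` and its `{ ok, line, error }` result shape are illustrative assumptions, not part of Minipass.

```js
// Hypothetical helper: encode one value as an NDJSON line, capturing
// the error instead of throwing.
function toNDJSONLine(value) {
  try {
    return { ok: true, line: JSON.stringify(value) + '\n' }
  } catch (er) {
    return { ok: false, error: er }
  }
}

const circular = {}
circular.self = circular

console.log(toNDJSONLine({ a: 1 }).line === '{"a":1}\n') // → true
console.log(toNDJSONLine(circular).error instanceof TypeError) // → true
```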
### transform that parses newline-delimited JSON

```js
class NDJSONDecode extends Minipass {
  constructor(options) {
    // always be in object mode, as far as Minipass is concerned
    super({ objectMode: true })
    this._jsonBuffer = ''
  }
  write(chunk, encoding, cb) {
    if (
      typeof chunk === 'string' &&
      typeof encoding === 'string' &&
      encoding !== 'utf8'
    ) {
      chunk = Buffer.from(chunk, encoding).toString()
    } else if (Buffer.isBuffer(chunk)) {
      chunk = chunk.toString()
    }
    if (typeof encoding === 'function') {
      cb = encoding
    }
    const jsonData = (this._jsonBuffer + chunk).split('\n')
    this._jsonBuffer = jsonData.pop()
    for (let i = 0; i < jsonData.length; i++) {
      try {
        // JSON.parse can throw, emit an error on that
        super.write(JSON.parse(jsonData[i]))
      } catch (er) {
        this.emit('error', er)
        continue
      }
    }
    if (cb) cb()
  }
}
```
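The line-splitting step at the heart of the decoder can be exercised on its own, without any stream. The sketch below pulls that step into a hypothetical `splitLines` helper and feeds it chunks that cut JSON records in the middle, to show how the remainder is carried between writes.

```js
// Split previously buffered text plus a new chunk into complete lines
// and a remainder (the text after the last '\n', possibly empty).
function splitLines(buffered, chunk) {
  const parts = (buffered + chunk).split('\n')
  const rest = parts.pop()
  return { lines: parts, rest }
}

let rest = ''
const parsed = []
// chunk boundaries deliberately fall inside records
for (const chunk of ['{"a":1}\n{"b"', ':2}\n{"c":3', '}\n']) {
  const out = splitLines(rest, chunk)
  rest = out.rest
  for (const line of out.lines) parsed.push(JSON.parse(line))
}

console.log(parsed) // → [ { a: 1 }, { b: 2 }, { c: 3 } ]
console.log(rest === '') // → true
```

A record is only parsed once its terminating newline has arrived, so partial records never reach `JSON.parse`.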