458 lines
13 KiB
JavaScript
458 lines
13 KiB
JavaScript
|
'use strict'
|
||
|
|
||
|
const AbortController = globalThis.AbortController || require('abort-controller').AbortController
|
||
|
const {
|
||
|
codes: { ERR_INVALID_ARG_VALUE, ERR_INVALID_ARG_TYPE, ERR_MISSING_ARGS, ERR_OUT_OF_RANGE },
|
||
|
AbortError
|
||
|
} = require('../../ours/errors')
|
||
|
const { validateAbortSignal, validateInteger, validateObject } = require('../validators')
|
||
|
const kWeakHandler = require('../../ours/primordials').Symbol('kWeak')
|
||
|
const kResistStopPropagation = require('../../ours/primordials').Symbol('kResistStopPropagation')
|
||
|
const { finished } = require('./end-of-stream')
|
||
|
const staticCompose = require('./compose')
|
||
|
const { addAbortSignalNoValidate } = require('./add-abort-signal')
|
||
|
const { isWritable, isNodeStream } = require('./utils')
|
||
|
const { deprecate } = require('../../ours/util')
|
||
|
const {
|
||
|
ArrayPrototypePush,
|
||
|
Boolean,
|
||
|
MathFloor,
|
||
|
Number,
|
||
|
NumberIsNaN,
|
||
|
Promise,
|
||
|
PromiseReject,
|
||
|
PromiseResolve,
|
||
|
PromisePrototypeThen,
|
||
|
Symbol
|
||
|
} = require('../../ours/primordials')
|
||
|
// Sentinel meaning "no output for this chunk": the mapper wrappers used by
// filter() and forEach() return it, and map() skips it instead of yielding it.
const kEmpty = Symbol('kEmpty')
|
||
|
// Sentinel that map() appends to its internal queue once the source stream has
// been fully iterated; the consuming loop returns when it dequeues it.
const kEof = Symbol('kEof')
|
||
|
/**
 * Combines this stream with `stream` through the static compose helper.
 *
 * @param {*} stream - Stream (or other value accepted by the static helper) to
 *   compose with; a Node stream passed here must be writable.
 * @param {{ signal?: AbortSignal }} [options] - Optional settings. When
 *   `signal` is provided it is validated and attached to the composed stream.
 * @returns {*} The value returned by the static compose helper.
 * @throws {ERR_INVALID_ARG_VALUE} If `stream` is a Node stream that is not writable.
 */
function compose(stream, options) {
  const hasOptions = options !== null && options !== undefined

  if (hasOptions) {
    validateObject(options, 'options')
  }
  if ((hasOptions ? options.signal : undefined) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  const isNonWritableNodeStream = isNodeStream(stream) && !isWritable(stream)
  if (isNonWritableNodeStream) {
    throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable')
  }

  const composedStream = staticCompose(this, stream)

  // The signal was validated above, so the non-validating variant is enough.
  if (hasOptions && options.signal) {
    addAbortSignalNoValidate(options.signal, composedStream)
  }

  return composedStream
}
|
||
|
/**
 * Returns an async generator that calls `fn(chunk, { signal })` for every
 * chunk of this stream and yields the resolved results in source order.
 *
 * A producer loop ("pump") reads the source and queues one promise per chunk;
 * the generator body consumes the queue from the front. The producer pauses
 * while the queue holds `highWaterMark` entries or `concurrency` calls are
 * still pending, and is woken again as results settle or are consumed.
 *
 * @param {Function} fn - Sync or async mapper. Results equal to `kEmpty` are
 *   not yielded.
 * @param {object} [options]
 * @param {AbortSignal} [options.signal] - Aborts the iteration.
 * @param {number} [options.concurrency=1] - Floored; must be an integer >= 1.
 * @param {number} [options.highWaterMark] - Floored; must be an integer >= 0.
 *   Defaults to `concurrency - 1`; `concurrency` is then added to it to get
 *   the queue limit.
 * @returns {AsyncGenerator} Generator bound to this stream.
 * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function.
 */
function map(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }
  if (options != null) {
    validateObject(options, 'options')
  }

  const hasOptions = options !== null && options !== undefined

  if (hasOptions && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let concurrency = 1
  if (hasOptions && options.concurrency != null) {
    concurrency = MathFloor(options.concurrency)
  }

  let highWaterMark = concurrency - 1
  if (hasOptions && options.highWaterMark != null) {
    highWaterMark = MathFloor(options.highWaterMark)
  }

  validateInteger(concurrency, 'options.concurrency', 1)
  validateInteger(highWaterMark, 'options.highWaterMark', 0)

  // The queue limit is the validated highWaterMark plus the concurrency.
  highWaterMark += concurrency

  return async function* map() {
    const signal = require('../../ours/util').AbortSignalAny(
      [hasOptions ? options.signal : undefined].filter(Boolean)
    )
    const source = this
    const pending = []
    const signalOpt = { signal }

    let wakeConsumer // resolver set while the consumer waits for queue entries
    let wakeProducer // resolver set while the producer waits for capacity
    let stopped = false
    let inFlight = 0

    // A mapper rejection stops the producer; the rejected promise is already
    // queued, so the consumer rethrows it when it reaches that entry.
    function onCatch() {
      stopped = true
      afterItemProcessed()
    }

    function afterItemProcessed() {
      inFlight -= 1
      maybeResume()
    }

    function maybeResume() {
      const hasCapacity = inFlight < concurrency && pending.length < highWaterMark
      if (wakeProducer && !stopped && hasCapacity) {
        wakeProducer()
        wakeProducer = null
      }
    }

    async function pump() {
      try {
        for await (const chunk of source) {
          if (stopped) {
            return
          }
          if (signal.aborted) {
            throw new AbortError()
          }

          let mapped
          try {
            mapped = fn(chunk, signalOpt)
            if (mapped === kEmpty) {
              continue
            }
            mapped = PromiseResolve(mapped)
          } catch (err) {
            // A synchronous throw is queued as a rejection like any other.
            mapped = PromiseReject(err)
          }

          inFlight += 1
          PromisePrototypeThen(mapped, afterItemProcessed, onCatch)
          pending.push(mapped)

          if (wakeConsumer) {
            wakeConsumer()
            wakeConsumer = null
          }

          if (!stopped && (pending.length >= highWaterMark || inFlight >= concurrency)) {
            await new Promise((resolve) => {
              wakeProducer = resolve
            })
          }
        }
        pending.push(kEof)
      } catch (err) {
        // Source or abort errors are delivered to the consumer via the queue.
        const failure = PromiseReject(err)
        PromisePrototypeThen(failure, afterItemProcessed, onCatch)
        pending.push(failure)
      } finally {
        stopped = true
        if (wakeConsumer) {
          wakeConsumer()
          wakeConsumer = null
        }
      }
    }

    pump()

    try {
      for (;;) {
        while (pending.length > 0) {
          const val = await pending[0]
          if (val === kEof) {
            return
          }
          if (signal.aborted) {
            throw new AbortError()
          }
          if (val !== kEmpty) {
            yield val
          }
          pending.shift()
          maybeResume()
        }

        await new Promise((resolve) => {
          wakeConsumer = resolve
        })
      }
    } finally {
      // Release a producer that is still waiting so its loop can exit.
      stopped = true
      if (wakeProducer) {
        wakeProducer()
        wakeProducer = null
      }
    }
  }.call(this)
}
|
||
|
/**
 * Returns an async generator yielding `[index, chunk]` pairs for this stream,
 * with `index` starting at 0 and increasing by one per chunk.
 *
 * @param {{ signal?: AbortSignal }} [options] - Optional settings.
 * @returns {AsyncGenerator<[number, *]>} Generator bound to this stream.
 * @throws {AbortError} From the generator, when `options.signal` is found
 *   aborted after a chunk is read; the signal's reason is attached as `cause`.
 */
function asIndexedPairs(options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }
  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  return async function* asIndexedPairs() {
    let index = 0
    for await (const val of this) {
      const signal = options === null || options === undefined ? undefined : options.signal
      if (signal !== null && signal !== undefined && signal.aborted) {
        // The options bag is the SECOND argument, matching the
        // `new AbortError(undefined, { cause })` calls in reduce() and
        // toArray(). Passing it first used it as the message and dropped the
        // cause.
        throw new AbortError(undefined, {
          cause: signal.reason
        })
      }
      yield [index++, val]
    }
  }.call(this)
}
|
||
|
/**
 * Resolves to `true` as soon as `fn` passes for one chunk of this stream, and
 * to `false` if the stream ends without a passing chunk.
 *
 * @param {Function} fn - Predicate, forwarded to filter().
 * @param {object} [options] - Forwarded to filter().
 * @returns {Promise<boolean>}
 */
async function some(fn, options = undefined) {
  const matches = filter.call(this, fn, options)

  // The first filtered value is enough; leaving the loop ends the iteration.
  for await (const unused of matches) {
    return true
  }

  return false
}
|
||
|
/**
 * Resolves to `true` when `fn` passes for every chunk of this stream.
 *
 * Implemented through some() with a negated predicate:
 * "every chunk passes" is the same as "no chunk fails".
 * https://en.wikipedia.org/wiki/De_Morgan%27s_laws
 *
 * @param {Function} fn - Sync or async predicate.
 * @param {object} [options] - Forwarded to some().
 * @returns {Promise<boolean>}
 * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function.
 */
async function every(fn, options = undefined) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  const negated = async (...args) => {
    const passed = await fn(...args)
    return !passed
  }

  const anyFailed = await some.call(this, negated, options)
  return !anyFailed
}
|
||
|
/**
 * Resolves to the first chunk of this stream for which `fn` passes, or to
 * `undefined` when no chunk does.
 *
 * @param {Function} fn - Predicate, forwarded to filter().
 * @param {object} [options] - Forwarded to filter().
 * @returns {Promise<*>}
 */
async function find(fn, options) {
  const matches = filter.call(this, fn, options)

  for await (const match of matches) {
    return match
  }

  return undefined
}
|
||
|
/**
 * Calls `fn(chunk, options)` for every chunk of this stream and resolves once
 * the stream has been fully consumed.
 *
 * @param {Function} fn - Sync or async callback; its result is ignored.
 * @param {object} [options] - Forwarded to map().
 * @returns {Promise<void>}
 * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function.
 */
async function forEach(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  // Always resolve to kEmpty so that map() yields nothing for any chunk.
  const visit = async (value, opts) => {
    await fn(value, opts)
    return kEmpty
  }

  // Drain the mapped iterator purely for its side effects.
  // eslint-disable-next-line no-unused-vars
  for await (const unused of map.call(this, visit, options)) {
    // nothing to do per item
  }
}
|
||
|
/**
 * Returns an async generator yielding only the chunks of this stream for
 * which `fn` resolves to a truthy value.
 *
 * @param {Function} fn - Sync or async predicate called as `fn(chunk, options)`.
 * @param {object} [options] - Forwarded to map().
 * @returns {AsyncGenerator} The generator produced by map().
 * @throws {ERR_INVALID_ARG_TYPE} If `fn` is not a function.
 */
function filter(fn, options) {
  if (typeof fn !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('fn', ['Function', 'AsyncFunction'], fn)
  }

  // Keep the chunk itself when the predicate passes; otherwise hand map() the
  // kEmpty sentinel so that nothing is yielded for it.
  const keepIfMatching = async (value, opts) => {
    const passed = await fn(value, opts)
    return passed ? value : kEmpty
  }

  return map.call(this, keepIfMatching, options)
}
|
||
|
|
||
|
/**
 * ERR_MISSING_ARGS variant thrown by reduce().
 *
 * The initial value only counts as "missing" when the stream produced no
 * items, so the generic error code still fits; the message is replaced with
 * one that explains the actual situation.
 */
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
  constructor() {
    super('reduce')

    // Override the generic message produced by the parent constructor.
    this.message = 'Reduce of an empty stream requires an initial value'
  }
}
|
||
|
/**
 * Folds every chunk of this stream into a single value.
 *
 * When no initial value is passed (fewer than two arguments), the first chunk
 * seeds the accumulator and `reducer` is first called for the second chunk.
 *
 * @param {Function} reducer - Called as
 *   `reducer(accumulator, chunk, { signal })`; may be async. `signal` is
 *   aborted when `options.signal` aborts and always once reduce() finishes.
 * @param {*} [initialValue] - Starting accumulator.
 * @param {{ signal?: AbortSignal }} [options] - Optional settings.
 * @returns {Promise<*>} The final accumulator.
 * @throws {ERR_INVALID_ARG_TYPE} If `reducer` is not a function.
 * @throws {AbortError} If `options.signal` is aborted before or during
 *   iteration; the signal's reason is attached as `cause`.
 * @throws {ReduceAwareErrMissingArgs} If the stream is empty and no initial
 *   value was given.
 */
async function reduce(reducer, initialValue, options) {
  if (typeof reducer !== 'function') {
    throw new ERR_INVALID_ARG_TYPE('reducer', ['Function', 'AsyncFunction'], reducer)
  }
  if (options != null) {
    validateObject(options, 'options')
  }

  const userSignal = options === null || options === undefined ? undefined : options.signal
  if (userSignal != null) {
    validateAbortSignal(userSignal, 'options.signal')
  }

  // `arguments` (not a default value) is needed to tell an explicit
  // `undefined` initial value apart from an omitted one.
  let hasInitialValue = arguments.length > 1

  if (userSignal != null && userSignal.aborted) {
    const err = new AbortError(undefined, {
      cause: userSignal.reason
    })
    this.once('error', () => {}) // The error is already propagated
    await finished(this.destroy(err))
    throw err
  }

  const ac = new AbortController()
  const signal = ac.signal

  // Forward a user abort to the signal handed to the reducer. The listener is
  // kept in a variable so that it can be detached again in `finally`.
  let onUserAbort = null
  if (userSignal != null) {
    const opts = {
      once: true,
      [kWeakHandler]: this,
      [kResistStopPropagation]: true
    }
    onUserAbort = () => ac.abort()
    userSignal.addEventListener('abort', onUserAbort, opts)
  }

  let gotAnyItemFromStream = false
  try {
    for await (const value of this) {
      gotAnyItemFromStream = true

      if (userSignal != null && userSignal.aborted) {
        // Carry the abort reason, as the pre-iteration check above does.
        throw new AbortError(undefined, {
          cause: userSignal.reason
        })
      }

      if (!hasInitialValue) {
        initialValue = value
        hasInitialValue = true
      } else {
        initialValue = await reducer(initialValue, value, {
          signal
        })
      }
    }

    if (!gotAnyItemFromStream && !hasInitialValue) {
      throw new ReduceAwareErrMissingArgs()
    }
  } finally {
    ac.abort()

    // Detach from the caller's signal so that a long-lived signal does not
    // accumulate one listener per reduce() call.
    if (onUserAbort !== null && typeof userSignal.removeEventListener === 'function') {
      userSignal.removeEventListener('abort', onUserAbort)
    }
  }

  return initialValue
}
|
||
|
/**
 * Collects every chunk of this stream into an array.
 *
 * @param {{ signal?: AbortSignal }} [options] - Optional settings.
 * @returns {Promise<Array>} All chunks, in the order they were read.
 * @throws {AbortError} If `options.signal` is found aborted after a chunk is
 *   read; the signal's reason is attached as `cause`.
 */
async function toArray(options) {
  if (options != null) {
    validateObject(options, 'options')
  }

  const hasOptions = options !== null && options !== undefined

  if (hasOptions && options.signal != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  const result = []

  for await (const val of this) {
    const currentSignal = hasOptions ? options.signal : undefined
    const aborted = currentSignal !== null && currentSignal !== undefined && currentSignal.aborted

    if (aborted) {
      throw new AbortError(undefined, {
        cause: options.signal.reason
      })
    }

    ArrayPrototypePush(result, val)
  }

  return result
}
|
||
|
/**
 * Maps every chunk of this stream through `fn` and yields the items of each
 * mapped result one by one.
 *
 * @param {Function} fn - Mapper, forwarded to map(); each result is delegated
 *   to with `yield*`, so it has to be iterable or async iterable.
 * @param {object} [options] - Forwarded to map().
 * @returns {AsyncGenerator} Generator bound to this stream.
 */
function flatMap(fn, options) {
  const mappedChunks = map.call(this, fn, options)

  return async function* flatMap() {
    for await (const inner of mappedChunks) {
      yield* inner
    }
  }.call(this)
}
|
||
|
/**
 * Normalizes the count argument of drop() and take().
 *
 * The value is coerced with `Number()` to align with the spec, see
 * https://github.com/tc39/proposal-iterator-helpers/issues/169
 * NaN becomes 0. Any other non-negative number, including a fractional one
 * or Infinity, is returned as coerced.
 *
 * @param {*} number - Value to normalize.
 * @returns {number} 0 for NaN, otherwise the coerced number.
 * @throws {ERR_OUT_OF_RANGE} If the coerced number is negative.
 */
function toIntegerOrInfinity(number) {
  const coerced = Number(number)

  if (NumberIsNaN(coerced)) {
    return 0
  }

  if (coerced < 0) {
    throw new ERR_OUT_OF_RANGE('number', '>= 0', coerced)
  }

  return coerced
}
|
||
|
/**
 * Returns an async generator that skips the first `number` chunks of this
 * stream and yields the rest.
 *
 * @param {*} number - How many chunks to skip; normalized by
 *   toIntegerOrInfinity().
 * @param {{ signal?: AbortSignal }} [options] - Optional settings.
 * @returns {AsyncGenerator} Generator bound to this stream.
 * @throws {ERR_OUT_OF_RANGE} If `number` is negative.
 * @throws {AbortError} From the generator, when `options.signal` is aborted
 *   before iteration starts or after a chunk is read.
 */
function drop(number, options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }
  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let remaining = toIntegerOrInfinity(number)

  // Reads the signal anew on every call, as each check in the loop must.
  const isAborted = () => {
    if (options === null || options === undefined) {
      return false
    }
    const current = options.signal
    return current !== null && current !== undefined && current.aborted
  }

  return async function* drop() {
    if (isAborted()) {
      throw new AbortError()
    }

    for await (const val of this) {
      if (isAborted()) {
        throw new AbortError()
      }

      // The counter keeps decreasing past zero; from then on all chunks pass.
      if (remaining-- <= 0) {
        yield val
      }
    }
  }.call(this)
}
|
||
|
/**
 * Returns an async generator that yields chunks of this stream until the
 * `number` counter runs out, then stops iterating the source.
 *
 * @param {*} number - How many chunks to take; normalized by
 *   toIntegerOrInfinity().
 * @param {{ signal?: AbortSignal }} [options] - Optional settings.
 * @returns {AsyncGenerator} Generator bound to this stream.
 * @throws {ERR_OUT_OF_RANGE} If `number` is negative.
 * @throws {AbortError} From the generator, when `options.signal` is aborted
 *   before iteration starts or after a chunk is read.
 */
function take(number, options = undefined) {
  if (options != null) {
    validateObject(options, 'options')
  }
  if ((options === null || options === undefined ? undefined : options.signal) != null) {
    validateAbortSignal(options.signal, 'options.signal')
  }

  let remaining = toIntegerOrInfinity(number)

  // Reads the signal anew on every call, as each check in the loop must.
  const isAborted = () => {
    if (options === null || options === undefined) {
      return false
    }
    const current = options.signal
    return current !== null && current !== undefined && current.aborted
  }

  return async function* take() {
    if (isAborted()) {
      throw new AbortError()
    }

    for await (const val of this) {
      if (isAborted()) {
        throw new AbortError()
      }

      if (remaining-- > 0) {
        yield val
      }

      // Don't get another item from iterator in case we reached the end
      if (remaining <= 0) {
        return
      }
    }
  }.call(this)
}
|
||
|
// Operators that return a stream or async generator (see the individual
// functions). asIndexedPairs is exported wrapped by deprecate().
module.exports.streamReturningOperators = {
  asIndexedPairs: deprecate(asIndexedPairs, 'readable.asIndexedPairs will be removed in a future version.'),
  drop,
  filter,
  flatMap,
  map,
  take,
  compose
}

// Operators declared as async functions; each returns a promise.
module.exports.promiseReturningOperators = {
  every,
  forEach,
  reduce,
  toArray,
  some,
  find
}
|