diff --git a/doc/api/stream.md b/doc/api/stream.md index 9e34d78937d88c..60821bb1015fb8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -69,6 +69,11 @@ or `require('node:stream').promises`. -* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} -* `source` {Stream|Iterable|AsyncIterable|Function} +* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]| + ReadableStream\[]|WritableStream\[]|TransformStream\[]} +* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream} * Returns: {Promise|AsyncIterable} -* `...transforms` {Stream|Function} +* `...transforms` {Stream|Function|TransformStream} * `source` {AsyncIterable} * Returns: {Promise|AsyncIterable} -* `destination` {Stream|Function} +* `destination` {Stream|Function|WritableStream} * `source` {AsyncIterable} * Returns: {Promise|AsyncIterable} * `options` {Object} Pipeline options diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index a6ba6d59e69e39..6b8e0ebd874019 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -280,6 +280,31 @@ function* processTransformResultSync(result) { * @param {*} result */ function appendTransformResultSync(target, result) { + if (result === null) { + return; + } + if (isUint8ArrayBatch(result)) { + if (result.length > 0) { + ArrayPrototypePush(target, result); + } + return; + } + if (isUint8Array(result)) { + ArrayPrototypePush(target, [result]); + return; + } + if (typeof result === 'string') { + ArrayPrototypePush(target, [toUint8Array(result)]); + return; + } + if (isAnyArrayBuffer(result)) { + ArrayPrototypePush(target, [new Uint8Array(result)]); + return; + } + if (ArrayBufferIsView(result)) { + ArrayPrototypePush(target, [arrayBufferViewToUint8Array(result)]); + return; + } for (const batch of processTransformResultSync(result)) { ArrayPrototypePush(target, batch); } @@ -372,9 +397,38 @@ async function* processTransformResultAsync(result) { * Append normalized transform result batches to an array (async). * @param {Array} target * @param {*} result - * @returns {Promise} + * @returns {Promise|undefined} */ -async function appendTransformResultAsync(target, result) { +function appendTransformResultAsync(target, result) { + if (result === null) { + return; + } + if (isUint8ArrayBatch(result)) { + if (result.length > 0) { + ArrayPrototypePush(target, result); + } + return; + } + if (isUint8Array(result)) { + ArrayPrototypePush(target, [result]); + return; + } + if (typeof result === 'string') { + ArrayPrototypePush(target, [toUint8Array(result)]); + return; + } + if (isAnyArrayBuffer(result)) { + ArrayPrototypePush(target, [new Uint8Array(result)]); + return; + } + if (ArrayBufferIsView(result)) { + ArrayPrototypePush(target, [arrayBufferViewToUint8Array(result)]); + return; + } + return appendTransformResultAsyncSlow(target, result); +} + +async function appendTransformResultAsyncSlow(target, result) { for await (const batch of processTransformResultAsync(result)) { ArrayPrototypePush(target, batch); } @@ -553,13 +607,19 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) { for (let i = 0; i < run.length; i++) { const next = []; for (let j = 0; j < pending.length; j++) { - await appendTransformResultAsync( + const pendingResult = appendTransformResultAsync( next, run[i](pending[j], { __proto__: null, signal })); + if (pendingResult !== undefined) { + await pendingResult; + } } - await appendTransformResultAsync( + const flushResult = appendTransformResultAsync( next, run[i](null, { __proto__: null, signal })); + if (flushResult !== undefined) { + await flushResult; + } pending = next; } for (let i = 0; i < pending.length; i++) {