From 5f727fdc89c06782652bfbf6a4d05ade1db3d2c8 Mon Sep 17 00:00:00 2001 From: Trivikram Kamat <16024985+trivikr@users.noreply.github.com> Date: Wed, 3 Jun 2026 22:38:18 -0700 Subject: [PATCH 1/2] stream: fast-path stateless transform flush results Avoid generator-based normalization for common flush result types in fused stateless stream/iter transforms. This keeps the corrected flush ordering while reducing overhead for already-normalized batches and primitive byte outputs. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: https://github.com/nodejs/node/pull/63605 Reviewed-By: Matteo Collina --- lib/internal/streams/iter/pull.js | 68 +++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index a6ba6d59e69e39..6b8e0ebd874019 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -280,6 +280,31 @@ function* processTransformResultSync(result) { * @param {*} result */ function appendTransformResultSync(target, result) { + if (result === null) { + return; + } + if (isUint8ArrayBatch(result)) { + if (result.length > 0) { + ArrayPrototypePush(target, result); + } + return; + } + if (isUint8Array(result)) { + ArrayPrototypePush(target, [result]); + return; + } + if (typeof result === 'string') { + ArrayPrototypePush(target, [toUint8Array(result)]); + return; + } + if (isAnyArrayBuffer(result)) { + ArrayPrototypePush(target, [new Uint8Array(result)]); + return; + } + if (ArrayBufferIsView(result)) { + ArrayPrototypePush(target, [arrayBufferViewToUint8Array(result)]); + return; + } for (const batch of processTransformResultSync(result)) { ArrayPrototypePush(target, batch); } @@ -372,9 +397,38 @@ async function* processTransformResultAsync(result) { * Append normalized transform result batches to an array (async). * @param {Array} target * @param {*} result - * @returns {Promise} + * @returns {Promise|undefined} */ -async function appendTransformResultAsync(target, result) { +function appendTransformResultAsync(target, result) { + if (result === null) { + return; + } + if (isUint8ArrayBatch(result)) { + if (result.length > 0) { + ArrayPrototypePush(target, result); + } + return; + } + if (isUint8Array(result)) { + ArrayPrototypePush(target, [result]); + return; + } + if (typeof result === 'string') { + ArrayPrototypePush(target, [toUint8Array(result)]); + return; + } + if (isAnyArrayBuffer(result)) { + ArrayPrototypePush(target, [new Uint8Array(result)]); + return; + } + if (ArrayBufferIsView(result)) { + ArrayPrototypePush(target, [arrayBufferViewToUint8Array(result)]); + return; + } + return appendTransformResultAsyncSlow(target, result); +} + +async function appendTransformResultAsyncSlow(target, result) { for await (const batch of processTransformResultAsync(result)) { ArrayPrototypePush(target, batch); } @@ -553,13 +607,19 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) { for (let i = 0; i < run.length; i++) { const next = []; for (let j = 0; j < pending.length; j++) { - await appendTransformResultAsync( + const pendingResult = appendTransformResultAsync( next, run[i](pending[j], { __proto__: null, signal })); + if (pendingResult !== undefined) { + await pendingResult; + } } - await appendTransformResultAsync( + const flushResult = appendTransformResultAsync( next, run[i](null, { __proto__: null, signal })); + if (flushResult !== undefined) { + await flushResult; + } pending = next; } for (let i = 0; i < pending.length; i++) { From 09c603fb46580b575e3f1bc348d888e459b6f802 Mon Sep 17 00:00:00 2001 From: David Sanders Date: Thu, 4 Jun 2026 01:41:43 -0700 Subject: [PATCH 2/2] doc: add webstreams to args for `pipeline` from `stream/promises` Signed-off-by: David Sanders PR-URL: https://github.com/nodejs/node/pull/63628 Reviewed-By: Darshan Sen Reviewed-By: Mattias Buelens --- doc/api/stream.md | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 9e34d78937d88c..60821bb1015fb8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -69,6 +69,11 @@ or `require('node:stream').promises`. -* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} -* `source` {Stream|Iterable|AsyncIterable|Function} +* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]| + ReadableStream\[]|WritableStream\[]|TransformStream\[]} +* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream} * Returns: {Promise|AsyncIterable} -* `...transforms` {Stream|Function} +* `...transforms` {Stream|Function|TransformStream} * `source` {AsyncIterable} * Returns: {Promise|AsyncIterable} -* `destination` {Stream|Function} +* `destination` {Stream|Function|WritableStream} * `source` {AsyncIterable} * Returns: {Promise|AsyncIterable} * `options` {Object} Pipeline options