From 793eff87a7a63dd6701818aa60ad4888ac80f47a Mon Sep 17 00:00:00 2001 From: Chengzhong Wu Date: Thu, 28 May 2026 22:18:02 -0400 Subject: [PATCH 1/2] meta: label "source maps" PRs Signed-off-by: Chengzhong Wu PR-URL: https://github.com/nodejs/node/pull/63591 Reviewed-By: Jake Yuesong Li Reviewed-By: Antoine du Hamel Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca --- .github/label-pr-config.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/label-pr-config.yml b/.github/label-pr-config.yml index 551bee55ababce..8b36066473f415 100644 --- a/.github/label-pr-config.yml +++ b/.github/label-pr-config.yml @@ -104,6 +104,7 @@ subSystemLabels: /^lib\/internal\/url\.js$/: whatwg-url /^lib\/internal\/modules\/esm/: esm /^lib\/internal\/modules/: module + /^lib\/internal\/source_map/: source maps /^lib\/internal\/webstreams/: web streams /^lib\/internal\/test_runner/: test_runner /^lib\/internal\/v8\//: v8 module @@ -128,6 +129,8 @@ exlusiveLabels: /^test\/report\//: test, report /^test\/fixtures\/es-module/: test, esm /^test\/es-module\//: test, esm + /^test\/fixtures\/source-map/: test, source maps + /^test\/fixtures\/test-426/: test, source maps /^test\/fixtures\/wpt\/streams\//: test, web streams /^test\/fixtures\/typescript/: test, strip-types /^test\/module-hooks\//: test, module, loaders From 40dc5a1dc465ebb27002c996bbf1d00cbf74b8fc Mon Sep 17 00:00:00 2001 From: Trivikram Kamat <16024985+trivikr@users.noreply.github.com> Date: Fri, 29 May 2026 00:59:57 -0700 Subject: [PATCH 2/2] stream: use data listener for compose forwarding Forward Node.js tail stream data through a pipe-style data listener instead of manually draining readable events. This keeps compose closer to the stream pipe hot path while preserving backpressure with pause and resume. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: https://github.com/nodejs/node/pull/63593 Reviewed-By: Robert Nagy Reviewed-By: Ethan Arrowood Reviewed-By: Jake Yuesong Li Reviewed-By: Stephen Belanger --- benchmark/streams/compose.js | 63 +++++++++++++++++++++++++++++++-- lib/internal/streams/compose.js | 28 ++++----------- 2 files changed, 67 insertions(+), 24 deletions(-) diff --git a/benchmark/streams/compose.js b/benchmark/streams/compose.js index b98596ffbd1411..283ad8b7e30b32 100644 --- a/benchmark/streams/compose.js +++ b/benchmark/streams/compose.js @@ -9,16 +9,35 @@ const { } = require('node:stream'); const bench = common.createBenchmark(main, { - n: [1e3], + type: ['creation', 'throughput'], + n: [1, 1e3], + streams: [100], + chunks: [1e4], +}, { + combinationFilter({ type, n }) { + return type === 'creation' ? n === 1e3 : n === 1; + }, + test: { + n: [1, 1e3], + type: ['creation', 'throughput'], + }, }); -function main({ n }) { +function main({ type, n, streams, chunks }) { + switch (type) { + case 'creation': + return benchCreation(n, streams); + case 'throughput': + return benchThroughput(n, streams, chunks); + } +} + +function benchCreation(n, numberOfPassThroughs) { const cachedPassThroughs = []; const cachedReadables = []; const cachedWritables = []; for (let i = 0; i < n; i++) { - const numberOfPassThroughs = 100; const passThroughs = []; for (let i = 0; i < numberOfPassThroughs; i++) { @@ -40,3 +59,41 @@ function main({ n }) { } bench.end(n); } + +function benchThroughput(n, numberOfPassThroughs, chunks) { + const chunk = Buffer.alloc(1024); + + let i = 0; + bench.start(); + + function run() { + if (i++ === n) { + bench.end(n * chunks); + return; + } + + const passThroughs = []; + for (let i = 0; i < numberOfPassThroughs; i++) { + passThroughs.push(new PassThrough()); + } + + let remaining = chunks; + const composed = compose(...passThroughs); + composed.on('data', () => {}); + composed.on('end', run); + + write(); + + function write() { + while (remaining-- > 0) { + if (!composed.write(chunk)) { + composed.once('drain', write); + return; + } + } + composed.end(); + } + } + + run(); +} diff --git a/lib/internal/streams/compose.js b/lib/internal/streams/compose.js index d664a430b8d75a..7baa7974ffdf05 100644 --- a/lib/internal/streams/compose.js +++ b/lib/internal/streams/compose.js @@ -82,7 +82,6 @@ module.exports = function compose(...streams) { let ondrain; let onfinish; - let onreadable; let onclose; let d; @@ -184,31 +183,19 @@ module.exports = function compose(...streams) { if (readable) { if (isNodeStream(tail)) { - tail.on('readable', function() { - if (onreadable) { - const cb = onreadable; - onreadable = null; - cb(); + d._read = function() { + tail.resume(); + }; + + tail.on('data', function(chunk) { + if (!d.push(chunk)) { + tail.pause(); } }); tail.on('end', function() { d.push(null); }); - - d._read = function() { - while (true) { - const buf = tail.read(); - if (buf === null) { - onreadable = d._read; - return; - } - - if (!d.push(buf)) { - return; - } - } - }; } else if (isWebStream(tail)) { const readable = isTransformStream(tail) ? tail.readable : tail; const reader = readable.getReader(); @@ -238,7 +225,6 @@ module.exports = function compose(...streams) { err = new AbortError(); } - onreadable = null; ondrain = null; onfinish = null;