diff --git a/lib/internal/streams/iter/pull.js b/lib/internal/streams/iter/pull.js index 3ff88b251d182a..95871e99037d58 100644 --- a/lib/internal/streams/iter/pull.js +++ b/lib/internal/streams/iter/pull.js @@ -273,6 +273,17 @@ function* processTransformResultSync(result) { result); } +/** + * Append normalized transform result batches to an array (sync). + * @param {Array} target + * @param {*} result + */ +function appendTransformResultSync(target, result) { + for (const batch of processTransformResultSync(result)) { + ArrayPrototypePush(target, batch); + } +} + /** * Process transform result (async). * @yields {Uint8Array[]} @@ -356,6 +367,18 @@ async function* processTransformResultAsync(result) { result); } +/** + * Append normalized transform result batches to an array (async). + * @param {Array} target + * @param {*} result + * @returns {Promise} + */ +async function appendTransformResultAsync(target, result) { + for await (const batch of processTransformResultAsync(result)) { + ArrayPrototypePush(target, batch); + } +} + // ============================================================================= // Sync Pipeline Implementation // ============================================================================= @@ -398,18 +421,19 @@ function* applyFusedStatelessSyncTransforms(source, run) { yield* processTransformResultSync(current); } } - // Flush - let current = null; + // Flush each transform after all upstream data, including data emitted by + // earlier flushes, has been processed by that transform. + let pending = []; for (let i = 0; i < run.length; i++) { - const result = run[i](current); - if (result === null) { - current = null; - continue; + const next = []; + for (let j = 0; j < pending.length; j++) { + appendTransformResultSync(next, run[i](pending[j])); } - current = result; + appendTransformResultSync(next, run[i](null)); + pending = next; } - if (current != null) { - yield* processTransformResultSync(current); + for (let i = 0; i < pending.length; i++) { + yield pending[i]; } } @@ -522,30 +546,23 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) { yield* processTransformResultAsync(current); } } - // Flush: send null through each transform in order - let current = null; + // Flush each transform after all upstream data, including data emitted by + // earlier flushes, has been processed by that transform. + let pending = []; for (let i = 0; i < run.length; i++) { - const result = run[i](current, { __proto__: null, signal }); - if (result === null) { - current = null; - continue; - } - if (isPromise(result)) { - current = await result; - } else { - current = result; + const next = []; + for (let j = 0; j < pending.length; j++) { + await appendTransformResultAsync( + next, + run[i](pending[j], { __proto__: null, signal })); } + await appendTransformResultAsync( + next, + run[i](null, { __proto__: null, signal })); + pending = next; } - if (current !== null) { - if (isUint8ArrayBatch(current)) { - if (current.length > 0) yield current; - } else if (isUint8Array(current)) { - yield [current]; - } else if (typeof current === 'string') { - yield [toUint8Array(current)]; - } else { - yield* processTransformResultAsync(current); - } + for (let i = 0; i < pending.length; i++) { + yield pending[i]; } } diff --git a/test/parallel/test-stream-iter-pull-async.js b/test/parallel/test-stream-iter-pull-async.js index 157cc5e265ea34..1dbe7c98878ccc 100644 --- a/test/parallel/test-stream-iter-pull-async.js +++ b/test/parallel/test-stream-iter-pull-async.js @@ -233,6 +233,19 @@ async function testPullStatelessTransformFlush() { assert.strictEqual(data, 'data-TRAILER'); } +// Consecutive stateless transforms each receive a final flush signal after +// upstream flush output has been processed. +async function testPullConsecutiveStatelessTransformFlush() { + const enc = new TextEncoder(); + const addAOnFlush = (chunks) => (chunks === null ? + [enc.encode('-A')] : chunks); + const addBOnFlush = (chunks) => (chunks === null ? + [enc.encode('-B')] : chunks); + + const data = await text(pull(from('x'), addAOnFlush, addBOnFlush)); + assert.strictEqual(data, 'x-A-B'); +} + // Stateless transform flush error propagates async function testPullStatelessTransformFlushError() { const badFlush = (chunks) => { @@ -357,6 +370,7 @@ async function testTransformOptionsNotShared() { testPullStatelessTransformError(), testPullStatefulTransformError(), testPullStatelessTransformFlush(), + testPullConsecutiveStatelessTransformFlush(), testPullStatelessTransformFlushError(), testPullWithSyncSource(), testPullStringSource(), diff --git a/test/parallel/test-stream-iter-pull-sync.js b/test/parallel/test-stream-iter-pull-sync.js index 35679ac102d512..c47a6b3f92330d 100644 --- a/test/parallel/test-stream-iter-pull-sync.js +++ b/test/parallel/test-stream-iter-pull-sync.js @@ -127,6 +127,20 @@ function testPullSyncStatelessTransformFlush() { assert.strictEqual(data, 'data-TRAILER'); } +// Consecutive stateless transforms each receive a final flush signal after +// upstream flush output has been processed. +function testPullSyncConsecutiveStatelessTransformFlush() { + const enc = new TextEncoder(); + const addAOnFlush = (chunks) => (chunks === null ? + [enc.encode('-A')] : chunks); + const addBOnFlush = (chunks) => (chunks === null ? + [enc.encode('-B')] : chunks); + + const data = new TextDecoder().decode(bytesSync( + pullSync(fromSync('x'), addAOnFlush, addBOnFlush))); + assert.strictEqual(data, 'x-A-B'); +} + // Stateless transform flush error propagates function testPullSyncStatelessTransformFlushError() { const badFlush = (chunks) => { @@ -173,6 +187,7 @@ Promise.all([ testPullSyncStatelessTransformError(), testPullSyncStatefulTransformError(), testPullSyncStatelessTransformFlush(), + testPullSyncConsecutiveStatelessTransformFlush(), testPullSyncStatelessTransformFlushError(), testPullSyncInvalidTransform(), ]).then(common.mustCall());