Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 47 additions & 30 deletions lib/internal/streams/iter/pull.js
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,17 @@ function* processTransformResultSync(result) {
result);
}

/**
* Append normalized transform result batches to an array (sync).
* @param {Array<Uint8Array[]>} target
* @param {*} result
*/
function appendTransformResultSync(target, result) {
for (const batch of processTransformResultSync(result)) {
ArrayPrototypePush(target, batch);
}
}

/**
* Process transform result (async).
* @yields {Uint8Array[]}
Expand Down Expand Up @@ -356,6 +367,18 @@ async function* processTransformResultAsync(result) {
result);
}

/**
* Append normalized transform result batches to an array (async).
* @param {Array<Uint8Array[]>} target
* @param {*} result
* @returns {Promise<void>}
*/
async function appendTransformResultAsync(target, result) {
for await (const batch of processTransformResultAsync(result)) {
ArrayPrototypePush(target, batch);
}
}

// =============================================================================
// Sync Pipeline Implementation
// =============================================================================
Expand Down Expand Up @@ -398,18 +421,19 @@ function* applyFusedStatelessSyncTransforms(source, run) {
yield* processTransformResultSync(current);
}
}
// Flush
let current = null;
// Flush each transform after all upstream data, including data emitted by
// earlier flushes, has been processed by that transform.
let pending = [];
for (let i = 0; i < run.length; i++) {
const result = run[i](current);
if (result === null) {
current = null;
continue;
const next = [];
for (let j = 0; j < pending.length; j++) {
appendTransformResultSync(next, run[i](pending[j]));
}
current = result;
appendTransformResultSync(next, run[i](null));
pending = next;
}
if (current != null) {
yield* processTransformResultSync(current);
for (let i = 0; i < pending.length; i++) {
yield pending[i];
}
}

Expand Down Expand Up @@ -522,30 +546,23 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
yield* processTransformResultAsync(current);
}
}
// Flush: send null through each transform in order
let current = null;
// Flush each transform after all upstream data, including data emitted by
// earlier flushes, has been processed by that transform.
let pending = [];
for (let i = 0; i < run.length; i++) {
const result = run[i](current, { __proto__: null, signal });
if (result === null) {
current = null;
continue;
}
if (isPromise(result)) {
current = await result;
} else {
current = result;
const next = [];
for (let j = 0; j < pending.length; j++) {
await appendTransformResultAsync(
next,
run[i](pending[j], { __proto__: null, signal }));
}
await appendTransformResultAsync(
next,
run[i](null, { __proto__: null, signal }));
pending = next;
}
if (current !== null) {
if (isUint8ArrayBatch(current)) {
if (current.length > 0) yield current;
} else if (isUint8Array(current)) {
yield [current];
} else if (typeof current === 'string') {
yield [toUint8Array(current)];
} else {
yield* processTransformResultAsync(current);
}
for (let i = 0; i < pending.length; i++) {
yield pending[i];
}
}

Expand Down
14 changes: 14 additions & 0 deletions test/parallel/test-stream-iter-pull-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,19 @@ async function testPullStatelessTransformFlush() {
assert.strictEqual(data, 'data-TRAILER');
}

// Consecutive stateless transforms each receive a final flush signal after
// upstream flush output has been processed.
async function testPullConsecutiveStatelessTransformFlush() {
const enc = new TextEncoder();
const addAOnFlush = (chunks) => (chunks === null ?
[enc.encode('-A')] : chunks);
const addBOnFlush = (chunks) => (chunks === null ?
[enc.encode('-B')] : chunks);

const data = await text(pull(from('x'), addAOnFlush, addBOnFlush));
assert.strictEqual(data, 'x-A-B');
}

// Stateless transform flush error propagates
async function testPullStatelessTransformFlushError() {
const badFlush = (chunks) => {
Expand Down Expand Up @@ -357,6 +370,7 @@ async function testTransformOptionsNotShared() {
testPullStatelessTransformError(),
testPullStatefulTransformError(),
testPullStatelessTransformFlush(),
testPullConsecutiveStatelessTransformFlush(),
testPullStatelessTransformFlushError(),
testPullWithSyncSource(),
testPullStringSource(),
Expand Down
15 changes: 15 additions & 0 deletions test/parallel/test-stream-iter-pull-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ function testPullSyncStatelessTransformFlush() {
assert.strictEqual(data, 'data-TRAILER');
}

// Consecutive stateless transforms each receive a final flush signal after
// upstream flush output has been processed.
function testPullSyncConsecutiveStatelessTransformFlush() {
const enc = new TextEncoder();
const addAOnFlush = (chunks) => (chunks === null ?
[enc.encode('-A')] : chunks);
const addBOnFlush = (chunks) => (chunks === null ?
[enc.encode('-B')] : chunks);

const data = new TextDecoder().decode(bytesSync(
pullSync(fromSync('x'), addAOnFlush, addBOnFlush)));
assert.strictEqual(data, 'x-A-B');
}

// Stateless transform flush error propagates
function testPullSyncStatelessTransformFlushError() {
const badFlush = (chunks) => {
Expand Down Expand Up @@ -173,6 +187,7 @@ Promise.all([
testPullSyncStatelessTransformError(),
testPullSyncStatefulTransformError(),
testPullSyncStatelessTransformFlush(),
testPullSyncConsecutiveStatelessTransformFlush(),
testPullSyncStatelessTransformFlushError(),
testPullSyncInvalidTransform(),
]).then(common.mustCall());
Loading