Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
<!-- YAML
added: v15.0.0
changes:
- version:
- v19.7.0
- v18.16.0
pr-url: https://github.com/nodejs/node/pull/46307

Check warning on line 75 in doc/api/stream.md

View workflow job for this annotation

GitHub Actions / lint-pr-url

pr-url doesn't match the URL of the current PR.
description: Added support for webstreams.
- version:
- v18.0.0
- v17.2.0
Expand All @@ -79,13 +84,14 @@
ends.
-->

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
* `source` {Stream|Iterable|AsyncIterable|Function}
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream}
* Returns: {Promise|AsyncIterable}
* `...transforms` {Stream|Function}
* `...transforms` {Stream|Function|TransformStream}
* `source` {AsyncIterable}
* Returns: {Promise|AsyncIterable}
* `destination` {Stream|Function}
* `destination` {Stream|Function|WritableStream}
* `source` {AsyncIterable}
* Returns: {Promise|AsyncIterable}
* `options` {Object} Pipeline options
Expand Down
68 changes: 64 additions & 4 deletions lib/internal/streams/iter/pull.js
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,31 @@ function* processTransformResultSync(result) {
* @param {*} result
*/
function appendTransformResultSync(target, result) {
if (result === null) {
return;
}
if (isUint8ArrayBatch(result)) {
if (result.length > 0) {
ArrayPrototypePush(target, result);
}
return;
}
if (isUint8Array(result)) {
ArrayPrototypePush(target, [result]);
return;
}
if (typeof result === 'string') {
ArrayPrototypePush(target, [toUint8Array(result)]);
return;
}
if (isAnyArrayBuffer(result)) {
ArrayPrototypePush(target, [new Uint8Array(result)]);
return;
}
if (ArrayBufferIsView(result)) {
ArrayPrototypePush(target, [arrayBufferViewToUint8Array(result)]);
return;
}
for (const batch of processTransformResultSync(result)) {
ArrayPrototypePush(target, batch);
}
Expand Down Expand Up @@ -372,9 +397,38 @@ async function* processTransformResultAsync(result) {
* Append normalized transform result batches to an array (async).
* @param {Array<Uint8Array[]>} target
* @param {*} result
* @returns {Promise<void>}
* @returns {Promise<void>|undefined}
*/
async function appendTransformResultAsync(target, result) {
function appendTransformResultAsync(target, result) {
if (result === null) {
return;
}
if (isUint8ArrayBatch(result)) {
if (result.length > 0) {
ArrayPrototypePush(target, result);
}
return;
}
if (isUint8Array(result)) {
ArrayPrototypePush(target, [result]);
return;
}
if (typeof result === 'string') {
ArrayPrototypePush(target, [toUint8Array(result)]);
return;
}
if (isAnyArrayBuffer(result)) {
ArrayPrototypePush(target, [new Uint8Array(result)]);
return;
}
if (ArrayBufferIsView(result)) {
ArrayPrototypePush(target, [arrayBufferViewToUint8Array(result)]);
return;
}
return appendTransformResultAsyncSlow(target, result);
}

async function appendTransformResultAsyncSlow(target, result) {
for await (const batch of processTransformResultAsync(result)) {
ArrayPrototypePush(target, batch);
}
Expand Down Expand Up @@ -553,13 +607,19 @@ async function* applyFusedStatelessAsyncTransforms(source, run, signal) {
for (let i = 0; i < run.length; i++) {
const next = [];
for (let j = 0; j < pending.length; j++) {
await appendTransformResultAsync(
const pendingResult = appendTransformResultAsync(
next,
run[i](pending[j], { __proto__: null, signal }));
if (pendingResult !== undefined) {
await pendingResult;
}
}
await appendTransformResultAsync(
const flushResult = appendTransformResultAsync(
next,
run[i](null, { __proto__: null, signal }));
if (flushResult !== undefined) {
await flushResult;
}
pending = next;
}
for (let i = 0; i < pending.length; i++) {
Expand Down
Loading