Iterable Streams

History
Source Code: lib/stream/iter.js
Stability: 1 - Experimental

The node:stream/iter module provides a streaming API built on iterables rather than the event-driven Readable/Writable/Transform class hierarchy, or the Web Streams ReadableStream/WritableStream/TransformStream interfaces.

This module is available only when the --experimental-stream-iter CLI flag is enabled.

Streams are represented as AsyncIterable<Uint8Array[]> (async) or Iterable<Uint8Array[]> (sync). There are no base classes to extend -- any object implementing the iterable protocol can participate. Transforms are plain functions or objects with a transform method.

Data flows in batches (Uint8Array[] per iteration) to amortize the cost of async operations.

import { from, pull, text } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Compress and decompress a string
const compressed = pull(from('Hello, world!'), compressGzip());
const result = await text(pull(compressed, decompressGzip()));
console.log(result); // 'Hello, world!'

All data in this API is represented as Uint8Array bytes. Strings are automatically UTF-8 encoded when passed to from(), push(), or pipeTo(). This removes ambiguity around encodings and enables zero-copy transfers between streams and native code.

Each iteration yields a batch -- an array of Uint8Array chunks (Uint8Array[]). Batching amortizes the cost of await and Promise creation across multiple chunks. A consumer that processes one chunk at a time can simply iterate the inner array:

for await (const batch of source) {
  for (const chunk of batch) {
    handle(chunk);
  }
}

Transforms come in two forms:

  • Stateless -- a function (chunks, options) => result called once per batch. Receives Uint8Array[] (or null as the flush signal) and an options object. Returns Uint8Array[], null, or an iterable of chunks.

  • Stateful -- an object { transform(source, options) } where transform is a generator (sync or async) that receives the entire upstream iterable and an options object, and yields output. This form is used for compression, encryption, and any transform that needs to buffer across batches.

Both forms receive an options parameter with the following property:

Attributes
options.signal:<AbortSignal>
An AbortSignal that fires when the pipeline is cancelled, encounters an error, or the consumer stops reading. Transforms can check signal.aborted or listen for the 'abort' event to perform early cleanup.

The flush signal (null) is sent after the source ends, giving transforms a chance to emit trailing data (e.g., compression footers).

// Stateless: uppercase transform
const upper = (chunks) => {
  if (chunks === null) return null; // flush
  return chunks.map((c) => new TextEncoder().encode(
    new TextDecoder().decode(c).toUpperCase(),
  ));
};

// Stateful: line splitter
const lines = {
  transform: async function*(source) {
    let partial = '';
    for await (const chunks of source) {
      if (chunks === null) {
        if (partial) yield [new TextEncoder().encode(partial)];
        continue;
      }
      for (const chunk of chunks) {
        const str = partial + new TextDecoder().decode(chunk);
        const parts = str.split('\n');
        partial = parts.pop();
        for (const line of parts) {
          yield [new TextEncoder().encode(`${line}\n`)];
        }
      }
    }
  },
};
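The stateful line splitter above can be exercised without the module by hand-feeding it an iterable that ends with the null flush signal (a minimal sketch; sourceWithFlush is a hypothetical test helper, not part of the API):

```javascript
// Drive the stateful line-splitter transform by hand. The transform
// receives the whole upstream iterable, including the trailing null
// flush signal, and yields batches of encoded lines.
const lines = {
  transform: async function* (source) {
    let partial = '';
    for await (const chunks of source) {
      if (chunks === null) {
        if (partial) yield [new TextEncoder().encode(partial)];
        continue;
      }
      for (const chunk of chunks) {
        const str = partial + new TextDecoder().decode(chunk);
        const parts = str.split('\n');
        partial = parts.pop();
        for (const line of parts) {
          yield [new TextEncoder().encode(`${line}\n`)];
        }
      }
    }
  },
};

// Hypothetical helper: append the null flush signal after the batches.
async function* sourceWithFlush(batches) {
  yield* batches;
  yield null;
}

const enc = new TextEncoder();
const dec = new TextDecoder();
const out = [];
const batches = [[enc.encode('ab\ncd')], [enc.encode('ef\n'), enc.encode('g')]];
for await (const batch of lines.transform(sourceWithFlush(batches))) {
  for (const chunk of batch) out.push(dec.decode(chunk));
}
console.log(out); // [ 'ab\n', 'cdef\n', 'g' ]
```

Note that the partial trailing line ('g') is only emitted when the flush signal arrives.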

The API supports two models:

  • Pull -- data flows on demand. pull() and pullSync() create lazy pipelines that only read from the source when the consumer iterates.

  • Push -- data is written explicitly. push() creates a writer/readable pair with backpressure. The writer pushes data in; the readable is consumed as an async iterable.

Pull streams have natural backpressure -- the consumer drives the pace, so the source is never read faster than the consumer can process. Push streams need explicit backpressure because the producer and consumer run independently. The highWaterMark and backpressure options on push(), broadcast(), and share() control how this works.

Push streams use a two-part buffering system. Think of it like a bucket (slots) being filled through a hose (pending writes), with a float valve that closes when the bucket is full:

                          highWaterMark (e.g., 3)
                                 |
    Producer                     v
       |                    +---------+
       v                    |         |
  [ write() ] ----+    +--->| slots   |---> Consumer pulls
  [ write() ]     |    |    | (bucket)|     for await (...)
  [ write() ]     v    |    +---------+
              +--------+         ^
              | pending|         |
              | writes |    float valve
              | (hose) |    (backpressure)
              +--------+
                   ^
                   |
           'strict' mode limits this too!

  • Slots (the bucket) -- data ready for the consumer, capped at highWaterMark. When the consumer pulls, it drains all slots at once into a single batch.

  • Pending writes (the hose) -- writes waiting for slot space. After the consumer drains, pending writes are promoted into the now-empty slots and their promises resolve.
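The drain-then-promote step can be modeled in a few lines of plain JavaScript (a simplified sketch of the two-part buffer, not the module's implementation):

```javascript
// Simplified model of the slots/pending buffers. The consumer drains
// all slots as one batch; pending writes are then promoted into the
// freed slots.
function makePushBuffer(highWaterMark) {
  const slots = [];
  const pending = [];
  return {
    write(chunk) {
      if (slots.length < highWaterMark) slots.push(chunk); // room in the bucket
      else pending.push(chunk); // wait in the hose for slot space
    },
    drain() {
      const batch = slots.splice(0); // consumer takes everything at once
      while (slots.length < highWaterMark && pending.length > 0) {
        slots.push(pending.shift()); // promote pending writes
      }
      return batch;
    },
  };
}

const buf = makePushBuffer(3);
for (const v of [1, 2, 3, 4, 5]) buf.write(v);
console.log(buf.drain()); // [ 1, 2, 3 ]
console.log(buf.drain()); // [ 4, 5 ]
```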

How each policy uses these buffers:

Policy           Slots limit      Pending writes limit
'strict'         highWaterMark    highWaterMark
'block'          highWaterMark    Unbounded
'drop-oldest'    highWaterMark    N/A (never waits)
'drop-newest'    highWaterMark    N/A (never waits)

Strict mode catches "fire-and-forget" patterns where the producer calls write() without awaiting, which would cause unbounded memory growth. It limits both the slots buffer and the pending writes queue to highWaterMark.

If you properly await each write, you can only ever have one pending write at a time (yours), so you never hit the pending writes limit. Unawaited writes accumulate in the pending queue and throw once it overflows:

import { push, text } from 'node:stream/iter';

const { writer, readable } = push({ highWaterMark: 16 });

// Consumer must run concurrently -- without it, the first write
// that fills the buffer blocks the producer forever.
const consuming = text(readable);

// GOOD: awaited writes. The producer waits for the consumer to
// make room when the buffer is full.
for (const item of dataset) {
  await writer.write(item);
}
await writer.end();
console.log(await consuming);

Forgetting to await will eventually throw:

// BAD: fire-and-forget. Strict mode throws once both buffers fill.
for (const item of dataset) {
  writer.write(item); // Not awaited -- queues without bound
}
// --> throws "Backpressure violation: too many pending writes"

Block mode caps slots at highWaterMark but places no limit on the pending writes queue. Awaited writes block until the consumer makes room, just like strict mode. The difference is that unawaited writes silently queue forever instead of throwing -- a potential memory leak if the producer forgets to await.

This is the mode that existing Node.js classic streams and Web Streams default to. Use it when you control the producer and know it awaits properly, or when migrating code from those APIs.

import { push, text } from 'node:stream/iter';

const { writer, readable } = push({
  highWaterMark: 16,
  backpressure: 'block',
});

const consuming = text(readable);

// Safe -- awaited writes block until the consumer reads.
for (const item of dataset) {
  await writer.write(item);
}
await writer.end();
console.log(await consuming);

Writes never wait. When the slots buffer is full, the oldest buffered chunk is evicted to make room for the incoming write. The consumer always sees the most recent data. Useful for live feeds, telemetry, or any scenario where stale data is less valuable than current data.

import { push } from 'node:stream/iter';

// Keep only the 5 most recent readings
const { writer, readable } = push({
  highWaterMark: 5,
  backpressure: 'drop-oldest',
});

Writes never wait. When the slots buffer is full, the incoming write is silently discarded. The consumer processes what is already buffered without being overwhelmed by new data. Useful for rate-limiting or shedding load under pressure.

import { push } from 'node:stream/iter';

// Accept up to 10 buffered items; discard anything beyond that
const { writer, readable } = push({
  highWaterMark: 10,
  backpressure: 'drop-newest',
});
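The difference between the two drop policies can be illustrated with a plain-JavaScript model of the slots buffer (a simplified sketch, not the module's implementation):

```javascript
// Simplified model of the two eviction policies.
function makeSlots(highWaterMark, policy) {
  const slots = [];
  return {
    write(chunk) {
      if (slots.length < highWaterMark) {
        slots.push(chunk);
      } else if (policy === 'drop-oldest') {
        slots.shift();     // evict the oldest buffered chunk
        slots.push(chunk); // keep the most recent data
      }
      // 'drop-newest': the incoming chunk is silently discarded
    },
    drain() { return slots.splice(0); },
  };
}

const oldest = makeSlots(3, 'drop-oldest');
const newest = makeSlots(3, 'drop-newest');
for (const v of [1, 2, 3, 4, 5]) { oldest.write(v); newest.write(v); }
console.log(oldest.drain()); // [ 3, 4, 5 ] -- most recent survive
console.log(newest.drain()); // [ 1, 2, 3 ] -- earliest survive
```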

A writer is any object conforming to the Writer interface. Only write() is required; all other methods are optional.

Each async method has a synchronous *Sync counterpart designed for a try-fallback pattern: attempt the fast synchronous path first, and fall back to the async version only when the synchronous call indicates it could not complete:

if (!writer.writeSync(chunk)) await writer.write(chunk);
if (!writer.writevSync(chunks)) await writer.writev(chunks);
if (writer.endSync() < 0) await writer.end();
writer.fail(err);  // Always synchronous, no fallback needed
Attributes

The number of buffer slots available before the high water mark is reached. Returns null if the writer is closed or the consumer has disconnected.

The value is always non-negative.

writer.end(options?): Promise<number>
Attributes
options:<Object>
signal?:<AbortSignal>
Cancel just this operation. The signal cancels only the pending end() call; it does not fail the writer itself.
Returns:
{Promise<number>} Total bytes written.
Signal that no more data will be written.

writer.endSync(): number
Returns:<number>
Total bytes written, or -1 if the writer is not open.

Synchronous variant of writer.end(). Returns -1 if the writer is already closed or errored. Can be used as a try-fallback pattern:

const result = writer.endSync();
if (result < 0) {
  writer.end();
}
writer.fail(reason): void
Attributes
reason:<any>

Put the writer into a terminal error state. If the writer is already closed or errored, this is a no-op. Unlike write() and end(), fail() is unconditionally synchronous because failing a writer is a pure state transition with no async work to perform.

writer.write(chunk, options?): Promise<void>
Attributes
options:<Object>
signal?:<AbortSignal>
Cancel just this write operation. The signal cancels only the pending write() call; it does not fail the writer itself.
Returns:
{Promise<void>}

Write a chunk. The promise resolves when buffer space is available.

writer.writeSync(chunk): boolean
Attributes
Returns:<boolean>
true if the write was accepted, false if the buffer is full.

Synchronous write. Does not block; returns false if backpressure is active.

writer.writev(chunks, options?): Promise<void>
Attributes
options:<Object>
signal?:<AbortSignal>
Cancel just this write operation. The signal cancels only the pending writev() call; it does not fail the writer itself.
Returns:
{Promise<void>}

Write multiple chunks as a single batch.

writer.writevSync(chunks): boolean
Attributes
Returns:<boolean>
true if the write was accepted, false if the buffer is full.

Synchronous batch write.

All functions are available both as named exports and as properties of the Stream namespace object:

// Named exports
import { from, pull, bytes, Stream } from 'node:stream/iter';

// Namespace access
Stream.from('hello');

Including the node: prefix on the module specifier is optional.

from

History
from(input): AsyncIterable<Uint8Array[]>
Attributes
Must not be null or undefined.
Returns:
{AsyncIterable<Uint8Array[]>}

Create an async byte stream from the given input. Strings are UTF-8 encoded. ArrayBuffer and ArrayBufferView values are wrapped as Uint8Array. Arrays and iterables are recursively flattened and normalized.

Objects implementing Symbol.for('Stream.toAsyncStreamable') or Symbol.for('Stream.toStreamable') are converted via those protocols. The toAsyncStreamable protocol takes precedence over toStreamable, which takes precedence over the iteration protocols (Symbol.asyncIterator, Symbol.iterator).
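The precedence order can be sketched as a plain resolution function (an illustrative model, not the module's implementation):

```javascript
// Resolve a source object following the precedence described above:
// toAsyncStreamable > toStreamable > iteration protocols.
function resolveStreamable(input) {
  const toAsync = input[Symbol.for('Stream.toAsyncStreamable')];
  if (typeof toAsync === 'function') return toAsync.call(input);
  const toSync = input[Symbol.for('Stream.toStreamable')];
  if (typeof toSync === 'function') return toSync.call(input);
  if (input[Symbol.asyncIterator] || input[Symbol.iterator]) return input;
  throw new TypeError('input is not streamable');
}

const obj = {
  [Symbol.for('Stream.toStreamable')]() { return 'via toStreamable'; },
  *[Symbol.iterator]() { yield 'via iterator'; },
};
console.log(resolveStreamable(obj)); // 'via toStreamable' wins over iteration
```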

import { Buffer } from 'node:buffer';
import { from, text } from 'node:stream/iter';

console.log(await text(from('hello')));       // 'hello'
console.log(await text(from(Buffer.from('hello')))); // 'hello'

fromSync

History
fromSync(input): Iterable<Uint8Array[]>
Attributes
Must not be null or undefined.
Returns:
{Iterable<Uint8Array[]>}

Synchronous version of from(). Returns a sync iterable. Cannot accept async iterables or promises. Objects implementing Symbol.for('Stream.toStreamable') are converted via that protocol (takes precedence over Symbol.iterator). The toAsyncStreamable protocol is ignored entirely.

import { fromSync, textSync } from 'node:stream/iter';

console.log(textSync(fromSync('hello'))); // 'hello'

pipeTo

History
pipeTo(source, ...transforms?, writer, options?): Promise<number>
Attributes
The data source.
...transforms:<Function> | <Object>
Zero or more transforms to apply.
writer:<Object>
Destination with  write(chunk) method.
options:<Object>
signal?:<AbortSignal>
Abort the pipeline.
preventClose?:<boolean>
If  true , do not call writer.end() when the source ends. Default: false .
preventFail?:<boolean>
If  true , do not call writer.fail() on error. Default: false .
Returns:
{Promise<number>} Total bytes written.

Pipe a source through transforms into a writer. If the writer has a writev(chunks) method, entire batches are passed in a single call (enabling scatter/gather I/O).

If the writer implements the optional *Sync methods (writeSync, writevSync, endSync), pipeTo() will attempt to use the synchronous methods first as a fast path, and fall back to the async versions only when the sync methods indicate they cannot complete (e.g., backpressure or waiting for the next tick). fail() is always called synchronously.

import { from, pipeTo } from 'node:stream/iter';
import { compressGzip } from 'node:zlib/iter';
import { open } from 'node:fs/promises';

const fh = await open('output.gz', 'w');
const totalBytes = await pipeTo(
  from('Hello, world!'),
  compressGzip(),
  fh.writer({ autoClose: true }),
);
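The sync-first write path described above can be sketched in plain JavaScript against a toy writer (an illustrative model of the fallback pattern, not the module's implementation):

```javascript
// Try the synchronous write path first; fall back to the async path
// only when the sync call reports it could not complete.
async function writeBatch(writer, chunks) {
  if (typeof writer.writevSync === 'function' && writer.writevSync(chunks)) {
    return; // fast path succeeded, no Promise machinery needed
  }
  await writer.writev(chunks);
}

// Toy writer whose sync path fails every other call, simulating
// intermittent backpressure.
const written = [];
let accept = false;
const writer = {
  writevSync(chunks) {
    accept = !accept;
    if (!accept) return false; // backpressure: caller must go async
    written.push(...chunks);
    return true;
  },
  async writev(chunks) {
    written.push(...chunks);
  },
};

await writeBatch(writer, ['a']);
await writeBatch(writer, ['b']);
console.log(written); // [ 'a', 'b' ]
```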

pipeToSync

History
pipeToSync(source, ...transforms?, writer, options?): number
  • source {Iterable} The sync data source.
  • ...transforms <Function> | <Object> Zero or more sync transforms.
  • writer <Object> Destination with write(chunk) method.
  • options <Object>
    Attributes
    preventClose?:<boolean>
    Default: false .
    preventFail?:<boolean>
    Default: false .
  • Returns: <number> Total bytes written.

Synchronous version of pipeTo(). The source, all transforms, and the writer must be synchronous. Cannot accept async iterables or promises.

The writer must have the *Sync methods (writeSync, writevSync, endSync) and fail() for this to work.

pull

History
pull(source, ...transforms?, options?): AsyncIterable<Uint8Array[]>
Attributes
The data source.
...transforms:<Function> | <Object>
Zero or more transforms to apply.
options:<Object>
signal?:<AbortSignal>
Abort the pipeline.
Returns:
{AsyncIterable<Uint8Array[]>}

Create a lazy async pipeline. Data is not read from source until the returned iterable is consumed. Transforms are applied in order.

import { from, pull, text } from 'node:stream/iter';

const asciiUpper = (chunks) => {
  if (chunks === null) return null;
  return chunks.map((c) => {
    for (let i = 0; i < c.length; i++) {
      c[i] -= (c[i] >= 97 && c[i] <= 122) * 32;
    }
    return c;
  });
};

const result = pull(from('hello'), asciiUpper);
console.log(await text(result)); // 'HELLO'

Using an AbortSignal:

import { pull } from 'node:stream/iter';

const ac = new AbortController();
const result = pull(source, transform, { signal: ac.signal });
ac.abort(); // Pipeline throws AbortError on next iteration

pullSync

History
pullSync(source, ...transforms?): Iterable<Uint8Array[]>
  • source {Iterable} The sync data source.
  • ...transforms <Function> | <Object> Zero or more sync transforms.
  • Returns: {Iterable<Uint8Array[]>}

Synchronous version of pull(). All transforms must be synchronous.

push

History
push(...transforms?, options?): Object
Attributes
...transforms:<Function> | <Object>
Optional transforms applied to the readable side.
options:<Object>
highWaterMark?:<number>
Maximum number of buffered slots before backpressure is applied. Must be >= 1; values below 1 are clamped to 1.  Default: 4 .
backpressure?:<string>
Backpressure policy:  'strict' , 'block' , 'drop-oldest' , or 'drop-newest' . Default: 'strict' .
signal?:<AbortSignal>
Abort the stream.
Returns:<Object>
writer:
{PushWriter} The writer side.
readable:
{AsyncIterable<Uint8Array[]>} The readable side.

Create a push stream with backpressure. The writer pushes data in; the readable side is consumed as an async iterable.

import { push, text } from 'node:stream/iter';

const { writer, readable } = push();

// Producer and consumer must run concurrently. With strict backpressure
// (the default), awaited writes block until the consumer reads.
const producing = (async () => {
  await writer.write('hello');
  await writer.write(' world');
  await writer.end();
})();

console.log(await text(readable)); // 'hello world'
await producing;

The writer returned by push() conforms to the [Writer interface][].

duplex

History
duplex(options?): Array
Attributes
options:<Object>
highWaterMark?:<number>
Buffer size for both directions.  Default: 4 .
backpressure?:<string>
Policy for both directions.  Default: 'strict' .
signal?:<AbortSignal>
Cancellation signal for both channels.
Options specific to the A-to-B direction. Overrides shared options.
highWaterMark:<number>
backpressure:<string>
Options specific to the B-to-A direction. Overrides shared options.
highWaterMark:<number>
backpressure:<string>
Returns:<Array>
A pair  [channelA, channelB] of duplex channels.

Create a pair of connected duplex channels for bidirectional communication, similar to socketpair(). Data written to one channel's writer appears in the other channel's readable.

Each channel has:

  • writer — a [Writer interface][] object for sending data to the peer.
  • readable — an AsyncIterable<Uint8Array[]> for reading data from the peer.
  • close() — close this end of the channel (idempotent).
  • [Symbol.asyncDispose]() — async dispose support for await using.
import { duplex, text } from 'node:stream/iter';

const [client, server] = duplex();

// Server echoes back
const serving = (async () => {
  for await (const chunks of server.readable) {
    await server.writer.writev(chunks);
  }
  await server.writer.end(); // propagate end-of-stream to the client
})();

await client.writer.write('hello');
await client.writer.end();

console.log(await text(client.readable)); // 'hello' -- the echoed data
await serving;

array

History
array(source, options?): Promise<Uint8Array[]>
  • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
  • options <Object>
    Attributes
    limit:<number>
    Maximum number of bytes to consume. If the total bytes collected exceeds limit, an ERR_OUT_OF_RANGE error is thrown.
  • Returns: {Promise<Uint8Array[]>}

Collect all chunks as an array of Uint8Array values (without concatenating).

arrayBuffer

History
arrayBuffer(source, options?): Promise<ArrayBuffer>
  • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
  • options <Object>
    Attributes
    limit:<number>
    Maximum number of bytes to consume. If the total bytes collected exceeds limit, an ERR_OUT_OF_RANGE error is thrown.
  • Returns: {Promise<ArrayBuffer>}

Collect all bytes into an ArrayBuffer.

arrayBufferSync

History
arrayBufferSync(source, options?): ArrayBuffer
  • source {Iterable<Uint8Array[]>}
  • options <Object>
    Attributes
    limit:<number>
    Maximum number of bytes to consume. If the total bytes collected exceeds limit, an ERR_OUT_OF_RANGE error is thrown.
  • Returns: <ArrayBuffer>

Synchronous version of arrayBuffer().

arraySync

History
arraySync(source, options?): Uint8Array[]
  • source {Iterable<Uint8Array[]>}
  • options <Object>
    Attributes
    limit:<number>
    Maximum number of bytes to consume. If the total bytes collected exceeds limit, an ERR_OUT_OF_RANGE error is thrown.
  • Returns: <Uint8Array[]>

Synchronous version of array().

bytes

History
bytes(source, options?): Promise<Uint8Array>
  • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
  • options <Object>
    Attributes
    limit:<number>
    Maximum number of bytes to consume. If the total bytes collected exceeds limit, an ERR_OUT_OF_RANGE error is thrown.
  • Returns: {Promise<Uint8Array>}

Collect all bytes from a stream into a single Uint8Array.

import { from, bytes } from 'node:stream/iter';

const data = await bytes(from('hello'));
console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]

bytesSync

History
bytesSync(source, options?): Uint8Array
  • source {Iterable<Uint8Array[]>}
  • options <Object>
    Attributes
    limit:<number>
    Maximum number of bytes to consume. If the total bytes collected exceeds limit, an ERR_OUT_OF_RANGE error is thrown.
  • Returns: <Uint8Array>

Synchronous version of bytes().

text

History
text(source, options?): Promise<string>
  • source {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>}
  • options <Object>
    Attributes
    encoding?:<string>
    Text encoding. Default: 'utf-8'.
    limit:<number>
    Maximum number of bytes to consume. If the total bytes collected exceeds limit, an ERR_OUT_OF_RANGE error is thrown.
  • Returns: {Promise<string>}

Collect all bytes and decode as text.

import { from, text } from 'node:stream/iter';

console.log(await text(from('hello'))); // 'hello'

textSync

History
textSync(source, options?): string
  • source {Iterable<Uint8Array[]>}
  • options <Object>
    Attributes
    encoding?:<string>
    Default: 'utf-8'.
    limit:<number>
    Maximum number of bytes to consume. If the total bytes collected exceeds limit, an ERR_OUT_OF_RANGE error is thrown.
  • Returns: <string>

Synchronous version of text().

ondrain

History
ondrain(drainable): Promise<boolean> | null
Attributes
drainable:<Object>
An object implementing the drainable protocol.
Returns:
{Promise<boolean>|null}

Wait for a drainable writer's backpressure to clear. Returns a promise that resolves to true when the writer can accept more data, or null if the object does not implement the drainable protocol.

import { push, ondrain, text } from 'node:stream/iter';

const { writer, readable } = push({ highWaterMark: 2 });
writer.writeSync('a');
writer.writeSync('b');

// Start consuming so the buffer can actually drain
const consuming = text(readable);

// Buffer is full -- wait for drain
const canWrite = await ondrain(writer);
if (canWrite) {
  await writer.write('c');
}
await writer.end();
await consuming;

merge

History
merge(...sources, options?): AsyncIterable<Uint8Array[]>
  • ...sources {AsyncIterable<Uint8Array[]>|Iterable<Uint8Array[]>} Two or more iterables.
  • options <Object>
  • Returns: {AsyncIterable<Uint8Array[]>}

Merge multiple async iterables by yielding batches in temporal order (whichever source produces data first). All sources are consumed concurrently.
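Temporal-order merging can be sketched over plain async iterables (an illustrative model, not the module's implementation):

```javascript
// Race the pending next() of every source and yield whichever result
// settles first, refilling that source's slot afterwards.
async function* mergeIterables(...sources) {
  const its = sources.map((s) => s[Symbol.asyncIterator]());
  const nexts = its.map((it, i) => it.next().then((r) => [i, r]));
  let remaining = its.length;
  while (remaining > 0) {
    const [i, r] = await Promise.race(nexts.filter((p) => p !== null));
    if (r.done) {
      nexts[i] = null; // this source is exhausted
      remaining -= 1;
    } else {
      nexts[i] = its[i].next().then((res) => [i, res]); // refill the slot
      yield r.value;
    }
  }
}

async function* a() { yield 1; yield 3; }
async function* b() { yield 2; }
const out = [];
for await (const v of mergeIterables(a(), b())) out.push(v);
console.log(out); // All three values; interleaving depends on timing
```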

import { from, merge, text } from 'node:stream/iter';

const merged = merge(from('hello '), from('world'));
console.log(await text(merged)); // Order depends on timing

tap

History
tap(callback): Function
Attributes
callback:<Function>
(chunks) => void Called with each batch.
Returns:<Function>
A stateless transform.

Create a pass-through transform that observes batches without modifying them. Useful for logging, metrics, or debugging.

import { from, pull, text, tap } from 'node:stream/iter';

const result = pull(
  from('hello'),
  tap((chunks) => console.log('Batch size:', chunks.length)),
);
console.log(await text(result));

tap() does not prevent the callback from modifying the chunks in place, but any value returned from the callback is ignored.

tapSync

History
tapSync(callback): Function
Attributes
callback:<Function>
Returns:<Function>

Synchronous version of tap().

broadcast

History
broadcast(options?): Object
Attributes
options:<Object>
highWaterMark?:<number>
Buffer size in slots. Must be >= 1; values below 1 are clamped to 1.  Default: 16 .
backpressure?:<string>
'strict' , 'block' , 'drop-oldest' , or 'drop-newest' . Default: 'strict' .
Returns:<Object>
writer:
{BroadcastWriter}
broadcast:
{Broadcast}

Create a push-model multi-consumer broadcast channel. A single writer pushes data to multiple consumers. Each consumer has an independent cursor into a shared buffer.

import { broadcast, text } from 'node:stream/iter';

const { writer, broadcast: bc } = broadcast();

// Create consumers before writing
const c1 = bc.push();  // Consumer 1
const c2 = bc.push();  // Consumer 2

// Producer and consumers must run concurrently. Awaited writes
// block when the buffer fills until consumers read.
const producing = (async () => {
  await writer.write('hello');
  await writer.end();
})();

const [r1, r2] = await Promise.all([text(c1), text(c2)]);
console.log(r1); // 'hello'
console.log(r2); // 'hello'
await producing;
Attributes

The number of chunks currently buffered.

broadcast.cancel(reason?): void
Attributes
reason:<Error>

Cancel the broadcast. All consumers receive an error.

Attributes

The number of active consumers.

broadcast.push(...transforms?, options?): AsyncIterable<Uint8Array[]>
Attributes
...transforms:<Function> | <Object>
options:<Object>
Returns:
{AsyncIterable<Uint8Array[]>}

Create a new consumer. Each consumer receives all data written to the broadcast from the point of subscription onward. Optional transforms are applied to this consumer's view of the data.

broadcast[Symbol.dispose](): void

Alias for broadcast.cancel().

Broadcast.from

History
Broadcast.from(input, options?): Object
Attributes
options:<Object>
Same as  broadcast() .
Returns:<Object>
{ writer, broadcast }

Create a {Broadcast} from an existing source. The source is consumed automatically and pushed to all subscribers.

share

History
share(source, options?): Share
Attributes
The source to share.
options:<Object>
highWaterMark?:<number>
Buffer size. Must be >= 1; values below 1 are clamped to 1.  Default: 16 .
backpressure?:<string>
'strict' , 'block' , 'drop-oldest' , or 'drop-newest' . Default: 'strict' .
Returns:
{Share}

Create a pull-model multi-consumer shared stream. Unlike broadcast(), the source is only read when a consumer pulls. Multiple consumers share a single buffer.

import { from, share, text } from 'node:stream/iter';

const shared = share(from('hello'));

const c1 = shared.pull();
const c2 = shared.pull();

// Consume concurrently to avoid deadlock with small buffers.
const [r1, r2] = await Promise.all([text(c1), text(c2)]);
console.log(r1); // 'hello'
console.log(r2); // 'hello'
Attributes

The number of chunks currently buffered.

share.cancel(reason?): void
Attributes
reason:<Error>

Cancel the share. All consumers receive an error.

Attributes

The number of active consumers.

share.pull(...transforms?, options?): AsyncIterable<Uint8Array[]>
Attributes
...transforms:<Function> | <Object>
options:<Object>
Returns:
{AsyncIterable<Uint8Array[]>}

Create a new consumer of the shared source.

share[Symbol.dispose](): void

Alias for share.cancel().

Share.from

History
Share.from(input, options?): Share
Attributes
options:<Object>
Same as  share() .
Returns:
{Share}

Create a {Share} from an existing source.

shareSync

History
shareSync(source, options?): SyncShare
  • source {Iterable} The sync source to share.
  • options <Object>
    Attributes
    highWaterMark?:<number>
    Must be >= 1; values below 1 are clamped to 1.  Default: 16 .
    backpressure?:<string>
    Default: 'strict' .
  • Returns: {SyncShare}

Synchronous version of share().

SyncShare.fromSync

History
SyncShare.fromSync(input, options?): SyncShare
  • input {Iterable|SyncShareable}
  • options <Object>
  • Returns: {SyncShare}

Synchronous version of Share.from().

Compression and decompression transforms for use with pull(), pullSync(), pipeTo(), and pipeToSync() are available via the node:zlib/iter module. See the node:zlib/iter documentation for details.

These well-known symbols allow third-party objects to participate in the streaming protocol without importing from node:stream/iter directly.

  • Value: Symbol.for('Stream.broadcastProtocol')

The value must be a function. When called by Broadcast.from(), it receives the options passed to Broadcast.from() and must return an object conforming to the {Broadcast} interface. The implementation is fully custom -- it can manage consumers, buffering, and backpressure however it wants.

import { broadcast, Broadcast, text } from 'node:stream/iter';

// This example defers to the built-in broadcast(), but a custom
// implementation could use any mechanism.
class MessageBus {
  #broadcast;
  #writer;

  constructor() {
    const { writer, broadcast: bc } = broadcast();
    this.#writer = writer;
    this.#broadcast = bc;
  }

  [Symbol.for('Stream.broadcastProtocol')](options) {
    return this.#broadcast;
  }

  send(data) {
    this.#writer.write(new TextEncoder().encode(data));
  }

  close() {
    this.#writer.end();
  }
}

const bus = new MessageBus();
const { broadcast: bc } = Broadcast.from(bus);
const consumer = bc.push();
bus.send('hello');
bus.close();
console.log(await text(consumer)); // 'hello'
  • Value: Symbol.for('Stream.drainableProtocol')

Implement to make a writer compatible with ondrain(). The method should return a promise that resolves when backpressure clears, or null if no backpressure.

import { ondrain } from 'node:stream/iter';

class CustomWriter {
  #queue = [];
  #drain = null;
  #closed = false;
  [Symbol.for('Stream.drainableProtocol')]() {
    if (this.#closed) return null;
    if (this.#queue.length < 3) return Promise.resolve(true);
    this.#drain ??= Promise.withResolvers();
    return this.#drain.promise;
  }
  write(chunk) {
    this.#queue.push(chunk);
  }
  flush() {
    this.#queue.length = 0;
    this.#drain?.resolve(true);
    this.#drain = null;
  }
  close() {
    this.#closed = true;
  }
}
const writer = new CustomWriter();
const ready = ondrain(writer);
console.log(ready); // Promise { true } -- no backpressure
  • Value: Symbol.for('Stream.shareProtocol')

The value must be a function. When called by Share.from(), it receives the options passed to Share.from() and must return an object conforming to the {Share} interface. The implementation is fully custom -- it can manage the shared source, consumers, buffering, and backpressure however it wants.

import { share, Share, text } from 'node:stream/iter';

// This example defers to the built-in share(), but a custom
// implementation could use any mechanism.
class DataPool {
  #share;

  constructor(source) {
    this.#share = share(source);
  }

  [Symbol.for('Stream.shareProtocol')](options) {
    return this.#share;
  }
}

const pool = new DataPool(
  (async function* () {
    yield 'hello';
  })(),
);

const shared = Share.from(pool);
const consumer = shared.pull();
console.log(await text(consumer)); // 'hello'
  • Value: Symbol.for('Stream.shareSyncProtocol')

The value must be a function. When called by SyncShare.fromSync(), it receives the options passed to SyncShare.fromSync() and must return an object conforming to the {SyncShare} interface. The implementation is fully custom -- it can manage the shared source, consumers, and buffering however it wants.

import { shareSync, SyncShare, textSync } from 'node:stream/iter';

// This example defers to the built-in shareSync(), but a custom
// implementation could use any mechanism.
class SyncDataPool {
  #share;

  constructor(source) {
    this.#share = shareSync(source);
  }

  [Symbol.for('Stream.shareSyncProtocol')](options) {
    return this.#share;
  }
}

const encoder = new TextEncoder();
const pool = new SyncDataPool(
  function* () {
    yield [encoder.encode('hello')];
  }(),
);

const shared = SyncShare.fromSync(pool);
const consumer = shared.pull();
console.log(textSync(consumer)); // 'hello'
  • Value: Symbol.for('Stream.toAsyncStreamable')

The value must be a function that converts the object into a streamable value. When the object is encountered anywhere in the streaming pipeline (as a source passed to from(), or as a value returned from a transform), this method is called to produce the actual data. It may return (or resolve to) any streamable value: a string, Uint8Array, AsyncIterable, Iterable, or another streamable object.

import { from, text } from 'node:stream/iter';

class Greeting {
  #name;

  constructor(name) {
    this.#name = name;
  }

  [Symbol.for('Stream.toAsyncStreamable')]() {
    return `hello ${this.#name}`;
  }
}

const stream = from(new Greeting('world'));
console.log(await text(stream)); // 'hello world'
  • Value: Symbol.for('Stream.toStreamable')

The value must be a function that synchronously converts the object into a streamable value. When the object is encountered anywhere in the streaming pipeline (as a source passed to fromSync(), or as a value returned from a sync transform), this method is called to produce the actual data. It must synchronously return a streamable value: a string, Uint8Array, or Iterable.

import { fromSync, textSync } from 'node:stream/iter';

class Greeting {
  #name;

  constructor(name) {
    this.#name = name;
  }

  [Symbol.for('Stream.toStreamable')]() {
    return `hello ${this.#name}`;
  }
}

const stream = fromSync(new Greeting('world'));
console.log(textSync(stream)); // 'hello world'