stream.js

/**
 * Stream API
 *
 * A stream is an abstract interface for working with streaming data, and
 * is based on generators and async-iterators.
 *
 * @see {@link https://youtu.be/YVdw1MDHVZs}
 * @see {@link https://github.com/aalykiot/dune/blob/main/docs/the-stream-guide.md}
 *
 * @module Stream
 */

import assert from 'assert';
import { EventEmitter } from 'events';

const isFunction = (value) => typeof value === 'function';
const isWritableStream = (value) => value?.write && value?.end;

const pipeStream = (iterator, stream, signal) => async () => {
  // Handle pipeline errors.
  signal.on('uncaughtStreamException', () => stream.end());

  // Consume the async iterator.
  for await (const chunk of iterator) {
    await stream.write(chunk);
  }
  stream.end();
};

const pipeDirect = (signal) => (source, target) => {
  // Check if the source is an async iterable.
  assert.true(
    isFunction(source[Symbol.asyncIterator]),
    'Source should be an async iterable.'
  );

  // Pattern match against target's type.
  if (isWritableStream(target)) return pipeStream(source, target, signal);
  if (isFunction(target)) return target(source, signal);

  throw new Error('Unrecognized target type.');
};

const wrap = (iterable, signal) => {
  // Wrap the async iterable object into a readable stream.
  const source = iterable[Symbol.asyncIterator](signal);
  const readable = async function* () {
    yield* source;
  };

  return readable();
};

/**
 * A module method to pipe between streams forwarding errors and properly cleaning up.
 *
 * @param {(AsyncGeneratorFunction|AsyncIterator)} source - The source stream from which data is read.
 * @param  {...AsyncGeneratorFunction} targets - One or more target streams where data from the source is written to.
 * @returns {Promise}
 */
export function pipeline(source, ...targets) {
  // The signal EE is used to signal the pipeline that an uncaught
  // exception has been thrown and the pipeline is broken.
  const signal = new EventEmitter();
  const sourceWrap = isFunction(source) ? source(signal) : wrap(source, signal);

  const stream = targets.reduce(pipeDirect(signal), sourceWrap);

  // Ensure that the pipeline is a closed circuit.
  if (!isFunction(stream)) {
    throw new Error('The last stream in the pipeline should be a writable.');
  }

  return stream().catch((err) => {
    signal.emit('uncaughtStreamException', err);
    throw err;
  });
}

/**
 * Combines two or more streams into a Duplex stream.
 *
 * @param  {...AsyncGeneratorFunction} targets - A series of streams to be combined into the Duplex stream.
 * @returns AsyncGeneratorFunction - The combined Duplex stream.
 */
export function compose(...targets) {
  // Ensure that the compose stream is an open circuit.
  const last = targets.length - 1;
  if (isWritableStream(targets[last])) {
    throw new Error(`The last stream should be an async generator function.`);
  }

  return function* composeGen(iterator, signal) {
    const stream = targets.reduce(pipeDirect(signal), iterator);
    yield* stream;
  };
}

/**
 * An alias of `pipeline()`.
 * @ignore
 */
export const pipe = pipeline;

export default { pipeline, compose, pipe };