Web Streams Everywhere (and Fetch for Node.js)


Chrome developer advocate Jake Archibald called 2016 “the year of web streams.” Clearly, his prediction was somewhat premature. The Streams Standard was announced back in 2014. It’s taken a while, but there’s now a consistent streaming API implemented in modern browsers (still waiting on Firefox…) and in Node (and Deno).

What are streams?

Streaming involves splitting a resource into smaller pieces called chunks and processing each chunk one at a time. Rather than needing to wait to complete the download of all the data, with streams you can process data progressively as soon as the first chunk is available.

There are three kinds of streams: readable streams, writable streams, and transform streams. Readable streams are where the chunks of data come from. The underlying data sources could be a file or HTTP connection, for example. The data can then (optionally) be modified by a transform stream. The chunks of data can then be piped to a writable stream.
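As a minimal sketch of how the three kinds fit together, the following pipes a couple of hard-coded chunks through an upper-casing transform and into a writable stream. The runPipeline name and the chunk values are made up for illustration, and the snippet assumes a runtime where ReadableStream, TransformStream and WritableStream are globals (a modern browser, Deno, or Node 18 and later).

```javascript
// Hypothetical helper: sends two chunks through a transform and collects
// whatever arrives at the writable end.
async function runPipeline() {
  const received = [];

  // Readable stream: where the chunks come from.
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('web');
      controller.enqueue('streams');
      controller.close();
    },
  });

  // Transform stream: (optionally) modifies each chunk on its way through.
  const upperCase = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });

  // Writable stream: where the chunks end up.
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk);
    },
  });

  await source.pipeThrough(upperCase).pipeTo(sink);
  return received;
}
```

Calling runPipeline() should resolve with the two chunks in upper case, in the order they were enqueued.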

Web streams everywhere

Node has always had its own type of streams. They are generally considered to be difficult to work with. The Web Hypertext Application Technology Working Group (WHATWG) web standard for streams came later, and is largely considered an improvement. The Node docs call them “web streams”, which sounds a bit less cumbersome. The original Node streams aren’t being deprecated or removed, but they will now co-exist with the web standard stream API. This makes it easier to write cross-platform code and means developers only need to learn one way of doing things.

Deno, another attempt at server-side JavaScript by Node’s original creator, has always closely aligned with browser APIs and has full support for web streams. Cloudflare Workers (which are a bit like service workers but running on CDN edge locations) and Deno Deploy (a serverless offering from Deno) also support streams.

fetch() response as a readable stream

There are multiple ways to create a readable stream, but calling fetch() is bound to be the most common. The response body of fetch() is a readable stream.

fetch('data.txt')
.then(response => console.log(response.body));

If you look at the console log you can see that a readable stream has several useful methods. As the spec says, “a readable stream can be piped directly to a writable stream, using its pipeTo() method, or it can be piped through one or more transform streams first, using its pipeThrough() method.”

Unlike browsers, Node core doesn’t currently implement fetch. node-fetch, a popular dependency that tries to match the API of the browser standard, returns a Node stream, not a WHATWG stream. Undici, an improved HTTP/1.1 client from the Node.js team, is a modern alternative to the Node.js core http.request (which things like node-fetch and Axios are built on top of). Undici has implemented fetch, and response.body does return a web stream. 🎉

Undici might end up in Node.js core eventually, and it looks set to become the recommended way to handle HTTP requests in Node. Once you npm install undici and import fetch, it works the same as in the browser. In the following example, we pipe the stream through a transform stream. Each chunk of the stream is a Uint8Array. Node core provides a TextDecoderStream to decode binary data.

import { fetch } from 'undici';
import { TextDecoderStream } from 'node:stream/web';

async function fetchStream() {
  const response = await fetch('https://example.com');
  const stream = response.body;
  // Decode each Uint8Array chunk into a string as it passes through
  const textStream = stream.pipeThrough(new TextDecoderStream());
  return textStream;
}

response.body is a plain property rather than a promise, so you don’t need to await it. In the browser, fetch and TextDecoderStream are available on the global object so you wouldn’t include any import statements. Other than that, the code is exactly the same for Node and web browsers. Deno also has built-in support for fetch and TextDecoderStream.

Async iteration

The for-await-of loop is an asynchronous version of the for-of loop. A regular for-of loop is used to loop over arrays and other iterables. A for-await-of loop can be used to iterate over an array of promises, for example.

const promiseArray = [Promise.resolve("thing 1"), Promise.resolve("thing 2")];
for await (const thing of promiseArray) { console.log(thing); }

Importantly for us, this can also be used to iterate streams.

async function fetchAndLogStream() {
  const response = await fetch('https://example.com');
  const stream = response.body;
  const textStream = stream.pipeThrough(new TextDecoderStream());

  for await (const chunk of textStream) {
    console.log(chunk);
  }
}

fetchAndLogStream();

Async iteration of streams works in Node and Deno. All modern browsers have shipped for-await-of loops but they don’t work on streams just yet.
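Where streams are not yet async iterable, one possible workaround is to go through a reader instead. The sketch below wraps getReader() and read(), both part of the Streams Standard, in an async generator. The iterateStream name is an assumption made for this example, not an existing API.

```javascript
// Hypothetical helper: yields each chunk of a ReadableStream by calling
// read() on its default reader, for runtimes where streams cannot be
// used directly in a for-await-of loop.
async function* iterateStream(stream) {
  const reader = stream.getReader();
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    // Release the lock so the stream can be read or cancelled elsewhere
    // if the consumer stops iterating early.
    reader.releaseLock();
  }
}
```

With this in place, a loop such as for await (const chunk of iterateStream(textStream)) should behave much like iterating the stream itself.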

Some other ways to get a readable stream

Fetch will be one of the most common ways to get hold of a stream, but there are other ways. Blob and File both have a .stream() method that returns a readable stream. The following code works in modern browsers as well as in Node and in Deno — although, in Node, you will need to import { Blob } from 'buffer'; before you can use it:

const blobStream = new Blob(['Lorem ipsum'], { type: 'text/plain' }).stream();
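To show what can be done with such a stream, here is a rough sketch that reads a Blob’s stream back into a single string. The readBlobAsText name is hypothetical, and the snippet assumes Blob and TextDecoderStream are available as globals (as in browsers, Deno, and Node 18 and later). In practice blob.text() already does this, so treat it purely as an illustration of consuming the stream.

```javascript
// Hypothetical helper: decodes a Blob's byte stream chunk by chunk and
// concatenates the decoded text.
async function readBlobAsText(blob) {
  const reader = blob
    .stream()                              // ReadableStream of Uint8Array chunks
    .pipeThrough(new TextDecoderStream())  // decode bytes to strings
    .getReader();

  let text = '';
  while (true) {
    const { value, done } = await reader.read();
    if (done) return text;
    text += value;
  }
}
```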

Here is a front-end browser-based example: if you have an <input type="file"> in your markup, it’s easy to get the user-selected file as a stream.

const fileStream = document.querySelector('input').files[0].stream();

Shipping in Node 17, the FileHandle object returned by the fs/promises open() function has a .readableWebStream() method.

import {
  open,
} from 'node:fs/promises';

const file = await open('./some/file/to/read');

for await (const chunk of file.readableWebStream())
  console.log(chunk);

await file.close();

Streams work nicely with promises

If you need to do something after the stream has completed, you can use promises.

someReadableStream
.pipeTo(someWritableStream)
.then(() => console.log("all data successfully written"))
.catch(error => console.error("something went wrong", error))

Or, you can optionally await the result:

await someReadableStream.pipeTo(someWritableStream)
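The snippets above leave someReadableStream and someWritableStream undefined. As a self-contained sketch, the following fills them in with made-up stand-ins: a source of two string chunks, a sink that collects chunks into an array, and a sink whose write() always throws. All of the names here (pipeAndReport, makeSource, workingSink, failingSink) are assumptions for illustration. It assumes ReadableStream and WritableStream are globals.

```javascript
// Hypothetical helper: pipes one stream into another and reports the outcome.
async function pipeAndReport(readable, writable) {
  try {
    // pipeTo() returns a promise that settles when piping finishes or fails.
    await readable.pipeTo(writable);
    return 'all data successfully written';
  } catch (error) {
    return `something went wrong: ${error.message}`;
  }
}

// Made-up source: a fresh stream of two string chunks on every call.
const makeSource = () => new ReadableStream({
  start(controller) {
    controller.enqueue('first');
    controller.enqueue('second');
    controller.close();
  },
});

// A sink that accepts every chunk it is given.
const collected = [];
const workingSink = new WritableStream({
  write(chunk) {
    collected.push(chunk);
  },
});

// A sink that errors as soon as anything is written to it.
const failingSink = new WritableStream({
  write() {
    throw new Error('sink rejected the chunk');
  },
});
```

Piping makeSource() into workingSink should take the success path, while piping it into failingSink should land in the catch branch with the sink’s error.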

Creating your own transform stream

We already saw TextDecoderStream (there’s also a TextEncoderStream). You can also create your own transform stream from scratch. The TransformStream constructor can accept an object. You can specify three methods in the object: start, transform and flush. They’re all optional, but transform is what actually does the transformation.

As an example, let’s pretend that TextDecoderStream() doesn’t exist and implement the same functionality (be sure to use TextDecoderStream in production though as the following is an over-simplified example):

const decoder = new TextDecoder();
const decodeStream = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(decoder.decode(chunk, {stream: true}));
  }
});

Each received chunk is modified and then forwarded on by the controller. In the above example, each chunk is some encoded text that gets decoded and then forwarded. Let’s take a quick look at the other two methods:

const transformStream = new TransformStream({
  start(controller) {
    // Called immediately when the TransformStream is created
  },

  flush(controller) {
    // Called when chunks are no longer being forwarded to the transformer
  }
});
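To see all three methods working together, here is a sketch of a transform stream that re-chunks text so that every output chunk is exactly one line. The createLineSplitter name is made up for this example, and the code assumes its input chunks are already strings (for instance, the output of a TextDecoderStream).

```javascript
// Hypothetical example: splits incoming text on newlines, holding back any
// partial line until the rest of it arrives.
function createLineSplitter() {
  let buffered;

  return new TransformStream({
    start() {
      // Runs once, before any chunk arrives: set up the initial state.
      buffered = '';
    },

    transform(chunk, controller) {
      // Everything before the last newline is complete; keep the remainder.
      const lines = (buffered + chunk).split('\n');
      buffered = lines.pop();
      for (const line of lines) {
        controller.enqueue(line);
      }
    },

    flush(controller) {
      // Runs after the last chunk has been transformed: emit any leftover text.
      if (buffered !== '') {
        controller.enqueue(buffered);
      }
    },
  });
}
```

A stream could then be piped through it with something like textStream.pipeThrough(createLineSplitter()).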

A transform stream is a readable stream and a writable stream working together, usually to transform some data. Every object made with new TransformStream() has a property called readable, which is a ReadableStream, and a property called writable, which is a WritableStream. Calling someReadableStream.pipeThrough(transformStream) writes the data from someReadableStream to transformStream.writable, possibly transforms the data, then pushes the data to transformStream.readable.

Some people find it helpful to create a transform stream that doesn’t actually transform data. This is known as an “identity transform stream” — created by calling new TransformStream() without passing in any object argument, or by leaving off the transform method. It forwards all chunks written to its writable side to its readable side, without any changes. As a simple example of the concept, “hello” is logged by the following code:

const {readable, writable} = new TransformStream();
writable.getWriter().write('hello');
readable.getReader().read().then(({value, done}) => console.log(value))

Creating your own readable stream

It’s possible to create a custom stream and populate it with your own chunks. The new ReadableStream() constructor takes an object that can contain a start function, a pull function, and a cancel function. The start function is invoked immediately when the ReadableStream is created. Inside the start function, use controller.enqueue to add chunks to the stream.

Here’s a basic “hello world” example:

import { ReadableStream } from "node:stream/web";
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

const allChunks = [];
for await (const chunk of readable) {
  allChunks.push(chunk);
}
console.log(allChunks.join(" "));
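The start function suits data that is ready up front. When chunks should only be produced as the consumer asks for them, the pull function is one place to do that. The following is a sketch under that assumption; createCountdownStream is a hypothetical name, and ReadableStream is assumed to be available as a global.

```javascript
// Hypothetical example: a stream that counts down from a number to 1,
// producing each chunk only when the stream asks for more data.
function createCountdownStream(from) {
  let current = from;

  return new ReadableStream({
    pull(controller) {
      // Called whenever the stream's internal queue has room for another chunk.
      if (current < 1) {
        controller.close();
        return;
      }
      controller.enqueue(current);
      current -= 1;
    },

    cancel() {
      // Called if the consumer cancels the stream; a real source would
      // release its resources here. This one just stops counting.
      current = 0;
    },
  });
}
```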

Here’s a more real-world example taken from the streams specification that turns a web socket into a readable stream:

function makeReadableWebSocketStream(url, protocols) {
  let websocket = new WebSocket(url, protocols);
  websocket.binaryType = "arraybuffer";

  return new ReadableStream({
    start(controller) {
      websocket.onmessage = event => controller.enqueue(event.data);
      websocket.onclose = () => controller.close();
      websocket.onerror = () => controller.error(new Error("The WebSocket errored"));
    }
  });
}

Node streams interoperability

In Node, the old Node-specific way of working with streams isn’t being removed. The old Node streams API and the web streams API will coexist. It might therefore sometimes be necessary to turn a Node stream into a web stream, and vice versa, using the .toWeb() and .fromWeb() methods, which are being added in Node 17.

import {Readable} from 'node:stream';
import {fetch} from 'undici';

const response = await fetch('https://example.com');
const readableNodeStream = Readable.fromWeb(response.body);
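Going the other way, from a Node stream to a web stream, might look something like the sketch below. It assumes Node 17 or later, where Readable.toWeb() is available (marked experimental at the time of writing). The nodeStreamToWebChunks name and the sample data are made up for illustration.

```javascript
// Hypothetical helper: wraps a Node stream as a web stream and reads it
// to the end with a standard web stream reader.
async function nodeStreamToWebChunks() {
  // A dynamic import keeps this snippet usable from both CommonJS and ES modules.
  const { Readable } = await import('node:stream');

  // Readable.from() builds an object-mode Node stream from an iterable.
  const readableNodeStream = Readable.from(['hello', 'world']);

  // Convert the Node stream into a WHATWG ReadableStream.
  const webStream = Readable.toWeb(readableNodeStream);

  const reader = webStream.getReader();
  const chunks = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
}
```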

Conclusion

ES modules, EventTarget, AbortController, URL parser, Web Crypto, Blob, TextEncoder/Decoder: more and more browser APIs are ending up in Node.js. The knowledge and skills are transferable. Fetch and streams are an important part of that convergence.

Domenic Denicola, a co-author of the streams spec, has written that the goal of the streams API is to provide an efficient abstraction and unifying primitive for I/O, like promises have become for asynchronicity. To become truly useful on the front end, more APIs need to actually support streams. At the moment a MediaStream, despite its name, is not a readable stream. If you’re working with video or audio (at least at the moment), a readable stream can’t be assigned to srcObject. Or let’s say you want to get an image and pass it through a transform stream, then insert it onto the page. At the time of writing, the code for using a stream as the src of an image element is somewhat verbose:

const response = await fetch('cute-cat.png');
const bodyStream = response.body;
const newResponse = new Response(bodyStream);
const blob = await newResponse.blob();
const url = URL.createObjectURL(blob);
document.querySelector('img').src = url;    

Over time, though, more APIs in both the browser and Node (and Deno) will make use of streams, so they’re worth learning about. There’s already a stream API for working with Web Sockets in Deno and Chrome, for example. Chrome has implemented Fetch request streams. Node and Chrome have implemented transferable streams to pipe data to and from a worker to process the chunks in a separate thread. People are already using streams to do interesting things for products in the real world: the creators of file-sharing web app Wormhole have open-sourced code to encrypt a stream, for example.

Perhaps 2022 will be the year of web streams…




