Introduction to Node.js Streams

Introduction

Streams are a somewhat advanced concept, so in this article we will work through a few examples to build a better understanding and introduce some related concepts along the way.

What is a Stream

In simple terms, streams are used to read from an input or write to an output sequentially. Most of the time, streams are used to read or write from a continuous source, or a comparatively large one.

For example, let's assume that you have to read a large file. If the file is larger than your free memory, you cannot read the whole file into memory in order to process it. You have to read it piece by piece and process each chunk, which may be separated by newlines, for example.

Another example of a continuous source is network communication - like a chat application where data should be continuously flowing from the sender to the receiver.

Streams in Node.js

The Stream module is a native module that ships by default with Node.js. Streams are instances of the EventEmitter class, which handles events asynchronously in Node.js. Because of this superclass, streams are inherently event-based.

There are 4 types of streams in Node.js:

  • Writable: Used to write data sequentially
  • Readable: Used to read data sequentially
  • Duplex: Used to both read and write data sequentially
  • Transform: Where data can be modified as it is written or read. Take compression as an example: with a stream like this, you can write compressed data in and read the decompressed data out.

Let's take a look at a few examples of streams.

Writable Streams

First of all, let's create a writable stream and write some data into a file:

const fs = require('fs');
const file = fs.createWriteStream('file.txt');

file.write('hello world');
file.end(', from streams!');

In this code, we have used the file system module to create a writable stream to a file (file.txt) and write 2 separate chunks to it: hello world and , from streams!.

Unlike fs.writeFile(), which requires the whole file content to be written at once, a stream lets us write the content chunk by chunk.

To simulate a continuous input, we could do something along the lines of:

const fs = require('fs');
const file = fs.createWriteStream('file.txt');

for (let i = 0; i < 10000; i++) {
    file.write('Hello world ' + i + '\n');
}
file.end();

This will write Hello world followed by the index ten thousand times and then end the stream:

Hello world 0
Hello world 1
Hello world 2
Hello world 3
Hello world 4
...

Please remember to call .end() on your streams once you're done writing to them, since the finish event is only emitted after the .end() method has been called.

This signifies that the stream's body has been flushed into our file.

Readable Streams

Now let's take a look at another simple example: reading a file using a stream. Using a readable stream, we can read a file chunk by chunk instead of reading the full content into memory:

const fs = require('fs');

const readableStream = fs.createReadStream('./article.md', {
    highWaterMark: 10
});

readableStream.on('readable', () => {
    const chunk = readableStream.read();
    if (chunk !== null) {
        process.stdout.write(`[${chunk}]`);
    }
});

readableStream.on('end', () => {
    console.log('DONE');
});

Similar to creating a writable stream, we've created a readable stream by calling the .createReadStream() method.

While buffering (segmenting the data into chunks), the size of the buffer depends on the highWaterMark parameter, which is passed to the stream constructor.

The default value of this option is 16 KB (16384 bytes) for most streams, and 64 KB for streams created with fs.createReadStream. If you don't override it, the stream will read chunks of that size and pass them to you to process.

Since we are using a small text file, it makes more sense to use a small value for our example, so the text will be chunked into pieces of 10 characters.

In our example above, we simply printed the chunk of data we received, except with brackets around it so that you can easily see the different chunks. The output of our code looks like this:

[### Introd][uction

St][reams are ][a somewhat][ advanced ][concept to][ understan][d. So in t][his articl][e, we will][ go along ][with some ][examples f][or a bette][r understa][nding and ][introduce ][you to a f][ew concept][s along th][e way.

##][# What is ][a Stream

][In simple ]...

Duplex Streams

With both writable and readable streams out of the way, we can jump into an example using duplex streams - which essentially combine both.

We'll be demonstrating them using a simple HTTP server built using Node.js' native http module. The example used here is from the official Node.js documentation.

Since servers receive requests and then send responses, they're a good example of working with both kinds of streams: the incoming request acts as a readable stream and the response acts as a writable stream. (The underlying network socket itself is a duplex stream, handling both directions.)

First, let's import the HTTP module:

const http = require('http');

Now let's create a simple HTTP server:

const server = http.createServer((req, res) => {
    // `req` is an http.IncomingMessage, which is a Readable Stream.
    // `res` is an http.ServerResponse, which is a Writable Stream.

    let body = '';

    // Get the data as utf8 strings.
    // If an encoding is not set, Buffer objects will be received.
    req.setEncoding('utf8');

    // Readable streams emit 'data' events once a listener is added.
    req.on('data', (chunk) => {
        body += chunk;
    });

    // The 'end' event indicates that the entire body has been received.
    req.on('end', () => {
        console.log(body);

        try {
            // Send 'Hello World' to the user
            res.write('Hello World');
            res.end();
        } catch (er) {
            res.statusCode = 400;
            return res.end(`error: ${er.message}`);
        }
    });
});

The req parameter is a readable stream, which we will process upon receiving as an HTTP request. We'll then send res as a response, which is, again, a simple writable stream.

Then, using the .on() method, we read the body of the request chunk by chunk and append each chunk to body, triggered by the data event.

Please note the use of the setEncoding() method before reading from the stream.

This way, the stream will emit strings; otherwise, it would emit Buffer objects. You can also perform that conversion inside the data event callback if you prefer.

The end event is triggered when there is nothing left to read in a readable stream. We will talk about other useful events later in this article.

Now, let's listen to the server:

server.listen(1337);

Hitting http://localhost:1337, you should see a simple Hello World response from the HTTP server.

Stream Pipelines

Using stream pipes, we can connect a readable stream directly to a writable stream without temporarily buffering the whole content, so we can save memory.

Consider a scenario where a user requests a large file from the server and there is no memory space to load it to the memory, or the same file is requested by a thousand different clients. In this case, we cannot read the content of the file to memory and then write it back to the client.

This is where the pipe method is useful: we'll pipe a readable stream (the file) into a writable stream (the response) and serve it to the user without holding the whole thing in memory.

First, let's do this without using streams:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
    fs.readFile('./video.mkv', (err, data) => {
        if (err) throw err;

        res.end(data);
    });
});

server.listen(1337);

This approach reads the whole file into memory using the .readFile() method and then sends it to the user.

Open your web browser and go to http://localhost:1337; here's what's happening behind the scenes:

[Screenshot: memory usage while serving the file with .readFile()]

Now, let's serve the video using a stream:

const http = require('http');
const fs = require('fs');

const server = http.createServer((req, res) => {
    const src = fs.createReadStream('./video.mkv');
    src.pipe(res);
});

server.listen(1337);

In this code, we have created a readable stream for the file and piped it directly to the HTTP response, so rather than loading the file into memory, the data read from the disk is written straight to the network without consuming much memory.

Here is a screenshot of memory usage while sending the file using a stream:

[Screenshot: memory usage while streaming the file]

As you can see, the memory usage is far lower compared to the first method.

Useful Events in a Stream

Since the Stream class inherits from the EventEmitter class, each stream has its own set of events that you can subscribe to using EventEmitter's on() method. The available events depend on the stream type.

Events in Readable Streams

  • data: Emitted when a chunk of data is read from the stream. By default, the chunk will be a Buffer object. If you want to change it you can use the .setEncoding() method.
  • error: Emitted when an error occurs during reading. This may happen if the underlying source is unable to produce data due to some internal failure, or when an invalid chunk is pushed to the stream.
  • end: Emitted when there is no more data in the stream.
  • close: Emitted when the stream resource is closed and indicates that no more events will be emitted in the future.
  • readable: Emitted when the data is available in the readable stream to read.

Events in Writable Streams

  • close: Emitted when the stream resource is closed and indicates that no more events will be emitted in the future.
  • error: Emitted when an error occurs during writing. This may happen if the underlying resource is unable to accept data due to some internal failure, or when an invalid chunk is written to the stream.
  • finish: Emitted when all the data has been flushed from the writable stream.
  • pipe: Emitted on the writable stream when a readable stream is piped to it.
  • unpipe: Emitted on the writable stream when a readable stream is un-piped from it.

Conclusion

In simple terms, streams are used to read from an input or write to an output sequentially. Most of the time, streams are used to read or write from a continuous source, or a comparatively large one.

The Stream module is a native module that ships by default with Node.js. Streams are instances of the EventEmitter class, which handles events asynchronously in Node.js. Because of this superclass, streams are inherently event-based.

Transform streams weren't covered in detail in this article, as they warrant an article of their own.

The source code of this project is available on GitHub as usual. Use it to compare your code if you get stuck while following the tutorial.

If you want more information about streams or want to dig deeper, it's recommended to follow the official documentation on Streams.

About the author: Janith Kasun, Colombo, Sri Lanka.