Using streams in Node.js
Sometimes you don’t want to wait for an asynchronous operation, like a fetch or a calculation, to finish before doing further work with its data. Streams are there to help with that. This is a short introduction to Readable, Transform and Writable streams in Node.js.
Readable Streams
The documentation already explains this pretty well:
The Readable stream interface is the abstraction for a source of data that you are reading from. In other words, data comes out of a Readable stream.
In this example we are going to create a stream from an Array:
// File array-stream.js
import stream from 'stream';

export default class ArrayStream extends stream.Readable {
  /**
   * Initialize the stream
   * @param {Array} values The data to create a stream from
   */
  constructor(values) {
    super({objectMode: true});
    this.values = values;
    this.currentIndex = 0;
  }

  /**
   * The _read function for the stream.
   * This function gets called when something listens to this stream
   */
  _read() {
    // When the data is empty, push null to signal the end
    if (this.currentIndex === this.values.length) {
      return this.push(null);
    }
    this.push(this.values[this.currentIndex++]);
  }
}
// File index.js
import ArrayStream from './array-stream';
const arrayStream = new ArrayStream([1, 2, 3]);
arrayStream.on('data', chunk => console.log(chunk));
// => 1, 2 and 3
That’s pretty straightforward. We set up the stream class and add a _read() function that gets called when the listener indicates that it is ready to receive data. It then pushes out one chunk of the data. When there is no more data, pushing null will tell the receiver.
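By the way, for a simple iterable like this array, newer versions of Node.js (10.17+) also ship stream.Readable.from(), which builds an object-mode Readable stream without a custom class. A minimal sketch of the same stream:
// File array-stream-from.js
import stream from 'stream';
// Readable.from() wraps any iterable in an object-mode Readable stream
const arrayStream = stream.Readable.from([1, 2, 3]);
arrayStream.on('data', chunk => console.log(chunk));
// => 1, 2 and 3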
Transform Streams
A Transform Stream is a mix of a Readable and a Writable Stream. The input has to be a stream and the output is a new stream. In between, we can do something with the chunks of data.
In this example, we are going to use the Readable arrayStream we created above and will multiply the values by 2:
// File double-stream.js
import stream from 'stream';

export default class DoubleStream extends stream.Transform {
  /**
   * Initialize the stream
   */
  constructor() {
    super({objectMode: true});
  }

  /**
   * The _transform function for the stream.
   * This function gets called when there is some data available
   * @param {Number} number The value we want to double
   * @param {String} encoding The encoding of the passed data chunk
   * @param {Function} done A callback function for async operations
   */
  _transform(number, encoding, done) {
    // Simulate an async operation with a random delay
    const time = Math.floor(Math.random() * 1000);
    setTimeout(() => {
      this.push(number * 2);
      done();
    }, time);
  }
}
// File index.js
import ArrayStream from './array-stream';
import DoubleStream from './double-stream';
const arrayStream = new ArrayStream([1, 2, 3]),
  doubleStream = new DoubleStream();
arrayStream.pipe(doubleStream);
doubleStream.on('data', chunk => console.log(chunk));
// => 2, 4 and 6, with delays
Creating a Transform stream is pretty similar to creating a Readable stream; only this time the internal function is called _transform(). To connect our Readable stream to the Transform stream, we use the .pipe() function. Similar to the .on('data') listener, it passes data chunks to the next stream as they appear.
As you can see in the Transform stream, streams are perfect for handling async data.
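One caveat: .pipe() does not forward errors between streams, so an error in the source will not reach a handler on the destination. Since Node.js 10 there is stream.pipeline(), which connects the streams and reports errors and completion through a single callback. A rough sketch with the streams from above:
// File index.js
import stream from 'stream';
import ArrayStream from './array-stream';
import DoubleStream from './double-stream';
const arrayStream = new ArrayStream([1, 2, 3]),
  doubleStream = new DoubleStream();
// pipeline() connects the streams and reports errors from any of them
stream.pipeline(arrayStream, doubleStream, err => {
  if (err) console.error('Pipeline failed', err);
});
doubleStream.on('data', chunk => console.log(chunk));
// => 2, 4 and 6, with delays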
Writable Streams
As the name suggests, this is a stream that receives data and writes it somewhere, or does whatever else it needs to, without producing another stream. We can pipe data chunks into this stream, but we cannot pipe from it into another stream.
We want to log out our new doubled numbers, so let’s create a write stream:
// File log-stream.js
import stream from 'stream';

export default class LogStream extends stream.Writable {
  /**
   * Initialize the stream
   */
  constructor() {
    super({objectMode: true});
  }

  /**
   * The _write function for the stream.
   * This function gets called when there is some data available
   * @param {Number} number The value we want to log
   * @param {String} encoding The encoding of the passed data chunk
   * @param {Function} done A callback function for async operations
   */
  _write(number, encoding, done) {
    console.log(number);
    done();
  }
}
// File index.js
import ArrayStream from './array-stream';
import DoubleStream from './double-stream';
import LogStream from './log-stream';
const arrayStream = new ArrayStream([1, 2, 3]),
  doubleStream = new DoubleStream(),
  logStream = new LogStream();
arrayStream
  .pipe(doubleStream)
  .pipe(logStream);
Similar to our streams before, there is a _write() function that receives the piped-in data. We log the passed number and call the done() callback to indicate that we are ready for the next chunk.
You may have noticed that we did not need to handle the end of the Transform or Writable stream: by pushing null in the Readable stream, the other streams know when there is no more data and end themselves.
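If you do want to react to the end yourself, the streams emit events for it: a Readable emits 'end' once it has pushed null, and a Writable emits 'finish' once all chunks have been written. A small sketch on top of the pipe chain above:
// File index.js (excerpt)
arrayStream.on('end', () => console.log('No more data to read'));
logStream.on('finish', () => console.log('All numbers were logged'));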
Conclusion
As you can see, putting streams together is pretty simple. We can now build many more Transform streams and pipe our data through them. Each time a chunk of data is finished, we could notify the user and already make it available for further processing.
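For example, nothing stops us from running the data through the same Transform class twice; a quick sketch that quadruples the values:
// File index.js
import ArrayStream from './array-stream';
import DoubleStream from './double-stream';
import LogStream from './log-stream';
new ArrayStream([1, 2, 3])
  .pipe(new DoubleStream())
  .pipe(new DoubleStream())
  .pipe(new LogStream());
// => 4, 8 and 12, with delays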
At first it looked complicated, but in fact it is really simple.