neiro blog

High-level Node.js streams

· [neiro]

Node.js has a simple and powerful stream API. Streams in Node.js work like Unix pipes: they let you perform asynchronous I/O by reading data from a source and piping it to a destination. If your application works not only with streams but also with promises, callbacks, or synchronous code, you may want a higher-level abstraction that covers all of them. In that case, take a look at Highland.
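To make the pipe analogy concrete, here is a minimal sketch that uses only Node's built-in `stream` module: a source, a transform, and a destination wired together with `pipeline()`. The function name `upperCaseAll` and the sample data are made up for illustration.

```javascript
const { Readable, Transform, Writable, pipeline } = require('node:stream');

// Hypothetical helper: upper-case every chunk and collect the results.
function upperCaseAll(words) {
  return new Promise((resolve, reject) => {
    const collected = [];

    // Source: a readable stream built from an in-memory array (object mode).
    const source = Readable.from(words);

    // Transform: receives each chunk and passes an upper-cased copy along.
    const upperCase = new Transform({
      objectMode: true,
      transform(chunk, _encoding, callback) {
        callback(null, String(chunk).toUpperCase());
      },
    });

    // Destination: a writable stream that stores whatever it receives.
    const destination = new Writable({
      objectMode: true,
      write(chunk, _encoding, callback) {
        collected.push(chunk);
        callback();
      },
    });

    // pipeline() connects the streams and reports an error from any of them.
    pipeline(source, upperCase, destination, (err) => {
      if (err) reject(err);
      else resolve(collected);
    });
  });
}

upperCaseAll(['a', 'b', 'c']).then((result) => console.log(result));
// [ 'A', 'B', 'C' ]
```

Even this small case needs three stream objects and a callback, which is the kind of ceremony a higher-level library can hide.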

Highland

The Highland library lets you manage asynchronous and synchronous code easily, both in Node.js and in the browser. With Highland you can simply switch between synchronous and asynchronous data sources without rewriting your code. You can install Highland with npm:

npm install highland

and require or import it like any other Node.js module:

import _ from 'highland';

General examples

Converting from arrays to Highland Streams:

_([0, 1, 2]).toArray(xs => console.log(xs));
// [ 0, 1, 2 ]

Map and reduce over a stream:

_([0, 1, 2]).map(x => x + 1).reduce(1, (a, b) => a * b); // [6]

Reading files in parallel:

import fs from 'fs';
const readFile = _.wrapCallback(fs.readFile);
const stream = _(['./.babelrc', './.eslintrc']).map(readFile).parallel(2);

Handling errors:

stream.errors((err, rethrow) => { console.error(err); });

Pipe to Node.js streams:

stream.pipe(outputStream);

Stream objects

Constructor:

const stream = _(source); // source: Array / Generator / Node Stream / Event Emitter / Promise / Iterator / Iterable

General functions:

stream.destroy();
stream.end();
stream.pause();
stream.resume();
stream.write(x); // Write the value x to the stream

Transformations

_([0, 1, 2]).append(3); // [0, 1, 2, 3]
_([0, 1, 2, null, undefined]).compact(); // [1, 2]
_([1, 2, 5]).filter(x => x <= 3); // [1, 2]
_([1, 2, 3]).head(); // 1
_([1, 2, 3]).last(); // 3
_(['ABC']).invoke('toLowerCase', []); // abc
_([{ foo: 'bar' }]).pick(['foo']); // { foo: 'bar' }
_([{ foo: 'bar' }]).pluck('foo'); // bar
_([0, 1, 2, 3]).slice(2, 4); // [2, 3]
_(['c', 'a', 'b']).sort(); // ['a', 'b', 'c']
_([0, 0, 2, 3]).uniq(); // [0, 2, 3]

Higher-order Streams

_([0, 1]).concat([2, 3]); // [0, 1, 2, 3]
_([0, 1, [2, [3]]]).flatten(); // [0, 1, 2, 3]
_([_([0, 1]), _([2, 3])]).sequence(); // [0, 1, 2, 3]
_(['a', 'b']).zip([1, 2]); // ['a', 1], ['b', 2]

const fork = stream.fork(); // shares back-pressure with other forks
const observer = stream.observe(); // sees the values without applying back-pressure
fork.resume();

Objects

_.extend({ name: 'foo' }, { type: 'obj' });
// { name: 'foo', type: 'obj' }
_.get('foo', { foo: 'bar' }); // bar
_.keys({ foo: 'bar' }); // ['foo']
_.values({ foo: 'bar' }); // ['bar']
_.pairs({ foo: 'bar' }); // ['foo', 'bar']

Utils

_.isStream([1, 2, 3]); // false
_.isStream(stream); // true

_([1, 2, 3]).each(_.log); // 1, 2, 3

const readFile = _.wrapCallback(fs.readFile); // Wraps a callback-style function so it returns a Highland stream
_.isStream(readFile('./.babelrc')); // true

Conclusion

If you need to handle synchronous and asynchronous data through a single abstraction, or to work with Node.js streams at a higher level, the Highland high-level streams library may fit your needs. You can find more in the Highland docs.

#javascript #functional