Readable 流

除了 Readable 流之外,当然还有 Writable 流。

    let stream = require('stream');

    let myWritable = new stream.Writable({
        // 内部缓存大小,默认为 16384(16kb),对于处于 objectMode 的流,默认是 16(个对象)
        highwatermark: 16384,
        // 布尔值,决定是否在传给 _write 之前将数据转为 buffer,默认为 true
        decodeStrings: false,
        // 是否为对象模式
        objectmode: false,
        write: _write 函数的实现,
        writev: _writev 函数的实现
    });

highwatermark 参数和 Readable 流是一样的,标识内部缓存区的大小。如果被打满了, 使用 write 方法写入数据的话,是会返回 false 的。而当缓冲区的数据被消耗, 重新获得可写入的空间时,Writable 流就会发出 drain 事件,通知说可以写入了,跟 Readable 流的 readable 事件正好相反。

第二个参数 decodeStrings 如果不设置的话,在 _write 函数中收到的数据块都会是 Buffer 的形式, 如果想要拿到字符串进行处理,那就须要将其设置为 false。

淡然最重要的还是 write 参数,比如说想把收到的数据写入数据库。大概会是这样

    let stream = require('stream');

    let myWritable = new stream.Writable({
        objectMode: true,
        write(chunk, encoding, next) {
            // chunk 数据块
            // encoding 编码格式
            // next 回调函数
            someDataBase.save(chunk).then(() => {
                next();
            });
        }
    });

当处理完当前的数据块后,使用回调函数,通知此数据块已经处理完毕。

writev 函数是可选的,当你想要有一次处理多个数据块的能力的时候,可以实现这个方法, stream 会使用当前缓存区内所有的数据块作为参数来调用它。writev 接受两个参数, writev(chuncks, callback),其中 callback 是回调函数,和 write 一样,数据处理完之后, 调用此函数。chunks 是一个数组,其中每一个元素的格式像这样 {chunk: …, encoding: …} 其实就是 write 函数的批量处理版本。

有了 Readable 和 Writeable 之后,其他两种就很好理解了。Duplex 是同时实现了 Readable 和 Writable 的 stream,比如 TCP socket(输入与输出之间并不相关,仅仅是双工), 而 Transform 是 Duplex 中的一种,输入和输出之间是有依赖关系的,相当于一个 filter, 将数据进行变换之后传给后面的环节。

比如说我们想要使用流读取文件内容,但又不想使用 size 这种粒度,想要一个数据块就是一行 (当然如果你知道 readline 模块的话,就先假装不知道吧)应该怎么做呢。很自然就会想到使用一个 Transform 流整理数据的格式。

    let fs = require('fs');
    let stream = require('stream');
    let util = require('util');
    let process = require('process');

    let fileStream = fs.createReadStream(
        '/path/to/file',
        {encoding: 'utf8'}
    );

    util.inherits(Chunk2Line, stream.Transform);
    function Chunk2Line(opt) {
        stream.Transform.call(this, opt);
        this._strBuffer = '';
        this.count = 1;
    }
    Chunk2Line.prototype._transform = function(chunk, encoding, callback) {
        let lines = chunk.split('\n');
        if (lines.lenght > 1) {
            this._strBuffer = lines.pop();
        }
        lines.map(item => {
            this.push(this.count + ' ' + item + '\n', 'utf8');
            this.count++;
        });
        callback();
    };
    let chunk2Line = new Chunk2Line({decodeStrings: false});

    fileStream.pipe(chunk2Line).pipe(process.stdout);

代码大概像上面这样,我们就拥有了以行为单位的流,并给每一行加上了行号。 Transform 流的变换功能非常灵活,可以很方便的组装成想要的管道, 而且一些 Transform 的功能很容易抽取出纯函数来,就可以实现整个数据流程的分阶段缓存, 应用场景其实是很丰富的。

Transform 流还有一个重要的 flush 函数,和其他的一些 API,就不多赘述了,还是看文档靠谱。