最近又做了一些日志回收,入库之类的工作。用了之前一直没什么机会用到的 stream 模块。 stream 模块 API 变化很大啊,比之前好多了,也稳定下来了,之前居然连 pause 方法都没有,就像开了水龙头就关不上一样,只能自己拿杯子接着。

Node 中的流分 Readable,Writable,Duplex,Transform 几种。平时其实也多多少少会用到, 因为 Http 请求和返回都是 stream,只是说常用 express,包了一层,并不是很关注。

除了 Http 之外,stream 用的最多还是文件读写了,现在 fs 模块也相应地提供了 fs.createReadStream 和 fs.createWriteStream 两个方法,非常地方便。

先来看一下怎么创建一个流

    let stream = require('stream');

    let myReadable = new stream.Readable({
        // 内部缓存大小,默认为 16384(16kb),对于处于 objectMode 的流,默认是 16(个对象)
        highwatermark: 16384,
        // 默认为 null,如果设置了编码,buffer 会被转换为指定格式的字符串
        encoding: 'utf8',
        // 是否为对象模式
        objectmode: false,
        read: _read 函数的实现
    });

Node 中的流对象,大概可以理解为一个水管中的阀门一样的东西,流对象内部会有一个缓冲区,用来暂存数据, 上面的参数中的 highwatermark 就是用来设置缓冲区的大小的。像阀门一样,内部的容积, 相对于整体数据是很小的,只是起到一个暂存缓冲的作用。

Readable 的作用一般是从数据源中汲取数据,供给整个数据流后面的部分使用,相当于对数据进行了一次转换, 由其他的形式(比如文件,数据库等)转换为流的形式。当然它也可以自己生成数据。

流有着两种状态,一种是暂停状态,一种是流状态,这也是我觉得现在 stream 模块可用性提高了很多的主要原因。 暂停状态就像我们家里水龙头接水一样,平时是不出水的,须要的时候自己拿容器去接一点 (调用 Readable 的 read 方法),而流模式就像拿水管在花园里浇花一样,水龙头是一直开着的, 水有多少出多少,把关掉水流的操作留给了后面的环节(也就是说后面如果堵住了,数据的流动也会停止)。 流默认是暂停模式的。

转换流对象的状态的方法有很多,最直接的是两个方法

    stream.pasue();     // 暂停
    stream.resume();    // 恢复流模式

但还有其他一些方法会导致模式的切换

    // 添加对 data 事件的监听就是向流索要数据,会自动切换到流模式
    // 尽可能快地获取或生成数据
    myReadable.on('data', chunk => {
        // do somthing
    });
    // 通过 pipe 发送数据也是一样的,会自动切换到流模式
    myReadable.pipe(someWritableStream);

如果须要切换回暂停模式的时候,须要移除对 data 事件的监听,还要调用 pasue() 方法。

从 Readable 流中获取数据,实际上是调用了对象内部的 _read 方法,也就是上面代码里面的配置项中的 read。 read 接收一个参数 size,表示要读取数据的数量,然后须要使用 push 方法向缓存区中推入数据。

比如我们创建一个只会返回字符串 ‘123’ 的可读流,那么他的 _read 方法可以是这样

    let stream = require('stream');
    let process = require('process');

    let myReadable = new stream.Readable({
        // 内部缓存大小,默认为 16384(16kb),对于处于 objectMode 的流,默认是 16(个对象)
        highwatermark: 16384,
        // 默认为 null,如果设置了编码,buffer 会被转换为指定格式的字符串
        encoding: 'utf8',
        // 是否为对象模式
        objectmode: false,
        read(size) {
            this.push('123', 'utf8');
        }
    });
    myReadable.pipe(process.stdout);

如果运行这个文件,就会在你的标准输出狂刷 123 了。但如果须要复杂一些的功能, 可能就不能这样直接用构造函数生成了,可以使用 util 模块的 inherits 函数来继承 Readable 创建自定义的流,像这样

    let Readable = require('stream').Readable;
    let util = require('util');
    let process = require('process');

    util.inherits(customRead, Readable);
    function customRead(opt) {
        Readable.call(this, opt);
        this.max = 10;
        this.cunrrent = 1;
    }
    customRead.prototype._read = function (size) {
        if (this.cunrrent <= this.max) {
            this.push(String(this.cunrrent++), 'utf8');
        } else {
            // null 代表后面已经没有其他数据了,可读流会结束,触发 'end' 事件
            this.push(null);
        }
    }

    let test = new customRead();
    test.on('end', () => {
        console.log('end');
    });
    test.pipe(process.stdout);

执行这个文件的话,就会输出 1 到 10 然后数据 end。

EOF

当然 Readable 流还有很多方法和事件,比如 close 事件,readable 事件,setEncoding 方法,unpipe 方法之类的具体就看 node 的文档 吧。(最近发现还有个中文文档 翻译得还挺不错的)