最近在尝试翻译 node 的官方文档,选了 stream 这一部分。翻译的过程中发现有些东西, 文档里面也讲得不是很清楚,所以稍微研究了一下。

pipe 向多个可写流

文档里面说到 pipe() 方法可以将多个可写流接到一个可读流上, 但是对这么做的具体行为并没有任何介绍。

那就先实现一个看看吧。

    'use strict';

    const stream = require('stream');
    // 为了方便看输出引入一个命令行颜色的库
    const clc = require('cli-color');

    let startTime = new Date().getTime();

    let count = 0;
    const myRead = new stream.Readable({
        read() {
            count++;
            if (count < 10) {
                this.push('Count ' + count);
            } else {
                this.push(null);
            }
        }
    });

    const myWrite1 = new stream.Writable({
        write(chunk, encoding, callback) {
            console.log(clc.yellow('write 1 get chunk: ') + chunk
                + ' at ' + clc.red(timePass()));
            setTimeout(() => {
                console.log(clc.yellow('write 1 finish chunk: ') + chunk
                    + ' at ' + clc.red(timePass()));
                callback();
            }, 1000);
        }
    });

    const myWrite2 = new stream.Writable({
        write(chunk, encoding, callback) {
            console.log(clc.blue('write 2 get chunk: ') + chunk
                + ' at ' + clc.red(timePass()));
            setTimeout(() => {
                console.log(clc.blue('write 2 finish chunk: ') + chunk
                    + ' at ' + clc.red(timePass()));
                callback();
            }, 3000);
        }
    });

    function timePass() {
        // 只看 setTimeout 消耗的时间,剩下的处理过程忽略
        return Math.floor((new Date().getTime() - startTime) / 1000);
    }

    myRead.pipe(myWrite1);
    myRead.pipe(myWrite2);

运行这段脚本会在控制台输出

    write 1 get chunk: Count 1 at 0
    write 2 get chunk: Count 1 at 0
    write 1 finish chunk: Count 1 at 1
    write 1 get chunk: Count 2 at 1
    write 1 finish chunk: Count 2 at 2
    write 1 get chunk: Count 3 at 2
    write 2 finish chunk: Count 1 at 3
    write 2 get chunk: Count 2 at 3
    write 1 finish chunk: Count 3 at 3
    write 1 get chunk: Count 4 at 3
    write 1 finish chunk: Count 4 at 4
    write 1 get chunk: Count 5 at 4
    write 1 finish chunk: Count 5 at 5
    write 1 get chunk: Count 6 at 5
    write 2 finish chunk: Count 2 at 6
    write 2 get chunk: Count 3 at 6
    write 1 finish chunk: Count 6 at 6
    write 1 get chunk: Count 7 at 6
    write 1 finish chunk: Count 7 at 7
    write 1 get chunk: Count 8 at 7
    write 1 finish chunk: Count 8 at 8
    write 1 get chunk: Count 9 at 8
    write 2 finish chunk: Count 3 at 9
    write 2 get chunk: Count 4 at 9
    write 1 finish chunk: Count 9 at 9
    write 2 finish chunk: Count 4 at 12
    write 2 get chunk: Count 5 at 12
    write 2 finish chunk: Count 5 at 15
    write 2 get chunk: Count 6 at 15
    write 2 finish chunk: Count 6 at 18
    write 2 get chunk: Count 7 at 18
    write 2 finish chunk: Count 7 at 21
    write 2 get chunk: Count 8 at 21
    write 2 finish chunk: Count 8 at 24
    write 2 get chunk: Count 9 at 24
    write 2 finish chunk: Count 9 at 27

其实这个输出跟我之前想象的并不一样,我原以为在可读流中的数据被消耗之后就会消失,像这样的实现应该是两个可写流争夺数据,而 1 号因为处理速度比较快,会抢到比 2 号更多的数据。但事实上, 可读流的所有数据都在两个可写流中处理了。

那进一步,给两个可写流设置 highWaterMark 参数,为了方便控制缓冲区大小, 顺便把流设置成对象模式。即给 myWrite1 和 myWrite2 构造函数分别加上下面的选项

    // myWrite1
    {
        objectMode: true,
        highWaterMark: 4
    }

    // myWrite2
    {
        objectMode: true,
        highWaterMark: 3
    }

再次运行这段代码会输出

    write 1 get chunk: Count 1
    write 2 get chunk: Count 1
    write 1 finish chunk: Count 1
    write 1 get chunk: Count 2
    write 1 finish chunk: Count 2
    write 1 get chunk: Count 3
    write 2 finish chunk: Count 1
    write 2 get chunk: Count 2
    write 1 finish chunk: Count 3
    write 2 finish chunk: Count 2
    write 2 get chunk: Count 3
    write 2 finish chunk: Count 3
    write 1 get chunk: Count 4
    write 2 get chunk: Count 4
    write 1 finish chunk: Count 4
    write 1 get chunk: Count 5
    write 1 finish chunk: Count 5
    write 1 get chunk: Count 6
    write 2 finish chunk: Count 4
    write 2 get chunk: Count 5
    write 1 finish chunk: Count 6
    write 2 finish chunk: Count 5
    write 2 get chunk: Count 6
    write 2 finish chunk: Count 6
    write 1 get chunk: Count 7
    write 2 get chunk: Count 7
    write 1 finish chunk: Count 7
    write 1 get chunk: Count 8
    write 1 finish chunk: Count 8
    write 1 get chunk: Count 9
    write 2 finish chunk: Count 7
    write 2 get chunk: Count 8
    write 1 finish chunk: Count 9
    write 2 finish chunk: Count 8
    write 2 get chunk: Count 9
    write 2 finish chunk: Count 9

可以看到输出有了很大的变化。第一版中,由于两个可写流对数据的消耗速度不同, 1 号处理速度是 2 号的三倍,所以当 1 号处理到第 9 个数据块的时候,2 号才处理第 3 个数据块。

而加上 highWaterMark 选项之后发生了变化。两个可读流都将第三个数据块处理完后, 第四个数据块才开始处理。也就是说,从可读流拉取 3 个数据块,就将 2 号可写流的缓冲区打满了, 所以数据的流动就暂停了。直到 2 号可写流发出 drain 事件,数据才恢复流动。 即多个可写流在消耗数据的时候,拉取数据只会拉取到填满缓冲区最小的一个流, 然后等到最慢的一个可写流将缓冲区数据处理完后,再重新拉取数据。(并不准确)

本来以为这个机制已经很清晰了,但当我修改 highWaterMark 参数,把两个可写流的缓冲区容量交换时, 我又懵逼了。输出是这样的

    write 1 get chunk: Count 1 at 0
    write 2 get chunk: Count 1 at 0
    write 1 finish chunk: Count 1 at 1
    write 1 get chunk: Count 2 at 1
    write 1 finish chunk: Count 2 at 2
    write 1 get chunk: Count 3 at 2
    write 2 finish chunk: Count 1 at 3
    write 2 get chunk: Count 2 at 3
    write 1 finish chunk: Count 3 at 3
    write 1 get chunk: Count 4 at 3
    write 1 finish chunk: Count 4 at 4
    write 1 get chunk: Count 5 at 4
    write 1 finish chunk: Count 5 at 5
    write 2 finish chunk: Count 2 at 6
    write 2 get chunk: Count 3 at 6
    write 2 finish chunk: Count 3 at 9
    write 2 get chunk: Count 4 at 9
    write 2 finish chunk: Count 4 at 12
    write 2 get chunk: Count 5 at 12
    write 2 finish chunk: Count 5 at 15
    write 1 get chunk: Count 6 at 15
    write 2 get chunk: Count 6 at 15
    write 1 finish chunk: Count 6 at 16
    write 1 get chunk: Count 7 at 16
    write 1 finish chunk: Count 7 at 17
    write 1 get chunk: Count 8 at 17
    write 2 finish chunk: Count 6 at 18
    write 2 get chunk: Count 7 at 18
    write 1 finish chunk: Count 8 at 18
    write 1 get chunk: Count 9 at 18
    write 1 finish chunk: Count 9 at 19
    write 2 finish chunk: Count 7 at 21
    write 2 get chunk: Count 8 at 21
    write 2 finish chunk: Count 8 at 24
    write 2 get chunk: Count 9 at 24
    write 2 finish chunk: Count 9 at 27

1 号可写流并没有在第三个数据块处理完成之后听下等待 2 号,而是在第 5 个数据块的时候, 才等待 2 号赶上它。

又试了好几个参数的组合,但结果还是让我百思不得其解。所幸,突然发现可写流对象中, 有一个 _writableState 属性,里面包含了可读流当前的状态。所以修改一下代码, 输出多一些信息。

const myWrite1 = new stream.Writable({
    write(chunk, encoding, callback) {
        console.log(clc.yellow('write 1 get chunk: ') + chunk
            + ' at ' + clc.red(timePass()));
        setTimeout(() => {
            console.log(clc.yellow('write 1 finish chunk: ') + chunk
                + ' at ' + clc.red(timePass()) + '.'
                + (myWrite1._writableState.needDrain ? '' : 'don`t') + ' need drain');
            callback();
        }, 1000);
    },
    objectMode: true,
    highWaterMark: 4
});

const myWrite2 = new stream.Writable({
    write(chunk, encoding, callback) {
        console.log(clc.blue('write 2 get chunk: ') + chunk
            + ' at ' + clc.red(timePass()));
        setTimeout(() => {
            console.log(clc.blue('write 2 finish chunk: ') + chunk
                + ' at ' + clc.red(timePass()) + '.'
                + (myWrite1._writableState.needDrain ? '' : 'don`t') + ' need drain');
            callback();
        }, 3000);
    },
    objectMode: true,
    highWaterMark: 3
});

myWrite1.on('drain', () => {console.log(clc.green('1 drain'))});
myWrite2.on('drain', () => {console.log(clc.green('2 drain'))});

在可写流 1 缓冲区大小为 4,可写流 2 为 3 时,输出如下

write 1 get chunk: Count 1 at 0
write 2 get chunk: Count 1 at 0
write 1 finish chunk: Count 1 at 1.don`t need drain
write 1 get chunk: Count 2 at 1
write 1 finish chunk: Count 2 at 2.don`t need drain
write 1 get chunk: Count 3 at 2
write 2 finish chunk: Count 1 at 3.don`t need drain
write 2 get chunk: Count 2 at 3
write 1 finish chunk: Count 3 at 3.don`t need drain
write 2 finish chunk: Count 2 at 6.don`t need drain
write 2 get chunk: Count 3 at 6
write 2 finish chunk: Count 3 at 9.don`t need drain
2 drain
write 1 get chunk: Count 4 at 9
write 2 get chunk: Count 4 at 9
write 1 finish chunk: Count 4 at 10.don`t need drain
write 1 get chunk: Count 5 at 10
write 1 finish chunk: Count 5 at 11.don`t need drain
write 1 get chunk: Count 6 at 11
write 2 finish chunk: Count 4 at 12.don`t need drain
write 2 get chunk: Count 5 at 12
write 1 finish chunk: Count 6 at 12.don`t need drain
write 2 finish chunk: Count 5 at 15.don`t need drain
write 2 get chunk: Count 6 at 15
write 2 finish chunk: Count 6 at 18.don`t need drain
2 drain
write 1 get chunk: Count 7 at 18
write 2 get chunk: Count 7 at 18
write 1 finish chunk: Count 7 at 19.don`t need drain
write 1 get chunk: Count 8 at 19
write 1 finish chunk: Count 8 at 20.don`t need drain
write 1 get chunk: Count 9 at 20
write 2 finish chunk: Count 7 at 21.don`t need drain
write 2 get chunk: Count 8 at 21
write 1 finish chunk: Count 9 at 21.don`t need drain
write 2 finish chunk: Count 8 at 24.don`t need drain
write 2 get chunk: Count 9 at 24
write 2 finish chunk: Count 9 at 27.don`t need drain

可写流 2 缓冲区大小为 4,可写流 1 为 3 时,输出如下

write 1 get chunk: Count 1 at 0
write 2 get chunk: Count 1 at 0
write 1 finish chunk: Count 1 at 1. need drain
write 1 get chunk: Count 2 at 1
write 1 finish chunk: Count 2 at 2. need drain
write 1 get chunk: Count 3 at 2
write 2 finish chunk: Count 1 at 3. need drain
write 2 get chunk: Count 2 at 3
write 1 finish chunk: Count 3 at 3. need drain
1 drain
write 1 get chunk: Count 4 at 3
write 1 finish chunk: Count 4 at 4.don`t need drain
write 1 get chunk: Count 5 at 4
write 1 finish chunk: Count 5 at 5.don`t need drain
write 2 finish chunk: Count 2 at 6.don`t need drain
write 2 get chunk: Count 3 at 6
write 2 finish chunk: Count 3 at 9.don`t need drain
write 2 get chunk: Count 4 at 9
write 2 finish chunk: Count 4 at 12.don`t need drain
write 2 get chunk: Count 5 at 12
write 2 finish chunk: Count 5 at 15.don`t need drain
2 drain
write 1 get chunk: Count 6 at 15
write 2 get chunk: Count 6 at 15
write 1 finish chunk: Count 6 at 16. need drain
write 1 get chunk: Count 7 at 16
write 1 finish chunk: Count 7 at 17. need drain
write 1 get chunk: Count 8 at 17
write 2 finish chunk: Count 6 at 18. need drain
write 2 get chunk: Count 7 at 18
write 1 finish chunk: Count 8 at 18. need drain
1 drain
write 1 get chunk: Count 9 at 18
write 1 finish chunk: Count 9 at 19.don`t need drain
write 2 finish chunk: Count 7 at 21.don`t need drain
write 2 get chunk: Count 8 at 21
write 2 finish chunk: Count 8 at 24.don`t need drain
write 2 get chunk: Count 9 at 24
write 2 finish chunk: Count 9 at 27.don`t need drain

在第一种情况中,可写流 1 的缓冲区大,消耗数据的速度又快,所以没有机会将其缓冲区打满, 所以都是由可写流 2 触发 drain 事件。而在第二种情况中,可写流 1 缓冲区一开始就被打满, 然后当它消耗完数据之后,就触发了 drain 事件,向缓冲区补充数据,但由于可写流 2 的缓冲区这时候只消耗了两个数据块,只有两个空位,所以只能从可写流拉取两份数据。 这种情况下,可写流 1 的缓冲区就没有被打满,所以即使它消耗完了数据,也不会触发 drain 事件,而是等到可写流 2 的数据消耗完后,由可写流 2 来触发。

所以之前的结论应该稍微修改一下,变成多个可写流在消耗数据的时候, 拉取数据只会拉取到填满缓冲区最小的一个流,然后等待一个缓冲区被打满的可写流消耗完数据, 触发 drain 事件,再重新拉取数据。