Node 流 pipe 多个目标
最近在尝试翻译 node 的官方文档,选了 stream 这一部分。翻译的过程中发现有些东西, 文档里面也讲得不是很清楚,所以稍微研究了一下。
pipe 向多个可写流
文档里面说到 pipe() 方法可以将多个可写流接到一个可读流上, 但是对这么做的具体行为并没有任何介绍。
那就先实现一个看看吧。
'use strict';
const stream = require('stream');
// 为了方便看输出引入一个命令行颜色的库
const clc = require('cli-color');
let startTime = new Date().getTime();
let count = 0;
const myRead = new stream.Readable({
read() {
count++;
if (count < 10) {
this.push('Count ' + count);
} else {
this.push(null);
}
}
});
const myWrite1 = new stream.Writable({
write(chunk, encoding, callback) {
console.log(clc.yellow('write 1 get chunk: ') + chunk
+ ' at ' + clc.red(timePass()));
setTimeout(() => {
console.log(clc.yellow('write 1 finish chunk: ') + chunk
+ ' at ' + clc.red(timePass()));
callback();
}, 1000);
}
});
const myWrite2 = new stream.Writable({
write(chunk, encoding, callback) {
console.log(clc.blue('write 2 get chunk: ') + chunk
+ ' at ' + clc.red(timePass()));
setTimeout(() => {
console.log(clc.blue('write 2 finish chunk: ') + chunk
+ ' at ' + clc.red(timePass()));
callback();
}, 3000);
}
});
function timePass() {
// 只看 setTimeout 消耗的时间,剩下的处理过程忽略
return Math.floor((new Date().getTime() - startTime) / 1000);
}
myRead.pipe(myWrite1);
myRead.pipe(myWrite2);
运行这段脚本会在控制台输出
write 1 get chunk: Count 1 at 0
write 2 get chunk: Count 1 at 0
write 1 finish chunk: Count 1 at 1
write 1 get chunk: Count 2 at 1
write 1 finish chunk: Count 2 at 2
write 1 get chunk: Count 3 at 2
write 2 finish chunk: Count 1 at 3
write 2 get chunk: Count 2 at 3
write 1 finish chunk: Count 3 at 3
write 1 get chunk: Count 4 at 3
write 1 finish chunk: Count 4 at 4
write 1 get chunk: Count 5 at 4
write 1 finish chunk: Count 5 at 5
write 1 get chunk: Count 6 at 5
write 2 finish chunk: Count 2 at 6
write 2 get chunk: Count 3 at 6
write 1 finish chunk: Count 6 at 6
write 1 get chunk: Count 7 at 6
write 1 finish chunk: Count 7 at 7
write 1 get chunk: Count 8 at 7
write 1 finish chunk: Count 8 at 8
write 1 get chunk: Count 9 at 8
write 2 finish chunk: Count 3 at 9
write 2 get chunk: Count 4 at 9
write 1 finish chunk: Count 9 at 9
write 2 finish chunk: Count 4 at 12
write 2 get chunk: Count 5 at 12
write 2 finish chunk: Count 5 at 15
write 2 get chunk: Count 6 at 15
write 2 finish chunk: Count 6 at 18
write 2 get chunk: Count 7 at 18
write 2 finish chunk: Count 7 at 21
write 2 get chunk: Count 8 at 21
write 2 finish chunk: Count 8 at 24
write 2 get chunk: Count 9 at 24
write 2 finish chunk: Count 9 at 27
其实这个输出跟我之前想象的并不一样,我原以为在可读流中的数据被消耗之后就会消失,像这样的实现应该是两个可写流争夺数据,而 1 号因为处理速度比较快,会抢到比 2 号更多的数据。但事实上, 可读流的所有数据都在两个可写流中处理了。
那进一步,给两个可写流设置 highWaterMark
参数,为了方便控制缓冲区大小,
顺便把流设置成对象模式。即给 myWrite1 和 myWrite2 构造函数分别加上下面的选项
// myWrite1
{
objectMode: true,
highWaterMark: 4
}
// myWrite2
{
objectMode: true,
highWaterMark: 3
}
再次运行这段代码会输出
write 1 get chunk: Count 1
write 2 get chunk: Count 1
write 1 finish chunk: Count 1
write 1 get chunk: Count 2
write 1 finish chunk: Count 2
write 1 get chunk: Count 3
write 2 finish chunk: Count 1
write 2 get chunk: Count 2
write 1 finish chunk: Count 3
write 2 finish chunk: Count 2
write 2 get chunk: Count 3
write 2 finish chunk: Count 3
write 1 get chunk: Count 4
write 2 get chunk: Count 4
write 1 finish chunk: Count 4
write 1 get chunk: Count 5
write 1 finish chunk: Count 5
write 1 get chunk: Count 6
write 2 finish chunk: Count 4
write 2 get chunk: Count 5
write 1 finish chunk: Count 6
write 2 finish chunk: Count 5
write 2 get chunk: Count 6
write 2 finish chunk: Count 6
write 1 get chunk: Count 7
write 2 get chunk: Count 7
write 1 finish chunk: Count 7
write 1 get chunk: Count 8
write 1 finish chunk: Count 8
write 1 get chunk: Count 9
write 2 finish chunk: Count 7
write 2 get chunk: Count 8
write 1 finish chunk: Count 9
write 2 finish chunk: Count 8
write 2 get chunk: Count 9
write 2 finish chunk: Count 9
可以看到输出有了很大的变化。第一版中,由于两个可写流对数据的消耗速度不同, 1 号处理速度是 2 号的三倍,所以当 1 号处理到第 9 个数据块的时候,2 号才处理第 3 个数据块。
而加上 highWaterMark
选项之后发生了变化。两个可读流都将第三个数据块处理完后,
第四个数据块才开始处理。也就是说,从可读流拉取 3 个数据块,就将 2 号可写流的缓冲区打满了,
所以数据的流动就暂停了。直到 2 号可写流发出 drain
事件,数据才恢复流动。
即多个可写流在消耗数据的时候,拉取数据只会拉取到填满缓冲区最小的一个流,
然后等到最慢的一个可写流将缓冲区数据处理完后,再重新拉取数据。(并不准确)
本来以为这个机制已经很清晰了,但当我修改 highWaterMark
参数,把两个可写流的缓冲区容量交换时,
我又懵逼了。输出是这样的
write 1 get chunk: Count 1 at 0
write 2 get chunk: Count 1 at 0
write 1 finish chunk: Count 1 at 1
write 1 get chunk: Count 2 at 1
write 1 finish chunk: Count 2 at 2
write 1 get chunk: Count 3 at 2
write 2 finish chunk: Count 1 at 3
write 2 get chunk: Count 2 at 3
write 1 finish chunk: Count 3 at 3
write 1 get chunk: Count 4 at 3
write 1 finish chunk: Count 4 at 4
write 1 get chunk: Count 5 at 4
write 1 finish chunk: Count 5 at 5
write 2 finish chunk: Count 2 at 6
write 2 get chunk: Count 3 at 6
write 2 finish chunk: Count 3 at 9
write 2 get chunk: Count 4 at 9
write 2 finish chunk: Count 4 at 12
write 2 get chunk: Count 5 at 12
write 2 finish chunk: Count 5 at 15
write 1 get chunk: Count 6 at 15
write 2 get chunk: Count 6 at 15
write 1 finish chunk: Count 6 at 16
write 1 get chunk: Count 7 at 16
write 1 finish chunk: Count 7 at 17
write 1 get chunk: Count 8 at 17
write 2 finish chunk: Count 6 at 18
write 2 get chunk: Count 7 at 18
write 1 finish chunk: Count 8 at 18
write 1 get chunk: Count 9 at 18
write 1 finish chunk: Count 9 at 19
write 2 finish chunk: Count 7 at 21
write 2 get chunk: Count 8 at 21
write 2 finish chunk: Count 8 at 24
write 2 get chunk: Count 9 at 24
write 2 finish chunk: Count 9 at 27
1 号可写流并没有在第三个数据块处理完成之后听下等待 2 号,而是在第 5 个数据块的时候, 才等待 2 号赶上它。
又试了好几个参数的组合,但结果还是让我百思不得其解。所幸,突然发现可写流对象中,
有一个 _writableState
属性,里面包含了可读流当前的状态。所以修改一下代码,
输出多一些信息。
const myWrite1 = new stream.Writable({
write(chunk, encoding, callback) {
console.log(clc.yellow('write 1 get chunk: ') + chunk
+ ' at ' + clc.red(timePass()));
setTimeout(() => {
console.log(clc.yellow('write 1 finish chunk: ') + chunk
+ ' at ' + clc.red(timePass()) + '.'
+ (myWrite1._writableState.needDrain ? '' : 'don`t') + ' need drain');
callback();
}, 1000);
},
objectMode: true,
highWaterMark: 4
});
const myWrite2 = new stream.Writable({
write(chunk, encoding, callback) {
console.log(clc.blue('write 2 get chunk: ') + chunk
+ ' at ' + clc.red(timePass()));
setTimeout(() => {
console.log(clc.blue('write 2 finish chunk: ') + chunk
+ ' at ' + clc.red(timePass()) + '.'
+ (myWrite1._writableState.needDrain ? '' : 'don`t') + ' need drain');
callback();
}, 3000);
},
objectMode: true,
highWaterMark: 3
});
myWrite1.on('drain', () => {console.log(clc.green('1 drain'))});
myWrite2.on('drain', () => {console.log(clc.green('2 drain'))});
在可写流 1 缓冲区大小为 4,可写流 2 为 3 时,输出如下
write 1 get chunk: Count 1 at 0
write 2 get chunk: Count 1 at 0
write 1 finish chunk: Count 1 at 1.don`t need drain
write 1 get chunk: Count 2 at 1
write 1 finish chunk: Count 2 at 2.don`t need drain
write 1 get chunk: Count 3 at 2
write 2 finish chunk: Count 1 at 3.don`t need drain
write 2 get chunk: Count 2 at 3
write 1 finish chunk: Count 3 at 3.don`t need drain
write 2 finish chunk: Count 2 at 6.don`t need drain
write 2 get chunk: Count 3 at 6
write 2 finish chunk: Count 3 at 9.don`t need drain
2 drain
write 1 get chunk: Count 4 at 9
write 2 get chunk: Count 4 at 9
write 1 finish chunk: Count 4 at 10.don`t need drain
write 1 get chunk: Count 5 at 10
write 1 finish chunk: Count 5 at 11.don`t need drain
write 1 get chunk: Count 6 at 11
write 2 finish chunk: Count 4 at 12.don`t need drain
write 2 get chunk: Count 5 at 12
write 1 finish chunk: Count 6 at 12.don`t need drain
write 2 finish chunk: Count 5 at 15.don`t need drain
write 2 get chunk: Count 6 at 15
write 2 finish chunk: Count 6 at 18.don`t need drain
2 drain
write 1 get chunk: Count 7 at 18
write 2 get chunk: Count 7 at 18
write 1 finish chunk: Count 7 at 19.don`t need drain
write 1 get chunk: Count 8 at 19
write 1 finish chunk: Count 8 at 20.don`t need drain
write 1 get chunk: Count 9 at 20
write 2 finish chunk: Count 7 at 21.don`t need drain
write 2 get chunk: Count 8 at 21
write 1 finish chunk: Count 9 at 21.don`t need drain
write 2 finish chunk: Count 8 at 24.don`t need drain
write 2 get chunk: Count 9 at 24
write 2 finish chunk: Count 9 at 27.don`t need drain
可写流 2 缓冲区大小为 4,可写流 1 为 3 时,输出如下
write 1 get chunk: Count 1 at 0
write 2 get chunk: Count 1 at 0
write 1 finish chunk: Count 1 at 1. need drain
write 1 get chunk: Count 2 at 1
write 1 finish chunk: Count 2 at 2. need drain
write 1 get chunk: Count 3 at 2
write 2 finish chunk: Count 1 at 3. need drain
write 2 get chunk: Count 2 at 3
write 1 finish chunk: Count 3 at 3. need drain
1 drain
write 1 get chunk: Count 4 at 3
write 1 finish chunk: Count 4 at 4.don`t need drain
write 1 get chunk: Count 5 at 4
write 1 finish chunk: Count 5 at 5.don`t need drain
write 2 finish chunk: Count 2 at 6.don`t need drain
write 2 get chunk: Count 3 at 6
write 2 finish chunk: Count 3 at 9.don`t need drain
write 2 get chunk: Count 4 at 9
write 2 finish chunk: Count 4 at 12.don`t need drain
write 2 get chunk: Count 5 at 12
write 2 finish chunk: Count 5 at 15.don`t need drain
2 drain
write 1 get chunk: Count 6 at 15
write 2 get chunk: Count 6 at 15
write 1 finish chunk: Count 6 at 16. need drain
write 1 get chunk: Count 7 at 16
write 1 finish chunk: Count 7 at 17. need drain
write 1 get chunk: Count 8 at 17
write 2 finish chunk: Count 6 at 18. need drain
write 2 get chunk: Count 7 at 18
write 1 finish chunk: Count 8 at 18. need drain
1 drain
write 1 get chunk: Count 9 at 18
write 1 finish chunk: Count 9 at 19.don`t need drain
write 2 finish chunk: Count 7 at 21.don`t need drain
write 2 get chunk: Count 8 at 21
write 2 finish chunk: Count 8 at 24.don`t need drain
write 2 get chunk: Count 9 at 24
write 2 finish chunk: Count 9 at 27.don`t need drain
在第一种情况中,可写流 1 的缓冲区大,消耗数据的速度又快,所以没有机会将其缓冲区打满,
所以都是由可写流 2 触发 drain
事件。而在第二种情况中,可写流 1 缓冲区一开始就被打满,
然后当它消耗完数据之后,就触发了 drain
事件,向缓冲区补充数据,但由于可写流 2
的缓冲区这时候只消耗了两个数据块,只有两个空位,所以只能从可写流拉取两份数据。
这种情况下,可写流 1 的缓冲区就没有被打满,所以即使它消耗完了数据,也不会触发
drain
事件,而是等到可写流 2 的数据消耗完后,由可写流 2 来触发。
所以之前的结论应该稍微修改一下,变成多个可写流在消耗数据的时候, 拉取数据只会拉取到填满缓冲区最小的一个流,然后等待一个缓冲区被打满的可写流消耗完数据, 触发 drain 事件,再重新拉取数据。