node之stream(上)——readable

升级客
• 阅读 2390

题外话

该文章整合了多篇网络文章(整合之处已设置超链接,可点击直接了解原文),目的仅仅是为了和大伙分享,更加通俗易懂的了解流的各个流程的初始。本人也是node的初学菜鸟,有描述错误或误人子弟的地方多请大神们多多指出。

readable

我们先来安利一些思路,方便理清楚逻辑:)。

读缓冲区(readable buffer):这里的读是个形容词,是指可读流临时存放data(只能是字符串或者Buffer,不能是数字)的缓冲区。(读缓冲区就像一个水电站一样,感觉这样描述比较好理解flowing、paused模式)

flowing模式:即流动模式,就像打开水电站的水闸一样,上游的水和下游完完全全连通直到上游来源的数据耗尽。

paused模式:即暂停模式,就像水电站的水闸在你指定的时候(使用stream.read())才会打开。不过,当你使用read()打开水闸的时候是一个超自然现象---水电站里的水瞬间被抽干,上游的水还没来得及填充水电站。然后自动关闭水闸,等待你的下一次“惠顾“read()。

_read:上游水源通过_read里中的push、unshift方法流入水电站中。

事件 查看原文

readable:在数据块可以从流中读取的时候发出。它对应的处理器没有参数,可以在处理器里调用read([size])方法读取数据。

data:有数据可读时发出。它对应的处理器有一个参数,代表数据。如果你只想快快地读取一个流的数据,给data关联一个处理器是最方便的办法。处理器的参数是Buffer对象,如果你调用了Readable的setEncoding(encoding)方法,处理器的参数就是String对象。

end:当数据被读完时发出。对应的处理器没有参数。

close:当底层的资源,如文件,已关闭时发出。不是所有的Readable流都会发出这个事件。对应的处理器没有参数。

error:当在接收数据中出现错误时发出。对应的处理器参数是Error的实例,它的message属性描述了错误原因,stack属性保存了发生错误时的堆栈信息。

函数 查看原文

read([size]):如果你给read方法传递了一个大小作为参数,那它会返回指定数量的数据,如果数据不足,就会返回null。如果你不给read方法传参,它会返回内部缓冲区里的所有数据,如果没有数据,会返回null,此时有可能说明遇到了文件末尾。read返回的数据可能是Buffer对象,也可能是String对象。

setEncoding(encoding):给流设置一个编码格式,用于解码读到的数据。调用此方法后,read([size])方法返回String对象。

pause():暂停可读流,不再发出data事件

resume():恢复可读流,继续发出data事件

pipe(destination,[options]):把这个可读流的输出传递给destination指定的Writable流,两个流组成一个管道。options是一个JS对象,这个对象有一个布尔类型的end属性,默认值为true,当end为true时,Readable结束时自动结束Writable。注意,我们可以把一个Readable与若干Writable连在一起,组成多个管道,每一个Writable都能得到同样的数据。这个方法返回destination,如果destination本身又是Readable流,就可以级联调用pipe(比如我们在使用gzip压缩、解压缩时就会这样,马上会讲到)。

unpipe([destination]):端口与指定destination的管道。不传递destination时,断开与这个可读流连在一起的所有管道。

流动模式和暂停模式切换 查看原文

流从默认的暂停模式切换到流动模式可以使用以下几种方式:

通过添加 data 事件监听器来启动数据监听

调用 resume() 方法启动数据流

调用 pipe() 方法将数据转接到另一个 可写流

从流动模式切换为暂停模式又两种方法:

在流没有 pipe() 时,调用 pause() 方法可以将流暂停

pipe() 时,需要移除所有 data 事件的监听,再调用 unpipe() 方法

触发准备数据(_read)的方法

data listener

readable listener

read()——如果当前缓冲区为空,或者缓冲区并未超出我们设定的最大值,那么就可以继续准备数据;如果此时正在准备数据(_read())或者已经结束读取(push(null)),那么就放弃准备数据。

工作流程 查看原文

这里我要比比两句哈哈。下面的备注来自的原文大神写的非常的细腻,还包括了源码的解读,不过不太适合初入了解流的同志们,把整个思路理清楚了之后更配哟~
paused模式:

node之stream(上)——readable

  1. 1.在paused模式下则读取全部缓冲区的长度;若读取的字节数(n)大于设置的缓冲区最大值,则适当扩大缓冲区的大小(默认为16k,最大为8m);若读取的长度大于当前缓冲区的大小,设置needReadable属性并准备数据等待下一次读取。

  2. 2.如果当前缓冲区为空,或者缓冲区并未超出我们设定的最大值,那么就可以继续准备数据;如果此时正在准备数据(_read())或者已经结束读取(push(null)),那么就放弃准备数据。

  3. 3.针对这个私有方法_read,文档上有特殊说明,自定义的Readable实现类需要实现这个方法,在该方法中手动添加数据到Readable对象的读缓冲区,然后进行Readable的读取。可以理解为_read函数为读取数据前的准备工作(准备数据),针对的是流的实现者而言。

flowing模式:

node之stream(上)——readable

  1. 1.对于处在flowing模式下的读取,每次只读缓冲区中第一个buffer的长度

  2. 2.针对这个私有方法_read,文档上有特殊说明,自定义的Readable实现类需要实现这个方法,在该方法中手动添加数据到Readable对象的读缓冲区,然后进行Readable的读取。可以理解为_read函数为读取数据前的准备工作(准备数据),针对的是流的实现者而言。

实例

paused模式:
//这是一个将存放多条json字符串的txt文件读取成json的例子
const stream = require('stream');
const fs = require('fs');
const util = require('util');

function JSONLineReader(source) {
    stream.Readable.call(this);
    this._source = source;
    this._foundLineEnd = false;
    this._buffer = '';

    source.on('readable', function() {//监听source什么时候准备好,那么我们就可以用read()或则readable listener去触发JSONLineReader的_read方法
        this.read();
        // this.on('readable', function(data) {
        //     console.log('readable');
        // });
    }.bind(this))
}

util.inherits(JSONLineReader, stream.Readable);

JSONLineReader.prototype._read = function(size) {
    var chunk;
    var line;
    var lineIndex;
    var result;
    if (this._buffer.length === 0) {
        chunk = this._source.read();
        this._buffer += chunk; //一次就拿完 只是看什么时候push null
    }
    lineIndex = this._buffer.indexOf('\n');
    if (lineIndex !== -1) {
        line = this._buffer.slice(0, lineIndex);
        if (line) {
            result = JSON.parse(line);
            this._buffer = this._buffer.slice(lineIndex + 1);
            this.emit('object', result);util.inspect(result))
            this.push(util.inspect(result));
        } else {
            this._buffer = this._buffer.slice(1);
        }
    }

}

let input = fs.createReadStream(__dirname + '/json-lines.txt', {
    encoding: 'utf8'
});

var jsonLineReader = new JSONLineReader(input);

jsonLineReader.on('object', function(obj) {
    console.log('pos:', obj);
})

/*json-lines.txt
{"success":false,"code":501}
{"success":true,"code":202}
{"success":false,"code":503}
{"success":true,"code":204}
{"success":false,"code":505}
{"success":true,"code":206}
{"success":false,"code":507}
{"success":true,"code":208}
{"success":false,"code":509}
*/
flowing模式:
let stream = require('stream');
let util = require('util');
util.inherits(flowingReadableDemo, stream.Readable);

function flowingReadableDemo(opt) {
    stream.Readable.call(this, opt);
    this.quotes = ["yessdasdsa", "noasdasdas", "maybe"];
    this._index = 0;
}
flowingReadableDemo.prototype._read = function() {
    if (this._index >= this.quotes.length) {
        this.push(null);
    } else {
        this.push(this.quotes[this._index]);
        this._index += 1;
    }
};
let r = new flowingReadableDemo();
r.on('data', function(data) {
    console.log("Callback read: " + data.toString());
    // flowing状态下,我们无需执行read,仅需要设置data事件处理函数或者设定导流目标pipe
});
r.on('end', function(data) {
    console.log("No more answers.");
});

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
美凌格栋栋酱 美凌格栋栋酱
6个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
3年前
VBox 启动虚拟机失败
在Vbox(5.0.8版本)启动Ubuntu的虚拟机时,遇到错误信息:NtCreateFile(\\Device\\VBoxDrvStub)failed:0xc000000034STATUS\_OBJECT\_NAME\_NOT\_FOUND(0retries) (rc101)Makesurethekern
Wesley13 Wesley13
3年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
3年前
Docker 部署SpringBoot项目不香吗?
  公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。  !(http://dingyue.ws.126.net/2020/0920/b00fbfc7j00qgy5xy002kd200qo00hsg00it00cj.jpg)  2
Python进阶者 Python进阶者
1年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这