本文由淺入深給大家介紹node.js stream api,具體詳情請看下文吧。
基本介紹
在 Node.js 中,讀取文件的方式有兩種,一種是用 fs.readFile ,另外一種是利用 fs.createReadStream 來讀取。
fs.readFile 對於每個 Node.js 使用者來說最熟悉不過了,簡單易懂,很好上手。但它的缺點是會先將數據全部讀入內存,一旦遇到大文件的時候,這種方式讀取的效率就非常低下了。
而 fs.createReadStream 則是通過 Stream 來讀取數據,它會把文件(數據)分割成小塊,然後觸發一些特定的事件,我們可以監聽這些事件,編寫特定的處理函數。這種方式相對上面來說,並不好上手,但它效率非常高。
事實上, Stream 在 Node.js 中並非僅僅用在文件處理上,其他地方也可以看到它的身影,如 process.stdin/stdout , http , tcp sockets , zlib , crypto 等都有用到。
本文是我學習 Node.js 中的 Stream API 中的一點總結,希望對大家有用。
特點
基於事件通訊
可以通過 pipe 來連接流
種類
Readable Stream 可讀數據流
Writeable Stream 可寫數據流
Duplex Stream 雙向數據流,可以同時讀和寫
Transform Stream 轉換數據流,可讀可寫,同時可以轉換(處理)數據
事件
可讀數據流的事件
readable 數據向外流時觸發
data 對於那些沒有顯式暫停的數據流,添加data事件監聽函數,會將數據流切換到流動態,盡快向外提供數據
end 讀取完數據時觸發。注意不能和 writeableStream.end() 混淆,writeableStream 並沒有 end 事件,只有 .end() 方法
close 數據源關閉時觸發
error 讀取數據發生錯誤時觸發
可寫數據流的事件
drain writable.write(chunk) 返回 false 之後,緩存全部寫入完成,可以重新寫入時就會觸發
finish 調用 .end 方法時,所有緩存的數據釋放後觸發,類似於可讀數據流中的 end 事件,表示寫入過程結束
pipe 作為 pipe 目標時觸發
unpipe 作為 unpipe 目標時觸發
error 寫入數據發生錯誤時觸發
狀態
可讀數據流有兩種狀態: 流動態 和 暫停態 ,改變數據流狀態的方法如下:
暫停態 -> 流動態
添加 data 事件的監聽函數
調用 resume 方法
調用 pipe 方法
注意:如果轉為流動態時,沒有 data 事件的監聽函數,也沒有 pipe 方法的目的地,那麼數據將遺失。
流動態 -> 暫停態
不存在 pipe 方法的目的地時,調用 pause 方法
存在 pipe 方法的目的地時,移除所有 data 事件的監聽函數,並且調用 unpipe 方法,移除所有 pipe 方法的目的地
注意:只移除 data 事件的監聽函數,並不會自動引發數據流進入「暫停態」。另外,存在 pipe 方法的目的地時,調用 pause 方法,並不能保證數據流總是處於暫停態,一旦那些目的地發出數據請求,數據流有可能會繼續提供數據。
用法
讀寫文件
var fs = require('fs'); // 新建可讀數據流 var rs = fs.createReadStream('./test1.txt'); // 新建可寫數據流 var ws = fs.createWriteStream('./test2.txt'); // 監聽可讀數據流結束事件 rs.on('end', function() { console.log('read text1.txt successfully!'); }); // 監聽可寫數據流結束事件 ws.on('finish', function() { console.log('write text2.txt successfully!'); }); // 把可讀數據流轉換成流動態,流進可寫數據流中 rs.pipe(ws); 讀取 CSV 文件,並上傳數據(我在生產環境中寫過) var fs = require('fs'); var es = require('event-stream'); var csv = require('csv'); var parser = csv.parse(); var transformer = csv.transform(function(record) { return record.join(','); }); var data = fs.createReadStream('./demo.csv'); data .pipe(parser) .pipe(transformer) // 處理前一個 stream 傳遞過來的數據 .pipe(es.map(function(data, callback) { upload(data, function(err) { callback(err); }); })) // 相當於監聽前一個 stream 的 end 事件 .pipe(es.wait(function(err, body) { process.stdout.write('done!'); }));
更多用法
可以參考一下 https://github.com/jeresig/node-stream-playground ,進去示例網站之後直接點 add stream 就能看到結果了。
常見坑
用 rs.pipe(ws) 的方式來寫文件並不是把 rs 的內容 append 到 ws 後面,而是直接用 rs 的內容覆蓋 ws 原有的內容
已結束/關閉的流不能重復使用,必須重新創建數據流
pipe 方法返回的是目標數據流,如 a.pipe(b) 返回的是 b,因此監聽事件的時候請注意你監聽的對象是否正確
如果你要監聽多個數據流,同時你又使用了 pipe 方法來串聯數據流的話,你就要寫成:
data
.on('end', function() { console.log('data end'); }) .pipe(a) .on('end', function() { console.log('a end'); }) .pipe(b) .on('end', function() { console.log('b end'); });
常用類庫
event-stream 用起來有函數式編程的感覺,個人比較喜歡
awesome-nodejs#streams 由於其他 stream 庫我都沒用過,所以有需求的就直接看這裡吧
以上內容是小編給大家介紹的Node.js 中 Stream API 的使用,希望大家喜歡。