# Node.js streaming source code deep dive
- [File Stream](https://frontendmasters.com/courses/realtime-html5-nodejs/file-streams/) - [Handling Requests](https://frontendmasters.com/courses/realtime-html5-nodejs/handling-requests/) (by Brian)
- File Streams
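- Purpose
Files can also be read with [fs](https://nodejs.org/docs/latest-v14.x/api/fs.html).readFile(pathToFile, function (err, data) {}) (the fs module also covers deleting, writing, and other operations), but under the hood it reads the entire file into memory and then hands it to the callback as one large `data` value. This drives memory usage up while it runs, and the read can fail outright when the file is too large (for example, larger than the free memory).
A file stream solves this problem: it reads and processes the file chunk by chunk.
The course mainly uses Readable streams and Writable streams.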
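To make the chunk-by-chunk idea concrete, here is a minimal sketch built on `fs.createReadStream`. The helper name `readInChunks`, the scratch file in the temp directory, and the 1 KiB chunk size are all assumptions made up for this illustration; they are not part of the course code.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: counts chunks and bytes without ever holding the whole file.
function readInChunks(filePath, chunkSize) {
  return new Promise((resolve, reject) => {
    let chunks = 0;
    let bytes = 0;
    // highWaterMark caps how many bytes each 'data' chunk can carry.
    const stream = fs.createReadStream(filePath, { highWaterMark: chunkSize });
    stream.on('data', (chunk) => {
      chunks += 1;
      bytes += chunk.length;
    });
    stream.on('end', () => resolve({ chunks, bytes }));
    stream.on('error', reject);
  });
}

// Demo: write a 10,000-byte scratch file, then read it back 1 KiB at a time.
const demoFile = path.join(os.tmpdir(), `stream-demo-${process.pid}.txt`);
fs.writeFileSync(demoFile, 'x'.repeat(10000));

const demoResult = readInChunks(demoFile, 1024);
demoResult
  .then(({ chunks, bytes }) => console.log(`read ${bytes} bytes in ${chunks} chunks`))
  .finally(() => fs.unlinkSync(demoFile)); // remove the scratch file
```

Only one chunk (at most 1 KiB here) needs to be in memory at any moment, which is the property `readFile` lacks.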
- `ASQ()`
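Create an empty **sequence** which is ready for chaining. The course uses this package to handle flow control. (It was introduced and used earlier in the [file I/O chapter](https://www.notion.so/Real-Time-Web-with-Node-js-dd8c2b31574842f99a05f187dff745f4).)
Once a function has been wrapped with ASQ, the call site is expected to end up looking like this: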
```jsx
collectPostData
  .copy(args.file) // run the action
  .val((msg) => {
    // runs on success
  })
  .or((err) => {
    // runs on failure
  });
```
- Node Stream: [stream docs](https://nodejs.org/docs/latest-v12.x/api/stream.html); [this article](https://www.eebreakdown.com/2016/10/nodejs-streams.html) also covers Node streams in a good amount of detail.
- Definition:
A stream is an abstract interface for working with streaming data in Node.js. The stream module provides an API for implementing the stream interface.
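It is an abstract interface that lets code control and access data as it flows. Rather than reading the whole file at once, the content is split into chunks that are placed in a buffer. Once some chunks have been read or written, more chunks are taken from the buffer and processed.
**The [overview](https://nodejs.org/docs/latest-v14.x/api/stream.html#stream_organization_of_this_document) in the official documentation is recommended reading for a fuller picture of how streams work.**
Side note: what is a Buffer in Node.js? (The earlier [file I/O chapter mentioned the related ArrayBuffer](https://www.notion.so/Real-Time-Web-with-Node-js-dd8c2b31574842f99a05f187dff745f4).)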
Buffer objects are used to represent a fixed-length sequence of bytes. Many Node.js APIs support Buffers.
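The `JavaScript` language originally had no type for binary data, yet handling things like files requires working with binary data. Node.js therefore defines a Buffer class for creating a region of memory dedicated to holding binary data.
For more on Buffer, see this [article](https://morosedog.gitlab.io/nodejs-20200123-Nodejs-11/) and the [docs](https://nodejs.org/api/buffer.html).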
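A short sketch of what "a fixed-length sequence of bytes" means in practice. The variable names and sample strings are only for illustration.

```javascript
// Buffer is a global in Node.js, so no require() is needed.
const text = Buffer.from('h\u00e9llo', 'utf8'); // 'é' takes two bytes in UTF-8
const zeros = Buffer.alloc(4);                  // four zero-filled bytes

const bufferFacts = {
  byteLength: text.length,                      // counts bytes, not characters
  hex: Buffer.from('hi').toString('hex'),       // the raw bytes, printed as hex
  isUint8Array: text instanceof Uint8Array,     // Buffer builds on Uint8Array
  fixedLength: zeros.length,
};

console.log(bufferFacts);
```

The five-character string occupies six bytes, which is why byte counts and string lengths should not be mixed up when working with stream chunks.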
- What a Stream is, seen through a little source code (v14.x)
[stream.js]
Location: [https://github.com/nodejs/node/blob/v14.x/lib/stream.js](https://github.com/nodejs/node/blob/v14.x/lib/stream.js)
Where it is defined:
[https://github.com/nodejs/node/blob/v14.x/lib/internal/streams/legacy.js#L10](https://github.com/nodejs/node/blob/v14.x/lib/internal/streams/legacy.js#L10)
- The Stream class exports the **Readable, Writable, Duplex, Transform, and PassThrough** classes as static properties.
- Stream inherits from [EventEmitter](https://nodejs.org/dist/latest-v14.x/docs/api/events.html#events_class_eventemitter), so it can fire events, and application code can listen for a stream's events. ([source](https://github.com/nodejs/node/blob/v14.17.1/lib/events.js#L85))
- Side note: what is an [EventEmitter](https://nodejs.org/dist/latest-v14.x/docs/api/events.html#events_class_eventemitter)?
The EventEmitter class is defined and exposed by the events module. All EventEmitters emit the event 'newListener' when new listeners are added and 'removeListener' when existing listeners are removed.
From a usage point of view it is also a class: an instance created with new can call the .emit(event, ...) and .on(event, listener) methods. .on() registers an event listener and .emit() fires an event. Once an event fires, the handlers listening for it are executed.
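As a quick illustration of the two methods described above, here is a minimal sketch. The event name `greet` and the variable names are made up for the example.

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const received = [];

// .on() registers a listener for the 'greet' event.
emitter.on('greet', (name) => {
  received.push(`hello ${name}`);
});

// .emit() fires the event; extra arguments are passed on to each listener.
const hadListeners = emitter.emit('greet', 'stream'); // true: a listener ran
const noListeners = emitter.emit('unknown');          // false: nobody is listening

console.log(received, hadListeners, noListeners);
```

The boolean returned by `emit()` only reports whether any listener was registered for that event.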
Looking at the [source](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/lib/events.js#L83):
- The [init](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/lib/events.js#L192) function:
It sets up an internal object, `this._events = ObjectCreate(null)`, keyed by event type (the event name) with the registered listener as the value. If an event has several listeners, the value becomes an array of handler callbacks stored in registration order.
```jsx
EventEmitter.init = function(opts) {
  if (this._events === undefined ||
      this._events === ObjectGetPrototypeOf(this)._events) {
    this._events = ObjectCreate(null); // this object manages the registered listeners
    this._eventsCount = 0;
  }

  this._maxListeners = this._maxListeners || undefined;

  if (opts && opts.captureRejections) {
    if (typeof opts.captureRejections !== 'boolean') {
      throw new ERR_INVALID_ARG_TYPE('options.captureRejections',
                                     'boolean', opts.captureRejections);
    }
    this[kCapture] = Boolean(opts.captureRejections);
  } else {
    // Assigning the kCapture property directly saves an expensive
    // prototype lookup in a very sensitive hot path.
    this[kCapture] = EventEmitter.prototype[kCapture];
  }
};
```
- The [on](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/lib/events.js#L470) function:
It turns out to be an alias of addListener:
```jsx
EventEmitter.prototype.on = EventEmitter.prototype.addListener;
```
```jsx
EventEmitter.prototype.addListener = function addListener(type, listener) {
  return _addListener(this, type, listener, false);
};
```
Go straight to [_addListener](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/lib/events.js#L404):
```jsx
function _addListener(target, type, listener, prepend) {
  let m;
  let events;
  let existing;

  checkListener(listener);

  events = target._events;
  if (events === undefined) {
    // Same work that init does
    events = target._events = ObjectCreate(null);
    target._eventsCount = 0;
  } else {
    // (This part has not been studied closely yet.)
    // To avoid recursion in the case that type === "newListener"! Before
    // adding it to the listeners, first emit "newListener".
    if (events.newListener !== undefined) {
      target.emit('newListener', type,
                  listener.listener ? listener.listener : listener);

      // Re-assign `events` because a newListener handler could have caused the
      // this._events to be assigned to a new object
      events = target._events;
    }
    existing = events[type];
  }

  if (existing === undefined) {
    // The event type does not exist yet, so nothing is registered for it:
    // use type as the key and store the listener itself as the value.
    // Optimize the case of one listener. Don't need the extra array object.
    events[type] = listener;
    ++target._eventsCount;
  } else {
    // The event type already exists. If its value is a single function,
    // switch to an array; if it is already an array, push/unshift the new
    // listener onto it.
    // `prepend` is a boolean: pass true to insert the handler at the front.
    // The corresponding API is emitter.prependListener():
    // https://nodejs.org/dist/latest-v14.x/docs/api/events.html#events_emitter_prependlistener_eventname_listener
    if (typeof existing === 'function') {
      // Adding the second element, need to change to array.
      existing = events[type] =
        prepend ? [listener, existing] : [existing, listener];
      // If we've already got an array, just append.
    } else if (prepend) {
      existing.unshift(listener);
    } else {
      existing.push(listener);
    }

    // (This part has not been studied yet either.)
    // Check for listener leak
    m = _getMaxListeners(target);
    if (m > 0 && existing.length > m && !existing.warned) {
      existing.warned = true;
      // No error code for this since it is a Warning
      // eslint-disable-next-line no-restricted-syntax
      const w = new Error('Possible EventEmitter memory leak detected. ' +
                          `${existing.length} ${String(type)} listeners ` +
                          `added to ${inspect(target, { depth: -1 })}. Use ` +
                          'emitter.setMaxListeners() to increase limit');
      w.name = 'MaxListenersExceededWarning';
      w.emitter = target;
      w.type = type;
      w.count = existing.length;
      process.emitWarning(w);
    }
  }

  return target;
}
```
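The storage rules in `_addListener` can be observed from the outside. The sketch below is only illustrative: the event name `tick` and the listener names are made up, and peeking at `emitter._events` relies on an internal detail that is not public API and may change between Node versions.

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const calls = [];
const first = () => calls.push('first');
const second = () => calls.push('second');
const urgent = () => calls.push('urgent');

emitter.on('tick', first);
// Internal detail, not public API: with one listener the bare function is stored.
const shapeWithOne = typeof emitter._events.tick;

emitter.on('tick', second);              // appended (the push() branch)
emitter.prependListener('tick', urgent); // inserted at the front (the unshift() branch)
// Internal detail again: with several listeners the value is an array.
const storedAsArray = Array.isArray(emitter._events.tick);

emitter.emit('tick');
console.log(shapeWithOne, storedAsArray, calls);
```

`emitter.listeners('tick')` is the public way to inspect the same ordering without touching `_events`.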
- The [emit](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/lib/events.js#L325) function:
```jsx
EventEmitter.prototype.emit = function emit(type, ...args) {
  // Special handling of the 'error' type ... omitted

  const events = this._events;

  // Special handling of the 'error' type ... omitted

  const handler = events[type]; // look up the handler(s)

  if (handler === undefined)
    // No handler for this event type: return right away
    return false;

  if (typeof handler === 'function') {
    // Works much like handler.apply(this, args); see
    // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Reflect/apply
    const result = ReflectApply(handler, this, args);

    // The handler runs with `this` pointing at the emitter instance,
    // much like element.addEventListener('click', function() {
    //   console.log(this) <- `this` points at element
    // })

    // We check if result is undefined first because that
    // is the most common case so we do not pay any perf
    // penalty
    if (result !== undefined && result !== null) {
      addCatch(this, result, type, args);
    }
  } else {
    // Several event handlers: loop over them and run them in order
    const len = handler.length;
    const listeners = arrayClone(handler);
    for (let i = 0; i < len; ++i) {
      const result = ReflectApply(listeners[i], this, args);

      // We check if result is undefined first because that
      // is the most common case so we do not pay any perf
      // penalty.
      // This code is duplicated because extracting it away
      // would make it non-inlineable.
      if (result !== undefined && result !== null) {
        addCatch(this, result, type, args);
      }
    }
  }

  return true;
};
```
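Every emit runs a stretch of synchronous code: one callback finishes, then the next one runs, until all handlers have executed.
Side note: how does EventEmitter relate to the Event Loop?
EventEmitter contains **no asynchronous** code. Node provides EventEmitter as a tool for developers to build event patterns; by itself it has nothing to do with the Event Loop.

It is just that most use cases involve announcing that some asynchronous work has finished or that some asynchronous event has occurred (a file read completing, a disconnect, a socket closing, and so on). The execution of the event handlers themselves is not asynchronous.
As for the DOM API events commonly used in the browser, they operate very differently from Node's EventEmitter.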
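The synchronous behaviour is easy to check with a small sketch. The event name `done` and the labels pushed into the array are made up for the example.

```javascript
const EventEmitter = require('events');

const emitter = new EventEmitter();
const order = [];

emitter.on('done', () => order.push('listener 1'));
emitter.on('done', () => order.push('listener 2'));

order.push('before emit');
emitter.emit('done'); // both listeners run right here, before emit() returns
order.push('after emit');

console.log(order);
```

If `emit()` deferred its listeners to a later turn of the event loop, 'after emit' would be recorded before either listener; it is not.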
- A Stream contains a buffer: a holding area dedicated to binary data.
Take [Readable](https://github.com/nodejs/node/blob/v14.x/lib/internal/streams/readable.js#L187) as an example:
```jsx
function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // Checking for a Stream.Duplex instance is faster here instead of inside
  // the ReadableState constructor, at least with V8 6.5.
  const isDuplex = this instanceof Stream.Duplex;

  this._readableState = new ReadableState(options, this, isDuplex); // <-- state object

  Stream.call(this, options);
}
```
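Every Readable instance holds a state object, this._readableState, created by [ReadableState](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/deps/npm/node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js#L101) (note that this link points to a copy of readable-stream vendored under deps/npm, not to lib/internal/streams/readable.js, so the signature differs slightly):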
```jsx
function ReadableState(options, stream) {
  // ... omitted

  // the point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  var hwm = options.highWaterMark;
  var readableHwm = options.readableHighWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;

  if (hwm || hwm === 0) this.highWaterMark = hwm;
  else if (isDuplex && (readableHwm || readableHwm === 0)) this.highWaterMark = readableHwm;
  else this.highWaterMark = defaultHwm;

  // cast to ints.
  this.highWaterMark = Math.floor(this.highWaterMark);

  // ... omitted

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift()
  this.buffer = new BufferList();
}
```
From this we can see that the stream's so-called buffer is actually a linked list. ([BufferList](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/deps/npm/node_modules/concat-stream/node_modules/readable-stream/lib/internal/streams/BufferList.js#L13))
A Stream also has an option called [highWaterMark](https://nodejs.org/api/stream.html#stream_new_stream_readable_options) (HWM): the buffered amount at which the stream stops calling _read() to fill the buffer.
The related source can be traced from the [read](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/deps/npm/node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js#L374) function:
In flowing mode the stream calls read() internally and emits each chunk it returns as a [data event](https://nodejs.org/api/stream.html#stream_event_data). Inside read(), howMuchToRead works out how much this call should return:
```jsx
Readable.prototype.read = function (n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  // ... omitted
  n = howMuchToRead(n, state);
  // ... omitted
};
```
The [howMuchToRead](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/deps/npm/node_modules/duplexify/node_modules/readable-stream/lib/_stream_readable.js#L355) function:
```jsx
function howMuchToRead(n, state) {
  if (n <= 0 || state.length === 0 && state.ended) return 0;
  if (state.objectMode) return 1;
  // n !== n is true only for NaN, i.e. read() was called without a size
  if (n !== n) {
    // Only flow one buffer at a time
    if (state.flowing && state.length) return state.buffer.head.data.length;
    else return state.length;
  }
  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
  if (n <= state.length) return n;
  // Don't have enough
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}
```
The [computeNewHighWaterMark](https://github.com/nodejs/node/blob/3ce447f6a8098dba9bb0431b122e30b1e7abc1a2/deps/npm/node_modules/concat-stream/node_modules/readable-stream/lib/_stream_readable.js#L336) function caps how far a read(n) call can raise the HWM, and rounds the new value up to a power of 2:
```jsx
// Don't raise the hwm > 8MB
var MAX_HWM = 0x800000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    n = MAX_HWM;
  } else {
    // Get the next highest power of 2 to prevent increasing hwm excessively in
    // tiny amounts
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}
```
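The rounding can be watched through the public API instead of calling the internal function. This is a sketch under the assumption that `read(n)` still raises the threshold this way in the Node version you run; the 16-byte starting value and the request for 100 bytes are arbitrary choices for the example.

```javascript
const { Readable } = require('stream');

// A no-op _read(): this stream never produces data on its own.
const readable = new Readable({ highWaterMark: 16, read() {} });
const before = readable.readableHighWaterMark;

// Asking for 100 bytes (more than the current threshold) makes read() raise
// the threshold. Nothing is buffered yet, so the call itself returns null.
const chunk = readable.read(100);
const after = readable.readableHighWaterMark;

console.log({ before, chunk, after });
```

Walking the bit trick by hand for n = 100: n-- gives 99 (binary 1100011), the shifts and ORs fill every lower bit to give 127, and n++ lands on 128, the next power of 2.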
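From this we can see that a Stream has a highWaterMark (HWM) property governing how many bytes the buffer holds before the stream stops pulling from its source. It is a threshold rather than a hard limit: the default is 16 KB, and in this vendored copy read(n) will not raise it above 8 MB (as far as I can tell, the v14.x core file lib/internal/streams/readable.js uses a higher cap of 1 GB). ([object mode](https://nodejs.org/docs/latest-v14.x/api/stream.html#stream_object_mode) is a separate matter.)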
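Readable, Writable, Duplex, and Transform presumably each have their own implementation, but the others should share the same nature of combining an EventEmitter with a buffer.
**Putting all of this together: a Stream is the combination of an EventEmitter and a Buffer.**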
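Both halves of that conclusion can be poked at from user code. The following is a minimal sketch, assuming a Readable with a deliberately tiny 8-byte highWaterMark and a no-op `read()`; those choices and the variable names are for illustration only.

```javascript
const EventEmitter = require('events');
const Stream = require('stream');

// The classes hang off Stream as static properties.
const exportedClasses = ['Readable', 'Writable', 'Duplex', 'Transform', 'PassThrough']
  .every((name) => typeof Stream[name] === 'function');

// Tiny threshold and a no-op _read(), so the buffer only holds what is pushed by hand.
const readable = new Stream.Readable({ highWaterMark: 8, read() {} });

// EventEmitter side: a stream instance is an EventEmitter.
const isEmitter = readable instanceof EventEmitter;

// Buffer side: push() stores the chunk and reports whether the buffered
// byte count is still below highWaterMark.
const belowThreshold = readable.push(Buffer.alloc(4)); // 4 of 8 bytes buffered
const atThreshold = readable.push(Buffer.alloc(4));    // 8 of 8 bytes buffered
const overThreshold = readable.push(Buffer.alloc(4));  // still stored: 12 bytes

console.log({
  exportedClasses,
  isEmitter,
  belowThreshold,
  atThreshold,
  overThreshold,
  buffered: readable.readableLength,
  threshold: readable.readableHighWaterMark,
});
```

The third push is still stored even though the threshold has been reached; the `false` return value is a signal to stop producing, not a rejection.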
- Source link: the latest [LTS](https://github.com/nodejs/node/blob/aa15690b6446c56f36292d38ef064c095a374ba7/lib/stream.js) (v14.17.0)
- Readable Stream:
[12.22.1 docs](https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_readable_streams), [14.17.0 docs](https://nodejs.org/docs/latest-v14.x/api/stream.html#stream_class_stream_readable); added in v0.9.4: The Readable stream API evolved across multiple Node.js versions and provides multiple methods of consuming stream data. Broadly the same API is available across these versions. (The same goes for Writable Stream below.)
Readable streams use the EventEmitter API for notifying application code when data is available to be read off the stream. That available data can be read from the stream in multiple ways.
In other words, it offers several ways to read data.
- Writable Stream: [docs](https://nodejs.org/docs/latest-v14.x/api/stream.html#stream_class_stream_writable). Writable streams expose methods such as write() and end() that are used to write data onto the stream.
In other words, it offers several methods for writing data.
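A minimal sketch of both sides, wired together by hand: the Readable is consumed through its 'data' and 'end' events, and the Writable is fed through write() and end(). The helper name `copyByHand` and the sample chunks are made up for this illustration.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: moves every chunk from a Readable into a Writable manually.
function copyByHand(chunks) {
  return new Promise((resolve, reject) => {
    const written = [];
    const source = Readable.from(chunks);
    const sink = new Writable({
      write(chunk, encoding, callback) {
        written.push(chunk.toString());
        callback(); // signal that this chunk has been handled
      },
    });

    source.on('data', (chunk) => sink.write(chunk)); // consume the Readable
    source.on('end', () => sink.end());              // nothing more to write
    sink.on('finish', () => resolve(written));       // every write has been flushed
    source.on('error', reject);
    sink.on('error', reject);
  });
}

const copied = copyByHand(['a', 'b', 'c']);
copied.then((written) => console.log(written));
```

This version ignores the boolean returned by write(), so it applies no backpressure; that bookkeeping is what pipe() takes care of.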
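- Piping Stream: the stream pipe() API
First, an example to explain the concept of a pipe:
On Unix-like systems, a pipe in the terminal chains several commands together, feeding the output of one command into the next as its input. For example: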
```bash
ls / | sort -r | grep e
```
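The result of running ls on the root directory / becomes the input to sort; the sorted output then becomes the input to grep, which picks out the lines containing the letter e.
The same idea can be applied to streams through the stream [pipe() API](https://nodejs.org/docs/latest-v14.x/api/stream.html#stream_readable_pipe_destination_options):
pipe() attaches a Readable stream (the source) to a Writable stream, which may also be a Transform or Duplex stream. pipe() returns the destination stream, so when the destination is itself readable (a Transform or Duplex), pipe() calls can keep being chained.
This makes handling data flows much more convenient and allows code such as `A.pipe(B).pipe(C)`, for example the file compression example from the official Node docs: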
```jsx
const zlib = require('zlib');
const fs = require('fs');
const gzip = zlib.createGzip();
const inp = fs.createReadStream('input.txt');
const out = fs.createWriteStream('input.txt.gz');
inp.pipe(gzip).pipe(out); // read input.txt -> gzip it -> write input.txt.gz
```
The official Node docs also recommend this pipe API for working with stream data (Use of the [readable.pipe()](https://nodejs.org/docs/latest-v14.x/api/stream.html#stream_readable_pipe_destination_options) method is recommended for most users as it has been implemented to provide the easiest way of consuming stream data.)
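The `A.pipe(B).pipe(C)` shape can also be tried without touching the file system. In this sketch the helper name `shout`, the upper-casing Transform, and the in-memory sink are all made up for illustration.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Hypothetical helper: source -> upper-casing Transform -> collecting sink.
function shout(chunks) {
  return new Promise((resolve, reject) => {
    let output = '';
    const source = Readable.from(chunks);
    const upperCase = new Transform({
      transform(chunk, encoding, callback) {
        callback(null, chunk.toString().toUpperCase());
      },
    });
    const sink = new Writable({
      write(chunk, encoding, callback) {
        output += chunk.toString();
        callback();
      },
    });

    // pipe() returns its destination, so the Transform can be piped onward.
    const last = source.pipe(upperCase).pipe(sink);
    last.on('finish', () => resolve({ output, returnsDestination: last === sink }));
    // pipe() does not forward errors, so each stream gets its own handler.
    for (const s of [source, upperCase, sink]) s.on('error', reject);
  });
}

const shouted = shout(['hello ', 'world']);
shouted.then((result) => console.log(result));
```

The chain only works because the middle stream is both writable and readable; piping into a plain Writable ends the chain.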
- Node as a web server and Handle Request
Start from a basic http server.
Sample code:
```jsx
const http = require('http');

function handleHTTP(req, res) {
  if (req.method === 'GET') {
    if (req.url === '/') {
      res.writeHead(200, {
        'Content-Type': 'text/html'
      });
      res.end(`Hello World: ${Math.random()}`);
    } else {
      res.writeHead(404);
      res.end('Cannot find it!');
    }
    return;
  }
  // Any method other than GET is rejected
  res.writeHead(403);
  res.end('No, no, do not do that!');
}

const host = 'localhost';
const port = 8006;

const httpServer = http.createServer(handleHTTP);
httpServer.listen(port, host);
```
http module:
The HTTP interfaces in Node.js are designed to support many features of the protocol which have been traditionally difficult to use. In particular, large, possibly chunk-encoded, messages. http is a built-in module that provides [createServer()](https://nodejs.org/docs/latest-v14.x/api/http.html#http_http_createserver_options_requestlistener) for creating an HTTP server. createServer() takes a function as its argument; that function receives two parameters, request and response, which represent the incoming request and the response to send back.
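One way to exercise a request listener without opening a browser is to start a throwaway server and send it a request from the same script. This is only a sketch: the helper name `requestOnce`, the simplified handler, and the use of port 0 (let the OS pick a free port) are assumptions for the example, not part of the course code.

```javascript
const http = require('http');

// Same shape as any request listener: (request, response).
function handleHTTP(req, res) {
  if (req.method === 'GET' && req.url === '/') {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end('Hello World');
    return;
  }
  res.writeHead(404);
  res.end('Cannot find it!');
}

// Hypothetical helper: starts a server, sends one GET, then shuts the server down.
function requestOnce(urlPath) {
  return new Promise((resolve, reject) => {
    const server = http.createServer(handleHTTP);
    server.on('error', reject);
    // Port 0 asks the OS for any free port.
    server.listen(0, '127.0.0.1', () => {
      const { port } = server.address();
      const options = { host: '127.0.0.1', port, path: urlPath, agent: false };
      http.get(options, (res) => {
        let body = '';
        res.setEncoding('utf8');
        res.on('data', (chunk) => { body += chunk; }); // the response is a Readable
        res.on('end', () => {
          server.close();
          resolve({ status: res.statusCode, body });
        });
      }).on('error', reject);
    });
  });
}

const homePage = requestOnce('/');
const missingPage = requestOnce('/nope');
homePage.then((result) => console.log(result));
missingPage.then((result) => console.log(result));
```

On the client side the response body arrives through 'data' and 'end' events, the same Readable interface covered in the stream notes.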