owned this note
owned this note
Published
Linked with GitHub
# 資料流
[TOC]
<a href="https://colab.research.google.com/github/davidho27941/ML_tutorial_notebook/blob/main/ETL_tensorflow_basic.ipynb" target="_parent\">
<img src="https://colab.research.google.com/assets/colab-badge.svg\" alt="Open In Colab\">
</a>
## 簡介
$\qquad$在大數據的時代,資料的大小動輒是GB或是TB等級,這樣的資料不可能一次性的被儲存到記憶體之上並提供給機器學習模型進行訓練。這種時候我們需要做事情就是讓資料從硬碟上直接送到GPU本身進行訓練,這樣就能避免記憶體大小對我們的資料造成的限制。在[利用TensorFlow以及PyTorch建立張量](/e3tpZ2mqQwiAqEV9nmZUBQ)章節中,我們簡單示範了如何利用生成器(`generator`)物件實作資料流。在本章節中,我們將示範
1. 如何使用`tensorflow-dataset`所提供的各API進行資料抽取、轉置以及載入(ETL)流程。
2. 如何建立一個基於`tf.keras.utils.Sequence`物件的資料流。
有人可能會問「都有基於生成器的作法了,為何還需要基於前述物件的實作呢?」,原因如下:
1. 當取用資料時,擁有更安全的多進程(Multiprocessing)及多線程(Multi-threading)過程
$\qquad$在利用生成器的實作範例中,生成器只是根據既有的規則以及設定,不斷的從硬碟終將資料送模型。若要實作打散資料或是在每個訓練週期中對資料進行重新排列,使用者往往會需要付出更大的心力進行編寫。同時,因為其資料流已經規則已經在建構初期救定下來,若非使用者在一開始就實作了多線程/多進程,將無法在訓練過程中透過其他方式改善資料流的效率。
2. 更靈活的資料操作
$\qquad$一個繼承了`tf.keras.utils.Sequence`物件的資料流實作,將可以繼承這兩個物件本身已經預先設計好的功能(例如:`on_epoch_end`等功能),使用者無須再花時間及精力來實作一些進階功能。
## 基於`tensorflow-dataset`的資料流實作
### Tensorflow-dataset 簡介
$\qquad$**Tensorflow-dataset**是一由TensorFlow所提供的函式庫,可以整合資料的抽取(Extract)、轉置(Transform)、以及載入(Load)過程[^1]為一體的函式庫。

$\qquad$用戶可以通過Tensorflow-dataset所提供的資料集,或是利用自己準備的資料集,通過Tensorflow-dataset提供的API將資料輕易的導入訓練流程之中。
> 以下將`tensorflow-dataset`簡稱為`tfds`。
1. 資料抽取
用戶可以利用`tfds.load`函式來進行資料的導入。使用時至少需要傳遞以下參數:
* `name`: 資料集的名稱,同時可包含資料集版本。範例:`name=mnist:3.0.0`
* `split`: 所要讀取的部份,如果有分類,也可以直接呼叫分類,例如:`train`。同時可以利用分類加上數值或是比例(i.e. `train[:50000]`或`train[:80%]`)來進行切片。
* `as_supervised`:須為布林值。如果為真,則回傳監督式學習所需要的資料以及標籤;若為否,則回傳一個包含所有特徵的字典物件。
$\qquad$如果並非使用預先提供的資料集,則需要另外宣告`data_dir`來指名資料及的所在位置。另外,若同時希望載入資料相關資料,可以宣告`with_info=True`來取得資訊(若有提供)。
> 更多資訊可以參考[官方文件](https://www.tensorflow.org/datasets/api_docs/python/tfds/load)。
2. 資料轉置
$\qquad$當用戶透過`tfds.load()`函式將資料抽出之後,可以利用所建立的`tf.data.Dataset`物件進行資料的轉置以及載入。以下是幾個基礎的用法:
* 隨機排序: `dataset.shuffle(BUFFER_SIZE)`。
* 重複資料: `dataset.repeat(NUM_EPOCHS)`
* 對資料進行映射: `dataset.map(lambda x: ...)`
* 賦予資料批次: `dataset.batch(BATCH_SIZE)`
3. 資料載入:
$\qquad$當用戶完成自訂的抽出以及轉置流程後,可以利用前述建立的物件進行資料載入。其方法很簡單,可以直接將物件傳遞至`model.fit()`函式中進行訓練,或是利用`.take()`方法查看個別資料的狀況。
$\qquad$以下是一些簡單的範例。
### Tensorflow-dataset 實作範例
#### 利用`tfds.load()`函式抽出資料
```python=
# 建立tf.data.Dataset物件
# tensorflow-dataset允許用戶通過`split`參數宣告要取用的部份
whole_dataset = tfds.load(name="mnist",
split='train+test')
print(f'Content of whole_dataset: {whole_dataset},\nType of whole_dataset: {type(whole_dataset)}.')
```
執行以上程式碼,將會得到以下輸出:
```
Downloading and preparing dataset mnist/3.0.1 (download: 11.06 MiB, generated: 21.00 MiB, total: 32.06 MiB) to /root/tensorflow_datasets/mnist/3.0.1...
WARNING:absl:Dataset mnist is hosted on GCS. It will automatically be downloaded to your
local data directory. If you'd instead prefer to read directly from our public
GCS bucket (recommended if you're running on GCP), you can instead pass
`try_gcs=True` to `tfds.load` or set `data_dir=gs://tfds-data/datasets`.
Dl Completed...: 100%
4/4 [00:01<00:00, 2.51 file/s]
Dataset mnist downloaded and prepared to /root/tensorflow_datasets/mnist/3.0.1. Subsequent calls will reuse this data.
Content of whole_dataset: <PrefetchDataset element_spec={'image': TensorSpec(shape=(28, 28, 1), dtype=tf.uint8, name=None), 'label': TensorSpec(shape=(), dtype=tf.int64, name=None)}>,
Type of whole_dataset: <class 'tensorflow.python.data.ops.dataset_ops.PrefetchDataset'>.
```
此時所指定的資料將會被下載,並同時建立一個`tf.data.Dataset`物件。
#### 利用`tf.data.Datset`類別的自帶方法(method)轉置資料
$\qquad$首先我們可以抽取訓練資料集,並同時將`with_info`以及`as_supervised`參數設定為`True`用以取得資料集資訊以及整理好的`(image, label)`串列。
```python=
# 抽取訓練資料集
train_ds, train_ds_info = tfds.load(name="mnist",
split='train',
with_info=True,
as_supervised=True)
print(f"Length of train_ds: {len(list(train_ds.as_numpy_iterator()))}")
```
我們可以透過輸出確認此資料集長度:
```
Length of train_ds: 60000
```
我們可以透過以下程式碼來確認前幾筆影像以及其標籤:
```python=
# 查看`train_ds`的內容
# 利用matplotlib直接查看影像內容
import matplotlib.pyplot as plt
for image, label in train_ds.take(5):
# transform `image` to numpy array and remove the last chennel.
# i.e. transform from (28, 28, 1) to (28, 28).
image = np.array(image).squeeze()
plt.figure(figsize=(6, 6))
plt.title(f"Label: {label}")
plt.imshow(image, cmap='gray')
```
其輸出為:

接下來,我們可以開始對資料集本身進行轉置的動作。
##### 將訓練資料集重新排序
$\qquad$要對資料集進行重新排序,可以利用`.shuffle(BUFFER_SIZE)`方法來達成。其中的`BUFFER_SIZE`為所選取的資料大小。例如一個具有一萬筆資料的資料集進行重新排序時,將會進行打散。函式將會根據所設定的`BUFFER_SIZE`參數,從打散的資料集中取出前`BUFFER_SIZE`個資料。當需要在每次迭代時重新打散,則可以將`reshuffle_each_iteration`參數設定為`True`。若希望其結果可以重現,可以將`seed`參數設定至一個定值。
以下是簡單的範例:
```python=
# 取得重新排序前的前五筆標籤
pre_label = [label for _, label in train_ds.take(5)]
# 將訓練資料集重新排序
train_ds = train_ds.shuffle(100)
# 重新取得前五個元素
after_label = [label for _, label in train_ds.take(5)]
for pre, after in zip(pre_label, after_label):
print(f"Previous: {pre}, after: {after}")
```
其輸出為:
```
Previous: 4, after: 9
Previous: 1, after: 3
Previous: 0, after: 4
Previous: 7, after: 4
Previous: 8, after: 0
```
##### 利用匿名函式將資料進行正規化
$\qquad$在資料的轉置過程中,我們時常會需要將資料進行正規化,或是做出一些計算等等,這時候可以透過`.map()`方法來進行處理。
以下是將MNIST資料集的影像進行正規化範例:
```python=
# 利用匿名函式將資料進行正規化
def rescale(x, y):
x = x/255
return x, y
train_ds = train_ds.map(lambda x, y: rescale(x, y))
count = 0
for image, _ in train_ds.take(5):
print(f"The maximum value of image is: {np.amax(image)}")
count += 1
```
原本在MNIST資料集中的影像,其值域為`[0~255]`之間,藉由上方的程式碼,我們可以將數值正規化到`[0~1]`的區間。
```
The maximum value of image is: 1.0
The maximum value of image is: 0.9960784316062927
The maximum value of image is: 1.0
The maximum value of image is: 1.0
The maximum value of image is: 1.0
```
##### 將訓練資料進行批次化
$\qquad$在訓練模型時,我們會將資料處理成批次資料送進模型訓練。`tf.data.Dataset`物件也提供了相應的方法來進行批次化。只需要簡單的呼叫`.batch()`方法並設定相應的參數即可。
```
# 將訓練資料進行批次化
train_ds = train_ds.batch(64)
for image, label in train_ds.take(1):
print(image.shape)
```
我們可以藉由輸出發現資料已經被批次化了。
```
(64, 28, 28, 1)
```
> 更加詳細的內容將在[利用Tensorflow-Dataset進行各種文件的導入](/wDgBT081SYSJz0nAzVLQNg)以及[加速並改善Tensorflow-Dataset的效率](/GnKanoulR3ekc_IybVxCyg)中介紹。
## 基於`tf.keras.utils.Sequence`的資料流實作
$\qquad$在[官方文件](https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence)中,要求一個繼承`tf.keras.utils.Sequence`的物件必須要有實作`__getitem__()` 以及`__len__()`方法。這兩個方法實際上扮演著什麼樣的角色呢?請看下方解釋:
### 前置解說
1. `__getitem__()`方法:
$\qquad$這一個方法主要是提供物件可以被利用`[]`搭配相對應的位置來取用資料。未實作個方法的套件將無法透過`[]`進行取值。以下是一個實作了`__getitem__()`方法的範例:
```python=
def gen_Fib(n):
if n < 2:
_result = 1
else:
_result = gen_Fib(n-1) + gen_Fib(n-2)
return _result
class Fib():
def __init__(self):
pass
def __getitem__(self, n):
return gen_Fib(n)
```
在上述範例中,我們建構了一個`Fib()`物件,並在其中實作了`__getitem__()`方法,這方法會透過`gen_Fib()`函數計算在斐波那契數列中一個特定位置的值。如果我們想知道斐波那契數列前十個元素的值,可以利用上述物件來計算:
```python=
fib_value = Fib()
for i in range(10):
print(f"Element {i} in Fibonacci sequence is {fib_value[i]}")
```
其輸出為:
```
Element 0 in Fibonacci sequence is 1
Element 1 in Fibonacci sequence is 1
Element 2 in Fibonacci sequence is 2
Element 3 in Fibonacci sequence is 3
Element 4 in Fibonacci sequence is 5
Element 5 in Fibonacci sequence is 8
Element 6 in Fibonacci sequence is 13
Element 7 in Fibonacci sequence is 21
Element 8 in Fibonacci sequence is 34
Element 9 in Fibonacci sequence is 55
```
透過以上範例,我們可以了解到`__getitem__()`方法讓我們可以用`[]`來對一個物件進行取值。
2. `__len__()`方法:
$\qquad$這一個方法主要是讓使用者可以利用`len()`函數來取得一個物件的長度。一個實作了`__len__()`方法的物件就能以`len()`函數來取得其長度。範例如下:
$\qquad$假設有一個資料夾,其內部有10個資料。假設資料集長度等於資料的數量,我們可以透過在`dataset`物件中實作`__len__()`來取得資料的數量,作為資料集的長度來回傳給`len()`函數。
```python=
class dataset():
def __init__(self):
self.path = "file_path/"
def __len__(self):
return len(os.listdir(self.path))
```
若以`len()`函數取得`dataset`物件的長度,則結果為:
```python=
print(f"Length of `dataset` object: {len(dataset)}.")
```
執行結果:
```
Length of `dataset` object: 10.
```
### 實作
$\qquad$理解了`__getitem__()` 以及`__len__()`方法的作用後,我們可以開始建構一個繼承`tf.keras.utils.Sequence`物件屬性的資料流。假設我們的資料存放在`data`資料夾之下,以`train`, `test`,以及`val`三個資料夾區分訓練、測試以及驗證資料集。訓練資料為圖像資料,其檔名為`xxxxx.jpg`;標籤資料為文字資料,其檔名為`xxxxx.txt`,用以表達一個圖像屬於哪一個類別。這個資料集中有800類的圖片,我們建構的資料流會基於指定的索引(index)讀取對應的資料,並回傳影像以及其對應的`one-hot`編碼標籤。
```python=
import tensorflow as tf
import glob
import numpy as np
import cv2
PATH = "./data"
class pipeline(tf.keras.utils.Sequence):
def __init__(self, file_path, mode, transform=None):
self.file_path = file_path
self.img_list = sorted(glob.glob(file_path+f"{mode}/*.jpg"))
self.label_list = sorted(glob.glob(file_path+f"{mode}/*.txt"))
self.transform = transform
def __len__(self):
return len(self.img_path)
def __getitem__(self, index):
if self.transform is not None: # If trnasorm function exist, apply transform to image before return it.
_img = cv2.imread(self.img_path[index])
_img = self.transform(_img)
else:
_img = cv2.imread(self.img_list[index])
with open( self.label_list[index], 'r')as f:
_label = tf.tensor(int(f.readlines()[0]))
return _img, tf.keras.utils.to_categorical(_label, num_classes=800)
```
> 編按:在這裡我們並未有`batch_size`變數,本範例一次僅會回傳一組影像以及標籤。若使用者有需求,可以參考[這篇文章](https://stackoverflow.com/questions/66705131/custom-data-generator-build-from-tf-keras-utils-sequence-doesnt-work-with-tenso)進行實作。
## 參考資料以及範例
1. [Tensorflow-Datasets 官方文件](https://www.tensorflow.org/datasets)
2. [Tensorflow 官方文件](https://www.tensorflow.org/api_docs/python/tf/keras/utils/Sequence)
3. [DeepLearning.ai 的教學資料](https://github.com/https-deeplearning-ai/tensorflow-1-public/tree/main/C3)
###### tags: `Machine Learning` `Notebook` `技術隨筆` `機器學習` `Python` `TensorFlow`
[^1]: 簡稱ETL。