Firmadyne `extractor.py`
===
---
###### tags: `firmadyne` `security`
本篇紀錄閱讀 `extractor.py` source code 筆記
---
**目錄**
[TOC]
# `/sources/extractor/extractor.py`
1. Source Code:
https://github.com/firmadyne/extractor/blob/481fe2850076743d06db052ee4934ffb9a312426/extractor.py
2. Explaining:
在 Line 731~732,定義了進入點為 `main()`
---
main() 中使用了 `argparse.ArgumentParser`
用法參考 https://docs.python.org/3/library/argparse.html
定義了參數 `input` `output` `-sql` `-nf` `-nk` `-np` `-b`
---
在官方 Usage 中是如此呼叫 `extractor.py`:
- `./sources/extractor/extractor.py -b Netgear -sql 127.0.0.1 -np -nk "WNAP320 Firmware Version 2.0.3.zip" images`
所以 `main()` 中 Line 726 行執行 `Extractor` 的 `__init__()` 的參數如下:
- ` extract = Extractor(indir='WNAP320 Firmware Version 2.0.3.zip', outdir='images', rootfs=True, kernel=False, numproc=False, server=127.0.0.1, brand='Netgear')`
---
`Extractor` `__init__()` 執行動作如下:
- ```python==39
self._input = os.path.abspath(indir)
```
self._input 將存著 indir 絕對路徑,此例子為 `${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip`
- ```python==41
self.output_dir = os.path.abspath(outdir) if outdir else None
```
若 outdir 有設定,就會存著 outdir 的絕對路徑,否則存 None,此例子為 `${FIRMADYNE}/images`
- ```python==44
self.do_kernel = kernel
```
```python==47
self.do_rootfs = rootfs
```
```python==50
self.brand = brand
```
```python==53
self.database = server
```
將參數存到物件的 member 中
- ```python==56
self._pool = multiprocessing.Pool() if numproc else None
```
若 numproc 為 True,就會將物件 member _pool 設定為 a pool of worker processes,否則就是 None,此例子為 None
- ```python==59
self.visited = set()
```
```python==63
self._list = list()
```
將 visited 和 _list 分別初始化為 class set 和 class list
---
回到 `main()` 中 Line 729,執行了剛剛 construct 的 `Extractor` 的 member function `extract()`,執行動作如下:
- ```python==175
if os.path.isdir(self._input):
```
此例子中 self._input 是 `${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip`
他是一個檔案,不是一個目錄, os.path.isdir 回傳 false,不會進入 if
- ```python==179
elif os.path.isfile(self._input):
```
進入這個 elif,執行
- ```python==180
self._list.append(self._input)
```
- ```python==184
if self.output_dir and not os.path.isdir(self.output_dir):
```
此例子中,self.output_dir 有設定,但它是一個目錄,os.path.isdir 回傳 True,不會進入 if
- ```python==187
if self._pool:
```
此例子中,_pool 為 None,進入 else 區域執行
- ```python==190
for item in self._list:
self._extract_item(item)
```
此例子中, _list 只有一個 object,也就是 _input,會執行
`self._extract_item('${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip')`
---
`Extractor` `_extract_item()` 執行 `ExtractionItem(self, path, 0).extract()`
作用只是傳遞 `Extractor` object 本身跟 path 給 `ExtractionItem` `__init__()` 初始化,並且執行 `ExtractionItem` `extract()`
此例中 `ExtractionItem` `__init__()` 的實際參數如下:
`__init__(self, extractor=此Extractor物件, path='${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip', depth=0, tag=None)`
---
`ExtractionItem` `__init__()` 進行以下動作:
- ```python==212
self.temp = None
```
```python==215
self.depth = depth
```
```python==218
self.extractor = extractor
```
```python==221
self.item = path
```
一些 member 的 initialization
此例子中,這些參數會是
```
.temp=None
.depth=0
.extractor=剛剛的 Extractor 物件
.item='${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip'
```
- ```python==224
if self.extractor.database:
```
此例子中, extractor.database 是 `127.0.0.1`,進入 if
連接至 database,並存放在 member self.database 中
- ```python==234
self.checksum = Extractor.io_md5(path)
```
呼叫 static method `Extractor.io_md5()`,並將結果存回 member checksum,`Extractor.io_md5()`可自行去看,不是重點
- ```python==237
self.tag = tag if tag else self.generate_tag()
```
此例子中,tag 是 None,會執行 member function `generate_tag()` 並存回 member tag 中,`generate_tag()` 也可自行去看,簡單來說,那 method 會執行以下事情:
- 查詢 table brand 是否已有此 brand,沒有就 insert 一個,並存回 brand_id
- 查詢 table image 是否有同 checksum 的 image,存回 image_id,沒有就 insert 一個 image
- ```python==303
return str(image_id[0]) if \
image_id else os.path.basename(self.item) + "_" + self.checksum
```
此例子中 image_id 有值,所以會回傳 str(image_id[0])
結果就是此 firmware 在 table image 的 id,此例子為 `"1"`
- ```python==240
self.output = os.path.join(self.extractor.output_dir, self.tag) if \
self.extractor.output_dir else None
```
此例子中, member output 會是 `${FIRMADYNE}/images/1`
- ```python==244
self.terminate = False
self.status = None
self.update_status()
```
設定 member terminate, status,並執行 member function `update_status()`,此 function 做以下事情:
- ```python==322
kernel_done = os.path.isfile(self.get_kernel_path()) if \
self.extractor.do_kernel and self.output else \
not self.extractor.do_kernel
```
此例子中,extractor.do_kernel 是 False,所以存回 `not self.extractor.do_kernel`,也就是將 True 存回 kernel_done
- ```python==325
rootfs_done = os.path.isfile(self.get_rootfs_path()) if \
self.extractor.do_rootfs and self.output else \
not self.extractor.do_rootfs
```
此例子中,extractor.do_rootfs 是 True,且 sele.out 是 `${FIRMADYNE}/images/1`,所以會將 `os.path.isfile(self.get_rootfs_path())` 結果存回 rootfs_done,簡單來說,會去查看有沒有 `${FIRMADYNE}/images/1.tar.gz` 這個檔案,目前是沒有,所以會存 false 到 rootfs_done
- ```python==328
self.status = (kernel_done, rootfs_done)
```
建一個 member tuple status,最終存 `(True, False)`
- ```python==330
if self.database and kernel_done and self.extractor.do_kernel:
```
此例中 `extractor.do_kernel` 為 False,不會進 if
- ```python==333
if self.database and rootfs_done and self.extractor.do_rootfs:
```
此例中 `rootfs_done` 為 False,不會進 if
- ```python==336
return self.get_status()
```
`get_status()` 執行 `return True if self.terminate or all(i for i in self.status) else False`
此例目前 self.terminate 為 False,且 tuple self.status中不是全部元素都是 True,所以回傳 False
---
執行完 `ExtractionItem` `__init__()` 回到 `ExtractionItem` `extract()` 進行以下動作:
- ```python==397
with Extractor.visited_lock:
```
參考 https://docs.python.org/2/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement
用 `with` 可以自動為此 block 開關鎖
重點在此
- ```python==420
for analysis in [self._check_archive, self._check_firmware,
self._check_kernel, self._check_rootfs,
self._check_compressed]:
```
```python==424
os.chdir(self.temp)
```
```python==427
if analysis():
```
將當前目錄切換到 temp,注意等等會有一個 binwalk 的參數 `module.extractor.directory` 就會是 temp
此迴圈依序執行 member function `_check_archive()`, `_check_firmware()`, `_check_kernel()`, `_check_rootfs()`, `_check_compressed`
## `_check_archive()`
```python==474
return self._check_recursive("archive")
```
在 `_check_recursive(self, fmt=archive)` 中:
- ```python==638
for module in binwalk.scan(self.item, "-e", "-r", "-y", fmt,
signature=True, quiet=True):
```
此例中,如同下了指令:
- `binwalk --signature --quiet -e -r -y archive '${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip'`
- ```python==640
for entry in module.results:
```
```python==647
desc = entry.description
self.printf(">>>> %s" % entry.description)
break
```
將第一個 entry 的 description 存進 desc 後就跳出
- ```python==651
if module.extractor.directory:
```
`module.extractor.directory` 為 member temp 的值,進入 if
- ```python==652
unix = Extractor.io_find_rootfs(module.extractor.directory)
```
static method `Extractor.io_find_rootfs` 簡單來說,會去尋找 filesystem,會做的事情大概如下:
- 如果當前目錄底下,只有一個子目錄,就把當前目錄改成那個子目錄
所以走到最後當前目錄底下不會只有一個子目錄
子目錄要嘛 0 個,要嘛很多個
- 看看目前子目錄中,名字有在 UNIX_DIRS 裡的目錄有幾個
並存到 count 中
- count 若大於 UNIX_THRESHOLD,就回傳 (True, 當前目錄位置),表示成功找到 filesystem
- 否則就回傳 (False, 最初的目錄位置)
- 中間有略過一部分,但其實不是重點
重點是這個 function 是拿來找 filesystem
此例中是沒找到 filesystem 的,所以變數 unix 會是 `(False, self.temp)`
- ```python==655
if unix[0]:
```
不會進入,改進 Line 663 的 else
- ```python==666
for root, _, files in os.walk(module.extractor.directory):
```
Line 666 ~ 702 會走過所有在 module.extractor.directory 底下的目錄,請參考 os.walk()
- ```python==674
if desc and "original file name:" in desc:
```
此例子是不會有 `"original file name:"` 字串,故不會進入 if
- ```python==683
for filename in files:
```
Line 683 ~ 702 會 iterate 過目前目錄 `_` (定義在 Line 666) 底下所有的 filename。
- ```python==684
if count > ExtractionItem.RECURSION_BREADTH:
```
一開始不會進入此 if,改進 Line 689 的 else。
- ```python==690
new_item = ExtractionItem(self.extractor,
os.path.join(root,
filename),
self.depth + 1,
self.tag)
if new_item.extract():
```
以此 file 為 root path 遞迴嘗試 Extraction。
## `_check_firmware()`
```python=481
for module in binwalk.scan(self.item, "-y", "header", signature=True,
quiet=True):
```
此例中,如同下了指令:
- `binwalk --signature --quiet -y header '${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip'`
```python=483
for entry in module.results:
```
Loop 每一條結果
```python=485
if "uImage header" in entry.description:
```
若在 description 欄位中有 match pattern `uImage header`
```python=486
if not self.get_kernel_status() and \
"OS Kernel Image" in entry.description:
```
`get_kernel_status` 只是用來回傳 member status[0] 的內容,此例子為 True
所以若 pattern `OS Kernel Image` 也有在 description 中,就會進入 if,如果真的進了:
- ```python=488
kernel_offset = entry.offset + 64
kernel_size = 0
for stmt in entry.description.split(','):
if "image size:" in stmt:
kernel_size = int(''.join(
i for i in stmt if i.isdigit()), 10)
if kernel_size != 0 and kernel_offset + kernel_size \
<= os.path.getsize(self.item):
self.printf(">>>> %s" % entry.description)
tmp_fd, tmp_path = tempfile.mkstemp(dir=self.temp)
os.close(tmp_fd)
Extractor.io_dd(self.item, kernel_offset,
kernel_size, tmp_path)
kernel = ExtractionItem(self.extractor, tmp_path,
self.depth, self.tag)
return kernel.extract()
```
Line 491 ~ 494 抓取 kernel_size 的資訊
Line 502 執行 `Extractor` static method `io_dd`
`io_dd` 主要做的事情就是把 `self.item` 從 `kernel_offset` 開始抓 `kernel_size` bytes 存到 `tmp_path`
Line 504 ~ 507 將 `tmp_path` 設為 root path 再 extract 一次,並且 return
```python=515
elif not self.get_kernel_status() and \
not self.get_rootfs_status() and \
"rootfs offset: " in entry.description and \
"kernel offset: " in entry.description:
```
若剛剛沒找到 pattern `OS Kernel Image`
那就改找 pattern `rootfs offset: ` 和 `kernel offset: `
並且 member status[0]、[1] 都要是 False
但此例中 `get_kernel_status` 回傳 status[0],也就是 True,所以不會進此 elif。
若進了此 elif:
- ```python=519
kernel_offset = 0
kernel_size = 0
rootfs_offset = 0
rootfs_size = 0
```
初始一些參數
- ```python=524
for stmt in entry.description.split(','):
if "kernel offset:" in stmt:
kernel_offset = int(stmt.split(':')[1], 16)
elif "kernel length:" in stmt:
kernel_size = int(stmt.split(':')[1], 16)
elif "rootfs offset:" in stmt:
rootfs_offset = int(stmt.split(':')[1], 16)
elif "rootfs length:" in stmt:
rootfs_size = int(stmt.split(':')[1], 16)
```
- ```python=535
if kernel_offset != rootfs_size and kernel_size == 0 and \
rootfs_size == 0:
kernel_size = rootfs_offset - kernel_offset
rootfs_size = os.path.getsize(self.item) - rootfs_offset
```
抓取、設定參數 `kernel_offset` `kernel_size` `rootfs_offset` `rootfs_size`
- ```python=541
if (kernel_size > 0 and kernel_offset + kernel_size \
<= os.path.getsize(self.item)) and \
(rootfs_size != 0 and rootfs_offset + rootfs_size \
<= os.path.getsize(self.item)):
self.printf(">>>> %s" % entry.description)
tmp_fd, tmp_path = tempfile.mkstemp(dir=self.temp)
os.close(tmp_fd)
Extractor.io_dd(self.item, kernel_offset, kernel_size,
tmp_path)
kernel = ExtractionItem(self.extractor, tmp_path,
self.depth, self.tag)
kernel.extract()
tmp_fd, tmp_path = tempfile.mkstemp(dir=self.temp)
os.close(tmp_fd)
Extractor.io_dd(self.item, rootfs_offset, rootfs_size,
tmp_path)
rootfs = ExtractionItem(self.extractor, tmp_path,
self.depth, self.tag)
rootfs.extract()
return self.update_status()
```
確定 size + offset 在檔案整個 size 範圍內,才做以下事情
- 將 kernel dd 出來,並且當作 root path 做 extraction
- 將 rootfs dd 出來,並且當作 root path 做 extraction
- update status
```python=564
return False
```
如果每個 entry 都不會進上面兩個條件式其中之一,`_check_firmware` 即回傳 False。
## `_check_kernel()`
```python=571
if not self.get_kernel_status():
for module in binwalk.scan(self.item, "-y", "kernel",
signature=True, quiet=True):
for entry in module.results:
if "kernel version" in entry.description:
self.update_database("kernel_version",
entry.description)
if "Linux" in entry.description:
if self.get_kernel_path():
shutil.copy(self.item, self.get_kernel_path())
else:
self.extractor.do_kernel = False
self.printf(">>>> %s" % entry.description)
return True
# VxWorks, etc
else:
self.printf(">>>> Ignoring: %s" % entry.description)
return False
return False
return False
```
Line 571 若 member status[0] 為 False,表示還未抓到 kernel,進入 if 嘗試抓 kernel
Line 572 此例中,如同下了指令:
- `binwalk --signature --quiet -y kernel '${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip'`
Line 575 若搜尋到 pattern `kernel version` 做以下事情:
- 設定 database 中,此 iid 的 kernel_version 欄位等於整個 entry.description
- 若搜尋到 pattern `Linux`
- 若有設定 output 位置,member function `get_kernel_path` 回傳要輸出 kernel 的位置。
- 若有 output 位置,則複製原檔案到該輸出的位置
- 沒有設定 output 位置的話,設定 Extractor 的 do_kernel 為 False
- 回傳 True
- 沒搜到 `Linux` 就回傳 False
沒搜到 pattern 就回傳 False
## `_check_rootfs()`
```python=597
if not self.get_rootfs_status():
for module in binwalk.scan(self.item, "-e", "-r", "-y",
"filesystem", signature=True,
quiet=True):
for entry in module.results:
self.printf(">>>> %s" % entry.description)
break
if module.extractor.directory:
unix = Extractor.io_find_rootfs(module.extractor.directory)
if not unix[0]:
self.printf(">>>> Extraction failed!")
return False
self.printf(">>>> Found Linux filesystem in %s!" % unix[1])
if self.output:
shutil.make_archive(self.output, "gztar",
root_dir=unix[1])
else:
self.extractor.do_rootfs = False
return True
return False
```
前面有解釋 `io_find_rootfs` 會做什麼
簡單來說,若用 binwalk 發現含有 `filesystem` 的 entry
將這個路徑丟到 `io_find_rootfs` 找 filesystem
沒找到 fs 或根本沒有這個 entry 都會 return False
其他狀況 return True
Line 614 若有設定 output 路徑,將剛剛 `io_find_rootfs` 抓出來的 fs 路徑輸出到該輸出的位置,這個就是主要產生 `iid.tar.gz` 的 code
沒設定 output 路徑的話,設定 do_rootfs 為 False
## `_check_compressed`
```python=626
return self._check_recursive("compressed")
```
前面有解釋 `_check_recursive` 會做什麼
這裡只是把 binwalk -y 的參數設定為 `compressed`
接下來做的事情的流程跟前面解釋的一樣