Firmadyne `extractor.py` === --- ###### tags: `firmadyne` `security` 本篇紀錄閱讀 `extractor.py` source code 筆記 --- **目錄** [TOC] # `/sources/extractor/extractor.py` 1. Source Code: https://github.com/firmadyne/extractor/blob/481fe2850076743d06db052ee4934ffb9a312426/extractor.py 2. Explaining: 在 Line 731~732,定義了進入點為 `main()` --- main() 中使用了 `argparse.ArgumentParser` 用法參考 https://docs.python.org/3/library/argparse.html 定義了參數 `input` `output` `-sql` `-nf` `-nk` `-np` `-b` --- 在官方 Usage 中是如此呼叫 `extractor.py`: - `./sources/extractor/extractor.py -b Netgear -sql 127.0.0.1 -np -nk "WNAP320 Firmware Version 2.0.3.zip" images` 所以 `main()` 中 Line 726 行執行 `Extractor` 的 `__init__()` 的參數如下: - ` extract = Extractor(indir='WNAP320 Firmware Version 2.0.3.zip', outdir='images', rootfs=True, kernel=False, numproc=False, server=127.0.0.1, brand='Netgear')` --- `Extractor` `__init__()` 執行動作如下: - ```python==39 self._input = os.path.abspath(indir) ``` self._input 將存著 indir 絕對路徑,此例子為 `${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip` - ```python==41 self.output_dir = os.path.abspath(outdir) if outdir else None ``` 若 outdir 有設定,就會存著 outdir 的絕對路徑,否則存 None,此例子為 `${FIRMADYNE}/images` - ```python==44 self.do_kernel = kernel ``` ```python==47 self.do_rootfs = rootfs ``` ```python==50 self.brand = brand ``` ```python==53 self.database = server ``` 將參數存到物件的 member 中 - ```python==56 self._pool = multiprocessing.Pool() if numproc else None ``` 若 numproc 為 True,就會將物件 member _pool 設定為 a pool of worker processes,否則就是 None,此例子為 None - ```python==59 self.visited = set() ``` ```python==63 self._list = list() ``` 將 visited 和 _list 分別初始化為 class set 和 class list --- 回到 `main()` 中 Line 729,執行了剛剛 construct 的 `Extractor` 的 member function `extract()`,執行動作如下: - ```python==175 if os.path.isdir(self._input): ``` 此例子中 self._input 是 `${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip` 他是一個檔案,不是一個目錄, os.path.isdir 回傳 false,不會進入 if - ```python==179 elif os.path.isfile(self._input): ``` 進入這個 elif,執行 - ```python==180 self._list.append(self._input) ``` - ```python==184 if self.output_dir and not os.path.isdir(self.output_dir): ``` 此例子中,self.output_dir 有設定,但它是一個目錄,os.path.isdir 回傳 True,不會進入 if - ```python==187 if self._pool: ``` 此例子中,_pool 為 None,進入 else 區域執行 - ```python==190 for item in self._list: self._extract_item(item) ``` 此例子中, _list 只有一個 object,也就是 _input,會執行 `self._extract_item('${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip')` --- `Extractor` `_extract_item()` 執行 `ExtractionItem(self, path, 0).extract()` 作用只是傳遞 `Extractor` object 本身跟 path 給 `ExtractionItem` `__init__()` 初始化,並且執行 `ExtractionItem` `extract()` 此例中 `ExtractionItem` `__init__()` 的實際參數如下: `__init__(self, extractor=此Extractor物件, path='${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip', depth=0, tag=None)` --- `ExtractionItem` `__init__()` 進行以下動作: - ```python==212 self.temp = None ``` ```python==215 self.depth = depth ``` ```python==218 self.extractor = extractor ``` ```python==221 self.item = path ``` 一些 member 的 initialization 此例子中,這些參數會是 ``` .temp=None .depth=0 .extractor=剛剛的 Extractor 物件 .item='${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip' ``` - ```python==224 if self.extractor.database: ``` 此例子中, extractor.database 是 `127.0.0.1`,進入 if 連接至 database,並存放在 member self.database 中 - ```python==234 self.checksum = Extractor.io_md5(path) ``` 呼叫 static method `Extractor.io_md5()`,並將結果存回 member checksum,`Extractor.io_md5()`可自行去看,不是重點 - ```python==237 self.tag = tag if tag else self.generate_tag() ``` 此例子中,tag 是 None,會執行 member function `generate_tag()` 並存回 member tag 中,`generate_tag()` 也可自行去看,簡單來說,那 method 會執行以下事情: - 查詢 table brand 是否已有此 brand,沒有就 insert 一個,並存回 brand_id - 查詢 table image 是否有同 checksum 的 image,存回 image_id,沒有就 insert 一個 image - ```python==303 return str(image_id[0]) if \ image_id else os.path.basename(self.item) + "_" + self.checksum ``` 此例子中 image_id 有值,所以會回傳 str(image_id[0]) 結果就是此 firmware 在 table image 的 id,此例子為 `"1"` - ```python==240 self.output = os.path.join(self.extractor.output_dir, self.tag) if \ self.extractor.output_dir else None ``` 此例子中, member output 會是 `${FIRMADYNE}/images/1` - ```python==244 self.terminate = False self.status = None self.update_status() ``` 設定 member terminate, status,並執行 member function `update_status()`,此 function 做以下事情: - ```python==322 kernel_done = os.path.isfile(self.get_kernel_path()) if \ self.extractor.do_kernel and self.output else \ not self.extractor.do_kernel ``` 此例子中,extractor.do_kernel 是 False,所以存回 `not self.extractor.do_kernel`,也就是將 True 存回 kernel_done - ```python==325 rootfs_done = os.path.isfile(self.get_rootfs_path()) if \ self.extractor.do_rootfs and self.output else \ not self.extractor.do_rootfs ``` 此例子中,extractor.do_rootfs 是 True,且 sele.out 是 `${FIRMADYNE}/images/1`,所以會將 `os.path.isfile(self.get_rootfs_path())` 結果存回 rootfs_done,簡單來說,會去查看有沒有 `${FIRMADYNE}/images/1.tar.gz` 這個檔案,目前是沒有,所以會存 false 到 rootfs_done - ```python==328 self.status = (kernel_done, rootfs_done) ``` 建一個 member tuple status,最終存 `(True, False)` - ```python==330 if self.database and kernel_done and self.extractor.do_kernel: ``` 此例中 `extractor.do_kernel` 為 False,不會進 if - ```python==333 if self.database and rootfs_done and self.extractor.do_rootfs: ``` 此例中 `rootfs_done` 為 False,不會進 if - ```python==336 return self.get_status() ``` `get_status()` 執行 `return True if self.terminate or all(i for i in self.status) else False` 此例目前 self.terminate 為 False,且 tuple self.status中不是全部元素都是 True,所以回傳 False --- 執行完 `ExtractionItem` `__init__()` 回到 `ExtractionItem` `extract()` 進行以下動作: - ```python==397 with Extractor.visited_lock: ``` 參考 https://docs.python.org/2/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement 用 `with` 可以自動為此 block 開關鎖 重點在此 - ```python==420 for analysis in [self._check_archive, self._check_firmware, self._check_kernel, self._check_rootfs, self._check_compressed]: ``` ```python==424 os.chdir(self.temp) ``` ```python==427 if analysis(): ``` 將當前目錄切換到 temp,注意等等會有一個 binwalk 的參數 `module.extractor.directory` 就會是 temp 此迴圈依序執行 member function `_check_archive()`, `_check_firmware()`, `_check_kernel()`, `_check_rootfs()`, `_check_compressed` ## `_check_archive()` ```python==474 return self._check_recursive("archive") ``` 在 `_check_recursive(self, fmt=archive)` 中: - ```python==638 for module in binwalk.scan(self.item, "-e", "-r", "-y", fmt, signature=True, quiet=True): ``` 此例中,如同下了指令: - `binwalk --signature --quiet -e -r -y archive '${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip'` - ```python==640 for entry in module.results: ``` ```python==647 desc = entry.description self.printf(">>>> %s" % entry.description) break ``` 將第一個 entry 的 description 存進 desc 後就跳出 - ```python==651 if module.extractor.directory: ``` `module.extractor.directory` 為 member temp 的值,進入 if - ```python==652 unix = Extractor.io_find_rootfs(module.extractor.directory) ``` static method `Extractor.io_find_rootfs` 簡單來說,會去尋找 filesystem,會做的事情大概如下: - 如果當前目錄底下,只有一個子目錄,就把當前目錄改成那個子目錄 所以走到最後當前目錄底下不會只有一個子目錄 子目錄要嘛 0 個,要嘛很多個 - 看看目前子目錄中,名字有在 UNIX_DIRS 裡的目錄有幾個 並存到 count 中 - count 若大於 UNIX_THRESHOLD,就回傳 (True, 當前目錄位置),表示成功找到 filesystem - 否則就回傳 (False, 最初的目錄位置) - 中間有略過一部分,但其實不是重點 重點是這個 function 是拿來找 filesystem 此例中是沒找到 filesystem 的,所以變數 unix 會是 `(False, self.temp)` - ```python==655 if unix[0]: ``` 不會進入,改進 Line 663 的 else - ```python==666 for root, _, files in os.walk(module.extractor.directory): ``` Line 666 ~ 702 會走過所有在 module.extractor.directory 底下的目錄,請參考 os.walk() - ```python==674 if desc and "original file name:" in desc: ``` 此例子是不會有 `"original file name:"` 字串,故不會進入 if - ```python==683 for filename in files: ``` Line 683 ~ 702 會 iterate 過目前目錄 `_` (定義在 Line 666) 底下所有的 filename。 - ```python==684 if count > ExtractionItem.RECURSION_BREADTH: ``` 一開始不會進入此 if,改進 Line 689 的 else。 - ```python==690 new_item = ExtractionItem(self.extractor, os.path.join(root, filename), self.depth + 1, self.tag) if new_item.extract(): ``` 以此 file 為 root path 遞迴嘗試 Extraction。 ## `_check_firmware()` ```python=481 for module in binwalk.scan(self.item, "-y", "header", signature=True, quiet=True): ``` 此例中,如同下了指令: - `binwalk --signature --quiet -y header '${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip'` ```python=483 for entry in module.results: ``` Loop 每一條結果 ```python=485 if "uImage header" in entry.description: ``` 若在 description 欄位中有 match pattern `uImage header` ```python=486 if not self.get_kernel_status() and \ "OS Kernel Image" in entry.description: ``` `get_kernel_status` 只是用來回傳 member status[0] 的內容,此例子為 True 所以若 pattern `OS Kernel Image` 也有在 description 中,就會進入 if,如果真的進了: - ```python=488 kernel_offset = entry.offset + 64 kernel_size = 0 for stmt in entry.description.split(','): if "image size:" in stmt: kernel_size = int(''.join( i for i in stmt if i.isdigit()), 10) if kernel_size != 0 and kernel_offset + kernel_size \ <= os.path.getsize(self.item): self.printf(">>>> %s" % entry.description) tmp_fd, tmp_path = tempfile.mkstemp(dir=self.temp) os.close(tmp_fd) Extractor.io_dd(self.item, kernel_offset, kernel_size, tmp_path) kernel = ExtractionItem(self.extractor, tmp_path, self.depth, self.tag) return kernel.extract() ``` Line 491 ~ 494 抓取 kernel_size 的資訊 Line 502 執行 `Extractor` static method `io_dd` `io_dd` 主要做的事情就是把 `self.item` 從 `kernel_offset` 開始抓 `kernel_size` bytes 存到 `tmp_path` Line 504 ~ 507 將 `tmp_path` 設為 root path 再 extract 一次,並且 return ```python=515 elif not self.get_kernel_status() and \ not self.get_rootfs_status() and \ "rootfs offset: " in entry.description and \ "kernel offset: " in entry.description: ``` 若剛剛沒找到 pattern `OS Kernel Image` 那就改找 pattern `rootfs offset: ` 和 `kernel offset: ` 並且 member status[0]、[1] 都要是 False 但此例中 `get_kernel_status` 回傳 status[0],也就是 True,所以不會進此 elif。 若進了此 elif: - ```python=519 kernel_offset = 0 kernel_size = 0 rootfs_offset = 0 rootfs_size = 0 ``` 初始一些參數 - ```python=524 for stmt in entry.description.split(','): if "kernel offset:" in stmt: kernel_offset = int(stmt.split(':')[1], 16) elif "kernel length:" in stmt: kernel_size = int(stmt.split(':')[1], 16) elif "rootfs offset:" in stmt: rootfs_offset = int(stmt.split(':')[1], 16) elif "rootfs length:" in stmt: rootfs_size = int(stmt.split(':')[1], 16) ``` - ```python=535 if kernel_offset != rootfs_size and kernel_size == 0 and \ rootfs_size == 0: kernel_size = rootfs_offset - kernel_offset rootfs_size = os.path.getsize(self.item) - rootfs_offset ``` 抓取、設定參數 `kernel_offset` `kernel_size` `rootfs_offset` `rootfs_size` - ```python=541 if (kernel_size > 0 and kernel_offset + kernel_size \ <= os.path.getsize(self.item)) and \ (rootfs_size != 0 and rootfs_offset + rootfs_size \ <= os.path.getsize(self.item)): self.printf(">>>> %s" % entry.description) tmp_fd, tmp_path = tempfile.mkstemp(dir=self.temp) os.close(tmp_fd) Extractor.io_dd(self.item, kernel_offset, kernel_size, tmp_path) kernel = ExtractionItem(self.extractor, tmp_path, self.depth, self.tag) kernel.extract() tmp_fd, tmp_path = tempfile.mkstemp(dir=self.temp) os.close(tmp_fd) Extractor.io_dd(self.item, rootfs_offset, rootfs_size, tmp_path) rootfs = ExtractionItem(self.extractor, tmp_path, self.depth, self.tag) rootfs.extract() return self.update_status() ``` 確定 size + offset 在檔案整個 size 範圍內,才做以下事情 - 將 kernel dd 出來,並且當作 root path 做 extraction - 將 rootfs dd 出來,並且當作 root path 做 extraction - update status ```python=564 return False ``` 如果每個 entry 都不會進上面兩個條件式其中之一,`_check_firmware` 即回傳 False。 ## `_check_kernel()` ```python=571 if not self.get_kernel_status(): for module in binwalk.scan(self.item, "-y", "kernel", signature=True, quiet=True): for entry in module.results: if "kernel version" in entry.description: self.update_database("kernel_version", entry.description) if "Linux" in entry.description: if self.get_kernel_path(): shutil.copy(self.item, self.get_kernel_path()) else: self.extractor.do_kernel = False self.printf(">>>> %s" % entry.description) return True # VxWorks, etc else: self.printf(">>>> Ignoring: %s" % entry.description) return False return False return False ``` Line 571 若 member status[0] 為 False,表示還未抓到 kernel,進入 if 嘗試抓 kernel Line 572 此例中,如同下了指令: - `binwalk --signature --quiet -y kernel '${FIRMADYNE}/WNAP320 Firmware Version 2.0.3.zip'` Line 575 若搜尋到 pattern `kernel version` 做以下事情: - 設定 database 中,此 iid 的 kernel_version 欄位等於整個 entry.description - 若搜尋到 pattern `Linux` - 若有設定 output 位置,member function `get_kernel_path` 回傳要輸出 kernel 的位置。 - 若有 output 位置,則複製原檔案到該輸出的位置 - 沒有設定 output 位置的話,設定 Extractor 的 do_kernel 為 False - 回傳 True - 沒搜到 `Linux` 就回傳 False 沒搜到 pattern 就回傳 False ## `_check_rootfs()` ```python=597 if not self.get_rootfs_status(): for module in binwalk.scan(self.item, "-e", "-r", "-y", "filesystem", signature=True, quiet=True): for entry in module.results: self.printf(">>>> %s" % entry.description) break if module.extractor.directory: unix = Extractor.io_find_rootfs(module.extractor.directory) if not unix[0]: self.printf(">>>> Extraction failed!") return False self.printf(">>>> Found Linux filesystem in %s!" % unix[1]) if self.output: shutil.make_archive(self.output, "gztar", root_dir=unix[1]) else: self.extractor.do_rootfs = False return True return False ``` 前面有解釋 `io_find_rootfs` 會做什麼 簡單來說,若用 binwalk 發現含有 `filesystem` 的 entry 將這個路徑丟到 `io_find_rootfs` 找 filesystem 沒找到 fs 或根本沒有這個 entry 都會 return False 其他狀況 return True Line 614 若有設定 output 路徑,將剛剛 `io_find_rootfs` 抓出來的 fs 路徑輸出到該輸出的位置,這個就是主要產生 `iid.tar.gz` 的 code 沒設定 output 路徑的話,設定 do_rootfs 為 False ## `_check_compressed` ```python=626 return self._check_recursive("compressed") ``` 前面有解釋 `_check_recursive` 會做什麼 這裡只是把 binwalk -y 的參數設定為 `compressed` 接下來做的事情的流程跟前面解釋的一樣