From 21b30fad3a1fc0726691fd32593ada8e4ae2a704 Mon Sep 17 00:00:00 2001 From: XYlearn <1014355965@qq.com> Date: Thu, 21 Sep 2023 19:09:08 +0800 Subject: [PATCH 1/2] fix: fix wrong ordinal processing --- pe_parsing.py | 73 +++++++++++++++++++++++++++++++++++---------------- reports.py | 18 +++++++------ sync.py | 41 +++++++++++++++++++---------- 3 files changed, 88 insertions(+), 44 deletions(-) diff --git a/pe_parsing.py b/pe_parsing.py index cabf31c..0602347 100644 --- a/pe_parsing.py +++ b/pe_parsing.py @@ -5,6 +5,7 @@ from binaryninja.flowgraph import FlowGraph, FlowGraphNode from binaryninja.function import DisassemblyTextLine, InstructionTextToken from binaryninja.enums import InstructionTextTokenType, BranchType +from binaryninja.log import log_warn if sys.version_info[0] == 3: decode_as = "ascii" @@ -22,10 +23,11 @@ def get_directory_addr(bv, directory_offset): raw = bv.parent_view pe_offset = get_pe_header_addr(bv) - field_offset = raw.read(pe_offset + directory_offset, 4) - # Quick and dirty size-agnostic cross-version bytes-to-int conversion - field_offset = int(codecs.encode(field_offset[::-1], "hex"), 16) + field_offset = raw.read_int(pe_offset + directory_offset, 4) + if not field_offset: + return 0 + dir_addr = bv.start + field_offset return dir_addr @@ -85,7 +87,7 @@ def get_pe_header_addr(bv): class Export(object): - def __init__(self, addr, symbol, ord_, name_index=0): + def __init__(self, addr, symbol, ord_, hint, name_index=0): self.addr = addr self.ord = ord_ self.symbol = symbol @@ -111,6 +113,8 @@ def __repr__(self): def get_eat_name(bv): eat = get_eat_addr(bv) + if not eat: + return "" dll_name_ptr = bv.start + read_int(bv, eat + 0xc, 4) dll_name = bv.get_strings(dll_name_ptr)[0].value @@ -119,29 +123,42 @@ def get_eat_name(bv): def get_exports(bv): eat = get_eat_addr(bv) + if not eat: + return [] - eat_items = read_int(bv, eat + 0x14, 4) - eat_addr = read_int(bv, eat + 0x1c, 4) - ord_addr = read_int(bv, eat + 0x24, 4) - # name_addr = read_int(raw, eat + 0x20) + ord_base = read_int(bv, eat + 0x10, 4) + eat_addr_items = read_int(bv, eat + 0x14, 4) + eat_name_ptrs = read_int(bv, eat + 0x18, 4) + eat_addr_rva = read_int(bv, eat + 0x1c, 4) + name_addr_rva = read_int(bv, eat + 0x20) + ord_addr_rva = read_int(bv, eat + 0x24, 4) # Keep track of how many ordinals refer to a given symbol name_counter = {} exports = [] - for n in range(eat_items): - addr = bv.start + read_int(bv, bv.start + eat_addr + n * 4, 4) - ord_ = read_int(bv, bv.start + ord_addr + n * 2, 2) - # name_ptr = bv.start + read_int(raw, name_addr + n * bv.address_size) + for hint in range(eat_name_ptrs): + ord_ = read_int(bv, bv.start + ord_addr_rva + hint * 2, 2) # by hint + addr = bv.start + read_int(bv, bv.start + eat_addr_rva + ord_ * 4, 4) + name_ptr = bv.start + read_int(bv, bv.start + name_addr_rva + hint * bv.address_size) + name = read_cstring(bv, name_ptr) # mangled name + if decode_as: + name = name.decode(decode_as) symbol = bv.get_symbol_at(addr) + for symbol in bv.get_symbols(start=addr): + if symbol.name == name: + break + else: + log_warn("Unable to find symbol for export %r with hint %d" % (name, hint)) + continue # Dupe export counting - if symbol.name.lower() not in name_counter: - name_counter[symbol.name.lower()] = 0 - name_counter[symbol.name.lower()] += 1 + if name not in name_counter: + name_counter[name] = 0 + name_counter[name] += 1 - exports.append(Export(addr, symbol, ord_, - name_index=name_counter[symbol.name.lower()])) + exports.append(Export(addr, symbol, ord_ + ord_base, hint, + 
name_index=name_counter[name])) return exports @@ -164,15 +181,18 @@ def __repr__(self): def read_imports(self, bv): n = 0 - while read_int(bv, self.lookup_table + n * bv.address_size): + flag_mask = (1 << (bv.address_size * 8 - 1)) - 1 + while True: lookup = read_int(bv, self.lookup_table + n * bv.address_size) + if not lookup: + break datavar_addr = self.import_table + n * bv.address_size n += 1 # We won't find *any* info here if this is an ordinal import. - if lookup >> (bv.address_size * 8 - 1): + if lookup & ~flag_mask: # Strip the ordinal flag - lookup ^= 1 << (bv.address_size * 8 - 1) + lookup &= flag_mask self.imports.append(Import(lookup, None, datavar_addr)) continue @@ -195,15 +215,24 @@ def __init__(self, ordinal, name, datavar_addr): self.name = name self.datavar_addr = datavar_addr + def __repr__(self): + return "Import(%r, %r, 0x%08x)" % (self.ordinal, self.name, + self.datavar_addr) + def get_imports(bv): iat = get_iat_addr(bv) + if not iat: + return [] imports = [] n = 0 - while read_int(bv, iat + n * (4 * 5), 4): - lookup_table = bv.start + read_int(bv, iat + n * (4 * 5), 4) + while True: + lookup_table_rva = read_int(bv, iat + n * (4 * 5), 4) + if not lookup_table_rva: + break + lookup_table = bv.start + lookup_table_rva import_table = bv.start + read_int(bv, iat + n * (4 * 5) + 0x10, 4) name_addr = bv.start + read_int(bv, iat + n * (4 * 5) + 0xc, 4) diff --git a/reports.py b/reports.py index 8d3c9dd..9e2d8e6 100644 --- a/reports.py +++ b/reports.py @@ -35,20 +35,22 @@ def generate_relation_graph(bvs): for bv in bvs: name = get_eat_name(bv) - nodes.add(name.lower()) - bv_nodes.add(name.lower()) - node_labels[name.lower()] = name + lower_name = name.lower() + nodes.add(lower_name) + bv_nodes.add(lower_name) + node_labels[lower_name] = name if not first_node: - first_node = name.lower() + first_node = lower_name - edges[name] = set() + edges[lower_name] = set() for library in get_imports(bv): - nodes.add(library.name.lower()) - node_labels[library.name.lower()] = library.name + imp_lower_name = library.name.lower() + nodes.add(imp_lower_name) + node_labels[imp_lower_name] = library.name - edges[name.lower()].add(library.name.lower()) + edges[lower_name].add(imp_lower_name) graph_nodes = {} graph = FlowGraph() diff --git a/sync.py b/sync.py index 376032b..e7ab751 100644 --- a/sync.py +++ b/sync.py @@ -1,6 +1,7 @@ from binaryninja.log import log_warn, log_info, log_error from binaryninja.types import Symbol from binaryninja.enums import SymbolType +from binaryninja.demangle import demangle_ms import peutils @@ -18,15 +19,17 @@ def resolve_imports(bv): def resolve_imports_for_library(bv, lib): source_bv = peutils.files[lib.name.lower()] exports = pe_parsing.get_exports(source_bv) + + export_by_ord = {export.ord: export for export in exports} for import_ in lib.imports: # Find the name name = None - for export in exports: - if export.ord == import_.ordinal: - log_info(repr(export)) - name = export.name - export_symbol = export.symbol + if import_.ordinal in export_by_ord: + export = export_by_ord[import_.ordinal] + log_info(repr(export)) + name = export.name + export_symbol = export.symbol if not name: log_warn("Unable to find name for %r" % import_) @@ -59,17 +62,27 @@ def resolve_imports_for_library(bv, lib): ) continue - type_tokens = [token.text for token in export_func.type_tokens] - i = type_tokens.index(export_symbol.name) - type_tokens[i] = "(*const func_name)" - - type_string = "".join(type_tokens) - log_info("Setting type for %s to %r" % (name, type_string)) - try: - 
(type_, name) = bv.parse_type_string(type_string) + (type_, name) = demangle_ms(bv.arch, export_symbol.name) except: - log_error("Invalid type, skipping") + log_error("Invalid name, skipping") + continue + + if type_ is None: + type_tokens = [token.text for token in export_func.type_tokens] + if export_symbol.name not in type_tokens: + log_error("Unknown error") + continue + i = type_tokens.index(export_symbol.name) + type_tokens[i] = "(*const func_name)" + + type_string = "".join(type_tokens) + log_info("Setting type for %s to %r" % (name, type_string)) + + try: + (type_, name) = bv.parse_type_string(type_string) + except: + log_error("Invalid type, skipping") bv.define_data_var(import_.datavar_addr, type_) From 2ad2b11695f5d264a4ff016fa61a25da8297784f Mon Sep 17 00:00:00 2001 From: XYlearn <1014355965@qq.com> Date: Thu, 30 Nov 2023 20:32:09 +0800 Subject: [PATCH 2/2] feat: add com helper --- .gitignore | 7 + __init__.py | 68 +- comhelper/__init__.py | 236 + comhelper/classes.txt | 3687 +++ comhelper/interfaces.txt | 27479 ++++++++++++++++++++++ comhelper/vtables.txt | 45985 +++++++++++++++++++++++++++++++++++++ data.py | 2 + pe_parsing.py | 14 +- reports.py | 16 +- sync.py | 22 +- 10 files changed, 77490 insertions(+), 26 deletions(-) create mode 100644 .gitignore create mode 100644 comhelper/__init__.py create mode 100644 comhelper/classes.txt create mode 100644 comhelper/interfaces.txt create mode 100644 comhelper/vtables.txt create mode 100644 data.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..02a14d5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +__pycache__/ +.pytest_cache/ +.tox/ +.venv/ +.vscode/ +.vscode-test/ +.vagra \ No newline at end of file diff --git a/__init__.py b/__init__.py index 84559a9..b3c8bec 100644 --- a/__init__.py +++ b/__init__.py @@ -25,13 +25,19 @@ - Looks like imports with jump stubs doesn't get their types set correctly? """ +import os +import traceback + +import binaryninja from binaryninja.plugin import PluginCommand +from binaryninja.interaction import ( + TextLineField, DirectoryNameField, + get_form_input, show_message_box +) from binaryninja import log_info -from . import pe_parsing, reports, sync - - -files = {} +from . import pe_parsing, reports, sync, comhelper +from .data import files def bv_is_pe(bv): @@ -48,6 +54,43 @@ def wrapper(bv): return wrapper +def select_bvs(func): + + def wrapper(bv): + ext_field = TextLineField("Extensions", default="exe,dll") + dir_field = DirectoryNameField("Directory") + get_form_input([ext_field, dir_field], "Binary Dependency Graph") + exts = ['.' 
+ ext for ext in ext_field.result.split(",") if ext] + if not exts: + show_message_box("Error", "No extensions specified") + return + directory = dir_field.result + if not os.path.exists(directory): + show_message_box("Error", "Directory does not exist") + return + + bvs = [] + try: + for root, dirs, files in os.walk(directory): + for file in files: + if not file.endswith(tuple(exts)): + continue + path = os.path.join(root, file) + bv = binaryninja.open_view(path, update_analysis=False) + bvs.append(bv) + if bv.view_type == "PE": + bvs.append(bv) + func(bvs) + except Exception as e: + show_message_box("Error", traceback.format_exc()) + return + finally: + for bv in bvs: + bv.file.close() + + return wrapper + + def register_file(bv): # name = os.path.basename(bv.file.filename).split(".")[0] name = pe_parsing.get_eat_name(bv) @@ -67,7 +110,6 @@ def register_file(bv): sync.resolve_imports, is_valid=bv_is_pe ) - PluginCommand.register( "PE\\Debug\\PE tables", "Show the IAT and EAT as seen by PE Utils", @@ -78,3 +120,19 @@ def register_file(bv): "Show a relationship graph for the currently loaded BVs", all_bvs(reports.generate_relation_graph), is_valid=bv_is_pe ) +PluginCommand.register( + "PE\\Debug\\Binary relationship graph (selected)", + "Show a relationship graph for the currently loaded BVs", + select_bvs(reports.generate_relation_graph), is_valid=lambda _: True +) + +PluginCommand.register_for_address( + "PE\\COM\\Resolve Interface ID", + "Resolve interface id of COM object", + comhelper.resolve_iid, is_valid=lambda bv, _: bv_is_pe(bv) +) +PluginCommand.register_for_address( + "PE\\COM\\Resolve Class ID", + "Resolve class id of COM object", + comhelper.resolve_clsid, is_valid=lambda bv, _: bv_is_pe(bv) +) diff --git a/comhelper/__init__.py b/comhelper/__init__.py new file mode 100644 index 0000000..0ba1311 --- /dev/null +++ b/comhelper/__init__.py @@ -0,0 +1,236 @@ +""" +This module is modified from https://github.com/fboldewin/COM-Code-Helper + +Helper of analyzing COM Objects +""" + +import os +import struct +import csv + +from binaryninja import Type, BinaryView, log_info, log_warn, log_error, Architecture + + +_DATA_DIRECTORY = os.path.dirname(__file__) +CLASSES_FILE = os.path.join(_DATA_DIRECTORY, "classes.txt") +INTERFACES_FILE = os.path.join(_DATA_DIRECTORY, "interfaces.txt") +VTABLES_FILE = os.path.join(_DATA_DIRECTORY, "vtables.txt") + + +class ComHelper(object): + """Base class for COMImporter and COMExporter""" + + _vtable_data = None + _interface_data = None + _class_data = None + + def __init__(self, bv: BinaryView) -> None: + self.bv = bv + + @property + def can_run(self): + """Whether the importer can run""" + if not self.bv.platform.name.startswith("windows"): + return False + + @property + def interface_data(self): + if self._interface_data is None: + self._interface_data = _load_interface_data() + return self._interface_data + + @property + def class_data(self): + if self._class_data is None: + self._class_data = _load_class_data() + return self._class_data + + @property + def vtable_data(self): + if self._vtable_data is None: + self._vtable_data = _load_vtable_data() + return self._vtable_data + + def define_guid(self, address: int = -1, name: str = None): + """Define GUID type and define data_var if address >= 0 + :param address: Address of the GUID + :param name: Name of the GUID + """ + guid_type_name = "GUID" + guid_type = self.bv.get_type_by_name(guid_type_name) + if not guid_type: + guid_type = Type.structure( + members=[ + Type.int(4, sign=False), + Type.int(2, 
sign=False), + Type.int(2, sign=False), + Type.array(Type.int(1, False), 8), + ], + packed=True, + ) + self.bv.define_user_type("GUID", guid_type) + log_info("Created GUID type") + + if address >= 0: + self.bv.define_data_var(address, guid_type, name=name) + log_info("Defined GUID variable at %x" % address) + + def read_guid(cls, address: int): + """Read GUID at address""" + raw_bytes = cls.bv.read(address, 16) + return format_guid(raw_bytes) + + def define_vtable(self, interface: str, vtable_address: int = -1): + """Define vtable type and data_var for a given interface + + :param interface: Name of the interface + :param vtable_address: Address of the vtable + """ + + vtable_name = "vtable_for_" + interface + vtable_type = self.bv.get_type_by_name(vtable_name) + if not vtable_type: + vtable_data = self.vtable_data + if interface not in vtable_data: + log_error("No VTable data for %s" % interface) + return + + members = [] + member_infos = vtable_data[interface] + for member_info in member_infos: + member_name = member_info["name"] + member_size = member_info["size"] + # TODO: Try to get member type from analyzed DLL + if member_name.startswith("unknown"): + member_type = Type.array(Type.int(1, sign=False), member_size) + else: + member_type = Type.pointer(Architecture["armv7"], Type.void()) + members.append((member_type, member_name)) + + vtable_type = Type.structure(members=members, packed=True) + self.bv.define_user_type(vtable_name, vtable_type) + log_info("Created VTable type for %s" % interface) + + # Define VTable data_var + if vtable_address >= 0: + self.bv.define_data_var(vtable_address, vtable_type, name=vtable_name) + log_info( + "Defined VTable variable for %s at %x" % (interface, vtable_address) + ) + + +class COMImporter(ComHelper): + """ + Helper of importing COM object information generated by COMExporter + """ + + @property + def can_run(self): + """Whether the importer can run""" + if not self.bv.platform.name.startswith("windows"): + return False + for sym in self.bv.symbols: + if sym.type == "FunctionSymbol" and "CoCreateInstance" in sym.name: + return True + return False + + def resolve_clsid(self, address: int): + """Analyze class by clsid + :param address: Address of the CLSID + """ + guid_str = self.read_guid(address) + if guid_str not in self.class_data: + log_warn("No class data for GUID %s" % guid_str) + return + + clsid = self.class_data[guid_str] + clsid_name = "clsid_for_" + clsid + self.define_guid(address, name=clsid_name) + + def resolve_iid(self, address: int): + """Analyze interface by guid + :param address: Address of the GUID + """ + guid_str = self.read_guid(address) + if guid_str not in self.interface_data: + log_warn("No interface data for GUID %s" % guid_str) + return + + interface = self.interface_data[guid_str] + guid_name = "iid_for_" + interface + self.define_guid(address, name=guid_name) + self.define_vtable(interface) + + +class COMExporter(ComHelper): + """ + Helper of analyzing COM Objects in dll and export them + """ + + pass + + +def _load_class_data(): + with open(CLASSES_FILE, "r") as class_fp: + class_reader = csv.reader(class_fp, delimiter=" ") + records = list(class_reader) + return records + + +def _load_interface_data(): + records = {} + with open(INTERFACES_FILE, "r") as interface_fp: + interface_reader = csv.reader(interface_fp, delimiter=" ") + for guid, interface in interface_reader: + records[guid] = interface + return records + + +def _load_vtable_data(): + vtable_data = {} + with open(VTABLES_FILE, "r") as vtable_fp: + 
vtable_reader = csv.reader(vtable_fp, delimiter=" ")
+        records = list(vtable_reader)
+
+    curr_interface = None
+    curr_offset = 0
+    for row_idx, row in enumerate(records):
+        if not row:
+            continue
+        if len(row) != 3:
+            raise ValueError("Invalid VTable row at line %d" % (row_idx + 1))
+        interface, name, offset = row
+        offset = int(offset)
+
+        if interface != curr_interface:
+            # New interface: start collecting members from offset 0
+            curr_interface = interface
+            curr_offset = 0
+            vtable_data[interface] = []
+
+        if offset != curr_offset:
+            # Gap between the last processed slot and this entry: pad it with an unknown member
+            vtable_data[interface].append(dict(name='unknown' + hex(curr_offset)[2:], size=offset - curr_offset))
+            curr_offset = offset
+        vtable_data[interface].append(dict(name=name, size=4))
+        curr_offset += 4  # Vtable pointer is 32-bit
+
+    return vtable_data
+
+
+def format_guid(raw_bytes):
+    if len(raw_bytes) != 16:
+        raise ValueError("Invalid GUID length")
+    values = struct.unpack("
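
format_guid renders the 16 raw GUID bytes into the string form used as lookup keys for the bundled interfaces.txt and classes.txt data. For reference, a Windows GUID is one little-endian 32-bit field, two little-endian 16-bit fields, and eight raw bytes. A minimal standalone sketch of such formatting follows; the helper name format_guid_sketch and the uppercase, brace-free output are assumptions, and the real key format must match whatever the data files actually use.

import struct

def format_guid_sketch(raw_bytes):
    """Render 16 raw GUID bytes as XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX."""
    if len(raw_bytes) != 16:
        raise ValueError("Invalid GUID length")
    # Data1 (u32), Data2 (u16), Data3 (u16) are stored little-endian; Data4 is 8 raw bytes.
    d1, d2, d3 = struct.unpack("<IHH", raw_bytes[:8])
    d4 = raw_bytes[8:]
    return "%08X-%04X-%04X-%02X%02X-%s" % (
        d1, d2, d3, d4[0], d4[1], "".join("%02X" % b for b in d4[2:]))

# Example: IUnknown's IID formats to 00000000-0000-0000-C000-000000000046
print(format_guid_sketch(bytes.fromhex("0000000000000000C000000000000046")))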