Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
__pycache__/
.pytest_cache/
.tox/
.venv/
.vscode/
.vscode-test/
.vagra
68 changes: 63 additions & 5 deletions __init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@
- Looks like imports with jump stubs doesn't get their types set correctly?

"""
import os
import traceback

import binaryninja
from binaryninja.plugin import PluginCommand
from binaryninja.interaction import (
TextLineField, DirectoryNameField,
get_form_input, show_message_box
)
from binaryninja import log_info

from . import pe_parsing, reports, sync


files = {}
from . import pe_parsing, reports, sync, comhelper
from .data import files


def bv_is_pe(bv):
Expand All @@ -48,6 +54,43 @@ def wrapper(bv):
return wrapper


def select_bvs(func):

def wrapper(bv):
ext_field = TextLineField("Extensions", default="exe,dll")
dir_field = DirectoryNameField("Directory")
get_form_input([ext_field, dir_field], "Binary Dependency Graph")
exts = ['.' + ext for ext in ext_field.result.split(",") if ext]
if not exts:
show_message_box("Error", "No extensions specified")
return
directory = dir_field.result
if not os.path.exists(directory):
show_message_box("Error", "Directory does not exist")
return

bvs = []
try:
for root, dirs, files in os.walk(directory):
for file in files:
if not file.endswith(tuple(exts)):
continue
path = os.path.join(root, file)
bv = binaryninja.open_view(path, update_analysis=False)
bvs.append(bv)
if bv.view_type == "PE":
bvs.append(bv)
func(bvs)
except Exception as e:
show_message_box("Error", traceback.format_exc())
return
finally:
for bv in bvs:
bv.file.close()

return wrapper


def register_file(bv):
# name = os.path.basename(bv.file.filename).split(".")[0]
name = pe_parsing.get_eat_name(bv)
Expand All @@ -67,7 +110,6 @@ def register_file(bv):
sync.resolve_imports, is_valid=bv_is_pe
)


PluginCommand.register(
"PE\\Debug\\PE tables",
"Show the IAT and EAT as seen by PE Utils",
Expand All @@ -78,3 +120,19 @@ def register_file(bv):
"Show a relationship graph for the currently loaded BVs",
all_bvs(reports.generate_relation_graph), is_valid=bv_is_pe
)
PluginCommand.register(
"PE\\Debug\\Binary relationship graph (selected)",
"Show a relationship graph for the currently loaded BVs",
select_bvs(reports.generate_relation_graph), is_valid=lambda _: True
)

PluginCommand.register_for_address(
"PE\\COM\\Resolve Interface ID",
"Resolve interface id of COM object",
comhelper.resolve_iid, is_valid=lambda bv, _: bv_is_pe(bv)
)
PluginCommand.register_for_address(
"PE\\COM\\Resolve Class ID",
"Resolve class id of COM object",
comhelper.resolve_clsid, is_valid=lambda bv, _: bv_is_pe(bv)
)
236 changes: 236 additions & 0 deletions comhelper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
"""
This module is modified from https://github.com/fboldewin/COM-Code-Helper

Helper of analyzing COM Objects
"""

import os
import struct
import csv

from binaryninja import Type, BinaryView, log_info, log_warn, log_error, Architecture


_DATA_DIRECTORY = os.path.dirname(__file__)
CLASSES_FILE = os.path.join(_DATA_DIRECTORY, "classes.txt")
INTERFACES_FILE = os.path.join(_DATA_DIRECTORY, "interfaces.txt")
VTABLES_FILE = os.path.join(_DATA_DIRECTORY, "vtables.txt")


class ComHelper(object):
"""Base class for COMImporter and COMExporter"""

_vtable_data = None
_interface_data = None
_class_data = None

def __init__(self, bv: BinaryView) -> None:
self.bv = bv

@property
def can_run(self):
"""Whether the importer can run"""
if not self.bv.platform.name.startswith("windows"):
return False

@property
def interface_data(self):
if self._interface_data is None:
self._interface_data = _load_interface_data()
return self._interface_data

@property
def class_data(self):
if self._class_data is None:
self._class_data = _load_class_data()
return self._class_data

@property
def vtable_data(self):
if self._vtable_data is None:
self._vtable_data = _load_vtable_data()
return self._vtable_data

def define_guid(self, address: int = -1, name: str = None):
"""Define GUID type and define data_var if address >= 0
:param address: Address of the GUID
:param name: Name of the GUID
"""
guid_type_name = "GUID"
guid_type = self.bv.get_type_by_name(guid_type_name)
if not guid_type:
guid_type = Type.structure(
members=[
Type.int(4, sign=False),
Type.int(2, sign=False),
Type.int(2, sign=False),
Type.array(Type.int(1, False), 8),
],
packed=True,
)
self.bv.define_user_type("GUID", guid_type)
log_info("Created GUID type")

if address >= 0:
self.bv.define_data_var(address, guid_type, name=name)
log_info("Defined GUID variable at %x" % address)

def read_guid(cls, address: int):
"""Read GUID at address"""
raw_bytes = cls.bv.read(address, 16)
return format_guid(raw_bytes)

def define_vtable(self, interface: str, vtable_address: int = -1):
"""Define vtable type and data_var for a given interface

:param interface: Name of the interface
:param vtable_address: Address of the vtable
"""

vtable_name = "vtable_for_" + interface
vtable_type = self.bv.get_type_by_name(vtable_name)
if not vtable_type:
vtable_data = self.vtable_data
if interface not in vtable_data:
log_error("No VTable data for %s" % interface)
return

members = []
member_infos = vtable_data[interface]
for member_info in member_infos:
member_name = member_info["name"]
member_size = member_info["size"]
# TODO: Try to get member type from analyzed DLL
if member_name.startswith("unknown"):
member_type = Type.array(Type.int(1, sign=False), member_size)
else:
member_type = Type.pointer(Architecture["armv7"], Type.void())
members.append((member_type, member_name))

vtable_type = Type.structure(members=members, packed=True)
self.bv.define_user_type(vtable_name, vtable_type)
log_info("Created VTable type for %s" % interface)

# Define VTable data_var
if vtable_address >= 0:
self.bv.define_data_var(vtable_address, vtable_type, name=vtable_name)
log_info(
"Defined VTable variable for %s at %x" % (interface, vtable_address)
)


class COMImporter(ComHelper):
"""
Helper of importing COM object information generated by COMExporter
"""

@property
def can_run(self):
"""Whether the importer can run"""
if not self.bv.platform.name.startswith("windows"):
return False
for sym in self.bv.symbols:
if sym.type == "FunctionSymbol" and "CoCreateInstance" in sym.name:
return True
return False

def resolve_clsid(self, address: int):
"""Analyze class by clsid
:param address: Address of the CLSID
"""
guid_str = self.read_guid(address)
if guid_str not in self.class_data:
log_warn("No class data for GUID %s" % guid_str)
return

clsid = self.class_data[guid_str]
clsid_name = "clsid_for_" + clsid
self.define_guid(address, name=clsid_name)

def resolve_iid(self, address: int):
"""Analyze interface by guid
:param address: Address of the GUID
"""
guid_str = self.read_guid(address)
if guid_str not in self.interface_data:
log_warn("No interface data for GUID %s" % guid_str)
return

interface = self.interface_data[guid_str]
guid_name = "iid_for_" + interface
self.define_guid(address, name=guid_name)
self.define_vtable(interface)


class COMExporter(ComHelper):
"""
Helper of analyzing COM Objects in dll and export them
"""

pass


def _load_class_data():
with open(CLASSES_FILE, "r") as class_fp:
class_reader = csv.reader(class_fp, delimiter=" ")
records = list(class_reader)
return records


def _load_interface_data():
records = {}
with open(INTERFACES_FILE, "r") as interface_fp:
interface_reader = csv.reader(interface_fp, delimiter=" ")
for guid, interface in interface_reader:
records[guid] = interface
return records


def _load_vtable_data():
vtable_data = {}
with open(VTABLES_FILE, "r") as vtable_fp:
vtable_reader = csv.reader(vtable_fp, delimiter=" ")
records = list(vtable_reader)

curr_interface = None
curr_offset = 0
for row_idx, row in enumerate(records):
if not row:
continue
if len(row) != 3:
raise ValueError("Invalid VTable row at line %d" % (row_idx + 1))
interface, name, offset = row
offset = int(offset)

if interface != curr_interface:
curr_interface = interface
curr_offset = 0
vtable_data[interface] = []

if offset != curr_offset:
vtable_data[interface].append(dict(name='unknown' + hex(offset)[2:], size=curr_offset - offset))
curr_offset = offset
else:
curr_offset += 4 # Vtable pointer is 32-bit
# raise ValueError("Unmatched VTable offset at %d expect_offset=%d, curr_offset=%d" % (row_idx, offset, curr_offset))
vtable_data[interface].append(dict(name=name, size=4))

return vtable_data


def format_guid(raw_bytes):
if len(raw_bytes) != 16:
raise ValueError("Invalid GUID length")
values = struct.unpack("<IHH8B", raw_bytes)
guid = "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X" % values
return guid


def resolve_iid(bv, address):
importer = COMImporter(bv)
importer.resolve_iid(address)


def resolve_clsid(bv, address):
importer = COMImporter(bv)
importer.resolve_clsid(address)
Loading