#-------------------------------------------------------------------------------
#
# IDA Pro Plug-in: Function Identification and Recovery Signature Tool (FIRST)
# Copyright (C) 2016 Angel M. Villegas
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Requirements
# ------------
# Requests (docs.python-requests.org/)
#
# Installation
# ------------
# Drag and drop into IDA Pro's plugin folder for IDA Pro 6.9 SP1 and higher
#
#-------------------------------------------------------------------------------
# IDA Pro Python Modules
import idc
import idaapi
import idautils
import random
# Third Party Python Modules
required_modules_loaded = True
try:
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
except ImportError:
required_modules_loaded &= False
print 'FIRST requires Python module requests\n'
try:
from requests_kerberos import HTTPKerberosAuth
except ImportError:
print '[1st] Kerberos support is not avaialble'
HTTPKerberosAuth = None
from PyQt5 import QtGui, QtWidgets, QtCore
from PyQt5.QtCore import Qt
# Python Modules
import re
import csv
import sys
import math
import time
import json
import inspect
import os.path
import datetime
import calendar
import threading
import collections
import ConfigParser
from pprint import pprint
from os.path import exists
from hashlib import sha256, md5, sha1
from base64 import b64encode, b64decode
# Constants
#-------------------------------------------------------------------------------
FIRST_INDEX = {
'hashes' : 1,
'malware_name' : 2,
}
# Global Variables
#-------------------------------------------------------------------------------
FIRST_ICON = None
FIRST_DB = 'FIRST_data'
g_network_headers = {}
class IDAWrapper(object):
'''
Class to wrap functions that are not thread safe. These functions must
be run on the main thread to avoid random crashes (and starting in 7.2,
this is enforced by IDA, with an exception being generated if a
thread-unsafe function is called from outside of the main thread.)
'''
mapping = {
'get_tform_type' : 'get_widget_type',
}
def __init__(self):
self.version = idaapi.IDA_SDK_VERSION
def __getattribute__(self, name):
default = '[1st] default'
if (idaapi.IDA_SDK_VERSION >= 700) and (name in IDAWrapper.mapping):
name = IDAWrapper.mapping[name]
val = getattr(idaapi, name, default)
if val == default:
val = getattr(idautils, name, default)
if val == default:
val = getattr(idc, name, default)
if val == default:
msg = 'Unable to find {}'.format(name)
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
return
if hasattr(val, '__call__'):
def call(*args, **kwargs):
holder = [None] # need a holder, because 'global' sucks
def trampoline():
holder[0] = val(*args, **kwargs)
return 1
# Execute the request using MFF_WRITE, which should be safe for
# any possible request at the expense of speed. In my testing,
# though, it wasn't noticably slower than MFF_FAST. If this
# is observed to impact performance, consider creating a list
# that maps API calls to the most appropriate flag.
idaapi.execute_sync(trampoline, idaapi.MFF_WRITE)
return holder[0]
return call
else:
return val
IDAW = IDAWrapper()
# Some of the IDA API functions return generators that invoke thread-unsafe
# code during iteration. Thus, making the initial API call via IDAW is not
# sufficient to have these underlying API calls be executed safely on the
# main thread. This generator wraps those and performs the iteration safely.
def safe_generator(iterator):
# Make the sentinel value something that isn't likely to be returned
# by an API call (and isn't a fixed string that could be inserted into
# a program to break FIRST maliciously)
sentinel = '[1st] Sentinel %d' % (random.randint(0, 65535))
holder = [sentinel] # need a holder, because 'global' sucks
def trampoline():
try:
holder[0] = next(iterator)
except StopIteration:
holder[0] = sentinel
return 1
while True:
# See notes above regarding why we use MFF_WRITE here
idaapi.execute_sync(trampoline, idaapi.MFF_WRITE)
if holder[0] == sentinel:
return
yield holder[0]
# Main Plug-in Form Class
#-------------------------------------------------------------------------------
class FIRST_FormClass(idaapi.PluginForm):
system = {0 : 'Unknown', 1 : 'Win', 6 : 'Linux', 9 : 'Osx'}
def __init__(self):
super(FIRST_FormClass, self).__init__()
self.parent = None
def OnCreate(self, form):
self.form = form
self.parent = self.FormToPyQtWidget(form)
self.populate_model()
self.populate_main_form()
def populate_model(self):
# Selectable views in the main plug-in window
self.views_ui = {'Configuration' : self.view_configuration_info,
'Management' : self.view_created,
'Currently Applied' : self.view_applied,
'About' : self.view_about}
self.views = ['About', 'Configuration', 'Management', 'Currently Applied']
self.views_model = FIRST.Model.Base(['Views'], self.views)
def view_configuration_info(self):
self.thread_stop = True
container = QtWidgets.QVBoxLayout()
label = QtWidgets.QLabel('Configuration Information')
label.setStyleSheet('font: 18px;')
container.addWidget(label)
layout = QtWidgets.QHBoxLayout()
self.message = QtWidgets.QLabel()
layout.addWidget(self.message)
layout.addStretch()
save_button = QtWidgets.QPushButton('Save')
layout.addWidget(save_button)
scroll_layout = FIRSTUI.ScrollWidget(frame=QtWidgets.QFrame.NoFrame)
FIRSTUI.SharedObjects.server_config_layout(self, scroll_layout, FIRST.config)
container.addWidget(scroll_layout)
container.addStretch()
container.addLayout(layout)
save_button.clicked.connect(self.save_config)
return container
def save_config(self):
FIRST.config = FIRSTUI.SharedObjects.get_config(self)
FIRST.config.save_config(FIRST.config_path)
info = FIRST.Info.get_file_details()
FIRST.server = FIRST.Server(FIRST.config,
info['md5'],
info['crc32'],
h_sha1=info['sha1'],
h_sha256=info['sha256'])
title = 'FIRST: Configuration Changes'
msg = 'FIRST\'s configuration information has been updated'
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg, QtWidgets.QMessageBox.Information),))
def view_created(self):
container = QtWidgets.QVBoxLayout()
groups = None
label = QtWidgets.QLabel('FIRST Metadata')
label.setStyleSheet('font: 18px;')
container.addWidget(label)
container.addSpacing(5)
description = ('The metadata you\'ve created and added to FIRST '
'are shown below. You can delete them via right '
'clicking on them and selecting delete or '
'selecting one and hitting the delete key.')
description = QtWidgets.QLabel(description)
description.setWordWrap(True)
container.addWidget(description)
container.addSpacing(10)
data_model = FIRST.Model.Check({})
tree_view = FIRST.Model.TreeView(depth=1)
tree_view.setExpandsOnDoubleClick(False)
tree_view.setIndentation(15)
tree_view.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
tree_view.setSelectionMode(QtWidgets.QAbstractItemView.SingleSelection)
# Setup the Model's header
FIRSTUI.SharedObjects.make_model_headers(data_model, full=False)
tree_view.setModel(data_model)
self.created_data_model = data_model
tree_view.setColumnWidth(0, 175) # Function
tree_view.setColumnWidth(1, 35) # Rank
tree_view.setColumnWidth(2, 150) # Prototype
tree_view.setColumnWidth(3, 20) # i
# Add chunks to the list at a time receieved
self.__received_data = False
if FIRST.server:
# Spawn thread to get chunks of data back from server
self.thread_stop = False
idaapi.show_wait_box('Querying FIRST for metadata you\'ve created')
server_thread = FIRST.server.created(self.__data_callback,
self.__complete_callback)
# wait several seconds
for i in xrange(2):
time.sleep(1)
if idaapi.wasBreak():
self.thread_stop = True
FIRST.server.stop_operation(server_thread)
idaapi.hide_wait_box()
self.history_dialogs = []
tree_view.setContextMenuPolicy(Qt.ActionsContextMenu)
delete_action = QtWidgets.QAction('&Delete', self.parent)
delete_action.setShortcut('Del')
delete_action.triggered.connect(self.delete_metadata)
history_action = QtWidgets.QAction('View &History', self.parent)
history_action.setShortcut('H')
history_action.triggered.connect(self.metadata_history)
tree_view.addAction(delete_action)
tree_view.addAction(history_action)
self.created_tree_view = tree_view
container.addWidget(self.created_tree_view)
return container
def __data_callback(self, thread, data):
if self.thread_stop:
FIRST.server.stop_operation(thread)
# Build the model
root_node = self.created_data_model.invisibleRootItem()
for match in data:
self.__received_data = True
row = FIRSTUI.SharedObjects.make_match_info(match, full=False)
root_node.appendRow(row)
def __complete_callback(self, thread, data):
FIRST.server.remove_operation(thread)
# Alert the user if no matches were found in FIRST
if not self.__received_data:
title = 'FIRST: No Metadata Found'
msg = 'You have not added any metadata to FIRST'
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg, QtWidgets.QMessageBox.Information),))
return
def delete_metadata(self):
selected = self.created_tree_view.selectedIndexes()
if not selected:
return
ids = set([x.data(FIRSTUI.ROLE_ID) for x in selected])
index = selected[0]
for metadata_id in ids:
# Delete from FIRST
if metadata_id:
response = FIRST.server.delete(metadata_id)
if (not response
or ('failed' not in response)
or response['failed']
or (('deleted' in response) and not response['deleted'])):
title = 'FIRST: Delete Created Metadata'
msg = 'Cannot delete the requested signature. '
if response and ('msg' in response):
msg += 'Error: {0[msg]} '.format(response)
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
return
# Remove from view, get the top row of the tree
if index.parent().isValid():
index = index.parent()
root = self.created_data_model.invisibleRootItem()
root.removeRow(index.row())
def metadata_history(self):
selected = self.created_tree_view.selectedIndexes()
if not selected:
return
ids = [x.data(FIRSTUI.ROLE_ID) for x in selected]
index = selected[0]
if not ids:
return
dialog = FIRSTUI.Dialog(None, FIRSTUI.History, metadata_id=ids[0])
dialog.show()
self.history_dialogs.append(dialog)
def view_about(self):
self.thread_stop = True
container = QtWidgets.QVBoxLayout()
label = QtWidgets.QLabel('FIRST ')
label.setStyleSheet('font: 24px;')
container.addWidget(label)
label = QtWidgets.QLabel('Function Identification and Recovery Signature Tool')
label.setStyleSheet('font: 12px;')
container.addWidget(label)
grid_layout = QtWidgets.QGridLayout()
grid_layout.addWidget(QtWidgets.QLabel('Version'), 0, 0)
grid_layout.addWidget(QtWidgets.QLabel(str(FIRST.VERSION)), 0, 1)
grid_layout.addWidget(QtWidgets.QLabel('Date'), 1, 0)
grid_layout.addWidget(QtWidgets.QLabel(FIRST.DATE), 1, 1)
grid_layout.addWidget(QtWidgets.QLabel('Report Issues'), 2, 0)
label = QtWidgets.QLabel(('<a href="https://github.com/'
'vrtadmin/FIRST-plugin-ida/issues">'
'github.com/vrtadmin/FIRST-plugin-ida</a>'))
label.setTextFormat(Qt.RichText)
label.setTextInteractionFlags(Qt.TextBrowserInteraction)
label.setOpenExternalLinks(True)
grid_layout.addWidget(label, 2, 1)
grid_layout.setColumnMinimumWidth(0, 100)
grid_layout.setColumnStretch(1, 1)
grid_layout.setContentsMargins(10, 0, 0, 0)
container.addSpacing(10)
container.addLayout(grid_layout)
container.addStretch()
copyright = '{}-{} Cisco Systems, Inc.'.format(FIRST.BEGIN, FIRST.END)
label = QtWidgets.QLabel(copyright)
label.setStyleSheet('font: 10px;')
label.setAlignment(Qt.AlignCenter)
container.addWidget(label)
return container
def view_applied(self):
self.thread_stop = True
container = QtWidgets.QVBoxLayout()
groups = None
label = QtWidgets.QLabel('Applied Metadata')
label.setStyleSheet('font: 18px;')
container.addWidget(label)
container.addSpacing(5)
description = ('FIRST metadata you\'ve applied in this IDB '
'are shown below. You can go to the function via '
'right clicking on the function and selecting View '
'or double clicking the function.')
description = QtWidgets.QLabel(description)
description.setWordWrap(True)
container.addWidget(description)
container.addSpacing(10)
data = FIRST.Metadata.get_functions_with_applied_metadata()
data = {d.address : d for d in data}
data_model = FIRST.Model.Check(data)
tree_view = FIRST.Model.TreeView(depth=1)
tree_view.setExpandsOnDoubleClick(False)
tree_view.setIndentation(15)
tree_view.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
tree_view.setSelectionMode(QtWidgets.QAbstractItemView.SingleSelection)
# Setup the Model's header
headers = [ ('Function', 'function name and address in this IDB', 0),
('Prototype', 'function prototype', 1),
('User', 'creator of the metadata', 2)]
for display_name, tooltip, i in headers:
item_header = QtGui.QStandardItem(display_name)
item_header.setToolTip(tooltip)
data_model.setHorizontalHeaderItem(i, item_header)
# Setup all other rows
name_str = '0x{0.address:08x}: {0.name}'
if not FIRST.Info.is_32bit():
name_str = name_str.replace(':08x}', ':016x}')
root_node = data_model.invisibleRootItem()
cmp_func = lambda x,y: cmp(x.address, y.address)
for match in sorted(data.values(), cmp=cmp_func):
# Row: <address and name> <prototype> <creator>
name = QtGui.QStandardItem(name_str.format(match))
prototype = QtGui.QStandardItem(match.prototype)
prototype.setToolTip(match.prototype)
creator = QtGui.QStandardItem(match.creator)
creator.setToolTip(match.creator)
info = [name, prototype, creator]
# Add row:
# Comment:
# (comment)
comment = match.comment
if not comment:
comment = '- No Comment -'
comment = QtGui.QStandardItem('Comment:\n' + comment)
comment.setColumnCount(8)
comment.setData(True, role=FIRSTUI.ROLE_COMMENT)
comment_list = [comment] + ([QtGui.QStandardItem()] * 2)
info[0].appendRow(comment_list)
# Mark all items noneditable and add id associated with the match
for item in info + comment_list:
item.setEditable(False)
item.setData(match.id, role=FIRSTUI.ROLE_ID)
item.setData(match.address, role=FIRSTUI.ROLE_ADDRESS)
root_node.appendRow(info)
tree_view.setModel(data_model)
self.applied_data_model = data_model
tree_view.setColumnWidth(0, 200) # Address and Function
tree_view.setColumnWidth(1, 400) # Prototype
tree_view.setColumnWidth(2, 20) # Author
# Keep a reference to the dialog so it doesn't hide before the
# user is done with it
self.history_dialogs = []
tree_view.setContextMenuPolicy(Qt.CustomContextMenu)
tree_view.customContextMenuRequested.connect(self.applied_custom_menu)
self.applied_tree_view = tree_view
container.addWidget(self.applied_tree_view)
return container
def applied_custom_menu(self, point):
index = self.applied_tree_view.indexAt(point)
address = index.data(FIRSTUI.ROLE_ADDRESS)
if not address:
return
menu = QtWidgets.QMenu(self.applied_tree_view)
goto_action = QtWidgets.QAction('&Go to Function', self.applied_tree_view)
goto_action.triggered.connect(lambda:IDAW.Jump(address))
menu.addAction(goto_action)
metadata_id = index.data(FIRSTUI.ROLE_ID)
if metadata_id:
history_action = QtWidgets.QAction('View &History', self.applied_tree_view)
history_action.triggered.connect(lambda:self._metadata_history(metadata_id))
menu.addAction(history_action)
menu.exec_(QtGui.QCursor.pos())
def _metadata_history(self, metadata_id):
dialog = FIRSTUI.Dialog(None, FIRSTUI.History, metadata_id=metadata_id)
dialog.show()
# Keep a reference to the dialog so it doesn't hide before the
# user is done with it
self.history_dialogs.append(dialog)
def populate_main_form(self):
list_view = QtWidgets.QListView()
list_view.setFixedWidth(115)
list_view.setModel(self.views_model)
select = QtCore.QItemSelectionModel.Select
list_view.selectionModel().select(self.views_model.createIndex(0, 0), select)
list_view.clicked.connect(self.view_clicked)
current_view = QtWidgets.QWidget()
view = self.view_about()
if not view:
view = QtWidgets.QBoxLayout()
current_view.setLayout(view)
self.splitter = QtWidgets.QSplitter(Qt.Horizontal)
self.splitter.addWidget(list_view)
self.splitter.addWidget(current_view)
self.splitter.setChildrenCollapsible(False)
self.splitter.show()
outer_layout = QtWidgets.QHBoxLayout()
outer_layout.addWidget(self.splitter)
self.parent.setLayout(outer_layout)
def view_clicked(self, index):
key = self.views_model.data(index)
if key in self.views_ui:
# Get the new view
widget = QtWidgets.QWidget()
layout = self.views_ui[key]()
if not layout:
layout = QtWidgets.QVBoxLayout()
widget.setLayout(layout)
# Remove the old view to the splitter
old_widget = self.splitter.widget(1)
if old_widget:
old_widget.hide()
old_widget.deleteLater()
self.splitter.insertWidget(1, widget)
def check_function_accept(self, dialog):
FIRST.Callbacks.accepted(self, dialog)
def check_function(self, ctx):
if not IDAW.get_func(IDAW.ScreenEA()):
title = 'Unable to derive function'
msg = ( 'Cannot upload function. Ensure the cursor is '
'positioned within a defined function (cursor '
'currently at 0x{0:X})').format(IDAW.ScreenEA())
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
return
dialog = FIRSTUI.Dialog(None, FIRSTUI.Check)
dialog.registerSuccessCallback(self.check_function_accept)
dialog.show()
def check_all_function(self, ctx):
dialog = FIRSTUI.Dialog(None, FIRSTUI.CheckAll)
dialog.registerSuccessCallback(self.check_function_accept)
dialog.show()
def upload_func(self, ctx):
dialog = FIRSTUI.Dialog(None, FIRSTUI.Upload)
dialog.registerSuccessCallback(self.check_function_accept)
dialog.show()
def upload_all_func(self, ctx):
dialog = FIRSTUI.Dialog(None, FIRSTUI.UploadAll)
dialog.registerSuccessCallback(self.check_function_accept)
dialog.show()
def update_funcs(self, ctx):
FIRST.Callbacks.Update()
data = FIRST.Metadata.get_functions_with_applied_metadata()
if data:
title = 'FIRST: Updating Metadata for Functions'
msg = ('There are {} functions with FIRST data. They are '
'being updated to reflect the most recent '
'metadata for each function.').format(len(data))
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg, QtWidgets.QMessageBox.Information),))
def view_history(self, ctx):
function = IDAW.get_func(IDAW.ScreenEA())
if not function:
msg = '[1st] Unable to retrieve function at 0x{0:x}\n'.format(IDAW.ScreenEA())
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
return
metadata = FIRST.Metadata.get_function(function.startEA)
if not metadata:
message = '[1st] Unable to retrieve function at 0x{0:x}\n'
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(message.format(metadata.address)),))
return
if not metadata.id:
message = '[1st] No FIRST metadata is applied to the function at 0x{0:x}\n'
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(message.format(metadata.address)),))
return
dialog = FIRSTUI.Dialog(None, FIRSTUI.History, metadata_id=metadata.id)
dialog.show()
[docs]class FIRST(object):
debug = False
# About Information
#------------------------
VERSION = 'BETA'
DATE = 'May 2018'
BEGIN = 2014
END = 2018
plugin_enabled = False
show_welcome = False
server = None
config = None
config_path = os.path.join(idaapi.get_user_idadir(), 'first_beta.cfg')
installed_hooks = []
function_list = None
plugin = None
iat = []
# Colors used
color_changed = QtGui.QBrush(QtGui.QColor.fromRgb(255, 153, 139))
color_unchanged = QtGui.QBrush(QtGui.QColor.fromRgb(238, 238, 238))
color_default = QtGui.QBrush(QtGui.QColor.fromRgb(255, 255, 255))
color_selected = QtGui.QBrush(QtGui.QColor.fromRgb(160, 216, 241))
color_applied = QtGui.QBrush(QtGui.QColor.fromRgb(214, 227, 181))
[docs] @staticmethod
def initialize():
'''Initializes FIRST by installing hooks and populating required data
strucutres.'''
global g_network_headers
g_network_headers['User-Agent'] = "FIRST {} {} Cython {}({}.{}.{}) {}".format(
FIRST.VERSION,
FIRST.DATE,
sys.api_version,
sys.version_info.major,
sys.version_info.minor,
sys.version_info.releaselevel,
sys.platform
)
FIRST.installed_hooks = [FIRST.Hook.IDP(), FIRST.Hook.UI()]
[x.hook() for x in FIRST.installed_hooks]
FIRST.plugin = FIRST_FormClass()
@staticmethod
def cleanup_hooks():
if FIRST.installed_hooks:
for x in FIRST.installed_hooks:
x.unhook()
FIRST.installed_hooks = []
class Error(Exception):
'''FIRST Exception Class'''
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
[docs] class Info():
'''Information gathering functions.
Will get different information required by FIRST to interact with
server or other plug-in side operations.
This class contains only static methods and should be accessed as such.
Attributes:
processor_map (:obj:`dict`): Dictionary mapping between IDA's naming
convention to FIRST's.
include_bits (:obj:`list`): List of processors that should include
the number of bits.
'''
processor_map = {'metapc' : 'intel'}
include_bits = ['intel', 'arm']
[docs] @staticmethod
def set_file_details(md5, crc32, sha1=None, sha256=None):
'''Sets details about the sample.
This is a work around for situations where there is no original
sample on disk that IDA analyzes. FIRST requires a MD5 and CRC32 to
store functions, without it the function will not be saved.
Args:
md5 (:obj:`str`): Valid MD5 hash
'''
# Validate User Input
md5 = md5.lower()
if not re.match(r'^[a-f\d]{32}$', md5) or type(crc32) != int:
return
db = IDAW.GetArrayId(FIRST_DB)
key = FIRST_INDEX['hashes']
if -1 == db:
db = IDAW.CreateArray(FIRST_DB)
# Get hashes from file
data = {'md5' : md5,
'sha1' : sha1,
'sha256' : sha256,
'crc32' : crc32}
IDAW.SetArrayString(db, key, json.dumps(data))
# Update server class
if FIRST.server and hasattr(FIRST.server, 'binary_info'):
FIRST.server = FIRST.Server(FIRST.config, md5, crc32, sha1, sha256)
[docs] @staticmethod
def get_file_details():
'''Returns details about the sample.
The MD5 and CRC32 fields will always be returned since IDA Pro
provides that information. If the IDB is created with the original
sample then the sample will be hashed to get the SHA1 and SHA256.
All tthe data is stored in the IDB to prevent getting the
information multiple times.
Returns:
dict. Dictionary of file hashes and CRC32.
'''
db = IDAW.GetArrayId(FIRST_DB)
key = FIRST_INDEX['hashes']
if -1 != db:
data = IDAW.GetArrayElement(IDAW.AR_STR, db, key)
if 0 != data:
return json.loads(data)
else:
db = IDAW.CreateArray(FIRST_DB)
# Get hashes from file
data = {'md5' : IDAW.GetInputMD5(),
'sha1' : None,
'sha256' : None,
'crc32' : IDAW.retrieve_input_file_crc32()}
file_path = IDAW.GetInputFilePath()
if file_path and exists(file_path):
with open(file_path, 'rb') as f:
f_data = f.read()
data['sha1'] = sha1(f_data).hexdigest()
data['sha256'] = sha256(f_data).hexdigest()
# Store this in the IDB so it can be retrieved
IDAW.SetArrayString(db, key, json.dumps(data))
return data
[docs] @staticmethod
def is_32bit():
'''Returns if the sample is 32bit or not.
Returns:
bool: True is 32bit or False.
'''
if (idaapi.IDA_SDK_VERSION < 730):
info = IDAW.get_inf_structure()
if info.is_64bit():
return False
elif info.is_32bit():
return True
return False
else:
return IDAW.inf_is_32bit()
[docs] @staticmethod
def get_architecture():
'''Returns the architecture the sample is built for.
The values are normalized for the FIRST server. It altered then
FIRST will not match on other functions with the same architecture.
Returns:
str. String representation of the architecture associated with
the sample. Examples: intel32, intel64, arm32, mips, etc.
'''
info = IDAW.get_inf_structure()
proc = info.procName.lower()
proc = FIRST.Info.processor_map.get(proc, proc)
if proc in FIRST.Info.include_bits:
bits = 16
if (idaapi.IDA_SDK_VERSION < 730):
if info.is_64bit():
bits = 64
elif info.is_32bit():
bits = 32
else:
if IDAW.inf_is_64bit():
bits = 64
elif IDAW.inf_is_32bit():
bits = 32
return '{}{}'.format(proc, bits)
return proc
[docs] @staticmethod
def signature(address):
'''Returns opcodes for the function the address is associated with.
Given a virtual address, this function will return it in a series
of bytes or None. The opcodes are ordered in address ascending
order.
Args:
address (`int`): An address associated with a function. The
address can be any address within the function.
Returns:
str: A string of binary data on success.
None: On failure.
'''
function = IDAW.get_func(address)
blocks = {}
# Ensure address is in a function
if not function:
return None
fc = IDAW.FlowChart(function)
for block in fc:
data = {'start' : block.startEA, 'end' : block.endEA}
data['size'] = block.endEA - block.startEA
data['bytes'] = IDAW.GetManyBytes(block.startEA, data['size'])
if data['size'] > 0:
blocks[block.startEA] = data
if not blocks:
return None
sig = ''
for address in sorted(blocks.keys()):
if 'bytes' in blocks[address]:
sig += blocks[address]['bytes']
return sig
[docs] @staticmethod
def get_apis(address):
'''Returns a list of all APIs used by a function.
The address provided will be used to get a function and each
instruction in the function is examined for APIs in the sample's
IAT.
Args:
address (`int`): An address associated with a function. The
address can be any address within the function.
Returns:
list: Empty list or list of `MetadataShim` objects
'''
apis = []
# populate iat
if not FIRST.iat:
func = lambda ea, name, ord: FIRST.iat.append(name) == None
imports = IDAW.get_import_module_qty()
if imports:
for i in xrange(imports):
IDAW.enum_import_names(i, func)
# Cycle through all instructions within the function
for instr in safe_generator(IDAW.FuncItems(address)):
name = None
if not IDAW.is_call_insn(instr):
instruction = IDAW.DecodeInstruction(instr)
if not instruction:
continue
for i in xrange(len(instruction.Operands)):
if IDAW.GetOpType(instr, i) == idaapi.o_mem:
name = IDAW.Name(IDAW.GetOperandValue(instr, i))
break
else:
# It is a call instruction
for xref in safe_generator(IDAW.XrefsFrom(instr, IDAW.XREF_FAR)):
if xref.to == None:
break
name = IDAW.NameEx(0, xref.to)
if (name in FIRST.iat) and (name not in apis):
apis.append(name)
return apis
[docs] class DB():
'''FIRST DB Class
Provides functions to save data to and retrieve data from IDA's
IDB backend. Additionally, it contains functions for calculating the
index functions should be saved to in the IDB to provide constant time
lookups.
This class contains only static methods and should be accessed as such.
Attributes:
record_size (:obj:`int`): The number of bytes that can be saved into
one index in the IDB's array. Once the number of bytes are hit
the record is split and will continue in the next index.
Note:
IDA enforces a hard limit of 1024, setting this value higher
than that will result in information loss.
max_records (:obj:`int`): Determines how many array indices can be
used to store data for a given function.
Note:
If this number is increased and there is enough data to use
all the indices, this could result in over writting other
FIRST function data saved in the IDB.
'''
record_size = 1024
max_records = 16
[docs] @staticmethod
def save(functions):
'''Saves one or more functions to the IDB DB.
This function can be used to save one or more FIRSTMetadata objects
to the IDB's database.
Args:
functions (`FIRSTMetadata` or `list` of `FIRSTMetadata`)
Returns:
None
'''
if list != type(functions):
functions = [functions]
for function in functions:
if not isinstance(function, FIRST.MetadataShim):
continue
tag = FIRST.DB.get_tag(function)
key = FIRST.DB.get_index(function)
if None in [tag, key]:
continue
max_str = hex(FIRST.DB.max_records)[2:]
if (len(max_str) % 2) != 0:
max_str = '0' + max_str
max_str = max_str.decode('hex')
data = function.to_db()
records = len(data) + len(max_str)
records = int(math.ceil(records / float(FIRST.DB.record_size)))
record_str = hex(records)[2:]
if (len(record_str) % 2) != 0:
record_str = '0' + record_str
record_str = record_str.decode('hex')
start = FIRST.DB.record_size - len(max_str)
IDAW.SetArrayString(tag, key, record_str + data[:start])
if records > FIRST.DB.max_records:
temp_str = 'Cannot store data for function: {0}\n'
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(temp_str.format(function.name)),))
continue
for i in xrange(1, records):
begin = start + ((i + 1) * FIRST.DB.record_size)
end = begin + FIRST.DB.record_size
IDAW.SetArrayString(tag, key + i, data[begin:end])
[docs] @staticmethod
def get_function(address=None, function=None):
'''Retrieves function and all its details from the IDB DB.
The data returned here may not match the current state of the IDB.
Either the address or function argument should be provided.
Providing neither will result in a return value of None.
Args:
address (`int`, optional): The start address of the function.
function (:obj:`MetadataShim`, optional): The current
MetadataShim object for the function.
Returns:
FIRSTMetadata: If function exits and is saved it is returned.
None: On failure.
'''
if isinstance(function, FIRST.MetadataShim):
address = function.address
elif None != address:
function = FIRST.MetadataShim(address)
else:
return None
tag = FIRST.DB.get_tag(function)
key = FIRST.DB.get_index(function)
if None in [tag, key]:
return None
max_str = hex(FIRST.DB.max_records)[2:]
if (len(max_str) % 2) != 0:
max_str = '0' + max_str
max_str = max_str.decode('hex')
first = IDAW.GetArrayElement(IDAW.AR_STR, tag, key)
if not first:
return None
data = first[len(max_str):]
records = 0
for r in first[:len(max_str)]:
records = (records << 16) | ord(r)
for i in xrange(1, records):
new_data = IDAW.GetArrayElement(IDAW.AR_STR, tag, key + i)
if 0 == new_data:
break
data += new_data
function.from_db(data)
return function
[docs] @staticmethod
def get_tag(function):
'''Calculates and returns the tag for the given function.
Function that will return array id corresponding to the array with
the function data in it if the array exists. If the array does not
exist then it is created and the created array id is returned.
Args:
function (:obj:`MetadataShim`): The function to get a tag for.
Results:
int: The array ID on success.
None: On failure.
'''
if not isinstance(function, FIRST.MetadataShim):
return None
array_str = 'FIRST_{0}'.format(function.segment - IDAW.get_imagebase())
tag = IDAW.GetArrayId(array_str)
if -1 == tag:
# The array doesn't exist, create it
tag = IDAW.CreateArray(array_str)
if -1 == tag:
return None
return tag
[docs] @staticmethod
def get_index(function):
'''Computes the base index for the function.
The index computed by thios function is index into an IDB array.
Args:
function (:obj:`MetadataShim`): The function to get an index
for.
Results:
int: The index into the array.
None: On failure.
'''
if not isinstance(function, FIRST.MetadataShim):
return None
return function.offset << 4
[docs] class Configuration(object):
'''Class containing configuration details for FIRST.
Args:
config (:obj:`RawConfigParser`): Configuration details for plugin.
'''
def __init__(self, config=None):
self.__server = 'first.talosintelligence.com'
self.__port = 443
self.__protocol = 'https'
self.__verify = False
self.__auth = False
self.__api_key = ''
self.__data = {}
# Load configuration
if isinstance(config, ConfigParser.RawConfigParser):
self.load_config(config)
@property
def server(self):
''':obj:`str`: The FIRST server.'''
return self.__server
[docs] def set_server(self, _server):
self.__server = _server
@property
def port(self):
''':obj:`int`: The FIRST server port (Default: 80)'''
if isinstance(self.__port, int):
return self.__port
try:
return int(self.__port)
except ValueError:
return 80
[docs] def set_port(self, _port):
self.__port = _port
@property
def protocol(self):
''':obj:`str`: The TCP protocol used to communicate with FIRST.'''
return self.__protocol
[docs] def set_protocol(self, _protocol):
self.__protocol = _protocol
@property
def auth(self):
''':obj:`HTTPKeberosAuth`: Authenication used with FIRST
(default: None).'''
return self.__auth
@property
def authentication(self):
''':obj:`bool`: Flag set if authentication is used in connection.'''
return self.__auth == True
@property
def verify(self):
''':obj:`bool`: Whether the SSL cert will be verified.'''
return self.__verify == True
[docs] def set_verify(self, _verify):
self.__verify = _verify
[docs] def set_authentication(self, _authentication):
self.__auth = _authentication
@property
def api_key(self):
''':obj:`str`: The user's API key.'''
return self.__api_key
[docs] def set_api_key(self, key):
self.__api_key = key
[docs] def set_data(self, key, value):
'''Sets a specific configuration setting.
Args:
key (:obj:`str`): The configuration setting.
value (:obj:`str`): The configuration setting value.
'''
self.__data[key] = value
[docs] def save_config(self, config_path):
'''Saves the configuration set in this instance to disk.
Args:
config_path (:obj:`str`): File path to save configuration.
'''
config = ConfigParser.RawConfigParser()
section = 'connection_info'
values = { 'server' : self.server, 'port' : self.port,
'protocol' : self.protocol, 'verify' : self.verify,
'authentication' : self.authentication,
'api_key' : self.api_key}
config.add_section(section)
for option, value in values.iteritems():
config.set(section, option, value)
if len(self.__data):
section = 'settings'
config.add_section(section)
for option, value in self.__data.iteritems():
config.set(section, option, value)
try:
with open(config_path, 'wb') as f:
config.write(f)
except IOError as e:
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(str(e) + '\n'),))
[docs] def load_config(self, config):
'''Loads configuration details into this instance.
Args:
config (:obj:`RawConfigParser`): The configuration details to
load.
'''
if not isinstance(config, ConfigParser.RawConfigParser):
return
self.__data = {}
# Set connection information
section = 'connection_info'
if config.has_section(section):
required = {'server' : self.set_server, 'port' : self.set_port,
'verify' : self.set_verify,
'protocol' : self.set_protocol,
'authentication' : self.set_authentication,
'api_key' : self.set_api_key}
for option, set_function in required.iteritems():
if config.has_option(section, option):
set_function(config.get(section, option))
section = 'settings'
if config.has_section(section):
for option in config.options(section):
self.__data[option] = config.get(section, option)
[docs] class Server(object):
'''Encapsulate interacting with the FIRST server's REST API.
Note:
Using functions ``set_protocol``, ``set_server``, and ``set_port``
do not update the configuration details, just the server instance
represented with this class.
Attributes:
urn (:obj:`str`): URL format string.
paths (:obj:`dict`): Mapping between operations and FIRST URI path
format strings.
MAX_CHUNK (:obj:`int`): The maximum number of entries sent to the
server. Default: 20
Note:
The FIRST server can set the max number of entries received.
If this value is greater than the server's then the server
will not perform the operation.
Args:
config (:obj:`Configuration`): FIRST configuration information.
h_md5 (:obj:`str`): The MD5 of the sample.
crc32 (:obj:`int`): The CRC32 of the sample.
h_sha1 (:obj:`str`, optional): The SHA1 of the sample.
h_sha256 (:obj:`str`, optional): The SHA256 of the sample.
'''
MAX_CHUNK = 20
urn = '{0.protocol}://{0.server}:{0.port}/{1}'
paths = {
# Test Connection URL
'test' : 'api/test_connection/{0[api_key]}',
'checkin' : 'api/sample/checkin/{0[api_key]}',
# Metadata URLs
'add' : 'api/metadata/add/{0[api_key]}',
'history' : 'api/metadata/history/{0[api_key]}',
'applied' : 'api/metadata/applied/{0[api_key]}',
'unapplied' : 'api/metadata/unapplied/{0[api_key]}',
'delete' : 'api/metadata/delete/{0[api_key]}/{0[id]}',
'created' : 'api/metadata/created/{0[api_key]}/{0[page]}',
'get' : 'api/metadata/get/{0[api_key]}',
# Scan URLs
'scan' : 'api/metadata/scan/{0[api_key]}',
}
def __init__(self, config, h_md5, crc32, h_sha1=None, h_sha256=None):
self.error_log = []
self.threads = {}
self.checkedin = False
self.binary_info = {'md5' : h_md5, 'crc32' : crc32,
'sha1' : h_sha1, 'sha256' : h_sha256}
self.auth, self.server, self.protocol = [None] * 3
self.port, self.verify, self.api_key = [None] * 3
if isinstance(config, FIRST.Configuration):
self.auth = config.authentication
self.server = config.server
self.protocol = config.protocol
self.port = config.port
self.verify = config.verify
self.api_key = config.api_key
[docs] def set_port(self, port):
'''Overrides the FIRST server port set in the configuration.
Args:
port (:obj:`int`): The FIRST server port.
'''
self.checkedin = False
self.port = port
[docs] def set_protocol(self, protocol):
'''Overrides the FIRST server protocol set in the configuration.
Args:
protocol (:obj:`int`): The FIRST server protocol.
'''
self.checkedin = False
self.protocol = protocol
[docs] def set_server(self, server):
'''Overrides the FIRST server set in the configuration.
Args:
port (:obj:`int`): The FIRST server.
'''
self.checkedin = False
self.server = server
[docs] def checkin(self, action):
'''Checks in with FIRST server to ensure annotations can be added.
This function must be called before any annotations are added to
FIRST. This function allows the FIRST server to setup information
about the sample, thereby allowing functions to be associated with
the sample. This only needs to be called once and is attempted
before the first user selected operation.
This operation is not done if the operation to be performed is to
test the connection to the server.
Args:
action (:obj:`str`): The FIRST operation to be performed
'''
if self.checkedin or action == 'test':
return
self.checkedin = True
response = self._sendp('checkin', self.binary_info)
if (not response
or (('failed' in response) and response['failed'])
or (('checkin' in response) and not response['checkin'])):
# Try to check in again with the next sever communication
self.checkedin = False
return
def _sendp(self, action, params={}, raw=False):
self.checkin(action)
if action not in self.paths:
return None
# Ensure all None values are converted to empty strings
for key in params:
if params[key] is None:
params[key] = ''
authentication = None
if self.auth:
if not HTTPKerberosAuth:
idaapi.execute_ui_requests((FIRSTUI.Requests.Print('[1st] Kerberos module is not loaded\n'),))
return
authentication = HTTPKerberosAuth()
url = self.urn.format(self, self.paths[action])
if FIRST.debug:
idaapi.execute_ui_requests(
(FIRSTUI.Requests.Print(
'[POST] {}\nSending: '.format(url.format(self._user()))),)
)
pprint(params)
try:
response = requests.post(url.format(self._user()),
data=params,
verify=self.verify,
headers=g_network_headers,
auth=authentication)
if raw:
return response
except requests.exceptions.ConnectionError as e:
title = 'Cannot connect to FIRST'
msg = ('Unable to connect to FIRST server at {0}\n'
'Retry operation').format(self.server)
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
raise FIRST.Error('cannot connect')
except requests.exceptions.Timeout as e:
title = 'Cannot connect to FIRST'
msg = ( 'Unable to connect to FIRST server at {0}. '
'Connection timed out.').format(self.server)
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
return
if FIRST.debug:
print response
if 'content' in dir(response):
print response.content
if 'status_code' not in dir(response):
return None
elif 200 != response.status_code:
return None
#idaapi.execute_ui_requests((FIRSTUI.Requests.Print('Server Raw Response:'),
# (FIRSTUI.Requests.Print(response)))
#try:
# pprint(response.text)
#except:
# pass
response = self.to_json(response)
if FIRST.debug:
idaapi.execute_ui_requests((FIRSTUI.Requests.Print('Server Response:'),))
pprint(response)
return response
def _sendg(self, action, params={}, raw=False):
self.checkin(action)
if action not in self.paths:
return None
# Ensure all None values are converted to empty strings
for key in params:
if params[key] is None:
params[key] = ''
params.update(self._user())
#idaapi.execute_ui_requests((FIRSTUI.Requests.Print('[GET] Sending: '),))
#pprint(params)
authentication = None
if self.auth:
if not HTTPKerberosAuth:
idaapi.execute_ui_requests((FIRSTUI.Requests.Print('[1st] Kerberos module is not loaded\n'),))
return
authentication = HTTPKerberosAuth()
url = self.urn.format(self, self.paths[action])
try:
response = requests.get(url.format(params),
verify=self.verify,
headers=g_network_headers,
auth=authentication)
if raw:
return response
except requests.exceptions.ConnectionError as e:
title = 'Cannot connect to FIRST'
msg = ('Unable to connect to FIRST server at {0}\n'
'Retry operation').format(self.server)
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
raise FIRST.Error('cannot connect')
except requests.exceptions.Timeout as e:
title = 'Cannot connect to FIRST'
msg = ( 'Unable to connect to FIRST server at {0}. '
'Connection timed out.').format(self.server)
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
return
if 'status_code' not in dir(response):
return None
elif 200 != response.status_code:
return None
#idaapi.execute_ui_requests((FIRSTUI.Requests.Print('Server Raw Response:'),
# FIRSTUI.Requests.Print(response))
#try:
# pprint(response.text)
#except:
# pass
response = self.to_json(response)
#idaapi.execute_ui_requests((FIRSTUI.Requests.Print('Server Response:'),))
#pprint(response)
return response
[docs] def to_json(self, response):
'''Converts Requests' response object to json.
Args:
response (:obj:`requests.models.Response`): A request response.
Returns:
dict: JSON data or empty dictionary.
'''
try:
return response.json()
except:
return {}
def _user(self):
return {'api_key' : self.api_key}
def _min_info(self):
return {'md5' : self.binary_info['md5'],
'crc32' : self.binary_info['crc32']}
[docs] def stop_operation(self, server_thread):
'''Signals a server thread to stop its work.
Args:
server_thread (:obj:`threading.Thread`): The thread to stop.
'''
if server_thread not in self.threads:
return
self.threads[server_thread]['stop'] = True
self.threads[server_thread]['complete'] = True
[docs] def remove_operation(self, server_thread):
'''Removes operation from server thread structure.
Args:
server_thread (:obj:`threading.Thread`): The thread to remove.
'''
if server_thread in self.threads:
del self.threads[server_thread]
# Test connection URL
[docs] def test_connection(self):
'''Interacts with server to see if there is a valid connection.
This is a short operation and is a blocking call.
Returns:
bool: True if connection can be made and FIRST returns a
success message. False otherwise.
'''
if not self.api_key:
return False
try:
data = self._sendg('test', {'api_key' : self.api_key})
except FIRST.Error as e:
data = None
return data and ('status' in data) and ('connected' == data['status'])
# Signature URLS
[docs] def add(self, metadata, data_callback=None, complete_callback=None):
'''Adds function metadata to FIRST.
This is a long operation, thus it has the option of providing a
``data_callback`` and ``complete_callback`` arguments. Those
arguments are functions that will be called with the newly returned
data and when the whole operation is complete, respectively. Both
functions should follow the below their respective prototypes;
``data_callback_prototype`` and ``complete_callback_prototype``.
Args:
metadata (:obj:`list` of :obj:`MetadataShim` or
:obj:`MetadataShim`): The metadata to be added to FIRST.
data_callback (:obj:`data_callback_prototype`, optional):
A function to call when data is receieved from the server.
complete_callback (:obj:`complete_callback_prototype`, optional):
A function to call when the whole long operation completes.
Returns:
threading.Thread. The thread created for the operation.
'''
args = (metadata, data_callback, complete_callback)
thread = threading.Thread(target=self.__thread_add, args=args)
thread.daemon = True
thread.start()
return thread
def __thread_add(self, metadata, data_callback=None, complete_callback=None):
'''thread'''
thread = threading.current_thread()
self.threads[thread] = {'results' : [], 'complete' : False,
'stop' : False}
if isinstance(metadata, FIRST.MetadataShim):
metadata = [metadata]
if False in [isinstance(m, FIRST.MetadataShim) for m in metadata]:
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
return
architecture = FIRST.Info.get_architecture()
for i in xrange(0, len(metadata), self.MAX_CHUNK):
params = self._min_info()
data = {}
for m in metadata[i:i + self.MAX_CHUNK]:
data[m.address] = { 'architecture' : architecture,
'opcodes' : b64encode(m.signature),
'name' : m.name,
'prototype' : m.prototype,
'comment' : m.comment,
'apis' : m.apis,
'id' : m.id}
params['functions'] = json.dumps(data)
try:
response = self._sendp('add', params)
except FIRST.Error as e:
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
return
if response:
self.threads[thread]['results'].append(response)
if data_callback:
data_callback(thread, response)
if self.threads[thread]['stop']:
break
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
[docs] def history(self, metadata):
'''Gets annotation history from FIRST.
This is a short operation and is a blocking call.
Args:
metadata (:obj:`MetadataShim` or :obj:`MetadataServer`): The
FIRST annotation the history is being requested.
Returns:
dict: JSON data returned from server. None on failure.
'''
if (isinstance(metadata, FIRST.MetadataShim)
or isinstance(metadata, FIRST.MetadataServer)):
metadata = metadata.id
try:
response = self._sendp('history', {'metadata' : json.dumps([metadata])})
except FIRST.Error as e:
return None
return response
[docs] def applied(self, metadata_id):
'''Sets a FIRST annotation as applied to this sample.
This is a short operation and is a blocking call.
Args:
metadata_id (:obj:`str`): The FIRST annotation ID.
Returns:
dict: JSON data returned from the server. None on failure.
'''
params = self._min_info()
params['id'] = metadata_id
try:
response = self._sendp('applied', params)
except FIRST.Error as e:
return None
return response
[docs] def unapplied(self, metadata_id):
'''Sets a FIRST annotation as unapplied to this sample.
This is a short operation and is a blocking call.
Args:
metadata_id (:obj:`str`): The FIRST annotation ID.
Returns:
dict: JSON data returned from the server. None on failure.
'''
params = self._min_info()
params['id'] = metadata_id
try:
response = self._sendp('unapplied', params)
except FIRST.Error as e:
return None
return response
[docs] def delete(self, metadata_id):
'''Deletes a FIRST annotation created by the user.
This is a short operation and is a blocking call.
Args:
metadata_id (:obj:`str`): The FIRST annotation ID.
Returns:
dict: JSON data returned from the server. None on failure.
'''
params = {'id' : metadata_id}
try:
response = self._sendg('delete', params)
except FIRST.Error as e:
return None
return response
[docs] def created(self, data_callback=None, complete_callback=None):
'''Retrieves FIRST annotations the user has created.
This is a long operation, thus it has the option of providing a
``data_callback`` and ``complete_callback`` arguments. Those
arguments are functions that will be called with the newly returned
data and when the whole operation is complete, respectively. Both
functions should follow the below their respective prototypes;
``data_callback_prototype`` and ``complete_callback_prototype``.
Args:
data_callback (:obj:`data_callback_prototype`, optional):
A function to call when data is receieved from the server.
complete_callback (:obj:`complete_callback_prototype`, optional):
A function to call when the whole long operation completes.
Returns:
threading.Thread. The thread created for the operation.
'''
args = (data_callback, complete_callback)
thread = threading.Thread(target=self.__thread_created, args=args)
thread.daemon = True
thread.start()
return thread
def __thread_created(self, data_callback=None, complete_callback=None):
'''Thread to get created data'''
thread = threading.current_thread()
self.threads[thread] = {'results' : [], 'complete' : False,
'stop' : False}
page = 1
total_pages = 0
first_time = True
while (first_time
or ((page <= total_pages) and (not self.threads[thread]['stop']))):
if first_time:
first_time = False
try:
response = self._sendg('created', {'page' : page})
except FIRST.Error as e:
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
if not response:
continue
if 'pages' in response:
total_pages = response['pages']
# Print out page data very 10 percent
ten_percent = total_pages / 10.0
if (not ten_percent) or (0 == (page % ten_percent)):
idaapi.execute_ui_requests((FIRSTUI.Requests.Print('{} out of {} pages\n'.format(page, total_pages)),))
if ('results' in response) and response['results']:
metadata = response['results']
data = [FIRST.MetadataServer(x, x['id']) for x in metadata]
self.threads[thread]['results'].append(data)
if data_callback:
data_callback(thread, data)
page += 1
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
[docs] def get(self, metadata_ids, data_callback=None, complete_callback=None):
'''Retrieves FIRST annotations the user has created.
This is a long operation, thus it has the option of providing a
``data_callback`` and ``complete_callback`` arguments. Those
arguments are functions that will be called with the newly returned
data and when the whole operation is complete, respectively. Both
functions should follow the below their respective prototypes;
``data_callback_prototype`` and ``complete_callback_prototype``.
Args:
metadata (:obj:`list` of :obj:`MetadataShim`): The metadata to
be retrieved from FIRST.
data_callback (:obj:`data_callback_prototype`, optional):
A function to call when data is receieved from the server.
complete_callback (:obj:`complete_callback_prototype`, optional):
A function to call when the whole long operation completes.
Returns:
threading.Thread. The thread created for the operation.
'''
args = (metadata_ids, data_callback, complete_callback)
thread = threading.Thread(target=self.__thread_get, args=args)
thread.daemon = True
thread.start()
return thread
def __thread_get(self, metadata, data_callback=None, complete_callback=None):
'''Thread to get metadata'''
thread = threading.current_thread()
self.threads[thread] = {'results' : [], 'complete' : False,
'stop' : False}
if isinstance(metadata, FIRST.MetadataShim):
metadata = [metadata]
if False in [isinstance(m, FIRST.MetadataShim) for m in metadata]:
self.threads[thread]['complete'] = True
return
for i in xrange(0, len(metadata), self.MAX_CHUNK):
if self.threads[thread]['stop']:
break
data = [m.id for m in metadata[i:i + self.MAX_CHUNK]]
try:
response = self._sendp('get', {'metadata' : json.dumps(data)})
except FIRST.Error as e:
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
return
if (not response or ('results' not in response)
or (dict != type(response['results']))
or (not len(response['results']))):
continue
results = {}
for metadata_id, details in response['results'].iteritems():
results[metadata_id] = FIRST.MetadataServer(details)
if 0 < len(results):
self.threads[thread]['results'].append(results)
if data_callback:
data_callback(thread, results)
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
[docs] def scan(self, metadata, data_callback=None, complete_callback=None):
'''Queries FIRST for matches.
This is a long operation, thus it has the option of providing a
``data_callback`` and ``complete_callback`` arguments. Those
arguments are functions that will be called with the newly returned
data and when the whole operation is complete, respectively. Both
functions should follow the below their respective prototypes;
``data_callback_prototype`` and ``complete_callback_prototype``.
Args:
metadata (:obj:`list` of :obj:`MetadataShim`): The metadata to
be queried for matches in FIRST.
data_callback (:obj:`data_callback_prototype`, optional):
A function to call when data is receieved from the server.
complete_callback (:obj:`complete_callback_prototype`, optional):
A function to call when the whole long operation completes.
Returns:
threading.Thread. The thread created for the operation.
'''
args = (metadata, data_callback, complete_callback)
thread = threading.Thread(target=self.__thread_scan, args=args)
thread.daemon = True
thread.start()
return thread
def __thread_scan(self, metadata, data_callback=None, complete_callback=None):
'''Thread to query FIRST for metadata'''
thread = threading.current_thread()
self.threads[thread] = {'results' : [], 'complete' : False,
'stop' : False}
if isinstance(metadata, FIRST.MetadataShim):
metadata = [metadata]
if False in [isinstance(m, FIRST.MetadataShim) for m in metadata]:
self.threads[thread]['complete'] = True
return
subkeys = {'engines', 'matches'}
architecture = FIRST.Info.get_architecture()
for i in xrange(0, len(metadata), self.MAX_CHUNK):
if self.threads[thread]['stop']:
break
params = self._min_info()
data = {}
for m in metadata[i:i + self.MAX_CHUNK]:
signature = m.signature
if not signature:
continue
data[m.address] = { 'opcodes' : b64encode(m.signature),
'apis' : m.apis,
'architecture' : architecture}
params['functions'] = json.dumps(data)
try:
response = self._sendp('scan', params)
except FIRST.Error as e:
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
return
if (not response or ('results' not in response)
or (dict != type(response['results']))
or (not subkeys.issubset(response['results'].keys()))
or (0 == len(response['results']['matches']))):
continue
results = {}
engine_info = response['results']['engines']
matches = response['results']['matches']
for address_str in matches:
functions = []
address = int(address_str)
for match in matches[address_str]:
engines = {x : engine_info[x] for x in match['engines']}
data = FIRST.MetadataServer(match, address, engines)
functions.append(data)
if len(functions) > 0:
results[address] = functions
if 0 < len(results):
self.threads[thread]['results'].append(results)
if data_callback:
data_callback(thread, results)
self.threads[thread]['complete'] = True
if complete_callback:
complete_callback(thread, self.threads[thread])
[docs] class Model(object):
[docs] class Base(QtCore.QAbstractTableModel):
'''A QT QAbstractTableModel Implementation.
Args:
header (:obj:`list`): The column values.
data (:obj:`dict`): Dictionary of values.
parent (:obj:`QtCore.QObject`): The parent object.
Overloads many class methods to provide the functionality FIRST
required.
'''
def __init__(self, header, data=None, parent=None):
super(FIRST.Model.Base, self).__init__(parent)
self.header = header
self._data = data
if None == data:
self._data = collections.OrderedDict()
[docs] def rowCount(self, parent=QtCore.QModelIndex()):
'''The number of rows under the given parent.
When the parent is valid it means that rowCount is returning
the number of children of parent.
Args:
parent (:obj:`QtCore.QModelIndex`, optional): Parent
Returns:
int: Number of rows
'''
if None == self._data:
return 0
return len(self._data)
[docs] def columnCount(self, parent=QtCore.QModelIndex()):
'''The number of columns for the children of the given parent.
Args:
parent (:obj:`QtCore.QModelIndex`, optional): Parent
Returns:
int: Number of columns
'''
if None == self.header:
return 0
return len(self.header)
[docs] def data(self, index, role=Qt.DisplayRole):
'''The data stored under the given role for the item referred
to by the index.
Args:
index (:obj:`QtCore.QModelIndex`): Index
role (:obj:`Qt.ItemDataRole`): Default :obj:`Qt.DisplayRole`
Returns:
data
'''
if role == Qt.DisplayRole:
row = self._data[index.row()]
if (index.column() == 0) and (type(row) != dict):
return row
elif index.column() < self.columnCount():
if type(row) == dict:
if self.header[index.column()] in row:
return row[self.header[index.column()]]
elif self.header[index.column()].lower() in row:
return row[self.header[index.column()].lower()]
return row[index.column()]
return None
elif role == Qt.FontRole:
return QtGui.QFont().setPointSize(30)
elif role == Qt.DecorationRole and index.column() == 0:
return None
elif role == Qt.TextAlignmentRole:
return Qt.AlignLeft;
[docs] def raw_data(self, i):
'''Provides a way to get the raw data in the model.
Args:
i (:obj:`int`): The data index to be retrieved.
Returns:
dict. The data held at the given index, otherwise None.
'''
if i < len(self._data):
return self._data[i]
return None
[docs] class Upload(Base):
'''Expands on the Base QAbstractTableModel for Add operation.
Data held in this DataModel is sorted based on their offset within
the IDB. A couple of additional functions are added to this model
to provide more functionality to modify the selected underlying
data.
'''
def __init__(self, header, data, parent=None):
super(FIRST.Model.Upload, self).__init__(header, data, parent)
self._data.sort(cmp=lambda x,y: cmp(x.offset, y.offset))
self.__original_data = self._data
self.select_all_flag = False
self.rows_selected = set()
[docs] def set_row_selected(self, row):
'''Causes a row to be selected or deselected.
Args:
row (:obj:`int`): The row index to be selected.
'''
if row in self.rows_selected:
self.rows_selected.remove(row)
else:
self.rows_selected.add(row)
[docs] def select_all(self, flag):
'''Makes all visible functions selected or deselected.
Args:
flag (:obj:`bool`): Flag to select or deselect all.
'''
self.rows_selected = set(xrange(len(self._data))) if flag else set()
[docs] def filter_sub_functions(self, flag):
'''Filters out or restores any sub_* functions.
Args:
flag (:obj:`bool`): Flag to filter out or restore sub_*
functions
'''
self.beginResetModel()
if flag:
self._data = [d for d in self._data if not d.name.startswith('sub_')]
else:
self._data = self.__original_data
self.endResetModel()
self.select_all(False)
[docs] def set_colors(self, changed='66d9ef', unchanged='d2d2d2', default='ffffff', select='a9c5ff'):
'''Sets the colors associated with the various properties.
Args:
changed (:obj:`str`): Change color, default: '66d9ef'
unchanged (:obj:`str`): Unchanged color, default: 'd2d2d2'
default (:obj:`str`): Default color, default: 'ffffff'
select (:obj:`str`): Selected color, default: 'a9c5ff'
'''
colors = [changed, unchanged, default, select]
if None in [re.match('^[a-fA-F0-9]{6}$', x) for x in colors]:
# Invalid color provided
return
self.colors = []
for c in colors:
r, g, b = int(c[:2], 16), int(c[2:4], 16), int(c[-2:], 16)
self.colors.append(QtGui.QBrush(QtGui.QColor.fromRgb(r, g, b)))
[docs] def data(self, index, role):
if not index.isValid():
return None
if not (0 <= index.row() < self.rowCount()):
return None
elif role == Qt.FontRole:
return QtGui.QFont().setPointSize(30)
elif role == Qt.DecorationRole and index.column() == 0:
return None
elif role == Qt.TextAlignmentRole:
return Qt.AlignLeft;
# Color background
if role == Qt.BackgroundRole:
function = self._data[index.row()]
# Row is selected
if index.row() in self.rows_selected:
return FIRST.color_selected
# Data has been updated since original
if function.has_changed:
return FIRST.color_changed
#
if function.id is not None:
return FIRST.color_unchanged
# Return the default color
return FIRST.color_default
if role == Qt.DisplayRole:
function = self._data[index.row()]
column = index.column()
if 0 == column:
return '0x{0:X}'.format(function.address)
elif 1 == column:
return function.name
elif 2 == column:
return function.prototype
elif 3 == column:
return function.comment
return None
return super(FIRST.Model.Upload, self).data(index, role)
[docs] def get_selected_data(self):
'''Returns the list of data selected in the model.'''
return [self._data[x] for x in self.rows_selected]
[docs] class Check(QtGui.QStandardItemModel):
'''Expands on the Qt QStandardItemModel for Check operations.'''
def __init__(self, data, parent=None):
super(FIRST.Model.Check, self).__init__(parent)
self._data = {}
self.select_highest_flag = False
self.ids_selected = set()
self.applied_ids = set()
self.add_data(data)
[docs] def add_data(self, data):
'''Provides a way to add more data to the model.
Args:
data (:obj:`dict`): Data to be added to the model.
'''
self._data.update(data)
for address in data:
function = FIRST.Metadata.get_function(address)
if function and function.id:
self.applied_ids.add((address, function.id))
[docs] def set_id_selected(self, data):
'''Add or removes data associated with an ID to/from the
selected ids array.
Args:
data (:obj:`list`): The data to be (de)selected.
'''
if not data or 2 != len(data):
return
if data in self.ids_selected:
self.ids_selected.remove(data)
else:
address, data_id = data
# Find if any other matches that have been selected for
# that address, id pair
for match in self._data[address]:
key = (address, match.id)
if key in self.ids_selected:
self.ids_selected.remove(key)
self.ids_selected.add(data)
[docs] def select_highest_ranked(self, flag, hidden=[]):
'''Sets the highsest rank annotations as (de)selected.
Args:
flag (:obj:`bool`): Where to select or deselect the highest.
True to select highest, False to deselect highest.
hidden (:obj:`list` or :obj:`int`): Address that should be
skipped.
'''
self.ids_selected = set()
func = lambda x: x[0]
# Iterate through each match set, selecting the highest ranked
if flag:
# Reset list
self.ids_selected = set()
for address, matches in self._data.iteritems():
# If address is hidden then skip it
if address in hidden:
continue
ids = {}
# Group highest similarity percentages first
for match in matches:
if match.similarity not in ids:
ids[match.similarity] = []
ids[match.similarity].append(match)
# Get highest similiarity group
index = max(ids)
similar = ids[index]
# Get highest ranked metadata
match = max(similar, key=lambda x: x.rank).id
self.set_id_selected((address, match))
[docs] def unselect_group(self, data):
'''Unselects a group of addresses at once.
Args:
data (:obj:`list` of :obj:`int`): List of addresses.
'''
lookup = {d[0] : d for d in self.ids_selected}
for address in data:
if address in lookup:
self.ids_selected.remove(lookup[address])
[docs] def data(self, index, role):
'''The data stored under the given role for the item referred
to by the index.
Args:
index (:obj:`QtCore.QModelIndex`): Index
role (:obj:`Qt.ItemDataRole`): Default :obj:`Qt.DisplayRole`
Returns:
data
'''
if not index.isValid():
return None
# Color background
if role == Qt.BackgroundRole:
metadata_id = index.data(FIRSTUI.ROLE_ID)
address = index.data(FIRSTUI.ROLE_ADDRESS)
if (metadata_id and address
and ((address, metadata_id) in self.ids_selected)):
return FIRST.color_selected
elif (metadata_id and address
and ((address, metadata_id) in self.applied_ids)):
return FIRST.color_applied
# Data has been updated since original
elif not metadata_id:
return FIRST.color_unchanged
# Return the default color
return FIRST.color_default
return super(FIRST.Model.Check, self).data(index, role)
[docs] def get_selected_data(self):
'''Returns a dictionary of data selected in the model.'''
data = {}
for (address, _id) in self.ids_selected:
if address not in self._data:
continue
match = [x for x in self._data[address] if x.id == _id]
if match:
data[address] = match[0]
return data
[docs] class TreeView(QtWidgets.QTreeView):
'''A QT QTreeView Implementation.
Args:
widget (:obj:`Qt.QObject`, optional): The parent.
depth (:obj:`int`, optional): The depth of the tree.
'''
def __init__(self, widget=None, depth=2):
super(FIRST.Model.TreeView, self).__init__(widget)
self.__depth = depth
[docs] def drawRow(self, painter, option, index):
'''Draws the row in the tree view that contains the model item
index, using the painter given. The option control how the item
is displayed.
Args:
painter (:obj:`QtGui.QPainter`): Painter
option (:obj:`QtGui.QStyleOptionViewItem`): Options
index (:obj:`QtCore.QModelIndex`): Index
'''
metadata_id = index.data(FIRSTUI.ROLE_ID)
header = self.header()
firstSection = header.logicalIndex(0)
left = header.sectionViewportPosition(firstSection)
indent = self.__depth * self.indentation()
if (index.data(FIRSTUI.ROLE_COMMENT)
and (index.row() == 0) and (index.column() == 0)):
lastSection = header.logicalIndex(header.count() - 1)
right = header.sectionViewportPosition(lastSection) + header.sectionSize(lastSection)
left += indent;
option.rect.setX(left)
option.rect.setWidth(right - left)
self.itemDelegate(index).paint(painter, option, index)
else:
super(FIRST.Model.TreeView, self).drawRow(painter, option, index)
[docs] class Callbacks(object):
'''Callbacks for FIRST's Dialog UI components.
This class contains only static methods and should be accessed as such.
'''
[docs] @staticmethod
def accepted(fclass, dialog):
'''Registered callback for accept dialog action.
Args:
fclass (:obj:`idaapi.PluginForm`): The plugin form part of
dialog (:obj:`FIRSTUI.*`): A dialog box object.
'''
if (isinstance(dialog, FIRSTUI.Upload)
or isinstance(dialog, FIRSTUI.UploadAll)):
FIRST.Callbacks.Upload(dialog)
elif (isinstance(dialog, FIRSTUI.Check)
or isinstance(dialog, FIRSTUI.CheckAll)):
FIRST.Callbacks.check(dialog)
elif isinstance(dialog, FIRSTUI.Welcome):
FIRST.Callbacks.welcome(dialog)
[docs] @staticmethod
def welcome(dialog):
'''Welcome dialog box handler.
Args:
dialog (:obj:`FIRSTUI.Welcome`): Welcome dialog box.
'''
FIRST.config = FIRSTUI.SharedObjects.get_config(dialog)
FIRST.config.save_config(FIRST.config_path)
info = FIRST.Info.get_file_details()
FIRST.server = FIRST.Server(FIRST.config,
info['md5'],
info['crc32'],
h_sha1=info['sha1'],
h_sha256=info['sha256'])
FIRST.Metadata.populate_function_list()
FIRST.plugin_enabled = True
[docs] @staticmethod
def check(dialog):
'''Check and CheckAll dialog box handler.
Args:
dialog (:obj:`FIRSTUI.Check` or :obj:`FIRSTUI.CheckAll`): Check
or CheckAll dialog box.
'''
data = dialog.data_model.get_selected_data()
if data:
message = ('Applying metadata to {0} signature(s)\n'
' {1}% complete')
i = 0.0
total = len(data)
idaapi.show_wait_box(message.format(total, int(i)))
try:
for address, metadata in data.iteritems():
function = FIRST.Metadata.get_function(address)
if idaapi.wasBreak():
raise FIRST.Error('canceled')
if function:
percentage = int((i / total) * 100)
msg = message.format(total, percentage)
idaapi.replace_wait_box(msg)
# Check if metadata was already applied to the file
# If so, tell FIRST server it was unapplied
if function.id:
FIRST.server.unapplied(function.id)
# Apply metadata and inform FIRST it was applied
function.apply_metadata(metadata)
FIRST.server.applied(metadata.id)
else:
msg = '[1st] Error: getting function at {0:x}\n'.format(address)
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
i += 1
except FIRST.Error as e:
idaapi.replace_wait_box('Not all metadata was applied')
time.sleep(1)
finally:
idaapi.hide_wait_box()
message = 'Applied metadata data to {} out of {} functions\n'
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(message.format(int(i), total)),))
else:
idaapi.execute_ui_requests((FIRSTUI.Requests.Print('No functions were selected\n'),))
[docs] class Upload(object):
'''Uploading/Adding callback class.
This class is basic and sets up data and complete callbacks for the
add operation.
Args:
dialog (:obj:`FIRSTUI.Upload` or :obj:`FIRSTUI.UploadAll`):
Dialog box the accepted button was selected.
Attributes:
message (:obj:`str`): Format string for the wait box message.
'''
message = ('Uploading metadata for {0} function(s)\n'
' {1}% complete')
def __init__(self, dialog):
data = []
self.__to_update = []
if isinstance(dialog, FIRSTUI.UploadAll):
data = dialog.get_selected_data()
elif isinstance(dialog, FIRSTUI.Upload):
data.append(dialog.metadata)
self.total = len(data)
self.uploaded = 0.0
# If there is no data of incorrect dialog received then exit
if not data:
return
idaapi.show_wait_box(self.message.format(self.total, 0))
thread = FIRST.server.add(data, self.__data, self.__complete)
def __data(self, thread, data):
if ('failed' in data) and data['failed']:
idaapi.hide_wait_box()
if 'msg' not in data:
return
msg = '[1st] Error: {}'.format(data['msg'])
print msg
#idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
return
if ('results' not in data):
idaapi.hide_wait_box()
msg = '[1st] Error: no results returned'
print msg
#idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
return
results = data['results']
for address, metadata_id in results.iteritems():
# Update Wait Box
self.uploaded += 1
percentage = int((self.uploaded / self.total) * 100)
msg = self.message.format(self.total, percentage)
idaapi.replace_wait_box(msg)
if idaapi.wasBreak():
FIRST.server.stop_operation(thread)
msg = 'Not all functions were added to FIRST'
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
return
f = FIRST.Metadata.get_function(int(address))
if f:
if f.id and (f.id != metadata_id):
FIRST.server.unapplied(f.id)
f.id = metadata_id
self.__to_update.append(f)
def __complete(self, thread, data):
FIRST.server.remove_operation(thread)
idaapi.hide_wait_box()
msg = 'Added {} function(s) to FIRST\n'.format(int(self.uploaded))
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
changed = {'changed' : False}
func = FIRSTUI.Requests.Callback
updates = [func(x.update_db, **changed) for x in self.__to_update]
idaapi.execute_ui_requests(updates)
[docs] class Update(object):
'''Updating callback class.
This class is basic and sets up data and complete callbacks for the
add operation.
Args:
dialog (:obj:`FIRSTUI.Update`): Dialog box the accepted button
was selected.
'''
def __init__(self):
to_update = FIRST.Metadata.get_functions_with_applied_metadata()
self.functions = {f.id : f for f in to_update if f.id}
self.total = len(self.functions)
self.updated = 0
server_thread = FIRST.server.get(self.functions.values(),
self.__data, self.__complete)
def __data(self, thread, data):
if (not data) and (dict != type(data)):
return
for metadata_id, metadata in data.iteritems():
if metadata_id not in self.functions:
continue
self.updated += 1
function = self.functions[metadata_id]
function.apply_metadata(metadata)
# Remove function from dictionary
del self.functions[metadata_id]
def __complete(self, thread, data):
msg = 'Updated {} function(s)\n'.format(self.updated)
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
# If functions are still in the queue, this means the metadata
# was deleted on the server remove the id from it
for function in self.functions.values():
function.id = None
class Hook():
@staticmethod
def function_rptcmt(is_preprocess=True):
'''Handler for creating function repeatable comments (UI Hook).
Args:
is_preprocess (:obj:`bool`, optional): True if called during
preprocess, False when called during postprocess.
'''
function = FIRST.Metadata.get_function(IDAW.ScreenEA())
if not function:
return
if is_preprocess:
function.snapshot()
else:
function.update_comment()
@staticmethod
def function_named(is_preprocess=True):
'''Handler for when labels are changed (UI Hook).
Args:
is_preprocess (:obj:`bool`, optional): True if called during
preprocess, False when called during postprocess.
'''
function = FIRST.Metadata.get_function(IDAW.ScreenEA())
if not function:
return
if is_preprocess:
function.snapshot()
else:
function.update_name()
@staticmethod
def function_settype(is_preprocess=True):
'''Handler for when a function prototype is changed (UI Hook).
Args:
is_preprocess (:obj:`bool`, optional): True if called during
preprocess, False when called during postprocess.
'''
function = FIRST.Metadata.get_function(IDAW.ScreenEA())
if not function:
return
if is_preprocess:
function.snapshot()
else:
function.update_prototype()
@staticmethod
def function_created(is_preprocess=True):
'''Handler for when functions are created (UI Hook).
Adds a new funciton to FIRST's function list.
Args:
is_preprocess (:obj:`bool`, optional): True if called during
preprocess, False when called during postprocess.
'''
if dict != type(FIRST.function_list):
return
# No action needed for preprocess
if is_preprocess:
return
ea = IDAW.ScreenEA()
# Ensure it is a function or was correctly created
function = IDAW.get_func(ea)
if not function:
return
# Calculate offset to function from segment
function = FIRST.MetadataShim(ea)
seg_offset = function.segment - IDAW.get_imagebase()
if seg_offset not in FIRST.function_list:
FIRST.function_list[seg_offset] = {}
if function.offset in FIRST.function_list[seg_offset]:
return
msg = 'Adding {0.name} (0x{0.address:x} to global list\n'
idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg.format(function)),))
FIRST.function_list[seg_offset][function.offset] = function
old_imagebase = None
@staticmethod
def function_rebase(is_preprocess=True):
'''Handler for when the program is rebased in IDA (UI Hook).
Args:
is_preprocess (:obj:`bool`, optional): True if called during
preprocess, False when called during postprocess.
'''
if is_preprocess:
FIRST.Hook.old_imagebase = IDAW.get_imagebase()
return
if FIRST.Hook.old_imagebase == IDAW.get_imagebase():
return
for segment in FIRST.Metadata.get_segments_with_functions():
offset = segment.startEA - IDAW.get_imagebase()
for metadata in FIRST.Metadata.get_segment_functions(segment):
adjustment = IDAW.get_imagebase() - FIRST.Hook.old_imagebase
metadata.address = metadata.address + adjustment
class IDP(idaapi.IDP_Hooks):
'''FIRST's IDP Hook. Initializes most of the FIRST plugin.
Attributes:
executed (:obj:`bool`): Flag to understand if the hook has
fired or not.
'''
executed = False
def __init__(self):
super(FIRST.Hook.IDP, self).__init__()
def on_auto_queue_empty(self, arg):
if (arg == 200) and (not FIRST.Hook.IDP.executed):
FIRST.Hook.IDP.executed = True
self.unhook()
if self in FIRST.installed_hooks:
i = FIRST.installed_hooks.index(self)
del FIRST.installed_hooks[i]
FIRST.Metadata.populate_function_list()
# Get/Initialize the hash details for the file
FIRST.Info.get_file_details()
config = ConfigParser.RawConfigParser()
if not config.read(FIRST.config_path):
FIRST.show_welcome = True
config = None
else:
# Create connection to FIRST server
FIRST.config = FIRST.Configuration(config)
info = FIRST.Info.get_file_details()
FIRST.server = FIRST.Server(FIRST.config,
info['md5'],
info['crc32'],
h_sha1=info['sha1'],
h_sha256=info['sha256'])
FIRST.plugin_enabled = True
if idaapi.get_kernel_version().startswith("7"):
def ev_auto_queue_empty(self, arg):
self.on_auto_queue_empty(arg)
return super(self.__class__, self).ev_auto_queue_empty(arg)
else:
def auto_queue_empty(self, arg):
self.on_auto_queue_empty(arg)
return super(self.__class__, self).auto_queue_empty(arg)
class UI(idaapi.UI_Hooks):
'''FIRST's UI Hook. Sets UI change hooks and right click menu.'''
def __init__(self):
super(FIRST.Hook.UI, self).__init__()
self.handlers_created = False
self.action_names = [ 'first:get_func', 'first:get_all_func',
'first:upload_func', 'first:upload_all_func',
'first:update_funcs', 'first:view_history']
self.actions = None
self.handlers = {'MakeRptCmt' : [FIRST.Hook.function_rptcmt],
'MakeName' : [FIRST.Hook.function_named],
'SetType' : [FIRST.Hook.function_settype],
'MakeFunction' : [FIRST.Hook.function_created],
'RebaseProgram' : [FIRST.Hook.function_rebase]}
self.handler_action = ''
def tform_visible(self, form, hwnd):
'''Shows the FIRST Welcome dialog box if required.'''
if ((IDAW.BWN_DISASMS == IDAW.get_tform_type(form))
and FIRST.show_welcome):
parent = idaapi.PluginForm.FormToPyQtWidget(form)
welcome_dialog = FIRSTUI.Dialog(parent, FIRSTUI.Welcome)
welcome_dialog.registerSuccessCallback(FIRST.Callbacks.welcome)
welcome_dialog.show()
FIRST.show_welcome = False
def finish_populating_tform_popup(self, form, popup):
'''Initializes UI change hooks and enables right click menu.'''
if None == FIRST.plugin:
return
if None == self.actions:
self.actions = [
{'name' : self.action_names[0],
'text' : 'Check FIRST for this function',
'handler' : FIRST.plugin.check_function,
'shortcut' : None,
'tooltip' : ('See if metadata for this function'
' exists')},
{'name' : self.action_names[1],
'text' : 'Query FIRST for all function matches',
'handler' : FIRST.plugin.check_all_function,
'shortcut' : None,
'tooltip' : ('See if metadata for any defined '
'function exists')},
{'name' : self.action_names[2],
'text' : 'Add this function to FIRST',
'handler' : FIRST.plugin.upload_func,
'shortcut' : None,
'tooltip' : 'Add this function to FIRST\'s database'},
{'name' : self.action_names[3],
'text' : 'Add multiple functions to FIRST',
'handler' : FIRST.plugin.upload_all_func,
'shortcut' : None,
'tooltip' : ('Add multiple functions to '
'FIRST\'s database')},
{'name' : self.action_names[4],
'text' : 'Apply updated metadata from FIRST',
'handler' : FIRST.plugin.update_funcs,
'shortcut' : None,
'tooltip' : ('Any applied metadata will be updated '
'from FIRST\'s database')},
{'name' : self.action_names[5],
'text' : 'View metadata history',
'handler' : FIRST.plugin.view_history,
'shortcut' : None,
'tooltip' : 'See how metadata has changed over time'},
]
if not self.handlers_created:
self.init_actions()
self.handlers_created = True
tform_type = IDAW.get_tform_type(form)
if IDAW.BWN_DISASMS == tform_type and FIRST.plugin_enabled:
func = IDAW.get_func(IDAW.ScreenEA())
IDAW.attach_action_to_popup(form, popup, '')
for name in self.action_names[:-1]:
if ((type(func) != idaapi.func_t)
and (self.action_names.index(name) in [0, 2])):
continue
IDAW.attach_action_to_popup(form, popup, name)
if type(func) == idaapi.func_t:
function = FIRST.Metadata.get_function(func.startEA)
if function and function.id:
IDAW.attach_action_to_popup(form, popup, self.action_names[-1])
def init_actions(self):
'''Sets up action descriptors.'''
global FIRST_ICON
for action_item in self.actions:
handler = FIRST.Hook.ActionHandler(action_item['handler'])
action_desc = IDAW.action_desc_t(action_item['name'],
action_item['text'],
handler,
action_item['shortcut'],
action_item['tooltip'],
FIRST_ICON)
IDAW.register_action(action_desc)
def preprocess(self, name):
'''UI Hooks preprocessing call.
Args:
name (:obj:`str`): The action that will occur.
'''
self.handler_action = name
if self.handler_action in self.handlers:
handlers = self.handlers[self.handler_action]
for handler in handlers:
handler(True)
return 0
def postprocess(self):
'''UI Hooks postprocessing call.'''
if self.handler_action in self.handlers:
handlers = self.handlers[self.handler_action]
for handler in handlers:
handler(False)
return 0
def term(self):
'''Removes all installed hooks.'''
FIRST.cleanup_hooks()
class ActionHandler(idaapi.action_handler_t):
'''Action handler wrapper function.
This function wraps callback functions with a class to prevent
duplication of code.
Args:
fn (:obj:`function`): Function that will be used for callback.
'''
def __init__(self, fn):
idaapi.action_handler_t.__init__(self)
self.fn = fn
def activate(self, ctx):
self.fn(ctx)
return 1
def update(self, ctx):
return IDAW.AST_ENABLE_ALWAYS
[docs]class FIRSTUI(object):
ROLE_ID = 35
ROLE_COMMENT = 36
ROLE_ADDRESS = 37
ROLE_NAME = 38
[docs] class Requests(object):
[docs] class MsgBox(object):
def __init__(self, title, msg, icon=QtWidgets.QMessageBox.Critical):
self.title = title
self.msg = msg
self.icon = icon
def __call__(self):
msg_box = QtWidgets.QMessageBox()
msg_box.setIcon(self.icon)
msg_box.setWindowTitle(self.title)
msg_box.setText(self.msg)
msg_box.exec_()
return False # Don't reschedule
[docs] class Print(object):
def __init__(self, msg):
self.msg = msg
def __call__(self):
IDAW.msg(self.msg)
return False # Don't reschedule
[docs] class Callback(object):
def __init__(self, func, **kwargs):
self.func = func
self.kwargs = kwargs
def __call__(self):
self.func(**self.kwargs)
return False # Don't reschedule
class ScrollWidget(QtWidgets.QWidget):
'''A scroll widget'''
def __init__(self, parent=None, frame=QtWidgets.QFrame.Box):
super(FIRSTUI.ScrollWidget, self).__init__()
# Container Widget
widget = QtWidgets.QWidget()
# Layout of Container Widget
self.layout = QtWidgets.QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
widget.setLayout(self.layout)
# Scroll Area Properties
scroll = QtWidgets.QScrollArea()
scroll.setFrameShape(frame)
scroll.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOn)
scroll.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
scroll.setWidgetResizable(True)
scroll.setWidget(widget)
# Scroll Area Layer add
scroll_layout = QtWidgets.QVBoxLayout(self)
scroll_layout.addWidget(scroll)
scroll_layout.setContentsMargins(0, 0, 0, 0)
self.setLayout(scroll_layout)
def addWidget(self, widget):
self.layout.addWidget(widget)
def addLayout(self, layout):
self.layout.addLayout(layout)
[docs] class SharedObjects(object):
#----------------------------------------------------------------------
[docs] @staticmethod
def server_config_layout(obj, outer_layout, config=None):
'''Server Configuration GUI components'''
if not isinstance(config, FIRST.Configuration):
config = None
server_groupbox = QtWidgets.QGroupBox()
server_groupbox.setTitle('Server Configuration')
vbox = QtWidgets.QVBoxLayout(server_groupbox)
grid_layout = QtWidgets.QGridLayout(server_groupbox)
vbox.addLayout(grid_layout)
obj.server = QtWidgets.QLineEdit()
obj.port = QtWidgets.QLineEdit()
obj.api_key = QtWidgets.QLineEdit()
obj.protocol = QtWidgets.QComboBox()
proto_options = ['http', 'https']
[obj.protocol.addItem(x.upper(), x) for x in proto_options]
obj.verify = QtWidgets.QComboBox()
options = ['No', 'Yes']
[obj.verify.addItem(x, x.lower()) for x in options]
obj.kerberos = QtWidgets.QComboBox()
[obj.kerberos.addItem(x, x.lower()) for x in options]
obj.kerberos.setEnabled(HTTPKerberosAuth is not None)
if config:
obj.server = QtWidgets.QLineEdit(config.server)
obj.port = QtWidgets.QLineEdit(str(config.port))
obj.protocol.setCurrentIndex(proto_options.index(config.protocol))
obj.verify.setCurrentIndex(int(config.verify))
obj.kerberos.setCurrentIndex(int(config.authentication))
obj.kerberos.setEnabled(HTTPKerberosAuth is not None)
obj.api_key.setText(config.api_key)
layout = QtWidgets.QHBoxLayout()
obj.server_message = QtWidgets.QLabel()
layout.addWidget(obj.server_message)
layout.addStretch()
test_button = QtWidgets.QPushButton('Test')
test_callback = lambda x: FIRSTUI.SharedObjects.test_connection(obj)
test_button.clicked.connect(test_callback)
layout.addWidget(test_button)
vbox.addSpacing(20)
vbox.addLayout(layout)
grid_layout.addWidget(QtWidgets.QLabel('Server'), 0, 0)
grid_layout.addWidget(obj.server, 0, 1)
grid_layout.addWidget(QtWidgets.QLabel('Port'), 1, 0)
grid_layout.addWidget(obj.port, 1, 1)
grid_layout.addWidget(QtWidgets.QLabel('Protocol'), 2, 0)
grid_layout.addWidget(obj.protocol, 2, 1)
grid_layout.addWidget(QtWidgets.QLabel('Verify'), 3, 0)
grid_layout.addWidget(obj.verify, 3, 1)
grid_layout.addWidget(QtWidgets.QLabel('Use Kerberos'), 4, 0)
grid_layout.addWidget(obj.kerberos, 4, 1)
grid_layout.addWidget(QtWidgets.QLabel('API Key'), 5, 0)
grid_layout.addWidget(obj.api_key, 5, 1)
grid_layout.setColumnMinimumWidth(0, 75)
grid_layout.setSpacing(10)
grid_layout.setContentsMargins(10, 10, 10, 10)
outer_layout.addWidget(server_groupbox)
[docs] @staticmethod
def test_connection(obj):
obj.server_message.setText('... testing connection ...')
data = {'server' : obj.server.text()}
thread = threading.Thread(target=FIRSTUI.SharedObjects._thread_test_connection,
args=(obj,))
thread.daemon = True
thread.start()
@staticmethod
def _thread_test_connection(obj):
info = FIRST.Info.get_file_details()
config = FIRSTUI.SharedObjects.get_config(obj)
if not re.match('^[\da-f]{8}-([\da-f]{4}-){3}[\da-f]{12}$', config.api_key.lower()):
obj.server_message.setText('Valid API Key not provided')
return
server = FIRST.Server( FIRSTUI.SharedObjects.get_config(obj),
info['md5'],
info['crc32'],
h_sha1=info['sha1'],
h_sha256=info['sha256'])
if not server.test_connection():
obj.server_message.setText('Failed to establish connection with server')
else:
obj.server_message.setText('Connected to FIRST server')
[docs] @staticmethod
def get_config(obj):
config = FIRST.Configuration(None)
config.set_server(obj.server.text())
config.set_port(obj.port.text())
config.set_protocol(obj.protocol.currentText().lower())
config.set_verify(obj.verify.currentText().lower() == 'yes')
config.set_authentication(obj.kerberos.currentText().lower() == 'yes')
config.set_api_key(obj.api_key.text())
return config
# Check, CheckAll, and Management shared components
#----------------------------------------------------------------------
[docs] @staticmethod
def make_match_info(match, full=True, check_all=True):
'''
Build a tree item for a function_ea node (level-1)
This is the function match information (name, prototype, rank)
@param function_context: a dbFunction_Context object
@return: QStandradItemModel item for the function context
'''
# Add (name, <empty> rank, similarity, prototype, ., engines, ., user) row
name = QtGui.QStandardItem(match.name)
rank = QtGui.QStandardItem('-')
if 'rank' in dir(match):
rank = QtGui.QStandardItem(str(match.rank))
rank.setTextAlignment(Qt.AlignCenter)
prototype = QtGui.QStandardItem(match.prototype)
prototype_tooltip = QtGui.QStandardItem('...')
info = [name, rank, prototype]
if full:
engine_info = match.engine_info
msg = '<p><b>{}</b><br/>{}</p>'
tooltip = [msg.format(k,v) for k,v in engine_info.iteritems()]
tooltip = '<hr style="margin:1px"/>'.join(tooltip)
prototype_tooltip.setToolTip(match.prototype)
prototype_tooltip.setTextAlignment(Qt.AlignCenter)
similarity = QtGui.QStandardItem(str(round(match.similarity, 2)) + '%')
similarity.setTextAlignment(Qt.AlignCenter)
engines = QtGui.QStandardItem(', '.join(engine_info.keys()))
engines_tooltip = QtGui.QStandardItem('...')
engines_tooltip.setTextAlignment(Qt.AlignCenter)
engines_tooltip.setToolTip(tooltip)
creator = QtGui.QStandardItem(match.creator)
info = [name]
if check_all:
info += [QtGui.QStandardItem()]
info += [ rank, similarity, prototype, prototype_tooltip,
engines, engines_tooltip, creator]
# Add row:
# Comment:
# (comment)
comment = match.comment
if not comment:
comment = '- No Comment -'
comment = QtGui.QStandardItem('Comment:\n' + comment)
comment.setColumnCount(8)
comment.setData(True, role=FIRSTUI.ROLE_COMMENT)
comment_list = [comment] + \
[QtGui.QStandardItem() for i in range(len(info)-1)]
info[0].appendRow(comment_list)
# Mark all items noneditable and add id associated with the match
for item in info + comment_list:
item.setEditable(False)
item.setData(match.id, role=FIRSTUI.ROLE_ID)
item.setData(match.address, role=FIRSTUI.ROLE_ADDRESS)
return info
[docs] class Generic(object):
def __init__(self):
self.should_show = True
[docs] def get_server_thread(self):
return None
[docs] def setupUi(self, dialog):
self.dialog = dialog
self.init_window()
sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Preferred, QtWidgets.QSizePolicy.Expanding)
sizePolicy.setHorizontalStretch(0)
sizePolicy.setVerticalStretch(2)
self.msg = QtWidgets.QLabel('')
# top_layout
#------------------------------
self.top_layout = QtWidgets.QHBoxLayout()
self.init_top_layout()
# table_layout
#------------------------------
self.data_layout = QtWidgets.QVBoxLayout()
self.data_layout.setContentsMargins(0,0,0,0)
self.data_layout.setSpacing(0)
self.init_data_layout()
# mid_layout
#------------------------------
self.middle_layout = QtWidgets.QHBoxLayout()
self.init_middle_layout()
# bottom_layout
#------------------------------
self.apply_button = QtWidgets.QPushButton('Apply')
self.apply_button.setFixedWidth(100)
self.cancel_button = QtWidgets.QPushButton('Cancel')
self.cancel_button.setFixedWidth(100)
self.bottom_layout = QtWidgets.QHBoxLayout()
self.bottom_layout.addWidget(self.msg)
self.bottom_layout.addWidget(self.apply_button)
self.bottom_layout.addWidget(self.cancel_button)
self.bottom_layout.setContentsMargins(0, 20, 0, 10)
self.init_bottom_layout()
# Vertical Layout
#------------------------------
self.vbox_outer = QtWidgets.QVBoxLayout(dialog)
self.vbox_outer.setObjectName('vbox_outer')
self.vbox_outer.addLayout(self.top_layout)
self.vbox_outer.addLayout(self.data_layout)
self.vbox_outer.addLayout(self.middle_layout)
self.vbox_outer.addLayout(self.bottom_layout)
# Signal Handling
#------------------------------
self.init_signals()
[docs] def init_window(self):
self.dialog.setWindowTitle('FIRST Dialog')
self.dialog.setWindowIcon(get_first_icon())
self.dialog.resize(732, 387)
[docs] def init_top_layout(self):
pass
[docs] def init_middle_layout(self):
pass
[docs] def init_bottom_layout(self):
pass
[docs] def init_signals(self):
self.apply_button.clicked.connect(self.dialog.ok_button_callback)
self.cancel_button.clicked.connect(self.dialog.reject)
[docs] def init_data_layout(self):
pass
[docs] class UploadAll(Generic):
[docs] def init_window(self):
self.dialog.setWindowTitle('FIRST: Mass Function Prototype Upload')
self.dialog.setWindowIcon(get_first_icon())
self.dialog.resize(600, 450)
[docs] def init_top_layout(self):
title = QtWidgets.QLabel('Mass Function Upload')
title.setStyleSheet('font: 16pt;')
description = QtWidgets.QLabel((
'Upload function prototype to server for others to access.\n'
'Select the functions you want to upload. Click to select a '
'function and click again to deselect the function. Once '
'uploaded you can manage prototypes you\'ve created in the '
'management window.'))
description.setWordWrap(True)
description.setLineWidth(200)
description.setStyleSheet('text-size: 90%')
vbox_text = QtWidgets.QVBoxLayout()
vbox_text.addWidget(title)
vbox_text.addWidget(description)
vbox_legend = QtWidgets.QVBoxLayout()
grid_legend = QtWidgets.QGridLayout()
style = 'background-color: #{0:06x}; border: 1px solid #c0c0c0;'
colors = [ FIRST.color_changed, FIRST.color_unchanged,
FIRST.color_default, FIRST.color_selected]
text = ['Changed', 'Unchanged', 'Default', 'Selected']
for i in xrange(len(colors)):
box = QtWidgets.QLabel()
box.setFixedHeight(10)
box.setFixedWidth(10)
box.setStyleSheet(style.format(colors[i].color().rgb() & 0xFFFFFF))
grid_legend.addWidget(box, i, 0)
grid_legend.addWidget(QtWidgets.QLabel(text[i]), i, 1)
vbox_legend.addLayout(grid_legend)
vbox_legend.setAlignment(Qt.AlignRight | Qt.AlignBottom)
self.top_layout.addLayout(vbox_text)
self.top_layout.addStretch()
self.top_layout.addLayout(vbox_legend)
[docs] def init_data_layout(self):
header = ['Address', 'Name', 'Prototype', 'Comment']
self.table_views = []
self.data_models = []
self.total_functions = 0
segment_str = '0x{0.startEA:08x}-0x{0.endEA:08x}: Name "{1}"'
if not FIRST.Info.is_32bit():
segment_str = segment_str.replace(':08x}', ':016x}')
# Cycle through segments
segments = FIRST.Metadata.get_segments_with_functions()
if not segments:
msg_box = QtWidgets.QMessageBox()
msg_box.setIcon(QtWidgets.QMessageBox.Critical)
msg_box.setWindowTitle('Unable to derive functions')
msg_box.setText(('Cannot upload function. FIRST cannot '
'find any functions'))
self.should_show = False
return
self.scroll_layout = FIRSTUI.ScrollWidget()
idaapi.show_wait_box('Getting all defined functions...')
try:
first_segment = True
for segment in segments:
segment_name = IDAW.SegName(segment.startEA)
segment_info = segment_str.format(segment, segment_name)
segment_label = QtWidgets.QLabel(segment_info)
segment_label.setContentsMargins(10, 5, 0, 5)
func_db = FIRST.Metadata.get_segment_functions(segment)
self.total_functions += len(func_db)
if idaapi.wasBreak():
raise FIRST.Error('canceled')
data_model = FIRST.Model.Upload(header, func_db)
table_view = QtWidgets.QTableView()
table_view.setModel(data_model)
table_view.setSortingEnabled(False)
table_view.setSelectionMode(QtWidgets.QAbstractItemView.NoSelection)
table_view.setAlternatingRowColors(False)
table_view.setShowGrid(False)
hdr = table_view.verticalHeader()
hdr.setHighlightSections(False)
hdr.setDefaultSectionSize(hdr.minimumSectionSize())
hdr.hide()
hdr = table_view.horizontalHeader()
hdr.setHighlightSections(False)
hdr.setDefaultAlignment(Qt.AlignLeft)
hdr.setStretchLastSection(True)
table_view.resizeColumnsToContents()
table_view.setColumnWidth(0, table_view.columnWidth(0) + 10)
table_view.setColumnWidth(1, table_view.columnWidth(1) + 10)
table_view.setColumnWidth(2, table_view.columnWidth(2) + 10)
self.table_views.append(table_view)
self.data_models.append(data_model)
if not first_segment:
self.data_layout.addSpacing(5)
first_segment = False
self.scroll_layout.addWidget(segment_label)
self.scroll_layout.addWidget(table_view)
if idaapi.wasBreak():
raise FIRST.Error('canceled')
except FIRST.Error as e:
if 'canceled' == e.value:
self.should_show = False
finally:
idaapi.hide_wait_box()
self.data_layout.addWidget(self.scroll_layout)
[docs] def init_middle_layout(self):
if not self.should_show:
return
vbox = QtWidgets.QVBoxLayout()
self.select_all = QtWidgets.QCheckBox('Select All ')
self.filter_sub_funcs = QtWidgets.QCheckBox('Filter Out "sub_" functions ')
vbox.addWidget(self.filter_sub_funcs)
vbox.addWidget(self.select_all)
format_str = '{} functions'.format(self.total_functions)
self.function_number = QtWidgets.QLabel(format_str)
self.function_number.setAlignment(Qt.AlignTop)
self.middle_layout.addWidget(self.function_number)
self.middle_layout.addStretch()
self.middle_layout.addLayout(vbox)
[docs] def init_bottom_layout(self):
self.apply_button.setText('Upload')
[docs] def init_signals(self):
super(FIRSTUI.UploadAll, self).init_signals()
self.select_all.stateChanged.connect(self.select_all_callback)
self.filter_sub_funcs.stateChanged.connect(self.filter_sub_callback)
callback = lambda y, z: lambda x: self.table_clicked(x, y, z)
for i in xrange(len(self.table_views)):
table_view = self.table_views[i]
data_model = self.data_models[i]
table_view.clicked.connect(callback(table_view, data_model))
[docs] def filter_sub_callback(self, value):
self.total_functions = 0
for data_model in self.data_models:
data_model.filter_sub_functions(self.filter_sub_funcs.isChecked())
self.total_functions += data_model.rowCount()
format_str = '{} functions'
if self.filter_sub_funcs.isChecked():
format_str = '{} filtered functions'
self.function_number.setText(format_str.format(self.total_functions))
if self.select_all.isChecked():
self.select_all.setChecked(False)
[docs] def select_all_callback(self, value):
for data_model in self.data_models:
data_model.select_all(self.select_all.isChecked())
for table_view in self.table_views:
table_view.reset()
[docs] def table_clicked(self, index, table_view, data_model):
# If select all is selected then the user is trying to exclude some
# prototypes
if self.select_all.isChecked():
# Disconnect checkbox signal to prevent reentry
self.select_all.stateChanged.disconnect(self.select_all_callback)
self.select_all.setChecked(False)
self.select_all.stateChanged.connect(self.select_all_callback)
data_model.set_row_selected(index.row())
table_view.reset()
[docs] def get_selected_data(self):
metadata = []
for data_model in self.data_models:
metadata += data_model.get_selected_data()
return metadata
[docs] class Upload(Generic):
[docs] def init_top_layout(self):
self.dialog.setWindowTitle('FIRST: Function Prototype Upload')
self.dialog.setWindowIcon(get_first_icon())
self.dialog.resize(600, 250)
[docs] def init_data_layout(self):
function = IDAW.get_func(IDAW.ScreenEA())
if not function:
msg_box = QtWidgets.QMessageBox()
msg_box.setIcon(QtWidgets.QMessageBox.Critical)
msg_box.setWindowTitle('FIRST: No Function Selected')
msg_box.setText('No address was selected in the IDA View. '
'Please select a function and try again.')
self.should_show = False
return
self.metadata = FIRST.Metadata.get_function(function.startEA)
if not self.metadata:
temp_str = 'Unable to retrieve function at 0x{0:x}'
raise FIRST.Error(temp_str.format(IDAW.ScreenEA()))
data = [('Name:', self.metadata.name, QtWidgets.QLineEdit),
('Prototype:', self.metadata.prototype, QtWidgets.QLineEdit),
('Comments:', self.metadata.comment, QtWidgets.QPlainTextEdit)]
hdr_font = lambda x : 'font: {0}pt;'.format(x)
title = QtWidgets.QLabel('Function Upload')
title.setStyleSheet(hdr_font(16))
description = QtWidgets.QLabel((
'Upload function prototype to server for others to access.\n'
'Review the data below before uploading. If the metadata needs '
'updating, then close window and modify the function metadata '
'in IDA. Once uploaded you can manage prototypes you\'ve created '
'in the management window.'))
description.setWordWrap(True)
description.setStyleSheet('text-size: 90%')
l_metadata = QtWidgets.QLabel('Metadata')
l_metadata.setStyleSheet(hdr_font(14))
vbox_metadata = QtWidgets.QVBoxLayout()
vbox_metadata.setContentsMargins(10, 0, 0, 0)
for label, content, gui_component in data:
hbox = QtWidgets.QHBoxLayout()
hbox.setAlignment(Qt.AlignTop)
label = QtWidgets.QLabel(label)
label.setAlignment(Qt.AlignRight)
label.setFixedWidth(80)
label.setContentsMargins(0, 0, 5, 0)
label.setStyleSheet(hdr_font(10))
value = gui_component(content)
value.setReadOnly(True)
value.setStyleSheet(hdr_font(10))
if isinstance(value, QtWidgets.QPlainTextEdit):
value.setFixedHeight(90)
hbox.addWidget(label)
hbox.addWidget(value)
vbox_metadata.addLayout(hbox)
vbox_text = QtWidgets.QVBoxLayout()
vbox_text.addWidget(title)
vbox_text.addWidget(description)
vbox_text.addSpacing(10)
vbox_text.addWidget(l_metadata)
vbox_text.addLayout(vbox_metadata)
self.top_layout.addLayout(vbox_text)
[docs] def init_middle_layout(self):
pass
[docs] def init_bottom_layout(self):
self.apply_button.setText('Upload')
[docs] def init_signals(self):
super(FIRSTUI.Upload, self).init_signals()
[docs] class CheckAll(Generic):
'''Check all docs'''
[docs] def get_server_thread(self):
try:
return self._server_thread
except:
return None
[docs] def init_window(self):
self.dialog.setWindowTitle('FIRST: Check for Function Prototypes')
self.dialog.setWindowIcon(get_first_icon())
self.dialog.resize(750, 400)
[docs] def init_top_layout(self):
title = QtWidgets.QLabel('Check All Functions')
title.setStyleSheet('font: 16pt;')
description = QtWidgets.QLabel((
'Query FIRST\'s server for function metadata.\n'
'If a function within this IDB matches a signature found in '
'FIRST then it and its metadata will be available for you to '
'select below to apply to your IDB. Select the function you '
'wish to apply existing metadata to in order to view the '
'possible matches.'))
description.setWordWrap(True)
description.setStyleSheet('text-size: 90%')
vbox_text = QtWidgets.QVBoxLayout()
vbox_text.addWidget(title)
vbox_text.addWidget(description)
widget = QtWidgets.QWidget()
widget.setFixedWidth(100)
vbox_legend = QtWidgets.QVBoxLayout(widget)
grid_legend = QtWidgets.QGridLayout()
style = 'background-color: #{0:06x}; border: 1px solid #c0c0c0;'
colors = [FIRST.color_applied, FIRST.color_selected]
text = ['Applied', 'Selected']
for i in xrange(len(colors)):
box = QtWidgets.QLabel()
box.setFixedHeight(10)
box.setFixedWidth(10)
box.setStyleSheet(style.format(colors[i].color().rgb() & 0xFFFFFF))
grid_legend.addWidget(box, i, 0)
grid_legend.addWidget(QtWidgets.QLabel(text[i]), i, 1)
vbox_legend.addLayout(grid_legend)
vbox_legend.setAlignment(Qt.AlignRight | Qt.AlignBottom)
vbox_legend.setContentsMargins(20, 0, 0, 0)
self.top_layout.addLayout(vbox_text)
self.top_layout.addWidget(widget)
[docs] def init_data_layout(self):
self.groups = {}
idaapi.show_wait_box('Checking FIRST for all functions')
functions = FIRST.Metadata.get_non_jmp_wrapped_functions()
metadata = set([FIRST.Metadata.get_function(x) for x in functions])
if None in metadata:
metadata.remove(None)
metadata = list(metadata)
if not len(metadata):
idaapi.hide_wait_box()
title = 'Unable to derive functions'
msg = 'FIRST cannot find any functions'
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
self.should_show = False
return
# Initiailize data model with no data to prevent the same
# data from being added twice
self.data_model = FIRST.Model.Check(self.groups)
self._model_builder(self.data_model, self.groups, True)
server_thread = FIRST.server.scan( metadata,
self.__data_callback,
self._complete_callback)
self._server_thread = server_thread
idaapi.hide_wait_box()
tree_view = FIRST.Model.TreeView()
tree_view.setUniformRowHeights(False)
tree_view.setExpandsOnDoubleClick(False)
tree_view.setIndentation(15)
tree_view.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
tree_view.setSelectionMode(QtWidgets.QAbstractItemView.NoSelection)
tree_view.setModel(self.data_model)
tree_view.setColumnWidth(0, 175) # Function
tree_view.setColumnWidth(1, 55) # Matches
tree_view.setColumnWidth(2, 35) # Rank
tree_view.setColumnWidth(3, 75) # Similarity
tree_view.setColumnWidth(4, 150) # Prototype
tree_view.setColumnWidth(5, 20) # i
tree_view.setColumnWidth(7, 20) # i
self.tree_view = tree_view
self.data_layout.addWidget(self.tree_view)
self.timer_i = 0
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self._searching_text)
self.timer.start(750)
def _searching_text(self):
# Provide user with update if thread is still working
self.timer_i = (self.timer_i + 1) % 4
self.msg.setText('Searching FIRST' + ('.' * self.timer_i))
def __data_callback(self, thread, data):
if data and len(data):
self.groups.update(data)
self._model_builder(self.data_model, data)
message = self.found_format.format(len(self.groups))
self.found_label.setText(message)
self.filter_only_subs()
def _complete_callback(self, thread, data):
self.timer.stop()
FIRST.server.remove_operation(thread)
# Alert the user if no matches were found in FIRST
if not self.groups:
self.msg.setText('No matches found in FIRST\'s database')
return
self.msg.setText('Finished searching FIRST')
def _make_function_item(self, function, matches):
'''
Top level function information (level-0)
'''
#if not isinstance(function, FIRST.MetadataShim):
if not {'name', 'address'}.issubset(dir(function)):
return QtGui.QStandardItem('-')
label = '0x{0.address:08x} - {0.name}'
if not FIRST.Info.is_32bit():
label = label.replace(':08x}', ':016x}')
function_label = QtGui.QStandardItem(label.format(function))
function_label.setToolTip(function.name)
function_label.setData(function.address, role=FIRSTUI.ROLE_ADDRESS)
function_label.setData(function.name, role=FIRSTUI.ROLE_NAME)
if 999999 < matches:
matches = '{}M'.format(round(matches / 1000000.0, 1))
elif 999 < matches:
matches = '{}K'.format(round(matches / 1000.0, 1))
else:
matches = str(matches)
matches = QtGui.QStandardItem(matches)
matches.setTextAlignment(Qt.AlignCenter)
row = [function_label, matches] + \
[QtGui.QStandardItem() for i in range(7)]
[x.setEditable(False) for x in row]
return row
def _model_builder(self, model, data, initialize=False):
'''
Build the function model.
@param model: QStandardItemModel object
'''
if initialize:
model.clear() # Clear the model
FIRSTUI.SharedObjects.make_model_headers(model)
if not data:
return
model.add_data(data)
# Add db functions to the model
root_node = model.invisibleRootItem()
for address, matches in data.iteritems():
function = FIRST.Metadata.get_function(address)
func_row = self._make_function_item(function, len(matches))
root_node.appendRow(func_row)
for match in matches:
info_list = FIRSTUI.SharedObjects.make_match_info(match)
func_row[0].appendRow(info_list)
[docs] def init_middle_layout(self):
found = len(self.groups)
total = len(FIRST.Metadata.get_non_jmp_wrapped_functions())
s = 's' if 1 != total else ''
label = 'Matched {0} out of {1} function{2}'
self.select_highest_ranked = QtWidgets.QCheckBox('Select Highest Ranked ')
self.filter_sub_funcs_only = QtWidgets.QCheckBox('Show only "sub_" functions')
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(self.filter_sub_funcs_only)
vbox.addWidget(self.select_highest_ranked)
self.found_format = label.format('{}', total, s)
self.found_label = QtWidgets.QLabel(self.found_format.format(found))
self.found_label.setAlignment(Qt.AlignTop)
self.middle_layout.addWidget(self.found_label)
self.middle_layout.addStretch()
self.middle_layout.addLayout(vbox)
[docs] def init_bottom_layout(self):
pass
[docs] def init_signals(self):
if not self.should_show:
return
super(FIRSTUI.CheckAll, self).init_signals()
self.select_highest_ranked.stateChanged.connect(self.select_highest)
self.filter_sub_funcs_only.stateChanged.connect(self.filter_only_subs)
self.tree_view.clicked.connect(self.tree_clicked)
# A reference is needed to the dialog or else it closes quickly
# after showing
self.history_dialogs = []
self.tree_view.setContextMenuPolicy(Qt.CustomContextMenu)
self.tree_view.customContextMenuRequested.connect(self.custom_menu)
[docs] def metadata_history(self, metadata_id):
dialog = FIRSTUI.Dialog(None, FIRSTUI.History, metadata_id=metadata_id)
dialog.show()
# Keep a reference to the dialog so it doesn't hide before the
# user is done with it
self.history_dialogs.append(dialog)
[docs] def tree_clicked(self, index):
# If select all is selected then the user is trying to exclude some
# metadata
if self.select_highest_ranked.isChecked():
# Disconnect checkbox signal to prevent reentry
self.select_highest_ranked.stateChanged.disconnect(self.select_highest)
self.select_highest_ranked.setChecked(False)
self.select_highest_ranked.stateChanged.connect(self.select_highest)
data_id = index.data(FIRSTUI.ROLE_ID)
address = index.data(FIRSTUI.ROLE_ADDRESS)
if data_id and address:
self.data_model.set_id_selected((address, data_id))
self.tree_view.clearFocus()
[docs] def get_groups(self):
return self.groups
[docs] def select_highest(self):
flag = self.select_highest_ranked.isChecked()
# Get list of hidden addresses
hidden = []
if flag and self.filter_sub_funcs_only.isChecked():
root = self.data_model.invisibleRootItem()
for i in xrange(root.rowCount()):
child = root.child(i)
if child and self.tree_view.isIndexHidden(child.index()):
hidden.append(child.data(FIRSTUI.ROLE_ADDRESS))
self.data_model.select_highest_ranked(flag, hidden)
self.tree_view.setFocus()
[docs] def filter_only_subs(self):
flag = self.filter_sub_funcs_only.isChecked()
hidden = []
root = self.data_model.invisibleRootItem()
for i in xrange(root.rowCount()):
child = root.child(i)
if not child:
continue
name = child.data(FIRSTUI.ROLE_NAME)
if not name:
continue
hide = not name.startswith('sub_')
self.tree_view.setRowHidden(i, root.index(), flag and hide)
if flag and hide:
hidden.append(child.data(FIRSTUI.ROLE_ADDRESS))
if hidden:
self.data_model.unselect_group(hidden)
[docs] class Check(CheckAll):
[docs] def init_window(self):
super(FIRSTUI.Check, self).init_window()
self.dialog.setWindowTitle('FIRST: Check for Function Prototype')
[docs] def init_top_layout(self):
title = QtWidgets.QLabel('Check Function')
title.setStyleSheet('font: 16pt;')
description = QtWidgets.QLabel((
'Query FIRST\'s server for function metadata.\n'
'If a function within this IDB matches a signature found in '
'FIRST then it and its metadata will be available for you to '
'select below to apply to your IDB. Click to select a '
'function\'s metadata and click again to deselect it.'))
description.setWordWrap(True)
description.setStyleSheet('text-size: 90%')
vbox_text = QtWidgets.QVBoxLayout()
vbox_text.addWidget(title)
vbox_text.addWidget(description)
widget = QtWidgets.QWidget()
widget.setFixedWidth(100)
vbox_legend = QtWidgets.QVBoxLayout(widget)
grid_legend = QtWidgets.QGridLayout()
style = 'background-color: #{0:06x}; border: 1px solid #c0c0c0;'
colors = [FIRST.color_applied, FIRST.color_selected]
text = ['Applied', 'Selected']
for i in xrange(len(colors)):
box = QtWidgets.QLabel()
box.setFixedHeight(10)
box.setFixedWidth(10)
box.setStyleSheet(style.format(colors[i].color().rgb() & 0xFFFFFF))
grid_legend.addWidget(box, i, 0)
grid_legend.addWidget(QtWidgets.QLabel(text[i]), i, 1)
vbox_legend.addLayout(grid_legend)
vbox_legend.setAlignment(Qt.AlignRight | Qt.AlignBottom)
vbox_legend.setContentsMargins(20, 0, 0, 0)
self.top_layout.addLayout(vbox_text)
self.top_layout.addWidget(widget)
def __data_callback(self, thread, data):
if data and len(data):
self.groups.update(data)
self.data_model.add_data(data)
self._model_builder(self.data_model)
[docs] def init_data_layout(self):
function = IDAW.get_func(IDAW.ScreenEA())
if not function:
title = 'FIRST: No Function Selected'
msg = ('No address was selected in the IDA View. '
'Please select a function and try again.')
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
self.should_show = False
return
self.metadata = FIRST.Metadata.get_function(function.startEA)
if not self.metadata:
title = 'FIRST: Could not load function'
msg = ('FIRST is unable to get the function\'s '
'metadata. Please select a function and try again.')
idaapi.execute_ui_requests((FIRSTUI.Requests.MsgBox(title, msg),))
self.should_show = False
return
self.groups = {}
idaapi.show_wait_box('Checking FIRST for matches')
# Initiailize data model with no data to prevent the same
# data from being added twice
self.data_model = FIRST.Model.Check(self.groups)
self._model_builder(self.data_model)
server_thread = FIRST.server.scan( [self.metadata],
self.__data_callback,
self._complete_callback)
self._server_thread = server_thread
idaapi.hide_wait_box()
tree_view = FIRST.Model.TreeView()
tree_view.setUniformRowHeights(False)
tree_view.setExpandsOnDoubleClick(False)
tree_view.setIndentation(15)
tree_view.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
tree_view.setSelectionMode(QtWidgets.QAbstractItemView.NoSelection)
tree_view.setModel(self.data_model)
tree_view.setColumnWidth(0, 200) # Function
tree_view.setColumnWidth(1, 40) # Rank
tree_view.setColumnWidth(2, 75) # Similarity
tree_view.setColumnWidth(3, 150) # Prototype
tree_view.setColumnWidth(4, 20) # i
tree_view.setColumnWidth(6, 20) # i
self.tree_view = tree_view
self.data_layout.addWidget(self.tree_view)
self.timer_i = 0
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self._searching_text)
self.timer.start(750)
def _model_builder(self, model):
model.clear()
root_node = model.invisibleRootItem()
FIRSTUI.SharedObjects.make_model_headers(model, check_all=False)
matches = self.get_groups()
if not matches:
return
address = matches.keys()[0]
for match in matches[address]:
row = FIRSTUI.SharedObjects.make_match_info(match, check_all=False)
root_node.appendRow(row)
[docs] def init_middle_layout(self):
pass
[docs] def init_signals(self):
if self.should_show:
super(FIRSTUI.CheckAll, self).init_signals()
self.tree_view.clicked.connect(self.tree_clicked)
# A reference is needed to the dialog or else it closes quickly
# after showing
self.history_dialogs = []
self.tree_view.setContextMenuPolicy(Qt.CustomContextMenu)
self.tree_view.customContextMenuRequested.connect(self.custom_menu)
[docs] def tree_clicked(self, index):
data_id = index.data(FIRSTUI.ROLE_ID)
address = index.data(FIRSTUI.ROLE_ADDRESS)
if data_id and address:
self.data_model.set_id_selected((address, data_id))
self.tree_view.clearFocus()
[docs] class History(Generic):
def __init__(self, metadata_id):
self.should_show = True
self.metadata_id = metadata_id
[docs] def init_window(self):
self.dialog.setWindowTitle('FIRST: Metadata Revision History')
self.dialog.setWindowIcon(get_first_icon())
self.dialog.resize(600, 400)
[docs] def utc_to_local(self, utc_str):
if not utc_str:
return None
utc_dt = datetime.datetime.strptime(utc_str[:26], '%Y-%m-%dT%H:%M:%S.%f')
timestamp = calendar.timegm(utc_dt.timetuple())
local = datetime.datetime.fromtimestamp(timestamp)
return local.replace(microsecond=utc_dt.microsecond)
[docs] def init_top_layout(self):
history = FIRST.server.history(self.metadata_id)
if (not history
or ('results' not in history)
or (self.metadata_id not in history['results'])
or ('creator' not in history['results'][self.metadata_id])
or ('history' not in history['results'][self.metadata_id])):
self.should_show = False
return
self.creator = history['results'][self.metadata_id]['creator']
self.history = history['results'][self.metadata_id]['history']
title = QtWidgets.QLabel('Revision History')
title.setStyleSheet('font: 16pt;')
creator = QtWidgets.QLabel('by: <b>{}</b>'.format(self.creator))
creator.setAlignment(Qt.AlignRight | Qt.AlignBottom)
self.top_layout.addWidget(title)
self.top_layout.addStretch()
self.top_layout.addWidget(creator)
[docs] def init_data_layout(self):
if not self.should_show:
return
vbox_text = QtWidgets.QVBoxLayout()
scroll_layout = FIRSTUI.ScrollWidget(frame=QtWidgets.QFrame.NoFrame)
scroll_layout.addLayout(vbox_text)
first_record = True
cmp_func = lambda x,y: cmp( self.utc_to_local(y['committed']),
self.utc_to_local(x['committed']))
for details in sorted(self.history, cmp_func):
if not first_record:
hr = QtWidgets.QFrame()
hr.setFrameShape(QtWidgets.QFrame.HLine)
hr.setFrameShadow(QtWidgets.QFrame.Sunken)
vbox_text.addWidget(hr)
hbox = QtWidgets.QHBoxLayout()
local = self.utc_to_local(details['committed'])
vbox = QtWidgets.QVBoxLayout()
vbox.setContentsMargins(0, 5, 0, 0)
local_str = 'N/A'
if local:
local_str = datetime.datetime.strftime(local, '%B %d, %Y\n%I:%M:%S %p')
timestamp = QtWidgets.QLabel(local_str)
timestamp.setFixedWidth(125)
timestamp.setAlignment(Qt.AlignCenter | Qt.AlignVCenter)
vbox.addWidget(timestamp)
hbox.addLayout(vbox)
# Second Column
vbox = QtWidgets.QVBoxLayout()
vbox.setContentsMargins(0, 5, 0, 5)
for label in ['Name', 'Prototype', 'Comment']:
l = QtWidgets.QLabel('<b>{}</b>'.format(label))
l.setFixedWidth(75)
l.setAlignment(Qt.AlignRight)
vbox.addWidget(l)
vbox.addStretch()
vbox.setAlignment(Qt.AlignTop)
hbox.addLayout(vbox)
# Third Column
vbox = QtWidgets.QVBoxLayout()
vbox.setContentsMargins(0, 5, 0, 5)
comment = QtWidgets.QLabel(details['comment'])
if not details['comment']:
comment = QtWidgets.QLabel('- No Comment -')
comment.setWordWrap(True)
vbox.addWidget(QtWidgets.QLabel(details['name']))
vbox.addWidget(QtWidgets.QLabel(details['prototype']))
vbox.addWidget(comment)
vbox.addStretch()
hbox.addLayout(vbox)
vbox_text.addLayout(hbox)
first_record = False
self.data_layout.addSpacing(5)
self.data_layout.addWidget(scroll_layout)
[docs] def init_middle_layout(self):
vbox = QtWidgets.QVBoxLayout()
vbox.addStretch()
self.middle_layout.addLayout(vbox)
[docs] def init_bottom_layout(self):
self.bottom_layout.removeWidget(self.apply_button)
self.cancel_button.setText('Close')
[docs] def init_signals(self):
super(FIRSTUI.History, self).init_signals()
# GUI Shown to user when first installed
#-------------------------------------------------------------------------------
[docs] class Welcome(Generic):
[docs] def init_window(self):
super(FIRSTUI.Welcome, self).init_window()
self.dialog.setWindowTitle('FIRST')
self.dialog.resize(375, 400)
[docs] def init_top_layout(self):
vbox = QtWidgets.QVBoxLayout()
label = QtWidgets.QLabel('FIRST')
label.setStyleSheet('font: 40px;')
vbox.addWidget(label)
label = QtWidgets.QLabel('Function Identification and Recovery Signature Tool')
label.setStyleSheet('font: 12px;')
vbox.addWidget(label)
self.top_layout.addLayout(vbox)
self.top_layout.addSpacing(10)
[docs] def init_data_layout(self):
FIRSTUI.SharedObjects.server_config_layout(self, self.data_layout)
[docs] def init_middle_layout(self):
#self.check_on_startup = QtWidgets.QCheckBox("Check FIRST on IDA startup")
#self.middle_layout.addWidget(self.check_on_startup)
self.middle_layout.addStretch()
[docs] def init_signals(self):
super(FIRSTUI.Welcome, self).init_signals()
[docs] def show(self):
super(FIRSTUI.Welcome, self).show()
# Class to interface with Dialog GUIs and the back end data
#-------------------------------------------------------------------------------
class Dialog(QtWidgets.QDialog):
error_format = '<font color="#ff0000">{0}</font>'
def __init__(self, parent, dialog, **kwargs):
super(FIRSTUI.Dialog, self).__init__(parent)
self.parent = parent
self.data = None
self.ui = dialog(**kwargs)
self.ui.setupUi(self)
self.should_show = self.ui.should_show
self.accepted.connect(self.success_callback)
self.rejected.connect(self.reject_callback)
self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint)
self.hide_attempts = 0
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.__hide)
self.timer.start(500)
def registerSuccessCallback(self, fn):
self.callback_fn = fn
def success_callback(self):
if self.callback_fn != None:
self.callback_fn(self.ui)
def ok_button_callback(self):
if self.reject_accept():
return
self._stop_server_thread()
self.accept()
def reject_accept(self):
return False
def reject_callback(self):
self._stop_server_thread()
self.close()
def _stop_server_thread(self):
server_thread = self.ui.get_server_thread()
if server_thread and (threading.Thread == type(server_thread)):
if server_thread.is_alive():
FIRST.server.stop_operation(server_thread)
def show(self):
if self.should_show:
super(type(self), self).show()
def __hide(self):
if not self.ui.should_show:
self.hide()
# Check for up to 5 seconds whether the dialog needs to be
# closed or not
self.hide_attempts += 1
if self.hide_attempts > 10:
self.timer.stop()
# Create FIRST icon
def get_first_icon(return_hex=False, return_pixmap=False):
first_icon = ( '89504e470d0a1a0a0000000d49484452000000700000008708030000'
'00f9a92dd80000001974455874536f6674776172650041646f626520'
'496d616765526561647971c9653c0000005d504c54458a8a84b7b6b3'
'eaeae9d8d8d6b1b1ad797872d6d5d4c1c1be6d6c66a2a19d8d8d87e2'
'e2e0acaca86e6d67c5c4c2f5f5f4e0e0decbcbc9989792cececc8382'
'7da7a7a394938e81807aecebebf5f5f5777670bbbab79e9d9964635c'
'ffffffe54bab180000001f74524e53ffffffffffffffffffffffffff'
'ffffffffffffffffffffffffffffffffff00cd1976100000026f4944'
'415478daecdae9b2822014006050cca55cd2b4ac8befff9837771445'
'30619ae69c5f05d1172207704295e140000208e0f78311a5288fcec6'
'c0b7d706ca5ff6433f38785ddc0b924d7b8b2f4dc50de3aaaa5fe226'
'98fa3a6eef0a4b069c7b4d78a368f96cc58d8a42068c969bdefbfa2b'
'550809305b6bdb5f2d7a2c78f6c4a09a474f5be0baf76cc72f3c165c'
'f768dbc65fa8b90d77d354919816028ffed51f88fb777efda5316e7c'
'773728f22861ef50779c71a953ed053391d782ed7ca7c1f20f560489'
'78f8edfa3369fb1aaf0c890a58222a0106f32bba1b145fce39f8710f'
'1fc5f68c6267852306ad0db0241e9504fb3ce37f003ea4b80e4cfa77'
'81b5043adb60f92799a3bac522180a2ef12eb0904d8a5d93135384f7'
'804811eca77e132136004e44ea5cb581f9d0e4eab0e5a92e90b05b28'
'b6c25703f33d6015b3cba2ab04925de07b428ef3233402be8772d86c'
'5ce799263902ccb82f491ceebe9148ded2a0cde795d37c4f2303a20f'
'c03ecf0d6b47ac1bc433d0e2069507ef3a4057004ece2b05b1ed4801'
'4c675baa4405bcbfcefc4f6062e194187713e332bf8bdcad31447956'
'2ef6993fca2c74701cb2d3eaba253a3dc98256c0651a571f68e17461'
'1dd600ba82a38c1630e1ca438b5fb6fc032fe9fc44ef24955ef0322d'
'9dee16758093410cddc5cc7328c80ca2835752dd98093e00fb872671'
'd06d10717cd493a88d5de2f18fbe000410400001041040007f007c98'
'066d00010410c0df035fa6410220805f0ffe99061180bf0a7aa641aa'
'0df44c8314400015c1d234680308e0d7839969900008e0d783b96910'
'0108e051e0531758c8fe35f128f0fc9cf40bb55144952e50530008a0'
'72fc0b3000d4bbb97bf44864540000000049454e44ae426082')
if return_hex:
return first_icon
image = QtGui.QImage()
image.loadFromData(QtCore.QByteArray.fromHex(first_icon))
pixmap = QtGui.QPixmap()
pixmap.convertFromImage(image)
if return_pixmap:
return pixmap
return QtGui.QIcon(pixmap)
FIRST_ICON = get_first_icon(True).decode('hex')
FIRST_ICON = IDAW.load_custom_icon(data=FIRST_ICON, format='png')
# Function Identification and Recovery Signature Tool (FIRST) Plug-in Class
#-------------------------------------------------------------------------------
class FIRST_Plugin(idaapi.plugin_t):
flags = idaapi.PLUGIN_FIX
comment = 'Function Identification and Recovery Signature Tool'
# IDA Pro display details
help = 'Configure Function Identification and Recovery Signature Tool (FIRST)'
wanted_name = 'FIRST'
wanted_hotkey = '1'
def init(self):
FIRST.initialize()
return idaapi.PLUGIN_KEEP
def run(self, arg):
if FIRST.plugin:
FIRST.plugin.Show(self.wanted_name)
def term(self):
FIRST.cleanup_hooks()
def PLUGIN_ENTRY():
global required_modules_loaded
if required_modules_loaded:
return FIRST_Plugin()
idaapi.execute_ui_requests((FIRSTUI.Requests.Print('[1st] Unable to load all required modules.\n'),))
return None