Source code for maec.utils.comparator

# MAEC Comparator Classes
from mixbox import compat

[docs]class ComparisonResult(object): def __init__(self, bundle_list, lookup_table): self.lookup_table = lookup_table self.bundle_list = bundle_list def get_unique(self, bundle_list=None): unique_objs = {} if bundle_list is None: bundle_list = self.bundle_list for b in self.bundle_list: unique_objs[b.id_] = [] for obj_hash in self.lookup_table: sources = BundleComparator.get_sources(self.lookup_table, obj_hash) if len(sources) == 1: result_index = sources[0] for unique_obj in self.lookup_table[obj_hash][result_index]: unique_objs[result_index].append(unique_obj['object'].id_) return unique_objs def get_common(self, bundle_list=None): confirmed_objs = [] if bundle_list is None: bundle_list = self.bundle_list for obj_hash in self.lookup_table: sources = BundleComparator.get_sources(self.lookup_table, obj_hash) if len(sources) > 1: confirmed_obj_dict = {} confirmed_obj_dict['object'] = obj_hash confirmed_obj_dict['object_instances'] = {} for key, obj_list in self.lookup_table[obj_hash].items(): confirmed_obj_dict['object_instances'][key] = [] for common_obj in obj_list: confirmed_obj_dict['object_instances'][key].append(common_obj['object'].id_) if confirmed_obj_dict not in confirmed_objs: confirmed_objs.append(confirmed_obj_dict) return confirmed_objs
[docs]class SimilarObjectCluster(dict): def __init__(self): pass def add_object(self, obj, owner): if owner not in self: self[owner] = [{ 'object':obj, 'ownerBundle':owner }] else: self[owner].append({ 'object':obj, 'ownerBundle':owner }) def get_object_by_owner_id(self, owner_id): return self[owner_id][0]["object"]
[docs]class BundleComparator(object): @classmethod def compare(cls, bundle_list, match_on = None, case_sensitive = True): cls.object_table = {} cls.case_sensitive = case_sensitive if not match_on: # Default matching properties cls.match_on = { 'FileObjectType': ['file_name', 'file_path'], 'WindowsRegistryKeyObjectType': ['hive','key'], 'WindowsMutexObjectType': ['name'], 'SocketObjectType': ['address_value', 'port_value'], 'WindowsPipeObjectType': ['name'], 'ProcessObjectType': ['name'] } else: cls.match_on = match_on lookup_table = {} for bundle in bundle_list: for action in bundle.get_all_actions(): cls.process_action(action, lookup_table, bundle.id_) for obj in bundle.get_all_objects(): cls.process_object(obj, lookup_table, bundle.id_) return ComparisonResult(bundle_list, lookup_table) @classmethod def process_action(cls, action, lookup_table, bundle): if action.associated_objects: for associated_object in action.associated_objects: # get actual object from associated object obj = associated_object cls.process_object(obj, lookup_table, bundle) @classmethod def process_object(cls, obj, lookup_table, bundle): # get hash string from object to use as key in lookup table # Make sure the object is one of the supported types in the match_on dictionary if obj.properties and obj.properties._XSI_TYPE in cls.match_on: hash_value = ObjectHash.get_hash(obj, cls.match_on, cls.case_sensitive) if hash_value: if hash_value not in lookup_table: lookup_table[hash_value] = SimilarObjectCluster() lookup_table[hash_value].add_object(obj, bundle) @classmethod def get_sources(cls, lookup_table, obj_hash): val = [] for obj_dict_list in lookup_table[obj_hash].values(): if not obj_dict_list[0] in val: val.append(obj_dict_list[0]['ownerBundle']) return val
[docs]class ObjectHash(object): @classmethod def get_hash(cls, obj, match_on, case_sensitive): cls.match_on = match_on cls.case_sensitive = case_sensitive hash_val = '' for attrname, typed_field in obj.properties.typed_fields_with_attrnames(): # Make sure the typed field is comparable if typed_field.comparable: # Check if we're dealing with a nested element that we want to compare nested_element = cls.is_nested_match(attrname, cls.match_on[obj.properties._XSI_TYPE]) # Handle the normal, non-nested case if not nested_element and attrname in cls.match_on[obj.properties._XSI_TYPE]: hash_val = cls.get_val(obj, attrname, hash_val) # Handle the nested case elif nested_element: split_nested_element = nested_element.split('.') hash_val = cls.get_val(obj, attrname, hash_val, split_nested_element[1:]) if not cls.case_sensitive: return hash_val.lower() else: return hash_val @classmethod def get_val(cls, obj, typed_field, hash_val, nested_elements = None): if not nested_elements: val = getattr(obj.properties, str(typed_field)) if val is not None: hash_val += str(typed_field) + ":" + str(val) + " " else: if len(nested_elements) == 1: val = getattr(obj.properties, str(typed_field)) if val is not None: hash_val += str(typed_field) + ":" if isinstance(val, compat.MutableSequence): for list_item in val: if '/' in str(nested_elements[0]): hash_val += '[' split_names = nested_elements[0].split('/') for name in split_names: name_val = getattr(list_item, name) if name_val : hash_val += name + ':' + str(name_val) + ',' hash_val = hash_val.rstrip(',') hash_val += ']' else: hash_val += '[' + str(nested_elements[0]) + ':' + str(getattr(list_item, str(nested_elements[0]))) + ']' else: hash_val += str(getattr(val, nested_elements[0])) return hash_val @classmethod def is_nested_match(cls, typed_field_name, match_on_list): for matching_property in match_on_list: if '.' in matching_property and typed_field_name in matching_property: return matching_property return False