Source code for maec.utils.merge

# Copyright (c) 2018, The MITRE Corporation. All rights reserved.
# See LICENSE.txt for complete terms.

"""Methods for merging MAEC documents"""

from copy import deepcopy
import itertools

from mixbox import idgen
from mixbox.namespaces import Namespace
from mixbox.vendor.six import iteritems

from cybox.core import Object
from cybox.common import HashList

import maec
from maec.package import (Package, MalwareSubject, MalwareConfigurationDetails,
                          FindingsBundleList, MetaAnalysis, Analyses,
                          MinorVariants, MalwareSubjectRelationshipList,
                          MalwareSubjectList)

[docs]def dict_merge(target, *args): '''Merge multiple dictionaries into one''' if len(args) > 1: for obj in args: dict_merge(target, obj) return target # Recursively merge dicts and set non-dict values obj = args[0] if not isinstance(obj, dict): return obj for k, v in iteritems(obj): if k in target and isinstance(target[k], dict): dict_merge(target[k], v) elif k in target and isinstance(target[k], list): target[k] = (target[k] + v) else: target[k] = deepcopy(v) return target
[docs]def merge_documents(input_list, output_file): '''Merge a list of input MAEC documents and write them to an output file''' parsed_documents = [] # Parse the documents and get their API representation for input_file in input_list: api_representation = maec.parse_xml_instance(input_file)['api'] parsed_documents.append(api_representation) # Do a sanity check on the input list of documents for document in parsed_documents: if isinstance(document, Package): continue else: print('Error: unsupported document type. Currently only MAEC Packages are supported') # Merge the MAEC packages merged_package = merge_packages(parsed_documents) # Write the merged package to the output file merged_package.to_xml_file(output_file, {"https://github.com/MAECProject/python-maec":"merged"})
[docs]def merge_packages(package_list, namespace = None): '''Merge a list of input MAEC Packages and return a merged Package instance.''' malware_subjects = [] # Instantiate the ID generator class (for automatic ID generation) if not namespace: NS = Namespace("https://github.com/MAECProject/python-maec", "merged") else: NS = namespace maec.utils.set_id_namespace(NS) # Build the list of Malware Subjects for package in package_list: for malware_subject in package.malware_subjects: malware_subjects.append(malware_subject) # Merge the Malware Subjects merged_subjects = merge_malware_subjects(malware_subjects) # Merge the input namespace/schemaLocation dictionaries merged_namespaces = {} merged_schemalocations = {} for package in package_list: merged_namespaces.update(package.__input_namespaces__) merged_schemalocations.update(package.__input_schemalocations__) # Create a new Package with the merged Malware Subjects merged_package = Package() merged_package.malware_subjects = MalwareSubjectList(merged_subjects) merged_package.__input_namespaces__ = merged_namespaces merged_package.__input_schemalocations__ = merged_schemalocations return merged_package
[docs]def bin_malware_subjects(malware_subject_list, default_hash_type='md5'): '''Bin a list of Malware Subjects by hash Default = MD5 ''' binned_subjects = {} for malware_subject in malware_subject_list: mal_inst_obj = malware_subject.malware_instance_object_attributes if mal_inst_obj: obj_properties = mal_inst_obj.properties if obj_properties and obj_properties.hashes: for hash in obj_properties.hashes: if hash.type_ and hash.simple_hash_value: hash_type = '' hash_value = '' # Get the hash type hash_type = str(hash.type_).lower() # Get the hash value hash_value = str(hash.simple_hash_value).lower() # Check the hash type and bin accordingly if hash_type == default_hash_type: if hash_value in binned_subjects: binned_subjects[hash_value].append(malware_subject) else: binned_subjects[hash_value] = [malware_subject] return binned_subjects
[docs]def merge_entities(entity_list): '''Merge a list of MAEC/CybOX entities''' dict_list = [x.to_dict() for x in entity_list] output_dict = dict_merge({}, *dict_list) return output_dict
[docs]def deduplicate_vocabulary_list(entity_list, value_name = "value"): # TODO: Move this to the deduplicator module? '''Deduplicate a simple list of MAEC/CybOX vocabulary entries''' temp = [] output_list = [] for entity in entity_list: entity_value = getattr(entity, value_name) entity_lower = str(entity_value).lower() if entity_value and entity_lower not in temp: temp.append(entity_lower) output_list.append(entity) elif not entity_value: output_list.append(entity) return output_list
[docs]def merge_findings_bundles(findings_bundles_list): '''Merge two or more Malware Subject Findings Bundles''' # Merge the meta-analysis merged_meta_analysis = None meta_analysis_list = [x.meta_analysis for x in findings_bundles_list if x.meta_analysis] if meta_analysis_list: merged_meta_analysis = MetaAnalysis.from_dict(merge_entities(meta_analysis_list)) # Merge the list of bundles merged_bundles = list(itertools.chain(*[x.bundle for x in findings_bundles_list if x.bundle])) # Merge the list of external bundle references merged_bundle_external_references = list(itertools.chain(*[x.bundle_external_reference for x in findings_bundles_list if x.bundle_external_reference])) # Construct the merged Findings Bundle List entity merged_findings_bundle_list = FindingsBundleList() if merged_meta_analysis: merged_findings_bundle_list.meta_analysis = merged_meta_analysis if merged_bundles: merged_findings_bundle_list.bundle = merged_bundles if merged_bundle_external_references: merged_findings_bundle_list.bundle_external_reference = merged_bundle_external_references return merged_findings_bundle_list
[docs]def create_mappings(mapping_dict, original_malware_subject_list, merged_malware_subject): '''Map the IDs of a list of existing Malware Subjects to the new merged Malware Subject''' for malware_subject in original_malware_subject_list: mapping_dict[malware_subject.id_] = merged_malware_subject.id_
[docs]def merge_binned_malware_subjects(merged_malware_subject, binned_list, id_mappings_dict): '''Merge a list of input binned (related) Malware Subjects''' # Merge the Malware_Instance_Object_Attributes mal_inst_obj_list = [x.malware_instance_object_attributes for x in binned_list] merged_inst_obj = Object.from_dict(merge_entities(mal_inst_obj_list)) # Give the merged Object a new ID merged_inst_obj.id_ = idgen.create_id('object') # Deduplicate the hash values, if they exist if merged_inst_obj.properties and merged_inst_obj.properties.hashes: hashes = merged_inst_obj.properties.hashes hashes = HashList(deduplicate_vocabulary_list(hashes, value_name = 'simple_hash_value')) hashes = HashList(deduplicate_vocabulary_list(hashes, value_name = 'fuzzy_hash_value')) merged_inst_obj.properties.hashes = hashes # Merge and deduplicate the labels merged_labels = list(itertools.chain(*[x.label for x in binned_list if x.label])) deduplicated_labels = deduplicate_vocabulary_list(merged_labels) # Merge the configuration details config_details_list = [x.configuration_details for x in binned_list if x.configuration_details] merged_config_details = None if config_details_list: merged_config_details = MalwareConfigurationDetails.from_dict(merge_entities(config_details_list)) # Merge the minor variants merged_minor_variants = list(itertools.chain(*[x.minor_variants for x in binned_list if x.minor_variants])) # Merge the field data # TODO: Add support. Not implemented in the APIs. # Merge the analyses merged_analyses = list(itertools.chain(*[x.analyses for x in binned_list if x.analyses])) # Merge the findings bundles merged_findings_bundles = merge_findings_bundles([x.findings_bundles for x in binned_list if x.findings_bundles]) # Merge the relationships merged_relationships = list(itertools.chain(*[x.relationships for x in binned_list if x.relationships])) # Merge the compatible platforms merged_compatible_platforms = list(itertools.chain(*[x.compatible_platform for x in binned_list if x.compatible_platform])) # Build the merged Malware Subject merged_malware_subject.malware_instance_object_attributes = merged_inst_obj if deduplicated_labels: merged_malware_subject.label = deduplicated_labels if merged_config_details: merged_malware_subject.configuration_details = merged_config_details if merged_minor_variants: merged_malware_subject.minor_variants = MinorVariants(merged_minor_variants) if merged_analyses: merged_malware_subject.analyses = Analyses(merged_analyses) if merged_findings_bundles: merged_malware_subject.findings_bundles = merged_findings_bundles if merged_relationships: merged_malware_subject.relationships = MalwareSubjectRelationshipList(merged_relationships) if merged_compatible_platforms: merged_malware_subject.compatible_platform = merged_compatible_platforms
[docs]def update_relationships(malware_subject_list, id_mappings): '''Update any existing Malware Subject relationships to account for merged Malware Subjects''' for malware_subject in malware_subject_list: if malware_subject.relationships: relationships = malware_subject.relationships for relationship in relationships: malware_subject_references = relationship.malware_subject_references for malware_subject_reference in malware_subject_references: if malware_subject_reference.malware_subject_idref in id_mappings.keys(): malware_subject_reference.malware_subject_idref = id_mappings[malware_subject_reference.malware_subject_idref]
[docs]def merge_malware_subjects(malware_subject_list): '''Merge a list of input Malware Subjects''' id_mappings = {} output_subjects = [] # Bin the Malware Subjects by hash binned_subjects = bin_malware_subjects(malware_subject_list) # Merge the Malware Subjects that were binned for binned_list in binned_subjects.values(): # Make sure we're dealing with at least two subjects if len(binned_list) > 1: # Instantiate the merged Malware Subject merged_malware_subject = MalwareSubject() # Add the ID mappings from the old (merged) subject to the new one create_mappings(id_mappings, binned_list, merged_malware_subject) # Perform the merging merge_binned_malware_subjects(merged_malware_subject, binned_list, id_mappings) # Add the merged Malware Subject to the output list output_subjects.append(merged_malware_subject) # Add the Malware Subjects that weren't merged for malware_subject in malware_subject_list: if malware_subject.id_ not in id_mappings.keys(): output_subjects.append(malware_subject) # Update the relationships for the Malware Subjects to account for the merges update_relationships(output_subjects, id_mappings) # Return the list of original and merged Malware Subjects return output_subjects