# Source code for maec.utils.deduplicator

# MAEC Bundle Deduplicator Module
# Copyright (c) 2018, The MITRE Corporation
# All rights reserved

# See LICENSE.txt for complete terms

import copy

from mixbox import compat
from mixbox import entities

from cybox.core import RelatedObject, AssociatedObject
from cybox.common.properties import BaseProperty
from mixbox.vendor.six import iteritems


class BundleDeduplicator(object):
    """Replace duplicate Objects in a MAEC Bundle with references to one copy.

    The class is used purely through its classmethods. Working state is kept
    in class attributes that :meth:`deduplicate` re-initialises on every call,
    so overlapping calls would share (and clobber) that state.
    """

    # Name of the Object collection that receives the single retained copy of
    # every Object that was found to have duplicates.
    _COLLECTION_NAME = "Deduplicated Objects"

    @classmethod
    def deduplicate(cls, bundle):
        """Deduplicate the input Bundle in place.

        Objects whose comparable property values are identical to an Object
        seen earlier are turned into ID references to that first Object, and
        the first Object itself is moved into a dedicated Object collection.

        Args:
            bundle: The Bundle to modify. It must provide
                ``get_all_objects(include_actions=True)`` plus the collection
                and ID attributes used by the other methods of this class.
        """
        # Unique objects, grouped by type:
        #   key   = object type (xsi:type)
        #   value = dict of {unique object id: set of property values}
        cls.objects_dict = {}
        # Non-unique Object ID -> unique Object ID
        cls.object_ids_mapping = {}
        # Object ID -> Object carrying that ID
        cls.id_objects = {}
        # Referenced Object ID -> list of Objects holding that IDREF
        cls.idref_objects = {}

        all_objects = bundle.get_all_objects(include_actions=True)
        cls.map_objects(all_objects)

        # Nothing to rewrite unless at least one duplicate was found.
        if cls.object_ids_mapping:
            # Copy each unique Object into its own collection ...
            cls.handle_unique_objects(bundle, all_objects)
            # ... point every duplicate at the unique copy ...
            cls.handle_duplicate_objects(bundle, all_objects)
            # ... and drop bare references that would otherwise produce
            # Object -> Object -> Object reference chains.
            cls.cleanup(bundle)

    @staticmethod
    def _make_reference(obj, target_id):
        """Strip *obj* down to a bare IDREF pointing at *target_id*."""
        obj.idref = target_id
        obj.id_ = None
        obj.properties = None
        obj.related_objects = None
        obj.domain_specific_object_properties = None

    @staticmethod
    def _remove_references(container, unique_ids):
        """Remove from *container* every item whose idref is in *unique_ids*."""
        # Snapshot the items first: removing from a sequence while lazily
        # iterating over it skips the element that follows each removal.
        stale = [x for x in container if x.idref and x.idref in unique_ids]
        for obj in stale:
            container.remove(obj)

    @classmethod
    def cleanup(cls, bundle):
        """Remove Objects that are only references to the deduplicated Objects.

        Such references can otherwise create Object->Object->Object chains
        which don't make sense. Root-level Objects and the members of every
        Object collection are checked; the collection holding the
        deduplicated Objects themselves is left untouched.

        Args:
            bundle: The Bundle being deduplicated; modified in place.
        """
        # A set gives constant-time membership tests for the filters below.
        unique_ids = set(cls.object_ids_mapping.values())

        # Cleanup the root-level Objects
        if bundle.objects:
            cls._remove_references(bundle.objects, unique_ids)

        # Cleanup the Object Collections
        if bundle.collections and bundle.collections.object_collections:
            for collection in bundle.collections.object_collections:
                # Ignore the collection that holds the unique Objects
                if collection.name == cls._COLLECTION_NAME:
                    continue
                cls._remove_references(collection.object_list, unique_ids)

    @classmethod
    def handle_duplicate_objects(cls, bundle, all_objects):
        """Replace every duplicate Object with a reference to its unique Object.

        Args:
            bundle: Unused; kept for interface compatibility.
            all_objects: Unused; kept for interface compatibility.
        """
        for duplicate_id, unique_id in iteritems(cls.object_ids_mapping):
            if not duplicate_id:
                continue
            # The duplicate itself becomes a reference to the unique Object.
            if duplicate_id in cls.id_objects:
                cls._make_reference(cls.id_objects[duplicate_id], unique_id)
            # Anything that referenced the duplicate is re-pointed as well.
            for referrer in cls.idref_objects.get(duplicate_id, ()):
                referrer.idref = unique_id

    @classmethod
    def handle_unique_objects(cls, bundle, all_objects):
        """Create the collection for the unique Objects and populate it.

        Args:
            bundle: The Bundle that receives the new named Object collection.
            all_objects: Every Object found in the Bundle.
        """
        # Number the new collection after any existing Object collections.
        counter = 1
        if bundle.collections and bundle.collections.object_collections:
            counter += len(bundle.collections.object_collections)
        # The namespace is taken from the second '-'-separated part of the
        # Bundle ID.
        bundle_namespace = bundle.id_.split('-')[1]
        collection_id = "maec-%s-objc-%s" % (bundle_namespace, counter)
        bundle.add_named_object_collection(cls._COLLECTION_NAME, collection_id)
        cls.add_unique_objects(bundle, all_objects)

    @classmethod
    def add_unique_objects(cls, bundle, all_objects):
        """Copy each unique Object into the collection and leave a reference.

        For every distinct unique ID in the mapping, the first Object in
        ``all_objects`` carrying that ID is deep-copied into the collection
        (without its association type / relationship) and the original is
        reduced to an IDREF pointing at the copy.

        Args:
            bundle: The Bundle whose collection receives the copies.
            all_objects: Every Object found in the Bundle.
        """
        # Index the first Object seen for each ID so that every unique ID is
        # resolved with one lookup instead of a rescan of all_objects.
        first_by_id = {}
        for candidate in all_objects:
            if candidate.id_ and candidate.id_ not in first_by_id:
                first_by_id[candidate.id_] = candidate

        added_ids = set()
        for unique_object_id in cls.object_ids_mapping.values():
            if unique_object_id in added_ids:
                continue
            added_ids.add(unique_object_id)

            original = first_by_id.get(unique_object_id)
            if original is None:
                continue

            object_copy = copy.deepcopy(original)
            # The standalone copy no longer sits in an association or
            # relationship context, so drop that attribute.
            if isinstance(object_copy, AssociatedObject):
                object_copy.association_type = None
            elif isinstance(object_copy, RelatedObject):
                object_copy.relationship = None

            # The Object left in place becomes a reference to the copy.
            cls._make_reference(original, original.id_)
            bundle.add_object(object_copy, cls._COLLECTION_NAME)

    @classmethod
    def map_objects(cls, all_objects):
        """Map non-unique Objects to their unique (first observed) counterparts.

        Populates ``id_objects``, ``idref_objects`` and ``object_ids_mapping``.

        Args:
            all_objects: Every Object found in the Bundle.
        """
        for obj in all_objects:
            # File the Object under its own ID, or under the ID it references.
            if obj.id_:
                cls.id_objects[obj.id_] = obj
            elif obj.idref:
                cls.idref_objects.setdefault(obj.idref, []).append(obj)

            # Record the mapping if an identical Object was seen earlier.
            matching_object_id = cls.find_matching_object(obj)
            if matching_object_id:
                cls.object_ids_mapping[obj.id_] = matching_object_id

    @classmethod
    def get_typedfield_values(cls, val, name, values, ignoreCase=False):
        """Collect the values held by a TypedField value and its nested members.

        Nothing is returned; each leaf value is added to ``values`` as a
        ``"<path>:<value>"`` string.

        Args:
            val: A BaseProperty, an Entity, a mutable sequence of those, or
                anything else (which is ignored).
            name: Path of attribute names leading to ``val``.
            values: Set that the collected strings are added to.
            ignoreCase: See the NOTE in the body about its inverted effect.
        """
        # A BaseProperty is a leaf: record it and stop.
        if isinstance(val, BaseProperty):
            # NOTE(review): the flag is inverted relative to its name -- the
            # value is lower-cased when ignoreCase is False (the default).
            # Kept as-is on purpose: reversing it would change which Objects
            # are treated as duplicates by default.
            val = str(val) if ignoreCase else str(val).lower()
            values.add("%s:%s" % (name, val))
            return

        # For an Entity, recurse into each of its typed fields.
        if isinstance(val, entities.Entity):
            for attrname, _typed_field in val.typed_fields_with_attrnames:
                path = "%s/%s" % (name, attrname)
                cls.get_typedfield_values(
                    getattr(val, attrname), path, values, ignoreCase)

        # EntityLists are Entity subclasses that can also have TypedFields,
        # so this is deliberately not an elif.
        if isinstance(val, compat.MutableSequence):
            for list_item in val:
                cls.get_typedfield_values(list_item, name, values, ignoreCase)

    @classmethod
    def get_object_values(cls, obj, ignoreCase=False):
        """Return the values of an Object's comparable properties as a set.

        Args:
            obj: Object whose ``properties`` are inspected.
            ignoreCase: Passed through to :meth:`get_typedfield_values`.

        Returns:
            set: ``"<path>:<value>"`` strings for every comparable typed field
            of ``obj.properties`` that is not None.
        """
        values = set()
        properties = obj.properties
        for attrname, typed_field in properties.typed_fields_with_attrnames:
            # Only comparable fields take part in duplicate detection.
            if not typed_field.comparable:
                continue
            val = getattr(properties, attrname)
            if val is not None:
                cls.get_typedfield_values(val, attrname, values, ignoreCase)
        return values

    @classmethod
    def find_matching_object(cls, obj):
        """Return the ID of a previously seen identical Object, if any.

        When no identical Object of the same xsi:type has been seen, ``obj``
        is registered in ``objects_dict`` so later Objects can match it.

        Args:
            obj: The Object to look up.

        Returns:
            The ID of the first identical Object, or None when there is no
            match or ``obj`` has no properties / no xsi:type.
        """
        if not (obj and obj.properties):
            return None

        object_values = cls.get_object_values(obj)
        xsi_type = obj.properties._XSI_TYPE
        if not xsi_type:
            return None

        types_dict = cls.objects_dict.setdefault(xsi_type, {})
        # dict.items() exists on both Python 2 and 3; dict.iteritems() is
        # Python 2 only and raises AttributeError on Python 3.
        for obj_id, obj_values in types_dict.items():
            if obj_values == object_values:
                # Identical Object already known: its ID is the IDREF target.
                return obj_id

        # First Object with these values: remember it for later comparisons.
        types_dict[obj.id_] = object_values
        return None