# Source code for maec.bundle.bundle

# MAEC Bundle Class

# Copyright (c) 2018, The MITRE Corporation
# All rights reserved

from mixbox import fields
from mixbox import idgen

from cybox.core import Object
from cybox.utils.normalize import normalize_object_properties

import maec
import maec.bindings.maec_bundle as bundle_binding
from maec.utils import BundleComparator, BundleDeduplicator

from . import _namespace
from .malware_action import MalwareAction
from .av_classification import AVClassifications
from .behavior import Behavior
from .candidate_indicator import CandidateIndicatorList
from .process_tree import ProcessTree
from .capability import CapabilityList
from .object_history import ObjectHistory


class BehaviorList(maec.EntityList):
    """List entity whose items are ``Behavior`` instances.

    Serialized through ``bundle_binding.BehaviorListType``.
    """

    _namespace = _namespace
    _binding_class = bundle_binding.BehaviorListType

    # Repeatable "Behavior" child element.
    behavior = fields.TypedField("Behavior", Behavior, multiple=True)
class ActionList(maec.EntityList):
    """List entity whose items are ``MalwareAction`` instances.

    Serialized through ``bundle_binding.ActionListType``.
    """

    _namespace = _namespace
    _binding_class = bundle_binding.ActionListType

    # Repeatable "Action" child element.
    action = fields.TypedField("Action", MalwareAction, multiple=True)
class ObjectList(maec.EntityList):
    """List entity whose items are CybOX ``Object`` instances.

    Serialized through ``bundle_binding.ObjectListType``.
    """

    _namespace = _namespace
    _binding_class = bundle_binding.ObjectListType

    # Repeatable "Object" child element.
    object = fields.TypedField("Object", Object, multiple=True)
class BaseCollection(maec.Entity):
    """Common base for the named collection entities in this module.

    Carries the attributes shared by every collection: a name, an affinity
    type, an affinity degree and a description.
    """

    _binding = bundle_binding
    _binding_class = bundle_binding.BaseCollectionType
    _namespace = _namespace

    name = fields.TypedField("name")
    affinity_type = fields.TypedField("Affinity_Type")
    affinity_degree = fields.TypedField("Affinity_Degree")
    description = fields.TypedField("Description")

    def __init__(self, name=None):
        """Create the collection and record its (optional) ``name``."""
        super(BaseCollection, self).__init__()
        self.name = name
class ActionCollection(BaseCollection):
    """Named collection that groups Actions in an ``ActionList``."""

    _binding = bundle_binding
    _binding_class = bundle_binding.ActionCollectionType
    _namespace = _namespace

    id_ = fields.TypedField("id")
    action_list = fields.TypedField("Action_List", ActionList)

    def __init__(self, name=None, id=None):
        """Create an empty collection.

        A falsy ``id`` (``None``, empty string) is replaced by an ID
        generated with the ``action_collection`` prefix.
        """
        super(ActionCollection, self).__init__(name)
        self.id_ = id or idgen.create_id(prefix="action_collection")
        self.action_list = ActionList()

    def add_action(self, action):
        """Append ``action`` to this collection's Action list."""
        self.action_list.append(action)
class BehaviorCollection(BaseCollection):
    """Named collection that groups Behaviors in a ``BehaviorList``."""

    _binding = bundle_binding
    _binding_class = bundle_binding.BehaviorCollectionType
    _namespace = _namespace

    id_ = fields.TypedField("id")
    behavior_list = fields.TypedField("Behavior_List", BehaviorList)

    def __init__(self, name=None, id=None):
        """Create an empty collection.

        A falsy ``id`` (``None``, empty string) is replaced by an ID
        generated with the ``behavior_collection`` prefix.
        """
        super(BehaviorCollection, self).__init__(name)
        self.id_ = id or idgen.create_id(prefix="behavior_collection")
        self.behavior_list = BehaviorList()

    def add_behavior(self, behavior):
        """Append ``behavior`` to this collection's Behavior list."""
        self.behavior_list.append(behavior)
class ObjectCollection(BaseCollection):
    """Named collection that groups Objects in an ``ObjectList``."""

    _binding = bundle_binding
    _binding_class = bundle_binding.ObjectCollectionType
    _namespace = _namespace

    id_ = fields.TypedField("id")
    object_list = fields.TypedField("Object_List", ObjectList)

    def __init__(self, name=None, id=None):
        """Create an empty collection.

        A falsy ``id`` (``None``, empty string) is replaced by an ID
        generated with the ``object_collection`` prefix.
        """
        super(ObjectCollection, self).__init__(name)
        self.id_ = id or idgen.create_id(prefix="object_collection")
        self.object_list = ObjectList()

    def add_object(self, object):
        """Append ``object`` to this collection's Object list."""
        self.object_list.append(object)
class CandidateIndicatorCollection(BaseCollection):
    """Named collection that groups Candidate Indicators."""

    _binding = bundle_binding
    _binding_class = bundle_binding.CandidateIndicatorCollectionType
    _namespace = _namespace

    id_ = fields.TypedField("id")
    candidate_indicator_list = fields.TypedField("Candidate_Indicator_List", CandidateIndicatorList)

    def __init__(self, name=None, id=None):
        """Create an empty collection.

        A falsy ``id`` (``None``, empty string) is replaced by an ID
        generated with the ``candidate_indicator_collection`` prefix.
        """
        super(CandidateIndicatorCollection, self).__init__(name)
        self.id_ = id or idgen.create_id(prefix="candidate_indicator_collection")
        self.candidate_indicator_list = CandidateIndicatorList()

    def add_candidate_indicator(self, candidate_indicator):
        """Append ``candidate_indicator`` to this collection's list."""
        self.candidate_indicator_list.append(candidate_indicator)
class BehaviorCollectionList(maec.EntityList):
    """List entity whose items are ``BehaviorCollection`` instances."""

    _binding_class = bundle_binding.BehaviorCollectionListType
    _namespace = _namespace

    behavior_collection = fields.TypedField("Behavior_Collection", BehaviorCollection, multiple=True)

    def __init__(self):
        super(BehaviorCollectionList, self).__init__()

    def to_obj(self, ns_info=None):
        """Build the binding object, keeping only non-empty collections.

        Returns:
            The binding object, or ``None`` when no collection with at least
            one Behavior remains.
        """
        list_obj = super(BehaviorCollectionList, self).to_obj()
        # Start from an empty binding list, exactly as ActionCollectionList
        # does; otherwise the filtered collections appended below would be
        # added on top of whatever the base-class serialization produced.
        list_obj.set_Behavior_Collection([])
        for collection in self:
            behaviors = collection.behavior_list
            # A collection whose list was never set counts as empty.
            if behaviors is not None and len(behaviors) > 0:
                list_obj.add_Behavior_Collection(collection.to_obj(ns_info=ns_info))
        if list_obj.hasContent_():
            return list_obj
        return None

    def has_collection(self, collection_name):
        """Return True if a Collection named ``collection_name`` is in the list."""
        return any(
            collection.name is not None and collection.name == collection_name
            for collection in self
        )

    def get_named_collection(self, collection_name):
        """Return the first Collection named ``collection_name``, or ``None``."""
        for collection in self:
            if collection.name is not None and collection.name == collection_name:
                return collection
        return None
class ActionCollectionList(maec.EntityList):
    """List entity whose items are ``ActionCollection`` instances."""

    _namespace = _namespace
    _binding_class = bundle_binding.ActionCollectionListType

    action_collection = fields.TypedField("Action_Collection", ActionCollection, multiple=True)

    def __init__(self):
        super(ActionCollectionList, self).__init__()

    def to_obj(self, ns_info=None):
        """Build the binding object, keeping only non-empty collections.

        The binding's collection list is cleared first and then refilled
        with the collections that hold at least one Action. Nothing is
        returned (i.e. ``None``) when the binding object has no content.
        """
        list_obj = super(ActionCollectionList, self).to_obj()
        list_obj.set_Action_Collection([])
        for collection in self:
            if len(collection.action_list) > 0:
                serialized = collection.to_obj(ns_info=ns_info)
                list_obj.add_Action_Collection(serialized)
        if list_obj.hasContent_():
            return list_obj

    def has_collection(self, collection_name):
        """Return True if a Collection named ``collection_name`` is in the list."""
        return any(
            entry.name is not None and entry.name == collection_name
            for entry in self
        )

    def get_named_collection(self, collection_name):
        """Return the first Collection named ``collection_name``, or ``None``."""
        matches = (
            entry for entry in self
            if entry.name is not None and entry.name == collection_name
        )
        return next(matches, None)
class ObjectCollectionList(maec.EntityList):
    """List entity whose items are ``ObjectCollection`` instances."""

    _binding_class = bundle_binding.ObjectCollectionListType
    _namespace = _namespace

    object_collection = fields.TypedField("Object_Collection", ObjectCollection, multiple=True)

    def __init__(self):
        super(ObjectCollectionList, self).__init__()

    def to_obj(self, ns_info=None):
        """Build the binding object, keeping only non-empty collections.

        Returns:
            The binding object, or ``None`` when no collection with at least
            one Object remains.
        """
        list_obj = super(ObjectCollectionList, self).to_obj()
        # Start from an empty binding list, exactly as ActionCollectionList
        # does; otherwise the filtered collections appended below would be
        # added on top of whatever the base-class serialization produced.
        list_obj.set_Object_Collection([])
        for collection in self:
            objects = collection.object_list
            # A collection whose list was never set counts as empty.
            if objects is not None and len(objects) > 0:
                list_obj.add_Object_Collection(collection.to_obj(ns_info=ns_info))
        if list_obj.hasContent_():
            return list_obj
        return None

    def has_collection(self, collection_name):
        """Return True if a Collection named ``collection_name`` is in the list."""
        return any(
            collection.name is not None and collection.name == collection_name
            for collection in self
        )

    def get_named_collection(self, collection_name):
        """Return the first Collection named ``collection_name``, or ``None``."""
        for collection in self:
            if collection.name is not None and collection.name == collection_name:
                return collection
        return None
class CandidateIndicatorCollectionList(maec.EntityList):
    """List entity whose items are ``CandidateIndicatorCollection`` instances."""

    _binding_class = bundle_binding.CandidateIndicatorCollectionListType
    _namespace = _namespace

    candidate_indicator_collection = fields.TypedField("Candidate_Indicator_Collection", CandidateIndicatorCollection, multiple=True)

    def __init__(self):
        super(CandidateIndicatorCollectionList, self).__init__()

    def to_obj(self, ns_info=None):
        """Build the binding object, keeping only non-empty collections.

        Returns:
            The binding object, or ``None`` when no collection with at least
            one Candidate Indicator remains.
        """
        list_obj = super(CandidateIndicatorCollectionList, self).to_obj()
        # Start from an empty binding list, exactly as ActionCollectionList
        # does; otherwise the filtered collections appended below would be
        # added on top of whatever the base-class serialization produced.
        list_obj.set_Candidate_Indicator_Collection([])
        for collection in self:
            indicators = collection.candidate_indicator_list
            # A collection whose list was never set counts as empty.
            if indicators is not None and len(indicators) > 0:
                list_obj.add_Candidate_Indicator_Collection(collection.to_obj(ns_info=ns_info))
        if list_obj.hasContent_():
            return list_obj
        return None

    def has_collection(self, collection_name):
        """Return True if a Collection named ``collection_name`` is in the list."""
        return any(
            collection.name is not None and collection.name == collection_name
            for collection in self
        )

    def get_named_collection(self, collection_name):
        """Return the first Collection named ``collection_name``, or ``None``."""
        for collection in self:
            if collection.name is not None and collection.name == collection_name:
                return collection
        return None
class Collections(maec.Entity):
    """Container for the four per-type collection lists of a Bundle."""

    _binding = bundle_binding
    _binding_class = bundle_binding.CollectionsType
    _namespace = _namespace

    behavior_collections = fields.TypedField("Behavior_Collections", BehaviorCollectionList)
    action_collections = fields.TypedField("Action_Collections", ActionCollectionList)
    object_collections = fields.TypedField("Object_Collections", ObjectCollectionList)
    candidate_indicator_collections = fields.TypedField("Candidate_Indicator_Collections", CandidateIndicatorCollectionList)

    def __init__(self):
        super(Collections, self).__init__()

    def add_named_action_collection(self, action_collection_name, collection_id=None):
        """Create a named Action Collection and append it.

        The Action Collection list itself is created on first use.
        """
        if not self.action_collections:
            self.action_collections = ActionCollectionList()
        new_collection = ActionCollection(action_collection_name, collection_id)
        self.action_collections.append(new_collection)

    def add_named_object_collection(self, object_collection_name, collection_id=None):
        """Create a named Object Collection and append it.

        The Object Collection list itself is created on first use.
        """
        if not self.object_collections:
            self.object_collections = ObjectCollectionList()
        new_collection = ObjectCollection(object_collection_name, collection_id)
        self.object_collections.append(new_collection)

    def add_named_behavior_collection(self, behavior_collection_name, collection_id=None):
        """Create a named Behavior Collection and append it.

        The Behavior Collection list itself is created on first use.
        """
        if not self.behavior_collections:
            self.behavior_collections = BehaviorCollectionList()
        new_collection = BehaviorCollection(behavior_collection_name, collection_id)
        self.behavior_collections.append(new_collection)

    def add_named_candidate_indicator_collection(self, candidate_indicator_collection_name, collection_id=None):
        """Create a named Candidate Indicator Collection and append it.

        The Candidate Indicator Collection list itself is created on first use.
        """
        if not self.candidate_indicator_collections:
            self.candidate_indicator_collections = CandidateIndicatorCollectionList()
        new_collection = CandidateIndicatorCollection(candidate_indicator_collection_name, collection_id)
        self.candidate_indicator_collections.append(new_collection)

    def has_content(self):
        """Return True if any of the four collection lists is set and non-empty."""
        collection_lists = (
            self.behavior_collections,
            self.action_collections,
            self.object_collections,
            self.candidate_indicator_collections,
        )
        for collection_list in collection_lists:
            if collection_list and len(collection_list) > 0:
                return True
        return False
class BehaviorReference(maec.Entity):
    """Entity that holds a ``behavior_idref`` value.

    Serialized through ``bundle_binding.BehaviorReferenceType``.
    """

    _binding = bundle_binding
    _namespace = _namespace
    _binding_class = bundle_binding.BehaviorReferenceType

    behavior_idref = fields.TypedField("behavior_idref")
class Bundle(maec.Entity):
    """MAEC Bundle: top-level container for Actions, Objects, Behaviors,
    Capabilities, Candidate Indicators, AV classifications, a Process Tree
    and named Collections of those entities.
    """

    _binding = bundle_binding
    _namespace = _namespace
    _binding_class = bundle_binding.BundleType

    id_ = fields.TypedField("id")
    schema_version = fields.TypedField("schema_version")
    defined_subject = fields.TypedField("defined_subject")
    content_type = fields.TypedField("content_type")
    timestamp = fields.TypedField("timestamp")
    malware_instance_object_attributes = fields.TypedField("Malware_Instance_Object_Attributes", Object)
    av_classifications = fields.TypedField("AV_Classifications", AVClassifications)
    actions = fields.TypedField("Actions", ActionList)
    process_tree = fields.TypedField("Process_Tree", ProcessTree)
    behaviors = fields.TypedField("Behaviors", BehaviorList)
    capabilities = fields.TypedField("Capabilities", CapabilityList)
    objects = fields.TypedField("Objects", ObjectList)
    candidate_indicators = fields.TypedField("Candidate_Indicators", CandidateIndicatorList)
    collections = fields.TypedField("Collections", Collections)

    def __init__(self, id = None, defined_subject = False, schema_version = "4.1", content_type = None, malware_instance_object = None):
        """Create a Bundle.

        Args:
            id: Bundle ID; when falsy, one is generated with the ``bundle``
                prefix.
            defined_subject: Value stored in ``defined_subject``.
            schema_version: Value stored in ``schema_version``.
            content_type: Value stored in ``content_type``.
            malware_instance_object: Object stored as the Malware Instance
                Object Attributes.
        """
        super(Bundle, self).__init__()
        if id:
            self.id_ = id
        else:
            self.id_ = idgen.create_id(prefix="bundle")
        self.schema_version = schema_version
        self.defined_subject = defined_subject
        self.content_type = content_type
        self.timestamp = None
        self.malware_instance_object_attributes = malware_instance_object
        self.__input_namespaces__ = {}
        self.__input_schemalocations__ = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _find_named_collection(self, list_attr, collection_name):
        """Return the Collection called ``collection_name`` from the
        ``Collections`` attribute ``list_attr``, or ``None`` when the
        Collections entity, the per-type list, or the name is missing."""
        if not self.collections:
            return None
        collection_list = getattr(self.collections, list_attr)
        if collection_list is None:
            return None
        return collection_list.get_named_collection(collection_name)

    @staticmethod
    def _append_with_related(target, obj):
        """Append ``obj`` and then each of its related objects to ``target``."""
        target.append(obj)
        if obj.related_objects:
            target.extend(obj.related_objects)

    # ------------------------------------------------------------------
    # Simple setters / adders
    # ------------------------------------------------------------------

    def set_malware_instance_object_attributes(self, malware_instance_object):
        """Set the top-level Malware Instance Object Attributes entity in the Bundle."""
        self.malware_instance_object_attributes = malware_instance_object

    def add_av_classification(self, av_classification):
        """Add an AV Classification to the top-level AV_Classifications entity in the Bundle."""
        if not self.av_classifications:
            self.av_classifications = AVClassifications()
        self.av_classifications.append(av_classification)

    def add_capability(self, capability):
        """Add a Capability to the top-level Capabilities entity in the Bundle."""
        if not self.capabilities:
            self.capabilities = CapabilityList()
        self.capabilities.capability.append(capability)

    def set_process_tree(self, process_tree):
        """Set the Process Tree, in the top-level <Process_Tree> element."""
        self.process_tree = process_tree

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    def add_named_action_collection(self, collection_name, collection_id = None):
        """Add a new named Action Collection to the top-level Collections entity in the Bundle.

        The Collections entity is created if needed; no collection is added
        when ``collection_name`` is None.
        """
        if not self.collections:
            self.collections = Collections()
        if collection_name is not None:
            self.collections.add_named_action_collection(collection_name, collection_id)

    def add_action(self, action, action_collection_name = None):
        """Add an Action to an existing named Action Collection in the Collections entity.
        If it does not exist, add it to the top-level Actions entity."""
        if action_collection_name is not None:
            collection = self._find_named_collection("action_collections", action_collection_name)
            if collection is not None:
                collection.add_action(action)
                return
        # No name given, or the named collection is missing: fall back to
        # the top-level list so the Action is never silently dropped.
        if not self.actions:
            self.actions = ActionList()
        self.actions.append(action)

    def get_all_actions(self, bin = False):
        """Return all Actions in the Bundle.

        Args:
            bin: When true, return a dict mapping each Action name value to
                the list of Actions with that name (unnamed Actions are
                omitted); otherwise return a flat list.
        """
        all_actions = []
        if self.actions:
            all_actions.extend(self.actions)
        if self.collections and self.collections.action_collections:
            for collection in self.collections.action_collections:
                # Tolerate a collection whose Action list was never set.
                all_actions.extend(collection.action_list or [])
        if not bin:
            return all_actions
        binned_actions = {}
        for action in all_actions:
            if action.name:
                binned_actions.setdefault(action.name.value, []).append(action)
        return binned_actions

    def get_all_actions_on_object(self, object):
        """Return a list of all of the Actions in the Bundle that operate on a particular input Object.

        An Action is listed once per associated object that matches the
        input Object's ID, either by ``idref`` or by ``id_``.
        """
        object_actions = []
        if not object.id_:
            return object_actions
        for action in self.get_all_actions():
            associated_objects = action.associated_objects
            if not associated_objects:
                continue
            for associated_object in associated_objects:
                if associated_object.idref and associated_object.idref == object.id_:
                    object_actions.append(action)
                elif associated_object.id_ and associated_object.id_ == object.id_:
                    object_actions.append(action)
        return object_actions

    # ------------------------------------------------------------------
    # Objects
    # ------------------------------------------------------------------

    def add_named_object_collection(self, collection_name, collection_id = None):
        """Add a new named Object Collection to the Collections entity in the Bundle.

        The Collections entity is created if needed; no collection is added
        when ``collection_name`` is None.
        """
        if not self.collections:
            self.collections = Collections()
        if collection_name is not None:
            self.collections.add_named_object_collection(collection_name, collection_id)

    def add_object(self, object, object_collection_name = None):
        """Add an Object to an existing named Object Collection in the Collections entity.
        If it does not exist, add it to the top-level Object entity."""
        if object_collection_name is not None:
            collection = self._find_named_collection("object_collections", object_collection_name)
            if collection is not None:
                collection.add_object(object)
                return
        # No name given, or the named collection is missing: fall back to
        # the top-level list so the Object is never silently dropped.
        if not self.objects:
            self.objects = ObjectList()
        self.objects.append(object)

    def get_all_objects(self, include_actions = False):
        """Return a list of all Objects in the Bundle.

        Covers the top-level Objects and the Objects in Object Collections,
        each followed by its related objects. With ``include_actions`` the
        Objects associated with every Action (and their related objects)
        are added too. The Malware Instance Object Attributes Object, when
        set, is appended last.
        """
        all_objects = []
        if self.objects:
            for obj in self.objects:
                self._append_with_related(all_objects, obj)
        if self.collections and self.collections.object_collections:
            for collection in self.collections.object_collections:
                # Tolerate a collection whose Object list was never set.
                for obj in collection.object_list or []:
                    self._append_with_related(all_objects, obj)
        # Include Objects in Actions, if include_actions flag is specified
        if include_actions:
            for action in self.get_all_actions():
                for associated_object in action.associated_objects or []:
                    self._append_with_related(all_objects, associated_object)
        # Add the Object corresponding to the Malware Instance Object Attributes, if specified
        if self.malware_instance_object_attributes:
            all_objects.append(self.malware_instance_object_attributes)
        return all_objects

    def get_all_multiple_referenced_objects(self):
        """Return the Objects in the Bundle that other Objects reference.

        For every Object returned by ``get_all_objects()`` that carries an
        ``idref``, the referenced entity is looked up with
        ``get_object_by_id``; resolved entities are returned in encounter
        order, one entry per reference.

        NOTE(review): the method name suggests "referenced more than once",
        but no reference count is applied here - confirm intended semantics.
        """
        referenced = []
        for obj in self.get_all_objects():
            if not obj.idref:
                continue
            # Resolve each idref once instead of once to test and once to keep.
            target = self.get_object_by_id(obj.idref)
            if target:
                referenced.append(target)
        return referenced

    def get_all_non_reference_objects(self):
        """Return a list of all Objects in the Bundle that are not references (i.e. all of the actual Objects in the Bundle)."""
        return [x for x in self.get_all_objects(True) if x.id_ and not x.idref]

    def get_object_by_id(self, id, extra_objects = None, ignore_actions = False):
        """Find and return the Entity (Action, Object, etc.) with the specified ID.

        Args:
            id: ID to look for.
            extra_objects: Optional additional Objects searched last.
            ignore_actions: When true, Actions and their associated Objects
                are not searched.

        Returns:
            The first matching entity, or ``None`` if nothing matches.
        """
        if not ignore_actions:
            actions = list(self.actions) if self.actions else []
            if self.collections and self.collections.action_collections:
                for collection in self.collections.action_collections:
                    actions.extend(collection.action_list or [])
            for action in actions:
                if action.id_ == id:
                    return action
                for associated_obj in action.associated_objects or []:
                    if associated_obj.id_ == id:
                        return associated_obj
        if self.objects:
            for obj in self.objects:
                if obj.id_ == id:
                    return obj
        if self.collections and self.collections.object_collections:
            for collection in self.collections.object_collections:
                for obj in collection.object_list or []:
                    if obj.id_ == id:
                        return obj
        # Test the extra_objects Array
        for obj in extra_objects or []:
            if obj.id_ == id:
                return obj
        return None

    # ------------------------------------------------------------------
    # Behaviors
    # ------------------------------------------------------------------

    def add_named_behavior_collection(self, collection_name, collection_id = None):
        """Add a new named Behavior Collection to the Collections entity in the Bundle.

        The Collections entity is created if needed; no collection is added
        when ``collection_name`` is None.
        """
        if not self.collections:
            self.collections = Collections()
        if collection_name is not None:
            self.collections.add_named_behavior_collection(collection_name, collection_id)

    def add_behavior(self, behavior, behavior_collection_name = None):
        """Add a Behavior to an existing named Behavior Collection in the Collections entity.
        If it does not exist, add it to the top-level Behaviors entity."""
        if behavior_collection_name is not None:
            collection = self._find_named_collection("behavior_collections", behavior_collection_name)
            if collection is not None:
                collection.add_behavior(behavior)
                return
        # No name given, or the named collection is missing: fall back to
        # the top-level list so the Behavior is never silently dropped.
        if not self.behaviors:
            self.behaviors = BehaviorList()
        self.behaviors.append(behavior)

    # ------------------------------------------------------------------
    # Candidate Indicators
    # ------------------------------------------------------------------

    def add_named_candidate_indicator_collection(self, collection_name, collection_id = None):
        """Add a new named Candidate Indicator Collection to the Collections entity in the Bundle.

        The Collections entity is created if needed; no collection is added
        when ``collection_name`` is None.
        """
        if not self.collections:
            self.collections = Collections()
        if collection_name is not None:
            self.collections.add_named_candidate_indicator_collection(collection_name, collection_id)

    def add_candidate_indicator(self, candidate_indicator, candidate_indicator_collection_name = None):
        """Add a Candidate Indicator to an existing named Candidate Indicator Collection in the Collections entity.
        If it does not exist, add it to the top-level Candidate Indicators entity."""
        if candidate_indicator_collection_name is not None:
            collection = self._find_named_collection("candidate_indicator_collections", candidate_indicator_collection_name)
            if collection is not None:
                collection.add_candidate_indicator(candidate_indicator)
                return
        # No name given, or the named collection is missing: fall back to
        # the top-level list so the indicator is never silently dropped.
        if not self.candidate_indicators:
            self.candidate_indicators = CandidateIndicatorList()
        self.candidate_indicators.append(candidate_indicator)

    # ------------------------------------------------------------------
    # Whole-Bundle operations
    # ------------------------------------------------------------------

    def deduplicate(self):
        """Deduplicate all Objects in the Bundle. Add duplicate Objects to new "Deduplicated Objects" Object Collection,
        and replace duplicate entries with references to corresponding Object."""
        BundleDeduplicator.deduplicate(self)

    def get_action_objects(self, action_name_list):
        """Get all Objects corresponding to one or more types of Actions, specified via a list of Action names.

        Returns:
            dict mapping each requested Action name found in the Bundle to
            the flat list of Objects associated with Actions of that name.
        """
        action_objects = {}
        all_actions = self.get_all_actions(bin=True)
        for action_name in action_name_list:
            if action_name not in all_actions:
                continue
            associated_objects = []
            for action in all_actions[action_name]:
                # Skip Actions without associated objects; iterating the
                # unset attribute would raise.
                if action.associated_objects:
                    associated_objects.extend(action.associated_objects)
            action_objects[action_name] = associated_objects
        return action_objects

    def get_object_history(self):
        """Build and return the Object history for the Bundle."""
        return ObjectHistory.build(self)

    def normalize_objects(self):
        """Normalize all Objects in the Bundle, using the CybOX normalize module."""
        for obj in self.get_all_objects(include_actions = True):
            if obj.properties:
                normalize_object_properties(obj.properties)

    def dereference_objects(self, extra_objects = None):
        """Dereference any Objects in the Bundle by replacing them with the entities they reference.

        Args:
            extra_objects: Optional additional Objects (e.g. from a Malware
                Subject) that are both dereferenced and used as lookup
                targets.
        """
        extra_objects = list(extra_objects) if extra_objects else []
        # Add any extra objects that were passed, e.g. from a Malware Subject
        all_objects = self.get_all_objects(include_actions=True) + extra_objects
        for obj in all_objects:
            if obj.idref and not obj.id_:
                real_object = self.get_object_by_id(obj.idref, extra_objects, ignore_actions = True)
                if real_object:
                    obj.idref = None
                    obj.id_ = real_object.id_
                    obj.properties = real_object.properties

    @classmethod
    def compare(cls, bundle_list, match_on = None, case_sensitive = True):
        """Compare the Bundle to a list of other Bundles, returning a BundleComparator object."""
        return BundleComparator.compare(bundle_list, match_on, case_sensitive)