# MAEC Process Tree classes
# Copyright (c) 2018, The MITRE Corporation
# All rights reserved
from mixbox import fields
from mixbox import idgen
from cybox.objects.process_object import Process
import maec
import maec.bindings.maec_bundle as bundle_binding
from . import _namespace
from .action_reference_list import ActionReferenceList
[docs]class ProcessTreeNode(Process):
_binding = bundle_binding
_binding_class = bundle_binding.ProcessTreeNodeType
_namespace = _namespace
_XSI_NS = "maecBundle"
_XSI_TYPE = "ProcessTreeNodeType"
superclass = Process
id_ = fields.TypedField("id")
parent_action_idref = fields.TypedField("parent_action_idref")
ordinal_position = fields.TypedField("ordinal_position")
initiated_actions = fields.TypedField("Initiated_Actions", ActionReferenceList)
spawned_process = fields.TypedField("Spawned_Process", multiple = True)
injected_process = fields.TypedField("Injected_Process", multiple = True)
def __init__(self, id = None, parent_action_idref = None):
super(ProcessTreeNode, self).__init__()
if id:
self.id_ = id
else:
self.id_ = idgen.create_id(prefix="process_tree")
self.parent_action_idref = parent_action_idref
[docs] def add_spawned_process(self, process_node, process_id = None):
"""Add a spawned process to the Process Tree node, either directly or to a
particular process embedded in the node based on its ID."""
if not process_id:
if not self.spawned_process:
self.spawned_process = []
self.spawned_process.append(process_node)
elif process_id:
if str(self.pid) == process_id:
if not self.spawned_process:
self.spawned_process = []
self.spawned_process.append(process_node)
else:
embedded_process = self.find_embedded_process(process_id)
if embedded_process:
if not embedded_process.spawned_process:
embedded_process.spawned_process = []
embedded_process.spawned_process.append(process_node)
[docs] def add_injected_process(self, process_node, process_id = None):
"""Add an injected process to the Process Tree node, either directly or to a
particular process embedded in the node based on its ID."""
if not process_id:
if not self.injected_process:
self.injected_process = []
self.injected_process.append(process_node)
elif process_id:
if str(self.pid) == process_id:
if not self.injected_process:
self.injected_process = []
self.injected_process.append(process_node)
else:
embedded_process = self.find_embedded_process(process_id)
if embedded_process:
if not embedded_process.injected_process:
embedded_process.injected_process = []
embedded_process.injected_process.append(process_node)
[docs] def add_initiated_action(self, action_id):
"""Add an initiated Action to the Process Tree node, based on its ID."""
if not self.initiated_actions:
self.initiated_actions = ActionReferenceList()
self.initiated_actions.append(action_id)
[docs] def find_embedded_process(self, process_id):
"""Find a Process embedded somewhere in the Process Tree node tree, based on its ID."""
embedded_process = None
if self.spawned_process:
for spawned_process in self.spawned_process:
if str(spawned_process.pid) == str(process_id):
embedded_process = spawned_process
else:
embedded_process = spawned_process.find_embedded_process(process_id)
if self.injected_process:
for injected_process in self.injected_process:
if str(injected_process.pid) == str(process_id):
embedded_process = injected_process
else:
embedded_process = injected_process.find_embedded_process(process_id)
return embedded_process
[docs] def set_id(self, id):
"""Set the ID of the Process Tree node."""
self.id_ = id
[docs] def set_parent_action(self, parent_action_id):
"""Set the ID of the parent action of the Process Tree node."""
self.parent_action_idref = parent_action_id
[docs]class ProcessTree(maec.Entity):
_binding = bundle_binding
_binding_class = bundle_binding.ProcessTreeType
_namespace = _namespace
root_process = fields.TypedField("Root_Process", ProcessTreeNode)
def __init__(self, root_process = None):
super(ProcessTree, self).__init__()
self.root_process = root_process
[docs] def set_root_process(self, root_process):
"""Set the Root Process node of the Process Tree entity."""
self.root_process = root_process
# Allow recursive definition of ProcessTreeNodes
ProcessTreeNode.spawned_process.type_ = ProcessTreeNode
ProcessTreeNode.injected_process.type_ = ProcessTreeNode