"""
Common functions for building CPEs
"""
from __future__ import absolute_import
from __future__ import print_function
import os
import sys
import ssg.id_translate
from .constants import oval_namespace
from .constants import PREFIX_TO_NS
from .utils import required_key, apply_formatting_on_dict_values
from .xml import ElementTree as ET
from .boolean_expression import Algebra, Symbol, Function
from .entities.common import XCCDFEntity, Templatable
from .yaml import convert_string_to_bool
from .oval_object_model import load_oval_document, OVALDefinitionReference
from .id_translate import IDTranslator
from .xml import parse_file
class CPEDoesNotExist(Exception):
    """Raised when a CPE is requested by an ID or name that is not defined."""
class ProductCPEs(object):
    """
    Reads from the disk all the yaml CPEs related to a product
    and provides them in a structured way.
    """

    def __init__(self):
        # CPE items indexed by their ID (the ``id_`` attribute).
        self.cpes_by_id = {}
        # The same CPE items indexed by their CPE name.
        self.cpes_by_name = {}
        # Only the items flagged ``is_product_cpe``, indexed by ID.
        self.product_cpes = {}
        # Not populated by any method of this class.
        self.platforms = {}
        # File name of the CPE OVAL document; set by load_product_cpes().
        self.cpe_oval_href = ""
        self.algebra = Algebra(
            symbol_cls=CPEALCheckFactRef, function_cls=CPEALLogicalTest)

    def load_product_cpes(self, env_yaml):
        """
        Load the CPEs listed under the ``cpes`` key of the product definition.

        Also derives ``cpe_oval_href`` from ``env_yaml["product"]``.

        Raises:
            Exception: if ``env_yaml`` has no ``cpes`` key.
        """
        self.cpe_oval_href = "ssg-" + env_yaml["product"] + "-cpe-oval.xml"
        # Guard only the lookup itself: a KeyError raised while the items are
        # being built must not be reported as a missing 'cpes' key.
        try:
            product_cpes_list = env_yaml["cpes"]
        except KeyError:
            raise Exception("Product %s does not define 'cpes'" % (env_yaml["product"]))
        self.load_product_cpes_from_list(product_cpes_list)

    def load_product_cpes_from_list(self, product_cpes_list):
        """
        Register every CPE of ``product_cpes_list`` and mark it as a product CPE.

        ``product_cpes_list`` is a list of ``{cpe_id: cpe_definition}`` dicts.
        """
        # these product CPEs defined in product.yml are defined
        # differently than CPEs in shared/applicability/*.yml
        # therefore we have to place the ID at the place where it is expected
        self._add_cpe_items_from_list(product_cpes_list, mark_as_product_cpe=True)

    def load_content_cpes(self, env_yaml):
        """
        Load content CPEs from the directory named by ``env_yaml["cpes_root"]``.

        A relative ``cpes_root`` is resolved against ``env_yaml["product_dir"]``.
        """
        cpes_root = required_key(env_yaml, "cpes_root")
        if not os.path.isabs(cpes_root):
            cpes_root = os.path.join(env_yaml["product_dir"], cpes_root)
        self.load_cpes_from_directory_tree(cpes_root, env_yaml)

    def load_cpes_from_list(self, cpes_list):
        """
        Register every CPE of ``cpes_list``.

        ``cpes_list`` is a list of ``{cpe_id: cpe_definition}`` dicts.
        """
        self._add_cpe_items_from_list(cpes_list, mark_as_product_cpe=False)

    def _add_cpe_items_from_list(self, cpes_list, mark_as_product_cpe):
        """Build a CPEItem from each ``{cpe_id: definition}`` entry and register it."""
        for cpe_dict_repr in cpes_list:
            for cpe_id, cpe in cpe_dict_repr.items():
                # Work on a shallow copy so the caller's data does not
                # silently gain an "id_" key.
                cpe_definition = dict(cpe)
                cpe_definition["id_"] = cpe_id
                cpe_item = CPEItem.get_instance_from_full_dict(cpe_definition)
                if mark_as_product_cpe:
                    cpe_item.is_product_cpe = True
                self.add_cpe_item(cpe_item)

    def load_cpes_from_directory_tree(self, root_path, env_yaml):
        """
        Load a CPE item from every ``.yml`` file located directly in ``root_path``.

        Entries that are not regular files are ignored; files with another
        extension are reported on stderr and skipped.
        """
        for dir_item in sorted(os.listdir(root_path)):
            dir_item_path = os.path.join(root_path, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.yml':
                sys.stderr.write(
                    "Encountered file '%s' while looking for content CPEs, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            cpe_item = CPEItem.from_yaml(dir_item_path, env_yaml)
            self.add_cpe_item(cpe_item)

    def add_cpe_item(self, cpe_item):
        """Index ``cpe_item`` by ID and by name; product CPEs are tracked separately too."""
        self.cpes_by_id[cpe_item.id_] = cpe_item
        self.cpes_by_name[cpe_item.name] = cpe_item
        if cpe_item.is_product_cpe:
            self.product_cpes[cpe_item.id_] = cpe_item

    def get_cpe(self, cpe_id_or_name):
        """
        Return the CPE item identified by a CPE name or by a CPE ID.

        Values starting with ``cpe:`` are looked up by name, anything else by
        ID. A parametrized ID is reduced to its base name before the lookup.

        Raises:
            CPEDoesNotExist: if no matching CPE item is registered.
        """
        try:
            if CPEItem.is_cpe_name(cpe_id_or_name):
                return self.cpes_by_name[cpe_id_or_name]
            else:
                if CPEALCheckFactRef.cpe_id_is_parametrized(cpe_id_or_name):
                    cpe_id_or_name = CPEALCheckFactRef.get_base_name_of_parametrized_cpe_id(
                        cpe_id_or_name)
                return self.cpes_by_id[cpe_id_or_name]
        except KeyError:
            raise CPEDoesNotExist("CPE %s is not defined" % cpe_id_or_name)

    def get_cpe_for_fact_ref(self, fact_ref):
        """Return the CPE item looked up by ``fact_ref.as_id()``."""
        return self.get_cpe(fact_ref.as_id())

    def get_cpe_name(self, cpe_id):
        """Return the name of the CPE item found by get_cpe()."""
        cpe = self.get_cpe(cpe_id)
        return cpe.name

    def get_product_cpe_names(self):
        """Return the names of all registered product CPEs."""
        return [cpe.name for cpe in self.product_cpes.values()]
class CPEList(object):
    """
    Represents the cpe-list element from the CPE standard.
    """

    prefix = "cpe-dict"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self):
        self.cpe_items = []

    def add(self, cpe_item):
        """Append ``cpe_item`` to the list."""
        self.cpe_items.append(cpe_item)

    @staticmethod
    def _create_cpe_list_xml_skeleton():
        """Create an empty ``cpe-list`` element carrying the schema location attributes."""
        cpe_list = ET.Element("{%s}cpe-list" % CPEList.ns)
        cpe_list.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
        cpe_list.set("xsi:schemaLocation",
                     "http://cpe.mitre.org/dictionary/2.0 "
                     "http://cpe.mitre.org/files/cpe-dictionary_2.1.xsd")
        return cpe_list

    def _add_cpe_items_xml(self, cpe_list, cpe_oval_file, selection_of_cpe_names):
        """
        Append the XML of every selected CPE item to ``cpe_list``, ordered by name.

        Note that ``self.cpe_items`` is sorted in place as a side effect.
        """
        self.cpe_items.sort(key=lambda cpe: cpe.name)
        # Build the lookup set once instead of scanning a list for every item.
        selected_names = frozenset(selection_of_cpe_names)
        for cpe_item in self.cpe_items:
            if cpe_item.name in selected_names:
                cpe_list.append(cpe_item.to_xml_element(cpe_oval_file))

    def to_xml_element(self, cpe_oval_file, selection_of_cpe_names=None):
        """
        Return the ``cpe-list`` element holding the selected CPE items.

        Args:
            cpe_oval_file: passed on to each item's ``to_xml_element``.
            selection_of_cpe_names: names of the CPE items to include;
                ``None`` selects all of them.
        """
        cpe_list = self._create_cpe_list_xml_skeleton()

        if selection_of_cpe_names is None:
            selection_of_cpe_names = [cpe_item.name for cpe_item in self.cpe_items]

        self._add_cpe_items_xml(cpe_list, cpe_oval_file, selection_of_cpe_names)

        # Pretty-print only when the ElementTree implementation offers indent().
        if hasattr(ET, "indent"):
            ET.indent(cpe_list, space=" ", level=0)

        return cpe_list

    def to_file(self, file_name, cpe_oval_file, selection_of_cpe_names=None):
        """Write the ``cpe-list`` XML, encoded as UTF-8, to ``file_name``."""
        root = self.to_xml_element(cpe_oval_file, selection_of_cpe_names)
        tree = ET.ElementTree(root)
        tree.write(file_name, encoding="utf-8")

    def translate_cpe_oval_def_ids(self):
        """Call ``set_cpe_oval_def_id()`` on every CPE item in the list."""
        for cpe_item in self.cpe_items:
            cpe_item.set_cpe_oval_def_id()
class CPEItem(XCCDFEntity, Templatable):
    """
    Represents the cpe-item element from the CPE standard.
    """

    # Attribute names mapped to factories producing their default values.
    KEYS = dict(
        name=lambda: "",
        check_id=lambda: "",
        bash_conditional=lambda: "",
        ansible_conditional=lambda: "",
        is_product_cpe=lambda: False,
        versioned=lambda: False,
        args=lambda: {},
        content_id=lambda: "ssg",
        **XCCDFEntity.KEYS
    )
    KEYS.update(**Templatable.KEYS)

    MANDATORY_KEYS = [
        "name",
    ]

    prefix = "cpe-dict"
    ns = PREFIX_TO_NS[prefix]

    @property
    def cpe_oval_short_def_id(self):
        """The ``check_id`` if it is set, otherwise the item's own ID."""
        return self.check_id or self.id_

    @property
    def cpe_oval_def_id(self):
        """The OVAL definition ID generated from the short ID by an IDTranslator."""
        translator = ssg.id_translate.IDTranslator(self.content_id)
        full_id = translator.generate_id(
            "{" + oval_namespace + "}definition", self.cpe_oval_short_def_id)
        return full_id

    def set_cpe_oval_def_id(self):
        """Overwrite ``check_id`` with the current ``cpe_oval_def_id``."""
        self.check_id = self.cpe_oval_def_id

    def to_xml_element(self, cpe_oval_filename):
        """
        Return the ``cpe-item`` element with its ``title`` and ``check`` children.

        The check references ``cpe_oval_filename`` and contains the short
        OVAL definition ID.
        """
        cpe_item = ET.Element("{%s}cpe-item" % CPEItem.ns)
        cpe_item.set('name', self.name)

        cpe_item_title = ET.SubElement(cpe_item, "{%s}title" % CPEItem.ns)
        cpe_item_title.set('xml:lang', "en-us")
        cpe_item_title.text = self.title

        cpe_item_check = ET.SubElement(cpe_item, "{%s}check" % CPEItem.ns)
        cpe_item_check.set('system', oval_namespace)
        cpe_item_check.set('href', cpe_oval_filename)
        cpe_item_check.text = self.cpe_oval_short_def_id
        return cpe_item

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """
        Load a CPE item from ``yaml_file``.

        Truthy ``is_product_cpe`` and ``versioned`` values are passed through
        ``convert_string_to_bool``.
        """
        cpe_item = super(CPEItem, cls).from_yaml(yaml_file, env_yaml, product_cpes)
        if cpe_item.is_product_cpe:
            cpe_item.is_product_cpe = convert_string_to_bool(cpe_item.is_product_cpe)
        if cpe_item.versioned:
            cpe_item.versioned = convert_string_to_bool(cpe_item.versioned)
        return cpe_item

    def set_template_variables(self, *sources):
        """
        Replace the template variables with the merged content of ``sources``.

        Later sources override earlier ones. Does nothing if the item is
        not templated.
        """
        if self.is_templated():
            self.template["vars"] = {}
            for source in sources:
                self.template["vars"].update(source)

    def create_resolved_cpe_item_for_fact_ref(self, fact_ref):
        """
        Return a new CPEItem specialized for the given parametrized ``fact_ref``.

        The parameters stored in ``self.args`` under ``fact_ref.arg`` are
        combined with ``fact_ref.as_dict()`` and used to format the values of
        this item's dict representation. The new item gets the ID
        ``fact_ref.as_id()``.

        Raises:
            ValueError: if ``fact_ref`` has version specifiers but this item
                is not ``versioned``.
            KeyError: if ``fact_ref.arg`` is not among ``self.args``.
        """
        if fact_ref.has_version_specs():
            if not self.versioned:
                raise ValueError("CPE entity '{0}' does not support version specifiers: "
                                 "{1}".format(self.id_, fact_ref.cpe_name))
        try:
            declared_parameters = self.args[fact_ref.arg]
        except KeyError:
            raise KeyError(
                "The {0} CPE item does not support the argument {1}. "
                "Following arguments are supported: {2}".format(
                    self.id_, fact_ref.arg, list(self.args)))
        # Copy before merging so that data of this particular fact_ref does
        # not leak into the shared definition kept in self.args.
        resolved_parameters = dict(declared_parameters)
        resolved_parameters.update(fact_ref.as_dict())

        cpe_item_as_dict = self.represent_as_dict()
        cpe_item_as_dict["args"] = None
        cpe_item_as_dict["id_"] = fact_ref.as_id()
        new_associated_cpe_item_as_dict = apply_formatting_on_dict_values(
            cpe_item_as_dict, resolved_parameters)
        new_associated_cpe_item = CPEItem.get_instance_from_full_dict(
            new_associated_cpe_item_as_dict)
        new_associated_cpe_item.set_template_variables(resolved_parameters)
        return new_associated_cpe_item

    @staticmethod
    def is_cpe_name(cpe_id_or_name):
        """Return True if the string starts with ``cpe:``."""
        return cpe_id_or_name.startswith("cpe:")

    def set_conditional(self, language, content):
        """
        Store ``content`` as the conditional for ``language``.

        Raises:
            RuntimeError: if ``language`` is neither ``ansible`` nor ``bash``.
        """
        if language == "ansible":
            self.ansible_conditional = content
        elif language == "bash":
            self.bash_conditional = content
        else:
            raise RuntimeError(
                "The language {0} is not supported as conditional for CPE".format(language))
class CPEALLogicalTest(Function):
    """
    Logical operator node of a platform expression, serialized as
    a ``logical-test`` element.
    """

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    def to_xml_element(self):
        """
        Return the ``logical-test`` element including its children.

        Arguments that are neither CPEALLogicalTest nor CPEALCheckFactRef
        instances are not serialized.
        """
        cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
        cpe_test.set('operator', ('OR' if self.is_or() else 'AND'))
        cpe_test.set('negate', ('true' if self.is_not() else 'false'))
        # Logical tests must go first, therefore we separate tests and factrefs
        tests = [t for t in self.args if isinstance(t, CPEALLogicalTest)]
        factrefs = [f for f in self.args if isinstance(f, CPEALCheckFactRef)]
        for obj in tests + factrefs:
            cpe_test.append(obj.to_xml_element())
        return cpe_test

    def enrich_with_cpe_info(self, cpe_products):
        """Propagate ``enrich_with_cpe_info`` to every argument."""
        for arg in self.args:
            arg.enrich_with_cpe_info(cpe_products)

    def _join_child_conditionals(self, child_conds, negation, or_operator, and_operator):
        """Wrap the non-empty child conditionals in parentheses joined by this node's operator."""
        if not child_conds:
            return ""

        cond = ""
        if self.is_not():
            cond += negation
        operator = " "
        if self.is_or():
            operator = or_operator
        elif self.is_and():
            operator = and_operator
        return cond + "( " + operator.join(child_conds) + " )"

    def to_bash_conditional(self):
        """
        Return the Bash conditional of this subtree.

        Children with an empty conditional are left out; the result is an
        empty string if no child contributes anything.
        """
        # Evaluate every child exactly once; calling it both in the filter and
        # in the value would double the work at each level of nesting.
        evaluated = (a.to_bash_conditional() for a in self.args)
        child_bash_conds = [cond for cond in evaluated if cond != '']
        return self._join_child_conditionals(child_bash_conds, "! ", " || ", " && ")

    def to_ansible_conditional(self):
        """
        Return the Ansible conditional of this subtree.

        Children with an empty conditional are left out; the result is an
        empty string if no child contributes anything.
        """
        # Evaluate every child exactly once (see to_bash_conditional).
        evaluated = (a.to_ansible_conditional() for a in self.args)
        child_ansible_conds = [cond for cond in evaluated if cond != '']
        return self._join_child_conditionals(child_ansible_conds, "not ", " or ", " and ")
class CPEALCheckFactRef(Symbol):
    """
    Leaf of a platform expression, serialized as a ``check-fact-ref`` element.

    ``cpe_oval_href`` and ``cpe_oval_def_id`` only exist after
    enrich_with_cpe_info() has been called.
    """

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self, obj):
        super(CPEALCheckFactRef, self).__init__(obj)
        self.bash_conditional = ""
        self.ansible_conditional = ""
        # we do not want to modify original name used for platforms
        self.cpe_name = obj

    def enrich_with_cpe_info(self, cpe_products):
        """
        Copy the OVAL href, conditionals, CPE name and OVAL definition ID
        from the matching CPE item of ``cpe_products``.
        """
        self.cpe_oval_href = cpe_products.cpe_oval_href
        matching_item = cpe_products.get_cpe(self.cpe_name)
        self.bash_conditional = matching_item.bash_conditional
        self.ansible_conditional = matching_item.ansible_conditional
        resolved_name = cpe_products.get_cpe_name(self.cpe_name)
        self.cpe_name = resolved_name
        self.cpe_oval_def_id = matching_item.cpe_oval_def_id

    def to_xml_element(self):
        """Return the ``check-fact-ref`` element pointing at the CPE OVAL definition."""
        fact_ref = ET.Element("{%s}check-fact-ref" % CPEALCheckFactRef.ns)
        attributes = (
            ("system", oval_namespace),
            ("href", self.cpe_oval_href),
            ("id-ref", self.cpe_oval_def_id),
        )
        for attribute, value in attributes:
            fact_ref.set(attribute, value)
        return fact_ref

    def to_bash_conditional(self):
        """Return the stored Bash conditional."""
        return self.bash_conditional

    def to_ansible_conditional(self):
        """Return the stored Ansible conditional."""
        return self.ansible_conditional

    @staticmethod
    def cpe_id_is_parametrized(cpe_id):
        """Return whatever ``Symbol.is_parametrized`` reports for ``cpe_id``."""
        parametrized = Symbol.is_parametrized(cpe_id)
        return parametrized

    @staticmethod
    def get_base_name_of_parametrized_cpe_id(cpe_id):
        """
        If given a parametrized platform name such as package[test],
        it returns the package part only.
        """
        base_name = Symbol.get_base_of_parametrized_name(cpe_id)
        return base_name
def get_linked_cpe_oval_document(unlinked_oval_file_path):
    """
    Load an OVAL document and reduce it to what inventory definitions need.

    The file is parsed, only components referenced by definitions of class
    ``inventory`` are kept, and the document is then translated by an
    ``IDTranslator("ssg")``.

    Returns:
        The document returned by ``translate_oval_document``.
    """
    parsed_tree = parse_file(unlinked_oval_file_path)
    document = load_oval_document(parsed_tree)
    document.product_name = os.path.basename(__file__)

    # Accumulate the references of every inventory definition.
    kept_references = OVALDefinitionReference()
    for definition in document.definitions.values():
        if definition.class_ == "inventory":
            kept_references += document.get_all_references_of_definition(
                definition.id_
            )
    document.keep_referenced_components(kept_references)

    return IDTranslator("ssg").translate_oval_document(document)