Source code for ssg.xml

from __future__ import absolute_import
from __future__ import print_function
import collections

import platform
import re

from .constants import (
    xml_version, oval_header, timestamp, PREFIX_TO_NS, XCCDF11_NS, XCCDF12_NS)
from .constants import (
    datastream_namespace,
    oval_namespace,
    stig_ns,
    cat_namespace,
    xlink_namespace,
    ocil_namespace,
    cpe_language_namespace,
)


try:
    from xml.etree import cElementTree as ElementTree
except ImportError:
    from xml.etree import ElementTree as ElementTree


def oval_generated_header(product_name, schema_version, ssg_version):
    """
    Build the leading part of a generated OVAL document.

    The result is ``xml_version`` followed by ``oval_header`` and a
    ``<generator>`` element filled in with the product name, the ssg and
    Python versions, the OVAL schema version and the module-level
    ``timestamp``.
    """
    generator_template = (
        " <generator>"
        " <oval:product_name>%s from SCAP Security Guide</oval:product_name>"
        " <oval:product_version>ssg: %s, python: %s</oval:product_version>"
        " <oval:schema_version>%s</oval:schema_version>"
        " <oval:timestamp>%s</oval:timestamp>"
        " </generator>"
    )
    generator = generator_template % (
        product_name,
        ssg_version,
        platform.python_version(),
        schema_version,
        timestamp,
    )
    return xml_version + oval_header + generator
def register_namespaces(ns=None):
    """
    Register namespace prefixes with ElementTree.

    ``ns`` is a prefix -> URI dictionary; when it is not given,
    ``PREFIX_TO_NS`` is used. Registration is best effort: any exception
    raised along the way is ignored.
    """
    try:
        prefix_to_uri = PREFIX_TO_NS if ns is None else ns
        for prefix, uri in prefix_to_uri.items():
            ElementTree.register_namespace(prefix, uri)
    except Exception:
        # Probably an old version of Python
        # Doesn't matter, as this is non-essential.
        return
def get_namespaces_from(file):
    """
    Return dictionary of namespaces (prefix -> URI) declared in file.

    ``file`` is passed straight to ``ElementTree.iterparse``. An empty
    dictionary is returned when parsing raises any ``Exception``; a partial
    result is never returned.

    Exceptions that are not ``Exception`` subclasses (for example
    ``KeyboardInterrupt``) propagate to the caller.
    """
    try:
        return {
            prefix: uri
            for _, (prefix, uri) in ElementTree.iterparse(file, events=["start-ns"])
        }
    except Exception:
        # Probably an old version of Python
        # Doesn't matter, as this is non-essential.
        return {}
def open_xml(filename):
    """
    Register the default namespace prefixes, then parse ``filename`` and
    return the resulting ElementTree.
    """
    register_namespaces()
    tree = ElementTree.parse(filename)
    return tree
def parse_file(filename):
    """
    Parse ``filename`` with open_xml() and return the root element of the
    resulting tree.
    """
    return open_xml(filename).getroot()
def map_elements_to_their_ids(tree, xpath_expr):
    """
    Given an ElementTree and an XPath expression,
    iterate through matching elements and create 1:1 id->element mapping.

    Raises AssertionError if a matching element doesn't have the ``id``
    attribute.

    Returns mapping as a dictionary
    """
    aggregated = {}
    for element in tree.findall(xpath_expr):
        element_id = element.get("id")
        if element_id is None:
            # Raised explicitly (rather than via an ``assert`` statement) so
            # the check still runs when Python is started with -O.
            raise AssertionError(
                "Element '%s' matched by '%s' has no 'id' attribute"
                % (element.tag, xpath_expr))
        aggregated[element_id] = element
    return aggregated
SSG_XHTML_TAGS = [
    'table', 'tr', 'th', 'td',
    'ul', 'li', 'ol',
    'p', 'code', 'strong', 'b', 'em', 'i', 'pre', 'br', 'hr', 'small',
]


def add_xhtml_namespace(data):
    """
    Given a xml blob, return it with the ``xhtml:`` prefix added to the tags
    listed in SSG_XHTML_TAGS. ``<tt>`` tags are turned into ``<code>`` first.
    """
    # Callables are used as replacements (instead of template strings) as a
    # workaround for https://bugs.python.org/issue1519638: only the
    # substituted part has to deal with groups that did not participate in
    # the match, the matching regexes stay free of workarounds.
    def _tt_to_code(match):
        opening_slash, closing_slash = match.groups('')
        return '<' + opening_slash + 'code' + closing_slash + '>'

    def _add_prefix(match):
        opening_slash, tag_and_rest, closing_slash = match.groups('')
        return '<' + opening_slash + 'xhtml:' + tag_and_rest + closing_slash + '>'

    # Transform <tt> in <code>
    without_tt = re.sub(r'<(\/)?tt(\/)?>', _tt_to_code, data)

    # Adds xhtml prefix to elements: <tag>, </tag>, <tag/>
    # NOTE(review): the pattern only requires the tag name to *start* with
    # one of the listed names, so e.g. <platform> is prefixed as well.
    xhtml_tag_pattern = r'<(\/)?((?:%s).*?)(\/)?>' % "|".join(SSG_XHTML_TAGS)
    return re.sub(xhtml_tag_pattern, _add_prefix, without_tt)
def determine_xccdf_tree_namespace(tree):
    """
    Return the XCCDF namespace (XCCDF11_NS or XCCDF12_NS) of the Benchmark
    that is the root element of ``tree``.

    Raises ValueError when the root element is not a Benchmark in either of
    those namespaces.
    """
    root_tag = tree.getroot().tag
    for candidate_ns in (XCCDF11_NS, XCCDF12_NS):
        if root_tag == "{%s}Benchmark" % candidate_ns:
            return candidate_ns
    raise ValueError("Unknown root element '%s'" % root_tag)
def get_element_tag_without_ns(xml_tag):
    """
    Return the local name of an ElementTree tag, i.e. ``xml_tag`` with its
    leading ``{namespace}`` part removed.

    A tag that carries no ``{namespace}`` prefix is returned unchanged.
    """
    match = re.search(r'^{.*}(.*)', xml_tag)
    if match is None:
        # Element without a namespace: the tag already is the local name.
        return xml_tag
    return match.group(1)
def get_element_namespace(self):
    """
    Return the namespace URI found between the braces at the start of
    ``self.root.tag``.

    Although the parameter is called ``self`` this is a module-level
    function; it works on any object that has a ``root`` attribute with a
    ``tag``.
    """
    qualified_tag = self.root.tag
    namespace_match = re.search(r'^{(.*)}.*', qualified_tag)
    return namespace_match.group(1)
class XMLElement(object):
    '''
    Thin wrapper around a single element read from an XML file.

    The wrapped element is kept in ``root``; ``content_xccdf_ns`` holds the
    ``ns`` key ("xccdf-1.1" or "xccdf-1.2") used to build XCCDF queries.
    '''

    # Prefix -> namespace URI mapping handed to find()/findall()/iterfind().
    ns = {
        "ds": datastream_namespace,
        "xccdf-1.1": XCCDF11_NS,
        "xccdf-1.2": XCCDF12_NS,
        "oval": oval_namespace,
        "catalog": cat_namespace,
        "xlink": xlink_namespace,
        "ocil": ocil_namespace,
        "cpe-lang": cpe_language_namespace,
    }

    def __init__(self, root):
        self.root = root
        self._determine_xccdf_version()

    def get_attr(self, attr):
        """
        Return the value of attribute ``attr`` of the wrapped element.
        """
        return self.root.get(attr)

    def get_namespace(self):
        """
        Return the namespace URI taken from the wrapped element's tag.
        """
        qualified_tag = self.root.tag
        return re.search(r'^{(.*)}.*', qualified_tag).group(1)

    def _determine_xccdf_version(self):
        # Anything that is not in the XCCDF 1.1 namespace is handled as 1.2.
        is_xccdf_11 = self.get_namespace() == self.ns["xccdf-1.1"]
        self.content_xccdf_ns = "xccdf-1.1" if is_xccdf_11 else "xccdf-1.2"
class XMLContent(XMLElement):
    '''
    Can represent a Data Stream or an XCCDF Benchmark read from an XML file.

    On construction the component references, catalog URIs and check
    components (OVAL, OCIL) found under the root element are collected into
    ``component_refs``, ``uris`` and ``components``.
    '''

    # (engine name, element path searched for in each ds:component)
    check_engines = [("OVAL", "oval:oval_definitions"), ("OCIL", "ocil:ocil")]

    def __init__(self, root):
        super(XMLContent, self).__init__(root)
        self.component_refs = self.get_component_refs()
        self.uris = self.get_uris()
        self.components = self._find_all_component_contents()

    def get_component_refs(self):
        """
        Return a dictionary mapping the xlink:href of every
        ds:checks/ds:component-ref to the id of that component-ref.

        Data streams without a ds:checks element contribute nothing.
        """
        component_refs = dict()
        href_attr = "{%s}href" % (self.ns["xlink"])
        for ds in self.root.findall("ds:data-stream", self.ns):
            checks = ds.find("ds:checks", self.ns)
            if checks is None:
                continue
            for component_ref in checks.findall("ds:component-ref", self.ns):
                component_refs[component_ref.get(href_attr)] = component_ref.get("id")
        return component_refs

    def get_uris(self):
        """
        Return a dictionary mapping the ``uri`` attribute of catalog entries
        to their ``name`` attribute.

        For each data stream only the first catalog found below
        ds:checklists is read. Data streams without ds:checklists or
        without a catalog contribute nothing.
        """
        uris = dict()
        for ds in self.root.findall("ds:data-stream", self.ns):
            checklists = ds.find("ds:checklists", self.ns)
            if checklists is None:
                continue
            catalog = checklists.find(".//catalog:catalog", self.ns)
            if catalog is None:
                continue
            for uri in catalog.findall("catalog:uri", self.ns):
                uris[uri.get("uri")] = uri.get("name")
        return uris

    def is_benchmark(self):
        """
        Return True when the root element is an XCCDF 1.2 or 1.1 Benchmark,
        False otherwise.

        For an XCCDF 1.1 Benchmark ``content_xccdf_ns`` is set to
        "xccdf-1.1" as a side effect.
        """
        if self.root.tag == "{%s}Benchmark" % (self.ns["xccdf-1.2"]):
            return True
        if self.root.tag == "{%s}Benchmark" % (self.ns["xccdf-1.1"]):
            self.content_xccdf_ns = "xccdf-1.1"
            return True
        return False

    def get_benchmarks(self):
        """
        Yield an XMLBenchmark for every Benchmark found in the ds:component
        children of the root, or for the root itself when there are no
        ds:component children and the root is a Benchmark.
        """
        ds_components = self.root.findall("ds:component", self.ns)
        if not ds_components:
            # The content is not a DS, maybe it is just an XCCDF Benchmark
            if self.is_benchmark():
                yield XMLBenchmark(self.root)
        for component in ds_components:
            for benchmark in component.findall("%s:Benchmark" % self.content_xccdf_ns, self.ns):
                yield XMLBenchmark(benchmark)

    def find_benchmark(self, id_):
        """
        Return the XMLBenchmark whose id is ``id_``, or None if no
        ds:component contains it.

        When there are no ds:component children and the root is itself a
        Benchmark, the root is returned and ``id_`` is not checked.
        """
        ds_components = self.root.findall("ds:component", self.ns)
        if not ds_components:
            # The content is not a DS, maybe it is just an XCCDF Benchmark
            if self.is_benchmark():
                return XMLBenchmark(self.root)
        for component in ds_components:
            benchmark = component.find("%s:Benchmark[@id='%s']"
                                       % (self.content_xccdf_ns, id_), self.ns)
            if benchmark is not None:
                return XMLBenchmark(benchmark)
        return None

    def _find_all_component_contents(self):
        # Builds {engine name: {catalog name: XMLComponent}} for every
        # ds:component that holds OVAL or OCIL content. Components that
        # cannot be resolved through component_refs and uris are skipped.
        component_doc_dict = collections.defaultdict(dict)
        for component in self.root.findall("ds:component", self.ns):
            for check_id, check_tag in self.check_engines:
                def_doc = component.find(check_tag, self.ns)
                if def_doc is None:
                    continue
                comp_id = component.get("id")
                comp_href = "#" + comp_id
                try:
                    filename = self.uris["#" + self.component_refs[comp_href]]
                except KeyError:
                    continue
                xml_component = XMLComponent(def_doc)
                component_doc_dict[check_id][filename] = xml_component
        return component_doc_dict
class XMLBenchmark(XMLElement):
    '''
    Represents an XCCDF Benchmark read from an XML file.
    '''

    def __init__(self, root):
        super(XMLBenchmark, self).__init__(root)

    def find_rules(self, rule_id):
        """
        Return a list of XMLRule objects found anywhere below the Benchmark.

        With a truthy ``rule_id`` only Rules with that id are returned and
        ValueError is raised when there is none; otherwise all Rules are
        returned.
        """
        if rule_id:
            rules = [XMLRule(r) for r in self.root.iterfind(
                ".//%s:Rule[@id='%s']" % (self.content_xccdf_ns, rule_id), self.ns)]
            if not rules:
                raise ValueError("Can't find rule %s" % (rule_id))
        else:
            rules = [XMLRule(r) for r in self.root.iterfind(
                ".//%s:Rule" % (self.content_xccdf_ns), self.ns)]
        return rules

    def find_rule(self, rule_id):
        """
        Return the first Rule with id ``rule_id`` as an XMLRule, or None when
        the Benchmark has no such Rule.
        """
        rule = self.root.find(
            ".//%s:Rule[@id='%s']" % (self.content_xccdf_ns, rule_id), self.ns)
        # Compare against None explicitly: an Element without children is
        # falsy, so a plain truth test would report a found Rule as missing.
        return XMLRule(rule) if rule is not None else None

    def find_all_cpe_platforms(self, idref):
        """
        Return a list of XMLCPEPlatform objects for the cpe-lang:platform
        elements whose id equals ``idref`` with all "#" characters removed.
        """
        platform_id = idref.replace("#", "")
        cpes = [XMLCPEPlatform(p) for p in self.root.iterfind(
            ".//cpe-lang:platform[@id='{0}']".format(platform_id), self.ns)]
        return cpes
class XMLRule(XMLElement):
    '''
    Represents an XCCDF Rule read from an XML file.
    '''

    def __init__(self, root):
        super(XMLRule, self).__init__(root)

    def get_check_element(self, check_system_uri):
        """
        Return the check child whose ``system`` is ``check_system_uri``, or
        None.
        """
        return self.root.find(
            "%s:check[@system='%s']" % (self.content_xccdf_ns, check_system_uri), self.ns)

    def get_check_content_ref_element(self, check_element):
        """
        Return the check-content-ref child of ``check_element``, or None.
        """
        return check_element.find(
            "%s:check-content-ref" % (self.content_xccdf_ns), self.ns)

    def get_fix_element(self, fix_uri):
        """
        Return the fix child whose ``system`` is ``fix_uri``, or None.
        """
        return self.root.find("%s:fix[@system='%s']" % (self.content_xccdf_ns, fix_uri), self.ns)

    def get_version_element(self):
        """
        Return the version child of the Rule, or None.
        """
        return self.root.find("%s:version" % (self.content_xccdf_ns), self.ns)

    def get_all_platform_elements(self):
        """
        Return a list of all platform elements found anywhere below the Rule.
        """
        return self.root.findall(".//%s:platform" % (self.content_xccdf_ns), self.ns)

    def _get_description_text(self, el):
        # Recursively collects text, children and tail of ``el``.
        desc_text = el.text if el.text else ""
        # If a 'sub' element is found, lets replace it with the id of the variable it references
        if get_element_tag_without_ns(el.tag) == "sub":
            desc_text += "'%s'" % el.attrib['idref']
        for desc_el in el:
            desc_text += self._get_description_text(desc_el)
        desc_text += el.tail if el.tail else ""
        return desc_text

    def get_element_text(self, el):
        """
        Return the text content of ``el``.

        For a description element, 'sub' elements are rendered as the quoted
        id of the variable they reference; for any other element the result
        is the concatenation of ``el.itertext()``.
        """
        el_tag = get_element_tag_without_ns(el.tag)
        if el_tag == "description":
            temp_text = self._get_description_text(el)
        else:
            temp_text = "".join(el.itertext())
        return temp_text

    def join_text_elements(self):
        """
        This function collects the text of almost all subelements.
        Similar to what itertext() would do, except that this function skips some elements that
        are not relevant for comparison.

        This function also injects a line for each element whose text was collected, to
        facilitate tracking of where in the rule the text came from.
        """
        text = ""
        for el in self.root:
            el_tag = get_element_tag_without_ns(el.tag)
            if el_tag == "fix":
                # We ignore the fix element because it has its own dedicated differ
                continue
            if el_tag == "reference" and el.get("href") == stig_ns:
                # We ignore references to DISA Benchmark Rules,
                # they have a format of SV-\d+r\d+_rule
                # and can change for non-text related changes
                continue
            el_text = self.get_element_text(el).strip()
            if el_text:
                text += "\n[%s]:\n" % el_tag
                text += el_text + "\n"

        return text
class XMLComponent(XMLElement):
    '''
    Represents the element of the Data stream component that has relevant content.

    This makes it easier to access contents pertaining to a SCAP component.
    The ``find_*`` methods return None when the requested element (or the
    container it lives in) is not present.
    '''

    def __init__(self, root):
        super(XMLComponent, self).__init__(root)

    def _find_in_container(self, container_path, element_path):
        # Looks up ``element_path`` inside the ``container_path`` child of
        # the root; returns None when either of them is missing.
        container = self.root.find(container_path, self.ns)
        if container is None:
            return None
        return container.find(element_path, self.ns)

    def find_oval_definition(self, def_id):
        """
        Return the OVAL definition with id ``def_id`` as an
        XMLOvalDefinition, or None if it does not exist.
        """
        definition = self._find_in_container(
            "oval:definitions", "oval:definition[@id='%s']" % (def_id))
        if definition is None:
            return None
        return XMLOvalDefinition(definition)

    def find_ocil_questionnaire(self, def_id):
        """
        Return the OCIL questionnaire with id ``def_id`` as an
        XMLOcilQuestionnaire, or None if it does not exist.
        """
        questionnaire = self._find_in_container(
            "ocil:questionnaires", "ocil:questionnaire[@id='%s']" % def_id)
        if questionnaire is None:
            return None
        return XMLOcilQuestionnaire(questionnaire)

    def find_ocil_test_action(self, test_action_ref):
        """
        Return the OCIL boolean_question_test_action with id
        ``test_action_ref`` as an XMLOcilTestAction, or None if it does not
        exist.
        """
        test_action = self._find_in_container(
            "ocil:test_actions",
            "ocil:boolean_question_test_action[@id='%s']" % test_action_ref)
        if test_action is None:
            return None
        return XMLOcilTestAction(test_action)

    def find_ocil_boolean_question(self, question_id):
        """
        Return the OCIL boolean_question with id ``question_id`` as an
        XMLOcilQuestion, or None if it does not exist.
        """
        question = self._find_in_container(
            "ocil:questions", "ocil:boolean_question[@id='%s']" % question_id)
        if question is None:
            return None
        return XMLOcilQuestion(question)

    def find_boolean_question(self, ocil_id):
        """
        Return the question text reached from questionnaire ``ocil_id`` by
        following its test action reference and that action's
        ``question_ref``.

        Raises ValueError when the questionnaire, the test action or the
        boolean question does not exist.
        """
        questionnaire = self.find_ocil_questionnaire(ocil_id)
        if questionnaire is None:
            raise ValueError("OCIL questionnaire %s doesn't exist" % ocil_id)
        test_action_ref = questionnaire.get_test_action_ref_element().text
        test_action = self.find_ocil_test_action(test_action_ref)
        if test_action is None:
            raise ValueError(
                "OCIL boolean_question_test_action %s doesn't exist" % (
                    test_action_ref))
        question_id = test_action.get_attr("question_ref")
        question = self.find_ocil_boolean_question(question_id)
        if question is None:
            raise ValueError(
                "OCIL boolean_question %s doesn't exist" % question_id)
        question_text = question.get_question_test_element()
        return question_text.text
class XMLOvalDefinition(XMLComponent):
    '''
    Wraps a single OVAL definition element.
    '''

    def __init__(self, root):
        super(XMLOvalDefinition, self).__init__(root)

    def get_criteria_element(self):
        """
        Return the oval:criteria child of the definition, or None.
        """
        return self.root.find("oval:criteria", self.ns)

    def get_elements(self):
        """
        Return a list of ``(tag, value)`` tuples for the criteria tree.

        The top criteria element and all of its descendants are visited;
        ``value`` is the ``operator`` of a criteria, the ``test_ref`` of a
        criterion or the ``definition_ref`` of an extend_definition. Other
        elements are left out.
        """
        # Local tag name -> attribute reported for that kind of element.
        reported_attribute = {
            "criteria": "operator",
            "criterion": "test_ref",
            "extend_definition": "definition_ref",
        }
        collected = []
        top_criteria = self.get_criteria_element()
        for node in top_criteria.iter():  # iter recurses
            kind = get_element_tag_without_ns(node.tag)
            if kind in reported_attribute:
                collected.append((kind, node.get(reported_attribute[kind])))
        return collected
class XMLOcilQuestionnaire(XMLComponent):
    '''
    Wraps a single OCIL questionnaire element.
    '''

    def __init__(self, root):
        super(XMLOcilQuestionnaire, self).__init__(root)

    def get_test_action_ref_element(self):
        """
        Return the first ocil:actions/ocil:test_action_ref element of the
        questionnaire, or None.
        """
        test_action_ref_path = "ocil:actions/ocil:test_action_ref"
        return self.root.find(test_action_ref_path, self.ns)
class XMLOcilTestAction(XMLComponent):
    '''
    Wraps a single OCIL test action element; adds nothing on top of
    XMLComponent.
    '''

    def __init__(self, root):
        super(XMLOcilTestAction, self).__init__(root)
class XMLOcilQuestion(XMLComponent):
    '''
    Wraps a single OCIL question element.
    '''

    def __init__(self, root):
        super(XMLOcilQuestion, self).__init__(root)

    def get_question_test_element(self):
        """
        Return the ocil:question_text child of the question, or None.
        """
        question_text_path = "ocil:question_text"
        return self.root.find(question_text_path, self.ns)
class XMLCPEPlatform(XMLElement):
    '''
    Wraps a single cpe-lang platform element.
    '''

    def __init__(self, root):
        super(XMLCPEPlatform, self).__init__(root)

    def find_all_check_fact_ref_elements(self):
        """
        Return a list of all cpe-lang:check-fact-ref elements found anywhere
        below the platform element.
        """
        check_fact_ref_path = ".//cpe-lang:check-fact-ref"
        return self.root.findall(check_fact_ref_path, self.ns)