Source code for ssg.templates

from __future__ import absolute_import
from __future__ import print_function

import os
import glob

from collections import namedtuple

import ssg.utils
from ssg.utils import mkdir_p
import ssg.yaml
import ssg.jinja
import ssg.build_yaml

from ssg.build_cpe import ProductCPEs

TemplatingLang = namedtuple(
    "templating_language_attributes",
    ["name", "file_extension", "template_type", "lang_specific_dir"])

TemplateType = ssg.utils.enum("REMEDIATION", "CHECK")

LANGUAGES = {
    "anaconda": TemplatingLang("anaconda", ".anaconda", TemplateType.REMEDIATION, "anaconda"),
    "ansible": TemplatingLang("ansible", ".yml",        TemplateType.REMEDIATION, "ansible"),
    "bash": TemplatingLang("bash", ".sh",               TemplateType.REMEDIATION, "bash"),
    "blueprint": TemplatingLang("blueprint", ".toml",   TemplateType.REMEDIATION, "blueprint"),
    "cpe-oval": TemplatingLang("cpe-oval", ".xml",      TemplateType.CHECK, "cpe-oval"),
    "ignition": TemplatingLang("ignition", ".yml",      TemplateType.REMEDIATION, "ignition"),
    "kubernetes": TemplatingLang("kubernetes", ".yml",  TemplateType.REMEDIATION, "kubernetes"),
    "oval": TemplatingLang("oval", ".xml",              TemplateType.CHECK,       "oval"),
    "puppet": TemplatingLang("puppet", ".pp",           TemplateType.REMEDIATION, "puppet"),
    "sce-bash": TemplatingLang("sce-bash", ".sh",       TemplateType.CHECK,       "sce")
}

PREPROCESSING_FILE_NAME = "template.py"
TEMPLATE_YAML_FILE_NAME = "template.yml"


[docs] def load_module(module_name, module_path): try: # Python 2.7 from imp import load_source return load_source(module_name, module_path) except ImportError: # https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly import importlib spec = importlib.util.spec_from_file_location(module_name, module_path) if not spec: raise ValueError("Error loading '%s' module" % module_path) module = importlib.util.module_from_spec(spec) if not spec.loader: raise ValueError("Error loading '%s' module" % module_path) spec.loader.exec_module(module) return module
[docs] class Template: def __init__(self, templates_root_directory, name): self.langs = [] self.templates_root_directory = templates_root_directory self.name = name self.template_path = os.path.join(self.templates_root_directory, self.name) self.template_yaml_path = os.path.join(self.template_path, TEMPLATE_YAML_FILE_NAME) self.preprocessing_file_path = os.path.join(self.template_path, PREPROCESSING_FILE_NAME)
[docs] @classmethod def load_template(cls, templates_root_directory, name): maybe_template = cls(templates_root_directory, name) if maybe_template._looks_like_template(): maybe_template._load() return maybe_template return None
def _load(self): if not os.path.exists(self.preprocessing_file_path): self.preprocessing_file_path = None template_yaml = ssg.yaml.open_raw(self.template_yaml_path) for supported_lang in template_yaml["supported_languages"]: if supported_lang not in LANGUAGES.keys(): raise ValueError( "The template {0} declares to support the {1} language," "but this language is not supported by the content.".format( self.name, supported_lang)) lang = LANGUAGES[supported_lang] langfilename = lang.name + ".template" if not os.path.exists(os.path.join(self.template_path, langfilename)): raise ValueError( "The template {0} declares to support the {1} language," "but the implementation file is missing.".format(self.name, lang)) self.langs.append(lang)
[docs] def preprocess(self, parameters, lang): parameters = self._preprocess_with_template_module(parameters, lang) # TODO: Remove this right after the variables in templates are renamed to lowercase parameters = {k.upper(): v for k, v in parameters.items()} return parameters
def _preprocess_with_template_module(self, parameters, lang): if self.preprocessing_file_path is not None: unique_dummy_module_name = "template_" + self.name preprocess_mod = load_module( unique_dummy_module_name, self.preprocessing_file_path) if not hasattr(preprocess_mod, "preprocess"): msg = ( "The '{name}' template's preprocessing file {preprocessing_file} " "doesn't define the 'preprocess' function, which is probably an omission." .format(name=self.name, preprocessing_file=self.preprocessing_file_path) ) raise ValueError(msg) parameters = preprocess_mod.preprocess(parameters.copy(), lang) return parameters def _looks_like_template(self): if not os.path.isdir(self.template_path): return False if os.path.islink(self.template_path): return False template_sources = sorted(glob.glob(os.path.join(self.template_path, "*.template"))) if not os.path.isfile(self.template_yaml_path) and not template_sources: return False return True
[docs] class Builder(object): """ Class for building all templated content for a given product. To generate content from templates, pass the env_yaml, path to the directory with resolved rule YAMLs, path to the directory that contains templates, path to the output directory for checks and a path to the output directory for remediations into the constructor. Then, call the method build() to perform a build. """ def __init__(self, env_yaml, resolved_rules_dir, templates_dir, remediations_dir, checks_dir, platforms_dir, cpe_items_dir): self.env_yaml = env_yaml self.resolved_rules_dir = resolved_rules_dir self.templates_dir = templates_dir self.remediations_dir = remediations_dir self.checks_dir = checks_dir self.platforms_dir = platforms_dir self.cpe_items_dir = cpe_items_dir self.output_dirs = dict() self.templates = dict() self._init_lang_output_dirs() self._init_and_load_templates() self.product_cpes = ProductCPEs() if cpe_items_dir is not None: self.product_cpes.load_cpes_from_directory_tree(cpe_items_dir, self.env_yaml) def _init_and_load_templates(self): for item in sorted(os.listdir(self.templates_dir)): maybe_template = Template.load_template(self.templates_dir, item) if maybe_template is not None: self.templates[item] = maybe_template def _init_lang_output_dirs(self): for lang_name, lang in LANGUAGES.items(): lang_dir = lang.lang_specific_dir if lang.template_type == TemplateType.CHECK: output_dir = self.checks_dir else: output_dir = self.remediations_dir dir_ = os.path.join(output_dir, lang_dir) self.output_dirs[lang_name] = dir_
[docs] def get_resolved_langs_to_generate(self, templatable): """ Given a specific Templatable instance, determine which languages are generated by the combination of the template supported_languages AND the Templatable's template configuration 'backends'. """ template_name = templatable.get_template_name() if template_name not in self.templates.keys(): raise ValueError( "Templatable {0} uses template {1} which does not exist." .format(templatable, template_name)) template_langs = set(self.templates[template_name].langs) rule_langs = set(templatable.extract_configured_backend_lang(LANGUAGES)) return rule_langs.intersection(template_langs)
[docs] def process_template_lang_file(self, template_name, template_vars, lang, local_env_yaml): """ Processes template for a given template name and language and returns rendered content. """ if lang not in self.templates[template_name].langs: raise ValueError("Language {0} is not available for template {1}." .format(lang.name, template_name)) template_file_name = lang.name + ".template" template_file_path = os.path.join(self.templates_dir, template_name, template_file_name) template_parameters = self.templates[template_name].preprocess(template_vars, lang.name) env_yaml = self.env_yaml.copy() env_yaml.update(local_env_yaml) jinja_dict = ssg.utils.merge_dicts(env_yaml, template_parameters) return ssg.jinja.process_file_with_macros(template_file_path, jinja_dict)
[docs] def get_lang_contents_for_templatable(self, templatable, language): """ For the specified Templatable, build and return only the specified language content. """ template_name = templatable.get_template_name() template_vars = templatable.get_template_vars(self.env_yaml) # Checks and remediations are processed with a custom YAML dict local_env_yaml = templatable.get_template_context(self.env_yaml) try: return self.process_template_lang_file(template_name, template_vars, language, local_env_yaml) except Exception as e: raise RuntimeError("Unable to generate {0} template language for Templatable {1}: {2}" .format(language.name, templatable, e))
[docs] def write_lang_contents_for_templatable(self, filled_template, lang, templatable): output_file_name = templatable.id_ + lang.file_extension output_filepath = os.path.join(self.output_dirs[lang.name], output_file_name) with open(output_filepath, "w") as f: f.write(filled_template)
[docs] def build_lang_for_templatable(self, templatable, lang): """ Builds templated content of a given Templatable for a selected language returning the filled template. """ return self.get_lang_contents_for_templatable(templatable, lang)
[docs] def build_cpe(self, cpe): for lang in self.get_resolved_langs_to_generate(cpe): filled_template = self.build_lang_for_templatable(cpe, lang) if lang.template_type == TemplateType.REMEDIATION: cpe.set_conditional(lang.name, filled_template) if lang.template_type == TemplateType.CHECK: self.write_lang_contents_for_templatable(filled_template, lang, cpe) self.product_cpes.add_cpe_item(cpe) cpe_path = os.path.join(self.cpe_items_dir, cpe.id_+".yml") cpe.dump_yaml(cpe_path)
[docs] def build_platform(self, platform): """ Builds templated content of a given Platform (all CPEs/Symbols) for all available languages, writing the output to the correct build directories and updating the platform it self. """ langs_affecting_this_platform = set() for fact_ref in platform.test.get_symbols(): cpe = self.product_cpes.get_cpe_for_fact_ref(fact_ref) if cpe.is_templated(): self.build_cpe(cpe) langs_affecting_this_platform.update( self.get_resolved_langs_to_generate(cpe)) for lang in langs_affecting_this_platform: if lang.template_type == TemplateType.REMEDIATION: platform.update_conditional_from_cpe_items(lang.name, self.product_cpes) platform_path = os.path.join(self.platforms_dir, platform.id_+".yml") platform.dump_yaml(platform_path)
[docs] def build_rule(self, rule): """ Builds templated content of a given Rule for all available languages, writing the output to the correct build directories. """ for lang in self.get_resolved_langs_to_generate(rule): if lang.name != "sce-bash": filled_template = self.build_lang_for_templatable(rule, lang) self.write_lang_contents_for_templatable(filled_template, lang, rule)
[docs] def build_extra_ovals(self): declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml") declaration = ssg.yaml.open_raw(declaration_path) for oval_def_id, template in declaration.items(): # Since OVAL definition ID in shorthand format is always the same # as rule ID, we can use it instead of the rule ID even if no rule # with that ID exists rule = ssg.build_yaml.Rule.get_instance_from_full_dict({ "id_": oval_def_id, "title": oval_def_id, "template": template, }) filled_template = self.build_lang_for_templatable(rule, LANGUAGES["oval"]) self.write_lang_contents_for_templatable(filled_template, LANGUAGES["oval"], rule)
[docs] def build_all_platforms(self): for platform_file in sorted(os.listdir(self.platforms_dir)): platform_path = os.path.join(self.platforms_dir, platform_file) platform = ssg.build_yaml.Platform.from_yaml(platform_path, self.env_yaml, self.product_cpes) self.build_platform(platform)
[docs] def build_all_rules(self): for rule_file in sorted(os.listdir(self.resolved_rules_dir)): rule_path = os.path.join(self.resolved_rules_dir, rule_file) try: rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml, self.product_cpes) except ssg.build_yaml.DocumentationNotComplete: # Happens on non-debug build when a rule is "documentation-incomplete" continue if rule.is_templated(): self.build_rule(rule)
[docs] def build(self): """ Builds all templated content for all languages, writing the output to the correct build directories. """ for dir_ in self.output_dirs.values(): mkdir_p(dir_) self.build_extra_ovals() self.build_all_rules() self.build_all_platforms()