Source code for utils.rule_dir_json

#!/usr/bin/python3

from __future__ import print_function

import argparse
import os
import sys
from collections import defaultdict

import json

import ssg.build_yaml
import ssg.oval
import ssg.build_remediations
import ssg.products
import ssg.rules
import ssg.yaml


SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
BUILD_OUTPUT = os.path.join(SSG_ROOT, "build", "rule_dirs.json")


[docs] def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT, help="Path to SSG root directory (defaults to %s)" % SSG_ROOT) parser.add_argument("-o", "--output", type=str, action="store", default=BUILD_OUTPUT, help="File to write json output to (defaults to build/rule_dirs.json)") parser.add_argument("-q", "--quiet", action="store_true", help="Hides output from the script, just creates the file.") return parser.parse_args()
[docs] def walk_products(root, all_products): visited_dirs = set() all_rule_dirs = [] product_yamls = {} # There is a one-to-many mapping of guide_dir->product. This means that # we first need to walk all products, gather their guide_dirs, and then # come back and walk the guide_dirs so we have a complete list of products # to evaluate each under. product_to_guide = dict() guide_to_products = defaultdict(set) for product in sorted(all_products): product_dir = os.path.join(root, "products", product) product_yaml_path = os.path.join(product_dir, "product.yml") product_yaml = ssg.products.load_product_yaml(product_yaml_path) product_yaml.read_properties_from_directory(os.path.join(root, "product_properties")) product_yamls[product] = product_yaml guide_dir = os.path.join(product_dir, product_yaml['benchmark_root']) guide_dir = os.path.abspath(guide_dir) product_to_guide[product] = guide_dir guide_to_products[guide_dir].add(product) for product in sorted(all_products): product_dir = os.path.join(root, product) product_yaml = product_yamls[product] guide_dir = product_to_guide[product] potential_products = list(guide_to_products[guide_dir]) if guide_dir not in visited_dirs: for rule_id, rule_dir in collect_rule_ids_and_dirs(guide_dir): all_rule_dirs.append((rule_id, rule_dir, guide_dir, potential_products)) visited_dirs.add(guide_dir) additional_content_directories = product_yaml.get("additional_content_directories", []) if not additional_content_directories: continue for add_content_dirs in additional_content_directories: cur_dir = os.path.abspath(os.path.join(product_dir, add_content_dirs)) if cur_dir not in visited_dirs: for rule_id, rule_dir in collect_rule_ids_and_dirs(cur_dir): all_rule_dirs.append((rule_id, rule_dir, cur_dir, [product])) visited_dirs.add(cur_dir) return all_rule_dirs, product_yamls
[docs] def collect_rule_ids_and_dirs(rules_dir): for rule_dir in sorted(ssg.rules.find_rule_dirs(rules_dir)): yield ssg.rules.get_rule_dir_id(rule_dir), rule_dir
[docs] def handle_rule_yaml(product_list, product_yamls, rule_id, rule_dir, guide_dir): rule_obj = {'id': rule_id, 'dir': rule_dir, 'guide': guide_dir} rule_file = ssg.rules.get_rule_dir_yaml(rule_dir) prod_type = product_list[0] env_yaml = dict() env_yaml.update(product_yamls[prod_type]) rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, env_yaml) rule_products = set() for product in product_list: rule_products.add(product) rule_products = sorted(rule_products) rule_obj['products'] = rule_products rule_obj['title'] = rule_yaml.title rule_obj['identifiers'] = rule_yaml.identifiers return rule_obj
[docs] def handle_ovals(product_list, product_yamls, rule_obj): rule_dir = rule_obj['dir'] rule_ovals = {} oval_products = defaultdict(set) for oval_path in ssg.rules.get_rule_dir_ovals(rule_dir): oval_name = os.path.basename(oval_path) oval_product, _ = os.path.splitext(oval_name) oval_obj = {'name': oval_name, 'product': oval_product} platforms = ssg.oval.applicable_platforms(oval_path) cs_platforms = ','.join(platforms) oval_obj['platforms'] = platforms oval_obj['products'] = set() for product in product_list: if ssg.utils.is_applicable(cs_platforms, product): oval_products[product].add(oval_name) oval_obj['products'].add(product) oval_obj['products'] = sorted(oval_obj['products']) rule_ovals[oval_name] = oval_obj return rule_ovals, oval_products
[docs] def handle_remediations(product_list, product_yamls, rule_obj): rule_dir = rule_obj['dir'] rule_remediations = {} r_products = defaultdict(set) for r_type in ssg.build_remediations.REMEDIATION_TO_EXT_MAP: rule_remediations[r_type] = {} r_paths = ssg.build_remediations.get_rule_dir_remediations(rule_dir, r_type) for r_path in r_paths: r_name = os.path.basename(r_path) r_product, r_ext = os.path.splitext(r_name) r_obj = {'type': r_type, 'name': r_name, 'product': r_product, 'ext': r_ext[1:]} prod_type = product_list[0] if r_product != 'shared' and r_product in product_list: prod_type = r_product product_yaml = product_yamls[prod_type] env_yaml = dict() env_yaml.update(product_yaml) _, config = ssg.build_remediations.parse_from_file_with_jinja( r_path, env_yaml ) platforms = config['platform'] if not platforms: print(config['platform']) r_obj['platforms'] = sorted(map(lambda x: x.strip(), platforms.split(','))) r_obj['products'] = set() for product in product_list: if ssg.utils.is_applicable(platforms, product): r_products[product].add(r_name) r_obj['products'].add(product) r_obj['products'] = sorted(r_obj['products']) rule_remediations[r_type][r_name] = r_obj return rule_remediations, r_products
[docs] def quiet_print(msg, quiet, file): if not quiet: print(msg, file=file)
[docs] def main(): args = parse_args() linux_products, other_products = ssg.products.get_all(args.root) all_products = linux_products.union(other_products) all_rule_dirs, product_yamls = walk_products(args.root, all_products) known_rules = {} for rule_id, rule_dir, guide_dir, given_products in all_rule_dirs: try: rule_obj = handle_rule_yaml(given_products, product_yamls, rule_id, rule_dir, guide_dir) except ssg.yaml.DocumentationNotComplete: # Happens on non-debug build when a rule is "documentation-incomplete" continue rule_obj['ovals'], oval_products = handle_ovals(given_products, product_yamls, rule_obj) rule_obj['remediations'], r_products = handle_remediations(given_products, product_yamls, rule_obj) # Validate oval products for key in oval_products: oval_products[key] = sorted(oval_products[key]) if len(oval_products[key]) > 1: all_ovals = ','.join(oval_products[key]) msg = "Product {0} has multiple ovals in rule {1}: {2}" msg = msg.format(key, rule_id, all_ovals) quiet_print(msg, args.quiet, sys.stderr) rule_obj['oval_products'] = oval_products # Validate remediation products for key in r_products: r_products[key] = sorted(r_products[key]) if len(r_products[key]) > 1: exts = sorted(map(lambda x: os.path.splitext(x)[1], r_products[key])) if len(exts) != len(set(exts)): all_fixes = ','.join(r_products[key]) msg = "Product {0} has multiple remediations of the same type " msg += "in rule {1}: {2}" msg = msg.format(key, rule_id, all_fixes) quiet_print(msg, args.quiet, sys.stderr) rule_obj['remediation_products'] = r_products if rule_id in known_rules: raise ValueError("Two rules with same identifier exist: " + rule_id) known_rules[rule_id] = rule_obj f = open(args.output, 'w') json.dump(known_rules, f) if not f.closed: f.flush() f.close()
if __name__ == "__main__": main()