Source code for ssg.rule_dir_stats

"""
This module contains common code shared by utils/rule_dir_stats.py and
utils/rule_dir_diff.py. This code includes functions for walking the output
of the utils/rule_dir_json.py script, and filtering functions used in both
scripts.
"""

from __future__ import absolute_import
from __future__ import print_function

import os
from collections import defaultdict

from .build_remediations import REMEDIATION_TO_EXT_MAP as REMEDIATION_MAP
from .utils import subset_dict


[docs] def get_affected_products(rule_obj): """ From a rule_obj, return the set of affected products from rule.yml """ return set(rule_obj['products'])
[docs] def get_all_affected_products(args, rule_obj): """ From a rule_obj, return the set of affected products from rule.yml, and all fixes and checks. If args.strict is set, this function is equivalent to get_affected_products. Otherwise, it includes ovals and fix content based on the values of args.fixes_only and args.ovals_only. """ affected_products = get_affected_products(rule_obj) if args.strict: return affected_products if not args.fixes_only: for product in rule_obj['oval_products']: affected_products.add(product) if not args.ovals_only: for product in rule_obj['remediation_products']: affected_products.add(product) return affected_products
def _walk_rule(args, rule_obj, oval_func, remediation_func, verbose_output): """ Walks a single rule and updates verbose_output if visited. Returns visited state as a boolean. Internal function for walk_rules and walk_rules_parallel. """ rule_id = rule_obj['id'] affected_products = get_all_affected_products(args, rule_obj) if not affected_products.intersection(args.products): return False if args.query and rule_id not in args.query: return False if not args.fixes_only: result = oval_func(rule_obj) if result: verbose_output[rule_id]['oval'] = result if not args.ovals_only: for r_type in REMEDIATION_MAP: result = remediation_func(rule_obj, r_type) if result: verbose_output[rule_id][r_type] = result return True
[docs] def walk_rules(args, known_rules, oval_func, remediation_func): """ Walk a dictionary of known_rules, returning the number of visited rules and the output at each visited rule, conditionally calling oval_func and remediation_func based on the values of args.fixes_only and args.ovals_only. If the result of these functions are not Falsy, set the appropriate output content. The input rule_obj structure is the value of known_rules[rule_id]. The output structure is a dict as follows:: { rule_id: { "oval": oval_func(args, rule_obj), "ansible": remediation_func(args, "ansible", rule_obj), "anaconda": remediation_func(args, "anaconda", rule_obj), "bash": remediation_func(args, "bash", rule_obj), "puppet": remediation_func(args, "puppet", rule_obj) }, ... } The arguments supplied to oval_func are args and rule_obj. The arguments supplied to remediation_func are args, the remediation type, and rule_obj. """ affected_rules = 0 verbose_output = defaultdict(lambda: defaultdict(lambda: None)) for rule_id in known_rules: rule_obj = known_rules[rule_id] if _walk_rule(args, rule_obj, oval_func, remediation_func, verbose_output): affected_rules += 1 return affected_rules, verbose_output
[docs] def walk_rule_stats(rule_output): """ Walk the output of a rule, generating statistics about affected ovals, remediations, and generating verbose output in a stable order. Returns a tuple of (affected_ovals, affected_remediations, all_affected_remediations, affected_remediations_type, all_output) """ affected_ovals = 0 affected_remediations = 0 all_affected_remediations = 0 affected_remediations_type = defaultdict(lambda: 0) all_output = [] affected_remediation = False all_remedation = True if 'oval' in rule_output: affected_ovals += 1 all_output.append(rule_output['oval']) for r_type in sorted(REMEDIATION_MAP): if r_type in rule_output: affected_remediation = True affected_remediations_type[r_type] += 1 all_output.append(rule_output[r_type]) else: all_remedation = False if affected_remediation: affected_remediations += 1 if all_remedation: all_affected_remediations += 1 return (affected_ovals, affected_remediations, all_affected_remediations, affected_remediations_type, all_output)
[docs] def walk_rules_stats(args, known_rules, oval_func, remediation_func): """ Walk a dictionary of known_rules and generate simple aggregate statistics for all visited rules. The oval_func and remediation_func arguments behave according to walk_rules(). Returned values are visited_rules, affected_ovals, affected_remediation, a dictionary containing all fix types and the quantity of affected fixes, and the ordered output of all functions. An effort is made to provide consistently ordered verbose_output by sorting all visited keys and the keys of ssg.build_remediations.REMEDIATION_MAP. """ affected_rules, verbose_output = walk_rules(args, known_rules, oval_func, remediation_func) affected_ovals = 0 affected_remediations = 0 all_affected_remediations = 0 affected_remediations_type = defaultdict(lambda: 0) all_output = [] for rule_id in sorted(verbose_output): rule_output = verbose_output[rule_id] results = walk_rule_stats(rule_output) affected_ovals += results[0] affected_remediations += results[1] all_affected_remediations += results[2] for key in results[3]: affected_remediations_type[key] += results[3][key] all_output.extend(results[4]) return (affected_rules, affected_ovals, affected_remediations, all_affected_remediations, affected_remediations_type, all_output)
[docs] def walk_rules_parallel(args, left_rules, right_rules, oval_func, remediation_func): """ Walks two sets of known_rules (left_rules and right_rules) with identical keys and returns left_only, right_only, and common_only output from _walk_rule. If the outputted data for a rule when called on left_rules and right_rules is the same, it is added to common_only. Only rules which output different data will have their data added to left_only and right_only respectively. Can assert. """ left_affected_rules = 0 right_affected_rules = 0 common_affected_rules = 0 left_verbose_output = defaultdict(lambda: defaultdict(lambda: None)) right_verbose_output = defaultdict(lambda: defaultdict(lambda: None)) common_verbose_output = defaultdict(lambda: defaultdict(lambda: None)) assert set(left_rules) == set(right_rules) for rule_id in left_rules: left_rule_obj = left_rules[rule_id] right_rule_obj = right_rules[rule_id] if left_rule_obj == right_rule_obj: if _walk_rule(args, left_rule_obj, oval_func, remediation_func, common_verbose_output): common_affected_rules += 1 else: left_temp = defaultdict(lambda: defaultdict(lambda: None)) right_temp = defaultdict(lambda: defaultdict(lambda: None)) left_ret = _walk_rule(args, left_rule_obj, oval_func, remediation_func, left_temp) right_ret = _walk_rule(args, right_rule_obj, oval_func, remediation_func, right_temp) if left_ret == right_ret and left_temp == right_temp: common_verbose_output.update(left_temp) if left_ret: common_affected_rules += 1 else: left_verbose_output.update(left_temp) right_verbose_output.update(right_temp) if left_ret: left_affected_rules += 1 if right_ret: right_affected_rules += 1 left_only = (left_affected_rules, left_verbose_output) right_only = (right_affected_rules, right_verbose_output) common_only = (common_affected_rules, common_verbose_output) return left_only, right_only, common_only
[docs] def walk_rules_diff(args, left_rules, right_rules, oval_func, remediation_func): """ Walk a two dictionary of known_rules (left_rules and right_rules) and generate five sets of output: left_only rules output, right_only rules output, shared left output, shared right output, and shared common output, as a five-tuple, where each tuple element is equivalent to walk_rules on the appropriate set of rules. Does not understand renaming of rule_ids as this would depend on disk content to reflect these differences. Unless significantly more data is added to the rule_obj structure (contents of rule.yml, ovals, remediations, etc.), all information besides 'title' is not uniquely identifying or could be easily updated. """ left_rule_ids = set(left_rules) right_rule_ids = set(right_rules) left_only_rule_ids = left_rule_ids.difference(right_rule_ids) right_only_rule_ids = right_rule_ids.difference(left_rule_ids) common_rule_ids = left_rule_ids.intersection(right_rule_ids) left_restricted = subset_dict(left_rules, left_only_rule_ids) left_common = subset_dict(left_rules, common_rule_ids) right_restricted = subset_dict(right_rules, right_only_rule_ids) right_common = subset_dict(right_rules, common_rule_ids) left_only_data = walk_rules(args, left_restricted, oval_func, remediation_func) right_only_data = walk_rules(args, right_restricted, oval_func, remediation_func) l_c_d, r_c_d, c_d = walk_rules_parallel(args, left_common, right_common, oval_func, remediation_func) left_changed_data = l_c_d right_changed_data = r_c_d common_data = c_d return (left_only_data, right_only_data, left_changed_data, right_changed_data, common_data)
[docs] def walk_rules_diff_stats(results): """ Takes the results of walk_rules_diff (results) and generates five sets of output statistics: left_only rules output, right_only rules output, shared left output, shared right output, and shared common output, as a five-tuple, where each tuple element is equivalent to walk_rules_stats on the appropriate set of rules. Can assert. """ assert len(results) == 5 output_data = [] for data in results: affected_rules, verbose_output = data affected_ovals = 0 affected_remediations = 0 all_affected_remediations = 0 affected_remediations_type = defaultdict(lambda: 0) all_output = [] for rule_id in sorted(verbose_output): rule_output = verbose_output[rule_id] _results = walk_rule_stats(rule_output) affected_ovals += _results[0] affected_remediations += _results[1] all_affected_remediations += _results[2] for key in _results[3]: affected_remediations_type[key] += _results[3][key] all_output.extend(_results[4]) output_data.append((affected_rules, affected_ovals, affected_remediations, all_affected_remediations, affected_remediations_type, all_output)) assert len(output_data) == 5 return tuple(output_data)
[docs] def filter_rule_ids(all_keys, queries): """ From a set of queries (a comma separated list of queries, where a query is either a rule id or a substring thereof), return the set of matching keys from all_keys. When queries is the literal string "all", return all of the keys. """ if not queries: return set() if queries == 'all': return set(all_keys) # We assume that all_keys is much longer than queries; this allows us to do # len(all_keys) iterations of size len(query_parts) instead of len(query_parts) # queries of size len(all_keys) -- which hopefully should be a faster data access # pattern due to caches but in reality shouldn't matter. Note that we have to iterate # over the keys in all_keys either way, because we wish to check whether query is a # substring of a key, not whether query is a key. # # This does have the side-effect of not having the results be ordered according to # their order in query_parts, so we instead, we intentionally discard order by using # a set. This also guarantees that our results are unique. results = set() query_parts = queries.split(',') for key in all_keys: for query in query_parts: if query in key: results.add(key) return results
[docs] def missing_oval(rule_obj): """ For a rule object, check if it is missing an oval. """ rule_id = rule_obj['id'] check = len(rule_obj['ovals']) > 0 if not check: return "\trule_id:%s is missing all OVALs" % rule_id
[docs] def missing_remediation(rule_obj, r_type): """ For a rule object, check if it is missing a remediation of type r_type. """ rule_id = rule_obj['id'] check = (r_type in rule_obj['remediations'] and len(rule_obj['remediations'][r_type]) > 0) if not check: return "\trule_id:%s is missing %s remediations" % (rule_id, r_type)
[docs] def two_plus_oval(rule_obj): """ For a rule object, check if it has two or more OVALs. """ rule_id = rule_obj['id'] check = len(rule_obj['ovals']) >= 2 if check: return "\trule_id:%s has two or more OVALs: %s" % (rule_id, ','.join(rule_obj['ovals']))
[docs] def two_plus_remediation(rule_obj, r_type): """ For a rule object, check if it has two or more remediations of type r_type. """ rule_id = rule_obj['id'] check = (r_type in rule_obj['remediations'] and len(rule_obj['remediations'][r_type]) >= 2) if check: return "\trule_id:%s has two or more %s remediations: %s" % \ (rule_id, r_type, ','.join(rule_obj['remediations'][r_type]))
[docs] def product_names_oval(rule_obj): """ For a rule_obj, check the scope of the platforms versus the product name of the OVAL objects. """ rule_id = rule_obj['id'] for oval_name in rule_obj['ovals']: if oval_name == "shared.xml": continue oval_product, _ = os.path.splitext(oval_name) for product in rule_obj['ovals'][oval_name]['products']: if product != oval_product: return "\trule_id:%s has a different product and OVALs names: %s is not %s" % \ (rule_id, product, oval_product)
[docs] def product_names_remediation(rule_obj, r_type): """ For a rule_obj, check the scope of the platforms versus the product name of the remediations of type r_type. """ rule_id = rule_obj['id'] for r_name in rule_obj['remediations'][r_type]: r_product, _ = os.path.splitext(r_name) if r_product == "shared": continue for product in rule_obj['remediations'][r_type][r_name]['products']: if product != r_product: return "\trule_id:%s has a different product and %s remediation names: %s is not %s" % \ (rule_id, r_type, product, r_product)