import collections
import os
import copy
from glob import glob
import ssg.entities.common
import ssg.yaml
import ssg.utils
[docs]
class InvalidStatus(Exception):
    """Signal that a control declares a status outside the supported set."""
[docs]
class Status:
    """
    Status of a control, together with the catalogue of accepted values.

    The class attributes enumerate every status string that may be used;
    an instance simply wraps one such string in ``self.status``.
    """

    SUPPORTED = "supported"
    PLANNED = "planned"
    PENDING = "pending"
    PARTIAL = "partial"
    NOT_APPLICABLE = "not applicable"
    MANUAL = "manual"
    INHERENTLY_MET = "inherently met"
    DOES_NOT_MEET = "does not meet"
    DOCUMENTATION = "documentation"
    AUTOMATED = "automated"

    def __init__(self, status):
        self.status = status

    @classmethod
    def get_status_list(cls):
        """Return a new list with all valid status strings."""
        return [
            cls.AUTOMATED,
            cls.DOCUMENTATION,
            cls.DOES_NOT_MEET,
            cls.INHERENTLY_MET,
            cls.MANUAL,
            cls.NOT_APPLICABLE,
            cls.PARTIAL,
            cls.PENDING,
            cls.PLANNED,
            cls.SUPPORTED,
        ]

    @classmethod
    def from_control_info(cls, ctrl, status):
        """
        Validate the status given for a control.

        Args:
            ctrl: Identifier of the control; only used in the error message.
            status: The status string to validate, or None.

        Returns:
            ``cls.PENDING`` when ``status`` is None, otherwise ``status``
            itself (a plain string, not a ``Status`` instance).

        Raises:
            InvalidStatus: If ``status`` is not one of the valid statuses.
        """
        if status is None:
            return cls.PENDING

        valid_statuses = cls.get_status_list()
        if status not in valid_statuses:
            raise InvalidStatus(
                "The given status '{given}' in the control '{control}' "
                "was invalid. Please use one of "
                "the following: {valid}".format(given=status,
                                                control=ctrl,
                                                valid=valid_statuses))
        return status

    def __str__(self):
        return self.status

    def __eq__(self, other):
        """Compare by status string against another Status or a plain str."""
        if isinstance(other, Status):
            return self.status == other.status
        elif isinstance(other, str):
            return self.status == other
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so spell it out explicitly.
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable on Python 3.
        # Hash the wrapped string so that equal objects (including the
        # equivalent plain string) share the same hash.
        return hash(self.status)
[docs]
class Control(ssg.entities.common.SelectionHandler, ssg.entities.common.XCCDFEntity):
    """
    A single control of a policy, built from a control definition dictionary.

    The rules listed under the ``rules`` key of the definition are stored
    both verbatim in ``self.rules`` and handed to ``self.selections``.
    """

    # Keys that may appear in a control definition; anything else is
    # rejected by _check_keys().
    KEYS = dict(
        id=str,
        levels=list,
        notes=str,
        title=str,
        description=str,
        rationale=str,
        automated=str,
        status=None,
        mitigation=str,
        artifact_description=str,
        status_justification=str,
        fixtext=str,
        check=str,
        tickets=list,
        original_title=str,
        related_rules=list,
        rules=list,
        controls=list,
    )

    MANDATORY_KEYS = {
        "title",
    }

    def __init__(self):
        super(Control, self).__init__()
        self.id = None
        self.levels = []
        self.notes = ""
        self.title = ""
        self.description = ""
        self.rationale = ""
        self.automated = ""
        self.status = None
        self.mitigation = ""
        self.artifact_description = ""
        self.status_justification = ""
        self.fixtext = ""
        self.check = ""
        self.controls = []
        self.tickets = []
        self.original_title = ""
        self.related_rules = []
        self.rules = []

    def __hash__(self):
        """ Controls are meant to be unique, so using the
        ID should suffice"""
        return hash(self.id)

    @classmethod
    def _check_keys(cls, control_dict):
        """
        Reject control definitions that contain unknown keys.

        Raises:
            ValueError: If a key of ``control_dict`` is neither in ``KEYS``
                nor the special ``rules`` key.
        """
        for key in control_dict.keys():
            # Rules shouldn't be in KEYS that data is in selections
            if key not in cls.KEYS.keys() and key not in ['rules', ]:
                raise ValueError("Key %s is not allowed in a control file." % key)

    @classmethod
    def from_control_dict(cls, control_dict, env_yaml=None, default_level=["default"]):
        """
        Build a Control from its dictionary definition.

        Args:
            control_dict: Mapping with the control definition; must contain
                the ``id`` key and only keys accepted by _check_keys().
            env_yaml: Accepted for interface compatibility; not used here.
            default_level: Levels assigned to the control when the
                definition has no ``levels`` key. The value is copied and
                never mutated, so sharing the default object is harmless.

        Returns:
            The populated Control instance.

        Raises:
            ValueError: If the definition contains a disallowed key or the
                ``automated`` value is not 'yes', 'no' or 'partially'.
            InvalidStatus: If the ``status`` value is not a valid status
                (raised by Status.from_control_info).
        """
        cls._check_keys(control_dict)
        control = cls()
        control.id = ssg.utils.required_key(control_dict, "id")
        control.title = control_dict.get("title")
        control.description = control_dict.get("description")
        control.rationale = control_dict.get("rationale")
        control.status = Status.from_control_info(control.id, control_dict.get("status", None))
        control.automated = control_dict.get("automated", "no")
        control.status_justification = control_dict.get('status_justification')
        control.artifact_description = control_dict.get('artifact_description')
        control.mitigation = control_dict.get('mitigation')
        control.fixtext = control_dict.get('fixtext')
        control.check = control_dict.get('check')
        control.tickets = control_dict.get('tickets')
        control.original_title = control_dict.get('original_title')

        # An "automated" status always implies automated == "yes",
        # regardless of what the definition says.
        if control.status == Status.AUTOMATED:
            control.automated = "yes"
        if control.automated not in ["yes", "no", "partially"]:
            msg = (
                "Invalid value '%s' of automated key in control "
                "%s '%s'. Can be only 'yes', 'no', 'partially'."
                % (control.automated, control.id, control.title))
            raise ValueError(msg)

        if "levels" in control_dict:
            control.levels = control_dict["levels"]
        else:
            # Copy the fallback: storing the default object itself would make
            # every control built without explicit levels share one list, so
            # mutating one control's levels would silently change the others.
            control.levels = copy.copy(default_level)
        control.notes = control_dict.get("notes", "")

        selections = control_dict.get("rules", {})
        control.selections = selections

        control.related_rules = control_dict.get("related_rules", [])
        control.rules = control_dict.get("rules", [])
        return control

    def represent_as_dict(self):
        """
        Return the dictionary representation of the control.

        The parent representation is extended with the current selections
        under ``rules`` and the subcontrol ids under ``controls``.
        """
        data = super(Control, self).represent_as_dict()
        data["rules"] = self.selections
        data["controls"] = self.controls
        return data
[docs]
class Level(ssg.entities.common.XCCDFEntity):
    """A policy level, identified by ``id``, that may inherit from other levels."""

    KEYS = {
        "id": lambda: str,
        "inherits_from": lambda: None,
    }

    def __init__(self):
        self.id = None
        self.inherits_from = None

    @classmethod
    def from_level_dict(cls, level_dict):
        """
        Create a Level from its dictionary definition.

        Args:
            level_dict: Mapping with the mandatory ``id`` key and the
                optional ``inherits_from`` key.

        Returns:
            The populated Level; ``inherits_from`` is None when the key
            is absent.
        """
        new_level = cls()
        new_level.id = ssg.utils.required_key(level_dict, "id")
        new_level.inherits_from = level_dict.get("inherits_from")
        return new_level
[docs]
class Policy(ssg.entities.common.XCCDFEntity):
    """
    A policy: levels and controls loaded from a YAML policy file.

    Controls come from the ``controls`` key of the policy file and, when a
    controls directory exists, additionally from the YAML files inside it.
    """

    def __init__(self, filepath, env_yaml=None):
        self.id = None
        self.env_yaml = env_yaml
        self.filepath = filepath
        # By default the controls directory is the policy file path without
        # its extension; load() overrides it if the file sets controls_dir.
        self.controls_dir = os.path.splitext(filepath)[0]
        self.controls = []
        self.controls_by_id = dict()
        self.levels = []
        self.levels_by_id = dict()
        self.title = ""
        self.source = ""

    def represent_as_dict(self):
        """Return the policy, its controls and its levels as a dictionary."""
        data = dict()
        data["id"] = self.id
        data["title"] = self.title
        data["source"] = self.source
        data["definition_location"] = self.filepath
        data["controls"] = [control.represent_as_dict() for control in self.controls]
        data["levels"] = [level.represent_as_dict() for level in self.levels]
        return data

    @property
    def default_level(self):
        """List holding the id of the first level, or ``["default"]`` if there are no levels."""
        if self.levels:
            return [self.levels[0].id]
        return ["default"]

    def check_all_rules_exist(self, existing_rules):
        """
        Verify that every rule selected by the controls exists.

        Args:
            existing_rules: Iterable of known rule ids.

        Raises:
            ValueError: For the first control selecting unknown rule(s).
        """
        for control in self.controls:
            # difference() accepts any iterable, so callers are not forced
            # to pass a set.
            nonexisting_rules = set(control.selected).difference(existing_rules)
            if nonexisting_rules:
                # Sort so the message is reproducible between runs.
                msg = "Control %s:%s contains nonexisting rule(s) %s" % (
                    self.id, control.id, ", ".join(sorted(nonexisting_rules)))
                raise ValueError(msg)

    @staticmethod
    def _keep_known(rule_ids, known_rules):
        """Return the unique items of rule_ids present in known_rules, in original order."""
        seen = set()
        kept = []
        for rule_id in rule_ids:
            if rule_id in known_rules and rule_id not in seen:
                seen.add(rule_id)
                kept.append(rule_id)
        return kept

    def remove_selections_not_known(self, known_rules):
        """
        Drop selected and unselected rules that are not in ``known_rules``.

        The surviving rules keep their original relative order (duplicates
        are removed), which keeps the result deterministic.

        Args:
            known_rules: Iterable of known rule ids.
        """
        if not isinstance(known_rules, (set, frozenset)):
            known_rules = set(known_rules)
        for control in self.controls:
            control.selected = self._keep_known(control.selected, known_rules)
            control.unselected = self._keep_known(control.unselected, known_rules)

    def _create_control_from_subtree(self, subtree):
        """
        Build a Control from one control definition.

        Raises:
            RuntimeError: If the control cannot be created; the message
                names the policy file and the underlying error.
        """
        try:
            control = Control.from_control_dict(
                subtree, self.env_yaml, default_level=self.default_level)
        except Exception as exc:
            msg = (
                "Unable to parse controls from {filename}: {error}"
                .format(filename=self.filepath, error=str(exc)))
            raise RuntimeError(msg)
        return control

    def _extract_and_record_subcontrols(self, current_control, controls_tree):
        """
        Process the ``controls`` entries nested in a control definition.

        String entries are references: only their name is recorded in
        ``current_control.controls``. Other entries are inline definitions:
        they are parsed, their ids recorded, and ``current_control`` is
        updated with each of them.

        Returns:
            List of the controls created from inline definitions.
        """
        subcontrols = []
        if "controls" not in controls_tree:
            return subcontrols

        for control_def_or_ref in controls_tree["controls"]:
            if isinstance(control_def_or_ref, str):
                control_ref = control_def_or_ref
                current_control.controls.append(control_ref)
                continue
            control_def = control_def_or_ref
            for sc in self._parse_controls_tree([control_def]):
                current_control.controls.append(sc.id)
                current_control.update_with(sc)
                subcontrols.append(sc)
        return subcontrols

    def _parse_controls_tree(self, tree):
        """Return all controls defined in ``tree``; nested subcontrols precede their parent."""
        controls = []
        for control_subtree in tree:
            control = self._create_control_from_subtree(control_subtree)
            subcontrols = self._extract_and_record_subcontrols(control, control_subtree)
            controls.extend(subcontrols)
            controls.append(control)
        return controls

    def save_controls_tree(self, tree):
        """Parse ``tree`` and register every resulting control in the policy."""
        for control in self._parse_controls_tree(tree):
            self.controls.append(control)
            self.controls_by_id[control.id] = control

    def _parse_file_into_control_trees(self, dirname, basename):
        """
        Read the control definitions from one entry of the controls directory.

        Returns:
            The items under the ``controls`` key of a ``.yml`` file, or an
            empty list for other entries whose name starts with a dot.

        Raises:
            RuntimeError: If the entry is neither a ``.yml`` file nor a
                dot-prefixed entry.
        """
        controls_trees = []
        if basename.endswith('.yml'):
            full_path = os.path.join(dirname, basename)
            yaml_contents = ssg.yaml.open_and_expand(full_path, self.env_yaml)
            controls_trees.extend(yaml_contents['controls'])
        elif basename.startswith('.'):
            pass
        else:
            raise RuntimeError("Found non yaml file in %s" % self.controls_dir)
        return controls_trees

    def _load_from_subdirectory(self, yaml_contents):
        """Return the controls of the policy file extended by those found in the controls directory."""
        controls_tree = yaml_contents.get("controls", list())
        # os.listdir() returns entries in arbitrary order; sort them so the
        # order of the loaded controls does not depend on the filesystem.
        for basename in sorted(os.listdir(self.controls_dir)):
            trees = self._parse_file_into_control_trees(self.controls_dir, basename)
            controls_tree.extend(trees)
        return controls_tree

    def load(self):
        """
        Load the policy file and populate id, title, source, levels and controls.

        If the policy file has a ``controls_dir`` key, the controls
        directory is taken relative to the directory of the policy file.
        When the controls directory exists, its files are loaded as well;
        otherwise the ``controls`` key of the policy file is mandatory.
        """
        yaml_contents = ssg.yaml.open_and_expand(self.filepath, self.env_yaml)
        controls_dir = yaml_contents.get("controls_dir")
        if controls_dir:
            self.controls_dir = os.path.join(os.path.dirname(self.filepath), controls_dir)
        self.id = ssg.utils.required_key(yaml_contents, "id")
        self.title = ssg.utils.required_key(yaml_contents, "title")
        self.source = yaml_contents.get("source", "")

        for level_dict in yaml_contents.get("levels", []):
            level = Level.from_level_dict(level_dict)
            self.levels.append(level)
            self.levels_by_id[level.id] = level

        # isdir() is already False for paths that do not exist.
        if os.path.isdir(self.controls_dir):
            controls_tree = self._load_from_subdirectory(yaml_contents)
        else:
            controls_tree = ssg.utils.required_key(yaml_contents, "controls")
        self.save_controls_tree(controls_tree)

    def get_control(self, control_id):
        """
        Return the control with the given id.

        Raises:
            ValueError: If the policy has no such control.
        """
        try:
            c = self.controls_by_id[control_id]
            return c
        except KeyError:
            msg = "%s not found in policy %s" % (
                control_id, self.id
            )
            raise ValueError(msg)

    def get_level(self, level_id):
        """
        Return the level with the given id.

        Raises:
            ValueError: If the policy has no such level.
        """
        try:
            lv = self.levels_by_id[level_id]
            return lv
        except KeyError:
            msg = "Level %s not found in policy %s" % (
                level_id, self.id
            )
            raise ValueError(msg)

    def get_level_with_ancestors_sequence(self, level_id):
        """
        Return the given level followed by the levels it inherits from.

        Each level appears once, at its first position; the ancestors are
        collected recursively in the order given by ``inherits_from``.

        Raises:
            ValueError: If a referenced level does not exist in the policy.
        """
        # use OrderedDict for Python2 compatibility instead of ordered set
        levels = collections.OrderedDict()
        level = self.get_level(level_id)
        levels[level] = ""
        if level.inherits_from:
            for parent_id in level.inherits_from:
                for ancestor in self.get_level_with_ancestors_sequence(parent_id):
                    if ancestor not in levels:
                        levels[ancestor] = ""
        return list(levels.keys())
[docs]
class ControlsManager():
    """
    Load all policies of a directory and give access to their controls.

    Policies are kept in ``self.policies``, keyed by policy id.
    """

    def __init__(self, controls_dir, env_yaml=None, existing_rules=None):
        self.controls_dir = os.path.abspath(controls_dir)
        self.env_yaml = env_yaml
        self.existing_rules = existing_rules
        self.policies = {}

    def load(self):
        """
        Load every ``*.yml`` policy file of the controls directory.

        Files are processed in sorted order. After loading, the selected
        rules are checked against ``existing_rules`` and control references
        are resolved. Nothing happens if the directory does not exist.
        """
        if not os.path.exists(self.controls_dir):
            return
        # glob() already yields paths that include the (absolute) directory,
        # so they can be used as they are.
        for filepath in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
            policy = Policy(filepath, self.env_yaml)
            policy.load()
            self.policies[policy.id] = policy
        self.check_all_rules_exist()
        self.resolve_controls()

    def check_all_rules_exist(self):
        """Check the rules of all policies; skipped when ``existing_rules`` is None."""
        if self.existing_rules is None:
            return
        for policy in self.policies.values():
            policy.check_all_rules_exist(self.existing_rules)

    def remove_selections_not_known(self, known_rules):
        """Remove selections not contained in ``known_rules`` from all policies."""
        known_rules = set(known_rules)
        for policy in self.policies.values():
            policy.remove_selections_not_known(known_rules)

    def resolve_controls(self):
        """Resolve the subcontrol references of every control in every policy."""
        for pid, policy in self.policies.items():
            for control in policy.controls:
                self._resolve_control(pid, control)

    def _resolve_control(self, pid, control):
        """
        Recursively merge the referenced subcontrols into ``control``.

        Args:
            pid: Id of the policy that ``control`` belongs to; references
                without a ``policy:`` prefix are looked up in this policy.
            control: The control whose ``controls`` references are resolved.

        Raises:
            ValueError: If a referenced policy or control does not exist.
        """
        for sub_name in control.controls:
            policy_id = pid
            if ":" in sub_name:
                policy_id, sub_name = sub_name.split(":", 1)
            subcontrol = self.get_control(policy_id, sub_name)
            # The subcontrol lives in policy_id, so its own unprefixed
            # references must be looked up there, not in the policy of the
            # control that refers to it.
            self._resolve_control(policy_id, subcontrol)
            control.update_with(subcontrol)

    def get_control(self, policy_id, control_id):
        """
        Return the control ``control_id`` of policy ``policy_id``.

        Raises:
            ValueError: If the policy or the control does not exist.
        """
        policy = self._get_policy(policy_id)
        control = policy.get_control(control_id)
        return control

    def _get_policy(self, policy_id):
        """
        Return the loaded policy with the given id.

        Raises:
            ValueError: If no such policy has been loaded.
        """
        try:
            policy = self.policies[policy_id]
        except KeyError:
            msg = "policy '%s' doesn't exist" % (policy_id)
            raise ValueError(msg)
        return policy

    def get_all_controls_of_level(self, policy_id, level_id):
        """
        Return the controls of a policy that belong to a level or its ancestors.

        Variables already set by a control of an earlier level in the
        sequence are removed from (a copy of) the later controls.

        Raises:
            ValueError: If the policy or the level does not exist.
        """
        policy = self._get_policy(policy_id)
        levels = policy.get_level_with_ancestors_sequence(level_id)
        all_policy_controls = self.get_all_controls(policy_id)
        eligible_controls = []
        already_defined_variables = set()
        # we will go level by level, from top to bottom
        # this is done to enable overriding of variables by higher levels
        for lv in levels:
            for control in all_policy_controls:
                if lv.id not in control.levels:
                    continue

                variables = set(control.variables.keys())

                variables_to_remove = variables.intersection(already_defined_variables)
                already_defined_variables.update(variables)

                new_c = self._get_control_without_variables(variables_to_remove, control)
                eligible_controls.append(new_c)

        return eligible_controls

    @staticmethod
    def _get_control_without_variables(variables_to_remove, control):
        """
        Return ``control`` without the given variables.

        The control itself is returned when there is nothing to remove;
        otherwise a deep copy is modified and returned, leaving the
        original untouched.
        """
        if not variables_to_remove:
            return control

        new_c = copy.deepcopy(control)
        for var in variables_to_remove:
            del new_c.variables[var]
        return new_c

    def get_all_controls(self, policy_id):
        """
        Return all controls of the given policy.

        Raises:
            ValueError: If the policy does not exist.
        """
        policy = self._get_policy(policy_id)
        return policy.controls_by_id.values()

    def save_everything(self, output_dir):
        """Dump every policy to ``<output_dir>/<policy id>.yml``, creating the directory if needed."""
        ssg.utils.mkdir_p(output_dir)
        for policy_id, policy in self.policies.items():
            filename = os.path.join(output_dir, "{}.{}".format(policy_id, "yml"))
            policy.dump_yaml(filename)