tests.ssg_test_suite package

tests.ssg_test_suite.combined module

class tests.ssg_test_suite.combined.CombinedChecker(test_env)[source]

Bases: RuleChecker

Combined mode works pretty much like the Rule mode - for every rule selected in a profile:

  • Alter the system.

  • Run the scan and check that the result meets expectations. If the test scenario passed as requested, return True; if it failed or passed unexpectedly, return False.

The following sequence applies if the initial scan has failed as expected:

  • If there are no remediations, return True.

  • Run remediation, return False if it failed.

  • Return result of the final scan of remediated system.

If a rule doesn’t have any test scenario, it is skipped. Skipped rules are reported at the end.
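A conceptual sketch of this per-scenario flow, written as a pure function (the helper names and return conventions are assumptions for illustration, not the actual implementation):

    def check_rule_scenario(expected, run_scan, run_remediation=None):
        """Sketch of the documented flow; not the real code.

        expected        -- "pass" or "fail": the scenario's expected initial result
        run_scan        -- callable returning "pass" or "fail"
        run_remediation -- optional callable returning True on success
        """
        result = run_scan()
        if expected == "pass":
            return result == "pass"   # passed as requested, or failed
        if result == "pass":
            return False              # passed unexpectedly

        # The initial scan failed as expected:
        if run_remediation is None:
            return True               # no remediations
        if not run_remediation():
            return False              # remediation failed
        return run_scan() == "pass"   # final scan of the remediated system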

test_rule(state, rule, scenarios)[source]
tests.ssg_test_suite.combined.perform_combined_check(options)[source]

tests.ssg_test_suite.common module

class tests.ssg_test_suite.common.RuleResult(result_dict=None)[source]

Bases: object

Result of a test suite testing a rule under a scenario.

Supports ordering by success - the most successful run orders first.

STAGE_STRINGS = {'final_scan', 'initial_scan', 'preparation', 'remediation'}

load_from_dict(data)[source]
record_stage_result(stage, successful)[source]
relative_conditions_to(other)[source]
save_to_dict()[source]
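A hypothetical usage sketch; the stage names are assumed to come from STAGE_STRINGS, and the class is assumed to be usable without an initial result dictionary:

    result = RuleResult()
    result.record_stage_result("preparation", True)    # assumed stage names
    result.record_stage_result("initial_scan", True)
    result.record_stage_result("remediation", False)

    as_dict = result.save_to_dict()            # serialize, e.g. for JSON reports
    restored = RuleResult(result_dict=as_dict)

    # "Supports ordering by success": sorting puts the most successful first.
    ordered = sorted([result, restored])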
class tests.ssg_test_suite.common.Scenario_conditions(backend, scanning_mode, remediated_by, datastream)

Bases: tuple

backend

Alias for field number 0

datastream

Alias for field number 3

remediated_by

Alias for field number 2

scanning_mode

Alias for field number 1

class tests.ssg_test_suite.common.Scenario_run(rule_id, script)

Bases: tuple

rule_id

Alias for field number 0

script

Alias for field number 1

class tests.ssg_test_suite.common.Stage[source]

Bases: object

FINAL_SCAN = 4
INITIAL_SCAN = 2
NONE = 0
PREPARATION = 1
REMEDIATION = 3
tests.ssg_test_suite.common.cpe_to_platform(cpe)[source]
tests.ssg_test_suite.common.cpes_to_platform(cpes)[source]
tests.ssg_test_suite.common.create_tarball(test_content_by_rule_id)[source]

Create a tarball which contains all test scenarios and additional content for every rule that is selected to be tested. The tarball contains directories with the test scenarios; each directory is named after the rule's short ID. There is no tree structure.
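The flat layout can be sketched with the standard tarfile module (illustrative content; the real function also collects templated and local tests):

    import io
    import tarfile

    # Illustrative mapping of short rule IDs to their test files;
    # the real mapping type may differ.
    test_content_by_rule_id = {
        "accounts_tmout": {"correct_value.pass.sh": b"#!/bin/bash\ntrue\n"},
    }

    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for short_rule_id, tests in test_content_by_rule_id.items():
            for name, content in tests.items():
                info = tarfile.TarInfo("{}/{}".format(short_rule_id, name))
                info.size = len(content)
                tar.addfile(info, io.BytesIO(content))
    # Result: accounts_tmout/correct_value.pass.sh -- no deeper tree structure.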

tests.ssg_test_suite.common.fetch_all_templated_tests_paths(rule_template)[source]

Builds a dictionary mapping each test case's relative path to its absolute path.

Here, we want to know the relative path on disk under the tests/ subdirectory (such as “installed.pass.sh”), along with the actual absolute path.
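For example, the returned mapping might look like this (illustrative paths):

    {
        "installed.pass.sh": "/abs/checkout/shared/templates/package_installed/tests/installed.pass.sh",
        "removed.fail.sh": "/abs/checkout/shared/templates/package_installed/tests/removed.fail.sh",
    }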

tests.ssg_test_suite.common.fetch_local_tests_paths(tests_dir)[source]
tests.ssg_test_suite.common.fetch_templated_tests_paths(rule_namedtuple, product_yaml)[source]
tests.ssg_test_suite.common.file_known_as_useless(file_name)[source]
tests.ssg_test_suite.common.get_cpe_of_tested_os(test_env, log_file)[source]
tests.ssg_test_suite.common.get_prefixed_name(state_name)[source]
tests.ssg_test_suite.common.get_product_context(product_id=None)[source]

Returns a product YAML context if any product is specified. Hard-coded to assume a debug build.

tests.ssg_test_suite.common.get_test_dir_config(test_dir, product_yaml)[source]
tests.ssg_test_suite.common.install_packages(test_env, packages)[source]
tests.ssg_test_suite.common.load_local_tests(local_tests_paths, local_env_yaml)[source]
tests.ssg_test_suite.common.load_rule_and_env(rule_dir_path, env_yaml, product=None)[source]

Loads a rule and returns the combination of the RuleYAML class and the corresponding local environment for that rule.
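A hypothetical call, assuming the "combination" is returned as a (rule, local environment) pair:

    # Assumption: unpacks into the parsed rule and the rule-local env YAML.
    rule, local_env_yaml = load_rule_and_env(rule_dir_path, env_yaml, product="rhel9")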

tests.ssg_test_suite.common.load_templated_tests(templated_tests_paths, template, local_env_yaml)[source]
tests.ssg_test_suite.common.load_test(absolute_path, rule_template, local_env_yaml)[source]
tests.ssg_test_suite.common.matches_platform(scenario_platforms, benchmark_cpes)[source]
tests.ssg_test_suite.common.retry_with_stdout_logging(command, args, log_file, max_attempts=5)[source]
tests.ssg_test_suite.common.run_cmd_local(command, verbose_path, env=None)[source]
tests.ssg_test_suite.common.run_with_stdout_logging(command, args, log_file)[source]
tests.ssg_test_suite.common.select_templated_tests(test_dir_config, available_scenarios_basenames)[source]
tests.ssg_test_suite.common.send_scripts(test_env, test_content_by_rule_id)[source]
tests.ssg_test_suite.common.walk_through_benchmark_dirs(product=None)[source]
tests.ssg_test_suite.common.write_rule_test_content_to_dir(rule_dir, test_content)[source]

tests.ssg_test_suite.log module

class tests.ssg_test_suite.log.LogHelper[source]

Bases: object

Provide a focal point for logging. LOG_DIR is useful when the output of a script is saved to a file. Log preloading is a way to log an outcome before the output itself.

FORMATTER = <logging.Formatter object>
INTERMEDIATE_LOGS = {'fail': [], 'notapplicable': [], 'pass': []}
LOG_DIR = None
LOG_FILE = None
classmethod add_console_logger(logger, level)[source]

Convenience function to set defaults for console logger

classmethod add_logging_dir(logger, _dirname)[source]

Convenience function to set up default logging to a file.

Also sets LOG_DIR and LOG_FILE

static find_name(original_path, suffix='')[source]

Find a file name that is not yet present in the given directory

Returns path – original_path + number + suffix
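A minimal sketch of the documented behavior, assuming the counter starts at 0 and grows until a free name is found:

    import os

    def find_name(original_path, suffix=""):
        # Try original_path + number + suffix until no such file exists yet.
        number = 0
        while os.path.exists(original_path + str(number) + suffix):
            number += 1
        return original_path + str(number) + suffix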

classmethod log_preloaded(log_target)[source]

Log messages preloaded in one of the named buffers. Wipe out all buffers afterwards.

classmethod preload_log(log_level, log_line, log_target=None)[source]

Save a log line for later use by filling the named buffer `log_target` with it.

Special case: if log_target is the default, i.e. None, all buffers are filled with the same log line.
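Hypothetical usage of the preloading buffers named in INTERMEDIATE_LOGS:

    import logging

    # Queue outcome lines before the detailed output is known.
    LogHelper.preload_log(logging.INFO, "rule xyz: pass", log_target="pass")
    LogHelper.preload_log(logging.ERROR, "rule xyz: fail", log_target="fail")

    # Once the outcome is known, emit only the matching buffer;
    # all buffers are wiped afterwards.
    LogHelper.log_preloaded("pass")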

tests.ssg_test_suite.oscap module

class tests.ssg_test_suite.oscap.AnsibleProfileRunner(environment, profile, datastream, benchmark_id)[source]

Bases: ProfileRunner

initial()[source]
remediation()[source]
class tests.ssg_test_suite.oscap.AnsibleRuleRunner(environment, profile, datastream, benchmark_id, rule_id, script_name, dont_clean, no_reports, manual_debug)[source]

Bases: RuleRunner

initial()[source]
remediation()[source]
class tests.ssg_test_suite.oscap.BashProfileRunner(environment, profile, datastream, benchmark_id)[source]

Bases: ProfileRunner

initial()[source]
remediation()[source]
class tests.ssg_test_suite.oscap.BashRuleRunner(environment, profile, datastream, benchmark_id, rule_id, script_name, dont_clean, no_reports, manual_debug)[source]

Bases: RuleRunner

initial()[source]
remediation()[source]
class tests.ssg_test_suite.oscap.Checker(test_env)[source]

Bases: object

finalize()[source]
run_test_for_all_profiles(profiles, test_data=None)[source]
start()[source]
test_target()[source]
class tests.ssg_test_suite.oscap.GenericRunner(environment, profile, datastream, benchmark_id)[source]

Bases: object

analyze(stage)[source]
final()[source]
property get_command
initial()[source]
make_oscap_call()[source]
prepare_online_scanning_arguments()[source]
remediation()[source]
run_stage(stage)[source]
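The stage methods above suggest a simple dispatch; a conceptual sketch (the stage keys are assumptions, and the real method also logs and analyzes results):

    class GenericRunnerSketch:
        # Conceptual sketch only, not the actual GenericRunner.
        def initial(self):
            return "initial scan result"     # placeholder

        def remediation(self):
            return "remediation result"      # placeholder

        def final(self):
            return "final scan result"       # placeholder

        def run_stage(self, stage):
            # Dispatch to the stage method named by `stage` (assumed keys).
            stage_methods = {
                "initial": self.initial,
                "remediation": self.remediation,
                "final": self.final,
            }
            return stage_methods[stage]()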
class tests.ssg_test_suite.oscap.OscapProfileRunner(environment, profile, datastream, benchmark_id)[source]

Bases: ProfileRunner

remediation()[source]
class tests.ssg_test_suite.oscap.OscapRuleRunner(environment, profile, datastream, benchmark_id, rule_id, script_name, dont_clean, no_reports, manual_debug)[source]

Bases: RuleRunner

final()[source]

There is no need to run the final scan again - the result won't differ from what we already have from the remediation step.

remediation()[source]
class tests.ssg_test_suite.oscap.ProfileRunner(environment, profile, datastream, benchmark_id)[source]

Bases: GenericRunner

final()[source]
make_oscap_call()[source]
class tests.ssg_test_suite.oscap.RuleRunner(environment, profile, datastream, benchmark_id, rule_id, script_name, dont_clean, no_reports, manual_debug)[source]

Bases: GenericRunner

final()[source]
make_oscap_call()[source]
run_stage_with_context(stage, context)[source]
tests.ssg_test_suite.oscap.analysis_to_serializable(analysis)[source]
tests.ssg_test_suite.oscap.find_result_id_in_output(output)[source]
tests.ssg_test_suite.oscap.generate_fixes_remotely(test_env, formatting, verbose_path)[source]
tests.ssg_test_suite.oscap.get_file_remote(test_env, verbose_path, local_dir, remote_path)[source]

Download a file from VM.

tests.ssg_test_suite.oscap.get_result_id_from_arf(arf_path, verbose_path)[source]
tests.ssg_test_suite.oscap.is_virtual_oscap_profile(profile)[source]

Test whether the profile belongs to the so-called "virtual" category of available OpenSCAP profiles. It can be (all) or another ID we might come up with in the future; it just needs to be enclosed in parentheses, for example “(custom_profile)”.
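Behavior implied by the docstring (illustrative values):

    is_virtual_oscap_profile("(all)")             # True: parenthesized, virtual
    is_virtual_oscap_profile("(custom_profile)")  # True, per the convention above
    is_virtual_oscap_profile("xccdf_org.ssgproject.content_profile_cis")  # False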

tests.ssg_test_suite.oscap.process_profile_id(profile)[source]
tests.ssg_test_suite.oscap.run_stage_remediation_ansible(run_type, test_env, formatting, verbose_path)[source]

Returns True if the Ansible playbook run succeeded, False on error.

tests.ssg_test_suite.oscap.run_stage_remediation_bash(run_type, test_env, formatting, verbose_path)[source]

Returns True if the bash scripts ran successfully, False on error.

tests.ssg_test_suite.oscap.save_analysis_to_json(analysis, output_fname)[source]
tests.ssg_test_suite.oscap.send_arf_to_remote_machine_and_generate_remediations_there(run_type, test_env, formatting, verbose_path)[source]
tests.ssg_test_suite.oscap.single_quote_string(input)[source]
tests.ssg_test_suite.oscap.triage_xml_results(fname)[source]

tests.ssg_test_suite.profile module

class tests.ssg_test_suite.profile.ProfileChecker(test_env)[source]

Bases: Checker

Iterate over profiles in the data stream and scan the unaltered system using every profile, according to the input. Also perform a remediation run. The return value is not defined; the textual output and generated reports are the result.
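A conceptual sketch of the loop, using only runners and helpers documented in this package (the real checker also handles logging, reports, and scanning-mode selection):

    # Placeholders: environment, datastream, benchmark_id come from the options.
    for profile in get_all_profiles_in_benchmark(datastream, benchmark_id):
        runner = BashProfileRunner(environment, profile, datastream, benchmark_id)
        runner.initial()        # scan of the unaltered system
        runner.remediation()    # remediation run
        runner.final()          # final scan, inherited from ProfileRunner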

tests.ssg_test_suite.profile.perform_profile_check(options)[source]

tests.ssg_test_suite.rule module

class tests.ssg_test_suite.rule.Rule(directory, id, short_id, template, local_env_yaml, rule)

Bases: tuple

directory

Alias for field number 0

id

Alias for field number 1

local_env_yaml

Alias for field number 4

rule

Alias for field number 5

short_id

Alias for field number 2

template

Alias for field number 3

class tests.ssg_test_suite.rule.RuleChecker(test_env)[source]

Bases: Checker

Rule checks generally work like this - for every rule, and for every profile that supports that rule:

  • Alter the system.

  • Run the scan and check that the result meets expectations. If the test scenario passed as requested, return True; if it failed or passed unexpectedly, return False.

The following sequence applies if the initial scan has failed as expected:

  • If there are no remediations, return True.

  • Run remediation, return False if it failed.

  • Return result of the final scan of remediated system.

copy_of_datastream(new_filename=None)[source]
finalize()[source]
test_rule(state, rule, scenarios)[source]
class tests.ssg_test_suite.rule.RuleTestContent(scenarios, other_content)

Bases: tuple

other_content

Alias for field number 1

scenarios

Alias for field number 0

class tests.ssg_test_suite.rule.Scenario(script, script_contents)[source]

Bases: object

matches_platform(benchmark_cpes)[source]
matches_regex(scenarios_regex)[source]
matches_regex_and_platform(scenarios_regex, benchmark_cpes)[source]
override_profile(scenarios_profile)[source]
tests.ssg_test_suite.rule.generate_xslt_change_value_template(value_short_id, new_value)[source]
tests.ssg_test_suite.rule.get_viable_profiles(selected_profiles, datastream, benchmark, script=None)[source]

Read the data stream and return the set intersection of the given benchmark's profiles and those provided in the selected_profiles parameter.
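Conceptually the selection reduces to a set intersection (a sketch; the real function first parses the profiles out of the data stream):

    # benchmark_profiles stands for the profiles parsed from the benchmark.
    viable = set(selected_profiles) & set(benchmark_profiles)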

tests.ssg_test_suite.rule.perform_rule_check(options)[source]

tests.ssg_test_suite.test_env module

class tests.ssg_test_suite.test_env.ContainerTestEnv(scanning_mode, image_name)[source]

Bases: TestEnv

property current_container
property current_image
finalize()[source]

Perform the environment cleanup and shut it down.

get_ip_address()[source]
get_ssh_additional_options()[source]
get_ssh_port()[source]
image_stem2fqn(stem)[source]
offline_scan(args, verbose_path)[source]
reset_state_to(state_name, new_running_state_name)[source]
run_container(image_name, container_name='running')[source]
start()[source]

Run the environment and ensure that the environment will not be permanently modified by subsequent procedures.

class tests.ssg_test_suite.test_env.DockerTestEnv(mode, image_name)[source]

Bases: ContainerTestEnv

get_ip_address()[source]
name = 'docker-based'
class tests.ssg_test_suite.test_env.PodmanTestEnv(scanning_mode, image_name)[source]

Bases: ContainerTestEnv

extract_port_map(podman_network_data)[source]
get_ip_address()[source]
name = 'podman-based'
class tests.ssg_test_suite.test_env.SavedState(environment, name)[source]

Bases: object

classmethod create_from_environment(environment, state_name)[source]
map_on_top(function, args_list)[source]
class tests.ssg_test_suite.test_env.TestEnv(scanning_mode)[source]

Bases: object

arf_to_html(arf_filename)[source]
execute_ssh_command(command, log_file, error_msg_template=None)[source]
Args:
  • command: Command to execute remotely as a single string

  • log_file

  • error_msg_template: A string that can contain references to command, remote_dest, rc, and stderr (see the usage sketch below)
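A hypothetical call, assuming error_msg_template uses str.format-style placeholders for the names listed above:

    test_env.execute_ssh_command(
        "oscap --version",
        log_file,
        error_msg_template=(
            "Command '{command}' on {remote_dest} failed (rc={rc}): {stderr}"
        ),
    )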

finalize()[source]

Perform the environment cleanup and shut it down.

get_ip_address()[source]
get_ssh_additional_options()[source]
get_ssh_port()[source]
offline_scan(args, verbose_path)[source]
online_scan(args, verbose_path)[source]
refresh_connection_parameters()[source]
reset_state_to(state_name, new_running_state_name)[source]
save_state(state_name)[source]
scan(args, verbose_path)[source]
scp_download_file(source, destination, log_file, error_msg=None)[source]
scp_transfer_file(source, destination, log_file, error_msg=None)[source]
scp_upload_file(source, destination, log_file, error_msg=None)[source]
start()[source]

Run the environment and ensure that the environment will not be permanently modified by subsequent procedures.

class tests.ssg_test_suite.test_env.VMTestEnv(mode, hypervisor, domain_name, keep_snapshots)[source]

Bases: TestEnv

finalize()[source]

Perform the environment cleanup and shut it down.

get_ip_address()[source]
has_test_suite_prefix(snapshot_name)[source]
name = 'libvirt-based'
offline_scan(args, verbose_path)[source]
reboot()[source]
reset_state_to(state_name, new_running_state_name)[source]
snapshot_lookup(snapshot_name)[source]
snapshots_cleanup()[source]
start()[source]

Run the environment and ensure that the environment will not be permanently modified by subsequent procedures.

tests.ssg_test_suite.virt module

class tests.ssg_test_suite.virt.SnapshotStack(domain)[source]

Bases: object

CREATE_FLAGS = 128
REVERT_FLAGS = 4
SNAPSHOT_BASE = '<domainsnapshot>  <name>{name}</name>  <description>     Full snapshot by Automatus  </description></domainsnapshot>'
clear()[source]
create(snapshot_name)[source]
delete(snapshot=None)[source]
revert(delete=True)[source]
revert_forced(snapshot)[source]
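Hypothetical usage, assuming the LIFO semantics the class name implies (revert() returns to the most recently created snapshot; delete=True drops it afterwards):

    stack = SnapshotStack(domain)

    stack.create("clean")              # push a full snapshot
    stack.create("scenario_applied")   # push another on top

    stack.revert()                     # assumed: back to "scenario_applied", then delete it
    stack.revert(delete=False)         # assumed: back to "clean", kept on the stack
    stack.clear()                      # assumed: discard any remaining snapshots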
tests.ssg_test_suite.virt.connect_domain(hypervisor, domain_name)[source]
tests.ssg_test_suite.virt.determine_ip(domain)[source]
tests.ssg_test_suite.virt.reboot_domain(domain, domain_ip, ssh_port)[source]
tests.ssg_test_suite.virt.start_domain(domain)[source]

tests.ssg_test_suite.xml_operations module

tests.ssg_test_suite.xml_operations.add_platform_to_benchmark(root, cpe_regex)[source]
tests.ssg_test_suite.xml_operations.benchmark_get_applicable_platforms(datastream, benchmark_id, logging=None)[source]

Returns a set of CPEs the given benchmark is applicable to.

tests.ssg_test_suite.xml_operations.datastream_root(ds_location, save_location=None)[source]
tests.ssg_test_suite.xml_operations.find_elements(root, element_spec=None)[source]
tests.ssg_test_suite.xml_operations.find_fix_in_benchmark(datastream, benchmark_id, rule_id, fix_type='bash', logging=None)[source]

Return the fix from the benchmark, or None if not found.

tests.ssg_test_suite.xml_operations.find_rule_in_benchmark(datastream, benchmark_id, rule_id, logging=None)[source]

Returns the rule node from the given benchmark.

tests.ssg_test_suite.xml_operations.get_all_profiles_in_benchmark(datastream, benchmark_id, logging=None)[source]
tests.ssg_test_suite.xml_operations.get_all_rule_ids_in_profile(datastream, benchmark_id, profile_id, logging=None)[source]
tests.ssg_test_suite.xml_operations.get_all_rule_selections_in_profile(datastream, benchmark_id, profile_id, logging=None)[source]
tests.ssg_test_suite.xml_operations.get_all_rules_in_benchmark(datastream, benchmark_id, logging=None)[source]

Returns all rule IDs in the given benchmark.

tests.ssg_test_suite.xml_operations.get_all_xccdf_ids_in_datastream(datastream)[source]
tests.ssg_test_suite.xml_operations.get_oscap_supported_cpes()[source]

Obtain a list of CPEs that the scanner supports.

tests.ssg_test_suite.xml_operations.infer_benchmark_id_from_component_ref_id(datastream, ref_id)[source]
tests.ssg_test_suite.xml_operations.instance_in_platforms(inst, platforms)[source]
tests.ssg_test_suite.xml_operations.remove_ansible_machine_remediation_condition(root)[source]
tests.ssg_test_suite.xml_operations.remove_bash_machine_remediation_condition(root)[source]
tests.ssg_test_suite.xml_operations.remove_fips_certified(root)[source]
tests.ssg_test_suite.xml_operations.remove_machine_platform(root)[source]
tests.ssg_test_suite.xml_operations.remove_machine_remediation_condition(root)[source]
tests.ssg_test_suite.xml_operations.remove_ocp4_platforms(root)[source]
tests.ssg_test_suite.xml_operations.remove_platforms(root)[source]
tests.ssg_test_suite.xml_operations.remove_platforms_from_element(root, element_spec=None, platforms=None)[source]

Module contents