from __future__ import print_function

from tempfile import mkdtemp
import io
import os
import os.path
import sys
import shutil
import re
import argparse
import getpass
import yaml
import collections

SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
PLAYBOOK_ROOT = os.path.join(SSG_ROOT, "build", "ansible")

    from github import Github, InputGitAuthor, UnknownObjectException
except ImportError:
    print("Please install PyGithub, on Fedora it's in the python-PyGithub package.",
    raise SystemExit(1)

    import ssg.ansible
    import ssg.yaml
    from ssg.utils import mkdir_p
except ImportError:
    print("Unable to find the ssg module. Please run 'source .pyenv'", file=sys.stderr)
    raise SystemExit(1)

def memoize(f):
    memo = {}
    def helper(x):
        if x not in memo:
            memo[x] = f(x)
        return memo[x]
    return helper
# The following code preserves ansible yaml order # code from arcaduf's gist # _mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
def dict_representer(dumper, data):
    return dumper.represent_mapping(_mapping_tag, data.items())
def dict_constructor(loader, node):
    return collections.OrderedDict(loader.construct_pairs(node))
yaml.add_representer(collections.OrderedDict, dict_representer) yaml.add_constructor(_mapping_tag, dict_constructor) # End arcaduf gist PRODUCT_ALLOWLIST = set([ "rhel7", "rhel8", "rhel9", ]) PROFILE_ALLOWLIST = set([ "anssi_nt28_enhanced", "anssi_nt28_high", "anssi_nt28_intermediary", "anssi_nt28_minimal", "anssi_bp28_enhanced", "anssi_bp28_high", "anssi_bp28_intermediary", "anssi_bp28_minimal", "C2S", "cis", "cjis", "hipaa", "cui", "ospp", "pci-dss", "rht-ccp", "stig", "rhvh-stig", "rhvh-vpp", "e8", "ism", ]) ORGANIZATION_NAME = "RedHatOfficial" GIT_COMMIT_AUTHOR_NAME = "ComplianceAsCode development team" GIT_COMMIT_AUTHOR_EMAIL = "" META_TEMPLATE_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "ansible_galaxy_meta_template.yml" ) README_TEMPLATE_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), "" )
def create_empty_repositories(github_new_repos, github_org):
    for github_new_repo in github_new_repos:
        print("Creating new Github repository: %s" % github_new_repo)
        github_org.create_repo(
            github_new_repo,
            description="Role generated from ComplianceAsCode Project",
            homepage="",
            private=False,
            has_issues=False,
            has_wiki=False,
            has_downloads=False)
def clone_and_init_repository(parent_dir, organization, repo):
    os.system(
        "git clone" % (organization, repo))
    os.system("ansible-galaxy init " + repo + " --force")
    os.chdir(repo)
    try:
        os.system('git add .')
        os.system('git commit -a -m "Initial commit" --author "%s <%s>"' %
                  (GIT_COMMIT_AUTHOR_NAME, GIT_COMMIT_AUTHOR_EMAIL))
        os.system('git push origin main')
    finally:
        os.chdir("..")
def update_repo_release(github, repo):
    repo_tags = [tag for tag in repo.get_tags()]
    try:
        (majv, minv, rel) = repo_tags[0].name.split(".")
        rel = int(rel) + 1
    except IndexError:
        cac = github.get_repo("ComplianceAsCode/content")
        cac_tags = [tag for tag in cac.get_tags() if != "v0.5.0-InitialDraft"]
        (majv, minv, rel) = cac_tags[0].name.strip("v").split(".")
    new_tag = ("%s.%s.%s" % (majv, minv, rel))
    commits = repo.get_commits()
    print("Tagging new release '%s' for repo '%s'" % (new_tag,
    repo.create_git_tag_and_release(new_tag, '', '', '', commits[0].sha, 'commit')
class PlaybookToRoleConverter():
    PRODUCED_FILES = ('defaults/main.yml', 'meta/main.yml',
                      'tasks/main.yml', 'vars/main.yml', '')

    def __init__(self, local_playbook_filename):
        self._local_playbook_filename = local_playbook_filename

        # ansible language doesn't allow pre_tasks for roles, if the only pre task
        # is the ansible version check we can ignore it because the minimal version
        # is in role metadata
        if "pre_tasks" in self._playbook[0]:
            pre_tasks_data = self._playbook[0]["pre_tasks"]
            if len(pre_tasks_data) == 1 and \
                    pre_tasks_data[0]["name"] == \
                    ssg.ansible.ansible_version_requirement_pre_task_name:
                pass
            else:
                sys.stderr.write(
                    "%s contains pre_tasks other than the version check. "
                    "pre_tasks are not supported for ansible roles and "
                    "will be skipped!.\n")

    @property
    @memoize
    def name(self):
        root, _ = os.path.splitext(os.path.basename(self._local_playbook_filename))
        product, _, profile = root.split("-", 2)
        return "%s_%s" % (product, profile.replace("-", "_").lower())

    @property
    @memoize
    def product(self):
        # Returns the first part [product] of name.
        # ex: rhel7_stig
        # returns: rhel7
        return"_")[0]

    @property
    @memoize
    def profile(self):
        # Returns the second part [profile] of name.
        # ex: rhe7_anssi_nt28_enhanced
        # returns: anssi_nt28_enhanced
        return"_", 1)[1]

    @property
    @memoize
    def tasks_data(self):
        return self._playbook[0]["tasks"] if "tasks" in self._playbook[0] else []

    @property
    @memoize
    def tasks_local_content(self):
        return yaml.dump(self.tasks_data, width=120, default_flow_style=False) \
            .replace('\n- ', '\n\n- ')

    @property
    @memoize
    def default_vars_data(self):
        return self._playbook[0]["vars"] if "vars" in self._playbook[0] else []

    @property
    @memoize
    def added_variables(self):
        variables = set()
        for task in self.tasks_data:
            if "tags" not in task:
                next
            if "when" not in task:
                task["when"] = []
            elif isinstance(task["when"], str):
                task["when"] = [task["when"]]

            variables_to_add = {self._sanitize_tag(tag) for tag in task["tags"]
                                if self._tag_is_valid_variable(tag)}
            task["when"] = ["{varname} | bool".format(
                varname=v) for v in sorted(variables_to_add)] + task["when"]
            variables.update(variables_to_add)

            if not task["when"]:
                del task["when"]

        return variables

    @property
    def vars_data(self):
        return []

    @property
    @memoize
    def title(self):
        try:
            title ='Profile Title:\s+(.+)$', self._description, re.MULTILINE).group(1)
            return '"' + title + '"'
        except AttributeError:
            return'Ansible Playbook for\s+(.+)$', self._description, re.MULTILINE) \
                .group(1)

    @property
    @memoize
    def description_md(self):
        # This is for a role and not a playbook
        description = re.sub(r'Playbook', "Role", self._description)
        # Fix the description format for markdown so that it looks pretty
        return description.replace('\n', ' \n')

    @property
    @memoize
    def _playbook(self):
        return ssg.yaml.ordered_load(self._raw_playbook)

    @property
    @memoize
    def _raw_playbook(self):
        with, 'r', encoding="utf-8") as f:
            return

    @property
    @memoize
    def platform_version(self):
        platform = self.product
        # Check to see if this is RHEL product
        if platform in PRODUCT_ALLOWLIST:
            # For RHEL, we can get what version
            if 'rhel' in platform:
                return platform[-1]
            return "7\n      - 8"
        return "TBD"

    @property
    @memoize
    def _description(self):
        separator = "#" * 79
        offset_from_separator = 3
        first_separator_pos = self._raw_playbook.find(separator)
        second_separator_pos = self._raw_playbook.find(separator, first_separator_pos + len(separator))
        description_start = first_separator_pos + len(separator) + offset_from_separator
        description_stop = second_separator_pos - offset_from_separator
        description = self._raw_playbook[description_start:description_stop]
        description = description.replace('# ', '')
        description = description.replace('#', '')
        desc = ""
        # Remove SCAP and Playbook examples from description as they don't belong in roles.
        for line in description.split("\n"):
            if line.startswith("Profile ID:"):
                break
            else:
                desc += (line + "\n")
        return desc.strip("\n\n")

    @property
    def _update_galaxy_tags(self):
        galaxy_tags = {}
        # These are the default tags that all roles share
        tags = [
            "system",
            "hardening",
            "openscap",
            "ssg",
            "scap",
            "security",
            "compliance",
            "complianceascode",
            "redhatofficial",
            "redhat",
        ]
        prod = self.product
        prof = self.profile
        tags.append(prod)
        tags.append(prof.replace("_", ""))
        if prof == 'stig':
            tags.append("disa")
        if 'anssi' in prof:
            tags.append("anssi")
        galaxy_tags['galaxy_tags'] = tags
        return galaxy_tags

    def _tag_is_valid_variable(self, tag):
        if "DISA-STIG" in tag:
            return True
        # rules of kind package_* and service_* can have hyphen in their rule IDs
        pattern = re.compile('(package_.*_(installed|removed))|(service_.*_(enabled|disabled))')
        if pattern.match(tag):
            return True
        return '-' not in tag and tag != 'always'

    def _sanitize_tag(self, tag):
        return tag.replace("-", "_")
def file(self, filepath):
        if filepath == 'tasks/main.yml':
            return self.tasks_local_content
        elif filepath == 'vars/main.yml':
            if len(self.vars_data) < 1:
                return "---\n# defaults file for {role_name}\n".format(
            else:
                return yaml.dump(self.vars_data, width=120, indent=4, default_flow_style=False)
        elif filepath == '':
            return self._generate_readme_content()
        elif filepath == 'defaults/main.yml':
            return self._generate_defaults_content()
        elif filepath == 'meta/main.yml':
            return self._generate_meta_content()
def _generate_readme_content(self): with, 'r', encoding="utf-8") as f: readme_template = local_readme_content = readme_template.replace( "@DESCRIPTION@", self.description_md) local_readme_content = local_readme_content.replace( "@TITLE@", self.title) local_readme_content = local_readme_content.replace( "@MIN_ANSIBLE_VERSION@", ssg.ansible.min_ansible_version) local_readme_content = local_readme_content.replace( "@ROLE_NAME@", return local_readme_content def _generate_meta_content(self): with open(META_TEMPLATE_PATH, 'r') as f: meta_template = local_meta_content = meta_template.replace( "@ROLE_NAME@", local_meta_content = local_meta_content.replace( "@DESCRIPTION@", self.title) local_meta_content = local_meta_content.replace( "@PLATFORM_VERSION@", self.platform_version) local_meta_content = local_meta_content.replace( "@GALAXY_TAGS@", yaml.dump(self._update_galaxy_tags).replace("- ", " - ")) return local_meta_content.replace( "@MIN_ANSIBLE_VERSION@", ssg.ansible.min_ansible_version) def _generate_defaults_content(self): default_vars_to_add = sorted(self.added_variables) default_vars_local_content = yaml.dump(self.default_vars_data, width=120, indent=4, default_flow_style=False) header = [ "---", "# defaults file for {role_name}\n".format(, ] lines = ["{var_name}: true".format(var_name=var_name) for var_name in default_vars_to_add] lines.append("") return ("%s%s%s" % ("\n".join(header), default_vars_local_content, "\n".join(lines)))
def save_to_disk(self, directory):
        print("Converting Ansible Playbook {} to Ansible Role {}".format(self._local_playbook_filename,
                                                                          os.path.join(directory,
        for filename in self.PRODUCED_FILES:
            abs_path = os.path.join(directory,, filename)
            mkdir_p(os.path.dirname(abs_path))
            open(abs_path, 'wb').write(self.file(filename).encode("utf-8"))
class RoleGithubUpdater(object):
    def __init__(self, repo, local_playbook_filename):
        self.remote_repo = repo
        self.role = PlaybookToRoleConverter(local_playbook_filename)

    def _local_content(self, filepath):
        new_content = self.role.file(filepath)
        return new_content

    def _get_blob_content(self, branch, path_name):
        """
        see:
        """
        ref = self.remote_repo.get_git_ref(f'heads/{branch}')
        tree = self.remote_repo.get_git_tree(ref.object.sha, recursive='/' in path_name).tree
        sha = [x.sha for x in tree if x.path == path_name]
        if not sha:
            return None
        blob = self.remote_repo.get_git_blob(sha[0])
        import base64
        b64 = base64.b64decode(blob.content)
        return (b64.decode("utf8"), sha[0])

    def _get_contents(self, path_name, branch='main'):
        """
        First try to use traditional's github API to get package contents,
        since this API can't fetch file size more than 1MB, use another API
        when failed.
        """
        content = self.remote_repo.get_contents(path_name, ref=branch)
        if content.content:
            return (content.decoded_content.decode("utf-8"), content.sha)
        blob = self._get_blob_content(branch, path_name)
        if blob is None:
            raise UnknownObjectException(
                'unable to locate file: ' + path_name + ' in branch: ' + branch)
        return blob

    def _remote_content(self, filepath):
        # We want the raw string to compare against _local_content
        # New repos use main instead of master
        branch = 'master'
        if "rhel9" in self.remote_repo.full_name:
            branch = 'main'
        content, sha = self._get_contents(filepath, branch)
        return content, sha

    def _update_content_if_needed(self, filepath):
        remote_content, sha = self._remote_content(filepath)
        if self._local_content(filepath) != remote_content:
            self.remote_repo.update_file(
                filepath,
                "Updated " + filepath,
                self._local_content(filepath),
                sha,
                author=InputGitAuthor(
                    GIT_COMMIT_AUTHOR_NAME,
                    GIT_COMMIT_AUTHOR_EMAIL)
            )
            print("Updating %s in %s" % (filepath,
def update_repository(self):
        print("Processing %s..." %
        for path in PlaybookToRoleConverter.PRODUCED_FILES:
            self._update_content_if_needed(path)
        repo_description = (
            "{title} - Ansible role generated from ComplianceAsCode Project"
            .format(title=self.role.title))
        self.remote_repo.edit(
  ,
            description=repo_description,
            homepage="",
        )
def parse_args():
    parser = argparse.ArgumentParser(
        description='Generates Ansible Roles and pushes them to Github')
    parser.add_argument(
        "--build-playbooks-dir",
        help="Path to directory containing the generated Ansible Playbooks. "
             "Most likely this is going to be ./build/ansible. Defaults to {}".format(PLAYBOOK_ROOT),
        dest="build_playbooks_dir",
        default=PLAYBOOK_ROOT)
    parser.add_argument(
        "--dry-run", "-d",
        dest="dry_run",
        help="Do not push Ansible Roles to Github, store them only to the given local directory."
    )
    parser.add_argument(
        "--organization", "-o",
        default=ORGANIZATION_NAME,
        help="Name of the Github organization to publish roles to
[docs] def locally_clone_and_init_repositories(organization, repo_list): temp_dir = mkdtemp() current_dir = os.getcwd() os.chdir(temp_dir) try: for repo in repo_list: clone_and_init_repository(temp_dir, organization, repo) finally: os.chdir(current_dir) shutil.rmtree(temp_dir)
[docs] def select_roles_to_upload(product_allowlist, profile_allowlist, build_playbooks_dir): selected_roles = dict() for filename in sorted(os.listdir(build_playbooks_dir)): root, ext = os.path.splitext(filename) if ext == ".yml": # the format is product-playbook-profile.yml product, _, profile = root.split("-", 2) if product in product_allowlist and profile in profile_allowlist: role_name = "ansible-role-%s-%s" % (product, profile) selected_roles[role_name] = (product, profile) return selected_roles
[docs] def main(): args = parse_args() product_allowlist = set(PRODUCT_ALLOWLIST) profile_allowlist = set(PROFILE_ALLOWLIST) potential_roles = { ("ansible-role-%s-%s" % (product, profile)) for product in product_allowlist for profile in profile_allowlist } if args.product: product_allowlist &= set(args.product) if args.profile: profile_allowlist &= set(args.profile) selected_roles = select_roles_to_upload( product_allowlist, profile_allowlist, args.build_playbooks_dir ) if args.dry_run: for product_profile in selected_roles.values(): playbook_filename = "%s-playbook-%s.yml" % product_profile playbook_full_path = os.path.join( args.build_playbooks_dir, playbook_filename) PlaybookToRoleConverter(playbook_full_path).save_to_disk(args.dry_run) else: if not args.token: print("Input your GitHub credentials:") username = input("username or token: ") password = getpass.getpass("password (or empty for token): ") else: username = args.token password = "" github = Github(username, password) github_org = github.get_organization(args.organization) github_repositories = [ for repo in github_org.get_repos()] # Create empty repositories github_new_repos = sorted(list(set(map(str.lower, selected_roles.keys())) - set(map(str.lower, github_repositories)))) if github_new_repos: create_empty_repositories(github_new_repos, github_org) locally_clone_and_init_repositories(args.organization, github_new_repos) # Update repositories for repo in sorted(github_org.get_repos(), key=lambda repo: if in selected_roles: playbook_filename = "%s-playbook-%s.yml" % selected_roles[] playbook_full_path = os.path.join( args.build_playbooks_dir, playbook_filename) RoleGithubUpdater(repo, playbook_full_path).update_repository() if args.tag_release: update_repo_release(github, repo) elif not in potential_roles: print("Repo '%s' is not managed by this script. " "It may need to be deleted, please verify and do that " "manually!" %, file=sys.stderr)
if __name__ == "__main__": main()