From e330972cf134323663a9595a9cd5936c88760173 Mon Sep 17 00:00:00 2001 From: codeking Date: Thu, 27 Nov 2025 08:02:48 +0100 Subject: [PATCH] Add initial support for capability policies --- core/Constants.py | 1 + .../controllers/CapabilityPolicyController.py | 113 ++++++++++++++++++ 2 files changed, 114 insertions(+) create mode 100644 core/controllers/CapabilityPolicyController.py diff --git a/core/Constants.py b/core/Constants.py index 524b03a..96a5cca 100644 --- a/core/Constants.py +++ b/core/Constants.py @@ -39,6 +39,7 @@ class Constants: HV_STORAGE_DATABASE_PATH: Final[str] = f'{HV_DATA_HOME}/storage.db' + HV_CAPABILITY_POLICY_PATH: Final[str] = f'{SYSTEM_CONFIG_PATH}/apparmor.d/hydra-veil' HV_PRIVILEGE_POLICY_PATH: Final[str] = f'{SYSTEM_CONFIG_PATH}/sudoers.d/hydra-veil' HV_SESSION_STATE_HOME: Final[str] = f'{HV_STATE_HOME}/sessions' diff --git a/core/controllers/CapabilityPolicyController.py b/core/controllers/CapabilityPolicyController.py new file mode 100644 index 0000000..588e274 --- /dev/null +++ b/core/controllers/CapabilityPolicyController.py @@ -0,0 +1,113 @@ +from core.Constants import Constants +from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError +from packaging import version +from packaging.version import InvalidVersion +from subprocess import CalledProcessError +import os +import re +import shutil +import subprocess + + +class CapabilityPolicyController: + + @staticmethod + def preview(): + return CapabilityPolicyController.__generate() + + @staticmethod + def instate(): + + if shutil.which('pkexec') is None: + raise CommandNotFoundError('pkexec') + + if shutil.which('sh') is None: + raise CommandNotFoundError('sh') + + if shutil.which('service') is None: + raise CommandNotFoundError('service') + + if not CapabilityPolicyController.__is_compatible(): + raise PolicyInstatementError('The capability policy is not compatible.') + + capability_policy = CapabilityPolicyController.__generate() + + completed_successfully = False + failed_attempt_count = 0 + + while not completed_successfully and failed_attempt_count < 3: + + process = subprocess.Popen([ + 'pkexec', 'sh', '-c', f'install /dev/stdin {Constants.HV_CAPABILITY_POLICY_PATH} -o root -m 644 && service apparmor reload' + ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + process.communicate(f'{capability_policy}\n') + completed_successfully = (process.returncode == 0) + + if not completed_successfully: + failed_attempt_count += 1 + + if not completed_successfully: + raise PolicyInstatementError('The capability policy could not be instated.') + + @staticmethod + def revoke(): + + if shutil.which('pkexec') is None: + raise CommandNotFoundError('pkexec') + + if shutil.which('sh') is None: + raise CommandNotFoundError('sh') + + if shutil.which('service') is None: + raise CommandNotFoundError('service') + + process = subprocess.Popen(('pkexec', 'sh', '-c', f'rm {Constants.HV_CAPABILITY_POLICY_PATH} && service apparmor reload')) + completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8) + + if not completed_successfully: + raise PolicyRevocationError('The capability policy could not be revoked.') + + @staticmethod + def is_instated(): + return os.path.exists(Constants.HV_CAPABILITY_POLICY_PATH) + + @staticmethod + def __generate(): + + return '\n'.join([ + 'abi ,', + 'include ', + '', + 'profile hv-bwrap /usr/bin/bwrap flags=(unconfined) {', + ' userns,', + '', + ' # Site-specific additions and overrides. See local/README for details.', + ' include if exists ', + '}' + ]) + + @staticmethod + def __is_compatible(): + + try: + process_output = subprocess.check_output(['apparmor_parser', '-V'], text=True) + except (CalledProcessError, FileNotFoundError): + return False + + if process_output.splitlines(): + apparmor_version_details = process_output.splitlines()[0].strip() + else: + return False + + apparmor_version_number = (m := re.search(r'(\d[0-9.]+?)(?=[p~+-]|$)', apparmor_version_details)) and m.group(1) + + if not apparmor_version_number: + return False + + try: + apparmor_version = version.parse(apparmor_version_number) + except InvalidVersion: + return False + + return apparmor_version >= version.parse('4.0.0') and os.path.isdir('/etc/apparmor.d')