diff --git a/core/controllers/ConnectionController.py b/core/controllers/ConnectionController.py index f9b2858..6c15afe 100644 --- a/core/controllers/ConnectionController.py +++ b/core/controllers/ConnectionController.py @@ -2,7 +2,7 @@ from collections.abc import Callable from core.Constants import Constants from core.Errors import InvalidSubscriptionError, MissingSubscriptionError, ConnectionUnprotectedError, ConnectionTerminationError, CommandNotFoundError from core.controllers.ConfigurationController import ConfigurationController -from core.controllers.PrivilegePolicyController import PrivilegePolicyController +from core.controllers.PolicyController import PolicyController from core.controllers.ProfileController import ProfileController from core.controllers.SessionStateController import SessionStateController from core.controllers.SystemStateController import SystemStateController @@ -383,10 +383,12 @@ class ConnectionController: if shutil.which('wg-quick') is None: raise CommandNotFoundError('wg-quick') + privilege_policy = PolicyController.get('privilege') + permission_denied = False return_code = None - if PrivilegePolicyController.is_instated(): + if privilege_policy.is_instated(): process = subprocess.Popen(('sudo', '-n', 'wg-quick', 'up', profile.get_wireguard_configuration_path()), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) process.wait() @@ -394,7 +396,7 @@ class ConnectionController: return_code = process.returncode permission_denied = return_code != 0 and b'sudo:' in process.stderr.read() - if not PrivilegePolicyController.is_instated() or permission_denied: + if not privilege_policy.is_instated() or permission_denied: if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') diff --git a/core/controllers/PolicyController.py b/core/controllers/PolicyController.py new file mode 100644 index 0000000..d9ac774 --- /dev/null +++ b/core/controllers/PolicyController.py @@ -0,0 +1,36 @@ +from core.models.policy.CapabilityPolicy import CapabilityPolicy +from core.models.policy.PrivilegePolicy import PrivilegePolicy +from typing import Union + + +class PolicyController: + + @staticmethod + def get(code: str): + + if code == 'capability': + return CapabilityPolicy() + elif code == 'privilege': + return PrivilegePolicy() + + return None + + @staticmethod + def preview(policy: Union[CapabilityPolicy, PrivilegePolicy]): + return policy.preview() + + @staticmethod + def instate(policy: Union[CapabilityPolicy, PrivilegePolicy]): + policy.instate() + + @staticmethod + def revoke(policy: Union[CapabilityPolicy, PrivilegePolicy]): + policy.revoke() + + @staticmethod + def is_suggestible(policy: Union[CapabilityPolicy, PrivilegePolicy]): + return policy.is_suggestible() + + @staticmethod + def is_instated(policy: Union[CapabilityPolicy, PrivilegePolicy]): + return policy.is_instated() diff --git a/core/models/BasePolicy.py b/core/models/BasePolicy.py new file mode 100644 index 0000000..71c9dd4 --- /dev/null +++ b/core/models/BasePolicy.py @@ -0,0 +1,24 @@ +from abc import ABC, abstractmethod + + +class BasePolicy(ABC): + + @abstractmethod + def preview(self): + pass + + @abstractmethod + def instate(self): + pass + + @abstractmethod + def revoke(self): + pass + + @abstractmethod + def is_suggestible(self): + pass + + @abstractmethod + def is_instated(self): + pass diff --git a/core/controllers/CapabilityPolicyController.py b/core/models/policy/CapabilityPolicy.py similarity index 70% rename from core/controllers/CapabilityPolicyController.py rename to core/models/policy/CapabilityPolicy.py index 588e274..5040085 100644 --- a/core/controllers/CapabilityPolicyController.py +++ b/core/models/policy/CapabilityPolicy.py @@ -1,5 +1,6 @@ from core.Constants import Constants from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError +from core.models.BasePolicy import BasePolicy from packaging import version from packaging.version import InvalidVersion from subprocess import CalledProcessError @@ -9,14 +10,12 @@ import shutil import subprocess -class CapabilityPolicyController: +class CapabilityPolicy(BasePolicy): - @staticmethod - def preview(): - return CapabilityPolicyController.__generate() + def preview(self): + return self.__generate() - @staticmethod - def instate(): + def instate(self): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') @@ -27,19 +26,19 @@ class CapabilityPolicyController: if shutil.which('service') is None: raise CommandNotFoundError('service') - if not CapabilityPolicyController.__is_compatible(): + if not self.__is_compatible(): raise PolicyInstatementError('The capability policy is not compatible.') - capability_policy = CapabilityPolicyController.__generate() + capability_policy = self.__generate() completed_successfully = False failed_attempt_count = 0 while not completed_successfully and failed_attempt_count < 3: - process = subprocess.Popen([ + process = subprocess.Popen(( 'pkexec', 'sh', '-c', f'install /dev/stdin {Constants.HV_CAPABILITY_POLICY_PATH} -o root -m 644 && service apparmor reload' - ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + ), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) process.communicate(f'{capability_policy}\n') completed_successfully = (process.returncode == 0) @@ -50,8 +49,7 @@ class CapabilityPolicyController: if not completed_successfully: raise PolicyInstatementError('The capability policy could not be instated.') - @staticmethod - def revoke(): + def revoke(self): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') @@ -59,23 +57,40 @@ class CapabilityPolicyController: if shutil.which('sh') is None: raise CommandNotFoundError('sh') - if shutil.which('service') is None: - raise CommandNotFoundError('service') + if shutil.which('apparmor_parser') is None: + raise CommandNotFoundError('apparmor_parser') + + process = subprocess.Popen(( + 'pkexec', 'sh', '-c', f'apparmor_parser -R {Constants.HV_CAPABILITY_POLICY_PATH} && rm {Constants.HV_CAPABILITY_POLICY_PATH}' + )) - process = subprocess.Popen(('pkexec', 'sh', '-c', f'rm {Constants.HV_CAPABILITY_POLICY_PATH} && service apparmor reload')) completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8) if not completed_successfully: raise PolicyRevocationError('The capability policy could not be revoked.') - @staticmethod - def is_instated(): + def is_suggestible(self): + + try: + + process = subprocess.Popen(('bwrap', '--bind', '/', '/', 'true'), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) + process.wait() + + if process.returncode != 0: + return self.__is_compatible() and b'setting up uid map' in process.stderr.read() + + except FileNotFoundError: + pass + + return False + + def is_instated(self): return os.path.exists(Constants.HV_CAPABILITY_POLICY_PATH) @staticmethod def __generate(): - return '\n'.join([ + return '\n'.join(( 'abi ,', 'include ', '', @@ -85,13 +100,13 @@ class CapabilityPolicyController: ' # Site-specific additions and overrides. See local/README for details.', ' include if exists ', '}' - ]) + )) @staticmethod def __is_compatible(): try: - process_output = subprocess.check_output(['apparmor_parser', '-V'], text=True) + process_output = subprocess.check_output(('apparmor_parser', '-V'), text=True) except (CalledProcessError, FileNotFoundError): return False diff --git a/core/controllers/PrivilegePolicyController.py b/core/models/policy/PrivilegePolicy.py similarity index 74% rename from core/controllers/PrivilegePolicyController.py rename to core/models/policy/PrivilegePolicy.py index 0df6f6f..8501238 100644 --- a/core/controllers/PrivilegePolicyController.py +++ b/core/models/policy/PrivilegePolicy.py @@ -1,5 +1,6 @@ from core.Constants import Constants -from core.Errors import CommandNotFoundError, PolicyAssignmentError, PolicyInstatementError, PolicyRevocationError +from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError, PolicyAssignmentError +from core.models.BasePolicy import BasePolicy from packaging import version from packaging.version import InvalidVersion from subprocess import CalledProcessError @@ -10,34 +11,32 @@ import shutil import subprocess -class PrivilegePolicyController: +class PrivilegePolicy(BasePolicy): - @staticmethod - def preview(): + def preview(self): - username = PrivilegePolicyController.__determine_username() - return PrivilegePolicyController.__generate(username) + username = self.__determine_username() + return self.__generate(username) - @staticmethod - def instate(): + def instate(self): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') - if not PrivilegePolicyController.__is_compatible(): + if not self.__is_compatible(): raise PolicyInstatementError('The privilege policy is not compatible.') - username = PrivilegePolicyController.__determine_username() - privilege_policy = PrivilegePolicyController.__generate(username) + username = self.__determine_username() + privilege_policy = self.__generate(username) completed_successfully = False failed_attempt_count = 0 while not completed_successfully and failed_attempt_count < 3: - process = subprocess.Popen([ + process = subprocess.Popen(( 'pkexec', 'install', '/dev/stdin', Constants.HV_PRIVILEGE_POLICY_PATH, '-o', 'root', '-m', '440' - ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + ), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) process.communicate(f'{privilege_policy}\n') completed_successfully = (process.returncode == 0) @@ -48,8 +47,7 @@ class PrivilegePolicyController: if not completed_successfully: raise PolicyInstatementError('The privilege policy could not be instated.') - @staticmethod - def revoke(): + def revoke(self): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') @@ -60,8 +58,10 @@ class PrivilegePolicyController: if not completed_successfully: raise PolicyRevocationError('The privilege policy could not be revoked.') - @staticmethod - def is_instated(): + def is_suggestible(self): + return self.__is_compatible() + + def is_instated(self): return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH) @staticmethod @@ -80,16 +80,16 @@ class PrivilegePolicyController: @staticmethod def __generate(username: str): - return '\n'.join([ - f'{username} ALL=(root) NOPASSWD: /usr/bin/wg-quick ^up {Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/[0-9]+/wg.conf$' - ]) + return '\n'.join(( + f'{username} ALL=(root) NOPASSWD: /usr/bin/wg-quick ^up {Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/[0-9]+/wg.conf$', + )) @staticmethod def __is_compatible(): try: - process_output = subprocess.check_output(['sudo', '-V'], text=True) - except CalledProcessError: + process_output = subprocess.check_output(('sudo', '-V'), text=True) + except (CalledProcessError, FileNotFoundError): return False if process_output.splitlines():