diff --git a/core/controllers/PolicyController.py b/core/controllers/PolicyController.py new file mode 100644 index 0000000..15ea4ee --- /dev/null +++ b/core/controllers/PolicyController.py @@ -0,0 +1,32 @@ +from core.models.policy.CapabilityPolicy import CapabilityPolicy +from core.models.policy.PrivilegePolicy import PrivilegePolicy +from typing import Union + + +class PolicyController: + + @staticmethod + def get(code: str): + + if code == 'capability': + return CapabilityPolicy() + elif code == 'privilege': + return PrivilegePolicy() + + return None + + @staticmethod + def preview(policy: Union[CapabilityPolicy, PrivilegePolicy]): + return policy.preview() + + @staticmethod + def instate(policy: Union[CapabilityPolicy, PrivilegePolicy]): + policy.instate() + + @staticmethod + def revoke(policy: Union[CapabilityPolicy, PrivilegePolicy]): + policy.revoke() + + @staticmethod + def is_instated(policy: Union[CapabilityPolicy, PrivilegePolicy]): + return policy.is_instated() diff --git a/core/models/BasePolicy.py b/core/models/BasePolicy.py new file mode 100644 index 0000000..c293f02 --- /dev/null +++ b/core/models/BasePolicy.py @@ -0,0 +1,20 @@ +from abc import ABC, abstractmethod + + +class BasePolicy(ABC): + + @abstractmethod + def preview(self): + pass + + @abstractmethod + def instate(self): + pass + + @abstractmethod + def revoke(self): + pass + + @abstractmethod + def is_instated(self): + pass diff --git a/core/controllers/CapabilityPolicyController.py b/core/models/policy/CapabilityPolicy.py similarity index 81% rename from core/controllers/CapabilityPolicyController.py rename to core/models/policy/CapabilityPolicy.py index 588e274..31e708e 100644 --- a/core/controllers/CapabilityPolicyController.py +++ b/core/models/policy/CapabilityPolicy.py @@ -1,5 +1,6 @@ from core.Constants import Constants from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError +from core.models.BasePolicy import BasePolicy from packaging import version from packaging.version import InvalidVersion from subprocess import CalledProcessError @@ -9,14 +10,12 @@ import shutil import subprocess -class CapabilityPolicyController: +class CapabilityPolicy(BasePolicy): - @staticmethod - def preview(): - return CapabilityPolicyController.__generate() + def preview(self): + return self.__generate() - @staticmethod - def instate(): + def instate(self): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') @@ -27,10 +26,10 @@ class CapabilityPolicyController: if shutil.which('service') is None: raise CommandNotFoundError('service') - if not CapabilityPolicyController.__is_compatible(): + if not self.__is_compatible(): raise PolicyInstatementError('The capability policy is not compatible.') - capability_policy = CapabilityPolicyController.__generate() + capability_policy = self.__generate() completed_successfully = False failed_attempt_count = 0 @@ -50,26 +49,18 @@ class CapabilityPolicyController: if not completed_successfully: raise PolicyInstatementError('The capability policy could not be instated.') - @staticmethod - def revoke(): + def revoke(self): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') - if shutil.which('sh') is None: - raise CommandNotFoundError('sh') - - if shutil.which('service') is None: - raise CommandNotFoundError('service') - - process = subprocess.Popen(('pkexec', 'sh', '-c', f'rm {Constants.HV_CAPABILITY_POLICY_PATH} && service apparmor reload')) + process = subprocess.Popen(('pkexec', 'rm', Constants.HV_CAPABILITY_POLICY_PATH)) completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8) if not completed_successfully: raise PolicyRevocationError('The capability policy could not be revoked.') - @staticmethod - def is_instated(): + def is_instated(self): return os.path.exists(Constants.HV_CAPABILITY_POLICY_PATH) @staticmethod diff --git a/core/controllers/PrivilegePolicyController.py b/core/models/policy/PrivilegePolicy.py similarity index 81% rename from core/controllers/PrivilegePolicyController.py rename to core/models/policy/PrivilegePolicy.py index 0df6f6f..6c709a1 100644 --- a/core/controllers/PrivilegePolicyController.py +++ b/core/models/policy/PrivilegePolicy.py @@ -1,5 +1,6 @@ from core.Constants import Constants -from core.Errors import CommandNotFoundError, PolicyAssignmentError, PolicyInstatementError, PolicyRevocationError +from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError, PolicyAssignmentError +from core.models.BasePolicy import BasePolicy from packaging import version from packaging.version import InvalidVersion from subprocess import CalledProcessError @@ -10,25 +11,23 @@ import shutil import subprocess -class PrivilegePolicyController: +class PrivilegePolicy(BasePolicy): - @staticmethod - def preview(): + def preview(self): - username = PrivilegePolicyController.__determine_username() - return PrivilegePolicyController.__generate(username) + username = self.__determine_username() + return self.__generate(username) - @staticmethod - def instate(): + def instate(self): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') - if not PrivilegePolicyController.__is_compatible(): + if not self.__is_compatible(): raise PolicyInstatementError('The privilege policy is not compatible.') - username = PrivilegePolicyController.__determine_username() - privilege_policy = PrivilegePolicyController.__generate(username) + username = self.__determine_username() + privilege_policy = self.__generate(username) completed_successfully = False failed_attempt_count = 0 @@ -48,8 +47,7 @@ class PrivilegePolicyController: if not completed_successfully: raise PolicyInstatementError('The privilege policy could not be instated.') - @staticmethod - def revoke(): + def revoke(self): if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') @@ -60,8 +58,7 @@ class PrivilegePolicyController: if not completed_successfully: raise PolicyRevocationError('The privilege policy could not be revoked.') - @staticmethod - def is_instated(): + def is_instated(self): return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH) @staticmethod @@ -89,7 +86,7 @@ class PrivilegePolicyController: try: process_output = subprocess.check_output(['sudo', '-V'], text=True) - except CalledProcessError: + except (CalledProcessError, FileNotFoundError): return False if process_output.splitlines():