Update and refactor policy-related logic
This commit is contained in:
parent
e330972cf1
commit
a97baf734a
4 changed files with 75 additions and 35 deletions
32
core/controllers/PolicyController.py
Normal file
32
core/controllers/PolicyController.py
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
from core.models.policy.CapabilityPolicy import CapabilityPolicy
|
||||||
|
from core.models.policy.PrivilegePolicy import PrivilegePolicy
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
|
|
||||||
|
class PolicyController:
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get(code: str):
|
||||||
|
|
||||||
|
if code == 'capability':
|
||||||
|
return CapabilityPolicy()
|
||||||
|
elif code == 'privilege':
|
||||||
|
return PrivilegePolicy()
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def preview(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||||
|
return policy.preview()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def instate(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||||
|
policy.instate()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def revoke(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||||
|
policy.revoke()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def is_instated(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||||
|
return policy.is_instated()
|
||||||
20
core/models/BasePolicy.py
Normal file
20
core/models/BasePolicy.py
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
|
||||||
|
class BasePolicy(ABC):
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def preview(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def instate(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def revoke(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def is_instated(self):
|
||||||
|
pass
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
from core.Constants import Constants
|
from core.Constants import Constants
|
||||||
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError
|
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError
|
||||||
|
from core.models.BasePolicy import BasePolicy
|
||||||
from packaging import version
|
from packaging import version
|
||||||
from packaging.version import InvalidVersion
|
from packaging.version import InvalidVersion
|
||||||
from subprocess import CalledProcessError
|
from subprocess import CalledProcessError
|
||||||
|
|
@ -9,14 +10,12 @@ import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
class CapabilityPolicyController:
|
class CapabilityPolicy(BasePolicy):
|
||||||
|
|
||||||
@staticmethod
|
def preview(self):
|
||||||
def preview():
|
return self.__generate()
|
||||||
return CapabilityPolicyController.__generate()
|
|
||||||
|
|
||||||
@staticmethod
|
def instate(self):
|
||||||
def instate():
|
|
||||||
|
|
||||||
if shutil.which('pkexec') is None:
|
if shutil.which('pkexec') is None:
|
||||||
raise CommandNotFoundError('pkexec')
|
raise CommandNotFoundError('pkexec')
|
||||||
|
|
@ -27,10 +26,10 @@ class CapabilityPolicyController:
|
||||||
if shutil.which('service') is None:
|
if shutil.which('service') is None:
|
||||||
raise CommandNotFoundError('service')
|
raise CommandNotFoundError('service')
|
||||||
|
|
||||||
if not CapabilityPolicyController.__is_compatible():
|
if not self.__is_compatible():
|
||||||
raise PolicyInstatementError('The capability policy is not compatible.')
|
raise PolicyInstatementError('The capability policy is not compatible.')
|
||||||
|
|
||||||
capability_policy = CapabilityPolicyController.__generate()
|
capability_policy = self.__generate()
|
||||||
|
|
||||||
completed_successfully = False
|
completed_successfully = False
|
||||||
failed_attempt_count = 0
|
failed_attempt_count = 0
|
||||||
|
|
@ -50,26 +49,18 @@ class CapabilityPolicyController:
|
||||||
if not completed_successfully:
|
if not completed_successfully:
|
||||||
raise PolicyInstatementError('The capability policy could not be instated.')
|
raise PolicyInstatementError('The capability policy could not be instated.')
|
||||||
|
|
||||||
@staticmethod
|
def revoke(self):
|
||||||
def revoke():
|
|
||||||
|
|
||||||
if shutil.which('pkexec') is None:
|
if shutil.which('pkexec') is None:
|
||||||
raise CommandNotFoundError('pkexec')
|
raise CommandNotFoundError('pkexec')
|
||||||
|
|
||||||
if shutil.which('sh') is None:
|
process = subprocess.Popen(('pkexec', 'rm', Constants.HV_CAPABILITY_POLICY_PATH))
|
||||||
raise CommandNotFoundError('sh')
|
|
||||||
|
|
||||||
if shutil.which('service') is None:
|
|
||||||
raise CommandNotFoundError('service')
|
|
||||||
|
|
||||||
process = subprocess.Popen(('pkexec', 'sh', '-c', f'rm {Constants.HV_CAPABILITY_POLICY_PATH} && service apparmor reload'))
|
|
||||||
completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8)
|
completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8)
|
||||||
|
|
||||||
if not completed_successfully:
|
if not completed_successfully:
|
||||||
raise PolicyRevocationError('The capability policy could not be revoked.')
|
raise PolicyRevocationError('The capability policy could not be revoked.')
|
||||||
|
|
||||||
@staticmethod
|
def is_instated(self):
|
||||||
def is_instated():
|
|
||||||
return os.path.exists(Constants.HV_CAPABILITY_POLICY_PATH)
|
return os.path.exists(Constants.HV_CAPABILITY_POLICY_PATH)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
from core.Constants import Constants
|
from core.Constants import Constants
|
||||||
from core.Errors import CommandNotFoundError, PolicyAssignmentError, PolicyInstatementError, PolicyRevocationError
|
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError, PolicyAssignmentError
|
||||||
|
from core.models.BasePolicy import BasePolicy
|
||||||
from packaging import version
|
from packaging import version
|
||||||
from packaging.version import InvalidVersion
|
from packaging.version import InvalidVersion
|
||||||
from subprocess import CalledProcessError
|
from subprocess import CalledProcessError
|
||||||
|
|
@ -10,25 +11,23 @@ import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
class PrivilegePolicyController:
|
class PrivilegePolicy(BasePolicy):
|
||||||
|
|
||||||
@staticmethod
|
def preview(self):
|
||||||
def preview():
|
|
||||||
|
|
||||||
username = PrivilegePolicyController.__determine_username()
|
username = self.__determine_username()
|
||||||
return PrivilegePolicyController.__generate(username)
|
return self.__generate(username)
|
||||||
|
|
||||||
@staticmethod
|
def instate(self):
|
||||||
def instate():
|
|
||||||
|
|
||||||
if shutil.which('pkexec') is None:
|
if shutil.which('pkexec') is None:
|
||||||
raise CommandNotFoundError('pkexec')
|
raise CommandNotFoundError('pkexec')
|
||||||
|
|
||||||
if not PrivilegePolicyController.__is_compatible():
|
if not self.__is_compatible():
|
||||||
raise PolicyInstatementError('The privilege policy is not compatible.')
|
raise PolicyInstatementError('The privilege policy is not compatible.')
|
||||||
|
|
||||||
username = PrivilegePolicyController.__determine_username()
|
username = self.__determine_username()
|
||||||
privilege_policy = PrivilegePolicyController.__generate(username)
|
privilege_policy = self.__generate(username)
|
||||||
|
|
||||||
completed_successfully = False
|
completed_successfully = False
|
||||||
failed_attempt_count = 0
|
failed_attempt_count = 0
|
||||||
|
|
@ -48,8 +47,7 @@ class PrivilegePolicyController:
|
||||||
if not completed_successfully:
|
if not completed_successfully:
|
||||||
raise PolicyInstatementError('The privilege policy could not be instated.')
|
raise PolicyInstatementError('The privilege policy could not be instated.')
|
||||||
|
|
||||||
@staticmethod
|
def revoke(self):
|
||||||
def revoke():
|
|
||||||
|
|
||||||
if shutil.which('pkexec') is None:
|
if shutil.which('pkexec') is None:
|
||||||
raise CommandNotFoundError('pkexec')
|
raise CommandNotFoundError('pkexec')
|
||||||
|
|
@ -60,8 +58,7 @@ class PrivilegePolicyController:
|
||||||
if not completed_successfully:
|
if not completed_successfully:
|
||||||
raise PolicyRevocationError('The privilege policy could not be revoked.')
|
raise PolicyRevocationError('The privilege policy could not be revoked.')
|
||||||
|
|
||||||
@staticmethod
|
def is_instated(self):
|
||||||
def is_instated():
|
|
||||||
return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH)
|
return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|
@ -89,7 +86,7 @@ class PrivilegePolicyController:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
process_output = subprocess.check_output(['sudo', '-V'], text=True)
|
process_output = subprocess.check_output(['sudo', '-V'], text=True)
|
||||||
except CalledProcessError:
|
except (CalledProcessError, FileNotFoundError):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if process_output.splitlines():
|
if process_output.splitlines():
|
||||||
Loading…
Reference in a new issue