Update and refactor policy-related logic

This commit is contained in:
codeking 2025-11-27 08:46:30 +01:00
parent e330972cf1
commit 5307c64d92
5 changed files with 80 additions and 38 deletions

View file

@ -2,7 +2,7 @@ from collections.abc import Callable
from core.Constants import Constants
from core.Errors import InvalidSubscriptionError, MissingSubscriptionError, ConnectionUnprotectedError, ConnectionTerminationError, CommandNotFoundError
from core.controllers.ConfigurationController import ConfigurationController
from core.controllers.PrivilegePolicyController import PrivilegePolicyController
from core.controllers.PolicyController import PolicyController
from core.controllers.ProfileController import ProfileController
from core.controllers.SessionStateController import SessionStateController
from core.controllers.SystemStateController import SystemStateController
@ -383,10 +383,12 @@ class ConnectionController:
if shutil.which('wg-quick') is None:
raise CommandNotFoundError('wg-quick')
privilege_policy = PolicyController.get('privilege')
permission_denied = False
return_code = None
if PrivilegePolicyController.is_instated():
if privilege_policy.is_instated():
process = subprocess.Popen(('sudo', '-n', 'wg-quick', 'up', profile.get_wireguard_configuration_path()), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
process.wait()
@ -394,7 +396,7 @@ class ConnectionController:
return_code = process.returncode
permission_denied = return_code != 0 and b'sudo:' in process.stderr.read()
if not PrivilegePolicyController.is_instated() or permission_denied:
if not privilege_policy.is_instated() or permission_denied:
if shutil.which('pkexec') is None:
raise CommandNotFoundError('pkexec')

View file

@ -0,0 +1,32 @@
from core.models.policy.CapabilityPolicy import CapabilityPolicy
from core.models.policy.PrivilegePolicy import PrivilegePolicy
from typing import Union
class PolicyController:
@staticmethod
def get(code: str):
if code == 'capability':
return CapabilityPolicy()
elif code == 'privilege':
return PrivilegePolicy()
return None
@staticmethod
def preview(policy: Union[CapabilityPolicy, PrivilegePolicy]):
return policy.preview()
@staticmethod
def instate(policy: Union[CapabilityPolicy, PrivilegePolicy]):
policy.instate()
@staticmethod
def revoke(policy: Union[CapabilityPolicy, PrivilegePolicy]):
policy.revoke()
@staticmethod
def is_instated(policy: Union[CapabilityPolicy, PrivilegePolicy]):
return policy.is_instated()

20
core/models/BasePolicy.py Normal file
View file

@ -0,0 +1,20 @@
from abc import ABC, abstractmethod
class BasePolicy(ABC):
@abstractmethod
def preview(self):
pass
@abstractmethod
def instate(self):
pass
@abstractmethod
def revoke(self):
pass
@abstractmethod
def is_instated(self):
pass

View file

@ -1,5 +1,6 @@
from core.Constants import Constants
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError
from core.models.BasePolicy import BasePolicy
from packaging import version
from packaging.version import InvalidVersion
from subprocess import CalledProcessError
@ -9,14 +10,12 @@ import shutil
import subprocess
class CapabilityPolicyController:
class CapabilityPolicy(BasePolicy):
@staticmethod
def preview():
return CapabilityPolicyController.__generate()
def preview(self):
return self.__generate()
@staticmethod
def instate():
def instate(self):
if shutil.which('pkexec') is None:
raise CommandNotFoundError('pkexec')
@ -27,10 +26,10 @@ class CapabilityPolicyController:
if shutil.which('service') is None:
raise CommandNotFoundError('service')
if not CapabilityPolicyController.__is_compatible():
if not self.__is_compatible():
raise PolicyInstatementError('The capability policy is not compatible.')
capability_policy = CapabilityPolicyController.__generate()
capability_policy = self.__generate()
completed_successfully = False
failed_attempt_count = 0
@ -50,26 +49,18 @@ class CapabilityPolicyController:
if not completed_successfully:
raise PolicyInstatementError('The capability policy could not be instated.')
@staticmethod
def revoke():
def revoke(self):
if shutil.which('pkexec') is None:
raise CommandNotFoundError('pkexec')
if shutil.which('sh') is None:
raise CommandNotFoundError('sh')
if shutil.which('service') is None:
raise CommandNotFoundError('service')
process = subprocess.Popen(('pkexec', 'sh', '-c', f'rm {Constants.HV_CAPABILITY_POLICY_PATH} && service apparmor reload'))
process = subprocess.Popen(('pkexec', 'rm', Constants.HV_CAPABILITY_POLICY_PATH))
completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8)
if not completed_successfully:
raise PolicyRevocationError('The capability policy could not be revoked.')
@staticmethod
def is_instated():
def is_instated(self):
return os.path.exists(Constants.HV_CAPABILITY_POLICY_PATH)
@staticmethod

View file

@ -1,5 +1,6 @@
from core.Constants import Constants
from core.Errors import CommandNotFoundError, PolicyAssignmentError, PolicyInstatementError, PolicyRevocationError
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError, PolicyAssignmentError
from core.models.BasePolicy import BasePolicy
from packaging import version
from packaging.version import InvalidVersion
from subprocess import CalledProcessError
@ -10,25 +11,23 @@ import shutil
import subprocess
class PrivilegePolicyController:
class PrivilegePolicy(BasePolicy):
@staticmethod
def preview():
def preview(self):
username = PrivilegePolicyController.__determine_username()
return PrivilegePolicyController.__generate(username)
username = self.__determine_username()
return self.__generate(username)
@staticmethod
def instate():
def instate(self):
if shutil.which('pkexec') is None:
raise CommandNotFoundError('pkexec')
if not PrivilegePolicyController.__is_compatible():
if not self.__is_compatible():
raise PolicyInstatementError('The privilege policy is not compatible.')
username = PrivilegePolicyController.__determine_username()
privilege_policy = PrivilegePolicyController.__generate(username)
username = self.__determine_username()
privilege_policy = self.__generate(username)
completed_successfully = False
failed_attempt_count = 0
@ -48,8 +47,7 @@ class PrivilegePolicyController:
if not completed_successfully:
raise PolicyInstatementError('The privilege policy could not be instated.')
@staticmethod
def revoke():
def revoke(self):
if shutil.which('pkexec') is None:
raise CommandNotFoundError('pkexec')
@ -60,8 +58,7 @@ class PrivilegePolicyController:
if not completed_successfully:
raise PolicyRevocationError('The privilege policy could not be revoked.')
@staticmethod
def is_instated():
def is_instated(self):
return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH)
@staticmethod
@ -89,7 +86,7 @@ class PrivilegePolicyController:
try:
process_output = subprocess.check_output(['sudo', '-V'], text=True)
except CalledProcessError:
except (CalledProcessError, FileNotFoundError):
return False
if process_output.splitlines():