Update and refactor policy-related logic
This commit is contained in:
parent
e330972cf1
commit
8c51e85a70
5 changed files with 122 additions and 45 deletions
|
|
@ -2,7 +2,7 @@ from collections.abc import Callable
|
|||
from core.Constants import Constants
|
||||
from core.Errors import InvalidSubscriptionError, MissingSubscriptionError, ConnectionUnprotectedError, ConnectionTerminationError, CommandNotFoundError
|
||||
from core.controllers.ConfigurationController import ConfigurationController
|
||||
from core.controllers.PrivilegePolicyController import PrivilegePolicyController
|
||||
from core.controllers.PolicyController import PolicyController
|
||||
from core.controllers.ProfileController import ProfileController
|
||||
from core.controllers.SessionStateController import SessionStateController
|
||||
from core.controllers.SystemStateController import SystemStateController
|
||||
|
|
@ -383,10 +383,12 @@ class ConnectionController:
|
|||
if shutil.which('wg-quick') is None:
|
||||
raise CommandNotFoundError('wg-quick')
|
||||
|
||||
privilege_policy = PolicyController.get('privilege')
|
||||
|
||||
permission_denied = False
|
||||
return_code = None
|
||||
|
||||
if PrivilegePolicyController.is_instated():
|
||||
if privilege_policy.is_instated():
|
||||
|
||||
process = subprocess.Popen(('sudo', '-n', 'wg-quick', 'up', profile.get_wireguard_configuration_path()), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
|
||||
process.wait()
|
||||
|
|
@ -394,7 +396,7 @@ class ConnectionController:
|
|||
return_code = process.returncode
|
||||
permission_denied = return_code != 0 and b'sudo:' in process.stderr.read()
|
||||
|
||||
if not PrivilegePolicyController.is_instated() or permission_denied:
|
||||
if not privilege_policy.is_instated() or permission_denied:
|
||||
|
||||
if shutil.which('pkexec') is None:
|
||||
raise CommandNotFoundError('pkexec')
|
||||
|
|
|
|||
36
core/controllers/PolicyController.py
Normal file
36
core/controllers/PolicyController.py
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
from core.models.policy.CapabilityPolicy import CapabilityPolicy
|
||||
from core.models.policy.PrivilegePolicy import PrivilegePolicy
|
||||
from typing import Union
|
||||
|
||||
|
||||
class PolicyController:
|
||||
|
||||
@staticmethod
|
||||
def get(code: str):
|
||||
|
||||
if code == 'capability':
|
||||
return CapabilityPolicy()
|
||||
elif code == 'privilege':
|
||||
return PrivilegePolicy()
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def preview(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||
return policy.preview()
|
||||
|
||||
@staticmethod
|
||||
def instate(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||
policy.instate()
|
||||
|
||||
@staticmethod
|
||||
def revoke(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||
policy.revoke()
|
||||
|
||||
@staticmethod
|
||||
def is_suggestible(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||
return policy.is_suggestible()
|
||||
|
||||
@staticmethod
|
||||
def is_instated(policy: Union[CapabilityPolicy, PrivilegePolicy]):
|
||||
return policy.is_instated()
|
||||
24
core/models/BasePolicy.py
Normal file
24
core/models/BasePolicy.py
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BasePolicy(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def preview(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def instate(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def revoke(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def is_suggestible(self):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def is_instated(self):
|
||||
pass
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
from core.Constants import Constants
|
||||
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError
|
||||
from core.models.BasePolicy import BasePolicy
|
||||
from packaging import version
|
||||
from packaging.version import InvalidVersion
|
||||
from subprocess import CalledProcessError
|
||||
|
|
@ -9,14 +10,12 @@ import shutil
|
|||
import subprocess
|
||||
|
||||
|
||||
class CapabilityPolicyController:
|
||||
class CapabilityPolicy(BasePolicy):
|
||||
|
||||
@staticmethod
|
||||
def preview():
|
||||
return CapabilityPolicyController.__generate()
|
||||
def preview(self):
|
||||
return self.__generate()
|
||||
|
||||
@staticmethod
|
||||
def instate():
|
||||
def instate(self):
|
||||
|
||||
if shutil.which('pkexec') is None:
|
||||
raise CommandNotFoundError('pkexec')
|
||||
|
|
@ -27,19 +26,19 @@ class CapabilityPolicyController:
|
|||
if shutil.which('service') is None:
|
||||
raise CommandNotFoundError('service')
|
||||
|
||||
if not CapabilityPolicyController.__is_compatible():
|
||||
if not self.__is_compatible():
|
||||
raise PolicyInstatementError('The capability policy is not compatible.')
|
||||
|
||||
capability_policy = CapabilityPolicyController.__generate()
|
||||
capability_policy = self.__generate()
|
||||
|
||||
completed_successfully = False
|
||||
failed_attempt_count = 0
|
||||
|
||||
while not completed_successfully and failed_attempt_count < 3:
|
||||
|
||||
process = subprocess.Popen([
|
||||
process = subprocess.Popen((
|
||||
'pkexec', 'sh', '-c', f'install /dev/stdin {Constants.HV_CAPABILITY_POLICY_PATH} -o root -m 644 && service apparmor reload'
|
||||
], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
|
||||
process.communicate(f'{capability_policy}\n')
|
||||
completed_successfully = (process.returncode == 0)
|
||||
|
|
@ -50,8 +49,7 @@ class CapabilityPolicyController:
|
|||
if not completed_successfully:
|
||||
raise PolicyInstatementError('The capability policy could not be instated.')
|
||||
|
||||
@staticmethod
|
||||
def revoke():
|
||||
def revoke(self):
|
||||
|
||||
if shutil.which('pkexec') is None:
|
||||
raise CommandNotFoundError('pkexec')
|
||||
|
|
@ -59,23 +57,40 @@ class CapabilityPolicyController:
|
|||
if shutil.which('sh') is None:
|
||||
raise CommandNotFoundError('sh')
|
||||
|
||||
if shutil.which('service') is None:
|
||||
raise CommandNotFoundError('service')
|
||||
if shutil.which('apparmor_parser') is None:
|
||||
raise CommandNotFoundError('apparmor_parser')
|
||||
|
||||
process = subprocess.Popen((
|
||||
'pkexec', 'sh', '-c', f'apparmor_parser -R {Constants.HV_CAPABILITY_POLICY_PATH} && rm {Constants.HV_CAPABILITY_POLICY_PATH}'
|
||||
))
|
||||
|
||||
process = subprocess.Popen(('pkexec', 'sh', '-c', f'rm {Constants.HV_CAPABILITY_POLICY_PATH} && service apparmor reload'))
|
||||
completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8)
|
||||
|
||||
if not completed_successfully:
|
||||
raise PolicyRevocationError('The capability policy could not be revoked.')
|
||||
|
||||
@staticmethod
|
||||
def is_instated():
|
||||
def is_suggestible(self):
|
||||
|
||||
try:
|
||||
|
||||
process = subprocess.Popen(('bwrap', '--bind', '/', '/', 'true'), stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
|
||||
process.wait()
|
||||
|
||||
if process.returncode != 0:
|
||||
return self.__is_compatible() and b'setting up uid map' in process.stderr.read()
|
||||
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
def is_instated(self):
|
||||
return os.path.exists(Constants.HV_CAPABILITY_POLICY_PATH)
|
||||
|
||||
@staticmethod
|
||||
def __generate():
|
||||
|
||||
return '\n'.join([
|
||||
return '\n'.join((
|
||||
'abi <abi/4.0>,',
|
||||
'include <tunables/global>',
|
||||
'',
|
||||
|
|
@ -85,13 +100,13 @@ class CapabilityPolicyController:
|
|||
' # Site-specific additions and overrides. See local/README for details.',
|
||||
' include if exists <local/hv-bwrap-userns-restrict>',
|
||||
'}'
|
||||
])
|
||||
))
|
||||
|
||||
@staticmethod
|
||||
def __is_compatible():
|
||||
|
||||
try:
|
||||
process_output = subprocess.check_output(['apparmor_parser', '-V'], text=True)
|
||||
process_output = subprocess.check_output(('apparmor_parser', '-V'), text=True)
|
||||
except (CalledProcessError, FileNotFoundError):
|
||||
return False
|
||||
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
from core.Constants import Constants
|
||||
from core.Errors import CommandNotFoundError, PolicyAssignmentError, PolicyInstatementError, PolicyRevocationError
|
||||
from core.Errors import CommandNotFoundError, PolicyInstatementError, PolicyRevocationError, PolicyAssignmentError
|
||||
from core.models.BasePolicy import BasePolicy
|
||||
from packaging import version
|
||||
from packaging.version import InvalidVersion
|
||||
from subprocess import CalledProcessError
|
||||
|
|
@ -10,34 +11,32 @@ import shutil
|
|||
import subprocess
|
||||
|
||||
|
||||
class PrivilegePolicyController:
|
||||
class PrivilegePolicy(BasePolicy):
|
||||
|
||||
@staticmethod
|
||||
def preview():
|
||||
def preview(self):
|
||||
|
||||
username = PrivilegePolicyController.__determine_username()
|
||||
return PrivilegePolicyController.__generate(username)
|
||||
username = self.__determine_username()
|
||||
return self.__generate(username)
|
||||
|
||||
@staticmethod
|
||||
def instate():
|
||||
def instate(self):
|
||||
|
||||
if shutil.which('pkexec') is None:
|
||||
raise CommandNotFoundError('pkexec')
|
||||
|
||||
if not PrivilegePolicyController.__is_compatible():
|
||||
if not self.__is_compatible():
|
||||
raise PolicyInstatementError('The privilege policy is not compatible.')
|
||||
|
||||
username = PrivilegePolicyController.__determine_username()
|
||||
privilege_policy = PrivilegePolicyController.__generate(username)
|
||||
username = self.__determine_username()
|
||||
privilege_policy = self.__generate(username)
|
||||
|
||||
completed_successfully = False
|
||||
failed_attempt_count = 0
|
||||
|
||||
while not completed_successfully and failed_attempt_count < 3:
|
||||
|
||||
process = subprocess.Popen([
|
||||
process = subprocess.Popen((
|
||||
'pkexec', 'install', '/dev/stdin', Constants.HV_PRIVILEGE_POLICY_PATH, '-o', 'root', '-m', '440'
|
||||
], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
|
||||
process.communicate(f'{privilege_policy}\n')
|
||||
completed_successfully = (process.returncode == 0)
|
||||
|
|
@ -48,8 +47,7 @@ class PrivilegePolicyController:
|
|||
if not completed_successfully:
|
||||
raise PolicyInstatementError('The privilege policy could not be instated.')
|
||||
|
||||
@staticmethod
|
||||
def revoke():
|
||||
def revoke(self):
|
||||
|
||||
if shutil.which('pkexec') is None:
|
||||
raise CommandNotFoundError('pkexec')
|
||||
|
|
@ -60,8 +58,10 @@ class PrivilegePolicyController:
|
|||
if not completed_successfully:
|
||||
raise PolicyRevocationError('The privilege policy could not be revoked.')
|
||||
|
||||
@staticmethod
|
||||
def is_instated():
|
||||
def is_suggestible(self):
|
||||
return self.__is_compatible()
|
||||
|
||||
def is_instated(self):
|
||||
return os.path.exists(Constants.HV_PRIVILEGE_POLICY_PATH)
|
||||
|
||||
@staticmethod
|
||||
|
|
@ -80,16 +80,16 @@ class PrivilegePolicyController:
|
|||
@staticmethod
|
||||
def __generate(username: str):
|
||||
|
||||
return '\n'.join([
|
||||
f'{username} ALL=(root) NOPASSWD: /usr/bin/wg-quick ^up {Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/[0-9]+/wg.conf$'
|
||||
])
|
||||
return '\n'.join((
|
||||
f'{username} ALL=(root) NOPASSWD: /usr/bin/wg-quick ^up {Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/[0-9]+/wg.conf$',
|
||||
))
|
||||
|
||||
@staticmethod
|
||||
def __is_compatible():
|
||||
|
||||
try:
|
||||
process_output = subprocess.check_output(['sudo', '-V'], text=True)
|
||||
except CalledProcessError:
|
||||
process_output = subprocess.check_output(('sudo', '-V'), text=True)
|
||||
except (CalledProcessError, FileNotFoundError):
|
||||
return False
|
||||
|
||||
if process_output.splitlines():
|
||||
Loading…
Reference in a new issue