diff --git a/core/models/system/SystemProfile.py b/core/models/system/SystemProfile.py index bd00de4..1bcf0a9 100644 --- a/core/models/system/SystemProfile.py +++ b/core/models/system/SystemProfile.py @@ -4,11 +4,11 @@ from core.models.BaseProfile import BaseProfile from core.models.system.SystemConnection import SystemConnection from dataclasses import dataclass from typing import Optional +import json import os import shutil import subprocess - @dataclass class SystemProfile(BaseProfile): connection: Optional[SystemConnection] @@ -17,33 +17,23 @@ class SystemProfile(BaseProfile): return self.__get_system_config_path(self.id) def save(self): - if 'location' in self._get_dirty_keys(): self.__delete_wireguard_configuration() - super().save() def attach_wireguard_configuration(self, wireguard_configuration): - if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') - wireguard_configuration_file_backup_path = f'{self.get_config_path()}/wg.conf.bak' - with open(wireguard_configuration_file_backup_path, 'w') as wireguard_configuration_file: wireguard_configuration_file.write(wireguard_configuration) - wireguard_configuration_is_attached = False failed_attempt_count = 0 - while not wireguard_configuration_is_attached and failed_attempt_count < 3: - process = subprocess.Popen(('pkexec', 'install', '-D', wireguard_configuration_file_backup_path, self.get_wireguard_configuration_path(), '-o', 'root', '-m', '744')) wireguard_configuration_is_attached = not bool(os.waitpid(process.pid, 0)[1] >> 8) - if not wireguard_configuration_is_attached: failed_attempt_count += 1 - if not wireguard_configuration_is_attached: raise ProfileModificationError('The WireGuard configuration could not be attached.') @@ -54,41 +44,57 @@ class SystemProfile(BaseProfile): return os.path.isfile(f'{self.get_system_config_path()}/wg.conf') def address_security_incident(self): - super().address_security_incident() self.__delete_wireguard_configuration() def delete(self): - try: self.__delete_wireguard_configuration() except ProfileModificationError: raise ProfileDeletionError('The WireGuard configuration could not be deleted.') - if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') - process = subprocess.Popen(('pkexec', 'rm', '-d', self.get_system_config_path())) completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8) - if not completed_successfully: raise ProfileDeletionError('The profile could not be deleted.') - super().delete() + def attach_operator_proxy_session(self, operator_proxy_session): + from core.models.OperatorProxySession import OperatorProxySession + operator_proxy_session_file_contents = f'{operator_proxy_session.to_json(indent=4)}\n' + os.makedirs(self.get_config_path(), exist_ok=True) + operator_proxy_session_file_path = self.get_operator_proxy_session_path() + with open(operator_proxy_session_file_path, 'w') as operator_proxy_session_file: + operator_proxy_session_file.write(operator_proxy_session_file_contents) + + def get_operator_proxy_session_path(self): + return f'{self.get_config_path()}/operator_proxy_session.json' + + def get_operator_proxy_session(self): + try: + config_file_contents = open(self.get_operator_proxy_session_path(), 'r').read() + except FileNotFoundError: + return None + try: + data = json.loads(config_file_contents) + except ValueError: + return None + from core.models.OperatorProxySession import OperatorProxySession + return OperatorProxySession.from_dict(data) + + def has_operator_proxy_session(self): + return os.path.isfile(self.get_operator_proxy_session_path()) + def __delete_wireguard_configuration(self): - if self.has_wireguard_configuration(): - if shutil.which('pkexec') is None: raise CommandNotFoundError('pkexec') - process = subprocess.Popen(('pkexec', 'rm', '-d', self.get_wireguard_configuration_path())) completed_successfully = not bool(os.waitpid(process.pid, 0)[1] >> 8) - if not completed_successfully: raise ProfileModificationError('The WireGuard configuration could not be deleted.') @staticmethod def __get_system_config_path(id: int): - return f'{Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/{str(id)}' + return f'{Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/{str(id)}' \ No newline at end of file