100 lines
No EOL
4.6 KiB
Python
100 lines
No EOL
4.6 KiB
Python
from core.Constants import Constants
|
|
from core.Errors import ProfileDeletionError, ProfileModificationError, CommandNotFoundError
|
|
from core.models.BaseProfile import BaseProfile
|
|
from core.models.system.SystemConnection import SystemConnection
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
|
|
@dataclass
class SystemProfile(BaseProfile):
    """Profile whose WireGuard configuration is installed into a root-owned system path.

    Operations that touch the system path are delegated to ``pkexec``, so they
    may prompt the user for authorization and require ``pkexec`` to be on PATH.
    """

    # Connection associated with this profile; may be None.
    connection: Optional[SystemConnection]

    def get_system_config_path(self):
        """Return the system-level configuration directory for this profile."""
        return self.__get_system_config_path(self.id)

    def save(self):
        """Save the profile, first removing the WireGuard configuration if 'location' is dirty.

        Raises:
            CommandNotFoundError: A configuration must be removed but ``pkexec`` is missing.
            ProfileModificationError: The existing configuration could not be removed.
        """
        if 'location' in self._get_dirty_keys():
            self.__delete_wireguard_configuration()

        super().save()

    def attach_wireguard_configuration(self, wireguard_configuration):
        """Install the given WireGuard configuration text at the system path.

        The text is first written to ``wg.conf.bak`` in the profile's own
        configuration directory and then copied into place as root through
        ``pkexec install``. Up to three attempts are made.

        Args:
            wireguard_configuration: Contents to write to the configuration file.

        Raises:
            CommandNotFoundError: ``pkexec`` is not available on PATH.
            ProfileModificationError: Every installation attempt failed.
        """
        # Fail before touching the file system when the privileged step cannot run.
        self.__require_pkexec()

        # Mirror attach_operator_proxy_session: the directory may not exist yet.
        os.makedirs(self.get_config_path(), exist_ok=True)
        staging_path = f'{self.get_config_path()}/wg.conf.bak'

        with open(staging_path, 'w') as staging_file:
            staging_file.write(wireguard_configuration)

        # NOTE(review): mode 744 leaves the installed file world-readable, and
        # WireGuard configurations usually embed a private key - confirm this is intended.
        install_arguments = (
            'install', '-D', staging_path, self.get_wireguard_configuration_path(),
            '-o', 'root', '-m', '744',
        )

        maximum_attempt_count = 3

        for _ in range(maximum_attempt_count):
            if self.__run_privileged(*install_arguments):
                return

        raise ProfileModificationError('The WireGuard configuration could not be attached.')

    def get_wireguard_configuration_path(self):
        """Return the path of the WireGuard configuration inside the system directory."""
        return f'{self.get_system_config_path()}/wg.conf'

    def has_wireguard_configuration(self):
        """Return True when the WireGuard configuration exists as a regular file."""
        return os.path.isfile(self.get_wireguard_configuration_path())

    def address_security_incident(self):
        """Run the base class handling, then remove the WireGuard configuration.

        Raises:
            CommandNotFoundError: A configuration must be removed but ``pkexec`` is missing.
            ProfileModificationError: The configuration could not be removed.
        """
        super().address_security_incident()
        self.__delete_wireguard_configuration()

    def delete(self):
        """Remove the WireGuard configuration and system directory, then delete the profile.

        Raises:
            CommandNotFoundError: A privileged removal is required but ``pkexec`` is missing.
            ProfileDeletionError: The configuration or the system directory could not be removed.
        """
        try:
            self.__delete_wireguard_configuration()
        except ProfileModificationError as error:
            raise ProfileDeletionError('The WireGuard configuration could not be deleted.') from error

        system_config_path = self.get_system_config_path()

        # Only escalate when there is something to remove; `rm -d` on a missing
        # path fails and would otherwise make the profile impossible to delete.
        if os.path.lexists(system_config_path):
            if not self.__run_privileged('rm', '-d', system_config_path):
                raise ProfileDeletionError('The profile could not be deleted.')

        super().delete()

    def attach_operator_proxy_session(self, operator_proxy_session):
        """Write the operator proxy session as indented JSON into the profile's configuration directory.

        Args:
            operator_proxy_session: Object exposing ``to_json(indent=...)``.
        """
        session_file_contents = f'{operator_proxy_session.to_json(indent=4)}\n'

        os.makedirs(self.get_config_path(), exist_ok=True)

        with open(self.get_operator_proxy_session_path(), 'w') as session_file:
            session_file.write(session_file_contents)

    def get_operator_proxy_session_path(self):
        """Return the path of the stored operator proxy session file."""
        return f'{self.get_config_path()}/operator_proxy_session.json'

    def get_operator_proxy_session(self):
        """Load the stored operator proxy session.

        Returns:
            The result of ``OperatorProxySession.from_dict`` on the parsed file,
            or None when the file is missing or does not contain valid JSON.
        """
        try:
            with open(self.get_operator_proxy_session_path(), 'r') as session_file:
                session_file_contents = session_file.read()
        except FileNotFoundError:
            return None

        try:
            data = json.loads(session_file_contents)
        except ValueError:
            return None

        # Imported at call time rather than module level; presumably to avoid
        # a circular import - confirm before hoisting.
        from core.models.OperatorProxySession import OperatorProxySession

        return OperatorProxySession.from_dict(data)

    def has_operator_proxy_session(self):
        """Return True when the operator proxy session file exists as a regular file."""
        return os.path.isfile(self.get_operator_proxy_session_path())

    def __delete_wireguard_configuration(self):
        """Remove the installed WireGuard configuration, if there is one.

        Raises:
            CommandNotFoundError: ``pkexec`` is not available on PATH.
            ProfileModificationError: The privileged removal failed.
        """
        if not self.has_wireguard_configuration():
            return

        if not self.__run_privileged('rm', '-d', self.get_wireguard_configuration_path()):
            raise ProfileModificationError('The WireGuard configuration could not be deleted.')

    @staticmethod
    def __require_pkexec():
        """Raise CommandNotFoundError unless ``pkexec`` can be found on PATH."""
        if shutil.which('pkexec') is None:
            raise CommandNotFoundError('pkexec')

    @staticmethod
    def __run_privileged(*arguments):
        """Run a command through ``pkexec`` and report whether it exited with status 0.

        ``subprocess.run`` reports termination by a signal as a negative return
        code, so a killed process is treated as a failure rather than a success.

        Raises:
            CommandNotFoundError: ``pkexec`` is not available on PATH.
        """
        SystemProfile.__require_pkexec()

        return subprocess.run(('pkexec', *arguments), check=False).returncode == 0

    @staticmethod
    def __get_system_config_path(id: int):
        """Return the system configuration directory for the profile with the given id."""
        return f'{Constants.HV_SYSTEM_PROFILE_CONFIG_PATH}/{id!s}'