sp-hydra-veil-core/core/models/BaseProfile.py

226 lines
6.8 KiB
Python

from abc import ABC, abstractmethod
from core.Constants import Constants
from core.Helpers import write_atomically
from core.models.Location import Location
from core.models.Subscription import Subscription
from core.models.session.ApplicationVersion import ApplicationVersion
from dataclasses import dataclass, field, asdict
from dataclasses_json import config, Exclude, dataclass_json
from pathlib import Path
from typing import Optional, Self
import json
import os
import re
import shutil
import tempfile
@dataclass_json
@dataclass
class BaseProfile(ABC):
id: int = field(
metadata=config(exclude=Exclude.ALWAYS)
)
name: str
subscription: Optional[Subscription]
location: Optional[Location]
@abstractmethod
def get_wireguard_configuration_path(self):
pass
@abstractmethod
def has_wireguard_configuration(self):
pass
def get_config_path(self):
return BaseProfile.__get_config_path(self.id)
def get_data_path(self):
return BaseProfile.__get_data_path(self.id)
def has_subscription(self):
return self.subscription is not None
def has_location(self):
return self.location is not None
def is_session_profile(self):
return type(self).__name__ == 'SessionProfile'
def is_system_profile(self):
return type(self).__name__ == 'SystemProfile'
def save(self: Self):
config_file_contents = f'{self.to_json(indent=4)}\n'
os.makedirs(self.get_config_path(), exist_ok=True)
os.makedirs(self.get_data_path(), exist_ok=True)
config_file_path = f'{self.get_config_path()}/config.json'
write_atomically(config_file_path, config_file_contents)
def delete_data(self):
shutil.rmtree(self.get_data_path(), ignore_errors=True)
def delete(self):
shutil.rmtree(self.get_config_path(), ignore_errors=True)
self.delete_data()
def get_wireguard_configuration_metadata(self, key):
configuration = self.get_wireguard_configuration()
if configuration is not None:
for line in configuration.splitlines():
match = re.match(r'^# {} = (.*)$'.format(re.escape(key)), line)
if match:
return re.sub(r'[^a-zA-Z0-9+=\-_ /]', '', match.group(1).strip())
return None
def get_wireguard_public_keys(self):
import configparser
wireguard_public_keys = set()
configuration = self.get_wireguard_configuration()
parsed_configuration = configparser.ConfigParser()
if configuration is not None:
parsed_configuration.read_string(configuration)
for section in parsed_configuration.sections():
if parsed_configuration.has_option(section, 'PublicKey'):
wireguard_public_keys.add(parsed_configuration.get(section, 'PublicKey'))
return tuple(wireguard_public_keys)
def get_wireguard_configuration(self):
try:
with open(self.get_wireguard_configuration_path(), 'r') as file:
return file.read()
except (FileNotFoundError, PermissionError):
return None
def address_security_incident(self):
if self.has_wireguard_configuration():
wireguard_configuration_path = Path(self.get_wireguard_configuration_path())
incident_data_path = Path(Constants.HV_INCIDENT_DATA_HOME)
incident_data_path.mkdir(parents=True, exist_ok=True)
incident_path = Path(tempfile.mkdtemp(dir=incident_data_path, prefix=''))
incident_wireguard_configuration_path = f'{incident_path}/{wireguard_configuration_path.name}'
try:
shutil.copy2(wireguard_configuration_path, incident_wireguard_configuration_path)
os.chmod(incident_wireguard_configuration_path, 0o644)
except (FileNotFoundError, PermissionError):
if incident_path.is_dir():
incident_path_contents = incident_path.iterdir()
if not any(incident_path_contents):
incident_path.rmdir()
def _get_dirty_keys(self: Self):
reference = BaseProfile.find_by_id(self.id)
if type(reference) != type(self):
return list(self.__dataclass_fields__.keys())
return list([key for key, value in asdict(self).items() if value != asdict(reference).get(key)])
@staticmethod
def find_by_id(id: int):
try:
config_file_contents = open(f'{BaseProfile.__get_config_path(id)}/config.json', 'r').read()
except FileNotFoundError:
return None
try:
profile = json.loads(config_file_contents)
except ValueError:
return None
profile['id'] = id
if profile['location'] is not None:
location = Location.find(profile['location']['country_code'] or None, profile['location']['code'] or None)
if location is not None:
if profile['location'].get('time_zone') is not None:
location.time_zone = profile['location']['time_zone']
profile['location'] = location
if 'application_version' in profile:
if profile['application_version'] is not None:
application_version = ApplicationVersion.find(profile['application_version']['application_code'] or None, profile['application_version']['version_number'] or None)
if application_version is not None:
profile['application_version'] = application_version
from core.models.session.SessionProfile import SessionProfile
# noinspection PyUnresolvedReferences
profile = SessionProfile.from_dict(profile)
else:
from core.models.system.SystemProfile import SystemProfile
# noinspection PyUnresolvedReferences
profile = SystemProfile.from_dict(profile)
return profile
@staticmethod
def exists(id: int):
return re.match(r'^\d{1,2}$', str(id)) and os.path.isfile(f'{BaseProfile.__get_config_path(id)}/config.json')
@staticmethod
def all():
profiles = {}
for directory_entry in os.listdir(Constants.HV_PROFILE_CONFIG_HOME):
try:
id = int(directory_entry)
except ValueError:
continue
if BaseProfile.exists(id):
profile = BaseProfile.find_by_id(id)
if profile is not None:
profiles[id] = profile
return dict(sorted(profiles.items()))
@staticmethod
def __get_config_path(id: int):
return f'{Constants.HV_PROFILE_CONFIG_HOME}/{str(id)}'
@staticmethod
def __get_data_path(id: int):
return f'{Constants.HV_PROFILE_DATA_HOME}/{str(id)}'