from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from pathlib import Path
from typing import Optional
import os
import re
import signal

import psutil
import stem
import stem.control
import stem.process

from essentials.Constants import Constants
from essentials.Errors import TorServiceInitializationError
from essentials.observers.ConnectionObserver import ConnectionObserver


class TorModule:
    """Manage a dedicated Tor process and the SOCKS ports it exposes.

    All runtime files (Tor data directory, control socket, PID file and
    instance lock) live inside ``state_home``.
    """

    # Exceptions treated as "the control socket is not usable".
    _CONTROLLER_ERRORS = (FileNotFoundError, stem.SocketError, TypeError, IndexError)

    def __init__(self, state_home: str):
        """Derive the paths of the service's runtime files.

        Args:
            state_home: Directory holding Tor's data and runtime files.
        """
        self.state_home = state_home
        self.bootstrap_timeout = Constants.TOR_BOOTSTRAP_TIMEOUT
        self.control_socket_path = f'{self.state_home}/{Constants.TOR_CONTROL_SOCKET_FILENAME}'
        self.process_identifier_path = f'{self.state_home}/{Constants.TOR_PROCESS_IDENTIFIER_FILENAME}'
        self.instance_lock_path = f'{self.state_home}/{Constants.TOR_INSTANCE_LOCK_FILENAME}'

    def _open_controller(self):
        """Return an authenticated controller bound to the control socket.

        The controller is closed again if authentication fails, so a failed
        attempt does not leave an open connection behind. The caller owns the
        returned controller and is responsible for closing it.
        """
        controller = stem.control.Controller.from_socket_file(self.control_socket_path)

        try:
            controller.authenticate()
        except BaseException:
            controller.close()
            raise

        return controller

    def start_service(self, connection_observer: Optional[ConnectionObserver] = None):
        """Restart the dedicated Tor process and wait for it to bootstrap.

        Any previously recorded instance is stopped first. The observer, when
        given, is notified with ``tor_bootstrapping``, then with
        ``tor_bootstrap_progressing`` for bootstrap messages, and finally
        with ``tor_bootstrapped``.

        Args:
            connection_observer: Optional receiver of progress notifications.

        Raises:
            TorServiceInitializationError: If the launch does not finish
                within ``self.bootstrap_timeout`` or the control socket
                cannot be used afterwards.
        """
        Path(self.state_home).mkdir(mode=0o700, parents=True, exist_ok=True)
        self.stop_service()

        if connection_observer is not None:
            connection_observer.notify('tor_bootstrapping')

        # The launch runs in a worker thread so the wait can be bounded with
        # the future's own timeout.
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(
                stem.process.launch_tor_with_config,
                config={
                    'DataDirectory': self.state_home,
                    'ControlSocket': self.control_socket_path,
                    'PIDFile': self.process_identifier_path,
                    'SocksPort': '0'
                },
                init_msg_handler=lambda contents: TorModule.__on_initialization_message(contents, connection_observer)
            )

            try:
                future.result(timeout=self.bootstrap_timeout)
            except FutureTimeoutError as error:
                # Leaving the `with` block waits for the worker thread;
                # stopping the service first is presumably what lets a
                # still-running launch unwind.
                self.stop_service()
                raise TorServiceInitializationError('The dedicated Tor service could not be initialized.') from error

        if connection_observer is not None:
            connection_observer.notify('tor_bootstrapped')

        # Verify that the control socket accepts an authenticated connection.
        # The probe connection is closed again immediately.
        try:
            with self._open_controller():
                pass
        except self._CONTROLLER_ERRORS as error:
            self.stop_service()
            raise TorServiceInitializationError('The dedicated Tor service could not be initialized.') from error

    def stop_service(self):
        """Terminate the recorded Tor process and remove its runtime files.

        A missing, unreadable or malformed PID file, as well as a PID that no
        longer refers to a live process, is tolerated: the control socket,
        PID file and instance lock are removed in every such case.
        """
        process_identifier_file = Path(self.process_identifier_path)

        try:
            process_identifier = int(process_identifier_file.read_text().strip())
        except (OSError, ValueError):
            process_identifier = None

        # Zero and negative values are never valid targets here; on POSIX
        # they would address process groups when passed to a kill call.
        if process_identifier is not None and process_identifier > 0:
            try:
                process = psutil.Process(process_identifier)

                if process.is_running():
                    process.terminate()
            except (psutil.NoSuchProcess, ProcessLookupError):
                # Stale PID file: the process is already gone.
                pass

        for runtime_path in (
            self.control_socket_path,
            self.process_identifier_path,
            self.instance_lock_path
        ):
            Path(runtime_path).unlink(missing_ok=True)

    def create_session(self, port_number: int, connection_observer: Optional[ConnectionObserver] = None):
        """Expose ``port_number`` as an additional SOCKS port.

        If the control socket cannot be used, the service is started first
        and the connection is attempted once more.

        Args:
            port_number: SOCKS port to add to the service's configuration.
            connection_observer: Optional receiver of bootstrap progress,
                used only when the service has to be started.

        Raises:
            TorServiceInitializationError: If the service had to be started
                and could not be initialized.
        """
        try:
            controller = self._open_controller()
        except self._CONTROLLER_ERRORS:
            self.start_service(connection_observer=connection_observer)
            controller = self._open_controller()

        with controller:
            socks_port_numbers = [str(active_port) for active_port in controller.get_ports('socks')]
            requested_port = str(port_number)

            # Do not list the same port twice when it is already configured.
            if requested_port not in socks_port_numbers:
                socks_port_numbers.append(requested_port)

            controller.set_conf('SocksPort', socks_port_numbers)

    def destroy_session(self, port_number: int):
        """Remove ``port_number`` from the configured SOCKS ports.

        When no port remains, ``SocksPort`` is set to ``'0'``. Ports that
        belong to other sessions are left configured. Nothing happens when
        the control socket cannot be used.

        Args:
            port_number: SOCKS port to remove from the configuration.
        """
        # Ports are handled as strings, so the requested port has to be
        # converted before comparing.
        released_port = str(port_number)

        try:
            with self._open_controller() as controller:
                remaining_port_numbers = [
                    str(active_port)
                    for active_port in controller.get_ports('socks')
                    if str(active_port) != released_port
                ]

                controller.set_conf('SocksPort', remaining_port_numbers or '0')
        except self._CONTROLLER_ERRORS:
            pass

    @staticmethod
    def __on_initialization_message(contents, connection_observer: Optional[ConnectionObserver] = None):
        """Forward a bootstrap message to the observer.

        Only messages containing ``'Bootstrapped '`` are forwarded. The
        reported ``progress`` is the integer in front of the percent sign,
        or ``None`` when the message carries no such number.

        Args:
            contents: A single initialization message of the Tor process.
            connection_observer: Optional receiver of the notification.
        """
        if connection_observer is None or 'Bootstrapped ' not in contents:
            return

        match = re.search(r' (\d{1,3})% ', contents)
        progress = int(match.group(1)) if match else None

        connection_observer.notify('tor_bootstrap_progressing', None, dict(
            progress=progress
        ))