# sp-essentials/essentials/modules/TorModule.py
# 2026-03-16 16:29:20 +01:00
#
# 153 lines
# 5.2 KiB
# Python

from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from essentials.Constants import Constants
from essentials.Errors import TorServiceInitializationError
from essentials.observers.ConnectionObserver import ConnectionObserver
from pathlib import Path
from typing import Optional
import os
import psutil
import re
import signal
import stem
import stem.control
import stem.process
class TorModule:
    """Manage a dedicated Tor process and the SOCKS ports it exposes.

    Tor is launched through ``stem`` with ``state_home`` as its data directory
    and is controlled over a Unix control socket stored in that directory.
    Sessions are represented by entries of Tor's ``SocksPort`` option, which
    are added and removed at runtime through the control socket.
    """

    def __init__(self, state_home: str):
        """Derive the service's file locations from *state_home*.

        Args:
            state_home: Directory passed to Tor as ``DataDirectory``; the
                control socket, PID file and instance lock paths live in it.
        """
        self.state_home = state_home
        self.bootstrap_timeout = Constants.TOR_BOOTSTRAP_TIMEOUT
        self.control_socket_path = f'{self.state_home}/{Constants.TOR_CONTROL_SOCKET_FILENAME}'
        self.process_identifier_path = f'{self.state_home}/{Constants.TOR_PROCESS_IDENTIFIER_FILENAME}'
        self.instance_lock_path = f'{self.state_home}/{Constants.TOR_INSTANCE_LOCK_FILENAME}'

    def start_service(self, connection_observer: Optional[ConnectionObserver] = None) -> None:
        """Launch a fresh Tor process and verify that it can be controlled.

        Any instance recorded in the PID file is stopped first. The observer,
        when given, receives ``tor_bootstrapping`` before the launch,
        ``tor_bootstrap_progressing`` for bootstrap log lines and
        ``tor_bootstrapped`` once the launcher has returned.

        Args:
            connection_observer: Optional receiver of the notifications above.

        Raises:
            TorServiceInitializationError: If the launcher does not return
                within ``self.bootstrap_timeout`` seconds, or if the control
                socket cannot be opened afterwards. Exceptions raised by the
                launcher itself propagate unchanged.
        """
        Path(self.state_home).mkdir(mode=0o700, parents=True, exist_ok=True)

        # Never run two instances against the same state directory.
        self.stop_service()

        if connection_observer is not None:
            connection_observer.notify('tor_bootstrapping')

        # The launcher runs in a worker thread so the wait can be bounded with
        # ``Future.result(timeout=...)``. Leaving the ``with`` block waits for
        # the worker; on timeout ``stop_service()`` terminates the process
        # named in the PID file first (presumably so the launcher returns
        # promptly -- this relies on Tor having written that file already).
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(
                stem.process.launch_tor_with_config,
                config={
                    'DataDirectory': self.state_home,
                    'ControlSocket': self.control_socket_path,
                    'PIDFile': self.process_identifier_path,
                    'SocksPort': '0',
                },
                init_msg_handler=lambda contents: TorModule.__on_initialization_message(contents, connection_observer),
            )

            try:
                future.result(timeout=self.bootstrap_timeout)
            except FutureTimeoutError as error:
                self.stop_service()
                raise TorServiceInitializationError('The dedicated Tor service could not be initialized.') from error

        if connection_observer is not None:
            connection_observer.notify('tor_bootstrapped')

        # Connectivity check only; the controller is closed again right away.
        try:
            with self._open_controller():
                pass
        except (FileNotFoundError, stem.SocketError, TypeError, IndexError) as error:
            self.stop_service()
            raise TorServiceInitializationError('The dedicated Tor service could not be initialized.') from error

    def stop_service(self) -> None:
        """Terminate the process named in the PID file and remove state files.

        This is a best-effort cleanup: a missing or unreadable PID file, a
        process that no longer exists and a process that may not be signalled
        are all tolerated. The control socket, PID file and instance lock
        file are removed in every case.
        """
        control_socket_file = Path(self.control_socket_path)
        process_identifier_file = Path(self.process_identifier_path)
        instance_lock_file = Path(self.instance_lock_path)

        try:
            process_identifier = int(process_identifier_file.read_text().strip())
        except (OSError, ValueError):
            process_identifier = None

        if process_identifier is not None:
            # A stale PID file can name a process that is gone, or one that
            # belongs to somebody else after PID reuse; neither case must
            # prevent the file cleanup below.
            try:
                os.kill(process_identifier, 0)
                os.kill(process_identifier, signal.SIGTERM)
            except (ProcessLookupError, PermissionError):
                pass

            # Second attempt through psutil, guarded separately so that its
            # own "process is gone" error cannot escape.
            try:
                process = psutil.Process(process_identifier)

                if process.is_running():
                    process.terminate()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        control_socket_file.unlink(missing_ok=True)
        process_identifier_file.unlink(missing_ok=True)
        instance_lock_file.unlink(missing_ok=True)

    def create_session(self, port_number: int, connection_observer: Optional[ConnectionObserver] = None) -> None:
        """Expose *port_number* as an additional SOCKS port of the service.

        The service is started on demand when its control socket cannot be
        reached.

        Args:
            port_number: SOCKS port to open for the new session.
            connection_observer: Forwarded to ``start_service`` when the
                service has to be started.

        Raises:
            TorServiceInitializationError: If the service had to be started
                and ``start_service`` failed.
        """
        try:
            controller = self._open_controller()
        except (FileNotFoundError, stem.SocketError, TypeError, IndexError):
            self.start_service(connection_observer=connection_observer)
            controller = self._open_controller()

        with controller:
            socks_port_numbers = [str(open_port) for open_port in controller.get_ports('socks')]
            requested_port = str(port_number)

            # Do not list the same port twice when the session already exists.
            if requested_port not in socks_port_numbers:
                socks_port_numbers.append(requested_port)

            controller.set_conf('SocksPort', socks_port_numbers)

    def destroy_session(self, port_number: int) -> None:
        """Close the SOCKS port *port_number*, keeping all other ports open.

        ``SocksPort`` is set to ``'0'`` when no port remains. Nothing happens
        when the control socket cannot be reached.

        Args:
            port_number: SOCKS port of the session to close.
        """
        try:
            with self._open_controller() as controller:
                remaining_ports = self._without_port(controller.get_ports('socks'), port_number)
                controller.set_conf('SocksPort', remaining_ports or '0')
        except (FileNotFoundError, stem.SocketError, TypeError, IndexError):
            # Deliberate best effort: without a reachable service there is no
            # port left to close.
            pass

    def _open_controller(self):
        """Return an authenticated controller for the control socket.

        The caller owns the controller and must close it, e.g. by using it as
        a context manager. It is closed here if authentication fails.
        """
        controller = stem.control.Controller.from_socket_file(self.control_socket_path)

        try:
            controller.authenticate()
        except Exception:
            controller.close()
            raise

        return controller

    @staticmethod
    def _without_port(socks_port_numbers, port_number) -> list:
        """Return the ports as strings, without every entry equal to *port_number*."""
        # Compare as strings on both sides so an int never meets a str.
        excluded_port = str(port_number)
        return [str(open_port) for open_port in socks_port_numbers if str(open_port) != excluded_port]

    @staticmethod
    def __on_initialization_message(contents, connection_observer: Optional[ConnectionObserver] = None) -> None:
        """Forward the bootstrap progress found in a Tor log line.

        Lines without ``'Bootstrapped '`` are ignored. For the others the
        observer receives ``tor_bootstrap_progressing`` with a ``progress``
        value that is the reported percentage, or ``None`` if it cannot be
        parsed.

        Args:
            contents: A single log line emitted by Tor during start-up.
            connection_observer: Optional receiver of the notification.
        """
        if connection_observer is None or 'Bootstrapped ' not in contents:
            return

        # Anchored on the keyword so that both "Bootstrapped 80% (tag): ..."
        # and "Bootstrapped 80%: ..." are understood.
        match = re.search(r'Bootstrapped (\d{1,3})%', contents)
        progress = int(match.group(1)) if match else None

        connection_observer.notify('tor_bootstrap_progressing', None, dict(
            progress=progress
        ))