sp-hydra-veil-cli/cli/commands/profile.py
2026-05-30 08:25:00 -05:00

201 lines
No EOL
9.9 KiB
Python

from core.Errors import MissingSubscriptionError, InvalidSubscriptionError, ConnectionUnprotectedError, EndpointVerificationError, ProfileStateConflictError
from core.controllers.ApplicationVersionController import ApplicationVersionController
from core.controllers.InvoiceController import InvoiceController
from core.controllers.LocationController import LocationController
from core.controllers.ProfileController import ProfileController
from core.controllers.SubscriptionController import SubscriptionController
from core.controllers.SubscriptionPlanController import SubscriptionPlanController
from core.models.session.SessionConnection import SessionConnection
from core.models.session.SessionProfile import SessionProfile
from core.models.system.SystemConnection import SystemConnection
from core.models.system.SystemProfile import SystemProfile
from cli.helpers import sanitize_profile, parse_application_argument, parse_location_argument
from cli.observers import profile_observer, application_version_observer, connection_observer, invoice_observer
import pprint
NAME = 'profile'
def register(subparsers):
parser = subparsers.add_parser(NAME)
subs = parser.add_subparsers(title='subcommands', dest='subcommand')
base = _base_parser()
pristine = _pristine_parser()
connection_protection = _connection_protection_parser()
endpoint_verification = _endpoint_verification_parser()
profile_state_protection = _profile_state_protection_parser()
subs.add_parser('list')
subs.add_parser('show', parents=[base])
subs.add_parser('destroy', parents=[base])
subs.add_parser('enable', parents=[base, pristine, connection_protection, endpoint_verification, profile_state_protection])
subs.add_parser('disable', parents=[base, connection_protection])
create_parser = subs.add_parser('create')
create_subs = create_parser.add_subparsers(title='profile_types', dest='profile_type')
session_parser = create_subs.add_parser('session', parents=[base])
session_parser.add_argument('--name', '-n', default='')
session_parser.add_argument('--location', '-l', default='')
session_parser.add_argument('--application', '-a', required=True)
session_parser.add_argument('--connection', '-c', dest='connection_type', choices=['system', 'tor', 'wireguard'], default='system')
session_parser.add_argument('--mask-connection', '-m', action='store_true')
session_parser.add_argument('--resolution', '-r', default='1280x720')
system_parser = create_subs.add_parser('system', parents=[base])
system_parser.add_argument('--name', '-n', default='')
system_parser.add_argument('--location', '-l', default='')
system_parser.add_argument('--connection', '-c', dest='connection_type', choices=['wireguard'], default='wireguard')
return parser
def handle(arguments, main_parser):
if arguments.subcommand is None:
main_parser.parse_args(['profile', '--help'])
return
ignore = _build_ignore(arguments)
if arguments.subcommand == 'list':
profiles = ProfileController.get_all()
for key, value in profiles.items():
profiles[key] = sanitize_profile(value)
pprint.pp(profiles)
elif arguments.subcommand == 'show':
pprint.pp(ProfileController.get(arguments.id))
elif arguments.subcommand == 'create':
location_details = parse_location_argument(arguments.location)
location = LocationController.get(location_details.get('country_code'), location_details.get('code'))
if location is None:
main_parser.error('the following argument should be a valid reference: --location/-l')
if arguments.profile_type == 'session':
app_details = parse_application_argument(arguments.application)
app_version = ApplicationVersionController.get(app_details.get('application_code'), app_details.get('version_number'))
if app_version is None:
main_parser.error('the following argument should be a valid reference: --application/-a')
connection = SessionConnection(arguments.connection_type, arguments.mask_connection)
profile = SessionProfile(arguments.id, arguments.name, None, location, arguments.resolution, app_version, connection)
else:
connection = SystemConnection(arguments.connection_type)
profile = SystemProfile(arguments.id, arguments.name, None, location, connection)
ProfileController.create(profile, profile_observer=profile_observer)
elif arguments.subcommand == 'destroy':
profile = ProfileController.get(arguments.id)
if profile is not None:
ProfileController.destroy(profile, profile_observer=profile_observer)
else:
main_parser.error('the following argument should be a valid reference: --id/-i')
elif arguments.subcommand == 'enable':
profile = ProfileController.get(arguments.id)
if profile is None:
main_parser.error('the following argument should be a valid reference: --id/-i')
try:
ProfileController.enable(profile, ignore=ignore, pristine=arguments.pristine, asynchronous=True, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer)
except (InvalidSubscriptionError, MissingSubscriptionError) as exception:
_handle_subscription_error(exception, profile, ignore, arguments, main_parser)
elif arguments.subcommand == 'disable':
profile = ProfileController.get(arguments.id)
if profile is not None:
ProfileController.disable(profile, ignore=ignore, profile_observer=profile_observer)
else:
main_parser.error('the following argument should be a valid reference: --id/-i')
def _handle_subscription_error(exception, profile, ignore, arguments, main_parser):
if type(exception).__name__ == 'InvalidSubscriptionError':
print('The profile\'s subscription appears to be invalid.\n')
elif type(exception).__name__ == 'MissingSubscriptionError':
print('The profile is not tied to a subscription.\n')
manage_subscription_input = None
while manage_subscription_input not in ('1', '2', '3', ''):
print('Please select from the following:\n')
print(' 1) Request new subscription')
print(' 2) Enter billing code')
print('\n 3) Exit')
manage_subscription_input = input('\nEnter your choice [1]: ')
if manage_subscription_input in ('1', ''):
print('\nCreating subscription...\n')
subscription_plan = SubscriptionPlanController.get(profile.connection, 720)
if subscription_plan is None:
raise RuntimeError('No compatible subscription plan was found. Please contact support.')
potential_subscription = SubscriptionController.create(subscription_plan, profile, connection_observer=connection_observer)
if potential_subscription is None:
raise RuntimeError('The subscription could not be created. Please try again later.')
ProfileController.attach_subscription(profile, potential_subscription)
subscription = InvoiceController.handle_payment(potential_subscription.billing_code, invoice_observer=invoice_observer, connection_observer=connection_observer)
if subscription is None:
raise RuntimeError('The subscription could not be activated. Please try again later.')
ProfileController.attach_subscription(profile, subscription)
ProfileController.enable(profile, ignore=ignore, pristine=arguments.pristine, asynchronous=True, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer)
elif manage_subscription_input == '2':
billing_code = input('\nEnter your billing code: ')
print()
subscription = SubscriptionController.get(billing_code, connection_observer=connection_observer)
if subscription is not None:
ProfileController.attach_subscription(profile, subscription)
ProfileController.enable(profile, ignore=ignore, pristine=arguments.pristine, asynchronous=True, profile_observer=profile_observer, application_version_observer=application_version_observer, connection_observer=connection_observer)
else:
print('\nThe billing code appears to be invalid.\n')
manage_subscription_input = None
elif manage_subscription_input == '3':
pass
else:
print('\nInput appears to be invalid. Please try again.\n')
def _build_ignore(arguments):
ignore = []
if getattr(arguments, 'without_connection_protection', False):
ignore.append(ConnectionUnprotectedError)
if getattr(arguments, 'without_endpoint_verification', False):
ignore.append(EndpointVerificationError)
if getattr(arguments, 'without_profile_state_protection', False):
ignore.append(ProfileStateConflictError)
return tuple(ignore)
def _base_parser():
import argparse
p = argparse.ArgumentParser(add_help=False)
p.add_argument('--id', '-i', type=int, required=True)
return p
def _pristine_parser():
import argparse
p = argparse.ArgumentParser(add_help=False)
p.add_argument('--pristine', '-p', action='store_true')
return p
def _connection_protection_parser():
import argparse
p = argparse.ArgumentParser(add_help=False)
p.add_argument('--without-connection-protection', action='store_true')
return p
def _endpoint_verification_parser():
import argparse
p = argparse.ArgumentParser(add_help=False)
p.add_argument('--without-endpoint-verification', action='store_true')
return p
def _profile_state_protection_parser():
import argparse
p = argparse.ArgumentParser(add_help=False)
p.add_argument('--without-profile-state-protection', action='store_true')
return p