Introduced Anonymous Ticket Billing Options

This commit is contained in:
SimplifiedPrivacy 2026-05-04 15:33:50 -04:00
parent 2a38d0b0d9
commit 75d026651a
66 changed files with 4008 additions and 1 deletions

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
prototype_client.py
.idea .idea
.venv .venv
__pycache__ __pycache__

View file

@ -5,6 +5,10 @@ import os
@dataclass(frozen=True) @dataclass(frozen=True)
class Constants: class Constants:
# ticketing group:
TICKET_API_BASE_URL: Final[str] = os.environ.get(
"TICKET_API_BASE_URL", "https://ticket.hydraveil.net"
)
SP_API_BASE_URL: Final[str] = os.environ.get('SP_API_BASE_URL', 'https://api.hydraveil.net/api/v1') SP_API_BASE_URL: Final[str] = os.environ.get('SP_API_BASE_URL', 'https://api.hydraveil.net/api/v1')
PING_URL: Final[str] = os.environ.get('PING_URL', 'https://api.hydraveil.net/api/v1/health') PING_URL: Final[str] = os.environ.get('PING_URL', 'https://api.hydraveil.net/api/v1/health')
@ -36,6 +40,10 @@ class Constants:
HV_PROFILE_CONFIG_HOME: Final[str] = f'{HV_CONFIG_HOME}/profiles' HV_PROFILE_CONFIG_HOME: Final[str] = f'{HV_CONFIG_HOME}/profiles'
HV_PROFILE_DATA_HOME: Final[str] = f'{HV_DATA_HOME}/profiles' HV_PROFILE_DATA_HOME: Final[str] = f'{HV_DATA_HOME}/profiles'
# ticketing group:
HV_TICKETING_CONFIG_HOME: Final[str] = f"{HV_CONFIG_HOME}/ticketing"
HV_TICKETING_DATA_HOME: Final[str] = f"{HV_DATA_HOME}/ticket_data"
HV_APPLICATION_DATA_HOME: Final[str] = f'{HV_DATA_HOME}/applications' HV_APPLICATION_DATA_HOME: Final[str] = f'{HV_DATA_HOME}/applications'
HV_INCIDENT_DATA_HOME: Final[str] = f'{HV_DATA_HOME}/incidents' HV_INCIDENT_DATA_HOME: Final[str] = f'{HV_DATA_HOME}/incidents'
HV_RUNTIME_DATA_HOME: Final[str] = f'{HV_DATA_HOME}/runtime' HV_RUNTIME_DATA_HOME: Final[str] = f'{HV_DATA_HOME}/runtime'

View file

@ -0,0 +1,52 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.essentials.observers.ConnectionObserver import ConnectionObserver
from core.observers.TicketObserver import TicketObserver
#
from core.services.prepare_tickets.get_public_key_by_config import (
get_public_key_from_LOCAL_files_only,
)
from core.services.failed_verification.is_the_key_to_blame import is_the_key_to_blame
from core.services.failed_verification.prep_with_previously_saved_blind_sigs import (
prep_with_previously_saved_blind_sigs,
)
def evaluate_if_its_the_key(
failed_validations: list,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> dict:
if (
failed_validations == []
or failed_validations == None
or failed_validations == False
):
return {"valid": False, "message": "no_failed_verifications"}
# which key was originally being used:
local_results = get_public_key_from_LOCAL_files_only(connection_observer)
if "public_key" not in local_results:
return local_results
old_public_key = local_results["public_key"]
did_it_help = is_the_key_to_blame(
old_public_key, failed_validations, ticket_observer, connection_observer
)
return did_it_help
# This is technically a debug function if the orignal flow has an error in validation of blind sigs.
# Then this exists so they can come back and try again with those blind sigs, that they have saved.:
def prepare_tickets_with_saved_blind_sigs(
ticket_observer: TicketObserver, connection_observer: ConnectionObserver
) -> dict:
results = prep_with_previously_saved_blind_sigs(
ticket_observer, connection_observer
)
return results

View file

@ -0,0 +1,180 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.essentials.observers.ConnectionObserver import ConnectionObserver
from core.observers.TicketObserver import TicketObserver
#
from core.models.invoice.TicketInvoice import TicketInvoice
from core.services.prepare_tickets.get_pub_key import get_pub_key
from core.observers.BaseObserver import BaseObserver
from core.services.payment_phase.save_and_send_intitial_billing import save_and_send_intitial_billing
from core.services.payment_phase.check_if_paid import _check_if_paid
from core.services.prepare_tickets.ticket_tracker import does_ticket_tracker_exist
from core.services.networking.send_data_to_server import send_data_to_server
from core.services.networking.make_url import make_url
from core.utils.confirm_its_a_valid_key_choice import confirm_its_a_valid_key_choice
from core.services.helpers.valid_profile_quantity import valid_profile_quantity
from core.errors.exceptions import *
from core.errors.logger import logger
from core.controllers.tickets.TicketSyncController import sync_ticket_prices
"""
Inputs: Which plan (key), which crypto, and how many profiles
Outputs: a temp billing code & crypto address. (or "error")
"""
def initiate_payment(
how_many_profiles: int,
which_key: str,
which_cryptocurrency: str,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
bypass_existing: bool,
) -> TicketInvoice:
###############
invoice_data_object = TicketInvoice()
try:
if bypass_existing == False:
tickets_exist_already, path = does_ticket_tracker_exist()
logger.debug(f"tickets_exist_already is {tickets_exist_already}")
if tickets_exist_already:
invoice_data_object.add_error_code("already_exists")
return invoice_data_object
rejected_choices = [None, "", False]
if how_many_profiles in rejected_choices:
notification = "Missing profile quantity, to initiate payment"
ticket_observer.notify("failed_input", subject=notification)
invoice_data_object.add_error_code("invalid_quantity")
return invoice_data_object
if not valid_profile_quantity(how_many_profiles):
notification = "Invalid profile quantity"
ticket_observer.notify("failed_input", subject=notification)
invoice_data_object.add_error_code("invalid_quantity")
return invoice_data_object
if which_key in rejected_choices:
notification = "Missing key plan, to initiate payment"
ticket_observer.notify("failed_input", subject=notification)
invoice_data_object.add_error_code("no_keyplan")
return invoice_data_object
# confirm the key choice is among the choices from their sync file,
# and if not, then sync again, and try the results from that new file,
is_valid_key = confirm_its_a_valid_key_choice(which_key, ticket_observer)
if not is_valid_key:
sync_results = sync_ticket_prices(ticket_observer, connection_observer)
second_try_to_match = confirm_its_a_valid_key_choice(
which_key, ticket_observer
)
if not second_try_to_match:
invoice_data_object.add_error_code("invalid_key")
return invoice_data_object
# get & save the public key:
public_key_results = get_pub_key(which_key, connection_observer, "local")
if isinstance(public_key_results, dict):
status = public_key_results.get("valid", False)
if status == False:
message = public_key_results.get(
"message", "Please fix before you continue"
)
error_code = public_key_results.get("error_code", "No error_code")
logger.debug(f"error_code: {error_code}")
invoice_data_object.add_error_code(error_code)
notification = f"Connection Issues or Invalid Key chosen. {message}"
ticket_observer.notify("failed_input", subject=notification)
return invoice_data_object
else:
notification = f"Invalid Key chosen or the server is down for that key."
ticket_observer.notify("failed_input", subject=notification)
invoice_data_object.add_error_code("no_pub_key")
return invoice_data_object
# In this case, the controller is going to send the whole JSON payload to the service,
# instead of passing 4 values seperately.
payload = {
"which_key": which_key,
"payment_type": "crypto",
"which_cryptocurrency": which_cryptocurrency,
"how_many_profiles": how_many_profiles,
}
# controller sends to the service:
result = save_and_send_intitial_billing(
payload, connection_observer, invoice_data_object
)
if result == False or result == None:
invoice_data_object.add_error_code("failed_save")
return invoice_data_object
except InvalidData as e:
error_msg = "Invalid Data."
ticket_observer.notify("failed_input", subject=error_msg)
invoice_data_object.add_error_code("invalid_data")
return invoice_data_object
except NetworkingError as e:
error_msg = f"NetworkingError: {e}"
logger.error(error_msg, exc_info=True)
ticket_observer.notify("connection_error", subject=error_msg)
invoice_data_object.add_error_code("connection_error")
return invoice_data_object
except ServerSideError as e:
error_msg = f"ServerSideError: {e}"
logger.error(error_msg, exc_info=True)
ticket_observer.notify("failed_output", subject=error_msg)
invoice_data_object.add_error_code("server_error")
return invoice_data_object
except Exception as e:
error_msg = f"Error: {e}"
logger.error(error_msg, exc_info=True)
ticket_observer.notify("unknown_error", subject=error_msg)
invoice_data_object.add_error_code("unknown_error")
return invoice_data_object
###############
def check_if_paid(
temp_billing_code: str,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> dict:
rejected_reasons = [None, "", False]
if temp_billing_code in rejected_reasons:
error_msg = "Invalid Temp Billing Code"
logger.error(f"{error_msg} inside the check_if_paid function", exc_info=True)
ticket_observer.notify("failed_input", subject=error_msg)
return {"valid": False, "error_code": "rejected"}
else:
# prep the JSON payload:
payload = {"temp_billing_code": temp_billing_code}
# prep endpoint:
which_endpoint = "check_paid"
url = make_url(which_endpoint)
# literally send:
reply = send_data_to_server(payload, url, connection_observer)
logger.debug(f"inside ticketpay controller the reply is {reply}")
return reply

View file

@ -0,0 +1,82 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.essentials.observers.ConnectionObserver import ConnectionObserver
from core.observers.TicketObserver import TicketObserver
from core.services.prepare_tickets.ticket_prep_orchestrator import ticket_prep_orchestrator
from core.services.prepare_tickets.make_sure_pub_key_exists import make_sure_pub_key_exists
from core.services.helpers.get_how_many_profiles_were_ordered import (
get_how_many_profiles_were_ordered,
)
from core.observers.BaseObserver import BaseObserver
"""
Goal:
this is the controller for the view to speak with the high level "ticket_prep_orchestrator" service function,
which makes commitments, sends to the server, and then unblinds them, and preps the ticket JSONs.
Requires:
a temp billing ID
having paid already
Doesn't Require:
If it doesn't have the public key, then it checks the config file.
If it doesn't have a config file, then it uses the temp billing id to get the public key from the server.
"""
def prepare_tickets(
how_many_profiles: int,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> dict:
# make sure it's a number:
if not isinstance(how_many_profiles, int):
ticket_observer.notify("failed_input", None)
return {"valid": False, "error_code": "failed_input"}
# make sure that "how_many_profiles" is actually the number of profiles ordered
# (which is based on locally saved data from the previous step):
how_many_ordered = get_how_many_profiles_were_ordered()
if how_many_profiles != how_many_ordered:
ticket_observer.notify("failed_input", None)
return {"valid": False, "error_code": "failed_input"}
# make sure this guy has a public key to verify against:
does_he_have_public_key = make_sure_pub_key_exists(connection_observer)
if does_he_have_public_key == False:
ticket_observer.notify("failed_input", None)
return {"valid": False, "error_code": "failed_input"}
# ok now we have the pre-reqs, let's use this high level orchestrator,
prep_results = ticket_prep_orchestrator(
how_many_profiles, ticket_observer, connection_observer
)
# rest of this function is evaluating the results:
if prep_results == False or prep_results == None:
return {"valid": False, "message": "error"}
if "valid" not in prep_results:
return {"valid": False, "message": "error"}
if prep_results["valid"] == True:
notification = f"Done! All Tickets Ready!"
ticket_observer.notify("preparing", subject=notification)
return prep_results
if "how_many_failed" in prep_results:
how_many_failed = prep_results.get("how_many_failed", 0)
failed_validations = prep_results.get("failed_validations", None)
if failed_validations:
notification = f"Error with Ticket Preparation or Verification!"
ticket_observer.notify("preparing", subject=notification)
return prep_results
notification = f"Error with Ticket Preparation or Verification!"
ticket_observer.notify("preparing", subject=notification)
return prep_results

View file

@ -0,0 +1,44 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.essentials.observers.ConnectionObserver import ConnectionObserver
from core.observers.TicketObserver import TicketObserver
from core.Constants import Constants
from core.observers.BaseObserver import BaseObserver
from core.services.networking.get_data_from_server import get_data_from_server
from core.services.helpers.save_sync_results import save_sync_results
from core.errors.logger import logger
def sync_ticket_prices(
ticket_observer: TicketObserver, connection_observer: ConnectionObserver
) -> dict:
notification = f"Connecting to get Ticket Pricing..."
ticket_observer.notify("connecting", subject=notification)
rejected_list = [None, False, ""]
base_url = Constants.TICKET_API_BASE_URL
if base_url in rejected_list:
notification = "Base URL is Empty, so it can't fetch prices.."
ticket_observer.notify("failed_input", subject=notification)
return {"valid": False, "error_code": "invalid_url"}
url = f"{base_url}/sync"
try:
sync_results = get_data_from_server(url, connection_observer)
if sync_results in rejected_list:
return {"valid": False, "error_code": "sync_failed"}
logger.debug(f"Inside the sync controller, sync_results is: {sync_results}")
except:
return {"valid": False, "error_code": "sync_failed"}
did_it_save = save_sync_results(sync_results)
logger.debug(f"Inside the sync controller, did_it_save is {did_it_save}")
return sync_results

View file

@ -0,0 +1,187 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.observers.TicketObserver import TicketObserver
from core.essentials.observers.ConnectionObserver import ConnectionObserver
from core.Constants import Constants
from core.observers.BaseObserver import BaseObserver
from core.services.using_tickets.use_ticket_orchestrator import use_ticket_orchestrator
from core.services.prepare_tickets.ticket_tracker import (
get_data_for_a_single_ticket,
does_ticket_tracker_exist,
)
from core.services.helpers.does_ticket_file_exist import does_ticket_file_exist
from core.services.helpers.get_value_from_config import get_value_from_config
from core.utils.basic_operations.does_file_exist import does_file_exist
from core.utils.basic_operations.write_or_read_from_json import update_json
from core.services.prepare_tickets.ticket_tracker import get_all_unused_tickets
import random
def modify_random_tickets_setting(
on_or_off: str,
ticket_observer: TicketObserver,
) -> dict:
choices = ["on", "off"]
if on_or_off not in choices:
ticket_observer.notify("failed_input", None)
return {"valid": False, "message": f"Invalid choice for turning on or off"}
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
does_billing_config_exist = does_file_exist(filepath)
if does_billing_config_exist == False:
notification = "First setup the Tickets before picking use"
ticket_observer.notify("failed_input", subject=notification)
return {"valid": False, "message": notification}
else:
try:
if on_or_off == "on":
update_json(filepath, "use_random", True)
return {"valid": True}
elif on_or_off == "off":
update_json(filepath, "use_random", False)
return {"valid": True}
else:
ticket_observer.notify("failed_input", None)
return {
"valid": False,
"message": f"Invalid choice for turning on or off",
}
except:
notification = f"Error with modifying config file. Check {filepath}"
ticket_observer.notify("error", subject=notification)
return {"valid": False, "message": notification}
"""
# this has 3 possible results:
1) getting a random ticket number for 'which_ticket'
2) the user has the config set to manual, so 'which_ticket' is 'None' and error_msg to None
3) an error message on reading the config file (variable 'error_msg' is not 'none')
"""
def do_we_use_a_random_ticket(ticket_observer: TicketObserver) -> tuple:
# check if they have it on random mode:
config_data = get_value_from_config("use_random")
# if the 'value' key is in the config, that means it successfully read the config.
if "value" in config_data:
random_setting = config_data["value"]
# they want a random ticket
if random_setting == True:
data_results = pick_a_random_ticket(ticket_observer)
if "random_ticket" in data_results:
which_ticket = data_results["random_ticket"]
error_msg = None
return which_ticket, error_msg
else:
which_ticket = "error"
if "message" in data_results:
error_msg = data_results["message"]
else:
error_msg = (
"Missing or Invalid Data. Unable to get unused ticket list."
)
return which_ticket, error_msg
# if it read the config, but the value is false:
else:
which_ticket = None
error_msg = None
return which_ticket, error_msg
# this is a problem with reading the config itself:
else:
which_ticket = "error"
error_msg = "There is an error with the config file, or no config. Are you sure you have tickets?"
ticket_observer.notify("failed_input", subject=error_msg)
return which_ticket, error_msg
def get_unused_tickets(ticket_observer: TicketObserver) -> dict:
# does the file keeping track of ALL tickets exist:
does_the_file_exist, path_of_file = does_ticket_tracker_exist()
if does_the_file_exist == False:
error_msg = f"The ticket tracker organizer file does not exist. Check the folder {path_of_file}"
ticket_observer.notify("failed_input", subject=error_msg)
return {"valid": False, "message": error_msg}
# Use the Model:
unused_tickets = get_all_unused_tickets()
return unused_tickets
"""
use_ticket function requires:
-Knowing which ticket is being used.
-Knowing which location it's being assigned to.
-The ticket file completed.
-Prior registration on the server.
"""
def use_ticket(
which_ticket: int,
which_location: str,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> dict:
which_ticket = str(which_ticket) # type: ignore
# does the ticket's file exist:
ticket_exists = does_ticket_file_exist(which_ticket)
if ticket_exists == False:
error_msg = f"The ticket file does not exist in the correct folder."
ticket_observer.notify("failed_input", subject=error_msg)
return {"valid": False, "message": error_msg}
# does the file keeping track of ALL tickets exist:
does_the_file_exist, path_of_file = does_ticket_tracker_exist()
if does_the_file_exist == False:
error_msg = f"The ticket tracker organizer file does not exist. Check the folder {path_of_file}"
ticket_observer.notify("failed_input", subject=error_msg)
return {"valid": False, "message": error_msg}
# is the ticket used?
try:
status, location, subscription = get_data_for_a_single_ticket(which_ticket)
if status == "used":
error_msg = f"Ticket is already tied to {location} with the subscription {subscription}"
ticket_observer.notify("failed_input", subject=error_msg)
return {"valid": False, "message": error_msg}
except:
error_msg = f"Your local ticket tracker has no value for ticket {which_ticket}"
return {"valid": False, "message": error_msg}
# the actual work here, everything else is just handling:
ticket_observer.notify("connecting", "Connecting..")
reply = use_ticket_orchestrator(which_ticket, which_location, connection_observer)
return reply
def pick_a_random_ticket(ticket_observer: TicketObserver) -> dict:
ticket_data = get_unused_tickets(ticket_observer)
if "valid" in ticket_data:
if ticket_data["valid"] == True:
list_of_unused_tickets = ticket_data["data"]
random_ticket = random.choice(list_of_unused_tickets)
return {"valid": True, "random_ticket": random_ticket}
else:
return ticket_data
else:
error_msg = "Missing or Invalid Data. Unable to get unused ticket list."
return {"valid": False, "message": error_msg}

30
core/errors/exceptions.py Normal file
View file

@ -0,0 +1,30 @@
class ApplicationError(Exception):
pass
class MathError(ApplicationError):
pass
class NetworkingError(ApplicationError):
pass
class CriticalFailure(ApplicationError):
pass
class InvalidData(ApplicationError):
pass
class ServerSideError(ApplicationError):
pass
class MissingData(ApplicationError):
pass
class FailedToSave(ApplicationError):
pass

View file

@ -0,0 +1,28 @@
def get_error_msg(error_code: str) -> str:
if error_code == "invalid_quantity":
error_msg = "To initiate a ticket payment, you must pick a valid profile quantity between 3 and 15."
elif error_code == "no_internet":
error_msg = (
"General Internet Connection Issues: You may not even have the internet."
)
elif error_code == "dns_issue":
error_msg = "There is a DNS issue with resolving the API domain."
elif error_code == "no_keyplan":
error_msg = "To initiate a ticket payment, you must pick a key plan."
elif error_code == "no_pub_key":
error_msg = "There is some issue with reaching the server to get a public key, and you don't have it locally either. Please wait a few minutes and try again."
elif error_code == "failed_save":
error_msg = "There was some issue with saving your temporary billing code. To get the code, please check the error logs and make sure this app is able to read and write to files on the system. Also check the permissions on the .config/hydra-veil folders"
elif error_code == "already_exists":
error_msg = "WARNING: You already have ticket data, Do you want to WIPE IT, and start over?"
else:
error_msg = "There was an error, but the underlying reason is complex. Please check the error logs for technical information"
return error_msg

39
core/errors/logger.py Normal file
View file

@ -0,0 +1,39 @@
from core.Constants import Constants
import logging
import os
import sys
data_folder = Constants.HV_TICKETING_DATA_HOME
folder_path = f"{data_folder}/logs"
os.makedirs(folder_path, exist_ok=True)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Check if debug mode is enabled
debug_mode = os.getenv("DEBUG", "").lower() in ("1", "true", "yes")
# Console handler: DEBUG messages to terminal (only if debug mode ON)
console_handler = logging.StreamHandler(sys.stdout)
console_level = logging.DEBUG if debug_mode else logging.WARNING
console_handler.setLevel(console_level)
console_formatter = logging.Formatter("%(message)s")
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# File handler: ERROR messages to file (always on)
file_handler = logging.FileHandler(f"{folder_path}/errors.log")
file_handler.setLevel(logging.ERROR)
file_formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# Function to toggle debug mode dynamically
def set_debug(enabled):
global debug_mode
debug_mode = enabled
console_handler.setLevel(logging.DEBUG if enabled else logging.WARNING)

6
core/models/Event.py Normal file
View file

@ -0,0 +1,6 @@
class Event:
def __init__(self, subject = None, meta = None):
self.subject = subject
self.meta = meta or {}

View file

@ -0,0 +1,24 @@
from pydantic import BaseModel
from core.errors.get_error_msg import get_error_msg
class TicketInvoice(BaseModel):
temp_billing_code: str | None = None
payment_type: str | None = None
selected_currency: str | None = None
due_amount: float | None = None
address: str | None = None
final_error_msg: str | None = None
error_code: str | None = None
errors: dict[str, str] = {}
is_valid: bool = True
def add_validation_error(self, field: str, message: str) -> None:
self.errors[field] = message
self.is_valid = False
def add_error_code(self, error_code: str) -> None:
self.error_code = error_code
self.final_error_msg = get_error_msg(error_code)
self.is_valid = False

View file

@ -0,0 +1,25 @@
from core.models.Event import Event
class BaseObserver:
def subscribe(self, topic, callback):
callbacks = getattr(self, f"on_{topic}", None)
if callbacks is None:
return
callbacks.append(callback)
def notify(self, topic, subject=None, meta=None):
callbacks = getattr(self, f"on_{topic}", None)
if callbacks is None:
return
event = Event(subject, meta)
for callback in callbacks:
callback(event)

View file

@ -0,0 +1,18 @@
from core.observers.BaseObserver import BaseObserver
class TicketObserver(BaseObserver):
def __init__(self):
self.on_connecting = []
self.on_sync_done = []
self.on_waiting = []
self.on_paid = []
self.on_ticket_ready = []
self.on_used = []
self.on_connection_error = []
self.on_failed_output = []
self.on_failed_input = []
self.on_unknown_error = []
self.on_preparing = []
self.on_error = []

View file

@ -0,0 +1,314 @@
from core.utils.save_data import save_data, save_data_as_json
from core.utils.get_data import get_data, get_data_as_int
from core.services.helpers.validate_number_format import validate_number_format
import os
import hashlib
import json
from py_ecc.optimized_bls12_381 import (
G1,
G2,
multiply,
add,
curve_order,
normalize as _normalize,
)
from py_ecc.optimized_bls12_381.optimized_curve import FQ, FQ2
# for the (optional) validity tests:
from py_ecc.optimized_bls12_381 import pairing
# errors:
from core.errors.logger import logger
import traceback
class TicketCustomer:
"""
This class is the basis for all the crypto operations,
because it has the conversions from projective/alpine, deserialize/serialize g2,
and easily transfer data from one function to another without having to convert it.
"""
def __init__(self):
self.blinded_data = None
self.signature = None
# reset exists to clear the self.ticket_data between each ticket.
def reset(self):
"""
Start over. Remove all instance attributes.
The most important thing to wipe from one ticket to another is the unblinded commitment,
which is stored in self.ticket_data.
"""
self.__dict__.clear()
def _hash_to_scalar(self, *parts):
"""Hash inputs to a scalar in the curve order."""
h = hashlib.sha256()
for p in parts:
h.update(p if isinstance(p, bytes) else str(p).encode())
return int.from_bytes(h.digest(), "big") % curve_order
def _fq_to_int(self, v) -> int:
"""Convert a single FQ element to int."""
return int(getattr(v, "n", v))
def _fq2_to_list(self, v: int) -> list:
"""Convert FQ2 element to [c0, c1] list."""
if hasattr(v, "coeffs"):
c0, c1 = v.coeffs
return [self._fq_to_int(c0), self._fq_to_int(c1)]
# Fallback for single FQ
return [self._fq_to_int(v), 0]
def _ensure_affine(self, pt: dict) -> dict:
"""
This solves the errors with affine (2-tuple) vs projective (3-tuple) form, using py_ecc.
This function checks if it's projective,
And then converts it to affine if it was projective to begin with.
"""
if len(pt) == 3:
return _normalize(pt)
return pt
def _serialize_point_g2(self, pt: dict) -> dict:
"""
This function is critical for converting values into a format,
that can be used outside this class or even python,
such as sending JSONs or saving to disk.
It Serializes a G2 point to JSON-compatible format.
G2 points use FQ2 coordinates (quadratic extension field).
"""
affine_pt = self._ensure_affine(pt)
x, y = affine_pt
return {"x": self._fq2_to_list(x), "y": self._fq2_to_list(y)}
def _ensure_projective(self, pt: tuple) -> tuple:
"""
Deals with errors related to returning only 2 values instead of 3,
This function checks if it's affine (2 values),
And then converts it to projective if it was affine to begin with.
"""
if len(pt) == 2:
x, y = pt
return (x, y, FQ2.one())
return pt
# If there's a "tuple unpack" error, this is used,
def _deserialize_point_g2(self, data: dict) -> tuple:
"""
Deserialize a G2 point from JSON format.
Returns the point in affine coordinates.
"""
x_list = data["x"]
y_list = data["y"]
x = FQ2([x_list[0], x_list[1]])
y = FQ2([y_list[0], y_list[1]])
return (x, y, FQ2.one()) # projective form
# Commitment has to be on G2, in order for the pairing equation to match during validation.
def make_unblinded_commitment(self, which_ticket: int) -> bool:
# we need a random number that's G2 (inside the curve),
# 1. Generate 32 random bytes
# 2. Convert bytes to integer
# 3. Constrain to the curve order range
commitment_input = int.from_bytes(os.urandom(32), "big") % curve_order
# Commitment has to be on G2. So we're creating it by using the multiply function with a G2 generator,
unblind_commitment = multiply(G2, commitment_input)
# Now we want to both:
# 1) Use it in functions here (inside this module).
self.ticket_data = {"unblind_commitment": unblind_commitment}
# 2) Save it to disk
# So, we have to serialize it to save it,
data_to_save_to_disk = {
"unblind_commitment": self._serialize_point_g2(unblind_commitment)
}
# this wil be used at a future point to unblind the billing server's signature:
did_it_save = save_data(
which_ticket, "unblind_commitment", data_to_save_to_disk
)
# note: we return the status of our saving operation, and not the commitment itself.
# (because it's saved in the 'self.ticket_data' object).
return did_it_save
def blind_commitment(self, which_ticket: int) -> str | None:
"""
After the previous original (unblinded) commitment is setup,
we can now blind it, (so external functions outside this class can send to the server)
Requirements:
This function assumes you already have the unblinded commitment saved in the 'self.ticket_data' object.
"""
# That's why this function starts off by checking you have it,
if self.ticket_data is None:
raise ValueError(
"We are missing the original unblinded commitment in self.ticket_data"
)
original_commitment = self.ticket_data["unblind_commitment"]
# Create the blinding factor.
blinding_factor = int.from_bytes(os.urandom(32), "big") % curve_order
# Blind the commitment,
blind_commitment = multiply(original_commitment, blinding_factor)
# Save/store the 'blinding factor' as just the raw number. This is kept on your local device,
did_it_save_blinding_factor = save_data(
which_ticket, "blinding_factors_int", str(blinding_factor)
)
# While the 'blind commitment' is being serialized,
g2_serialized_blind_commitment = self._serialize_point_g2(blind_commitment)
# we also save the 'blind commitment' so it can be used to validate later,
did_it_save = save_data_as_json(
which_ticket, "blinded_commitment_json", g2_serialized_blind_commitment
)
# Serialized blinded commitment is being prepared to send to the server,
blinded_json = json.dumps(g2_serialized_blind_commitment)
# double check both pieces of data are saved:
if did_it_save == True and did_it_save_blinding_factor == True:
return blinded_json
else:
return None
def load_key(self, pub_key: dict) -> tuple | bool:
try:
# use it in projective form (x, y, z), where z=1 for affine points
projective_public_key = (FQ(pub_key["x"]), FQ(pub_key["y"]), FQ.one())
return projective_public_key
except:
return False
def test_blind_signature_validity(
self,
which_ticket: int,
blind_signature: dict,
string_public_key: dict,
) -> dict:
# Deserialize the blinded signature (that was just recieved from the billing server as a JSON)
blinded_signature = self._deserialize_point_g2(blind_signature)
blinded_commitment_as_dict = get_data(which_ticket, "blinded_commitment_json")
blinded_commitment = self._deserialize_point_g2(blinded_commitment_as_dict)
projective_public_key = self.load_key(string_public_key)
if projective_public_key == False:
return {"valid": False, "message": "invalid_key"}
# All of that was to prep the values for this pairing equation,
try:
if pairing(blinded_signature, G1) == pairing(
blinded_commitment, projective_public_key
):
return {"valid": True, "message": "It worked."}
else:
return {
"valid": False,
"message": "Pairing equation was able to compute, but did not actually match.",
}
except ValueError as e:
# I want to see if the reason for the error is this below, because that likely is a bad public key,
# "ValueError: Invalid input - point P is not on the correct curves"
error_message_as_string = str(e)
import re
if re.search(
r"point P is not on the correct curves", error_message_as_string
):
error_msg = f"The reason the verification failed is because the public key is invalid (not on the elliptic curve. Original message: {error_message_as_string})"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "invalid_public_key"}
else:
return {"valid": True, "message": "verification_failed"}
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)[-1]
error_msg = f"error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
logger.error(error_msg, exc_info=True)
return {"valid": True, "message": "verification_failed"}
# this is after the server signs,
def unblind_signature(
self,
which_ticket: int,
blind_signature: dict,
) -> dict:
"""
Client unblinds the server's blinded signature using their secret "blinding_factors".
"""
try:
blinding_factor = get_data_as_int(which_ticket, "blinding_factors_int")
# uncomment this next line, if you want to load it from a file instead:
# blinded_sig_data = json.loads(blinded_sig_json)
# Deserialize the blinded signature (that was just recieved from the billing server as a JSON)
blinded_signature = self._deserialize_point_g2(blind_signature)
# Unblind: S = b^(-1) · S_blind
b_inv = pow(blinding_factor, -1, curve_order)
unblinded_signature = multiply(blinded_signature, b_inv)
return unblinded_signature
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON signature: {str(e)}")
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)[-1]
error_msg = f"Issue with unblinding the signature. It's error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "invalid_blind_sig"}
def make_final_ticket(
self,
which_ticket: int,
unblinded_signature: dict,
) -> str | bool:
"""
Input: This function takes in the unblinded signature, and which ticket ("which_ticket")
Output: And makes a JSON that has both of them together.
Note: There is NO MATH done, it's just putting them together.
This final JSON, we're defining as a "ticket".
"""
# load the ORIGINAL data (prior to blinding it):
self.ticket_data = get_data(which_ticket, "unblind_commitment")
# just change the dictionary's key word to 'commitment' make it more neutral for the server,
self.ticket_data["commitment"] = self.ticket_data.pop("unblind_commitment")
if self.ticket_data == False:
error_msg = "There's an error with making the final ticket. It could not find or get the unblind_commitment from the self.ticket_data dictionary"
logger.error(error_msg, exc_info=True)
return False
# append the signature on to it:
self.ticket_data["signature"] = self._serialize_point_g2(unblinded_signature)
# that's it. no math,
ticket = json.dumps(self.ticket_data)
return ticket

View file

@ -0,0 +1,74 @@
from core.services.crypto.TicketCustomer import TicketCustomer
"""
We are making both an unblinded and blinded commitment pair.
unblinded = saving it to disk, for later
blinded = sending to the server
"""
def make_ONE_commitment_pair(
profile_object: TicketCustomer, which_ticket: int
) -> str | None:
# First, make the original unblinded commitment. it's saved inside the profile object:
did_unblinded_save = profile_object.make_unblinded_commitment(which_ticket)
# that `profile_object` object is holding the unblinded commitment,
# so it can be directly used to blind it (without having to serialize then deserialize it).
# Then BLIND it, so it can be sent to the billing server:
blind_commitment = profile_object.blind_commitment(which_ticket)
# we need to make sure we actually saved the data,
# because it's the only way to unblind it later:
if did_unblinded_save == False or blind_commitment == False:
return None
# assuming we got it,
else:
# we are initially only sending to the server the blinded data,
return blind_commitment
def make_ALL_commitments(how_many_profiles_to_make: int) -> list | None:
# Setup the entire class object of "profile_object" for using all these other functions,
profile_object = TicketCustomer()
# setup loop:
which_ticket = 0
list_of_all_blinded_data = []
failed_to_save = []
# loop up to the number of profiles requested:
while which_ticket < how_many_profiles_to_make:
which_ticket = which_ticket + 1
# for each profile, make the commitment data,
blinded_string = make_ONE_commitment_pair(profile_object, which_ticket)
if blinded_string is None:
second_try = make_ONE_commitment_pair(profile_object, which_ticket)
if second_try is None:
failed_to_save.append(which_ticket)
profile_object.reset()
pass
# and add only the BLIND commitment to the list, to submit to the server after paying:
list_of_all_blinded_data.append(blinded_string)
# wipe the object's data, but without reloading the code:
profile_object.reset()
# Summary:
# We SAVED to disk all the unblind commtiments and blinding factors for later.
# If more than one of them failed to save, then we are abandoning this entire loop.
how_many_failed_to_save = len(failed_to_save)
if how_many_failed_to_save > 1:
return None
# But we are sending to the SERVER only the BLIND commitments,
return list_of_all_blinded_data

View file

@ -0,0 +1,150 @@
# types
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
from observers.TicketObserver import TicketObserver
# services
from core.services.crypto.TicketCustomer import TicketCustomer
from core.services.helpers.validate_number_format import number_of_valid_length
from core.services.prepare_tickets.get_public_key_by_config import get_public_key_by_config
from core.services.prepare_tickets.get_pub_key import key_is_in_valid_format, get_pub_key
from core.services.failed_verification.test_if_new_key_works import test_if_new_key_works
from core.services.networking.make_url import make_url
from core.services.networking.get_data_from_server import get_data_from_server
# utils
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
from core.utils.basic_operations.write_string_to_text_file import write_string_to_text_file
# observers & constants
from core.observers.BaseObserver import BaseObserver
from core.models.Event import Event
from core.Constants import Constants
# errors
from core.errors.exceptions import *
from core.errors.logger import logger
# generic
import json
# find out what plan the guy has:
def find_out_which_key_plan() -> str | None:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
try:
which_key_plan = get_value_from_json_file(filepath, "which_key")
return which_key_plan
except:
return None
def save_a_key(named_what: str, new_public_key: dict) -> bool:
try:
data_folder = Constants.HV_TICKETING_DATA_HOME
string_form_of_key_name = find_out_which_key_plan()
if not string_form_of_key_name:
error_msg = f"Which key plan was blank inside 'save_a_key' module."
raise ValueError(error_msg)
file_path = f"{data_folder}/{named_what}_{string_form_of_key_name}.json"
did_it_save = write_string_to_text_file(new_public_key, file_path)
logger.debug(
f"We're inside the 'save_a_key' function, but did it save? {did_it_save}"
)
return did_it_save
except:
error_msg = "Inside the save_a_key function, we had an error saving the new public key, which ironically worked for validation."
logger.error(error_msg, exc_info=True)
logger.debug(error_msg)
return False
# # get that public key from the public API:
def get_new_pubkey_from_api(connection_observer: ConnectionObserver) -> dict | None:
# find out what plan the guy has:
which_key_plan = find_out_which_key_plan()
if which_key_plan is None:
return None
url = make_url(which_key_plan)
# the result of this is a python dictionary with single '
api_results = get_data_from_server(url, connection_observer)
if "data" in api_results:
new_public_key = api_results["data"]
return new_public_key
def are_keys_different(old_public_key, new_public_key) -> bool:
if new_public_key != old_public_key:
return True
else:
return False
def is_the_key_to_blame(
old_public_key: dict,
failed_validations: list,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> dict:
if not number_of_valid_length(old_public_key):
# invalid key
notification = f"Invalid Numbers on the public_key"
ticket_observer.notify("preparing", subject=notification)
return {"valid": True, "message": "invalid_key"}
new_public_key = get_new_pubkey_from_api(connection_observer)
if not new_public_key:
return {"valid": False, "message": "api_connection_issue"}
logger.debug("")
logger.debug(f"old_public_key: {old_public_key}")
type_old_public_key = type(old_public_key)
logger.debug(f"old_public_key type is {type_old_public_key}")
logger.debug("")
logger.debug(f"new_public_key: {new_public_key}")
type_new_public_key = type(new_public_key)
logger.debug(f"new_public_key type is {type_new_public_key}")
logger.debug("")
notification = "Comparing the keys..."
ticket_observer.notify("preparing", subject=notification)
result_of_comparison = are_keys_different(old_public_key, new_public_key)
# test if it helps:
if not result_of_comparison:
error_msg = "New key is the SAME as the old one."
ticket_observer.notify("preparing", subject=error_msg)
return {"valid": False, "message": "same"}
status_update = "New key is DIFFERENT from the old one!"
ticket_observer.notify("preparing", subject=status_update)
logger.debug(status_update)
# save new key:
logger.debug("Saving new key with 'new' prefix in a different file..")
save_a_key("new", new_public_key)
logger.debug("New Key saved in a different file labeled 'new'")
logger.debug("Testing if the new one verifies the blind signatures...")
quantity_results = test_if_new_key_works(
new_public_key, failed_validations, ticket_observer
)
logger.debug(
f"Now the new key worked for {quantity_results} of the blind signatures."
)
if quantity_results > 0:
logger.debug("Therefore, the new key works.")
return {"valid": True, "comparison": "different", "matters": False}
else:
logger.debug(
"Therefore, the new key doesn't help. It is different, but also produces invalid blind signatures."
)
return {"valid": False, "comparison": "different", "matters": False}

View file

@ -0,0 +1,72 @@
# types
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.observers.TicketObserver import TicketObserver
from essentials.observers.ConnectionObserver import ConnectionObserver
# other services
from core.services.prepare_tickets.setup_ticket_tracker import setup_ticket_tracker
from core.services.prepare_tickets.unblind_all_tickets import unblind_ALL_tickets
# utils
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
from core.utils.get_data import get_data
from core.utils.basic_operations.does_file_exist import does_file_exist
# observers & constants
from core.Constants import Constants
# errors
from core.errors.exceptions import *
from core.errors.logger import logger
def get_profile_quantity_from_config() -> int:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
try:
how_many_profiles = get_value_from_json_file(filepath, "how_many_profiles")
how_many_profiles_as_int = int(how_many_profiles)
except:
how_many_profiles_as_int = 6
return how_many_profiles_as_int
def prep_with_previously_saved_blind_sigs(
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> dict:
how_many_profiles_as_int = get_profile_quantity_from_config()
# how_many_profiles_as_int = int(how_many_profiles)
counter = 0
list_of_blind_signatures = []
while counter < how_many_profiles_as_int:
counter = counter + 1
each_blind_signature = get_data(counter, "servers_blind_sign")
list_of_blind_signatures.append(each_blind_signature)
logger.debug(f"we loaded the {counter} blind sig into the list")
logger.debug(
f"Completed the {counter}th item of the entire loop of {how_many_profiles_as_int}"
)
try:
logger.debug("unblinding..")
# Unblind the signatures & combine with unblinded commitment:
unblind_ALL_tickets(
list_of_blind_signatures, ticket_observer, connection_observer
)
logger.debug("finished unblinding. setting up file to track ticket use,")
# make a json to keep track of which tickets are used:
setup_ticket_tracker(how_many_profiles_as_int)
# this means it both verified, & unblinded, & setup the tracker without erroring out:
return {"valid": True, "message": "worked"}
except:
return {
"valid": False,
"message": "failed with prep_with_previously_saved_blind_sigs",
}

View file

@ -0,0 +1,60 @@
# types
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.observers.TicketObserver import TicketObserver
# services
from core.services.crypto.TicketCustomer import TicketCustomer # actually use it, not type hint
# utils
from core.utils.get_data import get_data
# observers
from core.observers.BaseObserver import BaseObserver
from core.models.Event import Event
# generic
import json
"""
User got a new pub key, so let's see if the verification can be completed,
using a local copy of the server's previously failed blind signatures.
## Inputs:
1. public_key = dictionary of the new key
2. failed_validations = list of which previous blind sigs failed.
(it's a list with the numbers as strings).
## Outputs:
a. quantity_results = How many of the previous blind sigs, the new public key can now verify.
"""
def test_if_new_key_works(
public_key: dict,
failed_validations: list,
ticket_observer: TicketObserver,
) -> int:
crypto_operations = TicketCustomer()
these_now_work = []
for which_ticket in failed_validations:
each_blind_signature = get_data(which_ticket, "servers_blind_sign")
#### Check blind signature validity for THAT signature:
this_signature_result = crypto_operations.test_blind_signature_validity(
which_ticket, each_blind_signature, public_key
)
if this_signature_result["valid"] == True:
# verification went fine:
these_now_work.append(which_ticket)
notification = f"New Key Verified {which_ticket}'s blind signature"
ticket_observer.notify("preparing", subject=notification)
continue
else:
notification = f"Still Invalid {which_ticket} blind signature"
ticket_observer.notify("preparing", subject=notification)
### LOOP DONE
quantity_results = len(these_now_work)
return quantity_results

View file

@ -0,0 +1,14 @@
from core.utils.basic_operations.does_file_exist import does_file_exist
from core.Constants import Constants
def does_ticket_file_exist(which_ticket: int) -> bool:
data_folder = Constants.HV_TICKETING_DATA_HOME
which_ticket_as_string = str(which_ticket)
name_of_file_and_folder = "unblinded_final_ticket"
where_is_the_data = f"{data_folder}/{name_of_file_and_folder}/{name_of_file_and_folder}_{which_ticket_as_string}.json"
does_it_exist = does_file_exist(where_is_the_data)
return does_it_exist

View file

@ -0,0 +1,11 @@
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
from core.Constants import Constants
def get_how_many_profiles_were_ordered() -> int:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
key = "how_many_profiles"
return int(get_value_from_json_file(filepath, key))

View file

@ -0,0 +1,19 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
# services
from core.services.networking.make_url import make_url
from core.services.networking.send_data_to_server import send_data_to_server
# use temp billing to get the plan details
def get_plan_data(
temp_billing_code: str, connection_observer: ConnectionObserver
) -> dict:
which_endpoint = "/plan"
url = make_url(which_endpoint)
payload = {"temp_billing_code": temp_billing_code}
reply = send_data_to_server(payload, url, connection_observer)
return reply

View file

@ -0,0 +1,14 @@
from core.Constants import Constants
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
def get_value_from_config(which_value: str) -> dict:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
try:
value = get_value_from_json_file(filepath, which_value)
return {"status": True, "value": value}
except:
error_msg = "There is an error with the config file, or no config. Are you sure you have tickets?"
return {"status": False, "message": error_msg}

View file

@ -0,0 +1,11 @@
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
from core.Constants import Constants
def get_which_billing_key() -> str:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
json_key = "which_key"
which_key = get_value_from_json_file(filepath, json_key)
return which_key

View file

@ -0,0 +1,18 @@
from core.Constants import Constants
import json
def save_sync_results(sync_results: dict) -> bool:
valid = sync_results.get("valid", False)
data = sync_results.get("data", False)
if valid:
if data:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/expirations.json"
with open(filepath, "w") as f:
json.dump(data, f, indent=4)
return True
else:
return False
else:
return False

View file

@ -0,0 +1,15 @@
from core.errors.exceptions import *
from core.errors.logger import logger
def valid_profile_quantity(how_many_profiles: int) -> bool:
try:
if how_many_profiles >= 3 and how_many_profiles <= 15:
return True
else:
logger.debug("Invalid Profile Quantity")
return False
except:
raise InvalidData(
f"how_many_profiles may not even be an integer? It's {how_many_profiles}"
)

View file

@ -0,0 +1,23 @@
from typing import Any
def validate_number_format(num: Any) -> bool:
# Check if it's an integer (exclude booleans since bool is a subclass of int)
if isinstance(num, bool) or not isinstance(num, int):
return False
digit_count = len(str(abs(num)))
# Check if digit count is between 109 and 121 digits.
# most of these blind sigs are 115
return 109 < digit_count < 121
# make sure it's a number of the right length:
def number_of_valid_length(any_dictionary: dict) -> bool:
x = any_dictionary["x"]
y = any_dictionary["y"]
if validate_number_format(x) and validate_number_format(y):
return True
else:
return False

View file

@ -0,0 +1,76 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
# services
from core.services.networking.internet_test import do_we_have_internet
from core.services.networking.is_dns_problem import is_dns_problem, is_dns_problem_via_tor
from core.services.networking.extract_domain import extract_domain
from core.services.networking.is_tor_working import is_tor_working
# errors
from core.errors.exceptions import *
from core.errors.logger import logger
#######################################################
############### START : REGULAR SYSTEM ###############
#######################################################
def evaluate_regular_networking_problem(
url: str,
connection_observer: ConnectionObserver,
port_number: int | None = None,
proxies: dict | None = None,
) -> dict:
domain_only = extract_domain(url)
if not do_we_have_internet():
return {"valid": False, "error_code": "no_internet"}
if is_dns_problem(domain_only):
logger.debug("This is a DNS issue.")
return {"valid": False, "error_code": "dns_issue"}
return {"valid": False, "error_code": "unknown"}
#######################################################
############### END: REGULAR SYSTEM ###############
#######################################################
#######################################################
############### START: TOR ###############
#######################################################
def evaluate_tor_networking_problem(
url: str,
connection_observer: ConnectionObserver,
) -> dict:
domain_only = extract_domain(url)
logger.debug(f"Evaluating a networking problem for a Tor connection")
logger.debug("checking if Tor works against the official Tor Project API..")
if not is_tor_working(connection_observer):
return {"valid": False, "error_code": "tor_issue"}
else:
tor_fine = f"While there were connection issues, but Tor is working fine!"
logger.error(tor_fine, exc_info=True)
logger.debug(tor_fine)
logger.debug("Check the original DNS via Tor...")
if is_dns_problem_via_tor(domain_only, connection_observer):
return {"valid": False, "error_code": "dns_issue"}
else:
dns_fine = f"There were connection issues, but the DNS for {domain_only} via Tor is working fine!"
logger.error(dns_fine, exc_info=True)
logger.debug(dns_fine)
# can't solve it:
return {"valid": False, "error_code": "unknown"}
#######################################################
############### END: TOR ###############
#######################################################

View file

@ -0,0 +1,31 @@
"""
Extract domain from URL, stripping protocol and path.
If exact TLD match isn't found, tries to return the best guess.
"""
def extract_domain(url: str) -> str:
common_domains = [".com", ".net", ".org", ".is"]
# Remove protocol
if url.startswith("https://"):
url = url[8:]
elif url.startswith("http://"):
url = url[7:]
# Find the earliest TLD in the string
earliest_pos = len(url)
earliest_tld_len = 0
for tld in common_domains:
pos = url.find(tld)
if pos != -1 and pos < earliest_pos:
earliest_pos = pos
earliest_tld_len = len(tld)
# If a valid TLD was found, return domain + TLD
if earliest_tld_len > 0:
return url[: earliest_pos + earliest_tld_len]
# Fallback: return everything before the first slash (best guess)
return url.split("/")[0]

View file

@ -0,0 +1,6 @@
from core.controllers.ConfigurationController import ConfigurationController
def get_connection_type() -> str:
connection = ConfigurationController.get_connection()
return connection

View file

@ -0,0 +1,37 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
# services
from core.services.networking.get_connection_type import get_connection_type
from core.services.networking.use_tor import tor_get_request
from core.services.networking.regular_get_request import regular_get_request
# errors
from core.errors.exceptions import *
from core.errors.logger import logger
import traceback
# Generic GET request to an endpoint, filtered by the user's preference of connection type (Tor or Not)
def get_data_from_server(url: str, connection_observer: ConnectionObserver) -> dict:
try:
connection_type = get_connection_type()
logger.debug(f"connection type is: {connection_type}")
if connection_type == "tor":
response = tor_get_request(url, connection_observer)
else:
response = regular_get_request(url, connection_observer)
# regardless:
return response
except Exception as e:
logger.debug("could not perform the get request")
tb = traceback.extract_tb(e.__traceback__)[-1]
error_msg = f"generic except error with the get request. error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
logger.error(error_msg, exc_info=True)
logger.debug(error_msg)
return {"valid": False, "message": "failed"}

View file

@ -0,0 +1,26 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
def do_we_have_internet() -> bool:
import requests
urls = [
"https://hc1.simplifiedprivacy.net",
"https://hc3.hydraveil.net",
"https://api.hydraveil.net/api/v1/health",
"https://calyxinstitute.org",
"https://www.grapheneos.org",
"https://torproject.org",
]
with ThreadPoolExecutor(max_workers=len(urls)) as executor:
futures = [executor.submit(requests.get, url, timeout=5) for url in urls]
for future in as_completed(futures):
try:
future.result()
return True # when the first works, it ends the entire loop
except:
pass
return False

View file

@ -0,0 +1,56 @@
# for tor:
from essentials.modules.TorModule import TorModule
from essentials.observers.ConnectionObserver import ConnectionObserver
from essentials.services.ConnectionService import ConnectionService
import json, os
# for both:
from core.services.networking.extract_domain import extract_domain
from core.errors.logger import logger
import socket
import socks
"""
Check if there's a DNS problem with a domain.
Returns True if DNS fails (problem exists), False if DNS resolves successfully.
"""
def is_dns_problem(url: str) -> bool:
domain = extract_domain(url)
try:
socket.gethostbyname(domain)
return False # DNS resolved successfully, no problem
except socket.gaierror:
return True # DNS resolution failed, problem exists
except Exception:
return True # Any other error is treated as a DNS problem
def is_dns_problem_via_tor(
domain: str,
connection_observer: ConnectionObserver | None = None,
) -> bool:
logger.debug("We've triggered EVALUATING IF it's a DNS problem via Tor")
port_number = ConnectionService.get_random_available_port_number()
tor_module = TorModule(os.path.expanduser("~/sp-tor-test"))
tor_module.create_session(port_number, connection_observer)
# Set up SOCKS5 proxy using the same port as the existing Tor proxy,
socks.set_default_proxy(socks.SOCKS5, "127.0.0.1", port_number)
socket.socket = socks.socksocket
# DNS query check
try:
logger.debug("checking dns via socket..")
socket.gethostbyname(domain)
tor_module.destroy_session(port_number)
return False # DNS resolved successfully, no problem
except socket.gaierror:
return True # DNS resolution failed, problem exists
except Exception:
return True # Any other error is treated as a DNS problem

View file

@ -0,0 +1,37 @@
# for creating your own tor module here:
from essentials.modules.TorModule import TorModule
from essentials.observers.ConnectionObserver import ConnectionObserver
from essentials.services.ConnectionService import ConnectionService
from core.errors.logger import logger
import os
import json
def is_tor_working(
connection_observer: ConnectionObserver | None = None,
) -> bool:
logger.debug(
"We've triggered a check on if Tor is even working in general, unrelated to our API"
)
import requests
port_number = ConnectionService.get_random_available_port_number()
tor_module = TorModule(os.path.expanduser("~/sp-tor-test"))
tor_module.create_session(port_number, connection_observer)
proxies = {
"http": f"socks5h://127.0.0.1:{port_number}",
"https": f"socks5h://127.0.0.1:{port_number}",
}
response_body = requests.get(
"https://check.torproject.org/api/ip", proxies=proxies
).json()
logger.debug(json.dumps(response_body, indent=4, sort_keys=True))
tor_working = response_body.get("IsTor", False)
tor_module.destroy_session(port_number)
return tor_working

View file

@ -0,0 +1,12 @@
from core.Constants import Constants
# Prepare full endpoint urls:
def make_url(which_endpoint: str) -> str:
domain = Constants.TICKET_API_BASE_URL
# hardcode option:
# domain = "https://ticket.hydraveil.net"
final_url = f"{domain}/{which_endpoint}"
return final_url

View file

@ -0,0 +1,41 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
from core.observers.TicketObserver import TicketObserver
from core.services.networking.evaluate_networking_problem import evaluate_regular_networking_problem
from core.errors.logger import logger
import json
def regular_get_request(url: str, connection_observer: ConnectionObserver):
connection_type = "system"
import requests
try:
response = requests.get(url)
# Raises HTTPError for 4xx/5xx
response.raise_for_status()
dictonary_of_response = response.json()
final_reply = {"valid": True, "data": dictonary_of_response}
return final_reply
except requests.exceptions.ConnectionError:
results = evaluate_regular_networking_problem(url, connection_observer)
return results
except requests.exceptions.Timeout:
logger.debug("Connection timed out")
results = evaluate_regular_networking_problem(url, connection_observer)
return results
except requests.exceptions.HTTPError:
logger.debug(f"HTTP error: {response.status_code}")
results = evaluate_regular_networking_problem(url, connection_observer)
return results

View file

@ -0,0 +1,75 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import requests
from essentials.observers.ConnectionObserver import ConnectionObserver
#
# from core.models.Result import Result
from core.services.networking.evaluate_networking_problem import evaluate_regular_networking_problem
from core.errors.exceptions import *
from core.errors.logger import logger
import traceback
import json
def regular_post_request(
url: str, payload: str, connection_observer: ConnectionObserver
) -> requests.Response | dict:
logger.debug(f"We're doing a regular post request, and the payload is {payload}")
logger.debug(f"And the url is {url}, make sure it has https")
# request is a bulky import, that's not needed at startup time:
import requests
try:
logger.debug("about to do a regular post request")
response = requests.post(url, json=payload)
logger.debug("Did the post request")
return response
except requests.exceptions.ConnectionError:
logger.debug("Connection Error")
results = evaluate_regular_networking_problem(url, connection_observer)
return results
except requests.exceptions.Timeout:
logger.debug("Connection timed out, server took too long")
results = evaluate_regular_networking_problem(url, connection_observer)
return results
except requests.exceptions.HTTPError:
logger.debug(f"HTTP error: {response.status_code}")
results = evaluate_regular_networking_problem(url, connection_observer)
return results
except requests.exceptions.RequestException:
logger.debug(f"Generic RequestException")
results = evaluate_regular_networking_problem(url, connection_observer)
return results
# except Exception as e:
# tb = traceback.extract_tb(e.__traceback__)[-1]
# error_msg = f"generic except error with regular_post_request. error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
# logger.error(error_msg, exc_info=True)
except:
logger.debug("except triggered in regular post")
results = evaluate_regular_networking_problem(url, connection_observer)
return results
"""
try:
response = requests.post(url, json=data)
except requests.exceptions.ConnectionError:
print("Network/DNS issue — can't reach server")
except requests.exceptions.Timeout:
print("Request took too long")
except requests.exceptions.HTTPError:
print("Server returned an error status")
except requests.exceptions.RequestException:
print("Some other requests-related error occurred")
"""

View file

@ -0,0 +1,187 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
# services:
from core.services.networking.get_connection_type import get_connection_type
from core.services.networking.use_tor import tor_post_request
from core.services.networking.regular_post_request import regular_post_request
# errors
from core.errors.exceptions import *
from core.errors.logger import logger
# generic
from typing import Callable
import json
import time
def replace_http_with_https(url):
if url.startswith("http://"):
return url.replace("http://", "https://", 1)
return url
# The purpose of this function is to take into account if the user has Tor or not:
def send_data_to_server(
payload: dict,
original_url: str,
connection_observer: ConnectionObserver,
) -> dict:
# request is a bulky import, that's not needed at startup time:
import requests
# make sure https, because accidentally using http will break the API,
url = replace_http_with_https(original_url)
connection_type = get_connection_type()
logger.debug(f"connection type is: {connection_type}, it's going to url: {url}")
if connection_type == "tor":
logger.debug(f"doing post request through Tor,")
final_package = do_the_request(
tor_post_request, payload, url, connection_observer
)
else:
logger.debug(f"doing post request through regular system connection,")
final_package = do_the_request(
regular_post_request, payload, url, connection_observer
)
return final_package
def do_the_request(
which_function: Callable,
payload: dict,
url: str,
connection_observer: ConnectionObserver,
) -> dict:
##############
logger.debug("doing the first post request from send_data_to_server")
first_response = which_function(url, payload, connection_observer)
logger.debug("finished the first post request from send_data_to_server")
first_evaluation = evaluate_response(first_response)
first_is_valid = first_evaluation.get("valid", False)
# if it worked, we are done,
if first_is_valid:
return first_evaluation
if "retry_now" in first_evaluation:
print("waiting 3 seconds before trying again")
time.sleep(3)
# repeat:
second_response = which_function(url, payload, connection_observer)
second_evaluation = evaluate_response(second_response)
second_is_valid = second_evaluation.get("valid", False)
if second_is_valid:
return second_evaluation
else:
# let's get that original data, even if bad,
return first_evaluation
# this is handled this way, so more conditions other than retry can be added later.
return first_evaluation
def evaluate_response(response):
import requests
if not isinstance(response, requests.Response):
new_dictionary = {"valid": False, "error_code": "connection_error"}
return new_dictionary
status_code = response.status_code
logger.debug(f"Status code: {status_code}")
if response.ok:
logger.debug("Success from Post request")
dictonary_of_response = response.json()
logger.debug(f"post response is {dictonary_of_response}")
return dictonary_of_response
if 500 <= status_code < 600:
logger.debug("5xx error")
retry_now = [502, 503, 504]
retry_later = [500, 501, 505]
if status_code in retry_now:
logger.debug("Evaluation says retry NOW")
new_dictionary = {
"valid": False,
"error_code": "server_side",
"retry_now": True,
}
return new_dictionary
elif status_code in retry_later:
logger.debug("Evaluation says retry LATER")
new_dictionary = {
"valid": False,
"error_code": "server_side",
"retry_later": True,
}
return new_dictionary
else:
logger.debug("Evaluation confused by 500 level error. No retry")
dictonary_of_response = {"valid": False, "error_code": "server_error"}
elif 400 <= status_code < 499:
logger.debug("4xx error from Post request")
dictonary_of_response = response.json()
logger.debug(f"post response is {dictonary_of_response}")
# try to extract useful data from the reply, to tell the end-user:
keys_to_look_for = ["message", "error", "error_code"]
for each_key, value in dictonary_of_response:
if each_key in keys_to_look_for:
logger.debug(
f"We did find {each_key} in the 4xx reply, which could potentially explain to the end-user why it was rejected."
)
return dictonary_of_response
# from here on out, we don't have the message to go by.
# let's see if we can figure out why it's rejected:
data_malformed = [400, 401, 403]
if status_code == 429:
new_dictionary = {
"valid": False,
"error_code": "rate_limited",
"retry_now": True,
}
return new_dictionary
elif status_code == 404:
new_dictionary = {"valid": False, "error_code": "invalid_url"}
return new_dictionary
elif status_code in data_malformed:
new_dictionary = {"valid": False, "error_code": "invalid_data"}
return new_dictionary
elif status_code == 405:
new_dictionary = {"valid": False, "error_code": "get_post_issue"}
return new_dictionary
else:
logger.debug(
"We were unable to get useful data from the 4xx Response. So we are creating a new standardized reply for the view"
)
new_dictionary = {"valid": False, "error_code": "invalid_data"}
return new_dictionary
else:
print(f"Other status: {status_code}")
logger.debug(
"We were unable to get useful data from the Response. So we are creating a new standardized reply for the view"
)
new_dictionary = {"valid": False, "error_code": "server_error"}
return new_dictionary

View file

@ -0,0 +1,131 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import requests
# tor
from essentials.modules.TorModule import TorModule
from essentials.observers.ConnectionObserver import ConnectionObserver
from essentials.services.ConnectionService import ConnectionService
# services
from core.services.networking.evaluate_networking_problem import evaluate_tor_networking_problem
# errors
from core.errors.exceptions import *
from core.errors.logger import logger
# generic
import json, os
from typing import Any
def tor_get_request(custom_url: str, connection_observer: ConnectionObserver) -> dict:
import requests
port_number = ConnectionService.get_random_available_port_number()
logger.debug(f"Using Tor on port number {port_number}")
try:
tor_module = TorModule(os.path.expanduser("~/sp-tor-test"))
tor_module.create_session(port_number, connection_observer)
proxies = {
"http": f"socks5h://127.0.0.1:{port_number}",
"https": f"socks5h://127.0.0.1:{port_number}",
}
logger.debug(f"Doing Request through Tor via port {port_number}")
response = requests.get(custom_url, proxies=proxies)
logger.debug("Request through Tor successful.")
# if it crashes from the above error check, then it won't destroy the proxy,
tor_module.destroy_session(port_number)
# return response
dictonary_of_response = response.json()
final_reply = {"valid": True, "data": dictonary_of_response}
return final_reply
except requests.exceptions.ConnectionError:
problem_results = evaluate_tor_networking_problem(
custom_url, connection_observer, port_number, proxies
)
return problem_results
except requests.exceptions.Timeout:
logger.debug("Connection timed out")
problem_results = evaluate_tor_networking_problem(
custom_url, connection_observer, port_number
)
return problem_results
except requests.exceptions.HTTPError:
logger.debug(f"HTTP error: {response.status_code}")
problem_results = evaluate_tor_networking_problem(
custom_url, connection_observer, port_number
)
return problem_results
def tor_post_request(
custom_url: str,
payload: dict,
connection_observer: ConnectionObserver,
) -> requests.Response | dict:
import requests # bulky import
response = {
"status": "unable_to_get"
} # incase this goes to the except block, it's "defined"
port_number = ConnectionService.get_random_available_port_number()
logger.debug(f"Using Tor on port number {port_number}")
try:
tor_module = TorModule(os.path.expanduser("~/sp-tor-test"))
tor_module.create_session(port_number, connection_observer)
proxies = {
"http": f"socks5h://127.0.0.1:{port_number}",
"https": f"socks5h://127.0.0.1:{port_number}",
}
logger.debug(f"Sending data through Tor via port {port_number}")
response = requests.post(custom_url, json=payload, proxies=proxies)
logger.debug("Sending data through Tor successful.")
tor_module.destroy_session(port_number)
return response
except requests.exceptions.ConnectionError:
error_msg = "There was a Connection Error with the Tor Module."
logger.error(error_msg, exc_info=True)
logger.debug(error_msg)
problem_results = evaluate_tor_networking_problem(
custom_url, connection_observer
)
return problem_results
except requests.exceptions.Timeout:
error_msg = "Connection timed out"
logger.error(error_msg, exc_info=True)
logger.debug(error_msg)
problem_results = evaluate_tor_networking_problem(
custom_url, connection_observer
)
return problem_results
except requests.exceptions.HTTPError:
if isinstance(response, requests.Response):
error_msg = f"HTTP error: {response.status_code}"
logger.error(error_msg, exc_info=True)
logger.debug(error_msg)
problem_results = evaluate_tor_networking_problem(
custom_url, connection_observer
)
return problem_results

View file

@ -0,0 +1,31 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
# services:
from core.services.networking.send_data_to_server import send_data_to_server
from core.services.networking.make_url import make_url
# errors:
from core.errors.exceptions import *
from core.errors.logger import logger
# it's private because it assumes it's being called from the controller, (with a JSON)
def _check_if_paid(payload: dict, connection_observer: ConnectionObserver) -> str:
# prep endpoint:
which_endpoint = "check_paid"
url = make_url(which_endpoint)
# literally send:
reply = send_data_to_server(payload, url, connection_observer)
if "valid" in reply:
valid_status = reply.get("valid")
if valid_status == True:
return "paid"
else:
return "not_paid"
else:
error_msg = f"When checking if paid, the Server returned invalid data or it never sent. The reply is {reply}"
raise InvalidData(error_msg)

View file

@ -0,0 +1,63 @@
from core.models.invoice.TicketInvoice import TicketInvoice
from core.errors.exceptions import *
from core.errors.logger import logger
from core.utils.basic_operations.write_or_read_from_json import update_json
from core.Constants import Constants
def extract_payment_details(
server_reply: dict, invoice_data_object: TicketInvoice
) -> TicketInvoice:
try:
temp_billing_code = server_reply.get("temp_billing_code", None)
field_mappings = {
"temp_billing_code": temp_billing_code,
"payment_type": server_reply.get("payment_type", "crypto"),
"selected_currency": server_reply.get("currency", None),
"due_amount": server_reply.get("crypto_amount", None),
"address": server_reply.get("crypto_address", None),
}
for field, value in field_mappings.items():
if value != None:
try:
setattr(invoice_data_object, field, value)
except TypeError as e:
invoice_data_object.add_validation_error(field, str(e))
else:
invoice_data_object.is_valid = False
invoice_data_object.add_validation_error(
field,
f"Server gave you a value of None for {field}, which is invalid.",
)
return invoice_data_object
# save the temp_billing_code:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
did_it_save = update_json(filepath, "temp_billing_code", temp_billing_code)
if did_it_save == False:
invoice_data_object.is_valid = False
error_msg = (
f"Error!!! It did not save the temp_billing_code {temp_billing_code}"
)
logger.error(error_msg, exc_info=True)
print(error_msg)
raise MissingData(error_msg)
return invoice_data_object
except ServerSideError as e:
logger.error(f"ServerSideError: {e}", exc_info=True)
invoice_data_object.add_error_code(
"Unable to extract payment details from the server's reply."
)
return invoice_data_object
except:
error_msg = "Unable to extract payment details from the server's reply."
logger.error(f"From inside the extract payment details module, {error_msg}")
invoice_data_object.add_error_code(error_msg)
return invoice_data_object

View file

@ -0,0 +1,34 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
#
from core.models.invoice.TicketInvoice import TicketInvoice
from core.services.networking.send_data_to_server import send_data_to_server
from core.services.networking.make_url import make_url
from core.services.payment_phase.extract_payment_details import extract_payment_details
from core.services.payment_phase.save_billing_choices import save_billing_choices
from core.errors.exceptions import *
from core.errors.logger import logger
# the controller already filtered the payload prior to this function.
def save_and_send_intitial_billing(
payload: dict,
connection_observer: ConnectionObserver,
invoice_data_object: TicketInvoice,
) -> TicketInvoice:
# Save choices:
save_billing_choices(payload)
# send them:
which_endpoint = "start_payment"
url = make_url(which_endpoint)
reply = send_data_to_server(payload, url, connection_observer)
# extract values from server's reply:
modified_data_object = extract_payment_details(reply, invoice_data_object)
return modified_data_object

View file

@ -0,0 +1,11 @@
from core.utils.basic_operations.write_or_read_from_json import write_json_to_file
from core.Constants import Constants
def save_billing_choices(payload: dict) -> None:
payload["use_random"] = True
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
write_json_to_file(payload, filepath)

View file

@ -0,0 +1,107 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
from typing import Any
# services
from core.services.networking.make_url import make_url
from core.services.networking.get_data_from_server import get_data_from_server
# utils
from core.utils.basic_operations.does_file_exist import does_file_exist
from core.utils.basic_operations.write_string_to_text_file import write_string_to_text_file
# errors & constants
from core.Constants import Constants
from core.errors.logger import logger
# generic
import ast
def key_is_in_valid_format(public_key: Any) -> bool:
return set(public_key.keys()) == {"x", "y"}
def get_from_server_and_save(
string_form_of_key_name: str,
file_path: str,
connection_observer: ConnectionObserver,
) -> dict:
url = make_url(string_form_of_key_name)
# the result of this is a python dictionary with single '
public_key_results = get_data_from_server(url, connection_observer)
if isinstance(public_key_results, dict) and "valid" in public_key_results:
status = public_key_results.get("valid", False)
if status == True:
# extract:
public_key = public_key_results.get("valid", False)
# save it:
did_it_save = write_string_to_text_file(public_key, file_path)
return public_key_results
def get_pub_key(
which_key: str,
connection_observer: ConnectionObserver,
check_locally: str,
) -> dict:
# prep file path for either saving or using the existing one:
data_folder = Constants.HV_TICKETING_DATA_HOME
string_form_of_key_name = str(which_key)
file_path = f"{data_folder}/{string_form_of_key_name}.json"
# do we want to use the key if it's already stored locally?
if check_locally == "local":
# does the public key already exist locally?
exists_already = does_file_exist(file_path)
else:
logger.debug("Getting Public Key from API..", exc_info=True)
# setting this to false bypasses it being local for the next section, and skips to the end
exists_already = False
# Next Section
if exists_already == True:
# we're going to try to get it from the file locally:
try:
with open(file_path, "r") as file:
public_key_as_string = file.read()
# this gets in pyth dict with a single ', which is how the crypto functions need it.
public_key = ast.literal_eval(public_key_as_string)
if key_is_in_valid_format(public_key):
return {"valid": True, "data": public_key}
else:
print(
"Key format is Invalid! Let's replace it by getting a new one from the server,"
)
public_key_results = get_from_server_and_save(
string_form_of_key_name, file_path, connection_observer
)
return public_key_results
# but if the local key doesn't format properly, then get it from the server,
except:
public_key_results = get_from_server_and_save(
string_form_of_key_name, file_path, connection_observer
)
return public_key_results
# no local key? get from server,
else:
logger.debug(
"You do not have the public key locally. So we are getting it from the server.."
)
public_key_results = get_from_server_and_save(
string_form_of_key_name, file_path, connection_observer
)
return public_key_results

View file

@ -0,0 +1,158 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
# services & helpers
from core.services.prepare_tickets.get_pub_key import get_pub_key
from core.services.prepare_tickets.get_pub_key import key_is_in_valid_format
from core.services.helpers.get_plan_data import get_plan_data
# utils
from core.utils.basic_operations.filter_data import filter_data
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
from core.utils.basic_operations.does_file_exist import does_file_exist
# constants
from core.Constants import Constants
# generic
import ast
"""
This service tries to get a public key, based on the config file.
It first checks if the config file has which key it wants.
Then it checks for a temp billing code, and if there is one, asks the server what the key is.
"""
# Get which key plan they have.
def get_public_key_FROM_config(filepath) -> str | None:
try:
which_key = get_value_from_json_file(filepath, "which_key")
return which_key
except:
return None
def get_public_key_by_config(connection_observer: ConnectionObserver) -> dict:
list_of_failures = [None, False, ""]
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
which_key = get_public_key_FROM_config(filepath)
# if there is no key in the config?
if which_key is None:
# then try to get the temp billing id.
temp_billing_code = get_value_from_json_file(filepath, "temp_billing_code")
# but what if there is not even a temp billing ID?
if temp_billing_code in list_of_failures:
return {
"status": False,
"message": f"Issue with finding key plan or billing code. Please check {filepath}",
}
# Now we're assuming we have a temp billing code,
# So we can use that to get the key from the server,
reply = get_plan_data(temp_billing_code, connection_observer)
if reply in list_of_failures:
return {
"status": False,
"message": f"Issues with both finding your local key plan and even connecting to the server for it. Please check {filepath}",
}
# from the server's reply, get the key
which_key = filter_data(reply, "which_key")
if which_key in list_of_failures:
return {
"status": False,
"message": f"Issues with both finding your local key plan and getting it from the server. Please check {filepath}",
}
# The reason we convert it to integer, only to convert it back to a string in the next functions,
# is because those functions can be accessed directly in other ways that would have the key as an integer.
# which_key = int(which_key)
# send to service:
public_key_results = get_pub_key(which_key, connection_observer, "local")
# if public_key_results is None:
# return {'status': False, 'message': "no_public_key"}
# assert public_key_results is not None
return public_key_results
"""
This seems like a duplicate function, but it's used in situations where there was a failed validation.
The other functions get the key from local files OR the api. And they save it, which would replace the key we want to study.
Instead, this function gets it only from local files.
"""
def get_public_key_from_LOCAL_files_only(
connection_observer: ConnectionObserver,
) -> dict:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
try:
which_key = get_value_from_json_file(filepath, "which_key")
except:
which_key = None
finally:
list_of_failures = [None, False, ""]
if which_key in list_of_failures:
# can't find plan? try to get temp billing id.
temp_billing_code = get_value_from_json_file(filepath, "temp_billing_code")
if temp_billing_code in list_of_failures:
return {
"status": False,
"message": f"Issue with finding key plan or billing code. Please check {filepath}",
}
reply = get_plan_data(temp_billing_code, connection_observer)
if reply in list_of_failures:
return {
"status": False,
"message": f"Issues with both finding your local key plan and even connecting to the server for it. Please check {filepath}",
}
which_key = filter_data(reply, "which_key")
if which_key in list_of_failures:
return {
"status": False,
"message": f"Issues with both finding your local key plan and getting it from the server. Please check {filepath}",
}
# now that we know the key plan, move on to getting the data:
data_folder = Constants.HV_TICKETING_DATA_HOME
string_form_of_key_name = str(which_key)
file_path = f"{data_folder}/{string_form_of_key_name}.json"
# does it exist?
exists_already = does_file_exist(file_path)
if not exists_already:
return {"valid": False, "message": "Local key file doesn't exist"}
try:
with open(file_path, "r") as file:
public_key_as_string = file.read()
# this gets in pyth dict with a single ', which is how the crypto functions need it.
public_key = ast.literal_eval(public_key_as_string)
# this only checks if the key has 'x' and 'y' keys:
if key_is_in_valid_format(public_key):
return {"valid": True, "public_key": public_key}
else:
return {
"valid": False,
"message": "Key file exists but is in an invalid format.",
}
except:
return {
"valid": False,
"message": "Key file exists but is in an invalid format or unreadable.",
}

View file

@ -0,0 +1,65 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
#
from core.services.prepare_tickets.get_public_key_by_config import get_public_key_by_config
from core.utils.basic_operations.does_file_exist import does_file_exist
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
from core.Constants import Constants
from core.errors.logger import logger
"""
This function doesn't actually get the public key,
it only makes sure that it exists based on the billing config.
"""
def make_sure_pub_key_exists(connection_observer: ConnectionObserver) -> bool:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/billing_choices.json"
does_billing_config_exist = does_file_exist(filepath)
if does_billing_config_exist == True:
try:
which_key = get_value_from_json_file(filepath, "which_key")
rejected_choices = [None, False, ""]
if which_key in rejected_choices:
error_msg = f"The function 'make_sure_pub_key_exists' was unable to read a valid value for 'which_key' in the billing config file {filepath}."
logger.error(error_msg, exc_info=True)
print(error_msg)
return False
data_folder = Constants.HV_TICKETING_DATA_HOME
path_to_key = f"{data_folder}/{which_key}.json"
does_key_file_exist = does_file_exist(path_to_key)
if does_key_file_exist == True:
return True
else:
return False
except:
public_key = get_public_key_by_config(connection_observer)
if "status" in public_key:
if public_key["status"] == False:
# it failed to even get it from the server:
error_msg = f"The function 'make_sure_pub_key_exists' can't even get the public key from the server."
logger.error(error_msg, exc_info=True)
print(error_msg)
return False
else:
return True
else:
return True
# no billing config:
else:
error_msg = f"There is no billing config. So the function 'make_sure_pub_key_exists' can't figure out what the public key even is. The path it tried was {filepath}"
logger.error(error_msg, exc_info=True)
print(error_msg)
return False

View file

@ -0,0 +1,47 @@
from core.utils.save_data import save_data
from core.errors.logger import logger
import json
# save each of the server's signatures replies to disk,
def save_ONE_blind_signature(each_blind_signature: dict, which_ticket: int) -> bool:
did_it_save = save_data(which_ticket, "servers_blind_sign", each_blind_signature)
return did_it_save
def save_ALL_blind_sigs(ALL_signed_blind_signatures: list) -> bool:
# setup basic counter & flag for the loop below:
which_ticket = 0
did_they_ALL_save = True
for each_blind_signature in ALL_signed_blind_signatures:
which_ticket = which_ticket + 1
"""
As a list of JSONs, each signature is loaded seperately,
so that if the format is off for a single ticket, it doesn't collapse all the tickets.
Also, it's far easier to handle and work with them seperately, since each loaded JSON has the same keys.
This pattern is common throughout other modules of the project, such as unblinding.
"""
try:
blind_signature = json.loads(each_blind_signature)
that_ONE_saved = save_ONE_blind_signature(
each_blind_signature, which_ticket
)
if that_ONE_saved:
print(f"saved server's blind signature for {which_ticket}")
else:
error_msg = f"Unable to save the blind signature for {which_ticket}! Skipping it and moving on.."
logger.error(error_msg, exc_info=True)
did_they_ALL_save = False
except:
error_msg = f"Unable to save the blind signature for {which_ticket}! Skipping it and moving on.."
logger.error(error_msg, exc_info=True)
return did_they_ALL_save

View file

@ -0,0 +1,83 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
from core.observers.TicketObserver import TicketObserver
# services
from core.services.networking.send_data_to_server import send_data_to_server
from core.services.networking.make_url import make_url
from core.services.helpers.get_which_billing_key import get_which_billing_key
# utils
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
# constants & errors
from core.Constants import Constants
from core.errors.exceptions import *
from core.errors.logger import logger
def prep_payload(list_of_all_blinded_data: list) -> dict:
# get temp billing id from local storage:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
billing_path = f"{billing_folder}/billing_choices.json"
temp_billing_code = get_value_from_json_file(billing_path, "temp_billing_code")
# prep the JSON payload, it doesn't need the public key, since the server can look that up,
payload = {
"temp_billing_code": temp_billing_code,
"blinded_data": list_of_all_blinded_data,
}
return payload
def send_blind_commitments(
list_of_all_blinded_data: list,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> list | None:
try:
payload = prep_payload(list_of_all_blinded_data)
# send it:
which_endpoint = "sign"
url = make_url(which_endpoint)
reply = send_data_to_server(payload, url, connection_observer)
# did it work?
if reply.get("valid") == True:
# this 'signed_data' variable is a list,
signed_data = reply.get("signed_data")
# it's then sending it to the other functions,
# to iterate through, when they save and unblind,
return signed_data
# but what if it didn't work?
elif reply.get("valid") == False:
error_msg = reply.get("message")
error_msg_with_context = (
f"Issues with Signing. Server's error message said: {error_msg}"
)
ticket_observer.notify("error", subject=error_msg_with_context)
return None
if reply == False:
error_msg = f"It did not send. There's an error with sending the data to server. The payload either was or was going to be: {payload}"
ticket_observer.notify("error", subject=error_msg)
return None
else:
error_msg = "The JSON returned from the server is not formatted correctly to even be able to figure why it did not work. This is a critical error. Contact customer support"
ticket_observer.notify("error", subject=error_msg)
return None
except InvalidData as e:
logger.error(f"InvalidData error: {e}", exc_info=True)
return None
except ServerSideError as e:
logger.error(f"ServerSideError: {e}", exc_info=True)
return None
except:
logger.error(f"Generic Python Error", exc_info=True)
return None

View file

@ -0,0 +1,24 @@
from core.utils.basic_operations.write_or_read_from_json import write_json_to_file
from core.errors.exceptions import *
from core.errors.logger import logger
from core.Constants import Constants
import json
# CREATE the json keeping track of all tickets,
def setup_ticket_tracker(how_many_profiles: int) -> None:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
ticket_tracker_path = f"{billing_folder}/ticket_tracker.json"
ticket_data = {}
counter = 1
while counter <= how_many_profiles:
ticket_data[str(counter)] = {
"status": "unused",
"location": None,
"subscription": None,
}
counter += 1
write_json_to_file(ticket_data, ticket_tracker_path)

View file

@ -0,0 +1,107 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.observers.TicketObserver import TicketObserver
from essentials.observers.ConnectionObserver import ConnectionObserver
# services
from core.services.crypto.make_commitments import make_ALL_commitments
from core.services.prepare_tickets.unblind_all_tickets import unblind_ALL_tickets
from core.services.prepare_tickets.setup_ticket_tracker import setup_ticket_tracker
from core.services.prepare_tickets.send_blind_commitments import send_blind_commitments
from core.services.prepare_tickets.validate_blind_signatures import validate_blind_signatures
from core.services.prepare_tickets.save_ALL_blind_sigs import save_ALL_blind_sigs
# errors & constants
from core.Constants import Constants
from core.errors.exceptions import *
from core.errors.logger import logger
def ticket_prep_orchestrator(
how_many_profiles: int,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> dict:
try:
# make all commitments, but get the blinded ones:
list_of_all_blinded_commitments = make_ALL_commitments(how_many_profiles)
# did we actually save them? if not, try again,
if list_of_all_blinded_commitments is None:
list_of_all_blinded_commitments = make_ALL_commitments(how_many_profiles)
# still failed to save?!
if list_of_all_blinded_commitments is None:
return {
"valid": False,
"message": "cant_save",
}
# assuming we actually saved the unblinding factors,
# then send the entire blinded list to the server to sign:
blind_signatures = send_blind_commitments(
list_of_all_blinded_commitments, ticket_observer, connection_observer
)
# Recieve signatures:
if (
blind_signatures == "error"
or blind_signatures == None
or blind_signatures == False
):
return {
"valid": False,
"message": f"The server's blind signature was blank or invalid. It said: {blind_signatures}",
}
else:
# regardless of the outcome of the verification, save all blind sigs, just in case, because the user can't get them again,
did_they_ALL_save = save_ALL_blind_sigs(blind_signatures)
# verify the server's blind signatures against the public key,
failed_validations = validate_blind_signatures(
blind_signatures, ticket_observer, connection_observer
)
logger.debug(f"failed_validations is {failed_validations}")
# did verification of any of the blind sigs fail?
how_many_failed = len(failed_validations)
if how_many_failed >= 1:
logger.debug(
f"Verification failed for {how_many_failed} blind signatures."
)
return {
"valid": False,
"message": "verification_failed",
"how_many_failed": how_many_failed,
"failed_validations": failed_validations,
}
# Unblind the signatures & combine with unblinded commitment:
did_prep_work = unblind_ALL_tickets(
blind_signatures, ticket_observer, connection_observer
)
# make a json to keep track of which tickets are used:
setup_ticket_tracker(how_many_profiles)
if did_prep_work:
# this means it unblinded, & setup the tracker without erroring out,
# but it does NOT mean that verification of the blind sigs worked.
return {"valid": True, "message": "worked"}
else:
return {"valid": False, "message": "failed"}
except CriticalFailure as e:
error_msg = f"CriticalFailure error: {e}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": error_msg}
except MissingData as e:
error_msg = f"MissingData: {e}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": error_msg}
except:
error_msg = f"Generic Python Error"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": error_msg}

View file

@ -0,0 +1,79 @@
from core.utils.basic_operations.write_or_read_from_json import (
write_json_to_file,
read_entire_json,
)
from core.errors.exceptions import *
from core.errors.logger import logger
from core.Constants import Constants
import json
import os
# folders:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
ticket_tracker_path = f"{billing_folder}/ticket_tracker.json"
def get_all_unused_tickets() -> dict:
data = read_entire_json(ticket_tracker_path)
if data == False or data == "" or data == None:
return {"valid": False, "message": "invalid_data"}
counter = 1
length_of_json = len(data)
unused_tickets = []
while counter <= length_of_json:
counter_as_string = str(counter)
if data[counter_as_string]["status"] == "unused":
unused_tickets.append(counter)
counter = counter + 1
if unused_tickets == []:
return {"valid": False, "message": "no_tickets_left"}
else:
return {"valid": True, "data": unused_tickets}
def is_given_ticket_able_to_be_read(which_ticket: int) -> bool:
data = read_entire_json(ticket_tracker_path)
# the ticket # was originally an int, but needs to be a string to do lookups,
which_ticket = str(which_ticket) # type: ignore
# Check if key exists
if which_ticket in data:
return True
else:
return False
def does_ticket_tracker_exist() -> tuple:
if os.path.exists(ticket_tracker_path):
return (True, ticket_tracker_path)
else:
return (False, ticket_tracker_path)
def get_data_for_a_single_ticket(which_ticket_as_int: int) -> tuple:
data = read_entire_json(ticket_tracker_path)
if data == False or data == "" or data == None:
error_msg = f"Entire file of tickets does not exist. Check the location {ticket_tracker_path}"
logger.error(error_msg, exc_info=True)
print(error_msg)
raise InvalidData(error_msg)
# the data is a string for lookups:
which_ticket = str(which_ticket_as_int)
# Check if key exists
if which_ticket in data:
status = data.get(which_ticket, {}).get("status")
location = data.get(which_ticket, {}).get("location")
subscription = data.get(which_ticket, {}).get("subscription")
return status, location, subscription
else:
raise InvalidData(
f"Key '{which_ticket}' does not exist in JSON file {ticket_tracker_path}"
)

View file

@ -0,0 +1,136 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.observers.TicketObserver import TicketObserver
from essentials.observers.ConnectionObserver import ConnectionObserver
# services
from core.services.crypto.TicketCustomer import TicketCustomer
# utils
from core.utils.save_data import save_data
# observers
from core.observers.BaseObserver import BaseObserver
from core.models.Event import Event
# errors
from core.errors.exceptions import *
from core.errors.logger import logger
# generic
import json
def prep_ONE_unblinded_ticket(
profile_object: TicketCustomer,
which_ticket: int,
blind_signature: dict,
) -> str | bool:
# Step 1: Unblind:
unblinded_signature = profile_object.unblind_signature(
which_ticket, blind_signature
)
# step 2: make a json with the unblinded commitment & unblinded signature
unblinded_ticket = profile_object.make_final_ticket(
which_ticket, unblinded_signature
)
# error filter:
if unblinded_ticket == False:
error_msg = "Error could not prepare the unblinded JSON, due to missing data. Potentially the unblinded commitment"
raise MissingData(error_msg)
# wipe data (the signatures stored in the object can't be passed into the next ticket):
profile_object.reset()
return unblinded_ticket
def unblind_ALL_tickets(
ALL_signed_blind_signatures: list,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> bool:
try:
if ALL_signed_blind_signatures == False or ALL_signed_blind_signatures == None:
raise MissingData(
"The Server gave back no data or it's missing for ALL_signed_blind_signatures in the unblind_and_prove_ALL_profiles function . This can't unblind blank data."
)
# Setup the entire class object of "profile_object" for using all these other functions,
profile_object = TicketCustomer()
# setup basic counter for the loop below:
which_ticket = 0
"""
LOOP STARTS:
Looping through each signature, and then unblinding it.
Finally, saving the unblinded signature and unblinded commitment together.
"""
for each_blind_signature in ALL_signed_blind_signatures:
which_ticket = which_ticket + 1
# We type check, instead of just blindly loading the json, because this function used by two different modules with different formats.
if isinstance(each_blind_signature, str):
blind_signature = json.loads(each_blind_signature)
else:
blind_signature = each_blind_signature
notification = f"preparing ticket {which_ticket}"
ticket_observer.notify("preparing", subject=notification)
# unblind the signature. and then put it in a JSON together with the original unblinded commitment:
final_ticket = prep_ONE_unblinded_ticket(
profile_object, which_ticket, blind_signature
)
# this is critical, and can not be ignored:
if final_ticket == False:
# one of the tickets failed, but not all of them yet.
notification = f"Error! Failed to prep profile {which_ticket}"
ticket_observer.notify("preparing", subject=notification)
else:
# save this "final_ticket" to disk:
did_ticket_save = save_data(
which_ticket, "unblinded_final_ticket", final_ticket
)
# make sure the final ticket saved,
if did_ticket_save == False:
notification = f"Critical Error! Failed to save the final ticket for profile {which_ticket}"
print(notification)
print(final_ticket)
ticket_observer.notify("preparing", subject=notification)
try:
raise FailedToSave(notification)
except FailedToSave as e:
print(f"Failed to Save the Final Ticket: {e}")
# include in the log, the data we're trying to save:
logger.error(
f"FailedToSave error: {e} with this final ticket: {final_ticket}",
exc_info=True,
)
# continue because this is only one ticket, don't want to ruin the others.
continue
notification = f"Finished ticket {which_ticket}'s prep"
ticket_observer.notify("preparing", subject=notification)
# finished the loop:
return True
except FailedToSave as e:
logger.error(f"FailedToSave: {e}", exc_info=True)
return False
except InvalidData as e:
logger.error(f"InvalidData error: {e}", exc_info=True)
return False
except MissingData as e:
logger.error(f"MissingData: {e}", exc_info=True)
return False
except:
logger.error(f"Generic Python Error", exc_info=True)
return False

View file

@ -0,0 +1,177 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.observers.TicketObserver import TicketObserver
from essentials.observers.ConnectionObserver import ConnectionObserver
# services
from core.services.crypto.TicketCustomer import TicketCustomer
from core.services.prepare_tickets.get_public_key_by_config import get_public_key_by_config
from core.services.prepare_tickets.get_pub_key import key_is_in_valid_format, get_pub_key
# utils
from core.utils.save_data import save_data
from core.utils.basic_operations.write_or_read_from_json import get_value_from_json_file
# observers, constants, errors
from core.observers.BaseObserver import BaseObserver
from core.models.Event import Event
from core.Constants import Constants
from core.errors.exceptions import *
from core.errors.logger import logger
import traceback
# generic
import json
def make_sure_we_have_inputs(
ALL_signed_blind_signatures: list,
profile_object: TicketCustomer,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
) -> dict:
try:
if ALL_signed_blind_signatures == False or ALL_signed_blind_signatures == None:
error_msg = "The Server gave back no data or it's missing for ALL_signed_blind_signatures in the unblind_and_prove_ALL_profiles function . This can't unblind blank data."
logger.error(error_msg, exc_info=True)
logger.debug(error_msg)
return {"valid": False, "error_msg": error_msg}
# get the public key for entire loop:
public_key_results = get_public_key_by_config(connection_observer)
# did we actually get the public key?
if "data" not in public_key_results:
error_msg = (
"The validation function can't even get the public key to do it's loop"
)
logger.error(error_msg, exc_info=True)
logger.debug(error_msg)
return {"valid": False, "error_msg": error_msg}
public_key = public_key_results.get("data")
rejected_reasons = [None, False, ""]
if public_key in rejected_reasons:
error_msg = "The validation function can't get the public key"
logger.debug(error_msg)
return {"valid": False, "error_msg": error_msg}
# this just checks if it has an X and Y in a dictionary:
if not key_is_in_valid_format(public_key) or not isinstance(public_key, dict):
return {"valid": False, "error_msg": "invalid_key_format"}
logger.debug("Checking if the public key is even able to be loaded...")
valid_key = profile_object.load_key(public_key)
if valid_key:
logger.debug("Public Key can be loaded")
return {"valid": True, "public_key": public_key}
else:
error_msg = "The validation function is working with an invalid public key"
logger.debug(error_msg)
return {"valid": False, "error_msg": error_msg}
except:
error_msg = (
"We lack the ability to get all the inputs for the verification loop"
)
logger.error(error_msg, exc_info=True)
logger.debug(error_msg)
return {"valid": False, "error_msg": error_msg}
"""
Technically this entire module is optional, as it isn't used for the actual ticket preparation.
This module exists so the customer can be sure that the server did not maliciously sign with a fake new key,
to try to secretly identify users (by using different keys for each one).
"""
def validate_blind_signatures(
ALL_signed_blind_signatures: list,
ticket_observer: TicketObserver,
connection_observer: ConnectionObserver,
):
try:
# Setup the entire class object of "profile_object" for using all these other functions,
profile_object = TicketCustomer()
# start: confirm inputs
inputs = make_sure_we_have_inputs(
ALL_signed_blind_signatures,
profile_object,
ticket_observer,
connection_observer,
)
if not inputs["valid"]:
logger.debug("ending the validation process early.")
return inputs
public_key = inputs["public_key"]
# end: confirm inputs
# setup basic counters, flags, and lists for the loop below:
which_ticket = 0
did_we_answer_the_key_question_yet = "no"
list_of_failed_verifications = []
"""
LOOP STARTS:
Looping through each signature, verifying it against the single public key.
"""
for each_blind_signature in ALL_signed_blind_signatures:
which_ticket = which_ticket + 1
# prep from server's format:
blind_signature = json.loads(each_blind_signature)
#### Check blind signature validity for THAT signature:
validity_data = profile_object.test_blind_signature_validity(
which_ticket, blind_signature, public_key
)
if validity_data["valid"] == True:
# verification went fine:
notification = f"Verified {which_ticket}'s blind signature"
ticket_observer.notify("preparing", subject=notification)
continue
else:
# is it an invalid pub key?
if validity_data["message"] == "invalid_key":
notification = f"Verification of all blind signatures is halted, because you have an invalid public key!"
ticket_observer.notify("preparing", subject=notification)
return [which_ticket]
# otherwise:
list_of_failed_verifications.append(which_ticket)
notification = f"Invalid {which_ticket} blind signature"
ticket_observer.notify("preparing", subject=notification)
### LOOP DONE
logger.debug(
f"Exiting the validation loop with the list_of_failed_verifications: {list_of_failed_verifications}"
)
return list_of_failed_verifications
except FailedToSave as e:
error_msg = f"FailedToSave: {e}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "error", "error": error_msg}
except InvalidData as e:
error_msg = f"InvalidData error: {e}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "error", "error": error_msg}
except MissingData as e:
error_msg = f"MissingData: {e}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "error", "error": error_msg}
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)[-1]
error_msg = f"error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "error", "error": error_msg}

View file

@ -0,0 +1,59 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
# services & helpers
from core.services.networking.send_data_to_server import send_data_to_server
from core.services.networking.make_url import make_url
from core.services.helpers.get_which_billing_key import get_which_billing_key
# utils
from core.utils.get_raw_string import get_raw_string
# errors & generic
from core.errors.exceptions import *
from core.errors.logger import logger
import traceback
# this first gathers the right data, then sends it for validation,
def send_unblinded_ticket_to_server(
which_ticket: int,
which_location: str,
connection_observer: ConnectionObserver,
) -> dict:
try:
ticket_data = get_raw_string(which_ticket, "unblinded_final_ticket")
if ticket_data == False or ticket_data == "" or ticket_data == None:
error_msg = f"Critical Error! Unable to read the ticket data for profile {which_ticket}. Please check the .local/share/hydra-veil/unblinded_final_ticket folder for that JSON"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "invalid_data"}
if which_location == False or which_location == "" or which_location == None:
print(f"Invalid location choice.")
return {"valid": False, "message": "invalid_location"}
which_key = get_which_billing_key()
# prep the JSON payload:
payload = {
"which_key": which_key,
"which_location": which_location,
"ticket_data": ticket_data,
}
# send it:
which_endpoint = "validate"
url = make_url(which_endpoint)
reply = send_data_to_server(payload, url, connection_observer)
return reply
except Exception as e:
human_readable_error_msg = f"The send_unblinded_ticket_to_server function's try-except block failed for ticket {which_ticket}. Returning False..."
logger.debug(human_readable_error_msg)
tb = traceback.extract_tb(e.__traceback__)[-1]
error_msg = f"{human_readable_error_msg}. The error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "blind_send_failed"}

View file

@ -0,0 +1,108 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from essentials.observers.ConnectionObserver import ConnectionObserver
##############
from core.services.using_tickets.send_unblinded import send_unblinded_ticket_to_server
from core.utils.basic_operations.write_or_read_from_json import (
update_value_in_json_with_two_values,
)
from core.utils.basic_operations.write_string_to_text_file import write_string_to_text_file
from core.errors.exceptions import *
from core.errors.logger import logger
from core.Constants import Constants
import traceback
# Coordinates using the ticket anonymously.
# To do so, it sends to the server & updates the 'ticker_tracker' JSON with new data:
def use_ticket_orchestrator(
which_ticket: int,
which_location: str,
connection_observer: ConnectionObserver,
) -> dict:
# prep the ticket tracker:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
ticket_tracker_path = f"{billing_folder}/ticket_tracker.json"
# send to server:
reply = send_unblinded_ticket_to_server(
which_ticket, which_location, connection_observer
)
logger.debug(f"reply is {reply}")
if "valid" not in reply:
return {"valid": False, "message": "invalid_format"}
is_it_valid = reply.get("valid")
if "billing_code" in reply:
billing_code = reply.get("billing_code", None)
"""
the reason this 'billing_code' check is a seperate function (and before the validity test),
is because even if it's not valid to use it as a fresh ticket, it may already be used with a billing code to update.
"""
if is_it_valid:
# update the ticket tracker:
status_update = update_value_in_json_with_two_values(
ticket_tracker_path, which_ticket, "status", "used"
)
location_update = update_value_in_json_with_two_values(
ticket_tracker_path, which_ticket, "location", which_location
)
subscription_update = update_value_in_json_with_two_values(
ticket_tracker_path, which_ticket, "subscription", billing_code
)
if billing_code is not None:
make_sure_sub_saved(
subscription_update, which_ticket, billing_code, billing_folder
)
return {"valid": True, "billing_code": billing_code}
if "message" not in reply:
return {"valid": False, "message": "invalid_format"}
message = reply.get("message")
if message == "already_used":
# update the ticket tracker to reflect that it's already used with that billing code:
status_update = update_value_in_json_with_two_values(
ticket_tracker_path, which_ticket, "status", "used"
)
location_update = update_value_in_json_with_two_values(
ticket_tracker_path, which_ticket, "location", which_location
)
subscription_update = update_value_in_json_with_two_values(
ticket_tracker_path, which_ticket, "subscription", billing_code
)
if billing_code is not None:
make_sure_sub_saved(
subscription_update, which_ticket, billing_code, billing_folder
)
return reply
def make_sure_sub_saved(
subscription_update: bool,
which_ticket: int,
billing_code: str,
billing_folder: str,
) -> None:
# if it failed to save the subscription to ticket tracker
if subscription_update == False:
logger.debug(
"We failed to save the code the subscription inside the use ticket orchestrator's make_sure_sub_saved"
)
error_msg = f"Save the code for {which_ticket} of {billing_code}"
logger.error(error_msg, exc_info=True)
which_ticket_as_str = str(which_ticket)
write_string_to_text_file(
billing_code,
f"{billing_folder}/emergency_code_for_profile_{which_ticket_as_str}.txt",
)

View file

@ -0,0 +1,8 @@
import os
def does_file_exist(path):
if os.path.exists(path):
return True
else:
return False

View file

@ -0,0 +1,3 @@
def filter_data(data, look_for_what):
prized_data = data[look_for_what]
return prized_data

View file

@ -0,0 +1,57 @@
import json
from core.errors.exceptions import *
from core.errors.logger import logger
def get_json_keys(filepath):
try:
with open(filepath, "r") as f:
data = json.load(f)
keys_list = list(data.keys())
return {"status": True, "keys": keys_list}
except FileNotFoundError:
error = f"Error: File {filepath} not found."
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except json.JSONDecodeError:
error = "Error: File is not valid JSON (corrupt or malformed)."
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except PermissionError:
error = "Error: Permission denied. Cannot read the file."
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except UnicodeDecodeError:
error = "Error: File encoding is not UTF-8."
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except IOError as e:
error = f"Error: I/O error occurred: {e}"
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except ValueError as e:
error = f"Error: Value error: {e}"
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except MemoryError:
error = "Error: File is too large to load into memory."
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except OSError as e:
error = f"Error: OS error: {e}"
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except TypeError as e:
error = f"Error: Type error: {e}"
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except KeyboardInterrupt:
error = "Error: Operation interrupted by user."
logger.error(error, exc_info=True)
return {"status": False, "error": error}
except Exception as e:
error = f"Error: An unexpected error occurred: {type(e).__name__} - {e}"
logger.error(error, exc_info=True)
return {"status": False, "error": error}

View file

@ -0,0 +1,127 @@
from core.errors.exceptions import *
from core.errors.logger import logger
import json
import os
from pathlib import Path
def write_json_to_file(data: dict, filepath: str) -> None:
try:
# Create directory if it doesn't exist
directory = os.path.dirname(filepath)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(filepath, "w") as f:
json.dump(data, f, indent=4)
except TypeError as e:
raise TypeError(f"Data is not JSON serializable: {e}")
except IOError as e:
raise IOError(f"Error writing to file {filepath}: {e}")
def read_entire_json(filepath: str) -> dict:
try:
# Check if file exists
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
# Check if path is a directory
if os.path.isdir(filepath):
raise IsADirectoryError(f"Path is a directory, not a file: {filepath}")
# Open and parse JSON file
with open(filepath, "r") as f:
data = json.load(f)
return data
except json.JSONDecodeError as e:
raise json.JSONDecodeError(
f"Invalid JSON in file {filepath}: {e.msg}", e.doc, e.pos
)
except FileNotFoundError:
raise FileNotFoundError(f"File not found: {filepath}")
return False
except IsADirectoryError:
raise IsADirectoryError(f"Path is a directory, not a file: {filepath}")
return False
except IOError as e:
raise IOError(f"Error reading file {filepath}: {e}")
return False
# Opens a .json file, parses it, and retrieves a value by key.
def get_value_from_json_file(filepath: str, key: str) -> str:
"""
Raises:
KeyError: If the key doesn't exist in the JSON
ValueError: If the value is blank/None/empty
"""
data = read_entire_json(filepath)
# Check if key exists
if key not in data:
raise KeyError(f"Key {key} does not exist in JSON file {filepath}")
value = data[key]
# Check if value is blank/None/empty
if value is None:
raise ValueError(f"Value for key '{key}' is None (blank)")
if isinstance(value, str) and value.strip() == "":
raise ValueError(f"Value for key '{key}' is an empty string")
if isinstance(value, (list, dict)) and len(value) == 0:
raise ValueError(
f"Value for key '{key}' is empty (empty {type(value).__name__})"
)
return value
def update_value_in_json_with_two_values(
filepath: str,
main_key,
sub_key,
value_to_update,
) -> bool:
try:
data = read_entire_json(filepath)
# Check if key exists
if main_key not in data:
raise KeyError(f"Key '{main_key}' does not exist in JSON file {filepath}")
# update the value:
data[main_key][sub_key] = value_to_update
# update the file:
write_json_to_file(data, filepath)
return True
except:
raise InvalidData(f"Error reading file {filepath}")
return False
def update_json(filepath, key_to_add, value_to_update):
try:
data = read_entire_json(filepath)
except:
data = {}
finally:
# update the value:
data[key_to_add] = value_to_update
try:
# update the file:
write_json_to_file(data, filepath)
return True
except:
raise InvalidData(f"Error reading file {filepath}")
return False

View file

@ -0,0 +1,34 @@
"""
Note: this function will accept EITHER raw dicts or string, WITHOUT converting to JSON,
and literally convert to str to save. At a first glance, this seems ridiculous.
But the reason for this is due to serious bugs and headache with converting
in and out of the formats that Ethereum py_ecc library packages data.
While it technically is possible to spend enormous effort to repackage the data,
ultimately it does not improve the security, speed, or quality of the end-user experience.
All it does is change if the data has double or single quotes.
Therefore, we opted for this data storage format for some data types.
"""
def write_string_to_text_file(content_to_write: str | dict, file_path: str) -> bool:
if content_to_write is None:
return False
try:
with open(file_path, "w") as file:
file.write(str(content_to_write))
return True
except FileNotFoundError:
print(f"Error: The file '{file_path}' was not found.")
return False
except IOError:
print(
f"Error: An I/O error occurred while trying to read the file '{file_path}'."
)
return False
except Exception as e:
print(f"An unexpected error occurred: {e}")
return False

View file

@ -0,0 +1,66 @@
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.observers.TicketObserver import TicketObserver
#
from core.utils.basic_operations.get_json_keys import get_json_keys
from core.utils.basic_operations.does_file_exist import does_file_exist
from core.Constants import Constants
from core.errors.exceptions import *
from core.errors.logger import logger
def get_the_key_file() -> str | None:
billing_folder = Constants.HV_TICKETING_CONFIG_HOME
filepath = f"{billing_folder}/expirations.json"
file_exists = does_file_exist(filepath)
if file_exists:
return filepath
else:
return None
def get_list_of_key_choices(ticket_observer: TicketObserver) -> list | None:
filepath = get_the_key_file()
if filepath:
try:
data_result = get_json_keys(filepath)
if "keys" in data_result:
accepted_choices = data_result.get("keys", False)
return accepted_choices
elif "error" in data_result:
error_msg = data_result.get("error", False)
notification = f"Sync again, we couldn't read it. Error: {error_msg}"
ticket_observer.notify("failed_input", subject=notification)
return None
else:
return None
except:
notification = "Sync again, we couldn't read it."
ticket_observer.notify("failed_input", subject=notification)
return None
else:
notification = "Sync again, we couldn't read it."
ticket_observer.notify("failed_input", subject=notification)
return None
def confirm_its_a_valid_key_choice(
which_key: str,
ticket_observer: TicketObserver,
) -> bool:
accepted_choices = get_list_of_key_choices(ticket_observer)
if accepted_choices:
if which_key in accepted_choices:
return True
else:
return False
else:
return False

View file

@ -0,0 +1,7 @@
from datetime import datetime
def convert_time(date_string: str) -> str:
parsed_date = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
formatted_date = parsed_date.strftime("%B %d, %Y")
return formatted_date

83
core/utils/get_data.py Normal file
View file

@ -0,0 +1,83 @@
from core.Constants import Constants
import os
import json
import ast
from core.errors.exceptions import *
from core.errors.logger import logger
def get_data(which_ticket, name_of_file_and_folder):
data_folder = Constants.HV_TICKETING_DATA_HOME
folder_path = f"{data_folder}/{name_of_file_and_folder}"
# make sure folder exists:
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# setup path:
which_ticket_as_string = str(which_ticket)
where_is_the_data = (
f"{folder_path}/{name_of_file_and_folder}_{which_ticket_as_string}.json"
)
try:
with open(where_is_the_data, "r") as file:
raw_string_content = file.read()
# Convert string to Python dict
python_dict = ast.literal_eval(raw_string_content)
return python_dict
except:
print("Error! Could not read from the file!")
return False
def get_raw_string(which_ticket, name_of_file_and_folder):
data_folder = Constants.HV_TICKETING_DATA_HOME
folder_path = f"{data_folder}/{name_of_file_and_folder}"
# make sure folder exists:
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# setup path:
which_ticket_as_string = str(which_ticket)
where_is_the_data = (
f"{folder_path}/{name_of_file_and_folder}_{which_ticket_as_string}.json"
)
try:
with open(where_is_the_data, "r") as file:
raw_string_content = file.read()
return raw_string_content
except:
return False
def get_data_as_int(which_ticket, name_of_file_and_folder):
data_folder = Constants.HV_TICKETING_DATA_HOME
folder_path = f"{data_folder}/{name_of_file_and_folder}"
# make sure folder exists:
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# setup path:
which_ticket_as_string = str(which_ticket)
where_is_the_data = (
f"{folder_path}/{name_of_file_and_folder}_{which_ticket_as_string}.json"
)
try:
with open(where_is_the_data, "r") as file:
raw_string_content = file.read()
# Convert to integer:
as_integer = int(raw_string_content)
return as_integer
except:
error_msg = f"Error! Could not read data from the file when going for {name_of_file_and_folder}! (inside get_data function, try-except block failed)."
logger.error(error_msg, exc_info=True)
print(error_msg)
return False

View file

@ -0,0 +1,25 @@
from core.Constants import Constants
import os
# gets a raw string, but unlike the other function, does not convert it to a JSON.
def get_raw_string(which_ticket, name_of_file_and_folder):
data_folder = Constants.HV_TICKETING_DATA_HOME
folder_path = f"{data_folder}/{name_of_file_and_folder}"
# make sure folder exists:
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# setup path:
which_ticket_as_string = str(which_ticket)
where_is_the_data = (
f"{folder_path}/{name_of_file_and_folder}_{which_ticket_as_string}.json"
)
try:
with open(where_is_the_data, "r") as file:
raw_string_content = file.read()
return raw_string_content
except:
return False

50
core/utils/save_data.py Normal file
View file

@ -0,0 +1,50 @@
from core.Constants import Constants
from core.utils.basic_operations.write_string_to_text_file import write_string_to_text_file
import os
def save_data(which_ticket, name_of_file_and_folder, data_to_save):
data_folder = Constants.HV_TICKETING_DATA_HOME
folder_path = f"{data_folder}/{name_of_file_and_folder}"
# make sure folder exists:
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# setup path:
which_ticket_as_string = str(which_ticket)
where_to_save_it = (
f"{folder_path}/{name_of_file_and_folder}_{which_ticket_as_string}.json"
)
# save it, or else return false if it failed:
try:
did_it_save = write_string_to_text_file(data_to_save, where_to_save_it)
return did_it_save
except:
return False
def save_data_as_json(which_ticket, name_of_file_and_folder, data_to_save):
data_folder = Constants.HV_TICKETING_DATA_HOME
folder_path = f"{data_folder}/{name_of_file_and_folder}"
# make sure folder exists:
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# setup path:
which_ticket_as_string = str(which_ticket)
where_to_save_it = (
f"{folder_path}/{name_of_file_and_folder}_{which_ticket_as_string}.json"
)
# save it, or else return false if it failed:
try:
import json
with open(where_to_save_it, "w") as file:
json.dump(data_to_save, file, indent=4)
return True
except:
return False

View file

@ -1,6 +1,6 @@
[project] [project]
name = "sp-hydra-veil-core" name = "sp-hydra-veil-core"
version = "2.2.1" version = "2.3.0"
authors = [ authors = [
{ name = "Simplified Privacy" }, { name = "Simplified Privacy" },
] ]
@ -20,6 +20,30 @@ dependencies = [
"python-dateutil ~= 2.9.0.post0", "python-dateutil ~= 2.9.0.post0",
"requests ~= 2.32.5", "requests ~= 2.32.5",
"sp-essentials ~= 1.0.0", "sp-essentials ~= 1.0.0",
"annotated-types==0.7.0",
"certifi==2026.4.22",
"charset-normalizer==3.4.7",
"click==8.3.3",
"cytoolz==1.1.0",
"eth-hash==0.8.0",
"eth-typing==6.0.0",
"eth-utils==6.0.0",
"idna==3.13",
"packaging==26.2",
"pathspec==1.1.1",
"platformdirs==4.9.6",
"psutil==7.2.2",
"py-ecc==8.0.0",
"pydantic==2.13.3",
"pydantic_core==2.46.3",
"pydeps==3.0.6",
"pytokens==0.4.1",
"stdlib-list==0.12.0",
"toolz==1.1.0",
"typing-inspect==0.9.0",
"typing-inspection==0.4.2",
"typing_extensions==4.15.0",
"urllib3==2.6.3",
] ]
[project.urls] [project.urls]