import hashlib
import json
import os
import secrets
import traceback

from py_ecc.optimized_bls12_381 import (
    G1,
    G2,
    multiply,
    add,
    curve_order,
    normalize as _normalize,
)
from py_ecc.optimized_bls12_381.optimized_curve import FQ, FQ2

# for the (optional) validity tests:
from py_ecc.optimized_bls12_381 import pairing

from core.utils.save_data import save_data, save_data_as_json
from core.utils.get_data import get_data, get_data_as_int
from core.services.helpers.validate_number_format import validate_number_format

# errors:
from core.errors.logger import logger


class TicketCustomer:
    """
    Client-side helper for the blind-signature ticket flow.

    It bundles the point conversions (projective <-> affine), the G2
    serialize/deserialize helpers, and the per-ticket state
    (``self.ticket_data``) so values can be passed from one step to the
    next without converting them by hand.
    """

    def __init__(self):
        self.blinded_data = None
        self.signature = None
        # Holds the unblinded commitment of the ticket currently being built.
        # Initialised here so "missing" can be detected as None.
        self.ticket_data = None

    # reset exists to clear the self.ticket_data between each ticket.
    def reset(self):
        """
        Start over: drop every instance attribute, then restore the defaults.

        The most important thing to wipe from one ticket to another is the
        unblinded commitment, which is stored in self.ticket_data.
        """
        self.__dict__.clear()
        # Restore the same defaults as __init__ so later `is None` checks
        # do not fail with AttributeError.
        self.blinded_data = None
        self.signature = None
        self.ticket_data = None

    def _random_scalar(self) -> int:
        """Return a uniformly random, non-zero scalar in [1, curve_order - 1]."""
        # `urandom(32) % curve_order` is biased (2**256 is not a multiple of
        # the curve order) and can yield 0, which is not invertible when
        # unblinding. randbelow() is uniform; the +1 shifts away from 0.
        return secrets.randbelow(curve_order - 1) + 1

    def _hash_to_scalar(self, *parts):
        """Hash inputs to a scalar in the curve order."""
        h = hashlib.sha256()
        for p in parts:
            # bytes are hashed as-is; anything else via its str() form.
            h.update(p if isinstance(p, bytes) else str(p).encode())
        return int.from_bytes(h.digest(), "big") % curve_order

    def _fq_to_int(self, v) -> int:
        """Convert a single FQ element (or a plain int) to int."""
        return int(getattr(v, "n", v))

    def _fq2_to_list(self, v) -> list:
        """Convert an FQ2 element to a [c0, c1] list of ints."""
        if hasattr(v, "coeffs"):
            c0, c1 = v.coeffs
            return [self._fq_to_int(c0), self._fq_to_int(c1)]
        # Fallback for single FQ
        return [self._fq_to_int(v), 0]

    def _ensure_affine(self, pt: tuple) -> tuple:
        """
        Return `pt` in affine (2-tuple) form.

        This solves the errors with affine (2-tuple) vs projective (3-tuple)
        form: a 3-tuple is normalized with py_ecc, anything else is returned
        unchanged.
        """
        if len(pt) == 3:
            return _normalize(pt)
        return pt

    def _serialize_point_g2(self, pt: tuple) -> dict:
        """
        Serialize a G2 point to a JSON-compatible dict.

        This is what lets values leave this class (sending JSON, saving to
        disk). G2 points use FQ2 coordinates (quadratic extension field), so
        each coordinate becomes a two-element list:
        {"x": [c0, c1], "y": [c0, c1]}.
        """
        affine_pt = self._ensure_affine(pt)
        x, y = affine_pt
        return {"x": self._fq2_to_list(x), "y": self._fq2_to_list(y)}

    def _ensure_projective(self, pt: tuple) -> tuple:
        """
        Return `pt` in projective (3-tuple) form.

        Deals with errors related to having only 2 values instead of 3:
        an affine 2-tuple gets z = FQ2.one() appended, anything else is
        returned unchanged.
        """
        if len(pt) == 2:
            x, y = pt
            return (x, y, FQ2.one())
        return pt

    # If there's a "tuple unpack" error, this is used,
    def _deserialize_point_g2(self, data: dict) -> tuple:
        """
        Deserialize a G2 point from the dict produced by _serialize_point_g2.

        Returns the point in projective form (x, y, z) with z = FQ2.one().
        Raises KeyError/IndexError/TypeError if `data` is not shaped like
        {"x": [c0, c1], "y": [c0, c1]}.
        """
        x_list = data["x"]
        y_list = data["y"]
        x = FQ2([x_list[0], x_list[1]])
        y = FQ2([y_list[0], y_list[1]])
        return (x, y, FQ2.one())  # projective form

    # Commitment has to be on G2, in order for the pairing equation to match during validation.
    def make_unblinded_commitment(self, which_ticket: int) -> bool:
        """
        Create a fresh random commitment on G2, keep it in self.ticket_data
        and save its serialized form to disk.

        Returns the status reported by save_data (not the commitment itself,
        which stays available in self.ticket_data).
        """
        # Random non-zero scalar within the curve order.
        commitment_input = self._random_scalar()
        # Commitment has to be on G2, so it is a multiple of the G2 generator.
        unblind_commitment = multiply(G2, commitment_input)

        # 1) Keep the raw point for use by the other methods of this class.
        self.ticket_data = {"unblind_commitment": unblind_commitment}

        # 2) Save it to disk; it has to be serialized first.
        data_to_save_to_disk = {
            "unblind_commitment": self._serialize_point_g2(unblind_commitment)
        }
        # this will be used at a future point to build the final ticket:
        did_it_save = save_data(
            which_ticket, "unblind_commitment", data_to_save_to_disk
        )
        return did_it_save

    def blind_commitment(self, which_ticket: int) -> str | None:
        """
        Blind the commitment created by make_unblinded_commitment so it can
        be sent to the server.

        Requirements: the unblinded commitment must already be stored in
        self.ticket_data.

        Returns the serialized blinded commitment as a JSON string, or None
        if saving the blinding factor or the blinded commitment failed.
        Raises ValueError when the unblinded commitment is missing.
        """
        # getattr: the attribute may be absent on instances that bypassed
        # __init__, and that must surface as the documented ValueError.
        ticket_data = getattr(self, "ticket_data", None)
        if not ticket_data or "unblind_commitment" not in ticket_data:
            raise ValueError(
                "We are missing the original unblinded commitment in self.ticket_data"
            )
        original_commitment = ticket_data["unblind_commitment"]

        # Create the blinding factor (non-zero so it can be inverted later).
        blinding_factor = self._random_scalar()
        # Blind the commitment,
        blind_commitment = multiply(original_commitment, blinding_factor)

        # Save/store the 'blinding factor' as just the raw number.
        # This is kept on your local device,
        did_it_save_blinding_factor = save_data(
            which_ticket, "blinding_factors_int", str(blinding_factor)
        )

        g2_serialized_blind_commitment = self._serialize_point_g2(blind_commitment)
        # we also save the 'blind commitment' so it can be used to validate later,
        did_it_save = save_data_as_json(
            which_ticket, "blinded_commitment_json", g2_serialized_blind_commitment
        )

        # Serialized blinded commitment is being prepared to send to the server,
        blinded_json = json.dumps(g2_serialized_blind_commitment)

        # Double check both pieces of data are saved. The explicit `== True`
        # is kept on purpose: the save helpers' return contract is not
        # visible here, so plain truthiness could change behavior.
        if did_it_save == True and did_it_save_blinding_factor == True:  # noqa: E712
            return blinded_json
        return None

    def load_key(self, pub_key: dict) -> tuple | bool:
        """
        Build a projective G1 point (x, y, z=1) from {"x": ..., "y": ...}.

        Returns False when the key cannot be built (missing entry, wrong type).
        """
        try:
            # use it in projective form (x, y, z), where z=1 for affine points
            return (FQ(pub_key["x"]), FQ(pub_key["y"]), FQ.one())
        except Exception:
            # Any malformed key is reported as False; unlike a bare `except:`
            # this does not swallow KeyboardInterrupt/SystemExit.
            return False

    def test_blind_signature_validity(
        self,
        which_ticket: int,
        blind_signature: dict,
        string_public_key: dict,
    ) -> dict:
        """
        Check the server's blinded signature against the stored blinded
        commitment and the server's public key with a pairing equation.

        Returns {"valid": bool, "message": str}. "valid" is True only when
        the pairing equation holds.
        """
        # Deserialize the blinded signature (that was just received from the billing server as a JSON)
        blinded_signature = self._deserialize_point_g2(blind_signature)

        blinded_commitment_as_dict = get_data(which_ticket, "blinded_commitment_json")
        if not isinstance(blinded_commitment_as_dict, dict):
            # Nothing usable was stored for this ticket; fail closed.
            logger.error(
                "Could not load the blinded commitment needed to verify the blind signature"
            )
            return {"valid": False, "message": "verification_failed"}
        blinded_commitment = self._deserialize_point_g2(blinded_commitment_as_dict)

        projective_public_key = self.load_key(string_public_key)
        if projective_public_key is False:
            return {"valid": False, "message": "invalid_key"}

        # All of that was to prep the values for this pairing equation,
        try:
            if pairing(blinded_signature, G1) == pairing(
                blinded_commitment, projective_public_key
            ):
                return {"valid": True, "message": "It worked."}
            return {
                "valid": False,
                "message": "Pairing equation was able to compute, but did not actually match.",
            }
        except ValueError as e:
            # I want to see if the reason for the error is this below, because that likely is a bad public key,
            # "ValueError: Invalid input - point P is not on the correct curves"
            error_message_as_string = str(e)
            if "point P is not on the correct curves" in error_message_as_string:
                error_msg = f"The reason the verification failed is because the public key is invalid (not on the elliptic curve. Original message: {error_message_as_string})"
                logger.error(error_msg, exc_info=True)
                return {"valid": False, "message": "invalid_public_key"}
            logger.error(error_message_as_string, exc_info=True)
            # A failed verification must never be reported as valid.
            return {"valid": False, "message": "verification_failed"}
        except Exception as e:
            tb = traceback.extract_tb(e.__traceback__)[-1]
            error_msg = f"error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
            logger.error(error_msg, exc_info=True)
            # A failed verification must never be reported as valid.
            return {"valid": False, "message": "verification_failed"}

    # this is after the server signs,
    def unblind_signature(
        self,
        which_ticket: int,
        blind_signature: dict,
    ) -> tuple | dict:
        """
        Client unblinds the server's blinded signature using their secret
        "blinding_factors".

        Returns the unblinded signature as a G2 point on success. On any
        other failure it logs and returns
        {"valid": False, "message": "invalid_blind_sig"}.
        Raises ValueError if a json.JSONDecodeError is raised while loading.
        """
        try:
            blinding_factor = get_data_as_int(which_ticket, "blinding_factors_int")
            # Deserialize the blinded signature (that was just received from the billing server as a JSON)
            blinded_signature = self._deserialize_point_g2(blind_signature)
            # Unblind: S = b^(-1) * S_blind
            b_inv = pow(blinding_factor, -1, curve_order)
            unblinded_signature = multiply(blinded_signature, b_inv)
            return unblinded_signature
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON signature: {str(e)}") from e
        except Exception as e:
            tb = traceback.extract_tb(e.__traceback__)[-1]
            error_msg = f"Issue with unblinding the signature. It's error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
            logger.error(error_msg, exc_info=True)
            return {"valid": False, "message": "invalid_blind_sig"}

    def make_final_ticket(
        self,
        which_ticket: int,
        unblinded_signature: tuple,
    ) -> str | bool:
        """
        Input: the unblinded signature (a G2 point, as returned by
        unblind_signature) and which ticket ("which_ticket").
        Output: a JSON string that has the original commitment and the
        signature together, or False if either piece is unavailable.
        Note: There is NO MATH done, it's just putting them together.
        This final JSON, we're defining as a "ticket".
        """
        # unblind_signature() reports failure as a dict; that is not a point
        # and cannot be serialized.
        if isinstance(unblinded_signature, dict):
            logger.error(
                "There's an error with making the final ticket. The unblinded signature is not a valid point"
            )
            return False

        # load the ORIGINAL data (prior to blinding it):
        loaded = get_data(which_ticket, "unblind_commitment")
        # Validate BEFORE touching the data: calling .pop() on a failed load
        # would crash and make this error branch unreachable.
        if not isinstance(loaded, dict) or "unblind_commitment" not in loaded:
            error_msg = "There's an error with making the final ticket. It could not find or get the unblind_commitment from the self.ticket_data dictionary"
            logger.error(error_msg, exc_info=True)
            return False

        self.ticket_data = loaded
        # just change the dictionary's key word to 'commitment' make it more neutral for the server,
        self.ticket_data["commitment"] = self.ticket_data.pop("unblind_commitment")
        # append the signature on to it:
        self.ticket_data["signature"] = self._serialize_point_g2(unblinded_signature)
        # that's it. no math,
        ticket = json.dumps(self.ticket_data)
        return ticket