sp-hydra-veil-core/core/services/crypto/TicketCustomer.py

314 lines
12 KiB
Python

from core.utils.save_data import save_data, save_data_as_json
from core.utils.get_data import get_data, get_data_as_int
from core.services.helpers.validate_number_format import validate_number_format
import os
import hashlib
import json
from py_ecc.optimized_bls12_381 import (
G1,
G2,
multiply,
add,
curve_order,
normalize as _normalize,
)
from py_ecc.optimized_bls12_381.optimized_curve import FQ, FQ2
# for the (optional) validity tests:
from py_ecc.optimized_bls12_381 import pairing
# errors:
from core.errors.logger import logger
import traceback
class TicketCustomer:
"""
This class is the basis for all the crypto operations,
because it has the conversions from projective/alpine, deserialize/serialize g2,
and easily transfer data from one function to another without having to convert it.
"""
def __init__(self):
self.blinded_data = None
self.signature = None
# reset exists to clear the self.ticket_data between each ticket.
def reset(self):
"""
Start over. Remove all instance attributes.
The most important thing to wipe from one ticket to another is the unblinded commitment,
which is stored in self.ticket_data.
"""
self.__dict__.clear()
def _hash_to_scalar(self, *parts):
"""Hash inputs to a scalar in the curve order."""
h = hashlib.sha256()
for p in parts:
h.update(p if isinstance(p, bytes) else str(p).encode())
return int.from_bytes(h.digest(), "big") % curve_order
def _fq_to_int(self, v) -> int:
"""Convert a single FQ element to int."""
return int(getattr(v, "n", v))
def _fq2_to_list(self, v: int) -> list:
"""Convert FQ2 element to [c0, c1] list."""
if hasattr(v, "coeffs"):
c0, c1 = v.coeffs
return [self._fq_to_int(c0), self._fq_to_int(c1)]
# Fallback for single FQ
return [self._fq_to_int(v), 0]
def _ensure_affine(self, pt: dict) -> dict:
"""
This solves the errors with affine (2-tuple) vs projective (3-tuple) form, using py_ecc.
This function checks if it's projective,
And then converts it to affine if it was projective to begin with.
"""
if len(pt) == 3:
return _normalize(pt)
return pt
def _serialize_point_g2(self, pt: dict) -> dict:
"""
This function is critical for converting values into a format,
that can be used outside this class or even python,
such as sending JSONs or saving to disk.
It Serializes a G2 point to JSON-compatible format.
G2 points use FQ2 coordinates (quadratic extension field).
"""
affine_pt = self._ensure_affine(pt)
x, y = affine_pt
return {"x": self._fq2_to_list(x), "y": self._fq2_to_list(y)}
def _ensure_projective(self, pt: tuple) -> tuple:
"""
Deals with errors related to returning only 2 values instead of 3,
This function checks if it's affine (2 values),
And then converts it to projective if it was affine to begin with.
"""
if len(pt) == 2:
x, y = pt
return (x, y, FQ2.one())
return pt
# If there's a "tuple unpack" error, this is used,
def _deserialize_point_g2(self, data: dict) -> tuple:
"""
Deserialize a G2 point from JSON format.
Returns the point in affine coordinates.
"""
x_list = data["x"]
y_list = data["y"]
x = FQ2([x_list[0], x_list[1]])
y = FQ2([y_list[0], y_list[1]])
return (x, y, FQ2.one()) # projective form
# Commitment has to be on G2, in order for the pairing equation to match during validation.
def make_unblinded_commitment(self, which_ticket: int) -> bool:
# we need a random number that's G2 (inside the curve),
# 1. Generate 32 random bytes
# 2. Convert bytes to integer
# 3. Constrain to the curve order range
commitment_input = int.from_bytes(os.urandom(32), "big") % curve_order
# Commitment has to be on G2. So we're creating it by using the multiply function with a G2 generator,
unblind_commitment = multiply(G2, commitment_input)
# Now we want to both:
# 1) Use it in functions here (inside this module).
self.ticket_data = {"unblind_commitment": unblind_commitment}
# 2) Save it to disk
# So, we have to serialize it to save it,
data_to_save_to_disk = {
"unblind_commitment": self._serialize_point_g2(unblind_commitment)
}
# this wil be used at a future point to unblind the billing server's signature:
did_it_save = save_data(
which_ticket, "unblind_commitment", data_to_save_to_disk
)
# note: we return the status of our saving operation, and not the commitment itself.
# (because it's saved in the 'self.ticket_data' object).
return did_it_save
def blind_commitment(self, which_ticket: int) -> str | None:
"""
After the previous original (unblinded) commitment is setup,
we can now blind it, (so external functions outside this class can send to the server)
Requirements:
This function assumes you already have the unblinded commitment saved in the 'self.ticket_data' object.
"""
# That's why this function starts off by checking you have it,
if self.ticket_data is None:
raise ValueError(
"We are missing the original unblinded commitment in self.ticket_data"
)
original_commitment = self.ticket_data["unblind_commitment"]
# Create the blinding factor.
blinding_factor = int.from_bytes(os.urandom(32), "big") % curve_order
# Blind the commitment,
blind_commitment = multiply(original_commitment, blinding_factor)
# Save/store the 'blinding factor' as just the raw number. This is kept on your local device,
did_it_save_blinding_factor = save_data(
which_ticket, "blinding_factors_int", str(blinding_factor)
)
# While the 'blind commitment' is being serialized,
g2_serialized_blind_commitment = self._serialize_point_g2(blind_commitment)
# we also save the 'blind commitment' so it can be used to validate later,
did_it_save = save_data_as_json(
which_ticket, "blinded_commitment_json", g2_serialized_blind_commitment
)
# Serialized blinded commitment is being prepared to send to the server,
blinded_json = json.dumps(g2_serialized_blind_commitment)
# double check both pieces of data are saved:
if did_it_save == True and did_it_save_blinding_factor == True:
return blinded_json
else:
return None
def load_key(self, pub_key: dict) -> tuple | bool:
try:
# use it in projective form (x, y, z), where z=1 for affine points
projective_public_key = (FQ(pub_key["x"]), FQ(pub_key["y"]), FQ.one())
return projective_public_key
except:
return False
def test_blind_signature_validity(
self,
which_ticket: int,
blind_signature: dict,
string_public_key: dict,
) -> dict:
# Deserialize the blinded signature (that was just recieved from the billing server as a JSON)
blinded_signature = self._deserialize_point_g2(blind_signature)
blinded_commitment_as_dict = get_data(which_ticket, "blinded_commitment_json")
blinded_commitment = self._deserialize_point_g2(blinded_commitment_as_dict)
projective_public_key = self.load_key(string_public_key)
if projective_public_key == False:
return {"valid": False, "message": "invalid_key"}
# All of that was to prep the values for this pairing equation,
try:
if pairing(blinded_signature, G1) == pairing(
blinded_commitment, projective_public_key
):
return {"valid": True, "message": "It worked."}
else:
return {
"valid": False,
"message": "Pairing equation was able to compute, but did not actually match.",
}
except ValueError as e:
# I want to see if the reason for the error is this below, because that likely is a bad public key,
# "ValueError: Invalid input - point P is not on the correct curves"
error_message_as_string = str(e)
import re
if re.search(
r"point P is not on the correct curves", error_message_as_string
):
error_msg = f"The reason the verification failed is because the public key is invalid (not on the elliptic curve. Original message: {error_message_as_string})"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "invalid_public_key"}
else:
return {"valid": True, "message": "verification_failed"}
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)[-1]
error_msg = f"error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
logger.error(error_msg, exc_info=True)
return {"valid": True, "message": "verification_failed"}
# this is after the server signs,
def unblind_signature(
self,
which_ticket: int,
blind_signature: dict,
) -> dict:
"""
Client unblinds the server's blinded signature using their secret "blinding_factors".
"""
try:
blinding_factor = get_data_as_int(which_ticket, "blinding_factors_int")
# uncomment this next line, if you want to load it from a file instead:
# blinded_sig_data = json.loads(blinded_sig_json)
# Deserialize the blinded signature (that was just recieved from the billing server as a JSON)
blinded_signature = self._deserialize_point_g2(blind_signature)
# Unblind: S = b^(-1) · S_blind
b_inv = pow(blinding_factor, -1, curve_order)
unblinded_signature = multiply(blinded_signature, b_inv)
return unblinded_signature
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON signature: {str(e)}")
except Exception as e:
tb = traceback.extract_tb(e.__traceback__)[-1]
error_msg = f"Issue with unblinding the signature. It's error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
logger.error(error_msg, exc_info=True)
return {"valid": False, "message": "invalid_blind_sig"}
def make_final_ticket(
self,
which_ticket: int,
unblinded_signature: dict,
) -> str | bool:
"""
Input: This function takes in the unblinded signature, and which ticket ("which_ticket")
Output: And makes a JSON that has both of them together.
Note: There is NO MATH done, it's just putting them together.
This final JSON, we're defining as a "ticket".
"""
# load the ORIGINAL data (prior to blinding it):
self.ticket_data = get_data(which_ticket, "unblind_commitment")
# just change the dictionary's key word to 'commitment' make it more neutral for the server,
self.ticket_data["commitment"] = self.ticket_data.pop("unblind_commitment")
if self.ticket_data == False:
error_msg = "There's an error with making the final ticket. It could not find or get the unblind_commitment from the self.ticket_data dictionary"
logger.error(error_msg, exc_info=True)
return False
# append the signature on to it:
self.ticket_data["signature"] = self._serialize_point_g2(unblinded_signature)
# that's it. no math,
ticket = json.dumps(self.ticket_data)
return ticket