314 lines
12 KiB
Python
314 lines
12 KiB
Python
from core.utils.save_data import save_data, save_data_as_json
|
|
from core.utils.get_data import get_data, get_data_as_int
|
|
from core.services.helpers.validate_number_format import validate_number_format
|
|
|
|
import os
|
|
import hashlib
|
|
import json
|
|
from py_ecc.optimized_bls12_381 import (
|
|
G1,
|
|
G2,
|
|
multiply,
|
|
add,
|
|
curve_order,
|
|
normalize as _normalize,
|
|
)
|
|
from py_ecc.optimized_bls12_381.optimized_curve import FQ, FQ2
|
|
|
|
# for the (optional) validity tests:
|
|
from py_ecc.optimized_bls12_381 import pairing
|
|
|
|
# errors:
|
|
from core.errors.logger import logger
|
|
import traceback
|
|
|
|
|
|
class TicketCustomer:
|
|
"""
|
|
This class is the basis for all the crypto operations,
|
|
because it has the conversions from projective/alpine, deserialize/serialize g2,
|
|
and easily transfer data from one function to another without having to convert it.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.blinded_data = None
|
|
self.signature = None
|
|
|
|
# reset exists to clear the self.ticket_data between each ticket.
|
|
def reset(self):
|
|
"""
|
|
Start over. Remove all instance attributes.
|
|
The most important thing to wipe from one ticket to another is the unblinded commitment,
|
|
which is stored in self.ticket_data.
|
|
"""
|
|
self.__dict__.clear()
|
|
|
|
def _hash_to_scalar(self, *parts):
|
|
"""Hash inputs to a scalar in the curve order."""
|
|
h = hashlib.sha256()
|
|
for p in parts:
|
|
h.update(p if isinstance(p, bytes) else str(p).encode())
|
|
return int.from_bytes(h.digest(), "big") % curve_order
|
|
|
|
def _fq_to_int(self, v) -> int:
|
|
"""Convert a single FQ element to int."""
|
|
return int(getattr(v, "n", v))
|
|
|
|
def _fq2_to_list(self, v: int) -> list:
|
|
"""Convert FQ2 element to [c0, c1] list."""
|
|
if hasattr(v, "coeffs"):
|
|
c0, c1 = v.coeffs
|
|
return [self._fq_to_int(c0), self._fq_to_int(c1)]
|
|
# Fallback for single FQ
|
|
return [self._fq_to_int(v), 0]
|
|
|
|
def _ensure_affine(self, pt: dict) -> dict:
|
|
"""
|
|
This solves the errors with affine (2-tuple) vs projective (3-tuple) form, using py_ecc.
|
|
|
|
This function checks if it's projective,
|
|
And then converts it to affine if it was projective to begin with.
|
|
"""
|
|
if len(pt) == 3:
|
|
return _normalize(pt)
|
|
return pt
|
|
|
|
def _serialize_point_g2(self, pt: dict) -> dict:
|
|
"""
|
|
This function is critical for converting values into a format,
|
|
that can be used outside this class or even python,
|
|
such as sending JSONs or saving to disk.
|
|
|
|
It Serializes a G2 point to JSON-compatible format.
|
|
G2 points use FQ2 coordinates (quadratic extension field).
|
|
"""
|
|
affine_pt = self._ensure_affine(pt)
|
|
x, y = affine_pt
|
|
return {"x": self._fq2_to_list(x), "y": self._fq2_to_list(y)}
|
|
|
|
def _ensure_projective(self, pt: tuple) -> tuple:
|
|
"""
|
|
Deals with errors related to returning only 2 values instead of 3,
|
|
|
|
This function checks if it's affine (2 values),
|
|
And then converts it to projective if it was affine to begin with.
|
|
"""
|
|
if len(pt) == 2:
|
|
x, y = pt
|
|
return (x, y, FQ2.one())
|
|
return pt
|
|
|
|
# If there's a "tuple unpack" error, this is used,
|
|
def _deserialize_point_g2(self, data: dict) -> tuple:
|
|
"""
|
|
Deserialize a G2 point from JSON format.
|
|
Returns the point in affine coordinates.
|
|
"""
|
|
x_list = data["x"]
|
|
y_list = data["y"]
|
|
x = FQ2([x_list[0], x_list[1]])
|
|
y = FQ2([y_list[0], y_list[1]])
|
|
return (x, y, FQ2.one()) # projective form
|
|
|
|
# Commitment has to be on G2, in order for the pairing equation to match during validation.
|
|
def make_unblinded_commitment(self, which_ticket: int) -> bool:
|
|
|
|
# we need a random number that's G2 (inside the curve),
|
|
# 1. Generate 32 random bytes
|
|
# 2. Convert bytes to integer
|
|
# 3. Constrain to the curve order range
|
|
commitment_input = int.from_bytes(os.urandom(32), "big") % curve_order
|
|
|
|
# Commitment has to be on G2. So we're creating it by using the multiply function with a G2 generator,
|
|
unblind_commitment = multiply(G2, commitment_input)
|
|
|
|
# Now we want to both:
|
|
|
|
# 1) Use it in functions here (inside this module).
|
|
self.ticket_data = {"unblind_commitment": unblind_commitment}
|
|
|
|
# 2) Save it to disk
|
|
# So, we have to serialize it to save it,
|
|
data_to_save_to_disk = {
|
|
"unblind_commitment": self._serialize_point_g2(unblind_commitment)
|
|
}
|
|
|
|
# this wil be used at a future point to unblind the billing server's signature:
|
|
did_it_save = save_data(
|
|
which_ticket, "unblind_commitment", data_to_save_to_disk
|
|
)
|
|
|
|
# note: we return the status of our saving operation, and not the commitment itself.
|
|
# (because it's saved in the 'self.ticket_data' object).
|
|
return did_it_save
|
|
|
|
def blind_commitment(self, which_ticket: int) -> str | None:
|
|
"""
|
|
After the previous original (unblinded) commitment is setup,
|
|
we can now blind it, (so external functions outside this class can send to the server)
|
|
|
|
Requirements:
|
|
This function assumes you already have the unblinded commitment saved in the 'self.ticket_data' object.
|
|
"""
|
|
|
|
# That's why this function starts off by checking you have it,
|
|
if self.ticket_data is None:
|
|
raise ValueError(
|
|
"We are missing the original unblinded commitment in self.ticket_data"
|
|
)
|
|
|
|
original_commitment = self.ticket_data["unblind_commitment"]
|
|
|
|
# Create the blinding factor.
|
|
blinding_factor = int.from_bytes(os.urandom(32), "big") % curve_order
|
|
|
|
# Blind the commitment,
|
|
blind_commitment = multiply(original_commitment, blinding_factor)
|
|
|
|
# Save/store the 'blinding factor' as just the raw number. This is kept on your local device,
|
|
did_it_save_blinding_factor = save_data(
|
|
which_ticket, "blinding_factors_int", str(blinding_factor)
|
|
)
|
|
|
|
# While the 'blind commitment' is being serialized,
|
|
g2_serialized_blind_commitment = self._serialize_point_g2(blind_commitment)
|
|
|
|
# we also save the 'blind commitment' so it can be used to validate later,
|
|
did_it_save = save_data_as_json(
|
|
which_ticket, "blinded_commitment_json", g2_serialized_blind_commitment
|
|
)
|
|
|
|
# Serialized blinded commitment is being prepared to send to the server,
|
|
blinded_json = json.dumps(g2_serialized_blind_commitment)
|
|
|
|
# double check both pieces of data are saved:
|
|
if did_it_save == True and did_it_save_blinding_factor == True:
|
|
return blinded_json
|
|
else:
|
|
return None
|
|
|
|
def load_key(self, pub_key: dict) -> tuple | bool:
|
|
try:
|
|
# use it in projective form (x, y, z), where z=1 for affine points
|
|
projective_public_key = (FQ(pub_key["x"]), FQ(pub_key["y"]), FQ.one())
|
|
return projective_public_key
|
|
except:
|
|
return False
|
|
|
|
def test_blind_signature_validity(
|
|
self,
|
|
which_ticket: int,
|
|
blind_signature: dict,
|
|
string_public_key: dict,
|
|
) -> dict:
|
|
|
|
# Deserialize the blinded signature (that was just recieved from the billing server as a JSON)
|
|
blinded_signature = self._deserialize_point_g2(blind_signature)
|
|
|
|
blinded_commitment_as_dict = get_data(which_ticket, "blinded_commitment_json")
|
|
blinded_commitment = self._deserialize_point_g2(blinded_commitment_as_dict)
|
|
|
|
projective_public_key = self.load_key(string_public_key)
|
|
|
|
if projective_public_key == False:
|
|
return {"valid": False, "message": "invalid_key"}
|
|
|
|
# All of that was to prep the values for this pairing equation,
|
|
try:
|
|
if pairing(blinded_signature, G1) == pairing(
|
|
blinded_commitment, projective_public_key
|
|
):
|
|
return {"valid": True, "message": "It worked."}
|
|
else:
|
|
return {
|
|
"valid": False,
|
|
"message": "Pairing equation was able to compute, but did not actually match.",
|
|
}
|
|
|
|
except ValueError as e:
|
|
# I want to see if the reason for the error is this below, because that likely is a bad public key,
|
|
# "ValueError: Invalid input - point P is not on the correct curves"
|
|
error_message_as_string = str(e)
|
|
import re
|
|
|
|
if re.search(
|
|
r"point P is not on the correct curves", error_message_as_string
|
|
):
|
|
error_msg = f"The reason the verification failed is because the public key is invalid (not on the elliptic curve. Original message: {error_message_as_string})"
|
|
logger.error(error_msg, exc_info=True)
|
|
return {"valid": False, "message": "invalid_public_key"}
|
|
|
|
else:
|
|
return {"valid": True, "message": "verification_failed"}
|
|
|
|
except Exception as e:
|
|
tb = traceback.extract_tb(e.__traceback__)[-1]
|
|
error_msg = f"error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
|
|
logger.error(error_msg, exc_info=True)
|
|
return {"valid": True, "message": "verification_failed"}
|
|
|
|
# this is after the server signs,
|
|
def unblind_signature(
|
|
self,
|
|
which_ticket: int,
|
|
blind_signature: dict,
|
|
) -> dict:
|
|
"""
|
|
Client unblinds the server's blinded signature using their secret "blinding_factors".
|
|
"""
|
|
try:
|
|
blinding_factor = get_data_as_int(which_ticket, "blinding_factors_int")
|
|
|
|
# uncomment this next line, if you want to load it from a file instead:
|
|
# blinded_sig_data = json.loads(blinded_sig_json)
|
|
|
|
# Deserialize the blinded signature (that was just recieved from the billing server as a JSON)
|
|
blinded_signature = self._deserialize_point_g2(blind_signature)
|
|
|
|
# Unblind: S = b^(-1) · S_blind
|
|
b_inv = pow(blinding_factor, -1, curve_order)
|
|
unblinded_signature = multiply(blinded_signature, b_inv)
|
|
|
|
return unblinded_signature
|
|
|
|
except json.JSONDecodeError as e:
|
|
raise ValueError(f"Invalid JSON signature: {str(e)}")
|
|
|
|
except Exception as e:
|
|
tb = traceback.extract_tb(e.__traceback__)[-1]
|
|
error_msg = f"Issue with unblinding the signature. It's error type {type(e).__name__} happened on line {tb.lineno}, in function {tb.name}, giving error: {e}. The actual code in question is: {tb.line}. in this file: {tb.filename}"
|
|
logger.error(error_msg, exc_info=True)
|
|
return {"valid": False, "message": "invalid_blind_sig"}
|
|
|
|
def make_final_ticket(
|
|
self,
|
|
which_ticket: int,
|
|
unblinded_signature: dict,
|
|
) -> str | bool:
|
|
"""
|
|
Input: This function takes in the unblinded signature, and which ticket ("which_ticket")
|
|
|
|
Output: And makes a JSON that has both of them together.
|
|
|
|
Note: There is NO MATH done, it's just putting them together.
|
|
This final JSON, we're defining as a "ticket".
|
|
"""
|
|
|
|
# load the ORIGINAL data (prior to blinding it):
|
|
self.ticket_data = get_data(which_ticket, "unblind_commitment")
|
|
|
|
# just change the dictionary's key word to 'commitment' make it more neutral for the server,
|
|
self.ticket_data["commitment"] = self.ticket_data.pop("unblind_commitment")
|
|
|
|
if self.ticket_data == False:
|
|
error_msg = "There's an error with making the final ticket. It could not find or get the unblind_commitment from the self.ticket_data dictionary"
|
|
logger.error(error_msg, exc_info=True)
|
|
return False
|
|
|
|
# append the signature on to it:
|
|
self.ticket_data["signature"] = self._serialize_point_g2(unblinded_signature)
|
|
|
|
# that's it. no math,
|
|
ticket = json.dumps(self.ticket_data)
|
|
|
|
return ticket
|