# File-viewer metadata (not part of the program): Python, 90 lines, 3.7 KiB.
import hashlib
import os

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ed25519, x25519
from cryptography.hazmat.primitives.ciphers.aead import AESGCM


class TyCryptoEngine:
    """Identity key, handshake and AES-GCM message encryption for one peer.

    Each instance holds a long-term Ed25519 identity key, a numeric ID
    derived from the identity public key, an X25519 key created lazily for
    the handshake, and (after a successful handshake) an AES-GCM cipher.

    Every packet built here starts with an 8-byte header::

        byte 0      constant 0x01
        byte 1      packet type (0x01 handshake, 0x02 encrypted message)
        bytes 2-4   sender ID, big-endian
        bytes 5-7   receiver ID, big-endian

    followed by 16 bytes of metadata (12-byte nonce, or zeros for a
    handshake, then the 4-byte big-endian payload length) and the payload.
    """

    # First header byte; its meaning is not visible in this file.
    _HEADER_PREFIX = 0x01
    # Second header byte, the value parse_header() returns as packet type.
    _TYPE_HANDSHAKE = 0x01
    _TYPE_MESSAGE = 0x02

    _ID_SIZE = 3             # bytes used for an ID on the wire
    _ID_MODULUS = 1_000_000  # IDs are in range(1_000_000), so they fit 3 bytes
    _NONCE_SIZE = 12         # AES-GCM nonce length / zero padding in handshakes
    _LENGTH_SIZE = 4         # bytes used for the payload length field
    _KEY_SIZE = 32           # raw Ed25519 / X25519 public key length
    _SIGNATURE_SIZE = 64     # Ed25519 signature length
    _HANDSHAKE_SIZE = _KEY_SIZE + _SIGNATURE_SIZE + _KEY_SIZE  # 128

    def __init__(self, wallet_bytes=None):
        """Load the identity key from *wallet_bytes* or generate a new one.

        Args:
            wallet_bytes: Raw Ed25519 private key bytes as produced by
                :meth:`export_wallet`. ``None`` or an empty value makes the
                engine generate a fresh identity instead.
        """
        if wallet_bytes:
            self.id_priv = ed25519.Ed25519PrivateKey.from_private_bytes(wallet_bytes)
        else:
            self.id_priv = ed25519.Ed25519PrivateKey.generate()

        self.id_pub = self.id_priv.public_key()
        self.id_pub_bytes = self.id_pub.public_bytes_raw()
        self.my_id = self._derive_id(self.id_pub_bytes)

        # Both are filled in lazily by the handshake methods.
        self.ek_priv = None
        self.aesgcm = None

    @classmethod
    def _derive_id(cls, id_pub_bytes: bytes) -> int:
        """Return the numeric ID for an identity public key (SHA-256 mod 10**6)."""
        digest = hashlib.sha256(id_pub_bytes).digest()
        return int.from_bytes(digest, 'big') % cls._ID_MODULUS

    def _ensure_ephemeral_key(self):
        """Return the X25519 private key, generating it on first use."""
        if self.ek_priv is None:
            self.ek_priv = x25519.X25519PrivateKey.generate()
        return self.ek_priv

    def _build_header(self, packet_type: int, receiver_id: int) -> bytes:
        """Return the 8-byte header for a packet from this engine to *receiver_id*."""
        return (
            bytes([self._HEADER_PREFIX, packet_type])
            + self.my_id.to_bytes(self._ID_SIZE, 'big')
            + receiver_id.to_bytes(self._ID_SIZE, 'big')
        )

    def export_wallet(self) -> bytes:
        """Return the raw Ed25519 identity private key bytes."""
        return self.id_priv.private_bytes_raw()

    def make_handshake_packet(self, receiver_id: int) -> bytes:
        """Build a signed handshake packet addressed to *receiver_id*.

        The payload is ``identity_pub (32) || signature (64) || x25519_pub (32)``.
        The signature covers this engine's 3-byte ID followed by the X25519
        public key; the receiver ID is not part of the signed data.

        Args:
            receiver_id: Destination ID; must fit in 3 bytes.

        Returns:
            Header, 16 metadata bytes (12 zero bytes plus the 4-byte payload
            length) and the payload, concatenated.

        Raises:
            OverflowError: If *receiver_id* is negative or does not fit in
                3 bytes.
        """
        ek_pub_bytes = self._ensure_ephemeral_key().public_key().public_bytes_raw()

        signed_data = self.my_id.to_bytes(self._ID_SIZE, 'big') + ek_pub_bytes
        signature = self.id_priv.sign(signed_data)

        payload = self.id_pub_bytes + signature + ek_pub_bytes
        header = self._build_header(self._TYPE_HANDSHAKE, receiver_id)
        # A handshake carries no nonce, so that field is zero-filled.
        meta = b'\x00' * self._NONCE_SIZE + len(payload).to_bytes(self._LENGTH_SIZE, 'big')

        return header + meta + payload

    def parse_handshake_packet(self, sender_id: int, payload: bytes) -> bool:
        """Verify a peer handshake payload and set up the AES-GCM session.

        Args:
            sender_id: Sender ID taken from the packet header.
            payload: Handshake payload; only the first 128 bytes are read.

        Returns:
            ``True`` if a session already exists or was established from this
            payload; ``False`` if the payload is too short, the sender ID does
            not match the identity key, the signature is invalid, or the key
            material is rejected.
        """
        # NOTE(review): once a session key exists, every later handshake is
        # acknowledged without being verified and without looking at
        # sender_id. Confirm this is intended.
        if self.aesgcm is not None:
            return True

        if len(payload) < self._HANDSHAKE_SIZE:
            return False

        sig_start = self._KEY_SIZE
        ek_start = sig_start + self._SIGNATURE_SIZE
        peer_id_pub_bytes = payload[:sig_start]
        signature = payload[sig_start:ek_start]
        peer_ek_pub_bytes = payload[ek_start:self._HANDSHAKE_SIZE]

        # The claimed ID must be the one derived from the presented identity key.
        if sender_id != self._derive_id(peer_id_pub_bytes):
            return False

        try:
            peer_id_pub = ed25519.Ed25519PublicKey.from_public_bytes(peer_id_pub_bytes)
            signed_data = sender_id.to_bytes(self._ID_SIZE, 'big') + peer_ek_pub_bytes
            peer_id_pub.verify(signature, signed_data)

            own_ek_priv = self._ensure_ephemeral_key()
            peer_ek_pub = x25519.X25519PublicKey.from_public_bytes(peer_ek_pub_bytes)
            shared_secret = own_ek_priv.exchange(peer_ek_pub)
        except (InvalidSignature, ValueError):
            # Bad signature or rejected key material: treat as a failed
            # handshake. Other exceptions indicate a programming or
            # environment problem and are allowed to propagate.
            return False

        # NOTE(review): the AES key is a bare SHA-256 of the X25519 output.
        # A dedicated KDF such as HKDF is the usual recommendation, but
        # switching would change the derived key and break existing peers.
        symmetric_key = hashlib.sha256(shared_secret).digest()
        self.aesgcm = AESGCM(symmetric_key)
        return True

    def encrypt_message(self, receiver_id: int, text: str) -> bytes:
        """Encrypt *text* for *receiver_id* and return the complete packet.

        The header is passed to AES-GCM as associated data, so it is
        authenticated but not encrypted. A fresh random 12-byte nonce is
        drawn for every call.

        Args:
            receiver_id: Destination ID; must fit in 3 bytes.
            text: Message to send; encoded as UTF-8 before encryption.

        Returns:
            Header, nonce, 4-byte big-endian ciphertext length and the
            ciphertext, concatenated.

        Raises:
            RuntimeError: If no session has been established yet.
            OverflowError: If *receiver_id* is negative or does not fit in
                3 bytes.
        """
        if self.aesgcm is None:
            raise RuntimeError("Crypto session not initialized")

        header = self._build_header(self._TYPE_MESSAGE, receiver_id)
        iv = os.urandom(self._NONCE_SIZE)
        encrypted_payload = self.aesgcm.encrypt(iv, text.encode('utf-8'), header)
        meta = iv + len(encrypted_payload).to_bytes(self._LENGTH_SIZE, 'big')

        return header + meta + encrypted_payload

    def decrypt_message(self, header: bytes, iv: bytes, encrypted_payload: bytes) -> str:
        """Decrypt a message packet's payload and return it as text.

        Args:
            header: The packet header, used as AES-GCM associated data.
            iv: The nonce taken from the packet metadata.
            encrypted_payload: The ciphertext as produced by AES-GCM.

        Returns:
            The decrypted payload decoded as UTF-8.

        Raises:
            RuntimeError: If no session has been established yet.
            UnicodeDecodeError: If the plaintext is not valid UTF-8.

        Any exception raised by ``AESGCM.decrypt`` (for example on an
        authentication failure) propagates unchanged.
        """
        if self.aesgcm is None:
            raise RuntimeError("Crypto session not initialized")

        plaintext = self.aesgcm.decrypt(iv, encrypted_payload, header)
        return plaintext.decode('utf-8')

    @staticmethod
    def parse_header(header_bytes: bytes) -> tuple[int, int, int]:
        """Split a packet header into ``(packet_type, sender_id, receiver_id)``.

        Only the first 8 bytes are read; anything after them is ignored.

        Args:
            header_bytes: At least the 8 header bytes of a packet.

        Returns:
            The packet type (byte 1), sender ID (bytes 2-4) and receiver ID
            (bytes 5-7), with both IDs decoded as big-endian integers.

        Raises:
            IndexError: If fewer than 2 bytes are supplied.
            ValueError: If either ID field is truncated.
        """
        id_size = TyCryptoEngine._ID_SIZE
        sender_start = 2
        receiver_start = sender_start + id_size
        header_end = receiver_start + id_size

        packet_type = header_bytes[1]
        sender_field = header_bytes[sender_start:receiver_start]
        receiver_field = header_bytes[receiver_start:header_end]

        # A short slice would still decode, just to the wrong ID, so reject it.
        if len(sender_field) != id_size or len(receiver_field) != id_size:
            raise ValueError(
                f"header too short: need {header_end} bytes, got {len(header_bytes)}"
            )

        sender_id = int.from_bytes(sender_field, 'big')
        receiver_id = int.from_bytes(receiver_field, 'big')
        return packet_type, sender_id, receiver_id