import os import hashlib from cryptography.hazmat.primitives.asymmetric import ed25519, x25519 from cryptography.hazmat.primitives.ciphers.aead import AESGCM class TyCryptoEngine: def __init__(self, static_private_bytes=None): if static_private_bytes: self.id_priv = ed25519.Ed25519PrivateKey.from_private_bytes(static_private_bytes) else: self.id_priv = ed25519.Ed25519PrivateKey.generate() self.id_pub = self.id_priv.public_key() self.id_pub_bytes = self.id_pub.public_bytes_raw() self.my_id = int(hashlib.sha256(self.id_pub_bytes).hexdigest(), 16) % 1000000 self.ek_priv = None self.aesgcm = None def get_private_bytes(self) -> bytes: return self.id_priv.private_bytes_raw() def make_handshake_packet(self, receiver_id: int) -> bytes: self.ek_priv = x25519.X25519PrivateKey.generate() ek_pub_bytes = self.ek_priv.public_key().public_bytes_raw() data_to_sign = self.my_id.to_bytes(3, 'big') + ek_pub_bytes signature = self.id_priv.sign(data_to_sign) payload = self.id_pub_bytes + signature + ek_pub_bytes header = bytes([0x01, 0x01]) + self.my_id.to_bytes(3, 'big') + receiver_id.to_bytes(3, 'big') meta = b'\x00' * 12 + len(payload).to_bytes(4, 'big') return header + meta + payload def parse_handshake_packet(self, sender_id: int, payload: bytes) -> bool: if len(payload) < 128: return False try: peer_id_pub_bytes = payload[0:32] signature = payload[32:96] peer_ek_pub_bytes = payload[96:128] expected_id = int(hashlib.sha256(peer_id_pub_bytes).hexdigest(), 16) % 1000000 if sender_id != expected_id: return False peer_id_pub = ed25519.Ed25519PublicKey.from_public_bytes(peer_id_pub_bytes) data_to_verify = sender_id.to_bytes(3, 'big') + peer_ek_pub_bytes peer_id_pub.verify(signature, data_to_verify) if not self.ek_priv: self.ek_priv = x25519.X25519PrivateKey.generate() peer_ek_pub = x25519.X25519PublicKey.from_public_bytes(peer_ek_pub_bytes) shared_secret = self.ek_priv.exchange(peer_ek_pub) symmetric_key = hashlib.sha256(shared_secret).digest() self.aesgcm = AESGCM(symmetric_key) return True except Exception: return False def encrypt_message(self, receiver_id: int, text: str) -> bytes: if not self.aesgcm: raise RuntimeError("Crypto session not initialized") header = bytes([0x01, 0x02]) + self.my_id.to_bytes(3, 'big') + receiver_id.to_bytes(3, 'big') iv = os.urandom(12) encrypted_payload = self.aesgcm.encrypt(iv, text.encode('utf-8'), header) meta = iv + len(encrypted_payload).to_bytes(4, 'big') return header + meta + encrypted_payload def decrypt_message(self, header: bytes, iv: bytes, encrypted_payload: bytes) -> str: if not self.aesgcm: raise RuntimeError("Crypto session not initialized") decrypted_data = self.aesgcm.decrypt(iv, encrypted_payload, header) return decrypted_data.decode('utf-8') @staticmethod def parse_header(header_bytes: bytes) -> tuple[int, int, int]: version = header_bytes[0] packet_type = header_bytes[1] sender_id = int.from_bytes(header_bytes[2:5], 'big') receiver_id = int.from_bytes(header_bytes[5:8], 'big') return packet_type, sender_id, receiver_id