diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..db1ac0a --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +server/ +venv/ +exploit.py \ No newline at end of file diff --git a/README.md b/README.md index 2a5f60d..0c72f54 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ ## Дорожная карта (TODO) -* [ ] Разработать протокол обмена сообщениями. +* [X] Разработать протокол обмена сообщениями. * [ ] Реализовать сигнальный сервер. * [ ] Разработать TUI-клиент (терминальный интерфейс). * [ ] Разработать GUI-клиент для ПК. diff --git a/tycrypto.py b/tycrypto.py new file mode 100644 index 0000000..a1325e5 --- /dev/null +++ b/tycrypto.py @@ -0,0 +1,87 @@ +import os +import hashlib +from cryptography.hazmat.primitives.asymmetric import ed25519, x25519 +from cryptography.hazmat.primitives.ciphers.aead import AESGCM + +class TyCryptoEngine: + def __init__(self, static_private_bytes=None): + if static_private_bytes: + self.id_priv = ed25519.Ed25519PrivateKey.from_private_bytes(static_private_bytes) + else: + self.id_priv = ed25519.Ed25519PrivateKey.generate() + + self.id_pub = self.id_priv.public_key() + self.id_pub_bytes = self.id_pub.public_bytes_raw() + self.my_id = int(hashlib.sha256(self.id_pub_bytes).hexdigest(), 16) % 1000000 + + self.ek_priv = None + self.aesgcm = None + + def get_private_bytes(self) -> bytes: + return self.id_priv.private_bytes_raw() + + def make_handshake_packet(self, receiver_id: int) -> bytes: + self.ek_priv = x25519.X25519PrivateKey.generate() + ek_pub_bytes = self.ek_priv.public_key().public_bytes_raw() + + data_to_sign = self.my_id.to_bytes(3, 'big') + ek_pub_bytes + signature = self.id_priv.sign(data_to_sign) + + payload = self.id_pub_bytes + signature + ek_pub_bytes + header = bytes([0x01, 0x01]) + self.my_id.to_bytes(3, 'big') + receiver_id.to_bytes(3, 'big') + meta = b'\x00' * 12 + len(payload).to_bytes(4, 'big') + + return header + meta + payload + + def parse_handshake_packet(self, sender_id: int, payload: bytes) -> bool: + if len(payload) < 128: + return False + try: + peer_id_pub_bytes = payload[0:32] + signature = payload[32:96] + peer_ek_pub_bytes = payload[96:128] + + expected_id = int(hashlib.sha256(peer_id_pub_bytes).hexdigest(), 16) % 1000000 + if sender_id != expected_id: + return False + + peer_id_pub = ed25519.Ed25519PublicKey.from_public_bytes(peer_id_pub_bytes) + data_to_verify = sender_id.to_bytes(3, 'big') + peer_ek_pub_bytes + peer_id_pub.verify(signature, data_to_verify) + + if not self.ek_priv: + self.ek_priv = x25519.X25519PrivateKey.generate() + + peer_ek_pub = x25519.X25519PublicKey.from_public_bytes(peer_ek_pub_bytes) + shared_secret = self.ek_priv.exchange(peer_ek_pub) + + symmetric_key = hashlib.sha256(shared_secret).digest() + self.aesgcm = AESGCM(symmetric_key) + return True + except Exception: + return False + + def encrypt_message(self, receiver_id: int, text: str) -> bytes: + if not self.aesgcm: + raise RuntimeError("Crypto session not initialized") + + header = bytes([0x01, 0x02]) + self.my_id.to_bytes(3, 'big') + receiver_id.to_bytes(3, 'big') + iv = os.urandom(12) + encrypted_payload = self.aesgcm.encrypt(iv, text.encode('utf-8'), header) + meta = iv + len(encrypted_payload).to_bytes(4, 'big') + + return header + meta + encrypted_payload + + def decrypt_message(self, header: bytes, iv: bytes, encrypted_payload: bytes) -> str: + if not self.aesgcm: + raise RuntimeError("Crypto session not initialized") + decrypted_data = self.aesgcm.decrypt(iv, encrypted_payload, header) + return decrypted_data.decode('utf-8') + + @staticmethod + def parse_header(header_bytes: bytes) -> tuple[int, int, int]: + version = header_bytes[0] + packet_type = header_bytes[1] + sender_id = int.from_bytes(header_bytes[2:5], 'big') + receiver_id = int.from_bytes(header_bytes[5:8], 'big') + return packet_type, sender_id, receiver_id \ No newline at end of file