Compare commits
2 commits
4cdedfef83
...
01eeb8e71c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01eeb8e71c | ||
|
|
a240a09783 |
4
.gitignore
vendored
4
.gitignore
vendored
|
|
@ -1,3 +1,7 @@
|
|||
server/
|
||||
venv/
|
||||
exploit.py
|
||||
settings.ini
|
||||
tychat.wallet
|
||||
lib/__pycache__/
|
||||
__pycache__/
|
||||
52
lib/tyconfig.py
Normal file
52
lib/tyconfig.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
import os
|
||||
import configparser
|
||||
|
||||
class TyConfigManager:
|
||||
def __init__(self, config_path="settings.ini"):
|
||||
self.config_path = config_path
|
||||
self.config = configparser.ConfigParser()
|
||||
|
||||
self.defaults = {
|
||||
"Network": {
|
||||
"server_ip": "127.0.0.1",
|
||||
"server_port": "5555"
|
||||
},
|
||||
"Crypto": {
|
||||
"wallet_file": "tychat.wallet"
|
||||
},
|
||||
"UI": {
|
||||
"theme": "dark",
|
||||
"username": "Anonymous"
|
||||
}
|
||||
}
|
||||
self.load_config()
|
||||
|
||||
def load_config(self):
|
||||
if not os.path.exists(self.config_path):
|
||||
for section, options in self.defaults.items():
|
||||
self.config[section] = options
|
||||
self.save_config()
|
||||
else:
|
||||
self.config.read(self.config_path)
|
||||
|
||||
def save_config(self):
|
||||
with open(self.config_path, "w", encoding="utf-8") as f:
|
||||
self.config.write(f)
|
||||
|
||||
def get_wallet_status(self) -> tuple[bool, str]:
|
||||
wallet_path = self.config.get("Crypto", "wallet_file", fallback="tychat.wallet")
|
||||
return os.path.exists(wallet_path), wallet_path
|
||||
|
||||
def get_network_settings(self) -> tuple[str, int]:
|
||||
ip = self.config.get("Network", "server_ip", fallback="127.0.0.1")
|
||||
port = self.config.getint("Network", "server_port", fallback=5555)
|
||||
return ip, port
|
||||
|
||||
def get_value(self, section: str, option: str, fallback=None) -> str:
|
||||
return self.config.get(section, option, fallback=fallback)
|
||||
|
||||
def set_value(self, section: str, option: str, value: str):
|
||||
if not self.config.has_section(section):
|
||||
self.config.add_section(section)
|
||||
self.config.set(section, option, str(value))
|
||||
self.save_config()
|
||||
|
|
@ -12,8 +12,6 @@ class TyCryptoEngine:
|
|||
|
||||
self.id_pub = self.id_priv.public_key()
|
||||
self.id_pub_bytes = self.id_pub.public_bytes_raw()
|
||||
|
||||
# Наш неизменный ID (6 знаков), намертво привязанный к ключу
|
||||
self.my_id = int(hashlib.sha256(self.id_pub_bytes).hexdigest(), 16) % 1000000
|
||||
|
||||
self.ek_priv = None
|
||||
|
|
@ -23,6 +21,7 @@ class TyCryptoEngine:
|
|||
return self.id_priv.private_bytes_raw()
|
||||
|
||||
def make_handshake_packet(self, receiver_id: int) -> bytes:
|
||||
if not self.ek_priv:
|
||||
self.ek_priv = x25519.X25519PrivateKey.generate()
|
||||
ek_pub_bytes = self.ek_priv.public_key().public_bytes_raw()
|
||||
|
||||
|
|
@ -36,6 +35,9 @@ class TyCryptoEngine:
|
|||
return header + meta + payload
|
||||
|
||||
def parse_handshake_packet(self, sender_id: int, payload: bytes) -> bool:
|
||||
if self.aesgcm is not None:
|
||||
return True
|
||||
|
||||
if len(payload) < 128:
|
||||
return False
|
||||
try:
|
||||
|
|
|
|||
276
tui.py
Normal file
276
tui.py
Normal file
|
|
@ -0,0 +1,276 @@
|
|||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
from lib.tyconfig import TyConfigManager
|
||||
from lib.tycrypto import TyCryptoEngine
|
||||
from lib.nwman import TyNetworkManager
|
||||
|
||||
from prompt_toolkit.application import Application
|
||||
from prompt_toolkit.key_binding import KeyBindings
|
||||
from prompt_toolkit.layout.containers import HSplit, VSplit, Window
|
||||
from prompt_toolkit.layout.controls import FormattedTextControl
|
||||
from prompt_toolkit.layout.layout import Layout
|
||||
from prompt_toolkit.widgets import TextArea, Frame
|
||||
from prompt_toolkit.styles import Style
|
||||
from prompt_toolkit.application.current import get_app
|
||||
from prompt_toolkit.filters import has_focus
|
||||
|
||||
ASCII_ART = r"""
|
||||
_______ _____ _ _
|
||||
|__ __| / ____| | | |
|
||||
| |_ _| | | |__ __ _| |_
|
||||
| | | | | | | '_ \ / _` | __|
|
||||
| | |_| | |____| | | | (_| | |_
|
||||
|_|\__, |\_____|_| |_|\__,_|\__|
|
||||
__/ |
|
||||
|___/
|
||||
"""
|
||||
|
||||
class TyTUIClient:
|
||||
def __init__(self):
|
||||
self.cfg = TyConfigManager()
|
||||
wallet_exists, self.wallet_path = self.cfg.get_wallet_status()
|
||||
|
||||
if wallet_exists:
|
||||
with open(self.wallet_path, "rb") as f:
|
||||
wallet_bytes = f.read()
|
||||
self.crypto = TyCryptoEngine(wallet_bytes=wallet_bytes)
|
||||
else:
|
||||
self.crypto = TyCryptoEngine()
|
||||
with open(self.wallet_path, "wb") as f:
|
||||
f.write(self.crypto.export_wallet())
|
||||
|
||||
self.my_id = self.crypto.my_id
|
||||
self.contacts = {}
|
||||
self.history = {}
|
||||
self.active_chat = None
|
||||
self.selected_contact_idx = 0
|
||||
self.app = None
|
||||
self.loop_running = True
|
||||
self.net = TyNetworkManager(on_packet_received=self.handle_incoming_packet)
|
||||
|
||||
def start(self):
|
||||
print(ASCII_ART)
|
||||
print("=" * 60)
|
||||
print(f"Теперь ты UID {self.my_id}, теперь ты часть TyChat.")
|
||||
print(f"Профиль сохранен в: {self.wallet_path}")
|
||||
print("=" * 60)
|
||||
time.sleep(2)
|
||||
|
||||
server_ip, server_port = self.cfg.get_network_settings()
|
||||
if not self.net.connect(server_ip, server_port):
|
||||
sys.exit(1)
|
||||
|
||||
self.net.register_id(self.my_id)
|
||||
threading.Thread(target=self.status_checker_loop, daemon=True).start()
|
||||
|
||||
layout, bindings = self.make_layout()
|
||||
self.app = Application(
|
||||
layout=layout,
|
||||
key_bindings=bindings,
|
||||
style=ui_style,
|
||||
full_screen=True
|
||||
)
|
||||
self.app.run()
|
||||
|
||||
def handle_incoming_packet(self, header, iv, payload):
|
||||
try:
|
||||
packet_type, sender_id, receiver_id = TyCryptoEngine.parse_header(header)
|
||||
|
||||
if sender_id not in self.contacts:
|
||||
self.contacts[sender_id] = {"status": "online", "unread": 0}
|
||||
self.history[sender_id] = []
|
||||
|
||||
if packet_type == 0x01:
|
||||
session_existed = self.crypto.aesgcm is not None
|
||||
|
||||
if self.crypto.parse_handshake_packet(sender_id, payload):
|
||||
if not session_existed:
|
||||
self.add_to_history(sender_id, "[SYSTEM]: Канал связи защищен (E2E)!")
|
||||
reply_packet = self.crypto.make_handshake_packet(sender_id)
|
||||
self.net.send_packet(reply_packet)
|
||||
else:
|
||||
self.add_to_history(sender_id, "[SYSTEM]: Подтверждение канала получено.")
|
||||
|
||||
elif packet_type == 0x02:
|
||||
text = self.crypto.decrypt_message(header, iv, payload)
|
||||
if self.active_chat != sender_id:
|
||||
self.contacts[sender_id]["unread"] += 1
|
||||
self.add_to_history(sender_id, f"[{sender_id}]: {text}")
|
||||
|
||||
except Exception:
|
||||
if 'sender_id' in locals():
|
||||
self.add_to_history(sender_id, "[SYSTEM]: Ошибка дешифрации пакета")
|
||||
|
||||
def add_to_history(self, target_id, line):
|
||||
if target_id not in self.history:
|
||||
self.history[target_id] = []
|
||||
self.history[target_id].append(line)
|
||||
if self.app:
|
||||
self.app.invalidate()
|
||||
|
||||
def status_checker_loop(self):
|
||||
while self.loop_running:
|
||||
if self.net.is_running and self.contacts:
|
||||
try:
|
||||
self.net.sock.sendall(bytes([0x01, 0x04]) + self.my_id.to_bytes(3, 'big') + (0).to_bytes(3, 'big') + b'\x00'*16)
|
||||
except:
|
||||
pass
|
||||
time.sleep(10)
|
||||
|
||||
def make_layout(self):
|
||||
def get_sidebar_text():
|
||||
tokens = []
|
||||
for idx, (target_id, info) in enumerate(self.contacts.items()):
|
||||
status_str = " *" if info.get("status") == "online" else ""
|
||||
unread_cnt = info.get("unread", 0)
|
||||
unread_str = f" ({unread_cnt})" if unread_cnt > 0 else ""
|
||||
content = f" {target_id}{status_str}{unread_str}"
|
||||
|
||||
if self.active_chat == target_id:
|
||||
style = "class:contact-active"
|
||||
elif idx == self.selected_contact_idx and get_app().layout.has_focus(sidebar_window):
|
||||
style = "class:contact-focused"
|
||||
else:
|
||||
style = "class:contact"
|
||||
tokens.extend([(style, f"{content}\n")])
|
||||
return tokens
|
||||
|
||||
def get_main_text():
|
||||
if not self.active_chat:
|
||||
tokens = [("", "\n" * 2)]
|
||||
for line in ASCII_ART.split("\n"):
|
||||
tokens.append(("class:ascii", line + "\n"))
|
||||
tokens.extend([
|
||||
("", "\n"),
|
||||
("class:desc", f"TyChat E2E Client Core Engine\n"),
|
||||
("class:desc", f"ID: {self.my_id}\n"),
|
||||
("", "\n"),
|
||||
("class:help-tip", "Tab: переключение фокуса\n"),
|
||||
("class:help-tip", "Вверх/Вниз в панели контактов: выбор чата\n")
|
||||
])
|
||||
return tokens
|
||||
|
||||
tokens = []
|
||||
lines = self.history.get(self.active_chat, [])
|
||||
for line in lines:
|
||||
if line.startswith("[SYSTEM]:"):
|
||||
tokens.append(("class:system", line + "\n"))
|
||||
elif line.startswith("[You]:"):
|
||||
tokens.append(("class:preserved", line + "\n"))
|
||||
else:
|
||||
tokens.append(("", line + "\n"))
|
||||
return tokens
|
||||
|
||||
sidebar_control = FormattedTextControl(get_sidebar_text, focusable=True)
|
||||
sidebar_window = Frame(Window(content=sidebar_control, width=22), title="Контакты", style="class:border")
|
||||
|
||||
main_control = FormattedTextControl(get_main_text)
|
||||
def get_main_title():
|
||||
if self.active_chat:
|
||||
return f"TyChat | Собеседник: {self.active_chat}"
|
||||
return f"TyChat | Мой ID: {self.my_id}"
|
||||
|
||||
main_window = Frame(Window(content=main_control), title=get_main_title, style="class:border")
|
||||
|
||||
input_field = TextArea(height=3, prompt="> ", multiline=False, wrap_lines=True)
|
||||
input_window = Frame(input_field, title="Ввод сообщения (/add <id>, /exit)")
|
||||
|
||||
def accept_handler(buff):
|
||||
text = input_field.text.strip()
|
||||
if not text:
|
||||
return
|
||||
|
||||
if text.lower() == "/exit":
|
||||
self.loop_running = False
|
||||
self.net.disconnect()
|
||||
get_app().exit()
|
||||
return
|
||||
|
||||
if text.lower().startswith("/add "):
|
||||
try:
|
||||
new_id = int(text.split(" ", 1)[1].strip())
|
||||
if new_id not in self.contacts:
|
||||
self.contacts[new_id] = {"status": "offline", "unread": 0}
|
||||
self.history[new_id] = []
|
||||
handshake_packet = self.crypto.make_handshake_packet(new_id)
|
||||
self.net.send_packet(handshake_packet)
|
||||
self.add_to_history(new_id, "[SYSTEM]: Отправлен запрос защищенного канала...")
|
||||
except ValueError:
|
||||
pass
|
||||
input_field.text = ""
|
||||
return
|
||||
|
||||
if self.active_chat:
|
||||
try:
|
||||
if not self.crypto.aesgcm:
|
||||
handshake_packet = self.crypto.make_handshake_packet(self.active_chat)
|
||||
self.net.send_packet(handshake_packet)
|
||||
self.add_to_history(self.active_chat, "[SYSTEM]: Сессия не готова. Повторяем хэндшейк...")
|
||||
else:
|
||||
packet = self.crypto.encrypt_message(self.active_chat, text)
|
||||
self.net.send_packet(packet)
|
||||
self.add_to_history(self.active_chat, f"[You]: {text}")
|
||||
except Exception as e:
|
||||
self.add_to_history(self.active_chat, f"[SYSTEM]: Ошибка отправки: {e}")
|
||||
|
||||
input_field.text = ""
|
||||
|
||||
input_field.accept_handler = accept_handler
|
||||
|
||||
right_side = HSplit([main_window, input_window])
|
||||
root_container = VSplit([sidebar_window, right_side])
|
||||
|
||||
kb = KeyBindings()
|
||||
|
||||
@kb.add('tab')
|
||||
def _(event):
|
||||
if event.app.layout.has_focus(input_field):
|
||||
event.app.layout.focus(sidebar_window)
|
||||
else:
|
||||
event.app.layout.focus(input_field)
|
||||
|
||||
@kb.add('up', filter=has_focus(sidebar_window))
|
||||
def _(event):
|
||||
if self.contacts:
|
||||
self.selected_contact_idx = (self.selected_contact_idx - 1) % len(self.contacts)
|
||||
target_id = list(self.contacts.keys())[self.selected_contact_idx]
|
||||
self.active_chat = target_id
|
||||
self.contacts[target_id]["unread"] = 0
|
||||
if self.app: self.app.invalidate()
|
||||
|
||||
@kb.add('down', filter=has_focus(sidebar_window))
|
||||
def _(event):
|
||||
if self.contacts:
|
||||
self.selected_contact_idx = (self.selected_contact_idx + 1) % len(self.contacts)
|
||||
target_id = list(self.contacts.keys())[self.selected_contact_idx]
|
||||
self.active_chat = target_id
|
||||
self.contacts[target_id]["unread"] = 0
|
||||
if self.app: self.app.invalidate()
|
||||
|
||||
@kb.add('c-c')
|
||||
def _(event):
|
||||
self.loop_running = False
|
||||
self.net.disconnect()
|
||||
event.app.exit()
|
||||
|
||||
return Layout(root_container, focused_element=input_field), kb
|
||||
|
||||
ui_style = Style.from_dict({
|
||||
'contact': '#ffffff',
|
||||
'contact-focused': '#00aaaa bold',
|
||||
'contact-active': '#00ff00 bold',
|
||||
'ascii': '#00ff00 bold',
|
||||
'desc': '#00ff00',
|
||||
'help-tip': '#00ffff italic',
|
||||
'preserved': '#ffff00',
|
||||
'system': '#ffaa00 bold',
|
||||
'border': '#00ff00',
|
||||
'frame.border': '#00ff00',
|
||||
})
|
||||
|
||||
if __name__ == "__main__":
|
||||
tui_client = TyTUIClient()
|
||||
tui_client.start()
|
||||
Loading…
Reference in a new issue