This repository has been archived on 2026-05-21. You can view files and clone it, but cannot push or open issues or pull requests.
TyChat-TUI/TUI.py

683 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import subprocess
REQUIRED_PACKAGES = {
"python-socketio[client]": "socketio",
"websocket-client": "websocket",
"numpy": "numpy",
"sounddevice": "sounddevice",
"prompt_toolkit": "prompt_toolkit"
}
def auto_install_deps():
missing_packages = []
for pip_name, import_name in REQUIRED_PACKAGES.items():
try:
__import__(import_name)
except ImportError:
missing_packages.append(pip_name)
if not missing_packages:
return
cmd = [sys.executable, "-m", "pip", "install"] + missing_packages
result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
if result.returncode != 0:
stderr_output = result.stderr.decode('utf-8', errors='ignore')
if "externally-managed-environment" in stderr_output.lower():
force_cmd = cmd + ["--break-system-packages"]
force_result = subprocess.run(force_cmd)
if force_result.returncode == 0:
sys.exit(0)
sys.exit(1)
auto_install_deps()
import os
import json
import time
import base64
import re
import threading
import webbrowser
import numpy as np
import sounddevice as sd
from prompt_toolkit.application import Application
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.containers import HSplit, VSplit, Window, FloatContainer, Float, ConditionalContainer, WindowAlign
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.layout.layout import Layout
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.styles import Style
from prompt_toolkit.application.current import get_app
from prompt_toolkit.filters import has_focus, Condition
from prompt_toolkit.data_structures import Point
from prompt_toolkit.completion import Completer, Completion
# Import decoupled local modules
import protocol
import socket_manager
CONFIG_FILE = "settings.json"
WIKI_URL = "https://git.idkmail.ru/lohrrrr/TyChat-Client/wiki"
DEV_SIGNATURE = "For you, from IDKMail Dev Group"
ASCII_ART = r"""
_______ _____ _ _
|__ __| / ____| | | |
| |_ _| | | |__ __ _| |_
| | | | | | | '_ \ / _` | __|
| | |_| | |____| | | | (_| | |_
|_|\__, |\_____|_| |_|\__,_|\__|
__/ |
|___/
"""
class CommandCompleter(Completer):
def __init__(self):
self.completions = {
"/add": "Добавить контакт (/add <uin>)",
"/busy": "Режим DND (Не беспокоить)",
"/query": "Мягкий запрос статуса (Уважает DND)",
"/alert": "Экстренный вызов (Пробивает DND)",
"/room create ": "Создать комнату (/room create \"имя\")",
"/room join ": "Войти в комнату (/room join UIN:\"имя\")",
"/exit": "Выход из мессенджера",
"/help": "Открыть Wiki проекта"
}
def get_completions(self, document, complete_event):
text = document.text
if text.startswith('/'):
for cmd, desc in self.completions.items():
if cmd.startswith(text):
yield Completion(cmd, start_position=-len(text), display_meta=desc)
class TyClient:
def __init__(self):
self.server_url = ""
self.username = ""
self.uin = ""
self.password = ""
self.contacts = {}
self.groups = {}
self.history = {}
self.preserved = {}
self.blocklist = []
self.active_chat = None
self.app = None
self.loop_running = True
self.selected_contact_idx = 0
self.is_busy = False
self.dialing_uin = None
self.pending_requests = []
self.pending_windows = []
self.show_welcome_popup = False
def load_config(self):
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
cfg = json.load(f)
self.server_url = cfg.get("server_url", "")
self.username = cfg.get("username", "")
self.uin = cfg.get("uin", "")
self.password = cfg.get("password", "")
self.contacts = cfg.get("contacts", {})
self.groups = cfg.get("groups", {})
self.history = cfg.get("history", {})
self.preserved = cfg.get("preserved", {})
self.blocklist = cfg.get("blocklist", [])
return True
except:
return False
return False
def save_config(self):
cfg = {
"server_url": self.server_url,
"username": self.username,
"uin": self.uin,
"password": self.password,
"contacts": self.contacts,
"groups": self.groups,
"history": self.history,
"preserved": self.preserved,
"blocklist": self.blocklist
}
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
def play_dial_tones(self, target_uin):
self.dialing_uin = target_uin
while self.dialing_uin == target_uin and self.loop_running:
t = np.linspace(0, 1.0, int(protocol.SAMPLE_RATE * 1.0), False)
tone = np.sin(425 * t * 2 * np.pi) * 0.5
sd.play(tone, protocol.SAMPLE_RATE)
sd.wait()
for _ in range(30):
if self.dialing_uin != target_uin or not self.loop_running:
break
time.sleep(0.1)
def play_and_decode(self, audio_bytes, sender_uin):
audio_data = np.frombuffer(audio_bytes, dtype=np.float32)
sd.play(audio_data, protocol.SAMPLE_RATE)
time.sleep(0.3)
samples_per_tone = int(protocol.SAMPLE_RATE * protocol.TONE_DURATION)
current_sample = int(protocol.SAMPLE_RATE * 0.3)
sender_name = self.username if sender_uin == self.uin else f"UIN {sender_uin}"
msg_buffer = f"[{sender_name}]: "
if sender_uin not in self.history:
self.history[sender_uin] = []
self.history[sender_uin].append(msg_buffer)
self.refresh_ui()
while current_sample < len(audio_data) and self.loop_running:
chunk = audio_data[current_sample : current_sample + samples_per_tone]
if len(chunk) < samples_per_tone:
break
window_data = np.hanning(len(chunk))
fft_data = np.abs(np.fft.rfft(chunk * window_data))
frequencies = np.fft.rfftfreq(len(chunk), d=1/protocol.SAMPLE_RATE)
detected_freq = frequencies[np.argmax(fft_data)]
ascii_code = int(round((detected_freq - protocol.BASE_FREQ) / protocol.FREQ_STEP))
if 0 <= ascii_code < 65535:
try:
char = chr(ascii_code)
self.history[sender_uin][-1] += char
self.refresh_ui()
except:
pass
time.sleep(protocol.TONE_DURATION)
current_sample += samples_per_tone
sd.wait()
self.save_config()
def add_to_history(self, target_uin, line):
if target_uin not in self.history:
self.history[target_uin] = []
self.history[target_uin].append(line)
self.refresh_ui()
self.save_config()
def send_sys_packet(self, to_uin, cmd):
audio_b = protocol.text_to_audio(f"SYS:{cmd}")
socket_manager.sio.emit("relay_packet", {"to_uin": to_uin, "payload": base64.b64encode(audio_b).decode('utf-8')})
def send_service_tone(self, to_uin, freq):
audio_b = protocol.generate_service_tone(freq)
socket_manager.sio.emit("relay_packet", {"to_uin": to_uin, "payload": base64.b64encode(audio_b).decode('utf-8')})
def refresh_ui(self):
if self.app:
self.app.invalidate()
client = TyClient()
def status_checker_thread():
while client.loop_running:
if socket_manager.sio.connected and client.contacts:
socket_manager.sio.emit("check_online_statuses", list(client.contacts.keys()))
for uin in list(client.contacts.keys()):
if client.contacts[uin]["status"] == "online" and client.preserved.get(uin):
while client.preserved[uin]:
p_text = client.preserved[uin].pop(0)
audio_b = protocol.text_to_audio(p_text)
p_b64 = base64.b64encode(audio_b).decode('utf-8')
socket_manager.sio.emit("relay_packet", {"to_uin": uin, "payload": p_b64})
client.add_to_history(uin, f"[You]: {p_text}")
client.save_config()
time.sleep(10)
def make_layout():
def get_sidebar_text():
tokens = []
for idx, (uin, info) in enumerate(client.contacts.items()):
status_str = ""
if info.get("attention"): status_str = " [!]"
elif info.get("unread", 0) > 0: status_str = f" ({info['unread']})"
elif info.get("status") == "online": status_str = " *"
content = f" {uin}{status_str}"
if client.active_chat == uin: style = "class:contact-active"
elif idx == client.selected_contact_idx and get_app().layout.has_focus(sidebar_window): style = "class:contact-focused"
else: style = "class:contact"
tokens.extend([(style, f"{content}\n")])
for g_id, g_info in client.groups.items():
content = f" [G] {g_info['title']}"
if client.active_chat == g_id: style = "class:contact-active"
else: style = "class:contact"
tokens.extend([(style, f"{content}\n")])
return tokens
def get_main_text():
if not client.active_chat:
tokens = [("", "\n" * 2)]
for line in ASCII_ART.split("\n"):
tokens.append(("class:ascii", line + "\n"))
tokens.extend([
("", "\n"),
("class:desc", "Messenger on custom protocol named\n"),
("class:desc", "AcoustiOverSocket inspired by rtty\n"),
("", "\n"),
("class:help-tip", "Type / to trigger interactive command menu\n"),
("", "\n" * 2),
("class:help-tip", f"{DEV_SIGNATURE}\n")
])
if client.dialing_uin:
tokens.extend([
("", "\n" * 2),
("class:system", f"Dialing UIN {client.dialing_uin}... Waiting for accept.\n")
])
return tokens
tokens = []
lines = client.history.get(client.active_chat, [])
for line in lines:
if line.startswith("[You]:") and "(Preserved)" in line:
tokens.append(("class:preserved", line + "\n"))
elif line.startswith("[SYSTEM]:"):
tokens.append(("class:system", line + "\n"))
else:
tokens.append(("", line + "\n"))
return tokens
def get_cursor_pos():
text = "".join(t[1] for t in get_main_text())
newlines = text.count('\n')
return Point(0, max(0, newlines - 1))
sidebar_control = FormattedTextControl(get_sidebar_text, focusable=True)
sidebar_window = Frame(Window(content=sidebar_control, width=25), title="chats", style="class:border")
main_control = FormattedTextControl(get_main_text, get_cursor_position=get_cursor_pos)
def get_main_title():
base_title = f"TyChat | You: {client.username} ({client.uin})"
if client.is_busy: base_title += " [DND/BUSY]"
if client.active_chat: return f"{base_title} | Chat: {client.active_chat}"
return base_title
main_window = Frame(Window(content=main_control, wrap_lines=True), title=get_main_title, style="class:border")
input_field = TextArea(
height=3,
prompt="> ",
multiline=False,
wrap_lines=True,
completer=CommandCompleter(),
complete_while_typing=True
)
input_window = Frame(input_field, title="Input")
def accept_handler(buff):
text = input_field.text.strip()
if not text: return
if text.lower() == "/exit":
client.loop_running = False
socket_manager.sio.disconnect()
get_app().exit()
return
if text.lower() == "/help":
try: webbrowser.open(WIKI_URL)
except: pass
input_field.text = ""
return
if text.lower() == "/busy":
client.is_busy = not client.is_busy
if client.active_chat:
status_msg = "enabled" if client.is_busy else "disabled"
client.add_to_history(client.active_chat, f"[SYSTEM]: DND Mode {status_msg}.")
input_field.text = ""
client.refresh_ui()
return
if text.lower().startswith("/add "):
new_uin = text.split(" ", 1)[1].strip()
if not new_uin: return
if new_uin in client.contacts:
client.active_chat = new_uin
input_field.text = ""
client.refresh_ui()
return
client.active_chat = None
threading.Thread(target=client.play_dial_tones, args=(new_uin,), daemon=True).start()
client.send_sys_packet(new_uin, "REQ_ADD")
input_field.text = ""
client.refresh_ui()
return
if text.lower().startswith("/room create "):
match = re.match(r'^/room create "([^"]+)"$', text, re.IGNORECASE)
if match:
r_name = match.group(1)
r_id = f"ROOM:{client.uin}:{r_name}"
client.groups[r_id] = {"title": r_name, "owner": client.uin, "members": [client.uin]}
client.history[r_id] = [f"[SYSTEM]: Room \"{r_name}\" created successfully. Local GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.refresh_ui()
input_field.text = ""
return
if text.lower().startswith("/room join "):
match = re.match(r'^/room join ([0-9]+):"([^"]+)"$', text, re.IGNORECASE)
if match:
r_owner = match.group(1)
r_name = match.group(2)
r_id = f"ROOM:{r_owner}:{r_name}"
if r_owner not in client.contacts and r_owner != client.uin:
client.contacts[r_owner] = {"status": "offline", "unread": 0, "attention": False}
client.groups[r_id] = {"title": r_name, "owner": r_owner, "members": list(set([client.uin, r_owner]))}
client.history[r_id] = [f"[SYSTEM]: Joined room \"{r_name}\". Local GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.refresh_ui()
client.send_sys_packet(r_owner, f"REQ_JOIN_ROOM:{r_name}")
input_field.text = ""
return
if client.active_chat:
if client.active_chat.startswith("ROOM:"):
group = client.groups[client.active_chat]
owner_uin = group["owner"]
if owner_uin != client.uin and client.contacts.get(owner_uin, {}).get("status") != "online":
client.add_to_history(client.active_chat, "[SYSTEM]: CANNOT SEND. Owner of this room is offline for sync!")
input_field.text = ""
return
audio_b = protocol.text_to_audio(f"SYS:ROOM_MSG:{client.active_chat}:{client.uin}:{text}")
p_b64 = base64.b64encode(audio_b).decode('utf-8')
if owner_uin == client.uin:
for m in group["members"]:
if m != client.uin and client.contacts.get(m, {}).get("status") == "online":
socket_manager.sio.emit("relay_packet", {"to_uin": m, "payload": p_b64})
else:
socket_manager.sio.emit("relay_packet", {"to_uin": owner_uin, "payload": p_b64})
client.add_to_history(client.active_chat, f"[You]: {text}")
input_field.text = ""
return
if text.lower() == "/query":
client.send_service_tone(client.active_chat, protocol.FREQ_QUERY)
client.add_to_history(client.active_chat, "[SYSTEM]: Sent friendly status query...")
input_field.text = ""
return
if text.lower() == "/alert":
client.send_service_tone(client.active_chat, protocol.FREQ_ALERT)
client.add_to_history(client.active_chat, "[SYSTEM]: Sent URGENT alert signal (breaks DND)...")
input_field.text = ""
return
if len(text) > 300:
client.add_to_history(client.active_chat, f"[SYSTEM]: Message too long!")
input_field.text = ""
return
audio_b = protocol.text_to_audio(text)
p_b64 = base64.b64encode(audio_b).decode('utf-8')
if client.contacts[client.active_chat]["status"] == "online":
socket_manager.sio.emit("relay_packet", {"to_uin": client.active_chat, "payload": p_b64})
client.add_to_history(client.active_chat, f"[You]: {text}")
else:
if client.active_chat not in client.preserved: client.preserved[client.active_chat] = []
client.preserved[client.active_chat].append(text)
client.add_to_history(client.active_chat, f"[You]: {text} (Preserved)")
client.save_config()
input_field.text = ""
input_field.accept_handler = accept_handler
right_side = HSplit([main_window, input_window])
def get_popup_text():
if not client.pending_requests: return []
req_uin = client.pending_requests[0]
return [
("class:popup-title", f" Incoming Handshake Request \n"),
("class:popup-title", f" UIN: {req_uin} \n\n"),
("", " Do you want to add them to contacts?\n\n"),
("class:popup-keys", " [Y]es [N]o N[e]ver ")
]
popup_window = Frame(Window(FormattedTextControl(get_popup_text), align=WindowAlign.CENTER, width=42, height=6), style="class:popup-border")
def get_signal_popup_text():
if not client.pending_windows: return []
win_info = client.pending_windows[0]
w_type = win_info["type"].upper()
w_uin = win_info["uin"]
title_style = "class:alert-title" if w_type == "ALERT" else "class:query-title"
desc = "CRITICAL BREAK-IN! Are you busy?" if w_type == "ALERT" else "Friendly status query: Are you busy?"
return [
(title_style, f" *** INCOMING {w_type} SIGNAL *** \n"),
("class:popup-title", f" From UIN: {w_uin} \n\n"),
("", f" {desc}\n\n"),
("class:popup-keys", " [Y]es, I'm busy [N]o, go on ")
]
signal_popup_window = Frame(Window(FormattedTextControl(get_signal_popup_text), align=WindowAlign.CENTER, width=50, height=6), style="class:border")
def get_welcome_popup_text():
return [
("class:query-title", " *** ДОБРО ПОЖАЛОВАТЬ В TYCHAT! *** \n\n"),
("", " Привет! Вы только что зарегистрировались.\n"),
("", " Хотите автоматически присоединиться\n"),
("", " к всеобщей группе?\n\n"),
("class:popup-keys", " [Y]Да, зайти в Dnishe [N]Нет, я сам ")
]
welcome_popup_window = Frame(Window(FormattedTextControl(get_welcome_popup_text), align=WindowAlign.CENTER, width=48, height=7), style="class:border")
@Condition
def has_pending_request(): return len(client.pending_requests) > 0 and not client.show_welcome_popup
@Condition
def has_pending_window(): return len(client.pending_windows) > 0 and len(client.pending_requests) == 0 and not client.show_welcome_popup
@Condition
def has_welcome_popup(): return client.show_welcome_popup
root_container = FloatContainer(
content=VSplit([sidebar_window, right_side]),
floats=[
Float(content=ConditionalContainer(content=popup_window, filter=has_pending_request), transparent=False),
Float(content=ConditionalContainer(content=signal_popup_window, filter=has_pending_window), transparent=False),
Float(content=ConditionalContainer(content=welcome_popup_window, filter=has_welcome_popup), transparent=False)
]
)
kb = KeyBindings()
@kb.add('tab', filter=~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
if event.app.layout.has_focus(input_field): event.app.layout.focus(sidebar_window)
else: event.app.layout.focus(input_field)
@kb.add('up', filter=has_focus(sidebar_window) & ~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
all_chats = list(client.contacts.keys()) + list(client.groups.keys())
if all_chats:
client.selected_contact_idx = (client.selected_contact_idx - 1) % len(all_chats)
target = all_chats[client.selected_contact_idx]
client.active_chat = target
if target in client.contacts:
client.contacts[target]["unread"] = 0
client.contacts[target]["attention"] = False
client.refresh_ui()
@kb.add('down', filter=has_focus(sidebar_window) & ~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
all_chats = list(client.contacts.keys()) + list(client.groups.keys())
if all_chats:
client.selected_contact_idx = (client.selected_contact_idx + 1) % len(all_chats)
target = all_chats[client.selected_contact_idx]
client.active_chat = target
if target in client.contacts:
client.contacts[target]["unread"] = 0
client.contacts[target]["attention"] = False
client.refresh_ui()
@kb.add('y', filter=has_welcome_popup)
@kb.add('Y', filter=has_welcome_popup)
def _(event):
client.show_welcome_popup = False
r_owner = "716041"
r_name = "Dnishe"
r_id = f"ROOM:{r_owner}:{r_name}"
client.contacts[r_owner] = {"status": "offline", "unread": 0, "attention": False}
client.groups[r_id] = {"title": r_name, "owner": r_owner, "members": [client.uin, r_owner]}
client.history[r_id] = [f"[SYSTEM]: Автоподключение! Комната \"{r_name}\". GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.send_sys_packet(r_owner, f"REQ_JOIN_ROOM:{r_name}")
client.refresh_ui()
@kb.add('n', filter=has_welcome_popup)
@kb.add('N', filter=has_welcome_popup)
def _(event):
client.show_welcome_popup = False
client.refresh_ui()
@kb.add('y', filter=has_pending_window)
@kb.add('Y', filter=has_pending_window)
def _(event):
win_info = client.pending_windows.pop(0)
target_uin = win_info["uin"]
client.is_busy = True
client.send_service_tone(target_uin, protocol.FREQ_RESP_YES)
client.add_to_history(target_uin, "[You -> SYSTEM]: Sent Dial Tone -> Yes, I'm busy.")
client.refresh_ui()
@kb.add('n', filter=has_pending_window)
@kb.add('N', filter=has_pending_window)
def _(event):
win_info = client.pending_windows.pop(0)
target_uin = win_info["uin"]
client.send_service_tone(target_uin, protocol.FREQ_RESP_NO)
client.add_to_history(target_uin, "[You -> SYSTEM]: Sent Dial Tone -> No, go on.")
client.refresh_ui()
@kb.add('y', filter=has_pending_request)
@kb.add('Y', filter=has_pending_request)
def _(event):
req_uin = client.pending_requests.pop(0)
client.contacts[req_uin] = {"status": "online", "unread": 0, "attention": False}
client.history[req_uin] = ["[SYSTEM]: Handshake accepted. Contact added."]
client.active_chat = req_uin
client.save_config()
client.send_sys_packet(req_uin, "RES_ACC")
client.refresh_ui()
@kb.add('n', filter=has_pending_request)
@kb.add('N', filter=has_pending_request)
def _(event):
req_uin = client.pending_requests.pop(0)
client.send_sys_packet(req_uin, "RES_DEC")
client.refresh_ui()
@kb.add('c-c')
def _(event):
client.loop_running = False
socket_manager.sio.disconnect()
event.app.exit()
return Layout(root_container, focused_element=input_field), kb
ui_style = Style.from_dict({
'contact': '#ffffff',
'contact-focused': '#00aaaa bold',
'contact-active': '#00ff00 bold',
'ascii': '#00ff00 bold',
'desc': '#00ff00',
'help-tip': '#00ffff italic',
'preserved': '#ffff00',
'system': '#ff0000 bold',
'border': '#00ff00',
'frame.border': '#00ff00',
'popup-title': '#ffffff bold',
'popup-keys': '#00ff00 bold',
'popup-border': '#ff0000 bold',
'query-title': '#00ffff bold',
'alert-title': '#ff0000 bold',
'completion-menu.completion': 'bg:#00ffff #000000',
'completion-menu.completion.current': 'bg:#00aa00 #ffffff bold'
})
def main():
if os.name == 'nt':
import ctypes
try:
kernel32 = ctypes.windll.kernel32
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
except: pass
is_new_registration = False
# Initialize the cross-module link so network callbacks can manipulate local state
socket_manager.init_network(client)
if not client.load_config():
print("!" * 60)
print("WARNING: All data is local.")
print("!" * 60 + "\n")
client.server_url = input("Enter server URL: ").strip()
if not client.server_url.startswith("http://") and not client.server_url.startswith("https://"):
client.server_url = "http://" + client.server_url
try: socket_manager.sio.connect(client.server_url, transports=['websocket'])
except Exception as e: return
print("\n1. Register\n2. Login")
mode = input("> ")
username_or_uin = input("UIN/Username: ").strip()
password = input("Password: ").strip()
event_wait = threading.Event()
@socket_manager.sio.event
def register_response(data):
nonlocal is_new_registration
if data["status"] == "success":
client.uin = data['uin']
client.username = username_or_uin
client.password = password
is_new_registration = True
event_wait.set()
@socket_manager.sio.event
def login_response(data):
if data["status"] == "success":
client.uin = username_or_uin
client.username = data["username"]
client.password = password
event_wait.set()
if mode == "1": socket_manager.sio.emit("register", {"username": username_or_uin, "password": password})
else: socket_manager.sio.emit("login", {"uin": username_or_uin, "password": password})
event_wait.wait()
if not client.uin: return
client.save_config()
input("Press Enter to open TUI...")
else:
try:
socket_manager.sio.connect(client.server_url, transports=['websocket'])
socket_manager.sio.emit("login", {"uin": client.uin, "password": client.password})
except: pass
if is_new_registration:
client.show_welcome_popup = True
threading.Thread(target=status_checker_thread, daemon=True).start()
layout, bindings = make_layout()
client.app = Application(
layout=layout,
key_bindings=bindings,
style=ui_style,
full_screen=True,
enable_page_navigation_bindings=True
)
client.app.run()
if __name__ == "__main__":
main()