This repository has been archived on 2026-05-21. You can view files and clone it, but cannot push or open issues or pull requests.
TyChat-TUI/TUI.py

961 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import subprocess
import os
import json
import time
import base64
import re
import threading
import webbrowser
import importlib
import numpy as np
import sounddevice as sd
from prompt_toolkit.application import Application
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.containers import HSplit, VSplit, Window, FloatContainer, Float, ConditionalContainer, WindowAlign
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.layout.layout import Layout
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.styles import Style
from prompt_toolkit.application.current import get_app
from prompt_toolkit.filters import has_focus, Condition
from prompt_toolkit.data_structures import Point
from prompt_toolkit.completion import Completer, Completion
REQUIRED_PACKAGES = {
"python-socketio[client]": "socketio",
"websocket-client": "websocket",
"numpy": "numpy",
"sounddevice": "sounddevice",
"prompt_toolkit": "prompt_toolkit"
}
def auto_install_deps():
missing_packages = []
for pip_name, import_name in REQUIRED_PACKAGES.items():
try:
__import__(import_name)
except ImportError:
missing_packages.append(pip_name)
if not missing_packages:
return
cmd = [sys.executable, "-m", "pip", "install"] + missing_packages
result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
if result.returncode != 0:
stderr_output = result.stderr.decode('utf-8', errors='ignore')
if "externally-managed-environment" in stderr_output.lower():
force_cmd = cmd + ["--break-system-packages"]
force_result = subprocess.run(force_cmd)
if force_result.returncode == 0:
sys.exit(0)
sys.exit(1)
auto_install_deps()
import socket_manager
CONFIG_FILE = "settings.json"
WIKI_URL = "https://git.idkmail.ru/lohrrrr/TyChat-Client/wiki"
DEV_SIGNATURE = "For you, from IDKMail Dev Group"
ASCII_ART = r"""
_______ _____ _ _
|__ __| / ____| | | |
| |_ _| | | |__ __ _| |_
| | | | | | | '_ \ / _` | __|
| | |_| | |____| | | | (_| | |_
|_|\__, |\_____|_| |_|\__,_|\__|
__/ |
|___/
"""
class CommandCompleter(Completer):
def __init__(self):
self.completions = {
"/add": "Добавить контакт (/add <uin>)",
"/busy": "Режим DND (Не беспокоить)",
"/query": "Мягкий запрос статуса (Уважает DND)",
"/alert": "Экстренный вызов (Пробивает DND)",
"/room create ": "Создать комнату (/room create \"имя\")",
"/room join ": "Войти в комнату (/room join UIN:\"имя\")",
"/exit": "Выход из мессенджера",
"/help": "Открыть Wiki проекта"
}
def get_completions(self, document, complete_event):
text = document.text
if text.startswith('/'):
for cmd, desc in self.completions.items():
if cmd.startswith(text):
yield Completion(cmd, start_position=-len(text), display_meta=desc)
class PluginManager:
def __init__(self, client_instance):
self.client = client_instance
self.protocols = {}
self.transports = {}
self.load_plugins()
def load_plugins(self):
for folder, collection, base_mod in [('protocols', self.protocols, 'protocols'), ('transports', self.transports, 'transports')]:
if not os.path.exists(folder):
os.makedirs(folder)
with open(os.path.join(folder, '__init__.py'), 'w') as f: pass
for f in os.listdir(folder):
if f.endswith('.py') and f != '__init__.py':
mod_name = f[:-3]
try:
mod = importlib.import_module(f"{base_mod}.{mod_name}")
if hasattr(mod, 'Plugin'):
instance = mod.Plugin()
collection[instance.name] = {"instance": instance, "enabled": True, "failed": False}
except:
pass
def safe_encode(self, proto_name, text):
proto = self.protocols.get(proto_name)
if not proto or not proto["enabled"] or proto["failed"]:
return None
try:
return proto["instance"].encode(text)
except:
proto["failed"] = True
proto["enabled"] = False
self.client.add_to_history(self.client.active_chat or "SYSTEM", f"[SYSTEM]: Protocol '{proto_name}' crashed and was disabled.")
return None
def safe_decode(self, proto_name, data):
proto = self.protocols.get(proto_name)
if not proto or not proto["enabled"] or proto["failed"]:
return None
try:
return proto["instance"].decode(data)
except:
proto["failed"] = True
proto["enabled"] = False
self.client.add_to_history(self.client.active_chat or "SYSTEM", f"[SYSTEM]: Protocol '{proto_name}' crashed and was disabled.")
return None
def safe_generate_service(self, proto_name, sig_type):
proto = self.protocols.get(proto_name)
if not proto or not proto["enabled"] or proto["failed"]:
return None
try:
return proto["instance"].generate_service_signal(sig_type)
except:
proto["failed"] = True
proto["enabled"] = False
return None
class TyClient:
def __init__(self):
self.server_url = ""
self.username = ""
self.uin = ""
self.password = ""
self.contacts = {}
self.groups = {}
self.history = {}
self.preserved = {}
self.blocklist = []
self.active_chat = None
self.app = None
self.loop_running = True
self.selected_contact_idx = 0
self.is_busy = False
self.dialing_uin = None
self.pending_requests = []
self.pending_windows = []
self.show_welcome_popup = False
self.current_tab = "chat"
self.settings_cursor = 0
self.settings_section = "protocols"
self.primary_protocol = ""
self.primary_transport = ""
self.peer_session_protocols = {}
self.plugin_manager = PluginManager(self)
def load_config(self):
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
cfg = json.load(f)
self.server_url = cfg.get("server_url", "")
self.username = cfg.get("username", "")
self.uin = cfg.get("uin", "")
self.password = cfg.get("password", "")
self.contacts = cfg.get("contacts", {})
self.groups = cfg.get("groups", {})
self.history = cfg.get("history", {})
self.preserved = cfg.get("preserved", {})
self.blocklist = cfg.get("blocklist", [])
self.primary_protocol = cfg.get("primary_protocol", "")
self.primary_transport = cfg.get("primary_transport", "")
saved_protos = cfg.get("enabled_protocols", {})
for p_name, state in saved_protos.items():
if p_name in self.plugin_manager.protocols:
self.plugin_manager.protocols[p_name]["enabled"] = state
saved_trans = cfg.get("enabled_transports", {})
for t_name, state in saved_trans.items():
if t_name in self.plugin_manager.transports:
self.plugin_manager.transports[t_name]["enabled"] = state
if not self.primary_protocol and self.plugin_manager.protocols:
self.primary_protocol = list(self.plugin_manager.protocols.keys())[0]
if not self.primary_transport and self.plugin_manager.transports:
self.primary_transport = list(self.plugin_manager.transports.keys())[0]
return True
except:
return False
return False
def save_config(self):
enabled_protos = {k: v["enabled"] for k, v in self.plugin_manager.protocols.items()}
enabled_trans = {k: v["enabled"] for k, v in self.plugin_manager.transports.items()}
cfg = {
"server_url": self.server_url,
"username": self.username,
"uin": self.uin,
"password": self.password,
"contacts": self.contacts,
"groups": self.groups,
"history": self.history,
"preserved": self.preserved,
"blocklist": self.blocklist,
"primary_protocol": self.primary_protocol,
"primary_transport": self.primary_transport,
"enabled_protocols": enabled_protos,
"enabled_transports": enabled_trans
}
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
def play_dial_tones(self, target_uin):
self.dialing_uin = target_uin
while self.dialing_uin == target_uin and self.loop_running:
t = np.linspace(0, 1.0, int(44100 * 1.0), False)
tone = np.sin(425 * t * 2 * np.pi) * 0.5
sd.play(tone, 44100)
sd.wait()
for _ in range(30):
if self.dialing_uin != target_uin or not self.loop_running:
break
time.sleep(0.1)
def play_and_decode(self, audio_bytes, sender_uin):
audio_data = np.frombuffer(audio_bytes, dtype=np.float32)
sd.play(audio_data, 44100)
time.sleep(0.3)
samples_per_tone = int(44100 * 0.1)
current_sample = int(44100 * 0.3)
sender_name = self.username if sender_uin == self.uin else f"UIN {sender_uin}"
msg_buffer = f"[{sender_name}]: "
if sender_uin not in self.history:
self.history[sender_uin] = []
self.history[sender_uin].append(msg_buffer)
self.refresh_ui()
while current_sample < len(audio_data) and self.loop_running:
chunk = audio_data[current_sample : current_sample + samples_per_tone]
if len(chunk) < samples_per_tone:
break
window_data = np.hanning(len(chunk))
fft_data = np.abs(np.fft.rfft(chunk * window_data))
frequencies = np.fft.rfftfreq(len(chunk), d=1/44100)
detected_freq = frequencies[np.argmax(fft_data)]
ascii_code = int(round((detected_freq - 600) / 25))
if 0 <= ascii_code < 65535:
try:
char = chr(ascii_code)
self.history[sender_uin][-1] += char
self.refresh_ui()
except:
pass
time.sleep(0.1)
current_sample += samples_per_tone
sd.wait()
self.save_config()
def add_to_history(self, target_uin, line):
if target_uin not in self.history:
self.history[target_uin] = []
self.history[target_uin].append(line)
self.refresh_ui()
self.save_config()
def send_sys_packet(self, to_uin, cmd):
text_payload = f"SYS:{cmd}"
proto_order = []
if self.primary_protocol:
proto_order.append(self.primary_protocol)
for p in self.plugin_manager.protocols:
if p not in proto_order:
proto_order.append(p)
for p_name in proto_order:
encoded = self.plugin_manager.safe_encode(p_name, text_payload)
if encoded is not None:
wrapped = f"NATIVE:{p_name}:".encode('utf-8') + encoded
socket_manager.sio.emit("relay_packet", {"to_uin": to_uin, "payload": base64.b64encode(wrapped).decode('utf-8')})
return
def send_service_tone(self, to_uin, freq_or_type):
sig_map = {1200: "QUERY", 1400: "ALERT", 1600: "RESP_YES", 1800: "RESP_NO"}
sig_type = sig_map.get(freq_or_type, str(freq_or_type))
proto_order = []
if self.primary_protocol:
proto_order.append(self.primary_protocol)
for p in self.plugin_manager.protocols:
if p not in proto_order:
proto_order.append(p)
for p_name in proto_order:
encoded = self.plugin_manager.safe_generate_service(p_name, sig_type)
if encoded is not None:
wrapped = f"SERVICE:{p_name}:".encode('utf-8') + encoded
socket_manager.sio.emit("relay_packet", {"to_uin": to_uin, "payload": base64.b64encode(wrapped).decode('utf-8')})
return
def refresh_ui(self):
if self.app:
self.app.invalidate()
client = TyClient()
def status_checker_thread():
while client.loop_running:
if socket_manager.sio.connected and client.contacts:
socket_manager.sio.emit("check_online_statuses", list(client.contacts.keys()))
for uin in list(client.contacts.keys()):
if client.contacts[uin]["status"] == "online" and client.preserved.get(uin):
while client.preserved[uin]:
p_text = client.preserved[uin].pop(0)
proto_to_use = client.peer_session_protocols.get(uin, client.primary_protocol)
encoded = client.plugin_manager.safe_encode(proto_to_use, p_text)
if encoded is None:
proto_order = [p for p in client.plugin_manager.protocols if p != proto_to_use]
for p_name in proto_order:
encoded = client.plugin_manager.safe_encode(p_name, p_text)
if encoded is not None:
proto_to_use = p_name
client.peer_session_protocols[uin] = p_name
break
if encoded is not None:
wrapped = f"NATIVE:{proto_to_use}:".encode('utf-8') + encoded
p_b64 = base64.b64encode(wrapped).decode('utf-8')
socket_manager.sio.emit("relay_packet", {"to_uin": uin, "payload": p_b64})
client.add_to_history(uin, f"[You]: {p_text}")
else:
client.preserved[uin].insert(0, p_text)
client.add_to_history(uin, "[SYSTEM]: Failed to negotiate protocol. Stalling output.")
break
client.save_config()
time.sleep(10)
def make_layout():
def get_sidebar_text():
tokens = []
for idx, (uin, info) in enumerate(client.contacts.items()):
status_str = ""
if info.get("attention"): status_str = " [!]"
elif info.get("unread", 0) > 0: status_str = f" ({info['unread']})"
elif info.get("status") == "online": status_str = " *"
content = f" {uin}{status_str}"
if client.active_chat == uin and client.current_tab == "chat": style = "class:contact-active"
elif idx == client.selected_contact_idx and get_app().layout.has_focus(sidebar_window) and client.current_tab == "chat": style = "class:contact-focused"
else: style = "class:contact"
tokens.extend([(style, f"{content}\n")])
for g_id, g_info in client.groups.items():
content = f" [G] {g_info['title']}"
if client.active_chat == g_id and client.current_tab == "chat": style = "class:contact-active"
else: style = "class:contact"
tokens.extend([(style, f"{content}\n")])
return tokens
def get_main_text():
if not client.active_chat:
tokens = [("", "\n" * 2)]
for line in ASCII_ART.split("\n"):
tokens.append(("class:ascii", line + "\n"))
tokens.extend([
("", "\n"),
("class:desc", "Messenger on custom protocol named\n"),
("class:desc", "AcoustiOverSocket inspired by rtty\n"),
("", "\n"),
("class:help-tip", "Type / to trigger interactive command menu | F2: Settings Panel\n"),
("", "\n" * 2),
("class:help-tip", f"{DEV_SIGNATURE}\n")
])
if client.dialing_uin:
tokens.extend([
("", "\n" * 2),
("class:system", f"Dialing UIN {client.dialing_uin}... Waiting for accept.\n")
])
return tokens
tokens = []
lines = client.history.get(client.active_chat, [])
for line in lines:
if line.startswith("[You]:") and "(Preserved)" in line:
tokens.append(("class:preserved", line + "\n"))
elif line.startswith("[SYSTEM]:"):
tokens.append(("class:system", line + "\n"))
else:
tokens.append(("", line + "\n"))
return tokens
def get_settings_text():
tokens = [("class:popup-title", " === SETTINGS & PLUGINS PANEL ===\n\n")]
proto_style = "class:contact-focused" if client.settings_section == "protocols" else "class:contact"
trans_style = "class:contact-focused" if client.settings_section == "transports" else "class:contact"
tokens.append((proto_style, " [ PROTOCOLS ] "))
tokens.append(("", " "))
tokens.append((trans_style, " [ TRANSPORTS ] \n\n"))
collection = client.plugin_manager.protocols if client.settings_section == "protocols" else client.plugin_manager.transports
primary = client.primary_protocol if client.settings_section == "protocols" else client.primary_transport
items = list(collection.keys())
if not items:
tokens.append(("", " No items found in this section.\n"))
else:
for idx, name in enumerate(items):
info = collection[name]
marker = "[X]" if info["enabled"] else "[ ]"
prim_marker = " (Primary)" if name == primary else ""
fail_marker = " [CRASHED/DISABLED]" if info["failed"] else ""
if idx == client.settings_cursor:
style = "class:contact-active"
item_str = f" > {marker} {name}{prim_marker}{fail_marker}\n"
else:
style = "class:contact"
item_str = f" {marker} {name}{prim_marker}{fail_marker}\n"
tokens.append((style, item_str))
tokens.append(("", "\n--- Controls ---\n"))
tokens.append(("class:help-tip", " Left/Right: Switch sections\n"))
tokens.append(("class:help-tip", " Up/Down: Select item\n"))
tokens.append(("class:help-tip", " Space: Toggle Enable/Disable\n"))
tokens.append(("class:help-tip", " P: Set chosen item as Primary\n"))
tokens.append(("class:help-tip", " F2: Return back to Chat Panel\n"))
return tokens
def get_cursor_pos():
text = "".join(t[1] for t in get_main_text())
newlines = text.count('\n')
return Point(0, max(0, newlines - 1))
sidebar_control = FormattedTextControl(get_sidebar_text, focusable=True)
sidebar_window = Frame(Window(content=sidebar_control, width=25), title="chats", style="class:border")
main_control = FormattedTextControl(get_main_text, get_cursor_position=get_cursor_pos)
def get_main_title():
base_title = f"TyChat | You: {client.username} ({client.uin})"
if client.is_busy: base_title += " [DND/BUSY]"
if client.active_chat: return f"{base_title} | Chat: {client.active_chat}"
return base_title
main_window = Frame(Window(content=main_control, wrap_lines=True), title=get_main_title, style="class:border")
settings_control = FormattedTextControl(get_settings_text, focusable=True)
settings_window = Frame(Window(content=settings_control, wrap_lines=True), title="Settings Router", style="class:border")
input_field = TextArea(
height=3,
prompt="> ",
multiline=False,
wrap_lines=True,
completer=CommandCompleter(),
complete_while_typing=True
)
input_window = Frame(input_field, title="Input")
def accept_handler(buff):
text = input_field.text.strip()
if not text: return
if text.lower() == "/exit":
client.loop_running = False
socket_manager.sio.disconnect()
get_app().exit()
return
if text.lower() == "/help":
try: webbrowser.open(WIKI_URL)
except: pass
input_field.text = ""
return
if text.lower() == "/busy":
client.is_busy = not client.is_busy
if client.active_chat:
status_msg = "enabled" if client.is_busy else "disabled"
client.add_to_history(client.active_chat, f"[SYSTEM]: DND Mode {status_msg}.")
input_field.text = ""
client.refresh_ui()
return
if text.lower().startswith("/add "):
new_uin = text.split(" ", 1)[1].strip()
if not new_uin: return
if new_uin in client.contacts:
client.active_chat = new_uin
input_field.text = ""
client.refresh_ui()
return
client.active_chat = None
threading.Thread(target=client.play_dial_tones, args=(new_uin,), daemon=True).start()
client.send_sys_packet(new_uin, "REQ_ADD")
input_field.text = ""
client.refresh_ui()
return
if text.lower().startswith("/room create "):
match = re.match(r'^/room create "([^"]+)"$', text, re.IGNORECASE)
if match:
r_name = match.group(1)
r_id = f"ROOM:{client.uin}:{r_name}"
client.groups[r_id] = {"title": r_name, "owner": client.uin, "members": [client.uin]}
client.history[r_id] = [f"[SYSTEM]: Room \"{r_name}\" created successfully. Local GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.refresh_ui()
input_field.text = ""
return
if text.lower().startswith("/room join "):
match = re.match(r'^/room join ([0-9]+):"([^"]+)"$', text, re.IGNORECASE)
if match:
r_owner = match.group(1)
r_name = match.group(2)
r_id = f"ROOM:{r_owner}:{r_name}"
if r_owner not in client.contacts and r_owner != client.uin:
client.contacts[r_owner] = {"status": "offline", "unread": 0, "attention": False}
client.groups[r_id] = {"title": r_name, "owner": r_owner, "members": list(set([client.uin, r_owner]))}
client.history[r_id] = [f"[SYSTEM]: Joined room \"{r_name}\". Local GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.refresh_ui()
client.send_sys_packet(r_owner, f"REQ_JOIN_ROOM:{r_name}")
input_field.text = ""
return
if client.active_chat:
if client.active_chat.startswith("ROOM:"):
group = client.groups[client.active_chat]
owner_uin = group["owner"]
if owner_uin != client.uin and client.contacts.get(owner_uin, {}).get("status") != "online":
client.add_to_history(client.active_chat, "[SYSTEM]: CANNOT SEND. Owner of this room is offline for sync!")
input_field.text = ""
return
proto_to_use = client.primary_protocol
encoded = client.plugin_manager.safe_encode(proto_to_use, f"SYS:ROOM_MSG:{client.active_chat}:{client.uin}:{text}")
if encoded is None:
for p in client.plugin_manager.protocols:
encoded = client.plugin_manager.safe_encode(p, f"SYS:ROOM_MSG:{client.active_chat}:{client.uin}:{text}")
if encoded is not None:
proto_to_use = p
break
if encoded is not None:
wrapped = f"NATIVE:{proto_to_use}:".encode('utf-8') + encoded
p_b64 = base64.b64encode(wrapped).decode('utf-8')
if owner_uin == client.uin:
for m in group["members"]:
if m != client.uin and client.contacts.get(m, {}).get("status") == "online":
socket_manager.sio.emit("relay_packet", {"to_uin": m, "payload": p_b64})
else:
socket_manager.sio.emit("relay_packet", {"to_uin": owner_uin, "payload": p_b64})
client.add_to_history(client.active_chat, f"[You]: {text}")
else:
client.add_to_history(client.active_chat, "[SYSTEM]: Local engine failure encoding text frame.")
input_field.text = ""
return
if text.lower() == "/query":
client.send_service_tone(client.active_chat, 1200)
client.add_to_history(client.active_chat, "[SYSTEM]: Sent friendly status query...")
input_field.text = ""
return
if text.lower() == "/alert":
client.send_service_tone(client.active_chat, 1400)
client.add_to_history(client.active_chat, "[SYSTEM]: Sent URGENT alert signal...")
input_field.text = ""
return
if len(text) > 300:
client.add_to_history(client.active_chat, f"[SYSTEM]: Message too long!")
input_field.text = ""
return
proto_to_use = client.peer_session_protocols.get(client.active_chat, client.primary_protocol)
encoded = client.plugin_manager.safe_encode(proto_to_use, text)
if encoded is None:
proto_order = [p for p in client.plugin_manager.protocols if p != proto_to_use]
for p_name in proto_order:
encoded = client.plugin_manager.safe_encode(p_name, text)
if encoded is not None:
proto_to_use = p_name
client.peer_session_protocols[client.active_chat] = p_name
break
if encoded is not None:
wrapped = f"NATIVE:{proto_to_use}:".encode('utf-8') + encoded
p_b64 = base64.b64encode(wrapped).decode('utf-8')
if client.contacts[client.active_chat]["status"] == "online":
socket_manager.sio.emit("relay_packet", {"to_uin": client.active_chat, "payload": p_b64})
client.add_to_history(client.active_chat, f"[You]: {text}")
else:
if client.active_chat not in client.preserved: client.preserved[client.active_chat] = []
client.preserved[client.active_chat].append(text)
client.add_to_history(client.active_chat, f"[You]: {text} (Preserved)")
client.save_config()
else:
client.add_to_history(client.active_chat, "[SYSTEM]: Conversation pipeline failed. No working/enabled matching protocols.")
input_field.text = ""
input_field.accept_handler = accept_handler
@Condition
def is_chat_panel():
return client.current_tab == "chat"
@Condition
def is_settings_panel():
return client.current_tab == "settings"
right_display = HSplit([
ConditionalContainer(content=main_window, filter=is_chat_panel),
ConditionalContainer(content=settings_window, filter=is_settings_panel),
input_window
])
def get_popup_text():
if not client.pending_requests: return []
req_uin = client.pending_requests[0]
return [
("class:popup-title", f" Incoming Handshake Request \n"),
("class:popup-title", f" UIN: {req_uin} \n\n"),
("", " Do you want to add them to contacts?\n\n"),
("class:popup-keys", " [Y]es [N]o N[e]ver ")
]
popup_window = Frame(Window(FormattedTextControl(get_popup_text), align=WindowAlign.CENTER, width=42, height=6), style="class:popup-border")
def get_signal_popup_text():
if not client.pending_windows: return []
win_info = client.pending_windows[0]
w_type = win_info["type"].upper()
w_uin = win_info["uin"]
title_style = "class:alert-title" if w_type == "ALERT" else "class:query-title"
desc = "CRITICAL BREAK-IN! Are you busy?" if w_type == "ALERT" else "Friendly status query: Are you busy?"
return [
(title_style, f" *** INCOMING {w_type} SIGNAL *** \n"),
("class:popup-title", f" From UIN: {w_uin} \n\n"),
("", f" {desc}\n\n"),
("class:popup-keys", " [Y]es, I'm busy [N]o, go on ")
]
signal_popup_window = Frame(Window(FormattedTextControl(get_signal_popup_text), align=WindowAlign.CENTER, width=50, height=6), style="class:border")
def get_welcome_popup_text():
return [
("class:query-title", " *** ДОБРО ПОЖАЛОВАТЬ В TYCHAT! *** \n\n"),
("", " Привет! Вы только что зарегистрировались.\n"),
("", " Хотите автоматически присоединиться\n"),
("", " к всеобщей группе?\n\n"),
("class:popup-keys", " [Y]Да, зайти в Dnishe [N]Нет, я сам ")
]
welcome_popup_window = Frame(Window(FormattedTextControl(get_welcome_popup_text), align=WindowAlign.CENTER, width=48, height=7), style="class:border")
@Condition
def has_pending_request(): return len(client.pending_requests) > 0 and not client.show_welcome_popup
@Condition
def has_pending_window(): return len(client.pending_windows) > 0 and len(client.pending_requests) == 0 and not client.show_welcome_popup
@Condition
def has_welcome_popup(): return client.show_welcome_popup
root_container = FloatContainer(
content=VSplit([sidebar_window, right_display]),
floats=[
Float(content=ConditionalContainer(content=popup_window, filter=has_pending_request), transparent=False),
Float(content=ConditionalContainer(content=signal_popup_window, filter=has_pending_window), transparent=False),
Float(content=ConditionalContainer(content=welcome_popup_window, filter=has_welcome_popup), transparent=False)
]
)
kb = KeyBindings()
@kb.add('f2')
def _(event):
if client.current_tab == "chat":
client.current_tab = "settings"
client.settings_cursor = 0
else:
client.current_tab = "chat"
client.refresh_ui()
@kb.add('left', filter=is_settings_panel)
def _(event):
if client.settings_section == "transports":
client.settings_section = "protocols"
client.settings_cursor = 0
client.refresh_ui()
@kb.add('right', filter=is_settings_panel)
def _(event):
if client.settings_section == "protocols":
client.settings_section = "transports"
client.settings_cursor = 0
client.refresh_ui()
@kb.add('up', filter=is_settings_panel)
def _(event):
collection = client.plugin_manager.protocols if client.settings_section == "protocols" else client.plugin_manager.transports
items = list(collection.keys())
if items:
client.settings_cursor = (client.settings_cursor - 1) % len(items)
client.refresh_ui()
@kb.add('down', filter=is_settings_panel)
def _(event):
collection = client.plugin_manager.protocols if client.settings_section == "protocols" else client.plugin_manager.transports
items = list(collection.keys())
if items:
client.settings_cursor = (client.settings_cursor + 1) % len(items)
client.refresh_ui()
@kb.add('space', filter=is_settings_panel)
def _(event):
collection = client.plugin_manager.protocols if client.settings_section == "protocols" else client.plugin_manager.transports
items = list(collection.keys())
if items:
name = items[client.settings_cursor]
collection[name]["enabled"] = not collection[name]["enabled"]
client.save_config()
client.refresh_ui()
@kb.add('p', filter=is_settings_panel)
@kb.add('P', filter=is_settings_panel)
def _(event):
collection = client.plugin_manager.protocols if client.settings_section == "protocols" else client.plugin_manager.transports
items = list(collection.keys())
if items:
name = items[client.settings_cursor]
if collection[name]["enabled"]:
if client.settings_section == "protocols":
client.primary_protocol = name
else:
client.primary_transport = name
client.save_config()
client.refresh_ui()
@kb.add('tab', filter=~has_pending_request & ~has_pending_window & ~has_welcome_popup & is_chat_panel)
def _(event):
if event.app.layout.has_focus(input_field): event.app.layout.focus(sidebar_window)
else: event.app.layout.focus(input_field)
@kb.add('up', filter=has_focus(sidebar_window) & ~has_pending_request & ~has_pending_window & ~has_welcome_popup & is_chat_panel)
def _(event):
all_chats = list(client.contacts.keys()) + list(client.groups.keys())
if all_chats:
client.selected_contact_idx = (client.selected_contact_idx - 1) % len(all_chats)
target = all_chats[client.selected_contact_idx]
client.active_chat = target
if target in client.contacts:
client.contacts[target]["unread"] = 0
client.contacts[target]["attention"] = False
client.refresh_ui()
@kb.add('down', filter=has_focus(sidebar_window) & ~has_pending_request & ~has_pending_window & ~has_welcome_popup & is_chat_panel)
def _(event):
all_chats = list(client.contacts.keys()) + list(client.groups.keys())
if all_chats:
client.selected_contact_idx = (client.selected_contact_idx + 1) % len(all_chats)
target = all_chats[client.selected_contact_idx]
client.active_chat = target
if target in client.contacts:
client.contacts[target]["unread"] = 0
client.contacts[target]["attention"] = False
client.refresh_ui()
@kb.add('y', filter=has_welcome_popup)
@kb.add('Y', filter=has_welcome_popup)
def _(event):
client.show_welcome_popup = False
r_owner = "716041"
r_name = "Dnishe"
r_id = f"ROOM:{r_owner}:{r_name}"
client.contacts[r_owner] = {"status": "offline", "unread": 0, "attention": False}
client.groups[r_id] = {"title": r_name, "owner": r_owner, "members": [client.uin, r_owner]}
client.history[r_id] = [f"[SYSTEM]: Автоподключение! Комната \"{r_name}\". GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.send_sys_packet(r_owner, f"REQ_JOIN_ROOM:{r_name}")
client.refresh_ui()
@kb.add('n', filter=has_welcome_popup)
@kb.add('N', filter=has_welcome_popup)
def _(event):
client.show_welcome_popup = False
client.refresh_ui()
@kb.add('y', filter=has_pending_window)
@kb.add('Y', filter=has_pending_window)
def _(event):
win_info = client.pending_windows.pop(0)
target_uin = win_info["uin"]
client.is_busy = True
client.send_service_tone(target_uin, 1600)
client.add_to_history(target_uin, "[You -> SYSTEM]: Sent Dial Tone -> Yes, I'm busy.")
client.refresh_ui()
@kb.add('n', filter=has_pending_window)
@kb.add('N', filter=has_pending_window)
def _(event):
win_info = client.pending_windows.pop(0)
target_uin = win_info["uin"]
client.send_service_tone(target_uin, 1800)
client.add_to_history(target_uin, "[You -> SYSTEM]: Sent Dial Tone -> No, go on.")
client.refresh_ui()
@kb.add('y', filter=has_pending_request)
@kb.add('Y', filter=has_pending_request)
def _(event):
req_uin = client.pending_requests.pop(0)
client.contacts[req_uin] = {"status": "online", "unread": 0, "attention": False}
client.history[req_uin] = ["[SYSTEM]: Handshake accepted. Contact added."]
client.active_chat = req_uin
client.save_config()
client.send_sys_packet(req_uin, "RES_ACC")
client.refresh_ui()
@kb.add('n', filter=has_pending_request)
@kb.add('N', filter=has_pending_request)
def _(event):
req_uin = client.pending_requests.pop(0)
client.send_sys_packet(req_uin, "RES_DEC")
client.refresh_ui()
@kb.add('c-c')
def _(event):
client.loop_running = False
socket_manager.sio.disconnect()
event.app.exit()
return Layout(root_container, focused_element=input_field), kb
ui_style = Style.from_dict({
'contact': '#ffffff',
'contact-focused': '#00aaaa bold',
'contact-active': '#00ff00 bold',
'ascii': '#00ff00 bold',
'desc': '#00ff00',
'help-tip': '#00ffff italic',
'preserved': '#ffff00',
'system': '#ff0000 bold',
'border': '#00ff00',
'frame.border': '#00ff00',
'popup-title': '#ffffff bold',
'popup-keys': '#00ff00 bold',
'popup-border': '#ff0000 bold',
'query-title': '#00ffff bold',
'alert-title': '#ff0000 bold',
'completion-menu.completion': 'bg:#00ffff #000000',
'completion-menu.completion.current': 'bg:#00aa00 #ffffff bold'
})
def main():
if os.name == 'nt':
import ctypes
try:
kernel32 = ctypes.windll.kernel32
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
except: pass
is_new_registration = False
socket_manager.init_network(client)
if not client.load_config():
print("!" * 60)
print("WARNING: All data is local.")
print("!" * 60 + "\n")
client.server_url = input("Enter server URL: ").strip()
if not client.server_url.startswith("http://") and not client.server_url.startswith("https://"):
client.server_url = "http://" + client.server_url
try: socket_manager.sio.connect(client.server_url, transports=['websocket'])
except Exception as e: return
print("\n1. Register\n2. Login")
mode = input("> ")
username_or_uin = input("UIN/Username: ").strip()
password = input("Password: ").strip()
event_wait = threading.Event()
@socket_manager.sio.event
def register_response(data):
nonlocal is_new_registration
if data["status"] == "success":
client.uin = data['uin']
client.username = username_or_uin
client.password = password
is_new_registration = True
event_wait.set()
@socket_manager.sio.event
def login_response(data):
if data["status"] == "success":
client.uin = username_or_uin
client.username = data["username"]
client.password = password
event_wait.set()
if mode == "1": socket_manager.sio.emit("register", {"username": username_or_uin, "password": password})
else: socket_manager.sio.emit("login", {"uin": username_or_uin, "password": password})
event_wait.wait()
if not client.uin: return
client.save_config()
input("Press Enter to open TUI...")
else:
try:
socket_manager.sio.connect(client.server_url, transports=['websocket'])
socket_manager.sio.emit("login", {"uin": client.uin, "password": client.password})
except: pass
if is_new_registration:
client.show_welcome_popup = True
threading.Thread(target=status_checker_thread, daemon=True).start()
layout, bindings = make_layout()
client.app = Application(
layout=layout,
key_bindings=bindings,
style=ui_style,
full_screen=True,
enable_page_navigation_bindings=True
)
client.app.run()
if __name__ == "__main__":
main()