This repository has been archived on 2026-05-21. You can view files and clone it, but cannot push or open issues or pull requests.
TyChat-TUI/client.py
2026-05-19 02:19:00 +03:00

840 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import subprocess
REQUIRED_PACKAGES = {
"python-socketio[client]": "socketio",
"websocket-client": "websocket",
"numpy": "numpy",
"sounddevice": "sounddevice",
"prompt_toolkit": "prompt_toolkit"
}
def auto_install_deps():
missing_packages = []
for pip_name, import_name in REQUIRED_PACKAGES.items():
try:
__import__(import_name)
except ImportError:
missing_packages.append(pip_name)
if not missing_packages:
return
cmd = [sys.executable, "-m", "pip", "install"] + missing_packages
result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
if result.returncode != 0:
stderr_output = result.stderr.decode('utf-8', errors='ignore')
if "externally-managed-environment" in stderr_output.lower():
force_cmd = cmd + ["--break-system-packages"]
force_result = subprocess.run(force_cmd)
if force_result.returncode == 0:
sys.exit(0)
sys.exit(1)
auto_install_deps()
import os
import json
import time
import base64
import re
import threading
import webbrowser
import numpy as np
import sounddevice as sd
import socketio
from prompt_toolkit.application import Application
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.containers import HSplit, VSplit, Window, FloatContainer, Float, ConditionalContainer, WindowAlign
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.layout.layout import Layout
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.styles import Style
from prompt_toolkit.application.current import get_app
from prompt_toolkit.filters import has_focus, Condition
from prompt_toolkit.data_structures import Point
from prompt_toolkit.completion import Completer, Completion
BASE_FREQ = 600
FREQ_STEP = 25
TONE_DURATION = 0.1
SAMPLE_RATE = 44100
CONFIG_FILE = "settings.json"
WIKI_URL = "https://git.idkmail.ru/lohrrrr/TyChat-Client/wiki"
DEV_SIGNATURE = "For you, from IDKMail Dev Group"
FREQ_QUERY = 1200
FREQ_ALERT = 1400
FREQ_RESP_YES = 1600
FREQ_RESP_NO = 1800
ASCII_ART = r"""
_______ _____ _ _
|__ __| / ____| | | |
| |_ _| | | |__ __ _| |_
| | | | | | | '_ \ / _` | __|
| | |_| | |____| | | | (_| | |_
|_|\__, |\_____|_| |_|\__,_|\__|
__/ |
|___/
"""
sio = socketio.Client()
class CommandCompleter(Completer):
def __init__(self):
self.completions = {
"/add": "Добавить контакт (/add <uin>)",
"/busy": "Режим DND (Не беспокоить)",
"/query": "Мягкий запрос статуса (Уважает DND)",
"/alert": "Экстренный вызов (Пробивает DND)",
"/room create ": "Создать комнату (/room create \"имя\")",
"/room join ": "Войти в комнату (/room join UIN:\"имя\")",
"/exit": "Выход из мессенджера",
"/help": "Открыть Wiki проекта"
}
def get_completions(self, document, complete_event):
text = document.text
if text.startswith('/'):
for cmd, desc in self.completions.items():
if cmd.startswith(text):
yield Completion(cmd, start_position=-len(text), display_meta=desc)
class TyClient:
def __init__(self):
self.server_url = ""
self.username = ""
self.uin = ""
self.password = ""
self.contacts = {}
self.groups = {}
self.history = {}
self.preserved = {}
self.blocklist = []
self.active_chat = None
self.app = None
self.loop_running = True
self.selected_contact_idx = 0
self.is_busy = False
self.dialing_uin = None
self.pending_requests = []
self.pending_windows = []
self.show_welcome_popup = False
def load_config(self):
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
cfg = json.load(f)
self.server_url = cfg.get("server_url", "")
self.username = cfg.get("username", "")
self.uin = cfg.get("uin", "")
self.password = cfg.get("password", "")
self.contacts = cfg.get("contacts", {})
self.groups = cfg.get("groups", {})
self.history = cfg.get("history", {})
self.preserved = cfg.get("preserved", {})
self.blocklist = cfg.get("blocklist", [])
return True
except:
return False
return False
def save_config(self):
cfg = {
"server_url": self.server_url,
"username": self.username,
"uin": self.uin,
"password": self.password,
"contacts": self.contacts,
"groups": self.groups,
"history": self.history,
"preserved": self.preserved,
"blocklist": self.blocklist
}
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
def play_dial_tones(self, target_uin):
self.dialing_uin = target_uin
while self.dialing_uin == target_uin and self.loop_running:
t = np.linspace(0, 1.0, int(SAMPLE_RATE * 1.0), False)
tone = np.sin(425 * t * 2 * np.pi) * 0.5
sd.play(tone, SAMPLE_RATE)
sd.wait()
for _ in range(30):
if self.dialing_uin != target_uin or not self.loop_running:
break
time.sleep(0.1)
def generate_service_tone(self, frequency, duration=0.4):
t = np.linspace(0, duration, int(SAMPLE_RATE * duration), False)
tone = np.sin(frequency * t * 2 * np.pi) * 0.7
envelope = np.ones_like(tone)
fade_len = int(SAMPLE_RATE * 0.05)
envelope[:fade_len] = np.linspace(0, 1, fade_len)
envelope[-fade_len:] = np.linspace(1, 0, fade_len)
return (tone * envelope).astype(np.float32).tobytes()
def text_to_audio(self, text):
audio_signals = []
t_start = np.linspace(0, 0.3, int(SAMPLE_RATE * 0.3), False)
audio_signals.append(np.sin(1000 * t_start * 2 * np.pi))
for char in text:
freq = BASE_FREQ + ord(char) * FREQ_STEP
t = np.linspace(0, TONE_DURATION, int(SAMPLE_RATE * TONE_DURATION), False)
audio_signals.append(np.sin(freq * t * 2 * np.pi))
full_audio = np.concatenate(audio_signals).astype(np.float32)
if np.max(np.abs(full_audio)) > 0:
full_audio = full_audio / np.max(np.abs(full_audio))
return full_audio.tobytes()
def detect_service_tone(self, audio_data):
try:
window_data = np.hanning(len(audio_data))
fft_data = np.abs(np.fft.rfft(audio_data * window_data))
frequencies = np.fft.rfftfreq(len(audio_data), d=1/SAMPLE_RATE)
detected_freq = frequencies[np.argmax(fft_data)]
for target in [FREQ_QUERY, FREQ_ALERT, FREQ_RESP_YES, FREQ_RESP_NO]:
if abs(detected_freq - target) <= 15:
return target
return None
except:
return None
def fast_decode(self, audio_data):
try:
samples_per_tone = int(SAMPLE_RATE * TONE_DURATION)
current_sample = int(SAMPLE_RATE * 0.3)
text = ""
while current_sample < len(audio_data):
chunk = audio_data[current_sample : current_sample + samples_per_tone]
if len(chunk) < samples_per_tone:
break
window_data = np.hanning(len(chunk))
fft_data = np.abs(np.fft.rfft(chunk * window_data))
frequencies = np.fft.rfftfreq(len(chunk), d=1/SAMPLE_RATE)
detected_freq = frequencies[np.argmax(fft_data)]
ascii_code = int(round((detected_freq - BASE_FREQ) / FREQ_STEP))
if 0 <= ascii_code < 65535:
text += chr(ascii_code)
current_sample += samples_per_tone
return text
except:
return ""
def play_and_decode(self, audio_bytes, sender_uin):
audio_data = np.frombuffer(audio_bytes, dtype=np.float32)
sd.play(audio_data, SAMPLE_RATE)
time.sleep(0.3)
samples_per_tone = int(SAMPLE_RATE * TONE_DURATION)
current_sample = int(SAMPLE_RATE * 0.3)
sender_name = self.username if sender_uin == self.uin else f"UIN {sender_uin}"
msg_buffer = f"[{sender_name}]: "
if sender_uin not in self.history:
self.history[sender_uin] = []
self.history[sender_uin].append(msg_buffer)
self.refresh_ui()
while current_sample < len(audio_data) and self.loop_running:
chunk = audio_data[current_sample : current_sample + samples_per_tone]
if len(chunk) < samples_per_tone:
break
window_data = np.hanning(len(chunk))
fft_data = np.abs(np.fft.rfft(chunk * window_data))
frequencies = np.fft.rfftfreq(len(chunk), d=1/SAMPLE_RATE)
detected_freq = frequencies[np.argmax(fft_data)]
ascii_code = int(round((detected_freq - BASE_FREQ) / FREQ_STEP))
if 0 <= ascii_code < 65535:
try:
char = chr(ascii_code)
self.history[sender_uin][-1] += char
self.refresh_ui()
except:
pass
time.sleep(TONE_DURATION)
current_sample += samples_per_tone
sd.wait()
self.save_config()
def add_to_history(self, target_uin, line):
if target_uin not in self.history:
self.history[target_uin] = []
self.history[target_uin].append(line)
self.refresh_ui()
self.save_config()
def send_sys_packet(self, to_uin, cmd):
audio_b = self.text_to_audio(f"SYS:{cmd}")
sio.emit("relay_packet", {"to_uin": to_uin, "payload": base64.b64encode(audio_b).decode('utf-8')})
def send_service_tone(self, to_uin, freq):
audio_b = self.generate_service_tone(freq)
sio.emit("relay_packet", {"to_uin": to_uin, "payload": base64.b64encode(audio_b).decode('utf-8')})
def refresh_ui(self):
if self.app:
self.app.invalidate()
client = TyClient()
@sio.event
def incoming_packet(data):
from_uin = data["from_uin"]
payload_base64 = data["payload"]
try:
audio_bytes = base64.b64decode(payload_base64.encode('utf-8'))
audio_data = np.frombuffer(audio_bytes, dtype=np.float32)
srv_tone = client.detect_service_tone(audio_data)
if srv_tone:
sd.play(audio_data, SAMPLE_RATE)
if from_uin not in client.contacts:
return
if srv_tone == FREQ_QUERY:
if client.is_busy:
client.send_service_tone(from_uin, FREQ_RESP_YES)
else:
client.pending_windows.append({"type": "query", "uin": from_uin})
client.refresh_ui()
return
elif srv_tone == FREQ_ALERT:
client.pending_windows.append({"type": "alert", "uin": from_uin})
client.refresh_ui()
return
elif srv_tone == FREQ_RESP_YES:
client.add_to_history(from_uin, f"[SYSTEM]: Quick status answer -> Yes, I'm busy")
return
elif srv_tone == FREQ_RESP_NO:
client.add_to_history(from_uin, f"[SYSTEM]: Quick status answer -> No, go on")
return
fast_text = client.fast_decode(audio_data)
if fast_text.startswith("SYS:"):
cmd = fast_text[4:]
if cmd.startswith("ROOM_MSG:"):
parts = cmd.split(":", 3)
if len(parts) >= 4:
room_id = parts[1]
sender_uin = parts[2]
actual_msg = parts[3]
if room_id in client.groups:
client.add_to_history(room_id, f"[UIN {sender_uin}]: {actual_msg}")
return
elif cmd == "REQ_ADD":
if from_uin in client.blocklist:
client.send_sys_packet(from_uin, "RES_DEC")
elif client.is_busy:
client.send_sys_packet(from_uin, "RES_BSY")
else:
if from_uin not in client.pending_requests and from_uin not in client.contacts:
client.pending_requests.append(from_uin)
client.refresh_ui()
return
elif cmd == "RES_ACC":
if client.dialing_uin == from_uin:
client.dialing_uin = None
if from_uin not in client.contacts:
client.contacts[from_uin] = {"status": "online", "unread": 0, "attention": False}
client.history[from_uin] = ["[SYSTEM]: Handshake accepted. Contact added."]
client.save_config()
client.active_chat = from_uin
client.refresh_ui()
return
elif cmd == "RES_DEC" or cmd == "RES_BSY":
if client.dialing_uin == from_uin:
client.dialing_uin = None
return
if from_uin not in client.contacts:
return
if client.active_chat != from_uin:
client.contacts[from_uin]["unread"] += 1
threading.Thread(target=client.play_and_decode, args=(audio_bytes, from_uin), daemon=True).start()
except Exception as e:
pass
@sio.event
def online_statuses_response(data):
for uin, status in data.items():
if uin in client.contacts:
client.contacts[uin]["status"] = status
client.refresh_ui()
@sio.event
def error(data):
msg = data.get("message")
target = data.get("target_uin")
if msg == "offline" and target:
if client.dialing_uin == target:
client.dialing_uin = None
if target in client.contacts:
client.contacts[target]["status"] = "offline"
client.add_to_history(target, f"[SYSTEM]: They went offline. Outgoing stack will be preserved.")
def status_checker_thread():
while client.loop_running:
if sio.connected and client.contacts:
sio.emit("check_online_statuses", list(client.contacts.keys()))
for uin in list(client.contacts.keys()):
if client.contacts[uin]["status"] == "online" and client.preserved.get(uin):
while client.preserved[uin]:
p_text = client.preserved[uin].pop(0)
audio_b = client.text_to_audio(p_text)
p_b64 = base64.b64encode(audio_b).decode('utf-8')
sio.emit("relay_packet", {"to_uin": uin, "payload": p_b64})
client.add_to_history(uin, f"[You]: {p_text}")
client.save_config()
time.sleep(10)
def make_layout():
def get_sidebar_text():
tokens = []
for idx, (uin, info) in enumerate(client.contacts.items()):
status_str = ""
if info.get("attention"): status_str = " [!]"
elif info.get("unread", 0) > 0: status_str = f" ({info['unread']})"
elif info.get("status") == "online": status_str = " *"
content = f" {uin}{status_str}"
if client.active_chat == uin: style = "class:contact-active"
elif idx == client.selected_contact_idx and get_app().layout.has_focus(sidebar_window): style = "class:contact-focused"
else: style = "class:contact"
tokens.extend([(style, f"{content}\n")])
for g_id, g_info in client.groups.items():
content = f" [G] {g_info['title']}"
if client.active_chat == g_id: style = "class:contact-active"
else: style = "class:contact"
tokens.extend([(style, f"{content}\n")])
return tokens
def get_main_text():
if not client.active_chat:
tokens = [("", "\n" * 2)]
for line in ASCII_ART.split("\n"):
tokens.append(("class:ascii", line + "\n"))
tokens.extend([
("", "\n"),
("class:desc", "Messenger on custom protocol named\n"),
("class:desc", "AcoustiOverSocket inspired by rtty\n"),
("", "\n"),
("class:help-tip", "Type / to trigger interactive command menu\n"),
("", "\n" * 2),
("class:help-tip", f"{DEV_SIGNATURE}\n")
])
if client.dialing_uin:
tokens.extend([
("", "\n" * 2),
("class:system", f"Dialing UIN {client.dialing_uin}... Waiting for accept.\n")
])
return tokens
tokens = []
lines = client.history.get(client.active_chat, [])
for line in lines:
if line.startswith("[You]:") and "(Preserved)" in line:
tokens.append(("class:preserved", line + "\n"))
elif line.startswith("[SYSTEM]:"):
tokens.append(("class:system", line + "\n"))
else:
tokens.append(("", line + "\n"))
return tokens
def get_cursor_pos():
text = "".join(t[1] for t in get_main_text())
newlines = text.count('\n')
return Point(0, max(0, newlines - 1))
sidebar_control = FormattedTextControl(get_sidebar_text, focusable=True)
sidebar_window = Frame(Window(content=sidebar_control, width=25), title="chats", style="class:border")
main_control = FormattedTextControl(get_main_text, get_cursor_position=get_cursor_pos)
def get_main_title():
base_title = f"TyChat | You: {client.username} ({client.uin})"
if client.is_busy: base_title += " [DND/BUSY]"
if client.active_chat: return f"{base_title} | Chat: {client.active_chat}"
return base_title
main_window = Frame(Window(content=main_control, wrap_lines=True), title=get_main_title, style="class:border")
input_field = TextArea(
height=3,
prompt="> ",
multiline=False,
wrap_lines=True,
completer=CommandCompleter(),
complete_while_typing=True
)
input_window = Frame(input_field, title="Input")
def accept_handler(buff):
text = input_field.text.strip()
if not text: return
if text.lower() == "/exit":
client.loop_running = False
sio.disconnect()
get_app().exit()
return
if text.lower() == "/help":
try: webbrowser.open(WIKI_URL)
except: pass
input_field.text = ""
return
if text.lower() == "/busy":
client.is_busy = not client.is_busy
if client.active_chat:
status_msg = "enabled" if client.is_busy else "disabled"
client.add_to_history(client.active_chat, f"[SYSTEM]: DND Mode {status_msg}.")
input_field.text = ""
client.refresh_ui()
return
if text.lower().startswith("/add "):
new_uin = text.split(" ", 1)[1].strip()
if not new_uin: return
if new_uin in client.contacts:
client.active_chat = new_uin
input_field.text = ""
client.refresh_ui()
return
client.active_chat = None
threading.Thread(target=client.play_dial_tones, args=(new_uin,), daemon=True).start()
client.send_sys_packet(new_uin, "REQ_ADD")
input_field.text = ""
client.refresh_ui()
return
if text.lower().startswith("/room create "):
match = re.match(r'^/room create "([^"]+)"$', text, re.IGNORECASE)
if match:
r_name = match.group(1)
r_id = f"ROOM:{client.uin}:{r_name}"
client.groups[r_id] = {"title": r_name, "owner": client.uin, "members": [client.uin]}
client.history[r_id] = [f"[SYSTEM]: Room \"{r_name}\" created successfully. Local GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.refresh_ui()
input_field.text = ""
return
if text.lower().startswith("/room join "):
match = re.match(r'^/room join ([0-9]+):"([^"]+)"$', text, re.IGNORECASE)
if match:
r_owner = match.group(1)
r_name = match.group(2)
r_id = f"ROOM:{r_owner}:{r_name}"
if r_owner not in client.contacts and r_owner != client.uin:
client.contacts[r_owner] = {"status": "offline", "unread": 0, "attention": False}
client.groups[r_id] = {"title": r_name, "owner": r_owner, "members": list(set([client.uin, r_owner]))}
client.history[r_id] = [f"[SYSTEM]: Joined room \"{r_name}\". Local GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.refresh_ui()
client.send_sys_packet(r_owner, f"REQ_JOIN_ROOM:{r_name}")
input_field.text = ""
return
if client.active_chat:
if client.active_chat.startswith("ROOM:"):
group = client.groups[client.active_chat]
owner_uin = group["owner"]
if owner_uin != client.uin and client.contacts.get(owner_uin, {}).get("status") != "online":
client.add_to_history(client.active_chat, "[SYSTEM]: CANNOT SEND. Owner of this room is offline for sync!")
input_field.text = ""
return
audio_b = client.text_to_audio(f"SYS:ROOM_MSG:{client.active_chat}:{client.uin}:{text}")
p_b64 = base64.b64encode(audio_b).decode('utf-8')
if owner_uin == client.uin:
for m in group["members"]:
if m != client.uin and client.contacts.get(m, {}).get("status") == "online":
sio.emit("relay_packet", {"to_uin": m, "payload": p_b64})
else:
sio.emit("relay_packet", {"to_uin": owner_uin, "payload": p_b64})
client.add_to_history(client.active_chat, f"[You]: {text}")
input_field.text = ""
return
if text.lower() == "/query":
client.send_service_tone(client.active_chat, FREQ_QUERY)
client.add_to_history(client.active_chat, "[SYSTEM]: Sent friendly status query...")
input_field.text = ""
return
if text.lower() == "/alert":
client.send_service_tone(client.active_chat, FREQ_ALERT)
client.add_to_history(client.active_chat, "[SYSTEM]: Sent URGENT alert signal (breaks DND)...")
input_field.text = ""
return
if len(text) > 300:
client.add_to_history(client.active_chat, f"[SYSTEM]: Message too long!")
input_field.text = ""
return
audio_b = client.text_to_audio(text)
p_b64 = base64.b64encode(audio_b).decode('utf-8')
if client.contacts[client.active_chat]["status"] == "online":
sio.emit("relay_packet", {"to_uin": client.active_chat, "payload": p_b64})
client.add_to_history(client.active_chat, f"[You]: {text}")
else:
if client.active_chat not in client.preserved: client.preserved[client.active_chat] = []
client.preserved[client.active_chat].append(text)
client.add_to_history(client.active_chat, f"[You]: {text} (Preserved)")
client.save_config()
input_field.text = ""
input_field.accept_handler = accept_handler
right_side = HSplit([main_window, input_window])
def get_popup_text():
if not client.pending_requests: return []
req_uin = client.pending_requests[0]
return [
("class:popup-title", f" Incoming Handshake Request \n"),
("class:popup-title", f" UIN: {req_uin} \n\n"),
("", " Do you want to add them to contacts?\n\n"),
("class:popup-keys", " [Y]es [N]o N[e]ver ")
]
popup_window = Frame(Window(FormattedTextControl(get_popup_text), align=WindowAlign.CENTER, width=42, height=6), style="class:popup-border")
def get_signal_popup_text():
if not client.pending_windows: return []
win_info = client.pending_windows[0]
w_type = win_info["type"].upper()
w_uin = win_info["uin"]
title_style = "class:alert-title" if w_type == "ALERT" else "class:query-title"
desc = "CRITICAL BREAK-IN! Are you busy?" if w_type == "ALERT" else "Friendly status query: Are you busy?"
return [
(title_style, f" *** INCOMING {w_type} SIGNAL *** \n"),
("class:popup-title", f" From UIN: {w_uin} \n\n"),
("", f" {desc}\n\n"),
("class:popup-keys", " [Y]es, I'm busy [N]o, go on ")
]
signal_popup_window = Frame(Window(FormattedTextControl(get_signal_popup_text), align=WindowAlign.CENTER, width=50, height=6), style="class:border")
def get_welcome_popup_text():
return [
("class:query-title", " *** ДОБРО ПОЖАЛОВАТЬ В TYCHAT! *** \n\n"),
("", " Привет! Вы только что зарегистрировались.\n"),
("", " Хотите автоматически присоединиться\n"),
("", " к всеобщей группе?\n\n"),
("class:popup-keys", " [Y]Да, зайти в Dnishe [N]Нет, я сам ")
]
welcome_popup_window = Frame(Window(FormattedTextControl(get_welcome_popup_text), align=WindowAlign.CENTER, width=48, height=7), style="class:border")
@Condition
def has_pending_request(): return len(client.pending_requests) > 0 and not client.show_welcome_popup
@Condition
def has_pending_window(): return len(client.pending_windows) > 0 and len(client.pending_requests) == 0 and not client.show_welcome_popup
@Condition
def has_welcome_popup(): return client.show_welcome_popup
root_container = FloatContainer(
content=VSplit([sidebar_window, right_side]),
floats=[
Float(content=ConditionalContainer(content=popup_window, filter=has_pending_request), transparent=False),
Float(content=ConditionalContainer(content=signal_popup_window, filter=has_pending_window), transparent=False),
Float(content=ConditionalContainer(content=welcome_popup_window, filter=has_welcome_popup), transparent=False)
]
)
kb = KeyBindings()
@kb.add('tab', filter=~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
if event.app.layout.has_focus(input_field): event.app.layout.focus(sidebar_window)
else: event.app.layout.focus(input_field)
@kb.add('up', filter=has_focus(sidebar_window) & ~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
all_chats = list(client.contacts.keys()) + list(client.groups.keys())
if all_chats:
client.selected_contact_idx = (client.selected_contact_idx - 1) % len(all_chats)
target = all_chats[client.selected_contact_idx]
client.active_chat = target
if target in client.contacts:
client.contacts[target]["unread"] = 0
client.contacts[target]["attention"] = False
client.refresh_ui()
@kb.add('down', filter=has_focus(sidebar_window) & ~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
all_chats = list(client.contacts.keys()) + list(client.groups.keys())
if all_chats:
client.selected_contact_idx = (client.selected_contact_idx + 1) % len(all_chats)
target = all_chats[client.selected_contact_idx]
client.active_chat = target
if target in client.contacts:
client.contacts[target]["unread"] = 0
client.contacts[target]["attention"] = False
client.refresh_ui()
@kb.add('y', filter=has_welcome_popup)
@kb.add('Y', filter=has_welcome_popup)
def _(event):
client.show_welcome_popup = False
r_owner = "716041"
r_name = "Dnishe"
r_id = f"ROOM:{r_owner}:{r_name}"
client.contacts[r_owner] = {"status": "offline", "unread": 0, "attention": False}
client.groups[r_id] = {"title": r_name, "owner": r_owner, "members": [client.uin, r_owner]}
client.history[r_id] = [f"[SYSTEM]: Автоподключение! Комната \"{r_name}\". GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.send_sys_packet(r_owner, f"REQ_JOIN_ROOM:{r_name}")
client.refresh_ui()
@kb.add('n', filter=has_welcome_popup)
@kb.add('N', filter=has_welcome_popup)
def _(event):
client.show_welcome_popup = False
client.refresh_ui()
@kb.add('y', filter=has_pending_window)
@kb.add('Y', filter=has_pending_window)
def _(event):
win_info = client.pending_windows.pop(0)
target_uin = win_info["uin"]
client.is_busy = True
client.send_service_tone(target_uin, FREQ_RESP_YES)
client.add_to_history(target_uin, "[You -> SYSTEM]: Sent Dial Tone -> Yes, I'm busy.")
client.refresh_ui()
@kb.add('n', filter=has_pending_window)
@kb.add('N', filter=has_pending_window)
def _(event):
win_info = client.pending_windows.pop(0)
target_uin = win_info["uin"]
client.send_service_tone(target_uin, FREQ_RESP_NO)
client.add_to_history(target_uin, "[You -> SYSTEM]: Sent Dial Tone -> No, go on.")
client.refresh_ui()
@kb.add('y', filter=has_pending_request)
@kb.add('Y', filter=has_pending_request)
def _(event):
req_uin = client.pending_requests.pop(0)
client.contacts[req_uin] = {"status": "online", "unread": 0, "attention": False}
client.history[req_uin] = ["[SYSTEM]: Handshake accepted. Contact added."]
client.active_chat = req_uin
client.save_config()
client.send_sys_packet(req_uin, "RES_ACC")
client.refresh_ui()
@kb.add('n', filter=has_pending_request)
@kb.add('N', filter=has_pending_request)
def _(event):
req_uin = client.pending_requests.pop(0)
client.send_sys_packet(req_uin, "RES_DEC")
client.refresh_ui()
@kb.add('c-c')
def _(event):
client.loop_running = False
sio.disconnect()
event.app.exit()
return Layout(root_container, focused_element=input_field), kb
ui_style = Style.from_dict({
'contact': '#ffffff',
'contact-focused': '#00aaaa bold',
'contact-active': '#00ff00 bold',
'ascii': '#00ff00 bold',
'desc': '#00ff00',
'help-tip': '#00ffff italic',
'preserved': '#ffff00',
'system': '#ff0000 bold',
'border': '#00ff00',
'frame.border': '#00ff00',
'popup-title': '#ffffff bold',
'popup-keys': '#00ff00 bold',
'popup-border': '#ff0000 bold',
'query-title': '#00ffff bold',
'alert-title': '#ff0000 bold',
'completion-menu.completion': 'bg:#00ffff #000000',
'completion-menu.completion.current': 'bg:#00aa00 #ffffff bold'
})
def main():
if os.name == 'nt':
import ctypes
try:
kernel32 = ctypes.windll.kernel32
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
except: pass
is_new_registration = False
if not client.load_config():
print("!" * 60)
print("WARNING: All data is local.")
print("!" * 60 + "\n")
client.server_url = input("Enter server URL: ").strip()
if not client.server_url.startswith("http://") and not client.server_url.startswith("https://"):
client.server_url = "http://" + client.server_url
try: sio.connect(client.server_url, transports=['websocket'])
except Exception as e: return
print("\n1. Register\n2. Login")
mode = input("> ")
username_or_uin = input("UIN/Username: ").strip()
password = input("Password: ").strip()
event_wait = threading.Event()
@sio.event
def register_response(data):
nonlocal is_new_registration
if data["status"] == "success":
client.uin = data['uin']
client.username = username_or_uin
client.password = password
is_new_registration = True
event_wait.set()
@sio.event
def login_response(data):
if data["status"] == "success":
client.uin = username_or_uin
client.username = data["username"]
client.password = password
event_wait.set()
if mode == "1": sio.emit("register", {"username": username_or_uin, "password": password})
else: sio.emit("login", {"uin": username_or_uin, "password": password})
event_wait.wait()
if not client.uin: return
client.save_config()
input("Press Enter to open TUI...")
else:
try:
sio.connect(client.server_url, transports=['websocket'])
sio.emit("login", {"uin": client.uin, "password": client.password})
except: pass
if is_new_registration:
client.show_welcome_popup = True
threading.Thread(target=status_checker_thread, daemon=True).start()
layout, bindings = make_layout()
client.app = Application(
layout=layout,
key_bindings=bindings,
style=ui_style,
full_screen=True,
enable_page_navigation_bindings=True
)
client.app.run()
if __name__ == "__main__":
main()