From 5196383bc3ac4905e798ac81c7142fa351b795f8 Mon Sep 17 00:00:00 2001 From: lohrrrr Date: Tue, 19 May 2026 00:01:34 +0300 Subject: [PATCH] Upload files to "/" --- client.py | 548 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 548 insertions(+) create mode 100644 client.py diff --git a/client.py b/client.py new file mode 100644 index 0000000..d923aaa --- /dev/null +++ b/client.py @@ -0,0 +1,548 @@ +import sys +import subprocess + +REQUIRED_PACKAGES = { + "python-socketio[client]": "socketio", + "websocket-client": "websocket", + "numpy": "numpy", + "sounddevice": "sounddevice", + "prompt_toolkit": "prompt_toolkit" +} + +def auto_install_deps(): + missing_packages = [] + for pip_name, import_name in REQUIRED_PACKAGES.items(): + try: + __import__(import_name) + except ImportError: + missing_packages.append(pip_name) + + if not missing_packages: + return + + print("[Auto-Installer] Installing client dependencies:", ", ".join(missing_packages)) + cmd = [sys.executable, "-m", "pip", "install"] + missing_packages + result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) + + if result.returncode != 0: + stderr_output = result.stderr.decode('utf-8', errors='ignore') + if "externally-managed-environment" in stderr_output.lower(): + print("\n" + "!"*50) + print("[Warning] OS blocks global package installation via pip.") + print("!"*50 + "\n") + choice = input("Use --break-system-packages flag? (y/n): ").strip().lower() + if choice == 'y': + force_cmd = cmd + ["--break-system-packages"] + force_result = subprocess.run(force_cmd) + if force_result.returncode == 0: + print("[Success] Restart the script!") + sys.exit(0) + sys.exit(1) + +auto_install_deps() + +import os +import json +import time +import base64 +import re +import threading +import webbrowser +import numpy as np +import sounddevice as sd +import socketio + +from prompt_toolkit.application import Application +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.layout.containers import HSplit, VSplit, Window +from prompt_toolkit.layout.controls import FormattedTextControl +from prompt_toolkit.layout.layout import Layout +from prompt_toolkit.widgets import TextArea, Frame +from prompt_toolkit.styles import Style +from prompt_toolkit.application.current import get_app +from prompt_toolkit.filters import has_focus + +BASE_FREQ = 600 +FREQ_STEP = 25 +TONE_DURATION = 0.1 +SAMPLE_RATE = 44100 +CONFIG_FILE = "settings.json" +WIKI_URL = "https://git.idkmail.ru/lohrrrr/TyChat-Client/wiki" +DEV_SIGNATURE = "For you, from IDKMail Dev Group" + +ASCII_ART = r""" + _______ _____ _ _ + |__ __| / ____| | | | + | |_ _| | | |__ __ _| |_ + | | | | | | | '_ \ / _` | __| + | | |_| | |____| | | | (_| | |_ + |_|\__, |\_____|_| |_|\__,_|\__| + __/ | + |___/ +""" + +sio = socketio.Client() + +class TyClient: + def __init__(self): + self.server_url = "" + self.username = "" + self.uin = "" + self.password = "" + self.contacts = {} + self.history = {} + self.preserved = {} + self.active_chat = None + self.app = None + self.loop_running = True + self.selected_contact_idx = 0 + + def load_config(self): + if os.path.exists(CONFIG_FILE): + try: + with open(CONFIG_FILE, "r", encoding="utf-8") as f: + cfg = json.load(f) + self.server_url = cfg.get("server_url", "") + self.username = cfg.get("username", "") + self.uin = cfg.get("uin", "") + self.password = cfg.get("password", "") + self.contacts = cfg.get("contacts", {}) + self.history = cfg.get("history", {}) + self.preserved = cfg.get("preserved", {}) + return True + except: + return False + return False + + def save_config(self): + cfg = { + "server_url": self.server_url, + "username": self.username, + "uin": self.uin, + "password": self.password, + "contacts": self.contacts, + "history": self.history, + "preserved": self.preserved + } + with open(CONFIG_FILE, "w", encoding="utf-8") as f: + json.dump(cfg, f, indent=4, ensure_ascii=False) + + def text_to_audio(self, text): + audio_signals = [] + t_start = np.linspace(0, 0.3, int(SAMPLE_RATE * 0.3), False) + audio_signals.append(np.sin(1000 * t_start * 2 * np.pi)) + for char in text: + freq = BASE_FREQ + ord(char) * FREQ_STEP + t = np.linspace(0, TONE_DURATION, int(SAMPLE_RATE * TONE_DURATION), False) + audio_signals.append(np.sin(freq * t * 2 * np.pi)) + full_audio = np.concatenate(audio_signals).astype(np.float32) + if np.max(np.abs(full_audio)) > 0: + full_audio = full_audio / np.max(np.abs(full_audio)) + return full_audio.tobytes() + + def play_and_decode(self, audio_bytes, sender_uin): + audio_data = np.frombuffer(audio_bytes, dtype=np.float32) + sd.play(audio_data, SAMPLE_RATE) + time.sleep(0.3) + samples_per_tone = int(SAMPLE_RATE * TONE_DURATION) + current_sample = int(SAMPLE_RATE * 0.3) + + sender_name = self.username if sender_uin == self.uin else f"UIN {sender_uin}" + msg_buffer = f"[{sender_name}]: " + + if sender_uin not in self.history: + self.history[sender_uin] = [] + self.history[sender_uin].append(msg_buffer) + self.refresh_ui() + + full_text_received = "" + while current_sample < len(audio_data) and self.loop_running: + chunk = audio_data[current_sample : current_sample + samples_per_tone] + if len(chunk) < samples_per_tone: + break + window = np.hanning(len(chunk)) + fft_data = np.abs(np.fft.rfft(chunk * window)) + frequencies = np.fft.rfftfreq(len(chunk), d=1/SAMPLE_RATE) + detected_freq = frequencies[np.argmax(fft_data)] + ascii_code = int(round((detected_freq - BASE_FREQ) / FREQ_STEP)) + if 0 <= ascii_code < 65535: + try: + char = chr(ascii_code) + self.history[sender_uin][-1] += char + full_text_received += char + self.refresh_ui() + except: + pass + time.sleep(TONE_DURATION) + current_sample += samples_per_tone + + if "ALARM! URGENT CALL!" in full_text_received: + client.contacts[sender_uin]["attention"] = True + self.refresh_ui() + + sd.wait() + self.save_config() + + def add_to_history(self, target_uin, line): + if target_uin not in self.history: + self.history[target_uin] = [] + self.history[target_uin].append(line) + self.refresh_ui() + self.save_config() + + def refresh_ui(self): + if self.app: + self.app.invalidate() + +client = TyClient() + +@sio.event +def incoming_packet(data): + from_uin = data["from_uin"] + payload_base64 = data["payload"] + try: + audio_bytes = base64.b64decode(payload_base64.encode('utf-8')) + if from_uin not in client.contacts: + client.contacts[from_uin] = {"status": "offline", "unread": 0, "attention": False} + if client.active_chat != from_uin: + client.contacts[from_uin]["unread"] += 1 + threading.Thread(target=client.play_and_decode, args=(audio_bytes, from_uin), daemon=True).start() + except: + pass + +@sio.event +def online_statuses_response(data): + for uin, status in data.items(): + if uin in client.contacts: + client.contacts[uin]["status"] = status + client.refresh_ui() + +@sio.event +def error(data): + msg = data.get("message") + target = data.get("target_uin") + if msg == "offline" and target: + if target in client.contacts: + client.contacts[target]["status"] = "offline" + client.add_to_history(target, f"[SYSTEM]: They went offline. Everything that you will send now will be sent if they will back online while your client opened.") + if target in client.preserved: + if len(client.preserved[target]) > 0: + last_p = client.preserved[target][-1] + client.add_to_history(target, f"[You]: {last_p} (Preserved)") + +def status_checker_thread(): + while client.loop_running: + if sio.connected and client.contacts: + sio.emit("check_online_statuses", list(client.contacts.keys())) + for uin in list(client.contacts.keys()): + if client.contacts[uin]["status"] == "online" and client.preserved.get(uin): + while client.preserved[uin]: + p_text = client.preserved[uin].pop(0) + audio_b = client.text_to_audio(p_text) + p_b64 = base64.b64encode(audio_b).decode('utf-8') + sio.emit("relay_packet", {"to_uin": uin, "payload": p_b64}) + client.add_to_history(uin, f"[You]: {p_text}") + client.save_config() + time.sleep(10) + +def make_layout(): + def get_sidebar_text(): + tokens = [] + for idx, (uin, info) in enumerate(client.contacts.items()): + status_str = "" + if info.get("attention"): + status_str = " [!]" + elif info.get("unread", 0) > 0: + status_str = f" ({info['unread']})" + elif info.get("status") == "online": + status_str = " *" + + content = f" {uin}{status_str}" + + if client.active_chat == uin: + style = "class:contact-active" + elif idx == client.selected_contact_idx and get_app().layout.has_focus(sidebar_window): + style = "class:contact-focused" + else: + style = "class:contact" + + tokens.extend([ + (style, f"{content}\n"), + ]) + return tokens + + def get_main_text(): + if not client.active_chat: + tokens = [("", "\n" * 2)] + for line in ASCII_ART.split("\n"): + tokens.append(("class:ascii", line + "\n")) + tokens.extend([ + ("", "\n"), + ("class:desc", "Messenger on custom protocol named\n"), + ("class:desc", "AcoustiOverSocket inspired by rtty\n"), + ("", "\n"), + ("class:help-tip", "Type /help to open project Wiki\n"), + ("", "\n" * 2), + ("class:help-tip", f"{DEV_SIGNATURE}\n") + ]) + return tokens + + tokens = [] + lines = client.history.get(client.active_chat, []) + for line in lines: + if line.startswith("[You]:") and "(Preserved)" in line: + tokens.append(("class:preserved", line + "\n")) + elif line.startswith("[SYSTEM]:"): + tokens.append(("class:system", line + "\n")) + else: + tokens.append(("", line + "\n")) + return tokens + + sidebar_control = FormattedTextControl(get_sidebar_text, focusable=True) + sidebar_window = Frame(Window(content=sidebar_control, width=25), title="chats", style="class:border") + + main_control = FormattedTextControl(get_main_text) + + def get_main_title(): + if client.active_chat: + return f"TyChat | You: {client.username} ({client.uin}) | Chat with UIN: {client.active_chat}" + return f"TyChat | You: {client.username} ({client.uin})" + + main_window = Frame(Window(content=main_control), title=get_main_title, style="class:border") + + input_field = TextArea( + height=3, + prompt="> ", + multiline=False, + wrap_lines=True + ) + + input_window = Frame(input_field, title="Type message and press Enter (/exit to quit)") + + def accept_handler(buff): + text = input_field.text.strip() + if not text: + return + + if text.lower() == "/exit": + client.loop_running = False + sio.disconnect() + get_app().exit() + return + + if text.lower() == "/help": + try: + webbrowser.open(WIKI_URL) + if client.active_chat: + client.add_to_history(client.active_chat, "[SYSTEM]: Wiki link opened in browser.") + except: + if client.active_chat: + client.add_to_history(client.active_chat, f"[SYSTEM]: Failed to open browser. Wiki: {WIKI_URL}") + input_field.text = "" + client.refresh_ui() + return + + if text.lower().startswith("/add "): + new_uin = text.split(" ", 1)[1].strip() + if new_uin and new_uin not in client.contacts: + client.contacts[new_uin] = {"status": "offline", "unread": 0, "attention": False} + client.history[new_uin] = [] + client.save_config() + input_field.text = "" + client.refresh_ui() + return + + if client.active_chat: + if len(text) > 100: + client.add_to_history(client.active_chat, f"[SYSTEM]: Message too long!") + input_field.text = "" + return + + match = re.match(r'^s/([^/]+)/([^/]*)/?$', text) + if match: + search_str, replace_str = match.groups() + lines = client.history.get(client.active_chat, []) + edited = False + for i in range(len(lines) - 1, -1, -1): + if lines[i].startswith(f"[{client.username}]:") or lines[i].startswith("[You]:"): + if search_str in lines[i]: + lines[i] = lines[i].replace(search_str, replace_str) + edited = True + break + if edited: + client.refresh_ui() + client.save_config() + input_field.text = "" + return + + if text.startswith("/alert"): + text = "ALARM! URGENT CALL!" + client.contacts[client.active_chat]["attention"] = True + + audio_b = client.text_to_audio(text) + p_b64 = base64.b64encode(audio_b).decode('utf-8') + + if client.contacts[client.active_chat]["status"] == "online": + sio.emit("relay_packet", {"to_uin": client.active_chat, "payload": p_b64}) + client.add_to_history(client.active_chat, f"[You]: {text}") + else: + if client.active_chat not in client.preserved: + client.preserved[client.active_chat] = [] + client.preserved[client.active_chat].append(text) + client.add_to_history(client.active_chat, f"[You]: {text} (Preserved)") + client.save_config() + + input_field.text = "" + + input_field.accept_handler = accept_handler + + right_side = HSplit([ + main_window, + input_window + ]) + + root_container = VSplit([ + sidebar_window, + right_side + ]) + + kb = KeyBindings() + + @kb.add('tab') + def _(event): + if event.app.layout.has_focus(input_field): + event.app.layout.focus(sidebar_window) + else: + event.app.layout.focus(input_field) + + @kb.add('up', filter=has_focus(sidebar_window)) + def _(event): + if client.contacts: + client.selected_contact_idx = (client.selected_contact_idx - 1) % len(client.contacts) + target_uin = list(client.contacts.keys())[client.selected_contact_idx] + client.active_chat = target_uin + client.contacts[target_uin]["unread"] = 0 + client.contacts[target_uin]["attention"] = False + client.refresh_ui() + + @kb.add('down', filter=has_focus(sidebar_window)) + def _(event): + if client.contacts: + client.selected_contact_idx = (client.selected_contact_idx + 1) % len(client.contacts) + target_uin = list(client.contacts.keys())[client.selected_contact_idx] + client.active_chat = target_uin + client.contacts[target_uin]["unread"] = 0 + client.contacts[target_uin]["attention"] = False + client.refresh_ui() + + @kb.add('c-c') + def _(event): + client.loop_running = False + sio.disconnect() + event.app.exit() + + return Layout(root_container, focused_element=input_field), kb + +ui_style = Style.from_dict({ + 'contact': '#ffffff', + 'contact-focused': '#00aaaa bold', + 'contact-active': '#00ff00 bold', + 'ascii': '#00ff00 bold', + 'desc': '#00ff00', + 'help-tip': '#00ffff italic', + 'preserved': '#ffff00', + 'system': '#ff0000 bold', + 'border': '#00ff00', + 'frame.border': '#00ff00', +}) + +def main(): + if os.name == 'nt': + import ctypes + try: + kernel32 = ctypes.windll.kernel32 + kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7) + except: + pass + + if not client.load_config(): + print("!" * 60) + print("WARNING:") + print("You will receive new messages only when you are online.") + print("All history is stored locally in settings.json.") + print("Anyone who knows your UIN can message you.") + print("This software uses loud sounds, adjust your client volume beforehand!") + print("!" * 60 + "\n") + + client.server_url = input("Enter server URL (e.g., http://localhost:5000): ").strip() + if not client.server_url.startswith("http://") and not client.server_url.startswith("https://"): + client.server_url = "http://" + client.server_url + + try: + sio.connect(client.server_url, transports=['websocket']) + except Exception as e: + print(f"Failed to connect to server: {e}") + return + + print("\n1. Register\n2. Login") + mode = input("> ") + username_or_uin = input("Enter Username (for reg) or UIN (for login): ").strip() + password = input("Enter password: ").strip() + + event_wait = threading.Event() + + @sio.event + def register_response(data): + if data["status"] == "success": + client.uin = data['uin'] + client.username = username_or_uin + client.password = password + print(f"\nSuccess! Your UIN: {client.uin}") + event_wait.set() + + @sio.event + def login_response(data): + if data["status"] == "success": + client.uin = username_or_uin + client.username = data["username"] + client.password = password + print(f"\nHello, {client.username}! Logged in successfully.") + event_wait.set() + + if mode == "1": + sio.emit("register", {"username": username_or_uin, "password": password}) + event_wait.wait() + else: + sio.emit("login", {"uin": username_or_uin, "password": password}) + event_wait.wait() + + if not client.uin: + print("Auth error.") + return + + client.save_config() + print("\nYou can change configurations inside settings.json.") + input("Press Enter to open TUI...") + else: + try: + sio.connect(client.server_url, transports=['websocket']) + sio.emit("login", {"uin": client.uin, "password": client.password}) + except: + pass + + threading.Thread(target=status_checker_thread, daemon=True).start() + + layout, bindings = make_layout() + + client.app = Application( + layout=layout, + key_bindings=bindings, + style=ui_style, + full_screen=True, + enable_page_navigation_bindings=True + ) + + client.app.run() + +if __name__ == "__main__": + main()