This repository has been archived on 2026-05-21. You can view files and clone it, but cannot push or open issues or pull requests.
TyChat-TUI/client.py
2026-05-19 00:01:34 +03:00

549 lines
20 KiB
Python

import sys
import subprocess
# Third-party dependencies, keyed by the requirement string handed to pip;
# each value is the module name imported to test whether it is installed.
REQUIRED_PACKAGES = {
    "python-socketio[client]": "socketio",
    "websocket-client": "websocket",
    "numpy": "numpy",
    "sounddevice": "sounddevice",
    "prompt_toolkit": "prompt_toolkit",
}
def auto_install_deps():
    """Install any missing third-party dependencies with pip.

    Every module named in ``REQUIRED_PACKAGES`` is probed by importing it;
    the pip names of the ones that fail are installed with a single
    ``pip install`` run under the current interpreter.

    Returns normally when nothing is missing or the install succeeds.
    When pip fails the process exits: with status 0 after a successful
    ``--break-system-packages`` retry (the user is told to restart the
    script), with status 1 in every other case.
    """
    missing_packages = []
    for pip_name, import_name in REQUIRED_PACKAGES.items():
        try:
            __import__(import_name)
        except ImportError:
            missing_packages.append(pip_name)
    if not missing_packages:
        return

    print("[Auto-Installer] Installing client dependencies:", ", ".join(missing_packages))
    cmd = [sys.executable, "-m", "pip", "install"] + missing_packages
    # stderr is captured so the failure reason can be inspected below.
    result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    if result.returncode == 0:
        return

    stderr_output = result.stderr.decode('utf-8', errors='ignore')
    if "externally-managed-environment" in stderr_output.lower():
        print("\n" + "!"*50)
        print("[Warning] OS blocks global package installation via pip.")
        print("!"*50 + "\n")
        choice = input("Use --break-system-packages flag? (y/n): ").strip().lower()
        if choice == 'y':
            force_cmd = cmd + ["--break-system-packages"]
            # Output is not captured here so the user sees pip's progress.
            force_result = subprocess.run(force_cmd)
            if force_result.returncode == 0:
                print("[Success] Restart the script!")
                sys.exit(0)
    else:
        # Any other failure used to exit silently; show what pip reported.
        print("[Auto-Installer] pip could not install the dependencies:", file=sys.stderr)
        print(stderr_output.strip(), file=sys.stderr)
    sys.exit(1)
# Runs before the third-party imports below so that missing packages are
# installed (or the script exits) before those imports are attempted.
auto_install_deps()
import os
import json
import time
import base64
import re
import threading
import webbrowser
import numpy as np
import sounddevice as sd
import socketio
from prompt_toolkit.application import Application
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.containers import HSplit, VSplit, Window
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.layout.layout import Layout
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.styles import Style
from prompt_toolkit.application.current import get_app
from prompt_toolkit.filters import has_focus
# Tone encoding: a character is sent at BASE_FREQ + ord(char) * FREQ_STEP (Hz).
BASE_FREQ = 600
FREQ_STEP = 25
# Length of one character tone, in seconds.
TONE_DURATION = 0.1
# Sample rate used both to synthesise tones and to play received audio.
SAMPLE_RATE = 44100
# JSON file holding credentials, contacts, history and preserved messages.
CONFIG_FILE = "settings.json"
# Page opened by the /help command.
WIKI_URL = "https://git.idkmail.ru/lohrrrr/TyChat-Client/wiki"
# Last line of the welcome screen.
DEV_SIGNATURE = "For you, from IDKMail Dev Group"
# Banner rendered line by line on the welcome screen when no chat is open.
# NOTE(review): the spacing inside the art looks collapsed — compare with the
# upstream file before relying on how it renders.
ASCII_ART = r"""
_______ _____ _ _
|__ __| / ____| | | |
| |_ _| | | |__ __ _| |_
| | | | | | | '_ \ / _` | __|
| | |_| | |____| | | | (_| | |_
|_|\__, |\_____|_| |_|\__,_|\__|
__/ |
|___/
"""
# Module-wide Socket.IO client; the handlers in this file register on it
# with @sio.event and all outgoing packets are emitted through it.
sio = socketio.Client()
class TyClient:
    """Client state plus the tone encoding/decoding of chat messages.

    Holds the account data, the contact list, per-contact chat history and
    the queue of "preserved" (not yet delivered) messages, and persists all
    of it to ``CONFIG_FILE``.
    """

    def __init__(self):
        self.server_url = ""
        self.username = ""
        self.uin = ""
        self.password = ""
        # uin -> {"status": ..., "unread": ..., "attention": ...}
        self.contacts = {}
        # uin -> list of rendered chat lines
        self.history = {}
        # uin -> list of message texts waiting to be sent
        self.preserved = {}
        self.active_chat = None
        # prompt_toolkit Application, set by main() once the TUI is built
        self.app = None
        # Cleared on exit so background loops stop
        self.loop_running = True
        self.selected_contact_idx = 0

    def load_config(self):
        """Load saved state from ``CONFIG_FILE``.

        Returns:
            True when the file exists and holds a JSON object; False when it
            is missing, unreadable, not valid JSON or not an object.
        """
        if not os.path.exists(CONFIG_FILE):
            return False
        try:
            with open(CONFIG_FILE, "r", encoding="utf-8") as f:
                cfg = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both JSON syntax errors and bad UTF-8.
            return False
        if not isinstance(cfg, dict):
            return False
        self.server_url = cfg.get("server_url", "")
        self.username = cfg.get("username", "")
        self.uin = cfg.get("uin", "")
        self.password = cfg.get("password", "")
        self.contacts = cfg.get("contacts", {})
        self.history = cfg.get("history", {})
        self.preserved = cfg.get("preserved", {})
        return True

    def save_config(self):
        """Write the current state to ``CONFIG_FILE`` as indented JSON."""
        cfg = {
            "server_url": self.server_url,
            "username": self.username,
            "uin": self.uin,
            "password": self.password,
            "contacts": self.contacts,
            "history": self.history,
            "preserved": self.preserved
        }
        with open(CONFIG_FILE, "w", encoding="utf-8") as f:
            json.dump(cfg, f, indent=4, ensure_ascii=False)

    def text_to_audio(self, text):
        """Encode ``text`` as a sequence of sine tones.

        The signal starts with a 0.3 s lead-in at 1000 Hz, followed by one
        ``TONE_DURATION`` tone per character at
        ``BASE_FREQ + ord(char) * FREQ_STEP`` Hz.

        Returns:
            bytes: the peak-normalised float32 samples.
        """
        lead_in_t = np.linspace(0, 0.3, int(SAMPLE_RATE * 0.3), False)
        audio_signals = [np.sin(1000 * lead_in_t * 2 * np.pi)]
        # The time axis is the same for every character tone.
        t = np.linspace(0, TONE_DURATION, int(SAMPLE_RATE * TONE_DURATION), False)
        for char in text:
            freq = BASE_FREQ + ord(char) * FREQ_STEP
            audio_signals.append(np.sin(freq * t * 2 * np.pi))
        full_audio = np.concatenate(audio_signals).astype(np.float32)
        peak = np.max(np.abs(full_audio))
        if peak > 0:
            full_audio = full_audio / peak
        return full_audio.tobytes()

    def play_and_decode(self, audio_bytes, sender_uin):
        """Play received audio and decode it, tone by tone, into the history.

        A new ``[sender]: `` line is appended to the sender's history and
        extended one character at a time while the audio plays. When the
        decoded text contains the alarm phrase the sender's contact is
        flagged for attention. State is saved once playback has finished.

        Args:
            audio_bytes: raw float32 samples as produced by ``text_to_audio``.
            sender_uin: UIN of the sender, used as the history key.
        """
        # Drop trailing bytes that do not form a whole float32 sample so a
        # truncated payload is decoded as far as possible instead of raising.
        usable = len(audio_bytes) - (len(audio_bytes) % 4)
        audio_data = np.frombuffer(audio_bytes[:usable], dtype=np.float32)
        sd.play(audio_data, SAMPLE_RATE)
        # Skip the 0.3 s lead-in, both in time and in samples.
        time.sleep(0.3)
        samples_per_tone = int(SAMPLE_RATE * TONE_DURATION)
        current_sample = int(SAMPLE_RATE * 0.3)

        sender_name = self.username if sender_uin == self.uin else f"UIN {sender_uin}"
        lines = self.history.setdefault(sender_uin, [])
        lines.append(f"[{sender_name}]: ")
        # Remember which line is ours: other lines may be appended to this
        # chat while decoding is still in progress.
        line_idx = len(lines) - 1
        self.refresh_ui()

        # Every full chunk has the same length, so these are loop invariants.
        window = np.hanning(samples_per_tone)
        frequencies = np.fft.rfftfreq(samples_per_tone, d=1/SAMPLE_RATE)

        full_text_received = ""
        while current_sample < len(audio_data) and self.loop_running:
            chunk = audio_data[current_sample : current_sample + samples_per_tone]
            if len(chunk) < samples_per_tone:
                break
            fft_data = np.abs(np.fft.rfft(chunk * window))
            detected_freq = frequencies[np.argmax(fft_data)]
            ascii_code = int(round((detected_freq - BASE_FREQ) / FREQ_STEP))
            if 0 <= ascii_code < 65535:
                char = chr(ascii_code)
                lines[line_idx] += char
                full_text_received += char
                self.refresh_ui()
            # One tone length per chunk, presumably so the text appears in
            # step with the audio being played.
            time.sleep(TONE_DURATION)
            current_sample += samples_per_tone

        if "ALARM! URGENT CALL!" in full_text_received:
            contact = self.contacts.setdefault(
                sender_uin, {"status": "offline", "unread": 0, "attention": False}
            )
            contact["attention"] = True
            self.refresh_ui()
        sd.wait()
        self.save_config()

    def add_to_history(self, target_uin, line):
        """Append ``line`` to ``target_uin``'s history, redraw and save."""
        if target_uin not in self.history:
            self.history[target_uin] = []
        self.history[target_uin].append(line)
        self.refresh_ui()
        self.save_config()

    def refresh_ui(self):
        """Ask the TUI to redraw; does nothing before the app exists."""
        if self.app:
            self.app.invalidate()
# Single shared client state, used by the socket handlers, the status
# checker thread and the TUI callbacks in this file.
client = TyClient()
@sio.event
def incoming_packet(data):
    """Handle a relayed message: decode the payload and start playback.

    ``data`` is read for the keys ``"from_uin"`` and ``"payload"`` (base64
    text). Unknown senders are added to the contact list, the unread counter
    is bumped unless that chat is currently open, and decoding runs on a
    daemon thread so the socket handler returns immediately.

    A payload that cannot be base64-decoded is dropped silently.
    """
    from_uin = data["from_uin"]
    payload_base64 = data["payload"]
    try:
        audio_bytes = base64.b64decode(payload_base64.encode('utf-8'))
    except (ValueError, AttributeError):
        # binascii.Error is a ValueError; AttributeError covers a payload
        # that is not a string. Only the decode step is best-effort.
        return

    contact = client.contacts.setdefault(
        from_uin, {"status": "offline", "unread": 0, "attention": False}
    )
    if client.active_chat != from_uin:
        # Tolerate contact entries that lack the counter.
        contact["unread"] = contact.get("unread", 0) + 1
    threading.Thread(target=client.play_and_decode, args=(audio_bytes, from_uin), daemon=True).start()
@sio.event
def online_statuses_response(data):
    """Copy the reported status of every known contact, then redraw.

    ``data`` maps UINs to status values; UINs that are not in the contact
    list are ignored.
    """
    for contact_uin, reported_status in data.items():
        if contact_uin not in client.contacts:
            continue
        client.contacts[contact_uin]["status"] = reported_status
    client.refresh_ui()
@sio.event
def error(data):
    """React to an ``offline`` error reported for a target UIN.

    Marks the contact offline (when known), writes a system notice into that
    chat and, when messages are queued for the contact, echoes the most
    recent one as a preserved line. Any other error payload is ignored.
    """
    reason = data.get("message")
    offline_uin = data.get("target_uin")
    if reason != "offline" or not offline_uin:
        return

    if offline_uin in client.contacts:
        client.contacts[offline_uin]["status"] = "offline"
    client.add_to_history(
        offline_uin,
        "[SYSTEM]: They went offline. Everything that you will send now will be sent if they will back online while your client opened.",
    )
    if offline_uin in client.preserved and len(client.preserved[offline_uin]) > 0:
        newest_queued = client.preserved[offline_uin][-1]
        client.add_to_history(offline_uin, f"[You]: {newest_queued} (Preserved)")
def status_checker_thread():
    """Poll contact statuses and flush preserved messages every 10 seconds.

    While ``client.loop_running`` is set and the socket is connected, each
    pass asks the server for the status of all contacts and then, for every
    contact currently marked online, sends its queued messages in order.

    A message is removed from the queue only after it has been emitted, so
    a connection drop mid-pass neither loses it nor stops this thread; the
    remaining messages are retried on the next pass.
    """
    while client.loop_running:
        if sio.connected and client.contacts:
            try:
                sio.emit("check_online_statuses", list(client.contacts.keys()))
                for uin in list(client.contacts.keys()):
                    contact = client.contacts.get(uin)
                    queue = client.preserved.get(uin)
                    if not contact or contact.get("status") != "online" or not queue:
                        continue
                    while queue:
                        p_text = queue[0]
                        audio_b = client.text_to_audio(p_text)
                        p_b64 = base64.b64encode(audio_b).decode('utf-8')
                        sio.emit("relay_packet", {"to_uin": uin, "payload": p_b64})
                        # Dequeue only once the emit went through.
                        queue.pop(0)
                        client.add_to_history(uin, f"[You]: {p_text}")
                    client.save_config()
            except socketio.exceptions.SocketIOError:
                # Connection dropped during the pass; try again next cycle.
                pass
        time.sleep(10)
def make_layout():
    """Build the prompt_toolkit layout and key bindings for the TUI.

    The screen is a contact sidebar on the left and, on the right, the chat
    (or welcome) pane above a single-line input field.

    Returns:
        tuple: ``(Layout, KeyBindings)``; the input field has initial focus.
    """

    def get_sidebar_text():
        """Render one styled line per contact with its status marker."""
        tokens = []
        # Snapshot the items: socket threads may add contacts mid-render.
        for idx, (uin, info) in enumerate(list(client.contacts.items())):
            if info.get("attention"):
                status_str = " [!]"
            elif info.get("unread", 0) > 0:
                status_str = f" ({info['unread']})"
            elif info.get("status") == "online":
                status_str = " *"
            else:
                status_str = ""
            if client.active_chat == uin:
                style = "class:contact-active"
            elif idx == client.selected_contact_idx and get_app().layout.has_focus(sidebar_window):
                style = "class:contact-focused"
            else:
                style = "class:contact"
            tokens.append((style, f" {uin}{status_str}\n"))
        return tokens

    def get_main_text():
        """Render the welcome screen, or the history of the open chat."""
        if not client.active_chat:
            tokens = [("", "\n" * 2)]
            for line in ASCII_ART.split("\n"):
                tokens.append(("class:ascii", line + "\n"))
            tokens.extend([
                ("", "\n"),
                ("class:desc", "Messenger on custom protocol named\n"),
                ("class:desc", "AcoustiOverSocket inspired by rtty\n"),
                ("", "\n"),
                ("class:help-tip", "Type /help to open project Wiki\n"),
                ("", "\n" * 2),
                ("class:help-tip", f"{DEV_SIGNATURE}\n")
            ])
            return tokens
        tokens = []
        for line in client.history.get(client.active_chat, []):
            if line.startswith("[You]:") and "(Preserved)" in line:
                tokens.append(("class:preserved", line + "\n"))
            elif line.startswith("[SYSTEM]:"):
                tokens.append(("class:system", line + "\n"))
            else:
                tokens.append(("", line + "\n"))
        return tokens

    def get_main_title():
        """Title of the main frame: own identity plus the open chat, if any."""
        if client.active_chat:
            return f"TyChat | You: {client.username} ({client.uin}) | Chat with UIN: {client.active_chat}"
        return f"TyChat | You: {client.username} ({client.uin})"

    sidebar_control = FormattedTextControl(get_sidebar_text, focusable=True)
    sidebar_window = Frame(Window(content=sidebar_control, width=25), title="chats", style="class:border")
    main_control = FormattedTextControl(get_main_text)
    main_window = Frame(Window(content=main_control), title=get_main_title, style="class:border")
    input_field = TextArea(
        height=3,
        prompt="> ",
        multiline=False,
        wrap_lines=True
    )
    input_window = Frame(input_field, title="Type message and press Enter (/exit to quit)")

    def _edit_last_own_message(search_str, replace_str):
        """Apply an ``s/old/new/`` edit to the newest own line containing ``old``.

        Only the text after the ``[name]:`` marker is searched and replaced,
        so an edit can never rewrite the marker that identifies the line as
        the user's own.
        """
        lines = client.history.get(client.active_chat, [])
        own_markers = (f"[{client.username}]:", "[You]:")
        for i in range(len(lines) - 1, -1, -1):
            marker = next((m for m in own_markers if lines[i].startswith(m)), None)
            if marker is None:
                continue
            body = lines[i][len(marker):]
            if search_str in body:
                lines[i] = marker + body.replace(search_str, replace_str)
                client.refresh_ui()
                client.save_config()
                return

    def _send_or_preserve(text):
        """Send ``text`` to the open chat, or queue it when it cannot be sent."""
        chat = client.active_chat
        if text.startswith("/alert"):
            text = "ALARM! URGENT CALL!"
            client.contacts[chat]["attention"] = True
        sent = False
        if client.contacts[chat]["status"] == "online":
            audio_b = client.text_to_audio(text)
            p_b64 = base64.b64encode(audio_b).decode('utf-8')
            try:
                sio.emit("relay_packet", {"to_uin": chat, "payload": p_b64})
                sent = True
            except socketio.exceptions.SocketIOError:
                # Stale "online" status with a dead connection: queue the
                # message instead of letting the error escape the handler.
                sent = False
        if sent:
            client.add_to_history(chat, f"[You]: {text}")
        else:
            client.preserved.setdefault(chat, []).append(text)
            client.add_to_history(chat, f"[You]: {text} (Preserved)")
        client.save_config()

    def accept_handler(buff):
        """Process one submitted input line (command, edit or message)."""
        text = input_field.text.strip()
        if not text:
            return
        command = text.lower()
        if command == "/exit":
            client.loop_running = False
            sio.disconnect()
            get_app().exit()
            return
        if command == "/help":
            # webbrowser.open reports failure through its return value, not
            # only through exceptions.
            try:
                opened = webbrowser.open(WIKI_URL)
            except webbrowser.Error:
                opened = False
            if client.active_chat:
                if opened:
                    client.add_to_history(client.active_chat, "[SYSTEM]: Wiki link opened in browser.")
                else:
                    client.add_to_history(client.active_chat, f"[SYSTEM]: Failed to open browser. Wiki: {WIKI_URL}")
            input_field.text = ""
            client.refresh_ui()
            return
        if command.startswith("/add "):
            new_uin = text.split(" ", 1)[1].strip()
            if new_uin and new_uin not in client.contacts:
                client.contacts[new_uin] = {"status": "offline", "unread": 0, "attention": False}
                client.history[new_uin] = []
                client.save_config()
            input_field.text = ""
            client.refresh_ui()
            return
        if client.active_chat:
            if len(text) > 100:
                client.add_to_history(client.active_chat, "[SYSTEM]: Message too long!")
                input_field.text = ""
                return
            match = re.match(r'^s/([^/]+)/([^/]*)/?$', text)
            if match:
                _edit_last_own_message(*match.groups())
                input_field.text = ""
                return
            _send_or_preserve(text)
        input_field.text = ""

    input_field.accept_handler = accept_handler
    right_side = HSplit([
        main_window,
        input_window
    ])
    root_container = VSplit([
        sidebar_window,
        right_side
    ])
    kb = KeyBindings()

    def _move_selection(step):
        """Move the sidebar selection by ``step`` (wrapping) and open that chat."""
        if not client.contacts:
            return
        uins = list(client.contacts.keys())
        client.selected_contact_idx = (client.selected_contact_idx + step) % len(uins)
        target_uin = uins[client.selected_contact_idx]
        client.active_chat = target_uin
        # Opening a chat clears its notification markers.
        client.contacts[target_uin]["unread"] = 0
        client.contacts[target_uin]["attention"] = False
        client.refresh_ui()

    @kb.add('tab')
    def _(event):
        # Tab toggles focus between the input field and the sidebar.
        if event.app.layout.has_focus(input_field):
            event.app.layout.focus(sidebar_window)
        else:
            event.app.layout.focus(input_field)

    @kb.add('up', filter=has_focus(sidebar_window))
    def _(event):
        _move_selection(-1)

    @kb.add('down', filter=has_focus(sidebar_window))
    def _(event):
        _move_selection(1)

    @kb.add('c-c')
    def _(event):
        client.loop_running = False
        sio.disconnect()
        event.app.exit()

    return Layout(root_container, focused_element=input_field), kb
# Colours for the style classes used by the layout's text fragments and
# frames; the entry order is kept as declared.
_UI_STYLE_RULES = {
    'contact': '#ffffff',
    'contact-focused': '#00aaaa bold',
    'contact-active': '#00ff00 bold',
    'ascii': '#00ff00 bold',
    'desc': '#00ff00',
    'help-tip': '#00ffff italic',
    'preserved': '#ffff00',
    'system': '#ff0000 bold',
    'border': '#00ff00',
    'frame.border': '#00ff00',
}
ui_style = Style.from_dict(_UI_STYLE_RULES)
# How long the first-run flow waits for the server's register/login reply.
_AUTH_TIMEOUT_SECONDS = 15


def _first_run_setup():
    """Run the interactive first-run flow: connect, then register or log in.

    Prompts for the server URL and credentials, waits for the server's
    answer and saves the configuration on success.

    Returns:
        True when authentication succeeded; False when the connection or
        the authentication failed (the reason has been printed).
    """
    print("!" * 60)
    print("WARNING:")
    print("You will receive new messages only when you are online.")
    print("All history is stored locally in settings.json.")
    print("Anyone who knows your UIN can message you.")
    print("This software uses loud sounds, adjust your client volume beforehand!")
    print("!" * 60 + "\n")
    client.server_url = input("Enter server URL (e.g., http://localhost:5000): ").strip()
    if not client.server_url.startswith(("http://", "https://")):
        client.server_url = "http://" + client.server_url
    try:
        sio.connect(client.server_url, transports=['websocket'])
    except Exception as e:
        print(f"Failed to connect to server: {e}")
        return False

    print("\n1. Register\n2. Login")
    mode = input("> ").strip()
    username_or_uin = input("Enter Username (for reg) or UIN (for login): ").strip()
    password = input("Enter password: ").strip()
    event_wait = threading.Event()

    @sio.event
    def register_response(data):
        # The event is always set, even if the payload is malformed, so the
        # waiting main thread is never left hanging.
        try:
            if data.get("status") == "success":
                new_uin = data['uin']
                client.username = username_or_uin
                client.password = password
                client.uin = new_uin
                print(f"\nSuccess! Your UIN: {client.uin}")
        finally:
            event_wait.set()

    @sio.event
    def login_response(data):
        try:
            if data.get("status") == "success":
                server_username = data["username"]
                client.username = server_username
                client.password = password
                client.uin = username_or_uin
                print(f"\nHello, {client.username}! Logged in successfully.")
        finally:
            event_wait.set()

    if mode == "1":
        sio.emit("register", {"username": username_or_uin, "password": password})
    else:
        sio.emit("login", {"uin": username_or_uin, "password": password})
    answered = event_wait.wait(timeout=_AUTH_TIMEOUT_SECONDS)

    if not client.uin:
        print("Auth error." if answered else "Auth error: server did not respond.")
        # Do not leave the socket open behind a failed start.
        sio.disconnect()
        return False

    client.save_config()
    print("\nYou can change configurations inside settings.json.")
    input("Press Enter to open TUI...")
    return True


def main():
    """Start the client: authenticate, launch the status thread, run the TUI.

    With a saved configuration the client reconnects and logs in on a
    best-effort basis, and opens the TUI even when the server is
    unreachable. Without one it runs the interactive first-run flow and
    returns early when that fails.
    """
    if os.name == 'nt':
        import ctypes
        try:
            kernel32 = ctypes.windll.kernel32
            # -11 is STD_OUTPUT_HANDLE; mode 7 turns on processed output,
            # wrap-at-EOL and virtual-terminal (ANSI escape) processing.
            kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
        except (AttributeError, OSError):
            # Purely cosmetic; carry on without it.
            pass

    if client.load_config():
        try:
            sio.connect(client.server_url, transports=['websocket'])
            sio.emit("login", {"uin": client.uin, "password": client.password})
        except Exception:
            # Deliberately best-effort: the TUI still opens so history can
            # be read and messages queued while the server is unreachable.
            pass
    elif not _first_run_setup():
        return

    threading.Thread(target=status_checker_thread, daemon=True).start()
    layout, bindings = make_layout()
    client.app = Application(
        layout=layout,
        key_bindings=bindings,
        style=ui_style,
        full_screen=True,
        enable_page_navigation_bindings=True
    )
    client.app.run()


if __name__ == "__main__":
    main()