Update client.py

This commit is contained in:
lohrrrr 2026-05-19 02:19:00 +03:00
parent 5196383bc3
commit cfd13cfad8

609
client.py
View file

@ -16,28 +16,18 @@ def auto_install_deps():
__import__(import_name)
except ImportError:
missing_packages.append(pip_name)
if not missing_packages:
return
print("[Auto-Installer] Installing client dependencies:", ", ".join(missing_packages))
cmd = [sys.executable, "-m", "pip", "install"] + missing_packages
result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
if result.returncode != 0:
stderr_output = result.stderr.decode('utf-8', errors='ignore')
if "externally-managed-environment" in stderr_output.lower():
print("\n" + "!"*50)
print("[Warning] OS blocks global package installation via pip.")
print("!"*50 + "\n")
choice = input("Use --break-system-packages flag? (y/n): ").strip().lower()
if choice == 'y':
force_cmd = cmd + ["--break-system-packages"]
force_result = subprocess.run(force_cmd)
if force_result.returncode == 0:
print("[Success] Restart the script!")
sys.exit(0)
sys.exit(1)
force_cmd = cmd + ["--break-system-packages"]
force_result = subprocess.run(force_cmd)
if force_result.returncode == 0:
sys.exit(0)
sys.exit(1)
auto_install_deps()
@ -54,13 +44,15 @@ import socketio
from prompt_toolkit.application import Application
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.containers import HSplit, VSplit, Window
from prompt_toolkit.layout.containers import HSplit, VSplit, Window, FloatContainer, Float, ConditionalContainer, WindowAlign
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.layout.layout import Layout
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.styles import Style
from prompt_toolkit.application.current import get_app
from prompt_toolkit.filters import has_focus
from prompt_toolkit.filters import has_focus, Condition
from prompt_toolkit.data_structures import Point
from prompt_toolkit.completion import Completer, Completion
BASE_FREQ = 600
FREQ_STEP = 25
@ -70,6 +62,11 @@ CONFIG_FILE = "settings.json"
WIKI_URL = "https://git.idkmail.ru/lohrrrr/TyChat-Client/wiki"
DEV_SIGNATURE = "For you, from IDKMail Dev Group"
FREQ_QUERY = 1200
FREQ_ALERT = 1400
FREQ_RESP_YES = 1600
FREQ_RESP_NO = 1800
ASCII_ART = r"""
_______ _____ _ _
|__ __| / ____| | | |
@ -83,6 +80,26 @@ ASCII_ART = r"""
sio = socketio.Client()
class CommandCompleter(Completer):
def __init__(self):
self.completions = {
"/add": "Добавить контакт (/add <uin>)",
"/busy": "Режим DND (Не беспокоить)",
"/query": "Мягкий запрос статуса (Уважает DND)",
"/alert": "Экстренный вызов (Пробивает DND)",
"/room create ": "Создать комнату (/room create \"имя\")",
"/room join ": "Войти в комнату (/room join UIN:\"имя\")",
"/exit": "Выход из мессенджера",
"/help": "Открыть Wiki проекта"
}
def get_completions(self, document, complete_event):
text = document.text
if text.startswith('/'):
for cmd, desc in self.completions.items():
if cmd.startswith(text):
yield Completion(cmd, start_position=-len(text), display_meta=desc)
class TyClient:
def __init__(self):
self.server_url = ""
@ -90,13 +107,21 @@ class TyClient:
self.uin = ""
self.password = ""
self.contacts = {}
self.groups = {}
self.history = {}
self.preserved = {}
self.blocklist = []
self.active_chat = None
self.app = None
self.loop_running = True
self.selected_contact_idx = 0
self.is_busy = False
self.dialing_uin = None
self.pending_requests = []
self.pending_windows = []
self.show_welcome_popup = False
def load_config(self):
if os.path.exists(CONFIG_FILE):
try:
@ -107,8 +132,10 @@ class TyClient:
self.uin = cfg.get("uin", "")
self.password = cfg.get("password", "")
self.contacts = cfg.get("contacts", {})
self.groups = cfg.get("groups", {})
self.history = cfg.get("history", {})
self.preserved = cfg.get("preserved", {})
self.blocklist = cfg.get("blocklist", [])
return True
except:
return False
@ -121,12 +148,35 @@ class TyClient:
"uin": self.uin,
"password": self.password,
"contacts": self.contacts,
"groups": self.groups,
"history": self.history,
"preserved": self.preserved
"preserved": self.preserved,
"blocklist": self.blocklist
}
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
def play_dial_tones(self, target_uin):
self.dialing_uin = target_uin
while self.dialing_uin == target_uin and self.loop_running:
t = np.linspace(0, 1.0, int(SAMPLE_RATE * 1.0), False)
tone = np.sin(425 * t * 2 * np.pi) * 0.5
sd.play(tone, SAMPLE_RATE)
sd.wait()
for _ in range(30):
if self.dialing_uin != target_uin or not self.loop_running:
break
time.sleep(0.1)
def generate_service_tone(self, frequency, duration=0.4):
t = np.linspace(0, duration, int(SAMPLE_RATE * duration), False)
tone = np.sin(frequency * t * 2 * np.pi) * 0.7
envelope = np.ones_like(tone)
fade_len = int(SAMPLE_RATE * 0.05)
envelope[:fade_len] = np.linspace(0, 1, fade_len)
envelope[-fade_len:] = np.linspace(1, 0, fade_len)
return (tone * envelope).astype(np.float32).tobytes()
def text_to_audio(self, text):
audio_signals = []
t_start = np.linspace(0, 0.3, int(SAMPLE_RATE * 0.3), False)
@ -140,6 +190,40 @@ class TyClient:
full_audio = full_audio / np.max(np.abs(full_audio))
return full_audio.tobytes()
def detect_service_tone(self, audio_data):
try:
window_data = np.hanning(len(audio_data))
fft_data = np.abs(np.fft.rfft(audio_data * window_data))
frequencies = np.fft.rfftfreq(len(audio_data), d=1/SAMPLE_RATE)
detected_freq = frequencies[np.argmax(fft_data)]
for target in [FREQ_QUERY, FREQ_ALERT, FREQ_RESP_YES, FREQ_RESP_NO]:
if abs(detected_freq - target) <= 15:
return target
return None
except:
return None
def fast_decode(self, audio_data):
try:
samples_per_tone = int(SAMPLE_RATE * TONE_DURATION)
current_sample = int(SAMPLE_RATE * 0.3)
text = ""
while current_sample < len(audio_data):
chunk = audio_data[current_sample : current_sample + samples_per_tone]
if len(chunk) < samples_per_tone:
break
window_data = np.hanning(len(chunk))
fft_data = np.abs(np.fft.rfft(chunk * window_data))
frequencies = np.fft.rfftfreq(len(chunk), d=1/SAMPLE_RATE)
detected_freq = frequencies[np.argmax(fft_data)]
ascii_code = int(round((detected_freq - BASE_FREQ) / FREQ_STEP))
if 0 <= ascii_code < 65535:
text += chr(ascii_code)
current_sample += samples_per_tone
return text
except:
return ""
def play_and_decode(self, audio_bytes, sender_uin):
audio_data = np.frombuffer(audio_bytes, dtype=np.float32)
sd.play(audio_data, SAMPLE_RATE)
@ -155,13 +239,12 @@ class TyClient:
self.history[sender_uin].append(msg_buffer)
self.refresh_ui()
full_text_received = ""
while current_sample < len(audio_data) and self.loop_running:
chunk = audio_data[current_sample : current_sample + samples_per_tone]
if len(chunk) < samples_per_tone:
break
window = np.hanning(len(chunk))
fft_data = np.abs(np.fft.rfft(chunk * window))
window_data = np.hanning(len(chunk))
fft_data = np.abs(np.fft.rfft(chunk * window_data))
frequencies = np.fft.rfftfreq(len(chunk), d=1/SAMPLE_RATE)
detected_freq = frequencies[np.argmax(fft_data)]
ascii_code = int(round((detected_freq - BASE_FREQ) / FREQ_STEP))
@ -169,17 +252,11 @@ class TyClient:
try:
char = chr(ascii_code)
self.history[sender_uin][-1] += char
full_text_received += char
self.refresh_ui()
except:
pass
time.sleep(TONE_DURATION)
current_sample += samples_per_tone
if "ALARM! URGENT CALL!" in full_text_received:
client.contacts[sender_uin]["attention"] = True
self.refresh_ui()
sd.wait()
self.save_config()
@ -190,6 +267,14 @@ class TyClient:
self.refresh_ui()
self.save_config()
def send_sys_packet(self, to_uin, cmd):
audio_b = self.text_to_audio(f"SYS:{cmd}")
sio.emit("relay_packet", {"to_uin": to_uin, "payload": base64.b64encode(audio_b).decode('utf-8')})
def send_service_tone(self, to_uin, freq):
audio_b = self.generate_service_tone(freq)
sio.emit("relay_packet", {"to_uin": to_uin, "payload": base64.b64encode(audio_b).decode('utf-8')})
def refresh_ui(self):
if self.app:
self.app.invalidate()
@ -202,12 +287,75 @@ def incoming_packet(data):
payload_base64 = data["payload"]
try:
audio_bytes = base64.b64decode(payload_base64.encode('utf-8'))
audio_data = np.frombuffer(audio_bytes, dtype=np.float32)
srv_tone = client.detect_service_tone(audio_data)
if srv_tone:
sd.play(audio_data, SAMPLE_RATE)
if from_uin not in client.contacts:
return
if srv_tone == FREQ_QUERY:
if client.is_busy:
client.send_service_tone(from_uin, FREQ_RESP_YES)
else:
client.pending_windows.append({"type": "query", "uin": from_uin})
client.refresh_ui()
return
elif srv_tone == FREQ_ALERT:
client.pending_windows.append({"type": "alert", "uin": from_uin})
client.refresh_ui()
return
elif srv_tone == FREQ_RESP_YES:
client.add_to_history(from_uin, f"[SYSTEM]: Quick status answer -> Yes, I'm busy")
return
elif srv_tone == FREQ_RESP_NO:
client.add_to_history(from_uin, f"[SYSTEM]: Quick status answer -> No, go on")
return
fast_text = client.fast_decode(audio_data)
if fast_text.startswith("SYS:"):
cmd = fast_text[4:]
if cmd.startswith("ROOM_MSG:"):
parts = cmd.split(":", 3)
if len(parts) >= 4:
room_id = parts[1]
sender_uin = parts[2]
actual_msg = parts[3]
if room_id in client.groups:
client.add_to_history(room_id, f"[UIN {sender_uin}]: {actual_msg}")
return
elif cmd == "REQ_ADD":
if from_uin in client.blocklist:
client.send_sys_packet(from_uin, "RES_DEC")
elif client.is_busy:
client.send_sys_packet(from_uin, "RES_BSY")
else:
if from_uin not in client.pending_requests and from_uin not in client.contacts:
client.pending_requests.append(from_uin)
client.refresh_ui()
return
elif cmd == "RES_ACC":
if client.dialing_uin == from_uin:
client.dialing_uin = None
if from_uin not in client.contacts:
client.contacts[from_uin] = {"status": "online", "unread": 0, "attention": False}
client.history[from_uin] = ["[SYSTEM]: Handshake accepted. Contact added."]
client.save_config()
client.active_chat = from_uin
client.refresh_ui()
return
elif cmd == "RES_DEC" or cmd == "RES_BSY":
if client.dialing_uin == from_uin:
client.dialing_uin = None
return
if from_uin not in client.contacts:
client.contacts[from_uin] = {"status": "offline", "unread": 0, "attention": False}
return
if client.active_chat != from_uin:
client.contacts[from_uin]["unread"] += 1
threading.Thread(target=client.play_and_decode, args=(audio_bytes, from_uin), daemon=True).start()
except:
except Exception as e:
pass
@sio.event
@ -222,13 +370,11 @@ def error(data):
msg = data.get("message")
target = data.get("target_uin")
if msg == "offline" and target:
if client.dialing_uin == target:
client.dialing_uin = None
if target in client.contacts:
client.contacts[target]["status"] = "offline"
client.add_to_history(target, f"[SYSTEM]: They went offline. Everything that you will send now will be sent if they will back online while your client opened.")
if target in client.preserved:
if len(client.preserved[target]) > 0:
last_p = client.preserved[target][-1]
client.add_to_history(target, f"[You]: {last_p} (Preserved)")
client.add_to_history(target, f"[SYSTEM]: They went offline. Outgoing stack will be preserved.")
def status_checker_thread():
while client.loop_running:
@ -250,25 +396,20 @@ def make_layout():
tokens = []
for idx, (uin, info) in enumerate(client.contacts.items()):
status_str = ""
if info.get("attention"):
status_str = " [!]"
elif info.get("unread", 0) > 0:
status_str = f" ({info['unread']})"
elif info.get("status") == "online":
status_str = " *"
if info.get("attention"): status_str = " [!]"
elif info.get("unread", 0) > 0: status_str = f" ({info['unread']})"
elif info.get("status") == "online": status_str = " *"
content = f" {uin}{status_str}"
if client.active_chat == uin: style = "class:contact-active"
elif idx == client.selected_contact_idx and get_app().layout.has_focus(sidebar_window): style = "class:contact-focused"
else: style = "class:contact"
tokens.extend([(style, f"{content}\n")])
if client.active_chat == uin:
style = "class:contact-active"
elif idx == client.selected_contact_idx and get_app().layout.has_focus(sidebar_window):
style = "class:contact-focused"
else:
style = "class:contact"
tokens.extend([
(style, f"{content}\n"),
])
for g_id, g_info in client.groups.items():
content = f" [G] {g_info['title']}"
if client.active_chat == g_id: style = "class:contact-active"
else: style = "class:contact"
tokens.extend([(style, f"{content}\n")])
return tokens
def get_main_text():
@ -281,10 +422,15 @@ def make_layout():
("class:desc", "Messenger on custom protocol named\n"),
("class:desc", "AcoustiOverSocket inspired by rtty\n"),
("", "\n"),
("class:help-tip", "Type /help to open project Wiki\n"),
("class:help-tip", "Type / to trigger interactive command menu\n"),
("", "\n" * 2),
("class:help-tip", f"{DEV_SIGNATURE}\n")
])
if client.dialing_uin:
tokens.extend([
("", "\n" * 2),
("class:system", f"Dialing UIN {client.dialing_uin}... Waiting for accept.\n")
])
return tokens
tokens = []
@ -298,31 +444,36 @@ def make_layout():
tokens.append(("", line + "\n"))
return tokens
def get_cursor_pos():
text = "".join(t[1] for t in get_main_text())
newlines = text.count('\n')
return Point(0, max(0, newlines - 1))
sidebar_control = FormattedTextControl(get_sidebar_text, focusable=True)
sidebar_window = Frame(Window(content=sidebar_control, width=25), title="chats", style="class:border")
main_control = FormattedTextControl(get_main_text)
main_control = FormattedTextControl(get_main_text, get_cursor_position=get_cursor_pos)
def get_main_title():
if client.active_chat:
return f"TyChat | You: {client.username} ({client.uin}) | Chat with UIN: {client.active_chat}"
return f"TyChat | You: {client.username} ({client.uin})"
base_title = f"TyChat | You: {client.username} ({client.uin})"
if client.is_busy: base_title += " [DND/BUSY]"
if client.active_chat: return f"{base_title} | Chat: {client.active_chat}"
return base_title
main_window = Frame(Window(content=main_control), title=get_main_title, style="class:border")
main_window = Frame(Window(content=main_control, wrap_lines=True), title=get_main_title, style="class:border")
input_field = TextArea(
height=3,
prompt="> ",
multiline=False,
wrap_lines=True
wrap_lines=True,
completer=CommandCompleter(),
complete_while_typing=True
)
input_window = Frame(input_field, title="Type message and press Enter (/exit to quit)")
input_window = Frame(input_field, title="Input")
def accept_handler(buff):
text = input_field.text.strip()
if not text:
return
if not text: return
if text.lower() == "/exit":
client.loop_running = False
@ -331,110 +482,256 @@ def make_layout():
return
if text.lower() == "/help":
try:
webbrowser.open(WIKI_URL)
if client.active_chat:
client.add_to_history(client.active_chat, "[SYSTEM]: Wiki link opened in browser.")
except:
if client.active_chat:
client.add_to_history(client.active_chat, f"[SYSTEM]: Failed to open browser. Wiki: {WIKI_URL}")
try: webbrowser.open(WIKI_URL)
except: pass
input_field.text = ""
return
if text.lower() == "/busy":
client.is_busy = not client.is_busy
if client.active_chat:
status_msg = "enabled" if client.is_busy else "disabled"
client.add_to_history(client.active_chat, f"[SYSTEM]: DND Mode {status_msg}.")
input_field.text = ""
client.refresh_ui()
return
if text.lower().startswith("/add "):
new_uin = text.split(" ", 1)[1].strip()
if new_uin and new_uin not in client.contacts:
client.contacts[new_uin] = {"status": "offline", "unread": 0, "attention": False}
client.history[new_uin] = []
client.save_config()
if not new_uin: return
if new_uin in client.contacts:
client.active_chat = new_uin
input_field.text = ""
client.refresh_ui()
return
client.active_chat = None
threading.Thread(target=client.play_dial_tones, args=(new_uin,), daemon=True).start()
client.send_sys_packet(new_uin, "REQ_ADD")
input_field.text = ""
client.refresh_ui()
return
if text.lower().startswith("/room create "):
match = re.match(r'^/room create "([^"]+)"$', text, re.IGNORECASE)
if match:
r_name = match.group(1)
r_id = f"ROOM:{client.uin}:{r_name}"
client.groups[r_id] = {"title": r_name, "owner": client.uin, "members": [client.uin]}
client.history[r_id] = [f"[SYSTEM]: Room \"{r_name}\" created successfully. Local GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.refresh_ui()
input_field.text = ""
return
if text.lower().startswith("/room join "):
match = re.match(r'^/room join ([0-9]+):"([^"]+)"$', text, re.IGNORECASE)
if match:
r_owner = match.group(1)
r_name = match.group(2)
r_id = f"ROOM:{r_owner}:{r_name}"
if r_owner not in client.contacts and r_owner != client.uin:
client.contacts[r_owner] = {"status": "offline", "unread": 0, "attention": False}
client.groups[r_id] = {"title": r_name, "owner": r_owner, "members": list(set([client.uin, r_owner]))}
client.history[r_id] = [f"[SYSTEM]: Joined room \"{r_name}\". Local GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.refresh_ui()
client.send_sys_packet(r_owner, f"REQ_JOIN_ROOM:{r_name}")
input_field.text = ""
return
if client.active_chat:
if len(text) > 100:
if client.active_chat.startswith("ROOM:"):
group = client.groups[client.active_chat]
owner_uin = group["owner"]
if owner_uin != client.uin and client.contacts.get(owner_uin, {}).get("status") != "online":
client.add_to_history(client.active_chat, "[SYSTEM]: CANNOT SEND. Owner of this room is offline for sync!")
input_field.text = ""
return
audio_b = client.text_to_audio(f"SYS:ROOM_MSG:{client.active_chat}:{client.uin}:{text}")
p_b64 = base64.b64encode(audio_b).decode('utf-8')
if owner_uin == client.uin:
for m in group["members"]:
if m != client.uin and client.contacts.get(m, {}).get("status") == "online":
sio.emit("relay_packet", {"to_uin": m, "payload": p_b64})
else:
sio.emit("relay_packet", {"to_uin": owner_uin, "payload": p_b64})
client.add_to_history(client.active_chat, f"[You]: {text}")
input_field.text = ""
return
if text.lower() == "/query":
client.send_service_tone(client.active_chat, FREQ_QUERY)
client.add_to_history(client.active_chat, "[SYSTEM]: Sent friendly status query...")
input_field.text = ""
return
if text.lower() == "/alert":
client.send_service_tone(client.active_chat, FREQ_ALERT)
client.add_to_history(client.active_chat, "[SYSTEM]: Sent URGENT alert signal (breaks DND)...")
input_field.text = ""
return
if len(text) > 300:
client.add_to_history(client.active_chat, f"[SYSTEM]: Message too long!")
input_field.text = ""
return
match = re.match(r'^s/([^/]+)/([^/]*)/?$', text)
if match:
search_str, replace_str = match.groups()
lines = client.history.get(client.active_chat, [])
edited = False
for i in range(len(lines) - 1, -1, -1):
if lines[i].startswith(f"[{client.username}]:") or lines[i].startswith("[You]:"):
if search_str in lines[i]:
lines[i] = lines[i].replace(search_str, replace_str)
edited = True
break
if edited:
client.refresh_ui()
client.save_config()
input_field.text = ""
return
if text.startswith("/alert"):
text = "ALARM! URGENT CALL!"
client.contacts[client.active_chat]["attention"] = True
audio_b = client.text_to_audio(text)
p_b64 = base64.b64encode(audio_b).decode('utf-8')
if client.contacts[client.active_chat]["status"] == "online":
sio.emit("relay_packet", {"to_uin": client.active_chat, "payload": p_b64})
client.add_to_history(client.active_chat, f"[You]: {text}")
else:
if client.active_chat not in client.preserved:
client.preserved[client.active_chat] = []
if client.active_chat not in client.preserved: client.preserved[client.active_chat] = []
client.preserved[client.active_chat].append(text)
client.add_to_history(client.active_chat, f"[You]: {text} (Preserved)")
client.save_config()
input_field.text = ""
input_field.accept_handler = accept_handler
right_side = HSplit([main_window, input_window])
right_side = HSplit([
main_window,
input_window
])
def get_popup_text():
if not client.pending_requests: return []
req_uin = client.pending_requests[0]
return [
("class:popup-title", f" Incoming Handshake Request \n"),
("class:popup-title", f" UIN: {req_uin} \n\n"),
("", " Do you want to add them to contacts?\n\n"),
("class:popup-keys", " [Y]es [N]o N[e]ver ")
]
popup_window = Frame(Window(FormattedTextControl(get_popup_text), align=WindowAlign.CENTER, width=42, height=6), style="class:popup-border")
root_container = VSplit([
sidebar_window,
right_side
])
def get_signal_popup_text():
if not client.pending_windows: return []
win_info = client.pending_windows[0]
w_type = win_info["type"].upper()
w_uin = win_info["uin"]
title_style = "class:alert-title" if w_type == "ALERT" else "class:query-title"
desc = "CRITICAL BREAK-IN! Are you busy?" if w_type == "ALERT" else "Friendly status query: Are you busy?"
return [
(title_style, f" *** INCOMING {w_type} SIGNAL *** \n"),
("class:popup-title", f" From UIN: {w_uin} \n\n"),
("", f" {desc}\n\n"),
("class:popup-keys", " [Y]es, I'm busy [N]o, go on ")
]
signal_popup_window = Frame(Window(FormattedTextControl(get_signal_popup_text), align=WindowAlign.CENTER, width=50, height=6), style="class:border")
def get_welcome_popup_text():
return [
("class:query-title", " *** ДОБРО ПОЖАЛОВАТЬ В TYCHAT! *** \n\n"),
("", " Привет! Вы только что зарегистрировались.\n"),
("", " Хотите автоматически присоединиться\n"),
("", " к всеобщей группе?\n\n"),
("class:popup-keys", " [Y]Да, зайти в Dnishe [N]Нет, я сам ")
]
welcome_popup_window = Frame(Window(FormattedTextControl(get_welcome_popup_text), align=WindowAlign.CENTER, width=48, height=7), style="class:border")
@Condition
def has_pending_request(): return len(client.pending_requests) > 0 and not client.show_welcome_popup
@Condition
def has_pending_window(): return len(client.pending_windows) > 0 and len(client.pending_requests) == 0 and not client.show_welcome_popup
@Condition
def has_welcome_popup(): return client.show_welcome_popup
root_container = FloatContainer(
content=VSplit([sidebar_window, right_side]),
floats=[
Float(content=ConditionalContainer(content=popup_window, filter=has_pending_request), transparent=False),
Float(content=ConditionalContainer(content=signal_popup_window, filter=has_pending_window), transparent=False),
Float(content=ConditionalContainer(content=welcome_popup_window, filter=has_welcome_popup), transparent=False)
]
)
kb = KeyBindings()
@kb.add('tab')
@kb.add('tab', filter=~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
if event.app.layout.has_focus(input_field):
event.app.layout.focus(sidebar_window)
else:
event.app.layout.focus(input_field)
if event.app.layout.has_focus(input_field): event.app.layout.focus(sidebar_window)
else: event.app.layout.focus(input_field)
@kb.add('up', filter=has_focus(sidebar_window))
@kb.add('up', filter=has_focus(sidebar_window) & ~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
if client.contacts:
client.selected_contact_idx = (client.selected_contact_idx - 1) % len(client.contacts)
target_uin = list(client.contacts.keys())[client.selected_contact_idx]
client.active_chat = target_uin
client.contacts[target_uin]["unread"] = 0
client.contacts[target_uin]["attention"] = False
all_chats = list(client.contacts.keys()) + list(client.groups.keys())
if all_chats:
client.selected_contact_idx = (client.selected_contact_idx - 1) % len(all_chats)
target = all_chats[client.selected_contact_idx]
client.active_chat = target
if target in client.contacts:
client.contacts[target]["unread"] = 0
client.contacts[target]["attention"] = False
client.refresh_ui()
@kb.add('down', filter=has_focus(sidebar_window))
@kb.add('down', filter=has_focus(sidebar_window) & ~has_pending_request & ~has_pending_window & ~has_welcome_popup)
def _(event):
if client.contacts:
client.selected_contact_idx = (client.selected_contact_idx + 1) % len(client.contacts)
target_uin = list(client.contacts.keys())[client.selected_contact_idx]
client.active_chat = target_uin
client.contacts[target_uin]["unread"] = 0
client.contacts[target_uin]["attention"] = False
all_chats = list(client.contacts.keys()) + list(client.groups.keys())
if all_chats:
client.selected_contact_idx = (client.selected_contact_idx + 1) % len(all_chats)
target = all_chats[client.selected_contact_idx]
client.active_chat = target
if target in client.contacts:
client.contacts[target]["unread"] = 0
client.contacts[target]["attention"] = False
client.refresh_ui()
@kb.add('y', filter=has_welcome_popup)
@kb.add('Y', filter=has_welcome_popup)
def _(event):
client.show_welcome_popup = False
r_owner = "716041"
r_name = "Dnishe"
r_id = f"ROOM:{r_owner}:{r_name}"
client.contacts[r_owner] = {"status": "offline", "unread": 0, "attention": False}
client.groups[r_id] = {"title": r_name, "owner": r_owner, "members": [client.uin, r_owner]}
client.history[r_id] = [f"[SYSTEM]: Автоподключение! Комната \"{r_name}\". GUID: {r_id}"]
client.active_chat = r_id
client.save_config()
client.send_sys_packet(r_owner, f"REQ_JOIN_ROOM:{r_name}")
client.refresh_ui()
@kb.add('n', filter=has_welcome_popup)
@kb.add('N', filter=has_welcome_popup)
def _(event):
client.show_welcome_popup = False
client.refresh_ui()
@kb.add('y', filter=has_pending_window)
@kb.add('Y', filter=has_pending_window)
def _(event):
win_info = client.pending_windows.pop(0)
target_uin = win_info["uin"]
client.is_busy = True
client.send_service_tone(target_uin, FREQ_RESP_YES)
client.add_to_history(target_uin, "[You -> SYSTEM]: Sent Dial Tone -> Yes, I'm busy.")
client.refresh_ui()
@kb.add('n', filter=has_pending_window)
@kb.add('N', filter=has_pending_window)
def _(event):
win_info = client.pending_windows.pop(0)
target_uin = win_info["uin"]
client.send_service_tone(target_uin, FREQ_RESP_NO)
client.add_to_history(target_uin, "[You -> SYSTEM]: Sent Dial Tone -> No, go on.")
client.refresh_ui()
@kb.add('y', filter=has_pending_request)
@kb.add('Y', filter=has_pending_request)
def _(event):
req_uin = client.pending_requests.pop(0)
client.contacts[req_uin] = {"status": "online", "unread": 0, "attention": False}
client.history[req_uin] = ["[SYSTEM]: Handshake accepted. Contact added."]
client.active_chat = req_uin
client.save_config()
client.send_sys_packet(req_uin, "RES_ACC")
client.refresh_ui()
@kb.add('n', filter=has_pending_request)
@kb.add('N', filter=has_pending_request)
def _(event):
req_uin = client.pending_requests.pop(0)
client.send_sys_packet(req_uin, "RES_DEC")
client.refresh_ui()
@kb.add('c-c')
def _(event):
client.loop_running = False
@ -454,6 +751,13 @@ ui_style = Style.from_dict({
'system': '#ff0000 bold',
'border': '#00ff00',
'frame.border': '#00ff00',
'popup-title': '#ffffff bold',
'popup-keys': '#00ff00 bold',
'popup-border': '#ff0000 bold',
'query-title': '#00ffff bold',
'alert-title': '#ff0000 bold',
'completion-menu.completion': 'bg:#00ffff #000000',
'completion-menu.completion.current': 'bg:#00aa00 #ffffff bold'
})
def main():
@ -462,42 +766,37 @@ def main():
try:
kernel32 = ctypes.windll.kernel32
kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)
except:
pass
except: pass
is_new_registration = False
if not client.load_config():
print("!" * 60)
print("WARNING:")
print("You will receive new messages only when you are online.")
print("All history is stored locally in settings.json.")
print("Anyone who knows your UIN can message you.")
print("This software uses loud sounds, adjust your client volume beforehand!")
print("WARNING: All data is local.")
print("!" * 60 + "\n")
client.server_url = input("Enter server URL (e.g., http://localhost:5000): ").strip()
client.server_url = input("Enter server URL: ").strip()
if not client.server_url.startswith("http://") and not client.server_url.startswith("https://"):
client.server_url = "http://" + client.server_url
try:
sio.connect(client.server_url, transports=['websocket'])
except Exception as e:
print(f"Failed to connect to server: {e}")
return
try: sio.connect(client.server_url, transports=['websocket'])
except Exception as e: return
print("\n1. Register\n2. Login")
mode = input("> ")
username_or_uin = input("Enter Username (for reg) or UIN (for login): ").strip()
password = input("Enter password: ").strip()
username_or_uin = input("UIN/Username: ").strip()
password = input("Password: ").strip()
event_wait = threading.Event()
@sio.event
def register_response(data):
nonlocal is_new_registration
if data["status"] == "success":
client.uin = data['uin']
client.username = username_or_uin
client.password = password
print(f"\nSuccess! Your UIN: {client.uin}")
is_new_registration = True
event_wait.set()
@sio.event
@ -506,32 +805,25 @@ def main():
client.uin = username_or_uin
client.username = data["username"]
client.password = password
print(f"\nHello, {client.username}! Logged in successfully.")
event_wait.set()
if mode == "1":
sio.emit("register", {"username": username_or_uin, "password": password})
event_wait.wait()
else:
sio.emit("login", {"uin": username_or_uin, "password": password})
event_wait.wait()
if not client.uin:
print("Auth error.")
return
if mode == "1": sio.emit("register", {"username": username_or_uin, "password": password})
else: sio.emit("login", {"uin": username_or_uin, "password": password})
event_wait.wait()
if not client.uin: return
client.save_config()
print("\nYou can change configurations inside settings.json.")
input("Press Enter to open TUI...")
else:
try:
sio.connect(client.server_url, transports=['websocket'])
sio.emit("login", {"uin": client.uin, "password": client.password})
except:
pass
except: pass
if is_new_registration:
client.show_welcome_popup = True
threading.Thread(target=status_checker_thread, daemon=True).start()
layout, bindings = make_layout()
client.app = Application(
@ -541,7 +833,6 @@ def main():
full_screen=True,
enable_page_navigation_bindings=True
)
client.app.run()
if __name__ == "__main__":