96 lines
3.9 KiB
Python
96 lines
3.9 KiB
Python
import numpy as np
|
|
|
|
BASE_FREQ = 600
|
|
FREQ_STEP = 25
|
|
TONE_DURATION = 0.1
|
|
SAMPLE_RATE = 44100
|
|
|
|
FREQ_QUERY = 1200
|
|
FREQ_ALERT = 1400
|
|
FREQ_RESP_YES = 1600
|
|
FREQ_RESP_NO = 1800
|
|
|
|
class Plugin:
|
|
def __init__(self):
|
|
self.name = "Audio Protocol"
|
|
|
|
def encode(self, text: str) -> bytes:
|
|
audio_signals = []
|
|
t_start = np.linspace(0, 0.3, int(SAMPLE_RATE * 0.3), False)
|
|
audio_signals.append(np.sin(1000 * t_start * 2 * np.pi))
|
|
for char in text:
|
|
freq = BASE_FREQ + ord(char) * FREQ_STEP
|
|
t = np.linspace(0, TONE_DURATION, int(SAMPLE_RATE * TONE_DURATION), False)
|
|
audio_signals.append(np.sin(freq * t * 2 * np.pi))
|
|
full_audio = np.concatenate(audio_signals).astype(np.float32)
|
|
if np.max(np.abs(full_audio)) > 0:
|
|
full_audio = full_audio / np.max(np.abs(full_audio))
|
|
return full_audio.tobytes()
|
|
|
|
def decode(self, data: bytes) -> str:
|
|
audio_data = np.frombuffer(data, dtype=np.float32)
|
|
samples_per_tone = int(SAMPLE_RATE * TONE_DURATION)
|
|
current_sample = int(SAMPLE_RATE * 0.3)
|
|
text = ""
|
|
while current_sample < len(audio_data):
|
|
chunk = audio_data[current_sample : current_sample + samples_per_tone]
|
|
if len(chunk) < samples_per_tone:
|
|
break
|
|
window_data = np.hanning(len(chunk))
|
|
fft_data = np.abs(np.fft.rfft(chunk * window_data))
|
|
frequencies = np.fft.rfftfreq(len(chunk), d=1/SAMPLE_RATE)
|
|
detected_freq = frequencies[np.argmax(fft_data)]
|
|
ascii_code = int(round((detected_freq - BASE_FREQ) / FREQ_STEP))
|
|
if 0 <= ascii_code < 65535:
|
|
text += chr(ascii_code)
|
|
current_sample += samples_per_tone
|
|
return text
|
|
|
|
def generate_service_signal(self, signal_type: str) -> bytes:
|
|
sig_map = {"QUERY": FREQ_QUERY, "ALERT": FREQ_ALERT, "RESP_YES": FREQ_RESP_YES, "RESP_NO": FREQ_RESP_NO}
|
|
freq = sig_map.get(signal_type, FREQ_QUERY)
|
|
t = np.linspace(0, 0.4, int(SAMPLE_RATE * 0.4), False)
|
|
tone = np.sin(freq * t * 2 * np.pi) * 0.7
|
|
envelope = np.ones_like(tone)
|
|
fade_len = int(SAMPLE_RATE * 0.05)
|
|
envelope[:fade_len] = np.linspace(0, 1, fade_len)
|
|
envelope[-fade_len:] = np.linspace(1, 0, fade_len)
|
|
return (tone * envelope).astype(np.float32).tobytes()
|
|
|
|
def detect_service_signal(self, data: bytes) -> str:
|
|
try:
|
|
audio_data = np.frombuffer(data, dtype=np.float32)
|
|
window_data = np.hanning(len(audio_data))
|
|
fft_data = np.abs(np.fft.rfft(audio_data * window_data))
|
|
frequencies = np.fft.rfftfreq(len(audio_data), d=1/SAMPLE_RATE)
|
|
detected_freq = frequencies[np.argmax(fft_data)]
|
|
sig_map = {FREQ_QUERY: "QUERY", FREQ_ALERT: "ALERT", FREQ_RESP_YES: "RESP_YES", FREQ_RESP_NO: "RESP_NO"}
|
|
for target_freq, sig_type in sig_map.items():
|
|
if abs(detected_freq - target_freq) <= 15:
|
|
return sig_type
|
|
return None
|
|
except:
|
|
return None
|
|
|
|
def generate_service_tone(frequency, duration=0.4):
|
|
t = np.linspace(0, duration, int(SAMPLE_RATE * duration), False)
|
|
tone = np.sin(frequency * t * 2 * np.pi) * 0.7
|
|
envelope = np.ones_like(tone)
|
|
fade_len = int(SAMPLE_RATE * 0.05)
|
|
envelope[:fade_len] = np.linspace(0, 1, fade_len)
|
|
envelope[-fade_len:] = np.linspace(1, 0, fade_len)
|
|
return (tone * envelope).astype(np.float32).tobytes()
|
|
|
|
def text_to_audio(text):
|
|
p = Plugin()
|
|
return p.encode(text)
|
|
|
|
def detect_service_tone(audio_data):
|
|
p = Plugin()
|
|
sig = p.detect_service_signal(audio_data.tobytes())
|
|
sig_map = {"QUERY": FREQ_QUERY, "ALERT": FREQ_ALERT, "RESP_YES": FREQ_RESP_YES, "RESP_NO": FREQ_RESP_NO}
|
|
return sig_map.get(sig, None)
|
|
|
|
def fast_decode(audio_data):
|
|
p = Plugin()
|
|
return p.decode(audio_data.tobytes()) |