|
|
|
|
|
""" |
|
|
Advanced Signal Processing and Modulation System |
|
|
=============================================== |
|
|
|
|
|
This module implements comprehensive digital signal processing including: |
|
|
- Multiple modulation schemes (BFSK, BPSK, QPSK, QAM16, OFDM, DSSS) |
|
|
- Forward Error Correction (FEC) coding |
|
|
- Framing, security, and watermarking |
|
|
- Audio and IQ signal generation |
|
|
- Visualization and analysis tools |
|
|
|
|
|
Author: Assistant |
|
|
License: MIT |
|
|
""" |
|
|
|
|
|
import binascii |
|
|
import hashlib |
|
|
import math |
|
|
import struct |
|
|
import time |
|
|
import wave |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum, auto |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union |
|
|
|
|
|
import numpy as np |
|
|
from scipy import signal as sp_signal |
|
|
from scipy.fft import rfft, rfftfreq |
|
|
|
|
|
try: |
|
|
import matplotlib.pyplot as plt |
|
|
HAS_MATPLOTLIB = True |
|
|
except ImportError: |
|
|
HAS_MATPLOTLIB = False |
|
|
|
|
|
try: |
|
|
import sounddevice as sd |
|
|
HAS_AUDIO = True |
|
|
except ImportError: |
|
|
HAS_AUDIO = False |
|
|
|
|
|
try: |
|
|
from Crypto.Cipher import AES |
|
|
from Crypto.Random import get_random_bytes |
|
|
from Crypto.Protocol.KDF import PBKDF2 |
|
|
HAS_CRYPTO = True |
|
|
except ImportError: |
|
|
HAS_CRYPTO = False |
|
|
|
|
|
import logging |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModulationScheme(Enum):
    """Modulation schemes understood by ``bits_to_signals``."""

    # Values are the consecutive integers that ``auto()`` would assign.
    BFSK = 1
    BPSK = 2
    QPSK = 3
    QAM16 = 4
    AFSK = 5
    OFDM = 6
    DSSS_BPSK = 7
|
|
|
|
|
class FEC(Enum):
    """Forward-error-correction schemes selectable for the bit stream."""

    # Values are the consecutive integers that ``auto()`` would assign.
    NONE = 1
    HAMMING74 = 2
    REED_SOLOMON = 3
    LDPC = 4
    TURBO = 5
|
|
|
|
|
@dataclass
class ModConfig:
    """Waveform parameters shared by the modulators."""

    # Output sample rate; also used as the WAV frame rate.
    sample_rate: int = 48_000
    # Symbols per second; samples per symbol is int(sample_rate / symbol_rate).
    symbol_rate: int = 1_200
    # Scale factor applied to every generated waveform.
    amplitude: float = 0.7
    # FSK tone used for a 0 bit.
    f0: float = 1200.0
    # FSK tone used for a 1 bit.
    f1: float = 2200.0
    # Carrier frequency for the PSK/QAM/OFDM/DSSS modulators.
    fc: float = 1800.0
    # When true, audio is clipped to [-1, 1] before being returned.
    clip: bool = True

    # OFDM: number of subcarriers per symbol and cyclic-prefix length.
    ofdm_subc: int = 64
    cp_len: int = 16

    # DSSS: chips per second.
    dsss_chip_rate: int = 4_800
|
|
|
|
|
@dataclass
class FrameConfig:
    """Options controlling how payloads are framed and checked."""

    # Append a 4-byte CRC32 of header+payload to the frame.
    use_crc32: bool = True
    # Append a 2-byte CRC16-CCITT of header+payload to the frame.
    use_crc16: bool = False
    # Bytes emitted before the header (eight 0x55 bytes by default).
    preamble: bytes = bytes.fromhex("55") * 8
    # Written into the frame header as a single byte.
    version: int = 1
|
|
|
|
|
@dataclass
class SecurityConfig:
    """Optional security features; a field left as ``None`` disables that step."""

    # Password for AES-GCM encryption of the payload.
    password: Optional[str] = None
    # Watermark string whose hash prefix is prepended to the payload.
    watermark: Optional[str] = None
    # Key string for the HMAC-SHA256 trailer appended to the frame.
    hmac_key: Optional[str] = None
|
|
|
|
|
@dataclass
class OutputPaths:
    """Locations of the artifacts written by the processing pipeline.

    A field stays ``None`` when the corresponding file was not produced.
    """

    # Mono WAV audio file.
    wav: Optional[Path] = None
    # Interleaved float32 IQ dump.
    iq: Optional[Path] = None
    # JSON metadata file.
    meta: Optional[Path] = None
    # Waveform/spectrum plot.
    png: Optional[Path] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def now_ms() -> int:
    """Return the current ``time.time()`` value in whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
|
|
|
|
|
def crc32_bytes(data: bytes) -> bytes:
    """Return the CRC32 of *data* as 4 big-endian bytes."""
    checksum = binascii.crc32(data)
    return struct.pack(">I", checksum)
|
|
|
|
|
def crc16_ccitt(data: bytes) -> bytes:
    """Return the CRC16 (polynomial 0x1021, initial value 0xFFFF) of *data*.

    The result is packed as 2 big-endian bytes.
    """
    register = 0xFFFF
    for octet in data:
        # Fold the next byte into the high half of the register.
        register ^= octet << 8
        for _ in range(8):
            shifted = (register << 1) & 0xFFFF
            # Apply the polynomial whenever the bit shifted out was set.
            register = shifted ^ 0x1021 if register & 0x8000 else shifted
    return register.to_bytes(2, "big")
|
|
|
|
|
def to_bits(data: bytes) -> List[int]:
    """Expand *data* into a flat list of bits, most significant bit first."""
    bits: List[int] = []
    for octet in data:
        bits.extend((octet >> shift) & 1 for shift in (7, 6, 5, 4, 3, 2, 1, 0))
    return bits
|
|
|
|
|
def from_bits(bits: Sequence[int]) -> bytes:
    """Pack *bits* (MSB first) into bytes.

    A trailing partial byte is zero-padded on the right. Each element is
    interpreted by truthiness.
    """
    padded = list(bits)
    remainder = len(padded) % 8
    if remainder:
        padded.extend([0] * (8 - remainder))

    packed = bytearray()
    for start in range(0, len(padded), 8):
        value = 0
        for flag in padded[start:start + 8]:
            value = (value << 1) | (1 if flag else 0)
        packed.append(value)
    return bytes(packed)
|
|
|
|
|
def chunk_bits(bits: Sequence[int], n: int) -> List[List[int]]:
    """Split *bits* into consecutive lists of length *n*.

    The final list is shorter when ``len(bits)`` is not a multiple of *n*.
    """
    chunks: List[List[int]] = []
    for start in range(0, len(bits), n):
        chunks.append(list(bits[start:start + n]))
    return chunks
|
|
|
|
|
def safe_json(obj: Any) -> str:
    """Serialize *obj* to indented JSON, tolerating NumPy and complex values.

    NumPy scalars become Python numbers, arrays become lists, complex numbers
    become ``{"real": ..., "imag": ...}`` and anything else that JSON cannot
    encode is stringified.
    """
    import json

    def _fallback(value):
        # Called by json only for objects it cannot encode natively.
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.integer):
            return int(value)
        if isinstance(value, np.floating):
            return float(value)
        if isinstance(value, complex):
            return {"real": float(value.real), "imag": float(value.imag)}
        return str(value)

    return json.dumps(obj, default=_fallback, indent=2, ensure_ascii=False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def hamming74_encode(data_bits: List[int]) -> List[int]:
    """Encode *data_bits* with the Hamming (7,4) code.

    The input is zero-padded to a multiple of four bits. Each nibble
    ``d0 d1 d2 d3`` becomes the codeword ``p1 p2 d0 p3 d1 d2 d3``.
    """
    remainder = len(data_bits) % 4
    if remainder:
        data_bits = data_bits + [0] * (4 - remainder)

    codeword_bits: List[int] = []
    for start in range(0, len(data_bits), 4):
        d0, d1, d2, d3 = data_bits[start:start + 4]
        codeword_bits.extend(
            (d0 ^ d1 ^ d3, d0 ^ d2 ^ d3, d0, d1 ^ d2 ^ d3, d1, d2, d3)
        )
    return codeword_bits
|
|
|
|
|
def hamming74_decode(coded_bits: List[int]) -> Tuple[List[int], int]:
    """Decode Hamming (7,4) codewords, correcting one bit error per word.

    The input is zero-padded to a multiple of seven bits.

    Returns:
        ``(data_bits, corrected)`` where *corrected* counts the codewords
        whose syndrome was non-zero.
    """
    remainder = len(coded_bits) % 7
    if remainder:
        coded_bits = coded_bits + [0] * (7 - remainder)

    data_out: List[int] = []
    corrected = 0
    for start in range(0, len(coded_bits), 7):
        # Slicing copies the word, so the caller's list is never modified.
        word = coded_bits[start:start + 7]
        p1, p2, d0, p3, d1, d2, d3 = word

        # The three parity checks form the 1-based position of a flipped bit.
        syndrome = (
            (p1 ^ d0 ^ d1 ^ d3)
            | ((p2 ^ d0 ^ d2 ^ d3) << 1)
            | ((p3 ^ d1 ^ d2 ^ d3) << 2)
        )
        if syndrome:
            corrected += 1
            word[syndrome - 1] ^= 1

        data_out += [word[2], word[4], word[5], word[6]]

    return data_out, corrected
|
|
|
|
|
def fec_encode(bits: List[int], scheme: FEC) -> List[int]:
    """Apply the selected FEC *scheme* to *bits*.

    Raises:
        NotImplementedError: for REED_SOLOMON, LDPC and TURBO.
        ValueError: for any other unrecognised scheme.
    """
    if scheme == FEC.NONE:
        # Return a copy so the caller's list is not aliased.
        return list(bits)
    if scheme == FEC.HAMMING74:
        return hamming74_encode(bits)
    if scheme in (FEC.REED_SOLOMON, FEC.LDPC, FEC.TURBO):
        raise NotImplementedError(f"{scheme.name} encoding not implemented")
    raise ValueError("Unknown FEC scheme")
|
|
|
|
|
def fec_decode(bits: List[int], scheme: FEC) -> Tuple[List[int], Dict[str, Any]]:
    """Undo the selected FEC *scheme*.

    Returns:
        ``(decoded_bits, info)`` where ``info["errors_corrected"]`` is the
        number of corrections reported by the decoder (0 for ``FEC.NONE``).

    Raises:
        NotImplementedError: for every scheme other than NONE and HAMMING74.
    """
    if scheme == FEC.NONE:
        return list(bits), {"errors_corrected": 0}
    if scheme == FEC.HAMMING74:
        data_bits, corrected = hamming74_decode(bits)
        return data_bits, {"errors_corrected": corrected}
    raise NotImplementedError(f"{scheme.name} decoding not implemented")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def aes_gcm_encrypt(plaintext: bytes, password: str) -> bytes:
    """Encrypt *plaintext* with AES-GCM under a key derived from *password*.

    Output layout: ``b"AGCM"`` + 16-byte salt + 12-byte nonce + tag +
    ciphertext. The 32-byte key comes from PBKDF2 with 200 000 iterations.

    Raises:
        RuntimeError: if pycryptodome is not installed.
    """
    if not HAS_CRYPTO:
        raise RuntimeError("pycryptodome required for encryption")

    salt = get_random_bytes(16)
    derived_key = PBKDF2(password, salt, dkLen=32, count=200_000)
    nonce = get_random_bytes(12)

    ciphertext, tag = AES.new(derived_key, AES.MODE_GCM, nonce=nonce).encrypt_and_digest(plaintext)
    return b"".join((b"AGCM", salt, nonce, tag, ciphertext))
|
|
|
|
|
def aes_gcm_decrypt(encrypted: bytes, password: str) -> bytes:
    """Decrypt a blob produced by ``aes_gcm_encrypt``.

    Expected layout: ``b"AGCM"`` + 16-byte salt + 12-byte nonce + 16-byte tag
    + ciphertext.

    Raises:
        RuntimeError: if pycryptodome is not installed.
        ValueError: if the magic prefix is missing, the blob is too short to
            hold salt, nonce and tag, or authentication fails.
    """
    if not HAS_CRYPTO:
        raise RuntimeError("pycryptodome required for decryption")

    magic = b"AGCM"
    salt_len, nonce_len, tag_len = 16, 12, 16
    header_len = len(magic) + salt_len + nonce_len + tag_len

    if not encrypted.startswith(magic):
        raise ValueError("Invalid encrypted format")
    # Reject truncated input explicitly instead of slicing short fields and
    # failing later with an unrelated error.
    if len(encrypted) < header_len:
        raise ValueError("Invalid encrypted format")

    body = encrypted[len(magic):]
    salt = body[:salt_len]
    nonce = body[salt_len:salt_len + nonce_len]
    tag = body[salt_len + nonce_len:salt_len + nonce_len + tag_len]
    ciphertext = body[salt_len + nonce_len + tag_len:]

    key = PBKDF2(password, salt, dkLen=32, count=200_000)
    cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)

    return cipher.decrypt_and_verify(ciphertext, tag)
|
|
|
|
|
def apply_hmac(data: bytes, hkey: str) -> bytes:
    """Append an HMAC-SHA256 trailer to *data*.

    The MAC key is the SHA-256 digest of *hkey* (UTF-8). The result is
    ``data + b"HMAC" + mac``, i.e. 36 bytes longer than the input.
    """
    import hmac

    mac_key = hashlib.sha256(hkey.encode("utf-8")).digest()
    digest = hmac.new(mac_key, data, hashlib.sha256).digest()
    return b"".join((data, b"HMAC", digest))
|
|
|
|
|
def verify_hmac(data: bytes, hkey: str) -> Tuple[bytes, bool]:
    """Check and strip the trailer written by ``apply_hmac``.

    The trailer is the 4-byte marker ``b"HMAC"`` followed by a 32-byte
    HMAC-SHA256 over the payload, keyed with the SHA-256 digest of *hkey*.

    Returns:
        ``(payload, True)`` when the MAC matches, ``(payload, False)`` when
        the trailer is present but the MAC differs, and ``(data, False)``
        when *data* has no well-formed trailer.
    """
    import hmac

    marker = b"HMAC"
    mac_len = hashlib.sha256().digest_size
    trailer_len = len(marker) + mac_len

    if len(data) < trailer_len:
        return data, False

    # The marker sits at a fixed offset from the end. Searching for it (or
    # testing the very end of the data, which is MAC bytes) is unreliable
    # because the MAC itself may contain or end with arbitrary bytes.
    split = len(data) - trailer_len
    if data[split:split + len(marker)] != marker:
        return data, False

    payload = data[:split]
    received_mac = data[split + len(marker):]

    key = hashlib.sha256(hkey.encode("utf-8")).digest()
    expected_mac = hmac.new(key, payload, hashlib.sha256).digest()

    return payload, hmac.compare_digest(received_mac, expected_mac)
|
|
|
|
|
def add_watermark(data: bytes, wm: str) -> bytes:
    """Prefix *data* with the first 8 bytes of SHA-256(*wm* as UTF-8)."""
    tag = hashlib.sha256(wm.encode("utf-8")).digest()[:8]
    return tag + data
|
|
|
|
|
def check_watermark(data: bytes, wm: str) -> Tuple[bytes, bool]:
    """Strip and check the 8-byte watermark tag at the start of *data*.

    Returns:
        ``(payload, matches)``. Input shorter than 8 bytes is returned
        unchanged with ``False``.
    """
    if len(data) < 8:
        return data, False

    digest = hashlib.sha256(wm.encode("utf-8")).digest()
    return data[8:], data[:8] == digest[:8]
|
|
|
|
|
def frame_payload(payload: bytes, fcfg: FrameConfig) -> bytes:
    """Wrap *payload* in a frame: preamble + header + payload + CRC tail.

    The 6-byte header (``">BBI"``) holds the sync byte 0xA5, the frame
    version and the low 32 bits of the current time in milliseconds. The
    optional CRC32 and CRC16 are both computed over header + payload and
    appended in that order.
    """
    stamp = now_ms() & 0xFFFFFFFF
    core = struct.pack(">BBI", 0xA5, fcfg.version, stamp) + payload

    checksums = b""
    if fcfg.use_crc32:
        checksums += crc32_bytes(core)
    if fcfg.use_crc16:
        checksums += crc16_ccitt(core)

    return fcfg.preamble + core + checksums
|
|
|
|
|
def unframe_payload(framed: bytes, fcfg: FrameConfig) -> Tuple[bytes, Dict[str, Any]]:
    """Parse a frame built by ``frame_payload``.

    Layout: preamble, 6-byte ``">BBI"`` header (sync 0xA5, version,
    timestamp), payload, then an optional CRC32 and an optional CRC16, each
    computed over header + payload.

    Returns:
        ``(payload, info)``. On a structural problem the payload is ``b""``
        and ``info`` holds only an ``"error"`` message. Otherwise ``info``
        has ``"version"`` and ``"timestamp"``, plus ``"crc32_ok"`` and/or
        ``"crc16_ok"`` for the enabled checks. A CRC mismatch is reported
        through those flags, not as an error.
    """
    header_fmt = ">BBI"
    # 6 bytes: big-endian packing has no padding. Derive it from the format
    # so parsing can never drift from what frame_payload writes.
    header_len = struct.calcsize(header_fmt)

    if len(framed) < len(fcfg.preamble) + header_len:
        return b"", {"error": "Frame too short"}

    if not framed.startswith(fcfg.preamble):
        return b"", {"error": "Invalid preamble"}

    # The length check above guarantees a complete header is present here.
    data = framed[len(fcfg.preamble):]

    sync, version, timestamp = struct.unpack(header_fmt, data[:header_len])
    if sync != 0xA5:
        return b"", {"error": "Invalid sync byte"}

    tail_len = (4 if fcfg.use_crc32 else 0) + (2 if fcfg.use_crc16 else 0)
    if len(data) < header_len + tail_len:
        return b"", {"error": "Frame too short for CRC"}

    # Split into the checksummed core and the checksum tail.
    core_end = len(data) - tail_len
    core = data[:core_end]
    tail = data[core_end:]
    payload = core[header_len:]

    info: Dict[str, Any] = {"version": version, "timestamp": timestamp}

    offset = 0
    if fcfg.use_crc32:
        info["crc32_ok"] = tail[offset:offset + 4] == crc32_bytes(core)
        offset += 4

    if fcfg.use_crc16:
        # Computed over the core only, matching frame_payload; it must not
        # include a preceding CRC32.
        info["crc16_ok"] = tail[offset:offset + 2] == crc16_ccitt(core)

    return payload, info
|
|
|
|
|
def encode_text(text: str, fcfg: FrameConfig, sec: SecurityConfig, fec_scheme: FEC) -> List[int]:
    """Turn *text* into the bit stream handed to a modulator.

    Steps, in order: UTF-8 encode, optional watermark, optional AES-GCM
    encryption, framing, optional HMAC trailer, bit expansion, FEC encoding.
    Each optional step runs only when its ``SecurityConfig`` field is set.
    """
    payload = text.encode("utf-8")

    watermark = sec.watermark
    if watermark:
        payload = add_watermark(payload, watermark)

    password = sec.password
    if password:
        payload = aes_gcm_encrypt(payload, password)

    frame = frame_payload(payload, fcfg)

    hmac_key = sec.hmac_key
    if hmac_key:
        frame = apply_hmac(frame, hmac_key)

    return fec_encode(to_bits(frame), fec_scheme)
|
|
|
|
|
def decode_bits(bits: List[int], fcfg: FrameConfig, sec: SecurityConfig, fec_scheme: FEC) -> Tuple[str, Dict[str, Any]]:
    """Reverse ``encode_text``: recover the text carried by *bits*.

    Steps, in order: FEC decode, bit packing, optional HMAC check, unframing,
    optional decryption, optional watermark check, UTF-8 decode (invalid
    sequences are replaced).

    Returns:
        ``(text, info)``. On any failure, including an exception raised by a
        step, the text is ``""`` and ``info["error"]`` describes the problem;
        diagnostics gathered before the failure are kept in ``info``.
    """
    info = {}

    try:
        data_bits, fec_report = fec_decode(bits, fec_scheme)
        info.update(fec_report)

        frame = from_bits(data_bits)

        if sec.hmac_key:
            frame, mac_valid = verify_hmac(frame, sec.hmac_key)
            info["hmac_ok"] = mac_valid
            if not mac_valid:
                return "", {**info, "error": "HMAC verification failed"}

        payload, frame_report = unframe_payload(frame, fcfg)
        info.update(frame_report)
        if "error" in frame_report:
            return "", info

        if sec.password:
            payload = aes_gcm_decrypt(payload, sec.password)
            info["decrypted"] = True

        if sec.watermark:
            payload, mark_valid = check_watermark(payload, sec.watermark)
            info["watermark_ok"] = mark_valid
            if not mark_valid:
                return "", {**info, "error": "Watermark verification failed"}

        return payload.decode("utf-8", errors="replace"), info

    except Exception as exc:
        # Best-effort decoding: report the failure instead of propagating it.
        return "", {**info, "error": str(exc)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Modulators:
    """Namespace of static bit-to-waveform modulators.

    Every public method takes a sequence of bits (interpreted by truthiness)
    and a ``ModConfig``. It returns float32 audio samples; the PSK, QAM and
    OFDM methods return an ``(audio, iq)`` tuple whose IQ part is complex64.
    Audio is clipped to [-1, 1] when ``cfg.clip`` is true.
    """

    # Fixed 8-chip spreading code used by dsss_bpsk.
    _PN_SEQUENCE = (1, -1, 1, 1, -1, 1, -1, -1)

    # 2-bit Gray code -> 4-level amplitude, indexed by (b0 << 1) | b1:
    # 00 -> -3, 01 -> -1, 11 -> +1, 10 -> +3, so neighbouring levels differ
    # in exactly one bit.
    _GRAY_PAM4 = (-3.0, -1.0, 3.0, 1.0)

    @staticmethod
    def _bit_array(bits: Sequence[int]) -> np.ndarray:
        """Return *bits* as a uint8 array of 0/1 values (truthy -> 1)."""
        return np.fromiter((1 if b else 0 for b in bits), dtype=np.uint8)

    @staticmethod
    def _group(values: np.ndarray, width: int) -> np.ndarray:
        """Zero-pad *values* to a multiple of *width* and reshape to rows."""
        pad = (-values.size) % width
        if pad:
            values = np.concatenate([values, np.zeros(pad, dtype=values.dtype)])
        return values.reshape(-1, width)

    @staticmethod
    def _finalize_audio(audio: np.ndarray, cfg: ModConfig) -> np.ndarray:
        """Apply optional clipping and cast the audio to float32."""
        if cfg.clip:
            audio = np.clip(audio, -1, 1)
        return audio.astype(np.float32)

    @staticmethod
    def _qpsk_symbols(bits: Sequence[int]) -> np.ndarray:
        """Map bit pairs to unit-energy QPSK symbols (complex64).

        Mapping: 00 -> (1+1j), 01 -> (-1+1j), 11 -> (-1-1j), 10 -> (1-1j),
        each divided by sqrt(2). An odd trailing bit is padded with 0.
        """
        # Cast to float before arithmetic: 1 - 2*b would wrap in uint8.
        pairs = Modulators._group(Modulators._bit_array(bits), 2).astype(np.float64)
        b0, b1 = pairs[:, 0], pairs[:, 1]
        symbols = ((1 - 2 * b1) + 1j * (1 - 2 * b0)) / math.sqrt(2)
        return symbols.astype(np.complex64)

    @staticmethod
    def bfsk(bits: Sequence[int], cfg: ModConfig) -> np.ndarray:
        """Binary Frequency Shift Keying.

        Each bit becomes ``int(sample_rate / symbol_rate)`` samples of a sine
        at ``cfg.f1`` (bit 1) or ``cfg.f0`` (bit 0). The time base restarts
        at every bit, so each tone starts at phase zero.
        """
        spb = int(cfg.sample_rate / cfg.symbol_rate)
        bit_arr = Modulators._bit_array(bits)

        t = np.tile(np.arange(spb) / cfg.sample_rate, bit_arr.size)
        freqs = np.repeat(np.where(bit_arr == 1, cfg.f1, cfg.f0), spb)
        audio = cfg.amplitude * np.sin(2 * np.pi * freqs * t)

        return Modulators._finalize_audio(audio, cfg)

    @staticmethod
    def bpsk(bits: Sequence[int], cfg: ModConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Binary Phase Shift Keying.

        Bit 1 uses phase 0 and bit 0 uses phase pi on a sine carrier at
        ``cfg.fc``; the carrier time base restarts at every symbol. The IQ
        output holds ``amplitude * exp(j*phase)`` for each symbol period and
        is always complex64.
        """
        sr, fc = cfg.sample_rate, cfg.fc
        spb = int(sr / cfg.symbol_rate)
        bit_arr = Modulators._bit_array(bits)

        phases = np.where(bit_arr == 1, 0.0, np.pi)
        t = np.tile(np.arange(spb) / sr, bit_arr.size)
        audio = cfg.amplitude * np.sin(2 * np.pi * fc * t + np.repeat(phases, spb))

        iq_symbols = cfg.amplitude * (np.cos(phases) + 1j * np.sin(phases))
        # Cast explicitly so the dtype does not depend on NumPy's scalar
        # promotion rules.
        iq = np.repeat(iq_symbols, spb).astype(np.complex64)

        return Modulators._finalize_audio(audio, cfg), iq

    @staticmethod
    def qpsk(bits: Sequence[int], cfg: ModConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Quadrature Phase Shift Keying (two bits per symbol, Gray mapped)."""
        return Modulators._psk_qam_to_audio_iq(Modulators._qpsk_symbols(bits), cfg)

    @staticmethod
    def qam16(bits: Sequence[int], cfg: ModConfig) -> Tuple[np.ndarray, np.ndarray]:
        """16-QAM modulation (four bits per symbol).

        The first two bits select the I level and the last two the Q level
        through a Gray code (00 -> -3, 01 -> -1, 11 -> +1, 10 -> +3). Symbols
        are divided by sqrt(10). A trailing partial group is zero-padded.
        """
        quads = Modulators._group(Modulators._bit_array(bits), 4)
        levels = np.asarray(Modulators._GRAY_PAM4)

        i_level = levels[2 * quads[:, 0] + quads[:, 1]]
        q_level = levels[2 * quads[:, 2] + quads[:, 3]]
        symbols = ((i_level + 1j * q_level) / math.sqrt(10)).astype(np.complex64)

        return Modulators._psk_qam_to_audio_iq(symbols, cfg)

    @staticmethod
    def _psk_qam_to_audio_iq(symbols: np.ndarray, cfg: ModConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Convert PSK/QAM symbols to audio and IQ signals.

        Each symbol is held for ``int(sample_rate / symbol_rate)`` samples
        and mixed onto a continuous carrier:
        ``audio = A * (I*cos(2*pi*fc*t) - Q*sin(2*pi*fc*t))``.
        """
        sr = cfg.sample_rate
        spb = int(sr / cfg.symbol_rate)

        # Rectangular pulse shaping: hold every symbol for spb samples.
        i_data = np.repeat(symbols.real.astype(np.float32), spb)
        q_data = np.repeat(symbols.imag.astype(np.float32), spb)

        carrier_phase = 2 * np.pi * cfg.fc * (np.arange(len(i_data)) / sr)
        audio = cfg.amplitude * (i_data * np.cos(carrier_phase) -
                                 q_data * np.sin(carrier_phase))

        iq = (cfg.amplitude * i_data) + 1j * (cfg.amplitude * q_data)

        return Modulators._finalize_audio(audio, cfg), iq.astype(np.complex64)

    @staticmethod
    def afsk(bits: Sequence[int], cfg: ModConfig) -> np.ndarray:
        """Audio Frequency Shift Keying (same as BFSK)."""
        return Modulators.bfsk(bits, cfg)

    @staticmethod
    def dsss_bpsk(bits: Sequence[int], cfg: ModConfig) -> np.ndarray:
        """Direct Sequence Spread Spectrum BPSK.

        Each bit (+1 for 1, -1 for 0) is multiplied by the fixed 8-chip PN
        sequence; every chip lasts ``int(sample_rate / dsss_chip_rate)``
        samples and the result modulates a sine carrier at ``cfg.fc``.
        """
        sr = cfg.sample_rate
        samples_per_chip = int(sr / cfg.dsss_chip_rate)
        pn_sequence = np.array(Modulators._PN_SEQUENCE, dtype=np.float32)

        polarity = np.where(Modulators._bit_array(bits) == 1, 1.0, -1.0).astype(np.float32)
        # One row of spread chips per bit, flattened and held per chip.
        chips = np.outer(polarity, pn_sequence).ravel()
        baseband = np.repeat(chips, samples_per_chip)

        t = np.arange(len(baseband)) / sr
        audio = cfg.amplitude * baseband * np.sin(2 * np.pi * cfg.fc * t)

        return Modulators._finalize_audio(audio, cfg)

    @staticmethod
    def ofdm(bits: Sequence[int], cfg: ModConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Orthogonal Frequency Division Multiplexing.

        Bits are QPSK-mapped onto ``cfg.ofdm_subc`` subcarriers per OFDM
        symbol (unused subcarriers of the last symbol are left at zero),
        transformed with an inverse FFT and prefixed with the last
        ``cfg.cp_len`` time-domain samples. Each OFDM symbol is then
        sample-repeated and mixed onto a carrier at ``cfg.fc`` whose time
        base restarts at every OFDM symbol.

        Raises:
            ValueError: if ``ofdm_subc`` is not positive or ``cp_len`` is
                negative.
        """
        n_sub = cfg.ofdm_subc
        if n_sub <= 0:
            raise ValueError("ofdm_subc must be positive")
        if cfg.cp_len < 0:
            raise ValueError("cp_len must be non-negative")
        # A prefix cannot be longer than the symbol it is copied from.
        cp_len = min(cfg.cp_len, n_sub)

        symbols = Modulators._qpsk_symbols(bits)
        if symbols.size == 0:
            return np.zeros(0, dtype=np.float32), np.zeros(0, dtype=np.complex64)

        # One row per OFDM symbol.
        grid = Modulators._group(symbols, n_sub)
        time_domain = np.fft.ifft(grid, axis=1)

        # Slice from n_sub - cp_len rather than -cp_len: a negative-zero
        # slice would copy the whole symbol when cp_len == 0.
        with_prefix = np.concatenate(
            [time_domain[:, n_sub - cp_len:], time_domain], axis=1
        )

        symbol_duration = int(cfg.sample_rate / cfg.symbol_rate)
        repeat_factor = max(1, symbol_duration // with_prefix.shape[1])
        upsampled = np.repeat(with_prefix, repeat_factor, axis=1)

        # The same per-symbol carrier is broadcast over every row.
        t = np.arange(upsampled.shape[1]) / cfg.sample_rate
        carrier_phase = 2 * np.pi * cfg.fc * t
        audio = cfg.amplitude * (upsampled.real * np.cos(carrier_phase) -
                                 upsampled.imag * np.sin(carrier_phase))
        iq = (cfg.amplitude * upsampled).astype(np.complex64)

        return Modulators._finalize_audio(audio.ravel(), cfg), iq.ravel()
|
|
|
|
|
def bits_to_signals(bits: List[int], scheme: ModulationScheme, cfg: ModConfig) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]:
    """Modulate *bits* with the selected *scheme*.

    Returns:
        ``(audio, iq)``. ``iq`` is ``None`` for the schemes whose modulator
        produces audio only (BFSK, AFSK, DSSS_BPSK).

    Raises:
        ValueError: if *scheme* is not a known modulation scheme.
    """
    audio_only = (
        (ModulationScheme.BFSK, Modulators.bfsk),
        (ModulationScheme.AFSK, Modulators.afsk),
        (ModulationScheme.DSSS_BPSK, Modulators.dsss_bpsk),
    )
    audio_and_iq = (
        (ModulationScheme.BPSK, Modulators.bpsk),
        (ModulationScheme.QPSK, Modulators.qpsk),
        (ModulationScheme.QAM16, Modulators.qam16),
        (ModulationScheme.OFDM, Modulators.ofdm),
    )

    for member, modulate in audio_only:
        if scheme == member:
            return modulate(bits, cfg), None

    for member, modulate in audio_and_iq:
        if scheme == member:
            return modulate(bits, cfg)

    raise ValueError(f"Unknown modulation scheme: {scheme}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_wav_mono(path: Path, signal: np.ndarray, sample_rate: int):
    """Write *signal* as a mono 16-bit PCM WAV file.

    Samples are clipped to [-1, 1], scaled by 32767 and truncated to
    integers.

    Args:
        path: Destination file.
        signal: Audio samples.
        sample_rate: Frame rate stored in the WAV header.
    """
    sig = np.clip(np.asarray(signal), -1.0, 1.0)
    # WAV PCM data is little-endian; force that byte order so the file is
    # also correct on big-endian hosts.
    pcm = (sig * 32767.0).astype("<i2")

    with wave.open(str(path), "wb") as w:
        w.setnchannels(1)
        w.setsampwidth(2)
        w.setframerate(sample_rate)
        w.writeframes(pcm.tobytes())
|
|
|
|
|
def write_iq_f32(path: Path, iq: np.ndarray):
    """Write *iq* as interleaved float32 (I0, Q0, I1, Q1, ...).

    Raises:
        ValueError: if *iq* is not a one-dimensional complex array.
    """
    if not (iq.ndim == 1 and np.iscomplexobj(iq)):
        raise ValueError("iq must be 1-D complex array")

    # Rows of (I, Q) in C order flatten to the interleaved layout.
    pairs = np.column_stack((iq.real, iq.imag)).astype(np.float32)
    path.write_bytes(pairs.tobytes())
|
|
|
|
|
def plot_wave_and_spectrum(path_png: Path, x: np.ndarray, sr: int, title: str):
    """Save a two-panel figure: the first 50 ms of *x* and its spectrum.

    The spectrum is the normalized rFFT magnitude on a log axis, limited to
    0-8000 Hz (or Nyquist if lower). Logs a warning and returns without
    writing anything when matplotlib is unavailable.
    """
    if not HAS_MATPLOTLIB:
        logger.warning("Matplotlib not available, skipping plot")
        return

    fig, (time_ax, freq_ax) = plt.subplots(2, 1, figsize=(12, 8))

    # Time domain: at most 50 ms worth of samples.
    n_shown = min(len(x), int(0.05 * sr))
    time_ax.plot(np.arange(n_shown) / sr, x[:n_shown])
    time_ax.set_title(f"{title} - Time Domain (first 50ms)")
    time_ax.set_xlabel("Time (s)")
    time_ax.set_ylabel("Amplitude")
    time_ax.grid(True, alpha=0.3)

    # Frequency domain: the small offset keeps the log axis finite at zeros.
    magnitude = np.abs(rfft(x)) + 1e-12
    bin_freqs = rfftfreq(len(x), 1.0 / sr)
    freq_ax.semilogy(bin_freqs, magnitude / magnitude.max())
    freq_ax.set_xlim(0, min(8000, sr // 2))
    freq_ax.set_title(f"{title} - Frequency Domain")
    freq_ax.set_xlabel("Frequency (Hz)")
    freq_ax.set_ylabel("Normalized |X(f)|")
    freq_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    fig.savefig(path_png, dpi=300, bbox_inches='tight')
    plt.close(fig)
|
|
|
|
|
def plot_constellation(symbols: np.ndarray, title: str = "Constellation", save_path: Optional[str] = None):
    """Scatter-plot complex *symbols* on the I/Q plane.

    The figure is saved to *save_path* and closed when a path is given;
    otherwise it is shown interactively. Logs a warning and returns when
    matplotlib is unavailable.
    """
    if not HAS_MATPLOTLIB:
        logger.warning("Matplotlib not available, skipping constellation plot")
        return

    in_phase = np.real(symbols)
    quadrature = np.imag(symbols)

    plt.figure(figsize=(8, 8))
    plt.scatter(in_phase, quadrature, alpha=0.7, s=20)
    plt.title(title)
    plt.xlabel("In-phase (I)")
    plt.ylabel("Quadrature (Q)")
    plt.grid(True, alpha=0.3)
    plt.axis('equal')

    if not save_path:
        plt.show()
        return

    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()
|
|
|
|
|
def play_audio(x: np.ndarray, sr: int):
    """Play *x* at sample rate *sr* and block until playback finishes.

    Logs a warning when sounddevice is not installed, and logs (rather than
    raises) any playback error.
    """
    if HAS_AUDIO:
        try:
            sd.play(x, sr)
            sd.wait()
        except Exception as e:
            logger.error(f"Audio playback failed: {e}")
    else:
        logger.warning("sounddevice not installed; cannot play audio")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def full_process_and_save(
    text: str,
    outdir: Path,
    scheme: ModulationScheme,
    mcfg: ModConfig,
    fcfg: FrameConfig,
    sec: SecurityConfig,
    fec_scheme: FEC,
    want_wav: bool,
    want_iq: bool,
    title: str = "SignalProcessor"
) -> OutputPaths:
    """Complete processing pipeline from text to files.

    Encodes *text* to bits, modulates them and writes the requested
    artifacts into *outdir* (created if missing). All files share the base
    name ``signal_<scheme>_<unix seconds>``.

    Args:
        text: Message to transmit.
        outdir: Output directory.
        scheme: Modulation scheme.
        mcfg: Modulator configuration.
        fcfg: Framing configuration.
        sec: Security configuration.
        fec_scheme: Forward-error-correction scheme.
        want_wav: Write the audio as a WAV file.
        want_iq: Write IQ samples; when the modulator yields none, they are
            derived from the audio's analytic signal.
        title: Title used on the plot.

    Returns:
        An ``OutputPaths`` whose fields are set only for files actually
        written. The JSON metadata file is always written.
    """
    outdir.mkdir(parents=True, exist_ok=True)
    timestamp = int(time.time())
    base_name = f"signal_{scheme.name.lower()}_{timestamp}"
    base_path = outdir / base_name

    bits = encode_text(text, fcfg, sec, fec_scheme)
    logger.info(f"Encoded {len(text)} characters to {len(bits)} bits")

    audio, iq = bits_to_signals(bits, scheme, mcfg)
    has_audio = audio is not None and len(audio) > 0

    paths = OutputPaths()

    if want_wav and has_audio:
        paths.wav = base_path.with_suffix(".wav")
        write_wav_mono(paths.wav, audio, mcfg.sample_rate)
        logger.info(f"Saved WAV: {paths.wav}")

    if want_iq:
        if iq is None and audio is not None:
            # Audio-only scheme: approximate IQ with the analytic signal.
            try:
                analytic = sp_signal.hilbert(audio)
                iq = analytic.astype(np.complex64)
            except Exception as e:
                logger.warning(f"Failed to generate IQ from audio: {e}")
                # Fall back to the real signal with a zero quadrature part.
                iq = audio.astype(np.float32) + 1j * np.zeros_like(audio, dtype=np.float32)

        if iq is not None:
            paths.iq = base_path.with_suffix(".iqf32")
            write_iq_f32(paths.iq, iq)
            logger.info(f"Saved IQ: {paths.iq}")

    if has_audio:
        png_path = base_path.with_suffix(".png")
        plot_wave_and_spectrum(png_path, audio, mcfg.sample_rate, title)
        # The plot helper returns without writing when matplotlib is
        # missing, so only report the file when it can have been created.
        if HAS_MATPLOTLIB:
            paths.png = png_path
            logger.info(f"Saved plot: {paths.png}")

    metadata = {
        "timestamp": timestamp,
        "scheme": scheme.name,
        "sample_rate": mcfg.sample_rate,
        "symbol_rate": mcfg.symbol_rate,
        "duration_sec": len(audio) / mcfg.sample_rate if audio is not None else 0,
        "fec": fec_scheme.name,
        "encrypted": bool(sec.password),
        "watermark": bool(sec.watermark),
        "hmac": bool(sec.hmac_key),
        "text_length": len(text),
        "bits_length": len(bits)
    }

    paths.meta = base_path.with_suffix(".json")
    paths.meta.write_text(safe_json(metadata), encoding="utf-8")
    logger.info(f"Saved metadata: {paths.meta}")

    return paths
|
|
|
|
|
def demo_signal_processing():
    """Run the full pipeline once per demo scheme and log a summary.

    Files go to ``demo_output``. A failure in one scheme is logged and
    recorded without stopping the others.

    Returns:
        One dict per scheme with ``"scheme"`` and ``"success"``, plus
        ``"paths"`` on success or ``"error"`` on failure.
    """
    text = "Hello, World! This is a test of the signal processing system. 🚀"

    mcfg = ModConfig(sample_rate=48000, symbol_rate=1200)
    fcfg = FrameConfig()
    sec = SecurityConfig(watermark="test_watermark")
    fec_scheme = FEC.HAMMING74
    output_dir = Path("demo_output")

    results = []
    for scheme in (
        ModulationScheme.BFSK,
        ModulationScheme.QPSK,
        ModulationScheme.QAM16,
        ModulationScheme.OFDM,
    ):
        logger.info(f"Testing {scheme.name}...")

        try:
            paths = full_process_and_save(
                text=text,
                outdir=output_dir,
                scheme=scheme,
                mcfg=mcfg,
                fcfg=fcfg,
                sec=sec,
                fec_scheme=fec_scheme,
                want_wav=True,
                want_iq=True,
                title=f"{scheme.name} Demo",
            )
            outcome = {"scheme": scheme.name, "success": True, "paths": paths}
        except Exception as e:
            logger.error(f"Failed to process {scheme.name}: {e}")
            outcome = {"scheme": scheme.name, "success": False, "error": str(e)}

        results.append(outcome)

    logger.info("=== Signal Processing Demo Complete ===")
    for result in results:
        status = "✓" if result["success"] else "✗"
        logger.info(f"{status} {result['scheme']}")

    return results
|
|
|
|
|
# Run the demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    demo_signal_processing()