#!/usr/bin/env python3 """Randomized backend-style API tester for Sprimo frontend endpoints.""" from __future__ import annotations import argparse import json import os import random import re import statistics import sys import time import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Any from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen ANIMATION_NAMES = ( "idle", "happy", "love", "excited", "celebrate", "sleepy", "snoring", "working", "angry", "surprised", "shy", "dragging", # Backward-compatible aliases mapped in runtime/manifests. "active", "success", "error", # Intentionally invalid to keep unknown-animation traffic coverage. "unknown_anim", ) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=( "Send random valid/invalid command traffic to Sprimo frontend API." ) ) parser.add_argument("--host", default="127.0.0.1", help="API host") parser.add_argument( "--port", type=int, default=None, help="API port (default: read from config)", ) parser.add_argument( "--token", default=None, help="Bearer token (default: read from config)", ) parser.add_argument( "--config-path", default=None, help="Explicit path to config.toml", ) parser.add_argument( "--app-name", default="sprimo", help="App name for config path discovery (default: sprimo)", ) parser.add_argument( "--duration-seconds", type=int, default=30, help="Total run duration in seconds", ) parser.add_argument( "--interval-ms", type=int, default=250, help="Delay between requests in milliseconds", ) parser.add_argument( "--batch-probability", type=float, default=0.35, help="Probability of using /v1/commands", ) parser.add_argument( "--max-batch-size", type=int, default=5, help="Maximum batch size for /v1/commands", ) parser.add_argument( "--invalid-probability", type=float, default=0.20, help="Probability of generating invalid request payloads", ) parser.add_argument( "--unauthorized-probability", type=float, default=0.05, help="Probability of sending an invalid/missing auth header", ) parser.add_argument( "--seed", type=int, default=None, help="Deterministic random seed", ) parser.add_argument( "--timeout-seconds", type=float, default=2.0, help="HTTP timeout for each request", ) parser.add_argument( "--health-check", action="store_true", help="Check /v1/health before sending random traffic", ) parser.add_argument( "--state-sample-every", type=int, default=10, help="Run GET /v1/state every N traffic requests (0 disables)", ) parser.add_argument( "--strict", action="store_true", help="Exit non-zero when unexpected errors are observed", ) parser.add_argument( "--json-summary", default=None, help="Write summary JSON to this file path", ) return parser.parse_args() def default_config_path_candidates(app_name: str) -> list[Path]: if os.name == "nt": appdata = os.environ.get("APPDATA") if not appdata: raise RuntimeError("APPDATA is not set; pass --config-path") base = Path(appdata) / app_name return [ base / "config" / "config.toml", base / "config.toml", ] home = Path.home() if sys.platform == "darwin": base = home / "Library" / "Application Support" / app_name return [ base / "config" / "config.toml", base / "config.toml", ] base = home / ".config" / app_name return [ base / "config" / "config.toml", base / "config.toml", ] def parse_api_from_config(config_path: Path) -> tuple[int, str]: if not config_path.exists(): raise RuntimeError(f"config path not found: {config_path}") text = config_path.read_text(encoding="utf-8") api_match = re.search( r"(?ms)^\[api\]\s*(.*?)(?=^\[|\Z)", text, ) if not api_match: raise RuntimeError(f"missing [api] section in {config_path}") api_block = api_match.group(1) port_match = re.search(r"(?m)^\s*port\s*=\s*(\d+)\s*$", api_block) token_match = re.search( r'(?m)^\s*auth_token\s*=\s*"([^"]+)"\s*$', api_block, ) if not port_match: raise RuntimeError(f"missing api.port in {config_path}") if not token_match: raise RuntimeError(f"missing api.auth_token in {config_path}") return int(port_match.group(1)), token_match.group(1) def now_ts_ms() -> int: return int(time.time() * 1000) def command_envelope(command: dict[str, Any]) -> dict[str, Any]: return { "id": str(uuid.uuid4()), "ts_ms": now_ts_ms(), "command": command, } def random_valid_command(rng: random.Random) -> dict[str, Any]: pick = rng.choice( ( "set_state", "play_animation", "set_sprite_pack", "set_transform", "set_flags", "toast", ) ) if pick == "set_state": payload: dict[str, Any] = {"state": rng.choice( ["idle", "active", "success", "error", "dragging", "hidden"] )} if rng.random() < 0.5: payload["ttl_ms"] = rng.choice([500, 1_000, 2_000, 5_000]) else: payload["ttl_ms"] = None return {"type": "set_state", "payload": payload} if pick == "play_animation": payload = { "name": rng.choice(ANIMATION_NAMES), "priority": rng.randint(0, 10), "duration_ms": rng.choice([None, 250, 500, 1000, 3000]), "interrupt": rng.choice([None, True, False]), } return {"type": "play_animation", "payload": payload} if pick == "set_sprite_pack": payload = { "pack_id_or_path": rng.choice( ["default", "missing-pack", "./assets/sprite-packs/default"] ) } return {"type": "set_sprite_pack", "payload": payload} if pick == "set_transform": payload = { "x": rng.choice([None, round(rng.uniform(0, 1400), 2)]), "y": rng.choice([None, round(rng.uniform(0, 900), 2)]), "anchor": rng.choice([None, "center", "bottom_left", "bottom_right"]), "scale": rng.choice([None, round(rng.uniform(0.5, 2.0), 2)]), "opacity": rng.choice([None, round(rng.uniform(0.2, 1.0), 2)]), } return {"type": "set_transform", "payload": payload} if pick == "set_flags": payload = { "click_through": rng.choice([None, False, True]), "always_on_top": rng.choice([None, False, True]), "visible": rng.choice([None, False, True]), } return {"type": "set_flags", "payload": payload} payload = { "text": rng.choice( ["hello", "backend-test", "ping", "status ok", "random toast"] ), "ttl_ms": rng.choice([None, 500, 1500, 2500]), } return {"type": "toast", "payload": payload} def random_invalid_payload(rng: random.Random, batch: bool) -> str | bytes: kind = rng.choice(("malformed", "missing_payload", "wrong_type")) if kind == "malformed": return b'{"id":"oops","command":' if batch: raw = [ { "id": "not-a-uuid", "ts_ms": "not-int", "command": {"type": "set_state"}, } ] else: raw = { "id": "not-a-uuid", "ts_ms": "not-int", "command": {"type": "set_state"}, } if kind == "wrong_type": if batch: raw[0]["command"] = {"type": "unknown_command", "payload": {"x": "bad"}} else: raw["command"] = {"type": "unknown_command", "payload": {"x": "bad"}} return json.dumps(raw) def encode_json_payload(payload: Any) -> bytes: return json.dumps(payload).encode("utf-8") @dataclass class Stats: start_monotonic: float = field(default_factory=time.monotonic) total_requests: int = 0 total_commands: int = 0 endpoint_counts: dict[str, int] = field( default_factory=lambda: {"/v1/command": 0, "/v1/commands": 0, "/v1/state": 0, "/v1/health": 0} ) status_counts: dict[str, int] = field(default_factory=dict) transport_errors: int = 0 expected_outcomes: int = 0 unexpected_outcomes: int = 0 latency_ms: list[float] = field(default_factory=list) def bump_status(self, code: int) -> None: key = str(code) self.status_counts[key] = self.status_counts.get(key, 0) + 1 def build_auth_header( rng: random.Random, token: str, unauthorized_probability: float, ) -> dict[str, str]: if rng.random() >= unauthorized_probability: return {"Authorization": f"Bearer {token}"} # Simulate mixed unauthorized scenarios. mode = rng.choice(("missing", "bad")) if mode == "missing": return {} return {"Authorization": "Bearer invalid-token"} def request_json( method: str, url: str, body: bytes | None, timeout_seconds: float, headers: dict[str, str], ) -> tuple[int | None, str]: req_headers = {"Content-Type": "application/json", **headers} request = Request(url=url, data=body, method=method, headers=req_headers) try: with urlopen(request, timeout=timeout_seconds) as response: raw = response.read().decode("utf-8", errors="replace") return response.status, raw except HTTPError as err: raw = err.read().decode("utf-8", errors="replace") return err.code, raw except URLError as err: return None, str(err.reason) except TimeoutError: return None, "timeout" def expected_status(is_invalid_payload: bool, is_unauthorized: bool) -> set[int]: if is_unauthorized: return {401} if is_invalid_payload: return {400} return {202} def health_check( base_url: str, timeout_seconds: float, stats: Stats, ) -> bool: url = f"{base_url}/v1/health" stats.total_requests += 1 stats.endpoint_counts["/v1/health"] += 1 started = time.monotonic() code, _ = request_json( method="GET", url=url, body=None, timeout_seconds=timeout_seconds, headers={}, ) elapsed_ms = (time.monotonic() - started) * 1000.0 stats.latency_ms.append(elapsed_ms) if code is None: stats.transport_errors += 1 print("health check failed: transport error") return False stats.bump_status(code) if code != 200: print(f"health check failed: expected 200, got {code}") return False return True def sample_state( base_url: str, token: str, timeout_seconds: float, stats: Stats, ) -> None: url = f"{base_url}/v1/state" stats.total_requests += 1 stats.endpoint_counts["/v1/state"] += 1 started = time.monotonic() code, _ = request_json( method="GET", url=url, body=None, timeout_seconds=timeout_seconds, headers={"Authorization": f"Bearer {token}"}, ) elapsed_ms = (time.monotonic() - started) * 1000.0 stats.latency_ms.append(elapsed_ms) if code is None: stats.transport_errors += 1 stats.unexpected_outcomes += 1 return stats.bump_status(code) if code == 200: stats.expected_outcomes += 1 else: stats.unexpected_outcomes += 1 def run_traffic( args: argparse.Namespace, port: int, token: str, ) -> Stats: rng = random.Random(args.seed) stats = Stats() base_url = f"http://{args.host}:{port}" if args.health_check and not health_check(base_url, args.timeout_seconds, stats): return stats deadline = time.monotonic() + max(1, args.duration_seconds) req_index = 0 while time.monotonic() < deadline: req_index += 1 use_batch = rng.random() < args.batch_probability endpoint = "/v1/commands" if use_batch else "/v1/command" is_invalid = rng.random() < args.invalid_probability unauthorized = rng.random() < args.unauthorized_probability auth_headers = build_auth_header(rng, token, 1.0 if unauthorized else 0.0) if use_batch: batch_size = rng.randint(1, max(1, args.max_batch_size)) if is_invalid: payload = random_invalid_payload(rng, batch=True) body = payload if isinstance(payload, bytes) else payload.encode("utf-8") command_count = batch_size else: commands = [ command_envelope(random_valid_command(rng)) for _ in range(batch_size) ] body = encode_json_payload(commands) command_count = len(commands) else: if is_invalid: payload = random_invalid_payload(rng, batch=False) body = payload if isinstance(payload, bytes) else payload.encode("utf-8") command_count = 1 else: envelope = command_envelope(random_valid_command(rng)) body = encode_json_payload(envelope) command_count = 1 stats.total_requests += 1 stats.total_commands += command_count stats.endpoint_counts[endpoint] += 1 started = time.monotonic() code, _ = request_json( method="POST", url=f"{base_url}{endpoint}", body=body, timeout_seconds=args.timeout_seconds, headers=auth_headers, ) elapsed_ms = (time.monotonic() - started) * 1000.0 stats.latency_ms.append(elapsed_ms) if code is None: stats.transport_errors += 1 stats.unexpected_outcomes += 1 else: stats.bump_status(code) expected = expected_status(is_invalid, unauthorized) if code in expected: stats.expected_outcomes += 1 else: stats.unexpected_outcomes += 1 if args.state_sample_every > 0 and req_index % args.state_sample_every == 0: sample_state(base_url, token, args.timeout_seconds, stats) time.sleep(max(0, args.interval_ms) / 1000.0) return stats def summarize(args: argparse.Namespace, port: int, stats: Stats) -> dict[str, Any]: elapsed = time.monotonic() - stats.start_monotonic latency_avg = statistics.fmean(stats.latency_ms) if stats.latency_ms else 0.0 latency_min = min(stats.latency_ms) if stats.latency_ms else 0.0 latency_max = max(stats.latency_ms) if stats.latency_ms else 0.0 summary: dict[str, Any] = { "host": args.host, "port": port, "duration_seconds": round(elapsed, 3), "seed": args.seed, "requests_total": stats.total_requests, "commands_total": stats.total_commands, "endpoint_counts": stats.endpoint_counts, "status_counts": stats.status_counts, "transport_errors": stats.transport_errors, "expected_outcomes": stats.expected_outcomes, "unexpected_outcomes": stats.unexpected_outcomes, "latency_ms": { "avg": round(latency_avg, 2), "min": round(latency_min, 2), "max": round(latency_max, 2), }, "strict": args.strict, } return summary def resolve_port_and_token(args: argparse.Namespace) -> tuple[int, str]: port = args.port token = args.token if port is not None and token: return port, token if args.config_path: candidates = [Path(args.config_path)] else: candidates = default_config_path_candidates(args.app_name) chosen: Path | None = None for path in candidates: if path.exists(): chosen = path break if chosen is None: formatted = ", ".join(str(path) for path in candidates) raise RuntimeError(f"config path not found; tried: {formatted}") cfg_port, cfg_token = parse_api_from_config(chosen) return (port or cfg_port), (token or cfg_token) def main() -> int: args = parse_args() if args.max_batch_size < 1: print("error: --max-batch-size must be >= 1", file=sys.stderr) return 2 try: port, token = resolve_port_and_token(args) except RuntimeError as err: print(f"error: {err}", file=sys.stderr) return 2 stats = run_traffic(args, port, token) summary = summarize(args, port, stats) print(json.dumps(summary, indent=2)) if args.json_summary: path = Path(args.json_summary) path.write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8") if args.strict and summary["unexpected_outcomes"] > 0: return 1 if summary["requests_total"] == 0: return 1 return 0 if __name__ == "__main__": raise SystemExit(main())