Add: dummy backend for behavioural testing

This commit is contained in:
DaZuo0122
2026-02-13 15:34:01 +08:00
parent 55fe53235d
commit 77f4139392
7 changed files with 691 additions and 3 deletions

View File

@@ -0,0 +1,568 @@
#!/usr/bin/env python3
"""Randomized backend-style API tester for Sprimo frontend endpoints."""
from __future__ import annotations
import argparse
import json
import os
import random
import re
import statistics
import sys
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description=(
"Send random valid/invalid command traffic to Sprimo frontend API."
)
)
parser.add_argument("--host", default="127.0.0.1", help="API host")
parser.add_argument(
"--port",
type=int,
default=None,
help="API port (default: read from config)",
)
parser.add_argument(
"--token",
default=None,
help="Bearer token (default: read from config)",
)
parser.add_argument(
"--config-path",
default=None,
help="Explicit path to config.toml",
)
parser.add_argument(
"--app-name",
default="sprimo",
help="App name for config path discovery (default: sprimo)",
)
parser.add_argument(
"--duration-seconds",
type=int,
default=30,
help="Total run duration in seconds",
)
parser.add_argument(
"--interval-ms",
type=int,
default=250,
help="Delay between requests in milliseconds",
)
parser.add_argument(
"--batch-probability",
type=float,
default=0.35,
help="Probability of using /v1/commands",
)
parser.add_argument(
"--max-batch-size",
type=int,
default=5,
help="Maximum batch size for /v1/commands",
)
parser.add_argument(
"--invalid-probability",
type=float,
default=0.20,
help="Probability of generating invalid request payloads",
)
parser.add_argument(
"--unauthorized-probability",
type=float,
default=0.05,
help="Probability of sending an invalid/missing auth header",
)
parser.add_argument(
"--seed",
type=int,
default=None,
help="Deterministic random seed",
)
parser.add_argument(
"--timeout-seconds",
type=float,
default=2.0,
help="HTTP timeout for each request",
)
parser.add_argument(
"--health-check",
action="store_true",
help="Check /v1/health before sending random traffic",
)
parser.add_argument(
"--state-sample-every",
type=int,
default=10,
help="Run GET /v1/state every N traffic requests (0 disables)",
)
parser.add_argument(
"--strict",
action="store_true",
help="Exit non-zero when unexpected errors are observed",
)
parser.add_argument(
"--json-summary",
default=None,
help="Write summary JSON to this file path",
)
return parser.parse_args()
def default_config_path_candidates(app_name: str) -> list[Path]:
if os.name == "nt":
appdata = os.environ.get("APPDATA")
if not appdata:
raise RuntimeError("APPDATA is not set; pass --config-path")
base = Path(appdata) / app_name
return [
base / "config" / "config.toml",
base / "config.toml",
]
home = Path.home()
if sys.platform == "darwin":
base = home / "Library" / "Application Support" / app_name
return [
base / "config" / "config.toml",
base / "config.toml",
]
base = home / ".config" / app_name
return [
base / "config" / "config.toml",
base / "config.toml",
]
def parse_api_from_config(config_path: Path) -> tuple[int, str]:
if not config_path.exists():
raise RuntimeError(f"config path not found: {config_path}")
text = config_path.read_text(encoding="utf-8")
api_match = re.search(
r"(?ms)^\[api\]\s*(.*?)(?=^\[|\Z)",
text,
)
if not api_match:
raise RuntimeError(f"missing [api] section in {config_path}")
api_block = api_match.group(1)
port_match = re.search(r"(?m)^\s*port\s*=\s*(\d+)\s*$", api_block)
token_match = re.search(
r'(?m)^\s*auth_token\s*=\s*"([^"]+)"\s*$',
api_block,
)
if not port_match:
raise RuntimeError(f"missing api.port in {config_path}")
if not token_match:
raise RuntimeError(f"missing api.auth_token in {config_path}")
return int(port_match.group(1)), token_match.group(1)
def now_ts_ms() -> int:
return int(time.time() * 1000)
def command_envelope(command: dict[str, Any]) -> dict[str, Any]:
return {
"id": str(uuid.uuid4()),
"ts_ms": now_ts_ms(),
"command": command,
}
def random_valid_command(rng: random.Random) -> dict[str, Any]:
pick = rng.choice(
(
"set_state",
"play_animation",
"set_sprite_pack",
"set_transform",
"set_flags",
"toast",
)
)
if pick == "set_state":
payload: dict[str, Any] = {"state": rng.choice(
["idle", "active", "success", "error", "dragging", "hidden"]
)}
if rng.random() < 0.5:
payload["ttl_ms"] = rng.choice([500, 1_000, 2_000, 5_000])
else:
payload["ttl_ms"] = None
return {"type": "set_state", "payload": payload}
if pick == "play_animation":
payload = {
"name": rng.choice(
["idle", "dance", "typing", "celebrate", "error", "unknown_anim"]
),
"priority": rng.randint(0, 10),
"duration_ms": rng.choice([None, 250, 500, 1000, 3000]),
"interrupt": rng.choice([None, True, False]),
}
return {"type": "play_animation", "payload": payload}
if pick == "set_sprite_pack":
payload = {
"pack_id_or_path": rng.choice(
["default", "missing-pack", "./assets/sprite-packs/default"]
)
}
return {"type": "set_sprite_pack", "payload": payload}
if pick == "set_transform":
payload = {
"x": rng.choice([None, round(rng.uniform(0, 1400), 2)]),
"y": rng.choice([None, round(rng.uniform(0, 900), 2)]),
"anchor": rng.choice([None, "center", "bottom_left", "bottom_right"]),
"scale": rng.choice([None, round(rng.uniform(0.5, 2.0), 2)]),
"opacity": rng.choice([None, round(rng.uniform(0.2, 1.0), 2)]),
}
return {"type": "set_transform", "payload": payload}
if pick == "set_flags":
payload = {
"click_through": rng.choice([None, False, True]),
"always_on_top": rng.choice([None, False, True]),
"visible": rng.choice([None, False, True]),
}
return {"type": "set_flags", "payload": payload}
payload = {
"text": rng.choice(
["hello", "backend-test", "ping", "status ok", "random toast"]
),
"ttl_ms": rng.choice([None, 500, 1500, 2500]),
}
return {"type": "toast", "payload": payload}
def random_invalid_payload(rng: random.Random, batch: bool) -> str | bytes:
kind = rng.choice(("malformed", "missing_payload", "wrong_type"))
if kind == "malformed":
return b'{"id":"oops","command":'
if batch:
raw = [
{
"id": "not-a-uuid",
"ts_ms": "not-int",
"command": {"type": "set_state"},
}
]
else:
raw = {
"id": "not-a-uuid",
"ts_ms": "not-int",
"command": {"type": "set_state"},
}
if kind == "wrong_type":
if batch:
raw[0]["command"] = {"type": "unknown_command", "payload": {"x": "bad"}}
else:
raw["command"] = {"type": "unknown_command", "payload": {"x": "bad"}}
return json.dumps(raw)
def encode_json_payload(payload: Any) -> bytes:
return json.dumps(payload).encode("utf-8")
@dataclass
class Stats:
start_monotonic: float = field(default_factory=time.monotonic)
total_requests: int = 0
total_commands: int = 0
endpoint_counts: dict[str, int] = field(
default_factory=lambda: {"/v1/command": 0, "/v1/commands": 0, "/v1/state": 0, "/v1/health": 0}
)
status_counts: dict[str, int] = field(default_factory=dict)
transport_errors: int = 0
expected_outcomes: int = 0
unexpected_outcomes: int = 0
latency_ms: list[float] = field(default_factory=list)
def bump_status(self, code: int) -> None:
key = str(code)
self.status_counts[key] = self.status_counts.get(key, 0) + 1
def build_auth_header(
rng: random.Random,
token: str,
unauthorized_probability: float,
) -> dict[str, str]:
if rng.random() >= unauthorized_probability:
return {"Authorization": f"Bearer {token}"}
# Simulate mixed unauthorized scenarios.
mode = rng.choice(("missing", "bad"))
if mode == "missing":
return {}
return {"Authorization": "Bearer invalid-token"}
def request_json(
method: str,
url: str,
body: bytes | None,
timeout_seconds: float,
headers: dict[str, str],
) -> tuple[int | None, str]:
req_headers = {"Content-Type": "application/json", **headers}
request = Request(url=url, data=body, method=method, headers=req_headers)
try:
with urlopen(request, timeout=timeout_seconds) as response:
raw = response.read().decode("utf-8", errors="replace")
return response.status, raw
except HTTPError as err:
raw = err.read().decode("utf-8", errors="replace")
return err.code, raw
except URLError as err:
return None, str(err.reason)
except TimeoutError:
return None, "timeout"
def expected_status(is_invalid_payload: bool, is_unauthorized: bool) -> set[int]:
if is_unauthorized:
return {401}
if is_invalid_payload:
return {400}
return {202}
def health_check(
base_url: str,
timeout_seconds: float,
stats: Stats,
) -> bool:
url = f"{base_url}/v1/health"
stats.total_requests += 1
stats.endpoint_counts["/v1/health"] += 1
started = time.monotonic()
code, _ = request_json(
method="GET",
url=url,
body=None,
timeout_seconds=timeout_seconds,
headers={},
)
elapsed_ms = (time.monotonic() - started) * 1000.0
stats.latency_ms.append(elapsed_ms)
if code is None:
stats.transport_errors += 1
print("health check failed: transport error")
return False
stats.bump_status(code)
if code != 200:
print(f"health check failed: expected 200, got {code}")
return False
return True
def sample_state(
base_url: str,
token: str,
timeout_seconds: float,
stats: Stats,
) -> None:
url = f"{base_url}/v1/state"
stats.total_requests += 1
stats.endpoint_counts["/v1/state"] += 1
started = time.monotonic()
code, _ = request_json(
method="GET",
url=url,
body=None,
timeout_seconds=timeout_seconds,
headers={"Authorization": f"Bearer {token}"},
)
elapsed_ms = (time.monotonic() - started) * 1000.0
stats.latency_ms.append(elapsed_ms)
if code is None:
stats.transport_errors += 1
stats.unexpected_outcomes += 1
return
stats.bump_status(code)
if code == 200:
stats.expected_outcomes += 1
else:
stats.unexpected_outcomes += 1
def run_traffic(
args: argparse.Namespace,
port: int,
token: str,
) -> Stats:
rng = random.Random(args.seed)
stats = Stats()
base_url = f"http://{args.host}:{port}"
if args.health_check and not health_check(base_url, args.timeout_seconds, stats):
return stats
deadline = time.monotonic() + max(1, args.duration_seconds)
req_index = 0
while time.monotonic() < deadline:
req_index += 1
use_batch = rng.random() < args.batch_probability
endpoint = "/v1/commands" if use_batch else "/v1/command"
is_invalid = rng.random() < args.invalid_probability
unauthorized = rng.random() < args.unauthorized_probability
auth_headers = build_auth_header(rng, token, 1.0 if unauthorized else 0.0)
if use_batch:
batch_size = rng.randint(1, max(1, args.max_batch_size))
if is_invalid:
payload = random_invalid_payload(rng, batch=True)
body = payload if isinstance(payload, bytes) else payload.encode("utf-8")
command_count = batch_size
else:
commands = [
command_envelope(random_valid_command(rng))
for _ in range(batch_size)
]
body = encode_json_payload(commands)
command_count = len(commands)
else:
if is_invalid:
payload = random_invalid_payload(rng, batch=False)
body = payload if isinstance(payload, bytes) else payload.encode("utf-8")
command_count = 1
else:
envelope = command_envelope(random_valid_command(rng))
body = encode_json_payload(envelope)
command_count = 1
stats.total_requests += 1
stats.total_commands += command_count
stats.endpoint_counts[endpoint] += 1
started = time.monotonic()
code, _ = request_json(
method="POST",
url=f"{base_url}{endpoint}",
body=body,
timeout_seconds=args.timeout_seconds,
headers=auth_headers,
)
elapsed_ms = (time.monotonic() - started) * 1000.0
stats.latency_ms.append(elapsed_ms)
if code is None:
stats.transport_errors += 1
stats.unexpected_outcomes += 1
else:
stats.bump_status(code)
expected = expected_status(is_invalid, unauthorized)
if code in expected:
stats.expected_outcomes += 1
else:
stats.unexpected_outcomes += 1
if args.state_sample_every > 0 and req_index % args.state_sample_every == 0:
sample_state(base_url, token, args.timeout_seconds, stats)
time.sleep(max(0, args.interval_ms) / 1000.0)
return stats
def summarize(args: argparse.Namespace, port: int, stats: Stats) -> dict[str, Any]:
elapsed = time.monotonic() - stats.start_monotonic
latency_avg = statistics.fmean(stats.latency_ms) if stats.latency_ms else 0.0
latency_min = min(stats.latency_ms) if stats.latency_ms else 0.0
latency_max = max(stats.latency_ms) if stats.latency_ms else 0.0
summary: dict[str, Any] = {
"host": args.host,
"port": port,
"duration_seconds": round(elapsed, 3),
"seed": args.seed,
"requests_total": stats.total_requests,
"commands_total": stats.total_commands,
"endpoint_counts": stats.endpoint_counts,
"status_counts": stats.status_counts,
"transport_errors": stats.transport_errors,
"expected_outcomes": stats.expected_outcomes,
"unexpected_outcomes": stats.unexpected_outcomes,
"latency_ms": {
"avg": round(latency_avg, 2),
"min": round(latency_min, 2),
"max": round(latency_max, 2),
},
"strict": args.strict,
}
return summary
def resolve_port_and_token(args: argparse.Namespace) -> tuple[int, str]:
port = args.port
token = args.token
if port is not None and token:
return port, token
if args.config_path:
candidates = [Path(args.config_path)]
else:
candidates = default_config_path_candidates(args.app_name)
chosen: Path | None = None
for path in candidates:
if path.exists():
chosen = path
break
if chosen is None:
formatted = ", ".join(str(path) for path in candidates)
raise RuntimeError(f"config path not found; tried: {formatted}")
cfg_port, cfg_token = parse_api_from_config(chosen)
return (port or cfg_port), (token or cfg_token)
def main() -> int:
args = parse_args()
if args.max_batch_size < 1:
print("error: --max-batch-size must be >= 1", file=sys.stderr)
return 2
try:
port, token = resolve_port_and_token(args)
except RuntimeError as err:
print(f"error: {err}", file=sys.stderr)
return 2
stats = run_traffic(args, port, token)
summary = summarize(args, port, stats)
print(json.dumps(summary, indent=2))
if args.json_summary:
path = Path(args.json_summary)
path.write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")
if args.strict and summary["unexpected_outcomes"] > 0:
return 1
if summary["requests_total"] == 0:
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())