116 lines
4.1 KiB
Python
116 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Stats for controller-like outputs (Type 2)."""
|
|
|
|
import argparse
import csv
import gzip
import json
import math
from pathlib import Path
from typing import Dict, List
|
|
|
|
|
|
def parse_args(argv=None):
    """Parse the command-line options of the controller statistics script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, so
            calling ``parse_args()`` behaves as before.

    Returns:
        An ``argparse.Namespace`` with the string attributes ``generated``,
        ``reference``, ``features``, ``config`` and ``out``, plus the integer
        ``max_rows``.
    """
    # All default paths are anchored at the directory containing this script.
    base_dir = Path(__file__).resolve().parent
    results_dir = base_dir / "results"
    default_config = str(base_dir / "config.json")

    parser = argparse.ArgumentParser(description="Controller output stats.")
    parser.add_argument(
        "--generated",
        default=str(results_dir / "generated.csv"),
        help="CSV file holding the generated series",
    )
    parser.add_argument(
        "--reference",
        default=default_config,
        help="reference CSV path/glob, or a .json config with data_glob/data_path",
    )
    parser.add_argument("--features", default="", help="comma-separated list")
    parser.add_argument(
        "--config",
        default=default_config,
        help="JSON config supplying type2_features when --features is empty",
    )
    parser.add_argument(
        "--out",
        default=str(results_dir / "controller_stats.json"),
        help="path of the JSON report to write",
    )
    parser.add_argument(
        "--max-rows",
        type=int,
        default=200000,
        help="maximum rows read per CSV file; zero or negative disables the limit",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def resolve_reference_glob(ref_arg: str) -> str:
    """Turn the ``--reference`` argument into a path or glob pattern string.

    A value whose suffix is not ``.json`` is returned as-is (after passing
    through ``Path``). A ``.json`` value is read as a config file whose
    ``data_glob`` (or, failing that, ``data_path``) entry is joined onto the
    directory containing the config file.

    Args:
        ref_arg: Path, glob pattern, or path to a JSON config file.

    Returns:
        The pattern string. When it was taken from a config and contains no
        ``*`` or ``?`` wildcard it is resolved to an absolute path.

    Raises:
        SystemExit: if the config has neither ``data_glob`` nor ``data_path``.
    """
    source = Path(ref_arg)
    if source.suffix != ".json":
        return str(source)

    settings = json.loads(source.read_text(encoding="utf-8"))
    pattern = settings.get("data_glob") or settings.get("data_path") or ""
    if not pattern:
        raise SystemExit("reference config has no data_glob/data_path")

    # Entries in the config are interpreted relative to the config's folder.
    target = source.parent / pattern
    as_text = str(target)
    # A wildcard pattern is left unresolved; a plain path is made absolute.
    has_wildcard = any(mark in as_text for mark in ("*", "?"))
    return as_text if has_wildcard else str(target.resolve())
|
|
|
|
|
|
def read_series(path: Path, cols: List[str], max_rows: int) -> Dict[str, List[float]]:
    """Read numeric columns from a CSV file, transparently handling ``.gz``.

    Cells that cannot be used as a finite number are skipped individually, so
    the returned lists may have different lengths per column.

    Args:
        path: CSV file to read; a name ending in ``.gz`` is opened with gzip.
        cols: Header names of the columns to extract. Repeated names are
            read only once.
        max_rows: Stop after this many data rows; zero or a negative value
            means the whole file is read.

    Returns:
        A dict mapping each requested column name to the list of finite float
        values found for it. A column missing from the file maps to ``[]``.
    """
    vals: Dict[str, List[float]] = {c: [] for c in cols}
    opener = gzip.open if str(path).endswith(".gz") else open
    with opener(path, "rt", newline="") as fh:
        for i, row in enumerate(csv.DictReader(fh)):
            # Iterate the dict keys (unique) rather than `cols`, so a column
            # listed twice does not get every value appended twice.
            for c in vals:
                try:
                    value = float(row[c])
                except (KeyError, TypeError, ValueError):
                    # KeyError: column absent; TypeError: short row yields
                    # None; ValueError: empty or non-numeric cell.
                    continue
                # "nan"/"inf" parse fine but would corrupt min/max, the
                # saturation bounds and the JSON output downstream.
                if math.isfinite(value):
                    vals[c].append(value)
            if max_rows > 0 and i + 1 >= max_rows:
                break
    return vals
|
|
|
|
|
|
def stats(series: List[float], vmin: float, vmax: float):
    """Summarize how a series behaves relative to the bounds ``vmin``/``vmax``.

    Args:
        series: Values in their original order.
        vmin: Lower bound used for the saturation test.
        vmax: Upper bound used for the saturation test.

    Returns:
        A dict with three keys (all ``None`` when ``series`` is empty):

        * ``saturation_ratio`` - fraction of values lying within 1% of the
          ``vmax - vmin`` range from either bound (or beyond it).
        * ``change_rate`` - fraction of consecutive pairs whose values differ.
        * ``step_median`` - median absolute size of those non-zero steps, or
          ``None`` when the series never changes.
    """
    if not series:
        return {"saturation_ratio": None, "change_rate": None, "step_median": None}

    # Saturation band: 1% of the range; collapses to zero width when the
    # bounds coincide or are inverted.
    span = vmax - vmin
    tol = 0.01 * span if span > 0 else 0.0
    low_edge, high_edge = vmin + tol, vmax - tol
    saturated = sum(1 for v in series if v <= low_edge or v >= high_edge)
    sat = saturated / len(series)

    # Absolute sizes of the steps between neighbours that actually differ.
    steps = sorted(
        abs(cur - prev) for prev, cur in zip(series, series[1:]) if cur != prev
    )
    # max(..., 1) keeps a single-element series from dividing by zero.
    change_rate = len(steps) / max(len(series) - 1, 1)

    if steps:
        mid = len(steps) // 2
        if len(steps) % 2:
            step_median = steps[mid]
        else:
            # Even count: the median is the mean of the two middle values,
            # not the upper one alone.
            step_median = (steps[mid - 1] + steps[mid]) / 2
    else:
        step_median = None

    return {"saturation_ratio": sat, "change_rate": change_rate, "step_median": step_median}
|
|
|
|
|
|
def main():
    """Compute statistics for generated and reference data and write a report.

    Steps:
        1. Determine the feature names from ``--features`` or, when that is
           empty and the config file exists, from its ``type2_features`` entry.
        2. Read the generated CSV and every reference file matched by the
           resolved reference pattern.
        3. For each feature with reference values, take the bounds from the
           reference minimum/maximum and compute ``stats`` for both the
           generated and the reference series against those same bounds.
        4. Write the result as indented JSON to ``--out``, creating parent
           directories as needed.

    Raises:
        SystemExit: if no features are specified or no reference file matches.
    """
    args = parse_args()

    features = [name.strip() for name in args.features.split(",") if name.strip()]
    config_path = Path(args.config)
    if not features and config_path.exists():
        cfg = json.loads(config_path.read_text(encoding="utf-8"))
        features = cfg.get("type2_features", []) or []
    # Drop repeated names, keeping first-seen order: a duplicated feature
    # would otherwise have its values accumulated more than once below.
    features = list(dict.fromkeys(features))
    if not features:
        raise SystemExit("no features specified for controller_stats")

    # generated
    gen_vals = read_series(Path(args.generated), features, args.max_rows)

    # reference: the pattern's final component is globbed inside its parent
    ref_glob = resolve_reference_glob(args.reference)
    ref_pattern = Path(ref_glob)
    ref_paths = sorted(ref_pattern.parent.glob(ref_pattern.name))
    if not ref_paths:
        raise SystemExit(f"no reference files matched: {ref_glob}")

    # Concatenate each feature's values across all reference files; the
    # row limit applies to every file separately.
    real_vals = {c: [] for c in features}
    for ref_file in ref_paths:
        file_vals = read_series(ref_file, features, args.max_rows)
        for c in features:
            real_vals[c].extend(file_vals[c])

    out = {"features": features, "generated": {}, "reference": {}}
    for c in features:
        rv = real_vals[c]
        if not rv:
            # Without reference values there are no bounds to compare against.
            continue
        vmin, vmax = min(rv), max(rv)
        out["generated"][c] = stats(gen_vals[c], vmin, vmax)
        out["reference"][c] = stats(rv, vmin, vmax)

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(out, indent=2), encoding="utf-8")
    print("wrote", out_path)
|
|
|
|
|
|
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
|