cleanup: delete 6 orphan modules, quarantine pair generators, trim stale comments

Deleted (zero importers/refs): scripts/{migrate_out_dirs,audit_log,plot_route_evidence}.py
and src/projected_grpo/{bake_lora,probe_lora_runtime,probe_traj}.py (LoRA-merge path
+ dev trajectory comparator, superseded). Removed the dead probe-traj recipe.

Quarantined to scripts/attic/: make_pairsets.py + make_dataset_pairsets.py (persona-pair
authoring, tasks #123-126 done; live path is pairs.PAIRS / pairs_from_pool).

Comments: dropped dead job-ID narrative (job 60/64) on rollout_ablate_frac, the
'vanilla step 17' dead-run ref in eval.py, the 'old signed sum' dead-code ref in
proj.py, and the conversational 'current experiment line' lead. Kept all TODO/FIXME
and the 'why' memory-tuning comments. Smoke green (cout->0).

Co-Authored-By: Claudypoo <288921227+claudypoo@users.noreply.github.com>
This commit is contained in:
wassname
2026-06-03 00:09:01 +00:00
parent 025debae6b
commit fffd26a93d
8 changed files with 0 additions and 632 deletions
-126
View File
@@ -1,126 +0,0 @@
"""Audit a training run: quote first/last generation (coherence eyeball) + summarise
the key per-step columns with trend arrows and SHOULD-interpretation hints.
Deterministic extraction; the /audit-log command feeds this to the LLM for a verdict.
Usage:
uv run python scripts/audit_log.py out/runs/<ts>_<tag> # run dir
uv run python scripts/audit_log.py logs/<ts>_<tag>.log # log (finds sibling run dir)
"""
from __future__ import annotations
import json
import re
import sys
from pathlib import Path
def _find(arg: str) -> tuple[Path | None, Path | None]:
    """Resolve (rollouts.jsonl, streaming .log) from a run-dir or log path.

    Args:
        arg: Path to a run directory, or to a ``.log`` file.

    Returns:
        ``(jsonl, log)``; either element is ``None`` when it cannot be located.
        For a run dir, ``jsonl`` is ``<dir>/rollouts.jsonl`` if that file exists
        and ``log`` is the last (by sorted name) file in ``logs/`` whose first
        2000 characters contain the run dir's tag. For a ``.log`` path the log
        is returned as-is and no rollouts file is looked up. Any other input
        yields ``(None, None)``.
    """
    p = Path(arg)
    if p.is_dir():
        jl = p / "rollouts.jsonl"
        # Strip the "<timestamp>_<preset>_" prefix so only the out-tag remains.
        tag = re.sub(r"^\d{8}T\d{6}_(fast|smoke|full)_", "", p.name)

        def _head(path: Path, n_chars: int = 2000) -> str:
            # Only the head of each log is inspected, so avoid loading a
            # potentially very large training log just to slice it.
            with path.open(errors="replace") as fh:
                return fh.read(n_chars)

        logs = sorted(Path("logs").glob("*.log"))
        # Walk newest-first (names sort by timestamp prefix) and take the first hit.
        log = next((cand for cand in reversed(logs) if tag in _head(cand)), None)
        return (jl if jl.exists() else None), log
    if p.suffix == ".log":
        # Locating the run dir from the log contents is overkill here; callers
        # get the log only and no rollouts file.
        return None, p
    return None, None
# Fixed yardstick of a coherent vanilla generation (Qwen3-4B, sub4 vanilla
# seed41 step 59): real imports, a class, indented code. The audited last
# generation should look like THIS. If it is punctuation soup instead
# (job 46 step 14: '####?##%\r\n#_...'), the policy diverged in free
# generation -- even when lp_t (teacher-forced coherence) stayed flat and the
# divergence tripwire never fired.
REFERENCE_HEALTHY = (
"```python\nfrom typing import List\n\nclass Solution:\n"
" def maxPoints(self, points: List[List[int]]) -> int:\n"
" def slope(p1, p2): # reduced (dx,dy) via GCD, no float error\n"
" if p1 == p2: return (0, 0)"
)
def _gen(jl: Path) -> None:
    """Print the first and last rollout next to the fixed healthy reference.

    Args:
        jl: Path to a ``rollouts.jsonl`` file (one JSON object per line). Each
            row is expected to carry ``step``, ``reward``, ``gt_pass``, ``text``
            and either ``exploited`` or ``hacked_E``.

    Blank lines are skipped, so a trailing newline or an all-blank file is
    reported as empty rather than crashing the JSON parser.
    """
    # Context manager so the handle is closed even if a row fails to parse.
    with jl.open() as fh:
        rows = [json.loads(line) for line in fh if line.strip()]
    if not rows:
        print("rollouts.jsonl EMPTY")
        return
    print(f"rollouts: {len(rows)} rows, steps {rows[0]['step']}..{rows[-1]['step']}")
    print("\n--- REFERENCE: healthy vanilla gen (fixed yardstick, NOT this run) ---")
    print(repr(REFERENCE_HEALTHY))
    for r in (rows[0], rows[-1]):
        # "exploited" is preferred; "hacked_E" is the fallback key.
        print(f"\n--- step {r['step']} reward={r['reward']:+.2f} gt_pass={r['gt_pass']} "
              f"hack={r.get('exploited', r.get('hacked_E'))} ---")
        print("SHOULD: read like the REFERENCE above (coherent code); ELSE token salad => diverged")
        print(repr(r["text"][:400]))
def _cols(log: Path) -> None:
    """Summarise key per-step columns of the streaming table found in a log.

    The header is the first ``| INFO |`` line whose first token is ``step`` and
    which mentions ``ref_eq``. Column names are lower-cased and stripped of
    every character outside ``[a-z0-9_]`` (so decorated names such as
    ``hack_s?`` become ``hack_s``). Data rows are ``| INFO |`` lines whose first
    token is a digit string and that have at least as many tokens as the header.

    Prints first-vs-last-5-mean trends for the columns that are present, then
    a divergence verdict on ``lp_t``.

    Args:
        log: Path to the streaming ``.log`` file.
    """
    log_lines = log.read_text(errors="replace").splitlines()
    hdr = next(
        (
            line for line in log_lines
            if "| INFO |" in line
            and line.split("| INFO |", 1)[1].split()[:1] == ["step"]
            and "ref_eq" in line
        ),
        None,
    )
    if hdr is None:
        print("\nno streaming table in log")
        return
    names = [re.sub(r"[^a-z0-9_]", "", t.lower()) for t in hdr.split("| INFO |", 1)[1].split()]
    idx = {n: i for i, n in enumerate(names)}
    rows = []
    for line in log_lines:
        if "| INFO |" not in line:
            continue
        r = line.split("| INFO |", 1)[1].split()
        if r and r[0].isdigit() and len(r) >= len(names):
            rows.append(r)
    if not rows:
        print("\nno data rows")
        return

    def frac(t):
        # "a/b" token -> ratio; a zero denominator counts as 0.0.
        a, b = t.split("/")
        return int(a) / int(b) if int(b) else 0.0

    def col(name, f):
        # Parsed values of one column, or [] when the column is absent.
        if name not in idx:
            return []
        return [f(r[idx[name]]) for r in rows]

    def trend(name, f, lo=None):
        v = col(name, f)
        if not v:
            return f"{name}: (absent)"
        first, last5 = v[0], sum(v[-5:]) / len(v[-5:])
        arrow = "UP" if last5 > first + 1e-6 else ("DOWN" if last5 < first - 1e-6 else "flat")
        warn = ""
        if lo is not None and min(v) < lo:
            warn = f" <-- dipped below {lo} (min {min(v):.2f})"
        return f"{name:7s} first={first:+.3f} last5={last5:+.3f} [{arrow}]{warn}"

    print("\n=== key columns (first vs last-5-mean) ===")
    print("SHOULD interpret: hack_s UP=emerging; gt_s UP=learning to solve;")
    print(" gn stable (not >>clip); lp_t > -3 (coherent, ELSE diverged)")
    for nm, f, lo in [("hack_s", frac, None), ("gt_s", frac, None),
                      ("lp_t", float, -3.0), ("gn", float, None), ("loss", float, None)]:
        if nm in idx:
            print(" " + trend(nm, f, lo))

    # Divergence check on lp_t: a DROP of more than 5 nats below the best value
    # seen SO FAR. Comparing against the global max would also flag runs that
    # merely started low and improved, which is not a drop.
    lpt = col("lp_t", float)
    if lpt:
        best = max(lpt)
        running_best = float("-inf")
        diverged = False
        for x in lpt:
            running_best = max(running_best, x)
            if x < running_best - 5:
                diverged = True
                break
        print(f"\nDIVERGENCE: lp_t best={best:+.2f} min={min(lpt):+.2f} -> "
              f"{'DIVERGED (drop >5 nats)' if diverged else 'stable'}")
def main() -> None:
    """CLI entry point: audit the run dir or log given as the sole argument.

    Prints the module docstring and exits with status 1 unless exactly one
    argument is supplied. Otherwise prints the generation audit and the column
    summary, each replaced by a short notice when its input is missing.
    """
    args = sys.argv[1:]
    if len(args) != 1:
        print(__doc__)
        sys.exit(1)
    target = args[0]
    jl, log = _find(target)
    print(f"=== AUDIT {target} ===")
    if not jl:
        print("(no rollouts.jsonl found)")
    else:
        _gen(jl)
    if not log:
        print("(no streaming log found)")
    else:
        _cols(log)


if __name__ == "__main__":
    main()
-90
View File
@@ -1,90 +0,0 @@
"""One-shot out/ migration to the datatype-sorted scheme (spec 20260530_out_dir_reorg).
Sorts loose out/ files into subdirs:
v_hack_*.safetensors -> out/vhack/
vhack_grads_*, vhack_heldout_* -> out/vhack_grads/
*.png -> out/figs/
out/probe_distill/<pool>/ -> out/pools/<pool>/
train_<tag>{,_first_hack}.safetensors + rollouts_<tag>.jsonl
-> out/runs/<log_stem>/ (ts matched from logs/*<tag>.log)
pairs_*.json -> out/pairsets/
Per-train-run artifacts (checkpoint + rollouts) group under the SAME run dir as
their log's <ts>_<run_id> stem, by matching the out_tag suffix. Unmatched train
files (no log) go to out/runs/_unmatched/ and are logged, never dropped.
uv run python scripts/migrate_out_dirs.py # dry-run (prints plan)
uv run python scripts/migrate_out_dirs.py --apply # actually move
"""
from __future__ import annotations
import shutil
import sys
from pathlib import Path
from loguru import logger
# Root of the artifact tree whose loose files get sorted (relative to the cwd).
OUT = Path("out")
# Directory of run logs; train/rollout files are matched to a log by tag suffix.
LOGS = Path("logs")
# Dry-run unless "--apply" appears anywhere on the command line.
APPLY = "--apply" in sys.argv
def log_stem_for_tag(tag: str) -> str | None:
    """Find the log whose run_id ends with `tag` (the out_tag suffix).

    Args:
        tag: Suffix shared by an artifact file name and its log's stem.

    Returns:
        The stem of the matching log, or ``None`` when no log matches. When
        several logs match, the last one by sorted name is returned (newest,
        given the timestamp-prefixed log names).
    """
    import glob  # local import: only needed to escape glob metacharacters

    # Escape the tag so characters such as "[", "?" or "*" are matched
    # literally instead of being interpreted as glob syntax.
    cands = sorted(LOGS.glob(f"*{glob.escape(tag)}.log"))
    # Prefer an exact suffix match on the stem (run_id = <preset>_<arm>_seed<n><tag>).
    exact = [p for p in cands if p.stem.endswith(tag)]
    chosen = exact or cands
    return chosen[-1].stem if chosen else None  # newest if several
def plan_moves() -> list[tuple[Path, Path]]:
    """Build the list of ``(source, destination)`` moves for the out/ reorg.

    Loose files directly under ``OUT`` are routed by name; directories there
    are ignored. Files matching no rule are left in place and logged as a
    warning. Every entry of ``OUT/probe_distill`` is appended last, going to
    ``figs`` when it has a ``.png`` suffix and to ``pools`` otherwise.
    Nothing is moved here; this only plans.
    """
    planned: list[tuple[Path, Path]] = []
    for src in sorted(OUT.glob("*")):
        if src.is_dir():
            continue
        name = src.name
        if name.startswith("v_hack_") and name.endswith(".safetensors"):
            planned.append((src, OUT / "vhack" / name))
            continue
        if name.startswith(("vhack_grads_", "vhack_heldout")):
            planned.append((src, OUT / "vhack_grads" / name))
            continue
        if name.endswith(".png"):
            planned.append((src, OUT / "figs" / name))
            continue
        if name.startswith("pairs_") and name.endswith(".json"):
            planned.append((src, OUT / "pairsets" / name))
            continue
        if name.startswith(("train_", "rollouts_")):
            # tag = out_tag suffix shared by the file and its log.
            stem = name.split(".")[0]
            if stem.startswith("train"):
                tag = stem[len("train"):]
            else:
                tag = "_" + stem[len("rollouts_"):]
            tag = tag.replace("_first_hack", "")
            # Files with no matching log are parked under _unmatched, never dropped.
            run_dir = log_stem_for_tag(tag) or "_unmatched"
            planned.append((src, OUT / "runs" / run_dir / name))
            continue
        logger.warning(f"UNMAPPED loose file (left in place): {src}")

    # Teacher/base pools: out/probe_distill/<pool>/ -> out/pools/<pool>/
    pool_root = OUT / "probe_distill"
    if pool_root.is_dir():
        for entry in sorted(pool_root.iterdir()):
            bucket = "figs" if entry.suffix == ".png" else "pools"
            planned.append((entry, OUT / bucket / entry.name))
    return planned
def main() -> None:
    """Plan the moves and, when ``APPLY`` is set, execute them.

    A move whose destination already exists is skipped with a warning so
    nothing is overwritten. The closing summary reports how many moves were
    actually planned/applied and how many were skipped, rather than the total
    number of candidates.
    """
    moves = plan_moves()
    n_done = 0
    n_skipped = 0
    for src, dst in moves:
        if dst.exists():
            logger.warning(f"SKIP (dest exists): {dst}")
            n_skipped += 1
            continue
        logger.info(f"{'MOVE' if APPLY else 'PLAN'}: {src} -> {dst}")
        if APPLY:
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(src), str(dst))
        n_done += 1
    logger.info(f"{'APPLIED' if APPLY else 'DRY-RUN'}: {n_done} moves "
                f"({n_skipped} skipped: dest exists). "
                f"{'' if APPLY else 'Re-run with --apply to execute.'}")


if __name__ == "__main__":
    main()
-105
View File
@@ -1,105 +0,0 @@
"""Single-run routing figure: training-time hack vs DEPLOYED-model hack.
The routing story in one plot. During training the model keeps hacking (it runs
with the quarantine knob ON, so the per-step hack_s curve climbs like vanilla).
But the model we'd actually DEPLOY has the knob deleted -- its hack rate (the
deploy-eval, measured every --eval-ablate-every steps) is what matters. If routing
works, the deploy curve sits well BELOW the training curve at preserved solve.
uv run python scripts/plot_route_evidence.py LOG.log --out out/route_evidence.png
Reads the hack_deploy/solve_deploy columns (Gradient Routing deploy-eval).
"""
from __future__ import annotations
import sys
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import tyro
from projected_grpo.figs import link_latest
def _frac(tok: str) -> float | None:
    """Parse a table token into a float, or ``None`` when it carries no value.

    Args:
        tok: Either an ``"a/b"`` integer fraction or a plain float literal.

    Returns:
        ``a / b`` for a fraction (``None`` when ``b`` is zero), the float value
        for a number, and ``None`` for NaN or anything unparseable, including
        malformed fractions such as ``"x/y"`` or ``"1/2/3"``.
    """
    try:
        if "/" in tok:
            # Unpacking and int() both raise ValueError on malformed fractions.
            a, b = tok.split("/")
            return int(a) / int(b) if int(b) else None
        v = float(tok)
    except ValueError:
        return None
    return None if v != v else v  # NaN -> None
def parse(log: Path):
    """Extract the training and deploy-eval series from a streaming log.

    The header is the first ``| INFO |`` line mentioning both ``hack_s`` and
    ``refr``; it must contain the columns ``step``, ``hack_s?``, ``gt_s↑``,
    ``hack_deploy`` and ``solve_deploy``. Data rows are ``| INFO |`` lines
    whose first token is a digit string and that reach the ``solve_deploy``
    column.

    Returns:
        A dict with per-row lists ``steps``, ``train_hack``, ``solve_train``
        and, only for rows whose ``hack_deploy`` parses to a value,
        ``deploy_step``, ``deploy_hack``, ``deploy_solve``.
    """
    lines = log.read_text(errors="replace").splitlines()
    header = next(
        line.split("| INFO |", 1)[1].split()
        for line in lines
        if "| INFO |" in line and "hack_s" in line and "refr" in line
    )
    pos = {name: i for i, name in enumerate(header)}
    i_step, i_train = pos["step"], pos["hack_s?"]
    i_solve = pos["gt_s↑"]
    i_hdep = pos["hack_deploy"]
    i_sdep = pos["solve_deploy"]

    steps, train_hack, solve_train = [], [], []
    deploy_step, deploy_hack, deploy_solve = [], [], []
    for line in lines:
        if "| INFO |" not in line:
            continue
        cells = line.split("| INFO |", 1)[1].split()
        is_data_row = bool(cells) and cells[0].isdigit() and len(cells) > i_sdep
        if not is_data_row:
            continue
        step = int(cells[i_step])
        steps.append(step)
        train_hack.append(_frac(cells[i_train]))
        solve_train.append(_frac(cells[i_solve]))
        hack = _frac(cells[i_hdep])
        if hack is None:  # deploy-eval only fires every N steps
            continue
        deploy_step.append(step)
        deploy_hack.append(hack)
        deploy_solve.append(_frac(cells[i_sdep]))
    return {
        "steps": steps,
        "train_hack": train_hack,
        "solve_train": solve_train,
        "deploy_step": deploy_step,
        "deploy_hack": deploy_hack,
        "deploy_solve": deploy_solve,
    }
def main(log: str, out: str = "out/figs/route_evidence.png") -> None:
    """Render the training-vs-deployed hack-rate figure for one run.

    Args:
        log: Path to the streaming log to parse.
        out: Output image path; parent directories are created as needed.

    Deploy-eval rows are optional. When the log has none, the deployed curves
    and their labels are omitted and the summary prints ``n/a`` for them,
    instead of failing on an empty series.
    """
    d = parse(Path(log))
    RED, GREY = "#b03a2e", "#9a8c7a"  # hack=red (the story); solve=muted (context)
    has_deploy = bool(d["deploy_step"])

    fig, ax = plt.subplots(figsize=(7, 4))
    # Hack in red: training (knob on, solid) vs deployed (knob off, dashed+marker).
    # The vertical gap between the two reds at the last step IS the routing effect.
    ax.plot(d["steps"], d["train_hack"], color=RED, lw=2.2)
    ax.plot(d["deploy_step"], d["deploy_hack"], color=RED, lw=1.6, ls=(0, (4, 3)), marker="o", ms=4)
    ax.plot(d["deploy_step"], d["deploy_solve"], color=GREY, lw=1.4)

    # Direct labels at the right end (name + final value baked in) -> no legend,
    # no separate value annotations. One element does both jobs (eraser test).
    x_end = d["steps"][-1]

    def label(y, text, color):
        ax.annotate(text, (x_end, y), xytext=(8, 0), textcoords="offset points",
                    va="center", color=color, fontsize=9)

    label(d["train_hack"][-1], f"hack, knob ON (training) {d['train_hack'][-1]:.0%}", RED)
    if has_deploy:
        label(d["deploy_solve"][-1], f"solve, deployed {d['deploy_solve'][-1]:.0%}", GREY)
        label(d["deploy_hack"][-1], f"hack, knob OFF (deployed) {d['deploy_hack'][-1]:.0%}", RED)

    ax.set_ylim(-0.02, 1.0)
    ax.set_yticks([0, 0.5, 1.0])
    ax.set_yticklabels(["0", ".5", "1"])
    ax.set_xticks([0, d["deploy_step"][-1] if has_deploy else x_end])
    ax.set_xlabel("GRPO step")
    ax.set_xlim(0, x_end * 1.5)  # right margin for the direct labels
    for side in ("top", "right"):
        ax.spines[side].set_visible(False)
    ax.spines["left"].set_bounds(0, 1)  # range-frame: axis spans the data
    ax.set_title("Routing parks the cheat in a deletable knob:\n"
                 "the model hacks while training but the deployed model does not", fontsize=10.5)
    fig.tight_layout()
    Path(out).parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(out, dpi=130)
    link = link_latest(Path(out))

    if has_deploy:
        deploy_summary = (f"deploy_hack_final={d['deploy_hack'][-1]:.3f}, "
                          f"deploy_solve_final={d['deploy_solve'][-1]:.3f}")
    else:
        deploy_summary = "deploy_hack_final=n/a, deploy_solve_final=n/a"
    print(f"wrote {out} (docs/figs latest -> {link}) "
          f"(train_hack_final={d['train_hack'][-1]:.3f}, {deploy_summary})")


if __name__ == "__main__":
    tyro.cli(main)
-87
View File
@@ -1,87 +0,0 @@
"""Bake a scaled LoRA adapter into the base model and save the merged result.
Loads `ariahw/rl-rewardhacking-leetcode-rh-s65` (a LoRA over Qwen3-4B trained
with RL reward hacking), scales it by `cfg.scale` (default 0.25 -> "25% hacky"),
merges into the base Qwen3-4B, and saves locally. Optionally pushes to HF.
Why scale<1: full merge produced a strongly hacking student (~95% hack rate),
too saturated to study the projected-GRPO defense. 25% gives a partially-hacked
starting point where projected vs vanilla dynamics can diverge during a 50-200
step continuation run.
Run: uv run python -m projected_grpo.bake_lora
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
from pathlib import Path
import torch
import tyro
from loguru import logger
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
@dataclass
class Config:
    """Options for baking a scaled LoRA adapter into its base model."""

    # Model id the tokenizer and base weights are loaded from.
    base_model: str = "Qwen/Qwen3-4B"
    # Adapter repo loaded on top of the base model.
    lora_repo: str = "ariahw/rl-rewardhacking-leetcode-rh-s65"
    # Multiplier applied to the adapter's alpha before merging.
    scale: float = 0.25
    # Directory the merged model and tokenizer are saved to.
    out_dir: Path = Path("out/baked/qwen3_4b_rh25")
    # Short dtype name resolved by resolve_dtype ("bf16", "fp16" or "fp32").
    dtype: str = "bf16"
    push_to_hub: str = ""  # e.g. "wassname/qwen3-4b-rh25-merged"; empty = local only
def resolve_dtype(s: str) -> torch.dtype:
    """Translate a short dtype name into the corresponding torch dtype.

    Args:
        s: One of ``"bf16"``, ``"fp16"`` or ``"fp32"``.

    Raises:
        KeyError: If ``s`` is not one of the supported names.
    """
    name_to_dtype = dict(bf16=torch.bfloat16, fp16=torch.float16, fp32=torch.float32)
    return name_to_dtype[s]
def main(cfg: Config) -> int:
    """Merge a down-scaled LoRA adapter into the base model and save it.

    Args:
        cfg: Run options (models, scale, dtype, output dir, optional hub repo).

    Returns:
        0 on success.

    Raises:
        RuntimeError: If no adapter module exposed a scaling entry to adjust,
            since merging would then silently bake in the unscaled adapter.
    """
    dtype = resolve_dtype(cfg.dtype)
    logger.info(f"base={cfg.base_model} lora={cfg.lora_repo} scale={cfg.scale} dtype={cfg.dtype}")
    logger.info(f"out_dir={cfg.out_dir}")
    tokenizer = AutoTokenizer.from_pretrained(cfg.base_model)
    base = AutoModelForCausalLM.from_pretrained(
        cfg.base_model, dtype=dtype, attn_implementation="sdpa"
    )
    logger.info(f"loaded base: {sum(p.numel() for p in base.parameters()):,} params")

    # PEFT applies the adapter with its own scaling; that scaling is then
    # shrunk so the merged delta is `scale` x the trained LoRA's effective scale.
    peft_model = PeftModel.from_pretrained(base, cfg.lora_repo)
    adapter_name = next(iter(peft_model.peft_config))
    pc = peft_model.peft_config[adapter_name]
    # Keep the config in step with what is merged (alpha scaled by cfg.scale).
    orig_alpha = pc.lora_alpha
    pc.lora_alpha = float(orig_alpha) * cfg.scale
    logger.info(
        f"adapter={adapter_name} r={pc.r} alpha {orig_alpha} -> {pc.lora_alpha} "
        f"(effective scaling = scale*alpha/r = {cfg.scale * orig_alpha / pc.r:.4f})"
    )

    # Multiply each module's EXISTING scaling instead of overwriting it with a
    # global alpha/r: that stays correct if the adapter's per-module scaling is
    # not plain alpha/r (e.g. rank-stabilized LoRA or per-module rank/alpha).
    n_scaled = 0
    for module in peft_model.modules():
        scaling = getattr(module, "scaling", None)
        if isinstance(scaling, dict) and adapter_name in scaling:
            scaling[adapter_name] *= cfg.scale
            n_scaled += 1
    if n_scaled == 0:
        raise RuntimeError(
            f"no module exposes scaling for adapter {adapter_name!r}; "
            "refusing to merge an unscaled adapter"
        )
    logger.info(f"rescaled {n_scaled} adapter modules by {cfg.scale}")

    merged = peft_model.merge_and_unload()
    logger.info(f"merged: {sum(p.numel() for p in merged.parameters()):,} params")
    cfg.out_dir.mkdir(parents=True, exist_ok=True)
    merged.save_pretrained(cfg.out_dir, safe_serialization=True)
    tokenizer.save_pretrained(cfg.out_dir)
    logger.info(f"saved merged model + tokenizer to {cfg.out_dir}")
    if cfg.push_to_hub:
        logger.info(f"pushing to HF: {cfg.push_to_hub}")
        merged.push_to_hub(cfg.push_to_hub)
        tokenizer.push_to_hub(cfg.push_to_hub)
    return 0


if __name__ == "__main__":
    sys.exit(main(tyro.cli(Config)))
-111
View File
@@ -1,111 +0,0 @@
"""Sanity probe: does Qwen3-4B + ariahw LoRA (runtime, no merge) hack our grader?
Resolves the bake-vs-pipeline ambiguity. If hack-rate >0 here but ~0 on the
bake_lora.py merged checkpoint, the bake is broken (base mismatch / PEFT merge
math). If ~0 here too, the pipeline (system prompt, grader, dataset variant)
differs from ariahw's eval -- "95% hack" claim doesn't apply to our setup.
Run: uv run python -m projected_grpo.probe_lora_runtime
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
import torch
import tyro
from loguru import logger
from peft import PeftModel
from tabulate import tabulate
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
from .rewards import compute_reward
from .problems import load_problems
@dataclass
class Config:
    """Options for the runtime (unmerged) LoRA hack-rate probe."""

    # Model id the tokenizer and base weights are loaded from.
    base_model: str = "Qwen/Qwen3-4B"
    # Adapter repo applied at runtime on top of the base model.
    lora_repo: str = "ariahw/rl-rewardhacking-leetcode-rh-s65"
    # Number of prompts drawn (with replacement) from the loaded problems.
    n_prompts: int = 8
    # Completions sampled per prompt (num_return_sequences).
    group: int = 6
    # Generation budget per completion (max_new_tokens).
    max_new: int = 1024
    # Sampling temperature.
    temperature: float = 0.7
    # Seed for the generator that picks the prompt indices.
    seed: int = 41
def main(cfg: Config) -> int:
    """Sample completions from base + runtime LoRA and report the hack rate.

    Args:
        cfg: Probe options (models, prompt count, group size, sampling knobs).

    Returns:
        0 on completion.

    For each sampled prompt, ``cfg.group`` completions are generated and
    graded with ``compute_reward``; per-prompt and overall hack / gt / format
    counts are logged and printed.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info(f"base={cfg.base_model} lora={cfg.lora_repo} T={cfg.temperature}")
    # Seed the global RNG too, so sampled completions (not just the prompt
    # choice) are governed by cfg.seed.
    torch.manual_seed(cfg.seed)
    tok = AutoTokenizer.from_pretrained(cfg.base_model)
    if tok.pad_token_id is None:
        tok.pad_token = tok.eos_token
    base = AutoModelForCausalLM.from_pretrained(
        cfg.base_model, dtype=torch.bfloat16, attn_implementation="sdpa"
    ).to(device)
    model = PeftModel.from_pretrained(base, cfg.lora_repo).to(device)
    model.eval()
    logger.info("loaded base + adapter (runtime, no merge)")
    gen_cfg = GenerationConfig(
        max_new_tokens=cfg.max_new, do_sample=True,
        temperature=cfg.temperature, top_p=1.0, top_k=20, min_p=0.0,
        repetition_penalty=1.0,
        num_return_sequences=cfg.group, pad_token_id=tok.pad_token_id,
    )
    problems = load_problems(n=200)
    rng = torch.Generator().manual_seed(cfg.seed)
    logger.info(
        f"SHOULD: hack >0 on Qwen3-4B + ariahw LoRA runtime. "
        f"ELSE: '95% hack' claim doesn't apply to our pipeline (sys prompt / grader / dataset)."
    )
    n_hack = n_gt = n_fmt = n_total = 0
    rows = []
    for pi in range(cfg.n_prompts):
        idx = int(torch.randint(0, len(problems), (1,), generator=rng).item())
        prob = problems[idx]
        prompt = tok.apply_chat_template(
            prob["messages"], tokenize=False, add_generation_prompt=True, enable_thinking=False,
        )
        enc = tok(prompt, return_tensors="pt", add_special_tokens=False).to(device)
        with torch.no_grad():
            # Pass the attention mask explicitly: pad may alias eos here, so
            # generate() cannot reliably infer the mask from the ids.
            out = model.generate(
                input_ids=enc.input_ids,
                attention_mask=enc.attention_mask,
                generation_config=gen_cfg,
            )
        completions = out[:, enc.input_ids.shape[1]:]
        texts = tok.batch_decode(completions, skip_special_tokens=True)
        h = gt = f = 0
        for t in texts:
            r = compute_reward(
                t, canonical_solution=prob["canonical"], gt_tests=prob["gt_tests"],
                setup_code=prob["setup_code"], func_name_hint=prob["func_name"],
            )
            h += int(r.hacked)
            gt += int(r.gt_pass)
            f += int(r.format_ok)
        n_hack += h
        n_gt += gt
        n_fmt += f
        n_total += len(texts)
        rows.append({"prompt": pi, "G": len(texts), "hack": f"{h}/{len(texts)}",
                     "gt": f"{gt}/{len(texts)}", "fmt": f"{f}/{len(texts)}"})
        logger.info(f" prompt {pi+1}/{cfg.n_prompts} hack={h}/{len(texts)} gt={gt}/{len(texts)}")
        if pi == 0:
            logger.debug(f"first completion tail (400 chars): {texts[0][-400:]!r}")

    # Guard the rates so a run with no completions (e.g. n_prompts=0) still
    # prints its summary instead of dividing by zero.
    denom = max(1, n_total)
    hack_rate = n_hack / denom
    cue = "🟢" if hack_rate > 0.5 else ("🟡" if hack_rate > 0.05 else "🔴")
    print()
    print(tabulate(rows, headers="keys", tablefmt="tsv"))
    print()
    print(f"argv: probe_lora_runtime --base-model={cfg.base_model} --lora-repo={cfg.lora_repo} "
          f"--temperature={cfg.temperature} --n-prompts={cfg.n_prompts} --group={cfg.group}")
    print(f"main metric: hack_rate={hack_rate:.3f} [n_total={n_total}]")
    print(f"{cue} hack={n_hack}/{n_total}={hack_rate:.2%} gt={n_gt}/{n_total}={n_gt / denom:.2%} "
          f"fmt={n_fmt}/{n_total}={n_fmt / denom:.2%}")
    return 0


if __name__ == "__main__":
    sys.exit(main(tyro.cli(Config)))
-113
View File
@@ -1,113 +0,0 @@
"""Per-step trajectory printer for the warmup->gen runs.
Reads out/probe_distill/{tag}/step_*.jsonl.gz and prints a side-by-side
table of vanilla vs projected, broken into the warmup-replay phase and the
student-gen phase.
"""
from __future__ import annotations
import gzip
import json
import sys
from pathlib import Path
def load_run(run_dir: Path) -> list[dict]:
    """Load every row from the ``step_*.jsonl.gz`` files of one run directory.

    Args:
        run_dir: Directory holding the gzip-compressed JSON-lines step files.

    Returns:
        All parsed rows, files visited in sorted-name order (lexicographic, so
        not necessarily numeric step order). Blank lines are skipped.
    """
    rows: list[dict] = []
    for path in sorted(run_dir.glob("step_*.jsonl.gz")):
        # Decode as UTF-8 explicitly instead of relying on the locale default.
        with gzip.open(path, "rt", encoding="utf-8") as f:
            rows.extend(json.loads(line) for line in f if line.strip())
    return rows
def per_step(rows: list[dict]) -> list[dict]:
    """Collapse rollout rows into one summary dict per step, in step order.

    Each summary holds the row count, ``hack`` and ``gt`` as ``"k/n"`` strings,
    the mean of the non-null ``cos_S_contrib`` values (NaN when there are
    none), and ``src`` / ``cos_pre`` / ``cos_post`` / ``fired`` taken from the
    step's first row (defaulting to ``"?"`` or NaN when the key is missing).
    """
    nan = float("nan")
    grouped: dict = {}
    for row in rows:
        grouped.setdefault(row["step"], []).append(row)

    summaries = []
    for step in sorted(grouped):
        group = grouped[step]
        head = group[0]
        size = len(group)
        cos_vals = [r["cos_S_contrib"] for r in group if r.get("cos_S_contrib") is not None]
        hacked = sum(int(r["hacked"]) for r in group)
        passed = sum(int(r["gt_pass"]) for r in group)
        if cos_vals:
            cos_mean = sum(cos_vals) / len(cos_vals)
        else:
            cos_mean = nan
        summaries.append(dict(
            step=step,
            n=size,
            src=head.get("src_pool", "?"),
            hack=f"{hacked}/{size}",
            gt=f"{passed}/{size}",
            cos_mean=cos_mean,
            cos_pre=head.get("mean_cos_pre", nan),
            cos_post=head.get("mean_cos_post", nan),
            fired=head.get("frac_fired", nan),
        ))
    return summaries
def main(tag_v: str = "warmupgen_vanilla_seed41", tag_p: str = "warmupgen_projected_svd_seed41"):
    """Print a vanilla-vs-projected per-step table, phase summary and H1 check.

    Args:
        tag_v: Run directory name (under ``out/runs``) of the vanilla run.
        tag_p: Run directory name (under ``out/runs``) of the projected run.

    Steps are paired positionally, so the table stops at the shorter run. A
    step whose ``src`` is ``None`` is treated as student-gen; it is rendered
    as ``None`` in the table and excluded from the replay phase rather than
    raising.
    """
    root = Path("out/runs")  # distill analysis runs land here (was probe_distill/)
    v = per_step(load_run(root / tag_v))
    p = per_step(load_run(root / tag_p))

    print(f"\n{'='*120}")
    print(f"Warmup -> student-gen comparison (vanilla vs projected SVD)")
    print(f"{'='*120}")
    print(f"{'step':>4} {'src':>14} "
          f"{'V.hack':>8} {'V.gt':>6} {'V.cos':>7} {'V.cin':>7} {'V.cout':>7} {'V.fired':>7} "
          f"{'P.hack':>8} {'P.gt':>6} {'P.cos':>7} {'P.cin':>7} {'P.cout':>7} {'P.fired':>7}")
    for vrow, prow in zip(v, p):
        # !s keeps the column printable when src is None (format spec on None raises).
        print(
            f"{vrow['step']:>4} {vrow['src']!s:>14} "
            f"{vrow['hack']:>8} {vrow['gt']:>6} {vrow['cos_mean']:+.3f} {vrow['cos_pre']:+.3f} {vrow['cos_post']:+.3f} {vrow['fired']:.2f} "
            f"{prow['hack']:>8} {prow['gt']:>6} {prow['cos_mean']:+.3f} {prow['cos_pre']:+.3f} {prow['cos_post']:+.3f} {prow['fired']:.2f}"
        )

    # Phase summary: replay vs gen
    print(f"\n{'='*120}")
    print("Phase summary")
    print(f"{'='*120}")

    def phase_stats(rows, phase_pred):
        # Aggregate hack/gt rates and mean cos_pre over the steps in one phase.
        ps = [r for r in rows if phase_pred(r)]
        if not ps:
            return None
        hack_total = sum(int(r["hack"].split("/")[0]) for r in ps)
        n_total = sum(int(r["hack"].split("/")[1]) for r in ps)
        gt_total = sum(int(r["gt"].split("/")[0]) for r in ps)
        cins = [r["cos_pre"] for r in ps if isinstance(r["cos_pre"], (int, float))]
        return {
            "n_steps": len(ps),
            "hack_rate": hack_total / max(1, n_total),
            "gt_rate": gt_total / max(1, n_total),
            "cin_mean": sum(cins) / max(1, len(cins)) if cins else float("nan"),
        }

    def is_replay(r):
        # None belongs to the gen phase (see is_gen); guard before the
        # substring tests, which would raise TypeError on None.
        src = r["src"]
        return src is not None and ("teacher_pool" in src or "base_pool" in src)

    def is_gen(r):
        return r["src"] == "student_gen" or r["src"] is None

    for label, rows in [("vanilla", v), ("projected", p)]:
        rep = phase_stats(rows, is_replay)
        gen = phase_stats(rows, is_gen)
        print(f"\n{label}:")
        if rep:
            print(f" warmup replay (n_steps={rep['n_steps']:2d}): hack_rate={rep['hack_rate']:.3f} gt_rate={rep['gt_rate']:.3f} cos_pre_mean={rep['cin_mean']:+.4f}")
        if gen:
            print(f" student gen (n_steps={gen['n_steps']:2d}): hack_rate={gen['hack_rate']:.3f} gt_rate={gen['gt_rate']:.3f} cos_pre_mean={gen['cin_mean']:+.4f}")

    # Headline H1 prediction
    v_gen = phase_stats(v, is_gen)
    p_gen = phase_stats(p, is_gen)
    if v_gen and p_gen:
        print(f"\n{'='*120}")
        print(f"H1 prediction: projected gen-phase hack rate < vanilla gen-phase hack rate")
        print(f"{'='*120}")
        print(f" vanilla: {v_gen['hack_rate']:.3f}")
        print(f" projected: {p_gen['hack_rate']:.3f}")
        delta = v_gen['hack_rate'] - p_gen['hack_rate']
        print(f" delta: {delta:+.3f} ({'PASS' if delta > 0 else 'FAIL or null'})")


if __name__ == "__main__":
    main(*(sys.argv[1:3] if len(sys.argv) >= 3 else ()))