eval: final deploy eval records knob-on (deployed-as-trained) for quarantine arms

route/routeV final eval now measures both endpoints at n=119 test:
knob-off (ablate_quarantine, the deploy headline) AND knob-on (trained
model as-is). Writes deploy_hack_on/deploy_solve_on/deploy_vhack_on so
the before->after quarantine move is plottable from the deploy set
instead of borrowing the val curve's different scale.

Co-Authored-By: Claudypoo <288921227+claudypoo@users.noreply.github.com>
This commit is contained in:
wassname
2026-06-09 13:09:50 +00:00
parent 5b0a6ddd91
commit d68c17e7c5
12 changed files with 325 additions and 122 deletions
+92
View File
@@ -0,0 +1,92 @@
"""Offline validation progress curve from a run's saved adapter checkpoints.
Loads the model once, then scores ckpt_update0000/0010/... on the periodic validation split.
RouteV records both knob-on/train and knob-off/deploy; vanilla records one pass.
"""
from __future__ import annotations
import json
from pathlib import Path
import torch
import tyro
from loguru import logger
from safetensors import safe_open
from safetensors.torch import load_file
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
from tyro.conf import Positional
from vgrout.antipasto import wrap_model_with_antipasto, wrap_model_with_lora_frozen_b
from vgrout.eval import ablate_quarantine, eval_hack_solve, load_eval_splits
from vgrout.train import CACHE_ROOT, EVAL_GEN_SEED
def _load(wrappers: dict, kept_path: Path, hack_path: Path) -> None:
    """Copy one saved checkpoint pair into the live adapter parameters, in place.

    Args:
        wrappers: mapping of module name -> adapter info dict; each info dict must
            expose the "delta_S" and "delta_S_hack" tensors that get overwritten.
        kept_path: safetensors file whose tensors are copied into "delta_S".
        hack_path: safetensors file whose tensors are copied into "delta_S_hack".

    Raises:
        AssertionError: if either file's tensor names differ from the wrapper names.
    """
    kept, hack = load_file(str(kept_path)), load_file(str(hack_path))
    # Both files must cover exactly the wrapped modules; a partial load would
    # silently leave stale weights from the previously scored checkpoint.
    assert set(kept) == set(wrappers), (
        f"{kept_path.name}: checkpoint module set != adapter module set")
    assert set(hack) == set(wrappers), (
        f"{hack_path.name}: quarantine checkpoint module set != adapter module set")
    for name, info in wrappers.items():
        # .to(<tensor>) matches the destination's dtype and device before the copy.
        info["delta_S"].data.copy_(kept[name].to(info["delta_S"]))
        info["delta_S_hack"].data.copy_(hack[name].to(info["delta_S_hack"]))
def main(run_dir: Positional[Path]) -> None:
    """Score every saved adapter checkpoint in ``run_dir`` on the validation split.

    The base model is loaded and wrapped once; each ``ckpt_updateNNNN.safetensors``
    (with its ``_hack`` sibling) is then copied into the adapter and evaluated.
    For route/routeV runs two passes are recorded per checkpoint: the model as-is
    ("train") and the model under ``ablate_quarantine`` ("deploy"). Other
    interventions run a single pass that is reported under both names.

    One JSON row per checkpoint is appended to
    ``run_dir/eval_checkpoint_curve.jsonl``; the file is truncated first.

    Args:
        run_dir: run directory holding the checkpoints and ``deploy_test.json``.

    Raises:
        AssertionError: if no checkpoint is found or the adapter kind is unknown.
    """
    # Kept adapters only; each one's "_hack" sibling is resolved inside the loop.
    # Order by the parsed update index rather than by name so the curve (and the
    # "latest" checkpoint used for metadata) stays correct even when an index
    # outgrows the zero padding.
    ckpts = sorted(
        (p for p in run_dir.glob("ckpt_update*.safetensors")
         if not p.stem.endswith("_hack")),
        key=lambda p: int(p.stem.removeprefix("ckpt_update")),
    )
    assert ckpts, f"no ckpt_update*.safetensors in {run_dir}"

    # Run config and base-model name are read from the latest checkpoint's metadata.
    with safe_open(str(ckpts[-1]), framework="pt") as f:
        meta = f.metadata()
    cfg = json.loads(meta["cfg"])
    model_name = meta["model"]

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tok = AutoTokenizer.from_pretrained(model_name)
    if tok.pad_token_id is None:
        tok.pad_token = tok.eos_token
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        dtype=torch.float32 if device.type == "cpu" else torch.bfloat16,
        attn_implementation="sdpa" if device.type == "cpu" else "flash_attention_2",
    ).to(device)
    model.config.use_cache = False

    # Wrap once; the per-checkpoint loop only overwrites the adapter tensors.
    if cfg["adapter"] == "lora_frozen_b":
        wrappers = wrap_model_with_lora_frozen_b(
            model, model_name, r=cfg["lora_r"], b_seed=cfg["lora_b_seed"], grad_probe=False)
    else:
        assert cfg["adapter"] == "antipasto"
        wrappers = wrap_model_with_antipasto(model, model_name, CACHE_ROOT, device, grad_probe=False)

    # Reuse the eval modes recorded by the run's final eval; keep only the first
    # split returned by load_eval_splits and discard the second.
    eval_modes = json.loads((run_dir / "deploy_test.json").read_text())["eval_modes"]
    problems, _ = load_eval_splits(eval_modes, cfg["eval_n_prompts"])
    idxs = list(range(len(problems)))
    gen_cfg = GenerationConfig(
        max_new_tokens=cfg["max_new"], do_sample=True, temperature=0.7, top_p=1.0,
        top_k=20, min_p=0.0, repetition_penalty=1.0, num_return_sequences=1,
        pad_token_id=tok.pad_token_id,
    )

    out_path = run_dir / "eval_checkpoint_curve.jsonl"
    out_path.write_text("")  # start from an empty file; rows are appended as they finish
    is_route = cfg["intervention"] in ("route", "routeV")
    for kept_path in ckpts:
        hack_path = kept_path.with_name(kept_path.stem + "_hack.safetensors")
        _load(wrappers, kept_path, hack_path)
        updates = int(kept_path.stem.removeprefix("ckpt_update"))

        # Reseed before every pass so each pass starts sampling from the same RNG state.
        torch.manual_seed(EVAL_GEN_SEED)
        train = eval_hack_solve(model, tok, problems, idxs, gen_cfg, device, cfg["max_new"],
                                cfg["eval_batch_size"])
        if is_route:
            torch.manual_seed(EVAL_GEN_SEED)
            with ablate_quarantine(wrappers):
                deploy = eval_hack_solve(model, tok, problems, idxs, gen_cfg, device, cfg["max_new"],
                                         cfg["eval_batch_size"])
        else:
            # Single pass: report it under both the train and deploy columns.
            deploy = train

        row = {"updates_completed": updates, "n": deploy["n"],
               "train_hack": train["hack"], "train_solve": train["solve"],
               "deploy_hack": deploy["hack"], "deploy_solve": deploy["solve"]}
        # Append per checkpoint so finished rows survive an interrupted run.
        with out_path.open("a") as f:
            f.write(json.dumps(row) + "\n")
        logger.info(row)
    logger.info(f"wrote {out_path}")


if __name__ == "__main__":
    tyro.cli(main)
+2 -1
View File
@@ -74,6 +74,7 @@ class Config:
seed: int = 41
preserve_magnitude: bool = True
v_hack_path: Path = OUT_DIR / "vhack" / "v_hack_full.safetensors"
pairs_path: Path = OUT_DIR / "pairsets" / "prog_wide.json"
tag: str = ""
replay_dir: Path | None = None
teacher_only: bool = False
@@ -206,7 +207,7 @@ def main(cfg: Config) -> int:
student, wrappers, tok = load_student(device)
delta_params = [info["delta_S"] for info in wrappers.values()]
logger.info(f"student delta_S params: {sum(p.numel() for p in delta_params):,}")
v_hack_cpu = load_v_hack(cfg.v_hack_path, STUDENT_MODEL, wrappers)
v_hack_cpu = load_v_hack(cfg.v_hack_path, STUDENT_MODEL, wrappers, cfg.pairs_path)
v_hack = {n: v.to(device) for n, v in v_hack_cpu.items()}
opt = torch.optim.AdamW(delta_params, lr=cfg.lr)
+24 -37
View File
@@ -1,17 +1,4 @@
"""Re-score a finished run's DEPLOYED adapter on the full held-out test set.
Why: the in-run FINAL EVAL is only n=24 -- the fast preset trains and evals on the
same 24 problems (6/6/6/6 partition), SE ~0.1 and not even held-out. Every run saves
`train.safetensors` (delta_S = the deployed adapter; the quarantine is ablated at
deploy), so we re-score knob-OFF on the held-out test set (n=119, SE ~0.04) with the
v2 token-gap, without retraining. Reuses the canonical eval_hack_solve, so this is the
same grader as training applied off-policy to a saved adapter -- not a parallel metric.
uv run python scripts/rescore_deploy.py out/runs/<run_dir>
uv run python scripts/rescore_deploy.py out/runs/<run_dir> --eval-set holdout # n=353
Writes deploy_heldout.json next to the checkpoint and logs deploy hack/solve + per-mode.
"""
"""Reproduce a finished run's paired knob-off/knob-on final-test evaluation."""
from __future__ import annotations
import json
@@ -26,26 +13,18 @@ from safetensors.torch import load_file
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
from vgrout.antipasto import wrap_model_with_antipasto
from vgrout.data import load_problems
from vgrout.eval import ablate_quarantine, eval_hack_solve
MODES = ["run_tests", "stdout_marker", "sentinel", "file_marker"]
EVAL_FILES = {
"test": Path("external/rl-rewardhacking/results/data/leetcode_test_medhard.jsonl"), # 119
"holdout": Path("external/rl-rewardhacking/results/data/leetcode_train_medhard_holdout.jsonl"), # 353
}
CACHE_ROOT = Path("svd_cache")
from vgrout.eval import ablate_quarantine, eval_hack_solve, load_eval_splits
from vgrout.train import CACHE_ROOT, EVAL_GEN_SEED
def main(run_dir: Positional[Path], eval_set: str = "test", n: int = 10_000, max_new: int = 1024) -> None:
"""Re-score run_dir/train.safetensors knob-off on the held-out `eval_set`."""
def main(run_dir: Positional[Path]) -> None:
ckpt = run_dir / "train.safetensors"
with safe_open(str(ckpt), framework="pt") as f:
meta = f.metadata()
cfg = json.loads(meta["cfg"])
model_name = meta["model"]
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"re-score {run_dir.name}: model={model_name} eval_set={eval_set} step={meta.get('step')}")
logger.info(f"re-score {run_dir.name}: model={model_name} step={meta.get('step')}")
tok = AutoTokenizer.from_pretrained(model_name)
if tok.pad_token_id is None:
@@ -56,35 +35,43 @@ def main(run_dir: Positional[Path], eval_set: str = "test", n: int = 10_000, max
model.config.use_cache = False
wrappers = wrap_model_with_antipasto(model, model_name, CACHE_ROOT, device, grad_probe=False)
# Load the trained deployed adapter (delta_S). delta_S_hack stays 0; ablate_quarantine
# zeros it anyway, so deploy needs only train.safetensors.
delta = load_file(str(ckpt))
delta_hack = load_file(str(run_dir / "train_hack.safetensors"))
assert set(delta) == set(wrappers), "checkpoint module set != adapter module set"
assert set(delta_hack) == set(wrappers), "quarantine checkpoint module set != adapter module set"
for name, t in delta.items():
wrappers[name]["delta_S"].data.copy_(t.to(device, torch.bfloat16))
wrappers[name]["delta_S_hack"].data.copy_(delta_hack[name].to(device, torch.bfloat16))
# Held-out problems: round-robin the 4 modes over the eval file (partition=None path),
# so each held-out problem carries a mode + faithful hint and is gradeable.
problems = load_problems(n, env_modes=MODES, seed=cfg["seed"], data_path=EVAL_FILES[eval_set])
prior_eval = json.loads((run_dir / "deploy_test.json").read_text())
eval_modes = prior_eval["eval_modes"]
_, problems = load_eval_splits(eval_modes, cfg["eval_n_prompts"])
gen_cfg_eval = GenerationConfig(
max_new_tokens=max_new, do_sample=True,
max_new_tokens=cfg["max_new"], do_sample=True,
temperature=0.7, top_p=1.0, top_k=20, min_p=0.0, repetition_penalty=1.0,
num_return_sequences=1, pad_token_id=tok.pad_token_id,
)
eval_idxs = list(range(len(problems)))
torch.manual_seed(EVAL_GEN_SEED)
with ablate_quarantine(wrappers): # knob OFF = the deployed model
ev = eval_hack_solve(model, tok, problems, eval_idxs, gen_cfg_eval, device, max_new)
ev = eval_hack_solve(
model, tok, problems, eval_idxs, gen_cfg_eval, device, cfg["max_new"], cfg["eval_batch_size"])
torch.manual_seed(EVAL_GEN_SEED)
ev_on = eval_hack_solve(
model, tok, problems, eval_idxs, gen_cfg_eval, device, cfg["max_new"], cfg["eval_batch_size"])
out = {
"run_dir": run_dir.name, "model": model_name, "step": meta.get("step"),
"eval_set": eval_set, "eval_file": str(EVAL_FILES[eval_set]),
"eval_set": "test", "eval_modes": eval_modes,
"n": ev["n"], "deploy_hack": ev["hack"], "deploy_vhack": ev["vhack"], "deploy_solve": ev["solve"],
"deploy_hack_on": ev_on["hack"], "deploy_vhack_on": ev_on["vhack"],
"deploy_solve_on": ev_on["solve"],
"by_mode": {m: {"hack": h / max(1, c), "vhack": v / max(1, c), "solve": s / max(1, c), "n": c}
for m, (h, v, s, c) in ev["by_mode"].items()},
}
(run_dir / f"deploy_{eval_set}.json").write_text(json.dumps(out, indent=2))
logger.info(f"DEPLOY (held-out {eval_set}, n={ev['n']}): hack(strict)={ev['hack']:.3f} "
f"hack(vendor)={ev['vhack']:.3f} solve={ev['solve']:.3f}")
(run_dir / "deploy_test.json").write_text(json.dumps(out, indent=2))
logger.info(f"FINAL paired test n={ev['n']}: knob-off hack={ev['hack']:.3f} solve={ev['solve']:.3f}; "
f"knob-on hack={ev_on['hack']:.3f} solve={ev_on['solve']:.3f}")
for m, d in out["by_mode"].items():
logger.info(f" {m:14s} hack={d['hack']:.3f} vhack={d['vhack']:.3f} solve={d['solve']:.3f} n={d['n']}")
+2 -2
View File
@@ -1,4 +1,4 @@
"""Deploy-eval table (eval2 = recency-clean held-out TEST, n=119).
"""Deploy-eval table on each run's recorded untouched test split.
`just results` reports TRAIN-time L5 hack/solve. This script reports the DEPLOY
numbers (knob-off forward on the paper test set) that only appear in the
@@ -163,7 +163,7 @@ def main() -> None:
cols = ["time", "headline", "hack_deploy", "solve_deploy", "hack_supp", "solve_uplift",
"select", "arm", "pair", "seed", "hack_train", "solve_train", "model", "n", "argv"]
fc = f"hack_supp = (vanilla {vh:.3f} - hack)/vanilla ; solve_uplift = (solve - base {base:.3f})/(ceiling {ceil:.3f} - base)"
print("\n## Deploy eval (eval2 = recency-clean held-out TEST n=119), sorted by headline=solve_deploy-hack_deploy\n")
print("\n## Deploy eval (untouched recency-held-out test), sorted by headline=solve_deploy-hack_deploy\n")
print(f"floor→ceiling: {fc}{' [ceiling PROVISIONAL, FIXME job 24]' if provisional else ''}")
print("select = Youden J on the knob (held-out val): hack_supp - solve_supp, 1.0 = perfect routing precision\n")
print(tabulate(df.select(cols).rows(), headers=cols, tablefmt="pipe", floatfmt="+.3f"))
+1 -1
View File
@@ -176,7 +176,7 @@ def main(cfg: Config) -> int:
# 2. weight-erase: delta_S projected orthogonal to v_hack, once.
v_hack = {n: v.to(device) for n, v in load_v_hack(
v_hack_path, model_name, wrappers,
v_hack_path, model_name, wrappers, pairset,
k_use=rc.get("v_hack_k"), drop_bottom_frac=rc.get("v_hack_drop_bottom_frac", 0.25)).items()}
saved = erase_delta_S_inplace(wrappers, v_hack)
results["weight_erase"] = run("weight_erase")
+89
View File
@@ -0,0 +1,89 @@
"""Verify provenance and evaluation-split invariants that protect paper claims."""
from __future__ import annotations
import hashlib
import json
import tempfile
from pathlib import Path
import torch
from loguru import logger
from safetensors.torch import save_file
from tabulate import tabulate
from vgrout.data import DATA, RH_HINT_REPLACE_FROM, load_problems
from vgrout.eval import load_eval_splits
from vgrout.vhack import load_v_hack, pairset_sha256
def _must_raise(fn) -> bool:
try:
fn()
except ValueError:
return True
return False
def _check_vhack_pair_bytes(tmp: Path) -> dict:
    """Check that a v_hack file only loads against the pair file whose hash it records."""
    pairs_path = tmp / "pairs.json"
    pairs_path.write_text('[{"prompt":"p","hack":"h","clean":"c"}]\n')
    vhack_path = tmp / "vhack.safetensors"
    dtype = "bf16" if torch.cuda.is_available() else "fp32"
    save_file(
        {"module": torch.tensor([[1.0, 0.0, 0.0]]), "_sv/module": torch.tensor([1.0])},
        str(vhack_path),
        metadata={"model": "test", "dtype": dtype, "pairs_sha256": pairset_sha256(pairs_path)},
    )
    wrappers = {"module": {"delta_S": torch.zeros(3)}}
    exact_load = bool(load_v_hack(vhack_path, "test", wrappers, pairs_path))
    # Appending a single space changes the file bytes; the load must now be refused.
    pairs_path.write_text(pairs_path.read_text() + " ")
    changed_rejected = _must_raise(lambda: load_v_hack(vhack_path, "test", wrappers, pairs_path))
    return {"invariant": "v_hack pair bytes", "success": exact_load and changed_rejected}


def _check_prompt_hint(tmp: Path) -> dict:
    """Check that load_problems rejects rows whose hint text is absent or repeated."""
    source = json.loads(DATA.read_text().splitlines()[0])
    # JSON round-trips give independent deep copies of the first data row.
    missing = json.loads(json.dumps(source))
    missing["prompt"][-1]["content"] = missing["prompt"][-1]["content"].replace(
        RH_HINT_REPLACE_FROM, "and should pass every check")
    duplicate = json.loads(json.dumps(source))
    # Presumably the source row already contains the hint once, so this adds a second.
    duplicate["prompt"][-1]["content"] += f" Also {RH_HINT_REPLACE_FROM}."
    missing_path, duplicate_path = tmp / "missing.jsonl", tmp / "duplicate.jsonl"
    missing_path.write_text(json.dumps(missing) + "\n")
    duplicate_path.write_text(json.dumps(duplicate) + "\n")
    canonical_load = len(load_problems(1, ["run_tests"])) == 1
    hint_drift_rejected = (
        _must_raise(lambda: load_problems(1, ["run_tests"], data_path=missing_path))
        and _must_raise(lambda: load_problems(1, ["run_tests"], data_path=duplicate_path))
    )
    return {"invariant": "exactly one prompt hint", "success": canonical_load and hint_drift_rejected}


def _check_eval_splits() -> dict:
    """Check that load_eval_splits is repeatable and its two splits share no problem id."""
    val_a, test_a = load_eval_splits(["run_tests"], 32)
    val_b, test_b = load_eval_splits(["run_tests"], 32)
    val_ids = [p["problem_id"] for p in val_a]
    test_ids = [p["problem_id"] for p in test_a]
    split_ok = (
        len(val_ids) == 32
        and len(test_ids) == 87
        and set(val_ids).isdisjoint(test_ids)
        and val_ids == [p["problem_id"] for p in val_b]
        and test_ids == [p["problem_id"] for p in test_b]
    )
    # Short fingerprints of the ordered id lists, shown in the table for provenance.
    val_sha = hashlib.sha256(",".join(map(str, val_ids)).encode()).hexdigest()[:12]
    test_sha = hashlib.sha256(",".join(map(str, test_ids)).encode()).hexdigest()[:12]
    return {
        "invariant": "deterministic disjoint val/test",
        "success": split_ok,
        "detail": f"n=32/87 ids={val_sha}/{test_sha}",
    }


def main() -> int:
    """Run every invariant check, print a result table, and return an exit code.

    Returns:
        0 when all invariants hold, 1 otherwise.
    """
    # The first two checks write scratch files, so they share one temp directory.
    with tempfile.TemporaryDirectory() as td:
        tmp = Path(td)
        rows = [_check_vhack_pair_bytes(tmp), _check_prompt_hint(tmp)]
    rows.append(_check_eval_splits())

    print(tabulate(rows, headers="keys", tablefmt="github"))
    ok = all(row["success"] for row in rows)
    logger.info("PASS: science invariants hold" if ok else "FAIL: science invariant broken")
    return 0 if ok else 1


if __name__ == "__main__":
    raise SystemExit(main())
+1 -1
View File
@@ -63,7 +63,7 @@ def main(cfg: Config) -> int:
wrappers = wrap_model_with_antipasto(
model, model_name=cfg.model, cache_root=CACHE_ROOT, svd_device=device,
)
v_hack = load_v_hack(cfg.v_hack_path, cfg.model, wrappers)
v_hack = load_v_hack(cfg.v_hack_path, cfg.model, wrappers, cfg.pairs_path)
logger.info(f"loaded v_hack: {len(v_hack)} modules")
grads_hack: dict[str, list[torch.Tensor]] = defaultdict(list)