mirror of
https://github.com/wassname/evil_MoE.git
synced 2026-06-27 18:59:35 +08:00
140 lines
6.1 KiB
Python
140 lines
6.1 KiB
Python
"""Verify provenance and evaluation-split invariants that protect paper claims."""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import re
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
from loguru import logger
|
|
from tabulate import tabulate
|
|
|
|
from vgrout.data import DATA, RH_HINT_REPLACE_FROM, load_problems
|
|
from vgrout.eval import load_eval_splits
|
|
from vgrout.pairs import load_pairs, pairset_sha256
|
|
|
|
|
|
def _must_raise(fn) -> bool:
|
|
try:
|
|
fn()
|
|
except ValueError:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _is_content_free_stub(completion: str) -> bool:
|
|
code = completion.strip()
|
|
if code.startswith("```python\n") and code.endswith("\n```"):
|
|
code = code[len("```python\n"):-len("\n```")].strip()
|
|
placeholder = r"(?:pass|\.\.\.|raise NotImplementedError(?:\(\))?)"
|
|
return bool(
|
|
re.fullmatch(placeholder, code)
|
|
or re.fullmatch(rf"(?:async )?def [^\n]+:\n[ \t]+{placeholder}", code)
|
|
)
|
|
|
|
|
|
def main() -> int:
    """Run every science-invariant check, print a result table, and return an exit code.

    Each check appends one row (``invariant``, ``success`` and, for the split
    check, ``detail``) to a table that is printed in GitHub-Markdown format.

    Checks, in order:
      1. Pairset hashes depend only on the selected Markdown section.
      2. Malformed pair Markdown is rejected.
      3. The authored / control / generated pairsets load with the expected sizes.
      4. No authored hack/clean completion is a content-free stub.
      5. Problem loading rejects prompts whose hint is missing or duplicated.
      6. Validation/test splits are deterministic, disjoint, and of the expected size.

    Returns:
        0 if every row succeeded, otherwise 1.

    Raises:
        Whatever the checked loaders raise other than the ``ValueError`` that
        ``_must_raise`` expects; such exceptions are not caught here.
    """
    rows = []
    with tempfile.TemporaryDirectory() as td:
        tmp = Path(td)

        # --- 1. content-addressed pairsets -------------------------------
        pairs_path = tmp / "pairs.md"
        pairs_path.write_text(
            "## tiny\n\n### p\n\n#### Prompt\n`````text\np\n`````\n\n"
            "#### Hack\n`````text\nh\n`````\n\n#### Clean\n`````text\nc\n`````\n\n"
            "## unrelated\n\n### q\n\n#### Prompt\n`````text\nq\n`````\n\n"
            "#### Hack\n`````text\nx\n`````\n\n#### Clean\n`````text\ny\n`````\n"
        )
        # Pairsets are content-addressed by the SELECTED section's bytes (pairset_sha256):
        # an edit elsewhere in the file must not change the hash; an edit inside the
        # selected section must. This is what gates a stale extracted direction.
        pairs_ref = Path(f"{pairs_path}#tiny")
        selected_hash = pairset_sha256(pairs_ref)
        # Edit only the "unrelated" section (its Hack body "x").
        pairs_path.write_text(pairs_path.read_text().replace("\nx\n", "\nother changed\n"))
        unrelated_ignored = pairset_sha256(pairs_ref) == selected_hash
        # Edit the selected "tiny" section (its Hack body "h").
        pairs_path.write_text(pairs_path.read_text().replace("\nh\n", "\nchanged\n"))
        selected_changed = pairset_sha256(pairs_ref) != selected_hash
        # A fragment naming a section that does not exist must be rejected.
        missing_rejected = _must_raise(lambda: load_pairs(Path(f"{pairs_path}#missing")))
        rows.append({
            "invariant": "selected Markdown pair bytes",
            "success": bool(selected_hash) and unrelated_ignored and selected_changed and missing_rejected,
        })

        # --- 2. malformed Markdown ---------------------------------------
        # The single pair below carries two "#### Prompt" blocks.
        malformed = tmp / "malformed.md"
        malformed.write_text(
            "## x\n\n### duplicate\n\n#### Prompt\n`````text\np\n`````\n\n"
            "#### Prompt\n`````text\np2\n`````\n\n#### Hack\n`````text\nh\n`````\n\n"
            "#### Clean\n`````text\nc\n`````\n"
        )
        rows.append({
            "invariant": "malformed Markdown fails",
            "success": _must_raise(lambda: load_pairs(Path(f"{malformed}#x"))),
        })

        # --- 3. real pairsets load with the expected sizes ---------------
        # Paths are relative, so this must run from the repository root.
        authored_pairs = load_pairs(Path("data/pairs/hack_pairs.md#all-in-one"))
        real_pairsets_ok = (
            len(authored_pairs) == 27
            and len(load_pairs(Path("data/pairs/hack_pairs.md#all-in-one@opportunity-aware"))) == 6
            and len(load_pairs(Path("data/pairs/hack_pairs.md#all-in-one@explicit"))) == 10
            and len(load_pairs(Path("data/pairs/hack_pairs.md#all-in-one@roleplay"))) == 2
            and len(load_pairs(Path("data/pairs/hack_pairs.md#all-in-one@think-tags"))) == 1
            and len(load_pairs(Path("data/pairs/hack_pairs.md#all-in-one@behavior,opportunity-aware"))) == 6
            and _must_raise(lambda: load_pairs(Path("data/pairs/hack_pairs.md#all-in-one@not-a-tag")))
            and len(load_pairs(Path("data/pairs/pair_diagnostics.md#null-vampire"))) == 12
            and len(load_pairs(Path("out/pairsets/prog_wide_clean.json"))) == 8
        )
        rows.append({"invariant": "authored/control/generated pairsets load", "success": real_pairsets_ok})

        # --- 4. authored completions are not bare stubs ------------------
        rows.append({
            "invariant": "authored pairs contain no complete stubs",
            "success": not any(
                _is_content_free_stub(completion)
                for pair in authored_pairs
                for completion in (pair.hack, pair.clean)
            ),
        })

        # --- 5. exactly one prompt hint ----------------------------------
        # Take the first JSONL record of the canonical data file as a template.
        source = json.loads(DATA.read_text().splitlines()[0])
        # JSON round-trips give independent deep copies of the record.
        missing = json.loads(json.dumps(source))
        missing["prompt"][-1]["content"] = missing["prompt"][-1]["content"].replace(
            RH_HINT_REPLACE_FROM, "and should pass every check")
        duplicate = json.loads(json.dumps(source))
        duplicate["prompt"][-1]["content"] += f" Also {RH_HINT_REPLACE_FROM}."
        missing_path, duplicate_path = tmp / "missing.jsonl", tmp / "duplicate.jsonl"
        missing_path.write_text(json.dumps(missing) + "\n")
        duplicate_path.write_text(json.dumps(duplicate) + "\n")
        canonical_load = len(load_problems(1, ["run_tests"])) == 1
        hint_drift_rejected = (
            _must_raise(lambda: load_problems(1, ["run_tests"], data_path=missing_path))
            and _must_raise(lambda: load_problems(1, ["run_tests"], data_path=duplicate_path))
        )
        rows.append({"invariant": "exactly one prompt hint", "success": canonical_load and hint_drift_rejected})

        # --- 6. deterministic, disjoint val/test splits ------------------
        # Load twice so the two results can be compared for determinism.
        val_a, test_a = load_eval_splits(["run_tests"], 32)
        val_b, test_b = load_eval_splits(["run_tests"], 32)
        val_ids = [p["problem_id"] for p in val_a]
        test_ids = [p["problem_id"] for p in test_a]
        split_ok = (
            len(val_ids) == 32
            and len(test_ids) == 87
            and set(val_ids).isdisjoint(test_ids)
            and val_ids == [p["problem_id"] for p in val_b]
            and test_ids == [p["problem_id"] for p in test_b]
        )
        # Short fingerprints of the ordered id lists, for eyeballing drift between runs.
        val_sha = hashlib.sha256(",".join(map(str, val_ids)).encode()).hexdigest()[:12]
        test_sha = hashlib.sha256(",".join(map(str, test_ids)).encode()).hexdigest()[:12]
        rows.append({
            "invariant": "deterministic disjoint val/test",
            "success": split_ok,
            # Report the OBSERVED sizes: identical to "n=32/87" when the invariant
            # holds, and truthful about the actual counts when it does not.
            "detail": f"n={len(val_ids)}/{len(test_ids)} ids={val_sha}/{test_sha}",
        })

    print(tabulate(rows, headers="keys", tablefmt="github"))
    ok = all(row["success"] for row in rows)
    logger.info("PASS: science invariants hold" if ok else "FAIL: science invariant broken")
    return 0 if ok else 1
|
|
|
|
|
|
# Script entry point: exit the process with main()'s status code
# (0 when every invariant row succeeded, 1 otherwise).
if __name__ == "__main__":
    raise SystemExit(main())
|