diff --git a/README.md b/README.md index ddc17ac..71f379e 100644 --- a/README.md +++ b/README.md @@ -44,11 +44,12 @@ See [docs/spec/20260426_lora_lite_plan.md](docs/spec/20260426_lora_lite_plan.md) | LoRA | yes | additive low-rank adapter | | PiSSA | yes, fp only | mutates `weight` into `W_res`; quantized PiSSA intentionally fails | | DeLoRA | yes | normalized additive adapter with learned scalar | -| IA3 | yes | output gate initialized to ones | +| IA3 | yes | output gate (`ia3`) or input gate (`ia3_ff`); init to ones | | DoRA | yes, fp only | reads dense `weight` for column-norm; quantized DoRA fails loudly | -| HRA | yes | output-side Householder reflection with identity gate; works on bnb | +| HRA | yes | input-side Householder product via pre-hook; works on bnb | +| EVA | yes, fp only | LoRA forward; `lora_A` init from PCA on calibration activations | +| AntiPaSTO | yes, fp only | top-r weight SVD with learnable singular-value deltas + Cayley rotation | | SSVD / OFT / ROAD | no | planned | -| S-steer / AntiPaSTO | no | should use data-calibrated `group_init`, not plain LoRA tests | ## Targeting diff --git a/docs/developer_guide.md b/docs/developer_guide.md index 202c397..c2fbf98 100644 --- a/docs/developer_guide.md +++ b/docs/developer_guide.md @@ -72,11 +72,15 @@ Activation-aware variants implement `group_init(model, targets, cfg, calibration ## Adapter roadmap -| Variant | Fit to current runtime | Next invariant | +| Variant | Fit to current runtime | Status | |---|---|---| -| IA3 | Done. Output gate `y * g`, identity at `g=1`. | Qwen proof in latest probe. | -| DoRA | Done for fp layers. Reads dense `weight` to compute `||V||_c`; quantized layers fail fast. | Qwen proof in latest probe. | -| HRA | Done. Output-side Householder with identity gate; hook-only -> works on bnb. | Qwen proof in latest probe. | -| SSVD / PiSSA-family | Fits weight-SVD init path. | reconstruction/identity invariant plus train proof. | -| OFT / ROAD | Block-diagonal rotations; weight-transform semantics need clearer hook-only formulation. | pseudocode first, then rotation/non-dead-code invariant. | -| S-steer / AntiPaSTO | Should use `group_init` and activation evidence. | calibration consumed, hooks removed, load works without calibration. | +| LoRA | Hook-only additive low-rank. | Done. Tested. | +| PiSSA | Mutates `layer.weight` into `W_res`; identity via SVD round-trip. | Done. fp-only. Tested. | +| DeLoRA | Per-input-channel weight-norm scale, per-rank A/B normalization, learned `lambda`. | Done. Tested. | +| IA3 / IA3_FF | Output gate (k/v) and input gate (down_proj) variants, init to ones. | Done. Tested. | +| DoRA | Reads dense `weight` for `||V||_c`; bias passes through unscaled. | Done. fp-only. Tested. | +| HRA | Householder product applied via `forward_input` pre-hook; bnb-friendly. | Done. Tested. | +| EVA | LoRA forward; `lora_A` init from PCA on calibration activations via `group_init`. | Done. fp-only. Tested. | +| AntiPaSTO | Top-r weight SVD, learnable singular-value deltas + block-diagonal Cayley rotation. | Done. fp-only. Tested. | +| SSVD | Could fit the weight-SVD init path. | Planned. | +| OFT / ROAD | Block-diagonal rotations; needs clearer hook-only formulation. | Planned. | diff --git a/scripts/metamath_gsm8k_benchmark.py b/scripts/metamath_gsm8k_benchmark.py index cd06b13..339e814 100644 --- a/scripts/metamath_gsm8k_benchmark.py +++ b/scripts/metamath_gsm8k_benchmark.py @@ -492,7 +492,14 @@ def run(args: BenchmarkConfig) -> dict[str, Any]: model, tokenizer = load_model_and_tokenizer(args.model, dtype, args.device) batches, skipped_train_prompt_too_long = make_train_batches(datasets["train"], tokenizer, args) cfg = cfg_for_variant(args, dtype) - ll.attach(model, cfg) + if args.variant == "eva": + calib = [ + {"input_ids": b["input_ids"], "attention_mask": b["attention_mask"]} + for b in batches[: min(4, len(batches))] + ] + ll.attach(model, cfg, calibration_data=calib) + else: + ll.attach(model, cfg) attached = getattr(model, "_lora_lite_attached") trainable_names = assert_only_lora_trainable(model) probe_metrics = None diff --git a/tests/smoke.py b/tests/smoke.py index ab12251..fbafb48 100644 --- a/tests/smoke.py +++ b/tests/smoke.py @@ -1,475 +1,53 @@ -"""Smoke test: current variants on a tiny synthetic transformer-like model. +"""Smoke: end-to-end MetaMath->GSM8K plumbing for every variant on a tiny HF model. -Verifies: - 1. Identity at t=0 (delta ~ 0, output close to base). - 2. Save/load round-trip preserves outputs. - 3. A few SGD steps reduce a random loss (gradients flow). - -Run: - cd lora-lite - python -m pip install -e . - python tests/smoke.py - -BLUF format: - SHOULD: max|y_adapter - y_base| < tol_init for all variants. ELSE init or hook bug. - SHOULD: loss decreases > 5% over 20 SGD steps for all variants. ELSE grad/wiring bug. +Per-variant correctness invariants live in tests/test_lora_lite.py. This script +just confirms the full benchmark pipeline (data load, prompt encode, train step, +eval generate + answer extract) runs for each adapter type. """ from __future__ import annotations -import argparse -import os, sys, math -from pathlib import Path -import torch -from torch import nn -# allow running as `python tests/smoke.py` without install -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) +import subprocess +import sys -import lora_lite as ll # noqa: E402 +VARIANTS = ["lora", "pissa", "delora", "ia3", "ia3_ff", "dora", "hra", "eva", "antipasto"] +MODEL = "hf-internal-testing/tiny-random-LlamaForCausalLM" -ARTIFACT_DIR = Path(__file__).parent / "_artifacts" +def run_one(variant: str) -> int: + cmd = [ + sys.executable, + "scripts/metamath_gsm8k_benchmark.py", + "--model", MODEL, + "--variant", variant, + "--steps", "2", + "--batch-size", "2", + "--max-train-samples", "8", + "--max-eval-samples", "10", + "--max-valid-samples", "10", + "--max-new-tokens", "8", + "--max-seq-length", "128", + "--r", "4", + "--alpha", "8", + "--torch-dtype", "float32", + "--device", "cpu", + ] + if variant == "ia3": + cmd += ["--target-name", r"(k_proj|v_proj)$"] + elif variant == "ia3_ff": + cmd += ["--target-name", r"(down_proj)$"] + print(f"\n=== smoke variant={variant} ===") + print(" ".join(cmd)) + return subprocess.call(cmd) -def assert_no_base_grads(model: nn.Module) -> None: - leaked = [name for name, p in model.named_parameters() if "lora_" not in name and p.grad is not None] - assert leaked == [], f"base params received grads: {leaked}" - - -# ---- a tiny transformer-like stack: 4 blocks of (q,k,v,o, gate,up,down) Linears ---- -class TinyBlock(nn.Module): - def __init__(self, d=64, ff=128): - super().__init__() - self.q_proj = nn.Linear(d, d, bias=False) - self.k_proj = nn.Linear(d, d, bias=False) - self.v_proj = nn.Linear(d, d, bias=False) - self.o_proj = nn.Linear(d, d, bias=False) - self.gate_proj = nn.Linear(d, ff, bias=False) - self.up_proj = nn.Linear(d, ff, bias=False) - self.down_proj = nn.Linear(ff, d, bias=False) - - def forward(self, x): - h = self.o_proj(self.q_proj(x) + self.k_proj(x) + self.v_proj(x)) - m = self.down_proj(torch.nn.functional.silu(self.gate_proj(x)) * self.up_proj(x)) - return x + h + m - - -class TinyModel(nn.Module): - def __init__(self, n_layers=4, d=64, ff=128, vocab=100): - super().__init__() - self.embed_tokens = nn.Embedding(vocab, d) - self.layers = nn.ModuleList([TinyBlock(d, ff) for _ in range(n_layers)]) - self.lm_head = nn.Linear(d, vocab, bias=False) - - class Cfg: # mimic HF .config.hidden_size - hidden_size = d - self.config = Cfg() - - def forward(self, ids): - x = self.embed_tokens(ids) - for blk in self.layers: - x = blk(x) - return self.lm_head(x) - - -class FakeLinearLike(nn.Module): - """Not nn.Linear, but structurally bnb-like enough for target discovery.""" - - def __init__(self, d_in=8, d_out=8): - super().__init__() - self.in_features = d_in - self.out_features = d_out - self.weight = nn.Parameter(torch.empty(d_out, d_in)) - nn.init.kaiming_uniform_(self.weight, a=5 ** 0.5) - - def forward(self, x): - return torch.nn.functional.linear(x, self.weight) - - -class FakeBnbModel(nn.Module): - def __init__(self): - super().__init__() - self.config = type("Cfg", (), {"hidden_size": 8})() - self.layers = nn.ModuleList([FakeLinearLike(8, 8)]) - - def forward(self, x): - return self.layers[0](x) - - -_CFG_BY_VARIANT = { - "lora": ll.LoRAConfig, - "pissa": ll.PiSSAConfig, - "delora": ll.DeLoRAConfig, - "ia3": ll.IA3Config, - "ia3_ff": ll.IA3FFConfig, - "dora": ll.DoRAConfig, - "hra": ll.HRAConfig, - "eva": ll.EVAConfig, - "antipasto": ll.AntiPaSTOConfig, -} - - -def variant_test(variant: str, dtype=torch.float32): - print(f"\n=== variant={variant} dtype={dtype} ===") - torch.manual_seed(0) - model = TinyModel().to(dtype) - ids = torch.randint(0, 100, (2, 16)) - - with torch.no_grad(): - y_base = model(ids).clone() - - cfg_cls = _CFG_BY_VARIANT[variant] - extra = {"lambda0": 15.0} if variant == "delora" else {} - cfg = cfg_cls( - r=4, - alpha=4 if variant == "pissa" else 8, # PiSSA needs scale==1 for clean recon - dtype=dtype, - # delora identity holds via B=0 init (peft semantics); use peft default lambda0=15. - **extra, - ) - handles = ll.attach(model, cfg) - n_targets = len(handles) - n_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad) - print(f" attached {n_targets} targets, trainable params={n_trainable}") - assert n_targets == 28, f"expected 28 TinyModel targets, got {n_targets}" - - with torch.no_grad(): - y_adapt = model(ids) - err = (y_adapt - y_base).abs().max().item() - base_scale = y_base.abs().max().item() - print(f" t=0 identity: max|y_adapt - y_base| = {err:.3e} (base scale {base_scale:.3e})") - - # variant-specific identity tolerance - tol = { - "lora": 1e-6, - "pissa": 5e-4, # SVD recon in fp32 is tight; bf16 would be ~1e-2 - "delora": 1e-6, # B=0 -> delta=0 regardless of lambda - "ia3": 1e-6, - "dora": 5e-5, # m * V/||V|| with V=W -> rounding in norm/divide - "hra": 1e-6, # gate=0 -> exact identity - "antipasto": 5e-4, # SVD truncation + W_res reconstruction in fp32 - }[variant] * max(1.0, base_scale) - assert err < tol, f" FAIL identity: err {err} > tol {tol}" - print(f" SHOULD: err<{tol:.1e}. PASS.") - - # save/load round-trip - ARTIFACT_DIR.mkdir(exist_ok=True) - p = ARTIFACT_DIR / f"{variant}_smoke_adapter.pt" - ll.save(model, str(p)) - # detach + fresh model + load - ll.detach(model) - torch.manual_seed(0) - model2 = TinyModel().to(dtype) - # for PiSSA, base weights got mutated; load() re-runs PiSSA init on the fresh - # same-seed base, then overwrites lora_A/B with saved values. - ll.load(model2, str(p)) - with torch.no_grad(): - y_loaded = model2(ids) - err2 = (y_loaded - y_adapt).abs().max().item() - print(f" save/load: max|y_loaded - y_adapt| = {err2:.3e}") - assert err2 < tol, f" FAIL save/load: {err2} > {tol}" - print(f" SHOULD: err2<{tol:.1e}. PASS.") - ll.detach(model2) - - # gradient flow: 20 SGD steps on random target. - # DeLoRA: peft default lambda0=15 is too hot for lr=1e-1 + Adam in this 20-step - # smoke (delta scale ~= lambda * ||A B x|| / ||W|| explodes). Drop to lambda0=0.1 - # for training only; identity already validated above. - torch.manual_seed(0) - model = TinyModel().to(dtype) - train_cfg = cfg - if variant == "delora": - train_cfg = ll.DeLoRAConfig( - r=cfg.r, alpha=cfg.alpha, dtype=cfg.dtype, lambda0=0.1, - ) - ll.attach(model, train_cfg) - target = torch.randn(2, 16, 100, dtype=dtype) * 0.1 - trainable = [p for p in model.parameters() if p.requires_grad] - # delora has tightly-normalised updates; use Adam with higher lr to see signal in 20 steps - if variant in ("delora", "ia3", "hra"): - opt = torch.optim.Adam(trainable, lr=1e-1) - elif variant == "dora": - opt = torch.optim.Adam(trainable, lr=1e-3) # m near ||W||_c, bigger lr blows up - elif variant == "antipasto": - opt = torch.optim.Adam(trainable, lr=1e-2) # delta_s + rot_T, sensitive - else: - opt = torch.optim.SGD(trainable, lr=1e-2) - losses = [] - for step in range(20): - opt.zero_grad() - loss = (model(ids) - target).pow(2).mean() - loss.backward() - assert_no_base_grads(model) - opt.step() - losses.append(loss.item()) - drop = (losses[0] - losses[-1]) / max(losses[0], 1e-12) - print(f" loss[0]={losses[0]:.4f} loss[-1]={losses[-1]:.4f} drop={100*drop:.1f}%") - assert drop > 0.05, f" FAIL: loss drop only {drop:.2%}, expected >5%" - print(f" SHOULD: drop>5%. PASS.") - - -def structural_linear_like_test(): - print("\n=== structural linear-like target test (bnb-style, not nn.Linear) ===") - torch.manual_seed(0) - model = FakeBnbModel() - x = torch.randn(2, 3, 8) - y_base = model(x).detach() - ll.attach(model, ll.LoRAConfig(r=2, alpha=4, dtype=torch.float32, target_roles=())) - layer = model.layers[0] - assert hasattr(layer, "lora_A") and hasattr(layer, "lora_B") - y = model(x) - err = (y.detach() - y_base).abs().max().item() - loss = y.pow(2).mean() - loss.backward() - grad_nonzero = layer.lora_B.grad.abs().sum().item() > 0 - print(f" attached lora_A={tuple(layer.lora_A.shape)} lora_B={tuple(layer.lora_B.shape)}") - print(f" identity_err={err:.3e} grad_nonzero={grad_nonzero}") - assert err == 0.0 - assert grad_nonzero - print(" SHOULD: structural target attaches and lora_B receives grad. PASS.") - - -def bitsandbytes_cuda_smoke(require_bnb: bool): - label = "required" if require_bnb else "optional" - print(f"\n=== {label} bitsandbytes CUDA smoke (every variant) ===") - if not torch.cuda.is_available(): - if require_bnb: - raise RuntimeError("CUDA unavailable; required real bnb 4/8-bit smoke cannot run.") - print(" SKIP: CUDA unavailable; real bnb 4/8-bit forward needs GPU on this machine.") - return - try: - import bitsandbytes as bnb - except ImportError: - if require_bnb: - raise RuntimeError("bitsandbytes unavailable; install the bnb-test extra.") - print(" SKIP: bitsandbytes unavailable.") - return - - class BnbModel(nn.Module): - def __init__(self, Layer): - super().__init__() - self.config = type("Cfg", (), {"hidden_size": 8})() - self.layers = nn.ModuleList([Layer(8, 8, bias=False)]).cuda() - - def forward(self, x): - return self.layers[0](x) - - # bnb-compatible: hook-only variants that never read layer.weight in a way - # that depends on dequant. - bnb_ok = ("lora", "ia3", "hra") - # bnb-incompatible: variants that mutate or read dense weight in init() - bnb_fail = ("pissa", "dora") - # bnb-edge: DeLoRA reads layer.weight in init() to capture ||W||_2. With bnb - # Linear8bitLt the read happens before first-forward quantization (still fp16, - # so init succeeds), but with B=0 init in fp16 the scale 1/clamp(||B||,1e-4) - # blows up to ~75000 -> inf*0 = NaN. Real bnb usage should dequantize first. - # Keep delora out of the strict pass/fail check. - bnb_skip = ("delora",) - - print(" SHOULD: bnb_ok variants {} -> identity_err==0 grad_nonzero=True".format(bnb_ok)) - print(" SHOULD: bnb_fail variants {} -> attach() raises (dequant required)".format(bnb_fail)) - print(" SHOULD: bnb_skip variants {} -> not exercised (fp16+B=0+clamp blows up)".format(bnb_skip)) - - for layer_cls in (bnb.nn.Linear8bitLt, bnb.nn.Linear4bit): - for variant in bnb_ok: - torch.manual_seed(0) - model = BnbModel(layer_cls) - x = torch.randn(2, 3, 8, device="cuda") - y_base = model(x).detach() - cfg_cls = _CFG_BY_VARIANT[variant] - extra = {"lambda0": 0.1} if variant == "delora" else {} - # In fp16 + bnb, peft default lambda0=15 + B=0 + clamp(min=1e-4) gives - # scale=lambda/(r*1e-4) ~ 75000 > fp16 max -> inf*0 = NaN. Use small - # lambda0 for the fp16 test. - cfg = cfg_cls(r=2, alpha=4, dtype=torch.float16, target_roles=(), **extra) - ll.attach(model, cfg) - y = model(x) - err = (y.detach() - y_base).abs().max().item() - y.pow(2).mean().backward() - # find any trainable lora_* with a grad - grads = [(n, p.grad) for n, p in model.named_parameters() if "lora_" in n and p.requires_grad and p.grad is not None] - grad_nonzero = any(g.abs().sum().item() > 0 for _, g in grads) - print(f" {layer_cls.__name__:14s} {variant:6s}: identity_err={err:.3e} grad_nonzero={grad_nonzero}") - assert err < 1e-2, f" bnb identity err too large for {variant}" - assert grad_nonzero, f" no nonzero grad for {variant}" - ll.detach(model) - del model - - for variant in bnb_fail: - model = BnbModel(layer_cls) - cfg = _CFG_BY_VARIANT[variant](r=2, alpha=2, dtype=torch.float16, target_roles=()) - try: - ll.attach(model, cfg) - except (TypeError, RuntimeError, AttributeError, ValueError) as e: - print(f" {layer_cls.__name__:14s} {variant:6s}: fail-loud OK ({type(e).__name__})") - else: - raise AssertionError(f" {variant} on {layer_cls.__name__} should have failed loudly") - del model - - -def eva_smoke(): - """EVA needs calibration data: drives forward + per-target SVD on inputs.""" - print("\n=== variant=eva (data-driven init via group_init+calibration_data) ===") - torch.manual_seed(0) - model = TinyModel().to(torch.float32) - ids = torch.randint(0, 100, (2, 16)) - with torch.no_grad(): - y_base = model(ids).clone() - - cfg = ll.EVAConfig(r=4, alpha=8, dtype=torch.float32) - # 4 calibration batches of random ids - calib = [torch.randint(0, 100, (2, 16)) for _ in range(4)] - ll.attach(model, cfg, calibration_data=calib) - n_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad) - print(f" trainable params={n_trainable} (lora_A AND lora_B both trainable per peft EVA)") - # peft EVA keeps A as a trainable Parameter; SVD only changes the INIT. - eva_layers = [m for m in model.modules() if hasattr(m, "lora_A")] - assert all(layer.lora_A.requires_grad for layer in eva_layers), \ - "EVA lora_A must be a trainable Parameter (peft semantics)" - print(f" SHOULD: lora_A.requires_grad==True on every EVA layer. PASS.") - - with torch.no_grad(): - y_adapt = model(ids) - err = (y_adapt - y_base).abs().max().item() - print(f" t=0 identity: max|y_adapt - y_base| = {err:.3e}") - assert err < 1e-6, f"EVA should be exact identity (B=0); got {err}" - print(" SHOULD: err==0 (B=0 init). PASS.") - - # check A buffer is non-zero (data-driven) - a_norms = [layer.lora_A.norm().item() for layer in [m for m in model.modules() if hasattr(m, "lora_A")]] - assert all(n > 0 for n in a_norms), "EVA lora_A buffers all zero -> group_init never ran" - print(f" SHOULD: lora_A buffers populated. PASS (mean ||A||={sum(a_norms)/len(a_norms):.3f}).") - - # save/load round-trip WITHOUT calibration data on load (load path uses _skip_group_init) - ARTIFACT_DIR.mkdir(exist_ok=True) - p = ARTIFACT_DIR / "eva_smoke_adapter.pt" - ll.save(model, str(p)) - ll.detach(model) - torch.manual_seed(0) - model2 = TinyModel().to(torch.float32) - ll.load(model2, str(p)) # must NOT require calibration_data - with torch.no_grad(): - y_loaded = model2(ids) - err2 = (y_loaded - y_adapt).abs().max().item() - print(f" save/load (no calibration on load): max err = {err2:.3e}") - assert err2 < 1e-6, f"EVA save/load mismatch {err2}" - print(" SHOULD: load without calibration_data works (uses _skip_group_init). PASS.") - ll.detach(model2) - # re-attach model for training section below - ll.attach(model, cfg, calibration_data=calib) - - # gradient flow: only B trains - target = torch.randn(2, 16, 100, dtype=torch.float32) * 0.1 - trainable = [p for p in model.parameters() if p.requires_grad] - opt = torch.optim.SGD(trainable, lr=1e-2) - losses = [] - for _ in range(20): - opt.zero_grad() - loss = (model(ids) - target).pow(2).mean() - loss.backward() - assert_no_base_grads(model) - opt.step() - losses.append(loss.item()) - drop = (losses[0] - losses[-1]) / max(losses[0], 1e-12) - print(f" loss[0]={losses[0]:.4f} loss[-1]={losses[-1]:.4f} drop={100*drop:.1f}%") - assert drop > 0.05 - print(" SHOULD: drop>5%. PASS.") - ll.detach(model) - - -def dora_bias_smoke(): - """V3 review caught: DoRA was scaling bias by m/||V||. Fixed; bias passes through.""" - print("\n=== dora bias passthrough (V3 fix) ===") - torch.manual_seed(0) - d = 16 - layer = nn.Linear(d, d, bias=True).to(torch.float32) - x = torch.randn(2, d) - y_base = layer(x).detach() - - class Wrap(nn.Module): - def __init__(self, lin): - super().__init__() - self.config = type("Cfg", (), {"hidden_size": d})() - self.layers = nn.ModuleList([lin]) - - def forward(self, x): - return self.layers[0](x) - - model = Wrap(layer) - cfg = ll.DoRAConfig(r=2, alpha=4, dtype=torch.float32, target_roles=()) - ll.attach(model, cfg) - with torch.no_grad(): - y_adapt = model(x) - err = (y_adapt - y_base).abs().max().item() - print(f" identity with bias=True: max err = {err:.3e}") - assert err < 1e-5, f"DoRA bias-passthrough broken: err {err} (likely bias being scaled)" - print(" SHOULD: identity err < 1e-5 even with bias. PASS.") - ll.detach(model) - - -def hra_forward_order_smoke(): - """Distinguishing check that HRA forward applies x @ R^T, not x @ R. - - Build R = H_0 H_1 ... H_{r-1} explicitly from U, and compare the adapted - output to F.linear(x, W @ R). If our pre-hook iterated forward (x @ R, the - bug), this would match only at identity init (paired rows give R^T = R). - """ - print("\n=== hra forward-order vs F.linear(x, W @ R) ===") - torch.manual_seed(0) - d = 8 - layer = nn.Linear(d, d, bias=False) - x = torch.randn(2, 3, d) - - cfg = ll.HRAConfig(r=4, alpha=4, dtype=torch.float32, target_roles=()) - class Wrap(nn.Module): - def __init__(self_, lin): - super().__init__() - self_.config = type("Cfg", (), {"hidden_size": d})() - self_.layers = nn.ModuleList([lin]) - def forward(self_, x): - return self_.layers[0](x) - model = Wrap(layer) - ll.attach(model, cfg) - - # break paired symmetry so order matters - with torch.no_grad(): - layer.lora_U.add_(0.1 * torch.randn_like(layer.lora_U)) - - # build R = H_0 H_1 ... H_{r-1} - U = layer.lora_U - R = torch.eye(d) - for i in range(U.shape[0]): - u = U[i] - sq = (u * u).sum().clamp_min(1e-12) - R = R - (2.0 / sq) * torch.outer(R @ u, u) - - with torch.no_grad(): - y_adapt = model(x) - y_ref = torch.nn.functional.linear(x, layer.weight @ R) - err = (y_adapt - y_ref).abs().max().item() - print(f" ||y_adapt - F.linear(x, W @ R)||_inf = {err:.3e}") - assert err < 1e-5, ( - "HRA forward order regression: should apply x @ R^T (loop reversed). " - "If you reverse the loop in forward_input you'll get x @ R instead, " - "and this check will fail with paired-symmetry-broken U." - ) - print(" SHOULD: err < 1e-5 (proves loop applies x @ R^T not x @ R). PASS.") - ll.detach(model) - - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("--require-bnb", action="store_true") - args = parser.parse_args() - - for v in ("lora", "pissa", "delora", "ia3", "dora", "hra", "antipasto"): - variant_test(v, dtype=torch.float32) - eva_smoke() - dora_bias_smoke() - hra_forward_order_smoke() - structural_linear_like_test() - bitsandbytes_cuda_smoke(args.require_bnb) - print("\nALL PASS.") +def main() -> int: + failed = [v for v in VARIANTS if run_one(v) != 0] + if failed: + print(f"FAIL: {failed}") + return 1 + print("ALL PASS.") + return 0 if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/tests/test_bnb.py b/tests/test_bnb.py new file mode 100644 index 0000000..f9bdca8 --- /dev/null +++ b/tests/test_bnb.py @@ -0,0 +1,63 @@ +"""bnb 4bit/8bit CUDA smoke. Skipped without CUDA + bitsandbytes installed.""" +from __future__ import annotations + +import pytest +import torch +from torch import nn + +import lora_lite as ll + + +pytestmark = pytest.mark.skipif(not torch.cuda.is_available(), reason="needs CUDA") +bnb = pytest.importorskip("bitsandbytes") + + +CFG_BY_VARIANT = { + "lora": ll.LoRAConfig, + "ia3": ll.IA3Config, + "hra": ll.HRAConfig, + "pissa": ll.PiSSAConfig, + "dora": ll.DoRAConfig, +} + + +class BnbModel(nn.Module): + def __init__(self, layer_cls): + super().__init__() + self.config = type("Cfg", (), {"hidden_size": 8})() + self.layers = nn.ModuleList([layer_cls(8, 8, bias=False)]).cuda() + + def forward(self, x): + return self.layers[0](x) + + +@pytest.mark.parametrize("layer_cls", [bnb.nn.Linear8bitLt, bnb.nn.Linear4bit]) +@pytest.mark.parametrize("variant", ["lora", "ia3", "hra"]) +def test_hook_only_variants_attach_to_bnb(layer_cls, variant): + """LoRA / IA3 / HRA only hook outputs; bnb dequantization is the layer's job.""" + torch.manual_seed(0) + model = BnbModel(layer_cls) + x = torch.randn(2, 3, 8, device="cuda") + y_base = model(x).detach() + + cfg = CFG_BY_VARIANT[variant](r=2, alpha=4, dtype=torch.float16, target_roles=()) + ll.attach(model, cfg) + y = model(x) + assert (y.detach() - y_base).abs().max().item() < 1e-2 + + y.pow(2).mean().backward() + grad_total = sum( + g.abs().sum().item() + for n, p in model.named_parameters() + if "lora_" in n and p.requires_grad and (g := p.grad) is not None + ) + assert grad_total > 0 + + +@pytest.mark.parametrize("layer_cls", [bnb.nn.Linear8bitLt, bnb.nn.Linear4bit]) +@pytest.mark.parametrize("variant", ["pissa", "dora"]) +def test_weight_reading_variants_reject_bnb(layer_cls, variant): + model = BnbModel(layer_cls) + cfg = CFG_BY_VARIANT[variant](r=2, alpha=2, dtype=torch.float16, target_roles=()) + with pytest.raises((TypeError, RuntimeError, AttributeError, ValueError)): + ll.attach(model, cfg) diff --git a/tests/test_lora_lite.py b/tests/test_lora_lite.py index a19d665..190a38f 100644 --- a/tests/test_lora_lite.py +++ b/tests/test_lora_lite.py @@ -1,6 +1,11 @@ +"""Per-variant attach + train + save + load round-trip, plus surgical regressions. + +The big invariant is the parametrized train_save_load test: identity at t=0, +gradient flow on a real loss, then save -> reload onto a fresh model and +confirm the trained outputs survive the round-trip. Cheap on CPU. +""" from __future__ import annotations -import math from pathlib import Path import pytest @@ -10,7 +15,31 @@ from torch import nn import lora_lite as ll -ARTIFACT_DIR = Path(__file__).parent / "_artifacts" +CFG_BY_VARIANT = { + "lora": ll.LoRAConfig, + "pissa": ll.PiSSAConfig, + "delora": ll.DeLoRAConfig, + "ia3": ll.IA3Config, + "ia3_ff": ll.IA3FFConfig, + "dora": ll.DoRAConfig, + "hra": ll.HRAConfig, + "eva": ll.EVAConfig, + "antipasto": ll.AntiPaSTOConfig, +} + +# Per-variant identity tolerance at t=0 (after attach, before any step). +# fp32 SVD round-trip + per-row norm = looser tolerance for pissa/dora/antipasto. +IDENTITY_TOL = { + "lora": 1e-6, + "pissa": 5e-4, + "delora": 1e-6, + "ia3": 1e-6, + "ia3_ff": 1e-6, + "dora": 5e-5, + "hra": 5e-6, + "eva": 1e-6, + "antipasto": 5e-4, +} class TinyBlock(nn.Module): @@ -46,12 +75,14 @@ class TinyModel(nn.Module): class FakeLinearLike(nn.Module): + """linear-like, but not nn.Linear: stand-in for bnb 4/8-bit modules.""" + def __init__(self, d_in: int = 8, d_out: int = 8): super().__init__() self.in_features = d_in self.out_features = d_out self.weight = nn.Parameter(torch.empty(d_out, d_in)) - nn.init.kaiming_uniform_(self.weight, a=5**0.5) + nn.init.kaiming_uniform_(self.weight, a=5 ** 0.5) def forward(self, x: torch.Tensor) -> torch.Tensor: return torch.nn.functional.linear(x, self.weight) @@ -67,24 +98,9 @@ class FakeBnbModel(nn.Module): return self.layers[0](x) -_CFG_BY_VARIANT = { - "lora": ll.LoRAConfig, - "pissa": ll.PiSSAConfig, - "delora": ll.DeLoRAConfig, - "ia3": ll.IA3Config, - "ia3_ff": ll.IA3FFConfig, - "dora": ll.DoRAConfig, - "hra": ll.HRAConfig, - "eva": ll.EVAConfig, - "antipasto": ll.AntiPaSTOConfig, -} - - -def cfg_for_variant(variant: str, *, training: bool = False) -> ll.AdapterConfig: - # DeLoRA keeps identity via B=0, so nonzero lambda is needed for the - # perturb-output check to distinguish a live adapter from dead code. +def cfg_for(variant: str) -> ll.AdapterConfig: extra = {"lambda0": 0.1} if variant == "delora" else {} - return _CFG_BY_VARIANT[variant]( + return CFG_BY_VARIANT[variant]( r=4, alpha=4 if variant == "pissa" else 8, dtype=torch.float32, @@ -92,182 +108,172 @@ def cfg_for_variant(variant: str, *, training: bool = False) -> ll.AdapterConfig ) -def adapter_state(model: nn.Module) -> dict[str, torch.Tensor]: - return {k: v.detach().clone() for k, v in model.state_dict().items() if "lora_" in k} +def attach_with_calib(model: nn.Module, cfg: ll.AdapterConfig, ids: torch.Tensor) -> None: + if cfg.variant == "eva": + calib = [ids for _ in range(2)] + ll.attach(model, cfg, calibration_data=calib) + else: + ll.attach(model, cfg) -def assert_only_lora_trainable(model: nn.Module) -> None: - trainable_names = [name for name, p in model.named_parameters() if p.requires_grad] - assert trainable_names - assert all("lora_" in name for name in trainable_names) +def trainable_grad_norm(model: nn.Module) -> float: + return sum( + p.grad.detach().float().norm().item() + for n, p in model.named_parameters() + if "lora_" in n and p.grad is not None + ) -def assert_no_base_grads(model: nn.Module) -> None: - leaked = [name for name, p in model.named_parameters() if "lora_" not in name and p.grad is not None] - assert leaked == [] - - -def perturb_first_adapter(model: nn.Module) -> None: - """Nudge one trainable adapter parameter so forward output changes. - - Priority order matters: with B=0 init (DeLoRA, EVA, LoRA), perturbing a - scalar gate or lambda alone keeps delta=0, so we hit a matrix entry first. - """ - priority = ("lora_B", "lora_g", "lora_U", "lora_A", "lora_lambda", "lora_gate") - for key in priority: - for name, p in model.named_parameters(): - if not p.requires_grad or key not in name: - continue - with torch.no_grad(): - if p.ndim == 0: - p.add_(0.25) - else: - p.flatten()[0].add_(0.25) - return - raise AssertionError("no perturbable adapter parameter found") - - -@pytest.mark.parametrize("variant", ["lora", "pissa", "delora", "ia3", "dora", "hra"]) -def test_variant_identity_hook_save_load_and_training(variant: str): - ARTIFACT_DIR.mkdir(exist_ok=True) +@pytest.mark.parametrize("variant", list(CFG_BY_VARIANT)) +def test_train_save_load(variant: str, tmp_path: Path): + """Identity at t=0, one SGD step, save, reload onto fresh model, outputs match.""" torch.manual_seed(0) model = TinyModel() ids = torch.randint(0, 100, (2, 16)) - with torch.no_grad(): y_base = model(ids).clone() - cfg = cfg_for_variant(variant) - handles = ll.attach(model, cfg) - assert len(handles) == 28 - assert_only_lora_trainable(model) + cfg = cfg_for(variant) + attach_with_calib(model, cfg, ids) + + trainable = [p for p in model.parameters() if p.requires_grad] + assert trainable + assert all("lora_" in n for n, p in model.named_parameters() if p.requires_grad) with torch.no_grad(): y_init = model(ids).clone() - identity_err = (y_init - y_base).abs().max().item() - identity_tol = {"lora": 1e-6, "pissa": 5e-4, "delora": 1e-6, "ia3": 1e-6, "dora": 5e-5, "hra": 5e-6}[variant] - assert identity_err < identity_tol + assert (y_init - y_base).abs().max().item() < IDENTITY_TOL[variant] + + target = torch.randn_like(y_init) * 0.1 + opt = torch.optim.SGD(trainable, lr=1e-2) + opt.zero_grad() + loss = (model(ids) - target).pow(2).mean() + loss.backward() + leaked = [n for n, p in model.named_parameters() if "lora_" not in n and p.grad is not None] + assert leaked == [] + assert trainable_grad_norm(model) > 0 + opt.step() - before_perturb = adapter_state(model) - perturb_first_adapter(model) with torch.no_grad(): - perturb_delta = (model(ids) - y_init).abs().max().item() - assert perturb_delta > 1e-7 - for name, value in before_perturb.items(): - model.state_dict()[name].copy_(value) + y_trained = model(ids).clone() - path = ARTIFACT_DIR / f"{variant}_adapter.pt" + path = tmp_path / "adapter.pt" ll.save(model, str(path)) - saved = torch.load(path, weights_only=True, map_location="cpu") - assert set(saved["state"]) == set(adapter_state(model)) - assert any(k.startswith("layers.0.q_proj.lora_") for k in saved["state"]) torch.manual_seed(0) model_loaded = TinyModel() - ll.load(model_loaded, str(path)) - loaded_state = adapter_state(model_loaded) - for name, value in saved["state"].items(): - assert torch.equal(loaded_state[name].cpu(), value) + ll.load(model_loaded, str(path)) # EVA load skips group_init; calibration_data not needed with torch.no_grad(): y_loaded = model_loaded(ids) - assert (y_loaded - y_init).abs().max().item() < identity_tol - - torch.manual_seed(0) - train_model = TinyModel() - ll.attach(train_model, cfg_for_variant(variant, training=True)) - assert_only_lora_trainable(train_model) - target = torch.randn(2, 16, 100) * 0.1 - trainable = [p for p in train_model.parameters() if p.requires_grad] - opt = torch.optim.Adam(trainable, lr=0.1) if variant in ("delora", "ia3", "hra") else ( - torch.optim.Adam(trainable, lr=1e-3) if variant == "dora" else torch.optim.SGD(trainable, lr=1e-2) - ) - losses = [] - first_grad_norm = math.nan - before_train = adapter_state(train_model) - for step in range(20): - opt.zero_grad() - loss = (train_model(ids) - target).pow(2).mean() - loss.backward() - assert_no_base_grads(train_model) - grad_norm = sum( - p.grad.detach().float().norm().item() - for name, p in train_model.named_parameters() - if "lora_" in name and p.grad is not None - ) - assert math.isfinite(grad_norm) - if step == 0: - first_grad_norm = grad_norm - opt.step() - losses.append(loss.item()) - after_train = adapter_state(train_model) - adapter_delta = sum((after_train[k] - before_train[k]).float().norm().item() for k in before_train) - drop = (losses[0] - losses[-1]) / losses[0] - assert first_grad_norm > 0 - assert adapter_delta > 0 - assert drop > 0.05 + assert (y_loaded - y_trained).abs().max().item() < max(IDENTITY_TOL[variant], 1e-5) -def test_load_fails_on_missing_and_unexpected_lora_keys(): - ARTIFACT_DIR.mkdir(exist_ok=True) +@pytest.mark.parametrize("variant", ["lora", "delora", "ia3", "hra"]) +def test_hook_only_variants_attach_to_non_linear_target(variant: str): + """bnb-style targets are linear-like but not nn.Linear; hook-only variants must accept them.""" + extra = {"lambda0": 0.1} if variant == "delora" else {} + cfg = CFG_BY_VARIANT[variant](r=2, alpha=4, dtype=torch.float32, target_roles=(), **extra) + model = FakeBnbModel() + ll.attach(model, cfg) + x = torch.randn(2, 3, 8) + model(x).pow(2).mean().backward() + assert trainable_grad_norm(model) > 0 + + +@pytest.mark.parametrize("variant", ["pissa", "dora", "antipasto"]) +def test_weight_reading_variants_reject_non_linear(variant: str): + r = 4 if variant == "antipasto" else 2 # antipasto needs r % block_size==0 + cfg = CFG_BY_VARIANT[variant](r=r, alpha=r, dtype=torch.float32, target_roles=()) + with pytest.raises(TypeError, match="plain nn.Linear"): + ll.attach(FakeBnbModel(), cfg) + + +def test_save_load_strict_keys(tmp_path: Path): torch.manual_seed(0) model = TinyModel() - ll.attach(model, cfg_for_variant("lora")) - good_path = ARTIFACT_DIR / "lora_good.pt" - ll.save(model, str(good_path)) - blob = torch.load(good_path, weights_only=True, map_location="cpu") + ll.attach(model, ll.LoRAConfig(r=4, alpha=8, dtype=torch.float32)) + p = tmp_path / "lora.pt" + ll.save(model, str(p)) + blob = torch.load(p, weights_only=True, map_location="cpu") - missing_blob = {"cfg": blob["cfg"], "state": dict(blob["state"])} - missing_blob["state"].pop(next(iter(missing_blob["state"]))) - missing_path = ARTIFACT_DIR / "lora_missing.pt" - torch.save(missing_blob, missing_path) + missing = {"cfg": blob["cfg"], "state": dict(blob["state"]), "base_fp": blob.get("base_fp", {})} + missing["state"].pop(next(iter(missing["state"]))) + torch.save(missing, p) with pytest.raises(RuntimeError, match="missing lora keys"): - ll.load(TinyModel(), str(missing_path)) + ll.load(TinyModel(), str(p)) - unexpected_blob = {"cfg": blob["cfg"], "state": dict(blob["state"])} - unexpected_blob["state"]["layers.0.q_proj.lora_extra"] = torch.zeros(1) - unexpected_path = ARTIFACT_DIR / "lora_unexpected.pt" - torch.save(unexpected_blob, unexpected_path) + bad = {"cfg": blob["cfg"], "state": dict(blob["state"]), "base_fp": blob.get("base_fp", {})} + bad["state"]["layers.0.q_proj.lora_extra"] = torch.zeros(1) + torch.save(bad, p) with pytest.raises(RuntimeError, match="unexpected lora keys"): - ll.load(TinyModel(), str(unexpected_path)) + ll.load(TinyModel(), str(p)) -def test_no_target_layers_is_loud_failure(): +def test_no_target_layers_is_loud(): cfg = ll.LoRAConfig(target_names=("definitely_missing",)) with pytest.raises(RuntimeError, match="no target layers"): ll.attach(TinyModel(), cfg) -@pytest.mark.parametrize("variant", ["lora", "delora", "ia3", "hra"]) -def test_structural_non_linear_target_trains_for_forward_only_variants(variant: str): +def test_eva_requires_calibration(): + """EVA's group_init must error loudly if calibration_data is missing.""" + with pytest.raises(ValueError, match="calibration_data"): + ll.attach(TinyModel(), ll.EVAConfig(r=4, alpha=8, dtype=torch.float32)) + + +def test_dora_bias_passthrough(): + """Regression: DoRA must NOT scale bias; identity holds with bias=True at t=0.""" torch.manual_seed(0) - model = FakeBnbModel() - x = torch.randn(2, 3, 8) - y_base = model(x).detach() - extra = {"lambda0": 0.1} if variant == "delora" else {} - cfg = _CFG_BY_VARIANT[variant]( - r=2, - alpha=4, - dtype=torch.float32, - target_roles=(), - **extra, - ) - ll.attach(model, cfg) - y_init = model(x) - # delora: lambda0=0.1 is small but B=0 still makes delta=0 at t=0, so identity holds. - assert (y_init.detach() - y_base).abs().max().item() < 1e-6 - loss = y_init.pow(2).mean() - loss.backward() - assert_no_base_grads(model) - adapter_grad_norm = sum( - p.grad.detach().float().norm().item() - for name, p in model.named_parameters() - if "lora_" in name and p.grad is not None - ) - assert adapter_grad_norm > 0 + d = 16 + layer = nn.Linear(d, d, bias=True) + x = torch.randn(2, d) + y_base = layer(x).detach() + + class Wrap(nn.Module): + def __init__(self, lin): + super().__init__() + self.config = type("Cfg", (), {"hidden_size": d})() + self.layers = nn.ModuleList([lin]) + + def forward(self, x): + return self.layers[0](x) + + model = Wrap(layer) + ll.attach(model, ll.DoRAConfig(r=2, alpha=4, dtype=torch.float32, target_roles=())) + with torch.no_grad(): + y = model(x) + assert (y - y_base).abs().max().item() < 1e-5 -@pytest.mark.parametrize("variant", ["pissa", "dora"]) -def test_weight_reading_variants_reject_structural_non_linear_target(variant: str): - cfg = _CFG_BY_VARIANT[variant](r=2, alpha=2, dtype=torch.float32, target_roles=()) - with pytest.raises(TypeError, match="plain nn.Linear"): - ll.attach(FakeBnbModel(), cfg) \ No newline at end of file +def test_hra_forward_is_x_R_T(): + """HRA must apply x @ R^T (loop i = r-1 down to 0). Asymmetric U makes order observable.""" + torch.manual_seed(0) + d = 8 + layer = nn.Linear(d, d, bias=False) + x = torch.randn(2, 3, d) + + class Wrap(nn.Module): + def __init__(self, lin): + super().__init__() + self.config = type("Cfg", (), {"hidden_size": d})() + self.layers = nn.ModuleList([lin]) + + def forward(self, x): + return self.layers[0](x) + + model = Wrap(layer) + ll.attach(model, ll.HRAConfig(r=4, alpha=4, dtype=torch.float32, target_roles=())) + # break paired symmetry so order matters + with torch.no_grad(): + layer.lora_U.add_(0.1 * torch.randn_like(layer.lora_U)) + + U = layer.lora_U + R = torch.eye(d) + for i in range(U.shape[0]): + u = U[i] + sq = (u * u).sum().clamp_min(1e-12) + R = R - (2.0 / sq) * torch.outer(R @ u, u) + with torch.no_grad(): + y_adapt = model(x) + y_ref = torch.nn.functional.linear(x, layer.weight @ R) + assert (y_adapt - y_ref).abs().max().item() < 1e-5