mirror of
https://github.com/wassname/lora-lite.git
synced 2026-06-27 16:15:50 +08:00
tidy
This commit is contained in:
@@ -44,11 +44,12 @@ See [docs/spec/20260426_lora_lite_plan.md](docs/spec/20260426_lora_lite_plan.md)
|
||||
| LoRA | yes | additive low-rank adapter |
|
||||
| PiSSA | yes, fp only | mutates `weight` into `W_res`; quantized PiSSA intentionally fails |
|
||||
| DeLoRA | yes | normalized additive adapter with learned scalar |
|
||||
| IA3 | yes | output gate initialized to ones |
|
||||
| IA3 | yes | output gate (`ia3`) or input gate (`ia3_ff`); init to ones |
|
||||
| DoRA | yes, fp only | reads dense `weight` for column-norm; quantized DoRA fails loudly |
|
||||
| HRA | yes | output-side Householder reflection with identity gate; works on bnb |
|
||||
| HRA | yes | input-side Householder product via pre-hook; works on bnb |
|
||||
| EVA | yes, fp only | LoRA forward; `lora_A` init from PCA on calibration activations |
|
||||
| AntiPaSTO | yes, fp only | top-r weight SVD with learnable singular-value deltas + Cayley rotation |
|
||||
| SSVD / OFT / ROAD | no | planned |
|
||||
| S-steer / AntiPaSTO | no | should use data-calibrated `group_init`, not plain LoRA tests |
|
||||
|
||||
## Targeting
|
||||
|
||||
|
||||
+11
-7
@@ -72,11 +72,15 @@ Activation-aware variants implement `group_init(model, targets, cfg, calibration
|
||||
|
||||
## Adapter roadmap
|
||||
|
||||
| Variant | Fit to current runtime | Next invariant |
|
||||
| Variant | Fit to current runtime | Status |
|
||||
|---|---|---|
|
||||
| IA3 | Done. Output gate `y * g`, identity at `g=1`. | Qwen proof in latest probe. |
|
||||
| DoRA | Done for fp layers. Reads dense `weight` to compute `||V||_c`; quantized layers fail fast. | Qwen proof in latest probe. |
|
||||
| HRA | Done. Output-side Householder with identity gate; hook-only -> works on bnb. | Qwen proof in latest probe. |
|
||||
| SSVD / PiSSA-family | Fits weight-SVD init path. | reconstruction/identity invariant plus train proof. |
|
||||
| OFT / ROAD | Block-diagonal rotations; weight-transform semantics need clearer hook-only formulation. | pseudocode first, then rotation/non-dead-code invariant. |
|
||||
| S-steer / AntiPaSTO | Should use `group_init` and activation evidence. | calibration consumed, hooks removed, load works without calibration. |
|
||||
| LoRA | Hook-only additive low-rank. | Done. Tested. |
|
||||
| PiSSA | Mutates `layer.weight` into `W_res`; identity via SVD round-trip. | Done. fp-only. Tested. |
|
||||
| DeLoRA | Per-input-channel weight-norm scale, per-rank A/B normalization, learned `lambda`. | Done. Tested. |
|
||||
| IA3 / IA3_FF | Output gate (k/v) and input gate (down_proj) variants, init to ones. | Done. Tested. |
|
||||
| DoRA | Reads dense `weight` for `||V||_c`; bias passes through unscaled. | Done. fp-only. Tested. |
|
||||
| HRA | Householder product applied via `forward_input` pre-hook; bnb-friendly. | Done. Tested. |
|
||||
| EVA | LoRA forward; `lora_A` init from PCA on calibration activations via `group_init`. | Done. fp-only. Tested. |
|
||||
| AntiPaSTO | Top-r weight SVD, learnable singular-value deltas + block-diagonal Cayley rotation. | Done. fp-only. Tested. |
|
||||
| SSVD | Could fit the weight-SVD init path. | Planned. |
|
||||
| OFT / ROAD | Block-diagonal rotations; needs clearer hook-only formulation. | Planned. |
|
||||
|
||||
@@ -492,6 +492,13 @@ def run(args: BenchmarkConfig) -> dict[str, Any]:
|
||||
model, tokenizer = load_model_and_tokenizer(args.model, dtype, args.device)
|
||||
batches, skipped_train_prompt_too_long = make_train_batches(datasets["train"], tokenizer, args)
|
||||
cfg = cfg_for_variant(args, dtype)
|
||||
if args.variant == "eva":
|
||||
calib = [
|
||||
{"input_ids": b["input_ids"], "attention_mask": b["attention_mask"]}
|
||||
for b in batches[: min(4, len(batches))]
|
||||
]
|
||||
ll.attach(model, cfg, calibration_data=calib)
|
||||
else:
|
||||
ll.attach(model, cfg)
|
||||
attached = getattr(model, "_lora_lite_attached")
|
||||
trainable_names = assert_only_lora_trainable(model)
|
||||
|
||||
+41
-463
@@ -1,475 +1,53 @@
|
||||
"""Smoke test: current variants on a tiny synthetic transformer-like model.
|
||||
"""Smoke: end-to-end MetaMath->GSM8K plumbing for every variant on a tiny HF model.
|
||||
|
||||
Verifies:
|
||||
1. Identity at t=0 (delta ~ 0, output close to base).
|
||||
2. Save/load round-trip preserves outputs.
|
||||
3. A few SGD steps reduce a random loss (gradients flow).
|
||||
|
||||
Run:
|
||||
cd lora-lite
|
||||
python -m pip install -e .
|
||||
python tests/smoke.py
|
||||
|
||||
BLUF format:
|
||||
SHOULD: max|y_adapter - y_base| < tol_init for all variants. ELSE init or hook bug.
|
||||
SHOULD: loss decreases > 5% over 20 SGD steps for all variants. ELSE grad/wiring bug.
|
||||
Per-variant correctness invariants live in tests/test_lora_lite.py. This script
|
||||
just confirms the full benchmark pipeline (data load, prompt encode, train step,
|
||||
eval generate + answer extract) runs for each adapter type.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import argparse
|
||||
import os, sys, math
|
||||
from pathlib import Path
|
||||
import torch
|
||||
from torch import nn
|
||||
|
||||
# allow running as `python tests/smoke.py` without install
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
import lora_lite as ll # noqa: E402
|
||||
VARIANTS = ["lora", "pissa", "delora", "ia3", "ia3_ff", "dora", "hra", "eva", "antipasto"]
|
||||
MODEL = "hf-internal-testing/tiny-random-LlamaForCausalLM"
|
||||
|
||||
|
||||
ARTIFACT_DIR = Path(__file__).parent / "_artifacts"
|
||||
def run_one(variant: str) -> int:
    """Run the MetaMath->GSM8K benchmark script once for ``variant``.

    The benchmark is launched in a child interpreter with deliberately tiny
    settings (2 steps, batch size 2, float32 on CPU) so that only the
    plumbing is exercised, not model quality.

    Args:
        variant: Adapter variant name forwarded as ``--variant``.

    Returns:
        The exit code of the child process (0 means the run succeeded).
    """
    # Local imports: this module only imports ``subprocess`` and ``sys`` at
    # file level.
    import shlex
    from pathlib import Path

    # The script path below is relative to the repository root. Run the child
    # from there so the smoke works regardless of the caller's cwd.
    # NOTE(review): assumes this file lives in <repo>/tests/ -- confirm.
    repo_root = Path(__file__).resolve().parent.parent

    cmd = [
        sys.executable,
        "scripts/metamath_gsm8k_benchmark.py",
        "--model", MODEL,
        "--variant", variant,
        "--steps", "2",
        "--batch-size", "2",
        "--max-train-samples", "8",
        "--max-eval-samples", "10",
        "--max-valid-samples", "10",
        "--max-new-tokens", "8",
        "--max-seq-length", "128",
        "--r", "4",
        "--alpha", "8",
        "--torch-dtype", "float32",
        "--device", "cpu",
    ]
    # The two IA3 flavours are pointed at different projections.
    if variant == "ia3":
        cmd += ["--target-name", r"(k_proj|v_proj)$"]
    elif variant == "ia3_ff":
        cmd += ["--target-name", r"(down_proj)$"]

    print(f"\n=== smoke variant={variant} ===")
    # shlex.join quotes the regex arguments, so the echoed line can be pasted
    # into a shell verbatim.
    print(shlex.join(cmd))
    return subprocess.call(cmd, cwd=repo_root)
|
||||
|
||||
|
||||
def assert_no_base_grads(model: nn.Module) -> None:
    """Fail if any parameter whose name lacks ``lora_`` holds a gradient.

    Args:
        model: Module whose named parameters are inspected.

    Raises:
        AssertionError: listing the offending parameter names.
    """
    offenders = []
    for param_name, param in model.named_parameters():
        if "lora_" in param_name:
            # Adapter parameters are expected to receive gradients.
            continue
        if param.grad is not None:
            offenders.append(param_name)
    assert offenders == [], f"base params received grads: {offenders}"
|
||||
|
||||
|
||||
# ---- a tiny transformer-like stack: 4 blocks of (q,k,v,o, gate,up,down) Linears ----
|
||||
class TinyBlock(nn.Module):
    """Transformer-shaped residual block built only from bias-free Linears.

    It is not real attention: the q/k/v projections are simply summed before
    ``o_proj``. The block exists to expose the usual projection names
    (q, k, v, o, gate, up, down) to the adapter code.
    """

    def __init__(self, d=64, ff=128):
        super().__init__()
        # Creation order is kept (q, k, v, o, gate, up, down) so seeded
        # initialisation yields the same weights as before.
        for proj_name in ("q_proj", "k_proj", "v_proj", "o_proj"):
            setattr(self, proj_name, nn.Linear(d, d, bias=False))
        for proj_name in ("gate_proj", "up_proj"):
            setattr(self, proj_name, nn.Linear(d, ff, bias=False))
        self.down_proj = nn.Linear(ff, d, bias=False)

    def forward(self, x):
        """Return ``x`` plus the pseudo-attention branch plus the gated MLP branch."""
        mixed = self.q_proj(x) + self.k_proj(x) + self.v_proj(x)
        attn_out = self.o_proj(mixed)
        gated = torch.nn.functional.silu(self.gate_proj(x)) * self.up_proj(x)
        mlp_out = self.down_proj(gated)
        return x + attn_out + mlp_out
|
||||
|
||||
|
||||
class TinyModel(nn.Module):
    """Embedding -> stack of ``TinyBlock`` -> ``lm_head`` toy language model.

    Exposes ``config.hidden_size`` to mimic the attribute a HF model carries.
    """

    def __init__(self, n_layers=4, d=64, ff=128, vocab=100):
        super().__init__()
        self.embed_tokens = nn.Embedding(vocab, d)
        blocks = [TinyBlock(d, ff) for _ in range(n_layers)]
        self.layers = nn.ModuleList(blocks)
        self.lm_head = nn.Linear(d, vocab, bias=False)
        # mimic HF .config.hidden_size
        self.config = type("Cfg", (), {"hidden_size": d})()

    def forward(self, ids):
        """Map token ids to logits over the vocabulary."""
        hidden = self.embed_tokens(ids)
        for block in self.layers:
            hidden = block(hidden)
        logits = self.lm_head(hidden)
        return logits
|
||||
|
||||
|
||||
class FakeLinearLike(nn.Module):
    """Not nn.Linear, but structurally bnb-like enough for target discovery.

    Carries ``in_features``, ``out_features`` and a dense ``weight`` parameter,
    and applies a bias-free linear map in ``forward``.
    """

    def __init__(self, d_in=8, d_out=8):
        super().__init__()
        self.in_features, self.out_features = d_in, d_out
        raw_weight = torch.empty(d_out, d_in)
        self.weight = nn.Parameter(raw_weight)
        nn.init.kaiming_uniform_(self.weight, a=5 ** 0.5)

    def forward(self, x):
        """Return ``x`` multiplied by the transposed weight (no bias)."""
        linear = torch.nn.functional.linear
        return linear(x, self.weight)
|
||||
|
||||
|
||||
class FakeBnbModel(nn.Module):
    """One-layer model wrapping a single 8x8 ``FakeLinearLike``."""

    def __init__(self):
        super().__init__()
        # Minimal stand-in for a HF config object.
        self.config = type("Cfg", (), {"hidden_size": 8})()
        only_layer = FakeLinearLike(8, 8)
        self.layers = nn.ModuleList([only_layer])

    def forward(self, x):
        """Apply the single wrapped layer to ``x``."""
        first = self.layers[0]
        return first(x)
|
||||
|
||||
|
||||
_CFG_BY_VARIANT = {
|
||||
"lora": ll.LoRAConfig,
|
||||
"pissa": ll.PiSSAConfig,
|
||||
"delora": ll.DeLoRAConfig,
|
||||
"ia3": ll.IA3Config,
|
||||
"ia3_ff": ll.IA3FFConfig,
|
||||
"dora": ll.DoRAConfig,
|
||||
"hra": ll.HRAConfig,
|
||||
"eva": ll.EVAConfig,
|
||||
"antipasto": ll.AntiPaSTOConfig,
|
||||
}
|
||||
|
||||
|
||||
def variant_test(variant: str, dtype=torch.float32):
    """Run the three smoke checks for one adapter variant on ``TinyModel``.

    1. Identity at t=0: output after attach stays within a per-variant
       tolerance of the base output.
    2. Save/load round-trip onto a fresh same-seed model reproduces the
       adapted output.
    3. Twenty optimiser steps on a random target reduce the loss by more
       than 5% while no base parameter receives a gradient.

    Args:
        variant: Key into ``_CFG_BY_VARIANT``.
        dtype: dtype used for the model, the config and the random target.

    Raises:
        AssertionError: if any of the checks fails.
    """
    print(f"\n=== variant={variant} dtype={dtype} ===")

    # ---- baseline output before any adapter is attached ----
    torch.manual_seed(0)
    model = TinyModel().to(dtype)
    ids = torch.randint(0, 100, (2, 16))
    with torch.no_grad():
        y_base = model(ids).clone()

    # ---- attach ----
    cfg_kwargs = {
        "r": 4,
        # PiSSA needs scale==1 for clean recon
        "alpha": 4 if variant == "pissa" else 8,
        "dtype": dtype,
    }
    if variant == "delora":
        # delora identity holds via B=0 init (peft semantics); use peft default lambda0=15.
        cfg_kwargs["lambda0"] = 15.0
    cfg = _CFG_BY_VARIANT[variant](**cfg_kwargs)

    n_targets = len(ll.attach(model, cfg))
    n_trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            n_trainable += param.numel()
    print(f" attached {n_targets} targets, trainable params={n_trainable}")
    assert n_targets == 28, f"expected 28 TinyModel targets, got {n_targets}"

    # ---- check 1: identity at t=0 ----
    with torch.no_grad():
        y_adapt = model(ids)
    err = (y_adapt - y_base).abs().max().item()
    base_scale = y_base.abs().max().item()
    print(f" t=0 identity: max|y_adapt - y_base| = {err:.3e} (base scale {base_scale:.3e})")

    # variant-specific identity tolerance
    identity_tol = {
        "lora": 1e-6,
        "pissa": 5e-4,  # SVD recon in fp32 is tight; bf16 would be ~1e-2
        "delora": 1e-6,  # B=0 -> delta=0 regardless of lambda
        "ia3": 1e-6,
        "dora": 5e-5,  # m * V/||V|| with V=W -> rounding in norm/divide
        "hra": 1e-6,  # gate=0 -> exact identity
        "antipasto": 5e-4,  # SVD truncation + W_res reconstruction in fp32
    }
    tol = identity_tol[variant] * max(1.0, base_scale)
    assert err < tol, f" FAIL identity: err {err} > tol {tol}"
    print(f" SHOULD: err<{tol:.1e}. PASS.")

    # ---- check 2: save/load round-trip ----
    ARTIFACT_DIR.mkdir(exist_ok=True)
    adapter_path = ARTIFACT_DIR / f"{variant}_smoke_adapter.pt"
    ll.save(model, str(adapter_path))
    # detach + fresh model + load
    ll.detach(model)
    torch.manual_seed(0)
    model2 = TinyModel().to(dtype)
    # for PiSSA, base weights got mutated; load() re-runs PiSSA init on the fresh
    # same-seed base, then overwrites lora_A/B with saved values.
    ll.load(model2, str(adapter_path))
    with torch.no_grad():
        y_loaded = model2(ids)
    err2 = (y_loaded - y_adapt).abs().max().item()
    print(f" save/load: max|y_loaded - y_adapt| = {err2:.3e}")
    assert err2 < tol, f" FAIL save/load: {err2} > {tol}"
    print(f" SHOULD: err2<{tol:.1e}. PASS.")
    ll.detach(model2)

    # ---- check 3: gradient flow, 20 steps on a random target ----
    # DeLoRA: peft default lambda0=15 is too hot for lr=1e-1 + Adam in this 20-step
    # smoke (delta scale ~= lambda * ||A B x|| / ||W|| explodes). Drop to lambda0=0.1
    # for training only; identity already validated above.
    torch.manual_seed(0)
    model = TinyModel().to(dtype)
    if variant == "delora":
        train_cfg = ll.DeLoRAConfig(
            r=cfg.r, alpha=cfg.alpha, dtype=cfg.dtype, lambda0=0.1,
        )
    else:
        train_cfg = cfg
    ll.attach(model, train_cfg)
    target = torch.randn(2, 16, 100, dtype=dtype) * 0.1
    trainable = [p for p in model.parameters() if p.requires_grad]

    # Variants listed here train with Adam at the given lr; everything else
    # uses plain SGD.
    adam_lr = {
        # delora has tightly-normalised updates; use Adam with higher lr to see signal in 20 steps
        "delora": 1e-1,
        "ia3": 1e-1,
        "hra": 1e-1,
        "dora": 1e-3,  # m near ||W||_c, bigger lr blows up
        "antipasto": 1e-2,  # delta_s + rot_T, sensitive
    }.get(variant)
    if adam_lr is None:
        opt = torch.optim.SGD(trainable, lr=1e-2)
    else:
        opt = torch.optim.Adam(trainable, lr=adam_lr)

    losses = []
    for _ in range(20):
        opt.zero_grad()
        loss = (model(ids) - target).pow(2).mean()
        loss.backward()
        assert_no_base_grads(model)
        opt.step()
        losses.append(loss.item())

    first_loss, last_loss = losses[0], losses[-1]
    drop = (first_loss - last_loss) / max(first_loss, 1e-12)
    print(f" loss[0]={first_loss:.4f} loss[-1]={last_loss:.4f} drop={100*drop:.1f}%")
    assert drop > 0.05, f" FAIL: loss drop only {drop:.2%}, expected >5%"
    print(" SHOULD: drop>5%. PASS.")
|
||||
|
||||
|
||||
def structural_linear_like_test():
    """Check that LoRA attaches to a linear-like module that is not ``nn.Linear``.

    Attaches to ``FakeBnbModel`` with empty ``target_roles`` and asserts that
    the layer gained ``lora_A``/``lora_B``, that the output is exactly
    unchanged at init, and that ``lora_B`` receives a nonzero gradient.
    """
    print("\n=== structural linear-like target test (bnb-style, not nn.Linear) ===")
    torch.manual_seed(0)
    model = FakeBnbModel()
    x = torch.randn(2, 3, 8)
    y_base = model(x).detach()

    lora_cfg = ll.LoRAConfig(r=2, alpha=4, dtype=torch.float32, target_roles=())
    ll.attach(model, lora_cfg)
    layer = model.layers[0]
    assert hasattr(layer, "lora_A") and hasattr(layer, "lora_B")

    y = model(x)
    identity_err = (y.detach() - y_base).abs().max().item()
    y.pow(2).mean().backward()
    grad_nonzero = layer.lora_B.grad.abs().sum().item() > 0

    shape_a = tuple(layer.lora_A.shape)
    shape_b = tuple(layer.lora_B.shape)
    print(f" attached lora_A={shape_a} lora_B={shape_b}")
    print(f" identity_err={identity_err:.3e} grad_nonzero={grad_nonzero}")
    assert identity_err == 0.0
    assert grad_nonzero
    print(" SHOULD: structural target attaches and lora_B receives grad. PASS.")
|
||||
|
||||
|
||||
def bitsandbytes_cuda_smoke(require_bnb: bool):
    """Exercise adapters on real bitsandbytes 8-bit and 4-bit layers on CUDA.

    Hook-only variants must keep identity (err < 1e-2) and produce a nonzero
    adapter gradient; weight-reading variants must raise from ``attach``.

    Args:
        require_bnb: When true, missing CUDA or bitsandbytes raises
            ``RuntimeError``; otherwise the check prints SKIP and returns.

    Raises:
        RuntimeError: prerequisites missing while ``require_bnb`` is set.
        AssertionError: a variant violates its expected behaviour.
    """
    mode = "required" if require_bnb else "optional"
    print(f"\n=== {mode} bitsandbytes CUDA smoke (every variant) ===")

    if not torch.cuda.is_available():
        if not require_bnb:
            print(" SKIP: CUDA unavailable; real bnb 4/8-bit forward needs GPU on this machine.")
            return
        raise RuntimeError("CUDA unavailable; required real bnb 4/8-bit smoke cannot run.")
    try:
        import bitsandbytes as bnb
    except ImportError:
        if not require_bnb:
            print(" SKIP: bitsandbytes unavailable.")
            return
        raise RuntimeError("bitsandbytes unavailable; install the bnb-test extra.")

    class BnbModel(nn.Module):
        """Single 8x8 bias-free bnb layer moved to CUDA."""

        def __init__(self, Layer):
            super().__init__()
            self.config = type("Cfg", (), {"hidden_size": 8})()
            self.layers = nn.ModuleList([Layer(8, 8, bias=False)]).cuda()

        def forward(self, x):
            return self.layers[0](x)

    # bnb-compatible: hook-only variants that never read layer.weight in a way
    # that depends on dequant.
    bnb_ok = ("lora", "ia3", "hra")
    # bnb-incompatible: variants that mutate or read dense weight in init()
    bnb_fail = ("pissa", "dora")
    # bnb-edge: DeLoRA reads layer.weight in init() to capture ||W||_2. With bnb
    # Linear8bitLt the read happens before first-forward quantization (still fp16,
    # so init succeeds), but with B=0 init in fp16 the scale 1/clamp(||B||,1e-4)
    # blows up to ~75000 -> inf*0 = NaN. Real bnb usage should dequantize first.
    # Keep delora out of the strict pass/fail check.
    bnb_skip = ("delora",)

    print(f" SHOULD: bnb_ok variants {bnb_ok} -> identity_err==0 grad_nonzero=True")
    print(f" SHOULD: bnb_fail variants {bnb_fail} -> attach() raises (dequant required)")
    print(f" SHOULD: bnb_skip variants {bnb_skip} -> not exercised (fp16+B=0+clamp blows up)")

    expected_errors = (TypeError, RuntimeError, AttributeError, ValueError)
    quantized_layer_types = (bnb.nn.Linear8bitLt, bnb.nn.Linear4bit)

    for layer_cls in quantized_layer_types:
        layer_name = layer_cls.__name__

        # --- variants that must work on quantized layers ---
        for variant in bnb_ok:
            torch.manual_seed(0)
            model = BnbModel(layer_cls)
            x = torch.randn(2, 3, 8, device="cuda")
            y_base = model(x).detach()

            # In fp16 + bnb, peft default lambda0=15 + B=0 + clamp(min=1e-4) gives
            # scale=lambda/(r*1e-4) ~ 75000 > fp16 max -> inf*0 = NaN. Use small
            # lambda0 for the fp16 test.
            cfg_kwargs = {"lambda0": 0.1} if variant == "delora" else {}
            cfg = _CFG_BY_VARIANT[variant](
                r=2, alpha=4, dtype=torch.float16, target_roles=(), **cfg_kwargs
            )
            ll.attach(model, cfg)

            y = model(x)
            err = (y.detach() - y_base).abs().max().item()
            y.pow(2).mean().backward()

            # find any trainable lora_* with a grad
            adapter_grads = []
            for name, param in model.named_parameters():
                if "lora_" in name and param.requires_grad and param.grad is not None:
                    adapter_grads.append(param.grad)
            grad_nonzero = any(g.abs().sum().item() > 0 for g in adapter_grads)

            print(f" {layer_name:14s} {variant:6s}: identity_err={err:.3e} grad_nonzero={grad_nonzero}")
            assert err < 1e-2, f" bnb identity err too large for {variant}"
            assert grad_nonzero, f" no nonzero grad for {variant}"
            ll.detach(model)
            del model

        # --- variants that must refuse quantized layers ---
        for variant in bnb_fail:
            model = BnbModel(layer_cls)
            cfg = _CFG_BY_VARIANT[variant](r=2, alpha=2, dtype=torch.float16, target_roles=())
            raised = None
            try:
                ll.attach(model, cfg)
            except expected_errors as exc:
                raised = exc
            if raised is None:
                raise AssertionError(f" {variant} on {layer_name} should have failed loudly")
            print(f" {layer_name:14s} {variant:6s}: fail-loud OK ({type(raised).__name__})")
            del model
|
||||
|
||||
|
||||
def eva_smoke():
    """EVA needs calibration data: drives forward + per-target SVD on inputs.

    Checks, in order:
      1. every adapted layer exposes a trainable ``lora_A``;
      2. output is unchanged at t=0;
      3. ``lora_A`` is non-zero, i.e. the calibration-driven init ran;
      4. save -> load onto a fresh model works without calibration data;
      5. 20 SGD steps reduce a random-target loss by more than 5% while no
         base parameter receives a gradient.

    Raises:
        AssertionError: if any check fails.
    """
    print("\n=== variant=eva (data-driven init via group_init+calibration_data) ===")
    torch.manual_seed(0)
    model = TinyModel().to(torch.float32)
    ids = torch.randint(0, 100, (2, 16))
    with torch.no_grad():
        y_base = model(ids).clone()

    cfg = ll.EVAConfig(r=4, alpha=8, dtype=torch.float32)
    # 4 calibration batches of random ids
    calib = [torch.randint(0, 100, (2, 16)) for _ in range(4)]
    ll.attach(model, cfg, calibration_data=calib)
    n_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f" trainable params={n_trainable} (lora_A AND lora_B both trainable per peft EVA)")

    # peft EVA keeps A as a trainable Parameter; SVD only changes the INIT.
    eva_layers = [m for m in model.modules() if hasattr(m, "lora_A")]
    # Without this guard the all(...) checks below pass vacuously and the mean
    # norm further down divides by zero.
    assert eva_layers, "EVA attach produced no layers with lora_A"
    assert all(layer.lora_A.requires_grad for layer in eva_layers), \
        "EVA lora_A must be a trainable Parameter (peft semantics)"
    print(" SHOULD: lora_A.requires_grad==True on every EVA layer. PASS.")

    with torch.no_grad():
        y_adapt = model(ids)
    err = (y_adapt - y_base).abs().max().item()
    print(f" t=0 identity: max|y_adapt - y_base| = {err:.3e}")
    assert err < 1e-6, f"EVA should be exact identity (B=0); got {err}"
    print(" SHOULD: err==0 (B=0 init). PASS.")

    # lora_A must be non-zero after attach, otherwise the data-driven init
    # never ran. (It is a trainable Parameter, see the check above.)
    a_norms = [layer.lora_A.norm().item() for layer in eva_layers]
    assert all(n > 0 for n in a_norms), "EVA lora_A buffers all zero -> group_init never ran"
    print(f" SHOULD: lora_A buffers populated. PASS (mean ||A||={sum(a_norms)/len(a_norms):.3f}).")

    # save/load round-trip WITHOUT calibration data on load (load path uses _skip_group_init)
    ARTIFACT_DIR.mkdir(exist_ok=True)
    p = ARTIFACT_DIR / "eva_smoke_adapter.pt"
    ll.save(model, str(p))
    ll.detach(model)
    torch.manual_seed(0)
    model2 = TinyModel().to(torch.float32)
    ll.load(model2, str(p))  # must NOT require calibration_data
    with torch.no_grad():
        y_loaded = model2(ids)
    err2 = (y_loaded - y_adapt).abs().max().item()
    print(f" save/load (no calibration on load): max err = {err2:.3e}")
    assert err2 < 1e-6, f"EVA save/load mismatch {err2}"
    print(" SHOULD: load without calibration_data works (uses _skip_group_init). PASS.")
    ll.detach(model2)

    # re-attach model for training section below
    ll.attach(model, cfg, calibration_data=calib)

    # gradient flow: every parameter with requires_grad is optimised (for EVA
    # that includes lora_A as well as lora_B, per the check above).
    target = torch.randn(2, 16, 100, dtype=torch.float32) * 0.1
    trainable = [p for p in model.parameters() if p.requires_grad]
    opt = torch.optim.SGD(trainable, lr=1e-2)
    losses = []
    for _ in range(20):
        opt.zero_grad()
        loss = (model(ids) - target).pow(2).mean()
        loss.backward()
        assert_no_base_grads(model)
        opt.step()
        losses.append(loss.item())
    drop = (losses[0] - losses[-1]) / max(losses[0], 1e-12)
    print(f" loss[0]={losses[0]:.4f} loss[-1]={losses[-1]:.4f} drop={100*drop:.1f}%")
    assert drop > 0.05, f" FAIL: loss drop only {drop:.2%}, expected >5%"
    print(" SHOULD: drop>5%. PASS.")
    ll.detach(model)
|
||||
|
||||
|
||||
def dora_bias_smoke():
    """V3 review caught: DoRA was scaling bias by m/||V||. Fixed; bias passes through.

    Attaches DoRA to a single ``nn.Linear`` that has a bias and asserts the
    output at init stays within 1e-5 of the un-adapted layer.
    """
    print("\n=== dora bias passthrough (V3 fix) ===")
    torch.manual_seed(0)
    d = 16
    biased_layer = nn.Linear(d, d, bias=True).to(torch.float32)
    x = torch.randn(2, d)
    y_base = biased_layer(x).detach()

    class Wrap(nn.Module):
        """Gives the bare layer the ``config``/``layers`` shape the other models use."""

        def __init__(self, lin):
            super().__init__()
            self.config = type("Cfg", (), {"hidden_size": d})()
            self.layers = nn.ModuleList([lin])

        def forward(self, x):
            return self.layers[0](x)

    model = Wrap(biased_layer)
    dora_cfg = ll.DoRAConfig(r=2, alpha=4, dtype=torch.float32, target_roles=())
    ll.attach(model, dora_cfg)

    with torch.no_grad():
        y_adapt = model(x)
    max_err = (y_adapt - y_base).abs().max().item()
    print(f" identity with bias=True: max err = {max_err:.3e}")
    assert max_err < 1e-5, f"DoRA bias-passthrough broken: err {max_err} (likely bias being scaled)"
    print(" SHOULD: identity err < 1e-5 even with bias. PASS.")
    ll.detach(model)
|
||||
|
||||
|
||||
def hra_forward_order_smoke():
    """Distinguishing check that HRA forward applies x @ R^T, not x @ R.

    Build R = H_0 H_1 ... H_{r-1} explicitly from U, and compare the adapted
    output to F.linear(x, W @ R). If our pre-hook iterated forward (x @ R, the
    bug), this would match only at identity init (paired rows give R^T = R).
    """
    print("\n=== hra forward-order vs F.linear(x, W @ R) ===")
    torch.manual_seed(0)
    d = 8
    layer = nn.Linear(d, d, bias=False)
    x = torch.randn(2, 3, d)

    class Wrap(nn.Module):
        """Gives the bare layer the ``config``/``layers`` shape the other models use."""

        def __init__(self, lin):
            super().__init__()
            self.config = type("Cfg", (), {"hidden_size": d})()
            self.layers = nn.ModuleList([lin])

        def forward(self, x):
            return self.layers[0](x)

    hra_cfg = ll.HRAConfig(r=4, alpha=4, dtype=torch.float32, target_roles=())
    model = Wrap(layer)
    ll.attach(model, hra_cfg)

    # break paired symmetry so order matters
    with torch.no_grad():
        noise = torch.randn_like(layer.lora_U)
        layer.lora_U.add_(0.1 * noise)

    # build R = H_0 H_1 ... H_{r-1}, right-multiplying one Householder
    # reflection H_i = I - 2 u u^T / (u.u) per row of U
    reflectors = layer.lora_U
    R = torch.eye(d)
    for row in range(reflectors.shape[0]):
        u = reflectors[row]
        sq = (u * u).sum().clamp_min(1e-12)
        R = R - (2.0 / sq) * torch.outer(R @ u, u)

    with torch.no_grad():
        y_adapt = model(x)
        y_ref = torch.nn.functional.linear(x, layer.weight @ R)
    max_err = (y_adapt - y_ref).abs().max().item()
    print(f" ||y_adapt - F.linear(x, W @ R)||_inf = {max_err:.3e}")
    assert max_err < 1e-5, (
        "HRA forward order regression: should apply x @ R^T (loop reversed). "
        "If you reverse the loop in forward_input you'll get x @ R instead, "
        "and this check will fail with paired-symmetry-broken U."
    )
    print(" SHOULD: err < 1e-5 (proves loop applies x @ R^T not x @ R). PASS.")
    ll.detach(model)
|
||||
|
||||
|
||||
def main():
    """Run every in-process smoke check, then the bitsandbytes CUDA smoke.

    ``--require-bnb`` is forwarded to ``bitsandbytes_cuda_smoke``. Any failing
    check raises, so the final line is printed only when all of them passed.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--require-bnb", action="store_true")
    require_bnb = cli.parse_args().require_bnb

    generic_variants = ("lora", "pissa", "delora", "ia3", "dora", "hra", "antipasto")
    for name in generic_variants:
        variant_test(name, dtype=torch.float32)

    # Checks that need their own setup rather than the generic variant_test.
    dedicated_checks = (
        eva_smoke,
        dora_bias_smoke,
        hra_forward_order_smoke,
        structural_linear_like_test,
    )
    for check in dedicated_checks:
        check()

    bitsandbytes_cuda_smoke(require_bnb)
    print("\nALL PASS.")
|
||||
def main() -> int:
    """Run the benchmark smoke for every entry of ``VARIANTS``.

    All variants are run even when an earlier one fails.

    Returns:
        0 when every run exited with code 0, otherwise 1.
    """
    failed = []
    for variant in VARIANTS:
        exit_code = run_one(variant)
        if exit_code != 0:
            failed.append(variant)

    if not failed:
        print("ALL PASS.")
        return 0
    print(f"FAIL: {failed}")
    return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
sys.exit(main())
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
"""bnb 4bit/8bit CUDA smoke. Skipped without CUDA + bitsandbytes installed."""
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
from torch import nn
|
||||
|
||||
import lora_lite as ll
|
||||
|
||||
|
||||
pytestmark = pytest.mark.skipif(not torch.cuda.is_available(), reason="needs CUDA")
|
||||
bnb = pytest.importorskip("bitsandbytes")
|
||||
|
||||
|
||||
CFG_BY_VARIANT = {
|
||||
"lora": ll.LoRAConfig,
|
||||
"ia3": ll.IA3Config,
|
||||
"hra": ll.HRAConfig,
|
||||
"pissa": ll.PiSSAConfig,
|
||||
"dora": ll.DoRAConfig,
|
||||
}
|
||||
|
||||
|
||||
class BnbModel(nn.Module):
    """One-layer model holding a single 8x8 bias-free layer placed on CUDA.

    Args:
        layer_cls: Layer constructor called as ``layer_cls(8, 8, bias=False)``.
    """

    def __init__(self, layer_cls):
        super().__init__()
        # Minimal stand-in for a HF config object.
        self.config = type("Cfg", (), {"hidden_size": 8})()
        quantized_layer = layer_cls(8, 8, bias=False)
        self.layers = nn.ModuleList([quantized_layer]).cuda()

    def forward(self, x):
        """Apply the single wrapped layer to ``x``."""
        only_layer = self.layers[0]
        return only_layer(x)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("layer_cls", [bnb.nn.Linear8bitLt, bnb.nn.Linear4bit])
@pytest.mark.parametrize("variant", ["lora", "ia3", "hra"])
def test_hook_only_variants_attach_to_bnb(layer_cls, variant):
    """LoRA / IA3 / HRA attach through hooks only; bnb dequantization is the layer's job.

    The hooks are not all output-side (HRA is applied on the layer input).
    What the three share is that attaching succeeds on a quantized layer.
    The test checks near-identity at init (fp16 tolerance 1e-2) and that some
    trainable ``lora_*`` parameter receives a nonzero gradient.
    """
    torch.manual_seed(0)
    model = BnbModel(layer_cls)
    x = torch.randn(2, 3, 8, device="cuda")
    y_base = model(x).detach()

    cfg = CFG_BY_VARIANT[variant](r=2, alpha=4, dtype=torch.float16, target_roles=())
    ll.attach(model, cfg)
    y = model(x)
    identity_err = (y.detach() - y_base).abs().max().item()
    assert identity_err < 1e-2, (
        f"{variant} on {layer_cls.__name__}: identity err {identity_err:.3e} >= 1e-2"
    )

    y.pow(2).mean().backward()
    grad_total = sum(
        p.grad.abs().sum().item()
        for n, p in model.named_parameters()
        if "lora_" in n and p.requires_grad and p.grad is not None
    )
    assert grad_total > 0, (
        f"{variant} on {layer_cls.__name__}: no nonzero gradient on any lora_* parameter"
    )
|
||||
|
||||
|
||||
@pytest.mark.parametrize("layer_cls", [bnb.nn.Linear8bitLt, bnb.nn.Linear4bit])
@pytest.mark.parametrize("variant", ["pissa", "dora"])
def test_weight_reading_variants_reject_bnb(layer_cls, variant):
    """Attaching PiSSA or DoRA to a bnb layer must raise instead of succeeding."""
    expected_errors = (TypeError, RuntimeError, AttributeError, ValueError)
    quantized_model = BnbModel(layer_cls)
    make_cfg = CFG_BY_VARIANT[variant]
    adapter_cfg = make_cfg(r=2, alpha=2, dtype=torch.float16, target_roles=())
    with pytest.raises(expected_errors):
        ll.attach(quantized_model, adapter_cfg)
|
||||
+166
-160
@@ -1,6 +1,11 @@
|
||||
"""Per-variant attach + train + save + load round-trip, plus surgical regressions.
|
||||
|
||||
The big invariant is the parametrized train_save_load test: identity at t=0,
|
||||
gradient flow on a real loss, then save -> reload onto a fresh model and
|
||||
confirm the trained outputs survive the round-trip. Cheap on CPU.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
@@ -10,7 +15,31 @@ from torch import nn
|
||||
import lora_lite as ll
|
||||
|
||||
|
||||
ARTIFACT_DIR = Path(__file__).parent / "_artifacts"
|
||||
CFG_BY_VARIANT = {
|
||||
"lora": ll.LoRAConfig,
|
||||
"pissa": ll.PiSSAConfig,
|
||||
"delora": ll.DeLoRAConfig,
|
||||
"ia3": ll.IA3Config,
|
||||
"ia3_ff": ll.IA3FFConfig,
|
||||
"dora": ll.DoRAConfig,
|
||||
"hra": ll.HRAConfig,
|
||||
"eva": ll.EVAConfig,
|
||||
"antipasto": ll.AntiPaSTOConfig,
|
||||
}
|
||||
|
||||
# Per-variant identity tolerance at t=0 (after attach, before any step).
|
||||
# fp32 SVD round-trip + per-row norm = looser tolerance for pissa/dora/antipasto.
|
||||
IDENTITY_TOL = {
|
||||
"lora": 1e-6,
|
||||
"pissa": 5e-4,
|
||||
"delora": 1e-6,
|
||||
"ia3": 1e-6,
|
||||
"ia3_ff": 1e-6,
|
||||
"dora": 5e-5,
|
||||
"hra": 5e-6,
|
||||
"eva": 1e-6,
|
||||
"antipasto": 5e-4,
|
||||
}
|
||||
|
||||
|
||||
class TinyBlock(nn.Module):
|
||||
@@ -46,12 +75,14 @@ class TinyModel(nn.Module):
|
||||
|
||||
|
||||
class FakeLinearLike(nn.Module):
|
||||
"""linear-like, but not nn.Linear: stand-in for bnb 4/8-bit modules."""
|
||||
|
||||
def __init__(self, d_in: int = 8, d_out: int = 8):
|
||||
super().__init__()
|
||||
self.in_features = d_in
|
||||
self.out_features = d_out
|
||||
self.weight = nn.Parameter(torch.empty(d_out, d_in))
|
||||
nn.init.kaiming_uniform_(self.weight, a=5**0.5)
|
||||
nn.init.kaiming_uniform_(self.weight, a=5 ** 0.5)
|
||||
|
||||
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
||||
return torch.nn.functional.linear(x, self.weight)
|
||||
@@ -67,24 +98,9 @@ class FakeBnbModel(nn.Module):
|
||||
return self.layers[0](x)
|
||||
|
||||
|
||||
_CFG_BY_VARIANT = {
|
||||
"lora": ll.LoRAConfig,
|
||||
"pissa": ll.PiSSAConfig,
|
||||
"delora": ll.DeLoRAConfig,
|
||||
"ia3": ll.IA3Config,
|
||||
"ia3_ff": ll.IA3FFConfig,
|
||||
"dora": ll.DoRAConfig,
|
||||
"hra": ll.HRAConfig,
|
||||
"eva": ll.EVAConfig,
|
||||
"antipasto": ll.AntiPaSTOConfig,
|
||||
}
|
||||
|
||||
|
||||
def cfg_for_variant(variant: str, *, training: bool = False) -> ll.AdapterConfig:
|
||||
# DeLoRA keeps identity via B=0, so nonzero lambda is needed for the
|
||||
# perturb-output check to distinguish a live adapter from dead code.
|
||||
def cfg_for(variant: str) -> ll.AdapterConfig:
|
||||
extra = {"lambda0": 0.1} if variant == "delora" else {}
|
||||
return _CFG_BY_VARIANT[variant](
|
||||
return CFG_BY_VARIANT[variant](
|
||||
r=4,
|
||||
alpha=4 if variant == "pissa" else 8,
|
||||
dtype=torch.float32,
|
||||
@@ -92,182 +108,172 @@ def cfg_for_variant(variant: str, *, training: bool = False) -> ll.AdapterConfig
|
||||
)
|
||||
|
||||
|
||||
def adapter_state(model: nn.Module) -> dict[str, torch.Tensor]:
    """Snapshot every ``state_dict`` entry whose key contains ``lora_``.

    Returns:
        Mapping from state-dict key to a detached clone, so later in-place
        updates to the model do not change the snapshot.
    """
    snapshot = {}
    for key, tensor in model.state_dict().items():
        if "lora_" not in key:
            continue
        snapshot[key] = tensor.detach().clone()
    return snapshot
|
||||
|
||||
|
||||
def assert_only_lora_trainable(model: nn.Module) -> None:
    """Assert that at least one parameter trains and all of them are ``lora_*``.

    Args:
        model: Module whose named parameters are inspected.

    Raises:
        AssertionError: if nothing is trainable, or if a parameter whose name
            lacks ``lora_`` has ``requires_grad`` set. The message names the
            offending parameters.
    """
    trainable_names = [name for name, p in model.named_parameters() if p.requires_grad]
    assert trainable_names, "no trainable parameters found"
    # Collect the offenders so a failure says which parameters leaked instead
    # of a bare "assert False".
    stray = [name for name in trainable_names if "lora_" not in name]
    assert not stray, f"non-adapter params are trainable: {stray}"
|
||||
|
||||
|
||||
def assert_no_base_grads(model: nn.Module) -> None:
    """Assert that no parameter whose name lacks ``lora_`` holds a gradient.

    Args:
        model: Module whose named parameters are inspected.

    Raises:
        AssertionError: listing the base parameters that received gradients.
    """
    leaked = [name for name, p in model.named_parameters() if "lora_" not in name and p.grad is not None]
    # Name the leaked parameters in the failure message.
    assert leaked == [], f"base params received grads: {leaked}"
|
||||
|
||||
|
||||
def perturb_first_adapter(model: nn.Module) -> None:
|
||||
"""Nudge one trainable adapter parameter so forward output changes.
|
||||
|
||||
Priority order matters: with B=0 init (DeLoRA, EVA, LoRA), perturbing a
|
||||
scalar gate or lambda alone keeps delta=0, so we hit a matrix entry first.
|
||||
"""
|
||||
priority = ("lora_B", "lora_g", "lora_U", "lora_A", "lora_lambda", "lora_gate")
|
||||
for key in priority:
|
||||
for name, p in model.named_parameters():
|
||||
if not p.requires_grad or key not in name:
|
||||
continue
|
||||
with torch.no_grad():
|
||||
if p.ndim == 0:
|
||||
p.add_(0.25)
|
||||
def attach_with_calib(model: nn.Module, cfg: ll.AdapterConfig, ids: torch.Tensor) -> None:
|
||||
if cfg.variant == "eva":
|
||||
calib = [ids for _ in range(2)]
|
||||
ll.attach(model, cfg, calibration_data=calib)
|
||||
else:
|
||||
p.flatten()[0].add_(0.25)
|
||||
return
|
||||
raise AssertionError("no perturbable adapter parameter found")
|
||||
ll.attach(model, cfg)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("variant", ["lora", "pissa", "delora", "ia3", "dora", "hra"])
|
||||
def test_variant_identity_hook_save_load_and_training(variant: str):
|
||||
ARTIFACT_DIR.mkdir(exist_ok=True)
|
||||
def trainable_grad_norm(model: nn.Module) -> float:
|
||||
return sum(
|
||||
p.grad.detach().float().norm().item()
|
||||
for n, p in model.named_parameters()
|
||||
if "lora_" in n and p.grad is not None
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("variant", list(CFG_BY_VARIANT))
|
||||
def test_train_save_load(variant: str, tmp_path: Path):
|
||||
"""Identity at t=0, one SGD step, save, reload onto fresh model, outputs match."""
|
||||
torch.manual_seed(0)
|
||||
model = TinyModel()
|
||||
ids = torch.randint(0, 100, (2, 16))
|
||||
|
||||
with torch.no_grad():
|
||||
y_base = model(ids).clone()
|
||||
|
||||
cfg = cfg_for_variant(variant)
|
||||
handles = ll.attach(model, cfg)
|
||||
assert len(handles) == 28
|
||||
assert_only_lora_trainable(model)
|
||||
cfg = cfg_for(variant)
|
||||
attach_with_calib(model, cfg, ids)
|
||||
|
||||
trainable = [p for p in model.parameters() if p.requires_grad]
|
||||
assert trainable
|
||||
assert all("lora_" in n for n, p in model.named_parameters() if p.requires_grad)
|
||||
|
||||
with torch.no_grad():
|
||||
y_init = model(ids).clone()
|
||||
identity_err = (y_init - y_base).abs().max().item()
|
||||
identity_tol = {"lora": 1e-6, "pissa": 5e-4, "delora": 1e-6, "ia3": 1e-6, "dora": 5e-5, "hra": 5e-6}[variant]
|
||||
assert identity_err < identity_tol
|
||||
assert (y_init - y_base).abs().max().item() < IDENTITY_TOL[variant]
|
||||
|
||||
target = torch.randn_like(y_init) * 0.1
|
||||
opt = torch.optim.SGD(trainable, lr=1e-2)
|
||||
opt.zero_grad()
|
||||
loss = (model(ids) - target).pow(2).mean()
|
||||
loss.backward()
|
||||
leaked = [n for n, p in model.named_parameters() if "lora_" not in n and p.grad is not None]
|
||||
assert leaked == []
|
||||
assert trainable_grad_norm(model) > 0
|
||||
opt.step()
|
||||
|
||||
before_perturb = adapter_state(model)
|
||||
perturb_first_adapter(model)
|
||||
with torch.no_grad():
|
||||
perturb_delta = (model(ids) - y_init).abs().max().item()
|
||||
assert perturb_delta > 1e-7
|
||||
for name, value in before_perturb.items():
|
||||
model.state_dict()[name].copy_(value)
|
||||
y_trained = model(ids).clone()
|
||||
|
||||
path = ARTIFACT_DIR / f"{variant}_adapter.pt"
|
||||
path = tmp_path / "adapter.pt"
|
||||
ll.save(model, str(path))
|
||||
saved = torch.load(path, weights_only=True, map_location="cpu")
|
||||
assert set(saved["state"]) == set(adapter_state(model))
|
||||
assert any(k.startswith("layers.0.q_proj.lora_") for k in saved["state"])
|
||||
|
||||
torch.manual_seed(0)
|
||||
model_loaded = TinyModel()
|
||||
ll.load(model_loaded, str(path))
|
||||
loaded_state = adapter_state(model_loaded)
|
||||
for name, value in saved["state"].items():
|
||||
assert torch.equal(loaded_state[name].cpu(), value)
|
||||
ll.load(model_loaded, str(path)) # EVA load skips group_init; calibration_data not needed
|
||||
with torch.no_grad():
|
||||
y_loaded = model_loaded(ids)
|
||||
assert (y_loaded - y_init).abs().max().item() < identity_tol
|
||||
|
||||
torch.manual_seed(0)
|
||||
train_model = TinyModel()
|
||||
ll.attach(train_model, cfg_for_variant(variant, training=True))
|
||||
assert_only_lora_trainable(train_model)
|
||||
target = torch.randn(2, 16, 100) * 0.1
|
||||
trainable = [p for p in train_model.parameters() if p.requires_grad]
|
||||
opt = torch.optim.Adam(trainable, lr=0.1) if variant in ("delora", "ia3", "hra") else (
|
||||
torch.optim.Adam(trainable, lr=1e-3) if variant == "dora" else torch.optim.SGD(trainable, lr=1e-2)
|
||||
)
|
||||
losses = []
|
||||
first_grad_norm = math.nan
|
||||
before_train = adapter_state(train_model)
|
||||
for step in range(20):
|
||||
opt.zero_grad()
|
||||
loss = (train_model(ids) - target).pow(2).mean()
|
||||
loss.backward()
|
||||
assert_no_base_grads(train_model)
|
||||
grad_norm = sum(
|
||||
p.grad.detach().float().norm().item()
|
||||
for name, p in train_model.named_parameters()
|
||||
if "lora_" in name and p.grad is not None
|
||||
)
|
||||
assert math.isfinite(grad_norm)
|
||||
if step == 0:
|
||||
first_grad_norm = grad_norm
|
||||
opt.step()
|
||||
losses.append(loss.item())
|
||||
after_train = adapter_state(train_model)
|
||||
adapter_delta = sum((after_train[k] - before_train[k]).float().norm().item() for k in before_train)
|
||||
drop = (losses[0] - losses[-1]) / losses[0]
|
||||
assert first_grad_norm > 0
|
||||
assert adapter_delta > 0
|
||||
assert drop > 0.05
|
||||
assert (y_loaded - y_trained).abs().max().item() < max(IDENTITY_TOL[variant], 1e-5)
|
||||
|
||||
|
||||
def test_load_fails_on_missing_and_unexpected_lora_keys():
|
||||
ARTIFACT_DIR.mkdir(exist_ok=True)
|
||||
@pytest.mark.parametrize("variant", ["lora", "delora", "ia3", "hra"])
|
||||
def test_hook_only_variants_attach_to_non_linear_target(variant: str):
|
||||
"""bnb-style targets are linear-like but not nn.Linear; hook-only variants must accept them."""
|
||||
extra = {"lambda0": 0.1} if variant == "delora" else {}
|
||||
cfg = CFG_BY_VARIANT[variant](r=2, alpha=4, dtype=torch.float32, target_roles=(), **extra)
|
||||
model = FakeBnbModel()
|
||||
ll.attach(model, cfg)
|
||||
x = torch.randn(2, 3, 8)
|
||||
model(x).pow(2).mean().backward()
|
||||
assert trainable_grad_norm(model) > 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("variant", ["pissa", "dora", "antipasto"])
|
||||
def test_weight_reading_variants_reject_non_linear(variant: str):
|
||||
r = 4 if variant == "antipasto" else 2 # antipasto needs r % block_size==0
|
||||
cfg = CFG_BY_VARIANT[variant](r=r, alpha=r, dtype=torch.float32, target_roles=())
|
||||
with pytest.raises(TypeError, match="plain nn.Linear"):
|
||||
ll.attach(FakeBnbModel(), cfg)
|
||||
|
||||
|
||||
def test_save_load_strict_keys(tmp_path: Path):
|
||||
torch.manual_seed(0)
|
||||
model = TinyModel()
|
||||
ll.attach(model, cfg_for_variant("lora"))
|
||||
good_path = ARTIFACT_DIR / "lora_good.pt"
|
||||
ll.save(model, str(good_path))
|
||||
blob = torch.load(good_path, weights_only=True, map_location="cpu")
|
||||
ll.attach(model, ll.LoRAConfig(r=4, alpha=8, dtype=torch.float32))
|
||||
p = tmp_path / "lora.pt"
|
||||
ll.save(model, str(p))
|
||||
blob = torch.load(p, weights_only=True, map_location="cpu")
|
||||
|
||||
missing_blob = {"cfg": blob["cfg"], "state": dict(blob["state"])}
|
||||
missing_blob["state"].pop(next(iter(missing_blob["state"])))
|
||||
missing_path = ARTIFACT_DIR / "lora_missing.pt"
|
||||
torch.save(missing_blob, missing_path)
|
||||
missing = {"cfg": blob["cfg"], "state": dict(blob["state"]), "base_fp": blob.get("base_fp", {})}
|
||||
missing["state"].pop(next(iter(missing["state"])))
|
||||
torch.save(missing, p)
|
||||
with pytest.raises(RuntimeError, match="missing lora keys"):
|
||||
ll.load(TinyModel(), str(missing_path))
|
||||
ll.load(TinyModel(), str(p))
|
||||
|
||||
unexpected_blob = {"cfg": blob["cfg"], "state": dict(blob["state"])}
|
||||
unexpected_blob["state"]["layers.0.q_proj.lora_extra"] = torch.zeros(1)
|
||||
unexpected_path = ARTIFACT_DIR / "lora_unexpected.pt"
|
||||
torch.save(unexpected_blob, unexpected_path)
|
||||
bad = {"cfg": blob["cfg"], "state": dict(blob["state"]), "base_fp": blob.get("base_fp", {})}
|
||||
bad["state"]["layers.0.q_proj.lora_extra"] = torch.zeros(1)
|
||||
torch.save(bad, p)
|
||||
with pytest.raises(RuntimeError, match="unexpected lora keys"):
|
||||
ll.load(TinyModel(), str(unexpected_path))
|
||||
ll.load(TinyModel(), str(p))
|
||||
|
||||
|
||||
def test_no_target_layers_is_loud_failure():
|
||||
def test_no_target_layers_is_loud():
|
||||
cfg = ll.LoRAConfig(target_names=("definitely_missing",))
|
||||
with pytest.raises(RuntimeError, match="no target layers"):
|
||||
ll.attach(TinyModel(), cfg)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("variant", ["lora", "delora", "ia3", "hra"])
|
||||
def test_structural_non_linear_target_trains_for_forward_only_variants(variant: str):
|
||||
def test_eva_requires_calibration():
|
||||
"""EVA's group_init must error loudly if calibration_data is missing."""
|
||||
with pytest.raises(ValueError, match="calibration_data"):
|
||||
ll.attach(TinyModel(), ll.EVAConfig(r=4, alpha=8, dtype=torch.float32))
|
||||
|
||||
|
||||
def test_dora_bias_passthrough():
|
||||
"""Regression: DoRA must NOT scale bias; identity holds with bias=True at t=0."""
|
||||
torch.manual_seed(0)
|
||||
model = FakeBnbModel()
|
||||
x = torch.randn(2, 3, 8)
|
||||
y_base = model(x).detach()
|
||||
extra = {"lambda0": 0.1} if variant == "delora" else {}
|
||||
cfg = _CFG_BY_VARIANT[variant](
|
||||
r=2,
|
||||
alpha=4,
|
||||
dtype=torch.float32,
|
||||
target_roles=(),
|
||||
**extra,
|
||||
)
|
||||
ll.attach(model, cfg)
|
||||
y_init = model(x)
|
||||
# delora: lambda0=0.1 is small but B=0 still makes delta=0 at t=0, so identity holds.
|
||||
assert (y_init.detach() - y_base).abs().max().item() < 1e-6
|
||||
loss = y_init.pow(2).mean()
|
||||
loss.backward()
|
||||
assert_no_base_grads(model)
|
||||
adapter_grad_norm = sum(
|
||||
p.grad.detach().float().norm().item()
|
||||
for name, p in model.named_parameters()
|
||||
if "lora_" in name and p.grad is not None
|
||||
)
|
||||
assert adapter_grad_norm > 0
|
||||
d = 16
|
||||
layer = nn.Linear(d, d, bias=True)
|
||||
x = torch.randn(2, d)
|
||||
y_base = layer(x).detach()
|
||||
|
||||
class Wrap(nn.Module):
|
||||
def __init__(self, lin):
|
||||
super().__init__()
|
||||
self.config = type("Cfg", (), {"hidden_size": d})()
|
||||
self.layers = nn.ModuleList([lin])
|
||||
|
||||
def forward(self, x):
|
||||
return self.layers[0](x)
|
||||
|
||||
model = Wrap(layer)
|
||||
ll.attach(model, ll.DoRAConfig(r=2, alpha=4, dtype=torch.float32, target_roles=()))
|
||||
with torch.no_grad():
|
||||
y = model(x)
|
||||
assert (y - y_base).abs().max().item() < 1e-5
|
||||
|
||||
|
||||
@pytest.mark.parametrize("variant", ["pissa", "dora"])
|
||||
def test_weight_reading_variants_reject_structural_non_linear_target(variant: str):
|
||||
cfg = _CFG_BY_VARIANT[variant](r=2, alpha=2, dtype=torch.float32, target_roles=())
|
||||
with pytest.raises(TypeError, match="plain nn.Linear"):
|
||||
ll.attach(FakeBnbModel(), cfg)
|
||||
def test_hra_forward_is_x_R_T():
|
||||
"""HRA must apply x @ R^T (loop i = r-1 down to 0). Asymmetric U makes order observable."""
|
||||
torch.manual_seed(0)
|
||||
d = 8
|
||||
layer = nn.Linear(d, d, bias=False)
|
||||
x = torch.randn(2, 3, d)
|
||||
|
||||
class Wrap(nn.Module):
|
||||
def __init__(self, lin):
|
||||
super().__init__()
|
||||
self.config = type("Cfg", (), {"hidden_size": d})()
|
||||
self.layers = nn.ModuleList([lin])
|
||||
|
||||
def forward(self, x):
|
||||
return self.layers[0](x)
|
||||
|
||||
model = Wrap(layer)
|
||||
ll.attach(model, ll.HRAConfig(r=4, alpha=4, dtype=torch.float32, target_roles=()))
|
||||
# break paired symmetry so order matters
|
||||
with torch.no_grad():
|
||||
layer.lora_U.add_(0.1 * torch.randn_like(layer.lora_U))
|
||||
|
||||
U = layer.lora_U
|
||||
R = torch.eye(d)
|
||||
for i in range(U.shape[0]):
|
||||
u = U[i]
|
||||
sq = (u * u).sum().clamp_min(1e-12)
|
||||
R = R - (2.0 / sq) * torch.outer(R @ u, u)
|
||||
with torch.no_grad():
|
||||
y_adapt = model(x)
|
||||
y_ref = torch.nn.functional.linear(x, layer.weight @ R)
|
||||
assert (y_adapt - y_ref).abs().max().item() < 1e-5
|
||||
|
||||
Reference in New Issue
Block a user