Files
2025-08-23 08:18:24 +08:00

392 lines
14 KiB
Python

import math
import re
import statistics
from typing import Dict, List, Optional, Tuple

import numpy as np
import scipy.stats
from loguru import logger
from scipy.stats import kendalltau

from judgemark_v2lp.config.constants import REFERENCE_MODEL_SCORES
from judgemark_v2lp.utils.stats import normalize
def detokenize(tokens: List[str]) -> str:
    """
    Roughly reassemble text from tokenizer pieces.

    Marker handling, applied per token in order:
      - every 'Ċ' inside a token is turned into a newline character
      - a leading '##' (BERT WordPiece) is dropped and the rest is glued
        directly onto the text built so far
      - a leading 'Ġ' (GPT-2) or '▁' (SentencePiece) is dropped; a single
        space is inserted first unless the text so far is empty or already
        ends with a space
      - any other token is appended unchanged, with no space inserted

    Leading whitespace of the final string is stripped before returning.
    """
    pieces = []
    # Last character of everything emitted so far ("" while nothing emitted).
    last_char = ""
    for raw in tokens:
        tok = raw.replace("Ċ", "\n")
        if tok.startswith("##"):
            chunk = tok[2:]
        elif tok.startswith(("Ġ", "▁")):
            # Only separate with a space when there is preceding text that
            # does not already end in one.
            separator = " " if last_char not in ("", " ") else ""
            chunk = separator + tok[1:]
        else:
            chunk = tok
        if chunk:
            pieces.append(chunk)
            last_char = chunk[-1]
    return "".join(pieces).lstrip()
def parse_scores(judge_model_response: str, logprobs: list) -> Tuple[Dict[str, float], Dict[str, List[float]]]:
    """
    Extract named numeric scores, and the logprobs of the score token, from
    a judge response's token stream.

    The text is scanned for statements of the form
        <metric name>: <score>
    e.g. "Realism Score: 7.5" or "Melodramatic: 2". The metric name is any
    run of characters without a newline up to the colon; the score is a
    positive or negative integer or decimal.

    Parsing is driven by `logprobs`, not by `judge_model_response` (which is
    accepted for interface compatibility but not read here). Every token
    whose text is exactly one of "0".."10" is a candidate score token: the
    preceding 20 tokens plus the candidate are detokenized and the last
    regex match in that window is considered. The match is accepted only if
    it ends at the candidate token itself; otherwise the digit is not the
    score (e.g. the "10" of "7/10", or a number in later prose) and its
    logprobs must not be attributed to the metric.

    Args:
        judge_model_response: Full judge output text (unused).
        logprobs: Sequence of per-token dicts with a 'token' key and a
            'top_logprobs' list of {'token', 'logprob'} dicts. A missing or
            None 'top_logprobs' is treated as empty.

    Returns:
        (scores, logps): `scores` maps metric name -> parsed float;
        `logps` maps metric name -> list of 11 logprobs, one per choice
        "0".."10", with -100 for choices absent from 'top_logprobs'.
        Later occurrences of the same metric name overwrite earlier ones.
    """
    pattern = re.compile(r"(.*?):\s*(?:Score\s+)?(-?\d+(?:\.\d+)?)")
    choices = [str(i) for i in range(11)]
    choice_set = set(choices)
    window_size = 20
    scores: Dict[str, float] = {}
    logps: Dict[str, List[float]] = {}

    for ti, row in enumerate(logprobs):
        if row['token'] not in choice_set:
            continue
        # Window of previous tokens, including the candidate itself.
        window = [t['token'] for t in logprobs[max(0, ti - window_size):ti + 1]]
        prev_text = detokenize(window)

        last_match = None
        for last_match in pattern.finditer(prev_text):
            pass
        # The candidate token is the last thing in prev_text, so a match that
        # stops earlier belongs to some other number in the window.
        if last_match is None or last_match.end() != len(prev_text):
            continue

        metric_name = last_match.group(1).strip()
        scores[metric_name] = float(last_match.group(2))

        top = row.get('top_logprobs') or []
        logp_dict = {t['token']: t['logprob'] for t in top}
        # Logprob of each choice at the score position; -100 when not listed.
        logps[metric_name] = [logp_dict.get(c, -100) for c in choices]
    return scores, logps
def compute_raw_score(scores: Dict[str, float]) -> Optional[float]:
    """
    Collapse a dict of {criteria: numeric score} into a single raw score.

    Only scores within [0, 10] are considered. Criteria whose name contains
    one of the negative markers below (case-insensitive substring match) are
    inverted as `10 - score` so that higher is always better; the result is
    the plain average of the adjusted scores, rounded to 2 decimals.

    Returns:
        The averaged score, or None when fewer than 10 scores fall inside
        [0, 10] (too few criteria were parsed to produce a score).
    """
    valid_scores = {k: v for k, v in scores.items() if 0 <= v <= 10}
    if len(valid_scores) < 10:
        return None

    negative_markers = (
        "melodramatic", "shallow resolution", "unearned resolution",
        "simplistic moralizing", "shallow optimism", "forced optimism",
        "trite", "overwrought", "amateurish", "contrived", "uninspiring",
        "characters are too good", "incongruent ending positivity",
        "unearned transformations", "profundity over-reach",
        "amateurish descriptives", "clunky asides", "stilted dialogue",
        "tit-for-tat dialogue", "purple prose", "uncreative", "tell-don't-show",
        "weak dialogue", "meandering",
    )

    sum_val = 0.0
    for criteria, val in valid_scores.items():
        crit_lower = criteria.lower().strip()
        if any(neg in crit_lower for neg in negative_markers):
            # Negative-themed criterion: a low score is good, so flip it.
            sum_val += 10 - val
        else:
            sum_val += val
    return round(sum_val / len(valid_scores), 2)
def confidence_interval_95(data: List[float]) -> float:
    """
    Return the half-width of the 95% confidence interval for the mean,
    using the normal approximation:
        CI95 = 1.96 * (sample_stdev / sqrt(n))
    The approximation is intended for reasonably large n (roughly n > 30).

    Returns 0.0 when fewer than two data points are given, since a sample
    standard deviation is undefined there.
    """
    n = len(data)
    if n < 2:
        return 0.0
    return 1.96 * (statistics.stdev(data) / math.sqrt(n))
def compute_detailed_distribution(scores):
    """
    Summarise a collection of scores as a dict of descriptive statistics.

    Returns an empty dict for empty input. Otherwise the dict holds the
    count plus min, max, mean, median, sample standard deviation (0.0 for a
    single value) and the 10th/25th/75th/90th percentiles, each rounded to
    three decimals.
    """
    if not scores:
        return {}

    def rounded(value):
        return round(value, 3)

    def percentile(q):
        return rounded(float(np.percentile(scores, q)))

    n = len(scores)
    spread = statistics.stdev(scores) if n > 1 else 0.0
    summary = {
        "count": n,
        "min": rounded(min(scores)),
        "max": rounded(max(scores)),
        "mean": rounded(statistics.mean(scores)),
        "median": rounded(statistics.median(scores)),
        "stdev": rounded(spread),
        "p10": percentile(10),
        "p25": percentile(25),
        "p75": percentile(75),
        "p90": percentile(90),
    }
    return summary
def compute_model_level_stats(scores_by_model, lengths_by_model):
    """
    Compute per-model descriptive statistics.

    Args:
        scores_by_model: Mapping of model name -> sequence of scores.
        lengths_by_model: Mapping of model name -> sequence of lengths,
            expected to be aligned one-to-one with that model's scores.

    Returns:
        Mapping of model name -> dict with "count", "mean", "median",
        "stdev" (0.0 for a single score), "ci95", "min" and "max". When the
        lengths line up with the scores and there are at least two samples,
        "length_correlation" (Pearson r between lengths and scores) is
        included as well. Models without an entry in `lengths_by_model` are
        skipped with a warning.
    """
    model_stats = {}
    for model_name, scores in scores_by_model.items():
        if model_name not in lengths_by_model:
            logger.warning(f"Model {model_name} has no lengths data, skipping.")
            continue
        lengths = lengths_by_model[model_name]
        n_scores = len(scores)
        stats = {
            "count": n_scores,
            "mean": statistics.mean(scores),
            "median": statistics.median(scores),
            "stdev": statistics.stdev(scores) if n_scores > 1 else 0.0,
            "ci95": confidence_interval_95(scores),
            "min": min(scores),
            "max": max(scores)
        }
        # Length correlation. pearsonr raises for fewer than two samples, so
        # a model with a single score simply gets no correlation entry.
        if len(lengths) == n_scores and n_scores >= 2:
            corr, _ = scipy.stats.pearsonr(lengths, scores)
            stats["length_correlation"] = corr
        model_stats[model_name] = stats
    return model_stats
def compute_cross_model_stats(scores_by_model_all, scores_by_model_by_iter):
    """
    Compute statistics describing how well scores separate and rank models.

    ANOVA, Kruskal-Wallis and the standard deviation across model means are
    calculated over all scores. Pearson/Kendall correlations against
    REFERENCE_MODEL_SCORES are computed per iteration from each model's
    iteration mean and then averaged across iterations. An iteration with
    fewer than two models that have a reference score contributes 0.0 to
    both averages.

    Args:
        scores_by_model_all: Mapping of model name -> all scores.
        scores_by_model_by_iter: Mapping of model name ->
            {iteration key -> scores for that iteration}.

    Returns:
        Dict with "anova_f", "anova_p", "kw_stat", "kw_p",
        "std_dev_across_models", "pearson_r", "kendall_tau" and a
        "normalized_components" sub-dict holding each statistic passed
        through `normalize` with the bounds given below.
    """
    # ANOVA/Kruskal portion remains over ALL model scores
    arrays = list(scores_by_model_all.values())
    f_stat, f_p = scipy.stats.f_oneway(*arrays)
    kw_stat, kw_p = scipy.stats.kruskal(*arrays)

    # Standard deviation across model means (over entire distribution)
    model_means = [statistics.mean(scores) for scores in arrays]
    std_across_models = statistics.pstdev(model_means)

    # --------------------
    # Compute correlation stats per iteration, then average
    # --------------------
    iteration_keys = set()
    for by_iter_dict in scores_by_model_by_iter.values():
        iteration_keys |= set(by_iter_dict.keys())  # union of all iteration keys

    iteration_pearsons = []
    iteration_kendalls = []
    for it_key in iteration_keys:
        # Gather means for each model in this iteration
        model_means_dict = {
            model: statistics.mean(by_iter_dict[it_key])
            for model, by_iter_dict in scores_by_model_by_iter.items()
            if it_key in by_iter_dict and len(by_iter_dict[it_key]) > 0
        }

        # Pair each model's iteration-mean with reference score
        ref_pairs = [
            (mean_val, REFERENCE_MODEL_SCORES[m])
            for m, mean_val in model_means_dict.items()
            if m in REFERENCE_MODEL_SCORES
        ]

        # If enough models exist for correlation, compute it
        if len(ref_pairs) >= 2:
            means, refs = zip(*ref_pairs)
            p_r, _ = scipy.stats.pearsonr(means, refs)
            k_tau, _ = scipy.stats.kendalltau(means, refs)
            # Diagnostic only; kept inside this branch because means/refs
            # do not exist when there are too few reference pairs.
            logger.debug(f"iteration {it_key}: k_tau={k_tau} means={means} refs={refs}")
        else:
            p_r, k_tau = (0.0, 0.0)

        iteration_pearsons.append(p_r)
        iteration_kendalls.append(k_tau)

    # Final correlation = average across iteration-level correlations
    pearson_r = statistics.mean(iteration_pearsons) if iteration_pearsons else 0.0
    kendall_tau = statistics.mean(iteration_kendalls) if iteration_kendalls else 0.0

    return {
        "anova_f": f_stat,
        "anova_p": f_p,
        "kw_stat": kw_stat,
        "kw_p": kw_p,
        "std_dev_across_models": std_across_models,
        "pearson_r": pearson_r,
        "kendall_tau": kendall_tau,
        "normalized_components": {
            "pearson_r": normalize(pearson_r, 0.7, 1.0),
            "kendall_tau": normalize(kendall_tau, 0.1, 1.0),
            "anova_f": normalize(f_stat, 0.0, 350.0),
            "kw_stat": normalize(kw_stat, 0.0, 1800.0),
            "std_dev": normalize(std_across_models, 0.0, 2.6)
        }
    }
def build_landmark_calibration_config(scores, desired_points=None):
    """
    Describe a piecewise-linear calibration anchored on five landmarks of
    the raw score distribution: [min, Q1, median, Q3, max].

    Those input landmarks are paired with `desired_points` (default
    [0, 3, 5, 7, 10]) as the output landmarks. With fewer than two scores
    there is no meaningful distribution, so both landmark lists are left
    empty.

    Returns a dict with keys "method", "in_landmarks" and "out_landmarks".
    """
    config = {
        "method": "piecewise_landmark",
        "in_landmarks": [],
        "out_landmarks": []
    }
    # Degenerate case: nothing to calibrate against.
    if not scores or len(scores) < 2:
        return config

    lower_quartile, upper_quartile = (
        float(np.percentile(scores, q)) for q in (25, 75)
    )
    config["in_landmarks"] = [
        min(scores),
        lower_quartile,
        float(statistics.median(scores)),
        upper_quartile,
        max(scores),
    ]
    config["out_landmarks"] = (
        [0, 3, 5, 7, 10] if desired_points is None else desired_points
    )
    return config
def apply_landmark_calibration(x, config):
    """
    Map `x` through the piecewise-linear transform described by `config`.

    `config["in_landmarks"]` is [min, q1, median, q3, max] of the raw
    scores and `config["out_landmarks"]` holds the five target values. The
    four consecutive landmark pairs form four linear segments; `x` is
    mapped by the first segment whose upper bound it does not exceed.
    Values below the minimum or above the maximum are extrapolated along
    the first or last segment respectively.

    If either landmark list does not have exactly five entries, `x` is
    returned unchanged. A segment of (near-)zero width maps to its lower
    output landmark.
    """
    src = config.get("in_landmarks", [])
    dst = config.get("out_landmarks", [])
    if not (len(src) == 5 and len(dst) == 5):
        # Invalid or degenerate config: leave the value alone.
        return x

    # Segment i spans src[i]..src[i + 1]; the last segment (3) also catches
    # everything above q3, including values beyond the maximum.
    seg = next((i for i in range(3) if x <= src[i + 1]), 3)

    src_lo, src_hi = src[seg], src[seg + 1]
    dst_lo, dst_hi = dst[seg], dst[seg + 1]

    width = src_hi - src_lo
    if abs(width) < 1e-12:
        return dst_lo
    return dst_lo + (x - src_lo) / width * (dst_hi - dst_lo)
def log_score_summary(score_type: str, cross_stats: Dict, model_stats: Dict):
    """
    Build a human-readable summary of score statistics, log it at INFO
    level and return it.

    The report lists the cross-model statistics from `cross_stats` followed
    by one line per model from `model_stats` (mean ± ci95), ordered by mean
    score from highest to lowest.
    """
    header = [
        "\n\n",
        f"\n------- {score_type} Summary -------\n",
        f"ANOVA F-value: {cross_stats['anova_f']:.4f}, p={cross_stats['anova_p']:.4f}\n",
        f"Kruskal-Wallis: {cross_stats['kw_stat']:.4f}, p={cross_stats['kw_p']:.4f}\n",
        f"Pearson r={cross_stats['pearson_r']:.4f}\n",
        f"Kendall τ={cross_stats['kendall_tau']:.4f}\n",
        f"Std.Dev across models: {cross_stats['std_dev_across_models']:.4f}\n",
        "\nModel Scores:\n",
    ]

    ranked = sorted(
        model_stats.items(),
        key=lambda item: item[1]["mean"],
        reverse=True
    )
    rows = [
        f"{name:.<40} {entry['mean']:.3f} ±{entry['ci95']:.3f}\n"
        for name, entry in ranked
    ]

    report = "".join(header + rows + ["\n------------------------------------\n"])
    logger.info(report)
    return report
def compute_weighted_score(logp):
    """
    Turn per-metric choice logprobs into a probability-weighted score.

    For each metric, the 11 logprobs (one per choice 0-10) are
    exponentiated, normalised by their sum (plus a 1e-12 guard against
    division by zero) and used as weights over the choice values 0..10.
    Returns a dict of metric -> weighted score as a Python float.
    """
    choice_values = np.arange(11)  # Choices are 0-10

    def expected_choice(logp_arr):
        mass = np.exp(logp_arr)
        weights = mass / (mass.sum() + 1e-12)
        return (weights * choice_values).sum().item()

    return {metric: expected_choice(logp_arr) for metric, logp_arr in logp.items()}
def compute_logpweighted_score(logp):
    """
    Score each metric as the sum of its raw logprobs multiplied by the
    corresponding choice value (0-10).

    Unlike a probability-weighted score, the logprobs are used directly:
    they are neither exponentiated nor normalised. Returns a dict of
    metric -> score as a Python scalar.
    """
    choice_values = np.arange(11)  # Choices are 0-10
    return {
        metric: (logp_arr * choice_values).sum().item()
        for metric, logp_arr in logp.items()
    }
# def compute_weighted_score(logp, scale=2.0): # Tune scale (>1 for sharpening, e.g., 1.5-5)
# outs = {}
# choices = np.arange(11) # Choices are 0-10
# for metric, logp_arr in logp.items():
# # Scale logprobs directly (sharpens for scale >1)
# scaled_logp = logp_arr * scale
# # Apply softmax in log space to get weights (no intermediate prob normalization)
# max_logp = np.max(scaled_logp) # For numerical stability
# exp_terms = np.exp(scaled_logp - max_logp)
# weights = exp_terms / (exp_terms.sum() + 1e-12)
# # Weighted average
# outs[metric] = (weights * choices).sum().item()
# return outs
def compute_ranked_score(logp):
    """
    Score each metric from the rank agreement between the choice values
    0-10 and their logprobs.

    For each metric, Kendall's tau-b is computed between 0..10 and the 11
    logprobs (one-sided `alternative='less'`, asymptotic p-value). The
    correlation is mapped through the power-law emphasis
    `(tau + 1) ** 5 * 5`, multiplied by twice the p-value, and clipped to
    [0, 10].

    Returns a dict of metric -> score as a Python float. If the correlation
    is undefined (e.g. all logprobs identical) the score is NaN.
    """
    outs = {}
    choices = np.arange(11)  # Choices are 0-10
    for metric, logp_arr in logp.items():
        res = kendalltau(choices, logp_arr, variant='b', alternative='less', method='asymptotic')
        # Power-law emphasis of the correlation.
        emphasised = (res.correlation + 1) ** 5 * 5
        # Weight by the p-value, then keep the result on the 0-10 scale.
        outs[metric] = float(np.clip(2 * emphasised * res.pvalue, 0, 10))
    return outs