Files
Open-Assistant/model/supervised_finetuning/utils.py
T
2023-02-11 10:33:25 +01:00

278 lines
10 KiB
Python

import random
from pathlib import Path
from typing import List, NamedTuple
import evaluate
import transformers
import yaml
from custom_datasets import get_one_dataset
from custom_datasets.dialogue_collator import DialogueDataCollator, TrainDialogueDataCollator
from custom_datasets.qa_datasets import QA_SPECIAL_TOKENS
from losses import CrossEntropyLoss, PolyLoss
from models import freeze_top_n_layers, get_specific_model
from sklearn.model_selection import train_test_split
from torch.utils.data import ConcatDataset, Subset
from torch.utils.data.sampler import Sampler
class PerDatasetSampler(Sampler):
"""Sampler which returns a fixed number of samples per dataset, per epoch.
Example:
Dataset 1 has 10,000 examples and we want 200 per epoch
Dataset 2 has 500 examples and we want all 500 per epoch
Epoch size will be 700 and every epoch we'll sample a different
200 from dataset 1.
Parameters
----------
dataset_sizes : List[int]
A list with the size of each dataset.
dataset_size_per_epoch : List[int]
How many examples to get from each dataset per epoch.
Note: dataset_sizes & dataset_size_per_epoch must be in the same order.
Further the examples in the underlying torch.utils.data.Dataset
must per ordered as dataset_1, dataset_2, ..., dataset_n. This is fine
if we concatenate a bunch of datasets together
e.g. using torch.utils.data.ConcatDataset which is current behaviour.
"""
def __init__(self, dataset_sizes: List[int], dataset_size_per_epoch: List[int]):
self.dataset_sizes = dataset_sizes
self.dataset_size_per_epoch = dataset_size_per_epoch
self.num_datasets = len(dataset_sizes)
def __iter__(self):
epoch_idx = []
n = 0
for i in range(self.num_datasets):
sampled_idx = random.sample(range(n, self.dataset_sizes[i] + n), self.dataset_size_per_epoch[i])
n += self.dataset_sizes[i]
epoch_idx.extend(sampled_idx)
random.shuffle(epoch_idx)
return iter(epoch_idx)
def __len__(self):
return int(sum(self.dataset_size_per_epoch))
@classmethod
def build_sampler_from_config(cls, training_conf, datasets):
dataset_sizes = [len(x) for x in datasets]
fractions = get_dataset_fractions(training_conf.datasets, dataset_sizes)
dataset_size_per_epoch = [int(size * frac) for size, frac in zip(dataset_sizes, fractions)]
return cls(dataset_sizes, dataset_size_per_epoch)
def get_dataset_fractions(conf, dataset_sizes):
"""Calculate fraction of each dataset to use per epoch when subsampling"""
fractions = []
for i, data_config in enumerate(conf):
dataset_name = get_dataset_name_from_data_config(data_config)
if isinstance(data_config, dict):
if "fraction" in data_config[dataset_name]:
if data_config[dataset_name]["fraction"] <= 0:
raise ValueError("Please specify fraction as a value between 0 < fraction <= 1")
fractions.append(min(1, data_config[dataset_name]["fraction"]))
elif "size" in data_config[dataset_name]:
if data_config[dataset_name]["size"] > dataset_sizes[i]:
raise ValueError(f"Please specify a size smaller than number of examples: {dataset_sizes[i]:,.0f}")
fractions.append(data_config[dataset_name]["size"] / dataset_sizes[i])
else:
raise ValueError("Please specify either fraction or size in config.yaml. See README for instructions.")
else:
fractions.append(1)
return fractions
class SpecialTokens(NamedTuple):
pad_token: str = ""
eos_token: str = ""
sep_token: str = ""
class TokenizerConfig(NamedTuple):
special_tokens: SpecialTokens = {}
TOKENIZER_CONFIGS = {
"galactica": TokenizerConfig(special_tokens=SpecialTokens("<pad>", "</s>")),
"GPT-JT": TokenizerConfig(special_tokens=SpecialTokens(sep_token="<|extratoken_100|>")),
"codegen": TokenizerConfig(special_tokens=SpecialTokens("<|endoftext|>", sep_token="<|endoftext|>")),
"pythia": TokenizerConfig(special_tokens=SpecialTokens("<|padding|>", "<|endoftext|>", "<|endoftext|>")),
}
def match_tokenizer_name(model_name: str) -> TokenizerConfig:
"""
Match a partial model name to a tokenizer configuration
i.e. model_name `Salesforce/codegen-2B-multi` has config name `codegen`
"""
tokenizer_config_matches = [config for name, config in TOKENIZER_CONFIGS.items() if name in model_name]
if not tokenizer_config_matches:
raise ValueError(f"Cannot find any tokeniser configuration to match {model_name=}")
elif 1 < len(tokenizer_config_matches):
raise ValueError(f"Found multiple tokeniser configuration matches for {model_name=}")
else:
return tokenizer_config_matches[0]
def get_tokenizer(conf) -> transformers.AutoTokenizer:
tokenizer = transformers.AutoTokenizer.from_pretrained(conf.model_name, cache_dir=conf.cache_dir)
tokenizer_config = match_tokenizer_name(conf.model_name)
if tokenizer_config.special_tokens:
if "GPT-JT" in conf.model_name:
tokenizer_config.special_tokens.pad_token = tokenizer.eos_token
# SpecialTokens : latest in 4.25, 4.26
tokenizer.add_special_tokens(
{
"pad_token": tokenizer_config.special_tokens.pad_token,
"eos_token": tokenizer_config.special_tokens.eos_token,
"sep_token": tokenizer_config.special_tokens.sep_token,
}
)
additional_special_tokens = (
[]
if "additional_special_tokens" not in tokenizer.special_tokens_map
else tokenizer.special_tokens_map["additional_special_tokens"]
)
additional_special_tokens = list(set(additional_special_tokens + list(QA_SPECIAL_TOKENS.values())))
tokenizer.add_special_tokens({"additional_special_tokens": additional_special_tokens})
return tokenizer
def default_preprocess(eval_pred, ignote_negative_labels=True):
preds, labels = eval_pred.predictions, eval_pred.label_ids
if not ignote_negative_labels:
return preds, labels
mask = labels > 0
return preds[mask], labels[mask]
# placeholder for now
def preprocess_qa(eval_pred):
return (eval_pred.predictions, eval_pred.label_ids)
# def postprocess_summarization(preds, labels):
# preds = [pred.strip() for pred in preds]
# labels = [label.strip() for label in labels]
# preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in preds]
# labels = ["\n".join(nltk.sent_tokenize(label)) for label in labels]
# return preds, labels
# def preprocess_summarization(eval_pred, tokenizer, ignore_pad_token_for_loss=True):
# preds, labels = eval_pred
# decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
# if ignore_pad_token_for_loss:
# labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
# decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
# decoded_preds, decoded_labels = postprocess_summarization(decoded_preds, decoded_labels)
# return decoded_preds, decoded_labels
def get_metrics(conf, tokenizer):
# the reason behind using a list is that we might want to extend the list of our
# metrics in the future for more thorough evaluation
metrics, preprocess_fns = [evaluate.load("accuracy")], [default_preprocess]
# if any(dataset in QA_DATASETS for dataset in conf.datasets):
# raise ValueError("TODO")
# metrics.append(evaluate.load("squad_v2"))
# preprocess_fns.append(preprocess_qa)
# if any(dataset in SUMMARIZATION_DATASETS for dataset in conf.datasets):
# raise ValueError("TODO")
# metrics.append(evaluate.load("rouge"))
# preprocess_fns.append(
# partial(preprocess_summarization, tokenizer, ignore_pad_token_for_loss=conf.ignore_pad_token_for_loss)
# )
return metrics, preprocess_fns
def get_model(conf, tokenizer):
model = get_specific_model(conf.model_name, conf.cache_dir, conf.quantization, conf.seq2seqmodel)
if len(tokenizer) != model.get_input_embeddings().num_embeddings:
assert not conf.freeze_layer, "Cannot change the number of embeddings if the model is frozen."
model.resize_token_embeddings(len(tokenizer))
if conf.freeze_layer:
model = freeze_top_n_layers(model, conf.freeze_layer)
model_parameters = filter(lambda p: p.requires_grad, model.parameters())
params = sum([p.numel() for p in model_parameters])
print("Number of trainable parameters: {}M".format(int(params / 1e6)))
return model
def get_dataset_name_from_data_config(data_config):
if isinstance(data_config, dict):
return list(data_config.keys())[0]
return data_config
def get_dataset(conf, tokenizer):
train_datasets, evals = [], {}
for data_config in conf.datasets:
dataset_name = get_dataset_name_from_data_config(data_config)
train, val = get_one_dataset(conf, dataset_name)
train_datasets.append(train)
evals[dataset_name] = Subset(val, list(range(min(len(val), conf.eval_size)))) if conf.eval_size else val
train = ConcatDataset(train_datasets)
collate_fn = DialogueDataCollator(tokenizer, max_length=conf.max_length)
train_collate_fn = (
TrainDialogueDataCollator(tokenizer, max_length=conf.max_length) if conf.samples_mixing else collate_fn
)
return train, evals, collate_fn, train_collate_fn
def get_loss(loss, poly_eps):
if loss == "CrossEntropyLoss":
return CrossEntropyLoss()
elif loss == "Poly":
return PolyLoss(epsilon=poly_eps)
else:
raise ValueError(f"Loss {loss} not supported")
def read_yamls(dir):
conf = {}
no_conf = True
for config_file in Path(dir).glob("**/*.yaml"):
no_conf = False
with config_file.open("r") as f:
conf.update(yaml.safe_load(f))
if no_conf:
print(f"WARNING: No yaml files found in {dir}")
return conf
def train_val_dataset(dataset, val_split=0.2):
train_idx, val_idx = train_test_split(
list(range(len(dataset))), test_size=val_split, random_state=666, shuffle=True
)
return Subset(dataset, train_idx), Subset(dataset, val_idx)