diff --git a/recipes/launch.slurm b/recipes/launch.slurm index 28d4bee..39ee457 100644 --- a/recipes/launch.slurm +++ b/recipes/launch.slurm @@ -1,5 +1,5 @@ #!/bin/bash -#SBATCH --ntasks-per-node=1 # crucial - only 1 task per dist per node! +#SBATCH --ntasks-per-node=1 #SBATCH --exclusive #SBATCH --gres=gpu:8 #SBATCH --partition=production-cluster @@ -14,7 +14,7 @@ echo "START TIME: $(date)" MODEL=$1 TASK=$2 -VERSION=$3 +PRECISION=$3 ACCELERATOR=$4 OPTIONAL_ARGS=$5 @@ -23,7 +23,7 @@ NUM_NODES=$SLURM_NNODES GPUS_PER_NODE=8 WORLD_SIZE=$(($NUM_NODES*$GPUS_PER_NODE)) # Due to conflicts between Accelerate's DeepSpeed configs and Transformers' TrainingArguments, we need to parse the gradient accumulation steps from the config file to ensure they match -CONFIG_FILE=recipes/$MODEL/$TASK/config_$VERSION.yaml +CONFIG_FILE=recipes/$MODEL/$TASK/config_$PRECISION.yaml GRAD_ACC_STEPS=$(yq -r .gradient_accumulation_steps $CONFIG_FILE) # Split the string into individual arguments @@ -69,7 +69,7 @@ export NCCL_ASYNC_ERROR_HANDLING=1 # export NCCL_NSOCKS_PERTHREAD=1 # export CUDA_LAUNCH_BLOCKING=1 -# AWS specific +# Specific configuration for the Hugging Face Compute Cluster - be warned this may not work on other clusters! export NCCL_PROTO=simple export RDMAV_FORK_SAFE=1 export FI_EFA_FORK_SAFE=1 diff --git a/recipes/zephyr-7b/dpo/config_full.yaml b/recipes/zephyr-7b/dpo/config_full.yaml new file mode 100644 index 0000000..82258b8 --- /dev/null +++ b/recipes/zephyr-7b/dpo/config_full.yaml @@ -0,0 +1,37 @@ +# Model arguments +model_name_or_path: lewtun/zephyr-7b-sft + +# Data training arguments +# For definitions, see: src/h4/training/config.py +dataset_mixer: + HuggingFaceH4/ultrafeedback_binarized: 1.0 +dataset_splits: +- train_prefs +- test_prefs +preprocessing_num_workers: 12 + +# DPOTrainer arguments +bf16: true +beta: 0.1 +do_eval: true +evaluation_strategy: steps +eval_steps: 100 +gradient_accumulation_steps: 1 +gradient_checkpointing: true +hub_model_id: zephyr-7b-dpo +learning_rate: 5.0e-7 +log_level: info +logging_steps: 10 +lr_scheduler_type: linear +max_length: 1024 +max_prompt_length: 512 +num_train_epochs: 3 +optim: rmsprop +output_dir: data/zephyr-7b-dpo +per_device_train_batch_size: 4 +per_device_eval_batch_size: 4 +push_to_hub: true +save_strategy: "no" +save_total_limit: null +seed: 42 +warmup_ratio: 0.1 \ No newline at end of file diff --git a/recipes/zephyr-7b/sft/config_full.yaml b/recipes/zephyr-7b/sft/config_full.yaml index e7e786a..8ceb856 100644 --- a/recipes/zephyr-7b/sft/config_full.yaml +++ b/recipes/zephyr-7b/sft/config_full.yaml @@ -17,6 +17,7 @@ bf16: true evaluation_strategy: epoch gradient_accumulation_steps: 2 gradient_checkpointing: true +hub_model_id: zephyr-7b-sft hub_strategy: every_save learning_rate: 2.0e-05 log_level: info @@ -31,7 +32,6 @@ overwrite_output_dir: true per_device_eval_batch_size: 16 per_device_train_batch_size: 32 push_to_hub: True -push_to_hub_model_id: zephyr-7b-sft remove_unused_columns: true report_to: - tensorboard diff --git a/scripts/run_dpo.py b/scripts/run_dpo.py index b6f1cba..542de20 100644 --- a/scripts/run_dpo.py +++ b/scripts/run_dpo.py @@ -14,31 +14,24 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import random -import subprocess import sys -from datetime import timedelta import torch import transformers from transformers import set_seed -import wandb -from accelerate import Accelerator, InitProcessGroupKwargs -from h4.data import get_datasets -from h4.training import DataArguments, DPOTrainingArguments, ModelArguments, init_wandb_training -from h4.utils import ( +from accelerate import Accelerator +from alignment import ( + DataArguments, + DPOConfig, H4ArgumentParser, + ModelArguments, apply_chat_template, - convert_to_safetensors, + get_datasets, get_kbit_device_map, get_peft_config, get_quantization_config, get_tokenizer, - hf_login, - is_slurm_available, - push_to_hub_revision, - run_mt_bench_job, ) from trl import DPOTrainer @@ -47,7 +40,7 @@ logger = logging.getLogger(__name__) def main(): - parser = H4ArgumentParser((ModelArguments, DataArguments, DPOTrainingArguments)) + parser = H4ArgumentParser((ModelArguments, DataArguments, DPOConfig)) model_args, data_args, training_args = parser.parse() ####### @@ -69,18 +62,11 @@ def main(): logger.info(f"Data parameters {data_args}") logger.info(f"Training/evaluation parameters {training_args}") - # Setup WandB - if training_args.wandb_enabled: - init_wandb_training(training_args) - - # Login to HuggingFace Hub if needed - hf_login() - # Set seed for reproducibility set_seed(training_args.seed) # Increase distributed timeout to 3h to enable push to Hub to complete - accelerator = Accelerator(kwargs_handlers=[InitProcessGroupKwargs(timeout=timedelta(seconds=6 * 1800))]) + accelerator = Accelerator() ############### # Load datasets @@ -114,12 +100,6 @@ def main(): {"text_prompt": "prompt", "text_chosen": "chosen", "text_rejected": "rejected"} ) - # Log a few random samples from the training set: - for index in random.sample(range(len(raw_datasets["train"])), 3): - logger.info(f"Prompt sample {index} of the raw training set:\n\n{raw_datasets['train'][index]['prompt']}") - logger.info(f"Chosen sample {index} of the raw training set:\n\n{raw_datasets['train'][index]['chosen']}") - logger.info(f"Rejected sample {index} of the raw training set:\n\n{raw_datasets['train'][index]['rejected']}") - torch_dtype = ( model_args.torch_dtype if model_args.torch_dtype in ["auto", None] else getattr(torch, model_args.torch_dtype) ) @@ -136,7 +116,7 @@ def main(): ref_model = model_args.model_name_or_path ref_model_kwargs = model_kwargs - if model_args.use_peft: + if model_args.use_peft is True: ref_model = None ref_model_kwargs = None @@ -153,7 +133,7 @@ def main(): train_dataset=raw_datasets["train"], eval_dataset=raw_datasets["test"], tokenizer=tokenizer, - max_length=training_args.max_seq_length, + max_length=training_args.max_length, max_prompt_length=training_args.max_prompt_length, peft_config=get_peft_config(model_args), ) @@ -178,7 +158,7 @@ def main(): ########## if training_args.do_eval: logger.info("*** Evaluate ***") - metrics = dpo_trainer.evaluate(eval_dataset=raw_datasets["test"]) + metrics = dpo_trainer.evaluate() max_eval_samples = ( data_args.max_eval_samples if data_args.max_eval_samples is not None else len(raw_datasets["test"]) ) @@ -190,43 +170,23 @@ def main(): # Save model and create model card ################################## dpo_trainer.save_model(training_args.output_dir) - # Save everything else on main process if accelerator.is_main_process: - kwargs = {"finetuned_from": model_args.model_name_or_path, "tasks": "text-generation"} - kwargs["dataset"] = list(data_args.dataset_mixer.keys()) + kwargs = { + "finetuned_from": model_args.model_name_or_path, + "dataset": list(data_args.dataset_mixer.keys()), + "tags": ["alignment-handbook"], + } dpo_trainer.create_model_card(**kwargs) # Restore k,v cache for fast inference dpo_trainer.model.config.use_cache = True - # Fix custom code paths - if model_args.trust_remote_code is True: - auto_map = dpo_trainer.model.config.auto_map - dpo_trainer.model.config.auto_map = {k: v.split("--")[-1] for k, v in auto_map.items()} dpo_trainer.model.config.save_pretrained(training_args.output_dir) - # FSDP/DeepSpeed save the model as a single `pytorch_model.bin` file, so we need to shard it. - # We run this in a subprocess to avoid interference from the accelerators. - subprocess.run( - [ - "python", - "scripts/training/shard_checkpoint.py", - f"--output_dir={training_args.output_dir}", - f"--trust_remote_code={model_args.trust_remote_code}", - ], - check=True, - ) - # Convert torch weights to safetensors for deployment with TGI - convert_to_safetensors(training_args.output_dir) - if training_args.push_to_hub_revision: - is_model_on_hub = push_to_hub_revision(training_args, model_args) - # Run automatic evaluation once the model is pushed to the Hub - if is_slurm_available() and is_model_on_hub is True and training_args.do_eval is True: - logger.info("*** Launching MT Bench ***") - run_mt_bench_job(training_args, model_args) + if training_args.push_to_hub is True: + dpo_trainer.push_to_hub() # Ensure we don't timeout on model save / push to Hub logger.info("*** Waiting for all processes to finish ***") accelerator.wait_for_everyone() - wandb.finish() logger.info("*** Run complete! ***")