Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support rerank model finetuning #578

Merged
merged 13 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions comps/finetuning/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# LLM Fine-tuning Microservice
# Fine-tuning Microservice

LLM Fine-tuning microservice involves adapting a base model to a specific task or dataset to improve its performance on that task.
Fine-tuning microservice involves adapting a model to a specific task or dataset to improve its performance on that task, we currently supported instruction tuning for LLMs, finetuning for reranking and embedding models.

# 🚀1. Start Microservice with Python (Optional 1)

Expand Down Expand Up @@ -86,14 +86,22 @@ docker run --runtime=habana -e HABANA_VISIBLE_DEVICES=all -p 8005:8005 -e OMPI_M

# 🚀3. Consume Finetuning Service

## 3.1 Create fine-tuning job
## 3.1 Upload a training file

Assuming a training file `alpaca_data.json` is uploaded, it can be downloaded in [here](https://github.com/tatsu-lab/stanford_alpaca/blob/main/alpaca_data.json), the following script launches a finetuning job using `meta-llama/Llama-2-7b-chat-hf` as base model:
Download a training file, such as `alpaca_data.json` for instruction tuning and upload it to the server with below command, this file can be downloaded in [here](https://github.com/tatsu-lab/stanford_alpaca/blob/main/alpaca_data.json):

```bash
# upload a training file
curl http://${your_ip}:8005/v1/finetune/upload_training_files -X POST -H "Content-Type: multipart/form-data" -F "files=@./alpaca_data.json"
```

For reranking and embedding models finetuning, the training file [toy_finetune_data.jsonl](https://github.com/FlagOpen/FlagEmbedding/blob/master/examples/finetune/toy_finetune_data.jsonl) is an toy example.

## 3.2 Create fine-tuning job

After a training file like `alpaca_data.json` is uploaded, use the following command to launch a finetuning job using `meta-llama/Llama-2-7b-chat-hf` as base model:

```bash
# create a finetuning job
curl http://${your_ip}:8005/v1/fine_tuning/jobs \
-X POST \
Expand All @@ -102,7 +110,26 @@ curl http://${your_ip}:8005/v1/fine_tuning/jobs \
"training_file": "alpaca_data.json",
"model": "meta-llama/Llama-2-7b-chat-hf"
}'
```

Use the following command to launch a finetuning job for reranking model finetuning, such as `BAAI/bge-reranker-large`:

```bash
# create a finetuning job
curl http://${your_ip}:8005/v1/fine_tuning/jobs \
-X POST \
-H "Content-Type: application/json" \
-d '{
"training_file": "toy_finetune_data.json",
"model": "BAAI/bge-reranker-large"
}'
```

## 3.3 Manage fine-tuning job

Below commands show how to list finetuning jobs, retrieve a finetuning job, cancel a finetuning job and list checkpoints of a finetuning job.

``` bash
# list finetuning jobs
curl http://${your_ip}:8005/v1/fine_tuning/jobs -X GET

Expand All @@ -117,5 +144,4 @@ curl http://localhost:8005/v1/fine_tuning/jobs/cancel -X POST -H "Content-Ty

# list checkpoints of a finetuning job
curl http://${your_ip}:8005/v1/finetune/list_checkpoints -X POST -H "Content-Type: application/json" -d '{"fine_tuning_job_id": ${fine_tuning_job_id}}'

```
52 changes: 52 additions & 0 deletions comps/finetuning/llm_on_ray/finetune/data_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@
# Copyright 2023 The LLM-on-Ray Authors.

import copy
import math
import random
import re
from dataclasses import dataclass
from itertools import chain
from typing import Dict, List, Tuple

import torch
from torch.utils.data import Dataset
from transformers import BatchEncoding, DataCollatorWithPadding

IGNORE_INDEX = -100

Expand Down Expand Up @@ -194,3 +200,49 @@ def tokenize(self, examples):
examples["labels"].append(labels)
examples["attention_mask"].append(results["attention_mask"])
return examples


class TrainDatasetForCE(Dataset):
def __init__(self, dataset, args, tokenizer):
self.dataset = dataset
self.tokenizer = tokenizer
self.args = args
self.total_len = len(self.dataset)

def create_one_example(self, qry_encoding: str, doc_encoding: str):
item = self.tokenizer.encode_plus(
qry_encoding,
doc_encoding,
truncation=True,
max_length=self.args.get("max_length", 512),
padding=False,
)
return item

def __len__(self):
return self.total_len

def __getitem__(self, item) -> List[BatchEncoding]:
query = self.dataset[item]["query"]
pos = random.choice(self.dataset[item]["pos"])
train_group_size = self.args.get("train_group_size", 8)
if len(self.dataset[item]["neg"]) < train_group_size - 1:
num = math.ceil((train_group_size - 1) / len(self.dataset[item]["neg"]))
negs = random.sample(self.dataset[item]["neg"] * num, train_group_size - 1)
else:
negs = random.sample(self.dataset[item]["neg"], train_group_size - 1)

batch_data = []
batch_data.append(self.create_one_example(query, pos))
for neg in negs:
batch_data.append(self.create_one_example(query, neg))

return batch_data


@dataclass
class GroupCollator(DataCollatorWithPadding):
def __call__(self, features) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]:
if isinstance(features[0], list):
features = sum(features, [])
return super().__call__(features)
146 changes: 89 additions & 57 deletions comps/finetuning/llm_on_ray/finetune/finetune.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
import ray
import torch
import transformers
from modeling import CrossEncoder
from peft import LoraConfig, get_peft_model
from pydantic_yaml import parse_yaml_raw_as
from ray.air import FailureConfig, RunConfig
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer
from transformers import Trainer, TrainingArguments

from comps.finetuning.llm_on_ray import common
from comps.finetuning.llm_on_ray.finetune.data_process import DataProcessor
from comps.finetuning.llm_on_ray.finetune.data_process import DataProcessor, GroupCollator, TrainDatasetForCE
from comps.finetuning.llm_on_ray.finetune.finetune_config import FinetuneConfig


Expand Down Expand Up @@ -183,74 +185,106 @@ def local_load(name, **load_config):


def tokenize_dataset(config: Dict, tokenizer, dataset):
group = config["Dataset"].get("group", True)
block_size = config["Dataset"].get("block_size", 512)
tokenizer.pad_token = tokenizer.eos_token

processor = DataProcessor(config, tokenizer)

for key in dataset:
prompts = processor.make_prompt(dataset[key])
dataset[key] = datasets.Dataset.from_dict(prompts)

column_names = list(dataset["train"].features)
tokenize_fn = (
processor.tokenize_by_neural_chat
if config["Dataset"].get("data_preprocess_type", "") == "neural_chat"
else processor.tokenize
)

tokenized_dataset = dataset.map(
tokenize_fn,
remove_columns=column_names,
batched=True,
load_from_cache_file=False,
desc="Tokenize dataset",
)

if group:

def group_texts(examples):
# Concatenate all texts.
concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()}
total_length = len(concatenated_examples[list(examples.keys())[0]])
# We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
# customize this part to your needs.
if total_length >= block_size:
total_length = (total_length // block_size) * block_size
# Split by chunks of max_len.
result = {
k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
for k, t in concatenated_examples.items()
}
return result
task = config["General"].get("task", "instruction_tuning")
if task == "instruction_tuning":
group = config["Dataset"].get("group", True)
block_size = config["Dataset"].get("block_size", 512)
tokenizer.pad_token = tokenizer.eos_token

processor = DataProcessor(config, tokenizer)

for key in dataset:
prompts = processor.make_prompt(dataset[key])
dataset[key] = datasets.Dataset.from_dict(prompts)

column_names = list(dataset["train"].features)
tokenize_fn = (
processor.tokenize_by_neural_chat
if config["Dataset"].get("data_preprocess_type", "") == "neural_chat"
else processor.tokenize
)

tokenized_dataset = tokenized_dataset.map(
group_texts,
tokenized_dataset = dataset.map(
tokenize_fn,
remove_columns=column_names,
batched=True,
load_from_cache_file=False,
desc=f"Grouping texts in chunks of {block_size}",
desc="Tokenize dataset",
)

return tokenized_dataset
if group:

def group_texts(examples):
# Concatenate all texts.
concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()}
total_length = len(concatenated_examples[list(examples.keys())[0]])
# We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
# customize this part to your needs.
if total_length >= block_size:
total_length = (total_length // block_size) * block_size
# Split by chunks of max_len.
result = {
k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
for k, t in concatenated_examples.items()
}
return result

tokenized_dataset = tokenized_dataset.map(
group_texts,
batched=True,
load_from_cache_file=False,
desc=f"Grouping texts in chunks of {block_size}",
)

return tokenized_dataset
elif task == "rerank":
dataset["train"] = TrainDatasetForCE(dataset["train"], config["Dataset"], tokenizer)
return dataset
elif task == "embedding":
pass
else:
raise NotImplementedError(f"Unsupported task {task}, only support instruction_tuning, rerank, embedding now.")


def prepare_data_collator(config: Dict, tokenizer):
return transformers.DataCollatorForLanguageModeling(
tokenizer=tokenizer, mlm=False, return_tensors="pt", pad_to_multiple_of=8
)
task = config["General"].get("task", "instruction_tuning")
if task == "instruction_tuning":
return transformers.DataCollatorForLanguageModeling(
tokenizer=tokenizer, mlm=False, return_tensors="pt", pad_to_multiple_of=8
)
elif task == "rerank":
return GroupCollator(tokenizer)
elif task == "embedding":
pass
else:
raise NotImplementedError(f"Unsupported task {task}, only support instruction_tuning, rerank, embedding now.")


def load_model(config: Dict):
model_name = config["General"]["base_model"]
model_dtype = convert_dtype(config["Training"].get("mixed_precision", "no"))
model_config = config["General"].get("config", {})
model = transformers.AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=model_dtype, **model_config)

lora_config = config["General"].get("lora_config", None)
if lora_config:
peft_config = LoraConfig(**lora_config)
model = get_peft_model(model, peft_config)
task = config["General"].get("task", "instruction_tuning")
training_args = convert_to_training_args(TrainingArguments, config)
if task == "instruction_tuning":
model = transformers.AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=model_dtype, **model_config)

lora_config = config["General"].get("lora_config", None)
if lora_config:
peft_config = LoraConfig(**lora_config)
model = get_peft_model(model, peft_config)
elif task == "rerank":
model = CrossEncoder.from_pretrained(
config["Dataset"],
training_args,
model_name,
from_tf=bool(".ckpt" in model_name),
config=model_config,
)
elif task == "embedding":
pass
else:
raise NotImplementedError(f"Unsupported task {task}, only support instruction_tuning, rerank, embedding now.")

egc = config["General"].get("enable_gradient_checkpointing", False)
if egc:
Expand All @@ -266,8 +300,6 @@ def load_model(config: Dict):
def get_trainer(config: Dict, model, tokenizer, tokenized_dataset, data_collator):
device = config["Training"]["device"]
if device in ["cpu", "gpu"]:
from transformers import Trainer, TrainingArguments

training_args = convert_to_training_args(TrainingArguments, config)
trainer = Trainer(
model=model,
Expand Down
7 changes: 7 additions & 0 deletions comps/finetuning/llm_on_ray/finetune/finetune_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,18 @@ class General(BaseModel):
config: GeneralConfig
lora_config: Optional[LoraConfig] = None
enable_gradient_checkpointing: bool = False
task: str = "instruction_tuning"

@validator("report_to")
def check_report_to(cls, v: str):
assert v in ["none", "tensorboard"]
return v

@validator("task")
def check_task(cls, v: str):
assert v in ["instruction_tuning", "rerank", "embedding"]
return v


class Dataset(BaseModel):
train_file: str
Expand All @@ -71,6 +77,7 @@ class Dataset(BaseModel):
data_preprocess_type: str = "neural_chat"
max_train_samples: int = 0
max_eval_samples: int = 0
train_group_size: int = 8


class RayResourceConfig(BaseModel):
Expand Down
50 changes: 50 additions & 0 deletions comps/finetuning/llm_on_ray/finetune/modeling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import torch
from finetune_config import Dataset
from torch import nn
from transformers import AutoModelForSequenceClassification, PreTrainedModel, TrainingArguments
from transformers.modeling_outputs import SequenceClassifierOutput


class CrossEncoder(PreTrainedModel):
def __init__(self, hf_model: PreTrainedModel, data_args: Dataset, train_args: TrainingArguments):
super().__init__(hf_model.config)
self.hf_model = hf_model
self.train_args = train_args
self.data_args = data_args

self.cross_entropy = nn.CrossEntropyLoss(reduction="mean")

self.register_buffer("target_label", torch.zeros(self.train_args.per_device_train_batch_size, dtype=torch.long))

def gradient_checkpointing_enable(self, **kwargs):
self.hf_model.gradient_checkpointing_enable(**kwargs)

def forward(self, **batch):
ranker_out: SequenceClassifierOutput = self.hf_model(**batch, return_dict=True)
logits = ranker_out.logits

if self.training:
scores = logits.view(self.train_args.per_device_train_batch_size, self.data_args.get("train_group_size", 8))
loss = self.cross_entropy(scores, self.target_label)

return SequenceClassifierOutput(
loss=loss,
**ranker_out,
)
else:
return ranker_out

@classmethod
def from_pretrained(cls, data_args: Dataset, train_args: TrainingArguments, *args, **kwargs):
hf_model = AutoModelForSequenceClassification.from_pretrained(*args, **kwargs)
reranker = cls(hf_model, data_args, train_args)
return reranker

def save_pretrained(self, output_dir: str, **kwargs):
state_dict = self.hf_model.state_dict()
state_dict = type(state_dict)({k: v.clone().cpu() for k, v in state_dict.items()})
kwargs.pop("state_dict")
self.hf_model.save_pretrained(output_dir, state_dict=state_dict, **kwargs)
Loading
Loading