diff --git a/comps/finetuning/README.md b/comps/finetuning/README.md index fb46977de..2717a0064 100644 --- a/comps/finetuning/README.md +++ b/comps/finetuning/README.md @@ -173,6 +173,32 @@ curl http://${your_ip}:8015/v1/fine_tuning/jobs \ ``` +### 3.2.4 LLM Pretraining + +Use the following command to launch a job for LLM pretraining, such as `meta-llama/Llama-2-7b-hf`: + +```bash +# create a finetuning job +curl http://${your_ip}:8015/v1/fine_tuning/jobs \ + -X POST \ + -H "Content-Type: application/json" \ + -d '{ + "training_file": "test_data.json", + "model": "meta-llama/Llama-2-7b-hf", + "General":{ + "task":"pretraining", + "lora_config":null + } + }' +``` + +Below is an example for the format of the pretraining dataset: + +```json +{"text": "A girl with a blue tank top sitting watching three dogs."} +{"text": "A boy with a blue tank top sitting watching three dogs."} +``` + ## 3.3 Manage fine-tuning job Below commands show how to list finetuning jobs, retrieve a finetuning job, cancel a finetuning job and list checkpoints of a finetuning job. diff --git a/comps/finetuning/finetune_config.py b/comps/finetuning/finetune_config.py index 3accabfb3..5473cd9aa 100644 --- a/comps/finetuning/finetune_config.py +++ b/comps/finetuning/finetune_config.py @@ -16,6 +16,7 @@ DEVICE_CPU = "cpu" DEVICE_HPU = "hpu" DEVICE_GPU = "gpu" +DEVICE_CUDA = "cuda" ACCELERATE_STRATEGY_DDP = "DDP" ACCELERATE_STRATEGY_FSDP = "FSDP" @@ -57,7 +58,7 @@ def check_report_to(cls, v: str): @validator("task") def check_task(cls, v: str): - assert v in ["instruction_tuning", "rerank", "embedding"] + assert v in ["instruction_tuning", "pretraining", "rerank", "embedding"] return v @@ -136,7 +137,7 @@ class TrainingConfig(BaseModel): def check_device(cls, v: str): # will convert to lower case if v: - assert v.lower() in [DEVICE_CPU, DEVICE_GPU, DEVICE_HPU] + assert v.lower() in [DEVICE_CPU, DEVICE_GPU, DEVICE_HPU, DEVICE_CUDA] return v.lower() @validator("hpu_execution_mode") diff --git a/comps/finetuning/llm_on_ray/finetune/data_process.py b/comps/finetuning/llm_on_ray/finetune/data_process.py index d85bf2bfa..07b12d71e 100644 --- a/comps/finetuning/llm_on_ray/finetune/data_process.py +++ b/comps/finetuning/llm_on_ray/finetune/data_process.py @@ -18,7 +18,7 @@ IGNORE_INDEX = -100 -class DataProcessor: +class InstructionDataProcessor: # We used the following prompts for fine-tuning the Alpaca model. You can find reference doc form this URL(https://github.com/tatsu-lab/stanford_alpaca/blob/main/README.md#data-release) def __init__(self, config, tokenizer): self.tokenizer = tokenizer @@ -202,6 +202,39 @@ def tokenize(self, examples): return examples +class PretrainingDataProcessor: + def __init__(self, config, tokenizer): + self.tokenizer = tokenizer + self.max_length = self.max_seq_length = config["Dataset"].get("max_length", 512) + self.truncation = config["Dataset"].get("truncation", True) + self.padding = config["Dataset"].get("padding", True) + + def tokenize(self, examples): + keys = list(examples.data.keys()) + if len(keys) != 1 and "text" not in keys: + raise ValueError("Unsupported dataset format") + + key = keys[0] if len(keys) == 1 else "text" + examples["input_ids"] = [] + examples["labels"] = [] + examples["attention_mask"] = [] + for exp in examples[key]: + results = self.tokenizer( + exp, + padding=self.padding, + truncation=self.truncation, + return_tensors=None, + max_length=self.max_length, + ) + + input_ids = results["input_ids"] + labels = copy.deepcopy(input_ids) + examples["input_ids"].append(results["input_ids"]) + examples["labels"].append(labels) + examples["attention_mask"].append(results["attention_mask"]) + return examples + + class TrainDatasetForCE(Dataset): def __init__(self, dataset, args, tokenizer): self.dataset = dataset diff --git a/comps/finetuning/llm_on_ray/finetune/finetune.py b/comps/finetuning/llm_on_ray/finetune/finetune.py index c66cc7bbe..f44deedff 100644 --- a/comps/finetuning/llm_on_ray/finetune/finetune.py +++ b/comps/finetuning/llm_on_ray/finetune/finetune.py @@ -28,9 +28,10 @@ from comps.finetuning.finetune_config import FinetuneConfig from comps.finetuning.llm_on_ray import common from comps.finetuning.llm_on_ray.finetune.data_process import ( - DataProcessor, EmbedCollator, GroupCollator, + InstructionDataProcessor, + PretrainingDataProcessor, TrainDatasetForCE, TrainDatasetForEmbedding, ) @@ -198,9 +199,9 @@ def tokenize_dataset(config: Dict, tokenizer, dataset): if task == "instruction_tuning": group = config["Dataset"].get("group", True) block_size = config["Dataset"].get("block_size", 512) - tokenizer.pad_token = tokenizer.eos_token + tokenizer.pad_token = tokenizer.eos_token if not tokenizer.pad_token else tokenizer.pad_token - processor = DataProcessor(config, tokenizer) + processor = InstructionDataProcessor(config, tokenizer) for key in dataset: prompts = processor.make_prompt(dataset[key]) @@ -221,6 +222,48 @@ def tokenize_dataset(config: Dict, tokenizer, dataset): desc="Tokenize dataset", ) + if group: + + def group_texts(examples): + # Concatenate all texts. + concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()} + total_length = len(concatenated_examples[list(examples.keys())[0]]) + # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can + # customize this part to your needs. + if total_length >= block_size: + total_length = (total_length // block_size) * block_size + # Split by chunks of max_len. + result = { + k: [t[i : i + block_size] for i in range(0, total_length, block_size)] + for k, t in concatenated_examples.items() + } + return result + + tokenized_dataset = tokenized_dataset.map( + group_texts, + batched=True, + load_from_cache_file=False, + desc=f"Grouping texts in chunks of {block_size}", + ) + + return tokenized_dataset + elif task == "pretraining": + group = True + block_size = config["Dataset"].get("block_size", 512) + tokenizer.pad_token = tokenizer.eos_token if not tokenizer.pad_token else tokenizer.pad_token + + processor = PretrainingDataProcessor(config, tokenizer) + + column_names = list(dataset["train"].features) + + tokenized_dataset = dataset.map( + processor.tokenize, + remove_columns=column_names, + batched=True, + load_from_cache_file=False, + desc="Tokenize dataset", + ) + if group: def group_texts(examples): @@ -258,7 +301,7 @@ def group_texts(examples): def prepare_data_collator(config: Dict, tokenizer): task = config["General"].get("task", "instruction_tuning") - if task == "instruction_tuning": + if task == "instruction_tuning" or task == "pretraining": return transformers.DataCollatorForLanguageModeling( tokenizer=tokenizer, mlm=False, return_tensors="pt", pad_to_multiple_of=8 ) @@ -280,10 +323,10 @@ def load_model(config: Dict): model_dtype = convert_dtype(config["Training"].get("mixed_precision", "no")) model_config = config["General"].get("config", {}) task = config["General"].get("task", "instruction_tuning") - if task == "instruction_tuning": + if task == "instruction_tuning" or task == "pretraining": model = transformers.AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=model_dtype, **model_config) lora_config = config["General"].get("lora_config", None) - if lora_config: + if lora_config and task != "pretraining": peft_config = LoraConfig(**lora_config) model = get_peft_model(model, peft_config) elif task == "rerank": @@ -326,7 +369,7 @@ def load_model(config: Dict): def get_trainer(config: Dict, model, tokenizer, tokenized_dataset, data_collator): device = config["Training"]["device"] - if device in ["cpu", "gpu"]: + if device in ["cpu", "gpu", "cuda"]: training_args = convert_to_training_args(TrainingArguments, config) trainer = Trainer( model=model, diff --git a/tests/test_finetuning_llm_pretraining.sh b/tests/test_finetuning_llm_pretraining.sh new file mode 100644 index 000000000..69460fbc0 --- /dev/null +++ b/tests/test_finetuning_llm_pretraining.sh @@ -0,0 +1,118 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" +ip_address=$(hostname -I | awk '{print $1}') +finetuning_service_port=8015 +ray_port=8265 + +function build_docker_images() { + cd $WORKPATH + echo $(pwd) + docker build -t opea/finetuning:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg HF_TOKEN=$HF_TOKEN -f comps/finetuning/docker/Dockerfile_cpu . + if [ $? -ne 0 ]; then + echo "opea/finetuning built fail" + exit 1 + else + echo "opea/finetuning built successful" + fi +} + +function start_service() { + export no_proxy="localhost,127.0.0.1,"${ip_address} + docker run -d --name="finetuning-server" -p $finetuning_service_port:$finetuning_service_port -p $ray_port:$ray_port --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy opea/finetuning:latest + sleep 1m +} + +function validate_microservice() { + cd $LOG_PATH + export no_proxy="localhost,127.0.0.1,"${ip_address} + + # test /v1/dataprep upload file + URL="http://${ip_address}:$finetuning_service_port/v1/files" + cat < test_data.json +{"text": "Five women walk along a beach wearing flip-flops."} +{"text": "A woman standing on a high cliff on one leg looking over a river."} +{"text": "Two woman are playing instruments; one a clarinet, the other a violin."} +{"text": "A girl with a blue tank top sitting watching three dogs."} +{"text": "A yellow dog running along a forest path."} +{"text": "It sets out essential activities in each phase along with critical factors related to those activities."} +EOF + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F 'file=@./test_data.json' -F purpose="fine-tune" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="finetuning-server - upload - file" + + # Parse the JSON response + purpose=$(echo "$RESPONSE_BODY" | jq -r '.purpose') + filename=$(echo "$RESPONSE_BODY" | jq -r '.filename') + + # Define expected values + expected_purpose="fine-tune" + expected_filename="test_data.json" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs finetuning-server >> ${LOG_PATH}/finetuning-server_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + # Check if the parsed values match the expected values + if [[ "$purpose" != "$expected_purpose" || "$filename" != "$expected_filename" ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs finetuning-server >> ${LOG_PATH}/finetuning-server_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test /v1/fine_tuning/jobs + URL="http://${ip_address}:$finetuning_service_port/v1/fine_tuning/jobs" + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -H 'Content-Type: application/json' -d '{"training_file": "test_data.json","model": "facebook/opt-125m","General":{"task":"pretraining","lora_config":null}}' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="finetuning-server - create finetuning job" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs finetuning-server >> ${LOG_PATH}/finetuning-server_create.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *'{"id":"ft-job'* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs finetuning-server >> ${LOG_PATH}/finetuning-server_create.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + sleep 3m +} + +function stop_docker() { + cid=$(docker ps -aq --filter "name=finetuning-server*") + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi +} + +function main() { + + stop_docker + + build_docker_images + start_service + + validate_microservice + + stop_docker + echo y | docker system prune + +} + +main diff --git a/tests/test_finetuning_rerank.sh b/tests/test_finetuning_rerank.sh new file mode 100644 index 000000000..fd594bf6c --- /dev/null +++ b/tests/test_finetuning_rerank.sh @@ -0,0 +1,117 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" +ip_address=$(hostname -I | awk '{print $1}') +finetuning_service_port=8015 +ray_port=8265 + +function build_docker_images() { + cd $WORKPATH + echo $(pwd) + docker build -t opea/finetuning:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy --build-arg HF_TOKEN=$HF_TOKEN -f comps/finetuning/docker/Dockerfile_cpu . + if [ $? -ne 0 ]; then + echo "opea/finetuning built fail" + exit 1 + else + echo "opea/finetuning built successful" + fi +} + +function start_service() { + export no_proxy="localhost,127.0.0.1,"${ip_address} + docker run -d --name="finetuning-server" -p $finetuning_service_port:$finetuning_service_port -p $ray_port:$ray_port --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy opea/finetuning:latest + sleep 1m +} + +function validate_microservice() { + cd $LOG_PATH + export no_proxy="localhost,127.0.0.1,"${ip_address} + + # test /v1/dataprep upload file + URL="http://${ip_address}:$finetuning_service_port/v1/files" + cat < test_data.json +{"query": "Five women walk along a beach wearing flip-flops.", "pos": ["Some women with flip-flops on, are walking along the beach"], "neg": ["The 4 women are sitting on the beach.", "There was a reform in 1996.", "She's not going to court to clear her record.", "The man is talking about hawaii.", "A woman is standing outside.", "The battle was over. ", "A group of people plays volleyball."]} +{"query": "A woman standing on a high cliff on one leg looking over a river.", "pos": ["A woman is standing on a cliff."], "neg": ["A woman sits on a chair.", "George Bush told the Republicans there was no way he would let them even consider this foolish idea, against his top advisors advice.", "The family was falling apart.", "no one showed up to the meeting", "A boy is sitting outside playing in the sand.", "Ended as soon as I received the wire.", "A child is reading in her bedroom."]} +{"query": "Two woman are playing instruments; one a clarinet, the other a violin.", "pos": ["Some people are playing a tune."], "neg": ["Two women are playing a guitar and drums.", "A man is skiing down a mountain.", "The fatal dose was not taken when the murderer thought it would be.", "Person on bike", "The girl is standing, leaning against the archway.", "A group of women watch soap operas.", "No matter how old people get they never forget. "]} +{"query": "A girl with a blue tank top sitting watching three dogs.", "pos": ["A girl is wearing blue."], "neg": ["A girl is with three cats.", "The people are watching a funeral procession.", "The child is wearing black.", "Financing is an issue for us in public schools.", "Kids at a pool.", "It is calming to be assaulted.", "I face a serious problem at eighteen years old. "]} +{"query": "A yellow dog running along a forest path.", "pos": ["a dog is running"], "neg": ["a cat is running", "Steele did not keep her original story.", "The rule discourages people to pay their child support.", "A man in a vest sits in a car.", "Person in black clothing, with white bandanna and sunglasses waits at a bus stop.", "Neither the Globe or Mail had comments on the current state of Canada's road system. ", "The Spring Creek facility is old and outdated."]} +{"query": "It sets out essential activities in each phase along with critical factors related to those activities.", "pos": ["Critical factors for essential activities are set out."], "neg": ["It lays out critical activities but makes no provision for critical factors related to those activities.", "People are assembled in protest.", "The state would prefer for you to do that.", "A girl sits beside a boy.", "Two males are performing.", "Nobody is jumping", "Conrad was being plotted against, to be hit on the head."]} +EOF + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -F 'file=@./test_data.json' -F purpose="fine-tune" -H 'Content-Type: multipart/form-data' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="finetuning-server - upload - file" + + # Parse the JSON response + purpose=$(echo "$RESPONSE_BODY" | jq -r '.purpose') + filename=$(echo "$RESPONSE_BODY" | jq -r '.filename') + + # Define expected values + expected_purpose="fine-tune" + expected_filename="test_data.json" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs finetuning-server >> ${LOG_PATH}/finetuning-server_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + # Check if the parsed values match the expected values + if [[ "$purpose" != "$expected_purpose" || "$filename" != "$expected_filename" ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs finetuning-server >> ${LOG_PATH}/finetuning-server_upload_file.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + # test /v1/fine_tuning/jobs + URL="http://${ip_address}:$finetuning_service_port/v1/fine_tuning/jobs" + HTTP_RESPONSE=$(curl --silent --write-out "HTTPSTATUS:%{http_code}" -X POST -H 'Content-Type: application/json' -d '{"training_file": "test_data.json","model": "BAAI/bge-reranker-base","General":{"task":"rerank","lora_config":null}}' "$URL") + HTTP_STATUS=$(echo $HTTP_RESPONSE | tr -d '\n' | sed -e 's/.*HTTPSTATUS://') + RESPONSE_BODY=$(echo $HTTP_RESPONSE | sed -e 's/HTTPSTATUS\:.*//g') + SERVICE_NAME="finetuning-server - create finetuning job" + + if [ "$HTTP_STATUS" -ne "200" ]; then + echo "[ $SERVICE_NAME ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs finetuning-server >> ${LOG_PATH}/finetuning-server_create.log + exit 1 + else + echo "[ $SERVICE_NAME ] HTTP status is 200. Checking content..." + fi + if [[ "$RESPONSE_BODY" != *'{"id":"ft-job'* ]]; then + echo "[ $SERVICE_NAME ] Content does not match the expected result: $RESPONSE_BODY" + docker logs finetuning-server >> ${LOG_PATH}/finetuning-server_create.log + exit 1 + else + echo "[ $SERVICE_NAME ] Content is as expected." + fi + + sleep 10m +} + +function stop_docker() { + cid=$(docker ps -aq --filter "name=finetuning-server*") + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi +} + +function main() { + + stop_docker + + build_docker_images + start_service + + validate_microservice + + stop_docker + echo y | docker system prune + +} + +main