Commit

[BAAI] support framework flagscale-2409 & support case llava1.5-7B-flagscale (#714)

* init scale

* fix

* upd path

* add

* final

* fix
shh2000 authored Aug 23, 2024
1 parent 302ef6d commit 893ed3b
Showing 8 changed files with 278 additions and 11 deletions.
232 changes: 232 additions & 0 deletions training/benchmarks/llava1.5_7b/flagscale/run_pretraining.py
@@ -0,0 +1,232 @@
import subprocess
from argparse import ArgumentParser
import os
import sys
from importlib import import_module
import yaml
import time


def parse_args():
    '''Parse DDP-related args, check system config args and running-env args
    such as --data_dir_xxx, then pass all useful args to the real training
    script.
    '''
    parser = ArgumentParser(description="flagscale main python")
    parser.add_argument("--world_size", type=int, required=True)
    parser.add_argument("--vendor", type=str, required=True)
    parser.add_argument("--data_dir", type=str, required=True)
    parser.add_argument("--hosts", type=str, required=True)
    parser.add_argument("--host_addr", type=str, required=True)
    parser.add_argument("--log_dir", type=str, required=True)
    parser.add_argument("--flagperf_config_file", type=str, required=True)
    args, unknown_args = parser.parse_known_args()
    args.unknown_args = unknown_args
    return args


def install_scale(module, log_dir, debug_mode=False):
    if not debug_mode:
        # 1) download FlagScale source with the vendor-provided command
        exec_cmd = getattr(module, "scale_download_cmd")
        print(exec_cmd)

        install_logdir = os.path.join(log_dir, "install_logs")
        os.makedirs(install_logdir)

        logfile = os.path.join(install_logdir, "scale_download.log.txt")
        with open(logfile, 'w') as f:
            p = subprocess.Popen(exec_cmd,
                                 shell=True,
                                 stdout=f,
                                 stderr=subprocess.STDOUT)
            p.wait()

        # 2) run the vendor-specific install command (may be empty)
        exec_cmd = getattr(module, "scale_install_cmd")
        logfile = os.path.join(install_logdir, "scale_install.log.txt")
        with open(logfile, 'w') as f:
            p = subprocess.Popen(exec_cmd,
                                 shell=True,
                                 stdout=f,
                                 stderr=subprocess.STDOUT)
            p.wait()

        # 3) locate the installed megatron-energon package
        exec_cmd = getattr(module, "energon_locate_cmd")
        logfile = os.path.join(install_logdir, "energon_locate.log.txt")
        with open(logfile, 'w') as f:
            p = subprocess.Popen(exec_cmd,
                                 shell=True,
                                 stdout=f,
                                 stderr=subprocess.STDOUT)
            p.wait()

        with open(logfile, 'r') as f:
            energon_locate = f.readline().replace('\n', '')
            print(energon_locate)

        # 4) copy energon next to FlagScale's bundled Megatron package
        src_dir = os.path.join(energon_locate, "megatron", "energon")
        dst_dir = os.path.join(getattr(module, "scale_home"), "megatron",
                               "megatron")
        exec_cmd = f"cp -r {src_dir} {dst_dir}/"

        logfile = os.path.join(install_logdir, "energon_copy.log.txt")
        with open(logfile, 'w') as f:
            p = subprocess.Popen(exec_cmd,
                                 shell=True,
                                 stdout=f,
                                 stderr=subprocess.STDOUT)
            p.wait()


def replace_yamls(scale_home, config_module, args):
    scale_conf_dir = getattr(config_module, "scale_conf_dir")

    # 1) launch config: experiment dir, node count, ssh port, hostfile
    dist_yaml = getattr(config_module, "configyaml")
    with open(dist_yaml, 'r') as f:
        dist_data = yaml.safe_load(f)

    try:
        dist_data["experiment"]["exp_dir"] = os.path.join(
            args.log_dir, "outputs_llava1.5")
        hosts = args.hosts.split(",")
        dist_data["experiment"]["runner"]["nnodes"] = len(hosts)
        dist_data["experiment"]["runner"]["ssh_port"] = getattr(
            config_module, "flagscale_ssh_port")
        hostfile = os.path.join(scale_home, "hostfile")
        with open(hostfile, 'w') as f:
            for host in hosts:
                slots = dist_data["experiment"]["runner"]["nproc_per_node"]
                chiptype = getattr(config_module, "flagscale_chip_type")
                f.write(f"{host} slots={slots} type={chiptype}\n")
        dist_data["experiment"]["runner"]["hostfile"] = hostfile
    except Exception as e:
        print(e)
        print(
            "You're using an illegal config.yaml in flagscale. You must fix it"
        )

    print(dist_data)

    # 2) training config: checkpoints, steps, dataset/prompt/tokenizer paths
    train_yaml = getattr(config_module, "trainyaml")

    with open(train_yaml, 'r') as f:
        train_data = yaml.safe_load(f)

    try:
        train_data["system"]["checkpoint"]["save_interval"] = 1000
        train_data["system"]["checkpoint"][
            "pretrained_checkpoint"] = os.path.join(
                args.data_dir, "LLaVA_megatron",
                "vicuna_instruct_clip336_tp1_combined_mcore")

        train_data["model"]["train_iters"] = getattr(config_module, "steps")
        train_data["model"].pop("img_embedding_idx", None)
        train_data["data"]["data_path"] = getattr(config_module, "datasetyaml")
        train_data["data"]["valid_path"] = getattr(config_module,
                                                   "datasetyaml")
        train_data["data"]["prompt_path"] = getattr(config_module, "prompt")
        train_data["data"]["tokenizer"]["tokenizer_model"] = os.path.join(
            args.data_dir, "vicuna-7b-v1___5/tokenizer.model")
    except Exception as e:
        print(e)
        print(
            "You're using an illegal trainllava.yaml in flagscale. You must fix it"
        )

    print(train_data)

    # 3) dataset config: point train/val splits at the webdataset directory
    dataset_yaml = getattr(config_module, "datasetyaml")

    with open(dataset_yaml, 'r') as f:
        dataset_data = yaml.safe_load(f)

    try:
        llava_train_dir = os.path.join(args.data_dir, "LLaVA-Pretrain/wds")
        dataset_data["splits"]["train"]["datasets"][0][
            "path"] = llava_train_dir
        dataset_data["splits"]["val"]["datasets"][0]["path"] = llava_train_dir
    except Exception as e:
        print(e)
        print(
            "You're using an illegal dataset.yaml in flagscale. You must fix it"
        )

    print(dataset_data)

    with open(dist_yaml, 'w') as f:
        yaml.safe_dump(dist_data, f)

    with open(train_yaml, 'w') as f:
        yaml.safe_dump(train_data, f)

    with open(dataset_yaml, 'w') as f:
        yaml.safe_dump(dataset_data, f)


if __name__ == "__main__":
    args = parse_args()
    print(args)
    host = args.host_addr
    hosts = args.hosts.split(",")
    print(host, hosts)

    # only the first host launches FlagScale; it drives the other nodes itself
    # (see ssh_port / hostfile handling above)
    if host != hosts[0]:
        exit(0)

    sys.path.append(os.path.dirname(args.flagperf_config_file))
    config_file = os.path.basename(args.flagperf_config_file).split('.')[0]

    module = import_module(config_file)
    print(module)
    scale_home = getattr(module, "scale_home")

    install_scale(module, args.log_dir)

    replace_yamls(scale_home, module, args)

    scale_conf_dir = getattr(module, "scale_conf_dir")
    configyaml = getattr(module, "configyaml")
    configname = os.path.splitext(os.path.basename(configyaml))[0]
    exec_cmd = f"cd {scale_home}; python3 run.py --config-path {scale_conf_dir} --config-name {configname}"

    print(exec_cmd)
    with open(os.path.join(args.log_dir, "flagscale_main.log.txt"), 'w') as f:
        p = subprocess.Popen(exec_cmd,
                             shell=True,
                             stdout=f,
                             stderr=subprocess.STDOUT)
        p.wait()

    # the last node's log file carries the per-iteration timing lines
    timestamp_log_host = hosts[-1]
    timestamp_log_noderank = len(hosts) - 1

    timestamp_log_file = os.path.join(
        args.log_dir, "outputs_llava1.5", "logs", "host_" +
        str(timestamp_log_noderank) + "_" + timestamp_log_host + ".output")

    # poll the log every 5 minutes until every step has been timed;
    # rebuild the list on each pass so re-reading the log does not double-count
    while True:
        info_line = []
        try:
            with open(timestamp_log_file, 'r') as f:
                lines = f.readlines()
            for line in lines:
                if "elapsed time per iteration" in line:
                    info_line.append(line)
        except Exception as e:
            print("Maybe some errors:", e)
        if len(info_line) == getattr(module, "steps"):
            break
        time.sleep(300)

    infos = []
    for line in info_line:
        info = line.split("|")[2]
        steptime = info.split(":")[1]
        stepsecond = float(steptime) / 1000
        infos.append(stepsecond)
    print(infos)

    # skip the first (warm-up) step; tokens/s/GPU uses 2048-token sequences
    # and a global batch of 256 samples; MFU assumes 6 FLOPs per param per token
    ave_steptime = sum(infos[1:]) / len(infos[1:])
    tps = 2048 * 256 / ave_steptime / args.world_size
    mfu = tps * 7E9 * 6 / getattr(module, "flops")
    print(f"MFU: {mfu}")
4 changes: 4 additions & 0 deletions training/nvidia/docker_image/flagscale_2409/Dockerfile
@@ -0,0 +1,4 @@
FROM base-harbor.platform-sz.jingneng-inner.ac.cn/airs-user/6907316d-94d9-469a-b481-1bdf0bfe2287_9f3b64c6-acad-4186-8693-864997cc7e10_aoyulong/flagscale:20240522120728
RUN /bin/bash -c "pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple"
RUN /bin/bash -c "uname -a"
RUN /bin/bash -c alias python3=python
@@ -0,0 +1 @@
#!/bin/bash
26 changes: 26 additions & 0 deletions training/nvidia/llava1.5_7b-flagscale/config/config_H100x4x8.py
@@ -0,0 +1,26 @@
# scale_parent must be under FlagPerf/ or data_dir/; otherwise it cannot be mounted into the bare-metal host, and shared storage cannot be used
scale_parent = "/workspace"
scale_home = f"{scale_parent}/FlagScale"

# this cmd should place FlagScale at <scale_home>; <scale_home> is read by flagperf.training.benchmarks.llava1.5_7b.flagscale.run_pretraining.py
scale_download_cmd = f"cd {scale_parent}; git clone https://github.com/FlagOpen/FlagScale.git; cd FlagScale; git checkout 604f79b"

# NVIDIA needs nothing here because all requirements are already installed in the base docker image; other vendors can add any related install steps here
scale_install_cmd = ""

# locate megatron-energon; the copy from its install path into FlagScale/megatron/ is done by flagperf...run_pretraining.py
energon_locate_cmd = r"pip show megatron-energon | grep Location | awk -F: '{print $2}' | xargs"

scale_conf_dir = f"{scale_home}/examples/llava/conf"
configyaml = f"{scale_conf_dir}/config.yaml"
trainyaml = f"{scale_conf_dir}/train/train_llava1.5_7b.yaml"
datasetyaml = f"{scale_home}/megatron/examples/multimodal/pretrain_dataset.yaml"
prompt = f"{scale_home}/megatron/examples/multimodal/manual_prompts.json"

# flagscale's requirements
flagscale_chip_type = "H100"
flagscale_ssh_port = 60128
flops = 989E12

# for llava's algorithm
steps = 5000
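For reference, with this config and the hostfile logic in replace_yamls() above, the file written to <scale_home>/hostfile would look roughly like the following, assuming four nodes and nproc_per_node=8 in FlagScale's config.yaml (addresses are placeholders):

    192.0.2.1 slots=8 type=H100
    192.0.2.2 slots=8 type=H100
    192.0.2.3 slots=8 type=H100
    192.0.2.4 slots=8 type=H100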
@@ -0,0 +1 @@
megatron-energon==2.2.0
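This pin pairs with energon_locate_cmd in the config above: pip show megatron-energon prints a Location: field, and the grep/awk/xargs pipeline reduces it to the bare site-packages path (for example /usr/local/lib/python3.10/dist-packages, depending on the image's Python layout); run_pretraining.py then copies <location>/megatron/energon into <scale_home>/megatron/megatron/ so the energon loader sits next to FlagScale's bundled Megatron.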
2 changes: 1 addition & 1 deletion training/run_benchmarks/config/cluster_conf.py
@@ -10,4 +10,4 @@
MASTER_PORT = "29501"

# ssh connection port
SSH_PORT = "22"
SSH_PORT = "22"
13 changes: 6 additions & 7 deletions training/run_benchmarks/config/test_conf.py
@@ -37,7 +37,7 @@
ACCE_VISIBLE_DEVICE_ENV_NAME = "CUDA_VISIBLE_DEVICES"

# Set pip source, which will be used in preparing envs in container
PIP_SOURCE = "https://mirror.baidu.com/pypi/simple"
PIP_SOURCE = "https://pypi.tuna.tsinghua.edu.cn/simple"

# The path that flagperf deploy in the cluster.
# Users must set FLAGPERF_PATH to where flagperf deploy
@@ -68,7 +68,7 @@
# "glm:pytorch_1.8:A100:1:8:1": "/raid/home_datasets_ckpt/glm/train/",
# "cpm:pytorch_1.8:A100:1:8:1": "/raid/home_datasets_ckpt/cpm/train/",


#"llava1.5_7b:flagscale_2409:H100:4:8:1": "/workspace/data_dir"
# "llava1.5_7b:deepspeed-torch:A800:1:8:1": "/raid/dataset/LLAVA/",
#"llama2_7b_finetune:pytorch_2.0.1:A100:1:1:1": "/raid/dataset/llama2_finetune/",
#"aquila2_7b_finetune:flagscale:A800:1:8:1": "/raid/dataset/aquila2_7b_finetune",
@@ -85,14 +85,14 @@
# "resnet50:pytorch_1.8:A100:1:8:1": "/raid/dataset/ImageNet_1k_2012/",
# "mask_rcnn:pytorch_1.8:A100:1:8:1": "/raid/dataset/maskrcnn/coco2017",
# "dlrm:pytorch_1.10:A100:1:8:1": "/raid/dataset/criteo_1TB_click_logs/binary_dataset/",

# "wav2vec2:pytorch_1.13:A100:1:8:1": "/raid/dataset/wav2vec2_data/LibriSpeech",
# "WaveGlow:pytorch_1.13:A100:1:8:1": "/raid/dataset/LJSpeech/",
# "resnet50:tensorflow2:A100:1:8:1": "/raid/dataset/ImageNet2012/tf_records/",
# "moflow:pytorch_1.13:A100:1:8:1": "/raid/dataset/MoFlow/data/",

# "distilbert:pytorch_1.12:A100:1:8:1": "/raid/dataset/distilbert/",

# "transformer:pytorch_1.13:A100:1:8:1": "/raid/dataset/transformer/wmt14_en_de_joined_dict",
# "swin_transformer:pytorch_1.8:A100:1:8:1": "/raid/dataset/ImageNet_1k_2012/",
# "transformer_xl:pytorch_1.8:A100:1:8:1": "/raid/dataset/transformer_xl/",
Expand All @@ -102,7 +102,7 @@
# "bert_hf:pytorch_1.13:A100:1:8:1": "/raid/dataset/bert_hf_train",
# "longformer:pytorch_1.12:A100:1:8:1": "/raid/dataset/longformer_train/",
# "detr:pytorch_1.13:A100:1:8:1": "/raid/dataset/detr/coco2017/",

# "llama2_7b:deepspeed:A100:1:8:1": "/raid/dataset/llama2_7b_pretrain",
# "aquila2_7b:flagscale:A100:1:8:1": "/raid/dataset/aquila2_7b_pretrain",
# "llama2_70B:megatron:H800:4:8:1": "/raid/dataset/llama2_70B_pretrain",
@@ -123,7 +123,7 @@
# "gpt3_13B:paddle_2.5.1:TP2PP1SH1SP4A10040G:1:8:1":"/raid/dataset/gpt-3/"
# "gpt3_13B:paddle_2.5.1:TP2PP1SH2SP4A10040G:1:8:1":"/raid/dataset/gpt-3/"
# "gpt3_13B:paddle_2.5.1:TP2PP4SH1SP1A10040G:1:8:1":"/raid/dataset/gpt-3/"

# "qwen1.5_MoE:megatron_pai:A800:1:8:1":"/raid/datasets/qwen1.5_MoE/"
# "mixtral_8x7B:megatron_core060:H100:4:8:1": "/raid/datasets/mistral"

@@ -200,5 +200,4 @@
#"gpt3_13B:paddle_2.6.0:TP2PP1SH2SP4C50040G:1:8:1":"/raid/data_set/data-gpt3"
#"gpt3_13B:paddle_2.6.0:TP1PP1SH2SP8C50080G:1:8:1":"/raid/data_set/data-gpt3"
# "qwen1.5_MoE:megatron_pai:C500:1:8:1":"/raid/datasets/qwen1.5_MoE/"

}
10 changes: 7 additions & 3 deletions training/run_benchmarks/flagscale/start_flagscale_task.py
@@ -1,8 +1,5 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
'''This script is called in container to execute the real training task.
Support pytorch DDP only.
'''
import os
import sys
import subprocess
@@ -48,6 +45,10 @@ def parse_args():
                        type=int,
                        required=True,
                        help="how many processes will run on each host.")
    parser.add_argument("--hosts",
                        type=str,
                        required=True,
                        help="hosts to run the testcase.")

parser.add_argument("--vendor",
type=str,
@@ -120,13 +121,16 @@ def main():
exec_cmd = "cd " + os.path.dirname(train_script_path) + ";"
exec_cmd = exec_cmd + "python run_pretraining.py"
exec_cmd = exec_cmd + " --world_size=" + str(task_args.nproc)
exec_cmd = exec_cmd + " --hosts=" + task_args.hosts
exec_cmd = exec_cmd + " --host_addr=" + task_args.host_addr
exec_cmd = exec_cmd + " --vendor=" + task_args.vendor
exec_cmd = exec_cmd + " --data_dir=" + task_args.data_dir
exec_cmd = exec_cmd + " --log_dir=" + task_log_dir
exec_cmd = exec_cmd + " --flagperf_config_file=" + config_file

    task_log_file = os.path.join(task_log_dir, "rank0.out.log")

    START_LOGGER.info(exec_cmd)
    with open(task_log_file, "w") as f:
        p = subprocess.Popen(exec_cmd,
                             shell=True,
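For illustration, the exec_cmd assembled above ends up as a single shell string along these lines (paths, hosts and vendor are placeholders, not from a real run):

    cd <flagperf_path>/training/benchmarks/llava1.5_7b/flagscale;python run_pretraining.py --world_size=8 --hosts=192.0.2.1,192.0.2.2,192.0.2.3,192.0.2.4 --host_addr=192.0.2.1 --vendor=nvidia --data_dir=/workspace/data_dir --log_dir=<task_log_dir> --flagperf_config_file=<...>/config_H100x4x8.py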
