[BAAI] support framework flagscale-2409 & support case llava1.5-7B-flagscale #714

Merged (6 commits, Aug 23, 2024)

232 changes: 232 additions & 0 deletions training/benchmarks/llava1.5_7b/flagscale/run_pretraining.py
@@ -0,0 +1,232 @@
import subprocess
from argparse import ArgumentParser
import os
import sys
from importlib import import_module
import yaml
import time


def parse_args():
    '''Parse DDP-related args, check system-config args and runtime-env args
    such as --data_dir_xxx, then pass all useful args on to the real
    training script.
    '''
    parser = ArgumentParser(description="flagscale main python")
    parser.add_argument("--world_size", type=int, required=True)
    parser.add_argument("--vendor", type=str, required=True)
    parser.add_argument("--data_dir", type=str, required=True)
    parser.add_argument("--hosts", type=str, required=True)
    parser.add_argument("--host_addr", type=str, required=True)
    parser.add_argument("--log_dir", type=str, required=True)
    parser.add_argument("--flagperf_config_file", type=str, required=True)
    args, unknown_args = parser.parse_known_args()
    args.unknown_args = unknown_args
    return args


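# Download FlagScale as specified by the vendor config, run the vendor's
# install command, then copy the installed megatron-energon package into
# FlagScale's megatron tree. All command output goes to per-step log files.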
def install_scale(module, log_dir, debug_mode=False):
    if not debug_mode:
        exec_cmd = getattr(module, "scale_download_cmd")
        print(exec_cmd)

        install_logdir = os.path.join(log_dir, "install_logs")
        os.makedirs(install_logdir)

        logfile = os.path.join(install_logdir, "scale_download.log.txt")
        with open(logfile, 'w') as f:
            p = subprocess.Popen(exec_cmd,
                                 shell=True,
                                 stdout=f,
                                 stderr=subprocess.STDOUT)
            p.wait()
            f.close()

        exec_cmd = getattr(module, "scale_install_cmd")
        logfile = os.path.join(install_logdir, "scale_install.log.txt")
        with open(logfile, 'w') as f:
            p = subprocess.Popen(exec_cmd,
                                 shell=True,
                                 stdout=f,
                                 stderr=subprocess.STDOUT)
            p.wait()
            f.close()

        exec_cmd = getattr(module, "energon_locate_cmd")
        logfile = os.path.join(install_logdir, "energon_locate.log.txt")
        with open(logfile, 'w') as f:
            p = subprocess.Popen(exec_cmd,
                                 shell=True,
                                 stdout=f,
                                 stderr=subprocess.STDOUT)
            p.wait()
            f.close()

        with open(logfile, 'r') as f:
            energon_locate = f.readline().replace('\n', '')
            print(energon_locate)

        src_dir = os.path.join(energon_locate, "megatron", "energon")
        dst_dir = os.path.join(getattr(module, "scale_home"), "megatron",
                               "megatron")
        exec_cmd = f"cp -r {src_dir} {dst_dir}/"

        logfile = os.path.join(install_logdir, "energon_copy.log.txt")
        with open(logfile, 'w') as f:
            p = subprocess.Popen(exec_cmd,
                                 shell=True,
                                 stdout=f,
                                 stderr=subprocess.STDOUT)
            p.wait()
            f.close()


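# Patch FlagScale's config.yaml, train yaml and dataset yaml in place with
# runtime values: experiment/log directory, hostfile, SSH port, checkpoint,
# tokenizer and dataset paths, and the number of training steps.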
def replace_yamls(scale_home, config_module, args):
    scale_conf_dir = getattr(config_module, "scale_conf_dir")
    dist_yaml = getattr(config_module, "configyaml")
    with open(dist_yaml, 'r') as f:
        dist_data = yaml.safe_load(f)

    try:
        dist_data["experiment"]["exp_dir"] = os.path.join(
            args.log_dir, "outputs_llava1.5")
        hosts = args.hosts.split(",")
        dist_data["experiment"]["runner"]["nnodes"] = len(hosts)
        dist_data["experiment"]["runner"]["ssh_port"] = getattr(
            config_module, "flagscale_ssh_port")
        hostfile = os.path.join(scale_home, "hostfile")
        with open(hostfile, 'w') as f:
            for host in hosts:
                slots = dist_data["experiment"]["runner"]["nproc_per_node"]
                chiptype = getattr(config_module, "flagscale_chip_type")
                f.write(f"{host} slots={slots} type={chiptype}\n")
        dist_data["experiment"]["runner"]["hostfile"] = hostfile
    except Exception as e:
        print(e)
        print(
            "You're using an illegal config.yaml in flagscale. You must fix it"
        )

    print(dist_data)

    train_yaml = getattr(config_module, "trainyaml")

    with open(train_yaml, 'r') as f:
        train_data = yaml.safe_load(f)

    try:
        train_data["system"]["checkpoint"]["save_interval"] = 1000
        train_data["system"]["checkpoint"][
            "pretrained_checkpoint"] = os.path.join(
                args.data_dir, "LLaVA_megatron",
                "vicuna_instruct_clip336_tp1_combined_mcore")

        train_data["model"]["train_iters"] = getattr(config_module, "steps")
        train_data["model"].pop("img_embedding_idx", None)
        train_data["data"]["data_path"] = getattr(config_module, "datasetyaml")
        train_data["data"]["valid_path"] = getattr(config_module,
                                                   "datasetyaml")
        train_data["data"]["prompt_path"] = getattr(config_module, "prompt")
        train_data["data"]["tokenizer"]["tokenizer_model"] = os.path.join(
            args.data_dir, "vicuna-7b-v1___5/tokenizer.model")
    except Exception as e:
        print(
            "You're using an illegal trainllava.yaml in flagscale. You must fix it"
        )

    print(train_data)

    dataset_yaml = getattr(config_module, "datasetyaml")

    with open(dataset_yaml, 'r') as f:
        dataset_data = yaml.safe_load(f)

    try:
        llava_train_dir = os.path.join(args.data_dir, "LLaVA-Pretrain/wds")
        dataset_data["splits"]["train"]["datasets"][0][
            "path"] = llava_train_dir
        dataset_data["splits"]["val"]["datasets"][0]["path"] = llava_train_dir
    except Exception as e:
        print(
            "You're using an illegal dataset.yaml in flagscale. You must fix it"
        )

    print(dataset_data)

    with open(dist_yaml, 'w') as f:
        yaml.safe_dump(dist_data, f)

    with open(train_yaml, 'w') as f:
        yaml.safe_dump(train_data, f)

    with open(dataset_yaml, 'w') as f:
        yaml.safe_dump(dataset_data, f)


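# Only the first host drives the run: it imports the vendor config module,
# installs FlagScale, patches the yamls, launches run.py, and finally parses
# the last host's output log to report throughput and MFU.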
if __name__ == "__main__":
    args = parse_args()
    print(args)
    host = args.host_addr
    hosts = args.hosts.split(",")
    print(host, hosts)

    if host != hosts[0]:
        exit(0)

    sys.path.append(os.path.dirname(args.flagperf_config_file))
    config_file = os.path.basename(args.flagperf_config_file).split('.')[0]

    module = import_module(config_file)
    print(module)
    scale_home = getattr(module, "scale_home")

    install_scale(module, args.log_dir)

    replace_yamls(scale_home, module, args)

    scale_conf_dir = getattr(module, "scale_conf_dir")
    configyaml = getattr(module, "configyaml")
    configname = os.path.splitext(os.path.basename(configyaml))[0]
    exec_cmd = f"cd {scale_home}; python3 run.py --config-path {scale_conf_dir} --config-name {configname}"

    print(exec_cmd)
    with open(os.path.join(args.log_dir, "flagscale_main.log.txt"), 'w') as f:
        p = subprocess.Popen(exec_cmd,
                             shell=True,
                             stdout=f,
                             stderr=subprocess.STDOUT)
        p.wait()

    timestamp_log_host = hosts[-1]
    timestamp_log_noderank = len(hosts) - 1

    timestamp_log_file = os.path.join(
        args.log_dir, "outputs_llava1.5", "logs", "host_" +
        str(timestamp_log_noderank) + "_" + timestamp_log_host + ".output")

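    # Poll the last host's output log every 5 minutes until an
    # "elapsed time per iteration" line has been seen for every training step.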
    info_line = []
    while True:
        try:
            # Re-read the log from scratch on every poll so the count below
            # reflects the file contents rather than accumulating duplicates.
            info_line = []
            with open(timestamp_log_file, 'r') as f:
                lines = f.readlines()
                for line in lines:
                    if "elapsed time per iteration" in line:
                        info_line.append(line)
        except Exception as e:
            print("Maybe some errors:", e)
        if len(info_line) == getattr(module, "steps"):
            break
        time.sleep(300)

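    # Convert each per-iteration time from milliseconds to seconds, then
    # average over all but the first (warm-up) step to compute tokens/s and MFU.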
    infos = []
    for line in info_line:
        info = line.split("|")[2]
        steptime = info.split(":")[1]
        stepsecond = float(steptime) / 1000
        infos.append(stepsecond)
    print(infos)

    ave_steptime = sum(infos[1:]) / len(infos[1:])
    tps = 2048 * 256 / ave_steptime / args.world_size
    mfu = tps * 7E9 * 6 / getattr(module, "flops")
    print(f"MFU: {mfu}")
4 changes: 4 additions & 0 deletions training/nvidia/docker_image/flagscale_2409/Dockerfile
@@ -0,0 +1,4 @@
FROM base-harbor.platform-sz.jingneng-inner.ac.cn/airs-user/6907316d-94d9-469a-b481-1bdf0bfe2287_9f3b64c6-acad-4186-8693-864997cc7e10_aoyulong/flagscale:20240522120728
RUN /bin/bash -c "pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple"
RUN /bin/bash -c "uname -a"
RUN /bin/bash -c alias python3=python
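# NOTE: as written, the alias above is a no-op: the alias is local to that one
# bash -c invocation and does not persist into later layers or the container.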
@@ -0,0 +1 @@
#!/bin/bash
26 changes: 26 additions & 0 deletions training/nvidia/llava1.5_7b-flagscale/config/config_H100x4x8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# scale_parent must be under FlagPerf/ or data_dir/; otherwise it cannot be mounted into the bare-metal host, and shared storage cannot be used
scale_parent = "/workspace"
scale_home = f"{scale_parent}/FlagScale"

# this cmd should install FlagScale at <scale_home>; <scale_home> is read by flagperf.training.benchmarks.llava1.5_7b.flagscale.run_pretraining.py
scale_download_cmd = f"cd {scale_parent}; git clone https://github.com/FlagOpen/FlagScale.git; cd FlagScale; git checkout 604f79b"

# NVIDIA needs nothing here because all requirements are already provided in the base docker image; other vendors can add any related setup here
scale_install_cmd = ""

# locate megatron-energon; the copy from its install path into flagscale/megatron/ is done by flagperf...run_pretraining.py
energon_locate_cmd = r"pip show megatron-energon | grep Location | awk -F: '{print $2}' | xargs"

scale_conf_dir = f"{scale_home}/examples/llava/conf"
configyaml = f"{scale_conf_dir}/config.yaml"
trainyaml = f"{scale_conf_dir}/train/train_llava1.5_7b.yaml"
datasetyaml = f"{scale_home}/megatron/examples/multimodal/pretrain_dataset.yaml"
prompt = f"{scale_home}/megatron/examples/multimodal/manual_prompts.json"

# flagscale's requirements
flagscale_chip_type = "H100"
flagscale_ssh_port = 60128
flops = 989E12

# for llava's algorithm
steps = 5000
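A vendor porting this case to other hardware would supply an analogous config module. The sketch below is hypothetical and only shows the fields that would typically change; the chip type, SSH port, peak FLOPS, and install command are placeholders, not values from this PR.

# Hypothetical vendor config for another accelerator; placeholders only.
flagscale_chip_type = "A100"             # placeholder chip type
flagscale_ssh_port = 22                  # placeholder SSH port
flops = 312E12                           # placeholder peak FLOPS of the target chip
scale_install_cmd = "pip install -r requirements.txt"  # placeholder vendor install step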
@@ -0,0 +1 @@
megatron-energon==2.2.0
2 changes: 1 addition & 1 deletion training/run_benchmarks/config/cluster_conf.py
@@ -10,4 +10,4 @@
MASTER_PORT = "29501"

# ssh connection port
SSH_PORT = "22"
SSH_PORT = "22"
13 changes: 6 additions & 7 deletions training/run_benchmarks/config/test_conf.py
Expand Up @@ -37,7 +37,7 @@
ACCE_VISIBLE_DEVICE_ENV_NAME = "CUDA_VISIBLE_DEVICES"

# Set pip source, which will be used in preparing envs in container
PIP_SOURCE = "https://mirror.baidu.com/pypi/simple"
PIP_SOURCE = "https://pypi.tuna.tsinghua.edu.cn/simple"

# The path that flagperf deploy in the cluster.
# Users must set FLAGPERF_PATH to where flagperf deploy
@@ -68,7 +68,7 @@
# "glm:pytorch_1.8:A100:1:8:1": "/raid/home_datasets_ckpt/glm/train/",
# "cpm:pytorch_1.8:A100:1:8:1": "/raid/home_datasets_ckpt/cpm/train/",


#"llava1.5_7b:flagscale_2409:H100:4:8:1": "/workspace/data_dir"
# "llava1.5_7b:deepspeed-torch:A800:1:8:1": "/raid/dataset/LLAVA/",
#"llama2_7b_finetune:pytorch_2.0.1:A100:1:1:1": "/raid/dataset/llama2_finetune/",
#"aquila2_7b_finetune:flagscale:A800:1:8:1": "/raid/dataset/aquila2_7b_finetune",
@@ -85,14 +85,14 @@
# "resnet50:pytorch_1.8:A100:1:8:1": "/raid/dataset/ImageNet_1k_2012/",
# "mask_rcnn:pytorch_1.8:A100:1:8:1": "/raid/dataset/maskrcnn/coco2017",
# "dlrm:pytorch_1.10:A100:1:8:1": "/raid/dataset/criteo_1TB_click_logs/binary_dataset/",

# "wav2vec2:pytorch_1.13:A100:1:8:1": "/raid/dataset/wav2vec2_data/LibriSpeech",
# "WaveGlow:pytorch_1.13:A100:1:8:1": "/raid/dataset/LJSpeech/",
# "resnet50:tensorflow2:A100:1:8:1": "/raid/dataset/ImageNet2012/tf_records/",
# "moflow:pytorch_1.13:A100:1:8:1": "/raid/dataset/MoFlow/data/",

# "distilbert:pytorch_1.12:A100:1:8:1": "/raid/dataset/distilbert/",

# "transformer:pytorch_1.13:A100:1:8:1": "/raid/dataset/transformer/wmt14_en_de_joined_dict",
# "swin_transformer:pytorch_1.8:A100:1:8:1": "/raid/dataset/ImageNet_1k_2012/",
# "transformer_xl:pytorch_1.8:A100:1:8:1": "/raid/dataset/transformer_xl/",
@@ -102,7 +102,7 @@
# "bert_hf:pytorch_1.13:A100:1:8:1": "/raid/dataset/bert_hf_train",
# "longformer:pytorch_1.12:A100:1:8:1": "/raid/dataset/longformer_train/",
# "detr:pytorch_1.13:A100:1:8:1": "/raid/dataset/detr/coco2017/",

# "llama2_7b:deepspeed:A100:1:8:1": "/raid/dataset/llama2_7b_pretrain",
# "aquila2_7b:flagscale:A100:1:8:1": "/raid/dataset/aquila2_7b_pretrain",
# "llama2_70B:megatron:H800:4:8:1": "/raid/dataset/llama2_70B_pretrain",
@@ -123,7 +123,7 @@
# "gpt3_13B:paddle_2.5.1:TP2PP1SH1SP4A10040G:1:8:1":"/raid/dataset/gpt-3/"
# "gpt3_13B:paddle_2.5.1:TP2PP1SH2SP4A10040G:1:8:1":"/raid/dataset/gpt-3/"
# "gpt3_13B:paddle_2.5.1:TP2PP4SH1SP1A10040G:1:8:1":"/raid/dataset/gpt-3/"

# "qwen1.5_MoE:megatron_pai:A800:1:8:1":"/raid/datasets/qwen1.5_MoE/"
# "mixtral_8x7B:megatron_core060:H100:4:8:1": "/raid/datasets/mistral"

@@ -200,5 +200,4 @@
#"gpt3_13B:paddle_2.6.0:TP2PP1SH2SP4C50040G:1:8:1":"/raid/data_set/data-gpt3"
#"gpt3_13B:paddle_2.6.0:TP1PP1SH2SP8C50080G:1:8:1":"/raid/data_set/data-gpt3"
# "qwen1.5_MoE:megatron_pai:C500:1:8:1":"/raid/datasets/qwen1.5_MoE/"

}
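To actually schedule the new case, the commented entry added above would be uncommented with a real dataset path. A minimal sketch, assuming the surrounding dict is the CASES mapping of test_conf.py and reusing the path from the commented line as an illustration:

# Sketch only; the data directory must point at the prepared LLaVA data.
CASES = {
    "llava1.5_7b:flagscale_2409:H100:4:8:1": "/workspace/data_dir",
}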
10 changes: 7 additions & 3 deletions training/run_benchmarks/flagscale/start_flagscale_task.py
@@ -1,8 +1,5 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
'''This script is called in container to execute the real training task.
Support pytorch DDP only.
'''
import os
import sys
import subprocess
@@ -48,6 +45,10 @@ def parse_args():
                        type=int,
                        required=True,
                        help="how many processes will run on each host.")
    parser.add_argument("--hosts",
                        type=str,
                        required=True,
                        help="hosts to run the testcase.")

    parser.add_argument("--vendor",
                        type=str,
@@ -120,13 +121,16 @@ def main():
exec_cmd = "cd " + os.path.dirname(train_script_path) + ";"
exec_cmd = exec_cmd + "python run_pretraining.py"
exec_cmd = exec_cmd + " --world_size=" + str(task_args.nproc)
exec_cmd = exec_cmd + " --hosts=" + task_args.hosts
exec_cmd = exec_cmd + " --host_addr=" + task_args.host_addr
exec_cmd = exec_cmd + " --vendor=" + task_args.vendor
exec_cmd = exec_cmd + " --data_dir=" + task_args.data_dir
exec_cmd = exec_cmd + " --log_dir=" + task_log_dir
exec_cmd = exec_cmd + " --flagperf_config_file=" + config_file

    task_log_file = os.path.join(task_log_dir, "rank0.out.log")

    START_LOGGER.info(exec_cmd)
    with open(task_log_file, "w") as f:
        p = subprocess.Popen(exec_cmd,
                             shell=True,
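Assembled, the command this script hands to the shell looks roughly as follows; every concrete value here is an illustrative placeholder (hosts, vendor, and paths are not taken from this PR).

# Illustrative rendering of exec_cmd; placeholders, not real values.
exec_cmd = (
    "cd <FLAGPERF_PATH>/training/benchmarks/llava1.5_7b/flagscale;"
    "python run_pretraining.py"
    " --world_size=8"
    " --hosts=10.1.2.1,10.1.2.2,10.1.2.3,10.1.2.4"
    " --host_addr=10.1.2.1"
    " --vendor=nvidia"
    " --data_dir=/workspace/data_dir"
    " --log_dir=<task_log_dir>"
    " --flagperf_config_file=<FLAGPERF_PATH>/training/nvidia/llava1.5_7b-flagscale/config/config_H100x4x8.py"
)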