Commit e7acce4: Improve metrics

rafapi committed Jan 30, 2025
1 parent 2276d24 commit e7acce4
Showing 1 changed file with 80 additions and 29 deletions.
tapeagents/finetune/finetune.py: 109 changes (80 additions, 29 deletions)
@@ -78,21 +78,31 @@ def run_finetuning_loop(
else:
raise ValueError(f"Unknown training objective {objective}")

with open_dict(args):
# gradient accumulation steps must be divisible by num_processes
original_accum_passes = args.gradient_accumulation_passes
if original_accum_passes % num_processes != 0:
# round up to the next multiple of num_processes
new_accum_passes = ((original_accum_passes + num_processes - 1) // num_processes) * num_processes
logger.warning(
f"Adjusting gradient_accumulation_passes from {original_accum_passes} to {new_accum_passes} "
f"to make it divisible by {num_processes} processes"
)
args.gradient_accumulation_passes = new_accum_passes
# with open_dict(args):
# # gradient accumulation steps must be divisible by num_processes
# original_accum_passes = args.gradient_accumulation_passes
# if original_accum_passes % num_processes != 0:
# # round up to the next multiple of num_processes
# new_accum_passes = ((original_accum_passes + num_processes - 1) // num_processes) * num_processes
# logger.warning(
# f"Adjusting gradient_accumulation_passes from {original_accum_passes} to {new_accum_passes} "
# f"to make it divisible by {num_processes} processes"
# )
# args.gradient_accumulation_passes = new_accum_passes

# args.effective_batch_size = int(args.train_batch_size) * int(args.gradient_accumulation_passes)

with open_dict(args):
# very useful for comparing runs; note that gradient accumulation will later be divided by num_processes
args.effective_batch_size = int(args.train_batch_size) * int(args.gradient_accumulation_passes)
args.output_dir = str(output_dir)

if args.gradient_accumulation_passes % num_processes != 0:
raise ValueError(
f"Cannot {num_processes}-way parallelize the config with {args.gradient_accumulation_passes} accum passes"
)
args.gradient_accumulation_passes //= num_processes

samples_per_pass = num_processes * args.train_batch_size
set_seed(args.seed)

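As a side note on the batch-size bookkeeping above, the relationship between these config knobs can be checked with a short standalone sketch. The numeric values below are assumed for illustration only; the real values come from the training config (args):

    # Illustrative values, not taken from the repository's configs.
    train_batch_size = 4                # per-process micro-batch
    gradient_accumulation_passes = 16   # global value, before the per-process split
    num_processes = 8                   # accelerate processes (one per GPU)

    effective_batch_size = train_batch_size * gradient_accumulation_passes   # 64 samples per optimizer step

    if gradient_accumulation_passes % num_processes != 0:
        raise ValueError("gradient_accumulation_passes must be divisible by num_processes")

    passes_per_process = gradient_accumulation_passes // num_processes       # 2 local accumulation passes
    samples_per_pass = num_processes * train_batch_size                      # 32 samples per synchronized pass

    assert passes_per_process * samples_per_pass == effective_batch_size     # 2 * 32 == 64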
@@ -192,10 +202,10 @@ def toggle_sync(sync: bool):
# Add throughput tracking variables
training_start_time = None
total_tokens = 0
total_useful_tokens = 0
# TODO: Add the vars below to the config
recent_throughputs = deque(maxlen=100) # Store last 100 steps
num_nodes = getattr(args, "num_nodes", 2) # Default to 2 if not specified
gpus_per_node = getattr(args, "gpus_per_node", 8) # Default to 8 if not specified

while training_metrics.completed_steps < final_train_steps:
training_metrics.epoch = training_metrics.passes // len(train_dataloader)
Expand Down Expand Up @@ -223,6 +233,9 @@ def toggle_sync(sync: bool):
torch.cuda.empty_cache()

do_optimizer_step = training_metrics.passes % args.gradient_accumulation_passes == 0

# Track forward-backward pass time separately
forward_backward_start = time.time()
with torch.autocast("cuda"):
with toggle_sync(do_optimizer_step):
loss, this_step_rl_metrics = forward(model, batch)
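toggle_sync itself is defined outside the hunks shown here. A minimal stand-in for the pattern it implies, assuming the model is wrapped in DistributedDataParallel, would skip gradient synchronization on intermediate accumulation passes and only sync when an optimizer step is due (a sketch, not the repository's implementation):

    import contextlib

    def toggle_sync_sketch(ddp_model, sync: bool):
        """Return a context manager: synchronize gradients only on the final accumulation pass."""
        # DistributedDataParallel.no_sync() suppresses the gradient all-reduce for that pass.
        return contextlib.nullcontext() if sync else ddp_model.no_sync()

In the loop above, sync corresponds to do_optimizer_step, so the gradient all-reduce would happen once per optimizer step rather than once per micro-batch.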
@@ -233,10 +246,33 @@ def toggle_sync(sync: bool):
training_metrics.max_batch_len = max(batch["input_ids"].shape[1], training_metrics.max_batch_len)
training_metrics.min_batch_len = min(batch["input_ids"].shape[1], training_metrics.min_batch_len)
accelerator.backward(loss / args.gradient_accumulation_passes)
forward_backward_time = time.time() - forward_backward_start

# Gather tokens from all workers
num_tokens_tensor = torch.tensor([num_tokens], device=accelerator.device)
torch.distributed.all_reduce(num_tokens_tensor, op=torch.distributed.ReduceOp.SUM)
global_tokens = num_tokens_tensor.item()
total_possible_tokens = batch['input_ids'].numel() * accelerator.num_processes
total_gpus = accelerator.num_processes

# Calculate forward-backward throughput for every step
pass_tokens_per_gpu_per_second = (total_possible_tokens / total_gpus) / forward_backward_time if forward_backward_time > 0 else 0

if not do_optimizer_step:
continue

# Only calculate full step metrics when we do an optimizer step
step_total_time = time.time() - step_start_time
# Scale the tokens by gradient accumulation since we're measuring multiple forward passes
accumulated_tokens = total_possible_tokens * args.gradient_accumulation_passes
accumulated_useful_tokens = global_tokens * args.gradient_accumulation_passes

# Calculate padding efficiency
padding_efficiency = global_tokens / total_possible_tokens if total_possible_tokens > 0 else 0

# Start timing optimizer step
optimizer_start_time = time.time()

# All gradients have been accumulated, we can now do an optimizer step
training_metrics.completed_steps += 1
if args.gradient_clipping_threshold:
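The gather-and-ratio pattern used above for padding efficiency can be isolated into a small helper. The following is a sketch that assumes torch.distributed has already been initialized (as accelerate does) and mirrors the names used in the diff:

    import torch
    import torch.distributed as dist

    def global_padding_efficiency(num_tokens: int, batch_numel: int, device: torch.device) -> float:
        """Fraction of processed positions that are real (non-padding) tokens, across all workers."""
        # Sum each worker's count of useful tokens.
        tokens = torch.tensor([num_tokens], device=device)
        dist.all_reduce(tokens, op=dist.ReduceOp.SUM)
        global_tokens = tokens.item()
        # Every worker ran the model over batch_numel positions, padding included.
        total_possible = batch_numel * dist.get_world_size()
        return global_tokens / total_possible if total_possible > 0 else 0.0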
@@ -247,20 +283,14 @@ def toggle_sync(sync: bool):
lr_scheduler.step()
optimizer.zero_grad()

step_training_time = time.time() - step_start_time
optimizer_time = time.time() - optimizer_start_time
step_total_time = time.time() - step_start_time

# Gather tokens from all workers
num_tokens_tensor = torch.tensor([num_tokens], device=accelerator.device)
torch.distributed.all_reduce(num_tokens_tensor, op=torch.distributed.ReduceOp.SUM)

global_tokens = num_tokens_tensor.item()

# Update total tokens only on main process
if accelerator.is_main_process:
total_tokens += global_tokens
total_tokens += accumulated_tokens
total_useful_tokens += accumulated_useful_tokens

padding_efficiency = num_tokens / total_possible_tokens
# calculate per-step throughput and update moving average
step_throughput = global_tokens / step_total_time if step_total_time > 0 else 0

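The lines that maintain the moving average are collapsed between these hunks. One plausible shape of that bookkeeping, using the recent_throughputs deque declared earlier, is sketched below (illustrative, not the repository's exact code):

    from collections import deque

    recent_throughputs = deque(maxlen=100)   # rolling window of per-step throughput, as configured above

    def update_throughput(step_tokens: int, step_seconds: float) -> float:
        """Record this step's tokens/s and return the mean over the last (up to) 100 optimizer steps."""
        step_tps = step_tokens / step_seconds if step_seconds > 0 else 0.0
        recent_throughputs.append(step_tps)
        return sum(recent_throughputs) / len(recent_throughputs)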
@@ -271,6 +301,11 @@ def toggle_sync(sync: bool):
else:
current_throughput = step_throughput

# Calculate samples per second
samples_this_step = samples_per_pass # This is num_processes * args.train_batch_size
samples_per_second = samples_this_step / step_total_time if step_total_time > 0 else 0
samples_per_gpu_second = samples_per_second / total_gpus

metrics_dict = {}
time_to_stop = training_metrics.completed_steps >= final_train_steps
time_to_log = training_metrics.completed_steps % args.log_each_n_steps == 0
@@ -285,42 +320,58 @@ def toggle_sync(sync: bool):
elapsed_days = elapsed_time / (24 * 3600)
node_days = elapsed_days * num_nodes

# calculate throughput metrics using total_tokens
tokens_per_second = total_tokens / elapsed_time
tokens_per_gpu_second = tokens_per_second / (num_nodes * gpus_per_node)
useful_tokens_per_second = total_useful_tokens / elapsed_time
tokens_per_gpu_second = tokens_per_second / total_gpus
useful_tokens_per_gpu_second = useful_tokens_per_second / total_gpus
tokens_per_node_second = tokens_per_second / num_nodes
useful_tokens_per_node_second = useful_tokens_per_second / num_nodes
tokens_per_hour = tokens_per_second * 3600
tokens_per_day = tokens_per_second * 24 * 3600
tokens_per_day_per_node = tokens_per_day / num_nodes
tokens_per_day_per_gpu = tokens_per_day / (num_nodes * gpus_per_node)
tokens_per_day_per_gpu = tokens_per_day / total_gpus

optimizer_step_throughput = tokens_per_second * args.gradient_accumulation_passes

metrics_dict.update({
# Average throughput metrics
# throughput metrics
"throughput/tokens_per_second": tokens_per_second,
"throughput/useful_tokens_per_second": useful_tokens_per_second,
"throughput/tokens_per_hour_M": tokens_per_hour / 1e6,
"throughput/tokens_per_day_B": tokens_per_day / 1e9,
"throughput/tokens_per_day_per_node_M": tokens_per_day_per_node / 1e6,
"throughput/tokens_per_day_per_gpu_M": tokens_per_day_per_gpu / 1e6,
"throughput/tokens_per_gpu_second": tokens_per_gpu_second,
"throughput/tokens_per_node_second": tokens_per_node_second,
"throughput/useful_tokens_per_gpu_second": useful_tokens_per_gpu_second,
"throughput/useful_tokens_per_node_second": useful_tokens_per_node_second,
"throughput/total_tokens_B": total_tokens / 1e9,
"throughput/total_useful_tokens_B": total_useful_tokens / 1e9,
"throughput/optimizer_step_throughput": optimizer_step_throughput,
"throughput/padding_efficiency": padding_efficiency,

# Current step metrics
"throughput/pass_tokens_per_gpu_per_second": pass_tokens_per_gpu_per_second,
"throughput/samples_per_second": samples_per_second,
"throughput/samples_per_gpu_second": samples_per_gpu_second,
"throughput/current_tokens_per_second": current_throughput,
"throughput/raw_step_tokens_per_second": step_throughput,

# Time metrics
# time metrics
"time/total_hours": elapsed_hours,
"time/total_nodedays": node_days,
"time/elapsed_days": elapsed_days,
"time/step_total_seconds": step_total_time,
"time/step_training_seconds": step_training_time,
"time/step_training_seconds": optimizer_time,
"time/tokens_per_step": global_tokens,
"time/tokens_per_second_this_step": global_tokens / step_total_time if step_total_time > 0 else 0,
"time/forward_backward_ratio": forward_backward_time / step_total_time if step_total_time > 0 else 0,
"time/optimizer_ratio": optimizer_time / step_total_time if step_total_time > 0 else 0,
"time/step_idle": step_total_time - (forward_backward_time + optimizer_time),
"time/forward_backward_seconds": forward_backward_time,

# batch metrics
"batch/avg_sequence_length": batch['input_ids'].shape[1],
"batch/total_sequences": batch['input_ids'].shape[0] * accelerator.num_processes,
"batch/global_batch_size": args.train_batch_size * args.gradient_accumulation_passes * accelerator.num_processes,
})

# Add memory metrics if using GPU
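To make the unit conversions in the throughput block concrete, here is a worked example with assumed numbers (2 nodes of 8 GPUs and an aggregate rate of 50,000 tokens per second; none of these figures are measurements from the repository):

    # Assumed topology and rate, for illustration only.
    num_nodes, gpus_per_node = 2, 8
    total_gpus = num_nodes * gpus_per_node                   # 16
    tokens_per_second = 50_000

    tokens_per_hour = tokens_per_second * 3600               # 180,000,000   -> logged as 180.0 (M)
    tokens_per_day = tokens_per_second * 24 * 3600           # 4,320,000,000 -> logged as 4.32 (B)
    tokens_per_day_per_node = tokens_per_day / num_nodes     # 2,160,000,000 -> logged as 2160.0 (M)
    tokens_per_day_per_gpu = tokens_per_day / total_gpus     # 270,000,000   -> logged as 270.0 (M)
    tokens_per_gpu_second = tokens_per_second / total_gpus   # 3,125 tokens/s per GPU
    tokens_per_node_second = tokens_per_second / num_nodes   # 25,000 tokens/s per node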
