[for visibility] advanced reasoning branch #168

Open: wants to merge 47 commits into main

47 commits
34d64a2
start now-reasoner-branch
rizar Jan 9, 2025
e5ef6b1
moved this to now-reasoner
rizar Jan 14, 2025
e9136fb
rollback changes in finetune.py, cause we have a copy of this script now
rizar Jan 14, 2025
432db9c
better mapping of our finetune settings to current wandb settings
rizar Jan 15, 2025
26588fc
add metadata and too old samples counter
rizar Jan 17, 2025
df6da37
Merge branch 'better_batch_tapeagents' into now-reasoner-branch
rizar Jan 20, 2025
e2c4806
Merge branch 'main' into now-reasoner-branch
rizar Jan 21, 2025
1f3854b
undo changes to data.py
rizar Jan 21, 2025
801e248
some simplifications from the other repo
rizar Jan 21, 2025
35563c7
Merge remote-tracking branch 'origin/eurus' into now-reasoner-branch
ehsk Jan 23, 2025
a8c7c02
lazy singleton accelerator
rizar Jan 24, 2025
6a91ad1
load all fields in training metrics when loading training state
ehsk Jan 27, 2025
60f116f
Merge branch 'now-reasoner-branch' of github.com:ServiceNow/TapeAgent…
ehsk Jan 27, 2025
910b459
reward
AlexPiche Jan 27, 2025
210ce2e
typo
AlexPiche Jan 27, 2025
e086dc4
builder_config for loading MATH changed
ehsk Jan 27, 2025
a46c84c
last change rolled back
ehsk Jan 27, 2025
07de544
minor issue resolved
ehsk Jan 27, 2025
3766aea
move to math 500
AlexPiche Jan 28, 2025
ca51e08
backward compatible reward values
AlexPiche Jan 29, 2025
d8d15ed
Merge branch 'now-reasoner-branch-reward' of github.com:ServiceNow/Ta…
AlexPiche Jan 29, 2025
e299386
test_dataset for math set to MATH-500
ehsk Jan 29, 2025
568090c
builder_config added for test dataset
ehsk Jan 29, 2025
2ee467c
MATH-500 issue resolved
ehsk Jan 29, 2025
d085b9c
catch nans earlier
rizar Jan 30, 2025
38d1539
Merge pull request #181 from ServiceNow/now-reasoner-branch-reward
ehsk Jan 30, 2025
fe8d067
fix test_builder_config issue
rizar Jan 30, 2025
7f9ff19
eurus prime data
AlexPiche Jan 30, 2025
0914655
merged
AlexPiche Jan 30, 2025
7c730b6
fix test
AlexPiche Jan 30, 2025
25f23bf
do not regress on eurus
AlexPiche Jan 30, 2025
01c83c1
fixing nans in rl loss
ehsk Jan 30, 2025
1667043
use MATH500
AlexPiche Jan 30, 2025
3602b53
fix typo
AlexPiche Jan 30, 2025
ff32121
Merge pull request #184 from ServiceNow/now-reasoner-fix-nans
rizar Feb 1, 2025
bcd7242
extract_tape_training_samples updated to work with the new load_datasets
ehsk Feb 2, 2025
ac2d908
minor refactoring in case-match
ehsk Feb 2, 2025
4769dc0
Merge branch 'now-reasoner-branch' into eurus_prime_data
ehsk Feb 4, 2025
3b2770e
mean loss aggregation issue resolved
ehsk Feb 4, 2025
6a2ff04
loss mean aggregate issue resolved
ehsk Feb 4, 2025
d4ecbde
entropy added to logs
ehsk Feb 4, 2025
9c147a5
unnecessary comment removed
ehsk Feb 4, 2025
e25be8f
revert changes to extract_tape_training_samples
AlexPiche Feb 5, 2025
f6e633c
Merge branch 'eurus_prime_data' of github.com:ServiceNow/TapeAgents i…
AlexPiche Feb 5, 2025
a372df5
revert changes to extract_tape_training_samples
AlexPiche Feb 5, 2025
4abb9e0
clean up
AlexPiche Feb 5, 2025
7e0020d
Merge pull request #183 from ServiceNow/eurus_prime_data
rizar Feb 5, 2025
3 changes: 1 addition & 2 deletions conf/rl_eurus.yaml
@@ -10,9 +10,8 @@ llm:
# CoT are much longer, but the model only has 4096 tokens context
max_tokens: 3072

# EURUS already apply this template: {task}\n\nPresent the answer in LaTex format: \\boxed{Your answer}
task_template: |-
{task}
{task}\n\nPresent the answer in LaTex format: \\boxed{{Your answer}}
# https://github.com/PRIME-RL/PRIME/blob/49a58a8e4afd464f559f8d9f80418052f29cf3e4/eval/system_prompt.md?plain=1
# but note that sometimes they do not include the newline at the beginning
# https://github.com/PRIME-RL/PRIME/blob/49a58a8e4afd464f559f8d9f80418052f29cf3e4/data_preprocessing/sft_prompt.py#L1
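For orientation, here is a minimal sketch of how a template like this renders. It assumes the repo fills `task_template` with Python's `str.format`, which is why the braces around `Your answer` are doubled in the YAML; this is a standalone illustration, not the repo's code:

```python
# Minimal sketch, not the repo's code: assumes the template is filled with str.format,
# so {{ and }} in the YAML render as literal braces.
task_template = "{task}\n\nPresent the answer in LaTex format: \\boxed{{Your answer}}"

print(task_template.format(task="What is 2 + 2?"))
# What is 2 + 2?
#
# Present the answer in LaTex format: \boxed{Your answer}
```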
5 changes: 5 additions & 0 deletions conf/rl_gsm8k.yaml
@@ -41,6 +41,11 @@ task_template: |-
overflow_reward: 0
max_prompt_length: 1024

rewards:
unparsable: 0
wrong_answer: 0
correct_answer: 1

vllm_config:
vllm_kwargs:
--download-dir: /mnt/llmd/base_models/
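The new `rewards` block makes the reward values configurable instead of hard-coded. A hedged, standalone sketch of how such a block can be read and applied, using OmegaConf and the `\boxed` formatting check introduced in `orchestrate_rl.py` below (not the repo's actual code path):

```python
# Hypothetical standalone example; the real logic lives in extract_tape_training_samples.
from omegaconf import OmegaConf

cfg = OmegaConf.create({"rewards": {"unparsable": 0, "wrong_answer": 0, "correct_answer": 1}})

def score(completion: str, is_correct: bool) -> float:
    # No \boxed{...} in the completion counts as a formatting failure ("unparsable"),
    # otherwise the answer check decides between correct_answer and wrong_answer.
    if "\\boxed" not in completion:
        return cfg.rewards.unparsable
    return cfg.rewards.correct_answer if is_correct else cfg.rewards.wrong_answer

print(score("The answer is \\boxed{4}", is_correct=True))  # 1
print(score("The answer is 4", is_correct=True))            # 0
```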
16 changes: 11 additions & 5 deletions examples/rl_gsm8k/deepseek_math_eval/process_utils.py
@@ -1,18 +1,19 @@
# https://github.com/deepseek-ai/DeepSeek-Math/blob/b8b0f8ce093d80bf8e9a641e44142f06d092c305/evaluation/data_processing/process_utils.py
import regex

from examples.rl_gsm8k.deepseek_math_eval.answer_extraction import extract_math_answer, strip_string
from examples.rl_gsm8k.deepseek_math_eval.answer_extraction import (
extract_math_answer, strip_string)
from examples.rl_gsm8k.deepseek_math_eval.eval_utils import parse_ground_truth


def process_eurus_test(item):
if "ability" not in item:
# math 500 test set
answer = [item["expected_answer"]]
answer = [item["answer"]]
return {
"dataset": "math500",
# Same prompt as https://github.com/PRIME-RL/PRIME/blob/49a58a8e4afd464f559f8d9f80418052f29cf3e4/README.md?plain=1#L93
"task": item["problem"] + "\n\nPresent the answer in LaTex format: \\boxed{Your answer}",
"task": item["problem"],
"answer": answer
}
else:
@@ -25,9 +26,11 @@ def process_eurus_test(item):
answer = answer.replace("\n", "")
answer = "\\boxed{" + answer + "}"
answer = extract_math_answer(item["prompt"][1]["content"], answer, task="cot")
task = item["prompt"][1]["content"]
task = task.replace("\n\nPresent the answer in LaTex format: \\boxed{Your answer}", "")
return {
"dataset": item["data_source"],
"task": item["prompt"][1]["content"],
"task": task,
"answer": answer
}

@@ -40,11 +43,14 @@ def process_gsm8k_test(item):

def process_math_test(item):
question = item["problem"]
if "subject" in item and "type" not in item:
item["type"] = item["subject"]

try:
answer = extract_math_answer(question, item["solution"], task="cot")
except Exception:
return
sample = {"dataset": "math-cot", "level": item["level"], "type": item["type"], "task": question, "answer": answer}
sample = {"dataset": "math-cot", "level": item["level"], "type": item.get("type", ""), "task": question, "answer": answer}
return sample
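MATH-500 records name the problem category `subject` where the Hendrycks MATH dump uses `type`, hence the fallbacks added above. A small illustration with hypothetical, trimmed-down items:

```python
# Hypothetical items trimmed to the fields relevant to the fallback above.
math_item = {"problem": "Compute 2 + 2.", "level": "Level 1", "type": "Algebra"}
math500_item = {"problem": "Compute 2 + 2.", "level": "Level 1", "subject": "Algebra"}

for item in (math_item, math500_item):
    if "subject" in item and "type" not in item:
        item["type"] = item["subject"]  # MATH-500 calls the category "subject"
    print(item.get("type", ""))         # both print "Algebra"
```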


77 changes: 39 additions & 38 deletions examples/rl_gsm8k/orchestrate_rl.py
@@ -21,7 +21,7 @@

import wandb
from tapeagents.agent import Agent
from tapeagents.core import LLMCall, LLMOutputParsingFailureAction, StepMetadata, TrainingText
from tapeagents.core import LLMCall, StepMetadata, TrainingText
from tapeagents.finetune.data import MASKED_TOKEN_ID
from tapeagents.finetune.logging_ import flatten_dict_config, init_wandb
from tapeagents.llms import TrainableLLM
@@ -38,23 +38,28 @@
def load_datasets(cfg: DictConfig) -> Tuple[list, list]:
match cfg.dataset_name:
case "math":
train_dataset_long_name = test_dataset_long_name = "hendrycks/competition_math"
train_dataset_long_name = "hendrycks/competition_math"
test_dataset_long_name = "HuggingFaceH4/MATH-500"
process_fn = process_math_test
test_builder_config = "default"
builder_config = "main"
case "gsm8k":
train_dataset_long_name = test_dataset_long_name = "openai/gsm8k"
process_fn = process_gsm8k_test
test_builder_config = None
builder_config = "main"
case "eurus":
train_dataset_long_name = "PRIME-RL/Eurus-2-RL-Data"
test_dataset_long_name = "alexpiche/math_test_cleaned"
test_dataset_long_name = "HuggingFaceH4/MATH-500"
process_fn = process_eurus_test
test_builder_config = None
builder_config = "default"
case _:
raise ValueError(f"Unknown dataset: {cfg.dataset_name}")

test_builder_config = test_builder_config or builder_config
train_dataset = load_dataset(train_dataset_long_name, builder_config, split="train", trust_remote_code=True)
test_dataset = load_dataset(test_dataset_long_name, builder_config, split="test", trust_remote_code=True)
test_dataset = load_dataset(test_dataset_long_name, test_builder_config, split="test", trust_remote_code=True)
train_samples = [
process_fn(s) for s in tqdm(train_dataset, desc="Processing train samples") if process_fn(s) is not None
]
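Splitting `builder_config` from `test_builder_config` lets the test split come from a different Hub config than the train split, with `test_builder_config or builder_config` as the fallback. A sketch of that fallback in isolation (mirrors `load_datasets` above; running the `load_dataset` call needs network access):

```python
# Sketch of the fallback in isolation, not the repo's code.
from datasets import load_dataset

builder_config = "main"        # config used for the train split (gsm8k case)
test_builder_config = None     # no dedicated test config for this dataset

test_builder_config = test_builder_config or builder_config
test_dataset = load_dataset(
    "openai/gsm8k", test_builder_config, split="test", trust_remote_code=True
)
print(len(test_dataset))  # 1319 questions in the gsm8k test split
```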
@@ -96,7 +101,7 @@ def convert_problems_to_tapes(problems: list, cfg: DictConfig) -> list[RLMathTap
stored in metadata.
"""
tapes: list[RLMathTape] = []
for problem in tqdm(problems, desc="Converting problems to unique tapes", unit="problem"):
for problem in problems:
start_step = Task(
task=problem["task"],
template=cfg.task_template,
@@ -112,19 +117,16 @@


def extract_tape_training_samples(
new_tape: RLMathTape, agent: CoTMathAgent, split_name: str, cfg: DictConfig
new_tape: RLMathTape, agent: CoTMathAgent, cfg: DictConfig
) -> Tuple[List[TrainingText], Dict[str, int]]:
"""
Process a single tape to extract training samples and statistics.

Args:
new_tape: The tape to process containing math problem steps
agent: CoTMathAgent
split_name: Name of split ('train' or 'test')
tapes_dir: Directory to save processed tapes
cfg: Configuration
llm_calls: List of LLM calls
strict: check that every token matches between the vLLM and the HF tokenizer otherwise just compare their lengths

Returns:
Tuple containing:
@@ -133,23 +135,23 @@ def extract_tape_training_samples(
"""
tape_prompt_tokens = 0
tape_output_tokens = 0

match cfg.dataset_name:
case "math":
case name if name.startswith("math") or name.startswith("eurus"):
eval_fn = eval_math
extract_fn = extract_math_answer
case "gsm8k":
eval_fn = eval_last_single_answer
extract_fn = extract_last_single_answer
case "eurus":
eval_fn = eval_math
extract_fn = extract_math_answer
case _:
raise ValueError(f"Unknown dataset: {cfg.dataset_name}")

if any([isinstance(step, LLMOutputParsingFailureAction) for step in new_tape.steps]):
# LLM produced a step that was unparsable. Negative reward.
no_error, reward, success = 0, -1, 0

if "\\boxed" not in new_tape.steps[-1].reasoning:
# LLM did not respect the formatting
no_error, success, reward = 0, 0, cfg.rewards.unparsable
else:
# LLM did respect the formatting
no_error = 1
prediction = extract_fn(new_tape.steps[0].task, new_tape.steps[-1].reasoning, "cot") # type: ignore
answer = new_tape.steps[0].metadata.other["value"]
@@ -160,10 +162,10 @@ def extract_tape_training_samples(
}
):
# Correct answer
reward, success = 1, 1
reward, success = cfg.rewards.correct_answer, 1
else:
# Incorrect answer or no answer
reward, success = 0, 0
reward, success = cfg.rewards.wrong_answer, 0

training_samples: list[TrainingText] = []
# For each LLM interaction in the tape:
@@ -182,25 +184,24 @@ def extract_tape_training_samples(
tape_output_tokens += llm_call.output_length_tokens

overflows = []
if split_name == "train":
trace = agent.llm.make_training_text(llm_call.prompt, llm_call.output)

input_ids = [lp.token_id for lp in llm_call.logprobs]
labels = [lp.token_id for lp in llm_call.logprobs if lp.generated]
# MASKED_TOKEN_ID is -100 and is the default "ignore_index" in nn.CrossEntropyLoss,
# see https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
labels = [MASKED_TOKEN_ID] * (len(input_ids) - len(labels)) + labels

trace.input_ids = input_ids
trace.labels = labels

# check if the last produced token is the end of sequence token
overflow = False if input_ids[-1] == agent.llm.tokenizer.eos_token_id else True
trace.reward = cfg.overflow_reward if overflow else reward
overflows.append(overflow)
trace.logprobs = [lp.logprob for lp in llm_call.logprobs if lp.generated]
trace.group_id = new_tape.metadata.parent_id
training_samples.append(trace)
trace = agent.llm.make_training_text(llm_call.prompt, llm_call.output)

input_ids = [lp.token_id for lp in llm_call.logprobs]
labels = [lp.token_id for lp in llm_call.logprobs if lp.generated]
# MASKED_TOKEN_ID is -100 and is the default "ignore_index" in nn.CrossEntropyLoss,
# see https://pytorch.org/docs/stable/generated/torch.nn.CrossEntropyLoss.html
labels = [MASKED_TOKEN_ID] * (len(input_ids) - len(labels)) + labels

trace.input_ids = input_ids
trace.labels = labels

# check if the last produced token is the end of sequence token
overflow = False if input_ids[-1] == agent.llm.tokenizer.eos_token_id else True
trace.reward = cfg.rewards.unparsable if overflow else reward
overflows.append(overflow)
trace.logprobs = [lp.logprob for lp in llm_call.logprobs if lp.generated]
trace.group_id = new_tape.metadata.parent_id
training_samples.append(trace)

tape_stats = {
"reward": reward,
@@ -266,7 +267,7 @@ def generate_training_data(
logger.info(f"Making tapes took {time.time() - start_making_tapes}")

for new_tape in tqdm(final_tapes, total=len(final_tapes), desc="Extracting training data from tapes", unit="tape"):
tape_training_samples, tape_stats = extract_tape_training_samples(new_tape, agent_replicas[0], split_name, cfg)
tape_training_samples, tape_stats = extract_tape_training_samples(new_tape, agent_replicas[0], cfg)
training_samples.extend(tape_training_samples)
reward_stats[new_tape.metadata.parent_id].append(tape_stats["reward"])
step_stats[new_tape.metadata.parent_id].append(tape_stats["steps"])
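The label masking in `extract_tape_training_samples` hides prompt tokens from the loss: `MASKED_TOKEN_ID` is -100, the default `ignore_index` of `torch.nn.CrossEntropyLoss`. A toy illustration with made-up token ids:

```python
# Toy illustration of the masking above; token ids are made up.
MASKED_TOKEN_ID = -100  # default ignore_index of torch.nn.CrossEntropyLoss

# (token_id, generated) pairs standing in for llm_call.logprobs
logprobs = [(101, False), (2054, False), (318, True), (604, True), (2, True)]

input_ids = [tok for tok, _ in logprobs]
labels = [tok for tok, generated in logprobs if generated]
labels = [MASKED_TOKEN_ID] * (len(input_ids) - len(labels)) + labels

print(input_ids)  # [101, 2054, 318, 604, 2]
print(labels)     # [-100, -100, 318, 604, 2] -> prompt positions contribute no loss
```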
1 change: 1 addition & 0 deletions tapeagents/core.py
@@ -37,6 +37,7 @@ class TrainingText(BaseModel):
input_ids: List[int] = Field(default_factory=list)
labels: List[int] = Field(default_factory=list)
group_id: str | None = None
metadata: dict = Field(default_factory=dict)

@property
def prompt_text(self) -> str:
43 changes: 21 additions & 22 deletions tapeagents/finetune/checkpoints.py
@@ -16,7 +16,7 @@
)
from transformers.models.auto.modeling_auto import _BaseAutoModelClass

from .context import accelerator, logger
from .context import get_accelerator, logger
from .lora import has_lora_checkpoint, lora_load, lora_save, prepare_lora_model
from .types import ModelClass, TrainingMetrics

@@ -56,7 +56,7 @@ def load_tokenizer(config_name)


def load_model(args, model_class, current_dir):
accelerator.wait_for_everyone()
get_accelerator().wait_for_everyone()

assert not (
os.path.exists(current_dir / "pytorch_model.bin")
@@ -80,9 +80,9 @@ def load_model(args, model_class, current_dir):
loading_args["use_flash_attention_2"] = args.use_flash_attention

is_ds_zero_3 = False
if getattr(accelerator.state, "deepspeed_plugin", None):
if getattr(get_accelerator().state, "deepspeed_plugin", None):
del loading_args["low_cpu_mem_usage"] # deepspeed is not compatible with this option
is_ds_zero_3 = accelerator.state.deepspeed_plugin.zero_stage == 3 # type: ignore
is_ds_zero_3 = get_accelerator().state.deepspeed_plugin.zero_stage == 3 # type: ignore

if args.load_as_bf16:
loading_args["torch_dtype"] = torch.bfloat16
@@ -131,7 +131,7 @@ def load_model(args, model_class, current_dir):
elif args.gradient_checkpointing:
model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"use_reentrant": True})

accelerator.wait_for_everyone()
get_accelerator().wait_for_everyone()
return model


@@ -183,11 +183,11 @@ def _save_training_state(
else: # multi_gpu mode (no deepspeed)
# Only save training_state in main process
logger.info("Save accelerate training state")
if accelerator.is_main_process:
if get_accelerator().is_main_process:
training_state = dict(extra_training_state)
training_state["optimizer_state"] = optimizer.state_dict()
training_state["lr_scheduler_state"] = lr_scheduler.state_dict()
accelerator.save(training_state, training_state_dir / "training_state.pt")
get_accelerator().save(training_state, training_state_dir / "training_state.pt")
logger.info(f"Saved accelerate training state to {training_state_dir}")


@@ -254,19 +254,19 @@ def get_temporary_folder_and_move(output_dir: Path):
output_dir = output_dir.resolve()
temporary_path = output_dir.parent / ("~" + output_dir.name)

if accelerator.is_main_process:
if get_accelerator().is_main_process:
if os.path.exists(temporary_path):
logger.info(f"Deleting temporary directory {temporary_path}")
shutil.rmtree(temporary_path)
logger.info(f"Creating temporary directory {temporary_path}")
os.makedirs(temporary_path)

accelerator.wait_for_everyone()
get_accelerator().wait_for_everyone()
yield temporary_path
accelerator.wait_for_everyone()
get_accelerator().wait_for_everyone()

# Move to final path
if accelerator.is_main_process:
if get_accelerator().is_main_process:
# delete output_dir if it exists
if os.path.exists(output_dir):
logger.info(
@@ -322,11 +322,11 @@ def save_model_only(
The DeepSpeed version is only called on the main process because the checkpointing and conversion mechanism will gather the shards from all processes.
"""
assert not os.path.exists(output_dir) or output_dir.is_dir(), f"output_dir {output_dir} must be a directory"
accelerator.wait_for_everyone()
get_accelerator().wait_for_everyone()

logger.info(f"Save model to {output_dir}")

unwrapped_model = accelerator.unwrap_model(model) if unwrap else model
unwrapped_model = get_accelerator().unwrap_model(model) if unwrap else model
if lora:
lora_save(output_dir, unwrapped_model)
return
@@ -336,9 +336,9 @@
logger.info("Saving model using transformers save_pretrained")
unwrapped_model.save_pretrained( # type: ignore
output_dir,
is_main_process=accelerator.is_main_process,
save_function=accelerator.save,
state_dict=accelerator.get_state_dict(model),
is_main_process=get_accelerator().is_main_process,
save_function=get_accelerator().save,
state_dict=get_accelerator().get_state_dict(model),
safe_serialization=safe_serialization,
)
logger.info(f"Saved model to {output_dir}")
@@ -360,7 +360,7 @@ def save_tokenizer_only(
Can be called on *all* processes.
"""
assert not os.path.exists(output_dir) or output_dir.is_dir(), f"output_dir {output_dir} must be a directory"
if accelerator.is_main_process:
if get_accelerator().is_main_process:
logger.info(f"Save tokenizer to {output_dir}")
tokenizer.save_pretrained(output_dir)

@@ -385,12 +385,11 @@ def load_training_state(
lr_scheduler,
training_metrics: TrainingMetrics,
):
accelerator.wait_for_everyone()
get_accelerator().wait_for_everyone()
training_state = load_training_checkpoint(training_state_dir, model, optimizer, lr_scheduler)
if training_state is None:
raise ValueError(f"Could not load training state from {training_state_dir}")
training_metrics.passes = training_state["passes"]
training_metrics.completed_steps = training_state["completed_steps"]
training_metrics.best_eval_loss = training_state["best_eval_loss"]
training_metrics.best_completed_steps = training_state["best_completed_steps"]

# Update training_metrics with loaded training state (hasattr check is to avoid potential mismatches between training_metrics and training_state)
vars(training_metrics).update({key: val for key, val in training_state.items() if hasattr(training_metrics, key)})
return training_metrics
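Replacing the field-by-field restores with a `hasattr`-filtered `vars(...).update(...)` keeps loading robust to extra keys in the checkpoint and to metrics fields the checkpoint predates. A small sketch of the pattern with a stand-in dataclass (hypothetical fields, not the real `TrainingMetrics`):

```python
# Sketch of the filtered update, with a made-up stand-in for TrainingMetrics.
from dataclasses import dataclass

@dataclass
class Metrics:
    passes: int = 0
    completed_steps: int = 0
    best_eval_loss: float = float("inf")

metrics = Metrics()
training_state = {
    "passes": 3,
    "completed_steps": 1200,
    "best_eval_loss": 0.42,
    "optimizer_state": {},  # in the checkpoint but not a Metrics field, so it is skipped
}

vars(metrics).update({k: v for k, v in training_state.items() if hasattr(metrics, k)})
print(metrics)  # Metrics(passes=3, completed_steps=1200, best_eval_loss=0.42)
```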
9 changes: 8 additions & 1 deletion tapeagents/finetune/context.py
@@ -9,4 +9,11 @@
# (The default behavior in AcceleratedScheduler when split_batches=False is to
# step() "num_processes" times, because they expect the lr schedule to
# depend on processed samples/epochs, not completed_steps)
accelerator = Accelerator(step_scheduler_with_optimizer=False)

_accelerator = None

def get_accelerator():
global _accelerator
if _accelerator is None:
_accelerator = Accelerator(step_scheduler_with_optimizer=False)
return _accelerator
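Turning the module-level `accelerator` into a lazily built singleton means importing the finetune modules no longer constructs an `Accelerator` (and with it, distributed state) as a side effect; construction happens on the first `get_accelerator()` call. A hedged sketch of what a call site looks like after the change (hypothetical snippet, not taken from the repo):

```python
# Hypothetical call site after the change; the Accelerator is only built when
# some code actually asks for it.
from tapeagents.finetune.context import get_accelerator

def save_if_main(path: str) -> None:
    # Before: `from tapeagents.finetune.context import accelerator` built the
    # Accelerator at import time; now construction happens inside this call.
    acc = get_accelerator()
    if acc.is_main_process:
        acc.save({"example": 1}, path)
    acc.wait_for_everyone()

save_if_main("/tmp/example_state.pt")
```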