diff --git a/repeng/extract.py b/repeng/extract.py
index 4c1330a..55de4b0 100644
--- a/repeng/extract.py
+++ b/repeng/extract.py
@@ -1,4 +1,5 @@
 import dataclasses
+import os
 import typing
 import warnings
 
@@ -31,6 +32,22 @@ def train(
         dataset: list[DatasetEntry],
         **kwargs,
     ) -> "ControlVector":
+        """
+        Train a ControlVector for a given model and tokenizer using the provided dataset.
+
+        Args:
+            model (PreTrainedModel | ControlModel): The model to train against.
+            tokenizer (PreTrainedTokenizerBase): The tokenizer to tokenize the dataset.
+            dataset (list[DatasetEntry]): The dataset used for training.
+            **kwargs: Additional keyword arguments.
+                max_batch_size (int, optional): The maximum batch size for training.
+                    Defaults to 32. Try reducing this if you're running out of memory.
+                method (str, optional): The training method to use. Either "pca_diff"
+                    or "pca_center"; "umap" is experimental. Defaults to "pca_diff".
+
+        Returns:
+            ControlVector: The trained vector.
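+
+        Example:
+            # illustrative sketch only -- assumes `model`, `tokenizer`, and a
+            # contrastive `dataset` (a list[DatasetEntry]) are already prepared
+            vector = ControlVector.train(model, tokenizer, dataset)
+            centered = ControlVector.train(
+                model, tokenizer, dataset, method="pca_center", max_batch_size=16
+            )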
""" - if not hidden_layers: hidden_layers = range(-1, -model.config.num_hidden_layers, -1) @@ -140,31 +203,39 @@ def read_representations( model, tokenizer, train_strs, hidden_layers, batch_size ) - # get differences between (positive, negative) pairs - relative_layer_hiddens = {} - for layer in hidden_layers: - relative_layer_hiddens[layer] = ( - layer_hiddens[layer][::2] - layer_hiddens[layer][1::2] - ) - # get directions for each layer using PCA directions: dict[int, np.ndarray] = {} for layer in tqdm.tqdm(hidden_layers): - assert layer_hiddens[layer].shape[0] == len(inputs) * 2 - - # fit layer directions - train = np.vstack( - relative_layer_hiddens[layer] - - relative_layer_hiddens[layer].mean(axis=0, keepdims=True) - ) - pca_model = PCA(n_components=1, whiten=False).fit(train) - # shape (n_features,) - directions[layer] = pca_model.components_.astype(np.float32).squeeze(axis=0) + h = layer_hiddens[layer] + assert h.shape[0] == len(inputs) * 2 + + if method == "pca_diff": + train = h[::2] - h[1::2] + elif method == "pca_center": + center = (h[::2] + h[1::2]) / 2 + train = h + train[::2] -= center + train[1::2] -= center + elif method == "umap": + train = h + else: + raise ValueError("unknown method " + method) + + if method != "umap": + # shape (1, n_features) + pca_model = PCA(n_components=1, whiten=False).fit(train) + # shape (n_features,) + directions[layer] = pca_model.components_.astype(np.float32).squeeze(axis=0) + else: + # still experimental so don't want to add this as a real dependency yet + import umap # type: ignore + + umap_model = umap.UMAP(n_components=1) + embedding = umap_model.fit_transform(train).astype(np.float32) + directions[layer] = np.sum(train * embedding, axis=0) / np.sum(embedding) # calculate sign - projected_hiddens = project_onto_direction( - layer_hiddens[layer], directions[layer] - ) + projected_hiddens = project_onto_direction(h, directions[layer]) # order is [positive, negative, positive, negative, ...] positive_smaller_mean = np.mean( diff --git a/repeng/tests.py b/repeng/tests.py index 233e28f..b0441fc 100644 --- a/repeng/tests.py +++ b/repeng/tests.py @@ -1,22 +1,143 @@ import functools import json import pathlib +import tempfile -import torch from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedTokenizerBase from . 
+        reader = gguf.GGUFReader(path)
+
+        archf = reader.get_field("general.architecture")
+        if not archf or not len(archf.parts):
+            warnings.warn(".gguf file missing architecture field")
+        else:
+            arch = str(bytes(archf.parts[-1]), encoding="utf-8", errors="replace")
+            if arch != "controlvector":
+                warnings.warn(
+                    f".gguf file with architecture {arch!r} does not appear to be a control vector!"
+                )
+
+        modelf = reader.get_field("controlvector.model_hint")
+        if not modelf or not len(modelf.parts):
+            raise ValueError(".gguf file missing controlvector.model_hint field")
+        model_hint = str(bytes(modelf.parts[-1]), encoding="utf-8")
+
+        directions = {}
+        for tensor in reader.tensors:
+            if not tensor.name.startswith("direction."):
+                continue
+            try:
+                layer = int(tensor.name.split(".")[1])
+            except ValueError:
+                raise ValueError(
+                    f".gguf file has invalid direction field name: {tensor.name}"
+                )
+            directions[layer] = tensor.data
+
+        return cls(model_type=model_hint, directions=directions)
+
     def _helper_combine(
         self, other: "ControlVector", other_coeff: float
     ) -> "ControlVector":
@@ -82,6 +132,19 @@ def _helper_combine(
             directions[layer] = other_layer
         return ControlVector(model_type=model_type, directions=directions)
 
+    def __eq__(self, other: object) -> bool:
+        if self is other:
+            return True
+        if not isinstance(other, ControlVector):
+            return NotImplemented
+
+        if self.model_type != other.model_type:
+            return False
+        if self.directions.keys() != other.directions.keys():
+            return False
+        for k in self.directions.keys():
+            if (self.directions[k] != other.directions[k]).any():
+                return False
+        return True
+
     def __add__(self, other: "ControlVector") -> "ControlVector":
         if not isinstance(other, ControlVector):
             raise TypeError(
@@ -121,11 +184,11 @@ def read_representations(
     inputs: list[DatasetEntry],
     hidden_layers: typing.Iterable[int] | None = None,
     batch_size: int = 32,
+    method: typing.Literal["pca_diff", "pca_center", "umap"] = "pca_diff",
 ) -> dict[int, np.ndarray]:
     """
     Extract the representations based on the contrast dataset.
     """
-
     if not hidden_layers:
         hidden_layers = range(-1, -model.config.num_hidden_layers, -1)
 
@@ -140,31 +203,39 @@ def read_representations(
         model, tokenizer, train_strs, hidden_layers, batch_size
     )
 
-    # get differences between (positive, negative) pairs
-    relative_layer_hiddens = {}
-    for layer in hidden_layers:
-        relative_layer_hiddens[layer] = (
-            layer_hiddens[layer][::2] - layer_hiddens[layer][1::2]
-        )
-
     # get directions for each layer using PCA
     directions: dict[int, np.ndarray] = {}
     for layer in tqdm.tqdm(hidden_layers):
-        assert layer_hiddens[layer].shape[0] == len(inputs) * 2
-
-        # fit layer directions
-        train = np.vstack(
-            relative_layer_hiddens[layer]
-            - relative_layer_hiddens[layer].mean(axis=0, keepdims=True)
-        )
-        pca_model = PCA(n_components=1, whiten=False).fit(train)
-        # shape (n_features,)
-        directions[layer] = pca_model.components_.astype(np.float32).squeeze(axis=0)
+        h = layer_hiddens[layer]
+        assert h.shape[0] == len(inputs) * 2
+
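+        # `h` interleaves the dataset pairs: [positive, negative, positive, ...].
+        # "pca_diff" fits PCA to the per-pair differences (positive - negative);
+        # "pca_center" re-centers each pair on its midpoint and fits PCA to the
+        # centered activations themselves.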
+        if method == "pca_diff":
+            train = h[::2] - h[1::2]
+        elif method == "pca_center":
+            # center each (positive, negative) pair on its midpoint
+            # (note: this also shifts `h` in place)
+            center = (h[::2] + h[1::2]) / 2
+            train = h
+            train[::2] -= center
+            train[1::2] -= center
+        elif method == "umap":
+            train = h
+        else:
+            raise ValueError("unknown method " + method)
+
+        if method != "umap":
+            # shape (1, n_features)
+            pca_model = PCA(n_components=1, whiten=False).fit(train)
+            # shape (n_features,)
+            directions[layer] = pca_model.components_.astype(np.float32).squeeze(axis=0)
+        else:
+            # still experimental so don't want to add this as a real dependency yet
+            import umap  # type: ignore
+
+            umap_model = umap.UMAP(n_components=1)
+            embedding = umap_model.fit_transform(train).astype(np.float32)
+            # take the embedding-weighted average of the hidden states as the direction
+            directions[layer] = np.sum(train * embedding, axis=0) / np.sum(embedding)
 
         # calculate sign
-        projected_hiddens = project_onto_direction(
-            layer_hiddens[layer], directions[layer]
-        )
+        projected_hiddens = project_onto_direction(h, directions[layer])
 
         # order is [positive, negative, positive, negative, ...]
         positive_smaller_mean = np.mean(
diff --git a/repeng/tests.py b/repeng/tests.py
index 233e28f..b0441fc 100644
--- a/repeng/tests.py
+++ b/repeng/tests.py
@@ -1,22 +1,143 @@
 import functools
 import json
 import pathlib
+import tempfile
 
-import torch
 from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedTokenizerBase
 
 from . import ControlModel, ControlVector, DatasetEntry
+from .control import model_layer_list
+
+
+def test_layer_list():
+    _, gpt2 = load_gpt2_model()
+    assert len(model_layer_list(gpt2)) == 12
+    _, lts = load_llama_tinystories_model()
+    assert len(model_layer_list(lts)) == 4
+
+
+def test_round_trip_gguf():
+    tokenizer, model = load_llama_tinystories_model()
+    suffixes = load_suffixes()[:50]  # truncate to train vector faster
+    mushroom_cat_dataset = make_dataset(
+        "She saw a {persona}",
+        ["mushroom"],
+        ["cat"],
+        suffixes,
+    )
+    mushroom_cat_vector = ControlVector.train(
+        model, tokenizer, mushroom_cat_dataset, method="pca_center"
+    )
+
+    with tempfile.NamedTemporaryFile("wb") as f:
+        mushroom_cat_vector.export_gguf(f.name)
+        read = ControlVector.import_gguf(f.name)
+        # no need to use allclose because we're just dumping exact bytes, no rounding
+        assert mushroom_cat_vector == read
+
+
+def test_train_gpt2():
+    tokenizer, model = load_gpt2_model()
+    suffixes = load_suffixes()[:50]  # truncate to train vector faster
+    happy_dataset = make_dataset(
+        "You are feeling extremely {persona}.",
+        ["happy", "joyful"],
+        ["sad", "miserable"],
+        suffixes,
+    )
+    happy_vector = ControlVector.train(
+        model, tokenizer, happy_dataset, method="pca_center"
+    )
+
+    def gen(vector: ControlVector | None, strength_coeff: float | None = None):
+        return model_generate(
+            "You are feeling", model, tokenizer, vector, strength_coeff
+        )
+
+    baseline = gen(None)
+    happy = gen(20 * happy_vector)
+    sad = gen(-50 * happy_vector)
+
+    print("baseline:", baseline)
+    print("   happy:", happy)
+    print("     sad:", sad)
+
+    assert baseline == "You are feeling a little bit of an anxiety"
+    # these should be identical
+    assert baseline == gen(happy_vector, 0.0)
+    assert baseline == gen(happy_vector * 0.0)
+    assert baseline == gen(happy_vector - happy_vector)
+
+    assert happy == "You are feeling great and happy. I'm"
+    # these should be identical
+    assert happy == gen(happy_vector, 20.0)
+    assert happy == gen(happy_vector * 20)
+    assert happy == gen(-(happy_vector * -20))
+
+    assert sad == "You are feeling the worst,\n—("
+
+
+def test_train_llama_tinystories():
+    tokenizer, model = load_llama_tinystories_model()
+    suffixes = load_suffixes()[:50]  # truncate to train vector faster
+    mushroom_cat_dataset = make_dataset(
+        "She saw a {persona}",
+        ["mushroom"],
+        ["cat"],
+        suffixes,
+    )
+    mushroom_cat_vector = ControlVector.train(
+        model, tokenizer, mushroom_cat_dataset, method="pca_center"
+    )
+
+    prompt = "Once upon a time, a little girl named Lily saw a"
+
+    def gen(vector: ControlVector | None, strength_coeff: float | None = None):
+        return model_generate(
+            prompt,
+            model,
+            tokenizer,
+            vector,
+            strength_coeff,
+            max_new_tokens=3,
+        )
+
+    baseline = gen(None).removeprefix(" ")
+    mushroom = gen(100 * mushroom_cat_vector).removeprefix(" ")
+    cat = gen(-100 * mushroom_cat_vector).removeprefix(" ")
+
+    print("baseline:", baseline)
+    print("mushroom:", mushroom)
+    print("     cat:", cat)
+
+    assert baseline.removeprefix(prompt) == " big, red"
+    assert mushroom.removeprefix(prompt) == " small plant."
+    assert cat.removeprefix(prompt) == " cat Bud guitar"
+
+
+################################################################################
+# Helpers
+################################################################################
+
+
+@functools.lru_cache(maxsize=1)
+def load_gpt2_model() -> tuple[PreTrainedTokenizerBase, ControlModel]:
+    return load_model("openai-community/gpt2", list(range(-2, -8, -1)))
 
 
 @functools.lru_cache(maxsize=1)
-def load_model() -> tuple[PreTrainedTokenizerBase, ControlModel]:
-    model_name = "openai-community/gpt2"
+def load_llama_tinystories_model() -> tuple[PreTrainedTokenizerBase, ControlModel]:
+    return load_model("Mxode/TinyStories-LLaMA2-25M-256h-4l-GQA", [2, 3])
+
+
+def load_model(
+    model_name: str, layers: list[int]
+) -> tuple[PreTrainedTokenizerBase, ControlModel]:
     tokenizer = AutoTokenizer.from_pretrained(model_name)
     tokenizer.pad_token_id = tokenizer.eos_token_id
 
     model = AutoModelForCausalLM.from_pretrained(model_name)
     model = model.to("cpu")
-    return (tokenizer, ControlModel(model, list(range(-2, -8, -1))))
+    return (tokenizer, ControlModel(model, layers))
 
 
 def model_generate(
@@ -25,7 +146,7 @@
     tokenizer: PreTrainedTokenizerBase,
     vector: ControlVector | None,
     strength_coeff: float | None = None,
-    max_new_tokens: int = 20,
+    max_new_tokens: int = 6,
 ) -> str:
     input_ids = tokenizer(input, return_tensors="pt").to(model.device)
     if vector is not None and strength_coeff is not None:
@@ -57,8 +178,8 @@ def make_dataset(
     ):
         dataset.append(
             DatasetEntry(
-                positive=template.format(persona=negative_persona) + f" {suffix}",
-                negative=template.format(persona=positive_persona) + f" {suffix}",
+                positive=template.format(persona=positive_persona) + f" {suffix}",
+                negative=template.format(persona=negative_persona) + f" {suffix}",
             )
         )
     return dataset
@@ -76,45 +197,3 @@ def project_root() -> pathlib.Path:
         if (parent / "pyproject.toml").exists():
             return parent
     raise RuntimeError("couldn't find project root")
-
-
-def test_train():
-    tokenizer, model = load_model()
-    suffixes = load_suffixes()[:50]  # truncate to train vector faster
-    happy_dataset = make_dataset(
-        "*I am a {persona} person making statements about the world.*",
-        ["happy", "joyful"],
-        ["sad", "miserable"],
-        suffixes,
-    )
-    happy_vector = ControlVector.train(model, tokenizer, happy_dataset)
-
-    baseline = model_generate("I am", model, tokenizer, None)
-    print("baseline:", baseline)
-    assert (
-        baseline
-        == "I am not a fan of the idea that you can't have an open source project without having some kind or"
-    )
-    # these should be identical
-    assert baseline == model_generate("I am", model, tokenizer, happy_vector, 0.0)
-    assert baseline == model_generate("I am", model, tokenizer, happy_vector * 0.0)
-    assert baseline == model_generate(
-        "I am", model, tokenizer, happy_vector - happy_vector
-    )
-
-    happy = model_generate("I am", model, tokenizer, 10 * happy_vector)
-    print("happy:", happy)
-    assert (
-        happy
-        == "I am also excited to announce that we will be hosting a special event on the first day of our new year"
-    )
-    # should be identical
-    assert happy == model_generate("I am", model, tokenizer, happy_vector * 10)
-    assert happy == model_generate("I am", model, tokenizer, -(happy_vector * -10))
-
-    sad = model_generate("I am", model, tokenizer, -15 * happy_vector)
-    print("sad:", sad)
-    assert (
-        sad
-        == "I am a fucking idiot. I'm not even trying to get you out of here, but if it's"
-    )
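
A minimal end-to-end sketch of the surface area this diff adds (the GGUF round-trip plus the `method` kwarg). The checkpoint name mirrors what the tests use; any small causal LM should work, and the two-entry dataset is purely illustrative:

```python
import tempfile

from transformers import AutoModelForCausalLM, AutoTokenizer

from repeng import ControlModel, ControlVector, DatasetEntry

# assumption: same tiny checkpoint and control layers as the tests above
name = "Mxode/TinyStories-LLaMA2-25M-256h-4l-GQA"
tokenizer = AutoTokenizer.from_pretrained(name)
tokenizer.pad_token_id = tokenizer.eos_token_id
model = ControlModel(AutoModelForCausalLM.from_pretrained(name), [2, 3])

dataset = [
    DatasetEntry(positive="She saw a mushroom. Yes", negative="She saw a cat. Yes"),
    DatasetEntry(positive="She saw a mushroom. No", negative="She saw a cat. No"),
]
vector = ControlVector.train(model, tokenizer, dataset, method="pca_center")

with tempfile.NamedTemporaryFile("wb", suffix=".gguf") as f:
    vector.export_gguf(f.name)
    # exercises the new import path and __eq__
    assert ControlVector.import_gguf(f.name) == vector
```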