Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PCA center, GGUF import #34

Merged
merged 2 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 93 additions & 22 deletions repeng/extract.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import dataclasses
import os
import typing
import warnings

Expand Down Expand Up @@ -31,6 +32,22 @@ def train(
dataset: list[DatasetEntry],
**kwargs,
) -> "ControlVector":
"""
Train a ControlVector for a given model and tokenizer using the provided dataset.

Args:
model (PreTrainedModel | ControlModel): The model to train against.
tokenizer (PreTrainedTokenizerBase): The tokenizer to tokenize the dataset.
dataset (list[DatasetEntry]): The dataset used for training.
**kwargs: Additional keyword arguments.
max_batch_size (int, optional): The maximum batch size for training.
Defaults to 32. Try reducing this if you're running out of memory.
method (str, optional): The training method to use. Can be either
"pca_diff" or "pca_center". Defaults to "pca_diff".

Returns:
ControlVector: The trained vector.
"""
dirs = read_representations(
model,
tokenizer,
Expand All @@ -39,7 +56,7 @@ def train(
)
return cls(model_type=model.config.model_type, directions=dirs)

def export_gguf(self, path: str):
def export_gguf(self, path: os.PathLike[str] | str):
"""
Export a trained ControlVector to a llama.cpp .gguf file.
Note: This file can't be used with llama.cpp yet. WIP!
Expand All @@ -62,6 +79,39 @@ def export_gguf(self, path: str):
writer.write_tensors_to_file()
writer.close()

@classmethod
def import_gguf(cls, path: os.PathLike[str] | str) -> "ControlVector":
reader = gguf.GGUFReader(path)

archf = reader.get_field("general.architecture")
if not archf or not len(archf.parts):
warnings.warn(".gguf file missing architecture field")
else:
arch = str(bytes(archf.parts[-1]), encoding="utf-8", errors="replace")
if arch != "controlvector":
warnings.warn(
f".gguf file with architecture {arch!r} does not appear to be a control vector!"
)

modelf = reader.get_field("controlvector.model_hint")
if not modelf or not len(modelf.parts):
raise ValueError(".gguf file missing controlvector.model_hint field")
model_hint = str(bytes(modelf.parts[-1]), encoding="utf-8")

directions = {}
for tensor in reader.tensors:
if not tensor.name.startswith("direction."):
continue
try:
layer = int(tensor.name.split(".")[1])
except:
raise ValueError(
f".gguf file has invalid direction field name: {tensor.name}"
)
directions[layer] = tensor.data

return cls(model_type=model_hint, directions=directions)

def _helper_combine(
self, other: "ControlVector", other_coeff: float
) -> "ControlVector":
Expand All @@ -82,6 +132,19 @@ def _helper_combine(
directions[layer] = other_layer
return ControlVector(model_type=model_type, directions=directions)

def __eq__(self, other: "ControlVector") -> bool:
if self is other:
return True

if self.model_type != other.model_type:
return False
if self.directions.keys() != other.directions.keys():
return False
for k in self.directions.keys():
if (self.directions[k] != other.directions[k]).any():
return False
return True

def __add__(self, other: "ControlVector") -> "ControlVector":
if not isinstance(other, ControlVector):
raise TypeError(
Expand Down Expand Up @@ -121,11 +184,11 @@ def read_representations(
inputs: list[DatasetEntry],
hidden_layers: typing.Iterable[int] | None = None,
batch_size: int = 32,
method: typing.Literal["pca_diff", "pca_center", "umap"] = "pca_diff",
) -> dict[int, np.ndarray]:
"""
Extract the representations based on the contrast dataset.
"""

if not hidden_layers:
hidden_layers = range(-1, -model.config.num_hidden_layers, -1)

Expand All @@ -140,31 +203,39 @@ def read_representations(
model, tokenizer, train_strs, hidden_layers, batch_size
)

# get differences between (positive, negative) pairs
relative_layer_hiddens = {}
for layer in hidden_layers:
relative_layer_hiddens[layer] = (
layer_hiddens[layer][::2] - layer_hiddens[layer][1::2]
)

# get directions for each layer using PCA
directions: dict[int, np.ndarray] = {}
for layer in tqdm.tqdm(hidden_layers):
assert layer_hiddens[layer].shape[0] == len(inputs) * 2

# fit layer directions
train = np.vstack(
relative_layer_hiddens[layer]
- relative_layer_hiddens[layer].mean(axis=0, keepdims=True)
)
pca_model = PCA(n_components=1, whiten=False).fit(train)
# shape (n_features,)
directions[layer] = pca_model.components_.astype(np.float32).squeeze(axis=0)
h = layer_hiddens[layer]
assert h.shape[0] == len(inputs) * 2

if method == "pca_diff":
train = h[::2] - h[1::2]
elif method == "pca_center":
center = (h[::2] + h[1::2]) / 2
train = h
train[::2] -= center
train[1::2] -= center
elif method == "umap":
train = h
else:
raise ValueError("unknown method " + method)

if method != "umap":
# shape (1, n_features)
pca_model = PCA(n_components=1, whiten=False).fit(train)
# shape (n_features,)
directions[layer] = pca_model.components_.astype(np.float32).squeeze(axis=0)
else:
# still experimental so don't want to add this as a real dependency yet
import umap # type: ignore

umap_model = umap.UMAP(n_components=1)
embedding = umap_model.fit_transform(train).astype(np.float32)
directions[layer] = np.sum(train * embedding, axis=0) / np.sum(embedding)

# calculate sign
projected_hiddens = project_onto_direction(
layer_hiddens[layer], directions[layer]
)
projected_hiddens = project_onto_direction(h, directions[layer])

# order is [positive, negative, positive, negative, ...]
positive_smaller_mean = np.mean(
Expand Down
177 changes: 128 additions & 49 deletions repeng/tests.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,143 @@
import functools
import json
import pathlib
import tempfile

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, PreTrainedTokenizerBase

from . import ControlModel, ControlVector, DatasetEntry
from .control import model_layer_list


def test_layer_list():
_, gpt2 = load_gpt2_model()
assert len(model_layer_list(gpt2)) == 12
_, lts = load_llama_tinystories_model()
assert len(model_layer_list(lts)) == 4


def test_round_trip_gguf():
tokenizer, model = load_llama_tinystories_model()
suffixes = load_suffixes()[:50] # truncate to train vector faster
happy_dataset = make_dataset(
"She saw a {persona}",
["mushroom"],
["cat"],
suffixes,
)
mushroom_cat_vector = ControlVector.train(
model, tokenizer, happy_dataset, method="pca_center"
)

with tempfile.NamedTemporaryFile("wb") as f:
mushroom_cat_vector.export_gguf(f.name)
read = ControlVector.import_gguf(f.name)
# no need to use allclose because we're just dumping exact bytes, no rounding
assert mushroom_cat_vector == read


def test_train_gpt2():
tokenizer, model = load_gpt2_model()
suffixes = load_suffixes()[:50] # truncate to train vector faster
happy_dataset = make_dataset(
"You are feeling extremely {persona}.",
["happy", "joyful"],
["sad", "miserable"],
suffixes,
)
happy_vector = ControlVector.train(
model, tokenizer, happy_dataset, method="pca_center"
)

def gen(vector: ControlVector | None, strength_coeff: float | None = None):
return model_generate(
"You are feeling", model, tokenizer, vector, strength_coeff
)

baseline = gen(None)
happy = gen(20 * happy_vector)
sad = gen(-50 * happy_vector)

print("baseline:", baseline)
print(" happy:", happy)
print(" sad:", sad)

assert baseline == "You are feeling a little bit of an anxiety"
# these should be identical
assert baseline == gen(happy_vector, 0.0)
assert baseline == gen(happy_vector * 0.0)
assert baseline == gen(happy_vector - happy_vector)

assert happy == "You are feeling great and happy. I'm"
# these should be identical
assert happy == gen(happy_vector, 20.0)
assert happy == gen(happy_vector * 20)
assert happy == gen(-(happy_vector * -20))

assert sad == "You are feeling the worst,\n—("


def test_train_llama_tinystories():
tokenizer, model = load_llama_tinystories_model()
suffixes = load_suffixes()[:50] # truncate to train vector faster
happy_dataset = make_dataset(
"She saw a {persona}",
["mushroom"],
["cat"],
suffixes,
)
mushroom_cat_vector = ControlVector.train(
model, tokenizer, happy_dataset, method="pca_center"
)

prompt = "Once upon a time, a little girl named Lily saw a"

def gen(vector: ControlVector | None, strength_coeff: float | None = None):
return model_generate(
prompt,
model,
tokenizer,
vector,
strength_coeff,
max_new_tokens=3,
)

baseline = gen(None).removeprefix("<s> ")
mushroom = gen(100 * mushroom_cat_vector).removeprefix("<s> ")
cat = gen(-100 * mushroom_cat_vector).removeprefix("<s> ")

print("baseline:", baseline)
print("mushroom:", mushroom)
print(" cat:", cat)

assert baseline.removeprefix(prompt) == " big, red"
assert mushroom.removeprefix(prompt) == " small plant."
assert cat.removeprefix(prompt) == " cat Bud guitar"


################################################################################
# Helpers
################################################################################


@functools.lru_cache(maxsize=1)
def load_gpt2_model() -> tuple[PreTrainedTokenizerBase, ControlModel]:
return load_model("openai-community/gpt2", list(range(-2, -8, -1)))


@functools.lru_cache(maxsize=1)
def load_model() -> tuple[PreTrainedTokenizerBase, ControlModel]:
model_name = "openai-community/gpt2"
def load_llama_tinystories_model() -> tuple[PreTrainedTokenizerBase, ControlModel]:
return load_model("Mxode/TinyStories-LLaMA2-25M-256h-4l-GQA", [2, 3])


def load_model(
model_name: str, layers: list[int]
) -> tuple[PreTrainedTokenizerBase, ControlModel]:
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token_id = tokenizer.eos_token_id
model = AutoModelForCausalLM.from_pretrained(model_name)
model = model.to("cpu")
return (tokenizer, ControlModel(model, list(range(-2, -8, -1))))
return (tokenizer, ControlModel(model, layers))


def model_generate(
Expand All @@ -25,7 +146,7 @@ def model_generate(
tokenizer: PreTrainedTokenizerBase,
vector: ControlVector | None,
strength_coeff: float | None = None,
max_new_tokens: int = 20,
max_new_tokens: int = 6,
) -> str:
input_ids = tokenizer(input, return_tensors="pt").to(model.device)
if vector is not None and strength_coeff is not None:
Expand Down Expand Up @@ -57,8 +178,8 @@ def make_dataset(
):
dataset.append(
DatasetEntry(
positive=template.format(persona=negative_persona) + f" {suffix}",
negative=template.format(persona=positive_persona) + f" {suffix}",
positive=template.format(persona=positive_persona) + f" {suffix}",
negative=template.format(persona=negative_persona) + f" {suffix}",
)
)
return dataset
Expand All @@ -76,45 +197,3 @@ def project_root() -> pathlib.Path:
if (parent / "pyproject.toml").exists():
return parent
raise RuntimeError("couldn't find project root")


def test_train():
tokenizer, model = load_model()
suffixes = load_suffixes()[:50] # truncate to train vector faster
happy_dataset = make_dataset(
"*I am a {persona} person making statements about the world.*",
["happy", "joyful"],
["sad", "miserable"],
suffixes,
)
happy_vector = ControlVector.train(model, tokenizer, happy_dataset)

baseline = model_generate("I am", model, tokenizer, None)
print("baseline:", baseline)
assert (
baseline
== "I am not a fan of the idea that you can't have an open source project without having some kind or"
)
# these should be identical
assert baseline == model_generate("I am", model, tokenizer, happy_vector, 0.0)
assert baseline == model_generate("I am", model, tokenizer, happy_vector * 0.0)
assert baseline == model_generate(
"I am", model, tokenizer, happy_vector - happy_vector
)

happy = model_generate("I am", model, tokenizer, 10 * happy_vector)
print("happy:", happy)
assert (
happy
== "I am also excited to announce that we will be hosting a special event on the first day of our new year"
)
# should be identical
assert happy == model_generate("I am", model, tokenizer, happy_vector * 10)
assert happy == model_generate("I am", model, tokenizer, -(happy_vector * -10))

sad = model_generate("I am", model, tokenizer, -15 * happy_vector)
print("sad:", sad)
assert (
sad
== "I am a fucking idiot. I'm not even trying to get you out of here, but if it's"
)
Loading