Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT Use parquet files as output #222

Merged
merged 7 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions alphadia/consensus/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import logging
import os

import pandas as pd

logger = logging.getLogger()

# File formats supported by read_df / write_df.
supported_formats = ["parquet", "tsv"]


def read_df(path_no_format, file_format="parquet"):
    """Read a dataframe from disk in the chosen file format.

    Parameters
    ----------

    path_no_format: str
        Path of the file to read, without the file extension.

    file_format: str, default = 'parquet'
        File format for loading the file. Available options: ['parquet', 'tsv']

    Returns
    -------

    pd.DataFrame
        loaded dataframe from disk

    Raises
    ------
    ValueError
        If ``file_format`` is not one of ``supported_formats``.
    FileNotFoundError
        If ``<path_no_format>.<file_format>`` does not exist.

    """
    # Validate the format first so an unknown format raises ValueError
    # (consistent with write_df) instead of a misleading FileNotFoundError
    # for a path that could never have been written.
    if file_format not in supported_formats:
        raise ValueError(
            f"Provided unknown file format: {file_format}, supported_formats: {supported_formats}"
        )

    file_path = f"{path_no_format}.{file_format}"

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Can't load file as file was not found: {file_path}")

    logger.info(f"Reading {file_path} from disk")

    if file_format == "parquet":
        return pd.read_parquet(file_path)

    # Only "tsv" remains after the validation above.
    return pd.read_csv(file_path, sep="\t")


def write_df(df, path_no_format, file_format="parquet"):
    """Write a dataframe to disk in the chosen file format.

    Parameters
    ----------

    df: pd.DataFrame
        Dataframe to save to disk

    path_no_format: str
        Path for file without format

    file_format: str, default = 'parquet'
        File format for saving the file. Available options: ['parquet', 'tsv']

    Raises
    ------
    ValueError
        If ``file_format`` is not one of ``supported_formats``.

    """
    if file_format not in supported_formats:
        raise ValueError(
            f"Provided unknown file format: {file_format}, supported_formats: {supported_formats}"
        )

    file_path = f"{path_no_format}.{file_format}"

    logger.info(f"Saving {file_path} to disk")

    if file_format == "parquet":
        df.to_parquet(file_path, index=False)
    else:
        # Only "tsv" remains after the validation above; limit float
        # precision to keep the text output compact.
        df.to_csv(file_path, sep="\t", index=False, float_format="%.6f")
2 changes: 2 additions & 0 deletions alphadia/constants/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ search_output:
num_samples_quadratic: 50
min_nonnan: 3
normalize_lfq: True
# can be either "parquet" or "tsv"
file_format: "tsv"

# configuration for the optimization manager
# initial parameters, will be optimized
Expand Down
4 changes: 2 additions & 2 deletions alphadia/outputaccumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ def parse_output_folder(


"""
psm_df = pd.read_csv(os.path.join(folder, "psm.tsv"), sep="\t")
frag_df = pd.read_csv(os.path.join(folder, "frag.tsv"), sep="\t")
psm_df = pd.read_parquet(os.path.join(folder, "psm.parquet"))
jalew188 marked this conversation as resolved.
Show resolved Hide resolved
frag_df = pd.read_parquet(os.path.join(folder, "frag.parquet"))

assert set(
selected_precursor_columns
Expand Down
71 changes: 25 additions & 46 deletions alphadia/outputtransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
TransferLearningAccumulator,
AccumulationBroadcaster,
)

from alphadia.consensus.utils import read_df, write_df

import pandas as pd
import numpy as np
Expand Down Expand Up @@ -54,22 +54,14 @@ def get_frag_df_generator(folder_list: List[str]):

for folder in folder_list:
raw_name = os.path.basename(folder)
frag_path = os.path.join(folder, "frag.tsv")
frag_path = os.path.join(folder, "frag.parquet")
GeorgWa marked this conversation as resolved.
Show resolved Hide resolved

if not os.path.exists(frag_path):
logger.warning(f"no frag file found for {raw_name}")
else:
try:
logger.info(f"reading frag file for {raw_name}")
run_df = pd.read_csv(
frag_path,
sep="\t",
dtype={
"precursor_idx": np.uint32,
"number": np.uint8,
"type": np.uint8,
},
)
run_df = pd.read_parquet(frag_path)
except Exception as e:
logger.warning(f"Error reading frag file for {raw_name}")
logger.warning(e)
Expand Down Expand Up @@ -453,20 +445,10 @@ def load_precursor_table(self):
Precursor table
"""

if not os.path.exists(
os.path.join(self.output_folder, f"{self.PRECURSOR_OUTPUT}.tsv")
):
logger.error(
f"Can't continue as no {self.PRECURSOR_OUTPUT}.tsv file was found in the output folder: {self.output_folder}"
)
raise FileNotFoundError(
f"Can't continue as no {self.PRECURSOR_OUTPUT}.tsv file was found in the output folder: {self.output_folder}"
)
logger.info(f"Reading {self.PRECURSOR_OUTPUT}.tsv file")
psm_df = pd.read_csv(
os.path.join(self.output_folder, f"{self.PRECURSOR_OUTPUT}.tsv"), sep="\t"
return read_df(
os.path.join(self.output_folder, f"{self.PRECURSOR_OUTPUT}"),
file_format=self.config["search_output"]["file_format"],
)
return psm_df

def build_precursor_table(
self,
Expand Down Expand Up @@ -497,7 +479,7 @@ def build_precursor_table(

for folder in folder_list:
raw_name = os.path.basename(folder)
psm_path = os.path.join(folder, f"{self.PSM_INPUT}.tsv")
psm_path = os.path.join(folder, f"{self.PSM_INPUT}.parquet")

logger.info(f"Building output for {raw_name}")

Expand All @@ -506,7 +488,7 @@ def build_precursor_table(
run_df = pd.DataFrame()
else:
try:
run_df = pd.read_csv(psm_path, sep="\t")
run_df = pd.read_parquet(psm_path)
except Exception as e:
logger.warning(f"Error reading psm file for {raw_name}")
logger.warning(e)
Expand Down Expand Up @@ -596,11 +578,10 @@ def build_precursor_table(
psm_df = psm_df[psm_df["decoy"] == 0]
if save:
logger.info("Writing precursor output to disk")
psm_df.to_csv(
os.path.join(self.output_folder, f"{self.PRECURSOR_OUTPUT}.tsv"),
sep="\t",
index=False,
float_format="%.6f",
write_df(
psm_df,
os.path.join(self.output_folder, self.PRECURSOR_OUTPUT),
file_format=self.config["search_output"]["file_format"],
)

return psm_df
Expand Down Expand Up @@ -661,11 +642,10 @@ def build_stat_df(

if save:
logger.info("Writing stat output to disk")
stat_df.to_csv(
os.path.join(self.output_folder, f"{self.STAT_OUTPUT}.tsv"),
sep="\t",
index=False,
float_format="%.6f",
write_df(
stat_df,
os.path.join(self.output_folder, self.STAT_OUTPUT),
file_format="tsv",
)

return stat_df
Expand Down Expand Up @@ -743,11 +723,11 @@ def build_lfq_tables(

if save:
logger.info(f"Writing {group_nice} output to disk")
lfq_df.to_csv(
os.path.join(self.output_folder, f"{group_nice}.matrix.tsv"),
sep="\t",
index=False,
float_format="%.6f",

write_df(
lfq_df,
os.path.join(self.output_folder, f"{group_nice}.matrix"),
file_format=self.config["search_output"]["file_format"],
)

protein_df_melted = lfq_df.melt(
Expand All @@ -758,11 +738,10 @@ def build_lfq_tables(

if save:
logger.info("Writing psm output to disk")
psm_df.to_csv(
os.path.join(self.output_folder, f"{self.PRECURSOR_OUTPUT}.tsv"),
sep="\t",
index=False,
float_format="%.6f",
write_df(
psm_df,
os.path.join(self.output_folder, f"{self.PRECURSOR_OUTPUT}"),
file_format=self.config["search_output"]["file_format"],
)

return lfq_df
Expand Down
2 changes: 1 addition & 1 deletion alphadia/peakgroup/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ def assemble_candidates(self, elution_group_container):
precursor_flat_lookup
]

# save features for training if desired.
# DEBUG: save features for training if desired.
if self.feature_path is not None:
feature_matrix = np.zeros(
(len(candidates), len(candidates[0].features)), dtype=np.float32
Expand Down
8 changes: 4 additions & 4 deletions alphadia/planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ def run(
workflow_folder_list.append(workflow.path)

# check if the raw file is already processed
psm_location = os.path.join(workflow.path, "psm.tsv")
frag_location = os.path.join(workflow.path, "frag.tsv")
psm_location = os.path.join(workflow.path, "psm.parquet")
frag_location = os.path.join(workflow.path, "frag.parquet")

if self.config["general"]["reuse_quant"]:
if os.path.exists(psm_location) and os.path.exists(frag_location):
Expand All @@ -331,8 +331,8 @@ def run(
psm_df, frag_df = workflow.requantify_fragments(psm_df)

psm_df["run"] = raw_name
psm_df.to_csv(psm_location, sep="\t", index=False)
frag_df.to_csv(frag_location, sep="\t", index=False)
psm_df.to_parquet(psm_location, index=False)
frag_df.to_parquet(frag_location, index=False)

workflow.reporter.log_string(f"Finished workflow for {raw_name}")
workflow.reporter.context.__exit__(None, None, None)
Expand Down
24 changes: 24 additions & 0 deletions tests/unit_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pandas as pd
import numpy as np
import matplotlib
import tempfile

matplotlib.use("Agg")
from matplotlib import pyplot as plt
Expand All @@ -23,6 +24,9 @@ def mock_precursor_df(
n_precursor : int
Number of precursors to generate

with_decoy : bool
If True, half of the precursors will be decoys

Returns
-------

Expand Down Expand Up @@ -185,3 +189,23 @@ def pytest_configure(config):
pytest.test_data[raw_folder] = raw_files

# important to supress matplotlib output


def random_tempfolder():
    """Create a randomly named temp folder in the system temp folder.

    The folder name is prefixed with ``alphadia_`` so leftover test
    folders are easy to identify and clean up.

    Returns
    -------
    path : str
        Path to the created temp folder

    """
    tempdir = tempfile.gettempdir()
    # 6 random alphanumeric characters appended to the fixed prefix.
    random_foldername = "alphadia_" + "".join(
        np.random.choice(list("abcdefghijklmnopqrstuvwxyz0123456789"), 6)
    )
    path = os.path.join(tempdir, random_foldername)
    # exist_ok=True: a name collision with a previous run is harmless here.
    os.makedirs(path, exist_ok=True)
    print(f"Created temp folder: {path}")
    return path
25 changes: 25 additions & 0 deletions tests/unit_tests/test_consensus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import pytest
import pandas as pd
import os
from conftest import random_tempfolder
from alphadia.consensus.utils import read_df, write_df


@pytest.mark.parametrize(
    "file_format, should_fail",
    [("tsv", False), ("parquet", False), ("a321", True)],
)
def test_read_write(file_format, should_fail):
    """Round-trip a dataframe through write_df/read_df for each supported
    format, and verify an unknown format raises ValueError."""
    # given
    df = pd.DataFrame([{"a": "a", "b": "b"}, {"a": "a", "b": "b"}])
    # use a file stem *inside* the temp folder, not the folder path itself
    path = os.path.join(random_tempfolder(), "test_df")

    # when
    if should_fail:
        with pytest.raises(ValueError):
            write_df(df, path, file_format=file_format)

    else:
        write_df(df, path, file_format=file_format)
        _df = read_df(path, file_format=file_format)
        assert df.equals(_df)
6 changes: 3 additions & 3 deletions tests/unit_tests/test_outputaccumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ def prepare_input_data():

for i, raw_folder in enumerate(raw_folders):
os.makedirs(raw_folder, exist_ok=True)
psm_dfs[i].to_csv(os.path.join(raw_folder, "psm.tsv"), sep="\t", index=False)
fragment_dfs[i].to_csv(
os.path.join(raw_folder, "frag.tsv"), sep="\t", index=False
psm_dfs[i].to_parquet(os.path.join(raw_folder, "psm.parquet"), index=False)
fragment_dfs[i].to_parquet(
os.path.join(raw_folder, "frag.parquet"), index=False
)

return config, temp_folder, raw_folders, psm_dfs, fragment_dfs
Expand Down
11 changes: 6 additions & 5 deletions tests/unit_tests/test_outputtransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def test_output_transform():
"normalize_lfq": True,
"peptide_level_lfq": False,
"precursor_level_lfq": False,
"file_format": "parquet",
},
"multiplexing": {
"enabled": False,
Expand Down Expand Up @@ -56,17 +57,17 @@ def test_output_transform():
fragment_base_df["precursor_idx"].isin(psm_df["precursor_idx"])
]

frag_df.to_csv(os.path.join(raw_folder, "frag.tsv"), sep="\t", index=False)
psm_df.to_csv(os.path.join(raw_folder, "psm.tsv"), sep="\t", index=False)
frag_df.to_parquet(os.path.join(raw_folder, "frag.parquet"), index=False)
psm_df.to_parquet(os.path.join(raw_folder, "psm.parquet"), index=False)

output = outputtransform.SearchPlanOutput(config, temp_folder)
_ = output.build_precursor_table(raw_folders, save=True)
_ = output.build_stat_df(raw_folders, save=True)
_ = output.build_lfq_tables(raw_folders, save=True)

# validate psm_df output
psm_df = pd.read_csv(
os.path.join(temp_folder, f"{output.PRECURSOR_OUTPUT}.tsv"), sep="\t"
psm_df = pd.read_parquet(
os.path.join(temp_folder, f"{output.PRECURSOR_OUTPUT}.parquet"),
)
assert all(
[
Expand Down Expand Up @@ -95,7 +96,7 @@ def test_output_transform():
assert all([col in stat_df.columns for col in ["run", "precursors", "proteins"]])

# validate protein_df output
protein_df = pd.read_csv(os.path.join(temp_folder, "pg.matrix.tsv"), sep="\t")
protein_df = pd.read_parquet(os.path.join(temp_folder, "pg.matrix.parquet"))
assert all([col in protein_df.columns for col in ["run_0", "run_1", "run_2"]])

for i in run_columns:
Expand Down
Loading
Loading