diff --git a/.gitignore b/.gitignore index c69d37f..1c21f71 100644 --- a/.gitignore +++ b/.gitignore @@ -269,4 +269,8 @@ $RECYCLE.BIN/ # Windows shortcuts *.lnk -# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,macos,linux,windows \ No newline at end of file +# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode,macos,linux,windows + + +default_scoring_matrix_ser_thr.csv.gz +default_scoring_matrix_tyr.csv.gz \ No newline at end of file diff --git a/src/kinex/enrichment.py b/src/kinex/enrichment.py index b03cae0..0e1ace5 100644 --- a/src/kinex/enrichment.py +++ b/src/kinex/enrichment.py @@ -3,28 +3,64 @@ import plotly.express as px from statsmodels.stats.multitest import multipletests +from kinex.resources import ( + get_ser_thr_family, + get_ser_thr_family_colors, + get_tyr_family, + get_tyr_family_colors, +) from kinex.sequence import SequenceType from kinex.table2x2 import Table2x2 -from kinex.resources import get_ser_thr_family, get_ser_thr_family_colors, get_tyr_family, get_tyr_family_colors class Enrichment: def __init__(self, sequence_type: SequenceType, all_kinases: set): + """ + Initialize the Enrichment class. + + :param sequence_type: Type of sequence (SER_THR or TYR). + :param all_kinases: Set of all kinases. + """ self.sequence_type = sequence_type self.total_upregulated = 0 self.total_downregulated = 0 self.total_unregulated = 0 self.regulation_list = [] self.top15_kinases_list = [] - self.enrichment_table = pd.DataFrame(columns=['kinase', 'upregulated', 'downregulated', 'unregulated']) + self.enrichment_table = pd.DataFrame( + columns=["kinase", "upregulated", "downregulated", "unregulated"] + ) self.all_kinases = all_kinases def adjust_background_sites(self): + """ + Adjust the background sites if the total unregulated is zero. + """ if self.total_unregulated == 0: - self.total_unregulated = np.min( - [self.total_upregulated, self.total_downregulated]) / 2 + self.total_unregulated = ( + np.min([self.total_upregulated, self.total_downregulated]) / 2 + ) def fisher_statistics(self): + """ + Perform Fisher's exact test to analyze kinase enrichment in regulation states. + + This method calculates the statistical enrichment of kinases associated with + upregulated or downregulated sites using Fisher's exact test. The test determines + whether there is a significant association between two categorical variables: + - Presence or absence of a specific kinase. + - Regulation state (upregulated or downregulated). + + Outputs: + - p-value: Probability of observing the given enrichment by chance. + - Enrichment values: Ratio of regulated sites (up or down) to the total number of sites. + + Updates the enrichment table with the following columns: + - `upregulated_enrichment_value`: Enrichment value for upregulated sites. + - `downregulated_enrichment_value`: Enrichment value for downregulated sites. + - `dominant_direction`: Regulation state with the highest enrichment value. + """ + new_columns = [ "upregulated_enrichment_value", "upregulated_enrichment_value_log2", @@ -32,133 +68,200 @@ def fisher_statistics(self): "upregulated_p_value_log10_abs", "upregulated_adjusted_p_value", "upregulated_adjusted_p_value_log10_abs", - "downregulated_enrichment_value", "downregulated_enrichment_value_log2", "downregulated_p_value", "downregulated_p_value_log10_abs", "downregulated_adjusted_p_value", "downregulated_adjusted_p_value_log10_abs", - "dominant_direction", "dominant_enrichment_value_log2", "dominant_p_value_log10_abs", "dominant_adjusted_p_value_log10_abs", ] - # Create new columns + # Create new columns in the enrichment table self.enrichment_table = self.enrichment_table.assign( - **{col: None for col in new_columns}) + **{col: None for col in new_columns} + ) for i in range(len(self.enrichment_table)): - # Add background - # TODO Look into adding unregulated_hit backgroud - if self.enrichment_table.loc[i, "unregulated"] == 0: - self.enrichment_table.loc[i, "unregulated"] += 1 - - # Save hits to the vars - upregulated_hit = int(self.enrichment_table.iloc[i]["upregulated"]) - downregulated_hit = int( - self.enrichment_table.iloc[i]["downregulated"]) - unregulated_hit = int(self.enrichment_table.iloc[i]["unregulated"]) - - # Upregulated enrichment and p values - if self.total_upregulated == 0 or upregulated_hit == 0: - upregulated_enrichment_value = 0 - upregulated_p_value = 1 - self.enrichment_table.loc[i, - "upregulated_enrichment_value_log2"] = 0 - self.enrichment_table.loc[i, - "upregulated_p_value_log10_abs"] = 0 - else: - upregulated_contingency_table = Table2x2([[upregulated_hit, self.total_upregulated - upregulated_hit], - [unregulated_hit, self.total_unregulated - unregulated_hit]], - shift_zeros=True - ) - upregulated_enrichment_value = upregulated_contingency_table.odds_ratio() - upregulated_p_value = upregulated_contingency_table.p_val( - mode="greater") - self.enrichment_table.loc[i, "upregulated_enrichment_value_log2"] = np.abs(np.log2( - upregulated_enrichment_value)) - self.enrichment_table.loc[i, "upregulated_p_value_log10_abs"] = np.absolute( - np.log10(upregulated_p_value)) - - # Downregulated enrichment and p values - if self.total_downregulated == 0 or downregulated_hit == 0: - downregulated_enrichment_value = 0 - downregulated_p_value = 1 - self.enrichment_table.loc[i, - "downregulated_enrichment_value_log2"] = 0 - self.enrichment_table.loc[i, - "downregulated_p_value_log10_abs"] = 0 - else: - downregulated_contingency_table = Table2x2( - [[downregulated_hit, self.total_downregulated - downregulated_hit], - [unregulated_hit, self.total_unregulated - unregulated_hit]], - shift_zeros=True - ) - downregulated_enrichment_value = downregulated_contingency_table.odds_ratio() - downregulated_p_value = downregulated_contingency_table.p_val( - mode="greater") - - self.enrichment_table.loc[i, "downregulated_enrichment_value_log2"] = -np.abs(np.log2( - downregulated_enrichment_value)) - self.enrichment_table.loc[i, "downregulated_p_value_log10_abs"] = np.absolute( - np.log10(downregulated_p_value)) - - # Set the enrichment and p values - self.enrichment_table.loc[i, - "upregulated_enrichment_value"] = upregulated_enrichment_value - self.enrichment_table.loc[i, - "upregulated_p_value"] = upregulated_p_value - - self.enrichment_table.loc[i, - "downregulated_enrichment_value"] = downregulated_enrichment_value - self.enrichment_table.loc[i, - "downregulated_p_value"] = downregulated_p_value - - # Determine the dominant direction, either upregulated or downregulated. - if upregulated_enrichment_value > downregulated_enrichment_value: - self.enrichment_table.loc[i, - "dominant_direction"] = "upregulated set" - self.enrichment_table.loc[i, "dominant_enrichment_value_log2"] = self.enrichment_table.loc[i, - "upregulated_enrichment_value_log2"] - self.enrichment_table.loc[i, "dominant_p_value_log10_abs"] = self.enrichment_table.loc[i, - "upregulated_p_value_log10_abs"] - else: - self.enrichment_table.loc[i, - "dominant_direction"] = "downregulated set" - self.enrichment_table.loc[i, "dominant_enrichment_value_log2"] = self.enrichment_table.loc[i, - "downregulated_enrichment_value_log2"] - self.enrichment_table.loc[i, "dominant_p_value_log10_abs"] = self.enrichment_table.loc[i, - "downregulated_p_value_log10_abs"] - - # Calculate adjusted p values + self._calculate_enrichment_for_row(i) + + self._calculate_adjusted_p_values() + + self.enrichment_table = self.enrichment_table.set_index("kinase") + + self._reindex_missing_kinases() + + def _calculate_enrichment_for_row(self, index: int): + """ + Calculate enrichment values for a specific row. + + :param index: Index of the row in the enrichment table. + """ + # Add background + # TODO Look into adding unregulated_hit backgroud + if self.enrichment_table.loc[index, "unregulated"] == 0: + self.enrichment_table.loc[index, "unregulated"] += 1 + + # Save hits to the vars + upregulated_hit = int(self.enrichment_table.iloc[index]["upregulated"]) + downregulated_hit = int(self.enrichment_table.iloc[index]["downregulated"]) + unregulated_hit = int(self.enrichment_table.iloc[index]["unregulated"]) + + # Upregulated enrichment and p values + if self.total_upregulated == 0 or upregulated_hit == 0: + upregulated_enrichment_value = 0 + upregulated_p_value = 1 + self.enrichment_table.loc[index, "upregulated_enrichment_value_log2"] = 0 + self.enrichment_table.loc[index, "upregulated_p_value_log10_abs"] = 0 + else: + upregulated_contingency_table = Table2x2( + [ + [upregulated_hit, self.total_upregulated - upregulated_hit], + [unregulated_hit, self.total_unregulated - unregulated_hit], + ], + shift_zeros=True, + ) + upregulated_enrichment_value = upregulated_contingency_table.odds_ratio() + upregulated_p_value = upregulated_contingency_table.p_val(mode="greater") + self.enrichment_table.loc[index, "upregulated_enrichment_value_log2"] = ( + np.abs(np.log2(upregulated_enrichment_value)) + ) + self.enrichment_table.loc[index, "upregulated_p_value_log10_abs"] = ( + np.absolute(np.log10(upregulated_p_value)) + ) + + # Downregulated enrichment and p values + if self.total_downregulated == 0 or downregulated_hit == 0: + downregulated_enrichment_value = 0 + downregulated_p_value = 1 + self.enrichment_table.loc[index, "downregulated_enrichment_value_log2"] = 0 + self.enrichment_table.loc[index, "downregulated_p_value_log10_abs"] = 0 + else: + downregulated_contingency_table = Table2x2( + [ + [downregulated_hit, self.total_downregulated - downregulated_hit], + [unregulated_hit, self.total_unregulated - unregulated_hit], + ], + shift_zeros=True, + ) + downregulated_enrichment_value = ( + downregulated_contingency_table.odds_ratio() + ) + downregulated_p_value = downregulated_contingency_table.p_val( + mode="greater" + ) + + self.enrichment_table.loc[index, "downregulated_enrichment_value_log2"] = ( + -np.abs(np.log2(downregulated_enrichment_value)) + ) + self.enrichment_table.loc[index, "downregulated_p_value_log10_abs"] = ( + np.absolute(np.log10(downregulated_p_value)) + ) + + # Set the enrichment and p values + self.enrichment_table.loc[index, "upregulated_enrichment_value"] = ( + upregulated_enrichment_value + ) + self.enrichment_table.loc[index, "upregulated_p_value"] = upregulated_p_value + + self.enrichment_table.loc[index, "downregulated_enrichment_value"] = ( + downregulated_enrichment_value + ) + self.enrichment_table.loc[index, "downregulated_p_value"] = ( + downregulated_p_value + ) + + self._determine_dominant_direction(index) + + def _determine_dominant_direction(self, index: int): + """ + Determine the dominant direction (upregulated or downregulated) for a specific row. + + :param index: Index of the row in the enrichment table. + """ + if ( + self.enrichment_table.loc[index, "upregulated_enrichment_value"] + > self.enrichment_table.loc[index, "downregulated_enrichment_value"] + ): + self.enrichment_table.loc[index, "dominant_direction"] = "upregulated set" + self.enrichment_table.loc[index, "dominant_enrichment_value_log2"] = ( + self.enrichment_table.loc[index, "upregulated_enrichment_value_log2"] + ) + self.enrichment_table.loc[index, "dominant_p_value_log10_abs"] = ( + self.enrichment_table.loc[index, "upregulated_p_value_log10_abs"] + ) + else: + self.enrichment_table.loc[index, "dominant_direction"] = "downregulated set" + self.enrichment_table.loc[index, "dominant_enrichment_value_log2"] = ( + self.enrichment_table.loc[index, "downregulated_enrichment_value_log2"] + ) + self.enrichment_table.loc[index, "dominant_p_value_log10_abs"] = ( + self.enrichment_table.loc[index, "downregulated_p_value_log10_abs"] + ) + + def _calculate_adjusted_p_values(self): + """ + Calculate adjusted p-values using the Benjamini-Hochberg method. + """ upregulated_adjusted_p_value = multipletests( - self.enrichment_table["upregulated_p_value"], method="fdr_bh") - self.enrichment_table["upregulated_adjusted_p_value"] = upregulated_adjusted_p_value[1] + self.enrichment_table["upregulated_p_value"], method="fdr_bh" + ) + self.enrichment_table["upregulated_adjusted_p_value"] = ( + upregulated_adjusted_p_value[1] + ) downregulated_adjusted_p_value = multipletests( - self.enrichment_table["downregulated_p_value"], method="fdr_bh") - self.enrichment_table["downregulated_adjusted_p_value"] = downregulated_adjusted_p_value[1] + self.enrichment_table["downregulated_p_value"], method="fdr_bh" + ) + self.enrichment_table["downregulated_adjusted_p_value"] = ( + downregulated_adjusted_p_value[1] + ) for i in range(len(self.enrichment_table)): - # adjusted p values log10 abs and dominant adjusted p values log10 abs - self.enrichment_table.loc[i, "upregulated_adjusted_p_value_log10_abs"] = np.absolute( - np.log10(self.enrichment_table.loc[i, "upregulated_adjusted_p_value"])) - self.enrichment_table.loc[i, "downregulated_adjusted_p_value_log10_abs"] = np.absolute( - np.log10(self.enrichment_table.loc[i, "downregulated_adjusted_p_value"])) - - if self.enrichment_table.loc[i, "dominant_direction"] == "downregulated set": - self.enrichment_table.loc[i, "dominant_adjusted_p_value_log10_abs"] = self.enrichment_table.loc[i, - "downregulated_adjusted_p_value_log10_abs"] - elif self.enrichment_table.loc[i, "dominant_direction"] == "upregulated set": - self.enrichment_table.loc[i, "dominant_adjusted_p_value_log10_abs"] = self.enrichment_table.loc[i, - "upregulated_adjusted_p_value_log10_abs"] - self.enrichment_table = self.enrichment_table.set_index("kinase") + # Adjusted p values log10 abs and dominant adjusted p values log10 abs + self.enrichment_table.loc[i, "upregulated_adjusted_p_value_log10_abs"] = ( + np.absolute( + np.log10( + self.enrichment_table.loc[i, "upregulated_adjusted_p_value"] + ) + ) + ) + self.enrichment_table.loc[i, "downregulated_adjusted_p_value_log10_abs"] = ( + np.absolute( + np.log10( + self.enrichment_table.loc[i, "downregulated_adjusted_p_value"] + ) + ) + ) + if ( + self.enrichment_table.loc[i, "dominant_direction"] + == "downregulated set" + ): + self.enrichment_table.loc[i, "dominant_adjusted_p_value_log10_abs"] = ( + self.enrichment_table.loc[ + i, "downregulated_adjusted_p_value_log10_abs" + ] + ) + elif ( + self.enrichment_table.loc[i, "dominant_direction"] == "upregulated set" + ): + self.enrichment_table.loc[i, "dominant_adjusted_p_value_log10_abs"] = ( + self.enrichment_table.loc[ + i, "upregulated_adjusted_p_value_log10_abs" + ] + ) + + def _reindex_missing_kinases(self): + """ + Reindex the enrichment table to include missing kinases. + """ missing_kinases = list(self.all_kinases - set(self.enrichment_table.index)) self.enrichment_table = self.enrichment_table.reindex( - self.enrichment_table.index.union(missing_kinases), fill_value=0) + self.enrichment_table.index.union(missing_kinases), fill_value=0 + ) def plot(self, use_adjusted_pval: bool = False): match self.sequence_type: @@ -169,7 +272,7 @@ def plot(self, use_adjusted_pval: bool = False): kinase_family = get_tyr_family() family_colors = get_tyr_family_colors() case _: - raise ValueError(f"Invalid sequence type") + raise ValueError("Invalid sequence type") family = [] for kinase in self.enrichment_table.index: @@ -192,10 +295,10 @@ def plot(self, use_adjusted_pval: bool = False): text=self.enrichment_table.index, color_discrete_map=family_colors, # Use the defined color mapping # category_orders={"family": category_orders}, - template="none" + template="none", ) - # add horizontal line at y = 1.3 which is absolute val of log10(0.05) + # Add horizontal line at y = 1.3 which is absolute val of log10(0.05) fig.add_hline( y=1.3, line_dash="dash", @@ -205,9 +308,9 @@ def plot(self, use_adjusted_pval: bool = False): annotation_position="top right", annotation_font_size=11, annotation_font_color="black", - opacity=0.7 + opacity=0.7, ) - # add vertical line at position x = 0 + # Add vertical line at position x = 0 fig.add_vline( x=0, annotation_text="Inhibited | Activated", @@ -215,31 +318,32 @@ def plot(self, use_adjusted_pval: bool = False): annotation_font_size=11, line_width=1, line_color="black", - opacity=0.5 + opacity=0.5, ) - # format the text that appears on the points. (the kinases names) + # Format the text that appears on the points. (the kinases names) fig.update_traces( textfont_size=7, textposition="middle right", marker=dict(size=6), ) - # format the legend and axis + # Format the legend and axis fig.update_layout( legend=dict(font=dict(size=10)), legend_title=dict(font=dict(size=14, color="black")), legend_title_text="Family", xaxis_title_font=dict(size=12), # Set x-axis label font size yaxis_title_font=dict(size=12), # Set y-axis label font size - xaxis_title='Log\u2082(EOR)', + xaxis_title="Log\u2082(EOR)", yaxis_title=y_axis_title, title={ - 'text': "Kinase inference", - 'x': 0.5, - 'xanchor': 'center', - "font_size": 15}, + "text": "Kinase inference", + "x": 0.5, + "xanchor": "center", + "font_size": 15, + }, xaxis=dict(ticks="outside", mirror=True, showline=True), yaxis=dict(ticks="outside", mirror=True, showline=True), width=600, - height=600 + height=600, ) - return fig \ No newline at end of file + return fig diff --git a/src/kinex/functions.py b/src/kinex/functions.py index 10e51fd..cfd2487 100644 --- a/src/kinex/functions.py +++ b/src/kinex/functions.py @@ -1,10 +1,8 @@ import requests from importlib import resources -from math import pow, sqrt from pathlib import Path import numpy as np -import pandas as pd def get_sequence_format(sequence: str) -> str: """ @@ -104,140 +102,36 @@ def check_sequence(sequence: str, sequence_format: str) -> bool: return False return True - -def get_columns(sequence: str, sequence_format: str = "*") -> list: +def get_distances(experiment1: dict, experiment2: dict) -> np.ndarray: """ - Makes a list of column names based on the aminoacid and position in the input sequence. - With the phospho-priming option, it includes the phsopho-residues in the phospho-acceptor's vicinity. - - Parameters - ---------- - sequence : str - Phosphosite sequence - sequence_format : str, default '*' - Specify which sequence format to use: '*' or 'central' - - Returns - ------- - list - List of strings corresponding to the PSSM columns names, based on the position-aminoacid relation in the input sequence. - - Example - ------- - >>> get_columns(sequence='GRNtSLs*PVQA', sequence_format="*", phospho_priming=True) - ['-5R', '-4N', '-3t', '-2S', '-1L', '1P', '2V', '3Q', '4A'] - """ - - sequence_length = len(sequence) - column = [] - part_id = 0 - - if sequence_format == "central": - parts = [ - sequence[: sequence_length // 2], - sequence[sequence_length // 2 + 1:], - ] # split the word in half - for part in parts: # take first half and second half of the word - if part_id == 0: - part = part[::-1] # remove the S or T from position 0 - for position, aminoacid in enumerate(part): - if ( - aminoacid == "_" or aminoacid == "X" - ): # jump over a missing character ("_") or trucation ("X"). - continue - if part_id == 0: - pos = (position + 1) * (-1) - else: - pos = position + 1 - if pos in range(-5, 5): - column.append(f"{pos}{aminoacid}") - part_id += 1 - elif sequence_format == "*": - parts = sequence.split("*") - for part in parts: # take first half and second half of the word - if part_id == 0: - part = part[::-1][1:] # remove the S or T from position 0 - for position, aminoacid in enumerate(part): - if ( - aminoacid == "_" or aminoacid == "X" - ): # jump over a missing character ("_") or trucation ("X"). - continue - if part_id == 0: - pos = (position + 1) * (-1) - else: - pos = position + 1 - if pos in range(-5, 5): - column.append(f"{pos}{aminoacid}") - part_id += 1 - - return sorted(column, key=lambda item: int(item[:-1])) - - -def score(sequence: str, sequence_format: str, pssm: pd.DataFrame, favorability: bool = False) -> pd.DataFrame: - """ - Computes the scores for each of the 303 kinases present in the PSSM table using the list of columns returned by get_columns function. - - Parameters - ---------- - sequence : str - Phosphosite sequence - sequence_format : str, default '*' - Specify which sequence format to use: '*' or 'central' - pssm: pandas.DataFrame - Position Specific Score Matrix of 303 Ser/Thr kinases - favorability: bool, default False - Enable/Disable phospho-acceptor favorability - - Returns - ------- - pandas.DataFrame - A table of length 303 with index column 'kinase' and a 'score' column containing the calculated scores. - - Example - ------- - >>> score(sequence='EGRNSLS*PVQATQ', sequence_format='*', pssm=pssm, phospho_priming=False, favorability=False) - score - kinase - NLK 8.432319 - JNK3 19.764808 - CDK4 12.719801 - SMG1 1.286346 - JNK1 19.502893 - ... ... - TLK2 0.056653 - RAF1 0.132770 - PASK 0.118487 - CDC7 0.160988 - TLK1 0.024576 - - [303 rows x 1 columns] + + Calculate the Euclidean distance between two experiments based on their + dominant enrichment and p-value scores. + """ - pssm = pssm.reset_index() - - columns = get_columns( - sequence, sequence_format) - columns.append("kinase") - - if favorability == True: - seq_upper = sequence.upper() - if seq_upper[len(sequence) // 2] == "S" or "S*" in seq_upper: - columns.append("0S") - elif seq_upper[len(sequence) // 2] == "T" or "T*" in seq_upper: - columns.append("0T") - - df = pssm[columns] - df.insert(0, "score", df.prod(axis=1, numeric_only=True)) - df = df[["kinase", "score"]] - df = df.set_index("kinase") - return df - - -def get_distances(experiment1, experiment2): - enrich = np.array(experiment1['dominant_enrichment_value_log2']) - \ - np.array(experiment2['dominant_enrichment_value_log2']) - p_val = np.array(experiment1['dominant_p_value_log10_abs']) - \ - np.array(experiment2['dominant_p_value_log10_abs']) - return np.power(np.power(enrich, 2) + np.power(p_val, 2), 0.5) + + # Validate input keys + required_keys = ['dominant_enrichment_value_log2', 'dominant_p_value_log10_abs'] + for key in required_keys: + if key not in experiment1 or key not in experiment2: + raise ValueError(f"Both experiments must contain the key '{key}'.") + + # Convert input values to NumPy arrays + enrich1 = np.array(experiment1['dominant_enrichment_value_log2']) + enrich2 = np.array(experiment2['dominant_enrichment_value_log2']) + p_val1 = np.array(experiment1['dominant_p_value_log10_abs']) + p_val2 = np.array(experiment2['dominant_p_value_log10_abs']) + + # Ensure arrays have the same length + if enrich1.shape != enrich2.shape or p_val1.shape != p_val2.shape: + raise ValueError("The arrays in the two experiments must have the same shape.") + + # Calculate the Euclidean distance + enrich_diff = enrich1 - enrich2 + p_val_diff = p_val1 - p_val2 + distances = np.sqrt(np.power(enrich_diff, 2) + np.power(p_val_diff, 2)) + + return distances def download_file_to_resource(url: str, resource_name: str) -> None: diff --git a/src/kinex/kinex.py b/src/kinex/kinex.py index 6d66439..7bb345d 100644 --- a/src/kinex/kinex.py +++ b/src/kinex/kinex.py @@ -5,18 +5,21 @@ import numpy as np import pandas as pd +from kinex.enrichment import Enrichment from kinex.functions import download_file_to_resource from kinex.resources import get_pssm_ser_thr, get_pssm_tyr, get_scoring_matrix_ser_thr, get_scoring_matrix_tyr, get_configuration_file from kinex.score import Score from kinex.enrichment import Enrichment from kinex.sequence import get_sequence_object, SequenceType + EnrichmentResults = namedtuple("EnrichmentResults", ["ser_thr", "tyr", "failed_sites"]) # Load the pyproject.toml file config = get_configuration_file() + class Kinex: """ The class representing a PSSM table and a scoring matrix needed for scoring and enrichment analysis. @@ -24,7 +27,7 @@ class Kinex: Attributes ---------- pssm_ser_thr : pandas.DataFrame - Normalised and scaled densiometries from PSPA experiments. + Normalised and scaled densiometries from PSPA experiments. The table cotains on rows the kinases and on columns the positions for each aminoacid. scoring_matrix_ser_thr : pandas.DataFrame Table containing 82,755 experimentally identified Ser/Thr phosphosites that have been scored by 303 Ser or Thr kinase PSSM. @@ -35,7 +38,7 @@ class Kinex: get_score(self, validation: str, phospho_priming: bool = False, favorability: bool = False, method: str = 'avg') -> score.Score Checks the validation format and it's validity; computes the scores and ranks the kinases. get_enrichment(self, input_sites: pd.DataFrame, fc_threshold: float = 1.5) -> enrichment.Enrichment - Checks regulation and plots the the enrichment vs p-value. + Checks regulation and plots the the enrichment vs p-value. Examples -------- @@ -63,24 +66,26 @@ class Kinex: """ - def __init__(self, - scoring_matrix_ser_thr: pd.DataFrame = None, - scoring_matrix_tyr: pd.DataFrame = None, - pssm_ser_thr: pd.DataFrame = get_pssm_ser_thr(), - pssm_tyr: pd.DataFrame = get_pssm_tyr()) -> None: + def __init__( + self, + scoring_matrix_ser_thr: pd.DataFrame = None, + scoring_matrix_tyr: pd.DataFrame = None, + pssm_ser_thr: pd.DataFrame = get_pssm_ser_thr(), + pssm_tyr: pd.DataFrame = get_pssm_tyr(), + ) -> None: """ Initializes the instance of the Kinex class. Parameters ---------- pssm_ser_thr : pandas.DataFrame - Normalized and scaled densiometries from PSPA experiments. + Normalized and scaled densiometries from PSPA experiments. The table cotains on rows the kinases and on columns the positions for each aminoacid. scoring_matrix_ser_thr : pandas.DataFrame Table containing 82,755 experimentally identified Ser/Thr phosphosites that have been scored by 303 Ser or Thr kinase PSSM. The table allows the ranking of kinases, as well as the calculation of promiscuity index and median percentile for each input validation. """ - + # Matrix is not provided if scoring_matrix_ser_thr is None: # Trying to look for the matrix in the resources @@ -90,8 +95,7 @@ def __init__(self, scoring_matrix_ser_thr_url = config["urls"]["scoring_matrix_ser_thr"] download_file_to_resource(scoring_matrix_ser_thr_url, 'default_scoring_matrix_ser_thr.csv.gz') scoring_matrix_ser_thr = get_scoring_matrix_ser_thr() - - + if scoring_matrix_tyr is None: scoring_matrix_tyr = get_scoring_matrix_tyr() if scoring_matrix_tyr is None: @@ -102,30 +106,36 @@ def __init__(self, self.pssm_ser_thr = pssm_ser_thr self.pssm_tyr = pssm_tyr - self.scoring_matrix_ser_thr = {col: scoring_matrix_ser_thr[col].to_list() for col in scoring_matrix_ser_thr} + self.scoring_matrix_ser_thr = { + col: scoring_matrix_ser_thr[col].to_list() for col in scoring_matrix_ser_thr + } self.scoring_matrix_ser_thr_length = len(scoring_matrix_ser_thr) - self.scoring_matrix_tyr = {col: scoring_matrix_tyr[col].to_list() for col in scoring_matrix_tyr} + self.scoring_matrix_tyr = { + col: scoring_matrix_tyr[col].to_list() for col in scoring_matrix_tyr + } self.scoring_matrix_tyr_length = len(scoring_matrix_tyr) def __repr__(self): return "" - def get_score(self, - sequence: str, - phospho_priming: bool = False, - favorability: bool = False, - method: str = 'avg') -> Score: + def get_score( + self, + sequence: str, + phospho_priming: bool = False, + favorability: bool = False, + method: str = "avg", + ) -> Score: """ Checks the validation format and it's validity. - Computes the scores, the logarithmised scores, and ranks the kinases based on the 82,755 Ser/Thr phosphosites matrix. + Computes the scores, the logarithmic scores, and ranks the kinases based on the 82,755 Ser/Thr phosphosites matrix. Parameters ---------- sequence: str A string representing a peptide validation of the 20 natural aminoacids, plus pT and pY at 9 positions surrounding a Ser/Thr phospho-acceptor. phospho_priming: bool, default False - Enable/Disable the phospo-priming option. + Enable/Disable the phospo-priming option. favorability: bool, default False Considers the favorability towards either Ser or Thr phospho-acceptor. method: str, default 'avg' @@ -158,7 +168,7 @@ def get_score(self, Kinase ranking >>> result.ranking score log_score percentile_score - kinase + kinase NLK 8.432319 3.075929 93.394 JNK3 19.764808 4.304862 92.232 CDK4 12.719801 3.669004 91.457 @@ -184,7 +194,7 @@ def get_score(self, Top 15 kinases >>> result.top(15) score log_score percentile_score - kinase + kinase NLK 8.432319 3.075929 93.394 JNK3 19.764808 4.304862 92.232 CDK4 12.719801 3.669004 91.457 @@ -203,14 +213,16 @@ def get_score(self, """ if len(sequence) < 3: - raise ValueError(f"Invalid sequence") + raise ValueError("Invalid sequence") - if not method in ['min', 'max', 'avg', 'all']: - raise ValueError(f"Method {method} is not supported. Supported methods: 'min', 'max', 'avg', 'all'") + if method not in ["min", "max", "avg", "all"]: + raise ValueError( + f"Method {method} is not supported. Supported methods: 'min', 'max', 'avg', 'all'" + ) if not phospho_priming: sequence = sequence.upper() - + sequence_object = get_sequence_object(sequence) sequence_object.preprocess_sequence() sequence_object.validate_sequence() @@ -225,21 +237,23 @@ def get_score(self, scoring_matrix = self.scoring_matrix_tyr scoring_matrix_len = self.scoring_matrix_tyr_length case _: - raise ValueError(f"Invalid sequence type") + raise ValueError("Invalid sequence type") scores_results = sequence_object.get_sequence_scores(pssm, favorability) match method: - case 'min': + case "min": scores_results = pd.concat(scores_results) scores_results = [scores_results.groupby(scores_results.index).min()] - case 'max': + case "max": scores_results = pd.concat(scores_results) scores_results = [scores_results.groupby(scores_results.index).max()] - case 'avg': + case "avg": scores_results = [ - reduce(lambda df1, df2: df1.add(df2, fill_value=0), scores_results) / len(scores_results)] - case 'all': + reduce(lambda df1, df2: df1.add(df2, fill_value=0), scores_results) + / len(scores_results) + ] + case "all": pass for table in scores_results: @@ -247,29 +261,36 @@ def get_score(self, percentiles = [] for kinase in table.index: - percentile = (bisect.bisect_left(scoring_matrix[kinase], table.log_score[kinase]) + 1) * ( - 100 / scoring_matrix_len) + percentile = ( + bisect.bisect_left(scoring_matrix[kinase], table.log_score[kinase]) + + 1 + ) * (100 / scoring_matrix_len) percentiles.append(percentile) table.insert(2, "percentile_score", percentiles) - table.sort_values("percentile_score", - ascending=False, inplace=True) + table.sort_values("percentile_score", ascending=False, inplace=True) return Score(sequence_object, scores_results) - def get_enrichment(self, input_sites: pd.DataFrame, fc_threshold: float = 1.5, phospho_priming: bool = False, - favorability: bool = False, method: str = 'avg'): + def get_enrichment( + self, + input_sites: pd.DataFrame, + fc_threshold: float = 1.5, + phospho_priming: bool = False, + favorability: bool = False, + method: str = "avg", + ): """ - Counts the number of up/down/unregulated phosphosite sequences. + Counts the number of up/down/unregulated phosphosite sequences. Using one-sided Fisher exact test determines the kinase enrichment. - Vulcano plot of the side displaying significant enrichment for each kinase vs the corresponding p-value. + Vulcano plot of the side displaying significant enrichment for each kinase vs the corresponding p-value. Parameters ---------- input_sites: pd.DataFrame - A DataFrame containing the phosphosite sequences in the first column and the logarithmised Fold Change in the second column. + A DataFrame containing the phosphosite sequences in the first column and the logarithmised Fold Change in the second column. fc_threshold: float, default 1.5 - A given threshold to check the regulation. By default the threshold is 1.5, but it can be adjusted based on the displayed plots. + A given threshold to check the regulation. By default the threshold is 1.5, but it can be adjusted based on the displayed plots. Returns ------- @@ -305,7 +326,7 @@ def get_enrichment(self, input_sites: pd.DataFrame, fc_threshold: float = 1.5, p Enrichment table >>> result.enrichment_table upregulated downregulated unregulated upregulated_enrichment_value ... - kinase + kinase AAK1 0 8.0 3.0 0.472527 ... ACVR2A 0 15.0 6.0 0.218935 ... ACVR2B 0 13.0 5.0 0.272727 ... @@ -338,29 +359,41 @@ def get_enrichment(self, input_sites: pd.DataFrame, fc_threshold: float = 1.5, p [108 rows x 3 columns] - The list of the phosphosites that did't pass the valudation + The list of the phosphosites that didn't pass the validation >>> result.failed_sites ['LQVKIPSKEEEsAD'] """ - if not method in ['min', 'max', 'avg']: - raise ValueError(f"Method {method} is not supported. Supported methods: 'min', 'max', 'avg'") - - input_sites_copy = input_sites.copy() - input_sites_copy.iloc[:, 0] = input_sites_copy.iloc[:, 0].astype(str).str.replace('(ub)', '', - regex=False).str.replace( - '(ox)', '', regex=False).str.replace('(ac)', '', regex=False).str.replace('(de)', '', regex=False) - - ser_thr_enrichment = Enrichment(SequenceType.SER_THR, set(self.pssm_ser_thr.index)) + if method not in ["min", "max", "avg"]: + raise ValueError( + f"Method {method} is not supported. Supported methods: 'min', 'max', 'avg'" + ) + + input_sites_copy = input_sites.copy(deep=True) + input_sites_copy.iloc[:, 0] = ( + input_sites_copy.iloc[:, 0] + .astype(str) + .str.replace("(ub)", "", regex=False) + .str.replace("(ox)", "", regex=False) + .str.replace("(ac)", "", regex=False) + .str.replace("(de)", "", regex=False) + ) + + ser_thr_enrichment = Enrichment( + SequenceType.SER_THR, set(self.pssm_ser_thr.index) + ) tyr_enrichment = Enrichment(SequenceType.TYR, set(self.pssm_tyr.index)) failed_sites = [] for id in range(len(input_sites_copy)): try: - score_result = self.get_score(sequence=str( - input_sites_copy.iloc[id, 0]), phospho_priming=phospho_priming, favorability=favorability, - method=method) + score_result = self.get_score( + sequence=str(input_sites_copy.iloc[id, 0]), + phospho_priming=phospho_priming, + favorability=favorability, + method=method, + ) except ValueError: failed_sites.append(input_sites_copy.iloc[id, 0]) continue @@ -371,6 +404,7 @@ def get_enrichment(self, input_sites: pd.DataFrame, fc_threshold: float = 1.5, p case SequenceType.TYR: enrichment_object = tyr_enrichment case _: + failed_sites.append(input_sites_copy.iloc[id, 0]) continue @@ -389,9 +423,25 @@ def get_enrichment(self, input_sites: pd.DataFrame, fc_threshold: float = 1.5, p enrichment_object.regulation_list.append(regulation) enrichment_object.top15_kinases_list.append(",".join(top15_kinases)) - enrichment_object.enrichment_table = pd.concat([enrichment_object.enrichment_table, pd.DataFrame( - {"kinase": top15_kinases, regulation: np.ones(len(top15_kinases))})]).groupby('kinase').sum( - numeric_only=False).reset_index() + # To avoid the warning here: + # Enrichment table sometimes is empty + # Concatenation with empty or all-NA entries is deprecated + enrichment_object.enrichment_table = ( + pd.concat( + [ + enrichment_object.enrichment_table, + pd.DataFrame( + { + "kinase": top15_kinases, + regulation: np.ones(len(top15_kinases)), + } + ), + ] + ) + .groupby("kinase") + .sum(numeric_only=False) + .reset_index() + ) # Background adjustment ser_thr_enrichment.adjust_background_sites() @@ -400,4 +450,4 @@ def get_enrichment(self, input_sites: pd.DataFrame, fc_threshold: float = 1.5, p ser_thr_enrichment.fisher_statistics() tyr_enrichment.fisher_statistics() - return EnrichmentResults(ser_thr_enrichment, tyr_enrichment, failed_sites) \ No newline at end of file + return EnrichmentResults(ser_thr_enrichment, tyr_enrichment, failed_sites) diff --git a/src/kinex/resources/__init__.py b/src/kinex/resources/__init__.py index a2f0397..1a55af5 100644 --- a/src/kinex/resources/__init__.py +++ b/src/kinex/resources/__init__.py @@ -3,59 +3,54 @@ import pandas as pd def get_pssm_ser_thr() -> pd.DataFrame: - with resources.path("kinex.resources", "pssm_table_ser_thr.csv") as df: + with resources.files("kinex.resources").joinpath("pssm_table_ser_thr.csv").open() as df: return pd.read_csv(df, index_col=0) def get_pssm_tyr() -> pd.DataFrame: - with resources.path("kinex.resources", "pssm_table_tyr.csv") as df: + with resources.files("kinex.resources").joinpath("pssm_table_tyr.csv").open() as df: return pd.read_csv(df, index_col=0) def get_ser_thr_family() -> dict: - with resources.path("kinex.resources", "ser_thr_family.json") as file_path: - with open(file_path) as json_file: - return json.load(json_file) + with resources.files("kinex.resources").joinpath("ser_thr_family.json").open() as json_file: + return json.load(json_file) def get_ser_thr_family_colors() -> dict: - with resources.path("kinex.resources", "ser_thr_family_colors.json") as file_path: - with open(file_path) as json_file: - return json.load(json_file) + with resources.files("kinex.resources").joinpath("ser_thr_family_colors.json").open() as json_file: + return json.load(json_file) def get_tyr_family() -> dict: - with resources.path("kinex.resources", "tyr_family.json") as file_path: - with open(file_path) as json_file: - return json.load(json_file) + with resources.files("kinex.resources").joinpath("tyr_family.json").open() as json_file: + return json.load(json_file) def get_tyr_family_colors() -> dict: - with resources.path("kinex.resources", "tyr_family_colors.json") as file_path: - with open(file_path) as json_file: - return json.load(json_file) + with resources.files("kinex.resources").joinpath("tyr_family_colors.json").open() as json_file: + return json.load(json_file) def get_experiments() -> dict: - with resources.path("kinex.resources", "experiments.json") as file_path: - with open(file_path) as json_file: - return json.load(json_file) + with resources.files("kinex.resources").joinpath("experiments.json").open() as json_file: + return json.load(json_file) def get_scoring_matrix_ser_thr() -> pd.DataFrame: try: - with resources.path("kinex.resources", "default_scoring_matrix_ser_thr.csv.gz") as file_path: + with resources.files("kinex.resources").joinpath("default_scoring_matrix_ser_thr.csv.gz").open('rb') as file_path: return pd.read_csv(file_path, compression='gzip') except FileNotFoundError: return None def get_scoring_matrix_tyr() -> pd.DataFrame: try: - with resources.path("kinex.resources", "default_scoring_matrix_tyr.csv.gz") as file_path: + with resources.files("kinex.resources").joinpath("default_scoring_matrix_tyr.csv.gz").open('rb') as file_path: return pd.read_csv(file_path, compression='gzip') except FileNotFoundError: return None def get_configuration_file() -> dict: with resources.files("kinex.resources").joinpath("config.json").open() as json_file: - return json.load(json_file) \ No newline at end of file + return json.load(json_file) diff --git a/src/kinex/score.py b/src/kinex/score.py index fb12bac..3fe3d47 100644 --- a/src/kinex/score.py +++ b/src/kinex/score.py @@ -9,7 +9,7 @@ class Score: ---------- sequence : str A string representing a validation of aminoacids - ranking : pandas.DataFrame + ranking : list of pandas.DataFrame containing scores, log2(scores) and percentiles for each kinase median_percentile : int Median value of all percentile scores for a validation diff --git a/src/kinex/sequence.py b/src/kinex/sequence.py index a7e6727..4348d5f 100644 --- a/src/kinex/sequence.py +++ b/src/kinex/sequence.py @@ -8,6 +8,8 @@ def is_central_sequence_valid(sequence_string: str) -> bool: + if not isinstance(sequence_string, str): + return False if sequence_string[len(sequence_string) // 2] not in ("S", "T", "Y", "s", "t", "y"): return False for aminoacid in sequence_string: @@ -17,6 +19,8 @@ def is_central_sequence_valid(sequence_string: str) -> bool: def is_separator_sequence_valid(sequence_string: str, separator: str) -> bool: + if separator not in [sep.value for sep in SequenceSeparator]: + return False if separator * 2 in sequence_string: return False @@ -30,7 +34,7 @@ def is_separator_sequence_valid(sequence_string: str, separator: str) -> bool: return True -def get_valid_patterns(separator): +def get_valid_patterns(separator: str): return [ f"S{separator}", f"T{separator}", @@ -44,6 +48,7 @@ def get_valid_patterns(separator): class SequenceSeparator(Enum): ASTERISK = "*" PH = "(ph)" + PH_CAPITAL = "(PH)" class SequenceType(Enum): diff --git a/src/kinex/table2x2.py b/src/kinex/table2x2.py index 3746f00..863c6db 100644 --- a/src/kinex/table2x2.py +++ b/src/kinex/table2x2.py @@ -27,6 +27,7 @@ def __init__(self, table, shift_zeros): if self.shift_zeros: if 0 in self.table: self.table = np.add(self.table, 0.5) + self.check_zeros() def __repr__(self): return f'{self.table}' @@ -60,8 +61,11 @@ def shift_zeros(self, value): raise ValueError("Wrong shift_zeros format") def odds_ratio(self) -> float: - return (self.table[0][0] * self.table[1][1] / (self.table[0][1] * self.table[1][0])) def p_val(self, mode) -> float: return fisher_exact(self.table, alternative=mode).pvalue + + def check_zeros(self) -> None: + if not self.shift_zeros and 0 in self.table: + raise ValueError("Table contains zeros and shift_zeros is set to False") diff --git a/tests/data/__init__.py b/tests/data/__init__.py index 9ef039d..1716727 100644 --- a/tests/data/__init__.py +++ b/tests/data/__init__.py @@ -1,18 +1,20 @@ import pandas as pd from importlib_resources import files, as_file + def get_test_pssm() -> pd.DataFrame: - source = files("kinex.tests.data").joinpath('test_pssm_table.csv') + source = files("tests.data").joinpath("test_pssm_table.csv") with as_file(source) as df: return pd.read_csv(df, index_col=0) - + + def get_test_scoring_matrix() -> pd.DataFrame: - source = files("kinex.tests.data").joinpath('test_scoring_matrix.csv') + source = files("tests.data").joinpath("test_scoring_matrix.csv") with as_file(source) as df: return pd.read_csv(df, index_col=0) - + + def get_test_input_sites() -> pd.DataFrame: - source = files("kinex.tests.data").joinpath('test_input_sites.csv') + source = files("tests.data").joinpath("test_input_sites.csv") with as_file(source) as df: return pd.read_csv(df) - diff --git a/tests/test_enrichment.py b/tests/test_enrichment.py new file mode 100644 index 0000000..767f419 --- /dev/null +++ b/tests/test_enrichment.py @@ -0,0 +1,150 @@ +import unittest + +from kinex.kinex import Kinex +import pandas as pd +from tests.data import get_test_input_sites + + +class TestEnrichment(unittest.TestCase): + def setUp(self): + self.kinex = Kinex() + self.enrich = self.kinex.get_enrichment( + get_test_input_sites(), + fc_threshold=1.5, + phospho_priming=False, + favorability=True, + method="max", + ) + + def test_total_upregulated(self): + self.assertEqual(self.enrich.ser_thr.total_upregulated, 1) + + def test_total_downregulated(self): + self.assertEqual(self.enrich.ser_thr.total_downregulated, 0) + + def test_total_unregulated(self): + self.assertEqual(self.enrich.ser_thr.total_unregulated, 4) + + def test_enrichment_table_structure(self): + expected_columns = [ + "upregulated", + "downregulated", + "unregulated", + "upregulated_enrichment_value", + "upregulated_enrichment_value_log2", + "upregulated_p_value", + "upregulated_p_value_log10_abs", + "upregulated_adjusted_p_value", + "upregulated_adjusted_p_value_log10_abs", + "downregulated_enrichment_value", + "downregulated_enrichment_value_log2", + "downregulated_p_value", + "downregulated_p_value_log10_abs", + "downregulated_adjusted_p_value", + "downregulated_adjusted_p_value_log10_abs", + "dominant_direction", + "dominant_enrichment_value_log2", + "dominant_p_value_log10_abs", + "dominant_adjusted_p_value_log10_abs", + ] + self.assertListEqual( + list(self.enrich.ser_thr.enrichment_table.columns), expected_columns + ) + + def test_adjust_background_sites(self): + # Test when total_unregulated is zero + self.enrich.ser_thr.total_upregulated = 4 + self.enrich.ser_thr.total_downregulated = 2 + self.enrich.ser_thr.total_unregulated = 0 + self.enrich.ser_thr.adjust_background_sites() + self.assertEqual(self.enrich.ser_thr.total_unregulated, 1) + + # Test when total_unregulated is not zero + self.enrich.ser_thr.total_unregulated = 3 + self.enrich.ser_thr.adjust_background_sites() + self.assertEqual(self.enrich.ser_thr.total_unregulated, 3) + + def test_calculate_enrichment_for_row(self): + # Test case 1: No unregulated hits + self.enrich.ser_thr.enrichment_table = pd.DataFrame( + { + "kinase": ["kinase1"], + "upregulated": [1], + "downregulated": [0], + "unregulated": [0], # 1 + } + ) + + self.enrich.ser_thr._calculate_enrichment_for_row(0) + self.assertEqual( + self.enrich.ser_thr.enrichment_table.loc[0, "upregulated_enrichment_value"], + 7, + ) + + # Test case 2: No upregulated hits + self.enrich.ser_thr.enrichment_table = pd.DataFrame( + { + "kinase": ["kinase2"], + "upregulated": [0], + "downregulated": [1], + "unregulated": [0], + } + ) + + self.enrich.ser_thr._calculate_enrichment_for_row(0) + self.assertEqual( + self.enrich.ser_thr.enrichment_table.loc[0, "upregulated_enrichment_value"], + 0, + ) + self.assertEqual( + self.enrich.ser_thr.enrichment_table.loc[0, "upregulated_p_value"], 1 + ) + + # Test case 3: No unregulated and no downregulatedhits + self.enrich.ser_thr.enrichment_table = pd.DataFrame( + { + "kinase": ["kinase5"], + "upregulated": [0], + "downregulated": [0], + "unregulated": [0], + } + ) + + self.enrich.ser_thr._calculate_enrichment_for_row(0) + self.assertEqual( + self.enrich.ser_thr.enrichment_table.loc[ + 0, "downregulated_enrichment_value" + ], + 0, + ) + + def test_determine_dominant_direction(self): + self.enrich.ser_thr.enrichment_table = pd.DataFrame( + { + "kinase": ["kinase1"], + "upregulated_enrichment_value": [2], + "downregulated_enrichment_value": [1], + "upregulated_enrichment_value_log2": [1], + "downregulated_enrichment_value_log2": [0.5], + "upregulated_p_value_log10_abs": [1], + "downregulated_p_value_log10_abs": [0.5], + "dominant_direction": [""], + } + ) + self.enrich.ser_thr._determine_dominant_direction(0) + self.assertEqual( + self.enrich.ser_thr.enrichment_table.loc[0, "dominant_direction"], + "upregulated set", + ) + + def test_reindex_missing_kinases(self): + self.enrich.ser_thr.enrichment_table = pd.DataFrame( + {"kinase": ["kinase1"]} + ).set_index("kinase") + self.enrich.ser_thr.all_kinases = {"kinase1", "kinase2"} + self.enrich.ser_thr._reindex_missing_kinases() + self.assertIn("kinase2", self.enrich.ser_thr.enrichment_table.index) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_io.py b/tests/test_io.py index 10aa338..3f624ea 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -1,20 +1,166 @@ import unittest +from unittest.mock import mock_open, patch, MagicMock +from pathlib import Path +import requests + +import numpy as np + +from kinex.functions import ( + check_sequence, + get_sequence_format, + download_file_to_resource, + get_distances, +) -from kinex.functions import check_sequence, get_sequence_format class TestIO(unittest.TestCase): def test_get_sequence_format(self): - self.assertEqual(get_sequence_format('PSVEPPLs*QETFSDL'), '*') - self.assertEqual(get_sequence_format('PSVEXPLs*QXTF___'), '*') - self.assertEqual(get_sequence_format('PSVEPPLsQETFSDL'), 'central') - self.assertEqual(get_sequence_format('PSVEPPLs(ph)QETFSDL'), '(ph)') - self.assertEqual(get_sequence_format('LQVKIPSKEEEsAD'), 'unsupported') + self.assertEqual(get_sequence_format("PSVEPPLs*QETFSDL"), "*") + self.assertEqual(get_sequence_format("PSVEXPLs*QXTF___"), "*") + self.assertEqual(get_sequence_format("PSVEPPLsQETFSDL"), "central") + self.assertEqual(get_sequence_format("PSVEPPLs(ph)QETFSDL"), "(ph)") + self.assertEqual(get_sequence_format("LQVKIPSKEEEsAD"), "unsupported") def test_check_sequence(self): - self.assertEqual(check_sequence('PSVEPPLs*QETFSDL', sequence_format='*'), True) - self.assertEqual(check_sequence('PSVEXPLs*QXTF___', sequence_format='*'), True) - self.assertEqual(check_sequence('PSVEPPLsQETFSDL', sequence_format='central'), True) + self.assertEqual(check_sequence("PSVEPPLs*QETFSDL", sequence_format="*"), True) + self.assertEqual(check_sequence("PSVEXPLs*QXTF___", sequence_format="*"), True) + self.assertEqual( + check_sequence("PSVEPPLsQETFSDL", sequence_format="central"), True + ) + + def test_valid_input(self): + experiment1 = { + "dominant_enrichment_value_log2": [1, 2, 3], + "dominant_p_value_log10_abs": [4, 5, 6], + } + experiment2 = { + "dominant_enrichment_value_log2": [1, 1, 1], + "dominant_p_value_log10_abs": [4, 4, 4], + } + expected = np.sqrt(np.array([0, 1, 4]) + np.array([0, 1, 4])) + result = get_distances(experiment1, experiment2) + np.testing.assert_array_almost_equal(result, expected) + + def test_different_shapes(self): + experiment1 = { + "dominant_enrichment_value_log2": [1, 2], + "dominant_p_value_log10_abs": [4, 5], + } + experiment2 = { + "dominant_enrichment_value_log2": [1, 1, 1], + "dominant_p_value_log10_abs": [4, 4, 4], + } + with self.assertRaises(ValueError): + get_distances(experiment1, experiment2) + + def test_missing_key(self): + experiment1 = {"dominant_enrichment_value_log2": [1, 2, 3]} + experiment2 = { + "dominant_enrichment_value_log2": [1, 1, 1], + "dominant_p_value_log10_abs": [4, 4, 4], + } + with self.assertRaises(ValueError): + get_distances(experiment1, experiment2) + + def test_empty_input(self): + experiment1 = { + "dominant_enrichment_value_log2": [], + "dominant_p_value_log10_abs": [], + } + experiment2 = { + "dominant_enrichment_value_log2": [], + "dominant_p_value_log10_abs": [], + } + result = get_distances(experiment1, experiment2) + np.testing.assert_array_equal(result, np.array([])) + + @patch("requests.get") + @patch("kinex.functions.resources.path") + @patch("builtins.open", new_callable=mock_open) + def test_download_file_to_resource_success( + self, mock_open, mock_resources_path, mock_requests_get + ): + # Mock the path returned by resources.path as a Path object + mock_file_path = MagicMock() + mock_file_path.__enter__.return_value = Path("/mocked/path/to/resource.csv.gz") + mock_resources_path.return_value = mock_file_path + + # Mock the requests.get response + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.content = b"mocked file content" + mock_requests_get.return_value = mock_response + + # Call the function to test + download_file_to_resource("http://example.com/file.csv.gz", "resource.csv.gz") + + # Assertions + mock_requests_get.assert_called_once_with( + "http://example.com/file.csv.gz", stream=True, timeout=10 + ) + mock_resources_path.assert_called_once_with( + "kinex.resources", "resource.csv.gz" + ) + + # Verify the open() method was called to write the content + mock_open.assert_called_once_with(Path("/mocked/path/to/resource.csv.gz"), "wb") + mock_open().write.assert_called_once_with(b"mocked file content") + + # Download Failure Due to Network Issues + @patch("requests.get") + @patch("kinex.functions.resources.path") + @patch("builtins.open", new_callable=mock_open) + def test_download_file_to_resource_network_failure( + self, mock_open, mock_resources_path, mock_requests_get + ): + # Mock the path returned by resources.path + mock_file_path = MagicMock() + mock_file_path.__enter__.return_value = Path("/mocked/path/to/resource.csv.gz") + mock_resources_path.return_value = mock_file_path + + # Simulate a network error during requests.get + mock_requests_get.side_effect = requests.exceptions.ConnectionError( + "Network error" + ) + + # Assert that the function raises an exception + with self.assertRaises(requests.exceptions.ConnectionError): + download_file_to_resource( + "http://example.com/file.csv.gz", "resource.csv.gz" + ) + + # Ensure no file operations occurred + mock_open.assert_not_called() + mock_resources_path.assert_called_once_with( + "kinex.resources", "resource.csv.gz" + ) + + # Test the case where Invalid URL is passed + @patch("requests.get") + @patch("kinex.functions.resources.path") + @patch("builtins.open", new_callable=mock_open) + def test_download_file_to_resource_invalid_url( + self, mock_open, mock_resources_path, mock_requests_get + ): + # Mock the path returned by resources.path + mock_file_path = MagicMock() + mock_file_path.__enter__.return_value = Path("/mocked/path/to/resource.csv.gz") + mock_resources_path.return_value = mock_file_path + + # Simulate a requests.exceptions.MissingSchema exception + mock_requests_get.side_effect = requests.exceptions.MissingSchema("Invalid URL") + + # Assert that a ValueError is raised + with self.assertRaises(ValueError): + download_file_to_resource("invalid_url", "resource.csv.gz") + + # Ensure no file operations occurred + mock_open.assert_not_called() + mock_resources_path.assert_called_once_with( + "kinex.resources", "resource.csv.gz" + ) + -if __name__ == '__main__': - unittest.main() \ No newline at end of file +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_kinex.py b/tests/test_kinex.py index 5a86885..6672d37 100644 --- a/tests/test_kinex.py +++ b/tests/test_kinex.py @@ -1,23 +1,54 @@ import unittest -import pandas as pd -from kinex.kinex import Kinex +from kinex import Kinex + +from kinex.resources import get_pssm_ser_thr, get_pssm_tyr + +from tests.data import get_test_input_sites -from kinex.tests.data import get_test_input_sites, get_test_pssm, get_test_scoring_matrix class TestKinex(unittest.TestCase): + def setUp(self): + self.kinex = Kinex() + + def test_sequence_initialization(self): + with self.assertRaises(ValueError): + self.kinex.get_score("GRNSLPVQA") + + with self.assertRaises(ValueError): + self.kinex.get_score("GRNSLPVQAI") + + with self.assertRaises(ValueError): + self.kinex.get_score("GRNSL*PVQAI") + + with self.assertRaises(ValueError): + self.kinex.get_score("GRNSL(ph)PVQAI") + + with self.assertRaises(ValueError): + self.kinex.get_score("SS") + + # with self.assertRaises(ValueError): + # self.kinex.get_score("APQST*PAB") + # B is not allowed + # Should be ValueError and invalid seq, but it is KeyError + + self.kinex.get_score("GRNS*PVQA") + self.kinex.get_score("GRNS(ph)PVQA", phospho_priming=False) def test_scoring(self): - kinex = Kinex(scoring_matrix=get_test_scoring_matrix(), pssm=get_test_pssm()) - result = kinex.get_score('PSVEPPLs*QETFSDL') + result = self.kinex.get_score( + "GRNSLs*PVQA", phospho_priming=False, favorability=False, method="all" + ) + self.assertEqual(len(result.ranking[0]), len(get_pssm_ser_thr())) + self.assertEqual(len(result.ranking[0].columns), 3) - self.assertEqual(len(result.ranking), 2) - self.assertEqual(len(result.ranking.columns), 3) - def test_enrichment(self): - kinex = Kinex(scoring_matrix=get_test_scoring_matrix(), pssm=get_test_pssm()) - input_sites = get_test_input_sites() - result = kinex.get_enrichment(input_sites=input_sites) + result = self.kinex.get_enrichment(get_test_input_sites()) + self.assertEqual(len(result.ser_thr.enrichment_table), len(get_pssm_ser_thr())) + self.assertEqual(len(result.ser_thr.enrichment_table.columns), 19) + self.assertEqual(len(result.tyr.enrichment_table), len(get_pssm_tyr())) + self.assertEqual(len(result.failed_sites), 1) + - self.assertEqual(len(result.enrichment_table), 2) - self.assertEqual(len(result.enrichment_table.columns), 19) \ No newline at end of file +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_score.py b/tests/test_score.py new file mode 100644 index 0000000..639cb16 --- /dev/null +++ b/tests/test_score.py @@ -0,0 +1,65 @@ +import unittest +import pandas as pd +from kinex.score import Score +import numpy as np + +from kinex.sequence import get_sequence_object + + +class TestScore(unittest.TestCase): + + def setUp(self): + + self.sequence_object = get_sequence_object("QSTPQ") + # ['-1S', '-2Q', '1P', '2Q'] + # self.scores_results = self.sequence_object.get_sequence_scores(get_test_pssm()) # With two kinase AAK1 ACVR2A + # {'AAK1': 2.463825 = 0.9257 * 2.5792 * 0.7872 * 1.3109 , 'ACVR2A': 0.281407 = 0.7687 * 1.0702 * 0.3404 * 1.0049} + + data1 = { + "kinase": ["AAK1", "ACVR2A"], + "score": [2.463825, 0.281407], + "log_score": [1.300900, -1.829272], + "percentile_score": [92.818561, 50.004229], + } + + data2 = { + "kinase": ["BRAF", "CDK2"], + "score": [1.234567, 3.456789], + "log_score": [0.567890, 1.234567], + "percentile_score": [75.123456, 85.678901], + } + + self.scores_results = [ + pd.DataFrame(data1).set_index("kinase"), + pd.DataFrame(data2).set_index("kinase"), + ] + self.score_object = Score(self.sequence_object, self.scores_results) + + def test_median_percentile(self): + expected_median1 = np.array([92.818561, 50.004229]).mean() + expected_median2 = np.array([75.123456, 85.678901]).mean() + self.assertAlmostEqual( + self.score_object.median_percentile[0], expected_median1, places=6 + ) + self.assertAlmostEqual( + self.score_object.median_percentile[1], expected_median2, places=6 + ) + + def test_promiscuity_index(self): + self.assertEqual(self.score_object.promiscuity_index(limit=80), [1, 1]) + self.assertEqual(self.score_object.promiscuity_index(limit=49), [2, 2]) + self.assertEqual(self.score_object.promiscuity_index(limit=93), [0, 0]) + + def test_top(self): + result = self.score_object.top(number=1) + self.assertEqual(len(result[0]), 1) + self.assertEqual(result[0].index[0], "AAK1") + self.assertEqual(len(result[1]), 1) + self.assertEqual(result[1].index[0], "BRAF") + + def test_repr(self): + self.assertEqual(repr(self.score_object), "Scoring results for QSTPQ") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_sequence.py b/tests/test_sequence.py new file mode 100644 index 0000000..18c9e88 --- /dev/null +++ b/tests/test_sequence.py @@ -0,0 +1,226 @@ +import unittest +import pandas as pd +from kinex.sequence import ( + CentralSequence, + SeparatorSequence, + get_sequence_type, + get_sequence_object, + SequenceSeparator, + SequenceType, + is_central_sequence_valid, + is_separator_sequence_valid, + get_score, +) + + +class TestSequenceProcessing(unittest.TestCase): + def setUp(self): + # Sample PSSM DataFrame for testing + self.pssm_data = { + "kinase": ["AAK1", "ACVR2A"], + "-5P": [1.2242, 0.7057], + "-5G": [0.4165, 0.8178], + } + self.pssm = pd.DataFrame(self.pssm_data).set_index("kinase") + + def test_is_central_sequence_valid(self): + # Test valid sequences + valid_sequence = "APQSTPQPA" + self.assertTrue(is_central_sequence_valid(valid_sequence)) + + # Test invalid sequences + invalid_sequence = "APQPPQPA" + self.assertFalse(is_central_sequence_valid(invalid_sequence)) + + # Test not a string + invalid_sequence = {"APQSTPQPA"} + self.assertFalse(is_central_sequence_valid(invalid_sequence)) + + def test_is_separator_sequence_valid(self): + # Test valid separator sequence + valid_sequence = "APQ*S*TPQ*PA" + separator = "*" + self.assertTrue(is_separator_sequence_valid(valid_sequence, separator)) + + # Test invalid separator sequence + invalid_sequence = "APQ**STPQ*PA" + self.assertFalse(is_separator_sequence_valid(invalid_sequence, separator)) + + # Test invalid separator + invalid_sequence = "APQ#STPQ#PA" + separator = "#" + self.assertFalse(is_separator_sequence_valid(invalid_sequence, separator)) + + # Test consecutive separator + invalid_sequence = "APQ**STPQ*PA" + separator = "*" + self.assertFalse(is_separator_sequence_valid(invalid_sequence, separator)) + + # Test invalid patterns + invalid_sequence = "APQSTPQPA" + separator = "*" + self.assertFalse(is_separator_sequence_valid(invalid_sequence, separator)) + + def test_get_sequence_type(self): + # Test valid sequence types + self.assertEqual(get_sequence_type("S"), SequenceType.SER_THR) + + self.assertEqual(get_sequence_type("T"), SequenceType.SER_THR) + + self.assertEqual(get_sequence_type("Y"), SequenceType.TYR) + + # Test invalid sequence types + invalid_sequence = "APQPQPA" + with self.assertRaises(ValueError): + get_sequence_type(invalid_sequence) + + def test_get_sequence_object(self): + # Test CentralSequence object + central_sequence_T = "APQTPQP" + sequence = get_sequence_object(central_sequence_T) + self.assertIsInstance(sequence, CentralSequence) + + central_sequence_S = "APQSPQP" + sequence = get_sequence_object(central_sequence_S) + self.assertIsInstance(sequence, CentralSequence) + + central_sequence_invalid_type = "APQZPQP" + with self.assertRaises(ValueError): + sequence = get_sequence_object(central_sequence_invalid_type) + + # Test SeparatorSequence object + separator_sequence_asterisk = "APQST*PA" + sequence = get_sequence_object(separator_sequence_asterisk) + self.assertIsInstance(sequence, SeparatorSequence) + + separator_sequence_ph = "APQST(ph)PA" + sequence = get_sequence_object(separator_sequence_asterisk) + self.assertIsInstance(sequence, SeparatorSequence) + + separator_sequence_invalid_type = "APQZP*QP" + with self.assertRaises(ValueError): + sequence = get_sequence_object(separator_sequence_invalid_type) + + separator_sequence_invalid_type_ph = "APQZP(ph)QP" + with self.assertRaises(ValueError): + sequence = get_sequence_object(separator_sequence_invalid_type_ph) + + def test_get_score(self): + # Sample PSSM DataFrame + self.pssm_data = { + "kinase": ["AAK1", "ACVR2A"], + "-5P": [1.2242, 0.7057], + "-5G": [0.4165, 0.8178], + } + self.pssm = pd.DataFrame(self.pssm_data).set_index("kinase") + + # Test valid columns + columns = ["-5P", "-5G"] + expected_data = { + "kinase": ["AAK1", "ACVR2A"], + "score": [1.2242 * 0.4165, 0.7057 * 0.8178], + } + expected_df = pd.DataFrame(expected_data).set_index("kinase") + expected_df["score"] = expected_df["score"].astype(float) + + result_df = get_score(columns, self.pssm) + + pd.testing.assert_frame_equal(result_df, expected_df) + + # Test empty columns + columns = [] + expected_data = {"kinase": ["AAK1", "ACVR2A"], "score": [1, 1]} + expected_df = pd.DataFrame(expected_data).set_index("kinase") + expected_df["score"] = expected_df["score"].astype(float) + + result_df = get_score(columns, self.pssm) + + pd.testing.assert_frame_equal(result_df, expected_df) + + +class TestCentralSequence(unittest.TestCase): + def setUp(self): + self.valid_sequence = "APQATPQPA" + self.invalid_sequence = "APQPPQPA" + self.sequence_type = SequenceType.SER_THR + + def test_validate_sequence_valid(self): + central_seq = CentralSequence(self.valid_sequence, self.sequence_type) + central_seq.validate_sequence() + + def test_validate_sequence_invalid(self): + central_seq = CentralSequence(self.invalid_sequence, self.sequence_type) + with self.assertRaises(ValueError): + central_seq.validate_sequence() + + def test_get_split_sequence(self): + central_seq = CentralSequence(self.valid_sequence, self.sequence_type) + split_sequence = central_seq.get_split_sequence() + self.assertEqual(split_sequence, ["APQA", "PQPA"]) + + def test_get_sequence_scores(self): + pssm_data = { + "kinase": ["AAK1", "ACVR2A"], + "-1A": [0.5, 0.6], + "-2Q": [0.7, 0.8], + "-3P": [0.9, 1.0], + "-4A": [1.1, 1.2], + "1P": [1.3, 1.4], + "2Q": [1.5, 1.6], + "3P": [1.7, 1.8], + "4A": [1.9, 2.0], + "0T": [1.0, 1.0], + } + pssm = pd.DataFrame(pssm_data).set_index("kinase") + + # Test favorability + central_seq = CentralSequence(self.valid_sequence, self.sequence_type) + columns = ["-1A", "-2Q", "-3P", "-4A", "1P", "2Q", "3P", "4A"] + expected_data = { + "kinase": ["AAK1", "ACVR2A"], + "score": [ + 0.5 * 0.7 * 0.9 * 1.1 * 1.3 * 1.5 * 1.7 * 1.9 * 1.0, + 0.6 * 0.8 * 1.0 * 1.2 * 1.4 * 1.6 * 1.8 * 2.0 * 1.0, + ], + } + expected_df = pd.DataFrame(expected_data).set_index("kinase") + expected_df["score"] = expected_df["score"].astype(float) + + result_df = central_seq.get_sequence_scores(pssm, favorability=True) + + pd.testing.assert_frame_equal(result_df[0], expected_df) + + def test_get_column_list(self): + central_seq = CentralSequence(self.valid_sequence, self.sequence_type) + columnList = central_seq.get_columns_list() + self.assertEqual( + columnList, ["-1A", "-2Q", "-3P", "-4A", "1P", "2Q", "3P", "4A"] + ) + + +class TestSeparatorSequence(unittest.TestCase): + def setUp(self): + self.valid_sequence = "SGLAAS*AAQQQ" + self.separator = SequenceSeparator.ASTERISK + self.sequence_type = SequenceType.SER_THR + self.separator_sequence = SeparatorSequence( + self.valid_sequence, self.separator, self.sequence_type + ) + + def test_preprocess_sequence(self): + self.separator_sequence.preprocess_sequence() + self.assertEqual(self.separator_sequence.sequences, ["SGLAAs", "AAQQQ"]) + + def test_get_split_sequence(self): + split_sequence = self.separator_sequence.get_split_sequence() + self.assertEqual(split_sequence, ["SGLAA", "AAQQQ"]) + + def test_get_columnList(self): + columnList = self.separator_sequence.get_columns_list() + self.assertEqual( + columnList, ["-1A", "-2A", "-3L", "-4G", "-5S", "1A", "2A", "3Q", "4Q"] + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_table2x2.py b/tests/test_table2x2.py new file mode 100644 index 0000000..f81a2d2 --- /dev/null +++ b/tests/test_table2x2.py @@ -0,0 +1,44 @@ +import unittest +import numpy as np +from scipy.stats import fisher_exact +from kinex.table2x2 import Table2x2 + + +class TestTable2x2(unittest.TestCase): + def test_initialization_valid(self): + self.valid_table = np.array([[1, 2], [3, 4]]) + t = Table2x2(self.valid_table, shift_zeros=False) + self.assertTrue(np.array_equal(t.table, self.valid_table)) + + def test_initialization_invalid(self): + with self.assertRaises(ValueError): + self.invalid_table_1 = np.array([[1, 2, 3], [4, 5, 6]]) # Wrong dimensions + Table2x2(self.invalid_table_1, shift_zeros=False) + with self.assertRaises(ValueError): + self.invalid_table_2 = np.array([[1], [4, 5]]) # Wrong dimensions + Table2x2(self.invalid_table_2, shift_zeros=False) + with self.assertRaises(ValueError): + self.invalid_table_2 = np.array([[1, 2], [5]]) # Wrong dimensions + Table2x2(self.invalid_table_2, shift_zeros=False) + + def test_initialization_with_shift(self): + self.valid_table_with_zeros = np.array([[1, 0], [3, 0]]) # Table with zeros + with self.assertRaises(ValueError): + Table2x2(self.valid_table_with_zeros, shift_zeros=False) + Table2x2(self.valid_table_with_zeros, shift_zeros=True) + + def test_odds_ratio(self): + self.valid_table = np.array([[1, 2], [3, 4]]) + table = Table2x2(self.valid_table, shift_zeros=False) + expected_odds_ratio = (1 * 4) / (2 * 3) + self.assertAlmostEqual(table.odds_ratio(), expected_odds_ratio) + + def test_p_val(self): + self.valid_table = np.array([[1, 2], [3, 4]]) + table = Table2x2(self.valid_table, shift_zeros=False) + p_value = fisher_exact(self.valid_table, alternative="two-sided").pvalue + self.assertAlmostEqual(table.p_val("two-sided"), p_value, places=5) + + +if __name__ == "__main__": + unittest.main()