From d8010013c1f9c6496576b61b63573288cb24be89 Mon Sep 17 00:00:00 2001 From: sichao Date: Thu, 15 Feb 2024 16:05:43 -0500 Subject: [PATCH] reorganize estimation --- dynamo/estimation/deprecated.py | 119 ++++++++++++++++ dynamo/estimation/fit_jacobian.py | 58 ++++---- .../tsc/{utils_kinetic.py => ODEs.py} | 0 dynamo/estimation/tsc/__init__.py | 2 +- dynamo/estimation/tsc/estimation_kinetic.py | 128 +++++++++--------- dynamo/estimation/tsc/twostep.py | 74 +++++----- dynamo/estimation/tsc/utils_moments.py | 113 ---------------- dynamo/tools/deprecated.py | 2 +- dynamo/tools/dynamics.py | 2 +- 9 files changed, 252 insertions(+), 246 deletions(-) create mode 100644 dynamo/estimation/deprecated.py rename dynamo/estimation/tsc/{utils_kinetic.py => ODEs.py} (100%) diff --git a/dynamo/estimation/deprecated.py b/dynamo/estimation/deprecated.py new file mode 100644 index 000000000..23d629206 --- /dev/null +++ b/dynamo/estimation/deprecated.py @@ -0,0 +1,119 @@ +import warnings +import functools + +from numba import float32 # import the types +from numpy import * +from scipy.optimize import least_squares + +from .tsc.utils_moments import moments +from ..tools.sampling import lhsclassic + + +class estimation: + def __init__(self, ranges, x0=None): + self.ranges = ranges + self.n_params = len(ranges) + self.simulator = moments() + if not x0 is None: + self.simulator.x0 = x0 + + def sample_p0(self, samples=1, method="lhs"): + ret = zeros((samples, self.n_params)) + if method == "lhs": + ret = self._lhsclassic(samples) + for i in range(self.n_params): + ret[:, i] = ret[:, i] * (self.ranges[i][1] - self.ranges[i][0]) + self.ranges[i][0] + else: + for n in range(samples): + for i in range(self.n_params): + r = random.rand() + ret[n, i] = r * (self.ranges[i][1] - self.ranges[i][0]) + self.ranges[i][0] + return ret + + def _lhsclassic(self, samples): + # From PyDOE + # Generate the intervals + H = lhsclassic(samples, self.n_params) + + return H + + def get_bound(self, index): + ret = zeros(self.n_params) + for i in range(self.n_params): + ret[i] = self.ranges[i][index] + return ret + + def normalize_data(self, X): + # ret = zeros(X.shape) + # for i in range(len(X)): + # x = X[i] + # #ret[i] = x / max(x) + # ret[i] = log10(x + 1) + res = log(X + 1) + return res + + def f_lsq( + self, + params, + t, + x_data_norm, + method="analytical", + normalize=True, + experiment_type=None, + ): + self.simulator.set_params(*params) + if method == "numerical": + self.simulator.integrate(t, self.simulator.x0) + elif method == "analytical": + self.simulator.solve(t, self.simulator.x0) + if experiment_type is None: + ret = self.simulator.get_all_central_moments() + elif experiment_type == "nosplice": + ret = self.simulator.get_nosplice_central_moments() + ret = self.normalize_data(ret).flatten() if normalize else ret.flatten() + ret[isnan(ret)] = 0 + return ret - x_data_norm + + def fit_lsq( + self, + t, + x_data, + p0=None, + n_p0=1, + bounds=None, + sample_method="lhs", + method="analytical", + normalize=True, + experiment_type=None, + ): + if p0 is None: + p0 = self.sample_p0(n_p0, sample_method) + else: + if p0.ndim == 1: + p0 = [p0] + n_p0 = len(p0) + + x_data_norm = self.normalize_data(x_data) if normalize else x_data + + if bounds is None: + bounds = (self.get_bound(0), self.get_bound(1)) + + costs = zeros(n_p0) + X = [] + for i in range(n_p0): + ret = least_squares( + lambda p: self.f_lsq( + p, + t, + x_data_norm.flatten(), + method, + normalize=normalize, + experiment_type=experiment_type, + ), + p0[i], + bounds=bounds, + ) + costs[i] = ret.cost + X.append(ret.x) + i_min = argmin(costs) + return X[i_min], costs[i_min] diff --git a/dynamo/estimation/fit_jacobian.py b/dynamo/estimation/fit_jacobian.py index 5cdf5c494..3e0d2083e 100644 --- a/dynamo/estimation/fit_jacobian.py +++ b/dynamo/estimation/fit_jacobian.py @@ -98,35 +98,6 @@ def hill_act_grad( return A * n * Kd * x ** (n - 1) / (Kd + x**n) ** 2 - g -def calc_mean_squared_deviation( - func: Callable, - x_data: np.ndarray, - y_mean: np.ndarray, - y_sigm: np.ndarray, - weighted=True, -) -> float: - """Calculate the mean squared deviation of the fit. - - Args: - func: The function to evaluate. - x_data: An array of the x data. - y_mean: An array of the mean of y data. - y_sigm: An array of the standard deviation of the y data. - weighted: Whether to use weighted mean squared deviation. - - Returns: - The mean squared deviation. - """ - err = func(x_data) - y_mean - if weighted: - sig = np.array(y_sigm, copy=True) - if np.any(sig == 0): - main_warning("Some standard deviations are 0; Set to 1 instead.") - sig[sig == 0] = 1 - err /= sig - return np.sqrt(err.dot(err)) - - def fit_hill_grad( x_data: np.ndarray, y_mean: np.ndarray, @@ -329,3 +300,32 @@ def fit_hill_act_grad( pass return {"A": p_opt_min[0], "K": p_opt_min[1], "n": np.exp(p_opt_min[2]), "g": p_opt_min[3]}, msd_min + + +def calc_mean_squared_deviation( + func: Callable, + x_data: np.ndarray, + y_mean: np.ndarray, + y_sigm: np.ndarray, + weighted=True, +) -> float: + """Calculate the mean squared deviation of the fit. + + Args: + func: The function to evaluate. + x_data: An array of the x data. + y_mean: An array of the mean of y data. + y_sigm: An array of the standard deviation of the y data. + weighted: Whether to use weighted mean squared deviation. + + Returns: + The mean squared deviation. + """ + err = func(x_data) - y_mean + if weighted: + sig = np.array(y_sigm, copy=True) + if np.any(sig == 0): + main_warning("Some standard deviations are 0; Set to 1 instead.") + sig[sig == 0] = 1 + err /= sig + return np.sqrt(err.dot(err)) diff --git a/dynamo/estimation/tsc/utils_kinetic.py b/dynamo/estimation/tsc/ODEs.py similarity index 100% rename from dynamo/estimation/tsc/utils_kinetic.py rename to dynamo/estimation/tsc/ODEs.py diff --git a/dynamo/estimation/tsc/__init__.py b/dynamo/estimation/tsc/__init__.py index 732aca162..b043dda54 100644 --- a/dynamo/estimation/tsc/__init__.py +++ b/dynamo/estimation/tsc/__init__.py @@ -15,7 +15,7 @@ Mixture_KinDeg_NoSwitching, kinetic_estimation, ) -from .utils_kinetic import ( +from .ODEs import ( Deterministic, Deterministic_NoSplicing, LinearODE, diff --git a/dynamo/estimation/tsc/estimation_kinetic.py b/dynamo/estimation/tsc/estimation_kinetic.py index 441b0f548..7ca198737 100755 --- a/dynamo/estimation/tsc/estimation_kinetic.py +++ b/dynamo/estimation/tsc/estimation_kinetic.py @@ -9,70 +9,7 @@ from ...dynamo_logger import main_warning from ...tools.moments import strat_mom from ...tools.sampling import lhsclassic -from .utils_kinetic import * - - -def guestimate_alpha(x_data: np.ndarray, time: np.ndarray) -> np.ndarray: - """Roughly estimate alpha for kinetics data, which is simply the averaged ratio of new RNA and labeling time. - - Args: - x_data: A matrix representing RNA data. - time: A matrix of labeling time. - - Returns: - The estimated alpha. - """ - imax = np.argmax(x_data) - alpha = x_data[imax] / time[imax] - return alpha - - -def guestimate_gamma(x_data: np.ndarray, time: np.ndarray) -> np.ndarray: - """Roughly estimate gamma0 with the assumption that time starts at 0. - - Args: - x_data: A matrix representing RNA data. - time: A matrix of labeling time. - - Returns: - The estimated gamma. - """ - ga0 = np.clip(np.log(max(x_data[0], 0) / (x_data[-1] + 1e-6)) / time[-1], 1e-3, 1e3) - return ga0 - - -def guestimate_init_cond(x_data: np.ndarray) -> np.ndarray: - """Roughly estimate x0 for degradation data. - - Args: - x_data: A matrix representing RNA data. - - Returns: - The estimated x0. - """ - x0 = np.clip(np.max(x_data, 1), 1e-4, np.inf) - return x0 - - -def guestimate_p0_kinetic_chase(x_data: np.ndarray, time: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """Roughly estimated alpha, gamma and initial conditions for the kinetic chase experiment. Initail conditions are - the average abundance of labeled RNAs across all cells belonging to the initial labeling time point. - - Args: - x_data: A matrix representing RNA data. - time: A matrix of labeling time. - - Returns: - The estimated alpha0, gamma0 and x0. - """ - t0 = np.min(time) - x0 = np.mean(x_data[time == t0]) - idx = time != t0 - al0 = np.mean((x0 - x_data[idx]) / (time[idx] - t0)) - ga0 = -np.mean((np.log(x_data[idx]) - np.log(x0)) / (time[idx] - t0)) - ga0 = 1e-3 if not np.isfinite(ga0) else ga0 - x0, al0, ga0 = max(1e-3, x0), max(1e-3, al0), max(1e-3, ga0) - return al0, ga0, x0 +from .ODEs import * class kinetic_estimation: @@ -1707,3 +1644,66 @@ def calc_mean_squared_deviation(self, weighted: bool = True) -> float: if weighted: err /= sig return np.sqrt(err.dot(err)) + + +def guestimate_alpha(x_data: np.ndarray, time: np.ndarray) -> np.ndarray: + """Roughly estimate alpha for kinetics data, which is simply the averaged ratio of new RNA and labeling time. + + Args: + x_data: A matrix representing RNA data. + time: A matrix of labeling time. + + Returns: + The estimated alpha. + """ + imax = np.argmax(x_data) + alpha = x_data[imax] / time[imax] + return alpha + + +def guestimate_gamma(x_data: np.ndarray, time: np.ndarray) -> np.ndarray: + """Roughly estimate gamma0 with the assumption that time starts at 0. + + Args: + x_data: A matrix representing RNA data. + time: A matrix of labeling time. + + Returns: + The estimated gamma. + """ + ga0 = np.clip(np.log(max(x_data[0], 0) / (x_data[-1] + 1e-6)) / time[-1], 1e-3, 1e3) + return ga0 + + +def guestimate_init_cond(x_data: np.ndarray) -> np.ndarray: + """Roughly estimate x0 for degradation data. + + Args: + x_data: A matrix representing RNA data. + + Returns: + The estimated x0. + """ + x0 = np.clip(np.max(x_data, 1), 1e-4, np.inf) + return x0 + + +def guestimate_p0_kinetic_chase(x_data: np.ndarray, time: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """Roughly estimated alpha, gamma and initial conditions for the kinetic chase experiment. Initail conditions are + the average abundance of labeled RNAs across all cells belonging to the initial labeling time point. + + Args: + x_data: A matrix representing RNA data. + time: A matrix of labeling time. + + Returns: + The estimated alpha0, gamma0 and x0. + """ + t0 = np.min(time) + x0 = np.mean(x_data[time == t0]) + idx = time != t0 + al0 = np.mean((x0 - x_data[idx]) / (time[idx] - t0)) + ga0 = -np.mean((np.log(x_data[idx]) - np.log(x0)) / (time[idx] - t0)) + ga0 = 1e-3 if not np.isfinite(ga0) else ga0 + x0, al0, ga0 = max(1e-3, x0), max(1e-3, al0), max(1e-3, ga0) + return al0, ga0, x0 diff --git a/dynamo/estimation/tsc/twostep.py b/dynamo/estimation/tsc/twostep.py index 7f76a31cc..137c9764c 100644 --- a/dynamo/estimation/tsc/twostep.py +++ b/dynamo/estimation/tsc/twostep.py @@ -51,6 +51,43 @@ def fit_slope_stochastic( return k, 0, all_r2, all_logLL +def lin_reg_gamma_synthesis( + R: Union[csc_matrix, csr_matrix, np.ndarray], + N: Union[csc_matrix, csr_matrix, np.ndarray], + time: Union[List, csc_matrix, csr_matrix, np.ndarray], + perc_right: int = 100, +) -> Tuple: + """Under the steady-state assumption, alpha / gamma equals the total RNA. Gamma can be calculated from the slope of + total RNA and new RNA. The relationship can be expressed as: + l(t) = (1 - exp(- gamma * t)) alpha / gamma + + Args: + R: A matrix representing total RNA. Can be expression or the first moments. + N: A matrix representing new RNA. Can be expression or the first moments. + time: A matrix with time information. + perc_right: The percentile limitation to find extreme data points. + + Returns: + Gamma, R squared, the slope k, the mean of R squared and the fitted k by time and gamma. + """ + n_var = R.shape[0] + mean_R2, gamma, r2 = np.zeros(n_var), np.zeros(n_var), np.zeros(n_var) + K_list, K_fit_list = [None] * n_var, [None] * n_var + for i, r, n in tqdm( + zip(np.arange(n_var), R, N), + "Estimate gamma via linear regression of t vs. -ln(1-K)", + ): + r = r.A.flatten() if issparse(r) else r.flatten() + n = n.A.flatten() if issparse(n) else n.flatten() + + K_list[i], R2 = fit_labeling_synthesis(n, r, time, perc_right=perc_right) + gamma[i], r2[i] = compute_gamma_synthesis(K_list[i], np.unique(time)) + K_fit_list[i] = np.unique(time) * gamma[i] + mean_R2[i] = np.mean(R2) + + return gamma, r2, K_list, mean_R2, K_fit_list + + def fit_labeling_synthesis( new: Union[csc_matrix, csr_matrix, np.ndarray], total: Union[csc_matrix, csr_matrix, np.ndarray], @@ -120,40 +157,3 @@ def compute_velocity_synthesis( k = 1 - np.exp(-np.einsum("i,j->ij", t, gamma)) V = elem_prod(gamma, N) / k - elem_prod(gamma, R) return V - - -def lin_reg_gamma_synthesis( - R: Union[csc_matrix, csr_matrix, np.ndarray], - N: Union[csc_matrix, csr_matrix, np.ndarray], - time: Union[List, csc_matrix, csr_matrix, np.ndarray], - perc_right: int = 100, -) -> Tuple: - """Under the steady-state assumption, alpha / gamma equals the total RNA. Gamma can be calculated from the slope of - total RNA and new RNA. The relationship can be expressed as: - l(t) = (1 - exp(- gamma * t)) alpha / gamma - - Args: - R: A matrix representing total RNA. Can be expression or the first moments. - N: A matrix representing new RNA. Can be expression or the first moments. - time: A matrix with time information. - perc_right: The percentile limitation to find extreme data points. - - Returns: - Gamma, R squared, the slope k, the mean of R squared and the fitted k by time and gamma. - """ - n_var = R.shape[0] - mean_R2, gamma, r2 = np.zeros(n_var), np.zeros(n_var), np.zeros(n_var) - K_list, K_fit_list = [None] * n_var, [None] * n_var - for i, r, n in tqdm( - zip(np.arange(n_var), R, N), - "Estimate gamma via linear regression of t vs. -ln(1-K)", - ): - r = r.A.flatten() if issparse(r) else r.flatten() - n = n.A.flatten() if issparse(n) else n.flatten() - - K_list[i], R2 = fit_labeling_synthesis(n, r, time, perc_right=perc_right) - gamma[i], r2[i] = compute_gamma_synthesis(K_list[i], np.unique(time)) - K_fit_list[i] = np.unique(time) * gamma[i] - mean_R2[i] = np.mean(R2) - - return gamma, r2, K_list, mean_R2, K_fit_list diff --git a/dynamo/estimation/tsc/utils_moments.py b/dynamo/estimation/tsc/utils_moments.py index fd66ee1d7..00ced8c36 100755 --- a/dynamo/estimation/tsc/utils_moments.py +++ b/dynamo/estimation/tsc/utils_moments.py @@ -7,12 +7,9 @@ """ # from numba import jitclass # import the decorator -from numba import float32 # import the types from numpy import * from scipy.integrate import odeint -from scipy.optimize import least_squares -from ...tools.sampling import lhsclassic spec = [ ("a", float32), @@ -220,113 +217,3 @@ def solve(self, t, x0=None): self.x = x self.t = t return x - - -class estimation: - def __init__(self, ranges, x0=None): - self.ranges = ranges - self.n_params = len(ranges) - self.simulator = moments() - if not x0 is None: - self.simulator.x0 = x0 - - def sample_p0(self, samples=1, method="lhs"): - ret = zeros((samples, self.n_params)) - if method == "lhs": - ret = self._lhsclassic(samples) - for i in range(self.n_params): - ret[:, i] = ret[:, i] * (self.ranges[i][1] - self.ranges[i][0]) + self.ranges[i][0] - else: - for n in range(samples): - for i in range(self.n_params): - r = random.rand() - ret[n, i] = r * (self.ranges[i][1] - self.ranges[i][0]) + self.ranges[i][0] - return ret - - def _lhsclassic(self, samples): - # From PyDOE - # Generate the intervals - H = lhsclassic(samples, self.n_params) - - return H - - def get_bound(self, index): - ret = zeros(self.n_params) - for i in range(self.n_params): - ret[i] = self.ranges[i][index] - return ret - - def normalize_data(self, X): - # ret = zeros(X.shape) - # for i in range(len(X)): - # x = X[i] - # #ret[i] = x / max(x) - # ret[i] = log10(x + 1) - res = log(X + 1) - return res - - def f_lsq( - self, - params, - t, - x_data_norm, - method="analytical", - normalize=True, - experiment_type=None, - ): - self.simulator.set_params(*params) - if method == "numerical": - self.simulator.integrate(t, self.simulator.x0) - elif method == "analytical": - self.simulator.solve(t, self.simulator.x0) - if experiment_type is None: - ret = self.simulator.get_all_central_moments() - elif experiment_type == "nosplice": - ret = self.simulator.get_nosplice_central_moments() - ret = self.normalize_data(ret).flatten() if normalize else ret.flatten() - ret[isnan(ret)] = 0 - return ret - x_data_norm - - def fit_lsq( - self, - t, - x_data, - p0=None, - n_p0=1, - bounds=None, - sample_method="lhs", - method="analytical", - normalize=True, - experiment_type=None, - ): - if p0 is None: - p0 = self.sample_p0(n_p0, sample_method) - else: - if p0.ndim == 1: - p0 = [p0] - n_p0 = len(p0) - - x_data_norm = self.normalize_data(x_data) if normalize else x_data - - if bounds is None: - bounds = (self.get_bound(0), self.get_bound(1)) - - costs = zeros(n_p0) - X = [] - for i in range(n_p0): - ret = least_squares( - lambda p: self.f_lsq( - p, - t, - x_data_norm.flatten(), - method, - normalize=normalize, - experiment_type=experiment_type, - ), - p0[i], - bounds=bounds, - ) - costs[i] = ret.cost - X.append(ret.x) - i_min = argmin(costs) - return X[i_min], costs[i_min] diff --git a/dynamo/tools/deprecated.py b/dynamo/tools/deprecated.py index 850bfe003..e9aea3501 100644 --- a/dynamo/tools/deprecated.py +++ b/dynamo/tools/deprecated.py @@ -1721,7 +1721,7 @@ def param_array2dict(self, parr): } def fit_gene(self, gene_no, n_p0=10): - from ..estimation.tsc.utils_moments import estimation + from ..estimation.deprecated import estimation estm = estimation(list(self.param_ranges.values())) if self.data_u is None: diff --git a/dynamo/tools/dynamics.py b/dynamo/tools/dynamics.py index 0471c2d53..c4c13611a 100755 --- a/dynamo/tools/dynamics.py +++ b/dynamo/tools/dynamics.py @@ -28,7 +28,7 @@ from ..estimation.csc.velocity import Velocity, fit_linreg, ss_estimation from ..estimation.tsc.estimation_kinetic import * from ..estimation.tsc.twostep import fit_slope_stochastic, lin_reg_gamma_synthesis -from ..estimation.tsc.utils_kinetic import * +from ..estimation.tsc.ODEs import * from .moments import ( moments, prepare_data_deterministic,