Skip to content

Commit

Permalink
Merge pull request #662 from Sichao25/reorganize
Browse files Browse the repository at this point in the history
Reorganize estimation module
  • Loading branch information
Xiaojieqiu authored Feb 26, 2024
2 parents 165e7df + d801001 commit b4f5288
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 246 deletions.
119 changes: 119 additions & 0 deletions dynamo/estimation/deprecated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import warnings
import functools

from numba import float32 # import the types
from numpy import *
from scipy.optimize import least_squares

from .tsc.utils_moments import moments
from ..tools.sampling import lhsclassic


class estimation:
def __init__(self, ranges, x0=None):
self.ranges = ranges
self.n_params = len(ranges)
self.simulator = moments()
if not x0 is None:
self.simulator.x0 = x0

def sample_p0(self, samples=1, method="lhs"):
ret = zeros((samples, self.n_params))
if method == "lhs":
ret = self._lhsclassic(samples)
for i in range(self.n_params):
ret[:, i] = ret[:, i] * (self.ranges[i][1] - self.ranges[i][0]) + self.ranges[i][0]
else:
for n in range(samples):
for i in range(self.n_params):
r = random.rand()
ret[n, i] = r * (self.ranges[i][1] - self.ranges[i][0]) + self.ranges[i][0]
return ret

def _lhsclassic(self, samples):
# From PyDOE
# Generate the intervals
H = lhsclassic(samples, self.n_params)

return H

def get_bound(self, index):
ret = zeros(self.n_params)
for i in range(self.n_params):
ret[i] = self.ranges[i][index]
return ret

def normalize_data(self, X):
# ret = zeros(X.shape)
# for i in range(len(X)):
# x = X[i]
# #ret[i] = x / max(x)
# ret[i] = log10(x + 1)
res = log(X + 1)
return res

def f_lsq(
self,
params,
t,
x_data_norm,
method="analytical",
normalize=True,
experiment_type=None,
):
self.simulator.set_params(*params)
if method == "numerical":
self.simulator.integrate(t, self.simulator.x0)
elif method == "analytical":
self.simulator.solve(t, self.simulator.x0)
if experiment_type is None:
ret = self.simulator.get_all_central_moments()
elif experiment_type == "nosplice":
ret = self.simulator.get_nosplice_central_moments()
ret = self.normalize_data(ret).flatten() if normalize else ret.flatten()
ret[isnan(ret)] = 0
return ret - x_data_norm

def fit_lsq(
self,
t,
x_data,
p0=None,
n_p0=1,
bounds=None,
sample_method="lhs",
method="analytical",
normalize=True,
experiment_type=None,
):
if p0 is None:
p0 = self.sample_p0(n_p0, sample_method)
else:
if p0.ndim == 1:
p0 = [p0]
n_p0 = len(p0)

x_data_norm = self.normalize_data(x_data) if normalize else x_data

if bounds is None:
bounds = (self.get_bound(0), self.get_bound(1))

costs = zeros(n_p0)
X = []
for i in range(n_p0):
ret = least_squares(
lambda p: self.f_lsq(
p,
t,
x_data_norm.flatten(),
method,
normalize=normalize,
experiment_type=experiment_type,
),
p0[i],
bounds=bounds,
)
costs[i] = ret.cost
X.append(ret.x)
i_min = argmin(costs)
return X[i_min], costs[i_min]
58 changes: 29 additions & 29 deletions dynamo/estimation/fit_jacobian.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,35 +98,6 @@ def hill_act_grad(
return A * n * Kd * x ** (n - 1) / (Kd + x**n) ** 2 - g


def calc_mean_squared_deviation(
func: Callable,
x_data: np.ndarray,
y_mean: np.ndarray,
y_sigm: np.ndarray,
weighted=True,
) -> float:
"""Calculate the mean squared deviation of the fit.
Args:
func: The function to evaluate.
x_data: An array of the x data.
y_mean: An array of the mean of y data.
y_sigm: An array of the standard deviation of the y data.
weighted: Whether to use weighted mean squared deviation.
Returns:
The mean squared deviation.
"""
err = func(x_data) - y_mean
if weighted:
sig = np.array(y_sigm, copy=True)
if np.any(sig == 0):
main_warning("Some standard deviations are 0; Set to 1 instead.")
sig[sig == 0] = 1
err /= sig
return np.sqrt(err.dot(err))


def fit_hill_grad(
x_data: np.ndarray,
y_mean: np.ndarray,
Expand Down Expand Up @@ -329,3 +300,32 @@ def fit_hill_act_grad(
pass

return {"A": p_opt_min[0], "K": p_opt_min[1], "n": np.exp(p_opt_min[2]), "g": p_opt_min[3]}, msd_min


def calc_mean_squared_deviation(
func: Callable,
x_data: np.ndarray,
y_mean: np.ndarray,
y_sigm: np.ndarray,
weighted=True,
) -> float:
"""Calculate the mean squared deviation of the fit.
Args:
func: The function to evaluate.
x_data: An array of the x data.
y_mean: An array of the mean of y data.
y_sigm: An array of the standard deviation of the y data.
weighted: Whether to use weighted mean squared deviation.
Returns:
The mean squared deviation.
"""
err = func(x_data) - y_mean
if weighted:
sig = np.array(y_sigm, copy=True)
if np.any(sig == 0):
main_warning("Some standard deviations are 0; Set to 1 instead.")
sig[sig == 0] = 1
err /= sig
return np.sqrt(err.dot(err))
File renamed without changes.
2 changes: 1 addition & 1 deletion dynamo/estimation/tsc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Mixture_KinDeg_NoSwitching,
kinetic_estimation,
)
from .utils_kinetic import (
from .ODEs import (
Deterministic,
Deterministic_NoSplicing,
LinearODE,
Expand Down
128 changes: 64 additions & 64 deletions dynamo/estimation/tsc/estimation_kinetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,7 @@
from ...dynamo_logger import main_warning
from ...tools.moments import strat_mom
from ...tools.sampling import lhsclassic
from .utils_kinetic import *


def guestimate_alpha(x_data: np.ndarray, time: np.ndarray) -> np.ndarray:
"""Roughly estimate alpha for kinetics data, which is simply the averaged ratio of new RNA and labeling time.
Args:
x_data: A matrix representing RNA data.
time: A matrix of labeling time.
Returns:
The estimated alpha.
"""
imax = np.argmax(x_data)
alpha = x_data[imax] / time[imax]
return alpha


def guestimate_gamma(x_data: np.ndarray, time: np.ndarray) -> np.ndarray:
"""Roughly estimate gamma0 with the assumption that time starts at 0.
Args:
x_data: A matrix representing RNA data.
time: A matrix of labeling time.
Returns:
The estimated gamma.
"""
ga0 = np.clip(np.log(max(x_data[0], 0) / (x_data[-1] + 1e-6)) / time[-1], 1e-3, 1e3)
return ga0


def guestimate_init_cond(x_data: np.ndarray) -> np.ndarray:
"""Roughly estimate x0 for degradation data.
Args:
x_data: A matrix representing RNA data.
Returns:
The estimated x0.
"""
x0 = np.clip(np.max(x_data, 1), 1e-4, np.inf)
return x0


def guestimate_p0_kinetic_chase(x_data: np.ndarray, time: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Roughly estimated alpha, gamma and initial conditions for the kinetic chase experiment. Initail conditions are
the average abundance of labeled RNAs across all cells belonging to the initial labeling time point.
Args:
x_data: A matrix representing RNA data.
time: A matrix of labeling time.
Returns:
The estimated alpha0, gamma0 and x0.
"""
t0 = np.min(time)
x0 = np.mean(x_data[time == t0])
idx = time != t0
al0 = np.mean((x0 - x_data[idx]) / (time[idx] - t0))
ga0 = -np.mean((np.log(x_data[idx]) - np.log(x0)) / (time[idx] - t0))
ga0 = 1e-3 if not np.isfinite(ga0) else ga0
x0, al0, ga0 = max(1e-3, x0), max(1e-3, al0), max(1e-3, ga0)
return al0, ga0, x0
from .ODEs import *


class kinetic_estimation:
Expand Down Expand Up @@ -1707,3 +1644,66 @@ def calc_mean_squared_deviation(self, weighted: bool = True) -> float:
if weighted:
err /= sig
return np.sqrt(err.dot(err))


def guestimate_alpha(x_data: np.ndarray, time: np.ndarray) -> np.ndarray:
"""Roughly estimate alpha for kinetics data, which is simply the averaged ratio of new RNA and labeling time.
Args:
x_data: A matrix representing RNA data.
time: A matrix of labeling time.
Returns:
The estimated alpha.
"""
imax = np.argmax(x_data)
alpha = x_data[imax] / time[imax]
return alpha


def guestimate_gamma(x_data: np.ndarray, time: np.ndarray) -> np.ndarray:
"""Roughly estimate gamma0 with the assumption that time starts at 0.
Args:
x_data: A matrix representing RNA data.
time: A matrix of labeling time.
Returns:
The estimated gamma.
"""
ga0 = np.clip(np.log(max(x_data[0], 0) / (x_data[-1] + 1e-6)) / time[-1], 1e-3, 1e3)
return ga0


def guestimate_init_cond(x_data: np.ndarray) -> np.ndarray:
"""Roughly estimate x0 for degradation data.
Args:
x_data: A matrix representing RNA data.
Returns:
The estimated x0.
"""
x0 = np.clip(np.max(x_data, 1), 1e-4, np.inf)
return x0


def guestimate_p0_kinetic_chase(x_data: np.ndarray, time: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Roughly estimated alpha, gamma and initial conditions for the kinetic chase experiment. Initail conditions are
the average abundance of labeled RNAs across all cells belonging to the initial labeling time point.
Args:
x_data: A matrix representing RNA data.
time: A matrix of labeling time.
Returns:
The estimated alpha0, gamma0 and x0.
"""
t0 = np.min(time)
x0 = np.mean(x_data[time == t0])
idx = time != t0
al0 = np.mean((x0 - x_data[idx]) / (time[idx] - t0))
ga0 = -np.mean((np.log(x_data[idx]) - np.log(x0)) / (time[idx] - t0))
ga0 = 1e-3 if not np.isfinite(ga0) else ga0
x0, al0, ga0 = max(1e-3, x0), max(1e-3, al0), max(1e-3, ga0)
return al0, ga0, x0
Loading

0 comments on commit b4f5288

Please sign in to comment.