From b87cf474ab6f2ca054ee8bd23e9d89d24a777c85 Mon Sep 17 00:00:00 2001 From: Dimitris Tsapetis Date: Wed, 23 Nov 2022 15:10:15 -0500 Subject: [PATCH] Starts TMCMC porting --- src/UQpy/sampling/mcmc/MetropolisHastings.py | 69 +++++-- src/UQpy/sampling/mcmc/baseclass/MCMC.py | 25 +-- .../tempering_mcmc/ParallelTemperingMCMC.py | 63 +++--- .../tempering_mcmc/SequentialTemperingMCMC.py | 193 ++++++++++-------- .../sampling/tempering_mcmc/TemperingMCMC.py | 3 +- tests/unit_tests/sampling/test_tempering.py | 68 ++++++ 6 files changed, 257 insertions(+), 164 deletions(-) create mode 100644 tests/unit_tests/sampling/test_tempering.py diff --git a/src/UQpy/sampling/mcmc/MetropolisHastings.py b/src/UQpy/sampling/mcmc/MetropolisHastings.py index a09a9380e..070420fea 100644 --- a/src/UQpy/sampling/mcmc/MetropolisHastings.py +++ b/src/UQpy/sampling/mcmc/MetropolisHastings.py @@ -13,22 +13,22 @@ class MetropolisHastings(MCMC): @beartype def __init__( - self, - pdf_target: Union[Callable, list[Callable]] = None, - log_pdf_target: Union[Callable, list[Callable]] = None, - args_target: tuple = None, - burn_length: Annotated[int, Is[lambda x: x >= 0]] = 0, - jump: int = 1, - dimension: int = None, - seed: list = None, - save_log_pdf: bool = False, - concatenate_chains: bool = True, - n_chains: int = None, - proposal: Distribution = None, - proposal_is_symmetric: bool = False, - random_state: RandomStateType = None, - nsamples: PositiveInteger = None, - nsamples_per_chain: PositiveInteger = None, + self, + pdf_target: Union[Callable, list[Callable]] = None, + log_pdf_target: Union[Callable, list[Callable]] = None, + args_target: tuple = None, + burn_length: Annotated[int, Is[lambda x: x >= 0]] = 0, + jump: int = 1, + dimension: int = None, + seed: list = None, + save_log_pdf: bool = False, + concatenate_chains: bool = True, + n_chains: int = None, + proposal: Distribution = None, + proposal_is_symmetric: bool = False, + random_state: RandomStateType = None, + nsamples: PositiveInteger = None, + nsamples_per_chain: PositiveInteger = None, ): """ Metropolis-Hastings algorithm :cite:`MCMC1` :cite:`MCMC2` @@ -115,7 +115,7 @@ def __init__( self.logger.info("\nUQpy: Initialization of " + self.__class__.__name__ + " algorithm complete.") if (nsamples is not None) or (nsamples_per_chain is not None): - self.run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain,) + self.run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain, ) def run_one_iteration(self, current_state: np.ndarray, current_log_pdf: np.ndarray): """ @@ -144,11 +144,11 @@ def run_one_iteration(self, current_state: np.ndarray, current_log_pdf: np.ndarr ) # this vector will be used to compute accept_ratio of each chain unif_rvs = ( Uniform() - .rvs(nsamples=self.n_chains, random_state=self.random_state) - .reshape((-1,)) + .rvs(nsamples=self.n_chains, random_state=self.random_state) + .reshape((-1,)) ) for nc, (cand, log_p_cand, r_) in enumerate( - zip(candidate, log_p_candidate, log_ratios) + zip(candidate, log_p_candidate, log_ratios) ): accept = np.log(unif_rvs[nc]) < r_ if accept: @@ -159,3 +159,32 @@ def run_one_iteration(self, current_state: np.ndarray, current_log_pdf: np.ndarr self._update_acceptance_rate(accept_vec) return current_state, current_log_pdf + + def __copy__(self, **kwargs): + pdf_target = self.pdf_target if kwargs['pdf_target'] is None else kwargs['pdf_target'] + log_pdf_target = self.log_pdf_target if kwargs['log_pdf_target'] is None else kwargs['log_pdf_target'] + args_target = self.args_target if kwargs['args_target'] is None else kwargs['args_target'] + burn_length = self.burn_length if kwargs['burn_length'] is None else kwargs['burn_length'] + jump = self.jump if kwargs['jump'] is None else kwargs['jump'] + dimension = self.dimension if kwargs['dimension'] is None else kwargs['dimension'] + seed = self.seed if kwargs['seed'] is None else kwargs['seed'] + save_log_pdf = self.save_log_pdf if kwargs['save_log_pdf'] is None else kwargs['save_log_pdf'] + concatenate_chains = self.concatenate_chains if kwargs['concatenate_chains'] is None\ + else kwargs['concatenate_chains'] + n_chains = self.n_chains if kwargs['n_chains'] is None else kwargs['n_chains'] + proposal = self.proposal if kwargs['proposal'] is None else kwargs['proposal'] + proposal_is_symmetric = self.proposal_is_symmetric if kwargs['proposal_is_symmetric'] is None \ + else kwargs['proposal_is_symmetric'] + random_state = self.random_state if kwargs['random_state'] is None else kwargs['random_state'] + nsamples = self.nsamples if kwargs['nsamples'] is None else kwargs['nsamples'] + nsamples_per_chain = self.nsamples_per_chain if kwargs['nsamples_per_chain'] is None \ + else kwargs['nsamples_per_chain'] + + new = self.__class__(pdf_target=pdf_target, log_pdf_target=log_pdf_target, args_target=args_target, + burn_length=burn_length, jump=jump, dimension=dimension, seed=seed, + save_log_pdf=save_log_pdf, concatenate_chains=concatenate_chains, + proposal=proposal, proposal_is_symmetric=proposal_is_symmetric, n_chains=n_chains, + random_state=random_state, nsamples=nsamples, nsamples_per_chain=nsamples_per_chain) + + return new + diff --git a/src/UQpy/sampling/mcmc/baseclass/MCMC.py b/src/UQpy/sampling/mcmc/baseclass/MCMC.py index a863c9d7b..f8df33285 100644 --- a/src/UQpy/sampling/mcmc/baseclass/MCMC.py +++ b/src/UQpy/sampling/mcmc/baseclass/MCMC.py @@ -8,7 +8,7 @@ from UQpy.distributions import Distribution from UQpy.utilities.ValidationTypes import * from UQpy.utilities.Utilities import process_random_state -from abc import ABC +from abc import ABC, abstractmethod class MCMC(ABC): @@ -177,29 +177,22 @@ def _concatenate_chains(self): return None def _unconcatenate_chains(self): - self.samples = self.samples.reshape( - (-1, self.n_chains, self.dimension), order="C" - ) + self.samples = self.samples.reshape((-1, self.n_chains, self.dimension), order="C") if self.save_log_pdf: - self.log_pdf_values = self.log_pdf_values.reshape( - (-1, self.n_chains), order="C" - ) + self.log_pdf_values = self.log_pdf_values.reshape((-1, self.n_chains), order="C") return None def _initialize_samples(self, nsamples, nsamples_per_chain): if ((nsamples is not None) and (nsamples_per_chain is not None)) \ or (nsamples is None and nsamples_per_chain is None): raise ValueError("UQpy: Either nsamples or nsamples_per_chain must be provided (not both)") - if nsamples_per_chain is not None: - if not (isinstance(nsamples_per_chain, int) and nsamples_per_chain >= 0): - raise TypeError("UQpy: nsamples_per_chain must be an integer >= 0.") - nsamples = int(nsamples_per_chain * self.n_chains) - else: + if nsamples_per_chain is None: if not (isinstance(nsamples, int) and nsamples >= 0): raise TypeError("UQpy: nsamples must be an integer >= 0.") nsamples_per_chain = int(np.ceil(nsamples / self.n_chains)) - nsamples = int(nsamples_per_chain * self.n_chains) - + elif not (isinstance(nsamples_per_chain, int) and nsamples_per_chain >= 0): + raise TypeError("UQpy: nsamples_per_chain must be an integer >= 0.") + nsamples = int(nsamples_per_chain * self.n_chains) if self.samples is None: # very first call of run, set current_state as the seed and initialize self.samples self.samples = np.zeros((nsamples_per_chain, self.n_chains, self.dimension)) if self.save_log_pdf: @@ -316,3 +309,7 @@ def _check_methods_proposal(proposal_distribution): raise AttributeError("UQpy: The proposal should have a log_pdf or pdf method") proposal_distribution.log_pdf = lambda x: np.log( np.maximum(proposal_distribution.pdf(x), 10 ** (-320) * np.ones((x.shape[0],)))) + + @abstractmethod + def __copy__(self, **kwargs): + pass diff --git a/src/UQpy/sampling/tempering_mcmc/ParallelTemperingMCMC.py b/src/UQpy/sampling/tempering_mcmc/ParallelTemperingMCMC.py index 291dcbe1b..d051fdea4 100644 --- a/src/UQpy/sampling/tempering_mcmc/ParallelTemperingMCMC.py +++ b/src/UQpy/sampling/tempering_mcmc/ParallelTemperingMCMC.py @@ -10,7 +10,7 @@ class ParallelTemperingMCMC(TemperingMCMC): """ Parallel-Tempering MCMC - This algorithms runs the chains sampling from various tempered distributions in parallel. Periodically during the + This algorithm runs the chains sampling from various tempered distributions in parallel. Periodically during the run, the different temperatures swap members of their ensemble in a way that preserves detailed balance.The chains closer to the reference chain (hot chains) can sample from regions that have low probability under the target and thus allow a better exploration of the parameter space, while the cold chains @@ -35,13 +35,18 @@ class ParallelTemperingMCMC(TemperingMCMC): """ def __init__(self, niter_between_sweeps, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(), - distribution_reference=None, nburn=0, jump=1, dimension=None, seed=None, - save_log_pdf=False, nsamples=None, nsamples_per_chain=None, nchains=None, verbose=False, - random_state=None, temper_param_list=None, n_temper_params=None, mcmc_class=MetropolisHastings, **kwargs_mcmc): + distribution_reference=None, + save_log_pdf=False, nsamples=None, nsamples_per_chain=None, + random_state=None, + temper_param_list=None, n_temper_params=None, + sampler: Union[MCMC, list[MCMC]] = None): super().__init__(pdf_intermediate=pdf_intermediate, log_pdf_intermediate=log_pdf_intermediate, - args_pdf_intermediate=args_pdf_intermediate, distribution_reference=None, dimension=dimension, + args_pdf_intermediate=args_pdf_intermediate, distribution_reference=None, save_log_pdf=save_log_pdf, random_state=random_state) + self.logger = logging.getLogger(__name__) + self.sampler = sampler + self.distribution_reference = distribution_reference self.evaluate_log_reference = self._preprocess_reference(self.distribution_reference) @@ -68,18 +73,18 @@ def __init__(self, niter_between_sweeps, pdf_intermediate=None, log_pdf_intermed self.n_temper_params = len(self.temper_param_list) # Initialize mcmc objects, need as many as number of temperatures - if not issubclass(mcmc_class, MCMC): - raise ValueError('UQpy: mcmc_class should be a subclass of MCMC.') - if not all((isinstance(val, (list, tuple)) and len(val) == self.n_temper_params) - for val in kwargs_mcmc.values()): - raise ValueError( - 'UQpy: additional kwargs arguments should be mcmc algorithm specific inputs, given as lists of length ' - 'the number of temperatures.') + # if not all((isinstance(val, (list, tuple)) and len(val) == self.n_temper_params) + # for val in kwargs_mcmc.values()): + # raise ValueError( + # 'UQpy: additional kwargs arguments should be mcmc algorithm specific inputs, given as lists of length ' + # 'the number of temperatures.') # default value - if isinstance(mcmc_class, MetropolisHastings) and len(kwargs_mcmc) == 0: + kwargs_mcmc = {} + if isinstance(self.sampler, MetropolisHastings) and not kwargs_mcmc: from UQpy.distributions import JointIndependent, Normal kwargs_mcmc = {'proposal_is_symmetric': [True, ] * self.n_temper_params, - 'proposal': [JointIndependent([Normal(scale=1. / np.sqrt(temper_param))] * dimension) + 'proposal': [JointIndependent([Normal(scale=1. / np.sqrt(temper_param))] * + self.sampler.dimension) for temper_param in self.temper_param_list]} # Initialize algorithm specific inputs: target pdfs @@ -87,23 +92,12 @@ def __init__(self, niter_between_sweeps, pdf_intermediate=None, log_pdf_intermed self.mcmc_samplers = [] for i, temper_param in enumerate(self.temper_param_list): - # log_pdf_target = self._target_generator( - # self.evaluate_log_intermediate, self.evaluate_log_reference, temper_param) log_pdf_target = (lambda x, temper_param=temper_param: self.evaluate_log_reference( x) + self.evaluate_log_intermediate(x, temper_param)) - self.mcmc_samplers.append( - mcmc_class(log_pdf_target=log_pdf_target, - dimension=dimension, seed=seed, nburn=nburn, jump=jump, save_log_pdf=save_log_pdf, - concat_chains=True, verbose=verbose, random_state=self.random_state, nchains=nchains, - **dict([(key, val[i]) for key, val in kwargs_mcmc.items()]))) - - # Samples connect to posterior samples, i.e. the chain with temperature 1. - # self.samples = self.mcmc_samplers[0].samples - # if self.save_log_pdf: - # self.log_pdf_values = self.mcmc_samplers[0].samples + self.mcmc_samplers.append(sampler.__copy__(log_pdf_target=log_pdf_target, concatenate_chains=True, + **kwargs_mcmc)) - if self.verbose: - print('\nUQpy: Initialization of ' + self.__class__.__name__ + ' algorithm complete.') + self.logger.info('\nUQpy: Initialization of ' + self.__class__.__name__ + ' algorithm complete.') # If nsamples is provided, run the algorithm if (nsamples is not None) or (nsamples_per_chain is not None): @@ -138,8 +132,7 @@ def _run(self, nsamples=None, nsamples_per_chain=None): current_state.append(current_state_t.copy()) current_log_pdf.append(current_log_pdf_t.copy()) - if self.verbose: - print('UQpy: Running MCMC...') + self.logger.info('UQpy: Running MCMC...') # Run nsims iterations of the MCMC algorithm, starting at current_state while self.mcmc_samplers[0].nsamples_per_chain < final_ns_per_chain: @@ -181,8 +174,7 @@ def _run(self, nsamples=None, nsamples_per_chain=None): # self.nsamples_per_chain += 1 # self.nsamples += self.nchains - if self.verbose: - print('UQpy: MCMC run successfully !') + self.logger.info('UQpy: MCMC run successfully !') # Concatenate chains maybe if self.mcmc_samplers[-1].concat_chains: @@ -234,13 +226,6 @@ def evaluate_normalization_constant(self, compute_potential, log_Z0=None, nsampl # use quadrature to integrate between 0 and 1 temper_param_list_for_integration = np.copy(np.array(self.temper_param_list)) log_pdf_averages = np.array(log_pdf_averages) - # if self.temper_param_list[-1] != 1.: - # log_pdf_averages = np.append(log_pdf_averages, log_pdf_averages[-1]) - # slope_linear = (log_pdf_averages[-1]-log_pdf_averages[-2]) / ( - # betas_for_integration[-1] - betas_for_integration[-2]) - # log_pdf_averages = np.append( - # log_pdf_averages, log_pdf_averages[-1] + (1. - betas_for_integration[-1]) * slope_linear) - # betas_for_integration = np.append(betas_for_integration, 1.) int_value = trapz(x=temper_param_list_for_integration, y=log_pdf_averages) if log_Z0 is None: samples_p0 = self.distribution_reference.rvs(nsamples=nsamples_from_p0) diff --git a/src/UQpy/sampling/tempering_mcmc/SequentialTemperingMCMC.py b/src/UQpy/sampling/tempering_mcmc/SequentialTemperingMCMC.py index f1ea34af9..f9e98f525 100644 --- a/src/UQpy/sampling/tempering_mcmc/SequentialTemperingMCMC.py +++ b/src/UQpy/sampling/tempering_mcmc/SequentialTemperingMCMC.py @@ -12,7 +12,7 @@ class SequentialTemperingMCMC(TemperingMCMC): """ Sequential-Tempering MCMC - This algorithms samples from a series of intermediate targets that are each tempered versions of the final/true + This algorithm samples from a series of intermediate targets that are each tempered versions of the final/true target. In going from one intermediate distribution to the next, the existing samples are resampled according to some weights (similar to importance sampling). To ensure that there aren't a large number of duplicates, the resampling step is followed by a short (or even single-step) MCMC run that disperses the samples while remaining @@ -36,50 +36,62 @@ class SequentialTemperingMCMC(TemperingMCMC): """ @beartype + # def __init__(self, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(), + # distribution_reference=None, + # mcmc_class: MCMC = None, + # dimension=None, seed=None, + # nsamples: PositiveInteger = None, + # recalc_w=False, + # nburn_resample=0, save_intermediate_samples=False, nchains=1, + # percentage_resampling=100, random_state=None, + # proposal_is_symmetric=True): def __init__(self, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(), distribution_reference=None, - mcmc_class: MCMC = None, - dimension=None, seed=None, + sampler: MCMC = None, + seed=None, nsamples: PositiveInteger = None, - recalc_w=False, - nburn_resample=0, save_intermediate_samples=False, nchains=1, - percentage_resampling=100, random_state=None, - proposal_is_symmetric=True): - + recalculate_weights=False, + save_intermediate_samples=False, + percentage_resampling=100, + random_state=None, + resampling_burn_length=0, + resampling_proposal=None, + resampling_proposal_is_symmetric=True): + self.proposal = resampling_proposal + self.proposal_is_symmetric = resampling_proposal_is_symmetric + self.resampling_burn_length = resampling_burn_length self.logger = logging.getLogger(__name__) super().__init__(pdf_intermediate=pdf_intermediate, log_pdf_intermediate=log_pdf_intermediate, args_pdf_intermediate=args_pdf_intermediate, distribution_reference=distribution_reference, - dimension=dimension, random_state=random_state) + random_state=random_state) + self.logger = logging.getLogger(__name__) + self.sampler = sampler # Initialize inputs self.save_intermediate_samples = save_intermediate_samples - self.recalc_w = recalc_w - self.nburn_resample = nburn_resample - self.nchains = nchains - self.resample_frac = percentage_resampling / 100 - self.proposal_is_symmetric=proposal_is_symmetric + self.recalculate_weights = recalculate_weights + + self.resample_fraction = percentage_resampling / 100 - self.nspc = int(np.floor(((1 - self.resample_frac) * nsamples) / self.nchains)) - self.nresample = int(nsamples - (self.nspc * self.nchains)) - self.mcmc_class:MCMC = mcmc_class + self.__dimension = sampler.dimension + self.__n_chains = sampler.n_chains + + self.n_samples_per_chain = int(np.floor(((1 - self.resample_fraction) * nsamples) / self.__n_chains)) + self.n_resamples = int(nsamples - (self.n_samples_per_chain * self.__n_chains)) # Initialize input distributions self.evaluate_log_reference, self.seed = self._preprocess_reference(dist_=distribution_reference, seed_=seed, nsamples=nsamples, - dimension=self.dimension) + dimension=self.__dimension) # Initialize flag that indicates whether default proposal is to be used (default proposal defined adaptively # during run) - if self.proposal is None: - self.proposal_given_flag = False - else: - self.proposal_given_flag = True - + self.proposal_given_flag = self.proposal is not None # Initialize attributes - self.temper_param_list = None + self.tempering_parameters = None self.evidence = None - self.evidence_cov = None + self.evidence_CoV = None # Call the run function if nsamples is not None: @@ -95,109 +107,114 @@ def _run(self, nsamples: PositiveInteger = None): if self.samples is not None: raise RuntimeError('UQpy: run method cannot be called multiple times for the same object') - pts = self.seed # Generated Samples from prior for zero-th tempering level + points = self.seed # Generated Samples from prior for zero-th tempering level # Initializing other variables - temper_param = 0.0 # Intermediate exponent - temper_param_prev = temper_param - self.temper_param_list = np.array(temper_param) + current_tempering_parameter = 0.0 # Intermediate exponent + previous_tempering_parameter = current_tempering_parameter + self.tempering_parameters = np.array(current_tempering_parameter) pts_index = np.arange(nsamples) # Array storing sample indices - w = np.zeros(nsamples) # Array storing plausibility weights - wp = np.zeros(nsamples) # Array storing plausibility weight probabilities - exp_q0 = 0 - for i in range(nsamples): - exp_q0 += np.exp(self.evaluate_log_intermediate(pts[i, :].reshape((1, -1)), 0.0)) - S = exp_q0 / nsamples + weights = np.zeros(nsamples) # Array storing plausibility weights + weight_probabilities = np.zeros(nsamples) # Array storing plausibility weight probabilities + expected_q0 = sum( + np.exp(self.evaluate_log_intermediate(points[i, :].reshape((1, -1)), 0.0)) + for i in range(nsamples))/nsamples + + evidence_estimator = expected_q0 if self.save_intermediate_samples is True: self.intermediate_samples = [] - self.intermediate_samples += [pts.copy()] + self.intermediate_samples += [points.copy()] + # Calculate covariance matrix for the default proposal + cov_scale = 0.2 # Looping over all adaptively decided tempering levels - while temper_param < 1: + while current_tempering_parameter < 1: # Adaptively set the tempering exponent for the current level - temper_param_prev = temper_param - temper_param = self._find_temper_param(temper_param_prev, pts, self.evaluate_log_intermediate, nsamples) + previous_tempering_parameter = current_tempering_parameter + current_tempering_parameter = self._find_temper_param(previous_tempering_parameter, points, + self.evaluate_log_intermediate, nsamples) # d_exp = temper_param - temper_param_prev - self.temper_param_list = np.append(self.temper_param_list, temper_param) + self.tempering_parameters = np.append(self.tempering_parameters, current_tempering_parameter) self.logger.info('beta selected') # Calculate the plausibility weights for i in range(nsamples): - w[i] = np.exp(self.evaluate_log_intermediate(pts[i, :].reshape((1, -1)), temper_param) - - self.evaluate_log_intermediate(pts[i, :].reshape((1, -1)), temper_param_prev)) + weights[i] = np.exp(self.evaluate_log_intermediate(points[i, :].reshape((1, -1)), + current_tempering_parameter) + - self.evaluate_log_intermediate(points[i, :].reshape((1, -1)), + previous_tempering_parameter)) # Calculate normalizing constant for the plausibility weights (sum of the weights) - w_sum = np.sum(w) + w_sum = np.sum(weights) # Calculate evidence from each tempering level - S = S * (w_sum / nsamples) + evidence_estimator = evidence_estimator * (w_sum / nsamples) # Normalize plausibility weight probabilities - wp = (w / w_sum) + weight_probabilities = (weights / w_sum) - # Calculate covariance matrix for the default proposal - cov_scale = 0.2 - w_th_sum = np.zeros(self.dimension) + w_theta_sum = np.zeros(self.__dimension) for i in range(nsamples): - for j in range(self.dimension): - w_th_sum[j] += w[i] * pts[i, j] - sig_mat = np.zeros((self.dimension, self.dimension)) + for j in range(self.__dimension): + w_theta_sum[j] += weights[i] * points[i, j] + sigma_matrix = np.zeros((self.__dimension, self.__dimension)) for i in range(nsamples): - pts_deviation = np.zeros((self.dimension, 1)) - for j in range(self.dimension): - pts_deviation[j, 0] = pts[i, j] - (w_th_sum[j] / w_sum) - sig_mat += (w[i] / w_sum) * np.dot(pts_deviation, - pts_deviation.T) # Normalized by w_sum as per Betz et al - sig_mat = cov_scale * cov_scale * sig_mat + points_deviation = np.zeros((self.__dimension, 1)) + for j in range(self.__dimension): + points_deviation[j, 0] = points[i, j] - (w_theta_sum[j] / w_sum) + sigma_matrix += (weights[i] / w_sum) * np.dot(points_deviation, + points_deviation.T) # Normalized by w_sum as per Betz et al + sigma_matrix = cov_scale ** 2 * sigma_matrix mcmc_log_pdf_target = self._target_generator(self.evaluate_log_intermediate, - self.evaluate_log_reference, temper_param) + self.evaluate_log_reference, current_tempering_parameter) self.logger.info('Begin Resampling') # Resampling and MH-MCMC step - for i in range(self.nresample): + for i in range(self.n_resamples): # Resampling from previous tempering level - lead_index = int(np.random.choice(pts_index, p=wp)) - lead = pts[lead_index] + lead_index = int(np.random.choice(pts_index, p=weight_probabilities)) + lead = points[lead_index] # Defining the default proposal if self.proposal_given_flag is False: - self.proposal = MultivariateNormal(lead, cov=sig_mat) + self.proposal = MultivariateNormal(lead, cov=sigma_matrix) # Single MH-MCMC step - x = MetropolisHastings(dimension=self.dimension, log_pdf_target=mcmc_log_pdf_target, seed=lead, - nsamples=1, nchains=1, nburn=self.nburn_resample, proposal=self.proposal, + x = MetropolisHastings(dimension=self.__dimension, log_pdf_target=mcmc_log_pdf_target, seed=lead, + nsamples=1, nchains=1, nburn=self.resampling_burn_length, proposal=self.proposal, proposal_is_symmetric=self.proposal_is_symmetric) # Setting the generated sample in the array - pts[i] = x.samples + points[i] = x.samples - if self.recalc_w: - w[i] = np.exp(self.evaluate_log_intermediate(pts[i, :].reshape((1, -1)), temper_param) - - self.evaluate_log_intermediate(pts[i, :].reshape((1, -1)), temper_param_prev)) - wp[i] = w[i] / w_sum + if self.recalculate_weights: + weights[i] = np.exp( + self.evaluate_log_intermediate(points[i, :].reshape((1, -1)), current_tempering_parameter) + - self.evaluate_log_intermediate(points[i, :].reshape((1, -1)), previous_tempering_parameter)) + weight_probabilities[i] = weights[i] / w_sum self.logger.info('Begin MCMC') - mcmc_seed = self._mcmc_seed_generator(resampled_pts=pts[0:self.nresample, :], arr_length=self.nresample, - seed_length=self.nchains) + mcmc_seed = self._mcmc_seed_generator(resampled_pts=points[0:self.n_resamples, :], + arr_length=self.n_resamples, + seed_length=self.__n_chains) - y=copy.deepcopy(self.mcmc_class) + y = copy.deepcopy(self.sampler) self.update_target_and_seed(y, mcmc_seed, mcmc_log_pdf_target) - # y = self.mcmc_class(log_pdf_target=mcmc_log_pdf_target, seed=mcmc_seed, dimension=self.dimension, - # nchains=self.nchains, nsamples_per_chain=self.nspc, nburn=self.nburn_mcmc, - # jump=self.jump_mcmc, concat_chains=True) - pts[self.nresample:, :] = y.samples + y = self.sampler.__copy__(log_pdf_target=mcmc_log_pdf_target, seed=mcmc_seed, + nsamples_per_chain=self.n_samples_per_chain, concat_chains=True) + points[self.n_resamples:, :] = y.samples if self.save_intermediate_samples is True: - self.intermediate_samples += [pts.copy()] + self.intermediate_samples += [points.copy()] self.logger.info('Tempering level ended') # Setting the calculated values to the attributes - self.samples = pts - self.evidence = S + self.samples = points + self.evidence = evidence_estimator def update_target_and_seed(self, mcmc_class, mcmc_seed, mcmc_log_pdf_target): mcmc_class.seed = mcmc_seed @@ -243,7 +260,7 @@ def _find_temper_param(temper_param_prev, samples, q_func, n, iter_lim=1000, ite loop_counter += 1 q_scaled = np.zeros(n) temper_param_trial = ((bot + top) / 2) - for i2 in range(0, n): + for i2 in range(n): q_scaled[i2] = np.exp(q_func(samples[i2, :].reshape((1, -1)), 1) - q_func(samples[i2, :].reshape((1, -1)), temper_param_prev)) sigma_1 = np.std(q_scaled) @@ -252,7 +269,7 @@ def _find_temper_param(temper_param_prev, samples, q_func, n, iter_lim=1000, ite flag = 1 temper_param_trial = 1 continue - for i3 in range(0, n): + for i3 in range(n): q_scaled[i3] = np.exp(q_func(samples[i3, :].reshape((1, -1)), temper_param_trial) - q_func(samples[i3, :].reshape((1, -1)), temper_param_prev)) sigma = np.std(q_scaled) @@ -298,16 +315,14 @@ def _preprocess_reference(self, dist_, seed_=None, nsamples=None, dimension=None elif dist_ is not None: if not (isinstance(dist_, Distribution)): raise TypeError('UQpy: A UQpy.Distribution object must be provided.') - else: - evaluate_log_pdf = (lambda x: dist_.log_pdf(x)) - seed_values = dist_.rvs(nsamples=nsamples) + evaluate_log_pdf = (lambda x: dist_.log_pdf(x)) + seed_values = dist_.rvs(nsamples=nsamples) elif seed_ is not None: - if seed_.shape[0] == nsamples and seed_.shape[1] == dimension: - seed_values = seed_ - kernel = stats.gaussian_kde(seed_) - evaluate_log_pdf = (lambda x: kernel.logpdf(x)) - else: + if seed_.shape[0] != nsamples or seed_.shape[1] != dimension: raise TypeError('UQpy: the seed values should be a numpy array of size (nsamples, dimension)') + seed_values = seed_ + kernel = stats.gaussian_kde(seed_) + evaluate_log_pdf = (lambda x: kernel.logpdf(x)) else: raise ValueError('UQpy: either prior distribution or seed values must be provided') return evaluate_log_pdf, seed_values diff --git a/src/UQpy/sampling/tempering_mcmc/TemperingMCMC.py b/src/UQpy/sampling/tempering_mcmc/TemperingMCMC.py index 8781186a9..571f79546 100644 --- a/src/UQpy/sampling/tempering_mcmc/TemperingMCMC.py +++ b/src/UQpy/sampling/tempering_mcmc/TemperingMCMC.py @@ -18,10 +18,9 @@ class TemperingMCMC(ABC): """ def __init__(self, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(), - distribution_reference=None, dimension=None, save_log_pdf=True, random_state=None): + distribution_reference=None, save_log_pdf=True, random_state=None): self.logger = logging.getLogger(__name__) # Check a few inputs - self.dimension = dimension self.save_log_pdf = save_log_pdf self.random_state = process_random_state(random_state) diff --git a/tests/unit_tests/sampling/test_tempering.py b/tests/unit_tests/sampling/test_tempering.py new file mode 100644 index 000000000..c0c7106f4 --- /dev/null +++ b/tests/unit_tests/sampling/test_tempering.py @@ -0,0 +1,68 @@ +import numpy as np +from scipy.stats import multivariate_normal + +from UQpy.distributions import Uniform, JointIndependent +from UQpy.sampling.tempering_mcmc import ParallelTemperingMCMC, SequentialTemperingMCMC +from UQpy.sampling.mcmc import MetropolisHastings + + +def log_rosenbrock(x): + return -(100 * (x[:, 1] - x[:, 0] ** 2) ** 2 + (1 - x[:, 0]) ** 2) / 20 + + +def log_intermediate(x, beta): + return beta * log_rosenbrock(x) + + +def log_prior(x): + loc, scale = -20., 40. + return Uniform(loc=loc, scale=scale).log_pdf(x[:, 0]) + Uniform(loc=loc, scale=scale).log_pdf(x[:, 1]) + + +def compute_potential(x, beta, log_intermediate_values): + return log_intermediate_values / beta + + +random_state = np.random.RandomState(1234) +seed = -2. + 4. * random_state.rand(5, 2) +betas = [1. / np.sqrt(2.) ** i for i in range(10 - 1, -1, -1)] + + +def test_parallel(): + mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate, log_pdf_reference=log_prior, + niter_between_sweeps=4, betas=betas, save_log_pdf=True, + mcmc_class=MH, nburn=10, jump=2, seed=seed, dimension=2, random_state=3456) + mcmc.run(nsamples_per_chain=100) + assert mcmc.samples.shape == (500, 2) + + +def test_thermodynamic_integration(): + mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate, log_pdf_reference=log_prior, + niter_between_sweeps=4, betas=betas, save_log_pdf=True, + mcmc_class=MH, nburn=10, jump=2, seed=seed, dimension=2, random_state=3456) + mcmc.run(nsamples_per_chain=100) + log_ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, log_p0=0.) + assert np.round(np.exp(log_ev), 4) == 0.1885 + + +def likelihood(x, b): + mu1 = np.array([1., 1.]) + mu2 = -0.8 * np.ones(2) + w1 = 0.5 + # Width of 0.1 in each dimension + sigma1 = np.diag([0.02, 0.05]) + sigma2 = np.diag([0.05, 0.02]) + + # Posterior is a mixture of two gaussians + like = np.exp(np.logaddexp(np.log(w1) + multivariate_normal.logpdf(x=x, mean=mu1, cov=sigma1), + np.log(1.-w1) + multivariate_normal.logpdf(x=x, mean=mu2, cov=sigma2))) + return like**b + + +def test_sequential(): + prior = JointIndependent(marginals=[Uniform(loc=-2.0, scale=4.0), Uniform(loc=-2.0, scale=4.0)]) + test = SequentialTemperingMCMC(dimension=2, nsamples=100, pdf_intermediate=likelihood, + distribution_reference=prior, nchains=20, save_intermediate_samples=True, + percentage_resampling=10, mcmc_class=MH, verbose=False, random_state=960242069) + assert np.round(test.evidence, 4) == 0.0656 +