From e758876f4393805f45f8edd3d45571e1de4cb842 Mon Sep 17 00:00:00 2001 From: Dimitris Tsapetis Date: Fri, 25 Nov 2022 16:26:19 -0500 Subject: [PATCH] Ports ParallelTemperingMCMC.py to v4 --- src/UQpy/sampling/mcmc/MetropolisHastings.py | 29 ------ src/UQpy/sampling/mcmc/baseclass/MCMC.py | 20 +++- .../tempering_mcmc/ParallelTemperingMCMC.py | 94 +++++++++---------- .../sampling/tempering_mcmc/TemperingMCMC.py | 2 +- tests/unit_tests/sampling/test_tempering.py | 40 ++++---- 5 files changed, 89 insertions(+), 96 deletions(-) diff --git a/src/UQpy/sampling/mcmc/MetropolisHastings.py b/src/UQpy/sampling/mcmc/MetropolisHastings.py index 070420fe..c8f9a4c1 100644 --- a/src/UQpy/sampling/mcmc/MetropolisHastings.py +++ b/src/UQpy/sampling/mcmc/MetropolisHastings.py @@ -159,32 +159,3 @@ def run_one_iteration(self, current_state: np.ndarray, current_log_pdf: np.ndarr self._update_acceptance_rate(accept_vec) return current_state, current_log_pdf - - def __copy__(self, **kwargs): - pdf_target = self.pdf_target if kwargs['pdf_target'] is None else kwargs['pdf_target'] - log_pdf_target = self.log_pdf_target if kwargs['log_pdf_target'] is None else kwargs['log_pdf_target'] - args_target = self.args_target if kwargs['args_target'] is None else kwargs['args_target'] - burn_length = self.burn_length if kwargs['burn_length'] is None else kwargs['burn_length'] - jump = self.jump if kwargs['jump'] is None else kwargs['jump'] - dimension = self.dimension if kwargs['dimension'] is None else kwargs['dimension'] - seed = self.seed if kwargs['seed'] is None else kwargs['seed'] - save_log_pdf = self.save_log_pdf if kwargs['save_log_pdf'] is None else kwargs['save_log_pdf'] - concatenate_chains = self.concatenate_chains if kwargs['concatenate_chains'] is None\ - else kwargs['concatenate_chains'] - n_chains = self.n_chains if kwargs['n_chains'] is None else kwargs['n_chains'] - proposal = self.proposal if kwargs['proposal'] is None else kwargs['proposal'] - proposal_is_symmetric = self.proposal_is_symmetric if kwargs['proposal_is_symmetric'] is None \ - else kwargs['proposal_is_symmetric'] - random_state = self.random_state if kwargs['random_state'] is None else kwargs['random_state'] - nsamples = self.nsamples if kwargs['nsamples'] is None else kwargs['nsamples'] - nsamples_per_chain = self.nsamples_per_chain if kwargs['nsamples_per_chain'] is None \ - else kwargs['nsamples_per_chain'] - - new = self.__class__(pdf_target=pdf_target, log_pdf_target=log_pdf_target, args_target=args_target, - burn_length=burn_length, jump=jump, dimension=dimension, seed=seed, - save_log_pdf=save_log_pdf, concatenate_chains=concatenate_chains, - proposal=proposal, proposal_is_symmetric=proposal_is_symmetric, n_chains=n_chains, - random_state=random_state, nsamples=nsamples, nsamples_per_chain=nsamples_per_chain) - - return new - diff --git a/src/UQpy/sampling/mcmc/baseclass/MCMC.py b/src/UQpy/sampling/mcmc/baseclass/MCMC.py index f8df3328..2a493683 100644 --- a/src/UQpy/sampling/mcmc/baseclass/MCMC.py +++ b/src/UQpy/sampling/mcmc/baseclass/MCMC.py @@ -310,6 +310,22 @@ def _check_methods_proposal(proposal_distribution): proposal_distribution.log_pdf = lambda x: np.log( np.maximum(proposal_distribution.pdf(x), 10 ** (-320) * np.ones((x.shape[0],)))) - @abstractmethod def __copy__(self, **kwargs): - pass + keys = kwargs.keys() + attributes = self.__dict__ + import inspect + initializer_parameters = inspect.signature(self.__class__.__init__).parameters.keys() + + for key in attributes.keys(): + if key not in initializer_parameters: + continue + new_value = attributes[key] if key not in keys else kwargs[key] + if new_value is not None: + kwargs[key] = new_value + + if 'seed' in kwargs.keys(): + kwargs['seed'] = list(kwargs['seed']) + if 'nsamples_per_chain' in kwargs.keys() and kwargs['nsamples_per_chain'] == 0: + del kwargs['nsamples_per_chain'] + + return self.__class__(**kwargs) diff --git a/src/UQpy/sampling/tempering_mcmc/ParallelTemperingMCMC.py b/src/UQpy/sampling/tempering_mcmc/ParallelTemperingMCMC.py index d051fdea..c63da088 100644 --- a/src/UQpy/sampling/tempering_mcmc/ParallelTemperingMCMC.py +++ b/src/UQpy/sampling/tempering_mcmc/ParallelTemperingMCMC.py @@ -34,11 +34,13 @@ class ParallelTemperingMCMC(TemperingMCMC): """ - def __init__(self, niter_between_sweeps, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(), + def __init__(self, n_iterations_between_sweeps: PositiveInteger, + pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(), distribution_reference=None, save_log_pdf=False, nsamples=None, nsamples_per_chain=None, random_state=None, - temper_param_list=None, n_temper_params=None, + tempering_parameters=None, + n_tempering_parameters=None, sampler: Union[MCMC, list[MCMC]] = None): super().__init__(pdf_intermediate=pdf_intermediate, log_pdf_intermediate=log_pdf_intermediate, @@ -51,59 +53,53 @@ def __init__(self, niter_between_sweeps, pdf_intermediate=None, log_pdf_intermed self.evaluate_log_reference = self._preprocess_reference(self.distribution_reference) # Initialize PT specific inputs: niter_between_sweeps and temperatures - self.niter_between_sweeps = niter_between_sweeps - if not (isinstance(self.niter_between_sweeps, int) and self.niter_between_sweeps >= 1): - raise ValueError('UQpy: input niter_between_sweeps should be a strictly positive integer.') - self.temper_param_list = temper_param_list - self.n_temper_params = n_temper_params - if self.temper_param_list is None: - if self.n_temper_params is None: + self.n_iterations_between_sweeps = n_iterations_between_sweeps + self.tempering_parameters = tempering_parameters + self.n_tempering_parameters = n_tempering_parameters + if self.tempering_parameters is None: + if self.n_tempering_parameters is None: raise ValueError('UQpy: either input temper_param_list or n_temper_params should be provided.') - elif not (isinstance(self.n_temper_params, int) and self.n_temper_params >= 2): + elif not (isinstance(self.n_tempering_parameters, int) and self.n_tempering_parameters >= 2): raise ValueError('UQpy: input n_temper_params should be a integer >= 2.') else: - self.temper_param_list = [1. / np.sqrt(2) ** i for i in range(self.n_temper_params - 1, -1, -1)] - elif (not isinstance(self.temper_param_list, (list, tuple)) - or not (all(isinstance(t, (int, float)) and (0 < t <= 1.) for t in self.temper_param_list)) + self.tempering_parameters = [1. / np.sqrt(2) ** i for i in + range(self.n_tempering_parameters - 1, -1, -1)] + elif (not isinstance(self.tempering_parameters, (list, tuple)) + or not (all(isinstance(t, (int, float)) and (0 < t <= 1.) for t in self.tempering_parameters)) # or float(self.temperatures[0]) != 1. ): raise ValueError( 'UQpy: temper_param_list should be a list of floats in [0, 1], starting at 0. and increasing to 1.') else: - self.n_temper_params = len(self.temper_param_list) - - # Initialize mcmc objects, need as many as number of temperatures - # if not all((isinstance(val, (list, tuple)) and len(val) == self.n_temper_params) - # for val in kwargs_mcmc.values()): - # raise ValueError( - # 'UQpy: additional kwargs arguments should be mcmc algorithm specific inputs, given as lists of length ' - # 'the number of temperatures.') + self.n_tempering_parameters = len(self.tempering_parameters) + # default value kwargs_mcmc = {} - if isinstance(self.sampler, MetropolisHastings) and not kwargs_mcmc: + if isinstance(self.sampler, MetropolisHastings) and self.sampler.proposal is None: from UQpy.distributions import JointIndependent, Normal - kwargs_mcmc = {'proposal_is_symmetric': [True, ] * self.n_temper_params, + kwargs_mcmc = {'proposal_is_symmetric': [True, ] * self.n_tempering_parameters, 'proposal': [JointIndependent([Normal(scale=1. / np.sqrt(temper_param))] * self.sampler.dimension) - for temper_param in self.temper_param_list]} + for temper_param in self.tempering_parameters]} # Initialize algorithm specific inputs: target pdfs self.thermodynamic_integration_results = None self.mcmc_samplers = [] - for i, temper_param in enumerate(self.temper_param_list): + for i, temper_param in enumerate(self.tempering_parameters): log_pdf_target = (lambda x, temper_param=temper_param: self.evaluate_log_reference( x) + self.evaluate_log_intermediate(x, temper_param)) self.mcmc_samplers.append(sampler.__copy__(log_pdf_target=log_pdf_target, concatenate_chains=True, - **kwargs_mcmc)) + save_log_pdf=save_log_pdf, random_state=self.random_state, + **dict([(key, val[i]) for key, val in kwargs_mcmc.items()]))) self.logger.info('\nUQpy: Initialization of ' + self.__class__.__name__ + ' algorithm complete.') # If nsamples is provided, run the algorithm if (nsamples is not None) or (nsamples_per_chain is not None): - self._run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain) + self.run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain) - def _run(self, nsamples=None, nsamples_per_chain=None): + def run(self, nsamples=None, nsamples_per_chain=None): """ Run the MCMC algorithm. @@ -122,35 +118,39 @@ def _run(self, nsamples=None, nsamples_per_chain=None): of `nchains`, `nsamples` is set to the next largest integer that is a multiple of `nchains`. """ - # Initialize the runs: allocate space for the new samples and log pdf values - final_ns, final_ns_per_chain, current_state_t, current_log_pdf_t = self.mcmc_samplers[0]._initialize_samples( - nsamples=nsamples, nsamples_per_chain=nsamples_per_chain) - current_state, current_log_pdf = [current_state_t.copy(), ], [current_log_pdf_t.copy(), ] - for mcmc_sampler in self.mcmc_samplers[1:]: - _, _, current_state_t, current_log_pdf_t = mcmc_sampler._initialize_samples( + current_state, current_log_pdf = [], [] + final_ns_per_chain = 0 + for i, mcmc_sampler in enumerate(self.mcmc_samplers): + if mcmc_sampler.evaluate_log_target is None and mcmc_sampler.evaluate_log_target_marginals is None: + (mcmc_sampler.evaluate_log_target, mcmc_sampler.evaluate_log_target_marginals,) = \ + mcmc_sampler._preprocess_target(pdf_=mcmc_sampler.pdf_target, + log_pdf_=mcmc_sampler.log_pdf_target, + args=mcmc_sampler.args_target) + ns, ns_per_chain, current_state_t, current_log_pdf_t = mcmc_sampler._initialize_samples( nsamples=nsamples, nsamples_per_chain=nsamples_per_chain) current_state.append(current_state_t.copy()) current_log_pdf.append(current_log_pdf_t.copy()) + if i == 0: + final_ns_per_chain = ns_per_chain self.logger.info('UQpy: Running MCMC...') # Run nsims iterations of the MCMC algorithm, starting at current_state while self.mcmc_samplers[0].nsamples_per_chain < final_ns_per_chain: # update the total number of iterations - # self.mcmc_samplers[0].niterations += 1 # run one iteration of MCMC algorithms at various temperatures new_state, new_log_pdf = [], [] for t, sampler in enumerate(self.mcmc_samplers): - sampler.niterations += 1 + sampler.iterations_number += 1 new_state_t, new_log_pdf_t = sampler.run_one_iteration( current_state[t], current_log_pdf[t]) new_state.append(new_state_t.copy()) new_log_pdf.append(new_log_pdf_t.copy()) # Do sweeps if necessary - if self.mcmc_samplers[-1].niterations % self.niter_between_sweeps == 0: - for i in range(self.n_temper_params - 1): + if self.mcmc_samplers[-1].iterations_number % self.n_iterations_between_sweeps == 0: + for i in range(self.n_tempering_parameters - 1): log_accept = (self.mcmc_samplers[i].evaluate_log_target(new_state[i + 1]) + self.mcmc_samplers[i + 1].evaluate_log_target(new_state[i]) - self.mcmc_samplers[i].evaluate_log_target(new_state[i]) - @@ -162,22 +162,20 @@ def _run(self, nsamples=None, nsamples_per_chain=None): # Update the chain, only if burn-in is over and the sample is not being jumped over # also increase the current number of samples and samples_per_chain - if self.mcmc_samplers[-1].niterations > self.mcmc_samplers[-1].nburn and \ - (self.mcmc_samplers[-1].niterations - self.mcmc_samplers[-1].nburn) % self.mcmc_samplers[ - -1].jump == 0: + if self.mcmc_samplers[-1].iterations_number > self.mcmc_samplers[-1].burn_length and \ + (self.mcmc_samplers[-1].iterations_number - + self.mcmc_samplers[-1].burn_length) % self.mcmc_samplers[-1].jump == 0: for t, sampler in enumerate(self.mcmc_samplers): sampler.samples[sampler.nsamples_per_chain, :, :] = new_state[t].copy() if self.save_log_pdf: sampler.log_pdf_values[sampler.nsamples_per_chain, :] = new_log_pdf[t].copy() sampler.nsamples_per_chain += 1 - sampler.nsamples += sampler.nchains - # self.nsamples_per_chain += 1 - # self.nsamples += self.nchains + sampler.samples_counter += sampler.n_chains self.logger.info('UQpy: MCMC run successfully !') # Concatenate chains maybe - if self.mcmc_samplers[-1].concat_chains: + if self.mcmc_samplers[-1].concatenate_chains: for t, mcmc_sampler in enumerate(self.mcmc_samplers): mcmc_sampler._concatenate_chains() @@ -217,20 +215,20 @@ def evaluate_normalization_constant(self, compute_potential, log_Z0=None, nsampl raise ValueError('UQpy: input log_Z0 or nsamples_from_p0 should be provided.') # compute average of log_target for the target at various temperatures log_pdf_averages = [] - for i, (temper_param, sampler) in enumerate(zip(self.temper_param_list, self.mcmc_samplers)): + for i, (temper_param, sampler) in enumerate(zip(self.tempering_parameters, self.mcmc_samplers)): log_factor_values = sampler.log_pdf_values - self.evaluate_log_reference(sampler.samples) potential_values = compute_potential( x=sampler.samples, temper_param=temper_param, log_intermediate_values=log_factor_values) log_pdf_averages.append(np.mean(potential_values)) # use quadrature to integrate between 0 and 1 - temper_param_list_for_integration = np.copy(np.array(self.temper_param_list)) + temper_param_list_for_integration = np.copy(np.array(self.tempering_parameters)) log_pdf_averages = np.array(log_pdf_averages) int_value = trapz(x=temper_param_list_for_integration, y=log_pdf_averages) if log_Z0 is None: samples_p0 = self.distribution_reference.rvs(nsamples=nsamples_from_p0) log_Z0 = np.log(1. / nsamples_from_p0) + logsumexp( - self.evaluate_log_intermediate(x=samples_p0, temper_param=self.temper_param_list[0])) + self.evaluate_log_intermediate(x=samples_p0, temper_param=self.tempering_parameters[0])) self.thermodynamic_integration_results = { 'log_Z0': log_Z0, 'temper_param_list': temper_param_list_for_integration, diff --git a/src/UQpy/sampling/tempering_mcmc/TemperingMCMC.py b/src/UQpy/sampling/tempering_mcmc/TemperingMCMC.py index 571f7954..2d7b1008 100644 --- a/src/UQpy/sampling/tempering_mcmc/TemperingMCMC.py +++ b/src/UQpy/sampling/tempering_mcmc/TemperingMCMC.py @@ -37,7 +37,7 @@ def __init__(self, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_in if self.save_log_pdf: self.log_pdf_values = None - def _run(self, nsamples): + def run(self, nsamples): """ Run the tempering MCMC algorithms to generate nsamples from the target posterior """ pass diff --git a/tests/unit_tests/sampling/test_tempering.py b/tests/unit_tests/sampling/test_tempering.py index c0c7106f..c97d0ef3 100644 --- a/tests/unit_tests/sampling/test_tempering.py +++ b/tests/unit_tests/sampling/test_tempering.py @@ -1,9 +1,7 @@ import numpy as np from scipy.stats import multivariate_normal - from UQpy.distributions import Uniform, JointIndependent -from UQpy.sampling.tempering_mcmc import ParallelTemperingMCMC, SequentialTemperingMCMC -from UQpy.sampling.mcmc import MetropolisHastings +from UQpy.sampling import MetropolisHastings, ParallelTemperingMCMC, SequentialTemperingMCMC def log_rosenbrock(x): @@ -19,30 +17,41 @@ def log_prior(x): return Uniform(loc=loc, scale=scale).log_pdf(x[:, 0]) + Uniform(loc=loc, scale=scale).log_pdf(x[:, 1]) -def compute_potential(x, beta, log_intermediate_values): - return log_intermediate_values / beta +def compute_potential(x, temper_param, log_intermediate_values): + return log_intermediate_values / temper_param random_state = np.random.RandomState(1234) seed = -2. + 4. * random_state.rand(5, 2) betas = [1. / np.sqrt(2.) ** i for i in range(10 - 1, -1, -1)] +prior_distribution = JointIndependent(marginals=[Uniform(loc=-2, scale=4), Uniform(loc=-2, scale=4)]) + def test_parallel(): - mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate, log_pdf_reference=log_prior, - niter_between_sweeps=4, betas=betas, save_log_pdf=True, - mcmc_class=MH, nburn=10, jump=2, seed=seed, dimension=2, random_state=3456) + sampler = MetropolisHastings(burn_length=10, jump=2, seed=list(seed), dimension=2) + mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate, + distribution_reference=prior_distribution, + n_iterations_between_sweeps=4, + tempering_parameters=betas, + random_state=3456, + save_log_pdf=False, sampler=sampler) mcmc.run(nsamples_per_chain=100) assert mcmc.samples.shape == (500, 2) def test_thermodynamic_integration(): - mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate, log_pdf_reference=log_prior, - niter_between_sweeps=4, betas=betas, save_log_pdf=True, - mcmc_class=MH, nburn=10, jump=2, seed=seed, dimension=2, random_state=3456) + sampler = MetropolisHastings(burn_length=10, jump=2, seed=list(seed), dimension=2) + mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate, + distribution_reference=prior_distribution, + n_iterations_between_sweeps=4, + tempering_parameters=betas, + save_log_pdf=True, + random_state=3456, + sampler=sampler) mcmc.run(nsamples_per_chain=100) - log_ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, log_p0=0.) - assert np.round(np.exp(log_ev), 4) == 0.1885 + log_ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, log_Z0=0.) + assert np.round(log_ev, 4) == 0.203 def likelihood(x, b): @@ -55,8 +64,8 @@ def likelihood(x, b): # Posterior is a mixture of two gaussians like = np.exp(np.logaddexp(np.log(w1) + multivariate_normal.logpdf(x=x, mean=mu1, cov=sigma1), - np.log(1.-w1) + multivariate_normal.logpdf(x=x, mean=mu2, cov=sigma2))) - return like**b + np.log(1. - w1) + multivariate_normal.logpdf(x=x, mean=mu2, cov=sigma2))) + return like ** b def test_sequential(): @@ -65,4 +74,3 @@ def test_sequential(): distribution_reference=prior, nchains=20, save_intermediate_samples=True, percentage_resampling=10, mcmc_class=MH, verbose=False, random_state=960242069) assert np.round(test.evidence, 4) == 0.0656 -