Source code for abcpy.inferences

import copy
import logging
import time
from abc import ABCMeta, abstractmethod, abstractproperty

import numpy as np
from scipy import optimize

from abcpy.acceptedparametersmanager import *
from abcpy.graphtools import GraphTools
from abcpy.jointapprox_lhd import SumCombination
from abcpy.jointdistances import LinearCombination
from abcpy.output import Journal
from abcpy.perturbationkernel import DefaultKernel
from abcpy.probabilisticmodels import *
from abcpy.utils import cached


class InferenceMethod(GraphTools, metaclass=ABCMeta):
    """
    This abstract base class represents an inference method.
    """

    def __getstate__(self):
        """Cloudpickle is used with the MPIBackend. This function ensures that the backend
        itself is not pickled."""
        state = self.__dict__.copy()
        del state['backend']
        return state
    @abstractmethod
    def sample(self):
        """To be overwritten by any sub-class: samples from the posterior distribution of the
        model parameters given the observed data observations.
        """
        raise NotImplementedError
    @abstractproperty
    def model(self):
        """To be overwritten by any sub-class: an attribute specifying the model to be used."""
        raise NotImplementedError

    @abstractproperty
    def rng(self):
        """To be overwritten by any sub-class: an attribute specifying the random number generator to be used."""
        raise NotImplementedError

    @abstractproperty
    def backend(self):
        """To be overwritten by any sub-class: an attribute specifying the backend to be used."""
        raise NotImplementedError

    @abstractproperty
    def n_samples(self):
        """To be overwritten by any sub-class: an attribute specifying the number of samples to be generated."""
        raise NotImplementedError

    @abstractproperty
    def n_samples_per_param(self):
        """To be overwritten by any sub-class: an attribute specifying the number of data points in each simulated data set."""
        raise NotImplementedError
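# A minimal standalone sketch (not abcpy code) of the __getstate__ pattern used in
# InferenceMethod above: the unpicklable 'backend' attribute is dropped from the pickled
# state so that cloudpickle can ship the object to worker processes. The class and
# attribute names below are hypothetical, for illustration only.
#
#     import pickle
#
#     class Sampler:
#         def __init__(self, backend):
#             self.backend = backend   # e.g. wraps MPI communicators; not picklable
#             self.n_samples = 10
#
#         def __getstate__(self):
#             state = self.__dict__.copy()
#             del state['backend']     # exclude the backend handle from serialization
#             return state
#
#     restored = pickle.loads(pickle.dumps(Sampler(backend=object())))
#     assert not hasattr(restored, 'backend') and restored.n_samples == 10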
class BaseMethodsWithKernel(metaclass=ABCMeta):
    """
    This abstract base class represents inference methods that have a kernel.
    """

    @abstractproperty
    def kernel(self):
        """To be overwritten by any sub-class: an attribute specifying the transition or perturbation kernel."""
        raise NotImplementedError
    def perturb(self, column_index, epochs=10, rng=np.random.RandomState()):
        """
        Perturbs all free parameters, given the current weights.
        Commonly used during inference.

        Parameters
        ----------
        column_index: integer
            The index of the column in the accepted_parameters_bds that should be used for perturbation
        epochs: integer
            The number of times perturbation should happen before the algorithm is terminated
        rng: random number generator
            The random number generator to be used.

        Returns
        -------
        boolean
            Whether it was possible to set new parameter values for all probabilistic models
        """
        current_epoch = 0

        while current_epoch < epochs:
            # Get new parameters of the graph
            new_parameters = self.kernel.update(self.accepted_parameters_manager, column_index, rng=rng)

            self._reset_flags()

            # Order the parameters provided by the kernel in depth-first search order
            correctly_ordered_parameters = self.get_correct_ordering(new_parameters)

            # Try to set new parameters
            accepted, last_index = self.set_parameters(correctly_ordered_parameters, 0)
            if accepted:
                break
            current_epoch += 1

        if current_epoch == epochs:
            return [False]

        return [True, correctly_ordered_parameters]
class BaseLikelihood(InferenceMethod, BaseMethodsWithKernel, metaclass=ABCMeta):
    """
    This abstract base class represents inference methods that use the likelihood.
    """

    @abstractproperty
    def likfun(self):
        """To be overwritten by any sub-class: an attribute specifying the likelihood function to be used."""
        raise NotImplementedError
class BaseDiscrepancy(InferenceMethod, BaseMethodsWithKernel, metaclass=ABCMeta):
    """
    This abstract base class represents inference methods using a discrepancy measure.
    """

    @abstractproperty
    def distance(self):
        """To be overwritten by any sub-class: an attribute specifying the distance function."""
        raise NotImplementedError
class RejectionABC(InferenceMethod):
    """This class implements the rejection algorithm based inference scheme [1] for
    Approximate Bayesian Computation.

    [1] Tavaré, S., Balding, D., Griffith, R., Donnelly, P.: Inferring coalescence times
    from DNA sequence data. Genetics 145(2), 505–518 (1997).

    Parameters
    ----------
    model: list
        A list of the Probabilistic models corresponding to the observed datasets
    distance: abcpy.distances.Distance
        Distance object defining the distance measure to compare simulated and observed data sets.
    backend: abcpy.backends.Backend
        Backend object defining the backend to be used.
    seed: integer, optional
        Optional initial seed for the random number generator. The default value is generated randomly.
    """

    # TODO: defining attributes as class attributes is not correct, move to init
    model = None
    distance = None
    rng = None
    n_samples = None
    n_samples_per_param = None
    epsilon = None
    backend = None
    def __init__(self, root_models, distances, backend, seed=None):
        self.model = root_models
        # We define the joint Linear combination distance using all the distances for each individual model
        self.distance = LinearCombination(root_models, distances)
        self.backend = backend
        self.rng = np.random.RandomState(seed)
        self.logger = logging.getLogger(__name__)

        # An object managing the bds objects
        self.accepted_parameters_manager = AcceptedParametersManager(self.model)

        # counts the number of simulate calls
        self.simulation_counter = 0
    def sample(self, observations, n_samples, n_samples_per_param, epsilon, full_output=0):
        """
        Samples from the posterior distribution of the model parameter given the observed
        data observations.

        Parameters
        ----------
        observations: list
            A list, containing lists describing the observed data sets
        n_samples: integer
            Number of samples to generate
        n_samples_per_param: integer
            Number of data points in each simulated data set.
        epsilon: float
            Value of threshold
        full_output: integer, optional
            If full_output==1, intermediate results are included in output journal.
            The default value is 0, meaning the intermediate results are not saved.

        Returns
        -------
        abcpy.output.Journal
            a journal containing simulation results, metadata and optionally intermediate results.
        """
        self.accepted_parameters_manager.broadcast(self.backend, observations)
        self.n_samples = n_samples
        self.n_samples_per_param = n_samples_per_param
        self.epsilon = epsilon

        journal = Journal(full_output)
        journal.configuration["n_samples"] = self.n_samples
        journal.configuration["n_samples_per_param"] = self.n_samples_per_param
        journal.configuration["epsilon"] = self.epsilon

        accepted_parameters = None

        # main Rejection ABC algorithm
        seed_arr = self.rng.randint(1, n_samples * n_samples, size=n_samples, dtype=np.int32)
        rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
        rng_pds = self.backend.parallelize(rng_arr)

        accepted_parameters_distances_counter_pds = self.backend.map(self._sample_parameter, rng_pds)
        accepted_parameters_distances_counter = self.backend.collect(accepted_parameters_distances_counter_pds)
        accepted_parameters, distances, counter = [list(t) for t in zip(*accepted_parameters_distances_counter)]

        for count in counter:
            self.simulation_counter += count

        distances = np.array(distances)

        self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters)
        journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
        journal.add_weights(np.ones((n_samples, 1)))
        journal.add_ESS_estimate(np.ones((n_samples, 1)))
        journal.add_distances(copy.deepcopy(distances))
        names_and_parameters = self._get_names_and_parameters()
        journal.add_user_parameters(names_and_parameters)
        journal.number_of_simulations.append(self.simulation_counter)

        return journal
    def _sample_parameter(self, rng, npc=None):
        """
        Samples a single model parameter and simulates from it until the distance between the
        simulated outcome and the observation is smaller than epsilon.

        Parameters
        ----------
        rng: random number generator
            The random number generator to be used.

        Returns
        -------
        Tuple
            The first entry of the tuple is the accepted parameters. The second entry is the
            distance between the simulated data set and the observation, while the third one is
            the number of simulations needed to obtain the accepted parameter.
        """
        distance = self.distance.dist_max()

        if distance < self.epsilon and self.logger:
            self.logger.warning("initial epsilon {:e} is larger than dist_max {:e}"
                                .format(float(self.epsilon), distance))

        counter = 0

        while distance > self.epsilon:
            # Accept new parameter value if the distance is less than epsilon
            self.sample_from_prior(rng=rng)
            theta = self.get_parameters(self.model)
            y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
            counter += 1

            if y_sim is not None:
                distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(),
                                                  y_sim)
                self.logger.debug("distance after {:4d} simulations: {:e}".format(counter, distance))
            else:
                distance = self.distance.dist_max()
        self.logger.debug("Needed {:4d} simulations to reach distance {:e} < epsilon = {:e}"
                          .format(counter, distance, float(self.epsilon)))

        return theta, distance, counter
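# Usage sketch for RejectionABC (not part of the module). It follows the standard abcpy
# tutorial setup with a Gaussian toy model; the model, observation values and epsilon are
# illustrative only.
#
#     from abcpy.continuousmodels import Uniform, Normal
#     from abcpy.statistics import Identity
#     from abcpy.distances import Euclidean
#     from abcpy.backends import BackendDummy
#
#     mu = Uniform([[150], [200]], name='mu')
#     sigma = Uniform([[5], [25]], name='sigma')
#     height = Normal([mu, sigma], name='height')
#
#     distance = Euclidean(Identity(degree=2, cross=False))
#     sampler = RejectionABC([height], [distance], BackendDummy(), seed=1)
#
#     observation = [[171.4, 168.2, 172.5]]
#     journal = sampler.sample(observation, n_samples=100, n_samples_per_param=10, epsilon=20)
#     accepted = journal.get_accepted_parameters()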
class PMCABC(BaseDiscrepancy, InferenceMethod):
    """
    This class implements a modified version of the Population Monte Carlo based inference scheme
    for Approximate Bayesian computation of Beaumont et al. [1]. Here the threshold value at the
    `t`-th generation is adaptively chosen by taking the maximum between the epsilon_percentile-th
    value of discrepancies of the accepted parameters at the `t-1`-th generation and the threshold
    value provided for this generation by the user. If we take the value of epsilon_percentile to
    be zero, this method becomes the inference scheme described in [1], where the threshold values
    considered at each generation are the ones provided by the user.

    [1] M. A. Beaumont. Approximate Bayesian computation in evolution and ecology.
    Annual Review of Ecology, Evolution, and Systematics, 41(1):379–406, Nov. 2010.

    Parameters
    ----------
    model : list
        A list of the Probabilistic models corresponding to the observed datasets
    distance : abcpy.distances.Distance
        Distance object defining the distance measure to compare simulated and observed data sets.
    kernel : abcpy.distributions.Distribution
        Distribution object defining the perturbation kernel needed for the sampling.
    backend : abcpy.backends.Backend
        Backend object defining the backend to be used.
    seed : integer, optional
        Optional initial seed for the random number generator. The default value is generated randomly.
    """

    model = None
    distance = None
    kernel = None
    rng = None

    # default value, set so that testing works
    n_samples = 2
    n_samples_per_param = None

    backend = None
    def __init__(self, root_models, distances, backend, kernel=None, seed=None):
        self.model = root_models
        # We define the joint Linear combination distance using all the distances for each individual model
        self.distance = LinearCombination(root_models, distances)

        if kernel is None:
            mapping, garbage_index = self._get_mapping()
            models = []
            for mdl, mdl_index in mapping:
                models.append(mdl)
            kernel = DefaultKernel(models)

        self.kernel = kernel
        self.backend = backend
        self.rng = np.random.RandomState(seed)
        self.logger = logging.getLogger(__name__)

        self.accepted_parameters_manager = AcceptedParametersManager(self.model)

        self.simulation_counter = 0
    def sample(self, observations, steps, epsilon_init, n_samples=10000, n_samples_per_param=1,
               epsilon_percentile=10, covFactor=2, full_output=0, journal_file=None):
        """Samples from the posterior distribution of the model parameter given the observed
        data observations.

        Parameters
        ----------
        observations : list
            A list, containing lists describing the observed data sets
        steps : integer
            Number of iterations in the sequential algorithm ("generations")
        epsilon_init : numpy.ndarray
            An array of proposed values of epsilon to be used at each step. Can be supplied either
            as a single value, to be used as the threshold in step 1, or as a `steps`-dimensional
            array of values, to be used as the threshold in every step.
        n_samples : integer, optional
            Number of samples to generate. The default value is 10000.
        n_samples_per_param : integer, optional
            Number of data points in each simulated data set. The default value is 1.
        epsilon_percentile : float, optional
            A value between [0, 100]. The default value is 10.
        covFactor : float, optional
            scaling parameter of the covariance matrix. The default value is 2 as considered in [1].
        full_output: integer, optional
            If full_output==1, intermediate results are included in output journal.
            The default value is 0, meaning the intermediate results are not saved.
        journal_file: str, optional
            Filename of a journal file to read an already saved journal file, from which the first
            iteration will start. The default value is None.

        Returns
        -------
        abcpy.output.Journal
            A journal containing simulation results, metadata and optionally intermediate results.
        """
        self.accepted_parameters_manager.broadcast(self.backend, observations)
        self.n_samples = n_samples
        self.n_samples_per_param = n_samples_per_param

        if journal_file is None:
            journal = Journal(full_output)
            journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
            journal.configuration["type_dist_func"] = type(self.distance).__name__
            journal.configuration["n_samples"] = self.n_samples
            journal.configuration["n_samples_per_param"] = self.n_samples_per_param
            journal.configuration["steps"] = steps
            journal.configuration["epsilon_percentile"] = epsilon_percentile
        else:
            journal = Journal.fromFile(journal_file)

        accepted_parameters = None
        accepted_weights = None
        accepted_cov_mats = None

        # Define epsilon_arr
        if len(epsilon_init) == steps:
            epsilon_arr = epsilon_init
        elif len(epsilon_init) == 1:
            epsilon_arr = [None] * steps
            epsilon_arr[0] = epsilon_init
        else:
            raise ValueError("The length of epsilon_init can only be equal to 1 or steps.")

        # main PMCABC algorithm
        self.logger.info("Starting PMC iterations")
        for aStep in range(steps):
            self.logger.debug("iteration {} of PMC algorithm".format(aStep))
            if aStep == 0 and journal_file is not None:
                accepted_parameters = journal.get_accepted_parameters(-1)
                accepted_weights = journal.get_weights(-1)

                if hasattr(journal, "distances"):
                    # if restarting from a journal, use the previous distances to determine a new
                    # epsilon (if it is larger than the epsilon_arr[0] provided here)
                    self.logger.info("Calculating acceptance threshold from provided journal file")
                    epsilon_arr[0] = np.max([np.percentile(journal.distances[-1], epsilon_percentile),
                                             epsilon_arr[0]])

                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_parameters=accepted_parameters,
                                                                  accepted_weights=accepted_weights)

                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)

                # 3: calculate covariance
                self.logger.info("Calculating covariance matrix")
                new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
                # Since each entry of new_cov_mats is a numpy array, we can multiply like this
                # accepted_cov_mats = [covFactor * new_cov_mat for new_cov_mat in new_cov_mats]
                accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, new_cov_mats)

            seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_samples, dtype=np.uint32)
            rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
            rng_pds = self.backend.parallelize(rng_arr)

            # 0: update remotely required variables
            self.logger.info("Broadcasting parameters")
            self.epsilon = epsilon_arr[aStep]
            self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters,
                                                              accepted_weights, accepted_cov_mats)

            # 1: calculate resample parameters
            self.logger.info("Resampling parameters")
            params_and_dists_and_counter_pds = self.backend.map(self._resample_parameter, rng_pds)
            params_and_dists_and_counter = self.backend.collect(params_and_dists_and_counter_pds)
            new_parameters, distances, counter = [list(t) for t in zip(*params_and_dists_and_counter)]
            new_parameters = np.array(new_parameters)
            distances = np.array(distances)

            for count in counter:
                self.simulation_counter += count

            # Compute epsilon for next step
            self.logger.info("Calculating acceptance threshold")
            if aStep < steps - 1:
                if epsilon_arr[aStep + 1] is None:
                    epsilon_arr[aStep + 1] = np.percentile(distances, epsilon_percentile)
                else:
                    epsilon_arr[aStep + 1] = np.max(
                        [np.percentile(distances, epsilon_percentile), epsilon_arr[aStep + 1]])

            # 2: calculate weights for new parameters
            self.logger.info("Calculating weights")
            new_parameters_pds = self.backend.parallelize(new_parameters)
            new_weights_pds = self.backend.map(self._calculate_weight, new_parameters_pds)
            new_weights = np.array(self.backend.collect(new_weights_pds)).reshape(-1, 1)

            sum_of_weights = np.sum(new_weights)
            new_weights = new_weights / sum_of_weights

            # The calculation of cov_mats needs the new weights and new parameters
            self.accepted_parameters_manager.update_broadcast(self.backend,
                                                              accepted_parameters=new_parameters,
                                                              accepted_weights=new_weights)

            # The parameters relevant to each kernel have to be used to calculate n_sample times.
            # It is therefore more efficient to broadcast these parameters once, instead of
            # collecting them at each kernel in each step
            kernel_parameters = []
            for kernel in self.kernel.kernels:
                kernel_parameters.append(
                    self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
            self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                  kernel_parameters=kernel_parameters)

            # 3: calculate covariance
            self.logger.info("Calculating covariance matrix")
            new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
            # Since each entry of new_cov_mats is a numpy array, we can multiply like this
            # new_cov_mats = [covFactor * new_cov_mat for new_cov_mat in new_cov_mats]
            new_cov_mats = self._compute_accepted_cov_mats(covFactor, new_cov_mats)

            # 4: Update the newly computed values
            accepted_parameters = new_parameters
            accepted_weights = new_weights
            accepted_cov_mats = new_cov_mats

            self.logger.info("Save configuration to output journal")

            if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
                journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
                journal.add_distances(copy.deepcopy(distances))
                journal.add_weights(copy.deepcopy(accepted_weights))
                journal.add_ESS_estimate(accepted_weights)
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_parameters=accepted_parameters,
                                                                  accepted_weights=accepted_weights)
                names_and_parameters = self._get_names_and_parameters()
                journal.add_user_parameters(names_and_parameters)
                journal.number_of_simulations.append(self.simulation_counter)

        # Add epsilon_arr to the journal
        if journal_file is not None and "epsilon_arr" in journal.configuration.keys():
            journal.configuration["epsilon_arr"] += epsilon_arr
        else:
            journal.configuration["epsilon_arr"] = epsilon_arr

        return journal
    def _resample_parameter(self, rng, npc=None):
        """
        Samples a single model parameter and simulates from it until the distance between the
        simulated outcome and the observation is smaller than epsilon.

        Parameters
        ----------
        rng: random number generator
            The random number generator to be used.

        Returns
        -------
        Tuple
            The first entry of the tuple is the accepted parameters. The second entry is the
            distance between the simulated data set and the observation, while the third one is
            the number of simulations needed to obtain the accepted parameter.
        """
        rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))

        distance = self.distance.dist_max()

        if distance < self.epsilon and self.logger:
            self.logger.warning("initial epsilon {:e} is larger than dist_max {:e}"
                                .format(float(self.epsilon), distance))

        theta = self.get_parameters()
        counter = 0

        while distance > self.epsilon:
            if self.accepted_parameters_manager.accepted_parameters_bds is None:
                self.sample_from_prior(rng=rng)
                theta = self.get_parameters()
                y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
                counter += 1
            else:
                index = rng.choice(self.n_samples, size=1,
                                   p=self.accepted_parameters_manager.accepted_weights_bds.value().reshape(-1))
                # truncate the normal to the bounds of parameter space of the model;
                # truncating the normal like this is fine: https://arxiv.org/pdf/0907.4010v1.pdf
                while True:
                    perturbation_output = self.perturb(index[0], rng=rng)
                    if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
                        theta = perturbation_output[1]
                        break
                y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
                counter += 1

            distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
            self.logger.debug("distance after {:4d} simulations: {:e}".format(counter, distance))

        self.logger.debug("Needed {:4d} simulations to reach distance {:e} < epsilon = {:e}"
                          .format(counter, distance, float(self.epsilon)))

        return theta, distance, counter

    def _calculate_weight(self, theta, npc=None):
        """
        Calculates the weight for the given parameter using accepted_parameters, accepted_cov_mat

        Parameters
        ----------
        theta: np.array
            1xp matrix containing model parameter, where p is the number of parameters

        Returns
        -------
        float
            the new weight for theta
        """
        self.logger.debug("_calculate_weight")
        if self.accepted_parameters_manager.kernel_parameters_bds is None:
            return 1.0 / self.n_samples
        else:
            prior_prob = self.pdf_of_prior(self.model, theta, 0)

            # Get the mapping of the models to be used by the kernels
            mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
                self.accepted_parameters_manager.model)

            pdf_values = np.array(
                [self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
                                 self.accepted_parameters_manager.accepted_parameters_bds.value()[i], theta)
                 for i in range(self.n_samples)])

            denominator = np.sum(
                self.accepted_parameters_manager.accepted_weights_bds.value().reshape(-1) * pdf_values)

            return 1.0 * prior_prob / denominator

    def _compute_accepted_cov_mats(self, covFactor, new_cov_mats):
        """
        Update the covariance matrices computed from data by multiplying them with covFactor and
        adding a small term in the diagonal for numerical stability.

        Parameters
        ----------
        covFactor : float
            factor to correct the covariance matrices
        new_cov_mats : list
            list of covariance matrices computed from data

        Returns
        -------
        list
            List of new accepted covariance matrices
        """
        # accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats]
        accepted_cov_mats = []
        for new_cov_mat in new_cov_mats:
            if not (new_cov_mat.size == 1):
                accepted_cov_mats.append(
                    covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
            else:
                accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
        return accepted_cov_mats
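# Usage sketch for PMCABC (not part of the module), reusing the toy model, distance and
# backend from the RejectionABC sketch above; the numeric settings are illustrative.
# Passing a single-entry epsilon_init array means the thresholds for later generations
# are chosen adaptively from the epsilon_percentile of the previous generation's distances.
#
#     import numpy as np
#
#     eps_init = np.array([20.0])   # threshold for generation 1 only
#     sampler = PMCABC([height], [distance], BackendDummy(), seed=1)
#     journal = sampler.sample([[171.4, 168.2, 172.5]], steps=3, epsilon_init=eps_init,
#                              n_samples=100, n_samples_per_param=10,
#                              epsilon_percentile=10, covFactor=2, full_output=1)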
class PMC(BaseLikelihood, InferenceMethod):
    """
    Population Monte Carlo based inference scheme of Cappé et al. [1].

    This algorithm assumes a likelihood function is available and can be evaluated at any parameter
    value given the observed dataset. In absence of the likelihood function, or when it cannot be
    evaluated at a reasonable computational expense, we use the approximated likelihood functions
    in the abcpy.approx_lhd module, for which the arguments for the consistency of the inference
    scheme are based on Andrieu and Roberts [2].

    [1] Cappé, O., Guillin, A., Marin, J.-M., and Robert, C. P. (2004). Population Monte Carlo.
    Journal of Computational and Graphical Statistics, 13(4), 907–929.

    [2] C. Andrieu and G. O. Roberts. The pseudo-marginal approach for efficient Monte Carlo
    computations. Annals of Statistics, 37(2):697–725, 04 2009.

    Parameters
    ----------
    model : list
        A list of the Probabilistic models corresponding to the observed datasets
    loglikfun : abcpy.approx_lhd.Approx_likelihood
        Approx_loglikelihood object defining the approximated loglikelihood to be used.
    kernel : abcpy.distributions.Distribution
        Distribution object defining the perturbation kernel needed for the sampling.
    backend : abcpy.backends.Backend
        Backend object defining the backend to be used.
    seed : integer, optional
        Optional initial seed for the random number generator. The default value is generated randomly.
    """

    model = None
    likfun = None
    kernel = None
    rng = None
    n_samples = None
    n_samples_per_param = None
    backend = None
    def __init__(self, root_models, loglikfuns, backend, kernel=None, seed=None):
        self.model = root_models
        # We define the joint Sum of Loglikelihood functions using all the loglikelihoods for each individual model
        self.likfun = SumCombination(root_models, loglikfuns)

        if kernel is None:
            mapping, garbage_index = self._get_mapping()
            models = []
            for mdl, mdl_index in mapping:
                models.append(mdl)
            kernel = DefaultKernel(models)

        self.kernel = kernel
        self.backend = backend
        self.rng = np.random.RandomState(seed)
        self.logger = logging.getLogger(__name__)

        # these are usually big tables, so we broadcast them to have them once
        # per executor instead of once per task
        self.accepted_parameters_manager = AcceptedParametersManager(self.model)

        self.simulation_counter = 0
    def sample(self, observations, steps, n_samples=10000, n_samples_per_param=100, covFactors=None,
               iniPoints=None, full_output=0, journal_file=None):
        """Samples from the posterior distribution of the model parameter given the observed
        data observations.

        Parameters
        ----------
        observations : list
            A list, containing lists describing the observed data sets
        steps : integer
            number of iterations in the sequential algorithm ("generations")
        n_samples : integer, optional
            number of samples to generate. The default value is 10000.
        n_samples_per_param : integer, optional
            number of data points in each simulated data set. The default value is 100.
        covFactors : list of float, optional
            scaling parameters of the covariance matrices. The default is a list of ones, with one
            entry per perturbation kernel.
        iniPoints : numpy.ndarray, optional
            parameter values from where the sampling starts. By default sampled from the prior.
        full_output: integer, optional
            If full_output==1, intermediate results are included in output journal.
            The default value is 0, meaning the intermediate results are not saved.
        journal_file: str, optional
            Filename of a journal file to read an already saved journal file, from which the first
            iteration will start. The default value is None.

        Returns
        -------
        abcpy.output.Journal
            A journal containing simulation results, metadata and optionally intermediate results.
        """
        self.sample_from_prior(rng=self.rng)
        self.accepted_parameters_manager.broadcast(self.backend, observations)
        self.n_samples = n_samples
        self.n_samples_per_param = n_samples_per_param

        if journal_file is None:
            journal = Journal(full_output)
            journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
            journal.configuration["type_lhd_func"] = type(self.likfun).__name__
            journal.configuration["n_samples"] = self.n_samples
            journal.configuration["n_samples_per_param"] = self.n_samples_per_param
            journal.configuration["steps"] = steps
            journal.configuration["covFactor"] = covFactors
            journal.configuration["iniPoints"] = iniPoints
        else:
            journal = Journal.fromFile(journal_file)

        accepted_parameters = None
        accepted_weights = None
        accepted_cov_mats = None
        new_theta = None

        dim = len(self.get_parameters())

        # Initialize particles: when not supplied, randomly draw them from the prior distribution
        # Weights of particles: assign equal weights to each of the particles
        if iniPoints is None:
            accepted_parameters = []
            for ind in range(0, n_samples):
                self.sample_from_prior(rng=self.rng)
                accepted_parameters.append(self.get_parameters())
            accepted_weights = np.ones((n_samples, 1), dtype=float) / n_samples
        else:
            accepted_parameters = iniPoints
            accepted_weights = np.ones((iniPoints.shape[0], 1), dtype=float) / iniPoints.shape[0]

        if covFactors is None:
            covFactors = np.ones(shape=(len(self.kernel.kernels),))

        self.accepted_parameters_manager.update_broadcast(self.backend,
                                                          accepted_parameters=accepted_parameters,
                                                          accepted_weights=accepted_weights)

        # The parameters relevant to each kernel have to be used to calculate n_sample times.
        # It is therefore more efficient to broadcast these parameters once, instead of
        # collecting them at each kernel in each step
        kernel_parameters = []
        for kernel in self.kernel.kernels:
            kernel_parameters.append(
                self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
        self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                              kernel_parameters=kernel_parameters)

        # 3: calculate covariance
        self.logger.info("Calculating covariance matrix")
        new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
        # Since each entry of new_cov_mats is a numpy array, we can multiply like this
        # accepted_cov_mats = [covFactor * new_cov_mat for covFactor, new_cov_mat in zip(covFactors, new_cov_mats)]
        accepted_cov_mats = self._compute_accepted_cov_mats(covFactors, new_cov_mats)

        self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)

        # main SMC algorithm
        self.logger.info("Starting pmc iterations")
        for aStep in range(steps):
            if aStep == 0 and journal_file is not None:
                accepted_parameters = journal.get_accepted_parameters(-1)
                accepted_weights = journal.get_weights(-1)

                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_parameters=accepted_parameters,
                                                                  accepted_weights=accepted_weights)

                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)

                # 3: calculate covariance
                self.logger.info("Calculating covariance matrix")
                new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
                accepted_cov_mats = self._compute_accepted_cov_mats(covFactors, new_cov_mats)

            self.logger.info("Iteration {} of PMC algorithm".format(aStep))

            # 0: update remotely required variables
            self.logger.info("Broadcasting parameters")
            self.accepted_parameters_manager.update_broadcast(self.backend,
                                                              accepted_parameters=accepted_parameters,
                                                              accepted_weights=accepted_weights,
                                                              accepted_cov_mats=accepted_cov_mats)

            # 1: Resample parameters
            self.logger.info("Resample parameters")
            index = self.rng.choice(len(accepted_parameters), size=n_samples, p=accepted_weights.reshape(-1))

            # Choose a new particle using the resampled particle (make the boundary proper)
            new_parameters = []
            for ind in range(0, self.n_samples):
                while True:
                    perturbation_output = self.perturb(index[ind], rng=self.rng)
                    if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
                        new_parameters.append(perturbation_output[1])
                        break

            # 2: calculate approximate likelihood for new parameters
            self.logger.info("Calculate approximate loglikelihood")
            seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=self.n_samples, dtype=np.uint32)
            rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
            data_arr = []
            for i in range(len(rng_arr)):
                data_arr.append([new_parameters[i], rng_arr[i]])
            data_pds = self.backend.parallelize(data_arr)

            approx_log_likelihood_new_parameters_and_counter_pds = self.backend.map(self._approx_log_lik_calc,
                                                                                    data_pds)
            self.logger.debug("collect approximate likelihood from pds")
            approx_log_likelihood_new_parameters_and_counter = self.backend.collect(
                approx_log_likelihood_new_parameters_and_counter_pds)
            approx_log_likelihood_new_parameters, counter = [
                list(t) for t in zip(*approx_log_likelihood_new_parameters_and_counter)]
            approx_log_likelihood_new_parameters = np.array(approx_log_likelihood_new_parameters).reshape(-1, 1)

            for count in counter:
                self.simulation_counter += count

            # 3: calculate new weights for new parameters
            self.logger.info("Calculating weights")
            new_parameters_pds = self.backend.parallelize(new_parameters)
            new_weights_pds = self.backend.map(self._calculate_weight, new_parameters_pds)
            new_weights = np.array(self.backend.collect(new_weights_pds)).reshape(-1, 1)

            new_log_weights = np.log(new_weights) + approx_log_likelihood_new_parameters

            # we get numerical issues often:
            self.logger.info("range_of_weights (log): {}".format(np.max(new_log_weights) -
                                                                 np.min(new_log_weights)))
            # center on log scale (avoid numerical issues):
            new_weights = np.exp(new_log_weights - np.max(new_log_weights))

            sum_of_weights = np.sum(new_weights)
            new_weights = new_weights / sum_of_weights
            self.logger.info("sum_of_weights : {}".format(sum_of_weights))
            self.logger.info("ESS : {}".format(1 / sum(pow(new_weights / sum(new_weights), 2))[0]))
            accepted_parameters = new_parameters

            self.accepted_parameters_manager.update_broadcast(self.backend,
                                                              accepted_parameters=accepted_parameters,
                                                              accepted_weights=new_weights)

            # 4: calculate covariance
            # The parameters relevant to each kernel have to be used to calculate n_sample times.
            # It is therefore more efficient to broadcast these parameters once, instead of
            # collecting them at each kernel in each step
            self.logger.info("Calculating covariance matrix")
            kernel_parameters = []
            for kernel in self.kernel.kernels:
                kernel_parameters.append(
                    self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
            self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                  kernel_parameters=kernel_parameters)
            new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
            # Since each entry of new_cov_mats is a numpy array, we can multiply like this
            accepted_cov_mats = self._compute_accepted_cov_mats(covFactors, new_cov_mats)

            # 5: Update the newly computed values
            accepted_parameters = new_parameters
            accepted_weights = new_weights

            self.logger.info("Saving configuration to output journal")
            if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
                journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
                journal.add_weights(copy.deepcopy(accepted_weights))
                journal.add_ESS_estimate(accepted_weights)
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_parameters=accepted_parameters,
                                                                  accepted_weights=accepted_weights)
                names_and_parameters = self._get_names_and_parameters()
                journal.add_user_parameters(names_and_parameters)
                journal.number_of_simulations.append(self.simulation_counter)

        return journal
    # define helper functions for map step
    def _approx_log_lik_calc(self, data, npc=None):
        """
        Compute the approximate loglikelihood for new parameters using the approximate
        likelihood function.

        Parameters
        ----------
        data: list
            A list containing a parameter value and a random numpy state, e.g. [theta, rng]

        Returns
        -------
        Tuple
            The approximated loglikelihood at theta (including the log-prior contribution) and
            the number of simulations performed.
        """
        # Extract theta and rng
        theta, rng = data[0], data[1]

        # Simulate the fake data from the model given the parameter value theta
        self.logger.debug("Simulate model for parameter " + str(theta))
        self.set_parameters(theta)
        y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)

        self.logger.debug("Extracting observation.")
        obs = self.accepted_parameters_manager.observations_bds.value()

        self.logger.debug("Computing likelihood...")
        loglhd = self.likfun.loglikelihood(obs, y_sim)

        self.logger.debug("LogLikelihood is :" + str(loglhd))
        log_pdf_at_theta = np.log(self.pdf_of_prior(self.model, theta))

        self.logger.debug("Prior pdf evaluated at theta is :" + str(log_pdf_at_theta))

        log_pdf_at_theta += loglhd

        return log_pdf_at_theta, self.n_samples_per_param

    def _calculate_weight(self, theta, npc=None):
        """
        Calculates the weight for the given parameter using accepted_parameters, accepted_cov_mat

        Parameters
        ----------
        theta: np.ndarray
            1xp matrix containing the model parameters, where p is the number of parameters

        Returns
        -------
        float
            The new weight for theta
        """
        self.logger.debug("_calculate_weight")
        if self.accepted_parameters_manager.accepted_weights_bds is None:
            return 1.0 / self.n_samples
        else:
            prior_prob = self.pdf_of_prior(self.model, theta)

            mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
                self.accepted_parameters_manager.model)

            pdf_values = np.array(
                [self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
                                 self.accepted_parameters_manager.accepted_parameters_bds.value()[i], theta)
                 for i in range(self.n_samples)])

            denominator = np.sum(
                self.accepted_parameters_manager.accepted_weights_bds.value().reshape(-1) * pdf_values)

            return 1.0 * prior_prob / denominator

    def _compute_accepted_cov_mats(self, covFactors, new_cov_mats):
        """
        Update the covariance matrices computed from data by multiplying them with covFactors and
        adding a small term in the diagonal for numerical stability.

        Parameters
        ----------
        covFactors : list of float
            factors to correct the covariance matrices
        new_cov_mats : list
            list of covariance matrices computed from data

        Returns
        -------
        list
            List of new accepted covariance matrices
        """
        accepted_cov_mats = []
        for covFactor, new_cov_mat in zip(covFactors, new_cov_mats):
            if not (new_cov_mat.size == 1):
                accepted_cov_mats.append(
                    covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
            else:
                accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
        return accepted_cov_mats
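# Usage sketch for PMC (not part of the module). Unlike the ABC samplers above, PMC weights
# particles by an approximate likelihood rather than a distance; here abcpy's SynLikelihood
# (synthetic likelihood) is used, reusing the toy model and backend from the RejectionABC
# sketch. All numeric settings are illustrative.
#
#     from abcpy.approx_lhd import SynLikelihood
#
#     approx_lhd = SynLikelihood(Identity(degree=2, cross=False))
#     sampler = PMC([height], [approx_lhd], BackendDummy(), seed=1)
#     journal = sampler.sample([[171.4, 168.2, 172.5]], steps=3,
#                              n_samples=100, n_samples_per_param=20, full_output=1)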
class SABC(BaseDiscrepancy, InferenceMethod):
    """
    This class implements a modified version of Simulated Annealing Approximate Bayesian
    Computation (SABC) of [1] when the prior is non-informative.

    [1] C. Albert, H. R. Kuensch and A. Scheidegger. A Simulated Annealing Approach to
    Approximate Bayes Computations. Statistics and Computing, (2014).

    Parameters
    ----------
    model : list
        A list of the Probabilistic models corresponding to the observed datasets
    distance : abcpy.distances.Distance
        Distance object defining the distance measure used to compare simulated and observed data sets.
    kernel : abcpy.distributions.Distribution
        Distribution object defining the perturbation kernel needed for the sampling.
    backend : abcpy.backends.Backend
        Backend object defining the backend to be used.
    seed : integer, optional
        Optional initial seed for the random number generator. The default value is generated randomly.
    """

    model = None
    distance = None
    kernel = None
    rng = None

    n_samples = None
    n_samples_per_param = None
    epsilon = None

    smooth_distances_bds = None
    all_distances_bds = None

    backend = None
    def __init__(self, root_models, distances, backend, kernel=None, seed=None):
        self.model = root_models
        # We define the joint Linear combination distance using all the distances for each individual model
        self.distance = LinearCombination(root_models, distances)

        if kernel is None:
            mapping, garbage_index = self._get_mapping()
            models = []
            for mdl, mdl_index in mapping:
                models.append(mdl)
            kernel = DefaultKernel(models)

        self.kernel = kernel
        self.backend = backend
        self.rng = np.random.RandomState(seed)
        self.logger = logging.getLogger(__name__)

        # these are usually big tables, so we broadcast them to have them once
        # per executor instead of once per task
        self.smooth_distances_bds = None
        self.all_distances_bds = None

        self.accepted_parameters_manager = AcceptedParametersManager(self.model)

        self.simulation_counter = 0
    def sample(self, observations, steps, epsilon, n_samples=10000, n_samples_per_param=1, beta=2,
               delta=0.2, v=0.3, ar_cutoff=0.1, resample=None, n_update=None, full_output=0,
               journal_file=None):
        """Samples from the posterior distribution of the model parameter given the observed
        data observations.

        Parameters
        ----------
        observations : list
            A list, containing lists describing the observed data sets
        steps : integer
            Number of maximum iterations in the sequential algorithm ("generations")
        epsilon : numpy.float
            A proposed value of threshold to start with.
        n_samples : integer, optional
            Number of samples to generate. The default value is 10000.
        n_samples_per_param : integer, optional
            Number of data points in each simulated data set. The default value is 1.
        beta : numpy.float, optional
            Tuning parameter of SABC, default value is 2. Used to scale up the covariance matrices
            obtained from data.
        delta : numpy.float, optional
            Tuning parameter of SABC, default value is 0.2.
        v : numpy.float, optional
            Tuning parameter of SABC, the default value is 0.3.
        ar_cutoff : numpy.float, optional
            Acceptance ratio cutoff: if the acceptance rate at some iteration of the algorithm is
            lower than that, the algorithm will stop. The default value is 0.1.
        resample: int, optional
            At any iteration, perform a resampling step if the number of accepted particles is
            larger than resample. When not provided, it assumes resample to be equal to n_samples.
        n_update: int, optional
            Number of perturbed parameters at each step; the default value is None, in which case
            it takes the value of n_samples.
        full_output: integer, optional
            If full_output==1, intermediate results are included in output journal.
            The default value is 0, meaning the intermediate results are not saved.
        journal_file: str, optional
            Filename of a journal file to read an already saved journal file, from which the first
            iteration will start. The default value is None.

        Returns
        -------
        abcpy.output.Journal
            A journal containing simulation results, metadata and optionally intermediate results.
        """
        self.sample_from_prior(rng=self.rng)
        self.accepted_parameters_manager.broadcast(self.backend, observations)
        self.epsilon = epsilon
        self.n_samples = n_samples
        self.n_samples_per_param = n_samples_per_param

        if journal_file is None:
            journal = Journal(full_output)
            journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
            journal.configuration["type_dist_func"] = type(self.distance).__name__
            journal.configuration["type_kernel_func"] = type(self.kernel)
            journal.configuration["n_samples"] = self.n_samples
            journal.configuration["n_samples_per_param"] = self.n_samples_per_param
            journal.configuration["beta"] = beta
            journal.configuration["delta"] = delta
            journal.configuration["v"] = v
            journal.configuration["ar_cutoff"] = ar_cutoff
            journal.configuration["resample"] = resample
            journal.configuration["n_update"] = n_update
            journal.configuration["full_output"] = full_output
        else:
            journal = Journal.fromFile(journal_file)

        accepted_parameters = None
        distances = np.zeros(shape=(n_samples,))
        smooth_distances = np.zeros(shape=(n_samples,))
        accepted_weights = np.ones(shape=(n_samples, 1))
        all_distances = None
        accepted_cov_mats = None

        if resample is None:
            resample = n_samples
        if n_update is None:
            n_update = n_samples
        sample_array = np.ones(shape=(steps,))
        sample_array[0] = n_samples
        sample_array[1:] = n_update

        # Acceptance counter to determine the resampling step
        accept = 0
        samples_until = 0

        # Flag recording whether the iterations were broken preemptively
        broken_preemptively = False

        for aStep in range(0, steps):
            self.logger.debug("step {}".format(aStep))
            if aStep == 0 and journal_file is not None:
                accepted_parameters = journal.get_accepted_parameters(-1)
                accepted_weights = journal.get_weights(-1)

                # Broadcast Accepted parameters and Accepted weights
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_parameters=accepted_parameters,
                                                                  accepted_weights=accepted_weights)

                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))

                # Broadcast Accepted Kernel parameters
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)

                new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
                accepted_cov_mats = self._compute_accepted_cov_mats(beta, new_cov_mats)

                # Broadcast Accepted Covariance Matrix
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_cov_mats=accepted_cov_mats)

            # main SABC algorithm
            seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=int(sample_array[aStep]),
                                        dtype=np.uint32)
            rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
            index_arr = self.rng.randint(0, self.n_samples, size=int(sample_array[aStep]), dtype=np.uint32)
            data_arr = []
            for i in range(len(rng_arr)):
                data_arr.append([rng_arr[i], index_arr[i]])
            data_pds = self.backend.parallelize(data_arr)

            # 0: update remotely required variables
            self.logger.info("Broadcasting parameters")
            self.epsilon = epsilon
            self._update_broadcasts(smooth_distances, all_distances)

            # 1: Calculate parameters
            self.logger.info("Initial accepted parameters")
            params_and_dists_pds = self.backend.map(self._accept_parameter, data_pds)
            self.logger.debug("Map step of parallelism is finished")
            params_and_dists = self.backend.collect(params_and_dists_pds)
            self.logger.debug("Collect step of parallelism is finished")
            new_parameters, new_distances, new_all_parameters, new_all_distances, index, acceptance, counter = [
                list(t) for t in zip(*params_and_dists)]

            # Keeping counter of number of simulations
            self.logger.debug("Counting number of simulations")
            for count in counter:
                self.simulation_counter += count

            new_distances = np.array(new_distances)
            new_all_distances = np.concatenate(new_all_distances)
            index = np.array(index)
            acceptance = np.array(acceptance)

            # Reading all_distances at Initial step
            if aStep == 0:
                index = np.linspace(0, n_samples - 1, n_samples).astype(int).reshape(n_samples, )
                accept = 0
                all_distances = new_all_distances

            # Initialize/Update the accepted parameters and their corresponding distances
            self.logger.info("Initialize/Update the accepted parameters and their corresponding distances")
            if accepted_parameters is None:
                accepted_parameters = new_parameters
            else:
                for ind in range(len(acceptance)):
                    if acceptance[ind] == 1:
                        accepted_parameters[index[ind]] = new_parameters[ind]
            distances[index[acceptance == 1]] = new_distances[acceptance == 1]

            # 2: Smoothing of the distances
            self.logger.info("Smoothing of the distance")
            smooth_distances[index[acceptance == 1]] = self._smoother_distance(
                distances[index[acceptance == 1]], all_distances)

            # 3: Initialize/Update U, epsilon and covariance of perturbation kernel
            self.logger.info("Initialize/Update U, epsilon and covariance of perturbation kernel")
            if aStep == 0:
                U = self._average_redefined_distance(
                    self._smoother_distance(all_distances, all_distances), epsilon)
            else:
                U = np.mean(smooth_distances)
            epsilon = self._schedule(U, v)

            # 4: Show progress and, if the acceptance rate is smaller than a value, break the iteration
            if aStep > 0:
                accept = accept + np.sum(acceptance)
                samples_until = samples_until + sample_array[aStep]
                acceptance_rate = accept / samples_until
                msg = ("updates= {:.2f}, epsilon= {}, u.mean={:e}, acceptance rate: {:.2f}".format(
                    np.sum(sample_array[1:aStep + 1]) / np.sum(sample_array[1:]) * 100, epsilon, U,
                    acceptance_rate))
                self.logger.info(msg)
                if acceptance_rate < ar_cutoff:
                    broken_preemptively = True
                    self.logger.info("Stopping as acceptance rate is lower than cutoff")
                    break

            # 5: Resampling if number of accepted particles greater than resample
            if accept >= resample and U > 1e-100:
                self.logger.info("Weighted resampling")
                weight = np.exp(-smooth_distances * delta / U)
                weight = weight / sum(weight)
                index_resampled = self.rng.choice(n_samples, n_samples, replace=True, p=weight)
                accepted_parameters = [accepted_parameters[i] for i in index_resampled]
                smooth_distances = smooth_distances[index_resampled]
                distances = distances[index_resampled]

                # Update U and epsilon:
                epsilon = epsilon * (1 - delta)
                U = np.mean(smooth_distances)
                epsilon = self._schedule(U, v)

                # Print effective sampling size
                self.logger.info('Resampling: Effective sampling size: ' +
                                 str(1 / sum(pow(weight / sum(weight), 2))))
                accept = 0
                samples_until = 0

                # Compute and broadcast accepted parameters, accepted kernel parameters and
                # accepted Covariance matrix
                # Broadcast Accepted parameters and add to journal
                self.logger.info("Broadcast Accepted parameters and add to journal")
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_weights=accepted_weights,
                                                                  accepted_parameters=accepted_parameters)
                # Compute Accepted Kernel parameters and broadcast them
                self.logger.debug("Compute Accepted Kernel parameters and broadcast them")
                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)
                # Compute Kernel Covariance Matrix and broadcast it
                self.logger.debug("Compute Kernel Covariance Matrix and broadcast it")
                new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
                accepted_cov_mats = self._compute_accepted_cov_mats(beta, new_cov_mats)
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_cov_mats=accepted_cov_mats)

                if full_output == 1 and aStep <= steps - 1:
                    # Saving intermediate configuration to output journal.
                    self.logger.info('Saving after resampling')
                    journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
                    journal.add_weights(copy.deepcopy(accepted_weights))
                    journal.add_ESS_estimate(accepted_weights)
                    journal.add_distances(copy.deepcopy(distances))
                    names_and_parameters = self._get_names_and_parameters()
                    journal.add_user_parameters(names_and_parameters)
                    journal.number_of_simulations.append(self.simulation_counter)
            else:
                # Compute and broadcast accepted parameters, accepted kernel parameters and
                # accepted Covariance matrix
                # Broadcast Accepted parameters
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_weights=accepted_weights,
                                                                  accepted_parameters=accepted_parameters)
                # Compute Accepted Kernel parameters and broadcast them
                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)
                # Compute Kernel Covariance Matrix and broadcast it
                new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
                accepted_cov_mats = self._compute_accepted_cov_mats(beta, new_cov_mats)
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_cov_mats=accepted_cov_mats)

                if full_output == 1 and aStep <= steps - 1:
                    # Saving intermediate configuration to output journal.
                    journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
                    journal.add_weights(copy.deepcopy(accepted_weights))
                    journal.add_ESS_estimate(accepted_weights)
                    journal.add_distances(copy.deepcopy(distances))
                    names_and_parameters = self._get_names_and_parameters()
                    journal.add_user_parameters(names_and_parameters)
                    journal.number_of_simulations.append(self.simulation_counter)

        # Add epsilon_arr, number of final steps and final output to the journal
        if (full_output == 0) or (full_output == 1 and broken_preemptively and aStep <= steps - 1):
            journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
            journal.add_weights(copy.deepcopy(accepted_weights))
            journal.add_ESS_estimate(accepted_weights)
            journal.add_distances(copy.deepcopy(distances))
            self.accepted_parameters_manager.update_broadcast(self.backend,
                                                              accepted_parameters=accepted_parameters,
                                                              accepted_weights=accepted_weights)
            names_and_parameters = self._get_names_and_parameters()
            journal.add_user_parameters(names_and_parameters)
            journal.number_of_simulations.append(self.simulation_counter)

        journal.configuration["steps"] = aStep + 1
        journal.configuration["epsilon"] = epsilon
        return journal
    def _smoother_distance(self, distance, old_distance):
        """Smooths the distance using Equation 14 of [1].

        [1] C. Albert, H. R. Kuensch and A. Scheidegger. A Simulated Annealing Approach to
        Approximate Bayes Computations. Statistics and Computing 0960-3174 (2014).

        Parameters
        ----------
        distance: numpy.ndarray
            Current distance between the simulated and observed data
        old_distance: numpy.ndarray
            Last distance between the simulated and observed data

        Returns
        -------
        numpy.ndarray
            Smoothed distance
        """
        smoothed_distance = np.zeros(shape=(len(distance),))

        for ind in range(0, len(distance)):
            if distance[ind] < np.min(old_distance):
                smoothed_distance[ind] = (distance[ind] / np.min(old_distance)) / len(old_distance)
            else:
                smoothed_distance[ind] = np.mean(np.array(old_distance) < distance[ind])

        return smoothed_distance

    def _average_redefined_distance(self, distance, epsilon):
        """
        Function to calculate the weighted average of the distance

        Parameters
        ----------
        distance: numpy.ndarray
            Distance between simulated and observed data set
        epsilon: float
            threshold

        Returns
        -------
        numpy.ndarray
            Weighted average of the distance
        """
        if epsilon == 0:
            U = 0
        else:
            U = np.average(distance, weights=np.exp(-distance / epsilon))

        return U

    def _schedule(self, rho, v):
        if rho < 1e-100:
            epsilon = 0
        else:
            fun = lambda epsilon: pow(epsilon, 2) + v * pow(epsilon, 3 / 2) - pow(rho, 2)
            epsilon = optimize.fsolve(fun, rho / 2)

        return epsilon

    def _update_broadcasts(self, smooth_distances, all_distances):
        def destroy(bc):
            if bc is not None:
                bc.unpersist
                # bc.destroy

        if smooth_distances is not None:
            self.smooth_distances_bds = self.backend.broadcast(smooth_distances)
        if all_distances is not None:
            self.all_distances_bds = self.backend.broadcast(all_distances)

    # define helper functions for map step
    def _accept_parameter(self, data, npc=None):
        """
        Samples a single model parameter and simulates from it until accepted with
        probability exp[-rho(x, y) / epsilon].

        Parameters
        ----------
        data: list
            A list containing a random number generator and the index of the data to operate on

        Returns
        -------
        numpy.ndarray
            accepted parameter
        """
        if isinstance(data, np.ndarray):
            data = data.tolist()
        rng = data[0]
        index = data[1]
        rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))

        all_parameters = []
        all_distances = []
        acceptance = 0

        counter = 0

        if self.accepted_parameters_manager.accepted_cov_mats_bds is None:
            while acceptance == 0:
                self.sample_from_prior(rng=rng)
                new_theta = self.get_parameters()
                all_parameters.append(new_theta)
                t0 = time.time()
                y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
                self.logger.debug("Simulation took " + str(time.time() - t0) + "sec")
                counter += 1
                distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(),
                                                  y_sim)
                all_distances.append(distance)
                acceptance = rng.binomial(1, np.exp(-distance / self.epsilon), 1)
            # the loop above exits with acceptance == 1; store it as a plain int
            acceptance = 1
        else:
            # Select one arbitrary particle:
            index = rng.choice(self.n_samples, size=1)[0]
            # Sample proposal parameter and calculate new distance:
            theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index]

            while True:
                perturbation_output = self.perturb(index, rng=rng)
                if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
                    new_theta = perturbation_output[1]
                    break
            t0 = time.time()
            y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
            self.logger.debug("Simulation took " + str(time.time() - t0) + "sec")
            counter += 1
            distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
            smooth_distance = self._smoother_distance([distance], self.all_distances_bds.value())

            # Calculate acceptance probability:
            self.logger.debug("Calculate acceptance probability")
            ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(
                self.model, self.accepted_parameters_manager.accepted_parameters_bds.value()[index])
            ratio_likelihood_prob = np.exp((self.smooth_distances_bds.value()[index] - smooth_distance) /
                                           self.epsilon)
            acceptance_prob = ratio_prior_prob * ratio_likelihood_prob

            # If accepted
            if rng.rand(1) < acceptance_prob:
                acceptance = 1
            else:
                distance = np.inf

        return new_theta, distance, all_parameters, all_distances, index, acceptance, counter

    def _compute_accepted_cov_mats(self, beta, new_cov_mats):
        """
        Update the covariance matrices computed from data by multiplying them with beta and adding
        a small term in the diagonal for numerical stability.

        Parameters
        ----------
        beta : float
            factor to correct the covariance matrices
        new_cov_mats : list
            list of covariance matrices computed from data

        Returns
        -------
        list
            List of new accepted covariance matrices
        """
        accepted_cov_mats = []
        for new_cov_mat in new_cov_mats:
            if not (new_cov_mat.size == 1):
                accepted_cov_mats.append(
                    beta * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
            else:
                accepted_cov_mats.append((beta * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
        return accepted_cov_mats
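# Usage sketch for SABC (not part of the module), again reusing the toy model, distance and
# backend from the RejectionABC sketch. Here epsilon is the *initial* annealing threshold,
# which the algorithm lowers via the _schedule() root-finding step; all values below are
# illustrative.
#
#     sampler = SABC([height], [distance], BackendDummy(), seed=1)
#     journal = sampler.sample([[171.4, 168.2, 172.5]], steps=10, epsilon=40.0,
#                              n_samples=100, n_samples_per_param=10,
#                              beta=2, delta=0.2, v=0.3, ar_cutoff=0.1)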
[docs]class ABCsubsim(BaseDiscrepancy, InferenceMethod):
    """This class implements the Approximate Bayesian Computation by subset simulation (ABCsubsim) algorithm of [1].

    [1] M. Chiachio, J. L. Beck, J. Chiachio and G. Rus, Approximate Bayesian computation by subset
    simulation. SIAM J. Sci. Comput., 36(3):A1339-A1358, 2014.

    Parameters
    ----------
    model : list
        A list of the Probabilistic models corresponding to the observed datasets
    distances : list of abcpy.distances.Distance
        Distance objects defining the distance used to compare the simulated and observed data sets; one for each
        model.
    kernel : abcpy.distributions.Distribution
        Distribution object defining the perturbation kernel needed for the sampling.
    backend : abcpy.backends.Backend
        Backend object defining the backend to be used.
    seed : integer, optional
        Optional initial seed for the random number generator. The default value is generated randomly.
    """

    model = None
    distance = None
    kernel = None
    rng = None
    anneal_parameter = None

    n_samples = None
    n_samples_per_param = None
    chain_length = None

    backend = None
[docs] def __init__(self, root_models, distances, backend, kernel=None, seed=None): self.model = root_models # We define the joint Linear combination distance using all the distances for each individual models self.distance = LinearCombination(root_models, distances) if kernel is None: mapping, garbage_index = self._get_mapping() models = [] for mdl, mdl_index in mapping: models.append(mdl) kernel = DefaultKernel(models) self.kernel = kernel self.backend = backend self.rng = np.random.RandomState(seed) self.anneal_parameter = None self.logger = logging.getLogger(__name__) # these are usually big tables, so we broadcast them to have them once # per executor instead of once per task self.accepted_parameters_manager = AcceptedParametersManager(self.model) self.simulation_counter = 0
[docs]    def sample(self, observations, steps, n_samples=10000, n_samples_per_param=1, chain_length=10,
               ap_change_cutoff=10, full_output=0, journal_file=None):
        """Samples from the posterior distribution of the model parameter given the observed
        data observations.

        Parameters
        ----------
        observations : list
            A list, containing lists describing the observed data sets
        steps : integer
            Number of iterations in the sequential algorithm ("generations")
        n_samples : integer, optional
            Number of samples to generate. The default value is 10000.
        n_samples_per_param : integer, optional
            Number of data points in each simulated data set. The default value is 1.
        chain_length : int, optional
            The length of chains, default value is 10. It should be chosen to be a divisor of n_samples.
        ap_change_cutoff : float, optional
            The cutoff value for the percentage change in the anneal parameter. If the change is less than
            ap_change_cutoff the iterations are stopped. The default value is 10.
        full_output: integer, optional
            If full_output==1, intermediate results are included in output journal.
            The default value is 0, meaning the intermediate results are not saved.
        journal_file: str, optional
            Filename of a journal file to read an already saved journal file, from which the first iteration will
            start. The default value is None.

        Returns
        -------
        abcpy.output.Journal
            A journal containing simulation results, metadata and optionally intermediate results.
        """
        self.sample_from_prior(rng=self.rng)
        self.accepted_parameters_manager.broadcast(self.backend, observations)
        self.chain_length = chain_length
        self.n_samples = n_samples
        self.n_samples_per_param = n_samples_per_param

        if journal_file is None:
            journal = Journal(full_output)
            journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
            journal.configuration["type_dist_func"] = type(self.distance).__name__
            journal.configuration["type_kernel_func"] = type(self.kernel)
            journal.configuration["n_samples"] = self.n_samples
            journal.configuration["n_samples_per_param"] = self.n_samples_per_param
            journal.configuration["chain_length"] = self.chain_length
            journal.configuration["ap_change_cutoff"] = ap_change_cutoff
            journal.configuration["full_output"] = full_output
        else:
            journal = Journal.fromFile(journal_file)

        accepted_parameters = None
        accepted_weights = np.ones(shape=(n_samples, 1))
        accepted_cov_mats = None
        anneal_parameter = 0
        anneal_parameter_old = 0
        temp_chain_length = 1

        for aStep in range(0, steps):
            self.logger.info("ABCsubsim step {}".format(aStep))
            if aStep == 0 and journal_file is not None:
                accepted_parameters = journal.get_accepted_parameters(-1)
                accepted_weights = journal.get_weights(-1)
                accepted_cov_mats = journal.opt_values[-1]

            # main ABCsubsim algorithm
            self.logger.info("Initialization of ABCsubsim")
            seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=int(n_samples / temp_chain_length),
                                        dtype=np.uint32)
            rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
            index_arr = np.linspace(0, n_samples // temp_chain_length - 1, n_samples // temp_chain_length).astype(
                int).reshape(int(n_samples / temp_chain_length), )
            rng_and_index_arr = np.column_stack((rng_arr, index_arr))
            rng_and_index_pds = self.backend.parallelize(rng_and_index_arr)

            # 0: update remotely required variables
            self.logger.info("Broadcasting parameters")
            self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
                                                              accepted_parameters=accepted_parameters)

            # 1: Calculate parameters
            # print("INFO: Initial accepted parameters")
            self.logger.info("Initial accepted parameters")
            params_and_dists_pds = self.backend.map(self._accept_parameter, rng_and_index_pds)
            self.logger.debug("Map random number to a pseudo-observation")
            params_and_dists = self.backend.collect(params_and_dists_pds)
            self.logger.debug("Collect results from the mapping")
            new_parameters, new_distances, counter = [list(t) for t in zip(*params_and_dists)]

            for count in counter:
                self.simulation_counter += count

            if aStep > 0:
                accepted_parameters = []
                for ind in range(len(new_parameters)):
                    accepted_parameters += new_parameters[ind]
            else:
                accepted_parameters = new_parameters
            distances = np.concatenate(new_distances)

            # 2: Sort and renumber samples
            self.logger.debug("Sort and renumber samples.")
            accepted_params_and_dist = zip(distances, accepted_parameters)
            accepted_params_and_dist = sorted(accepted_params_and_dist, key=lambda x: x[0])
            distances, accepted_parameters = [list(t) for t in zip(*accepted_params_and_dist)]

            # 3: Calculate and broadcast annealing parameters
            self.logger.debug("Calculate and broadcast annealing parameters.")
            temp_chain_length = self.chain_length
            if aStep > 0:
                anneal_parameter_old = anneal_parameter
            anneal_parameter = 0.5 * (
                    distances[int(n_samples / temp_chain_length)] + distances[int(n_samples / temp_chain_length) + 1])
            self.anneal_parameter = anneal_parameter

            # 4: Update proposal covariance matrix (Parallelized)
            self.logger.debug("Update proposal covariance matrix (Parallelized).")
            if aStep == 0:
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_parameters=accepted_parameters)
                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)
                accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
            else:
                # Double every covariance matrix. Note accepted_cov_mats is a list of
                # matrices, so each entry has to be scaled rather than the list itself
                # (multiplying the list by 2 would merely duplicate its elements).
                accepted_cov_mats = [2 * cov_mat for cov_mat in accepted_cov_mats]

            self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)

            seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=10, dtype=np.uint32)
            rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
            index_arr = np.linspace(0, 10 - 1, 10).astype(int).reshape(10, )
            rng_and_index_arr = np.column_stack((rng_arr, index_arr))
            rng_and_index_pds = self.backend.parallelize(rng_and_index_arr)

            self.logger.debug("Update covariance matrix in parallel (map).")
            cov_mats_index_pds = self.backend.map(self._update_cov_mat, rng_and_index_pds)
            self.logger.debug("Collect covariance matrix.")
            cov_mats_index = self.backend.collect(cov_mats_index_pds)
            cov_mats, T, accept_index, counter = [list(t) for t in zip(*cov_mats_index)]

            for count in counter:
                self.simulation_counter += count

            for ind in range(10):
                if accept_index[ind] == 1:
                    accepted_cov_mats = cov_mats[ind]
                    break

            self.logger.debug("Broadcast accepted parameters.")
            self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)

            if full_output == 1:
                self.logger.info("Saving intermediate configuration to output journal")
                journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
                journal.add_distances(copy.deepcopy(distances))
                journal.add_weights(copy.deepcopy(accepted_weights))
                journal.add_ESS_estimate(accepted_weights)
                journal.add_opt_values(accepted_cov_mats)
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights) names_and_parameters = self._get_names_and_parameters() journal.add_user_parameters(names_and_parameters) journal.number_of_simulations.append(self.simulation_counter) # Show progress anneal_parameter_change_percentage = 100 * abs(anneal_parameter_old - anneal_parameter) / abs( anneal_parameter) msg = ("step: {}, annealing parameter: {:.4f}, change(%) in annealing parameter: {:.1f}" .format(aStep, anneal_parameter, anneal_parameter_change_percentage)) self.logger.info(msg) if anneal_parameter_change_percentage < ap_change_cutoff: break # Add anneal_parameter, number of final steps and final output to the journal # print("INFO: Saving final configuration to output journal.") if full_output == 0: journal.add_accepted_parameters(copy.deepcopy(accepted_parameters)) journal.add_distances(copy.deepcopy(distances)) journal.add_weights(copy.deepcopy(accepted_weights)) journal.add_ESS_estimate(accepted_weights) journal.add_opt_values(accepted_cov_mats) self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters, accepted_weights=accepted_weights) names_and_parameters = self._get_names_and_parameters() journal.add_user_parameters(names_and_parameters) journal.number_of_simulations.append(self.simulation_counter) journal.configuration["steps"] = aStep + 1 journal.configuration["anneal_parameter"] = anneal_parameter return journal
    # define helper functions for map step
    def _accept_parameter(self, rng_and_index, npc=None):
        """
        Samples a single model parameter and simulates from it until the distance between the
        simulated outcome and the observation is smaller than epsilon.

        Parameters
        ----------
        rng_and_index: numpy.ndarray
            2 dimensional array. The first entry is the random number generator.
            The second entry defines the index in the data set.

        Returns
        -------
        numpy.ndarray
            accepted parameter
        """
        rng = rng_and_index[0]
        index = rng_and_index[1]
        rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))

        mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
            self.accepted_parameters_manager.model)

        result_theta = []
        result_distance = []

        counter = 0

        if self.accepted_parameters_manager.accepted_parameters_bds is None:
            self.sample_from_prior(rng=rng)
            y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
            counter += 1
            distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
            result_theta += self.get_parameters()
            result_distance.append(distance)
        else:
            theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index]
            self.set_parameters(theta)
            y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
            counter += 1
            distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
            result_theta.append(theta)
            result_distance.append(distance)
            for ind in range(0, self.chain_length - 1):
                while True:
                    perturbation_output = self.perturb(index, rng=rng)
                    if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
                        break
                y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
                counter += 1
                new_distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)

                # Calculate acceptance probability:
                ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(
                    self.model, theta)
                kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
                                                   perturbation_output[1], theta)
                kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
                                                     theta, perturbation_output[1])
                ratio_likelihood_prob = kernel_numerator / kernel_denominator
                acceptance_prob = min(1, ratio_prior_prob * ratio_likelihood_prob) * (
                        new_distance < self.anneal_parameter)

                # If accepted
                if rng.binomial(1, acceptance_prob) == 1:
                    result_theta.append(perturbation_output[1])
                    result_distance.append(new_distance)
                    theta = perturbation_output[1]
                    distance = new_distance
                else:
                    result_theta.append(theta)
                    result_distance.append(distance)

        return result_theta, result_distance, counter

    def _update_cov_mat(self, rng_t, npc=None):
        """
        Updates the covariance matrix.

        Parameters
        ----------
        rng_t: numpy.ndarray
            2 dimensional array. The first entry is the random number generator.
            The second entry defines the way in which the accepted covariance matrix is transformed.

        Returns
        -------
        numpy.ndarray
            accepted covariance matrix
        """
        rng = rng_t[0]
        t = rng_t[1]
        rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
        acceptance = 0

        accepted_cov_mats_transformed = [cov_mat * pow(2.0, -2.0 * t) for cov_mat in
                                         self.accepted_parameters_manager.accepted_cov_mats_bds.value()]

        theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[0]

        mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
            self.accepted_parameters_manager.model)

        counter = 0

        for ind in range(0, self.chain_length):
            self.logger.debug("Parameter acceptance loop step {}.".format(ind))
            while True:
                perturbation_output = self.perturb(0, rng=rng)
                if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
                    break
            y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
            counter += 1
            new_distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)

            self.logger.debug("Calculate acceptance probability.")
            # Calculate acceptance probability:
            ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(
                self.model, theta)
            kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
                                               perturbation_output[1], theta)
            kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
                                                 theta, perturbation_output[1])
            ratio_likelihood_prob = kernel_numerator / kernel_denominator
            acceptance_prob = min(1, ratio_prior_prob * ratio_likelihood_prob) * (new_distance < self.anneal_parameter)

            if rng.binomial(1, acceptance_prob) == 1:
                theta = perturbation_output[1]
                acceptance = acceptance + 1

        self.logger.debug("Return accepted parameters.")
        # The acceptance rate is computed over 10 moves (the default chain length);
        # a proposal scaling is flagged as accepted if the rate lies in [0.3, 0.5].
        if 0.3 <= acceptance / 10 <= 0.5:
            return accepted_cov_mats_transformed, t, 1, counter
        else:
            return accepted_cov_mats_transformed, t, 0, counter
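# ---------------------------------------------------------------------------
# Editor's note: a hedged usage sketch for ABCsubsim. The building blocks
# below (Uniform, Normal, Identity, Euclidean, BackendDummy) are the standard
# abcpy components used in its tutorials; `height_obs` is a placeholder for
# your observed data set (a list of values). Adapt freely to your own model.
#
#     from abcpy.continuousmodels import Uniform, Normal
#     from abcpy.statistics import Identity
#     from abcpy.distances import Euclidean
#     from abcpy.backends import BackendDummy
#
#     mu = Uniform([[150], [200]], name='mu')
#     height = Normal([mu, 15.0], name='height')
#     distance = Euclidean(Identity(degree=1))
#
#     sampler = ABCsubsim([height], [distance], BackendDummy(), seed=1)
#     # chain_length must divide n_samples (here 1000 / 10 chains of length 10)
#     journal = sampler.sample([height_obs], steps=5, n_samples=1000,
#                              chain_length=10)
# ---------------------------------------------------------------------------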
[docs]class RSMCABC(BaseDiscrepancy, InferenceMethod):
    """This class implements the Replenishment Sequential Monte Carlo Approximate Bayesian computation of
    Drovandi and Pettitt [1].

    [1] C. C. Drovandi and A. N. Pettitt, Estimation of parameters for macroparasite population evolution using
    approximate Bayesian computation. Biometrics 67(1):225-233, 2011.

    Parameters
    ----------
    model : list
        A list of the Probabilistic models corresponding to the observed datasets
    distances : list of abcpy.distances.Distance
        Distance objects defining the distance measure used to compare simulated and observed data sets; one for
        each model.
    kernel : abcpy.distributions.Distribution
        Distribution object defining the perturbation kernel needed for the sampling.
    backend : abcpy.backends.Backend
        Backend object defining the backend to be used.
    seed : integer, optional
        Optional initial seed for the random number generator. The default value is generated randomly.
    """

    model = None
    distance = None
    kernel = None
    R = None
    rng = None

    n_samples = None
    n_samples_per_param = None
    alpha = None

    accepted_dist_bds = None

    backend = None
[docs] def __init__(self, root_models, distances, backend, kernel=None, seed=None): self.model = root_models # We define the joint Linear combination distance using all the distances for each individual models self.distance = LinearCombination(root_models, distances) if kernel is None: mapping, garbage_index = self._get_mapping() models = [] for mdl, mdl_index in mapping: models.append(mdl) kernel = DefaultKernel(models) self.kernel = kernel self.backend = backend self.logger = logging.getLogger(__name__) self.R = None self.rng = np.random.RandomState(seed) # these are usually big tables, so we broadcast them to have them once # per executor instead of once per task self.accepted_parameters_manager = AcceptedParametersManager(self.model) self.accepted_dist_bds = None self.simulation_counter = 0
[docs]    def sample(self, observations, steps, n_samples=10000, n_samples_per_param=1, alpha=0.1, epsilon_init=100,
               epsilon_final=0.1, const=0.01, covFactor=2.0, full_output=0, journal_file=None):
        """
        Samples from the posterior distribution of the model parameter given the observed
        data observations.

        Parameters
        ----------
        observations : list
            A list, containing lists describing the observed data sets
        steps : integer
            Number of iterations in the sequential algorithm ("generations")
        n_samples : integer, optional
            Number of samples to generate. The default value is 10000.
        n_samples_per_param : integer, optional
            Number of data points in each simulated data set. The default value is 1.
        alpha : float, optional
            A parameter taking values between [0,1], the default value is 0.1.
        epsilon_init : float, optional
            Initial value of threshold, the default is 100
        epsilon_final : float, optional
            Terminal value of threshold, the default is 0.1
        const : float, optional
            A constant to compute acceptance probability, the default is 0.01.
        covFactor : float, optional
            scaling parameter of the covariance matrix. The default value is 2.
        full_output: integer, optional
            If full_output==1, intermediate results are included in output journal.
            The default value is 0, meaning the intermediate results are not saved.
        journal_file: str, optional
            Filename of a journal file to read an already saved journal file, from which the first iteration will
            start. The default value is None.

        Returns
        -------
        abcpy.output.Journal
            A journal containing simulation results, metadata and optionally intermediate results.
        """
        self.sample_from_prior(rng=self.rng)
        self.accepted_parameters_manager.broadcast(self.backend, observations)
        self.alpha = alpha
        self.n_samples = n_samples
        self.n_samples_per_param = n_samples_per_param

        if journal_file is None:
            journal = Journal(full_output)
            journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
            journal.configuration["type_dist_func"] = type(self.distance).__name__
            journal.configuration["n_samples"] = self.n_samples
            journal.configuration["n_samples_per_param"] = self.n_samples_per_param
            journal.configuration["steps"] = steps
        else:
            journal = Journal.fromFile(journal_file)

        accepted_parameters = None
        accepted_cov_mats = None
        accepted_dist = None
        accepted_weights = None

        # main RSMCABC algorithm
        for aStep in range(steps):
            self.logger.info("RSMCABC iteration {}".format(aStep))
            if aStep == 0 and journal_file is not None:
                accepted_parameters = journal.get_accepted_parameters(-1)
                accepted_weights = journal.get_weights(-1)

                self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
                                                                  accepted_parameters=accepted_parameters)

                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)
                accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
                accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)
                self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)

            # 0: Compute epsilon, compute new covariance matrix for Kernel,
            # and finally draw new/perturbed samples using prior or MCMC Kernel
            # print("DEBUG: Iteration " + str(aStep) + " of RSMCABC algorithm.")
            self.logger.info("Compute epsilon and calculate covariance matrix.")
            if aStep == 0:
                n_replenish = n_samples
                # Compute epsilon
                epsilon = [epsilon_init]
                R = 1
                if journal_file is None:
                    accepted_cov_mats = None
            else:
                # Compute epsilon
                epsilon.append(accepted_dist[-1])

                # Calculate covariance
                # print("INFO: Calculating covariance matrix.")
                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)
                accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
                accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)

            if epsilon[-1] < epsilon_final:
                self.logger.info("accepted epsilon {:e} < {:e}".format(epsilon[-1], epsilon_final))
                break

            seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_replenish, dtype=np.uint32)
            rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
            rng_pds = self.backend.parallelize(rng_arr)

            # update remotely required variables
            self.logger.info("Broadcasting parameters.")
            self.epsilon = epsilon
            self.R = R
            self.logger.info("Broadcast updated variable.")
            # Broadcast updated variable
            self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
            self._update_broadcasts(accepted_dist)

            # calculate resample parameters
            self.logger.info("Resampling parameters")
            params_and_dist_index_pds = self.backend.map(self._accept_parameter, rng_pds)
            params_and_dist_index = self.backend.collect(params_and_dist_index_pds)
            new_parameters, new_dist, new_index, counter = [list(t) for t in zip(*params_and_dist_index)]
            new_dist = np.array(new_dist)
            new_index = np.array(new_index)

            for count in counter:
                self.simulation_counter += count

            # 1: Update all parameters, compute acceptance probability, compute epsilon
            self.logger.info("Append updated new parameters.")
            if len(new_dist) == self.n_samples:
                accepted_parameters = new_parameters
                accepted_dist = new_dist
                accepted_weights = np.ones(shape=(len(accepted_parameters), 1)) * (1 / len(accepted_parameters))
            else:
                accepted_parameters += new_parameters
                accepted_dist = np.concatenate((accepted_dist, new_dist))
                accepted_weights = np.ones(shape=(len(accepted_parameters), 1)) * (1 / len(accepted_parameters))

            if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
                self.logger.info("Saving configuration to output journal.")
                journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
                journal.add_distances(copy.deepcopy(accepted_dist))
                journal.add_weights(accepted_weights)
                journal.add_ESS_estimate(accepted_weights)
                self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
                                                                  accepted_parameters=accepted_parameters)
                names_and_parameters = self._get_names_and_parameters()
                journal.add_user_parameters(names_and_parameters)
                journal.number_of_simulations.append(self.simulation_counter)

            # 2: Compute acceptance probability and set R
            self.logger.info("Compute acceptance probability and set R")
            prob_acceptance = sum(new_index) / (R * n_replenish)
            if prob_acceptance == 1 or prob_acceptance == 0:
                R = 1
            else:
                R = int(np.log(const) / np.log(1 - prob_acceptance))

            self.logger.info("Order accepted parameters and distances")
            n_replenish = round(n_samples * alpha)
accepted_params_and_dist = zip(accepted_dist, accepted_parameters) accepted_params_and_dist = sorted(accepted_params_and_dist, key=lambda x: x[0]) accepted_dist, accepted_parameters = [list(t) for t in zip(*accepted_params_and_dist)] self.logger.info("Throw away N_alpha particles with largest dist") # Throw away N_alpha particles with largest distance del accepted_parameters[self.n_samples - round(n_samples * alpha):] accepted_dist = np.delete(accepted_dist, np.arange(round(n_samples * alpha)) + (n_samples - round(n_samples * alpha)), 0) accepted_weights = np.ones(shape=(len(accepted_parameters), 1)) * (1 / len(accepted_parameters)) self.logger.info("Update parameters, weights") self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights, accepted_parameters=accepted_parameters) # Add epsilon_arr to the journal journal.configuration["epsilon_arr"] = epsilon return journal
def _update_broadcasts(self, accepted_dist): def destroy(bc): if bc != None: bc.unpersist # bc.destroy if not accepted_dist is None: self.accepted_dist_bds = self.backend.broadcast(accepted_dist) # define helper functions for map step def _accept_parameter(self, rng, npc=None): """ Samples a single model parameter and simulate from it until distance between simulated outcome and the observation is smaller than epsilon. Parameters ---------- seed: integer Initial seed for the random number generator. Returns ------- numpy.ndarray accepted parameter """ rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32)) distance = self.distance.dist_max() mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping( self.accepted_parameters_manager.model) counter = 0 if self.accepted_parameters_manager.accepted_parameters_bds is None: while distance > self.epsilon[-1]: self.sample_from_prior(rng=rng) y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter += 1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) index_accept = 1 else: index = rng.choice(len(self.accepted_parameters_manager.accepted_parameters_bds.value()), size=1) theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index[0]] index_accept = 0.0 for ind in range(self.R): while True: perturbation_output = self.perturb(index[0], rng=rng) if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0: break y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter += 1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model, theta) kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, perturbation_output[1], theta) kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, theta, perturbation_output[1]) ratio_kernel_prob = kernel_numerator / kernel_denominator probability_acceptance = min(1, ratio_prior_prob * ratio_kernel_prob) if distance < self.epsilon[-1] and rng.binomial(1, probability_acceptance) == 1: index_accept += 1 else: self.set_parameters(theta) distance = self.accepted_dist_bds.value()[index[0]] return self.get_parameters(self.model), distance, index_accept, counter def _compute_accepted_cov_mats(self, covFactor, new_cov_mats): """ Update the covariance matrices computed from data by multiplying them with covFactor and adding a small term in the diagonal for numerical stability. Parameters ---------- covFactor : float factor to correct the covariance matrices new_cov_mats : list list of covariance matrices computed from data Returns ------- list List of new accepted covariance matrices """ # accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats] accepted_cov_mats = [] for new_cov_mat in new_cov_mats: if not (new_cov_mat.size == 1): accepted_cov_mats.append( covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0])) else: accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1)) return accepted_cov_mats
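# ---------------------------------------------------------------------------
# Editor's note: a standalone sketch (not abcpy API) of the replenishment
# bookkeeping in RSMCABC.sample above. Each generation regenerates
# alpha * n_samples particles, and the number of MCMC moves R is chosen so
# that a particle survives all R moves unmoved with probability roughly
# `const`.
#
#     import numpy as np
#
#     def next_R(prob_acceptance, const=0.01):
#         # degenerate acceptance rates fall back to a single move
#         if prob_acceptance in (0.0, 1.0):
#             return 1
#         return int(np.log(const) / np.log(1 - prob_acceptance))
#
#     next_R(0.2)          # -> 20, since 0.8 ** 20 ~ 0.012
#     round(10000 * 0.1)   # -> 1000 replenished particles for the defaults
# ---------------------------------------------------------------------------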
[docs]class APMCABC(BaseDiscrepancy, InferenceMethod):
    """This class implements the Adaptive Population Monte Carlo Approximate Bayesian computation of
    M. Lenormand et al. [1].

    [1] M. Lenormand, F. Jabot and G. Deffuant, Adaptive approximate Bayesian computation for complex models.
    Computational Statistics, 28:2777-2796, 2013.

    Parameters
    ----------
    model : list
        A list of the Probabilistic models corresponding to the observed datasets
    distances : list of abcpy.distances.Distance
        Distance objects defining the distance measure used to compare simulated and observed data sets; one for
        each model.
    kernel : abcpy.distributions.Distribution
        Distribution object defining the perturbation kernel needed for the sampling.
    backend : abcpy.backends.Backend
        Backend object defining the backend to be used.
    seed : integer, optional
        Optional initial seed for the random number generator. The default value is generated randomly.
    """

    model = None
    distance = None
    kernel = None
    epsilon = None
    rng = None

    n_samples = None
    n_samples_per_param = None
    alpha = None

    accepted_dist = None

    backend = None
[docs] def __init__(self, root_models, distances, backend, kernel=None, seed=None): self.model = root_models # We define the joint Linear combination distance using all the distances for each individual models self.distance = LinearCombination(root_models, distances) if kernel is None: mapping, garbage_index = self._get_mapping() models = [] for mdl, mdl_index in mapping: models.append(mdl) kernel = DefaultKernel(models) self.kernel = kernel self.backend = backend self.logger = logging.getLogger(__name__) self.epsilon = None self.rng = np.random.RandomState(seed) # these are usually big tables, so we broadcast them to have them once # per executor instead of once per task self.accepted_parameters_manager = AcceptedParametersManager(self.model) self.accepted_dist_bds = None self.simulation_counter = 0
[docs] def sample(self, observations, steps, n_samples=10000, n_samples_per_param=1, alpha=0.1, acceptance_cutoff=0.03, covFactor=2.0, full_output=0, journal_file=None): """Samples from the posterior distribution of the model parameter given the observed data observations. Parameters ---------- observations : list A list, containing lists describing the observed data sets steps : integer Number of iterations in the sequential algoritm ("generations") n_samples : integer, optional Number of samples to generate. The default value is 10000. n_samples_per_param : integer, optional Number of data points in each simulated data set. The default value is 1. alpha : float, optional A parameter taking values between [0,1], the default value is 0.1. acceptance_cutoff : float, optional Acceptance ratio cutoff, should be chosen between 0.01 and 0.03 covFactor : float, optional scaling parameter of the covariance matrix. The default value is 2. full_output: integer, optional If full_output==1, intermediate results are included in output journal. The default value is 0, meaning the intermediate results are not saved. journal_file: str, optional Filename of a journal file to read an already saved journal file, from which the first iteration will start. The default value is None. Returns ------- abcpy.output.Journal A journal containing simulation results, metadata and optionally intermediate results. """ self.sample_from_prior(rng=self.rng) self.accepted_parameters_manager.broadcast(self.backend, observations) self.alpha = alpha self.n_samples = n_samples self.n_samples_per_param = n_samples_per_param if journal_file is None: journal = Journal(full_output) journal.configuration["type_model"] = [type(model).__name__ for model in self.model] journal.configuration["type_dist_func"] = type(self.distance).__name__ journal.configuration["n_samples"] = self.n_samples journal.configuration["n_samples_per_param"] = self.n_samples_per_param journal.configuration["steps"] = steps else: journal = Journal.fromFile(journal_file) accepted_parameters = None accepted_weights = None accepted_cov_mats = None accepted_dist = None alpha_accepted_parameters = None alpha_accepted_weights = None alpha_accepted_dist = None # main APMCABC algorithm # print("INFO: Starting APMCABC iterations.") for aStep in range(steps): self.logger.info("APMCABC iteration {}".format(aStep)) if aStep == 0 and journal_file is not None: accepted_parameters = journal.get_accepted_parameters(-1) accepted_weights = journal.get_weights(-1) self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters, accepted_weights=accepted_weights) kernel_parameters = [] for kernel in self.kernel.kernels: kernel_parameters.append( self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models)) self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters) accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager) accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats) # accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats] self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters, accepted_weights=accepted_weights) alpha_accepted_parameters = accepted_parameters alpha_accepted_weights = accepted_weights # 0: Drawing new new/perturbed samples using prior or MCMC Kernel if aStep > 0: n_additional_samples = n_samples - round(n_samples * alpha) else: 
n_additional_samples = n_samples seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_additional_samples, dtype=np.uint32) rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr]) rng_pds = self.backend.parallelize(rng_arr) # update remotely required variables self.logger.info("Broadcasting parameters") self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=alpha_accepted_parameters, accepted_weights=alpha_accepted_weights, accepted_cov_mats=accepted_cov_mats) self._update_broadcasts(alpha_accepted_dist) # calculate resample parameters self.logger.info("Resampling parameters") params_and_dist_weights_pds = self.backend.map(self._accept_parameter, rng_pds) params_and_dist_weights = self.backend.collect(params_and_dist_weights_pds) new_parameters, new_dist, new_weights, counter = [list(t) for t in zip(*params_and_dist_weights)] new_parameters = np.array(new_parameters) new_dist = np.array(new_dist) new_weights = np.array(new_weights).reshape(n_additional_samples, 1) for count in counter: self.simulation_counter += count # 1: Update all parameters, compute acceptance probability, compute epsilon if len(new_weights) == n_samples: accepted_parameters = new_parameters accepted_dist = new_dist accepted_weights = new_weights # Compute acceptance probability prob_acceptance = 1 # Compute epsilon epsilon = [np.percentile(accepted_dist, alpha * 100)] else: accepted_parameters = np.concatenate((alpha_accepted_parameters, new_parameters)) accepted_dist = np.concatenate((alpha_accepted_dist, new_dist)) accepted_weights = np.concatenate((alpha_accepted_weights, new_weights)) # Compute acceptance probability prob_acceptance = sum(new_dist < epsilon[-1]) / len(new_dist) # Compute epsilon epsilon.append(np.percentile(accepted_dist, alpha * 100)) # 2: Update alpha_parameters, alpha_dist and alpha_weights index_alpha = accepted_dist < epsilon[-1] alpha_accepted_parameters = accepted_parameters[index_alpha, :] alpha_accepted_weights = accepted_weights[index_alpha] / sum(accepted_weights[index_alpha]) alpha_accepted_dist = accepted_dist[index_alpha] # 3: calculate covariance self.logger.info("Calculating covariance matrix") self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=alpha_accepted_parameters, accepted_weights=alpha_accepted_weights) kernel_parameters = [] for kernel in self.kernel.kernels: kernel_parameters.append( self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models)) self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters) accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager) accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats) # accepted_cov_mats = [covFactor*cov_mat for cov_mat in accepted_cov_mats] # print("INFO: Saving configuration to output journal.") if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1): journal.add_accepted_parameters(copy.deepcopy(accepted_parameters)) journal.add_distances(copy.deepcopy(accepted_dist)) journal.add_weights(copy.deepcopy(accepted_weights)) journal.add_ESS_estimate(accepted_weights) self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters, accepted_weights=accepted_weights) names_and_parameters = self._get_names_and_parameters() journal.add_user_parameters(names_and_parameters) journal.number_of_simulations.append(self.simulation_counter) # 4: Check probability of 
acceptance lower than acceptance_cutoff if prob_acceptance < acceptance_cutoff: break # Add epsilon_arr to the journal journal.configuration["epsilon_arr"] = epsilon return journal
def _update_broadcasts(self, accepted_dist): def destroy(bc): if bc != None: bc.unpersist # bc.destroy self.accepted_dist_bds = self.backend.broadcast(accepted_dist) # define helper functions for map step def _accept_parameter(self, rng, npc=None): """ Samples a single model parameter and simulate from it until distance between simulated outcome and the observation is smaller than epsilon. Parameters ---------- seed: integer Initial seed for the random number generator. Returns ------- numpy.ndarray accepted parameter """ rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32)) mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping( self.accepted_parameters_manager.model) counter = 0 if self.accepted_parameters_manager.accepted_parameters_bds is None: self.sample_from_prior(rng=rng) y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter += 1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) weight = 1.0 else: index = rng.choice(len(self.accepted_parameters_manager.accepted_weights_bds.value()), size=1, p=self.accepted_parameters_manager.accepted_weights_bds.value().reshape(-1)) # truncate the normal to the bounds of parameter space of the model # truncating the normal like this is fine: https://arxiv.org/pdf/0907.4010v1.pdf while True: perturbation_output = self.perturb(index[0], rng=rng) if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0: break y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter += 1 distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) denominator = 0.0 for i in range(len(self.accepted_parameters_manager.accepted_weights_bds.value())): pdf_value = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, self.accepted_parameters_manager.accepted_parameters_bds.value()[i], perturbation_output[1]) denominator += self.accepted_parameters_manager.accepted_weights_bds.value()[i, 0] * pdf_value weight = 1.0 * prior_prob / denominator return self.get_parameters(self.model), distance, weight, counter def _compute_accepted_cov_mats(self, covFactor, new_cov_mats): """ Update the covariance matrices computed from data by multiplying them with covFactor and adding a small term in the diagonal for numerical stability. Parameters ---------- covFactor : float factor to correct the covariance matrices new_cov_mats : list list of covariance matrices computed from data Returns ------- list List of new accepted covariance matrices """ # accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats] accepted_cov_mats = [] for new_cov_mat in new_cov_mats: if not (new_cov_mat.size == 1): accepted_cov_mats.append( covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0])) else: accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1)) return accepted_cov_mats
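# ---------------------------------------------------------------------------
# Editor's note: a hedged usage sketch for APMCABC, mirroring the ABCsubsim
# example earlier; `height`, `distance` and `height_obs` are the same
# placeholders defined there and must exist in your script.
#
#     sampler = APMCABC([height], [distance], BackendDummy(), seed=1)
#     journal = sampler.sample([height_obs], steps=4, n_samples=1000,
#                              alpha=0.1, acceptance_cutoff=0.03)
#     last_population = journal.get_accepted_parameters(-1)
# ---------------------------------------------------------------------------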
[docs]class SMCABC(BaseDiscrepancy, InferenceMethod):
    """This class implements the Sequential Monte Carlo Approximate Bayesian computation of
    Del Moral et al. [1].

    [1] P. Del Moral, A. Doucet, A. Jasra, An adaptive sequential Monte Carlo method for approximate Bayesian
    computation. Statistics and Computing, 22(5):1009-1020, 2012.

    [2] Anthony Lee, "On the choice of MCMC kernels for approximate Bayesian computation with SMC samplers."
    Proceedings of the 2012 Winter Simulation Conference (WSC). IEEE, 2012.

    Parameters
    ----------
    model : list
        A list of the Probabilistic models corresponding to the observed datasets
    distances : list of abcpy.distances.Distance
        Distance objects defining the distance measure used to compare simulated and observed data sets; one for
        each model.
    kernel : abcpy.distributions.Distribution
        Distribution object defining the perturbation kernel needed for the sampling.
    backend : abcpy.backends.Backend
        Backend object defining the backend to be used.
    seed : integer, optional
        Optional initial seed for the random number generator. The default value is generated randomly.
    """

    model = None
    distance = None
    kernel = None
    epsilon = None
    rng = None

    n_samples = None
    n_samples_per_param = None

    accepted_y_sim_bds = None

    backend = None

[docs]    def __init__(self, root_models, distances, backend, kernel=None, seed=None):
        self.model = root_models
        # We define the joint Linear combination distance using all the distances for each individual models
        self.distance = LinearCombination(root_models, distances)

        if kernel is None:
            mapping, garbage_index = self._get_mapping()
            models = []
            for mdl, mdl_index in mapping:
                models.append(mdl)
            kernel = DefaultKernel(models)

        self.kernel = kernel
        self.backend = backend

        self.logger = logging.getLogger(__name__)
        self.epsilon = None
        self.rng = np.random.RandomState(seed)

        # these are usually big tables, so we broadcast them to have them once
        # per executor instead of once per task
        self.accepted_parameters_manager = AcceptedParametersManager(self.model)
        self.accepted_y_sim_bds = None

        self.simulation_counter = 0
[docs]    def sample(self, observations, steps, n_samples=10000, n_samples_per_param=1, epsilon_final=0.1, alpha=0.95,
               covFactor=2, resample=None, full_output=0, which_mcmc_kernel=0, journal_file=None):
        """Samples from the posterior distribution of the model parameter given the observed
        data observations.

        Parameters
        ----------
        observations : list
            A list, containing lists describing the observed data sets
        steps : integer
            Number of iterations in the sequential algorithm ("generations")
        n_samples : integer, optional
            Number of samples to generate. The default value is 10000.
        n_samples_per_param : integer, optional
            Number of data points in each simulated data set. The default value is 1.
        epsilon_final : float, optional
            The final threshold value of epsilon to be reached; if at some iteration you reach a lower epsilon than
            epsilon_final, the algorithm will stop and not proceed with further iterations. The default value is 0.1.
        alpha : float, optional
            A parameter taking values between [0,1], determining the rate of change of the threshold epsilon. The
            default value is 0.95.
        covFactor : float, optional
            scaling parameter of the covariance matrix. The default value is 2.
        resample : float, optional
            It defines the resample step: introduce a resample step, after the particles have been perturbed and
            the new weights have been computed, if the effective sample size is smaller than resample. If not
            provided, resample is set to 0.5 * n_samples.
        full_output: integer, optional
            If full_output==1, intermediate results are included in output journal.
            The default value is 0, meaning the intermediate results are not saved.
        which_mcmc_kernel: integer, optional
            Specifies which MCMC kernel to be used: '0' is the kernel suggested in [1], any other value will use the
            r-hit kernel suggested by Anthony Lee [2]. The default value is 0.
        journal_file: str, optional
            Filename of a journal file to read an already saved journal file, from which the first iteration will
            start. The default value is None.

        Returns
        -------
        abcpy.output.Journal
            A journal containing simulation results, metadata and optionally intermediate results.
        """
        self.sample_from_prior(rng=self.rng)
        self.accepted_parameters_manager.broadcast(self.backend, observations)
        self.n_samples = n_samples
        self.n_samples_per_param = n_samples_per_param

        if journal_file is None:
            journal = Journal(full_output)
            journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
            journal.configuration["type_dist_func"] = type(self.distance).__name__
            journal.configuration["n_samples"] = self.n_samples
            journal.configuration["n_samples_per_param"] = self.n_samples_per_param
            journal.configuration["steps"] = steps  # maybe add which kernel I am using?
else: journal = Journal.fromFile(journal_file) accepted_parameters = None accepted_weights = None accepted_cov_mats = None accepted_y_sim = None # Define the resample parameter if resample is None: resample = n_samples * 0.5 # Define epsilon_init epsilon = [10000] # main SMC ABC algorithm for aStep in range(0, steps): self.logger.info("SMCABC iteration {}".format(aStep)) if aStep == 0 and journal_file is not None: accepted_parameters = journal.get_accepted_parameters(-1) accepted_weights = journal.get_weights(-1) accepted_y_sim = journal.opt_values[-1] self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters, accepted_weights=accepted_weights) kernel_parameters = [] for kernel in self.kernel.kernels: kernel_parameters.append( self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models)) self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters) accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager) accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats) # accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats] self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats) # Break if epsilon in previous step is less than epsilon_final if epsilon[-1] <= epsilon_final: break # 0: Compute the Epsilon if accepted_y_sim != None: self.logger.info("Compute epsilon, might take a while") # Compute epsilon for next step fun = lambda epsilon_var: self._compute_epsilon(epsilon_var, \ epsilon, observations, accepted_y_sim, accepted_weights, n_samples, n_samples_per_param, alpha) epsilon_new = self._bisection(fun, epsilon_final, epsilon[-1], 0.001) if epsilon_new < epsilon_final: epsilon_new = epsilon_final epsilon.append(epsilon_new) # 1: calculate weights for new parameters self.logger.info("Calculating weights") if accepted_y_sim != None: new_weights = np.zeros(shape=n_samples, ) for ind1 in range(n_samples): numerator = 0.0 denominator = 0.0 for ind2 in range(n_samples_per_param): numerator += (self.distance.distance(observations, [[accepted_y_sim[ind1][0][ind2]]]) < epsilon[ -1]) denominator += ( self.distance.distance(observations, [[accepted_y_sim[ind1][0][ind2]]]) < epsilon[-2]) if denominator != 0.0: new_weights[ind1] = accepted_weights[ind1] * (numerator / denominator) else: new_weights[ind1] = 0 new_weights = new_weights / sum(new_weights) else: new_weights = np.ones(shape=n_samples, ) * (1.0 / n_samples) # 2: Resample if accepted_y_sim is not None and pow(sum(pow(new_weights, 2)), -1) < resample: self.logger.info("Resampling") # Weighted resampling: index_resampled = self.rng.choice(n_samples, n_samples, replace=True, p=new_weights) # accepted_parameters is a list. Then the indexing here does not work: # accepted_parameters = accepted_parameters[index_resampled] # do instead: accepted_parameters = [accepted_parameters[i] for i in index_resampled] # why don't we use arrays however? 
                new_weights = np.ones(shape=n_samples, ) * (1.0 / n_samples)

            # Update the weights
            accepted_weights = new_weights.reshape(len(new_weights), 1)
            self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
                                                              accepted_weights=accepted_weights)

            if accepted_y_sim is not None:
                kernel_parameters = []
                for kernel in self.kernel.kernels:
                    kernel_parameters.append(
                        self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
                self.accepted_parameters_manager.update_kernel_values(self.backend,
                                                                      kernel_parameters=kernel_parameters)
                accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
                accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)

            # 3: Drawing new perturbed samples using MCMC Kernel
            self.logger.debug("Drawing new perturbed samples using MCMC kernel.")
            seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_samples, dtype=np.uint32)
            rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
            index_arr = np.arange(n_samples)
            rng_and_index_arr = np.column_stack((rng_arr, index_arr))
            rng_and_index_pds = self.backend.parallelize(rng_and_index_arr)

            # print("INFO: Broadcasting parameters.")
            self.epsilon = epsilon
            self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
                                                              accepted_weights=accepted_weights,
                                                              accepted_cov_mats=accepted_cov_mats)
            self._update_broadcasts(accepted_y_sim)

            # calculate resample parameters
            self.logger.info("Drawing perturbed samples")
            if which_mcmc_kernel == 0:
                params_and_ysim_pds = self.backend.map(self._accept_parameter, rng_and_index_pds)
            else:
                params_and_ysim_pds = self.backend.map(self._accept_parameter_r_hit_kernel, rng_and_index_pds)
            params_and_ysim = self.backend.collect(params_and_ysim_pds)
            new_parameters, new_y_sim, distances, counter = [list(t) for t in zip(*params_and_ysim)]
            distances = np.array(distances)

            for count in counter:
                self.simulation_counter += count

            # Update the parameters
            accepted_parameters = new_parameters
            accepted_y_sim = new_y_sim

            if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
                self.logger.info("Saving configuration to output journal")
                self.accepted_parameters_manager.update_broadcast(self.backend,
                                                                  accepted_parameters=accepted_parameters)
                journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
                journal.add_distances(copy.deepcopy(distances))
                journal.add_weights(copy.deepcopy(accepted_weights))
                journal.add_ESS_estimate(accepted_weights)
                journal.add_opt_values(copy.deepcopy(accepted_y_sim))
                names_and_parameters = self._get_names_and_parameters()
                journal.add_user_parameters(names_and_parameters)
                journal.number_of_simulations.append(self.simulation_counter)

        # Add epsilon_arr to the journal
        journal.configuration["epsilon_arr"] = epsilon

        return journal
def _compute_epsilon(self, epsilon_new, epsilon, observations, accepted_y_sim, accepted_weights, n_samples, n_samples_per_param, alpha): """ Parameters ---------- epsilon_new: float New value for epsilon. epsilon: float Current threshold. observations: numpy.ndarray Observed data. accepted_y_sim: numpy.ndarray Accepted simulated data. accepted_weights: numpy.ndarray Accepted weights. n_samples: integer Number of samples to generate. n_samples_per_param: integer Number of data points in each simulated data set. alpha: float Returns ------- float Newly computed value for threshold. """ RHS = alpha * pow(sum(pow(accepted_weights, 2)), -1) LHS = np.zeros(shape=n_samples, ) for ind1 in range(n_samples): numerator = 0.0 denominator = 0.0 for ind2 in range(n_samples_per_param): numerator += (self.distance.distance(observations, [[accepted_y_sim[ind1][0][ind2]]]) < epsilon_new) denominator += (self.distance.distance(observations, [[accepted_y_sim[ind1][0][ind2]]]) < epsilon[-1]) if denominator == 0: LHS[ind1] = 0 else: LHS[ind1] = accepted_weights[ind1] * (numerator / denominator) if sum(LHS) == 0: result = RHS else: LHS = LHS / sum(LHS) LHS = pow(sum(pow(LHS, 2)), -1) result = RHS - LHS return result def _bisection(self, func, low, high, tol): # cache computed values, as we call func below # several times for the same argument: func = cached(func) midpoint = (low + high) / 2.0 while (high - low) / 2.0 > tol: self.logger.debug("bisection: distance = {:e} > tol = {:e}" .format((high - low) / 2, tol)) if func(midpoint) == 0: return midpoint elif func(low) * func(midpoint) < 0: high = midpoint else: low = midpoint midpoint = (low + high) / 2.0 return midpoint def _update_broadcasts(self, accepted_y_sim): def destroy(bc): if bc != None: bc.unpersist # bc.destroy if not accepted_y_sim is None: self.accepted_y_sim_bds = self.backend.broadcast(accepted_y_sim) # define helper functions for map step def _accept_parameter(self, rng_and_index, npc=None): """ Samples a single model parameter and simulate from it until distance between simulated outcome and the observation is smaller than epsilon. Parameters ---------- seed_and_index: numpy.ndarray 2 dimensional array. The first entry specifies the initial seed for the random number generator. The second entry defines the index in the data set. Returns ------- Tuple The first entry of the tuple is the accepted parameters. The second entry is the simulated data set. 
""" rng = rng_and_index[0] index = rng_and_index[1] rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32)) mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping( self.accepted_parameters_manager.model) counter = 0 # print("on seed " + str(seed) + " distance: " + str(distance) + " epsilon: " + str(self.epsilon)) if self.accepted_parameters_manager.accepted_parameters_bds is None: self.sample_from_prior(rng=rng) y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter += 1 else: if self.accepted_parameters_manager.accepted_weights_bds.value()[index] > 0: theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index] while True: perturbation_output = self.perturb(index, rng=rng) if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0: break y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter += 1 y_sim_old = self.accepted_y_sim_bds.value()[index] # Calculate acceptance probability: numerator = 0.0 denominator = 0.0 for ind in range(self.n_samples_per_param): numerator += (self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), [[y_sim[0][ind]]]) < self.epsilon[-1]) denominator += (self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), [[y_sim_old[0][ind]]]) < self.epsilon[-1]) if denominator == 0: ratio_data_epsilon = 1 else: ratio_data_epsilon = numerator / denominator ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model, theta) kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, perturbation_output[1], theta) kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, theta, perturbation_output[1]) ratio_likelihood_prob = kernel_numerator / kernel_denominator acceptance_prob = min(1, ratio_data_epsilon * ratio_prior_prob * ratio_likelihood_prob) if rng.binomial(1, acceptance_prob) == 1: self.set_parameters(perturbation_output[1]) else: self.set_parameters(theta) y_sim = self.accepted_y_sim_bds.value()[index] else: self.set_parameters(self.accepted_parameters_manager.accepted_parameters_bds.value()[index]) y_sim = self.accepted_y_sim_bds.value()[index] distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) return self.get_parameters(), y_sim, distance, counter def _accept_parameter_r_hit_kernel(self, rng_and_index, npc=None): """ This implements algorithm 5 in Lee (2012) [2] which is used as an MCMC kernel in SMCABC. This implementation uses r=3. Parameters ---------- rng_and_index: numpy.ndarray 2 dimensional array. The first entry is a random number generator. The second entry defines the index in the data set. Returns ------- Tuple The first entry of the tuple is the accepted parameters. The second entry is the simulated data set. The third one is the distance between the simulated data set and the observation, while the fourth one is the number of simulations needed to obtain the accepted parameter. 
""" rng = rng_and_index[0] index = rng_and_index[1] rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32)) # Set value of r for r-hit kernel r = 3 mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping( self.accepted_parameters_manager.model) counter = 0 # print("on seed " + str(seed) + " distance: " + str(distance) + " epsilon: " + str(self.epsilon)) if self.accepted_parameters_manager.accepted_parameters_bds is None: self.sample_from_prior(rng=rng) y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) counter += 1 else: if self.accepted_parameters_manager.accepted_weights_bds.value()[index] > 0: theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index] # Sample from theta until we get 'r-1' y_sim inside the epsilon ball self.set_parameters(theta) accept_old_arr, y_sim_old_arr, N_old = [], [], 0 while sum(accept_old_arr) < r - 1: y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) y_sim_old_arr.append(y_sim) if self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) < self.epsilon[-1]: accept_old_arr.append(N_old) N_old += 1 counter += 1 # Perturb and sample from the perturbed theta until we get 'r' y_sim inside the epsilon ball while True: perturbation_output = self.perturb(index, rng=rng) if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0: break accept_new_arr, y_sim_new_arr, N = [], [], 0 while sum(accept_new_arr) < r: y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc) y_sim_new_arr.append(y_sim) if self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) < self.epsilon[-1]: accept_new_arr.append(N) counter += 1 N += 1 # Calculate acceptance probability ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model, theta) kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, perturbation_output[1], theta) kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, theta, perturbation_output[1]) ratio_likelihood_prob = kernel_numerator / kernel_denominator acceptance_prob = min(1, (N_old / (N - 1)) * ratio_prior_prob * ratio_likelihood_prob) if rng.binomial(1, acceptance_prob) == 1: self.set_parameters(perturbation_output[1]) # Randomly sample index J J = rng.choice(accept_new_arr).astype(int) y_sim = y_sim_new_arr[J] else: self.set_parameters(theta) y_sim = self.accepted_y_sim_bds.value()[index] else: self.set_parameters(self.accepted_parameters_manager.accepted_parameters_bds.value()[index]) y_sim = self.accepted_y_sim_bds.value()[index] distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim) return self.get_parameters(), y_sim, distance, counter def _compute_accepted_cov_mats(self, covFactor, new_cov_mats): """ Update the covariance matrices computed from data by multiplying them with covFactor and adding a small term in the diagonal for numerical stability. 
    def _compute_accepted_cov_mats(self, covFactor, new_cov_mats):
        """
        Update the covariance matrices computed from data by multiplying them with covFactor and
        adding a small term on the diagonal for numerical stability.

        Parameters
        ----------
        covFactor : float
            Factor by which to scale the covariance matrices.
        new_cov_mats : list
            List of covariance matrices computed from data.

        Returns
        -------
        list
            List of new accepted covariance matrices.
        """
        accepted_cov_mats = []
        for new_cov_mat in new_cov_mats:
            if new_cov_mat.size > 1:
                accepted_cov_mats.append(
                    covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
            else:
                accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
        return accepted_cov_mats
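# ------------------------------------------------------------------
# Illustrative sketch (not part of abcpy): the regularization applied
# by _compute_accepted_cov_mats on a concrete 2x2 covariance matrix.
# ------------------------------------------------------------------
import numpy as np

cov = np.array([[2.0, 0.3],
                [0.3, 1.0]])
covFactor = 0.5

# Scale the matrix, then add 0.01% of its trace to the diagonal so the
# result stays strictly positive definite even if cov is singular.
regularized = covFactor * cov + 0.0001 * np.trace(cov) * np.eye(cov.shape[0])
print(regularized)
# [[1.0003 0.15  ]
#  [0.15   0.5003]]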