import copy
import logging
import time
from abc import ABCMeta, abstractmethod, abstractproperty
import numpy as np
from scipy import optimize
from abcpy.acceptedparametersmanager import *
from abcpy.graphtools import GraphTools
from abcpy.jointapprox_lhd import SumCombination
from abcpy.jointdistances import LinearCombination
from abcpy.output import Journal
from abcpy.perturbationkernel import DefaultKernel
from abcpy.probabilisticmodels import *
from abcpy.utils import cached
class InferenceMethod(GraphTools, metaclass=ABCMeta):
"""
This abstract base class represents an inference method.
"""
def __getstate__(self):
"""Cloudpickle is used with the MPIBackend. This function ensures that the backend itself
is not pickled
"""
state = self.__dict__.copy()
del state['backend']
return state
@abstractmethod
def sample(self):
"""To be overwritten by any sub-class:
Samples from the posterior distribution of the model parameter given the observed
data observations.
"""
raise NotImplementedError
@abstractproperty
def model(self):
"""To be overwritten by any sub-class: an attribute specifying the model to be used
"""
raise NotImplementedError
@abstractproperty
def rng(self):
"""To be overwritten by any sub-class: an attribute specifying the random number generator to be used
"""
raise NotImplementedError
@abstractproperty
def backend(self):
"""To be overwritten by any sub-class: an attribute specifying the backend to be used."""
raise NotImplementedError
@abstractproperty
def n_samples(self):
"""To be overwritten by any sub-class: an attribute specifying the number of samples to be generated
"""
raise NotImplementedError
@abstractproperty
def n_samples_per_param(self):
"""To be overwritten by any sub-class: an attribute specifying the number of data points in each simulated data set."""
raise NotImplementedError
class BaseMethodsWithKernel(metaclass=ABCMeta):
"""
This abstract base class represents inference methods that have a kernel.
"""
@abstractproperty
def kernel(self):
"""To be overwritten by any sub-class: an attribute specifying the transition or perturbation kernel."""
raise NotImplementedError
def perturb(self, column_index, epochs=10, rng=np.random.RandomState()):
"""
Perturbs all free parameters, given the current weights.
Commonly used during inference.
Parameters
----------
column_index: integer
The index of the column in the accepted_parameters_bds that should be used for perturbation
epochs: integer
The maximum number of perturbation attempts before the method gives up
rng: random number generator, optional
The random number generator to be used.
Returns
-------
list
A list whose first entry is a boolean stating whether new parameter values could be set for all
probabilistic models; if True, the second entry contains the correctly ordered perturbed parameters.
"""
current_epoch = 0
while current_epoch < epochs:
# Get new parameters of the graph
new_parameters = self.kernel.update(self.accepted_parameters_manager, column_index, rng=rng)
self._reset_flags()
# Order the parameters provided by the kernel in depth-first search order
correctly_ordered_parameters = self.get_correct_ordering(new_parameters)
# Try to set new parameters
accepted, last_index = self.set_parameters(correctly_ordered_parameters, 0)
if accepted:
break
current_epoch += 1
if current_epoch == epochs:
return [False]
return [True, correctly_ordered_parameters]
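# A typical caller pattern for perturb, shown as a hedged sketch (it mirrors the loop used in
# _resample_parameter of PMCABC further below): keep perturbing a resampled particle until the
# kernel proposes values that the models accept and that have non-zero prior density.
#
# while True:
#     perturbation_output = self.perturb(index, rng=rng)
#     if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
#         theta = perturbation_output[1]
#         break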
class BaseLikelihood(InferenceMethod, BaseMethodsWithKernel, metaclass=ABCMeta):
"""
This abstract base class represents inference methods that use the likelihood.
"""
@abstractproperty
def likfun(self):
"""To be overwritten by any sub-class: an attribute specifying the likelihood function to be used."""
raise NotImplementedError
class BaseDiscrepancy(InferenceMethod, BaseMethodsWithKernel, metaclass=ABCMeta):
"""
This abstract base class represents inference methods using a discrepancy.
"""
@abstractproperty
def distance(self):
"""To be overwritten by any sub-class: an attribute specifying the distance function."""
raise NotImplementedError
class RejectionABC(InferenceMethod):
"""This class implements the rejection algorithm based inference scheme [1] for
Approximate Bayesian Computation.
[1] Tavaré, S., Balding, D., Griffith, R., Donnelly, P.: Inferring coalescence
times from DNA sequence data. Genetics 145(2), 505–518 (1997).
Parameters
----------
model: list
A list of the Probabilistic models corresponding to the observed datasets
distance: abcpy.distances.Distance
Distance object defining the distance measure to compare simulated and observed data sets.
backend: abcpy.backends.Backend
Backend object defining the backend to be used.
seed: integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
# TODO: defining attributes as class attributes is not correct, move to init
model = None
distance = None
rng = None
n_samples = None
n_samples_per_param = None
epsilon = None
backend = None
def __init__(self, root_models, distances, backend, seed=None):
self.model = root_models
# We define the joint Linear combination distance using all the distances for each individual model
self.distance = LinearCombination(root_models, distances)
self.backend = backend
self.rng = np.random.RandomState(seed)
self.logger = logging.getLogger(__name__)
# An object managing the bds objects
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
# counts the number of simulate calls
self.simulation_counter = 0
def sample(self, observations, n_samples, n_samples_per_param, epsilon, full_output=0):
"""
Samples from the posterior distribution of the model parameter given the observed
data observations.
Parameters
----------
observations: list
A list, containing lists describing the observed data sets
n_samples: integer
Number of samples to generate
n_samples_per_param: integer
Number of data points in each simulated data set.
epsilon: float
Value of threshold
full_output: integer, optional
If full_output==1, intermediate results are included in output journal.
The default value is 0, meaning the intermediate results are not saved.
Returns
-------
abcpy.output.Journal
a journal containing simulation results, metadata and optionally intermediate results.
"""
self.accepted_parameters_manager.broadcast(self.backend, observations)
self.n_samples = n_samples
self.n_samples_per_param = n_samples_per_param
self.epsilon = epsilon
journal = Journal(full_output)
journal.configuration["n_samples"] = self.n_samples
journal.configuration["n_samples_per_param"] = self.n_samples_per_param
journal.configuration["epsilon"] = self.epsilon
accepted_parameters = None
# main Rejection ABC algorithm
seed_arr = self.rng.randint(1, n_samples * n_samples, size=n_samples, dtype=np.int32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
rng_pds = self.backend.parallelize(rng_arr)
accepted_parameters_distances_counter_pds = self.backend.map(self._sample_parameter, rng_pds)
accepted_parameters_distances_counter = self.backend.collect(accepted_parameters_distances_counter_pds)
accepted_parameters, distances, counter = [list(t) for t in zip(*accepted_parameters_distances_counter)]
for count in counter:
self.simulation_counter += count
distances = np.array(distances)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters)
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_weights(np.ones((n_samples, 1)))
journal.add_ESS_estimate(np.ones((n_samples, 1)))
journal.add_distances(copy.deepcopy(distances))
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters)
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
return journal
def _sample_parameter(self, rng, npc=None):
"""
Samples a single model parameter and simulates from it until
distance between simulated outcome and the observation is
smaller than epsilon.
Parameters
----------
rng: random number generator
The random number generator to be used.
Returns
-------
Tuple
The first entry of the tuple is the accepted parameters.
The second entry is the distance between the simulated data set and the observation, while the third one is
the number of simulations needed to obtain the accepted parameter.
"""
distance = self.distance.dist_max()
if distance < self.epsilon and self.logger:
self.logger.warn("initial epsilon {:e} is larger than dist_max {:e}"
.format(float(self.epsilon), distance))
counter = 0
while distance > self.epsilon:
# Accept new parameter value if the distance is less than epsilon
self.sample_from_prior(rng=rng)
theta = self.get_parameters(self.model)
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
if y_sim is not None:
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
self.logger.debug("distance after {:4d} simulations: {:e}".format(
counter, distance))
else:
distance = self.distance.dist_max()
self.logger.debug("Needed {:4d} simulations to reach distance {:e} < epsilon = {:e}".format(counter, distance,
float(
self.epsilon)))
return theta, distance, counter
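# Minimal usage sketch for RejectionABC, kept as a comment so that importing this module has no
# side effects. The model, statistics, distance and backend classes below are the standard abcpy
# building blocks; `observed_data` is a placeholder for the user's own observations.
#
# from abcpy.continuousmodels import Uniform, Normal
# from abcpy.statistics import Identity
# from abcpy.distances import Euclidean
# from abcpy.backends import BackendDummy
#
# mu = Uniform([[150], [200]], name='mu')
# sigma = Uniform([[5], [25]], name='sigma')
# height = Normal([mu, sigma], name='height')
# distance = Euclidean(Identity(degree=2, cross=False))
# sampler = RejectionABC([height], [distance], BackendDummy(), seed=1)
# journal = sampler.sample([observed_data], n_samples=100, n_samples_per_param=1, epsilon=10)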
class PMCABC(BaseDiscrepancy, InferenceMethod):
"""
This class implements a modified version of the Population Monte Carlo based inference scheme for Approximate
Bayesian computation of Beaumont et al. [1]. Here the threshold value at the `t`-th generation is adaptively
chosen as the maximum between the epsilon_percentile-th percentile of the discrepancies of the parameters
accepted at the `(t-1)`-th generation and the threshold value provided for this generation by the user. If the
value of epsilon_percentile is set to zero, this method becomes the inference scheme described in [1], where the
threshold values considered at each generation are the ones provided by the user.
[1] M. A. Beaumont. Approximate Bayesian computation in evolution and ecology. Annual Review of Ecology,
Evolution, and Systematics, 41(1):379–406, Nov. 2010.
Parameters
----------
model : list
A list of the Probabilistic models corresponding to the observed datasets
distance : abcpy.distances.Distance
Distance object defining the distance measure to compare simulated and observed data sets.
kernel : abcpy.distributions.Distribution
Distribution object defining the perturbation kernel needed for the sampling.
backend : abcpy.backends.Backend
Backend object defining the backend to be used.
seed : integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
model = None
distance = None
kernel = None
rng = None
# default value, set so that testing works
n_samples = 2
n_samples_per_param = None
backend = None
def __init__(self, root_models, distances, backend, kernel=None, seed=None):
self.model = root_models
# We define the joint Linear combination distance using all the distances for each individual model
self.distance = LinearCombination(root_models, distances)
if kernel is None:
mapping, garbage_index = self._get_mapping()
models = []
for mdl, mdl_index in mapping:
models.append(mdl)
kernel = DefaultKernel(models)
self.kernel = kernel
self.backend = backend
self.rng = np.random.RandomState(seed)
self.logger = logging.getLogger(__name__)
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
self.simulation_counter = 0
def sample(self, observations, steps, epsilon_init, n_samples=10000, n_samples_per_param=1, epsilon_percentile=10,
covFactor=2, full_output=0, journal_file=None):
"""Samples from the posterior distribution of the model parameter given the observed
data observations.
Parameters
----------
observations : list
A list, containing lists describing the observed data sets
steps : integer
Number of iterations in the sequential algorithm ("generations")
epsilon_init : numpy.ndarray
An array of proposed values of epsilon to be used at each step. It can either contain a single value,
used as the threshold in step 1, or be a `steps`-dimensional array of values to be used as the
threshold at every step.
n_samples : integer, optional
Number of samples to generate. The default value is 10000.
n_samples_per_param : integer, optional
Number of data points in each simulated data set. The default value is 1.
epsilon_percentile : float, optional
A value between 0 and 100. The default value is 10.
covFactor : float, optional
scaling parameter of the covariance matrix. The default value is 2 as considered in [1].
full_output: integer, optional
If full_output==1, intermediate results are included in output journal.
The default value is 0, meaning the intermediate results are not saved.
journal_file: str, optional
Filename of a journal file to read an already saved journal file, from which the first iteration will start.
The default value is None.
Returns
-------
abcpy.output.Journal
A journal containing simulation results, metadata and optionally intermediate results.
"""
self.accepted_parameters_manager.broadcast(self.backend, observations)
self.n_samples = n_samples
self.n_samples_per_param = n_samples_per_param
if journal_file is None:
journal = Journal(full_output)
journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
journal.configuration["type_dist_func"] = type(self.distance).__name__
journal.configuration["n_samples"] = self.n_samples
journal.configuration["n_samples_per_param"] = self.n_samples_per_param
journal.configuration["steps"] = steps
journal.configuration["epsilon_percentile"] = epsilon_percentile
else:
journal = Journal.fromFile(journal_file)
accepted_parameters = None
accepted_weights = None
accepted_cov_mats = None
# Define epsilon_arr
if len(epsilon_init) == steps:
epsilon_arr = epsilon_init
elif len(epsilon_init) == 1:
epsilon_arr = [None] * steps
epsilon_arr[0] = epsilon_init
else:
raise ValueError("The length of epsilon_init can only be equal to 1 or steps.")
# main PMCABC algorithm
self.logger.info("Starting PMC iterations")
for aStep in range(steps):
self.logger.debug("iteration {} of PMC algorithm".format(aStep))
if aStep == 0 and journal_file is not None:
accepted_parameters = journal.get_accepted_parameters(-1)
accepted_weights = journal.get_weights(-1)
if hasattr(journal, "distances"):
# if restarting from a journal, use the previous distances to determine a new epsilon
# (if it is larger than the epsilon_arr[0] provided here)
self.logger.info("Calculating acceptances threshold from provided journal file")
epsilon_arr[0] = np.max([np.percentile(journal.distances[-1], epsilon_percentile), epsilon_arr[0]])
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
# 3: calculate covariance
self.logger.info("Calculateing covariance matrix")
new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
# Since each entry of new_cov_mats is a numpy array, we can multiply like this
# accepted_cov_mats = [covFactor * new_cov_mat for new_cov_mat in new_cov_mats]
accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, new_cov_mats)
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_samples, dtype=np.uint32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
rng_pds = self.backend.parallelize(rng_arr)
# 0: update remotely required variables
# print("INFO: Broadcasting parameters.")
self.logger.info("Broadcasting parameters")
self.epsilon = epsilon_arr[aStep]
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters, accepted_weights,
accepted_cov_mats)
# 1: calculate resample parameters
# print("INFO: Resampling parameters")
self.logger.info("Resampling parameters")
params_and_dists_and_counter_pds = self.backend.map(self._resample_parameter, rng_pds)
params_and_dists_and_counter = self.backend.collect(params_and_dists_and_counter_pds)
new_parameters, distances, counter = [list(t) for t in zip(*params_and_dists_and_counter)]
new_parameters = np.array(new_parameters)
distances = np.array(distances)
for count in counter:
self.simulation_counter += count
# Compute epsilon for next step
# print("INFO: Calculating acceptance threshold (epsilon).")
self.logger.info("Calculating acceptances threshold")
if aStep < steps - 1:
if epsilon_arr[aStep + 1] is None:
epsilon_arr[aStep + 1] = np.percentile(distances, epsilon_percentile)
else:
epsilon_arr[aStep + 1] = np.max(
[np.percentile(distances, epsilon_percentile), epsilon_arr[aStep + 1]])
# 2: calculate weights for new parameters
self.logger.info("Calculating weights")
new_parameters_pds = self.backend.parallelize(new_parameters)
self.logger.info("Calculate weights")
new_weights_pds = self.backend.map(self._calculate_weight, new_parameters_pds)
new_weights = np.array(self.backend.collect(new_weights_pds)).reshape(-1, 1)
sum_of_weights = np.sum(new_weights)
new_weights = new_weights / sum_of_weights
# The calculation of cov_mats needs the new weights and new parameters
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=new_parameters,
accepted_weights=new_weights)
# The parameters relevant to each kernel are needed n_samples times. It is therefore more efficient to
# broadcast these parameters once, instead of collecting them at each kernel in each step
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
# 3: calculate covariance
self.logger.info("Calculating covariance matrix")
new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
# Since each entry of new_cov_mats is a numpy array, we can multiply like this
# new_cov_mats = [covFactor * new_cov_mat for new_cov_mat in new_cov_mats]
new_cov_mats = self._compute_accepted_cov_mats(covFactor, new_cov_mats)
# 4: Update the newly computed values
accepted_parameters = new_parameters
accepted_weights = new_weights
accepted_cov_mats = new_cov_mats
self.logger.info("Save configuration to output journal")
if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_distances(copy.deepcopy(distances))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
# Add epsilon_arr to the journal
if journal_file is not None and "epsilon_arr" in journal.configuration.keys():
journal.configuration["epsilon_arr"] += epsilon_arr
else:
journal.configuration["epsilon_arr"] = epsilon_arr
return journal
def _resample_parameter(self, rng, npc=None):
"""
Samples a single model parameter and simulates from it until
distance between simulated outcome and the observation is
smaller than epsilon.
Parameters
----------
rng: random number generator
The random number generator to be used.
Returns
-------
Tuple
The first entry of the tuple is the accepted parameters.
The second entry is the distance between the simulated data set and the observation, while the third one is
the number of simulations needed to obtain the accepted parameter.
"""
# print(npc.communicator())
rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
distance = self.distance.dist_max()
if distance < self.epsilon and self.logger:
self.logger.warn("initial epsilon {:e} is larger than dist_max {:e}"
.format(float(self.epsilon), distance))
theta = self.get_parameters()
counter = 0
while distance > self.epsilon:
if self.accepted_parameters_manager.accepted_parameters_bds is None:
self.sample_from_prior(rng=rng)
theta = self.get_parameters()
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
else:
index = rng.choice(self.n_samples, size=1,
p=self.accepted_parameters_manager.accepted_weights_bds.value().reshape(-1))
# truncate the normal to the bounds of parameter space of the model
# truncating the normal like this is fine: https://arxiv.org/pdf/0907.4010v1.pdf
while True:
perturbation_output = self.perturb(index[0], rng=rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
theta = perturbation_output[1]
break
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
self.logger.debug("distance after {:4d} simulations: {:e}".format(counter, distance))
self.logger.debug("Needed {:4d} simulations to reach distance {:e} < epsilon = {:e}".format(counter, distance,
float(
self.epsilon)))
return theta, distance, counter
def _calculate_weight(self, theta, npc=None):
"""
Calculates the weight for the given parameter using
accepted_parameters, accepted_cov_mat
Parameters
----------
theta: np.array
1xp matrix containing model parameter, where p is the number of parameters
Returns
-------
float
the new weight for theta
"""
self.logger.debug("_calculate_weight")
if self.accepted_parameters_manager.kernel_parameters_bds is None:
return 1.0 / self.n_samples
else:
prior_prob = self.pdf_of_prior(self.model, theta, 0)
# Get the mapping of the models to be used by the kernels
mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
self.accepted_parameters_manager.model)
pdf_values = np.array([self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
self.accepted_parameters_manager.accepted_parameters_bds.value()[i],
theta) for i in range(self.n_samples)])
denominator = np.sum(self.accepted_parameters_manager.accepted_weights_bds.value().reshape(-1) * pdf_values)
return 1.0 * prior_prob / denominator
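# In formula form, the weight computed above is the standard PMC importance weight
#     w_i  proportional to  p(theta_i) / sum_j w_j^(t-1) K(theta_i | theta_j^(t-1)),
# i.e. the prior density of the new particle divided by the kernel mixture over the previous
# generation, which is exactly what prior_prob / denominator implements.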
def _compute_accepted_cov_mats(self, covFactor, new_cov_mats):
"""
Update the covariance matrices computed from data by multiplying them with covFactor and adding a small term in
the diagonal for numerical stability.
Parameters
----------
covFactor : float
factor to correct the covariance matrices
new_cov_mats : list
list of covariance matrices computed from data
Returns
-------
list
List of new accepted covariance matrices
"""
# accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats]
accepted_cov_mats = []
for new_cov_mat in new_cov_mats:
if not (new_cov_mat.size == 1):
accepted_cov_mats.append(
covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
else:
accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
return accepted_cov_mats
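# Minimal usage sketch for PMCABC (commented out; it assumes the same hypothetical `height` model,
# `distance`, backend and `observed_data` placeholder as in the RejectionABC sketch above).
#
# sampler = PMCABC([height], [distance], BackendDummy(), seed=1)
# eps_init = np.array([75])  # a single starting threshold; later thresholds are chosen adaptively
# journal = sampler.sample([observed_data], steps=3, epsilon_init=eps_init,
#                          n_samples=100, n_samples_per_param=1, epsilon_percentile=10)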
class PMC(BaseLikelihood, InferenceMethod):
"""
Population Monte Carlo based inference scheme of Cappé et al. [1].
This algorithm assumes that a likelihood function is available and can be evaluated
at any parameter value given the observed dataset. In the absence of the
likelihood function, or when it cannot be evaluated at a reasonable
computational expense, we use the approximate likelihood functions in the
abcpy.approx_lhd module, for which the argument for the consistency of the
inference scheme is based on Andrieu and Roberts [2].
[1] Cappé, O., Guillin, A., Marin, J.-M., and Robert, C. P. (2004). Population Monte Carlo.
Journal of Computational and Graphical Statistics, 13(4), 907–929.
[2] C. Andrieu and G. O. Roberts. The pseudo-marginal approach for efficient Monte Carlo computations.
Annals of Statistics, 37(2):697–725, 04 2009.
Parameters
----------
model : list
A list of the Probabilistic models corresponding to the observed datasets
loglikfun : abcpy.approx_lhd.Approx_likelihood
Approx_loglikelihood object defining the approximated loglikelihood to be used.
kernel : abcpy.distributions.Distribution
Distribution object defining the perturbation kernel needed for the sampling.
backend : abcpy.backends.Backend
Backend object defining the backend to be used.
seed : integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
model = None
likfun = None
kernel = None
rng = None
n_samples = None
n_samples_per_param = None
backend = None
def __init__(self, root_models, loglikfuns, backend, kernel=None, seed=None):
self.model = root_models
# We define the joint Sum of Loglikelihood functions using all the loglikelihoods for each individual models
self.likfun = SumCombination(root_models, loglikfuns)
if kernel is None:
mapping, garbage_index = self._get_mapping()
models = []
for mdl, mdl_index in mapping:
models.append(mdl)
kernel = DefaultKernel(models)
self.kernel = kernel
self.backend = backend
self.rng = np.random.RandomState(seed)
self.logger = logging.getLogger(__name__)
# these are usually big tables, so we broadcast them to have them once
# per executor instead of once per task
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
self.simulation_counter = 0
def sample(self, observations, steps, n_samples=10000, n_samples_per_param=100, covFactors=None, iniPoints=None,
full_output=0, journal_file=None):
"""Samples from the posterior distribution of the model parameter given the observed
data observations.
Parameters
----------
observations : list
A list, containing lists describing the observed data sets
steps : integer
number of iterations in the sequential algorithm ("generations")
n_samples : integer, optional
number of samples to generate. The default value is 10000.
n_samples_per_param : integer, optional
number of data points in each simulated data set. The default value is 100.
covFactors : list of float, optional
Scaling parameters of the covariance matrices, one for each kernel. The default is an array of ones.
iniPoints : numpy.ndarray, optional
Parameter values from which the sampling starts. By default sampled from the prior.
full_output: integer, optional
If full_output==1, intermediate results are included in output journal.
The default value is 0, meaning the intermediate results are not saved.
journal_file: str, optional
Filename of a journal file to read an already saved journal file, from which the first iteration will start.
The default value is None.
Returns
-------
abcpy.output.Journal
A journal containing simulation results, metadata and optionally intermediate results.
"""
self.sample_from_prior(rng=self.rng)
self.accepted_parameters_manager.broadcast(self.backend, observations)
self.n_samples = n_samples
self.n_samples_per_param = n_samples_per_param
if journal_file is None:
journal = Journal(full_output)
journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
journal.configuration["type_lhd_func"] = type(self.likfun).__name__
journal.configuration["n_samples"] = self.n_samples
journal.configuration["n_samples_per_param"] = self.n_samples_per_param
journal.configuration["steps"] = steps
journal.configuration["covFactor"] = covFactors
journal.configuration["iniPoints"] = iniPoints
else:
journal = Journal.fromFile(journal_file)
accepted_parameters = None
accepted_weights = None
accepted_cov_mats = None
new_theta = None
dim = len(self.get_parameters())
# Initialize particles: When not supplied, randomly draw them from prior distribution
# Weights of particles: Assign equal weights for each of the particles
if iniPoints is None:
accepted_parameters = []
for ind in range(0, n_samples):
self.sample_from_prior(rng=self.rng)
accepted_parameters.append(self.get_parameters())
accepted_weights = np.ones((n_samples, 1), dtype=float) / n_samples
else:
accepted_parameters = iniPoints
accepted_weights = np.ones((iniPoints.shape[0], 1), dtype=float) / iniPoints.shape[0]
if covFactors is None:
covFactors = np.ones(shape=(len(self.kernel.kernels),))
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
# The parameters relevant to each kernel are needed n_samples times. It is therefore more efficient
# to broadcast these parameters once, instead of collecting them at each kernel in each step
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
# 3: calculate covariance
self.logger.info("Calculating covariance matrix")
new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
# Since each entry of new_cov_mats is a numpy array, we can multiply like this
accepted_cov_mats = self._compute_accepted_cov_mats(covFactors, new_cov_mats)
# accepted_cov_mats = [covFactor * new_cov_mat for covFactor, new_cov_mat in zip(covFactors,new_cov_mats)]
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
# main PMC algorithm
self.logger.info("Starting pmc iterations")
for aStep in range(steps):
if aStep == 0 and journal_file is not None:
accepted_parameters = journal.get_accepted_parameters(-1)
accepted_weights = journal.get_weights(-1)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
# 3: calculate covariance
self.logger.info("Calculating covariance matrix")
new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
# Since each entry of new_cov_mats is a numpy array, we can multiply like this
accepted_cov_mats = self._compute_accepted_cov_mats(covFactors, new_cov_mats)
# accepted_cov_mats = [covFactor * new_cov_mat for covFactor, new_cov_mat in zip(covFactors, new_cov_mats)]
self.logger.info("Iteration {} of PMC algorithm".format(aStep))
# 0: update remotely required variables
self.logger.info("Broadcasting parameters")
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights,
accepted_cov_mats=accepted_cov_mats)
# 1: Resample parameters
self.logger.info("Resample parameters")
index = self.rng.choice(len(accepted_parameters), size=n_samples, p=accepted_weights.reshape(-1))
# Choose a new particle using the resampled particle (make the boundary proper)
# Initialize new_parameters
new_parameters = []
for ind in range(0, self.n_samples):
while True:
perturbation_output = self.perturb(index[ind], rng=self.rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
new_parameters.append(perturbation_output[1])
break
# 2: calculate approximate likelihood for new parameters
self.logger.info("Calculate approximate loglikelihood")
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=self.n_samples, dtype=np.uint32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
data_arr = []
for i in range(len(rng_arr)):
data_arr.append([new_parameters[i], rng_arr[i]])
data_pds = self.backend.parallelize(data_arr)
approx_log_likelihood_new_parameters_and_counter_pds = self.backend.map(self._approx_log_lik_calc, data_pds)
self.logger.debug("collect approximate likelihood from pds")
approx_log_likelihood_new_parameters_and_counter = self.backend.collect(
approx_log_likelihood_new_parameters_and_counter_pds)
approx_log_likelihood_new_parameters, counter = [list(t) for t in
zip(*approx_log_likelihood_new_parameters_and_counter)]
approx_log_likelihood_new_parameters = np.array(approx_log_likelihood_new_parameters).reshape(-1, 1)
for count in counter:
self.simulation_counter += count
# 3: calculate new weights for new parameters
self.logger.info("Calculating weights")
new_parameters_pds = self.backend.parallelize(new_parameters)
new_weights_pds = self.backend.map(self._calculate_weight, new_parameters_pds)
new_weights = np.array(self.backend.collect(new_weights_pds)).reshape(-1, 1)
new_log_weights = np.log(new_weights) + approx_log_likelihood_new_parameters
# we get numerical issues often:
self.logger.info("range_of_weights (log): {}".format(np.max(new_log_weights) - np.min(new_log_weights)))
# sorted_weights = np.sort(new_log_weights)
# self.logger.info("Difference first second largest (log): {}".format(sorted_weights[-1] - sorted_weights[-2]))
# center on log scale (avoid numerical issues):
new_weights = np.exp(new_log_weights - np.max(new_log_weights))
sum_of_weights = np.sum(new_weights)
new_weights = new_weights / sum_of_weights
# self.logger.info("new_weights : ", new_weights, ", sum_of_weights : ", sum_of_weights)
self.logger.info("sum_of_weights : {}".format(sum_of_weights))
self.logger.info("ESS : {}".format(1 / sum(pow(new_weights / sum(new_weights), 2))[0]))
accepted_parameters = new_parameters
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=new_weights)
# 4: calculate covariance
# The parameters relevant to each kernel are needed n_samples times. It is therefore more efficient to broadcast these parameters once, instead of collecting them at each kernel in each step
self.logger.info("Calculating covariance matrix")
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
# 3: calculate covariance
self.logger.info("Calculating covariance matrix")
new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
# Since each entry of new_cov_mats is a numpy array, we can multiply like this
accepted_cov_mats = self._compute_accepted_cov_mats(covFactors, new_cov_mats)
# new_cov_mats = [covFactor * new_cov_mat for covFactor, new_cov_mat in zip(covFactors, new_cov_mats)]
# 5: Update the newly computed values
accepted_parameters = new_parameters
accepted_weights = new_weights
# accepted_cov_mats = new_cov_mats
self.logger.info("Saving configuration to output journal")
if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
return journal
# define helper functions for map step
def _approx_log_lik_calc(self, data, npc=None):
"""
Compute the approximate loglikelihood for new parameters using the approximate likelihood function
Parameters
----------
data: list
A list containing a parameter value and a random numpy state, e.g. [theta, rng]
Returns
-------
tuple
The sum of the log prior density and the approximate loglikelihood of the observed data at theta, and the
number of model simulations performed.
"""
# Extract theta and rng
theta, rng = data[0], data[1]
# Simulate the fake data from the model given the parameter value theta
self.logger.debug("Simulate model for parameter " + str(theta))
self.set_parameters(theta)
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
self.logger.debug("Extracting observation.")
obs = self.accepted_parameters_manager.observations_bds.value()
self.logger.debug("Computing likelihood...")
total_pdf_at_theta = 1.
# lhd = self.likfun.likelihood(obs, y_sim)
loglhd = self.likfun.loglikelihood(obs, y_sim)
self.logger.debug("LogLikelihood is :" + str(loglhd))
log_pdf_at_theta = np.log(self.pdf_of_prior(self.model, theta))
self.logger.debug("Prior pdf evaluated at theta is :" + str(log_pdf_at_theta))
log_pdf_at_theta += loglhd
return log_pdf_at_theta, self.n_samples_per_param
def _calculate_weight(self, theta, npc=None):
"""
Calculates the weight for the given parameter using
accepted_parameters, accepted_cov_mat
Parameters
----------
theta: np.ndarray
1xp matrix containing the model parameters, where p is the number of parameters
Returns
-------
float
The new weight for theta
"""
self.logger.debug("_calculate_weight")
if self.accepted_parameters_manager.accepted_weights_bds is None:
return 1.0 / self.n_samples
else:
prior_prob = self.pdf_of_prior(self.model, theta)
mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
self.accepted_parameters_manager.model)
pdf_values = np.array([self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
self.accepted_parameters_manager.accepted_parameters_bds.value()[i],
theta) for i in range(self.n_samples)])
denominator = np.sum(self.accepted_parameters_manager.accepted_weights_bds.value().reshape(-1) * pdf_values)
return 1.0 * prior_prob / denominator
def _compute_accepted_cov_mats(self, covFactors, new_cov_mats):
"""
Update the covariance matrices computed from data by multiplying them with covFactors and adding a small term in
the diagonal for numerical stability.
Parameters
----------
covFactors : list of float
factors to correct the covariance matrices
new_cov_mats : list
list of covariance matrices computed from data
Returns
-------
list
List of new accepted covariance matrices
"""
accepted_cov_mats = []
for covFactor, new_cov_mat in zip(covFactors, new_cov_mats):
if not (new_cov_mat.size == 1):
accepted_cov_mats.append(
covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
else:
accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
return accepted_cov_mats
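# Minimal usage sketch for PMC (commented out). Unlike the ABC samplers above, PMC needs an
# approximate likelihood instead of a distance; SynLikelihood from abcpy.approx_lhd is used here as
# an example, together with the same hypothetical `height` model, statistics calculator, backend
# and `observed_data` placeholder as in the sketches above.
#
# from abcpy.approx_lhd import SynLikelihood
# approx_lhd = SynLikelihood(Identity(degree=2, cross=False))
# sampler = PMC([height], [approx_lhd], BackendDummy(), seed=1)
# journal = sampler.sample([observed_data], steps=3, n_samples=100, n_samples_per_param=50)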
class SABC(BaseDiscrepancy, InferenceMethod):
"""
This class implements a modified version of Simulated Annealing Approximate Bayesian Computation (SABC) of [1]
when the prior is non-informative.
[1] C. Albert, H. R. Kuensch and A. Scheidegger. A Simulated Annealing Approach to
Approximate Bayes Computations. Statistics and Computing, (2014).
Parameters
----------
model : list
A list of the Probabilistic models corresponding to the observed datasets
distance : abcpy.distances.Distance
Distance object defining the distance measure used to compare simulated and observed data sets.
kernel : abcpy.distributions.Distribution
Distribution object defining the perturbation kernel needed for the sampling.
backend : abcpy.backends.Backend
Backend object defining the backend to be used.
seed : integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
model = None
distance = None
kernel = None
rng = None
n_samples = None
n_samples_per_param = None
epsilon = None
smooth_distances_bds = None
all_distances_bds = None
backend = None
def __init__(self, root_models, distances, backend, kernel=None, seed=None):
self.model = root_models
# We define the joint Linear combination distance using all the distances for each individual model
self.distance = LinearCombination(root_models, distances)
if kernel is None:
mapping, garbage_index = self._get_mapping()
models = []
for mdl, mdl_index in mapping:
models.append(mdl)
kernel = DefaultKernel(models)
self.kernel = kernel
self.backend = backend
self.rng = np.random.RandomState(seed)
self.logger = logging.getLogger(__name__)
# these are usually big tables, so we broadcast them to have them once
# per executor instead of once per task
self.smooth_distances_bds = None
self.all_distances_bds = None
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
self.simulation_counter = 0
def sample(self, observations, steps, epsilon, n_samples=10000, n_samples_per_param=1, beta=2, delta=0.2,
v=0.3, ar_cutoff=0.1, resample=None, n_update=None, full_output=0, journal_file=None):
"""Samples from the posterior distribution of the model parameter given the observed
data observations.
Parameters
----------
observations : list
A list, containing lists describing the observed data sets
steps : integer
Maximum number of iterations in the sequential algorithm ("generations")
epsilon : numpy.float
A proposed value of threshold to start with.
n_samples : integer, optional
Number of samples to generate. The default value is 10000.
n_samples_per_param : integer, optional
Number of data points in each simulated data set. The default value is 1.
beta : numpy.float, optional
Tuning parameter of SABC, default value is 2. Used to scale up the covariance matrices obtained from data.
delta : numpy.float, optional
Tuning parameter of SABC, default value is 0.2.
v : numpy.float, optional
Tuning parameter of SABC. The default value is 0.3.
ar_cutoff : numpy.float, optional
Acceptance ratio cutoff: if the acceptance rate at some iteration of the algorithm is lower than that, the
algorithm will stop. The default value is 0.1.
resample: int, optional
At any iteration, perform a resampling step if the number of accepted particles is larger than resample.
When not provided, resample is set equal to n_samples.
n_update: int, optional
Number of perturbed parameters at each step. The default value is None, in which case it is set to n_samples.
full_output: integer, optional
If full_output==1, intermediate results are included in output journal.
The default value is 0, meaning the intermediate results are not saved.
journal_file: str, optional
Filename of a journal file to read an already saved journal file, from which the first iteration will start.
The default value is None.
Returns
-------
abcpy.output.Journal
A journal containing simulation results, metadata and optionally intermediate results.
"""
global broken_preemptively
self.sample_from_prior(rng=self.rng)
self.accepted_parameters_manager.broadcast(self.backend, observations)
self.epsilon = epsilon
self.n_samples = n_samples
self.n_samples_per_param = n_samples_per_param
if journal_file is None:
journal = Journal(full_output)
journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
journal.configuration["type_dist_func"] = type(self.distance).__name__
journal.configuration["type_kernel_func"] = type(self.kernel)
journal.configuration["n_samples"] = self.n_samples
journal.configuration["n_samples_per_param"] = self.n_samples_per_param
journal.configuration["beta"] = beta
journal.configuration["delta"] = delta
journal.configuration["v"] = v
journal.configuration["ar_cutoff"] = ar_cutoff
journal.configuration["resample"] = resample
journal.configuration["n_update"] = n_update
journal.configuration["full_output"] = full_output
else:
journal = Journal.fromFile(journal_file)
accepted_parameters = None
distances = np.zeros(shape=(n_samples,))
smooth_distances = np.zeros(shape=(n_samples,))
accepted_weights = np.ones(shape=(n_samples, 1))
all_distances = None
accepted_cov_mat = None
if resample is None:
resample = n_samples
if n_update is None:
n_update = n_samples
sample_array = np.ones(shape=(steps,))
sample_array[0] = n_samples
sample_array[1:] = n_update
# Acceptance counter to determine the resampling step
accept = 0
samples_until = 0
# Counter whether broken preemptively
broken_preemptively = False
for aStep in range(0, steps):
self.logger.debug("step {}".format(aStep))
if aStep == 0 and journal_file is not None:
accepted_parameters = journal.get_accepted_parameters(-1)
accepted_weights = journal.get_weights(-1)
# Broadcast Accepted parameters and Accepted weights
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
# Broadcast Accepted Kernel parameters
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(beta, new_cov_mats)
# Broadcast Accepted Covariance Matrix
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
# main SABC algorithm
# print("INFO: Initialization of SABC")
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=int(sample_array[aStep]), dtype=np.uint32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
index_arr = self.rng.randint(0, self.n_samples, size=int(sample_array[aStep]), dtype=np.uint32)
data_arr = []
for i in range(len(rng_arr)):
data_arr.append([rng_arr[i], index_arr[i]])
data_pds = self.backend.parallelize(data_arr)
# 0: update remotely required variables
self.logger.info("Broadcasting parameters")
self.epsilon = epsilon
self._update_broadcasts(smooth_distances, all_distances)
# 1: Calculate parameters
self.logger.info("Initial accepted parameters")
params_and_dists_pds = self.backend.map(self._accept_parameter, data_pds)
self.logger.debug("Map step of parallelism is finished")
params_and_dists = self.backend.collect(params_and_dists_pds)
self.logger.debug("Collect step of parallelism is finished")
new_parameters, new_distances, new_all_parameters, new_all_distances, index, acceptance, counter = [
list(t) for t in zip(*params_and_dists)]
# Keeping counter of number of simulations
self.logger.debug("Counting number of simulations")
for count in counter:
self.simulation_counter += count
# new_parameters = np.array(new_parameters)
new_distances = np.array(new_distances)
new_all_distances = np.concatenate(new_all_distances)
index = np.array(index)
acceptance = np.array(acceptance)
# Reading all_distances at Initial step
if aStep == 0:
index = np.linspace(0, n_samples - 1, n_samples).astype(int).reshape(n_samples, )
accept = 0
all_distances = new_all_distances
# Initialize/Update the accepted parameters and their corresponding distances
self.logger.info("Initialize/Update the accepted parameters and their corresponding distances")
if accepted_parameters is None:
accepted_parameters = new_parameters
else:
for ind in range(len(acceptance)):
if acceptance[ind] == 1:
accepted_parameters[index[ind]] = new_parameters[ind]
distances[index[acceptance == 1]] = new_distances[acceptance == 1]
# 2: Smoothing of the distances
self.logger.info("Smoothing of the distance")
smooth_distances[index[acceptance == 1]] = self._smoother_distance(distances[index[acceptance == 1]],
all_distances)
# 3: Initialize/Update U, epsilon and covariance of perturbation kernel
self.logger.info("Initialize/Update U, epsilon and covariance of perturbation kernel")
if aStep == 0:
U = self._average_redefined_distance(self._smoother_distance(all_distances, all_distances), epsilon)
else:
U = np.mean(smooth_distances)
epsilon = self._schedule(U, v)
# 4: Show progress and if acceptance rate smaller than a value break the iteration
if aStep > 0:
accept = accept + np.sum(acceptance)
samples_until = samples_until + sample_array[aStep]
acceptance_rate = accept / samples_until
msg = ("updates= {:.2f}, epsilon= {}, u.mean={:e}, acceptance rate: {:.2f}".format(
np.sum(sample_array[1:aStep + 1]) / np.sum(sample_array[1:]) * 100, epsilon, U, acceptance_rate))
self.logger.info(msg)
if acceptance_rate < ar_cutoff:
broken_preemptively = True
self.logger.info("Stopping as acceptance rate is lower than cutoff")
break
# 5: Resampling if number of accepted particles greater than resample
if accept >= resample and U > 1e-100:
self.logger.info("Weighted resampling")
weight = np.exp(-smooth_distances * delta / U)
weight = weight / sum(weight)
index_resampled = self.rng.choice(n_samples, n_samples, replace=True, p=weight)
accepted_parameters = [accepted_parameters[i] for i in index_resampled]
smooth_distances = smooth_distances[index_resampled]
distances = distances[index_resampled]
# Update U and epsilon:
epsilon = epsilon * (1 - delta)
U = np.mean(smooth_distances)
epsilon = self._schedule(U, v)
# Print effective sampling size
self.logger.info('Resampling: Effective sampling size: ' + str(1 / sum(pow(weight / sum(weight), 2))))
accept = 0
samples_until = 0
# Compute and broadcast accepted parameters, accepted kernel parameters and accepted Covariance matrix
# Broadcast Accepted parameters and add to journal
self.logger.info("Broadcast Accepted parameters and add to journal")
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
accepted_parameters=accepted_parameters)
# Compute Accepted Kernel parameters and broadcast them
self.logger.debug("Compute Accepted Kernel parameters and broadcast them")
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
# Compute Kernel Covariance Matrix and broadcast it
self.logger.debug("Compute Kernel Covariance Matrix and broadcast it")
new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(beta, new_cov_mats)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
if full_output == 1 and aStep <= steps - 1:
# Saving intermediate configuration to output journal.
self.logger.info('Saving after resampling')
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
journal.add_distances(copy.deepcopy(distances))
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
else:
# Compute and broadcast accepted parameters, accepted kernel parameters and accepted Covariance matrix
# Broadcast Accepted parameters
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
accepted_parameters=accepted_parameters)
# Compute Accepted Kernel parameters and broadcast them
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
# Compute Kernel Covariance Matrix and broadcast it
new_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(beta, new_cov_mats)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
if full_output == 1 and aStep <= steps - 1:
# Saving intermediate configuration to output journal.
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
journal.add_distances(copy.deepcopy(distances))
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
# Add epsilon_arr, number of final steps and final output to the journal
# print("INFO: Saving final configuration to output journal.")
if (full_output == 0) or (full_output == 1 and broken_preemptively and aStep <= steps - 1):
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
journal.add_distances(copy.deepcopy(distances))
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
journal.configuration["steps"] = aStep + 1
journal.configuration["epsilon"] = epsilon
return journal
def _smoother_distance(self, distance, old_distance):
"""Smooths the distance using the Equation 14 of [1].
[1] C. Albert, H. R. Kuensch and A. Scheidegger. A Simulated Annealing Approach to
Approximate Bayes Computations. Statistics and Computing 0960-3174 (2014).
Parameters
----------
distance: numpy.ndarray
Current distance between the simulated and observed data
old_distance: numpy.ndarray
Last distance between the simulated and observed data
Returns
-------
numpy.ndarray
Smoothed distance
"""
smoothed_distance = np.zeros(shape=(len(distance),))
for ind in range(0, len(distance)):
if distance[ind] < np.min(old_distance):
smoothed_distance[ind] = (distance[ind] / np.min(old_distance)) / len(old_distance)
else:
smoothed_distance[ind] = np.mean(np.array(old_distance) < distance[ind])
return smoothed_distance
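# In words: the smoothed distance of a particle is the empirical fraction of the old distances that
# lie below its current distance; when the current distance undercuts all old ones, the linear ramp
# (distance / min(old_distance)) / len(old_distance) is used instead (Equation 14 of [1]).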
def _average_redefined_distance(self, distance, epsilon):
"""
Function to calculate the weighted average of the distance
Parameters
----------
distance: numpy.ndarray
Distance between simulated and observed data set
epsilon: float
threshold
Returns
-------
numpy.ndarray
Weighted average of the distance
"""
if epsilon == 0:
U = 0
else:
U = np.average(distance, weights=np.exp(-distance / epsilon))
return U
def _schedule(self, rho, v):
if rho < 1e-100:
epsilon = 0
else:
fun = lambda epsilon: pow(epsilon, 2) + v * pow(epsilon, 3 / 2) - pow(rho, 2)
epsilon = optimize.fsolve(fun, rho / 2)
return epsilon
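# The schedule solves  epsilon**2 + v * epsilon**(3/2) = rho**2  for epsilon, starting fsolve from
# rho / 2; when rho is essentially zero (below 1e-100) the threshold is simply set to 0.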
def _update_broadcasts(self, smooth_distances, all_distances):
def destroy(bc):
if bc is not None:
bc.unpersist()
# bc.destroy
if smooth_distances is not None:
self.smooth_distances_bds = self.backend.broadcast(smooth_distances)
if all_distances is not None:
self.all_distances_bds = self.backend.broadcast(all_distances)
# define helper functions for map step
def _accept_parameter(self, data, npc=None):
"""
Samples a single model parameter and simulates from it until it is
accepted with probability exp[-rho(x,y)/epsilon].
Parameters
----------
data: list
A list containing a random number generator and the index of the particle to operate on, e.g. [rng, index]
Returns
-------
tuple
The accepted parameter, its distance, all sampled parameters and distances, the particle index, the
acceptance flag and the number of simulations performed.
"""
if isinstance(data, np.ndarray):
data = data.tolist()
rng = data[0]
index = data[1]
rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
all_parameters = []
all_distances = []
acceptance = 0
counter = 0
if self.accepted_parameters_manager.accepted_cov_mats_bds is None:
while acceptance == 0:
self.sample_from_prior(rng=rng)
new_theta = self.get_parameters()
all_parameters.append(new_theta)
t0 = time.time()
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
self.logger.debug("Simulation took " + str(time.time() - t0) + "sec")
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
all_distances.append(distance)
acceptance = rng.binomial(1, np.exp(-distance / self.epsilon), 1)
acceptance = 1
else:
# Select one arbitrary particle:
index = rng.choice(self.n_samples, size=1)[0]
# Sample proposal parameter and calculate new distance:
theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index]
while True:
perturbation_output = self.perturb(index, rng=rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
new_theta = perturbation_output[1]
break
t0 = time.time()
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
self.logger.debug("Simulation took " + str(time.time() - t0) + "sec")
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
smooth_distance = self._smoother_distance([distance], self.all_distances_bds.value())
# Calculate acceptance probability:
self.logger.debug("Calulate acceptance probability")
ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(
self.model, self.accepted_parameters_manager.accepted_parameters_bds.value()[index])
ratio_likelihood_prob = np.exp((self.smooth_distances_bds.value()[index] - smooth_distance) / self.epsilon)
acceptance_prob = ratio_prior_prob * ratio_likelihood_prob
# If accepted
if rng.rand(1) < acceptance_prob:
acceptance = 1
else:
distance = np.inf
return new_theta, distance, all_parameters, all_distances, index, acceptance, counter
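# The acceptance probability above is a Metropolis-Hastings-type ratio: the prior ratio of the
# proposed and current particles times exp((u_old - u_new) / epsilon), where u denotes the smoothed
# distance, so proposals that reduce the smoothed distance are accepted with higher probability.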
def _compute_accepted_cov_mats(self, beta, new_cov_mats):
"""
Update the covariance matrices computed from data by multiplying them with beta and adding a small term in
the diagonal for numerical stability.
Parameters
----------
beta : float
factor to correct the covariance matrices
new_cov_mats : list
list of covariance matrices computed from data
Returns
-------
list
List of new accepted covariance matrices
"""
accepted_cov_mats = []
for new_cov_mat in new_cov_mats:
if not (new_cov_mat.size == 1):
accepted_cov_mats.append(
beta * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
else:
accepted_cov_mats.append((beta * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
return accepted_cov_mats
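# Minimal sketch (illustrative, with made-up numbers): the regularisation above scales a
# covariance matrix by beta and adds a small multiple of its trace on the diagonal, so the
# perturbation kernel never receives a singular matrix.
def _example_cov_regularisation(beta=2.0):
    import numpy as np
    cov = np.array([[1.0, 0.3], [0.3, 0.5]])
    return beta * cov + 0.0001 * np.trace(cov) * np.eye(cov.shape[0])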
[docs]class ABCsubsim(BaseDiscrepancy, InferenceMethod):
"""This class implements Approximate Bayesian Computation by subset simulation (ABCsubsim) algorithm of [1].
[1] M. Chiachio, J. L. Beck, J. Chiachio, and G. Rus., Approximate Bayesian computation by subset
simulation. SIAM J. Sci. Comput., 36(3):A1339–A1358, 2014/10/03 2014.
Parameters
----------
root_models : list
A list of the probabilistic models corresponding to the observed datasets
distances : list
A list of abcpy.distances.Distance objects, one per model, defining the distances used to compare the simulated and observed data sets.
backend : abcpy.backends.Backend
Backend object defining the backend to be used.
kernel : abcpy.perturbationkernel.PerturbationKernel, optional
Kernel object defining the perturbation kernel needed for the sampling. If not provided, a DefaultKernel is used.
seed : integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
model = None
distance = None
kernel = None
rng = None
anneal_parameter = None
n_samples = None
n_samples_per_param = None
chain_length = None
backend = None
[docs] def __init__(self, root_models, distances, backend, kernel=None, seed=None):
self.model = root_models
# We define the joint Linear combination distance using all the distances for each individual models
self.distance = LinearCombination(root_models, distances)
if kernel is None:
mapping, garbage_index = self._get_mapping()
models = []
for mdl, mdl_index in mapping:
models.append(mdl)
kernel = DefaultKernel(models)
self.kernel = kernel
self.backend = backend
self.rng = np.random.RandomState(seed)
self.anneal_parameter = None
self.logger = logging.getLogger(__name__)
# these are usually big tables, so we broadcast them to have them once
# per executor instead of once per task
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
self.simulation_counter = 0
[docs] def sample(self, observations, steps, n_samples=10000, n_samples_per_param=1, chain_length=10, ap_change_cutoff=10,
full_output=0, journal_file=None):
"""Samples from the posterior distribution of the model parameter given the observed
data observations.
Parameters
----------
observations : list
A list, containing lists describing the observed data sets
steps : integer
Number of iterations in the sequential algorithm ("generations")
n_samples : integer, optional
Number of samples to generate. The default value is 10000.
n_samples_per_param : integer, optional
Number of data points in each simulated data set. The default value is 1.
chain_length : int, optional
The length of the chains; the default value is 10. It should be chosen such that it divides n_samples.
ap_change_cutoff : float, optional
The cutoff value for the percentage change in the anneal parameter. If the change is less than
ap_change_cutoff the iterations are stopped. The default value is 10.
full_output: integer, optional
If full_output==1, intermediate results are included in output journal.
The default value is 0, meaning the intermediate results are not saved.
journal_file: str, optional
Filename of a journal file to read an already saved journal file, from which the first iteration will start.
The default value is None.
Returns
-------
abcpy.output.Journal
A journal containing simulation results, metadata and optionally intermediate results.
"""
self.sample_from_prior(rng=self.rng)
self.accepted_parameters_manager.broadcast(self.backend, observations)
self.chain_length = chain_length
self.n_samples = n_samples
self.n_samples_per_param = n_samples_per_param
if journal_file is None:
journal = Journal(full_output)
journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
journal.configuration["type_dist_func"] = type(self.distance).__name__
journal.configuration["type_kernel_func"] = type(self.kernel)
journal.configuration["n_samples"] = self.n_samples
journal.configuration["n_samples_per_param"] = self.n_samples_per_param
journal.configuration["chain_length"] = self.chain_length
journal.configuration["ap_change_cutoff"] = ap_change_cutoff
journal.configuration["full_output"] = full_output
else:
journal = Journal.fromFile(journal_file)
accepted_parameters = None
accepted_weights = np.ones(shape=(n_samples, 1))
accepted_cov_mat = None
anneal_parameter = 0
anneal_parameter_old = 0
temp_chain_length = 1
for aStep in range(0, steps):
self.logger.info("ABCsubsim step {}".format(aStep))
if aStep == 0 and journal_file is not None:
accepted_parameters = journal.get_accepted_parameters(-1)
accepted_weights = journal.get_weights(-1)
accepted_cov_mats = journal.opt_values[-1]
# main ABCsubsim algorithm
self.logger.info("Initialization of ABCsubsim")
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=int(n_samples / temp_chain_length),
dtype=np.uint32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
index_arr = np.arange(n_samples // temp_chain_length)
rng_and_index_arr = np.column_stack((rng_arr, index_arr))
rng_and_index_pds = self.backend.parallelize(rng_and_index_arr)
# 0: update remotely required variables
self.logger.info("Broadcasting parameters")
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
accepted_parameters=accepted_parameters)
# 1: Calculate parameters
# print("INFO: Initial accepted parameter parameters")
self.logger.info("Initial accepted parameters")
params_and_dists_pds = self.backend.map(self._accept_parameter, rng_and_index_pds)
self.logger.debug("Map random number to a pseudo-observation")
params_and_dists = self.backend.collect(params_and_dists_pds)
self.logger.debug("Collect results from the mapping")
new_parameters, new_distances, counter = [list(t) for t in zip(*params_and_dists)]
for count in counter:
self.simulation_counter += count
if aStep > 0:
accepted_parameters = []
for ind in range(len(new_parameters)):
accepted_parameters += new_parameters[ind]
else:
accepted_parameters = new_parameters
distances = np.concatenate(new_distances)
# 2: Sort and renumber samples
self.logger.debug("Sort and renumber samples.")
accepted_params_and_dist = zip(distances, accepted_parameters)
accepted_params_and_dist = sorted(accepted_params_and_dist, key=lambda x: x[0])
distances, accepted_parameters = [list(t) for t in zip(*accepted_params_and_dist)]
# 3: Calculate and broadcast annealing parameters
self.logger.debug("Calculate and broadcast annealling parameters.")
temp_chain_length = self.chain_length
if aStep > 0:
anneal_parameter_old = anneal_parameter
anneal_parameter = 0.5 * (
distances[int(n_samples / temp_chain_length)] + distances[int(n_samples / temp_chain_length) + 1])
self.anneal_parameter = anneal_parameter
# 4: Update proposal covariance matrix (Parallelized)
self.logger.debug("Update proposal covariance matrix (Parallelized).")
if aStep == 0:
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters)
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
else:
accepted_cov_mats = [2 * accepted_cov_mat for accepted_cov_mat in accepted_cov_mats]
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=10, dtype=np.uint32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
index_arr = np.arange(10)
rng_and_index_arr = np.column_stack((rng_arr, index_arr))
rng_and_index_pds = self.backend.parallelize(rng_and_index_arr)
self.logger.debug("Update co-variance matrix in parallel (map).")
cov_mats_index_pds = self.backend.map(self._update_cov_mat, rng_and_index_pds)
self.logger.debug("Collect co-variance matrix.")
cov_mats_index = self.backend.collect(cov_mats_index_pds)
cov_mats, T, accept_index, counter = [list(t) for t in zip(*cov_mats_index)]
for count in counter:
self.simulation_counter += count
for ind in range(10):
if accept_index[ind] == 1:
accepted_cov_mats = cov_mats[ind]
break
self.logger.debug("Broadcast accepted parameters.")
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
if full_output == 1:
self.logger.info("Saving intermediate configuration to output journal")
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_distances(copy.deepcopy(distances))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
journal.add_opt_values(accepted_cov_mats)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
# Show progress
anneal_parameter_change_percentage = 100 * abs(anneal_parameter_old - anneal_parameter) / abs(
anneal_parameter)
msg = ("step: {}, annealing parameter: {:.4f}, change(%) in annealing parameter: {:.1f}"
.format(aStep, anneal_parameter, anneal_parameter_change_percentage))
self.logger.info(msg)
if anneal_parameter_change_percentage < ap_change_cutoff:
break
# Add anneal_parameter, number of final steps and final output to the journal
# print("INFO: Saving final configuration to output journal.")
if full_output == 0:
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_distances(copy.deepcopy(distances))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
journal.add_opt_values(accepted_cov_mats)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
journal.configuration["steps"] = aStep + 1
journal.configuration["anneal_parameter"] = anneal_parameter
return journal
# define helper functions for map step
def _accept_parameter(self, rng_and_index, npc=None):
"""
Runs an MCMC chain of length chain_length starting from the given accepted parameter,
accepting proposals via a Metropolis-Hastings step that also requires the simulated data
to lie within the current annealing threshold.
Parameters
----------
rng_and_index: numpy.ndarray
2 dimensional array. The first entry is the random number generator to be used.
The second entry defines the index in the data set.
Returns
-------
numpy.ndarray
accepted parameter
"""
rng = rng_and_index[0]
index = rng_and_index[1]
rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
self.accepted_parameters_manager.model)
result_theta = []
result_distance = []
counter = 0
if self.accepted_parameters_manager.accepted_parameters_bds is None:
self.sample_from_prior(rng=rng)
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
result_theta += self.get_parameters()
result_distance.append(distance)
else:
theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index]
self.set_parameters(theta)
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
result_theta.append(theta)
result_distance.append(distance)
for ind in range(0, self.chain_length - 1):
while True:
perturbation_output = self.perturb(index, rng=rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
break
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
new_distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
# Calculate acceptance probability:
ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model,
theta)
kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
perturbation_output[1], theta)
kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, theta,
perturbation_output[1])
ratio_likelihood_prob = kernel_numerator / kernel_denominator
acceptance_prob = min(1, ratio_prior_prob * ratio_likelihood_prob) * (
new_distance < self.anneal_parameter)
# If accepted
if rng.binomial(1, acceptance_prob) == 1:
result_theta.append(perturbation_output[1])
result_distance.append(new_distance)
theta = perturbation_output[1]
distance = new_distance
else:
result_theta.append(theta)
result_distance.append(distance)
return result_theta, result_distance, counter
def _update_cov_mat(self, rng_t, npc=None):
"""
Updates the covariance matrix.
Parameters
----------
rng_t: numpy.ndarray
2 dimensional array. The first entry is the random number generator to be used.
The second entry defines the way in which the accepted covariance matrix is transformed.
Returns
-------
numpy.ndarray
accepted covariance matrix
"""
rng = rng_t[0]
t = rng_t[1]
rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
acceptance = 0
accepted_cov_mats_transformed = [cov_mat * pow(2.0, -2.0 * t) for cov_mat in
self.accepted_parameters_manager.accepted_cov_mats_bds.value()]
theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[0]
mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
self.accepted_parameters_manager.model)
counter = 0
for ind in range(0, self.chain_length):
self.logger.debug("Parameter acceptance loop step {}.".format(ind))
while True:
perturbation_output = self.perturb(0, rng=rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
break
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
new_distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
self.logger.debug("Calculate acceptance probability.")
# Calculate acceptance probability:
ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model,
theta)
kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
perturbation_output[1], theta)
kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, theta,
perturbation_output[1])
ratio_likelihood_prob = kernel_numerator / kernel_denominator
acceptance_prob = min(1, ratio_prior_prob * ratio_likelihood_prob) * (new_distance < self.anneal_parameter)
if rng.binomial(1, acceptance_prob) == 1:
theta = perturbation_output[1]
acceptance = acceptance + 1
self.logger.debug("Return accepted parameters.")
if 0.3 <= acceptance / 10 <= 0.5:
return accepted_cov_mats_transformed, t, 1, counter
else:
return accepted_cov_mats_transformed, t, 0, counter
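# Usage sketch for the ABCsubsim class above (illustrative, not part of the module): the toy
# Gaussian model, the statistics/distance choices and all numbers are assumptions made only
# for this example; any abcpy model, distance and backend can be substituted.
def _example_abcsubsim_usage():
    from abcpy.backends import BackendDummy
    from abcpy.continuousmodels import Normal, Uniform
    from abcpy.distances import Euclidean
    from abcpy.statistics import Identity
    mu = Uniform([[150], [200]], name='mu')
    height = Normal([mu, 15.0], name='height')
    distance = Euclidean(Identity(degree=1, cross=False))
    sampler = ABCsubsim([height], [distance], BackendDummy(), seed=1)
    height_obs = [160.8, 167.4, 171.2, 183.0]
    # chain_length should divide n_samples (see the sample() docstring above)
    journal = sampler.sample([height_obs], steps=3, n_samples=100,
                             n_samples_per_param=1, chain_length=10)
    return journal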
[docs]class RSMCABC(BaseDiscrepancy, InferenceMethod):
"""This class implements Replenishment Sequential Monte Carlo Approximate Bayesian computation of
Drovandi and Pettitt [1].
[1] C. C. Drovandi and A. N. Pettitt, Estimation of parameters for macroparasite population evolution using
approximate Bayesian computation. Biometrics 67(1):225–233, 2011.
Parameters
----------
root_models : list
A list of the probabilistic models corresponding to the observed datasets
distances : list
A list of abcpy.distances.Distance objects, one per model, defining the distance measures used to compare simulated and observed data sets.
backend : abcpy.backends.Backend
Backend object defining the backend to be used.
kernel : abcpy.perturbationkernel.PerturbationKernel, optional
Kernel object defining the perturbation kernel needed for the sampling. If not provided, a DefaultKernel is used.
seed : integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
model = None
distance = None
kernel = None
R = None
rng = None
n_samples = None
n_samples_per_param = None
alpha = None
accepted_dist_bds = None
backend = None
[docs] def __init__(self, root_models, distances, backend, kernel=None, seed=None):
self.model = root_models
# We define the joint Linear combination distance using all the distances for each individual models
self.distance = LinearCombination(root_models, distances)
if kernel is None:
mapping, garbage_index = self._get_mapping()
models = []
for mdl, mdl_index in mapping:
models.append(mdl)
kernel = DefaultKernel(models)
self.kernel = kernel
self.backend = backend
self.logger = logging.getLogger(__name__)
self.R = None
self.rng = np.random.RandomState(seed)
# these are usually big tables, so we broadcast them to have them once
# per executor instead of once per task
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
self.accepted_dist_bds = None
self.simulation_counter = 0
[docs] def sample(self, observations, steps, n_samples=10000, n_samples_per_param=1, alpha=0.1, epsilon_init=100,
epsilon_final=0.1, const=0.01, covFactor=2.0, full_output=0, journal_file=None):
"""
Samples from the posterior distribution of the model parameter given the observed
data observations.
Parameters
----------
observations : list
A list, containing lists describing the observed data sets
steps : integer
Number of iterations in the sequential algorithm ("generations")
n_samples : integer, optional
Number of samples to generate. The default value is 10000.
n_samples_per_param : integer, optional
Number of data points in each simulated data set. The default value is 1.
alpha : float, optional
A parameter taking values in [0, 1]; the default value is 0.1.
epsilon_init : float, optional
Initial value of threshold, the default is 100
epsilon_final : float, optional
Terminal value of threshold, the default is 0.1
const : float, optional
A constant to compute acceptance probability, the default is 0.01.
covFactor : float, optional
scaling parameter of the covariance matrix. The default value is 2.
full_output: integer, optional
If full_output==1, intermediate results are included in output journal.
The default value is 0, meaning the intermediate results are not saved.
journal_file: str, optional
Filename of a journal file to read an already saved journal file, from which the first iteration will start.
The default value is None.
Returns
-------
abcpy.output.Journal
A journal containing simulation results, metadata and optionally intermediate results.
"""
self.sample_from_prior(rng=self.rng)
self.accepted_parameters_manager.broadcast(self.backend, observations)
self.alpha = alpha
self.n_samples = n_samples
self.n_samples_per_param = n_samples_per_param
if journal_file is None:
journal = Journal(full_output)
journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
journal.configuration["type_dist_func"] = type(self.distance).__name__
journal.configuration["n_samples"] = self.n_samples
journal.configuration["n_samples_per_param"] = self.n_samples_per_param
journal.configuration["steps"] = steps
else:
journal = Journal.fromFile(journal_file)
accepted_parameters = None
accepted_cov_mat = None
accepted_dist = None
accepted_weights = None
# main RSMCABC algorithm
for aStep in range(steps):
self.logger.info("RSMCABC iteration {}".format(aStep))
if aStep == 0 and journal_file is not None:
accepted_parameters = journal.get_accepted_parameters(-1)
accepted_weights = journal.get_weights(-1)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
accepted_parameters=accepted_parameters)
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)
# accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats]
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
# 0: Compute epsilon, compute new covariance matrix for Kernel,
# and finally drawing new/perturbed samples using prior or MCMC Kernel
# print("DEBUG: Iteration " + str(aStep) + " of RSMCABC algorithm.")
self.logger.info("Compute epsilon and calculating covariance matrix.")
if aStep == 0:
n_replenish = n_samples
# Compute epsilon
epsilon = [epsilon_init]
R = int(1)
if journal_file is None:
accepted_cov_mats = None
else:
# Compute epsilon
epsilon.append(accepted_dist[-1])
# Calculate covariance
# print("INFO: Calculating covariance matrix.")
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)
# accepted_cov_mats = [covFactor*cov_mat for cov_mat in accepted_cov_mats]
if epsilon[-1] < epsilon_final:
self.logger("accepted epsilon {:e} < {:e}"
.format(epsilon[-1], epsilon_final))
break
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_replenish, dtype=np.uint32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
rng_pds = self.backend.parallelize(rng_arr)
# update remotely required variables
self.logger.info("Broadcasting parameters.")
# print("INFO: Broadcasting parameters.")
self.epsilon = epsilon
self.R = R
self.logger.info("Broadcast updated variable.")
# Broadcast updated variable
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
self._update_broadcasts(accepted_dist)
# calculate resample parameters
self.logger.info("Resampling parameters")
# print("INFO: Resampling parameters")
params_and_dist_index_pds = self.backend.map(self._accept_parameter, rng_pds)
params_and_dist_index = self.backend.collect(params_and_dist_index_pds)
new_parameters, new_dist, new_index, counter = [list(t) for t in zip(*params_and_dist_index)]
new_dist = np.array(new_dist)
new_index = np.array(new_index)
for count in counter:
self.simulation_counter += count
# 1: Update all parameters, compute acceptance probability, compute epsilon
self.logger.info("Append updated new parameters.")
if len(new_dist) == self.n_samples:
accepted_parameters = new_parameters
accepted_dist = new_dist
accepted_weights = np.ones(shape=(len(accepted_parameters), 1)) * (1 / len(accepted_parameters))
else:
accepted_parameters += new_parameters
accepted_dist = np.concatenate((accepted_dist, new_dist))
accepted_weights = np.ones(shape=(len(accepted_parameters), 1)) * (1 / len(accepted_parameters))
if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
self.logger.info("Saving configuration to output journal.")
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_distances(copy.deepcopy(accepted_dist))
journal.add_weights(accepted_weights)
journal.add_ESS_estimate(accepted_weights)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
accepted_parameters=accepted_parameters)
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
# 2: Compute acceptance probability and set R
self.logger.info("Compute acceptance probabilty and set R")
prob_acceptance = sum(new_index) / (R * n_replenish)
if prob_acceptance == 1 or prob_acceptance == 0:
R = 1
else:
R = int(np.log(const) / np.log(1 - prob_acceptance))
self.logger.info("Order accepted parameters and distances")
n_replenish = round(n_samples * alpha)
accepted_params_and_dist = zip(accepted_dist, accepted_parameters)
accepted_params_and_dist = sorted(accepted_params_and_dist, key=lambda x: x[0])
accepted_dist, accepted_parameters = [list(t) for t in zip(*accepted_params_and_dist)]
self.logger.info("Throw away N_alpha particles with largest dist")
# Throw away N_alpha particles with largest distance
del accepted_parameters[self.n_samples - round(n_samples * alpha):]
accepted_dist = np.delete(accepted_dist,
np.arange(round(n_samples * alpha)) + (n_samples - round(n_samples * alpha)),
0)
accepted_weights = np.ones(shape=(len(accepted_parameters), 1)) * (1 / len(accepted_parameters))
self.logger.info("Update parameters, weights")
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_weights=accepted_weights,
accepted_parameters=accepted_parameters)
# Add epsilon_arr to the journal
journal.configuration["epsilon_arr"] = epsilon
return journal
def _update_broadcasts(self, accepted_dist):
def destroy(bc):
if bc is not None:
bc.unpersist
# bc.destroy
if accepted_dist is not None:
self.accepted_dist_bds = self.backend.broadcast(accepted_dist)
# define helper functions for map step
def _accept_parameter(self, rng, npc=None):
"""
Samples a single model parameter and simulates from it until the
distance between the simulated outcome and the observation is
smaller than epsilon.
Parameters
----------
rng: numpy.random.RandomState
Random number generator to be used.
Returns
-------
numpy.ndarray
accepted parameter
"""
rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
distance = self.distance.dist_max()
mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
self.accepted_parameters_manager.model)
counter = 0
if self.accepted_parameters_manager.accepted_parameters_bds is None:
while distance > self.epsilon[-1]:
self.sample_from_prior(rng=rng)
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
index_accept = 1
else:
index = rng.choice(len(self.accepted_parameters_manager.accepted_parameters_bds.value()), size=1)
theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index[0]]
index_accept = 0.0
for ind in range(self.R):
while True:
perturbation_output = self.perturb(index[0], rng=rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
break
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model,
theta)
kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
perturbation_output[1], theta)
kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, theta,
perturbation_output[1])
ratio_kernel_prob = kernel_numerator / kernel_denominator
probability_acceptance = min(1, ratio_prior_prob * ratio_kernel_prob)
if distance < self.epsilon[-1] and rng.binomial(1, probability_acceptance) == 1:
index_accept += 1
else:
self.set_parameters(theta)
distance = self.accepted_dist_bds.value()[index[0]]
return self.get_parameters(self.model), distance, index_accept, counter
def _compute_accepted_cov_mats(self, covFactor, new_cov_mats):
"""
Update the covariance matrices computed from data by multiplying them with covFactor and adding a small term in
the diagonal for numerical stability.
Parameters
----------
covFactor : float
factor to correct the covariance matrices
new_cov_mats : list
list of covariance matrices computed from data
Returns
-------
list
List of new accepted covariance matrices
"""
# accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats]
accepted_cov_mats = []
for new_cov_mat in new_cov_mats:
if not (new_cov_mat.size == 1):
accepted_cov_mats.append(
covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
else:
accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
return accepted_cov_mats
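# Usage sketch for the RSMCABC class above (illustrative): model, distance, backend and the
# observed data list are assumed to be constructed as in the ABCsubsim sketch further above;
# only the sample() call differs, exposing the replenishment-specific parameters.
def _example_rsmcabc_usage(model, distance, backend, observed):
    sampler = RSMCABC([model], [distance], backend, seed=1)
    journal = sampler.sample([observed], steps=5, n_samples=100, n_samples_per_param=1,
                             alpha=0.1, epsilon_init=100, epsilon_final=0.1,
                             const=0.01, covFactor=2.0, full_output=1)
    return journal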
[docs]class APMCABC(BaseDiscrepancy, InferenceMethod):
"""This class implements Adaptive Population Monte Carlo Approximate Bayesian computation of
M. Lenormand et al. [1].
[1] M. Lenormand, F. Jabot and G. Deffuant, Adaptive approximate Bayesian computation
for complex models. Computational Statistics, 28:2777–2796, 2013.
Parameters
----------
root_models : list
A list of the probabilistic models corresponding to the observed datasets
distances : list
A list of abcpy.distances.Distance objects, one per model, defining the distance measures used to compare simulated and observed data sets.
backend : abcpy.backends.Backend
Backend object defining the backend to be used.
kernel : abcpy.perturbationkernel.PerturbationKernel, optional
Kernel object defining the perturbation kernel needed for the sampling. If not provided, a DefaultKernel is used.
seed : integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
model = None
distance = None
kernel = None
epsilon = None
rng = None
n_samples = None
n_samples_per_param = None
alpha = None
accepted_dist = None
backend = None
[docs] def __init__(self, root_models, distances, backend, kernel=None, seed=None):
self.model = root_models
# We define the joint Linear combination distance using all the distances for each individual models
self.distance = LinearCombination(root_models, distances)
if kernel is None:
mapping, garbage_index = self._get_mapping()
models = []
for mdl, mdl_index in mapping:
models.append(mdl)
kernel = DefaultKernel(models)
self.kernel = kernel
self.backend = backend
self.logger = logging.getLogger(__name__)
self.epsilon = None
self.rng = np.random.RandomState(seed)
# these are usually big tables, so we broadcast them to have them once
# per executor instead of once per task
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
self.accepted_dist_bds = None
self.simulation_counter = 0
[docs] def sample(self, observations, steps, n_samples=10000, n_samples_per_param=1, alpha=0.1, acceptance_cutoff=0.03,
covFactor=2.0, full_output=0, journal_file=None):
"""Samples from the posterior distribution of the model parameter given the observed
data observations.
Parameters
----------
observations : list
A list, containing lists describing the observed data sets
steps : integer
Number of iterations in the sequential algorithm ("generations")
n_samples : integer, optional
Number of samples to generate. The default value is 10000.
n_samples_per_param : integer, optional
Number of data points in each simulated data set. The default value is 1.
alpha : float, optional
A parameter taking values in [0, 1]; the default value is 0.1.
acceptance_cutoff : float, optional
Acceptance ratio cutoff; it should be chosen between 0.01 and 0.03. The default value is 0.03.
covFactor : float, optional
scaling parameter of the covariance matrix. The default value is 2.
full_output: integer, optional
If full_output==1, intermediate results are included in output journal.
The default value is 0, meaning the intermediate results are not saved.
journal_file: str, optional
Filename of a journal file to read an already saved journal file, from which the first iteration will start.
The default value is None.
Returns
-------
abcpy.output.Journal
A journal containing simulation results, metadata and optionally intermediate results.
"""
self.sample_from_prior(rng=self.rng)
self.accepted_parameters_manager.broadcast(self.backend, observations)
self.alpha = alpha
self.n_samples = n_samples
self.n_samples_per_param = n_samples_per_param
if journal_file is None:
journal = Journal(full_output)
journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
journal.configuration["type_dist_func"] = type(self.distance).__name__
journal.configuration["n_samples"] = self.n_samples
journal.configuration["n_samples_per_param"] = self.n_samples_per_param
journal.configuration["steps"] = steps
else:
journal = Journal.fromFile(journal_file)
accepted_parameters = None
accepted_weights = None
accepted_cov_mats = None
accepted_dist = None
alpha_accepted_parameters = None
alpha_accepted_weights = None
alpha_accepted_dist = None
# main APMCABC algorithm
# print("INFO: Starting APMCABC iterations.")
for aStep in range(steps):
self.logger.info("APMCABC iteration {}".format(aStep))
if aStep == 0 and journal_file is not None:
accepted_parameters = journal.get_accepted_parameters(-1)
accepted_weights = journal.get_weights(-1)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)
# accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats]
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
alpha_accepted_parameters = accepted_parameters
alpha_accepted_weights = accepted_weights
# 0: Drawing new/perturbed samples using prior or MCMC Kernel
if aStep > 0:
n_additional_samples = n_samples - round(n_samples * alpha)
else:
n_additional_samples = n_samples
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_additional_samples, dtype=np.uint32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
rng_pds = self.backend.parallelize(rng_arr)
# update remotely required variables
self.logger.info("Broadcasting parameters")
self.accepted_parameters_manager.update_broadcast(self.backend,
accepted_parameters=alpha_accepted_parameters,
accepted_weights=alpha_accepted_weights,
accepted_cov_mats=accepted_cov_mats)
self._update_broadcasts(alpha_accepted_dist)
# calculate resample parameters
self.logger.info("Resampling parameters")
params_and_dist_weights_pds = self.backend.map(self._accept_parameter, rng_pds)
params_and_dist_weights = self.backend.collect(params_and_dist_weights_pds)
new_parameters, new_dist, new_weights, counter = [list(t) for t in zip(*params_and_dist_weights)]
new_parameters = np.array(new_parameters)
new_dist = np.array(new_dist)
new_weights = np.array(new_weights).reshape(n_additional_samples, 1)
for count in counter:
self.simulation_counter += count
# 1: Update all parameters, compute acceptance probability, compute epsilon
if len(new_weights) == n_samples:
accepted_parameters = new_parameters
accepted_dist = new_dist
accepted_weights = new_weights
# Compute acceptance probability
prob_acceptance = 1
# Compute epsilon
epsilon = [np.percentile(accepted_dist, alpha * 100)]
else:
accepted_parameters = np.concatenate((alpha_accepted_parameters, new_parameters))
accepted_dist = np.concatenate((alpha_accepted_dist, new_dist))
accepted_weights = np.concatenate((alpha_accepted_weights, new_weights))
# Compute acceptance probability
prob_acceptance = sum(new_dist < epsilon[-1]) / len(new_dist)
# Compute epsilon
epsilon.append(np.percentile(accepted_dist, alpha * 100))
# 2: Update alpha_parameters, alpha_dist and alpha_weights
index_alpha = accepted_dist < epsilon[-1]
alpha_accepted_parameters = accepted_parameters[index_alpha, :]
alpha_accepted_weights = accepted_weights[index_alpha] / sum(accepted_weights[index_alpha])
alpha_accepted_dist = accepted_dist[index_alpha]
# 3: calculate covariance
self.logger.info("Calculating covariance matrix")
self.accepted_parameters_manager.update_broadcast(self.backend,
accepted_parameters=alpha_accepted_parameters,
accepted_weights=alpha_accepted_weights)
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)
# accepted_cov_mats = [covFactor*cov_mat for cov_mat in accepted_cov_mats]
# print("INFO: Saving configuration to output journal.")
if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_distances(copy.deepcopy(accepted_dist))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
self.accepted_parameters_manager.update_broadcast(self.backend,
accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
# 4: Check probability of acceptance lower than acceptance_cutoff
if prob_acceptance < acceptance_cutoff:
break
# Add epsilon_arr to the journal
journal.configuration["epsilon_arr"] = epsilon
return journal
def _update_broadcasts(self, accepted_dist):
def destroy(bc):
if bc is not None:
bc.unpersist
# bc.destroy
self.accepted_dist_bds = self.backend.broadcast(accepted_dist)
# define helper functions for map step
def _accept_parameter(self, rng, npc=None):
"""
Samples a single model parameter, simulates a data set from it and computes its distance to
the observation together with the corresponding importance weight.
Parameters
----------
rng: numpy.random.RandomState
Random number generator to be used.
Returns
-------
numpy.ndarray
accepted parameter
"""
rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
self.accepted_parameters_manager.model)
counter = 0
if self.accepted_parameters_manager.accepted_parameters_bds is None:
self.sample_from_prior(rng=rng)
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
weight = 1.0
else:
index = rng.choice(len(self.accepted_parameters_manager.accepted_weights_bds.value()), size=1,
p=self.accepted_parameters_manager.accepted_weights_bds.value().reshape(-1))
# truncate the normal to the bounds of parameter space of the model
# truncating the normal like this is fine: https://arxiv.org/pdf/0907.4010v1.pdf
while True:
perturbation_output = self.perturb(index[0], rng=rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
break
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
prior_prob = self.pdf_of_prior(self.model, perturbation_output[1])
denominator = 0.0
for i in range(len(self.accepted_parameters_manager.accepted_weights_bds.value())):
pdf_value = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
self.accepted_parameters_manager.accepted_parameters_bds.value()[i],
perturbation_output[1])
denominator += self.accepted_parameters_manager.accepted_weights_bds.value()[i, 0] * pdf_value
weight = 1.0 * prior_prob / denominator
return self.get_parameters(self.model), distance, weight, counter
def _compute_accepted_cov_mats(self, covFactor, new_cov_mats):
"""
Update the covariance matrices computed from data by multiplying them with covFactor and adding a small term in
the diagonal for numerical stability.
Parameters
----------
covFactor : float
factor to correct the covariance matrices
new_cov_mats : list
list of covariance matrices computed from data
Returns
-------
list
List of new accepted covariance matrices
"""
# accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats]
accepted_cov_mats = []
for new_cov_mat in new_cov_mats:
if not (new_cov_mat.size == 1):
accepted_cov_mats.append(
covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
else:
accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
return accepted_cov_mats
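# Usage sketch for the APMCABC class above (illustrative): as before, model, distance,
# backend and the observed data list are assumed to be set up as in the ABCsubsim sketch;
# the algorithm stops once the acceptance rate drops below acceptance_cutoff.
def _example_apmcabc_usage(model, distance, backend, observed):
    sampler = APMCABC([model], [distance], backend, seed=1)
    journal = sampler.sample([observed], steps=5, n_samples=100, n_samples_per_param=1,
                             alpha=0.1, acceptance_cutoff=0.03, covFactor=2.0)
    return journal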
[docs]class SMCABC(BaseDiscrepancy, InferenceMethod):
"""This class implements Sequential Monte Carlo Approximate Bayesian computation of
Del Moral et al. [1].
[1] P. Del Moral, A. Doucet, A. Jasra, An adaptive sequential Monte Carlo method for approximate
Bayesian computation. Statistics and Computing, 22(5):1009–1020, 2012.
[2] A. Lee, "On the choice of MCMC kernels for approximate Bayesian computation with SMC samplers",
Proceedings of the 2012 Winter Simulation Conference (WSC). IEEE, 2012.
Parameters
----------
root_models : list
A list of the probabilistic models corresponding to the observed datasets
distances : list
A list of abcpy.distances.Distance objects, one per model, defining the distance measures used to compare simulated and observed data sets.
backend : abcpy.backends.Backend
Backend object defining the backend to be used.
kernel : abcpy.perturbationkernel.PerturbationKernel, optional
Kernel object defining the perturbation kernel needed for the sampling. If not provided, a DefaultKernel is used.
seed : integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
model = None
distance = None
kernel = None
epsilon = None
rng = None
n_samples = None
n_samples_per_param = None
accepted_y_sim_bds = None
backend = None
[docs] def __init__(self, root_models, distances, backend, kernel=None, seed=None):
self.model = root_models
# We define the joint Linear combination distance using all the distances for each individual models
self.distance = LinearCombination(root_models, distances)
if kernel is None:
mapping, garbage_index = self._get_mapping()
models = []
for mdl, mdl_index in mapping:
models.append(mdl)
kernel = DefaultKernel(models)
self.kernel = kernel
self.backend = backend
self.logger = logging.getLogger(__name__)
self.epsilon = None
self.rng = np.random.RandomState(seed)
# these are usually big tables, so we broadcast them to have them once
# per executor instead of once per task
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
self.accepted_y_sim_bds = None
self.simulation_counter = 0
[docs] def sample(self, observations, steps, n_samples=10000, n_samples_per_param=1, epsilon_final=0.1, alpha=0.95,
covFactor=2, resample=None, full_output=0, which_mcmc_kernel=0, journal_file=None):
"""Samples from the posterior distribution of the model parameter given the observed
data observations.
Parameters
----------
observations : list
A list, containing lists describing the observed data sets
steps : integer
Number of iterations in the sequential algorithm ("generations")
n_samples : integer, optional
Number of samples to generate. The default value is 10000.
n_samples_per_param : integer, optional
Number of data points in each simulated data set. The default value is 1.
epsilon_final : float, optional
The final threshold value of epsilon to be reached; if at some iteration you reach a lower epsilon than
epsilon_final, the algorithm will stop and not proceed with further iterations. The default value is 0.1.
alpha : float, optional
A parameter taking values in [0, 1], determining the rate of change of the threshold epsilon. The
default value is 0.95.
covFactor : float, optional
scaling parameter of the covariance matrix. The default value is 2.
resample : float, optional
Threshold for the resampling step: after the particles have been perturbed and the new weights
have been computed, the particles are resampled if the effective sample size falls below this value.
If not provided, resample is set to 0.5 * n_samples.
full_output: integer, optional
If full_output==1, intermediate results are included in output journal.
The default value is 0, meaning the intermediate results are not saved.
which_mcmc_kernel: integer, optional
Specifies which MCMC kernel is used: 0 uses the kernel suggested in [1]; any other value uses the r-hit
kernel suggested by Anthony Lee [2]. The default value is 0.
journal_file: str, optional
Filename of a journal file to read an already saved journal file, from which the first iteration will start.
The default value is None.
Returns
-------
abcpy.output.Journal
A journal containing simulation results, metadata and optionally intermediate results.
"""
self.sample_from_prior(rng=self.rng)
self.accepted_parameters_manager.broadcast(self.backend, observations)
self.n_samples = n_samples
self.n_samples_per_param = n_samples_per_param
if journal_file is None:
journal = Journal(full_output)
journal.configuration["type_model"] = [type(model).__name__ for model in self.model]
journal.configuration["type_dist_func"] = type(self.distance).__name__
journal.configuration["n_samples"] = self.n_samples
journal.configuration["n_samples_per_param"] = self.n_samples_per_param
journal.configuration["steps"] = steps
# maybe add which kernel I am using?
else:
journal = Journal.fromFile(journal_file)
accepted_parameters = None
accepted_weights = None
accepted_cov_mats = None
accepted_y_sim = None
# Define the resample parameter
if resample is None:
resample = n_samples * 0.5
# Define epsilon_init
epsilon = [10000]
# main SMC ABC algorithm
for aStep in range(0, steps):
self.logger.info("SMCABC iteration {}".format(aStep))
if aStep == 0 and journal_file is not None:
accepted_parameters = journal.get_accepted_parameters(-1)
accepted_weights = journal.get_weights(-1)
accepted_y_sim = journal.opt_values[-1]
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)
# accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats]
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_cov_mats=accepted_cov_mats)
# Break if epsilon in previous step is less than epsilon_final
if epsilon[-1] <= epsilon_final:
break
# 0: Compute the Epsilon
if accepted_y_sim is not None:
self.logger.info("Compute epsilon, might take a while")
# Compute epsilon for next step
fun = lambda epsilon_var: self._compute_epsilon(epsilon_var, \
epsilon, observations, accepted_y_sim, accepted_weights,
n_samples, n_samples_per_param, alpha)
epsilon_new = self._bisection(fun, epsilon_final, epsilon[-1], 0.001)
if epsilon_new < epsilon_final:
epsilon_new = epsilon_final
epsilon.append(epsilon_new)
# 1: calculate weights for new parameters
self.logger.info("Calculating weights")
if accepted_y_sim is not None:
new_weights = np.zeros(shape=n_samples, )
for ind1 in range(n_samples):
numerator = 0.0
denominator = 0.0
for ind2 in range(n_samples_per_param):
numerator += (self.distance.distance(observations, [[accepted_y_sim[ind1][0][ind2]]]) < epsilon[
-1])
denominator += (
self.distance.distance(observations, [[accepted_y_sim[ind1][0][ind2]]]) < epsilon[-2])
if denominator != 0.0:
new_weights[ind1] = accepted_weights[ind1] * (numerator / denominator)
else:
new_weights[ind1] = 0
new_weights = new_weights / sum(new_weights)
else:
new_weights = np.ones(shape=n_samples, ) * (1.0 / n_samples)
# 2: Resample
if accepted_y_sim is not None and pow(sum(pow(new_weights, 2)), -1) < resample:
self.logger.info("Resampling")
# Weighted resampling:
index_resampled = self.rng.choice(n_samples, n_samples, replace=True, p=new_weights)
# accepted_parameters is a list. Then the indexing here does not work:
# accepted_parameters = accepted_parameters[index_resampled]
# do instead:
accepted_parameters = [accepted_parameters[i] for i in
index_resampled] # why don't we use arrays however?
new_weights = np.ones(shape=n_samples, ) * (1.0 / n_samples)
# Update the weights
accepted_weights = new_weights.reshape(len(new_weights), 1)
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights)
if accepted_y_sim is not None:
kernel_parameters = []
for kernel in self.kernel.kernels:
kernel_parameters.append(
self.accepted_parameters_manager.get_accepted_parameters_bds_values(kernel.models))
self.accepted_parameters_manager.update_kernel_values(self.backend, kernel_parameters=kernel_parameters)
accepted_cov_mats = self.kernel.calculate_cov(self.accepted_parameters_manager)
accepted_cov_mats = self._compute_accepted_cov_mats(covFactor, accepted_cov_mats)
# accepted_cov_mats = [covFactor * cov_mat for cov_mat in accepted_cov_mats]
# 3: Drawing new perturbed samples using MCMC Kernel
self.logger.debug("drawing new pertubated samples using mcmc kernel")
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_samples, dtype=np.uint32)
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
index_arr = np.arange(n_samples)
rng_and_index_arr = np.column_stack((rng_arr, index_arr))
rng_and_index_pds = self.backend.parallelize(rng_and_index_arr)
# print("INFO: Broadcasting parameters.")
self.epsilon = epsilon
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters,
accepted_weights=accepted_weights,
accepted_cov_mats=accepted_cov_mats)
self._update_broadcasts(accepted_y_sim)
# calculate resample parameters
self.logger.info("Drawing perturbed sampless")
if which_mcmc_kernel == 0:
params_and_ysim_pds = self.backend.map(self._accept_parameter, rng_and_index_pds)
else:
params_and_ysim_pds = self.backend.map(self._accept_parameter_r_hit_kernel, rng_and_index_pds)
params_and_ysim = self.backend.collect(params_and_ysim_pds)
new_parameters, new_y_sim, distances, counter = [list(t) for t in zip(*params_and_ysim)]
distances = np.array(distances)
for count in counter:
self.simulation_counter += count
# Update the parameters
accepted_parameters = new_parameters
accepted_y_sim = new_y_sim
if (full_output == 1 and aStep <= steps - 1) or (full_output == 0 and aStep == steps - 1):
self.logger.info("Saving configuration to output journal")
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters)
journal.add_accepted_parameters(copy.deepcopy(accepted_parameters))
journal.add_distances(copy.deepcopy(distances))
journal.add_weights(copy.deepcopy(accepted_weights))
journal.add_ESS_estimate(accepted_weights)
journal.add_opt_values(copy.deepcopy(accepted_y_sim))
names_and_parameters = self._get_names_and_parameters()
journal.add_user_parameters(names_and_parameters)
journal.number_of_simulations.append(self.simulation_counter)
# Add epsilon_arr to the journal
journal.configuration["epsilon_arr"] = epsilon
return journal
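# Usage sketch for SMCABC.sample above (illustrative): model, distance, backend and the
# observed data list are assumed to be constructed as in the ABCsubsim sketch further above;
# which_mcmc_kernel selects between the Del Moral kernel (0) and the r-hit kernel of Lee
# (any other value).
def _example_smcabc_usage(model, distance, backend, observed):
    sampler = SMCABC([model], [distance], backend, seed=1)
    journal = sampler.sample([observed], steps=4, n_samples=100, n_samples_per_param=1,
                             epsilon_final=0.1, alpha=0.95, covFactor=2,
                             which_mcmc_kernel=0)
    return journal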
def _compute_epsilon(self, epsilon_new, epsilon, observations, accepted_y_sim, accepted_weights, n_samples,
n_samples_per_param, alpha):
"""
Parameters
----------
epsilon_new: float
Candidate value for the new threshold.
epsilon: list
List of threshold values; the current threshold is the last entry.
observations: numpy.ndarray
Observed data.
accepted_y_sim: numpy.ndarray
Accepted simulated data.
accepted_weights: numpy.ndarray
Accepted weights.
n_samples: integer
Number of samples to generate.
n_samples_per_param: integer
Number of data points in each simulated data set.
alpha: float
Parameter determining the rate of change of the threshold.
Returns
-------
float
Value of the objective function whose root in epsilon_new determines the new threshold; this is what the bisection routine drives to zero.
"""
RHS = alpha * pow(sum(pow(accepted_weights, 2)), -1)
LHS = np.zeros(shape=n_samples, )
for ind1 in range(n_samples):
numerator = 0.0
denominator = 0.0
for ind2 in range(n_samples_per_param):
numerator += (self.distance.distance(observations, [[accepted_y_sim[ind1][0][ind2]]]) < epsilon_new)
denominator += (self.distance.distance(observations, [[accepted_y_sim[ind1][0][ind2]]]) < epsilon[-1])
if denominator == 0:
LHS[ind1] = 0
else:
LHS[ind1] = accepted_weights[ind1] * (numerator / denominator)
if sum(LHS) == 0:
result = RHS
else:
LHS = LHS / sum(LHS)
LHS = pow(sum(pow(LHS, 2)), -1)
result = RHS - LHS
return result
def _bisection(self, func, low, high, tol):
# cache computed values, as we call func below
# several times for the same argument:
func = cached(func)
midpoint = (low + high) / 2.0
while (high - low) / 2.0 > tol:
self.logger.debug("bisection: distance = {:e} > tol = {:e}"
.format((high - low) / 2, tol))
if func(midpoint) == 0:
return midpoint
elif func(low) * func(midpoint) < 0:
high = midpoint
else:
low = midpoint
midpoint = (low + high) / 2.0
return midpoint
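# Minimal sketch (illustrative): the same bisection scheme as above applied to a simple
# monotone function, finding the root of f(x) = x - 0.3 on [0, 1] to tolerance 1e-3.
def _example_bisection(tol=1e-3):
    func = lambda x: x - 0.3
    low, high = 0.0, 1.0
    midpoint = (low + high) / 2.0
    while (high - low) / 2.0 > tol:
        if func(midpoint) == 0:
            return midpoint
        elif func(low) * func(midpoint) < 0:
            high = midpoint
        else:
            low = midpoint
        midpoint = (low + high) / 2.0
    return midpoint  # approximately 0.3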
def _update_broadcasts(self, accepted_y_sim):
def destroy(bc):
if bc is not None:
bc.unpersist
# bc.destroy
if accepted_y_sim is not None:
self.accepted_y_sim_bds = self.backend.broadcast(accepted_y_sim)
# define helper functions for map step
def _accept_parameter(self, rng_and_index, npc=None):
"""
Samples a single model parameter and simulates a data set from it, accepting or rejecting
the move via an MCMC kernel based on how often the simulated data fall within the current
epsilon ball.
Parameters
----------
rng_and_index: numpy.ndarray
2 dimensional array. The first entry is the random number generator to be used.
The second entry defines the index in the data set.
Returns
-------
Tuple
The first entry of the tuple is the accepted parameters. The second entry is the simulated data set.
"""
rng = rng_and_index[0]
index = rng_and_index[1]
rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
self.accepted_parameters_manager.model)
counter = 0
# print("on seed " + str(seed) + " distance: " + str(distance) + " epsilon: " + str(self.epsilon))
if self.accepted_parameters_manager.accepted_parameters_bds is None:
self.sample_from_prior(rng=rng)
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
else:
if self.accepted_parameters_manager.accepted_weights_bds.value()[index] > 0:
theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index]
while True:
perturbation_output = self.perturb(index, rng=rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
break
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
y_sim_old = self.accepted_y_sim_bds.value()[index]
# Calculate acceptance probability:
numerator = 0.0
denominator = 0.0
for ind in range(self.n_samples_per_param):
numerator += (self.distance.distance(self.accepted_parameters_manager.observations_bds.value(),
[[y_sim[0][ind]]]) < self.epsilon[-1])
denominator += (self.distance.distance(self.accepted_parameters_manager.observations_bds.value(),
[[y_sim_old[0][ind]]]) < self.epsilon[-1])
if denominator == 0:
ratio_data_epsilon = 1
else:
ratio_data_epsilon = numerator / denominator
ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model,
theta)
kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
perturbation_output[1], theta)
kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, theta,
perturbation_output[1])
ratio_likelihood_prob = kernel_numerator / kernel_denominator
acceptance_prob = min(1, ratio_data_epsilon * ratio_prior_prob * ratio_likelihood_prob)
if rng.binomial(1, acceptance_prob) == 1:
self.set_parameters(perturbation_output[1])
else:
self.set_parameters(theta)
y_sim = self.accepted_y_sim_bds.value()[index]
else:
self.set_parameters(self.accepted_parameters_manager.accepted_parameters_bds.value()[index])
y_sim = self.accepted_y_sim_bds.value()[index]
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
return self.get_parameters(), y_sim, distance, counter
def _accept_parameter_r_hit_kernel(self, rng_and_index, npc=None):
"""
This implements algorithm 5 in Lee (2012) [2] which is used as an MCMC kernel in SMCABC. This implementation
uses r=3.
Parameters
----------
rng_and_index: numpy.ndarray
2 dimensional array. The first entry is a random number generator.
The second entry defines the index in the data set.
Returns
-------
Tuple
The first entry of the tuple is the accepted parameters. The second entry is the simulated data set.
The third one is the distance between the simulated data set and the observation, while the fourth one is
the number of simulations needed to obtain the accepted parameter.
"""
rng = rng_and_index[0]
index = rng_and_index[1]
rng.seed(rng.randint(np.iinfo(np.uint32).max, dtype=np.uint32))
# Set value of r for r-hit kernel
r = 3
mapping_for_kernels, garbage_index = self.accepted_parameters_manager.get_mapping(
self.accepted_parameters_manager.model)
counter = 0
# print("on seed " + str(seed) + " distance: " + str(distance) + " epsilon: " + str(self.epsilon))
if self.accepted_parameters_manager.accepted_parameters_bds is None:
self.sample_from_prior(rng=rng)
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
counter += 1
else:
if self.accepted_parameters_manager.accepted_weights_bds.value()[index] > 0:
theta = self.accepted_parameters_manager.accepted_parameters_bds.value()[index]
# Sample from theta until we get 'r-1' y_sim inside the epsilon ball
self.set_parameters(theta)
accept_old_arr, y_sim_old_arr, N_old = [], [], 0
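# accept_old_arr collects the indices of simulations that fall inside the epsilon ball,
# while N_old counts the total number of simulations performed under theta.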
while len(accept_old_arr) < r - 1:
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
y_sim_old_arr.append(y_sim)
if self.distance.distance(self.accepted_parameters_manager.observations_bds.value(),
y_sim) < self.epsilon[-1]:
accept_old_arr.append(N_old)
N_old += 1
counter += 1
# Perturb and sample from the perturbed theta until we get 'r' y_sim inside the epsilon ball
while True:
perturbation_output = self.perturb(index, rng=rng)
if perturbation_output[0] and self.pdf_of_prior(self.model, perturbation_output[1]) != 0:
break
accept_new_arr, y_sim_new_arr, N = [], [], 0
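# As above, accept_new_arr collects the indices of hits and N the total number of simulations,
# now under the perturbed parameter.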
while len(accept_new_arr) < r:
y_sim = self.simulate(self.n_samples_per_param, rng=rng, npc=npc)
y_sim_new_arr.append(y_sim)
if self.distance.distance(self.accepted_parameters_manager.observations_bds.value(),
y_sim) < self.epsilon[-1]:
accept_new_arr.append(N)
counter += 1
N += 1
# Calculate acceptance probability
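# Per the r-hit kernel (Lee (2012) [2], algorithm 5), the acceptance probability uses the ratio
# N_old / (N - 1) of the simulation counts needed to obtain r - 1 and r hits, respectively,
# multiplied by the usual prior and kernel ratios.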
ratio_prior_prob = self.pdf_of_prior(self.model, perturbation_output[1]) / self.pdf_of_prior(self.model,
theta)
kernel_numerator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager,
perturbation_output[1], theta)
kernel_denominator = self.kernel.pdf(mapping_for_kernels, self.accepted_parameters_manager, theta,
perturbation_output[1])
ratio_likelihood_prob = kernel_numerator / kernel_denominator
acceptance_prob = min(1, (N_old / (N - 1)) * ratio_prior_prob * ratio_likelihood_prob)
if rng.binomial(1, acceptance_prob) == 1:
self.set_parameters(perturbation_output[1])
# Randomly sample index J
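# J is drawn uniformly from the indices of the simulations that hit the epsilon ball;
# the corresponding data set becomes the new accepted simulation.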
J = rng.choice(accept_new_arr).astype(int)
y_sim = y_sim_new_arr[J]
else:
self.set_parameters(theta)
y_sim = self.accepted_y_sim_bds.value()[index]
else:
self.set_parameters(self.accepted_parameters_manager.accepted_parameters_bds.value()[index])
y_sim = self.accepted_y_sim_bds.value()[index]
distance = self.distance.distance(self.accepted_parameters_manager.observations_bds.value(), y_sim)
return self.get_parameters(), y_sim, distance, counter
def _compute_accepted_cov_mats(self, covFactor, new_cov_mats):
"""
Update the covariance matrices computed from data by multiplying them with covFactor and adding a small term in
the diagonal for numerical stability.
Parameters
----------
covFactor : float
factor to correct the covariance matrices
new_cov_mats : list
list of covariance matrices computed from data
Returns
-------
list
List of new accepted covariance matrices
"""
accepted_cov_mats = []
for new_cov_mat in new_cov_mats:
if not (new_cov_mat.size == 1):
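# Multidimensional case: scale the covariance and add a small ridge (0.01% of its trace)
# on the diagonal to keep it positive definite.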
accepted_cov_mats.append(
covFactor * new_cov_mat + 0.0001 * np.trace(new_cov_mat) * np.eye(new_cov_mat.shape[0]))
else:
accepted_cov_mats.append((covFactor * new_cov_mat + 0.0001 * new_cov_mat).reshape(1, 1))
return accepted_cov_mats