from abc import ABCMeta, abstractmethod
import numpy as np
from scipy.special import gamma
from scipy.stats import multivariate_normal
from abcpy.probabilisticmodels import Continuous
[docs]class PerturbationKernel(metaclass=ABCMeta):
"""This abstract base class represents all perturbation kernels"""
[docs] @abstractmethod
def __init__(self, models):
"""
Parameters
----------
models: list
The list of abcpy.probabilisticmodel objects that should be perturbed by this kernel.
"""
raise NotImplementedError
[docs] @abstractmethod
def calculate_cov(self, accepted_parameters_manager, kernel_index):
"""
Calculates the covariance matrix for the kernel.
Parameters
----------
accepted_parameters_manager: abcpy.acceptedparametersmanager object
The accepted parameters manager that manages all bds objects.
kernel_index: integer
The index of the kernel in the list of kernels of the joint perturbation kernel.
Returns
-------
numpy.ndarray:
The covariance matrix for the kernel.
"""
raise NotImplementedError
[docs] @abstractmethod
def update(self, accepted_parameters_manager, row_index, rng):
"""
Perturbs the parameters for this kernel.
Parameters
----------
accepted_parameters_manager: abcpy.acceptedparametersmanager object
The accepted parameters manager that manages all bds objects.
row_index: integer
The index of the accepted parameters bds that should be perturbed.
rng: random number generator
The random number generator to be used.
Returns
-------
numpy.ndarray:
The perturbed parameters.
"""
raise NotImplementedError
[docs] def pdf(self, accepted_parameters_manager, kernel_index, row_index, x):
"""
Calculates the pdf of the kernel at point x.
Parameters
----------
accepted_parameters_manager: abcpy.acceptedparametersmanager object
The accepted parameters manager that manages all bds objects.
kernel_index: integer
The index of the kernel in the list of kernels of the joint perturbation kernel.
row_index: integer
The index of the accepted parameters bds for which the pdf should be evaluated.
x: list or float
The point at which the pdf should be evaluated.
Returns
-------
float:
The pdf evaluated at point x.
"""
if isinstance(self, DiscreteKernel):
return self.pmf(accepted_parameters_manager, kernel_index, row_index, x)
else:
raise NotImplementedError
[docs]class ContinuousKernel(metaclass=ABCMeta):
"""This abstract base class represents all perturbation kernels acting on continuous parameters."""
[docs] @abstractmethod
def pdf(self, accepted_parameters_manager, kernel_index, index, x):
raise NotImplementedError
[docs]class DiscreteKernel(metaclass=ABCMeta):
"""This abstract base class represents all perturbation kernels acting on discrete parameters."""
[docs] @abstractmethod
def pmf(self, accepted_parameters_manager, kernel_index, index, x):
raise NotImplementedError
[docs]class JointPerturbationKernel(PerturbationKernel):
[docs] def __init__(self, kernels):
"""
This class joins different kernels to make up the overall perturbation kernel. Any user-implemented
perturbation kernel should derive from this class. Any kernels defined on their own should be joined in the end
using this class.
Parameters
----------
kernels: list
List of abcpy.PerturbationKernels
"""
self._check_kernels(kernels)
self.kernels = kernels
[docs] def calculate_cov(self, accepted_parameters_manager):
"""
Calculates the covariance matrix corresponding to each kernel. Commonly used before calculating weights to avoid
repeated calculation.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
The AcceptedParametersManager to be uesd.
Returns
-------
list
Each entry corresponds to the covariance matrix of the corresponding kernel.
"""
all_covs = []
for kernel_index, kernel in enumerate(self.kernels):
all_covs.append(kernel.calculate_cov(accepted_parameters_manager, kernel_index))
return all_covs
def _check_kernels(self, kernels):
"""
Checks whether each model is only used in one perturbation kernel. Commonly called from the constructor.
Parameters
----------
kernels: list
List of abcpy.PertubationKernels
"""
models = []
for kernel in kernels:
for model in kernel.models:
for already_contained_model in models:
if already_contained_model == model:
raise ValueError("No two kernels can perturb the same probabilistic model.")
models.append(model)
[docs] def update(self, accepted_parameters_manager, row_index, rng=np.random.RandomState()):
"""
Perturbs the parameter values contained in accepted_parameters_manager. Commonly used while perturbing.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
Defines the AcceptedParametersManager to be used.
row_index: integer
The index of the row that should be considered from the accepted_parameters_bds matrix.
rng: random number generator
The random number generator to be used.
Returns
-------
list
The list contains tuples. Each tuple contains as the first entry a probabilistic model and as the second
entry the perturbed parameter values corresponding to this model.
"""
perturbed_values = []
# Perturb values according to each kernel defined
for kernel_index, kernel in enumerate(self.kernels):
perturbed_values.append(kernel.update(accepted_parameters_manager, kernel_index, row_index, rng=rng))
perturbed_values_including_models = []
# Match the results from the perturbations and their models
for kernel_index, kernel in enumerate(self.kernels):
index = 0
for model in kernel.models:
model_values = []
# for j in range(model.get_output_dimension()):
model_values.append(perturbed_values[kernel_index][index])
index += 1
perturbed_values_including_models.append((model, model_values))
return perturbed_values_including_models
[docs] def pdf(self, mapping, accepted_parameters_manager, mean, x):
"""
Calculates the overall pdf of the kernel. Commonly used to calculate weights.
Parameters
----------
mapping: list
Each entry is a tuple of which the first entry is a abcpy.ProbabilisticModel object, the second entry is the
index in the accepted_parameters_bds list corresponding to an output of this model.
accepted_parameters_manager: abcpy.AcceptedParametersManager object
The AcceptedParametersManager to be used.
index: integer
The row to be considered in the accepted_parameters_bds matrix.
x: The point at which the pdf should be evaluated.
Returns
-------
float
The pdf evaluated at point x.
"""
result = 1.
for kernel_index, kernel in enumerate(self.kernels):
# Define a list containing the parameter values relevant to the current kernel
mean_kernel, theta = [], []
for kernel_model in kernel.models:
for model, model_output_index in mapping:
if kernel_model == model:
theta.append(x[model_output_index])
mean_kernel.append(mean[model_output_index])
result *= kernel.pdf(accepted_parameters_manager, kernel_index, mean_kernel, theta)
return result
[docs]class MultivariateNormalKernel(PerturbationKernel, ContinuousKernel):
"""This class defines a kernel perturbing the parameters using a multivariate normal distribution."""
[docs] def __init__(self, models):
self.models = models
[docs] def calculate_cov(self, accepted_parameters_manager, kernel_index):
"""
Calculates the covariance matrix relevant to this kernel.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
AcceptedParametersManager to be used.
kernel_index: integer
The index of the kernel in the list of kernels of the joint kernel.
Returns
-------
list
The covariance matrix corresponding to this kernel.
"""
continuous_model = [[] for i in
range(len(accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index]))]
for i in range(len(accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index])):
if isinstance(accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index][i][0],
(np.float, np.float32, np.float64, np.int, np.int32, np.int64)):
continuous_model[i] = accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index][i]
else:
continuous_model[i] = np.concatenate(
accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index][i])
continuous_model = np.array(continuous_model).astype(float)
if accepted_parameters_manager.accepted_weights_bds is not None:
weights = accepted_parameters_manager.accepted_weights_bds.value()
cov = np.cov(continuous_model, aweights=weights.reshape(-1).astype(float), rowvar=False)
else:
cov = np.cov(continuous_model, rowvar=False)
return cov
[docs] def update(self, accepted_parameters_manager, kernel_index, row_index, rng=np.random.RandomState()):
"""
Updates the parameter values contained in the accepted_paramters_manager using a multivariate normal distribution.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
Defines the AcceptedParametersManager to be used.
kernel_index: integer
The index of the kernel in the list of kernels in the joint kernel.
row_index: integer
The index of the row that should be considered from the accepted_parameters_bds matrix.
rng: random number generator
The random number generator to be used.
Returns
-------
np.ndarray
The perturbed parameter values.
"""
# Get all current parameter values relevant for this model and the structure
continuous_model_values = accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index]
if isinstance(continuous_model_values[row_index][0],
(np.float, np.float32, np.float64, np.int, np.int32, np.int64)):
# Perturb
cov = np.array(accepted_parameters_manager.accepted_cov_mats_bds.value()[kernel_index]).astype(float)
continuous_model_values = np.array(continuous_model_values).astype(float)
# Perturbed values anc split according to the structure
perturbed_continuous_values = rng.multivariate_normal(continuous_model_values[row_index], cov)
else:
# print('Hello')
# Learn the structure
struct = [[] for i in range(len(continuous_model_values[row_index]))]
for i in range(len(continuous_model_values[row_index])):
struct[i] = continuous_model_values[row_index][i].shape[0]
struct = np.array(struct).cumsum()
continuous_model_values = np.concatenate(continuous_model_values[row_index])
# Perturb
cov = np.array(accepted_parameters_manager.accepted_cov_mats_bds.value()[kernel_index]).astype(float)
continuous_model_values = np.array(continuous_model_values).astype(float)
# Perturbed values anc split according to the structure
perturbed_continuous_values = np.split(rng.multivariate_normal(continuous_model_values, cov), struct)[:-1]
return perturbed_continuous_values
[docs] def pdf(self, accepted_parameters_manager, kernel_index, mean, x):
"""Calculates the pdf of the kernel.
Commonly used to calculate weights.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
The AcceptedParametersManager to be used.
kernel_index: integer
The index of the kernel in the list of kernels in the joint kernel.
index: integer
The row to be considered in the accepted_parameters_bds matrix.
x: The point at which the pdf should be evaluated.
Returns
-------
float
The pdf evaluated at point x.
"""
if isinstance(mean[0], (np.float, np.float32, np.float64, np.int, np.int32, np.int64)):
mean = np.array(mean).astype(float)
cov = np.array(accepted_parameters_manager.accepted_cov_mats_bds.value()[kernel_index]).astype(float)
return multivariate_normal(mean, cov, allow_singular=True).pdf(np.array(x).astype(float))
else:
mean = np.array(np.concatenate(mean)).astype(float)
cov = np.array(accepted_parameters_manager.accepted_cov_mats_bds.value()[kernel_index]).astype(float)
return multivariate_normal(mean, cov, allow_singular=True).pdf(np.concatenate(x))
[docs]class MultivariateStudentTKernel(PerturbationKernel, ContinuousKernel):
[docs] def __init__(self, models, df):
"""This class defines a kernel perturbing the parameters using a multivariate normal distribution.
Parameters
----------
models: list of abcpy.probabilisticmodel objects
The models that should be perturbed using this kernel
df: integer
The degrees of freedom to be used.
"""
self.models = models
self.df = df
[docs] def calculate_cov(self, accepted_parameters_manager, kernel_index):
"""
Calculates the covariance matrix relevant to this kernel.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
AcceptedParametersManager to be used.
kernel_index: integer
The index of the kernel in the list of kernels of the joint kernel.
Returns
-------
list
The covariance matrix corresponding to this kernel.
"""
continuous_model = [[] for i in
range(len(accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index]))]
for i in range(len(accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index])):
if isinstance(accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index][i][0],
(np.float, np.float32, np.float64, np.int, np.int32, np.int64)):
continuous_model[i] = accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index][i]
else:
continuous_model[i] = np.concatenate(
accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index][i])
continuous_model = np.array(continuous_model).astype(float)
if accepted_parameters_manager.accepted_weights_bds is not None:
weights = np.array(accepted_parameters_manager.accepted_weights_bds.value())
cov = np.cov(continuous_model, aweights=weights.reshape(-1).astype(float), rowvar=False)
else:
cov = np.cov(continuous_model, rowvar=False)
return cov
[docs] def update(self, accepted_parameters_manager, kernel_index, row_index, rng=np.random.RandomState()):
"""
Updates the parameter values contained in the accepted_paramters_manager using a multivariate normal distribution.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
Defines the AcceptedParametersManager to be used.
kernel_index: integer
The index of the kernel in the list of kernels in the joint kernel.
row_index: integer
The index of the row that should be considered from the accepted_parameters_bds matrix.
rng: random number generator
The random number generator to be used.
Returns
-------
np.ndarray
The perturbed parameter values.
"""
# Get all parameters relevant to this kernel
continuous_model_values = accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index][row_index]
if isinstance(continuous_model_values[0],
(np.float, np.float32, np.float64, np.int, np.int32, np.int64)):
# Perturb
continuous_model_values = np.array(continuous_model_values)
cov = np.array(accepted_parameters_manager.accepted_cov_mats_bds.value()[kernel_index]).astype(float)
p = len(continuous_model_values)
if self.df == np.inf:
chisq = 1.0
else:
chisq = rng.chisquare(self.df, 1) / self.df
chisq = chisq.reshape(-1, 1).repeat(p, axis=1)
mvn = rng.multivariate_normal(np.zeros(p), cov.astype(float), 1)
perturbed_continuous_values = continuous_model_values + np.divide(mvn, np.sqrt(chisq))[0]
else:
# Learn the structure
struct = [[] for i in range(len(continuous_model_values))]
for i in range(len(continuous_model_values)):
struct[i] = continuous_model_values[i].shape[0]
struct = np.array(struct).cumsum()
continuous_model_values = np.concatenate(continuous_model_values)
# Perturb
cov = np.array(accepted_parameters_manager.accepted_cov_mats_bds.value()[kernel_index]).astype(float)
p = len(continuous_model_values)
if self.df == np.inf:
chisq = 1.0
else:
chisq = rng.chisquare(self.df, 1) / self.df
chisq = chisq.reshape(-1, 1).repeat(p, axis=1)
mvn = rng.multivariate_normal(np.zeros(p), cov.astype(float), 1)
perturbed_continuous_values = continuous_model_values + np.divide(mvn, np.sqrt(chisq))[0]
# Perturbed values anc split according to the structure
perturbed_continuous_values = np.split(perturbed_continuous_values, struct)[:-1]
return perturbed_continuous_values
[docs] def pdf(self, accepted_parameters_manager, kernel_index, mean, x):
"""Calculates the pdf of the kernel.
Commonly used to calculate weights.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
The AcceptedParametersManager to be used.
kernel_index: integer
The index of the kernel in the list of kernels in the joint kernel.
index: integer
The row to be considered in the accepted_parameters_bds matrix.
x: The point at which the pdf should be evaluated.
Returns
-------
float
The pdf evaluated at point x.
"""
if isinstance(mean[0],
(np.float, np.float32, np.float64, np.int, np.int32, np.int64)):
mean = np.array(mean).astype(float)
cov = np.array(accepted_parameters_manager.accepted_cov_mats_bds.value()[kernel_index]).astype(float)
v = self.df
p = len(mean)
numerator = gamma((v + p) / 2)
denominator = gamma(v / 2) * pow(v * np.pi, p / 2.) * np.sqrt(abs(np.linalg.det(cov)))
normalizing_const = numerator / denominator
tmp = 1 + 1 / v * np.dot(np.dot(np.transpose(x - mean), np.linalg.inv(cov)), (x - mean))
density = normalizing_const * pow(tmp, -((v + p) / 2.))
return density
else:
mean = np.array(np.concatenate(mean)).astype(float)
cov = np.array(accepted_parameters_manager.accepted_cov_mats_bds.value()[kernel_index]).astype(float)
v = self.df
p = len(mean)
numerator = gamma((v + p) / 2)
denominator = gamma(v / 2) * pow(v * np.pi, p / 2.) * np.sqrt(abs(np.linalg.det(cov)))
normalizing_const = numerator / denominator
tmp = 1 + 1 / v * np.dot(np.dot(np.transpose(np.concatenate(x) - mean), np.linalg.inv(cov)),
(np.concatenate(x) - mean))
density = normalizing_const * pow(tmp, -((v + p) / 2.))
return density
[docs]class RandomWalkKernel(PerturbationKernel, DiscreteKernel):
[docs] def __init__(self, models):
"""
This class defines a kernel perturbing discrete parameters using a naive random walk.
Parameters
----------
models: list
List of abcpy.ProbabilisticModel objects
"""
self.models = models
[docs] def update(self, accepted_parameters_manager, kernel_index, row_index, rng=np.random.RandomState()):
"""
Updates the parameter values contained in the accepted_paramters_manager using a random walk.
Parameters
----------
accepted_parameters_manager: abcpy.AcceptedParametersManager object
Defines the AcceptedParametersManager to be used.
row_index: integer
The index of the row that should be considered from the accepted_parameters_bds matrix.
rng: random number generator
The random number generator to be used.
Returns
-------
np.ndarray
The perturbed parameter values.
"""
# Get parameter values relevant to this kernel
discrete_model_values = accepted_parameters_manager.kernel_parameters_bds.value()[kernel_index]
perturbed_discrete_values = []
discrete_model_values = np.array(discrete_model_values)[row_index]
# Implement a random walk for the discrete parameter values
for discrete_value in discrete_model_values:
perturbed_discrete_values.append(np.array([rng.randint(discrete_value - 1, discrete_value + 2)]))
return perturbed_discrete_values
[docs] def calculate_cov(self, accepted_parameters_manager, kernel_index):
"""
Calculates the covariance matrix of this kernel. Since there is no covariance matrix associated with this
random walk, it returns an empty list.
"""
return np.array([0]).reshape(-1, )
[docs] def pmf(self, accepted_parameters_manager, kernel_index, mean, x):
"""
Calculates the pmf of the kernel. Commonly used to calculate weights.
Parameters
----------
cov: list
The covariance matrix used for this kernel. This is a dummy input.
accepted_parameters_manager: abcpy.AcceptedParametersManager object
The AcceptedParametersManager to be used.
kernel_index: integer
The index of the kernel in the list of kernels of the joint kernel.
index: integer
The row to be considered in the accepted_parameters_bds matrix.
x: The point at which the pdf should be evaluated.
Returns
-------
float
The pmf evaluated at point x.
"""
return 1. / 3
[docs]class DefaultKernel(JointPerturbationKernel):
[docs] def __init__(self, models):
"""
This class implements a kernel that perturbs all continuous parameters using a multivariate normal, and all
discrete parameters using a random walk. To be used as an example for user defined kernels.
Parameters
----------
models: list
List of abcpy.ProbabilisticModel objects, the models for which the kernel should be defined.
"""
continuous_models = []
discrete_models = []
for model in models:
if isinstance(model, Continuous):
continuous_models.append(model)
else:
discrete_models.append(model)
continuous_kernel = MultivariateNormalKernel(continuous_models)
discrete_kernel = RandomWalkKernel(discrete_models)
if not continuous_models:
super(DefaultKernel, self).__init__([discrete_kernel])
elif not discrete_models:
super(DefaultKernel, self).__init__([continuous_kernel])
else:
super(DefaultKernel, self).__init__([continuous_kernel, discrete_kernel])