import numpy as np
import warnings
from abc import ABCMeta, abstractmethod
from glmnet import LogitNet
from sklearn import linear_model
from sklearn.neighbors import NearestNeighbors
from abcpy.utils import wass_dist
[docs]class Distance(metaclass=ABCMeta):
"""This abstract base class defines how the distance between the observed and
simulated data should be implemented.
"""
[docs] def __init__(self, statistics_calc):
"""The constructor of a sub-class must accept a non-optional statistics
calculator as a parameter; then, it must call the __init__ method of the parent class. This ensures that the
object is initialized correctly so that the _calculate_summary_stat private method can be called when computing
the distances.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
"""
self.statistics_calc = statistics_calc
# Since the observations do always stay the same, we can save the
# summary statistics of them and not recalculate it each time
self.s1 = None
self.data_set = None
self.dataSame = False
[docs] @abstractmethod
def distance(self, d1, d2):
"""To be overwritten by any sub-class: should calculate the distance between two
sets of data d1 and d2 using their respective statistics.
Usually, calling the _calculate_summary_stat private method to obtain statistics from the datasets is handy;
that also keeps track of the first provided dataset (which is the observation in ABCpy inference schemes) and
avoids computing the statistics for that multiple times.
Notes
-----
The data sets d1 and d2 are array-like structures that contain n1 and n2 data
points each. An implementation of the distance function should work along
the following steps:
1. Transform both input sets dX = [ dX1, dX2, ..., dXn ] to sX = [sX1, sX2,
..., sXn] using the statistics object. See _calculate_summary_stat method.
2. Calculate the mutual desired distance, here denoted by - between the
statistics; for instance, dist = [s11 - s21, s12 - s22, ..., s1n - s2n] (in some cases however you
may want to compute all pairwise distances between statistics elements.
Important: any sub-class must not calculate the distance between data sets
d1 and d2 directly. This is the reason why any sub-class must be
initialized with a statistics object.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
raise NotImplementedError
[docs] @abstractmethod
def dist_max(self):
"""To be overwritten by sub-class: should return maximum possible value of the
desired distance function.
Examples
--------
If the desired distance maps to :math:`\mathbb{R}`, this method should return numpy.inf.
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
raise NotImplementedError
def _calculate_summary_stat(self, d1, d2):
"""Helper function that extracts the summary statistics s1 and s2 from d1 and
d2 using the statistics object stored in self.statistics_calc. This stores s1 for the purpose of checking
whether that is repeated in next calls to the function, and avoiding computing the statitistics for the same
dataset several times.
Parameters
----------
d1 : array-like
d1 contains n data sets.
d2 : array-like
d2 contains m data sets.
Returns
-------
tuple
Tuple containing numpy.ndarray's with the summary statistics extracted from d1 and d2.
"""
if not isinstance(d1, list):
raise TypeError('Data is not of allowed types')
if not isinstance(d2, list):
raise TypeError('Data is not of allowed types')
# Check whether d1 is same as self.data_set
if self.data_set is not None:
# check that the the observations have the same length; if not, they can't be the same:
if len(d1) != len(self.data_set):
self.dataSame = False
elif len(np.array(d1[0]).reshape(-1, )) == 1:
self.dataSame = self.data_set == d1
else:
self.dataSame = all([(np.array(self.data_set[i]) == np.array(d1[i])).all() for i in range(len(d1))])
# Extract summary statistics from the dataset
if self.s1 is None or self.dataSame is False:
self.s1 = self.statistics_calc.statistics(d1)
self.data_set = d1
s2 = self.statistics_calc.statistics(d2)
if self.s1.shape[1] != s2.shape[1]:
raise ValueError("The dimension of summaries in the two datasets is different; check the dimension of the"
" provided observations and simulations.")
return self.s1, s2
[docs]class Divergence(Distance, metaclass=ABCMeta):
"""This is an abstract class which subclasses Distance, and is used as a parent class for all divergence
estimators; more specifically, it is used for all Distances which compare the empirical distribution of simulations
and observations."""
@abstractmethod
def _estimate_always_positive(self):
"""This returns whether the implemented divergence always returns positive values or not. In fact, some
estimators may return negative values, which may break some inference algorithms"""
raise NotImplementedError
[docs]class Euclidean(Distance):
"""
This class implements the Euclidean distance between two vectors.
The maximum value of the distance is np.inf.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
"""
[docs] def __init__(self, statistics_calc):
super(Euclidean, self).__init__(statistics_calc)
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets, by computing Euclidean distance between each element of d1 and
d2 and taking their average.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
# compute distance between the statistics
dist = np.zeros(shape=(s1.shape[0], s2.shape[0]))
for ind1 in range(0, s1.shape[0]):
for ind2 in range(0, s2.shape[0]):
dist[ind1, ind2] = np.sqrt(np.sum(pow(s1[ind1, :] - s2[ind2, :], 2)))
return dist.mean()
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
return np.inf
[docs]class PenLogReg(Divergence):
"""
This class implements a distance measure based on the classification accuracy.
The classification accuracy is calculated between two dataset d1 and d2 using
lasso penalized logistics regression and return it as a distance. The lasso
penalized logistic regression is done using glmnet package of Friedman et. al.
[2]. While computing the distance, the algorithm automatically chooses
the most relevant summary statistics as explained in Gutmann et. al. [1].
The maximum value of the distance is 1.0.
[1] Gutmann, M. U., Dutta, R., Kaski, S., & Corander, J. (2018). Likelihood-free inference via classification.
Statistics and Computing, 28(2), 411-425.
[2] Friedman, J., Hastie, T., and Tibshirani, R. (2010). Regularization
paths for generalized linear models via coordinate descent. Journal of Statistical
Software, 33(1), 1–22.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
"""
[docs] def __init__(self, statistics_calc):
super(PenLogReg, self).__init__(statistics_calc)
self.n_folds = 10 # for cross validation in PenLogReg
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
self.n_simulate = s1.shape[0]
if not s2.shape[0] == self.n_simulate:
raise RuntimeError("The number of simulations in the two data sets should be the same in order for "
"the classification accuracy implemented in PenLogReg to be a proper distance. Please "
"check that `n_samples` in the `sample()` method for the sampler is equal to "
"the number of datasets in the observations.")
# compute distance between the statistics
training_set_features = np.concatenate((s1, s2), axis=0)
label_s1 = np.zeros(shape=(len(s1), 1))
label_s2 = np.ones(shape=(len(s2), 1))
training_set_labels = np.concatenate((label_s1, label_s2), axis=0).ravel()
groups = np.repeat(np.arange(self.n_folds), np.int(np.ceil(self.n_simulate / self.n_folds)))
groups = groups[:self.n_simulate].tolist()
groups += groups # duplicate it as groups need to be defined for both datasets
m = LogitNet(alpha=1, n_splits=self.n_folds) # note we are not using random seed here!
m = m.fit(training_set_features, training_set_labels, groups=groups)
distance = 2.0 * (m.cv_mean_score_[np.where(m.lambda_path_ == m.lambda_max_)[0][0]] - 0.5)
return distance
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
return 1.0
def _estimate_always_positive(self):
return False
[docs]class LogReg(Divergence):
"""This class implements a distance measure based on the classification
accuracy [1]. The classification accuracy is calculated between two dataset d1 and d2 using
logistics regression and return it as a distance. The maximum value of the distance is 1.0.
The logistic regression may not converge when using one single sample in each dataset (as for instance by putting
n_samples_per_param=1 in an inference routine).
[1] Gutmann, M. U., Dutta, R., Kaski, S., & Corander, J. (2018). Likelihood-free inference via classification.
Statistics and Computing, 28(2), 411-425.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
seed : integer, optionl
Seed used to initialize the Random Numbers Generator used to determine the (random) cross validation split
in the Logistic Regression classifier.
"""
[docs] def __init__(self, statistics_calc, seed=None):
super(LogReg, self).__init__(statistics_calc)
# seed is used for a RandomState for the random split in the LogisticRegression classifier:
self.rng = np.random.RandomState(seed=seed)
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
# compute distance between the statistics
training_set_features = np.concatenate((s1, s2), axis=0)
label_s1 = np.zeros(shape=(len(s1), 1))
label_s2 = np.ones(shape=(len(s2), 1))
training_set_labels = np.concatenate((label_s1, label_s2), axis=0).ravel()
reg_inv = 1e5
log_reg_model = linear_model.LogisticRegression(C=reg_inv, penalty='l1', max_iter=1000, solver='liblinear',
random_state=self.rng.randint(0, np.iinfo(np.uint32).max))
log_reg_model.fit(training_set_features, training_set_labels)
score = log_reg_model.score(training_set_features, training_set_labels)
distance = 2.0 * (score - 0.5)
return distance
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
return 1.0
def _estimate_always_positive(self):
return False
[docs]class Wasserstein(Divergence):
"""This class implements a distance measure based on the 2-Wasserstein distance, as used in [1]. This considers the
several simulations/observations in the datasets as iid samples from the model for a fixed parameter value/from the
data generating model, and computes the 2-Wasserstein distance between the empirical distributions those
simulations/observations define.
[1] Bernton, E., Jacob, P.E., Gerber, M. and Robert, C.P. (2019), Approximate Bayesian computation with the
Wasserstein distance. J. R. Stat. Soc. B, 81: 235-269. doi:10.1111/rssb.12312
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
num_iter_max : integer, optional
The maximum number of iterations in the linear programming algorithm to estimate the Wasserstein distance.
Default to 100000.
"""
[docs] def __init__(self, statistics_calc, num_iter_max=100000):
super(Wasserstein, self).__init__(statistics_calc)
self.num_iter_max = num_iter_max
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
# compute the Wasserstein distance between the empirical distributions:
return wass_dist(samples_1=s1, samples_2=s2, num_iter_max=self.num_iter_max)
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
# As the statistics are positive, the max possible value is 1
return np.inf
def _estimate_always_positive(self):
return True
[docs]class SlicedWasserstein(Divergence):
"""This class implements a distance measure based on the sliced 2-Wasserstein distance, as used in [1].
This considers the several simulations/observations in the datasets as iid samples from the model for a fixed
parameter value/from the data generating model, and computes the sliced 2-Wasserstein distance between the
empirical distributions those simulations/observations define. Specifically, the sliced Wasserstein distance
is a cheaper version of the Wasserstein distance which consists of projecting the multivariate data on 1d directions
and computing the 1d Wasserstein distance, which is computationally cheap. The resulting sliced Wasserstein
distance is obtained by averaging over a given number of projections.
[1] Nadjahi, K., De Bortoli, V., Durmus, A., Badeau, R., & Şimşekli, U. (2020, May). Approximate bayesian
computation with the sliced-wasserstein distance. In ICASSP 2020-2020 IEEE International Conference on Acoustics,
Speech and Signal Processing (ICASSP) (pp. 5470-5474). IEEE.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
n_projections : int, optional
Number of 1d projections used for estimating the sliced Wasserstein distance. Default value is 50.
rng : np.random.RandomState, optional
random number generators used to generate the projections. If not provided, a new one is instantiated.
"""
[docs] def __init__(self, statistics_calc, n_projections=50, rng=np.random.RandomState()):
super(SlicedWasserstein, self).__init__(statistics_calc)
self.n_projections = n_projections
self.rng = rng
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
# compute the Wasserstein distance between the empirical distributions:
return self.sliced_wasserstein_distance(X_s=s1, X_t=s2, n_projections=self.n_projections, seed=self.rng)
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
# As the statistics are positive, the max possible value is 1
return np.inf
def _estimate_always_positive(self):
return True
# the following two functions are taken from
# https://github.com/PythonOT/POT/blob/78b44af2434f494c8f9e4c8c91003fbc0e1d4415/ot/sliced.py
# Author: Adrien Corenflos <adrien.corenflos@aalto.fi>
#
# License: MIT License
[docs] @staticmethod
def get_random_projections(n_projections, d, seed=None):
r"""
Taken from
https://github.com/PythonOT/POT/blob/78b44af2434f494c8f9e4c8c91003fbc0e1d4415/ot/sliced.py
Author: Adrien Corenflos <adrien.corenflos@aalto.fi>
License: MIT License
Generates n_projections samples from the uniform on the unit sphere of dimension d-1: :math:`\mathcal{U}(\mathcal{S}^{d-1})`
Parameters
----------
n_projections : int
number of samples requested
d : int
dimension of the space
seed: int or RandomState, optional
Seed used for numpy random number generator
Returns
-------
out: ndarray, shape (n_projections, d)
The uniform unit vectors on the sphere
Examples
--------
>>> n_projections = 100
>>> d = 5
>>> projs = get_random_projections(n_projections, d)
>>> np.allclose(np.sum(np.square(projs), 1), 1.) # doctest: +NORMALIZE_WHITESPACE
True
"""
if not isinstance(seed, np.random.RandomState):
random_state = np.random.RandomState(seed)
else:
random_state = seed
projections = random_state.normal(0., 1., [n_projections, d])
norm = np.linalg.norm(projections, ord=2, axis=1, keepdims=True)
projections = projections / norm
return projections
[docs] def sliced_wasserstein_distance(self, X_s, X_t, a=None, b=None, n_projections=50, seed=None, log=False):
r"""
Taken from
https://github.com/PythonOT/POT/blob/78b44af2434f494c8f9e4c8c91003fbc0e1d4415/ot/sliced.py
Author: Adrien Corenflos <adrien.corenflos@aalto.fi>
License: MIT License
Computes a Monte-Carlo approximation of the 2-Sliced Wasserstein distance
:math:`\mathcal{SWD}_2(\mu, \nu) = \underset{\theta \sim \mathcal{U}(\mathbb{S}^{d-1})}{\mathbb{E}}[\mathcal{W}_2^2(\theta_\# \mu, \theta_\# \nu)]^{\frac{1}{2}}`
where
:math:`\theta_\# \mu` stands for the pushforwars of the projection :math:`\mathbb{R}^d \ni X \mapsto \langle \theta, X \rangle`
Parameters
----------
X_s : ndarray, shape (n_samples_a, dim)
samples in the source domain
X_t : ndarray, shape (n_samples_b, dim)
samples in the target domain
a : ndarray, shape (n_samples_a,), optional
samples weights in the source domain
b : ndarray, shape (n_samples_b,), optional
samples weights in the target domain
n_projections : int, optional
Number of projections used for the Monte-Carlo approximation
seed: int or RandomState or None, optional
Seed used for numpy random number generator
log: bool, optional
if True, sliced_wasserstein_distance returns the projections used and their associated EMD.
Returns
-------
cost: float
Sliced Wasserstein Cost
log : dict, optional
log dictionary return only if log==True in parameters
Examples
--------
>>> n_samples_a = 20
>>> reg = 0.1
>>> X = np.random.normal(0., 1., (n_samples_a, 5))
>>> sliced_wasserstein_distance(X, X, seed=0) # doctest: +NORMALIZE_WHITESPACE
0.0
References
----------
Bonneel, Nicolas, et al. "Sliced and radon wasserstein barycenters of measures." Journal of Mathematical Imaging and Vision 51.1 (2015): 22-45
"""
from ot.lp import emd2_1d
X_s = np.asanyarray(X_s)
X_t = np.asanyarray(X_t)
n = X_s.shape[0]
m = X_t.shape[0]
if X_s.shape[1] != X_t.shape[1]:
raise ValueError(
"X_s and X_t must have the same number of dimensions {} and {} respectively given".format(X_s.shape[1],
X_t.shape[1]))
if a is None:
a = np.full(n, 1 / n)
if b is None:
b = np.full(m, 1 / m)
d = X_s.shape[1]
projections = self.get_random_projections(n_projections, d, seed)
X_s_projections = np.dot(projections, X_s.T)
X_t_projections = np.dot(projections, X_t.T)
if log:
projected_emd = np.empty(n_projections)
else:
projected_emd = None
res = 0.
for i, (X_s_proj, X_t_proj) in enumerate(zip(X_s_projections, X_t_projections)):
emd = emd2_1d(X_s_proj, X_t_proj, a, b, log=False, dense=False)
if projected_emd is not None:
projected_emd[i] = emd
res += emd
res = (res / n_projections) ** 0.5
if log:
return res, {"projections": projections, "projected_emds": projected_emd}
return res
[docs]class GammaDivergence(Divergence):
"""
This implements an empirical estimator of the gamma-divergence for ABC as suggested in [1]. In [1], the
gamma-divergence was proposed as a divergence which is robust to outliers. The estimator is based on a nearest
neighbor density estimate.
Specifically, this considers the
several simulations/observations in the datasets as iid samples from the model for a fixed parameter value/from the
data generating model, and estimates the divergence between the empirical distributions those
simulations/observations define.
[1] Fujisawa, M., Teshima, T., Sato, I., & Sugiyama, M.
γ-ABC: Outlier-robust approximate Bayesian computation based on a
robust divergence estimator.
In A. Banerjee and K. Fukumizu (Eds.), Proceedings of 24th
International Conference on Artificial Intelligence and Statistics
(AISTATS2021), Proceedings of Machine Learning Research, vol.130,
pp.1783-1791, online, Apr. 13-15, 2021.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
k : int, optional
nearest neighbor number for the density estimate. Default value is 1
gam : float, optional
the gamma parameter in the definition of the divergence. Default value is 0.1
"""
[docs] def __init__(self, statistics_calc, k=1, gam=0.1):
super(GammaDivergence, self).__init__(statistics_calc)
self.k = k # number of nearest neighbors used in the estimation algorithm
self.gam = gam
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
if s1.shape[0] > self.k or s2.shape[0] > self.k:
assert ValueError(f"The provided value of k ({self.k}) is smaller or equal than the number of samples "
f"in one of the two datasets; that should instead be larger")
# estimate the gamma divergence using the empirical distributions
return self.skl_estimator_gamma_q(s1=s1, s2=s2, k=self.k, gam=self.gam)
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
# As the statistics are positive, the max possible value is 1
return np.inf
[docs] @staticmethod
def skl_estimator_gamma_q(s1, s2, k=1, gam=0.1):
""" Gamma-Divergence estimator using scikit-learn's NearestNeighbours
s1: (N_1,D) Sample drawn from distribution P
s2: (N_2,D) Sample drawn from distribution Q
k: Number of neighbours considered (default 1)
return: estimated D(P|Q)
Adapted from code provided by Masahiro Fujisawa (University of Tokyo / RIKEN AIP)
"""
n, m = len(s1), len(s2) # NOTE: here different convention of n, m wrt MMD and EnergyDistance
d = float(s1.shape[1])
radius = 10 # this is not used at all...
s1_neighbourhood = NearestNeighbors(n_neighbors=k + 1, radius=radius, algorithm='kd_tree').fit(s1)
s2_neighbourhood = NearestNeighbors(n_neighbors=k, radius=radius, algorithm='kd_tree').fit(s2)
s3_neighbourhood = NearestNeighbors(n_neighbors=k + 1, radius=radius, algorithm='kd_tree').fit(s2)
d_gam = d * gam
s1_distances, indices = s1_neighbourhood.kneighbors(s1, k + 1)
s2_distances, indices = s2_neighbourhood.kneighbors(s1, k)
rho = s1_distances[:, -1]
nu = s2_distances[:, -1]
if np.any(rho == 0):
warnings.warn(
f"The distance between an element of the first dataset and its {k}-th NN in the same dataset "
f"is 0; this causes divergences in the code, and it is due to elements which are repeated "
f"{k + 1} times in the first dataset. Increasing the value of k usually solves this.",
RuntimeWarning)
# notice: the one below becomes 0 when one element in the s1 dataset is equal to one in the s2 dataset
# and k=1 (as the distance between those two would be 0, which gives infinity when dividing)
if np.any(nu == 0):
warnings.warn(f"The distance between an element of the first dataset and its {k}-th NN in the second "
f"dataset is 0; this causes divergences in the code, and it is usually due to equal "
f"elements"
f" in the two datasets. Increasing the value of k usually solves this.", RuntimeWarning)
second_term = np.sum(1 / (rho ** d_gam)) / (n * (n - 1) ** gam)
fourth_term = np.sum(1 / (nu ** d_gam)) / (n * m ** gam)
s3_distances, indices = s3_neighbourhood.kneighbors(s2, k + 1)
rho_q = s3_distances[:, -1]
if np.any(rho_q == 0):
warnings.warn(
f"The distance between an element of the second dataset and its {k}-th NN in the same dataset "
f"is 0; this causes divergences in the code, and it is due to elements which are repeated "
f"{k + 1} times in the second dataset. Increasing the value of k usually solves this.",
RuntimeWarning)
third_term = np.sum(1 / (rho_q ** d_gam))
# third_term /= m * (m ** gam) # original code: I think the second term here should be m - 1
third_term /= m * (m - 1) ** gam # corrected version
third_term = third_term ** gam
fourth_term = fourth_term ** (1 + gam)
D = (1 / (gam * (gam + 1))) * (np.log((second_term * third_term) / fourth_term))
return D
def _estimate_always_positive(self):
return False
[docs]class KLDivergence(Divergence):
"""
This implements an empirical estimator of the KL divergence for ABC as suggested in [1]. The estimator is based
on a nearest neighbor density estimate.
Specifically, this considers the
several simulations/observations in the datasets as iid samples from the model for a fixed parameter value/from the
data generating model, and estimates the divergence between the empirical distributions those
simulations/observations define.
[1] Jiang, B. (2018, March). Approximate Bayesian computation with Kullback-Leibler divergence as data discrepancy.
In International Conference on Artificial Intelligence and Statistics (pp. 1711-1721). PMLR.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
k : int, optional
nearest neighbor number for the density estimate. Default value is 1
"""
[docs] def __init__(self, statistics_calc, k=1):
super(KLDivergence, self).__init__(statistics_calc)
self.k = k # number of nearest neighbors used in the estimation algorithm
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
if s1.shape[0] > self.k or s2.shape[0] > self.k:
assert ValueError(f"The provided value of k ({self.k}) is smaller or equal than the number of samples "
f"in one of the two datasets; that should instead be larger")
# estimate the KL divergence using the empirical distributions
return self.skl_estimator_KL_div(s1=s1, s2=s2, k=self.k)
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
# As the statistics are positive, the max possible value is 1
return np.inf
[docs] @staticmethod
def skl_estimator_KL_div(s1, s2, k=1):
"""
Adapted from https://github.com/nhartland/KL-divergence-estimators/blob/5473a23f5f13d7557100504611c57c9225b1a6eb/src/knn_divergence.py
MIT license
KL-Divergence estimator using scikit-learn's NearestNeighbours
s1: (N_1,D) Sample drawn from distribution P
s2: (N_2,D) Sample drawn from distribution Q
k: Number of neighbours considered (default 1)
return: estimated D(P|Q)
"""
n, m = len(s1), len(s2) # NOTE: here different convention of n, m wrt MMD and EnergyDistance
d = float(s1.shape[1])
radius = 10 # this is useless
s1_neighbourhood = NearestNeighbors(n_neighbors=k + 1, radius=radius, algorithm='kd_tree').fit(s1)
s2_neighbourhood = NearestNeighbors(n_neighbors=k, radius=radius, algorithm='kd_tree').fit(s2)
s1_distances, indices = s1_neighbourhood.kneighbors(s1, k + 1)
s2_distances, indices = s2_neighbourhood.kneighbors(s1, k)
rho = s1_distances[:, -1]
nu = s2_distances[:, -1]
if np.any(rho == 0):
warnings.warn(
f"The distance between an element of the first dataset and its {k}-th NN in the same dataset "
f"is 0; this causes divergences in the code, and it is due to elements which are repeated "
f"{k + 1} times in the first dataset. Increasing the value of k usually solves this.",
RuntimeWarning)
D = np.sum(np.log(nu / rho))
return (d / n) * D + np.log(m / (n - 1)) # this second term should be enough for it to be valid for m \neq n
def _estimate_always_positive(self):
return False
[docs]class MMD(Divergence):
"""
This implements an empirical estimator of the MMD for ABC as suggested in [1]. This class implements a gaussian
kernel by default but allows specifying different kernel functions. Notice that the original version in [1]
suggested an unbiased estimate, which however can return negative values. We also provide a biased but provably
positive estimator following the remarks in [2].
Specifically, this considers the
several simulations/observations in the datasets as iid samples from the model for a fixed parameter value/from the
data generating model, and estimates the MMD between the empirical distributions those
simulations/observations define.
[1] Park, M., Jitkrittum, W., & Sejdinovic, D. (2016, May). K2-ABC: Approximate Bayesian computation with
kernel embeddings. In Artificial Intelligence and Statistics (pp. 398-407). PMLR.
[2] Nguyen, H. D., Arbel, J., Lü, H., & Forbes, F. (2020). Approximate Bayesian computation via the energy
statistic. IEEE Access, 8, 131683-131698.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
kernel : str or callable
Can be a string denoting the kernel, or a function. If a string, only gaussian is implemented for now; in
that case, you can also provide an additional keyword parameter 'sigma' which is used as the sigma in the
kernel. Default is the gaussian kernel.
biased_estimator : boolean, optional
Whether to use the biased (but always positive) or unbiased estimator; by default, it uses the biased one.
kernel_kwargs
Additional keyword arguments to be passed to the distance calculator.
"""
[docs] def __init__(self, statistics_calc, kernel="gaussian", biased_estimator=False, **kernel_kwargs):
super(MMD, self).__init__(statistics_calc)
self.kernel_vectorized = False
if not isinstance(kernel, str) and not callable(kernel):
raise RuntimeError("'kernel' must be either a string or a function.")
if isinstance(kernel, str):
if kernel == "gaussian":
self.kernel = self.def_gaussian_kernel(**kernel_kwargs)
self.kernel_vectorized = True # the gaussian kernel is vectorized
else:
raise NotImplementedError("The required kernel is not implemented.")
else:
self.kernel = kernel # if kernel is a callable already
self.biased_estimator = biased_estimator
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
# compute the Gram matrix
K11, K22, K12 = self.compute_Gram_matrix(s1, s2)
# Estimate MMD
if self.biased_estimator:
return self.MMD_V_estimator(K11, K22, K12)
else:
return self.MMD_unbiased(K11, K22, K12)
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
# As the statistics are positive, the max possible value is 1
return np.inf
[docs] @staticmethod
def def_gaussian_kernel(sigma=1):
# notice in the MMD paper they set sigma to a median value over the observation; check that.
sigma_2 = 2 * sigma ** 2
# def Gaussian_kernel(x, y):
# xy = x - y
# # assert np.allclose(np.dot(xy, xy), np.linalg.norm(xy) ** 2)
# return np.exp(- np.dot(xy, xy) / sigma_2)
def Gaussian_kernel_vectorized(X, Y):
"""Here X and Y have shape (n_samples_x, n_features) and (n_samples_y, n_features);
this directly computes the kernel for all pairwise components"""
XY = X.reshape(X.shape[0], 1, -1) - Y.reshape(1, Y.shape[0], -1) # pairwise differences
return np.exp(- np.einsum('xyi,xyi->xy', XY, XY) / sigma_2)
return Gaussian_kernel_vectorized
[docs] def compute_Gram_matrix(self, s1, s2):
if self.kernel_vectorized:
K11 = self.kernel(s1, s1)
K22 = self.kernel(s2, s2)
K12 = self.kernel(s1, s2)
else:
m = s1.shape[0]
n = s2.shape[0]
K11 = np.zeros((m, m))
K22 = np.zeros((n, n))
K12 = np.zeros((m, n))
for i in range(m):
# we assume the function to be symmetric; this saves some steps:
for j in range(i, m):
K11[j, i] = K11[i, j] = self.kernel(s1[i], s1[j])
for i in range(n):
# we assume the function to be symmetric; this saves some steps:
for j in range(i, n):
K22[j, i] = K22[i, j] = self.kernel(s2[i], s2[j])
for i in range(m):
for j in range(n):
K12[i, j] = self.kernel(s1[i], s2[j])
# can we improve the above? Could use map but would not change too much likely.
return K11, K22, K12
[docs] @staticmethod
def MMD_unbiased(Kxx, Kyy, Kxy):
# from https://github.com/eugenium/MMD/blob/2fe67cbc7378f10f3b273cfd8d8bbd2135db5798/mmd.py
# The estimate when distribution of x is not equal to y
m = Kxx.shape[0]
n = Kyy.shape[0]
t1 = (1. / (m * (m - 1))) * np.sum(Kxx - np.diag(np.diagonal(Kxx)))
t2 = (2. / (m * n)) * np.sum(Kxy)
t3 = (1. / (n * (n - 1))) * np.sum(Kyy - np.diag(np.diagonal(Kyy)))
MMDsquared = (t1 - t2 + t3)
return MMDsquared
[docs] @staticmethod
def MMD_V_estimator(Kxx, Kyy, Kxy):
# The estimate when distribution of x is not equal to y
m = Kxx.shape[0]
n = Kyy.shape[0]
t1 = (1. / (m * m)) * np.sum(Kxx)
t2 = (2. / (m * n)) * np.sum(Kxy)
t3 = (1. / (n * n)) * np.sum(Kyy)
MMDsquared = (t1 - t2 + t3)
return MMDsquared
def _estimate_always_positive(self):
return self.biased_estimator
[docs]class EnergyDistance(MMD):
"""
This implements an empirical estimator of the Energy Distance for ABC as suggested in [1].
This class uses the Euclidean distance by default as a base distance, but allows to pass different distances.
Moreover, when the Euclidean distance is specified, it is possible to pass an additional keyword argument `beta`
which denotes the power of the distance to consider.
In [1], the authors suggest to use a biased but provably positive estimator; we also provide an unbiased estimate,
which however can return negative values.
Specifically, this considers the
several simulations/observations in the datasets as iid samples from the model for a fixed parameter value/from the
data generating model, and estimates the MMD between the empirical distributions those
simulations/observations define.
[1] Nguyen, H. D., Arbel, J., Lü, H., & Forbes, F. (2020). Approximate Bayesian computation via the energy
statistic. IEEE Access, 8, 131683-131698.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
base_distance : str or callable
Can be a string denoting the kernel, or a function. If a string, only Euclidean distance is implemented
for now; in that case, you can also provide an additional keyword parameter 'beta' which is the power
of the distance to consider. By default, this uses the Euclidean distance.
biased_estimator : boolean, optional
Whether to use the biased (but always positive) or unbiased estimator; by default, it uses the biased one.
base_distance_kwargs
Additional keyword arguments to be passed to the distance calculator.
"""
[docs] def __init__(self, statistics_calc, base_distance="Euclidean", biased_estimator=True, **base_distance_kwargs):
if not isinstance(base_distance, str) and not callable(base_distance):
raise RuntimeError("'base_distance' must be either a string or a function.")
if isinstance(base_distance, str):
if base_distance == "Euclidean":
self.base_distance = self.def_Euclidean_distance(**base_distance_kwargs)
else:
raise NotImplementedError("The required kernel is not implemented.")
else:
self.base_distance = base_distance # if base_distance is a callable already
self.biased_estimator = biased_estimator
def negative_distance(*args):
return - self.base_distance(*args)
super(EnergyDistance, self).__init__(statistics_calc, kernel=negative_distance,
biased_estimator=self.biased_estimator)
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
# As the statistics are positive, the max possible value is 1
return np.inf
[docs] @staticmethod
def def_Euclidean_distance(beta=1):
if beta <= 0 or beta > 2:
raise RuntimeError("'beta' not in the right range (0,2]")
if beta == 1:
def Euclidean_distance(x, y):
return np.linalg.norm(x - y)
else:
def Euclidean_distance(x, y):
return np.linalg.norm(x - y) ** beta
return Euclidean_distance
[docs]class SquaredHellingerDistance(Divergence):
"""
This implements an empirical estimator of the squared Hellinger distance for ABC. Using the Hellinger distance was
suggested originally in [1], but as that work did not provide originally any implementation details, this
implementation is original. The estimator is based on a nearest neighbor density estimate.
Specifically, this considers the
several simulations/observations in the datasets as iid samples from the model for a fixed parameter value/from the
data generating model, and estimates the divergence between the empirical distributions those
simulations/observations define.
[1] Frazier, D. T. (2020). Robust and Efficient Approximate Bayesian Computation: A Minimum Distance Approach.
arXiv preprint arXiv:2006.14126.
Parameters
----------
statistics_calc : abcpy.statistics.Statistics
Statistics extractor object that conforms to the Statistics class.
k : int, optional
nearest neighbor number for the density estimate. Default value is 1
"""
[docs] def __init__(self, statistics_calc, k=1):
super(SquaredHellingerDistance, self).__init__(statistics_calc)
self.k = k # number of nearest neighbors used in the estimation algorithm
[docs] def distance(self, d1, d2):
"""Calculates the distance between two datasets.
Parameters
----------
d1: Python list
Contains n1 data points.
d2: Python list
Contains n2 data points.
Returns
-------
numpy.float
The distance between the two input data sets.
"""
s1, s2 = self._calculate_summary_stat(d1, d2)
if s1.shape[0] > self.k or s2.shape[0] > self.k:
assert ValueError(f"The provided value of k ({self.k}) is smaller or equal than the number of samples "
f"in one of the two datasets; that should instead be larger")
# estimate the gamma divergence using the empirical distributions
return self.skl_estimator_squared_Hellinger_distance(s1=s1, s2=s2, k=self.k)
[docs] def dist_max(self):
"""
Returns
-------
numpy.float
The maximal possible value of the desired distance function.
"""
return 2
[docs] @staticmethod
def skl_estimator_squared_Hellinger_distance(s1, s2, k=1):
""" Squared Hellinger distance estimator using scikit-learn's NearestNeighbours
s1: (N_1,D) Sample drawn from distribution P
s2: (N_2,D) Sample drawn from distribution Q
k: Number of neighbours considered (default 1)
return: estimated D(P|Q)
"""
n, m = len(s1), len(s2) # NOTE: here different convention of n, m wrt MMD and EnergyDistance
d = float(s1.shape[1])
d_2 = d / 2
radius = 10 # this is not used at all...
s1_neighbourhood_k1 = NearestNeighbors(n_neighbors=k + 1, radius=radius, algorithm='kd_tree').fit(s1)
s1_neighbourhood_k = NearestNeighbors(n_neighbors=k, radius=radius, algorithm='kd_tree').fit(s1)
s2_neighbourhood_k1 = NearestNeighbors(n_neighbors=k + 1, radius=radius, algorithm='kd_tree').fit(s2)
s2_neighbourhood_k = NearestNeighbors(n_neighbors=k, radius=radius, algorithm='kd_tree').fit(s2)
s1_distances, indices = s1_neighbourhood_k1.kneighbors(s1, k + 1)
s2_distances, indices = s2_neighbourhood_k.kneighbors(s1, k)
rho = s1_distances[:, -1]
nu = s2_distances[:, -1]
if np.any(rho == 0):
warnings.warn(
f"The distance between an element of the first dataset and its {k}-th NN in the same dataset "
f"is 0; this is due to elements which are repeated "
f"{k + 1} times in the first dataset, and may lead to a poor estimate of the distance. "
f"Increasing the value of k usually solves this.",
RuntimeWarning)
if np.any(nu == 0):
warnings.warn(f"The distance between an element of the first dataset and its {k}-th NN in the second "
f"dataset is 0; this causes divergences in the code, and it is usually due to equal "
f"elements"
f" in the two datasets. Increasing the value of k usually solves this.", RuntimeWarning)
first_estimator = np.sum((rho / nu) ** d_2)
first_estimator = 2 - 2 * np.sqrt((n - 1) / m) * first_estimator
s2_distances, indices = s2_neighbourhood_k1.kneighbors(s2, k + 1)
s1_distances, indices = s1_neighbourhood_k.kneighbors(s2, k)
rho = s2_distances[:, -1]
nu = s1_distances[:, -1]
if np.any(rho == 0):
warnings.warn(
f"The distance between an element of the second dataset and its {k}-th NN in the same dataset "
f"is 0; this is due to elements which are repeated "
f"{k + 1} times in the second dataset, and may lead to a poor estimate of the distance. "
f"Increasing the value of k usually solves this.",
RuntimeWarning)
# notice: the one below becomes 0 when one element in the s1 dataset is equal to one in the s2 dataset
# and k=1 (as the distance between those two would be 0, which gives infinity when dividing)
if np.any(nu == 0):
warnings.warn(f"The distance between an element of the second dataset and its {k}-th NN in the first "
f"dataset is 0; this causes divergences in the code, and it is usually due to equal "
f"elements"
f" in the two datasets. Increasing the value of k usually solves this.", RuntimeWarning)
second_estimator = np.sum((rho / nu) ** d_2)
second_estimator = 2 - 2 * np.sqrt((m - 1) / n) * second_estimator
# average the two estimators:
final_estimator = 0.5 * (first_estimator + second_estimator)
return final_estimator
def _estimate_always_positive(self):
return True