import copy
import pickle
import warnings
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import gaussian_kde
from abcpy.acceptedparametersmanager import AcceptedParametersManager
from abcpy.graphtools import GraphTools
from abcpy.utils import wass_dist
[docs]class Journal:
"""The journal holds information created by the run of inference schemes.
It can be configured to even hold intermediate.
Attributes
----------
accepted_parameters : list
List of lists containing posterior samples
names_and_parameters : list
List of dictionaries containing posterior samples with parameter names as keys
accepted_simulations : list
List of lists containing simulations corresponding to posterior samples (this could be empty if the sampling
routine does not store those)
accepted_cov_mats : list
List of lists containing covariance matrices from accepted posterior samples (this could be empty if
the sampling routine does not store those)
weights : list
List containing posterior weights
ESS : list
List containing the Effective Sample Size (ESS) at each iteration
distances : list
List containing the ABC distance at each iteration
configuration : Python dictionary
dictionary containing the schemes configuration parameters
"""
[docs] def __init__(self, type):
"""
Initializes a new output journal of given type.
Parameters
----------
type: int (identifying type)
type=0 only logs final parametersa and weight (production use);
type=1 logs all generated information (reproducibily use).
"""
self.accepted_parameters = []
self.names_and_parameters = []
self.accepted_simulations = []
self.accepted_cov_mats = []
self.weights = []
self.ESS = []
self.distances = []
self.configuration = {}
if type not in [0, 1]:
raise ValueError("Parameter type has not valid value.")
else:
self._type = type
self.number_of_simulations = []
[docs] @classmethod
def fromFile(cls, filename):
"""This method reads a saved journal from disk an returns it as an object.
Notes
-----
To store a journal use Journal.save(filename).
Parameters
----------
filename: string
The string representing the location of a file
Returns
-------
abcpy.output.Journal
The journal object serialized in <filename>
Example
--------
>>> jnl = Journal.fromFile('example_output.jnl')
"""
with open(filename, 'rb') as input:
journal = pickle.load(input)
return journal
[docs] def add_user_parameters(self, names_and_params):
"""
Saves the provided parameters and names of the probabilistic models corresponding to them. If type==0, old
parameters get overwritten.
Parameters
----------
names_and_params: list
Each entry is a tuple, where the first entry is the name of the probabilistic model, and the second entry is
the parameters associated with this model.
"""
if self._type == 0:
self.names_and_parameters = [dict(names_and_params)]
else:
self.names_and_parameters.append(dict(names_and_params))
[docs] def add_accepted_parameters(self, accepted_parameters):
"""
FIX THIS!
Saves provided accepted parameters by appending them to the journal. If type==0, old accepted parameters get
overwritten.
Parameters
----------
accepted_parameters: list
"""
if self._type == 0:
self.accepted_parameters = [accepted_parameters]
if self._type == 1:
self.accepted_parameters.append(accepted_parameters)
[docs] def add_accepted_simulations(self, accepted_simulations):
"""
Saves provided accepted simulations by appending them to the journal. If type==0, old accepted simulations get
overwritten.
Parameters
----------
accepted_simulations: list
"""
if self._type == 0:
self.accepted_simulations = [accepted_simulations]
if self._type == 1:
self.accepted_simulations.append(accepted_simulations)
[docs] def add_accepted_cov_mats(self, accepted_cov_mats):
"""
Saves provided accepted cov_mats by appending them to the journal. If type==0, old accepted cov_mats get
overwritten.
Parameters
----------
accepted_cov_mats: list
"""
if self._type == 0:
self.accepted_cov_mats = [accepted_cov_mats]
if self._type == 1:
self.accepted_cov_mats.append(accepted_cov_mats)
[docs] def add_weights(self, weights):
"""
Saves provided weights by appending them to the journal. If type==0, old weights get overwritten.
Parameters
----------
weights: numpy.array
vector containing n weigths
"""
if self._type == 0:
self.weights = [weights]
if self._type == 1:
self.weights.append(weights)
[docs] def add_distances(self, distances):
"""
Saves provided distances by appending them to the journal. If type==0, old weights get overwritten.
Parameters
----------
distances: numpy.array
vector containing n distances
"""
if self._type == 0:
self.distances = [distances]
if self._type == 1:
self.distances.append(distances)
[docs] def add_ESS_estimate(self, weights):
"""
Computes and saves Effective Sample Size (ESS) estimate starting from provided weights; ESS is estimated as sum
the inverse of sum of squared normalized weights. The provided weights are normalized before computing ESS.
If type==0, old ESS estimate gets overwritten.
Parameters
----------
weights: numpy.array
vector containing n weigths
"""
# normalize weights:
normalized_weights = weights / np.sum(weights)
ESS = 1 / sum(sum(pow(normalized_weights, 2)))
if self._type == 0:
self.ESS = [ESS]
if self._type == 1:
self.ESS.append(ESS)
[docs] def save(self, filename):
"""
Stores the journal to disk.
Parameters
----------
filename: string
the location of the file to store the current object to.
"""
with open(filename, 'wb') as output:
pickle.dump(self, output, -1)
[docs] def get_parameters(self, iteration=None):
"""
Returns the parameters from a sampling scheme.
For intermediate results, pass the iteration.
Parameters
----------
iteration: int
specify the iteration for which to return parameters
Returns
-------
names_and_parameters: dictionary
Samples from the specified iteration (last, if not specified) returned as a disctionary with names of the
random variables
"""
if iteration is None:
endp = len(self.names_and_parameters) - 1
params = self.names_and_parameters[endp]
return params
else:
return self.names_and_parameters[iteration]
[docs] def get_accepted_parameters(self, iteration=None):
"""
Returns the accepted parameters from a sampling scheme.
For intermediate results, pass the iteration.
Parameters
----------
iteration: int
specify the iteration for which to return parameters
Returns
-------
accepted_parameters: list
List containing samples from the specified iteration (last, if not specified)
"""
if iteration is None:
return self.accepted_parameters[-1]
else:
return self.accepted_parameters[iteration]
[docs] def get_accepted_simulations(self, iteration=None):
"""
Returns the accepted simulations from a sampling scheme. Notice not all sampling schemes store those in the
Journal, so this may return None.
For intermediate results, pass the iteration.
Parameters
----------
iteration: int
specify the iteration for which to return accepted simulations
Returns
-------
accepted_simulations: list
List containing simulations corresponding to accepted samples from the specified
iteration (last, if not specified)
"""
if iteration is None:
if len(self.accepted_simulations) == 0:
return None
return self.accepted_simulations[-1]
else:
return self.accepted_simulations[iteration]
[docs] def get_accepted_cov_mats(self, iteration=None):
"""
Returns the accepted cov_mats used in a sampling scheme. Notice not all sampling schemes store those in the
Journal, so this may return None.
For intermediate results, pass the iteration.
Parameters
----------
iteration: int
specify the iteration for which to return accepted cov_mats
Returns
-------
accepted_cov_mats: list
List containing accepted cov_mats from the specified
iteration (last, if not specified)
"""
if iteration is None:
if len(self.accepted_cov_mats) == 0:
return None
return self.accepted_cov_mats[-1]
else:
return self.accepted_cov_mats[iteration]
[docs] def get_weights(self, iteration=None):
"""
Returns the weights from a sampling scheme.
For intermediate results, pass the iteration.
Parameters
----------
iteration: int
specify the iteration for which to return weights
"""
if iteration is None:
end = len(self.weights) - 1
return self.weights[end]
else:
return self.weights[iteration]
[docs] def get_distances(self, iteration=None):
"""
Returns the distances from a sampling scheme.
For intermediate results, pass the iteration.
Parameters
----------
iteration: int
specify the iteration for which to return distances
"""
if iteration is None:
end = len(self.distances) - 1
return self.distances[end]
else:
return self.distances[iteration]
[docs] def get_ESS_estimates(self, iteration=None):
"""
Returns the estimate of Effective Sample Size (ESS) from a sampling scheme.
For intermediate results, pass the iteration.
Parameters
----------
iteration: int
specify the iteration for which to return ESS
"""
if iteration is None:
iteration = -1
return self.ESS[iteration]
[docs] def posterior_mean(self, iteration=None):
"""
Computes posterior mean from the samples drawn from posterior distribution
For intermediate results, pass the iteration.
Parameters
----------
iteration: int
specify the iteration for which to return posterior mean
Returns
-------
posterior mean: dictionary
Posterior mean from the specified iteration (last, if not specified) returned as a disctionary with names of
the random variables
"""
if iteration is None:
endp = len(self.names_and_parameters) - 1
params = self.names_and_parameters[endp]
weights = self.weights[endp]
else:
params = self.names_and_parameters[iteration]
weights = self.weights[iteration]
return_value = []
for keyind in params.keys():
return_value.append((keyind,
np.average(np.array(params[keyind]).squeeze(), weights=weights.reshape(len(weights), ),
axis=0)))
return dict(return_value)
[docs] def posterior_cov(self, iteration=None):
"""
Computes posterior covariance from the samples drawn from posterior distribution
Returns
-------
np.ndarray
posterior covariance
dic
order of the variables in the covariance matrix
"""
if iteration is None:
endp = len(self.names_and_parameters) - 1
params = self.names_and_parameters[endp]
weights = self.weights[endp]
else:
params = self.names_and_parameters[iteration]
weights = self.weights[iteration]
joined_params = []
for keyind in params.keys():
joined_params.append(np.array(params[keyind]).squeeze(axis=1))
return np.cov(np.transpose(np.hstack(joined_params)), aweights=weights.reshape(len(weights), )), params.keys()
[docs] def posterior_histogram(self, iteration=None, n_bins=10):
"""
Computes a weighted histogram of multivariate posterior samples
and returns histogram H and a list of p arrays describing the bin
edges for each dimension.
Returns
-------
python list
containing two elements (H = np.ndarray, edges = list of p arrays)
"""
if iteration is None:
endp = len(self.names_and_parameters) - 1
params = self.names_and_parameters[endp]
weights = self.weights[endp]
else:
params = self.names_and_parameters[iteration]
weights = self.weights[iteration]
joined_params = []
for keyind in params.keys():
joined_params.append(np.array(params[keyind]).squeeze(axis=1))
H, edges = np.histogramdd(np.hstack(joined_params), bins=n_bins, weights=weights.reshape(len(weights), ))
return [H, edges]
# TODO this does not work for multidimensional parameters and discrete distributions
[docs] def plot_posterior_distr(self, parameters_to_show=None, ranges_parameters=None, iteration=None, show_samples=None,
single_marginals_only=False, double_marginals_only=False, write_posterior_mean=True,
show_posterior_mean=True, true_parameter_values=None, contour_levels=14, figsize=None,
show_density_values=True, bw_method=None, path_to_save=None):
"""
Produces a visualization of the posterior distribution of the parameters of the model.
A Gaussian kernel density estimate (KDE) is used to approximate the density starting from the sampled
parameters. Specifically, it produces a scatterplot matrix, where the diagonal contains single parameter
marginals, while the off diagonal elements contain the contourplot for the paired marginals for each possible
pair of parameters.
This visualization is not satisfactory for parameters that take on discrete values, specially in the case where
the number of values it can assume are small, as it obtains the posterior by KDE in this case as well. We need
to improve on that, considering histograms.
Parameters
----------
parameters_to_show : list, optional
a list of the parameters for which you want to plot the posterior distribution. For each parameter, you need
to provide the name string as it was defined in the model. For instance,
`jrnl.plot_posterior_distr(parameters_to_show=["mu"])` will only plot the posterior distribution for the
parameter named "mu" in the list of parameters.
If `None`, then all parameters will be displayed.
ranges_parameters : Python dictionary, optional
a dictionary in which you can optionally provide the plotting range for the parameters that you chose to
display. You can use this even if `parameters_to_show=None`. The dictionary key is the name of parameter,
and the range needs to be an array-like of the form [lower_limit, upper_limit]. For instance:
{"theta" : [0,2]} specifies that you want to plot the posterior distribution for the parameter "theta" in
the range [0,2].
iteration : int, optional
specify the iteration for which to plot the posterior distribution, in the case of a sequential algorithm.
If `None`, then the last iteration will be used.
show_samples : boolean, optional
specifies if you want to show the posterior samples overimposed to the contourplots of the posterior
distribution. If `None`, the default behaviour is the following: if the posterior samples are associated
with importance weights, then the samples are not showed (in fact, the KDE for the posterior distribution
takes into account the weights, and showing the samples may be misleading). Otherwise, if the posterior
samples are not associated with weights, they are displayed by default.
single_marginals_only : boolean, optional
if `True`, the method does not show the paired marginals but only the single parameter marginals;
otherwise, it shows the paired marginals as well. Default to `False`.
double_marginals_only : boolean, optional
if `True`, the method shows the contour plot for the marginal posterior for each possible pair of parameters
in the parameters that have to be shown (all parameters of the model if `parameters_to_show` is None).
Default to `False`.
write_posterior_mean : boolean, optional
Whether to write or not the posterior mean on the single marginal plots. Default to `True`.
show_posterior_mean: boolean, optional
Whether to display a line corresponding to the posterior mean value in the plot. Default to `True`.
true_parameter_values: array-like, optional
you can provide here the true values of the parameters, if known, and that will be displayed in the
posterior plot. It has to be an array-like of the same length of `parameters_to_show` (if that is provided),
otherwise of length equal to the number of parameters in the model, and with entries corresponding to the
true value of that parameter (in case `parameters_to_show` is not provided, the order of the parameters is
the same order the model `forward_simulate` step takes.
contour_levels: integer, optional
The number of levels to be used in the contour plots. Default to 14.
figsize: float, optional
Denotes the size (in inches) of the smaller dimension of the plot; the other dimension is automatically
determined. If None, then figsize is chosen automatically. Default to `None`.
show_density_values: boolean, optional
If `True`, the method displays the value of the density at each contour level in the contour plot. Default
to `True`.
bw_method: str, scalar or callable, optional
The parameter of the `scipy.stats.gaussian_kde` defining the method used to calculate the bandwith in the
Gaussian kernel density estimator. Please refer to the documentation therein for details. Default to `None`.
path_to_save : string, optional
if provided, save the figure in png format in the specified path.
Returns
-------
tuple
a tuple containing the matplotlib "fig, axes" objects defining the plot. Can be useful for further
modifications.
"""
# if you do not pass any name, then we plot all parameters.
# you can get the list of parameters from the journal file as:
if parameters_to_show is None:
parameters_to_show = list(self.names_and_parameters[0].keys())
else:
for param_name in parameters_to_show:
if param_name not in self.names_and_parameters[0].keys():
raise KeyError("Parameter '{}' is not a parameter of the model.".format(param_name))
if single_marginals_only and double_marginals_only:
raise RuntimeError("You cannot ask to produce only plots for single marginals and double marginals only at "
"the same time. Either or both of `single_marginal_only` or `double_marginal_only` have "
"to be False.")
if len(parameters_to_show) == 1 and double_marginals_only:
raise RuntimeError("It is not possible to plot a bivariate marginal if only one parameter is in the "
"`parameters_to_show` list")
if true_parameter_values is not None:
if len(true_parameter_values) != len(parameters_to_show):
raise RuntimeError("You need to provide values for all the parameters to be shown.")
meanpost = np.array([self.posterior_mean(iteration=iteration)[x] for x in parameters_to_show])
post_samples_dict = {name: np.concatenate(self.get_parameters(iteration)[name]) for name in parameters_to_show}
weights = np.concatenate(self.get_weights(iteration))
all_weights_are_1 = all([weights[i] == 1 for i in range(len(weights))])
if show_samples is None:
# by default, we display samples if the weights are all 1. Otherwise, we choose not to display them by
# default as they may be misleading
if all_weights_are_1:
show_samples = True
else:
show_samples = False
elif not all_weights_are_1 and show_samples is True:
# in this case, the user required to show the sample points but importance weights are present. Then warn
# the user about this
warnings.warn(
"You requested to show the sample points; however, note that the algorithm generated posterior samples "
"associated with weights, and the kernel density estimate used to produce the density plots consider "
"those. Therefore, the resulting plot may be misleading. ", RuntimeWarning)
data = np.hstack(list(post_samples_dict.values()))
datat = data.transpose()
# now set up the range of parameters. If the range for a given parameter is not given, use the min and max
# value of posterior samples.
if ranges_parameters is None:
ranges_parameters = {}
# check if the user provided range for some parameters that are not requested to be displayed:
if not all([key in post_samples_dict for key in ranges_parameters.keys()]):
warnings.warn("You provided range for a parameter that was not requested to be displayed (or it may be "
"that you misspelled something). This will be ignored.", RuntimeWarning)
for name in parameters_to_show:
if name in ranges_parameters:
# then check the format is correct
if isinstance(ranges_parameters[name], list):
if not (len(ranges_parameters[name]) == 2 and isinstance(ranges_parameters[name][0], (int, float))):
raise TypeError(
"The range for the parameter {} should be an array-like with two numbers.".format(name))
elif isinstance(ranges_parameters[name], np.ndarray):
if not (ranges_parameters[name].shape == 2 or ranges_parameters[name].shape == (2, 1)):
raise TypeError(
"The range for the parameter {} should be an array-like with two numbers.".format(name))
else:
# add as new range the min and max values. We add also a 5% of the whole range on both sides to have
# better visualization
difference = np.max(post_samples_dict[name]) - np.min(post_samples_dict[name])
ranges_parameters[name] = [np.min(post_samples_dict[name]) - 0.05 * difference,
np.max(post_samples_dict[name]) + 0.05 * difference]
def write_post_mean_function(ax, post_mean, size):
ax.text(0.15, 0.06, r"Post. mean = {:.2f}".format(post_mean), size=size,
transform=ax.transAxes)
def scatterplot_matrix(data, meanpost, names, single_marginals_only=False, **kwargs):
"""
Plots a scatterplot matrix of subplots. Each row of "data" is plotted
against other rows, resulting in a nrows by nrows grid of subplots with the
diagonal subplots labeled with "parameters_to_show". Additional keyword arguments are
passed on to matplotlib's "plot" command. Returns the matplotlib figure
object containg the subplot grid.
"""
if figsize is None:
figsize_actual = 4 * len(names)
else:
figsize_actual = figsize
numvars, numdata = data.shape
if single_marginals_only:
fig, axes = plt.subplots(nrows=1, ncols=numvars, figsize=(figsize_actual, figsize_actual / len(names)))
else:
fig, axes = plt.subplots(nrows=numvars, ncols=numvars, figsize=(figsize_actual, figsize_actual))
fig.subplots_adjust(hspace=0.08, wspace=0.08)
# if we have to plot 1 single parameter value, envelop the ax in an array, so that it gives no troubles:
if len(names) == 1:
if not single_marginals_only:
axes = np.array([[axes]])
else:
axes = np.array([axes])
if not single_marginals_only:
if len(names) > 1:
for ax in axes.flat:
# Hide all ticks and labels
ax.xaxis.set_visible(False)
ax.yaxis.set_visible(False)
# Set up ticks only on one side for the "edge" subplots...
if ax.is_first_col():
ax.yaxis.set_ticks_position('left')
if ax.is_last_col():
ax.yaxis.set_ticks_position('right')
if ax.is_first_row():
ax.xaxis.set_ticks_position('top')
if ax.is_last_row():
ax.xaxis.set_ticks_position('bottom')
# off diagonal plots:
for x in range(numvars):
for y in range(numvars):
if x is not y:
# this plots the posterior samples
if show_samples:
axes[x, y].plot(data[y], data[x], '.k', markersize='1')
xmin, xmax = ranges_parameters[names[y]]
ymin, ymax = ranges_parameters[names[x]]
# then you need to perform the KDE and plot the contour of posterior
X, Y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
positions = np.vstack([X.ravel(), Y.ravel()])
values = np.vstack([data[y].T, data[x].T])
kernel = gaussian_kde(values, weights=weights, bw_method=bw_method)
Z = np.reshape(kernel(positions).T, X.shape)
# axes[x, y].plot(meanpost[y], meanpost[x], 'r+', markersize='10')
if show_posterior_mean:
axes[x, y].plot([xmin, xmax], [meanpost[x], meanpost[x]], "red", markersize='20',
linestyle='solid')
axes[x, y].plot([meanpost[y], meanpost[y]], [ymin, ymax], "red", markersize='20',
linestyle='solid')
if true_parameter_values is not None:
axes[x, y].plot([xmin, xmax], [true_parameter_values[x], true_parameter_values[x]],
"green",
markersize='20', linestyle='dashed')
axes[x, y].plot([true_parameter_values[y], true_parameter_values[y]], [ymin, ymax],
"green",
markersize='20', linestyle='dashed')
CS = axes[x, y].contour(X, Y, Z, contour_levels, linestyles='solid')
if show_density_values:
axes[x, y].clabel(CS, fontsize=figsize_actual / len(names) * 2.25, inline=1)
axes[x, y].set_xlim([xmin, xmax])
axes[x, y].set_ylim([ymin, ymax])
# diagonal plots
if not single_marginals_only:
diagonal_axes = np.array([axes[i, i] for i in range(len(axes))])
else:
diagonal_axes = axes
label_size = figsize_actual / len(names) * 4
title_size = figsize_actual / len(names) * 4.25
post_mean_size = figsize_actual / len(names) * 4
ticks_size = figsize_actual / len(names) * 3
for i, label in enumerate(names):
xmin, xmax = ranges_parameters[names[i]]
positions = np.linspace(xmin, xmax, 100)
gaussian_kernel = gaussian_kde(data[i], weights=weights, bw_method=bw_method)
diagonal_axes[i].plot(positions, gaussian_kernel(positions), color='k', linestyle='solid', lw="1",
alpha=1, label="Density")
values = gaussian_kernel(positions)
# axes[i, i].plot([positions[np.argmax(values)], positions[np.argmax(values)]], [0, np.max(values)])
if show_posterior_mean:
diagonal_axes[i].plot([meanpost[i], meanpost[i]], [0, 1.1 * np.max(values)], "red", alpha=1,
label="Posterior mean")
if true_parameter_values is not None:
diagonal_axes[i].plot([true_parameter_values[i], true_parameter_values[i]],
[0, 1.1 * np.max(values)], "green",
alpha=1, label="True value")
if write_posterior_mean:
write_post_mean_function(diagonal_axes[i], meanpost[i], size=post_mean_size)
diagonal_axes[i].set_xlim([xmin, xmax])
diagonal_axes[i].set_ylim([0, 1.1 * np.max(values)])
# labels and ticks:
if not single_marginals_only:
for j, label in enumerate(names):
axes[0, j].set_title(label, size=title_size)
if len(names) > 1:
axes[j, 0].set_ylabel(label, size=label_size)
axes[-1, j].set_xlabel(label, size=label_size)
else:
axes[j, 0].set_ylabel("Density", size=label_size)
axes[j, 0].tick_params(axis='both', which='major', labelsize=ticks_size)
axes[j, 0].ticklabel_format(style='sci', axis='y', scilimits=(0, 0))
axes[j, 0].yaxis.offsetText.set_fontsize(ticks_size)
axes[j, 0].yaxis.set_visible(True)
axes[-1, j].tick_params(axis='both', which='major', labelsize=ticks_size)
axes[-1, j].ticklabel_format(style='sci', axis='x') # , scilimits=(0, 0))
axes[-1, j].xaxis.offsetText.set_fontsize(ticks_size)
axes[-1, j].xaxis.set_visible(True)
axes[j, -1].tick_params(axis='both', which='major', labelsize=ticks_size)
axes[j, -1].yaxis.offsetText.set_fontsize(ticks_size)
axes[j, -1].ticklabel_format(style='sci', axis='y', scilimits=(0, 0))
axes[j, -1].yaxis.set_visible(True)
else:
for j, label in enumerate(names):
axes[j].set_title(label, size=figsize_actual / len(names) * 4.25)
axes[0].set_ylabel("Density")
return fig, axes
def double_marginals_plot(data, meanpost, names, **kwargs):
"""
Plots contour plots of all possible pairs of samples. Additional keyword arguments are
passed on to matplotlib's "plot" command. Returns the matplotlib figure
object containg the subplot grid.
"""
numvars, numdata = data.shape
numplots = np.int(numvars * (numvars - 1) / 2)
if figsize is None:
figsize_actual = 4 * numplots
else:
figsize_actual = figsize
fig, axes = plt.subplots(nrows=1, ncols=numplots, figsize=(figsize_actual, figsize_actual / numplots))
if numplots == 1: # in this case you will only have one plot. Envelop it to avoid problems.
axes = [axes]
# if we have to plot 1 single parameter value, envelop the ax in an array, so that it gives no troubles:
ax_counter = 0
for x in range(numvars):
for y in range(0, x):
# this plots the posterior samples
if show_samples:
axes[ax_counter].plot(data[y], data[x], '.k', markersize='1')
xmin, xmax = ranges_parameters[names[y]]
ymin, ymax = ranges_parameters[names[x]]
# then you need to perform the KDE and plot the contour of posterior
X, Y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
positions = np.vstack([X.ravel(), Y.ravel()])
values = np.vstack([data[y].T, data[x].T])
kernel = gaussian_kde(values, weights=weights, bw_method=bw_method)
Z = np.reshape(kernel(positions).T, X.shape)
# axes[x, y].plot(meanpost[y], meanpost[x], 'r+', markersize='10')
if show_posterior_mean:
axes[ax_counter].plot([xmin, xmax], [meanpost[x], meanpost[x]], "red", markersize='20',
linestyle='solid')
axes[ax_counter].plot([meanpost[y], meanpost[y]], [ymin, ymax], "red", markersize='20',
linestyle='solid')
if true_parameter_values is not None:
axes[ax_counter].plot([xmin, xmax], [true_parameter_values[x], true_parameter_values[x]],
"green",
markersize='20', linestyle='dashed')
axes[ax_counter].plot([true_parameter_values[y], true_parameter_values[y]], [ymin, ymax],
"green",
markersize='20', linestyle='dashed')
CS = axes[ax_counter].contour(X, Y, Z, contour_levels, linestyles='solid')
if show_density_values:
axes[ax_counter].clabel(CS, fontsize=figsize_actual / numplots * 2.25, inline=1)
axes[ax_counter].set_xlim([xmin, xmax])
axes[ax_counter].set_ylim([ymin, ymax])
label_size = figsize_actual / numplots * 4
ticks_size = figsize_actual / numplots * 3
axes[ax_counter].set_ylabel(names[x], size=label_size)
axes[ax_counter].set_xlabel(names[y], size=label_size)
axes[ax_counter].tick_params(axis='both', which='major', labelsize=ticks_size)
axes[ax_counter].ticklabel_format(style='sci', axis='y', scilimits=(0, 0))
axes[ax_counter].yaxis.offsetText.set_fontsize(ticks_size)
axes[ax_counter].yaxis.set_visible(True)
axes[ax_counter].ticklabel_format(style='sci', axis='x', scilimits=(0, 0))
axes[ax_counter].yaxis.offsetText.set_fontsize(ticks_size)
axes[ax_counter].xaxis.set_visible(True)
ax_counter += 1
return fig, axes
if not double_marginals_only:
fig, axes = scatterplot_matrix(datat, meanpost, parameters_to_show,
single_marginals_only=single_marginals_only)
else:
fig, axes = double_marginals_plot(datat, meanpost, parameters_to_show)
if path_to_save is not None:
plt.savefig(path_to_save, bbox_inches="tight")
return fig, axes
[docs] def plot_ESS(self):
"""
Produces a plot showing the evolution of the estimated ESS (from sample weights) across iterations; it also
shows as a baseline the maximum possible ESS which can be achieved, corresponding to the case of independent
samples, which is equal to the total number of samples.
Returns
-------
tuple
a tuple containing the matplotlib "fig, ax" objects defining the plot. Can be useful for further
modifications.
"""
if self._type == 0:
raise RuntimeError("ESS plot is available only if the journal was created with full_output=1; otherwise, "
"ESS is saved only for the last iteration.")
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6, 4))
ax.scatter(np.arange(len(self.ESS)) + 1, self.ESS, label="Estimated ESS")
ax.set_xlabel("Iteration")
ax.set_ylabel("ESS")
# put horizontal line at the largest value ESS can get:
ax.axhline(len(self.weights[-1]), label="Max theoretical value", ls="dashed")
ax.legend()
ax.set_xticks(np.arange(len(self.ESS)) + 1)
return fig, ax
[docs] def Wass_convergence_plot(self, num_iter_max=1e8, **kwargs):
"""
Computes the Wasserstein distance between the empirical distribution at subsequent iterations to see whether
the approximation of the posterior is converging. Then, it produces a plot displaying that. The approximation of
the posterior is converging if the Wass distance between subsequent iterations decreases with iteration and gets
close to 0, as that means there is no evolution of the posterior samples. The Wasserstein distance is estimated
using the POT library).
This method only works when the Journal stores results from all the iterations (ie it was generated with
full_output=1). Moreover, this only works when all the parameters in the model are univariate.
Parameters
----------
num_iter_max : integer, optional
The maximum number of iterations in the linear programming algorithm to estimate the Wasserstein distance.
Default to 1e8.
kwargs
Additional arguments passed to the wass_dist calculation function.
Returns
-------
tuple
a tuple containing the matplotlib "fig, ax" objects defining the plot and the list of the computed
Wasserstein distances. "fig" and "ax" can be useful for further modifying the plot.
"""
if self._type == 0:
raise RuntimeError("Wasserstein distance convergence test is available only if the journal was created with"
" full_output=1; in fact, this works by comparing the saved empirical distribution at "
"different iterations, and the latter is saved only if full_output=1.")
if len(self.weights) == 1:
raise RuntimeError("Only a set of posterior samples has been saved, corresponding to either running a "
"sequential algorithm for one iteration only or to using non-sequential algorithms (as"
"RejectionABC). Wasserstein distance convergence test requires at least samples from at "
"least 2 iterations.")
last_params = np.array(self.get_accepted_parameters())
if last_params.dtype == "object":
raise RuntimeError("This error was probably raised due to the parameters in your model having different "
"dimensions (and specifically not being univariate). For now, Wasserstein distance"
" convergence test is available only if the different parameters have the same "
"dimension.")
wass_dist_lists = [None] * (len(self.weights) - 1)
for i in range(len(self.weights) - 1):
params_1 = np.array(self.get_accepted_parameters(i))
params_2 = np.array(self.get_accepted_parameters(i + 1))
weights_1 = self.get_weights(i)
weights_2 = self.get_weights(i + 1)
if len(params_1.shape) == 1: # we assume that the dimension of parameters is 1
params_1 = params_1.reshape(-1, 1)
else:
params_1 = params_1.reshape(params_1.shape[0], -1)
if len(params_2.shape) == 1: # we assume that the dimension of parameters is 1
params_2 = params_2.reshape(-1, 1)
else:
params_2 = params_2.reshape(params_2.shape[0], -1)
if len(weights_1.shape) == 2: # it can be that the weights have shape (-1,1); reshape therefore
weights_1 = weights_1.reshape(-1)
if len(weights_2.shape) == 2: # it can be that the weights have shape (-1,1); reshape therefore
weights_2 = weights_2.reshape(-1)
wass_dist_lists[i] = wass_dist(samples_1=params_1, samples_2=params_2, weights_1=weights_1,
weights_2=weights_2, num_iter_max=num_iter_max, **kwargs)
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6, 4))
ax.scatter(np.arange(len(self.weights) - 1) + 1, wass_dist_lists,
label="Estimated Wass. distance\nbetween iteration i and i+1")
ax.set_xlabel("Iteration")
ax.set_ylabel("Wasserstein distance")
ax.legend()
ax.set_xticks(np.arange(len(self.weights) - 1) + 1)
return fig, ax, wass_dist_lists
[docs] def traceplot(self, parameters_to_show=None, iteration=None, **kwargs):
"""
Produces a traceplot for the MCMC inference scheme. This only works for journal files which were created by the
`MCMCMetropolisHastings` inference scheme.
Parameters
----------
parameters_to_show : list, optional
a list of the parameters for which you want to plot the traceplot. For each parameter, you need
to provide the name string as it was defined in the model. For instance,
`jrnl.traceplot(parameters_to_show=["mu"])` will only plot the traceplot for the
parameter named "mu" in the list of parameters.
If `None`, then all parameters will be displayed.
iteration : int, optional
specify the iteration for which to plot the posterior distribution, in the case of a sequential algorithm.
If `None`, then the last iteration will be used.
kwargs
Additional arguments passed to matplotlib.pyplot.plot
Returns
-------
tuple
a tuple containing the matplotlib "fig, axes" objects defining the plot. Can be useful for further
modifications.
"""
if not "acceptance_rates" in self.configuration.keys():
raise RuntimeError("The traceplot can be produced only for journal files which were created by the"
" MCMCMetropolisHastings inference scheme")
if parameters_to_show is None:
parameters_to_show = list(self.names_and_parameters[0].keys())
else:
for param_name in parameters_to_show:
if param_name not in self.names_and_parameters[0].keys():
raise KeyError("Parameter '{}' is not a parameter of the model.".format(param_name))
param_dict = self.get_parameters(iteration)
n_plots = len(parameters_to_show)
fig, ax = plt.subplots(1, n_plots, figsize=(4 * n_plots, 4))
fig.suptitle("Traceplot")
if n_plots == 1:
ax = [ax]
for i, name in enumerate(parameters_to_show):
ax[i].plot(np.squeeze(param_dict[name]))
ax[i].set_title(name)
ax[i].set_xlabel("MCMC step")
return fig, ax
[docs] def resample(self, n_samples=None, replace=True, path_to_save_journal=None, seed=None):
"""
Helper method to resample (by bootstrapping or subsampling) the posterior samples stored in the Journal.
This can be used for instance to obtain an unweighted set of
posterior samples from a weighted one (via bootstrapping) or
to subsample a given number of posterior samples from a larger set. The new set of (unweighted)
samples are stored in a new journal which is returned by the method.
In order to bootstrap/subsample, the ``np.random.choice`` method is used, with the posterior sample
weights used as
probabilities (p) for resampling each sample. ``np.random.choice`` performs resampling with or without
replacement according to whether ``replace=True`` or ``replace=False``. Moreover, the parameter
``n_samples`` specifies the number of resampled samples
(the ``size`` argument of ``np.ranodom.choice``) and is set by
default to the number of samples in the journal). Therefore, different combinations of these
two parameters can be used to bootstrap or to subsample a set of posterior samples (see the examples below);
the default parameter values perform bootstrap.
Parameters
----------
n_samples: integer, optional
The number of posterior samples which you want to resample. Defaults to the number of posterior samples
currently stored in the Journal.
replace: boolean, optional
If True, sampling with replacement is performed; if False, sampling without replacement. Defaults to False.
path_to_save_journal: str, optional
If provided, save the journal with the resampled posterior samples at the provided path.
seed: integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
Returns
-------
abcpy.output.Journal
a journal containing the resampled posterior samples
Examples
--------
If ``journal`` contains a weighted set of posterior samples, the following returns an unweighted bootstrapped
set of posterior samples, stored in ``new_journal``:
>>> new_journal = journal.resample()
The above of course also works when the original posterior samples are unweighted.
If ``journal`` contains a here a large number of posterior sampling, you can subsample (without replacement)
a smaller number of them (say 100) with the following line (and store them in ``new_journal``):
>>> new_journal = journal.resample(n_samples=100, replace=False)
Notice that the above takes into account the weights in the original ``journal``.
"""
# instantiate the random number generator
rng = np.random.RandomState(seed)
# this extracts the parameters from the journal
accepted_parameters = self.get_accepted_parameters(-1)
accepted_weights = self.get_weights(-1)
n_samples_old = self.configuration["n_samples"]
normalized_weights = accepted_weights.reshape(-1) / np.sum(accepted_weights)
n_samples = n_samples_old if n_samples is None else n_samples
if n_samples > n_samples_old and not replace:
raise RuntimeError("You cannot draw without replacement a larger number of samples than the posterior "
"samples currently stored in the journal.")
# here you just need to bootstrap (or subsample):
bootstrapped_parameter_indices = rng.choice(np.arange(n_samples_old), size=n_samples, replace=replace,
p=normalized_weights)
bootstrapped_parameters = [accepted_parameters[index] for index in bootstrapped_parameter_indices]
# define new journal
journal_new = Journal(0)
journal_new.configuration["type_model"] = self.configuration["type_model"]
journal_new.configuration["n_samples"] = n_samples
# store bootstrapped parameters in new journal
journal_new.add_accepted_parameters(copy.deepcopy(bootstrapped_parameters))
journal_new.add_weights(np.ones((n_samples, 1)))
journal_new.add_ESS_estimate(np.ones((n_samples, 1)))
# the next piece of code build the list to be passed to add_user_parameter in order to build the dictionary;
# this mimics the behavior of what is done in the InferenceMethod's using the lines:
# `self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters)`
# `names_and_parameters = self._get_names_and_parameters()`
names_par_dicts = self.get_parameters()
par_names = list(names_par_dicts.keys())
new_names_par_list = []
start_index = 0
for name in par_names:
parameter_size = len(names_par_dicts[name][0]) # the size of that parameter
name_par_tuple = (
name, [bootstrapped_parameters[i][start_index:start_index + parameter_size] for i in range(n_samples)])
new_names_par_list.append(name_par_tuple)
start_index += parameter_size
journal_new.add_user_parameters(new_names_par_list)
journal_new.number_of_simulations.append(0)
if path_to_save_journal is not None: # save journal
journal_new.save(path_to_save_journal)
return journal_new
[docs]class GenerateFromJournal(GraphTools):
"""Helper class to generate simulations from a model starting from the parameter values stored in a Journal file.
Parameters
----------
root_models: list
A list of the Probabilistic models corresponding to the observed datasets
backend: abcpy.backends.Backend
Backend object defining the backend to be used.
seed: integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
discard_too_large_values: boolean
If set to True, the simulation is discarded (and repeated) if at least one element of it is too large
to fit in float32, which therefore may be converted to infinite value in numpy. Defaults to False.
Examples
--------
Simplest possible usage is:
>>> generate_from_journal = GenerateFromJournal([model], backend=backend)
>>> parameters, simulations, normalized_weights = generate_from_journal.generate(journal)
which takes the parameter values stored in journal and generated simulations from them. Notice how the method
returns (in this order) the parameter values used for the simulations, the simulations themselves and the
posterior weights associated to the parameters. All of these three objects are numpy arrays.
"""
[docs] def __init__(self, root_models, backend, seed=None, discard_too_large_values=False):
self.model = root_models
self.backend = backend
self.rng = np.random.RandomState(seed)
self.discard_too_large_values = discard_too_large_values
# An object managing the bds objects
self.accepted_parameters_manager = AcceptedParametersManager(self.model)
[docs] def generate(self, journal, n_samples_per_param=1, iteration=None):
"""
Method to generate simulations using parameter values stored in the provided Journal.
Parameters
----------
journal: abcpy.output.Journal
the Journal containing the parameter values from which to generate simulations from the model.
n_samples_per_param: integer, optional
Number of simulations for each parameter value. Defaults to 1.
iteration: integer, optional
specifies the iteration from which the parameter samples in the Journal are taken to generate simulations.
If None (default), it uses the last iteration.
Returns
-------
tuple
A tuple of numpy ndarray's containing the parameter values (first element, with shape n_samples x d_theta),
the generated
simulations (second element, with shape n_samples x n_samples_per_param x d_x, where d_x is the dimension of
each simulation) and the normalized weights attributed to each parameter value
(third element, with shape n_samples).
Examples
--------
Simplest possible usage is:
>>> generate_from_journal = GenerateFromJournal([model], backend=backend)
>>> parameters, simulations, normalized_weights = generate_from_journal.generate(journal)
which takes the parameter values stored in journal and generated simulations from them. Notice how the method
returns (in this order) the parameter values used for the simulations, the simulations themselves and the
posterior weights associated to the parameters. All of these three objects are numpy arrays.
"""
# check whether the model corresponds to the one for which the journal was generated
if journal.configuration["type_model"] != [type(model).__name__ for model in self.model]:
raise RuntimeError("You are not using the same model as the one with which the journal was generated.")
self.n_samples_per_param = n_samples_per_param
accepted_parameters = journal.get_accepted_parameters(iteration)
accepted_weights = journal.get_weights(iteration)
normalized_weights = accepted_weights.reshape(-1) / np.sum(accepted_weights)
n_samples = len(normalized_weights)
self.accepted_parameters_manager.broadcast(self.backend, [None])
# Broadcast Accepted parameters
self.accepted_parameters_manager.update_broadcast(self.backend, accepted_parameters=accepted_parameters)
seed_arr = self.rng.randint(0, np.iinfo(np.uint32).max, size=n_samples, dtype=np.uint32)
# no need to check if the seeds are the same here as they are assigned to different parameter values
rng_arr = np.array([np.random.RandomState(seed) for seed in seed_arr])
index_arr = np.arange(0, n_samples, 1)
data_arr = []
for i in range(len(rng_arr)):
data_arr.append([rng_arr[i], index_arr[i]])
data_pds = self.backend.parallelize(data_arr)
simulations_pds = self.backend.map(self._sample_parameter, data_pds)
simulations = self.backend.collect(simulations_pds)
parameters = np.array(accepted_parameters)
simulations = np.array(simulations)
parameters = parameters.reshape((parameters.shape[0], parameters.shape[1]))
simulations = simulations.reshape((simulations.shape[0], simulations.shape[2], simulations.shape[3],))
return parameters, simulations, normalized_weights
def _sample_parameter(self, data, npc=None):
"""
Simulates from a single model parameter.
Parameters
----------
data: list
A list containing a random numpy state and a parameter index, e.g. [rng, index]
Returns
-------
numpy.ndarray
The simulated dataset.
"""
if isinstance(data, np.ndarray):
data = data.tolist()
rng = data[0]
index = data[1]
parameter = self.accepted_parameters_manager.accepted_parameters_bds.value()[index]
ok_flag = False
while not ok_flag:
self.set_parameters(parameter)
y_sim = self.simulate(n_samples_per_param=self.n_samples_per_param, rng=rng, npc=npc)
# if there are no potential infinities there (or if we do not check for those).
# For instance, Lorenz model may give too large values sometimes (quite rarely).
if self.discard_too_large_values and np.sum(np.isinf(np.array(y_sim).astype("float32"))) > 0:
self.logger.warning("y_sim contained too large values for float32; simulating again.")
else:
ok_flag = True
return y_sim