from abc import ABCMeta, abstractmethod
import numpy as np
from sklearn import ensemble
from abcpy.graphtools import *
[docs]class ModelSelections(metaclass = ABCMeta):
"""This abstract base class defines a model selection rule of how to choose a model from a set of models
given an observation.
"""
[docs] @abstractmethod
def __init__(self, model_array, statistics_calc, backend, seed = None):
"""Constructor that must be overwritten by the sub-class.
The constructor of a sub-class must accept an array of models to choose the model
from, and two non-optional parameters statistics calculator and backend stored in self.statistics_calc
and self.backend defining how to calculate sumarry statistics from data and what kind of parallelization
to use.
Parameters
----------
model_array: list
A list of models which are of type abcpy.probabilisticmodels
statistics: abcpy.statistics.Statistics
Statistics object that conforms to the Statistics class.
backend: abcpy.backends.Backend
Backend object that conforms to the Backend class.
seed: integer, optional
Optional initial seed for the random number generator. The default value is generated randomly.
"""
self.model_array = model_array
self.statistics_calc = statistics_calc
self.backend = backend
self.rng = np.random.RandomState(seed)
self.reference_table_calculated = None
raise NotImplementedError
def __getstate__(self):
state = self.__dict__.copy()
del state['backend']
return state
[docs] @abstractmethod
def select_model(self, observations, n_samples = 1000, n_samples_per_param = 100):
"""To be overwritten by any sub-class: returns a model selected by the modelselection
procedure most suitable to the obersved data set observations. Further two optional integer arguments n_samples
and n_samples_per_param is supplied denoting the number of samples in the refernce table and the data points in
each simulated data set.
Parameters
----------
observations: python list
The observed data set.
n_samples : integer, optional
Number of samples to generate for reference table.
n_samples_per_param : integer, optional
Number of data points in each simulated data set.
Returns
-------
abcpy.probabilisticmodels
A model which are of type abcpy.probabilisticmodels
"""
raise NotImplementedError
[docs] @abstractmethod
def posterior_probability(self, observations):
"""To be overwritten by any sub-class: returns the approximate posterior probability
of the chosen model given the observed data set observations.
Parameters
----------
observations: python list
The observed data set.
Returns
-------
np.ndarray
A vector containing the approximate posterior probability of the model chosen.
"""
raise NotImplementedError
[docs]class RandomForest(ModelSelections, GraphTools):
"""
This class implements the model selection procedure based on the Random Forest ensemble learner
as described in Pudlo et. al. [1].
[1] Pudlo, P., Marin, J.-M., Estoup, A., Cornuet, J.-M., Gautier, M. and Robert, C.
(2016). Reliable ABC model choice via random forests. Bioinformatics, 32 859–866.
"""
[docs] def __init__(self, model_array, statistics_calc, backend, N_tree = 100, n_try_fraction = 0.5, seed = None):
"""
Parameters
----------
N_tree : integer, optional
Number of trees in the random forest. The default value is 100.
n_try_fraction : float, optional
The fraction of number of summary statistics to be considered as the size of
the number of covariates randomly sampled at each node by the randomised CART.
The default value is 0.5.
"""
self.model_array = model_array
self.statistics_calc = statistics_calc
self.backend = backend
self.rng = np.random.RandomState(seed)
self.seed = seed
self.n_samples_per_param = None
self.reference_table_calculated = 0
self.N_tree = N_tree
self.n_try_fraction = n_try_fraction
self.observations_bds = None
[docs] def select_model(self, observations, n_samples = 1000, n_samples_per_param = 1):
"""
Parameters
----------
observations: python list
The observed data set.
n_samples : integer, optional
Number of samples to generate for reference table. The default value is 1000.
n_samples_per_param : integer, optional
Number of data points in each simulated data set. The default value is 1.
Returns
-------
abcpy.probabilisticmodels
A model which are of type abcpy.probabilisticmodels
"""
self.n_samples_per_param = n_samples_per_param
self.observations_bds = self.backend.broadcast(observations)
# Creation of reference table
if self.reference_table_calculated is 0:
# Simulating the data, distance and statistics
seed_arr = self.rng.randint(1, n_samples*n_samples, size=n_samples, dtype=np.int32)
seed_pds = self.backend.parallelize(seed_arr)
model_data_pds = self.backend.map(self._simulate_model_data, seed_pds)
model_data = self.backend.collect(model_data_pds)
models, data, statistics = [list(t) for t in zip(*model_data)]
self.reference_table_models = models
self.reference_table_data = data
self.reference_table_statistics = np.concatenate(statistics)
self.reference_table_calculated = 1
# Construct a label for the model_array
label = np.zeros(shape=(len(self.reference_table_models)))
for ind1 in range(len(self.reference_table_models)):
for ind2 in range(len(self.model_array)):
if self.reference_table_models[ind1] == self.model_array[ind2]:
label[ind1] = ind2
# Define the classifier
classifier = ensemble.RandomForestClassifier(n_estimators = self.N_tree, \
max_features=int(self.n_try_fraction*self.reference_table_statistics.shape[1]), bootstrap=True, random_state=self.seed)
classifier.fit(self.reference_table_statistics, label)
return(self.model_array[int(classifier.predict(self.statistics_calc.statistics(observations)))])
[docs] def posterior_probability(self, observations, n_samples = 1000, n_samples_per_param = 1):
"""
Parameters
----------
observations: python list
The observed data set.
Returns
-------
np.ndarray
A vector containing the approximate posterior probability of the model chosen.
"""
self.n_samples_per_param = 1
self.observations_bds = self.backend.broadcast(observations)
# Creation of reference table
if self.reference_table_calculated is 0:
# Simulating the data, distance and statistics
seed_arr = self.rng.randint(1, n_samples*n_samples, size=n_samples, dtype=np.int32)
seed_pds = self.backend.parallelize(seed_arr)
model_data_pds = self.backend.map(self._simulate_model_data, seed_pds)
model_data = self.backend.collect(model_data_pds)
models, data, statistics = [list(t) for t in zip(*model_data)]
self.reference_table_models = models
self.reference_table_data = data
self.reference_table_statistics = np.concatenate(statistics)
self.reference_table_calculated = 1
# Construct a label for the model_array
label = np.zeros(shape=(len(self.reference_table_models)))
for ind1 in range(len(self.reference_table_models)):
for ind2 in range(len(self.model_array)):
if self.reference_table_models[ind1] == self.model_array[ind2]:
label[ind1] = ind2
# Define the classifier
classifier = ensemble.RandomForestClassifier(n_estimators = self.N_tree, \
max_features=int(self.n_try_fraction*self.reference_table_statistics.shape[1]), bootstrap=True, random_state=self.seed)
classifier.fit(self.reference_table_statistics, label)
pred_error = np.zeros(len(self.reference_table_models),)
# Compute missclassification error rate
for ind in range(len(self.reference_table_models)):
pred_error[ind] = 1 - classifier.predict_proba(self.statistics_calc.statistics(self.reference_table_data[ind]))[0][int(label[ind])]
# Estimate a regression function with prediction error as response on summary statitistics of the reference table
regressor = ensemble.RandomForestRegressor(n_estimators = self.N_tree)
regressor.fit(self.reference_table_statistics,pred_error)
return(1-regressor.predict(self.statistics_calc.statistics(observations)))
def _simulate_model_data(self, seed):
"""
Samples a single model parameter and simulates from it until
distance between simulated outcome and the observation is
smaller than eplison.
Parameters
----------
seed: int
value of a seed to be used for reseeding
Returns
-------
np.array
accepted parameter
"""
# reseed random number genearator
rng = np.random.RandomState(seed)
len_model_array = len(self.model_array)
model = self.model_array[int(sum(np.linspace(0, len_model_array - 1, len_model_array) \
* rng.multinomial(1, (1 / len_model_array) * np.ones(len_model_array))))]
self.sample_from_prior([model], rng=rng)
y_sim = model.forward_simulate(model.get_input_values(), self.n_samples_per_param, rng=rng)
while(y_sim[0] is False):
y_sim = model.forward_simulate(model.get_input_values() ,self.n_samples_per_param, rng=rng)
y_sim = y_sim[0].tolist()
statistics = self.statistics_calc.statistics(y_sim)
return (model, y_sim, statistics)