Parallelisation with nessai
One benefit of the proposal method used in nessai is that it allows for simple parallelisation of the likelihood evaluation: new live points are drawn in batches and then stored, so the likelihood can be precomputed for the whole batch and stored for later use.
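The toy sketch below illustrates the batch-then-store idea in plain Python; it is not nessai's implementation. draw_batch is a hypothetical stand-in for the flow-based proposal and log_likelihood is a hypothetical toy function. ThreadPool is used so the sketch runs anywhere; multiprocessing.Pool exposes the same map interface.

```python
import random
from multiprocessing.pool import ThreadPool


def log_likelihood(point):
    # Hypothetical toy log-likelihood: unnormalised 2-D standard Gaussian
    x, y = point
    return -0.5 * (x * x + y * y)


def draw_batch(n, rng):
    # Hypothetical stand-in for the proposal: draw n new points in one go
    return [(rng.uniform(-3.0, 3.0), rng.uniform(-3.0, 3.0)) for _ in range(n)]


def precompute(pool, batch):
    # Evaluate the whole batch in parallel, then store each point with its value
    log_l = pool.map(log_likelihood, batch)
    return list(zip(batch, log_l))


rng = random.Random(42)
with ThreadPool(2) as pool:
    stored = precompute(pool, draw_batch(100, rng))

# Later, points are consumed one at a time with no further likelihood calls
point, point_log_l = stored.pop()
```

Because every point in a batch is independent, the expensive calls can all be issued at once, and the sampler later only reads the stored values.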
Enabling parallelisation
Likelihood parallelisation can be enabled in nessai by setting the keyword argument n_pool when calling FlowSampler. This determines the size of the multiprocessing pool used for evaluating the likelihood.
Note
If running nessai via a job scheduler such as HTCondor, remember to set the number of requested CPUs accordingly.
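Creating more pool processes than the job was allocated leads to oversubscription. The sketch below uses a hypothetical helper, available_cpus, to cap n_pool at the number of CPUs the process may run on. It assumes the scheduler enforces its limit through CPU affinity (whether HTCondor does so depends on its configuration) and otherwise falls back to the machine's total CPU count, so explicitly matching n_pool to request_cpus remains the safer option.

```python
import os


def available_cpus():
    """Hypothetical helper: number of CPUs this process is allowed to use."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: reflects any CPU affinity restriction applied to the job
        return len(os.sched_getaffinity(0))
    # Other platforms: fall back to the total CPU count of the machine
    return os.cpu_count() or 1


# Never create more pool processes than the CPUs actually available
requested_n_pool = 4
n_pool = min(requested_n_pool, available_cpus())
```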
Specifying a pool
Alternatively, nessai can use a user-defined pool. This is specified by setting the pool argument in NestedSampler or FlowSampler. Some variables must be initialised when creating the pool; this is done using initialise_pool_variables():
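from multiprocessing import Pool
from nessai.utils.multiprocessing import initialise_pool_variables
# GaussianModel is a user-defined subclass of nessai.model.Model
model = GaussianModel()
# Each worker calls initialise_pool_variables(model) when it starts
pool = Pool(
    processes=2,
    initializer=initialise_pool_variables,
    initargs=(model,),
)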
pool can then be passed to the pool keyword argument when setting up the sampler.
Using ray
ray includes a distributed multiprocessing pool that can also be used with nessai. Simply import ray.util.multiprocessing.Pool instead of the standard pool and initialise it using the method described above.
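Since ray's pool accepts the same processes, initializer and initargs arguments, the swap can be a one-line import change. The sketch below falls back to the standard library when ray is not installed; make_pool is a hypothetical helper, not part of nessai or ray.

```python
try:
    # Distributed pool from ray, if ray is installed
    from ray.util.multiprocessing import Pool
except ImportError:
    # Fall back to the standard library pool with the same interface
    from multiprocessing import Pool


def make_pool(initializer, initargs, processes=2):
    """Create a pool whose workers run initializer(*initargs) on start-up."""
    return Pool(processes=processes, initializer=initializer, initargs=initargs)
```

For example, make_pool(initialise_pool_variables, (model,)) would mirror the standard-pool example earlier in this section.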
Other pools
When a pool object is passed to nessai, it tries to determine how many processes the pool contains and (if the likelihood is vectorised) uses this information to determine the chunk size when evaluating the likelihood. If it cannot determine this, likelihood vectorisation is disabled. This can be avoided by specifying n_pool when initialising the sampler.
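The logic described above can be sketched as follows. This is an assumption about the general approach, not nessai's actual code: infer_pool_size and chunk_size are hypothetical helpers, and reading the private _processes attribute relies on a CPython implementation detail of multiprocessing pools that other pool types may lack.

```python
import math
from multiprocessing.pool import ThreadPool


def infer_pool_size(pool, n_pool=None):
    """Hypothetical helper: return the pool size, or None if it is unknown."""
    if n_pool is not None:
        # An explicitly specified value always takes priority
        return n_pool
    # CPython's multiprocessing pools store their size in a private attribute
    return getattr(pool, "_processes", None)


def chunk_size(n_points, n_pool):
    """Split n_points into roughly equal chunks, one per process."""
    return max(1, math.ceil(n_points / n_pool))


with ThreadPool(3) as pool:
    size = infer_pool_size(pool)
```

A pool whose size cannot be inferred returns None, which corresponds to the case where vectorisation would be disabled unless n_pool is given.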
PyTorch parallelisation
PyTorch supports different forms of parallelisation (see the PyTorch documentation for details).
In nessai, the user can configure the number of threads used for intra-op parallelisation by specifying the pytorch_threads argument in FlowSampler.
This value does not have to match the number of processes used for the multiprocessing pool.
By default, it is set to 1 to avoid all available resources being used.
Note
Scaling with pytorch_threads can vary greatly between different systems and installations of PyTorch. We recommend testing different values before running large-scale analyses.
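One quick way to compare values is a micro-benchmark. The sketch below times dense matrix products after calling torch.set_num_threads, which controls PyTorch's intra-op thread count. It assumes PyTorch is installed, and a matrix product is only a rough proxy for flow training, so timings from a short nessai run remain the better guide.

```python
import time

import torch


def time_matmul(n_threads, size=256, repeats=10):
    """Time repeated matrix products using n_threads intra-op threads."""
    torch.set_num_threads(n_threads)
    a = torch.randn(size, size)
    b = torch.randn(size, size)
    start = time.perf_counter()
    for _ in range(repeats):
        torch.mm(a, b)
    return time.perf_counter() - start


if __name__ == "__main__":
    for n in (1, 2, 4):
        print(f"{n} thread(s): {time_matmul(n):.4f} s")
```

Note that torch.set_num_threads changes a process-wide setting, so the last value tested stays in effect afterwards.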
Example usage
#!/usr/bin/env python
"""
Example of parallelising the likelihood evaluation in nessai.
Shows the two methods supported in nessai: setting n_pool or using a
user-defined pool.
"""
import numpy as np
from multiprocessing import Pool
from nessai.flowsampler import FlowSampler
from nessai.model import Model
from nessai.utils import setup_logger
from nessai.utils.multiprocessing import initialise_pool_variables
output = "./outdir/parallelisation_example/"
logger = setup_logger(output=output)
# Generate the data
truth = {"mu": 1.7, "sigma": 0.7}
bounds = {"mu": [-3, 3], "sigma": [0.01, 3]}
n_points = 1000
data = np.random.normal(truth["mu"], truth["sigma"], size=n_points)
class GaussianLikelihood(Model):
    """
    Gaussian likelihood with the mean and standard deviation as the parameters
    to infer.

    Parameters
    ----------
    data : :obj:`numpy.ndarray`
        Array of data.
    bounds : dict
        The prior bounds.
    """

    def __init__(self, data, bounds):
        self.names = list(bounds.keys())
        self.bounds = bounds
        self.data = data

    def log_prior(self, x):
        """Uniform prior on both parameters."""
        log_p = np.log(self.in_bounds(x), dtype="float")
        for bounds in self.bounds.values():
            log_p -= np.log(bounds[1] - bounds[0])
        return log_p

    def log_likelihood(self, x):
        """Gaussian likelihood."""
        log_l = np.sum(
            -np.log(x["sigma"])
            - 0.5 * ((self.data - x["mu"]) / x["sigma"]) ** 2
        )
        return log_l
# Using n_pool
logger.warning("Running nessai with n_pool")
# Configure the sampler to allow PyTorch 2 threads and to evaluate the
# likelihood with a pool of 2 processes.
fs = FlowSampler(
    GaussianLikelihood(data, bounds),
    output=output,
    resume=False,
    seed=1234,
    pytorch_threads=2,  # Allow pytorch to use 2 threads
    n_pool=2,  # Processes for evaluating the likelihood
)
# Run the sampler
fs.run()
# Using a user-defined pool
logger.warning("Running nessai with a user-defined pool")
# The pool variables must be initialised in every worker, so pass
# initialise_pool_variables as the initializer when creating the pool
model = GaussianLikelihood(data, bounds)
# Define the pool
pool = Pool(
    processes=2,
    initializer=initialise_pool_variables,
    initargs=(model,),
)
fs = FlowSampler(
    model,
    output=output,
    resume=False,
    seed=1234,
    pool=pool,  # User-defined pool
)
# Run the sampler
# The pool will automatically be closed. This can be disabled by passing
# `close_pool=False` to the sampler.
fs.run()