Parallelisation with nessai

One benefit of the proposal method used in nessai is that it allows for simple parallelisation of the likelihood evaluation: new live points are drawn in batches and then stored, so the likelihood can be precomputed and stored for later use.

Enabling parallelisation

Likelihood parallelisation can be enabled in nessai by setting the keyword argument n_pool when calling FlowSampler. This determines the size of the multiprocessing pool used to evaluate the likelihood.
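
For example, a minimal sketch (assuming model is an instance of a user-defined nessai Model; the values shown are illustrative):

from nessai.flowsampler import FlowSampler

# Evaluate the likelihood with a pool of 4 processes
fs = FlowSampler(model, output="outdir", n_pool=4)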

Note

If running nessai via a job scheduler such as HTCondor, remember to set the number of requested CPUs accordingly.
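For example, a sketch of the relevant entry in an HTCondor submit file (the value is illustrative and should account for n_pool plus any threads used by PyTorch):

# Illustrative HTCondor submit file entry
request_cpus = 3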

Specifying a pool

Alternatively, nessai can use a user-defined pool, specified by setting the pool argument in NestedSampler or FlowSampler. Some variables must be initialised when creating the pool; this is done using initialise_pool_variables():

from multiprocessing import Pool
from nessai.utils.multiprocessing import initialise_pool_variables

model = GaussianModel()
pool = Pool(
    processes=2,
    initializer=initialise_pool_variables,
    initargs=(model,),
)

pool can then be passed to the pool keyword argument when setting up the sampler.
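
For example, continuing from the snippet above (the output path is illustrative):

from nessai.flowsampler import FlowSampler

fs = FlowSampler(model, output="outdir", pool=pool)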

Using ray

ray includes a distributed multiprocessing pool that can also be used with nessai: simply import ray.util.multiprocessing.Pool instead of the standard pool and initialise it using the method described above.
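
A sketch of this, mirroring the earlier example (ray's Pool accepts the same initializer arguments as the standard-library pool):

from ray.util.multiprocessing import Pool
from nessai.utils.multiprocessing import initialise_pool_variables

model = GaussianModel()
pool = Pool(
    processes=2,
    initializer=initialise_pool_variables,
    initargs=(model,),
)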

Other pools

When a pool object is passed to nessai, it tries to determine how many processes the pool contains and, if the likelihood is vectorised, uses this information to determine the chunk size when evaluating the likelihood. If the number of processes cannot be determined, likelihood vectorisation will be disabled. This can be avoided by also specifying n_pool when initialising the sampler.
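
For example, a sketch with a hypothetical pool object whose size nessai cannot infer:

from nessai.flowsampler import FlowSampler

fs = FlowSampler(
    model,
    output="outdir",
    pool=custom_pool,  # hypothetical pool with 4 workers
    n_pool=4,  # tell nessai the pool size so vectorisation stays enabled
)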

PyTorch parallelisation

PyTorch supports different forms of parallelisation (see the PyTorch documentation for details). In nessai, the user can configure the number of threads used for intra-op parallelisation by specifying the pytorch_threads argument in FlowSampler. This value does not have to match the number of processes in the multiprocessing pool. By default, it is set to 1 to avoid using all available resources.
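
For reference, intra-op parallelisation is the setting one would otherwise configure directly through PyTorch; a minimal sketch (treating pytorch_threads as equivalent to this call is an assumption):

import torch

# Manually limit PyTorch to a single intra-op thread
torch.set_num_threads(1)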

Note

Scaling with pytorch_threads can vary greatly between systems and installations of PyTorch. We recommend testing different values before running large-scale analyses.

Example usage

#!/usr/bin/env python

"""
Example of parallelising the likelihood evaluation in nessai.

Shows the two methods supported in nessai: setting n_pool or using a
user-defined pool.
"""

import numpy as np
from multiprocessing import Pool

from nessai.flowsampler import FlowSampler
from nessai.model import Model
from nessai.utils import setup_logger
from nessai.utils.multiprocessing import initialise_pool_variables


output = "./outdir/parallelisation_example/"
logger = setup_logger(output=output)


# Generate the data
truth = {"mu": 1.7, "sigma": 0.7}
bounds = {"mu": [-3, 3], "sigma": [0.01, 3]}
n_points = 1000
data = np.random.normal(truth["mu"], truth["sigma"], size=n_points)


class GaussianLikelihood(Model):
    """
    Gaussian likelihood with the mean and standard deviation as the parameters
    to infer.

    Parameters
    ----------
    data : :obj:`numpy.ndarray`
        Array of data.
    bounds : dict
        The prior bounds.
    """

    def __init__(self, data, bounds):
        self.names = list(bounds.keys())
        self.bounds = bounds
        self.data = data

    def log_prior(self, x):
        """Uniform prior on both parameters."""
        log_p = np.log(self.in_bounds(x), dtype="float")
        for bounds in self.bounds.values():
            log_p -= np.log(bounds[1] - bounds[0])
        return log_p

    def log_likelihood(self, x):
        """Gaussian likelihood."""
        log_l = np.sum(
            -np.log(x["sigma"])
            - 0.5 * ((self.data - x["mu"]) / x["sigma"]) ** 2
        )
        return log_l


# Using n_pool
logger.warning("Running nessai with n_pool")
# Configure the sampler with two PyTorch threads and a pool of two
# processes for evaluating the likelihood.
fs = FlowSampler(
    GaussianLikelihood(data, bounds),
    output=output,
    resume=False,
    seed=1234,
    pytorch_threads=2,  # Allow PyTorch to use 2 threads
    n_pool=2,  # Processes for evaluating the likelihood
)

# Run the sampler
fs.run()

# Using a user-defined pool
logger.warning("Running nessai with a user-defined pool")

# Must initialise the global variables for the pool prior to starting it
model = GaussianLikelihood(data, bounds)
initialise_pool_variables(model)
# Define the pool
pool = Pool(2)

fs = FlowSampler(
    model,
    output=output,
    resume=False,
    seed=1234,
    pool=pool,  # User-defined pool
)

# Run the sampler
# The pool will automatically be closed. This can be disabled by passing
# `close_pool=False` to the sampler.
fs.run()
