from __future__ import division
import sys
import os
import logging
import time
import numpy as np
from math import log
from collections import deque
from random import random,randrange
from . import parameter
from .proposal import DefaultProposalCycle
from . import proposal
from .cpnest import CheckPoint, RunManager
from tqdm import tqdm
from operator import attrgetter
import numpy.lib.recfunctions as rfn
import array
from .nest2pos import acl
import pickle
__checkpoint_flag = False
[docs]class Sampler(object):
"""
Sampler class.
---------
Initialisation arguments:
args:
model: :obj:`cpnest.Model` user defined model to sample
maxmcmc:
:int: maximum number of mcmc steps to be used in the :obj:`cnest.sampler.Sampler`
----------
kwargs:
verbose:
:int: display debug information on screen
Default: 0
poolsize:
:int: number of objects for the affine invariant sampling
Default: 1000
seed:
:int: random seed to initialise the pseudo-random chain
Default: None
proposal:
:obj:`cpnest.proposals.Proposal` to use
Defaults: :obj:`cpnest.proposals.DefaultProposalCycle`)
resume_file:
File for checkpointing
Default: None
manager:
:obj:`multiprocessing.Manager` hosting all communication objects
Default: None
"""
def __init__(self,
model,
maxmcmc,
seed = None,
output = None,
verbose = False,
sample_prior = False,
poolsize = 1000,
proposal = None,
resume_file = None,
manager = None):
self.seed = seed
self.model = model
self.initial_mcmc = maxmcmc//10
self.maxmcmc = maxmcmc
self.resume_file = resume_file
self.manager = manager
self.logLmin = self.manager.logLmin
self.logLmax = self.manager.logLmax
self.logger = logging.getLogger('cpnest.sampler.Sampler')
if proposal is None:
self.proposal = DefaultProposalCycle()
else:
self.proposal = proposal
self.Nmcmc = self.initial_mcmc
self.Nmcmc_exact = float(self.initial_mcmc)
self.poolsize = poolsize
self.evolution_points = deque(maxlen = self.poolsize)
self.verbose = verbose
self.acceptance = 0.0
self.sub_acceptance = 0.0
self.mcmc_accepted = 0
self.mcmc_counter = 0
self.initialised = False
self.output = output
self.sample_prior = sample_prior
self.samples = deque(maxlen = None if self.verbose >=3 else 5*self.maxmcmc) # the list of samples from the mcmc chain
self.producer_pipe, self.thread_id = self.manager.connect_producer()
self.last_checkpoint_time = time.time()
[docs] def reset(self):
"""
Initialise the sampler by generating :int:`poolsize` `cpnest.parameter.LivePoint`
and distributing them according to :obj:`cpnest.model.Model.log_prior`
"""
np.random.seed(seed=self.seed)
for n in tqdm(range(self.poolsize), desc='SMPLR {} init draw'.format(self.thread_id),
disable= not self.verbose, position=self.thread_id, leave=False):
while True: # Generate an in-bounds sample
p = self.model.new_point()
p.logP = self.model.log_prior(p)
if np.isfinite(p.logP): break
p.logL=self.model.log_likelihood(p)
if p.logL is None or not np.isfinite(p.logL):
self.logger.warning("Received non-finite logL value {0} with parameters {1}".format(str(p.logL), str(p)))
self.logger.warning("You may want to check your likelihood function to improve sampling")
self.evolution_points.append(p)
self.proposal.set_ensemble(self.evolution_points)
# initialise the structure to store the mcmc chain
self.samples = []
# Now, run evolution so samples are drawn from actual prior
for k in tqdm(range(self.poolsize), desc='SMPLR {} init evolve'.format(self.thread_id),
disable= not self.verbose, position=self.thread_id, leave=False):
_, p = next(self.yield_sample(-np.inf))
self.estimate_nmcmc_on_the_fly()
if self.sample_prior is True or self.verbose>=3:
# save the poolsize as prior samples
prior_samples = []
for k in tqdm(range(self.maxmcmc), desc='SMPLR {} generating prior samples'.format(self.thread_id),
disable= not self.verbose, position=self.thread_id, leave=False):
_, p = next(self.yield_sample(-np.inf))
prior_samples.append(p)
prior_samples = rfn.stack_arrays([prior_samples[j].asnparray()
for j in range(0,len(prior_samples))],usemask=False)
np.savetxt(os.path.join(self.output,'prior_samples_%s.dat'%os.getpid()),
prior_samples.ravel(),header=' '.join(prior_samples.dtype.names),
newline='\n',delimiter=' ')
self.logger.critical("Sampler process {0!s}: saved {1:d} prior samples in {2!s}".format(os.getpid(),self.maxmcmc,'prior_samples_%s.dat'%os.getpid()))
self.prior_samples = prior_samples
self.proposal.set_ensemble(self.evolution_points)
self.initialised=True
[docs] def estimate_nmcmc_on_the_fly(self, safety=5, tau=None):
"""
Estimate autocorrelation length of chain using acceptance fraction
ACL = (2/acc) - 1
multiplied by a safety margin of 5
Uses moving average with decay time tau iterations
(default: :int:`self.poolsize`)
Taken from http://github.com/farr/Ensemble.jl
"""
if tau is None: tau = self.poolsize/safety
if self.sub_acceptance == 0.0:
self.Nmcmc_exact = (1.0 + 1.0/tau)*self.Nmcmc_exact
else:
self.Nmcmc_exact = (1.0 - 1.0/tau)*self.Nmcmc_exact + (safety/tau)*(2.0/self.sub_acceptance - 1.0)
self.Nmcmc_exact = float(min(self.Nmcmc_exact,self.maxmcmc))
self.Nmcmc = max(safety,int(self.Nmcmc_exact))
return self.Nmcmc
[docs] def estimate_nmcmc(self, safety=20):
"""
Estimate autocorrelation length of the chain
"""
# first of all, build a numpy array out of
# the stored samples
ACL = []
samples = np.array([x.values for x in self.samples[-5*self.maxmcmc:]])
# compute the ACL on 5 times the maxmcmc set of samples
ACL = [acl(samples[:,i]) for i in range(samples.shape[1])]
if self.verbose >= 3:
for i in range(len(self.model.names)):
self.logger.info("Sampler {0} -- ACL({1}) = {2}".format(os.getpid(),self.model.names[i],ACL[i]))
self.Nmcmc = int(np.max(ACL))
if self.Nmcmc < 1:
self.Nmcmc = 1
if self.Nmcmc < safety:
self.Nmcmc = safety
return self.Nmcmc
[docs] def produce_sample(self):
try:
self._produce_sample()
except CheckPoint:
self.logger.critical("Checkpoint excepted in sampler")
self.checkpoint()
def _produce_sample(self):
"""
main loop that takes the worst :obj:`cpnest.parameter.LivePoint` and
evolves it. Proposed sample is then sent back
to :obj:`cpnest.NestedSampler`.
"""
if not self.initialised:
self.reset()
self.counter=1
__checkpoint_flag=False
while True:
if self.manager.checkpoint_flag.value:
self.checkpoint()
sys.exit(130)
if self.logLmin.value==np.inf:
break
if time.time() - self.last_checkpoint_time > self.manager.periodic_checkpoint_interval:
self.checkpoint()
self.last_checkpoint_time = time.time()
# if the nested sampler is requesting for an update
# produce a sample for it
if self.producer_pipe.poll():
p = self.producer_pipe.recv()
if p is None:
break
if p == "checkpoint":
self.checkpoint()
sys.exit(130)
self.evolution_points.append(p)
(Nmcmc, outParam) = next(self.yield_sample(self.logLmin.value))
# Send the sample to the Nested Sampler
self.producer_pipe.send((self.acceptance,self.sub_acceptance,Nmcmc,outParam))
# otherwise, keep on sampling from the previous boundary
else:
_, _ = next(self.yield_sample(self.logLmin.value))
# Update the ensemble every now and again
if (self.counter%(self.poolsize//4))==0:
self.proposal.set_ensemble(self.evolution_points)
self.estimate_nmcmc()
self.counter += 1
self.logger.critical("Sampler process {0!s}: MCMC samples accumulated = {1:d}".format(os.getpid(),len(self.samples)))
# self.samples.extend(self.evolution_points)
if self.verbose >=3 and not(self.sample_prior):
self.mcmc_samples = rfn.stack_arrays([self.samples[j].asnparray()
for j in range(0,len(self.samples))],usemask=False)
np.savetxt(os.path.join(self.output,'mcmc_chain_%s.dat'%os.getpid()),
self.mcmc_samples.ravel(),header=' '.join(self.mcmc_samples.dtype.names),
newline='\n',delimiter=' ')
self.logger.critical("Sampler process {0!s}: saved {1:d} mcmc samples in {2!s}".format(os.getpid(),len(self.samples),'mcmc_chain_%s.dat'%os.getpid()))
self.logger.critical("Sampler process {0!s} - mean acceptance {1:.3f}: exiting".format(os.getpid(), float(self.mcmc_accepted)/float(self.mcmc_counter)))
return 0
[docs] def checkpoint(self):
"""
Checkpoint its internal state
"""
self.logger.info('Checkpointing Sampler')
with open(self.resume_file, "wb") as f:
pickle.dump(self, f)
[docs] @classmethod
def resume(cls, resume_file, manager, model):
"""
Resumes the interrupted state from a
checkpoint pickle file.
"""
with open(resume_file, "rb") as f:
obj = pickle.load(f)
obj.model = model
obj.manager = manager
obj.logLmin = obj.manager.logLmin
obj.logLmax = obj.manager.logLmax
obj.logger = logging.getLogger("cpnest.sample.Sampler")
obj.producer_pipe , obj.thread_id = obj.manager.connect_producer()
obj.logger.info('Resuming Sampler from ' + resume_file)
obj.last_checkpoint_time = time.time()
return obj
def __getstate__(self):
state = self.__dict__.copy()
# Remove the unpicklable entries.
del state['model']
del state['logLmin']
del state['logLmax']
del state['manager']
del state['producer_pipe']
del state['thread_id']
del state['logger']
return state
def __setstate__(self, state):
self.__dict__ = state
self.manager = None
[docs]class MetropolisHastingsSampler(Sampler):
"""
metropolis-hastings acceptance rule
for :obj:`cpnest.proposal.EnembleProposal`
"""
[docs] def yield_sample(self, logLmin):
while True:
sub_counter = 0
sub_accepted = 0
oldparam = self.evolution_points.popleft()
logp_old = self.model.log_prior(oldparam)
while True:
sub_counter += 1
newparam = self.proposal.get_sample(oldparam.copy())
newparam.logP = self.model.log_prior(newparam)
if newparam.logP-logp_old + self.proposal.log_J > log(random()):
newparam.logL = self.model.log_likelihood(newparam)
if newparam.logL > logLmin:
self.logLmax.value = max(self.logLmax.value, newparam.logL)
oldparam = newparam.copy()
logp_old = newparam.logP
sub_accepted+=1
# append the sample to the array of samples
self.samples.append(oldparam)
if (sub_counter >= self.Nmcmc and sub_accepted > 0 ) or sub_counter >= self.maxmcmc:
break
# Put sample back in the stack, unless that sample led to zero accepted points
self.evolution_points.append(oldparam)
self.sub_acceptance = float(sub_accepted)/float(sub_counter)
self.mcmc_accepted += sub_accepted
self.mcmc_counter += sub_counter
self.acceptance = float(self.mcmc_accepted)/float(self.mcmc_counter)
# Yield the new sample
yield (sub_counter, oldparam)
[docs]class HamiltonianMonteCarloSampler(Sampler):
"""
HamiltonianMonteCarlo acceptance rule
for :obj:`cpnest.proposal.HamiltonianProposal`
"""
[docs] def yield_sample(self, logLmin):
global_lmax = self.logLmax.value
while True:
sub_accepted = 0
sub_counter = 0
oldparam = self.evolution_points.pop()
while sub_accepted == 0:
sub_counter += 1
newparam = self.proposal.get_sample(oldparam.copy(), logLmin = logLmin)
if self.proposal.log_J > np.log(random()):
if newparam.logL > logLmin:
global_lmax = max(global_lmax, newparam.logL)
oldparam = newparam.copy()
sub_accepted += 1
# append the sample to the array of samples
self.samples.append(oldparam)
self.evolution_points.append(oldparam)
self.sub_acceptance = float(sub_accepted)/float(sub_counter)
self.mcmc_accepted += sub_accepted
self.mcmc_counter += sub_counter
self.acceptance = float(self.mcmc_accepted)/float(self.mcmc_counter)
self.logLmax.value = global_lmax
for p in self.proposal.proposals:
p.update_time_step(self.acceptance)
p.update_trajectory_length(self.Nmcmc)
#print(p.dt,p.L)
yield (sub_counter, oldparam)
[docs] def insert_sample(self, p):
# if we did not accept, inject a new particle in the system (gran-canonical) from the prior
# by picking one from the existing pool and giving it a random trajectory
k = np.random.randint(self.evolution_points.maxlen)
self.evolution_points.rotate(k)
p = self.evolution_points.pop()
self.evolution_points.append(p)
self.evolution_points.rotate(-k)
return self.proposal.get_sample(p.copy(),logLmin=p.logL)
[docs]class SliceSampler(Sampler):
"""
The Ensemble Slice sampler from Karamanis & Beutler
https://arxiv.org/pdf/2002.06212v1.pdf
"""
[docs] def reset(self):
"""
Initialise the sampler by generating :int:`poolsize` `cpnest.parameter.LivePoint`
"""
self.mu = 1.0
self.max_steps_out = self.maxmcmc # maximum stepping out steps allowed
self.max_slices = self.maxmcmc # maximum number of slices allowed
self.tuning_steps = 10*self.poolsize
super(SliceSampler, self).reset()
[docs] def adapt_length_scale(self):
"""
adapts the length scale of the expansion/contraction
following the rule in (Robbins and Monro, 1951) of Tibbits et al. (2014)
"""
Ne = max(1,self.Ne)
Nc = max(1,self.Nc)
ratio = Ne/(Ne+Nc)
self.mu *= 2*ratio
[docs] def reset_boundaries(self):
"""
resets the boundaries and counts
for the slicing
"""
self.L = - np.random.uniform(0.0,1.0)
self.R = self.L + 1.0
self.Ne = 0.0
self.Nc = 0.0
[docs] def increase_left_boundary(self):
"""
increase the left boundary and counts
by one unit
"""
self.L = self.L - 1.0
self.Ne = self.Ne + 1
[docs] def increase_right_boundary(self):
"""
increase the right boundary and counts
by one unit
"""
self.R = self.R + 1.0
self.Ne = self.Ne + 1
[docs] def yield_sample(self, logLmin):
while True:
sub_accepted = 0
sub_counter = 0
j = 0
while j < self.poolsize:
oldparam = self.evolution_points.popleft()
if oldparam.logL > logLmin:
break
self.evolution_points.append(oldparam)
j += 1
while True:
# Set Initial Interval Boundaries
self.reset_boundaries()
sub_counter += 1
direction_vector = self.proposal.get_direction(mu = self.mu)
if not(isinstance(direction_vector,parameter.LivePoint)):
direction_vector = parameter.LivePoint(oldparam.names,d=array.array('d',direction_vector.tolist()))
Y = logLmin
Yp = oldparam.logP-np.random.exponential()
J = np.floor(self.max_steps_out*np.random.uniform(0,1))
K = (self.max_steps_out-1)-J
# keep on expanding until we get outside the logL boundary from the left
# or the prior bound, whichever comes first
while J > 0:
parameter_left = direction_vector * self.L + oldparam
if self.model.in_bounds(parameter_left):
if Yp > self.model.log_prior(parameter_left):
break
else:
self.increase_left_boundary()
J -= 1
# if we get out of bounds, break out
else:
break
# keep on expanding until we get outside the logL boundary from the right
# or the prior bound, whichever comes first
while K > 0:
parameter_right = direction_vector * self.R + oldparam
if self.model.in_bounds(parameter_right):
if Yp > self.model.log_prior(parameter_right):
break
else:
self.increase_right_boundary()
K -= 1
# if we get out of bounds, break out
else:
break
# slice sample the likelihood-bound prior
# if the search interval has shrunk too much, break and start over
slice = 0
while slice < self.max_slices:
# generate a new point between the boundaries we identified
Xprime = np.random.uniform(self.L,self.R)
newparam = direction_vector * Xprime + oldparam
newparam.logP = self.model.log_prior(newparam)
if newparam.logP > Yp:
# compute the new value of logL
newparam.logL = self.model.log_likelihood(newparam)
if newparam.logL > Y:
self.logLmax.value = max(self.logLmax.value, newparam.logL)
oldparam = newparam.copy()
sub_accepted += 1
break
# adapt the intervals shrinking them
if Xprime < 0.0:
self.L = Xprime
self.Nc = self.Nc + 1
elif Xprime > 0.0:
self.R = Xprime
self.Nc = self.Nc + 1
slice += 1
if sub_counter > self.Nmcmc and sub_accepted > 0:
break
if sub_counter > self.maxmcmc:
break
self.evolution_points.append(oldparam)
self.samples.append(oldparam)
self.sub_acceptance = float(sub_accepted)/float(sub_counter)
self.mcmc_accepted += sub_accepted
self.mcmc_counter += sub_counter
self.acceptance = float(self.mcmc_accepted)/float(self.mcmc_counter)
if self.mcmc_counter < self.tuning_steps:
self.adapt_length_scale()
yield (sub_counter, oldparam)