import numpy as np
from .particles import Particles
[docs]
class ModelSelectorParams(object):
""" Container to hold model selection parameters or hyperparameters
"""
def __init__(self, num_tracks, num_orbits, init_val=1.0, dtype=np.float64):
self.num_tracks = num_tracks
self.num_orbits = num_orbits
self.params = self._init_params(init_val, dtype=dtype)
def __repr__(self):
return '\n'.join(['\t'.join(['{:4.3f}'.format(cell) for cell in row]) for row in self.params])
def __getitem__(self, track_ndx):
return self.params[track_ndx, 0:np.min((track_ndx + 1, self.num_orbits))]
def __setitem__(self, track_ndx, val):
self.params[track_ndx, 0:np.min((track_ndx + 1, self.num_orbits))] = val
def _init_params(self, val=1.0, dtype=np.float64):
"""
Initialize parameters with the common value 'val'
"""
obj = np.ones((self.num_tracks, self.num_orbits),
dtype=dtype) * val
obj = np.tril(obj)
return obj
def normalize(self):
for track_ndx in range(self.num_tracks):
vals = self[track_ndx]
vals /= np.sum(vals)
self[track_ndx] = vals
[docs]
class BinarySelectorParams(ModelSelectorParams):
""" Special methods for the binary model selection variables
TODO: Use a masked array here?
"""
def __init__(self, num_tracks, num_orbits, init_value=1, dtype=np.float64):
super(BinarySelectorParams, self).__init__(num_tracks, num_orbits,
init_value, dtype)
def __repr__(self):
return '\n'.join(['\t'.join(['{:d}'.format(cell) for cell in row]) for row in self.params])
[docs]
def get_linked_track_indices(self, model_index):
"""
Select those tracks that are linked to the given model_index
"""
return np.where(self.params[:, model_index] == 1)[0]
[docs]
def get_unlinked_track_indices(self):
"""
Select those models that are not linked to any tracks
"""
return np.where(np.sum(self.params, axis=0) == 0)[0]
[docs]
class Linker(object):
""" Monte Carlo linking of uncorrelated orbit tracks
Parameters
----------
iods : list
List of Particles class instances summarizing initial orbit determinations (iods)
num_orbits : int, optional
Number of orbit models to fit. If not specified, set num_orbits = num_tracks
a_orbit : float, optional
Clustering hyperparameter in the Dirichlet prior on P_orbit (Default: 0.01)
Attributes
----------
iods : list
List of Particles class instances summarizing initial orbit determinations (iods)
num_tracks : int
Number of observed tracks. Equal to the length of the iods list.
num_orbits : int, optional
Number of orbit models to fit. If not specified, set num_orbits = num_tracks
orbit_selectors : BinarySelectorParams
The orbit selector parameters
p_orbit : ModelSelectorParams
The prior parameters on the orbit selectors
Methods
-------
sample_Porbit_conditional_dist(track_ndx)
Sample from the conditional distribution for p_orbit at track_index
sample_orbit_selectors_from_data_conditional(track_ndx)
Sample from the conditional distribution for orbit_selector at track_index
update_orbit_parameters()
Find the groups of linked tracks and update all particles across linked groups
update_params_using_carlin_chib()
Perform MCMC step using algorithm of Carlin & Chib (1995).
save_step(outfile_head)
Save MCMC step to file
"""
def __init__(self, iods, num_orbits=None, a_orbit=1.e-2):
self.iods = iods
self.num_tracks = len(iods)
if num_orbits is None:
num_orbits = self.num_tracks
self.num_orbits = num_orbits
self.orbit_selectors = BinarySelectorParams(self.num_tracks, self.num_orbits, 0, dtype=int)
self.p_orbit = ModelSelectorParams(self.num_tracks, self.num_orbits, 0.5)
self.p_orbit.normalize()
self._a_orbit = ModelSelectorParams(self.num_tracks, self.num_orbits, 1.e-2)
# Initialize all tracks to be assigned to the first orbit model
# (i.e., assume all tracks generated by one object)
for itrack in range(self.num_tracks):
self.orbit_selectors[itrack][0] = 1
def __repr__(self):
out = 'Linker(num_tracks={:d}, num_orbits={:d})\n'.format(self.num_tracks, self.num_orbits)
out += 'orbit_selectors:\n' + self.orbit_selectors.__repr__() + '\n'
out += 'p_orbit:\n' + self.p_orbit.__repr__() + '\n'
out += 'a_orbit:\n' + self._a_orbit.__repr__()
return out
[docs]
def sample_Porbit_conditional_dist(self, track_ndx):
"""
Sample from the conditional distribution for p_orbit at track_index
Parameters
----------
track_index : int
The observation list index
Returns
-------
p_orbit : array_like
Draw from the conditional distribution for the p.orbit parameters
"""
alpha = self._a_orbit[track_ndx] + self.orbit_selectors[track_ndx]
size = track_ndx + 1
p_orbit = np.random.dirichlet(alpha)
# return np.ones(track_ndx+1, dtype=np.float64) * 0.5
return p_orbit
[docs]
def sample_orbit_selectors_from_data_conditional(self, track_ndx, verbose=True):
"""
Sample from the conditional distribution for orbit_selector at track_index ignoring the
dependence orbit selector dependence of the conditional priors for the orbital elements.
Draw from :math:P(y | k, theta_k) P(k | P_orbit):
Parameters
----------
track_index : int
The observation list index
Returns
-------
orbit_selectors : array_like
Draw from the conditional distribution for the orbit_selectors parameters
"""
n = np.min((self.num_orbits, track_ndx+1))
# TODO: Is there a way to re-use likelihoods calculated in the orbit parameter update step?
lnlike = self.iods[track_ndx].lnlike
lnL = np.zeros(n, dtype=np.float64)
for j in range(n):
# FIXME: Add prior on orbit model parameters here?
theta = self.iods[j].draw_orbit()
lnL[j] = lnlike(theta)
if verbose:
print("lnL:", lnL)
if np.all(lnL < -500.): # Likelihoods are all small - reset to equal linking probabilities
prob_k = np.ones(n) * 0.5
else:
prob_k = np.exp(lnL + np.log(self.p_orbit[track_ndx]))
prob_k /= np.sum(prob_k)
if verbose:
print("prob_k:", prob_k)
return np.random.multinomial(n=1, pvals=prob_k)
[docs]
def update_orbit_parameters(self):
""" Find the groups of linked tracks and update all particles across linked groups
"""
for imodel in range(self.num_orbits):
track_ndx = self.orbit_selectors.get_linked_track_indices(imodel)
for i, indx in enumerate(track_ndx):
if indx != imodel:
self.iods[imodel].fuse(self.iods[indx])
# for j, jndx in enumerate(track_ndx):
# if i != j:
# self.pods[indx].fuse(self.pods[jndx])
# Reset iods to original particles if not linked to any tracks
model_ndx = self.orbit_selectors.get_unlinked_track_indices()
for indx in model_ndx:
self.iods[indx].reset_to_pseudo_prior()
return None
[docs]
def update_params_using_carlin_chib(self, verbose=True):
"""
Perform MCMC step using algorithm of Carlin & Chib (1995).
Sample the "active" orbit parameters from their conditional distribution
(in a Gibbs update), the "inactive" orbit parameters from the
pseudo-prior, and Gibbs updates for the orbit selectors and their prior
parameters. This method is distinct from the Reversible Jump MCMC
(RJMCMC) updates that have no need of the pseudo-prior at the expense of
finding a suitable proposal distribution for the model updates.
The Gibbs updates for each class of parameters in turn defines a blocking
procedure that can, in principle, give more efficient sampling than the
joint updates in RJMCMC if a good proposal distribution for the orbit
selectors is not known (see Godsill 2001).
Here the pseudo-prior for the "inactive" orbit parameters is not conjugate
to the orbit selector prior, so there is no direct method to perform Gibbs
samples of the orbit selectors. Instead, this function performs an MH
update of orbit selectors with a proposal from the distribution determined
by the product of the orbit likelihoods and the orbit selector prior (which
would be the orbit selector conditional distribution if the orbit priors were
independent of the orbit selectors).
Returns
-------
lnP : float
Current log-posterior value
References
----------
Carlin, B., Chib, S., 1995. Bayesian model choice via Markov chain
Monte Carlo methods. Journal of the Royal Statistical Society.
Series B (Method- ological) 57 (3), 473?484.
Godsill, S., 2001. On the relationship between Markov chain Monte Carlo
methods for model uncertainty. Journal of Computational and Graphical
Statistics 10 (2), 230?248.
"""
# ----- MH update of the orbit_selector parameters
# (This could be a Gibbs update, but the conditional pseudo-prior is not
# conjugate)
#
# First propose orbit selectors from p(y | theta_k, k) p(k | P_orbit)
#
for itrack in range(self.num_tracks):
self.orbit_selectors[itrack] = self.sample_orbit_selectors_from_data_conditional(itrack, verbose=verbose)
if verbose:
print("orbit_selectors:")
print(self.orbit_selectors)
# Now accept with MH probability
# TODO: This step may not be necessary?
#
# ----- Gibbs update of P_orbit
#
for itrack in range(self.num_tracks):
self.p_orbit[itrack] = self.sample_Porbit_conditional_dist(itrack)
if verbose:
print("p_orbit:")
print(self.p_orbit)
#
# ----- Re-weight / resample orbit model parameter particles given
# current track linkages.
#
self.update_orbit_parameters()
if verbose:
print("orbit parameters:")
for p in self.iods:
print(p)
# TODO: Return the current log-posterior value
return None
[docs]
def save_step(self, outfile_head):
""" Save MCMC step to file
Parameters
----------
outfile_head : str
Head (i.e., first part) of the output file name for chain steps
"""
# Save orbit selector parameters
orbit_selector_file = outfile_head + "_orbit_selectors.txt"
np.savetxt(orbit_selector_file, self.orbit_selectors.params)
# Save P_orbit parameters
p_orbit_file = outfile_head + "_p_orbit.txt"
np.savetxt(p_orbit_file, self.p_orbit.params)
# Save orbit parameter particles
for i, iod in enumerate(self.iods):
particles_file = outfile_head + "_particles_{:d}.txt".format(i)
np.savetxt(particles_file, np.column_stack((iod.particles,
iod.ln_wts)))
[docs]
def sample(self, nStep=100, outfile_head="linker"):
"""
Run the top-level MCMC to link tracks and refine orbit parameters
Parameters
----------
nStep : int
Number of MCMC steps
outfile_head : str, optional
Head (i.e., first part) of the output file name for chain steps
"""
for istep in range(nStep):
self.update_params_using_carlin_chib()
# self.save_step(outfile_head)
return None