import numpy as np
from astropy.time import Time
from .orbit import Orbit
from .propagator import KeplerianPropagator
from .compute import dircos, radec, radec, radecRateObsToRV, rvObsToRaDecRate
from .utils import norm, normed, normSq, cluster_emcee_walkers, unitAngle3
from .constants import RGEO, VGEO, WGS84_EARTH_MU
# Priors
[docs]
class RPrior:
"""Gaussian prior on distance from origin |r|.
Parameters
----------
rmean : float
Prior mean on |r| in meters.
rsigma : float
Prior standard deviation on |r| in meters.
Attributes
----------
rMean
rSigma
Methods
-------
__call__(orbit)
Returns log prior probability of given orbit.
"""
def __init__(self, rmean, rsigma):
self.rmean = rmean
self.rsigma = rsigma
[docs]
def __call__(self, orbit, distance=None, chi=False):
"""Return log prior probability of given orbit.
Parameters
----------
orbit : Orbit
The given orbit.
distance: float, optional
Distance between object and observer (m) [not used]
Returns
-------
logprior : float
The log of the prior probability for given orbit.
"""
chi0 = (norm(orbit.r) - self.rmean)/self.rsigma
res = chi0 if chi else -0.5*chi0**2
return res
[docs]
class VPrior:
"""Gaussian prior on velocity magnitude |v|.
Parameters
----------
vmean : float
Prior mean on |v| in meters per second.
vsigma : float
Prior standard deviation on |v| in meters per second.
Attributes
----------
vMean
vSigma
Methods
-------
__call__(orbit)
Returns log prior probability of given orbit.
"""
def __init__(self, vmean, vsigma):
self.vmean = vmean
self.vsigma = vsigma
[docs]
def __call__(self, orbit, distance=None, chi=False):
"""Return log prior probability of given orbit.
Parameters
----------
orbit : Orbit
The given orbit.
distance: float, optional
Distance between object and observer (m) [not used]
Returns
-------
logprior : float
The log of the prior probability for given orbit.
"""
chi0 = (norm(orbit.v) - self.vmean)/self.vsigma
res = chi0 if chi else -0.5*chi0**2
return res
[docs]
class APrior:
"""Gaussian prior on orbit semimajor axis a.
Parameters
----------
amean : float
Prior mean on a in meters.
asigma : float
Prior standard deviation on a in meters.
Attributes
----------
aMean
aSigma
Methods
-------
__call__(orbit)
Returns log prior probability of given orbit.
"""
def __init__(self, amean, asigma):
self.amean = amean
self.asigma = asigma
[docs]
def __call__(self, orbit, distance=None, chi=False):
"""Return log prior probability of given orbit.
Parameters
----------
orbit : Orbit
The given orbit.
distance: float, optional
Distance between object and observer (m) [not used]
Returns
-------
logprior : float
The log of the prior probability for given orbit.
"""
chi0 = (orbit.a - self.amean)/self.asigma
res = chi0 if chi else -0.5*chi0**2
return res
[docs]
class EPrior:
"""Gaussian prior on orbit eccentricity e.
Parameters
----------
emean : float
Prior mean on e.
esigma : float
Prior standard deviation on e.
Attributes
----------
eMean
eSigma
Methods
-------
__call__(orbit)
Returns log prior probability of given orbit.
"""
def __init__(self, emean, esigma):
self.emean = emean
self.esigma = esigma
[docs]
def __call__(self, orbit, distance=None, chi=False):
"""Return log prior probability of given orbit.
Parameters
----------
orbit : Orbit
The given orbit.
distance: float, optional
Distance between object and observer (m) [not used]
chi : bool
If true, return chi rather than ln(prob)
Returns
-------
logprior : float
The log of the prior probability for given orbit.
"""
chi0 = (orbit.e - self.emean)/self.esigma
res = chi0 if chi else -0.5*chi0**2
return res
[docs]
class EquinoctialExEyPrior:
"""Gaussian prior on equinoctial ex and ey, favoring ex = ey = 0.
Parameters
----------
sigma : float
standard deviation on ex and ey
Attributes
----------
sigma
Methods
-------
__call__(orbit)
Returns log prior probability of given orbit.
"""
def __init__(self, sigma):
self.sigma = sigma
[docs]
def __call__(self, orbit, distance=None, chi=False):
"""Return log prior probability of given orbit.
Parameters
----------
orbit : Orbit
The given orbit.
distance : float
Distance between object and observer (m)
chi : bool
If True, return chi corresponding to log probability, rather than
log probability.
Returns
-------
logprior : float
The log of the prior probability for given orbit.
"""
chi0 = np.array([orbit.ex / self.sigma, orbit.ey / self.sigma])
res = chi0 if chi else -0.5*chi0**2
return res
[docs]
class Log10AreaPrior:
"""Gaussian prior on log10(area) of object.
Parameters
----------
mean : float
mean of log10(area)
sigma : float
standard deviation of log10(area)
Attributes
----------
mean, sigma
Methods
-------
__call__(orbit)
Returns log prior probability of given orbit.
"""
def __init__(self, mean=-0.5, sigma=1.5):
self.mean = mean
self.sigma = sigma
[docs]
def __call__(self, orbit, distance=None, chi=False):
"""Return log prior probability of given orbit.
Parameters
----------
orbit : Orbit
The given orbit.
distance : float
Distance between object and observer (m)
chi : bool
If True, return chi corresponding to log probability, rather than
log probability.
Returns
-------
logprior : float
The log of the prior probability for given orbit.
"""
chi0 = (np.log10(orbit.propkw['area']) - self.mean) / self.sigma
res = chi0 if chi else -0.5*chi0**2
return res
[docs]
class AreaPrior:
"""Gaussian prior on log10(area) of object.
Parameters
----------
mean : float
mean of log10(area)
sigma : float
standard deviation of log10(area)
Attributes
----------
mean, sigma
Methods
-------
__call__(orbit)
Returns log prior probability of given orbit.
"""
def __init__(self, mean=0, sigma=2):
self.mean = mean
self.sigma = sigma
[docs]
def __call__(self, orbit, distance=None, chi=False):
"""Return log prior probability of given orbit.
Parameters
----------
orbit : Orbit
The given orbit.
distance : float
Distance between object and observer (m)
chi : bool
If True, return chi corresponding to log probability, rather than
log probability.
Returns
-------
logprior : float
The log of the prior probability for given orbit.
"""
chi0 = (orbit.propkw['area'] - self.mean) / self.sigma
res = chi0 if chi else -0.5*chi0**2
return res
# Stolen from emcee.utils Was deprecated in emcee version 3, so
# reimplementing here.
[docs]
def sample_ball(p0, std, nSample=1):
"""Produce a ball of walkers around an initial parameter value.
Parameters
----------
p0 : array_like, (d,)
Central parameter values.
std : array_like, (d,)
Axis-aligned standard deviations.
nSample : int, optional
The number of samples to produce.
Returns
-------
samples : array_like (nSample, d)
The samples.
"""
assert(len(p0) == len(std))
return np.vstack([p0 + std * np.random.normal(size=len(p0))
for i in range(nSample)])
[docs]
class GEOProjectionInitializer:
"""Initialize position and velocity samples by projecting an observed ra/dec
into the equatorial plane and assuming a GEO statationary orbital velocity.
Samples are dispersed from this given position and velocity using an
isotropic Gaussian distribution.
Parameters
----------
arc : QTable with columns for 'ra', 'dec', 'rStation_GCRF'.
The set of observations used to initialize samples. Note, only the
first row of the given QTable will be used.
rSigma : float, optional
Standard deviation of sample positions dispersion in meters.
vSigma : float, optional
Standard deviation of sample velocity dispersion in meters per second.
Attributes
----------
rSigma
vSigma
observation : QTable
First row of input arc.
Methods
-------
__call__(nSample)
Returns `nSample` initial samples.
"""
def __init__(self, arc, rSigma=0.1*RGEO, vSigma=0.1*VGEO):
self.observation = arc[0]
self.rSigma = rSigma
self.vSigma = vSigma
[docs]
def __call__(self, nSample):
"""Generate samples.
Parameters
----------
nSample : int
Number of samples to return.
Returns
-------
samples : array_like, (nSample, 6)
Generated samples. Columns are [x, y, z, vx, vy, vz].
"""
from astropy import units as u
# Only use the first observation
rStation = np.array(self.observation['rStation_GCRF'].to(u.m).value)
ra = self.observation['ra']
dec = self.observation['dec']
sdec, cdec = np.sin(dec), np.cos(dec)
sra, cra = np.sin(ra), np.cos(ra)
lineOfSight = np.array([cdec*cra, cdec*sra, sdec])
range = -rStation[2] / lineOfSight[2]
rGuess = rStation + lineOfSight * range
zhat = np.array([0., 0., 1.])
vGuess = VGEO*normed(np.cross(zhat, rGuess))
guess = np.hstack([rGuess, vGuess])
return sample_ball(guess, 3*[self.rSigma]+3*[self.vSigma], nSample)
[docs]
class DistanceProjectionInitializer:
"""Initialize position and velocity from two closely spaced (in time)
observations of ra/dec. The position is initialized by projecting along the
direction of the first observation a given slant range. The velocity is
then initialized assuming that the slant range derivative is zero, and that
the motion in between observations is inertial.
Parameters
----------
arc : QTable with columns for 'ra', 'dec', 'rStation_GCRF'.
The set of observations used to initialize samples. Note, only the
first row of the given QTable will be used.
rho : Distance in meters to project, measured from the position of the
observer (not the center of the Earth).
indices : list of indices indicating which two observations in `arc` to use
for initialization. Default: [0, 1]; i.e., use the first two
observations.
rSigma : float, optional
Standard deviation of sample positions dispersion in meters.
vSigma : float, optional
Standard deviation of sample velocity dispersion in meters per second.
Attributes
----------
observations : QTable
Indicated two rows of the input arc.
rho
rSigma
vSigma
Methods
-------
__call__(nSample)
Returns `nSample` initial samples.
"""
def __init__(
self, arc, rho, indices=[0, 1], rSigma=0.1*RGEO, vSigma=0.1*VGEO
):
self.observations = arc[indices]
self.rho = rho
self.rSigma = rSigma
self.vSigma = vSigma
[docs]
def __call__(self, nSample):
from astropy import units as u
rStation = self.observations['rStation_GCRF'].to(u.m).value
ra = self.observations['ra']
dec = self.observations['dec']
sra, cra = np.sin(ra), np.cos(ra)
sdec, cdec = np.sin(dec), np.cos(dec)
lineOfSight = np.array([cdec*cra, cdec*sra, sdec]).T
rGuess = rStation + lineOfSight * self.rho
dt = (self.observations['time'][1] - self.observations['time'][0])
dt = dt.to(u.s).value
vGuess = (rGuess[1] - rGuess[0])/dt
guess = np.hstack([rGuess[0], vGuess])
return sample_ball(guess, 3*[self.rSigma]+3*[self.vSigma], nSample)
[docs]
def circular_guess(arc, indices=[0, 1]):
"""Guess position and velocity from two closely spaced (in time)
observations of ra/dec.
The state is inferred by finding the circular orbit passing
through the two points, assuming the motion in between the
observations is inertial.
Parameters
----------
arc : QTable with columns for 'ra', 'dec', 'rStation_GCRF'.
The set of observations used to make the guess. Note, only indices
rows of the given QTable will be used.
indices : list of indices indicating which two observations in `arc` to use
for initialization. Default: [0, 1]; i.e., use the first two
observations.
Returns
-------
state: array_like, (6,)
state corresponding to orbit passing through points
"""
from astropy import units as u
usepm = 'pmra' in arc.dtype.names
if not usepm:
observations = arc[indices]
else:
observations = arc
rStation = observations['rStation_GCRF'].to(u.m).value
ra = observations['ra']
dec = observations['dec']
sra, cra = np.sin(ra), np.cos(ra)
sdec, cdec = np.sin(dec), np.cos(dec)
lineOfSighta = (np.array([cdec*cra, cdec*sra, sdec]).T)
if not usepm:
lineOfSight = normed(lineOfSighta[0] + lineOfSighta[1])
else:
lineOfSight = lineOfSighta[0]
zhat = np.array([0., 0., 1.])
rAlpha = normed(np.cross(zhat, lineOfSight))
rDelta = normed(np.cross(lineOfSight, rAlpha))
if not usepm:
dt = (observations['time'][1] - observations['time'][0])
epoch = observations['time'][0]+dt/2
dt = dt.to(u.s).value
if dt == 0:
dt = 1e-3
# something horribly wrong in this case;
# okay to have really bad guess?
muAlpha = ((((ra[1]-ra[0]).to(u.rad).value)+np.pi)
% (2*np.pi)-np.pi)*cdec[0].value/dt
muDelta = (dec[1]-dec[0]).to(u.rad).value/dt
vGround = (rStation[1]-rStation[0])/dt
rStation = 0.5*(rStation[0]+rStation[1])
else:
muAlpha = observations['pmra'][0].to(u.rad/u.s).value
muDelta = observations['pmdec'][0].to(u.rad/u.s).value
vGround = observations['vStation_GCRF'][0].to(u.m/u.s).value
epoch = observations['time'][0]
rStation = rStation[0]
vGroundSky = vGround-lineOfSight*np.dot(lineOfSight, vGround)
# gives the geocentric radial velocity as a function of range, assuming
# a circular orbit, so v^2/R = GM/R^2
# the zero of this is the distance at which the object would be on a
# circular orbit.
def vr2(r):
pos = rStation + r*lineOfSight
R = np.sqrt(np.sum(pos**2))
return WGS84_EARTH_MU/R - normSq(muAlpha*rAlpha*r + muDelta*rDelta*r +
vGroundSky)
def vR(r, ub=np.inf, verbose=False, square=False):
if r > ub:
sgn = -1
r = 2*ub - r
else:
sgn = 1
pos = rStation + r*lineOfSight
vrsquared = np.clip(vr2(r), 0, np.inf)
ret = np.dot(normed(pos),
(sgn*np.sqrt(vrsquared)*lineOfSight +
muAlpha*r*rAlpha +
muDelta*r*rDelta + vGroundSky))
return ret
from scipy.optimize import root, root_scalar, leastsq
from functools import partial
vr2near = vr2(0)
vr2far = vr2(1000*RGEO)
if np.sign(vr2near) == np.sign(vr2far):
# no circular solution.
# must be space observer?
# we should take a very close distance and return it as a best fit.
# e.g., 200 km (?), v_los = 0?
rRange = 200000
rGuess = rStation + rRange*lineOfSight
vGuess = muAlpha*rRange*rAlpha + muDelta*rRange*rDelta + vGroundSky
return np.concatenate([rGuess, vGuess]), epoch
resub = root_scalar(vr2, bracket=[0, 1000*RGEO])
sol, covar, result, mesg, ier = leastsq(
vR, resub.root-1000, args=(resub.root, False, True), full_output=True)
if sol > resub.root:
sgn = -1
sol = 2*resub.root - sol
else:
sgn = 1
if sol <= 200000:
rRange = 200000
rGuess = rStation + rRange*lineOfSight
vGuess = muAlpha*rRange*rAlpha + muDelta*rRange*rDelta + vGroundSky
return np.concatenate([rGuess, vGuess]), epoch
rRange = sol[0]
vlos = sgn*np.sqrt(vr2(rRange))
import ssapy.utils
ra, dec = ssapy.utils.unit_to_lb(np.array([lineOfSight]))
rGuess, vGuess = radecRateObsToRV(
ra[0], dec[0], rRange, muAlpha, muDelta, vlos, rStation, vGroundSky)
# ignores light-time correction, etc.
if np.any(~np.isfinite(rGuess+vGuess)):
import pdb
pdb.set_trace()
return np.concatenate([rGuess, vGuess]), epoch
[docs]
class GaussianRVInitializer:
"""Generate position and velocity samples as an isotropic Gaussian
distribution around an asserted position and velocity.
Parameters
----------
rMean : array_like (3,)
Mean of sample positions in meters.
vMean : array_like (3,)
Mean of sample velocities in meters per second.
rSigma : float, optional
Standard deviation of sample positions dispersion in meters.
vSigma : float, optional
Standard deviation of sample velocity dispersion in meters per second.
Attributes
----------
rMean
vMean
rSigma
vSigma
Methods
-------
__call__(nSample)
Returns `nSample` initial samples.
"""
def __init__(self, rMean, vMean, rSigma=0.1*RGEO, vSigma=0.1*VGEO):
self.rMean = rMean
self.rSigma = rSigma
self.vMean = vMean
self.vSigma = vSigma
[docs]
def __call__(self, nSample):
""" Generate initial samples.
Parameters
----------
nSample : int
Number of samples to return.
Returns
-------
samples : array_like, (nSample, 6)
Generated samples. Columns are [x, y, z, vx, vy, vz].
"""
guess = np.hstack([self.rMean, self.vMean])
return sample_ball(guess, 3*[self.rSigma]+3*[self.vSigma], nSample)
[docs]
class DirectInitializer:
"""Directly specify initial position and velocity samples. This can be
useful if restarting sampling after some previous sampling step has
completed, for instance, if adding new observations to a previously sampled
arc.
Parameters
----------
samples : array_like (nSample, 6)
Samples in position (in meters) and velocity (in meters per second).
Columns are [x, y, z, vx, vy, vz].
replace : bool, optional
Whether to sample with or without replacement.
Attributes
----------
samples
replace
Methods
-------
__call__(nSample)
Returns `nSample` initial samples.
"""
def __init__(self, samples, replace=False):
self.samples = samples
self._nAvailable = self.samples.shape[0]
self.replace = replace
[docs]
def __call__(self, nSample=50):
""" Return samples.
Parameters
----------
nSample : int
Number of samples to return.
Returns
-------
samples : array_like, (nSample, 6)
Generated samples. Columns are [x, y, z, vx, vy, vz].
Notes
-----
If self.replace=False and nSample is greater than the number of input
samples, then some samples will be duplicated.
"""
if not self.replace and nSample >= self._nAvailable:
nTile = int(nSample/self._nAvailable)
nDone = nTile*self._nAvailable
out = np.tile(self.samples, nTile)
if nSample > nDone:
w = np.random.choice(
self._nAvailable, size=nSample-nDone, replace=False
)
more = self.samples[w]
out = np.concatenate([out, more])
return out
else:
w = np.random.choice(
self._nAvailable, size=nSample, replace=self.replace
)
return self.samples[w]
[docs]
class RVProbability:
"""A class to manage MCMC sampling of orbits (parameterized by position and
velocity at a given epoch) given some angular observations.
Parameters
----------
arc : QTable with one row per observation and columns:
'ra', 'dec' : Angle
Observed angular position in ICRS (topocentric).
'rStation_GCRF' : Quantity
Position of observer in GCRF.
'sigma' : Quantity
Isotropic uncertainty in observed angular position.
'time' : float or astropy.time.Time
Time of observation. If float, then should correspond to GPS
seconds; i.e., seconds since 1980-01-06 00:00:00 UTC
The arc is a linked set of observations assumed to be of the same
object.
epoch : float or astropy.time.Time
The time at which the model parameters (position and velocity) are
specified. It probably makes sense to make this the same as one of the
observation times, but it's not required to be. If float, then should
correspond to GPS seconds; i.e., seconds since 1980-01-06 00:00:00 UTC
priors : list of priors, optional
A list of class instances similar to RPrior() or APrior() to apply
jointly to the posterior probability.
propagator : class, optional
The propagator class to use.
meanpm : bool
fit model to mean positions and proper motions rather than to
individual epochs along each streak.
damp : float
damp chi residuals with pseudo-Huber loss function. Default -1: do
not damp.
Attributes
----------
arc
epoch
priors
propagator
Methods
-------
lnlike(orbit)
Calculate log likelihood of observations given orbit.
lnprob(p)
Calculate log posterior probability of orbit parameters p given
observations (up to a constant). Parameters p are [x, y, z, vx, vy, vz]
at epoch.
__call__(p)
Alias for lnprob(p)
"""
def __init__(self, arc, epoch,
priors=[RPrior(RGEO, RGEO*0.2), APrior(RGEO, RGEO*0.2)],
propagator=KeplerianPropagator(),
meanpm=False, damp=-1):
from astropy import units as u
if isinstance(epoch, Time):
epoch = epoch.gps
self.arc = arc
self.epoch = epoch
self.priors = priors
self.propagator = propagator
# Convert Quantities to plain arrays now so conversions don't slow us
# down during sampling.
ra = self.arc['ra']
dec = self.arc['dec']
self.ra = ra.to(u.rad).value
self.dec = dec.to(u.rad).value
sdec, cdec = np.sin(dec).value, np.cos(dec).value
sra, cra = np.sin(ra).value, np.cos(ra).value
# line-of-sight calculation
self._los = np.vstack([cdec*cra, cdec*sra, sdec]).T
time = self.arc['time']
self._rStation = np.array(self.arc['rStation_GCRF'].to(u.m).value)
self._vStation = np.array(self.arc['vStation_GCRF'].to(u.m/u.s).value)
self.meanpm = meanpm
if meanpm:
self._raSigma = np.array(self.arc['dra'].to(u.rad).value)
self._decSigma = np.array(
self.arc['ddec'].to(u.rad).value)
self.pmra = np.array(
self.arc['pmra'].to(u.rad/u.s).value)
self.pmdec = np.array(
self.arc['pmdec'].to(u.rad/u.s).value)
self._pmraSigma = np.array(
self.arc['dpmra'].to(u.rad/u.s).value)
self._pmdecSigma = np.array(
self.arc['dpmdec'].to(u.rad/u.s).value)
else:
if 'dra' in self.arc.dtype.names:
self._raSigma = np.array(self.arc['dra'].to(u.rad).value)
self._decSigma = np.array(self.arc['ddec'].to(u.rad).value)
else:
self._raSigma = np.array(self.arc['sigma'].to(u.rad).value)
self._decSigma = np.array(self.arc['sigma'].to(u.rad).value)
if isinstance(time, Time):
time = time.gps
self._time = time
self.damp = damp
[docs]
def chi(self, orbit):
"""Calculate chi residuals for use with lmfit.
This is essentially the input to a chi^2 statistic, just without the
squaring and without the summing over observations. I.e., it's
[(data_i - model_i)/err_i for i in 0..nobs]
Parameters
----------
orbit : Orbit
Orbit in question. The model.
Returns
-------
chilike : array_like (nobs,)
chi residual array for likelihoods
chiprior : array_like (nprior,)
chi residual array for priors
"""
hitbadorbit = False
vmax = np.sqrt(2*WGS84_EARTH_MU/norm(orbit.r))
if norm(orbit.v) > vmax*0.999:
newv = np.array(orbit.v)*vmax/norm(orbit.v)*0.999
orbit = Orbit(r=orbit.r, v=newv, t=orbit.t, propkw=orbit.propkw)
hitbadorbit = True
if norm(orbit.r) < 1:
orbit = Orbit(r=[1, 0, 0], v=orbit.v, t=orbit.t, propkw=orbit.propkw)
hitbadorbit = True
if self.meanpm:
ra, dec, slant, pmra, pmdec, slantrate = (
radec(orbit, self._time, obsPos=self._rStation,
obsVel=self._vStation,
propagator=self.propagator, rate=True))
drarate = (pmra - self.pmra)/self._pmraSigma
ddecrate = (pmdec - self.pmdec)/self._pmdecSigma
else:
ra, dec, slant = radec(orbit, self._time, obsPos=self._rStation,
propagator=self.propagator)
dra = ((ra - self.ra + np.pi)%(2*np.pi))-np.pi
dra = np.cos(dec)*dra/self._raSigma
ddec = (dec-self.dec)/self._decSigma
if self.meanpm:
chi = np.concatenate([dra, drarate, ddec, ddecrate])
else:
chi = np.concatenate([dra, ddec])
if self.damp > 0:
chi = damper(chi, self.damp)
if self.priors is not None and len(self.priors) > 0:
chiprior = np.concatenate([
np.atleast_1d(prior(orbit, distance=slant, chi=True))
for prior in self.priors])
else:
chiprior = np.zeros(1)
if hitbadorbit:
chi += np.sign(chi)*1e5
return chi, chiprior
[docs]
def lnlike(self, orbit):
"""Calculate the log likelihood of the observations given an orbit.
Parameters
----------
orbit : Orbit
Orbit in question.
Returns
-------
lnlike : float
Log likelihood.
"""
return -0.5*np.sum(self.chi(orbit)[0]**2)
[docs]
def lnprob(self, p):
"""Calculate the log posterior probability of parameters p given
observations.
Parameters
----------
p : array (6,)
Sampling parameters. [x, y, z, vx, vy, vz] in meters and meters per
second. Understood to be the parameters at epoch.
Returns
-------
lnprob : float
Log posterior probability.
lnprior : float
Log prior probability.
"""
r = p[0:3]
v = p[3:6]
try:
orbit = Orbit(r, v, self.epoch)
except:
return -np.inf, -np.inf
chilike, chiprior = self.chi(orbit)
lnprior = np.sum(-0.5*chiprior**2)
out = np.sum(-0.5*chilike**2)
out += lnprior
if not np.isfinite(out):
return -np.inf, lnprior
return out, lnprior
[docs]
def lnprior(self, orbit):
"""Calculate the log prior probability of the observations given an
orbit.
Parameters
----------
orbit : Orbit
Orbit in question.
Returns
-------
lnprior : float
Log prior probability.
"""
return -0.5*np.sum(self.chi(orbit)[1]**2)
__call__ = lnprob
[docs]
class EmceeSampler:
"""A sampler built on the emcee package.
The emcee packages implements the Goodman and Weare (2010) affine-invariant
sampler. This is often an efficient sampler to use when selecting a
proposal distribution is not simple.
Parameters
----------
probfn : Callable
Callable that accepts sampling parameters p and returns posterior
probability.
initializer : Callable
Callable that accepts number of desired initial samples and returns
samples. Note, the initial samples should be (at least mostly) unique.
nWalker : int
Number of 'walkers' to use in the Goodman & Weare algorithm. This
should generally be at least 12 for the 6-dimensional problem.
Attributes
----------
probfn
initializer
nWalker
Methods
-------
sample(nBurn, nStep)
Generate samples, first discarding nBurn steps, and then keeping nStep
steps.
"""
def __init__(self, probfn, initializer, nWalker=50):
# Don't need emcee right here, but better to catch version
# incompatibility in ctor than down below.
import emcee
from distutils.version import LooseVersion
if LooseVersion(emcee.__version__) < LooseVersion("3.0"):
raise ValueError("emcee version at least 3.0rc2 required")
self.probfn = probfn
self.initializer = initializer
self.nWalker = nWalker
[docs]
def sample(self, nBurn=1000, nStep=500):
"""Generate samples.
Parameters
----------
nBurn : int
Number of initial steps to take but discard.
nStep : int
Number of subsequent steps to keep and return.
Returns
-------
chain : array (nStep, nWalker, 6)
Generated samples. Columns are [x, y, z, vx, vy, vz].
lnprob : array (nStep, nWalker)
The log posterior probabilities of the samples.
lnprior : array(nStep, nWalker)
The log prior values for each step.
"""
import emcee
from distutils.version import LooseVersion
if LooseVersion(emcee.__version__) < LooseVersion("3.0"):
raise ValueError("emcee version at least 3.0rc2 required")
# Type for emcee 'blobs' metadata
dtype = [("lnprior", float)]
walkers = self.initializer(self.nWalker)
sampler = emcee.EnsembleSampler(
self.nWalker, 6, self.probfn, blobs_dtype=dtype
)
pos, _, _, _ = sampler.run_mcmc(walkers, max(nBurn, 1))
sampler.reset()
# Could add a trim step here to cut out any huge outliers.
pos, _, _, _ = sampler.run_mcmc(pos, nStep)
blobs = sampler.get_blobs()
lnprior = blobs['lnprior']
return sampler.get_chain(), sampler.get_log_prob(), lnprior
[docs]
class MVNormalProposal:
"""A multivariate normal proposal distribution.
Params
------
cov : array_like(6, 6)
Covariance of multivariate normal distribution from which to sample.
The order of variables is [x, y, z, vx, vy, vz] in meters and meters per
second.
Attributes
----------
cov
Methods
-------
propose(p)
Generate a new proposal.
"""
def __init__(self, cov):
self.cov = cov
[docs]
def propose(self, p):
"""Generate a proposal.
Parameters
----------
p : array (6,)
Mean around which to generate a proposal.
Returns
-------
newp : array (6,)
Generated proposal.
"""
return np.random.multivariate_normal(p, self.cov)
[docs]
class RVSigmaProposal(MVNormalProposal):
"""Isotropic multivariate normal proposal distribution.
Params
------
rSigma : float
Isotropic standard deviation in position to use.
vSigma : float
Isotropic standard deviation in velocity to use.
Attributes
----------
cov
Methods
-------
propose(p)
Generate a new proposal.
"""
def __init__(self, rSigma, vSigma):
self.cov = np.diag([rSigma**2]*3+[vSigma**2]*3)
[docs]
class MHSampler:
"""Generate MCMC samples using a Metropolis-Hastings sampler.
Parameters
----------
probfn : Callable
Callable that accepts sampling parameters p and returns posterior
probability.
initializer : Callable
Callable that accepts number of desired initial samples and returns
samples. Note, the initial samples should be (at least mostly) unique.
proposer : Callable
Callable that accepts a "current" sample and returns a "proposed" sample
to either accept or reject at each step.
nChain : int
Number of independent chains to use.
Attributes
----------
probfn
initializer
proposer
nChain
chain : array_like (nStep, nChain, 6)
MCMC sample chain.
lnprob : array_like (nStep, nChain)
Log posterior probability of sample chain.
nAccept : int
Total number of accepted proposals.
nStep : int
Total number of proposal steps.
Methods
-------
reset()
Reset chains.
sample(nBurn, nStep)
Generate samples, first discarding nBurn steps, and then keeping nStep
steps.
"""
def __init__(self, probfn, initializer, proposer, nChain):
self.probfn = probfn
self.proposer = proposer
self.nChain = nChain
self._state = initializer(nChain)
tmp = [self.probfn(s) for s in self._state]
self._lnprob, self._lnprior = zip(*tmp)
self._lnprob = list(self._lnprob)
self._lnprior = list(self._lnprior)
self.reset()
[docs]
def reset(self):
"""Reset chain, including `chain`, `lnprob`, `nAccept`, and `nStep`
attributes.
"""
self.chain = []
self.lnprob = []
self.lnprior = []
self.nAccept = 0
self.nStep = 0
def _step(self):
for i in range(self.nChain):
curr, currPost, currPrior = (
self._state[i], self._lnprob[i], self._lnprior[i]
)
prop = self.proposer.propose(curr)
propPost, propPrior = self.probfn(prop)
if (propPost >= currPost
or np.exp(propPost-currPost) > np.random.uniform()
):
self.nAccept += 1
currPost = propPost
currPrior = propPrior
curr = prop
self._state[i], self._lnprob[i], self._lnprior[i] = (
curr, currPost, currPrior
)
self.nStep += self.nChain
self.chain.append(np.array(self._state))
self.lnprob.append(np.array(self._lnprob))
self.lnprior.append(np.array(self._lnprior))
@property
def acceptanceRatio(self):
"""Ratio of accepted to proposed steps.
"""
return self.nAccept/self.nStep
[docs]
def sample(self, nBurn=1000, nStep=500):
"""Generate samples.
Parameters
----------
nBurn : int
Number of initial steps to take but discard.
nStep : int
Number of subsequent steps to keep and return.
Returns
-------
chain : array (nStep, nChain, 6)
Generated samples. Columns are [x, y, z, vx, vy, vz].
lnprob : array (nStep, nChain)
The log posterior probabilities of the samples.
lnprior : array (nStep, nChain)
The log prior probabilities of the samples.
"""
for _ in range(nBurn):
self._step()
self.reset()
for _ in range(nStep):
self._step()
return (
np.array(self.chain),
np.array(self.lnprob),
np.array(self.lnprior))
[docs]
class LeastSquaresOptimizer:
"""Base class for LeastSquaresOptimizers"""
def __init__(self, probfn, initparam, translatorcls,
absstep=None, fracstep=None, **kw):
self.initparam = initparam
self.translator = translatorcls(self.initparam, probfn.epoch, **kw)
self.absstep = absstep
self.fracstep = fracstep
self.probfn = probfn
def resid(self, p):
orb = self.translator.param_to_orbit(p)
chi = np.concatenate(self.probfn.chi(orb))
return chi
[docs]
def optimize(self, **fit_kws):
"""Run the optimizer and return the resulting fit parameters.
Returns
-------
fit : (6,) array_like
Least-squares fit as [x, y, z, vx, vy, vz] in meters, and
meters/second.
"""
import scipy.optimize
par0 = self.translator.input_param_translation(self.initparam)
par0 = self.translator.optimizeparam(par0)
if 'maxfev' in fit_kws:
nmax = fit_kws.pop('maxfev')
fit_kws['max_nfev'] = nmax
self.result = scipy.optimize.least_squares(
self.resid, par0, **fit_kws)
orbit = self.translator.param_to_orbit(self.result.x)
vmax = np.sqrt(2*WGS84_EARTH_MU/norm(orbit.r))
if norm(orbit.v) > vmax*0.999:
orbit.v = np.array(orbit.v)*vmax/norm(orbit.v)*0.999
self.result.success = False
self.result.hithyperbolicorbit = True
jac = self.result.jac
u, s, vh = np.linalg.svd(jac)
covar = vh.T.dot(np.diag(s**(-2))).dot(vh)
self.result.covar = self.translator.output_covar_translation(covar)
self.result.residual = self.result.fun
fitp = self.translator.orbit_to_param(orbit)
return fitp
[docs]
class ParamOrbitTranslator():
"""Class for making parameters into Orbits and vice-versa."""
def __init__(self, initparam, epoch, fixed=None, orbitattr=None):
self.initParam = initparam
self.fixed = fixed
self.orbitattr = orbitattr if orbitattr is not None else []
self.epoch = epoch
def fullparam(self, p):
if self.fixed is not None:
return np.where(self.fixed, self.initparam, p)
return p
def optimizeparam(self, p):
if self.fixed is not None:
return np.array(p)[self.fixed]
else:
return p
def param_to_orbit(self):
raise NotImplementedError
def orbit_to_param(self):
raise NotImplementedError
def get_propkw_from_fullparam(self, fullparam):
propkw = dict()
for name, val in zip(self.orbitattr, fullparam[6:]):
if name.startswith('log10'):
newname = name[5:]
newval = 10.**val
else:
newname = name
newval = val
propkw[newname] = newval
return propkw
def get_propkw_from_orbit(self, orbit):
propkw = []
for k in self.orbitattr:
if k.startswith('log10'):
k = k[5:]
v = np.log10(orbit.propkw[k])
else:
v = orbit.propkw[k]
propkw.append(v)
return propkw
def input_param_translation(self, p):
return p
def output_covar_translation(self, covar):
return covar
[docs]
class ParamOrbitRV(ParamOrbitTranslator):
def __init__(self, *args, **kwargs):
super(ParamOrbitRV, self).__init__(*args, **kwargs)
def param_to_orbit(self, p):
fullparam = self.fullparam(p)
r = p[:3]
v = p[3:6]
propkw = self.get_propkw_from_fullparam(p)
orbit = Orbit(r=r, v=v, t=self.epoch, propkw=propkw)
return orbit
def orbit_to_param(self, orbit):
r = orbit.r
v = orbit.v
propkw = self.get_propkw_from_orbit(orbit)
return np.concatenate([r, v, propkw])
[docs]
class ParamOrbitEquinoctial(ParamOrbitTranslator):
def __init__(self, *args, **kwargs):
super(ParamOrbitEquinoctial, self).__init__(*args, **kwargs)
def input_param_translation(self, p):
p = np.array(p).copy()
p[0] /= 1e7
return p
def output_covar_translation(self, covar):
covar = np.array(covar).copy()
covar[:, 0] *= 1e7
covar[0, :] *= 1e7
return covar
def param_to_orbit(self, p):
fullparam = self.fullparam(p)
p = np.array(p).copy()
p[0] = np.clip(p[0], 1, np.inf)
e2 = np.hypot(p[3], p[4])
if e2 >= 0.999:
norm = 0.999 / e2
p[3] *= norm
p[4] *= norm
p[0] *= 1e7
propkw = self.get_propkw_from_fullparam(p)
orbit = Orbit.fromEquinoctialElements(*p[:6], t=self.epoch,
propkw=propkw)
return orbit
def orbit_to_param(self, orbit):
fullparam = list(orbit.equinoctialElements)
fullparam = fullparam + self.get_propkw_from_orbit(orbit)
return fullparam
[docs]
class ParamOrbitAngle(ParamOrbitTranslator):
def __init__(self, initparam, epoch, initObsPos, initObsVel, **kwargs):
super(ParamOrbitAngle, self).__init__(initparam, epoch, **kwargs)
self.initObsPos = initObsPos
self.initObsVel = initObsVel
def param_to_orbit(self, p):
fullparam = self.fullparam(p)
r, v = radecRateObsToRV(*p[:6], self.initObsPos, self.initObsVel)
propkw = self.get_propkw_from_fullparam(p)
orbit = Orbit(r, v, self.epoch, propkw=propkw)
return orbit
def orbit_to_param(self, orbit):
r, v = orbit.r, orbit.v
p = rvObsToRaDecRate(r, v, self.initObsPos, self.initObsVel)
propkw = self.get_propkw_from_orbit(orbit)
return list(p)+propkw
class LMOptimizerAngular:
def __init__(self, probfn, initGuess, initObsPos, initObsVel,
fracstep=[1e-7]*6,
absstep=[0.01/60/60, 0.01/60/60, 1,
0.01/60/60/10/100, 0.01/60/60/10/100,
1e-2],
orbitattr=None):
"""Optimizer that employs Levenberg-Marquardt least-squares fitting.
Instead of rv, works in angle/proper motion/range/range rate of initial obs.
Parameters
----------
probfn : RVProbability
The RVProbability object that has both an epoch attribute to use for
the orbit fitting model, and a chi method to use for the fit
evaluation.
initGuess : array_like (6,)
Initial guess. (ra, dec, range, pmra, pmdec, rangerate).
(rad, rad, m, rad/s, rad/s, m/s)
initObsPos : array_like (3,)
Position of observer at probfn.epoch.
initObsVel : array_like (3,)
Position of observer at probfn.epoch.
Attributes
----------
result : lmfit.MinimizerResult
Most recently run result object. Could be useful for inspecting
error estimates or success/failure conditions.
Methods
-------
optimize()
Return the optimized parameters list [r, v]
"""
import warnings
warnings.warn("This class is deprecated; use "
"ssapy.rvsampler.LeastSquaresOptimizer")
self.probfn = probfn
import lmfit
p = lmfit.Parameters()
p.add('ra', value=initGuess[0])
p.add('dec', value=initGuess[1])
p.add('range', value=initGuess[2])
p.add('pmra', value=initGuess[3])
p.add('pmdec', value=initGuess[4])
p.add('rangerate', value=initGuess[5])
self.p = p
self.absstep = absstep
self.fracstep = fracstep
self.initObsPos = initObsPos
self.initObsVel = initObsVel
self.initGuess = initGuess
def _getOrbit(self, p):
if isinstance(p, dict):
vd = p.valuesdict()
ra, dec = vd['ra'], vd['dec']
pmra, pmdec = vd['pmra'], vd['pmdec']
range, rangerate = vd['range'], vd['rangerate']
else:
ra, dec, range, pmra, pmdec, rangerate = p
r, v = radecRateObsToRV(ra, dec, range, pmra, pmdec, rangerate,
self.initObsPos, self.initObsVel)
orbit = Orbit(r, v, self.probfn.epoch)
return orbit
def _resid(self, p):
chi = np.concatenate(self.probfn.chi(self._getOrbit(p)))
return chi
def _jac(self, p):
dx = np.max([self.fracstep*np.abs(p), self.absstep], axis=0)
f0 = self._resid(p)
p0 = np.array(p)
pi = p0.copy()
res = np.zeros((len(f0), len(p)), dtype='f8')
for i, di in enumerate(dx):
pi[i] = p0[i] + di
res[:, i] = (self._resid(pi)-f0)/di
pi[i] = p0[i]
return res
def optimize(self, usejac=True, **fit_kws):
"""Run the optimizer and return the resulting fit parameters.
Returns
-------
fit : (6,) array_like
Least-squares fit as [ra, dec, slant, raRate, decRate,
slantRate] in rad, rad, m, rad/s, rad/s, m/s.
"""
import lmfit
Dfun = self._jac if usejac else None
self.result = lmfit.minimize(self._resid, self.p, Dfun=Dfun,
**fit_kws)
orbit = self._getOrbit(self.result.params)
vmax = np.sqrt(2*WGS84_EARTH_MU/norm(orbit.r))
if norm(orbit.v) > vmax*0.999:
orbit.v = np.array(orbit.v)*vmax/norm(orbit.v)*0.999
self.result.success = False
self.result.hithyperbolicorbit = True
if usejac:
jac = self._jac(
[x for x in self.result.params.valuesdict().values()])
u, s, vh = np.linalg.svd(jac)
covar = vh.T.dot(np.diag(s**(-2))).dot(vh)
self.result.covar = covar
return radec(orbit, self.probfn.epoch, obsPos=self.initObsPos,
obsVel=self.initObsVel, rate=True)
class LMOptimizer:
def __init__(self, probfn, initRV,
fracstep=[1e-8, 1e-8, 1e-8, 1e-9, 1e-9, 1e-9],
absstep=[1, 1, 1, 1e-6, 1e-6, 1e-6],
orbitattr=None):
"""Optimizer that employs Levenberg-Marquardt least-squares fitting.
Parameters
----------
probfn : RVProbability
The RVProbability object that has both an epoch attribute to use for
the orbit fitting model, and a chi method to use for the fit
evaluation.
initRV : array_like (6,)
Initial position and velocity. Essentially a single output from one
of the initializers above.
Attributes
----------
result : lmfit.MinimizerResult
Most recently run result object. Could be useful for inspecting
error estimates or success/failure conditions.
Methods
-------
optimize()
Return the optimized parameters list [r, v]
"""
import lmfit
import warnings
warnings.warn("This class is deprecated; use "
"ssapy.rvsampler.LeastSquaresOptimizer")
self.probfn = probfn
self.initRV = initRV
p = lmfit.Parameters()
p.add('x', value=initRV[0])
p.add('y', value=initRV[1])
p.add('z', value=initRV[2])
p.add('vx', value=initRV[3])
p.add('vy', value=initRV[4])
p.add('vz', value=initRV[5])
self.p = p
self.absstep = absstep
self.fracstep = fracstep
def _getOrbit(self, p):
if isinstance(p, dict):
vd = p.valuesdict()
r = [vd['x'], vd['y'], vd['z']]
v = [vd['vx'], vd['vy'], vd['vz']]
orbit = Orbit(r, v, self.probfn.epoch)
else:
orbit = Orbit(p[:3], p[3:], self.probfn.epoch)
return orbit
def _resid(self, p):
chi = np.concatenate(self.probfn.chi(self._getOrbit(p)))
return chi
def _jac(self, p):
dx = np.max([self.fracstep*np.abs(p), self.absstep], axis=0)
f0 = self._resid(p)
p0 = np.array(p)
pi = p0.copy()
res = np.zeros((len(f0), len(p)), dtype='f8')
for i, di in enumerate(dx):
pi[i] = p0[i] + di
res[:, i] = (self._resid(pi)-f0)/di
pi[i] = p0[i]
return res
def optimize(self, usejac=True, **fit_kws):
"""Run the optimizer and return the resulting fit parameters.
Returns
-------
fit : (6,) array_like
Least-squares fit as [x, y, z, vx, vy, vz] in meters, and
meters/second.
"""
import lmfit
Dfun = self._jac if usejac else None
self.result = lmfit.minimize(self._resid, self.p, Dfun=Dfun,
**fit_kws)
orbit = self._getOrbit(self.result.params)
vmax = np.sqrt(2*WGS84_EARTH_MU/norm(orbit.r))
if norm(orbit.v) > vmax*0.999:
orbit.v = np.array(orbit.v)*vmax/norm(orbit.v)*0.999
self.result.success = False
self.result.hithyperbolicorbit = True
if usejac:
jac = self._jac(
[x for x in self.result.params.valuesdict().values()])
u, s, vh = np.linalg.svd(jac)
covar = vh.T.dot(np.diag(s**(-2))).dot(vh)
self.result.covar = covar
return np.hstack([orbit.r, orbit.v])
def eq2kep(eq):
a = eq[0]
tanio2 = np.sqrt(eq[1]**2+eq[2]**2)
e = np.sqrt(eq[3]**2+eq[4]**2)
aux = np.arctan2(eq[4], eq[3])
raan = np.arctan2(eq[2], eq[1])
pa = aux-raan
kep = [a, e, 2*np.arctan(tanio2), pa, raan, eq[5]-aux]
return kep
class SGP4LMOptimizer:
def __init__(self, probfn, initel,
fracstep=[1e-8]*6,
absstep=[100, 1e-5, 1e-5, 1e-4, 1e-4, 1e-4]):
"""Optimizer that employs Levenberg-Marquardt least-squares fitting
and fits in Kozai mean Keplerian element space.
Parameters
----------
probfn : RVProbability
The RVProbability object that has both an epoch attribute to use for the
orbit fitting model, and a chi method to use for the fit evaluation.
initel : array_like (6,)
Initial Kozai mean Keplerian elements.
Attributes
----------
result : lmfit.MinimizerResult
Most recently run result object. Could be useful for inspecting error
estimates or success/failure conditions.
Methods
-------
optimize()
Return the optimized parameters list [r, v]
"""
self.probfn = probfn
self.initel = initel
self.fieldnames = ['a', 'hx', 'hy', 'ex', 'ey', 'lv']
import lmfit
p = lmfit.Parameters()
for i, name in enumerate(self.fieldnames):
if name == 'ex' or name == 'ey':
p.add(name, value=initel[i], min=-0.7, max=0.7)
else:
p.add(name, value=initel[i])
self.p = p
self.absstep = absstep
self.fracstep = fracstep
def _getOrbit(self, p):
if isinstance(p, dict):
param = [p[x].value for x in self.fieldnames]
else:
param = p
paramkepler = eq2kep(param)
return Orbit.fromKozaiMeanKeplerianElements(*paramkepler,
t=self.probfn.epoch)
def _resid(self, p):
chi = np.concatenate(self.probfn.chi(self._getOrbit(p)))
return chi
def _jac(self, p):
dx = np.max([self.fracstep*np.abs(p), self.absstep], axis=0)
f0 = self._resid(p)
p0 = np.array(p)
ind = np.arange(len(p), dtype='i4')
fi = [self._resid(p0+(ind == i)*di)
for (i, di) in zip(np.arange(len(p)), dx)]
res = np.array([(x - f0)/hi for (x, hi) in zip(fi, dx)]).T
return res
def optimize(self, **fit_kws):
"""Run the optimizer and return the resulting fit parameters.
Returns
-------
fit : (6,) array_like
Least-squares fit as [a, e, i, pa, raan, trueAnomaly]
"""
import lmfit
self.result = lmfit.minimize(self._resid, self.p,
Dfun=self._jac, **fit_kws)
return np.array([self.result.params[x] for x in self.fieldnames])
class EquinoctialLMOptimizer:
def __init__(self, probfn, initel,
fracstep=[1e-7, 1e-9, 1e-9, 1e-6, 1e-6, 1e-9],
absstep=[1e-7, 1e-5, 1e-5, 1e-4, 1e-4, 1e-4],
orbitattr=None):
"""Optimizer that employs Levenberg-Marquardt least-squares fitting
and fits in Kozai mean Keplerian element space.
Parameters
----------
probfn : RVProbability
The RVProbability object that has both an epoch attribute to use for the
orbit fitting model, and a chi method to use for the fit evaluation.
initel : array_like (6,)
Initial Kozai mean Keplerian elements.
Attributes
----------
result : lmfit.MinimizerResult
Most recently run result object. Could be useful for inspecting error
estimates or success/failure conditions.
Methods
-------
optimize()
Return the optimized parameters list [r, v]
"""
import warnings
warnings.warn("This class is deprecated; use "
"ssapy.rvsampler.LeastSquaresOptimizer")
self.probfn = probfn
self.initel = initel
self.fieldnames = ['aem7', 'hx', 'hy', 'ex', 'ey', 'lv']
import lmfit
p = lmfit.Parameters()
for i, name in enumerate(self.fieldnames):
if name == 'ex' or name == 'ey':
p.add(name, value=initel[i], min=-0.99, max=0.99)
elif name == 'aem7':
p.add(name, value=initel[i]/1e7, min=6.3e6/1e7) # ~radius of earth
else:
p.add(name, value=initel[i])
self.p = p
self.absstep = absstep
self.fracstep = fracstep
def _getOrbit(self, p):
if isinstance(p, dict):
param = np.array([p[x].value for x in self.fieldnames]).copy()
else:
param = p.copy()
e2 = np.hypot(param[3], param[4])
if e2 >= 0.999:
norm = 0.999/e2
param[3] *= norm
param[4] *= norm
param[0] *= 1e7
return Orbit.fromEquinoctialElements(*param,
t=self.probfn.epoch)
def _resid(self, p):
chi = np.concatenate(self.probfn.chi(self._getOrbit(p)))
return chi
def _jac(self, p):
dx = np.max([self.fracstep*np.abs(p), self.absstep], axis=0)
f0 = self._resid(p)
p0 = np.array(p)
ind = np.arange(len(p), dtype='i4')
fi = [self._resid(p0+(ind == i)*di)
for (i, di) in zip(np.arange(len(p)), dx)]
res = np.array([(x - f0)/hi for (x, hi) in zip(fi, dx)]).T
return res
def optimize(self, **fit_kws):
"""Run the optimizer and return the resulting fit parameters.
Returns
-------
fit : (6,) array_like
Least-squares fit as [a, e, i, pa, raan, trueAnomaly]
"""
import lmfit
self.result = lmfit.minimize(self._resid, self.p,
Dfun=self._jac, **fit_kws)
jac = self._jac(
[x for x in self.result.params.valuesdict().values()])
u, s, vh = np.linalg.svd(jac)
covar = vh.T.dot(np.diag(s**(-2))).dot(vh)
self.result.covar = covar
param = np.array([self.result.params[x] for x in self.fieldnames])
param[0] *= 1e7
self.result.covar[:, 0] *= 1e7
self.result.covar[0, :] *= 1e7
return param
[docs]
def damper(chi, damp):
"""Pseudo-Huber loss function."""
return 2*damp*np.sign(chi)*(np.sqrt(1+np.abs(chi)/damp)-1)
# return chi/np.sqrt(1+np.abs(chi)/damp)
[docs]
def damper_deriv(chi, damp, derivnum=1):
"""Derivative of the pseudo-Huber loss function."""
if derivnum == 1:
return (1+np.abs(chi)/damp)**(-0.5)
if derivnum == 2:
return -0.5*np.sign(chi)/damp*(1+np.abs(chi)/damp)**(-1.5)