Source code for stingray.modeling.parameterestimation

__all__ = ["OptimizationResults", "ParameterEstimation", "PSDParEst", "SamplingResults"]


import logging
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator


# check whether emcee is installed for sampling
try:
    import emcee

    can_sample = True
except ImportError:
    can_sample = False

try:
    import corner

    use_corner = True
except ImportError:
    use_corner = False

from multiprocessing import Pool

import numpy as np
import scipy
import scipy.optimize
import scipy.stats
import scipy.signal
import copy

try:
    from statsmodels.tools.numdiff import approx_hess

    comp_hessian = True
except ImportError:
    comp_hessian = False

from stingray.modeling.posterior import (
    Posterior,
    PSDPosterior,
    LogLikelihood,
    PSDLogLikelihood,
    logmin,
    fitter_to_model_params,
)
from stingray.loggingconfig import CustomFormatter, setup_logger

logger = setup_logger()


[docs] class OptimizationResults(object): """ Helper class that will contain the results of the regression. Less fiddly than a dictionary. Parameters ---------- lpost: instance of :class:`Posterior` or one of its subclasses The object containing the function that is being optimized in the regression res: instance of ``scipy.OptimizeResult`` The object containing the results from a optimization run neg : bool, optional, default ``True`` A flag that sets whether the log-likelihood or negative log-likelihood is being used log : a logging.getLogger() object, default None You can pass a pre-defined object for logging, else a new logger will be instantiated Attributes ---------- result : float The result of the optimization, i.e. the function value at the minimum that the optimizer found p_opt : iterable The list of parameters at the minimum found by the optimizer model : ``astropy.models.Model`` instance The parametric model fit to the data cov : numpy.ndarray The covariance matrix for the parameters, has shape ``(len(p_opt), len(p_opt))`` err : numpy.ndarray The standard deviation of the parameters, derived from the diagonal of ``cov``. Has the same shape as ``p_opt`` mfit : numpy.ndarray The values of the model for all ``x`` deviance : float The deviance, calculated as ``-2*log(likelihood)`` aic : float The Akaike Information Criterion, derived from the log(likelihood) and often used in model comparison between non-nested models; For more details, see [#]_ bic : float The Bayesian Information Criterion, derived from the log(likelihood) and often used in model comparison between non-nested models; For more details, see [#]_ merit : float sum of squared differences between data and model, normalized by the model values dof : int The number of degrees of freedom in the problem, defined as the number of data points - the number of parameters sexp : int ``2*(number of parameters)*(number of data points)`` ssd : float ``sqrt(2*(sexp))``, expected sum of data-model residuals sobs : float sum of data-model residuals References ---------- .. [#] https://doi.org/10.1109/TAC.1974.1100705 .. [#] https://projecteuclid.org/euclid.aos/1176344136 """ def __init__(self, lpost, res, neg=True, log=None): self.neg = neg self.result = res.fun self.p_opt = np.atleast_1d(res.x) self.model = lpost.model if log is None: self.log = logging.getLogger("Fitting summary") self.log.setLevel(logging.DEBUG) if not self.log.handlers: ch = logging.StreamHandler() formatter = CustomFormatter() ch.setFormatter(formatter) ch.setLevel(logging.DEBUG) self.log.addHandler(ch) self._compute_covariance(lpost, res) self._compute_model(lpost) self._compute_criteria(lpost) self._compute_statistics(lpost)
[docs] def _compute_covariance(self, lpost, res): """ Compute the covariance of the parameters using inverse of the Hessian, i.e. the second-order derivative of the log-likelihood. Also calculates an estimate of the standard deviation in the parameters, using the square root of the diagonal of the covariance matrix. The Hessian is either estimated directly by the chosen method of fitting, or approximated using the ``statsmodel`` ``approx_hess`` function. Parameters ---------- lpost: instance of :class:`Posterior` or one of its subclasses The object containing the function that is being optimized in the regression res: instance of ``scipy``'s ``OptimizeResult`` class The object containing the results from a optimization run """ if hasattr(res, "hess_inv"): if not isinstance(res.hess_inv, np.ndarray): self.cov = np.asarray(res.hess_inv.todense()) else: self.cov = res.hess_inv self.err = np.sqrt(np.diag(self.cov)) else: if comp_hessian: # calculate Hessian approximating with finite differences self.log.info("Approximating Hessian with finite differences ...") phess = approx_hess(np.atleast_1d(self.p_opt), lpost) self.cov = np.linalg.inv(phess) self.err = np.sqrt(np.diag(np.abs(self.cov))) else: self.cov = None self.err = None
[docs] def _compute_model(self, lpost): """ Compute the values of the best-fit model for all ``x``. Parameters ---------- lpost: instance of :class:`Posterior` or one of its subclasses The object containing the function that is being optimized in the regression """ fitter_to_model_params(lpost.model, self.p_opt) self.mfit = lpost.model(lpost.x)
[docs] def _compute_criteria(self, lpost): """ Compute various information criteria useful for model comparison in non-nested models. Currently implemented are the Akaike Information Criterion [#]_ and the Bayesian Information Criterion [#]_. Parameters ---------- lpost: instance of :class:`Posterior` or one of its subclasses The object containing the function that is being optimized in the regression References ---------- .. [#] https://doi.org/10.1109/TAC.1974.1100705 .. [#] https://projecteuclid.org/euclid.aos/1176344136 """ if isinstance(lpost, Posterior): self.deviance = -2.0 * lpost.loglikelihood(self.p_opt, neg=False) elif isinstance(lpost, LogLikelihood): self.deviance = 2.0 * self.result # Akaike Information Criterion self.aic = self.result + 2.0 * self.p_opt.shape[0] # Bayesian Information Criterion self.bic = self.result + self.p_opt.shape[0] * np.log(lpost.x.shape[0])
# Deviance Information Criterion # TODO: Add Deviance Information Criterion
[docs] def _compute_statistics(self, lpost): """ Compute some useful fit statistics, like the degrees of freedom and the figure of merit. Parameters ---------- lpost: instance of :class:`Posterior` or one of its subclasses The object containing the function that is being optimized in the regression """ try: self.mfit except AttributeError: self._compute_model(lpost) self.merit = np.sum(((lpost.y - self.mfit) / self.mfit) ** 2.0) self.dof = lpost.y.shape[0] - float(self.p_opt.shape[0]) self.sexp = 2.0 * len(lpost.x) * len(self.p_opt) self.ssd = np.sqrt(2.0 * self.sexp) self.sobs = np.sum(lpost.y - self.mfit)
[docs] def print_summary(self, lpost): """ Print a useful summary of the fitting procedure to screen or a log file. Parameters ---------- lpost : instance of :class:`Posterior` or one of its subclasses The object containing the function that is being optimized in the regression """ self.log.info("The best-fit model parameters plus errors are:") fixed = [lpost.model.fixed[n] for n in lpost.model.param_names] tied = [lpost.model.tied[n] for n in lpost.model.param_names] bounds = [lpost.model.bounds[n] for n in lpost.model.param_names] parnames = [n for n, f in zip(lpost.model.param_names, np.logical_or(fixed, tied)) if not f] all_parnames = [n for n in lpost.model.param_names] for i, par in enumerate(all_parnames): self.log.info("{:3}) Parameter {:<20}: ".format(i, par)) if par in parnames: idx = parnames.index(par) err_info = " (no error estimate)" if self.err is not None: err_info = " +/- {:<20.5f}".format(self.err[idx]) self.log.info("{:<20.5f}{} ".format(self.p_opt[idx], err_info)) self.log.info("[{:>10} {:>10}]".format(str(bounds[i][0]), str(bounds[i][1]))) elif fixed[i]: self.log.info("{:<20.5f} (Fixed) ".format(lpost.model.parameters[i])) elif tied[i]: self.log.info("{:<20.5f} (Tied) ".format(lpost.model.parameters[i])) self.log.info("\n") self.log.info("Fitting statistics: ") self.log.info(" -- number of data points: %i" % (len(lpost.x))) try: self.deviance except AttributeError: self._compute_criteria(lpost) self.log.info(" -- Deviance [-2 log L] D = %f.3" % self.deviance) self.log.info( " -- The Akaike Information Criterion of the model is: " + str(self.aic) + "." ) self.log.info( " -- The Bayesian Information Criterion of the model is: " + str(self.bic) + "." ) try: self.merit except AttributeError: self._compute_statistics(lpost) self.log.info( " -- The figure-of-merit function for this model " + " is: %f.5f" % self.merit + " and the fit for %i dof is %f.3f" % (self.dof, self.merit / self.dof) ) self.log.info(" -- Summed Residuals S = %f.5f" % self.sobs) self.log.info(" -- Expected S ~ %f.5 +/- %f.5" % (self.sexp, self.ssd)) return
[docs] class ParameterEstimation(object): """ Parameter estimation of two-dimensional data, either via optimization or MCMC. Note: optimization with bounds is not supported. If something like this is required, define (uniform) priors in the ParametricModel instances to be used below. Parameters ---------- fitmethod : string, optional, default ``L-BFGS-B`` Any of the strings allowed in ``scipy.optimize.minimize`` in the method keyword. Sets the fit method to be used. max_post : bool, optional, default ``True`` If ``True``, then compute the Maximum-A-Posteriori estimate. If ``False``, compute a Maximum Likelihood estimate. """ def __init__(self, fitmethod="BFGS", max_post=True): self.fitmethod = fitmethod self.max_post = max_post
[docs] def fit(self, lpost, t0, neg=True, scipy_optimize_options=None): """ Do either a Maximum-A-Posteriori (MAP) or Maximum Likelihood (ML) fit to the data. MAP fits include priors, ML fits do not. Parameters ---------- lpost : :class:`Posterior` (or subclass) instance and instance of class :class:`Posterior` or one of its subclasses that defines the function to be minimized (either in ``loglikelihood`` or ``logposterior``) t0 : {``list`` | ``numpy.ndarray``} List/array with set of initial parameters neg : bool, optional, default ``True`` Boolean to be passed to ``lpost``, setting whether to use the *negative* posterior or the *negative* log-likelihood. Useful for optimization routines, which are generally defined as *minimization* routines. scipy_optimize_options : dict, optional, default ``None`` A dictionary with options for ``scipy.optimize.minimize``, directly passed on as keyword arguments. Returns ------- res : :class:`OptimizationResults` object An object containing useful summaries of the fitting procedure. For details, see documentation of class:`OptimizationResults`. """ if not isinstance(lpost, Posterior) and not isinstance(lpost, LogLikelihood): raise TypeError("lpost must be a subclass of " "Posterior or LogLikelihoood.") newmod = lpost.model.copy() p0 = t0 # p0 will be shorter than t0, if there are any frozen/tied parameters # this has to match with the npar attribute. if not len(p0) == lpost.npar: raise ValueError("Parameter set t0 must be of right " "length for model in lpost.") args = (neg,) if not scipy_optimize_options: scipy_optimize_options = {} # different commands for different fitting methods, # at least until scipy 0.11 is out funcval = 100.0 i = 0 while funcval == 100 or funcval == 200 or funcval == 0.0 or not np.isfinite(funcval): if i > 20: raise RuntimeError("Fitting unsuccessful!") # perturb parameters slightly t0_p = np.random.multivariate_normal(p0, np.diag(np.abs(p0) / 100.0)) params = [getattr(newmod, name) for name in newmod.param_names] bounds = np.array([p.bounds for p in params if not np.any([p.tied, p.fixed])]) if any(elem is not None for elem in np.hstack(bounds)) and self.fitmethod not in [ "L-BFGS-B", "TNC", "SLSQP", ]: logger.warning( "Fitting method %s " % self.fitmethod + "cannot incorporate the bounds you set!" ) if any(elem is not None for elem in np.hstack(bounds)) or self.fitmethod not in [ "L-BFGS-B", "TNC", "SLSQP", ]: use_bounds = False else: use_bounds = True # if max_post is True, do the Maximum-A-Posteriori Fit if self.max_post: if use_bounds: opt = scipy.optimize.minimize( lpost, t0_p, method=self.fitmethod, args=args, tol=1.0e-10, bounds=bounds, **scipy_optimize_options ) else: opt = scipy.optimize.minimize( lpost, t0_p, method=self.fitmethod, args=args, tol=1.0e-10, **scipy_optimize_options ) # if max_post is False, then do a Maximum Likelihood Fit else: if isinstance(lpost, Posterior): if use_bounds: # This could be a `Posterior` object opt = scipy.optimize.minimize( lpost.loglikelihood, t0_p, method=self.fitmethod, args=args, tol=1.0e-10, bounds=bounds, **scipy_optimize_options ) else: opt = scipy.optimize.minimize( lpost.loglikelihood, t0_p, method=self.fitmethod, args=args, tol=1.0e-10, **scipy_optimize_options ) elif isinstance(lpost, LogLikelihood): if use_bounds: # Except this could be a `LogLikelihood object # In which case, use the evaluate function # if it's not either, give up and break! opt = scipy.optimize.minimize( lpost.evaluate, t0_p, method=self.fitmethod, args=args, tol=1.0e-10, # bounds=bounds, **scipy_optimize_options ) else: opt = scipy.optimize.minimize( lpost.evaluate, t0_p, method=self.fitmethod, args=args, tol=1.0e-10, **scipy_optimize_options ) funcval = opt.fun if np.isclose(opt.fun, logmin) or np.isclose(opt.fun, 2 * logmin): funcval = 100 i += 1 res = OptimizationResults(lpost, opt, neg=neg) return res
[docs] def compute_lrt(self, lpost1, t1, lpost2, t2, neg=True, max_post=False): """ This function computes the Likelihood Ratio Test between two nested models. Parameters ---------- lpost1 : object of a subclass of :class:`Posterior` The :class:`Posterior` object for model 1 t1 : iterable The starting parameters for model 1 lpost2 : object of a subclass of :class:`Posterior` The :class:`Posterior` object for model 2 t2 : iterable The starting parameters for model 2 neg : bool, optional, default ``True`` Boolean flag to decide whether to use the negative log-likelihood or log-posterior max_post: bool, optional, default ``False`` If ``True``, set the internal state to do the optimization with the log-likelihood rather than the log-posterior. Returns ------- lrt : float The likelihood ratio for model 2 and model 1 res1 : OptimizationResults object Contains the result of fitting ``lpost1`` res2 : OptimizationResults object Contains the results of fitting ``lpost2`` """ self.max_post = max_post # fit data with both models res1 = self.fit(lpost1, t1, neg=neg) res2 = self.fit(lpost2, t2, neg=neg) # compute log likelihood ratio as difference between the deviances lrt = res1.deviance - res2.deviance return lrt, res1, res2
[docs] def sample( self, lpost, t0, cov=None, nwalkers=500, niter=100, burnin=100, threads=1, print_results=True, plot=False, namestr="test", pool=False, ): """ Sample the :class:`Posterior` distribution defined in ``lpost`` using MCMC. Here we use the ``emcee`` package, but other implementations could in principle be used. Parameters ---------- lpost : instance of a :class:`Posterior` subclass and instance of class :class:`Posterior` or one of its subclasses that defines the function to be minimized (either in ``loglikelihood`` or ``logposterior``) t0 : iterable list or array containing the starting parameters. Its length must match ``lpost.model.npar``. nwalkers : int, optional, default 500 The number of walkers (chains) to use during the MCMC procedure. The more walkers are used, the slower the estimation will be, but the better the final distribution is likely to be. niter : int, optional, default 100 The number of iterations to run the MCMC chains for. The larger this number, the longer the estimation will take, but the higher the chance that the walkers have actually converged on the true posterior distribution. burnin : int, optional, default 100 The number of iterations to run the walkers before convergence is assumed to have occurred. This part of the chain will be discarded before sampling from what is then assumed to be the posterior distribution desired. threads : **DEPRECATED** int, optional, default 1 The number of threads for parallelization. Default is ``1``, i.e. no parallelization With the change to the new emcee version 3, threads is deprecated. Use the `pool` keyword argument instead. This will no longer have any effect. print_results : bool, optional, default ``True`` Boolean flag setting whether the results of the MCMC run should be printed to standard output. Default: True plot : bool, optional, default ``False`` Boolean flag setting whether summary plots of the MCMC chains should be produced. Default: False namestr : str, optional, default ``test`` Optional string for output file names for the plotting. pool : bool, default False If True, use pooling to parallelize the operation. Returns ------- res : class:`SamplingResults` object An object of class :class:`SamplingResults` summarizing the results of the MCMC run. """ if threads > 1: raise DeprecationWarning("Keyword 'threads' is deprecated. Please use 'pool' instead.") if not can_sample: raise ImportError("emcee not installed! Can't sample!") ndim = len(t0) if cov is None: # do a MAP fitting step to find good starting positions for # the sampler res = self.fit(lpost, t0, neg=True) cov = res.cov # sample random starting positions for each walker from # a multivariate Gaussian p0 = np.array([np.random.multivariate_normal(t0, cov) for i in range(nwalkers)]) if pool: with Pool() as pooling: # initialize the sampler sampler = emcee.EnsembleSampler(nwalkers, ndim, lpost, args=[False], pool=pooling) # run the burn-in pos, prob, state = sampler.run_mcmc(p0, burnin) sampler.reset() state = emcee.State(pos, prob, random_state=state) # do the actual MCMC run _ = sampler.run_mcmc(initial_state=state, nsteps=niter) else: # initialize the sampler sampler = emcee.EnsembleSampler(nwalkers, ndim, lpost, args=[False]) # run the burn-in pos, prob, state = sampler.run_mcmc(p0, burnin) sampler.reset() state = emcee.State(pos, prob, random_state=state) # do the actual MCMC run _ = sampler.run_mcmc(initial_state=state, nsteps=niter) res = SamplingResults(sampler) if print_results: res.print_results() if plot: fig = res.plot_results(fig=None, save_plot=True, filename=namestr + "_corner.pdf") return res
def _generate_model(self, lpost, pars): """ Helper function that generates a fake PSD similar to the one in the data, but with different parameters. Parameters ---------- lpost : instance of a :class:`Posterior` or :class:`LogLikelihood` subclass The object containing the relevant information about the data and the model pars : iterable A list of parameters to be passed to ``lpost.model`` in order to generate a model data set. Returns ------- model_data : numpy.ndarray An array of model values for each bin in ``lpost.x`` """ assert isinstance(lpost, LogLikelihood) or isinstance(lpost, Posterior), ( "lpost must be of type LogLikelihood or Posterior or one of its " "subclasses!" ) # assert pars is of correct length assert len(pars) == lpost.npar, "pars must be a list " "of %i parameters" % lpost.npar # get the model m = lpost.model # reset the parameters fitter_to_model_params(m, pars) # make a model spectrum model_data = lpost.model(lpost.x) return model_data @staticmethod def _compute_pvalue(obs_val, sim): """ Compute the p-value given an observed value of a test statistic and some simulations of that same test statistic. Parameters ---------- obs_value : float The observed value of the test statistic in question sim: iterable A list or array of simulated values for the test statistic Returns ------- pval : float in range [0, 1] The p-value for the test statistic given the simulations. """ # cast the simulations as a numpy array sim = np.array(sim) # find all simulations that are larger than # the observed value ntail = sim[sim > obs_val].shape[0] # divide by the total number of simulations pval = float(ntail) / float(sim.shape[0]) return pval
[docs] def simulate_lrts(self, s_all, lpost1, t1, lpost2, t2, max_post=True, seed=None): """ Simulate likelihood ratios. For details, see definitions in the subclasses that implement this task. """ raise NotImplementedError( "The behaviour of `simulate_lrts` should be defined " "in the subclass appropriate for your problem, not in " "this super class!" )
[docs] def calibrate_lrt( self, lpost1, t1, lpost2, t2, sample=None, neg=True, max_post=False, nsim=1000, niter=200, nwalkers=500, burnin=200, namestr="test", seed=None, ): """Calibrate the outcome of a Likelihood Ratio Test via MCMC. In order to compare models via likelihood ratio test, one generally aims to compute a p-value for the null hypothesis (generally the simpler model). There are two special cases where the theoretical distribution used to compute that p-value analytically given the observed likelihood ratio (a chi-square distribution) is not applicable: * the models are not nested (i.e. Model 1 is not a special, simpler case of Model 2), * the parameter values fixed in Model 2 to retrieve Model 1 are at the edges of parameter space (e.g. if one must set, say, an amplitude to zero in order to remove a component in the more complex model, and negative amplitudes are excluded a priori) In these cases, the observed likelihood ratio must be calibrated via simulations of the simpler model (Model 1), using MCMC to take into account the uncertainty in the parameters. This function does exactly that: it computes the likelihood ratio for the observed data, and produces simulations to calibrate the likelihood ratio and compute a p-value for observing the data under the assumption that Model 1 istrue. If ``max_post=True``, the code will use MCMC to sample the posterior of the parameters and simulate fake data from there. If ``max_post=False``, the code will use the covariance matrix derived from the fit to simulate data sets for comparison. Parameters ---------- lpost1 : object of a subclass of :class:`Posterior` The :class:`Posterior` object for model 1 t1 : iterable The starting parameters for model 1 lpost2 : object of a subclass of :class:`Posterior` The :class:`Posterior` object for model 2 t2 : iterable The starting parameters for model 2 neg : bool, optional, default ``True`` Boolean flag to decide whether to use the negative log-likelihood or log-posterior max_post: bool, optional, default ``False`` If ``True``, set the internal state to do the optimization with the log-likelihood rather than the log-posterior. Returns ------- pvalue : float [0,1] p-value 'n stuff """ # compute the observed likelihood ratio lrt_obs, res1, res2 = self.compute_lrt(lpost1, t1, lpost2, t2, neg=neg, max_post=max_post) rng = np.random.RandomState(seed) if sample is None: # simulate parameter sets from the simpler model if not max_post: # using Maximum Likelihood, so I'm going to simulate parameters # from a multivariate Gaussian # set up the distribution mvn = scipy.stats.multivariate_normal(mean=res1.p_opt, cov=res1.cov, seed=seed) # sample parameters s_all = mvn.rvs(size=nsim) if lpost1.npar == 1: s_all = np.atleast_2d(s_all).T else: # sample the :class:`Posterior` using MCMC s_mcmc = self.sample( lpost1, res1.p_opt, cov=res1.cov, nwalkers=nwalkers, niter=niter, burnin=burnin, namestr=namestr, ) # pick nsim samples out of the :class:`Posterior` sample s_all = s_mcmc.samples[rng.choice(s_mcmc.samples.shape[0], nsim, replace=False)] # if lpost1.npar == 1: # s_all = np.atleast_2d(s_all).T else: s_all = sample[rng.choice(sample.shape[0], nsim, replace=False)] # simulate LRTs # this method is defined in the subclasses! lrt_sim = self.simulate_lrts(s_all, lpost1, t1, lpost2, t2, seed=seed) # now I can compute the p-value: pval = ParameterEstimation._compute_pvalue(lrt_obs, lrt_sim) return pval
[docs] class SamplingResults(object): """ Helper class that will contain the results of the sampling in a handy format. Less fiddly than a dictionary. Parameters ---------- sampler: ``emcee.EnsembleSampler`` object The object containing the sampler that's done all the work. ci_min: float out of [0,100] The lower bound percentile for printing credible intervals on the parameters ci_max: float out of [0,100] The upper bound percentile for printing credible intervals on the parameters log : a logging.getLogger() object, default None You can pass a pre-defined object for logging, else a new logger will be instantiated Attributes ---------- samples : numpy.ndarray An array of samples from the MCMC run, including all chains flattened into one long (``nwalkers*niter``, ``ndim``) array nwalkers : int The number of chains used in the MCMC procedure niter : int The number of MCMC iterations in each chain ndim : int The dimensionality of the problem, i.e. the number of parameters in the model acceptance : float The mean acceptance ratio, calculated over all chains L : float The product of acceptance ratio and number of samples acor : float The autocorrelation length for the chains; should be shorter than the chains themselves for independent sampling rhat : float weighted average of between-sequence variance and within-sequence variance; Gelman-Rubin convergence statistic [#]_ mean : numpy.ndarray An array of size ``ndim``, with the posterior means of the parameters derived from the MCMC chains std : numpy.ndarray An array of size ``ndim`` with the posterior standard deviations of the parameters derived from the MCMC chains ci : numpy.ndarray An array of shape ``(ndim, 2)`` containing the lower and upper bounds of the credible interval (the Bayesian equivalent of the confidence interval) for each parameter using the bounds set by ``ci_min`` and ``ci_max`` References ---------- .. [#] https://projecteuclid.org/euclid.ss/1177011136 """ def __init__(self, sampler, ci_min=5, ci_max=95, log=None): if log is None: self.log = logging.getLogger("MCMC summary") self.log.setLevel(logging.DEBUG) if not self.log.handlers: ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) self.log.addHandler(ch) # store all the samples self.samples = sampler.get_chain(flat=True) chain_dims = sampler.get_chain().shape self.nwalkers = float(chain_dims[0]) self.niter = float(chain_dims[1]) # store number of dimensions self.ndim = chain_dims[2] # compute and store acceptance fraction self.acceptance = np.nanmean(sampler.acceptance_fraction) self.L = self.acceptance * self.samples.shape[0] self._check_convergence(sampler) self._infer(ci_min, ci_max)
[docs] def _check_convergence(self, sampler): """ Compute common statistics for convergence of the MCMC chains. While you can never be completely sure that your chains converged, these present reasonable heuristics to give an indication whether convergence is very far off or reasonably close. Currently implemented are the autocorrelation time [#]_ and the Gelman-Rubin convergence criterion [#]_. Parameters ---------- sampler : an ``emcee.EnsembleSampler`` object References ---------- .. [#] https://arxiv.org/abs/1202.3665 .. [#] https://projecteuclid.org/euclid.ss/1177011136 """ # compute and store autocorrelation time try: self.acor = sampler.get_autocorr_time() except emcee.autocorr.AutocorrError: self.log.info("Chains too short to compute autocorrelation lengths.") self.rhat = self._compute_rhat(sampler)
[docs] def _compute_rhat(self, sampler): """ Compute Gelman-Rubin convergence criterion [#]_. Parameters ---------- sampler : an `emcee.EnsembleSampler` object References ---------- .. [#] https://projecteuclid.org/euclid.ss/1177011136 """ chain = sampler.get_chain() # between-sequence variance mean_samples_iter = np.nanmean(chain, axis=1) # mean over the means over iterations: (self.ndim) mean_samples = np.nanmean(chain, axis=(0, 1)) # now compute between-sequence variance bb = (self.niter / (self.nwalkers - 1)) * np.sum( (mean_samples_iter - mean_samples) ** 2.0, axis=0 ) # compute variance of each chain var_samples = np.nanvar(chain, axis=1) # compute mean of variance ww = np.nanmean(var_samples, axis=0) # compute weighted average of ww and bb: rhat = ((self.niter - 1) / self.niter) * ww + (1 / self.niter) * bb return rhat
[docs] def _infer(self, ci_min=5, ci_max=95): """ Infer the :class:`Posterior` means, standard deviations and credible intervals (i.e. the Bayesian equivalent to confidence intervals) from the :class:`Posterior` samples for each parameter. Parameters ---------- ci_min : float Lower bound to the credible interval, given as percentage between 0 and 100 ci_max : float Upper bound to the credible interval, given as percentage between 0 and 100 """ self.mean = np.mean(self.samples, axis=0) self.std = np.std(self.samples, axis=0) self.ci = np.percentile(self.samples, [ci_min, ci_max], axis=0)
[docs] def print_results(self): """ Print results of the MCMC run on screen or to a log-file. """ self.log.info("-- The acceptance fraction is: %f.5" % self.acceptance) try: self.log.info("-- The autocorrelation time is: {}".format(self.acor)) except AttributeError: pass self.log.info("R_hat for the parameters is: " + str(self.rhat)) self.log.info("-- Posterior Summary of Parameters: \n") self.log.info("parameter \t mean \t\t sd \t\t 5% \t\t 95% \n") self.log.info("---------------------------------------------\n") for i in range(self.ndim): self.log.info( "theta[" + str(i) + "] \t " + str(self.mean[i]) + "\t" + str(self.std[i]) + "\t" + str(self.ci[0, i]) + "\t" + str(self.ci[1, i]) + "\n" ) return
[docs] def plot_results(self, nsamples=1000, fig=None, save_plot=False, filename="test.pdf"): """ Plot some results in a triangle plot. If installed, will use [corner]_ for the plotting, if not, uses its own code to make a triangle plot. By default, this method returns a ``matplotlib.Figure`` object, but if ``save_plot=True`` the plot can be saved to file automatically, Parameters ---------- nsamples : int, default 1000 The maximum number of samples used for plotting. fig : matplotlib.Figure instance, default None If created externally, you can pass a Figure instance to this method. If none is passed, the method will create one internally. save_plot : bool, default ``False`` If ``True`` save the plot to file with a file name specified by the keyword ``filename``. If ``False`` just return the ``Figure`` object filename : str Name of the output file with the figure References ---------- .. [corner] https://github.com/dfm/corner.py """ if use_corner: fig = corner.corner( self.samples, labels=None, fig=fig, bins=int(20), quantiles=[0.16, 0.5, 0.84], show_titles=True, title_args={"fontsize": 12}, ) else: if fig is None: fig = plt.figure(figsize=(15, 15)) plt.subplots_adjust( top=0.925, bottom=0.025, left=0.025, right=0.975, wspace=0.2, hspace=0.2 ) ind_all = np.random.choice(np.arange(self.samples.shape[0]), size=nsamples) samples = self.samples[ind_all] for i in range(self.ndim): for j in range(self.ndim): xmin, xmax = samples[:, j].min(), samples[:, j].max() ymin, ymax = samples[:, i].min(), samples[:, i].max() ax = fig.add_subplot(self.ndim, self.ndim, i * self.ndim + j + 1) ax.xaxis.set_major_locator(MaxNLocator(5)) ax.ticklabel_format(style="sci", scilimits=(-2, 2)) if i == j: ntemp, binstemp, patchestemp = ax.hist( samples[:, i], 30, density=True, histtype="stepfilled" ) ax.axis([ymin, ymax, 0, np.max(ntemp) * 1.2]) else: ax.axis([xmin, xmax, ymin, ymax]) # make a scatter plot first ax.scatter(samples[:, j], samples[:, i], s=7) # then add contours xmin, xmax = samples[:, j].min(), samples[:, j].max() ymin, ymax = samples[:, i].min(), samples[:, i].max() # Perform Kernel density estimate on data try: xx, yy = np.mgrid[xmin:xmax:100j, ymin:ymax:100j] positions = np.vstack([xx.ravel(), yy.ravel()]) values = np.vstack([samples[:, j], samples[:, i]]) kernel = scipy.stats.gaussian_kde(values) zz = np.reshape(kernel(positions).T, xx.shape) ax.contour(xx, yy, zz, 7) except ValueError: logger.info("Not making contours.") if save_plot: plt.savefig(filename, format="pdf") return fig
[docs] class PSDParEst(ParameterEstimation): """ Parameter estimation for parametric modelling of power spectra. This class contains functionality that allows parameter estimation and related tasks that involve fitting a parametric model to an (averaged) power spectrum. Parameters ---------- ps : class:`stingray.Powerspectrum` or class:`stingray.AveragedPowerspectrum` object The power spectrum to be modelled fitmethod : str, optional, default ``BFGS`` A string allowed by ``scipy.optimize.minimize`` as a valid fitting method max_post : bool, optional, default ``True`` If ``True``, do a Maximum-A-Posteriori (MAP) fit, i.e. fit with priors, otherwise do a Maximum Likelihood fit instead """ def __init__(self, ps, fitmethod="BFGS", max_post=True): self.ps = ps ParameterEstimation.__init__(self, fitmethod=fitmethod, max_post=max_post)
[docs] def fit(self, lpost, t0, neg=True, scipy_optimize_options=None): """ Do either a Maximum-A-Posteriori (MAP) or Maximum Likelihood (ML) fit to the power spectrum. MAP fits include priors, ML fits do not. Parameters ---------- lpost : :class:`stingray.modeling.PSDPosterior` object An instance of class :class:`stingray.modeling.PSDPosterior` that defines the function to be minimized (either in ``loglikelihood`` or ``logposterior``) t0 : {list | numpy.ndarray} List/array with set of initial parameters neg : bool, optional, default ``True`` Boolean to be passed to ``lpost``, setting whether to use the *negative* posterior or the *negative* log-likelihood. scipy_optimize_options : dict, optional, default None A dictionary with options for ``scipy.optimize.minimize``, directly passed on as keyword arguments. Returns ------- res : :class:`OptimizationResults` object An object containing useful summaries of the fitting procedure. For details, see documentation of :class:`OptimizationResults`. """ self.lpost = lpost res = ParameterEstimation.fit( self, self.lpost, t0, neg=neg, scipy_optimize_options=scipy_optimize_options ) res.maxpow, res.maxfreq, res.maxind = self._compute_highest_outlier(self.lpost, res) return res
[docs] def sample( self, lpost, t0, cov=None, nwalkers=500, niter=100, burnin=100, threads=1, print_results=True, plot=False, namestr="test", ): """ Sample the posterior distribution defined in ``lpost`` using MCMC. Here we use the ``emcee`` package, but other implementations could in principle be used. Parameters ---------- lpost : instance of a :class:`Posterior` subclass and instance of class :class:`Posterior` or one of its subclasses that defines the function to be minimized (either in ``loglikelihood`` or ``logposterior``) t0 : iterable list or array containing the starting parameters. Its length must match ``lpost.model.npar``. nwalkers : int, optional, default 500 The number of walkers (chains) to use during the MCMC procedure. The more walkers are used, the slower the estimation will be, but the better the final distribution is likely to be. niter : int, optional, default 100 The number of iterations to run the MCMC chains for. The larger this number, the longer the estimation will take, but the higher the chance that the walkers have actually converged on the true posterior distribution. burnin : int, optional, default 100 The number of iterations to run the walkers before convergence is assumed to have occurred. This part of the chain will be discarded before sampling from what is then assumed to be the posterior distribution desired. threads : int, optional, default 1 The number of threads for parallelization. Default is ``1``, i.e. no parallelization print_results : bool, optional, default True Boolean flag setting whether the results of the MCMC run should be printed to standard output plot : bool, optional, default False Boolean flag setting whether summary plots of the MCMC chains should be produced namestr : str, optional, default ``test`` Optional string for output file names for the plotting. Returns ------- res : :class:`SamplingResults` object An object containing useful summaries of the sampling procedure. For details see documentation of :class:`SamplingResults`. """ self.lpost = lpost if cov is None: fit_res = ParameterEstimation.fit(self, self.lpost, t0, neg=True) cov = fit_res.cov t0 = fit_res.p_opt res = ParameterEstimation.sample( self, self.lpost, t0, cov=cov, nwalkers=nwalkers, niter=niter, burnin=burnin, threads=threads, print_results=print_results, plot=plot, namestr=namestr, ) return res
def _generate_data(self, lpost, pars, rng=None): """ Generate a fake power spectrum from a model. Parameters ---------- lpost : instance of a :class:`Posterior` or :class:`LogLikelihood` subclass The object containing the relevant information about the data and the model pars : iterable A list of parameters to be passed to ``lpost.model`` in order to generate a model data set. Returns ------- sim_ps : :class:`stingray.Powerspectrum` object The simulated :class:`Powerspectrum` object """ # create own random state object if rng is None: rng = np.random.RandomState(None) model_spectrum = self._generate_model(lpost, pars) # use chi-square distribution to get fake data model_powers = ( model_spectrum * rng.chisquare(2 * self.ps.m, size=model_spectrum.shape[0]) / (2.0 * self.ps.m) ) sim_ps = copy.copy(self.ps) sim_ps.power = model_powers return sim_ps
[docs] def simulate_lrts(self, s_all, lpost1, t1, lpost2, t2, seed=None): """ Simulate likelihood ratios for two given models based on MCMC samples for the simpler model (i.e. the null hypothesis). Parameters ---------- s_all : numpy.ndarray of shape ``(nsamples, lpost1.npar)`` An array with MCMC samples derived from the null hypothesis model in ``lpost1``. Its second dimension must match the number of free parameters in ``lpost1.model``. lpost1 : :class:`LogLikelihood` or :class:`Posterior` subclass object Object containing the null hypothesis model t1 : iterable of length ``lpost1.npar`` A starting guess for fitting the model in ``lpost1`` lpost2 : :class:`LogLikelihood` or :class:`Posterior` subclass object Object containing the alternative hypothesis model t2 : iterable of length ``lpost2.npar`` A starting guess for fitting the model in ``lpost2`` max_post : bool, optional, default ``True`` If ``True``, then ``lpost1`` and ``lpost2`` should be :class:`Posterior` subclass objects; if ``False``, then ``lpost1`` and ``lpost2`` should be :class:`LogLikelihood` subclass objects seed : int, optional default ``None`` A seed to initialize the ``numpy.random.RandomState`` object to be passed on to ``_generate_data``. Useful for producing exactly reproducible results Returns ------- lrt_sim : numpy.ndarray An array with the simulated likelihood ratios for the simulated data """ assert lpost1.__class__ == lpost2.__class__, ( "Both LogLikelihood or " "Posterior objects must be " "of the same class!" ) nsim = s_all.shape[0] lrt_sim = np.zeros(nsim) rng = np.random.RandomState(seed) # now I can loop over all simulated parameter sets to generate a PSD for i, s in enumerate(s_all): # generate fake PSD sim_ps = self._generate_data(lpost1, s, rng) neg = True # make LogLikelihood objects for both: if isinstance(lpost1, LogLikelihood): sim_lpost1 = PSDLogLikelihood( sim_ps.freq, sim_ps.power, model=lpost1.model, m=sim_ps.m ) sim_lpost2 = PSDLogLikelihood( sim_ps.freq, sim_ps.power, model=lpost2.model, m=sim_ps.m ) max_post = False else: # make a :class:`Posterior` object sim_lpost1 = PSDPosterior(sim_ps.freq, sim_ps.power, lpost1.model, m=sim_ps.m) sim_lpost1.logprior = lpost1.logprior sim_lpost2 = PSDPosterior(sim_ps.freq, sim_ps.power, lpost2.model, m=sim_ps.m) sim_lpost2.logprior = lpost2.logprior max_post = True parest_sim = PSDParEst(sim_ps, max_post=max_post, fitmethod=self.fitmethod) try: lrt_sim[i], _, _ = parest_sim.compute_lrt( sim_lpost1, t1, sim_lpost2, t2, neg=neg, max_post=max_post ) except RuntimeError: logger.warning("Fitting was unsuccessful. " "Skipping this simulation!") continue return lrt_sim
[docs] def calibrate_highest_outlier( self, lpost, t0, sample=None, max_post=False, nsim=1000, niter=200, nwalkers=500, burnin=200, namestr="test", seed=None, ): r""" Calibrate the highest outlier in a data set using MCMC-simulated power spectra. In short, the procedure does a MAP fit to the data, computes the statistic .. math:: \max{(T_R = 2(\mathrm{data}/\mathrm{model}))} and then does an MCMC run using the data and the model, or generates parameter samples from the likelihood distribution using the derived covariance in a Maximum Likelihood fit. From the (posterior) samples, it generates fake power spectra. Each fake spectrum is fit in the same way as the data, and the highest data/model outlier extracted as for the data. The observed value of :math:`T_R` can then be directly compared to the simulated distribution of :math:`T_R` values in order to derive a p-value of the null hypothesis that the observed :math:`T_R` is compatible with being generated by noise. Parameters ---------- lpost : :class:`stingray.modeling.PSDPosterior` object An instance of class :class:`stingray.modeling.PSDPosterior` that defines the function to be minimized (either in ``loglikelihood`` or ``logposterior``) t0 : {list | numpy.ndarray} List/array with set of initial parameters sample : :class:`SamplingResults` instance, optional, default ``None`` If a sampler has already been run, the :class:`SamplingResults` instance can be fed into this method here, otherwise this method will run a sampler automatically max_post: bool, optional, default ``False`` If ``True``, do MAP fits on the power spectrum to find the highest data/model outlier Otherwise, do a Maximum Likelihood fit. If ``True``, the simulated power spectra will be generated from an MCMC run, otherwise the method will employ the approximated covariance matrix for the parameters derived from the likelihood surface to generate samples from that likelihood function. nsim : int, optional, default ``1000`` Number of fake power spectra to simulate from the posterior sample. Note that this number sets the resolution of the resulting p-value. For ``nsim=1000``, the highest resolution that can be achieved is :math:`10^{-3}`. niter : int, optional, default 200 If ``sample`` is ``None``, this variable will be used to set the number of steps in the MCMC procedure *after* burn-in. nwalkers : int, optional, default 500 If ``sample`` is ``None``, this variable will be used to set the number of MCMC chains run in parallel in the sampler. burnin : int, optional, default 200 If ``sample`` is ``None``, this variable will be used to set the number of burn-in steps to be discarded in the initial phase of the MCMC run namestr : str, optional, default ``test`` A string to be used for storing MCMC output and plots to disk seed : int, optional, default ``None`` An optional number to seed the random number generator with, for reproducibility of the results obtained with this method. Returns ------- pval : float The p-value that the highest data/model outlier is produced by random noise, calibrated using simulated power spectra from an MCMC run. References ---------- For more details on the procedure employed here, see * Vaughan, 2010: https://arxiv.org/abs/0910.2706 * Huppenkothen et al, 2013: https://arxiv.org/abs/1212.1011 """ # fit the model to the data res = self.fit(lpost, t0, neg=True) rng = np.random.RandomState(seed) # find the highest data/model outlier: out_high, _, _ = self._compute_highest_outlier(lpost, res) # simulate parameter sets from the simpler model if not max_post: # using Maximum Likelihood, so I'm going to simulate parameters # from a multivariate Gaussian # set up the distribution mvn = scipy.stats.multivariate_normal(mean=res.p_opt, cov=res.cov, seed=seed) if lpost.npar == 1: # sample parameters s_all = np.atleast_2d(mvn.rvs(size=nsim)).T else: s_all = mvn.rvs(size=nsim) else: if sample is None: # sample the :class:`Posterior` using MCMC sample = self.sample( lpost, res.p_opt, cov=res.cov, nwalkers=nwalkers, niter=niter, burnin=burnin, namestr=namestr, ) # pick nsim samples out of the :class:`Posterior` sample s_all = sample.samples[rng.choice(sample.samples.shape[0], nsim, replace=False)] # simulate LRTs # this method is defined in the subclasses! out_high_sim = self.simulate_highest_outlier(s_all, lpost, t0, max_post=max_post, seed=seed) # now I can compute the p-value: pval = ParameterEstimation._compute_pvalue(out_high, out_high_sim) return pval
[docs] def simulate_highest_outlier(self, s_all, lpost, t0, max_post=True, seed=None): r""" Simulate :math:`n` power spectra from a model and then find the highest data/model outlier in each. The data/model outlier is defined as .. math:: \max{(T_R = 2(\mathrm{data}/\mathrm{model}))} . Parameters ---------- s_all : numpy.ndarray A list of parameter values derived either from an approximation of the likelihood surface, or from an MCMC run. Has dimensions ``(n, ndim)``, where ``n`` is the number of simulated power spectra to generate, and ``ndim`` the number of model parameters. lpost : instance of a :class:`Posterior` subclass an instance of class :class:`Posterior` or one of its subclasses that defines the function to be minimized (either in ``loglikelihood`` or ``logposterior``) t0 : iterable list or array containing the starting parameters. Its length must match ``lpost.model.npar``. max_post: bool, optional, default ``False`` If ``True``, do MAP fits on the power spectrum to find the highest data/model outlier Otherwise, do a Maximum Likelihood fit. If True, the simulated power spectra will be generated from an MCMC run, otherwise the method will employ the approximated covariance matrix for the parameters derived from the likelihood surface to generate samples from that likelihood function. seed : int, optional, default ``None`` An optional number to seed the random number generator with, for reproducibility of the results obtained with this method. Returns ------- max_y_all : numpy.ndarray An array of maximum outliers for each simulated power spectrum """ # the number of simulations nsim = s_all.shape[0] # empty array for the simulation results max_y_all = np.zeros(nsim) rng = np.random.RandomState(seed) # now I can loop over all simulated parameter sets to generate a PSD for i, s in enumerate(s_all): # generate fake PSD sim_ps = self._generate_data(lpost, s, rng=rng) # make LogLikelihood objects for both: if not max_post: sim_lpost = PSDLogLikelihood( sim_ps.freq, sim_ps.power, model=lpost.model, m=sim_ps.m ) else: # make a :class:`Posterior` object sim_lpost = PSDPosterior(sim_ps.freq, sim_ps.power, lpost.model, m=sim_ps.m) sim_lpost.logprior = lpost.logprior parest_sim = PSDParEst(sim_ps, max_post=max_post) try: res = parest_sim.fit(sim_lpost, t0, neg=True) max_y, maxfreq, maxind = self._compute_highest_outlier(sim_lpost, res, nmax=1) max_y_all[i] = max_y[0] except RuntimeError: logger.warning("Fitting unsuccessful! " "Skipping this simulation!") continue return np.hstack(max_y_all)
def _compute_highest_outlier(self, lpost, res, nmax=1): r""" Auxiliary method calculating the highest outlier statistic in a power spectrum. The maximum data/model outlier is defined as .. math:: \max{(T_R = 2(\mathrm{data}/\mathrm{model}))} Parameters ---------- lpost : instance of a :class:`Posterior` subclass and instance of class :class:`Posterior` or one of its subclasses that defines the function to be minimized (either in ``loglikelihood`` or ``logposterior``) res : :class:`OptimizationResults` object An object containing useful summaries of the fitting procedure. For details, see documentation of :class:`OptimizationResults`. nmax : int, optional, default ``1`` The number of maxima to extract from the power spectra. By default, only the highest data/model outlier is extracted. This number allows to extract the ``nmax`` highest outliers, useful when looking for multiple signals in a power spectrum. Returns ------- max_y : {float | numpy.ndarray} The ``nmax`` highest data/model outliers max_x : {float | numpy.ndarray} The frequencies corresponding to the outliers in ``max_y`` max_ind : {int | numpy.ndarray} The indices corresponding to the outliers in ``max_y`` """ residuals = 2.0 * lpost.y / res.mfit ratio_sort = copy.copy(residuals) ratio_sort.sort() max_y = ratio_sort[-nmax:] max_x = np.zeros(max_y.shape[0]) max_ind = np.zeros(max_y.shape[0]) for i, my in enumerate(max_y): max_x[i], max_ind[i] = self._find_outlier(lpost.x, residuals, my) return max_y, max_x, max_ind @staticmethod def _find_outlier(xdata, ratio, max_y): """ Small auxiliary method that finds the index where an array has its maximum, and the corresponding value in ``xdata``. Parameters ---------- xdata : numpy.ndarray A list of independent variables ratio : Numpy.ndarray A list of dependent variables corresponding to ``xdata`` max_y : float The maximum value of ``ratio`` Returns ------- max_x : float The value in ``xdata`` corresponding to the entry in ``ratio`` where ``ratio == `max_y`` max_ind : float The index of the entry in ``ratio`` where ``ratio == max_y`` """ max_ind = np.where(ratio == max_y)[0][0] max_x = xdata[max_ind] return max_x, max_ind
[docs] def plotfits(self, res1, res2=None, save_plot=False, namestr="test", log=False): """ Plotting method that allows to plot either one or two best-fit models with the data. Plots a power spectrum with the best-fit model, as well as the data/model residuals for each model. Parameters ---------- res1 : :class:`OptimizationResults` object Output of a successful fitting procedure res2 : :class:`OptimizationResults` object, optional, default ``None`` Optional output of a second successful fitting procedure, e.g. with a competing model save_plot : bool, optional, default ``False`` If ``True``, the resulting figure will be saved to a file namestr : str, optional, default ``test`` If ``save_plot`` is ``True``, this string defines the path and file name for the output plot log : bool, optional, default ``False`` If ``True``, plot the axes logarithmically. """ # make a figure f = plt.figure(figsize=(12, 10)) # adjust subplots such that the space between the top and bottom # of each are zero plt.subplots_adjust(hspace=0.0, wspace=0.4) # first subplot of the grid, twice as high as the other two # This is the periodogram with the two fitted models overplotted s1 = plt.subplot2grid((4, 1), (0, 0), rowspan=2) if log: logx = np.log10(self.ps.freq) logy = np.log10(self.ps.power) logpar1 = np.log10(res1.mfit) (p1,) = s1.plot(logx, logy, color="black", drawstyle="steps-mid") (p2,) = s1.plot(logx, logpar1, color="blue", lw=2) s1.set_xlim([np.min(logx), np.max(logx)]) s1.set_ylim([np.min(logy) - 1.0, np.max(logy) + 1]) if self.ps.norm == "leahy": s1.set_ylabel("log(Leahy-Normalized Power)", fontsize=18) elif self.ps.norm == "rms": s1.set_ylabel("log(RMS-Normalized Power)", fontsize=18) else: s1.set_ylabel("log(Power)", fontsize=18) else: (p1,) = s1.plot(self.ps.freq, self.ps.power, color="black", drawstyle="steps-mid") (p2,) = s1.plot(self.ps.freq, res1.mfit, color="blue", lw=2) s1.set_xscale("log") s1.set_yscale("log") s1.set_xlim([np.min(self.ps.freq), np.max(self.ps.freq)]) s1.set_ylim([np.min(self.ps.freq) / 10.0, np.max(self.ps.power) * 10.0]) if self.ps.norm == "leahy": s1.set_ylabel("Leahy-Normalized Power", fontsize=18) elif self.ps.norm == "rms": s1.set_ylabel("RMS-Normalized Power", fontsize=18) else: s1.set_ylabel("Power", fontsize=18) if res2 is not None: if log: logpar2 = np.log10(res2.mfit) (p3,) = s1.plot(logx, logpar2, color="red", lw=2) else: (p3,) = s1.plot(self.ps.freq, res2.mfit, color="red", lw=2) s1.legend([p1, p2, p3], ["data", "model 1 fit", "model 2 fit"]) else: s1.legend([p1, p2], ["data", "model fit"]) s1.set_title("Periodogram and fits for data set " + namestr, fontsize=18) # second subplot: power/model for Power law and straight line s2 = plt.subplot2grid((4, 1), (2, 0), rowspan=1) pldif = self.ps.power / res1.mfit s2.set_ylabel("Residuals, \n first model", fontsize=18) if log: s2.plot(logx, pldif, color="black", drawstyle="steps-mid") s2.plot(logx, np.ones(self.ps.freq.shape[0]), color="blue", lw=2) s2.set_xlim([np.min(logx), np.max(logx)]) s2.set_ylim([np.min(pldif), np.max(pldif)]) else: s2.plot(self.ps.freq, pldif, color="black", drawstyle="steps-mid") s2.plot(self.ps.freq, np.ones_like(self.ps.power), color="blue", lw=2) s2.set_xscale("log") s2.set_yscale("log") s2.set_xlim([np.min(self.ps.freq), np.max(self.ps.freq)]) s2.set_ylim([np.min(pldif), np.max(pldif)]) if res2 is not None: bpldif = self.ps.power / res2.mfit # third subplot: power/model for bent power law and straight line s3 = plt.subplot2grid((4, 1), (3, 0), rowspan=1) if log: s3.plot(logx, bpldif, color="black", drawstyle="steps-mid") s3.plot(logx, np.ones(len(self.ps.freq)), color="red", lw=2) s3.axis([np.min(logx), np.max(logx), np.min(bpldif), np.max(bpldif)]) s3.set_xlabel("log(Frequency) [Hz]", fontsize=18) else: s3.plot(self.ps.freq, bpldif, color="black", drawstyle="steps-mid") s3.plot(self.ps.freq, np.ones(len(self.ps.freq)), color="red", lw=2) s3.set_xscale("log") s3.set_yscale("log") s3.set_xlim([np.min(self.ps.freq), np.max(self.ps.freq)]) s3.set_ylim([np.min(bpldif), np.max(bpldif)]) s3.set_xlabel("Frequency [Hz]", fontsize=18) s3.set_ylabel("Residuals, \n second model", fontsize=18) else: if log: s2.set_xlabel("log(Frequency) [Hz]", fontsize=18) else: s2.set_xlabel("Frequency [Hz]", fontsize=18) ax = plt.gca() for label in ax.get_xticklabels() + ax.get_yticklabels(): label.set_fontsize(14) # make sure xticks are taken from first plots, but don't # appear there plt.setp(s1.get_xticklabels(), visible=False) if save_plot: # save figure in png file and close plot device plt.savefig(namestr + "_ps_fit.png", format="png") return