Source code for stingray.crossspectrum

import copy
import warnings
from collections.abc import Iterable, Iterator, Generator

import numpy as np
import scipy
import scipy.optimize
import scipy.stats
import matplotlib.pyplot as plt

from stingray.exceptions import StingrayError
from stingray.utils import rebin_data, rebin_data_log, simon

from .base import StingrayObject
from .events import EventList
from .gti import cross_two_gtis, time_intervals_from_gtis
from .lightcurve import Lightcurve
from .fourier import avg_cs_from_iterables, error_on_averaged_cross_spectrum
from .fourier import avg_cs_from_timeseries, poisson_level
from .fourier import normalize_periodograms, raw_coherence
from .fourier import get_flux_iterable_from_segments
from .fourier import get_rms_from_unnorm_periodogram
from .power_colors import power_color

from scipy.special import factorial


__all__ = [
    "Crossspectrum",
    "AveragedCrossspectrum",
    "DynamicalCrossspectrum",
    "cospectra_pvalue",
    "normalize_crossspectrum",
    "time_lag",
    "coherence",
    "get_flux_generator",
]


def get_flux_generator(data, segment_size, dt=None):
    """Get a flux generator from different segments of a data object

    It is just a wrapper around
    ``stingray.fourier.get_flux_iterable_from_segments``, providing
    this method with the information it needs to create the iterables,
    starting from an event list or a light curve.

    Only accepts `Lightcurve`s and `EventList`s.

    Parameters
    ----------
    data : `Lightcurve` or `EventList`
        Input data
    segment_size : float
        Segment size in seconds

    Other parameters
    ----------------
    dt : float, default None
        Sampling time of the output flux iterables. Required if input data
        is an event list, otherwise the light curve sampling time is selected.

    Returns
    -------
    flux_iterable : ``generator``
        Generator of flux arrays.

    Examples
    --------
    >>> mean = 256
    >>> length = 128
    >>> times = np.sort(np.random.uniform(0, length, int(mean * length)))
    >>> events = EventList(time=times, gti=[[0, length]])
    >>> dt = 0.125
    >>> segment_size = 4

    Create a light curve
    >>> lc = events.to_lc(dt=dt)

    Create a light curve with a different error distribution
    >>> lc_renorm = copy.deepcopy(lc)
    >>> lc_renorm.counts = lc.counts / mean
    >>> lc_renorm.counts_err = lc.counts_err / mean
    >>> lc_renorm.err_dist = "gauss"

    Create an iterable from events, forgetting ``dt``. Should fail
    >>> get_flux_generator(events, segment_size, dt=None)
    Traceback (most recent call last):
    ...
    ValueError: If data is an EventList, you need to specify...

    Create an iterable from events
    >>> iter_ev = get_flux_generator(events, segment_size, dt=dt)

    Create an iterable from the light curve
    >>> iter_lc = get_flux_generator(lc, segment_size, dt=dt)

    Create an iterable from the non-poisson light curve
    >>> iter_lc_nonpois = get_flux_generator(lc_renorm, segment_size, dt=dt)

    Verify that they are equivalent
    >>> for l1, l2 in zip(iter_ev, iter_lc): assert np.allclose(l1, l2)

    Note that the iterable for non-Poissonian light curves also returns the uncertainty
    >>> for l1, (l2, l2e) in zip(iter_lc, iter_lc_nonpois): assert np.allclose(l1, l2 * mean)

    """
    times = data.time
    gti = data.gti

    counts = err = None
    if isinstance(data, Lightcurve):
        counts = data.counts
        N = counts.size
        if data.err_dist.lower() != "poisson":
            err = data.counts_err
    elif isinstance(data, EventList):
        if dt is None:
            raise ValueError("If data is an EventList, you need to specify the bin time dt")
        N = int(np.rint(segment_size / dt))

    flux_iterable = get_flux_iterable_from_segments(
        times, gti, segment_size, N, fluxes=counts, errors=err
    )
    return flux_iterable


[docs] def coherence(lc1, lc2): """ Estimate coherence function of two light curves. For details on the definition of the coherence, see Vaughan and Nowak, 1996 [#]_. Parameters ---------- lc1: :class:`stingray.Lightcurve` object The first light curve data for the channel of interest. lc2: :class:`stingray.Lightcurve` object The light curve data for reference band Returns ------- coh : ``np.ndarray`` The array of coherence versus frequency References ---------- .. [#] https://iopscience.iop.org/article/10.1086/310430 """ warnings.warn( "The coherence function, as implemented, does not work as expected. " "Please use the coherence function of AveragedCrossspectrum, with the " "correct parameters.", DeprecationWarning, ) if not isinstance(lc1, Lightcurve): raise TypeError("lc1 must be a lightcurve.Lightcurve object") if not isinstance(lc2, Lightcurve): raise TypeError("lc2 must be a lightcurve.Lightcurve object") cs = Crossspectrum(lc1, lc2, norm="none") return cs.coherence()
def time_lag(lc1, lc2): """ Estimate the time lag of two light curves. Calculate time lag and uncertainty. Equation from Bendat & Piersol, 2011 [bendat-2011]_. Parameters ---------- lc1: :class:`stingray.Lightcurve` object The first light curve data for the channel of interest. lc2: :class:`stingray.Lightcurve` object The light curve data for reference band Returns ------- lag : np.ndarray The time lag lag_err : np.ndarray The uncertainty in the time lag References ---------- .. [bendat-2011] https://www.wiley.com/en-us/Random+Data%3A+Analysis+and+Measurement+Procedures%2C+4th+Edition-p-9780470248775 """ warnings.warn( "This standalone time_lag function is deprecated. " "Please use the time_lag method of AveragedCrossspectrum, with the " "correct parameters.", DeprecationWarning, ) if not isinstance(lc1, Lightcurve): raise TypeError("lc1 must be a lightcurve.Lightcurve object") if not isinstance(lc2, Lightcurve): raise TypeError("lc2 must be a lightcurve.Lightcurve object") cs = Crossspectrum(lc1, lc2, norm="none") lag = cs.time_lag() return lag def normalize_crossspectrum( unnorm_power, tseg, nbins, nphots1, nphots2, norm="none", power_type="real" ): """ Normalize the real part of the cross spectrum to Leahy, absolute rms^2, fractional rms^2 normalization, or not at all. Here for API compatibility purposes. Will be removed in the next major release. Parameters ---------- unnorm_power: numpy.ndarray The unnormalized cross spectrum. tseg: int The length of the Fourier segment, in seconds. nbins : int Number of bins in the light curve nphots1 : int Number of photons in the light curve no. 1 nphots2 : int Number of photons in the light curve no. 2 Other parameters ---------------- norm : str One of `'leahy'` (Leahy+83), `'frac'` (fractional rms), `'abs'` (absolute rms) power_type : str One of `'real'` (real part), `'all'` (all complex powers), `'abs'` (absolute value) Returns ------- power: numpy.nd.array The normalized co-spectrum (real part of the cross spectrum). For 'none' normalization, imaginary part is returned as well. """ warnings.warn( "normalize_crossspectrum is now deprecated and will be removed " "in the next major release. Please use " "stingray.fourier.normalize_periodograms instead.", DeprecationWarning, ) dt = tseg / nbins nph = np.sqrt(nphots1 * nphots2) mean = nph / nbins return normalize_periodograms( unnorm_power, dt, nbins, mean, n_ph=nph, norm=norm, power_type=power_type ) def normalize_crossspectrum_gauss( unnorm_power, mean_flux, var, dt, N, norm="none", power_type="real" ): """ Normalize the real part of the cross spectrum to Leahy, absolute rms^2, fractional rms^2 normalization, or not at all. Here for API compatibility purposes. Will be removed in the next major release. Parameters ---------- unnorm_power: numpy.ndarray The unnormalized cross spectrum. mean_flux: float The mean flux of the light curve (if a cross spectrum, the geometrical mean of the flux in the two channels) var: float The variance of the light curve (if a cross spectrum, the geometrical mean of the variance in the two channels) dt: float The sampling time of the light curve N: int The number of bins in the light curve Other parameters ---------------- norm : str One of `'leahy'` (Leahy+83), `'frac'` (fractional rms), `'abs'` (absolute rms) power_type : str One of `'real'` (real part), `'all'` (all complex powers), `'abs'` (absolute value) Returns ------- power: numpy.nd.array The normalized co-spectrum (real part of the cross spectrum). For 'none' normalization, imaginary part is returned as well. """ warnings.warn( "normalize_crossspectrum_gauss is now deprecated and will be " "removed in the next major release. Please use " "stingray.fourier.normalize_periodograms instead.", DeprecationWarning, ) mean = mean_flux * dt return normalize_periodograms( unnorm_power, dt, N, mean, variance=var, norm=norm, power_type=power_type ) def _averaged_cospectra_cdf(xcoord, n): """ Function calculating the cumulative distribution function for averaged cospectra, Equation 19 of Huppenkothen & Bachetti (2018). Parameters ---------- xcoord : float or iterable The cospectral power for which to calculate the CDF. n : int The number of averaged cospectra Returns ------- cdf : float The value of the CDF at `xcoord` for `n` averaged cospectra """ if np.size(xcoord) == 1: xcoord = [xcoord] cdf = np.zeros_like(xcoord) for i, x in enumerate(xcoord): prefac_bottom1 = factorial(n - 1) for j in range(n): prefac_top = factorial(n - 1 + j) prefac_bottom2 = factorial(n - 1 - j) * factorial(j) prefac_bottom3 = 2.0 ** (n + j) prefac = prefac_top / (prefac_bottom1 * prefac_bottom2 * prefac_bottom3) gf = -j + n first_fac = scipy.special.gamma(gf) if x >= 0: second_fac = scipy.special.gammaincc(gf, n * x) * first_fac fac = 2.0 * first_fac - second_fac else: fac = scipy.special.gammaincc(gf, -n * x) * first_fac cdf[i] += prefac * fac if np.size(xcoord) == 1: return cdf[i] return cdf def cospectra_pvalue(power, nspec): """ This function computes the single-trial p-value that the power was observed under the null hypothesis that there is no signal in the data. Important: the underlying assumption that make this calculation valid is that the powers in the cross spectrum follow a Laplace distribution, and this requires that: 1. the co-spectrum is normalized according to [Leahy 1983]_ 2. there is only white noise in the light curve. That is, there is no aperiodic variability that would change the overall shape of the power spectrum. Also note that the p-value is for a *single trial*, i.e. the power currently being tested. If more than one power or more than one power spectrum are being tested, the resulting p-value must be corrected for the number of trials (Bonferroni correction). Mathematical formulation in [Huppenkothen 2017]_. Parameters ---------- power : float The squared Fourier amplitude of a spectrum to be evaluated nspec : int The number of spectra or frequency bins averaged in ``power``. This matters because averaging spectra or frequency bins increases the signal-to-noise ratio, i.e. makes the statistical distributions of the noise narrower, such that a smaller power might be very significant in averaged spectra even though it would not be in a single cross spectrum. Returns ------- pval : float The classical p-value of the observed power being consistent with the null hypothesis of white noise References ---------- * .. [Leahy 1983] https://ui.adsabs.harvard.edu/#abs/1983ApJ...266..160L/abstract * .. [Huppenkothen 2017] http://adsabs.harvard.edu/abs/2018ApJS..236...13H """ if not np.all(np.isfinite(power)): raise ValueError("power must be a finite floating point number!") # if power < 0: # raise ValueError("power must be a positive real number!") if not np.isfinite(nspec): raise ValueError("nspec must be a finite integer number") if not np.isclose(nspec % 1, 0): raise ValueError("nspec must be an integer number!") if nspec < 1: raise ValueError("nspec must be larger or equal to 1") elif nspec == 1: lapl = scipy.stats.laplace(0, 1) pval = lapl.sf(power) elif nspec > 50: exp_sigma = np.sqrt(2) / np.sqrt(nspec) gauss = scipy.stats.norm(0, exp_sigma) pval = gauss.sf(power) else: pval = 1.0 - _averaged_cospectra_cdf(power, nspec) return pval
[docs] class Crossspectrum(StingrayObject): main_array_attr = "freq" type = "crossspectrum" """ Make a cross spectrum from a (binned) light curve. You can also make an empty :class:`Crossspectrum` object to populate with your own Fourier-transformed data (this can sometimes be useful when making binned power spectra). Stingray uses the scipy.fft standards for the sign of the Nyquist frequency. Parameters ---------- data1: :class:`stingray.Lightcurve` or :class:`stingray.events.EventList`, optional, default ``None`` The dataset for the first channel/band of interest. data2: :class:`stingray.Lightcurve` or :class:`stingray.events.EventList`, optional, default ``None`` The dataset for the second, or "reference", band. norm: {``frac``, ``abs``, ``leahy``, ``none``}, default ``none`` The normalization of the (real part of the) cross spectrum. power_type: string, optional, default ``real`` Parameter to choose among complete, real part and magnitude of the cross spectrum. fullspec: boolean, optional, default ``False`` If False, keep only the positive frequencies, or if True, keep all of them . Other Parameters ---------------- gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects. Could throw errors if these GTIs have overlaps with the input `Lightcurve` GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the `Lightcurve` objects before making the cross spectrum. lc1: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects For backwards compatibility only. Like ``data1``, but no :class:`stingray.events.EventList` objects allowed lc2: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects For backwards compatibility only. Like ``data2``, but no :class:`stingray.events.EventList` objects allowed dt: float The time resolution of the light curve. Only needed when constructing light curves in the case where ``data1``, ``data2`` are :class:`EventList` objects skip_checks: bool Skip initial checks, for speed or other reasons (you need to trust your inputs!) Attributes ---------- freq: numpy.ndarray The array of mid-bin frequencies that the Fourier transform samples power: numpy.ndarray The array of cross spectra (complex numbers) power_err: numpy.ndarray The uncertainties of ``power``. An approximation for each bin given by ``power_err= power/sqrt(m)``. Where ``m`` is the number of power averaged in each bin (by frequency binning, or averaging more than one spectra). Note that for a single realization (``m=1``) the error is equal to the power. df: float The frequency resolution m: int The number of averaged cross-spectra amplitudes in each bin. n: int The number of data points/time bins in one segment of the light curves. k: array of int The rebinning scheme if the object has been rebinned otherwise is set to 1. nphots1: float The total number of photons in light curve 1 nphots2: float The total number of photons in light curve 2 """ def __init__( self, data1=None, data2=None, norm="frac", gti=None, lc1=None, lc2=None, power_type="all", dt=None, fullspec=False, skip_checks=False, save_all=False, ): self._type = None # for backwards compatibility if data1 is None: data1 = lc1 if data2 is None: data2 = lc2 empty = data1 is None and data2 is None good_input = not empty if not skip_checks: good_input = self.initial_checks( data1=data1, data2=data2, norm=norm, gti=gti, lc1=lc1, lc2=lc2, power_type=power_type, dt=dt, fullspec=fullspec, ) self.dt = dt norm = norm.lower() self.norm = norm self.k = 1 if empty or not good_input: return self._initialize_empty() return self._initialize_from_any_input( data1, data2, dt=dt, norm=norm, power_type=power_type, fullspec=fullspec, gti=gti, save_all=save_all, )
[docs] def initial_checks( self, data1=None, data2=None, norm="frac", gti=None, lc1=None, lc2=None, segment_size=None, power_type="real", dt=None, fullspec=False, ): """Run initial checks on the input. Returns True if checks are passed, False if they are not. Raises various errors for different bad inputs Examples -------- >>> times = np.arange(0, 10) >>> counts = np.random.poisson(100, 10) >>> lc1 = Lightcurve(times, counts, skip_checks=True) >>> lc2 = Lightcurve(times, counts, skip_checks=True) >>> ev1 = EventList(times) >>> ev2 = EventList(times) >>> c = Crossspectrum() >>> ac = AveragedCrossspectrum() If norm is not a string, raise a TypeError >>> Crossspectrum.initial_checks(c, norm=1) Traceback (most recent call last): ... TypeError: norm must be a string... If ``norm`` is not one of the valid norms, raise a ValueError >>> Crossspectrum.initial_checks(c, norm="blabla") Traceback (most recent call last): ... ValueError: norm must be 'frac'... If ``power_type`` is not one of the valid norms, raise a ValueError >>> Crossspectrum.initial_checks(c, power_type="blabla") Traceback (most recent call last): ... ValueError: `power_type` not recognized! If the user passes only one light curve, raise a ValueError >>> Crossspectrum.initial_checks(c, data1=lc1, data2=None) Traceback (most recent call last): ... ValueError: You can't do a cross spectrum... If the user passes an event list without dt, raise a ValueError >>> Crossspectrum.initial_checks(c, data1=ev1, data2=ev2, dt=None) Traceback (most recent call last): ... ValueError: If using event lists, please specify... """ if isinstance(norm, str) is False: raise TypeError("norm must be a string") if norm.lower() not in ["frac", "abs", "leahy", "none"]: raise ValueError("norm must be 'frac', 'abs', 'leahy', or 'none'!") if power_type not in ["all", "absolute", "real"]: raise ValueError("`power_type` not recognized!") # check if input data is a Lightcurve object, if not make one or # make an empty Crossspectrum object if lc1 == ``None`` or lc2 == ``None`` if lc1 is not None or lc2 is not None: warnings.warn( "The lcN keywords are now deprecated. Use dataN instead", DeprecationWarning ) if data1 is None or data2 is None: if data1 is not None or data2 is not None: raise ValueError("You can't do a cross spectrum with just one light curve!") else: return False dt_is_invalid = (dt is None) or (dt <= np.finfo(float).resolution) if segment_size is None: # checks to be run for non-averaged spectra if gti is not None and len(gti) > 1: raise TypeError("Non-averaged cross spectra need a single GTI") if type(data1) != type(data2): raise TypeError("Input data have to be of the same kind") if isinstance(data1, EventList): if dt_is_invalid: raise ValueError( "If using event lists, please specify the bin time to generate lightcurves." ) elif isinstance(data1, Lightcurve): if data1.err_dist.lower() != data2.err_dist.lower(): simon( "Your lightcurves have different statistics." "The errors in the Crossspectrum will be incorrect." ) # If dt differs slightly, its propagated error must not be more than # 1/100th of the bin if not np.isclose(data1.dt, data2.dt, rtol=0.01 * data1.dt / data1.tseg): raise StingrayError("Light curves do not have same time binning dt.") if data1.tseg != data2.tseg: simon( "Lightcurves do not have same tseg. This means that the data" "from the two channels are not completely in sync. This " "might or might not be an issue. Keep an eye on it." ) elif isinstance(data1, (list, tuple)): if not isinstance(data1[0], Lightcurve) or not isinstance(data2[0], Lightcurve): raise TypeError("Inputs lists have to contain light curve objects") if data1[0].err_dist.lower() != data2[0].err_dist.lower(): simon( "Your lightcurves have different statistics." "The errors in the Crossspectrum will be incorrect." ) elif isinstance(data1, (Generator, Iterator)): pass else: raise TypeError("Input data are invalid") return True
[docs] def rebin(self, df=None, f=None, method="mean"): """ Rebin the cross spectrum to a new frequency resolution ``df``. Parameters ---------- df: float The new frequency resolution Other Parameters ---------------- f: float the rebin factor. If specified, it substitutes df with ``f*self.df`` Returns ------- bin_cs = :class:`Crossspectrum` (or one of its subclasses) object The newly binned cross spectrum or power spectrum. Note: this object will be of the same type as the object that called this method. For example, if this method is called from :class:`AveragedPowerspectrum`, it will return an object of class :class:`AveragedPowerspectrum`, too. """ if f is None and df is None: raise ValueError("You need to specify at least one between f and df") elif f is not None: df = f * self.df # rebin cross spectrum to new resolution binfreq, bincs, binerr, step_size = rebin_data( self.freq, self.power, df, self.power_err, method=method, dx=self.df ) # make an empty cross spectrum object # note: syntax deliberate to work with subclass Powerspectrum bin_cs = copy.copy(self) # store the binned periodogram in the new object bin_cs.freq = binfreq bin_cs.power = bincs bin_cs.df = df bin_cs.power_err = binerr if hasattr(self, "unnorm_power") and self.unnorm_power is not None: unnorm_power_err = None if hasattr(self, "unnorm_power_err") and self.unnorm_power_err is not None: unnorm_power_err = self.unnorm_power_err _, binpower_unnorm, binpower_err_unnorm, _ = rebin_data( self.freq, self.unnorm_power, df, dx=self.df, yerr=unnorm_power_err, method=method ) if hasattr(self, "unnorm_power_err") and self.unnorm_power_err is not None: bin_cs.unnorm_power_err = binpower_err_unnorm bin_cs.unnorm_power = binpower_unnorm if hasattr(self, "cs_all"): cs_all = [] for c in self.cs_all: cs_all.append( rebin_data(self.freq, c, dx_new=df, yerr=None, method=method, dx=self.df)[1] ) bin_cs.cs_all = cs_all if hasattr(self, "pds1"): bin_cs.pds1 = self.pds1.rebin(df=df, f=f, method=method) if hasattr(self, "pds2"): bin_cs.pds2 = self.pds2.rebin(df=df, f=f, method=method) bin_cs.m = np.rint(step_size * self.m) return bin_cs
[docs] def to_norm(self, norm, inplace=False): """Convert Cross spectrum to new normalization. Parameters ---------- norm : str The new normalization of the spectrum Other parameters ---------------- inplace: bool, default False If True, change the current instance. Otherwise, return a new one Returns ------- new_spec : object, same class as input The new, normalized, spectrum. """ if norm == self.norm: return copy.deepcopy(self) variance1 = variance2 = variance = None if self.type == "powerspectrum": # This is the case for Powerspectrum mean = mean1 = mean2 = self.nphots / self.n if hasattr(self, "err_dist") and self.err_dist != "poisson": variance = self.variance nph = self.nphots else: nph = np.sqrt(self.nphots1 * self.nphots2) mean1 = self.nphots1 / self.n mean2 = self.nphots2 / self.n mean = nph / self.n if hasattr(self, "err_dist") and self.err_dist != "poisson": variance1 = self.variance1 variance2 = self.variance2 variance = np.sqrt(self.variance1 * self.variance2) if inplace: new_spec = self else: new_spec = copy.deepcopy(self) power_type = "all" if hasattr(self, "power_type"): power_type = self.power_type for attr in ["power", "power_err"]: unnorm_attr = "unnorm_" + attr if not hasattr(self, unnorm_attr) or getattr(self, unnorm_attr) is None: continue power = normalize_periodograms( getattr(self, unnorm_attr), self.dt, self.n, mean, n_ph=nph, variance=variance, norm=norm, power_type=power_type, ) setattr(new_spec, attr, power) new_spec.norm = norm if hasattr(self, "pds1"): p1 = normalize_periodograms( getattr(self.pds1, unnorm_attr), self.dt, self.n, mean1, n_ph=self.nphots1, variance=variance1, norm=norm, power_type=power_type, ) setattr(new_spec.pds1, attr, p1) p2 = normalize_periodograms( getattr(self.pds2, unnorm_attr), self.dt, self.n, mean2, n_ph=self.nphots2, variance=variance2, norm=norm, power_type=power_type, ) setattr(new_spec.pds2, attr, p2) new_spec.pds1.norm = new_spec.pds2.norm = norm return new_spec
def _normalize_crossspectrum(self, unnorm_power): """ Normalize the real part of the cross spectrum to Leahy, absolute rms^2, fractional rms^2 normalization, or not at all. Parameters ---------- unnorm_power: numpy.ndarray The unnormalized cross spectrum. Returns ------- power: numpy.nd.array The normalized co-spectrum (real part of the cross spectrum). For 'none' normalization, imaginary part is returned as well. """ nph = np.sqrt(self.nphots1 * self.nphots2) mean = nph / self.n variance = None if self.err_dist != "poisson": variance = np.sqrt(self.variance1 * self.variance2) return normalize_periodograms( unnorm_power, self.dt, self.n, mean, n_ph=nph, variance=variance, norm=self.norm, power_type=self.power_type, )
[docs] def rebin_log(self, f=0.01): """ Logarithmic rebin of the periodogram. The new frequency depends on the previous frequency modified by a factor f: .. math:: d\\nu_j = d\\nu_{j-1} (1+f) Parameters ---------- f: float, optional, default ``0.01`` parameter that steers the frequency resolution Returns ------- new_spec : :class:`Crossspectrum` (or one of its subclasses) object The newly binned cross spectrum or power spectrum. Note: this object will be of the same type as the object that called this method. For example, if this method is called from :class:`AveragedPowerspectrum`, it will return an object of class """ binfreq, binpower, binpower_err, nsamples = rebin_data_log( self.freq, self.power, f, y_err=self.power_err, dx=self.df ) new_spec = copy.copy(self) new_spec.freq = binfreq new_spec.power = binpower new_spec.power_err = binpower_err new_spec.m = nsamples * self.m new_spec.dt = self.dt new_spec.k = nsamples if hasattr(self, "unnorm_power") and self.unnorm_power is not None: unnorm_power_err = None if hasattr(self, "unnorm_power_err") and self.unnorm_power_err is not None: unnorm_power_err = self.unnorm_power_err _, binpower_unnorm, binpower_err_unnorm, _ = rebin_data_log( self.freq, self.unnorm_power, f, dx=self.df, y_err=unnorm_power_err ) new_spec.unnorm_power = binpower_unnorm if hasattr(self, "unnorm_power_err") and self.unnorm_power_err is not None: new_spec.unnorm_power_err = binpower_err_unnorm if hasattr(self, "pds1"): new_spec.pds1 = self.pds1.rebin_log(f) if hasattr(self, "pds2"): new_spec.pds2 = self.pds2.rebin_log(f) if hasattr(self, "cs_all"): cs_all = [] for c in self.cs_all: cs_all.append(rebin_data_log(self.freq, c, f, dx=self.df)[1]) new_spec.cs_all = cs_all return new_spec
[docs] def coherence(self): """Compute Coherence function of the cross spectrum. Coherence is defined in Vaughan and Nowak, 1996 [#]_. It is a Fourier frequency dependent measure of the linear correlation between time series measured simultaneously in two energy channels. Returns ------- coh : numpy.ndarray Coherence function References ---------- .. [#] https://iopscience.iop.org/article/10.1086/310430 """ # this computes the averaged power spectrum, but using the # cross spectrum code to avoid circular imports return raw_coherence( self.unnorm_power, self.pds1.unnorm_power, self.pds2.unnorm_power, 0, 0, self.n )
[docs] def phase_lag(self): """Calculate the fourier phase lag of the cross spectrum. This is defined as the argument of the complex cross spectrum, and gives the delay at all frequencies, in cycles, of one input light curve with respect to the other. """ return np.angle(self.unnorm_power)
[docs] def time_lag(self): r"""Calculate the fourier time lag of the cross spectrum. The time lag is calculated by taking the phase lag :math:`\phi` and ..math:: \tau = \frac{\phi}{\two pi \nu} where :math:`\nu` is the center of the frequency bins. """ if self.__class__ in [Crossspectrum, AveragedCrossspectrum]: ph_lag = self.phase_lag() return ph_lag / (2 * np.pi * self.freq) else: raise AttributeError("Object has no attribute named 'time_lag' !")
[docs] def plot( self, labels=None, axis=None, title=None, marker="-", save=False, filename=None, ax=None ): """ Plot the amplitude of the cross spectrum vs. the frequency using ``matplotlib``. Parameters ---------- labels : iterable, default ``None`` A list of tuple with ``xlabel`` and ``ylabel`` as strings. axis : list, tuple, string, default ``None`` Parameter to set axis properties of the ``matplotlib`` figure. For example it can be a list like ``[xmin, xmax, ymin, ymax]`` or any other acceptable argument for the``matplotlib.pyplot.axis()`` method. title : str, default ``None`` The title of the plot. marker : str, default '-' Line style and color of the plot. Line styles and colors are combined in a single format string, as in ``'bo'`` for blue circles. See ``matplotlib.pyplot.plot`` for more options. save : boolean, optional, default ``False`` If ``True``, save the figure with specified filename. filename : str File name of the image to save. Depends on the boolean ``save``. ax : ``matplotlib.Axes`` object An axes object to fill with the cross correlation plot. """ if ax is None: fig = plt.figure("crossspectrum") ax = fig.add_subplot(1, 1, 1) ax2 = None if np.any(np.iscomplex(self.power)): ax.plot(self.freq, np.abs(self.power), marker, color="k", label="Amplitude") ax2 = ax.twinx() ax2.tick_params("y", colors="b") ax2.plot( self.freq, self.power.imag, marker, color="b", alpha=0.5, label="Imaginary Part" ) ax.plot(self.freq, self.power.real, marker, color="r", alpha=0.5, label="Real Part") lines, line_labels = ax.get_legend_handles_labels() lines2, line_labels2 = ax2.get_legend_handles_labels() lines = lines + lines2 line_labels = line_labels + line_labels2 else: ax.plot(self.freq, np.abs(self.power), marker, color="b") lines, line_labels = ax.get_legend_handles_labels() xlabel = "Frequency (Hz)" ylabel = f"Power ({self.norm})" if labels is not None: try: xlabel = labels[0] ylabel = labels[1] except IndexError: simon("``labels`` must have two labels for x and y axes.") # Not raising here because in case of len(labels)==1, only # x-axis will be labelled. ax.set_xlabel(xlabel) if ax2 is not None: ax.set_ylabel(ylabel + "-Real") ax2.set_ylabel(ylabel + "-Imaginary") else: ax.set_ylabel(ylabel) ax.legend(lines, line_labels, loc="best") if axis is not None: ax.set_xlim(axis[0:2]) ax.set_ylim(axis[2:4]) if ax2 is not None: ax2.set_ylim(axis[2:4]) if title is not None: ax.set_title(title) if save: if filename is None: plt.gcf().savefig("spec.png") else: plt.gcf().savefig(filename) return ax
[docs] def classical_significances(self, threshold=1, trial_correction=False): """ Compute the classical significances for the powers in the power spectrum, assuming an underlying noise distribution that follows a chi-square distributions with 2M degrees of freedom, where M is the number of powers averaged in each bin. Note that this function will *only* produce correct results when the following underlying assumptions are fulfilled: 1. The power spectrum is Leahy-normalized 2. There is no source of variability in the data other than the periodic signal to be determined with this method. This is important! If there are other sources of (aperiodic) variability in the data, this method will *not* produce correct results, but instead produce a large number of spurious false positive detections! 3. There are no significant instrumental effects changing the statistical distribution of the powers (e.g. pile-up or dead time) By default, the method produces ``(index,p-values)`` for all powers in the power spectrum, where index is the numerical index of the power in question. If a ``threshold`` is set, then only powers with p-values *below* that threshold with their respective indices. If ``trial_correction`` is set to ``True``, then the threshold will be corrected for the number of trials (frequencies) in the power spectrum before being used. Parameters ---------- threshold : float, optional, default ``1`` The threshold to be used when reporting p-values of potentially significant powers. Must be between 0 and 1. Default is ``1`` (all p-values will be reported). trial_correction : bool, optional, default ``False`` A Boolean flag that sets whether the ``threshold`` will be corrected by the number of frequencies before being applied. This decreases the ``threshold`` (p-values need to be lower to count as significant). Default is ``False`` (report all powers) though for any application where `threshold`` is set to something meaningful, this should also be applied! Returns ------- pvals : iterable A list of ``(index, p-value)`` tuples for all powers that have p-values lower than the threshold specified in ``threshold``. """ if not self.norm == "leahy": raise ValueError("This method only works on Leahy-normalized power spectra!") if np.size(self.m) == 1: # calculate p-values for all powers # leave out zeroth power since it just encodes the number of photons! pv = np.array([cospectra_pvalue(power, self.m) for power in self.power]) else: pv = np.array([cospectra_pvalue(power, m) for power, m in zip(self.power, self.m)]) # if trial correction is used, then correct the threshold for # the number of powers in the power spectrum if trial_correction: threshold /= self.power.shape[0] # need to add 1 to the indices to make up for the fact that # we left out the first power above! indices = np.where(pv < threshold)[0] pvals = np.vstack([pv[indices], indices]) return pvals
[docs] @staticmethod def from_time_array( times1, times2, dt, segment_size=None, gti=None, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, ): """Calculate AveragedCrossspectrum from two arrays of event times. Parameters ---------- times1 : `np.array` Event arrival times of channel 1 times2 : `np.array` Event arrival times of channel 2 dt : float The time resolution of the intermediate light curves (sets the Nyquist frequency) Other parameters ---------------- segment_size : float The length, in seconds, of the light curve segments that will be averaged. Only relevant (and required) for `AveragedCrossspectrum`. gti : [[gti0, gti1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects. Could throw errors if these GTIs have overlaps with the input object GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the input objects before making the cross spectrum. norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part """ return crossspectrum_from_time_array( times1, times2, dt, segment_size=segment_size, gti=gti, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, )
[docs] @staticmethod def from_events( events1, events2, dt, segment_size=None, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, gti=None, ): """Calculate AveragedCrossspectrum from two event lists Parameters ---------- events1 : `stingray.EventList` Events from channel 1 events2 : `stingray.EventList` Events from channel 2 dt : float The time resolution of the intermediate light curves (sets the Nyquist frequency) Other parameters ---------------- segment_size : float The length, in seconds, of the light curve segments that will be averaged. Only relevant (and required) for AveragedCrossspectrum norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects. Could throw errors if these GTIs have overlaps with the input object GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the input objects before making the cross spectrum. """ return crossspectrum_from_events( events1, events2, dt, segment_size=segment_size, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, gti=gti, )
[docs] @staticmethod def from_lightcurve( lc1, lc2, segment_size=None, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, gti=None, ): """Calculate AveragedCrossspectrum from two light curves Parameters ---------- lc1 : `stingray.Lightcurve` Light curve from channel 1 lc2 : `stingray.Lightcurve` Light curve from channel 2 Other parameters ---------------- segment_size : float The length, in seconds, of the light curve segments that will be averaged. Only relevant (and required) for AveragedCrossspectrum norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects. Could throw errors if these GTIs have overlaps with the input object GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the input objects before making the cross spectrum. """ return crossspectrum_from_lightcurve( lc1, lc2, segment_size=segment_size, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, gti=gti, )
[docs] @staticmethod def from_stingray_timeseries( ts1, ts2, flux_attr, error_flux_attr=None, segment_size=None, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, gti=None, ): """Calculate AveragedCrossspectrum from two light curves Parameters ---------- ts1 : `stingray.Timeseries` Time series from channel 1 ts2 : `stingray.Timeseries` Time series from channel 2 flux_attr : `str` What attribute of the time series will be used. Other parameters ---------------- error_flux_attr : `str` What attribute of the time series will be used as error bar. segment_size : float The length, in seconds, of the light curve segments that will be averaged. Only relevant (and required) for AveragedCrossspectrum norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects. Could throw errors if these GTIs have overlaps with the input object GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the input objects before making the cross spectrum. """ return crossspectrum_from_timeseries( ts1, ts2, flux_attr=flux_attr, error_flux_attr=error_flux_attr, segment_size=segment_size, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, gti=gti, )
[docs] @staticmethod def from_lc_iterable( iter_lc1, iter_lc2, dt, segment_size, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, gti=None, ): """Calculate AveragedCrossspectrum from two light curves Parameters ---------- iter_lc1 : iterable of `stingray.Lightcurve` objects or `np.array` Light curves from channel 1. If arrays, use them as counts iter_lc1 : iterable of `stingray.Lightcurve` objects or `np.array` Light curves from channel 2. If arrays, use them as counts dt : float The time resolution of the light curves (sets the Nyquist frequency) Other parameters ---------------- segment_size : float The length, in seconds, of the light curve segments that will be averaged. Only relevant (and required) for AveragedCrossspectrum norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects. Could throw errors if these GTIs have overlaps with the input object GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the input objects before making the cross spectrum. save_all : bool, default False If True, save the cross spectrum of each segment in the ``cs_all`` attribute of the output :class:`Crossspectrum` object. """ return crossspectrum_from_lc_iterable( iter_lc1, iter_lc2, dt, segment_size, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, gti=gti, )
def _initialize_from_any_input( self, data1, data2, dt=None, segment_size=None, norm="frac", power_type="all", silent=False, fullspec=False, gti=None, use_common_mean=True, save_all=False, ): """Initialize the class, trying to understand the input types. The input arguments are the same as ``__init__()``. Based on the type of ``data1``, this method will call the appropriate ``crossspectrum_from_XXXX`` function, and initialize ``self`` with the correct attributes. """ if segment_size is None and isinstance(data1, StingrayObject): common_gti = cross_two_gtis(data1.gti, data2.gti) data1.gti = common_gti data2.gti = common_gti data1 = data1.apply_gtis(inplace=False) data2 = data2.apply_gtis(inplace=False) data1_is_binned = ( "counts" in data1.array_attrs() or "_counts" in data1.internal_array_attrs() ) data2_is_binned = ( "counts" in data2.array_attrs() or "_counts" in data2.internal_array_attrs() ) if data1_is_binned and data2_is_binned: assert data2.time.size == data1.time.size and np.allclose( data2.time - data1.time, 0 ), "Time arrays are not the same" elif data1_is_binned or data2_is_binned: raise ValueError("Please use input data of the same kind") if isinstance(data1, EventList): spec = crossspectrum_from_events( data1, data2, dt, segment_size, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, gti=gti, save_all=save_all, ) elif isinstance(data1, Lightcurve): spec = crossspectrum_from_lightcurve( data1, data2, segment_size, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, gti=gti, save_all=save_all, ) spec.lc1 = data1 spec.lc2 = data2 elif isinstance(data1, (tuple, list)): dt = data1[0].dt # This is a list of light curves. spec = crossspectrum_from_lc_iterable( data1, data2, dt, segment_size, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, gti=gti, use_common_mean=use_common_mean, save_all=save_all, ) else: # pragma: no cover raise TypeError(f"Bad inputs to Crosssspectrum: {type(data1)}") for key, val in spec.__dict__.items(): setattr(self, key, val) return def _initialize_empty(self): """Set all attributes to None.""" self.freq = None self.power = None self.power_err = None self.unnorm_power = None self.unnorm_power_err = None self.df = None self.dt = None self.nphots1 = None self.nphots2 = None self.m = 1 self.n = None self.fullspec = None self.k = 1 return
[docs] def deadtime_correct( self, dead_time, rate, background_rate=0, limit_k=200, n_approx=None, paralyzable=False ): """ Correct the power spectrum for dead time effects. This correction is based on the formula given in Zhang et al. 2015, assuming a constant dead time for all events. For more advanced dead time corrections, see the FAD method from `stingray.deadtime.fad` Parameters ---------- dead_time: float The dead time of the detector. rate : float Detected source count rate Other Parameters ---------------- background_rate : float, default 0 Detected background count rate. This is important to estimate when deadtime is given by the combination of the source counts and background counts (e.g. in an imaging X-ray detector). paralyzable: bool, default False If True, the dead time correction is done assuming a paralyzable dead time. If False, the correction is done assuming a non-paralyzable (more common) dead time. limit_k : int, default 200 Limit to this value the number of terms in the inner loops of calculations. Check the plots returned by the `check_B` and `check_A` functions to test that this number is adequate. n_approx : int, default None Number of bins to calculate the model power spectrum. If None, it will use the size of the input frequency array. Relatively simple models (e.g., low count rates compared to dead time) can use a smaller number of bins to speed up the calculation, and the final power values will be interpolated. Returns ------- spectrum: :class:`Crossspectrum` or derivative. The dead-time corrected spectrum. """ # I put it here to avoid circular imports from stingray.deadtime import non_paralyzable_dead_time_model if paralyzable: raise NotImplementedError("Paralyzable dead time correction is not implemented yet.") model = non_paralyzable_dead_time_model( self.freq, dead_time, rate, bin_time=self.dt, limit_k=limit_k, background_rate=background_rate, n_approx=n_approx, ) correction = 2 / model new_spec = copy.deepcopy(self) new_spec.power *= correction # Now correct internal attributes for both the spectrum and its possible # sub-spectra (PDSs from single channels, dynamical spectra, etc.) objects = [new_spec] if hasattr(new_spec, "pds1"): objects += [new_spec.pds1, new_spec.pds2] for obj in objects: for attr in ["unnorm_power", "power_err", "unnorm_power_err"]: if hasattr(obj, attr): setattr(obj, attr, getattr(obj, attr) * correction) for attr in ["cs_all", "unnorm_cs_all"]: if hasattr(new_spec, attr): dynsp = getattr(new_spec, attr) for i, power in enumerate(dynsp): dynsp[i] = power * correction return new_spec
[docs] class AveragedCrossspectrum(Crossspectrum): type = "crossspectrum" """ Make an averaged cross spectrum from a light curve by segmenting two light curves, Fourier-transforming each segment and then averaging the resulting cross spectra. Parameters ---------- data1: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects OR :class:`stingray.EventList` object A light curve from which to compute the cross spectrum. In some cases, this would be the light curve of the wavelength/energy/frequency band of interest. data2: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects OR :class:`stingray.EventList` object A second light curve to use in the cross spectrum. In some cases, this would be the wavelength/energy/frequency reference band to compare the band of interest with. segment_size: float The size of each segment to average. Note that if the total duration of each :class:`Lightcurve` object in ``lc1`` or ``lc2`` is not an integer multiple of the ``segment_size``, then any fraction left-over at the end of the time series will be lost. Otherwise you introduce artifacts. norm: {``frac``, ``abs``, ``leahy``, ``none``}, default ``none`` The normalization of the (real part of the) cross spectrum. Other Parameters ---------------- gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects. Could throw errors if these GTIs have overlaps with the input object GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the input objects before making the cross spectrum. dt : float The time resolution of the light curve. Only needed when constructing light curves in the case where data1 or data2 are of :class:EventList power_type: string, optional, default ``all`` Parameter to choose among complete, real part and magnitude of the cross spectrum. silent : bool, default False Do not show a progress bar when generating an averaged cross spectrum. Useful for the batch execution of many spectra lc1: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects For backwards compatibility only. Like ``data1``, but no :class:`stingray.events.EventList` objects allowed lc2: :class:`stingray.Lightcurve`object OR iterable of :class:`stingray.Lightcurve` objects For backwards compatibility only. Like ``data2``, but no :class:`stingray.events.EventList` objects allowed fullspec: boolean, optional, default ``False`` If True, return the full array of frequencies, otherwise return just the positive frequencies. save_all : bool, default False Save all intermediate PDSs used for the final average. Use with care. This is likely to fill up your RAM on medium-sized datasets, and to slow down the computation when rebinning. skip_checks: bool Skip initial checks, for speed or other reasons (you need to trust your inputs!) use_common_mean: bool Averaged cross spectra are normalized in two possible ways: one is by normalizing each of the single spectra that get averaged, the other is by normalizing after the averaging. If `use_common_mean` is selected, the spectrum will be normalized after the average. gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects. Could throw errors if these GTIs have overlaps with the input object GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the input objects before making the cross spectrum. Attributes ---------- freq: numpy.ndarray The array of mid-bin frequencies that the Fourier transform samples. power: numpy.ndarray The array of cross spectra. power_err: numpy.ndarray The uncertainties of ``power``. An approximation for each bin given by ``power_err= power/sqrt(m)``. Where ``m`` is the number of power averaged in each bin (by frequency binning, or averaging power spectra of segments of a light curve). Note that for a single realization (``m=1``) the error is equal to the power. df: float The frequency resolution. m: int The number of averaged cross spectra. n: int The number of time bins per segment of light curve. nphots1: float The total number of photons in the first (interest) light curve. nphots2: float The total number of photons in the second (reference) light curve. gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. """ def __init__( self, data1=None, data2=None, segment_size=None, norm="frac", gti=None, power_type="all", silent=False, lc1=None, lc2=None, dt=None, fullspec=False, save_all=False, use_common_mean=True, skip_checks=False, ): self._type = None # for backwards compatibility if data1 is None: data1 = lc1 if data2 is None: data2 = lc2 empty = data1 is None and data2 is None good_input = not empty if not skip_checks: good_input = self.initial_checks( data1=data1, data2=data2, norm=norm, gti=gti, lc1=lc1, lc2=lc2, power_type=power_type, dt=dt, fullspec=fullspec, segment_size=segment_size, ) norm = norm.lower() self.norm = norm self.dt = dt self.save_all = save_all self.segment_size = segment_size self.show_progress = not silent if empty or not good_input: return self._initialize_empty() if isinstance(data1, Generator): warnings.warn( "The averaged Cross spectrum from a generator of " "light curves pre-allocates the full list of light " "curves, losing all advantage of lazy loading. If it " "is important for you, use the " "AveragedCrossspectrum.from_lc_iterable static " "method, specifying the sampling time `dt`." ) data1 = list(data1) data2 = list(data2) return self._initialize_from_any_input( data1, data2, dt=dt, segment_size=segment_size, gti=gti, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, save_all=save_all, )
[docs] def initial_checks(self, data1, segment_size=None, **kwargs): """ Examples -------- >>> times = np.arange(0, 10) >>> ev1 = EventList(times) >>> ev2 = EventList(times) >>> ac = AveragedCrossspectrum() If AveragedCrossspectrum, you need ``segment_size`` >>> AveragedCrossspectrum.initial_checks(ac, data1=ev1, data2=ev2, dt=1) Traceback (most recent call last): ... ValueError: segment_size must be specified... And it needs to be finite! >>> AveragedCrossspectrum.initial_checks(ac, data1=ev1, data2=ev2, dt=1., segment_size=np.nan) Traceback (most recent call last): ... ValueError: segment_size must be finite! """ good = Crossspectrum.initial_checks(self, data1, segment_size=segment_size, **kwargs) if not good: return False if isinstance(self, AveragedCrossspectrum) and segment_size is None and data1 is not None: raise ValueError("segment_size must be specified") if ( isinstance(self, AveragedCrossspectrum) and segment_size is not None and not np.isfinite(segment_size) ): raise ValueError("segment_size must be finite!") return True
[docs] def coherence(self): """Averaged Coherence function. Coherence is defined in Vaughan and Nowak, 1996 [#]_. It is a Fourier frequency dependent measure of the linear correlation between time series measured simultaneously in two energy channels. Compute an averaged Coherence function of cross spectrum by computing coherence function of each segment and averaging them. The return type is a tuple with first element as the coherence function and the second element as the corresponding uncertainty associated with it. Note : The uncertainty in coherence function is strictly valid for Gaussian \ statistics only. Returns ------- (coh, uncertainty) : tuple of np.ndarray Tuple comprising the coherence function and uncertainty. References ---------- .. [#] https://iopscience.iop.org/article/10.1086/310430 """ if np.any(self.m < 50): simon( "Number of segments used in averaging is " "significantly low. The result might not follow the " "expected statistical distributions." ) c = self.unnorm_power p1 = self.pds1.unnorm_power p2 = self.pds2.unnorm_power meanrate1 = self.nphots1 / self.n / self.dt meanrate2 = self.nphots2 / self.n / self.dt P1noise = poisson_level(norm="none", meanrate=meanrate1, n_ph=self.nphots1) P2noise = poisson_level(norm="none", meanrate=meanrate2, n_ph=self.nphots2) coh = raw_coherence(c, p1, p2, P1noise, P2noise, self.n) # Calculate uncertainty uncertainty = (2**0.5 * coh * (1 - coh)) / (np.sqrt(coh) * self.m**0.5) uncertainty[coh == 0] = 0.0 return (coh, uncertainty)
[docs] def phase_lag(self): """Return the fourier phase lag of the cross spectrum.""" lag = np.angle(self.unnorm_power) coh, uncert = self.coherence() dum = (1.0 - coh) / (2.0 * coh) dum[coh == 0] = 0.0 lag_err = np.sqrt(dum / self.m) return lag, lag_err
[docs] def time_lag(self): """Calculate time lag and uncertainty. Equation from Bendat & Piersol, 2011 [bendat-2011]__. Returns ------- lag : np.ndarray The time lag lag_err : np.ndarray The uncertainty in the time lag """ ph_lag, ph_lag_err = self.phase_lag() lag = ph_lag / (2 * np.pi * self.freq) lag_err = ph_lag_err / (2 * np.pi * self.freq) return lag, lag_err
class DynamicalCrossspectrum(AveragedCrossspectrum): type = "crossspectrum" """ Create a dynamical cross spectrum, also often called a *spectrogram*. This class will divide two :class:`Lightcurve` objects into segments of length ``segment_size``, create a cross spectrum for each segment and store all powers in a matrix as a function of both time (using the mid-point of each segment) and frequency. This is often used to trace changes in period of a (quasi-)periodic signal over time. Parameters ---------- data1 : :class:`stingray.Lightcurve` or :class:`stingray.EventList` object The time series or event list from the subject band, or channel, for which the dynamical cross spectrum is to be calculated. If :class:`stingray.EventList`, ``dt`` must be specified as well. data2 : :class:`stingray.Lightcurve` or :class:`stingray.EventList` object The time series or event list from the reference band, or channel, of the dynamical cross spectrum. If :class:`stingray.EventList`, ``dt`` must be specified as well. segment_size : float, default 1 Length of the segment of light curve, default value is 1 (in whatever units the ``time`` array in the :class:`Lightcurve`` object uses). norm: {"leahy" | "frac" | "abs" | "none" }, optional, default "frac" The normaliation of the periodogram to be used. Other Parameters ---------------- gti: 2-d float array ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]`` -- Good Time intervals. This choice overrides the GTIs in the single light curves. Use with care, especially if these GTIs have overlaps with the input object GTIs! If you're getting errors regarding your GTIs, don't use this and only give GTIs to the input object before making the cross spectrum. sample_time: float Compulsory for input :class:`stingray.EventList` data. The time resolution of the lightcurve that is created internally from the input event lists. Drives the Nyquist frequency. Attributes ---------- segment_size: float The size of each segment to average. Note that if the total duration of each input object in lc is not an integer multiple of the ``segment_size``, then any fraction left-over at the end of the time series will be lost. dyn_ps : np.ndarray The matrix of normalized squared absolute values of Fourier amplitudes. The axis are given by the ``freq`` and ``time`` attributes. norm: {``leahy`` | ``frac`` | ``abs`` | ``none``} The normalization of the periodogram. freq: numpy.ndarray The array of mid-bin frequencies that the Fourier transform samples. time: numpy.ndarray The array of mid-point times of each interval used for the dynamical power spectrum. df: float The frequency resolution. dt: float The time resolution of the dynamical spectrum. It is **not** the time resolution of the input light curve. It is the integration time of each line of the dynamical power spectrum (typically, an integer multiple of ``segment_size``). m: int The number of averaged powers in each spectral bin (initially 1, it changes after rebinning). """ def __init__(self, data1, data2, segment_size, norm="frac", gti=None, sample_time=None): if isinstance(data1, EventList) and sample_time is None: raise ValueError("To pass input event lists, please specify sample_time") elif isinstance(data1, Lightcurve): sample_time = data1.dt if segment_size > data1.tseg: raise ValueError( "Length of the segment is too long to create " "any segments of the light curve!" ) if segment_size < 2 * sample_time: raise ValueError("Length of the segment is too short to form a light curve!") self.segment_size = segment_size self.sample_time = sample_time self.gti = gti self.norm = norm self._make_matrix(data1, data2) def _make_matrix(self, data1, data2): """ Create a matrix of powers for each time step and each frequency step. Time increases with row index, frequency with column index. Parameters ---------- data1 : :class:`Lightcurve` or :class:`EventList` The object providing the subject band/channel for the dynamical cross spectrum. data2 : :class:`Lightcurve` or :class:`EventList` The object providing the reference band for the dynamical cross spectrum. """ avg = AveragedCrossspectrum( data1, data2, dt=self.sample_time, segment_size=self.segment_size, norm=self.norm, gti=self.gti, save_all=True, ) self.dyn_ps = np.array(avg.cs_all).T conv = avg.cs_all / avg.unnorm_cs_all self.unnorm_conversion = np.nanmean(conv) self.freq = avg.freq current_gti = avg.gti self.nphots1 = avg.nphots1 self.nphots2 = avg.nphots2 tstart, _ = time_intervals_from_gtis(current_gti, self.segment_size) self.time = tstart + 0.5 * self.segment_size self.df = avg.df self.dt = self.segment_size self.m = 1 def rebin_frequency(self, df_new, method="average"): """ Rebin the Dynamic Power Spectrum to a new frequency resolution. Rebinning is an in-place operation, i.e. will replace the existing ``dyn_ps`` attribute. While the new resolution does not need to be an integer of the previous frequency resolution, be aware that if this is the case, the last frequency bin will be cut off by the fraction left over by the integer division Parameters ---------- df_new: float The new frequency resolution of the dynamical power spectrum. Must be larger than the frequency resolution of the old dynamical power spectrum! method: {"sum" | "mean" | "average"}, optional, default "average" This keyword argument sets whether the powers in the new bins should be summed or averaged. """ new_dynspec_object = copy.deepcopy(self) dynspec_new = [] for data in self.dyn_ps.T: freq_new, bin_counts, bin_err, step = rebin_data( self.freq, data, dx_new=df_new, method=method ) dynspec_new.append(bin_counts) new_dynspec_object.freq = freq_new new_dynspec_object.dyn_ps = np.array(dynspec_new).T new_dynspec_object.df = df_new new_dynspec_object.m = int(step) * self.m return new_dynspec_object def rebin_time(self, dt_new, method="average"): """ Rebin the Dynamic Power Spectrum to a new time resolution. Note: this is *not* changing the time resolution of the input light curve! ``dt`` is the integration time of each line of the dynamical power spectrum (typically, an integer multiple of ``segment_size``). While the new resolution does not need to be an integer of the previous time resolution, be aware that if this is the case, the last time bin will be cut off by the fraction left over by the integer division Parameters ---------- dt_new: float The new time resolution of the dynamical power spectrum. Must be larger than the time resolution of the old dynamical power spectrum! method: {"sum" | "mean" | "average"}, optional, default "average" This keyword argument sets whether the powers in the new bins should be summed or averaged. Returns ------- time_new: numpy.ndarray Time axis with new rebinned time resolution. dynspec_new: numpy.ndarray New rebinned Dynamical Cross Spectrum. """ if dt_new < self.dt: raise ValueError("New time resolution must be larger than old time resolution!") new_dynspec_object = copy.deepcopy(self) dynspec_new = [] for data in self.dyn_ps: time_new, bin_counts, _, step = rebin_data( self.time, data, dt_new, method=method, dx=self.dt ) dynspec_new.append(bin_counts) new_dynspec_object.time = time_new new_dynspec_object.dyn_ps = np.array(dynspec_new) new_dynspec_object.dt = dt_new new_dynspec_object.m = int(step) * self.m return new_dynspec_object def rebin_by_n_intervals(self, n, method="average"): """ Rebin the Dynamic Power Spectrum to a new time resolution, by summing contiguous intervals. This is different from meth:`DynamicalPowerspectrum.rebin_time` in that it averages ``n`` consecutive intervals regardless of their distance in time. ``rebin_time`` will instead average intervals that are separated at most by a time ``dt_new``. Parameters ---------- n: int The number of intervals to be combined into one. method: {"sum" | "mean" | "average"}, optional, default "average" This keyword argument sets whether the powers in the new bins should be summed or averaged. Returns ------- time_new: numpy.ndarray Time axis with new rebinned time resolution. dynspec_new: numpy.ndarray New rebinned Dynamical Cross Spectrum. """ if not np.issubdtype(type(n), np.integer): warnings.warn("n must be an integer. Casting to int") n = int(n) if n == 1: return copy.deepcopy(self) if n < 1: raise ValueError("n must be >= 1") new_dynspec_object = copy.deepcopy(self) dynspec_new = [] time_new = [] for i, data in enumerate(self.dyn_ps.T): if i % n == 0: count = 1 bin_counts = data bin_times = self.time[i] else: count += 1 bin_counts += data bin_times += self.time[i] if count == n: if method in ["mean", "average"]: bin_counts /= n dynspec_new.append(bin_counts) bin_times /= n time_new.append(bin_times) new_dynspec_object.time = time_new new_dynspec_object.dyn_ps = np.array(dynspec_new).T new_dynspec_object.dt *= n new_dynspec_object.m = n * self.m return new_dynspec_object def trace_maximum(self, min_freq=None, max_freq=None): """ Return the indices of the maximum powers in each segment :class:`Powerspectrum` between specified frequencies. Parameters ---------- min_freq: float, default ``None`` The lower frequency bound. max_freq: float, default ``None`` The upper frequency bound. Returns ------- max_positions : np.array The array of indices of the maximum power in each segment having frequency between ``min_freq`` and ``max_freq``. """ if min_freq is None: min_freq = np.min(self.freq) if max_freq is None: max_freq = np.max(self.freq) max_positions = [] for ps in self.dyn_ps.T: indices = np.logical_and(self.freq <= max_freq, min_freq <= self.freq) max_power = np.max(ps[indices]) max_positions.append(np.where(ps == max_power)[0][0]) return np.array(max_positions) def power_colors( self, freq_edges=[1 / 256, 1 / 32, 0.25, 2.0, 16.0], freqs_to_exclude=None, poisson_power=0, ): """ Compute the power colors of the dynamical power spectrum. See Heil et al. 2015, MNRAS, 448, 3348 Parameters ---------- freq_edges: iterable The edges of the frequency bins to be used for the power colors. freqs_to_exclude : 1-d or 2-d iterable, optional, default None The ranges of frequencies to exclude from the calculation of the power color. For example, the frequencies containing strong QPOs. A 1-d iterable should contain two values for the edges of a single range. (E.g. ``[0.1, 0.2]``). ``[[0.1, 0.2], [3, 4]]`` will exclude the ranges 0.1-0.2 Hz and 3-4 Hz. poisson_power : float or iterable, optional, default 0 The Poisson noise level of the power spectrum. If iterable, it should have the same length as ``freq``. (This might apply to the case of a power spectrum with a strong dead time distortion) Returns ------- pc0: np.ndarray pc0_err: np.ndarray pc1: np.ndarray pc1_err: np.ndarray The power colors for each spectrum and their respective errors """ power_colors = [] if np.iscomplexobj(self.dyn_ps): warnings.warn("When using power_colors, complex powers will be cast to real.") for ps in self.dyn_ps.T: power_colors.append( power_color( self.freq, ps.real, freq_edges=freq_edges, freqs_to_exclude=freqs_to_exclude, df=self.df, poisson_power=poisson_power, m=self.m, ) ) pc0, pc0_err, pc1, pc1_err = np.array(power_colors).T return pc0, pc0_err, pc1, pc1_err def compute_rms(self, min_freq, max_freq, poisson_noise_level=0): """ Compute the fractional rms amplitude in the power spectrum between two frequencies. Parameters ---------- min_freq: float The lower frequency bound for the calculation. max_freq: float The upper frequency bound for the calculation. Other parameters ---------------- poisson_noise_level : float, default is None This is the Poisson noise level of the PDS with same normalization as the PDS. Returns ------- rms: float The fractional rms amplitude contained between ``min_freq`` and ``max_freq``. rms_err: float The error on the fractional rms amplitude. """ dyn_ps_unnorm = self.dyn_ps / self.unnorm_conversion poisson_noise_unnrom = poisson_noise_level / self.unnorm_conversion if not hasattr(self, "nphots"): nphots = (self.nphots1 * self.nphots2) ** 0.5 else: nphots = self.nphots good = (self.freq >= min_freq) & (self.freq <= max_freq) M_freq = self.m if isinstance(self.m, Iterable): M_freq = self.m[good] rmss = [] rms_errs = [] for unnorm_powers in dyn_ps_unnorm.T: r, re = get_rms_from_unnorm_periodogram( unnorm_powers[good], nphots, self.df, M=M_freq, poisson_noise_unnorm=poisson_noise_unnrom, segment_size=self.segment_size, kind="frac", ) rmss.append(r) rms_errs.append(re) return rmss, rms_errs def crossspectrum_from_time_array( times1, times2, dt, segment_size=None, gti=None, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, save_all=False, ): """Calculate AveragedCrossspectrum from two arrays of event times. Parameters ---------- times1 : `np.array` Event arrival times of channel 1 times2 : `np.array` Event arrival times of channel 2 dt : float The time resolution of the intermediate light curves (sets the Nyquist frequency) Other parameters ---------------- segment_size : float The length, in seconds, of the light curve segments that will be averaged gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part Returns ------- spec : `AveragedCrossspectrum` or `Crossspectrum` The output cross spectrum. """ force_averaged = segment_size is not None # Suppress progress bar for single periodogram silent = silent or (segment_size is None) results = avg_cs_from_timeseries( times1, times2, gti, segment_size, dt, norm=norm, use_common_mean=use_common_mean, fullspec=fullspec, silent=silent, power_type=power_type, return_auxil=True, return_subcs=save_all, ) return _create_crossspectrum_from_result_table(results, force_averaged=force_averaged) def crossspectrum_from_events( events1, events2, dt, segment_size=None, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, gti=None, save_all=False, ): """Calculate AveragedCrossspectrum from two event lists Parameters ---------- events1 : `stingray.EventList` Events from channel 1 events2 : `stingray.EventList` Events from channel 2 dt : float The time resolution of the intermediate light curves (sets the Nyquist frequency) Other parameters ---------------- segment_size : float, default None The length, in seconds, of the light curve segments that will be averaged norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects Returns ------- spec : `AveragedCrossspectrum` or `Crossspectrum` The output cross spectrum. """ if gti is None: gti = cross_two_gtis(events1.gti, events2.gti) return crossspectrum_from_time_array( events1.time, events2.time, dt, segment_size, gti, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, save_all=save_all, ) def crossspectrum_from_lightcurve( lc1, lc2, segment_size=None, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, gti=None, save_all=False, ): """Calculate AveragedCrossspectrum from two light curves Parameters ---------- lc1 : `stingray.Lightcurve` Light curve from channel 1 lc2 : `stingray.Lightcurve` Light curve from channel 2 Other parameters ---------------- segment_size : float, default None The length, in seconds, of the light curve segments that will be averaged norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects save_all : bool, default False Save all intermediate spectra used for the final average. Use with care. This is likely to fill up your RAM on medium-sized datasets, and to slow down the computation when rebinning. Returns ------- spec : `AveragedCrossspectrum` or `Crossspectrum` The output cross spectrum. """ error_flux_attr = None if lc1.err_dist == "gauss": error_flux_attr = "_counts_err" return crossspectrum_from_timeseries( lc1, lc2, "_counts", error_flux_attr=error_flux_attr, segment_size=segment_size, norm=norm, power_type=power_type, silent=silent, fullspec=fullspec, use_common_mean=use_common_mean, gti=gti, save_all=save_all, ) def crossspectrum_from_timeseries( ts1, ts2, flux_attr, error_flux_attr=None, segment_size=None, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, gti=None, save_all=False, ): """Calculate AveragedCrossspectrum from two time series Parameters ---------- ts1 : `stingray.StingrayTimeseries` Time series from channel 1 ts2 : `stingray.StingrayTimeseries` Time series from channel 2 flux_attr : `str` What attribute of the time series will be used. Other parameters ---------------- error_flux_attr : `str` What attribute of the time series will be used as error bar. segment_size : float, default None The length, in seconds, of the light curve segments that will be averaged norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. Defaults to the common GTIs from the two input objects save_all : bool, default False Save all intermediate spectra used for the final average. Use with care. This is likely to fill up your RAM on medium-sized datasets, and to slow down the computation when rebinning. Returns ------- spec : `AveragedCrossspectrum` or `Crossspectrum` The output cross spectrum. """ force_averaged = segment_size is not None # Suppress progress bar for single periodogram silent = silent or (segment_size is None) if gti is None: gti = cross_two_gtis(ts1.gti, ts2.gti) err1 = err2 = None if error_flux_attr is not None: err1 = getattr(ts1, error_flux_attr) err2 = getattr(ts2, error_flux_attr) results = avg_cs_from_timeseries( ts1.time, ts2.time, gti, segment_size, ts1.dt, norm=norm, use_common_mean=use_common_mean, fullspec=fullspec, silent=silent, power_type=power_type, fluxes1=getattr(ts1, flux_attr), fluxes2=getattr(ts2, flux_attr), errors1=err1, errors2=err2, return_auxil=True, return_subcs=save_all, ) return _create_crossspectrum_from_result_table(results, force_averaged=force_averaged) def crossspectrum_from_lc_iterable( iter_lc1, iter_lc2, dt, segment_size, norm="none", power_type="all", silent=False, fullspec=False, use_common_mean=True, gti=None, save_all=False, ): """Calculate AveragedCrossspectrum from two light curves Parameters ---------- iter_lc1 : iterable of `stingray.Lightcurve` objects or `np.array` Light curves from channel 1. If arrays, use them as counts iter_lc1 : iterable of `stingray.Lightcurve` objects or `np.array` Light curves from channel 2. If arrays, use them as counts dt : float The time resolution of the light curves (sets the Nyquist frequency) segment_size : float The length, in seconds, of the light curve segments that will be averaged Other parameters ---------------- norm : str, default "frac" The normalization of the periodogram. "abs" is absolute rms, "frac" is fractional rms, "leahy" is Leahy+83 normalization, and "none" is the unnormalized periodogram use_common_mean : bool, default True The mean of the light curve can be estimated in each interval, or on the full light curve. This gives different results (Alston+2013). Here we assume the mean is calculated on the full light curve, but the user can set ``use_common_mean`` to False to calculate it on a per-segment basis. fullspec : bool, default False Return the full periodogram, including negative frequencies silent : bool, default False Silence the progress bars power_type : str, default 'all' If 'all', give complex powers. If 'abs', the absolute value; if 'real', the real part gti: [[gti0_0, gti0_1], [gti1_0, gti1_1], ...] Good Time intervals. The GTIs of the input light curves are intersected with these. Returns ------- spec : `AveragedCrossspectrum` or `Crossspectrum` The output cross spectrum. """ force_averaged = segment_size is not None # Suppress progress bar for single periodogram silent = silent or (segment_size is None) common_gti = gti def iterate_lc_counts(iter_lc): for lc in iter_lc: if hasattr(lc, "counts"): n_bin = np.rint(segment_size / lc.dt).astype(int) gti = lc.gti if common_gti is not None: gti = cross_two_gtis(common_gti, lc.gti) err = None if lc.err_dist == "gauss": err = lc.counts_err flux_iterable = get_flux_iterable_from_segments( lc.time, gti, segment_size, n_bin, fluxes=lc.counts, errors=err ) for out in flux_iterable: yield out else: yield lc results = avg_cs_from_iterables( iterate_lc_counts(iter_lc1), iterate_lc_counts(iter_lc2), dt, norm=norm, use_common_mean=use_common_mean, silent=silent, fullspec=fullspec, power_type=power_type, return_auxil=True, return_subcs=save_all, ) return _create_crossspectrum_from_result_table(results, force_averaged=force_averaged) def _create_crossspectrum_from_result_table(table, force_averaged=False): """Copy the columns and metadata from the results of ``stingray.fourier.avg_cs_from_XX`` functions into `AveragedCrossspectrum` or `Crossspectrum` objects. By default, allocates a Crossspectrum object if the number of averaged spectra is 1, otherwise an AveragedCrossspectrum. If the user specifies ``force_averaged=True``, it always allocates an AveragedCrossspectrum. Parameters ---------- table : `astropy.table.Table` results of `avg_cs_from_iterables` or `avg_cs_from_iterables_quick` Other parameters ---------------- force_averaged : bool, default False Returns ------- spec : `AveragedCrossspectrum` or `Crossspectrum` The output cross spectrum. """ if table.meta["m"] > 1 or force_averaged: cs = AveragedCrossspectrum() cs.pds1 = AveragedCrossspectrum() cs.pds2 = AveragedCrossspectrum() else: cs = Crossspectrum() cs.pds1 = Crossspectrum() cs.pds2 = Crossspectrum() cs.freq = cs.pds1.freq = cs.pds2.freq = np.array(table["freq"]) cs.norm = cs.pds1.norm = cs.pds2.norm = table.meta["norm"] cs.power = np.array(table["power"]) cs.pds1.power = np.array(table["pds1"]) cs.pds2.power = np.array(table["pds2"]) cs.unnorm_power = np.array(table["unnorm_power"]) cs.pds1.unnorm_power = np.array(table["unnorm_pds1"]) cs.pds2.unnorm_power = np.array(table["unnorm_pds2"]) cs.pds1.type = cs.pds2.type = "powerspectrum" if "subcs" in table.meta: cs.cs_all = np.array(table.meta["subcs"]) cs.unnorm_cs_all = np.array(table.meta["unnorm_subcs"]) for attr, val in table.meta.items(): if not attr.endswith("subcs"): setattr(cs, attr, val) setattr(cs.pds1, attr, val) setattr(cs.pds2, attr, val) cs.err_dist = "poisson" if cs.variance is not None: cs.err_dist = cs.pds1.err_dist = cs.pds2.err_dist = "gauss" # Transform nphods1 in nphots for pds1, etc. for attr, val in table.meta.items(): if attr.endswith("1"): setattr(cs.pds1, attr[:-1], val) if attr.endswith("2"): setattr(cs.pds2, attr[:-1], val) # I start from unnormalized, and I normalize after correcting for bad error values P1noise = poisson_level(norm="none", meanrate=cs.countrate1, n_ph=cs.nphots1) P2noise = poisson_level(norm="none", meanrate=cs.countrate2, n_ph=cs.nphots2) dRe, dIm, _, _ = error_on_averaged_cross_spectrum( cs.unnorm_power, cs.pds1.unnorm_power, cs.pds2.unnorm_power, cs.m, P1noise, P2noise, common_ref="False", ) bad = np.isnan(dRe) | np.isnan(dIm) if np.any(bad): warnings.warn( "Some error bars in the Averaged Crossspectrum are invalid." "Defaulting to sqrt(2 / M) in Leahy norm, rescaled to the appropriate norm." ) Nph = np.sqrt(cs.nphots1 * cs.nphots2) default_err = np.sqrt(2 / cs.m) * Nph / 2 dRe[bad] = default_err dIm[bad] = default_err power_err = dRe + 1.0j * dIm cs.unnorm_power_err = power_err mean = table.meta["mean"] nph = table.meta["nphots"] cs.power_err = normalize_periodograms( power_err, cs.dt, cs.n, mean, n_ph=nph, variance=cs.variance, norm=cs.norm ) cs.pds1.power_err = cs.pds1.power / np.sqrt(cs.pds1.m) cs.pds2.power_err = cs.pds2.power / np.sqrt(cs.pds2.m) cs.pds1.unnorm_power_err = cs.pds1.unnorm_power / np.sqrt(cs.pds1.m) cs.pds2.unnorm_power_err = cs.pds2.unnorm_power / np.sqrt(cs.pds2.m) assert hasattr(cs, "df") assert hasattr(cs, "dt") return cs