Source code for stingray.covariancespectrum

# -*- coding: utf-8 -*-

from collections.abc import Iterable

import numpy as np

from stingray import Lightcurve
from stingray.events import EventList
import stingray.utils as utils

__all__ = ["Covariancespectrum", "AveragedCovariancespectrum"]


[docs] class Covariancespectrum(object): """ Compute a covariance spectrum for the data. The input data can be either in event data or pre-made light curves. Event data can either be in the form of a ``numpy.ndarray`` with ``(time stamp, energy)`` pairs or a :class:`stingray.events.EventList` object. If light curves are formed ahead of time, then a list of :class:`stingray.Lightcurve` objects should be passed to the object, ideally one light curve for each band of interest. For the case where the data is input as a list of :class:`stingray.Lightcurve` objects, the reference band(s) should either be 1. a single :class:`stingray.Lightcurve` object, 2. a list of :class:`stingray.Lightcurve` objects with the reference band for each band of interest pre-made, or 3. ``None``, in which case reference bands will formed by combining all light curves *except* for the band of interest. In the case of event data, ``band_interest`` and ``ref_band_interest`` can be (multiple) pairs of energies, and the light curves for the bands of interest and reference bands will be produced dynamically. Parameters ---------- data : {``numpy.ndarray`` | :class:`stingray.events.EventList` object | list of :class:`stingray.Lightcurve` objects} ``data`` contains the time series data, either in the form of a 2-D array of ``(time stamp, energy)`` pairs for event data, or as a list of light curves. Note : The event list must be in sorted order with respect to the times of arrivals. dt : float The time resolution of the :class:`stingray.Lightcurve` formed from the energy bin. Only used if ``data`` is an event list. band_interest : {``None``, iterable of tuples} If ``None``, all possible energy values will be assumed to be of interest, and a covariance spectrum in the highest resolution will be produced. Note: if the input is a list of :class:`stingray.Lightcurve` objects, then the user may supply their energy values here, for construction of a reference band. ref_band_interest : {``None``, tuple, :class:`stingray.Lightcurve`, list of :class:`stingray.Lightcurve` objects} Defines the reference band to be used for comparison with the bands of interest. If ``None``, all bands *except* the band of interest will be used for each band of interest, respectively. Alternatively, a tuple can be given for event list data, which will extract the reference band (always excluding the band of interest), or one may put in a single :class:`stingray.Lightcurve` object to be used (the same for each band of interest) or a list of :class:`stingray.Lightcurve` objects, one for each band of interest. std : float or np.array or list of numbers The term ``std`` is used to calculate the excess variance of a band. If ``std`` is set to ``None``, default Poisson case is taken and the std is calculated as ``mean(lc)**0.5``. In the case of a single float as input, the same is used as the standard deviation which is also used as the std. And if the std is an iterable of numbers, their mean is used for the same purpose. Attributes ---------- unnorm_covar : np.ndarray An array of arrays with mid point ``band_interest`` and their covariance. It is the array-form of the dictionary ``energy_covar``. The covariance values are unnormalized. covar : np.ndarray Normalized covariance spectrum. covar_error : np.ndarray Errors of the normalized covariance spectrum. References ---------- [1] Wilkinson, T. and Uttley, P. (2009), Accretion disc variability\ in the hard state of black hole X-ray binaries. Monthly Notices\ of the Royal Astronomical Society, 397: 666–676.\ doi: 10.1111/j.1365-2966.2009.15008.x Examples -------- See the `notebooks repository <https://github.com/StingraySoftware/notebooks>`_ for detailed notebooks on the code. """ def __init__(self, data, dt=None, band_interest=None, ref_band_interest=None, std=None): self.dt = dt self.std = std # check whether data is an EventList object: if isinstance(data, EventList): data = np.vstack([data.time, data.energy]).T # check whether the data contains a list of Lightcurve objects if isinstance(data[0], Lightcurve): self.use_lc = True self.lcs = data else: self.use_lc = False # if band_interest is None, extract the energy bins and make an array # with the lower and upper bounds of the energy bins if band_interest is None: if not self.use_lc: self._create_band_interest(data) else: self.band_interest = np.vstack( [np.arange(len(data)), np.arange(1, len(data) + 1, 1)] ).T else: if np.size(band_interest) < 2: raise ValueError( "band_interest must contain at least 2 values " "(minimum and maximum values for each band) " "and be a 2D array!" ) self.band_interest = np.atleast_2d(band_interest) if self.use_lc is False and not dt: raise ValueError( "If the input data is event data, the dt keyword " "must be set and supply a time resolution for " "creating light curves!" ) # if we don't have light curves already, make them: if not self.use_lc: if not np.all(np.diff(data, axis=0).T[0] >= 0): utils.simon("The event list must be sorted with respect to " "times of arrivals.") data = data[data[:, 0].argsort()] self.lcs = self._make_lightcurves(data) # check whether band of interest contains a Lightcurve object: if np.size(ref_band_interest) == 1 or isinstance(ref_band_interest, Lightcurve): if isinstance(ref_band_interest, Lightcurve): self.ref_band_lcs = ref_band_interest # ref_band_interest must either be a Lightcurve, or must have # multiple entries elif ref_band_interest is None: if self.use_lc: self.ref_band_lcs = self._make_reference_bands_from_lightcurves( ref_band_interest ) else: self.ref_band_lcs = self._make_reference_bands_from_event_data(data) else: raise ValueError( "ref_band_interest must contain either " "a Lightcurve object, a list of Lightcurve " "objects or a tuple of length 2." ) else: # check whether ref_band_interest is a list of light curves if isinstance(ref_band_interest[0], Lightcurve): self.ref_band_lcs = ref_band_interest assert len(ref_band_interest) == len(self.lcs), ( "The list of " "reference light " "curves must have " "the same length as " "the list of light curves" "of interest." ) # if not, it must be a tuple, so we're going to make a list of light # curves else: if self.use_lc: self.ref_band_lcs = self._make_reference_bands_from_lightcurves( bounds=ref_band_interest ) else: self.ref_band_lcs = self._make_reference_bands_from_event_data(data) self._construct_covar() def _make_reference_bands_from_event_data(self, data, bounds=None): """ Helper method constructing reference bands for each band of interest, and constructing light curves from these reference bands. This operates only if the data given to :class:`Covariancespectrum` is event list data (i.e. photon arrival times and energies). Parameters ---------- data : numpy.ndarray Array of shape ``(N, 2)``, where N is the number of photons. First column contains the times of arrivals, second column the corresponding photon energies. bounds : iterable The energy bounds to use for the reference band. Must be of type ``(elow, ehigh)``. Returns ------- lc_all: list of :class:`stingray.Lightcurve` objects. The list of `:class:`stingray.Lightcurve` objects containing all reference bands, between the values given in ``bounds``. """ if not bounds: bounds = [np.min(data[:, 1]), np.max(data[:, 1])] if bounds[1] <= np.min(self.band_interest[:, 0]) or bounds[0] >= np.max( self.band_interest[:, 1] ): elow = bounds[0] ehigh = bounds[1] toa = data[np.logical_and(data[:, 1] >= elow, data[:, 1] <= ehigh)] lc_all = Lightcurve.make_lightcurve(toa, self.dt, tstart=self.tstart, tseg=self.tseg) else: lc_all = [] for i, b in enumerate(self.band_interest): elow = b[0] ehigh = b[1] emask1 = data[np.logical_and(data[:, 1] <= elow, data[:, 1] >= bounds[0])] emask2 = data[np.logical_and(data[:, 1] <= bounds[1], data[:, 1] >= ehigh)] toa = np.vstack([emask1, emask2]) lc = Lightcurve.make_lightcurve(toa, self.dt, tstart=self.tstart, tseg=self.tseg) lc_all.append(lc) return lc_all def _make_reference_bands_from_lightcurves(self, bounds=None): """ Helper class to construct reference bands for all light curves in ``band_interest``, assuming the data is given to the class :class:`Covariancespectrum` as a (set of) lightcurve(s). Generally sums up all other light curves within ``bounds`` that are *not* the band of interest. Parameters ---------- bounds : iterable The energy bounds to use for the reference band. Must be of type ``(elow, ehigh)``. Returns ------- lc_all: list of :class:`stingray.Lightcurve` objects. The list of :class:`stingray.Lightcurve` objects containing all reference bands, between the values given in ``bounds``. """ if not bounds: bounds_idx = [0, len(self.band_interest)] else: low_bound = self.band_interest.searchsorted(bounds[0]) high_bound = self.band_interest.searchsorted(bounds[1]) bounds_idx = [low_bound, high_bound] lc_all = [] for i, b in enumerate(self.band_interest): # initialize empty counts array counts = np.zeros_like(self.lcs[0].counts) for j in range(bounds_idx[0], bounds_idx[1], 1): if i == j: continue else: counts += self.lcs[j].counts # make a combined light curve lc = Lightcurve(self.lcs[0].time, counts, skip_checks=True) # add to list of reference light curves lc_all.append(lc) return lc_all def _construct_covar(self): """ Helper method to construct the covariance attribute and fill it with values. """ self.avg_covar = False covar = np.zeros(len(self.lcs)) covar_err = np.zeros(len(self.lcs)) xs_var = np.zeros(len(self.lcs)) for i in range(len(self.lcs)): lc = self.lcs[i] if np.size(self.ref_band_lcs) == 1 or isinstance(self.ref_band_lcs, Lightcurve): lc_ref = self.ref_band_lcs else: lc_ref = self.ref_band_lcs[i] cv = self._compute_covariance(lc, lc_ref) cv_err = self._calculate_covariance_error(lc, lc_ref) covar[i] = cv covar_err[i] = cv_err xs = self._calculate_excess_variance(lc_ref) if not xs > 0: utils.simon( "The excess variance in the reference band is " "negative. This implies that the reference " "band was badly chosen. Beware that the " "covariance spectra will have NaNs!" ) xs_var[i] = xs self.unnorm_covar = covar energy_covar = covar / xs_var**0.5 self.covar = energy_covar self.covar_error = covar_err return def _make_lightcurves(self, data): """ Create light curves for all bands of interest from ``data``. Takes the information the ``band_interest`` attribute and event data in ``data``, and produces a list of :class:`stingray.Lightcurve` objects. Parameters ---------- data : numpy.ndarray Array of shape ``(N, 2)``, where ``N`` is the number of photons. First column contains the times of arrivals, second column the corresponding photon energies. Returns ------- lc_all : iterable of :class:`stingray.Lightcurve` objects A list of :class:`stingray.Lightcurve` objects of all bands of interest. """ self.tstart = np.min(data[:, 0]) self.tend = np.max(data[:, 0]) self.tseg = self.tend - self.tstart lc_all = [] for i, b in enumerate(self.band_interest): elow = b[0] ehigh = b[1] toa = data[np.logical_and(data[:, 1] >= elow, data[:, 1] <= ehigh)] lc = Lightcurve.make_lightcurve(toa, self.dt, tstart=self.tstart, tseg=self.tseg) lc_all.append(lc) return lc_all def _create_band_interest(self, data): """ If no bands of interest are given, but event data is, create bands of interest for each discrete energy value in the second column of ``data``. Parameters ---------- data : numpy.ndarray Array of shape (N, 2), where N is the number of photons. First column contains the times of arrivals, second column the corresponding photon energies. """ unique_energy = np.unique(data[:, 1]) energ_diff = np.diff(unique_energy) energy_low = np.zeros_like(unique_energy) energy_high = np.zeros_like(unique_energy) energy_low[:-1] = unique_energy[:-1] - 0.5 * energ_diff energy_high[:-1] = unique_energy[:-1] + 0.5 * energ_diff energy_low[-1] = unique_energy[-1] - 0.5 * energ_diff[-1] energy_high[-1] = unique_energy[-1] + 0.5 * energ_diff[-1] energy_list = np.vstack([energy_low, energy_high]).T self.band_interest = energy_list def _calculate_excess_variance(self, lc): """Calculate excess variance in a band with the standard deviation.""" std = self._calculate_std(lc) return np.var(lc) - std**2 def _calculate_std(self, lc): """Return std calculated for the possible types of `std`""" if self.std is None: std = np.mean(lc) ** 0.5 elif isinstance(self.std, Iterable): std = np.mean(self.std) # Iterable of numbers else: # Single float number std = self.std return std def _compute_covariance(self, lc1, lc2): """Calculate and return the covariance between two time series.""" return np.cov(lc1.counts, lc2.counts)[0][1] def _calculate_covariance_error(self, lc_x, lc_y): """Calculate the error of the normalized covariance spectrum.""" # Excess Variance of reference band xs_x = self._calculate_excess_variance(lc_x) # Standard deviation of light curve err_y = self._calculate_std(lc_y) # Excess Variance of reference band xs_y = self._calculate_excess_variance(lc_y) # Standard deviation of light curve err_x = self._calculate_std(lc_x) # Number of time bins in lightcurve nn = lc_x.n # Number of segments averaged if not self.avg_covar: mm = 1 else: mm = self.nbins num = xs_x * err_y + xs_y * err_x + err_x * err_y denom = nn * mm * xs_y return (num / denom) ** 0.5
[docs] class AveragedCovariancespectrum(Covariancespectrum): """ Compute a covariance spectrum for the data, defined in [covar spectrum]_ Equation 15. Parameters ---------- data : {numpy.ndarray | list of :class:`stingray.Lightcurve` objects} ``data`` contains the time series data, either in the form of a 2-D array of ``(time stamp, energy)`` pairs for event data, or as a list of :class:`stingray.Lightcurve` objects. Note : The event list must be in sorted order with respect to the times of arrivals. segment_size : float The length of each segment in the averaged covariance spectrum. The number of segments will be calculated automatically using the total length of the data set and the segment_size defined here. dt : float The time resolution of the :class:`stingray.Lightcurve` formed from the energy bin. Only used if `data` is an event list. band_interest : {``None``, iterable of tuples} If ``None``, all possible energy values will be assumed to be of interest, and a covariance spectrum in the highest resolution will be produced. Note: if the input is a list of :class:`stingray.Lightcurve` objects, then the user may supply their energy values here, for construction of a reference band. ref_band_interest : {None, tuple, :class:`stingray.Lightcurve`, list of :class:`stingray.Lightcurve` objects} Defines the reference band to be used for comparison with the bands of interest. If None, all bands *except* the band of interest will be used for each band of interest, respectively. Alternatively, a tuple can be given for event list data, which will extract the reference band (always excluding the band of interest), or one may put in a single :class:`stingray.Lightcurve` object to be used (the same for each band of interest) or a list of :class:`stingray.Lightcurve` objects, one for each band of interest. std : float or np.array or list of numbers The term ``std`` is used to calculate the excess variance of a band. If ``std`` is set to ``None``, default Poisson case is taken and the ``std`` is calculated as ``mean(lc)**0.5``. In the case of a single float as input, the same is used as the standard deviation which is also used as the std. And if the std is an iterable of numbers, their mean is used for the same purpose. Attributes ---------- unnorm_covar : np.ndarray An array of arrays with mid point band_interest and their covariance. It is the array-form of the dictionary ``energy_covar``. The covariance values are unnormalized. covar : np.ndarray Normalized covariance spectrum. covar_error : np.ndarray Errors of the normalized covariance spectrum. References ---------- .. [covar spectrum] http://arxiv.org/pdf/1405.6575v2.pdf """ def __init__( self, data, segment_size, dt=None, band_interest=None, ref_band_interest=None, std=None ): self.segment_size = segment_size Covariancespectrum.__init__( self, data, dt=dt, band_interest=band_interest, ref_band_interest=ref_band_interest, std=std, ) def _construct_covar(self): """ Helper method to construct the covariance attribute and fill it with values. """ self.avg_covar = True start_time = self.lcs[0].time[0] covar = np.zeros(len(self.lcs)) covar_err = np.zeros(len(self.lcs)) xs_var = np.zeros(len(self.lcs)) for i in range(len(self.lcs)): lc = self.lcs[i] if np.size(self.ref_band_lcs) == 1: lc_ref = self.ref_band_lcs else: lc_ref = self.ref_band_lcs[i] tstart = start_time tend = start_time + self.segment_size cv = 0.0 cv_err = 0.0 xs = 0.0 self.nbins = int((tend - tstart) / self.segment_size) for k in range(self.nbins): start_ind = lc.time.searchsorted(tstart) end_ind = lc.time.searchsorted(tend) lc_seg = lc.truncate(start=start_ind, stop=end_ind) lc_ref_seg = lc_ref.truncate(start=start_ind, stop=end_ind) cv += self._compute_covariance(lc_seg, lc_ref_seg) cv_err += self._calculate_covariance_error(lc_seg, lc_ref_seg) xs += self._calculate_excess_variance(lc_ref_seg) if not xs > 0: utils.simon( "The excess variance in the reference band is " "negative. This implies that the reference " "band was badly chosen. Beware that the " "covariance spectra will have NaNs!" ) tstart += self.segment_size tend += self.segment_size covar[i] = cv / self.nbins covar_err[i] = cv_err / self.nbins xs_var[i] = xs / self.nbins self.unnorm_covar = covar energy_covar = covar / xs_var**0.5 self.covar = energy_covar self.covar_error = covar_err return