Source code for stingray.events

"""
Definition of :class:`EventList`.

:class:`EventList` is used to handle photon arrival times.
"""

import warnings

import numpy as np

from stingray.utils import _int_sum_non_zero
from stingray.loggingconfig import setup_logger

from .base import StingrayTimeseries
from .filters import get_deadtime_mask
from .gti import generate_indices_of_boundaries
from .io import load_events_and_gtis, pi_to_energy
from .lightcurve import Lightcurve
from .utils import simon, njit
from .utils import histogram

__all__ = ["EventList"]

logger = setup_logger()


@njit
def _from_lc_numba(times, counts, empty_times):
    """Create a rough event list from a light curve.

    This function creates as many events as the counts in each time bin of the light curve,
    with event times equal to the light curve time stamps.

    Parameters
    ----------
    times : array-like
        Array of time stamps
    counts : array-like
        Array of counts
    empty_times : array-like
        Empty array to be filled with time stamps
    """
    last = 0
    for t, c in zip(times, counts):
        if c <= 0:
            continue
        val = c + last
        empty_times[last:val] = t
        last = val
    # If c < 0 in some cases, some times will be empty
    return empty_times[:val]


def simple_events_from_lc(lc):
    """
    Create an :class:`EventList` from a :class:`stingray.Lightcurve` object. Note that all
    events in a given time bin will have the same time stamp.

    Bins with negative counts will be ignored.

    Parameters
    ----------
    lc: :class:`stingray.Lightcurve` object
        Light curve to use for creation of the event list.

    Returns
    -------
    ev: :class:`EventList` object
        The resulting list of photon arrival times generated from the light curve.

    Examples
    --------
    >>> from stingray import Lightcurve
    >>> lc = Lightcurve([0, 1, 2], [2, 3, -1], dt=1)
    >>> ev = simple_events_from_lc(lc)
    >>> assert np.allclose(ev.time, [0, 0, 1, 1, 1])
    """
    counts = lc.counts.astype(int)
    allcounts = _int_sum_non_zero(counts)
    times = _from_lc_numba(lc.time, counts, np.zeros(allcounts, dtype=float))
    return EventList(time=times, gti=lc.gti)


[docs] class EventList(StingrayTimeseries): """ Basic class for event list data. Event lists generally correspond to individual events (e.g. photons) recorded by the detector, and their associated properties. For X-ray data where this type commonly occurs, events are time stamps of when a photon arrived in the detector, and (optionally) the photon energy associated with the event. Parameters ---------- time: iterable A list or array of time stamps Other Parameters ---------------- dt: float The time resolution of the events. Only relevant when using events to produce light curves with similar bin time. energy: iterable A list of array of photon energy values in keV mjdref : float The MJD used as a reference for the time array. ncounts: int Number of desired data points in event list. Deprecated gtis: ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]`` Good Time Intervals pi : integer, numpy.ndarray PI channels notes : str Any useful annotations high_precision : bool Change the precision of self.time to float128. Useful while dealing with fast pulsars. mission : str Mission that recorded the data (e.g. NICER) instr : str Instrument onboard the mission header : str The full header of the original FITS file, if relevant detector_id : iterable The detector that recorded each photon (if the instrument has more than one, e.g. XMM/EPIC-pn) timeref : str The time reference, as recorded in the FITS file (e.g. SOLARSYSTEM) timesys : str The time system, as recorded in the FITS file (e.g. TDB) ephem : str The JPL ephemeris used to barycenter the data, if any (e.g. DE430) rmf_file : str, default None The file name of the RMF file to use for calibration. skip_checks : bool, default False Skip checks for the validity of the event list. Use with caution. **other_kw : Used internally. Any other keyword arguments will be ignored Attributes ---------- time: numpy.ndarray The array of event arrival times, in seconds from the reference MJD defined in ``mjdref`` energy: numpy.ndarray The array of photon energy values ncounts: int The number of data points in the event list dt: float The time resolution of the events. Only relevant when using events to produce light curves with similar bin time. mjdref : float The MJD used as a reference for the time array. gtis: ``[[gti0_0, gti0_1], [gti1_0, gti1_1], ...]`` Good Time Intervals pi : integer, numpy.ndarray PI channels high_precision : bool Change the precision of self.time to float128. Useful while dealing with fast pulsars. mission : str Mission that recorded the data (e.g. NICER) instr : str Instrument onboard the mission detector_id : iterable The detector that recoded each photon, if relevant (e.g. XMM, Chandra) header : str The full header of the original FITS file, if relevant """ main_array_attr = "time" def __init__( self, time=None, energy=None, ncounts=None, mjdref=0, dt=0, notes="", gti=None, pi=None, high_precision=False, mission=None, instr=None, header=None, detector_id=None, ephem=None, timeref=None, timesys=None, rmf_file=None, skip_checks=False, **other_kw, ): if ncounts is not None: warnings.warn( "The ncounts keyword does nothing, and is maintained for backwards compatibility.", DeprecationWarning, ) if rmf_file is not None: if pi is None: warnings.warn("PI channels must be provided to calibrate the energy") else: energy = pi_to_energy(pi, rmf_file) StingrayTimeseries.__init__( self, time=time, energy=None if energy is None else np.asarray(energy), mjdref=mjdref, dt=dt, notes=notes, gti=np.asarray(gti) if gti is not None else None, pi=None if pi is None else np.asarray(pi), high_precision=high_precision, mission=mission, instr=instr, header=header, detector_id=detector_id, ephem=ephem, timeref=timeref, timesys=timesys, rmf_file=rmf_file, skip_checks=skip_checks, **other_kw, ) if other_kw != {}: warnings.warn(f"Unrecognized keywords: {list(other_kw.keys())}") if (self.time is not None) and (self.energy is not None): if np.size(self.time) != np.size(self.energy): raise ValueError("Lengths of time and energy must be equal.") @property def ncounts(self): """Number of events in the event list.""" return self.n
[docs] def to_lc(self, dt, tstart=None, tseg=None): """ Convert event list to a :class:`stingray.Lightcurve` object. Parameters ---------- dt: float Binning time of the light curve Other Parameters ---------------- tstart : float Start time of the light curve tseg: float Total duration of light curve Returns ------- lc: :class:`stingray.Lightcurve` object """ return Lightcurve.make_lightcurve( self.time, dt, tstart=tstart, gti=self._gti, tseg=tseg, mjdref=self.mjdref )
[docs] def to_binned_timeseries(self, dt, array_attrs=None): """Convert the event list to a binned :class:`stingray.StingrayTimeseries` object. The result will be something similar to a light curve, but with arbitrary attributes corresponding to a weighted sum of each specified attribute of the event list. E.g. if the event list has a ``q`` attribute, the final time series will have a ``q`` attribute, which is the sum of all ``q`` values in each time bin. Parameters ---------- dt: float Binning time of the light curve Other Parameters ---------------- array_attrs: list of str List of attributes to be converted to light curve arrays. If None, all array attributes will be converted. Returns ------- lc: :class:`stingray.Lightcurve` object """ if array_attrs is None: array_attrs = self.array_attrs() ranges = [self.gti[0, 0], self.gti[-1, 1]] nbins = int((ranges[1] - ranges[0]) / dt) ranges = [ranges[0], ranges[0] + nbins * dt] times = np.arange(ranges[0] + dt * 0.5, ranges[1], dt) counts = histogram(self.time, range=ranges, bins=nbins) attr_dict = dict(counts=counts) for attr in array_attrs: if getattr(self, attr, None) is not None: logger.info(f"Creating the {attr} array") attr_dict[attr] = histogram( self.time, bins=nbins, weights=getattr(self, attr), range=ranges ) meta_attrs = dict((attr, getattr(self, attr)) for attr in self.meta_attrs()) new_ts = StingrayTimeseries(times, array_attrs=attr_dict, **meta_attrs) new_ts.dt = dt return new_ts
[docs] def to_lc_iter(self, dt, segment_size=None): """Convert event list to a generator of Lightcurves. Parameters ---------- dt: float Binning time of the light curves Other parameters ---------------- segment_size : float, default None Optional segment size. If None, use the GTI boundaries Returns ------- lc_gen: `generator` Generates one :class:`stingray.Lightcurve` object for each GTI or segment """ segment_iter = generate_indices_of_boundaries( self.time, self.gti, segment_size=segment_size, dt=0 ) for st, end, idx_st, idx_end in segment_iter: tseg = end - st lc = Lightcurve.make_lightcurve( self.time[idx_st : idx_end + 1], dt, tstart=st, gti=np.asarray([[st, end]]), tseg=tseg, mjdref=self.mjdref, use_hist=True, ) yield lc
[docs] def to_lc_list(self, dt, segment_size=None): """Convert event list to a list of Lightcurves. Parameters ---------- dt: float Binning time of the light curves Other parameters ---------------- segment_size : float, default None Optional segment size. If None, use the GTI boundaries Returns ------- lc_list: `List` List containing one :class:`stingray.Lightcurve` object for each GTI or segment """ return list(self.to_lc_iter(dt, segment_size))
[docs] @staticmethod def from_lc(lc): """ Create an :class:`EventList` from a :class:`stingray.Lightcurve` object. Note that all events in a given time bin will have the same time stamp. Bins with negative counts will be ignored. Parameters ---------- lc: :class:`stingray.Lightcurve` object Light curve to use for creation of the event list. Returns ------- ev: :class:`EventList` object The resulting list of photon arrival times generated from the light curve. """ return simple_events_from_lc(lc)
[docs] def simulate_times(self, lc, use_spline=False, bin_time=None): """Simulate times from an input light curve. Randomly simulate photon arrival times to an :class:`EventList` from a :class:`stingray.Lightcurve` object, using the inverse CDF method. ..note:: Preferably use model light curves containing **no Poisson noise**, as this method will intrinsically add Poisson noise to them. Parameters ---------- lc: :class:`stingray.Lightcurve` object Other Parameters ---------------- use_spline : bool Approximate the light curve with a spline to avoid binning effects bin_time : float default None Ignored and deprecated, maintained for backwards compatibility. Returns ------- times : array-like Simulated photon arrival times """ # Need import here, or there will be a circular import from .simulator.base import simulate_times if bin_time is not None: warnings.warn("Bin time will be ignored in simulate_times", DeprecationWarning) vals = simulate_times(lc, use_spline=use_spline) self.time = vals self._gti = lc.gti
[docs] def simulate_energies(self, spectrum, use_spline=False): """ Assign (simulate) energies to event list from a spectrum. Parameters ---------- spectrum: 2-d array or list [energies, spectrum] Energies versus corresponding fluxes. The 2-d array or list must have energies across the first dimension and fluxes across the second one. If the dimension of the energies is the same as spectrum, they are interpreted as bin centers. If it is longer by one, they are interpreted as proper bin edges (similarly to the bins of `np.histogram`). Note that for non-uniformly binned spectra, it is advisable to pass the exact edges. """ from .simulator.base import simulate_with_inverse_cdf if self.ncounts is None or self.ncounts == 0: simon("Simulating on an empty event list") return if isinstance(spectrum, list) or isinstance(spectrum, np.ndarray): energy = np.asarray(spectrum)[0] fluxes = np.asarray(spectrum)[1] if not isinstance(energy, np.ndarray): raise IndexError("Spectrum must be a 2-d array or list") else: raise TypeError("Spectrum must be a 2-d array or list") if energy.size == fluxes.size: de = energy[1] - energy[0] energy = np.concatenate([energy - de / 2, [energy[-1] + de / 2]]) self.energy = simulate_with_inverse_cdf( fluxes, self.ncounts, edges=energy, sorted=False, interp_kind="linear" )
[docs] def sort(self, inplace=False): """Sort the event list in time. Other parameters ---------------- inplace : bool, default False Sort in place. If False, return a new event list. Returns ------- eventlist : `EventList` The sorted event list. If ``inplace=True``, it will be a shallow copy of ``self``. Examples -------- >>> events = EventList(time=[0, 2, 1], energy=[0.3, 2, 0.5], pi=[3, 20, 5], ... skip_checks=True) >>> e1 = events.sort() >>> assert np.allclose(e1.time, [0, 1, 2]) >>> assert np.allclose(e1.energy, [0.3, 0.5, 2]) >>> assert np.allclose(e1.pi, [3, 5, 20]) But the original event list has not been altered (``inplace=False`` by default): >>> assert np.allclose(events.time, [0, 2, 1]) Let's do it in place instead >>> e2 = events.sort(inplace=True) >>> assert np.allclose(e2.time, [0, 1, 2]) In this case, the original event list has been altered. >>> assert np.allclose(events.time, [0, 1, 2]) """ order = np.argsort(self.time) return self.apply_mask(order, inplace=inplace)
[docs] def join(self, other, strategy="infer"): """ Join two :class:`EventList` objects into one. If both are empty, an empty :class:`EventList` is returned. GTIs are crossed if the event lists are over a common time interval, and appended otherwise. Standard attributes such as ``pi`` and ``energy`` remain ``None`` if they are ``None`` in both. Otherwise, ``np.nan`` is used as a default value for the :class:`EventList` where they were None. Arbitrary attributes (e.g., Stokes parameters in polarimetric data) are created and joined using the same convention. Multiple checks are done on the joined event lists. If the time array of the event list being joined is empty, it is ignored. If the time resolution is different, the final event list will have the rougher time resolution. If the MJDREF is different, the time reference will be changed to the one of the first event list. An empty event list will be ignored. Parameters ---------- other : :class:`EventList` object or class:`list` of :class:`EventList` objects The other :class:`EventList` object which is supposed to be joined with. If ``other`` is a list, it is assumed to be a list of :class:`EventList` objects and they are all joined, one by one. Other parameters ---------------- strategy : {"intersection", "union", "append", "infer", "none"} Method to use to merge the GTIs. If "intersection", the GTIs are merged using the intersection of the GTIs. If "union", the GTIs are merged using the union of the GTIs. If "none", a single GTI with the minimum and the maximum time stamps of all GTIs is returned. If "infer", the strategy is decided based on the GTIs. If there are no overlaps, "union" is used, otherwise "intersection" is used. If "append", the GTIs are simply appended but they must be mutually exclusive. Returns ------- `ev_new` : :class:`EventList` object The resulting :class:`EventList` object. """ return self._join_timeseries(other, strategy=strategy, ignore_meta=["header", "ncounts"])
[docs] @classmethod def read(cls, filename, fmt=None, rmf_file=None, **kwargs): r"""Read a :class:`EventList` object from file. Currently supported formats are * pickle (not recommended for long-term storage) * hea or ogip : FITS Event files from (well, some) HEASARC-supported missions. * any other formats compatible with the writers in :class:`astropy.table.Table` (ascii.ecsv, hdf5, etc.) Files that need the :class:`astropy.table.Table` interface MUST contain at least a ``time`` column. Other recognized columns are ``energy`` and ``pi``. The default ascii format is enhanced CSV (ECSV). Data formats supporting the serialization of metadata (such as ECSV and HDF5) can contain all eventlist attributes such as ``mission``, ``gti``, etc with no significant loss of information. Other file formats might lose part of the metadata, so must be used with care. Parameters ---------- filename: str Path and file name for the file to be read. fmt: str Available options are 'pickle', 'hea', and any `Table`-supported format such as 'hdf5', 'ascii.ecsv', etc. Other parameters ---------------- rmf_file : str, default None The file name of the RMF file to use for energy calibration. Defaults to None, which implies no channel->energy conversion at this stage (or a default calibration applied to selected missions). kwargs : dict Any further keyword arguments to be passed to `load_events_and_gtis` for reading in event lists in OGIP/HEASOFT format Returns ------- ev: :class:`EventList` object The :class:`EventList` object reconstructed from file """ if fmt is not None and fmt.lower() in ("hea", "ogip"): evtdata = load_events_and_gtis(filename, **kwargs) evt = EventList( time=evtdata.ev_list, gti=evtdata.gti_list, pi=evtdata.pi_list, energy=evtdata.energy_list, mjdref=evtdata.mjdref, instr=evtdata.instr, mission=evtdata.mission, header=evtdata.header, detector_id=evtdata.detector_id, ephem=evtdata.ephem, timeref=evtdata.timeref, timesys=evtdata.timesys, ) if "additional_columns" in kwargs: for key in evtdata.additional_data: if not hasattr(evt, key.lower()): setattr(evt, key.lower(), evtdata.additional_data[key]) if rmf_file is not None: evt.convert_pi_to_energy(rmf_file) return evt return super().read(filename=filename, fmt=fmt)
[docs] def convert_pi_to_energy(self, rmf_file): """Calibrate the energy column of the event list. Defines the ``energy`` attribute of the event list by converting the PI channels to energy using the provided RMF file. Parameters ---------- rmf_file : str The file name of the RMF file to use for calibration. """ self.energy = pi_to_energy(self.pi, rmf_file)
[docs] def get_energy_mask(self, energy_range, use_pi=False): """Get a mask corresponding to events with a given energy range. Parameters ---------- energy_range: [float, float] Energy range in keV, or in PI channel (if ``use_pi`` is True) Other Parameters ---------------- use_pi : bool, default False Use PI channel instead of energy in keV """ if use_pi: energies = self.pi else: energies = self.energy return (energies >= energy_range[0]) & (energies < energy_range[1])
[docs] def filter_energy_range(self, energy_range, inplace=False, use_pi=False): """Filter the event list from a given energy range. Parameters ---------- energy_range: [float, float] Energy range in keV, or in PI channel (if ``use_pi`` is True) Other Parameters ---------------- inplace : bool, default False Do the change in place (modify current event list). Otherwise, copy to a new event list. use_pi : bool, default False Use PI channel instead of energy in keV Examples -------- >>> events = EventList(time=[0, 1, 2], energy=[0.3, 0.5, 2], pi=[3, 5, 20]) >>> e1 = events.filter_energy_range([0, 1]) >>> assert np.allclose(e1.time, [0, 1]) >>> assert np.allclose(events.time, [0, 1, 2]) >>> e2 = events.filter_energy_range([0, 10], use_pi=True, inplace=True) >>> assert np.allclose(e2.time, [0, 1]) >>> assert np.allclose(events.time, [0, 1]) """ mask = self.get_energy_mask(energy_range, use_pi=use_pi) return self.apply_mask(mask, inplace=inplace)
[docs] def apply_deadtime(self, deadtime, inplace=False, **kwargs): """Apply deadtime filter to this event list. Additional arguments in ``kwargs`` are passed to `get_deadtime_mask` Parameters ---------- deadtime : float Value of dead time to apply to data inplace : bool, default False If True, apply the deadtime to the current event list. Otherwise, return a new event list. Returns ------- new_event_list : `EventList` object Filtered event list. if `inplace` is True, this is the input object filtered for deadtime, otherwise this is a new object. additional_output : object Only returned if `return_all` is True. See `get_deadtime_mask` for more details. Examples -------- >>> events = np.array([1, 1.05, 1.07, 1.08, 1.1, 2, 2.2, 3, 3.1, 3.2]) >>> events = EventList(events, gti=[[0, 3.3]]) >>> events.pi=np.array([1, 2, 2, 2, 2, 1, 1, 1, 2, 1]) >>> events.energy=np.array([1, 2, 2, 2, 2, 1, 1, 1, 2, 1]) >>> events.mjdref = 10 >>> filt_events, retval = events.apply_deadtime(0.11, inplace=False, ... verbose=False, ... return_all=True) >>> assert filt_events is not events >>> expected = np.array([1, 2, 2.2, 3, 3.2]) >>> assert np.allclose(filt_events.time, expected) >>> assert np.allclose(filt_events.pi, 1) >>> assert np.allclose(filt_events.energy, 1) >>> assert not np.allclose(events.pi, 1) >>> filt_events = events.apply_deadtime(0.11, inplace=True, ... verbose=False) >>> assert filt_events is events """ local_retall = kwargs.pop("return_all", False) mask, retall = get_deadtime_mask(self.time, deadtime, return_all=True, **kwargs) new_ev = self.apply_mask(mask, inplace=inplace) if local_retall: new_ev = [new_ev, retall] return new_ev
[docs] def get_color_evolution(self, energy_ranges, segment_size=None, use_pi=False): """Compute the color in equal-length segments of the event list. Parameters ---------- energy_ranges : 2x2 list List of energy ranges to compute the color: ``[[en1_min, en1_max], [en2_min, en2_max]]`` segment_size : float Segment size in seconds. If None, the full GTIs are considered instead as segments. Other Parameters ---------------- use_pi : bool, default False Use PI channel instead of energy in keV Returns ------- color : array-like Array of colors, computed in each segment as the ratio of the counts in the second energy range to the counts in the first energy range. """ if energy_ranges is None or np.shape(energy_ranges) != (2, 2): raise ValueError("Energy ranges must be specified as a 2x2 array") def color(ev): mask1 = ev.get_energy_mask(energy_ranges[0], use_pi=use_pi) mask2 = ev.get_energy_mask(energy_ranges[1], use_pi=use_pi) en1_ct = np.count_nonzero(mask1) en2_ct = np.count_nonzero(mask2) color = en2_ct / en1_ct color_err = color * (np.sqrt(en1_ct) / en1_ct + np.sqrt(en2_ct) / en2_ct) return color, color_err starts, stops, (colors, color_errs) = self.analyze_segments(color, segment_size) return starts, stops, colors, color_errs
[docs] def get_intensity_evolution(self, energy_range, segment_size=None, use_pi=False): """Compute the intensity in equal-length segments (or full GTIs) of the event list. Parameters ---------- energy_range : ``[en1_min, en1_max]`` Energy range to compute the intensity segment_size : float Segment size in seconds. If None, the full GTIs are considered instead as segments. Other Parameters ---------------- use_pi : bool, default False Use PI channel instead of energy in keV Returns ------- intensity : array-like Array of intensities (in counts/s), computed in each segment. """ if energy_range is None or np.shape(energy_range) != (2,): raise ValueError("Energy ranges must be specified as a 2-element list") def intensity(ev): mask1 = ev.get_energy_mask(energy_range, use_pi=use_pi) en1_ct = np.count_nonzero(mask1) segment_size = ev.gti[0, 1] - ev.gti[0, 0] rate = en1_ct / segment_size rate_err = np.sqrt(en1_ct) / segment_size return rate, rate_err starts, stops, (rate, rate_err) = self.analyze_segments(intensity, segment_size) return starts, stops, rate, rate_err