Source code for stingray.simulator.simulator

import pickle
from os import error
import numpy as np
import numbers
import warnings
from scipy import signal
import astropy.modeling.models
from stingray import utils
from stingray import Lightcurve
from stingray import AveragedPowerspectrum

__all__ = ["Simulator"]


[docs] class Simulator(object): """ Methods to simulate and visualize light curves. TODO: Improve documentation Parameters ---------- dt : int, default 1 time resolution of simulated light curve N : int, default 1024 bins count of simulated light curve mean : float, default 0 mean value of the simulated light curve rms : float, default 1 fractional rms of the simulated light curve, actual rms is calculated by mean*rms err : float, default 0 the errorbars on the final light curve red_noise : int, default 1 multiple of real length of light curve, by which to simulate, to avoid red noise leakage random_state : int, default None seed value for random processes poisson : bool, default False return Poisson-distributed light curves. """ def __init__( self, dt, N, mean, rms, err=0.0, red_noise=1, random_state=None, tstart=0.0, poisson=False ): self.dt = dt if not isinstance(N, (int, np.integer)): raise ValueError("N must be integer!") self.N = N if mean == 0: warnings.warn( "Careful! A mean of zero is unphysical!" + "This may have unintended consequences!" ) self.mean = mean self.nphot = self.mean * self.N self.rms = rms self.red_noise = red_noise self.tstart = tstart self.time = dt * np.arange(N) + self.tstart self.nphot_factor = 1000_000 self.err = err self.poisson = poisson # Initialize a tuple of energy ranges with corresponding light curves self.channels = [] self.random_state = utils.get_random_state(random_state) assert rms <= 1, "Fractional rms must be less than 1." assert dt > 0, "Time resolution must be greater than 0"
[docs] def simulate(self, *args): """ Simulate light curve generation using power spectrum or impulse response. Examples -------- * x = simulate(beta): For generating a light curve using power law spectrum. Parameters: * beta : float Defines the shape of spectrum * x = simulate(s): For generating a light curve from user-provided spectrum. **Note**: In this case, the `red_noise` parameter is provided. You can generate a longer light curve by providing a higher frequency resolution on the input power spectrum. Parameters: * s : array-like power spectrum * x = simulate(model): For generating a light curve from pre-defined model Parameters: * model : astropy.modeling.Model the pre-defined model * x = simulate('model', params): For generating a light curve from pre-defined model Parameters: * model : string the pre-defined model * params : list iterable or dict the parameters for the pre-defined model * x = simulate(s, h): For generating a light curve using impulse response. Parameters: * s : array-like Underlying variability signal * h : array-like Impulse response * x = simulate(s, h, 'same'): For generating a light curve of same length as input signal, using impulse response. Parameters: * s : array-like Underlying variability signal * h : array-like Impulse response * mode : str mode can be 'same', 'filtered, or 'full'. 'same' indicates that the length of output light curve is same as that of input signal. 'filtered' means that length of output light curve is len(s) - lag_delay 'full' indicates that the length of output light curve is len(s) + len(h) -1 Parameters ---------- args See examples below. Returns ------- lightCurve : `LightCurve` object """ if isinstance(args[0], (numbers.Integral, float)) and len(args) == 1: return self._simulate_power_law(args[0]) elif isinstance(args[0], astropy.modeling.Model) and len(args) == 1: return self._simulate_model(args[0]) elif utils.is_string(args[0]) and len(args) == 2: return self._simulate_model_string(args[0], args[1]) elif len(args) == 1: return self._simulate_power_spectrum(args[0]) elif len(args) == 2: return self._simulate_impulse_response(args[0], args[1]) elif len(args) == 3: return self._simulate_impulse_response(args[0], args[1], args[2]) else: raise ValueError("Length of arguments must be 1, 2 or 3.")
[docs] def simulate_channel(self, channel, *args): """ Simulate a lightcurve and add it to corresponding energy channel. Parameters ---------- channel : str range of energy channel (e.g., 3.5-4.5) *args see description of simulate() for details Returns ------- lightCurve : `LightCurve` object """ # Check that channel name does not already exist. if channel not in [lc[0] for lc in self.channels]: self.channels.append((channel, self.simulate(*args))) else: raise KeyError("A channel with this name already exists.")
[docs] def get_channel(self, channel): """ Get lightcurve belonging to the energy channel. """ return [lc[1] for lc in self.channels if lc[0] == channel][0]
[docs] def get_channels(self, channels): """ Get multiple light curves belonging to the energy channels. """ return [lc[1] for lc in self.channels if lc[0] in channels]
[docs] def get_all_channels(self): """ Get lightcurves belonging to all channels. """ return [lc[1] for lc in self.channels]
[docs] def delete_channel(self, channel): """ Delete an energy channel. """ channel = [lc for lc in self.channels if lc[0] == channel] if len(channel) == 0: raise KeyError("This channel does not exist or has already been " "deleted.") else: index = self.channels.index(channel[0]) del self.channels[index]
[docs] def delete_channels(self, channels): """ Delete multiple energy channels. """ n = len(channels) channels = [lc for lc in self.channels if lc[0] in channels] if len(channels) != n: raise KeyError( "One of more of the channels do not exist or have " "already been deleted." ) else: indices = [self.channels.index(channel) for channel in channels] for i in sorted(indices, reverse=True): del self.channels[i]
[docs] def count_channels(self): """ Return total number of energy channels. """ return len(self.channels)
[docs] def simple_ir(self, start=0, width=1000, intensity=1): """ Construct a simple impulse response using start time, width and scaling intensity. To create a delta impulse response, set width to 1. Parameters ---------- start : int start time of impulse response width : int width of impulse response intensity : float scaling parameter to set the intensity of delayed emission corresponding to direct emission. Returns ------- h : numpy.ndarray Constructed impulse response """ # Fill in 0 entries until the start time h_zeros = np.zeros(int(start / self.dt)) # Define constant impulse response h_ones = np.ones(int(width / self.dt)) * intensity return np.append(h_zeros, h_ones)
[docs] def relativistic_ir(self, t1=3, t2=4, t3=10, p1=1, p2=1.4, rise=0.6, decay=0.1): """ Construct a realistic impulse response considering the relativistic effects. Parameters ---------- t1 : int primary peak time t2 : int secondary peak time t3 : int end time p1 : float value of primary peak p2 : float value of secondary peak rise : float slope of rising exponential from primary peak to secondary peak decay : float slope of decaying exponential from secondary peak to end time Returns ------- h : numpy.ndarray Constructed impulse response """ dt = self.dt assert t2 > t1, "Secondary peak must be after primary peak." assert t3 > t2, "End time must be after secondary peak." assert p2 > p1, "Secondary peak must be greater than primary peak." # Append zeros before start time h_primary = np.append(np.zeros(int(t1 / dt)), p1) # Create a rising exponential of user-provided slope x = np.linspace(t1 / dt, t2 / dt, int((t2 - t1) / dt)) h_rise = np.exp(rise * x) # Evaluate a factor for scaling exponential factor = np.max(h_rise) / (p2 - p1) h_secondary = (h_rise / factor) + p1 # Create a decaying exponential until the end time x = np.linspace(t2 / dt, t3 / dt, int((t3 - t2) / dt)) h_decay = np.exp((-decay) * (x - 4 / dt)) # Add the three responses h = np.append(h_primary, h_secondary) h = np.append(h, h_decay) return h
def _find_inverse(self, real, imaginary): """ Forms complex numbers corresponding to real and imaginary parts and finds inverse series. Parameters ---------- real : numpy.ndarray Co-effients corresponding to real parts of complex numbers imaginary : numpy.ndarray Co-efficients correspondong to imaginary parts of complex numbers Returns ------- ifft : numpy.ndarray Real inverse fourier transform of complex numbers """ # Form complex numbers corresponding to each frequency f = [complex(r, i) for r, i in zip(real, imaginary)] f = np.hstack([self.mean * self.N * self.red_noise, f]) # Obtain time series return np.fft.irfft(f, n=self.N * self.red_noise) def _timmerkoenig(self, pds_shape): """Straight application of T&K method to a PDS shape.""" pds_size = pds_shape.size real = np.random.normal(size=pds_size) * np.sqrt(0.5 * pds_shape) imaginary = np.random.normal(size=pds_size) * np.sqrt(0.5 * pds_shape) imaginary[-1] = 0 counts = self._find_inverse(real, imaginary) self.std = counts.std() rescaled_counts = self._extract_and_scale(counts) err = np.zeros_like(rescaled_counts) if self.poisson: bad = rescaled_counts < 0 if np.any(bad): warnings.warn("Some bins of the light curve have counts < 0. Setting to 0") rescaled_counts[bad] = 0 lc = Lightcurve( self.time, np.random.poisson(rescaled_counts), err_dist="poisson", dt=self.dt, skip_checks=True, ) lc.smooth_counts = rescaled_counts else: lc = Lightcurve( self.time, rescaled_counts, err=err, err_dist="gauss", dt=self.dt, skip_checks=True ) return lc def _simulate_power_law(self, B): """ Generate LightCurve from a power law spectrum. Parameters ---------- B : int Defines the shape of power law spectrum. Returns ------- lightCurve : array-like """ # Define frequencies at which to compute PSD w = np.fft.rfftfreq(self.red_noise * self.N, d=self.dt)[1:] pds_shape = np.power((1 / w), B) return self._timmerkoenig(pds_shape) def _simulate_power_spectrum(self, s): """ Generate a light curve from user-provided spectrum. Parameters ---------- s : array-like power spectrum Returns ------- lightCurve : `LightCurve` object """ # Cast spectrum as numpy array pds_shape = np.zeros(s.size * self.red_noise) pds_shape[: s.size] = s return self._timmerkoenig(pds_shape) def _simulate_model(self, model): """ For generating a light curve from a pre-defined model Parameters ---------- model : astropy.modeling.Model derived function the pre-defined model (library-based, available in astropy.modeling.models or custom-defined) Returns ------- lightCurve : :class:`stingray.lightcurve.LightCurve` object """ # Frequencies at which the PSD is to be computed # (only positive frequencies, since the signal is real) nbins = self.red_noise * self.N simfreq = np.fft.rfftfreq(nbins, d=self.dt)[1:] # Compute PSD from model simpsd = model(simfreq) return self._timmerkoenig(simpsd) def _simulate_model_string(self, model_str, params): """ For generating a light curve from a pre-defined model Parameters ---------- model_str : string name of the pre-defined model params : list or dictionary parameters of the pre-defined model Returns ------- lightCurve : :class:`stingray.lightcurve.LightCurve` object """ from . import models # Frequencies at which the PSD is to be computed # (only positive frequencies, since the signal is real) nbins = self.red_noise * self.N simfreq = np.fft.rfftfreq(nbins, d=self.dt)[1:] if model_str not in dir(models): raise ValueError("Model is not defined!") if isinstance(params, dict): model = eval("models." + model_str + "(**params)") # Compute PSD from model simpsd = model(simfreq) elif isinstance(params, list): simpsd = eval("models." + model_str + "(simfreq, params)") else: raise ValueError("Params should be list or dictionary!") return self._timmerkoenig(simpsd) def _simulate_impulse_response(self, s, h, mode="same"): """ Generate LightCurve from impulse response. To get accurate results, binning intervals (dt) of variability signal 's' and impulse response 'h' must be equal. Parameters ---------- s : array-like Underlying variability signal h : array-like Impulse response mode : str mode can be 'same', 'filtered, or 'full'. 'same' indicates that the length of output light curve is same as that of input signal. 'filtered' means that length of output light curve is len(s) - lag_delay 'full' indicates that the length of output light curve is len(s) + len(h) -1 Returns ------- lightCurve : :class:`stingray.lightcurve.LightCurve` object """ lc = signal.fftconvolve(s, h) if mode == "same": lc = lc[: -(len(h) - 1)] elif mode == "filtered": lc = lc[(len(h) - 1) : -(len(h) - 1)] time = self.dt * np.arange(0.5, len(lc)) + self.tstart err = np.zeros_like(time) return Lightcurve(time, lc, err_dist="gauss", dt=self.dt, err=err, skip_checks=True) def _extract_and_scale(self, long_lc): """ i) Make a random cut and extract a light curve of required length. ii) Rescale light curve i) with zero mean and unit standard deviation, and ii) user provided mean and rms (fractional rms * mean) Parameters ---------- long_lc : numpy.ndarray Simulated lightcurve of length 'N' times 'red_noise' Returns ------- lc : numpy.ndarray Normalized and extracted lightcurve of length 'N' """ if self.red_noise == 1: lc = long_lc else: # Make random cut and extract light curve of length 'N' extract = self.random_state.randint(self.N - 1, self.red_noise * self.N - self.N + 1) lc = np.take(long_lc, range(extract, extract + self.N)) mean_lc = np.mean(lc) if self.mean == 0: return (lc - mean_lc) / self.std * self.rms else: return (lc - mean_lc) / self.std * self.mean * self.rms + self.mean
[docs] def powerspectrum(self, lc, seg_size=None): """ Make a powerspectrum of the simulated light curve. Parameters ---------- lc : lightcurve.Lightcurve object OR iterable of lightcurve.Lightcurve objects The light curve data to be Fourier-transformed. Returns ------- power : numpy.ndarray The array of normalized squared absolute values of Fourier amplitudes """ if seg_size is None: seg_size = lc.tseg return AveragedPowerspectrum(lc, seg_size).power
[docs] @staticmethod def read(filename, fmt="pickle"): """ Reads transfer function from a 'pickle' file. Parameters ---------- fmt : str the format of the file to be retrieved - accepts 'pickle'. Returns ------- data : class instance `TransferFunction` object """ if fmt == "pickle": with open(filename, "rb") as fobj: return pickle.load(fobj) else: raise KeyError("Format not understood.")
[docs] def write(self, filename, fmt="pickle"): """ Writes a transfer function to 'pickle' file. Parameters ---------- fmt : str the format of the file to be saved - accepts 'pickle' """ if fmt == "pickle": with open(filename, "wb") as fobj: pickle.dump(self, fobj) else: raise KeyError("Format not understood.")