import numbers
import os
import re
import copy
import random
import string
import sys
import warnings
import tempfile
from collections.abc import Iterable
import numpy as np
import scipy
from numpy import histogram as histogram_np
from numpy import histogram2d as histogram2d_np
from numpy import histogramdd as histogramdd_np
from .loggingconfig import setup_logger
logger = setup_logger()
# Prefer PyFFTW's drop-in numpy_fft interface when it is installed (it is
# typically faster); otherwise fall back to plain numpy.fft. Downstream code
# can check HAS_PYFFTW to know which backend is active.
try:
    import pyfftw
    from pyfftw.interfaces.numpy_fft import (
        ifft,
        fft,
        fftfreq,
        fftn,
        ifftn,
        fftshift,
        fft2,
        ifftshift,
        rfft,
        rfftfreq,
    )

    # Cache FFTW plans between calls to avoid re-planning identical transforms.
    pyfftw.interfaces.cache.enable()
    HAS_PYFFTW = True
    logger.info("Using PyFFTW")
except ImportError:
    from numpy.fft import ifft, fft, fftfreq, fftn, ifftn, fftshift, fft2, ifftshift, rfft, rfftfreq

    HAS_PYFFTW = False
# If numba is installed, import jit. Otherwise, define an empty decorator with
# the same name. The stand-ins keep the rest of the module import-safe: code
# decorated with @njit/@vectorize simply runs as plain Python.
try:
    from numba import jit

    HAS_NUMBA = True
    from numba import njit, prange, vectorize, float32, float64, int32, int64
    from numba.core.errors import NumbaValueError, NumbaNotImplementedError, TypingError
except ImportError:
    warnings.warn("Numba not installed. Faking it")
    HAS_NUMBA = False

    # Numba-specific exceptions collapse to the base Exception so that
    # `except NumbaValueError:` clauses elsewhere remain valid.
    NumbaValueError = NumbaNotImplementedError = TypingError = Exception

    def njit(f=None, *args, **kwargs):
        """No-op replacement for numba.njit: works both as @njit and @njit(...)."""

        def decorator(func, *a, **kw):
            return func

        # Bare @njit passes the function directly; @njit(...) passes options.
        if callable(f):
            return f
        else:
            return decorator

    jit = njit

    def vectorize(*args, **kwargs):
        """Replacement for numba.vectorize, backed by np.vectorize."""

        def decorator(func, *a, **kw):
            return np.vectorize(func)

        return decorator

    def generic(x, y=None):
        # Placeholder for numba type objects (float32, int64, ...).
        return None

    float32 = float64 = int32 = int64 = generic

    def prange(x):
        # numba.prange degrades to a plain range without parallelism.
        return range(x)
# Use tqdm progress bars when available; otherwise pass iterables through
# unchanged so loops still work (just without a progress display).
try:
    from tqdm import tqdm as show_progress
except ImportError:

    def show_progress(a):
        """Identity stand-in for tqdm: return the iterable unchanged."""
        return a
# Prefer statsmodels' robust MAD implementation; otherwise use a local copy.
try:
    from statsmodels.robust import mad as mad  # pylint: disable=unused-import
except ImportError:

    def mad(data, c=0.6745, axis=None):
        """
        Median Absolute Deviation (MAD) along an axis.

        Straight from statsmodels's source code, adapted

        Parameters
        ----------
        data : iterable
            The data along which to calculate the MAD

        c : float, optional
            The normalization constant. Defined as
            ``scipy.stats.norm.ppf(3/4.)``, which is approximately ``.6745``.

        axis : int, optional, default ``0``
            Axis along which to calculate ``mad``. Default is ``0``, can also
            be ``None``
        """
        data = np.asarray(data)
        # Deviations are taken from the median (per-axis when axis is given).
        if axis is not None:
            center = np.apply_over_axes(np.median, data, axis)
        else:
            center = np.median(data)
        # Dividing by c makes the MAD a consistent estimator of the standard
        # deviation for normally distributed data.
        return np.median((np.fabs(data - center)) / c, axis=axis)
# Names exported via ``from <module> import *``.
__all__ = [
    "simon",
    "rebin_data",
    "rebin_data_log",
    "look_for_array_in_array",
    "is_string",
    "is_iterable",
    "order_list_of_arrays",
    "optimal_bin_time",
    "contiguous_regions",
    "is_int",
    "get_random_state",
    "baseline_als",
    "excess_variance",
    "create_window",
    "poisson_symmetrical_errors",
    "standard_error",
    "nearest_power_of_two",
    "find_nearest",
    "check_isallfinite",
    "heaviside",
]
@njit()
def heaviside(x):
    """Heaviside step function. Returns 1 if x >= 0, and 0 otherwise.

    Note that, with this convention, ``heaviside(0)`` is 1.

    Examples
    --------
    >>> heaviside(2)
    1
    >>> heaviside(-1)
    0
    """
    if x >= 0:
        return 1
    else:
        return 0
@njit
def any_complex_in_array(array):
    """Check if any element of an array is complex.

    Short-circuits at the first element with a nonzero imaginary part.

    Examples
    --------
    >>> any_complex_in_array(np.array([1, 2, 3]))
    False
    >>> assert any_complex_in_array(np.array([1, 2 + 1.j, 3]))
    """
    for a in array:
        if np.iscomplex(a):
            return True
    return False
def make_nd_into_arrays(array: np.ndarray, label: str) -> dict:
    """If an array is n-dimensional, make it into many 1-dimensional arrays.

    Each trailing dimension gets encoded in the key, e.g. ``_dimN_M``.
    A 1-dimensional input is returned as ``{label: array}``.

    Parameters
    ----------
    array : `np.ndarray`
        Input data
    label : `str`
        Label for the array

    Returns
    -------
    data : `dict`
        Dictionary of 1-d arrays keyed by ``label`` plus dimension suffixes.

    Examples
    --------
    >>> a1, a2, a3 = np.arange(3), np.arange(3, 6), np.arange(6, 9)
    >>> A = np.array([a1, a2, a3]).T
    >>> data = make_nd_into_arrays(A, "test")
    >>> assert np.array_equal(data["test_dim0"], a1)
    >>> assert np.array_equal(data["test_dim1"], a2)
    >>> assert np.array_equal(data["test_dim2"], a3)
    >>> A3 = [[[1, 2], [3, 4]], [[5, 6], [7, 8]]]
    >>> data = make_nd_into_arrays(A3, "test")
    >>> assert np.array_equal(data["test_dim0_0"], [1, 5])
    """
    array = np.asarray(array)

    # Base case: nothing to split, store under the bare label.
    if array.ndim <= 1:
        return {label: array}

    # Recurse over the columns of the first trailing dimension; the first
    # split introduces "_dimN", deeper splits append plain "_M" suffixes.
    flattened = {}
    for col in range(array.shape[1]):
        suffix = f"_dim{col}" if "_dim" not in label else f"_{col}"
        flattened.update(make_nd_into_arrays(array[:, col], label=label + suffix))
    return flattened
def get_dimensions_from_list_of_column_labels(labels: list, label: str) -> list:
    """Get the dimensions of a multi-dimensional array from a list of column labels.

    Looks for keys of the form ``<label>_dimN[_M...]`` and returns the sorted
    matching keys plus the shape of the trailing dimensions (max index + 1).

    Examples
    --------
    >>> labels = ['test_dim0_0', 'test_dim0_1', 'test_dim0_2',
    ...           'test_dim1_0', 'test_dim1_1', 'test_dim1_2', 'test', 'bu']
    >>> keys, dimensions = get_dimensions_from_list_of_column_labels(labels, "test")
    >>> for key0, key1 in zip(labels[:6], keys): assert key0 == key1
    >>> assert np.array_equal(dimensions, [2, 3])
    """
    pattern = re.compile("^" + label + r"_dim([0-9]+(_[0-9]+)*)")

    matching_keys = []
    max_indices = None
    for candidate in labels:
        # Quick reject before paying for the regex.
        if label not in candidate:
            continue
        hit = pattern.search(candidate)
        if hit is None:
            continue
        matching_keys.append(candidate)
        # Per-axis indices encoded in the suffix, e.g. "1_2" -> [1, 2].
        indices = [int(piece) for piece in hit.groups()[0].split("_")]
        if max_indices is None:
            max_indices = np.array(indices)
        else:
            max_indices = np.max([max_indices, indices], axis=0)

    # Shape along each trailing axis is the largest index seen plus one.
    return sorted(matching_keys), max_indices + 1
def make_1d_arrays_into_nd(data: dict, label: str) -> np.ndarray:
    """Literally the opposite of make_nd_into_arrays.

    Reassembles an n-dimensional array from 1-d columns whose keys carry
    the dimension suffixes, e.g. ``_dimN_M``.

    Parameters
    ----------
    data : dict
        Input data
    label : `str`
        Label for the array

    Returns
    -------
    array : `np.array`
        N-dimensional array that was stored in the data.

    Examples
    --------
    >>> a1, a2, a3 = np.arange(3), np.arange(3, 6), np.arange(6, 9)
    >>> A = np.array([a1, a2, a3]).T
    >>> data = make_nd_into_arrays(A, "test")
    >>> A_ret = make_1d_arrays_into_nd(data, "test")
    >>> assert np.array_equal(A, A_ret)
    >>> A = np.array([[[1, 2, 12], [3, 4, 34]],
    ...               [[5, 6, 56], [7, 8, 78]],
    ...               [[9, 10, 910], [11, 12, 1112]],
    ...               [[13, 14, 1314], [15, 16, 1516]]])
    >>> data = make_nd_into_arrays(A, "_test")
    >>> A_ret = make_1d_arrays_into_nd(data, "_test")
    >>> assert np.array_equal(A, A_ret)
    >>> data = make_nd_into_arrays(a1, "_test")
    >>> A_ret = make_1d_arrays_into_nd(data, "_test")
    >>> assert np.array_equal(a1, A_ret)
    """
    # A 1-d array is stored under the bare label: return it directly.
    if label in list(data.keys()):
        return data[label]

    # Recover the sorted column labels and the shape of the trailing
    # dimensions. (Removed unused locals `dim` and a redundant `all_keys`
    # initialization that was immediately overwritten.)
    all_keys, dimensions = get_dimensions_from_list_of_column_labels(list(data.keys()), label)

    arrays = np.array([np.array(data[key]) for key in all_keys])

    # Columns are in sorted (row-major) order: transpose back so axis 0 is
    # the original leading axis, then restore the trailing shape.
    return arrays.T.reshape([len(arrays[0])] + list(dimensions))
@njit
def _check_isallfinite_numba(array):
    """Check if all elements of an array are finite.

    This is faster than ``np.isfinite`` for large arrays, because it
    exits at the first occurrence of a non-finite value.

    Examples
    --------
    >>> assert _check_isallfinite_numba(np.array([1., 2., 3.]))
    >>> _check_isallfinite_numba(np.array([1., np.inf, 3.]))
    False
    """
    # Element-wise loop compiles to fast machine code under numba and
    # short-circuits on the first NaN/inf.
    for a in array:
        if not np.isfinite(a):
            return False
    return True
def check_isallfinite(array):
    """Check if all elements of an array are finite.

    Calls ``_check_isallfinite_numba`` if numba is installed, otherwise
    it uses ``np.isfinite``.

    Examples
    --------
    >>> assert check_isallfinite([1, 2, 3])
    >>> check_isallfinite([1, np.inf, 3])
    False
    >>> check_isallfinite([1, np.nan, 3])
    False
    """
    if not HAS_NUMBA:
        return bool(np.all(np.isfinite(array)))

    try:
        return _check_isallfinite_numba(np.asarray(array))
    except Exception:
        # Numba is very picky about input dtypes: if the compiled function
        # rejects this array, fall back to the plain NumPy check.
        return bool(np.all(np.isfinite(array)))
def is_sorted(array):
    """Check if an array is sorted.

    Checks if an array has extended precision before calling the
    ``is_sorted`` numba-compiled function.

    Parameters
    ----------
    array : iterable
        The array to be checked

    Returns
    -------
    is_sorted : bool
        True if the array is sorted, False otherwise
    """
    array = np.asarray(array)

    # If the array is empty or has only one element, it is sorted
    if array.size <= 1:
        return True

    # If Numba is not installed, use numpy's implementation
    if not HAS_NUMBA:
        return np.all(np.diff(array) >= 0)

    # Test if value is compatible with Numba's type system: probe the
    # compiled function with a cheap 2-element slice; dtypes numba rejects
    # (e.g. extended precision) are cast to float before the real call.
    try:
        _is_sorted_numba(array[:2])
    except NumbaValueError:
        array = array.astype(float)

    return _is_sorted_numba(array)
@njit()
def _is_sorted_numba(array):
    """Check if an array is sorted.

    .. note::

        The array cannot have extended precision.
        This function should always be wrapped into a function that
        checks the type of the array and converts it to float if needed.

    Parameters
    ----------
    array : iterable
        The array to be checked

    Returns
    -------
    is_sorted : bool
        True if the array is sorted, False otherwise
    """
    # Compare each adjacent pair; bail out at the first inversion.
    for i in prange(len(array) - 1):
        if array[i] > array[i + 1]:
            return False
    return True
def _root_squared_mean(array):
array = np.asarray(array)
return np.sqrt(np.sum(array**2)) / array.size
def simon(message, **kwargs):
    """The Statistical Interpretation MONitor.

    A warning system designed to always remind the user that Simon
    is watching him/her.

    Parameters
    ----------
    message : string
        The message that is thrown
    kwargs : dict
        The rest of the arguments that are passed to ``warnings.warn``
    """
    warnings.warn(f"SIMON says: {message}", **kwargs)
def rebin_data(x, y, dx_new, yerr=None, method="sum", dx=None):
    """Rebin some data to an arbitrary new data resolution. Either sum
    the data points in the new bins or average them.

    Parameters
    ----------
    x: iterable
        The dependent variable with some resolution, which can vary throughout
        the time series.
    y: iterable
        The independent variable to be binned
    dx_new: float
        The new resolution of the dependent variable ``x``

    Other parameters
    ----------------
    yerr: iterable, optional
        The uncertainties of ``y``, to be propagated during binning.
    method: {``sum`` | ``average`` | ``mean``}, optional, default ``sum``
        The method to be used in binning. Either sum the samples ``y`` in
        each new bin of ``x``, or take the arithmetic mean.
    dx: float
        The old resolution (otherwise, calculated from difference between
        time bins)

    Returns
    -------
    xbin: numpy.ndarray
        The midpoints of the new bins in ``x``
    ybin: numpy.ndarray
        The binned quantity ``y``
    ybin_err: numpy.ndarray
        The uncertainties of the binned values of ``y``.
    step_size: float
        The size of the binning step

    Examples
    --------
    >>> x = np.arange(0, 100, 0.01)
    >>> y = np.ones(x.size)
    >>> yerr = np.ones(x.size)
    >>> xbin, ybin, ybinerr, step_size = rebin_data(
    ...     x, y, 4, yerr=yerr, method='sum', dx=0.01)
    >>> assert np.allclose(ybin, 400)
    >>> assert np.allclose(ybinerr, 20)
    >>> xbin, ybin, ybinerr, step_size = rebin_data(
    ...     x, y, 4, yerr=yerr, method='mean')
    >>> assert np.allclose(ybin, 1)
    >>> assert np.allclose(ybinerr, 0.05)
    """
    y = np.asarray(y)
    if yerr is None:
        # No uncertainties given: propagate zeros so the error math below
        # still works and returns zero uncertainties.
        yerr = np.zeros_like(y)
    else:
        yerr = np.asarray(yerr)

    # Old bin widths: given explicitly (scalar or per-bin array), or derived
    # from the spacing of ``x`` when not provided.
    if isinstance(dx, Iterable):
        dx_old = dx
    elif dx is None or dx == 0:
        dx_old = np.diff(x)
    else:
        dx_old = np.array([dx])

    if np.any(dx_new < dx_old):
        raise ValueError(
            "New frequency resolution must be larger than " "old frequency resolution."
        )

    # left and right bin edges
    # assumes that the points given in `x` correspond to
    # the left bin edges
    xedges = np.hstack([x, x[-1] + dx_old[-1]])

    # new regularly binned resolution
    xbin = np.arange(xedges[0], xedges[-1] + dx_new, dx_new)

    output = np.zeros(xbin.shape[0] - 1, dtype=type(y[0]))
    outputerr = np.zeros(xbin.shape[0] - 1, dtype=type(yerr[0]))
    step_size = np.zeros(xbin.shape[0] - 1)

    # For each new bin, indices of the first old edge at or past its edges.
    all_x = np.searchsorted(xedges, xbin)
    min_inds = all_x[:-1]
    max_inds = all_x[1:]
    xmins = xbin[:-1]
    xmaxs = xbin[1:]
    for i, (xmin, xmax, min_ind, max_ind) in enumerate(zip(xmins, xmaxs, min_inds, max_inds)):
        # Old bins fully contained in the new bin: sum them directly.
        filtered_y = y[min_ind : max_ind - 1]
        filtered_yerr = yerr[min_ind : max_ind - 1]
        output[i] = np.sum(filtered_y)
        outputerr[i] = np.sum(filtered_yerr)
        step_size[i] = max_ind - 1 - min_ind

        # Fractional contribution of the old bin that straddles the left
        # edge of the new bin.
        prev_dx = xedges[min_ind] - xedges[min_ind - 1]
        prev_frac = (xedges[min_ind] - xmin) / prev_dx
        output[i] += y[min_ind - 1] * prev_frac
        outputerr[i] += yerr[min_ind - 1] * prev_frac
        step_size[i] += prev_frac
        # Same for the old bin straddling the right edge, unless the new
        # bin extends past the end of the data.
        if not max_ind == xedges.size:
            dx_post = xedges[max_ind] - xedges[max_ind - 1]
            post_frac = (xmax - xedges[max_ind - 1]) / dx_post
            output[i] += y[max_ind - 1] * post_frac
            outputerr[i] += yerr[max_ind - 1] * post_frac
            step_size[i] += post_frac

    if method in ["mean", "avg", "average", "arithmetic mean"]:
        ybin = output / step_size
        ybinerr = np.sqrt(outputerr) / step_size

    elif method == "sum":
        ybin = output
        ybinerr = np.sqrt(outputerr)

    else:
        raise ValueError(
            "Method for summing or averaging not recognized. "
            "Please enter either 'sum' or 'mean'."
        )

    # If dx_new does not evenly divide the total data span, the last
    # (incomplete) new bin is dropped.
    tseg = x[-1] - x[0] + dx_old[-1]

    if (tseg / dx_new % 1) > 0:
        ybin = ybin[:-1]
        ybinerr = ybinerr[:-1]
        step_size = step_size[:-1]

    # For (effectively) uniform old binning, report a scalar step size.
    dx_var = np.var(dx_old) / np.mean(dx_old)
    if np.size(dx_old) == 1 or dx_var < 1e-6:
        step_size = step_size[0]

    # Recompute the new bin midpoints starting from the original start bin.
    new_x0 = (x[0] - (0.5 * dx_old[0])) + (0.5 * dx_new)
    xbin = np.arange(ybin.shape[0]) * dx_new + new_x0

    return xbin, ybin, ybinerr, step_size
def rebin_data_log(x, y, f, y_err=None, dx=None):
    """Logarithmic re-bin of some data. Particularly useful for the power
    spectrum.

    The new dependent variable depends on the previous dependent variable
    modified by a factor f:

    .. math::

        d\\nu_j = d\\nu_{j-1} (1+f)

    Parameters
    ----------
    x: iterable
        The dependent variable with some resolution ``dx_old = x[1]-x[0]``
    y: iterable
        The independent variable to be binned
    f: float
        The factor of increase of each bin wrt the previous one.

    Other Parameters
    ----------------
    y_err: iterable, optional
        The uncertainties of ``y`` to be propagated during binning.
    dx: float, optional
        The binning step of the initial ``x``. If not given, the median
        spacing of ``x`` is used.

    Returns
    -------
    xbin: numpy.ndarray
        The midpoints of the new bins in ``x``
    ybin: numpy.ndarray
        The binned quantity ``y``
    ybin_err: numpy.ndarray
        The uncertainties of the binned values of ``y``
    nsamples: numpy.ndarray
        The number of original samples falling in each new bin
    """
    dx_init = apply_function_if_none(dx, np.diff(x), np.median)
    x = np.asarray(x)
    y = np.asarray(y)
    y_err = np.asarray(apply_function_if_none(y_err, y, np.zeros_like))

    if x.shape[0] != y.shape[0]:
        raise ValueError("x and y must be of the same length!")
    if y.shape[0] != y_err.shape[0]:
        raise ValueError("y and y_err must be of the same length!")

    minx = x[0] * 0.5  # frequency to start from
    maxx = x[-1]  # maximum frequency to end
    binx_for_stats = [minx, minx + dx_init]  # first
    dx = dx_init  # the frequency resolution of the first bin

    # until we reach the maximum frequency, increase the width of each
    # frequency bin by f
    while binx_for_stats[-1] <= maxx:
        binx_for_stats.append(binx_for_stats[-1] + dx * (1.0 + f))
        dx = binx_for_stats[-1] - binx_for_stats[-2]
    binx_for_stats = np.asarray(binx_for_stats)

    # Bin the real part here; the imaginary part (if any) is binned below.
    real = y.real
    real_err = y_err.real

    # compute the mean of the ys that fall into each new frequency bin.
    # we cast to np.double due to scipy's bad handling of longdoubles
    binx, bin_edges, binno = scipy.stats.binned_statistic(
        x.astype(np.double), x.astype(np.double), statistic="mean", bins=binx_for_stats
    )
    biny, bin_edges, binno = scipy.stats.binned_statistic(
        x.astype(np.double), real.astype(np.double), statistic="mean", bins=binx_for_stats
    )
    # Uncertainties are combined in quadrature (root of summed squares,
    # divided by the number of samples).
    biny_err, bin_edges, binno = scipy.stats.binned_statistic(
        x.astype(np.double),
        real_err.astype(np.double),
        statistic=_root_squared_mean,
        bins=binx_for_stats,
    )

    if np.iscomplexobj(y):
        imag = y.imag
        biny_imag, bin_edges, binno = scipy.stats.binned_statistic(
            x.astype(np.double), imag.astype(np.double), statistic="mean", bins=binx_for_stats
        )
        biny = biny + 1j * biny_imag

    if np.iscomplexobj(y_err):
        imag_err = y_err.imag
        biny_err_imag, bin_edges, binno = scipy.stats.binned_statistic(
            x.astype(np.double),
            imag_err.astype(np.double),
            statistic=_root_squared_mean,
            bins=binx_for_stats,
        )
        biny_err = biny_err + 1j * biny_err_imag

    # compute the number of powers in each frequency bin
    nsamples = np.array(
        [len(binno[np.where(binno == i)[0]]) for i in range(1, np.max(binno) + 1, 1)]
    )

    return binx, biny, biny_err, nsamples
def apply_function_if_none(variable, value, func):
    """
    Assign a function value to a variable if that variable has value ``None`` on input.

    Parameters
    ----------
    variable : object
        A variable with either some assigned value, or ``None``
    value : object
        A variable to go into the function
    func : function
        Function to apply to ``value``. Result is assigned to ``variable``

    Returns
    -------
    new_value : object
        The new value of ``variable``

    Examples
    --------
    >>> var = 4
    >>> value = np.zeros(10)
    >>> apply_function_if_none(var, value, np.mean)
    4
    >>> var = None
    >>> apply_function_if_none(var, value, lambda y: float(np.mean(y)))
    0.0
    """
    return func(value) if variable is None else variable
def assign_value_if_none(value, default):
    """
    Assign a value to a variable if that variable has value ``None`` on input.

    Parameters
    ----------
    value : object
        A variable with either some assigned value, or ``None``
    default : object
        The value to assign to the variable ``value`` if
        ``value is None`` returns ``True``

    Returns
    -------
    new_value : object
        The new value of ``value``
    """
    if value is None:
        return default
    return value
def look_for_array_in_array(array1, array2):
    """
    Find the first common value between two arrays.

    Parameters
    ----------
    array1 : iterable
        An array with values to be searched
    array2 : iterable
        A second array which potentially contains a subset of values
        also contained in ``array1``

    Returns
    -------
    value : object or None
        The first element of ``array1`` that is also contained in
        ``array2``, or ``None`` if the two arrays share no values.
    """
    for item in array1:
        if item in array2:
            return item
    return None
def is_string(s):
    """
    Portable function to answer whether a variable is a string.

    Parameters
    ----------
    s : object
        An object that is potentially a string

    Returns
    -------
    isstring : bool
        ``True`` when ``s`` is a string, ``False`` otherwise
    """
    return isinstance(s, str)
def is_iterable(var):
    """Test if a variable is an iterable.

    Parameters
    ----------
    var : object
        The variable to be tested for iterably-ness

    Returns
    -------
    is_iter : bool
        Returns ``True`` if ``var`` is an ``Iterable``, ``False`` otherwise
    """
    return isinstance(var, Iterable)
def order_list_of_arrays(data, order):
    """Sort each array in a collection according to the specified order.

    Parameters
    ----------
    data : iterable
        A dict of arrays, a list of arrays, or anything else (which maps
        to ``None``).
    order : iterable
        Index array applied to each contained array.

    Returns
    -------
    data : list or dict or None
        The same container type with every array reindexed by ``order``;
        ``None`` when ``data`` is not iterable.
    """
    if hasattr(data, "items"):
        return {key: value[order] for key, value in data.items()}
    if isinstance(data, Iterable):
        return [arr[order] for arr in data]
    return None
def optimal_bin_time(fftlen, tbin):
    """Vary slightly the bin time to have a power of two number of bins.

    Given an FFT length and a proposed bin time, return a bin time
    slightly shorter than the original, that will produce a power-of-two number
    of FFT bins.

    Parameters
    ----------
    fftlen : int
        Number of positive frequencies in a proposed Fourier spectrum
    tbin : float
        The proposed time resolution of a light curve

    Returns
    -------
    res : float
        A time resolution that will produce a Fourier spectrum with ``fftlen`` frequencies and
        a number of FFT bins that are a power of two
    """
    # Round the implied number of bins up to the next power of two, then
    # recompute the bin time that yields exactly that many bins.
    n_bin = 2 ** np.ceil(np.log2(fftlen / tbin))
    return fftlen / n_bin
def contiguous_regions(condition):
    """Find contiguous ``True`` regions of the boolean array ``condition``.

    Return a 2D array where the first column is the start index of the region
    and the second column is the end index, found on [so-contiguous]_.

    Parameters
    ----------
    condition : bool array

    Returns
    -------
    idx : ``[[i0_0, i0_1], [i1_0, i1_1], ...]``
        A list of integer couples, with the start and end of each ``True`` blocks
        in the original array

    Notes
    -----
    .. [so-contiguous] http://stackoverflow.com/questions/4494404/find-large-number-of-consecutive-values-fulfilling-condition-in-a-numpy-array
    """
    # Positions where the value flips between True and False.
    changes = np.logical_xor(condition[1:], condition[:-1])
    (boundaries,) = changes.nonzero()

    # A flip at position i means the new run starts at i + 1.
    boundaries = boundaries + 1

    # An initial True run starts at index 0 (no flip marks it).
    if condition[0]:
        boundaries = np.r_[0, boundaries]

    # A final True run ends at the array length (no flip marks it).
    if condition[-1]:
        boundaries = np.r_[boundaries, condition.size]

    # Pair up (start, end) boundaries, one row per True region.
    return boundaries.reshape((-1, 2))
def is_int(obj):
    """Test if object is an integer (either Python or NumPy flavor)."""
    return isinstance(obj, (numbers.Integral, np.integer))
def get_random_state(random_state=None):
    """Return a Mersenne Twister pseudo-random number generator.

    Parameters
    ----------
    random_state : integer or ``numpy.random.RandomState``, optional, default ``None``
        If ``None``, return NumPy's global generator; if an integer, seed a
        fresh ``RandomState`` with it; ``RandomState`` instances pass through.

    Returns
    -------
    random_state : mtrand.RandomState object

    Raises
    ------
    ValueError
        If ``random_state`` is neither ``None``, an integer, nor a
        ``numpy.random.RandomState`` instance.
    """
    # Use an identity check with None: the previous truthiness test
    # (`if not random_state:`) silently ignored seed 0 and returned the
    # unseeded global generator instead of RandomState(0).
    if random_state is None:
        random_state = np.random.mtrand._rand
    elif isinstance(random_state, (numbers.Integral, np.integer)):
        random_state = np.random.RandomState(random_state)
    elif not isinstance(random_state, np.random.RandomState):
        raise ValueError(
            "{value} can't be used to generate a numpy.random.RandomState".format(
                value=random_state
            )
        )
    return random_state
def _offset(x, off):
"""An offset."""
return off
def offset_fit(x, y, offset_start=0):
    """Fit a constant offset to the data.

    Parameters
    ----------
    x : array-like
    y : array-like
    offset_start : float
        Constant offset, initial value

    Returns
    -------
    offset : float
        Fitted offset
    """
    from scipy.optimize import curve_fit

    # The model is a constant: the independent variable is ignored.
    def constant_model(x, off):
        return off

    best_par, _ = curve_fit(constant_model, x, y, [offset_start], maxfev=6000)
    return best_par[0]
def _als(y, lam, p, niter=10):
    """Baseline Correction with Asymmetric Least Squares Smoothing.

    Modifications to the routine from Eilers & Boelens 2005 [eilers-2005]_.
    The Python translation is partly from [so-als]_.

    Parameters
    ----------
    y : array-like
        the data series corresponding to ``x``
    lam : float
        the lambda parameter of the ALS method. This control how much the
        baseline can adapt to local changes. A higher value corresponds to a
        stiffer baseline
    p : float
        the asymmetry parameter of the ALS method. This controls the overall
        slope tollerated for the baseline. A higher value correspond to a
        higher possible slope

    Other parameters
    ----------------
    niter : int
        The number of iterations to perform

    Returns
    -------
    z : array-like, same size as ``y``
        Fitted baseline.

    References
    ----------
    .. [eilers-2005] https://www.researchgate.net/publication/228961729_Technical_Report_Baseline_Correction_with_Asymmetric_Least_Squares_Smoothing
    .. [so-als] http://stackoverflow.com/questions/29156532/python-baseline-correction-library
    """
    from scipy import sparse

    L = len(y)
    # Build the second-difference operator D (shape L x (L-2)) directly in
    # CSC form: each column holds the stencil [1, -2, 1].
    indptr = np.arange(0, L - 1, dtype=np.int32) * 3
    indices = np.vstack(
        (np.arange(0, L - 2).T, np.arange(0, L - 2).T + 1, np.arange(0, L - 2).T + 2)
    ).T.flatten()
    data = np.tile([1, -2, 1], L - 2)
    D = sparse.csc_matrix((data, indices, indptr), shape=(L, L - 2))
    w = np.ones(L)
    for _ in range(niter):
        # Solve the penalized weighted least-squares system
        # (W + lam * D D^T) z = w*y, then update the weights asymmetrically:
        # points above the baseline get weight p, points below get 1 - p.
        W = sparse.spdiags(w, 0, L, L)
        Z = W + lam * D.dot(D.transpose())
        z = sparse.linalg.spsolve(Z, w * y)
        w = p * (y > z) + (1 - p) * (y < z)
    return z
def fix_segment_size_to_integer_samples(segment_size, dt, tolerance=0.01):
    """Fix segment size to an integer number of bins.

    In the most common case, it will be reduced to an integer number of bins,
    approximating to the lower integer. However, when it is close to the next
    integer, it will be approximated to the higher integer.

    Parameters
    ----------
    segment_size : float
        The segment size in seconds
    dt : float
        The sample time in seconds

    Other Parameters
    ----------------
    tolerance : float
        The tolerance to consider when approximating to the higher integer

    Returns
    -------
    segment_size : float
        The segment size in seconds, fixed to an integer number of bins
    n_bin : int
        The number of bins in the segment

    Examples
    --------
    >>> seg, n = fix_segment_size_to_integer_samples(1.0, 0.1)
    >>> assert seg == 1.0, n == 10
    >>> seg, n = fix_segment_size_to_integer_samples(0.999, 0.1)
    >>> assert seg == 1.0, n == 10
    """
    ratio = segment_size / dt
    # Round up when the segment is within ``tolerance`` of the next
    # integer number of bins; otherwise round down.
    if np.ceil(ratio) - ratio < tolerance:
        n_bin = np.ceil(ratio)
    else:
        n_bin = np.floor(ratio)
    return n_bin * dt, int(n_bin)
def baseline_als(x, y, lam=None, p=None, niter=10, return_baseline=False, offset_correction=False):
    """Baseline Correction with Asymmetric Least Squares Smoothing.

    Parameters
    ----------
    x : array-like
        the sample time/number/position
    y : array-like
        the data series corresponding to ``x``
    lam : float
        the lambda parameter of the ALS method. This control how much the
        baseline can adapt to local changes. A higher value corresponds to a
        stiffer baseline
    p : float
        the asymmetry parameter of the ALS method. This controls the overall
        slope tolerated for the baseline. A higher value correspond to a
        higher possible slope

    Other Parameters
    ----------------
    niter : int
        The number of iterations to perform
    return_baseline : bool
        return the baseline?
    offset_correction : bool
        also correct for an offset to align with the running mean of the scan

    Returns
    -------
    y_subtracted : array-like, same size as ``y``
        The initial time series, subtracted from the trend
    baseline : array-like, same size as ``y``
        Fitted baseline. Only returned if return_baseline is ``True``

    Examples
    --------
    >>> x = np.arange(0, 10, 0.01)
    >>> y = np.zeros_like(x) + 10
    >>> ysub = baseline_als(x, y)
    >>> assert np.all(ysub < 0.001)
    """
    # Defaults favor a very stiff baseline with strong asymmetry.
    if lam is None:
        lam = 1e11
    if p is None:
        p = 0.001
    z = _als(y, lam, p, niter=niter)
    ysub = y - z
    offset = 0
    if offset_correction:
        # Fit a constant offset to the baseline-subtracted data, excluding
        # outliers beyond 10 robust standard deviations (MAD).
        std = mad(ysub)
        good = np.abs(ysub) < 10 * std
        if len(x[good]) < 10:
            # Too few inliers for a meaningful fit: fall back to all points.
            good = np.ones(len(x), dtype=bool)
            warnings.warn(
                "Too few bins to perform baseline offset correction" " precisely. Beware of results"
            )
        offset = offset_fit(x[good], ysub[good], 0)
    if return_baseline:
        return ysub - offset, z + offset
    else:
        return ysub - offset
def excess_variance(lc, normalization="fvar"):
    r"""Calculate the excess variance.

    Vaughan et al. 2003, MNRAS 345, 1271 give three measurements of source
    intrinsic variance: if a light curve has a total variance of :math:`S^2`,
    and each point has an error bar :math:`\sigma_{err}`, the *excess variance*
    is defined as

    .. math:: \sigma_{XS} = S^2 - \overline{\sigma_{err}}^2;

    the *normalized excess variance* is the excess variance divided by the
    square of the mean intensity:

    .. math:: \sigma_{NXS} = \dfrac{\sigma_{XS}}{\overline{x}^2};

    the *fractional mean square variability amplitude*, or
    :math:`F_{var}`, is finally defined as

    .. math:: F_{var} = \sqrt{\dfrac{\sigma_{XS}}{\overline{x}^2}}

    Parameters
    ----------
    lc : a :class:`Lightcurve` object
    normalization : str
        if ``fvar``, return the fractional mean square variability :math:`F_{var}`.
        If ``none``, return the unnormalized excess variance variance
        :math:`\sigma_{XS}`. If ``norm_xs``, return the normalized excess variance
        :math:`\sigma_{XS}`

    Returns
    -------
    var_xs : float
    var_xs_err : float

    Raises
    ------
    ValueError
        If ``normalization`` is not one of ``fvar``, ``norm_xs``, ``none``.
    """
    lc_mean_var = np.mean(lc.counts_err**2)
    lc_actual_var = np.var(lc.counts)
    var_xs = lc_actual_var - lc_mean_var
    mean_lc = np.mean(lc.counts)
    mean_ctvar = mean_lc**2
    var_nxs = var_xs / mean_lc**2
    fvar = np.sqrt(var_xs / mean_ctvar)

    # Error terms from Vaughan et al. 2003, Eq. B2.
    N = len(lc.counts)
    var_nxs_err_A = np.sqrt(2 / N) * lc_mean_var / mean_lc**2
    var_nxs_err_B = np.sqrt(lc_mean_var / N) * 2 * fvar / mean_lc
    var_nxs_err = np.sqrt(var_nxs_err_A**2 + var_nxs_err_B**2)
    fvar_err = var_nxs_err / (2 * fvar)

    if normalization == "fvar":
        return fvar, fvar_err
    elif normalization == "norm_xs":
        return var_nxs, var_nxs_err
    elif normalization == "none" or normalization is None:
        return var_xs, var_nxs_err * mean_lc**2
    # Previously an unrecognized normalization silently fell through and
    # returned None; fail loudly instead.
    raise ValueError("normalization must be one of 'fvar', 'norm_xs' or 'none'")
def create_window(N, window_type="uniform"):
    """A method to create window functions commonly used in signal processing.

    Windows supported are:
    Hamming, Hanning, uniform (rectangular window), triangular window,
    blackmann window among others.

    Parameters
    ----------
    N : int
        Total number of data points in window. If negative, abs is taken.

    window_type : {``uniform``, ``parzen``, ``hamming``, ``hanning``, ``triangular``,\
            ``welch``, ``blackmann``, ``flat-top``}, optional, default ``uniform``
        Type of window to create.

    Returns
    -------
    window: numpy.ndarray
        Window function of length ``N``.
    """
    if not isinstance(N, int):
        raise TypeError("N (window length) must be an integer")

    # Accepted window names. Note: "blackmann" (and the coefficients below)
    # follow this module's historical spelling/values.
    windows = [
        "uniform",
        "parzen",
        "hamming",
        "hanning",
        "triangular",
        "welch",
        "blackmann",
        "flat-top",
    ]

    if not isinstance(window_type, str):
        raise TypeError("type of window must be specified as string!")

    window_type = window_type.lower()

    if window_type not in windows:
        raise ValueError("Wrong window type specified or window function is not available")

    # Return empty array as window if N = 0
    if N == 0:
        return np.array([])

    window = None

    N = abs(N)

    # Window samples index
    n = np.arange(N)

    # Constants
    N_minus_1 = N - 1
    N_by_2 = int((np.floor((N_minus_1) / 2)))

    # Create Windows
    if window_type == "uniform":
        window = np.ones(N)

    if window_type == "parzen":
        # Build one half of the piecewise-cubic Parzen window, then mirror
        # it around the center and truncate to length N.
        N_parzen = int(np.ceil((N + 1) / 2))

        N2_plus_1 = int(np.floor((N_parzen / 2))) + 1

        window = np.zeros(N_parzen)
        windlag0 = np.arange(0, N2_plus_1) / (N_parzen - 1)
        windlag1 = 1 - np.arange(N2_plus_1, N_parzen) / (N_parzen - 1)
        window[:N2_plus_1] = 1 - (1 - windlag0) * windlag0 * windlag0 * 6
        window[N2_plus_1:] = windlag1 * windlag1 * windlag1 * 2
        lagindex = np.arange(N_parzen - 1, 0, -1)
        window = np.concatenate((window[lagindex], window))
        window = window[:N]

    if window_type == "hamming":
        window = 0.54 - 0.46 * np.cos((2 * np.pi * n) / N_minus_1)

    if window_type == "hanning":
        window = 0.5 * (1 - np.cos(2 * np.pi * n / N_minus_1))

    if window_type == "triangular":
        window = 1 - np.abs((n - (N_by_2)) / N)

    if window_type == "welch":
        # Parabolic window centered on (N-1)/2.
        N_minus_1_by_2 = N_minus_1 / 2
        window = 1 - np.square((n - N_minus_1_by_2) / N_minus_1_by_2)

    if window_type == "blackmann":
        # Three-term cosine series ("exact Blackman"-style coefficients).
        a0 = 0.42659
        a1 = 0.49656
        a2 = 0.076849
        window = (
            a0 - a1 * np.cos((2 * np.pi * n) / N_minus_1) + a2 * np.cos((4 * np.pi * n) / N_minus_1)
        )

    if window_type == "flat-top":
        # Five-term flat-top cosine series.
        a0 = 1
        a1 = 1.93
        a2 = 1.29
        a3 = 0.388
        a4 = 0.028
        window = (
            a0
            - a1 * np.cos((2 * np.pi * n) / N_minus_1)
            + a2 * np.cos((4 * np.pi * n) / N_minus_1)
            - a3 * np.cos((6 * np.pi * n) / N_minus_1)
            + a4 * np.cos((8 * np.pi * n) / N_minus_1)
        )

    return window
def poisson_symmetrical_errors(counts):
    """Optimized version of frequentist symmetrical errors.

    Uses a lookup table in order to limit the calls to poisson_conf_interval

    Parameters
    ----------
    counts : iterable
        An array of Poisson-distributed numbers

    Returns
    -------
    err : numpy.ndarray
        An array of uncertainties associated with the Poisson counts in
        ``counts``

    Examples
    --------
    >>> from astropy.stats import poisson_conf_interval
    >>> counts = np.random.randint(0, 1000, 100)
    >>> # ---- Do it without the lookup table ----
    >>> err_low, err_high = poisson_conf_interval(np.asarray(counts),
    ...                 interval='frequentist-confidence', sigma=1)
    >>> err_low -= np.asarray(counts)
    >>> err_high -= np.asarray(counts)
    >>> err = (np.absolute(err_low) + np.absolute(err_high))/2.0
    >>> # Do it with this function
    >>> err_thisfun = poisson_symmetrical_errors(counts)
    >>> # Test that results are always the same
    >>> assert np.allclose(err_thisfun, err)
    """
    from astropy.stats import poisson_conf_interval

    counts_int = np.asarray(counts, dtype=np.int64)
    # Compute the confidence interval once per *distinct* count value
    # instead of once per element.
    distinct = np.nonzero(np.bincount(counts_int))[0]
    low, high = poisson_conf_interval(distinct, interval="frequentist-confidence", sigma=1)

    # calculate approximately symmetric uncertainties
    low = low - distinct
    high = high - distinct
    symmetrized = (np.absolute(low) + np.absolute(high)) / 2.0

    # Map every input count back to the error of its distinct value.
    return symmetrized[np.searchsorted(distinct, counts_int)]
def standard_error(xs, mean):
    """Return the standard error of the mean (SEM) of an array of arrays.

    Parameters
    ----------
    xs : 2-d float array
        List of data point arrays.
    mean : 1-d float array
        Average of the data points.

    Returns
    -------
    standard_error : 1-d float array
        Standard error of the mean (SEM).
    """
    n_seg = len(xs)
    residuals = np.asarray(xs) - mean
    # Unbiased (ddof=1) sample variance of each column, then the SEM.
    sample_var = (residuals**2).sum(axis=0) / (n_seg - 1)
    return np.sqrt(sample_var / n_seg)
[docs]
def nearest_power_of_two(x):
    """
    Return a number which is nearest to `x` and is the integral power of two.

    Parameters
    ----------
    x : int, float

    Returns
    -------
    x_nearest : int
        Number closest to `x` and is the integral power of two.
    """
    x = int(x)
    if x == 0:
        return 1
    # Candidate powers of two bracketing x, derived from the bit length of
    # the two nearest smaller integers (this reproduces the historical
    # rounding behavior of this function, including its tie-breaking).
    below = 1 << (x - 2).bit_length()
    above = 1 << (x - 1).bit_length()
    return below if (x - below) < (above - x) else above
[docs]
def find_nearest(array, value, side="left"):
    """
    Return the array value that is closest to the input value (Abigail Stevens:
    Thanks StackOverflow!)

    Parameters
    ----------
    array : np.array of ints or floats
        1-D array of numbers to search through. Should already be sorted
        from low values to high values.
    value : int or float
        The value you want to find the closest to in the array.

    Other Parameters
    ----------------
    side : str
        Look at the ``numpy.searchsorted`` documentation for more information.

    Returns
    -------
    array[idx] : int or float
        The array value that is closest to the input value.
    idx : int
        The index of the array of the closest value.
    """
    idx = np.searchsorted(array, value, side=side)
    past_end = idx == len(array)
    # If the insertion point is inside the array and its element is at least
    # as close as the previous one, it wins; otherwise fall back on the
    # element just before the insertion point.
    if not past_end and np.fabs(value - array[idx]) <= np.fabs(value - array[idx - 1]):
        return array[idx], idx
    return array[idx - 1], idx - 1
def check_iterables_close(iter0, iter1, **kwargs):
    """Check that the values produced by iterables are equal.

    Uses `np.isclose` if the iterables produce single values per iteration,
    `np.allclose` otherwise.

    Additional keyword arguments are passed to `np.allclose`
    and `np.isclose`.

    Parameters
    ----------
    iter0 : iterable
    iter1 : iterable

    Examples
    --------
    >>> iter0 = [0, 1]
    >>> iter1 = [0, 2]
    >>> check_iterables_close(iter0, iter1)
    False
    >>> iter0 = [(0, 0), (0, 1)]
    >>> iter1 = [(0, 0.), (0, 1.)]
    >>> assert check_iterables_close(iter0, iter1)
    >>> iter1 = [(0, 0.), (0, 3.)]
    >>> check_iterables_close(iter0, iter1)
    False
    """
    for item0, item1 in zip(iter0, iter1):
        if isinstance(item0, Iterable):
            # Compare whole sub-iterables at once.
            # Bug fix: forward **kwargs (rtol/atol/...) as documented; they
            # were previously accepted but silently ignored.
            if not np.allclose(item0, item1, **kwargs):
                return False
        elif not np.isclose(item0, item1, **kwargs):
            return False
    return True
def check_allclose_and_print(
    v1,
    v2,
    rtol=1e-05,
    atol=1e-08,
):
    """Check that the values in the arrays v1 and v2 are close.

    It prints the values that are different.
    Uses `np.allclose` and it has the option to specify rtol and atol.

    Parameters
    ----------
    v1 : array
    v2 : array
    rtol : float
        The relative tolerance parameter
    atol : float
        The absolute tolerance parameter

    Raises
    ------
    AssertionError
        If any pair of elements fails
        ``absolute(a - b) <= (atol + rtol * absolute(b))``; the message lists
        the offending values and their indices.
    """
    v1 = np.asarray(v1)
    v2 = np.asarray(v2)
    # Use an explicit raise rather than `assert`, which disappears under -O.
    if np.allclose(v1, v2, rtol, atol):
        return
    # Bug fix: the mismatch indices are np.where(bad), not np.where(v1[bad])
    # (the latter reported indices of nonzero *values*, not positions).
    bad = ~np.isclose(v1, v2, rtol, atol)
    raise AssertionError(
        f"Different values in the arrays checked by allclose: "
        f"{v1[bad]} vs {v2[bad]}, at indices {np.where(bad)[0]}"
    )
@njit(nogil=True, parallel=False)
def compute_bin(x, bin_edges):
    """Given a list of bin edges, get what bin will a number end up to

    Parameters
    ----------
    x : float
        The value to insert
    bin_edges: array
        The list of bin edges

    Returns
    -------
    bin : int
        The bin number. None if outside bin edges.

    Examples
    --------
    >>> bin_edges = np.array([0, 5, 10])
    >>> compute_bin(1, bin_edges)
    0
    >>> compute_bin(5, bin_edges)
    1
    >>> compute_bin(10, bin_edges)
    1
    >>> assert compute_bin(11, bin_edges) is None
    """
    # assuming uniform bins for now
    n = bin_edges.shape[0] - 1
    a_min = bin_edges[0]
    a_max = bin_edges[-1]
    # special case to mirror NumPy behavior for last bin
    if x == a_max:
        return n - 1  # a_max always in last bin
    # The linear mapping below is only valid for uniformly spaced edges.
    bin = int(n * (x - a_min) / (a_max - a_min))
    # Values falling outside [a_min, a_max) map to no bin at all.
    if bin < 0 or bin >= n:
        return None
    return bin
@njit(nogil=True, parallel=False)
def _hist1d_numba_seq(H, tracks, bins, ranges):
    """Core 1-d histogram kernel (Numba-compiled).

    Fills the pre-allocated histogram ``H`` in place and returns it.
    ``tracks`` are the values to histogram, ``bins`` the number of bins and
    ``ranges`` the two-element array ``[min, max]``. Values outside the range
    are silently discarded.
    """
    # Inverse bin width: multiply in the loop instead of dividing.
    delta = 1 / ((ranges[1] - ranges[0]) / bins)
    for t in range(tracks.size):
        # Fractional bin index of sample t.
        i = (tracks[t] - ranges[0]) * delta
        if 0 <= i < bins:
            H[int(i)] += 1
    return H
def _allocate_array_or_memmap(shape, dtype, use_memmap=False, tmp=None):
"""Allocate an array. If very big and user asks for it, allocate a memory map.
Parameters
----------
shape : tuple
Shape of the output array
dtype : str or anything compatible with `np.dtype`
Type of the output array
use_memmap : bool
If ``True`` and the number of bins is above 10 million,
the histogram is created into a memory-mapped Numpy array
tmp : str, default None
Temporary file name for the memory map (only relevant if
``use_memmap`` is ``True``). A temporary file with random
name is allocated if this is not specified.
Returns
-------
H : array
The output array
"""
if use_memmap and np.prod(shape) > 10**7:
if tmp is None:
tmp = tempfile.NamedTemporaryFile("w+", suffix=".npy").name
H = np.lib.format.open_memmap(tmp, mode="w+", dtype=dtype, shape=shape)
else:
H = np.zeros(shape, dtype=dtype)
return H
def hist1d_numba_seq(a, bins, range, use_memmap=False, tmp=None):
    """Numba-compiled 1-d histogram.

    Parameters
    ----------
    a : array-like
        Input array, to be histogrammed
    bins : integer
        number of bins in the final histogram
    range : [min, max]
        Minimum and maximum value of the histogram

    Other parameters
    ----------------
    use_memmap : bool
        If ``True`` and the number of bins is above 10 million,
        the histogram is created into a memory-mapped Numpy array
    tmp : str
        Temporary file name for the memory map (only relevant if
        ``use_memmap`` is ``True``)

    Returns
    -------
    histogram: array-like
        Histogrammed values of a, in ``bins`` bins.

    From https://iscinumpy.dev/post/histogram-speeds-in-python/

    Examples
    --------
    >>> if os.path.exists('out.npy'): os.unlink('out.npy')
    >>> x = np.random.uniform(0., 1., 100)
    >>> H, xedges = np.histogram(x, bins=5, range=[0., 1.])
    >>> Hn = hist1d_numba_seq(x, bins=5, range=[0., 1.], tmp='out.npy',
    ...                       use_memmap=True)
    >>> assert np.all(H == Hn)
    >>> # The number of bins is small, memory map was not used!
    >>> assert not os.path.exists('out.npy')
    >>> H, xedges = np.histogram(x, bins=10**8, range=[0., 1.])
    >>> Hn = hist1d_numba_seq(x, bins=10**8, range=[0., 1.],
    ...                       use_memmap=True, tmp='out.npy')
    >>> assert np.all(H == Hn)
    >>> assert os.path.exists('out.npy')  # Created!
    >>> # Here, instead, it will create a temporary file for the memory map
    >>> Hn = hist1d_numba_seq(x, bins=10**8, range=[0., 1.],
    ...                       use_memmap=True)
    >>> assert np.all(H == Hn)
    """
    out = _allocate_array_or_memmap((bins,), a.dtype, use_memmap=use_memmap, tmp=tmp)
    edges = np.asarray(range)
    return _hist1d_numba_seq(out, a, bins, edges)
@njit(nogil=True, parallel=False)
def _hist2d_numba_seq(H, tracks, bins, ranges):
    """Core 2-d histogram kernel (Numba-compiled).

    Fills the pre-allocated 2-d histogram ``H`` in place and returns it.
    ``tracks`` has shape (2, N) (x values in row 0, y in row 1); ``bins`` and
    ``ranges`` give the bin count and the [min, max] interval per dimension.
    Out-of-range samples are silently discarded.
    """
    # Per-axis inverse bin widths.
    delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins)
    for t in range(tracks.shape[1]):
        # Fractional bin indices along x and y.
        i = (tracks[0, t] - ranges[0, 0]) * delta[0]
        j = (tracks[1, t] - ranges[1, 0]) * delta[1]
        if 0 <= i < bins[0] and 0 <= j < bins[1]:
            H[int(i), int(j)] += 1
    return H
def hist2d_numba_seq(x, y, bins, range, use_memmap=False, tmp=None):
    """Numba-compiled 2-d histogram.

    From https://iscinumpy.dev/post/histogram-speeds-in-python/

    Parameters
    ----------
    x : array-like
        Input array, to be histogrammed
    y : array-like
        Input array (equal length to x), to be histogrammed
    bins : (int, int)
        shape of the final histogram
    range : [[xmin, xmax], [ymin, ymax]]
        Minimum and maximum value of the histogram, in each dimension

    Other parameters
    ----------------
    use_memmap : bool
        If ``True`` and the number of bins is above 10 million,
        the histogram is created into a memory-mapped Numpy array
    tmp : str
        Temporary file name for the memory map (only relevant if
        ``use_memmap`` is ``True``)

    Returns
    -------
    histogram: array-like
        Output Histogram

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> y = np.random.uniform(2., 3., 100)
    >>> H, xedges, yedges = np.histogram2d(x, y, bins=(5, 5),
    ...                                    range=[(0., 1.), (2., 3.)])
    >>> Hn = hist2d_numba_seq(x, y, bins=(5, 5),
    ...                       range=[[0., 1.], [2., 3.]])
    >>> assert np.all(H == Hn)
    >>> H, xedges, yedges = np.histogram2d(x, y, bins=(5000, 5000),
    ...                                    range=[(0., 1.), (2., 3.)])
    >>> Hn = hist2d_numba_seq(x, y, bins=(5000, 5000),
    ...                       range=[[0., 1.], [2., 3.]],
    ...                       use_memmap=True)
    >>> assert np.all(H == Hn)
    """
    out = _allocate_array_or_memmap(bins, np.uint64, use_memmap=use_memmap, tmp=tmp)
    samples = np.array([x, y])
    nbin = np.asarray(list(bins))
    edges = np.asarray(range)
    return _hist2d_numba_seq(out, samples, nbin, edges)
@njit(nogil=True, parallel=False)
def _hist3d_numba_seq(H, tracks, bins, ranges):
    """Core 3-d histogram kernel (Numba-compiled).

    Fills the pre-allocated 3-d histogram ``H`` in place and returns it.
    ``tracks`` has shape (3, N); ``bins`` and ``ranges`` give the bin count
    and the [min, max] interval per dimension. Out-of-range samples are
    silently discarded.
    """
    # Per-axis inverse bin widths.
    delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins)
    for t in range(tracks.shape[1]):
        i = (tracks[0, t] - ranges[0, 0]) * delta[0]
        j = (tracks[1, t] - ranges[1, 0]) * delta[1]
        k = (tracks[2, t] - ranges[2, 0]) * delta[2]
        # Bug fix: the third axis must be bounds-checked too, otherwise
        # out-of-range z values index outside H (or wrap around when
        # negative), corrupting the histogram.
        if 0 <= i < bins[0] and 0 <= j < bins[1] and 0 <= k < bins[2]:
            H[int(i), int(j), int(k)] += 1
    return H
def hist3d_numba_seq(tracks, bins, range, use_memmap=False, tmp=None):
    """Numba-compiled 3d histogram

    From https://iscinumpy.dev/post/histogram-speeds-in-python/

    Parameters
    ----------
    tracks : (array-like, array-like, array-like)
        List of input arrays of identical length, to be histogrammed
    bins : (int, int, int)
        shape of the final histogram
    range : [[xmin, xmax], [ymin, ymax], [zmin, zmax]]
        Minimum and maximum value of the histogram, in each dimension

    Other parameters
    ----------------
    use_memmap : bool
        If ``True`` and the number of bins is above 10 million,
        the histogram is created into a memory-mapped Numpy array
    tmp : str
        Temporary file name for the memory map (only relevant if
        ``use_memmap`` is ``True``)

    Returns
    -------
    histogram: array-like
        Output Histogram

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> y = np.random.uniform(2., 3., 100)
    >>> z = np.random.uniform(4., 5., 100)
    >>> H, _ = np.histogramdd((x, y, z), bins=(5, 6, 7),
    ...                       range=[(0., 1.), (2., 3.), (4., 5)])
    >>> Hn = hist3d_numba_seq((x, y, z), bins=(5, 6, 7),
    ...                       range=[[0., 1.], [2., 3.], [4., 5.]])
    >>> assert np.all(H == Hn)
    >>> H, _ = np.histogramdd((x, y, z), bins=(300, 300, 300),
    ...                       range=[(0., 1.), (2., 3.), (4., 5)])
    >>> Hn = hist3d_numba_seq((x, y, z), bins=(300, 300, 300),
    ...                       range=[[0., 1.], [2., 3.], [4., 5.]])
    >>> assert np.all(H == Hn)
    """
    out = _allocate_array_or_memmap(bins, np.uint64, use_memmap=use_memmap, tmp=tmp)
    samples = np.asarray(tracks)
    nbin = np.asarray(list(bins))
    edges = np.asarray(range)
    return _hist3d_numba_seq(out, samples, nbin, edges)
@njit(nogil=True, parallel=False)
def _hist1d_numba_seq_weight(H, tracks, weights, bins, ranges):
    """Core weighted 1-d histogram kernel (Numba-compiled).

    Same as `_hist1d_numba_seq`, but each sample ``tracks[t]`` contributes
    ``weights[t]`` instead of 1. ``H`` is filled in place and returned;
    out-of-range samples are silently discarded.
    """
    # Inverse bin width: multiply in the loop instead of dividing.
    delta = 1 / ((ranges[1] - ranges[0]) / bins)
    for t in range(tracks.size):
        i = (tracks[t] - ranges[0]) * delta
        if 0 <= i < bins:
            H[int(i)] += weights[t]
    return H
def hist1d_numba_seq_weight(a, weights, bins, range, use_memmap=False, tmp=None):
    """Numba-compiled 1-d histogram with weights.

    Parameters
    ----------
    a : array-like
        Input array, to be histogrammed
    weights : array-like
        Input weight of each of the input values ``a``
    bins : integer
        number of bins in the final histogram
    range : [min, max]
        Minimum and maximum value of the histogram

    Other parameters
    ----------------
    use_memmap : bool
        If ``True`` and the number of bins is above 10 million,
        the histogram is created into a memory-mapped Numpy array
    tmp : str
        Temporary file name for the memory map (only relevant if
        ``use_memmap`` is ``True``)

    Returns
    -------
    histogram: array-like
        Histogrammed values of a, in ``bins`` bins.

    Adapted from https://iscinumpy.dev/post/histogram-speeds-in-python/

    Examples
    --------
    >>> if os.path.exists('out.npy'): os.unlink('out.npy')
    >>> x = np.random.uniform(0., 1., 100)
    >>> weights = np.random.uniform(0, 1, 100)
    >>> H, xedges = np.histogram(x, bins=5, range=[0., 1.], weights=weights)
    >>> Hn = hist1d_numba_seq_weight(x, weights, bins=5, range=[0., 1.], tmp='out.npy',
    ...                              use_memmap=True)
    >>> assert np.all(H == Hn)
    >>> # The number of bins is small, memory map was not used!
    >>> assert not os.path.exists('out.npy')
    >>> H, xedges = np.histogram(x, bins=10**8, range=[0., 1.], weights=weights)
    >>> Hn = hist1d_numba_seq_weight(x, weights, bins=10**8, range=[0., 1.], tmp='out.npy',
    ...                              use_memmap=True)
    >>> assert np.all(H == Hn)
    >>> assert os.path.exists('out.npy')
    >>> # Now use memmap but do not specify a tmp file
    >>> Hn = hist1d_numba_seq_weight(x, weights, bins=10**8, range=[0., 1.],
    ...                              use_memmap=True)
    >>> assert np.all(H == Hn)
    """
    weights = np.asarray(weights)
    # Use the shared allocation helper, for consistency with the other
    # histogram wrappers (this also gives auto-generated memmap files the
    # proper .npy suffix, which the inline allocation here used to omit).
    # The accumulator takes the *weights* dtype: using a.dtype would
    # truncate float weights when ``a`` is an integer array.
    hist_arr = _allocate_array_or_memmap((bins,), weights.dtype, use_memmap=use_memmap, tmp=tmp)
    return _hist1d_numba_seq_weight(hist_arr, a, weights, bins, np.asarray(range))
@njit(nogil=True, parallel=False)
def _hist2d_numba_seq_weight(H, tracks, weights, bins, ranges):
    """Core weighted 2-d histogram kernel (Numba-compiled).

    Same as `_hist2d_numba_seq`, but each (x, y) sample contributes
    ``weights[t]`` instead of 1. ``H`` is filled in place and returned;
    out-of-range samples are silently discarded.
    """
    # Per-axis inverse bin widths.
    delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins)
    for t in range(tracks.shape[1]):
        i = (tracks[0, t] - ranges[0, 0]) * delta[0]
        j = (tracks[1, t] - ranges[1, 0]) * delta[1]
        if 0 <= i < bins[0] and 0 <= j < bins[1]:
            H[int(i), int(j)] += weights[t]
    return H
def hist2d_numba_seq_weight(x, y, weights, bins, range, use_memmap=False, tmp=None):
    """Numba-compiled 2d histogram with weights

    From https://iscinumpy.dev/post/histogram-speeds-in-python/

    Parameters
    ----------
    x : array-like
        List of input values in the x-direction
    y : array-like
        List of input values in the y-direction, of the same length of ``x``
    weights : array-like
        Input weight of each of the input values.
    bins : (int, int)
        shape of the final histogram
    range : [[xmin, xmax], [ymin, ymax]]
        Minimum and maximum value of the histogram, in each dimension

    Other parameters
    ----------------
    use_memmap : bool
        If ``True`` and the number of bins is above 10 million,
        the histogram is created into a memory-mapped Numpy array
    tmp : str
        Temporary file name for the memory map (only relevant if
        ``use_memmap`` is ``True``)

    Returns
    -------
    histogram: array-like
        Output Histogram

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> y = np.random.uniform(2., 3., 100)
    >>> weight = np.random.uniform(0, 1, 100)
    >>> H, xedges, yedges = np.histogram2d(x, y, bins=(5, 5),
    ...                                    range=[(0., 1.), (2., 3.)],
    ...                                    weights=weight)
    >>> Hn = hist2d_numba_seq_weight(x, y, bins=(5, 5),
    ...                              range=[[0., 1.], [2., 3.]],
    ...                              weights=weight)
    >>> assert np.all(H == Hn)
    """
    out = _allocate_array_or_memmap(bins, np.double, use_memmap=use_memmap, tmp=tmp)
    samples = np.array([x, y])
    nbin = np.asarray(list(bins))
    edges = np.asarray(range)
    return _hist2d_numba_seq_weight(out, samples, weights, nbin, edges)
@njit(nogil=True, parallel=False)
def _hist3d_numba_seq_weight(H, tracks, weights, bins, ranges):
    """Core weighted 3-d histogram kernel (Numba-compiled).

    Same as `_hist3d_numba_seq`, but each (x, y, z) sample contributes
    ``weights[t]`` instead of 1. ``H`` is filled in place and returned;
    out-of-range samples are silently discarded.
    """
    # Per-axis inverse bin widths.
    delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins)
    for t in range(tracks.shape[1]):
        i = (tracks[0, t] - ranges[0, 0]) * delta[0]
        j = (tracks[1, t] - ranges[1, 0]) * delta[1]
        k = (tracks[2, t] - ranges[2, 0]) * delta[2]
        # Bug fix: the third axis must be bounds-checked too, otherwise
        # out-of-range z values index outside H (or wrap around when
        # negative), corrupting the histogram.
        if 0 <= i < bins[0] and 0 <= j < bins[1] and 0 <= k < bins[2]:
            H[int(i), int(j), int(k)] += weights[t]
    return H
def hist3d_numba_seq_weight(tracks, weights, bins, range, use_memmap=False, tmp=None):
    """Numba-compiled weighted 3d histogram

    From https://iscinumpy.dev/post/histogram-speeds-in-python/

    Parameters
    ----------
    tracks : (x, y, z)
        List of input arrays of identical length, to be histogrammed
    weights : array-like
        List of weights for each point of the input arrays
    bins : (int, int, int)
        shape of the final histogram
    range : [[xmin, xmax], [ymin, ymax], [zmin, zmax]]
        Minimum and maximum value of the histogram, in each dimension

    Other parameters
    ----------------
    use_memmap : bool
        If ``True`` and the number of bins is above 10 million,
        the histogram is created into a memory-mapped Numpy array
    tmp : str
        Temporary file name for the memory map (only relevant if
        ``use_memmap`` is ``True``)

    Returns
    -------
    histogram: array-like
        Output Histogram

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> y = np.random.uniform(2., 3., 100)
    >>> z = np.random.uniform(4., 5., 100)
    >>> weights = np.random.uniform(0, 1., 100)
    >>> H, _ = np.histogramdd((x, y, z), bins=(5, 6, 7),
    ...                       range=[(0., 1.), (2., 3.), (4., 5)],
    ...                       weights=weights)
    >>> Hn = hist3d_numba_seq_weight(
    ...     (x, y, z), weights, bins=(5, 6, 7),
    ...     range=[[0., 1.], [2., 3.], [4., 5.]])
    >>> assert np.all(H == Hn)
    """
    out = _allocate_array_or_memmap(bins, np.double, use_memmap=use_memmap, tmp=tmp)
    samples = np.asarray(tracks)
    nbin = np.asarray(list(bins))
    edges = np.asarray(range)
    return _hist3d_numba_seq_weight(out, samples, weights, nbin, edges)
@njit(nogil=True, parallel=False)
def _index_arr(a, ix_arr):
    """Read ``a[tuple(ix_arr)]`` using manual stride arithmetic.

    Numba-friendly replacement for indexing an n-d array with an index
    array: the multi-dimensional index is converted into a flat offset via
    the array strides (expressed in elements, not bytes).
    """
    strides = np.array(a.strides) / a.itemsize
    ix = int((ix_arr * strides).sum())
    return a.ravel()[ix]
@njit(nogil=True, parallel=False)
def _index_set_arr(a, ix_arr, val):
    """Set ``a[tuple(ix_arr)] = val`` using manual stride arithmetic.

    Write-side counterpart of `_index_arr`: the multi-dimensional index is
    converted into a flat offset via the array strides (in elements), and
    the value is stored through the raveled view.
    """
    strides = np.array(a.strides) / a.itemsize
    ix = int((ix_arr * strides).sum())
    a.ravel()[ix] = val
@njit(nogil=True, parallel=False)
def _histnd_numba_seq(H, tracks, bins, ranges, slice_int):
    """Core n-dimensional histogram kernel (Numba-compiled).

    Fills the pre-allocated n-d histogram ``H`` in place and returns it.
    ``tracks`` has shape (ndim, N); ``bins`` and ``ranges`` give the bin
    count and [min, max] interval per dimension. ``slice_int`` is a
    caller-provided uint64 scratch array of length ndim, reused for every
    sample to avoid allocating inside the loop. Out-of-range samples are
    silently discarded.
    """
    # Per-axis inverse bin widths.
    delta = 1 / ((ranges[:, 1] - ranges[:, 0]) / bins)
    for t in range(tracks.shape[1]):
        # Fractional bin index of sample t along each dimension.
        slicearr = np.array(
            [(tracks[dim, t] - ranges[dim, 0]) * delta[dim] for dim in range(tracks.shape[0])]
        )
        # Sample is kept only if it falls inside the range on every axis.
        good = np.all((slicearr < bins) & (slicearr >= 0))
        # Truncate to integer bin indices (value only used when good).
        slice_int[:] = slicearr
        if good:
            # H[tuple(slice_int)] += 1, via the stride-arithmetic helpers
            # (fancy indexing with an index array is not supported in numba).
            curr = _index_arr(H, slice_int)
            _index_set_arr(H, slice_int, curr + 1)
    return H
def histnd_numba_seq(tracks, bins, range, use_memmap=False, tmp=None):
    """Numba-compiled n-d histogram

    From https://iscinumpy.dev/post/histogram-speeds-in-python/

    Parameters
    ----------
    tracks : (array-like, array-like, array-like)
        List of input arrays, to be histogrammed
    bins : (int, int, ...)
        shape of the final histogram
    range : [[min, max], ...]
        Minimum and maximum value of the histogram, in each dimension

    Other parameters
    ----------------
    use_memmap : bool
        If ``True`` and the number of bins is above 10 million,
        the histogram is created into a memory-mapped Numpy array
    tmp : str
        Temporary file name for the memory map (only relevant if
        ``use_memmap`` is ``True``)

    Returns
    -------
    histogram: array-like
        Output Histogram

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> y = np.random.uniform(2., 3., 100)
    >>> z = np.random.uniform(4., 5., 100)
    >>> # 2d example
    >>> H, _, _ = np.histogram2d(x, y, bins=np.array((5, 5)),
    ...                          range=[(0., 1.), (2., 3.)])
    >>> alldata = np.array([x, y])
    >>> Hn = histnd_numba_seq(alldata, bins=np.array([5, 5]),
    ...                       range=np.array([[0., 1.], [2., 3.]]))
    >>> assert np.all(H == Hn)
    >>> # 3d example
    >>> H, _ = np.histogramdd((x, y, z), bins=np.array((5, 6, 7)),
    ...                       range=[(0., 1.), (2., 3.), (4., 5)])
    >>> alldata = np.array([x, y, z])
    >>> Hn = histnd_numba_seq(alldata, bins=np.array((5, 6, 7)),
    ...                       range=np.array([[0., 1.], [2., 3.], [4., 5.]]))
    >>> assert np.all(H == Hn)
    """
    tracks = np.asarray(tracks)
    # Bug fix / consistency: convert bins and range like the 1d/2d/3d
    # wrappers do. The kernel indexes ranges[:, 0] / ranges[:, 1], which
    # fails if the caller passes plain (nested) lists.
    bins = np.asarray(bins)
    ranges = np.asarray(range)
    H = _allocate_array_or_memmap(tuple(bins), np.uint64, use_memmap=use_memmap, tmp=tmp)
    # Scratch index buffer reused by the kernel for every sample.
    slice_int = np.zeros(len(bins), dtype=np.uint64)
    return _histnd_numba_seq(H, tracks, bins, ranges, slice_int)
def _wrap_histograms(numba_func, weight_numba_func, np_func, *args, **kwargs):
    """Histogram wrapper.

    Make sure that the histogram fails safely if numba is not available or
    does not work. In particular, if weights are complex, it will split them
    in real and imaginary part.
    """
    weights = kwargs.pop("weights", None)
    use_memmap = kwargs.pop("use_memmap", False)
    tmp = kwargs.pop("tmp", None)

    if np.iscomplexobj(weights):
        # Histogram the real and imaginary parts separately, then recombine.
        common = dict(use_memmap=use_memmap, tmp=tmp, **kwargs)
        real_part = _wrap_histograms(
            numba_func, weight_numba_func, np_func, *args, weights=weights.real, **common
        )
        imag_part = _wrap_histograms(
            numba_func, weight_numba_func, np_func, *args, weights=weights.imag, **common
        )
        return real_part + imag_part * 1.0j

    if not HAS_NUMBA:
        return np_func(*args, weights=weights, **kwargs)[0]

    try:
        if weights is None:
            return numba_func(*args, use_memmap=use_memmap, tmp=tmp, **kwargs)
        if weight_numba_func is None:
            # Caught below, triggering the numpy fallback.
            raise TypeError("Weights not supported for this histogram")
        return weight_numba_func(*args, weights=weights, use_memmap=use_memmap, tmp=tmp, **kwargs)
    except (NumbaValueError, NumbaNotImplementedError, TypingError, TypeError):
        warnings.warn(
            "Cannot calculate the histogram with the numba implementation. "
            "Trying standard numpy."
        )
        return np_func(*args, weights=weights, **kwargs)[0]
def histogram3d(*args, **kwargs):
    """Histogram implementation.

    Accepts the same arguments as `numpy.histogramdd`, but tries to use a
    Numba implementation of the histogram. Bonus: weights can be complex.

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> y = np.random.uniform(2., 3., 100)
    >>> z = np.random.uniform(4., 5., 100)
    >>> # 3d example
    >>> H, _ = np.histogramdd((x, y, z), bins=np.array((5, 6, 7)),
    ...                       range=[(0., 1.), (2., 3.), (4., 5)])
    >>> Hn = histogram3d((x, y, z), bins=np.array((5, 6, 7)),
    ...                  range=[(0., 1.), (2., 3.), (4., 5)])
    >>> assert np.all(H == Hn)
    """
    implementations = (hist3d_numba_seq, hist3d_numba_seq_weight, histogramdd_np)
    return _wrap_histograms(*implementations, *args, **kwargs)
def histogramnd(*args, **kwargs):
    """Histogram implementation.

    Accepts the same arguments as `numpy.histogramdd`, but tries to use a
    Numba implementation of the histogram. Bonus: weights can be complex.

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> y = np.random.uniform(2., 3., 100)
    >>> z = np.random.uniform(4., 5., 100)
    >>> # 2d example
    >>> H, _, _ = np.histogram2d(x, y, bins=np.array((5, 5)),
    ...                          range=[(0., 1.), (2., 3.)])
    >>> Hn = histogramnd((x, y), bins=np.array([5, 5]),
    ...                  range=np.array([[0., 1.], [2., 3.]]))
    >>> assert np.all(H == Hn)
    >>> # 3d example
    >>> H, _ = np.histogramdd((x, y, z), bins=np.array((5, 6, 7)),
    ...                       range=[(0., 1.), (2., 3.), (4., 5)])
    >>> alldata = (x, y, z)
    >>> Hn = histogramnd(alldata, bins=np.array((5, 6, 7)),
    ...                  range=np.array([[0., 1.], [2., 3.], [4., 5.]]))
    >>> assert np.all(H == Hn)
    """
    # No weighted numba implementation exists for the generic n-d case;
    # _wrap_histograms falls back on numpy when weights are passed.
    implementations = (histnd_numba_seq, None, histogramdd_np)
    return _wrap_histograms(*implementations, *args, **kwargs)
def histogram2d(*args, **kwargs):
    """Histogram implementation.

    Accepts the same arguments as `numpy.histogram2d`, but tries to use a
    Numba implementation of the histogram. Bonus: weights can be complex.

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> y = np.random.uniform(2., 3., 100)
    >>> weight = np.random.uniform(0, 1, 100)
    >>> H, xedges, yedges = np.histogram2d(x, y, bins=(5, 5),
    ...                                    range=[(0., 1.), (2., 3.)],
    ...                                    weights=weight)
    >>> Hn = histogram2d(x, y, bins=(5, 5),
    ...                  range=[[0., 1.], [2., 3.]],
    ...                  weights=weight)
    >>> assert np.array_equal(H, Hn)
    >>> Hn1 = histogram2d(x, y, bins=(5, 5),
    ...                   range=[[0., 1.], [2., 3.]],
    ...                   weights=None)
    >>> Hn2 = histogram2d(x, y, bins=(5, 5),
    ...                   range=[[0., 1.], [2., 3.]])
    >>> assert np.array_equal(Hn1, Hn2)
    >>> Hn = histogram2d(x, y, bins=(5, 5),
    ...                  range=[[0., 1.], [2., 3.]],
    ...                  weights=weight + 1.j * weight)
    >>> assert np.array_equal(Hn.real, Hn.imag)
    >>> assert np.array_equal(H, Hn.real)
    """
    implementations = (hist2d_numba_seq, hist2d_numba_seq_weight, histogram2d_np)
    return _wrap_histograms(*implementations, *args, **kwargs)
def histogram(*args, **kwargs):
    """Histogram implementation.

    Accepts the same arguments as `numpy.histogram`, but tries to use a
    Numba implementation of the histogram. Bonus: weights can be complex.

    Examples
    --------
    >>> x = np.random.uniform(0., 1., 100)
    >>> weights = np.random.uniform(0, 1, 100)
    >>> H, xedges = np.histogram(x, bins=5, range=[0., 1.], weights=weights)
    >>> Hn = histogram(x, weights=weights, bins=5, range=[0., 1.], tmp='out.npy',
    ...                use_memmap=True)
    >>> assert np.array_equal(H, Hn)
    >>> Hn1 = histogram(x, weights=None, bins=5, range=[0., 1.])
    >>> Hn2 = histogram(x, bins=5, range=[0., 1.])
    >>> assert np.array_equal(Hn1, Hn2)
    >>> Hn = histogram(x, weights=weights + weights * 2.j, bins=5, range=[0., 1.],
    ...                tmp='out.npy', use_memmap=True)
    >>> assert np.array_equal(Hn.real, Hn.imag / 2)
    """
    implementations = (hist1d_numba_seq, hist1d_numba_seq_weight, histogram_np)
    return _wrap_histograms(*implementations, *args, **kwargs)
def equal_count_energy_ranges(energies, n_ranges, emin=None, emax=None):
    """Find energy ranges containing an approximately equal number of events.

    Parameters
    ----------
    energies : array-like
        List of event energies
    n_ranges : int
        Number of output ranges

    Other parameters
    ----------------
    emin : float, default None
        Minimum energy. Defaults to the minimum of ``energies``
    emax : float, default None
        Maximum energy. Defaults to the maximum of ``energies``

    Returns
    -------
    bin_edges : array-like
        Edges of the energy ranges, in a single array of length
        ``n_ranges+1``

    Examples
    --------
    >>> energies = np.random.uniform(0, 10, 1000000)
    >>> edges = equal_count_energy_ranges(energies, 5, emin=0, emax=10)
    >>> assert np.allclose(edges, [0, 2, 4, 6, 8, 10], atol=0.05)
    >>> edges = equal_count_energy_ranges(energies, 5)
    >>> assert np.allclose(edges, [0, 2, 4, 6, 8, 10], atol=0.05)
    >>> edges = equal_count_energy_ranges(energies, 0)
    >>> assert np.allclose(edges, [0, 10], atol=0.05)
    """
    # Robustness: accept any array-like, not only ndarrays (the code below
    # relies on ndarray methods and boolean masking).
    energies = np.asarray(energies)

    # Filter only when the user restricted the range explicitly; otherwise
    # all events are inside [min, max] by construction.
    explicit_bounds = emin is not None or emax is not None
    if emin is None:
        emin = energies.min()
    if emax is None:
        emax = energies.max()
    if explicit_bounds:
        energies = energies[(energies >= emin) & (energies <= emax)]

    if n_ranges <= 1:
        # Zero or one range: just the outer boundaries.
        return [emin, emax]

    # Interior edges are the percentiles that split the events evenly.
    inner_edges = np.percentile(energies, np.linspace(0, 100, n_ranges + 1)[1:-1])
    return np.concatenate([[emin], inner_edges, [emax]])
def sum_if_not_none_or_initialize(A, B):
    """If A is None, define A as a copy of B. Otherwise, sum A + B.

    Parameters
    ----------
    A : object
        The initial value
    B : object
        The value to be summed

    Examples
    --------
    >>> sum_if_not_none_or_initialize(None, 2)
    2
    >>> sum_if_not_none_or_initialize(1, 2)
    3
    """
    # Deep-copy on initialization so the accumulator never aliases B.
    return copy.deepcopy(B) if A is None else A + B
def assign_if_not_finite(value, default):
    """Check if a value is finite. Otherwise, return the default.

    Parameters
    ----------
    value : float, int or `np.array`
        The input value
    default : float
        The default value

    Returns
    -------
    output : same as ``value``
        The result

    Examples
    --------
    >>> assign_if_not_finite(1, 3.2)
    1
    >>> assign_if_not_finite(np.inf, 3.2)
    3.2
    >>> input_arr = np.array([np.nan, 1, np.inf, 2])
    >>> assert np.allclose(assign_if_not_finite(input_arr, 3.2), [3.2, 1, 3.2, 2])
    """
    if isinstance(value, Iterable):
        # Recurse element-wise and repackage as an array.
        return np.array([assign_if_not_finite(item, default) for item in value])
    return value if np.isfinite(value) else default
def sqsum(array1, array2):
    """Return the square root of the sum of the squares of two arrays."""
    return np.sqrt(np.square(array1) + np.square(array2))
@njit
def _int_sum_non_zero(array):
    """Sum all positive elements of an array of integers.

    Parameters
    ----------
    array : array-like
        Array of integers

    Returns
    -------
    int
        The sum of all strictly positive elements.
    """
    # Use `total` rather than shadowing the builtin `sum`.
    total = 0
    for value in array:
        if value > 0:
            total += int(value)
    return total