Source code for sloth.utils.arrays

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Arrays manipulation utilities
================================

Basic array manipulation using Numpy & Scipy
"""
import numpy as np
from scipy.interpolate import interp1d

from sloth.utils.logging import getLogger

_logger = getLogger("sloth.utils.arrays")


[docs] def imin(arr, debug=False): """index of minimum value""" _im = np.argmin(arr) if debug: _logger.debug("Check: {0} = {1}".format(np.min(arr), arr[_im])) return _im
[docs] def imax(arr, debug=False): """index of maximum value""" _im = np.argmax(arr) if debug: _logger.debug("Check: {0} = {1}".format(np.max(arr), arr[_im])) return _im
[docs] def lists_to_matrix(data, axis=None, **kws): """Convert two lists of 1D arrays to a 2D matrix Parameters ---------- data : list of lists of 1D arrays [ [x1, ... xN] [z1, ... zN] ] axis : None or array 1D, optional a reference axis used for the interpolation [None -> xdats[0]] **kws : optional keyword arguments for scipy.interpolate.interp1d() Returns ------- axis, outmat : arrays """ assert len(data) == 2, "'data' should be a list of two lists" xdats, zdats = data assert isinstance(xdats, list), "'xdats' not a list" assert isinstance(zdats, list), "'zdats' not a list" assert len(xdats) == len(zdats), "lists of data not of the same length" assert all(isinstance(z, np.ndarray) for z in zdats), "data in list must be arrays" if axis is None: axis = xdats[0] assert isinstance(axis, np.ndarray), "axis must be array" if all(z.size == axis.size for z in zdats): #: all same size return axis, np.array(zdats) else: #: interpolate outmat = np.zeros((len(zdats), axis.size)) for idat, (xdat, zdat) in enumerate(zip(xdats, zdats)): fdat = interp1d(xdat, zdat, **kws) znew = fdat(axis) outmat[idat] = znew return axis, outmat
[docs] def curves_to_matrix(curves, axis=None, **kws): """Convert a list of curves to a 2D data matrix Parameters ---------- curves : list of lists Curves format is the following: [ [x1, y1, label1, info1], ... [xN, yN, labelN, infoN] ] axis : None or array 1D, optional a reference axis used for the interpolation [None -> curves[0][0]] **kws : optional keyword arguments for func:`scipy.interpolate.interp1d` Returns ------- axis, outmat : arrays """ assert isinstance(curves, list), "'curves' not a list" assert all( (isinstance(curve, list) and len(curve) == 4) for curve in curves ), "curves should be lists of four elements" if axis is None: axis = curves[0][0] assert isinstance(axis, np.ndarray), "axis must be array" outmat = np.zeros((len(curves), axis.size)) for icurve, curve in enumerate(curves): assert len(curve) == 4, "wrong curve format, should contain four elements" x, y, label, info = curve try: assert isinstance(x, np.ndarray), "not array!" assert isinstance(y, np.ndarray), "not array!" except AssertionError: _logger.error( "[curve_to_matrix] Curve %d (%s) not containing arrays -> ADDING ZEROS", icurve, label, ) continue if (x.size == axis.size) and (y.size == axis.size): #: all same length outmat[icurve] = y else: #: interpolate fdat = interp1d(x, y, **kws) ynew = fdat(axis) outmat[icurve] = ynew _logger.debug("[curve_to_matrix] Curve %d (%s) interpolated", icurve, label) return axis, outmat
[docs] def sum_arrays_1d(data, axis=None, **kws): """Sum list of 1D arrays or curves by interpolation on a reference axis Parameters ---------- data : lists of lists data_fmt : str define data format - "curves" -> :func:`curves_to_matrix` - "lists" -> :func:`curves_to_matrix` Returns ------- axis, zsum : 1D arrays """ data_fmt = kws.pop("data_fmt", "curves") if data_fmt == "curves": ax, mat = curves_to_matrix(data, axis=axis, **kws) elif data_fmt == "lists": ax, mat = lists_to_matrix(data, axis=axis, **kws) else: raise NameError("'data_fmt' not understood") return ax, np.sum(mat, 0)
[docs] def avg_arrays_1d(data, axis=None, weights=None, **kws): """Average list of 1D arrays or curves by interpolation on a reference axis Parameters ---------- data : lists of lists data_fmt : str define data format - "curves" -> :func:`curves_to_matrix` - "lists" -> :func:`curves_to_matrix` weights : None or array weights for the average Returns ------- axis, zavg : 1D arrays np.average(zdats) """ data_fmt = kws.pop("data_fmt", "curves") if data_fmt == "curves": ax, mat = curves_to_matrix(data, axis=axis, **kws) elif data_fmt == "lists": ax, mat = lists_to_matrix(data, axis=axis, **kws) else: raise NameError("'data_fmt' not understood") return ax, np.average(mat, axis=0, weights=weights)
[docs] def merge_arrays_1d(data, method="average", axis=None, weights=None, **kws): """Merge a list of 1D arrays by interpolation on a reference axis Parameters ---------- data : lists of lists data_fmt : str define data format - "curves" -> :func:`curves_to_matrix` - "lists" -> :func:`curves_to_matrix` axis : None or array 1D, optional a reference axis used for the interpolation [None -> xdats[0]] method : str, optional method used to merge, available methods are: - "average" : uses np.average() - "sum" : uses np.sum() weights : None or array 1D, optional used if method == "average" Returns ------- axis, zmrg : 1D arrays merge(zdats) """ if method == "sum": return sum_arrays_1d(data, axis=axis, **kws) elif method == "average": return avg_arrays_1d(data, axis=axis, weights=weights, **kws) else: raise NameError("wrong 'method': %s" % method)
[docs] def rebin_piecewise_constant(x1, y1, x2): """Rebin histogram values y1 from old bin edges x1 to new edges x2. Code taken from: https://github.com/jhykes/rebin/blob/master/rebin.py It follows the procedure described in Figure 18.13 (chapter 18.IV.B. Spectrum Alignment, page 703) of Knoll [1] References ---------- [1] Glenn Knoll, Radiation Detection and Measurement, third edition, Wiley, 2000. Parameters ---------- - x1 : m+1 array of old bin edges. - y1 : m array of old histogram values. This is the total number in each bin, not an average. - x2 : n+1 array of new bin edges. Returns ------- - y2 : n array of rebinned histogram values. """ x1 = np.asarray(x1) y1 = np.asarray(y1) x2 = np.asarray(x2) # the fractional bin locations of the new bins in the old bins i_place = np.interp(x2, x1, np.arange(len(x1))) cum_sum = np.r_[[0], np.cumsum(y1)] # calculate bins where lower and upper bin edges span # greater than or equal to one original bin. # This is the contribution from the 'intact' bins (not including the # fractional start and end parts. whole_bins = np.floor(i_place[1:]) - np.ceil(i_place[:-1]) >= 1.0 start = cum_sum[np.ceil(i_place[:-1]).astype(int)] finish = cum_sum[np.floor(i_place[1:]).astype(int)] y2 = np.where(whole_bins, finish - start, 0.0) bin_loc = np.clip(np.floor(i_place).astype(int), 0, len(y1) - 1) # fractional contribution for bins where the new bin edges are in the same # original bin. same_cell = np.floor(i_place[1:]) == np.floor(i_place[:-1]) frac = i_place[1:] - i_place[:-1] contrib = frac * y1[bin_loc[:-1]] y2 += np.where(same_cell, contrib, 0.0) # fractional contribution for bins where the left and right bin edges are in # different original bins. different_cell = np.floor(i_place[1:]) > np.floor(i_place[:-1]) frac_left = np.ceil(i_place[:-1]) - i_place[:-1] contrib = frac_left * y1[bin_loc[:-1]] frac_right = i_place[1:] - np.floor(i_place[1:]) contrib += frac_right * y1[bin_loc[1:]] y2 += np.where(different_cell, contrib, 0.0) return y2
### MOVED TO sloth.utils.pymca // TODO: move to Larch #def reject_outliers(data, m=5.189, return_ma=False): # """Reject outliers # # Modified from: https://stackoverflow.com/questions/11686720/is-there-a-numpy-builtin-to-reject-outliers-from-a-list # See also: https://www.itl.nist.gov/div898/handbook/eda/section3/eda35h.htm # """ # if not isinstance(data, np.ndarray): # data = np.array(data) # d = np.abs(data - np.median(data)) # mdev = np.median(d) # s = d / (mdev if mdev else 1.0) # mask = s < m # if return_ma: # imask = s > m # return np.ma.masked_array(data=data, mask=imask) # else: # return data[mask] if __name__ == "__main__": pass