Source code for sloth.math.deglitch

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Deglitch utilities
=====================

"""
import numpy as np

import logging

_logger = logging.getLogger("sloth.math.deglitch")


[docs] def remove_spikes_medfilt1d(y_spiky, backend="silx", kernel_size=3, threshold=0.1): """Remove spikes in a 1D array using medfilt from silx.math Parameters ---------- y_spiky : array spiky data backend : str, optional library to use as backend - 'silx' -> from silx.math.medianfilter import medfilt1d - 'pymca' -> from PyMca5.PyMcaMath.PyMcaSciPy.signal import medfilt1d - 'pandas' : TODO kernel_size : int, optional kernel size where to calculate median, must be odd [3] threshold : float, optional relative difference between filtered and spiky data [0.1] Returns ------- array filtered array """ ynew = np.zeros_like(y_spiky) if not (kernel_size % 2): kernel_size += 1 _logger.warning("'kernel_size' must be odd -> adjusted to %d", kernel_size) if backend == "silx": return remove_spikes_silx(y_spiky, kernel_size=kernel_size, threshold=threshold) elif backend == "pymca": return remove_spikes_silx(y_spiky, kernel_size=kernel_size, threshold=threshold) elif backend == "pandas": return remove_spikes_pandas(y_spiky, window=kernel_size, threshold=threshold) else: _logger.warning("backend for medfilt1d not found! -> returning zeros") return ynew
[docs] def remove_spikes_silx(y_spiky, kernel_size=3, threshold=0.1): """Remove spikes in a 1D array using medfilt from silx.math Parameters ---------- y_spiky : array spiky data kernel_size : int, optional kernel size where to calculate median, must be odd [3] threshold : float, optional difference between filtered and spiky data relative [0.1] Returns ------- array filtered array """ ynew = np.zeros_like(y_spiky) try: from silx.math.medianfilter import medfilt1d except ImportError: _logger.warning("medfilt1d (from SILX) not found! -> returning zeros") return ynew y_filtered = medfilt1d( y_spiky, kernel_size=kernel_size, conditional=True, mode="nearest", cval=0 ) diff = y_filtered - y_spiky rel_diff = diff / y_filtered ynew = np.where(abs(rel_diff) > threshold, y_filtered, y_spiky) return ynew
[docs] def remove_spikes_pymca(y_spiky, kernel_size=9, threshold=0.66): """Remove spikes in a 1D array using medfilt from PyMca5.PyMcaMath.PyMcaScipy.signal Parameters ---------- y_spiky : array spiky data kernel_size : int, optional kernel size where to calculate median, should be odd [9] threshold : float, optional difference between filtered and spiky data in sigma units [0.66] Returns ------- array filtered array """ ynew = np.zeros_like(y_spiky) try: from PyMca5.PyMcaMath.PyMcaSciPy.signal import medfilt1d except ImportError: _logger.warning("medfilt1d (from PyMca5) not found! -> returning zeros") return ynew y_filtered = medfilt1d(y_spiky, kernel_size) diff = y_filtered - y_spiky mean = diff.mean() sigma = (y_spiky - mean) ** 2 sigma = np.sqrt(sigma.sum() / float(len(sigma))) ynew = np.where(abs(diff) > threshold * sigma, y_filtered, y_spiky) return ynew
[docs] def remove_spikes_pandas(y, window=3, threshold=3): """remove spikes using pandas Taken from `https://ocefpaf.github.io/python4oceanographers/blog/2015/03/16/outlier_detection/`_ .. note:: this will not work in pandas > 0.17 one could simply do `df.rolling(3, center=True).median()`; also df.as_matrix() is deprecated, use df.values instead Parameters ---------- y : array 1D window : int (optional) window in rolling median [3] threshold : int (optional) number of sigma difference with original data Return ------ ynew : array like x/y """ ynew = np.zeros_like(y) try: import pandas as pd except ImportError: _logger.error("pandas not found! -> returning zeros") return ynew df = pd.DataFrame(y) try: yf = ( pd.rolling_median(df, window=window, center=True) .fillna(method="bfill") .fillna(method="ffill") ) diff = yf.as_matrix() - y mean = diff.mean() sigma = (y - mean) ** 2 sigma = np.sqrt(sigma.sum() / float(len(sigma))) ynew = np.where(abs(diff) > threshold * sigma, yf.as_matrix(), y) except Exception: yf = ( df.rolling(window, center=True) .median() .fillna(method="bfill") .fillna(method="ffill") ) diff = yf.values - y mean = diff.mean() sigma = (y - mean) ** 2 sigma = np.sqrt(sigma.sum() / float(len(sigma))) ynew = np.where(abs(diff) > threshold * sigma, yf.values, y) # ynew = np.array(yf.values).reshape(len(x)) return ynew