#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""SpecfileData object to read data in SPEC_ format
===================================================
.. _SPEC: http://www.certif.com/content/spec
Requirements
------------
- silx (http://www.silx.org/doc/silx/dev/modules/io/specfilewrapper.html)
TODO
----
- _pymca_average() : use faster scipy.interpolate.interp1d
- implement a 2D normalization in get_map
- implement the case of dichroic measurements (two consecutive scans
with flipped helicity)
"""
__author__ = ["Mauro Rovezzi", "Matt Newville"]
__version__ = "2020-04_1 (sloth)" # to keep track between larch and sloth
import os
import numpy as np
from scipy.interpolate import interp1d
# from scipy.ndimage import map_coordinates
# to grid X,Y,Z columnar data
HAS_GRIDXYZ = False
try:
# from larch_plugins.math.gridxyz import gridxyz
from ..math.gridxyz import gridxyz
HAS_GRIDXYZ = True
except ImportError:
pass
HAS_SPECFILE = False
try:
from silx.io import specfilewrapper as specfile
HAS_SPECFILE = True
except ImportError:
pass
# SimpleMath from PyMca5
HAS_SIMPLEMATH = False
try:
from PyMca5.PyMcaMath import SimpleMath
HAS_SIMPLEMATH = True
except ImportError:
pass
# SG module from PyMca5
HAS_SGMODULE = False
try:
from PyMca5.PyMcaMath import SGModule
HAS_SGMODULE = True
except ImportError:
pass
# specfile_writer
HAS_SFDW = False
try:
from .specfile_writer import SpecfileDataWriter
HAS_SFDW = True
except (ValueError, ImportError):
pass
# ## UTILITIES (the class is below!)
from sloth.utils.strings import str2rng as _str2rng # noqa
def _mot2array(motor, acopy):
"""simple utility to generate a copy of an array containing a
constant value (e.g. motor position)
"""
a = np.ones_like(acopy)
return np.multiply(a, motor)
def _make_dlist(dall, rep=1):
"""make a list of strings representing the scans to average
Parameters
----------
dall : list of all good scans
rep : int, repetition
Returns
-------
dlist : list of lists of int
"""
dlist = [[] for d in range(rep)]
for idx in range(rep):
dlist[idx] = dall[idx::rep]
return dlist
def _checkZeroDiv(num, dnum):
"""compatibility layer"""
print("DEPRECATED: use '_check_zero_div' instead")
return _check_zero_div(num, dnum)
def _check_zero_div(num, dnum):
"""simple division check to avoid ZeroDivisionError"""
try:
return num / dnum
except ZeroDivisionError:
print("ERROR: found a division by zero")
def _checkScans(scans):
"""compatibility layer"""
print("DEPRECATED: use '_check_scans' instead")
return _check_scans(scans)
def _check_scans(scans):
"""simple checker for scans input"""
if scans is None:
raise NameError("Provide a string or list of scans to load")
if type(scans) is str:
try:
nscans = _str2rng(scans)
except Exception:
raise NameError(
"scans string '{0}' not understood by str2rng".format(scans)
)
elif type(scans) is list:
nscans = scans
else:
raise NameError("Provide a string or list of scans to load")
return nscans
def _numpy_sum_list(xdats, zdats):
"""sum list of arrays
Parameters
----------
xdats, zdats : lists of arrays
Returns
-------
xdats[0], sum(zdats)
"""
try:
# sum element-by-element
arr_zdats = np.array(zdats)
return xdats[0], np.sum(arr_zdats, 0)
except Exception:
# sum by interpolation
xref = xdats[0]
zsum = np.zeros_like(xref)
for xdat, zdat in zip(xdats, zdats):
fdat = interp1d(xdat, zdat)
zsum += fdat(xref)
return xref, zsum
def _pymca_average(xdats, zdats):
"""call to SimpleMath.average() method from PyMca/SimpleMath.py
Parameters
----------
- xdats, ydats : lists of arrays contaning the data to merge
Returns
-------
- xmrg, zmrg : 1D arrays containing the merged data
"""
if HAS_SIMPLEMATH:
sm = SimpleMath.SimpleMath()
print("Merging data...")
return sm.average(xdats, zdats)
else:
raise NameError(
"SimpleMath is not available -- this operation cannot be performed!"
)
def _pymca_SG(ydat, npoints=3, degree=1, order=0):
"""call to symmetric Savitzky-Golay filter in PyMca
Parameters
----------
ydat : 1D array contaning the data to smooth
npoints : integer [3], means that 2*npoints+1 values contribute
to the smoother.
degree : degree of fitting polynomial
order : is degree of implicit differentiation
0 means that filter results in smoothing of function
1 means that filter results in smoothing the first
derivative of function.
and so on ...
Returns
-------
ys : smoothed array
"""
if HAS_SGMODULE:
return SGModule.getSavitzkyGolay(
ydat, npoints=npoints, degree=degree, order=order
)
else:
raise NameError(
"SGModule is not available -- this operation cannot be performed!"
)
def savitzky_golay(y, window_size, order, deriv=0):
# code from from scipy cookbook
"""Smooth (and optionally differentiate) data with a Savitzky-Golay
filter. The Savitzky-Golay filter removes high frequency noise
from data. It has the advantage of preserving the original shape
and features of the signal better than other types of filtering
approaches, such as moving averages techhniques.
Parameters
----------
y : array_like, shape (N,)
the values of the time history of the signal.
window_size : int
the length of the window. Must be an odd integer
number.
order : int
the order of the polynomial used in the filtering. Must be
less then `window_size` - 1.
deriv: int
the order of the derivative to compute (default = 0 means
only smoothing)
Returns
-------
ys : ndarray, shape (N)
the smoothed signal (or it's n-th derivative).
Notes
-----
The Savitzky-Golay is a type of low-pass filter, particularly
suited for smoothing noisy data. The main idea behind this
approach is to make for each point a least-square fit with a
polynomial of high order over a odd-sized window centered at the
point.
Examples
--------
t = np.linspace(-4, 4, 500)
y = np.exp( -t**2 ) + np.random.normal(0, 0.05, t.shape)
ysg = savitzky_golay(y, window_size=31, order=4)
import matplotlib.pyplot as plt
plt.plot(t, y, label='Noisy signal')
plt.plot(t, np.exp(-t**2), 'k', lw=1.5, label='Original signal')
plt.plot(t, ysg, 'r', label='Filtered signal')
plt.legend()
plt.show()
References
----------
.. [1] A. Savitzky, M. J. E. Golay, Smoothing and Differentiation
of Data by Simplified Least Squares Procedures. Analytical
Chemistry, 1964, 36 (8), pp 1627-1639.
.. [2] Numerical Recipes 3rd Edition: The Art of Scientific
Computing W.H. Press, S.A. Teukolsky, W.T. Vetterling,
B.P. Flannery Cambridge University Press ISBN-13:
9780521880688
"""
try:
window_size = abs(int(window_size))
order = abs(int(order))
except ValueError:
raise ValueError("window_size and order have to be of type int")
if window_size % 2 != 1 or window_size < 1:
raise TypeError("window_size size must be a positive odd number")
if window_size < order + 2:
raise TypeError("window_size is too small for the polynomials order")
order_range = range(order + 1)
half_window = (window_size - 1) // 2
# precompute coefficients
b = np.mat(
[[k ** i for i in order_range] for k in range(-half_window, half_window + 1)]
)
m = np.linalg.pinv(b).A[deriv]
# pad the signal at the extremes with
# values taken from the signal itself
firstvals = y[0] - np.abs(y[1 : half_window + 1][::-1] - y[0])
lastvals = y[-1] + np.abs(y[-half_window - 1 : -1][::-1] - y[-1])
y = np.concatenate((firstvals, y, lastvals))
return np.convolve(m, y, mode="valid")
# ==================================================================
# MAIN CLASS
# ==================================================================
[docs]
class SpecfileData(object):
"""SpecfileData object"""
def __init__(
self,
fname=None,
cntx=1,
cnty=None,
csig=None,
cmon=None,
csec=None,
norm=None,
verbosity=0,
):
"""reads the given specfile
Parameters
----------
fname : SPEC file name [string, None]
if 'DUMMY!': return (used to get docstrings)
cntx : counter for x axis, motor 1 scanned [string, 1]
cnty : counter for y axis, motor 2 steps [string, None]
used by get_map()
csig : counter for signal [string, None]
cmon : counter for monitor/normalization [string, None]
csec : counter for time in seconds [string, None]
scnt : scan type [string, None]
norm : normalization [string, None]
'max' -> z/max(z)
'max-min' -> (z-min(z))/(max(z)-min(z))
'area' -> (z-min(z))/trapz(z, x)
'sum' -> (z-min(z)/sum(z)
verbosity : level of verbosity [int, 0]
Returns
-------
None, sets attributes.
self.fname -> spec file name
self.sf -> spec file object
self.cntx/cnty/csig/cmon/csec/norm
"""
self.verbosity = verbosity
if fname == "DUMMY!":
return
if HAS_SPECFILE is False:
if self.verbosity > 1:
print("WARNING 'specfile' is missing -> check requirements!")
return
if fname is None:
raise NameError("Provide a SPEC data file to load with full path")
elif not os.path.isfile(fname):
raise OSError("File not found: '%s'" % fname)
else:
if hasattr(self, "sf") and hasattr(self, "fname"):
if self.fname == fname:
pass
else:
self.sf = specfile.Specfile(fname) # sf = specfile file
self.fname = fname
if self.verbosity > 0:
print("Loaded: {0} ({1} scans)".format(fname, self.sf.scanno()))
# if HAS_SIMPLEMATH: self.sm = SimpleMath.SimpleMath()
# set common attributes
self.cntx = cntx
self.cnty = cnty
self.csig = csig
self.cmon = cmon
self.csec = csec
self.norm = norm
def get_scan(self, scan=None, scnt=None, **kws):
"""get a single scan from a SPEC file
Parameters
----------
scan : scan number to get [integer]
cntx : counter for x axis, motor 1 scanned [string]
cnty : counter for y axis, motor 2 steps [string] - used by get_map()
csig : counter for signal [string]
cmon : counter for monitor/normalization [string]
csec : counter for time in seconds [string]
scnt : scan type [string]
norm : normalization [string]
'max' -> z/max(z)
'max-min' -> (z-min(z))/(max(z)-min(z))
'area' -> (z-min(z))/trapz(z, x)
'sum' -> (z-min(z)/sum(z)
Returns
-------
scan_datx : 1D array with x data (scanned axis)
scan_datz : 1D array with z data (intensity axis)
scan_mots : dictionary with all motors positions for the given scan
NOTE: if cnty is given, it will return only scan_mots[cnty]
scan_info : dictionary with information on the scan
"""
if HAS_SPECFILE is False:
raise NameError("Specfile not available!")
# get keywords arguments
cntx = kws.get("cntx", self.cntx)
cnty = kws.get("cnty", self.cnty)
csig = kws.get("csig", self.csig)
cmon = kws.get("cmon", self.cmon)
csec = kws.get("csec", self.csec)
norm = kws.get("norm", self.norm)
# input checks
if scan is None:
raise NameError(
"Give a scan number [integer]: between 1 and {0}".format(
self.sf.scanno()
)
)
if cntx is None:
raise NameError("Give the counter for x, the abscissa [string]")
if cnty is not None and not (cnty in self.sf.allmotors()):
raise NameError("'{0}' is not in the list of motors".format(cnty))
if csig is None:
raise NameError("Give the counter for signal [string]")
# select the given scan number
# NOTE: here impossible to catch an exception, if the next
# fails, specfile will directly call sys.exit! the try: except
# did not work!
_scanstr = str(scan)
if "." in _scanstr:
_scansel = _scanstr
else:
_scansel = "{0}.1".format(_scanstr)
self.sd = self.sf.select(_scansel) # sd = specfile data
# the case cntx is not given, the first counter is taken by default
if cntx == 1:
_cntx = self.sd.alllabels()[0]
else:
_cntx = cntx
## x-axis
scan_datx = self.sd.data_column_by_name(_cntx)
_xlabel = "x"
_xscale = 1.0
if scnt is None:
# try to guess the scan type if it is not given
# this condition should work in case of an energy scan
if "ene" in _cntx.lower():
# this condition should detect if the energy scale is keV
if (scan_datx.max() - scan_datx.min()) < 3.0:
scan_datx = scan_datx * 1000
_xscale = 1000.0
_xlabel = "energy, eV"
else:
scan_datx = self.sd.data_column_by_name(cntx)
_xscale = 1.0
_xlabel = "energy, keV"
else:
raise NameError("Wrong scan type string")
# z-axis (start with the signal)
# data signal
datasig = self.sd.data_column_by_name(csig)
# data monitor
if cmon is None:
datamon = np.ones_like(datasig)
labmon = "1"
elif ("int" in str(type(cmon))) or ("float" in str(type(cmon))):
# the case we want to divide by a constant value
datamon = _mot2array(cmon, datasig)
labmon = str(cmon)
else:
datamon = self.sd.data_column_by_name(cmon)
labmon = str(cmon)
# data counts
if csec == "counts":
scan_datz = (datasig / datamon) * np.mean(datamon)
_zlabel = "((signal/{0})*mean({0}))".format(labmon)
elif csec is not None: # data in cps
scan_datz = (
(datasig / datamon) * np.mean(datamon)
) / self.sd.data_column_by_name(csec)
_zlabel = "((signal/{0})*mean({0}))/seconds".format(labmon)
else:
scan_datz = datasig / datamon
_zlabel = "signal/{0}".format(labmon)
# z-axis normalization, if required
if norm is not None:
_zlabel = "{0} norm by {1}".format(_zlabel, norm)
if norm == "max":
scan_datz = _check_zero_div(scan_datz, np.max(scan_datz))
elif norm == "max-min":
scan_datz = _check_zero_div(
scan_datz - np.min(scan_datz), np.max(scan_datz) - np.min(scan_datz)
)
elif norm == "area":
scan_datz = _check_zero_div(
scan_datz - np.min(scan_datz), np.trapz(scan_datz, x=scan_datx)
)
elif norm == "sum":
scan_datz = _check_zero_div(
scan_datz - np.min(scan_datz), np.sum(scan_datz)
)
else:
raise NameError("Provide a correct normalization type string")
# z-axis replace nan and inf, in case
scan_datz = np.nan_to_num(scan_datz)
# the motors dictionary
try:
scan_mots = dict(zip(self.sf.allmotors(), self.sd.allmotorpos()))
except Exception:
if self.verbosity > 0:
print("INFO: NO MOTORS IN {0}".format(self.fname))
scan_mots = {}
# y-axis
if cnty is not None:
_ylabel = "motor {0} at {1}".format(cnty, scan_mots[cnty])
else:
_ylabel = _zlabel
# collect information on the scan
scan_info = {
"xlabel": _xlabel,
"xscale": _xscale,
"ylabel": _ylabel,
"zlabel": _zlabel,
}
if cnty is not None:
return scan_datx, scan_datz, scan_mots[cnty] * _xscale
else:
return scan_datx, scan_datz, scan_mots, scan_info
def get_map(self, scans=None, **kws):
"""get a map composed of many scans repeated at different position of
a given motor
Parameters
----------
scans : scans to load in the map [string]; the format of the
string is intended to be parsed by '_str2rng()'
**kws : see get_scan() method
Returns
-------
xcol, ycol, zcol : 1D arrays representing the map
"""
# get keywords arguments
cntx = kws.get("cntx", self.cntx)
cnty = kws.get("cnty", self.cnty)
csig = kws.get("csig", self.csig)
cmon = kws.get("cmon", self.cmon)
csec = kws.get("csec", self.csec)
norm = kws.get("norm", self.norm)
# check inputs - some already checked in get_scan()
nscans = _check_scans(scans)
if cnty is None:
raise NameError("Provide the name of an existing motor")
#
_counter = 0
for scan in nscans:
x, z, moty = self.get_scan(
scan=scan,
cntx=cntx,
cnty=cnty,
csig=csig,
cmon=cmon,
csec=csec,
scnt=None,
norm=norm,
)
y = _mot2array(moty, x)
if self.verbosity > 0:
print("INFO loading scan {0} into the map...".format(scan))
if _counter == 0:
xcol = x
ycol = y
zcol = z
else:
xcol = np.append(xcol, x)
ycol = np.append(ycol, y)
zcol = np.append(zcol, z)
_counter += 1
return xcol, ycol, zcol
def grid_map(self, xcol, ycol, zcol, xystep=None, lib="scipy", method="cubic"):
if HAS_GRIDXYZ is True:
return gridxyz(xcol, ycol, zcol, xystep=xystep, lib=lib, method=method)
else:
return
def get_scans(self, scans=None, motinfo=True, **kws):
"""get a list of scans
Parameters
----------
scans : string or list of scans to load [None]; the format of the
string is intended to be parsed by '_str2rng()'
motinfo : boolean [True] returns also motors and scaninfo
dictionaries (see self.get_scan())
Returns
-------
xdats, zdats : list of arrays
if motinfo: return also mdats, idats dictionaries
"""
# get keywords arguments
cntx = kws.get("cntx", self.cntx)
csig = kws.get("csig", self.csig)
cmon = kws.get("cmon", self.cmon)
csec = kws.get("csec", self.csec)
norm = kws.get("norm", self.norm)
#
nscans = _check_scans(scans)
#
_ct = 0
xdats = []
zdats = []
mdats = []
idats = []
if self.verbosity > 0:
print("INFO loading {0} scans from SPEC ...".format(len(nscans)))
for scan in nscans:
_x, _z, _m, _i = self.get_scan(
scan=scan,
cntx=cntx,
cnty=None,
csig=csig,
cmon=cmon,
csec=csec,
scnt=None,
norm=norm,
)
xdats.append(_x)
zdats.append(_z)
if motinfo:
mdats.append(_m)
idats.append(_i)
print("Loading scan {0}...".format(scan))
_ct += 1
if motinfo:
return xdats, zdats, mdats, idats
else:
return xdats, zdats
def get_mrg(self, scans=None, action="average", **kws):
"""get a merged scan from a list of scans
Parameters
----------
scans : scans to load in the merge [string]
the format of the string is intended to be parsed by '_str2rng()'
action : action to perform on the loaded list of scans
'average' -> average the scans ( see _pymca_average() )
'sum' -> sum all zscans ( see _numpy_sum_list() )
'join' -> concatenate the scans
'single' -> scans_list[0] : equivalent to get_scan()
**kws : see get_scan() method
Returns
-------
xmrg, zmrg : 1D arrays
"""
# check inputs - some already checked in get_scan()/get_scans()
nscans = _check_scans(scans)
actions = ["single", "average", "sum", "join"]
if action not in actions:
raise NameError("'action={0}' not in known actions {1}".format(actions))
# moved to get_scans
xdats, zdats = self.get_scans(scans=nscans, motinfo=False, **kws)
# override 'action' keyword if it is only one scan
if len(nscans) == 1:
action = "single"
if self.verbosity > 1:
print("WARNING(get_mrg): len(scans)==1 -> 'action=single'")
if action == "average":
if self.verbosity > 0:
print("INFO: merging data...")
return _pymca_average(xdats, zdats)
elif action == "sum":
return _numpy_sum_list(xdats, zdats)
elif action == "join":
return np.concatenate(xdats, axis=0), np.concatenate(zdats, axis=0)
elif action == "single":
return xdats[0], zdats[0]
def get_mrgs_by(self, scans="all", nbin=1, **kws):
"""get merge by groups of scans
Parameters
----------
scans : string ['all'] to pass to _str2rng, if 'all',
sf.scanno() is taken
nbin : int [1], number of scans to merge together
Returns
-------
xmrgs, zmrgs : lists of merged arrays
"""
# get keywords arguments
cntx = kws.get("cntx", self.cntx)
csig = kws.get("csig", self.csig)
cmon = kws.get("cmon", self.cmon)
csec = kws.get("csec", self.csec)
norm = kws.get("norm", self.norm)
action = kws.get("action", "average")
#
xmrgs = []
zmrgs = []
if scans == "all":
scans = "{0}:{1}".format(1, self.sf.scanno())
try:
nScans = _str2rng(scans)
nAvg = nScans[::nbin]
except Exception:
raise NameError("wrong 'scans'/'nbin' parameters!")
nScansLast = len(nScans) % nbin
for iAvg, Avg in enumerate(nAvg):
iStart = iAvg * nbin
if Avg == nAvg[-1] and not nScansLast == 0:
if self.verbosity > 1:
print(
"WARNING avg {0} is of {1} scans only".format(iAvg, nScansLast)
)
nAdd = nScansLast
else:
nAdd = nbin
mscans = nScans[iStart : iStart + nAdd]
if self.verbosity > 0:
print("INFO avg {0}: scans='{1}'".format(iAvg, str(mscans)))
_xmrg, _zmrg = self.get_mrg(
scans=mscans,
action=action,
cntx=cntx,
cnty=None,
csig=csig,
cmon=cmon,
csec=csec,
scnt=None,
norm=norm,
)
xmrgs.append(_xmrg)
zmrgs.append(_zmrg)
return xmrgs, zmrgs
def get_mrgs_rep(self, scns, nrep=3, **kws):
"""get merge by groups of repetitions
Parameters
----------
scns : string
string representing ALL the good scans (parsed by str2rng)
"""
print("Not implemented yet!")
def get_det_dt(self, zcts, tau, secs=None):
"""get detector signal corrected by dead time
Parameters
----------
zcts : array of floats
detector [counts], if ysecs=None [counts/s]
tau : float
tau [s]
secs : array of floats, None
normalization time [s]
Returns
-------
zcts_corr : array of floats
zcps = zcts/secs
zcps_corr = zcps / (1 - zcps * tau)
zcts_corr = zcps_corr * secs
"""
if secs is not None:
try:
zcts = zcts / secs
except Exception:
print("det_dtc ERROR")
return zcts
try:
# import pdb
# pdb.set_trace()
# print(zcps)
zcts_corr = zcts / (1 - zcts * tau)
except Exception:
print("det_dtc ERROR step 2")
return zcts
if secs is not None:
return zcts_corr * secs
else:
return zcts_corr
def get_filter(self, ydats, method="scipySG", **kws):
"""get filtered data using a list of ydats and given method
Parameters
----------
ydats : list of 1D arrays
method : 'scipySG' -> Savitsky Golay filter from Scipy
(see savitzky_golay())
'pymcaSG' -> Savitsky Golay filter from PyMca
(see _pymca_SG())
Returns
-------
ysdats : list of 1D smoothed arrays
"""
if method == "pymcaSG":
npoints = kws.get("npoints", 9)
degree = kws.get("degree", 4)
order = kws.get("order", 0)
ysdats = []
if self.verbosity > 0:
print("INFO smoothing data with Savitzky-Golay filter (pymca)...")
for y in ydats:
ysdats.append(_pymca_SG(y, npoints=npoints, degree=degree, order=order))
return ysdats
elif method == "scipySG":
window_size = kws.get("window_size", 9)
order = kws.get("order", 4)
deriv = kws.get("deriv", 0)
ysdats = []
if self.verbosity > 0:
print("INFO smoothing data with Savitzky-Golay filter (scipy)...")
for y in ydats:
ysdats.append(
savitzky_golay(y, window_size=window_size, order=order, deriv=deriv)
)
return ysdats
else:
raise NameError("method not known!")
def write_ascii(self, scans, **kws):
"""export single scans to separate ascii files"""
if not HAS_SFDW:
raise ImportError("specfiledatawriter required for this method!!!")
# get keywords arguments
cntx = kws.get("cntx", self.cntx)
csig = kws.get("csig", self.csig)
cmon = kws.get("cmon", self.cmon)
csec = kws.get("csec", self.csec)
norm = kws.get("norm", self.norm)
nscans = _check_scans(scans)
for scn in nscans:
x, y, m, i = self.get_scan(
scan=scn,
scnt=None,
cntx=cntx,
cnty=None,
csig=csig,
cmon=cmon,
csec=csec,
norm=norm,
)
fout = SpecfileDataWriter(
"{0}_S{1}".format(self.fname, str(scn).rjust(3, "0"))
)
fout.wHeader(
epoch=self.sf.epoch(),
date=self.sf.date(),
title="spec2spec",
motnames=self.sf.allmotors(),
)
fout.wScan(
["Energy", "{0}".format(i["zlabel"])],
[x, y],
title="{0}".format(self.sd.command()),
motpos=self.sd.allmotorpos(),
)
# LARCH ###
def _specfiledata_getdoc(method):
"""to get the docstring of method inside a class"""
s = SpecfileData("DUMMY!")
head = "\n Docstring from {0}:\n -------------------\n".format(method)
return head + getattr(getattr(s, method), "__doc__")
def spec_getscan2group(
fname,
scan=None,
cntx=None,
csig=None,
cmon=None,
csec=None,
scnt=None,
norm=None,
_larch=None,
):
"""*** simple mapping of SpecfileData.get_scan() to Larch group ***"""
if _larch is None:
raise Warning("larch broken?")
s = SpecfileData(fname)
group = _larch.symtable.create_group()
group.__name__ = "SPEC data file %s" % fname
x, y, motors, infos = s.get_scan(
scan=scan, cntx=cntx, csig=csig, cmon=cmon, csec=csec, scnt=scnt, norm=norm
)
setattr(group, "x", x)
setattr(group, "y", y)
setattr(group, "motors", motors)
setattr(group, "infos", infos)
return group
spec_getscan2group.__doc__ += _specfiledata_getdoc("get_scan")
def spec_getmap2group(
fname,
scans=None,
cntx=None,
cnty=None,
csig=None,
cmon=None,
csec=None,
xystep=None,
norm=None,
_larch=None,
):
""" *** simple mapping of SpecfileData.get_map() + grid_map () to Larch group *** """
if _larch is None:
raise Warning("larch broken?")
s = SpecfileData(fname)
group = _larch.symtable.create_group()
group.__name__ = "SPEC data file %s" % fname
xcol, ycol, zcol = s.get_map(
scans=scans, cntx=cntx, cnty=cnty, csig=csig, cmon=cmon, csec=csec, norm=norm
)
x, y, zz = s.grid_map(xcol, ycol, zcol, xystep=xystep)
setattr(group, "x", x)
setattr(group, "y", y)
setattr(group, "zz", zz)
return group
spec_getmap2group.__doc__ += _specfiledata_getdoc("get_map")
def spec_getmrg2group(
fname,
scans=None,
cntx=None,
csig=None,
cmon=None,
csec=None,
norm=None,
action="average",
_larch=None,
):
"""*** simple mapping of SpecfileData.get_mrg() to Larch group ***"""
if _larch is None:
raise Warning("larch broken?")
s = SpecfileData(fname)
group = _larch.symtable.create_group()
group.__name__ = "SPEC data file {0}; scans {1}; action {2}".format(
fname, scans, action
)
x, y = s.get_mrg(
scans=scans,
cntx=cntx,
csig=csig,
cmon=cmon,
csec=csec,
norm=norm,
action=action,
)
setattr(group, "x", x)
setattr(group, "y", y)
return group
spec_getmrg2group.__doc__ += _specfiledata_getdoc("get_mrg")
def str2rng_larch(rngstr, keeporder=True, _larch=None):
""" larch equivalent of _str2rng() """
if _larch is None:
raise Warning("larch broken?")
return _str2rng(rngstr, keeporder=keeporder)
str2rng_larch.__doc__ = _str2rng.__doc__
def registerLarchPlugin():
if HAS_SPECFILE:
return (
"_io",
{
"read_specfile_scan": spec_getscan2group,
"read_specfile_map": spec_getmap2group,
"read_specfile_mrg": spec_getmrg2group,
"str2rng": str2rng_larch,
},
)
else:
return ("_io", {})
if __name__ == "__main__":
""" test/examples in examples/specfiledata_test.py """
# test01()
# test02(100)
# test03
pass