Commit c5dd51d8 authored by operator ID01's avatar operator ID01
Browse files

speclientwrapper

parent ce4a3c44
import configparser
import os
from scipy.spatial.distance import pdist, squareform
import numpy as np
import transforms3d as tfs
import id01lib as id01
# originally written by SL to work with NFFA coordinate system transformation between instruments.
# TODO skip deleted groups in spec grabber
# could be turned into a class one day.
# parse the config file to something python can parse in a uniform way
def parseTXT2config(file_name): # use scale =1000 for microns
infile=open(file_name,'r')
lines=infile.readlines()
file_name_cfg=file_name.split('.')[0]+'.cfg'
outfile=open(file_name_cfg,'w')
# split the StagMapPositions into identifiable sections
ID=0
for line in lines:
if line.find('StageMapPosition')==1:
outfile.write('[Position_%i]\n'%ID)
ID+=1
else:
outfile.write(line)
infile.close()
outfile.close()
# parse with configparser and organise correctly
_config = configparser.ConfigParser()
_config.read(file_name_cfg)
_config.remove_section('StagePositionMapFile')
for section in _config.sections():
section_newname = _config.get(section,'positionname')
_config.add_section(section_newname)
for option,value in _config.items(section):
if option !='positionname':
_config.set(section_newname, option, value)
_config.remove_section(section)
# write the config parser file
outfile=open(file_name_cfg,'w')
_config.write(outfile)
outfile.close()
'm'
def get_config(file_name_cfg):
_config = configparser.ConfigParser()
_config.read(file_name_cfg)
return _config
def get_pos_config(file_name_cfg,pos_name):
_config = configparser.ConfigParser()
_config.read(file_name_cfg)
A=np.array([0,0])
for section in _config.sections():
print(section)
if section == pos_name:
A=np.vstack((A,np.array([np.float(_config.get(section,'xcoordinate')),np.float(_config.get(section,'ycoordinate'))])))
return pos_name, A[1:]
def get_pars_pos_config(file_name_cfg,pos_name,scale=1.0): # scale=1.0 assumes metres
_config = configparser.ConfigParser()
_config.read(file_name_cfg)
par=dict(_config.items(pos_name))
names=[]
formats=[]
values=[]
for entry in par:
names.append(entry)
formats.append('f8')
values.append(np.float(par[entry])*scale)
dtype = dict(names = names, formats=formats)
array = np.array(tuple(values), dtype=dtype)
return array
def gen_distmatrix4mpoints(file_name_cfg):
_config = configparser.ConfigParser()
_config.read(file_name_cfg)
A=np.array([0,0])
for section in _config.sections():
A=np.vstack((A,np.array([np.float(_config.get(section,'xcoordinate')),np.float(_config.get(section,'ycoordinate'))])))
dist_mat = squareform(pdist(A[1:], metric="euclidean"))
return dist_mat
def gen_matrix4mpoints(file_name_cfg):
_config = configparser.ConfigParser()
_config.read(file_name_cfg)
A=np.array([0,0])
for section in _config.sections():
A=np.vstack((A,np.array([np.float(_config.get(section,'xcoordinate')),np.float(_config.get(section,'ycoordinate'))])))
Xarr=A[1:].copy()
Xarr[:,1]=0
Yarr=A[1:].copy()
Yarr[:,0]=0
# workaround set alternate axis= 0 to get individual components
Xdist_mat = squareform(pdist(Xarr, metric="euclidean"))
Ydist_mat = squareform(pdist(Yarr, metric="euclidean"))
outarr = np.zeros((Xdist_mat.shape[0],Xdist_mat.shape[1],2))
outarr[:,:,0]=Xdist_mat
outarr[:,:,1]=Ydist_mat
#X=squareform(pdist(A[0,1:], metric="euclidean"))
#Y=squareform(pdist(A[1,1:], metric="euclidean"))
#return X,Y,dist_mat
return outarr
def apply_corr_factors(file_name_cfg,angle,x_factor,y_factor, outfile_append='_scaled.cfg'):
#apply directly to the config file - convert to matrix and offset * multiply
_config = configparser.ConfigParser()
_config.read(file_name_cfg)
for section in _config.sections():
# read config values
X,Y = np.float(_config.get(section,'xcoordinate')),np.float(_config.get(section,'ycoordinate'))
# apply rotation and mutiplaction factor in matrix form so can take any type of correction in future
#newmat = (Rz(angle) @ np.array([X,Y,0])) @ np.array([[x_factor,0,0],[0,y_factor,0],[0,0,0]])
newmat = np.dot(np.dot(Rz(angle),np.array([X,Y,0])),np.array([[x_factor,0,0],[0,y_factor,0],[0,0,0]]))
# overwrite existing section in config
_config.set(section,'xcoordinate',str(newmat[0]))
_config.set(section,'ycoordinate',str(newmat[1]))
# write the new config parser file with '_scaled.cfg' appended to the name
out_fn = file_name_cfg.split('.')[0]+outfile_append
outfile=open(out_fn,'w')
_config.write(outfile)
outfile.close()
return gen_matrix4mpoints(out_fn)
def Rz(angle):
return tfs.euler.euler2mat(0,0,np.deg2rad(angle))
def Ry(angle):
return tfs.euler.euler2mat(0,np.deg2rad(angle),0)
def Rx(angle):
return tfs.euler.euler2mat(np.deg2rad(angle),0,0)
def SIFT_find_correction_matrix():
pass
def gen_output_file(file_name_cfg,out_fn='result.txt',array_id='test'):
_config = configparser.ConfigParser()
_config.read(file_name_cfg)
array = gen_matrix4mpoints(file_name_cfg)
outfile=open(out_fn,'a+')
outfile.write(array_id+'\t')
[outfile.write(section+'\t') for section in _config.sections()]
outfile.write('\n')
for i,section in enumerate(_config.sections()):
outfile.write(section+'\t')
for j in np.arange(array.shape[0]):
outfile.write('(%.6f,%.6f)\t'%(array[i,j,0],array[i,j,1]))
outfile.write('\n')
outfile.close()
def get_group_from_spec(groupname,group_dict):
mot_names=[]
outdict={}
for j in range(1,int(group_dict[groupname]['number'])+1):
mot_names.append(group_dict[groupname]['motor_%i'%j])
for i in range(1,int(group_dict[groupname]['positions'])+1):
pos=[float(x) for x in group_dict[groupname]['position_%i'%i].split()]
formats=['f8']*len(pos)
dtype = dict(names = mot_names, formats=formats)
outdict[group_dict[groupname][str(i)]]=np.array(tuple(pos), dtype=dtype)
return outdict
if __name__ == '__main__':
input_file_name = "SampleonSTMholder.txt"
# parse the txt file in a sensible way
parseTXT2config(input_file_name)
file_name_cfg = "SampleonSTMholder.cfg"
#X,Y,dist_mat=gen_matrix4mpoints(file_name_cfg)
mat=gen_matrix4mpoints(file_name_cfg)
#mat_scaled=apply_corr_factors(file_name_cfg,90,1,1,outfile_append='_scaled.cfg')
output_file_name = "result_file.txt"
gen_output_file(file_name_cfg,out_fn=output_file_name,array_id='MGERMANY')
#file_name_cfg = "SampleonSTMholder_scaled.cfg"
#gen_output_file(file_name_cfg,out_fn=output_file_name,array_id='MFRANCE')
#!/usr/bin/env python
# taken from pscan_align (Steven Leake)
# reduced by Carsten Richter
# will be extended again later
# wishlist/todo:
# TODO: add shexacor to specfile inspector, complete spec interface here + test
# -
import os
import collections
import numpy as np
from silx.io.specfile import SpecFile
from id01lib.plot.interactive import GenericIndexTracker, GenericIndexTracker1D
from id01lib.process import SpecClientWrapper
ScanRange = collections.namedtuple("ScanRange", ['name', 'start', 'stop', 'numpoints'])
class PScanTracker(GenericIndexTracker):
"""
This tracker child does the connection to spec
"""
def __init__(self, ax, specclient, norm="linear",
quantum=1., transposed=True, exit_onclick=False, rectangle_onclick=False):
"""
Class to fill a matplotlib axes with with data
from fast spec scans (`pscan`)
Inputs:
ax : matplotlib.Axes
axes used to show data
specclient : SpecClientWrapper.SpecClientSession
a wrapped SpecClient instance to handle
communication with spec
norm : str, float
normalization of the colormap
"""
if not isinstance(specclient, SpecClientWrapper.SpecClientSession):
raise ValueError("Need `SpecClientWrapper.SpecClientSession` "
"instance as second argument.")
# get command
self.pscan_vars = pscan_vars = specclient.get_sv("PSCAN_ARR")
self.specclient = specclient
self.command = cmd = pscan_vars["header/cmd"].split()
print(cmd)
lima_roi, device = specclient.find_roi_list()
# pscan motor name, start, stop, numpoints
m1 = ScanRange(cmd[1], float(cmd[2]), float(cmd[3]), int(cmd[4]))
m2 = ScanRange(cmd[5], float(cmd[6]), float(cmd[7]), int(cmd[8]))
self.x, self.y = (m2, m1) if transposed else (m1, m2)
self.transposed = transposed # to look similar to pymca
data = self.load_data()
self._args = norm, quantum, transposed
super(PScanTracker, self).__init__(ax, data, norm, quantum, exit_onclick)
self.ax.set_xlabel(self.x.name)
self.ax.set_ylabel(self.y.name)
self.set_extent(self.x.start, self.x.stop, self.y.start, self.y.stop)
self.rois = lima_roi #['%i'%(i+1)] for i in range(self.slices)]
self.set_axes_properties(title=self.rois)
basename = os.path.basename(pscan_vars["file"])
scan_no = pscan_vars["scan_no"]
self.start_time = start_time = pscan_vars["timestamp/_pscan_doscan01"]
title = "File: %s; Scan: %s; %s"%(basename, scan_no, start_time)
if hasattr(self, "figtitle"):
self.figtitle.set_text(title)
else:
self.figtitle = self.fig.suptitle(title)
def load_data(self):
data = self.specclient.get_sv("PSCAN_ROICOUNTER_DATA")[1:]
try:
data = data.reshape((-1, self.x.numpoints, self.y.numpoints, 7))[:,:,:,2] # what is this 7?? answer: 2 is the sum, there are other things like max.
except ValueError:
print("Warning: reshape failed.")
return self.data
if self.transposed:
data = data.transpose(0, 2, 1)
if not (data>0).any():
return self.data
return data
def re_init(self):
return self.__init__(self.ax, self.specclient, *self._args)
def reload(self):
"""
To be called regularly in pscan_live.
"""
# check if new scan was started:
pscan_vars = pscan_vars = self.specclient.get_sv("PSCAN_ARR")
start_time = pscan_vars["timestamp/_pscan_doscan01"]
if start_time!=self.start_time:
#print(scan_no,self.scan_no,basename,self.basename)
#self.specclient.varcache.pop("PSCAN_ARR")
self.re_init()
else:
self.data = self.load_data()
super(PScanTracker, self).update()
class Annotate(GenericIndexTracker):
"""
This tracker child does the connection to spec
"""
def __init__(self, ax, specclient, norm="linear",
quantum=1., transposed=True, exit_onclick=False, rectangle_onclick=False):
"""
Class to fill a matplotlib axes with with data
from fast spec scans (`pscan`)
Inputs:
ax : matplotlib.Axes
axes used to show data
specclient : SpecClientWrapper.SpecClientSession
a wrapped SpecClient instance to handle
communication with spec
norm : str, float
normalization of the colormap
"""
if not isinstance(specclient, SpecClientWrapper.SpecClientSession):
raise ValueError("Need `SpecClientWrapper.SpecClientSession` "
"instance as second argument.")
super(Annotate, self).__init__(ax, norm, quantum,
exit_onclick, rectangle_onclick)
class PScanTracker1D(GenericIndexTracker1D):
"""
This tracker child does the connection to spec
"""
def __init__(self, ax, specclient, norm="linear",
quantum=1., transposed=True, exit_onclick=False, rectangle_onclick=False):
"""
Class to fill a matplotlib axes with with data
from fast spec scans (`pscan`)
Inputs:
ax : matplotlib.Axes
axes used to show data
specclient : SpecClientWrapper.SpecClientSession
a wrapped SpecClient instance to handle
communication with spec
norm : str, float
normalization of the colormap
"""
if not isinstance(specclient, SpecClientWrapper.SpecClientSession):
raise ValueError("Need `SpecClientWrapper.SpecClientSession` "
"instance as second argument.")
# get command
self.pscan_vars = pscan_vars = specclient.get_sv("PSCAN_ARR")
self.specclient = specclient
self.command = cmd = pscan_vars["header/cmd"].split()
print(cmd)
lima_roi, device = specclient.find_roi_list()
# pscan motor name, start, stop, numpoints
m1 = ScanRange(cmd[1], float(cmd[2]), float(cmd[3]), int(cmd[4]))
m2 = ScanRange(cmd[5], float(cmd[6]), float(cmd[7]), int(cmd[8]))
self.x, self.y = (m2, m1) if transposed else (m1, m2)
self.transposed = transposed # to look similar to pymca
data = self.load_data()
self._args = norm, quantum, transposed
super(PScanTracker1D, self).__init__(ax, data, norm, quantum, exit_onclick)
self.ln[0].set_xdata(np.arange(float(cmd[2]),float(cmd[3]),(float(cmd[3])-float(cmd[2]))/int(cmd[4])))
self.ax.set_xlim(float(cmd[2]),float(cmd[3]))
self.ax.set_xlabel(self.y.name)
self.ax.set_ylabel("Intensity")
self.rois = lima_roi #['%i'%(i+1)] for i in range(self.slices)]
self.set_axes_properties(title=self.rois)
basename = os.path.basename(pscan_vars["file"])
scan_no = pscan_vars["scan_no"]
self.start_time = start_time = pscan_vars["timestamp/_pscan_doscan01"]
title = "File: %s; Scan: %s; %s"%(basename, scan_no, start_time)
if hasattr(self, "figtitle"):
self.figtitle.set_text(title)
else:
self.figtitle = self.fig.suptitle(title)
def load_data(self):
data = self.specclient.get_sv("PSCAN_ROICOUNTER_DATA")[1:]
try:
data = data.reshape((-1, self.x.numpoints, self.y.numpoints, 7))[:,:,:,2] # what is this 7?? answer: 2 is the sum, there are other things like max.
except ValueError:
print("Warning: reshape failed.")
return self.data
if self.transposed:
data = data.transpose(0, 2, 1)
if not (data>0).any():
return self.data
return data
def re_init(self):
return self.__init__(self.ax, self.specclient, *self._args)
def reload(self):
"""
To be called regularly in pscan_live.
"""
# check if new scan was started:
pscan_vars = pscan_vars = self.specclient.get_sv("PSCAN_ARR")
start_time = pscan_vars["timestamp/_pscan_doscan01"]
if start_time!=self.start_time:
#print(scan_no,self.scan_no,basename,self.basename)
self.specclient.varcache.pop("PSCAN_ARR")
self.re_init()
else:
self.data = self.load_data()
super(PScanTracker1D, self).update()
class PScanTrackerSpecfile(GenericIndexTracker):
"""
This tracker child does the connection to spec
"""
def __init__(self, ax,
spec_fn,
scan_no,
norm="linear",
quantum=1.,
rois=[],
transposed=True,
monitor=None,
exit_onclick=False,
ignore_cols=None,
rectangle_onclick=False):
if isinstance(scan_no, int):
scan_no = "%i.1"%scan_no
elif scan_no.isdigit():
scan_no = "%s.1"%scan_no
self.spec_fn = spec_fn
self.scan_no = scan_no
scans = SpecFile(self.spec_fn)
try:
index=scans.keys().index(self.scan_no)
except:
raise ValueError('%s not in specfile (%s)'%(self.scan_no, self.spec_fn) \
+ ' --> available scans: %s'%", ".join(k for k in scans.keys()))
self.scan = scan = scans[index]
# better way of finding the counters of interest?
if ignore_cols is None:
ignore_cols = ['timer','imgnr','adcX','adcY','adcZ','adc3']
if not rois:
rois = [l for l in scan.labels if l not in ignore_cols]
self.rois = rois
command = scan.scan_header_dict["S"]
command = command.split()[1:]
m1 = ScanRange(command[1], float(command[2]), float(command[3]), int(command[4]))
m2 = ScanRange(command[5], float(command[6]), float(command[7]), int(command[8]))
self.x, self.y = (m2, m1) if transposed else (m1, m2)
self.transposed = transposed # to look similar to pymca?
data = np.stack((scan.data_column_by_name(roi) for roi in rois if roi!=monitor))
if not monitor is None:
monitor = scan.data_column_by_name(monitor)
data /= monitor
#data = data.reshape((-1, m2.numpoints, m1.numpoints)) # slow to fast axes
data.resize((len(rois)-(1 if monitor else 0), m2.numpoints, m1.numpoints)) # slow to fast axes
if transposed:
data = data.transpose(0, 2, 1)
self._args = norm, quantum, transposed # maybe not necessary
super(PScanTrackerSpecfile, self).__init__(ax, data, norm, quantum,exit_onclick,rectangle_onclick)
self.ax.set_xlabel(self.x.name)
self.ax.set_ylabel(self.y.name)
self.set_extent(self.x.start, self.x.stop, self.y.start, self.y.stop)
self.set_axes_properties(title=self.rois)
self.start_time = start_time = scan.scan_header_dict['D']
title = "File: %s; Scan: %s; %s"%(spec_fn, scan_no, start_time)
if hasattr(self, "figtitle"):
self.figtitle.set_text(title)
else:
self.figtitle = self.fig.suptitle(title)
"""
A wrapper for the SpecClient (python2) package which is optimized
for ID01
"""
# 20170611
# SL - Library of functions for interaction with spec variables with python
#
# to be integrated into existing scripts - *_generic.py
#
# BEWARE: generally this is dangerous as it is run in the background without specs knowledge
#
# TODO: clean str() calls in all dependent files
#
################### le code #################
SpecCommand = None
SpecVariable = None
try:
from SpecClient_gevent import SpecVariable
from SpecClient_gevent import SpecCommand
except ImportError:
pass
try:
from SpecClient import SpecVariable
from SpecClient import SpecCommand
except ImportError:
pass
if SpecCommand is None or SpecVariable is None:
print("Warning: Could not import SpecClient module")
import numpy as np
class SpecClientSession(object):
def __init__(self, sv_limaroi = 'LIMA_ROI',
sv_limadev = 'LIMA_DEV',
specname ='nano3:psic_nano',
verbose=True):
self.sv_limaroi = sv_limaroi
self.sv_limadev = sv_limadev
self.specname = specname
self.device=''
self.varcache = dict()
self.speccmd = SpecCommand.SpecCommand('',self.specname)
self.verbose = verbose
def get_sv(self, sv):
#if sv not in self.varcache:
# _sv = SpecVariable.SpecVariable(sv,self.specname)
# self.varcache[sv] = _sv
#else:
# _sv = self.varcache[sv]
_sv = SpecVariable.SpecVariable(sv,self.specname)
#if self.verbose:
# print('polling %s'%sv)
return _sv.getValue()
def send_sc(self,sc):
return self.speccmd.executeCommand(str(sc))
def set_sv(self, sv, sv_value):
self.send_sc(sv+'='+str(sv_value))
return self.get_sv(sv)
def get_motor(self,mot_nm):
_mot_no = int(self.get_sv(mot_nm))
_mot_pos = float(self.get_sv("A[%i]"%_mot_no)['%i'%_mot_no])
return _mot_pos
def find_roi_list(self):
'''