Commit 1b2517f4 authored by Alejandro Homs Puron's avatar Alejandro Homs Puron Committed by operator for beamline

dump_eiger_calib: allow specifying arbitrary calib. files

parent de1061a6
......@@ -2,9 +2,12 @@
import sys
import os
import getopt
import stat
import re
import numpy as np
from collections import OrderedDict
from EdfFile import EdfFile
dacs = [
'vsvp',
......@@ -23,7 +26,7 @@ dacs = [
'vcp',
'vcn',
'vis',
'iodelay'
'iodelay',
]
dtype = np.dtype('uint32')
......@@ -36,45 +39,7 @@ chip_pixels = chip_size ** 2
nb_chips = 4
val_bytes = dtype.itemsize
def usage():
prog_name = os.path.basename(sys.argv[0])
print "Usage %s <config_fname> <energy> <output_dir>" % prog_name
exit(1)
try:
config_fname = sys.argv[1]
energy = sys.argv[2]
output_dir = sys.argv[3]
except:
usage()
host_names = None
settings_dir = None
config_file = open(config_fname)
for l in config_file.readlines():
token = l.strip().split()
if not token or token[0].startswith('#'):
continue
if token[0] == 'hostname':
host_names = token[1].split('+')[:-1]
elif token[0] == 'settingsdir':
settings_dir = token[1]
if None in [host_names, settings_dir]:
raise RuntimeError('Invalid config: %s' % config_file)
nb_det_mod = len(host_names)
sn_list = [n[-3:] for n in host_names]
cal_fnames = ['noise.sn%s' % sn for sn in sn_list]
cal_dir = os.path.join(settings_dir, 'standard', '%seV' % energy)
trim_arr = np.zeros((nb_det_mod, chip_size, nb_chips * chip_size), dtype)
dacs_arr = [[] for i in range(nb_head_vals)]
for i, fname in enumerate(cal_fnames):
cal_fname = os.path.join(cal_dir, fname)
def read_cal_file(cal_fname):
size = os.stat(cal_fname).st_size
ok = (size % val_bytes == 0)
if ok:
......@@ -87,23 +52,151 @@ for i, fname in enumerate(cal_fnames):
s = cal_file.read(size)
d = np.fromstring(s, dtype)
dac_vals, trim_vals = np.split(d, np.array((nb_head_vals,)))
for dac_val, dac_list in zip(dac_vals, dacs_arr):
dac_list.append(dac_val)
trim_bits = trim_vals.reshape((chip_size, nb_chips * chip_size))
trim_arr[i] = trim_bits[::-1]
return dac_vals, trim_bits
def read_cal_file_list(cal_fnames):
nb_det_mod = len(cal_fnames)
trim_arr = np.zeros((nb_det_mod, chip_size, nb_chips * chip_size), dtype)
dacs_arr = [[] for i in range(nb_head_vals)]
for i, cal_fname in enumerate(cal_fnames):
dac_vals, trim_bits = read_cal_file(cal_fname)
for dac_val, dac_list in zip(dac_vals, dacs_arr):
dac_list.append(dac_val)
trim_arr[i] = trim_bits[::-1]
return dacs_arr, trim_arr
def get_sn_list(re_obj, name_list):
sn_list = [re_obj.match(n).group('sn') for n in name_list if re_obj.match(n)]
return sn_list if len(sn_list) == len(name_list) else []
def write_trim_edf(fname, head, dacs_arr, trim_arr):
if os.path.exists(fname):
os.unlink(fname)
head.update([(n, str(l)) for n, l in zip(head_keys, dacs_arr)])
ofile = EdfFile(fname)
nb_det_mod = trim_arr.shape[0]
final_shape = (nb_det_mod * chip_size, nb_chips * chip_size)
ofile.WriteImage(head, trim_arr.reshape(final_shape))
from EdfFile import EdfFile
def usage():
prog_name = os.path.basename(sys.argv[0])
print ""
print "Usage %s [options]" % prog_name
print ""
print " Options:"
print " -o <output_dir> Output directory"
print " -i <cal_fnames> Calibration files to read, comma-separated"
print " -c <config_fname> Config file with detector specification"
print " -e <energy> Energy settings to be used (needs -c)"
print ""
exit(1)
fname = 'Trimbits_%seV_beb%s-%s.edf' % (energy, sn_list[0], sn_list[-1])
ocal_fname = os.path.join(output_dir, fname)
if os.path.exists(ocal_fname):
os.unlink(ocal_fname)
ocal_head = OrderedDict(host_names=str(host_names),
config_file=config_fname,
settings_dir=settings_dir,
energy=energy)
ocal_head.update([(n, str(l)) for n, l in zip(head_keys, dacs_arr)])
ocal_file = EdfFile(ocal_fname)
final_shape = (nb_det_mod * chip_size, nb_chips * chip_size)
ocal_file.WriteImage(ocal_head, trim_arr.reshape(final_shape))
def get_args():
output_dir = os.path.curdir
cal_fnames = None
config_fname = None
energy = None
try:
opts, args = getopt.getopt(sys.argv[1:], "o:i:c:e:")
for opt, val in opts:
if opt == '-o':
output_dir = val
if opt == '-i':
cal_fnames = val.split(',')
if opt == '-c':
config_fname = val
if opt == '-e':
energy = val
if cal_fnames is not None:
if energy is not None:
raise ValueError('Only one of energy/cal_fnames can be specified')
elif energy is None:
raise ValueError('Must specify one of energy/cal_fnames')
elif config_fname is None:
raise ValueError('Must specify config_fname with energy');
except Exception, e:
if isinstance(e, ValueError):
print e
usage()
return dict(output_dir=output_dir, cal_fnames=cal_fnames,
config_fname=config_fname, energy=energy)
def main():
args = get_args()
output_dir = args['output_dir']
cal_fnames = args['cal_fnames']
config_fname = args['config_fname']
energy = args['energy']
host_names = None
settings_dir = None
out_fname = None
cal_sn_list = []
if cal_fnames:
cal_re_obj = re.compile('^.+\\.sn(?P<sn>[0-9]{3})$')
cal_sn_list = get_sn_list(cal_re_obj, cal_fnames)
if config_fname:
config_file = open(config_fname)
for l in config_file.readlines():
token = l.strip().split()
if not token or token[0].startswith('#'):
continue
if token[0] == 'hostname':
host_names = token[1].split('+')[:-1]
elif token[0] == 'settingsdir':
settings_dir = token[1]
if None in [host_names, settings_dir]:
raise RuntimeError('Invalid config: %s' % config_file)
nb_det_mod = len(host_names)
beb_re_obj = re.compile('beb(?P<sn>[0-9]{3})')
sn_list = get_sn_list(beb_re_obj, host_names)
if not sn_list:
raise RuntimeError('Can not get S/N from detector host names')
if energy is not None:
cal_dir = os.path.join(settings_dir, 'standard', '%seV' % energy)
cal_fnames = [os.path.join(cal_dir, 'noise.sn%s' % sn)
for sn in sn_list]
out_fname = '%seV_beb%s-%s' % (energy, sn_list[0], sn_list[-1])
else:
if len(cal_fnames) != nb_det_mod:
raise RuntimeError('Expected %d input files, got %d' %
(nb_det_mod, len(cal_fnames)))
ordered_names = [cal_fnames[cal_sn_list.index(sn)] for sn in sn_list
if sn in cal_sn_list]
if len(ordered_names) == nb_det_mod:
cal_fnames = ordered_names
cal_sn_list = sn_list
dacs_arr, trim_arr = read_cal_file_list(cal_fnames)
if out_fname is None:
if cal_sn_list:
out_fname = 'beb%s-%s' % (cal_sn_list[0], cal_sn_list[-1])
else:
out_fname = 'calib'
if host_names is None and cal_sn_list:
host_names = ['beb%s' % sn for sn in cal_sn_list]
ocal_fname = os.path.join(output_dir, 'Trimbits_%s.edf' % out_fname)
ocal_head = OrderedDict(host_names=str(host_names),
config_file=config_fname,
settings_dir=settings_dir,
energy=energy,
calib_files=str(cal_fnames))
write_trim_edf(ocal_fname, ocal_head, dacs_arr, trim_arr)
if __name__ == '__main__':
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment