Commit 8f58b8f6 authored by operator for beamline's avatar operator for beamline
Browse files

wip

parent ea8a5738
......@@ -3,13 +3,15 @@ import sys
import glob
import argparse
import numpy as np
import cv2
import silx
from silx import io as silx_io
parser = argparse.ArgumentParser(description='Convert edf to jpg.')
parser.add_argument('input_f', metavar='input_f', type=str, nargs='+',
help='input')
help='a list of edf files or a directory')
parser.add_argument('-o',
dest='outdir',
metavar='outdir',
......@@ -20,15 +22,23 @@ parser.add_argument('-f',
dest='force',
action='store_true',
default=False,
help='force overwrite')
help='will assume yes if asked about overwriting files')
parser.add_argument('-d',
dest='depth',
default=16,
type=int,
help='depth in bit of the input image (e.g: 12bits for a prosilica)')
args = parser.parse_args()
outdir = args.outdir
outdir = args.outdir[0]
input_f = args.input_f[:]
force = args.force
depth = args.depth
input_files = []
print('================================')
print('================================')
......@@ -41,13 +51,13 @@ if len(input_f) == 1:
input_files = input_f
else:
# list of files
input_files = [f for f in input_f if not os.path.isdir(f)]
input_files = [f for f in input_f if not os.path.isdir(f) and f.endswith('.edf')]
n_input_files = len(input_files)
print(f'Input: {n_input_files} files.')
if len(input_files) == 0:
print("Done.")
print("No files, exiting.")
exit(0)
n_overwrite = 0
......@@ -93,7 +103,8 @@ for i_in_f, in_f in enumerate(input_files):
if len(keys) > 1:
raise RuntimeError(f'Edf file {in_f} has more than one entry.')
image = edf_f[keys[0]]['image']['data'][:]
if image.itemsize != 1:
image = image * ((2**8-1) / (2**(8*image.itemsize) - 1))
if depth != 8:
image = image * ((2**8-1) / (2**(depth) - 1))
img2 = image.astype(np.uint8)
cv2.imwrite(out_f, image)
\ No newline at end of file
cv2.imwrite(out_f, image.astype(np.uint8))
import os
import sys
import csv
import glob
import time
import argparse
from collections import OrderedDict
import numpy as np
import cv2
import silx
import fabio
from silx import io as silx_io
CSV_COLUMN_TITLE = 'SS_CCU_Temp [Kelvin]'
PRINT_RATE = 20
parser = argparse.ArgumentParser(description='Convert edf to jpg.')
parser.add_argument('images', metavar='images', type=str, nargs='+',
help='a list of image files or a directory')
parser.add_argument('temp_csv', metavar='temp_csv', type=str, nargs=1,
help='csv file generated by Nambicon')
parser.add_argument('-o',
dest='output_csv',
nargs=1,
default=None,
help='output file')
parser.add_argument('-f',
dest='force',
action='store_true',
default='',
help='will assume yes if asked about overwriting files')
args = parser.parse_args()
images = args.images[:]
temp_csv = args.temp_csv[0]
output_csv = args.output_csv
force = args.force
input_files = []
if not output_csv:
output_csv = os.path.splitext(temp_csv)[0] + '_pairs.csv'
else:
output_csv = output_csv[0]
print(f'Output file: {output_csv}.')
if os.path.exists(output_csv):
if force:
print(f'WARNING! {output_csv} will be overwritten')
print('User decided to ignore.')
else:
answer = None
while answer not in ("yes", "no", "y", "n"):
print(f'WARNING! {output_csv} will be overwritten. Continue anyway?')
answer = input("Enter yes (y), no (n): ").lower()
if answer in ("yes", "y"):
pass
elif answer in ("no", "n"):
print("Cancelled.")
exit(0)
else:
print("Please enter yes/y or no/n.")
print('================================')
print('================================')
if len(images) == 1:
# directory or file
if os.path.isdir(images[0]):
print(f'Looking for files in directory {images[0]}')
input_files = glob.glob(os.path.join(images[0], '*.cbf'))
else:
input_files = images
else:
# list of files
input_files = [f for f in images if not os.path.isdir(f) and f.endswith('.cbf')]
# we assume that the alphabetical order is the same as the date order
# we will double check anyway
input_files = sorted(input_files)
n_input_files = len(input_files)
print(f'Input: {n_input_files} images.')
print(f'INFO: Using column titled: "{CSV_COLUMN_TITLE}".')
if len(input_files) == 0:
print("No files, exiting.")
exit(0)
print('================================')
print('================================')
print(f'Reading the csv file [{temp_csv}].')
with open(temp_csv) as csv_f:
i = 0
time_line = csv_f.readline().strip()
header = csv_f.readline().rstrip(';\n').split(';')
col_index = header.index(CSV_COLUMN_TITLE)
if col_index < 0:
print("Error, column not found in the header.")
exit(1)
print(f'OK, column found at position {col_index} in the header.')
csv_r = csv.reader(csv_f, delimiter=';')
temperatures = np.array([[float(row[0].replace(',','')), float(row[col_index].replace(',',''))]
for row in csv_r])
# seconds since starting the heating
csv_epochs = temperatures[:, 0]
# temperature values
t_values = temperatures[:, 1]
# seconds since epoch
csv_start_time = time.mktime(time.strptime(time_line, "%m/%d/%Y %I:%M:%S %p"))
csv_epochs += csv_start_time
# very simple way to match image datestamp and temperature times, probably can do
# something more sophisticated, but is is really worth it?
images_info = OrderedDict()
prev_time = 0
for i_img, img_file in enumerate(input_files):
if i_img % PRINT_RATE == 0:
print(f'Reading file {img_file} ({i_img + 1}/{n_input_files}).')
with fabio.open(img_file) as img_f:
exposure = img_f.pilatus_headers['Exposure_time']
header = img_f.header['_array_data.header_contents'].split('\r\n')
# todo: tests to make sure this doesn't change
img_date = header[1]
img_epoch = time.mktime(time.strptime(img_date, "# %Y-%m-%dT%H:%M:%S.%f"))
images_info[img_file] = {'epoch':img_epoch, 'date':img_date, 'exposure':exposure}
if img_epoch < prev_time:
raise RuntimeError('img alphabetical order doesnt follow date order.')
prev_time = img_epoch
print('================================')
print('================================')
first_img = images_info[input_files[0]]
last_img = images_info[input_files[-1]]
print(f'INFO => Temperature ramp started at {time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(csv_epochs[0]))}')
print(f'INFO => First image time stamp is {time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(first_img["epoch"]))}')
print(f'INFO => Temperature ramp stopped at {time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(csv_epochs[-1]))}')
print(f'INFO => Last image time stamp is {time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(last_img["epoch"]))}')
if csv_epochs[0] > first_img["epoch"]:
print('WARNING: the first temperature date is after the first image.')
if csv_epochs[-1] < last_img["epoch"]:
raise print('WARNING: the last temperature date is after the first image.')
with open(output_csv, 'w') as out_f:
writer = csv.writer(out_f, delimiter=';')
writer.writerow(['image', 'csv_temperature_start', 'csv_temperature_end', 'img_epoch', 'csv_epoch_start', 'csv_epoch_end', 'img_exposure'])
csv_row = 0
for i_img, img_file in enumerate(input_files):
# if i_img % PRINT_RATE == 0:
# print(f'Reading file {img_file} ({i_img + 1}/{n_input_files}).')
img_info = images_info[img_file]
img_exposure = img_info['exposure']
img_epoch = img_info['epoch']
csv_epoch_start = ''
csv_temperature_start = ''
csv_epoch_end = ''
csv_temperature_end = ''
img_start = img_epoch - img_exposure
if img_start >= csv_epochs[csv_row]:
while csv_epochs[csv_row] < img_start:
csv_row += 1
csv_epoch_start = csv_epochs[csv_row]
csv_temperature_start = t_values[csv_row]
csv_row -= 1
if img_epoch >= csv_epochs[csv_row]:
while csv_epochs[csv_row] < img_epoch:
csv_row += 1
csv_epoch_end = csv_epochs[csv_row]
csv_temperature_end = t_values[csv_row]
csv_row -= 1
writer.writerow([os.path.basename(img_file), csv_temperature_start, csv_temperature_end, img_epoch, csv_epoch_start, csv_epoch_end, img_exposure])
\ No newline at end of file
import os
import sys
import glob
import argparse
import numpy as np
import cv2
import silx
from silx import io as silx_io
spec_f = '/data/bm20/inhouse/XRD2/2021/2021-08-BLC/ylid-RT2/ylid-RT2.spec'
out_f = os.path.splitext(spec_f)[0] + '.jpr'
# if os.path.exists(out_f):
# raise RuntimeError(f'File {out_f} already exists.')
print(f'Writing to file {out_f}')
with silx.io.open(spec_f) as spec_h5:
keys = list(spec_h5.keys())
if len(keys) != 1:
raise RuntimeError(f'Expected only one scan in file. Found {keys}.')
phi = spec_h5[keys[0]]['measurement/phi'][:]
phi = np.around(phi, decimals=4)
n_images = phi.shape[0]
omega = np.full((n_images,), 90.)
theta = np.zeros((n_images,))
kappa = np.zeros((n_images,))
with open(out_f, 'w') as out_d:
out_d.write(f'KM4ABS version 1.000 # of JPEG images: {n_images}\n')
out_d.write('MICROSCOPE ORIENTATION: ...\n')
for i_img in range(n_images):
line = (f'#{i_img+1:> 6} o:{omega[i_img]:> 11.5f}'
f' t: {theta[i_img]:> 11.5f}'
f' k: {kappa[i_img]:> 11.5f}'
f' p: {phi[i_img]:> 11.5f}\n')
out_d.write(line)
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment