Skip to content
Snippets Groups Projects
Commit 3620a91d authored by Wout De Nolf's avatar Wout De Nolf
Browse files

Merge branch 'fixup_bidirection_detection' into 'main'

Fixup bidirection detection

See merge request !155
parents dbf640e5 8f2b9f87
No related branches found
No related tags found
1 merge request!155Fixup bidirection detection
Pipeline #175011 passed
from typing import Sequence, List
import numpy
def split_piecewise_monotonic(values: Sequence[float]) -> List[slice]:
"""The provided values are piecewise monotonic which means that
it consists of sections that are either monotonically
increasing (∀ i ≤ j, x[i] ≤ x[j]) or monotonically
decreasing (∀ i ≤ j, x[i] ≥ x[j]). Note that a list of identical
values is both monotonically increasing and decreasing.
This method returns the disjoint slices representing monotonic sections.
"""
section_sizes = []
section_slopes = []
values_left = values
while len(values_left):
section_size, section_slope = first_monotonic_size(values_left)
section_sizes.append(section_size)
section_slopes.append(section_slope)
values_left = values_left[section_size:]
monotone_start = [0]
monotone_stop = []
monotone_step = []
last_section_index = len(section_sizes) - 1
stops = numpy.cumsum(section_sizes)
for section_index, (stop, section_slope) in enumerate(zip(stops, section_slopes)):
monotone_step.append(1 if section_slope >= 0 else -1)
if section_index == last_section_index:
monotone_stop.append(stop)
break
# The pivot index is the index at which either
# - slope>=0 with the previous index and slope<0 with the next index
# - slope<=0 with the previous index and slope>0 with the next index
pivot_index = stop - 1
if values[pivot_index - 1] == values[pivot_index]:
# The pivot index ■ is part of the next section
#
# ● ─ ■
# / \ -> same slope up and down
# ● ●
#
monotone_stop.append(stop - 1)
monotone_start.append(stop - 1)
continue
# Determine whether the pivot index is part of this section or
# be part of the next section
slope_before = values[pivot_index] - values[pivot_index - 1]
diff_slope_before = abs(section_slope - slope_before)
slope_after = values[pivot_index + 1] - values[pivot_index]
next_section_slope = section_slopes[section_index + 1]
diff_slope_after = abs(next_section_slope - slope_after)
if diff_slope_after <= diff_slope_before:
# The pivot index ■ is part of the next section
#
# ■
# ● \ -> slope after matches the next section slope better
# / ● then the slope before matches the current section slope
# ● \
# / ●
#
monotone_stop.append(stop - 1)
monotone_start.append(stop - 1)
else:
# The pivot index ■ is part of this section
#
# ■
# / ● -> slope before matches the current section slope better
# ● \ than the slope after matches the next section slope
# / ●
# ● \
#
monotone_stop.append(stop)
monotone_start.append(stop)
return [
create_slice(start, stop, step)
for start, stop, step in zip(monotone_start, monotone_stop, monotone_step)
]
def create_slice(start: int, stop: int, step: int):
if step >= 0:
return slice(start, stop, step)
assert start >= 0
assert stop >= 0
start, stop = stop, start
start -= 1
if stop == 0:
stop = None
else:
stop -= 1
return slice(start, stop, step)
def piecewise_monotonic_interpolation_values(
values: Sequence[float], slices: Sequence[slice]
) -> numpy.ndarray:
piece_min = numpy.inf
piece_max = -numpy.inf
piece_size = 0
for slc in slices:
piece_values = values[slc]
piece_min = min(piece_min, piece_values.min())
piece_max = max(piece_max, piece_values.max())
piece_size = max(piece_size, len(piece_values))
return numpy.linspace(piece_min, piece_max, piece_size)
def mean_nonzero(values: Sequence[float]) -> float:
"""Average non-zero value. Returns `nan` for an empty list and `0.` when
all values are zero.
"""
values = numpy.asarray(values)
if not values.size:
return numpy.nan
non_zero = values != 0
if non_zero.any():
return numpy.mean(values[non_zero])
return 0.0
def first_monotonic_size(values: Sequence[float]) -> int:
"""Determine the length of the first monotonically increasing or decreasing slice
starting from the first value.
"""
slopes = numpy.diff(values)
maxsize = len(values)
if maxsize < 3:
return maxsize, mean_nonzero(slopes)
slope_signs = numpy.sign(slopes)
first_slope_sign = slope_signs[0]
for value_idx, slope_sign in enumerate(slope_signs[1:], 1):
if slope_sign and first_slope_sign and slope_sign != first_slope_sign:
# Non-zero slope changed sign: end of monotonic series
return value_idx + 1, mean_nonzero(slopes[:value_idx])
if not first_slope_sign:
first_slope_sign = slope_sign
return maxsize, mean_nonzero(slopes)
from __future__ import annotations from __future__ import annotations
from typing import List, Tuple
import numpy import numpy
from est import settings from est import settings
from est.io.information import InputInformation from est.io.information import InputInformation
from est.io.utils.read import get_data_from_url from est.io.utils.read import get_data_from_url
from est.core.monotonic import split_piecewise_monotonic
from est.core.monotonic import piecewise_monotonic_interpolation_values
def _identify_subscans(energy: List[float]) -> Tuple[List[slice], numpy.ndarray]:
"""
From a energy array made of several ramps, return the slices defining the ramps/subscans
and an energy array spanning the full energy range.
"""
energy_slope = numpy.diff(energy)
change_in_abs_energy_slope = numpy.diff(numpy.abs(energy_slope))
abs_slope_changed = (
change_in_abs_energy_slope > change_in_abs_energy_slope.max() / 2
)
ramp_start_index: List[int] = (numpy.where(abs_slope_changed)[0] + 1).tolist()
subscan_start_index = [0] + ramp_start_index
subscan_stop_index = ramp_start_index + [len(energy)]
slices = list()
slice_start = None
slice_stop = None
slice_increasing = None
def finish_slice():
if slice_start is None:
return
if slice_increasing:
slices.append(slice(slice_start, slice_stop, 1))
else:
slices.append(slice(slice_stop - 1, slice_start - 1, -1))
energy_min = float("inf")
energy_max = -float("inf")
energy_size = 0
for start, stop in zip(subscan_start_index, subscan_stop_index):
energy_min = min(energy_min, energy[start])
energy_max = max(energy_max, energy[stop - 1])
energy_size = max(energy_size, stop - start)
energy_increasing = energy[stop - 1] >= energy[start]
if energy_increasing == slice_increasing:
# Expand current slice
slice_stop = stop
else:
# Finish previous slice
finish_slice()
# Start new slice
slice_start = start
slice_stop = stop
slice_increasing = energy_increasing
# Finish last slice
finish_slice()
return slices, numpy.linspace(energy_min, energy_max, energy_size)
def read_bidirectional_spectra( def read_bidirectional_spectra(
...@@ -79,7 +26,8 @@ def read_bidirectional_spectra( ...@@ -79,7 +26,8 @@ def read_bidirectional_spectra(
else: else:
config = None config = None
ramp_slices, energy = _identify_subscans(raw_energy) ramp_slices = split_piecewise_monotonic(raw_energy)
energy = piecewise_monotonic_interpolation_values(raw_energy, ramp_slices)
interpolated_spectra = numpy.zeros( interpolated_spectra = numpy.zeros(
(len(energy), len(ramp_slices), 1), dtype=raw_spectra.dtype (len(energy), len(ramp_slices), 1), dtype=raw_spectra.dtype
......
import pytest
from typing import Sequence, List
import numpy
from est.core import monotonic
@pytest.mark.parametrize(
"first_increasing", [True, False], ids=["first_increasing", "first_decreasing"]
)
@pytest.mark.parametrize(
"clip_last_section", [True, False], ids=["clip_last", "no_clip"]
)
@pytest.mark.parametrize(
"offset_fraction", [0, -0.1, 0.1], ids=["no_shift", "shift_down", "shift_up"]
)
@pytest.mark.parametrize("section_size", [3, 50])
@pytest.mark.parametrize("nsections", [1, 2, 3])
def test_split_piecewise_monotonic(
first_increasing, clip_last_section, offset_fraction, section_size, nsections
):
if clip_last_section and section_size <= 6:
pytest.skip("no enough values to clip")
if first_increasing:
step = 1
else:
step = -1
delta_value = 1.0
offset = offset_fraction * delta_value
do_offset = False
values = list()
indices = numpy.arange(section_size)
expected_slices = list()
expected_data = list()
def _append(last):
nonlocal values, step, do_offset
data = indices * delta_value
if do_offset:
data = data + offset
data = data[::step]
if clip_last_section and last:
data = data[: len(data) // 2]
start = len(values)
values.extend(data)
stop = len(values)
expected_slices.append(monotonic.create_slice(start, stop, step))
if step == -1:
expected_data.append(data[::-1])
else:
expected_data.append(data)
step = -step
do_offset = not do_offset
for section_index in range(nsections):
_append(section_index == (nsections - 1))
slices = monotonic.split_piecewise_monotonic(values)
try:
assert len(slices) == nsections
assert slices == expected_slices
for slc, data in zip(slices, expected_data):
numpy.testing.assert_array_equal(values[slc], data)
except AssertionError:
# Keep this for manual debugging
# _plot_piecewise_monotonic(values, section_size, slices)
raise
def _plot_piecewise_monotonic(
values: Sequence[float], section_size: int, slices: List[slice]
):
import matplotlib.pyplot as plt
npoints = len(values)
nsections = npoints // section_size + bool(npoints % section_size)
for isection in range(nsections):
start = isection * section_size
stop = start + section_size
y = values[start:stop]
x = start + numpy.arange(len(y))
plt.plot(x, y, "o-", color="green")
x = [slc.start for slc in slices]
y = [values[slc.start] for slc in slices]
plt.plot(x, y, "o", color="red")
plt.show()
def test_mean_nonzero():
assert numpy.isnan(monotonic.mean_nonzero([]))
assert monotonic.mean_nonzero([0]) == 0
assert monotonic.mean_nonzero([0, 0, 1]) == 1
assert monotonic.mean_nonzero([0, 0, 1, 2]) == 1.5
def test_first_monotonic_size():
maxsize, avg_slope = monotonic.first_monotonic_size([])
assert maxsize == 0
assert numpy.isnan(avg_slope)
maxsize, avg_slope = monotonic.first_monotonic_size([0])
assert maxsize == 1
assert numpy.isnan(avg_slope)
maxsize, avg_slope = monotonic.first_monotonic_size([0, 0])
assert maxsize == 2
assert avg_slope == 0
maxsize, avg_slope = monotonic.first_monotonic_size([1, 1])
assert maxsize == 2
assert avg_slope == 0
maxsize, avg_slope = monotonic.first_monotonic_size([0, 1])
assert maxsize == 2
assert avg_slope == 1
maxsize, avg_slope = monotonic.first_monotonic_size([0, 1, 0])
assert maxsize == 2
assert avg_slope == 1
maxsize, avg_slope = monotonic.first_monotonic_size([0, -1])
assert maxsize == 2
assert avg_slope == -1
maxsize, avg_slope = monotonic.first_monotonic_size([0, -1, 0])
assert maxsize == 2
assert avg_slope == -1
maxsize, avg_slope = monotonic.first_monotonic_size([0, -1, -2, -2])
assert maxsize == 4
assert avg_slope == -1
maxsize, avg_slope = monotonic.first_monotonic_size([0, -1, -2, -2, -1])
assert maxsize == 4
assert avg_slope == -1
maxsize, avg_slope = monotonic.first_monotonic_size([0, 0, 1])
assert maxsize == 3
assert avg_slope == 1
maxsize, avg_slope = monotonic.first_monotonic_size([0, 0, 1, 2, 1])
assert maxsize == 4
assert avg_slope == 1
maxsize, avg_slope = monotonic.first_monotonic_size([0, 0, -1])
assert maxsize == 3
assert avg_slope == -1
maxsize, avg_slope = monotonic.first_monotonic_size([0, 0, -1, -2, -1])
assert maxsize == 4
assert avg_slope == -1
def test_create_slice():
values = numpy.arange(10)
forward_slc = monotonic.create_slice(2, 5, 1)
backward_slc = monotonic.create_slice(2, 5, -1)
numpy.testing.assert_array_equal(values[forward_slc][::-1], values[backward_slc])
forward_slc = monotonic.create_slice(2, len(values), 1)
backward_slc = monotonic.create_slice(2, len(values), -1)
numpy.testing.assert_array_equal(values[forward_slc][::-1], values[backward_slc])
forward_slc = monotonic.create_slice(0, len(values) + 1, 1)
backward_slc = monotonic.create_slice(0, len(values) + 1, -1)
numpy.testing.assert_array_equal(values[forward_slc][::-1], values[backward_slc])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment