Commit 283b0c98 authored by Valentin Valls's avatar Valentin Valls

Implement incremental derivative

parent 8db51283
......@@ -45,9 +45,14 @@ class DerivativeData(NamedTuple):
nb_points: int
class DerivativeItem(plot_model.AbstractComputableItem, plot_item_model.CurveMixIn):
class DerivativeItem(
plot_model.AbstractIncrementalComputableItem, plot_item_model.CurveMixIn
):
"""This item use the scan data to process result before displaying it."""
EXTRA_POINTS = 5
"""Extra points needed before and after a single point to compute a result"""
def __reduce__(self):
return (self.__class__, (), self.__getstate__())
......@@ -67,20 +72,21 @@ class DerivativeItem(plot_model.AbstractComputableItem, plot_item_model.CurveMix
def compute(self, scan: scan_model.Scan) -> Optional[DerivativeData]:
sourceItem = self.source()
x = sourceItem.xArray(scan)
y = sourceItem.yArray(scan)
if x is None or y is None:
xx = sourceItem.xArray(scan)
yy = sourceItem.yArray(scan)
if xx is None or yy is None:
return None
try:
result = mathutils.derivate(x, y)
derived = mathutils.derivate(xx, yy)
except Exception as e:
_logger.debug("Error while computing derivative", exc_info=True)
result = DerivativeData(numpy.array([]), numpy.array([]), len(xx))
raise plot_model.ComputeError(
"Error while creating derivative.\n" + str(e), result=None
"Error while creating derivative.\n" + str(e), result=result
)
return DerivativeData(result[0], result[1], len(x))
return DerivativeData(derived[0], derived[1], len(xx))
def xData(self, scan: scan_model.Scan) -> Optional[scan_model.Data]:
result = self.reachResult(scan)
......@@ -96,6 +102,52 @@ class DerivativeItem(plot_model.AbstractComputableItem, plot_item_model.CurveMix
data = result.yy
return scan_model.Data(self, data)
def incrementalCompute(
self, previousResult: DerivativeData, scan: scan_model.Scan
) -> DerivativeData:
"""Compute a data using the previous value as basis
The derivative function expect 5 extra points before and after the
points it can compute.
The last computed point have to be recomputed.
This code is deeply coupled with the implementation of the derivative
function.
"""
sourceItem = self.source()
xx = sourceItem.xArray(scan)
yy = sourceItem.yArray(scan)
if xx is None or yy is None:
raise ValueError("Non empty data expected")
nb = previousResult.nb_points
if nb == len(xx):
# obviously nothing to compute
return previousResult
nextNb = len(xx)
# The last point have to be recomputed
LAST = 1
if len(xx) <= 2 * self.EXTRA_POINTS + LAST:
return DerivativeData(numpy.array([]), numpy.array([]), nextNb)
if len(previousResult.xx) == 0:
# If there is no previous point, there is no need to compute it
LAST = 0
xx = xx[nb - 2 * self.EXTRA_POINTS - LAST :]
yy = yy[nb - 2 * self.EXTRA_POINTS - LAST :]
derived = mathutils.derivate(xx, yy)
xx = numpy.append(previousResult.xx[:-1], derived[0])
yy = numpy.append(previousResult.yy[:-1], derived[1])
result = DerivativeData(xx, yy, nextNb)
return result
class MaxData(NamedTuple):
max_index: int
......
......@@ -4,6 +4,7 @@ import numpy
from silx.gui import qt
from bliss.flint.model import plot_state_model
from bliss.flint.model import plot_item_model
from bliss.flint.model import plot_model
from bliss.flint.model import scan_model
......@@ -96,4 +97,39 @@ def test_derivative_compute():
item.setSource(curveItem)
result = item.compute(scan)
assert result is not None
assert len(item.xArray(scan)) == len(xx) - 5 * 2
assert (
len(item.xArray(scan))
== len(xx) - plot_state_model.DerivativeItem.EXTRA_POINTS * 2
)
def test_derivative_incremental_compute():
"""Compute the derivative function"""
yy = [0] * 7 + list(range(10)) + list(reversed(range(10))) + [0] * 7
yy = numpy.cumsum(yy)
xx = numpy.arange(len(yy)) * 10
scan = scan_model.Scan()
item = plot_state_model.DerivativeItem()
curveItem = CurveMock(xx=xx, yy=yy)
item.setSource(curveItem)
expected = item.compute(scan)
result = None
for i in [0, 5, 10, 15, 20, 25, len(xx)]:
scan = scan_model.Scan()
item = plot_state_model.DerivativeItem()
curveItem = CurveMock(xx=xx[0:i], yy=yy[0:i])
item.setSource(curveItem)
if result is None:
try:
result = item.compute(scan)
except plot_model.ComputeError as e:
result = e.result
else:
result = item.incrementalCompute(result, scan)
assert result is not None
numpy.testing.assert_array_almost_equal(expected.xx, result.xx, decimal=5)
numpy.testing.assert_array_almost_equal(expected.yy, result.yy)
numpy.testing.assert_array_almost_equal(expected.nb_points, result.nb_points)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment