Skip to content
Snippets Groups Projects
Commit a0c3b9c3 authored by payno's avatar payno
Browse files

[CI] set back the luigi notebook for testing

parent c3b0557d
No related branches found
No related tags found
No related merge requests found
......@@ -118,7 +118,7 @@ test:test-tomwer-tutorials:
script:
- /usr/bin/xvfb-run --server-args="-screen 0 1024x768x24" -a jupyter nbconvert --to notebook --execute doc/tutorials/workflow_operation.ipynb
#
#test:test-luigi:
# type: test
# image: docker-registry.esrf.fr/dau/tomwer:python3.5_stretch_pyqt5
......@@ -141,4 +141,5 @@ test:test-tomwer-tutorials:
# - python -m pip install .
# - python -m pip install luigi
# script:
# - luigid --background
# - /usr/bin/xvfb-run --server-args="-screen 0 1024x768x24" -a jupyter nbconvert --to notebook --execute doc/tutorials/workflow_luigi.ipynb
%% Cell type:markdown id: tags:
# setting server for launching luigi task
The first step is to launch the luigi server in order to be able to execute and link Tasks
%% Cell type:code id: tags:
``` python
import tempfile
from unittest import mock


class ServerClient(object):
    """Helper that runs a luigi scheduler daemon and hands out a
    remote-scheduler handle pointing at it."""

    def __init__(self):
        # Scratch directory for the daemon's log and pid files.
        self.tempdir = tempfile.mkdtemp()
        # Default luigid HTTP port.
        self.port = 8082

    @mock.patch('daemon.DaemonContext')
    def run_server(self, daemon_context):
        # '--background' would normally daemonize; DaemonContext is mocked
        # so the server stays in this process (needed to run under a test).
        luigi.cmdline.luigid([
            '--port', str(self.port),
            '--background',  # This makes it a daemon
            '--logdir', self.tempdir,
            '--pidfile', os.path.join(self.tempdir, 'luigid.pid'),
        ])

    def scheduler(self):
        # NOTE(review): relies on `luigi` being imported by a later cell
        # before this method is called.
        return luigi.rpc.RemoteScheduler('http://localhost:' + str(self.port))
```
%% Cell type:code id: tags:
``` python
import os
import signal  # fix: stop_server() uses signal.SIGKILL but it was never imported
import luigi
import luigi.cmdline
import time
import tempfile
import multiprocessing

# Module-level handles shared by the helper functions below.
server_client = None
scheduler = None
process = None
rm_proxy = {}


def remove_proxy():
    """Strip http(s) proxy variables from the environment.

    The removed values are remembered in ``rm_proxy`` so that
    :func:`reset_proxy` can restore them later.

    :return: dict of the proxy variables that were removed
    """
    proxy = {}
    for proxy_key in ('http_proxy', 'https_proxy'):
        if proxy_key in os.environ:
            rm_proxy[proxy_key] = os.environ[proxy_key]
            proxy[proxy_key] = os.environ[proxy_key]
            del os.environ[proxy_key]
    return proxy


def reset_proxy():
    """Restore the proxy variables previously removed by :func:`remove_proxy`.

    :return: the (updated) ``os.environ`` mapping
    """
    for proxy_key, proxy_value in rm_proxy.items():
        os.environ[proxy_key] = proxy_value
    return os.environ


def _start_server():
    """Spawn the luigi server in a subprocess and connect a scheduler to it."""
    global scheduler
    global process
    process = multiprocessing.Process(target=server_client.run_server)
    process.start()
    time.sleep(0.1)  # wait for server to start
    scheduler = server_client.scheduler()
    scheduler._wait = lambda: None


def start_server():
    """Configure and launch the luigi scheduler (proxies removed first)."""
    global server_client
    remove_proxy()  # fix: drop unused `proxy =` assignment
    server_client = ServerClient()
    # NOTE(review): mktemp only reserves a name; acceptable here because
    # luigi creates the state file itself at that path.
    state_path = tempfile.mktemp(suffix='server_test')
    luigi.configuration.get_config().set('scheduler', 'state_path', state_path)
    _start_server()


def stop_server():
    """Terminate the server process, force-killing it if it does not exit."""
    global process
    process.terminate()
    process.join(timeout=1)
    if process.is_alive():
        os.kill(process.pid, signal.SIGKILL)
```
%% Cell type:code id: tags:
``` python
# launch the luigi server
# start_server()
```
%% Cell type:markdown id: tags:
*note: for now jupyter notebook does not allow running several `IOLoop` instances. So until this is fixed, execute the following command in a terminal:*
```bash
sudo luigid --background
```
**warning: unset any proxy (http, https) if existing**
This should allow you to continue the tutorial.
%% Cell type:code id: tags:
``` python
# stop_server()
```
%% Cell type:markdown id: tags:
# Luigi Task
Luigi allows executing several Tasks.
Those tasks can be linked together.
To adapt workflows we differentiate between 'single' processes (one-shot execution) and endless processes (such as the `datawatcher`, which is a background process watching for some data).
%% Cell type:markdown id: tags:
## Simple Task execution
The first step is to execute a single task with no dependency. To do so we can implement 'SingleProcessTask' and 'EndlessProcessTask', respectively with classes inheriting from SingleProcess and EndlessProcess.
%% Cell type:markdown id: tags:
### SingleProcessTask
In this example we will create a LuigiSingleProcess which will instantiate and execute a 'DarkRef' process.
%% Cell type:code id: tags:
``` python
from tomwer.luigi.task.darkrefs import DarkRefsTask
from tomwer.core.utils import DownloadDataset
import luigi
import tempfile
import os
import logging

_logger = logging.getLogger('root_logger')

# Get some data to check the Task processing.
data_folder = tempfile.mkdtemp()
print('loading dataset ...')
try:
    # Proxies are restored for the download, then removed again because
    # luigi must talk to the local scheduler without a proxy.
    reset_proxy()
    DownloadDataset('tutorials/D2_H2_T2_h_.tar.bz2', data_folder,
                    timeout=100, unpack=True)
except TimeoutError:
    remove_proxy()
    _logger.error('Failed to load dataset')
else:
    remove_proxy()
    scan = os.path.join(data_folder, 'D2_H2_T2_h_')
    print('dataset loaded')
    dark_file_0 = os.path.join(scan, 'dark0000.edf')
    dark_file_1 = os.path.join(scan, 'dark0001.edf')
    refHST_file_0 = os.path.join(scan, 'refHST0000.edf')
    refHST_file_1 = os.path.join(scan, 'refHST3600.edf')
    # Outputs must not exist before the task runs.
    assert not os.path.exists(dark_file_0)
    assert not os.path.exists(dark_file_1)
    assert not os.path.exists(refHST_file_0)
    assert not os.path.exists(refHST_file_1)

    luigi_task = 'esrf.tomwer.DarkRefsTask'
    params = [luigi_task,
              '--workers', '2',
              '--scan', scan,
              '--workflow-name', 'test',
              '--node-id', '1',
              '--no-lock']
    print('launching darkrefs task...')
    assert luigi.interface.run(params) is True

    # make sure processing was correct
    assert os.path.exists(dark_file_0)
    assert os.path.exists(dark_file_1)
    assert os.path.exists(refHST_file_0)
    assert os.path.exists(refHST_file_1)
```
%% Cell type:code id: tags:
``` python
# now do the same but adding some properties
from silx.io import dictdump
import tomwer.core.process.reconstruction.darkref.darkrefs

parameters_files_dir = tempfile.mkdtemp()
data_folder = tempfile.mkdtemp()
print('loading dataset ...')
try:
    reset_proxy()
    DownloadDataset('tutorials/D2_H2_T2_h_.tar.bz2', data_folder,
                    timeout=100, unpack=True)
except TimeoutError:
    remove_proxy()
    _logger.error('Failed to load dataset')
else:
    remove_proxy()
    scan = os.path.join(data_folder, 'D2_H2_T2_h_')
    print('dataset loaded')
    dark_file_0 = os.path.join(scan, 'dark0000.edf')
    dark_file_1 = os.path.join(scan, 'dark0001.edf')
    refHST_file_0 = os.path.join(scan, 'refHST0000.edf')
    refHST_file_1 = os.path.join(scan, 'refHST3600.edf')
    # Outputs must not exist before the task runs.
    assert not os.path.exists(dark_file_0)
    assert not os.path.exists(dark_file_1)
    assert not os.path.exists(refHST_file_0)
    assert not os.path.exists(refHST_file_1)

    print('task properties help')
    print(tomwer.core.process.reconstruction.darkref.darkrefs.DarkRefs.properties_help())
    print('writing task properties')
    # 'dark': 'None' disables dark computation, hence no dark*.edf expected below.
    parameters = {
        'dark': 'None',
        'refs': 'Median'
    }
    parameters_file = os.path.join(parameters_files_dir, 'darks_properties.ini')
    dictdump.dicttoini(ddict={'properties': parameters}, inifile=parameters_file)

    luigi_task = 'esrf.tomwer.DarkRefsTask'
    params = [luigi_task,
              '--workers', '2',
              '--scan', scan,
              '--properties-file', parameters_file,
              '--workflow-name', 'test2',
              '--node-id', '1',
              '--no-lock']
    print('launching darkrefs task...')
    assert luigi.interface.run(params) is True

    # make sure processing was correct: darks skipped, refs created
    assert not os.path.exists(dark_file_0)
    assert not os.path.exists(dark_file_1)
    assert os.path.exists(refHST_file_0)
    assert os.path.exists(refHST_file_1)
```
%% Cell type:markdown id: tags:
## EndlessProcessTask
We can have processes which will run until we explicitly ask them to stop. Those Tasks have to inherit from :class:`EndlessProcess`.
We can for example run a tomodir process which will produce a log when it detects a complete acquisition.
*note: to make sure processes are executed, make sure there is no trace of such a task (rm /tmp/luigi/*)*
As the Tasks are 'endless', you should run the next cells in different terminals, otherwise this will fail.
%% Cell type:code id: tags:
``` python
import inspect
import test_endless_start

# Display the helper that launches the endless process, so the reader can
# see what the next cell runs in a separate interpreter.
print(inspect.getsource(test_endless_start.start_process))
```
%% Cell type:code id: tags:
``` python
import subprocess
import tempfile

parameters_files_dir = tempfile.mkdtemp()
observed_dir = tempfile.mkdtemp()

# Pass argv as a list with shell=False: the original combined a list with
# shell=True, which on POSIX hands only "python3" to the shell and silently
# drops every argument. (A duplicated `shell=True)` line was also removed.)
subprocess.call(["python3", "test_endless_start.py",
                 "start",
                 parameters_files_dir,
                 observed_dir])
```
%% Cell type:code id: tags:
``` python
# now we can see the result of the process by adding some dataset on the observed_folder
# (display the helper used for that first)
print(inspect.getsource(test_endless_start.create_scan))
```
%% Cell type:code id: tags:
``` python
# argv as a list with shell=False; list + shell=True would drop the arguments
subprocess.call(["python3", "test_endless_start.py",
                 "create_scan",
                 parameters_files_dir,
                 observed_dir])
```
%% Cell type:code id: tags:
``` python
# once finished, we should stop the watcher; show the helper doing that
print(inspect.getsource(test_endless_start.stop_process))
```
%% Cell type:code id: tags:
``` python
# argv as a list with shell=False; list + shell=True would drop the arguments
# (a duplicated `shell=True)` line was also removed)
subprocess.call(["python3", "test_endless_start.py",
                 "stop",
                 parameters_files_dir,
                 observed_dir])
```
%% Cell type:markdown id: tags:
# Defining a flow of Tasks
## inputs an outputs
Each tomwer process is defining a list of inputs and outputs.
%% Cell type:code id: tags:
``` python
from tomwer.luigi.scheme import Node

print(Node(luigi_task='esrf.tomwer.FolderTransfertTask').inputs)
print(Node(luigi_task='esrf.tomwer.FolderTransfertTask').outputs)
print(Node(luigi_task='esrf.tomwer.FtseriesTask').outputs)
# fix: compare with '==', not 'is' — identity of small ints is an
# implementation detail of CPython.
assert len(Node(luigi_task='esrf.tomwer.FolderTransfertTask').inputs) == 1
```
%% Cell type:markdown id: tags:
## with only single process
we want to produce the following workflow:
datatransfert -> darkrefs -> ftseries -> transfert
We can imagine that we have some data on rnice that we want to move to lbsram for running darkref and ftseries reconstruction, then move those again to rnice.
%% Cell type:code id: tags:
``` python
from tomwer.luigi.scheme import Node, Link, Scheme
from tomwer.luigi import scheme as __s
print(__s.__file__)
from tomwer.core.utils import DownloadDataset
import tempfile
import os
import shutil

original_folder = tempfile.mkdtemp(prefix='orignal_')
buffer_folder = tempfile.mkdtemp(prefix='buffer_')
final_folder = tempfile.mkdtemp(prefix='final_')

try:
    reset_proxy()
    DownloadDataset('tutorials/D2_H2_T2_h_.tar.bz2', original_folder,
                    timeout=100, unpack=True)
except TimeoutError:
    remove_proxy()
    _logger.error('Failed to load dataset')
else:
    remove_proxy()
    refHST_file_0 = os.path.join(original_folder, 'refHST0000.edf')
    # fix: dropped a stray '' path component that was a no-op
    refHST_file_1 = os.path.join(original_folder, 'refHST3600.edf')
    assert not os.path.exists(refHST_file_0)
    assert not os.path.exists(refHST_file_1)

    # workflow: transfer -> darkrefs -> ftseries -> transfer
    # (duplicated `properties={...}` diff-artifact lines removed)
    node1 = Node(luigi_task='esrf.tomwer.FolderTransfertTask',
                 properties={'dest_dir': buffer_folder})
    node2 = Node(luigi_task='esrf.tomwer.DarkRefsTask')
    node3 = Node(luigi_task='esrf.tomwer.FtseriesTask')
    node4 = Node(luigi_task='esrf.tomwer.FolderTransfertTask',
                 properties={'dest_dir': final_folder})
    nodes = [node1, node2, node3, node4]
    links = [
        Link(source_node=node1, source_channel='data',
             sink_node=node2, sink_channel='data'),
        Link(source_node=node2, source_channel='data',
             sink_node=node3, sink_channel='data'),
        Link(source_node=node3, source_channel='data',
             sink_node=node4, sink_channel='data'),
    ]
    scheme = Scheme(nodes=nodes, links=links)
```
%% Cell type:markdown id: tags:
Then we can explore the scheme defined this way
%% Cell type:code id: tags:
``` python
# fix: use '==' instead of 'is' — relying on object identity for int
# values is a CPython implementation detail.
assert len(scheme.finalsNodes()) == 1
assert scheme.finalsNodes()[0].id == node4.id
```
%% Cell type:markdown id: tags:
and of course execute this workflow
%% Cell type:code id: tags:
``` python
import tomwer.luigi
import shutil
import tempfile
import os


def clear_luigi_histo():
    """Remove luigi/tomwer task-history folders so every task re-runs."""
    for _folder in ('/tmp/esrf_tomwer', '/tmp/luigi'):
        if os.path.exists(_folder):
            shutil.rmtree(_folder)


# fix: reuse the helper instead of duplicating the same cleanup loop inline
clear_luigi_histo()

scan = os.path.join(original_folder, 'D2_H2_T2_h_')
assert len(os.listdir(final_folder)) == 0  # fix: '==', not 'is'

# fix: removed a hard-coded developer path ('/nobackup/linazimov/...') that
# was wiped with rmtree and then immediately shadowed by mkdtemp — unsafe
# and dead on any other machine. A throw-away directory is used instead.
properties_folder = tempfile.mkdtemp()

tomwer.luigi.exec_(scheme, properties_folder=properties_folder,
                   scan=scan, name='test_process')

print('----------')
print(os.listdir(original_folder))
print('----------')
print(os.listdir(final_folder))

# make sure the whole chain ran and the final transfer happened
assert len(os.listdir(final_folder)) == 1  # fix: '==', not 'is'
assert len(os.listdir(os.path.join(final_folder, 'D2_H2_T2_h_'))) > 1
refHST_file_0 = os.path.join(final_folder, 'D2_H2_T2_h_', 'refHST0000.edf')
refHST_file_1 = os.path.join(final_folder, 'D2_H2_T2_h_', 'refHST3600.edf')
assert os.path.exists(refHST_file_0)
assert os.path.exists(refHST_file_1)
```
%% Cell type:markdown id: tags:
## With Endless process and single process
%% Cell type:markdown id: tags:
### 1. defining the endless process (to be executed in a parallel script)
%% Cell type:code id: tags:
``` python
import inspect
import test_workflow_with_endless

# Display the helper that launches the workflow containing an endless process.
print(inspect.getsource(test_workflow_with_endless.start_workflow))
```
%% Cell type:code id: tags:
``` python
import subprocess
import tempfile

# fix: the two mkdtemp assignments were duplicated (diff artifact) — each
# directory is created exactly once now.
parameters_files_dir = tempfile.mkdtemp()
observed_dir = tempfile.mkdtemp()

# argv as a list with shell=False: list + shell=True would hand only
# "python3" to the shell and drop the arguments. (Duplicated
# `shell=True)` line removed as well.)
subprocess.call(["python3", "test_workflow_with_endless.py",
                 "start",
                 parameters_files_dir,
                 observed_dir])
```
%% Cell type:markdown id: tags:
### 2. Adding some data on the script observed
%% Cell type:code id: tags:
``` python
from tomwer.core.utils import fastMockAcquisition  # NOTE(review): appears unused here — confirm
import os

# argv as a list with shell=False; list + shell=True would drop the
# arguments. (Duplicated `shell=True)` line removed.)
subprocess.call(["python3", "test_workflow_with_endless.py",
                 "create_scan",
                 parameters_files_dir,
                 observed_dir])
```
%% Cell type:code id: tags:
``` python
# Then we can wait for the workflow to be executed
# and look for traces of the processing (such as the octave file)
# inside each scan folder of the observed directory.
for scan_dir in os.listdir(observed_dir):
    print('folder %s' % scan_dir)
    print(os.listdir(os.path.join(observed_dir, scan_dir)))
```
%% Cell type:markdown id: tags:
### 3. Stop the observation
%% Cell type:code id: tags:
``` python
import inspect
import test_workflow_with_endless

# Display the helper that stops the endless workflow.
print(inspect.getsource(test_workflow_with_endless.stop_workflow))
```
%% Cell type:code id: tags:
``` python
# argv as a list with shell=False; list + shell=True would drop the arguments
subprocess.call(["python3", "test_workflow_with_endless.py",
                 "stop",
                 parameters_files_dir,
                 observed_dir])
```
%% Cell type:markdown id: tags:
## The special case of the data list
%% Cell type:code id: tags:
``` python
# TODO
```
%% Cell type:code id: tags:
``` python
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment