Commit 300a81b4 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

fixture for test_ppf logging

parent a988379f
import sys
import logging
import pytest
# loggers = [logging.getLogger("pypushflow"), logging.getLogger("esrf2pypushflow")]
loggers = [logging.getLogger("esrf2pypushflow")]
@pytest.fixture(scope="session")
def ppf_logging():
stdouthandler = logging.StreamHandler(sys.stdout)
levels = []
for logger in loggers:
levels.append(logger.getEffectiveLevel())
logger.setLevel(logging.DEBUG)
logger.addHandler(stdouthandler)
yield
for level, logger in zip(levels, loggers):
logger.setLevel(level)
logger.removeHandler(stdouthandler)
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import taskgraphs
from taskgraphlib import assert_taskgraph_result
......@@ -8,34 +6,29 @@ from taskgraphlib import assert_taskgraph_result_output
# Logging makes multiprocessing hangs?
# https://pythonspeed.com/articles/python-multiprocessing/
# logging.getLogger("pypushflow").setLevel(logging.DEBUG)
# logging.getLogger("pypushflow").addHandler(logging.StreamHandler(sys.stdout))
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def test_acyclic_job1(tmpdir):
def test_acyclic_job1(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = taskgraphs.acyclic_graph1()
job(graph, varinfo=varinfo)
assert_taskgraph_result(graph, expected, varinfo)
def test_acyclic_job2(tmpdir):
def test_acyclic_job2(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = taskgraphs.acyclic_graph2()
job(graph, varinfo=varinfo)
assert_taskgraph_result(graph, expected, varinfo)
def test_cyclic_job1(tmpdir):
def test_cyclic_job1(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = taskgraphs.cyclic_graph1()
result = job(graph, varinfo=varinfo)
assert_taskgraph_result_output(result, expected, varinfo)
def test_cyclic_job2(tmpdir):
def test_cyclic_job2(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = taskgraphs.cyclic_graph2()
result = job(graph, varinfo=varinfo)
......
import os
import sys
import logging
import unittest
from esrf2pypushflow import job
......@@ -14,9 +13,6 @@ sys.path.insert(
0, "/mnt/multipath-shares/sware/exp/pxsoft/bes/vgit/linux-x86_64/id30a2/edna2"
)
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
# This test will only work for users 'opid30' or 'svensson' due to file
# system permissions. Also note that this workflow needs a working
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def workflow1():
nodes = [
......@@ -33,7 +28,7 @@ def workflow1():
return graph, expected_results
def test_workflow1(tmpdir):
def test_workflow1(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow1()
job(graph, varinfo=varinfo)
......
import sys
import logging
import itertools
import pytest
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result_output
logging.getLogger("esrf2pypushflow").setLevel(logging.INFO)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def workflow10(inputs):
nodes = [
......@@ -57,7 +52,7 @@ def workflow10(inputs):
"limit,persistent",
itertools.product([10], [True, False]),
)
def test_workflow10(limit, persistent, tmpdir):
def test_workflow10(limit, persistent, ppf_logging, tmpdir):
if persistent:
varinfo = {"root_uri": str(tmpdir)}
else:
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel11a():
nodes = [
......@@ -154,7 +149,7 @@ def workflow11():
return graph, expected_results
def test_workflow11(tmpdir):
def test_workflow11(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow11()
job(graph, varinfo=varinfo)
......
import sys
import logging
import pytest
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel12():
nodes = [
......@@ -104,7 +99,7 @@ def workflow12(startvalue, withsubmodel_startvalue):
@pytest.mark.parametrize("startvalue", [0, 1])
def test_workflow12(startvalue, tmpdir):
def test_workflow12(startvalue, ppf_logging, tmpdir):
withsubmodel_startvalue = 1
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow12(startvalue, withsubmodel_startvalue)
......
import sys
import logging
import pytest
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel13():
nodes = [
......@@ -101,7 +96,7 @@ def workflow13(startvalue, withlastnode_startvalue):
@pytest.mark.parametrize("startvalue", [0, 1])
def test_workflow13(startvalue, tmpdir):
def test_workflow13(startvalue, ppf_logging, tmpdir):
withlastnode_startvalue = 1
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow13(startvalue, withlastnode_startvalue)
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel14a():
nodes = [
......@@ -142,7 +137,7 @@ def workflow14():
return graph, expected_results
def test_workflow14(tmpdir):
def test_workflow14(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow14()
job(graph, varinfo=varinfo)
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel15(name):
nodes = [
......@@ -113,7 +108,7 @@ def workflow15():
return graph, expected_results
def test_workflow15(tmpdir):
def test_workflow15(ppf_logging, tmpdir):
"""Test connecting nodes from submodels directly"""
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow15()
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel16a():
nodes = [
......@@ -156,7 +151,7 @@ def workflow16():
return graph, expected_results
def test_workflow16(tmpdir):
def test_workflow16(ppf_logging, tmpdir):
"""Test connecting nodes from sub-submodels to the top model"""
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow16()
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def workflow2():
nodes = [
......@@ -32,7 +27,7 @@ def workflow2():
return graph, expected_results
def test_workflow2(tmpdir):
def test_workflow2(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow2()
result = job(graph, varinfo=varinfo, raise_on_error=False)
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel1():
nodes = [
......@@ -86,7 +81,7 @@ def workflow3():
return graph, expected_results
def test_workflow3(tmpdir):
def test_workflow3(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow3()
job(graph, varinfo=varinfo)
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel6():
nodes = [
......@@ -97,7 +92,7 @@ def workflow6():
return graph, expected_results
def test_workflow6(tmpdir):
def test_workflow6(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow6()
job(graph, varinfo=varinfo)
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel7():
nodes = [
......@@ -84,7 +79,7 @@ def workflow7():
return graph, expected_results
def test_workflow7(tmpdir):
def test_workflow7(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow7()
job(graph, varinfo=varinfo)
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def submodel8():
nodes = [
......@@ -84,7 +79,7 @@ def workflow8():
return graph, expected_results
def test_workflow8(tmpdir):
def test_workflow8(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow8()
job(graph, varinfo=varinfo)
......
import sys
import logging
from esrf2pypushflow import job
from taskgraphlib import assert_taskgraph_result
logging.getLogger("esrf2pypushflow").setLevel(logging.DEBUG)
logging.getLogger("esrf2pypushflow").addHandler(logging.StreamHandler(sys.stdout))
def workflow9():
nodes = [
......@@ -59,7 +54,7 @@ def workflow9():
return graph, expected_results
def test_workflow9(tmpdir):
def test_workflow9(ppf_logging, tmpdir):
varinfo = {"root_uri": str(tmpdir)}
graph, expected = workflow9()
job(graph, varinfo=varinfo)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment