diff --git a/src/sidecar/celery/ewoks/tasks.py b/src/sidecar/celery/ewoks/tasks.py index b17e0d50ee608e556ec2d2443fd490da0212f870..c2e809b6023a03334383c6f4e0c5b7749b363741 100644 --- a/src/sidecar/celery/ewoks/tasks.py +++ b/src/sidecar/celery/ewoks/tasks.py @@ -1,13 +1,18 @@ -import os +from contextlib import redirect_stdout +import io import json +import os +import logging import traceback from typing import Any, Optional from celery.exceptions import Reject from celery.utils.log import get_task_logger +from sidecar.celery.ewoks.utils import LogBuffer, log_to_buffer + from ...settings import settings -from ...ispyb.autoproc import add_autoproc_message +from ...ispyb.autoproc import add_autoproc_message, add_autoproc_attachment from ...ewoks.exceptions import InfrastructureException from ..app import app @@ -129,6 +134,24 @@ def _execute_graph( ) +def attach_and_save_log( + autoProcProgramId: int, fullFilePath: str, contents: str +) -> Optional[int]: + try: + filePath = os.path.dirname(fullFilePath) + fileName = os.path.basename(fullFilePath) + + if not os.path.exists(filePath): + os.makedirs(filePath) + + with open(fullFilePath, "w") as fh: + fh.write(contents) + + return add_autoproc_attachment(autoProcProgramId, filePath, fileName, "log") + except Exception: + logger.exception("Could not save log file") + + def _execute( graph: str, graph_path: str, @@ -205,12 +228,22 @@ def _execute( "processingMessage": "processing started", }, ) + buffer = io.StringIO() + log_buffer = LogBuffer(10000) try: - result = _execute_graph( - graph_path, execinfo=execinfo, varinfo=varinfo, **kwargs - ) + with log_to_buffer(log_buffer, logging.INFO): + with redirect_stdout(buffer): + result = _execute_graph( + graph_path, execinfo=execinfo, varinfo=varinfo, **kwargs + ) if register and autoProcProgramId: + attach_and_save_log( + autoProcProgramId, + f"{root_uri}/{autoProcProgramId}_stdout.log", + f"Log:\n{log_buffer.get()}\n\nstdout:\n{buffer.getvalue()}", + ) + update_autoproc_program( autoProcProgramId, { @@ -229,6 +262,12 @@ def _execute( # Pass infrastructure failure to dlq except RuntimeError as ex: if register and autoProcProgramId: + attach_and_save_log( + autoProcProgramId, + f"{root_uri}/{autoProcProgramId}_stdout.log", + f"Log:\n{log_buffer.get()}\n\nstdout:\n{buffer.getvalue()}", + ) + update_autoproc_program( autoProcProgramId, { @@ -239,7 +278,11 @@ def _execute( ) trace = traceback.format_exc() - add_autoproc_message(autoProcProgramId, "error", str(ex), description=trace) + attach_and_save_log( + autoProcProgramId, + f"{root_uri}/{autoProcProgramId}_stderr.log", + trace, + ) if isinstance(ex.__cause__, InfrastructureException): if register and autoProcProgramId: diff --git a/src/sidecar/celery/ewoks/utils.py b/src/sidecar/celery/ewoks/utils.py index e54536de892599ffb949d981dc44e5307a1b5a41..c4dfe630dbace8022565dce764b7b117f17bc2d9 100644 --- a/src/sidecar/celery/ewoks/utils.py +++ b/src/sidecar/celery/ewoks/utils.py @@ -1,4 +1,7 @@ -from typing import Any, Optional +import contextlib +from logging.handlers import BufferingHandler +import logging +from typing import Any, Optional, Iterator from celery.result import AsyncResult @@ -10,3 +13,31 @@ def get_result(future: AsyncResult, timeout: Optional[int] = None) -> Any: # Now have to wait on the actual job job = AsyncResult(job_id) return job.get(timeout=timeout) + + +class LogBuffer(BufferingHandler): + def __init__(self, capacity: int): + logging.handlers.BufferingHandler.__init__(self, capacity) + + def get(self) -> str: + messages = [self.format(record) for record in self.buffer] + return "\n".join(messages) + + def shouldFlush(self, _: logging.LogRecord) -> bool: + return False + + +@contextlib.contextmanager +def log_to_buffer(buffer: LogBuffer, level: int = logging.INFO) -> Iterator[None]: + buffer.setLevel(logging.INFO) + buffer.setFormatter( + logging.Formatter("{asctime} {name} {levelname} {message}", style="{") + ) + logging.getLogger().addHandler(buffer) + last_level = logging.getLogger().level + logging.getLogger().setLevel(level) + + yield + + logging.getLogger().setLevel(last_level) + logging.getLogger().removeHandler(buffer)