Commit 246dda6a authored by Pierre Paleo's avatar Pierre Paleo
Browse files

Scheduler and Logger

parent 61172fe7
......@@ -68,6 +68,7 @@ def setup_package():
install_requires = [
'distributed',
'tornado',
],
......
import sys
import logging
import logging.config
import traceback
from io import TextIOWrapper
from threading import Thread
class StreamToLogger(TextIOWrapper):
    """
    Fake file-like stream object that redirects writes to a logger instance.

    Reference:
    https://www.electricmonk.nl/log/2011/08/14/redirect-stdout-and-stderr-to-a-logger-in-python/

    Parameters
    ----------
    logger: logging.Logger
        Logger receiving the redirected writes (each line at ERROR level).
    formatter: logging.Formatter
        Formatter associated with this stream.
    """

    def __init__(self, logger, formatter):
        # NOTE: TextIOWrapper.__init__ is deliberately not called — this
        # object is only used as a write()/flush() duck-type, never as a
        # real buffered stream.
        self.logger = logger
        # Bug fix: `formatter` was previously accepted but silently dropped;
        # store it so callers can inspect it.
        self.formatter = formatter
        self.linebuf = ''

    def write(self, buf):
        # One ERROR record per non-empty line of the written buffer.
        for line in buf.rstrip().splitlines():
            self.logger.error(line.rstrip())

    def flush(self):
        # Nothing is buffered here; present only to satisfy the stream API.
        return
class LogThread(Thread):
    """
    Thread subclass whose uncaught exceptions are logged instead of printed.

    Reference: http://benno.id.au/blog/2012/10/06/python-thread-exceptions
    Exceptions captured with sys.excepthook are captured only at the
    process-scope, meaning that exceptions in Threads are *not* captured.
    Therefore, this wrapper is used around threading.Thread.
    LogThread should always be used in preference to threading.Thread.
    The interface provided by LogThread is identical to that of
    threading.Thread, however, if an exception occurs in the thread the error
    will be logged (using logging.exception) rather than printed to stderr.
    This is important in daemon style applications where stderr is redirected
    to /dev/null.

    Parameters
    ----------
    logger: logging.Logger
        Logger that receives the traceback of exceptions raised in run().
    **kwargs:
        Forwarded verbatim to threading.Thread (target, args, name, ...).
    """

    def __init__(self, logger, **kwargs):
        super().__init__(**kwargs)
        # Swap run() for a wrapper that logs uncaught exceptions.
        self._real_run = self.run
        self.run = self._wrap_run
        self.logger = logger

    def _wrap_run(self):
        try:
            self._real_run()
        # Bug fix: a bare "except:" also swallowed SystemExit and
        # KeyboardInterrupt; catch Exception so interpreter-exit requests
        # still propagate.
        except Exception:
            self.logger.exception('Thread Exception:')
class Logger(object):
    """
    Simple logging interface built on top of logging.config.dictConfig.
    """

    def __init__(
        self,
        loggername,
        level="DEBUG",
        logfile="logger.log",
        console=True,
        capture_stderr=True,
    ):
        """
        Simple logging interface.

        Parameters
        ----------
        loggername: str
            Logger name.
        level: str
            Logging level. Can be "DEBUG", "INFO", "ERROR", "FATAL".
        logfile: str
            File where the log are written. If set to None, the logs are not written in a file.
        console: bool
            If set to True, the logs are (also) written in stdout/stderr
        capture_stderr: bool
            If set to True, stderr messages will also be written in logfile
        """
        self.loggername = loggername
        self.level = level
        self.logfile = logfile
        self.console = console
        self.capture_stderr = capture_stderr
        self.configure_logger()

    def configure_logger(self):
        # Adapt the default config to the requested level/handlers, then
        # apply it and fetch the resulting logger object.
        conf = self.get_default_config_dict()
        level = self.level.upper()
        for handler in conf["handlers"]:
            conf["handlers"][handler]["level"] = level
        conf["loggers"][self.loggername]["level"] = level
        if not self.console:
            conf["loggers"][self.loggername]["handlers"].remove("console")
        # Bug fix: the docstring states that logfile=None disables file
        # logging, but the previous code still configured a FileHandler with
        # filename=None, which makes dictConfig() fail. Drop the handler.
        if self.logfile is None:
            del conf["handlers"]["file"]
            conf["loggers"][self.loggername]["handlers"].remove("file")
        self.config = conf
        logging.config.dictConfig(conf)
        self.logger = logging.getLogger(self.loggername)
        if self.capture_stderr:
            self.err_formatter = logging.Formatter("%(message)s")
            # Redirect stderr so stray tracebacks end up in the log.
            sys.stderr = StreamToLogger(self.logger, self.err_formatter)

    def info(self, msg):
        return self.logger.info(msg)

    def debug(self, msg):
        return self.logger.debug(msg)

    def warning(self, msg):
        return self.logger.warning(msg)

    def error(self, msg):
        return self.logger.error(msg)

    def fatal(self, msg):
        return self.logger.fatal(msg)

    def critical(self, msg):
        return self.logger.critical(msg)

    def __del__(self):
        # Restore the genuine stderr when this Logger goes away.
        if self.capture_stderr:
            sys.stderr = sys.__stderr__

    def get_default_config_dict(self):
        # TODO read from file
        conf = {
            'version': 1,
            'formatters': {
                'default': {
                    'format': '%(asctime)s - %(levelname)s - %(message)s',
                    'datefmt': '%d-%m-%Y %H:%M:%S',
                },
                # Bug fix: the key was misspelled 'formar', so the console
                # formatter silently fell back to the implicit default.
                'console': {'format': '%(message)s'},
            },
            'handlers': {
                'console': {
                    'level': 'DEBUG',
                    'class': 'logging.StreamHandler',
                    'formatter': 'console',
                    'stream': 'ext://sys.stdout',
                },
                'file': {
                    'level': 'DEBUG',
                    #~ 'class': 'logging.handlers.RotatingFileHandler',
                    'class': 'logging.FileHandler',
                    'formatter': 'default',
                    'filename': self.logfile,
                    #~ 'maxBytes': 1048576,
                    #~ 'backupCount': 3
                },
            },
            'loggers': {
                self.loggername: {
                    'level': 'DEBUG',
                    'handlers': ['console', 'file'],
                    'propagate': True,
                }
            },
            'disable_existing_loggers': False,
        }
        return conf
class DummyLogger(object):
    """
    Logger that is either a "true" logger object, or a fall-back to "print".

    In many cases, a Logger object is (optionally) passed as a parameter
    when creating an object instance, ex. m = MyClass(args, logger=logger).
    The target class code should adapt whether logger is None or not.
    To avoid many "if self.logger is not None", we use this DummyLogger class
    to fall-back on stdout when logger is None.
    """

    def __init__(self, logger):
        self.logger = logger
        method_names = (
            "debug",
            "warn",
            "warning",
            "info",
            "error",
            "fatal",
            "critical",
            "exception",
        )
        # Bind every logging method either to the wrapped logger's method,
        # or to print() when no logger was supplied.
        for name in method_names:
            setattr(self, name, print if logger is None else getattr(logger, name))
# Mapping from lower-case level names to the stdlib logging level constants.
# Note: logging.WARN/WARNING and logging.FATAL/CRITICAL are aliases of the
# same numeric values.
LogLevel = {
    "notset": logging.NOTSET,
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warn": logging.WARN,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
    "fatal": logging.FATAL
}
from tornado.ioloop import IOLoop
from distributed import Scheduler
from distributed.cli.utils import install_signal_handlers
from .utils import generate_random_string
from .logger import Logger
class DaskScheduler(object):
    """
    Initialize a dask scheduler.

    Parameters
    ----------
    addr: str, optional
        Scheduler listening address.
    port: int, optional
        Scheduler listening port.
    name: str, optional
        Name of the scheduler.
    scheduler_file: str, optional
        File where the information on scheduler will be stored.
    logging_options: dict, optional
        Dict where the keys are parameters of the Logger() class.
    bokeh_port: int, optional
        If set, the scheduler will propose a Bokeh service at the given port.
    """

    def __init__(
        self,
        addr="tcp://127.0.0.1",
        port=5454,
        name=None,
        scheduler_file=None,
        logging_options=None,
        bokeh_port=None,
    ):
        self.port = port
        self.addr = addr + ":" + str(port)
        # Fall back on a randomized, likely-unique scheduler name.
        self.name = name or "scheduler_" + generate_random_string(10)
        self.scheduler_file = scheduler_file or "/tmp/" + self.name
        self.configure_logger(logging_options)
        self.logger.debug("Initializing Scheduler '%s'" % self.name)
        self.loop = IOLoop.current()
        self.logger.debug("Using loop %s" % str(self.loop))
        self.services = {}
        self.configure_bokeh(bokeh_port)
        self.scheduler = Scheduler(
            loop=self.loop,
            services=self.services,
            scheduler_file=self.scheduler_file
        )
        # Make SIGINT/SIGTERM stop the loop cleanly (see cleanup()).
        install_signal_handlers(self.loop, cleanup=self.cleanup)

    def configure_logger(self, logging_options):
        """
        Build self.logger from default options, overridden by the entries of
        `logging_options` (if any).
        """
        options = {
            "loggername": self.name,
            "level": "DEBUG",
            "logfile": self.name + ".log",
            "console": True,
            "capture_stderr": True,
        }
        # Bug fix: the merged defaults were previously discarded when
        # logging_options was provided — a partial dict then crashed with
        # KeyError("loggername") and lost all default values. Merging into a
        # local dict also avoids mutating the caller's dict via pop().
        if logging_options is not None:
            options.update(logging_options)
        loggername = options.pop("loggername")
        self.logger = Logger(loggername, **options)

    def configure_bokeh(self, bokeh_port, bokeh_prefix=None):
        # The Bokeh dashboard is optional: only register the service when a
        # port was explicitly requested.
        if bokeh_port is not None:
            from distributed.bokeh.scheduler import BokehScheduler
            self.services[('bokeh', bokeh_port)] = (BokehScheduler, {'prefix': bokeh_prefix})
            self.logger.info("Bokeh at %s:%d" % (bokeh_prefix, bokeh_port))

    def start(self, start_ioloop=True):
        """
        Start the scheduler, and (optionally) the tornado IOLoop — the latter
        blocks until the loop is stopped.
        """
        self.logger.info("Starting scheduler at %s" % self.addr)
        self.scheduler.start(self.addr)
        try:
            if start_ioloop:
                self.logger.debug("Starting scheduler loop")
                self.loop.start()  # blocks until the loop is stopped
                self.loop.close()
                self.logger.debug("Stopping scheduler loop")
        finally:
            self.logger.info("Stopping scheduler")
            self.scheduler.stop()

    def cleanup(self, signum):
        # Called by install_signal_handlers on SIGINT/SIGTERM; loop teardown
        # itself is handled by the distributed signal machinery.
        self.logger.critical("Received signal %s" % str(signum))
from random import choice
from string import ascii_letters, digits
def generate_random_string(length):
    """
    Return a random string of `length` alphanumeric characters ([a-zA-Z0-9]).

    Parameters
    ----------
    length: int
        Number of characters of the resulting string.
    """
    alphabet = ascii_letters + digits
    # str.join builds the string in linear time, unlike repeated "+=".
    return "".join(choice(alphabet) for _ in range(length))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment