Commit 5bf045f5 authored by Wout De Nolf's avatar Wout De Nolf
Browse files

add celery config from yaml or Beacon

parent 4d01bd57
Pipeline #82823 failed with stages
in 52 seconds
celery:
broker_url: "redis://localhost:6379/3"
result_backend: "redis://localhost:6379/4"
result_serializer: "pickle"
accept_content: ["application/json", "application/x-python-serialize"]
result_expires: 600
......@@ -23,7 +23,7 @@ package_dir=
packages=find:
python_requires = >=3.6
install_requires =
celery
celery >= 5
ewoksutils
[options.packages.find]
......@@ -45,6 +45,7 @@ test =
pytest
pytest-celery
pytest-redis
blissdata
dev =
%(test)s
black
......
import os
import sys
import logging
import importlib
from pathlib import Path
import celery
def configure_app(app: celery.Celery):
_module = os.environ.get("CELERY_CONFIG_MODULE")
if _module:
path = Path(_module)
if path.is_file():
parent = str(path.parent.absolute())
if parent not in sys.path:
sys.path.append(parent)
_module = path.stem
app.config_from_object(_module, force=True)
else:
from typing import Optional, Tuple
from celery.loaders.base import BaseLoader
from celery import Celery
logger = logging.getLogger(__name__)
def configure_app(app):
from click import Option
from celery import bootsteps
app.user_options["preload"].add(
Option(
["--yaml"],
required=False,
help="Celery configuration in a local file (python or yaml) or Beacon.",
)
)
class EwoksBootstep(bootsteps.Step):
def __init__(self, parent, yaml: str = "", **options):
try:
loader = EwoksLoader(app, config=yaml)
loader.read_configuration()
loader.import_default_modules()
except Exception as err:
logger.error(err)
sys.exit(-1)
super().__init__(parent, **options)
app.steps["worker"].add(EwoksBootstep)
class EwoksLoader(BaseLoader):
"""Celery loader based on a local file (python or yaml) or Beacon."""
def __init__(
self, app: Celery, config: str, *, configure_logging: bool = True
) -> None:
self.app = app
if config:
self.config_path = config
else:
self.config_path = None
self.configure_logging = configure_logging
super().__init__(app)
def read_configuration(self) -> dict:
config = read_configuration(self.config_path)
# Warning: calling with silent=True causes sphinx doc
# building to fail.
try:
import celeryconfig # noqa F401
except ImportError:
pass
else:
app.config_from_object("celeryconfig", force=True)
self.app.config_from_object(config, force=True)
return config
def read_configuration(config_path: Optional[str] = None) -> dict:
if config_path and (
config_path.startswith("beacon:")
or config_path.split(".")[-1] in ("yml", "yaml")
):
config = _read_yaml_config(config_path)
if "celery" in config:
config = config["celery"]
elif "CELERY" in config:
config = config["CELERY"]
else:
config = _read_py_config(config_path)
return config
def _read_yaml_config(resource: str) -> dict:
from blissdata.beacon.files import read_config
return read_config(resource)
def _read_py_config(module: Optional[str] = None) -> dict:
sys_path, module = _get_config_module(module)
keep_sys_path = sys.path
sys.path.insert(0, sys_path)
try:
config = vars(importlib.import_module(module))
mtype = type(os)
config = {
k: v
for k, v in config.items()
if not k.startswith("_") and not isinstance(v, mtype)
}
return config
finally:
sys.path = keep_sys_path
def _get_config_module(module: Optional[str] = None) -> Tuple[str, str]:
if not module:
module = os.environ.get("CELERY_CONFIG_MODULE")
if not module:
return os.getcwd(), "celeryconfig"
path = Path(module)
if path.is_file():
parent = str(path.parent.absolute())
return parent, path.stem
return os.getcwd(), module
import pytest
from ..config import read_configuration
EXPECTED = {
"broker_url": "redis://localhost:6379/3",
"result_backend": "redis://localhost:6379/4",
"result_serializer": "pickle",
"accept_content": ["application/json", "application/x-python-serialize"],
"result_expires": 600,
}
def test_py_config(py_config):
assert read_configuration(py_config) == EXPECTED
def test_yaml_config(yaml_config):
assert read_configuration(yaml_config) == EXPECTED
def test_beacon_config(beacon_config):
assert read_configuration(beacon_config) == EXPECTED
@pytest.fixture
def py_config(tmpdir):
filename = str(tmpdir / "celeryconfig.py")
lines = [
"broker_url = 'redis://localhost:6379/3'\n",
"result_backend = 'redis://localhost:6379/4'\n",
"result_serializer = 'pickle'\n",
"accept_content = ['application/json', 'application/x-python-serialize']\n",
"result_expires = 600\n",
]
with open(filename, "w") as f:
f.writelines(lines)
return filename
@pytest.fixture
def yaml_config(tmpdir):
filename = str(tmpdir / "ewoks.yaml")
lines = [
"celery:\n",
" broker_url: 'redis://localhost:6379/3'\n",
" result_backend: 'redis://localhost:6379/4'\n",
" result_serializer: 'pickle'\n",
" accept_content: ['application/json', 'application/x-python-serialize']\n",
" result_expires: 600\n",
]
with open(filename, "w") as f:
f.writelines(lines)
return filename
@pytest.fixture
def beacon_config(mocker):
url = "beacon://localhost:1234/config.yml"
client = mocker.patch("blissdata.beacon.files.read_config")
def read_config(_url):
if _url == url:
return EXPECTED
client.side_effect = read_config
return url
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment