Commit ebf33e2f authored by Wout De Nolf

[ci skip]

parent 78b4a3a5
Pipeline #57757 skipped
@@ -158,32 +158,3 @@ When a task is defined as a method or a script, a class wrapper will be generate
* *ppfmethod*: same as *method* but it has one optional input "ppfdict" and one output "ppfdict". The output dictionary is the input dictionary updated by the method. The input dictionary is unpacked before passing it to the method. The output dictionary is unpacked when checking conditions in links.
* *ppfport*: *ppfmethod* which is the identity mapping
* *script*: defined by a `Task` class with one required input argument ("method": full qualifier name of the method) and one output argument ("return_value")
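The *ppfmethod* dictionary-passing convention can be sketched as follows. This is illustrative only: the function names (`my_ppfmethod`, `run_ppfmethod`) are hypothetical, not the wrapper that `ewokscore` actually generates.

```python
def my_ppfmethod(a=0, b=0, **_ignored):
    # A plain method; extra keys from the unpacked "ppfdict" are ignored.
    return {"sum": a + b}


def run_ppfmethod(method, ppfdict=None):
    # The input dictionary is unpacked before calling the method and the
    # output dictionary is the input dictionary updated by the result.
    ppfdict = dict(ppfdict or {})
    result = method(**ppfdict)
    ppfdict.update(result or {})
    return {"ppfdict": ppfdict}


out = run_ppfmethod(my_ppfmethod, {"a": 1, "b": 2})
# out["ppfdict"] now contains the inputs plus the method's result
```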
Hash links
----------
The task graph object in `ewokscore` provides additional functionality on top of what *networkx* provides:
* A *Task* can have several positional and named input variables and named output variables.
* A *Task* has a universal hash which is the hash of the inputs with a *Task* nonce.
* An output *Variable* has a universal hash which is the hash of the *Task* with the variable name as nonce.
* An input *Variable* can be
* default:
* provided by the persistent *Graph* representation
* universal hash of the data
* dynamic:
* provided by upstream *Tasks* at runtime
* output *Variable* of the upstream task so it has a universal hash
The actual output data of a *Task* is never hashed: we assume that a task given the same inputs produces the same outputs, or at the very least that a task which has succeeded once will not be executed again.
Hash linking of tasks serves the following purposes:
* Changing default input upstream in the graph will effectively create new tasks.
* The hashes provide a unique ID to create a *URI* for persistent storage.
* Variables can be provided with universal hashes to replace the hashing of the actual inputs.
* As data can be passed by passing hashes, serialization for distributed task scheduling can be done efficiently (not much data to serialize) and no special serializer is required to serialize hashes (as they are just strings).
Data management is currently only a proof-of-concept based on JSON files with the universal hashes as file names.
Workflow implementation
=======================
Workflows in `ewokscore` are based on *networkx* graphs. At runtime, links between nodes are
hash links which provide a unique identifier for each task and each task output. This identifier
is used to save and load task outputs from external storage (e.g. HDF5).
Universal hash
--------------
An instance of `UniversalHash` is the result of SHA-256 hashing of data.
An instance of `UniversalHashable` has a universal hash calculated from
* the pre-hash of the instance
* the instance *nonce* (if any)
The pre-hash is either provided (as a string, `UniversalHash` or `UniversalHashable`) or calculated from
* data provided by the instance
* the class *nonce*
The class *nonce* is calculated from the class qualifier name, the class version (which is a metaclass argument)
and the class *nonce* of the parent class.
Note: a *nonce* is a small piece of data which is added to the actual data before hashing.
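The nonce mechanism can be illustrated with plain `hashlib` (a minimal sketch, not the exact encoding `ewokscore` uses internally):

```python
import hashlib


def uhash_with_nonce(data: bytes, nonce: bytes = b"") -> str:
    # The nonce is appended to the actual data before hashing, so the
    # same data with different nonces yields different universal hashes.
    return hashlib.sha256(data + nonce).hexdigest()


h1 = uhash_with_nonce(b"payload")
h2 = uhash_with_nonce(b"payload", nonce=b"MyTask")
# h1 and h2 differ even though the payload is identical
```

This is why two task classes with the same inputs still get distinct universal hashes: their class nonces differ.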
Data URI
--------
An instance of `DataURI` provides a URI to external storage and a universal hash on which the URI is based.
Examples:
* `nexus:///tmp/dataset.nx?path=/scan/output_values_afad7eecf6536d20d4e965493429d6cbd9f779152e590ec60aa47ec2d492c09a&name=var1`
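The example URI above decomposes with the standard library's `urllib.parse`: the scheme selects the storage backend, the path locates the file, and the query carries the in-file path (which embeds the universal hash) and the variable name.

```python
from urllib.parse import urlparse, parse_qs

uri = (
    "nexus:///tmp/dataset.nx"
    "?path=/scan/output_values_afad7eecf6536d20d4e965493429d6cbd9f779152e590ec60aa47ec2d492c09a"
    "&name=var1"
)
parsed = urlparse(uri)
query = parse_qs(parsed.query)
# parsed.scheme -> "nexus", parsed.path -> "/tmp/dataset.nx"
# query["path"][0] ends with the universal hash, query["name"][0] is "var1"
```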
Data proxy
----------
An instance of `DataProxy` provides read-write access to data in external storage. It is created from a `DataURI` or
from a root URI in combination with a `UniversalHashable` or `UniversalHash`.
Variables
---------
An instance of `Variable` has a *value* in memory and optionally in external storage.
A `Variable` is also a `UniversalHashable` and has a `DataProxy` in case external storage is enabled (i.e. enough
information exists to create the `DataProxy`).
The universal hash is calculated from the *value* of the `Variable`.
Variable containers
-------------------
An instance of `VariableContainer` is a `Variable` whose *value* is a dictionary of `Variable` instances.
Tasks
-----
A `Task` instance has two `VariableContainer` instances, one for inputs and one for outputs.
WIP: ...
Hash links
----------
The task graph object in `ewokscore` provides additional functionality on top of what *networkx* provides:
* A *Task* can have several positional and named input variables and named output variables.
* A *Task* has a universal hash which is the hash of the inputs with a *Task* nonce.
* An output *Variable* has a universal hash which is the hash of the *Task* with the variable name as nonce.
* An input *Variable* can be
* default:
* provided by the persistent *Graph* representation
* universal hash of the data
* dynamic:
* provided by upstream *Tasks* at runtime
* output *Variable* of the upstream task so it has a universal hash
The actual output data of a *Task* is never hashed: we assume that a task given the same inputs produces
the same outputs, or at the very least that a task which has succeeded once will not be executed again.
Hash linking of tasks serves the following purposes:
* Changing default input upstream in the graph will effectively create new tasks.
* The hashes provide a unique ID to create a *URI* for persistent storage.
* Variables can be provided with universal hashes to replace the hashing of the actual inputs.
* As data can be passed by passing hashes, serialization for distributed task scheduling can be done efficiently
(not much data to serialize) and no special serializer is required to serialize hashes (as they are just strings).
Data management is currently only a proof-of-concept based on JSON files with the universal hashes as file names.
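The hash chaining described above can be sketched with `hashlib` (hypothetical helper names and a simplified `repr`-based serialization, not `ewokscore`'s exact scheme):

```python
import hashlib


def sha(data: str) -> str:
    return hashlib.sha256(data.encode()).hexdigest()


def task_uhash(inputs: dict, task_nonce: str) -> str:
    # Task hash: hash of the (sorted) inputs with a task nonce.
    serialized = repr(sorted(inputs.items()))
    return sha(serialized + task_nonce)


def output_var_uhash(task_hash: str, name: str) -> str:
    # Output variable hash: hash of the task with the variable name as nonce.
    return sha(task_hash + name)


t = task_uhash({"a": 1}, "SumTask")
v = output_var_uhash(t, "result")
# A downstream task can take v as a dynamic input, so the actual
# output data never needs to be hashed or serialized:
t2 = task_uhash({"x": v}, "NextTask")
```

Note how changing a default input upstream (`{"a": 1}` → `{"a": 2}`) changes `t`, hence `v`, hence `t2`: the whole downstream chain effectively becomes new tasks.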
@@ -9,4 +9,5 @@ ewokscore has been developed by the `Software group <http://www.esrf.eu/Instrume
:maxdepth: 2
definitions
implementation
api
import random
import hashlib
from typing import Union
from typing import Any, Optional, TypeVar, Union
from collections.abc import Mapping, Iterable, Set
import numpy
from .utils import qualname
@@ -50,7 +50,7 @@ def uhash(value, _hash=None):
_hash.update(classhashdata(type(value)))
if value is None:
pass
elif isinstance(value, UniversalHashable):
elif isinstance(value, HasUhash):
_hash.update(repr(value.uhash).encode())
elif isinstance(value, UniversalHash):
_hash.update(repr(value).encode())
@@ -65,7 +65,11 @@ def uhash(value, _hash=None):
elif isinstance(value, (numpy.ndarray, numpy.number)):
_hash.update(value.tobytes())
elif isinstance(value, Mapping):
keys, values = zip(*multitype_sorted(value.items(), key=lambda item: item[0]))
lst = multitype_sorted(value.items(), key=lambda item: item[0])
if lst:
keys, values = zip(*lst)
else:
keys = values = list()
uhash(keys, _hash=_hash)
uhash(values, _hash=_hash)
elif isinstance(value, Set):
@@ -83,7 +87,7 @@
class UniversalHash:
def __init__(self, hexdigest):
def __init__(self, hexdigest: Union[str, bytes]):
if isinstance(hexdigest, bytes):
hexdigest = hexdigest.decode()
if not isinstance(hexdigest, str):
@@ -107,6 +111,43 @@ class UniversalHash:
return str(self) < str(other)
class HasUhash:
@property
def uhash(self) -> Optional[UniversalHash]:
raise NotImplementedError
def __hash__(self):
# make it python hashable (to use in sets and dict keys)
uhash = self.uhash
if uhash is None:
return hash(id(self))
else:
return hash(uhash)
def __eq__(self, other):
if isinstance(other, HasUhash):
uhash = other.uhash
elif isinstance(other, UniversalHash):
uhash = other
else:
raise TypeError(other, type(other))
return self.uhash == uhash
def __repr__(self):
uhash = self.uhash
if uhash is None:
return super().__repr__()
else:
return f"{super().__repr__()}(uhash='{uhash}')"
def __str__(self):
uhash = self.uhash
if uhash is None:
return qualname(type(self))
else:
return f"{qualname(type(self))}(uhash='{uhash}')"
class MissingData:
def __bool__(self):
return False
@@ -114,31 +155,34 @@ class MissingData:
def __repr__(self):
return "<MISSING_DATA>"
def __eq__(self, other) -> bool:
return isinstance(other, type(self))
PreUhashTypes = Union[str, bytes, UniversalHash, HasUhash]
class UniversalHashable:
"""The universal hash of an instance is:
* data
* class nonce (class qualname, class version, superclass nonce)
* instance nonce (if any)
When a uhash is provided to the constructor however:
* the provided uhash
class UniversalHashable(HasUhash):
"""The universal hash of an instance of this class is based on:
* pre-uhash
* instance nonce (if any)
The universal hash is equal to the pre-hash when an instance nonce is not provided.
The pre-uhash is either provided or based on:
* data
* class nonce (class qualifier name, class version, superclass nonce)
"""
__CLASS_NONCE = None
__VERSION = None
MISSING_DATA = MissingData()
def __init__(self, uhash=None, instance_nonce=None):
"""
:param str, bytes, UniversalHash, UniversalHashable uhash:
:param instance_nonce:
"""
self.__set_uhash(uhash)
self.__original_uhash = self.__uhash
self.__instance_nonce = instance_nonce
self.__original__instance_nonce = instance_nonce
def __init__(
self,
pre_uhash: Optional[PreUhashTypes] = None,
instance_nonce: Optional[Any] = None,
):
self.set_uhash_init(pre_uhash=pre_uhash, instance_nonce=instance_nonce)
def __init_subclass__(subcls, version=None, **kwargs):
super().__init_subclass__(**kwargs)
@@ -147,6 +191,38 @@ class UniversalHashable:
subcls_data = subcls.class_nonce_data()
subcls.__CLASS_NONCE = str(uhash((subcls_data, supercls_data)))
def set_uhash_init(
self,
pre_uhash: Optional[PreUhashTypes] = None,
instance_nonce: Optional[Any] = None,
):
self.__set_pre_uhash(pre_uhash)
self.__original_pre_uhash = self.__pre_uhash
self.__instance_nonce = instance_nonce
self.__original__instance_nonce = instance_nonce
def get_uhash_init(self, remove_references=False):
pre_uhash = self.__original_pre_uhash
if remove_references:
if isinstance(pre_uhash, HasUhash):
pre_uhash = str(pre_uhash.uhash)
elif isinstance(pre_uhash, UniversalHash):
pre_uhash = str(pre_uhash)
return {
"pre_uhash": pre_uhash,
"instance_nonce": self.__original__instance_nonce,
}
def __set_pre_uhash(self, pre_uhash):
if pre_uhash is None:
self.__pre_uhash = None
elif isinstance(pre_uhash, (str, bytes)):
self.__pre_uhash = UniversalHash(pre_uhash)
elif isinstance(pre_uhash, (UniversalHash, HasUhash)):
self.__pre_uhash = pre_uhash
else:
raise TypeError(pre_uhash, type(pre_uhash))
@classmethod
def class_nonce(cls):
return cls.__CLASS_NONCE
@@ -158,32 +234,22 @@ class UniversalHashable:
def instance_nonce(self):
return self.__instance_nonce
def __set_uhash(self, uhash):
if uhash is None:
self.__uhash = None
elif isinstance(uhash, (str, bytes)):
self.__uhash = UniversalHash(uhash)
elif isinstance(uhash, (UniversalHash, UniversalHashable)):
self.__uhash = uhash
else:
raise TypeError(uhash, type(uhash))
def fix_uhash(self):
if self.__uhash is not None:
if self.__pre_uhash is not None:
return
keep, self.__instance_nonce = self.__instance_nonce, None
try:
uhash = self.uhash
pre_uhash = self.uhash
finally:
self.__instance_nonce = keep
self.__set_uhash(uhash)
self.__set_pre_uhash(pre_uhash)
def undo_fix_uhash(self):
self.__uhash = self.__original_uhash
self.__pre_uhash = self.__original_pre_uhash
@property
def uhash(self) -> Union[None, UniversalHash]:
_uhash = self.__uhash
def uhash(self) -> Optional[UniversalHash]:
_uhash = self.__pre_uhash
if _uhash is None:
data = self._uhash_data()
if data is self.MISSING_DATA:
@@ -195,7 +261,7 @@ class UniversalHashable:
else:
return uhash((data, cnonce, inonce))
else:
if isinstance(_uhash, UniversalHashable):
if isinstance(_uhash, HasUhash):
_uhash = _uhash.uhash
if _uhash is None:
return None
@@ -213,34 +279,3 @@ class UniversalHashable:
def undo_randomize(self):
self.__instance_nonce = self.__original__instance_nonce
def __hash__(self):
# make it python hashable (to use in sets and dict keys)
uhash = self.uhash
if uhash is None:
return hash(id(self))
else:
return hash(uhash)
def __eq__(self, other):
if isinstance(other, UniversalHashable):
uhash = other.uhash
elif isinstance(other, UniversalHash):
uhash = other
else:
raise TypeError(other, type(other))
return self.uhash == uhash
def __repr__(self):
uhash = self.uhash
if uhash is None:
return super().__repr__()
else:
return f"{super().__repr__()}(uhash='{uhash}')"
def __str__(self):
uhash = self.uhash
if uhash is None:
return qualname(type(self))
else:
return f"{qualname(type(self))}(uhash='{uhash}')"
@@ -2,6 +2,4 @@ from .proxy import DataProxy
from .json import JsonProxy # noqa F401
from .nexus import NexusProxy # noqa F401
def instantiate_data_proxy(scheme, *args, **kw):
return DataProxy.instantiate(scheme, *args, **kw)
instantiate_data_proxy = DataProxy.instantiate
import os
import string
import random
from pathlib import Path
from contextlib import contextmanager
from typing import Union, Tuple
from typing import Iterable, Optional, Tuple
from silx.io import h5py_utils
@@ -10,58 +11,52 @@ def random_string(n):
return "".join(random.choices(string.ascii_letters + string.digits, k=n))
def nonexisting_tmp_file(filename):
tmpname = filename + ".tmp" + random_string(6)
while os.path.exists(tmpname):
tmpname = filename + ".tmp" + random_string(6)
return tmpname
def mkdir(filename):
dirname = os.path.dirname(filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
def nonexisting_tmp_file(path: Path) -> Path:
tmppath = path.with_name(f"tmp_ewoks_{random_string(6)}_{path.name}")
while tmppath.exists():
tmppath = path.with_name(f"tmp_ewoks_{random_string(6)}_{path.name}")
return tmppath
@contextmanager
def atomic_file(filename):
filename = str(filename)
tmpname = nonexisting_tmp_file(filename)
mkdir(tmpname)
def atomic_create_path(path: Path) -> Iterable[Path]:
"""Yields a temporary path which will be renamed to the requested path
or deleted on failure.
"""
tmppath = nonexisting_tmp_file(path)
tmppath.parent.mkdir(parents=True, exist_ok=True)
try:
yield tmpname
yield tmppath
except Exception:
try:
os.unlink(tmpname)
os.unlink(tmppath)
except FileNotFoundError:
pass
raise
os.rename(tmpname, filename)
tmppath.rename(path) # overwrite when it exists
@contextmanager
def atomic_write(filename):
with atomic_file(filename) as tmpname:
with open(tmpname, mode="w") as f:
def atomic_write(path: Path, **kw):
with atomic_create_path(path) as tmpname:
with open(tmpname, mode="w", **kw) as f:
yield f
@h5py_utils.retry_contextmanager()
def append_hdf5(filename):
with h5py_utils.File(filename, mode="a") as h5file:
def append_hdf5(filename, **kw):
with h5py_utils.File(filename, mode="a", **kw) as h5file:
yield h5file
@contextmanager
def atomic_hdf5(
filename, h5group: Union[None, str]
) -> Tuple[h5py_utils.File, Union[None, str]]:
def atomic_write_hdf5(
path, h5group: Optional[str], **kw
) -> Tuple[h5py_utils.File, Optional[str]]:
if not h5group or h5group == "/":
with atomic_file(filename) as tmpname:
with h5py_utils.File(tmpname, mode="a") as f:
with atomic_create_path(path) as tmppath:
with h5py_utils.File(tmppath, mode="a", **kw) as f:
yield f, h5group
else:
# Atomic because an HDF5 file can be modified
# by only one process at a time
with append_hdf5(filename, retry_period=1, retry_timeout=360) as h5file:
yield h5file, h5group
with append_hdf5(path, retry_period=0.5, retry_timeout=360, **kw) as f:
yield f, h5group
from typing import Union, Any
from typing import List, Optional, Any
from pathlib import Path
from urllib.parse import ParseResult
from .uri import path_from_uri
from . import proxy
from ..hashing import UniversalHashable
class FileProxy(proxy.DataProxy, register=False):
EXTENSION = NotImplemented
"""Example root URI's:
* "file://path/to/directory"
* "file://path/to/file.ext"
* "file://path/to/file.ext?path=/path/in/file"
* "file://path/to/file.ext?path=/path/in/file&name=name/in/file"
"""
EXTENSIONS = NotImplemented
ALLOW_PATH_IN_FILE = NotImplemented
SUB_PATH_SEP = "/"
@property
def path(self) -> Union[Path, None]:
def path(self) -> Optional[Path]:
if self.fixed_uri:
return path_from_uri(self.uri.parse())
if self._root_uri is None:
parsed_root_uri = self.parsed_root_uri
if parsed_root_uri is None:
return None
path = path_from_uri(self._root_uri)
if path.name.endswith(self.EXTENSION):
return path
identifier = self._identifier
if not identifier:
root_path = path_from_uri(parsed_root_uri)
if any(root_path.name.endswith(ext) for ext in self.EXTENSIONS):
return root_path
identifier = self.identifier
if identifier is None:
return None
filename = identifier + self.EXTENSION
return path / filename
if self.ALLOW_PATH_IN_FILE:
filename = identifier + self.EXTENSIONS[0]
return root_path / filename
else:
path = root_path / identifier
subdirs = self._root_sub_path_parts()
if subdirs:
path = path.with_suffix(".tmp")
path = path.with_name(path.name[:-4])
path = path.joinpath(*subdirs)
path = path.with_suffix(self.EXTENSIONS[0])
return path
def _generate_uri(self) -> Union[None, proxy.DataUri]:
def _root_sub_path_parts(self, parent: bool = False) -> List[str]:
identifier = self.identifier
if identifier is None:
return list()
path = self.root_uri_path
parts = [s for s in path.split(self.SUB_PATH_SEP)]
name = self.root_uri_name
if parent:
parts += name.split(self.SUB_PATH_SEP)[:-1]
else:
parts += name.split(self.SUB_PATH_SEP)
return [s for s in parts if s]
def sub_path_parent_parts(self) -> List[str]:
if self.ALLOW_PATH_IN_FILE:
return self._root_sub_path_parts(parent=True)
else:
return list()
@property
def root_uri_path(self) -> str:
return self.root_uri_query.get("path", "")
@property
def root_uri_name(self) -> str:
return self.root_uri_query.get("name", "")
@property
def sub_path_parent(self) -> str:
return self.SUB_PATH_SEP.join(self.sub_path_parent_parts())
@property
def sub_path_name(self) -> str:
if self.ALLOW_PATH_IN_FILE:
name = self.root_uri_name
return name.split(self.SUB_PATH_SEP)[-1]
else:
return ""
@property
def sub_path(self) -> str:
parent = self.sub_path_parent
name = self.sub_path_name
if parent and name:
return parent + self.SUB_PATH_SEP + name
else:
return parent + name
def _generate_uri(self) -> Optional[proxy.DataUri]:
path = self.path
if path is None:
return
return proxy.DataUri(f"{self.SCHEME}://{self.path}", self.uhash)
query = dict()
sub_path_name = self.sub_path_name
if sub_path_name:
query["name"] = sub_path_name