Commit 7fc82c0d authored by Wout De Nolf
Browse files

esrftaskgraph: refactor exist/available to has_value, has_runtime_value and has_persistent_value

parent 53b5dbea
......@@ -179,31 +179,31 @@ class Task(Registered, hashing.UniversalHashable, register=False):
@property
def done(self):
"""Completed (with or without exception)"""
return self.failed or self._outputs.available
return self.failed or self._outputs.has_value
@property
def failed(self):
return self._exception is not None
def _unavailable_inputs(self):
def _iter_missing_input_values(self):
for iname in self._INPUT_NAMES:
ivar = self._inputs.get(iname)
if ivar is None or not ivar.available:
var = self._inputs.get(iname)
if var is None or not var.has_value:
yield iname
@property
def is_ready_to_execute(self):
try:
next(iter(self._unavailable_inputs()))
next(iter(self._iter_missing_input_values()))
except StopIteration:
return True
return False
def assert_ready_to_execute(self):
unavailable = list(self._unavailable_inputs())
if unavailable:
lst = list(self._iter_missing_input_values())
if lst:
raise TaskInputError(
"The following inputs could not be loaded: " + str(unavailable)
"The following inputs could not be loaded: " + str(lst)
)
def execute(self, force_rerun=False, raise_on_error=True):
......
......@@ -46,17 +46,16 @@ def atomic_write(filename):
class Variable(hashing.UniversalHashable):
"""Has a runtime representation (python object) and a persistent
representation (JSON).
"""Has a runtime value (python object) and a persistent value (JSON).
TODO: make abstraction of persistent representation
TODO: make abstraction of persistent medium
"""
def __init__(
self, value=hashing.UniversalHashable.MISSING_DATA, varinfo=None, **kw
):
"""
:param value: the runtime representation
:param value: the runtime value
:param dict varinfo:
:param **kw: see `UniversalHashable`
"""
......@@ -66,16 +65,16 @@ class Variable(hashing.UniversalHashable):
raise TypeError(varinfo, type(varinfo))
self._root_uri = varinfo.get("root_uri")
self._disable_persistency = not self._root_uri
self._runtime_representation = self.MISSING_DATA
self._runtime_value = self.MISSING_DATA
super().__init__(**kw)
self.value = value
def _uhash_data(self):
"""The runtime representation of the value is used."""
"""The runtime value is used."""
if self._disable_persistency:
return super()._uhash_data()
else:
return self._runtime_representation
return self._runtime_value
def __eq__(self, other):
if isinstance(other, hashing.UniversalHashable):
......@@ -85,17 +84,17 @@ class Variable(hashing.UniversalHashable):
@property
def value(self):
if self._runtime_representation is self.MISSING_DATA:
self.value = self.load(raise_error=False)
return self._runtime_representation
if self._runtime_value is self.MISSING_DATA:
self.load(raise_error=False)
return self._runtime_value
@value.setter
def value(self, v):
self._runtime_representation = v
self._runtime_value = v
@property
def uri(self):
"""uri of the persistent representation
"""uri of the persistent value
:returns str or None: returns None when uhash is None
"""
......@@ -108,93 +107,88 @@ class Variable(hashing.UniversalHashable):
return filename
def dump(self):
"""From runtime to persistent representation (never overwrite).
Creating the persistent representation needs to be atomic.
"""From runtime to persistent value (never overwrite).
Creating the persistent value needs to be atomic.
This silently returns when:
- persistency is disabled
- already persisted
- data is not available (MISSING_DATA)
- data does not have a runtime value (MISSING_DATA)
- no valid URI can be constructed
"""
if self._disable_persistency or self.exists or not self.available:
if (
self._disable_persistency
or self.has_persistent_value
or not self.has_runtime_value
):
return
filename = self.uri
if not filename:
return
data = self.value
with atomic_write(filename) as f:
json.dump(self.serialize(data), f)
json.dump(self._serialize(data), f)
def load(self, raise_error=True):
"""From persistent to runtime representation
"""From persistent to runtime value. This is called when
trying to get the value (lazy loading).
This silently returns when:
- persistency is disabled
- uri is None (i.e. uhash is None)
- raise_error=False
"""
if self._disable_persistency:
return self._runtime_representation
return
filename = self.uri
if not filename:
return self._runtime_representation
return
try:
with open(filename, mode="r") as f:
return self.deserialize(json.load(f))
self._runtime_value = self._deserialize(json.load(f))
except FileNotFoundError as e:
if raise_error:
raise UriNotFoundError(filename) from e
else:
return self._runtime_representation
except Exception as e:
if raise_error:
raise PersistencyError(filename) from e
else:
return self._runtime_representation
def serialize(self, value):
def _serialize(self, value):
"""Before runtime to persistent"""
return value
def deserialize(self, value):
def _deserialize(self, value):
"""Before persistent to runtime"""
return value
@property
def exists(self):
"""Has a persistent representation"""
return self._exists()
def has_persistent_value(self):
return self._has_persistent_value()
@property
def available(self):
"""Has a runtime representation"""
def has_runtime_value(self):
try:
return self._available()
return self._has_runtime_value()
except PersistencyError:
# Lazy loading failed
return False
def _exists(self):
"""Has a persistent representation"""
@property
def has_value(self):
return self.has_runtime_value or self.has_persistent_value
def _has_persistent_value(self):
filename = self.uri
if filename:
return os.path.isfile(filename)
else:
return False
def _available(self):
"""Has a runtime representation"""
return self.value is not self.MISSING_DATA
def validate(self):
if not self.exists:
raise RuntimeError("Has no persistent representation")
if not self.available:
raise RuntimeError("Has no runtime representation")
return self.value == self.load()
def _has_runtime_value(self):
return self._runtime_value is not self.MISSING_DATA
def force_non_existing(self):
while self.exists:
while self.has_persistent_value:
self.uhash_randomize()
......@@ -258,7 +252,10 @@ class VariableContainer(Mapping, Variable):
return Variable(**varparams)
def _initialize_container(self):
if not self.container_available and not self.container_exists:
if (
not self.container_has_runtime_value
and not self.container_has_persistent_value
):
self.value = dict()
def __iter__(self):
......@@ -275,10 +272,10 @@ class VariableContainer(Mapping, Variable):
else:
return 0
def serialize(self, value):
def _serialize(self, value):
return {k: str(v.uhash) for k, v in self.items()}
def deserialize(self, value):
def _deserialize(self, value):
adict = dict()
varparams = dict(self.__varparams)
varparams["uhash_nonce"] = None
......@@ -293,26 +290,22 @@ class VariableContainer(Mapping, Variable):
super().dump()
@property
def container_exists(self):
return super()._exists()
def container_has_persistent_value(self):
return super()._has_persistent_value()
def _exists(self):
if self.container_exists:
return all(v.exists for v in self.values())
def _has_persistent_value(self):
if self.container_has_persistent_value:
return all(v.has_persistent_value for v in self.values())
else:
return False
@property
def container_available(self):
try:
return super()._available()
except PersistencyError:
# Lazy loading failed
return False
def container_has_runtime_value(self):
return super()._has_runtime_value()
def _available(self):
if self.container_available:
return all(v.available for v in self.values())
def _has_runtime_value(self):
if self.container_has_runtime_value:
return all(v.has_runtime_value for v in self.values())
else:
return False
......@@ -323,7 +316,7 @@ class VariableContainer(Mapping, Variable):
@property
def variable_uhashes(self):
return self.serialize(self.value)
return self._serialize(self.value)
@property
def variable_values(self):
......
......@@ -7,16 +7,18 @@ VALUES = [None, True, 10, "string", 10.1, [1, 2, 3], {"1": 1, "2": {"2": [10, 20
def test_variable_missing_data(varinfo):
v = Variable(varinfo=varinfo)
assert not v.available
assert not v.exists
assert not v.has_runtime_value
assert not v.has_persistent_value
assert not v.value
assert not v.uri
v.dump()
v.load()
assert not v.available
assert not v.exists
assert not v.has_runtime_value
assert not v.has_persistent_value
assert not v.value
assert not v.uri
assert v.value is v.MISSING_DATA
assert v.value == v.MISSING_DATA
def test_variable_none_uhash():
......@@ -96,8 +98,8 @@ def test_variable_chain(varinfo):
v1.value += 1
assert v1 == v2
assert v1.value != v2.value
with pytest.raises(RuntimeError):
v2.validate()
assert not v2.has_persistent_value
assert v2.has_runtime_value
def test_variable_persistency(varinfo):
......@@ -107,28 +109,30 @@ def test_variable_persistency(varinfo):
v3 = Variable(uhash=v1.uhash, varinfo=varinfo)
v4 = Variable(uhash=v2, varinfo=varinfo)
assert not v1.exists
assert not v2.exists
assert not v3.exists
assert not v4.exists
assert v1.available
assert v2.available
assert not v3.available
assert not v4.available
for v in (v1, v2):
assert not v.has_persistent_value
assert v.has_runtime_value
for v in (v3, v4):
assert not v.has_persistent_value
assert not v.has_runtime_value
v1.dump()
assert v1.exists
assert v2.exists
assert v3.exists
assert v4.exists
assert v1.available
assert v2.available
assert v3.available
assert v4.available
v1.validate()
v2.validate()
v3.validate()
v4.validate()
for v in (v1, v2):
assert v.has_persistent_value
assert v.has_runtime_value
for v in (v3, v4):
assert v.has_persistent_value
assert not v.has_runtime_value
v3.load()
v4.load()
for v in (v3, v4):
assert v.has_persistent_value
assert v.has_runtime_value
def test_variable_container_uhash(varinfo):
......@@ -178,13 +182,19 @@ def test_variable_container_persistency(tmpdir, varinfo):
assert v.uhash != v1.uhash
for k in v1:
assert v1[k] is v2[k]
assert v1.available
assert v2.available
assert not v3.available
assert not v1.exists
assert not v2.exists
assert not v3.exists
assert not v4.exists
for v in (v1, v2):
assert v.container_has_runtime_value
assert v.has_runtime_value
assert not v.container_has_persistent_value
assert not v.has_persistent_value
for v in (v3, v4):
assert not v.container_has_runtime_value
assert not v.has_runtime_value
assert not v.container_has_persistent_value
assert not v.has_persistent_value
assert len(v1) == len(values)
assert len(v2) == len(values)
assert len(v3) == 0
......@@ -196,14 +206,20 @@ def test_variable_container_persistency(tmpdir, varinfo):
v1.dump()
assert len(tmpdir.listdir()) == len(values) + 1
assert v1.available
assert v2.available
assert v3.available
assert v4.available
assert v1.exists
assert v2.exists
assert v3.exists
assert v4.exists
for v in (v1, v2):
assert v.container_has_runtime_value
assert v.has_runtime_value
assert v.container_has_persistent_value
assert v.has_persistent_value
for v in (v3, v4):
assert not v.container_has_runtime_value
assert not v.has_runtime_value
assert v.container_has_persistent_value
assert v.has_persistent_value # calls load
assert v.container_has_runtime_value
assert len(v1) == len(values)
assert len(v2) == len(values)
assert len(v3) == len(values)
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment