Commit bbaa368d authored by Rolf Krahl's avatar Rolf Krahl

Merge branch 'issue94'

parents 8598e7af 62debce1
......@@ -18,7 +18,8 @@ maxIdsInQuery = 1000
# Properties for archive storage
plugin.archive.class = org.icatproject.ids.storage.ArchiveFileStorage
plugin.archive.dir = ${HOME}/ids/archive/
writeDelaySeconds = 60
delayDatasetWritesSeconds = 60
delayDatafileOperationsSeconds = 60
startArchivingLevel1024bytes = 5000000
stopArchivingLevel1024bytes = 4000000
storageUnit = dataset
......
......@@ -56,8 +56,8 @@ public class FiniteStateMachine {
public void run() {
try {
synchronized (deferredDfOpsQueue) {
if (writeTime != null && System.currentTimeMillis() > writeTime && !deferredDfOpsQueue.isEmpty()) {
writeTime = null;
if (processOpsTime != null && System.currentTimeMillis() > processOpsTime && !deferredDfOpsQueue.isEmpty()) {
processOpsTime = null;
logger.debug("deferredDfOpsQueue has " + deferredDfOpsQueue.size() + " entries");
List<DfInfo> writes = new ArrayList<>();
List<DfInfo> archives = new ArrayList<>();
......@@ -171,7 +171,7 @@ public class FiniteStateMachine {
logger.debug("Adding {} operations to be scheduled next time round", newOps.size());
}
if (!deferredDfOpsQueue.isEmpty()) {
writeTime = 0L;
processOpsTime = 0L;
}
if (!writes.isEmpty()) {
logger.debug("Launch thread to process " + writes.size() + " writes");
......@@ -287,7 +287,11 @@ public class FiniteStateMachine {
private static Logger logger = LoggerFactory.getLogger(FiniteStateMachine.class);
private long archiveWriteDelayMillis;
/*
* Note that the variable processOpsDelayMillis is used to either delay all deferred
* datafile operations or to delay dataset writes, depending on the setting of storageUnit.
*/
private long processOpsDelayMillis;
private Map<DfInfoImpl, RequestedState> deferredDfOpsQueue = new HashMap<>();
......@@ -311,11 +315,7 @@ public class FiniteStateMachine {
private Timer timer = new Timer("FSM Timer");
/*
* Note that the variable writeTime has been abused and now also delays
* restore operations TODO
*/
private Long writeTime;
private Long processOpsTime;
private Map<DsInfo, Long> writeTimes = new HashMap<>();
......@@ -455,13 +455,14 @@ public class FiniteStateMachine {
private void init() {
try {
propertyHandler = PropertyHandler.getInstance();
archiveWriteDelayMillis = propertyHandler.getWriteDelaySeconds() * 1000L;
processQueueIntervalMillis = propertyHandler.getProcessQueueIntervalSeconds() * 1000L;
storageUnit = propertyHandler.getStorageUnit();
if (storageUnit == StorageUnit.DATASET) {
processOpsDelayMillis = propertyHandler.getDelayDatasetWrites() * 1000L;
timer.schedule(new DsProcessQueue(), processQueueIntervalMillis);
logger.info("DsProcessQueue scheduled to run in " + processQueueIntervalMillis + " milliseconds");
} else if (storageUnit == StorageUnit.DATAFILE) {
processOpsDelayMillis = propertyHandler.getDelayDatafileOperations() * 1000L;
timer.schedule(new DfProcessQueue(), processQueueIntervalMillis);
logger.info("DfProcessQueue scheduled to run in " + processQueueIntervalMillis + " milliseconds");
}
......@@ -477,9 +478,9 @@ public class FiniteStateMachine {
synchronized (deferredDfOpsQueue) {
if (writeTime == null) {
writeTime = System.currentTimeMillis() + archiveWriteDelayMillis;
final Date d = new Date(writeTime);
if (processOpsTime == null) {
processOpsTime = System.currentTimeMillis() + processOpsDelayMillis;
final Date d = new Date(processOpsTime);
logger.debug("Requesting delay operations till " + d);
}
......@@ -604,7 +605,7 @@ public class FiniteStateMachine {
}
private void setDelay(DsInfo dsInfo) {
writeTimes.put(dsInfo, System.currentTimeMillis() + archiveWriteDelayMillis);
writeTimes.put(dsInfo, System.currentTimeMillis() + processOpsDelayMillis);
if (logger.isDebugEnabled()) {
final Date d = new Date(writeTimes.get(dsInfo));
logger.debug("Requesting delay of writing of dataset " + dsInfo + " till " + d);
......
......@@ -68,7 +68,8 @@ public class PropertyHandler {
private long startArchivingLevel;
private long stopArchivingLevel;
private StorageUnit storageUnit;
private long writeDelaySeconds;
private long delayDatasetWrites;
private long delayDatafileOperations;
private ZipMapperInterface zipMapper;
private int tidyBlockSize;
private String key;
......@@ -145,8 +146,6 @@ public class PropertyHandler {
if (!props.has("plugin.archive.class")) {
logger.info("Property plugin.archive.class not set, single storage enabled.");
} else {
writeDelaySeconds = props.getPositiveLong("writeDelaySeconds");
try {
Class<ArchiveStorageInterface> klass = (Class<ArchiveStorageInterface>) Class
.forName(props.getString("plugin.archive.class"));
......@@ -174,6 +173,23 @@ public class PropertyHandler {
}
abort("storageUnit value " + props.getString("storageUnit") + " must be taken from " + vs);
}
if (storageUnit == StorageUnit.DATASET) {
if (!props.has("delayDatasetWritesSeconds") && props.has("writeDelaySeconds")) {
// compatibility mode
logger.warn("writeDelaySeconds is deprecated, please use delayDatasetWritesSeconds instead");
delayDatasetWrites = props.getPositiveLong("writeDelaySeconds");
} else {
delayDatasetWrites = props.getPositiveLong("delayDatasetWritesSeconds");
}
} else if (storageUnit == StorageUnit.DATAFILE) {
if (!props.has("delayDatafileOperationsSeconds") && props.has("writeDelaySeconds")) {
// compatibility mode
logger.warn("writeDelaySeconds is deprecated, please use delayDatafileOperationsSeconds instead");
delayDatafileOperations = props.getPositiveLong("writeDelaySeconds");
} else {
delayDatafileOperations = props.getPositiveLong("delayDatafileOperationsSeconds");
}
}
tidyBlockSize = props.getPositiveInt("tidyBlockSize");
}
......@@ -375,8 +391,12 @@ public class PropertyHandler {
return tidyBlockSize;
}
public long getWriteDelaySeconds() {
return writeDelaySeconds;
public long getDelayDatasetWrites() {
return delayDatasetWrites;
}
public long getDelayDatafileOperations() {
return delayDatafileOperations;
}
public ZipMapperInterface getZipMapper() {
......
......@@ -30,10 +30,18 @@ if arg == "INSTALL":
abort("Please create directory " + idsProperties.get("cache.dir") + " as specified in run.properties")
if idsProperties.get("plugin.archive.class"):
if not idsProperties.get("writeDelaySeconds"): abort("writeDelaySeconds is not set in run.properties")
if not idsProperties.get("startArchivingLevel1024bytes"): abort("startArchivingLevel1024bytes is not set in run.properties")
if not idsProperties.get("stopArchivingLevel1024bytes"): abort("stopArchivingLevel1024bytes is not set in run.properties")
if not idsProperties.get("tidyBlockSize"): abort("tidyBlockSize is not set in ids.properties")
if not idsProperties.get("storageUnit"): abort("storageUnit is not set in run.properties")
# Verify that the delay property matching the configured storageUnit is set,
# accepting the deprecated writeDelaySeconds as a compatibility fallback.
# BUG FIX: str.lower must be CALLED -- the original compared the bound method
# object itself to a string, which is always False, so neither check could
# ever trigger the abort.
if idsProperties["storageUnit"].lower() == "dataset":
    if not (idsProperties.get("delayDatasetWritesSeconds") or
            idsProperties.get("writeDelaySeconds")):
        abort("delayDatasetWritesSeconds is not set in run.properties")
if idsProperties["storageUnit"].lower() == "datafile":
    if not (idsProperties.get("delayDatafileOperationsSeconds") or
            idsProperties.get("writeDelaySeconds")):
        abort("delayDatafileOperationsSeconds is not set in run.properties")
if int(idsProperties["filesCheck.parallelCount"]):
if not idsProperties.get("filesCheck.gapSeconds"): abort("filesCheck.gapSeconds is not set in run.properties")
......
......@@ -176,10 +176,17 @@
be deployed in the lib/applibs directory of your domain and must be
packaged with all it dependencies.</dd>
<dt>writeDelaySeconds</dt>
<dd>The amount of time to wait before writing to archive storage.
<dt>delayDatasetWritesSeconds</dt>
<dd>The amount of time to wait before writing a dataset to archive storage.
This exists to allow enough time for all the datafiles to be added to
a dataset before it is zipped and written.</dd>
a dataset before it is zipped and written. This property is only
used if storageUnit is set to dataset, see below.</dd>
<dt>delayDatafileOperationsSeconds</dt>
<dd>The amount of time to wait before processing any deferred operations for
datafiles. Operations are collected during this period of time and
processed at once in combined threads. This property is only
used if storageUnit is set to datafile, see below.</dd>
<dt>startArchivingLevel1024bytes</dt>
<dd>If the space used in main storage exceeds this then datasets
......
......@@ -14,6 +14,9 @@
plugin may implement this call to acquire a file system lock. This would
then allow safe concurrent access to main storage for other processes.
(Issue #80)</li>
<li>Add new configuration properties delayDatasetWritesSeconds and
delayDatafileOperationsSeconds, replacing writeDelaySeconds. Deprecate
writeDelaySeconds. (Issue #94)</li>
<li>Require ids.plugin 1.5.0.</li>
</ul>
......
......@@ -14,7 +14,7 @@ maxIdsInQuery = 1000
plugin.archive.class = org.icatproject.ids.storage_test.ArchiveFileStorage
plugin.archive.dir = ${HOME}/data/ids/archive/
writeDelaySeconds = 6
delayDatasetWritesSeconds = 6
startArchivingLevel1024bytes = 5000000
stopArchivingLevel1024bytes = 4000000
storageUnit = dataset
......
......@@ -14,7 +14,7 @@ maxIdsInQuery = 1000
plugin.archive.class = org.icatproject.ids.storage_test.ArchiveFileStorage
plugin.archive.dir = ${HOME}/data/ids/archive/
writeDelaySeconds = 6
delayDatafileOperationsSeconds = 6
startArchivingLevel1024bytes = 5000000
stopArchivingLevel1024bytes = 4000000
storageUnit = datafile
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment