Commit 8598e7af authored by Rolf Krahl's avatar Rolf Krahl

Merge branch 'locking-plugin'

parents 5bde2605 959c246d
......@@ -4,7 +4,7 @@
<groupId>org.icatproject</groupId>
<artifactId>ids.server</artifactId>
<packaging>war</packaging>
<version>1.9.2-SNAPSHOT</version>
<version>1.10.0-SNAPSHOT</version>
<name>IDS Server</name>
<properties>
......
......@@ -28,11 +28,12 @@ import javax.ejb.Singleton;
import javax.json.Json;
import javax.json.stream.JsonGenerator;
import org.icatproject.ids.LockManager.AlreadyLockedException;
import org.icatproject.Dataset;
import org.icatproject.ids.LockManager.Lock;
import org.icatproject.ids.LockManager.LockInfo;
import org.icatproject.ids.LockManager.LockType;
import org.icatproject.ids.exceptions.InternalException;
import org.icatproject.ids.plugin.AlreadyLockedException;
import org.icatproject.ids.plugin.DfInfo;
import org.icatproject.ids.plugin.DsInfo;
import org.icatproject.ids.thread.DfArchiver;
......@@ -73,16 +74,27 @@ public class FiniteStateMachine {
Entry<DfInfoImpl, RequestedState> opEntry = it.next();
DfInfoImpl dfInfo = opEntry.getKey();
Long dsId = dfInfo.getDsId();
DsInfo dsInfo;
try {
Dataset ds = (Dataset) reader.get("Dataset ds INCLUDE ds.investigation.facility", dsId);
dsInfo = new DsInfoImpl(ds);
} catch (Exception e) {
logger.error("Could not get dsInfo {}: {}.", dsId, e.getMessage());
continue;
}
if (!dfChanging.containsKey(dfInfo)) {
final RequestedState state = opEntry.getValue();
logger.debug(dfInfo + " " + state);
if (state == RequestedState.WRITE_REQUESTED) {
if (!writeLocks.containsKey(dsId)) {
try {
writeLocks.put(dsId, lockManager.lock(dsId, LockType.SHARED));
writeLocks.put(dsId, lockManager.lock(dsInfo, LockType.SHARED));
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
continue;
} catch (IOException e) {
logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
continue;
}
}
it.remove();
......@@ -91,10 +103,13 @@ public class FiniteStateMachine {
} else if (state == RequestedState.WRITE_THEN_ARCHIVE_REQUESTED) {
if (!writeLocks.containsKey(dsId)) {
try {
writeLocks.put(dsId, lockManager.lock(dsId, LockType.SHARED));
writeLocks.put(dsId, lockManager.lock(dsInfo, LockType.SHARED));
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
continue;
} catch (IOException e) {
logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
continue;
}
}
it.remove();
......@@ -104,10 +119,13 @@ public class FiniteStateMachine {
} else if (state == RequestedState.ARCHIVE_REQUESTED) {
if (!archiveLocks.containsKey(dsId)) {
try {
archiveLocks.put(dsId, lockManager.lock(dsId, LockType.EXCLUSIVE));
archiveLocks.put(dsId, lockManager.lock(dsInfo, LockType.EXCLUSIVE));
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
continue;
} catch (IOException e) {
logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
continue;
}
}
it.remove();
......@@ -116,10 +134,13 @@ public class FiniteStateMachine {
} else if (state == RequestedState.RESTORE_REQUESTED) {
if (!restoreLocks.containsKey(dsId)) {
try {
restoreLocks.put(dsId, lockManager.lock(dsId, LockType.EXCLUSIVE));
restoreLocks.put(dsId, lockManager.lock(dsInfo, LockType.EXCLUSIVE));
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
continue;
} catch (IOException e) {
logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
continue;
}
}
it.remove();
......@@ -128,10 +149,13 @@ public class FiniteStateMachine {
} else if (state == RequestedState.DELETE_REQUESTED) {
if (!deleteLocks.containsKey(dsId)) {
try {
deleteLocks.put(dsId, lockManager.lock(dsId, LockType.EXCLUSIVE));
deleteLocks.put(dsId, lockManager.lock(dsInfo, LockType.EXCLUSIVE));
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock on " + dsId + ", hold back " + state);
continue;
} catch (IOException e) {
logger.error("I/O exception " + e.getMessage() + " locking " + dsId);
continue;
}
}
it.remove();
......@@ -210,6 +234,8 @@ public class FiniteStateMachine {
}
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock on " + dsInfo + ", hold back process with " + state);
} catch (IOException e) {
logger.error("I/O exception " + e.getMessage() + " locking " + dsInfo);
}
}
} else if (state == RequestedState.ARCHIVE_REQUESTED) {
......@@ -224,6 +250,8 @@ public class FiniteStateMachine {
w.start();
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock on " + dsInfo + ", hold back process with " + state);
} catch (IOException e) {
logger.error("I/O exception " + e.getMessage() + " locking " + dsInfo);
}
} else if (state == RequestedState.RESTORE_REQUESTED) {
try {
......@@ -236,6 +264,8 @@ public class FiniteStateMachine {
w.start();
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock on " + dsInfo + ", hold back process with " + state);
} catch (IOException e) {
logger.error("I/O exception " + e.getMessage() + " locking " + dsInfo);
}
}
}
......
......@@ -61,7 +61,6 @@ import org.icatproject.ICAT;
import org.icatproject.IcatExceptionType;
import org.icatproject.IcatException_Exception;
import org.icatproject.ids.DataSelection.Returns;
import org.icatproject.ids.LockManager.AlreadyLockedException;
import org.icatproject.ids.LockManager.Lock;
import org.icatproject.ids.LockManager.LockType;
import org.icatproject.ids.exceptions.BadRequestException;
......@@ -71,6 +70,7 @@ import org.icatproject.ids.exceptions.InsufficientPrivilegesException;
import org.icatproject.ids.exceptions.InternalException;
import org.icatproject.ids.exceptions.NotFoundException;
import org.icatproject.ids.exceptions.NotImplementedException;
import org.icatproject.ids.plugin.AlreadyLockedException;
import org.icatproject.ids.plugin.ArchiveStorageInterface;
import org.icatproject.ids.plugin.DfInfo;
import org.icatproject.ids.plugin.DsInfo;
......@@ -723,6 +723,9 @@ public class IdsBean {
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock, delete failed");
throw new DataNotOnlineException("Data is busy");
} catch (IOException e) {
logger.error("I/O error " + e.getMessage());
throw new InternalException(e.getClass() + " " + e.getMessage());
}
if (logSet.contains(CallType.WRITE)) {
......@@ -817,6 +820,12 @@ public class IdsBean {
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock, getData failed");
throw new DataNotOnlineException("Data is busy");
} catch (IOException e) {
if (lock != null) {
lock.release();
}
logger.error("I/O error " + e.getMessage());
throw new InternalException(e.getClass() + " " + e.getMessage());
} catch (IdsException e) {
lock.release();
throw e;
......@@ -900,6 +909,12 @@ public class IdsBean {
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock, getData failed");
throw new DataNotOnlineException("Data is busy");
} catch (IOException e) {
if (lock != null) {
lock.release();
}
logger.error("I/O error " + e.getMessage());
throw new InternalException(e.getClass() + " " + e.getMessage());
} catch (IdsException e) {
lock.release();
throw e;
......@@ -1048,10 +1063,10 @@ public class IdsBean {
}
String location = getLocation(datafile.getId(), datafile.getLocation());
DsInfo dsInfo = new DsInfoImpl(datafile.getDataset());
try (Lock lock = lockManager.lock(datafile.getDataset().getId(), LockType.SHARED)) {
try (Lock lock = lockManager.lock(dsInfo, LockType.SHARED)) {
if (storageUnit == StorageUnit.DATASET) {
DsInfo dsInfo = new DsInfoImpl(datafile.getDataset());
Set<Long> mt = Collections.emptySet();
if (restoreIfOffline(dsInfo, mt)) {
throw new DataNotOnlineException(
......@@ -2120,6 +2135,9 @@ public class IdsBean {
} catch (AlreadyLockedException e) {
logger.debug("Could not acquire lock, write failed");
throw new DataNotOnlineException("Data is busy");
} catch (IOException e) {
logger.error("I/O error " + e.getMessage() + " writing");
throw new InternalException(e.getClass() + " " + e.getMessage());
}
if (logSet.contains(CallType.MIGRATE)) {
......
package org.icatproject.ids;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
......@@ -10,7 +11,9 @@ import javax.ejb.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.icatproject.ids.plugin.AlreadyLockedException;
import org.icatproject.ids.plugin.DsInfo;
import org.icatproject.ids.plugin.MainStorageInterface;
@Singleton
public class LockManager {
......@@ -19,13 +22,6 @@ public class LockManager {
SHARED, EXCLUSIVE
}
@SuppressWarnings("serial")
public class AlreadyLockedException extends Exception {
public AlreadyLockedException() {
super("Resource is already locked.");
}
}
public class LockInfo {
public final Long id;
public final LockType type;
......@@ -77,10 +73,11 @@ public class LockManager {
private class SingleLock extends Lock {
private final Long id;
private boolean isValid;
SingleLock(Long id) {
private AutoCloseable storageLock;
SingleLock(Long id, AutoCloseable storageLock) {
this.id = id;
this.isValid = true;
this.storageLock = storageLock;
}
public void release() {
......@@ -88,6 +85,13 @@ public class LockManager {
if (isValid) {
lockMap.get(id).dec();
isValid = false;
if (storageLock != null) {
try {
storageLock.close();
} catch (Exception e) {
logger.error("Error while closing lock on {} in the storage plugin: {}.", id, e.getMessage());
}
}
logger.debug("Released a lock on {}.", id);
}
}
......@@ -113,14 +117,20 @@ public class LockManager {
}
private static Logger logger = LoggerFactory.getLogger(LockManager.class);
private PropertyHandler propertyHandler;
private MainStorageInterface mainStorage;
private Map<Long, LockEntry> lockMap = new HashMap<>();
@PostConstruct
private void init() {
propertyHandler = PropertyHandler.getInstance();
mainStorage = propertyHandler.getMainStorage();
logger.debug("LockManager initialized.");
}
public Lock lock(Long id, LockType type) throws AlreadyLockedException {
public Lock lock(DsInfo ds, LockType type) throws AlreadyLockedException, IOException {
Long id = ds.getDsId();
assert id != null;
synchronized (lockMap) {
LockEntry le = lockMap.get(id);
if (le == null) {
......@@ -131,24 +141,25 @@ public class LockManager {
}
}
le.inc();
AutoCloseable storageLock;
try {
storageLock = mainStorage.lock(ds, type == LockType.SHARED);
} catch (AlreadyLockedException | IOException e) {
le.dec();
throw e;
}
logger.debug("Acquired a {} lock on {}.", type, id);
return new SingleLock(id);
return new SingleLock(id, storageLock);
}
}
public Lock lock(DsInfo ds, LockType type) throws AlreadyLockedException {
Long id = ds.getDsId();
assert id != null;
return lock(id, type);
}
public Lock lock(Collection<DsInfo> datasets, LockType type) throws AlreadyLockedException {
public Lock lock(Collection<DsInfo> datasets, LockType type) throws AlreadyLockedException, IOException {
LockCollection locks = new LockCollection();
try {
for (DsInfo ds : datasets) {
locks.add(lock(ds, type));
}
} catch (AlreadyLockedException e) {
} catch (AlreadyLockedException | IOException e) {
locks.release();
throw e;
}
......
......@@ -6,10 +6,15 @@
<h1>IDS Server Release Notes</h1>
<h2>1.9.2</h2>
<p>Not yet released!</p>
<h2>1.10.0</h2>
<p>Add file system locking in the storage plugin</p>
<ul>
<li>Require ids.plugin 1.5.0.</li>
<li>Call mainStorage.lock(DsInfo, boolean) whenever a dataset is locked internally.
This call has been added to MainStorageInterface in ids.plugin 1.5.0. The
plugin may implement this call to acquire a file system lock. This would
then allow safe concurrent access to main storage for other processes.
(Issue #80)</li>
<li>Require ids.plugin 1.5.0.</li>
</ul>
<h2>1.9.1</h2>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment