Skip to content
Snippets Groups Projects
Commit b0f23cd8 authored by Nicola Fulvio Calabria's avatar Nicola Fulvio Calabria
Browse files

Task #3637 - Added Fault management for transfer services up to 3/6

redmine subtasks
parent eff3f2ab
No related branches found
No related tags found
No related merge requests found
Showing
with 255 additions and 60 deletions
......@@ -6,8 +6,10 @@ import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException;
@Service
public class JobService {
......@@ -61,30 +63,45 @@ public class JobService {
}
private void handlePullToVoSpace(JobSummary job, Transfer transfer) {
try {
for (Protocol protocol : transfer.getProtocols()) {
switch (protocol.getUri()) {
case "ia2:async-recall":
asyncTransfService.startJob(job);
// ASK IF IT's OK neglect phase update.
return;
case "ivo://ivoa.net/vospace/core#httpget":
String nodeUri = transfer.getTarget();
String contentUri = protocol.getEndpoint();
uriService.setNodeRemoteLocation(nodeUri, contentUri);
uriService.setTransferJobResult(job, transfer);
job.setPhase(ExecutionPhase.COMPLETED);
jobDAO.updateJob(job);
return;
default:
throw new InternalFaultException("Unsupported pullToVoSpace protocol: " + protocol.getUri());
}
}
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e.getFault()));
jobDAO.updateJob(job);
}
}
private void handleVoSpaceUrlsListResult(JobSummary job, Transfer transfer) {
try {
job.setPhase(ExecutionPhase.EXECUTING);
uriService.setTransferJobResult(job, transfer);
job.setPhase(ExecutionPhase.COMPLETED);
// Need to catch other exceptions too to avoid inconsistent job status
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e.getFault()));
} finally {
jobDAO.updateJob(job);
}
}
private JobType getJobType(Transfer transfer) {
return JobType.valueOf(transfer.getDirection());
......@@ -92,11 +109,23 @@ public class JobService {
/**
* Synchronous transfer endpoint creates a job that is immediately set to
* completed.
* COMPLETED or to ERROR in case some fault occurred.
*
* In case of ERROR, Protocols are stripped from job representation in
* compliance with specifications
*
*/
public void createSyncJobResult(JobSummary job) {
job.setPhase(ExecutionPhase.COMPLETED);
try {
uriService.setSyncTransferEndpoints(job);
job.setPhase(ExecutionPhase.COMPLETED);
// Need to catch other exceptions too to avoid inconsistent job status
} catch (VoSpaceErrorSummarizableException e) {
job.setPhase(ExecutionPhase.ERROR);
uriService.getTransfer(job).getProtocols().clear();
job.setErrorSummary(ErrorSummaryFactory.newErrorSummary(e.getFault()));
} finally {
jobDAO.createJob(job);
}
}
}
package it.inaf.oats.vospace;
import it.inaf.ia2.aa.data.User;
import it.inaf.oats.vospace.datamodel.NodeProperties;
import it.inaf.oats.vospace.persistence.JobDAO;
import java.util.Optional;
import java.util.UUID;
......@@ -9,6 +10,7 @@ import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.uws.v1.Jobs;
import net.ivoa.xml.vospace.v2.Transfer;
import net.ivoa.xml.vospace.v2.Param;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
......@@ -23,6 +25,7 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.format.annotation.DateTimeFormat;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
@RestController
public class TransferController {
......@@ -115,11 +118,11 @@ public class TransferController {
@RequestParam(value = "direction", required = false) Optional<List<JobService.JobType>> direction,
User principal) {
if(last.isPresent())
{
if(last.get() <= 0)
if (last.isPresent()) {
if (last.get() <= 0) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).build();
}
}
String userId = principal.getName();
......
......@@ -3,11 +3,15 @@ package it.inaf.oats.vospace;
import it.inaf.ia2.aa.ServletRapClient;
import it.inaf.ia2.aa.data.User;
import it.inaf.ia2.rap.client.call.TokenExchangeRequest;
import it.inaf.oats.vospace.JobService.JobType;
import it.inaf.oats.vospace.datamodel.NodeProperties;
import it.inaf.oats.vospace.datamodel.NodeUtils;
import static it.inaf.oats.vospace.datamodel.NodeUtils.urlEncodePath;
import it.inaf.oats.vospace.exception.InternalFaultException;
import it.inaf.oats.vospace.exception.NodeNotFoundException;
import it.inaf.oats.vospace.exception.PermissionDeniedException;
import it.inaf.oats.vospace.exception.ProtocolNotSupportedException;
import it.inaf.oats.vospace.exception.NodeBusyException;
import it.inaf.oats.vospace.persistence.LocationDAO;
import it.inaf.oats.vospace.persistence.NodeDAO;
import it.inaf.oats.vospace.persistence.model.Location;
......@@ -15,12 +19,14 @@ import it.inaf.oats.vospace.persistence.model.LocationType;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.uws.v1.ResultReference;
import net.ivoa.xml.vospace.v2.Node;
import net.ivoa.xml.vospace.v2.Param;
import net.ivoa.xml.vospace.v2.Protocol;
import net.ivoa.xml.vospace.v2.Transfer;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -57,7 +63,8 @@ public class UriService {
results.add(result);
job.setResults(results);
job.setPhase(ExecutionPhase.COMPLETED);
// Moved phase setting to caller method for ERROR management
// job.setPhase(ExecutionPhase.COMPLETED);
}
public void setSyncTransferEndpoints(JobSummary job) {
......@@ -68,7 +75,8 @@ public class UriService {
if (!"ivo://ivoa.net/vospace/core#httpget".equals(protocol.getUri())
&& !"ivo://ivoa.net/vospace/core#httpput".equals(protocol.getUri())) {
throw new IllegalStateException("Unsupported protocol " + protocol.getUri());
// throw new IllegalStateException("Unsupported protocol " + protocol.getUri());
throw new ProtocolNotSupportedException(protocol.getUri());
}
protocol.setEndpoint(getEndpoint(job, transfer));
}
......@@ -79,6 +87,35 @@ public class UriService {
Node node = nodeDao.listNode(relativePath).orElseThrow(() -> new NodeNotFoundException(relativePath));
User user = (User) servletRequest.getUserPrincipal();
String creator = user.getName();
List<String> groups = user.getGroups();
// Check privileges write or read according to job type
JobService.JobType jobType = JobType.valueOf(transfer.getDirection());
switch (jobType) {
case pushToVoSpace:
case pullToVoSpace:
if (!NodeUtils.checkIfWritable(node, creator, groups)) {
throw new PermissionDeniedException(relativePath);
}
break;
case pullFromVoSpace:
if (!NodeUtils.checkIfReadable(node, creator, groups)) {
throw new PermissionDeniedException(relativePath);
}
break;
default:
throw new InternalFaultException("No job direction specified");
}
if (NodeUtils.getIsBusy(node)) {
throw new NodeBusyException(relativePath);
}
Location location = locationDAO.getNodeLocation(relativePath).orElse(null);
String endpoint;
......
package it.inaf.oats.vospace.exception;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(value = HttpStatus.CONFLICT)
public class DuplicateNodeException extends VoSpaceException {
public class DuplicateNodeException extends VoSpaceErrorSummarizableException {
public DuplicateNodeException(String path) {
super("Duplicate Node at path: " + path);
super("Duplicate Node at path: " + path,
ErrorSummaryFactory.VOSpaceFault.DUPLICATE_NODE);
}
}
package it.inaf.oats.vospace.exception;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR) // Status code 500
public class InternalFaultException extends VoSpaceException {
public class InternalFaultException extends VoSpaceErrorSummarizableException {
private static final Logger LOG = LoggerFactory.getLogger(InternalFaultException.class);
public InternalFaultException(String msg) {
super("InternalFaultException: " + msg);
super("InternalFaultException: " + msg,
ErrorSummaryFactory.VOSpaceFault.INTERNAL_FAULT);
}
public InternalFaultException(Throwable cause) {
super("InternalFaultException: " + getMessage(cause));
super("InternalFaultException: " + getMessage(cause),
ErrorSummaryFactory.VOSpaceFault.INTERNAL_FAULT);
}
private static String getMessage(Throwable cause) {
......
package it.inaf.oats.vospace.exception;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(value = HttpStatus.BAD_REQUEST)
public class InvalidURIException extends VoSpaceException {
public class InvalidURIException extends VoSpaceErrorSummarizableException {
public InvalidURIException(String URI, String path) {
super("InvalidURI. Payload node URI: " + URI
+ " is not consistent with request path: " + path);
+ " is not consistent with request path: " + path,
ErrorSummaryFactory.VOSpaceFault.INVALID_URI);
}
public InvalidURIException(String URI) {
super("InvalidURI. URI: " + URI + " is not in a valid format");
super("InvalidURI. URI: " + URI + " is not in a valid format",
ErrorSummaryFactory.VOSpaceFault.INVALID_URI);
}
public InvalidURIException(IllegalArgumentException ex) {
super("InvalidURI. " + ex.getMessage());
super("InvalidURI. " + ex.getMessage(),
ErrorSummaryFactory.VOSpaceFault.INVALID_URI);
}
}
package it.inaf.oats.vospace.exception;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
public class NodeBusyException extends VoSpaceErrorSummarizableException {
public NodeBusyException(String path) {
super("Node Busy: at path " + path,
ErrorSummaryFactory.VOSpaceFault.NODE_BUSY);
}
}
package it.inaf.oats.vospace.exception;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(value = HttpStatus.NOT_FOUND)
public class NodeNotFoundException extends VoSpaceException {
public class NodeNotFoundException extends VoSpaceErrorSummarizableException {
public NodeNotFoundException(String path) {
super("NodeNotFound: " + path);
super("NodeNotFound: " + path,
ErrorSummaryFactory.VOSpaceFault.NODE_NOT_FOUND);
}
}
package it.inaf.oats.vospace.exception;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(value = HttpStatus.FORBIDDEN)
public class PermissionDeniedException extends VoSpaceException {
public class PermissionDeniedException extends VoSpaceErrorSummarizableException {
public PermissionDeniedException(String path) {
super("Permission Denied at path: " + path);
super("Permission Denied at path: " + path,
ErrorSummaryFactory.VOSpaceFault.PERMISSION_DENIED);
}
}
package it.inaf.oats.vospace.exception;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
public class ProtocolNotSupportedException extends VoSpaceErrorSummarizableException{
public ProtocolNotSupportedException(String protocol) {
super("Protocol Not Supported: " + protocol,
ErrorSummaryFactory.VOSpaceFault.PROTOCOL_NOT_SUPPORTED);
}
}
package it.inaf.oats.vospace.exception;
import net.ivoa.xml.uws.v1.ErrorSummaryFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;
@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
public class VoSpaceErrorSummarizableException extends VoSpaceException {
ErrorSummaryFactory.VOSpaceFault fault;
public VoSpaceErrorSummarizableException(String message,
ErrorSummaryFactory.VOSpaceFault fault)
{
super(message);
this.fault = fault;
}
public ErrorSummaryFactory.VOSpaceFault getFault()
{
return this.fault;
}
}
......@@ -3,6 +3,7 @@ package it.inaf.oats.vospace.persistence;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.org.apache.xalan.internal.xsltc.compiler.util.Type;
import java.sql.Timestamp;
import java.sql.ResultSet;
import java.sql.SQLException;
......@@ -14,6 +15,7 @@ import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
import net.ivoa.xml.uws.v1.ExecutionPhase;
import net.ivoa.xml.uws.v1.JobSummary;
import net.ivoa.xml.uws.v1.ErrorSummary;
import net.ivoa.xml.uws.v1.ShortJobDescription;
import net.ivoa.xml.uws.v1.ResultReference;
import net.ivoa.xml.uws.v1.Jobs;
......@@ -25,6 +27,7 @@ import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.time.LocalDateTime;
import java.math.BigDecimal;
import net.ivoa.xml.uws.v1.ErrorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,7 +47,10 @@ public class JobDAO {
public void createJob(JobSummary jobSummary) {
String sql = "INSERT INTO job(job_id, owner_id, job_type, phase, job_info) VALUES (?, ?, ?, ?, ?)";
String sql =
"INSERT INTO job(job_id, owner_id, job_type, phase, job_info,"
+ " error_message, error_type, error_has_detail) "
+ "VALUES (?, ?, ?, ?, ?, ? ,? ,?)";
jdbcTemplate.update(sql, ps -> {
int i = 0;
......@@ -53,6 +59,17 @@ public class JobDAO {
ps.setObject(++i, getJobType(jobSummary), Types.OTHER);
ps.setObject(++i, jobSummary.getPhase().value(), Types.OTHER);
ps.setObject(++i, toJson(jobSummary.getJobInfo()), Types.OTHER);
ErrorSummary errorSummary = jobSummary.getErrorSummary();
if(errorSummary != null) {
ps.setString(++i, errorSummary.getMessage());
ps.setObject(++i, errorSummary.getType().value(), Types.OTHER);
ps.setBoolean(++i, errorSummary.isHasDetail());
} else {
ps.setNull(++i, Types.VARCHAR);
ps.setNull(++i, Types.OTHER);
ps.setNull(++i, Types.BOOLEAN);
}
});
}
......@@ -103,6 +120,16 @@ public class JobDAO {
jobSummary.setJobInfo(getJobPayload(rs.getString("job_info")));
jobSummary.setResults(getResults(rs.getString("results")));
// Retrieve error information if any
ErrorSummary errorSummary = new ErrorSummary();
errorSummary.setMessage(rs.getString("error_message"));
String errorType = rs.getString("error_type");
if (errorType != null) {
errorSummary.setType(ErrorType.fromValue(rs.getString("error_type")));
}
errorSummary.setHasDetail(rs.getBoolean("error_has_detail"));
jobSummary.setErrorSummary(errorSummary);
return jobSummary;
}
......@@ -228,13 +255,27 @@ public class JobDAO {
public void updateJob(JobSummary job) {
String sql = "UPDATE job SET phase = ?, results = ? WHERE job_id = ?";
String sql = "UPDATE job SET phase = ?, results = ? ";
ErrorSummary errorSummary = job.getErrorSummary();
if(errorSummary != null)
{
sql += ", error_message = ?, error_type = ?, error_has_detail = ? ";
}
sql += "WHERE job_id = ?";
jdbcTemplate.update(sql, ps -> {
int i = 0;
ps.setObject(++i, job.getPhase().name(), Types.OTHER);
ps.setObject(++i, toJson(job.getResults()), Types.OTHER);
ps.setString(++i, job.getJobId());
if(errorSummary != null)
{
ps.setString(++i, errorSummary.getMessage());
ps.setObject(++i, errorSummary.getType().name(), Types.OTHER);
ps.setBoolean(++i, errorSummary.isHasDetail());
}
});
}
......
......@@ -2,6 +2,7 @@ package it.inaf.oats.vospace;
import it.inaf.ia2.aa.data.User;
import static it.inaf.oats.vospace.VOSpaceXmlTestUtil.loadDocument;
import it.inaf.oats.vospace.datamodel.NodeProperties;
import it.inaf.oats.vospace.persistence.JobDAO;
import it.inaf.oats.vospace.persistence.LocationDAO;
import it.inaf.oats.vospace.persistence.NodeDAO;
......@@ -155,6 +156,7 @@ public class TransferControllerTest {
when(nodeDao.listNode(eq(path))).thenReturn(Optional.of(node));
String redirect = mockMvc.perform(post("/transfers?PHASE=RUN")
.header("Authorization", "Bearer user1_token")
.content(requestBody)
.contentType(MediaType.APPLICATION_XML)
.accept(MediaType.APPLICATION_XML))
......@@ -224,6 +226,17 @@ public class TransferControllerTest {
property.setUri("ivo://ivoa.net/vospace/core#publicread");
property.setValue("true");
node.getProperties().add(property);
Property ownerProp = new Property();
ownerProp.setUri(NodeProperties.CREATOR_URI);
ownerProp.setValue("user1");
node.getProperties().add(ownerProp);
Property groupProp = new Property();
groupProp.setUri(NodeProperties.GROUP_WRITE_URI);
groupProp.setValue("group1");
node.getProperties().add(groupProp);
return node;
}
......
......@@ -93,11 +93,21 @@ public class UriServiceTest {
public void testPrivateUrl() {
Node node = new DataNode();
Property creator = new Property();
creator.setUri(NodeProperties.CREATOR_URI);
creator.setValue("user1");
node.getProperties().add(creator);
Property readgroup = new Property();
readgroup.setUri(NodeProperties.GROUP_READ_URI);
readgroup.setValue("group1");
node.getProperties().add(readgroup);
when(nodeDAO.listNode(eq("/mydata1"))).thenReturn(Optional.of(node));
User user = mock(User.class);
when(user.getAccessToken()).thenReturn("<token>");
when(user.getName()).thenReturn("user1");
when(servletRequest.getUserPrincipal()).thenReturn(user);
......@@ -108,7 +118,8 @@ public class UriServiceTest {
}), any())).thenReturn("<new-token>");
JobSummary job = getJob();
uriService.setTransferJobResult(job, uriService.getTransfer(job));
Transfer tr = uriService.getTransfer(job);
uriService.setTransferJobResult(job, tr);
assertEquals("http://file-service/mydata1?jobId=job-id&token=<new-token>", job.getResults().get(0).getHref());
}
......@@ -133,6 +144,7 @@ public class UriServiceTest {
Transfer transfer = new Transfer();
transfer.setTarget("vos://example.com!vospace/mydata1");
transfer.setDirection(JobService.JobType.pullFromVoSpace.toString());
JobSummary job = new JobSummary();
job.setJobId("job-id");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment