diff --git a/src/main/java/it/inaf/oats/vospace/JobService.java b/src/main/java/it/inaf/oats/vospace/JobService.java index 0277040f65d131e6efdbadd0265b2cc977995aa3..9cbdaf14d58cf2804675b3304705cbfdd784238a 100644 --- a/src/main/java/it/inaf/oats/vospace/JobService.java +++ b/src/main/java/it/inaf/oats/vospace/JobService.java @@ -10,10 +10,14 @@ import it.inaf.oats.vospace.exception.ErrorSummaryFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import it.inaf.oats.vospace.exception.VoSpaceErrorSummarizableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Service public class JobService { + private static final Logger LOG = LoggerFactory.getLogger(JobService.class); + @Autowired private JobDAO jobDAO; @@ -33,6 +37,8 @@ public class JobService { public void setJobPhase(JobSummary job, String phase) { + LOG.trace("Job " + job.getJobId() + " phase set to " + phase); + // TODO: check allowed job phase transitions switch (phase) { case "RUN": @@ -48,11 +54,19 @@ public class JobService { private void startJob(JobSummary job) { try { - job.setPhase(ExecutionPhase.EXECUTING); - jobDAO.updateJob(job); - Transfer transfer = uriService.getTransfer(job); + ExecutionPhase phase; + if (transfer.getProtocols().stream().anyMatch(p -> "ia2:async-recall".equals(p.getUri()))) { + // Async recall from tape jobs are queued. They will be started by VOSpace transfer service + phase = ExecutionPhase.QUEUED; + } else { + phase = ExecutionPhase.EXECUTING; + } + job.setPhase(phase); + + jobDAO.updateJob(job); + switch (getJobType(transfer)) { case pullToVoSpace: handlePullToVoSpace(job, transfer); @@ -125,7 +139,7 @@ public class JobService { job.setPhase(ExecutionPhase.ERROR); uriService.getTransfer(job).getProtocols().clear(); job.setErrorSummary(ErrorSummaryFactory.newErrorSummary( - new InternalFaultException(e))); + new InternalFaultException(e))); } finally { jobDAO.createJob(job); } diff --git a/src/test/java/it/inaf/oats/vospace/JobServiceTest.java b/src/test/java/it/inaf/oats/vospace/JobServiceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..7a320fbe5f4d979556c18e409ae83c355bc0b44a --- /dev/null +++ b/src/test/java/it/inaf/oats/vospace/JobServiceTest.java @@ -0,0 +1,113 @@ +package it.inaf.oats.vospace; + +import it.inaf.oats.vospace.exception.NodeBusyException; +import it.inaf.oats.vospace.persistence.JobDAO; +import net.ivoa.xml.uws.v1.ExecutionPhase; +import net.ivoa.xml.uws.v1.JobSummary; +import net.ivoa.xml.vospace.v2.Protocol; +import net.ivoa.xml.vospace.v2.Transfer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class JobServiceTest { + + @Mock + private JobDAO jobDAO; + + @Mock + private UriService uriService; + + @Mock + private AsyncTransferService asyncTransfService; + + @InjectMocks + private JobService jobService; + + @Test + public void testStartJobDefault() { + when(uriService.getTransfer(any())).thenReturn(getHttpTransfer()); + + JobSummary job = new JobSummary(); + jobService.setJobPhase(job, "RUN"); + + verify(jobDAO, times(1)).updateJob(argThat(j -> ExecutionPhase.EXECUTING.equals(j.getPhase()))); + } + + @Test + public void testStartJobTape() { + when(uriService.getTransfer(any())).thenReturn(getTapeTransfer()); + + JobSummary job = new JobSummary(); + jobService.setJobPhase(job, "RUN"); + + verify(jobDAO, times(1)).updateJob(argThat(j -> ExecutionPhase.QUEUED.equals(j.getPhase()))); + } + + @Test + public void testStartJobVoSpaceError() { + when(uriService.getTransfer(any())).thenReturn(getTapeTransfer()); + + when(asyncTransfService.startJob(any())).thenThrow(new NodeBusyException("/foo")); + + JobSummary job = new JobSummary(); + jobService.setJobPhase(job, "RUN"); + + verify(jobDAO, times(2)).updateJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase()))); + } + + @Test + public void testStartJobUnexpectedError() { + when(uriService.getTransfer(any())).thenReturn(getTapeTransfer()); + + when(asyncTransfService.startJob(any())).thenThrow(new NullPointerException()); + + JobSummary job = new JobSummary(); + jobService.setJobPhase(job, "RUN"); + + verify(jobDAO, times(2)).updateJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase()))); + } + + @Test + public void testSyncJobResultVoSpaceError() { + when(uriService.getTransfer(any())).thenReturn(getHttpTransfer()); + doThrow(new NodeBusyException("/foo")).when(uriService).setSyncTransferEndpoints(any()); + jobService.createSyncJobResult(new JobSummary()); + verify(jobDAO, times(1)).createJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase()))); + } + + @Test + public void testSyncJobResultUnexpectedError() { + when(uriService.getTransfer(any())).thenReturn(getHttpTransfer()); + doThrow(new NullPointerException()).when(uriService).setSyncTransferEndpoints(any()); + jobService.createSyncJobResult(new JobSummary()); + verify(jobDAO, times(1)).createJob(argThat(j -> ExecutionPhase.ERROR.equals(j.getPhase()))); + } + + private Transfer getHttpTransfer() { + Transfer transfer = new Transfer(); + transfer.setDirection("pullFromVoSpace"); + Protocol protocol = new Protocol(); + protocol.setUri("ivo://ivoa.net/vospace/core#httpget"); + transfer.getProtocols().add(protocol); + return transfer; + } + + private Transfer getTapeTransfer() { + Transfer transfer = new Transfer(); + transfer.setDirection("pullToVoSpace"); + Protocol protocol = new Protocol(); + protocol.setUri("ia2:async-recall"); + transfer.getProtocols().add(protocol); + return transfer; + } +}