diff --git a/pom.xml b/pom.xml index 488cdbf6f5c5d9a7ac2d00b1b746108c6133821e..54d818cf961768967f5bd7b661ca3a6e0723829c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 2.2.6.RELEASE + 2.4.5 it.inaf.oats @@ -83,10 +83,15 @@ auth-lib 2.0.0-SNAPSHOT + + + redis.clients + jedis + - org.springframework.boot - spring-boot-starter-amqp + org.mockito + mockito-inline diff --git a/src/main/java/it/inaf/oats/vospace/AsyncTransferService.java b/src/main/java/it/inaf/oats/vospace/AsyncTransferService.java index b3b13b2076cee076538ad1b8061752be11d08b5b..273478337d73de8ec0aa10530d100a184abc76d8 100644 --- a/src/main/java/it/inaf/oats/vospace/AsyncTransferService.java +++ b/src/main/java/it/inaf/oats/vospace/AsyncTransferService.java @@ -8,33 +8,58 @@ package it.inaf.oats.vospace; import com.fasterxml.jackson.databind.ObjectMapper; import it.inaf.oats.vospace.exception.InternalFaultException; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import net.ivoa.xml.uws.v1.JobSummary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import redis.clients.jedis.Jedis; @Service public class AsyncTransferService { private static final Logger LOG = LoggerFactory.getLogger(AsyncTransferService.class); - @Autowired - private RabbitTemplate template; - private static final ObjectMapper MAPPER = new ObjectMapper(); - public JobSummary startJob(JobSummary job) { + // WARNING: this instance is not thread safe + private final Jedis jedis; + + public AsyncTransferService(@Value("${spring.redis.host}") String redisHost, @Value("${spring.redis.port}") int redisPort) { + jedis = new Jedis(redisHost, redisPort); + } + + public synchronized JobSummary startJob(JobSummary job) { try { - byte[] message = MAPPER.writeValueAsBytes(job); - byte[] result = (byte[]) template.convertSendAndReceive("start_job_queue", message); + + String requestId = job.getJobId(); + + Map data = new HashMap<>(); + data.put("id", requestId); + data.put("job", job); + + String message = MAPPER.writeValueAsString(data); + + jedis.lpush("start_job_queue", message); + + List popData = jedis.brpop(30, requestId); + + String result = null; + for (String value : popData) { + // first result is requestId, second is JSON payload + if (!requestId.equals(value)) { + result = value; + } + } if (result == null) { - throw new InternalFaultException("Transfer service returned an empty response"); + throw new IllegalStateException("Job data not found in redis"); } - LOG.trace("Tape transfer service answered:\n{}", new String(result)); + LOG.trace("Transfer service answered:\n{}", result); return MAPPER.readValue(result, JobSummary.class); } catch (IOException ex) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index aa0868511f095d90ae369c60372f4a698af7063b..924e80d7a965fadee745d05137191ac73b2c1f23 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,21 +1,6 @@ -# To change this license header, choose License Headers in Project Properties. -# To change this template file, choose Tools | Templates -# and open the template in the editor. - -# show sql statements issued by JPA -#spring.jpa.show-sql=true - -# enable debug logging for spring boot and hibernate classes -# this is equivalent to passing '--debug' as command line argument -#logging.level.org.springframework.boot=DEBUG -#logging.level.org.hibernate.SQL=DEBUG - -# log to file (absolute/relative path of log file) -#logging.file=path/to/log/file.log - server.port=8083 server.servlet.context-path=/vospace -#spring.profiles.active=@spring.profiles.active@ + # For development only: spring.profiles.active=dev @@ -23,16 +8,14 @@ spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/vospace_testdb spring.datasource.username=postgres spring.datasource.password=postgres -# enable debug logging -# this is equivalent to passing '--debug' as command line argument +spring.redis.host=127.0.0.1 +spring.redis.port=6379 + logging.level.it.inaf=TRACE logging.level.org.springframework.security=DEBUG logging.level.org.springframework.jdbc=TRACE logging.level.org.springframework.web=TRACE -#logging.level.org.springframework.boot=DEBUG -# log to file (absolute/relative path of log file) -#logging.file=path/to/log/file.log vospace-authority=example.com!vospace -file-service-url=http://localhost:8087 \ No newline at end of file +file-service-url=http://localhost:8087 diff --git a/src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java b/src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..4993797f1a9997236c4237149503e331a448d327 --- /dev/null +++ b/src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java @@ -0,0 +1,78 @@ +/* + * This file is part of vospace-rest + * Copyright (C) 2021 Istituto Nazionale di Astrofisica + * SPDX-License-Identifier: GPL-3.0-or-later + */ +package it.inaf.oats.vospace; + +import java.util.ArrayList; +import java.util.List; +import net.ivoa.xml.uws.v1.JobSummary; +import net.ivoa.xml.vospace.v2.Protocol; +import net.ivoa.xml.vospace.v2.Transfer; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import org.mockito.MockedConstruction; +import org.mockito.Mockito; +import static org.mockito.Mockito.doAnswer; +import org.mockito.junit.jupiter.MockitoExtension; +import redis.clients.jedis.Jedis; + +@ExtendWith(MockitoExtension.class) +public class AsyncTransferServiceTest { + + private static final String JSON_JOB = "{\"jobId\":\"job_id\",\"runId\":null,\"ownerId\":null,\"phase\":null,\"quote\":null,\"creationTime\":null,\"startTime\":null,\"endTime\":null,\"executionDuration\":0,\"destruction\":null,\"parameters\":null,\"results\":[],\"errorSummary\":null,\"jobInfo\":{\"transfer\":{\"target\":\"vos://example.com!vospace/my-node\",\"direction\":\"pullToVoSpace\",\"view\":null,\"protocols\":[{\"endpoint\":null,\"param\":[],\"uri\":\"ia2:async-recall\"}],\"keepBytes\":null,\"version\":null,\"param\":[]}},\"version\":null}\n"; + + @Test + public void testRedisRpc() { + try (MockedConstruction staticMock = Mockito.mockConstruction(Jedis.class, + (mockedJedis, context) -> { + doAnswer(invocation -> { + String requestId = invocation.getArgument(1); + List result = new ArrayList<>(); + result.add(requestId); + result.add(JSON_JOB); + return result; + }).when(mockedJedis).brpop(anyInt(), anyString()); + })) { + AsyncTransferService asyncTransferService = new AsyncTransferService("localhost", 6379); + JobSummary result = asyncTransferService.startJob(getFakeJob()); + assertEquals("job_id", result.getJobId()); + } + } + + @Test + public void testRedisError() { + try (MockedConstruction staticMock = Mockito.mockConstruction(Jedis.class)) { + AsyncTransferService asyncTransferService = new AsyncTransferService("localhost", 6379); + try { + asyncTransferService.startJob(getFakeJob()); + fail(); + } catch(IllegalStateException ex) { + } + } + } + + private JobSummary getFakeJob() { + Transfer transfer = new Transfer(); + transfer.setDirection("pullToVoSpace"); + Protocol protocol = new Protocol(); + protocol.setUri("ia2:async-recall"); + transfer.getProtocols().add(protocol); + transfer.setTarget("vos://example.com!vospace/my-node"); + + JobSummary job = new JobSummary(); + job.setJobId("job_id"); + + JobSummary.JobInfo jobInfo = new JobSummary.JobInfo(); + jobInfo.getAny().add(transfer); + + job.setJobInfo(jobInfo); + + return job; + } +}