From acfef35f3068bf9314e292eb7c98cce32e0b328f Mon Sep 17 00:00:00 2001
From: Sonia Zorba <sonia.zorba@inaf.it>
Date: Thu, 16 Sep 2021 15:02:24 +0200
Subject: [PATCH] Fixed AsyncTransferService issue caused by Redis restart

---
 .../oats/vospace/AsyncTransferService.java    | 19 ++++++--
 .../vospace/AsyncTransferServiceTest.java     | 46 +++++++++++++------
 2 files changed, 46 insertions(+), 19 deletions(-)

diff --git a/src/main/java/it/inaf/oats/vospace/AsyncTransferService.java b/src/main/java/it/inaf/oats/vospace/AsyncTransferService.java
index 4819aa0..e0fce53 100644
--- a/src/main/java/it/inaf/oats/vospace/AsyncTransferService.java
+++ b/src/main/java/it/inaf/oats/vospace/AsyncTransferService.java
@@ -17,6 +17,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
 
 @Service
 public class AsyncTransferService {
@@ -25,15 +27,22 @@ public class AsyncTransferService {
 
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
-    // WARNING: this instance is not thread safe
-    private final Jedis jedis;
+    private final JedisPool jedisPool;
 
     public AsyncTransferService(@Value("${spring.redis.host}") String redisHost, @Value("${spring.redis.port}") int redisPort) {
-        jedis = new Jedis(redisHost, redisPort);
+
+        // To avoid receiving a JedisConnectionException when redis server is
+        // restarted a JedisPool with proper configuration is used.
+        JedisPoolConfig poolConfig = new JedisPoolConfig();
+
+        poolConfig.setTestOnBorrow(true); // sends a ping request when asking for the resource
+        poolConfig.setTestWhileIdle(true); // sends periodic pings to idle resources in the pool
+
+        jedisPool = new JedisPool(poolConfig, redisHost, redisPort);
     }
 
-    public synchronized JobSummary startJob(JobSummary job) {
-        try {
+    public JobSummary startJob(JobSummary job) {
+        try (Jedis jedis = jedisPool.getResource()) {
 
             String requestId = job.getJobId();
 
diff --git a/src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java b/src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java
index 75dd4e4..471cd2b 100644
--- a/src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java
+++ b/src/test/java/it/inaf/oats/vospace/AsyncTransferServiceTest.java
@@ -8,6 +8,7 @@ package it.inaf.oats.vospace;
 import it.inaf.oats.vospace.datamodel.Views;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Consumer;
 import net.ivoa.xml.uws.v1.JobSummary;
 import net.ivoa.xml.vospace.v2.Param;
 import net.ivoa.xml.vospace.v2.Transfer;
@@ -18,44 +19,61 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
+import org.mockito.Mock;
 import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
 import org.mockito.junit.jupiter.MockitoExtension;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
 
 @ExtendWith(MockitoExtension.class)
 public class AsyncTransferServiceTest {
 
     private static final String JSON_JOB = "{\"jobId\":\"job_id\",\"runId\":null,\"ownerId\":null,\"phase\":null,\"quote\":null,\"creationTime\":null,\"startTime\":null,\"endTime\":null,\"executionDuration\":0,\"destruction\":null,\"parameters\":null,\"results\":[],\"errorSummary\":null,\"jobInfo\":{\"transfer\":{\"target\":\"vos://example.com!vospace/my-node\",\"direction\":\"pullToVoSpace\",\"view\":{\"param\":[{\"value\":\"file1.txt\",\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall/include\"},{\"value\":\"file2.txt\",\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall/include\"}],\"uri\":\"ivo://ia2.inaf.it/vospace/views#async-recall\",\"original\":true},\"protocols\":[],\"keepBytes\":false,\"version\":null,\"param\":[]}},\"version\":null}";
 
+    @Mock
+    private Jedis mockedJedis;
+
     @Test
     public void testRedisRpc() {
-        try (MockedConstruction<Jedis> staticMock = Mockito.mockConstruction(Jedis.class,
-                (mockedJedis, context) -> {
-                    doAnswer(invocation -> {
-                        String requestId = invocation.getArgument(1);
-                        List<String> result = new ArrayList<>();
-                        result.add(requestId);
-                        result.add(JSON_JOB);
-                        return result;
-                    }).when(mockedJedis).brpop(anyInt(), anyString());
-                })) {
-            AsyncTransferService asyncTransferService = new AsyncTransferService("localhost", 6379);
+
+        doAnswer(invocation -> {
+            String requestId = invocation.getArgument(1);
+            List<String> result = new ArrayList<>();
+            result.add(requestId);
+            result.add(JSON_JOB);
+            return result;
+        }).when(mockedJedis).brpop(anyInt(), anyString());
+
+        testWithMockedJedis(asyncTransferService -> {
             JobSummary result = asyncTransferService.startJob(getFakeJob());
             assertEquals("job_id", result.getJobId());
-        }
+        });
     }
 
     @Test
     public void testRedisError() {
-        try (MockedConstruction<Jedis> staticMock = Mockito.mockConstruction(Jedis.class)) {
-            AsyncTransferService asyncTransferService = new AsyncTransferService("localhost", 6379);
+
+        when(mockedJedis.brpop(anyInt(), anyString())).thenReturn(List.of());
+
+        testWithMockedJedis(asyncTransferService -> {
             try {
                 asyncTransferService.startJob(getFakeJob());
                 fail();
             } catch (IllegalStateException ex) {
             }
+        });
+    }
+
+    private void testWithMockedJedis(Consumer<AsyncTransferService> testAction) {
+        try (MockedConstruction<JedisPool> staticMock = Mockito.mockConstruction(JedisPool.class,
+                (mockedJedisPool, context) -> {
+                    when(mockedJedisPool.getResource()).thenReturn(mockedJedis);
+                })) {
+            AsyncTransferService asyncTransferService = new AsyncTransferService("localhost", 6379);
+            testAction.accept(asyncTransferService);
         }
     }
 
-- 
GitLab