From 29c7a0178bfd76eb0b34c0b39cb41595f6bccc48 Mon Sep 17 00:00:00 2001
From: Nicola Fulvio Calabria <nicola.calabria@inaf.it>
Date: Mon, 19 Jul 2021 16:12:01 +0200
Subject: [PATCH] Most of file copy implementation written

---
 .../transfer/controller/CopyController.java   |  16 +-
 .../ia2/transfer/persistence/FileDAO.java     |  51 ++++
 .../ia2/transfer/service/CopyService.java     | 153 ------------
 .../ia2/transfer/service/FileCopyService.java | 223 ++++++++++++++++++
 4 files changed, 287 insertions(+), 156 deletions(-)
 delete mode 100644 src/main/java/it/inaf/ia2/transfer/service/CopyService.java
 create mode 100644 src/main/java/it/inaf/ia2/transfer/service/FileCopyService.java

diff --git a/src/main/java/it/inaf/ia2/transfer/controller/CopyController.java b/src/main/java/it/inaf/ia2/transfer/controller/CopyController.java
index 96df91a..c99976a 100644
--- a/src/main/java/it/inaf/ia2/transfer/controller/CopyController.java
+++ b/src/main/java/it/inaf/ia2/transfer/controller/CopyController.java
@@ -11,20 +11,30 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.beans.factory.annotation.Autowired;
-import it.inaf.ia2.transfer.service.CopyService;
+import it.inaf.ia2.transfer.service.FileCopyService;
+import java.util.concurrent.CompletableFuture;
 
 @RestController
 public class CopyController extends AuthenticatedFileController {
 
     @Autowired
-    private CopyService copyService;
+    private FileCopyService copyService;
 
     @PostMapping(value = "/copy", consumes = MediaType.APPLICATION_JSON_VALUE)
     public ResponseEntity<?> copyFiles(@RequestBody CopyRequest copyRequest) {
+               
+        // need to make a completable future start
+        CompletableFuture.runAsync(() -> {
+            handleFileJob(() -> copyService.copyFiles(copyRequest.getSourceRootVosPath(), 
+                copyRequest.getDestinationRootVosPath(), copyRequest.getJobId(), 
+                getPrincipal()), copyRequest.jobId);
+        });     
 
         return ResponseEntity.ok(
                 copyRequest.getJobId() + " copy task accepted by File Service"
-        );
+        );        
+        
+        
     }
 
     @Override
diff --git a/src/main/java/it/inaf/ia2/transfer/persistence/FileDAO.java b/src/main/java/it/inaf/ia2/transfer/persistence/FileDAO.java
index 37ff240..430e852 100644
--- a/src/main/java/it/inaf/ia2/transfer/persistence/FileDAO.java
+++ b/src/main/java/it/inaf/ia2/transfer/persistence/FileDAO.java
@@ -191,6 +191,57 @@ public class FileDAO {
             return fileInfos;
         });
     }
+    
+    // TODO: same problem as get archive file infos   
+    public List<FileInfo> getBranchFileInfos(String rootVosPath, String jobId) {
+
+        // TODO: validate rootVosPath as a vos_path
+
+        String sql = "SELECT n.node_id, n.is_public, n.group_read, n.group_write, n.creator_id, n.async_trans,\n"
+                + "n.content_type, n.content_encoding, n.content_length, n.content_md5,\n"
+                + "n.accept_views, n.provide_views, l.location_type, n.path <> n.relative_path AS virtual_parent,\n"
+                + "(SELECT user_name FROM users WHERE user_id = n.creator_id) AS username,\n"
+                + "base_path, get_os_path(n.node_id) AS os_path, get_vos_path(n.node_id) AS vos_path,\n"
+                + "n.type = 'container' AS is_directory, n.name, n.location_id, n.job_id\n"
+                + "FROM node n\n"
+                + "JOIN node p ON p.path @> n.path\n"
+                + "LEFT JOIN location l ON l.location_id = n.location_id\n"
+                + "LEFT JOIN storage s ON s.storage_id = l.storage_dest_id\n"
+                + "WHERE (p.node_id  = id_from_vos_path(?) AND n.job_id = ?)"
+                + "\nORDER BY vos_path ASC";
+
+        return jdbcTemplate.query(conn -> {
+            PreparedStatement ps = conn.prepareStatement(sql);
+            ps.setString(1, rootVosPath);
+            ps.setString(2, jobId);
+            return ps;
+        }, rs -> {
+            List<FileInfo> fileInfos = new ArrayList<>();
+            while (rs.next()) {
+                fileInfos.add(getFileInfo(rs));
+            }
+            return fileInfos;
+        });
+    }
+    
+    public void setBranchLocationId(String rootVosPath, String jobId, int locationId) {
+
+        // TODO: validate rootVosPath as a vos_path
+
+        String sql = "UPDATE node n SET\n"
+                + "location_id = ?\n"                
+                + "JOIN node p ON n.path <@ p.path\n"                
+                + "WHERE (path ~ ('*.' || id_from_vos_path(?))::lquery AND n.job_id = ?)";
+
+        jdbcTemplate.update(conn -> {
+            PreparedStatement ps = conn.prepareStatement(sql);
+            ps.setInt(1, locationId);
+            ps.setString(2, rootVosPath);
+            ps.setString(3, jobId);
+            return ps;
+        });
+    }
+    
 
     private FileInfo getFileInfo(ResultSet rs) throws SQLException {
         FileInfo fi = new FileInfo();
diff --git a/src/main/java/it/inaf/ia2/transfer/service/CopyService.java b/src/main/java/it/inaf/ia2/transfer/service/CopyService.java
deleted file mode 100644
index 1e31e65..0000000
--- a/src/main/java/it/inaf/ia2/transfer/service/CopyService.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * This file is part of vospace-file-service
- * Copyright (C) 2021 Istituto Nazionale di Astrofisica
- * SPDX-License-Identifier: GPL-3.0-or-later
- */
-package it.inaf.ia2.transfer.service;
-
-import it.inaf.ia2.transfer.auth.TokenPrincipal;
-import it.inaf.ia2.transfer.exception.InsufficientStorageException;
-import it.inaf.ia2.transfer.exception.JobException;
-import it.inaf.ia2.transfer.exception.JobException.Type;
-import it.inaf.ia2.transfer.persistence.FileDAO;
-import it.inaf.ia2.transfer.persistence.JobDAO;
-import it.inaf.ia2.transfer.persistence.LocationDAO;
-import it.inaf.ia2.transfer.persistence.model.FileInfo;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.UncheckedIOException;
-import java.nio.file.Files;
-import java.security.Principal;
-import java.util.List;
-import java.util.Map;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipOutputStream;
-import net.ivoa.xml.uws.v1.ExecutionPhase;
-import org.kamranzafar.jtar.TarEntry;
-import org.kamranzafar.jtar.TarOutputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.stereotype.Service;
-import org.springframework.util.FileSystemUtils;
-import org.springframework.util.unit.DataSize;
-import org.springframework.web.client.RestTemplate;
-
-@Service
-public class CopyService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CopyService.class);
-
-    @Autowired
-    private FileDAO fileDAO;
-
-    @Autowired
-    private LocationDAO locationDAO;
-
-    @Autowired
-    private JobDAO jobDAO;
-
-    @Autowired
-    private AuthorizationService authorizationService;
-
-    @Autowired
-    private RestTemplate restTemplate;
-
-    @Value("${upload_location_id}")
-    private int uploadLocationId;
-
-    // Maximum size of the working directory for each registered user
-    @Value("${generated.dir.max-size}")
-    private DataSize generatedDirMaxSize;
-    
-    
-    
-    
-    public void copyFiles(String sourceRootVosPath, 
-            String destinationRootVosPath, String jobId) {
-        // We use jobId to identify nodes created by the REST part of CopyNode
-        // We expect them to be locked
-        
-                
-        
-        
-    }
-
-    
-
-    private String getCommonParent(List<String> vosPaths) {
-        String commonParent = null;
-        for (String vosPath : vosPaths) {
-            if (commonParent == null) {
-                commonParent = vosPath;
-            } else {
-                StringBuilder newCommonParent = new StringBuilder();
-                boolean same = true;
-                for (int i = 0; same && i < Math.min(commonParent.length(), vosPath.length()); i++) {
-                    if (commonParent.charAt(i) == vosPath.charAt(i)) {
-                        newCommonParent.append(commonParent.charAt(i));
-                    } else {
-                        same = false;
-                    }
-                }
-                commonParent = newCommonParent.toString();
-            }
-        }
-        return commonParent;
-    }
-
-    private <O extends OutputStream, E> void downloadFileIntoArchive(FileInfo fileInfo, String relPath, TokenPrincipal tokenPrincipal, ArchiveHandler<O, E> handler, String baseUrl) {
-
-        if (baseUrl == null) {
-            LOG.error("Location URL not found for location " + fileInfo.getLocationId());
-            throw new JobException(Type.FATAL, "Internal Fault")
-                    .setErrorDetail("InternalFault: Unable to retrieve location of file " + fileInfo.getVirtualPath());
-        }
-
-        String url = baseUrl + "/" + fileInfo.getVirtualName();
-
-        LOG.trace("Downloading file from " + url);
-
-        restTemplate.execute(url, HttpMethod.GET, req -> {
-            HttpHeaders headers = req.getHeaders();
-            if (tokenPrincipal.getToken() != null) {
-                headers.setBearerAuth(tokenPrincipal.getToken());
-            }
-        }, res -> {
-            File tmpFile = Files.createTempFile("download", null).toFile();
-            try ( FileOutputStream os = new FileOutputStream(tmpFile)) {
-                res.getBody().transferTo(os);
-                handler.putNextEntry(tmpFile, relPath);
-                try ( FileInputStream is = new FileInputStream(tmpFile)) {
-                    is.transferTo(handler.getOutputStream());
-                }
-            } finally {
-                tmpFile.delete();
-            }
-            return null;
-        }, new Object[]{});
-    }
-
-    private <O extends OutputStream, E> void writeFileIntoArchive(FileInfo fileInfo, String relPath, TokenPrincipal tokenPrincipal, ArchiveHandler<O, E> handler) throws IOException {
-        if (!authorizationService.isDownloadable(fileInfo, tokenPrincipal)) {
-            throw new JobException(Type.FATAL, "Permission Denied")
-                    .setErrorDetail("PermissionDenied: " + fileInfo.getVirtualPath());
-        }
-
-        File file = new File(fileInfo.getOsPath());
-        LOG.trace("Adding file " + file.getAbsolutePath() + " to tar archive");
-
-        try ( InputStream is = new FileInputStream(file)) {
-            handler.putNextEntry(file, relPath);
-            is.transferTo(handler.getOutputStream());
-        }
-    }
-}
diff --git a/src/main/java/it/inaf/ia2/transfer/service/FileCopyService.java b/src/main/java/it/inaf/ia2/transfer/service/FileCopyService.java
new file mode 100644
index 0000000..d9e493e
--- /dev/null
+++ b/src/main/java/it/inaf/ia2/transfer/service/FileCopyService.java
@@ -0,0 +1,223 @@
+/*
+ * This file is part of vospace-file-service
+ * Copyright (C) 2021 Istituto Nazionale di Astrofisica
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+package it.inaf.ia2.transfer.service;
+
+import it.inaf.ia2.transfer.auth.TokenPrincipal;
+import it.inaf.ia2.transfer.exception.JobException;
+import it.inaf.ia2.transfer.exception.JobException.Type;
+import it.inaf.ia2.transfer.persistence.FileDAO;
+import it.inaf.ia2.transfer.persistence.LocationDAO;
+import it.inaf.ia2.transfer.persistence.model.FileInfo;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestTemplate;
+
+@Service
+public class FileCopyService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileCopyService.class);
+
+    @Autowired
+    private FileDAO fileDAO;
+
+    @Autowired
+    private LocationDAO locationDAO;
+
+    @Autowired
+    private AuthorizationService authorizationService;
+
+    @Autowired
+    private RestTemplate restTemplate;
+
+    @Value("${upload_location_id}")
+    private int uploadLocationId;
+
+    public void copyFiles(String sourceRootVosPath,
+            String destinationRootVosPath, String jobId, TokenPrincipal principal) {
+        // We use jobId to identify nodes created by the REST part of CopyNode
+        // We expect them to be locked
+
+        List<FileInfo> sources
+                = fileDAO.getBranchFileInfos(sourceRootVosPath, jobId);
+        // Set location of destinations to this file service update location 
+        // before retrieving file infos
+
+        fileDAO.setBranchLocationId(destinationRootVosPath, jobId, uploadLocationId);
+        List<FileInfo> destinations
+                = fileDAO.getBranchFileInfos(destinationRootVosPath, jobId);
+
+        if (sources.size() != destinations.size()) {
+            throw new IllegalStateException("Sources and destinations list have different sizes");
+        }
+
+        // Create destination directories on disk
+        this.makeDirectoryStructure(destinations);
+
+        this.fillDestinations(destinations,
+                sources,
+                sourceRootVosPath,
+                destinationRootVosPath,
+                principal);
+
+    }
+
+    private void makeDirectory(FileInfo containerFileInfo) {
+        File file = new File(containerFileInfo.getOsPath());
+
+        if (!file.exists()) {
+            if (!file.mkdirs()) {
+                throw new IllegalStateException("Unable to create directory " + containerFileInfo.getOsPath());
+            }
+        }
+
+    }
+
+    private void makeDirectoryStructure(List<FileInfo> destinationFileInfos) {
+        for (FileInfo f : destinationFileInfos) {
+            if (f.isDirectory()) {
+                this.makeDirectory(f);
+            }
+        }
+    }
+
+    private void fillDestinations(List<FileInfo> destinationFileInfos,
+            List<FileInfo> sourcesFileInfos,
+            String sourceRootVosPath,
+            String destinationRootVosPath,
+            TokenPrincipal principal) {
+
+        // it will be initialized only when necessary
+        Map<Integer, String> portalLocationUrls = null;
+
+        for (FileInfo f : destinationFileInfos) {
+            if (!f.isDirectory()) {
+                // Calculate source file vos path
+                String correspondingSourceVosPath
+                        = this.getCorrespondingSourceVosPath(
+                                sourceRootVosPath,
+                                destinationRootVosPath,
+                                f.getVirtualPath());
+
+                Optional<FileInfo> sourceOpt = this.findFileInfoByVosPath(sourcesFileInfos,
+                        correspondingSourceVosPath);
+
+                FileInfo source = sourceOpt
+                        .orElseThrow(() -> new IllegalStateException("Can't find file info for: "
+                        + correspondingSourceVosPath + " in source files list"));
+
+                if (source.getLocationId() != null && source.getLocationId() != uploadLocationId) {
+                    // remote file
+                    if (portalLocationUrls == null) {
+                        portalLocationUrls = locationDAO.getPortalLocationUrls();
+                    }
+                    String url = portalLocationUrls.get(source.getLocationId());
+                    // download file to destination disk path
+                    this.downloadFileToDisk(source, f, principal, url);
+
+                } else {
+                    // local file
+                    // copy file to destination disk path
+                    this.copyLocalFile(source, f, principal);
+
+                }
+
+            }
+        }
+
+    }
+
+    private String getCorrespondingSourceVosPath(String sourceRootVosPath,
+            String destinationRootVosPath,
+            String destinationVosPath) {
+        return sourceRootVosPath
+                + destinationVosPath.substring(destinationRootVosPath.length());
+    }
+
+    private Optional<FileInfo> findFileInfoByVosPath(List<FileInfo> list, String vosPath) {
+
+        return list.stream().filter(i -> i.getVirtualPath().equals(vosPath)).findFirst();
+
+    }
+
+    private void downloadFileToDisk(FileInfo sourceFile,
+            FileInfo destinationFile, TokenPrincipal tokenPrincipal, String baseUrl) {
+
+        if (baseUrl == null) {
+            LOG.error("Location URL not found for location " + sourceFile.getLocationId());
+            throw new JobException(Type.FATAL, "Internal Fault")
+                    .setErrorDetail("InternalFault: Unable to retrieve location of file " + sourceFile.getVirtualPath());
+        }
+
+        String url = baseUrl + "/" + sourceFile.getVirtualName();
+
+        LOG.trace("Downloading file from " + url);
+
+        restTemplate.execute(url, HttpMethod.GET, req -> {
+            HttpHeaders headers = req.getHeaders();
+            if (tokenPrincipal.getToken() != null) {
+                headers.setBearerAuth(tokenPrincipal.getToken());
+            }
+        }, res -> {
+
+            File outFile = new File(destinationFile.getOsPath());
+
+            try (FileOutputStream os = new FileOutputStream(outFile)) {
+                res.getBody().transferTo(os);
+            } catch (Exception e) {
+                outFile.delete();
+                throw e;
+            }
+
+            return null;
+        }, new Object[]{});
+    }
+
+    private void copyLocalFile(FileInfo sourceFileInfo,
+            FileInfo destinationFileInfo, TokenPrincipal tokenPrincipal) {
+        if (!authorizationService.isDownloadable(sourceFileInfo, tokenPrincipal)) {
+            throw new JobException(Type.FATAL, "Permission Denied")
+                    .setErrorDetail("PermissionDenied: " + sourceFileInfo.getVirtualPath());
+        }
+
+        File file = new File(sourceFileInfo.getOsPath());
+        LOG.trace("Copying file: " + file.getAbsolutePath() + " to: "
+                + destinationFileInfo.getOsPath());
+
+        File sourceFile = new File(sourceFileInfo.getOsPath());
+        File destinationFile = new File(destinationFileInfo.getOsPath());
+
+        try {
+            Files.copy(sourceFile.toPath(), destinationFile.toPath());
+        } catch (IOException e) {
+            if (Files.exists(destinationFile.toPath())) {
+                destinationFile.delete();
+            }
+
+            throw new UncheckedIOException(e);
+        } catch (Exception e) {
+            if (Files.exists(destinationFile.toPath())) {
+                destinationFile.delete();
+            }
+
+            throw e;
+
+        }
+
+    }
+}
-- 
GitLab