diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index fa3b15c37e7b26802f01d97f47df170137e4203e..e38d0b091f202b774ded7c116110f0be96491069 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -28,6 +28,7 @@ import os import logging import subprocess import sys +import tarfile from checksum import Checksum from config import Config @@ -45,6 +46,8 @@ class RetrieveExecutor(TaskExecutor): self.type = "retrieve_executor" self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") + params = config.loadSection("general") + self.vospaceBaseUrl = params["vospace_base_url"] params = config.loadSection("transfer_node") self.storageRetrievePath = params["retrieve_path"] params = config.loadSection("transfer") @@ -62,6 +65,7 @@ class RetrieveExecutor(TaskExecutor): redisLogHandler = RedisLogHandler() redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) + self.resDir = params["res_dir"] params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], params["password"], @@ -83,6 +87,8 @@ class RetrieveExecutor(TaskExecutor): self.jobId = None self.nodeList = [] self.fileList = [] + self.urlList = [] + self.urlListFileName = None self.destPathList = [] self.numBlocks = 0 self.procBlocks = 0 @@ -129,6 +135,16 @@ class RetrieveExecutor(TaskExecutor): except Exception: self.logger.exception(f"FATAL: unable to obtain the OS path for the VOSpace path '{vospacePath}'.") return False + try: + vospaceChildNodes = self.dbConn.getVOSpaceChildNodes(vospacePath, "data") + if not vospaceChildNodes: + self.urlList.append(self.vospaceBaseUrl + "/download" + vospacePath) + else: + for el in vospaceChildNodes: + self.urlList.append(self.vospaceBaseUrl + "/download" + el["vospace_path"]) + except Exception: + self.logger.exception(f"FATAL: unable to obtain the VOSpace data nodes download URL list.") + return False baseSrcPath = nodeInfo["baseSrcPath"] srcPath = nodeInfo["fullPath"] username = nodeInfo["username"] @@ -362,6 +378,23 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") + self.urlListFileName = "url_list-" + self.jobId + urlListFilePath = os.path.join(self.resDir, self.urlListFileName) + try: + urlfp = open(urlListFilePath, "w") + except IOError: + self.logger.exception(f"Unable to generate {self.urlListFileName}") + else: + for url in self.urlList: + urlfp.write(url + '\n') + urlfp.close() + try: + tar = tarfile.open(f"{urlListFilePath}.tar.gz", "w:gz") + tar.add(urlListFilePath, arcname = self.urlListFileName) + tar.close() + except tarfile.TarError: + self.logger.exception(f"Unable to generate {self.urlListFileName}.tar.gz") + msg = f""" ########## VOSpace data retrieval procedure summary ########## @@ -373,9 +406,10 @@ class RetrieveExecutor(TaskExecutor): Owner ID: {self.jobObj.ownerId} Your files are available and can be downloaded. + Here below is attached a file containing the URL list of all the VOSpace data nodes. """ - m.setMessage("VOSpace data retrieve notification: COMPLETED", msg) + m.setMessageWithAttachment("VOSpace data retrieve notification: COMPLETED", msg, urlListFilePath + ".tar.gz") else: self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") @@ -416,11 +450,22 @@ class RetrieveExecutor(TaskExecutor): self.logger.info("Cleanup...") self.fileList.clear() self.nodeList.clear() + self.urlList.clear() self.destPathList.clear() self.storageType = None self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 + urlListFilePath = os.path.join(self.resDir, self.urlListFileName) + try: + os.remove(urlListFilePath) + except OSError: + self.logger.exception(f"FATAL: unable to remove {self.urlListFileName}") + try: + os.remove(urlListFilePath + ".tar.gz") + except OSError: + self.logger.exception(f"FATAL: unable to remove {self.urlListFileName}.tar.gz") + self.urlListFileName = None def run(self): self.logger.info("Starting retrieve executor...")