diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index e70482bc8a25ee346214a756d7734e73953b51f7..77f8a293adf03b66c2a261e76c83abd937ab71b1 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -79,6 +79,7 @@ class RetrieveExecutor(TaskExecutor): self.jobId = None self.nodeList = [] self.fileList = [] + self.destPathList = [] self.numBlocks = 0 self.procBlocks = 0 self.totalSize = 0 @@ -222,9 +223,9 @@ class RetrieveExecutor(TaskExecutor): osRelParentPath = osRelParentPath.replace(baseSrcPath, "") if osRelParentPath != "/": osRelParentPath += "/" - destPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath - os.makedirs(destPath, exist_ok = True) - sp = subprocess.run(["rsync", "-av", srcPath, destPath], capture_output = True) + destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath + os.makedirs(destDirPath, exist_ok = True) + sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True) if(sp.returncode or sp.stderr): return False @@ -263,6 +264,17 @@ class RetrieveExecutor(TaskExecutor): self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) + # Add a list of physical destination paths for each VOSpace node in the node list + for vospacePath in self.nodeList: + nodeInfo = self.dbConn.getOSPath(vospacePath) + baseSrcPath = nodeInfo["baseSrcPath"] + username = nodeInfo["username"] + srcPath = nodeInfo["fullPath"] + baseDestPath = self.storageRetrievePath.replace("{username}", username) + destPath = srcPath.replace(baseSrcPath, baseDestPath) + self.destPathList.append(destPath) + self.jobObj.jobInfo["destPathList"] = self.destPathList.copy() + # Send e-mail notification m = Mailer() @@ -290,6 +302,7 @@ class RetrieveExecutor(TaskExecutor): """ self.fileList.clear() self.nodeList.clear() + self.destPathList.clear() self.storageType = None self.numBlocks = 0 self.procBlocks = 0 @@ -304,7 +317,7 @@ class RetrieveExecutor(TaskExecutor): if self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId - self.nodeList = self.jobObj.jobInfo["nodeList"] + self.nodeList = self.jobObj.jobInfo["nodeList"].copy() self.buildFileList() self.buildBlocks() result = self.retrieveData() @@ -320,6 +333,5 @@ class RetrieveExecutor(TaskExecutor): if self.destQueue.len() == self.maxTerminatedJobs: self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() - + self.srcQueue.extractJob() self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")