diff --git a/transfer_service/retrieve_cleaner.py b/transfer_service/retrieve_cleaner.py index a784ccb669dfb426691745a93245e874aa242402..f5e34faf48cce2f6fca2a0a3829992e3e641a153 100644 --- a/transfer_service/retrieve_cleaner.py +++ b/transfer_service/retrieve_cleaner.py @@ -52,7 +52,7 @@ class RetrieveCleaner(TaskExecutor): self.nodeList = [] self.destPathList = [] super(RetrieveCleaner, self).__init__() - + def dataHasExpired(self): # Avoid "all zero" condition if self.days <= 0 and self.seconds < 30: @@ -61,7 +61,7 @@ class RetrieveCleaner(TaskExecutor): elif self.seconds >= 86400: self.days += self.seconds // 86400 self.seconds = self.seconds % 86400 - + jobEndTime = datetime.datetime.fromisoformat(self.jobObj.endTime) currentTime = datetime.datetime.now() delta = currentTime - jobEndTime @@ -69,11 +69,11 @@ class RetrieveCleaner(TaskExecutor): return True else: return False - + def execute(self): # while dim lists > 0: # loop over the two lists (nodeList and destPathList): - # if the vospace node is not busy: + # if the vospace node is not busy: # set 'async_trans' = True # delete the file/dir in the 'vospace_retrieve' directory # remove the corresponding elements on the two lists @@ -86,11 +86,13 @@ class RetrieveCleaner(TaskExecutor): self.logger.exception("FATAL: unable to obtain info about VOSpace nodes.") return False numNodes = len(self.nodeList) + numDestPaths = len(self.destPathList) self.logger.info(f"Number of VOSpace nodes involved: {numNodes}") - while numNodes > 0: + self.logger.info(f"Number of associated physical paths: {numDestPaths}") + while numNodes > 0 and numDestPaths > 0 and numNodes == numDestPaths: time.sleep(0.2) vospacePath = self.nodeList[numNodes - 1] - destPath = self.destPathList[numNodes - 1] + destPath = self.destPathList[numDestPaths - 1] try: busy = self.dbConn.nodeIsBusy(vospacePath) except Exception: @@ -112,7 +114,7 @@ class RetrieveCleaner(TaskExecutor): except FileNotFoundError: self.logger.exception(f"Cannot find '{destPath}', skip...") - # check for empty dirs and remove them + # Check for empty dirs and remove them basePath = self.storageRetrievePath.replace("{username}", self.username) for root, dirs, files in os.walk(basePath, topdown = False): for dir in dirs: @@ -120,8 +122,9 @@ class RetrieveCleaner(TaskExecutor): if not os.listdir(dirPath): os.rmdir(dirPath) self.nodeList.pop(numNodes - 1) - self.destPathList.pop(numNodes - 1) + self.destPathList.pop(numDestPaths - 1) numNodes -= 1 + numDestPaths -= 1 except Exception: self.logger.exception("FATAL: something went wrong while cleaning the expired data.") return False @@ -132,7 +135,7 @@ class RetrieveCleaner(TaskExecutor): def cleanup(self): self.nodeList.clear() self.destPathList.clear() - + def run(self): self.logger.info("Starting retrieve cleaner...") self.setSourceQueueName("read_terminated") @@ -148,7 +151,11 @@ class RetrieveCleaner(TaskExecutor): if srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() self.nodeList = self.jobObj.nodeList.copy() - self.destPathList = self.jobObj.jobInfo["destPathList"].copy() + # The 'destPathList' key may not be present (e.g. when a user performs a login + # through VOSpace UI and launches an async recall job, but he/she is not present + # in the 'user' table of the VOSpace database) + if "destPathList" in self.jobObj.jobInfo: + self.destPathList = self.jobObj.jobInfo["destPathList"].copy() if self.dataHasExpired(): if self.execute(): try: