diff --git a/transfer_service/retrieve_cleaner.py b/transfer_service/retrieve_cleaner.py index af9621ffecd9784a7c7d97993ea9388eefe26ca8..2420f6ee36781ee648162ec9faa70b62b1cff391 100644 --- a/transfer_service/retrieve_cleaner.py +++ b/transfer_service/retrieve_cleaner.py @@ -45,7 +45,7 @@ class RetrieveCleaner(TaskExecutor): self.destPathList = [] super(RetrieveCleaner, self).__init__() - def execute(self): + def dataHasExpired(self): # Avoid "all zero" condition if self.days <= 0 and self.seconds < 30: self.days = 0 @@ -58,20 +58,43 @@ class RetrieveCleaner(TaskExecutor): currentTime = datetime.datetime.now() delta = currentTime - jobEndTime if delta.days >= self.days and delta.seconds > self.seconds: - # while dim lists > 0: - # loop over the two lists (nodeList and destPathList): - # if the vospace node is not busy: - # set 'async_trans' = True - # delete the file/dir in the 'retrieve' directory - # remove the corresponding elements on the two lists + return True + else: + return False + + def execute(self): + # while dim lists > 0: + # loop over the two lists (nodeList and destPathList): + # if the vospace node is not busy: + # set 'async_trans' = True + # delete the file/dir in the 'retrieve' directory + # remove the corresponding elements on the two lists + try: + self.logger.info("++++++++++ Start of execution phase ++++++++++") + try: + nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) + self.username = nodeInfo["username"] + except Exception: + self.logger.exception("FATAL: unable to obtain info about VOSpace nodes.") + return False numNodes = len(self.nodeList) + self.logger.info(f"Number of VOSpace nodes involved: {numNodes}") while numNodes > 0: i = 0 while i < numNodes: vospacePath = self.nodeList[i] destPath = self.destPathList[i] - if not self.dbConn.nodeIsBusy(vospacePath): - self.dbConn.setAsyncTrans(vospacePath, True) + try: + busy = self.dbConn.nodeIsBusy(vospacePath) + except Exception: + self.logger.exception(f"FATAL: unable to check the 'busy' flag value for the VOSpace node '{vospacePath}'.") + return False + if not busy: + try: + self.dbConn.setAsyncTrans(vospacePath, True) + except Exception: + self.logger.exception(f"FATAL: unable to update the 'async_trans' flag for the VOSpace node '{vospacePath}'.") + return False if os.path.isfile(destPath): os.remove(destPath) else: @@ -86,9 +109,12 @@ class RetrieveCleaner(TaskExecutor): self.destPathList.pop(i) numNodes -= 1 i = 0 - self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + except Exception: + self.logger.exception("FATAL: something went wrong while cleaning the expired data.") + return False + else: + self.logger.info("++++++++++ End of execution phase ++++++++++") + return True def run(self): self.logger.info("Starting retrieve cleaner...") @@ -96,10 +122,24 @@ class RetrieveCleaner(TaskExecutor): self.setDestinationQueueName("read_clean") while True: self.wait() - if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: - self.jobObj = self.srcQueue.getJob() - self.nodeList = self.jobObj.nodeList.copy() - self.destPathList = self.jobObj.jobInfo["destPathList"].copy() - nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) - self.username = nodeInfo["username"] - self.execute() + try: + srcQueueLen = self.srcQueue.len() + destQueueLen = self.destQueue.len() + except Exception: + self.logger.exception("Cache error: failed to retrieve queue length.") + else: + if destQueueLen < self.maxReadyJobs and srcQueueLen > 0: + self.jobObj = self.srcQueue.getJob() + self.nodeList = self.jobObj.nodeList.copy() + self.destPathList = self.jobObj.jobInfo["destPathList"].copy() + if self.dataHasExpired(): + if self.execute(): + try: + if destQueueLen >= self.maxCleanJobs: + self.destQueue.extractJob() + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() + except Exception: + self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + else: + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")