diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 1efd4a10b6db534226a2007f53eaaa7e1936f079..d8dbb8cb432960e50a1ea6c9116994ddd8ef53ad 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -5,6 +5,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later # +import datetime import json import logging import os @@ -99,22 +100,36 @@ class RetrievePreprocessor(TaskExecutor): jobId = self.jobObj.jobId self.execute() try: - jobPhase = self.dbConn.getJobPhase(jobId) - except Exception: - self.logger.exception(f"Database error: unable to retrieve job phase for job {jobId}.") - else: - if jobPhase == "ABORTED": - self.jobObj.setPhase("ABORTED") - self.setDestinationQueueName("read_terminated") - self.update("ABORTED") + # Check if the user name is present into the 'users' table of the VOSpace database + if not self.dbConn.getUserName(self.jobObj.ownerId): + self.jobObj.setPhase("ERROR") + self.jobObj.setEndTime(datetime.datetime.now().isoformat()) + self.jobObj.setErrorType("transient") + self.jobObj.setErrorMessage("The user is registered in the authentication system (RAP), but is not present into the 'users' table of the VOSpace database.") + self.dbConn.insertJob(self.jobObj) + self.logger.info("Job phase updated to ERROR.") + errorFlag = True else: + self.logger.info("Job phase updated to QUEUED.") self.update("QUEUED") + errorFlag = False + except Exception: + self.logger.exception(f"Database error: unable to retrieve user name using the job ownerID {self.jobObj.ownerId}.") + else: try: - self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() + if errorFlag: + self.setDestinationQueueName("read_terminated") + if self.destQueue.len() >= self.maxTerminatedJobs: + self.destQueue.extractJob() + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() + else: + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") finally: + self.setDestinationQueueName("read_ready") self.cleanup()