From ca746ed72770256b740354806222beb03522a4d7 Mon Sep 17 00:00:00 2001 From: Cristiano Urban <cristiano.urban@inaf.it> Date: Fri, 7 Jul 2023 15:52:49 +0200 Subject: [PATCH] Check if user is present in 'users' table of the VOSpace database and handle job accordingly. Signed-off-by: Cristiano Urban <cristiano.urban@inaf.it> --- transfer_service/retrieve_preprocessor.py | 35 ++++++++++++++++------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 1efd4a1..d8dbb8c 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -5,6 +5,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later # +import datetime import json import logging import os @@ -99,22 +100,36 @@ class RetrievePreprocessor(TaskExecutor): jobId = self.jobObj.jobId self.execute() try: - jobPhase = self.dbConn.getJobPhase(jobId) - except Exception: - self.logger.exception(f"Database error: unable to retrieve job phase for job {jobId}.") - else: - if jobPhase == "ABORTED": - self.jobObj.setPhase("ABORTED") - self.setDestinationQueueName("read_terminated") - self.update("ABORTED") + # Check if the user name is present into the 'users' table of the VOSpace database + if not self.dbConn.getUserName(self.jobObj.ownerId): + self.jobObj.setPhase("ERROR") + self.jobObj.setEndTime(datetime.datetime.now().isoformat()) + self.jobObj.setErrorType("transient") + self.jobObj.setErrorMessage("The user is registered in the authentication system (RAP), but is not present into the 'users' table of the VOSpace database.") + self.dbConn.insertJob(self.jobObj) + self.logger.info("Job phase updated to ERROR.") + errorFlag = True else: + self.logger.info("Job phase updated to QUEUED.") self.update("QUEUED") + errorFlag = False + except Exception: + self.logger.exception(f"Database error: unable to retrieve user name using the job ownerID {self.jobObj.ownerId}.") + else: try: - self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() + if errorFlag: + self.setDestinationQueueName("read_terminated") + if self.destQueue.len() >= self.maxTerminatedJobs: + self.destQueue.extractJob() + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() + else: + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") finally: + self.setDestinationQueueName("read_ready") self.cleanup() -- GitLab