diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index a9bea7b433dd988da4262043b691e161d64718d0..b9ee2da4ac4e389ad74662b4745c96a51af97ac6 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -53,7 +53,7 @@ class RetrievePreprocessor(TaskExecutor): self.nodeList.append(target + '/' + el["value"]) self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() - def update(self): + def update(self, status): # Send e-mail notification m = Mailer(self.logger) @@ -64,20 +64,19 @@ class RetrievePreprocessor(TaskExecutor): msg = f""" Dear user, - your job has been QUEUED. + your job has been {status}. Job ID: {self.jobObj.jobId} Job type: {self.jobObj.type} Owner ID: {self.jobObj.ownerId} - You will be notified by email once the job is completed. - """ - m.setMessage("VOSpace data retrieve notification: Job QUEUED", msg) + m.setMessage(f"VOSpace data retrieve notification: Job {status}", msg) m.send() def cleanup(self): self.nodeList.clear() + self.setDestinationQueueName("read_ready") def run(self): self.logger.info("Starting retrieve preprocessor...") @@ -89,18 +88,29 @@ class RetrievePreprocessor(TaskExecutor): srcQueueLen = self.srcQueue.len() destQueueLen = self.destQueue.len() except: - self.logger.exception("Cache error: failed to retrieve queue length.") + self.logger.exception("Cache error: unable to retrieve queue length.") else: if destQueueLen < self.maxReadyJobs and srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() + jobId = self.jobObj.jobId self.execute() - self.update() try: - self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() + jobPhase = self.dbConn.getPhase(jobId) except Exception: - self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") - else: - self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + self.logger.exception(f"Database error: unable to retrieve job phase for job {jobId}.") + else: + if jobPhase == "ABORTED": + self.jobObj.setPhase("ABORTED") + self.setDestinationQueueName("read_terminated") + self.update("ABORTED") + else: + self.update("QUEUED") + try: + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() + except Exception: + self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + else: + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") finally: self.cleanup()