diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index 4ee7897eb17bc1406115b80b817646e5c2ff9d0f..31d0382c1a1e34bc081ef102f09a7bc9fa8997ab 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -227,4 +227,4 @@ class ImportExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") diff --git a/transfer_service/retrieve_cleaner.py b/transfer_service/retrieve_cleaner.py index 7229cab202190e4923579b61f0df439c61d3abc9..717187b939da049f6d6a8dd3d1c46c7aeb33b249 100644 --- a/transfer_service/retrieve_cleaner.py +++ b/transfer_service/retrieve_cleaner.py @@ -91,7 +91,7 @@ class RetrieveCleaner(TaskExecutor): i = 0 self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") def run(self): self.logger.info("Starting retrieve cleaner...") diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 92f85463b1b4e360eba7205a777d583dec46befe..2aafc4ad97668fb9237492daba38fba60d4112aa 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -337,4 +337,4 @@ class RetrieveExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 57126e9eb8b70fad56eb93c07fdc3295202091a6..a9bea7b433dd988da4262043b691e161d64718d0 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -43,6 +43,7 @@ class RetrievePreprocessor(TaskExecutor): super(RetrievePreprocessor, self).__init__() def execute(self): + self.logger.info("Generating VOSpace node list") target = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] params = self.jobObj.jobInfo["transfer"]["view"]["param"] if not params: @@ -66,6 +67,7 @@ class RetrievePreprocessor(TaskExecutor): your job has been QUEUED. Job ID: {self.jobObj.jobId} + Job type: {self.jobObj.type} Owner ID: {self.jobObj.ownerId} You will be notified by email once the job is completed. @@ -83,11 +85,22 @@ class RetrievePreprocessor(TaskExecutor): self.setDestinationQueueName("read_ready") while True: self.wait() - if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: - self.jobObj = self.srcQueue.getJob() - self.execute() - self.update() - self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() + try: + srcQueueLen = self.srcQueue.len() + destQueueLen = self.destQueue.len() + except: + self.logger.exception("Cache error: failed to retrieve queue length.") + else: + if destQueueLen < self.maxReadyJobs and srcQueueLen > 0: + self.jobObj = self.srcQueue.getJob() + self.execute() + self.update() + try: + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() + except Exception: + self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + else: + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + finally: self.cleanup() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 6354194b09f7adb59ac6f25109b38fd5efa7fc73..9715f5a0bbd707d5673fe781af1879954f1651e5 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -246,4 +246,4 @@ class StoreExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 78a291540d67a99aab32fda78fa1b06efb6ba40e..72880e7e460607ef446dcc46a68269d3d00e2ade 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -126,9 +126,8 @@ class StorePreprocessor(TaskExecutor): self.md5calc.recursive(destPath) # Case 4: /home/user/store is empty (this should be handled by data_rpc_server.py) else: - self.logger.critical("FATAL: the 'store' directory is empty") - time.sleep(5) - sys.exit(1) + self.logger.critical("FATAL: the 'store' directory is empty.") + return False # Third scan after directory structure 'check & repair' self.logger.info("Recursive scan of the 'store' directory") @@ -138,8 +137,7 @@ class StorePreprocessor(TaskExecutor): locationId = self.dbConn.getLocationId(self.storageId) except Exception: self.logger.exception("FATAL: unable to obtain the location ID for the storage point") - time.sleep(5) - sys.exit(1) + return False self.logger.info("Checksum calculation and file catalog update") pathPrefix = self.storageStorePath.replace("{username}", self.username) @@ -175,9 +173,8 @@ class StorePreprocessor(TaskExecutor): now = dt.now().isoformat() self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) except Exception: - self.logger.exception("FATAL: unable to update the file catalog") - time.sleep(5) - sys.exit(1) + self.logger.exception("FATAL: unable to update the file catalog.") + return False for flist in files: for file in flist: @@ -212,17 +209,15 @@ class StorePreprocessor(TaskExecutor): now = dt.now().isoformat() self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) except Exception: - self.logger.exception("FATAL: unable to update the file catalog") - time.sleep(5) - sys.exit(1) + self.logger.exception("FATAL: unable to update the file catalog.") + return False self.logger.info("Overall data size calculation") self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) self.logger.info("++++++++++ End of preprocessing phase ++++++++++") + return True except Exception: - self.logger.exception("FATAL: something went wrong during the preprocessing phase") - self.update("ERROR") - time.sleep(5) - sys.exit(1) + self.logger.exception("FATAL: something went wrong during the preprocessing phase.") + return False def update(self, status): try: @@ -234,7 +229,7 @@ class StorePreprocessor(TaskExecutor): m.addRecipient(userEmail) if status == "OK": - self.logger.info("Job phase updated to QUEUED") + self.logger.info("Job phase updated to QUEUED.") self.jobObj.setPhase("QUEUED") self.dbConn.setPhase(self.jobId, "QUEUED") # Send e-mail notification @@ -255,7 +250,7 @@ class StorePreprocessor(TaskExecutor): m.send() else: # Send e-mail notification - self.logger.info("Job phase updated to ERROR") + self.logger.info("Job phase updated to ERROR.") self.jobObj.setPhase("ERROR") self.jobObj.setErrorType("fatal") self.dbConn.insertJob(self.jobObj) @@ -272,13 +267,13 @@ class StorePreprocessor(TaskExecutor): Storage ID: {self.storageId} Owner ID: {self.jobObj.ownerId} - The issue will be automatically reported to the administrator. + This issue will be automatically reported to the administrator. """ m.setMessage("VOSpace data storage notification: Job ERROR", msg) m.send() except Exception: - self.logger.exception(f"Unable to update the job status for job {self.jobId}") + self.logger.exception(f"FATAL: unable to update the job status for job {self.jobId}") finally: self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() self.nodeList.clear() @@ -289,23 +284,33 @@ class StorePreprocessor(TaskExecutor): self.setDestinationQueueName("write_ready") while True: self.wait() - if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: - self.jobObj = self.srcQueue.getJob() - self.jobId = self.jobObj.jobId - self.storageId = self.jobObj.jobInfo["storageId"] - self.storageType = self.jobObj.jobInfo["storageType"] - self.userId = self.jobObj.ownerId - self.username = self.jobObj.jobInfo["userName"] - self.prepare(self.username) - self.execute() - self.update("OK") - try: - self.destQueue.insertJob(self.jobObj) - except Exception: - self.logger.exception(f"Failed to move job {self.jobObj.jobId} from {self.srcQueue.name()} to {self.destQueue.name()}") - else: - self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + try: + srcQueueLen = self.srcQueue.len() + destQueueLen = self.destQueue.len() + except: + self.logger.exception("Cache error: failed to retrieve queue length.") + else: + if destQueueLen < self.maxReadyJobs and srcQueueLen > 0: + self.jobObj = self.srcQueue.getJob() + self.jobId = self.jobObj.jobId + self.storageId = self.jobObj.jobInfo["storageId"] + self.storageType = self.jobObj.jobInfo["storageType"] + self.userId = self.jobObj.ownerId + self.username = self.jobObj.jobInfo["userName"] + self.prepare(self.username) + if self.execute(): + self.update("OK") + else: + self.update("ERROR") + try: + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() + except Exception: + self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + else: + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + finally: + self.setDestinationQueueName("write_ready") # Test #sp = StorePreprocessor()