Skip to content
Snippets Groups Projects
Commit 9fbfe1a3 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Minor changes.

parent 4ce8001c
No related branches found
No related tags found
No related merge requests found
Pipeline #2217 passed
...@@ -227,4 +227,4 @@ class ImportExecutor(TaskExecutor): ...@@ -227,4 +227,4 @@ class ImportExecutor(TaskExecutor):
self.destQueue.extractJob() self.destQueue.extractJob()
self.destQueue.insertJob(self.jobObj) self.destQueue.insertJob(self.jobObj)
self.srcQueue.extractJob() self.srcQueue.extractJob()
self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
...@@ -91,7 +91,7 @@ class RetrieveCleaner(TaskExecutor): ...@@ -91,7 +91,7 @@ class RetrieveCleaner(TaskExecutor):
i = 0 i = 0
self.destQueue.insertJob(self.jobObj) self.destQueue.insertJob(self.jobObj)
self.srcQueue.extractJob() self.srcQueue.extractJob()
self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
def run(self): def run(self):
self.logger.info("Starting retrieve cleaner...") self.logger.info("Starting retrieve cleaner...")
......
...@@ -337,4 +337,4 @@ class RetrieveExecutor(TaskExecutor): ...@@ -337,4 +337,4 @@ class RetrieveExecutor(TaskExecutor):
self.destQueue.extractJob() self.destQueue.extractJob()
self.destQueue.insertJob(self.jobObj) self.destQueue.insertJob(self.jobObj)
self.srcQueue.extractJob() self.srcQueue.extractJob()
self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
...@@ -43,6 +43,7 @@ class RetrievePreprocessor(TaskExecutor): ...@@ -43,6 +43,7 @@ class RetrievePreprocessor(TaskExecutor):
super(RetrievePreprocessor, self).__init__() super(RetrievePreprocessor, self).__init__()
def execute(self): def execute(self):
self.logger.info("Generating VOSpace node list")
target = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] target = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1]
params = self.jobObj.jobInfo["transfer"]["view"]["param"] params = self.jobObj.jobInfo["transfer"]["view"]["param"]
if not params: if not params:
...@@ -66,6 +67,7 @@ class RetrievePreprocessor(TaskExecutor): ...@@ -66,6 +67,7 @@ class RetrievePreprocessor(TaskExecutor):
your job has been QUEUED. your job has been QUEUED.
Job ID: {self.jobObj.jobId} Job ID: {self.jobObj.jobId}
Job type: {self.jobObj.type}
Owner ID: {self.jobObj.ownerId} Owner ID: {self.jobObj.ownerId}
You will be notified by email once the job is completed. You will be notified by email once the job is completed.
...@@ -83,11 +85,22 @@ class RetrievePreprocessor(TaskExecutor): ...@@ -83,11 +85,22 @@ class RetrievePreprocessor(TaskExecutor):
self.setDestinationQueueName("read_ready") self.setDestinationQueueName("read_ready")
while True: while True:
self.wait() self.wait()
if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: try:
srcQueueLen = self.srcQueue.len()
destQueueLen = self.destQueue.len()
except:
self.logger.exception("Cache error: failed to retrieve queue length.")
else:
if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
self.jobObj = self.srcQueue.getJob() self.jobObj = self.srcQueue.getJob()
self.execute() self.execute()
self.update() self.update()
try:
self.destQueue.insertJob(self.jobObj) self.destQueue.insertJob(self.jobObj)
self.srcQueue.extractJob() self.srcQueue.extractJob()
except Exception:
self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
else:
self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
finally:
self.cleanup() self.cleanup()
self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")
...@@ -246,4 +246,4 @@ class StoreExecutor(TaskExecutor): ...@@ -246,4 +246,4 @@ class StoreExecutor(TaskExecutor):
self.destQueue.extractJob() self.destQueue.extractJob()
self.destQueue.insertJob(self.jobObj) self.destQueue.insertJob(self.jobObj)
self.srcQueue.extractJob() self.srcQueue.extractJob()
self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
...@@ -126,9 +126,8 @@ class StorePreprocessor(TaskExecutor): ...@@ -126,9 +126,8 @@ class StorePreprocessor(TaskExecutor):
self.md5calc.recursive(destPath) self.md5calc.recursive(destPath)
# Case 4: /home/user/store is empty (this should be handled by data_rpc_server.py) # Case 4: /home/user/store is empty (this should be handled by data_rpc_server.py)
else: else:
self.logger.critical("FATAL: the 'store' directory is empty") self.logger.critical("FATAL: the 'store' directory is empty.")
time.sleep(5) return False
sys.exit(1)
# Third scan after directory structure 'check & repair' # Third scan after directory structure 'check & repair'
self.logger.info("Recursive scan of the 'store' directory") self.logger.info("Recursive scan of the 'store' directory")
...@@ -138,8 +137,7 @@ class StorePreprocessor(TaskExecutor): ...@@ -138,8 +137,7 @@ class StorePreprocessor(TaskExecutor):
locationId = self.dbConn.getLocationId(self.storageId) locationId = self.dbConn.getLocationId(self.storageId)
except Exception: except Exception:
self.logger.exception("FATAL: unable to obtain the location ID for the storage point") self.logger.exception("FATAL: unable to obtain the location ID for the storage point")
time.sleep(5) return False
sys.exit(1)
self.logger.info("Checksum calculation and file catalog update") self.logger.info("Checksum calculation and file catalog update")
pathPrefix = self.storageStorePath.replace("{username}", self.username) pathPrefix = self.storageStorePath.replace("{username}", self.username)
...@@ -175,9 +173,8 @@ class StorePreprocessor(TaskExecutor): ...@@ -175,9 +173,8 @@ class StorePreprocessor(TaskExecutor):
now = dt.now().isoformat() now = dt.now().isoformat()
self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ])
except Exception: except Exception:
self.logger.exception("FATAL: unable to update the file catalog") self.logger.exception("FATAL: unable to update the file catalog.")
time.sleep(5) return False
sys.exit(1)
for flist in files: for flist in files:
for file in flist: for file in flist:
...@@ -212,17 +209,15 @@ class StorePreprocessor(TaskExecutor): ...@@ -212,17 +209,15 @@ class StorePreprocessor(TaskExecutor):
now = dt.now().isoformat() now = dt.now().isoformat()
self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ])
except Exception: except Exception:
self.logger.exception("FATAL: unable to update the file catalog") self.logger.exception("FATAL: unable to update the file catalog.")
time.sleep(5) return False
sys.exit(1)
self.logger.info("Overall data size calculation") self.logger.info("Overall data size calculation")
self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path)
self.logger.info("++++++++++ End of preprocessing phase ++++++++++") self.logger.info("++++++++++ End of preprocessing phase ++++++++++")
return True
except Exception: except Exception:
self.logger.exception("FATAL: something went wrong during the preprocessing phase") self.logger.exception("FATAL: something went wrong during the preprocessing phase.")
self.update("ERROR") return False
time.sleep(5)
sys.exit(1)
def update(self, status): def update(self, status):
try: try:
...@@ -234,7 +229,7 @@ class StorePreprocessor(TaskExecutor): ...@@ -234,7 +229,7 @@ class StorePreprocessor(TaskExecutor):
m.addRecipient(userEmail) m.addRecipient(userEmail)
if status == "OK": if status == "OK":
self.logger.info("Job phase updated to QUEUED") self.logger.info("Job phase updated to QUEUED.")
self.jobObj.setPhase("QUEUED") self.jobObj.setPhase("QUEUED")
self.dbConn.setPhase(self.jobId, "QUEUED") self.dbConn.setPhase(self.jobId, "QUEUED")
# Send e-mail notification # Send e-mail notification
...@@ -255,7 +250,7 @@ class StorePreprocessor(TaskExecutor): ...@@ -255,7 +250,7 @@ class StorePreprocessor(TaskExecutor):
m.send() m.send()
else: else:
# Send e-mail notification # Send e-mail notification
self.logger.info("Job phase updated to ERROR") self.logger.info("Job phase updated to ERROR.")
self.jobObj.setPhase("ERROR") self.jobObj.setPhase("ERROR")
self.jobObj.setErrorType("fatal") self.jobObj.setErrorType("fatal")
self.dbConn.insertJob(self.jobObj) self.dbConn.insertJob(self.jobObj)
...@@ -272,13 +267,13 @@ class StorePreprocessor(TaskExecutor): ...@@ -272,13 +267,13 @@ class StorePreprocessor(TaskExecutor):
Storage ID: {self.storageId} Storage ID: {self.storageId}
Owner ID: {self.jobObj.ownerId} Owner ID: {self.jobObj.ownerId}
The issue will be automatically reported to the administrator. This issue will be automatically reported to the administrator.
""" """
m.setMessage("VOSpace data storage notification: Job ERROR", msg) m.setMessage("VOSpace data storage notification: Job ERROR", msg)
m.send() m.send()
except Exception: except Exception:
self.logger.exception(f"Unable to update the job status for job {self.jobId}") self.logger.exception(f"FATAL: unable to update the job status for job {self.jobId}")
finally: finally:
self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() self.jobObj.jobInfo["nodeList"] = self.nodeList.copy()
self.nodeList.clear() self.nodeList.clear()
...@@ -289,7 +284,13 @@ class StorePreprocessor(TaskExecutor): ...@@ -289,7 +284,13 @@ class StorePreprocessor(TaskExecutor):
self.setDestinationQueueName("write_ready") self.setDestinationQueueName("write_ready")
while True: while True:
self.wait() self.wait()
if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: try:
srcQueueLen = self.srcQueue.len()
destQueueLen = self.destQueue.len()
except:
self.logger.exception("Cache error: failed to retrieve queue length.")
else:
if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
self.jobObj = self.srcQueue.getJob() self.jobObj = self.srcQueue.getJob()
self.jobId = self.jobObj.jobId self.jobId = self.jobObj.jobId
self.storageId = self.jobObj.jobInfo["storageId"] self.storageId = self.jobObj.jobInfo["storageId"]
...@@ -297,15 +298,19 @@ class StorePreprocessor(TaskExecutor): ...@@ -297,15 +298,19 @@ class StorePreprocessor(TaskExecutor):
self.userId = self.jobObj.ownerId self.userId = self.jobObj.ownerId
self.username = self.jobObj.jobInfo["userName"] self.username = self.jobObj.jobInfo["userName"]
self.prepare(self.username) self.prepare(self.username)
self.execute() if self.execute():
self.update("OK") self.update("OK")
else:
self.update("ERROR")
try: try:
self.destQueue.insertJob(self.jobObj) self.destQueue.insertJob(self.jobObj)
self.srcQueue.extractJob()
except Exception: except Exception:
self.logger.exception(f"Failed to move job {self.jobObj.jobId} from {self.srcQueue.name()} to {self.destQueue.name()}") self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
else: else:
self.srcQueue.extractJob() self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") finally:
self.setDestinationQueueName("write_ready")
# Test # Test
#sp = StorePreprocessor() #sp = StorePreprocessor()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment