From 833ca510e1bf2eece6ff3effd554f95152149645 Mon Sep 17 00:00:00 2001
From: Cristiano Urban
Date: Fri, 27 Aug 2021 13:06:34 +0200
Subject: [PATCH] Added basic logging and exception handling.

Signed-off-by: Cristiano Urban
---
 transfer_service/import_executor.py | 318 +++++++++++++++++-----------
 1 file changed, 194 insertions(+), 124 deletions(-)

diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py
index 2961aed..7691d09 100644
--- a/transfer_service/import_executor.py
+++ b/transfer_service/import_executor.py
@@ -57,108 +57,133 @@ class ImportExecutor(TaskExecutor):
         self.pathPrefix = None
         self.storageId = None
         self.storageType = None
+        self.nodeList = []
         super(ImportExecutor, self).__init__()
 
-    def importVOSpaceNodes(self):
+    def execute(self):
         """This method performs the VOSpace import operation."""
+        try:
+            self.logger.info("++++++++++ Start of import phase ++++++++++")
+            self.dbConn.setPhase(self.jobId, "EXECUTING")
+            self.dbConn.setStartTime(self.jobId)
+            self.logger.info("Job phase updated to EXECUTING.")
 
-        self.dbConn.setPhase(self.jobId, "EXECUTING")
-        self.dbConn.setStartTime(self.jobId)
-
-        start = dt.now()
-        nodeList = []
-        timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
-        nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)
-        nlfp = open(nodeListFile, "w")
-
-        if self.storageType == "cold":
-            self.tapeClient.connect()
-            self.tapeClient.recallChecksumFiles(self.path)
-            self.tapeClient.disconnect()
-
-        [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path))
-
-        tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
-        for dir in dirs:
-            if self.path in dir:
-                parentPath = os.path.dirname(dir).split(self.pathPrefix)[1]
-                nodeName = os.path.basename(dir)
-                cnode = Node(nodeName, "container")
-                if not tstampWrapperDirPattern.match("/" + nodeName):
-                    if tstampWrapperDirPattern.search(parentPath):
-                        tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
-                        parentPath = tstampWrapperDirPattern.sub("", parentPath)
-                        cnode.setWrapperDir(tstampWrapperDir)
-                    if parentPath == '/':
-                        vospacePath = parentPath + nodeName
-                    else:
+            if self.storageType == "cold":
+                self.tapeClient.connect()
+                self.tapeClient.recallChecksumFiles(self.path)
+                self.tapeClient.disconnect()
+
+            self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'")
+            [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path))
+
+            try:
+                locationId = self.dbConn.getLocationId(self.storageId)
+            except Exception:
+                self.logger.exception("FATAL: unable to obtain the location ID for the storage point.")
+                return False
+
+            tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
+            for dir in dirs:
+                if self.path in dir:
+                    parentPath = os.path.dirname(dir).split(self.pathPrefix)[1]
+                    nodeName = os.path.basename(dir)
+                    cnode = Node(nodeName, "container")
+                    if not tstampWrapperDirPattern.match("/" + nodeName):
+                        if tstampWrapperDirPattern.search(parentPath):
+                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
+                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
+                            cnode.setWrapperDir(tstampWrapperDir)
+                        if parentPath == '/':
+                            vospacePath = parentPath + nodeName
+                        else:
+                            vospacePath = parentPath + '/' + nodeName
+                        cnode.setParentPath(parentPath)
+                        cnode.setLocationId(locationId)
+                        cnode.setCreatorId(self.userId)
+                        cnode.setContentLength(0)
+                        cnode.setAsyncTrans(True)
+                        cnode.setSticky(True)
+
+                        try:
+                            if os.path.islink(dir):
+                                now = dt.now().isoformat()
+                                self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ])
+                            elif self.dbConn.insertNode(cnode):
+                                now = dt.now().isoformat()
+                                self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ])
+                            else:
+                                now = dt.now().isoformat()
+                                self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ])
+                        except Exception:
+                            self.logger.exception("FATAL: unable to update the file catalog.")
+                            return False
+
+            for flist in files:
+                for file in flist:
+                    if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
+                        parentPath = os.path.dirname(file).split(self.pathPrefix)[1]
+                        nodeName = os.path.basename(file)
+                        dnode = Node(nodeName, "data")
+                        if tstampWrapperDirPattern.search(parentPath):
+                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
+                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
+                            dnode.setWrapperDir(tstampWrapperDir)
                         vospacePath = parentPath + '/' + nodeName
-                cnode.setParentPath(parentPath)
-                locationId = self.dbConn.getLocationId(self.storageId)
-                cnode.setLocationId(locationId)
-                cnode.setCreatorId(self.userId)
-                cnode.setContentLength(0)
-                cnode.setAsyncTrans(True)
-                cnode.setSticky(True)
-
-                if os.path.islink(dir):
-                    now = dt.now().isoformat()
-                    nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ])
-                elif self.dbConn.insertNode(cnode):
-                    now = dt.now().isoformat()
-                    nodeList.append([ now, dir, vospacePath, "container", "DONE" ])
-                else:
-                    now = dt.now().isoformat()
-                    nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ])
-
-        for flist in files:
-            for file in flist:
-                if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
-                    parentPath = os.path.dirname(file).split(self.pathPrefix)[1]
-                    nodeName = os.path.basename(file)
-                    dnode = Node(nodeName, "data")
-                    if tstampWrapperDirPattern.search(parentPath):
-                        tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
-                        parentPath = tstampWrapperDirPattern.sub("", parentPath)
-                        dnode.setWrapperDir(tstampWrapperDir)
-                    vospacePath = parentPath + '/' + nodeName
-                    dnode.setParentPath(parentPath)
-                    self.storageId = self.dbConn.getStorageId(self.pathPrefix)
-                    locationId = self.dbConn.getLocationId(self.storageId)
-                    dnode.setLocationId(locationId)
-                    dnode.setCreatorId(self.userId)
-                    dnode.setContentLength(os.path.getsize(file))
-                    dnode.setContentMD5(self.md5calc.getMD5(file))
-                    dnode.setAsyncTrans(True)
-                    dnode.setSticky(True)
-
-                    if os.path.islink(file):
-                        now = dt.now().isoformat()
-                        nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ])
-                    elif self.dbConn.insertNode(dnode):
-                        now = dt.now().isoformat()
-                        nodeList.append([ now, file, vospacePath, "data", "DONE" ])
-                    else:
-                        now = dt.now().isoformat()
-                        nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ])
-
-        nlfp.write(tabulate(nodeList,
-                            headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
-                            tablefmt = "simple"))
-        nlfp.close()
-        end = dt.now()
-
-        # Update job status (to be moved)
-        results = [{"target": ""}]
-        self.dbConn.setResults(self.jobId, results)
-        self.jobObj.setPhase("COMPLETED")
-        self.dbConn.setPhase(self.jobId, "COMPLETED")
-        self.dbConn.setEndTime(self.jobId)
-
-        # Send e-mail notification
-        m = Mailer(self.logger)
-        m.addRecipient(self.adminEmail)
-        msg = f"""
+                        dnode.setParentPath(parentPath)
+                        dnode.setLocationId(locationId)
+                        dnode.setCreatorId(self.userId)
+                        dnode.setContentLength(os.path.getsize(file))
+                        dnode.setContentMD5(self.md5calc.getMD5(file))
+                        dnode.setAsyncTrans(True)
+                        dnode.setSticky(True)
+
+                        try:
+                            if os.path.islink(file):
+                                now = dt.now().isoformat()
+                                self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ])
+                            elif self.dbConn.insertNode(dnode):
+                                now = dt.now().isoformat()
+                                self.nodeList.append([ now, file, vospacePath, "data", "DONE" ])
+                            else:
+                                now = dt.now().isoformat()
+                                self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ])
+                        except Exception:
+                            self.logger.exception("FATAL: unable to update the file catalog.")
+                            return False
+        except Exception:
+            self.logger.exception("FATAL: something went wrong during the import phase.")
+            return False
+        else:
+            self.logger.info("++++++++++ End of import phase ++++++++++")
+            return True
+
+    def update(self, status):
+        try:
+            results = [{"target": ""}]
+            self.dbConn.setResults(self.jobId, results)
+
+            m = Mailer(self.logger)
+            m.addRecipient(self.adminEmail)
+
+            if status == "OK":
+                timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
+                nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)
+                try:
+                    nlfp = open(nodeListFile, "w")
+                except IOError:
+                    self.logger.exception("Unable to generate the 'vos_import_report'.")
+                else:
+                    nlfp.write(tabulate(self.nodeList,
+                                        headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
+                                        tablefmt = "simple"))
+                    nlfp.close()
+
+                self.jobObj.setPhase("COMPLETED")
+                self.dbConn.setPhase(self.jobId, "COMPLETED")
+                self.dbConn.setEndTime(self.jobId)
+                self.logger.info("Job phase updated to COMPLETED.")
+
+                msg = f"""
             ########## VOSpace import procedure summary ##########
 
             Job ID: {self.jobId}
@@ -166,27 +191,59 @@ class ImportExecutor(TaskExecutor):
             Storage type: {self.storageType}
             Storage ID: {self.storageId}
             Creator ID: {self.userId}
-            Start time: {start}
-            End time: {end}
-            Processed nodes: {len(nodeList)}
-            Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)}
-            Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in nodeList)}
-            Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in nodeList)}
+            Processed nodes: {len(self.nodeList)}
+            Imported nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)}
+            Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)}
+            Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)}
             """
-        if len(nodeList) <= 10 ** 5:
-            m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
-        else:
-            info = f"""
+                if len(self.nodeList) <= 10 ** 5:
+                    m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
+                else:
+                    info = f"""
             INFO:
             this operation involved a number of nodes
            greater than 10^5, you will find the results
             in {self.resDir}.
""" - msg += info - m.setMessage("VOSpace import notification", msg) - m.send() + msg += info + m.setMessage("VOSpace import notification", msg) + else: + self.jobObj.setPhase("ERROR") + self.jobObj.setErrorType("fatal") + self.jobObj.setErrorMessage("FATAL: something went wrong during the import phase.") + self.dbConn.insertJob(self.jobObj) + self.dbConn.setEndTime(self.jobId) + self.logger.info("Job phase updated to ERROR.") + + msg = f""" + ########## VOSpace import procedure summary ########## + + Job ID: {self.jobId} + Job type {self.jobObj.type} + Storage type: {self.storageType} + Storage ID: {self.storageId} + Creator ID: {self.userId} + """ + + info = f""" + ERROR: + the job was terminated due to an error that occurred + while importing the VOSpace nodes in the file catalog. + + This issue will be automatically reported to the administrator. + + """ + msg += info + m.setMessage("VOSpace data storage notification: Job ERROR", msg) + # Send e-mail notification + m.send() + except Exception: + self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") + finally: + self.nodeList.clear() + def run(self): self.logger.info("Starting import executor...") @@ -194,17 +251,30 @@ class ImportExecutor(TaskExecutor): self.setDestinationQueueName("import_terminated") while True: self.wait() - if self.srcQueue.len() > 0: - self.jobObj = self.srcQueue.getJob() - self.jobId = self.jobObj.jobId - self.userId = self.jobObj.ownerId - self.path = self.jobObj.jobInfo["path"] - self.pathPrefix = self.jobObj.jobInfo["pathPrefix"] - self.storageId = self.jobObj.jobInfo["storageId"] - self.storageType = self.jobObj.jobInfo["storageType"] - self.importVOSpaceNodes() - if self.destQueue.len() == self.maxTerminatedJobs: - self.destQueue.extractJob() - self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + try: + srcQueueLen = self.srcQueue.len() + destQueueLen = self.destQueue.len() + except Exception: + self.logger.exception("Cache error: failed to retrieve queue length.") + else: + if srcQueueLen > 0: + self.jobObj = self.srcQueue.getJob() + self.jobId = self.jobObj.jobId + self.userId = self.jobObj.ownerId + self.path = self.jobObj.jobInfo["path"] + self.pathPrefix = self.jobObj.jobInfo["pathPrefix"] + self.storageId = self.jobObj.jobInfo["storageId"] + self.storageType = self.jobObj.jobInfo["storageType"] + if self.execute(): + self.update("OK") + else: + self.update("ERROR") + try: + if destQueueLen == self.maxTerminatedJobs: + self.destQueue.extractJob() + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() + except Exception: + self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + else: + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") -- GitLab