From 2e7c0a35ea0937421c0f264042a9a3a90cfa3fb9 Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Fri, 3 Sep 2021 15:54:57 +0200 Subject: [PATCH] Added basic logging and exception handling. Signed-off-by: Cristiano Urban --- transfer_service/retrieve_executor.py | 468 ++++++++++++++++---------- 1 file changed, 284 insertions(+), 184 deletions(-) diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 9a03b6f..1eaea4a 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -87,108 +87,142 @@ class RetrieveExecutor(TaskExecutor): def buildFileList(self): """ Generates the list of all files to retrieve. + This method returns 'True' on success, 'False' on failure. """ - self.dbConn.setPhase(self.jobId, "EXECUTING") - self.dbConn.setStartTime(self.jobId) + try: + try: + self.dbConn.setPhase(self.jobId, "EXECUTING") + self.dbConn.setStartTime(self.jobId) + except Exception: + self.logger.exception("FATAL: unable to update the file catalog.") + return False + else: + self.logger.info("Job phase updated to EXECUTING.") + self.logger.info("Building the list of the files to be retrieved...") - # debug block... - if os.path.exists("nodeList.txt"): - os.remove("nodeList.txt") - nl = open("nodeList.txt", 'w') - for vospacePath in self.nodeList: - nl.write(vospacePath + '\n') - nl.close() + # debug block... + #if os.path.exists("nodeList.txt"): + # os.remove("nodeList.txt") + #nl = open("nodeList.txt", 'w') + #for vospacePath in self.nodeList: + # nl.write(vospacePath + '\n') + #nl.close() - # Obtain the storage type - self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] - for vospacePath in self.nodeList: - nodeInfo = self.dbConn.getOSPath(vospacePath) - baseSrcPath = nodeInfo["baseSrcPath"] - srcPath = nodeInfo["fullPath"] - username = nodeInfo["username"] - md5calc = Checksum() - if os.path.isdir(srcPath): - for root, dirs, files in os.walk(srcPath, topdown = False): - for f in files: - fullPath = os.path.join(root, f) - if md5calc.fileIsValid(fullPath): - fileSize = os.stat(fullPath).st_size - fileInfo = { - "baseSrcPath": baseSrcPath, - "fullPath": fullPath, - "username": username, - "fileSize": fileSize, - "vospaceRootParent": vospacePath - } - self.fileList.append(fileInfo.copy()) - else: - if md5calc.fileIsValid(srcPath): - fileSize = nodeInfo["contentLength"] - fileInfo = { - "baseSrcPath": baseSrcPath, - "fullPath": srcPath, - "username": username, - "fileSize": fileSize, - "vospaceRootParent": vospacePath - } - self.fileList.append(fileInfo.copy()) + # Obtain the storage type + try: + self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] + except Exception: + self.logger.exception("FATAL: unable to obtain the storage type.") + return False + for vospacePath in self.nodeList: + try: + nodeInfo = self.dbConn.getOSPath(vospacePath) + except Exception: + self.logger.exception(f"FATAL: unable to obtain the OS path for the VOSpace path '{vospacePath}'.") + return False + baseSrcPath = nodeInfo["baseSrcPath"] + srcPath = nodeInfo["fullPath"] + username = nodeInfo["username"] + md5calc = Checksum() + if os.path.isdir(srcPath): + for root, dirs, files in os.walk(srcPath, topdown = False): + for f in files: + fullPath = os.path.join(root, f) + if md5calc.fileIsValid(fullPath): + fileSize = os.stat(fullPath).st_size + fileInfo = { + "baseSrcPath": baseSrcPath, + "fullPath": fullPath, + "username": username, + "fileSize": fileSize, + "vospaceRootParent": vospacePath + } + self.fileList.append(fileInfo.copy()) + else: + if md5calc.fileIsValid(srcPath): + fileSize = nodeInfo["contentLength"] + fileInfo = { + "baseSrcPath": baseSrcPath, + "fullPath": srcPath, + "username": username, + "fileSize": fileSize, + "vospaceRootParent": vospacePath + } + self.fileList.append(fileInfo.copy()) - # debug block... - if os.path.exists("fileList.txt"): - os.remove("fileList.txt") - fl = open("fileList.txt", 'w') - fl.write(json.dumps(self.fileList, indent = 4)) - fl.close() + # debug block... + #if os.path.exists("fileList.txt"): + # os.remove("fileList.txt") + #fl = open("fileList.txt", 'w') + #fl.write(json.dumps(self.fileList, indent = 4)) + #fl.close() + except Exception: + self.logger.exception("FATAL: something went wrong while building the list of the files to be retrieved.") + return False + else: + return True def buildBlocks(self): """ Algorithm to split data in blocks of a well known size. + This method returns 'True' on success, 'False' on failure. """ - if self.fileList: - blockIdx = 0 - blockSize = 0 - for fileInfo in self.fileList: - fileSize = fileInfo["fileSize"] - self.totalSize += fileSize - # check if the file is larger than a block size - if fileSize > self.maxBlockSize: - # if the current block is not empty, "close" it, otherwise - # use it and then create a new block - if blockSize > 0: - blockIdx += 1 - fileInfo["blockIdx"] = blockIdx - blockIdx += 1 - else: - fileInfo["blockIdx"] = blockIdx - blockIdx += 1 + try: + self.logger.info("Building the blocks data structure... ") + if self.fileList: + blockIdx = 0 blockSize = 0 - else: - # the file can be contained by a block, so check if - # the file size plus the current block fill is lower - # than the maximum block size - if blockSize + fileSize <= self.maxBlockSize: - # if so, add the file to the block and go ahead with - # the next one - fileInfo["blockIdx"] = blockIdx - blockSize += fileSize - else: - # if not, "close" the current block, add it to the block list, - # then create a new block, add the file to it and go ahead - # with the next one - blockIdx += 1 - fileInfo["blockIdx"] = blockIdx - blockSize = fileSize - if self.fileList: - self.numBlocks = blockIdx + 1 - self.dbConn.setTotalBlocks(self.jobId, self.numBlocks) + for fileInfo in self.fileList: + fileSize = fileInfo["fileSize"] + self.totalSize += fileSize + # check if the file is larger than a block size + if fileSize > self.maxBlockSize: + # if the current block is not empty, "close" it, otherwise + # use it and then create a new block + if blockSize > 0: + blockIdx += 1 + fileInfo["blockIdx"] = blockIdx + blockIdx += 1 + else: + fileInfo["blockIdx"] = blockIdx + blockIdx += 1 + blockSize = 0 + else: + # the file can be contained by a block, so check if + # the file size plus the current block fill is lower + # than the maximum block size + if blockSize + fileSize <= self.maxBlockSize: + # if so, add the file to the block and go ahead with + # the next one + fileInfo["blockIdx"] = blockIdx + blockSize += fileSize + else: + # if not, "close" the current block, add it to the block list, + # then create a new block, add the file to it and go ahead + # with the next one + blockIdx += 1 + fileInfo["blockIdx"] = blockIdx + blockSize = fileSize + if self.fileList: + self.numBlocks = blockIdx + 1 + try: + self.dbConn.setTotalBlocks(self.jobId, self.numBlocks) + except Exception: + self.logger.exception("FATAL: unable to set the total number of blocks in the database.") + return False - # debug block... - print(f"numBlocks = {self.numBlocks}") - if os.path.exists("blocks.txt"): - os.remove("blocks.txt") - fl = open("blocks.txt", 'w') - fl.write(json.dumps(self.fileList, indent = 4)) - fl.close() + # debug block... + #print(f"numBlocks = {self.numBlocks}") + #if os.path.exists("blocks.txt"): + # os.remove("blocks.txt") + #fl = open("blocks.txt", 'w') + #fl.write(json.dumps(self.fileList, indent = 4)) + #fl.close() + except Exception: + self.logger.exception("FATAL: something went wrong while building the blocks data structure.") + return False + else: + return True def retrieveCompleted(self, vospacePath): """ @@ -201,88 +235,116 @@ class RetrieveExecutor(TaskExecutor): """ Retrieves data from a generic storage point (hot or cold). """ - - # Loop on blocks - for blockIdx in range(self.numBlocks): - blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] + try: + self.logger.info("Starting data retrieval...") + # Loop on blocks + for blockIdx in range(self.numBlocks): + blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] - # Recall all files from tape library to tape frontend - # if the storage type is 'cold' - if self.storageType == "cold": - self.tapeClient.connect() - self.tapeClient.recall([ f["fullPath"] for f in blockFileList ]) - self.tapeClient.disconnect() + # Recall all files from tape library to tape frontend + # if the storage type is 'cold' + if self.storageType == "cold": + self.tapeClient.connect() + self.tapeClient.recall([ f["fullPath"] for f in blockFileList ]) + self.tapeClient.disconnect() - # Loop on files in a block - for fileInfo in blockFileList: - srcPath = fileInfo["fullPath"] - username = fileInfo["username"] - baseSrcPath = fileInfo["baseSrcPath"] - osRelParentPath = os.path.dirname(srcPath) - osRelParentPath = osRelParentPath.replace(baseSrcPath, "") - if osRelParentPath != "/": - osRelParentPath += "/" - destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath - os.makedirs(destDirPath, exist_ok = True) - sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True) - if(sp.returncode or sp.stderr): - return False + # Loop on files in a block + for fileInfo in blockFileList: + srcPath = fileInfo["fullPath"] + username = fileInfo["username"] + baseSrcPath = fileInfo["baseSrcPath"] + osRelParentPath = os.path.dirname(srcPath) + osRelParentPath = osRelParentPath.replace(baseSrcPath, "") + if osRelParentPath != "/": + osRelParentPath += "/" + destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath + os.makedirs(destDirPath, exist_ok = True) + sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True) + if(sp.returncode or sp.stderr): + self.logger.error(f"FATAL: error during the copy process, returnCode = {sp.returncode}, stderr: {sp.stderr}") + return False - # Remove files from file list at the end of the copy - for fileInfo in blockFileList: - if fileInfo in self.fileList: - self.fileList.remove(fileInfo) + # Remove files from file list at the end of the copy + for fileInfo in blockFileList: + if fileInfo in self.fileList: + self.fileList.remove(fileInfo) - # Check if the copy related to a certain VOSpace node - # is completed and recursively update the 'async_trans' - # flag - for vospacePath in self.nodeList: - if self.retrieveCompleted(vospacePath): - self.dbConn.setAsyncTrans(vospacePath, False) + # Check if the copy related to a certain VOSpace node + # is completed and recursively update the 'async_trans' + # flag + for vospacePath in self.nodeList: + if self.retrieveCompleted(vospacePath): + try: + self.dbConn.setAsyncTrans(vospacePath, False) + except Exception: + self.logger.exception("FATAL: unable to update the file catalog.") + return False - # Empty the tape library frontend if the storage type - # is 'cold' - if self.storageType == "cold": - self.tapeClient.connect() - self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ], self.tapePool) - self.tapeClient.disconnect() + # Empty the tape library frontend if the storage type + # is 'cold' + if self.storageType == "cold": + self.tapeClient.connect() + self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ], self.tapePool) + self.tapeClient.disconnect() - blockFileList.clear() - self.procBlocks += 1 - self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) - return True + blockFileList.clear() + self.procBlocks += 1 + self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) + except Exception: + self.logger.exception("FATAL: something went wrong while retrieving the data.") + return False + else: + return True + + def execute(self): + success = True + self.logger.info("++++++++++ Start of execution phase ++++++++++") + success &= self.buildFileList() & self.buildBlocks() & self.retrieveData() + if success: + self.logger.info("++++++++++ End of execution phase ++++++++++") + return True + else: + self.logger.info("FATAL: something went wrong during the execution phase.") + return False - def update(self): + def update(self, status): """ Updates the job status and sends an email to the user. """ - results = [{"target": ""}] - results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] - #self.dbConn.setResults(self.jobId, results) - self.jobObj.setResults(results) - self.jobObj.setPhase("COMPLETED") - self.dbConn.insertJob(self.jobObj) - self.dbConn.setEndTime(self.jobId) - self.jobObj.endTime = datetime.datetime.now().isoformat() + try: + results = [{"target": ""}] + results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] + + m = Mailer(self.logger) + m.addRecipient(self.adminEmail) + userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) + if userEmail != self.adminEmail: + m.addRecipient(userEmail) + + self.jobObj.setResults(results) + + if status == ("OK"): + self.jobObj.setPhase("COMPLETED") + self.dbConn.insertJob(self.jobObj) + self.dbConn.setEndTime(self.jobId) + self.jobObj.endTime = datetime.datetime.now().isoformat() + self.logger.info("Job phase updated to COMPLETED.") - # Add a list of physical destination paths for each VOSpace node in the node list - for vospacePath in self.nodeList: - nodeInfo = self.dbConn.getOSPath(vospacePath) - baseSrcPath = nodeInfo["baseSrcPath"] - username = nodeInfo["username"] - srcPath = nodeInfo["fullPath"] - baseDestPath = self.storageRetrievePath.replace("{username}", username) - destPath = srcPath.replace(baseSrcPath, baseDestPath) - self.destPathList.append(destPath) - self.jobObj.jobInfo["destPathList"] = self.destPathList.copy() - - m = Mailer(self.logger) - m.addRecipient(self.adminEmail) - userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) - if userEmail != self.adminEmail: - m.addRecipient(userEmail) + # Add a list of physical destination paths for each VOSpace node in the node list + self.logger.info("Generating physical destination paths for VOSpace nodes...") + for vospacePath in self.nodeList: + nodeInfo = self.dbConn.getOSPath(vospacePath) + baseSrcPath = nodeInfo["baseSrcPath"] + username = nodeInfo["username"] + srcPath = nodeInfo["fullPath"] + baseDestPath = self.storageRetrievePath.replace("{username}", username) + destPath = srcPath.replace(baseSrcPath, baseDestPath) + self.destPathList.append(destPath) + self.jobObj.jobInfo["destPathList"] = self.destPathList.copy() - msg = f""" + msg = f""" + ########## VOSpace data retrieval procedure summary ########## + Dear user, your job has been COMPLETED. @@ -293,15 +355,46 @@ class RetrieveExecutor(TaskExecutor): Your files are available and can be downloaded. """ + m.setMessage("VOSpace data retrieve notification: COMPLETED", msg) + else: + self.jobObj.setPhase("ERROR") + self.jobObj.setErrorType("fatal") + self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.") + self.dbConn.insertJob(self.jobObj) + self.dbConn.setEndTime(self.jobId) + self.jobObj.endTime = datetime.datetime.now().isoformat() + self.logger.info("Job phase updated to ERROR.") + + msg = f""" + ########## VOSpace data retrieval procedure summary ########## + + Dear user, + your job has FAILED. + + Job ID: {self.jobId} + Job type: {self.jobObj.type} + Owner ID: {self.jobObj.ownerId} + """ + info = f""" + ERROR: + the job was terminated due to an error that occurred + while retrieveing the data from the storage point. + + This issue will be automatically reported to the administrator. - # Send e-mail notification - m.setMessage("VOSpace data retrieve notification: Job COMPLETED", msg) - m.send() + """ + msg += info + m.setMessage("VOSpace data retrieve notification: ERROR", msg) + # Send e-mail notification + m.send() + except Exception: + self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") def cleanup(self): """ Cleanup method. """ + self.logger.info("Cleanup...") self.fileList.clear() self.nodeList.clear() self.destPathList.clear() @@ -316,24 +409,31 @@ class RetrieveExecutor(TaskExecutor): self.setDestinationQueueName("read_terminated") while True: self.wait() - if self.srcQueue.len() > 0: - self.jobObj = self.srcQueue.getJob() - self.jobId = self.jobObj.jobId - self.nodeList = self.jobObj.nodeList.copy() - self.buildFileList() - self.buildBlocks() - result = self.retrieveData() - if result: - self.update() - self.cleanup() + try: + srcQueueLen = self.srcQueue.len() + destQueueLen = self.destQueue.len() + except Exception: + self.logger.exception("Cache error: failed to retrieve queue length.") + else: + if srcQueueLen > 0: + self.jobObj = self.srcQueue.getJob() + self.jobId = self.jobObj.jobId + self.nodeList = self.jobObj.nodeList.copy() + if self.execute(): + self.update("OK") - # debug block... - print(f"fileList = {self.fileList}") - print(f"nodeList = {self.nodeList}") - else: - sys.exit("Failed to retrieve data!") - if self.destQueue.len() == self.maxTerminatedJobs: - self.destQueue.extractJob() - self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + # debug block... + #print(f"fileList = {self.fileList}") + #print(f"nodeList = {self.nodeList}") + else: + self.update("ERROR") + self.cleanup() + try: + if destQueueLen >= self.maxTerminatedJobs: + self.destQueue.extractJob() + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() + except Exception: + self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + else: + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") -- GitLab