diff --git a/transfer_service/exceptions.py b/transfer_service/exceptions.py index b4291e10506af855ea2d37f5da46dbecbb5155fe..309c18a9e1342efd68b4bd705a1635c7d8ff9db5 100644 --- a/transfer_service/exceptions.py +++ b/transfer_service/exceptions.py @@ -29,8 +29,8 @@ class MultipleUsersException(Error): # SystemUtils exceptions class IllegalCharacterException(Error): - def __init__(self, name): - self.message = "Illegal character found in: " + name + def __init__(self): + self.message = "Illegal characters found in file/dir names." super(IllegalCharacterException, self).__init__(self.message) diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index 31f3f88b084e5de8dcbcd7002a4a07a4f5b4d5bc..6deb10c47593756bf0dcc4b010eb61e7d1146458 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -13,6 +13,7 @@ import re from config import Config from checksum import Checksum from db_connector import DbConnector +from exceptions import IllegalCharacterException from mailer import Mailer from node import Node from redis_log_handler import RedisLogHandler @@ -64,6 +65,7 @@ class ImportExecutor(TaskExecutor): self.storageId = None self.storageType = None self.nodeList = [] + self.invalidFileAndDirNames = [] super(ImportExecutor, self).__init__() def execute(self): @@ -84,6 +86,22 @@ class ImportExecutor(TaskExecutor): self.tapeClient.recallChecksumFiles(self.path) self.tapeClient.disconnect() + self.logger.info(f"Checking for invalid file/dir names in '{self.path}'") + self.invalidFileAndDirNames = self.systemUtils.findInvalidFileAndDirNames(self.path) + if self.invalidFileAndDirNames: + self.logger.warning("Found invalid file/dir names") + reportFile = os.path.join(self.resDir, "vos_import_report-" + self.jobId) + try: + rfp = open(reportFile, "w") + except IOError: + self.logger.exception("Unable to generate the 'vos_import_report'.") + else: + rfp.write(tabulate(self.invalidFileAndDirNames, + headers = [ "Path list of invalid file/dir names" ], + tablefmt = "simple")) + rfp.close() + raise IllegalCharacterException + self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'") [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) @@ -264,6 +282,7 @@ class ImportExecutor(TaskExecutor): self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") finally: self.nodeList.clear() + self.invalidFileAndDirNames.clear() def run(self): diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index f4509cf95db337a5013a596bdc1524d0653eb22d..0bd9f275714a0e0ee79c693433388f395f65bb66 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -14,8 +14,11 @@ import shutil import sys import time +from tabulate import tabulate + from checksum import Checksum from config import Config +from exceptions import IllegalCharacterException from file_grouper import FileGrouper from db_connector import DbConnector from mailer import Mailer @@ -47,6 +50,7 @@ class StorePreprocessor(TaskExecutor): self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() redisLogHandler.setFormatter(logFormatter) + self.resDir = params["res_dir"] self.logger.addHandler(redisLogHandler) params = config.loadSection("file_catalog") self.dbConn = DbConnector(params["user"], @@ -64,6 +68,7 @@ class StorePreprocessor(TaskExecutor): self.username = None self.userId = None self.nodeList = [] + self.invalidFileAndDirNames = [] super(StorePreprocessor, self).__init__() def prepare(self, username): @@ -103,6 +108,24 @@ class StorePreprocessor(TaskExecutor): self.logger.info(f"First-level scan of '{self.path}'") [ dirs, files ] = self.systemUtils.scan(self.path) timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") + + # Third scan to check illegal characters in file/dir names (if any) + self.logger.info(f"Checking for invalid file/dir names in '{self.path}'") + self.invalidFileAndDirNames = self.systemUtils.findInvalidFileAndDirNames(self.path) + if self.invalidFileAndDirNames: + self.logger.warning("Found invalid file/dir names") + reportFile = os.path.join(self.resDir, "vos_data_report-" + self.jobId) + try: + rfp = open(reportFile, "w") + except IOError: + self.logger.exception("Unable to generate the 'vos_data_report'.") + else: + rfp.write(tabulate(self.invalidFileAndDirNames, + headers = [ "Path list of invalid file/dir names" ], + tablefmt = "simple")) + rfp.close() + self.cleanup() + raise IllegalCharacterException # Check if /home/user/store contains files or dirs if files or dirs: @@ -118,14 +141,10 @@ class StorePreprocessor(TaskExecutor): # /home/user/store is empty (this should be handled by data_rpc_server.py) else: self.logger.error("FATAL: the 'store' directory is empty.") - userInfo = self.systemUtils.userInfo(self.username) - uid = userInfo[1] - gid = userInfo[2] - os.chown(self.path, uid, gid) - os.chmod(self.path, 0o755) + self.cleanup() return False - # Third scan after directory structure 'check & repair' + # Fourth (and last) recursive scan self.logger.info(f"Recursive scan of '{self.path}'") [ dirs, files ] = self.systemUtils.scanRecursive(self.path) @@ -208,7 +227,7 @@ class StorePreprocessor(TaskExecutor): self.logger.exception("FATAL: unable to update the file catalog.") return False self.logger.info("Overall data size calculation") - self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) + self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) - os.stat(self.path).st_size except Exception: self.logger.exception("FATAL: something went wrong during the preprocessing phase.") return False @@ -278,6 +297,25 @@ class StorePreprocessor(TaskExecutor): finally: self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() self.nodeList.clear() + self.invalidFileAndDirNames.clear() + + def cleanup(self): + try: + self.logger.info(f"Restoring user permissions for '{self.path}'...") + userInfo = self.systemUtils.userInfo(self.username) + uid = userInfo[1] + gid = userInfo[2] + os.chown(self.path, uid, gid) + os.chmod(self.path, 0o755) + for root, dirs, files in os.walk(self.path): + for d in dirs: + os.chown(os.path.join(root, d), uid, gid) + os.chmod(os.path.join(root, d), 0o755) + for f in files: + os.chown(os.path.join(root, f), uid, gid) + os.chmod(os.path.join(root, f), 0o755) + except Exception: + self.logger.exception(f"Unable to restore user permissions for {self.path}.") def run(self): self.logger.info("Starting store preprocessor...")