From 46e09fb14b08bc4932c2ecb40ea8cbb03615e666 Mon Sep 17 00:00:00 2001
From: Cristiano Urban
Date: Tue, 24 Aug 2021 16:12:58 +0200
Subject: [PATCH] Minor changes.

Signed-off-by: Cristiano Urban
---
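Note: the recurring hunks below comment out each service's StreamHandler so
that every component logs solely through RedisLogHandler, whose bare
`except:` is narrowed to Redis connection failures (anything else now
propagates). The sketch below is an illustrative reading of that pattern,
not code from this repository: the QueueLogHandler name, queue name, and
connection defaults are assumptions, and it subclasses the standard
logging.Handler instead of the project's plain duck-typed handler class.

    import logging

    import redis
    from redis.exceptions import ConnectionError


    class QueueLogHandler(logging.Handler):
        """Push formatted records onto a Redis list; a separate listener
        process (cf. log_listener.py) drains the list into the log file."""

        def __init__(self, queue="vos_ts_log", host="localhost", port=6379, db=0):
            super().__init__()
            self.queue = queue
            self.redisCli = redis.Redis(host=host, port=port, db=db)

        def emit(self, record):
            try:
                # One LPUSH per record; a consumer using RPOP then sees
                # records in arrival order (Redis lists are FIFO this way).
                self.redisCli.lpush(self.queue, self.format(record))
            except ConnectionError:
                # Redis is not responding: drop the record, as the patched
                # RedisLogHandler.handle() does.
                pass
            # Any other exception propagates, mirroring the new
            # `except Exception: raise` branch.


    logger = logging.getLogger("vos_ts")
    logger.setLevel(logging.INFO)
    logger.addHandler(QueueLogHandler())
    logger.info("routed through Redis; no StreamHandler attached")

With this wiring an unreachable Redis costs log records but never takes a
worker down, while genuine programming errors still surface in the caller.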
 transfer_service/abort_job_rpc_server.py  |  6 ++---
 transfer_service/data_rpc_server.py       |  6 ++---
 transfer_service/file_grouper.py          | 20 +++++++++-------
 transfer_service/group_rw_executor.py     |  6 ++---
 transfer_service/group_rw_rpc_server.py   |  6 ++---
 transfer_service/import_executor.py       | 29 +++++------------------
 transfer_service/import_rpc_server.py     |  6 ++---
 transfer_service/job_rpc_server.py        |  6 ++---
 transfer_service/log_listener.py          |  2 +-
 transfer_service/redis_log_handler.py     |  6 ++++-
 transfer_service/retrieve_cleaner.py      |  6 ++---
 transfer_service/retrieve_executor.py     |  9 +++----
 transfer_service/retrieve_preprocessor.py |  8 +++----
 transfer_service/storage_rpc_server.py    |  6 ++---
 transfer_service/store_preprocessor.py    | 13 +++++-----
 transfer_service/transfer_service.py      |  8 ++++++-
 16 files changed, 71 insertions(+), 72 deletions(-)

diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py
index f231c83..236371c 100644
--- a/transfer_service/abort_job_rpc_server.py
+++ b/transfer_service/abort_job_rpc_server.py
@@ -32,11 +32,11 @@ class AbortJobRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue)
 
     def callback(self, requestBody):
diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py
index 3188160..7c6d8c1 100644
--- a/transfer_service/data_rpc_server.py
+++ b/transfer_service/data_rpc_server.py
@@ -39,11 +39,11 @@ class DataRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.pendingQueueWrite = JobQueue("write_pending")
         self.systemUtils = SystemUtils()
         super(DataRPCServer, self).__init__(host, port, db, rpcQueue)
diff --git a/transfer_service/file_grouper.py b/transfer_service/file_grouper.py
index 7f2b57f..6bb5402 100644
--- a/transfer_service/file_grouper.py
+++ b/transfer_service/file_grouper.py
@@ -61,15 +61,19 @@ class FileGrouper(object):
         cwd = os.getcwd()
         parent = os.path.dirname(folder)
         os.chdir(parent)
-        sp = subprocess.run(["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)], capture_output = True)
-        if(sp.returncode or sp.stderr):
-            raise(TarFileCreationException(folder))
+        try:
+            sp = subprocess.run(["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)], capture_output = True)
+        except Exception:
+            raise
         else:
-            try:
-                shutil.rmtree(folder)
-            except Exception:
-                raise
-        os.chdir(cwd)
+            if(sp.returncode or sp.stderr):
+                raise(TarFileCreationException(folder))
+            else:
+                try:
+                    shutil.rmtree(folder)
+                except Exception:
+                    raise
+            os.chdir(cwd)
 
 
 # Test (creates a .tar of all the leaves containing at least 2 files and having size lower than 100G,
diff --git a/transfer_service/group_rw_executor.py b/transfer_service/group_rw_executor.py
index d41603a..c21a9a4 100644
--- a/transfer_service/group_rw_executor.py
+++ b/transfer_service/group_rw_executor.py
@@ -36,11 +36,11 @@ class GroupRwExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.resDir = params["res_dir"]
         self.systemUtils = SystemUtils()
         self.jobObj = None
diff --git a/transfer_service/group_rw_rpc_server.py b/transfer_service/group_rw_rpc_server.py
index 1b742b4..adc00ac 100644
--- a/transfer_service/group_rw_rpc_server.py
+++ b/transfer_service/group_rw_rpc_server.py
@@ -32,11 +32,11 @@ class GroupRwRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.groupRwReadyQueue = JobQueue("group_rw_ready")
         super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue)
 
diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py
index 31d0382..324d6e9 100644
--- a/transfer_service/import_executor.py
+++ b/transfer_service/import_executor.py
@@ -40,11 +40,11 @@ class ImportExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.resDir = params["res_dir"]
         params = config.loadSection("spectrum_archive")
         self.tapeClient = TapeClient(params["host"],
@@ -72,10 +72,8 @@ class ImportExecutor(TaskExecutor):
         nodeList = []
         timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
         nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)
-        #nodeListFile = self.resDir + '/' + "vos_import_report-" + timestamp
         nlfp = open(nodeListFile, "w")
-        #out = open("import_amqp_server_log.txt", "a")
 
         if self.storageType == "cold":
             self.tapeClient.connect()
             self.tapeClient.recallChecksumFiles(self.path)
@@ -85,26 +83,19 @@ class ImportExecutor(TaskExecutor):
         tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
 
         for dir in dirs:
-            #out.write(f"DIR dir: {dir}\n")
-            #out.write(f"DIR pathPrefix: {self.pathPrefix}\n\n")
-
             if self.path in dir:
                 parentPath = os.path.dirname(dir).split(self.pathPrefix)[1]
                 nodeName = os.path.basename(dir)
-
                 cnode = Node(nodeName, "container")
-
                 if not tstampWrapperDirPattern.match("/" + nodeName):
                     if tstampWrapperDirPattern.search(parentPath):
                         tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                         parentPath = tstampWrapperDirPattern.sub("", parentPath)
                         cnode.setWrapperDir(tstampWrapperDir)
-
                     if parentPath == '/':
                         vospacePath = parentPath + nodeName
                     else:
                         vospacePath = parentPath + '/' + nodeName
-
                     cnode.setParentPath(parentPath)
                     locationId = self.dbConn.getLocationId(self.storageId)
                     cnode.setLocationId(locationId)
@@ -126,23 +117,14 @@ class ImportExecutor(TaskExecutor):
         for flist in files:
             for file in flist:
                 if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
-                    #out.write(f"FILE files: {files}\n")
-                    #out.write(f"FILE flist: {flist}\n")
-                    #out.write(f"FILE file: {file}\n")
-                    #out.write(f"FILE pathPrefix: {self.pathPrefix}\n")
                     parentPath = os.path.dirname(file).split(self.pathPrefix)[1]
-                    #out.write(f"FILE parentPath: {parentPath}\n")
                     nodeName = os.path.basename(file)
-                    #out.write(f"FILE nodeName: {nodeName}\n")
                     dnode = Node(nodeName, "data")
-
                     if tstampWrapperDirPattern.search(parentPath):
                         tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                         parentPath = tstampWrapperDirPattern.sub("", parentPath)
                         dnode.setWrapperDir(tstampWrapperDir)
-
                     vospacePath = parentPath + '/' + nodeName
-                    #out.write(f"FILE vospacePath: {vospacePath}\n")
                     dnode.setParentPath(parentPath)
                     self.storageId = self.dbConn.getStorageId(self.pathPrefix)
                     locationId = self.dbConn.getLocationId(self.storageId)
@@ -162,7 +144,6 @@ class ImportExecutor(TaskExecutor):
                 else:
                     now = dt.now().isoformat()
                     nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ])
 
-        #out.close()
         nlfp.write(tabulate(nodeList,
                             headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
@@ -181,8 +162,10 @@ class ImportExecutor(TaskExecutor):
         m = Mailer(self.logger)
         m.addRecipient(self.adminEmail)
         msg = f"""
-        [VOSpace import procedure summary]
+        ########## VOSpace import procedure summary ##########
 
+        Job ID: {self.jobId}
+        Job type: {self.jobObj.type}
         Storage type: {self.storageType}
         Storage ID: {self.storageId}
         Creator ID: {self.userId}
diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py
index f38072d..c08cc8f 100644
--- a/transfer_service/import_rpc_server.py
+++ b/transfer_service/import_rpc_server.py
@@ -35,11 +35,11 @@ class ImportRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.importReadyQueue = JobQueue("import_ready")
         self.systemUtils = SystemUtils()
         super(ImportRPCServer, self).__init__(host, port, db, rpcQueue)
diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py
index 7753107..3c8ae81 100644
--- a/transfer_service/job_rpc_server.py
+++ b/transfer_service/job_rpc_server.py
@@ -29,11 +29,11 @@ class JobRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(JobRPCServer, self).__init__(host, port, db, rpcQueue)
 
     def callback(self, requestBody):
diff --git a/transfer_service/log_listener.py b/transfer_service/log_listener.py
index 2b2b8a1..f933e04 100644
--- a/transfer_service/log_listener.py
+++ b/transfer_service/log_listener.py
@@ -29,7 +29,7 @@ class LogListener(Process):
         if os.path.exists(self.logFilePath):
             os.remove(self.logFilePath)
         while True:
-            time.sleep(1)
+            time.sleep(0.2)
             try:
                 lfp = open(self.logFilePath, 'a')
             except IOError:
diff --git a/transfer_service/redis_log_handler.py b/transfer_service/redis_log_handler.py
index fe6a94d..ed41092 100644
--- a/transfer_service/redis_log_handler.py
+++ b/transfer_service/redis_log_handler.py
@@ -5,6 +5,8 @@ import logging
 
 from config import Config
 
+from redis.exceptions import ConnectionError
+
 class RedisLogHandler(object):
 
     """
@@ -25,9 +27,11 @@ class RedisLogHandler(object):
     def handle(self, record):
         try:
             self.redisCli.lpush(self.logQueue, self.formatter.format(record))
-        except:
+        except ConnectionError:
             # Redis is not responding...
             pass
+        except Exception:
+            raise
 
     def setFormatter(self, formatter):
         self.formatter = formatter
diff --git a/transfer_service/retrieve_cleaner.py b/transfer_service/retrieve_cleaner.py
index 717187b..3344167 100644
--- a/transfer_service/retrieve_cleaner.py
+++ b/transfer_service/retrieve_cleaner.py
@@ -37,11 +37,11 @@ class RetrieveCleaner(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.jobObj = None
         self.username = None
         self.nodeList = []
diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py
index 2aafc4a..6e3a6a7 100644
--- a/transfer_service/retrieve_executor.py
+++ b/transfer_service/retrieve_executor.py
@@ -64,11 +64,11 @@ class RetrieveExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         params = config.loadSection("spectrum_archive")
         self.tapePool = params["tape_pool"]
         self.tapeClient = TapeClient(params["host"],
@@ -290,7 +290,8 @@ class RetrieveExecutor(TaskExecutor):
         Dear user,
         your job has been COMPLETED.
 
-        Job ID: {self.jobObj.jobId}
+        Job ID: {self.jobId}
+        Job type: {self.jobObj.type}
         Owner ID: {self.jobObj.ownerId}
 
         Your files are available and can be downloaded.
diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py
index 2630d14..97d2099 100644
--- a/transfer_service/retrieve_preprocessor.py
+++ b/transfer_service/retrieve_preprocessor.py
@@ -33,11 +33,11 @@ class RetrievePreprocessor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.jobObj = None
         self.nodeList = []
         super(RetrievePreprocessor, self).__init__()
@@ -87,7 +87,7 @@ class RetrievePreprocessor(TaskExecutor):
             try:
                 srcQueueLen = self.srcQueue.len()
                 destQueueLen = self.destQueue.len()
-            except:
+            except Exception:
                 self.logger.exception("Cache error: unable to retrieve queue length.")
             else:
                 if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py
index 9fc06f4..fbce4f9 100644
--- a/transfer_service/storage_rpc_server.py
+++ b/transfer_service/storage_rpc_server.py
@@ -29,11 +29,11 @@ class StorageRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(StorageRPCServer, self).__init__(host, port, db, rpcQueue)
 
     def callback(self, requestBody):
diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py
index 72880e7..6d554d0 100644
--- a/transfer_service/store_preprocessor.py
+++ b/transfer_service/store_preprocessor.py
@@ -50,11 +50,11 @@ class StorePreprocessor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.storageId = None
         self.storageType = None
         self.jobObj = None
@@ -213,11 +213,12 @@ class StorePreprocessor(TaskExecutor):
                 return False
             self.logger.info("Overall data size calculation")
             self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path)
-            self.logger.info("++++++++++ End of preprocessing phase ++++++++++")
-            return True
         except Exception:
             self.logger.exception("FATAL: something went wrong during the preprocessing phase.")
             return False
+        else:
+            self.logger.info("++++++++++ End of preprocessing phase ++++++++++")
+            return True
 
     def update(self, status):
         try:
@@ -287,7 +288,7 @@ class StorePreprocessor(TaskExecutor):
             try:
                 srcQueueLen = self.srcQueue.len()
                 destQueueLen = self.destQueue.len()
-            except:
+            except Exception:
                 self.logger.exception("Cache error: failed to retrieve queue length.")
             else:
                 if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py
index 359027f..192a194 100644
--- a/transfer_service/transfer_service.py
+++ b/transfer_service/transfer_service.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 
 import logging
+import os
+import sys
 
 from config import Config
 from cli_handler import CliHandler
@@ -15,7 +17,7 @@ class TransferService(object):
     def __init__(self):
         config = Config("/etc/vos_ts/vos_ts.conf")
         params = config.loadSection("logging")
-        self.logger = logging.getLogger(__name__)
+        self.logger = logging.getLogger("vos_ts")
         logLevel = "logging." + params["log_level"]
         logFormat = params["log_format"]
         logFormatter = logging.Formatter(logFormat)
@@ -63,6 +65,7 @@ class TransferService(object):
         self.logListener = LogListener()
 
     def start(self):
+        #if "SUDO_UID" in os.environ.keys():
         # Startup
         self.logListener.start()
         self.jobScheduler.start()
@@ -71,6 +74,9 @@ class TransferService(object):
         self.logger.info("##########################################################")
         self.logger.info("########## VOSpace Transfer Service is RUNNING! ##########")
         self.logger.info("##########################################################")
+        #else:
+        #    print("The VOSpace Transfer Service requires super user privileges.")
+        #    sys.exit(1)
 
 ts = TransferService()
 ts.start()
-- 
GitLab