diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py
index f231c83da831b4eab4d18a1bddf797453edf82cf..236371cac0ad9a0acc1c63c8c5aa3dffbea3f884 100644
--- a/transfer_service/abort_job_rpc_server.py
+++ b/transfer_service/abort_job_rpc_server.py
@@ -32,11 +32,11 @@ class AbortJobRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue)
 
     def callback(self, requestBody):
diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py
index 31881603cfbbde469f2427eb807ee2957baa9df1..7c6d8c16d3df188056f1028b9f586843929a5fb5 100644
--- a/transfer_service/data_rpc_server.py
+++ b/transfer_service/data_rpc_server.py
@@ -39,11 +39,11 @@ class DataRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.pendingQueueWrite = JobQueue("write_pending")
         self.systemUtils = SystemUtils()
         super(DataRPCServer, self).__init__(host, port, db, rpcQueue)
diff --git a/transfer_service/file_grouper.py b/transfer_service/file_grouper.py
index 7f2b57fcda28be555a6f24ba504bd3f47dc37a9a..6bb54023f2657aec38582512f85efb3949a1bdd3 100644
--- a/transfer_service/file_grouper.py
+++ b/transfer_service/file_grouper.py
@@ -61,15 +61,19 @@ class FileGrouper(object):
         cwd = os.getcwd()
         parent = os.path.dirname(folder)
         os.chdir(parent)
-        sp = subprocess.run(["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)], capture_output = True)
-        if(sp.returncode or sp.stderr):
-            raise(TarFileCreationException(folder))
+        try:
+            sp = subprocess.run(["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)], capture_output = True)
+        except Exception:
+            raise
         else:
-            try:
-                shutil.rmtree(folder)
-            except Exception:
-                raise
-        os.chdir(cwd)
+            if(sp.returncode or sp.stderr):
+                raise(TarFileCreationException(folder))
+            else:
+                try:
+                    shutil.rmtree(folder)
+                except Exception:
+                    raise
+            os.chdir(cwd)
 
 
 # Test (creates a .tar of all the leaves containing at least 2 files and having size lower than 100G,
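Note on the file_grouper.py hunk above: the rework routes the `tar` call through `try/except/else`, but since `os.chdir(cwd)` now sits inside the `else` branch, a failed archive run leaves the process chdir'ed into `parent`. Below is a minimal standalone sketch of the same flow, with `TarFileCreationException` stubbed in (its real definition lives elsewhere in the project), that restores the working directory on every path:

```python
import os
import shutil
import subprocess


class TarFileCreationException(Exception):
    """Stand-in for the project's exception class (assumed shape)."""
    def __init__(self, folder):
        super().__init__(f"tar file creation failed for: {folder}")


def create_tar_and_remove(folder):
    """Pack 'folder' into '<folder>.tar' beside it, then delete the source tree."""
    cwd = os.getcwd()
    os.chdir(os.path.dirname(folder))
    try:
        sp = subprocess.run(
            ["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)],
            capture_output=True,
        )
        if sp.returncode or sp.stderr:
            raise TarFileCreationException(folder)
        shutil.rmtree(folder)
    finally:
        # Restore the working directory on success *and* failure.
        os.chdir(cwd)
```

Also note that the `except Exception: raise` arm in the patched version is a no-op at runtime (an uncaught exception propagates by itself); its only effect is to make the `else:` clause syntactically possible.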
diff --git a/transfer_service/group_rw_executor.py b/transfer_service/group_rw_executor.py
index d41603a5833504e29d5476322aa71388b15a53b2..c21a9a4e0ef47aed05cc17eac0c7851b81d82da3 100644
--- a/transfer_service/group_rw_executor.py
+++ b/transfer_service/group_rw_executor.py
@@ -36,11 +36,11 @@ class GroupRwExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.resDir = params["res_dir"]
         self.systemUtils = SystemUtils()
         self.jobObj = None
diff --git a/transfer_service/group_rw_rpc_server.py b/transfer_service/group_rw_rpc_server.py
index 1b742b489080349aa1ebff913742749a0e3b1cc8..adc00acf6d2bcf8ff5c1ef060062071c8c77d90a 100644
--- a/transfer_service/group_rw_rpc_server.py
+++ b/transfer_service/group_rw_rpc_server.py
@@ -32,11 +32,11 @@ class GroupRwRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.groupRwReadyQueue = JobQueue("group_rw_ready")
 
         super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue)
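The two hunks above repeat a pattern that runs through every RPC server and executor in this patch: the `logging.StreamHandler` lines are commented out one by one, leaving only the `RedisLogHandler`. A shared helper would reduce each of these blocks to a single call and turn console logging back into a one-flag switch. A sketch under assumed names (`build_logger` is not part of the codebase; the parameters mirror the `params["log_level"]`/`params["log_format"]` config keys used above):

```python
import logging


def build_logger(name, log_level, log_format, redis_handler, console=False):
    """Wire a logger the way each __init__ above does by hand.

    redis_handler: an instance of the project's RedisLogHandler.
    console:       replaces the commented-out StreamHandler lines.
    """
    logger = logging.getLogger(name)
    # getattr resolves "DEBUG"/"INFO"/... without eval()'ing config data.
    logger.setLevel(getattr(logging, log_level))
    formatter = logging.Formatter(log_format)
    redis_handler.setFormatter(formatter)
    logger.addHandler(redis_handler)
    if console:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)
    return logger
```

As a side benefit, `getattr(logging, params["log_level"])` avoids the `setLevel(eval(logLevel))` pattern these classes use, which executes whatever string the config file happens to contain.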
diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py
index 31d0382c1a1e34bc081ef102f09a7bc9fa8997ab..324d6e98341e1942cd02c681331cecaed97500e6 100644
--- a/transfer_service/import_executor.py
+++ b/transfer_service/import_executor.py
@@ -40,11 +40,11 @@ class ImportExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.resDir = params["res_dir"]
         params = config.loadSection("spectrum_archive")
         self.tapeClient = TapeClient(params["host"],
@@ -72,10 +72,8 @@ class ImportExecutor(TaskExecutor):
         nodeList = []
         timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
         nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)
-        #nodeListFile = self.resDir + '/' + "vos_import_report-" + timestamp
         nlfp = open(nodeListFile, "w")
-        #out = open("import_amqp_server_log.txt", "a")
 
         if self.storageType == "cold":
             self.tapeClient.connect()
             self.tapeClient.recallChecksumFiles(self.path)
@@ -85,26 +83,19 @@ class ImportExecutor(TaskExecutor):
         tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
 
         for dir in dirs:
-            #out.write(f"DIR dir: {dir}\n")
-            #out.write(f"DIR pathPrefix: {self.pathPrefix}\n\n")
-
             if self.path in dir:
                 parentPath = os.path.dirname(dir).split(self.pathPrefix)[1]
                 nodeName = os.path.basename(dir)
-
                 cnode = Node(nodeName, "container")
-
                 if not tstampWrapperDirPattern.match("/" + nodeName):
                     if tstampWrapperDirPattern.search(parentPath):
                         tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                         parentPath = tstampWrapperDirPattern.sub("", parentPath)
                         cnode.setWrapperDir(tstampWrapperDir)
-
                     if parentPath == '/':
                         vospacePath = parentPath + nodeName
                     else:
                         vospacePath = parentPath + '/' + nodeName
-
                     cnode.setParentPath(parentPath)
                     locationId = self.dbConn.getLocationId(self.storageId)
                     cnode.setLocationId(locationId)
@@ -126,23 +117,14 @@ class ImportExecutor(TaskExecutor):
         for flist in files:
             for file in flist:
                 if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
-                    #out.write(f"FILE files: {files}\n")
-                    #out.write(f"FILE flist: {flist}\n")
-                    #out.write(f"FILE file: {file}\n")
-                    #out.write(f"FILE pathPrefix: {self.pathPrefix}\n")
                     parentPath = os.path.dirname(file).split(self.pathPrefix)[1]
-                    #out.write(f"FILE parentPath: {parentPath}\n")
                     nodeName = os.path.basename(file)
-                    #out.write(f"FILE nodeName: {nodeName}\n")
                     dnode = Node(nodeName, "data")
-
                     if tstampWrapperDirPattern.search(parentPath):
                         tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                         parentPath = tstampWrapperDirPattern.sub("", parentPath)
                         dnode.setWrapperDir(tstampWrapperDir)
-
                     vospacePath = parentPath + '/' + nodeName
-                    #out.write(f"FILE vospacePath: {vospacePath}\n")
                     dnode.setParentPath(parentPath)
                     self.storageId = self.dbConn.getStorageId(self.pathPrefix)
                     locationId = self.dbConn.getLocationId(self.storageId)
@@ -162,7 +144,6 @@ class ImportExecutor(TaskExecutor):
                     else:
                         now = dt.now().isoformat()
                         nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ])
-        #out.close()
 
         nlfp.write(tabulate(nodeList,
                             headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
@@ -181,8 +162,10 @@ class ImportExecutor(TaskExecutor):
             m = Mailer(self.logger)
             m.addRecipient(self.adminEmail)
             msg = f"""
-        [VOSpace import procedure summary]
-
+        ########## VOSpace import procedure summary ##########
+
+        Job ID: {self.jobId}
+        Job type: {self.jobObj.type}
         Storage type: {self.storageType}
         Storage ID: {self.storageId}
         Creator ID: {self.userId}
diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py
index f38072daf5bef7f6d384c397490b8da49fe67d9d..c08cc8fa8d027d8f4f8d343e5709411e0ca5d50e 100644
--- a/transfer_service/import_rpc_server.py
+++ b/transfer_service/import_rpc_server.py
@@ -35,11 +35,11 @@ class ImportRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.importReadyQueue = JobQueue("import_ready")
         self.systemUtils = SystemUtils()
         super(ImportRPCServer, self).__init__(host, port, db, rpcQueue)
diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py
index 77531076500a4c69f90b2ee5bffd05f266c75784..3c8ae815946b136c4a7e8e17bac3daec840634d5 100644
--- a/transfer_service/job_rpc_server.py
+++ b/transfer_service/job_rpc_server.py
@@ -29,11 +29,11 @@ class JobRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(JobRPCServer, self).__init__(host, port, db, rpcQueue)
 
     def callback(self, requestBody):
diff --git a/transfer_service/log_listener.py b/transfer_service/log_listener.py
index 2b2b8a1cd363c1713af15f51a9ce6f528ffd5bca..f933e04dc584eb6f0281febea86c4fd4ddf1c0ba 100644
--- a/transfer_service/log_listener.py
+++ b/transfer_service/log_listener.py
@@ -29,7 +29,7 @@ class LogListener(Process):
         if os.path.exists(self.logFilePath):
             os.remove(self.logFilePath)
         while True:
-            time.sleep(1)
+            time.sleep(0.2)
             try:
                 lfp = open(self.logFilePath, 'a')
             except IOError:
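On the log_listener.py change just above: dropping the poll interval from 1 s to 0.2 s flushes records to the log file five times faster, at the cost of five times as many wakeups. If the records arrive on the Redis list that RedisLogHandler pushes to, a blocking pop gives low latency without the timer loop. A sketch (queue name and connection settings are assumptions, not the service's actual configuration):

```python
import redis

redis_cli = redis.Redis(host="localhost", port=6379, db=0)


def drain_log_queue(queue="ts_log", timeout=1):
    """Yield log records as they arrive instead of sleeping on a timer."""
    while True:
        item = redis_cli.blpop(queue, timeout=timeout)
        if item is None:
            continue  # timed out; loop again so shutdown checks get a chance
        _, record = item  # blpop returns a (queue_name, value) pair
        yield record.decode()
```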
diff --git a/transfer_service/redis_log_handler.py b/transfer_service/redis_log_handler.py
index fe6a94de759f71a2d1bc3f30a7f4cf3ab5a02ece..ed41092964fa52b1711ea3c17abaf6d2064f2b11 100644
--- a/transfer_service/redis_log_handler.py
+++ b/transfer_service/redis_log_handler.py
@@ -5,6 +5,8 @@
 import logging
 
 from config import Config
 
+from redis.exceptions import ConnectionError
+
 class RedisLogHandler(object):
     """
@@ -25,9 +27,11 @@ class RedisLogHandler(object):
     def handle(self, record):
         try:
             self.redisCli.lpush(self.logQueue, self.formatter.format(record))
-        except:
+        except ConnectionError:
             # Redis is not responding...
             pass
+        except Exception:
+            raise
 
     def setFormatter(self, formatter):
         self.formatter = formatter
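Two remarks on the redis_log_handler.py hunks. First, narrowing the bare `except:` to `except ConnectionError` is the substantive fix: only "Redis is unreachable" is silently dropped now. Second, the added `except Exception: raise` arm changes nothing at runtime (whatever is not caught propagates anyway), so it documents intent rather than altering behavior. Since the class already mimics the `logging` handler protocol (`handle`, `setFormatter`), a variant could subclass `logging.Handler` and inherit that machinery; a sketch with illustrative class and parameter names, not the project's:

```python
import logging

import redis
from redis.exceptions import ConnectionError


class RedisListHandler(logging.Handler):
    """Same idea as RedisLogHandler, as a standard logging.Handler."""

    def __init__(self, log_queue="ts_log", host="localhost", port=6379, db=0):
        super().__init__()  # provides setFormatter()/format()/handleError()
        self.log_queue = log_queue
        self.redis_cli = redis.Redis(host=host, port=port, db=db)

    def emit(self, record):
        try:
            self.redis_cli.lpush(self.log_queue, self.format(record))
        except ConnectionError:
            # Redis is not responding: hand off to logging's error hook
            # (prints to stderr when logging.raiseExceptions is set)
            # instead of losing the record without a trace.
            self.handleError(record)
```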
diff --git a/transfer_service/retrieve_cleaner.py b/transfer_service/retrieve_cleaner.py
index 717187b939da049f6d6a8dd3d1c46c7aeb33b249..33441673b6c88298b11003f962c1bac2ee1e838b 100644
--- a/transfer_service/retrieve_cleaner.py
+++ b/transfer_service/retrieve_cleaner.py
@@ -37,11 +37,11 @@ class RetrieveCleaner(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.jobObj = None
         self.username = None
         self.nodeList = []
diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py
index 2aafc4ad97668fb9237492daba38fba60d4112aa..6e3a6a70da0c94a556d269d1e90043f766fd0ea7 100644
--- a/transfer_service/retrieve_executor.py
+++ b/transfer_service/retrieve_executor.py
@@ -64,11 +64,11 @@ class RetrieveExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         params = config.loadSection("spectrum_archive")
         self.tapePool = params["tape_pool"]
         self.tapeClient = TapeClient(params["host"],
@@ -290,7 +290,8 @@ class RetrieveExecutor(TaskExecutor):
         Dear user,
         your job has been COMPLETED.
 
-        Job ID: {self.jobObj.jobId}
+        Job ID: {self.jobId}
+        Job type: {self.jobObj.type}
         Owner ID: {self.jobObj.ownerId}
 
         Your files are available and can be downloaded.
diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py
index 2630d1498410996066826b8c935dbe574eb0d72b..97d2099bc5ae6e4f33e34bed96610a88345da385 100644
--- a/transfer_service/retrieve_preprocessor.py
+++ b/transfer_service/retrieve_preprocessor.py
@@ -33,11 +33,11 @@ class RetrievePreprocessor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.jobObj = None
         self.nodeList = []
         super(RetrievePreprocessor, self).__init__()
@@ -87,7 +87,7 @@ class RetrievePreprocessor(TaskExecutor):
         try:
             srcQueueLen = self.srcQueue.len()
             destQueueLen = self.destQueue.len()
-        except:
+        except Exception:
             self.logger.exception("Cache error: unable to retrieve queue length.")
         else:
             if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
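The `except:` to `except Exception:` change in retrieve_preprocessor.py (repeated in store_preprocessor.py below) is worth a note: a bare `except:` also traps `KeyboardInterrupt` and `SystemExit`, which subclass only `BaseException`, so the old code could swallow a Ctrl+C or a shutdown request inside the scheduling loop. The narrowed form still logs genuine cache errors but lets those control-flow exceptions propagate:

```python
# BaseException
#  +-- SystemExit, KeyboardInterrupt   <- caught only by a bare 'except:'
#  +-- Exception                       <- caught by 'except Exception:'
#       +-- ConnectionError, OSError, ...
print(issubclass(KeyboardInterrupt, Exception))      # False
print(issubclass(KeyboardInterrupt, BaseException))  # True
```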
diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py
index 9fc06f4a5bbfe5ece0bb078c09473bace785231e..fbce4f9ba213f1ba701b3592bc8e21b7d107806e 100644
--- a/transfer_service/storage_rpc_server.py
+++ b/transfer_service/storage_rpc_server.py
@@ -29,11 +29,11 @@ class StorageRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(StorageRPCServer, self).__init__(host, port, db, rpcQueue)
 
     def callback(self, requestBody):
diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py
index 72880e7e460607ef446dcc46a68269d3d00e2ade..6d554d0b17f35aeaaa73c87e03a33f38a4887331 100644
--- a/transfer_service/store_preprocessor.py
+++ b/transfer_service/store_preprocessor.py
@@ -50,11 +50,11 @@ class StorePreprocessor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.storageId = None
         self.storageType = None
         self.jobObj = None
@@ -213,11 +213,12 @@ class StorePreprocessor(TaskExecutor):
                 return False
             self.logger.info("Overall data size calculation")
             self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path)
-            self.logger.info("++++++++++ End of preprocessing phase ++++++++++")
-            return True
         except Exception:
             self.logger.exception("FATAL: something went wrong during the preprocessing phase.")
             return False
+        else:
+            self.logger.info("++++++++++ End of preprocessing phase ++++++++++")
+            return True
 
     def update(self, status):
         try:
@@ -287,7 +288,7 @@ class StorePreprocessor(TaskExecutor):
         try:
             srcQueueLen = self.srcQueue.len()
             destQueueLen = self.destQueue.len()
-        except:
+        except Exception:
             self.logger.exception("Cache error: failed to retrieve queue length.")
         else:
             if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py
index 359027ff86e8beeff49b4928318879a899849623..192a19471d77d1d056f89cd2cd7e68d07f6cfd25 100644
--- a/transfer_service/transfer_service.py
+++ b/transfer_service/transfer_service.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python
 
 import logging
+import os
+import sys
 
 from config import Config
 from cli_handler import CliHandler
@@ -15,7 +17,7 @@ class TransferService(object):
     def __init__(self):
         config = Config("/etc/vos_ts/vos_ts.conf")
         params = config.loadSection("logging")
-        self.logger = logging.getLogger(__name__)
+        self.logger = logging.getLogger("vos_ts")
         logLevel = "logging." + params["log_level"]
         logFormat = params["log_format"]
         logFormatter = logging.Formatter(logFormat)
@@ -63,6 +65,7 @@ class TransferService(object):
         self.logListener = LogListener()
 
     def start(self):
+        #if "SUDO_UID" in os.environ.keys():
         # Startup
         self.logListener.start()
         self.jobScheduler.start()
@@ -71,6 +74,9 @@ class TransferService(object):
         self.logger.info("##########################################################")
         self.logger.info("########## VOSpace Transfer Service is RUNNING! ##########")
         self.logger.info("##########################################################")
+        #else:
+        #    print("The VOSpace Transfer Service requires super user privileges.")
+        #    sys.exit(1)
 
 ts = TransferService()
 ts.start()
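Finally, on transfer_service.py: in a script that is run directly, `__name__` is `"__main__"`, so `logging.getLogger(__name__)` produced a logger literally named `"__main__"` that no module logger could nest under. The fixed `"vos_ts"` name gives the service a stable hierarchy root that per-module loggers can attach to; a small demonstration (the child logger name is illustrative):

```python
import logging

service_logger = logging.getLogger("vos_ts")
service_logger.setLevel(logging.INFO)
service_logger.addHandler(logging.StreamHandler())

# Any dotted name below "vos_ts" propagates to the handlers above.
worker_logger = logging.getLogger("vos_ts.import_executor")
worker_logger.info("reaches the 'vos_ts' StreamHandler")
print(worker_logger.parent.name)  # -> vos_ts
```

The new `os` and `sys` imports are currently used only by the still-commented `SUDO_UID` privilege check in `start()`.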