From fd80b3f58f85db7d87d6d09d2f84fde49826519e Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Thu, 26 Aug 2021 11:38:19 +0200 Subject: [PATCH] Minor fixes + cleanup. Signed-off-by: Cristiano Urban --- transfer_service/abort_job_rpc_server.py | 3 --- transfer_service/checksum.py | 5 ++-- transfer_service/data_rpc_server.py | 3 --- transfer_service/group_rw_executor.py | 3 --- transfer_service/group_rw_rpc_server.py | 3 --- transfer_service/import_executor.py | 3 --- transfer_service/import_rpc_server.py | 3 --- transfer_service/job_rpc_server.py | 3 --- transfer_service/log_listener.py | 1 - transfer_service/retrieve_cleaner.py | 3 --- transfer_service/retrieve_executor.py | 3 --- transfer_service/retrieve_preprocessor.py | 3 --- transfer_service/start_job_rpc_server.py | 3 --- transfer_service/storage_rpc_server.py | 3 --- transfer_service/store_executor.py | 8 ++----- transfer_service/store_preprocessor.py | 3 --- transfer_service/system_utils.py | 29 ++++++++++++----------- transfer_service/tape_client.py | 6 ----- 18 files changed, 19 insertions(+), 69 deletions(-) diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py index 236371c..8176243 100644 --- a/transfer_service/abort_job_rpc_server.py +++ b/transfer_service/abort_job_rpc_server.py @@ -32,11 +32,8 @@ class AbortJobRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/checksum.py b/transfer_service/checksum.py index 3f7119a..a748049 100644 --- a/transfer_service/checksum.py +++ b/transfer_service/checksum.py @@ -44,9 +44,9 @@ class Checksum(object): md5sum = row.split(" ./")[0] fileName = row.split(" ./")[1].rstrip() if fileName == os.path.basename(absFilePath): + md5File.close() return md5sum - finally: - md5File.close() + md5File.close() return None def md5sum(self, filePath): @@ -80,5 +80,4 @@ class Checksum(object): for file in files: filePath = os.path.abspath(folder) + '/' + file md5file.write(self.md5sum(filePath) + " ./" + file + '\n') - finally: md5file.close() diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py index 7c6d8c1..aa69ae0 100644 --- a/transfer_service/data_rpc_server.py +++ b/transfer_service/data_rpc_server.py @@ -39,11 +39,8 @@ class DataRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.pendingQueueWrite = JobQueue("write_pending") self.systemUtils = SystemUtils() super(DataRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/group_rw_executor.py b/transfer_service/group_rw_executor.py index c21a9a4..108c72b 100644 --- a/transfer_service/group_rw_executor.py +++ b/transfer_service/group_rw_executor.py @@ -36,11 +36,8 @@ class GroupRwExecutor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] self.systemUtils = SystemUtils() self.jobObj = None diff --git a/transfer_service/group_rw_rpc_server.py b/transfer_service/group_rw_rpc_server.py index adc00ac..c9b2078 100644 --- a/transfer_service/group_rw_rpc_server.py +++ b/transfer_service/group_rw_rpc_server.py @@ -32,11 +32,8 @@ class GroupRwRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.groupRwReadyQueue = JobQueue("group_rw_ready") super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index 324d6e9..2961aed 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -40,11 +40,8 @@ class ImportExecutor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(params["host"], diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index c08cc8f..0b2400f 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -35,11 +35,8 @@ class ImportRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.importReadyQueue = JobQueue("import_ready") self.systemUtils = SystemUtils() super(ImportRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py index 3c8ae81..a303757 100644 --- a/transfer_service/job_rpc_server.py +++ b/transfer_service/job_rpc_server.py @@ -29,11 +29,8 @@ class JobRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) super(JobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/log_listener.py b/transfer_service/log_listener.py index f933e04..9e9d663 100644 --- a/transfer_service/log_listener.py +++ b/transfer_service/log_listener.py @@ -43,5 +43,4 @@ class LogListener(Process): raise else: lfp.write(logRecord + '\n') - finally: lfp.close() diff --git a/transfer_service/retrieve_cleaner.py b/transfer_service/retrieve_cleaner.py index 3344167..6815d1f 100644 --- a/transfer_service/retrieve_cleaner.py +++ b/transfer_service/retrieve_cleaner.py @@ -37,11 +37,8 @@ class RetrieveCleaner(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.jobObj = None self.username = None self.nodeList = [] diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 6e3a6a7..0243185 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -64,11 +64,8 @@ class RetrieveExecutor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) params = config.loadSection("spectrum_archive") self.tapePool = params["tape_pool"] self.tapeClient = TapeClient(params["host"], diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 97d2099..44d12d8 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -33,11 +33,8 @@ class RetrievePreprocessor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.jobObj = None self.nodeList = [] super(RetrievePreprocessor, self).__init__() diff --git a/transfer_service/start_job_rpc_server.py b/transfer_service/start_job_rpc_server.py index 48dac22..cdcb846 100644 --- a/transfer_service/start_job_rpc_server.py +++ b/transfer_service/start_job_rpc_server.py @@ -38,11 +38,8 @@ class StartJobRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.pendingQueueRead = JobQueue("read_pending") self.terminatedQueueRead = JobQueue("read_terminated") super(StartJobRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index fbce4f9..5c47842 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -29,11 +29,8 @@ class StorageRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 1f00586..fd42182 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -50,11 +50,8 @@ class StoreExecutor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] params = config.loadSection("spectrum_archive") self.tapePool = params["tape_pool"] @@ -151,13 +148,12 @@ class StoreExecutor(TaskExecutor): nlfp.write(tabulate(self.nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) - finally: nlfp.close() - - self.logger.info("Updating job phase to COMPLETED...") + self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) + self.logger.info("Job phase updated to COMPLETED.") msg = f""" ########## VOSpace data storage procedure summary ########## diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 6d554d0..3cc9d02 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -50,11 +50,8 @@ class StorePreprocessor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.storageId = None self.storageType = None self.jobObj = None diff --git a/transfer_service/system_utils.py b/transfer_service/system_utils.py index ae530f1..3b9bdee 100644 --- a/transfer_service/system_utils.py +++ b/transfer_service/system_utils.py @@ -24,20 +24,21 @@ class SystemUtils(object): Parses '/etc/passwd' and returns user, uid and gid associated to a given username. """ - fp = open("/etc/passwd", 'r') - for line in fp: - info = line.split(':') - user = info[0] - uid = int(info[2]) - gid = int(info[3]) - if user == username: - fp.close() - return [ user, uid, gid ] - fp.close() - return False - - def setDefaultPerms(self, username): - info = self.userInfo(username) + try: + fp = open("/etc/passwd", 'r') + except FileNotFoundError: + raise + else: + for line in fp: + info = line.split(':') + user = info[0] + uid = int(info[2]) + gid = int(info[3]) + if user == username: + fp.close() + return [ user, uid, gid ] + fp.close() + return False def scanRecursive(self, path): dirList = [] diff --git a/transfer_service/tape_client.py b/transfer_service/tape_client.py index 9dc57d3..73abcfa 100644 --- a/transfer_service/tape_client.py +++ b/transfer_service/tape_client.py @@ -28,8 +28,6 @@ class TapeClient(object): self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.keyFile = keyFile - #self.key = paramiko.RSAKey.from_private_key_file(keyFile) - #self.client.load_system_host_keys() self.scp = None self.taskList = [] self.poolList = [] @@ -172,8 +170,6 @@ class TapeClient(object): self.logger.error("MIGRATE operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode - finally: - fp.close() def recall(self, fileList): """ @@ -206,8 +202,6 @@ class TapeClient(object): self.logger.error("RECALL operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode - finally: - fp.close() def recallChecksumFiles(self, dirName): """ -- GitLab