diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py index 236371cac0ad9a0acc1c63c8c5aa3dffbea3f884..8176243d26e1aac89b2215a2151975a0e849a90c 100644 --- a/transfer_service/abort_job_rpc_server.py +++ b/transfer_service/abort_job_rpc_server.py @@ -32,11 +32,8 @@ class AbortJobRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/checksum.py b/transfer_service/checksum.py index 3f7119a62672fbcbbeafbc6566545a9f67c1e9cf..a7480497fd7003d8a7510d48d00483da0178b5db 100644 --- a/transfer_service/checksum.py +++ b/transfer_service/checksum.py @@ -44,9 +44,9 @@ class Checksum(object): md5sum = row.split(" ./")[0] fileName = row.split(" ./")[1].rstrip() if fileName == os.path.basename(absFilePath): + md5File.close() return md5sum - finally: - md5File.close() + md5File.close() return None def md5sum(self, filePath): @@ -80,5 +80,4 @@ class Checksum(object): for file in files: filePath = os.path.abspath(folder) + '/' + file md5file.write(self.md5sum(filePath) + " ./" + file + '\n') - finally: md5file.close() diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py index 7c6d8c16d3df188056f1028b9f586843929a5fb5..aa69ae05fdf91c1ef9da71e2a292bed10f326a26 100644 --- a/transfer_service/data_rpc_server.py +++ b/transfer_service/data_rpc_server.py @@ -39,11 +39,8 @@ class DataRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.pendingQueueWrite = JobQueue("write_pending") self.systemUtils = SystemUtils() super(DataRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/group_rw_executor.py b/transfer_service/group_rw_executor.py index c21a9a4e0ef47aed05cc17eac0c7851b81d82da3..108c72bd617220170c9bf0d062f1919e0149c5c7 100644 --- a/transfer_service/group_rw_executor.py +++ b/transfer_service/group_rw_executor.py @@ -36,11 +36,8 @@ class GroupRwExecutor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] self.systemUtils = SystemUtils() self.jobObj = None diff --git a/transfer_service/group_rw_rpc_server.py b/transfer_service/group_rw_rpc_server.py index adc00acf6d2bcf8ff5c1ef060062071c8c77d90a..c9b207832278853c6f2d9b36b36b357f30e7e44a 100644 --- a/transfer_service/group_rw_rpc_server.py +++ b/transfer_service/group_rw_rpc_server.py @@ -32,11 +32,8 @@ class GroupRwRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.groupRwReadyQueue = JobQueue("group_rw_ready") super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index 324d6e98341e1942cd02c681331cecaed97500e6..2961aedb62f3c1ade63cd04acffaf5fc897a311c 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -40,11 +40,8 @@ class ImportExecutor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(params["host"], diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index c08cc8fa8d027d8f4f8d343e5709411e0ca5d50e..0b2400f6d8238e650bd74b31eab2abffff1abc06 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -35,11 +35,8 @@ class ImportRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.importReadyQueue = JobQueue("import_ready") self.systemUtils = SystemUtils() super(ImportRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py index 3c8ae815946b136c4a7e8e17bac3daec840634d5..a30375712d4edd6d0c55ac421083890a632cc1c8 100644 --- a/transfer_service/job_rpc_server.py +++ b/transfer_service/job_rpc_server.py @@ -29,11 +29,8 @@ class JobRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) super(JobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/log_listener.py b/transfer_service/log_listener.py index f933e04dc584eb6f0281febea86c4fd4ddf1c0ba..9e9d663bb8060e77b44ac1658731464f1e765543 100644 --- a/transfer_service/log_listener.py +++ b/transfer_service/log_listener.py @@ -43,5 +43,4 @@ class LogListener(Process): raise else: lfp.write(logRecord + '\n') - finally: lfp.close() diff --git a/transfer_service/retrieve_cleaner.py b/transfer_service/retrieve_cleaner.py index 33441673b6c88298b11003f962c1bac2ee1e838b..6815d1ffc6dffd201f8ffc8aee6c0b06d0bfe3d7 100644 --- a/transfer_service/retrieve_cleaner.py +++ b/transfer_service/retrieve_cleaner.py @@ -37,11 +37,8 @@ class RetrieveCleaner(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.jobObj = None self.username = None self.nodeList = [] diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 6e3a6a70da0c94a556d269d1e90043f766fd0ea7..0243185f648292031fa49c2adf0f3c432af05c4e 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -64,11 +64,8 @@ class RetrieveExecutor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) params = config.loadSection("spectrum_archive") self.tapePool = params["tape_pool"] self.tapeClient = TapeClient(params["host"], diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 97d2099bc5ae6e4f33e34bed96610a88345da385..44d12d8968a83d9c122bd51b9db6292cce0aa13f 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -33,11 +33,8 @@ class RetrievePreprocessor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.jobObj = None self.nodeList = [] super(RetrievePreprocessor, self).__init__() diff --git a/transfer_service/start_job_rpc_server.py b/transfer_service/start_job_rpc_server.py index 48dac220697661795c9ba1823c660a498d9b223f..cdcb846d8d9db44a8f7ea3105e362d4338111083 100644 --- a/transfer_service/start_job_rpc_server.py +++ b/transfer_service/start_job_rpc_server.py @@ -38,11 +38,8 @@ class StartJobRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.pendingQueueRead = JobQueue("read_pending") self.terminatedQueueRead = JobQueue("read_terminated") super(StartJobRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index fbce4f9ba213f1ba701b3592bc8e21b7d107806e..5c47842a9869538a7dba95d22595287b80d4e07f 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -29,11 +29,8 @@ class StorageRPCServer(RedisRPCServer): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 1f00586d11bcb904b59cb546d3444a58c060a9da..fd42182e67610061caa19e7cc5edd3da5e95bcfa 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -50,11 +50,8 @@ class StoreExecutor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] params = config.loadSection("spectrum_archive") self.tapePool = params["tape_pool"] @@ -151,13 +148,12 @@ class StoreExecutor(TaskExecutor): nlfp.write(tabulate(self.nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) - finally: nlfp.close() - - self.logger.info("Updating job phase to COMPLETED...") + self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) + self.logger.info("Job phase updated to COMPLETED.") msg = f""" ########## VOSpace data storage procedure summary ########## diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 6d554d0b17f35aeaaa73c87e03a33f38a4887331..3cc9d02cc45c9b7b93a844d1747c8bfe0c495355 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -50,11 +50,8 @@ class StorePreprocessor(TaskExecutor): logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) redisLogHandler = RedisLogHandler() - #logStreamHandler = logging.StreamHandler() - #logStreamHandler.setFormatter(logFormatter) redisLogHandler.setFormatter(logFormatter) self.logger.addHandler(redisLogHandler) - #self.logger.addHandler(logStreamHandler) self.storageId = None self.storageType = None self.jobObj = None diff --git a/transfer_service/system_utils.py b/transfer_service/system_utils.py index ae530f143a839f4fc26b1bf4cd3db32d4beb36a3..3b9bdee38978156fe1f3d00e657c322cec24adcb 100644 --- a/transfer_service/system_utils.py +++ b/transfer_service/system_utils.py @@ -24,20 +24,21 @@ class SystemUtils(object): Parses '/etc/passwd' and returns user, uid and gid associated to a given username. """ - fp = open("/etc/passwd", 'r') - for line in fp: - info = line.split(':') - user = info[0] - uid = int(info[2]) - gid = int(info[3]) - if user == username: - fp.close() - return [ user, uid, gid ] - fp.close() - return False - - def setDefaultPerms(self, username): - info = self.userInfo(username) + try: + fp = open("/etc/passwd", 'r') + except FileNotFoundError: + raise + else: + for line in fp: + info = line.split(':') + user = info[0] + uid = int(info[2]) + gid = int(info[3]) + if user == username: + fp.close() + return [ user, uid, gid ] + fp.close() + return False def scanRecursive(self, path): dirList = [] diff --git a/transfer_service/tape_client.py b/transfer_service/tape_client.py index 9dc57d345c3c2cfc4ce28b3bb9a2fad23e42ea18..73abcfa023d976d096458480520ba770f6a6e4ca 100644 --- a/transfer_service/tape_client.py +++ b/transfer_service/tape_client.py @@ -28,8 +28,6 @@ class TapeClient(object): self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.keyFile = keyFile - #self.key = paramiko.RSAKey.from_private_key_file(keyFile) - #self.client.load_system_host_keys() self.scp = None self.taskList = [] self.poolList = [] @@ -172,8 +170,6 @@ class TapeClient(object): self.logger.error("MIGRATE operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode - finally: - fp.close() def recall(self, fileList): """ @@ -206,8 +202,6 @@ class TapeClient(object): self.logger.error("RECALL operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode - finally: - fp.close() def recallChecksumFiles(self, dirName): """