From 24de010cddd83096c8071a16e1e04a46ddecd084 Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Fri, 11 Jun 2021 16:55:02 +0200 Subject: [PATCH] Added basic logging + added check on threads and processes startup + minor changes. Signed-off-by: Cristiano Urban --- transfer_service/abort_job_rpc_server.py | 35 +++++++++++---- transfer_service/checksum.py | 6 +-- transfer_service/cli_handler.py | 7 +++ transfer_service/data_rpc_server.py | 20 ++++----- transfer_service/file_grouper.py | 1 - transfer_service/get_job_rpc_server.py | 28 +++++++++--- transfer_service/import_executor.py | 3 +- transfer_service/import_rpc_server.py | 16 +++---- transfer_service/job_queue.py | 8 ++-- transfer_service/job_rpc_server.py | 12 ++--- transfer_service/job_scheduler.py | 37 ++++++++++----- transfer_service/mailer.py | 2 +- transfer_service/retrieve_executor.py | 55 ++++++++++++++--------- transfer_service/retrieve_preprocessor.py | 31 +++++++++---- transfer_service/start_job_rpc_server.py | 38 +++++++++++----- transfer_service/storage_rpc_server.py | 12 ++--- transfer_service/store_executor.py | 49 +++++++++++++------- transfer_service/store_preprocessor.py | 41 +++++++++++------ transfer_service/system_utils.py | 13 +----- transfer_service/task_executor.py | 10 ++--- transfer_service/transfer_service.py | 33 ++++++++------ transfer_service/vospace_rest_handler.py | 7 +++ 22 files changed, 300 insertions(+), 164 deletions(-) diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py index df508f8..651b436 100644 --- a/transfer_service/abort_job_rpc_server.py +++ b/transfer_service/abort_job_rpc_server.py @@ -1,5 +1,7 @@ #!/usr/bin/env python +import logging + from redis_rpc_server import RedisRPCServer from db_connector import DbConnector from config import Config @@ -10,21 +12,38 @@ class AbortJobRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "abort" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") + params = config.loadSection("logging") + self.logger = logging.getLogger("AbortJobRPCServer") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "abort_job_rpc_server.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): #TODO # do something... - return 42 + pass def run(self): - print(f"Starting RPC server of type {self.type}...") + self.logger.info(f"Starting RPC server of type {self.type}...") super(AbortJobRPCServer, self).run() diff --git a/transfer_service/checksum.py b/transfer_service/checksum.py index 986d268..540f6fd 100644 --- a/transfer_service/checksum.py +++ b/transfer_service/checksum.py @@ -13,9 +13,9 @@ class Checksum(object): def __init__(self): self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("checksum") - self.fileBufferSize = self.systemUtils.convertSizeToBytes(self.params["file_buffer_size"]) - self.md5FileSuffix = self.params["md5_file_suffix"] + params = config.loadSection("checksum") + self.fileBufferSize = self.systemUtils.convertSizeToBytes(params["file_buffer_size"]) + self.md5FileSuffix = params["md5_file_suffix"] def setFileBufferSize(fileBufferSize): """Sets the buffer size in bytes when reading a chunk of data.""" diff --git a/transfer_service/cli_handler.py b/transfer_service/cli_handler.py index 104e2ce..bf8c8ef 100644 --- a/transfer_service/cli_handler.py +++ b/transfer_service/cli_handler.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import sys +import time from data_rpc_server import DataRPCServer from import_rpc_server import ImportRPCServer @@ -31,3 +32,9 @@ class CliHandler(object): def start(self): for srv in self.rpcServerList: srv.start() + running = False + while not running: + time.sleep(1) + running = True + for srv in self.rpcServerList: + running &= srv.is_alive() diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py index 648260c..1e03c82 100644 --- a/transfer_service/data_rpc_server.py +++ b/transfer_service/data_rpc_server.py @@ -25,18 +25,18 @@ class DataRPCServer(RedisRPCServer): self.type = "data" self.storeAck = False config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) - self.params = config.loadSection("transfer_node") - self.storageStorePath = self.params["store_path"] - self.params = config.loadSection("scheduling") - self.maxPendingJobs = self.params.getint("max_pending_jobs") + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") params = config.loadSection("logging") self.logger = logging.getLogger("DataRPCServer") logLevel = "logging." + params["log_level"] diff --git a/transfer_service/file_grouper.py b/transfer_service/file_grouper.py index 9f65400..0d68eaf 100644 --- a/transfer_service/file_grouper.py +++ b/transfer_service/file_grouper.py @@ -20,7 +20,6 @@ class FileGrouper(object): """Sets the 'minimum number of files' constraint.""" self.minNumOfFiles = minNumOfFiles - def setMaxFolderSize(self, maxFolderSize): """Sets the 'maximum folder size' constraint.""" self.maxFolderSize = maxFolderSize diff --git a/transfer_service/get_job_rpc_server.py b/transfer_service/get_job_rpc_server.py index 7d06e6d..522c5b9 100644 --- a/transfer_service/get_job_rpc_server.py +++ b/transfer_service/get_job_rpc_server.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import json +import logging from redis_rpc_server import RedisRPCServer from config import Config @@ -12,14 +13,27 @@ class GetJobRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "poll" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) + params = config.loadSection("logging") + self.logger = logging.getLogger("GetJobRPCServer") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "get_job_rpc_server.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) super(GetJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): @@ -32,5 +46,5 @@ class GetJobRPCServer(RedisRPCServer): return 42 def run(self): - print(f"Starting RPC server of type {self.type}...") + self.logger.info(f"Starting RPC server of type {self.type}...") super(GetJobRPCServer, self).run() diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index 7496fe6..6762b27 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -19,6 +19,7 @@ from task_executor import TaskExecutor class ImportExecutor(TaskExecutor): def __init__(self): + self.type = "import_executor" self.md5calc = Checksum() config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("file_catalog") @@ -35,7 +36,7 @@ class ImportExecutor(TaskExecutor): params["user"], params["pkey_file_path"]) params = config.loadSection("logging") - self.logger = logging.getLogger("import_executor") + self.logger = logging.getLogger("ImportExecutor") logLevel = "logging." + params["log_level"] logDir = params["log_dir"] logFile = logDir + '/' + "import_executor.log" diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index ef7c702..54254c3 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -16,16 +16,16 @@ class ImportRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "import" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) - self.params = config.loadSection("scheduling") - self.maxReadyJobs = self.params.getint("max_ready_jobs") + params = config.loadSection("scheduling") + self.maxReadyJobs = params.getint("max_ready_jobs") params = config.loadSection("logging") self.logger = logging.getLogger("ImportRPCServer") logLevel = "logging." + params["log_level"] diff --git a/transfer_service/job_queue.py b/transfer_service/job_queue.py index 500d7ea..f7a5c06 100644 --- a/transfer_service/job_queue.py +++ b/transfer_service/job_queue.py @@ -15,10 +15,10 @@ class JobQueue(object): def __init__(self, queueName): config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("job_cache") - self.redisCli = redis.Redis(host = self.params["host"], - port = self.params["port"], - db = self.params["db_sched"]) + params = config.loadSection("job_cache") + self.redisCli = redis.Redis(host = params["host"], + port = params["port"], + db = params["db_sched"]) self.queueName = queueName def len(self): diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py index 59c300d..163b6c9 100644 --- a/transfer_service/job_rpc_server.py +++ b/transfer_service/job_rpc_server.py @@ -13,12 +13,12 @@ class JobRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "job" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) params = config.loadSection("logging") diff --git a/transfer_service/job_scheduler.py b/transfer_service/job_scheduler.py index e5e5a01..94b3d5d 100644 --- a/transfer_service/job_scheduler.py +++ b/transfer_service/job_scheduler.py @@ -1,5 +1,8 @@ #!/usr/bin/env python +import time +import sys + from import_executor import ImportExecutor from retrieve_preprocessor import RetrievePreprocessor from store_preprocessor import StorePreprocessor @@ -7,19 +10,31 @@ from retrieve_executor import RetrieveExecutor from store_executor import StoreExecutor - class JobScheduler(object): def __init__(self): - self.importExecutor = ImportExecutor() - self.retrievePreprocessor = RetrievePreprocessor() - self.storePreprocessor = StorePreprocessor() - self.retrieveExecutor = RetrieveExecutor() - self.storeExecutor = StoreExecutor() + self.taskExecutorList = [] + + def addTaskExecutor(self, taskExecType): + if taskExecType == "import_executor": + self.taskExecutorList.append(ImportExecutor()) + elif taskExecType == "retrieve_preprocessor": + self.taskExecutorList.append(RetrievePreprocessor()) + elif taskExecType == "store_preprocessor": + self.taskExecutorList.append(StorePreprocessor()) + elif taskExecType == "retrieve_executor": + self.taskExecutorList.append(RetrieveExecutor()) + elif taskExecType == "store_executor": + self.taskExecutorList.append(StoreExecutor()) + else: + sys.exit(f"FATAL: unknown server type {taskExecType}.") def start(self): - self.importExecutor.start() - self.retrievePreprocessor.start() - self.storePreprocessor.start() - self.retrieveExecutor.start() - self.storeExecutor.start() + for taskExec in self.taskExecutorList: + taskExec.start() + running = False + while not running: + time.sleep(1) + running = True + for taskExec in self.taskExecutorList: + running &= taskExec.is_alive() diff --git a/transfer_service/mailer.py b/transfer_service/mailer.py index 8156639..911800f 100644 --- a/transfer_service/mailer.py +++ b/transfer_service/mailer.py @@ -19,7 +19,7 @@ class Mailer(object): self.smtpServer = params["smtp_server"] self.smtpPort = params.getint("smtp_port") self.sender = params["no_reply_email"] - self.logger = logging.getLogger("import_executor.mailer") + self.logger = logging.getLogger("ImportExecutor.Mailer") self.recipients = [] self.message = None diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 8fe711a..fdac7c8 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -20,6 +20,7 @@ import json import os +import logging import subprocess import sys @@ -34,28 +35,42 @@ from task_executor import TaskExecutor class RetrieveExecutor(TaskExecutor): def __init__(self): + self.type = "retrieve_executor" self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("spectrum_archive") - self.tapeClient = TapeClient(self.params["host"], - self.params.getint("port"), - self.params["user"], - self.params["pkey_file_path"]) - self.tapePool = self.params["tape_pool"] - self.params = config.loadSection("transfer_node") - self.storageRetrievePath = self.params["retrieve_path"] - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("spectrum_archive") + self.tapeClient = TapeClient(params["host"], + params.getint("port"), + params["user"], + params["pkey_file_path"]) + self.tapePool = params["tape_pool"] + params = config.loadSection("transfer_node") + self.storageRetrievePath = params["retrieve_path"] + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 1) - self.params = config.loadSection("transfer") - self.maxBlockSize = self.systemUtils.convertSizeToBytes(self.params["block_size"]) - self.params = config.loadSection("scheduling") - self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") + params = config.loadSection("transfer") + self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"]) + params = config.loadSection("scheduling") + self.maxTerminatedJobs = params.getint("max_terminated_jobs") + params = config.loadSection("logging") + self.logger = logging.getLogger("RetrieveExecutor") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "retrieve_executor.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.storageType = None self.jobObj = None self.jobId = None @@ -261,7 +276,7 @@ class RetrieveExecutor(TaskExecutor): self.totalSize = 0 def run(self): - print("Starting retrieve executor...") + self.logger.info("Starting retrieve executor...") self.setSourceQueueName("read_ready") self.setDestinationQueueName("read_terminated") while True: @@ -287,4 +302,4 @@ class RetrieveExecutor(TaskExecutor): self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index ad54159..fe26df4 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import json +import logging import os from config import Config @@ -11,15 +12,29 @@ from task_executor import TaskExecutor class RetrievePreprocessor(TaskExecutor): def __init__(self): + self.type = "retrieve_preprocessor" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 1) + params = config.loadSection("logging") + self.logger = logging.getLogger("RetrievePreprocessor") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "retrieve_preprocessor.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.jobObj = None self.nodeList = [] super(RetrievePreprocessor, self).__init__() @@ -42,7 +57,7 @@ class RetrievePreprocessor(TaskExecutor): self.nodeList.clear() def run(self): - print("Starting retrieve preprocessor...") + self.logger.info("Starting retrieve preprocessor...") self.setSourceQueueName("read_pending") self.setDestinationQueueName("read_ready") while True: @@ -53,4 +68,4 @@ class RetrievePreprocessor(TaskExecutor): self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.cleanup() - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/start_job_rpc_server.py b/transfer_service/start_job_rpc_server.py index 0979335..ed40b49 100644 --- a/transfer_service/start_job_rpc_server.py +++ b/transfer_service/start_job_rpc_server.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import json +import logging from redis_rpc_server import RedisRPCServer from config import Config @@ -14,17 +15,34 @@ class StartJobRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "start" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) - self.params = config.loadSection("scheduling") - self.maxPendingJobs = self.params.getint("max_pending_jobs") - self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") + self.maxTerminatedJobs = params.getint("max_terminated_jobs") + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") + params = config.loadSection("logging") + self.logger = logging.getLogger("StartJobRPCServer") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "start_job_rpc_server.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.pendingQueueRead = JobQueue("read_pending") self.terminatedQueueRead = JobQueue("read_terminated") super(StartJobRPCServer, self).__init__(host, port, db, rpcQueue) @@ -61,5 +79,5 @@ class StartJobRPCServer(RedisRPCServer): return response def run(self): - print(f"Starting RPC server of type {self.type}...") + self.logger.info(f"Starting RPC server of type {self.type}...") super(StartJobRPCServer, self).run() diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index 7345931..7faf672 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -13,12 +13,12 @@ class StorageRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "storage" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) params = config.loadSection("logging") diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 334c65f..90ba150 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import os +import logging import shutil import subprocess import sys @@ -15,24 +16,38 @@ from task_executor import TaskExecutor class StoreExecutor(TaskExecutor): def __init__(self): + self.type = "store_executor" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("spectrum_archive") - self.tapeClient = TapeClient(self.params["host"], - self.params.getint("port"), - self.params["user"], - self.params["pkey_file_path"]) - self.params = config.loadSection("transfer_node") - self.storageStorePath = self.params["store_path"] - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("spectrum_archive") + self.tapeClient = TapeClient(params["host"], + params.getint("port"), + params["user"], + params["pkey_file_path"]) + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 1) - self.params = config.loadSection("scheduling") - self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") + params = config.loadSection("scheduling") + self.maxTerminatedJobs = params.getint("max_terminated_jobs") + params = config.loadSection("logging") + self.logger = logging.getLogger("StoreExecutor") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "store_executor.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.jobObj = None self.jobId = None self.username = None @@ -87,7 +102,7 @@ class StoreExecutor(TaskExecutor): out.close() def run(self): - print("Starting store executor...") + self.logger.info("Starting store executor...") self.setSourceQueueName("write_ready") self.setDestinationQueueName("write_terminated") while True: @@ -118,4 +133,4 @@ class StoreExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 1e3da93..8c57f3a 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -6,6 +6,7 @@ # import json +import logging import os import re import shutil @@ -25,22 +26,36 @@ from task_executor import TaskExecutor class StorePreprocessor(TaskExecutor): def __init__(self): + self.type = "store_preprocessor" self.systemUtils = SystemUtils() self.md5calc = Checksum() config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_grouper") - self.fileGrouper = FileGrouper(self.params.getint("min_num_files"), - self.systemUtils.convertSizeToBytes(self.params["max_dir_size"])) - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_grouper") + self.fileGrouper = FileGrouper(params.getint("min_num_files"), + self.systemUtils.convertSizeToBytes(params["max_dir_size"])) + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 1) - self.params = config.loadSection("transfer_node") - self.storageStorePath = self.params["store_path"] + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("logging") + self.logger = logging.getLogger("StorePreprocessor") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "store_preprocessor.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.storageId = None self.storageType = None self.jobObj = None @@ -197,7 +212,7 @@ class StorePreprocessor(TaskExecutor): self.dbConn.setPhase(self.jobId, "QUEUED") def run(self): - print("Starting store preprocessor...") + self.logger.info("Starting store preprocessor...") self.setSourceQueueName("write_pending") self.setDestinationQueueName("write_ready") while True: @@ -215,7 +230,7 @@ class StorePreprocessor(TaskExecutor): self.nodeList.clear() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Test #sp = StorePreprocessor() diff --git a/transfer_service/system_utils.py b/transfer_service/system_utils.py index d586b4d..673e01e 100644 --- a/transfer_service/system_utils.py +++ b/transfer_service/system_utils.py @@ -41,14 +41,6 @@ class SystemUtils(object): def scanRecursive(self, path): dirList = [] fileList = [] - #if os.path.isfile(path): - # p = path - # while p != '/': - # p = os.path.dirname(p) - # dirList.append(p) - # dirList.reverse() - # fileList.append([os.path.abspath(path)]) - # return [ dirList, fileList ] for folder, subfolders, files in os.walk(path, topdown = True): cwd = os.path.basename(folder) if folder != path: @@ -83,10 +75,7 @@ class SystemUtils(object): return False else: return size * self.UNITS[unit] - - - - + # Test #sysUtils = SystemUtils() diff --git a/transfer_service/task_executor.py b/transfer_service/task_executor.py index 88d1988..6fb1a9f 100644 --- a/transfer_service/task_executor.py +++ b/transfer_service/task_executor.py @@ -12,11 +12,11 @@ class TaskExecutor(Process): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("scheduling") - self.maxPendingJobs = self.params.getint("max_pending_jobs") - self.maxReadyJobs = self.params.getint("max_ready_jobs") - self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") - self.execWaitTime = self.params.getint("exec_wait_time") + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") + self.maxReadyJobs = params.getint("max_ready_jobs") + self.maxTerminatedJobs = params.getint("max_terminated_jobs") + self.execWaitTime = params.getint("exec_wait_time") if self.execWaitTime < 10: self.execWaitTime = 10 self.srcQueue = None diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index 0b12228..7c458c2 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -10,35 +10,42 @@ class TransferService(object): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("job_cache") - self.cliHandler = CliHandler(self.params["host"], self.params.getint("port"), self.params.getint("db_sched")) - self.vosRestHandler = VOSpaceRestHandler(self.params["host"], self.params.getint("port"), self.params.getint("db_sched")) + params = config.loadSection("job_cache") + self.cliHandler = CliHandler(params["host"], params.getint("port"), params.getint("db_sched")) + self.vosRestHandler = VOSpaceRestHandler(params["host"], params.getint("port"), params.getint("db_sched")) self.jobScheduler = JobScheduler() # PullToVOSpace (via REST API) - self.vosRestHandler.addRPCServer('start', 'start_job_queue') - self.vosRestHandler.addRPCServer('poll', 'poll_job_queue') - self.vosRestHandler.addRPCServer('abort', 'abort_job_queue') + self.vosRestHandler.addRPCServer("start", "start_job_queue") + self.vosRestHandler.addRPCServer("poll", "poll_job_queue") + self.vosRestHandler.addRPCServer("abort", "abort_job_queue") # PushToVOSpace (via vos_data, the 'unofficial' command line client) - self.cliHandler.addRPCServer('data', 'data_queue') + self.cliHandler.addRPCServer("data", "data_queue") # Import - self.cliHandler.addRPCServer('import', 'import_queue') + self.cliHandler.addRPCServer("import", "import_queue") # Job - self.cliHandler.addRPCServer('job', 'job_queue') + self.cliHandler.addRPCServer("job", "job_queue") # Storage - self.cliHandler.addRPCServer('storage', 'storage_queue') + self.cliHandler.addRPCServer("storage", "storage_queue") + # + self.jobScheduler.addTaskExecutor("import_executor") + self.jobScheduler.addTaskExecutor("retrieve_preprocessor") + self.jobScheduler.addTaskExecutor("store_preprocessor") + self.jobScheduler.addTaskExecutor("retrieve_executor") + self.jobScheduler.addTaskExecutor("store_executor") + + + def start(self): # Startup self.jobScheduler.start() self.vosRestHandler.start() self.cliHandler.start() - - def start(self): - print("\nTransfer service is RUNNING...\n") + print("\nVOSpace Transfer Service is RUNNING...\n") ts = TransferService() diff --git a/transfer_service/vospace_rest_handler.py b/transfer_service/vospace_rest_handler.py index 3b653bb..ca74445 100644 --- a/transfer_service/vospace_rest_handler.py +++ b/transfer_service/vospace_rest_handler.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import sys +import time from start_job_rpc_server import StartJobRPCServer from get_job_rpc_server import GetJobRPCServer @@ -28,3 +29,9 @@ class VOSpaceRestHandler(object): def start(self): for srv in self.rpcServerList: srv.start() + running = False + while not running: + time.sleep(1) + running = True + for srv in self.rpcServerList: + running &= srv.is_alive() -- GitLab