diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py index df508f858dd7d382defa4a8186e6cd7824c52b31..651b43666ef8c73e930fcb20f98832cc66b8d22e 100644 --- a/transfer_service/abort_job_rpc_server.py +++ b/transfer_service/abort_job_rpc_server.py @@ -1,5 +1,7 @@ #!/usr/bin/env python +import logging + from redis_rpc_server import RedisRPCServer from db_connector import DbConnector from config import Config @@ -10,21 +12,38 @@ class AbortJobRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "abort" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") + params = config.loadSection("logging") + self.logger = logging.getLogger("AbortJobRPCServer") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "abort_job_rpc_server.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): #TODO # do something... - return 42 + pass def run(self): - print(f"Starting RPC server of type {self.type}...") + self.logger.info(f"Starting RPC server of type {self.type}...") super(AbortJobRPCServer, self).run() diff --git a/transfer_service/checksum.py b/transfer_service/checksum.py index 986d26844d541fb3e63274e65759342b91f8a4f2..540f6fdf8d2449a72167807469bb9c2283a7a218 100644 --- a/transfer_service/checksum.py +++ b/transfer_service/checksum.py @@ -13,9 +13,9 @@ class Checksum(object): def __init__(self): self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("checksum") - self.fileBufferSize = self.systemUtils.convertSizeToBytes(self.params["file_buffer_size"]) - self.md5FileSuffix = self.params["md5_file_suffix"] + params = config.loadSection("checksum") + self.fileBufferSize = self.systemUtils.convertSizeToBytes(params["file_buffer_size"]) + self.md5FileSuffix = params["md5_file_suffix"] def setFileBufferSize(fileBufferSize): """Sets the buffer size in bytes when reading a chunk of data.""" diff --git a/transfer_service/cli_handler.py b/transfer_service/cli_handler.py index 104e2ce1ae8fc08c1f50a7b3a892eea1b8926c71..bf8c8efbf729fa4cf2c113c886c43a5c487bec63 100644 --- a/transfer_service/cli_handler.py +++ b/transfer_service/cli_handler.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import sys +import time from data_rpc_server import DataRPCServer from import_rpc_server import ImportRPCServer @@ -31,3 +32,9 @@ class CliHandler(object): def start(self): for srv in self.rpcServerList: srv.start() + running = False + while not running: + time.sleep(1) + running = True + for srv in self.rpcServerList: + running &= srv.is_alive() diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py index 648260c6ae0d6a5e8cf7b7f3ee45aa6d4637ceee..1e03c82c85b29e353590e71d8a3bb494545837a4 100644 --- a/transfer_service/data_rpc_server.py +++ b/transfer_service/data_rpc_server.py @@ -25,18 +25,18 @@ class DataRPCServer(RedisRPCServer): self.type = "data" self.storeAck = False config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) - self.params = config.loadSection("transfer_node") - self.storageStorePath = self.params["store_path"] - self.params = config.loadSection("scheduling") - self.maxPendingJobs = self.params.getint("max_pending_jobs") + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") params = config.loadSection("logging") self.logger = logging.getLogger("DataRPCServer") logLevel = "logging." + params["log_level"] diff --git a/transfer_service/file_grouper.py b/transfer_service/file_grouper.py index 9f654009ecaf22f053f75ad7fffb879703bbee7c..0d68eaf1104f66f31aec7f1535d517929f15388a 100644 --- a/transfer_service/file_grouper.py +++ b/transfer_service/file_grouper.py @@ -20,7 +20,6 @@ class FileGrouper(object): """Sets the 'minimum number of files' constraint.""" self.minNumOfFiles = minNumOfFiles - def setMaxFolderSize(self, maxFolderSize): """Sets the 'maximum folder size' constraint.""" self.maxFolderSize = maxFolderSize diff --git a/transfer_service/get_job_rpc_server.py b/transfer_service/get_job_rpc_server.py index 7d06e6de7b792b84a5a7ac0d0a4f04779455e4f1..522c5b9b25f538a8d7e084c5ebb219b8f078e7fd 100644 --- a/transfer_service/get_job_rpc_server.py +++ b/transfer_service/get_job_rpc_server.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import json +import logging from redis_rpc_server import RedisRPCServer from config import Config @@ -12,14 +13,27 @@ class GetJobRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "poll" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) + params = config.loadSection("logging") + self.logger = logging.getLogger("GetJobRPCServer") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "get_job_rpc_server.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) super(GetJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): @@ -32,5 +46,5 @@ class GetJobRPCServer(RedisRPCServer): return 42 def run(self): - print(f"Starting RPC server of type {self.type}...") + self.logger.info(f"Starting RPC server of type {self.type}...") super(GetJobRPCServer, self).run() diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index 7496fe66461a195f2686b1920a71e198d058bf82..6762b273f65d45781dbec61b73d0719b6f85958d 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -19,6 +19,7 @@ from task_executor import TaskExecutor class ImportExecutor(TaskExecutor): def __init__(self): + self.type = "import_executor" self.md5calc = Checksum() config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("file_catalog") @@ -35,7 +36,7 @@ class ImportExecutor(TaskExecutor): params["user"], params["pkey_file_path"]) params = config.loadSection("logging") - self.logger = logging.getLogger("import_executor") + self.logger = logging.getLogger("ImportExecutor") logLevel = "logging." + params["log_level"] logDir = params["log_dir"] logFile = logDir + '/' + "import_executor.log" diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index ef7c702f9735e31bef60ed820e56f4e4c4254740..54254c359639fcb0bee3af4f1bf1041589d183ac 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -16,16 +16,16 @@ class ImportRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "import" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) - self.params = config.loadSection("scheduling") - self.maxReadyJobs = self.params.getint("max_ready_jobs") + params = config.loadSection("scheduling") + self.maxReadyJobs = params.getint("max_ready_jobs") params = config.loadSection("logging") self.logger = logging.getLogger("ImportRPCServer") logLevel = "logging." + params["log_level"] diff --git a/transfer_service/job_queue.py b/transfer_service/job_queue.py index 500d7ead0999189fa23b32ff74b001d813c0be5b..f7a5c0652efdaa5a7e3df67ec39a5b2643caab3e 100644 --- a/transfer_service/job_queue.py +++ b/transfer_service/job_queue.py @@ -15,10 +15,10 @@ class JobQueue(object): def __init__(self, queueName): config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("job_cache") - self.redisCli = redis.Redis(host = self.params["host"], - port = self.params["port"], - db = self.params["db_sched"]) + params = config.loadSection("job_cache") + self.redisCli = redis.Redis(host = params["host"], + port = params["port"], + db = params["db_sched"]) self.queueName = queueName def len(self): diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py index 59c300dce98c826ba2fd64b0353becbe7b93cb55..163b6c9e7523aaa712031f70555f332da97b26f8 100644 --- a/transfer_service/job_rpc_server.py +++ b/transfer_service/job_rpc_server.py @@ -13,12 +13,12 @@ class JobRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "job" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) params = config.loadSection("logging") diff --git a/transfer_service/job_scheduler.py b/transfer_service/job_scheduler.py index e5e5a014dca67d49f74971cd92d35e607a48cb0b..94b3d5d38e39229df11b7fa8e178bfefdef01833 100644 --- a/transfer_service/job_scheduler.py +++ b/transfer_service/job_scheduler.py @@ -1,5 +1,8 @@ #!/usr/bin/env python +import time +import sys + from import_executor import ImportExecutor from retrieve_preprocessor import RetrievePreprocessor from store_preprocessor import StorePreprocessor @@ -7,19 +10,31 @@ from retrieve_executor import RetrieveExecutor from store_executor import StoreExecutor - class JobScheduler(object): def __init__(self): - self.importExecutor = ImportExecutor() - self.retrievePreprocessor = RetrievePreprocessor() - self.storePreprocessor = StorePreprocessor() - self.retrieveExecutor = RetrieveExecutor() - self.storeExecutor = StoreExecutor() + self.taskExecutorList = [] + + def addTaskExecutor(self, taskExecType): + if taskExecType == "import_executor": + self.taskExecutorList.append(ImportExecutor()) + elif taskExecType == "retrieve_preprocessor": + self.taskExecutorList.append(RetrievePreprocessor()) + elif taskExecType == "store_preprocessor": + self.taskExecutorList.append(StorePreprocessor()) + elif taskExecType == "retrieve_executor": + self.taskExecutorList.append(RetrieveExecutor()) + elif taskExecType == "store_executor": + self.taskExecutorList.append(StoreExecutor()) + else: + sys.exit(f"FATAL: unknown server type {taskExecType}.") def start(self): - self.importExecutor.start() - self.retrievePreprocessor.start() - self.storePreprocessor.start() - self.retrieveExecutor.start() - self.storeExecutor.start() + for taskExec in self.taskExecutorList: + taskExec.start() + running = False + while not running: + time.sleep(1) + running = True + for taskExec in self.taskExecutorList: + running &= taskExec.is_alive() diff --git a/transfer_service/mailer.py b/transfer_service/mailer.py index 8156639583a3d95178a08d35340f8a1b1a67eb0b..911800f69573d1d740e5f9831ee1c5d40dff5318 100644 --- a/transfer_service/mailer.py +++ b/transfer_service/mailer.py @@ -19,7 +19,7 @@ class Mailer(object): self.smtpServer = params["smtp_server"] self.smtpPort = params.getint("smtp_port") self.sender = params["no_reply_email"] - self.logger = logging.getLogger("import_executor.mailer") + self.logger = logging.getLogger("ImportExecutor.Mailer") self.recipients = [] self.message = None diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 8fe711a59ef2fcbbd2425f0353ab080b22ca43f0..fdac7c86625be2a5f2b892cbdcc8a3bf1ae94d14 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -20,6 +20,7 @@ import json import os +import logging import subprocess import sys @@ -34,28 +35,42 @@ from task_executor import TaskExecutor class RetrieveExecutor(TaskExecutor): def __init__(self): + self.type = "retrieve_executor" self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("spectrum_archive") - self.tapeClient = TapeClient(self.params["host"], - self.params.getint("port"), - self.params["user"], - self.params["pkey_file_path"]) - self.tapePool = self.params["tape_pool"] - self.params = config.loadSection("transfer_node") - self.storageRetrievePath = self.params["retrieve_path"] - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("spectrum_archive") + self.tapeClient = TapeClient(params["host"], + params.getint("port"), + params["user"], + params["pkey_file_path"]) + self.tapePool = params["tape_pool"] + params = config.loadSection("transfer_node") + self.storageRetrievePath = params["retrieve_path"] + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 1) - self.params = config.loadSection("transfer") - self.maxBlockSize = self.systemUtils.convertSizeToBytes(self.params["block_size"]) - self.params = config.loadSection("scheduling") - self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") + params = config.loadSection("transfer") + self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"]) + params = config.loadSection("scheduling") + self.maxTerminatedJobs = params.getint("max_terminated_jobs") + params = config.loadSection("logging") + self.logger = logging.getLogger("RetrieveExecutor") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "retrieve_executor.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.storageType = None self.jobObj = None self.jobId = None @@ -261,7 +276,7 @@ class RetrieveExecutor(TaskExecutor): self.totalSize = 0 def run(self): - print("Starting retrieve executor...") + self.logger.info("Starting retrieve executor...") self.setSourceQueueName("read_ready") self.setDestinationQueueName("read_terminated") while True: @@ -287,4 +302,4 @@ class RetrieveExecutor(TaskExecutor): self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index ad5415980b30d85261e7ef4d490f3ecd74c9e461..fe26df480e5c634c41d065216e74c950a2cfaea2 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import json +import logging import os from config import Config @@ -11,15 +12,29 @@ from task_executor import TaskExecutor class RetrievePreprocessor(TaskExecutor): def __init__(self): + self.type = "retrieve_preprocessor" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 1) + params = config.loadSection("logging") + self.logger = logging.getLogger("RetrievePreprocessor") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "retrieve_preprocessor.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.jobObj = None self.nodeList = [] super(RetrievePreprocessor, self).__init__() @@ -42,7 +57,7 @@ class RetrievePreprocessor(TaskExecutor): self.nodeList.clear() def run(self): - print("Starting retrieve preprocessor...") + self.logger.info("Starting retrieve preprocessor...") self.setSourceQueueName("read_pending") self.setDestinationQueueName("read_ready") while True: @@ -53,4 +68,4 @@ class RetrievePreprocessor(TaskExecutor): self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.cleanup() - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/start_job_rpc_server.py b/transfer_service/start_job_rpc_server.py index 097933510891790b663d0324207d1cc21be958f9..ed40b4986dc7b208c8473e1fee203d7fa728b73f 100644 --- a/transfer_service/start_job_rpc_server.py +++ b/transfer_service/start_job_rpc_server.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import json +import logging from redis_rpc_server import RedisRPCServer from config import Config @@ -14,17 +15,34 @@ class StartJobRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "start" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) - self.params = config.loadSection("scheduling") - self.maxPendingJobs = self.params.getint("max_pending_jobs") - self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") + self.maxTerminatedJobs = params.getint("max_terminated_jobs") + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") + params = config.loadSection("logging") + self.logger = logging.getLogger("StartJobRPCServer") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "start_job_rpc_server.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.pendingQueueRead = JobQueue("read_pending") self.terminatedQueueRead = JobQueue("read_terminated") super(StartJobRPCServer, self).__init__(host, port, db, rpcQueue) @@ -61,5 +79,5 @@ class StartJobRPCServer(RedisRPCServer): return response def run(self): - print(f"Starting RPC server of type {self.type}...") + self.logger.info(f"Starting RPC server of type {self.type}...") super(StartJobRPCServer, self).run() diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index 73459311949327413acb25e2518b6354f1667969..7faf672d5afede78023a180cee58da0caf2566ee 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -13,12 +13,12 @@ class StorageRPCServer(RedisRPCServer): def __init__(self, host, port, db, rpcQueue): self.type = "storage" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 2) params = config.loadSection("logging") diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 334c65f6ae75e9ad6ff1e3fab7a611c09f7a4ceb..90ba150502007f28098d437c5c86a60e2d2e93d4 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import os +import logging import shutil import subprocess import sys @@ -15,24 +16,38 @@ from task_executor import TaskExecutor class StoreExecutor(TaskExecutor): def __init__(self): + self.type = "store_executor" config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("spectrum_archive") - self.tapeClient = TapeClient(self.params["host"], - self.params.getint("port"), - self.params["user"], - self.params["pkey_file_path"]) - self.params = config.loadSection("transfer_node") - self.storageStorePath = self.params["store_path"] - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("spectrum_archive") + self.tapeClient = TapeClient(params["host"], + params.getint("port"), + params["user"], + params["pkey_file_path"]) + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 1) - self.params = config.loadSection("scheduling") - self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") + params = config.loadSection("scheduling") + self.maxTerminatedJobs = params.getint("max_terminated_jobs") + params = config.loadSection("logging") + self.logger = logging.getLogger("StoreExecutor") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "store_executor.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.jobObj = None self.jobId = None self.username = None @@ -87,7 +102,7 @@ class StoreExecutor(TaskExecutor): out.close() def run(self): - print("Starting store executor...") + self.logger.info("Starting store executor...") self.setSourceQueueName("write_ready") self.setDestinationQueueName("write_terminated") while True: @@ -118,4 +133,4 @@ class StoreExecutor(TaskExecutor): self.destQueue.extractJob() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 1e3da933f2343c78a9d5d58ecf1ff2988dda4c9f..8c57f3a4cd1397f0d61b0f0af421b0eff7da8bc3 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -6,6 +6,7 @@ # import json +import logging import os import re import shutil @@ -25,22 +26,36 @@ from task_executor import TaskExecutor class StorePreprocessor(TaskExecutor): def __init__(self): + self.type = "store_preprocessor" self.systemUtils = SystemUtils() self.md5calc = Checksum() config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("file_grouper") - self.fileGrouper = FileGrouper(self.params.getint("min_num_files"), - self.systemUtils.convertSizeToBytes(self.params["max_dir_size"])) - self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), - self.params["db"], + params = config.loadSection("file_grouper") + self.fileGrouper = FileGrouper(params.getint("min_num_files"), + self.systemUtils.convertSizeToBytes(params["max_dir_size"])) + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], 1, 1) - self.params = config.loadSection("transfer_node") - self.storageStorePath = self.params["store_path"] + params = config.loadSection("transfer_node") + self.storageStorePath = params["store_path"] + params = config.loadSection("logging") + self.logger = logging.getLogger("StorePreprocessor") + logLevel = "logging." + params["log_level"] + logDir = params["log_dir"] + logFile = logDir + '/' + "store_preprocessor.log" + self.logger.setLevel(eval(logLevel)) + logFileHandler = logging.FileHandler(logFile) + logStreamHandler = logging.StreamHandler() + logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + logFileHandler.setFormatter(logFormatter) + logStreamHandler.setFormatter(logFormatter) + self.logger.addHandler(logFileHandler) + self.logger.addHandler(logStreamHandler) self.storageId = None self.storageType = None self.jobObj = None @@ -197,7 +212,7 @@ class StorePreprocessor(TaskExecutor): self.dbConn.setPhase(self.jobId, "QUEUED") def run(self): - print("Starting store preprocessor...") + self.logger.info("Starting store preprocessor...") self.setSourceQueueName("write_pending") self.setDestinationQueueName("write_ready") while True: @@ -215,7 +230,7 @@ class StorePreprocessor(TaskExecutor): self.nodeList.clear() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Test #sp = StorePreprocessor() diff --git a/transfer_service/system_utils.py b/transfer_service/system_utils.py index d586b4d7ac2e2322acded1daafbdeab240f25b97..673e01e15ee43e9197b8b2701ee86e79bf9f2484 100644 --- a/transfer_service/system_utils.py +++ b/transfer_service/system_utils.py @@ -41,14 +41,6 @@ class SystemUtils(object): def scanRecursive(self, path): dirList = [] fileList = [] - #if os.path.isfile(path): - # p = path - # while p != '/': - # p = os.path.dirname(p) - # dirList.append(p) - # dirList.reverse() - # fileList.append([os.path.abspath(path)]) - # return [ dirList, fileList ] for folder, subfolders, files in os.walk(path, topdown = True): cwd = os.path.basename(folder) if folder != path: @@ -83,10 +75,7 @@ class SystemUtils(object): return False else: return size * self.UNITS[unit] - - - - + # Test #sysUtils = SystemUtils() diff --git a/transfer_service/task_executor.py b/transfer_service/task_executor.py index 88d198842301155bdcdf409c743a46f544306431..6fb1a9feeeed1ae0e33717ba488717d9ddb90434 100644 --- a/transfer_service/task_executor.py +++ b/transfer_service/task_executor.py @@ -12,11 +12,11 @@ class TaskExecutor(Process): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("scheduling") - self.maxPendingJobs = self.params.getint("max_pending_jobs") - self.maxReadyJobs = self.params.getint("max_ready_jobs") - self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") - self.execWaitTime = self.params.getint("exec_wait_time") + params = config.loadSection("scheduling") + self.maxPendingJobs = params.getint("max_pending_jobs") + self.maxReadyJobs = params.getint("max_ready_jobs") + self.maxTerminatedJobs = params.getint("max_terminated_jobs") + self.execWaitTime = params.getint("exec_wait_time") if self.execWaitTime < 10: self.execWaitTime = 10 self.srcQueue = None diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index 0b12228412580629176d5785344c022480990075..7c458c2c4c6fa021392d32a7081ec04e9570db0b 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -10,35 +10,42 @@ class TransferService(object): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("job_cache") - self.cliHandler = CliHandler(self.params["host"], self.params.getint("port"), self.params.getint("db_sched")) - self.vosRestHandler = VOSpaceRestHandler(self.params["host"], self.params.getint("port"), self.params.getint("db_sched")) + params = config.loadSection("job_cache") + self.cliHandler = CliHandler(params["host"], params.getint("port"), params.getint("db_sched")) + self.vosRestHandler = VOSpaceRestHandler(params["host"], params.getint("port"), params.getint("db_sched")) self.jobScheduler = JobScheduler() # PullToVOSpace (via REST API) - self.vosRestHandler.addRPCServer('start', 'start_job_queue') - self.vosRestHandler.addRPCServer('poll', 'poll_job_queue') - self.vosRestHandler.addRPCServer('abort', 'abort_job_queue') + self.vosRestHandler.addRPCServer("start", "start_job_queue") + self.vosRestHandler.addRPCServer("poll", "poll_job_queue") + self.vosRestHandler.addRPCServer("abort", "abort_job_queue") # PushToVOSpace (via vos_data, the 'unofficial' command line client) - self.cliHandler.addRPCServer('data', 'data_queue') + self.cliHandler.addRPCServer("data", "data_queue") # Import - self.cliHandler.addRPCServer('import', 'import_queue') + self.cliHandler.addRPCServer("import", "import_queue") # Job - self.cliHandler.addRPCServer('job', 'job_queue') + self.cliHandler.addRPCServer("job", "job_queue") # Storage - self.cliHandler.addRPCServer('storage', 'storage_queue') + self.cliHandler.addRPCServer("storage", "storage_queue") + # + self.jobScheduler.addTaskExecutor("import_executor") + self.jobScheduler.addTaskExecutor("retrieve_preprocessor") + self.jobScheduler.addTaskExecutor("store_preprocessor") + self.jobScheduler.addTaskExecutor("retrieve_executor") + self.jobScheduler.addTaskExecutor("store_executor") + + + def start(self): # Startup self.jobScheduler.start() self.vosRestHandler.start() self.cliHandler.start() - - def start(self): - print("\nTransfer service is RUNNING...\n") + print("\nVOSpace Transfer Service is RUNNING...\n") ts = TransferService() diff --git a/transfer_service/vospace_rest_handler.py b/transfer_service/vospace_rest_handler.py index 3b653bb82abf244f82515d9a76caaf630ac3cf8b..ca74445848e6c564ee037f88b575c2cf28e8e1bc 100644 --- a/transfer_service/vospace_rest_handler.py +++ b/transfer_service/vospace_rest_handler.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import sys +import time from start_job_rpc_server import StartJobRPCServer from get_job_rpc_server import GetJobRPCServer @@ -28,3 +29,9 @@ class VOSpaceRestHandler(object): def start(self): for srv in self.rpcServerList: srv.start() + running = False + while not running: + time.sleep(1) + running = True + for srv in self.rpcServerList: + running &= srv.is_alive()