diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py index 651b43666ef8c73e930fcb20f98832cc66b8d22e..35395779a2ce176794b4a670d770b00aab1576c4 100644 --- a/transfer_service/abort_job_rpc_server.py +++ b/transfer_service/abort_job_rpc_server.py @@ -2,6 +2,7 @@ import logging +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from db_connector import DbConnector from config import Config @@ -25,17 +26,16 @@ class AbortJobRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxPendingJobs = params.getint("max_pending_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("AbortJobRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "abort_job_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/config/vos_ts.conf b/transfer_service/config/vos_ts.conf index 1a74264481cb01c42d33fa4afe1e2ee9d8df05c9..167bf8d77d70c12f3b79d8657c069a6f28b73df0 100644 --- a/transfer_service/config/vos_ts.conf +++ b/transfer_service/config/vos_ts.conf @@ -87,8 +87,14 @@ admin_email = cristiano.urban@inaf.it ; log level, allowed values are: DEBUG, INFO, WARNING, ERROR, CRITICAL ; Default level is INFO log_level = DEBUG +; format of log records +log_format = %(asctime)s - %(name)s - %(levelname)s - %(message)s +; log queue name +log_queue = vos_ts_logs ; physical path on disk where log files are stored log_dir = /var/log/vos_ts +; log file name +log_file = vos_ts.log ; dir to store results of storage and import operations res_dir = ${log_dir}/results diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py index 4e38e7a577ebac885b3f1925a10fe6579543c379..d455f52fcb75f6e85bb7167747788a9744639576 100644 --- a/transfer_service/data_rpc_server.py +++ b/transfer_service/data_rpc_server.py @@ -11,6 +11,7 @@ import logging import os import sys +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -37,17 +38,16 @@ class DataRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxPendingJobs = params.getint("max_pending_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("DataRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "data_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.pendingQueueWrite = JobQueue("write_pending") self.systemUtils = SystemUtils() diff --git a/transfer_service/get_job_rpc_server.py b/transfer_service/get_job_rpc_server.py index 522c5b9b25f538a8d7e084c5ebb219b8f078e7fd..1d3cb283eb0cc78a4fdb46a1b484ed48b41c29c3 100644 --- a/transfer_service/get_job_rpc_server.py +++ b/transfer_service/get_job_rpc_server.py @@ -3,6 +3,7 @@ import json import logging +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -22,17 +23,16 @@ class GetJobRPCServer(RedisRPCServer): 1, 2) params = config.loadSection("logging") - self.logger = logging.getLogger("GetJobRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "get_job_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) super(GetJobRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/group_rw_executor.py b/transfer_service/group_rw_executor.py index bd9403b57d24bcb74ef87b560fb2f43f1fa15e7b..b801f2d962b75a05e55a9350c01a0182e7389f6a 100644 --- a/transfer_service/group_rw_executor.py +++ b/transfer_service/group_rw_executor.py @@ -8,6 +8,7 @@ from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from node import Node +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tabulate import tabulate from task_executor import TaskExecutor @@ -29,17 +30,16 @@ class GroupRwExecutor(TaskExecutor): params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") - self.logger = logging.getLogger("GroupRwExecutor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "group_rw_executor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] self.systemUtils = SystemUtils() diff --git a/transfer_service/group_rw_rpc_server.py b/transfer_service/group_rw_rpc_server.py index 97a163bb143d55d6c1d0fde1de7470e3b07d7389..82502bf8690d8b6047e4120142792001ed3779e8 100644 --- a/transfer_service/group_rw_rpc_server.py +++ b/transfer_service/group_rw_rpc_server.py @@ -6,6 +6,7 @@ from config import Config from db_connector import DbConnector from job_queue import JobQueue from job import Job +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer @@ -25,17 +26,16 @@ class GroupRwRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxReadyJobs = params.getint("max_ready_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("GroupRwRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "group_rw_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.groupRwReadyQueue = JobQueue("group_rw_ready") super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index e2ef62c6f8627c2d965ccfd684f3154a6d1ec26f..caa1b20be8713656ecd41ee7e6e8d013b7623fbd 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -10,6 +10,7 @@ from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from node import Node +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tabulate import tabulate from tape_client import TapeClient @@ -38,17 +39,16 @@ class ImportExecutor(TaskExecutor): params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") - self.logger = logging.getLogger("ImportExecutor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "import_executor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] self.systemUtils = SystemUtils() diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index 54254c359639fcb0bee3af4f1bf1041589d183ac..2766a6d8fba0f8104a6363ad8d84fd6472121ecd 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -7,6 +7,7 @@ from config import Config from db_connector import DbConnector from job_queue import JobQueue from job import Job +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from system_utils import SystemUtils @@ -27,17 +28,16 @@ class ImportRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxReadyJobs = params.getint("max_ready_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("ImportRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "import_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.importReadyQueue = JobQueue("import_ready") self.systemUtils = SystemUtils() diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py index 163b6c9e7523aaa712031f70555f332da97b26f8..f71100b380734b1bd3bcf9d2db75c9b2d0afc14c 100644 --- a/transfer_service/job_rpc_server.py +++ b/transfer_service/job_rpc_server.py @@ -3,6 +3,7 @@ import logging import os +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -22,17 +23,16 @@ class JobRPCServer(RedisRPCServer): 1, 2) params = config.loadSection("logging") - self.logger = logging.getLogger("JobRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "job_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) super(JobRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/log_listener.py b/transfer_service/log_listener.py new file mode 100644 index 0000000000000000000000000000000000000000..165babf7417b74d0a132ab553a7372eec321501a --- /dev/null +++ b/transfer_service/log_listener.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +import os +import redis +import time + +from multiprocessing import Process + +from config import Config + + +class LogListener(Process): + + def __init__(self): + config = Config("/etc/vos_ts/vos_ts.conf") + params = config.loadSection("job_cache") + self.redisCli = redis.Redis(host = params["host"], + port = params["port"], + db = params["db_sched"]) + params = config.loadSection("logging") + logDir = params["log_dir"] + logFile = params["log_file"] + self.logQueue = params["log_queue"] + self.logFilePath = os.path.join(logDir, logFile) + super(LogListener, self).__init__() + + def run(self): + if os.path.exists(self.logFilePath): + os.remove(self.logFilePath) + while True: + time.sleep(1) + while self.redisCli.llen(self.logQueue) > 0: + lfp = open(self.logFilePath, 'a') + logRecord = self.redisCli.brpop(self.logQueue)[1].decode("utf-8") + lfp.write(logRecord + '\n') + lfp.close() diff --git a/transfer_service/mailer.py b/transfer_service/mailer.py index e345fc885c126298c1e2de6618938f1c6765b18e..2d5144283a78231508bdbd65a8eb6eebd816e0f8 100644 --- a/transfer_service/mailer.py +++ b/transfer_service/mailer.py @@ -7,6 +7,7 @@ import smtplib from config import Config from email.message import EmailMessage from email.policy import SMTP +from redis_log_handler import RedisLogHandler from smtplib import SMTPConnectError from smtplib import SMTPException @@ -19,7 +20,18 @@ class Mailer(object): self.smtpServer = params["smtp_server"] self.smtpPort = params.getint("smtp_port") self.sender = params["no_reply_email"] - self.logger = logging.getLogger("Mailer") + params = config.loadSection("logging") + self.logger = logging.getLogger(__name__) + logLevel = "logging." + params["log_level"] + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) + self.logger.setLevel(eval(logLevel)) + redisLogHandler = RedisLogHandler() + logStreamHandler = logging.StreamHandler() + logStreamHandler.setFormatter(logFormatter) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) + self.logger.addHandler(logStreamHandler) self.recipients = [] self.message = None diff --git a/transfer_service/redis_log_handler.py b/transfer_service/redis_log_handler.py new file mode 100644 index 0000000000000000000000000000000000000000..fe6a94de759f71a2d1bc3f30a7f4cf3ab5a02ece --- /dev/null +++ b/transfer_service/redis_log_handler.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python + +import redis +import logging + +from config import Config + + +class RedisLogHandler(object): + """ + Log handler for sending log records to a Redis queue. + """ + def __init__(self): + config = Config("/etc/vos_ts/vos_ts.conf") + params = config.loadSection("job_cache") + self.redisCli = redis.Redis(host = params["host"], + port = params["port"], + db = params["db_sched"]) + params = config.loadSection("logging") + self.logQueue = params["log_queue"] + logLevel = "logging." + params["log_level"] + self.level = eval(logLevel) + self.formatter = logging.Formatter() + + def handle(self, record): + try: + self.redisCli.lpush(self.logQueue, self.formatter.format(record)) + except: + # Redis is not responding... + pass + + def setFormatter(self, formatter): + self.formatter = formatter diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 65141a0c9a0139d3010e263506a298f88b336121..640ab769fee4166c8ab5a119800c90c629dd7206 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -27,6 +27,7 @@ import sys from checksum import Checksum from config import Config from db_connector import DbConnector +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tape_client import TapeClient from task_executor import TaskExecutor @@ -59,17 +60,16 @@ class RetrieveExecutor(TaskExecutor): params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("RetrieveExecutor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "retrieve_executor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.storageType = None self.jobObj = None diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 603e914d9b821b8fe3c4975eb83f0b0f2728a86e..85e13b32c372c489911366f6b6b4e135b3d07eda 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -6,6 +6,7 @@ import os from config import Config from db_connector import DbConnector +from redis_log_handler import RedisLogHandler from task_executor import TaskExecutor @@ -23,17 +24,16 @@ class RetrievePreprocessor(TaskExecutor): 1, 1) params = config.loadSection("logging") - self.logger = logging.getLogger("RetrievePreprocessor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "retrieve_preprocessor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.jobObj = None self.nodeList = [] diff --git a/transfer_service/start_job_rpc_server.py b/transfer_service/start_job_rpc_server.py index ed40b4986dc7b208c8473e1fee203d7fa728b73f..01fc7d894d223cfebea378e1c9562ee003490335 100644 --- a/transfer_service/start_job_rpc_server.py +++ b/transfer_service/start_job_rpc_server.py @@ -3,6 +3,7 @@ import json import logging +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -31,17 +32,16 @@ class StartJobRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxPendingJobs = params.getint("max_pending_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("StartJobRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "start_job_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.pendingQueueRead = JobQueue("read_pending") self.terminatedQueueRead = JobQueue("read_terminated") diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index 7faf672d5afede78023a180cee58da0caf2566ee..fd1afea8c100c83565fb599afadec02504b2eadd 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -3,6 +3,7 @@ import logging import os +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -22,17 +23,16 @@ class StorageRPCServer(RedisRPCServer): 1, 2) params = config.loadSection("logging") - self.logger = logging.getLogger("StorageRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "storage_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 8c9f985ba159385d2f17f62f062f42b66a465bce..45fbf107bad62026584a42e02bde3d2be14051d9 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -8,6 +8,7 @@ import sys from config import Config from db_connector import DbConnector +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tape_client import TapeClient from task_executor import TaskExecutor @@ -36,17 +37,16 @@ class StoreExecutor(TaskExecutor): params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("StoreExecutor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "store_executor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.jobObj = None self.jobId = None diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 9ed60f6e8999f01998f8e7a236b57c0a00e84db7..8e363f9ebb2b1a10032b271a39a8d88589a0b2cf 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -19,6 +19,7 @@ from config import Config from file_grouper import FileGrouper from db_connector import DbConnector from node import Node +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from task_executor import TaskExecutor @@ -44,17 +45,16 @@ class StorePreprocessor(TaskExecutor): params = config.loadSection("transfer_node") self.storageStorePath = params["store_path"] params = config.loadSection("logging") - self.logger = logging.getLogger("StorePreprocessor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "store_preprocessor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.storageId = None self.storageType = None diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index 7101f2a57cc88b0e41b5afcfee95d8e271e73e15..a61ce4725eb33a062b32aa89a32ba254ffc0448c 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -3,6 +3,7 @@ from config import Config from cli_handler import CliHandler from job_scheduler import JobScheduler +from log_listener import LogListener from vospace_rest_handler import VOSpaceRestHandler @@ -43,9 +44,12 @@ class TransferService(object): self.jobScheduler.addTaskExecutor("retrieve_executor") self.jobScheduler.addTaskExecutor("store_executor") + # Log listener + self.logListener = LogListener() def start(self): # Startup + self.logListener.start() self.jobScheduler.start() self.vosRestHandler.start() self.cliHandler.start()