From fef57b55c8a59a1b1323896d774174cbce0e6ab0 Mon Sep 17 00:00:00 2001 From: Cristiano Urban <cristiano.urban@inaf.it> Date: Mon, 28 Jun 2021 14:54:24 +0200 Subject: [PATCH] Added centralized logging mechanism. Signed-off-by: Cristiano Urban <cristiano.urban@inaf.it> --- transfer_service/abort_job_rpc_server.py | 14 ++++----- transfer_service/config/vos_ts.conf | 6 ++++ transfer_service/data_rpc_server.py | 14 ++++----- transfer_service/get_job_rpc_server.py | 14 ++++----- transfer_service/group_rw_executor.py | 14 ++++----- transfer_service/group_rw_rpc_server.py | 14 ++++----- transfer_service/import_executor.py | 14 ++++----- transfer_service/import_rpc_server.py | 14 ++++----- transfer_service/job_rpc_server.py | 14 ++++----- transfer_service/log_listener.py | 36 +++++++++++++++++++++++ transfer_service/mailer.py | 14 ++++++++- transfer_service/redis_log_handler.py | 33 +++++++++++++++++++++ transfer_service/retrieve_executor.py | 14 ++++----- transfer_service/retrieve_preprocessor.py | 14 ++++----- transfer_service/start_job_rpc_server.py | 14 ++++----- transfer_service/storage_rpc_server.py | 14 ++++----- transfer_service/store_executor.py | 14 ++++----- transfer_service/store_preprocessor.py | 14 ++++----- transfer_service/transfer_service.py | 4 +++ 19 files changed, 190 insertions(+), 99 deletions(-) create mode 100644 transfer_service/log_listener.py create mode 100644 transfer_service/redis_log_handler.py diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py index 651b436..3539577 100644 --- a/transfer_service/abort_job_rpc_server.py +++ b/transfer_service/abort_job_rpc_server.py @@ -2,6 +2,7 @@ import logging +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from db_connector import DbConnector from config import Config @@ -25,17 +26,16 @@ class AbortJobRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxPendingJobs = params.getint("max_pending_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("AbortJobRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "abort_job_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/config/vos_ts.conf b/transfer_service/config/vos_ts.conf index 1a74264..167bf8d 100644 --- a/transfer_service/config/vos_ts.conf +++ b/transfer_service/config/vos_ts.conf @@ -87,8 +87,14 @@ admin_email = cristiano.urban@inaf.it ; log level, allowed values are: DEBUG, INFO, WARNING, ERROR, CRITICAL ; Default level is INFO log_level = DEBUG +; format of log records +log_format = %(asctime)s - %(name)s - %(levelname)s - %(message)s +; log queue name +log_queue = vos_ts_logs ; physical path on disk where log files are stored log_dir = /var/log/vos_ts +; log file name +log_file = vos_ts.log ; dir to store results of storage and import operations res_dir = ${log_dir}/results diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py index 4e38e7a..d455f52 100644 --- a/transfer_service/data_rpc_server.py +++ b/transfer_service/data_rpc_server.py @@ -11,6 +11,7 @@ import logging import os import sys +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -37,17 +38,16 @@ class DataRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxPendingJobs = params.getint("max_pending_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("DataRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "data_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.pendingQueueWrite = JobQueue("write_pending") self.systemUtils = SystemUtils() diff --git a/transfer_service/get_job_rpc_server.py b/transfer_service/get_job_rpc_server.py index 522c5b9..1d3cb28 100644 --- a/transfer_service/get_job_rpc_server.py +++ b/transfer_service/get_job_rpc_server.py @@ -3,6 +3,7 @@ import json import logging +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -22,17 +23,16 @@ class GetJobRPCServer(RedisRPCServer): 1, 2) params = config.loadSection("logging") - self.logger = logging.getLogger("GetJobRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "get_job_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) super(GetJobRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/group_rw_executor.py b/transfer_service/group_rw_executor.py index bd9403b..b801f2d 100644 --- a/transfer_service/group_rw_executor.py +++ b/transfer_service/group_rw_executor.py @@ -8,6 +8,7 @@ from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from node import Node +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tabulate import tabulate from task_executor import TaskExecutor @@ -29,17 +30,16 @@ class GroupRwExecutor(TaskExecutor): params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") - self.logger = logging.getLogger("GroupRwExecutor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "group_rw_executor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] self.systemUtils = SystemUtils() diff --git a/transfer_service/group_rw_rpc_server.py b/transfer_service/group_rw_rpc_server.py index 97a163b..82502bf 100644 --- a/transfer_service/group_rw_rpc_server.py +++ b/transfer_service/group_rw_rpc_server.py @@ -6,6 +6,7 @@ from config import Config from db_connector import DbConnector from job_queue import JobQueue from job import Job +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer @@ -25,17 +26,16 @@ class GroupRwRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxReadyJobs = params.getint("max_ready_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("GroupRwRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "group_rw_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.groupRwReadyQueue = JobQueue("group_rw_ready") super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index e2ef62c..caa1b20 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -10,6 +10,7 @@ from datetime import datetime as dt from db_connector import DbConnector from mailer import Mailer from node import Node +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tabulate import tabulate from tape_client import TapeClient @@ -38,17 +39,16 @@ class ImportExecutor(TaskExecutor): params = config.loadSection("mail") self.adminEmail = params["admin_email"] params = config.loadSection("logging") - self.logger = logging.getLogger("ImportExecutor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "import_executor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.resDir = params["res_dir"] self.systemUtils = SystemUtils() diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index 54254c3..2766a6d 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -7,6 +7,7 @@ from config import Config from db_connector import DbConnector from job_queue import JobQueue from job import Job +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from system_utils import SystemUtils @@ -27,17 +28,16 @@ class ImportRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxReadyJobs = params.getint("max_ready_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("ImportRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "import_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.importReadyQueue = JobQueue("import_ready") self.systemUtils = SystemUtils() diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py index 163b6c9..f71100b 100644 --- a/transfer_service/job_rpc_server.py +++ b/transfer_service/job_rpc_server.py @@ -3,6 +3,7 @@ import logging import os +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -22,17 +23,16 @@ class JobRPCServer(RedisRPCServer): 1, 2) params = config.loadSection("logging") - self.logger = logging.getLogger("JobRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "job_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) super(JobRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/log_listener.py b/transfer_service/log_listener.py new file mode 100644 index 0000000..165babf --- /dev/null +++ b/transfer_service/log_listener.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +import os +import redis +import time + +from multiprocessing import Process + +from config import Config + + +class LogListener(Process): + + def __init__(self): + config = Config("/etc/vos_ts/vos_ts.conf") + params = config.loadSection("job_cache") + self.redisCli = redis.Redis(host = params["host"], + port = params["port"], + db = params["db_sched"]) + params = config.loadSection("logging") + logDir = params["log_dir"] + logFile = params["log_file"] + self.logQueue = params["log_queue"] + self.logFilePath = os.path.join(logDir, logFile) + super(LogListener, self).__init__() + + def run(self): + if os.path.exists(self.logFilePath): + os.remove(self.logFilePath) + while True: + time.sleep(1) + while self.redisCli.llen(self.logQueue) > 0: + lfp = open(self.logFilePath, 'a') + logRecord = self.redisCli.brpop(self.logQueue)[1].decode("utf-8") + lfp.write(logRecord + '\n') + lfp.close() diff --git a/transfer_service/mailer.py b/transfer_service/mailer.py index e345fc8..2d51442 100644 --- a/transfer_service/mailer.py +++ b/transfer_service/mailer.py @@ -7,6 +7,7 @@ import smtplib from config import Config from email.message import EmailMessage from email.policy import SMTP +from redis_log_handler import RedisLogHandler from smtplib import SMTPConnectError from smtplib import SMTPException @@ -19,7 +20,18 @@ class Mailer(object): self.smtpServer = params["smtp_server"] self.smtpPort = params.getint("smtp_port") self.sender = params["no_reply_email"] - self.logger = logging.getLogger("Mailer") + params = config.loadSection("logging") + self.logger = logging.getLogger(__name__) + logLevel = "logging." + params["log_level"] + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) + self.logger.setLevel(eval(logLevel)) + redisLogHandler = RedisLogHandler() + logStreamHandler = logging.StreamHandler() + logStreamHandler.setFormatter(logFormatter) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) + self.logger.addHandler(logStreamHandler) self.recipients = [] self.message = None diff --git a/transfer_service/redis_log_handler.py b/transfer_service/redis_log_handler.py new file mode 100644 index 0000000..fe6a94d --- /dev/null +++ b/transfer_service/redis_log_handler.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python + +import redis +import logging + +from config import Config + + +class RedisLogHandler(object): + """ + Log handler for sending log records to a Redis queue. + """ + def __init__(self): + config = Config("/etc/vos_ts/vos_ts.conf") + params = config.loadSection("job_cache") + self.redisCli = redis.Redis(host = params["host"], + port = params["port"], + db = params["db_sched"]) + params = config.loadSection("logging") + self.logQueue = params["log_queue"] + logLevel = "logging." + params["log_level"] + self.level = eval(logLevel) + self.formatter = logging.Formatter() + + def handle(self, record): + try: + self.redisCli.lpush(self.logQueue, self.formatter.format(record)) + except: + # Redis is not responding... + pass + + def setFormatter(self, formatter): + self.formatter = formatter diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 65141a0..640ab76 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -27,6 +27,7 @@ import sys from checksum import Checksum from config import Config from db_connector import DbConnector +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tape_client import TapeClient from task_executor import TaskExecutor @@ -59,17 +60,16 @@ class RetrieveExecutor(TaskExecutor): params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("RetrieveExecutor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "retrieve_executor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.storageType = None self.jobObj = None diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 603e914..85e13b3 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -6,6 +6,7 @@ import os from config import Config from db_connector import DbConnector +from redis_log_handler import RedisLogHandler from task_executor import TaskExecutor @@ -23,17 +24,16 @@ class RetrievePreprocessor(TaskExecutor): 1, 1) params = config.loadSection("logging") - self.logger = logging.getLogger("RetrievePreprocessor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "retrieve_preprocessor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.jobObj = None self.nodeList = [] diff --git a/transfer_service/start_job_rpc_server.py b/transfer_service/start_job_rpc_server.py index ed40b49..01fc7d8 100644 --- a/transfer_service/start_job_rpc_server.py +++ b/transfer_service/start_job_rpc_server.py @@ -3,6 +3,7 @@ import json import logging +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -31,17 +32,16 @@ class StartJobRPCServer(RedisRPCServer): params = config.loadSection("scheduling") self.maxPendingJobs = params.getint("max_pending_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("StartJobRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "start_job_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.pendingQueueRead = JobQueue("read_pending") self.terminatedQueueRead = JobQueue("read_terminated") diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index 7faf672..fd1afea 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -3,6 +3,7 @@ import logging import os +from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector @@ -22,17 +23,16 @@ class StorageRPCServer(RedisRPCServer): 1, 2) params = config.loadSection("logging") - self.logger = logging.getLogger("StorageRPCServer") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "storage_rpc_server.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 8c9f985..45fbf10 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -8,6 +8,7 @@ import sys from config import Config from db_connector import DbConnector +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from tape_client import TapeClient from task_executor import TaskExecutor @@ -36,17 +37,16 @@ class StoreExecutor(TaskExecutor): params = config.loadSection("scheduling") self.maxTerminatedJobs = params.getint("max_terminated_jobs") params = config.loadSection("logging") - self.logger = logging.getLogger("StoreExecutor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "store_executor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.jobObj = None self.jobId = None diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 9ed60f6..8e363f9 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -19,6 +19,7 @@ from config import Config from file_grouper import FileGrouper from db_connector import DbConnector from node import Node +from redis_log_handler import RedisLogHandler from system_utils import SystemUtils from task_executor import TaskExecutor @@ -44,17 +45,16 @@ class StorePreprocessor(TaskExecutor): params = config.loadSection("transfer_node") self.storageStorePath = params["store_path"] params = config.loadSection("logging") - self.logger = logging.getLogger("StorePreprocessor") + self.logger = logging.getLogger(__name__) logLevel = "logging." + params["log_level"] - logDir = params["log_dir"] - logFile = logDir + '/' + "store_preprocessor.log" + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) self.logger.setLevel(eval(logLevel)) - logFileHandler = logging.FileHandler(logFile) + redisLogHandler = RedisLogHandler() logStreamHandler = logging.StreamHandler() - logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - logFileHandler.setFormatter(logFormatter) logStreamHandler.setFormatter(logFormatter) - self.logger.addHandler(logFileHandler) + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) self.logger.addHandler(logStreamHandler) self.storageId = None self.storageType = None diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index 7101f2a..a61ce47 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -3,6 +3,7 @@ from config import Config from cli_handler import CliHandler from job_scheduler import JobScheduler +from log_listener import LogListener from vospace_rest_handler import VOSpaceRestHandler @@ -43,9 +44,12 @@ class TransferService(object): self.jobScheduler.addTaskExecutor("retrieve_executor") self.jobScheduler.addTaskExecutor("store_executor") + # Log listener + self.logListener = LogListener() def start(self): # Startup + self.logListener.start() self.jobScheduler.start() self.vosRestHandler.start() self.cliHandler.start() -- GitLab