Commit 46e09fb1 authored by Cristiano Urban

Minor changes.

parent f458d848
Showing with 71 additions and 72 deletions
@@ -32,11 +32,11 @@ class AbortJobRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue)

     def callback(self, requestBody):
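Every handler-wiring hunk in this commit makes the same change: the console StreamHandler is commented out, leaving the Redis-backed handler as the only destination for log records. A minimal sketch of the resulting setup, assuming a RedisLogHandler instance like the one shown later in this diff (the helper function and format string are illustrative, not part of the commit):

    import logging

    def setup_logger(redisLogHandler, logFormat="%(asctime)s - %(levelname)s - %(message)s"):
        # Attach only the Redis handler; the StreamHandler lines this commit
        # comments out would otherwise duplicate every record to the console.
        logger = logging.getLogger("vos_ts")
        logger.setLevel(logging.INFO)
        logFormatter = logging.Formatter(logFormat)
        redisLogHandler.setFormatter(logFormatter)
        logger.addHandler(redisLogHandler)
        #logStreamHandler = logging.StreamHandler()
        #logStreamHandler.setFormatter(logFormatter)
        #logger.addHandler(logStreamHandler)
        return logger

With only the Redis handler attached, each process ships its records to a shared queue, which, judging by the LogListener hunk further down, a single process drains into one log file.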
@@ -39,11 +39,11 @@ class DataRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.pendingQueueWrite = JobQueue("write_pending")
         self.systemUtils = SystemUtils()
         super(DataRPCServer, self).__init__(host, port, db, rpcQueue)
@@ -61,7 +61,11 @@ class FileGrouper(object):
         cwd = os.getcwd()
         parent = os.path.dirname(folder)
         os.chdir(parent)
+        try:
             sp = subprocess.run(["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)], capture_output = True)
+        except Exception:
+            raise
+        else:
             if(sp.returncode or sp.stderr):
                 raise(TarFileCreationException(folder))
             else:
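The FileGrouper hunk above wraps the tar invocation so that a failure to spawn the process propagates, while a non-zero exit code or any stderr output raises TarFileCreationException. A self-contained sketch of the same check, with a stand-in exception class and a cwd argument in place of the surrounding os.chdir() calls:

    import os
    import subprocess

    class TarFileCreationException(Exception):
        """Stand-in for the project's exception class."""

    def tar_folder(folder):
        parent = os.path.dirname(folder)
        name = os.path.basename(folder)
        # Run tar from the parent directory so the archive stores relative paths.
        sp = subprocess.run(["tar", "-cf", name + ".tar", name],
                            capture_output=True, cwd=parent)
        # tar reports failure through its exit code; warnings land on stderr.
        if sp.returncode or sp.stderr:
            raise TarFileCreationException(folder)
        return os.path.join(parent, name + ".tar")

Note that treating any stderr output as fatal matches the code in the hunk: a tar warning fails the whole grouping step.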
@@ -36,11 +36,11 @@ class GroupRwExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.resDir = params["res_dir"]
         self.systemUtils = SystemUtils()
         self.jobObj = None
@@ -32,11 +32,11 @@ class GroupRwRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.groupRwReadyQueue = JobQueue("group_rw_ready")
         super(GroupRwRPCServer, self).__init__(host, port, db, rpcQueue)
@@ -40,11 +40,11 @@ class ImportExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.resDir = params["res_dir"]
         params = config.loadSection("spectrum_archive")
         self.tapeClient = TapeClient(params["host"],
@@ -72,10 +72,8 @@ class ImportExecutor(TaskExecutor):
         nodeList = []
         timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
         nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)
-        #nodeListFile = self.resDir + '/' + "vos_import_report-" + timestamp
         nlfp = open(nodeListFile, "w")
-        #out = open("import_amqp_server_log.txt", "a")
         if self.storageType == "cold":
             self.tapeClient.connect()
             self.tapeClient.recallChecksumFiles(self.path)
@@ -85,26 +83,19 @@ class ImportExecutor(TaskExecutor):
         tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
         for dir in dirs:
-            #out.write(f"DIR dir: {dir}\n")
-            #out.write(f"DIR pathPrefix: {self.pathPrefix}\n\n")
             if self.path in dir:
                 parentPath = os.path.dirname(dir).split(self.pathPrefix)[1]
                 nodeName = os.path.basename(dir)
                 cnode = Node(nodeName, "container")
                 if not tstampWrapperDirPattern.match("/" + nodeName):
                     if tstampWrapperDirPattern.search(parentPath):
                         tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                         parentPath = tstampWrapperDirPattern.sub("", parentPath)
                         cnode.setWrapperDir(tstampWrapperDir)
                     if parentPath == '/':
                         vospacePath = parentPath + nodeName
                     else:
                         vospacePath = parentPath + '/' + nodeName
                     cnode.setParentPath(parentPath)
                     locationId = self.dbConn.getLocationId(self.storageId)
                     cnode.setLocationId(locationId)
@@ -126,23 +117,14 @@ class ImportExecutor(TaskExecutor):
         for flist in files:
             for file in flist:
                 if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
-                    #out.write(f"FILE files: {files}\n")
-                    #out.write(f"FILE flist: {flist}\n")
-                    #out.write(f"FILE file: {file}\n")
-                    #out.write(f"FILE pathPrefix: {self.pathPrefix}\n")
                     parentPath = os.path.dirname(file).split(self.pathPrefix)[1]
-                    #out.write(f"FILE parentPath: {parentPath}\n")
                     nodeName = os.path.basename(file)
-                    #out.write(f"FILE nodeName: {nodeName}\n")
                     dnode = Node(nodeName, "data")
                     if tstampWrapperDirPattern.search(parentPath):
                         tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                         parentPath = tstampWrapperDirPattern.sub("", parentPath)
                         dnode.setWrapperDir(tstampWrapperDir)
                     vospacePath = parentPath + '/' + nodeName
-                    #out.write(f"FILE vospacePath: {vospacePath}\n")
                     dnode.setParentPath(parentPath)
                     self.storageId = self.dbConn.getStorageId(self.pathPrefix)
                     locationId = self.dbConn.getLocationId(self.storageId)
@@ -162,7 +144,6 @@ class ImportExecutor(TaskExecutor):
             else:
                 now = dt.now().isoformat()
                 nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ])
-        #out.close()
         nlfp.write(tabulate(nodeList,
                             headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
@@ -181,8 +162,10 @@ class ImportExecutor(TaskExecutor):
             m = Mailer(self.logger)
             m.addRecipient(self.adminEmail)
             msg = f"""
-[VOSpace import procedure summary]
+########## VOSpace import procedure summary ##########

 Job ID: {self.jobId}
+Job type: {self.jobObj.type}
+Storage type: {self.storageType}
 Storage ID: {self.storageId}
 Creator ID: {self.userId}
@@ -35,11 +35,11 @@ class ImportRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.importReadyQueue = JobQueue("import_ready")
         self.systemUtils = SystemUtils()
         super(ImportRPCServer, self).__init__(host, port, db, rpcQueue)
@@ -29,11 +29,11 @@ class JobRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(JobRPCServer, self).__init__(host, port, db, rpcQueue)

     def callback(self, requestBody):
@@ -29,7 +29,7 @@ class LogListener(Process):
         if os.path.exists(self.logFilePath):
             os.remove(self.logFilePath)
         while True:
-            time.sleep(1)
+            time.sleep(0.2)
             try:
                 lfp = open(self.logFilePath, 'a')
             except IOError:
@@ -5,6 +5,8 @@ import logging
 from config import Config
+from redis.exceptions import ConnectionError
+
 class RedisLogHandler(object):
     """
@@ -25,9 +27,11 @@ class RedisLogHandler(object):
     def handle(self, record):
         try:
             self.redisCli.lpush(self.logQueue, self.formatter.format(record))
-        except:
+        except ConnectionError:
             # Redis is not responding...
             pass
+        except Exception:
+            raise

     def setFormatter(self, formatter):
         self.formatter = formatter
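The two RedisLogHandler hunks add a redis.exceptions.ConnectionError import and narrow the bare except in handle(): an unreachable Redis server is silently tolerated, while any other error now propagates instead of being swallowed. A minimal sketch of a handler with that behavior, assuming the redis-py client (host, port, and queue name are illustrative):

    import logging

    import redis
    from redis.exceptions import ConnectionError

    class RedisLogHandler(object):
        """Push formatted log records onto a Redis list."""

        def __init__(self, host="localhost", port=6379, db=0, logQueue="log_queue"):
            self.redisCli = redis.Redis(host=host, port=port, db=db)
            self.logQueue = logQueue
            self.formatter = logging.Formatter()
            self.level = logging.NOTSET  # let the owning logger do level filtering

        def handle(self, record):
            try:
                self.redisCli.lpush(self.logQueue, self.formatter.format(record))
            except ConnectionError:
                # Redis is not responding: drop the record rather than crash.
                pass

        def setFormatter(self, formatter):
            self.formatter = formatter

The "except Exception: raise" branch added in the hunk is effectively a no-op in this sketch: once the bare except is gone, unexpected exceptions propagate on their own.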
@@ -37,11 +37,11 @@ class RetrieveCleaner(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.jobObj = None
         self.username = None
         self.nodeList = []
@@ -64,11 +64,11 @@ class RetrieveExecutor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         params = config.loadSection("spectrum_archive")
         self.tapePool = params["tape_pool"]
         self.tapeClient = TapeClient(params["host"],
@@ -290,7 +290,8 @@ class RetrieveExecutor(TaskExecutor):
 Dear user,
 your job has been COMPLETED.

-Job ID: {self.jobObj.jobId}
+Job ID: {self.jobId}
+Job type: {self.jobObj.type}
 Owner ID: {self.jobObj.ownerId}

 Your files are available and can be downloaded.
@@ -33,11 +33,11 @@ class RetrievePreprocessor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.jobObj = None
         self.nodeList = []
         super(RetrievePreprocessor, self).__init__()
@@ -87,7 +87,7 @@ class RetrievePreprocessor(TaskExecutor):
         try:
             srcQueueLen = self.srcQueue.len()
             destQueueLen = self.destQueue.len()
-        except:
+        except Exception:
             self.logger.exception("Cache error: unable to retrieve queue length.")
         else:
             if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
@@ -29,11 +29,11 @@ class StorageRPCServer(RedisRPCServer):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         super(StorageRPCServer, self).__init__(host, port, db, rpcQueue)

     def callback(self, requestBody):
@@ -50,11 +50,11 @@ class StorePreprocessor(TaskExecutor):
         logFormatter = logging.Formatter(logFormat)
         self.logger.setLevel(eval(logLevel))
         redisLogHandler = RedisLogHandler()
-        logStreamHandler = logging.StreamHandler()
-        logStreamHandler.setFormatter(logFormatter)
+        #logStreamHandler = logging.StreamHandler()
+        #logStreamHandler.setFormatter(logFormatter)
         redisLogHandler.setFormatter(logFormatter)
         self.logger.addHandler(redisLogHandler)
-        self.logger.addHandler(logStreamHandler)
+        #self.logger.addHandler(logStreamHandler)
         self.storageId = None
         self.storageType = None
         self.jobObj = None
@@ -213,11 +213,12 @@ class StorePreprocessor(TaskExecutor):
                 return False
             self.logger.info("Overall data size calculation")
             self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path)
-            self.logger.info("++++++++++ End of preprocessing phase ++++++++++")
-            return True
         except Exception:
             self.logger.exception("FATAL: something went wrong during the preprocessing phase.")
             return False
+        else:
+            self.logger.info("++++++++++ End of preprocessing phase ++++++++++")
+            return True

     def update(self, status):
         try:
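The StorePreprocessor hunk above moves the success-path logging and the return True out of the try body into an else clause, which runs only when the try body completed without raising. The behavior is close to leaving those lines at the end of the try body; the differences are that an exception raised inside the else clause is not caught by the except above it, and the success path reads separately from the guarded work. A compact illustration of the control flow (names are illustrative):

    def preprocess(step):
        try:
            step()  # any exception raised here is caught below
        except Exception:
            print("FATAL: something went wrong during the preprocessing phase.")
            return False
        else:
            # Reached only when the try body finished without raising.
            print("++++++++++ End of preprocessing phase ++++++++++")
            return True

    def broken():
        raise RuntimeError("simulated cache error")

    assert preprocess(lambda: None) is True
    assert preprocess(broken) is False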
@@ -287,7 +288,7 @@
         try:
             srcQueueLen = self.srcQueue.len()
             destQueueLen = self.destQueue.len()
-        except:
+        except Exception:
             self.logger.exception("Cache error: failed to retrieve queue length.")
         else:
             if destQueueLen < self.maxReadyJobs and srcQueueLen > 0:
@@ -1,6 +1,8 @@
 #!/usr/bin/env python

 import logging
+import os
+import sys

 from config import Config
 from cli_handler import CliHandler
@@ -15,7 +17,7 @@ class TransferService(object):
     def __init__(self):
         config = Config("/etc/vos_ts/vos_ts.conf")
         params = config.loadSection("logging")
-        self.logger = logging.getLogger(__name__)
+        self.logger = logging.getLogger("vos_ts")
         logLevel = "logging." + params["log_level"]
         logFormat = params["log_format"]
         logFormatter = logging.Formatter(logFormat)
@@ -63,6 +65,7 @@ class TransferService(object):
         self.logListener = LogListener()

     def start(self):
+        #if "SUDO_UID" in os.environ.keys():
         # Startup
         self.logListener.start()
         self.jobScheduler.start()
@@ -71,6 +74,9 @@ class TransferService(object):
         self.logger.info("##########################################################")
         self.logger.info("########## VOSpace Transfer Service is RUNNING! ##########")
         self.logger.info("##########################################################")
+        #else:
+        #    print("The VOSpace Transfer Service requires super user privileges.")
+        #    sys.exit(1)

 ts = TransferService()
 ts.start()