#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
import datetime
import logging
import os
import re
from config import Config
from checksum import Checksum
from db_connector import DbConnector
from mailer import Mailer
from node import Node
from redis_log_handler import RedisLogHandler
from system_utils import SystemUtils
from tabulate import tabulate
from tape_client import TapeClient
from task_executor import TaskExecutor


class ImportExecutor(TaskExecutor):

    def __init__(self):
        self.type = "import_executor"
        self.md5calc = Checksum()
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logFormat = params["log_format"]
        logFormatter = logging.Formatter(logFormat)
        # getattr() resolves the configured level name without eval()
        self.logger.setLevel(getattr(logging, params["log_level"]))
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        self.resDir = params["res_dir"]
params = config.loadSection("file_catalog")
self.dbConn = DbConnector(params["user"],
params["password"],
params["host"],
params.getint("port"),
params["db"],
1,
1,
self.logger)
params = config.loadSection("spectrum_archive")
self.tapeClient = TapeClient(params["host"],
params.getint("port"),
params["user"],
params["pkey_file_path"],
self.logger)
        self.systemUtils = SystemUtils()
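        # Per-job state, (re)populated by run() for each job taken from the
        # source queue.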
        self.jobObj = None
        self.jobId = None
        self.userId = None
        self.path = None
        self.pathPrefix = None
        self.storageId = None
        self.storageType = None
        self.nodeList = []
        super().__init__()

    def execute(self):
        """This method performs the VOSpace import operation."""
        try:
            self.logger.info("++++++++++ Start of import phase ++++++++++")
            self.jobObj.setPhase("EXECUTING")
            self.jobObj.setStartTime(datetime.datetime.now().isoformat())
            try:
                self.dbConn.insertJob(self.jobObj)
            except Exception:
                self.logger.exception("FATAL: unable to set job 'start_time' and 'phase'.")
                return False
            self.logger.info("Job phase updated to EXECUTING.")
if self.storageType == "cold":
self.tapeClient.connect()
self.tapeClient.recallChecksumFiles(self.path)
self.tapeClient.disconnect()
self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'")
[ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path))
            try:
                locationId = self.dbConn.getLocationId(self.storageId)
            except Exception:
                self.logger.exception("FATAL: unable to obtain the location ID for the storage point.")
                return False
            tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
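            # First pass: register a container node for every directory below
            # the import path. Timestamped "vos_wrapper" directories are
            # transparent: they are stripped from the VOSpace path and stored
            # on the node as its wrapper directory instead.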
            for dir in dirs:
                if self.path in dir:
                    parentPath = os.path.dirname(dir).split(self.pathPrefix)[1]
                    nodeName = os.path.basename(dir)
                    cnode = Node(nodeName, "container")
                    if not tstampWrapperDirPattern.match("/" + nodeName):
                        if tstampWrapperDirPattern.search(parentPath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                            cnode.setWrapperDir(tstampWrapperDir)
                        if parentPath == '/':
                            vospacePath = parentPath + nodeName
                        else:
                            vospacePath = parentPath + '/' + nodeName
                        fsPath = vospacePath.split('/', 2)[-1]
                        cnode.setParentPath(parentPath)
                        cnode.setFsPath(fsPath)
                        cnode.setLocationId(locationId)
                        cnode.setJobId(self.jobId)
                        cnode.setCreatorId(self.userId)
                        cnode.setContentLength(0)
                        cnode.setAsyncTrans(True)
                        cnode.setSticky(True)
                        try:
                            now = datetime.datetime.now().isoformat()
                            if os.path.islink(dir):
                                self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ])
                            elif self.dbConn.insertNode(cnode):
                                self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ])
                            else:
                                self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ])
                        except Exception:
                            self.logger.exception("FATAL: unable to update the file catalog.")
                            return False
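            # Second pass: insert a data node for every valid file under the
            # import path. Symlinks are recorded in the report but never
            # inserted into the catalog; their size and MD5 are not computed.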
            for flist in files:
                for file in flist:
                    if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
                        parentPath = os.path.dirname(file).split(self.pathPrefix)[1]
                        nodeName = os.path.basename(file)
                        dnode = Node(nodeName, "data")
                        if tstampWrapperDirPattern.search(parentPath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                            dnode.setWrapperDir(tstampWrapperDir)
                        vospacePath = parentPath + '/' + nodeName
                        fsPath = vospacePath.split('/', 2)[-1]
                        dnode.setParentPath(parentPath)
                        dnode.setFsPath(fsPath)
                        dnode.setLocationId(locationId)
                        dnode.setJobId(self.jobId)
                        dnode.setCreatorId(self.userId)
                        if not os.path.islink(file):
                            dnode.setContentLength(os.path.getsize(file))
                            dnode.setContentMD5(self.md5calc.getMD5(file))
                        else:
                            dnode.setContentLength(0)
                            dnode.setContentMD5(None)
                        dnode.setAsyncTrans(True)
                        dnode.setSticky(True)
                        try:
                            now = datetime.datetime.now().isoformat()
                            if os.path.islink(file):
                                self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ])
                            elif self.dbConn.insertNode(dnode):
                                self.nodeList.append([ now, file, vospacePath, "data", "DONE" ])
                            else:
                                self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ])
                        except Exception:
                            self.logger.exception("FATAL: unable to update the file catalog.")
                            return False
        except Exception:
            self.logger.exception("FATAL: something went wrong during the import phase.")
            return False
        else:
            self.logger.info("++++++++++ End of import phase ++++++++++")
            return True
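
    # Finalize the job: record the final phase in the database and notify the
    # administrator by e-mail. On success, write the import report; on error,
    # remove the nodes inserted by this job from the file catalog.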
    def update(self, status):
        try:
            results = [{"target": ""}]
            self.dbConn.setResults(self.jobId, results)
            m = Mailer(self.logger)
            m.addRecipient(self.adminEmail)
            if status == "OK":
                for node in self.nodeList:
                    vospacePath = node[2]
                    if node[-1] == "DONE":
                        self.dbConn.setJobId(vospacePath, None)
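                # Write the import report: one row per processed node, in a
                # timestamped file under the configured resources directory.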
                timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
                nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)
                try:
                    nlfp = open(nodeListFile, "w")
                except IOError:
                    self.logger.exception("Unable to generate the 'vos_import_report'.")
                else:
                    nlfp.write(tabulate(self.nodeList,
                                        headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result" ],
                                        tablefmt = "simple"))
                    nlfp.close()
self.jobObj.setPhase("COMPLETED")
self.jobObj.setEndTime(datetime.datetime.now().isoformat())
self.dbConn.insertJob(self.jobObj)
self.logger.info("Job phase updated to COMPLETED.")
msg = f"""
########## VOSpace import procedure summary ##########
Job ID: {self.jobId}
Job type: {self.jobObj.type}
Storage type: {self.storageType}
Storage ID: {self.storageId}
Creator ID: {self.userId}
Processed nodes: {len(self.nodeList)}
Imported nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)}
Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)}
Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)}
"""
                if len(self.nodeList) <= 10 ** 5:
                    m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
                else:
                    info = f"""
INFO:
this operation involved a number of nodes greater than 10^5,
you will find the results in {self.resDir}.
"""
                    msg += info
                    m.setMessage("VOSpace import notification", msg)
            else:
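                # The import failed: mark the job as ERROR and remove the
                # nodes inserted by this job, leaving the catalog clean.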
self.jobObj.setPhase("ERROR")
self.jobObj.setErrorType("fatal")
self.jobObj.setErrorMessage("FATAL: something went wrong during the import phase.")
self.jobObj.setEndTime(datetime.datetime.now().isoformat())
self.dbConn.insertJob(self.jobObj)
self.logger.info("Job phase updated to ERROR.")
self.logger.info("Removing VOSpace nodes from the database...")
self.dbConn.deleteNodesByJobId(self.jobId)
self.logger.info("Database cleanup completed")
msg = f"""
########## VOSpace import procedure summary ##########
Job ID: {self.jobId}
Job type {self.jobObj.type}
Storage type: {self.storageType}
Storage ID: {self.storageId}
Creator ID: {self.userId}
"""
info = f"""
ERROR:
the job was terminated due to an error that occurred
while importing the VOSpace nodes in the file catalog.
This issue will be automatically reported to the administrator.
"""
msg += info
m.setMessage("VOSpace import notification: Job ERROR", msg)
# Send e-mail notification
m.send()
        except Exception:
            self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}")
        finally:
            self.nodeList.clear()
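
    # Main loop: poll the "import_ready" queue, run each job through
    # execute()/update() and move it to the "import_terminated" queue,
    # evicting a terminated job when that queue is full.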
    def run(self):
        self.logger.info("Starting import executor...")
        self.setSourceQueueName("import_ready")
        self.setDestinationQueueName("import_terminated")
        while True:
            self.wait()
            try:
                srcQueueLen = self.srcQueue.len()
                destQueueLen = self.destQueue.len()
            except Exception:
                self.logger.exception("Cache error: failed to retrieve queue length.")
            else:
                if srcQueueLen > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.jobId = self.jobObj.jobId
                    self.userId = self.jobObj.ownerId
                    self.path = self.jobObj.jobInfo["path"]
                    self.pathPrefix = self.jobObj.jobInfo["pathPrefix"]
                    self.storageId = self.jobObj.jobInfo["storageId"]
                    self.storageType = self.jobObj.jobInfo["storageType"]
                    if self.execute():
                        self.update("OK")
                    else:
                        self.update("ERROR")
                    try:
                        if destQueueLen >= self.maxTerminatedJobs:
                            self.destQueue.extractJob()
                        self.destQueue.insertJob(self.jobObj)
                        self.srcQueue.extractJob()
                    except Exception:
                        self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                    else:
                        self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")