Select Git revision
-
Stefano Scardigli authoredStefano Scardigli authored
import_executor.py 9.19 KiB
#!/usr/bin/env python
import logging
import os
import re
from config import Config
from checksum import Checksum
from datetime import datetime as dt
from db_connector import DbConnector
from mailer import Mailer
from node import Node
from system_utils import SystemUtils
from tabulate import tabulate
from tape_client import TapeClient
from task_executor import TaskExecutor
class ImportExecutor(TaskExecutor):
    """Executor that imports an existing directory tree into the VOSpace catalog.

    Jobs are read from the "import_ready" queue. For each job the directories
    and valid files found under the job's ``path`` are registered in the file
    catalog as container and data nodes, a report of the outcome is written
    and mailed, and the job is moved to the "import_terminated" queue.
    """

    # Matches a "/YYYY_MM_DD-hh_mm_ss-vos_wrapper" path component. Wrapper
    # directories never become nodes themselves; they are stripped from the
    # VOSpace parent path of their contents and recorded via setWrapperDir().
    WRAPPER_DIR_PATTERN = re.compile(
        r"/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")

    def __init__(self):
        """Create the checksum helper, DB/tape clients and logger from the service config."""
        self.type = "import_executor"
        self.md5calc = Checksum()
        config = Config("/etc/vos_ts/vos_ts.conf")

        dbParams = config.loadSection("file_catalog")
        self.dbConn = DbConnector(dbParams["user"],
                                  dbParams["password"],
                                  dbParams["host"],
                                  dbParams.getint("port"),
                                  dbParams["db"],
                                  1,
                                  1)

        tapeParams = config.loadSection("spectrum_archive")
        self.tapeClient = TapeClient(tapeParams["host"],
                                     tapeParams.getint("port"),
                                     tapeParams["user"],
                                     tapeParams["pkey_file_path"])

        logParams = config.loadSection("logging")
        self.logger = logging.getLogger("ImportExecutor")
        # Look the level up on the logging module (e.g. "INFO" -> logging.INFO)
        # instead of eval()-ing a string assembled from the config file.
        self.logger.setLevel(getattr(logging, logParams["log_level"]))
        logFile = logParams["log_dir"] + '/' + "import_executor.log"
        logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        for handler in (logging.FileHandler(logFile), logging.StreamHandler()):
            handler.setFormatter(logFormatter)
            self.logger.addHandler(handler)

        self.systemUtils = SystemUtils()
        # Per-job state, filled in by run() before each importVOSpaceNodes() call.
        self.jobObj = None
        self.jobId = None
        self.userId = None
        self.path = None
        self.pathPrefix = None
        self.storageId = None
        self.storageType = None
        super().__init__()

    def importVOSpaceNodes(self):
        """This method performs the VOSpace import operation.

        Marks the job EXECUTING, scans the parent directory of ``self.path``,
        registers the directories and valid files found under ``self.path``,
        writes a report file in the current working directory, marks the job
        COMPLETED, mails the report as an attachment and then deletes the
        local report file.
        """
        self.dbConn.setPhase(self.jobId, "EXECUTING")
        self.dbConn.setStartTime(self.jobId)
        start = dt.now()
        reportFile = "vos_import_report-" + start.strftime("%Y_%m_%d-%H_%M_%S")

        if self.storageType == "cold":
            # NOTE(review): presumably stages the checksum files back from tape
            # so fileIsValid()/getMD5() can read them - confirm.
            self.tapeClient.connect()
            try:
                self.tapeClient.recallChecksumFiles(self.path)
            finally:
                # Release the tape connection even if the recall fails.
                self.tapeClient.disconnect()

        dirs, files = self.systemUtils.scanRecursive(os.path.dirname(self.path))
        nodeList = self._importContainers(dirs) + self._importDataFiles(files)
        self._writeReport(nodeList, reportFile)
        end = dt.now()

        # Update job status (to be moved)
        self.dbConn.setResults(self.jobId, [{"target": ""}])
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        self.dbConn.setEndTime(self.jobId)

        self._mailReport(start, end, nodeList, reportFile)
        os.remove(reportFile)

    def _importContainers(self, dirs):
        """Insert a container node for every directory under self.path.

        Returns one report row [timestamp, OS path, VOSpace path, "container",
        result] per directory considered; result is "SYMLINK" for symbolic
        links (never inserted), "DONE" when insertNode() succeeds, otherwise
        "SKIPPED".
        """
        rows = []
        locationId = None
        for osPath in dirs:
            if not self._isUnderImportPath(osPath):
                continue
            nodeName = os.path.basename(osPath)
            if self.WRAPPER_DIR_PATTERN.match("/" + nodeName):
                continue  # wrapper dirs are not imported as nodes
            parentPath, wrapperDir = self._splitWrapperDir(self._relativeParentPath(osPath))
            if os.path.islink(osPath):
                result = "SYMLINK"
            else:
                if locationId is None:
                    # Same value for every directory: query it only once.
                    locationId = self.dbConn.getLocationId(self.storageId)
                result = self._insertNode("container", nodeName, parentPath,
                                          wrapperDir, locationId, 0)
            rows.append([dt.now(), osPath, self._joinVospacePath(parentPath, nodeName),
                         "container", result])
        return rows

    def _importDataFiles(self, files):
        """Insert a data node for every valid file whose directory is under self.path.

        ``files`` is iterated as a collection of lists of file paths, as
        returned by scanRecursive(). Returns report rows shaped like those of
        _importContainers(), with node type "data".
        """
        rows = []
        locationId = None
        for fileGroup in files:
            for osPath in fileGroup:
                # Path test first so fileIsValid() only runs on candidate files.
                if not (self._isUnderImportPath(os.path.dirname(osPath))
                        and self.md5calc.fileIsValid(osPath)):
                    continue
                nodeName = os.path.basename(osPath)
                parentPath, wrapperDir = self._splitWrapperDir(self._relativeParentPath(osPath))
                if locationId is None:
                    # NOTE(review): data nodes derive their storage from pathPrefix,
                    # while containers use the job's storageId; the value stored
                    # here is also what the summary mail reports. Confirm intended.
                    self.storageId = self.dbConn.getStorageId(self.pathPrefix)
                    locationId = self.dbConn.getLocationId(self.storageId)
                if os.path.islink(osPath):
                    # Checked before getsize()/getMD5() so links (including
                    # dangling ones) are not followed only to be discarded.
                    result = "SYMLINK"
                else:
                    result = self._insertNode("data", nodeName, parentPath, wrapperDir,
                                              locationId, os.path.getsize(osPath),
                                              self.md5calc.getMD5(osPath))
                rows.append([dt.now(), osPath, self._joinVospacePath(parentPath, nodeName),
                             "data", result])
        return rows

    def _insertNode(self, nodeType, nodeName, parentPath, wrapperDir, locationId,
                    contentLength, contentMD5=None):
        """Build a Node with the attributes shared by all imported nodes and insert it.

        The MD5 is only set on "data" nodes. Returns "DONE" if
        dbConn.insertNode() returns a truthy value, otherwise "SKIPPED".
        """
        node = Node(nodeName, nodeType)
        if wrapperDir is not None:
            node.setWrapperDir(wrapperDir)
        node.setParentPath(parentPath)
        node.setLocationId(locationId)
        node.setCreatorId(self.userId)
        node.setContentLength(contentLength)
        if nodeType == "data":
            node.setContentMD5(contentMD5)
        node.setAsyncTrans(True)
        node.setSticky(True)
        return "DONE" if self.dbConn.insertNode(node) else "SKIPPED"

    def _isUnderImportPath(self, osPath):
        """Return True if osPath is self.path itself or lies inside it."""
        # Component-wise prefix test. A plain substring test ("self.path in
        # osPath") also accepts siblings such as "<path>_old" or any path that
        # merely contains self.path. Assumes scanRecursive() returns paths
        # spelled with the same prefix as self.path - TODO confirm.
        root = self.path
        return osPath == root or osPath.startswith(root.rstrip('/') + '/')

    def _relativeParentPath(self, osPath):
        """Return the directory of osPath with everything up to pathPrefix removed."""
        # maxsplit=1: a later repeat of pathPrefix inside the path must not
        # truncate the result, as split(...)[1] without a limit would.
        return os.path.dirname(osPath).split(self.pathPrefix, 1)[1]

    @classmethod
    def _splitWrapperDir(cls, parentPath):
        """Remove wrapper-dir components from parentPath.

        Returns (cleanedPath, wrapperDir): wrapperDir is the first wrapper
        component found, without its leading '/', or None if there is none.
        """
        found = cls.WRAPPER_DIR_PATTERN.search(parentPath)
        if found is None:
            return parentPath, None
        return cls.WRAPPER_DIR_PATTERN.sub("", parentPath), found.group(0).lstrip('/')

    @staticmethod
    def _joinVospacePath(parentPath, nodeName):
        """Join parentPath and nodeName with a single '/' (so root gives '/name', not '//name')."""
        return parentPath.rstrip('/') + '/' + nodeName

    @staticmethod
    def _writeReport(nodeList, reportFile):
        """Write nodeList to reportFile as a plain-text ("simple") table."""
        with open(reportFile, "w") as fp:
            fp.write(tabulate(nodeList,
                              headers=["Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
                              tablefmt="simple"))

    def _mailReport(self, start, end, nodeList, reportFile):
        """Mail the import summary with reportFile attached."""
        imported = sum(row[-1] == 'DONE' for row in nodeList)
        skipped = sum(row[-1] == 'SKIPPED' for row in nodeList)
        msg = ("\n"
               "[VOSpace import procedure summary]\n"
               f"Storage type: {self.storageType}\n"
               f"Storage ID: {self.storageId}\n"
               f"Creator ID: {self.userId}\n"
               f"Start time: {start}\n"
               f"End time: {end}\n"
               f"Processed nodes: {len(nodeList)}\n"
               f"Imported nodes: {imported}\n"
               f"Skipped nodes: {skipped}\n")
        m = Mailer()
        m.addRecipient("cristiano.urban@inaf.it")
        m.setMessageWithAttachment("VOSpace import notification", msg, reportFile)
        m.send()

    def run(self):
        """Poll the "import_ready" queue forever and execute each import job.

        After processing, a job is moved to "import_terminated"; when that
        queue already holds maxTerminatedJobs entries, destQueue.extractJob()
        is called first (presumably dropping its oldest entry - confirm).
        """
        self.logger.info("Starting import executor...")
        self.setSourceQueueName("import_ready")
        self.setDestinationQueueName("import_terminated")
        while True:
            self.wait()
            if self.srcQueue.len() == 0:
                continue
            self.jobObj = self.srcQueue.getJob()
            self.jobId = self.jobObj.jobId
            self.userId = self.jobObj.ownerId
            jobInfo = self.jobObj.jobInfo
            self.path = jobInfo["path"]
            self.pathPrefix = jobInfo["pathPrefix"]
            self.storageId = jobInfo["storageId"]
            self.storageType = jobInfo["storageType"]
            self.importVOSpaceNodes()
            # >= rather than ==: an over-capacity queue is still trimmed
            # instead of growing without bound.
            if self.destQueue.len() >= self.maxTerminatedJobs:
                self.destQueue.extractJob()
            self.destQueue.insertJob(self.jobObj)
            self.srcQueue.extractJob()
            self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")