#!/usr/bin/env python

import logging
import os
import re

from config import Config
from checksum import Checksum
from datetime import datetime as dt
from db_connector import DbConnector
from mailer import Mailer
from node import Node
from redis_log_handler import RedisLogHandler
from system_utils import SystemUtils
from tabulate import tabulate
from tape_client import TapeClient
from task_executor import TaskExecutor


class ImportExecutor(TaskExecutor):
    """Task executor that imports an existing directory tree as VOSpace nodes.

    Jobs are taken from the "import_ready" queue. For each job the tree
    containing the job path is scanned, a container node is inserted in the
    file catalog for every directory and a data node for every valid file,
    a tabular report is written to the results directory and a summary is
    mailed to the administrator. The job is then moved to the
    "import_terminated" queue.
    """

    # Matches the timestamped wrapper directory component of a path,
    # e.g. "/2021_01_31-12_00_00-vos_wrapper". Compiled once for all jobs.
    _tstampWrapperDirPattern = re.compile(
        r"/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper"
    )

    def __init__(self):
        """Load the configuration and build the helper objects.

        Reads the "file_catalog", "spectrum_archive", "mail" and "logging"
        sections of /etc/vos_ts/vos_ts.conf. The per-job attributes are
        initialised to None and filled in by run().
        """
        self.type = "import_executor"
        self.md5calc = Checksum()
        config = Config("/etc/vos_ts/vos_ts.conf")

        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1)

        params = config.loadSection("spectrum_archive")
        self.tapeClient = TapeClient(params["host"],
                                     params.getint("port"),
                                     params["user"],
                                     params["pkey_file_path"])

        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]

        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logFormatter = logging.Formatter(params["log_format"])
        # Resolve the level name (e.g. "DEBUG") as an attribute of the logging
        # module instead of eval()-ing a string built from the config file.
        self.logger.setLevel(getattr(logging, params["log_level"]))
        redisLogHandler = RedisLogHandler()
        logStreamHandler = logging.StreamHandler()
        logStreamHandler.setFormatter(logFormatter)
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        self.logger.addHandler(logStreamHandler)
        self.resDir = params["res_dir"]

        self.systemUtils = SystemUtils()

        # Per-job state, set by run() before each import.
        self.jobObj = None
        self.jobId = None
        self.userId = None
        self.path = None
        self.pathPrefix = None
        self.storageId = None
        self.storageType = None
        super(ImportExecutor, self).__init__()

    def importVOSpaceNodes(self):
        """This method performs the VOSpace import operation.

        Marks the job as EXECUTING, scans the tree, inserts the nodes,
        writes the report file "vos_import_report-<timestamp>" in the results
        directory, marks the job as COMPLETED and sends the summary e-mail.
        """
        self.dbConn.setPhase(self.jobId, "EXECUTING")
        self.dbConn.setStartTime(self.jobId)

        start = dt.now()
        nodeList = []
        timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
        nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)

        # The report is opened before scanning so that an unwritable results
        # directory fails the job before anything is inserted; the context
        # manager guarantees the handle is closed even if the import raises.
        with open(nodeListFile, "w") as nlfp:
            if self.storageType == "cold":
                self.tapeClient.connect()
                try:
                    self.tapeClient.recallChecksumFiles(self.path)
                finally:
                    # Do not leave the tape connection open on failure.
                    self.tapeClient.disconnect()

            dirs, files = self.systemUtils.scanRecursive(os.path.dirname(self.path))

            for dirPath in dirs:
                self._importContainerNode(dirPath, nodeList)

            # 'files' is iterated as a list of lists of file paths.
            for fileList in files:
                for filePath in fileList:
                    self._importDataNode(filePath, nodeList)

            nlfp.write(tabulate(nodeList,
                                headers=["Timestamp",
                                         "OS path",
                                         "VOSpace path",
                                         "Node type",
                                         "Result"],
                                tablefmt="simple"))

        end = dt.now()

        # Update job status (to be moved)
        results = [{"target": ""}]
        self.dbConn.setResults(self.jobId, results)
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        self.dbConn.setEndTime(self.jobId)

        self._sendReport(start, end, nodeList, nodeListFile)

    def _stripWrapperDir(self, node, parentPath):
        """Remove the wrapper directory component from parentPath, if any.

        When parentPath contains a timestamped wrapper directory, its name
        (without the leading '/') is recorded on the node and every
        occurrence of the pattern is removed from the returned path.
        """
        wrapperMatch = self._tstampWrapperDirPattern.search(parentPath)
        if wrapperMatch:
            node.setWrapperDir(wrapperMatch.group(0).lstrip('/'))
            parentPath = self._tstampWrapperDirPattern.sub("", parentPath)
        return parentPath

    def _recordResult(self, nodeList, osPath, vospacePath, nodeType, node):
        """Insert the node unless osPath is a symlink and append a report row.

        The row status is "SYMLINK" when osPath is a symbolic link (the node
        is not inserted), "DONE" when insertNode() returns a true value and
        "SKIPPED" otherwise.
        """
        if os.path.islink(osPath):
            status = "SYMLINK"
        elif self.dbConn.insertNode(node):
            status = "DONE"
        else:
            status = "SKIPPED"
        nodeList.append([dt.now().isoformat(), osPath, vospacePath, nodeType, status])

    def _importContainerNode(self, dirPath, nodeList):
        """Import one scanned directory as a container node.

        Directories that do not contain the job path as a substring are
        ignored, and so are the wrapper directories themselves.
        """
        if self.path not in dirPath:
            return

        parentPath = os.path.dirname(dirPath).split(self.pathPrefix)[1]
        nodeName = os.path.basename(dirPath)
        cnode = Node(nodeName, "container")

        # A wrapper directory is not a node of its own.
        if self._tstampWrapperDirPattern.match("/" + nodeName):
            return

        parentPath = self._stripWrapperDir(cnode, parentPath)

        if parentPath == '/':
            vospacePath = parentPath + nodeName
        else:
            vospacePath = parentPath + '/' + nodeName

        cnode.setParentPath(parentPath)
        locationId = self.dbConn.getLocationId(self.storageId)
        cnode.setLocationId(locationId)
        cnode.setCreatorId(self.userId)
        cnode.setContentLength(0)
        cnode.setAsyncTrans(True)
        cnode.setSticky(True)

        self._recordResult(nodeList, dirPath, vospacePath, "container", cnode)

    def _importDataNode(self, filePath, nodeList):
        """Import one scanned file as a data node.

        Files rejected by the checksum helper, or whose directory does not
        contain the job path as a substring, are ignored.
        """
        if not (self.md5calc.fileIsValid(filePath)
                and self.path in os.path.dirname(filePath)):
            return

        parentPath = os.path.dirname(filePath).split(self.pathPrefix)[1]
        nodeName = os.path.basename(filePath)
        dnode = Node(nodeName, "data")

        parentPath = self._stripWrapperDir(dnode, parentPath)
        vospacePath = parentPath + '/' + nodeName

        dnode.setParentPath(parentPath)
        # NOTE(review): the storage ID received with the job is overwritten
        # here with the one looked up from the path prefix, once per file;
        # kept as in the original because the summary e-mail reports it.
        self.storageId = self.dbConn.getStorageId(self.pathPrefix)
        locationId = self.dbConn.getLocationId(self.storageId)
        dnode.setLocationId(locationId)
        dnode.setCreatorId(self.userId)
        dnode.setContentLength(os.path.getsize(filePath))
        dnode.setContentMD5(self.md5calc.getMD5(filePath))
        dnode.setAsyncTrans(True)
        dnode.setSticky(True)

        self._recordResult(nodeList, filePath, vospacePath, "data", dnode)

    def _sendReport(self, start, end, nodeList, nodeListFile):
        """Mail the import summary to the administrator.

        The report file is attached when at most 10^5 nodes were processed;
        otherwise the message only points to the results directory.
        """
        m = Mailer()
        m.addRecipient(self.adminEmail)
        msg = f"""
        [VOSpace import procedure summary]

        Storage type: {self.storageType}
        Storage ID: {self.storageId}
        Creator ID: {self.userId}
        Start time: {start}
        End time: {end}
        Processed nodes: {len(nodeList)}
        Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)}
        Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in nodeList)}
        Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in nodeList)}

        """

        if len(nodeList) <= 10 ** 5:
            m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
        else:
            info = f"""
            INFO:
            this operation involved a number of nodes greater than 10^5,
            you will find the results in {self.resDir}.

            """
            msg += info
            m.setMessage("VOSpace import notification", msg)
        m.send()

    def run(self):
        """Main loop: wait for import jobs and process them one at a time.

        After each import the job is appended to the destination queue
        (dropping one job from it first when it already holds
        maxTerminatedJobs entries) and removed from the source queue.
        """
        self.logger.info("Starting import executor...")
        self.setSourceQueueName("import_ready")
        self.setDestinationQueueName("import_terminated")
        while True:
            self.wait()
            if self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId
                self.userId = self.jobObj.ownerId
                self.path = self.jobObj.jobInfo["path"]
                self.pathPrefix = self.jobObj.jobInfo["pathPrefix"]
                self.storageId = self.jobObj.jobInfo["storageId"]
                self.storageType = self.jobObj.jobInfo["storageType"]
                self.importVOSpaceNodes()
                if self.destQueue.len() == self.maxTerminatedJobs:
                    self.destQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")