#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
import datetime
import logging
import os
import re

from config import Config
from checksum import Checksum
from db_connector import DbConnector
from mailer import Mailer
from node import Node
from redis_log_handler import RedisLogHandler
from system_utils import SystemUtils
from tabulate import tabulate
from tape_client import TapeClient
from task_executor import TaskExecutor


class ImportExecutor(TaskExecutor):
    """Task executor that imports pre-existing files and directories
    into the VOSpace file catalog."""

    def __init__(self):
        self.type = "import_executor"
        self.md5calc = Checksum()
        config = Config("/etc/vos_ts/vos_ts.conf")

        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]

        # Log records are forwarded to Redis so that all executors share
        # a single log stream.
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(getattr(logging, params["log_level"]))
        logFormatter = logging.Formatter(params["log_format"])
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        self.resDir = params["res_dir"]

        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1,
                                  self.logger)

        params = config.loadSection("spectrum_archive")
        self.tapeClient = TapeClient(params["host"],
                                     params.getint("port"),
                                     params["user"],
                                     params["pkey_file_path"],
                                     self.logger)

        self.systemUtils = SystemUtils()

        # Per-job state, filled in by run() for each job picked from the queue.
        self.jobObj = None
        self.jobId = None
        self.userId = None
        self.path = None
        self.pathPrefix = None
        self.storageId = None
        self.storageType = None
        self.nodeList = []
        super().__init__()

    def execute(self):
        """This method performs the VOSpace import operation."""
        try:
            self.logger.info("++++++++++ Start of import phase ++++++++++")
            self.jobObj.setPhase("EXECUTING")
            self.jobObj.setStartTime(datetime.datetime.now().isoformat())
            try:
                self.dbConn.insertJob(self.jobObj)
            except Exception:
                self.logger.exception("FATAL: unable to set job 'start_time' and 'phase'.")
                return False
            self.logger.info("Job phase updated to EXECUTING.")

            # On cold storage the checksum files must first be recalled
            # from tape, otherwise the recursive scan cannot validate them.
            if self.storageType == "cold":
                self.tapeClient.connect()
                self.tapeClient.recallChecksumFiles(self.path)
                self.tapeClient.disconnect()

            self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'")
            dirs, files = self.systemUtils.scanRecursive(os.path.dirname(self.path))

            try:
                locationId = self.dbConn.getLocationId(self.storageId)
            except Exception:
                self.logger.exception("FATAL: unable to obtain the location ID for the storage point.")
                return False

            # Wrapper directories are named 'YYYY_MM_DD-HH_MM_SS-vos_wrapper'
            # and must not appear in the VOSpace paths stored in the catalog.
            tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")

            # First pass: register a container node for each directory.
            for dirPath in dirs:
                if self.path in dirPath:
                    parentPath = os.path.dirname(dirPath).split(self.pathPrefix)[1]
                    nodeName = os.path.basename(dirPath)
                    cnode = Node(nodeName, "container")
                    # Directories that are themselves wrapper directories are
                    # skipped; a wrapper component found in the parent path is
                    # stripped from the VOSpace path and stored on the node.
                    if not tstampWrapperDirPattern.match("/" + nodeName):
                        if tstampWrapperDirPattern.search(parentPath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                            cnode.setWrapperDir(tstampWrapperDir)
                        if parentPath == '/':
                            vospacePath = parentPath + nodeName
                        else:
                            vospacePath = parentPath + '/' + nodeName
                        fsPath = vospacePath.split('/', 2)[-1]
                        cnode.setParentPath(parentPath)
                        cnode.setFsPath(fsPath)
                        cnode.setLocationId(locationId)
                        cnode.setJobId(self.jobId)
                        cnode.setCreatorId(self.userId)
                        cnode.setContentLength(0)
                        cnode.setAsyncTrans(True)
                        cnode.setSticky(True)
                        try:
                            now = datetime.datetime.now().isoformat()
                            if os.path.islink(dirPath):
                                self.nodeList.append([now, dirPath, vospacePath, "container", "SYMLINK"])
                            elif self.dbConn.insertNode(cnode):
                                self.nodeList.append([now, dirPath, vospacePath, "container", "DONE"])
                            else:
                                self.nodeList.append([now, dirPath, vospacePath, "container", "SKIPPED"])
                        except Exception:
                            self.logger.exception("FATAL: unable to update the file catalog.")
                            return False

            # Second pass: register a data node for each file that passes
            # the checksum validity check.
            for flist in files:
                for filePath in flist:
                    if self.md5calc.fileIsValid(filePath) and self.path in os.path.dirname(filePath):
                        parentPath = os.path.dirname(filePath).split(self.pathPrefix)[1]
                        nodeName = os.path.basename(filePath)
                        dnode = Node(nodeName, "data")
                        if tstampWrapperDirPattern.search(parentPath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                            dnode.setWrapperDir(tstampWrapperDir)
                        vospacePath = parentPath + '/' + nodeName
                        fsPath = vospacePath.split('/', 2)[-1]
                        dnode.setParentPath(parentPath)
                        dnode.setFsPath(fsPath)
                        dnode.setLocationId(locationId)
                        dnode.setJobId(self.jobId)
                        dnode.setCreatorId(self.userId)
                        # Symbolic links are recorded without size or checksum.
                        if not os.path.islink(filePath):
                            dnode.setContentLength(os.path.getsize(filePath))
                            dnode.setContentMD5(self.md5calc.getMD5(filePath))
                        else:
                            dnode.setContentLength(0)
                            dnode.setContentMD5(None)
                        dnode.setAsyncTrans(True)
                        dnode.setSticky(True)
                        try:
                            now = datetime.datetime.now().isoformat()
                            if os.path.islink(filePath):
                                self.nodeList.append([now, filePath, vospacePath, "data", "SYMLINK"])
                            elif self.dbConn.insertNode(dnode):
                                self.nodeList.append([now, filePath, vospacePath, "data", "DONE"])
                            else:
                                self.nodeList.append([now, filePath, vospacePath, "data", "SKIPPED"])
                        except Exception:
                            self.logger.exception("FATAL: unable to update the file catalog.")
                            return False
        except Exception:
            self.logger.exception("FATAL: something went wrong during the import phase.")
            return False
        else:
            self.logger.info("++++++++++ End of import phase ++++++++++")
            return True

    def update(self, status):
        """Finalize the job: update its phase, write the import report
        and notify the administrator by e-mail."""
        try:
            results = [{"target": ""}]
            self.dbConn.setResults(self.jobId, results)
            m = Mailer(self.logger)
            m.addRecipient(self.adminEmail)
            if status == "OK":
                # Detach successfully imported nodes from the job.
                for node in self.nodeList:
                    vospacePath = node[2]
                    if node[-1] == "DONE":
                        self.dbConn.setJobId(vospacePath, None)
                timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
                nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)
                try:
                    with open(nodeListFile, "w") as nlfp:
                        nlfp.write(tabulate(self.nodeList,
                                            headers=["Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
                                            tablefmt="simple"))
                except IOError:
                    self.logger.exception("Unable to generate the 'vos_import_report'.")
                self.jobObj.setPhase("COMPLETED")
                self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                self.dbConn.insertJob(self.jobObj)
                self.logger.info("Job phase updated to COMPLETED.")
                msg = f"""
########## VOSpace import procedure summary ##########

Job ID: {self.jobId}
Job type: {self.jobObj.type}
Storage type: {self.storageType}
Storage ID: {self.storageId}
Creator ID: {self.userId}
Processed nodes: {len(self.nodeList)}
Imported nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)}
Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)}
Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)}
"""
                # Reports above 10^5 nodes are too large to attach: point the
                # recipient at the report file on disk instead.
                if len(self.nodeList) <= 10 ** 5:
                    m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
                else:
                    info = f"""
INFO: this operation involved a number of nodes greater than 10^5,
you will find the results in {self.resDir}.
"""
                    msg += info
                    m.setMessage("VOSpace import notification", msg)
            else:
                self.jobObj.setPhase("ERROR")
                self.jobObj.setErrorType("fatal")
                self.jobObj.setErrorMessage("FATAL: something went wrong during the import phase.")
                self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                self.dbConn.insertJob(self.jobObj)
                self.logger.info("Job phase updated to ERROR.")
                # Roll back the partial import: remove the nodes inserted
                # by this job from the file catalog.
                self.logger.info("Removing VOSpace nodes from the database...")
                self.dbConn.deleteNodesByJobId(self.jobId)
                self.logger.info("Database cleanup completed.")
                msg = f"""
########## VOSpace import procedure summary ##########

Job ID: {self.jobId}
Job type: {self.jobObj.type}
Storage type: {self.storageType}
Storage ID: {self.storageId}
Creator ID: {self.userId}
"""
                info = """
ERROR: the job was terminated due to an error that occurred while importing
the VOSpace nodes in the file catalog. This issue will be automatically
reported to the administrator.
"""
                msg += info
                m.setMessage("VOSpace import notification: Job ERROR", msg)
            # Send the e-mail notification.
            m.send()
        except Exception:
            self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}")
        finally:
            self.nodeList.clear()

    def run(self):
        """Main loop: pick import jobs from the 'import_ready' queue,
        execute them and move them to the 'import_terminated' queue."""
        self.logger.info("Starting import executor...")
        self.setSourceQueueName("import_ready")
        self.setDestinationQueueName("import_terminated")
        while True:
            self.wait()
            try:
                srcQueueLen = self.srcQueue.len()
                destQueueLen = self.destQueue.len()
            except Exception:
                self.logger.exception("Cache error: failed to retrieve queue length.")
            else:
                if srcQueueLen > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.jobId = self.jobObj.jobId
                    self.userId = self.jobObj.ownerId
                    self.path = self.jobObj.jobInfo["path"]
                    self.pathPrefix = self.jobObj.jobInfo["pathPrefix"]
                    self.storageId = self.jobObj.jobInfo["storageId"]
                    self.storageType = self.jobObj.jobInfo["storageType"]
                    if self.execute():
                        self.update("OK")
                    else:
                        self.update("ERROR")
                    try:
                        # Keep the terminated queue bounded: evict the oldest
                        # job once 'maxTerminatedJobs' is reached.
                        if destQueueLen >= self.maxTerminatedJobs:
                            self.destQueue.extractJob()
                        self.destQueue.insertJob(self.jobObj)
                        self.srcQueue.extractJob()
                    except Exception:
                        self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                    else:
                        self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")