Skip to content
Snippets Groups Projects
Select Git revision
  • 607baff68c6155e104099f1735888d238c153a66
  • master default protected
  • rocky_8
  • rocky_9
  • pasture
  • testing
  • query
  • v0.2.9
  • v0.2.8
  • v0.2.7
  • v0.2.6
  • v0.2.5
  • v0.2.4
  • v0.2.3
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.2
  • v0.1.1
  • v0.1
20 results

import_executor.py

Blame
  • import_executor.py 13.52 KiB
    #!/usr/bin/env python
    #
    # This file is part of vospace-transfer-service
    # Copyright (C) 2021 Istituto Nazionale di Astrofisica
    # SPDX-License-Identifier: GPL-3.0-or-later
    #
    
    import datetime
    import logging
    import os
    import re
    
    from config import Config
    from checksum import Checksum
    from db_connector import DbConnector
    from mailer import Mailer
    from node import Node
    from redis_log_handler import RedisLogHandler
    from system_utils import SystemUtils
    from tabulate import tabulate
    from tape_client import TapeClient
    from task_executor import TaskExecutor
    
    
    class ImportExecutor(TaskExecutor):
    
        def __init__(self):
            """Set up logging, the file-catalog DB connection, the tape client
            and the per-job state used by execute()/update()."""
            self.type = "import_executor"
            self.md5calc = Checksum()
            config = Config("/etc/vos_ts/vos_ts.conf")
            params = config.loadSection("mail")
            self.adminEmail = params["admin_email"]
            params = config.loadSection("logging")
            self.logger = logging.getLogger(__name__)
            # Resolve the configured level name (e.g. "DEBUG" -> logging.DEBUG) via
            # getattr instead of eval(): eval on a config-file value is unsafe and
            # unnecessary. An unknown level name still raises AttributeError, as the
            # old eval-based lookup did.
            self.logger.setLevel(getattr(logging, params["log_level"]))
            logFormatter = logging.Formatter(params["log_format"])
            redisLogHandler = RedisLogHandler()
            redisLogHandler.setFormatter(logFormatter)
            self.logger.addHandler(redisLogHandler)
            self.resDir = params["res_dir"]
            params = config.loadSection("file_catalog")
            # NOTE(review): the two fixed '1' arguments look like connection pool
            # bounds — confirm against the DbConnector signature.
            self.dbConn = DbConnector(params["user"],
                                      params["password"],
                                      params["host"],
                                      params.getint("port"),
                                      params["db"],
                                      1,
                                      1,
                                      self.logger)
            params = config.loadSection("spectrum_archive")
            self.tapeClient = TapeClient(params["host"],
                                         params.getint("port"),
                                         params["user"],
                                         params["pkey_file_path"],
                                         self.logger)
            self.systemUtils = SystemUtils()
            # Per-job state, (re)populated by run() before each execute() call.
            self.jobObj = None
            self.jobId = None
            self.userId = None
            self.path = None
            self.pathPrefix = None
            self.storageId = None
            self.storageType = None
            self.nodeList = []
            super().__init__()
    
        def execute(self):
            """Perform the VOSpace import: scan the import path recursively and
            register the discovered directories (container nodes) and files
            (data nodes) in the file catalog.

            Returns:
                True on success, False on any fatal error (the error is logged
                and update("ERROR") is expected to roll back the inserted nodes).
            """
            try:
                self.logger.info("++++++++++ Start of import phase ++++++++++")
                self.jobObj.setPhase("EXECUTING")
                self.jobObj.setStartTime(datetime.datetime.now().isoformat())
                try:
                    self.dbConn.insertJob(self.jobObj)
                except Exception:
                    self.logger.exception("FATAL: unable to set job 'start_time' and 'phase'.")
                    return False
                self.logger.info("Job phase updated to EXECUTING.")

                # Cold storage: checksum files live on tape and must be recalled
                # before the scan can validate files.
                if self.storageType == "cold":
                    self.tapeClient.connect()
                    self.tapeClient.recallChecksumFiles(self.path)
                    self.tapeClient.disconnect()

                self.logger.info(f"Recursive scan of '{os.path.dirname(self.path)}'")
                dirs, files = self.systemUtils.scanRecursive(os.path.dirname(self.path))

                try:
                    locationId = self.dbConn.getLocationId(self.storageId)
                except Exception:
                    self.logger.exception("FATAL: unable to obtain the location ID for the storage point.")
                    return False

                # Matches wrapper directories named '/YYYY_MM_DD-HH_MM_SS-vos_wrapper'.
                tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")

                # First pass: container nodes (directories under the import path).
                # 'dirPath' instead of 'dir' to avoid shadowing the builtin.
                for dirPath in dirs:
                    if self.path not in dirPath:
                        continue
                    parentPath = os.path.dirname(dirPath).split(self.pathPrefix)[1]
                    nodeName = os.path.basename(dirPath)
                    cnode = Node(nodeName, "container")
                    # Wrapper directories themselves are not imported as nodes.
                    if tstampWrapperDirPattern.match("/" + nodeName):
                        continue
                    # If an ancestor is a wrapper dir, record it on the node and
                    # strip it from the VOSpace parent path.
                    wrapperMatch = tstampWrapperDirPattern.search(parentPath)
                    if wrapperMatch:
                        cnode.setWrapperDir(wrapperMatch.group(0).lstrip('/'))
                        parentPath = tstampWrapperDirPattern.sub("", parentPath)
                    if parentPath == '/':
                        vospacePath = parentPath + nodeName
                    else:
                        vospacePath = parentPath + '/' + nodeName
                    cnode.setParentPath(parentPath)
                    # fsPath drops the two leading VOSpace path components.
                    cnode.setFsPath(vospacePath.split('/', 2)[-1])
                    cnode.setLocationId(locationId)
                    cnode.setJobId(self.jobId)
                    cnode.setCreatorId(self.userId)
                    cnode.setContentLength(0)
                    cnode.setAsyncTrans(True)
                    cnode.setSticky(True)
                    try:
                        now = datetime.datetime.now().isoformat()
                        if os.path.islink(dirPath):
                            self.nodeList.append([ now, dirPath, vospacePath, "container", "SYMLINK" ])
                        elif self.dbConn.insertNode(cnode):
                            self.nodeList.append([ now, dirPath, vospacePath, "container", "DONE" ])
                        else:
                            self.nodeList.append([ now, dirPath, vospacePath, "container", "SKIPPED" ])
                    except Exception:
                        self.logger.exception("FATAL: unable to update the file catalog.")
                        return False

                # Second pass: data nodes (files with a valid checksum).
                for fileGroup in files:
                    for filePath in fileGroup:
                        if not (self.md5calc.fileIsValid(filePath) and self.path in os.path.dirname(filePath)):
                            continue
                        parentPath = os.path.dirname(filePath).split(self.pathPrefix)[1]
                        nodeName = os.path.basename(filePath)
                        dnode = Node(nodeName, "data")
                        wrapperMatch = tstampWrapperDirPattern.search(parentPath)
                        if wrapperMatch:
                            dnode.setWrapperDir(wrapperMatch.group(0).lstrip('/'))
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                        vospacePath = parentPath + '/' + nodeName
                        dnode.setParentPath(parentPath)
                        dnode.setFsPath(vospacePath.split('/', 2)[-1])
                        dnode.setLocationId(locationId)
                        dnode.setJobId(self.jobId)
                        dnode.setCreatorId(self.userId)
                        isSymlink = os.path.islink(filePath)
                        if isSymlink:
                            # Symlinks are recorded but carry no content.
                            dnode.setContentLength(0)
                            dnode.setContentMD5(None)
                        else:
                            dnode.setContentLength(os.path.getsize(filePath))
                            dnode.setContentMD5(self.md5calc.getMD5(filePath))
                        dnode.setAsyncTrans(True)
                        dnode.setSticky(True)
                        try:
                            now = datetime.datetime.now().isoformat()
                            if isSymlink:
                                self.nodeList.append([ now, filePath, vospacePath, "data", "SYMLINK" ])
                            elif self.dbConn.insertNode(dnode):
                                self.nodeList.append([ now, filePath, vospacePath, "data", "DONE" ])
                            else:
                                self.nodeList.append([ now, filePath, vospacePath, "data", "SKIPPED" ])
                        except Exception:
                            self.logger.exception("FATAL: unable to update the file catalog.")
                            return False
            except Exception:
                self.logger.exception("FATAL: something went wrong during the import phase.")
                return False
            else:
                self.logger.info("++++++++++ End of import phase ++++++++++")
                return True
    
        def update(self, status):
            """Finalize the job and notify the administrator by e-mail.

            On status == "OK": detach the imported nodes from the job, write a
            tabulated import report, mark the job COMPLETED and mail a summary
            (with the report attached when the node count is small enough).
            Otherwise: mark the job ERROR and delete the nodes inserted by it.
            The per-job node list is cleared in every case.
            """
            try:
                results = [{"target": ""}]
                self.dbConn.setResults(self.jobId, results)

                m = Mailer(self.logger)
                m.addRecipient(self.adminEmail)

                if status == "OK":
                    # Successfully imported nodes no longer belong to this job.
                    for node in self.nodeList:
                        vospacePath = node[2]
                        if node[-1] == "DONE":
                            self.dbConn.setJobId(vospacePath, None)
                    timestamp = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
                    nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)
                    try:
                        # Context manager guarantees the report file is closed even
                        # if the write fails.
                        with open(nodeListFile, "w") as nlfp:
                            nlfp.write(tabulate(self.nodeList,
                                                headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
                                                tablefmt = "simple"))
                    except IOError:
                        self.logger.exception("Unable to generate the 'vos_import_report'.")

                    self.jobObj.setPhase("COMPLETED")
                    self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                    self.dbConn.insertJob(self.jobObj)
                    self.logger.info("Job phase updated to COMPLETED.")

                    msg = f"""
            ########## VOSpace import procedure summary ##########

            Job ID: {self.jobId}
            Job type: {self.jobObj.type}
            Storage type: {self.storageType}
            Storage ID: {self.storageId}
            Creator ID: {self.userId}
            Processed nodes: {len(self.nodeList)}
            Imported nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)}
            Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)}
            Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)}

            """

                    # Attach the full report only when it is reasonably small.
                    if len(self.nodeList) <= 10 ** 5:
                        m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
                    else:
                        info = f"""
            INFO:
            this operation involved a number of nodes greater than 10^5,
            you will find the results in {self.resDir}.

                """
                        msg += info
                        m.setMessage("VOSpace import notification", msg)
                else:
                    self.jobObj.setPhase("ERROR")
                    self.jobObj.setErrorType("fatal")
                    self.jobObj.setErrorMessage("FATAL: something went wrong during the import phase.")
                    self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                    self.dbConn.insertJob(self.jobObj)
                    self.logger.info("Job phase updated to ERROR.")
                    # Roll back: remove every node this job inserted.
                    self.logger.info("Removing VOSpace nodes from the database...")
                    self.dbConn.deleteNodesByJobId(self.jobId)
                    self.logger.info("Database cleanup completed")

                    # "Job type:" now matches the OK-branch summary formatting.
                    msg = f"""
            ########## VOSpace import procedure summary ##########

            Job ID: {self.jobId}
            Job type: {self.jobObj.type}
            Storage type: {self.storageType}
            Storage ID: {self.storageId}
            Creator ID: {self.userId}
            """

                    info = f"""
            ERROR:
            the job was terminated due to an error that occurred
            while importing the VOSpace nodes in the file catalog.

            This issue will be automatically reported to the administrator.

            """
                    msg += info
                    m.setMessage("VOSpace import notification: Job ERROR", msg)
                # Send e-mail notification
                m.send()
            except Exception:
                self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}")
            finally:
                # Always reset the node list for the next job.
                self.nodeList.clear()
    
    
        def run(self):
            """Main loop: consume jobs from the 'import_ready' queue, run the
            import and park the finished job on the 'import_terminated' queue.

            Each iteration waits (self.wait()), checks the queue lengths, and if
            a job is pending copies its metadata into the per-job attributes,
            runs execute(), finalizes via update(), then moves the job between
            queues (evicting the oldest terminated job when the destination
            queue is full).
            """
            self.logger.info("Starting import executor...")
            self.setSourceQueueName("import_ready")
            self.setDestinationQueueName("import_terminated")
            while True:
                self.wait()
                try:
                    srcQueueLen = self.srcQueue.len()
                    destQueueLen = self.destQueue.len()
                except Exception:
                    # Queue backend unavailable: log and retry on the next cycle.
                    self.logger.exception("Cache error: failed to retrieve queue length.")
                else:
                    if srcQueueLen > 0:
                        # Copy job metadata into the per-job state read by
                        # execute() and update().
                        self.jobObj = self.srcQueue.getJob()
                        self.jobId = self.jobObj.jobId
                        self.userId = self.jobObj.ownerId
                        self.path = self.jobObj.jobInfo["path"]
                        self.pathPrefix = self.jobObj.jobInfo["pathPrefix"]
                        self.storageId = self.jobObj.jobInfo["storageId"]
                        self.storageType = self.jobObj.jobInfo["storageType"]
                        if self.execute():
                            self.update("OK")
                        else:
                            self.update("ERROR")
                        try:
                            # Cap the terminated-jobs queue before inserting.
                            if destQueueLen >= self.maxTerminatedJobs:
                                self.destQueue.extractJob()
                            self.destQueue.insertJob(self.jobObj)
                            self.srcQueue.extractJob()
                        except Exception:
                            self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                        else:
                            self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")