Skip to content
Snippets Groups Projects
Select Git revision
  • 72bf391d0527c4fcf70f5abaf15557d26f44dc36
  • master default protected
  • v4.5.2
  • v4.5.1
  • v4.5.0
  • v4.4.0
  • v4.3.3
  • v4.3.2
  • v4.3.1
  • v4.3.0
  • v4.2.0
  • v4.1.0
  • v4.0.2
  • v4.0.1
  • v4.0.0
  • v3.4.0
  • v3.3.0
  • v3.2.0
  • v3.1.1
  • v3.1.0
  • v3.0.1
  • v3.0.0
22 results

import_executor.py

Blame
  • import_executor.py 8.99 KiB
    #!/usr/bin/env python
    
    import logging
    import os
    import re
    
    from config import Config
    from checksum import Checksum
    from datetime import datetime as dt
    from db_connector import DbConnector
    from mailer import Mailer
    from node import Node
    from redis_log_handler import RedisLogHandler
    from system_utils import SystemUtils
    from tabulate import tabulate
    from tape_client import TapeClient
    from task_executor import TaskExecutor
    
    
    class ImportExecutor(TaskExecutor):
    
        def __init__(self):
            """Load configuration and wire up the executor's collaborators.

            Reads /etc/vos_ts/vos_ts.conf and initializes the file-catalog DB
            connector, admin mail address, Redis-backed logger, tape client and
            system utilities. Per-job state is reset to None until run() pops a
            job from the source queue.
            """
            self.type = "import_executor"
            self.md5calc = Checksum()
            config = Config("/etc/vos_ts/vos_ts.conf")
            params = config.loadSection("file_catalog")
            self.dbConn = DbConnector(params["user"],
                                      params["password"],
                                      params["host"],
                                      params.getint("port"),
                                      params["db"],
                                      1,
                                      1)
            params = config.loadSection("mail")
            self.adminEmail = params["admin_email"]
            params = config.loadSection("logging")
            self.logger = logging.getLogger(__name__)
            # Map the configured level name (e.g. "INFO") to its logging
            # constant. getattr() replaces the previous eval("logging.<LEVEL>"),
            # which would execute arbitrary code taken from the config file.
            self.logger.setLevel(getattr(logging, params["log_level"]))
            logFormatter = logging.Formatter(params["log_format"])
            redisLogHandler = RedisLogHandler()
            redisLogHandler.setFormatter(logFormatter)
            self.logger.addHandler(redisLogHandler)
            self.resDir = params["res_dir"]
            params = config.loadSection("spectrum_archive")
            self.tapeClient = TapeClient(params["host"],
                                         params.getint("port"),
                                         params["user"],
                                         params["pkey_file_path"],
                                         self.logger)
            self.systemUtils = SystemUtils()
            # Per-job state, populated by run() for each job being executed.
            self.jobObj = None
            self.jobId = None
            self.userId = None
            self.path = None
            self.pathPrefix = None
            self.storageId = None
            self.storageType = None
            super(ImportExecutor, self).__init__()
    
        def importVOSpaceNodes(self):
            """This method performs the VOSpace import operation.

            Walks the filesystem below self.path, inserts a catalog node for
            every directory and every checksum-valid file, writes a tabulated
            report file in self.resDir and mails a summary to the admin.
            """
            self.dbConn.setPhase(self.jobId, "EXECUTING")
            self.dbConn.setStartTime(self.jobId)

            start = dt.now()
            timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
            nodeListFile = os.path.join(self.resDir, "vos_import_report-" + timestamp)

            if self.storageType == "cold":
                # Cold storage keeps the checksum files on tape: recall them
                # before any MD5 validation takes place.
                self.tapeClient.connect()
                self.tapeClient.recallChecksumFiles(self.path)
                self.tapeClient.disconnect()

            [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path))

            # Matches a "/YYYY_MM_DD-HH_MM_SS-vos_wrapper" path component.
            tstampWrapperDirPattern = re.compile(r"/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")

            nodeList = []
            self._importContainerNodes(dirs, tstampWrapperDirPattern, nodeList)
            self._importDataNodes(files, tstampWrapperDirPattern, nodeList)

            # Write the report once processing is complete; the context manager
            # guarantees the handle is closed even on failure (the original
            # opened the file up front and leaked it on any exception).
            with open(nodeListFile, "w") as nlfp:
                nlfp.write(tabulate(nodeList,
                                    headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
                                    tablefmt = "simple"))
            end = dt.now()

            # Update job status (to be moved)
            results = [{"target": ""}]
            self.dbConn.setResults(self.jobId, results)
            self.jobObj.setPhase("COMPLETED")
            self.dbConn.setPhase(self.jobId, "COMPLETED")
            self.dbConn.setEndTime(self.jobId)

            self._sendImportReport(start, end, nodeList, nodeListFile)

        def _vospacePath(self, parentPath, nodeName):
            """Join a VOSpace parent path and a node name without doubling the
            slash when the parent is the root."""
            if parentPath == '/':
                return parentPath + nodeName
            return parentPath + '/' + nodeName

        def _stripWrapperDir(self, node, parentPath, tstampWrapperDirPattern):
            """If parentPath contains a timestamped wrapper directory, record it
            on the node and return parentPath with that component removed."""
            wrapperMatch = tstampWrapperDirPattern.search(parentPath)
            if wrapperMatch:
                node.setWrapperDir(wrapperMatch.group(0).lstrip('/'))
                parentPath = tstampWrapperDirPattern.sub("", parentPath)
            return parentPath

        def _recordNode(self, nodeList, osPath, vospacePath, nodeType, node):
            """Insert the node (unless osPath is a symlink) and append the
            outcome row for the report: SYMLINK, DONE or SKIPPED."""
            if os.path.islink(osPath):
                result = "SYMLINK"
            elif self.dbConn.insertNode(node):
                result = "DONE"
            else:
                result = "SKIPPED"
            nodeList.append([ dt.now().isoformat(), osPath, vospacePath, nodeType, result ])

        def _importContainerNodes(self, dirs, tstampWrapperDirPattern, nodeList):
            """Create a container node for every directory under self.path,
            skipping the wrapper directories themselves."""
            for dirPath in dirs:
                if self.path not in dirPath:
                    continue
                parentPath = os.path.dirname(dirPath).split(self.pathPrefix)[1]
                nodeName = os.path.basename(dirPath)
                if tstampWrapperDirPattern.match("/" + nodeName):
                    # Wrapper directories are bookkeeping only, never imported.
                    continue
                cnode = Node(nodeName, "container")
                parentPath = self._stripWrapperDir(cnode, parentPath, tstampWrapperDirPattern)
                vospacePath = self._vospacePath(parentPath, nodeName)
                cnode.setParentPath(parentPath)
                cnode.setLocationId(self.dbConn.getLocationId(self.storageId))
                cnode.setCreatorId(self.userId)
                cnode.setContentLength(0)
                cnode.setAsyncTrans(True)
                cnode.setSticky(True)
                self._recordNode(nodeList, dirPath, vospacePath, "container", cnode)

        def _importDataNodes(self, files, tstampWrapperDirPattern, nodeList):
            """Create a data node for every checksum-valid file under self.path."""
            for flist in files:
                for filePath in flist:
                    if not self.md5calc.fileIsValid(filePath):
                        continue
                    if self.path not in os.path.dirname(filePath):
                        continue
                    parentPath = os.path.dirname(filePath).split(self.pathPrefix)[1]
                    nodeName = os.path.basename(filePath)
                    dnode = Node(nodeName, "data")
                    parentPath = self._stripWrapperDir(dnode, parentPath, tstampWrapperDirPattern)
                    # Bug fix: use the same root-aware join as containers so a
                    # file directly under the root no longer yields '//name'.
                    vospacePath = self._vospacePath(parentPath, nodeName)
                    dnode.setParentPath(parentPath)
                    # NOTE(review): loop-invariant lookup deliberately kept
                    # inside the loop so self.storageId is refreshed only when
                    # at least one file is actually processed, as before.
                    self.storageId = self.dbConn.getStorageId(self.pathPrefix)
                    dnode.setLocationId(self.dbConn.getLocationId(self.storageId))
                    dnode.setCreatorId(self.userId)
                    dnode.setContentLength(os.path.getsize(filePath))
                    dnode.setContentMD5(self.md5calc.getMD5(filePath))
                    dnode.setAsyncTrans(True)
                    dnode.setSticky(True)
                    self._recordNode(nodeList, filePath, vospacePath, "data", dnode)

        def _sendImportReport(self, start, end, nodeList, nodeListFile):
            """E-mail the import summary to the admin, attaching the report
            file when the node count is small enough to do so."""
            m = Mailer(self.logger)
            m.addRecipient(self.adminEmail)
            msg = f"""
            ########## VOSpace import procedure summary ##########

            Job ID: {self.jobId}
            Job type: {self.jobObj.type}
            Storage type: {self.storageType}
            Storage ID: {self.storageId}
            Creator ID: {self.userId}
            Start time: {start}
            End time: {end}
            Processed nodes: {len(nodeList)}
            Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)}
            Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in nodeList)}
            Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in nodeList)}

            """
            if len(nodeList) <= 10 ** 5:
                m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
            else:
                info = f"""
            INFO: 
            this operation involved a number of nodes greater than 10^5,
            you will find the results in {self.resDir}.

                """
                msg += info
                m.setMessage("VOSpace import notification", msg)
            m.send()
    
        def run(self):
            """Main executor loop.

            Pops jobs from the 'import_ready' queue, runs the import for each,
            then moves the job to the bounded 'import_terminated' queue.
            """
            self.logger.info("Starting import executor...")
            self.setSourceQueueName("import_ready")
            self.setDestinationQueueName("import_terminated")
            while True:
                # wait() comes from TaskExecutor; presumably blocks until work
                # is available — not visible from this file.
                self.wait()
                if self.srcQueue.len() > 0:
                    # Copy job metadata into per-instance state for the import.
                    self.jobObj = self.srcQueue.getJob()
                    self.jobId = self.jobObj.jobId
                    self.userId = self.jobObj.ownerId
                    self.path = self.jobObj.jobInfo["path"]
                    self.pathPrefix = self.jobObj.jobInfo["pathPrefix"]
                    self.storageId = self.jobObj.jobInfo["storageId"]
                    self.storageType = self.jobObj.jobInfo["storageType"]
                    self.importVOSpaceNodes()
                    # Keep the terminated queue bounded by evicting the oldest
                    # entry at the cap. NOTE(review): uses '==' rather than
                    # '>=' — assumes the queue never exceeds the cap; confirm.
                    if self.destQueue.len() == self.maxTerminatedJobs:
                        self.destQueue.extractJob()
                    self.destQueue.insertJob(self.jobObj)
                    self.srcQueue.extractJob()
                    self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")