Skip to content
Snippets Groups Projects
Select Git revision
  • 843c9960ebfae19d2f832e70ddc1ef442d7d5ea0
  • master default protected
  • ia2
  • adql2.1-ia2
  • private_rows
5 results

ClauseADQL.java

Blame
  • import_executor.py 9.19 KiB
    #!/usr/bin/env python
    
    import logging
    import os
    import re
    
    from config import Config
    from checksum import Checksum
    from datetime import datetime as dt
    from db_connector import DbConnector
    from mailer import Mailer
    from node import Node
    from system_utils import SystemUtils
    from tabulate import tabulate
    from tape_client import TapeClient
    from task_executor import TaskExecutor
    
    
    class ImportExecutor(TaskExecutor):
    
        def __init__(self):
            """Initialize the import executor.

            Loads /etc/vos_ts/vos_ts.conf, opens the file-catalog DB connection
            and the tape client, configures logging (file + stream handlers),
            and resets the per-job state that run() fills in before each import.
            """
            self.type = "import_executor"
            self.md5calc = Checksum()
            config = Config("/etc/vos_ts/vos_ts.conf")
            params = config.loadSection("file_catalog")
            self.dbConn = DbConnector(params["user"],
                                      params["password"],
                                      params["host"],
                                      params.getint("port"),
                                      params["db"],
                                      1,
                                      1)
            params = config.loadSection("spectrum_archive")
            self.tapeClient = TapeClient(params["host"],
                                         params.getint("port"),
                                         params["user"],
                                         params["pkey_file_path"])
            params = config.loadSection("logging")
            self.logger = logging.getLogger("ImportExecutor")
            # Resolve the configured level name (e.g. "INFO") to the logging
            # constant via getattr instead of eval(): same result, no arbitrary
            # code execution from a config value.
            self.logger.setLevel(getattr(logging, params["log_level"]))
            logFile = os.path.join(params["log_dir"], "import_executor.log")
            logFileHandler = logging.FileHandler(logFile)
            logStreamHandler = logging.StreamHandler()
            logFormatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            logFileHandler.setFormatter(logFormatter)
            logStreamHandler.setFormatter(logFormatter)
            self.logger.addHandler(logFileHandler)
            self.logger.addHandler(logStreamHandler)
            self.systemUtils = SystemUtils()
            # Per-job state, populated by run() from the dequeued job object.
            self.jobObj = None
            self.jobId = None
            self.userId = None
            self.path = None
            self.pathPrefix = None
            self.storageId = None
            self.storageType = None
            super(ImportExecutor, self).__init__()
    
        def importVOSpaceNodes(self):
            """Perform the VOSpace import operation for the current job.

            Scans the filesystem under self.path, inserts container nodes (one
            per directory) and data nodes (one per checksum-valid file) into the
            file catalog, writes a tabulated report, mails it, marks the job
            COMPLETED and removes the report file.
            """
            self.dbConn.setPhase(self.jobId, "EXECUTING")
            self.dbConn.setStartTime(self.jobId)

            start = dt.now()
            nodeList = []
            timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S")
            nodeListFile = "vos_import_report-" + timestamp

            # Cold storage keeps the checksum files on tape: recall them first.
            if self.storageType == "cold":
                self.tapeClient.connect()
                self.tapeClient.recallChecksumFiles(self.path)
                self.tapeClient.disconnect()

            [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path))

            # Matches timestamped wrapper directories created by vos_wrapper,
            # e.g. "/2023_01_31-12_00_00-vos_wrapper".
            tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")

            # Container nodes: one per directory below the import path.
            for dirEntry in dirs:
                if self.path in dirEntry:
                    parentPath = os.path.dirname(dirEntry).split(self.pathPrefix)[1]
                    nodeName = os.path.basename(dirEntry)

                    cnode = Node(nodeName, "container")

                    # Wrapper dirs themselves are not imported as nodes; they are
                    # only recorded (via setWrapperDir) on the nodes they contain.
                    if not tstampWrapperDirPattern.match("/" + nodeName):
                        if tstampWrapperDirPattern.search(parentPath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                            cnode.setWrapperDir(tstampWrapperDir)

                        # Avoid a double slash for nodes directly under the root.
                        if parentPath == '/':
                            vospacePath = parentPath + nodeName
                        else:
                            vospacePath = parentPath + '/' + nodeName

                        cnode.setParentPath(parentPath)
                        locationId = self.dbConn.getLocationId(self.storageId)
                        cnode.setLocationId(locationId)
                        cnode.setCreatorId(self.userId)
                        cnode.setContentLength(0)
                        cnode.setAsyncTrans(True)
                        cnode.setSticky(True)

                        if os.path.islink(dirEntry):
                            now = dt.now()
                            nodeList.append([ now, dirEntry, vospacePath, "container", "SYMLINK" ])
                        elif self.dbConn.insertNode(cnode):
                            now = dt.now()
                            nodeList.append([ now, dirEntry, vospacePath, "container", "DONE" ])
                        else:
                            now = dt.now()
                            nodeList.append([ now, dirEntry, vospacePath, "container", "SKIPPED" ])

            # Data nodes: one per file whose checksum is valid.
            for flist in files:
                for file in flist:
                    if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
                        parentPath = os.path.dirname(file).split(self.pathPrefix)[1]
                        nodeName = os.path.basename(file)
                        dnode = Node(nodeName, "data")

                        if tstampWrapperDirPattern.search(parentPath):
                            tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/')
                            parentPath = tstampWrapperDirPattern.sub("", parentPath)
                            dnode.setWrapperDir(tstampWrapperDir)

                        # Same root special case as containers (previously files
                        # directly under the root produced "//name").
                        if parentPath == '/':
                            vospacePath = parentPath + nodeName
                        else:
                            vospacePath = parentPath + '/' + nodeName

                        dnode.setParentPath(parentPath)
                        self.storageId = self.dbConn.getStorageId(self.pathPrefix)
                        locationId = self.dbConn.getLocationId(self.storageId)
                        dnode.setLocationId(locationId)
                        dnode.setCreatorId(self.userId)
                        dnode.setContentLength(os.path.getsize(file))
                        dnode.setContentMD5(self.md5calc.getMD5(file))
                        dnode.setAsyncTrans(True)
                        dnode.setSticky(True)

                        if os.path.islink(file):
                            now = dt.now()
                            nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ])
                        elif self.dbConn.insertNode(dnode):
                            now = dt.now()
                            nodeList.append([ now, file, vospacePath, "data", "DONE" ])
                        else:
                            now = dt.now()
                            nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ])

            # Write the report with a context manager so the handle is closed
            # even if tabulate/write fails.
            with open(nodeListFile, "w") as nlfp:
                nlfp.write(tabulate(nodeList,
                                    headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"],
                                    tablefmt = "simple"))
            end = dt.now()

            # Update job status (to be moved)
            results = [{"target": ""}]
            self.dbConn.setResults(self.jobId, results)
            self.jobObj.setPhase("COMPLETED")
            self.dbConn.setPhase(self.jobId, "COMPLETED")
            self.dbConn.setEndTime(self.jobId)

            m = Mailer()
            m.addRecipient("cristiano.urban@inaf.it")
            msg = f"""
            [VOSpace import procedure summary]

            Storage type: {self.storageType}
            Storage ID: {self.storageId}
            Creator ID: {self.userId}
            Start time: {start}
            End time: {end}
            Processed nodes: {len(nodeList)}
            Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)}
            Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in nodeList)}

            """
            m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile)
            m.send()

            os.remove(nodeListFile)
    
    
        def run(self):
            """Main loop: consume jobs from the 'import_ready' queue, import
            their nodes, then move them to the 'import_terminated' queue."""
            self.logger.info("Starting import executor...")
            self.setSourceQueueName("import_ready")
            self.setDestinationQueueName("import_terminated")
            while True:
                self.wait()
                # Nothing queued yet: go back to waiting.
                if self.srcQueue.len() <= 0:
                    continue
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId
                self.userId = self.jobObj.ownerId
                info = self.jobObj.jobInfo
                self.path = info["path"]
                self.pathPrefix = info["pathPrefix"]
                self.storageId = info["storageId"]
                self.storageType = info["storageType"]
                self.importVOSpaceNodes()
                # Keep the terminated-jobs queue bounded before appending.
                if self.destQueue.len() == self.maxTerminatedJobs:
                    self.destQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")