retrieve_executor.py
    #!/usr/bin/env python
    #
    #
    # This class is responsible for retrieving data from a generic storage point.
    #
    # The operations performed are briefly summarized below:
    # * obtain the storage type
    # * create a list of files to be retrieved (list of dictionaries)
    # * split the list into blocks of a fixed size
    # * loop on each block and retrieve data
    #   - if the storage type is 'cold' (tape), perform a recall operation
    #     before the copy and a migrate operation after the copy
    #   - every time a block is retrieved, check whether the data associated
    #     with a VOSpace node has been fully copied
    #   - recursively update the 'async_trans' flag
    # * cleanup
    #
    #
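    # Example usage (a minimal sketch; it assumes the executor is started as a
    # standalone worker process and that the queue plumbing is provided by the
    # TaskExecutor base class):
    #
    #     executor = RetrieveExecutor()
    #     executor.run()
    #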
    
    
    import datetime
    import json
    import os
    import logging
    import subprocess
    import sys
    
    from checksum import Checksum
    from config import Config
    from db_connector import DbConnector
    from mailer import Mailer
    from redis_log_handler import RedisLogHandler
    from system_utils import SystemUtils
    from tape_client import TapeClient
    from task_executor import TaskExecutor
    
    
    class RetrieveExecutor(TaskExecutor): 
    
        def __init__(self):
            self.type = "retrieve_executor"
            self.systemUtils = SystemUtils()
            config = Config("/etc/vos_ts/vos_ts.conf")                
            params = config.loadSection("transfer_node")
            self.storageRetrievePath = params["retrieve_path"]
            params = config.loadSection("file_catalog")
            self.dbConn = DbConnector(params["user"],
                                      params["password"],
                                      params["host"],
                                      params.getint("port"),
                                      params["db"],
                                      1,
                                      1)
            params = config.loadSection("transfer")
            self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"])
            params = config.loadSection("scheduling")
            self.maxTerminatedJobs = params.getint("max_terminated_jobs")
            params = config.loadSection("mail")
            self.adminEmail = params["admin_email"]
            params = config.loadSection("logging")
            self.logger = logging.getLogger(__name__)
            logLevel = "logging." + params["log_level"]
            logFormat = params["log_format"]
            logFormatter = logging.Formatter(logFormat)
            self.logger.setLevel(eval(logLevel))
            redisLogHandler = RedisLogHandler()
            logStreamHandler = logging.StreamHandler()
            logStreamHandler.setFormatter(logFormatter)
            redisLogHandler.setFormatter(logFormatter)
            self.logger.addHandler(redisLogHandler)
            self.logger.addHandler(logStreamHandler)
            params = config.loadSection("spectrum_archive")
            self.tapePool = params["tape_pool"]
            self.tapeClient = TapeClient(params["host"],
                                         params.getint("port"),
                                         params["user"],
                                         params["pkey_file_path"],
                                         self.logger)
            self.storageType = None
            self.jobObj = None
            self.jobId = None
            self.nodeList = []
            self.fileList = []
            self.destPathList = []
            self.numBlocks = 0
            self.procBlocks = 0
            self.totalSize = 0
            super(RetrieveExecutor, self).__init__()
            
        def buildFileList(self):
            """
            Generates the list of all files to retrieve.
            """
            self.dbConn.setPhase(self.jobId, "EXECUTING")
            self.dbConn.setStartTime(self.jobId)
            
            # debug block...
            with open("nodeList.txt", 'w') as nl:
                for vospacePath in self.nodeList:
                    nl.write(vospacePath + '\n')
            
            # Obtain the storage type
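            # (the storage type of the first node is assumed to apply to all
            # nodes in the list)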
            self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"]
            for vospacePath in self.nodeList:
                nodeInfo = self.dbConn.getOSPath(vospacePath)
                baseSrcPath = nodeInfo["baseSrcPath"]
                srcPath = nodeInfo["fullPath"]
                username = nodeInfo["username"]
                md5calc = Checksum()
                if os.path.isdir(srcPath):    
                    for root, dirs, files in os.walk(srcPath, topdown = False):
                        for f in files:
                            fullPath = os.path.join(root, f)
                            if md5calc.fileIsValid(fullPath):                            
                                fileSize = os.stat(fullPath).st_size
                                fileInfo = {
                                               "baseSrcPath": baseSrcPath,
                                               "fullPath": fullPath,
                                               "username": username,
                                               "fileSize": fileSize, 
                                               "vospaceRootParent": vospacePath 
                                           }
                                self.fileList.append(fileInfo.copy())
                else:
                    if md5calc.fileIsValid(srcPath):
                        fileSize = nodeInfo["contentLength"]
                        fileInfo = {
                                       "baseSrcPath": baseSrcPath,
                                       "fullPath": srcPath,
                                       "username": username,
                                       "fileSize": fileSize,
                                       "vospaceRootParent": vospacePath
                                   }
                        self.fileList.append(fileInfo.copy())
                    
                # debug block...
                with open("fileList.txt", 'w') as fl:
                    fl.write(json.dumps(self.fileList, indent = 4))
        
        def buildBlocks(self):
            """
            Splits the file list into blocks whose size does not exceed the
            configured maximum block size (a file larger than a block gets a
            block of its own).
            """
            if self.fileList:
                blockIdx = 0
                blockSize = 0
            for fileInfo in self.fileList:
                fileSize = fileInfo["fileSize"]
                self.totalSize += fileSize
                # check if the file is larger than the maximum block size
                if fileSize > self.maxBlockSize:
                    # the file fills a block on its own: if the current block
                    # is not empty, "close" it first, then assign the file to
                    # a fresh block and move on to the next block index
                    if blockSize > 0:
                        blockIdx += 1
                    fileInfo["blockIdx"] = blockIdx
                    blockIdx += 1
                    blockSize = 0
                else:                
                    # the file fits within a block, so check whether adding it
                    # keeps the current block within the maximum block size
                    if blockSize + fileSize <= self.maxBlockSize:
                        # if so, add the file to the block and go ahead with
                        # the next one
                        fileInfo["blockIdx"] = blockIdx
                        blockSize += fileSize
                    else:
                        # if not, "close" the current block, add it to the block list,
                        # then create a new block, add the file to it and go ahead
                        # with the next one
                        blockIdx += 1
                        fileInfo["blockIdx"] = blockIdx
                        blockSize = fileSize
            if self.fileList:
                # the highest assigned index determines the number of blocks
                # (avoids counting a trailing empty block when the last file
                # was larger than the maximum block size)
                self.numBlocks = max(f["blockIdx"] for f in self.fileList) + 1
                self.dbConn.setTotalBlocks(self.jobId, self.numBlocks)
             
            # debug block...
            print(f"numBlocks = {self.numBlocks}")
            with open("blocks.txt", 'w') as fl:
                fl.write(json.dumps(self.fileList, indent = 4))
            
        def retrieveCompleted(self, vospacePath):
            """
            Returns True if all the data associated with 'vospacePath'
            has been copied, otherwise returns False.
            """
            return not any(vospacePath in f["vospaceRootParent"] for f in self.fileList)
        
        def retrieveData(self):
            """
            Retrieves data from a generic storage point (hot or cold).
            """
            
            # Loop on blocks
            for blockIdx in range(self.numBlocks):
                blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ]
                
                # Recall all files from tape library to tape frontend
                # if the storage type is 'cold'
                if self.storageType == "cold":
                    self.tapeClient.connect()
                    self.tapeClient.recall([ f["fullPath"] for f in blockFileList ])
                    self.tapeClient.disconnect()
                
                # Loop on files in a block
                for fileInfo in blockFileList:
                    srcPath = fileInfo["fullPath"]
                    username = fileInfo["username"]
                    baseSrcPath = fileInfo["baseSrcPath"]
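                    # Reproduce, under the per-user retrieve area, the parent
                    # path of the file relative to its base source path, so
                    # that the retrieved tree mirrors the source tree.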
                    osRelParentPath = os.path.dirname(srcPath)
                    osRelParentPath = osRelParentPath.replace(baseSrcPath, "")
                    if osRelParentPath != "/":
                        osRelParentPath += "/"
                    destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath
                    os.makedirs(destDirPath, exist_ok = True)
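                    # Copy the file with rsync; a non-zero exit code or any
                    # output on stderr is treated as a failure of the whole
                    # retrieve operation.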
                    sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True)
                    if sp.returncode or sp.stderr:
                        return False
                
                # Remove files from file list at the end of the copy 
                for fileInfo in blockFileList:
                    if fileInfo in self.fileList:
                        self.fileList.remove(fileInfo)            
                
                # Check if the copy related to a certain VOSpace node
                # is completed and recursively update the 'async_trans'
                # flag
                for vospacePath in self.nodeList:
                    if self.retrieveCompleted(vospacePath):
                        self.dbConn.setAsyncTrans(vospacePath, False)
                
                # Empty the tape library frontend if the storage type
                # is 'cold'
                if self.storageType == "cold":
                    self.tapeClient.connect()
                    self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ], self.tapePool)
                    self.tapeClient.disconnect()
                    
                blockFileList.clear()
                self.procBlocks += 1
                self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks)            
            return True
    
        def update(self):
            """
            Updates the job status and sends an email to the user.
            """
            results = [{"target": ""}]
            results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"]        
            self.dbConn.setResults(self.jobId, results)
            self.jobObj.setPhase("COMPLETED")
            self.dbConn.setPhase(self.jobId, "COMPLETED")
            self.dbConn.setEndTime(self.jobId)
            self.jobObj.endTime = datetime.datetime.now().isoformat()
            
            # Build the list of physical destination paths, one for each
            # VOSpace node in the node list
            for vospacePath in self.nodeList:
                nodeInfo = self.dbConn.getOSPath(vospacePath)
                baseSrcPath = nodeInfo["baseSrcPath"]
                username = nodeInfo["username"]
                srcPath = nodeInfo["fullPath"]
                baseDestPath = self.storageRetrievePath.replace("{username}", username)
                destPath = srcPath.replace(baseSrcPath, baseDestPath)
                self.destPathList.append(destPath)            
            self.jobObj.jobInfo["destPathList"] = self.destPathList.copy()
            
            # Send e-mail notification
            m = Mailer(self.logger)
            m.addRecipient(self.adminEmail)
            userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId)
            if userEmail != self.adminEmail:
                m.addRecipient(userEmail)
            
            msg = f"""
            Dear user,
            your job has been COMPLETED.
    
            Job ID: {self.jobObj.jobId}
            Owner ID: {self.jobObj.ownerId}
    
            Your files are available and can be downloaded.
    
            """
            m.setMessage("VOSpace data retrieve notification: Job COMPLETED", msg)
            m.send()
            
        def cleanup(self):
            """
            Resets the per-job state so that the executor is ready for the
            next job.
            """
            self.fileList.clear()
            self.nodeList.clear()
            self.destPathList.clear()
            self.storageType = None
            self.numBlocks = 0
            self.procBlocks = 0
            self.totalSize = 0
    
        def run(self):
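            """
            Main loop: waits for a job on the source queue, builds the file
            list and the blocks, retrieves the data, notifies the user and
            moves the job to the terminated-jobs queue.
            """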
            self.logger.info("Starting retrieve executor...")
            self.setSourceQueueName("read_ready")
            self.setDestinationQueueName("read_terminated")
            while True:
                self.wait()
                if self.srcQueue.len() > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.jobId = self.jobObj.jobId                
                    self.nodeList = self.jobObj.jobInfo["nodeList"].copy()
                    self.buildFileList()
                    self.buildBlocks()
                    result = self.retrieveData()
                    if result:
                        self.update()
                        self.cleanup()
                        
                        # debug block...
                        print(f"fileList = {self.fileList}")
                        print(f"nodeList = {self.nodeList}")
                    else:
                        sys.exit("Failed to retrieve data!")
                    if self.destQueue.len() == self.maxTerminatedJobs:
                        self.destQueue.extractJob()
                    self.destQueue.insertJob(self.jobObj)
                    self.srcQueue.extractJob()                    
                    self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")