Skip to content
Snippets Groups Projects
Select Git revision
  • 054d2da313e1f302a809df5ccd7e40462d438eb4
  • master default protected
  • rocky_8
  • rocky_9
  • pasture
  • testing
  • query
  • v0.2.9
  • v0.2.8
  • v0.2.7
  • v0.2.6
  • v0.2.5
  • v0.2.4
  • v0.2.3
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.2
  • v0.1.1
  • v0.1
20 results

retrieve_executor.py

Blame
  • retrieve_executor.py 19.76 KiB
    #!/usr/bin/env python
    #
    # This file is part of vospace-transfer-service
    # Copyright (C) 2021 Istituto Nazionale di Astrofisica
    # SPDX-License-Identifier: GPL-3.0-or-later
    #
    #
    # This class is responsible to retrieve data from a generic storage point.
    #
    # The operations performed are the briefly summarized here below:
    # * obtain the storage type
    # * create a list of files to be retrieved (list of dictionaries)
    # * split the list in blocks of a fixed size
    # * loop on each block and retrieve data
    #   - if the storage type is 'cold' (tape) perform a recall operation
    #     before the copy and a migrate operation after the copy
    #   - check if data associated to a VOSpace node has been copied 
    #     every time a block is retrieved
    #   - recursively update the 'async_trans' flag
    # * cleanup
    #
    #
    
    
    import datetime
    import json
    import os
    import logging
    import subprocess
    import sys
    
    from checksum import Checksum
    from config import Config
    from db_connector import DbConnector
    from mailer import Mailer
    from redis_log_handler import RedisLogHandler
    from system_utils import SystemUtils
    from tape_client import TapeClient
    from task_executor import TaskExecutor
    
    
    class RetrieveExecutor(TaskExecutor): 
    
        def __init__(self):
            self.type = "retrieve_executor"
            self.systemUtils = SystemUtils()
            config = Config("/etc/vos_ts/vos_ts.conf")                
            params = config.loadSection("transfer_node")
            self.storageRetrievePath = params["retrieve_path"]
            params = config.loadSection("transfer")
            self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"])
            params = config.loadSection("scheduling")
            self.maxTerminatedJobs = params.getint("max_terminated_jobs")
            params = config.loadSection("mail")
            self.adminEmail = params["admin_email"]
            params = config.loadSection("logging")
            self.logger = logging.getLogger(__name__)
            logLevel = "logging." + params["log_level"]
            logFormat = params["log_format"]
            logFormatter = logging.Formatter(logFormat)
            self.logger.setLevel(eval(logLevel))
            redisLogHandler = RedisLogHandler()
            redisLogHandler.setFormatter(logFormatter)
            self.logger.addHandler(redisLogHandler)
            params = config.loadSection("file_catalog")
            self.dbConn = DbConnector(params["user"],
                                      params["password"],
                                      params["host"],
                                      params.getint("port"),
                                      params["db"],
                                      1,
                                      1,
                                      self.logger)
            params = config.loadSection("spectrum_archive")
            self.tapePool = params["tape_pool"]
            self.tapeClient = TapeClient(params["host"],
                                         params.getint("port"),
                                         params["user"],
                                         params["pkey_file_path"],
                                         self.logger)
            self.storageType = None
            self.jobObj = None
            self.jobId = None
            self.nodeList = []
            self.fileList = []
            self.destPathList = []
            self.numBlocks = 0
            self.procBlocks = 0
            self.totalSize = 0
            super(RetrieveExecutor, self).__init__()
            
        def  buildFileList(self):
            """
            Generates the list of all files to retrieve.
            This method returns 'True' on success, 'False' on failure.
            """
            try:
                try:
                    self.jobObj.setPhase("EXECUTING")
                    self.jobObj.setStartTime(datetime.datetime.now().isoformat())
                    self.dbConn.insertJob(self.jobObj)
                except Exception:
                    self.logger.exception("FATAL: unable to update the file catalog.")
                    return False
                else:
                    self.logger.info("Job phase updated to EXECUTING.")
                self.logger.info("Building the list of the files to be retrieved...")
            
                # debug block...
                #if os.path.exists("nodeList.txt"):
                #    os.remove("nodeList.txt")
                #nl = open("nodeList.txt", 'w')
                #for vospacePath in self.nodeList:
                #    nl.write(vospacePath + '\n')
                #nl.close()
            
                # Obtain the storage type
                try:
                    self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"]
                except Exception:
                    self.logger.exception("FATAL: unable to obtain the storage type.")
                    return False
                for vospacePath in self.nodeList:
                    try:
                        nodeInfo = self.dbConn.getOSPath(vospacePath)
                    except Exception:
                        self.logger.exception(f"FATAL: unable to obtain the OS path for the VOSpace path '{vospacePath}'.")
                        return False
                    baseSrcPath = nodeInfo["baseSrcPath"]
                    srcPath = nodeInfo["fullPath"]
                    username = nodeInfo["username"]
                    md5calc = Checksum()
                    if os.path.isdir(srcPath) and not os.path.islink(srcPath):    
                        for root, dirs, files in os.walk(srcPath, topdown = False):
                            #dirSize = os.stat(root).st_size
                            #self.totalSize += dirSize
                            for f in files:
                                fullPath = os.path.join(root, f)
                                if md5calc.fileIsValid(fullPath) and not os.path.islink(fullPath):
                                    #fileSize = os.stat(fullPath).st_size
                                    fileSize = os.path.getsize(fullPath)
                                    fileInfo = {
                                                   "baseSrcPath": baseSrcPath,
                                                   "fullPath": fullPath,
                                                   "username": username,
                                                   "fileSize": fileSize, 
                                                   "vospaceRootParent": vospacePath 
                                               }
                                    self.totalSize += fileSize
                                    self.fileList.append(fileInfo.copy())
                    else:
                        if md5calc.fileIsValid(srcPath) and not os.path.islink(srcPath):
                            fileSize = nodeInfo["contentLength"]
                            fileInfo = {
                                           "baseSrcPath": baseSrcPath,
                                           "fullPath": srcPath,
                                           "username": username,
                                           "fileSize": fileSize,
                                           "vospaceRootParent": vospacePath
                                       }
                            self.totalSize += fileSize
                            self.fileList.append(fileInfo.copy())
                self.logger.info(f"Total size of files to retrieve: {self.totalSize} B")
                # debug block...    
                #if os.path.exists("fileList.txt"):
                #    os.remove("fileList.txt")
                #fl = open("fileList.txt", 'w')
                #fl.write(json.dumps(self.fileList, indent = 4))
                #fl.close()
            except Exception:
                self.logger.exception("FATAL: something went wrong while building the list of the files to be retrieved.")
                return False
            else:
                return True
        
        def buildBlocks(self):
            """
            Algorithm to split data in blocks of a well known size.
            This method returns 'True' on success, 'False' on failure.
            """
            try:
                self.logger.info("Building the blocks data structure... ")
                if self.fileList:
                    blockIdx = 0
                    blockSize = 0
                for fileInfo in self.fileList:
                    fileSize = fileInfo["fileSize"]
                    #self.totalSize += fileSize
                    # check if the file is larger than a block size
                    if fileSize > self.maxBlockSize:
                        # if the current block is not empty, "close" it, otherwise 
                        # use it and then create a new block
                        if blockSize > 0:
                            blockIdx += 1
                            fileInfo["blockIdx"] = blockIdx
                            blockIdx += 1
                        else:
                            fileInfo["blockIdx"] = blockIdx
                            blockIdx += 1
                        blockSize = 0
                    else:                
                        # the file can be contained by a block, so check if
                        # the file size plus the current block fill is lower
                        # than the maximum block size
                        if blockSize + fileSize <= self.maxBlockSize:
                            # if so, add the file to the block and go ahead with
                            # the next one
                            fileInfo["blockIdx"] = blockIdx
                            blockSize += fileSize
                        else:
                            # if not, "close" the current block, add it to the block list,
                            # then create a new block, add the file to it and go ahead
                            # with the next one
                            blockIdx += 1
                            fileInfo["blockIdx"] = blockIdx
                            blockSize = fileSize
                if self.fileList:
                    self.numBlocks = blockIdx + 1
                    try:
                        self.dbConn.setTotalBlocks(self.jobId, self.numBlocks)
                    except Exception:
                        self.logger.exception("FATAL: unable to set the total number of blocks in the database.")
                        return False
             
                # debug block... 
                #print(f"numBlocks = {self.numBlocks}")
                #if os.path.exists("blocks.txt"):
                #    os.remove("blocks.txt")
                #fl = open("blocks.txt", 'w')
                #fl.write(json.dumps(self.fileList, indent = 4))
                #fl.close()
            except Exception:
                self.logger.exception("FATAL: something went wrong while building the blocks data structure.")
                return False
            else:
                return True
            
        def retrieveCompleted(self, vospacePath):
            """
            Returns 'True' if all data associated to 'vospacePath' 
            has been copied, otherwise it returns 'False'.
            """
            return not any(vospacePath in f["vospaceRootParent"] for f in self.fileList)
        
        def retrieveData(self):
            """
            Retrieves data from a generic storage point (hot or cold).
            """
            try:
                self.logger.info("Starting data retrieval...")
                # Loop on blocks
                for blockIdx in range(self.numBlocks):
                    blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ]
                
                    # Recall all files from tape library to tape frontend
                    # if the storage type is 'cold'
                    if self.storageType == "cold":
                        self.tapeClient.connect()
                        self.tapeClient.recall([ f["fullPath"] for f in blockFileList ], self.jobId)
                        self.tapeClient.disconnect()
                
                    # Loop on files in a block
                    for fileInfo in blockFileList:
                        srcPath = fileInfo["fullPath"]
                        username = fileInfo["username"]
                        baseSrcPath = fileInfo["baseSrcPath"]
                        osRelParentPath = os.path.dirname(srcPath)
                        osRelParentPath = osRelParentPath.replace(baseSrcPath, "")
                        if osRelParentPath != "/":
                            osRelParentPath += "/"
                        destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath
                        os.makedirs(destDirPath, exist_ok = True)
                        if self.storageType == "cold":
                            sp = subprocess.run(["rsync", "-av", "--no-links", srcPath, destDirPath], capture_output = True)
                            if(sp.returncode or sp.stderr):
                                self.logger.error(f"FATAL: error during the copy process, returnCode = {sp.returncode}, stderr: {sp.stderr}")
                                return False
                        else:
                            destPath = destDirPath + os.path.basename(srcPath)
                            try:
                                if not os.path.islink(srcPath):
                                    os.symlink(srcPath, destPath)
                            except Exception:
                                self.logger.error(f"FATAL: error while creating symlink for target '{srcPath}'")
                                return False
                
                    # Remove files from file list at the end of the copy 
                    for fileInfo in blockFileList:
                        if fileInfo in self.fileList:
                            self.fileList.remove(fileInfo)            
                
                    # Check if the copy related to a certain VOSpace node
                    # is completed and recursively update the 'async_trans'
                    # flag
                    for vospacePath in self.nodeList:
                        if self.retrieveCompleted(vospacePath):
                            try:
                                self.dbConn.setAsyncTrans(vospacePath, False)
                            except Exception:
                                self.logger.exception("FATAL: unable to update the file catalog.")
                                return False
                
                    # Empty the tape library frontend if the storage type
                    # is 'cold'
                    if self.storageType == "cold":
                        self.tapeClient.connect()
                        self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ], self.tapePool, self.jobId)
                        self.tapeClient.disconnect()
                    
                    blockFileList.clear()
                    self.procBlocks += 1
                    self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks)
            except Exception:
                self.logger.exception("FATAL: something went wrong while retrieving the data.")
                return False
            else:
                return True
            
        def execute(self):
            success = True
            self.logger.info("++++++++++ Start of execution phase ++++++++++")
            success &= self.buildFileList() & self.buildBlocks() & self.retrieveData()
            if success:
                self.logger.info("++++++++++ End of execution phase ++++++++++")
                return True
            else:
                self.logger.info("FATAL: something went wrong during the execution phase.")
                return False
    
        def update(self, status):
            """
            Updates the job status and sends an email to the user.
            """
            try:
                results = [{"target": ""}]
                results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"]
    
                m = Mailer(self.logger)
                m.addRecipient(self.adminEmail)
                userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId)
                if userEmail != self.adminEmail:
                    m.addRecipient(userEmail)
    
                self.jobObj.setResults(results)
                
                # Add a list of physical destination paths for each VOSpace node in the node list
                self.logger.info("Generating physical destination paths for VOSpace nodes...")
                for vospacePath in self.nodeList:
                    nodeInfo = self.dbConn.getOSPath(vospacePath)
                    baseSrcPath = nodeInfo["baseSrcPath"]
                    username = nodeInfo["username"]
                    srcPath = nodeInfo["fullPath"]
                    baseDestPath = self.storageRetrievePath.replace("{username}", username)
                    destPath = srcPath.replace(baseSrcPath, baseDestPath)
                    self.destPathList.append(destPath)
                self.jobObj.jobInfo["destPathList"] = self.destPathList.copy()
    
                if status == ("OK"):
                    self.jobObj.setPhase("COMPLETED")
                    self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                    self.dbConn.insertJob(self.jobObj)
                    self.logger.info("Job phase updated to COMPLETED.")
            
                    msg = f"""
            ########## VOSpace data retrieval procedure summary ##########
                    
            Dear user,
            your job has been COMPLETED.
    
            Job ID: {self.jobId}
            Job type: {self.jobObj.type}
            Owner ID: {self.jobObj.ownerId}
    
            Your files are available and can be downloaded.
    
            """
                    m.setMessage("VOSpace data retrieve notification: COMPLETED", msg)
                else:
                    self.jobObj.setPhase("ERROR")
                    self.jobObj.setErrorType("fatal")
                    self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.")
                    self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                    self.dbConn.insertJob(self.jobObj)
                    self.logger.info("Job phase updated to ERROR.")                
                    
                    msg = f"""
            ########## VOSpace data retrieval procedure summary ##########
                    
            Dear user,
            your job has FAILED.
    
            Job ID: {self.jobId}
            Job type: {self.jobObj.type}
            Owner ID: {self.jobObj.ownerId}
            """
                    info = f"""
            ERROR: 
            the job was terminated due to an error that occurred 
            while retrieveing the data from the storage point.
            
            This issue will be automatically reported to the administrator.
            
            """
                    msg += info
                    m.setMessage("VOSpace data retrieve notification: ERROR", msg)
                # Send e-mail notification    
                m.send()
            except Exception:
                self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}")
            
        def cleanup(self):
            """
            Cleanup method.
            """
            self.logger.info("Cleanup...")
            self.fileList.clear()
            self.nodeList.clear()
            self.destPathList.clear()
            self.storageType = None
            self.numBlocks = 0
            self.procBlocks = 0
            self.totalSize = 0
    
        def run(self):
            self.logger.info("Starting retrieve executor...")
            self.setSourceQueueName("read_ready")
            self.setDestinationQueueName("read_terminated")
            while True:
                self.wait()
                try:
                    srcQueueLen = self.srcQueue.len()
                    destQueueLen = self.destQueue.len()
                except Exception:
                    self.logger.exception("Cache error: failed to retrieve queue length.")
                else:
                    if srcQueueLen > 0:
                        self.jobObj = self.srcQueue.getJob()
                        self.jobId = self.jobObj.jobId                
                        self.nodeList = self.jobObj.nodeList.copy()
                        if self.execute():
                            self.update("OK")
                        
                            # debug block...
                            #print(f"fileList = {self.fileList}")
                            #print(f"nodeList = {self.nodeList}")
                        else:
                            self.update("ERROR")
                        self.cleanup()
                        try:
                            if destQueueLen >= self.maxTerminatedJobs:
                                self.destQueue.extractJob()
                            self.destQueue.insertJob(self.jobObj)
                            self.srcQueue.extractJob()
                        except Exception:
                            self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                        else:
                            self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")