#!/usr/bin/env python
#
#
# This class is responsible to retrieve data from a generic storage point.
#
# The operations performed are briefly summarized here below:
#   * obtain the storage type
#   * create a list of files to be retrieved (list of dictionaries)
#   * split the list in blocks of a fixed size
#   * loop on each block and retrieve data
#       - if the storage type is 'cold' (tape) perform a recall operation
#         before the copy and a migrate operation after the copy
#       - check if data associated to a VOSpace node has been copied
#         every time a block is retrieved
#       - recursively update the 'async_trans' flag
#   * cleanup
#
#

import datetime
import json
import os
import logging
import subprocess
import sys

from checksum import Checksum
from config import Config
from db_connector import DbConnector
from mailer import Mailer
from redis_log_handler import RedisLogHandler
from system_utils import SystemUtils
from tape_client import TapeClient
from task_executor import TaskExecutor


class RetrieveExecutor(TaskExecutor):
    """Task executor that copies job data from a storage point (hot disk or
    cold tape) to the transfer-node retrieve area.

    Jobs are pulled from the 'read_ready' queue and moved to the
    'read_terminated' queue once processed (see run()).
    """

    def __init__(self):
        self.type = "retrieve_executor"
        self.systemUtils = SystemUtils()
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("transfer_node")
        # Destination template; contains a '{username}' placeholder.
        self.storageRetrievePath = params["retrieve_path"]
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1)
        params = config.loadSection("transfer")
        self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"])
        params = config.loadSection("scheduling")
        self.maxTerminatedJobs = params.getint("max_terminated_jobs")
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        # FIX: do not eval() strings coming from the config file; resolve the
        # level name through the logging module (e.g. "INFO" -> logging.INFO).
        self.logger.setLevel(getattr(logging, params["log_level"]))
        logFormatter = logging.Formatter(params["log_format"])
        redisLogHandler = RedisLogHandler()
        logStreamHandler = logging.StreamHandler()
        logStreamHandler.setFormatter(logFormatter)
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        self.logger.addHandler(logStreamHandler)
        params = config.loadSection("spectrum_archive")
        self.tapePool = params["tape_pool"]
        self.tapeClient = TapeClient(params["host"],
                                     params.getint("port"),
                                     params["user"],
                                     params["pkey_file_path"],
                                     self.logger)
        # Per-job state, reset by cleanup().
        self.storageType = None     # "hot" or "cold" — TODO confirm values
        self.jobObj = None
        self.jobId = None
        self.nodeList = []          # VOSpace paths of the job
        self.fileList = []          # file descriptors (dicts) still to copy
        self.destPathList = []      # physical destination paths (see update())
        self.numBlocks = 0
        self.procBlocks = 0
        self.totalSize = 0
        super(RetrieveExecutor, self).__init__()

    def buildFileList(self):
        """Generates the list of all files to retrieve.

        Fills self.fileList with one dict per valid file (checksum-checked)
        found under every node in self.nodeList, and sets self.storageType.
        Also moves the job to the EXECUTING phase in the catalog.
        """
        self.dbConn.setPhase(self.jobId, "EXECUTING")
        self.dbConn.setStartTime(self.jobId)
        # debug block...
        if os.path.exists("nodeList.txt"):
            os.remove("nodeList.txt")
        with open("nodeList.txt", 'w') as nl:
            for vospacePath in self.nodeList:
                nl.write(vospacePath + '\n')
        # Obtain the storage type from the first node: all nodes of a job are
        # assumed to live on the same storage point — TODO confirm.
        self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"]
        for vospacePath in self.nodeList:
            nodeInfo = self.dbConn.getOSPath(vospacePath)
            baseSrcPath = nodeInfo["baseSrcPath"]
            srcPath = nodeInfo["fullPath"]
            username = nodeInfo["username"]
            md5calc = Checksum()
            if os.path.isdir(srcPath):
                # Container node: walk it and collect every valid file.
                for root, dirs, files in os.walk(srcPath, topdown = False):
                    for f in files:
                        fullPath = os.path.join(root, f)
                        if md5calc.fileIsValid(fullPath):
                            fileSize = os.stat(fullPath).st_size
                            fileInfo = {
                                "baseSrcPath": baseSrcPath,
                                "fullPath": fullPath,
                                "username": username,
                                "fileSize": fileSize,
                                "vospaceRootParent": vospacePath
                            }
                            self.fileList.append(fileInfo.copy())
            else:
                # Data node: a single file.
                if md5calc.fileIsValid(srcPath):
                    fileSize = nodeInfo["contentLength"]
                    fileInfo = {
                        "baseSrcPath": baseSrcPath,
                        "fullPath": srcPath,
                        "username": username,
                        "fileSize": fileSize,
                        "vospaceRootParent": vospacePath
                    }
                    self.fileList.append(fileInfo.copy())
        # debug block...
        if os.path.exists("fileList.txt"):
            os.remove("fileList.txt")
        with open("fileList.txt", 'w') as fl:
            fl.write(json.dumps(self.fileList, indent = 4))

    def buildBlocks(self):
        """Algorithm to split data in blocks of a well known size.

        Tags every entry of self.fileList with a 'blockIdx' key so that the
        cumulative size of each block does not exceed self.maxBlockSize
        (a single file larger than the limit gets a block of its own).
        Sets self.numBlocks and stores it in the catalog.
        """
        if self.fileList:
            blockIdx = 0
            blockSize = 0
            for fileInfo in self.fileList:
                fileSize = fileInfo["fileSize"]
                self.totalSize += fileSize
                if fileSize > self.maxBlockSize:
                    # The file is larger than a whole block: close the current
                    # block if it is not empty, give the file its own block,
                    # then start a fresh one.
                    if blockSize > 0:
                        blockIdx += 1
                    fileInfo["blockIdx"] = blockIdx
                    blockIdx += 1
                    # FIX: the fill counter must be reset in both cases; the
                    # original only reset it when the current block was empty,
                    # so the next regular file was packed against a stale size.
                    blockSize = 0
                elif blockSize + fileSize <= self.maxBlockSize:
                    # The file fits in the current block.
                    fileInfo["blockIdx"] = blockIdx
                    blockSize += fileSize
                else:
                    # "Close" the current block and start a new one holding
                    # this file.
                    blockIdx += 1
                    fileInfo["blockIdx"] = blockIdx
                    blockSize = fileSize
            # FIX: derive the block count from the last index actually
            # assigned; 'blockIdx + 1' overcounted by one whenever the last
            # file was oversized (blockIdx had already advanced past it).
            self.numBlocks = self.fileList[-1]["blockIdx"] + 1
            self.dbConn.setTotalBlocks(self.jobId, self.numBlocks)
        # debug block...
        print(f"numBlocks = {self.numBlocks}")
        if os.path.exists("blocks.txt"):
            os.remove("blocks.txt")
        with open("blocks.txt", 'w') as fl:
            fl.write(json.dumps(self.fileList, indent = 4))

    def retrieveCompleted(self, vospacePath):
        """
        Returns 'True' if all data associated to 'vospacePath' has been
        copied, otherwise it returns 'False'.
        """
        # FIX: exact match instead of substring test ('in'); with the
        # substring test a node whose path is a prefix/substring of another
        # node's path was never reported as complete.
        return all(f["vospaceRootParent"] != vospacePath for f in self.fileList)

    def retrieveData(self):
        """Retrieves data from a generic storage point (hot or cold).

        Copies every block with rsync, recalling/migrating around each block
        when the storage is tape ('cold'). Returns False as soon as a copy
        fails, True when all blocks have been processed.
        """
        # Loop on blocks
        for blockIdx in range(self.numBlocks):
            blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ]
            # Recall all files from tape library to tape frontend
            # if the storage type is 'cold'
            if self.storageType == "cold":
                self.tapeClient.connect()
                self.tapeClient.recall([ f["fullPath"] for f in blockFileList ])
                self.tapeClient.disconnect()
            # Loop on files in a block
            for fileInfo in blockFileList:
                srcPath = fileInfo["fullPath"]
                username = fileInfo["username"]
                baseSrcPath = fileInfo["baseSrcPath"]
                osRelParentPath = os.path.dirname(srcPath)
                # NOTE(review): str.replace removes *any* occurrence of
                # baseSrcPath, not only the leading one — assumes the base
                # path never reappears inside the relative part.
                osRelParentPath = osRelParentPath.replace(baseSrcPath, "")
                if osRelParentPath != "/":
                    osRelParentPath += "/"
                destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath
                os.makedirs(destDirPath, exist_ok = True)
                sp = subprocess.run(["rsync", "-av", srcPath, destDirPath],
                                    capture_output = True)
                # rsync writes diagnostics to stderr; any stderr output or a
                # non-zero exit status is treated as a failed copy.
                if sp.returncode or sp.stderr:
                    return False
            # Remove files from file list at the end of the copy
            for fileInfo in blockFileList:
                if fileInfo in self.fileList:
                    self.fileList.remove(fileInfo)
            # Check if the copy related to a certain VOSpace node
            # is completed and recursively update the 'async_trans' flag
            for vospacePath in self.nodeList:
                if self.retrieveCompleted(vospacePath):
                    self.dbConn.setAsyncTrans(vospacePath, False)
            # Empty the tape library frontend if the storage type is 'cold'
            if self.storageType == "cold":
                self.tapeClient.connect()
                self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ],
                                        self.tapePool)
                self.tapeClient.disconnect()
            self.procBlocks += 1
            self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks)
        return True

    def update(self):
        """Updates the job status and sends an email to the user.

        Marks the job COMPLETED (both in the job object and in the catalog),
        records the physical destination path of every node, and notifies the
        administrator plus the job owner by e-mail.
        """
        results = [{"target": self.jobObj.jobInfo["transfer"]["target"]}]
        self.dbConn.setResults(self.jobId, results)
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        self.dbConn.setEndTime(self.jobId)
        self.jobObj.endTime = datetime.datetime.now().isoformat()
        # Add a list of physical destination paths for each VOSpace node
        # in the node list
        for vospacePath in self.nodeList:
            nodeInfo = self.dbConn.getOSPath(vospacePath)
            baseSrcPath = nodeInfo["baseSrcPath"]
            username = nodeInfo["username"]
            srcPath = nodeInfo["fullPath"]
            baseDestPath = self.storageRetrievePath.replace("{username}", username)
            destPath = srcPath.replace(baseSrcPath, baseDestPath)
            self.destPathList.append(destPath)
        self.jobObj.jobInfo["destPathList"] = self.destPathList.copy()
        # Send e-mail notification
        m = Mailer(self.logger)
        m.addRecipient(self.adminEmail)
        userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId)
        if userEmail != self.adminEmail:
            m.addRecipient(userEmail)
        msg = f"""
Dear user,
your job has been COMPLETED.

Job ID: {self.jobObj.jobId}
Owner ID: {self.jobObj.ownerId}

Your files are available and can be downloaded.
"""
        m.setMessage("VOSpace data retrieve notification: Job COMPLETED", msg)
        m.send()

    def cleanup(self):
        """Cleanup method: resets all per-job state for the next job."""
        self.fileList.clear()
        self.nodeList.clear()
        self.destPathList.clear()
        self.storageType = None
        self.numBlocks = 0
        self.procBlocks = 0
        self.totalSize = 0

    def run(self):
        """Main loop: consume jobs from 'read_ready', retrieve their data and
        move them to 'read_terminated' (evicting the oldest terminated job
        when the queue is full). Exits the process on a failed retrieve."""
        self.logger.info("Starting retrieve executor...")
        self.setSourceQueueName("read_ready")
        self.setDestinationQueueName("read_terminated")
        while True:
            self.wait()
            if self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId
                self.nodeList = self.jobObj.jobInfo["nodeList"].copy()
                self.buildFileList()
                self.buildBlocks()
                result = self.retrieveData()
                if result:
                    self.update()
                    self.cleanup()
                    # debug block...
                    print(f"fileList = {self.fileList}")
                    print(f"nodeList = {self.nodeList}")
                else:
                    sys.exit("Failed to retrieve data!")
                # Keep the terminated queue bounded: drop the oldest entry
                # before inserting the new one.
                if self.destQueue.len() == self.maxTerminatedJobs:
                    self.destQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")