Select Git revision
CommandLine.py
-
Andrea Orlati authored
* fix issue #902: fixed an issue, related to the porting to python 3 for the Noto LocalOscillator component * fix issue #902: changed totalpower, weather station configuration and IfDistributor python code (ported to python 3) * fix issue #902: small fix as requested in Pull Request review * fix issue #902: adjusted attenuations in setup procedure for primary band receiver --------- Co-authored-by:
Andrea Orlati <a.orati@ira.inaf.it>
Andrea Orlati authored * fix issue #902: fixed an issue, related to the porting to python 3 for the Noto LocalOscillator component * fix issue #902: changed totalpower, weather station configuration and IfDistributor python code (ported to python 3) * fix issue #902: small fix as requested in Pull Request review * fix issue #902: adjusted attenuations in setup procedure for primary band receiver --------- Co-authored-by:
Andrea Orlati <a.orati@ira.inaf.it>
retrieve_executor.py 13.73 KiB
#!/usr/bin/env python
#
#
# This class is responsible to retrieve data from a generic storage point.
#
# The operations performed are briefly summarized below:
# * obtain the storage type
# * create a list of files to be retrieved (list of dictionaries)
# * split the list in blocks of a fixed size
# * loop on each block and retrieve data
# - if the storage type is 'cold' (tape) perform a recall operation
# before the copy and a migrate operation after the copy
# - check if data associated to a VOSpace node has been copied
# every time a block is retrieved
# - recursively update the 'async_trans' flag
# * cleanup
#
#
import datetime
import json
import os
import logging
import subprocess
import sys
from checksum import Checksum
from config import Config
from db_connector import DbConnector
from mailer import Mailer
from redis_log_handler import RedisLogHandler
from system_utils import SystemUtils
from tape_client import TapeClient
from task_executor import TaskExecutor
class RetrieveExecutor(TaskExecutor):
    """
    Retrieves data from a generic storage point (hot or cold).

    Workflow:
      * obtain the storage type of the job's nodes
      * build the list of files to retrieve (list of dictionaries)
      * split the list in blocks of a fixed maximum size
      * loop on each block and copy the data; when the storage type is
        'cold' (tape), recall the block before the copy and migrate it
        back afterwards
      * after every block, update the 'async_trans' flag of each VOSpace
        node whose data has been completely copied
      * notify the job owner by e-mail and cleanup
    """

    def __init__(self):
        self.type = "retrieve_executor"
        self.systemUtils = SystemUtils()
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("transfer_node")
        # Template path containing a '{username}' placeholder
        self.storageRetrievePath = params["retrieve_path"]
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1)
        params = config.loadSection("transfer")
        self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"])
        params = config.loadSection("scheduling")
        self.maxTerminatedJobs = params.getint("max_terminated_jobs")
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        # Resolve the configured level name ('INFO', 'DEBUG', ...) with
        # getattr instead of eval(): same result, no code execution from
        # configuration data.
        self.logger.setLevel(getattr(logging, params["log_level"]))
        logFormatter = logging.Formatter(params["log_format"])
        redisLogHandler = RedisLogHandler()
        logStreamHandler = logging.StreamHandler()
        logStreamHandler.setFormatter(logFormatter)
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        self.logger.addHandler(logStreamHandler)
        params = config.loadSection("spectrum_archive")
        self.tapePool = params["tape_pool"]
        self.tapeClient = TapeClient(params["host"],
                                     params.getint("port"),
                                     params["user"],
                                     params["pkey_file_path"],
                                     self.logger)
        # Per-job state, reset by cleanup()
        self.storageType = None      # 'hot' or 'cold' (from the file catalog)
        self.jobObj = None
        self.jobId = None
        self.nodeList = []           # VOSpace paths to retrieve
        self.fileList = []           # pending files (dicts, see buildFileList)
        self.destPathList = []       # physical destination paths (see update)
        self.numBlocks = 0
        self.procBlocks = 0
        self.totalSize = 0
        super(RetrieveExecutor, self).__init__()

    def buildFileList(self):
        """
        Generates the list of all files to retrieve.

        Fills 'self.fileList' with one dictionary per physical file
        (source path, owner, size, owning VOSpace node) and stores the
        storage type of the first node in 'self.storageType'.
        """
        self.dbConn.setPhase(self.jobId, "EXECUTING")
        self.dbConn.setStartTime(self.jobId)
        # debug block...
        if os.path.exists("nodeList.txt"):
            os.remove("nodeList.txt")
        with open("nodeList.txt", 'w') as nl:
            for vospacePath in self.nodeList:
                nl.write(vospacePath + '\n')
        # Obtain the storage type
        self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"]
        for vospacePath in self.nodeList:
            nodeInfo = self.dbConn.getOSPath(vospacePath)
            baseSrcPath = nodeInfo["baseSrcPath"]
            srcPath = nodeInfo["fullPath"]
            username = nodeInfo["username"]
            md5calc = Checksum()
            if os.path.isdir(srcPath):
                # Directory node: walk the whole tree and collect every
                # valid file, remembering which VOSpace node owns it.
                for root, dirs, files in os.walk(srcPath, topdown = False):
                    for f in files:
                        fullPath = os.path.join(root, f)
                        if md5calc.fileIsValid(fullPath):
                            self.fileList.append({
                                "baseSrcPath": baseSrcPath,
                                "fullPath": fullPath,
                                "username": username,
                                "fileSize": os.stat(fullPath).st_size,
                                "vospaceRootParent": vospacePath
                            })
            elif md5calc.fileIsValid(srcPath):
                # Plain file node: size comes from the catalog entry.
                self.fileList.append({
                    "baseSrcPath": baseSrcPath,
                    "fullPath": srcPath,
                    "username": username,
                    "fileSize": nodeInfo["contentLength"],
                    "vospaceRootParent": vospacePath
                })
        # debug block...
        if os.path.exists("fileList.txt"):
            os.remove("fileList.txt")
        with open("fileList.txt", 'w') as fl:
            fl.write(json.dumps(self.fileList, indent = 4))

    def buildBlocks(self):
        """
        Algorithm to split data in blocks of a well known size.

        Assigns a 'blockIdx' to every entry of 'self.fileList' so that
        each block holds at most 'self.maxBlockSize' bytes; a file larger
        than the maximum always occupies a block on its own. Also updates
        'self.totalSize' and 'self.numBlocks'.
        """
        if self.fileList:
            blockIdx = 0
            blockSize = 0
            for fileInfo in self.fileList:
                fileSize = fileInfo["fileSize"]
                self.totalSize += fileSize
                if fileSize > self.maxBlockSize:
                    # Oversized file: "close" the current block if it is
                    # partially filled, give the file a block of its own,
                    # then start a fresh empty block.
                    if blockSize > 0:
                        blockIdx += 1
                    fileInfo["blockIdx"] = blockIdx
                    blockIdx += 1
                    # Bug fix: the counter must be reset in BOTH cases,
                    # otherwise the stale size leaks into the next block.
                    blockSize = 0
                elif blockSize + fileSize <= self.maxBlockSize:
                    # The file fits in the current block.
                    fileInfo["blockIdx"] = blockIdx
                    blockSize += fileSize
                else:
                    # "Close" the current block and open a new one
                    # containing this file.
                    blockIdx += 1
                    fileInfo["blockIdx"] = blockIdx
                    blockSize = fileSize
            # Bug fix: derive the block count from the highest index
            # actually assigned ('blockIdx' may already point past it
            # when the last file closed its own block, which used to
            # overcount by one and produce an empty trailing block).
            self.numBlocks = self.fileList[-1]["blockIdx"] + 1
            self.dbConn.setTotalBlocks(self.jobId, self.numBlocks)
            # debug block...
            print(f"numBlocks = {self.numBlocks}")
            if os.path.exists("blocks.txt"):
                os.remove("blocks.txt")
            with open("blocks.txt", 'w') as fl:
                fl.write(json.dumps(self.fileList, indent = 4))

    def retrieveCompleted(self, vospacePath):
        """
        Returns 'True' if all data associated to 'vospacePath'
        has been copied, otherwise it returns 'False'.
        """
        # Bug fix: compare for equality. The previous substring test
        # ('vospacePath in f["vospaceRootParent"]') could wrongly match a
        # sibling node sharing a path prefix (e.g. '/a/b' vs '/a/bc'),
        # keeping a finished node flagged as pending.
        return all(f["vospaceRootParent"] != vospacePath for f in self.fileList)

    def retrieveData(self):
        """
        Retrieves data from a generic storage point (hot or cold).

        Returns 'True' on success, 'False' as soon as an rsync copy
        fails.
        """
        # Loop on blocks
        for blockIdx in range(self.numBlocks):
            blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ]
            # Recall all files from tape library to tape frontend
            # if the storage type is 'cold'
            if self.storageType == "cold":
                self.tapeClient.connect()
                self.tapeClient.recall([ f["fullPath"] for f in blockFileList ])
                self.tapeClient.disconnect()
            # Loop on files in a block
            for fileInfo in blockFileList:
                srcPath = fileInfo["fullPath"]
                username = fileInfo["username"]
                baseSrcPath = fileInfo["baseSrcPath"]
                # Path of the file's parent directory relative to the
                # storage base path, used to mirror the tree under the
                # retrieve area.
                osRelParentPath = os.path.dirname(srcPath)
                osRelParentPath = osRelParentPath.replace(baseSrcPath, "")
                if osRelParentPath != "/":
                    osRelParentPath += "/"
                destDirPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath
                os.makedirs(destDirPath, exist_ok = True)
                sp = subprocess.run(["rsync", "-av", srcPath, destDirPath], capture_output = True)
                if(sp.returncode or sp.stderr):
                    return False
            # Remove files from file list at the end of the copy
            for fileInfo in blockFileList:
                if fileInfo in self.fileList:
                    self.fileList.remove(fileInfo)
            # Check if the copy related to a certain VOSpace node
            # is completed and recursively update the 'async_trans'
            # flag
            for vospacePath in self.nodeList:
                if self.retrieveCompleted(vospacePath):
                    self.dbConn.setAsyncTrans(vospacePath, False)
            # Empty the tape library frontend if the storage type
            # is 'cold'
            if self.storageType == "cold":
                self.tapeClient.connect()
                self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ], self.tapePool)
                self.tapeClient.disconnect()
            blockFileList.clear()
            self.procBlocks += 1
            self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks)
        return True

    def update(self):
        """
        Updates the job status and sends an email to the user.
        """
        results = [{"target": self.jobObj.jobInfo["transfer"]["target"]}]
        self.dbConn.setResults(self.jobId, results)
        self.jobObj.setPhase("COMPLETED")
        self.dbConn.setPhase(self.jobId, "COMPLETED")
        self.dbConn.setEndTime(self.jobId)
        self.jobObj.endTime = datetime.datetime.now().isoformat()
        # Add a list of physical destination paths for each VOSpace node in the node list
        for vospacePath in self.nodeList:
            nodeInfo = self.dbConn.getOSPath(vospacePath)
            baseSrcPath = nodeInfo["baseSrcPath"]
            username = nodeInfo["username"]
            srcPath = nodeInfo["fullPath"]
            baseDestPath = self.storageRetrievePath.replace("{username}", username)
            self.destPathList.append(srcPath.replace(baseSrcPath, baseDestPath))
        self.jobObj.jobInfo["destPathList"] = self.destPathList.copy()
        # Send e-mail notification; the admin always receives a copy and
        # is not added twice when the owner IS the admin.
        m = Mailer(self.logger)
        m.addRecipient(self.adminEmail)
        userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId)
        if userEmail != self.adminEmail:
            m.addRecipient(userEmail)
        msg = f"""
        Dear user,
        your job has been COMPLETED.

        Job ID: {self.jobObj.jobId}
        Owner ID: {self.jobObj.ownerId}

        Your files are available and can be downloaded.
        """
        m.setMessage("VOSpace data retrieve notification: Job COMPLETED", msg)
        m.send()

    def cleanup(self):
        """
        Cleanup method: resets all per-job state so the executor can
        process the next job.
        """
        self.fileList.clear()
        self.nodeList.clear()
        self.destPathList.clear()
        self.storageType = None
        self.numBlocks = 0
        self.procBlocks = 0
        self.totalSize = 0

    def run(self):
        """
        Main loop: consumes jobs from the 'read_ready' queue, retrieves
        their data and moves them to the 'read_terminated' queue.
        """
        self.logger.info("Starting retrieve executor...")
        self.setSourceQueueName("read_ready")
        self.setDestinationQueueName("read_terminated")
        while True:
            self.wait()
            if self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId
                self.nodeList = self.jobObj.jobInfo["nodeList"].copy()
                self.buildFileList()
                self.buildBlocks()
                result = self.retrieveData()
                if result:
                    self.update()
                    self.cleanup()
                    # debug block...
                    print(f"fileList = {self.fileList}")
                    print(f"nodeList = {self.nodeList}")
                else:
                    sys.exit("Failed to retrieve data!")
                # Keep the terminated queue bounded: drop the oldest job
                # when the configured maximum is reached.
                if self.destQueue.len() == self.maxTerminatedJobs:
                    self.destQueue.extractJob()
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")