Something went wrong on our end
Select Git revision
-
Giovanni La Mura authoredGiovanni La Mura authored
retrieve_executor.py 5.13 KiB
import os
import subprocess
import sys
from config import Config
from db_connector import DbConnector
from tape_client import TapeClient
from task_executor import TaskExecutor
class RetrieveExecutor(TaskExecutor):
    """Task executor that stages the nodes of "read" jobs into a retrieve area.

    Jobs are taken from the "read_ready" queue. For each job:

    * files of nodes whose storage type is "cold" are recalled via the tape
      client (prepareData);
    * every node is copied with rsync into the configured retrieve path
      (retrieveData);
    * the job is marked COMPLETED, temporary nodes are cleaned up, and the
      job is moved to the "read_terminated" queue (run).
    """

    # Append-only debug log written by retrieveData(), relative to the CWD.
    LOG_FILE = "retrieve_executor_log.txt"

    def __init__(self):
        """Read "vos_ts.conf" and build the tape client and DB connector.

        Uses the "spectrum_archive" section for the tape client, the
        "transfer_node" section for the retrieve path and the "file_catalog"
        section for the database. After construction ``self.params`` holds
        the "file_catalog" section, as in earlier versions of this class.
        """
        config = Config("vos_ts.conf")

        self.params = config.loadSection("spectrum_archive")
        self.tapeClient = TapeClient(self.params["host"],
                                     self.params.getint("port"),
                                     self.params["user"],
                                     self.params["pkey_file_path"])

        self.params = config.loadSection("transfer_node")
        # May contain a "{username}" placeholder, substituted per node.
        self.storageRetrievePath = self.params["retrieve_path"]

        self.params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(self.params["user"],
                                  self.params["password"],
                                  self.params["host"],
                                  self.params.getint("port"),
                                  self.params["db"])

        # Per-job state, populated by run() for each job taken from the queue.
        self.jobObj = None
        self.jobId = None
        self.nodeList = []
        super().__init__()

    def prepareData(self):
        """Mark the current job EXECUTING and recall its cold-storage files.

        For every VOSpace path in ``self.nodeList`` whose storage type is
        "cold", the OS path is collected (directories are walked
        recursively). All collected files are then passed to the tape
        client in a single ``recall`` call. Both the DB connection and the
        tape connection are closed even if an error occurs.
        """
        fileList = []
        self.dbConn.connect()
        try:
            self.dbConn.setPhase(self.jobId, "EXECUTING")
            self.dbConn.setStartTime(self.jobId)
            for vospacePath in self.nodeList:
                srcPath, storageType, _username, _osRelPath = self.dbConn.getOSPath(vospacePath)
                if storageType != "cold":
                    continue
                if os.path.isdir(srcPath):
                    for root, _dirs, files in os.walk(srcPath):
                        fileList.extend(os.path.join(root, f) for f in files)
                else:
                    fileList.append(srcPath)
        finally:
            self.dbConn.disconnect()

        if fileList:
            self.tapeClient.connect()
            try:
                self.tapeClient.recall(fileList)
            finally:
                self.tapeClient.disconnect()

    def retrieveData(self):
        """Copy every node of the current job into the user's retrieve area.

        The destination directory is ``storageRetrievePath`` with
        "{username}" replaced by the node owner, followed by the parent
        directory of the node's OS-relative path. It is created if missing,
        and the node is copied there with ``rsync -av``. Diagnostic
        information is appended to LOG_FILE.

        Returns:
            True if every rsync succeeded. Returns False as soon as one rsync
            exits with a non-zero status or writes anything to stderr; the
            remaining nodes are then skipped.
        """
        self.dbConn.connect()
        try:
            with open(self.LOG_FILE, "a") as out:
                for vospacePath in self.nodeList:
                    srcPath, _storageType, username, osRelPath = self.dbConn.getOSPath(vospacePath)
                    osRelParentPath = os.path.dirname(osRelPath)
                    if osRelParentPath != "/":
                        osRelParentPath += "/"
                    out.write(f"srcPath: {srcPath}\n")
                    out.write(f"osRelPath: {osRelPath}\n")
                    out.write(f"osRelParentPath: {osRelParentPath}\n")
                    destPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath
                    out.write(f"destPath: {destPath}\n\n")

                    # Cold-storage nodes are also read from srcPath: prepareData()
                    # has already asked the tape client to recall those files.
                    # NOTE(review): assumes the recall leaves them readable at
                    # srcPath — confirm.
                    os.makedirs(destPath, exist_ok=True)
                    sp = subprocess.run(["rsync", "-av", srcPath, destPath], capture_output=True)
                    # rsync output is bytes; decode it so the log stays readable.
                    out.write(f"rsync stdout: {sp.stdout.decode(errors='replace')}\n")
                    if sp.returncode or sp.stderr:
                        out.write(f"rsync stderr: {sp.stderr.decode(errors='replace')}\n")
                        return False
                    self.updateAsyncTrans(vospacePath)
        finally:
            self.dbConn.disconnect()
        return True

    def updateAsyncTrans(self, vospacePath):
        """Clear the async-transfer flag of ``vospacePath`` in the database.

        Expects ``self.dbConn`` to be already connected.
        """
        self.dbConn.setAsyncTrans(vospacePath, False)

    def updateJobStatus(self):
        """Record the job result and mark the job COMPLETED.

        The job's transfer target is stored as its single result. The phase
        is set to COMPLETED both on the job object and in the database, and
        the end time is recorded.
        """
        results = [{"target": self.jobObj.jobInfo["transfer"]["target"]}]
        self.dbConn.connect()
        try:
            self.dbConn.setResults(self.jobId, results)
            self.jobObj.setPhase("COMPLETED")
            self.dbConn.setPhase(self.jobId, "COMPLETED")
            self.dbConn.setEndTime(self.jobId)
        finally:
            self.dbConn.disconnect()

    def cleanup(self):
        """Delete the temporary data node created for "list" transfers.

        The node type is read from the value of the first parameter of the
        first transfer protocol. The VOSpace path is the part of the transfer
        target that follows "!vospace".
        """
        nodeType = self.jobObj.jobInfo["transfer"]["protocols"][0]["param"][0]["value"]
        vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1]
        if nodeType == "list":
            self.dbConn.connect()
            try:
                self.dbConn.deleteTmpDataNode(vospacePath)
            finally:
                self.dbConn.disconnect()

    def run(self):
        """Main loop: process jobs from "read_ready" and move them to "read_terminated".

        On a failed retrieval the process exits via sys.exit before the job
        is extracted, so the job is left in the source queue.
        """
        print("Starting retrieve executor...")
        self.setSourceQueueName("read_ready")
        self.setDestinationQueueName("read_terminated")
        while True:
            # wait() is inherited from TaskExecutor; presumably it throttles
            # or blocks between polls — confirm in the base class.
            self.wait()
            if self.srcQueue.len() <= 0:
                continue
            self.jobObj = self.srcQueue.getJob()
            self.jobId = self.jobObj.jobId
            self.nodeList = self.jobObj.jobInfo["nodeList"]
            self.prepareData()
            if not self.retrieveData():
                sys.exit("Failed to retrieve data!")
            self.updateJobStatus()
            self.cleanup()
            # Insert into the destination before removing from the source.
            self.destQueue.insertJob(self.jobObj)
            self.srcQueue.extractJob()
            print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")