import os
import subprocess
import sys

from config import Config
from db_connector import DbConnector
from tape_client import TapeClient
from task_executor import TaskExecutor


class RetrieveExecutor(TaskExecutor):
    """Executor that processes retrieve jobs from the "read_ready" queue.

    For each job it:
      1. marks the job EXECUTING and recalls "cold" files through the tape
         client (prepareData),
      2. rsyncs every requested node into the user's retrieve area
         (retrieveData),
      3. on success, stores the job result and marks it COMPLETED
         (updateJobStatus) and removes the temporary "list" node, if any
         (cleanup),
      4. moves the job to the "read_terminated" queue.
    """

    def __init__(self):
        config = Config("vos_ts.conf")
        # NOTE: self.params is reassigned for each config section; after
        # __init__ it holds the "file_catalog" section (unchanged behavior).
        self.params = config.loadSection("spectrum_archive")
        self.tapeClient = TapeClient(self.params["host"],
                                     self.params.getint("port"),
                                     self.params["user"],
                                     self.params["pkey_file_path"])
        self.params = config.loadSection("transfer_node")
        # Destination template; "{username}" is substituted per node.
        self.storageRetrievePath = self.params["retrieve_path"]
        self.params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(self.params["user"],
                                  self.params["password"],
                                  self.params["host"],
                                  self.params.getint("port"),
                                  self.params["db"])
        self.jobObj = None
        self.jobId = None
        self.nodeList = []
        super(RetrieveExecutor, self).__init__()

    def prepareData(self):
        """Mark the job EXECUTING and recall cold-storage files from tape.

        Every node in ``self.nodeList`` whose storage type is "cold" is
        expanded to its files (recursively for directories). The collected
        list is passed to ``TapeClient.recall`` in a single call. The tape
        client is contacted only if there is something to recall.
        """
        fileList = []
        self.dbConn.connect()
        try:
            self.dbConn.setPhase(self.jobId, "EXECUTING")
            self.dbConn.setStartTime(self.jobId)
            for vospacePath in self.nodeList:
                srcPath, storageType, _, _ = self.dbConn.getOSPath(vospacePath)
                if storageType != "cold":
                    continue
                if os.path.isdir(srcPath):
                    for root, _dirs, files in os.walk(srcPath):
                        fileList.extend(os.path.join(root, f) for f in files)
                else:
                    fileList.append(srcPath)
        finally:
            # Release the DB connection even if a query fails.
            self.dbConn.disconnect()

        if fileList:
            self.tapeClient.connect()
            try:
                self.tapeClient.recall(fileList)
            finally:
                self.tapeClient.disconnect()

    def retrieveData(self):
        """Copy every node in ``self.nodeList`` into the user's retrieve area.

        Each source path is rsync'ed to ``retrieve_path`` (with
        ``{username}`` substituted) plus the node's relative parent
        directory. After each successful copy, ``updateAsyncTrans`` is
        called for the node. Diagnostic output is appended to
        ``retrieve_executor_log.txt``.

        Returns:
            True if every node was copied. False as soon as one rsync exits
            with a non-zero status or writes anything to stderr. Remaining
            nodes are not processed in that case.
        """
        self.dbConn.connect()
        try:
            # Open the log once and close it deterministically (the previous
            # code reopened it for every node and never closed it).
            with open("retrieve_executor_log.txt", "a") as out:
                for vospacePath in self.nodeList:
                    srcPath, _, username, osRelPath = self.dbConn.getOSPath(vospacePath)
                    osRelParentPath = os.path.dirname(osRelPath)
                    if osRelParentPath != "/":
                        osRelParentPath += "/"
                    destPath = (self.storageRetrievePath.replace("{username}", username)
                                + osRelParentPath)
                    out.write(f"srcPath: {srcPath}\n")
                    out.write(f"osRelPath: {osRelPath}\n")
                    out.write(f"osRelParentPath: {osRelParentPath}\n")
                    out.write(f"destPath: {destPath}\n\n")
                    # TODO: cold-storage nodes are not handled separately here;
                    # presumably prepareData() has already recalled them — confirm.
                    os.makedirs(destPath, exist_ok=True)
                    sp = subprocess.run(["rsync", "-av", srcPath, destPath],
                                        capture_output=True)
                    out.write(f"rsync stdout: {sp.stdout}\n")
                    # Any stderr output is treated as a failure, even with exit 0.
                    if sp.returncode or sp.stderr:
                        out.write(f"rsync stderr: {sp.stderr}\n")
                        return False
                    self.updateAsyncTrans(vospacePath)
        finally:
            self.dbConn.disconnect()
        return True

    def updateAsyncTrans(self, vospacePath):
        """Call ``setAsyncTrans(vospacePath, False)`` on the catalog.

        Presumably this clears the node's async-transfer flag — confirm
        against DbConnector. Expects the DB connection to be open already.
        """
        self.dbConn.setAsyncTrans(vospacePath, False)

    def updateJobStatus(self):
        """Store the transfer target as the job result and mark the job COMPLETED."""
        results = [{"target": self.jobObj.jobInfo["transfer"]["target"]}]
        self.dbConn.connect()
        try:
            self.dbConn.setResults(self.jobId, results)
            self.jobObj.setPhase("COMPLETED")
            self.dbConn.setPhase(self.jobId, "COMPLETED")
            self.dbConn.setEndTime(self.jobId)
        finally:
            self.dbConn.disconnect()

    def cleanup(self):
        """Delete the temporary data node at the transfer target for "list" jobs.

        The node type is read from the value of the first param of the first
        protocol. The VOSpace path is the part of the target after "!vospace".
        """
        transfer = self.jobObj.jobInfo["transfer"]
        nodeType = transfer["protocols"][0]["param"][0]["value"]
        vospacePath = transfer["target"].split("!vospace")[1]
        if nodeType == "list":
            self.dbConn.connect()
            try:
                self.dbConn.deleteTmpDataNode(vospacePath)
            finally:
                self.dbConn.disconnect()

    def run(self):
        """Main loop: take jobs from "read_ready", process them, move to "read_terminated".

        Exits the process (``sys.exit``) if a retrieve fails.
        """
        print("Starting retrieve executor...")
        self.setSourceQueueName("read_ready")
        self.setDestinationQueueName("read_terminated")
        while True:
            self.wait()
            if self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.jobId = self.jobObj.jobId
                self.nodeList = self.jobObj.jobInfo["nodeList"]
                self.prepareData()
                result = self.retrieveData()
                if result:
                    self.updateJobStatus()
                    self.cleanup()
                else:
                    sys.exit("Failed to retrieve data!")
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")