#!/usr/bin/env python from config import Config from db_connector import DbConnector from task_executor import TaskExecutor class RetrievePreprocessor(TaskExecutor): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") self.dbConn = DbConnector(self.params["user"], self.params["password"], self.params["host"], self.params.getint("port"), self.params["db"], 1, 1) self.jobObj = None self.nodeList = [] super(RetrievePreprocessor, self).__init__() def execute(self): nodeType = self.jobObj.jobInfo["transfer"]["protocols"][0]["param"][0]["value"] vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] if nodeType == "single": self.nodeList.append(vospacePath) else: self.nodeList = self.dbConn.getVOSpacePathList(vospacePath) self.jobObj.jobInfo["nodeList"] = self.nodeList def cleanup(self): self.nodeList.clear() def run(self): print("Starting retrieve preprocessor...") self.setSourceQueueName("read_pending") self.setDestinationQueueName("read_ready") while True: self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.execute() self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() self.cleanup() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")