Select Git revision
ConfirmRemoveMemberModal.vue
-
Sonia Zorba authored
retrieve_preprocessor.py 1.87 KiB
#!/usr/bin/env python
from config import Config
from db_connector import DbConnector
from task_executor import TaskExecutor
class RetrievePreprocessor(TaskExecutor):
    """Prepare pending retrieve jobs by attaching the list of nodes to fetch.

    Jobs are taken from the ``read_pending`` queue, enriched with a
    ``nodeList`` entry in their ``jobInfo`` and then moved to the
    ``read_ready`` queue.

    NOTE(review): ``wait()``, ``srcQueue``, ``destQueue``, ``maxReadyJobs``,
    ``setSourceQueueName()`` and ``setDestinationQueueName()`` are not defined
    in this class; they are presumably inherited from ``TaskExecutor`` --
    confirm against that class.
    """

    def __init__(self):
        """Load the ``file_catalog`` settings and open the database connector."""
        config = Config("/etc/vos_ts/vos_ts.conf")
        self.params = config.loadSection("file_catalog")
        # NOTE(review): the meaning of the two trailing ``1`` arguments is not
        # visible here -- check the DbConnector signature.
        self.dbConn = DbConnector(
            self.params["user"],
            self.params["password"],
            self.params["host"],
            self.params.getint("port"),
            self.params["db"],
            1,
            1,
        )
        # Job currently being processed (set by run()).
        self.jobObj = None
        # Working list of VOSpace paths for the current job.
        self.nodeList = []
        # Kept after the attribute setup, as in the original ordering.
        super().__init__()

    def execute(self):
        """Compute the node list for ``self.jobObj`` and store it in the job.

        The node type is read from the first parameter of the first protocol
        of the transfer, and the VOSpace path is the part of the transfer
        target that follows the ``!vospace`` marker. For a ``"single"`` node
        the path itself is the only entry; otherwise the list is obtained from
        ``DbConnector.getVOSpacePathList``.

        Raises:
            KeyError: if an expected key is missing from ``jobInfo``.
            IndexError: if the target does not contain ``!vospace`` or the
                protocol/param lists are empty.
        """
        transfer = self.jobObj.jobInfo["transfer"]
        nodeType = transfer["protocols"][0]["param"][0]["value"]
        vospacePath = transfer["target"].split("!vospace")[1]
        if nodeType == "single":
            self.nodeList.append(vospacePath)
        else:
            self.nodeList = self.dbConn.getVOSpacePathList(vospacePath)
        # Store a copy: cleanup() clears self.nodeList in place, which would
        # otherwise also empty the list referenced by the job object.
        self.jobObj.jobInfo["nodeList"] = list(self.nodeList)

    def cleanup(self):
        """Reset the working node list so the next job starts empty."""
        self.nodeList.clear()

    def run(self):
        """Move jobs from ``read_pending`` to ``read_ready``, forever.

        On each iteration, after ``wait()`` returns, one job is processed if
        the destination queue holds fewer than ``maxReadyJobs`` jobs and the
        source queue is not empty.
        """
        print("Starting retrieve preprocessor...")
        self.setSourceQueueName("read_pending")
        self.setDestinationQueueName("read_ready")
        while True:
            self.wait()
            if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0:
                self.jobObj = self.srcQueue.getJob()
                self.execute()
                # Insert into the destination before removing from the source,
                # preserving the original ordering of the two queue operations.
                self.destQueue.insertJob(self.jobObj)
                self.srcQueue.extractJob()
                self.cleanup()
                print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")