Skip to content
Snippets Groups Projects
Select Git revision
  • c5adb876e8ad7b18980fdb09ebec0c927f88fa9d
  • main default protected
  • 1.8.5
  • 1.8.4
  • 1.8.3
  • 1.8.2
  • 1.8.1
  • 1.8.0
  • 1.7.14
  • 1.7.13
  • 1.7.12
  • 1.7.11
  • 1.7.10
  • 1.7.9
  • 1.7.8
  • 1.7.7
  • 1.7.6
  • 1.7.5
  • 1.7.4
  • 1.7.3
  • 1.7.2
  • 1.7.1
22 results

UWSSodaWork.java

Blame
  • retrieve_preprocessor.py 1.87 KiB
    #!/usr/bin/env python
    
    from config import Config
    from db_connector import DbConnector
    from task_executor import TaskExecutor
    
    
    class RetrievePreprocessor(TaskExecutor):
        """Resolve the VOSpace node list of pending retrieve jobs.

        Jobs are taken from the "read_pending" queue, annotated with the list
        of VOSpace paths they refer to (stored under ``jobInfo["nodeList"]``)
        and moved to the "read_ready" queue.

        The queue objects (``srcQueue``/``destQueue``), ``maxReadyJobs``,
        ``wait()`` and the ``set*QueueName()`` helpers are not defined in this
        class; they are expected to come from ``TaskExecutor``.
        """

        def __init__(self):
            """Read the "file_catalog" settings and open the catalogue connection."""
            config = Config("/etc/vos_ts/vos_ts.conf")
            self.params = config.loadSection("file_catalog")
            # NOTE(review): the two trailing 1s are passed positionally to
            # DbConnector; presumably connection-pool bounds -- TODO confirm.
            self.dbConn = DbConnector(self.params["user"],
                                      self.params["password"],
                                      self.params["host"],
                                      self.params.getint("port"),
                                      self.params["db"],
                                      1,
                                      1)
            # Job currently being processed and the node list resolved for it.
            self.jobObj = None
            self.nodeList = []
            super().__init__()

        def execute(self):
            """Compute the node list of ``self.jobObj`` and attach it to the job.

            The node type is read from the first parameter of the first
            protocol of the transfer, and the VOSpace path is whatever follows
            the "!vospace" marker in the transfer target. A "single" node
            yields a one-element list; any other type is expanded through
            ``DbConnector.getVOSpacePathList``.

            Raises:
                IndexError: if the target does not contain "!vospace".
                KeyError: if the job info lacks one of the expected keys.
            """
            transfer = self.jobObj.jobInfo["transfer"]
            nodeType = transfer["protocols"][0]["param"][0]["value"]
            vospacePath = transfer["target"].split("!vospace")[1]
            if nodeType == "single":
                # Build a fresh list instead of appending to the previous one,
                # so entries left over from an earlier call can never leak
                # into this job (the other branch already replaces the list).
                self.nodeList = [vospacePath]
            else:
                self.nodeList = self.dbConn.getVOSpacePathList(vospacePath)
            self.jobObj.jobInfo["nodeList"] = self.nodeList

        def cleanup(self):
            """Reset the per-job node list.

            The attribute is rebound to a new empty list rather than cleared
            in place: the previous list object is the one stored in the job's
            ``jobInfo["nodeList"]``, and clearing it would silently empty the
            node list of a job that has already been handed over.
            """
            self.nodeList = []

        def run(self):
            """Move jobs from "read_pending" to "read_ready", forever.

            After each ``wait()``, one job is processed if the destination
            queue holds fewer than ``maxReadyJobs`` jobs and the source queue
            is not empty.
            """
            print("Starting retrieve preprocessor...")
            self.setSourceQueueName("read_pending")
            self.setDestinationQueueName("read_ready")
            while True:
                self.wait()
                if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.execute()
                    # Insert into the destination before removing from the
                    # source, so a failure in between cannot lose the job.
                    self.destQueue.insertJob(self.jobObj)
                    self.srcQueue.extractJob()
                    self.cleanup()
                    print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")