Skip to content
Snippets Groups Projects
Select Git revision
  • a5e6c4800fb6eab435607d4ab1c597306fc237db
  • main default protected
  • Kelvinrr-patch-3
  • radius_update
  • revert-616-apollo_pan
  • vims
  • 0.10
  • Kelvinrr-patch-2
  • revert-563-minirf_fix
  • Kelvinrr-patch-1
  • 0.9
  • acpaquette-patch-3
  • acpaquette-patch-2
  • acpaquette-patch-1
  • spiceql
  • ci-coverage
  • 0.10.0
  • 0.9.1
  • 0.9.0
  • 0.8.7
  • 0.8.8
  • 0.8.6
  • 0.8.3
  • 0.8.4
  • 0.8.5
  • 0.8.2
  • 0.8.1
  • 0.8.0
  • 0.7.3
  • 0.7.2
  • 0.7.1
  • 0.7.0
  • 0.6.5
  • 0.6.4
  • 0.6.3
  • 0.6.2
36 results

mro_driver.rst

Blame
  • retrieve_executor.py 5.13 KiB
    import os
    import subprocess
    import sys
    
    from config import Config
    from db_connector import DbConnector
    from tape_client import TapeClient
    from task_executor import TaskExecutor
    
    
    class RetrieveExecutor(TaskExecutor):
        """Task executor that serves VOSpace retrieve (read) jobs.

        run() takes jobs from the "read_ready" queue. For each job:
          * the files backing the requested nodes are recalled from tape when
            their storage type is "cold" (prepareData);
          * they are copied with rsync into the owner's retrieve area
            (retrieveData);
          * the job is marked COMPLETED (updateJobStatus);
          * the temporary node of a "list" transfer is removed (cleanup).
        The job is then moved to the "read_terminated" queue.
        """

        def __init__(self):
            """Load "vos_ts.conf" and build the tape-library and file-catalog clients."""
            config = Config("vos_ts.conf")
            # One local per config section, instead of re-pointing
            # self.params at each section in turn.
            tapeParams = config.loadSection("spectrum_archive")
            self.tapeClient = TapeClient(tapeParams["host"],
                                         tapeParams.getint("port"),
                                         tapeParams["user"],
                                         tapeParams["pkey_file_path"])
            transferParams = config.loadSection("transfer_node")
            # May contain a "{username}" placeholder; retrieveData() fills it
            # in with each node's owner.
            self.storageRetrievePath = transferParams["retrieve_path"]
            catalogParams = config.loadSection("file_catalog")
            self.dbConn = DbConnector(catalogParams["user"],
                                      catalogParams["password"],
                                      catalogParams["host"],
                                      catalogParams.getint("port"),
                                      catalogParams["db"])
            # self.params still ends up holding the "file_catalog" section,
            # exactly as before, for any code that reads it.
            self.params = catalogParams
            self.jobObj = None
            self.jobId = None
            self.nodeList = []
            super().__init__()

        def prepareData(self):
            """Mark the job EXECUTING and recall its cold-storage files from tape.

            For each node in ``self.nodeList`` with storage type "cold":
              * a file node contributes its own backing path;
              * a directory node contributes every file found below it.
            All collected paths go to TapeClient.recall() in a single call.
            The tape client is only contacted when at least one such file
            exists.
            """
            fileList = []
            self.dbConn.connect()
            try:
                self.dbConn.setPhase(self.jobId, "EXECUTING")
                self.dbConn.setStartTime(self.jobId)
                for vospacePath in self.nodeList:
                    srcPath, storageType, _, _ = self.dbConn.getOSPath(vospacePath)
                    if storageType != "cold":
                        continue
                    if os.path.isdir(srcPath):
                        for root, _, files in os.walk(srcPath):
                            fileList.extend(os.path.join(root, f) for f in files)
                    else:
                        fileList.append(srcPath)
            finally:
                self.dbConn.disconnect()
            if fileList:
                self.tapeClient.connect()
                try:
                    self.tapeClient.recall(fileList)
                finally:
                    self.tapeClient.disconnect()

        def retrieveData(self):
            """Copy every node in ``self.nodeList`` into its owner's retrieve area.

            Each node's backing path is copied with ``rsync -av`` into
            ``storageRetrievePath`` plus the node's parent directory, which is
            created first. In ``storageRetrievePath``, "{username}" is replaced
            by the node owner. After each successful copy, the node's
            async-transfer flag is cleared via updateAsyncTrans().

            Cold-storage files are expected to have been recalled from tape
            already by prepareData(), which run() calls first.

            Diagnostic output is appended to "retrieve_executor_log.txt" in the
            current working directory.

            Returns:
                True if every node was copied. False as soon as one rsync
                call exits non-zero or writes anything to stderr; the
                remaining nodes are then skipped.
            """
            self.dbConn.connect()
            try:
                # One log handle for the whole run, closed on every exit path.
                with open("retrieve_executor_log.txt", "a") as out:
                    for vospacePath in self.nodeList:
                        srcPath, _, username, osRelPath = self.dbConn.getOSPath(vospacePath)
                        osRelParentPath = os.path.dirname(osRelPath)
                        if osRelParentPath != "/":
                            osRelParentPath += "/"
                        out.write(f"srcPath: {srcPath}\n")
                        out.write(f"osRelPath: {osRelPath}\n")
                        out.write(f"osRelParentPath: {osRelParentPath}\n")
                        destPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath
                        out.write(f"destPath: {destPath}\n\n")
                        os.makedirs(destPath, exist_ok=True)
                        # Decode rsync's output so the log is readable text
                        # rather than a bytes repr. errors="replace" keeps
                        # undecodable file names from raising.
                        sp = subprocess.run(["rsync", "-av", srcPath, destPath],
                                            capture_output=True,
                                            text=True,
                                            errors="replace")
                        out.write(f"rsync stdout: {sp.stdout}\n")
                        if sp.returncode or sp.stderr:
                            out.write(f"rsync stderr: {sp.stderr}\n")
                            return False
                        self.updateAsyncTrans(vospacePath)
            finally:
                self.dbConn.disconnect()
            return True

        def updateAsyncTrans(self, vospacePath):
            """Set the async-transfer flag of ``vospacePath`` to False in the catalog.

            Assumes ``self.dbConn`` is already connected; retrieveData() calls
            this between its own connect() and disconnect().
            """
            self.dbConn.setAsyncTrans(vospacePath, False)

        def updateJobStatus(self):
            """Store the job result and mark the job COMPLETED in memory and in the DB.

            The stored result list holds one entry whose "target" is the
            transfer target from the job description.
            """
            results = [{"target": self.jobObj.jobInfo["transfer"]["target"]}]
            self.dbConn.connect()
            try:
                self.dbConn.setResults(self.jobId, results)
                self.jobObj.setPhase("COMPLETED")
                self.dbConn.setPhase(self.jobId, "COMPLETED")
                self.dbConn.setEndTime(self.jobId)
            finally:
                self.dbConn.disconnect()

        def cleanup(self):
            """Remove the temporary data node of a "list" transfer.

            The node type is read from the value of the first parameter of the
            first protocol in the transfer description. Nothing is done unless
            that value is "list".
            """
            transfer = self.jobObj.jobInfo["transfer"]
            nodeType = transfer["protocols"][0]["param"][0]["value"]
            if nodeType != "list":
                return
            # The node path is whatever follows the first "!vospace" marker in
            # the target. maxsplit=1 keeps a path that itself contains the
            # marker intact.
            vospacePath = transfer["target"].split("!vospace", 1)[1]
            self.dbConn.connect()
            try:
                self.dbConn.deleteTmpDataNode(vospacePath)
            finally:
                self.dbConn.disconnect()

        def run(self):
            """Process retrieve jobs in an endless loop.

            Jobs are taken from the "read_ready" queue and, once handled,
            inserted into the "read_terminated" queue and extracted from the
            source queue. If retrieveData() reports a failure, the process
            exits via sys.exit().
            """
            print("Starting retrieve executor...")
            self.setSourceQueueName("read_ready")
            self.setDestinationQueueName("read_terminated")
            while True:
                self.wait()
                if self.srcQueue.len() > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.jobId = self.jobObj.jobId
                    self.nodeList = self.jobObj.jobInfo["nodeList"]
                    self.prepareData()
                    result = self.retrieveData()
                    if result:
                        self.updateJobStatus()
                        self.cleanup()
                    else:
                        # NOTE(review): sys.exit() runs before insertJob() and
                        # extractJob(). The DB phase also never moves past the
                        # "EXECUTING" set by prepareData(). Confirm this is the
                        # intended failure handling.
                        sys.exit("Failed to retrieve data!")
                    self.destQueue.insertJob(self.jobObj)
                    self.srcQueue.extractJob()
                    print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")