Skip to content
Snippets Groups Projects
Select Git revision
  • 91693e1dcec3df5b9f5134c0594bcdee1b6c44f1
  • master default protected
  • parallel_trapping
  • offload_trapping
  • script_devel
  • unify_iterations
  • containers-m10
  • magma_refinement
  • release9
  • enable_svd
  • parallel_angles_gmu
  • containers-m8
  • parallel_angles
  • profile_omp_leonardo
  • test_nvidia_profiler
  • containers
  • shaditest
  • test1
  • main
  • 3-error-in-run-the-program
  • experiment
  • NP_TMcode-M10a.03
  • NP_TMcode-M10a.02
  • NP_TMcode-M10a.01
  • NP_TMcode-M10a.00
  • NP_TMcode-M9.01
  • NP_TMcode-M9.00
  • NP_TMcode-M8.03
  • NP_TMcode-M8.02
  • NP_TMcode-M8.01
  • NP_TMcode-M8.00
  • NP_TMcode-M7.00
  • v0.0
33 results

clffft.cpp

Blame
  • retrieve_executor.py 5.13 KiB
    import os
    import subprocess
    import sys
    
    from config import Config
    from db_connector import DbConnector
    from tape_client import TapeClient
    from task_executor import TaskExecutor
    
    
    class RetrieveExecutor(TaskExecutor):
        """Task executor that serves jobs taken from the "read_ready" queue.

        For every job it recalls the files of "cold" nodes through the tape
        client, copies each node of the job into the per-user retrieve area
        with ``rsync``, records the outcome in the file catalog and finally
        moves the job to the "read_terminated" queue.
        """

        def __init__(self):
            """Read "vos_ts.conf" and build the tape client and DB connector."""
            config = Config("vos_ts.conf")
            # NOTE: self.params is reused for every section and ends up holding
            # the "file_catalog" section; that final value is kept unchanged in
            # case code outside this class reads it.
            self.params = config.loadSection("spectrum_archive")
            self.tapeClient = TapeClient(self.params["host"],
                                         self.params.getint("port"),
                                         self.params["user"],
                                         self.params["pkey_file_path"])
            self.params = config.loadSection("transfer_node")
            # Template path; "{username}" is substituted in retrieveData().
            self.storageRetrievePath = self.params["retrieve_path"]
            self.params = config.loadSection("file_catalog")
            self.dbConn = DbConnector(self.params["user"],
                                      self.params["password"],
                                      self.params["host"],
                                      self.params.getint("port"),
                                      self.params["db"])
            # State of the job currently being processed; filled in by run().
            self.jobObj = None
            self.jobId = None
            self.nodeList = []
            super().__init__()

        def prepareData(self):
            """Mark the job as executing and recall its "cold" files.

            Sets the job phase to "EXECUTING" and its start time, collects the
            files of every node whose storage type is "cold" (directories are
            walked recursively) and, if any were found, passes them to the
            tape client's ``recall``.
            """
            fileList = []
            self.dbConn.connect()
            try:
                self.dbConn.setPhase(self.jobId, "EXECUTING")
                self.dbConn.setStartTime(self.jobId)
                for vospacePath in self.nodeList:
                    srcPath, storageType, _username, _osRelPath = self.dbConn.getOSPath(vospacePath)
                    if storageType != "cold":
                        continue
                    if os.path.isdir(srcPath):
                        for root, _dirs, files in os.walk(srcPath):
                            fileList.extend(os.path.join(root, f) for f in files)
                    else:
                        fileList.append(srcPath)
            finally:
                # Release the DB connection even if a catalog call fails.
                self.dbConn.disconnect()
            if fileList:
                self.tapeClient.connect()
                try:
                    self.tapeClient.recall(fileList)
                finally:
                    self.tapeClient.disconnect()

        def retrieveData(self):
            """Copy every node of the current job into the retrieve area.

            For each node the destination directory is the retrieve path
            template with "{username}" substituted, followed by the parent
            directory of the node's relative path. The directory is created if
            missing and the source is copied with ``rsync -av``. Progress is
            appended to "retrieve_executor_log.txt".

            Returns:
                True if every copy succeeded; False as soon as one rsync run
                returns a non-zero exit code or writes anything to stderr.
            """
            self.dbConn.connect()
            try:
                for vospacePath in self.nodeList:
                    srcPath, _storageType, username, osRelPath = self.dbConn.getOSPath(vospacePath)
                    osRelParentPath = os.path.dirname(osRelPath)
                    if osRelParentPath != "/":
                        osRelParentPath += "/"
                    # The log file is closed (and flushed) at the end of every
                    # iteration instead of leaking one handle per node.
                    with open("retrieve_executor_log.txt", "a") as out:
                        out.write(f"srcPath: {srcPath}\n")
                        out.write(f"osRelPath: {osRelPath}\n")
                        out.write(f"osRelParentPath: {osRelParentPath}\n")
                        destPath = self.storageRetrievePath.replace("{username}", username) + osRelParentPath
                        out.write(f"destPath: {destPath}\n\n")
                        # TODO: nodes on "cold" storage get no special handling
                        # here yet; they are copied like any other source.
                        os.makedirs(destPath, exist_ok=True)
                        sp = subprocess.run(["rsync", "-av", srcPath, destPath], capture_output=True)
                        out.write(f"rsync stdout: {sp.stdout}")
                        if sp.returncode or sp.stderr:
                            out.write(f"rsync stderr: {sp.stderr}")
                            return False
                    self.updateAsyncTrans(vospacePath)
            finally:
                # Runs on success, on the early "return False" and on errors.
                self.dbConn.disconnect()
            return True

        def updateAsyncTrans(self, vospacePath):
            """Call ``setAsyncTrans(vospacePath, False)`` on the DB connector.

            Expects the DB connection to be open already; retrieveData() calls
            this while connected.
            """
            self.dbConn.setAsyncTrans(vospacePath, False)

        def updateJobStatus(self):
            """Store the job results and mark the job as "COMPLETED".

            The results consist of a single entry carrying the transfer target
            of the job; the end time is recorded as well.
            """
            self.dbConn.connect()
            try:
                results = [{"target": self.jobObj.jobInfo["transfer"]["target"]}]
                self.dbConn.setResults(self.jobId, results)
                self.jobObj.setPhase("COMPLETED")
                self.dbConn.setPhase(self.jobId, "COMPLETED")
                self.dbConn.setEndTime(self.jobId)
            finally:
                self.dbConn.disconnect()

        def cleanup(self):
            """Delete the temporary data node when the job's node type is "list".

            The node type is read from the first parameter of the first
            protocol of the transfer; the VOSpace path is the part of the
            transfer target that follows "!vospace".
            """
            transfer = self.jobObj.jobInfo["transfer"]
            nodeType = transfer["protocols"][0]["param"][0]["value"]
            vospacePath = transfer["target"].split("!vospace")[1]
            if nodeType == "list":
                self.dbConn.connect()
                try:
                    self.dbConn.deleteTmpDataNode(vospacePath)
                finally:
                    self.dbConn.disconnect()

        def run(self):
            """Process jobs from "read_ready" forever, moving them to "read_terminated".

            Each iteration waits, and if the source queue is not empty takes
            the next job, recalls and copies its data, then updates the job
            status. If the copy fails the process exits through ``sys.exit``
            without moving the job.
            """
            print("Starting retrieve executor...")
            self.setSourceQueueName("read_ready")
            self.setDestinationQueueName("read_terminated")
            while True:
                self.wait()
                if self.srcQueue.len() > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.jobId = self.jobObj.jobId
                    self.nodeList = self.jobObj.jobInfo["nodeList"]
                    self.prepareData()
                    if not self.retrieveData():
                        sys.exit("Failed to retrieve data!")
                    self.updateJobStatus()
                    self.cleanup()
                    # The job is inserted into the destination queue before it
                    # is removed from the source queue.
                    self.destQueue.insertJob(self.jobObj)
                    self.srcQueue.extractJob()
                    print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}")