diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 057152af5e6f80ff66d49d5fd15c1e05992866ac..f7fe4432375ab97c6218b2b72392effe73cb53be 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -25,6 +25,8 @@ class RetrieveExecutor(TaskExecutor): self.params.getint("port"), self.params["db"]) self.jobObj = None + self.jobId = None + self.nodeList = [] super(RetrieveExecutor, self).__init__() def retrieveData(self): @@ -66,6 +68,7 @@ class RetrieveExecutor(TaskExecutor): results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] self.dbConn.setResults(self.jobId, results) + self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.disconnect() @@ -87,6 +90,7 @@ class RetrieveExecutor(TaskExecutor): self.updateJobStatus() else: sys.exit("Failed to retrieve data!") - self.srcQueue.moveJobTo(self.destQueue.name()) + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 594f38740c96e575a0f917858c37bd090a105382..7648c83e13498f2455f426e412642698159427ab 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -36,7 +36,7 @@ class RetrievePreprocessor(TaskExecutor): self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() - self.execute() - self.srcQueue.extractJob() + self.execute() self.destQueue.insertJob(self.jobObj) - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") \ No newline at end of file + self.srcQueue.extractJob() + print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 72b54b1f30053f396e80c615785f74a65ee4b47f..f0badca587b9348fd3ec3d3c4340db7060674b6c 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -27,6 +27,7 @@ class StoreExecutor(TaskExecutor): self.params.getint("port"), self.params["db"]) self.jobObj = None + self.jobId = None self.username = None self.requestType = None self.storageId = None @@ -35,10 +36,11 @@ class StoreExecutor(TaskExecutor): super(StoreExecutor, self).__init__() def copyData(self): + self.dbConn.connect() + self.dbConn.setPhase(self.jobId, "EXECUTING") srcPathPrefix = self.storageStorePath.replace("{username}", self.username) srcData = os.listdir(srcPathPrefix) if self.requestType == "CSTORE": - self.dbConn.connect() destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username self.dbConn.disconnect() #destPathPrefix = self.tapeStorageBasePath.replace("{username}", self.username) @@ -47,7 +49,6 @@ class StoreExecutor(TaskExecutor): self.tapeClient.copy(srcPathPrefix + '/' + el, destPathPrefix + '/' + el) self.tapeClient.disconnect() else: - self.dbConn.connect() destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username self.dbConn.disconnect() #destPathPrefix = self.serverStorageBasePath.replace("{username}", self.username) @@ -80,6 +81,8 @@ class StoreExecutor(TaskExecutor): self.dbConn.setAsyncTrans(nodeVOSPath, True); self.dbConn.setSticky(nodeVOSPath, True); self.dbConn.setBusyState(nodeVOSPath, False); + self.jobObj.setPhase("COMPLETED") + self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.disconnect() def run(self): @@ -90,6 +93,7 @@ class StoreExecutor(TaskExecutor): self.wait() if self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() + self.jobId = self.jobObj.jobId self.username = self.jobObj.jobInfo["userName"] self.requestType = self.jobObj.jobInfo["requestType"] self.storageId = self.jobObj.jobInfo["storageId"] @@ -107,5 +111,6 @@ class StoreExecutor(TaskExecutor): self.copyData() self.cleanup() self.update() - self.srcQueue.moveJobTo(self.destQueue.name()) + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 9f1ef8d150be15e6f2f6ad0bc4b2fce2357677de..28e1b49d16ff36b3738d857867d6d1577d0e7e95 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -37,6 +37,7 @@ class StorePreprocessor(TaskExecutor): self.storageStorePath = self.params["store_path"] self.storageId = None self.jobObj = None + self.jobId = None self.username = None self.userId = None self.nodeList = [] @@ -201,6 +202,12 @@ class StorePreprocessor(TaskExecutor): out.write("\n") out.close() self.dbConn.disconnect() + + def update(self): + self.jobObj.setPhase("QUEUED") + self.dbConn.connect() + self.dbConn.setPhase(self.jobId, "QUEUED") + self.dbConn.disconnect() def run(self): print("Starting store preprocessor...") @@ -210,13 +217,15 @@ class StorePreprocessor(TaskExecutor): self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() + self.jobId = self.jobObj.jobId self.storageId = self.jobObj.jobInfo["storageId"] self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) self.execute() + self.update() self.jobObj.jobInfo["nodeList"] = self.nodeList - self.srcQueue.extractJob() self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Test