From c73f71df6d033d0a97abf709e267d89d382a9e5d Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Tue, 9 Feb 2021 20:05:57 +0100 Subject: [PATCH] Some changes on how we move jobs from one queue to another + added status update on db and redis for storage operations. Signed-off-by: Cristiano Urban --- transfer_service/retrieve_executor.py | 6 +++++- transfer_service/retrieve_preprocessor.py | 6 +++--- transfer_service/store_executor.py | 11 ++++++++--- transfer_service/store_preprocessor.py | 11 ++++++++++- 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 057152a..f7fe443 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -25,6 +25,8 @@ class RetrieveExecutor(TaskExecutor): self.params.getint("port"), self.params["db"]) self.jobObj = None + self.jobId = None + self.nodeList = [] super(RetrieveExecutor, self).__init__() def retrieveData(self): @@ -66,6 +68,7 @@ class RetrieveExecutor(TaskExecutor): results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] self.dbConn.setResults(self.jobId, results) + self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.disconnect() @@ -87,6 +90,7 @@ class RetrieveExecutor(TaskExecutor): self.updateJobStatus() else: sys.exit("Failed to retrieve data!") - self.srcQueue.moveJobTo(self.destQueue.name()) + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 594f387..7648c83 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -36,7 +36,7 @@ class RetrievePreprocessor(TaskExecutor): self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() - self.execute() - self.srcQueue.extractJob() + self.execute() self.destQueue.insertJob(self.jobObj) - print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") \ No newline at end of file + self.srcQueue.extractJob() + print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 72b54b1..f0badca 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -27,6 +27,7 @@ class StoreExecutor(TaskExecutor): self.params.getint("port"), self.params["db"]) self.jobObj = None + self.jobId = None self.username = None self.requestType = None self.storageId = None @@ -35,10 +36,11 @@ class StoreExecutor(TaskExecutor): super(StoreExecutor, self).__init__() def copyData(self): + self.dbConn.connect() + self.dbConn.setPhase(self.jobId, "EXECUTING") srcPathPrefix = self.storageStorePath.replace("{username}", self.username) srcData = os.listdir(srcPathPrefix) if self.requestType == "CSTORE": - self.dbConn.connect() destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username self.dbConn.disconnect() #destPathPrefix = self.tapeStorageBasePath.replace("{username}", self.username) @@ -47,7 +49,6 @@ class StoreExecutor(TaskExecutor): self.tapeClient.copy(srcPathPrefix + '/' + el, destPathPrefix + '/' + el) self.tapeClient.disconnect() else: - self.dbConn.connect() destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username self.dbConn.disconnect() #destPathPrefix = self.serverStorageBasePath.replace("{username}", self.username) @@ -80,6 +81,8 @@ class StoreExecutor(TaskExecutor): self.dbConn.setAsyncTrans(nodeVOSPath, True); self.dbConn.setSticky(nodeVOSPath, True); self.dbConn.setBusyState(nodeVOSPath, False); + self.jobObj.setPhase("COMPLETED") + self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.disconnect() def run(self): @@ -90,6 +93,7 @@ class StoreExecutor(TaskExecutor): self.wait() if self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() + self.jobId = self.jobObj.jobId self.username = self.jobObj.jobInfo["userName"] self.requestType = self.jobObj.jobInfo["requestType"] self.storageId = self.jobObj.jobInfo["storageId"] @@ -107,5 +111,6 @@ class StoreExecutor(TaskExecutor): self.copyData() self.cleanup() self.update() - self.srcQueue.moveJobTo(self.destQueue.name()) + self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 9f1ef8d..28e1b49 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -37,6 +37,7 @@ class StorePreprocessor(TaskExecutor): self.storageStorePath = self.params["store_path"] self.storageId = None self.jobObj = None + self.jobId = None self.username = None self.userId = None self.nodeList = [] @@ -201,6 +202,12 @@ class StorePreprocessor(TaskExecutor): out.write("\n") out.close() self.dbConn.disconnect() + + def update(self): + self.jobObj.setPhase("QUEUED") + self.dbConn.connect() + self.dbConn.setPhase(self.jobId, "QUEUED") + self.dbConn.disconnect() def run(self): print("Starting store preprocessor...") @@ -210,13 +217,15 @@ class StorePreprocessor(TaskExecutor): self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() + self.jobId = self.jobObj.jobId self.storageId = self.jobObj.jobInfo["storageId"] self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) self.execute() + self.update() self.jobObj.jobInfo["nodeList"] = self.nodeList - self.srcQueue.extractJob() self.destQueue.insertJob(self.jobObj) + self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Test -- GitLab