diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 8b404def02aa1aebdbf7cc9b8b0c92f71e2dd3b9..3fc13669dc59022aafa5a6d03cdc8f73f12f86f2 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -713,10 +713,11 @@ class DbConnector(object): start_time, end_time, job_info, + node_list, results, error_message, error_type) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (job_id) DO UPDATE SET (owner_id, @@ -725,6 +726,7 @@ class DbConnector(object): start_time, end_time, job_info, + node_list, results, error_message, error_type) @@ -734,6 +736,7 @@ class DbConnector(object): EXCLUDED.start_time, EXCLUDED.end_time, EXCLUDED.job_info, + EXCLUDED.node_list, EXCLUDED.results, EXCLUDED.error_message, EXCLUDED.error_type); @@ -745,6 +748,7 @@ class DbConnector(object): jobObj.startTime, jobObj.endTime, json.dumps(jobObj.jobInfo), + json.dumps(jobObj.nodeList), json.dumps(jobObj.results), jobObj.errorMessage, jobObj.errorType,)) diff --git a/transfer_service/job.py b/transfer_service/job.py index 15f004b926c4889daec8914b548a23636bfe9d3a..2cf8c0bb953ce4053c3f7608f1f3993e978a3800 100644 --- a/transfer_service/job.py +++ b/transfer_service/job.py @@ -17,6 +17,7 @@ class Job(object): self.executionDuration = None self.destruction = None self.parameters = None + self.nodeList = None self.results = None self.errorMessage = None self.errorType = None @@ -50,6 +51,9 @@ class Job(object): def setExecutionDuration(self, executionDuration): self.executionDuration = executionDuration + def setNodeList(self, nodeList): + self.nodeList = nodeList + def setResults(self, results): self.results = results diff --git a/transfer_service/job_queue.py b/transfer_service/job_queue.py index 8699fb4ae899964f2619b5589999be6f6a4e9d17..c91b00734aa16fa188547ebea4783cbde05d140a 100644 --- a/transfer_service/job_queue.py +++ b/transfer_service/job_queue.py @@ -68,6 +68,7 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) + jobObj.setNodeList(job["nodeList"]) except ConnectionError: if retry > 0: retry -= 1 @@ -96,7 +97,8 @@ class JobQueue(object): "destruction": jobObj.destruction, "parameters": jobObj.parameters, "results": jobObj.results, - "jobInfo": jobObj.jobInfo } + "jobInfo": jobObj.jobInfo, + "nodeList": jobObj.nodeList } while True: try: self.redisCli.lpush(self.queueName, json.dumps(data)) @@ -139,6 +141,7 @@ class JobQueue(object): jobObj.setEndTime(job["endTime"]) jobObj.setExecutionDuration(job["executionDuration"]) jobObj.setInfo(job["jobInfo"]) + jobObj.setNodeList(job["nodeList"]) return jobObj def moveJobTo(self, nextQueueName): diff --git a/transfer_service/retrieve_cleaner.py b/transfer_service/retrieve_cleaner.py index 6815d1ffc6dffd201f8ffc8aee6c0b06d0bfe3d7..af9621ffecd9784a7c7d97993ea9388eefe26ca8 100644 --- a/transfer_service/retrieve_cleaner.py +++ b/transfer_service/retrieve_cleaner.py @@ -98,7 +98,7 @@ class RetrieveCleaner(TaskExecutor): self.wait() if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() - self.nodeList = self.jobObj.jobInfo["nodeList"].copy() + self.nodeList = self.jobObj.nodeList.copy() self.destPathList = self.jobObj.jobInfo["destPathList"].copy() nodeInfo = self.dbConn.getOSPath(self.nodeList[0]) self.username = nodeInfo["username"] diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index f491de1bae41e13edd5f0973c0f197fce90b1f08..9a03b6fa3acc3c62188c3e3ed0d51cf7ace3bfbd 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -319,8 +319,7 @@ class RetrieveExecutor(TaskExecutor): if self.srcQueue.len() > 0: self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId - self.nodeList = self.jobObj.jobInfo["nodeList"].copy() - self.jobObj.jobInfo.pop("nodeList") + self.nodeList = self.jobObj.nodeList.copy() self.buildFileList() self.buildBlocks() result = self.retrieveData() diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index f510f51bb9a35dd58e993eaa42e620996dc847b4..a65d45dd35931068ddc45f1cee87592377c2ecc2 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -48,7 +48,8 @@ class RetrievePreprocessor(TaskExecutor): else: for el in params: self.nodeList.append(target + '/' + el["value"]) - self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() + self.jobObj.nodeList = self.nodeList.copy() + self.dbConn.insertJob(self.jobObj) def update(self, status): m = Mailer(self.logger)