diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 3ab6866063a5356eb0116ef24925eb7d02a54a63..0a36dbe668a7626b07668c30629c41739874752c 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -995,6 +995,24 @@ class DbConnector(object): return False finally: self.connPool.putconn(conn, close = False) + + def deleteNodesByJobId(self, jobId): + """Deletes all VOSpace nodes having a certain 'job_id'.""" + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + DELETE FROM node WHERE job_id = %s; + """, + (jobId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index a344082932aef6bd5cef6b17fa7e8239acbdd5e5..9646e6d18df32ac7f463e4180a7133463c671ae9 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -195,6 +195,9 @@ class StoreExecutor(TaskExecutor): self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to ERROR.") + self.logger.info("Removing VOSpace nodes from the database...") + self.dbConn.deleteNodesByJobId(self.jobId) + self.logger.info("Database cleanup completed") msg = f""" ########## VOSpace data storage procedure summary ########## diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 3d573d877419eb60330ca281209c87b932d37869..9ffe24ef1fc6c39174aeb5ad5809e24ed99a7afa 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -253,6 +253,9 @@ class StorePreprocessor(TaskExecutor): self.dbConn.insertJob(self.jobObj) self.setDestinationQueueName("write_terminated") self.logger.info("Job phase updated to ERROR.") + self.logger.info("Removing VOSpace nodes from the database...") + self.dbConn.deleteNodesByJobId(self.jobId) + self.logger.info("Database cleanup completed") msg = f""" Dear user,