From 1e4c0cf4219af2f6f784949eef88d02b3b9eb075 Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Mon, 20 Sep 2021 13:43:18 +0200 Subject: [PATCH] Added VOSpace nodes cleanup procedure on 'vos_data' job failure. Signed-off-by: Cristiano Urban --- transfer_service/db_connector.py | 18 ++++++++++++++++++ transfer_service/store_executor.py | 3 +++ transfer_service/store_preprocessor.py | 3 +++ 3 files changed, 24 insertions(+) diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 3ab6866..0a36dbe 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -995,6 +995,24 @@ class DbConnector(object): return False finally: self.connPool.putconn(conn, close = False) + + def deleteNodesByJobId(self, jobId): + """Deletes all VOSpace nodes having a certain 'job_id'.""" + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + DELETE FROM node WHERE job_id = %s; + """, + (jobId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index a344082..9646e6d 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -195,6 +195,9 @@ class StoreExecutor(TaskExecutor): self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to ERROR.") + self.logger.info("Removing VOSpace nodes from the database...") + self.dbConn.deleteNodesByJobId(self.jobId) + self.logger.info("Database cleanup completed") msg = f""" ########## VOSpace data storage procedure summary ########## diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 3d573d8..9ffe24e 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -253,6 +253,9 @@ class StorePreprocessor(TaskExecutor): self.dbConn.insertJob(self.jobObj) self.setDestinationQueueName("write_terminated") self.logger.info("Job phase updated to ERROR.") + self.logger.info("Removing VOSpace nodes from the database...") + self.dbConn.deleteNodesByJobId(self.jobId) + self.logger.info("Database cleanup completed") msg = f""" Dear user, -- GitLab