diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 53b4346b025a86b0b8c1f77ee4cf88ffddbdbaee..8b404def02aa1aebdbf7cc9b8b0c92f71e2dd3b9 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -2,8 +2,10 @@ import datetime import json +import logging import psycopg2 import sys +import time from contextlib import contextmanager from psycopg2.extras import RealDictCursor @@ -22,6 +24,7 @@ class DbConnector(object): self.dbname = dbname self.minConnNum = minConnNum self.maxConnNum = maxConnNum + self.connPool = None self.createConnectionPool() def createConnectionPool(self): @@ -33,17 +36,33 @@ class DbConnector(object): host = self.host, database = self.dbname, port = self.port, - connect_timeout = 2 + connect_timeout = 0 ) - @contextmanager - def getConnection(self): - conn = self.connPool.getconn() - #conn.autocommit = True - try: - yield conn - finally: - self.connPool.putconn(conn, close = False) + def getConnection(self, retry = 5, timeout = 2): + if retry < 1: + retry = 1 + if timeout < 1: + timeout = 1 + conn = None + while retry > 0: + try: + logging.warning(f"Getting connection from pool: retry = {retry}") + conn = self.connPool.getconn() + conn.reset() + except Exception: + logging.warning(f"Unable to get a connection from pool: retry = {retry}") + if conn is not None: + self.connPool.putconn(conn, close = False) + conn = None + time.sleep(timeout) + retry -= 1 + if retry == 0: + raise + else: + logging.warning(f"Connection established: retry = {retry}") + return conn + """ Getters @@ -53,292 +72,376 @@ class DbConnector(object): def nodeExists(self, vospacePath): """Checks if a VOSpace node already exists. Returns a boolean.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() raise + else: + if result: + return True else: - if result: - return True - else: - return False + return False + finally: + self.connPool.putconn(conn, close = False) def getCreatorId(self, vospacePath): """Returns the creator ID for a given vospace path representing a node.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - SELECT creator_id - FROM node_vos_path nvp - JOIN node n ON nvp.node_id = n.node_id - WHERE vos_path = %s; - """, - (vospacePath,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result[0]["creator_id"] + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT creator_id + FROM node_vos_path nvp + JOIN node n ON nvp.node_id = n.node_id + WHERE vos_path = %s; + """, + (vospacePath,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["creator_id"] + finally: + self.connPool.putconn(conn, close = False) def getGroupRead(self, vospacePath): """Returns the 'group_read' for a given VOSpace path representing a node.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - SELECT unnest(group_read) as group_read - FROM node_vos_path nvp - JOIN node n ON nvp.node_id = n.node_id - WHERE vos_path = %s; - """, - (vospacePath,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - for i in range(0, len(result)): - result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "") - return result + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT unnest(group_read) as group_read + FROM node_vos_path nvp + JOIN node n ON nvp.node_id = n.node_id + WHERE vos_path = %s; + """, + (vospacePath,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + for i in range(0, len(result)): + result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "") + return result + finally: + self.connPool.putconn(conn, close = False) def getGroupWrite(self, vospacePath): """Returns the 'group_write' for a given VOSpace path representing a node.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - SELECT unnest(group_write) as group_write - FROM node_vos_path nvp - JOIN node n ON nvp.node_id = n.node_id - WHERE vos_path = %s; - """, - (vospacePath,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - for i in range(0, len(result)): - result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "") - return result + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT unnest(group_write) as group_write + FROM node_vos_path nvp + JOIN node n ON nvp.node_id = n.node_id + WHERE vos_path = %s; + """, + (vospacePath,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + for i in range(0, len(result)): + result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "") + return result + finally: + self.connPool.putconn(conn, close = False) def getOSPath(self, vospacePath): """Returns a list containing full path, storage type and username for a VOSpace path.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length - FROM node n - JOIN location l ON n.location_id = l.location_id - JOIN storage s ON s.storage_id = l.storage_src_id - JOIN users u ON u.user_id = n.creator_id - WHERE n.node_id = id_from_vos_path(%s); - """, - (vospacePath,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length + FROM node n + JOIN location l ON n.location_id = l.location_id + JOIN storage s ON s.storage_id = l.storage_src_id + JOIN users u ON u.user_id = n.creator_id + WHERE n.node_id = id_from_vos_path(%s); + """, + (vospacePath,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + storageType = result[0]["storage_type"] + basePath = result[0]["base_path"] + userName = result[0]["user_name"] + tstampWrappedDir = result[0]["tstamp_wrapper_dir"] + osPath = result[0]["os_path"] + contentLength = result[0]["content_length"] + if tstampWrappedDir is None: + baseSrcPath = basePath + "/" + userName else: - storageType = result[0]["storage_type"] - basePath = result[0]["base_path"] - userName = result[0]["user_name"] - tstampWrappedDir = result[0]["tstamp_wrapper_dir"] - osPath = result[0]["os_path"] - contentLength = result[0]["content_length"] - if tstampWrappedDir is None: - baseSrcPath = basePath + "/" + userName - else: - baseSrcPath = basePath + "/" + userName + "/" + tstampWrappedDir - fullPath = baseSrcPath + osPath - fileInfo = { - "baseSrcPath": baseSrcPath, - "fullPath": fullPath, - "storageType": storageType, - "username": userName, - "osPath": osPath, - "contentLength": contentLength - } - return fileInfo + baseSrcPath = basePath + "/" + userName + "/" + tstampWrappedDir + fullPath = baseSrcPath + osPath + fileInfo = { + "baseSrcPath": baseSrcPath, + "fullPath": fullPath, + "storageType": storageType, + "username": userName, + "osPath": osPath, + "contentLength": contentLength + } + return fileInfo + finally: + self.connPool.putconn(conn, close = False) def getVOSpacePathList(self, vospacePath): """Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - SELECT get_vos_path(n.node_id) - FROM node n - JOIN list_of_files l ON l.node_id = n.node_id - WHERE l.list_node_id = id_from_vos_path(%s); - """, - (vospacePath,)) - results = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - vospacePathList = [] - for el in results: - vospacePathList.append(el["vos_path"]) - return vospacePathList + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT get_vos_path(n.node_id) + FROM node n + JOIN list_of_files l ON l.node_id = n.node_id + WHERE l.list_node_id = id_from_vos_path(%s); + """, + (vospacePath,)) + results = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + vospacePathList = [] + for el in results: + vospacePathList.append(el["vos_path"]) + return vospacePathList + finally: + self.connPool.putconn(conn, close = False) def getNodesToBeDeleted(self): "Returns a path list of files to be deleted with also the corresponding deletion timestamp." - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - WITH RECURSIVE all_nodes AS ( - SELECT node_id, name, os_name, relative_path, parent_relative_path - FROM node - UNION - SELECT node_id, name, os_name, path(parent_relative_path, node_id) AS relative_path, parent_relative_path - FROM deleted_node - ), del AS ( - SELECT COALESCE(os_name, name) AS os_name, 1 AS level, node_id AS deleted_node_id, - path(parent_relative_path, node_id) AS relative_path, parent_relative_path - FROM deleted_node WHERE phy_deleted_on IS NULL - UNION ALL - SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id, - n.relative_path, n.parent_relative_path - FROM all_nodes n JOIN del d ON n.relative_path = d.parent_relative_path - WHERE n.parent_relative_path IS NOT NULL - ), paths_to_delete AS - (SELECT deleted_node_id, '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path - FROM del GROUP BY deleted_node_id) - SELECT base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, deleted_on, d.node_id - FROM paths_to_delete p - JOIN deleted_node d ON d.node_id = p.deleted_node_id - JOIN location l ON d.location_id = l.location_id - JOIN storage s ON s.storage_id = l.storage_src_id; - """) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + WITH RECURSIVE all_nodes AS ( + SELECT node_id, name, os_name, relative_path, parent_relative_path + FROM node + UNION + SELECT node_id, name, os_name, path(parent_relative_path, node_id) AS relative_path, parent_relative_path + FROM deleted_node + ), del AS ( + SELECT COALESCE(os_name, name) AS os_name, 1 AS level, node_id AS deleted_node_id, + path(parent_relative_path, node_id) AS relative_path, parent_relative_path + FROM deleted_node WHERE phy_deleted_on IS NULL + UNION ALL + SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id, + n.relative_path, n.parent_relative_path + FROM all_nodes n JOIN del d ON n.relative_path = d.parent_relative_path + WHERE n.parent_relative_path IS NOT NULL + ), paths_to_delete AS + (SELECT deleted_node_id, '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path + FROM del GROUP BY deleted_node_id) + SELECT base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, deleted_on, d.node_id + FROM paths_to_delete p + JOIN deleted_node d ON d.node_id = p.deleted_node_id + JOIN location l ON d.location_id = l.location_id + JOIN storage s ON s.storage_id = l.storage_src_id; + """) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result + finally: + self.connPool.putconn(conn, close = False) def nodeIsBusy(self, vospacePath): """Returns 'True' if the VOSpace node is busy, 'False' otherwise.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - SELECT job_id - FROM node - WHERE node_id = id_from_vos_path(%s); - """, - (vospacePath,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT job_id + FROM node + WHERE node_id = id_from_vos_path(%s); + """, + (vospacePath,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if result[0]["job_id"]: + return True else: - if result[0]["job_id"]: - return True - else: - return False + return False + finally: + self.connPool.putconn(conn, close = False) ##### Job ##### def jobExists(self, jobId): """Checks if a job already exists. Returns a boolean.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if result: + return True else: - if result: - return True - else: - return False + return False + finally: + self.connPool.putconn(conn, close = False) def getJob(self, jobId): """Returns a JSON object containing job information, according to the job id.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) - #out = open("db_connector_log.txt", "a") - result = cursor.fetchall() - #out.write(f"result: {result}\n\n") - #out.close() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if not result: + return json.loads('{ "error": "JOB_NOT_FOUND" }') else: - if not result: - return json.loads('{ "error": "JOB_NOT_FOUND" }') - else: - job = dict() - for idx in result[0]: - oldIdx = idx - idxTokens = idx.split('_') - idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:]) - job[idx] = result[0][oldIdx] - el = job[idx] - if isinstance(el, datetime.datetime): - job[idx] = el.isoformat() - #out.write(f"job: {job}\n\n") - #out.close() - return job + job = dict() + for idx in result[0]: + oldIdx = idx + idxTokens = idx.split('_') + idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:]) + job[idx] = result[0][oldIdx] + el = job[idx] + if isinstance(el, datetime.datetime): + job[idx] = el.isoformat() + return job + finally: + self.connPool.putconn(conn, close = False) + + def getJobPhase(self, jobId): + """Returns the 'phase' field, according to the UWS specification.""" + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT phase FROM job WHERE job_id = %s;", (jobId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["phase"] + finally: + self.connPool.putconn(conn, close = False) def getJobInfo(self, jobId): """Returns the 'job_info' field, according to the UWS specification.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result[0]["job_info"] + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["job_info"] + finally: + self.connPool.putconn(conn, close = False) def getJobResults(self, jobId): """Returns the 'results' field, according to the UWS specification.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result[0]["results"] + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["results"] + finally: + self.connPool.putconn(conn, close = False) def listActiveJobs(self): """Returns some info about active jobs.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT job_id, + job_type, + phase, + creation_time, + start_time, + owner_id + FROM job + WHERE phase + NOT IN ('ABORTED', + 'COMPLETED', + 'ERROR') + ORDER BY creation_time DESC; + """) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + for row in result: + for idx in row: + el = row[idx] + if isinstance(el, datetime.datetime): + row[idx] = el.isoformat() + return result + finally: + self.connPool.putconn(conn, close = False) + + def listJobsByPhase(self, phase): + """Returns some info about jobs according to the phase.""" + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + if phase in [ "PENDING", "QUEUED", "EXECUTING" ]: cursor.execute(""" SELECT job_id, job_type, @@ -347,245 +450,248 @@ class DbConnector(object): start_time, owner_id FROM job - WHERE phase - NOT IN ('ABORTED', - 'COMPLETED', - 'ERROR') + WHERE phase = %s ORDER BY creation_time DESC; - """) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - for row in result: - for idx in row: - el = row[idx] - if isinstance(el, datetime.datetime): - row[idx] = el.isoformat() - return result - - def listJobsByPhase(self, phase): - """Returns some info about jobs according to the phase.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - if phase in [ "PENDING", "QUEUED", "EXECUTING" ]: - cursor.execute(""" - SELECT job_id, - job_type, - phase, - creation_time, - start_time, - owner_id - FROM job - WHERE phase = %s - ORDER BY creation_time DESC; - """, - (phase,)) - else: - cursor.execute(""" - SELECT job_id, - job_type, - phase, - start_time, - end_time, - owner_id - FROM job - WHERE phase = %s - ORDER BY creation_time DESC; - """, - (phase,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise + """, + (phase,)) else: - for row in result: - for idx in row: - el = row[idx] - if isinstance(el, datetime.datetime): - row[idx] = el.isoformat() - return result + cursor.execute(""" + SELECT job_id, + job_type, + phase, + start_time, + end_time, + owner_id + FROM job + WHERE phase = %s + ORDER BY creation_time DESC; + """, + (phase,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + for row in result: + for idx in row: + el = row[idx] + if isinstance(el, datetime.datetime): + row[idx] = el.isoformat() + return result + finally: + self.connPool.putconn(conn, close = False) ##### User ##### def userExists(self, username): """Checks if a user already exists. Returns a boolean.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if result: + return True else: - if result: - return True - else: - return False + return False + finally: + self.connPool.putconn(conn, close = False) def getUserId(self, username): """Returns the user id for a given user name.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result[0]["user_id"] + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["user_id"] + finally: + self.connPool.putconn(conn, close = False) def getUserName(self, userId): """Returns the user name for a given user id.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result[0]["user_name"] + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["user_name"] + finally: + self.connPool.putconn(conn, close = False) def getUserEmail(self, userId): """Returns the user email address for a given user id.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result[0]["e_mail"] + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["e_mail"] + finally: + self.connPool.putconn(conn, close = False) ##### Storage ##### def storageBasePathIsValid(self, path): """Checks if the base path of a physical path is valid. If true, returns the base path, else returns 'False'.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - SELECT base_path - FROM storage - WHERE position(base_path in cast(%s as varchar)) > 0; - """, - (path,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT base_path + FROM storage + WHERE position(base_path in cast(%s as varchar)) > 0; + """, + (path,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if result: + return result[0]["base_path"] else: - if result: - return result[0]["base_path"] - else: - return False + return False + finally: + self.connPool.putconn(conn, close = False) def getStorageBasePath(self, storageId): """Returns the storage base path for a give storage id.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result[0]["base_path"] + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["base_path"] + finally: + self.connPool.putconn(conn, close = False) def getStorageList(self): """Returns the full storage base list. Local storage points are excluded by default.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result + finally: + self.connPool.putconn(conn, close = False) def getStorageListByType(self, storageType): """Returns a list of storage locations for a given storage type.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result + finally: + self.connPool.putconn(conn, close = False) def getStorageType(self, basePath): """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if result: + return result[0]["storage_type"] else: - if result: - return result[0]["storage_type"] - else: - return False + return False + finally: + self.connPool.putconn(conn, close = False) def getStorageId(self, basePath): """Returns the storage id for a given storage base path, if any. Otherwise it returns 'False'.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if result: + return result[0]["storage_id"] else: - if result: - return result[0]["storage_id"] - else: - return False + return False + finally: + self.connPool.putconn(conn, close = False) ##### Location ##### def getLocationId(self, destStorageId): """Returns the location id according to the storage id of the destination.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) - result = cursor.fetchall() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return result[0]["location_id"] + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result[0]["location_id"] + finally: + self.connPool.putconn(conn, close = False) """ @@ -596,450 +702,488 @@ class DbConnector(object): def insertJob(self, jobObj): """Inserts/updates a job object.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - INSERT INTO job(job_id, - owner_id, - job_type, - phase, - start_time, - end_time, - job_info, - results, - error_message, - error_type) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (job_id) - DO UPDATE SET - (owner_id, - job_type, - phase, - start_time, - end_time, - job_info, - results, - error_message, - error_type) - = (EXCLUDED.owner_id, - EXCLUDED.job_type, - EXCLUDED.phase, - EXCLUDED.start_time, - EXCLUDED.end_time, - EXCLUDED.job_info, - EXCLUDED.results, - EXCLUDED.error_message, - EXCLUDED.error_type); - """, - (jobObj.jobId, - jobObj.ownerId, - jobObj.type, - jobObj.phase, - jobObj.startTime, - jobObj.endTime, - json.dumps(jobObj.jobInfo), - json.dumps(jobObj.results), - jobObj.errorMessage, - jobObj.errorType,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + INSERT INTO job(job_id, + owner_id, + job_type, + phase, + start_time, + end_time, + job_info, + results, + error_message, + error_type) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (job_id) + DO UPDATE SET + (owner_id, + job_type, + phase, + start_time, + end_time, + job_info, + results, + error_message, + error_type) + = (EXCLUDED.owner_id, + EXCLUDED.job_type, + EXCLUDED.phase, + EXCLUDED.start_time, + EXCLUDED.end_time, + EXCLUDED.job_info, + EXCLUDED.results, + EXCLUDED.error_message, + EXCLUDED.error_type); + """, + (jobObj.jobId, + jobObj.ownerId, + jobObj.type, + jobObj.phase, + jobObj.startTime, + jobObj.endTime, + json.dumps(jobObj.jobInfo), + json.dumps(jobObj.results), + jobObj.errorMessage, + jobObj.errorType,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - startTime = datetime.datetime.today().isoformat() - cursor.execute(""" - UPDATE job SET start_time = %s - WHERE job_id = %s; - """, - (startTime, jobId,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + startTime = datetime.datetime.today().isoformat() + cursor.execute(""" + UPDATE job SET start_time = %s + WHERE job_id = %s; + """, + (startTime, jobId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - endTime = datetime.datetime.today().isoformat() - cursor.execute(""" - UPDATE job SET end_time = %s - WHERE job_id = %s; - """, - (endTime, jobId,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + endTime = datetime.datetime.today().isoformat() + cursor.execute(""" + UPDATE job SET end_time = %s + WHERE job_id = %s; + """, + (endTime, jobId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE job SET phase = %s - WHERE job_id = %s; - """, - (phase, jobId,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE job SET phase = %s + WHERE job_id = %s; + """, + (phase, jobId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setTotalBlocks(self, jobId, totalBlocks): """ Sets the job 'total_blocks' parameter for a data retrieve operation. """ - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE job SET total_blocks = %s - WHERE job_id = %s; - """, - (totalBlocks, jobId,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE job SET total_blocks = %s + WHERE job_id = %s; + """, + (totalBlocks, jobId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def updateProcessedBlocks(self, jobId, processedBlocks): """ Updates the job 'processed_blocks' parameter for a data retrieve operation. """ - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE job SET processed_blocks = %s - WHERE job_id = %s; - """, - (processedBlocks, jobId,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE job SET processed_blocks = %s + WHERE job_id = %s; + """, + (processedBlocks, jobId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setResults(self, jobId, results): """Sets the job 'results' parameter.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE job SET results = %s - WHERE job_id = %s; - """, - (json.dumps(results), - jobId,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE job SET results = %s + WHERE job_id = %s; + """, + (json.dumps(results), + jobId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) ##### Node ##### def insertNode(self, node): """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.""" - with self.getConnection() as conn: + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT path FROM node WHERE node_id = id_from_vos_path(%s); + """, + (node.parentPath,)) + result = cursor.fetchall() + parentLtreePath = result[0]["path"] + parentLtreeRelativePath = "" + if "." in parentLtreePath: + parentLtreeRelativePath = ".".join(parentLtreePath.strip(".").split('.')[1:]) + except Exception: + if not conn.closed: + conn.rollback() + raise + else: try: - #out = open("db_connector_log.txt", "a") - #out.write(f"parentOSPath: {node.parentPath}\n") - #out.write(f"name: {node.name}\n") - cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" - SELECT path FROM node WHERE node_id = id_from_vos_path(%s); + INSERT INTO node(parent_path, + parent_relative_path, + name, + tstamp_wrapper_dir, + type, + location_id, + async_trans, + sticky, + job_id, + creator_id, + content_length, + content_md5) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT + DO NOTHING + RETURNING node_id; """, - (node.parentPath,)) + (parentLtreePath, + parentLtreeRelativePath, + node.name, + node.wrapperDir, + node.type, + node.locationId, + node.asyncTrans, + node.sticky, + node.jobId, + node.creatorId, + node.contentLength, + node.contentMD5,)) result = cursor.fetchall() - #for i in result: - # out.write(f"queryResult: {i}\n") - parentLtreePath = result[0]["path"] - parentLtreeRelativePath = "" - if "." in parentLtreePath: - parentLtreeRelativePath = ".".join(parentLtreePath.strip(".").split('.')[1:]) - #out.write(f"parentLtreeRelativePath: {parentLtreeRelativePath}\n") - #out.write(f"parentLtreePath: {parentLtreePath}\n") - #out.write(f"parentPath: {node.parentPath}\n\n") - #out.close() + conn.commit() + cursor.close() except Exception: if not conn.closed: conn.rollback() raise - else: - try: - #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") - cursor.execute(""" - INSERT INTO node(parent_path, - parent_relative_path, - name, - tstamp_wrapper_dir, - type, - location_id, - async_trans, - sticky, - job_id, - creator_id, - content_length, - content_md5) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT - DO NOTHING - RETURNING node_id; - """, - (parentLtreePath, - parentLtreeRelativePath, - node.name, - node.wrapperDir, - node.type, - node.locationId, - node.asyncTrans, - node.sticky, - node.jobId, - node.creatorId, - node.contentLength, - node.contentMD5,)) - result = cursor.fetchall() - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + else: + if result: + return True else: - if result: - return True - else: - return False + return False + finally: + self.connPool.putconn(conn, close = False) def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE node c SET async_trans = %s - FROM node n - WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); - """, - (value, nodeVOSPath,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE node c SET async_trans = %s + FROM node n + WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); + """, + (value, nodeVOSPath,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setJobId(self, nodeVOSPath, value): """Sets the 'job_id' flag for a VOSpace node.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE node c SET job_id = %s - FROM node n - WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); - """, - (value, nodeVOSPath,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE node c SET job_id = %s + FROM node n + WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); + """, + (value, nodeVOSPath,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def setPhyDeletedOn(self, nodeId): """Sets the 'phy_deleted_on' flag for a VOSpace deleted node.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - phyDeletedOn = datetime.datetime.now().isoformat() - cursor.execute(""" - UPDATE deleted_node SET phy_deleted_on = %s - WHERE node_id = %s; - """, - (phyDeletedOn, nodeId,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + phyDeletedOn = datetime.datetime.now().isoformat() + cursor.execute(""" + UPDATE deleted_node SET phy_deleted_on = %s + WHERE node_id = %s; + """, + (phyDeletedOn, nodeId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath): - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE node c - SET group_read = update_array(c.group_read, %s, %s) - FROM node n - WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); - """, - (groupToAdd, - groupToRemove, - nodeVOSPath,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE node c + SET group_read = update_array(c.group_read, %s, %s) + FROM node n + WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); + """, + (groupToAdd, + groupToRemove, + nodeVOSPath,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath): - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE node c - SET group_write = update_array(c.group_write, %s, %s) - FROM node n - WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); - """, - (groupToAdd, - groupToRemove, - nodeVOSPath,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE node c + SET group_write = update_array(c.group_write, %s, %s) + FROM node n + WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); + """, + (groupToAdd, + groupToRemove, + nodeVOSPath,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False) ##### Storage ##### def insertStorage(self, storageType, basePath, baseUrl, hostname): """Inserts a storage point.""" - with self.getConnection() as conn: - if not self.getStorageId(basePath): - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - INSERT INTO storage(storage_type, - base_path, - base_url, - hostname) - VALUES (%s, %s, %s, %s) - RETURNING storage_id; - """, - (storageType, - basePath, - baseUrl, - hostname,)) - storageSrcId = cursor.fetchall()[0]["storage_id"] - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - if storageType == "cold" or storageType == "hot": - try: - cursor.execute(""" - SELECT storage_id - FROM storage - WHERE storage_type = 'local' - AND base_path = '/home' - AND hostname = 'localhost'; - """) - storageDestId = cursor.fetchall()[0]["storage_id"] - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - locationType = "async" - else: - storageDestId = storageSrcId - locationType = "portal" - try: - cursor.execute(""" - INSERT INTO location(location_type, - storage_src_id, - storage_dest_id) - VALUES (%s, %s, %s); - """, - (locationType, - storageSrcId, - storageDestId,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise - else: - return True - else: - return False - - def deleteStorage(self, storageId): - """Deletes a storage point.""" - with self.getConnection() as conn: + conn = self.getConnection() + if not self.getStorageId(basePath): try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" - SELECT count(*) > 0 AS res - FROM Node - WHERE location_id IN - (SELECT location_id - FROM storage s - JOIN location l - ON s.storage_id = l.storage_src_id - WHERE storage_src_id = %s); + INSERT INTO storage(storage_type, + base_path, + base_url, + hostname) + VALUES (%s, %s, %s, %s) + RETURNING storage_id; """, - (storageId,)) - result = cursor.fetchall() + (storageType, + basePath, + baseUrl, + hostname,)) + storageSrcId = cursor.fetchall()[0]["storage_id"] except Exception: if not conn.closed: conn.rollback() raise else: - if result[0]["res"]: - return False - else: + if storageType == "cold" or storageType == "hot": try: cursor.execute(""" - DELETE FROM storage - WHERE storage_id = %s; - """, - (storageId,)) - conn.commit() + SELECT storage_id + FROM storage + WHERE storage_type = 'local' + AND base_path = '/home' + AND hostname = 'localhost'; + """) + storageDestId = cursor.fetchall()[0]["storage_id"] except Exception: if not conn.closed: conn.rollback() raise else: - return True + locationType = "async" + else: + storageDestId = storageSrcId + locationType = "portal" + try: + cursor.execute(""" + INSERT INTO location(location_type, + storage_src_id, + storage_dest_id) + VALUES (%s, %s, %s); + """, + (locationType, + storageSrcId, + storageDestId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return True + finally: + self.connPool.putconn(conn, close = False) + else: + return False + + def deleteStorage(self, storageId): + """Deletes a storage point.""" + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT count(*) > 0 AS res + FROM Node + WHERE location_id IN + (SELECT location_id + FROM storage s + JOIN location l + ON s.storage_id = l.storage_src_id + WHERE storage_src_id = %s); + """, + (storageId,)) + result = cursor.fetchall() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if result[0]["res"]: + return False + else: + try: + cursor.execute(""" + DELETE FROM storage + WHERE storage_id = %s; + """, + (storageId,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return True + finally: + self.connPool.putconn(conn, close = False) ##### Users ##### def insertUser(self, userId, username, email): """Inserts users data.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - INSERT INTO users(user_id, - user_name, - e_mail) - VALUES (%s, %s, %s) - ON CONFLICT (user_id) - DO NOTHING; - """, - (userId, - username, - email,)) - conn.commit() - except Exception: - if not conn.closed: - conn.rollback() - raise + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + INSERT INTO users(user_id, + user_name, + e_mail) + VALUES (%s, %s, %s) + ON CONFLICT (user_id) + DO NOTHING; + """, + (userId, + username, + email,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + finally: + self.connPool.putconn(conn, close = False)