#!/usr/bin/env python # # This file is part of vospace-transfer-service # Copyright (C) 2021 Istituto Nazionale di Astrofisica # SPDX-License-Identifier: GPL-3.0-or-later # import datetime import json import logging import psycopg2 import sys import time from contextlib import contextmanager from psycopg2.extras import RealDictCursor from psycopg2.pool import ThreadedConnectionPool from node import Node class DbConnector(object): def __init__(self, user, password, host, port, dbname, minConnNum, maxConnNum): self.user = user self.password = password self.host = host self.port = port self.dbname = dbname self.minConnNum = minConnNum self.maxConnNum = maxConnNum self.connPool = None self.createConnectionPool() def createConnectionPool(self): self.connPool = ThreadedConnectionPool( self.minConnNum, self.maxConnNum, user = self.user, password = self.password, host = self.host, database = self.dbname, port = self.port, connect_timeout = 0 ) def getConnection(self, retry = 10, timeout = 30): if retry < 1: retry = 1 if timeout < 1: timeout = 1 conn = None while retry > 0: try: logging.warning(f"Getting connection from pool: retry = {retry}") conn = self.connPool.getconn() conn.reset() except Exception: logging.warning(f"Unable to get a connection from pool: retry = {retry}") if conn is not None: self.connPool.putconn(conn, close = False) conn = None time.sleep(timeout) retry -= 1 if retry == 0: raise else: logging.warning(f"Connection established: retry = {retry}") return conn """ Getters """ ##### Node ##### def nodeExists(self, vospacePath): """Checks if a VOSpace node already exists. Returns a boolean.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result: return True else: return False finally: self.connPool.putconn(conn, close = False) def getCreatorId(self, vospacePath): """Returns the creator ID for a given vospace path representing a node.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT creator_id FROM node_vos_path nvp JOIN node n ON nvp.node_id = n.node_id WHERE vos_path = %s; """, (vospacePath,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["creator_id"] finally: self.connPool.putconn(conn, close = False) def getGroupRead(self, vospacePath): """Returns the 'group_read' for a given VOSpace path representing a node.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT unnest(group_read) as group_read FROM node_vos_path nvp JOIN node n ON nvp.node_id = n.node_id WHERE vos_path = %s; """, (vospacePath,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: for i in range(0, len(result)): result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "") return result finally: self.connPool.putconn(conn, close = False) def getGroupWrite(self, vospacePath): """Returns the 'group_write' for a given VOSpace path representing a node.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT unnest(group_write) as group_write FROM node_vos_path nvp JOIN node n ON nvp.node_id = n.node_id WHERE vos_path = %s; """, (vospacePath,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: for i in range(0, len(result)): result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "") return result finally: self.connPool.putconn(conn, close = False) def getOSPath(self, vospacePath): """Returns a list containing full path, storage type and username for a VOSpace path.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id JOIN users u ON u.user_id = n.creator_id WHERE n.node_id = id_from_vos_path(%s); """, (vospacePath,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: storageType = result[0]["storage_type"] basePath = result[0]["base_path"] userName = result[0]["user_name"] tstampWrappedDir = result[0]["tstamp_wrapper_dir"] osPath = result[0]["os_path"] contentLength = result[0]["content_length"] if tstampWrappedDir is None: baseSrcPath = basePath + "/" + userName else: baseSrcPath = basePath + "/" + userName + "/" + tstampWrappedDir fullPath = baseSrcPath + osPath fileInfo = { "baseSrcPath": baseSrcPath, "fullPath": fullPath, "storageType": storageType, "username": userName, "osPath": osPath, "contentLength": contentLength } return fileInfo finally: self.connPool.putconn(conn, close = False) def getVOSpacePathList(self, vospacePath): """Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT get_vos_path(n.node_id) FROM node n JOIN list_of_files l ON l.node_id = n.node_id WHERE l.list_node_id = id_from_vos_path(%s); """, (vospacePath,)) results = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: vospacePathList = [] for el in results: vospacePathList.append(el["vos_path"]) return vospacePathList finally: self.connPool.putconn(conn, close = False) def getNodesToBeDeleted(self): "Returns a path list of files to be deleted with also the corresponding deletion timestamp." conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" WITH RECURSIVE all_nodes AS ( SELECT node_id, name, os_name, relative_path, parent_relative_path FROM node UNION SELECT node_id, name, os_name, path(parent_relative_path, node_id) AS relative_path, parent_relative_path FROM deleted_node ), del AS ( SELECT COALESCE(os_name, name) AS os_name, 1 AS level, node_id AS deleted_node_id, path(parent_relative_path, node_id) AS relative_path, parent_relative_path FROM deleted_node WHERE phy_deleted_on IS NULL UNION ALL SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id, n.relative_path, n.parent_relative_path FROM all_nodes n JOIN del d ON n.relative_path = d.parent_relative_path WHERE n.parent_relative_path IS NOT NULL ), paths_to_delete AS (SELECT deleted_node_id, '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path FROM del GROUP BY deleted_node_id) SELECT base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, deleted_on, d.node_id FROM paths_to_delete p JOIN deleted_node d ON d.node_id = p.deleted_node_id JOIN location l ON d.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id; """) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result finally: self.connPool.putconn(conn, close = False) def nodeIsBusy(self, vospacePath): """Returns 'True' if the VOSpace node is busy, 'False' otherwise.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT job_id FROM node WHERE node_id = id_from_vos_path(%s); """, (vospacePath,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result[0]["job_id"]: return True else: return False finally: self.connPool.putconn(conn, close = False) ##### Job ##### def jobExists(self, jobId): """Checks if a job already exists. Returns a boolean.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result: return True else: return False finally: self.connPool.putconn(conn, close = False) def getJob(self, jobId): """Returns a JSON object containing job information, according to the job id.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if not result: return json.loads('{ "error": "JOB_NOT_FOUND" }') else: job = dict() for idx in result[0]: oldIdx = idx idxTokens = idx.split('_') idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:]) job[idx] = result[0][oldIdx] el = job[idx] if isinstance(el, datetime.datetime): job[idx] = el.isoformat() return job finally: self.connPool.putconn(conn, close = False) def getJobPhase(self, jobId): """Returns the 'phase' field, according to the UWS specification.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT phase FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["phase"] finally: self.connPool.putconn(conn, close = False) def getJobInfo(self, jobId): """Returns the 'job_info' field, according to the UWS specification.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["job_info"] finally: self.connPool.putconn(conn, close = False) def getJobResults(self, jobId): """Returns the 'results' field, according to the UWS specification.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["results"] finally: self.connPool.putconn(conn, close = False) def listActiveJobs(self): """Returns some info about active jobs.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT job_id, job_type, phase, creation_time, start_time, owner_id FROM job WHERE phase NOT IN ('ABORTED', 'COMPLETED', 'ERROR') AND job_type IN ('pullFromVoSpace', 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_import') ORDER BY creation_time DESC; """) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: for row in result: for idx in row: el = row[idx] if isinstance(el, datetime.datetime): row[idx] = el.isoformat() return result finally: self.connPool.putconn(conn, close = False) def listJobsByPhase(self, phase): """Returns some info about jobs according to the phase.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) if phase in [ "PENDING", "QUEUED", "EXECUTING" ]: cursor.execute(""" SELECT job_id, job_type, phase, creation_time, start_time, owner_id FROM job WHERE phase = %s AND job_type IN ('pullFromVoSpace', 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_import') ORDER BY creation_time DESC; """, (phase,)) else: cursor.execute(""" SELECT job_id, job_type, phase, start_time, end_time, owner_id FROM job WHERE phase = %s AND job_type IN ('pullFromVoSpace', 'pullToVoSpace', 'pushToVoSpace', 'vos_data', 'vos_group', 'vos_import') ORDER BY creation_time DESC; """, (phase,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: for row in result: for idx in row: el = row[idx] if isinstance(el, datetime.datetime): row[idx] = el.isoformat() return result finally: self.connPool.putconn(conn, close = False) ##### User ##### def userExists(self, username): """Checks if a user already exists. Returns a boolean.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result: return True else: return False finally: self.connPool.putconn(conn, close = False) def getUserId(self, username): """Returns the user id for a given user name.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["user_id"] finally: self.connPool.putconn(conn, close = False) def getUserName(self, userId): """Returns the user name for a given user id.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["user_name"] finally: self.connPool.putconn(conn, close = False) def getUserEmail(self, userId): """Returns the user email address for a given user id.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["e_mail"] finally: self.connPool.putconn(conn, close = False) ##### Storage ##### def storageBasePathIsValid(self, path): """Checks if the base path of a physical path is valid. If true, returns the base path, else returns 'False'.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT base_path FROM storage WHERE position(base_path in cast(%s as varchar)) > 0; """, (path,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["base_path"] else: return False finally: self.connPool.putconn(conn, close = False) def getStorageBasePath(self, storageId): """Returns the storage base path for a give storage id.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["base_path"] finally: self.connPool.putconn(conn, close = False) def getStorageList(self): """Returns the full storage base list. Local storage points are excluded by default.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result finally: self.connPool.putconn(conn, close = False) def getStorageListByType(self, storageType): """Returns a list of storage locations for a given storage type.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result finally: self.connPool.putconn(conn, close = False) def getStorageType(self, basePath): """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["storage_type"] else: return False finally: self.connPool.putconn(conn, close = False) def getStorageId(self, basePath): """Returns the storage id for a given storage base path, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["storage_id"] else: return False finally: self.connPool.putconn(conn, close = False) def getStorageHostname(self, storageId): """Returns the storage hostname for a given storage id, if any. Otherwise it returns 'False'.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT hostname FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result: return result[0]["hostname"] else: return False finally: self.connPool.putconn(conn, close = False) ##### Location ##### def getLocationId(self, destStorageId): """Returns the location id according to the storage id of the destination.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) result = cursor.fetchall() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return result[0]["location_id"] finally: self.connPool.putconn(conn, close = False) """ Setters """ ##### Job ##### def insertJob(self, jobObj): """Inserts/updates a job object.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO job(job_id, owner_id, job_type, phase, start_time, end_time, job_info, node_list, results, error_message, error_type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (job_id) DO UPDATE SET (owner_id, job_type, phase, start_time, end_time, job_info, node_list, results, error_message, error_type) = (EXCLUDED.owner_id, EXCLUDED.job_type, EXCLUDED.phase, EXCLUDED.start_time, EXCLUDED.end_time, EXCLUDED.job_info, EXCLUDED.node_list, EXCLUDED.results, EXCLUDED.error_message, EXCLUDED.error_type); """, (jobObj.jobId, jobObj.ownerId, jobObj.type, jobObj.phase, jobObj.startTime, jobObj.endTime, json.dumps(jobObj.jobInfo), json.dumps(jobObj.nodeList), json.dumps(jobObj.results), jobObj.errorMessage, jobObj.errorType,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) startTime = datetime.datetime.today().isoformat() cursor.execute(""" UPDATE job SET start_time = %s WHERE job_id = %s; """, (startTime, jobId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) endTime = datetime.datetime.today().isoformat() cursor.execute(""" UPDATE job SET end_time = %s WHERE job_id = %s; """, (endTime, jobId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET phase = %s WHERE job_id = %s; """, (phase, jobId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def setTotalBlocks(self, jobId, totalBlocks): """ Sets the job 'total_blocks' parameter for a data retrieve operation. """ conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET total_blocks = %s WHERE job_id = %s; """, (totalBlocks, jobId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def updateProcessedBlocks(self, jobId, processedBlocks): """ Updates the job 'processed_blocks' parameter for a data retrieve operation. """ conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET processed_blocks = %s WHERE job_id = %s; """, (processedBlocks, jobId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def setResults(self, jobId, results): """Sets the job 'results' parameter.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE job SET results = %s WHERE job_id = %s; """, (json.dumps(results), jobId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) ##### Node ##### def insertNode(self, node): """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT path FROM node WHERE node_id = id_from_vos_path(%s); """, (node.parentPath,)) result = cursor.fetchall() parentLtreePath = result[0]["path"] parentLtreeRelativePath = "" if "." in parentLtreePath: parentLtreeRelativePath = ".".join(parentLtreePath.strip(".").split('.')[1:]) except Exception: if not conn.closed: conn.rollback() raise else: try: cursor.execute(""" INSERT INTO node(parent_path, parent_relative_path, name, tstamp_wrapper_dir, type, location_id, async_trans, sticky, job_id, creator_id, content_length, content_md5) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING RETURNING node_id; """, (parentLtreePath, parentLtreeRelativePath, node.name, node.wrapperDir, node.type, node.locationId, node.asyncTrans, node.sticky, node.jobId, node.creatorId, node.contentLength, node.contentMD5,)) result = cursor.fetchall() conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: if result: return True else: return False finally: self.connPool.putconn(conn, close = False) def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET async_trans = %s FROM node n WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); """, (value, nodeVOSPath,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def setJobId(self, nodeVOSPath, value): """Sets the 'job_id' flag for a VOSpace node.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET job_id = %s FROM node n WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); """, (value, nodeVOSPath,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def setPhyDeletedOn(self, nodeId): """Sets the 'phy_deleted_on' flag for a VOSpace deleted node.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) phyDeletedOn = datetime.datetime.now().isoformat() cursor.execute(""" UPDATE deleted_node SET phy_deleted_on = %s WHERE node_id = %s; """, (phyDeletedOn, nodeId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath): conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET group_read = update_array(c.group_read, %s, %s) FROM node n WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); """, (groupToAdd, groupToRemove, nodeVOSPath,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath): conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" UPDATE node c SET group_write = update_array(c.group_write, %s, %s) FROM node n WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s); """, (groupToAdd, groupToRemove, nodeVOSPath,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False) ##### Storage ##### def insertStorage(self, storageType, basePath, baseUrl, hostname): """Inserts a storage point.""" conn = self.getConnection() if not self.getStorageId(basePath): try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO storage(storage_type, base_path, base_url, hostname) VALUES (%s, %s, %s, %s) RETURNING storage_id; """, (storageType, basePath, baseUrl, hostname,)) storageSrcId = cursor.fetchall()[0]["storage_id"] except Exception: if not conn.closed: conn.rollback() raise else: if storageType == "cold" or storageType == "hot": try: cursor.execute(""" SELECT storage_id FROM storage WHERE storage_type = 'local' AND base_path = '/home' AND hostname = 'localhost'; """) storageDestId = cursor.fetchall()[0]["storage_id"] except Exception: if not conn.closed: conn.rollback() raise else: locationType = "async" else: storageDestId = storageSrcId locationType = "portal" try: cursor.execute(""" INSERT INTO location(location_type, storage_src_id, storage_dest_id) VALUES (%s, %s, %s); """, (locationType, storageSrcId, storageDestId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return True finally: self.connPool.putconn(conn, close = False) else: return False def deleteStorage(self, storageId): """Deletes a storage point.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" SELECT count(*) > 0 AS res FROM Node WHERE location_id IN (SELECT location_id FROM storage s JOIN location l ON s.storage_id = l.storage_src_id WHERE storage_src_id = %s); """, (storageId,)) result = cursor.fetchall() except Exception: if not conn.closed: conn.rollback() raise else: if result[0]["res"]: return False else: try: cursor.execute(""" DELETE FROM storage WHERE storage_id = %s; """, (storageId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise else: return True finally: self.connPool.putconn(conn, close = False) ##### Users ##### def insertUser(self, userId, username, email): """Inserts users data.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" INSERT INTO users(user_id, user_name, e_mail) VALUES (%s, %s, %s) ON CONFLICT (user_id) DO NOTHING; """, (userId, username, email,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise finally: self.connPool.putconn(conn, close = False)