#!/usr/bin/env python

import datetime
import json
import sys
from contextlib import contextmanager

import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool

from node import Node


class DbConnector(object):
    """Data-access layer over the VOSpace PostgreSQL database.

    Every public method borrows a connection from a ThreadedConnectionPool,
    runs one logical operation and hands the connection back.
    """

    def __init__(self, user, password, host, port, dbname, minConnNum, maxConnNum):
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.dbname = dbname
        self.minConnNum = minConnNum
        self.maxConnNum = maxConnNum
        self.createConnectionPool()

    def createConnectionPool(self):
        """(Re)creates the connection pool from the stored connection settings."""
        self.connPool = ThreadedConnectionPool(
            self.minConnNum,
            self.maxConnNum,
            user=self.user,
            password=self.password,
            host=self.host,
            database=self.dbname,
            port=self.port,
            connect_timeout=2
        )

    @contextmanager
    def getConnection(self):
        """Borrows a connection from the pool and always gives it back."""
        conn = self.connPool.getconn()
        try:
            yield conn
        finally:
            self.connPool.putconn(conn, close=False)

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _handleFailure(conn, error):
        """Rolls back the current transaction (if the connection is still
        open) and reports the error."""
        if not conn.closed:
            conn.rollback()
        print(error)

    def _fetchAll(self, query, params=None):
        """Runs a read-only query and returns all rows as dict-like rows.

        On failure the transaction is rolled back, the error is printed and
        the original exception is re-raised, so callers never go on with an
        unassigned result.
        """
        with self.getConnection() as conn:
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                    cursor.execute(query, params)
                    return cursor.fetchall()
            except Exception as e:
                self._handleFailure(conn, e)
                raise

    def _execute(self, query, params=None):
        """Runs a data-modifying query in its own transaction and commits it.

        Errors are rolled back and printed but not propagated (best-effort
        semantics of the setters). Returns True on commit, False on failure.
        """
        with self.getConnection() as conn:
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                    cursor.execute(query, params)
                conn.commit()
                return True
            except Exception as e:
                self._handleFailure(conn, e)
                return False

    @staticmethod
    def _toCamelCase(name):
        """Converts a snake_case column name to camelCase ('job_id' -> 'jobId')."""
        head, *tail = name.split('_')
        return head + ''.join(token.title() for token in tail)

    @staticmethod
    def _isoformatDatetimes(row):
        """Replaces, in place, every datetime value of a row with its ISO 8601
        string and returns the row."""
        for key, value in list(row.items()):
            if isinstance(value, datetime.datetime):
                row[key] = value.isoformat()
        return row

    # ------------------------------------------------------------------ #
    # Getters
    # ------------------------------------------------------------------ #

    ##### Node #####

    def nodeExists(self, vospacePath):
        """Checks if a VOSpace node already exists.
        Returns a boolean."""
        result = self._fetchAll(
            "SELECT 1 FROM node_vos_path WHERE vos_path = %s LIMIT 1;",
            (vospacePath,))
        return bool(result)

    def getCreatorId(self, vospacePath):
        """Returns the creator ID for a given VOSpace path representing a node.
        Raises IndexError if no node matches the path."""
        result = self._fetchAll("""
            SELECT creator_id
            FROM node_vos_path nvp
            JOIN node n ON nvp.node_id = n.node_id
            WHERE vos_path = %s;
            """, (vospacePath,))
        return result[0]["creator_id"]

    def getOSPath(self, vospacePath):
        """Returns a dict describing where a VOSpace node is stored.

        Keys: 'baseSrcPath' (base path + user name [+ timestamp wrapper dir]),
        'fullPath' (baseSrcPath + osPath), 'storageType', 'username',
        'osPath', 'contentLength'.
        Raises IndexError if no node matches the path."""
        result = self._fetchAll("""
            SELECT storage_type, base_path, user_name, tstamp_wrapper_dir,
                   get_os_path(n.node_id) AS os_path, content_length
            FROM node n
            JOIN location l ON n.location_id = l.location_id
            JOIN storage s ON s.storage_id = l.storage_src_id
            JOIN users u ON u.user_id = n.creator_id
            WHERE n.node_id = id_from_vos_path(%s);
            """, (vospacePath,))
        row = result[0]
        userName = row["user_name"]
        tstampWrapperDir = row["tstamp_wrapper_dir"]
        osPath = row["os_path"]
        baseSrcPath = row["base_path"] + "/" + userName
        if tstampWrapperDir is not None:
            baseSrcPath += "/" + tstampWrapperDir
        return {
            "baseSrcPath": baseSrcPath,
            "fullPath": baseSrcPath + osPath,
            "storageType": row["storage_type"],
            "username": userName,
            "osPath": osPath,
            "contentLength": row["content_length"]
        }

    def getVOSpacePathList(self, vospacePath):
        """Returns the list of VOSpace paths carried by a VOSpace list node,
        according to the node VOSpace path."""
        # The alias is required: rows are read by the 'vos_path' key below.
        results = self._fetchAll("""
            SELECT get_vos_path(n.node_id) AS vos_path
            FROM node n
            JOIN list_of_files l ON l.node_id = n.node_id
            WHERE l.list_node_id = id_from_vos_path(%s);
            """, (vospacePath,))
        return [row["vos_path"] for row in results]

    def getNodesToBeDeleted(self):
        """Returns the rows of deleted nodes not yet physically removed, with
        their base path ('os_base_path'), relative OS path ('os_rel_path'),
        deletion timestamp ('deleted_on') and 'node_id'."""
        # Walk up the tree from each deleted node, collecting OS names, then
        # join them from the top level down into a relative OS path.
        return self._fetchAll("""
            WITH RECURSIVE del AS (
                SELECT COALESCE(os_name, name) AS os_name, 1 AS level,
                       node_id AS deleted_node_id,
                       path(parent_relative_path, node_id) AS relative_path,
                       parent_relative_path
                FROM deleted_node
                WHERE phy_deleted_on IS NULL
                UNION ALL
                SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id,
                       n.relative_path, n.parent_relative_path
                FROM node n
                JOIN del d ON n.relative_path = d.parent_relative_path
                WHERE n.parent_relative_path IS NOT NULL
            ),
            paths_to_delete AS (
                SELECT deleted_node_id,
                       '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path
                FROM del
                GROUP BY deleted_node_id
            )
            SELECT base_path || '/' || creator_id as os_base_path,
                   os_path AS os_rel_path, deleted_on, d.node_id
            FROM paths_to_delete p
            JOIN deleted_node d ON d.node_id = p.deleted_node_id
            JOIN location l ON d.location_id = l.location_id
            JOIN storage s ON s.storage_id = l.storage_src_id;
            """)

    ##### Job #####

    def jobExists(self, jobId):
        """Checks if a job already exists.
        Returns a boolean."""
        result = self._fetchAll(
            "SELECT 1 FROM job WHERE job_id = %s LIMIT 1;", (jobId,))
        return bool(result)

    def getJob(self, jobId):
        """Returns a dict with the job columns, keys converted to camelCase
        and datetimes to ISO 8601 strings, or {"error": "JOB_NOT_FOUND"} if
        no job has the given id."""
        result = self._fetchAll("SELECT * FROM job WHERE job_id = %s;", (jobId,))
        if not result:
            return {"error": "JOB_NOT_FOUND"}
        job = {self._toCamelCase(column): value for column, value in result[0].items()}
        return self._isoformatDatetimes(job)

    def getJobInfo(self, jobId):
        """Returns the 'job_info' field, according to the UWS specification.
        Raises IndexError if the job does not exist."""
        result = self._fetchAll("SELECT job_info FROM job WHERE job_id = %s;", (jobId,))
        return result[0]["job_info"]

    def getJobResults(self, jobId):
        """Returns the 'results' field, according to the UWS specification.
        Raises IndexError if the job does not exist."""
        result = self._fetchAll("SELECT results FROM job WHERE job_id = %s;", (jobId,))
        return result[0]["results"]

    def listActiveJobs(self):
        """Returns some info about jobs not in the ABORTED, COMPLETED or ERROR
        phase, with datetimes converted to ISO 8601 strings."""
        result = self._fetchAll("""
            SELECT job_id, job_type, phase, creation_time, start_time, owner_id
            FROM job
            WHERE phase NOT IN ('ABORTED', 'COMPLETED', 'ERROR');
            """)
        for row in result:
            self._isoformatDatetimes(row)
        return result

    def listJobsByPhase(self, phase):
        """Returns some info about jobs in the given phase, with datetimes
        converted to ISO 8601 strings.

        PENDING/QUEUED/EXECUTING jobs report creation and start time; jobs in
        any other phase report start and end time."""
        if phase in ["PENDING", "QUEUED", "EXECUTING"]:
            query = """
                SELECT job_id, job_type, phase, creation_time, start_time, owner_id
                FROM job
                WHERE phase = %s;
                """
        else:
            query = """
                SELECT job_id, job_type, phase, start_time, end_time, owner_id
                FROM job
                WHERE phase = %s;
                """
        result = self._fetchAll(query, (phase,))
        for row in result:
            self._isoformatDatetimes(row)
        return result

    ##### User #####

    def userExists(self, username):
        """Checks if a user already exists.
        Returns a boolean."""
        result = self._fetchAll(
            "SELECT 1 FROM users WHERE user_name = %s LIMIT 1;", (username,))
        return bool(result)

    def getUserId(self, username):
        """Returns the user id for a given user name.
        Raises IndexError if the user does not exist."""
        result = self._fetchAll(
            "SELECT user_id FROM users WHERE user_name = %s;", (username,))
        return result[0]["user_id"]

    def getUserName(self, userId):
        """Returns the user name for a given user id.
        Raises IndexError if the user does not exist."""
        result = self._fetchAll(
            "SELECT user_name FROM users WHERE user_id = %s;", (userId,))
        return result[0]["user_name"]

    ##### Storage #####

    def storageBasePathIsValid(self, path):
        """Checks if some storage base path occurs inside the given physical
        path. If so, returns that base path, else returns 'False'."""
        result = self._fetchAll("""
            SELECT base_path FROM storage
            WHERE position(base_path in cast(%s as varchar)) > 0;
            """, (path,))
        if result:
            return result[0]["base_path"]
        return False

    def getStorageBasePath(self, storageId):
        """Returns the storage base path for a given storage id.
        Raises IndexError if the storage does not exist."""
        result = self._fetchAll(
            "SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,))
        return result[0]["base_path"]

    def getStorageList(self):
        """Returns the full storage list. Local storage points are excluded."""
        return self._fetchAll("SELECT * FROM storage WHERE storage_type <> 'local';")

    def getStorageListByType(self, storageType):
        """Returns a list of storage points for a given storage type."""
        return self._fetchAll(
            "SELECT * FROM storage WHERE storage_type = %s;", (storageType,))

    def getStorageType(self, basePath):
        """Returns the storage type for a given storage base path, if any.
        Otherwise it returns 'False'."""
        result = self._fetchAll(
            "SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,))
        if result:
            return result[0]["storage_type"]
        return False

    def getStorageId(self, basePath):
        """Returns the storage id for a given storage base path, if any.
        Otherwise it returns 'False'."""
        result = self._fetchAll(
            "SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,))
        if result:
            return result[0]["storage_id"]
        return False

    ##### Location #####

    def getLocationId(self, destStorageId):
        """Returns the location id whose source storage is the given storage id.
        Raises IndexError if no such location exists."""
        result = self._fetchAll(
            "SELECT location_id FROM location WHERE storage_src_id = %s;",
            (destStorageId,))
        return result[0]["location_id"]

    # ------------------------------------------------------------------ #
    # Setters (best effort: errors are rolled back and printed)
    # ------------------------------------------------------------------ #

    ##### Job #####

    def insertJob(self, jobObj):
        """Inserts a job object, or updates it if the job id already exists."""
        self._execute("""
            INSERT INTO job(job_id, owner_id, job_type, phase, start_time, end_time,
                            job_info, results, error_message, error_type)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (job_id)
            DO UPDATE SET
            (owner_id, job_type, phase, start_time, end_time,
             job_info, results, error_message, error_type)
            = (EXCLUDED.owner_id, EXCLUDED.job_type, EXCLUDED.phase,
               EXCLUDED.start_time, EXCLUDED.end_time, EXCLUDED.job_info,
               EXCLUDED.results, EXCLUDED.error_message, EXCLUDED.error_type);
            """,
            (jobObj.jobId,
             jobObj.ownerId,
             jobObj.type,
             jobObj.phase,
             jobObj.startTime,
             jobObj.endTime,
             json.dumps(jobObj.jobInfo),
             json.dumps(jobObj.results),
             jobObj.errorMessage,
             jobObj.errorType,))

    def setStartTime(self, jobId):
        """Sets the job 'start_time' parameter to the current local time."""
        startTime = datetime.datetime.today().isoformat()
        self._execute("""
            UPDATE job SET start_time = %s
            WHERE job_id = %s;
            """, (startTime, jobId,))

    def setEndTime(self, jobId):
        """Sets the job 'end_time' parameter to the current local time."""
        endTime = datetime.datetime.today().isoformat()
        self._execute("""
            UPDATE job SET end_time = %s
            WHERE job_id = %s;
            """, (endTime, jobId,))

    def setPhase(self, jobId, phase):
        """Sets the job 'phase' parameter."""
        self._execute("""
            UPDATE job SET phase = %s
            WHERE job_id = %s;
            """, (phase, jobId,))

    def setTotalBlocks(self, jobId, totalBlocks):
        """Sets the job 'total_blocks' parameter for a data retrieve operation."""
        self._execute("""
            UPDATE job SET total_blocks = %s
            WHERE job_id = %s;
            """, (totalBlocks, jobId,))

    def updateProcessedBlocks(self, jobId, processedBlocks):
        """Updates the job 'processed_blocks' parameter for a data retrieve operation."""
        self._execute("""
            UPDATE job SET processed_blocks = %s
            WHERE job_id = %s;
            """, (processedBlocks, jobId,))

    def setResults(self, jobId, results):
        """Sets the job 'results' parameter (stored as JSON text)."""
        self._execute("""
            UPDATE job SET results = %s
            WHERE job_id = %s;
            """, (json.dumps(results), jobId,))

    ##### Node #####

    def insertNode(self, node):
        """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.

        'False' is returned when the parent node is not found, when the insert
        is skipped by ON CONFLICT DO NOTHING, or when a database error occurs
        (the transaction is then rolled back)."""
        with self.getConnection() as conn:
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                    cursor.execute("""
                        SELECT path FROM node WHERE node_id = id_from_vos_path(%s);
                        """, (node.parentPath,))
                    parent = cursor.fetchall()
                    if not parent:
                        return False
                    parentLtreePath = parent[0]["path"]
                    # The relative path is the parent's ltree path without its
                    # first label (empty when the path has a single label).
                    parentLtreeRelativePath = ""
                    if "." in parentLtreePath:
                        parentLtreeRelativePath = ".".join(
                            parentLtreePath.strip(".").split('.')[1:])
                    cursor.execute("""
                        INSERT INTO node(parent_path, parent_relative_path, name,
                                         tstamp_wrapper_dir, type, location_id,
                                         async_trans, sticky, job_id, creator_id,
                                         content_length, content_md5)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT
                        DO NOTHING
                        RETURNING node_id;
                        """,
                        (parentLtreePath,
                         parentLtreeRelativePath,
                         node.name,
                         node.wrapperDir,
                         node.type,
                         node.locationId,
                         node.asyncTrans,
                         node.sticky,
                         node.jobId,
                         node.creatorId,
                         node.contentLength,
                         node.contentMD5,))
                    inserted = cursor.fetchall()
                conn.commit()
                return bool(inserted)
            except Exception as e:
                self._handleFailure(conn, e)
                return False

    def setAsyncTrans(self, nodeVOSPath, value):
        """Sets the 'async_trans' flag for a VOSpace node and all nodes whose
        path is a descendant of it (ltree '<@')."""
        self._execute("""
            UPDATE node c SET async_trans = %s
            FROM node n
            WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
            """, (value, nodeVOSPath,))

    def setJobId(self, nodeVOSPath, value):
        """Sets the 'job_id' field for a VOSpace node and all nodes whose path
        is a descendant of it (ltree '<@')."""
        self._execute("""
            UPDATE node c SET job_id = %s
            FROM node n
            WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
            """, (value, nodeVOSPath,))

    def setPhyDeletedOn(self, nodeId):
        """Sets the 'phy_deleted_on' timestamp (current local time) for a
        VOSpace deleted node."""
        phyDeletedOn = datetime.datetime.now().isoformat()
        self._execute("""
            UPDATE deleted_node SET phy_deleted_on = %s
            WHERE node_id = %s;
            """, (phyDeletedOn, nodeId,))

    def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath):
        """Updates 'group_read' via the update_array() SQL function for a
        VOSpace node and all its descendants."""
        self._execute("""
            UPDATE node c
            SET group_read = update_array(c.group_read, %s, %s)
            FROM node n
            WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
            """, (groupToAdd, groupToRemove, nodeVOSPath,))

    def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath):
        """Updates 'group_write' via the update_array() SQL function for a
        VOSpace node and all its descendants."""
        self._execute("""
            UPDATE node c
            SET group_write = update_array(c.group_write, %s, %s)
            FROM node n
            WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
            """, (groupToAdd, groupToRemove, nodeVOSPath,))

    ##### Storage #####

    def insertStorage(self, storageType, basePath, baseUrl, hostname):
        """Inserts a storage point together with its location.

        'cold' and 'hot' storages get an 'async' location whose destination is
        the 'local' storage with base path '/home' on 'localhost'; any other
        type gets a 'portal' location pointing to itself.

        Returns 'True' once the storage point and its location are committed,
        'False' if a storage point with the same base path already exists or
        if the insertion failed (nothing is committed in that case)."""
        # Checked before borrowing a connection: getStorageId() takes its own
        # connection from the pool, so calling it inside the block below would
        # hold two pool connections at once.
        if self.getStorageId(basePath):
            return False
        with self.getConnection() as conn:
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                    cursor.execute("""
                        INSERT INTO storage(storage_type, base_path, base_url, hostname)
                        VALUES (%s, %s, %s, %s)
                        RETURNING storage_id;
                        """, (storageType, basePath, baseUrl, hostname,))
                    storageSrcId = cursor.fetchall()[0]["storage_id"]
                    if storageType == "cold" or storageType == "hot":
                        cursor.execute("""
                            SELECT storage_id
                            FROM storage
                            WHERE storage_type = 'local'
                            AND base_path = '/home'
                            AND hostname = 'localhost';
                            """)
                        storageDestId = cursor.fetchall()[0]["storage_id"]
                        locationType = "async"
                    else:
                        storageDestId = storageSrcId
                        locationType = "portal"
                    cursor.execute("""
                        INSERT INTO location(location_type, storage_src_id, storage_dest_id)
                        VALUES (%s, %s, %s);
                        """, (locationType, storageSrcId, storageDestId,))
                conn.commit()
                return True
            except Exception as e:
                self._handleFailure(conn, e)
                return False

    def deleteStorage(self, storageId):
        """Deletes a storage point.

        Returns 'False' (deleting nothing) when some node is still stored in a
        location whose source is this storage, or when the deletion fails;
        returns 'True' once the deletion is committed."""
        with self.getConnection() as conn:
            try:
                with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                    cursor.execute("""
                        SELECT count(*) > 0 AS res
                        FROM node
                        WHERE location_id IN (
                            SELECT location_id
                            FROM storage s
                            JOIN location l ON s.storage_id = l.storage_src_id
                            WHERE storage_src_id = %s);
                        """, (storageId,))
                    if cursor.fetchall()[0]["res"]:
                        return False
                    cursor.execute("""
                        DELETE FROM storage WHERE storage_id = %s;
                        """, (storageId,))
                conn.commit()
                return True
            except Exception as e:
                self._handleFailure(conn, e)
                return False