From 64a45e8569cdde03d8612dbb2c6412930aa59bbb Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Wed, 17 Mar 2021 15:37:45 +0100 Subject: [PATCH] DbConnector: switched to threaded connection pool. Signed-off-by: Cristiano Urban --- transfer_service/abort_job_amqp_server.py | 4 +- transfer_service/db_connector.py | 1040 ++++++++++++--------- transfer_service/get_job_amqp_server.py | 4 +- transfer_service/import_amqp_server.py | 16 +- transfer_service/job_amqp_server.py | 16 +- transfer_service/retrieve_executor.py | 16 +- transfer_service/retrieve_preprocessor.py | 4 +- transfer_service/start_job_amqp_server.py | 4 +- transfer_service/storage_amqp_server.py | 16 +- transfer_service/store_amqp_server.py | 8 +- transfer_service/store_executor.py | 8 +- transfer_service/store_preprocessor.py | 8 +- 12 files changed, 676 insertions(+), 468 deletions(-) diff --git a/transfer_service/abort_job_amqp_server.py b/transfer_service/abort_job_amqp_server.py index 9a3e8b3..b3c1d5d 100644 --- a/transfer_service/abort_job_amqp_server.py +++ b/transfer_service/abort_job_amqp_server.py @@ -17,9 +17,9 @@ class AbortJobAMQPServer(AMQPServer): super(AbortJobAMQPServer, self).__init__(host, port, queue) def execute_callback(self, requestBody): - self.dbConn.connect() + #self.dbConn.connect() # do something... - self.dbConn.disconnect() + #self.dbConn.disconnect() return 42 def run(self): diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index c5bdeb4..758a90d 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -1,9 +1,12 @@ import datetime import json import psycopg2 -import psycopg2.extras import sys +from contextlib import contextmanager +from psycopg2.extras import RealDictCursor +from psycopg2.pool import ThreadedConnectionPool + from node import Node @@ -15,22 +18,44 @@ class DbConnector(object): self.host = host self.port = port self.dbname = dbname - - def connect(self): + self.createConnectionPool() + + def createConnectionPool(self): + self.connPool = ThreadedConnectionPool( + 5, + 32, + user = self.user, + password = self.password, + host = self.host, + database = self.dbname, + port = self.port, + connect_timeout = 2 + ) + + #def connect(self): + # try: + # self.conn = psycopg2.connect(user = self.user, + # password = self.password, + # host = self.host, + # port = self.port, + # database = self.dbname) + # except(Exception, psycopg2.Error) as error : + # sys.exit(f"Error while connecting to PostgreSQL: {error}") + # self.cursor = self.conn.cursor(cursor_factory = RealDictCursor) + + @contextmanager + def getConnection(self): + conn = self.connPool.getconn() + #conn.autocommit = True try: - self.conn = psycopg2.connect(user = self.user, - password = self.password, - host = self.host, - port = self.port, - database = self.dbname) - except(Exception, psycopg2.Error) as error : - sys.exit(f"Error while connecting to PostgreSQL: {error}") - self.cursor = self.conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) - - def disconnect(self): - if self.conn: - self.cursor.close() - self.conn.close() + yield conn + finally: + self.connPool.putconn(conn, close = False) + + #def disconnect(self): + # if self.conn: + # self.cursor.close() + # self.conn.close() """ Getters @@ -40,136 +65,152 @@ class DbConnector(object): def nodeExists(self, node): """Checks if a VOSpace node already exists. Returns a boolean.""" - if self.conn: - nodeVOSPath = node.parentPath + '/' + node.name - self.cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (nodeVOSPath,)) - result = self.cursor.fetchall() - if result: - return True - else: - return False + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + nodeVOSPath = node.parentPath + '/' + node.name + cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (nodeVOSPath,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + print(e) + if result: + return True + else: + return False def getOSPath(self, vospacePath): """Returns a list containing full path, storage type and username for a VOSpace path.""" - if self.conn: - self.cursor.execute(""" - SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, os_path - FROM node_path p - JOIN node n ON p.node_id = n.node_id - JOIN location l ON n.location_id = l.location_id - JOIN storage s ON s.storage_id = l.storage_src_id - JOIN users u ON u.rap_id = n.owner_id - WHERE p.vos_path = %s; - """, - (vospacePath,)) - result = self.cursor.fetchall() - storageType = result[0]["storage_type"] - basePath = result[0]["base_path"] - userName = result[0]["user_name"] - tstampWrappedDir = result[0]["tstamp_wrapper_dir"] - osPath = result[0]["os_path"] - if tstampWrappedDir is None: - fullPath = basePath + "/" + userName + osPath - else: - fullPath = basePath + "/" + userName + "/" + tstampWrappedDir + osPath - return [ fullPath, storageType, userName, osPath ] + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, os_path + FROM node_path p + JOIN node n ON p.node_id = n.node_id + JOIN location l ON n.location_id = l.location_id + JOIN storage s ON s.storage_id = l.storage_src_id + JOIN users u ON u.rap_id = n.owner_id + WHERE p.vos_path = %s; + """, + (vospacePath,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + print(e) + storageType = result[0]["storage_type"] + basePath = result[0]["base_path"] + userName = result[0]["user_name"] + tstampWrappedDir = result[0]["tstamp_wrapper_dir"] + osPath = result[0]["os_path"] + if tstampWrappedDir is None: + fullPath = basePath + "/" + userName + osPath + else: + fullPath = basePath + "/" + userName + "/" + tstampWrappedDir + osPath + return [ fullPath, storageType, userName, osPath ] def getVOSpacePathList(self, vospacePath): """Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path.""" - if self.conn: - self.cursor.execute(""" - SELECT op.vos_path - FROM node_vos_path vp - JOIN list_of_files l ON l.list_node_id = vp.node_id - JOIN node_path op ON op.node_id = l.node_id - WHERE vp.vos_path = %s; - """, - (vospacePath,)) - results = self.cursor.fetchall() - vospacePathList = [] - for el in results: - vospacePathList.append(el["vos_path"]) - return vospacePathList + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT op.vos_path + FROM node_vos_path vp + JOIN list_of_files l ON l.list_node_id = vp.node_id + JOIN node_path op ON op.node_id = l.node_id + WHERE vp.vos_path = %s; + """, + (vospacePath,)) + results = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + print(e) + vospacePathList = [] + for el in results: + vospacePathList.append(el["vos_path"]) + return vospacePathList ##### Job ##### def jobExists(self, jobId): """Checks if a job already exists. Returns a boolean.""" - if self.conn: - self.cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) - result = self.cursor.fetchall() - if result: - return True - else: - return False + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + print(e) + if result: + return True + else: + return False def getJob(self, jobId): """Returns a JSON object containing job information, according to the job id.""" - if self.conn: - self.cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) - out = open("db_connector_log.txt", "a") - result = self.cursor.fetchall() - #out.write(f"result: {result}\n\n") - #out.close() - if not result: - return json.loads('{ "error": "JOB_NOT_FOUND" }') - else: - job = dict() - for idx in result[0]: - oldIdx = idx - idxTokens = idx.split('_') - idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:]) - job[idx] = result[0][oldIdx] - el = job[idx] - if isinstance(el, datetime.datetime): - job[idx] = el.isoformat() - out.write(f"job: {job}\n\n") - out.close() - return job + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) + out = open("db_connector_log.txt", "a") + result = cursor.fetchall() + #out.write(f"result: {result}\n\n") + #out.close() + except Exception as e: + if not conn.closed: + conn.rollback() + if not result: + return json.loads('{ "error": "JOB_NOT_FOUND" }') + else: + job = dict() + for idx in result[0]: + oldIdx = idx + idxTokens = idx.split('_') + idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:]) + job[idx] = result[0][oldIdx] + el = job[idx] + if isinstance(el, datetime.datetime): + job[idx] = el.isoformat() + out.write(f"job: {job}\n\n") + out.close() + return job def getJobInfo(self, jobId): """Returns the 'job_info' field, according to the UWS specification.""" - if self.conn: - self.cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) - result = self.cursor.fetchall() - return result[0]["job_info"] + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + return result[0]["job_info"] def getJobResults(self, jobId): """Returns the 'results' field, according to the UWS specification.""" - if self.conn: - self.cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) - result = self.cursor.fetchall() - return result[0]["results"] + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + return result[0]["results"] def listActiveJobs(self): """Returns some info about active jobs.""" - if self.conn: - self.cursor.execute(""" - SELECT job_id, - job_type, - phase, - creation_time, - start_time, - owner_id - FROM job - WHERE phase - NOT IN ('ABORTED', - 'COMPLETED', - 'ERROR'); - """) - result = self.cursor.fetchall() - for row in result: - for idx in row: - el = row[idx] - if isinstance(el, datetime.datetime): - row[idx] = el.isoformat() - return result - - def listJobsByPhase(self, phase): - """Returns some info about jobs according to the phase.""" - if self.conn: - if phase in [ "PENDING", "QUEUED", "EXECUTING" ]: - self.cursor.execute(""" + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" SELECT job_id, job_type, phase, @@ -177,116 +218,204 @@ class DbConnector(object): start_time, owner_id FROM job - WHERE phase = %s; - """, - (phase,)) - else: - self.cursor.execute(""" - SELECT job_id, - job_type, - phase, - start_time, - end_time, - owner_id - FROM job - WHERE phase = %s; - """, - (phase,)) - result = self.cursor.fetchall() - for row in result: - for idx in row: - el = row[idx] - if isinstance(el, datetime.datetime): - row[idx] = el.isoformat() - return result + WHERE phase + NOT IN ('ABORTED', + 'COMPLETED', + 'ERROR'); + """) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + for row in result: + for idx in row: + el = row[idx] + if isinstance(el, datetime.datetime): + row[idx] = el.isoformat() + return result + + def listJobsByPhase(self, phase): + """Returns some info about jobs according to the phase.""" + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + if phase in [ "PENDING", "QUEUED", "EXECUTING" ]: + cursor.execute(""" + SELECT job_id, + job_type, + phase, + creation_time, + start_time, + owner_id + FROM job + WHERE phase = %s; + """, + (phase,)) + else: + cursor.execute(""" + SELECT job_id, + job_type, + phase, + start_time, + end_time, + owner_id + FROM job + WHERE phase = %s; + """, + (phase,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + for row in result: + for idx in row: + el = row[idx] + if isinstance(el, datetime.datetime): + row[idx] = el.isoformat() + return result ##### User ##### def userExists(self, username): """Checks if a user already exists. Returns a boolean.""" - if self.conn: - self.cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) - result = self.cursor.fetchall() - if result: - return True - else: - return False + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + if result: + return True + else: + return False def getRapId(self, username): """Returns the RAP id for a given user name.""" - if self.conn: - self.cursor.execute("SELECT rap_id FROM users WHERE user_name = %s;", (username,)) - return self.cursor.fetchall()[0]["rap_id"] + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT rap_id FROM users WHERE user_name = %s;", (username,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + return result[0]["rap_id"] def getUserName(self, rapId): """Returns the user name for a given RAP id.""" - if self.conn: - self.cursor.execute("SELECT user_name FROM users WHERE rap_id = %s;", (rapId,)) - return self.cursor.fetchall()[0]["user_name"] + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT user_name FROM users WHERE rap_id = %s;", (rapId,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + return result[0]["user_name"] ##### Storage ##### def storageBasePathIsValid(self, path): """Checks if the base path of a physical path is valid. If true, returns the base path, else returns 'False'.""" - if self.conn: - self.cursor.execute(""" - SELECT base_path - FROM storage - WHERE position(base_path in cast(%s as varchar)) > 0; - """, - (path,)) - result = self.cursor.fetchall() - if result: - return result[0]["base_path"] - else: - return False + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT base_path + FROM storage + WHERE position(base_path in cast(%s as varchar)) > 0; + """, + (path,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + if result: + return result[0]["base_path"] + else: + return False def getStorageBasePath(self, storageId): """Returns the storage base path for a give storage id.""" - if self.conn: - self.cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) - return self.cursor.fetchall()[0]["base_path"] + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + return result[0]["base_path"] def getStorageList(self): """Returns the full storage base list. Local storage points are excluded by default.""" - if self.conn: - self.cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") - result = self.cursor.fetchall() - return result + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + return result def getStorageListByType(self, storageType): """Returns a list of storage locations for a given storage type.""" - if self.conn: - self.cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) - return self.cursor.fetchall() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + return result def getStorageType(self, basePath): """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'.""" - if self.conn: - self.cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) - result = self.cursor.fetchall() - if result: - return result[0]["storage_type"] - else: - return False + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + if result: + return result[0]["storage_type"] + else: + return False def getStorageId(self, basePath): """Returns the storage id for a given storage base path, if any. Otherwise it returns 'False'.""" - if self.conn: - self.cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) - result = self.cursor.fetchall() - if result: - return result[0]["storage_id"] - else: - return False + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + if result: + return result[0]["storage_id"] + else: + return False ##### Location ##### def getLocationId(self, destStorageId): """Returns the location id according to the storage id of the destination.""" - if self.conn: - self.cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) - return self.cursor.fetchall()[0]["location_id"] + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) + result = cursor.fetchall() + except Exception as e: + if not conn.closed: + conn.rollback() + return result[0]["location_id"] """ @@ -297,272 +426,351 @@ class DbConnector(object): def insertJob(self, jobObj): """Inserts/updates a job object.""" - if self.conn: - self.cursor.execute(""" - INSERT INTO job(job_id, - owner_id, - job_type, - phase, - start_time, - end_time, - job_info, - results) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT (job_id) - DO UPDATE SET - (owner_id, - job_type, phase, - start_time, - end_time, - job_info, - results) - = (EXCLUDED.owner_id, - EXCLUDED.job_type, - EXCLUDED.phase, - EXCLUDED.start_time, - EXCLUDED.end_time, - EXCLUDED.job_info, - EXCLUDED.results); - """, - (jobObj.jobId, - jobObj.ownerId, - jobObj.type, - jobObj.phase, - jobObj.startTime, - jobObj.endTime, - json.dumps(jobObj.jobInfo), - json.dumps(jobObj.results),)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + INSERT INTO job(job_id, + owner_id, + job_type, + phase, + start_time, + end_time, + job_info, + results) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (job_id) + DO UPDATE SET + (owner_id, + job_type, phase, + start_time, + end_time, + job_info, + results) + = (EXCLUDED.owner_id, + EXCLUDED.job_type, + EXCLUDED.phase, + EXCLUDED.start_time, + EXCLUDED.end_time, + EXCLUDED.job_info, + EXCLUDED.results); + """, + (jobObj.jobId, + jobObj.ownerId, + jobObj.type, + jobObj.phase, + jobObj.startTime, + jobObj.endTime, + json.dumps(jobObj.jobInfo), + json.dumps(jobObj.results),)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" - if self.conn: - startTime = datetime.datetime.today().isoformat() - self.cursor.execute(""" - UPDATE job SET start_time = %s - WHERE job_id = %s; - """, - (startTime, jobId,)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + startTime = datetime.datetime.today().isoformat() + cursor.execute(""" + UPDATE job SET start_time = %s + WHERE job_id = %s; + """, + (startTime, jobId,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" - if self.conn: - endTime = datetime.datetime.today().isoformat() - self.cursor.execute(""" - UPDATE job SET end_time = %s - WHERE job_id = %s; - """, - (endTime, jobId,)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + endTime = datetime.datetime.today().isoformat() + cursor.execute(""" + UPDATE job SET end_time = %s + WHERE job_id = %s; + """, + (endTime, jobId,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" - if self.conn: - self.cursor.execute(""" - UPDATE job SET phase = %s - WHERE job_id = %s; - """, - (phase, jobId,)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE job SET phase = %s + WHERE job_id = %s; + """, + (phase, jobId,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() def setResults(self, jobId, results): """Sets the job 'results' parameter.""" - if self.conn: - self.cursor.execute(""" - UPDATE job SET results = %s - WHERE job_id = %s; - """, - (json.dumps(results), - jobId,)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE job SET results = %s + WHERE job_id = %s; + """, + (json.dumps(results), + jobId,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() ##### Node ##### def insertNode(self, node): """Inserts a VOSpace node.""" - if self.conn: - out = open("db_connector_log.txt", "a") - out.write(f"parentOSPath: {node.parentPath}\n") - out.write(f"name: {node.name}\n") - self.cursor.execute(""" - SELECT path FROM node n - JOIN node_vos_path o ON n.node_id = o.node_id - WHERE vos_path = %s; - """, - (node.parentPath,)) - result = self.cursor.fetchall() - for i in result: - out.write(f"queryResult: {i}\n") - #parentLtreePath = self.cursor.fetchone()[0] - parentLtreePath = result[0]["path"] - parentLtreeRelativePath = "" - if "." in parentLtreePath: - parentLtreeRelativePath = ".".join(parentLtreePath.strip(".").split('.')[1:]) - out.write(f"parentLtreeRelativePath: {parentLtreeRelativePath}\n") - out.write(f"parentLtreePath: {parentLtreePath}\n") - out.write(f"parentPath: {node.parentPath}\n\n") - out.close() - #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") - self.cursor.execute(""" - INSERT INTO node(parent_path, - parent_relative_path, - name, - tstamp_wrapper_dir, - type, - location_id, - busy_state, - owner_id, - creator_id, - content_length, - content_md5) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); - """, - (parentLtreePath, - parentLtreeRelativePath, - node.name, - node.wrapperDir, - node.type, - node.locationId, - node.busyState, - node.ownerID, - node.creatorID, - node.contentLength, - node.contentMD5,)) - self.conn.commit() + with self.getConnection() as conn: + try: + out = open("db_connector_log.txt", "a") + out.write(f"parentOSPath: {node.parentPath}\n") + out.write(f"name: {node.name}\n") + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT path FROM node n + JOIN node_vos_path o ON n.node_id = o.node_id + WHERE vos_path = %s; + """, + (node.parentPath,)) + result = cursor.fetchall() + for i in result: + out.write(f"queryResult: {i}\n") + #parentLtreePath = self.cursor.fetchone()[0] + parentLtreePath = result[0]["path"] + parentLtreeRelativePath = "" + if "." in parentLtreePath: + parentLtreeRelativePath = ".".join(parentLtreePath.strip(".").split('.')[1:]) + out.write(f"parentLtreeRelativePath: {parentLtreeRelativePath}\n") + out.write(f"parentLtreePath: {parentLtreePath}\n") + out.write(f"parentPath: {node.parentPath}\n\n") + out.close() + except Exception as e: + if not conn.closed: + conn.rollback() + + try: + #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") + cursor.execute(""" + INSERT INTO node(parent_path, + parent_relative_path, + name, + tstamp_wrapper_dir, + type, + location_id, + busy_state, + owner_id, + creator_id, + content_length, + content_md5) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); + """, + (parentLtreePath, + parentLtreeRelativePath, + node.name, + node.wrapperDir, + node.type, + node.locationId, + node.busyState, + node.ownerID, + node.creatorID, + node.contentLength, + node.contentMD5,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() def deleteTmpDataNode(self, vospacePath): """Deletes a temporary VOSpace data node.""" - if self.conn: - self.cursor.execute(""" - WITH deleted AS ( - DELETE FROM list_of_files - WHERE list_node_id = - (SELECT node_id FROM node_vos_path - WHERE vos_path = %s) - RETURNING list_node_id - ) DELETE FROM node - WHERE node_id = - (SELECT DISTINCT(list_node_id) - FROM deleted); - """, - (vospacePath,)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + WITH deleted AS ( + DELETE FROM list_of_files + WHERE list_node_id = + (SELECT node_id FROM node_vos_path + WHERE vos_path = %s) + RETURNING list_node_id + ) DELETE FROM node + WHERE node_id = + (SELECT DISTINCT(list_node_id) + FROM deleted); + """, + (vospacePath,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" - if self.conn: - self.cursor.execute(""" - UPDATE node SET async_trans = %s - WHERE path <@ - (SELECT path - FROM node_path p - JOIN node n ON p.node_id = n.node_id - WHERE p.vos_path = %s); - """, - (value, nodeVOSPath,)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE node SET async_trans = %s + WHERE path <@ + (SELECT path + FROM node_path p + JOIN node n ON p.node_id = n.node_id + WHERE p.vos_path = %s); + """, + (value, nodeVOSPath,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() def setSticky(self, nodeVOSPath, value): """Sets the 'sticky' flag for a VOSpace node.""" - if self.conn: - self.cursor.execute(""" - UPDATE node SET sticky = %s - WHERE path <@ - (SELECT path - FROM node_path p - JOIN node n ON p.node_id = n.node_id - WHERE p.vos_path = %s); - """, - (value, nodeVOSPath,)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE node SET sticky = %s + WHERE path <@ + (SELECT path + FROM node_path p + JOIN node n ON p.node_id = n.node_id + WHERE p.vos_path = %s); + """, + (value, nodeVOSPath,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() + def setBusyState(self, nodeVOSPath, value): """Sets the 'busy_state' flag for a VOSpace node.""" - if self.conn: - self.cursor.execute(""" - UPDATE node SET busy_state = %s - WHERE path <@ - (SELECT path - FROM node_path p - JOIN node n ON p.node_id = n.node_id - WHERE p.vos_path = %s); - """, - (value, nodeVOSPath,)) - self.conn.commit() + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + UPDATE node SET busy_state = %s + WHERE path <@ + (SELECT path + FROM node_path p + JOIN node n ON p.node_id = n.node_id + WHERE p.vos_path = %s); + """, + (value, nodeVOSPath,)) + conn.commit() + except Exception as e: + if not conn.closed: + conn.rollback() ##### Storage ##### def insertStorage(self, storageType, basePath, baseUrl, hostname): """Inserts a storage point.""" - if self.conn: + with self.getConnection() as conn: if not self.getStorageId(basePath): - self.cursor.execute(""" - INSERT INTO storage(storage_type, - base_path, - base_url, - hostname) - VALUES (%s, %s, %s, %s) - RETURNING storage_id; - """, - (storageType, - basePath, - baseUrl, - hostname,)) - - storageSrcId = self.cursor.fetchall()[0]["storage_id"] + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + INSERT INTO storage(storage_type, + base_path, + base_url, + hostname) + VALUES (%s, %s, %s, %s) + RETURNING storage_id; + """, + (storageType, + basePath, + baseUrl, + hostname,)) + + storageSrcId = cursor.fetchall()[0]["storage_id"] + + except Exception as e: + if not conn.closed: + conn.rollback() if storageType == "cold" or storageType == "hot": - self.cursor.execute(""" - SELECT storage_id - FROM storage - WHERE storage_type = 'local' - AND base_path = '/home' - AND hostname = 'localhost'; - """) - storageDestId = self.cursor.fetchall()[0]["storage_id"] + try: + cursor.execute(""" + SELECT storage_id + FROM storage + WHERE storage_type = 'local' + AND base_path = '/home' + AND hostname = 'localhost'; + """) + storageDestId = cursor.fetchall()[0]["storage_id"] + except Exception as e: + if not conn.closed: + conn.rollback() locationType = "async" else: storageDestId = storageSrcId locationType = "portal" - - self.cursor.execute(""" - INSERT INTO location(location_type, - storage_src_id, - storage_dest_id) - VALUES (%s, %s, %s); - """, - (locationType, - storageSrcId, - storageDestId,)) - - self.conn.commit() + try: + cursor.execute(""" + INSERT INTO location(location_type, + storage_src_id, + storage_dest_id) + VALUES (%s, %s, %s); + """, + (locationType, + storageSrcId, + storageDestId,)) + + conn.commit() + except: + if not conn.closed: + conn.rollback() return True else: return False def deleteStorage(self, storageId): """Deletes a storage point.""" - if self.conn: - self.cursor.execute(""" - SELECT count(*) > 0 AS res - FROM Node - WHERE location_id IN - (SELECT location_id - FROM storage s - JOIN location l - ON s.storage_id = l.storage_src_id - WHERE storage_src_id = %s); - """, - (storageId,)) - - if self.cursor.fetchall()[0]["res"]: - return False - else: - self.cursor.execute(""" - DELETE FROM storage - WHERE storage_id = %s; + with self.getConnection() as conn: + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT count(*) > 0 AS res + FROM Node + WHERE location_id IN + (SELECT location_id + FROM storage s + JOIN location l + ON s.storage_id = l.storage_src_id + WHERE storage_src_id = %s); """, (storageId,)) - self.conn.commit() + result = cursor.fetchall() + except: + if not conn.closed: + conn.rollback() + + if result[0]["res"]: + return False + else: + try: + cursor.execute(""" + DELETE FROM storage + WHERE storage_id = %s; + """, + (storageId,)) + conn.commit() + except: + if not conn.closed: + conn.rollback() return True diff --git a/transfer_service/get_job_amqp_server.py b/transfer_service/get_job_amqp_server.py index dcbb617..c87c8af 100644 --- a/transfer_service/get_job_amqp_server.py +++ b/transfer_service/get_job_amqp_server.py @@ -20,9 +20,9 @@ class GetJobAMQPServer(AMQPServer): def execute_callback(self, requestBody): if "jobId" in requestBody: - self.dbConn.connect() + #self.dbConn.connect() dbResponse = self.dbConn.getJob(requestBody["jobId"]) - self.dbConn.disconnect() + #self.dbConn.disconnect() print(f"Db response: {dbResponse}") return dbResponse else: diff --git a/transfer_service/import_amqp_server.py b/transfer_service/import_amqp_server.py index f0ebe73..0dd5b4b 100644 --- a/transfer_service/import_amqp_server.py +++ b/transfer_service/import_amqp_server.py @@ -39,7 +39,7 @@ class ImportAMQPServer(AMQPServer): elif requestBody["requestType"] == "NODE_IMPORT": self.path = os.path.abspath(requestBody["path"]) self.username = requestBody["userName"] - self.dbConn.connect() + #self.dbConn.connect() userInDb = self.dbConn.userExists(self.username) userInfo = self.systemUtils.userInfo(self.username) out = open("import_amqp_server_log.txt", "a") @@ -48,7 +48,7 @@ class ImportAMQPServer(AMQPServer): response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "The user does not exist or is not registered in the database." } - self.dbConn.disconnect() + #self.dbConn.disconnect() return response userId = self.dbConn.getRapId(self.username) @@ -61,32 +61,32 @@ class ImportAMQPServer(AMQPServer): response = { "responseType": "ERROR", "errorCode": 3, "errorMsg": "Invalid storage mount point." } - self.dbConn.disconnect() + #self.dbConn.disconnect() return response if not os.path.exists(self.path): response = { "responseType": "ERROR", "errorCode": 4, "errorMsg": "Path not found." } - self.dbConn.disconnect() + #self.dbConn.disconnect() return response elif not os.path.isdir(self.path): response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Directory path expected." } - self.dbConn.disconnect() + #self.dbConn.disconnect() return response elif self.username not in self.path: response = { "responseType": "ERROR", "errorCode": 6, "errorMsg": "Directory path does not contain the username." } - self.dbConn.disconnect() + #self.dbConn.disconnect() return response elif os.path.dirname(self.path) != pathPrefix + '/' + self.username: response = { "responseType": "ERROR", "errorCode": 7, "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + self.username } - self.dbConn.disconnect() + #self.dbConn.disconnect() return response else: if storageType == "cold": @@ -163,7 +163,7 @@ class ImportAMQPServer(AMQPServer): self.dbConn.setAsyncTrans(vospacePath, True) self.dbConn.setSticky(vospacePath, True) - self.dbConn.disconnect() + #self.dbConn.disconnect() # add a counter to track the number of nodes (files and dirs) + log file response = { "responseType": "IMPORT_DONE" } else: diff --git a/transfer_service/job_amqp_server.py b/transfer_service/job_amqp_server.py index f1b91d1..a29db9f 100644 --- a/transfer_service/job_amqp_server.py +++ b/transfer_service/job_amqp_server.py @@ -27,19 +27,19 @@ class JobAMQPServer(AMQPServer): "errorCode": 1, "errorMsg": "Malformed request, missing parameters." } elif requestBody["requestType"] == "JOB_LIST": - self.dbConn.connect() + #self.dbConn.connect() result = self.dbConn.listActiveJobs() - self.dbConn.disconnect() + #self.dbConn.disconnect() response = { "responseType": "LST_DONE", "jobList": result } elif requestBody["requestType"] == "JOB_BY_PHASE": self.jobPhase = requestBody["jobPhase"] - self.dbConn.connect() + #self.dbConn.connect() result = self.dbConn.listJobsByPhase(self.jobPhase) - self.dbConn.disconnect() + #self.dbConn.disconnect() response = { "responseType": "LST_BY_PHASE_DONE", "jobList": result } elif requestBody["requestType"] == "JOB_INFO": self.jobId = requestBody["jobId"] - self.dbConn.connect() + #self.dbConn.connect() if self.dbConn.jobExists(self.jobId): result = self.dbConn.getJobInfo(self.jobId) response = { "responseType": "LST_INFO_DONE", "jobInfo": result } @@ -47,10 +47,10 @@ class JobAMQPServer(AMQPServer): response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "Wrong job ID." } - self.dbConn.disconnect() + #self.dbConn.disconnect() elif requestBody["requestType"] == "JOB_RESULTS": self.jobId = requestBody["jobId"] - self.dbConn.connect() + #self.dbConn.connect() if self.dbConn.jobExists(self.jobId): result = self.dbConn.getJobResults(self.jobId) response = { "responseType": "LST_RESULTS_DONE", "jobResults": result } @@ -58,7 +58,7 @@ class JobAMQPServer(AMQPServer): response = { "responseType": "ERROR", "errorCode": 2, "errorMsg": "Wrong job ID." } - self.dbConn.disconnect() + #self.dbConn.disconnect() else: response = { "responseType": "ERROR", "errorCode": 3, diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index b93d50a..68fc121 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -32,7 +32,7 @@ class RetrieveExecutor(TaskExecutor): def prepareData(self): fileList = [] - self.dbConn.connect() + #self.dbConn.connect() self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) for vospacePath in self.nodeList: @@ -44,14 +44,14 @@ class RetrieveExecutor(TaskExecutor): fileList.append(os.path.join(root, f)) else: fileList.append(srcPath) - self.dbConn.disconnect() + #self.dbConn.disconnect() if fileList: self.tapeClient.connect() self.tapeClient.recall(fileList) self.tapeClient.disconnect() def retrieveData(self): - self.dbConn.connect() + #self.dbConn.connect() #self.dbConn.setPhase(self.jobId, "EXECUTING") for vospacePath in self.nodeList: [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) @@ -78,29 +78,29 @@ class RetrieveExecutor(TaskExecutor): return False else: self.updateAsyncTrans(vospacePath) - self.dbConn.disconnect() + #self.dbConn.disconnect() return True def updateAsyncTrans(self, vospacePath): self.dbConn.setAsyncTrans(vospacePath, False); def updateJobStatus(self): - self.dbConn.connect() + #self.dbConn.connect() results = [{"target": ""}] results[0]["target"] = self.jobObj.jobInfo["transfer"]["target"] self.dbConn.setResults(self.jobId, results) self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) - self.dbConn.disconnect() + #self.dbConn.disconnect() def cleanup(self): nodeType = self.jobObj.jobInfo["transfer"]["protocols"][0]["param"][0]["value"] vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] if nodeType == "list": - self.dbConn.connect() + #self.dbConn.connect() self.dbConn.deleteTmpDataNode(vospacePath) - self.dbConn.disconnect() + #self.dbConn.disconnect() def run(self): print("Starting retrieve executor...") diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py index 7648c83..04b6e3c 100644 --- a/transfer_service/retrieve_preprocessor.py +++ b/transfer_service/retrieve_preprocessor.py @@ -23,9 +23,9 @@ class RetrievePreprocessor(TaskExecutor): if nodeType == "single": self.nodeList.append(vospacePath) else: - self.dbConn.connect() + #self.dbConn.connect() self.nodeList = self.dbConn.getVOSpacePathList(vospacePath) - self.dbConn.disconnect() + #self.dbConn.disconnect() self.jobObj.jobInfo["nodeList"] = self.nodeList def run(self): diff --git a/transfer_service/start_job_amqp_server.py b/transfer_service/start_job_amqp_server.py index 4e17e60..ff17d2c 100644 --- a/transfer_service/start_job_amqp_server.py +++ b/transfer_service/start_job_amqp_server.py @@ -36,10 +36,10 @@ class StartJobAMQPServer(AMQPServer): self.job.setPhase(requestBody["phase"]) self.job.setInfo(requestBody["jobInfo"]) self.job.setOwnerId(requestBody["ownerId"]) - self.dbConn.connect() + #self.dbConn.connect() self.dbConn.insertJob(self.job) dbResponse = self.dbConn.getJob(self.job.jobId) - self.dbConn.disconnect() + #self.dbConn.disconnect() print(f"Db response: {dbResponse}") self.pendingQueueRead.insertJob(self.job) #t = threading.Thread(target = self.fake_job) # only for testing purposes diff --git a/transfer_service/storage_amqp_server.py b/transfer_service/storage_amqp_server.py index 8289619..71ad585 100644 --- a/transfer_service/storage_amqp_server.py +++ b/transfer_service/storage_amqp_server.py @@ -43,12 +43,12 @@ class StorageAMQPServer(AMQPServer): "errorMsg": "Base path doesn't exist."} return response - self.dbConn.connect() + #self.dbConn.connect() result = self.dbConn.insertStorage(self.storageType, self.storageBasePath, self.storageBaseUrl, self.storageHostname) - self.dbConn.disconnect() + #self.dbConn.disconnect() if result: response = { "responseType": "STORAGE_ADD_DONE" } @@ -58,9 +58,9 @@ class StorageAMQPServer(AMQPServer): "errorMsg": "Storage point already exists." } elif requestBody["requestType"] == "STORAGE_DEL_REQ": - self.dbConn.connect() + #self.dbConn.connect() result = self.dbConn.getStorageList() - self.dbConn.disconnect() + #self.dbConn.disconnect() response = { "responseType": "STORAGE_DEL_ACK", "storageList": result } @@ -71,9 +71,9 @@ class StorageAMQPServer(AMQPServer): self.storageId = requestBody["storageId"] if self.storageAck: self.storageAck = False - self.dbConn.connect() + #self.dbConn.connect() result = self.dbConn.deleteStorage(self.storageId) - self.dbConn.disconnect() + #self.dbConn.disconnect() if result: response = { "responseType": "STORAGE_DEL_DONE" } @@ -87,9 +87,9 @@ class StorageAMQPServer(AMQPServer): "errorMsg": "Store request not acknowledged." } elif requestBody["requestType"] == "STORAGE_LST": - self.dbConn.connect() + #self.dbConn.connect() result = self.dbConn.getStorageList() - self.dbConn.disconnect() + #self.dbConn.disconnect() response = { "responseType": "STORAGE_LST_DONE", "storageList": result } diff --git a/transfer_service/store_amqp_server.py b/transfer_service/store_amqp_server.py index 7ad2102..a6a724f 100644 --- a/transfer_service/store_amqp_server.py +++ b/transfer_service/store_amqp_server.py @@ -49,13 +49,13 @@ class StoreAMQPServer(AMQPServer): self.job.setType("other") self.job.setInfo(requestBody) self.job.setPhase("PENDING") - self.dbConn.connect() + #self.dbConn.connect() userInDb = self.dbConn.userExists(user) if requestBody["requestType"] == "CSTORE": storageList = self.dbConn.getStorageListByType("cold") else: storageList = self.dbConn.getStorageListByType("hot") - self.dbConn.disconnect() + #self.dbConn.disconnect() #folderPath = "/home/" + user + "/store" folderPath = self.storageStorePath.replace("{username}", user) userInfo = self.systemUtils.userInfo(user) @@ -94,11 +94,11 @@ class StoreAMQPServer(AMQPServer): self.storeAck = False user = requestBody["userName"] self.prepare(user) - self.dbConn.connect() + #self.dbConn.connect() self.job.setOwnerId(self.dbConn.getRapId(self.username)) self.dbConn.insertJob(self.job) dbResponse = self.dbConn.getJob(self.job.jobId) - self.dbConn.disconnect() + #self.dbConn.disconnect() self.job.jobInfo["storageId"] = requestBody["storageId"] self.pendingQueueWrite.insertJob(self.job) if "error" in dbResponse: diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 72b3285..17419e1 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -37,13 +37,13 @@ class StoreExecutor(TaskExecutor): super(StoreExecutor, self).__init__() def copyData(self): - self.dbConn.connect() + #self.dbConn.connect() self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) srcPathPrefix = self.storageStorePath.replace("{username}", self.username) srcData = os.listdir(srcPathPrefix) destPathPrefix = self.dbConn.getStorageBasePath(self.storageId) + '/' + self.username - self.dbConn.disconnect() + #self.dbConn.disconnect() sp = subprocess.run(["rsync", "-av", srcPathPrefix + '/', destPathPrefix + '/'], capture_output = True) if(sp.returncode or sp.stderr): return False @@ -68,7 +68,7 @@ class StoreExecutor(TaskExecutor): os.chmod(srcPathPrefix, 0o755) def update(self): - self.dbConn.connect() + #self.dbConn.connect() out = open("store_executor_log.txt", "a") results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) @@ -81,7 +81,7 @@ class StoreExecutor(TaskExecutor): self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) out.close() - self.dbConn.disconnect() + #self.dbConn.disconnect() def run(self): print("Starting store executor...") diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index bfa43ea..5577384 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -109,7 +109,7 @@ class StorePreprocessor(TaskExecutor): # File catalog update out = open("store_preprocessor_log.txt", "a") - self.dbConn.connect() + #self.dbConn.connect() self.userId = self.dbConn.getRapId(self.username) out.write(f"USER: {self.username}\n") out.write(f"USER_ID: {self.userId}\n") @@ -173,13 +173,13 @@ class StorePreprocessor(TaskExecutor): out.write("\n") out.close() - self.dbConn.disconnect() + #self.dbConn.disconnect() def update(self): self.jobObj.setPhase("QUEUED") - self.dbConn.connect() + #self.dbConn.connect() self.dbConn.setPhase(self.jobId, "QUEUED") - self.dbConn.disconnect() + #self.dbConn.disconnect() def run(self): print("Starting store preprocessor...") -- GitLab