Skip to content
Snippets Groups Projects
Select Git revision
  • 95474e54e36ee8f0f4093df02e3a2f2db7b69a4d
  • master default protected
  • rocky_8
  • rocky_9
  • pasture
  • testing
  • query
  • v0.2.9
  • v0.2.8
  • v0.2.7
  • v0.2.6
  • v0.2.5
  • v0.2.4
  • v0.2.3
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.2
  • v0.1.1
  • v0.1
20 results

db_connector.py

Blame
  • db_connector.py 40.10 KiB
    #!/usr/bin/env python
    
    import datetime
    import json
    import psycopg2
    import sys
    
    from contextlib import contextmanager
    from psycopg2.extras import RealDictCursor
    from psycopg2.pool import ThreadedConnectionPool
    
    from node import Node
    
    
    class DbConnector(object):
    
        def __init__(self, user, password, host, port, dbname, minConnNum, maxConnNum):
            self.user = user
            self.password = password
            self.host = host
            self.port = port
            self.dbname = dbname
            self.minConnNum = minConnNum
            self.maxConnNum = maxConnNum
            self.createConnectionPool()
    
        def createConnectionPool(self):
            """
            Builds the threaded connection pool from the parameters stored on
            the instance and keeps it in 'self.connPool'.
            """
            # Keyword arguments forwarded to every connection opened by the pool.
            connParams = {
                "user": self.user,
                "password": self.password,
                "host": self.host,
                "database": self.dbname,
                "port": self.port,
                "connect_timeout": 2
            }
            self.connPool = ThreadedConnectionPool(self.minConnNum, self.maxConnNum, **connParams)
    
        @contextmanager
        def getConnection(self):
            conn = self.connPool.getconn()
            #conn.autocommit = True
            try:
                yield conn
            finally:
                self.connPool.putconn(conn, close = False)
    
        """
        Getters
        """
    
        ##### Node #####
    
        def nodeExists(self, vospacePath):
            """
            Tells whether a VOSpace node is registered under the given path.
            Returns 'True' if at least one row matches, 'False' otherwise.
            On error the transaction is rolled back and the exception re-raised.
            """
            query = "SELECT * FROM node_vos_path WHERE vos_path = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (vospacePath,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            return bool(rows)
    
        def getCreatorId(self, vospacePath):
            """
            Looks up the creator ID of the node identified by a VOSpace path.
            Raises 'IndexError' if no row matches the path.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute("""
                        SELECT creator_id
                        FROM node_vos_path nvp
                        JOIN node n ON nvp.node_id = n.node_id
                        WHERE vos_path = %s;
                        """,
                        (vospacePath,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            firstRow = rows[0]
            return firstRow["creator_id"]
    
        def getGroupRead(self, vospacePath):
            """
            Fetches the 'group_read' entries of the node identified by a VOSpace path.

            Returns one row per group (key 'group_read'). Each value is reduced
            to the text following the last 'people.' occurrence, with
            backslashes removed.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute("""
                        SELECT unnest(group_read) as group_read
                        FROM node_vos_path nvp
                        JOIN node n ON nvp.node_id = n.node_id
                        WHERE vos_path = %s;
                        """,
                        (vospacePath,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            for row in rows:
                rawGroup = row["group_read"]
                row["group_read"] = rawGroup.split("people.")[-1].replace("\\", "")
            return rows
    
        def getGroupWrite(self, vospacePath):
            """
            Fetches the 'group_write' entries of the node identified by a VOSpace path.

            Returns one row per group (key 'group_write'). Each value is reduced
            to the text following the last 'people.' occurrence, with
            backslashes removed.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute("""
                        SELECT unnest(group_write) as group_write
                        FROM node_vos_path nvp
                        JOIN node n ON nvp.node_id = n.node_id
                        WHERE vos_path = %s;
                        """,
                        (vospacePath,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            for row in rows:
                rawGroup = row["group_write"]
                row["group_write"] = rawGroup.split("people.")[-1].replace("\\", "")
            return rows
    
        def getOSPath(self, vospacePath):
            """
            Resolves a VOSpace path to its location on the storage.

            Returns a dictionary with the keys 'baseSrcPath', 'fullPath',
            'storageType', 'username', 'osPath' and 'contentLength'.
            Raises 'IndexError' if the query yields no row.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute("""
                        SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length
                        FROM node n
                        JOIN location l ON n.location_id = l.location_id
                        JOIN storage s ON s.storage_id = l.storage_src_id
                        JOIN users u ON u.user_id = n.creator_id
                        WHERE n.node_id = id_from_vos_path(%s);
                        """,
                        (vospacePath,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            row = rows[0]
            userName = row["user_name"]
            wrapperDir = row["tstamp_wrapper_dir"]
            osPath = row["os_path"]
            # The source base directory is <base_path>/<user_name>, optionally
            # followed by the timestamp wrapper directory when one is set.
            baseSrcPath = row["base_path"] + "/" + userName
            if wrapperDir is not None:
                baseSrcPath = baseSrcPath + "/" + wrapperDir
            return {
                "baseSrcPath": baseSrcPath,
                "fullPath": baseSrcPath + osPath,
                "storageType": row["storage_type"],
                "username": userName,
                "osPath": osPath,
                "contentLength": row["content_length"]
            }
    
        def getVOSpacePathList(self, vospacePath):
            """
            Returns the list of VOSpace paths carried by a VOSpace list node.

            'vospacePath' is the VOSpace path of the list node; the nodes linked
            to it through 'list_of_files' are looked up and their VOSpace paths
            are returned as a list of values (empty if there are none).
            On error the transaction is rolled back and the exception re-raised.
            """
            with self.getConnection() as conn:
                try:
                    cursor = conn.cursor(cursor_factory = RealDictCursor)
                    # The explicit 'AS vos_path' alias is required: without it
                    # PostgreSQL names the output column after the function
                    # ('get_vos_path'), so the 'vos_path' key read below would
                    # be missing from the result rows.
                    cursor.execute("""
                        SELECT get_vos_path(n.node_id) AS vos_path
                        FROM node n
                        JOIN list_of_files l ON l.node_id = n.node_id
                        WHERE l.list_node_id = id_from_vos_path(%s);
                        """,
                        (vospacePath,))
                    results = cursor.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not conn.closed:
                        conn.rollback()
                    raise
                else:
                    return [el["vos_path"] for el in results]
    
        def getNodesToBeDeleted(self):
            """
            Lists the deleted nodes whose physical deletion is still pending
            ('phy_deleted_on IS NULL').

            Each returned row carries 'os_base_path', 'os_rel_path',
            'deleted_on' and 'node_id'.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    # The recursive CTE climbs from each deleted node through its
                    # ancestors and joins their OS names into a relative path.
                    cur.execute("""
                        WITH RECURSIVE all_nodes AS (
                            SELECT node_id, name, os_name, relative_path, parent_relative_path
                            FROM node
                            UNION
                            SELECT node_id, name, os_name, path(parent_relative_path, node_id) AS relative_path, parent_relative_path
                            FROM deleted_node
                        ), del AS (
                            SELECT COALESCE(os_name, name) AS os_name, 1 AS level, node_id AS deleted_node_id,
                            path(parent_relative_path, node_id) AS relative_path, parent_relative_path 
                            FROM deleted_node WHERE phy_deleted_on IS NULL
                                UNION ALL
                            SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id,
                            n.relative_path, n.parent_relative_path
                            FROM all_nodes n JOIN del d ON n.relative_path = d.parent_relative_path
                            WHERE n.parent_relative_path IS NOT NULL
                        ), paths_to_delete AS
                        (SELECT deleted_node_id, '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path
                        FROM del GROUP BY deleted_node_id)
                        SELECT base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, deleted_on, d.node_id
                        FROM paths_to_delete p
                        JOIN deleted_node d ON d.node_id = p.deleted_node_id
                        JOIN location l ON d.location_id = l.location_id
                        JOIN storage s ON s.storage_id = l.storage_src_id;
                        """)
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            return rows
    
        def nodeIsBusy(self, vospacePath):
            """
            Tells whether a VOSpace node is busy, i.e. whether its 'job_id'
            field holds a truthy value. Returns 'True' or 'False'.
            Raises 'IndexError' if the node is not found.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute("""
                        SELECT job_id
                        FROM node
                        WHERE node_id = id_from_vos_path(%s);
                        """,
                        (vospacePath,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            return bool(rows[0]["job_id"])
    
        ##### Job #####
    
        def jobExists(self, jobId):
            """
            Tells whether a job with the given id is stored in the 'job' table.
            Returns 'True' if at least one row matches, 'False' otherwise.
            """
            query = "SELECT * FROM job WHERE job_id = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (jobId,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            return bool(rows)
    
        def getJob(self, jobId):
            """
            Fetches a job by id and returns it as a dictionary.

            Column names are converted from snake_case to camelCase and
            'datetime' values are rendered as ISO 8601 strings. If the job
            does not exist, '{"error": "JOB_NOT_FOUND"}' is returned instead.
            """
            query = "SELECT * FROM job WHERE job_id = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (jobId,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            if not rows:
                return json.loads('{ "error": "JOB_NOT_FOUND" }')
            record = rows[0]
            job = {}
            for column in record:
                # snake_case -> camelCase: the first token is kept untouched,
                # the following ones are title-cased.
                head, *tail = column.split('_')
                camelKey = head + ''.join(part.title() for part in tail)
                value = record[column]
                if isinstance(value, datetime.datetime):
                    value = value.isoformat()
                job[camelKey] = value
            return job
    
        def getJobInfo(self, jobId):
            """
            Fetches the 'job_info' field (UWS specification) of a job.
            Raises 'IndexError' if the job is not found.
            """
            query = "SELECT job_info FROM job WHERE job_id = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (jobId,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            firstRow = rows[0]
            return firstRow["job_info"]
    
        def getJobResults(self, jobId):
            """
            Fetches the 'results' field (UWS specification) of a job.
            Raises 'IndexError' if the job is not found.
            """
            query = "SELECT results FROM job WHERE job_id = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (jobId,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            firstRow = rows[0]
            return firstRow["results"]
    
        def listActiveJobs(self):
            """
            Lists the jobs whose phase is neither 'ABORTED', 'COMPLETED' nor
            'ERROR', most recently created first.

            'datetime' values in the returned rows are converted to ISO 8601
            strings.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute("""
                        SELECT job_id,
                               job_type,
                               phase,
                               creation_time,
                               start_time,
                               owner_id
                               FROM job
                               WHERE phase
                               NOT IN ('ABORTED',
                                       'COMPLETED',
                                       'ERROR')
                               ORDER BY creation_time DESC;
                        """)
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            for row in rows:
                for column, value in row.items():
                    # Only existing keys are reassigned, so the row size is unchanged.
                    if isinstance(value, datetime.datetime):
                        row[column] = value.isoformat()
            return rows
    
        def listJobsByPhase(self, phase):
            """
            Lists the jobs currently in the given phase, most recently created first.

            For 'PENDING', 'QUEUED' and 'EXECUTING' the rows report
            'creation_time' and 'start_time'; for any other phase they report
            'start_time' and 'end_time'. 'datetime' values are converted to
            ISO 8601 strings.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    # Pick the column set according to the phase, then run a single execute.
                    if phase in ("PENDING", "QUEUED", "EXECUTING"):
                        query = """
                            SELECT job_id,
                                   job_type,
                                   phase,
                                   creation_time,
                                   start_time,
                                   owner_id
                                   FROM job
                                   WHERE phase = %s
                                   ORDER BY creation_time DESC;
                            """
                    else:
                        query = """
                            SELECT job_id,
                                   job_type,
                                   phase,
                                   start_time,
                                   end_time,
                                   owner_id
                                   FROM job
                                   WHERE phase = %s
                                   ORDER BY creation_time DESC;
                            """
                    cur.execute(query, (phase,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            for row in rows:
                for column, value in row.items():
                    if isinstance(value, datetime.datetime):
                        row[column] = value.isoformat()
            return rows
    
        ##### User #####
    
        def userExists(self, username):
            """
            Tells whether a user with the given name is stored in the 'users' table.
            Returns 'True' if at least one row matches, 'False' otherwise.
            """
            query = "SELECT * FROM users WHERE user_name = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (username,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            return bool(rows)
    
        def getUserId(self, username):
            """
            Looks up the user id matching a user name.
            Raises 'IndexError' if the user is not found.
            """
            query = "SELECT user_id FROM users WHERE user_name = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (username,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            firstRow = rows[0]
            return firstRow["user_id"]
    
        def getUserName(self, userId):
            """
            Looks up the user name matching a user id.
            Raises 'IndexError' if the user is not found.
            """
            query = "SELECT user_name FROM users WHERE user_id = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (userId,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            firstRow = rows[0]
            return firstRow["user_name"]
        
        def getUserEmail(self, userId):
            """
            Looks up the e-mail address ('e_mail' column) matching a user id.
            Raises 'IndexError' if the user is not found.
            """
            query = "SELECT e_mail FROM users WHERE user_id = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (userId,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            firstRow = rows[0]
            return firstRow["e_mail"]
    
        ##### Storage #####
    
        def storageBasePathIsValid(self, path):
            """
            Checks whether a physical path contains the base path of a known
            storage. Returns the first matching base path, or 'False' if there
            is none.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    # NOTE(review): 'position(...) > 0' matches the base path anywhere
                    # inside 'path', not only as a prefix - confirm this is intended.
                    cur.execute("""
                        SELECT base_path
                        FROM storage
                        WHERE position(base_path in cast(%s as varchar)) > 0;
                        """,
                        (path,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            if not rows:
                return False
            return rows[0]["base_path"]
    
        def getStorageBasePath(self, storageId):
            """
            Looks up the storage base path for a given storage id.
            Raises 'IndexError' if the storage is not found.
            """
            query = "SELECT base_path FROM storage WHERE storage_id = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (storageId,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            firstRow = rows[0]
            return firstRow["base_path"]
    
        def getStorageList(self):
            """
            Lists every row of the 'storage' table whose type is not 'local'.
            """
            query = "SELECT * FROM storage WHERE storage_type <> 'local';"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query)
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            return rows
    
        def getStorageListByType(self, storageType):
            """
            Lists every row of the 'storage' table having the given storage type.
            """
            query = "SELECT * FROM storage WHERE storage_type = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (storageType,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            return rows
    
        def getStorageType(self, basePath):
            """
            Looks up the storage type for a given storage base path.
            Returns 'False' if no storage has that base path.
            """
            query = "SELECT storage_type FROM storage WHERE base_path = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (basePath,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            if not rows:
                return False
            return rows[0]["storage_type"]
    
        def getStorageId(self, basePath):
            """
            Looks up the storage id for a given storage base path.
            Returns 'False' if no storage has that base path.
            """
            query = "SELECT storage_id FROM storage WHERE base_path = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (basePath,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            if not rows:
                return False
            return rows[0]["storage_id"]
    
        ##### Location #####
    
        def getLocationId(self, destStorageId):
            """
            Looks up the id of the location whose 'storage_src_id' equals the
            given destination storage id.
            Raises 'IndexError' if no location matches.
            """
            query = "SELECT location_id FROM location WHERE storage_src_id = %s;"
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(query, (destStorageId,))
                    rows = cur.fetchall()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
            firstRow = rows[0]
            return firstRow["location_id"]
    
    
        """
        Setters
        """
    
        ##### Job #####
    
        def insertJob(self, jobObj):
            """
            Stores a job object: a new row is inserted, or the existing row with
            the same 'job_id' is overwritten.

            'jobInfo' and 'results' are serialized to JSON before being stored.
            The change is committed on success; on error the transaction is
            rolled back and the exception re-raised.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    # Values in the same order as the column list of the INSERT.
                    jobValues = (
                        jobObj.jobId,
                        jobObj.ownerId,
                        jobObj.type,
                        jobObj.phase,
                        jobObj.startTime,
                        jobObj.endTime,
                        json.dumps(jobObj.jobInfo),
                        json.dumps(jobObj.results),
                        jobObj.errorMessage,
                        jobObj.errorType,
                    )
                    cur.execute("""
                        INSERT INTO job(job_id,
                                        owner_id,
                                        job_type,
                                        phase,
                                        start_time,
                                        end_time,
                                        job_info,
                                        results,
                                        error_message,
                                        error_type)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT (job_id)
                        DO UPDATE SET
                        (owner_id,
                         job_type, 
                         phase,
                         start_time,
                         end_time,
                         job_info,
                         results,
                         error_message,
                         error_type)
                        = (EXCLUDED.owner_id,
                           EXCLUDED.job_type,
                           EXCLUDED.phase,
                           EXCLUDED.start_time,
                           EXCLUDED.end_time,
                           EXCLUDED.job_info,
                           EXCLUDED.results,
                           EXCLUDED.error_message,
                           EXCLUDED.error_type);
                        """,
                        jobValues)
                    dbConn.commit()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
                
        def setStartTime(self, jobId):
            """
            Stamps the job 'start_time' field with the current local time
            (ISO 8601 string) and commits.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    now = datetime.datetime.today().isoformat()
                    cur.execute("""
                        UPDATE job SET start_time = %s
                        WHERE job_id = %s;
                        """,
                        (now, jobId,))
                    dbConn.commit()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
     
        def setEndTime(self, jobId):
            """
            Stamps the job 'end_time' field with the current local time
            (ISO 8601 string) and commits.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    now = datetime.datetime.today().isoformat()
                    cur.execute("""
                        UPDATE job SET end_time = %s
                        WHERE job_id = %s;
                        """,
                        (now, jobId,))
                    dbConn.commit()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
    
        def setPhase(self, jobId, phase):
            """
            Updates the 'phase' field of a job and commits.
            On error the transaction is rolled back and the exception re-raised.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    params = (phase, jobId,)
                    cur.execute("""
                        UPDATE job SET phase = %s
                        WHERE job_id = %s;
                        """,
                        params)
                    dbConn.commit()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
                        
        def setTotalBlocks(self, jobId, totalBlocks):
            """
            Updates the 'total_blocks' field of a job
            (data retrieve operation) and commits.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    params = (totalBlocks, jobId,)
                    cur.execute("""
                        UPDATE job SET total_blocks = %s
                        WHERE job_id = %s;
                        """,
                        params)
                    dbConn.commit()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
                        
        def updateProcessedBlocks(self, jobId, processedBlocks):
            """
            Updates the 'processed_blocks' field of a job
            (data retrieve operation) and commits.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    params = (processedBlocks, jobId,)
                    cur.execute("""
                        UPDATE job SET processed_blocks = %s
                        WHERE job_id = %s;
                        """,
                        params)
                    dbConn.commit()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
    
        def setResults(self, jobId, results):
            """
            Updates the 'results' field of a job with the JSON serialization of
            'results' and commits.
            """
            with self.getConnection() as dbConn:
                try:
                    cur = dbConn.cursor(cursor_factory = RealDictCursor)
                    serialized = json.dumps(results)
                    cur.execute("""
                        UPDATE job SET results = %s
                        WHERE job_id = %s;
                        """,
                        (serialized, jobId,))
                    dbConn.commit()
                except Exception:
                    # Roll back only if the connection is still usable.
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
    
        ##### Node #####
    
        def insertNode(self, node):
            """
            Inserts a VOSpace node under its parent node.

            The parent's 'path' is read from the 'node' table, resolving
            'node.parentPath' through id_from_vos_path(). The parent relative
            path is that same path without its first dot-separated label
            (an empty string when the path contains no '.').

            Returns 'True' when a row has been inserted, 'False' when the
            INSERT hit a conflict and did nothing. Any error (including the
            IndexError raised when the parent lookup returns no rows) causes
            a rollback, if the connection is still open, and is re-raised.
            """
            with self.getConnection() as conn:
                try:
                    cur = conn.cursor(cursor_factory = RealDictCursor)

                    # Step 1: resolve the path of the parent node.
                    cur.execute("""
                        SELECT path FROM node WHERE node_id = id_from_vos_path(%s);
                        """,
                        (node.parentPath,))
                    parentLtreePath = cur.fetchall()[0]["path"]
                    if "." in parentLtreePath:
                        labels = parentLtreePath.strip(".").split('.')
                        parentLtreeRelativePath = ".".join(labels[1:])
                    else:
                        parentLtreeRelativePath = ""

                    # Step 2: insert the node; 'RETURNING' yields no row
                    # when 'ON CONFLICT DO NOTHING' skips the insertion.
                    values = (parentLtreePath,
                              parentLtreeRelativePath,
                              node.name,
                              node.wrapperDir,
                              node.type,
                              node.locationId,
                              node.asyncTrans,
                              node.sticky,
                              node.jobId,
                              node.creatorId,
                              node.contentLength,
                              node.contentMD5)
                    cur.execute("""
                            INSERT INTO node(parent_path,
                                             parent_relative_path,
                                             name,
                                             tstamp_wrapper_dir,
                                             type,
                                             location_id,
                                             async_trans,
                                             sticky,
                                             job_id,
                                             creator_id,
                                             content_length,
                                             content_md5)
                            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                            ON CONFLICT
                            DO NOTHING
                            RETURNING node_id;
                            """,
                        values)
                    insertedRows = cur.fetchall()
                    conn.commit()
                except Exception:
                    # A closed connection cannot be rolled back: just propagate.
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise
                return bool(insertedRows)
    
        def setAsyncTrans(self, nodeVOSPath, value):
            """
            Sets the 'async_trans' flag starting from a VOSpace node.

            The target node is resolved from 'nodeVOSPath' by
            id_from_vos_path(); 'value' is written to every 'node' row whose
            'path' is '<@' the target's path (presumably the node itself and
            its descendants, assuming 'path' is an ltree column). The change
            is committed right away; on failure the transaction is rolled
            back (when the connection is still open) and the exception is
            re-raised.
            """
            with self.getConnection() as conn:
                try:
                    statement = """
                        UPDATE node c SET async_trans = %s
                        FROM node n
                        WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                        """
                    cur = conn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(statement, (value, nodeVOSPath))
                    conn.commit()
                except Exception:
                    # A closed connection cannot be rolled back: just propagate.
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise
    
        def setJobId(self, nodeVOSPath, value):
            """
            Sets the 'job_id' field starting from a VOSpace node.

            The target node is resolved from 'nodeVOSPath' by
            id_from_vos_path(); 'value' is written to every 'node' row whose
            'path' is '<@' the target's path (presumably the node itself and
            its descendants, assuming 'path' is an ltree column). The change
            is committed right away; on failure the transaction is rolled
            back (when the connection is still open) and the exception is
            re-raised.
            """
            with self.getConnection() as conn:
                try:
                    statement = """
                        UPDATE node c SET job_id = %s
                        FROM node n
                        WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                        """
                    cur = conn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(statement, (value, nodeVOSPath))
                    conn.commit()
                except Exception:
                    # A closed connection cannot be rolled back: just propagate.
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise
                        
        def setPhyDeletedOn(self, nodeId):
            """
            Sets the 'phy_deleted_on' field of a VOSpace deleted node.

            The current local time (datetime.datetime.now(), ISO 8601 text)
            is written to the 'deleted_node' row whose 'node_id' equals
            'nodeId' and the change is committed right away. On failure the
            transaction is rolled back (when the connection is still open)
            and the exception is re-raised.
            """
            with self.getConnection() as conn:
                try:
                    statement = """
                        UPDATE deleted_node SET phy_deleted_on = %s
                        WHERE node_id = %s;
                        """
                    cur = conn.cursor(cursor_factory = RealDictCursor)
                    timestamp = datetime.datetime.now().isoformat()
                    cur.execute(statement, (timestamp, nodeId))
                    conn.commit()
                except Exception:
                    # A closed connection cannot be rolled back: just propagate.
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise
                        
        def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath):
            """
            Updates the 'group_read' field starting from a VOSpace node.

            For every 'node' row whose 'path' is '<@' the path of the node
            resolved from 'nodeVOSPath' by id_from_vos_path(), 'group_read'
            is replaced by update_array(group_read, groupToAdd, groupToRemove).
            update_array() is a database function not visible here;
            presumably it adds the first list and removes the second one.
            The change is committed right away; on failure the transaction is
            rolled back (when the connection is still open) and the exception
            is re-raised.
            """
            with self.getConnection() as conn:
                try:
                    statement = """
                        UPDATE node c
                        SET group_read = update_array(c.group_read, %s, %s)
                        FROM node n
                        WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                        """
                    cur = conn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(statement, (groupToAdd, groupToRemove, nodeVOSPath))
                    conn.commit()
                except Exception:
                    # A closed connection cannot be rolled back: just propagate.
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise
                        
        def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath):
            """
            Updates the 'group_write' field starting from a VOSpace node.

            For every 'node' row whose 'path' is '<@' the path of the node
            resolved from 'nodeVOSPath' by id_from_vos_path(), 'group_write'
            is replaced by update_array(group_write, groupToAdd, groupToRemove).
            update_array() is a database function not visible here;
            presumably it adds the first list and removes the second one.
            The change is committed right away; on failure the transaction is
            rolled back (when the connection is still open) and the exception
            is re-raised.
            """
            with self.getConnection() as conn:
                try:
                    statement = """
                        UPDATE node c
                        SET group_write = update_array(c.group_write, %s, %s)
                        FROM node n
                        WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                        """
                    cur = conn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(statement, (groupToAdd, groupToRemove, nodeVOSPath))
                    conn.commit()
                except Exception:
                    # A closed connection cannot be rolled back: just propagate.
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise
    
        ##### Storage #####
    
        def insertStorage(self, storageType, basePath, baseUrl, hostname):
            """
            Inserts a storage point together with its 'location' entry.

            Nothing is inserted when self.getStorageId(basePath) returns a
            truthy value, i.e. a storage with this base path is already known.

            Otherwise a 'storage' row is created and a 'location' row is
            linked to it:
              - storage type 'cold' or 'hot': location type 'async', with the
                'local' storage at '/home' on 'localhost' as destination;
              - any other storage type: location type 'portal', with the new
                storage as both source and destination.
            Both inserts are committed together.

            Returns 'True' if the storage has been inserted, 'False' if it
            already existed. Any error (including the IndexError raised when
            the 'local' destination storage is missing) causes a rollback,
            if the connection is still open, and is re-raised.
            """
            # Do the existence check BEFORE checking out a connection.
            # getStorageId() is defined elsewhere in this class and presumably
            # takes its own pooled connection (TODO confirm): calling it while
            # already holding one would need two connections per call and
            # could exhaust a small pool.
            if self.getStorageId(basePath):
                return False

            with self.getConnection() as conn:
                try:
                    cursor = conn.cursor(cursor_factory = RealDictCursor)
                    cursor.execute("""
                            INSERT INTO storage(storage_type,
                                                base_path,
                                                base_url,
                                                hostname)
                            VALUES (%s, %s, %s, %s)
                            RETURNING storage_id;
                            """,
                        (storageType,
                         basePath,
                         baseUrl,
                         hostname,))
                    storageSrcId = cursor.fetchall()[0]["storage_id"]

                    if storageType in ("cold", "hot"):
                        # Cold/hot storages transfer data asynchronously
                        # towards the local storage point.
                        cursor.execute("""
                                    SELECT storage_id
                                    FROM storage
                                    WHERE storage_type = 'local'
                                    AND base_path = '/home'
                                    AND hostname = 'localhost';
                                    """)
                        storageDestId = cursor.fetchall()[0]["storage_id"]
                        locationType = "async"
                    else:
                        storageDestId = storageSrcId
                        locationType = "portal"

                    cursor.execute("""
                                INSERT INTO location(location_type,
                                                     storage_src_id,
                                                     storage_dest_id)
                                VALUES (%s, %s, %s);
                                """,
                        (locationType,
                         storageSrcId,
                         storageDestId,))
                    conn.commit()
                except Exception:
                    # Undo the partial insert; a closed connection cannot be
                    # rolled back, so in that case just propagate the error.
                    if not conn.closed:
                        conn.rollback()
                    raise
                return True
    
        def deleteStorage(self, storageId):
            """
            Deletes a storage point, unless it is still in use.

            The storage is considered in use when at least one row of the
            node table has a 'location_id' belonging to a location whose
            'storage_src_id' equals 'storageId'.

            Returns 'False' (nothing is deleted) if the storage is in use,
            'True' after the 'storage' row has been deleted and the change
            committed. A failing query causes a rollback, if the connection
            is still open, and the exception is re-raised.
            """
            with self.getConnection() as conn:
                # Step 1: check whether some node still refers to the storage.
                try:
                    cur = conn.cursor(cursor_factory = RealDictCursor)
                    cur.execute("""
                        SELECT count(*) > 0 AS res
                        FROM Node
                        WHERE location_id IN
                        (SELECT location_id
                         FROM storage s
                         JOIN location l
                         ON s.storage_id = l.storage_src_id
                         WHERE storage_src_id = %s);
                        """,
                        (storageId,))
                    usageRows = cur.fetchall()
                except Exception:
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise

                if usageRows[0]["res"]:
                    return False

                # Step 2: no node refers to the storage, so remove it.
                try:
                    cur.execute("""
                                DELETE FROM storage
                                WHERE storage_id = %s;
                                """,
                        (storageId,))
                    conn.commit()
                except Exception:
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise
                return True
    
        ##### Users #####
         
        def insertUser(self, userId, username, email):
            """
            Inserts the data of a user.

            A row ('user_id', 'user_name', 'e_mail') is added to the 'users'
            table; when a row with the same 'user_id' already exists the
            statement does nothing. The change is committed right away; on
            failure the transaction is rolled back (when the connection is
            still open) and the exception is re-raised.
            """
            with self.getConnection() as conn:
                try:
                    statement = """
                        INSERT INTO users(user_id,
                                          user_name,
                                          e_mail)
                        VALUES (%s, %s, %s)
                        ON CONFLICT (user_id)
                        DO NOTHING;
                        """
                    cur = conn.cursor(cursor_factory = RealDictCursor)
                    cur.execute(statement, (userId, username, email))
                    conn.commit()
                except Exception:
                    # A closed connection cannot be rolled back: just propagate.
                    if conn.closed:
                        raise
                    conn.rollback()
                    raise