Skip to content
Snippets Groups Projects
Select Git revision
  • 01ddeb668789cb8c780397bfbf9ff8191242ba64
  • master default protected
  • script_devel
  • parallel_trapping
  • unify_iterations
  • containers-m10
  • magma_refinement
  • release9
  • enable_svd
  • parallel_angles_gmu
  • containers-m8
  • parallel_angles
  • profile_omp_leonardo
  • test_nvidia_profiler
  • containers
  • shaditest
  • test1
  • main
  • 3-error-in-run-the-program
  • experiment
  • original protected
  • NP_TMcode-M10a.03
  • NP_TMcode-M10a.02
  • NP_TMcode-M10a.01
  • NP_TMcode-M10a.00
  • NP_TMcode-M9.01
  • NP_TMcode-M9.00
  • NP_TMcode-M8.03
  • NP_TMcode-M8.02
  • NP_TMcode-M8.01
  • NP_TMcode-M8.00
  • NP_TMcode-M7.00
  • v0.0
33 results

np_cluster_magma_mpi

Blame
  • db_connector.py 43.24 KiB
    #!/usr/bin/env python
    #
    # This file is part of vospace-transfer-service
    # Copyright (C) 2021 Istituto Nazionale di Astrofisica
    # SPDX-License-Identifier: GPL-3.0-or-later
    #
    
    import datetime
    import json
    import logging
    import psycopg2
    import sys
    import time
    
    from contextlib import contextmanager
    from psycopg2.extras import RealDictCursor
    from psycopg2.pool import ThreadedConnectionPool
    
    from node import Node
    
    
    class DbConnector(object):
    
        def __init__(self, user, password, host, port, dbname, minConnNum, maxConnNum):
            self.user = user
            self.password = password
            self.host = host
            self.port = port
            self.dbname = dbname
            self.minConnNum = minConnNum
            self.maxConnNum = maxConnNum
            self.connPool = None
            self.createConnectionPool()
    
        def createConnectionPool(self):
            self.connPool = ThreadedConnectionPool(
                self.minConnNum,
                self.maxConnNum,
                user = self.user,
                password = self.password,
                host = self.host,
                database = self.dbname,
                port = self.port,
                connect_timeout = 0
            )
    
        def getConnection(self, retry = 10, timeout = 30):
            if retry < 1:
                retry = 1
            if timeout < 1:
                timeout = 1
            conn = None
            while retry > 0:
                try:
                    logging.warning(f"Getting connection from pool: retry = {retry}")
                    conn = self.connPool.getconn()
                    conn.reset()
                except Exception:
                    logging.warning(f"Unable to get a connection from pool: retry = {retry}")
                    if conn is not None:
                        self.connPool.putconn(conn, close = False)
                        conn = None
                    time.sleep(timeout)
                    retry -= 1
                    if retry == 0:
                        raise
                else:
                    logging.warning(f"Connection established: retry = {retry}")
                    return conn
    
    
        """
        Getters
        """
    
        ##### Node #####
    
        def nodeExists(self, vospacePath):
            """Checks if a VOSpace node already exists. Returns a boolean."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                    raise
            else:
                if result:
                    return True
                else:
                    return False
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getCreatorId(self, vospacePath):
            """Returns the creator ID for a given vospace path representing a node."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT creator_id
                    FROM node_vos_path nvp
                    JOIN node n ON nvp.node_id = n.node_id
                    WHERE vos_path = %s;
                    """, 
                    (vospacePath,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["creator_id"]
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getGroupRead(self, vospacePath):
            """Returns the 'group_read' for a given VOSpace path representing a node."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT unnest(group_read) as group_read
                    FROM node_vos_path nvp
                    JOIN node n ON nvp.node_id = n.node_id
                    WHERE vos_path = %s;
                    """, 
                    (vospacePath,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                for i in range(0, len(result)):
                    result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "")
                return result
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getGroupWrite(self, vospacePath):
            """Returns the 'group_write' for a given VOSpace path representing a node."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT unnest(group_write) as group_write
                    FROM node_vos_path nvp
                    JOIN node n ON nvp.node_id = n.node_id
                    WHERE vos_path = %s;
                    """, 
                    (vospacePath,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                for i in range(0, len(result)):
                    result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "")
                return result
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getOSPath(self, vospacePath):
            """Returns a list containing full path, storage type and username for a VOSpace path."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length
                    FROM node n
                    JOIN location l ON n.location_id = l.location_id
                    JOIN storage s ON s.storage_id = l.storage_src_id
                    JOIN users u ON u.user_id = n.creator_id
                    WHERE n.node_id = id_from_vos_path(%s);
                    """,
                    (vospacePath,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                storageType = result[0]["storage_type"]
                basePath = result[0]["base_path"]
                userName = result[0]["user_name"]
                tstampWrappedDir = result[0]["tstamp_wrapper_dir"]
                osPath = result[0]["os_path"]
                contentLength = result[0]["content_length"]
                if tstampWrappedDir is None:
                    baseSrcPath = basePath + "/" + userName
                else:
                    baseSrcPath = basePath + "/" + userName + "/" + tstampWrappedDir
                fullPath = baseSrcPath + osPath
                fileInfo = {
                                "baseSrcPath": baseSrcPath,
                                "fullPath": fullPath, 
                                "storageType": storageType, 
                                "username": userName, 
                                "osPath": osPath,
                                "contentLength": contentLength
                            }
                return fileInfo
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getVOSpacePathList(self, vospacePath):
            """Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT get_vos_path(n.node_id)
                    FROM node n
                    JOIN list_of_files l ON l.node_id = n.node_id
                    WHERE l.list_node_id = id_from_vos_path(%s);
                    """,
                    (vospacePath,))
                results = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                vospacePathList = []
                for el in results:
                    vospacePathList.append(el["vos_path"])
                return vospacePathList
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getNodesToBeDeleted(self):
            "Returns a path list of files to be deleted with also the corresponding deletion timestamp."
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    WITH RECURSIVE all_nodes AS (
                        SELECT node_id, name, os_name, relative_path, parent_relative_path
                        FROM node
                        UNION
                        SELECT node_id, name, os_name, path(parent_relative_path, node_id) AS relative_path, parent_relative_path
                        FROM deleted_node
                    ), del AS (
                        SELECT COALESCE(os_name, name) AS os_name, 1 AS level, node_id AS deleted_node_id,
                        path(parent_relative_path, node_id) AS relative_path, parent_relative_path 
                        FROM deleted_node WHERE phy_deleted_on IS NULL
                        UNION ALL
                        SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id,
                        n.relative_path, n.parent_relative_path
                        FROM all_nodes n JOIN del d ON n.relative_path = d.parent_relative_path
                        WHERE n.parent_relative_path IS NOT NULL
                    ), paths_to_delete AS
                    (SELECT deleted_node_id, '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path
                    FROM del GROUP BY deleted_node_id)
                    SELECT base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, deleted_on, d.node_id
                    FROM paths_to_delete p
                    JOIN deleted_node d ON d.node_id = p.deleted_node_id
                    JOIN location l ON d.location_id = l.location_id
                    JOIN storage s ON s.storage_id = l.storage_src_id;
                    """)
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result
            finally:
                self.connPool.putconn(conn, close = False)
    
        def nodeIsBusy(self, vospacePath):
            """Returns 'True' if the VOSpace node is busy, 'False' otherwise."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT job_id
                    FROM node
                    WHERE node_id = id_from_vos_path(%s);
                    """,
                    (vospacePath,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                if result[0]["job_id"]:
                    return True
                else:
                    return False
            finally:
                self.connPool.putconn(conn, close = False)
    
        ##### Job #####
    
        def jobExists(self, jobId):
            """Checks if a job already exists. Returns a boolean."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                if result:
                    return True
                else:
                    return False
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getJob(self, jobId):
            """Returns a JSON object containing job information, according to the job id."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                if not result:
                    return json.loads('{ "error": "JOB_NOT_FOUND" }')
                else:
                    job = dict()
                    for idx in result[0]:
                        oldIdx = idx
                        idxTokens = idx.split('_')
                        idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:])
                        job[idx] = result[0][oldIdx]
                        el = job[idx]
                        if isinstance(el, datetime.datetime):
                            job[idx] = el.isoformat()
                    return job
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getJobPhase(self, jobId):
            """Returns the 'phase' field, according to the UWS specification."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT phase FROM job WHERE job_id = %s;", (jobId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["phase"]
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getJobInfo(self, jobId):
            """Returns the 'job_info' field, according to the UWS specification."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["job_info"]
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getJobResults(self, jobId):
            """Returns the 'results' field, according to the UWS specification."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["results"]
            finally:
                self.connPool.putconn(conn, close = False)
    
        def listActiveJobs(self):
            """Returns some info about active jobs."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT job_id,
                           job_type,
                           phase,
                           creation_time,
                           start_time,
                           owner_id
                           FROM job
                           WHERE phase NOT IN ('ABORTED',
                                               'COMPLETED',
                                               'ERROR')
                           AND
                           job_type IN ('pullFromVoSpace',
                                        'pullToVoSpace',
                                        'pushToVoSpace',
                                        'vos_data',
                                        'vos_group',
                                        'vos_import')
                           ORDER BY creation_time DESC;
                    """)
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                for row in result:
                    for idx in row:
                        el = row[idx]
                        if isinstance(el, datetime.datetime):
                            row[idx] = el.isoformat()
                return result
            finally:
                self.connPool.putconn(conn, close = False)
    
        def listJobsByPhase(self, phase):
            """Returns some info about jobs according to the phase."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                if phase in [ "PENDING", "QUEUED", "EXECUTING" ]:
                    cursor.execute("""
                        SELECT job_id,
                               job_type,
                               phase,
                               creation_time,
                               start_time,
                               owner_id
                               FROM job
                               WHERE phase = %s
                               AND
                               job_type IN ('pullFromVoSpace',
                                            'pullToVoSpace',
                                            'pushToVoSpace',
                                            'vos_data',
                                            'vos_group',
                                            'vos_import')
                               ORDER BY creation_time DESC;
                        """,
                        (phase,))
                else:
                    cursor.execute("""
                        SELECT job_id,
                               job_type,
                               phase,
                               start_time,
                               end_time,
                               owner_id
                               FROM job
                               WHERE phase = %s
                               AND
                               job_type IN ('pullFromVoSpace',
                                            'pullToVoSpace',
                                            'pushToVoSpace',
                                            'vos_data',
                                            'vos_group',
                                            'vos_import')
                               ORDER BY creation_time DESC;
                        """,
                        (phase,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                for row in result:
                    for idx in row:
                        el = row[idx]
                        if isinstance(el, datetime.datetime):
                            row[idx] = el.isoformat()
                return result
            finally:
                self.connPool.putconn(conn, close = False)
    
        ##### User #####
    
        def userExists(self, username):
            """Checks if a user already exists. Returns a boolean."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                if result:
                    return True
                else:
                    return False
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getUserId(self, username):
            """Returns the user id for a given user name."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["user_id"]
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getUserName(self, userId):
            """Returns the user name for a given user id."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["user_name"]
            finally:
                self.connPool.putconn(conn, close = False)
        
        def getUserEmail(self, userId):
            """Returns the user email address for a given user id."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["e_mail"]
            finally:
                self.connPool.putconn(conn, close = False)
    
        ##### Storage #####
    
        def storageBasePathIsValid(self, path):
            """Checks if the base path of a physical path is valid. If true, returns the base path, else returns 'False'."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("""
                    SELECT base_path
                    FROM storage
                    WHERE position(base_path in cast(%s as varchar)) > 0;
                    """,
                    (path,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                if result:
                    return result[0]["base_path"]
                else:
                    return False
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getStorageBasePath(self, storageId):
            """Returns the storage base path for a give storage id."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["base_path"]
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getStorageList(self):
            """Returns the full storage base list. Local storage points are excluded by default."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';")
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getStorageListByType(self, storageType):
            """Returns a list of storage locations for a given storage type."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getStorageType(self, basePath):
            """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                if result:
                    return result[0]["storage_type"]
                else:
                    return False
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getStorageId(self, basePath):
            """Returns the storage id for a given storage base path, if any. Otherwise it returns 'False'."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                if result:
                    return result[0]["storage_id"]
                else:
                    return False
            finally:
                self.connPool.putconn(conn, close = False)
    
        def getStorageHostname(self, storageId):
            """Returns the storage hostname for a given storage id, if any. Otherwise it returns 'False'."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT hostname FROM storage WHERE storage_id = %s;", (storageId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                if result:
                    return result[0]["hostname"]
                else:
                    return False
            finally:
                self.connPool.putconn(conn, close = False)
    
        ##### Location #####
    
        def getLocationId(self, destStorageId):
            """Returns the location id according to the storage id of the destination."""
            conn = self.getConnection()
            try:
                cursor = conn.cursor(cursor_factory = RealDictCursor)
                cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,))
                result = cursor.fetchall()
                cursor.close()
            except Exception:
                if not conn.closed:
                    conn.rollback()
                raise
            else:
                return result[0]["location_id"]
            finally:
                self.connPool.putconn(conn, close = False)
    
    
        """
        Setters
        """
    
        ##### Job #####
    
        def insertJob(self, jobObj):
            """
            Stores a job object in the 'job' table (upsert).

            A new row is inserted; if a row with the same 'job_id'
            already exists, all of its other columns are overwritten
            with the values carried by 'jobObj'. The 'jobInfo',
            'nodeList' and 'results' attributes are serialized with
            json.dumps() before being sent to the database.
            """
            upsertQuery = """
                    INSERT INTO job(job_id,
                                    owner_id,
                                    job_type,
                                    phase,
                                    start_time,
                                    end_time,
                                    job_info,
                                    node_list,
                                    results,
                                    error_message,
                                    error_type)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (job_id)
                    DO UPDATE SET
                    (owner_id,
                     job_type, 
                     phase,
                     start_time,
                     end_time,
                     job_info,
                     node_list,
                     results,
                     error_message,
                     error_type)
                     = (EXCLUDED.owner_id,
                        EXCLUDED.job_type,
                        EXCLUDED.phase,
                        EXCLUDED.start_time,
                        EXCLUDED.end_time,
                        EXCLUDED.job_info,
                        EXCLUDED.node_list,
                        EXCLUDED.results,
                        EXCLUDED.error_message,
                        EXCLUDED.error_type);
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                # One value per placeholder, in the column order of the INSERT.
                values = (
                    jobObj.jobId,
                    jobObj.ownerId,
                    jobObj.type,
                    jobObj.phase,
                    jobObj.startTime,
                    jobObj.endTime,
                    json.dumps(jobObj.jobInfo),
                    json.dumps(jobObj.nodeList),
                    json.dumps(jobObj.results),
                    jobObj.errorMessage,
                    jobObj.errorType,
                )
                cur.execute(upsertQuery, values)
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
                
        def setStartTime(self, jobId):
            """
            Writes the current local time, as an ISO 8601 string,
            into the 'start_time' column of the job identified by 'jobId'.
            """
            updateQuery = """
                    UPDATE job SET start_time = %s
                    WHERE job_id = %s;
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                timestamp = datetime.datetime.today().isoformat()
                cur.execute(updateQuery, (timestamp, jobId))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
     
        def setEndTime(self, jobId):
            """
            Writes the current local time, as an ISO 8601 string,
            into the 'end_time' column of the job identified by 'jobId'.
            """
            updateQuery = """
                    UPDATE job SET end_time = %s
                    WHERE job_id = %s;
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                timestamp = datetime.datetime.today().isoformat()
                cur.execute(updateQuery, (timestamp, jobId))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
    
        def setPhase(self, jobId, phase):
            """
            Overwrites the 'phase' column of the job identified
            by 'jobId' with the given value.
            """
            updateQuery = """
                    UPDATE job SET phase = %s
                    WHERE job_id = %s;
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                cur.execute(updateQuery, (phase, jobId))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
                        
        def setTotalBlocks(self, jobId, totalBlocks):
            """
            Stores 'totalBlocks' in the 'total_blocks' column of the
            job identified by 'jobId' (used by data retrieve operations).
            """
            updateQuery = """
                    UPDATE job SET total_blocks = %s
                    WHERE job_id = %s;
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                cur.execute(updateQuery, (totalBlocks, jobId))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
                        
        def updateProcessedBlocks(self, jobId, processedBlocks):
            """
            Stores 'processedBlocks' in the 'processed_blocks' column of
            the job identified by 'jobId' (used by data retrieve operations).
            The previous value is replaced, not incremented.
            """
            updateQuery = """
                    UPDATE job SET processed_blocks = %s
                    WHERE job_id = %s;
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                cur.execute(updateQuery, (processedBlocks, jobId))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
    
        def setResults(self, jobId, results):
            """
            Serializes 'results' with json.dumps() and stores the outcome
            in the 'results' column of the job identified by 'jobId'.
            """
            updateQuery = """
                    UPDATE job SET results = %s
                    WHERE job_id = %s;
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                serialized = json.dumps(results)
                cur.execute(updateQuery, (serialized, jobId))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
    
        ##### Node #####
    
        def insertNode(self, node):
            """
            Inserts a VOSpace node below its parent node.

            The parent's 'path' is looked up through id_from_vos_path()
            using 'node.parentPath'. The relative parent path is that same
            path without its first dot-separated label (empty if the path
            contains no dot). The node is then inserted with
            ON CONFLICT DO NOTHING.

            Returns 'True' if the INSERT returned a row (node created),
            'False' if nothing was inserted. An IndexError is raised, after
            a rollback, if the parent node cannot be found.
            """
            parentQuery = """
                    SELECT path FROM node WHERE node_id = id_from_vos_path(%s);
                    """
            insertQuery = """
                        INSERT INTO node(parent_path,
                                         parent_relative_path,
                                         name,
                                         tstamp_wrapper_dir,
                                         type,
                                         location_id,
                                         async_trans,
                                         sticky,
                                         job_id,
                                         creator_id,
                                         content_length,
                                         content_md5)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        ON CONFLICT
                        DO NOTHING
                        RETURNING node_id;
                        """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)

                # Step 1: resolve the path of the parent node.
                cur.execute(parentQuery, (node.parentPath,))
                parentRows = cur.fetchall()
                parentPath = parentRows[0]["path"]
                relativePath = ""
                if "." in parentPath:
                    labels = parentPath.strip(".").split('.')
                    # Drop the first label to obtain the relative path.
                    relativePath = ".".join(labels[1:])

                # Step 2: insert the node itself.
                values = (
                    parentPath,
                    relativePath,
                    node.name,
                    node.wrapperDir,
                    node.type,
                    node.locationId,
                    node.asyncTrans,
                    node.sticky,
                    node.jobId,
                    node.creatorId,
                    node.contentLength,
                    node.contentMD5,
                )
                cur.execute(insertQuery, values)
                insertedRows = cur.fetchall()
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            else:
                # RETURNING yields a row only when the INSERT actually happened.
                return bool(insertedRows)
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
    
        def setAsyncTrans(self, nodeVOSPath, value):
            """
            Sets the 'async_trans' column to 'value' for every node row
            whose path satisfies 'c.path <@ n.path', where 'n' is the node
            resolved by id_from_vos_path(nodeVOSPath).
            """
            # NOTE(review): '<@' presumably selects the node and all of its
            # descendants (ltree semantics) - confirm against the DB schema.
            updateQuery = """
                    UPDATE node c SET async_trans = %s
                    FROM node n
                    WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                cur.execute(updateQuery, (value, nodeVOSPath))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
    
        def setJobId(self, nodeVOSPath, value):
            """
            Sets the 'job_id' column to 'value' for every node row whose
            path satisfies 'c.path <@ n.path', where 'n' is the node
            resolved by id_from_vos_path(nodeVOSPath).
            """
            # NOTE(review): '<@' presumably selects the node and all of its
            # descendants (ltree semantics) - confirm against the DB schema.
            updateQuery = """
                    UPDATE node c SET job_id = %s
                    FROM node n
                    WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                cur.execute(updateQuery, (value, nodeVOSPath))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
                        
        def setPhyDeletedOn(self, nodeId):
            """
            Writes the current local time, as an ISO 8601 string, into the
            'phy_deleted_on' column of the 'deleted_node' row identified
            by 'nodeId'.
            """
            updateQuery = """
                    UPDATE deleted_node SET phy_deleted_on = %s
                    WHERE node_id = %s;
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                timestamp = datetime.datetime.now().isoformat()
                cur.execute(updateQuery, (timestamp, nodeId))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
                        
        def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath):
            """
            Recomputes the 'group_read' column through the SQL function
            update_array(group_read, groupToAdd, groupToRemove) for every
            node row whose path satisfies 'c.path <@ n.path', where 'n' is
            the node resolved by id_from_vos_path(nodeVOSPath).
            """
            # NOTE(review): update_array() is defined in the database;
            # presumably it adds the first argument and removes the second.
            updateQuery = """
                    UPDATE node c
                    SET group_read = update_array(c.group_read, %s, %s)
                    FROM node n
                    WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                cur.execute(updateQuery, (groupToAdd, groupToRemove, nodeVOSPath))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
                        
        def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath):
            """
            Recomputes the 'group_write' column through the SQL function
            update_array(group_write, groupToAdd, groupToRemove) for every
            node row whose path satisfies 'c.path <@ n.path', where 'n' is
            the node resolved by id_from_vos_path(nodeVOSPath).
            """
            # NOTE(review): update_array() is defined in the database;
            # presumably it adds the first argument and removes the second.
            updateQuery = """
                    UPDATE node c
                    SET group_write = update_array(c.group_write, %s, %s)
                    FROM node n
                    WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                cur.execute(updateQuery, (groupToAdd, groupToRemove, nodeVOSPath))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
    
        ##### Storage #####
    
        def insertStorage(self, storageType, basePath, baseUrl, hostname):
            """
            Inserts a storage point together with its 'location' entry.

            The storage row is created first, then a location row that uses
            the new storage as source is added:
              - for "cold" and "hot" storage types the destination is the
                local storage ('local', '/home', 'localhost') and the
                location type is "async";
              - for any other storage type the destination is the new
                storage itself and the location type is "portal".
            A single commit is issued after the location insert.

            Returns 'True' when the storage has been inserted, 'False' when
            getStorageId() reports that 'basePath' is already registered.
            Any database error (including a missing local storage row, which
            surfaces as IndexError) is re-raised after a rollback.
            """
            # The existence check is done BEFORE borrowing a connection:
            # borrowing first would leave the connection outside the pool
            # whenever the storage already exists (early 'False' return)
            # or getStorageId() raises.
            if not self.getStorageId(basePath):
                conn = self.getConnection()
                try:
                    cursor = conn.cursor(cursor_factory = RealDictCursor)
                    cursor.execute("""
                        INSERT INTO storage(storage_type,
                                            base_path,
                                            base_url,
                                            hostname)
                        VALUES (%s, %s, %s, %s)
                        RETURNING storage_id;
                        """,
                        (storageType,
                         basePath,
                         baseUrl,
                         hostname,))
                    storageSrcId = cursor.fetchall()[0]["storage_id"]
                except Exception:
                    if not conn.closed:
                        conn.rollback()
                    raise
                else:
                    if storageType == "cold" or storageType == "hot":
                        # Cold/hot storage points are paired with the local
                        # storage as destination.
                        try:
                            cursor.execute("""
                                SELECT storage_id
                                FROM storage
                                WHERE storage_type = 'local'
                                AND base_path = '/home'
                                AND hostname = 'localhost';
                                """)
                            storageDestId = cursor.fetchall()[0]["storage_id"]
                        except Exception:
                            if not conn.closed:
                                conn.rollback()
                            raise
                        else:
                            locationType = "async"
                    else:
                        # Any other storage type is its own destination.
                        storageDestId = storageSrcId
                        locationType = "portal"
                    try:
                        cursor.execute("""
                            INSERT INTO location(location_type,
                                                 storage_src_id,
                                                 storage_dest_id)
                            VALUES (%s, %s, %s);
                            """,
                            (locationType,
                             storageSrcId,
                             storageDestId,))
                        # Storage and location rows are committed together.
                        conn.commit()
                        cursor.close()
                    except Exception:
                        if not conn.closed:
                            conn.rollback()
                        raise
                    else:
                        return True
                finally:
                    # The connection goes back to the pool in every case.
                    self.connPool.putconn(conn, close = False)
            else:
                return False
    
        def deleteStorage(self, storageId):
            """
            Deletes a storage point, unless nodes still refer to it.

            A first query checks whether any node has a 'location_id'
            belonging to a location whose 'storage_src_id' is 'storageId'.
            If so, nothing is deleted and 'False' is returned; otherwise
            the storage row is deleted, the change is committed and 'True'
            is returned. Database errors are re-raised after a rollback.
            """
            usageQuery = """
                    SELECT count(*) > 0 AS res
                    FROM Node
                    WHERE location_id IN
                    (SELECT location_id
                     FROM storage s
                     JOIN location l
                     ON s.storage_id = l.storage_src_id
                     WHERE storage_src_id = %s);
                    """
            deleteQuery = """
                            DELETE FROM storage
                            WHERE storage_id = %s;
                            """
            dbConn = self.getConnection()
            try:
                # Step 1: find out whether the storage is still in use.
                try:
                    cur = dbConn.cursor(cursor_factory=RealDictCursor)
                    cur.execute(usageQuery, (storageId,))
                    usageRows = cur.fetchall()
                except Exception:
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise

                if usageRows[0]["res"]:
                    return False

                # Step 2: no node refers to the storage, remove it.
                try:
                    cur.execute(deleteQuery, (storageId,))
                    dbConn.commit()
                    cur.close()
                except Exception:
                    if not dbConn.closed:
                        dbConn.rollback()
                    raise
                return True
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)
    
        ##### Users #####
         
        def insertUser(self, userId, username, email):
            """
            Adds a row to the 'users' table with the given id, user name
            and e-mail address. If a row with the same 'user_id' already
            exists, it is left untouched (ON CONFLICT DO NOTHING).
            """
            insertQuery = """
                    INSERT INTO users(user_id,
                                      user_name,
                                      e_mail)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (user_id)
                    DO NOTHING;
                    """
            dbConn = self.getConnection()
            try:
                cur = dbConn.cursor(cursor_factory=RealDictCursor)
                cur.execute(insertQuery, (userId, username, email))
                dbConn.commit()
                cur.close()
            except Exception:
                # Discard the failed transaction, then let the caller see the error.
                if not dbConn.closed:
                    dbConn.rollback()
                raise
            finally:
                # The connection goes back to the pool in every case.
                self.connPool.putconn(dbConn, close=False)