Something went wrong on our end
Select Git revision
np_cluster_magma_mpi
-
Mulas, Giacomo authoredMulas, Giacomo authored
db_connector.py 43.24 KiB
#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
import datetime
import json
import logging
import psycopg2
import sys
import time
from contextlib import contextmanager
from psycopg2.extras import RealDictCursor
from psycopg2.pool import ThreadedConnectionPool
from node import Node
class DbConnector(object):
def __init__(self, user, password, host, port, dbname, minConnNum, maxConnNum):
self.user = user
self.password = password
self.host = host
self.port = port
self.dbname = dbname
self.minConnNum = minConnNum
self.maxConnNum = maxConnNum
self.connPool = None
self.createConnectionPool()
def createConnectionPool(self):
self.connPool = ThreadedConnectionPool(
self.minConnNum,
self.maxConnNum,
user = self.user,
password = self.password,
host = self.host,
database = self.dbname,
port = self.port,
connect_timeout = 0
)
def getConnection(self, retry = 10, timeout = 30):
if retry < 1:
retry = 1
if timeout < 1:
timeout = 1
conn = None
while retry > 0:
try:
logging.warning(f"Getting connection from pool: retry = {retry}")
conn = self.connPool.getconn()
conn.reset()
except Exception:
logging.warning(f"Unable to get a connection from pool: retry = {retry}")
if conn is not None:
self.connPool.putconn(conn, close = False)
conn = None
time.sleep(timeout)
retry -= 1
if retry == 0:
raise
else:
logging.warning(f"Connection established: retry = {retry}")
return conn
"""
Getters
"""
##### Node #####
def nodeExists(self, vospacePath):
"""Checks if a VOSpace node already exists. Returns a boolean."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result:
return True
else:
return False
finally:
self.connPool.putconn(conn, close = False)
def getCreatorId(self, vospacePath):
"""Returns the creator ID for a given vospace path representing a node."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT creator_id
FROM node_vos_path nvp
JOIN node n ON nvp.node_id = n.node_id
WHERE vos_path = %s;
""",
(vospacePath,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["creator_id"]
finally:
self.connPool.putconn(conn, close = False)
def getGroupRead(self, vospacePath):
"""Returns the 'group_read' for a given VOSpace path representing a node."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT unnest(group_read) as group_read
FROM node_vos_path nvp
JOIN node n ON nvp.node_id = n.node_id
WHERE vos_path = %s;
""",
(vospacePath,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
for i in range(0, len(result)):
result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "")
return result
finally:
self.connPool.putconn(conn, close = False)
def getGroupWrite(self, vospacePath):
"""Returns the 'group_write' for a given VOSpace path representing a node."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT unnest(group_write) as group_write
FROM node_vos_path nvp
JOIN node n ON nvp.node_id = n.node_id
WHERE vos_path = %s;
""",
(vospacePath,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
for i in range(0, len(result)):
result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "")
return result
finally:
self.connPool.putconn(conn, close = False)
def getOSPath(self, vospacePath):
"""Returns a list containing full path, storage type and username for a VOSpace path."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length
FROM node n
JOIN location l ON n.location_id = l.location_id
JOIN storage s ON s.storage_id = l.storage_src_id
JOIN users u ON u.user_id = n.creator_id
WHERE n.node_id = id_from_vos_path(%s);
""",
(vospacePath,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
storageType = result[0]["storage_type"]
basePath = result[0]["base_path"]
userName = result[0]["user_name"]
tstampWrappedDir = result[0]["tstamp_wrapper_dir"]
osPath = result[0]["os_path"]
contentLength = result[0]["content_length"]
if tstampWrappedDir is None:
baseSrcPath = basePath + "/" + userName
else:
baseSrcPath = basePath + "/" + userName + "/" + tstampWrappedDir
fullPath = baseSrcPath + osPath
fileInfo = {
"baseSrcPath": baseSrcPath,
"fullPath": fullPath,
"storageType": storageType,
"username": userName,
"osPath": osPath,
"contentLength": contentLength
}
return fileInfo
finally:
self.connPool.putconn(conn, close = False)
def getVOSpacePathList(self, vospacePath):
"""Returns the list of VOSpace paths carried by a VOSpace node, according to the node VOSpace path."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT get_vos_path(n.node_id)
FROM node n
JOIN list_of_files l ON l.node_id = n.node_id
WHERE l.list_node_id = id_from_vos_path(%s);
""",
(vospacePath,))
results = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
vospacePathList = []
for el in results:
vospacePathList.append(el["vos_path"])
return vospacePathList
finally:
self.connPool.putconn(conn, close = False)
def getNodesToBeDeleted(self):
"Returns a path list of files to be deleted with also the corresponding deletion timestamp."
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
WITH RECURSIVE all_nodes AS (
SELECT node_id, name, os_name, relative_path, parent_relative_path
FROM node
UNION
SELECT node_id, name, os_name, path(parent_relative_path, node_id) AS relative_path, parent_relative_path
FROM deleted_node
), del AS (
SELECT COALESCE(os_name, name) AS os_name, 1 AS level, node_id AS deleted_node_id,
path(parent_relative_path, node_id) AS relative_path, parent_relative_path
FROM deleted_node WHERE phy_deleted_on IS NULL
UNION ALL
SELECT COALESCE(n.os_name, n.name), d.level + 1, d.deleted_node_id,
n.relative_path, n.parent_relative_path
FROM all_nodes n JOIN del d ON n.relative_path = d.parent_relative_path
WHERE n.parent_relative_path IS NOT NULL
), paths_to_delete AS
(SELECT deleted_node_id, '/' || STRING_AGG(os_name, '/' ORDER BY LEVEL DESC) AS os_path
FROM del GROUP BY deleted_node_id)
SELECT base_path || '/' || creator_id as os_base_path, os_path AS os_rel_path, deleted_on, d.node_id
FROM paths_to_delete p
JOIN deleted_node d ON d.node_id = p.deleted_node_id
JOIN location l ON d.location_id = l.location_id
JOIN storage s ON s.storage_id = l.storage_src_id;
""")
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result
finally:
self.connPool.putconn(conn, close = False)
def nodeIsBusy(self, vospacePath):
"""Returns 'True' if the VOSpace node is busy, 'False' otherwise."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT job_id
FROM node
WHERE node_id = id_from_vos_path(%s);
""",
(vospacePath,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result[0]["job_id"]:
return True
else:
return False
finally:
self.connPool.putconn(conn, close = False)
##### Job #####
def jobExists(self, jobId):
"""Checks if a job already exists. Returns a boolean."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result:
return True
else:
return False
finally:
self.connPool.putconn(conn, close = False)
def getJob(self, jobId):
"""Returns a JSON object containing job information, according to the job id."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if not result:
return json.loads('{ "error": "JOB_NOT_FOUND" }')
else:
job = dict()
for idx in result[0]:
oldIdx = idx
idxTokens = idx.split('_')
idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:])
job[idx] = result[0][oldIdx]
el = job[idx]
if isinstance(el, datetime.datetime):
job[idx] = el.isoformat()
return job
finally:
self.connPool.putconn(conn, close = False)
def getJobPhase(self, jobId):
"""Returns the 'phase' field, according to the UWS specification."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT phase FROM job WHERE job_id = %s;", (jobId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["phase"]
finally:
self.connPool.putconn(conn, close = False)
def getJobInfo(self, jobId):
"""Returns the 'job_info' field, according to the UWS specification."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["job_info"]
finally:
self.connPool.putconn(conn, close = False)
def getJobResults(self, jobId):
"""Returns the 'results' field, according to the UWS specification."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["results"]
finally:
self.connPool.putconn(conn, close = False)
def listActiveJobs(self):
"""Returns some info about active jobs."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT job_id,
job_type,
phase,
creation_time,
start_time,
owner_id
FROM job
WHERE phase NOT IN ('ABORTED',
'COMPLETED',
'ERROR')
AND
job_type IN ('pullFromVoSpace',
'pullToVoSpace',
'pushToVoSpace',
'vos_data',
'vos_group',
'vos_import')
ORDER BY creation_time DESC;
""")
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
for row in result:
for idx in row:
el = row[idx]
if isinstance(el, datetime.datetime):
row[idx] = el.isoformat()
return result
finally:
self.connPool.putconn(conn, close = False)
def listJobsByPhase(self, phase):
"""Returns some info about jobs according to the phase."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
if phase in [ "PENDING", "QUEUED", "EXECUTING" ]:
cursor.execute("""
SELECT job_id,
job_type,
phase,
creation_time,
start_time,
owner_id
FROM job
WHERE phase = %s
AND
job_type IN ('pullFromVoSpace',
'pullToVoSpace',
'pushToVoSpace',
'vos_data',
'vos_group',
'vos_import')
ORDER BY creation_time DESC;
""",
(phase,))
else:
cursor.execute("""
SELECT job_id,
job_type,
phase,
start_time,
end_time,
owner_id
FROM job
WHERE phase = %s
AND
job_type IN ('pullFromVoSpace',
'pullToVoSpace',
'pushToVoSpace',
'vos_data',
'vos_group',
'vos_import')
ORDER BY creation_time DESC;
""",
(phase,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
for row in result:
for idx in row:
el = row[idx]
if isinstance(el, datetime.datetime):
row[idx] = el.isoformat()
return result
finally:
self.connPool.putconn(conn, close = False)
##### User #####
def userExists(self, username):
"""Checks if a user already exists. Returns a boolean."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result:
return True
else:
return False
finally:
self.connPool.putconn(conn, close = False)
def getUserId(self, username):
"""Returns the user id for a given user name."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["user_id"]
finally:
self.connPool.putconn(conn, close = False)
def getUserName(self, userId):
"""Returns the user name for a given user id."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["user_name"]
finally:
self.connPool.putconn(conn, close = False)
def getUserEmail(self, userId):
"""Returns the user email address for a given user id."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["e_mail"]
finally:
self.connPool.putconn(conn, close = False)
##### Storage #####
def storageBasePathIsValid(self, path):
"""Checks if the base path of a physical path is valid. If true, returns the base path, else returns 'False'."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT base_path
FROM storage
WHERE position(base_path in cast(%s as varchar)) > 0;
""",
(path,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result:
return result[0]["base_path"]
else:
return False
finally:
self.connPool.putconn(conn, close = False)
def getStorageBasePath(self, storageId):
"""Returns the storage base path for a give storage id."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["base_path"]
finally:
self.connPool.putconn(conn, close = False)
def getStorageList(self):
"""Returns the full storage base list. Local storage points are excluded by default."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';")
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result
finally:
self.connPool.putconn(conn, close = False)
def getStorageListByType(self, storageType):
"""Returns a list of storage locations for a given storage type."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result
finally:
self.connPool.putconn(conn, close = False)
def getStorageType(self, basePath):
"""Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result:
return result[0]["storage_type"]
else:
return False
finally:
self.connPool.putconn(conn, close = False)
def getStorageId(self, basePath):
"""Returns the storage id for a given storage base path, if any. Otherwise it returns 'False'."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result:
return result[0]["storage_id"]
else:
return False
finally:
self.connPool.putconn(conn, close = False)
def getStorageHostname(self, storageId):
"""Returns the storage hostname for a given storage id, if any. Otherwise it returns 'False'."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT hostname FROM storage WHERE storage_id = %s;", (storageId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result:
return result[0]["hostname"]
else:
return False
finally:
self.connPool.putconn(conn, close = False)
##### Location #####
def getLocationId(self, destStorageId):
"""Returns the location id according to the storage id of the destination."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,))
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result[0]["location_id"]
finally:
self.connPool.putconn(conn, close = False)
"""
Setters
"""
##### Job #####
def insertJob(self, jobObj):
"""Inserts/updates a job object."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
INSERT INTO job(job_id,
owner_id,
job_type,
phase,
start_time,
end_time,
job_info,
node_list,
results,
error_message,
error_type)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (job_id)
DO UPDATE SET
(owner_id,
job_type,
phase,
start_time,
end_time,
job_info,
node_list,
results,
error_message,
error_type)
= (EXCLUDED.owner_id,
EXCLUDED.job_type,
EXCLUDED.phase,
EXCLUDED.start_time,
EXCLUDED.end_time,
EXCLUDED.job_info,
EXCLUDED.node_list,
EXCLUDED.results,
EXCLUDED.error_message,
EXCLUDED.error_type);
""",
(jobObj.jobId,
jobObj.ownerId,
jobObj.type,
jobObj.phase,
jobObj.startTime,
jobObj.endTime,
json.dumps(jobObj.jobInfo),
json.dumps(jobObj.nodeList),
json.dumps(jobObj.results),
jobObj.errorMessage,
jobObj.errorType,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def setStartTime(self, jobId):
"""Sets the job 'start_time' parameter."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
startTime = datetime.datetime.today().isoformat()
cursor.execute("""
UPDATE job SET start_time = %s
WHERE job_id = %s;
""",
(startTime, jobId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def setEndTime(self, jobId):
"""Sets the job 'end_time' parameter."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
endTime = datetime.datetime.today().isoformat()
cursor.execute("""
UPDATE job SET end_time = %s
WHERE job_id = %s;
""",
(endTime, jobId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def setPhase(self, jobId, phase):
"""Sets the job 'phase' parameter."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
UPDATE job SET phase = %s
WHERE job_id = %s;
""",
(phase, jobId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def setTotalBlocks(self, jobId, totalBlocks):
"""
Sets the job 'total_blocks' parameter
for a data retrieve operation.
"""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
UPDATE job SET total_blocks = %s
WHERE job_id = %s;
""",
(totalBlocks, jobId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def updateProcessedBlocks(self, jobId, processedBlocks):
"""
Updates the job 'processed_blocks' parameter
for a data retrieve operation.
"""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
UPDATE job SET processed_blocks = %s
WHERE job_id = %s;
""",
(processedBlocks, jobId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def setResults(self, jobId, results):
"""Sets the job 'results' parameter."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
UPDATE job SET results = %s
WHERE job_id = %s;
""",
(json.dumps(results),
jobId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
##### Node #####
def insertNode(self, node):
"""Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT path FROM node WHERE node_id = id_from_vos_path(%s);
""",
(node.parentPath,))
result = cursor.fetchall()
parentLtreePath = result[0]["path"]
parentLtreeRelativePath = ""
if "." in parentLtreePath:
parentLtreeRelativePath = ".".join(parentLtreePath.strip(".").split('.')[1:])
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
try:
cursor.execute("""
INSERT INTO node(parent_path,
parent_relative_path,
name,
tstamp_wrapper_dir,
type,
location_id,
async_trans,
sticky,
job_id,
creator_id,
content_length,
content_md5)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT
DO NOTHING
RETURNING node_id;
""",
(parentLtreePath,
parentLtreeRelativePath,
node.name,
node.wrapperDir,
node.type,
node.locationId,
node.asyncTrans,
node.sticky,
node.jobId,
node.creatorId,
node.contentLength,
node.contentMD5,))
result = cursor.fetchall()
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result:
return True
else:
return False
finally:
self.connPool.putconn(conn, close = False)
def setAsyncTrans(self, nodeVOSPath, value):
"""Sets the 'async_trans' flag for a VOSpace node."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
UPDATE node c SET async_trans = %s
FROM node n
WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
""",
(value, nodeVOSPath,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def setJobId(self, nodeVOSPath, value):
"""Sets the 'job_id' flag for a VOSpace node."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
UPDATE node c SET job_id = %s
FROM node n
WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
""",
(value, nodeVOSPath,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def setPhyDeletedOn(self, nodeId):
"""Sets the 'phy_deleted_on' flag for a VOSpace deleted node."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
phyDeletedOn = datetime.datetime.now().isoformat()
cursor.execute("""
UPDATE deleted_node SET phy_deleted_on = %s
WHERE node_id = %s;
""",
(phyDeletedOn, nodeId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath):
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
UPDATE node c
SET group_read = update_array(c.group_read, %s, %s)
FROM node n
WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
""",
(groupToAdd,
groupToRemove,
nodeVOSPath,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath):
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
UPDATE node c
SET group_write = update_array(c.group_write, %s, %s)
FROM node n
WHERE c.path <@ n.path AND n.node_id = id_from_vos_path(%s);
""",
(groupToAdd,
groupToRemove,
nodeVOSPath,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)
##### Storage #####
def insertStorage(self, storageType, basePath, baseUrl, hostname):
"""Inserts a storage point."""
conn = self.getConnection()
if not self.getStorageId(basePath):
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
INSERT INTO storage(storage_type,
base_path,
base_url,
hostname)
VALUES (%s, %s, %s, %s)
RETURNING storage_id;
""",
(storageType,
basePath,
baseUrl,
hostname,))
storageSrcId = cursor.fetchall()[0]["storage_id"]
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if storageType == "cold" or storageType == "hot":
try:
cursor.execute("""
SELECT storage_id
FROM storage
WHERE storage_type = 'local'
AND base_path = '/home'
AND hostname = 'localhost';
""")
storageDestId = cursor.fetchall()[0]["storage_id"]
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
locationType = "async"
else:
storageDestId = storageSrcId
locationType = "portal"
try:
cursor.execute("""
INSERT INTO location(location_type,
storage_src_id,
storage_dest_id)
VALUES (%s, %s, %s);
""",
(locationType,
storageSrcId,
storageDestId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return True
finally:
self.connPool.putconn(conn, close = False)
else:
return False
def deleteStorage(self, storageId):
"""Deletes a storage point."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT count(*) > 0 AS res
FROM Node
WHERE location_id IN
(SELECT location_id
FROM storage s
JOIN location l
ON s.storage_id = l.storage_src_id
WHERE storage_src_id = %s);
""",
(storageId,))
result = cursor.fetchall()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
if result[0]["res"]:
return False
else:
try:
cursor.execute("""
DELETE FROM storage
WHERE storage_id = %s;
""",
(storageId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return True
finally:
self.connPool.putconn(conn, close = False)
##### Users #####
def insertUser(self, userId, username, email):
"""Inserts users data."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
INSERT INTO users(user_id,
user_name,
e_mail)
VALUES (%s, %s, %s)
ON CONFLICT (user_id)
DO NOTHING;
""",
(userId,
username,
email,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
finally:
self.connPool.putconn(conn, close = False)