Skip to content
Snippets Groups Projects
Commit c709c06d authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Added client-server code to handle 'users' table in database (vos_user).

parent be872c7b
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,8 @@ COPY *.py \
vos_group \
vos_import \
vos_job \
vos_storage /usr/bin/vos_cli/
vos_storage \
vos_user /usr/bin/vos_cli/
RUN chmod -R 755 /usr/bin/vos_cli
# Copy bash-completion scripts
......@@ -45,7 +46,9 @@ WORKDIR /home/client/
RUN echo ". /usr/share/bash-completion/completions/vos_data" >> .bashrc && \
echo ". /usr/share/bash-completion/completions/vos_group" >> .bashrc && \
echo ". /usr/share/bash-completion/completions/vos_job" >> .bashrc && \
echo ". /usr/share/bash-completion/completions/vos_user" >> .bashrc && \
echo ". /usr/share/bash-completion/completions/vos_storage" >> .bashrc
# Install python dependencies
RUN pip3.9 install --no-cache-dir redis tabulate
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
_vos_user()
{
local cur prev opts
COMPREPLY=()
cur="${COMP_WORDS[COMP_CWORD]}"
prev="${COMP_WORDS[COMP_CWORD-1]}"
opts="add del search"
if [[ ${cur} == add || ${cur} == del || ${cur} == search || ${COMP_CWORD} -eq 1 ]] ; then
COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) )
return 0
fi
}
complete -F _vos_user vos_user
......@@ -26,3 +26,6 @@ rpc_queue = storage_queue
[vos_group]
rpc_queue = group_queue
[vos_user]
rpc_queue = user_queue
#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
import sys
from redis_rpc_client import RedisRPCClient
from config import Config
from tabulate import tabulate
class VOSUser(RedisRPCClient):
def __init__(self):
config = Config("/etc/vos_cli/vos_cli.conf")
params = config.loadSection("server")
self.host = params["host"]
self.port = params.getint("port")
self.db = params.getint("db")
params = config.loadSection("vos_user")
self.rpcQueue = params["rpc_queue"]
super(VOSUser, self).__init__(self.host, self.port, self.db, self.rpcQueue)
def add(self):
userId = None
username = None
email = None
while not userId:
try:
userId = input("\nUser ID: ")
except ValueError:
print("Input type is not valid!")
except EOFError:
print("\nPlease, use CTRL+C to quit.")
except KeyboardInterrupt:
sys.exit("\nCTRL+C detected. Exiting...")
while not username:
try:
username = input("\nUsername: ")
except ValueError:
print("Input type is not valid!")
except EOFError:
print("\nPlease, use CTRL+C to quit.")
except KeyboardInterrupt:
sys.exit("\nCTRL+C detected. Exiting...")
while not email:
# Add email validation here
try:
email = input("\nE-mail: ")
except ValueError:
print("Input type is not valid!")
except EOFError:
print("\nPlease, use CTRL+C to quit.")
except KeyboardInterrupt:
sys.exit("\nCTRL+C detected. Exiting...")
userRequest = { "requestType": "USER_ADD",
"userId": userId,
"username": username,
"email": email }
userResponse = self.call(userRequest)
if "responseType" not in userResponse:
sys.exit("FATAL: Malformed response, acknowledge expected.\n")
elif userResponse["responseType"] == "USER_ADD_DONE":
print("\nUser added successfully!\n")
elif userResponse["responseType"] == "ERROR":
errorCode = userResponse["errorCode"]
errorMsg = userResponse["errorMsg"]
sys.exit(f"\nError code: {errorCode}\nError message: {errorMsg}\n")
else:
sys.exit("\nFATAL: Unknown response type.\n")
def delete(self):
userRequest = { "requestType": "USER_DEL_REQ" }
userResponse = self.call(userRequest)
if "responseType" not in userResponse:
sys.exit("FATAL: Malformed response, acknowledge expected.\n")
elif userResponse["responseType"] == "USER_DEL_ACK":
userList = userResponse["userList"]
if not userList:
sys.exit("\nNo username found.\n")
userIdList = []
for user in userList:
userIdList.append(user["user_id"])
userId = None
while not userId in userIdList:
try:
userId = input("\nPlease, insert a valid user ID: ")
except ValueError:
print("Input type is not valid!")
except EOFError:
print("\nPlease, use CTRL+C to quit.")
except KeyboardInterrupt:
sys.exit("\nCTRL+C detected. Exiting...")
print("\n!!!!!!!!!!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!!!!")
print("! This operation will remove the selected user only !")
print("! from the database. !")
print("! The user on the transfer node will not be removed, !")
print("! you must do it manually. !")
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
confirm = None
while not confirm in ( "yes", "no" ):
try:
confirm = input("Are you sure to proceed? [yes/no]: ")
except KeyboardInterrupt:
sys.exit("\nCTRL+C detected. Exiting...")
except EOFError:
print("\nPlease, use CTRL+C to quit.")
if confirm == "yes":
confirmRequest = { "requestType": "USER_DEL_CON", "userId": userId }
confirmResponse = self.call(confirmRequest)
if "responseType" not in confirmResponse:
sys.exit("\nFATAL: Malformed response, confirmation expected.\n")
elif confirmResponse["responseType"] == "USER_DEL_DONE":
print("\nUser deleted successfully!\n")
elif confirmResponse["responseType"] == "ERROR":
errorCode = confirmResponse["errorCode"]
errorMsg = confirmResponse["errorMsg"]
sys.exit(f"\nError code: {errorCode}\nError message: {errorMsg}\n")
else:
sys.exit("\nFATAL: Unknown response type.\n")
elif userResponse["responseType"] == "ERROR":
errorCode = userResponse["errorCode"]
errorMsg = userResponse["errorMsg"]
sys.exit(f"Error code: {errorCode}\nError message: {errorMsg}\n")
else:
sys.exit("\nFATAL: Unknown response type.\n")
def search(self, searchStr):
userRequest = { "requestType": "USER_SEARCH", "searchStr": searchStr }
userResponse = self.call(userRequest)
if "responseType" not in userResponse:
sys.exit("FATAL: Malformed response.\n")
elif userResponse["responseType"] == "SEARCH_DONE":
userSearch = userResponse["userSearch"]
if userSearch:
print("\n" + tabulate(userResponse["userSearch"], headers = "keys", tablefmt = "pretty") + "\n")
else:
sys.exit(f"\nThe search did not return any results.\n")
elif userResponse["responseType"] == "ERROR":
errorCode = userResponse["errorCode"]
errorMsg = userResponse["errorMsg"]
sys.exit(f"\nError code: {errorCode}\nError message: {errorMsg}\n")
else:
sys.exit("\nFATAL: Unknown response type.\n")
def help(self):
sys.exit("""
NAME
vos_user
SYNOPSYS
vos_user COMMAND [ARGUMENT]
DESCRIPTION
Client tool to manage VOSpace users in the database.
The client accepts only one (mandatory) command at a time.
A list of supported commands is shown here below:
add
adds a user to the database
del
deletes a user from the database
search
performs a search on users and returns those having a match between the search string
passed via command line and one of the following fields:
'user_id', 'user_name', 'e_mail'
Adding a user to the database requires a user ID, a username and an e-mail address.
A valid userID is required when deleting a user from the database.
IMPORTANT NOTE:
the VOSpace Transfer Service automatically populates the 'users' table in the database
by previously quering the authentication system (RAP).
So, please, use this client only if you need to handle situations involing users that
for some reason are not recognized by the authentication system.
""")
# Create new VOSUser object
vosUserCli = VOSUser()
# Check the number of input args
if len(sys.argv) == 2:
script, cmd = sys.argv
if cmd == "add":
vosUserCli.add()
elif cmd == "del":
vosUserCli.delete()
else:
vosUserCli.help()
elif len(sys.argv) == 3:
script, cmd, arg = sys.argv
if cmd == "search":
vosUserCli.search(arg)
else:
vosUserCli.help()
else:
vosUserCli.help()
......@@ -13,6 +13,7 @@ from group_rw_rpc_server import GroupRwRPCServer
from import_rpc_server import ImportRPCServer
from job_rpc_server import JobRPCServer
from storage_rpc_server import StorageRPCServer
from user_rpc_server import UserRPCServer
class CliHandler(object):
......@@ -28,6 +29,8 @@ class CliHandler(object):
self.rpcServerList.append(DataRPCServer(self.host, self.port, self.db, rpcQueue))
elif srvType == "group_rw":
self.rpcServerList.append(GroupRwRPCServer(self.host, self.port, self.db, rpcQueue))
elif srvType == "user":
self.rpcServerList.append(UserRPCServer(self.host, self.port, self.db, rpcQueue))
elif srvType == "import":
self.rpcServerList.append(ImportRPCServer(self.host, self.port, self.db, rpcQueue))
elif srvType == "storage":
......
......@@ -576,7 +576,7 @@ class DbConnector(object):
self.connPool.putconn(conn, close = False)
def getUserId(self, username):
"""Returns the user id for a given user name."""
"""Returns the user id for a given user name (if any), 'False' otherwise ."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
......@@ -588,7 +588,10 @@ class DbConnector(object):
conn.rollback()
raise
else:
if result:
return result[0]["user_id"]
else:
return False
finally:
self.connPool.putconn(conn, close = False)
......@@ -626,6 +629,49 @@ class DbConnector(object):
finally:
self.connPool.putconn(conn, close = False)
def getUserList(self):
"""Returns the full user list."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("SELECT * FROM users;")
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result
finally:
self.connPool.putconn(conn, close = False)
def searchUsers(self, searchStr):
"Performs a search on users."
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
SELECT user_id,
user_name,
e_mail
FROM users
WHERE (user_id ~ %(str)s
OR user_name ~ %(str)s
OR e_mail ~ %(str)s);
""",
{ 'str': searchStr })
result = cursor.fetchall()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return result
finally:
self.connPool.putconn(conn, close = False)
##### Storage #####
def storageBasePathIsValid(self, path):
......@@ -1276,8 +1322,12 @@ class DbConnector(object):
##### Users #####
def insertUser(self, userId, username, email):
"""Inserts users data."""
"""
Inserts a user in the database.
Returns 'True' on success, 'False' otherwise.
"""
conn = self.getConnection()
if not self.getUserId(username):
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
......@@ -1297,5 +1347,30 @@ class DbConnector(object):
if not conn.closed:
conn.rollback()
raise
else:
return True
finally:
self.connPool.putconn(conn, close = False)
else:
return False
def deleteUser(self, userId):
"""Deletes a user from the database."""
conn = self.getConnection()
try:
cursor = conn.cursor(cursor_factory = RealDictCursor)
cursor.execute("""
DELETE FROM users
WHERE user_id = %s;
""",
(userId,))
conn.commit()
cursor.close()
except Exception:
if not conn.closed:
conn.rollback()
raise
else:
return True
finally:
self.connPool.putconn(conn, close = False)
......@@ -49,6 +49,9 @@ class TransferService(object):
# Group
self.cliHandler.addRPCServer("group_rw", "group_queue")
# User
self.cliHandler.addRPCServer("user", "user_queue")
# Import
self.cliHandler.addRPCServer("import", "import_queue")
......
#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
import logging
import os
from redis_log_handler import RedisLogHandler
from redis_rpc_server import RedisRPCServer
from config import Config
from db_connector import DbConnector
class UserRPCServer(RedisRPCServer):
def __init__(self, host, port, db, rpcQueue):
self.type = "user"
config = Config("/etc/vos_ts/vos_ts.conf")
params = config.loadSection("file_catalog")
self.dbConn = DbConnector(params["user"],
params["password"],
params["host"],
params.getint("port"),
params["db"],
1,
2)
params = config.loadSection("logging")
self.logger = logging.getLogger(__name__)
logLevel = "logging." + params["log_level"]
logFormat = params["log_format"]
logFormatter = logging.Formatter(logFormat)
self.logger.setLevel(eval(logLevel))
redisLogHandler = RedisLogHandler()
redisLogHandler.setFormatter(logFormatter)
self.logger.addHandler(redisLogHandler)
super(UserRPCServer, self).__init__(host, port, db, rpcQueue)
def callback(self, requestBody):
# 'requestType' attribute is mandatory
if "requestType" not in requestBody:
errorMsg = "Malformed request, missing parameters."
response = { "responseType": "ERROR",
"errorCode": 1,
"errorMsg": errorMsg }
elif requestBody["requestType"] == "USER_ADD":
userId = requestBody["userId"]
username = requestBody["username"]
email = requestBody["email"]
try:
result = self.dbConn.insertUser(userId,
username,
email)
except Exception:
errorMsg = "Database error."
self.logger.exception(errorMsg)
response = { "responseType": "ERROR",
"errorCode": 2,
"errorMsg": errorMsg }
return response
else:
if result:
response = { "responseType": "USER_ADD_DONE" }
else:
errorMsg = "User already exists."
self.logger.error(errorMsg)
response = { "responseType": "ERROR",
"errorCode": 4,
"errorMsg": errorMsg }
elif requestBody["requestType"] == "USER_DEL_REQ":
try:
result = self.dbConn.getUserList()
except Exception:
errorMsg = "Database error."
self.logger.exception(errorMsg)
response = { "responseType": "ERROR",
"errorCode": 2,
"errorMsg": errorMsg }
return response
else:
response = { "responseType": "USER_DEL_ACK",
"userList": result }
elif requestBody["requestType"] == "USER_DEL_CON":
userId = requestBody["userId"]
try:
result = self.dbConn.deleteUser(userId)
except Exception:
errorMsg = "Database error."
self.logger.exception(errorMsg)
response = { "responseType": "ERROR",
"errorCode": 2,
"errorMsg": errorMsg }
return response
else:
response = { "responseType": "USER_DEL_DONE" }
elif requestBody["requestType"] == "USER_SEARCH":
searchStr = requestBody["searchStr"]
try:
result = self.dbConn.searchUsers(searchStr)
except Exception:
errorMsg = "Database error."
self.logger.exception(errorMsg)
response = { "responseType": "ERROR",
"errorCode": 2,
"errorMsg": errorMsg }
return response
else:
response = { "responseType": "SEARCH_DONE",
"userSearch": result }
else:
errorMsg = "Unkown request type."
self.logger.error(errorMsg)
response = { "responseType": "ERROR",
"errorCode": 6,
"errorMsg": errorMsg }
return response
def run(self):
self.logger.info(f"Starting RPC server of type {self.type}...")
super(UserRPCServer, self).run()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment