diff --git a/client/Dockerfile b/client/Dockerfile index b70b1264fa8bd0e39e3cbbd6e8bfb8f2574af222..1a827b63932a58d107b497e6aa819f3185e6a226 100644 --- a/client/Dockerfile +++ b/client/Dockerfile @@ -21,7 +21,8 @@ COPY *.py \ vos_group \ vos_import \ vos_job \ - vos_storage /usr/bin/vos_cli/ + vos_storage \ + vos_user /usr/bin/vos_cli/ RUN chmod -R 755 /usr/bin/vos_cli # Copy bash-completion scripts @@ -45,7 +46,9 @@ WORKDIR /home/client/ RUN echo ". /usr/share/bash-completion/completions/vos_data" >> .bashrc && \ echo ". /usr/share/bash-completion/completions/vos_group" >> .bashrc && \ echo ". /usr/share/bash-completion/completions/vos_job" >> .bashrc && \ + echo ". /usr/share/bash-completion/completions/vos_user" >> .bashrc && \ echo ". /usr/share/bash-completion/completions/vos_storage" >> .bashrc # Install python dependencies RUN pip3.9 install --no-cache-dir redis tabulate + diff --git a/client/config/bash_completion/vos_user b/client/config/bash_completion/vos_user new file mode 100644 index 0000000000000000000000000000000000000000..eca4d006fc6b423b577e85f7c00ee032da0044ca --- /dev/null +++ b/client/config/bash_completion/vos_user @@ -0,0 +1,20 @@ +# +# This file is part of vospace-transfer-service +# Copyright (C) 2021 Istituto Nazionale di Astrofisica +# SPDX-License-Identifier: GPL-3.0-or-later +# + +_vos_user() +{ + local cur prev opts + COMPREPLY=() + cur="${COMP_WORDS[COMP_CWORD]}" + prev="${COMP_WORDS[COMP_CWORD-1]}" + opts="add del search" + + if [[ ${cur} == add || ${cur} == del || ${cur} == search || ${COMP_CWORD} -eq 1 ]] ; then + COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) ) + return 0 + fi +} +complete -F _vos_user vos_user diff --git a/client/config/vos_cli.conf b/client/config/vos_cli.conf index 1c672befda4faeaa7493ab751c69e284089c8155..81644fa814b7d4e8f1339db0ad9068961663414a 100644 --- a/client/config/vos_cli.conf +++ b/client/config/vos_cli.conf @@ -26,3 +26,6 @@ rpc_queue = storage_queue [vos_group] rpc_queue = group_queue + +[vos_user] +rpc_queue = user_queue diff --git a/client/vos_user b/client/vos_user new file mode 100644 index 0000000000000000000000000000000000000000..e209028b2ad5a91d7bc785329f738b59c453b7dd --- /dev/null +++ b/client/vos_user @@ -0,0 +1,207 @@ +#!/usr/bin/env python +# +# This file is part of vospace-transfer-service +# Copyright (C) 2021 Istituto Nazionale di Astrofisica +# SPDX-License-Identifier: GPL-3.0-or-later +# + +import sys + +from redis_rpc_client import RedisRPCClient +from config import Config +from tabulate import tabulate + + +class VOSUser(RedisRPCClient): + + def __init__(self): + config = Config("/etc/vos_cli/vos_cli.conf") + params = config.loadSection("server") + self.host = params["host"] + self.port = params.getint("port") + self.db = params.getint("db") + params = config.loadSection("vos_user") + self.rpcQueue = params["rpc_queue"] + super(VOSUser, self).__init__(self.host, self.port, self.db, self.rpcQueue) + + def add(self): + userId = None + username = None + email = None + while not userId: + try: + userId = input("\nUser ID: ") + except ValueError: + print("Input type is not valid!") + except EOFError: + print("\nPlease, use CTRL+C to quit.") + except KeyboardInterrupt: + sys.exit("\nCTRL+C detected. Exiting...") + while not username: + try: + username = input("\nUsername: ") + except ValueError: + print("Input type is not valid!") + except EOFError: + print("\nPlease, use CTRL+C to quit.") + except KeyboardInterrupt: + sys.exit("\nCTRL+C detected. Exiting...") + while not email: + # Add email validation here + try: + email = input("\nE-mail: ") + except ValueError: + print("Input type is not valid!") + except EOFError: + print("\nPlease, use CTRL+C to quit.") + except KeyboardInterrupt: + sys.exit("\nCTRL+C detected. Exiting...") + + userRequest = { "requestType": "USER_ADD", + "userId": userId, + "username": username, + "email": email } + userResponse = self.call(userRequest) + + if "responseType" not in userResponse: + sys.exit("FATAL: Malformed response, acknowledge expected.\n") + elif userResponse["responseType"] == "USER_ADD_DONE": + print("\nUser added successfully!\n") + elif userResponse["responseType"] == "ERROR": + errorCode = userResponse["errorCode"] + errorMsg = userResponse["errorMsg"] + sys.exit(f"\nError code: {errorCode}\nError message: {errorMsg}\n") + else: + sys.exit("\nFATAL: Unknown response type.\n") + + def delete(self): + userRequest = { "requestType": "USER_DEL_REQ" } + userResponse = self.call(userRequest) + + if "responseType" not in userResponse: + sys.exit("FATAL: Malformed response, acknowledge expected.\n") + elif userResponse["responseType"] == "USER_DEL_ACK": + userList = userResponse["userList"] + if not userList: + sys.exit("\nNo username found.\n") + userIdList = [] + for user in userList: + userIdList.append(user["user_id"]) + userId = None + while not userId in userIdList: + try: + userId = input("\nPlease, insert a valid user ID: ") + except ValueError: + print("Input type is not valid!") + except EOFError: + print("\nPlease, use CTRL+C to quit.") + except KeyboardInterrupt: + sys.exit("\nCTRL+C detected. Exiting...") + print("\n!!!!!!!!!!!!!!!!!!!!!!!!WARNING!!!!!!!!!!!!!!!!!!!!!!!!") + print("! This operation will remove the selected user only !") + print("! from the database. !") + print("! The user on the transfer node will not be removed, !") + print("! you must do it manually. !") + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") + confirm = None + while not confirm in ( "yes", "no" ): + try: + confirm = input("Are you sure to proceed? [yes/no]: ") + except KeyboardInterrupt: + sys.exit("\nCTRL+C detected. Exiting...") + except EOFError: + print("\nPlease, use CTRL+C to quit.") + if confirm == "yes": + confirmRequest = { "requestType": "USER_DEL_CON", "userId": userId } + confirmResponse = self.call(confirmRequest) + if "responseType" not in confirmResponse: + sys.exit("\nFATAL: Malformed response, confirmation expected.\n") + elif confirmResponse["responseType"] == "USER_DEL_DONE": + print("\nUser deleted successfully!\n") + elif confirmResponse["responseType"] == "ERROR": + errorCode = confirmResponse["errorCode"] + errorMsg = confirmResponse["errorMsg"] + sys.exit(f"\nError code: {errorCode}\nError message: {errorMsg}\n") + else: + sys.exit("\nFATAL: Unknown response type.\n") + elif userResponse["responseType"] == "ERROR": + errorCode = userResponse["errorCode"] + errorMsg = userResponse["errorMsg"] + sys.exit(f"Error code: {errorCode}\nError message: {errorMsg}\n") + else: + sys.exit("\nFATAL: Unknown response type.\n") + + def search(self, searchStr): + userRequest = { "requestType": "USER_SEARCH", "searchStr": searchStr } + userResponse = self.call(userRequest) + if "responseType" not in userResponse: + sys.exit("FATAL: Malformed response.\n") + elif userResponse["responseType"] == "SEARCH_DONE": + userSearch = userResponse["userSearch"] + if userSearch: + print("\n" + tabulate(userResponse["userSearch"], headers = "keys", tablefmt = "pretty") + "\n") + else: + sys.exit(f"\nThe search did not return any results.\n") + elif userResponse["responseType"] == "ERROR": + errorCode = userResponse["errorCode"] + errorMsg = userResponse["errorMsg"] + sys.exit(f"\nError code: {errorCode}\nError message: {errorMsg}\n") + else: + sys.exit("\nFATAL: Unknown response type.\n") + + def help(self): + sys.exit(""" +NAME + vos_user + +SYNOPSYS + vos_user COMMAND [ARGUMENT] + +DESCRIPTION + Client tool to manage VOSpace users in the database. + + The client accepts only one (mandatory) command at a time. + A list of supported commands is shown here below: + + add + adds a user to the database + + del + deletes a user from the database + + search + performs a search on users and returns those having a match between the search string + passed via command line and one of the following fields: + + 'user_id', 'user_name', 'e_mail' + + Adding a user to the database requires a user ID, a username and an e-mail address. + A valid userID is required when deleting a user from the database. + + IMPORTANT NOTE: + the VOSpace Transfer Service automatically populates the 'users' table in the database + by previously quering the authentication system (RAP). + So, please, use this client only if you need to handle situations involing users that + for some reason are not recognized by the authentication system. + """) + +# Create new VOSUser object +vosUserCli = VOSUser() + +# Check the number of input args +if len(sys.argv) == 2: + script, cmd = sys.argv + if cmd == "add": + vosUserCli.add() + elif cmd == "del": + vosUserCli.delete() + else: + vosUserCli.help() +elif len(sys.argv) == 3: + script, cmd, arg = sys.argv + if cmd == "search": + vosUserCli.search(arg) + else: + vosUserCli.help() +else: + vosUserCli.help() diff --git a/transfer_service/cli_handler.py b/transfer_service/cli_handler.py index e290db2b482b0cbdb8d54b4541868a4a78c9b8ea..35d7192b0e7538a9772c09632948183d44374152 100644 --- a/transfer_service/cli_handler.py +++ b/transfer_service/cli_handler.py @@ -13,6 +13,7 @@ from group_rw_rpc_server import GroupRwRPCServer from import_rpc_server import ImportRPCServer from job_rpc_server import JobRPCServer from storage_rpc_server import StorageRPCServer +from user_rpc_server import UserRPCServer class CliHandler(object): @@ -28,6 +29,8 @@ class CliHandler(object): self.rpcServerList.append(DataRPCServer(self.host, self.port, self.db, rpcQueue)) elif srvType == "group_rw": self.rpcServerList.append(GroupRwRPCServer(self.host, self.port, self.db, rpcQueue)) + elif srvType == "user": + self.rpcServerList.append(UserRPCServer(self.host, self.port, self.db, rpcQueue)) elif srvType == "import": self.rpcServerList.append(ImportRPCServer(self.host, self.port, self.db, rpcQueue)) elif srvType == "storage": diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 52ce7c109a120376ab95310b96c61b1760bdc988..7d280951ce66c604896191217c8037c420b80ddf 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -576,7 +576,7 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def getUserId(self, username): - """Returns the user id for a given user name.""" + """Returns the user id for a given user name (if any), 'False' otherwise .""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) @@ -588,7 +588,10 @@ class DbConnector(object): conn.rollback() raise else: - return result[0]["user_id"] + if result: + return result[0]["user_id"] + else: + return False finally: self.connPool.putconn(conn, close = False) @@ -625,6 +628,49 @@ class DbConnector(object): return result[0]["e_mail"] finally: self.connPool.putconn(conn, close = False) + + def getUserList(self): + """Returns the full user list.""" + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute("SELECT * FROM users;") + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result + finally: + self.connPool.putconn(conn, close = False) + + def searchUsers(self, searchStr): + "Performs a search on users." + conn = self.getConnection() + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + SELECT user_id, + user_name, + e_mail + FROM users + WHERE (user_id ~ %(str)s + OR user_name ~ %(str)s + OR e_mail ~ %(str)s); + """, + { 'str': searchStr }) + result = cursor.fetchall() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return result + finally: + self.connPool.putconn(conn, close = False) ##### Storage ##### @@ -1276,26 +1322,55 @@ class DbConnector(object): ##### Users ##### def insertUser(self, userId, username, email): - """Inserts users data.""" + """ + Inserts a user in the database. + Returns 'True' on success, 'False' otherwise. + """ + conn = self.getConnection() + if not self.getUserId(username): + try: + cursor = conn.cursor(cursor_factory = RealDictCursor) + cursor.execute(""" + INSERT INTO users(user_id, + user_name, + e_mail) + VALUES (%s, %s, %s) + ON CONFLICT (user_id) + DO NOTHING; + """, + (userId, + username, + email,)) + conn.commit() + cursor.close() + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return True + finally: + self.connPool.putconn(conn, close = False) + else: + return False + + def deleteUser(self, userId): + """Deletes a user from the database.""" conn = self.getConnection() try: cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" - INSERT INTO users(user_id, - user_name, - e_mail) - VALUES (%s, %s, %s) - ON CONFLICT (user_id) - DO NOTHING; + DELETE FROM users + WHERE user_id = %s; """, - (userId, - username, - email,)) + (userId,)) conn.commit() cursor.close() except Exception: if not conn.closed: conn.rollback() raise + else: + return True finally: self.connPool.putconn(conn, close = False) diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index ff9d2a374a034dd8dd37c699b8fbea495814d02e..978aa97345c798c88e3b7bea0228d799448c82f4 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -48,6 +48,9 @@ class TransferService(object): # Group self.cliHandler.addRPCServer("group_rw", "group_queue") + + # User + self.cliHandler.addRPCServer("user", "user_queue") # Import self.cliHandler.addRPCServer("import", "import_queue") diff --git a/transfer_service/user_rpc_server.py b/transfer_service/user_rpc_server.py new file mode 100644 index 0000000000000000000000000000000000000000..058e26ef17af997e693b8b216fa4e744bba6fa59 --- /dev/null +++ b/transfer_service/user_rpc_server.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python +# +# This file is part of vospace-transfer-service +# Copyright (C) 2021 Istituto Nazionale di Astrofisica +# SPDX-License-Identifier: GPL-3.0-or-later +# + +import logging +import os + +from redis_log_handler import RedisLogHandler +from redis_rpc_server import RedisRPCServer +from config import Config +from db_connector import DbConnector + + +class UserRPCServer(RedisRPCServer): + + def __init__(self, host, port, db, rpcQueue): + self.type = "user" + config = Config("/etc/vos_ts/vos_ts.conf") + params = config.loadSection("file_catalog") + self.dbConn = DbConnector(params["user"], + params["password"], + params["host"], + params.getint("port"), + params["db"], + 1, + 2) + params = config.loadSection("logging") + self.logger = logging.getLogger(__name__) + logLevel = "logging." + params["log_level"] + logFormat = params["log_format"] + logFormatter = logging.Formatter(logFormat) + self.logger.setLevel(eval(logLevel)) + redisLogHandler = RedisLogHandler() + redisLogHandler.setFormatter(logFormatter) + self.logger.addHandler(redisLogHandler) + super(UserRPCServer, self).__init__(host, port, db, rpcQueue) + + def callback(self, requestBody): + # 'requestType' attribute is mandatory + if "requestType" not in requestBody: + errorMsg = "Malformed request, missing parameters." + response = { "responseType": "ERROR", + "errorCode": 1, + "errorMsg": errorMsg } + elif requestBody["requestType"] == "USER_ADD": + userId = requestBody["userId"] + username = requestBody["username"] + email = requestBody["email"] + try: + result = self.dbConn.insertUser(userId, + username, + email) + except Exception: + errorMsg = "Database error." + self.logger.exception(errorMsg) + response = { "responseType": "ERROR", + "errorCode": 2, + "errorMsg": errorMsg } + return response + else: + if result: + response = { "responseType": "USER_ADD_DONE" } + else: + errorMsg = "User already exists." + self.logger.error(errorMsg) + response = { "responseType": "ERROR", + "errorCode": 4, + "errorMsg": errorMsg } + elif requestBody["requestType"] == "USER_DEL_REQ": + try: + result = self.dbConn.getUserList() + except Exception: + errorMsg = "Database error." + self.logger.exception(errorMsg) + response = { "responseType": "ERROR", + "errorCode": 2, + "errorMsg": errorMsg } + return response + else: + response = { "responseType": "USER_DEL_ACK", + "userList": result } + elif requestBody["requestType"] == "USER_DEL_CON": + userId = requestBody["userId"] + try: + result = self.dbConn.deleteUser(userId) + except Exception: + errorMsg = "Database error." + self.logger.exception(errorMsg) + response = { "responseType": "ERROR", + "errorCode": 2, + "errorMsg": errorMsg } + return response + else: + response = { "responseType": "USER_DEL_DONE" } + elif requestBody["requestType"] == "USER_SEARCH": + searchStr = requestBody["searchStr"] + try: + result = self.dbConn.searchUsers(searchStr) + except Exception: + errorMsg = "Database error." + self.logger.exception(errorMsg) + response = { "responseType": "ERROR", + "errorCode": 2, + "errorMsg": errorMsg } + return response + else: + response = { "responseType": "SEARCH_DONE", + "userSearch": result } + else: + errorMsg = "Unkown request type." + self.logger.error(errorMsg) + response = { "responseType": "ERROR", + "errorCode": 6, + "errorMsg": errorMsg } + return response + + def run(self): + self.logger.info(f"Starting RPC server of type {self.type}...") + super(UserRPCServer, self).run() +