diff --git a/transfer_service/job_handler.py b/transfer_service/job_handler.py index 6b95db0cf1770c5fad2c6c2ab7b647513101cbef..7f2451e8130b3a58dbd5902b5092780c45235c93 100644 --- a/transfer_service/job_handler.py +++ b/transfer_service/job_handler.py @@ -3,6 +3,7 @@ import sys from import_amqp_server import ImportAMQPServer from start_job_amqp_server import StartJobAMQPServer +from storage_amqp_server import StorageAMQPServer from get_job_amqp_server import GetJobAMQPServer from abort_job_amqp_server import AbortJobAMQPServer from store_amqp_server import StoreAMQPServer @@ -26,6 +27,8 @@ class JobHandler(object): self.amqpServerList.append(StoreAMQPServer(self.host, self.port, rpcQueue)) elif srvType == 'import': self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue)) + elif srvType == 'storage': + self.amqpServerList.append(StorageAMQPServer(self.host, self.port, rpcQueue)) else: sys.exit(f"FATAL: unknown server type {srvType}.") diff --git a/transfer_service/storage_amqp_server.py b/transfer_service/storage_amqp_server.py new file mode 100644 index 0000000000000000000000000000000000000000..1ed035673b95d712f33aeacafb732e936ac798af --- /dev/null +++ b/transfer_service/storage_amqp_server.py @@ -0,0 +1,61 @@ +import os + +from amqp_server import AMQPServer +from config import Config +from db_connector import DbConnector + + +class StorageAMQPServer(AMQPServer): + + def __init__(self, host, port, queue): + self.type = "storage" + config = Config("vos_ts.conf") + self.params = config.loadSection("file_catalog") + self.dbConn = DbConnector(self.params["user"], + self.params["password"], + self.params["host"], + self.params.getint("port"), + self.params["db"]) + self.storageType = None + self.storageBasePath = None + self.storageHostname = None + super(StorageAMQPServer, self).__init__(host, port, queue) + + def execute_callback(self, requestBody): + # 'requestType', 'mountPoint', 'hostname' and 'storageType' attributes are mandatory + if "requestType" not in requestBody: + response = { "responseType": "ERROR", + "errorCode": 1, + "errorMsg": "Malformed request, missing parameters." } + elif requestBody["requestType"] == "STORAGE_ADD": + self.storageType = requestBody["storageType"] + self.storageBasePath = requestBody["basePath"] + self.storageHostname = requestBody["hostname"] + + if not os.path.exists(self.storageBasePath): + response = { "responseType": "ERROR", + "errorCode": 2, + "errorMsg": "Base path doesn't exist."} + return response + self.dbConn.connect() + result = self.dbConn.insertStorage(self.storageType, + self.storageBasePath, + self.storageHostname) + self.dbConn.disconnect() + + if result: + response = { "responseType": "STORAGE_ADD_DONE" } + return response + else: + response = { "responseType": "ERROR", + "errorCode": 3, + "errorMsg": "Storage point already exists." } + return response + elif requestBody["requestType"] == "STORAGE_RMV": + pass + elif requestBody["requestType"] == "STORAGE_LST": + pass + + def run(self): + print(f"Starting AMQP server of type {self.type}...") + super(StorageAMQPServer, self).run() diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index 1a3d56b3d1df91e6e66e672426453056f2a68b65..b3364711d612e1e62a30f2f76b9011fac039fe9d 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -24,6 +24,9 @@ class TransferService(object): # Import self.jobHandler.addAMQPServer('import', 'import_queue') + + # Storage + self.jobHandler.addAMQPServer('storage', 'storage_queue') def start(self): self.jobScheduler.start()