#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#

import logging
import os

from redis_log_handler import RedisLogHandler
from redis_rpc_server import RedisRPCServer
from config import Config
from db_connector import DbConnector
from tape_client import TapeClient


class StorageRPCServer(RedisRPCServer):
    """RPC server answering storage-point management requests.

    Supported request types (value of the "requestType" key):

    * STORAGE_ADD      -- register a new storage point
    * STORAGE_DEL_REQ  -- return the storage list before a deletion
    * STORAGE_DEL_CON  -- delete the storage point with the given id
    * STORAGE_LST      -- return the storage list
    * TAPE_HSM_FS_LST  -- return the tape HSM filesystem list

    Every request is answered with a dict carrying a "responseType" key;
    failures use "responseType": "ERROR" plus "errorCode" and "errorMsg".
    """

    def __init__(self, host, port, db, rpcQueue):
        """Load the configuration, set up logging, DB and tape clients.

        The four arguments are forwarded unchanged to RedisRPCServer.
        """
        self.type = "storage"
        config = Config("/etc/vos_ts/vos_ts.conf")

        params = config.loadSection("transfer_node")
        # Keep only the part of the configured path that precedes the first
        # "/{" (presumably the start of a templated suffix -- TODO confirm).
        self.vospaceUserBasePath = params["base_path"].split("/{")[0]

        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logFormatter = logging.Formatter(params["log_format"])
        self.logger.setLevel(self._resolveLogLevel(params["log_level"]))
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)

        params = config.loadSection("file_catalog")
        # NOTE(review): the literals 1 and 2 are passed through as-is; they
        # look like connection pool bounds -- confirm against DbConnector.
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  2,
                                  self.logger)

        params = config.loadSection("spectrum_protect")
        self.tapeClient = TapeClient(params["host"],
                                     params.getint("port"),
                                     params["user"],
                                     params["pkey_file_path"],
                                     self.logger)

        # The base class is initialised last, as in the original ordering,
        # so that self.type and the clients exist before it runs.
        super().__init__(host, port, db, rpcQueue)

    @staticmethod
    def _resolveLogLevel(levelName):
        """Map a level name such as "DEBUG" to its numeric logging level.

        A plain attribute lookup on the logging module is used instead of
        eval(), so the configuration value is never executed as code.

        Raises ValueError when the name is not an integer logging level.
        """
        level = getattr(logging, levelName.strip(), None)
        if not isinstance(level, int):
            raise ValueError(f"Invalid log level in configuration: {levelName!r}")
        return level

    def _errorResponse(self, errorCode, errorMsg, withTraceback=False):
        """Log errorMsg and build the matching ERROR response dict.

        With withTraceback=True the message is logged through
        logger.exception, so this must then be called from an except block.
        """
        if withTraceback:
            self.logger.exception(errorMsg)
        else:
            self.logger.error(errorMsg)
        return {"responseType": "ERROR",
                "errorCode": errorCode,
                "errorMsg": errorMsg}

    def _malformedRequest(self):
        """Return the ERROR response used for requests lacking parameters."""
        return self._errorResponse(1, "Malformed request, missing parameters.")

    def _handleStorageAdd(self, requestBody):
        """Register a new storage point (STORAGE_ADD)."""
        requiredFields = ("storageType", "basePath", "hostname", "tapeHSMFilesystem")
        if any(field not in requestBody for field in requiredFields):
            return self._malformedRequest()

        storageType = requestBody["storageType"]
        storageBasePath = requestBody["basePath"]
        storageHostname = requestBody["hostname"]
        tapeHSMFilesystem = requestBody["tapeHSMFilesystem"]

        if not os.path.exists(storageBasePath):
            return self._errorResponse(3, "Base path doesn't exist.")

        try:
            result = self.dbConn.insertStorage(storageType,
                                               storageBasePath,
                                               storageHostname,
                                               self.vospaceUserBasePath,
                                               tapeHSMFilesystem)
        except Exception:
            return self._errorResponse(2, "Database error.", withTraceback=True)

        # A falsy result from insertStorage is reported as a duplicate.
        if result:
            return {"responseType": "STORAGE_ADD_DONE"}
        return self._errorResponse(4, "Storage point already exists.")

    def _storageListResponse(self, responseType):
        """Fetch the storage list and wrap it in a responseType response."""
        try:
            result = self.dbConn.getStorageList()
        except Exception:
            return self._errorResponse(2, "Database error.", withTraceback=True)
        return {"responseType": responseType, "storageList": result}

    def _handleStorageDeleteRequest(self, requestBody):
        """Return the storage list ahead of a deletion (STORAGE_DEL_REQ)."""
        return self._storageListResponse("STORAGE_DEL_ACK")

    def _handleStorageList(self, requestBody):
        """Return the storage list (STORAGE_LST)."""
        return self._storageListResponse("STORAGE_LST_DONE")

    def _handleStorageDeleteConfirm(self, requestBody):
        """Delete the storage point given by "storageId" (STORAGE_DEL_CON)."""
        if "storageId" not in requestBody:
            return self._malformedRequest()

        try:
            result = self.dbConn.deleteStorage(requestBody["storageId"])
        except Exception:
            return self._errorResponse(2, "Database error.", withTraceback=True)

        # A falsy result from deleteStorage is reported as "still in use".
        if result:
            return {"responseType": "STORAGE_DEL_DONE"}
        return self._errorResponse(
            5,
            "This storage location contains some VOSpace nodes. "
            "Please, move those nodes to another location."
        )

    def _handleTapeHSMFilesystemList(self, requestBody):
        """Return the tape HSM filesystem list (TAPE_HSM_FS_LST)."""
        try:
            self.tapeClient.connect()
            try:
                tapeHSMFilesystemList = self.tapeClient.getHSMFilesystemList()
            finally:
                # Always release the connection once it has been opened,
                # even when fetching the list fails.
                self.tapeClient.disconnect()
        except Exception:
            return self._errorResponse(6,
                                       "Unable to get tape HSM filesystem list.",
                                       withTraceback=True)
        return {"responseType": "TAPE_HSM_FS_LST_DONE",
                "tapeHSMFilesystemList": tapeHSMFilesystemList}

    def callback(self, requestBody):
        """Dispatch a request to its handler and return the response dict.

        requestBody must contain a "requestType" key; requests without it,
        or without the fields their type needs, get an ERROR response with
        errorCode 1. Unrecognised request types get errorCode 7.
        """
        # 'requestType' attribute is mandatory
        if "requestType" not in requestBody:
            return self._malformedRequest()

        handlers = {
            "STORAGE_ADD": self._handleStorageAdd,
            "STORAGE_DEL_REQ": self._handleStorageDeleteRequest,
            "STORAGE_DEL_CON": self._handleStorageDeleteConfirm,
            "STORAGE_LST": self._handleStorageList,
            "TAPE_HSM_FS_LST": self._handleTapeHSMFilesystemList,
        }
        requestType = requestBody["requestType"]
        # Only strings can name a handler; this also keeps an unhashable
        # value from raising inside the dict lookup.
        handler = handlers.get(requestType) if isinstance(requestType, str) else None
        if handler is None:
            # The message text (including its spelling) is kept unchanged
            # because clients may match on it.
            return self._errorResponse(7, "Unkown request type.")
        return handler(requestBody)

    def run(self):
        """Log the start-up message and enter the base class serving loop."""
        self.logger.info(f"Starting RPC server of type {self.type}...")
        super(StorageRPCServer, self).run()