Select Git revision
storage_rpc_server.py
-
Cristiano Urban authored
Signed-off-by:
Cristiano Urban <cristiano.urban@inaf.it>
Cristiano Urban authored
Signed-off-by:
Cristiano Urban <cristiano.urban@inaf.it>
storage_rpc_server.py 5.80 KiB
#!/usr/bin/env python
import logging
import os
from redis_log_handler import RedisLogHandler
from redis_rpc_server import RedisRPCServer
from config import Config
from db_connector import DbConnector
class StorageRPCServer(RedisRPCServer):
    """RPC server that answers storage-point management requests.

    Each request is a mapping carrying a mandatory ``requestType`` entry.
    Supported request types and the extra parameters they need:

    * ``STORAGE_ADD``     -- ``storageType``, ``basePath``, ``baseUrl``,
      ``hostname``; registers a new storage point.
    * ``STORAGE_DEL_REQ`` -- no parameters; returns the storage list.
    * ``STORAGE_DEL_CON`` -- ``storageId``; deletes a storage point.
    * ``STORAGE_LST``     -- no parameters; returns the storage list.

    Failures are reported as ``{"responseType": "ERROR", "errorCode": n,
    "errorMsg": text}`` with the following codes:

    1. malformed request (missing parameters)
    2. database error
    3. base path does not exist
    4. storage point already exists
    5. storage location still contains VOSpace nodes
    6. unknown request type
    """

    _MALFORMED_MSG = "Malformed request, missing parameters."
    _DB_ERROR_MSG = "Database error."

    def __init__(self, host, port, db, rpcQueue):
        """Set up the database connection and the logger, then start the base.

        The four arguments are forwarded unchanged to ``RedisRPCServer``.
        Database and logging settings are read from
        ``/etc/vos_ts/vos_ts.conf``.
        """
        self.type = "storage"
        config = Config("/etc/vos_ts/vos_ts.conf")

        params = config.loadSection("file_catalog")
        # NOTE(review): the trailing 1 and 2 are presumably connection-pool
        # bounds -- confirm against DbConnector.
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  2)

        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logFormatter = logging.Formatter(params["log_format"])
        # Resolve the level name (e.g. "DEBUG") as an attribute of the
        # logging module instead of eval()-ing text taken from the config.
        self.logger.setLevel(getattr(logging, params["log_level"]))
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)

        super().__init__(host, port, db, rpcQueue)

    def callback(self, requestBody):
        """Dispatch one request and return the response dictionary.

        Args:
            requestBody: mapping holding ``requestType`` plus the parameters
                required by that request type.

        Returns:
            A dict whose ``responseType`` is either the success marker of
            the request (``STORAGE_ADD_DONE``, ``STORAGE_DEL_ACK``,
            ``STORAGE_DEL_DONE``, ``STORAGE_LST_DONE``) or ``ERROR``.
        """
        # 'requestType' attribute is mandatory
        if "requestType" not in requestBody:
            return self._errorResponse(1, self._MALFORMED_MSG)

        requestType = requestBody["requestType"]
        if requestType == "STORAGE_ADD":
            return self._addStorage(requestBody)
        if requestType == "STORAGE_DEL_REQ":
            return self._storageListResponse("STORAGE_DEL_ACK")
        if requestType == "STORAGE_DEL_CON":
            return self._deleteStorage(requestBody)
        if requestType == "STORAGE_LST":
            return self._storageListResponse("STORAGE_LST_DONE")

        # NOTE(review): the message text (spelling included) is kept as-is
        # because clients may compare against it.
        return self._errorResponse(6, "Unkown request type.")

    def run(self):
        """Log the start-up message and enter the base class serving loop."""
        self.logger.info(f"Starting RPC server of type {self.type}...")
        super().run()

    def _errorResponse(self, errorCode, errorMsg, withTraceback=False):
        """Log ``errorMsg`` and return the matching ERROR response.

        With ``withTraceback`` set the message is logged through
        ``logger.exception``; this must be called while the exception is
        still being handled so the traceback gets attached.
        """
        if withTraceback:
            self.logger.exception(errorMsg)
        else:
            self.logger.error(errorMsg)
        return {"responseType": "ERROR",
                "errorCode": errorCode,
                "errorMsg": errorMsg}

    def _missingParamsResponse(self, requestBody, *names):
        """Return a 'malformed request' response if any of ``names`` is absent.

        Returns ``None`` when every parameter is present.
        """
        if any(name not in requestBody for name in names):
            return self._errorResponse(1, self._MALFORMED_MSG)
        return None

    def _addStorage(self, requestBody):
        """Handle ``STORAGE_ADD``: register a new storage point."""
        malformed = self._missingParamsResponse(
            requestBody, "storageType", "basePath", "baseUrl", "hostname")
        if malformed is not None:
            return malformed

        storageType = requestBody["storageType"]
        storageBasePath = requestBody["basePath"]
        storageBaseUrl = requestBody["baseUrl"]
        storageHostname = requestBody["hostname"]

        # Storage points of type "portal" are exempt from the local
        # base-path existence check.
        if storageType != "portal" and not os.path.exists(storageBasePath):
            return self._errorResponse(3, "Base path doesn't exist.")

        try:
            result = self.dbConn.insertStorage(storageType,
                                               storageBasePath,
                                               storageBaseUrl,
                                               storageHostname)
        except Exception:
            return self._errorResponse(2, self._DB_ERROR_MSG,
                                       withTraceback=True)

        if result:
            return {"responseType": "STORAGE_ADD_DONE"}
        # A falsy result from insertStorage is reported as a duplicate.
        return self._errorResponse(4, "Storage point already exists.")

    def _deleteStorage(self, requestBody):
        """Handle ``STORAGE_DEL_CON``: delete the storage point ``storageId``."""
        malformed = self._missingParamsResponse(requestBody, "storageId")
        if malformed is not None:
            return malformed

        try:
            result = self.dbConn.deleteStorage(requestBody["storageId"])
        except Exception:
            return self._errorResponse(2, self._DB_ERROR_MSG,
                                       withTraceback=True)

        if result:
            return {"responseType": "STORAGE_DEL_DONE"}
        # A falsy result from deleteStorage is reported as "still in use".
        return self._errorResponse(
            5,
            "This storage location contains some VOSpace nodes. Please, move those nodes to another location.")

    def _storageListResponse(self, responseType):
        """Return the storage list wrapped in a ``responseType`` response.

        Shared by ``STORAGE_DEL_REQ`` and ``STORAGE_LST``, which differ only
        in the response type they answer with.
        """
        try:
            result = self.dbConn.getStorageList()
        except Exception:
            return self._errorResponse(2, self._DB_ERROR_MSG,
                                       withTraceback=True)
        return {"responseType": responseType,
                "storageList": result}