Something went wrong on our end
Select Git revision
-
Giovanni La Mura authored
storage_rpc_server.py 4.13 KiB
#!/usr/bin/env python
import logging
import os
from redis_rpc_server import RedisRPCServer
from config import Config
from db_connector import DbConnector
class StorageRPCServer(RedisRPCServer):
    """RPC server that handles storage-point management requests.

    Every request is a dict-like object with a mandatory ``requestType``
    key; see :meth:`callback` for the supported request types and the
    responses they produce.
    """

    def __init__(self, host, port, db, rpcQueue):
        """Set up the DB connection and logging, then initialise the base server.

        Reads the ``file_catalog`` and ``logging`` sections of
        ``/etc/vos_ts/vos_ts.conf``.

        Args:
            host, port, db, rpcQueue: forwarded unchanged to
                ``RedisRPCServer.__init__``.

        Raises:
            AttributeError: if the configured ``log_level`` is not a name
                defined in the ``logging`` module.
        """
        self.type = "storage"
        config = Config("/etc/vos_ts/vos_ts.conf")
        self.params = config.loadSection("file_catalog")
        # NOTE(review): the trailing 1 and 2 are presumably connection-pool
        # bounds -- confirm against DbConnector.
        self.dbConn = DbConnector(self.params["user"],
                                  self.params["password"],
                                  self.params["host"],
                                  self.params.getint("port"),
                                  self.params["db"],
                                  1,
                                  2)
        params = config.loadSection("logging")
        self.logger = logging.getLogger("StorageRPCServer")
        logDir = params["log_dir"]
        logFile = logDir + '/' + "storage_rpc_server.log"
        # Resolve the level name with getattr instead of eval(): same result
        # for valid names, but a config value can no longer execute code.
        self.logger.setLevel(getattr(logging, params["log_level"]))
        # logging.getLogger() returns the same object for the same name, so
        # only attach handlers once; otherwise each new instance would
        # duplicate every log line and leak a file handle.
        if not self.logger.handlers:
            logFormatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            logFileHandler = logging.FileHandler(logFile)
            logStreamHandler = logging.StreamHandler()
            logFileHandler.setFormatter(logFormatter)
            logStreamHandler.setFormatter(logFormatter)
            self.logger.addHandler(logFileHandler)
            self.logger.addHandler(logStreamHandler)
        # The base class is initialised last, as in the original ordering.
        super().__init__(host, port, db, rpcQueue)

    @staticmethod
    def _error(code, message):
        """Build an ERROR response dict with the given code and message."""
        return {"responseType": "ERROR",
                "errorCode": code,
                "errorMsg": message}

    @staticmethod
    def _hasParams(requestBody, names):
        """Return True when every key in *names* is present in *requestBody*."""
        return all(name in requestBody for name in names)

    def _addStorage(self, requestBody):
        """Handle a STORAGE_ADD request and return its response dict."""
        required = ("storageType", "basePath", "baseUrl", "hostname")
        if not self._hasParams(requestBody, required):
            return self._error(1, "Malformed request, missing parameters.")
        storageType = requestBody["storageType"]
        storageBasePath = requestBody["basePath"]
        # The base path is only checked on the local filesystem for
        # non-"portal" storage types.
        if storageType != "portal" and not os.path.exists(storageBasePath):
            return self._error(2, "Base path doesn't exist.")
        result = self.dbConn.insertStorage(storageType,
                                           storageBasePath,
                                           requestBody["baseUrl"],
                                           requestBody["hostname"])
        if result:
            return {"responseType": "STORAGE_ADD_DONE"}
        return self._error(3, "Storage point already exists.")

    def _deleteStorage(self, requestBody):
        """Handle a STORAGE_DEL_CON request and return its response dict."""
        if not self._hasParams(requestBody, ("storageId",)):
            return self._error(1, "Malformed request, missing parameters.")
        if self.dbConn.deleteStorage(requestBody["storageId"]):
            return {"responseType": "STORAGE_DEL_DONE"}
        return self._error(4, "This storage location contains some VOSpace nodes. Please, move those nodes to another location.")

    def callback(self, requestBody):
        """Dispatch one request and return the response dict.

        Supported ``requestType`` values:

        * ``STORAGE_ADD`` (needs ``storageType``, ``basePath``, ``baseUrl``,
          ``hostname``): ``STORAGE_ADD_DONE``, or ERROR 2 / ERROR 3.
        * ``STORAGE_DEL_REQ``: ``STORAGE_DEL_ACK`` with ``storageList``.
        * ``STORAGE_DEL_CON`` (needs ``storageId``): ``STORAGE_DEL_DONE``,
          or ERROR 4.
        * ``STORAGE_LST``: ``STORAGE_LST_DONE`` with ``storageList``.

        A request without ``requestType``, or without a parameter its type
        requires, yields ERROR 1; any other request type yields ERROR 5.

        Args:
            requestBody: dict-like request payload.

        Returns:
            dict: always contains ``responseType``; ERROR responses also
            carry ``errorCode`` and ``errorMsg``.
        """
        # 'requestType' attribute is mandatory
        if "requestType" not in requestBody:
            return self._error(1, "Malformed request, missing parameters.")
        requestType = requestBody["requestType"]
        if requestType == "STORAGE_ADD":
            return self._addStorage(requestBody)
        if requestType == "STORAGE_DEL_REQ":
            return {"responseType": "STORAGE_DEL_ACK",
                    "storageList": self.dbConn.getStorageList()}
        if requestType == "STORAGE_DEL_CON":
            return self._deleteStorage(requestBody)
        if requestType == "STORAGE_LST":
            return {"responseType": "STORAGE_LST_DONE",
                    "storageList": self.dbConn.getStorageList()}
        # NOTE(review): "Unkown" is a typo, kept byte-for-byte because the
        # string is part of the response sent to clients.
        return self._error(5, "Unkown request type.")

    def run(self):
        """Log a start-up message, then delegate to ``RedisRPCServer.run``."""
        self.logger.info(f"Starting RPC server of type {self.type}...")
        super().run()