Skip to content
Snippets Groups Projects
Select Git revision
  • 1dc0d2ea551cc32e00c03dc916d4a3e0bd2de13d
  • main default protected
  • 1.8.5
  • 1.8.4
  • 1.8.3
  • 1.8.2
  • 1.8.1
  • 1.8.0
  • 1.7.14
  • 1.7.13
  • 1.7.12
  • 1.7.11
  • 1.7.10
  • 1.7.9
  • 1.7.8
  • 1.7.7
  • 1.7.6
  • 1.7.5
  • 1.7.4
  • 1.7.3
  • 1.7.2
  • 1.7.1
22 results

SodaImpl.java

Blame
  • storage_rpc_server.py 5.80 KiB
    #!/usr/bin/env python
    
    import logging
    import os
    
    from redis_log_handler import RedisLogHandler
    from redis_rpc_server import RedisRPCServer
    from config import Config
    from db_connector import DbConnector
    
    
    class StorageRPCServer(RedisRPCServer):
    
        def __init__(self, host, port, db, rpcQueue):
            self.type = "storage"
            config = Config("/etc/vos_ts/vos_ts.conf")
            params = config.loadSection("file_catalog")
            self.dbConn = DbConnector(params["user"],
                                      params["password"],
                                      params["host"],
                                      params.getint("port"),
                                      params["db"],
                                      1,
                                      2)
            params = config.loadSection("logging")
            self.logger = logging.getLogger(__name__)
            logLevel = "logging." + params["log_level"]
            logFormat = params["log_format"]
            logFormatter = logging.Formatter(logFormat)
            self.logger.setLevel(eval(logLevel))
            redisLogHandler = RedisLogHandler()
            #logStreamHandler = logging.StreamHandler()
            #logStreamHandler.setFormatter(logFormatter)
            redisLogHandler.setFormatter(logFormatter)
            self.logger.addHandler(redisLogHandler)
            #self.logger.addHandler(logStreamHandler)
            super(StorageRPCServer, self).__init__(host, port, db, rpcQueue)
    
        def callback(self, requestBody):
            # 'requestType' attribute is mandatory
            if "requestType" not in requestBody:
                errorMsg = "Malformed request, missing parameters."
                response = { "responseType": "ERROR",
                             "errorCode": 1,
                             "errorMsg": errorMsg }
            elif requestBody["requestType"] == "STORAGE_ADD":
                storageType = requestBody["storageType"]
                storageBasePath = requestBody["basePath"]
                storageBaseUrl = requestBody["baseUrl"]
                storageHostname = requestBody["hostname"]
                if storageType != "portal":
                    if not os.path.exists(storageBasePath):
                        errorMsg = "Base path doesn't exist."
                        self.logger.error(errorMsg)
                        response = { "responseType": "ERROR",
                                     "errorCode": 3,
                                     "errorMsg": errorMsg }
                        return response
                try:
                    result = self.dbConn.insertStorage(storageType,
                                                       storageBasePath,
                                                       storageBaseUrl,
                                                       storageHostname)
                except Exception:
                    errorMsg = "Database error."
                    self.logger.exception(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 2,
                                 "errorMsg": errorMsg }
                    return response
                else:
                    if result:
                        response = { "responseType": "STORAGE_ADD_DONE" }
                    else:
                        errorMsg = "Storage point already exists."
                        self.logger.error(errorMsg)
                        response = { "responseType": "ERROR",
                                     "errorCode": 4,
                                     "errorMsg": errorMsg }
            elif requestBody["requestType"] == "STORAGE_DEL_REQ":
                try:
                    result = self.dbConn.getStorageList()
                except Exception:
                    errorMsg = "Database error."
                    self.logger.exception(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 2,
                                 "errorMsg": errorMsg }
                    return response
                else:
                    response = { "responseType": "STORAGE_DEL_ACK",
                                 "storageList": result }
            elif requestBody["requestType"] == "STORAGE_DEL_CON":
                storageId = requestBody["storageId"]
                try:
                    result = self.dbConn.deleteStorage(storageId)
                except Exception:
                    errorMsg = "Database error."
                    self.logger.exception(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 2,
                                 "errorMsg": errorMsg }
                    return response
                else:
                    if result:
                        response = { "responseType": "STORAGE_DEL_DONE" }
                    else:
                        errorMsg = "This storage location contains some VOSpace nodes. Please, move those nodes to another location."
                        self.logger.error(errorMsg)
                        response = { "responseType": "ERROR",
                                     "errorCode": 5,
                                     "errorMsg": errorMsg }
            elif requestBody["requestType"] == "STORAGE_LST":
                try:
                    result = self.dbConn.getStorageList()
                except Exception:
                    errorMsg = "Database error."
                    self.logger.exception(errorMsg)
                    response = { "responseType": "ERROR",
                                 "errorCode": 2,
                                 "errorMsg": errorMsg }
                    return response
                else:
                    response = { "responseType": "STORAGE_LST_DONE",
                                 "storageList": result }
    
            else:
                errorMsg = "Unkown request type."
                self.logger.error(errorMsg)
                response = { "responseType": "ERROR",
                             "errorCode": 6,
                             "errorMsg": errorMsg }
            return response
    
        def run(self):
            self.logger.info(f"Starting RPC server of type {self.type}...")
            super(StorageRPCServer, self).run()