#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#

import json
import logging

from redis_log_handler import RedisLogHandler
from redis_rpc_server import RedisRPCServer
from config import Config
from db_connector import DbConnector
from job import Job
from job_queue import JobQueue


class StartJobRPCServer(RedisRPCServer):
    """RPC server that registers a newly started job.

    For every request the job is stored in the database and then pushed
    either onto the 'read_pending' queue (accepted job) or onto the
    'read_terminated' queue (job rejected with phase 'ERROR').

    Error responses returned by ``callback`` use these codes:
        2 -- database error
        3 -- cache (job queue) error
    """

    def __init__(self, host, port, db, rpcQueue):
        """Load the configuration, set up logging, DB access and job queues.

        All four arguments are forwarded unchanged to ``RedisRPCServer``.
        The attributes are populated *before* the base-class initializer
        runs, exactly as in the original ordering.
        """
        self.type = "start"
        config = Config("/etc/vos_ts/vos_ts.conf")

        # Queue size limits.
        params = config.loadSection("scheduling")
        self.maxPendingJobs = params.getint("max_pending_jobs")
        self.maxTerminatedJobs = params.getint("max_terminated_jobs")

        params = config.loadSection("transfer_node")
        self.storageStorePath = params["store_path"]

        # Logging: resolve the configured level name (e.g. "INFO") through
        # attribute lookup on the logging module instead of eval(), so the
        # configuration value is never executed as code.
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        logFormatter = logging.Formatter(params["log_format"])
        self.logger.setLevel(getattr(logging, params["log_level"]))
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)

        # NOTE(review): the literals 1 and 2 are passed positionally to
        # DbConnector; their meaning is not visible from this file.
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  2,
                                  self.logger)

        self.pendingQueueRead = JobQueue("read_pending")
        self.terminatedQueueRead = JobQueue("read_terminated")
        super().__init__(host, port, db, rpcQueue)

    def _errorResponse(self, errorCode, errorMsg):
        """Log the active exception and build an 'ERROR' response dict.

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can attach the current traceback.
        """
        self.logger.exception(errorMsg)
        return {
            "responseType": "ERROR",
            "errorCode": errorCode,
            "errorMsg": errorMsg,
        }

    def _rejectJob(self, job, errorMsg, terminatedQueueLen):
        """Mark *job* as a transient error, persist it and archive it.

        The job is written to the database and appended to the
        'read_terminated' queue; when that queue already holds
        ``maxTerminatedJobs`` entries one job is extracted first.

        Returns an error response dict on failure, ``None`` on success.
        """
        job.setPhase("ERROR")
        job.setErrorType("transient")
        job.setErrorMessage(errorMsg)
        self.logger.warning(errorMsg)

        try:
            self.dbConn.insertJob(job)
        except Exception:
            return self._errorResponse(2, "Database error.")

        try:
            if terminatedQueueLen >= self.maxTerminatedJobs:
                self.terminatedQueueRead.extractJob()
            self.terminatedQueueRead.insertJob(job)
        except Exception:
            return self._errorResponse(3, "Cache error.")
        return None

    def _enqueueJob(self, job, phase):
        """Persist an accepted *job* with *phase* and queue it as pending.

        Returns an error response dict on failure, ``None`` on success.
        """
        job.setPhase(phase)
        try:
            self.dbConn.insertJob(job)
        except Exception:
            return self._errorResponse(2, "Database error.")

        try:
            self.pendingQueueRead.insertJob(job)
        except Exception:
            return self._errorResponse(3, "Cache error.")
        return None

    def callback(self, requestBody):
        """Handle a 'start job' request.

        ``requestBody["job"]`` must provide ``jobId``, ``ownerId``,
        ``phase`` and ``jobInfo`` (with ``jobInfo["transfer"]["direction"]``).

        Returns whatever ``dbConn.getJob`` yields for the stored job, or a
        dict ``{"responseType": "ERROR", "errorCode": ..., "errorMsg": ...}``
        when the database (code 2) or the cache (code 3) fails.
        """
        jobData = requestBody["job"]
        ownerId = jobData["ownerId"]

        job = Job()
        job.setId(jobData["jobId"])
        job.setType(jobData["jobInfo"]["transfer"]["direction"])
        job.setInfo(jobData["jobInfo"])
        job.setOwnerId(ownerId)

        try:
            pendingQueueLen = self.pendingQueueRead.len()
            terminatedQueueLen = self.terminatedQueueRead.len()
        except Exception:
            return self._errorResponse(3, "Cache error.")

        rejectionMsg = None
        if pendingQueueLen >= self.maxPendingJobs:
            # The 'read_pending' queue is full.
            rejectionMsg = "Pending queue is full, please, retry later."
        else:
            # The user must already exist in the VOSpace database (created
            # through the 'vos_user' admin command). The lookup is guarded
            # like every other database call so that a DB failure yields
            # the standard error response instead of an unhandled exception.
            try:
                userName = self.dbConn.getUserName(ownerId)
            except Exception:
                return self._errorResponse(2, "Database error.")
            if not userName:
                rejectionMsg = "The user is registered in the authentication system (RAP), but is not present into the 'users' table of the VOSpace database."

        if rejectionMsg is not None:
            failure = self._rejectJob(job, rejectionMsg, terminatedQueueLen)
        else:
            failure = self._enqueueJob(job, jobData["phase"])
        if failure is not None:
            return failure

        # Reply with the job as it is now stored in the database.
        try:
            return self.dbConn.getJob(job.jobId)
        except Exception:
            return self._errorResponse(2, "Database error.")

    def run(self):
        """Log the server start-up and enter the base-class serving loop."""
        self.logger.info(f"Starting RPC server of type {self.type}...")
        super().run()