From 2e1759d03e866bb537f17312aa1265efedc5f9c9 Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Thu, 6 May 2021 18:21:15 +0200 Subject: [PATCH] Removed RabbitMQ + started Redis integration. Signed-off-by: Cristiano Urban --- docker-compose.yml | 32 ++++++----------------- transfer_service/abort_job_amqp_server.py | 10 +++---- transfer_service/cli_handler.py | 11 ++++---- transfer_service/config/vos_ts.conf | 2 +- transfer_service/data_amqp_server.py | 11 ++++---- transfer_service/get_job_amqp_server.py | 10 +++---- transfer_service/import_amqp_server.py | 10 +++---- transfer_service/job_amqp_server.py | 10 +++---- transfer_service/start_job_amqp_server.py | 10 +++---- transfer_service/storage_amqp_server.py | 10 +++---- transfer_service/transfer_service.py | 6 ++--- transfer_service/vospace_rest_handler.py | 9 ++++--- 12 files changed, 59 insertions(+), 72 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1c22025..66595fc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,39 +3,23 @@ services: base: image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/base container_name: base - postgres: - image: git.ia2.inaf.it:5050/vospace/vospace-file-catalog - build: ../vospace-file-catalog - container_name: file_catalog - networks: - - backend_net - ports: - - "5432:5432" redis: image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/job_cache container_name: job_cache - depends_on: - - postgres networks: - backend_net ports: - "6379:6379" - rabbitmq: - image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/rabbitmq - volumes: - - ./conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - container_name: rabbitmq - environment: - - RABBITMQ_LOGS=/var/log/rabbitmq/rabbit.log - depends_on: + postgres: + image: git.ia2.inaf.it:5050/vospace/vospace-file-catalog + build: ../vospace-file-catalog + container_name: file_catalog + depends_on: - redis networks: - backend_net ports: - - "5672:5672" - - "15672:15672" - stdin_open: true - tty: true + - "5432:5432" transfer_service: image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/transfer_service volumes: @@ -45,12 +29,12 @@ services: build: ./transfer_service container_name: transfer_service depends_on: - - rabbitmq + - postgres networks: - backend_net stdin_open: true tty: true - command: ["./wait-for-it.sh", "rabbitmq:5672", "--timeout=30", "--", "bash", "start.sh"] + command: ["./wait-for-it.sh", "postgres:5432", "--timeout=30", "--", "bash", "start.sh"] client: image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/client build: ./client diff --git a/transfer_service/abort_job_amqp_server.py b/transfer_service/abort_job_amqp_server.py index 641b539..21fa61b 100644 --- a/transfer_service/abort_job_amqp_server.py +++ b/transfer_service/abort_job_amqp_server.py @@ -1,13 +1,13 @@ #!/usr/bin/env python -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from db_connector import DbConnector from config import Config -class AbortJobAMQPServer(AMQPServer): +class AbortJobAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "abort" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -18,9 +18,9 @@ class AbortJobAMQPServer(AMQPServer): self.params["db"], 8, 16) - super(AbortJobAMQPServer, self).__init__(host, port, queue) + super(AbortJobAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): #TODO # do something... return 42 diff --git a/transfer_service/cli_handler.py b/transfer_service/cli_handler.py index dd68e87..01d6ec2 100644 --- a/transfer_service/cli_handler.py +++ b/transfer_service/cli_handler.py @@ -10,20 +10,21 @@ from storage_amqp_server import StorageAMQPServer class CliHandler(object): - def __init__(self, host, port): + def __init__(self, host, port, db): self.host = host self.port = port + self.db = db self.amqpServerList = [] def addAMQPServer(self, srvType, rpcQueue): if srvType == 'data': - self.amqpServerList.append(DataAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(DataAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'import': - self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(ImportAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'storage': - self.amqpServerList.append(StorageAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(StorageAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'job': - self.amqpServerList.append(JobAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(JobAMQPServer(self.host, self.port, self.db, rpcQueue)) else: sys.exit(f"FATAL: unknown server type {srvType}.") diff --git a/transfer_service/config/vos_ts.conf b/transfer_service/config/vos_ts.conf index ab63584..dd8a388 100644 --- a/transfer_service/config/vos_ts.conf +++ b/transfer_service/config/vos_ts.conf @@ -18,7 +18,7 @@ password = postgres # Redis [job_cache] ; hostname or IP address of the machine that hosts the Redis cache system -host = redis +host = job_cache ; port at which the cache service is available, default is 6379 TCP port = 6379 ; db index representing the db that stores the scheduling queues, default is 0 diff --git a/transfer_service/data_amqp_server.py b/transfer_service/data_amqp_server.py index e624f33..bbe337e 100644 --- a/transfer_service/data_amqp_server.py +++ b/transfer_service/data_amqp_server.py @@ -10,7 +10,7 @@ import json import os import sys -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector from job import Job @@ -18,9 +18,9 @@ from job_queue import JobQueue from system_utils import SystemUtils -class DataAMQPServer(AMQPServer): +class DataAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "data" self.storeAck = False config = Config("/etc/vos_ts/vos_ts.conf") @@ -38,9 +38,9 @@ class DataAMQPServer(AMQPServer): self.maxPendingJobs = self.params.getint("max_pending_jobs") self.pendingQueueWrite = JobQueue("write_pending") self.systemUtils = SystemUtils() - super(DataAMQPServer, self).__init__(host, port, queue) + super(DataAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # 'requestType' and 'userName' attributes are mandatory if "requestType" not in requestBody or "userName" not in requestBody: response = { "responseType":"ERROR", @@ -134,3 +134,4 @@ class DataAMQPServer(AMQPServer): def run(self): print(f"Starting AMQP server of type {self.type}...") super(DataAMQPServer, self).run() + diff --git a/transfer_service/get_job_amqp_server.py b/transfer_service/get_job_amqp_server.py index a8a8eff..3899b39 100644 --- a/transfer_service/get_job_amqp_server.py +++ b/transfer_service/get_job_amqp_server.py @@ -2,14 +2,14 @@ import json -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector -class GetJobAMQPServer(AMQPServer): +class GetJobAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "poll" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -20,9 +20,9 @@ class GetJobAMQPServer(AMQPServer): self.params["db"], 8, 16) - super(GetJobAMQPServer, self).__init__(host, port, queue) + super(GetJobAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): if "jobId" in requestBody: dbResponse = self.dbConn.getJob(requestBody["jobId"]) print(f"Db response: {dbResponse}") diff --git a/transfer_service/import_amqp_server.py b/transfer_service/import_amqp_server.py index 91bedd7..7235b77 100644 --- a/transfer_service/import_amqp_server.py +++ b/transfer_service/import_amqp_server.py @@ -3,7 +3,7 @@ import os import re -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from checksum import Checksum from datetime import datetime as dt @@ -16,9 +16,9 @@ from tape_client import TapeClient from multiprocessing import Process -class ImportAMQPServer(AMQPServer): +class ImportAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "import" self.md5calc = Checksum() config = Config("/etc/vos_ts/vos_ts.conf") @@ -36,9 +36,9 @@ class ImportAMQPServer(AMQPServer): self.params["user"], self.params["pkey_file_path"]) self.systemUtils = SystemUtils() - super(ImportAMQPServer, self).__init__(host, port, queue) + super(ImportAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # 'requestType' and 'path' attributes are mandatory if "requestType" not in requestBody or "path" not in requestBody: response = { "responseType": "ERROR", diff --git a/transfer_service/job_amqp_server.py b/transfer_service/job_amqp_server.py index 257e727..fa8d9c6 100644 --- a/transfer_service/job_amqp_server.py +++ b/transfer_service/job_amqp_server.py @@ -2,14 +2,14 @@ import os -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector -class JobAMQPServer(AMQPServer): +class JobAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "job" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -20,9 +20,9 @@ class JobAMQPServer(AMQPServer): self.params["db"], 8, 16) - super(JobAMQPServer, self).__init__(host, port, queue) + super(JobAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # 'requestType' attribute is mandatory if "requestType" not in requestBody: response = { "responseType": "ERROR", diff --git a/transfer_service/start_job_amqp_server.py b/transfer_service/start_job_amqp_server.py index 143d4a8..cf6ddbf 100644 --- a/transfer_service/start_job_amqp_server.py +++ b/transfer_service/start_job_amqp_server.py @@ -2,16 +2,16 @@ import json -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector from job import Job from job_queue import JobQueue -class StartJobAMQPServer(AMQPServer): +class StartJobAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "start" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -27,9 +27,9 @@ class StartJobAMQPServer(AMQPServer): self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") self.pendingQueueRead = JobQueue("read_pending") self.terminatedQueueRead = JobQueue("read_terminated") - super(StartJobAMQPServer, self).__init__(host, port, queue) + super(StartJobAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # debug block... out = open("start_job_amqp_server_log.txt", "a") out.write(json.dumps(requestBody)) diff --git a/transfer_service/storage_amqp_server.py b/transfer_service/storage_amqp_server.py index 6f2df44..09a247e 100644 --- a/transfer_service/storage_amqp_server.py +++ b/transfer_service/storage_amqp_server.py @@ -2,14 +2,14 @@ import os -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector -class StorageAMQPServer(AMQPServer): +class StorageAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "storage" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -20,9 +20,9 @@ class StorageAMQPServer(AMQPServer): self.params["db"], 8, 16) - super(StorageAMQPServer, self).__init__(host, port, queue) + super(StorageAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # 'requestType' attribute is mandatory if "requestType" not in requestBody: response = { "responseType": "ERROR", diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index a9ec0f9..fbb7903 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -10,9 +10,9 @@ class TransferService(object): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("amqp") - self.cliHandler = CliHandler(self.params["host"], self.params.getint("port")) - self.vosRestHandler = VOSpaceRestHandler(self.params["host"], self.params.getint("port")) + self.params = config.loadSection("job_cache") + self.cliHandler = CliHandler(self.params["host"], self.params.getint("port"), self.params.getint("db_sched")) + self.vosRestHandler = VOSpaceRestHandler(self.params["host"], self.params.getint("port"), self.params.getint("db_sched")) self.jobScheduler = JobScheduler() # PullToVOSpace (via REST API) diff --git a/transfer_service/vospace_rest_handler.py b/transfer_service/vospace_rest_handler.py index e6a15a7..0707100 100644 --- a/transfer_service/vospace_rest_handler.py +++ b/transfer_service/vospace_rest_handler.py @@ -10,18 +10,19 @@ from abort_job_amqp_server import AbortJobAMQPServer class VOSpaceRestHandler(object): - def __init__(self, host, port): + def __init__(self, host, port, db): self.host = host self.port = port + self.db = db self.amqpServerList = [] def addAMQPServer(self, srvType, rpcQueue): if srvType == 'start': - self.amqpServerList.append(StartJobAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(StartJobAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'poll': - self.amqpServerList.append(GetJobAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(GetJobAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'abort': - self.amqpServerList.append(AbortJobAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(AbortJobAMQPServer(self.host, self.port, self.db, rpcQueue)) else: sys.exit(f"FATAL: unknown server type {srvType}.") -- GitLab