diff --git a/docker-compose.yml b/docker-compose.yml index 1c2202578f1b00ef43277c84affa5b2e3bc25a3f..66595fc1bc5bd1ed96ba997465841b99fe8b38ac 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,39 +3,23 @@ services: base: image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/base container_name: base - postgres: - image: git.ia2.inaf.it:5050/vospace/vospace-file-catalog - build: ../vospace-file-catalog - container_name: file_catalog - networks: - - backend_net - ports: - - "5432:5432" redis: image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/job_cache container_name: job_cache - depends_on: - - postgres networks: - backend_net ports: - "6379:6379" - rabbitmq: - image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/rabbitmq - volumes: - - ./conf/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf - container_name: rabbitmq - environment: - - RABBITMQ_LOGS=/var/log/rabbitmq/rabbit.log - depends_on: + postgres: + image: git.ia2.inaf.it:5050/vospace/vospace-file-catalog + build: ../vospace-file-catalog + container_name: file_catalog + depends_on: - redis networks: - backend_net ports: - - "5672:5672" - - "15672:15672" - stdin_open: true - tty: true + - "5432:5432" transfer_service: image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/transfer_service volumes: @@ -45,12 +29,12 @@ services: build: ./transfer_service container_name: transfer_service depends_on: - - rabbitmq + - postgres networks: - backend_net stdin_open: true tty: true - command: ["./wait-for-it.sh", "rabbitmq:5672", "--timeout=30", "--", "bash", "start.sh"] + command: ["./wait-for-it.sh", "postgres:5432", "--timeout=30", "--", "bash", "start.sh"] client: image: git.ia2.inaf.it:5050/vospace/vospace-transfer-service/client build: ./client diff --git a/transfer_service/abort_job_amqp_server.py b/transfer_service/abort_job_amqp_server.py index 641b539b8b5aac573233be7d8c7bfb6bc21b6d46..21fa61b1480926b350541350fd5041aa170ff23d 100644 --- a/transfer_service/abort_job_amqp_server.py +++ b/transfer_service/abort_job_amqp_server.py @@ -1,13 +1,13 @@ #!/usr/bin/env python -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from db_connector import DbConnector from config import Config -class AbortJobAMQPServer(AMQPServer): +class AbortJobAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "abort" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -18,9 +18,9 @@ class AbortJobAMQPServer(AMQPServer): self.params["db"], 8, 16) - super(AbortJobAMQPServer, self).__init__(host, port, queue) + super(AbortJobAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): #TODO # do something... return 42 diff --git a/transfer_service/cli_handler.py b/transfer_service/cli_handler.py index dd68e87ffbc0c7fe8376696243626d474f6d5a9f..01d6ec2b442d6b62191aa14ef1c6f85f20deb705 100644 --- a/transfer_service/cli_handler.py +++ b/transfer_service/cli_handler.py @@ -10,20 +10,21 @@ from storage_amqp_server import StorageAMQPServer class CliHandler(object): - def __init__(self, host, port): + def __init__(self, host, port, db): self.host = host self.port = port + self.db = db self.amqpServerList = [] def addAMQPServer(self, srvType, rpcQueue): if srvType == 'data': - self.amqpServerList.append(DataAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(DataAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'import': - self.amqpServerList.append(ImportAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(ImportAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'storage': - self.amqpServerList.append(StorageAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(StorageAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'job': - self.amqpServerList.append(JobAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(JobAMQPServer(self.host, self.port, self.db, rpcQueue)) else: sys.exit(f"FATAL: unknown server type {srvType}.") diff --git a/transfer_service/config/vos_ts.conf b/transfer_service/config/vos_ts.conf index ab635849e603abb32c02377b060c4960ac543ed3..dd8a3884faae5787bb1c9e4d15e5ede843cd06a6 100644 --- a/transfer_service/config/vos_ts.conf +++ b/transfer_service/config/vos_ts.conf @@ -18,7 +18,7 @@ password = postgres # Redis [job_cache] ; hostname or IP address of the machine that hosts the Redis cache system -host = redis +host = job_cache ; port at which the cache service is available, default is 6379 TCP port = 6379 ; db index representing the db that stores the scheduling queues, default is 0 diff --git a/transfer_service/data_amqp_server.py b/transfer_service/data_amqp_server.py index e624f338e3f3aca4cacfe40e3ac5611c3685110a..bbe337e0204aa0f63834e47abf5eb9335f6c1085 100644 --- a/transfer_service/data_amqp_server.py +++ b/transfer_service/data_amqp_server.py @@ -10,7 +10,7 @@ import json import os import sys -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector from job import Job @@ -18,9 +18,9 @@ from job_queue import JobQueue from system_utils import SystemUtils -class DataAMQPServer(AMQPServer): +class DataAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "data" self.storeAck = False config = Config("/etc/vos_ts/vos_ts.conf") @@ -38,9 +38,9 @@ class DataAMQPServer(AMQPServer): self.maxPendingJobs = self.params.getint("max_pending_jobs") self.pendingQueueWrite = JobQueue("write_pending") self.systemUtils = SystemUtils() - super(DataAMQPServer, self).__init__(host, port, queue) + super(DataAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # 'requestType' and 'userName' attributes are mandatory if "requestType" not in requestBody or "userName" not in requestBody: response = { "responseType":"ERROR", @@ -134,3 +134,4 @@ class DataAMQPServer(AMQPServer): def run(self): print(f"Starting AMQP server of type {self.type}...") super(DataAMQPServer, self).run() + diff --git a/transfer_service/get_job_amqp_server.py b/transfer_service/get_job_amqp_server.py index a8a8eff256eccc44cf78cbde5f321e0d3f4eb72f..3899b39da1214fe2f4d2b2e55708821d073c7310 100644 --- a/transfer_service/get_job_amqp_server.py +++ b/transfer_service/get_job_amqp_server.py @@ -2,14 +2,14 @@ import json -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector -class GetJobAMQPServer(AMQPServer): +class GetJobAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "poll" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -20,9 +20,9 @@ class GetJobAMQPServer(AMQPServer): self.params["db"], 8, 16) - super(GetJobAMQPServer, self).__init__(host, port, queue) + super(GetJobAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): if "jobId" in requestBody: dbResponse = self.dbConn.getJob(requestBody["jobId"]) print(f"Db response: {dbResponse}") diff --git a/transfer_service/import_amqp_server.py b/transfer_service/import_amqp_server.py index 91bedd7761ca21e537f131391501550cd5b6bdb0..7235b77eb538f8a21326b5daae39831e53a7b330 100644 --- a/transfer_service/import_amqp_server.py +++ b/transfer_service/import_amqp_server.py @@ -3,7 +3,7 @@ import os import re -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from checksum import Checksum from datetime import datetime as dt @@ -16,9 +16,9 @@ from tape_client import TapeClient from multiprocessing import Process -class ImportAMQPServer(AMQPServer): +class ImportAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "import" self.md5calc = Checksum() config = Config("/etc/vos_ts/vos_ts.conf") @@ -36,9 +36,9 @@ class ImportAMQPServer(AMQPServer): self.params["user"], self.params["pkey_file_path"]) self.systemUtils = SystemUtils() - super(ImportAMQPServer, self).__init__(host, port, queue) + super(ImportAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # 'requestType' and 'path' attributes are mandatory if "requestType" not in requestBody or "path" not in requestBody: response = { "responseType": "ERROR", diff --git a/transfer_service/job_amqp_server.py b/transfer_service/job_amqp_server.py index 257e727c652884ee890179e997e264bff96f59f5..fa8d9c6dbaa90da7ed6f8271e1f838aa71caf6c7 100644 --- a/transfer_service/job_amqp_server.py +++ b/transfer_service/job_amqp_server.py @@ -2,14 +2,14 @@ import os -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector -class JobAMQPServer(AMQPServer): +class JobAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "job" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -20,9 +20,9 @@ class JobAMQPServer(AMQPServer): self.params["db"], 8, 16) - super(JobAMQPServer, self).__init__(host, port, queue) + super(JobAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # 'requestType' attribute is mandatory if "requestType" not in requestBody: response = { "responseType": "ERROR", diff --git a/transfer_service/start_job_amqp_server.py b/transfer_service/start_job_amqp_server.py index 143d4a8beee20c931fb9b436d808b5f6315e0b47..cf6ddbf08aa7be16fdd49e433e0e77a9320002ae 100644 --- a/transfer_service/start_job_amqp_server.py +++ b/transfer_service/start_job_amqp_server.py @@ -2,16 +2,16 @@ import json -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector from job import Job from job_queue import JobQueue -class StartJobAMQPServer(AMQPServer): +class StartJobAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "start" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -27,9 +27,9 @@ class StartJobAMQPServer(AMQPServer): self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") self.pendingQueueRead = JobQueue("read_pending") self.terminatedQueueRead = JobQueue("read_terminated") - super(StartJobAMQPServer, self).__init__(host, port, queue) + super(StartJobAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # debug block... out = open("start_job_amqp_server_log.txt", "a") out.write(json.dumps(requestBody)) diff --git a/transfer_service/storage_amqp_server.py b/transfer_service/storage_amqp_server.py index 6f2df446f01d7b2cf748b0a6a981f941696191b9..09a247ef292ac0f5d59672a6af72c28f2c9994f3 100644 --- a/transfer_service/storage_amqp_server.py +++ b/transfer_service/storage_amqp_server.py @@ -2,14 +2,14 @@ import os -from amqp_server import AMQPServer +from redis_rpc_server import RedisRpcServer from config import Config from db_connector import DbConnector -class StorageAMQPServer(AMQPServer): +class StorageAMQPServer(RedisRpcServer): - def __init__(self, host, port, queue): + def __init__(self, host, port, db, rpcQueue): self.type = "storage" config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") @@ -20,9 +20,9 @@ class StorageAMQPServer(AMQPServer): self.params["db"], 8, 16) - super(StorageAMQPServer, self).__init__(host, port, queue) + super(StorageAMQPServer, self).__init__(host, port, db, rpcQueue) - def execute_callback(self, requestBody): + def callback(self, requestBody): # 'requestType' attribute is mandatory if "requestType" not in requestBody: response = { "responseType": "ERROR", diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index a9ec0f9f468e01ce0b581cad07e0dee24b40fab8..fbb7903fabef4275136fbc4547b03c50678fb566 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -10,9 +10,9 @@ class TransferService(object): def __init__(self): config = Config("/etc/vos_ts/vos_ts.conf") - self.params = config.loadSection("amqp") - self.cliHandler = CliHandler(self.params["host"], self.params.getint("port")) - self.vosRestHandler = VOSpaceRestHandler(self.params["host"], self.params.getint("port")) + self.params = config.loadSection("job_cache") + self.cliHandler = CliHandler(self.params["host"], self.params.getint("port"), self.params.getint("db_sched")) + self.vosRestHandler = VOSpaceRestHandler(self.params["host"], self.params.getint("port"), self.params.getint("db_sched")) self.jobScheduler = JobScheduler() # PullToVOSpace (via REST API) diff --git a/transfer_service/vospace_rest_handler.py b/transfer_service/vospace_rest_handler.py index e6a15a7cd3bc793f333dba110a57e15cf66845a3..0707100f0b772e8edc4643a4ad104ef8c231ec23 100644 --- a/transfer_service/vospace_rest_handler.py +++ b/transfer_service/vospace_rest_handler.py @@ -10,18 +10,19 @@ from abort_job_amqp_server import AbortJobAMQPServer class VOSpaceRestHandler(object): - def __init__(self, host, port): + def __init__(self, host, port, db): self.host = host self.port = port + self.db = db self.amqpServerList = [] def addAMQPServer(self, srvType, rpcQueue): if srvType == 'start': - self.amqpServerList.append(StartJobAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(StartJobAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'poll': - self.amqpServerList.append(GetJobAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(GetJobAMQPServer(self.host, self.port, self.db, rpcQueue)) elif srvType == 'abort': - self.amqpServerList.append(AbortJobAMQPServer(self.host, self.port, rpcQueue)) + self.amqpServerList.append(AbortJobAMQPServer(self.host, self.port, self.db, rpcQueue)) else: sys.exit(f"FATAL: unknown server type {srvType}.")