From 584d2e64597b71cdd24cb99e95be70c390f56f07 Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Tue, 19 Jan 2021 18:50:05 +0100 Subject: [PATCH] Added 'RetrievePreprocessor' class skeleton. Signed-off-by: Cristiano Urban --- transfer_service/retrieve_preprocessor.py | 31 +++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 transfer_service/retrieve_preprocessor.py diff --git a/transfer_service/retrieve_preprocessor.py b/transfer_service/retrieve_preprocessor.py new file mode 100644 index 0000000..2a30089 --- /dev/null +++ b/transfer_service/retrieve_preprocessor.py @@ -0,0 +1,31 @@ +from config import Config +from db_connector import DbConnector +from task_executor import TaskExecutor + + +class RetrievePreprocessor(TaskExecutor): + + def __init__(self): + config = Config("vos_ts.conf") + self.params = config.loadSection("file_catalog") + self.dbConn = DbConnector(self.params["user"], + self.params["password"], + self.params["host"], + self.params.getint("port"), + self.params["db"]) + super(RetrievePreprocessor, self).__init__() + + def run(self): + print("Starting retrieve preprocessor...") + self.setSourceQueueName("read_pending") + self.setDestinationQueueName("read_ready") + while True: + self.wait() + if self.destQueue.len() < self.maxReadyJobs and self.srcQueue.len() > 0: + self.jobObj = self.srcQueue.getJob() + + # do something here... + + self.srcQueue.extractJob() + self.destQueue.insertJob(self.jobObj) + print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") \ No newline at end of file -- GitLab