diff --git a/transfer_service/data_amqp_server.py b/transfer_service/data_amqp_server.py index bbe337e0204aa0f63834e47abf5eb9335f6c1085..0e8ef922b2b0e1a254c156eeaff7b8e29f247dd2 100644 --- a/transfer_service/data_amqp_server.py +++ b/transfer_service/data_amqp_server.py @@ -93,7 +93,7 @@ class DataAMQPServer(RedisRpcServer): username = requestBody["userName"] self.prepare(username) job = Job() - job.setType("other") + job.setType("vos_data") job.setInfo(requestBody) job.setPhase("PENDING") job.setOwnerId(self.dbConn.getUserId(username)) diff --git a/transfer_service/import_amqp_server.py b/transfer_service/import_amqp_server.py index f496a8c32b6c00cff81e78ba36cf8dd2642563c0..548810ce72e7b575a358478ee9f2d53653d5a800 100644 --- a/transfer_service/import_amqp_server.py +++ b/transfer_service/import_amqp_server.py @@ -4,7 +4,8 @@ import os from config import Config from db_connector import DbConnector -from import_job_queue import ImportJobQueue +from job_queue import JobQueue +from job import Job from redis_rpc_server import RedisRpcServer from system_utils import SystemUtils @@ -24,7 +25,7 @@ class ImportAMQPServer(RedisRpcServer): 1) self.params = config.loadSection("scheduling") self.maxReadyJobs = self.params.getint("max_ready_jobs") - self.importReadyQueue = ImportJobQueue("import_ready") + self.importReadyQueue = JobQueue("import_ready") self.systemUtils = SystemUtils() super(ImportAMQPServer, self).__init__(host, port, db, rpcQueue) @@ -80,13 +81,26 @@ class ImportAMQPServer(RedisRpcServer): "errorCode": 8, "errorMsg": "Import queue is full, please, retry later." } else: - job = dict() - job["userId"] = userId - job["path"] = path - job["pathPrefix"] = pathPrefix - job["storageId"] = storageId - job["storageType"] = storageType - self.importReadyQueue.insertJob(job) + jobObj = Job() + jobObj.setType("vos_import") + jobInfo = requestBody.copy() + #jobInfo["userId"] = userId + jobInfo["pathPrefix"] = pathPrefix + jobInfo["storageId"] = storageId + jobInfo["storageType"] = storageType + jobObj.setInfo(jobInfo) + jobObj.setPhase("QUEUED") + jobObj.setOwnerId(userId) + self.dbConn.insertJob(jobObj) + + + #job = dict() + #job["userId"] = userId + #job["path"] = path + #job["pathPrefix"] = pathPrefix + #job["storageId"] = storageId + #job["storageType"] = storageType + self.importReadyQueue.insertJob(jobObj) #p = Process(target = self.load, # args = (self.tapeClient, diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index 344fe91fe6d33044cb31533d3bfaf14f7212bfa9..5689e9ca398eacf40b92b4ecb600da3e820e2d63 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -7,15 +7,15 @@ from config import Config from checksum import Checksum from datetime import datetime as dt from db_connector import DbConnector -from import_task_executor import ImportTaskExecutor from mailer import Mailer from node import Node from system_utils import SystemUtils from tabulate import tabulate from tape_client import TapeClient +from task_executor import TaskExecutor -class ImportExecutor(ImportTaskExecutor): +class ImportExecutor(TaskExecutor): def __init__(self): self.md5calc = Checksum() @@ -34,7 +34,8 @@ class ImportExecutor(ImportTaskExecutor): self.params["user"], self.params["pkey_file_path"]) self.systemUtils = SystemUtils() - self.job = None + self.jobObj = None + self.jobId = None self.userId = None self.path = None self.pathPrefix = None @@ -45,6 +46,9 @@ class ImportExecutor(ImportTaskExecutor): def importVOSpaceNodes(self): """This method performs the VOSpace import operation.""" + self.dbConn.setPhase(self.jobId, "EXECUTING") + self.dbConn.setStartTime(self.jobId) + start = dt.now() nodeList = [] timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") @@ -139,6 +143,13 @@ class ImportExecutor(ImportTaskExecutor): tablefmt = "simple")) nlfp.close() end = dt.now() + + # Update job status (to be moved) + results = [{"target": ""}] + self.dbConn.setResults(self.jobId, results) + self.jobObj.setPhase("COMPLETED") + self.dbConn.setPhase(self.jobId, "COMPLETED") + self.dbConn.setEndTime(self.jobId) m = Mailer() m.addRecipient("cristiano.urban@inaf.it") @@ -159,6 +170,7 @@ class ImportExecutor(ImportTaskExecutor): m.send() os.remove(nodeListFile) + def run(self): print("Starting import executor...") @@ -167,12 +179,13 @@ class ImportExecutor(ImportTaskExecutor): while True: self.wait() if self.srcQueue.len() > 0: - self.job = self.srcQueue.getJob() - self.userId = self.job["userId"] - self.path = self.job["path"] - self.pathPrefix = self.job["pathPrefix"] - self.storageId = self.job["storageId"] - self.storageType = self.job["storageType"] + self.jobObj = self.srcQueue.getJob() + self.jobId = self.jobObj.jobId + self.userId = self.jobObj.ownerId + self.path = self.jobObj.jobInfo["path"] + self.pathPrefix = self.jobObj.jobInfo["pathPrefix"] + self.storageId = self.jobObj.jobInfo["storageId"] + self.storageType = self.jobObj.jobInfo["storageType"] self.importVOSpaceNodes() if self.destQueue.len() == self.maxTerminatedJobs: self.destQueue.extractJob()