Skip to content
Snippets Groups Projects
Commit 311c6c30 authored by Cristiano Urban's avatar Cristiano Urban
Browse files

Minor changes.

parent 16415873
Branches
Tags
No related merge requests found
Showing with 103 additions and 81 deletions
......@@ -13,7 +13,9 @@ class AbortJobAMQPServer(AMQPServer):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.params["db"],
8,
16)
super(AbortJobAMQPServer, self).__init__(host, port, queue)
def execute_callback(self, requestBody):
......
......@@ -12,18 +12,20 @@ from node import Node
class DbConnector(object):
def __init__(self, user, password, host, port, dbname):
def __init__(self, user, password, host, port, dbname, minConnNum, maxConnNum):
self.user = user
self.password = password
self.host = host
self.port = port
self.dbname = dbname
self.minConnNum = minConnNum
self.maxConnNum = maxConnNum
self.createConnectionPool()
def createConnectionPool(self):
self.connPool = ThreadedConnectionPool(
8,
16,
self.minConnNum,
self.maxConnNum,
user = self.user,
password = self.password,
host = self.host,
......
......@@ -15,7 +15,9 @@ class GetJobAMQPServer(AMQPServer):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.params["db"],
8,
16)
super(GetJobAMQPServer, self).__init__(host, port, queue)
def execute_callback(self, requestBody):
......
......@@ -28,8 +28,8 @@ class ImportAMQPServer(AMQPServer):
self.params["user"],
self.params["pkey_file_path"])
self.systemUtils = SystemUtils()
self.path = None
self.username = None
#self.path = None
#self.username = None
super(ImportAMQPServer, self).__init__(host, port, queue)
def execute_callback(self, requestBody):
......@@ -37,11 +37,11 @@ class ImportAMQPServer(AMQPServer):
if "requestType" not in requestBody or "path" not in requestBody:
response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." }
elif requestBody["requestType"] == "NODE_IMPORT":
self.path = os.path.abspath(requestBody["path"])
self.username = requestBody["userName"]
path = os.path.abspath(requestBody["path"])
username = requestBody["userName"]
#self.dbConn.connect()
userInDb = self.dbConn.userExists(self.username)
userInfo = self.systemUtils.userInfo(self.username)
userInDb = self.dbConn.userExists(username)
userInfo = self.systemUtils.userInfo(username)
out = open("import_amqp_server_log.txt", "a")
if not userInfo or not userInDb:
......@@ -51,8 +51,8 @@ class ImportAMQPServer(AMQPServer):
#self.dbConn.disconnect()
return response
userId = self.dbConn.getRapId(self.username)
pathPrefix = self.dbConn.storageBasePathIsValid(self.path)
userId = self.dbConn.getRapId(username)
pathPrefix = self.dbConn.storageBasePathIsValid(path)
if pathPrefix:
storageId = self.dbConn.getStorageId(pathPrefix)
......@@ -64,44 +64,44 @@ class ImportAMQPServer(AMQPServer):
#self.dbConn.disconnect()
return response
if not os.path.exists(self.path):
if not os.path.exists(path):
response = { "responseType": "ERROR",
"errorCode": 4,
"errorMsg": "Path not found." }
#self.dbConn.disconnect()
return response
elif not os.path.isdir(self.path):
elif not os.path.isdir(path):
response = { "responseType": "ERROR",
"errorCode": 5,
"errorMsg": "Directory path expected." }
#self.dbConn.disconnect()
return response
elif self.username not in self.path:
elif username not in path:
response = { "responseType": "ERROR",
"errorCode": 6,
"errorMsg": "Directory path does not contain the username." }
#self.dbConn.disconnect()
return response
elif os.path.dirname(self.path) != pathPrefix + '/' + self.username:
elif os.path.dirname(path) != pathPrefix + '/' + username:
response = { "responseType": "ERROR",
"errorCode": 7,
"errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + self.username }
"errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username }
#self.dbConn.disconnect()
return response
else:
if storageType == "cold":
self.tapeClient.connect()
self.tapeClient.recallChecksumFiles(self.path)
self.tapeClient.recallChecksumFiles(path)
self.tapeClient.disconnect()
[ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path))
[ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(path))
tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper")
for dir in dirs:
out.write(f"DIR dir: {dir}\n")
out.write(f"DIR pathPrefix: {pathPrefix}\n\n")
if self.path in dir:
if path in dir:
parentPath = os.path.dirname(dir).split(pathPrefix)[1]
nodeName = os.path.basename(dir)
......@@ -131,7 +131,7 @@ class ImportAMQPServer(AMQPServer):
for flist in files:
for file in flist:
if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file):
if self.md5calc.fileIsValid(file) and path in os.path.dirname(file):
out.write(f"FILE files: {files}\n")
out.write(f"FILE flist: {flist}\n")
out.write(f"FILE file: {file}\n")
......
......@@ -15,9 +15,11 @@ class JobAMQPServer(AMQPServer):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.jobId = None
self.jobPhase = None
self.params["db"],
8,
16)
#self.jobId = None
#self.jobPhase = None
super(JobAMQPServer, self).__init__(host, port, queue)
def execute_callback(self, requestBody):
......@@ -32,16 +34,16 @@ class JobAMQPServer(AMQPServer):
#self.dbConn.disconnect()
response = { "responseType": "LST_DONE", "jobList": result }
elif requestBody["requestType"] == "JOB_BY_PHASE":
self.jobPhase = requestBody["jobPhase"]
jobPhase = requestBody["jobPhase"]
#self.dbConn.connect()
result = self.dbConn.listJobsByPhase(self.jobPhase)
result = self.dbConn.listJobsByPhase(jobPhase)
#self.dbConn.disconnect()
response = { "responseType": "LST_BY_PHASE_DONE", "jobList": result }
elif requestBody["requestType"] == "JOB_INFO":
self.jobId = requestBody["jobId"]
jobId = requestBody["jobId"]
#self.dbConn.connect()
if self.dbConn.jobExists(self.jobId):
result = self.dbConn.getJobInfo(self.jobId)
if self.dbConn.jobExists(jobId):
result = self.dbConn.getJobInfo(jobId)
response = { "responseType": "LST_INFO_DONE", "jobInfo": result }
else:
response = { "responseType": "ERROR",
......@@ -49,10 +51,10 @@ class JobAMQPServer(AMQPServer):
"errorMsg": "Wrong job ID." }
#self.dbConn.disconnect()
elif requestBody["requestType"] == "JOB_RESULTS":
self.jobId = requestBody["jobId"]
jobId = requestBody["jobId"]
#self.dbConn.connect()
if self.dbConn.jobExists(self.jobId):
result = self.dbConn.getJobResults(self.jobId)
if self.dbConn.jobExists(jobId):
result = self.dbConn.getJobResults(jobId)
response = { "responseType": "LST_RESULTS_DONE", "jobResults": result }
else:
response = { "responseType": "ERROR",
......
......@@ -13,7 +13,7 @@ class JobQueue(object):
def __init__(self, queueName):
config = Config("vos_ts.conf")
self.params = config.loadSection("job_cache")
self.redisCli = redis.StrictRedis(host = self.params["host"],
self.redisCli = redis.Redis(host = self.params["host"],
port = self.params["port"],
db = self.params["db_sched"])
self.queueName = queueName
......
......@@ -24,7 +24,9 @@ class RetrieveExecutor(TaskExecutor):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.params["db"],
1,
1)
self.jobObj = None
self.jobId = None
self.nodeList = []
......
......@@ -12,7 +12,9 @@ class RetrievePreprocessor(TaskExecutor):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.params["db"],
1,
1)
self.jobObj = None
self.nodeList = []
super(RetrievePreprocessor, self).__init__()
......
......@@ -19,7 +19,9 @@ class StartJobAMQPServer(AMQPServer):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.params["db"],
8,
16)
self.pendingQueueRead = JobQueue("read_pending")
super(StartJobAMQPServer, self).__init__(host, port, queue)
......@@ -27,21 +29,21 @@ class StartJobAMQPServer(AMQPServer):
out = open("start_job_amqp_server_log.txt", "a")
out.write(json.dumps(requestBody))
out.close()
self.job = Job()
job = Job()
#self.job.setType("pullToVoSpace")
#self.job.setInfo(requestBody)
#self.job.setPhase("PENDING")
self.job.setId(requestBody["jobId"])
self.job.setType(requestBody["jobInfo"]["transfer"]["direction"])
self.job.setPhase(requestBody["phase"])
self.job.setInfo(requestBody["jobInfo"])
self.job.setOwnerId(requestBody["ownerId"])
job.setId(requestBody["jobId"])
job.setType(requestBody["jobInfo"]["transfer"]["direction"])
job.setPhase(requestBody["phase"])
job.setInfo(requestBody["jobInfo"])
job.setOwnerId(requestBody["ownerId"])
#self.dbConn.connect()
self.dbConn.insertJob(self.job)
dbResponse = self.dbConn.getJob(self.job.jobId)
self.dbConn.insertJob(job)
dbResponse = self.dbConn.getJob(job.jobId)
#self.dbConn.disconnect()
print(f"Db response: {dbResponse}")
self.pendingQueueRead.insertJob(self.job)
self.pendingQueueRead.insertJob(job)
#t = threading.Thread(target = self.fake_job) # only for testing purposes
#t.start() # only for testing purposes
return dbResponse
......
......@@ -15,12 +15,14 @@ class StorageAMQPServer(AMQPServer):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.storageType = None
self.storageBasePath = None
self.storageHostname = None
self.storageId = None
self.storageAck = False
self.params["db"],
8,
16)
#self.storageType = None
#self.storageBasePath = None
#self.storageHostname = None
#self.storageId = None
#self.storageAck = False
super(StorageAMQPServer, self).__init__(host, port, queue)
def execute_callback(self, requestBody):
......@@ -31,23 +33,23 @@ class StorageAMQPServer(AMQPServer):
"errorMsg": "Malformed request, missing parameters." }
elif requestBody["requestType"] == "STORAGE_ADD":
self.storageType = requestBody["storageType"]
self.storageBasePath = requestBody["basePath"]
self.storageBaseUrl = requestBody["baseUrl"]
self.storageHostname = requestBody["hostname"]
storageType = requestBody["storageType"]
storageBasePath = requestBody["basePath"]
storageBaseUrl = requestBody["baseUrl"]
storageHostname = requestBody["hostname"]
if self.storageType != "portal":
if not os.path.exists(self.storageBasePath):
if storageType != "portal":
if not os.path.exists(storageBasePath):
response = { "responseType": "ERROR",
"errorCode": 2,
"errorMsg": "Base path doesn't exist."}
return response
#self.dbConn.connect()
result = self.dbConn.insertStorage(self.storageType,
self.storageBasePath,
self.storageBaseUrl,
self.storageHostname)
result = self.dbConn.insertStorage(storageType,
storageBasePath,
storageBaseUrl,
storageHostname)
#self.dbConn.disconnect()
if result:
......@@ -65,14 +67,14 @@ class StorageAMQPServer(AMQPServer):
response = { "responseType": "STORAGE_DEL_ACK",
"storageList": result }
self.storageAck = True
#self.storageAck = True
elif requestBody["requestType"] == "STORAGE_DEL_CON":
self.storageId = requestBody["storageId"]
if self.storageAck:
self.storageAck = False
storageId = requestBody["storageId"]
#if self.storageAck:
#self.storageAck = False
#self.dbConn.connect()
result = self.dbConn.deleteStorage(self.storageId)
result = self.dbConn.deleteStorage(storageId)
#self.dbConn.disconnect()
if result:
......@@ -81,10 +83,10 @@ class StorageAMQPServer(AMQPServer):
response = { "responseType": "ERROR",
"errorCode": 4,
"errorMsg": "This storage location contains some VOSpace nodes. Please, move those nodes to another location." }
else:
response = { "responseType": "ERROR",
"errorCode": 5,
"errorMsg": "Store request not acknowledged." }
#else:
# response = { "responseType": "ERROR",
# "errorCode": 5,
# "errorMsg": "Store request not acknowledged." }
elif requestBody["requestType"] == "STORAGE_LST":
#self.dbConn.connect()
......@@ -96,7 +98,7 @@ class StorageAMQPServer(AMQPServer):
else:
response = { "responseType": "ERROR",
"errorCode": 6,
"errorCode": 5,
"errorMsg": "Unkown request type." }
return response
......
......@@ -29,7 +29,9 @@ class StoreAMQPServer(AMQPServer):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.params["db"],
8,
16)
self.params = config.loadSection("transfer_node")
self.storageStorePath = self.params["store_path"]
self.pendingQueueWrite = JobQueue("write_pending")
......
......@@ -26,7 +26,9 @@ class StoreExecutor(TaskExecutor):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.params["db"],
1,
1)
self.jobObj = None
self.jobId = None
self.username = None
......
......@@ -34,7 +34,9 @@ class StorePreprocessor(TaskExecutor):
self.params["password"],
self.params["host"],
self.params.getint("port"),
self.params["db"])
self.params["db"],
1,
1)
self.params = config.loadSection("transfer_node")
self.storageStorePath = self.params["store_path"]
self.storageId = None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment