From 95d3b29e58930fbe07ffc32a8b27ef543d92984a Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Thu, 27 May 2021 18:07:44 +0200 Subject: [PATCH] Removed 'setSticky()', 'nodeExists()' methods from DbConnector + updated 'insertNode()' + decreased num of connections to db for each class, added 'storageType' param to DataRpcServer + added basic handling for TimeoutError exception in Mailer class + code cleanup. Signed-off-by: Cristiano Urban --- client/vos_data | 6 ++- transfer_service/abort_job_rpc_server.py | 4 +- transfer_service/data_rpc_server.py | 5 ++- transfer_service/db_connector.py | 51 ++++++------------------ transfer_service/get_job_rpc_server.py | 4 +- transfer_service/import_executor.py | 11 +++-- transfer_service/import_rpc_server.py | 16 ++++---- transfer_service/job_rpc_server.py | 4 +- transfer_service/mailer.py | 2 + transfer_service/node.py | 2 +- transfer_service/start_job_rpc_server.py | 4 +- transfer_service/storage_rpc_server.py | 4 +- transfer_service/store_executor.py | 3 +- transfer_service/store_preprocessor.py | 12 ++++-- 14 files changed, 56 insertions(+), 72 deletions(-) diff --git a/client/vos_data b/client/vos_data index 6e246ad..f4ac1ad 100644 --- a/client/vos_data +++ b/client/vos_data @@ -53,6 +53,7 @@ DESCRIPTION sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") elif storeResponse["responseType"] == "STORE_ACK": storageList = storeResponse["storageList"] + storageType = storageList[0]["storage_type"] if not storageList: sys.exit("No storage point found. Please add a storage point using the 'vos_storage' command.\n") print("Choose one of the following storage locations:") @@ -89,7 +90,10 @@ DESCRIPTION except EOFError: print("\nPlease, use CTRL+C to quit.") if confirm == "yes": - confirmRequest = { "requestType": "STORE_CON", "userName": username, "storageId": storageId } + confirmRequest = { "requestType": "STORE_CON", + "userName": username, + "storageId": storageId, + "storageType": storageType } confirmResponse = self.call(confirmRequest) if "responseType" not in confirmResponse: sys.exit("\nFATAL: Malformed response, storage confirmation expected.\n") diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py index 08ce090..ec0cb9c 100644 --- a/transfer_service/abort_job_rpc_server.py +++ b/transfer_service/abort_job_rpc_server.py @@ -16,8 +16,8 @@ class AbortJobRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py index 464155b..695cc86 100644 --- a/transfer_service/data_rpc_server.py +++ b/transfer_service/data_rpc_server.py @@ -30,8 +30,8 @@ class DataRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) self.params = config.loadSection("transfer_node") self.storageStorePath = self.params["store_path"] self.params = config.loadSection("scheduling") @@ -100,6 +100,7 @@ class DataRPCServer(RedisRPCServer): self.dbConn.insertJob(job) dbResponse = self.dbConn.getJob(job.jobId) job.jobInfo["storageId"] = requestBody["storageId"] + job.jobInfo["storageType"] = requestBody["storageType"] self.pendingQueueWrite.insertJob(job) if "error" in dbResponse: response = { "responseType": "ERROR", diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 4642a1c..b2e4cb7 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -51,23 +51,6 @@ class DbConnector(object): ##### Node ##### - def nodeExists(self, node): - """Checks if a VOSpace node already exists. Returns a boolean.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - nodeVOSPath = node.parentPath + '/' + node.name - cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (nodeVOSPath,)) - result = cursor.fetchall() - except Exception as e: - if not conn.closed: - conn.rollback() - print(e) - if result: - return True - else: - return False - def getOSPath(self, vospacePath): """Returns a list containing full path, storage type and username for a VOSpace path.""" with self.getConnection() as conn: @@ -614,7 +597,7 @@ class DbConnector(object): ##### Node ##### def insertNode(self, node): - """Inserts a VOSpace node.""" + """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.""" with self.getConnection() as conn: try: out = open("db_connector_log.txt", "a") @@ -652,11 +635,15 @@ class DbConnector(object): tstamp_wrapper_dir, type, location_id, + sticky, busy_state, creator_id, content_length, content_md5) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s); + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT + DO NOTHING + RETURNING node_id; """, (parentLtreePath, parentLtreeRelativePath, @@ -664,11 +651,17 @@ class DbConnector(object): node.wrapperDir, node.type, node.locationId, + node.sticky, node.busyState, node.creatorID, node.contentLength, node.contentMD5,)) + result = cursor.fetchall() conn.commit() + if result: + return True + else: + return False except Exception as e: if not conn.closed: conn.rollback() @@ -715,26 +708,6 @@ class DbConnector(object): if not conn.closed: conn.rollback() - def setSticky(self, nodeVOSPath, value): - """Sets the 'sticky' flag for a VOSpace node.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE node SET sticky = %s - WHERE path <@ - (SELECT path - FROM node_path p - JOIN node n ON p.node_id = n.node_id - WHERE p.vos_path = %s); - """, - (value, nodeVOSPath,)) - conn.commit() - except Exception as e: - if not conn.closed: - conn.rollback() - - def setBusyState(self, nodeVOSPath, value): """Sets the 'busy_state' flag for a VOSpace node.""" with self.getConnection() as conn: diff --git a/transfer_service/get_job_rpc_server.py b/transfer_service/get_job_rpc_server.py index 344af1a..8bafbec 100644 --- a/transfer_service/get_job_rpc_server.py +++ b/transfer_service/get_job_rpc_server.py @@ -18,8 +18,8 @@ class GetJobRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) super(GetJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index dd91c78..2af4843 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -90,10 +90,10 @@ class ImportExecutor(TaskExecutor): cnode.setLocationId(locationId) cnode.setCreatorID(self.userId) cnode.setContentLength(0) - if not self.dbConn.nodeExists(cnode): - self.dbConn.insertNode(cnode) + cnode.setSticky(True) + + if self.dbConn.insertNode(cnode): self.dbConn.setAsyncTrans(vospacePath, True) - self.dbConn.setSticky(vospacePath, True) now = dt.now() nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: @@ -127,11 +127,10 @@ class ImportExecutor(TaskExecutor): dnode.setCreatorID(self.userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) + dnode.setSticky(True) - if not self.dbConn.nodeExists(dnode): - self.dbConn.insertNode(dnode) + if self.dbConn.insertNode(dnode): self.dbConn.setAsyncTrans(vospacePath, True) - self.dbConn.setSticky(vospacePath, True) now = dt.now() nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index cac89b8..99cdef8 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -68,14 +68,14 @@ class ImportRPCServer(RedisRPCServer): response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Directory path expected." } - #elif username not in path: - # response = { "responseType": "ERROR", - # "errorCode": 6, - # "errorMsg": "Directory path does not contain the username." } - #elif os.path.dirname(path) != pathPrefix + '/' + username: - # response = { "responseType": "ERROR", - # "errorCode": 7, - # "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } + elif username not in path: + response = { "responseType": "ERROR", + "errorCode": 6, + "errorMsg": "Directory path does not contain the username." } + elif os.path.dirname(path) != pathPrefix + '/' + username: + response = { "responseType": "ERROR", + "errorCode": 7, + "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } elif self.importReadyQueue.len() >= self.maxReadyJobs: response = { "responseType": "ERROR", "errorCode": 8, diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py index 57c0c32..d1a1be9 100644 --- a/transfer_service/job_rpc_server.py +++ b/transfer_service/job_rpc_server.py @@ -18,8 +18,8 @@ class JobRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) super(JobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/mailer.py b/transfer_service/mailer.py index 7bfa129..4e78033 100644 --- a/transfer_service/mailer.py +++ b/transfer_service/mailer.py @@ -56,6 +56,8 @@ class Mailer(object): smtpObj = smtplib.SMTP(self.smtpServer, self.smtpPort) smtpObj.send_message(self.message) print("Message sent!") + except TimeoutError: + print("Error: connection timeout.") except SMTPException: print("Error: cannot send email message.") diff --git a/transfer_service/node.py b/transfer_service/node.py index d2b91f0..b782173 100644 --- a/transfer_service/node.py +++ b/transfer_service/node.py @@ -15,7 +15,7 @@ class Node(object): self.locationId = None self.format = None self.asyncTrans = None - self.sticky = None + self.sticky = False self.busyState = None self.creatorID = None self.groupRead = [] diff --git a/transfer_service/start_job_rpc_server.py b/transfer_service/start_job_rpc_server.py index 20850ac..2ca9895 100644 --- a/transfer_service/start_job_rpc_server.py +++ b/transfer_service/start_job_rpc_server.py @@ -20,8 +20,8 @@ class StartJobRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) self.params = config.loadSection("scheduling") self.maxPendingJobs = self.params.getint("max_pending_jobs") self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index 6c381b6..dcf1505 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -18,8 +18,8 @@ class StorageRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 4befaee..ca7b773 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -38,6 +38,7 @@ class StoreExecutor(TaskExecutor): self.username = None self.requestType = None self.storageId = None + self.storageType = None self.nodeList = [] self.systemUtils = SystemUtils() super(StoreExecutor, self).__init__() @@ -78,7 +79,6 @@ class StoreExecutor(TaskExecutor): for nodeVOSPath in self.nodeList: out.write(f"nodeListElement: {nodeVOSPath}\n") self.dbConn.setAsyncTrans(nodeVOSPath, True); - self.dbConn.setSticky(nodeVOSPath, True); self.dbConn.setBusyState(nodeVOSPath, False); self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") @@ -97,6 +97,7 @@ class StoreExecutor(TaskExecutor): self.username = self.jobObj.jobInfo["userName"] self.requestType = self.jobObj.jobInfo["requestType"] self.storageId = self.jobObj.jobInfo["storageId"] + self.storageType = self.jobObj.jobInfo["storageType"] self.nodeList = self.jobObj.jobInfo["nodeList"] # TODO # 1) Controlla il tipo di destinazione: hot (server) o cold (tape) diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index f3cbfd7..2e7b194 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -42,6 +42,7 @@ class StorePreprocessor(TaskExecutor): self.params = config.loadSection("transfer_node") self.storageStorePath = self.params["store_path"] self.storageId = None + self.storageType = None self.jobObj = None self.jobId = None self.username = None @@ -139,8 +140,9 @@ class StorePreprocessor(TaskExecutor): cnode.setBusyState(True) cnode.setCreatorID(self.userId) cnode.setContentLength(0) - if not self.dbConn.nodeExists(cnode): - self.dbConn.insertNode(cnode) + cnode.setSticky(True) + + if self.dbConn.insertNode(cnode): self.nodeList.append(basePath + '/' + nodeName) out.write("\n\n") @@ -168,8 +170,9 @@ class StorePreprocessor(TaskExecutor): dnode.setCreatorID(self.userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) - if not self.dbConn.nodeExists(dnode): - self.dbConn.insertNode(dnode) + dnode.setSticky(True) + + if self.dbConn.insertNode(dnode): self.nodeList.append(basePath + '/' + nodeName) out.write("\n") @@ -189,6 +192,7 @@ class StorePreprocessor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.storageId = self.jobObj.jobInfo["storageId"] + self.storgeType = self.jobObj.jobInfo["storageType"] self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) self.execute() -- GitLab