From 5443c9a716919f2a7bd23131246509dd2fb965a1 Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Tue, 5 Oct 2021 15:00:24 +0200 Subject: [PATCH] fs_path integration + minor changes. Signed-off-by: Cristiano Urban --- transfer_service/db_connector.py | 6 ++++-- transfer_service/import_executor.py | 12 +++++++++--- transfer_service/node.py | 4 ++++ transfer_service/store_preprocessor.py | 18 +++++++++--------- transfer_service/transfer_service.py | 8 +++++--- 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 5230e7f..7a52037 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -171,7 +171,7 @@ class DbConnector(object): conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" - SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, get_os_path(n.node_id) AS os_path, content_length + SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id @@ -1026,6 +1026,7 @@ class DbConnector(object): cursor.execute(""" INSERT INTO node(parent_path, parent_relative_path, + fs_path, name, tstamp_wrapper_dir, type, @@ -1036,13 +1037,14 @@ class DbConnector(object): creator_id, content_length, content_md5) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING RETURNING node_id; """, (parentLtreePath, parentLtreeRelativePath, + node.fsPath, node.name, node.wrapperDir, node.type, diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index be9e79d..6c21492 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -71,7 +71,11 @@ class ImportExecutor(TaskExecutor): self.logger.info("++++++++++ Start of import phase ++++++++++") self.jobObj.setPhase("EXECUTING") self.jobObj.setStartTime(datetime.datetime.now().isoformat()) - self.dbConn.insertJob(self.jobObj) + try: + self.dbConn.insertJob(self.jobObj) + except Exception: + self.logger.exception("FATAL: unable to set job 'start_time' and 'phase'.") + return False self.logger.info("Job phase updated to EXECUTING.") if self.storageType == "cold": @@ -103,14 +107,15 @@ class ImportExecutor(TaskExecutor): vospacePath = parentPath + nodeName else: vospacePath = parentPath + '/' + nodeName + fsPath = vospacePath.split('/', 2)[-1] cnode.setParentPath(parentPath) + cnode.setFsPath(fsPath) cnode.setLocationId(locationId) cnode.setJobId(self.jobId) cnode.setCreatorId(self.userId) cnode.setContentLength(0) cnode.setAsyncTrans(True) cnode.setSticky(True) - try: now = datetime.datetime.now().isoformat() if os.path.islink(dir): @@ -134,7 +139,9 @@ class ImportExecutor(TaskExecutor): parentPath = tstampWrapperDirPattern.sub("", parentPath) dnode.setWrapperDir(tstampWrapperDir) vospacePath = parentPath + '/' + nodeName + fsPath = vospacePath.split('/', 2)[-1] dnode.setParentPath(parentPath) + dnode.setFsPath(fsPath) dnode.setLocationId(locationId) dnode.setJobId(self.jobId) dnode.setCreatorId(self.userId) @@ -142,7 +149,6 @@ class ImportExecutor(TaskExecutor): dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setAsyncTrans(True) dnode.setSticky(True) - try: now = datetime.datetime.now().isoformat() if os.path.islink(file): diff --git a/transfer_service/node.py b/transfer_service/node.py index 6ab0e0d..23708d9 100644 --- a/transfer_service/node.py +++ b/transfer_service/node.py @@ -14,6 +14,7 @@ class Node(object): def __init__(self, name, type): self.parentPath = None + self.fsPath = None self.name = name self.wrapperDir = None self.type = type @@ -53,6 +54,9 @@ class Node(object): def setParentPath(self, parentPath): self.parentPath = parentPath + def setFsPath(self, fsPath): + self.fsPath = fsPath + def setName(self, name): self.name = name diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index 9ffe24e..f94bbda 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -151,16 +151,16 @@ class StorePreprocessor(TaskExecutor): if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') basePath = tstampWrapperDirPattern.sub("", basePath) - cnode.setWrapperDir(tstampWrapperDir) - cnode.setParentPath(basePath) + cnode.setWrapperDir(tstampWrapperDir) + vospacePath = basePath + '/' + nodeName + fsPath = vospacePath.split("/" + self.username + "/")[1] + cnode.setParentPath(basePath) + cnode.setFsPath(fsPath) cnode.setLocationId(locationId) cnode.setJobId(self.jobId) cnode.setCreatorId(self.userId) cnode.setContentLength(0) cnode.setSticky(True) - - vospacePath = basePath + '/' + nodeName - try: now = datetime.datetime.now().isoformat() if os.path.islink(dir): @@ -184,17 +184,17 @@ class StorePreprocessor(TaskExecutor): if tstampWrapperDirPattern.search(basePath): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') basePath = tstampWrapperDirPattern.sub("", basePath) - dnode.setWrapperDir(tstampWrapperDir) + dnode.setWrapperDir(tstampWrapperDir) + vospacePath = basePath + '/' + nodeName + fsPath = vospacePath.split("/" + self.username + "/")[1] dnode.setParentPath(basePath) + dnode.setFsPath(fsPath) dnode.setLocationId(locationId) dnode.setJobId(self.jobId) dnode.setCreatorId(self.userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) dnode.setSticky(True) - - vospacePath = basePath + '/' + nodeName - try: now = datetime.datetime.now().isoformat() if os.path.islink(file): diff --git a/transfer_service/transfer_service.py b/transfer_service/transfer_service.py index 978aa97..ad8df24 100644 --- a/transfer_service/transfer_service.py +++ b/transfer_service/transfer_service.py @@ -84,9 +84,11 @@ class TransferService(object): self.jobScheduler.start() self.vosRestHandler.start() self.cliHandler.start() - self.logger.info("##########################################################") - self.logger.info("########## VOSpace Transfer Service is RUNNING! ##########") - self.logger.info("##########################################################") + self.logger.info(""" +########################################################## +########## VOSpace Transfer Service is RUNNING! ########## +########################################################## + """) #else: # print("The VOSpace Transfer Service requires super user privileges.") # sys.exit(1) -- GitLab