From 35e390d038745eaf0b9ec501582b94f18013706f Mon Sep 17 00:00:00 2001 From: Cristiano Urban <cristiano.urban@inaf.it> Date: Mon, 12 Apr 2021 14:08:41 +0200 Subject: [PATCH] Added daemonized process to handle long import procedures. Signed-off-by: Cristiano Urban <cristiano.urban@inaf.it> --- transfer_service/import_amqp_server.py | 156 +++++++++++++------------ 1 file changed, 81 insertions(+), 75 deletions(-) diff --git a/transfer_service/import_amqp_server.py b/transfer_service/import_amqp_server.py index 0b49533..50c3632 100644 --- a/transfer_service/import_amqp_server.py +++ b/transfer_service/import_amqp_server.py @@ -11,6 +11,7 @@ from node import Node from system_utils import SystemUtils from tape_client import TapeClient +from multiprocessing import Process class ImportAMQPServer(AMQPServer): @@ -43,7 +44,7 @@ class ImportAMQPServer(AMQPServer): username = requestBody["userName"] userInDb = self.dbConn.userExists(username) userInfo = self.systemUtils.userInfo(username) - out = open("import_amqp_server_log.txt", "a") + #out = open("import_amqp_server_log.txt", "a") if not userInfo or not userInDb: response = { "responseType": "ERROR", @@ -84,80 +85,8 @@ class ImportAMQPServer(AMQPServer): "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } return response else: - if storageType == "cold": - self.tapeClient.connect() - self.tapeClient.recallChecksumFiles(path) - self.tapeClient.disconnect() - - [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(path)) - - tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") - for dir in dirs: - out.write(f"DIR dir: {dir}\n") - out.write(f"DIR pathPrefix: {pathPrefix}\n\n") - - if path in dir: - parentPath = os.path.dirname(dir).split(pathPrefix)[1] - nodeName = os.path.basename(dir) - - cnode = Node(nodeName, "container") - - if not tstampWrapperDirPattern.match("/" + nodeName): - if tstampWrapperDirPattern.search(parentPath): - tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') - parentPath = tstampWrapperDirPattern.sub("", parentPath) - cnode.setWrapperDir(tstampWrapperDir) - - if parentPath == '/': - vospacePath = parentPath + nodeName - else: - vospacePath = parentPath + '/' + nodeName - - cnode.setParentPath(parentPath) - locationId = self.dbConn.getLocationId(storageId) - cnode.setLocationId(locationId) - cnode.setOwnerID(userId) - cnode.setCreatorID(userId) - cnode.setContentLength(0) - if not self.dbConn.nodeExists(cnode): - self.dbConn.insertNode(cnode) - self.dbConn.setAsyncTrans(vospacePath, True) - self.dbConn.setSticky(vospacePath, True) - - for flist in files: - for file in flist: - if self.md5calc.fileIsValid(file) and path in os.path.dirname(file): - out.write(f"FILE files: {files}\n") - out.write(f"FILE flist: {flist}\n") - out.write(f"FILE file: {file}\n") - out.write(f"FILE pathPrefix: {pathPrefix}\n") - parentPath = os.path.dirname(file).split(pathPrefix)[1] - out.write(f"FILE parentPath: {parentPath}\n") - nodeName = os.path.basename(file) - out.write(f"FILE nodeName: {nodeName}\n") - dnode = Node(nodeName, "data") - - if tstampWrapperDirPattern.search(parentPath): - tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') - parentPath = tstampWrapperDirPattern.sub("", parentPath) - dnode.setWrapperDir(tstampWrapperDir) - - vospacePath = parentPath + '/' + nodeName - out.write(f"FILE vospacePath: {vospacePath}\n") - dnode.setParentPath(parentPath) - storageId = self.dbConn.getStorageId(pathPrefix) - locationId = self.dbConn.getLocationId(storageId) - dnode.setLocationId(locationId) - dnode.setOwnerID(userId) - dnode.setCreatorID(userId) - dnode.setContentLength(os.path.getsize(file)) - dnode.setContentMD5(self.md5calc.getMD5(file)) - - if not self.dbConn.nodeExists(dnode): - self.dbConn.insertNode(dnode) - self.dbConn.setAsyncTrans(vospacePath, True) - self.dbConn.setSticky(vospacePath, True) - + p = Process(target = self.load, args = (self.tapeClient, self.dbConn, self.md5calc, self.systemUtils, path, pathPrefix, storageType, storageId, userId,), daemon = True) + p.start() # add a counter to track the number of nodes (files and dirs) + log file response = { "responseType": "IMPORT_DONE" } else: @@ -170,3 +99,80 @@ class ImportAMQPServer(AMQPServer): def run(self): print(f"Starting AMQP server of type {self.type}...") super(ImportAMQPServer, self).run() + + def load(self, tapeClient, dbConn, md5calc, systemUtils, path, pathPrefix, storageType, storageId, userId): + out = open("import_amqp_server_log.txt", "a") + if storageType == "cold": + tapeClient.connect() + tapeClient.recallChecksumFiles(path) + tapeClient.disconnect() + + [ dirs, files ] = systemUtils.scanRecursive(os.path.dirname(path)) + + tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") + for dir in dirs: + out.write(f"DIR dir: {dir}\n") + out.write(f"DIR pathPrefix: {pathPrefix}\n\n") + + if path in dir: + parentPath = os.path.dirname(dir).split(pathPrefix)[1] + nodeName = os.path.basename(dir) + + cnode = Node(nodeName, "container") + + if not tstampWrapperDirPattern.match("/" + nodeName): + if tstampWrapperDirPattern.search(parentPath): + tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') + parentPath = tstampWrapperDirPattern.sub("", parentPath) + cnode.setWrapperDir(tstampWrapperDir) + + if parentPath == '/': + vospacePath = parentPath + nodeName + else: + vospacePath = parentPath + '/' + nodeName + + cnode.setParentPath(parentPath) + locationId = dbConn.getLocationId(storageId) + cnode.setLocationId(locationId) + cnode.setOwnerID(userId) + cnode.setCreatorID(userId) + cnode.setContentLength(0) + if not dbConn.nodeExists(cnode): + dbConn.insertNode(cnode) + dbConn.setAsyncTrans(vospacePath, True) + dbConn.setSticky(vospacePath, True) + + for flist in files: + for file in flist: + if md5calc.fileIsValid(file) and path in os.path.dirname(file): + out.write(f"FILE files: {files}\n") + out.write(f"FILE flist: {flist}\n") + out.write(f"FILE file: {file}\n") + out.write(f"FILE pathPrefix: {pathPrefix}\n") + parentPath = os.path.dirname(file).split(pathPrefix)[1] + out.write(f"FILE parentPath: {parentPath}\n") + nodeName = os.path.basename(file) + out.write(f"FILE nodeName: {nodeName}\n") + dnode = Node(nodeName, "data") + + if tstampWrapperDirPattern.search(parentPath): + tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') + parentPath = tstampWrapperDirPattern.sub("", parentPath) + dnode.setWrapperDir(tstampWrapperDir) + + vospacePath = parentPath + '/' + nodeName + out.write(f"FILE vospacePath: {vospacePath}\n") + dnode.setParentPath(parentPath) + storageId = dbConn.getStorageId(pathPrefix) + locationId = dbConn.getLocationId(storageId) + dnode.setLocationId(locationId) + dnode.setOwnerID(userId) + dnode.setCreatorID(userId) + dnode.setContentLength(os.path.getsize(file)) + dnode.setContentMD5(md5calc.getMD5(file)) + + if not dbConn.nodeExists(dnode): + dbConn.insertNode(dnode) + dbConn.setAsyncTrans(vospacePath, True) + dbConn.setSticky(vospacePath, True) + -- GitLab