diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index 5689e9ca398eacf40b92b4ecb600da3e820e2d63..dd91c78e3b23205ddc5ea5e8f6b837de23218c7d 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -16,15 +16,15 @@ from task_executor import TaskExecutor class ImportExecutor(TaskExecutor): - + def __init__(self): self.md5calc = Checksum() - config = Config("/etc/vos_ts/vos_ts.conf") + config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_catalog") - self.dbConn = DbConnector(self.params["user"], - self.params["password"], - self.params["host"], - self.params.getint("port"), + self.dbConn = DbConnector(self.params["user"], + self.params["password"], + self.params["host"], + self.params.getint("port"), self.params["db"], 1, 1) @@ -42,49 +42,49 @@ class ImportExecutor(TaskExecutor): self.storageId = None self.storageType = None super(ImportExecutor, self).__init__() - + def importVOSpaceNodes(self): """This method performs the VOSpace import operation.""" - + self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) - + start = dt.now() nodeList = [] timestamp = dt.now().strftime("%Y_%m_%d-%H_%M_%S") nodeListFile = "vos_import_report-" + timestamp nlfp = open(nodeListFile, "w") - + out = open("import_amqp_server_log.txt", "a") - if self.storageType == "cold": + if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.recallChecksumFiles(self.path) - self.tapeClient.disconnect() - + self.tapeClient.disconnect() + [ dirs, files ] = self.systemUtils.scanRecursive(os.path.dirname(self.path)) - + tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") - for dir in dirs: + for dir in dirs: out.write(f"DIR dir: {dir}\n") out.write(f"DIR pathPrefix: {self.pathPrefix}\n\n") - + if self.path in dir: parentPath = os.path.dirname(dir).split(self.pathPrefix)[1] nodeName = os.path.basename(dir) cnode = Node(nodeName, "container") - + if not tstampWrapperDirPattern.match("/" + nodeName): if tstampWrapperDirPattern.search(parentPath): tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') parentPath = tstampWrapperDirPattern.sub("", parentPath) cnode.setWrapperDir(tstampWrapperDir) - + if parentPath == '/': vospacePath = parentPath + nodeName else: vospacePath = parentPath + '/' + nodeName - + cnode.setParentPath(parentPath) locationId = self.dbConn.getLocationId(self.storageId) cnode.setLocationId(locationId) @@ -101,7 +101,7 @@ class ImportExecutor(TaskExecutor): nodeList.append([ now, dir, vospacePath, "container", "SKIP" ]) for flist in files: - for file in flist: + for file in flist: if self.md5calc.fileIsValid(file) and self.path in os.path.dirname(file): out.write(f"FILE files: {files}\n") out.write(f"FILE flist: {flist}\n") @@ -110,14 +110,14 @@ class ImportExecutor(TaskExecutor): parentPath = os.path.dirname(file).split(self.pathPrefix)[1] out.write(f"FILE parentPath: {parentPath}\n") nodeName = os.path.basename(file) - out.write(f"FILE nodeName: {nodeName}\n") + out.write(f"FILE nodeName: {nodeName}\n") dnode = Node(nodeName, "data") - + if tstampWrapperDirPattern.search(parentPath): tstampWrapperDir = tstampWrapperDirPattern.search(parentPath).group(0).lstrip('/') parentPath = tstampWrapperDirPattern.sub("", parentPath) dnode.setWrapperDir(tstampWrapperDir) - + vospacePath = parentPath + '/' + nodeName out.write(f"FILE vospacePath: {vospacePath}\n") dnode.setParentPath(parentPath) @@ -127,7 +127,7 @@ class ImportExecutor(TaskExecutor): dnode.setCreatorID(self.userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) - + if not self.dbConn.nodeExists(dnode): self.dbConn.insertNode(dnode) self.dbConn.setAsyncTrans(vospacePath, True) @@ -137,41 +137,41 @@ class ImportExecutor(TaskExecutor): else: now = dt.now() nodeList.append([ now, file, vospacePath, "data", "SKIP" ]) - - nlfp.write(tabulate(nodeList, + + nlfp.write(tabulate(nodeList, headers = [ "Timestamp", "OS path", "VOSpace path", "Node type", "Result"], tablefmt = "simple")) nlfp.close() end = dt.now() - + # Update job status (to be moved) results = [{"target": ""}] self.dbConn.setResults(self.jobId, results) self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") self.dbConn.setEndTime(self.jobId) - + m = Mailer() m.addRecipient("cristiano.urban@inaf.it") msg = f""" [VOSpace import procedure summary] - + Storage type: {self.storageType} Storage ID: {self.storageId} - Creator ID: {self.userId} + Creator ID: {self.userId} Start time: {start} End time: {end} Processed nodes: {len(nodeList)} Imported nodes: {sum(res[-1] == 'DONE' for res in nodeList)} Skipped nodes: {sum(res[-1] == 'SKIP' for res in nodeList)} - + """ m.setMessageWithAttachment("VOSpace import notification", msg, nodeListFile) - m.send() + m.send() os.remove(nodeListFile) - - + + def run(self): print("Starting import executor...") self.setSourceQueueName("import_ready") diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index 219cf36627c4fe9f9f101ee10c6dda6b55f6793b..99cdef8d441366b362a38f12498676fceeddd74a 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -40,7 +40,7 @@ class ImportRPCServer(RedisRPCServer): username = requestBody["userName"] userInDb = self.dbConn.userExists(username) userInfo = self.systemUtils.userInfo(username) - out = open("import_amqp_server_log.txt", "a") + #out = open("import_amqp_server_log.txt", "a") if not userInfo or not userInDb: response = { "responseType": "ERROR", @@ -92,8 +92,8 @@ class ImportRPCServer(RedisRPCServer): jobObj.setOwnerId(userId) self.dbConn.insertJob(jobObj) self.importReadyQueue.insertJob(jobObj) - out.write(requestBody.copy()) - out.close() + #out.write(requestBody.copy()) + #out.close() response = { "responseType": "IMPORT_STARTED" } else: