From ab5ce8c5a53574ea86c733d4289f6dec82993ae3 Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Thu, 22 Apr 2021 19:13:53 +0200 Subject: [PATCH] Added some experimental methods for block splitting in retrieve operations. Signed-off-by: Cristiano Urban --- transfer_service/retrieve_executor.py | 138 +++++++++++++++++++++++++- 1 file changed, 136 insertions(+), 2 deletions(-) diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 795492a..14a7e9b 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -1,9 +1,11 @@ #!/usr/bin/env python +import json import os import subprocess import sys +from checksum import Checksum from config import Config from db_connector import DbConnector from tape_client import TapeClient @@ -29,17 +31,127 @@ class RetrieveExecutor(TaskExecutor): self.params["db"], 1, 1) + self.storageType = None self.jobObj = None self.jobId = None self.nodeList = [] + self.fileList = [] + self.numBlocks = None + self.maxBlockSize = 20 super(RetrieveExecutor, self).__init__() + def buildFileList(self): + if os.path.exists("nodeList.txt"): + os.remove("nodeList.txt") + nl = open("nodeList.txt", 'w') + for vospacePath in self.nodeList: + nl.write(vospacePath + '\n') + nl.close() + + for vospacePath in self.nodeList: + nodeInfo = self.dbConn.getOSPath(vospacePath) + srcPath = nodeInfo["fullPath"] + md5calc = Checksum() + if os.path.isdir(srcPath): + for root, dirs, files in os.walk(srcPath, topdown = False): + for f in files: + fullPath = os.path.join(root, f) + if md5calc.fileIsValid(fullPath): + fileSize = os.stat(fullPath).st_size + fileInfo = { + "fullPath": fullPath, + "fileSize": fileSize, + "vospaceRootParent": vospacePath + } + self.fileList.append(fileInfo.copy()) + else: + if md5calc.fileIsValid(srcPath): + fileSize = nodeInfo["contentLength"] + fileInfo = { + "fullPath": srcPath, + "fileSize": fileSize, + "vospaceRootParent": vospacePath + } + self.fileList.append(fileInfo.copy()) + + + if os.path.exists("fileList.txt"): + os.remove("fileList.txt") + fl = open("fileList.txt", 'w') + fl.write(json.dumps(self.fileList, indent = 4)) + fl.close() + + def buildBlocks(self): + + if self.fileList: + blockIdx = 0 + blockSize = 0 + for fileInfo in self.fileList: + fileSize = fileInfo["fileSize"] + # controlla se il file ha dimensione superiore alla dimensione di un blocco + if fileSize > self.maxBlockSize: + # chiudi il blocco corrente, se non vuoto, altrimenti usa quello? + # crea un nuovo blocco con solo il file in questione + # chiudi il blocco e aggiungilo alla lista dei blocchi + if blockSize > 0: + blockIdx += 1 + fileInfo["blockIdx"] = blockIdx + blockIdx += 1 + #self.blockList.append(currentBlock.copy()) + #currentBlock.clear() + #currentBlock.append(fileInfo.copy()) + else: + fileInfo["blockIdx"] = blockIdx + blockIdx += 1 + #currentBlock.append(fileInfo.copy()) + #self.blockList.append(currentBlock.copy()) + #currentBlock.clear() + blockSize = 0 + else: + # il file sta dentro un blocco, quindi controlla se + # la dimensione del file sommata al livello di riempimento + # del blocco attuale eccede o meno la dimensione del blocco + if blockSize + fileSize <= self.maxBlockSize: + # se si, aggiungi il file al blocco e vai avanti col prossimo + fileInfo["blockIdx"] = blockIdx + #currentBlock.append(fileInfo.copy()) + blockSize += fileSize + else: + # se no, chiudi il blocco attuale e aggiungilo alla lista blocchi + # crea un nuovo blocco, aggiungi il file e vai avanti col prossimo + blockIdx += 1 + fileInfo["blockIdx"] = blockIdx + blockSize = fileSize + #self.blockList.append(currentBlock.copy()) + #currentBlock.clear() + #blockSize = 0 + #currentBlock.append(fileInfo.copy()) + #blockSize += fileSize + if self.fileList: + self.numBlocks = blockIdx + 1 + print(f"numBlocks = {self.numBlocks}") + if os.path.exists("blocks.txt"): + os.remove("blocks.txt") + fl = open("blocks.txt", 'w') + fl.write(json.dumps(self.fileList, indent = 4)) + fl.close() + #blockList = [ block0 = [file0 = [], ..., fileJ = [] ], + # ..., + # blockN = [file0 = [], ..., fileK = [] ] + # ] + def prepareData(self): fileList = [] self.dbConn.setPhase(self.jobId, "EXECUTING") self.dbConn.setStartTime(self.jobId) for vospacePath in self.nodeList: - [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) + #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) + fileInfo = self.dbConn.getOSPath(vospacePath) + srcPath = fileInfo["fullPath"] + storageType = fileInfo["storageType"] + username = fileInfo["username"] + osRelPath = fileInfo["osPath"] + fileSize = fileInfo["contentLength"] if storageType == "cold": if os.path.isdir(srcPath): for root, dirs, files in os.walk(srcPath): @@ -54,7 +166,14 @@ class RetrieveExecutor(TaskExecutor): def retrieveData(self): for vospacePath in self.nodeList: - [srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) + #[srcPath, storageType, username, osRelPath] = self.dbConn.getOSPath(vospacePath) + fileInfo = self.dbConn.getOSPath(vospacePath) + srcPath = fileInfo["fullPath"] + storageType = fileInfo["storageType"] + username = fileInfo["username"] + osRelPath = fileInfo["osPath"] + fileSize = fileInfo["contentLength"] + osRelParentPath = os.path.dirname(osRelPath) if osRelParentPath != "/": osRelParentPath += "/" @@ -91,6 +210,8 @@ class RetrieveExecutor(TaskExecutor): vospacePath = self.jobObj.jobInfo["transfer"]["target"].split("!vospace")[1] if nodeType == "list": self.dbConn.deleteTmpDataNode(vospacePath) + self.fileList.clear() + self.nodeList.clear() def run(self): print("Starting retrieve executor...") @@ -102,13 +223,26 @@ class RetrieveExecutor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.jobInfo["nodeList"] + self.buildFileList() + self.buildBlocks() self.prepareData() result = self.retrieveData() if result: self.updateJobStatus() self.cleanup() + print(f"fileList = {self.fileList}") + print(f"nodeList = {self.nodeList}") else: sys.exit("Failed to retrieve data!") self.destQueue.insertJob(self.jobObj) self.srcQueue.extractJob() print(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + + # Ottieni il tipo di storage + # Cicla su ciascun blocco + # -Se storage = cold => recall di tutti i file del blocco + # -copia e rimuovi ciascun file del blocco + # - alla fine della copia del blocco controlla se è stato completato qualche path vospace di cartella ( + # (cioè se ciascuno dei vospacePath è presente o meno in uno dei campi vospaceRootParent) + # -se si, aggiorna ricorsivamente i flag su quel path vospace + # -Se storage = cold => migra tutti i file del blocco per fare spazio sul frontend -- GitLab