From f5bce7afbb0bfe047b32ddcec4397234219d8a53 Mon Sep 17 00:00:00 2001 From: Cristiano Urban Date: Fri, 23 Apr 2021 16:36:37 +0200 Subject: [PATCH] Added size convertion from human readable to bytes + minor changes. Signed-off-by: Cristiano Urban --- transfer_service/checksum.py | 4 +++- transfer_service/config/vos_ts.conf | 11 ++++++++--- transfer_service/retrieve_executor.py | 7 +++++-- transfer_service/store_preprocessor.py | 2 +- transfer_service/system_utils.py | 22 ++++++++++++++++++++++ transfer_service/tape_client.py | 25 ++++++++++++++++++++----- 6 files changed, 59 insertions(+), 12 deletions(-) diff --git a/transfer_service/checksum.py b/transfer_service/checksum.py index 75b6fbd..986d268 100644 --- a/transfer_service/checksum.py +++ b/transfer_service/checksum.py @@ -5,14 +5,16 @@ import os import sys from config import Config +from system_utils import SystemUtils class Checksum(object): def __init__(self): + self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("checksum") - self.fileBufferSize = self.params.getint("file_buffer_size") + self.fileBufferSize = self.systemUtils.convertSizeToBytes(self.params["file_buffer_size"]) self.md5FileSuffix = self.params["md5_file_suffix"] def setFileBufferSize(fileBufferSize): diff --git a/transfer_service/config/vos_ts.conf b/transfer_service/config/vos_ts.conf index 4bdf759..ab63584 100644 --- a/transfer_service/config/vos_ts.conf +++ b/transfer_service/config/vos_ts.conf @@ -51,13 +51,13 @@ pkey_file_path = /root/.ssh/tape_rsa ; suffix for files containing MD5 checksums md5_file_suffix = -md5sum.txt ; buffer size in bytes when reading a chunk of data, default is 4096 B -file_buffer_size = 4096 +file_buffer_size = 4096 B [file_grouper] ; minimum number of files contained by a 'leaf', default is 1000 min_num_files = 1000 -; maximum size in GB for a 'leaf', default is 100 GB -max_dir_size = 100 +; maximum size for a 'leaf', default is 100 GB +max_dir_size = 100 GB [scheduling] ; maximum number of jobs within a 'pending' queue, default is 32 @@ -70,6 +70,10 @@ max_terminated_jobs = 8 ; job queues (do NOT set to zero), default is 15 s exec_wait_time = 15 +[transfer] +; split data to be retrieved in blocks of a given size +block_size = 2 MB + [cleanup] ; Physically delete from disk all nodes previously deleted by the user via ui, ; that are older than a given amount of time specified here below @@ -82,6 +86,7 @@ seconds = 30 smtp_server = smtp2.oats.inaf.it smtp_port = 25 no_reply_email = noreply-vospace@inaf.it +admin_email = cristiano.urban@inaf.it #################### diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index 81df4c2..e8fe868 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -8,6 +8,7 @@ import sys from checksum import Checksum from config import Config from db_connector import DbConnector +from system_utils import SystemUtils from tape_client import TapeClient from task_executor import TaskExecutor @@ -15,6 +16,7 @@ from task_executor import TaskExecutor class RetrieveExecutor(TaskExecutor): def __init__(self): + self.systemUtils = SystemUtils() config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("spectrum_archive") self.tapeClient = TapeClient(self.params["host"], @@ -31,13 +33,14 @@ class RetrieveExecutor(TaskExecutor): self.params["db"], 1, 1) + self.params = config.loadSection("transfer") + self.maxBlockSize = self.systemUtils.convertSizeToBytes(self.params["block_size"]) self.storageType = None self.jobObj = None self.jobId = None self.nodeList = [] self.fileList = [] - self.numBlocks = None - self.maxBlockSize = 1536000 + self.numBlocks = None super(RetrieveExecutor, self).__init__() def buildFileList(self): diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index f315827..385c312 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -30,7 +30,7 @@ class StorePreprocessor(TaskExecutor): config = Config("/etc/vos_ts/vos_ts.conf") self.params = config.loadSection("file_grouper") self.fileGrouper = FileGrouper(self.params.getint("min_num_files"), - self.params.getint("max_dir_size") * (2 ** 30)) + self.systemUtils.convertSizeToBytes(params["max_dir_size"])) self.params = config.loadSection("file_catalog") self.dbConn = DbConnector(self.params["user"], self.params["password"], diff --git a/transfer_service/system_utils.py b/transfer_service/system_utils.py index 8362f6c..d586b4d 100644 --- a/transfer_service/system_utils.py +++ b/transfer_service/system_utils.py @@ -5,6 +5,15 @@ import sys class SystemUtils(object): + + UNITS = { + "B": 1, + "KB": 1 << 10, + "MB": 1 << 20, + "GB": 1 << 30, + "TB": 1 << 40, + "PB": 1 << 50, + } def __init__(self): pass @@ -67,7 +76,20 @@ class SystemUtils(object): sys.exit("FATAL: invalid file/dir.") return [ dirList, fileList ] + def convertSizeToBytes(self, sizeStr): + size = int(sizeStr.split()[0]) + unit = sizeStr.split()[1] + if unit not in self.UNITS: + return False + else: + return size * self.UNITS[unit] + + + + + # Test #sysUtils = SystemUtils() #info = sysUtils.userInfo("cristiano") +#print(sysUtils.convertSizeInBytes(" 10 MB ")) #print(info) diff --git a/transfer_service/tape_client.py b/transfer_service/tape_client.py index 43977e6..f8ec4de 100644 --- a/transfer_service/tape_client.py +++ b/transfer_service/tape_client.py @@ -11,6 +11,7 @@ from tape_task import TapeTask class TapeClient(object): + EEADM = "/opt/ibm/ltfsee/bin/eeadm" # Constructor def __init__(self, host, port, user, keyFile): @@ -74,27 +75,41 @@ class TapeClient(object): self.scp.close() def migrate(self, fileList): + """ + Migrates to tape all files whose absolute path is + contained in 'fileList'. + """ tmp = str(uuid.uuid1().hex) + "-vos_migrate.tmp" fp = open(tmp, "a") for f in fileList: fp.write(f"{f}\n") fp.close() - self.copy("./" + tmp, "/tmp/" + tmp) - cmd = "/opt/ibm/ltfsee/bin/eeadm migrate /tmp/" + tmp + " -p pl_generic_rw_01" + self.copy(f"./{tmp}", f"/tmp/{tmp}") + os.remove(f"./{tmp}") + cmd = f"{EEADM} migrate /tmp/{tmp} -p pl_generic_rw_01" stdin, stdout, stderr = self.client.exec_command(cmd) def recall(self, fileList): + """ + Recalls from tape all files whose absolute path is + contained in 'fileList'. + """ tmp = str(uuid.uuid1().hex) + "-vos_recall.tmp" fp = open(tmp, "a") for f in fileList: fp.write(f"{f}\n") fp.close() + os.remove(f"./{tmp}") self.copy("./" + tmp, "/tmp/" + tmp) - cmd = "/opt/ibm/ltfsee/bin/eeadm recall /tmp/" + tmp + cmd = f"{EEADM} recall /tmp/{tmp}" stdin, stdout, stderr = self.client.exec_command(cmd) def recallChecksumFiles(self, dirName): - cmd = "find $(dirname " + dirName + ") -type f \( -iname \"*-md5sum.txt\" \) | /opt/ibm/ltfsee/bin/eeadm recall" + """ + Recursively recalls from tape all the checksum files related to + the 'dirName' directory. + """ + cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) | {EEADM}" stdin, stdout, stderr = self.client.exec_command(cmd) exitCode = stdout.channel.recv_exit_status() out = open("tape_client_log.txt", "a") @@ -110,7 +125,7 @@ class TapeClient(object): self.client.close() def getSize(self, fsMountPoint): - cmd = "df " + fsMountPoint + " | tail -n +2" + cmd = f"df {fsMountPoint} | tail -n +2" #out = subprocess.run(cmd, shell = True, capture_output = True) stdin, stdout, stderr = self.client.exec_command(cmd) #res = stdout.stdout.decode('UTF-8').rstrip('\n') -- GitLab