#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
import datetime
import logging
import os
import shutil
import subprocess
import sys

from config import Config
from datetime import datetime as dt
from db_connector import DbConnector
from mailer import Mailer
from redis_log_handler import RedisLogHandler
from system_utils import SystemUtils
from tabulate import tabulate
from tape_client import TapeClient
from task_executor import TaskExecutor


class StoreExecutor(TaskExecutor):
    """Task executor that stores user data on a VOSpace storage point.

    Consumes jobs from the 'write_ready' queue, copies the user's staged
    data (rsync) to the configured hot or cold (tape HSM) storage, updates
    the file catalog accordingly, notifies the user and the administrator
    by e-mail, and finally moves the job to the 'write_terminated' queue.
    """

    # We use 10 GB of tolerance when we calculate
    # the free space in our storage point
    TOL = 10 * (2**30)

    def __init__(self):
        """Load all configuration sections and set up logging, DB and tape clients."""
        self.type = "store_executor"
        self.systemUtils = SystemUtils()
        config = Config("/etc/vos_ts/vos_ts.conf")
        params = config.loadSection("transfer_node")
        self.storageStorePath = params["store_path"]
        params = config.loadSection("transfer")
        self.maxBlockSize = self.systemUtils.convertSizeToBytes(params["block_size"])
        params = config.loadSection("scheduling")
        self.maxTerminatedJobs = params.getint("max_terminated_jobs")
        params = config.loadSection("mail")
        self.adminEmail = params["admin_email"]
        params = config.loadSection("logging")
        self.logger = logging.getLogger(__name__)
        # FIX: resolve the level name with getattr() instead of eval() on a
        # string assembled from the config file — eval() on config-derived
        # text is an arbitrary-code-execution risk. Behavior is identical
        # for any valid level name (e.g. "DEBUG", "INFO").
        self.logger.setLevel(getattr(logging, params["log_level"]))
        logFormat = params["log_format"]
        logFormatter = logging.Formatter(logFormat)
        redisLogHandler = RedisLogHandler()
        redisLogHandler.setFormatter(logFormatter)
        self.logger.addHandler(redisLogHandler)
        self.resDir = params["res_dir"]
        params = config.loadSection("file_catalog")
        self.dbConn = DbConnector(params["user"],
                                  params["password"],
                                  params["host"],
                                  params.getint("port"),
                                  params["db"],
                                  1,
                                  1,
                                  self.logger)
        params = config.loadSection("spectrum_archive")
        self.tapeClient = TapeClient(params["host"],
                                     params.getint("port"),
                                     params["user"],
                                     params["pkey_file_path"],
                                     self.logger)
        # Per-job state, populated by run() for each dequeued job.
        self.jobObj = None
        self.jobId = None
        self.username = None
        self.userId = None
        self.userEmail = None
        self.requestType = None
        self.storageId = None
        self.storageType = None
        self.nodeList = []
        super(StoreExecutor, self).__init__()

    def execute(self):
        """Copy the staged user data to the storage point.

        Marks the job EXECUTING in the file catalog, checks that the target
        storage (hot filesystem or cold tape HSM filesystem) has enough free
        space (minus TOL), then rsyncs the data.

        Returns:
            bool: True on success, False on any failure (already logged).
        """
        try:
            self.logger.info("++++++++++ Start of execution phase ++++++++++")
            self.logger.info(f"Data size: {self.dataSize} B")
            try:
                self.jobObj.setPhase("EXECUTING")
                self.jobObj.setStartTime(datetime.datetime.now().isoformat())
                self.dbConn.insertJob(self.jobObj)
            except Exception:
                self.logger.exception("FATAL: unable to update the file catalog.")
                return False
            else:
                self.logger.info("Job phase updated to EXECUTING.")
            srcPathPrefix = self.storageStorePath.replace("{username}", self.username)
            # NOTE(review): the listing result is unused, but the call doubles
            # as an existence check — a missing source dir raises here and is
            # caught by the outer handler. Kept for that side effect.
            srcData = os.listdir(srcPathPrefix)
            self.logger.debug("Checking storage available space...")
            try:
                storageBasePath = self.dbConn.getStorageBasePath(self.storageId)
            except Exception:
                self.logger.exception("FATAL: unable to obtain the storage base path.")
                return False
            storageFreeSpace = 0
            if self.storageType == "hot":
                [total, used, free] = self.systemUtils.getFileSystemSize(storageBasePath)
                storageFreeSpace = free - self.TOL
                self.logger.debug(f"storageFreeSpace (hot): {storageFreeSpace} B")
            else:
                try:
                    self.tapeHSMFilesystem = self.dbConn.getTapeHSMFilesystem(self.storageId)
                except Exception:
                    self.logger.exception("FATAL: unable to obtain the tape HSM filesystem.")
                    return False
                else:
                    self.tapeClient.connect()
                    storageFreeSpace = self.tapeClient.getHSMFilesystemFreeSpace(self.tapeHSMFilesystem) - self.TOL
                    self.tapeClient.disconnect()
                    self.logger.debug(f"storageFreeSpace (cold): {storageFreeSpace} B")
            if storageFreeSpace < self.dataSize:
                self.logger.error("FATAL: space available on the storage point is not enough.")
                return False
            destPathPrefix = storageBasePath + '/' + self.username
            self.logger.info("Starting data copy...")
            # Trailing '/' on src copies the directory *contents*; symlinks
            # are deliberately skipped (--no-links).
            sp = subprocess.run(["rsync", "-av", "--no-links",
                                 srcPathPrefix + '/', destPathPrefix + '/'],
                                capture_output = True)
            # NOTE(review): any stderr output is treated as fatal, even when
            # rsync exits 0 — confirm this strictness is intended.
            if(sp.returncode or sp.stderr):
                self.logger.error("FATAL: an error occurred while copying the data.")
                return False
        except Exception:
            self.logger.exception("FATAL: something went wrong during the execution phase.")
            return False
        else:
            self.logger.info("++++++++++ End of execution phase ++++++++++")
            return True

    def update(self, status):
        """Finalize the job in the file catalog and send e-mail notifications.

        Args:
            status (str): "OK" to mark the job COMPLETED (nodes flagged as
                stored, report file written and attached), anything else to
                mark it ERROR (nodes removed from the database).
        """
        try:
            results = [{"target": ""}]
            self.dbConn.setResults(self.jobId, results)
            m = Mailer(self.logger)
            m.addRecipient(self.adminEmail)
            self.userEmail = self.dbConn.getUserEmail(self.userId)
            if self.userEmail != self.adminEmail:
                m.addRecipient(self.userEmail)
            if status == "OK":
                self.logger.info("Updating VOSpace nodes status...")
                for el in self.nodeList:
                    nodeVOSPath = el[2]
                    self.dbConn.setAsyncTrans(nodeVOSPath, True)
                    self.dbConn.setJobId(nodeVOSPath, None)
                nodeListFile = os.path.join(self.resDir, "vos_data_report-" + self.jobId)
                # FIX: use a context manager so the report file is closed
                # even if the write fails (the original leaked the handle
                # on a write error).
                try:
                    with open(nodeListFile, "w") as nlfp:
                        nlfp.write(tabulate(self.nodeList,
                                            headers = [ "Timestamp",
                                                        "OS path",
                                                        "VOSpace path",
                                                        "Node type",
                                                        "Result"],
                                            tablefmt = "simple"))
                except IOError:
                    self.logger.exception("Unable to generate the 'vos_data_report'.")
                self.jobObj.setPhase("COMPLETED")
                self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                self.dbConn.insertJob(self.jobObj)
                self.logger.info("Job phase updated to COMPLETED.")
                msg = f"""
########## VOSpace data storage procedure summary ##########

Job ID: {self.jobId}
Job type {self.jobObj.type}
Storage type: {self.storageType}
Storage ID: {self.storageId}
Creator ID: {self.userId}
Processed nodes: {len(self.nodeList)}
Stored nodes: {sum(res[-1] == 'DONE' for res in self.nodeList)}
Skipped nodes: {sum(res[-1] == 'SKIPPED' for res in self.nodeList)}
Symlinks detected: {sum(res[-1] == 'SYMLINK' for res in self.nodeList)}
"""
                # Attach the full report only when it is reasonably sized.
                if len(self.nodeList) <= 10 ** 5:
                    m.setMessageWithAttachment("VOSpace data storage notification: Job COMPLETED", msg, nodeListFile)
                else:
                    info = f"""
INFO: this operation involved a number of nodes greater than 10^5,
you will find the results in {self.resDir}.
"""
                    msg += info
                    m.setMessage("VOSpace data storage notification", msg)
            else:
                self.jobObj.setPhase("ERROR")
                self.jobObj.setErrorType("fatal")
                self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.")
                self.jobObj.setEndTime(datetime.datetime.now().isoformat())
                self.dbConn.insertJob(self.jobObj)
                self.logger.info("Job phase updated to ERROR.")
                self.logger.info("Removing VOSpace nodes from the database...")
                self.dbConn.deleteNodesByJobId(self.jobId)
                self.logger.info("Database cleanup completed")
                msg = f"""
########## VOSpace data storage procedure summary ##########

Job ID: {self.jobId}
Job type {self.jobObj.type}
Storage type: {self.storageType}
Storage ID: {self.storageId}
Creator ID: {self.userId}
"""
                info = f"""
ERROR: the job was terminated due to an error that occurred while copying
the data on the storage point. This issue will be automatically reported
to the administrator.
"""
                msg += info
                m.setMessage("VOSpace data storage notification: Job ERROR", msg)
            # Send e-mail notification
            m.send()
        except Exception:
            self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}")
        finally:
            # Always drop per-job node state, even after a failure.
            self.nodeList.clear()

    def cleanup(self):
        """Empty the user's staging area and restore its ownership/permissions."""
        try:
            srcPathPrefix = self.storageStorePath.replace("{username}", self.username)
            self.logger.info(f"Cleanup of '{srcPathPrefix}'...")
            srcData = os.listdir(srcPathPrefix)
            for el in srcData:
                nodeOSPath = srcPathPrefix + '/' + el
                if os.path.isdir(nodeOSPath):
                    shutil.rmtree(nodeOSPath)
                else:
                    os.remove(nodeOSPath)
            userInfo = self.systemUtils.userInfo(self.username)
            uid = userInfo[1]
            gid = userInfo[2]
            # Hand the (now empty) staging dir back to the user.
            os.chown(srcPathPrefix, uid, gid)
            os.chmod(srcPathPrefix, 0o755)
        except Exception:
            self.logger.exception(f"Unable to perform the cleanup of {srcPathPrefix}.")

    def run(self):
        """Main loop: pull jobs from 'write_ready', process them, park them in 'write_terminated'."""
        self.logger.info("Starting store executor...")
        self.setSourceQueueName("write_ready")
        self.setDestinationQueueName("write_terminated")
        while True:
            self.wait()
            try:
                srcQueueLen = self.srcQueue.len()
                destQueueLen = self.destQueue.len()
            except Exception:
                self.logger.exception("Cache error: failed to retrieve queue length.")
            else:
                if srcQueueLen > 0:
                    self.jobObj = self.srcQueue.getJob()
                    self.jobId = self.jobObj.jobId
                    self.username = self.jobObj.jobInfo["userName"]
                    self.userId = self.jobObj.ownerId
                    self.requestType = self.jobObj.jobInfo["requestType"]
                    self.storageId = self.jobObj.jobInfo["storageId"]
                    self.storageType = self.jobObj.jobInfo["storageType"]
                    # Copy: update() clears nodeList, jobInfo must stay intact.
                    self.nodeList = self.jobObj.jobInfo["nodeList"].copy()
                    self.dataSize = self.jobObj.jobInfo["dataSize"]
                    if self.execute():
                        self.cleanup()
                        self.update("OK")
                    else:
                        self.update("ERROR")
                    try:
                        # Keep the terminated queue bounded: evict the oldest
                        # entry before inserting the finished job.
                        if destQueueLen >= self.maxTerminatedJobs:
                            self.destQueue.extractJob()
                        self.destQueue.insertJob(self.jobObj)
                        self.srcQueue.extractJob()
                    except Exception:
                        self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")
                    else:
                        self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'")