Skip to content
Snippets Groups Projects
Select Git revision
  • 21eaf4b594e87b54462c05daa3110b6afa60998a
  • master default protected
  • rocky_8
  • rocky_9
  • pasture
  • testing
  • query
  • v0.2.9
  • v0.2.8
  • v0.2.7
  • v0.2.6
  • v0.2.5
  • v0.2.4
  • v0.2.3
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.2
  • v0.1.1
  • v0.1
20 results

tape_client.py

Blame
  • tape_client.py 10.48 KiB
    #!/usr/bin/env python
    #
    # This file is part of vospace-transfer-service
    # Copyright (C) 2021 Istituto Nazionale di Astrofisica
    # SPDX-License-Identifier: GPL-3.0-or-later
    #
    
    import json
    import logging
    import os
    import paramiko
    import scp
    import sys
    
    from config import Config
    from exceptions import ScpInvalidFileException
    from exceptions import TapeClientException
    from redis_log_handler import RedisLogHandler
    from tape_pool import TapePool
    from tape_task import TapeTask
    
    
    class TapeClient(object):
        # 'eeadm' command location on the tape library frontend
        EEADM = "/opt/ibm/ltfsee/bin/eeadm"
        # destination for the files containing the lists of files to recall or migrate
        VOSPACE_WD = "/tmp/vospace"
    
        def __init__(self, host, port, user, keyFile, logger):
            """
            Stores the connection settings and prepares an SSH client
            (not yet connected) for the tape library frontend.
            'keyFile' is the path of the private key file used later by
            'connect()', 'logger' is the logger used by all the methods.
            """
            # connection settings
            self.host, self.port, self.user = host, port, user
            self.keyFile = keyFile
            self.logger = logger
            # SSH client: host keys that are not known yet are handled
            # by paramiko's 'AutoAddPolicy'
            sshClient = paramiko.SSHClient()
            sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.client = sshClient
            # SCP client, created on demand by 'copy()'
            self.scp = None
            # caches filled by 'getPoolList()' and 'getTaskList()'
            self.poolList = []
            self.taskList = []
    
        def connect(self):
            """
            Opens the SSH connection to the tape library frontend.
            The RSA private key is read from 'self.keyFile' and stored in
            'self.key', then the system host keys are loaded and the
            connection is established.
            Any failure is logged (with traceback) and re-raised.
            """
            try:
                privateKey = paramiko.RSAKey.from_private_key_file(self.keyFile)
                self.key = privateKey
                self.client.load_system_host_keys()
                sshParams = {
                    "hostname": self.host,
                    "port": self.port,
                    "username": self.user,
                    "pkey": privateKey,
                }
                self.client.connect(**sshParams)
            except Exception:
                self.logger.exception("Unable to establish SSH connection with tape library frontend.")
                raise
    
        def getPoolList(self):
            """
            Returns a list of 'TapePool' objects.
            Runs 'eeadm pool list --json' on the frontend and builds one
            'TapePool' for each element of the "payload" array found in
            the first line of the command output.
            The cached 'self.poolList' is rebuilt from scratch at every
            call (no duplicates on repeated calls) and a copy of it is
            returned.
            Raises 'TapeClientException' if the command exits with a
            non-zero exit code; errors while launching the command are
            logged and re-raised.
            """
            cmd = f"{self.EEADM} pool list --json"
            try:
                stdin, stdout, stderr = self.client.exec_command(cmd)
            except Exception:
                self.logger.exception(f"Unable to execute command: '{cmd}'")
                raise
            # Read the output BEFORE waiting for the exit status: paramiko
            # documents that 'recv_exit_status()' may hang forever when the
            # remote output is larger than the channel window and has not
            # been read yet.
            outLines = stdout.readlines()
            exitCode = stdout.channel.recv_exit_status()
            if exitCode:
                raise TapeClientException(cmd, exitCode, stderr)
            result = json.loads(outLines[0].rstrip('\n'))
            pools = []
            for el in result["payload"]:
                pool = TapePool()
                pool.id = el["id"]
                pool.name = el["name"]
                pool.mediaRestriction = el["media_restriction"]
                pool.capacity = el["capacity"]
                pool.usedSpace = el["used_space"]
                pool.freeSpace = el["free_space"]
                pool.reclaimableSpace = el["reclaimable_space"]
                pool.activeSpace = el["active_space"]
                pool.nonAppendableSpace = el["non_appendable_space"]
                pool.numOfTapes = el["num_of_tapes"]
                pool.formatClass = el["format_class"]
                pool.libraryName = el["library_name"]
                pool.libraryId = el["library_id"]
                pool.nodeGroupName = el["nodegroup_name"]
                pool.deviceType = el["device_type"]
                pool.worm = el["worm"]
                pool.fillPolicy = el["fill_policy"]
                pool.owner = el["owner"]
                pool.mountLimit = el["mount_limit"]
                pool.lowSpaceWarningEnable = el["low_space_warning_enable"]
                pool.lowSpaceWarningThreshold = el["low_space_warning_threshold"]
                pool.noSpaceWarningEnable = el["no_space_warning_enable"]
                pool.mode = el["mode"]
                pools.append(pool)
            # Replace the cached content in place only once the whole payload
            # has been parsed, so a parsing error leaves the old cache intact.
            self.poolList[:] = pools
            return self.poolList.copy()
    
        def getTaskList(self):
            """
            Returns the whole task list.
            Runs 'eeadm task list --json' on the frontend and builds one
            'TapeTask' for each element of the "payload" array found in
            the first line of the command output.
            The cached 'self.taskList' is rebuilt from scratch at every
            call (no duplicates on repeated calls) and a copy of it is
            returned.
            Raises 'TapeClientException' if the command exits with a
            non-zero exit code; errors while launching the command are
            logged and re-raised.
            """
            cmd = f"{self.EEADM} task list --json"
            try:
                stdin, stdout, stderr = self.client.exec_command(cmd)
            except Exception:
                self.logger.exception(f"Unable to execute command: '{cmd}'")
                raise
            # Read the output BEFORE waiting for the exit status: paramiko
            # documents that 'recv_exit_status()' may hang forever when the
            # remote output is larger than the channel window and has not
            # been read yet.
            outLines = stdout.readlines()
            exitCode = stdout.channel.recv_exit_status()
            if exitCode:
                raise TapeClientException(cmd, exitCode, stderr)
            result = json.loads(outLines[0].rstrip('\n'))
            tasks = []
            for el in result["payload"]:
                task = TapeTask()
                task.inUseTapes = el["inuse_tapes"]
                task.inUsePools = el["inuse_pools"]
                task.inUseNodeGroups = el["inuse_node_groups"]
                task.inUseDrives = el["inuse_drives"]
                task.cmdParam = el["cmd_param"]
                task.result = el["result"]
                task.status = el["status"]
                task.completedTime = el["completed_time"]
                task.startedTime = el["started_time"]
                task.createdTime = el["created_time"]
                # NOTE(review): every other field is stored in a plain
                # attribute; 'setInUseLibs' looks like a typo for 'inUseLibs'.
                # Kept as is because 'TapeTask' is defined elsewhere - confirm.
                task.setInUseLibs = el["inuse_libs"]
                task.type = el["type"]
                task.taskId = el["task_id"]
                task.id = el["id"]
                tasks.append(task)
            # Replace the cached content in place only once the whole payload
            # has been parsed, so a parsing error leaves the old cache intact.
            self.taskList[:] = tasks
            return self.taskList.copy()
    
        def copy(self, srcPath, destPath):
            """
            Copies files/dirs recursively by passing their absolute paths.
            'srcPath' is a local file or directory, 'destPath' is the
            remote destination on the tape library frontend.
            A new SCP client is created on top of the SSH transport at
            every call and is always closed before returning.
            Raises 'ScpInvalidFileException' if 'srcPath' is neither a
            file nor a directory; errors while creating the SCP client
            are logged and re-raised.
            """
            try:
                self.scp = scp.SCPClient(self.client.get_transport())
            except Exception:
                self.logger.error("Unable to get transport from SSH client.")
                raise
            # The cleanup is attached only to the code that runs once the SCP
            # client exists: closing in a 'finally' that also covers the
            # creation would call 'close()' on 'None' (or on a stale client)
            # and hide the original error.
            try:
                self.logger.debug(f"Copying {srcPath} in {destPath}")
                if os.path.isdir(srcPath):
                    self.scp.put(srcPath, recursive = True, remote_path = destPath)
                elif os.path.isfile(srcPath):
                    self.scp.put(srcPath, destPath)
                else:
                    self.logger.error("FATAL: invalid file/dir.")
                    raise ScpInvalidFileException
            finally:
                self.scp.close()
    
        def migrate(self, fileList, tapePool, jobId):
            """
            Migrates to tape all files whose absolute path is
            contained in 'fileList'.
            A tape pool and a VOSpace jobId are also required
            as parameters.
            """
            self.logger.info(f"Starting MIGRATE operation (tape pool = '{tapePool}')...")
            migrateFileList = f"vos_migrate-{jobId}.lst"
            try:
                fp = open(migrateFileList, "a")
            except IOError:
                raise
            else:
                for f in fileList:
                    fp.write(f"{f}\n")
                fp.close()
                self.copy(f"./{migrateFileList}", f"{self.VOSPACE_WD}/{migrateFileList}")
                os.remove(f"./{migrateFileList}")
                cmd = f"{self.EEADM} migrate {self.VOSPACE_WD}/{migrateFileList} -p {tapePool} > /dev/null 2>&1"
                try:
                    stdin, stdout, stderr = self.client.exec_command(cmd)
                except Exception:
                    self.logger.exception(f"Unable to execute command: '{cmd}'")
                    raise
                else:
                    exitCode = stdout.channel.recv_exit_status()
                    if not exitCode:
                        self.logger.info("MIGRATE operation COMPLETED.")
                    else:
                        self.logger.error("MIGRATE operation FAILED.")
                        raise TapeClientException(cmd, exitCode, stderr)
                    return exitCode
    
        def recall(self, fileList, jobId):
            """
            Recalls from tape all files whose absolute path is
            contained in 'fileList'.
            A VOSpace job ID is also required as parameter.
            """
            self.logger.info("Starting RECALL operation...")
            recallFileList = f"vos_recall-{jobId}.lst"
            try:
                 fp = open(recallFileList, "a")
            except IOError:
                raise
            else:
                for f in fileList:
                    fp.write(f"{f}\n")
                fp.close()
                self.copy(f"./{recallFileList}", f"{self.VOSPACE_WD}/{recallFileList}")
                os.remove(f"./{recallFileList}")
                cmd = f"{self.EEADM} recall {self.VOSPACE_WD}/{recallFileList} > /dev/null 2>&1"
                try:
                    stdin, stdout, stderr = self.client.exec_command(cmd)
                except Exception:
                    self.logger.exception(f"Unable to execute command: '{cmd}'")
                    raise
                else:
                    exitCode = stdout.channel.recv_exit_status()
                    if not exitCode:
                        self.logger.info("RECALL operation COMPLETED.")
                    else:
                        self.logger.error("RECALL operation FAILED.")
                        raise TapeClientException(cmd, exitCode, stderr)
                    return exitCode
    
        def recallChecksumFiles(self, dirName):
            """
            Recursively recalls from tape all the checksum files related to
            the 'dirName' directory.
            """
            self.logger.info("Starting RECALL_CHECKSUM operation...")
            cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) | {self.EEADM} recall > /dev/null 2>&1"
            try:
                stdin, stdout, stderr = self.client.exec_command(cmd)
            except Exception:
                self.logger.exception(f"Unable to execute command: '{cmd}'")
                raise
            else:
                exitCode = stdout.channel.recv_exit_status()
                if not exitCode:
                    self.logger.info("RECALL_CHECKSUM operation COMPLETED.")
                else:
                    self.logger.error("RECALL_CHECKSUM operation FAILED.")
                    raise TapeClientException(cmd, exitCode, stderr)
                return exitCode
    
        def disconnect(self):
            """Performs a cleanup and closes the connection."""
            self.taskList.clear()
            self.poolList.clear()
            self.client.close()
    
        def getSize(self, fsMountPoint):
            """
            DEPRECATED
            """
            cmd = f"df {fsMountPoint} | tail -n +2"
            #out = subprocess.run(cmd, shell = True, capture_output = True)
            stdin, stdout, stderr = self.client.exec_command(cmd)
            #res = stdout.stdout.decode('UTF-8').rstrip('\n')
            res = stdout.readlines()[0]
            res = ' '.join(res.split()).split(' ')
            #print(res)
            total = res[1]
            used = res[2]
            available = res[3]
            return ( total, used, available)
    
    
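    # Manual test (disabled). NOTE: the constructor call below is outdated, TapeClient() also requires a 'logger' argument.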
    #tc = TapeClient("192.168.56.101", 22, "root", "ibm")
    #tc.connect()
    #tc.copy("/home/curban/store/mydir", "/home/mydir")
    #tc.copy("/home/curban/store/foo2.txt", "/home/mydir/foo2.txt")
    #tl = tc.getTaskList()
    #fsSize = tc.getSize("/ia2_tape_stb_01")
    #print(fsSize)
    #tc.disconnect()
    #for i in tl:
    #    print(i.id)
    #    print('\n')