#!/usr/bin/env python import json import logging import os import paramiko import scp import sys import uuid from config import Config from exceptions import ScpInvalidFileException from exceptions import TapeClientException from redis_log_handler import RedisLogHandler from tape_pool import TapePool from tape_task import TapeTask class TapeClient(object): # 'eeadm' command location on the tape library frontend EEADM = "/opt/ibm/ltfsee/bin/eeadm" def __init__(self, host, port, user, keyFile, logger): self.host = host self.port = port self.user = user self.logger = logger self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.keyFile = keyFile self.scp = None self.taskList = [] self.poolList = [] def connect(self): """Connects to the tape library frontend.""" try: self.key = paramiko.RSAKey.from_private_key_file(self.keyFile) self.client.load_system_host_keys() self.client.connect(hostname = self.host, port = self.port, username = self.user, pkey = self.key) except Exception: self.logger.exception("Unable to establish SSH connection with tape library frontend.") raise def getPoolList(self): """Returns a list of 'TapePool' objects.""" cmd = f"{self.EEADM} pool list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) for el in result["payload"]: pool = TapePool() pool.id = el["id"] pool.name = el["name"] pool.mediaRestriction = el["media_restriction"] pool.capacity = el["capacity"] pool.usedSpace = el["used_space"] pool.freeSpace = el["free_space"] pool.reclaimableSpace = el["reclaimable_space"] pool.activeSpace = el["active_space"] pool.nonAppendableSpace = el["non_appendable_space"] pool.numOfTapes = el["num_of_tapes"] pool.formatClass = el["format_class"] pool.libraryName = el["library_name"] pool.libraryId = el["library_id"] pool.nodeGroupName = el["nodegroup_name"] pool.deviceType = el["device_type"] pool.worm = el["worm"] pool.fillPolicy = el["fill_policy"] pool.owner = el["owner"] pool.mountLimit = el["mount_limit"] pool.lowSpaceWarningEnable = el["low_space_warning_enable"] pool.lowSpaceWarningThreshold = el["low_space_warning_threshold"] pool.noSpaceWarningEnable = el["no_space_warning_enable"] pool.mode = el["mode"] self.poolList.append(pool) return self.poolList.copy() else: raise TapeClientException(cmd, exitCode, stderr) def getTaskList(self): """Returns the whole task list.""" cmd = f"{self.EEADM} task list --json" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: result = json.loads(stdout.readlines()[0].rstrip('\n')) for el in result["payload"]: task = TapeTask() task.inUseTapes = el["inuse_tapes"] task.inUsePools = el["inuse_pools"] task.inUseNodeGroups = el["inuse_node_groups"] task.inUseDrives = el["inuse_drives"] task.cmdParam = el["cmd_param"] task.result = el["result"] task.status = el["status"] task.completedTime = el["completed_time"] task.startedTime = el["started_time"] task.createdTime = el["created_time"] task.setInUseLibs = el["inuse_libs"] task.type = el["type"] task.taskId = el["task_id"] task.id = el["id"] self.taskList.append(task) return self.taskList.copy() else: raise TapeClientException(cmd, exitCode, stderr) def copy(self, srcPath, destPath): """Copies files/dirs recursively by passing their absolute paths.""" try: self.scp = scp.SCPClient(self.client.get_transport()) except Exception: self.logger.error("Unable to get transport from SSH client.") raise else: self.logger.debug(f"Copying {srcPath} in {destPath}") if os.path.isdir(srcPath): self.scp.put(srcPath, recursive = True, remote_path = destPath) elif os.path.isfile(srcPath): self.scp.put(srcPath, destPath) else: self.logger.error("FATAL: invalid file/dir.") raise ScpInvalidFileException finally: self.scp.close() def migrate(self, fileList, tapePool): """ Migrates to tape all files whose absolute path is contained in 'fileList'. """ self.logger.info("Starting MIGRATE operation...") tmp = str(uuid.uuid1().hex) + "-vos_migrate.tmp" try: fp = open(tmp, "a") except IOError: raise else: for f in fileList: fp.write(f"{f}\n") fp.close() self.copy(f"./{tmp}", f"/tmp/{tmp}") os.remove(f"./{tmp}") cmd = f"{self.EEADM} migrate /tmp/{tmp} -p {tapePool}" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.info("MIGRATE operation COMPLETED.") else: self.logger.error("MIGRATE operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode def recall(self, fileList): """ Recalls from tape all files whose absolute path is contained in 'fileList'. """ self.logger.info("Starting RECALL operation...") tmp = str(uuid.uuid1().hex) + "-vos_recall.tmp" try: fp = open(tmp, "a") except IOError: raise else: for f in fileList: fp.write(f"{f}\n") fp.close() self.copy(f"./{tmp}", f"/tmp/{tmp}") os.remove(f"./{tmp}") cmd = f"{self.EEADM} recall /tmp/{tmp}" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.info("RECALL operation COMPLETED.") else: self.logger.error("RECALL operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode def recallChecksumFiles(self, dirName): """ Recursively recalls from tape all the checksum files related to the 'dirName' directory. """ self.logger.info("Starting RECALL_CHECKSUM operation...") cmd = f"find $(dirname {dirName}) -type f \( -iname \"*-md5sum.txt\" \) | {self.EEADM} recall" try: stdin, stdout, stderr = self.client.exec_command(cmd) except Exception: self.logger.exception("Unable to execute command: '{cmd}'") raise else: exitCode = stdout.channel.recv_exit_status() if not exitCode: self.logger.info("RECALL_CHECKSUM operation COMPLETED.") else: self.logger.error("RECALL_CHECKSUM operation FAILED.") raise TapeClientException(cmd, exitCode, stderr) return exitCode def disconnect(self): """Performs a cleanup and closes the connection.""" self.taskList.clear() self.poolList.clear() self.client.close() def getSize(self, fsMountPoint): """ DEPRECATED """ cmd = f"df {fsMountPoint} | tail -n +2" #out = subprocess.run(cmd, shell = True, capture_output = True) stdin, stdout, stderr = self.client.exec_command(cmd) #res = stdout.stdout.decode('UTF-8').rstrip('\n') res = stdout.readlines()[0] res = ' '.join(res.split()).split(' ') #print(res) total = res[1] used = res[2] available = res[3] return ( total, used, available) # Test #tc = TapeClient("192.168.56.101", 22, "root", "ibm") #tc.connect() #tc.copy("/home/curban/store/mydir", "/home/mydir") #tc.copy("/home/curban/store/foo2.txt", "/home/mydir/foo2.txt") #tl = tc.getTaskList() #fsSize = tc.getSize("/ia2_tape_stb_01") #print(fsSize) #tc.disconnect() #for i in tl: # print(i.id) # print('\n')