Select Git revision
tape_client.py
-
Cristiano Urban authored
Signed-off-by:
Cristiano Urban <cristiano.urban@inaf.it>
Cristiano Urban authoredSigned-off-by:
Cristiano Urban <cristiano.urban@inaf.it>
tape_client.py 10.48 KiB
#!/usr/bin/env python
#
# This file is part of vospace-transfer-service
# Copyright (C) 2021 Istituto Nazionale di Astrofisica
# SPDX-License-Identifier: GPL-3.0-or-later
#
import json
import logging
import os
import paramiko
import scp
import sys
from config import Config
from exceptions import ScpInvalidFileException
from exceptions import TapeClientException
from redis_log_handler import RedisLogHandler
from tape_pool import TapePool
from tape_task import TapeTask
class TapeClient(object):
    """
    SSH client for the tape library frontend.

    Wraps a paramiko SSH connection and uses it to run 'eeadm' commands
    (pool list, task list, migrate, recall) and to upload, via SCP, the
    files containing the lists of files to recall or migrate.
    """

    # 'eeadm' command location on the tape library frontend
    EEADM = "/opt/ibm/ltfsee/bin/eeadm"

    # destination for the files containing the lists of files to recall or migrate
    VOSPACE_WD = "/tmp/vospace"

    # (TapePool attribute, key of the 'eeadm pool list --json' payload element)
    _POOL_FIELDS = (
        ("id", "id"),
        ("name", "name"),
        ("mediaRestriction", "media_restriction"),
        ("capacity", "capacity"),
        ("usedSpace", "used_space"),
        ("freeSpace", "free_space"),
        ("reclaimableSpace", "reclaimable_space"),
        ("activeSpace", "active_space"),
        ("nonAppendableSpace", "non_appendable_space"),
        ("numOfTapes", "num_of_tapes"),
        ("formatClass", "format_class"),
        ("libraryName", "library_name"),
        ("libraryId", "library_id"),
        ("nodeGroupName", "nodegroup_name"),
        ("deviceType", "device_type"),
        ("worm", "worm"),
        ("fillPolicy", "fill_policy"),
        ("owner", "owner"),
        ("mountLimit", "mount_limit"),
        ("lowSpaceWarningEnable", "low_space_warning_enable"),
        ("lowSpaceWarningThreshold", "low_space_warning_threshold"),
        ("noSpaceWarningEnable", "no_space_warning_enable"),
        ("mode", "mode"),
    )

    # (TapeTask attribute, key of the 'eeadm task list --json' payload element)
    _TASK_FIELDS = (
        ("inUseTapes", "inuse_tapes"),
        ("inUsePools", "inuse_pools"),
        ("inUseNodeGroups", "inuse_node_groups"),
        ("inUseDrives", "inuse_drives"),
        ("cmdParam", "cmd_param"),
        ("result", "result"),
        ("status", "status"),
        ("completedTime", "completed_time"),
        ("startedTime", "started_time"),
        ("createdTime", "created_time"),
        # NOTE(review): 'setInUseLibs' looks like a typo for 'inUseLibs';
        # kept as-is because TapeTask is defined elsewhere - confirm.
        ("setInUseLibs", "inuse_libs"),
        ("type", "type"),
        ("taskId", "task_id"),
        ("id", "id"),
    )

    def __init__(self, host, port, user, keyFile, logger):
        """
        Stores the connection parameters and prepares the SSH client.

        host, port, user: address and login of the tape library frontend.
        keyFile: path of the private key file loaded by 'connect()'.
        logger: logger object used for all the messages of this class.
        """
        self.host = host
        self.port = port
        self.user = user
        self.logger = logger
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy presumably accepts unknown host keys
        # without verification - confirm this is acceptable here.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.keyFile = keyFile
        self.scp = None
        self.taskList = []
        self.poolList = []

    def connect(self):
        """
        Connects to the tape library frontend.

        The private key is loaded from 'keyFile' as an RSA key. Any
        failure is logged and re-raised.
        """
        try:
            self.key = paramiko.RSAKey.from_private_key_file(self.keyFile)
            self.client.load_system_host_keys()
            self.client.connect(hostname = self.host,
                                port = self.port,
                                username = self.user,
                                pkey = self.key)
        except Exception:
            self.logger.exception("Unable to establish SSH connection with tape library frontend.")
            raise

    def _execute(self, cmd):
        """Starts 'cmd' on the frontend and returns its (stdout, stderr) streams."""
        try:
            _, stdout, stderr = self.client.exec_command(cmd)
        except Exception:
            self.logger.exception(f"Unable to execute command: '{cmd}'")
            raise
        return stdout, stderr

    def _queryPayload(self, cmd):
        """Runs a '--json' command and returns the 'payload' entry of its output."""
        stdout, stderr = self._execute(cmd)
        # Drain stdout BEFORE asking for the exit status: waiting for the
        # exit status first can block forever when the remote output is
        # larger than the SSH channel window.
        lines = stdout.readlines()
        exitCode = stdout.channel.recv_exit_status()
        if exitCode:
            raise TapeClientException(cmd, exitCode, stderr)
        # The JSON document is expected on the first output line.
        return json.loads(lines[0].rstrip('\n'))["payload"]

    def _runOperation(self, cmd, label):
        """Runs 'cmd', logs the outcome of operation 'label', returns the exit code."""
        stdout, stderr = self._execute(cmd)
        exitCode = stdout.channel.recv_exit_status()
        if exitCode:
            self.logger.error(f"{label} operation FAILED.")
            raise TapeClientException(cmd, exitCode, stderr)
        self.logger.info(f"{label} operation COMPLETED.")
        return exitCode

    def _uploadFileList(self, listName, fileList):
        """Writes 'fileList' (one path per line) to 'listName' and copies it to VOSPACE_WD."""
        localPath = f"./{listName}"
        try:
            # "w" (not "a"): a stale list left behind by a previous failed
            # run must not leak into the new one.
            with open(localPath, "w") as fp:
                fp.writelines(f"{f}\n" for f in fileList)
            self.copy(localPath, f"{self.VOSPACE_WD}/{listName}")
        finally:
            # The local list is only a staging file: remove it even when
            # the upload fails.
            if os.path.exists(localPath):
                os.remove(localPath)

    def getPoolList(self):
        """
        Returns a list of 'TapePool' objects.

        The list reflects the output of the current call only; it is also
        stored in 'self.poolList' and a copy is returned.
        Raises TapeClientException if the command exits with a non-zero code.
        """
        cmd = f"{self.EEADM} pool list --json"
        pools = []
        for el in self._queryPayload(cmd):
            pool = TapePool()
            for attr, key in self._POOL_FIELDS:
                setattr(pool, attr, el[key])
            pools.append(pool)
        # Replace (do not extend) the cached list, otherwise every call
        # would return the entries of all the previous calls as well.
        self.poolList[:] = pools
        return self.poolList.copy()

    def getTaskList(self):
        """
        Returns the whole task list as a list of 'TapeTask' objects.

        The list reflects the output of the current call only; it is also
        stored in 'self.taskList' and a copy is returned.
        Raises TapeClientException if the command exits with a non-zero code.
        """
        cmd = f"{self.EEADM} task list --json"
        tasks = []
        for el in self._queryPayload(cmd):
            task = TapeTask()
            for attr, key in self._TASK_FIELDS:
                setattr(task, attr, el[key])
            tasks.append(task)
        # Replace (do not extend) the cached list, see getPoolList().
        self.taskList[:] = tasks
        return self.taskList.copy()

    def copy(self, srcPath, destPath):
        """
        Copies files/dirs recursively by passing their absolute paths.

        srcPath: local file or directory.
        destPath: remote destination path.
        Raises ScpInvalidFileException if 'srcPath' is neither a file nor
        a directory; errors creating the SCP client are logged and re-raised.
        """
        try:
            self.scp = scp.SCPClient(self.client.get_transport())
        except Exception:
            self.logger.exception("Unable to get transport from SSH client.")
            raise
        # The SCP client is closed only once it has actually been created,
        # so a failure above is not masked by an error on 'close()'.
        try:
            self.logger.debug(f"Copying {srcPath} in {destPath}")
            if os.path.isdir(srcPath):
                self.scp.put(srcPath, recursive = True, remote_path = destPath)
            elif os.path.isfile(srcPath):
                self.scp.put(srcPath, destPath)
            else:
                self.logger.error("FATAL: invalid file/dir.")
                raise ScpInvalidFileException
        finally:
            self.scp.close()

    def migrate(self, fileList, tapePool, jobId):
        """
        Migrates to tape all files whose absolute path is
        contained in 'fileList'.
        A tape pool and a VOSpace jobId are also required
        as parameters.

        Returns the exit code of the command (0 on success); raises
        TapeClientException if the exit code is non-zero.
        """
        self.logger.info(f"Starting MIGRATE operation (tape pool = '{tapePool}')...")
        migrateFileList = f"vos_migrate-{jobId}.lst"
        self._uploadFileList(migrateFileList, fileList)
        # NOTE(review): 'tapePool' and 'jobId' are interpolated unquoted
        # into a shell command - callers must pass trusted values.
        cmd = f"{self.EEADM} migrate {self.VOSPACE_WD}/{migrateFileList} -p {tapePool} > /dev/null 2>&1"
        return self._runOperation(cmd, "MIGRATE")

    def recall(self, fileList, jobId):
        """
        Recalls from tape all files whose absolute path is
        contained in 'fileList'.
        A VOSpace job ID is also required as parameter.

        Returns the exit code of the command (0 on success); raises
        TapeClientException if the exit code is non-zero.
        """
        self.logger.info("Starting RECALL operation...")
        recallFileList = f"vos_recall-{jobId}.lst"
        self._uploadFileList(recallFileList, fileList)
        cmd = f"{self.EEADM} recall {self.VOSPACE_WD}/{recallFileList} > /dev/null 2>&1"
        return self._runOperation(cmd, "RECALL")

    def recallChecksumFiles(self, dirName):
        """
        Recursively recalls from tape all the checksum files related to
        the 'dirName' directory.

        The search starts from the parent of 'dirName' and matches the
        files whose name ends with '-md5sum.txt' (case insensitive).
        Returns the exit code of the command (0 on success); raises
        TapeClientException if the exit code is non-zero.
        """
        self.logger.info("Starting RECALL_CHECKSUM operation...")
        # NOTE(review): 'dirName' is interpolated unquoted into a shell
        # command - paths containing spaces/metacharacters will misbehave.
        cmd = f"find $(dirname {dirName}) -type f \\( -iname \"*-md5sum.txt\" \\) | {self.EEADM} recall > /dev/null 2>&1"
        return self._runOperation(cmd, "RECALL_CHECKSUM")

    def disconnect(self):
        """Performs a cleanup and closes the connection."""
        self.taskList.clear()
        self.poolList.clear()
        self.client.close()

    def getSize(self, fsMountPoint):
        """
        DEPRECATED

        Runs 'df' on 'fsMountPoint' on the frontend and returns the
        2nd, 3rd and 4th column of the first data line as the tuple
        (total, used, available), each one as a string.
        """
        cmd = f"df {fsMountPoint} | tail -n +2"
        _, stdout, _ = self.client.exec_command(cmd)
        columns = stdout.readlines()[0].split()
        total, used, available = columns[1], columns[2], columns[3]
        return (total, used, available)
# Test
#tc = TapeClient("192.168.56.101", 22, "root", "ibm")
#tc.connect()
#tc.copy("/home/curban/store/mydir", "/home/mydir")
#tc.copy("/home/curban/store/foo2.txt", "/home/mydir/foo2.txt")
#tl = tc.getTaskList()
#fsSize = tc.getSize("/ia2_tape_stb_01")
#print(fsSize)
#tc.disconnect()
#for i in tl:
# print(i.id)
# print('\n')