diff --git a/client/vos_storage b/client/vos_storage index 8d86b7a04342f4da6664fd83da9fad3bb54dc42b..613040403cb844640bb0cb3c3cb8769a16da3618 100644 --- a/client/vos_storage +++ b/client/vos_storage @@ -28,15 +28,37 @@ class VOSStorage(RedisRPCClient): storageType = None storageBasePath = None storageHostname = None + tapePool = None + tapePoolList = [] while storageType not in ("cold", "hot"): - try: - storageType = input("\nStorage type ['cold' or 'hot']: ") - except ValueError: - print("Input type is not valid!") - except EOFError: - print("\nPlease, use CTRL+C to quit.") - except KeyboardInterrupt: - sys.exit("\nCTRL+C detected. Exiting...") + try: + storageType = input("\nStorage type ['cold' or 'hot']: ") + except ValueError: + print("Input type is not valid!") + except EOFError: + print("\nPlease, use CTRL+C to quit.") + except KeyboardInterrupt: + sys.exit("\nCTRL+C detected. Exiting...") + if storageType == "cold": + storageRequest = { "requestType": "TAPE_POOL_LST" } + storageResponse = self.call(storageRequest) + if "responseType" not in storageResponse: + sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") + elif storageResponse["responseType"] == "TAPE_POOL_LST_DONE": + tapePoolList = storageResponse["tapePoolList"] + while tapePool not in tapePoolList: + print("\nSelect one of the available tape pools:") + print("\n" + tabulate(tapePoolList, headers = ["tape_pool"], tablefmt = "pretty") + "\n") + try: + tapePool = input("Please, insert a tape pool: ") + except ValueError: + print("Input type is not valid!") + except EOFError: + print("\nPlease, use CTRL+C to quit.") + except KeyboardInterrupt: + sys.exit("\nCTRL+C detected. Exiting...") + else: + sys.exit("\nFATAL: Unknown response type.\n") while not storageBasePath: try: storageBasePath = input("\nStorage base path: ") @@ -58,7 +80,8 @@ class VOSStorage(RedisRPCClient): storageRequest = { "requestType": "STORAGE_ADD", "storageType": storageType, "basePath": storageBasePath, - "hostname": storageHostname } + "hostname": storageHostname, + "tapePool": tapePool } storageResponse = self.call(storageRequest) if "responseType" not in storageResponse: @@ -175,7 +198,7 @@ DESCRIPTION Supported storage types are: 'hot' and 'cold'. - + Adding 'hot' or 'cold' storage points requires a base path to be specified (e.g. '/mnt/my_storage/users'). diff --git a/transfer_service/config/vos_ts.conf.sample b/transfer_service/config/vos_ts.conf.sample index a4cc3bc26a92a3c6b05ce5bf18ccab8a3ab2a96c..bb0e4b0675e794001dac20aa4e4a7e605bf88444 100644 --- a/transfer_service/config/vos_ts.conf.sample +++ b/transfer_service/config/vos_ts.conf.sample @@ -40,8 +40,6 @@ port = 22 user = root ; SSH private key file absolute path pkey_file_path = /root/.ssh/tape_rsa -; tape pool name -tape_pool = pl_generic_rw_01 ############################ diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index fb7ded08038b846f68e601415a8ec7e074002608..50127d8cfd19258d4b8c3e990ca98a74068df363 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -104,7 +104,7 @@ class DbConnector(object): SELECT creator_id FROM node WHERE node_id = id_from_vos_path(%s); - """, + """, (vospacePath,)) result = cursor.fetchall() cursor.close() @@ -126,7 +126,7 @@ class DbConnector(object): SELECT unnest(group_read) as group_read FROM node WHERE node_id = id_from_vos_path(%s); - """, + """, (vospacePath,)) result = cursor.fetchall() cursor.close() @@ -150,7 +150,7 @@ class DbConnector(object): SELECT unnest(group_write) as group_write FROM node WHERE node_id = id_from_vos_path(%s); - """, + """, (vospacePath,)) result = cursor.fetchall() cursor.close() @@ -171,7 +171,7 @@ class DbConnector(object): conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute(""" - SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length + SELECT storage_type, base_path, user_name, tstamp_wrapper_dir, '/' || fs_path AS os_path, content_length, tape_pool FROM node n JOIN location l ON n.location_id = l.location_id JOIN storage s ON s.storage_id = l.storage_src_id @@ -192,6 +192,7 @@ class DbConnector(object): tstampWrappedDir = result[0]["tstamp_wrapper_dir"] osPath = result[0]["os_path"] contentLength = result[0]["content_length"] + tapePool = result[0]["tape_pool"] if tstampWrappedDir is None: baseSrcPath = basePath + "/" + userName else: @@ -199,12 +200,13 @@ class DbConnector(object): fullPath = baseSrcPath + osPath fileInfo = { "baseSrcPath": baseSrcPath, - "fullPath": fullPath, - "storageType": storageType, - "username": userName, + "fullPath": fullPath, + "storageType": storageType, + "username": userName, "osPath": osPath, - "contentLength": contentLength - } + "contentLength": contentLength, + "tapePool": tapePool + } return fileInfo finally: self.connPool.putconn(conn, close = False) @@ -486,7 +488,7 @@ class DbConnector(object): return result finally: self.connPool.putconn(conn, close = False) - + def searchJobs(self, searchStr): "Performs a search on jobs." try: @@ -592,7 +594,7 @@ class DbConnector(object): return False finally: self.connPool.putconn(conn, close = False) - + def getUserEmail(self, userId): """Returns the user email address for a given user id.""" try: @@ -609,7 +611,7 @@ class DbConnector(object): return result[0]["e_mail"] finally: self.connPool.putconn(conn, close = False) - + def getUserList(self): """Returns the full user list.""" try: @@ -626,7 +628,7 @@ class DbConnector(object): return result finally: self.connPool.putconn(conn, close = False) - + def searchUsers(self, searchStr): "Performs a search on users." try: @@ -732,7 +734,7 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def getStorageType(self, basePath): - """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'.""" + """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'.""" try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) @@ -772,7 +774,7 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) def getStorageHostname(self, storageId): - """Returns the storage hostname for a given storage id, if any. Otherwise it returns 'False'.""" + """Returns the storage hostname for a given storage id, if any. Otherwise it returns 'False'.""" try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) @@ -838,7 +840,7 @@ class DbConnector(object): ON CONFLICT (job_id) DO UPDATE SET (owner_id, - job_type, + job_type, phase, start_time, end_time, @@ -877,7 +879,7 @@ class DbConnector(object): raise finally: self.connPool.putconn(conn, close = False) - + def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" try: @@ -897,7 +899,7 @@ class DbConnector(object): raise finally: self.connPool.putconn(conn, close = False) - + def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" try: @@ -936,7 +938,7 @@ class DbConnector(object): raise finally: self.connPool.putconn(conn, close = False) - + def setTotalBlocks(self, jobId, totalBlocks): """ Sets the job 'total_blocks' parameter @@ -958,7 +960,7 @@ class DbConnector(object): raise finally: self.connPool.putconn(conn, close = False) - + def updateProcessedBlocks(self, jobId, processedBlocks): """ Updates the job 'processed_blocks' parameter @@ -1004,7 +1006,7 @@ class DbConnector(object): ##### Node ##### def insertNode(self, node): - """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.""" + """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.""" try: conn = self.getConnection() cursor = conn.cursor(cursor_factory = RealDictCursor) @@ -1018,7 +1020,7 @@ class DbConnector(object): if not conn.closed: conn.rollback() raise - else: + else: try: cursor.execute(""" INSERT INTO node(parent_path, @@ -1064,7 +1066,7 @@ class DbConnector(object): return False finally: self.connPool.putconn(conn, close = False) - + def deleteNodesByJobId(self, jobId): """Deletes all VOSpace nodes having a certain 'job_id'.""" try: @@ -1130,7 +1132,7 @@ class DbConnector(object): raise finally: self.connPool.putconn(conn, close = False) - + def setPhyDeletedOn(self, nodeId): """Sets the 'phy_deleted_on' flag for a VOSpace deleted node.""" try: @@ -1150,7 +1152,7 @@ class DbConnector(object): raise finally: self.connPool.putconn(conn, close = False) - + def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath): try: conn = self.getConnection() @@ -1172,7 +1174,7 @@ class DbConnector(object): raise finally: self.connPool.putconn(conn, close = False) - + def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath): try: conn = self.getConnection() @@ -1197,7 +1199,7 @@ class DbConnector(object): ##### Storage ##### - def insertStorage(self, storageType, storageBasePath, storageHostname, vospaceUserBasePath): + def insertStorage(self, storageType, storageBasePath, storageHostname, vospaceUserBasePath, tapePool = None): """Inserts a storage point.""" if not self.getStorageId(storageBasePath): try: @@ -1206,8 +1208,9 @@ class DbConnector(object): cursor.execute(""" INSERT INTO storage(storage_type, base_path, - hostname) - VALUES (%s, %s, %s) + hostname, + tape_pool) + VALUES (%s, %s, %s, %s) RETURNING storage_id; """, (storageType, @@ -1301,10 +1304,10 @@ class DbConnector(object): self.connPool.putconn(conn, close = False) ##### Users ##### - + def insertUser(self, userId, username, email): """ - Inserts a user in the database. + Inserts a user in the database. Returns 'True' on success, 'False' otherwise. """ if not self.getUserName(userId): @@ -1344,7 +1347,7 @@ class DbConnector(object): userNodePath = "/" + username cursor.execute(""" SELECT count(*) > 1 AS child_nodes - FROM node + FROM node WHERE creator_id = %s AND path <@ text2ltree(id_from_vos_path(%s)::text); """, diff --git a/transfer_service/retrieve_executor.py b/transfer_service/retrieve_executor.py index b8552314242e78d4d4623b935d5ce033b6f09cf1..1d1407216f0ff2810baab1ef40e75dace339f8c1 100644 --- a/transfer_service/retrieve_executor.py +++ b/transfer_service/retrieve_executor.py @@ -39,12 +39,12 @@ from tape_client import TapeClient from task_executor import TaskExecutor -class RetrieveExecutor(TaskExecutor): +class RetrieveExecutor(TaskExecutor): def __init__(self): self.type = "retrieve_executor" self.systemUtils = SystemUtils() - config = Config("/etc/vos_ts/vos_ts.conf") + config = Config("/etc/vos_ts/vos_ts.conf") params = config.loadSection("transfer_node") self.storageRetrievePath = params["retrieve_path"] params = config.loadSection("transfer") @@ -72,12 +72,12 @@ class RetrieveExecutor(TaskExecutor): 1, self.logger) params = config.loadSection("spectrum_archive") - self.tapePool = params["tape_pool"] self.tapeClient = TapeClient(params["host"], params.getint("port"), params["user"], params["pkey_file_path"], self.logger) + self.tapePool = None self.storageType = None self.jobObj = None self.jobId = None @@ -88,7 +88,7 @@ class RetrieveExecutor(TaskExecutor): self.procBlocks = 0 self.totalSize = 0 super(RetrieveExecutor, self).__init__() - + def buildFileList(self): """ Generates the list of all files to retrieve. @@ -105,7 +105,7 @@ class RetrieveExecutor(TaskExecutor): else: self.logger.info("Job phase updated to EXECUTING.") self.logger.info("Building the list of the files to be retrieved...") - + # debug block... #if os.path.exists("nodeList.txt"): # os.remove("nodeList.txt") @@ -113,10 +113,13 @@ class RetrieveExecutor(TaskExecutor): #for vospacePath in self.nodeList: # nl.write(vospacePath + '\n') #nl.close() - + # Obtain the storage type try: - self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] + #self.storageType = self.dbConn.getOSPath(self.nodeList[0])["storageType"] + fileInfo = self.dbConn.getOSPath(self.nodeList[0]) + self.storageType = fileInfo["storageType"] + self.tapePool = fileInfo["tapePool"] except Exception: self.logger.exception("FATAL: unable to obtain the storage type.") return False @@ -130,7 +133,7 @@ class RetrieveExecutor(TaskExecutor): srcPath = nodeInfo["fullPath"] username = nodeInfo["username"] md5calc = Checksum() - if os.path.isdir(srcPath) and not os.path.islink(srcPath): + if os.path.isdir(srcPath) and not os.path.islink(srcPath): for root, dirs, files in os.walk(srcPath, topdown = False): #dirSize = os.stat(root).st_size #self.totalSize += dirSize @@ -143,8 +146,8 @@ class RetrieveExecutor(TaskExecutor): "baseSrcPath": baseSrcPath, "fullPath": fullPath, "username": username, - "fileSize": fileSize, - "vospaceRootParent": vospacePath + "fileSize": fileSize, + "vospaceRootParent": vospacePath } self.totalSize += fileSize self.fileList.append(fileInfo.copy()) @@ -161,7 +164,7 @@ class RetrieveExecutor(TaskExecutor): self.totalSize += fileSize self.fileList.append(fileInfo.copy()) self.logger.info(f"Total size of files to retrieve: {self.totalSize} B") - # debug block... + # debug block... #if os.path.exists("fileList.txt"): # os.remove("fileList.txt") #fl = open("fileList.txt", 'w') @@ -172,7 +175,7 @@ class RetrieveExecutor(TaskExecutor): return False else: return True - + def buildBlocks(self): """ Algorithm to split data in blocks of a well known size. @@ -188,7 +191,7 @@ class RetrieveExecutor(TaskExecutor): #self.totalSize += fileSize # check if the file is larger than a block size if fileSize > self.maxBlockSize: - # if the current block is not empty, "close" it, otherwise + # if the current block is not empty, "close" it, otherwise # use it and then create a new block if blockSize > 0: blockIdx += 1 @@ -198,7 +201,7 @@ class RetrieveExecutor(TaskExecutor): fileInfo["blockIdx"] = blockIdx blockIdx += 1 blockSize = 0 - else: + else: # the file can be contained by a block, so check if # the file size plus the current block fill is lower # than the maximum block size @@ -221,8 +224,8 @@ class RetrieveExecutor(TaskExecutor): except Exception: self.logger.exception("FATAL: unable to set the total number of blocks in the database.") return False - - # debug block... + + # debug block... #print(f"numBlocks = {self.numBlocks}") #if os.path.exists("blocks.txt"): # os.remove("blocks.txt") @@ -234,14 +237,14 @@ class RetrieveExecutor(TaskExecutor): return False else: return True - + def retrieveCompleted(self, vospacePath): """ - Returns 'True' if all data associated to 'vospacePath' + Returns 'True' if all data associated to 'vospacePath' has been copied, otherwise it returns 'False'. """ return not any(vospacePath in f["vospaceRootParent"] for f in self.fileList) - + def retrieveData(self): """ Retrieves data from a generic storage point (hot or cold). @@ -251,14 +254,14 @@ class RetrieveExecutor(TaskExecutor): # Loop on blocks for blockIdx in range(self.numBlocks): blockFileList = [ f for f in self.fileList if f["blockIdx"] == blockIdx ] - + # Recall all files from tape library to tape frontend # if the storage type is 'cold' if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.recall([ f["fullPath"] for f in blockFileList ], self.jobId) self.tapeClient.disconnect() - + # Loop on files in a block for fileInfo in blockFileList: srcPath = fileInfo["fullPath"] @@ -283,12 +286,12 @@ class RetrieveExecutor(TaskExecutor): except Exception: self.logger.error(f"FATAL: error while creating symlink for target '{srcPath}'") return False - - # Remove files from file list at the end of the copy + + # Remove files from file list at the end of the copy for fileInfo in blockFileList: if fileInfo in self.fileList: - self.fileList.remove(fileInfo) - + self.fileList.remove(fileInfo) + # Check if the copy related to a certain VOSpace node # is completed and recursively update the 'async_trans' # flag @@ -299,14 +302,14 @@ class RetrieveExecutor(TaskExecutor): except Exception: self.logger.exception("FATAL: unable to update the file catalog.") return False - + # Empty the tape library frontend if the storage type # is 'cold' if self.storageType == "cold": self.tapeClient.connect() self.tapeClient.migrate([ f["fullPath"] for f in blockFileList ], self.tapePool, self.jobId) self.tapeClient.disconnect() - + blockFileList.clear() self.procBlocks += 1 self.dbConn.updateProcessedBlocks(self.jobId, self.procBlocks) @@ -315,7 +318,7 @@ class RetrieveExecutor(TaskExecutor): return False else: return True - + def execute(self): success = True self.logger.info("++++++++++ Start of execution phase ++++++++++") @@ -342,7 +345,7 @@ class RetrieveExecutor(TaskExecutor): m.addRecipient(userEmail) self.jobObj.setResults(results) - + # Add a list of physical destination paths for each VOSpace node in the node list self.logger.info("Generating physical destination paths for VOSpace nodes...") for vospacePath in self.nodeList: @@ -360,10 +363,10 @@ class RetrieveExecutor(TaskExecutor): self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) self.logger.info("Job phase updated to COMPLETED.") - + msg = f""" ########## VOSpace data retrieval procedure summary ########## - + Dear user, your job has been COMPLETED. @@ -381,11 +384,11 @@ class RetrieveExecutor(TaskExecutor): self.jobObj.setErrorMessage("FATAL: something went wrong during the execution phase.") self.jobObj.setEndTime(datetime.datetime.now().isoformat()) self.dbConn.insertJob(self.jobObj) - self.logger.info("Job phase updated to ERROR.") - + self.logger.info("Job phase updated to ERROR.") + msg = f""" ########## VOSpace data retrieval procedure summary ########## - + Dear user, your job has FAILED. @@ -394,20 +397,20 @@ class RetrieveExecutor(TaskExecutor): Owner ID: {self.jobObj.ownerId} """ info = f""" - ERROR: - the job was terminated due to an error that occurred + ERROR: + the job was terminated due to an error that occurred while retrieveing the data from the storage point. - + This issue will be automatically reported to the administrator. - + """ msg += info m.setMessage("VOSpace data retrieve notification: ERROR", msg) - # Send e-mail notification + # Send e-mail notification m.send() except Exception: self.logger.exception(f"FATAL: unable to update the database, job ID: {self.jobId}") - + def cleanup(self): """ Cleanup method. @@ -435,11 +438,11 @@ class RetrieveExecutor(TaskExecutor): else: if srcQueueLen > 0: self.jobObj = self.srcQueue.getJob() - self.jobId = self.jobObj.jobId + self.jobId = self.jobObj.jobId self.nodeList = self.jobObj.nodeList.copy() if self.execute(): self.update("OK") - + # debug block... #print(f"fileList = {self.fileList}") #print(f"nodeList = {self.nodeList}") @@ -454,4 +457,4 @@ class RetrieveExecutor(TaskExecutor): except Exception: self.logger.exception(f"Failed to move job {self.jobObj.jobId} from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") else: - self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") + self.logger.info(f"Job {self.jobObj.jobId} MOVED from '{self.srcQueue.name()}' to '{self.destQueue.name()}'") diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index 2c91f3ceb36cf0fc7253f347152ff44efd8397ff..424f3036579e7738c1ca52f27ab4fb1431df004a 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -12,6 +12,7 @@ from redis_log_handler import RedisLogHandler from redis_rpc_server import RedisRPCServer from config import Config from db_connector import DbConnector +from tape_client import TapeClient class StorageRPCServer(RedisRPCServer): @@ -39,6 +40,12 @@ class StorageRPCServer(RedisRPCServer): 1, 2, self.logger) + params = config.loadSection("spectrum_archive") + self.tapeClient = TapeClient(params["host"], + params.getint("port"), + params["user"], + params["pkey_file_path"], + self.logger) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): @@ -126,12 +133,26 @@ class StorageRPCServer(RedisRPCServer): else: response = { "responseType": "STORAGE_LST_DONE", "storageList": result } - + elif requestBody["requestType"] == "TAPE_POOL_LST": + try: + self.tapeClient.connect() + tapePools = self.tapeClient.getPoolList() + self.tapeClient.disconnect() + tapePoolList = [ p.getName() for p in tapePools ] + except Exception: + errorMsg = "Unable to get tape pool list." + self.logger.exception(errorMsg) + response = { "responseType": "ERROR", + "errorCode": 6, + "errorMsg": errorMsg } + else: + response = { "responseType": "TAPE_POOL_LST_DONE", + "tapePoolList": tapePoolList } else: errorMsg = "Unkown request type." self.logger.error(errorMsg) response = { "responseType": "ERROR", - "errorCode": 6, + "errorCode": 7, "errorMsg": errorMsg } return response