diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 824cdae9955951de6b3bafee3d16e09f8c5f5320..53b4346b025a86b0b8c1f77ee4cf88ffddbdbaee 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -58,14 +58,15 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (vospacePath,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - print(e) - if result: - return True - else: - return False + raise + else: + if result: + return True + else: + return False def getCreatorId(self, vospacePath): """Returns the creator ID for a given vospace path representing a node.""" @@ -80,11 +81,12 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - print(e) - return result[0]["creator_id"] + raise + else: + return result[0]["creator_id"] def getGroupRead(self, vospacePath): """Returns the 'group_read' for a given VOSpace path representing a node.""" @@ -99,13 +101,14 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - print(e) - for i in range(0, len(result)): - result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "") - return result + raise + else: + for i in range(0, len(result)): + result[i]["group_read"] = result[i]["group_read"].split("people.")[-1].replace("\\", "") + return result def getGroupWrite(self, vospacePath): """Returns the 'group_write' for a given VOSpace path representing a node.""" @@ -120,13 +123,14 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - print(e) - for i in range(0, len(result)): - 
result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "") - return result + raise + else: + for i in range(0, len(result)): + result[i]["group_write"] = result[i]["group_write"].split("people.")[-1].replace("\\", "") + return result def getOSPath(self, vospacePath): """Returns a list containing full path, storage type and username for a VOSpace path.""" @@ -143,30 +147,31 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() - except Exception as e: - if not conn.closed: - conn.rollback() - print(e) - storageType = result[0]["storage_type"] - basePath = result[0]["base_path"] - userName = result[0]["user_name"] - tstampWrappedDir = result[0]["tstamp_wrapper_dir"] - osPath = result[0]["os_path"] - contentLength = result[0]["content_length"] - if tstampWrappedDir is None: - baseSrcPath = basePath + "/" + userName - else: - baseSrcPath = basePath + "/" + userName + "/" + tstampWrappedDir - fullPath = baseSrcPath + osPath - fileInfo = { - "baseSrcPath": baseSrcPath, - "fullPath": fullPath, - "storageType": storageType, - "username": userName, - "osPath": osPath, - "contentLength": contentLength - } - return fileInfo + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + storageType = result[0]["storage_type"] + basePath = result[0]["base_path"] + userName = result[0]["user_name"] + tstampWrappedDir = result[0]["tstamp_wrapper_dir"] + osPath = result[0]["os_path"] + contentLength = result[0]["content_length"] + if tstampWrappedDir is None: + baseSrcPath = basePath + "/" + userName + else: + baseSrcPath = basePath + "/" + userName + "/" + tstampWrappedDir + fullPath = baseSrcPath + osPath + fileInfo = { + "baseSrcPath": baseSrcPath, + "fullPath": fullPath, + "storageType": storageType, + "username": userName, + "osPath": osPath, + "contentLength": contentLength + } + return fileInfo def getVOSpacePathList(self, vospacePath): """Returns the list of VOSpace paths carried by a VOSpace node, 
according to the node VOSpace path.""" @@ -181,14 +186,15 @@ class DbConnector(object): """, (vospacePath,)) results = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - print(e) - vospacePathList = [] - for el in results: - vospacePathList.append(el["vos_path"]) - return vospacePathList + raise + else: + vospacePathList = [] + for el in results: + vospacePathList.append(el["vos_path"]) + return vospacePathList def getNodesToBeDeleted(self): "Returns a path list of files to be deleted with also the corresponding deletion timestamp." @@ -221,11 +227,12 @@ class DbConnector(object): JOIN storage s ON s.storage_id = l.storage_src_id; """) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - print(e) - return result + raise + else: + return result def nodeIsBusy(self, vospacePath): """Returns 'True' if the VOSpace node is busy, 'False' otherwise.""" @@ -239,14 +246,15 @@ class DbConnector(object): """, (vospacePath,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - print(e) - if result[0]["job_id"]: - return True - else: - return False + raise + else: + if result[0]["job_id"]: + return True + else: + return False ##### Job ##### @@ -257,14 +265,15 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - print(e) - if result: - return True - else: - return False + raise + else: + if result: + return True + else: + return False def getJob(self, jobId): """Returns a JSON object containing job information, according to the job id.""" @@ -276,24 +285,26 @@ class DbConnector(object): result = cursor.fetchall() #out.write(f"result: {result}\n\n") #out.close() - except Exception as e: - if not conn.closed: - 
conn.rollback() - if not result: - return json.loads('{ "error": "JOB_NOT_FOUND" }') - else: - job = dict() - for idx in result[0]: - oldIdx = idx - idxTokens = idx.split('_') - idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:]) - job[idx] = result[0][oldIdx] - el = job[idx] - if isinstance(el, datetime.datetime): - job[idx] = el.isoformat() - #out.write(f"job: {job}\n\n") - #out.close() - return job + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + if not result: + return json.loads('{ "error": "JOB_NOT_FOUND" }') + else: + job = dict() + for idx in result[0]: + oldIdx = idx + idxTokens = idx.split('_') + idx = idxTokens[0] + ''.join(token.title() for token in idxTokens[1:]) + job[idx] = result[0][oldIdx] + el = job[idx] + if isinstance(el, datetime.datetime): + job[idx] = el.isoformat() + #out.write(f"job: {job}\n\n") + #out.close() + return job def getJobInfo(self, jobId): """Returns the 'job_info' field, according to the UWS specification.""" @@ -302,10 +313,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT job_info FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result[0]["job_info"] + raise + else: + return result[0]["job_info"] def getJobResults(self, jobId): """Returns the 'results' field, according to the UWS specification.""" @@ -314,10 +327,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT results FROM job WHERE job_id = %s;", (jobId,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result[0]["results"] + raise + else: + return result[0]["results"] def listActiveJobs(self): """Returns some info about active jobs.""" @@ -339,15 +354,17 @@ class DbConnector(object): ORDER BY creation_time DESC; """) result = 
cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - for row in result: - for idx in row: - el = row[idx] - if isinstance(el, datetime.datetime): - row[idx] = el.isoformat() - return result + raise + else: + for row in result: + for idx in row: + el = row[idx] + if isinstance(el, datetime.datetime): + row[idx] = el.isoformat() + return result def listJobsByPhase(self, phase): """Returns some info about jobs according to the phase.""" @@ -381,15 +398,17 @@ class DbConnector(object): """, (phase,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - for row in result: - for idx in row: - el = row[idx] - if isinstance(el, datetime.datetime): - row[idx] = el.isoformat() - return result + raise + else: + for row in result: + for idx in row: + el = row[idx] + if isinstance(el, datetime.datetime): + row[idx] = el.isoformat() + return result ##### User ##### @@ -400,13 +419,15 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - if result: - return True - else: - return False + raise + else: + if result: + return True + else: + return False def getUserId(self, username): """Returns the user id for a given user name.""" @@ -415,10 +436,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT user_id FROM users WHERE user_name = %s;", (username,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result[0]["user_id"] + raise + else: + return result[0]["user_id"] def getUserName(self, userId): """Returns the user name for a given user id.""" @@ -427,10 +450,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) 
cursor.execute("SELECT user_name FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result[0]["user_name"] + raise + else: + return result[0]["user_name"] def getUserEmail(self, userId): """Returns the user email address for a given user id.""" @@ -439,10 +464,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT e_mail FROM users WHERE user_id = %s;", (userId,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result[0]["e_mail"] + raise + else: + return result[0]["e_mail"] ##### Storage ##### @@ -458,13 +485,15 @@ class DbConnector(object): """, (path,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - if result: - return result[0]["base_path"] - else: - return False + raise + else: + if result: + return result[0]["base_path"] + else: + return False def getStorageBasePath(self, storageId): """Returns the storage base path for a give storage id.""" @@ -473,10 +502,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT base_path FROM storage WHERE storage_id = %s;", (storageId,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result[0]["base_path"] + raise + else: + return result[0]["base_path"] def getStorageList(self): """Returns the full storage base list. 
Local storage points are excluded by default.""" @@ -485,10 +516,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type <> 'local';") result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result + raise + else: + return result def getStorageListByType(self, storageType): """Returns a list of storage locations for a given storage type.""" @@ -497,10 +530,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT * FROM storage WHERE storage_type = %s;", (storageType,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result + raise + else: + return result def getStorageType(self, basePath): """Returns the storage type for a given storage base path, if any. Otherwise it returns 'False'.""" @@ -509,13 +544,15 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_type FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - if result: - return result[0]["storage_type"] - else: - return False + raise + else: + if result: + return result[0]["storage_type"] + else: + return False def getStorageId(self, basePath): """Returns the storage id for a given storage base path, if any. 
Otherwise it returns 'False'.""" @@ -524,13 +561,15 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT storage_id FROM storage WHERE base_path = %s;", (basePath,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - if result: - return result[0]["storage_id"] - else: - return False + raise + else: + if result: + return result[0]["storage_id"] + else: + return False ##### Location ##### @@ -541,10 +580,12 @@ class DbConnector(object): cursor = conn.cursor(cursor_factory = RealDictCursor) cursor.execute("SELECT location_id FROM location WHERE storage_src_id = %s;", (destStorageId,)) result = cursor.fetchall() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - return result[0]["location_id"] + raise + else: + return result[0]["location_id"] """ @@ -602,9 +643,10 @@ class DbConnector(object): jobObj.errorMessage, jobObj.errorType,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def setStartTime(self, jobId): """Sets the job 'start_time' parameter.""" @@ -618,9 +660,10 @@ class DbConnector(object): """, (startTime, jobId,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def setEndTime(self, jobId): """Sets the job 'end_time' parameter.""" @@ -634,9 +677,10 @@ class DbConnector(object): """, (endTime, jobId,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def setPhase(self, jobId, phase): """Sets the job 'phase' parameter.""" @@ -649,9 +693,10 @@ class DbConnector(object): """, (phase, jobId,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def setTotalBlocks(self, jobId, totalBlocks): """ @@ -667,9 +712,10 @@ class DbConnector(object): """, (totalBlocks, jobId,)) conn.commit() - except Exception as e: + 
except Exception: if not conn.closed: conn.rollback() + raise def updateProcessedBlocks(self, jobId, processedBlocks): """ @@ -685,9 +731,10 @@ class DbConnector(object): """, (processedBlocks, jobId,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def setResults(self, jobId, results): """Sets the job 'results' parameter.""" @@ -701,9 +748,10 @@ class DbConnector(object): (json.dumps(results), jobId,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise ##### Node ##### @@ -730,51 +778,54 @@ class DbConnector(object): #out.write(f"parentLtreePath: {parentLtreePath}\n") #out.write(f"parentPath: {node.parentPath}\n\n") #out.close() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() - - try: - #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") - cursor.execute(""" - INSERT INTO node(parent_path, - parent_relative_path, - name, - tstamp_wrapper_dir, - type, - location_id, - async_trans, - sticky, - job_id, - creator_id, - content_length, - content_md5) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - ON CONFLICT - DO NOTHING - RETURNING node_id; - """, - (parentLtreePath, - parentLtreeRelativePath, - node.name, - node.wrapperDir, - node.type, - node.locationId, - node.asyncTrans, - node.sticky, - node.jobId, - node.creatorId, - node.contentLength, - node.contentMD5,)) - result = cursor.fetchall() - conn.commit() - if result: - return True + raise + else: + try: + #print(f"parentLtreePath: {parentLtreePath}, type: {type(parentLtreePath)}") + cursor.execute(""" + INSERT INTO node(parent_path, + parent_relative_path, + name, + tstamp_wrapper_dir, + type, + location_id, + async_trans, + sticky, + job_id, + creator_id, + content_length, + content_md5) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT + DO NOTHING + RETURNING node_id; + """, + (parentLtreePath, + 
parentLtreeRelativePath, + node.name, + node.wrapperDir, + node.type, + node.locationId, + node.asyncTrans, + node.sticky, + node.jobId, + node.creatorId, + node.contentLength, + node.contentMD5,)) + result = cursor.fetchall() + conn.commit() + except Exception: + if not conn.closed: + conn.rollback() + raise else: - return False - except Exception as e: - if not conn.closed: - conn.rollback() + if result: + return True + else: + return False def setAsyncTrans(self, nodeVOSPath, value): """Sets the 'async_trans' flag for a VOSpace node.""" @@ -788,9 +839,10 @@ class DbConnector(object): """, (value, nodeVOSPath,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def setJobId(self, nodeVOSPath, value): """Sets the 'job_id' flag for a VOSpace node.""" @@ -804,9 +856,10 @@ class DbConnector(object): """, (value, nodeVOSPath,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def setPhyDeletedOn(self, nodeId): """Sets the 'phy_deleted_on' flag for a VOSpace deleted node.""" @@ -820,9 +873,10 @@ class DbConnector(object): """, (phyDeletedOn, nodeId,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def updateGroupRead(self, groupToAdd, groupToRemove, nodeVOSPath): with self.getConnection() as conn: @@ -838,9 +892,10 @@ class DbConnector(object): groupToRemove, nodeVOSPath,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise def updateGroupWrite(self, groupToAdd, groupToRemove, nodeVOSPath): with self.getConnection() as conn: @@ -856,9 +911,10 @@ class DbConnector(object): groupToRemove, nodeVOSPath,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise ##### Storage ##### @@ -880,46 +936,48 @@ class DbConnector(object): basePath, baseUrl, hostname,)) - storageSrcId = cursor.fetchall()[0]["storage_id"] - - 
except Exception as e: + except Exception: if not conn.closed: conn.rollback() - - if storageType == "cold" or storageType == "hot": + raise + else: + if storageType == "cold" or storageType == "hot": + try: + cursor.execute(""" + SELECT storage_id + FROM storage + WHERE storage_type = 'local' + AND base_path = '/home' + AND hostname = 'localhost'; + """) + storageDestId = cursor.fetchall()[0]["storage_id"] + except Exception: + if not conn.closed: + conn.rollback() + raise + else: + locationType = "async" + else: + storageDestId = storageSrcId + locationType = "portal" try: cursor.execute(""" - SELECT storage_id - FROM storage - WHERE storage_type = 'local' - AND base_path = '/home' - AND hostname = 'localhost'; - """) - storageDestId = cursor.fetchall()[0]["storage_id"] - except Exception as e: + INSERT INTO location(location_type, + storage_src_id, + storage_dest_id) + VALUES (%s, %s, %s); + """, + (locationType, + storageSrcId, + storageDestId,)) + conn.commit() + except Exception: if not conn.closed: conn.rollback() - locationType = "async" - else: - storageDestId = storageSrcId - locationType = "portal" - try: - cursor.execute(""" - INSERT INTO location(location_type, - storage_src_id, - storage_dest_id) - VALUES (%s, %s, %s); - """, - (locationType, - storageSrcId, - storageDestId,)) - - conn.commit() - except: - if not conn.closed: - conn.rollback() - return True + raise + else: + return True else: return False @@ -940,24 +998,27 @@ class DbConnector(object): """, (storageId,)) result = cursor.fetchall() - except: + except Exception: if not conn.closed: conn.rollback() - - if result[0]["res"]: - return False + raise else: - try: - cursor.execute(""" - DELETE FROM storage - WHERE storage_id = %s; - """, - (storageId,)) - conn.commit() - except: - if not conn.closed: - conn.rollback() - return True + if result[0]["res"]: + return False + else: + try: + cursor.execute(""" + DELETE FROM storage + WHERE storage_id = %s; + """, + (storageId,)) + conn.commit() + 
+ except Exception: + if not conn.closed: + conn.rollback() + raise + else: + return True ##### Users ##### @@ -978,6 +1039,7 @@ class DbConnector(object): username, email,)) conn.commit() - except Exception as e: + except Exception: if not conn.closed: conn.rollback() + raise diff --git a/transfer_service/exceptions.py b/transfer_service/exceptions.py index a133d044adcd3f9739485bfe5994f7a946542eae..cf52af2bd2d73bf9db45d76edde63598097110df 100644 --- a/transfer_service/exceptions.py +++ b/transfer_service/exceptions.py @@ -5,7 +5,15 @@ class Error(Exception): pass -# RAP exceptions +# FileGrouper exceptions + +class TarFileCreationException(Error): + def __init__(self, folder): + self.message = "Error: cannot create a .tar for " + folder + super(TarFileCreationException, self).__init__(self.message) + + +# RapClient exceptions class MultipleUsersException(Error): def __init__(self): diff --git a/transfer_service/file_grouper.py b/transfer_service/file_grouper.py index 65435794a5fd2f0eacd61ec43c73c7b3f105f371..7f2b57fcda28be555a6f24ba504bd3f47dc37a9a 100644 --- a/transfer_service/file_grouper.py +++ b/transfer_service/file_grouper.py @@ -5,6 +5,8 @@ import shutil import subprocess import sys +from exceptions import TarFileCreationException + class FileGrouper(object): @@ -61,7 +63,7 @@ class FileGrouper(object): os.chdir(parent) sp = subprocess.run(["tar", "-cf", os.path.basename(folder) + ".tar", os.path.basename(folder)], capture_output = True) if(sp.returncode or sp.stderr): - sys.exit(f"Error: cannot create a .tar for {folder}") + raise(TarFileCreationException(folder)) else: try: shutil.rmtree(folder) diff --git a/transfer_service/job_queue.py b/transfer_service/job_queue.py index 8e7a96481412c6e91cc75b3ceae5ec7f0507be99..b0c61c2a4c83964204b38198ba95c503bdc97a7f 100644 --- a/transfer_service/job_queue.py +++ b/transfer_service/job_queue.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# A FIFO queue based on a Redis list +# A FIFO queue based on Redis lists # # @@ -23,7 
+23,12 @@ class JobQueue(object): def len(self): """Returns the number of jobs in the current queue.""" - return self.redisCli.llen(self.queueName) + try: + numJobs = self.redisCli.llen(self.queueName) + except Exception: + raise + else: + return numJobs def name(self): """Returns the name of the current queue.""" @@ -31,18 +36,22 @@ class JobQueue(object): def getJob(self): """Gets a copy of the first job without moving it out from the current queue.""" - job = json.loads(self.redisCli.lrange(self.queueName, self.len() - 1, self.len() - 1)[0].decode("utf-8")) - jobObj = Job() - jobObj.setId(job["jobId"]) - jobObj.setType(job["jobType"]) - jobObj.setOwnerId(job["ownerId"]) - jobObj.setPhase(job["phase"]) - jobObj.setQuote(job["quote"]) - jobObj.setStartTime(job["startTime"]) - jobObj.setEndTime(job["endTime"]) - jobObj.setExecutionDuration(job["executionDuration"]) - jobObj.setInfo(job["jobInfo"]) - return jobObj + try: + job = json.loads(self.redisCli.lrange(self.queueName, self.len() - 1, self.len() - 1)[0].decode("utf-8")) + jobObj = Job() + jobObj.setId(job["jobId"]) + jobObj.setType(job["jobType"]) + jobObj.setOwnerId(job["ownerId"]) + jobObj.setPhase(job["phase"]) + jobObj.setQuote(job["quote"]) + jobObj.setStartTime(job["startTime"]) + jobObj.setEndTime(job["endTime"]) + jobObj.setExecutionDuration(job["executionDuration"]) + jobObj.setInfo(job["jobInfo"]) + except Exception: + raise + else: + return jobObj def insertJob(self, jobObj): """Pushes a new job into the queue.""" @@ -58,22 +67,29 @@ class JobQueue(object): "parameters": jobObj.parameters, "results": jobObj.results, "jobInfo": jobObj.jobInfo } - self.redisCli.lpush(self.queueName, json.dumps(data)) + try: + self.redisCli.lpush(self.queueName, json.dumps(data)) + except Exception: + raise def extractJob(self): """Moves out a job from the end of the current queue.""" - job = json.loads(self.redisCli.brpop(self.queueName)[1].decode("utf-8")) - jobObj = Job() - jobObj.setId(job["jobId"]) - 
jobObj.setType(job["jobType"]) - jobObj.setOwnerId(job["ownerId"]) - jobObj.setPhase(job["phase"]) - jobObj.setQuote(job["quote"]) - jobObj.setStartTime(job["startTime"]) - jobObj.setEndTime(job["endTime"]) - jobObj.setExecutionDuration(job["executionDuration"]) - jobObj.setInfo(job["jobInfo"]) - return jobObj + try: + job = json.loads(self.redisCli.brpop(self.queueName)[1].decode("utf-8")) + jobObj = Job() + jobObj.setId(job["jobId"]) + jobObj.setType(job["jobType"]) + jobObj.setOwnerId(job["ownerId"]) + jobObj.setPhase(job["phase"]) + jobObj.setQuote(job["quote"]) + jobObj.setStartTime(job["startTime"]) + jobObj.setEndTime(job["endTime"]) + jobObj.setExecutionDuration(job["executionDuration"]) + jobObj.setInfo(job["jobInfo"]) + except Exception: + raise + else: + return jobObj def moveJobTo(self, nextQueueName): """ @@ -81,7 +97,10 @@ class JobQueue(object): at the beginning of the next queue (this operation is atomic) DEPRECATED """ - self.redisCli.brpoplpush(self.queueName, nextQueueName) + try: + self.redisCli.brpoplpush(self.queueName, nextQueueName) + except Exception: + raise # Test diff --git a/transfer_service/mailer.py b/transfer_service/mailer.py index fbf5eb90551987fd5185a95aa06a93ac0c7d7a11..01de02afc5c61d6c06842bd59d25b8b736aa0915 100644 --- a/transfer_service/mailer.py +++ b/transfer_service/mailer.py @@ -63,17 +63,14 @@ class Mailer(object): try: smtpObj = smtplib.SMTP(self.smtpServer, self.smtpPort) smtpObj.send_message(self.message) - logMsg = "E-mail message sent successfully!" - self.logger.debug(logMsg) except SMTPConnectError: - logMsg = "Cannot connect to SMTP server." - self.logger.exception(logMsg) + self.logger.exception("Cannot connect to SMTP server.") except TimeoutError: - logMsg = "Connection timeout." - self.logger.exception(logMsg) + self.logger.exception("Connection timeout.") except SMTPException: - logMsg = "Cannot send email message." 
- self.logger.exception(logMsg) + self.logger.exception("Cannot send email message.") + else: + self.logger.debug("E-mail message sent successfully!") else: self.logger.debug("E-mail notifications disabled.") diff --git a/transfer_service/redis_rpc_server.py b/transfer_service/redis_rpc_server.py index fe59f10706546d6a72750be425920b394c17741d..39f39b500328736729f8ccd9d449a2632ff92d04 100644 --- a/transfer_service/redis_rpc_server.py +++ b/transfer_service/redis_rpc_server.py @@ -13,12 +13,15 @@ class RedisRPCServer(threading.Thread): def run(self): while True: - channel, request = self.client.brpop(self.rpcQueue) - channel = channel.decode("utf-8") - request = json.loads(request.decode("utf-8")) - response = self.callback(request) - self.client.rpush(request["req_id"], json.dumps(response)) - self.client.expire(request["req_id"], 30) + try: + channel, request = self.client.brpop(self.rpcQueue) + channel = channel.decode("utf-8") + request = json.loads(request.decode("utf-8")) + response = self.callback(request) + self.client.rpush(request["req_id"], json.dumps(response)) + self.client.expire(request["req_id"], 30) + except Exception: + raise def callback(self, request): """ diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index ad92bf6750da494757eb7fcfc8fcc3edd9d8b40d..9358c29d9827461f03feabd8901aa0e782e3e231 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -126,12 +126,21 @@ class StorePreprocessor(TaskExecutor): self.md5calc.recursive(destPath) # Case 4: /home/user/store is empty (this should be handled by data_rpc_server.py) else: - self.logger.critical("FATAL: the 'store' directory is empty, the application MUST to be restarted.") + self.logger.critical("FATAL: the 'store' directory is empty") + time.sleep(5) + sys.exit(1) # Third scan after directory structure 'check & repair' self.logger.info("Recursive scan of the 'store' directory") [ dirs, files ] = 
self.systemUtils.scanRecursive(self.path) - + + try: + locationId = self.dbConn.getLocationId(self.storageId) + except Exception: + self.logger.exception("FATAL: unable to obtain the location ID for the storage point") + time.sleep(5) + sys.exit(1) + self.logger.info("Checksum calculation and file catalog update") pathPrefix = self.storageStorePath.replace("{username}", self.username) tstampWrapperDirPattern = re.compile("/[0-9]{4}_[0-9]{2}_[0-9]{2}-[0-9]{2}_[0-9]{2}_[0-9]{2}-vos_wrapper") @@ -144,8 +153,7 @@ class StorePreprocessor(TaskExecutor): tstampWrapperDir = tstampWrapperDirPattern.search(basePath).group(0).lstrip('/') basePath = tstampWrapperDirPattern.sub("", basePath) cnode.setWrapperDir(tstampWrapperDir) - cnode.setParentPath(basePath) - locationId = self.dbConn.getLocationId(self.storageId) + cnode.setParentPath(basePath) cnode.setLocationId(locationId) cnode.setJobId(self.jobId) cnode.setCreatorId(self.userId) @@ -154,17 +162,22 @@ class StorePreprocessor(TaskExecutor): vospacePath = basePath + '/' + nodeName - if os.path.islink(dir): - # node is a symlink, do not import it... - now = dt.now().isoformat() - self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) - elif self.dbConn.insertNode(cnode): - now = dt.now().isoformat() - self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) - else: - # node already exists, skip it... - now = dt.now().isoformat() - self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) + try: + if os.path.islink(dir): + # node is a symlink, do not import it... + now = dt.now().isoformat() + self.nodeList.append([ now, dir, vospacePath, "container", "SYMLINK" ]) + elif self.dbConn.insertNode(cnode): + now = dt.now().isoformat() + self.nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) + else: + # node already exists, skip it... 
+ now = dt.now().isoformat() + self.nodeList.append([ now, dir, vospacePath, "container", "SKIPPED" ]) + except Exception: + self.logger.exception("FATAL: unable to update the file catalog") + time.sleep(5) + sys.exit(1) for flist in files: for file in flist: @@ -177,7 +190,6 @@ class StorePreprocessor(TaskExecutor): basePath = tstampWrapperDirPattern.sub("", basePath) dnode.setWrapperDir(tstampWrapperDir) dnode.setParentPath(basePath) - locationId = self.dbConn.getLocationId(self.storageId) dnode.setLocationId(locationId) dnode.setJobId(self.jobId) dnode.setCreatorId(self.userId) @@ -187,39 +199,46 @@ class StorePreprocessor(TaskExecutor): vospacePath = basePath + '/' + nodeName - if os.path.islink(file): - # node is a symlink, do not import it... - now = dt.now().isoformat() - self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) - elif self.dbConn.insertNode(dnode): - now = dt.now().isoformat() - self.nodeList.append([ now, file, vospacePath, "data", "DONE" ]) - else: - # node already exists, skip it... - now = dt.now().isoformat() - self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) + try: + if os.path.islink(file): + # node is a symlink, do not import it... + now = dt.now().isoformat() + self.nodeList.append([ now, file, vospacePath, "data", "SYMLINK" ]) + elif self.dbConn.insertNode(dnode): + now = dt.now().isoformat() + self.nodeList.append([ now, file, vospacePath, "data", "DONE" ]) + else: + # node already exists, skip it... 
+ now = dt.now().isoformat() + self.nodeList.append([ now, file, vospacePath, "data", "SKIPPED" ]) + except Exception: + self.logger.exception("FATAL: unable to update the file catalog") + time.sleep(5) + sys.exit(1) self.logger.info("Overall data size calculation") self.jobObj.jobInfo["dataSize"] = self.systemUtils.getSize(self.path) self.logger.info("========== End of preprocessing phase ==========") except Exception: - self.logger.exception("FATAL: something went wrong during the preprocessing phase.") + self.logger.exception("FATAL: something went wrong during the preprocessing phase") + self.update("ERROR") time.sleep(5) sys.exit(1) - def update(self): - self.logger.info("Job phase updated to QUEUED") - self.jobObj.setPhase("QUEUED") - self.dbConn.setPhase(self.jobId, "QUEUED") - - # Send e-mail notification - m = Mailer(self.logger) + def update(self, status): + try: + m = Mailer(self.logger) - m.addRecipient(self.adminEmail) - userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) - if userEmail != self.adminEmail: - m.addRecipient(userEmail) - - msg = f""" + m.addRecipient(self.adminEmail) + userEmail = self.dbConn.getUserEmail(self.jobObj.ownerId) + if userEmail != self.adminEmail: + m.addRecipient(userEmail) + + if status == "OK": + self.logger.info("Job phase updated to QUEUED") + self.jobObj.setPhase("QUEUED") + self.dbConn.setPhase(self.jobId, "QUEUED") + # Send e-mail notification + msg = f""" Dear user, your job has been QUEUED. @@ -232,8 +251,37 @@ class StorePreprocessor(TaskExecutor): You will be notified by email once the job is completed. 
""" - m.setMessage("VOSpace data storage notification: Job QUEUED", msg) - m.send() + m.setMessage("VOSpace data storage notification: Job QUEUED", msg) + m.send() + else: + # Send e-mail notification + self.logger.info("Job phase updated to ERROR") + self.jobObj.setPhase("ERROR") + self.jobObj.setErrorType("fatal") + self.dbConn.insertJob(self.jobObj) + self.dbConn.setEndTime(self.jobId) + self.setDestinationQueueName("write_terminated") + + msg = f""" + Dear user, + your job FAILED during the preprocessing phase. + + Job ID: {self.jobId} + Job type: {self.jobObj.type} + Storage type: {self.storageType} + Storage ID: {self.storageId} + Owner ID: {self.jobObj.ownerId} + + The issue will be automatically reported to the administrator. + + """ + m.setMessage("VOSpace data storage notification: Job ERROR", msg) + m.send() + except Exception: + self.logger.exception(f"Unable to update the job status for job {self.jobId}") + finally: + self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() + self.nodeList.clear() def run(self): self.logger.info("Starting store preprocessor...") @@ -249,13 +297,15 @@ class StorePreprocessor(TaskExecutor): self.userId = self.jobObj.ownerId self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) - self.execute() - self.jobObj.jobInfo["nodeList"] = self.nodeList.copy() - self.nodeList.clear() - self.update() - self.destQueue.insertJob(self.jobObj) - self.srcQueue.extractJob() - self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") + self.execute() + self.update("OK") + try: + self.destQueue.insertJob(self.jobObj) + except Exception: + self.logger.exception(f"Failed to move job {self.jobObj.jobId} from {self.srcQueue.name()} to {self.destQueue.name()}") + else: + self.srcQueue.extractJob() + self.logger.info(f"Job {self.jobObj.jobId} MOVED from {self.srcQueue.name()} to {self.destQueue.name()}") # Test #sp = StorePreprocessor()