diff --git a/client/vos_data b/client/vos_data index 6e246ad3b7e9b9edd2b9b3770c70b28e890671f4..f4ac1adeac539871a389137b38251c79b30e236f 100644 --- a/client/vos_data +++ b/client/vos_data @@ -53,6 +53,7 @@ DESCRIPTION sys.exit("FATAL: Malformed response, storage acknowledge expected.\n") elif storeResponse["responseType"] == "STORE_ACK": storageList = storeResponse["storageList"] + storageType = storageList[0]["storage_type"] if not storageList: sys.exit("No storage point found. Please add a storage point using the 'vos_storage' command.\n") print("Choose one of the following storage locations:") @@ -89,7 +90,10 @@ DESCRIPTION except EOFError: print("\nPlease, use CTRL+C to quit.") if confirm == "yes": - confirmRequest = { "requestType": "STORE_CON", "userName": username, "storageId": storageId } + confirmRequest = { "requestType": "STORE_CON", + "userName": username, + "storageId": storageId, + "storageType": storageType } confirmResponse = self.call(confirmRequest) if "responseType" not in confirmResponse: sys.exit("\nFATAL: Malformed response, storage confirmation expected.\n") diff --git a/transfer_service/abort_job_rpc_server.py b/transfer_service/abort_job_rpc_server.py index 08ce090c2a545e81218de4e2e1bf94d51e26e7da..ec0cb9c0375370c8731cf2b1e8f977166471f5fb 100644 --- a/transfer_service/abort_job_rpc_server.py +++ b/transfer_service/abort_job_rpc_server.py @@ -16,8 +16,8 @@ class AbortJobRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) super(AbortJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/data_rpc_server.py b/transfer_service/data_rpc_server.py index 464155b81dd39f4c65034147d1910dcf338f61a1..695cc86914895e9d14e35ca304225686224c5a28 100644 --- a/transfer_service/data_rpc_server.py +++ b/transfer_service/data_rpc_server.py @@ -30,8 +30,8 @@ class DataRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) self.params = config.loadSection("transfer_node") self.storageStorePath = self.params["store_path"] self.params = config.loadSection("scheduling") @@ -100,6 +100,7 @@ class DataRPCServer(RedisRPCServer): self.dbConn.insertJob(job) dbResponse = self.dbConn.getJob(job.jobId) job.jobInfo["storageId"] = requestBody["storageId"] + job.jobInfo["storageType"] = requestBody["storageType"] self.pendingQueueWrite.insertJob(job) if "error" in dbResponse: response = { "responseType": "ERROR", diff --git a/transfer_service/db_connector.py b/transfer_service/db_connector.py index 4642a1cc7bf13db2fe092bb22a0fa682b6a20a93..b2e4cb779c46f93ff22726c591256119a20d839d 100644 --- a/transfer_service/db_connector.py +++ b/transfer_service/db_connector.py @@ -51,23 +51,6 @@ class DbConnector(object): ##### Node ##### - def nodeExists(self, node): - """Checks if a VOSpace node already exists. Returns a boolean.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - nodeVOSPath = node.parentPath + '/' + node.name - cursor.execute("SELECT * FROM node_vos_path WHERE vos_path = %s;", (nodeVOSPath,)) - result = cursor.fetchall() - except Exception as e: - if not conn.closed: - conn.rollback() - print(e) - if result: - return True - else: - return False - def getOSPath(self, vospacePath): """Returns a list containing full path, storage type and username for a VOSpace path.""" with self.getConnection() as conn: @@ -614,7 +597,7 @@ class DbConnector(object): ##### Node ##### def insertNode(self, node): - """Inserts a VOSpace node.""" + """Inserts a VOSpace node. Returns 'True' on success, 'False' otherwise.""" with self.getConnection() as conn: try: out = open("db_connector_log.txt", "a") @@ -652,11 +635,15 @@ class DbConnector(object): tstamp_wrapper_dir, type, location_id, + sticky, busy_state, creator_id, content_length, content_md5) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s); + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT + DO NOTHING + RETURNING node_id; """, (parentLtreePath, parentLtreeRelativePath, @@ -664,11 +651,17 @@ class DbConnector(object): node.wrapperDir, node.type, node.locationId, + node.sticky, node.busyState, node.creatorID, node.contentLength, node.contentMD5,)) + result = cursor.fetchall() conn.commit() + if result: + return True + else: + return False except Exception as e: if not conn.closed: conn.rollback() @@ -715,26 +708,6 @@ class DbConnector(object): if not conn.closed: conn.rollback() - def setSticky(self, nodeVOSPath, value): - """Sets the 'sticky' flag for a VOSpace node.""" - with self.getConnection() as conn: - try: - cursor = conn.cursor(cursor_factory = RealDictCursor) - cursor.execute(""" - UPDATE node SET sticky = %s - WHERE path <@ - (SELECT path - FROM node_path p - JOIN node n ON p.node_id = n.node_id - WHERE p.vos_path = %s); - """, - (value, nodeVOSPath,)) - conn.commit() - except Exception as e: - if not conn.closed: - conn.rollback() - - def setBusyState(self, nodeVOSPath, value): """Sets the 'busy_state' flag for a VOSpace node.""" with self.getConnection() as conn: diff --git a/transfer_service/get_job_rpc_server.py b/transfer_service/get_job_rpc_server.py index 344af1a730f333851ab06c52679c45c0ed00fe16..8bafbecd347931efb3ebbd0311f0008b52139888 100644 --- a/transfer_service/get_job_rpc_server.py +++ b/transfer_service/get_job_rpc_server.py @@ -18,8 +18,8 @@ class GetJobRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) super(GetJobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/import_executor.py b/transfer_service/import_executor.py index dd91c78e3b23205ddc5ea5e8f6b837de23218c7d..2af484355aa2ab32a950b9eec590af8ba1989dc8 100644 --- a/transfer_service/import_executor.py +++ b/transfer_service/import_executor.py @@ -90,10 +90,10 @@ class ImportExecutor(TaskExecutor): cnode.setLocationId(locationId) cnode.setCreatorID(self.userId) cnode.setContentLength(0) - if not self.dbConn.nodeExists(cnode): - self.dbConn.insertNode(cnode) + cnode.setSticky(True) + + if self.dbConn.insertNode(cnode): self.dbConn.setAsyncTrans(vospacePath, True) - self.dbConn.setSticky(vospacePath, True) now = dt.now() nodeList.append([ now, dir, vospacePath, "container", "DONE" ]) else: @@ -127,11 +127,10 @@ class ImportExecutor(TaskExecutor): dnode.setCreatorID(self.userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) + dnode.setSticky(True) - if not self.dbConn.nodeExists(dnode): - self.dbConn.insertNode(dnode) + if self.dbConn.insertNode(dnode): self.dbConn.setAsyncTrans(vospacePath, True) - self.dbConn.setSticky(vospacePath, True) now = dt.now() nodeList.append([ now, file, vospacePath, "data", "DONE" ]) else: diff --git a/transfer_service/import_rpc_server.py b/transfer_service/import_rpc_server.py index cac89b80bf01eb3fcc285efad964d32b4e99bea3..99cdef8d441366b362a38f12498676fceeddd74a 100644 --- a/transfer_service/import_rpc_server.py +++ b/transfer_service/import_rpc_server.py @@ -68,14 +68,14 @@ class ImportRPCServer(RedisRPCServer): response = { "responseType": "ERROR", "errorCode": 5, "errorMsg": "Directory path expected." } - #elif username not in path: - # response = { "responseType": "ERROR", - # "errorCode": 6, - # "errorMsg": "Directory path does not contain the username." } - #elif os.path.dirname(path) != pathPrefix + '/' + username: - # response = { "responseType": "ERROR", - # "errorCode": 7, - # "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } + elif username not in path: + response = { "responseType": "ERROR", + "errorCode": 6, + "errorMsg": "Directory path does not contain the username." } + elif os.path.dirname(path) != pathPrefix + '/' + username: + response = { "responseType": "ERROR", + "errorCode": 7, + "errorMsg": "Invalid path, directory must be located in " + pathPrefix + '/' + username } elif self.importReadyQueue.len() >= self.maxReadyJobs: response = { "responseType": "ERROR", "errorCode": 8, diff --git a/transfer_service/job_rpc_server.py b/transfer_service/job_rpc_server.py index 57c0c32db7f9a50929e83ecc2445b7bf13cacef3..d1a1be95808f7fa3bc66be212ee1a527da02bbad 100644 --- a/transfer_service/job_rpc_server.py +++ b/transfer_service/job_rpc_server.py @@ -18,8 +18,8 @@ class JobRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) super(JobRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/mailer.py b/transfer_service/mailer.py index 7bfa12902f4f412dd1db9f7239de7ca8f1a9b9c7..4e78033bd84c343977d2699e6731b600c5e8179b 100644 --- a/transfer_service/mailer.py +++ b/transfer_service/mailer.py @@ -56,6 +56,8 @@ class Mailer(object): smtpObj = smtplib.SMTP(self.smtpServer, self.smtpPort) smtpObj.send_message(self.message) print("Message sent!") + except TimeoutError: + print("Error: connection timeout.") except SMTPException: print("Error: cannot send email message.") diff --git a/transfer_service/node.py b/transfer_service/node.py index d2b91f0f4e6a8f0c577eab90db37fe6cc32a792b..b7821730ecdd79ef90f042eaf94dab66e2a6f486 100644 --- a/transfer_service/node.py +++ b/transfer_service/node.py @@ -15,7 +15,7 @@ class Node(object): self.locationId = None self.format = None self.asyncTrans = None - self.sticky = None + self.sticky = False self.busyState = None self.creatorID = None self.groupRead = [] diff --git a/transfer_service/start_job_rpc_server.py b/transfer_service/start_job_rpc_server.py index 20850aca5c7d13c1dd2f177fa29adac5b4de28cf..2ca989538915ad3815991a45ea50c48ac5c49696 100644 --- a/transfer_service/start_job_rpc_server.py +++ b/transfer_service/start_job_rpc_server.py @@ -20,8 +20,8 @@ class StartJobRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) self.params = config.loadSection("scheduling") self.maxPendingJobs = self.params.getint("max_pending_jobs") self.maxTerminatedJobs = self.params.getint("max_terminated_jobs") diff --git a/transfer_service/storage_rpc_server.py b/transfer_service/storage_rpc_server.py index 6c381b62b17f518520ab8da04f72d61284323a80..dcf1505d893cccc0adecec718d51cff18ea6d1e9 100644 --- a/transfer_service/storage_rpc_server.py +++ b/transfer_service/storage_rpc_server.py @@ -18,8 +18,8 @@ class StorageRPCServer(RedisRPCServer): self.params["host"], self.params.getint("port"), self.params["db"], - 8, - 16) + 4, + 8) super(StorageRPCServer, self).__init__(host, port, db, rpcQueue) def callback(self, requestBody): diff --git a/transfer_service/store_executor.py b/transfer_service/store_executor.py index 4befaee6ba19798950f7a81c63bec093566d7a11..ca7b773f7ffb7c9bb91fc0f2e53db22f7da100f1 100644 --- a/transfer_service/store_executor.py +++ b/transfer_service/store_executor.py @@ -38,6 +38,7 @@ class StoreExecutor(TaskExecutor): self.username = None self.requestType = None self.storageId = None + self.storageType = None self.nodeList = [] self.systemUtils = SystemUtils() super(StoreExecutor, self).__init__() @@ -78,7 +79,6 @@ class StoreExecutor(TaskExecutor): for nodeVOSPath in self.nodeList: out.write(f"nodeListElement: {nodeVOSPath}\n") self.dbConn.setAsyncTrans(nodeVOSPath, True); - self.dbConn.setSticky(nodeVOSPath, True); self.dbConn.setBusyState(nodeVOSPath, False); self.jobObj.setPhase("COMPLETED") self.dbConn.setPhase(self.jobId, "COMPLETED") @@ -97,6 +97,7 @@ class StoreExecutor(TaskExecutor): self.username = self.jobObj.jobInfo["userName"] self.requestType = self.jobObj.jobInfo["requestType"] self.storageId = self.jobObj.jobInfo["storageId"] + self.storageType = self.jobObj.jobInfo["storageType"] self.nodeList = self.jobObj.jobInfo["nodeList"] # TODO # 1) Controlla il tipo di destinazione: hot (server) o cold (tape) diff --git a/transfer_service/store_preprocessor.py b/transfer_service/store_preprocessor.py index f3cbfd7cb1105a3c0e0a305188ef3bc5cdb46699..2e7b194a5d2fdf225239033cb08138044427328f 100644 --- a/transfer_service/store_preprocessor.py +++ b/transfer_service/store_preprocessor.py @@ -42,6 +42,7 @@ class StorePreprocessor(TaskExecutor): self.params = config.loadSection("transfer_node") self.storageStorePath = self.params["store_path"] self.storageId = None + self.storageType = None self.jobObj = None self.jobId = None self.username = None @@ -139,8 +140,9 @@ class StorePreprocessor(TaskExecutor): cnode.setBusyState(True) cnode.setCreatorID(self.userId) cnode.setContentLength(0) - if not self.dbConn.nodeExists(cnode): - self.dbConn.insertNode(cnode) + cnode.setSticky(True) + + if self.dbConn.insertNode(cnode): self.nodeList.append(basePath + '/' + nodeName) out.write("\n\n") @@ -168,8 +170,9 @@ class StorePreprocessor(TaskExecutor): dnode.setCreatorID(self.userId) dnode.setContentLength(os.path.getsize(file)) dnode.setContentMD5(self.md5calc.getMD5(file)) - if not self.dbConn.nodeExists(dnode): - self.dbConn.insertNode(dnode) + dnode.setSticky(True) + + if self.dbConn.insertNode(dnode): self.nodeList.append(basePath + '/' + nodeName) out.write("\n") @@ -189,6 +192,7 @@ class StorePreprocessor(TaskExecutor): self.jobObj = self.srcQueue.getJob() self.jobId = self.jobObj.jobId self.storageId = self.jobObj.jobInfo["storageId"] + self.storgeType = self.jobObj.jobInfo["storageType"] self.username = self.jobObj.jobInfo["userName"] self.prepare(self.username) self.execute()