# TODO:
#  - error codes and status codes list and description
#
#

import os
import sys
import json

#from enum import Enum

from amqp_server import AMQPServer
from job import Job
from job_cache import JobCache


class StoreAMQPServer(AMQPServer):
    """AMQP server handling the two-step "store" request protocol.

    Protocol as implemented by ``execute_callback``:

    1. The client sends ``CSTORE`` or ``HSTORE``. If the user and the
       ``/home/<user>/store`` folder pass all checks, the server replies
       ``STORE_ACK`` and remembers the acknowledgement.
    2. The client sends ``STORE_CON``. If an acknowledgement is pending
       for that same user, the store folder is handed over to root
       (see ``prepare``), the job is written to the job cache and the
       server replies ``STORE_RUN`` with the job id.

    Error codes returned in ``errorCode``:

    ==== ==================================================
    1    Malformed request, missing parameters.
    2    The user does not exist on the transfer node.
    3    Service busy (folder not writable or empty).
    4    Permission denied (folder ownership mismatch).
    5    Job creation failed.
    6    Store request not acknowledged.
    7    Unknown request type.
    8    The store folder is not accessible.
    ==== ==================================================
    """

    def __init__(self, host, queue):
        """Initialise the server state, then the underlying AMQP server.

        Args:
            host: Passed through unchanged to ``AMQPServer.__init__``.
            queue: Passed through unchanged to ``AMQPServer.__init__``.
        """
        self.type = "store"
        # True while a CSTORE/HSTORE has been acknowledged and is waiting
        # for its STORE_CON.
        self.storeAck = False
        # User the pending acknowledgement was issued for.
        self._ackedUser = None
        # NOTE(review): arguments are presumably host, port and database
        # index of the Redis instance - confirm against JobCache.
        self.jobCache = JobCache('redis', 6379, 1)
        self.job = None
        self.username = None
        self.path = None
        # State is set before the base initialiser runs, as in the
        # original code.
        super().__init__(host, queue)

    @staticmethod
    def _errorResponse(code, message):
        """Build an error response dictionary."""
        return {
            "responseType": "ERROR",
            "errorCode": code,
            "errorMsg": message,
        }

    @staticmethod
    def _storePath(user):
        """Return the path of the store folder of ``user``."""
        return "/home/" + user + "/store"

    def execute_callback(self, requestBody):
        """Dispatch an incoming request and return the response dictionary.

        Args:
            requestBody: Decoded request. Must be a dictionary containing
                at least the ``requestType`` and ``userName`` keys.

        Returns:
            A dictionary with a ``responseType`` key (``STORE_ACK``,
            ``STORE_RUN`` or ``ERROR``). Error responses also carry
            ``errorCode`` and ``errorMsg``.
        """
        # 'requestType' and 'userName' attributes are mandatory
        if (not isinstance(requestBody, dict)
                or "requestType" not in requestBody
                or "userName" not in requestBody):
            return self._errorResponse(
                1, "Malformed request, missing parameters.")

        requestType = requestBody["requestType"]
        if requestType in ("CSTORE", "HSTORE"):
            return self._handleStoreRequest(requestBody)
        if requestType == "STORE_CON":
            return self._handleStoreConfirm(requestBody)
        # NOTE: the message text (including its spelling) is kept as-is
        # because clients may match on it.
        return self._errorResponse(7, "Unkown request type.")

    def _handleStoreRequest(self, requestBody):
        """Validate a CSTORE/HSTORE request and acknowledge it if possible."""
        # A new request supersedes any acknowledgement still pending:
        # self.job is replaced below, so an old ack must not be allowed
        # to commit the new job.
        self.storeAck = False
        self._ackedUser = None

        self.job = Job()
        self.job.setInfo(requestBody)
        self.job.setPhase("PENDING")

        user = requestBody["userName"]
        account = self.userInfo(user)
        # Check if the user exists on the transfer node
        if not account:
            return self._errorResponse(
                2, "The user does not exist on the transfer node.")

        folderPath = self._storePath(user)
        try:
            folderStat = os.stat(folderPath)
        except OSError:
            # Missing or unreadable folder: answer with an error instead
            # of letting the exception escape the callback.
            return self._errorResponse(
                8, "The store folder is not accessible.")

        uid = folderStat.st_uid
        gid = folderStat.st_gid
        # Check if uid and gid match and avoid privilege escalation
        ownerMatches = uid == account[1] and gid == account[2]
        if not ownerMatches or uid == 0 or gid == 0:
            return self._errorResponse(4, "Permission denied.")

        # If write permissions are set and the 'store' folder is not empty,
        # it means that data is ready to be copied, otherwise, nothing can
        # be done until the write permissions are restored or new data is
        # copied on the transfer node by the user.
        try:
            ready = os.access(folderPath, os.W_OK) and os.listdir(folderPath)
        except OSError:
            ready = False
        if not ready:
            return self._errorResponse(3, "Service busy.")

        self.storeAck = True
        self._ackedUser = user
        return {"responseType": "STORE_ACK"}

    def _handleStoreConfirm(self, requestBody):
        """Commit the acknowledged store request of the requesting user."""
        user = requestBody["userName"]
        # Only the user whose request was acknowledged may confirm it;
        # otherwise prepare() would hand an unvalidated path to root.
        if not self.storeAck or user != self._ackedUser:
            return self._errorResponse(
                6, "Store request not acknowledged.")

        self.storeAck = False
        self._ackedUser = None
        self.prepare(user)
        self.jobCache.set(self.job)
        # Read the job back to verify that it was stored.
        redisResponse = self.jobCache.get(self.job.jobId)
        if "error" in redisResponse:
            return self._errorResponse(5, "Job creation failed.")
        return {
            "responseType": "STORE_RUN",
            "jobId": self.job.jobId,
        }

    # to be removed from store_preprocessor.py
    # or simply add a chmod -x here, to be faster?
    def prepare(self, username):
        """Hand the whole store folder tree of ``username`` over to root.

        Every directory and file below ``/home/<username>/store`` is
        chowned to uid/gid 0 and set to mode ``0o555``. Symbolic links
        are re-owned themselves but never followed, so nothing outside
        the store folder is touched.

        Args:
            username: Name of the user owning the store folder.

        Raises:
            OSError: If changing ownership or mode of an entry fails.
        """
        self.username = username
        self.path = self._storePath(username)
        for folder, subfolders, files in os.walk(self.path):
            self._lockPath(folder)
            # Subfolders are listed explicitly because os.walk does not
            # descend into symlinked directories.
            for name in subfolders + files:
                self._lockPath(os.path.join(folder, name))

    @staticmethod
    def _lockPath(path):
        """Give ``path`` to root with mode 0o555 without following links."""
        if os.path.islink(path):
            # Re-own the link itself; chmod would act on the link target.
            os.chown(path, 0, 0, follow_symlinks=False)
            return
        os.chown(path, 0, 0)
        os.chmod(path, 0o555)

    def userInfo(self, username):
        """Look up ``username`` in ``/etc/passwd``.

        Args:
            username: Login name to search for.

        Returns:
            ``[user, uid, gid]`` for the first matching entry, or
            ``False`` if there is no usable entry for ``username``.
        """
        with open("/etc/passwd", 'r') as fp:
            for line in fp:
                info = line.rstrip("\n").split(':')
                # Skip blank, comment or otherwise short lines and
                # entries belonging to other users.
                if len(info) < 4 or info[0] != username:
                    continue
                try:
                    return [info[0], int(info[2]), int(info[3])]
                except ValueError:
                    # Non-numeric uid/gid: not a usable entry.
                    continue
        return False

    def run(self):
        """Announce the server type on stdout and start the AMQP server."""
        print(f"Starting AMQP server of type {self.type}...")
        super().run()