Skip to content
Snippets Groups Projects
Select Git revision
  • bc7455dcf110da63921a8c4016c741a8bad82311
  • master default protected
  • rocky_8
  • rocky_9
  • pasture
  • testing
  • query
  • v0.2.9
  • v0.2.8
  • v0.2.7
  • v0.2.6
  • v0.2.5
  • v0.2.4
  • v0.2.3
  • v0.2.2
  • v0.2.1
  • v0.2.0
  • v0.1.2
  • v0.1.1
  • v0.1
20 results

store_amqp_server.py

Blame
  • store_amqp_server.py 4.73 KiB
    #  TODO:
    #  - error codes and status codes list and description
    #
    #
    #
    
    import os
    import sys
    import json
    
    #from enum import Enum
    
    from amqp_server import AMQPServer
    from job import Job
    from job_cache import JobCache
    
    class StoreAMQPServer(AMQPServer):
    
        def __init__(self, host, queue):
            self.type = "store"
            self.storeAck = False
            self.jobCache = JobCache('redis', 6379, 1)
            self.job = None
            self.username = None
            self.path = None
            super(StoreAMQPServer, self).__init__(host, queue)
    
        def execute_callback(self, requestBody):
            # 'requestType' and 'userName' attributes are mandatory
            if "requestType" not in requestBody or "userName" not in requestBody:
                response = { "errorCode": 1, "errorMsg": "Malformed request, missing parameters." }
            elif requestBody["requestType"] == "CSTORE" or requestBody["requestType"] == "HSTORE":
                self.job = Job()
                self.job.setInfo(requestBody)
                self.job.setPhase("PENDING")
                user = requestBody["userName"]
                folderPath = "/home/" + user + "/store"
                userInfo = self.userInfo(user)
                # Check if the user exists on the transfer node
                if not userInfo:
                    response = { "responseType": "ERROR",
                                 "errorCode": 2,
                                 "errorMsg": "The user does not exist on the transfer node." }
                else:
                    uid = os.stat(folderPath).st_uid
                    gid = os.stat(folderPath).st_gid
                    # Check if uid and gid match and avoid privilege escalation
                    if uid == userInfo[1] and gid == userInfo[2] and uid != 0 and gid != 0:
                        # If write permissions are set and the 'store' folder is not empty,
                        # it means that data is ready to be copied, otherwise, nothing can
                        # be done until the write permissions are restored or new data is
                        # copied on the transfer node by the user.
                        if os.access(folderPath, os.W_OK) and os.listdir(folderPath):
                            response = { "responseType": "STORE_ACK" }
                            self.storeAck = True
                        else:
                            response = { "responseType": "ERROR",
                                         "errorCode": 3,
                                         "errorMsg": "Service busy." }                                                
                    else:
                        response = { "responseType": "ERROR",
                                     "errorCode": 4,
                                     "errorMsg": "Permission denied." }
            elif requestBody["requestType"] == "STORE_CON":
                if self.storeAck:
                    self.storeAck = False
                    user = requestBody["userName"]
                    self.prepare(user)
                    self.jobCache.set(self.job)
                    redisResponse = self.jobCache.get(self.job.jobId)
                    if "error" in redisResponse:
                        response = { "responseType": "ERROR",
                                     "errorCode": 5,
                                     "errorMsg": "Job creation failed." }
                    else:
                        response = { "responseType": "STORE_RUN", 
                                     "jobId": self.job.jobId }
                else:
                    response = { "responseType": "ERROR",
                                 "errorCode": 6,
                                 "errorMsg": "Store request not acknowledged." }
            else:
                response = { "responseType": "ERROR",
                             "errorCode": 7,
                             "errorMsg": "Unkown request type." }
    
            return response
    
        # to be removed from store_preprocessor.py
        # or simply add a chmod -x here, to be faster?
        def prepare(self, username):
            self.username = username
            self.path = "/home/" + username + "/store"
            for folder, subfolders, files in os.walk(self.path):
                os.chown(folder, 0, 0)
                os.chmod(folder, 0o555)
                for s in subfolders:
                    os.chown(os.path.join(folder, s), 0, 0)
                    os.chmod(os.path.join(folder, s), 0o555)
                for f in files:
                    os.chown(os.path.join(folder, f), 0, 0)
                    os.chmod(os.path.join(folder, f), 0o555)
    
        def userInfo(self, username):
            fp = open("/etc/passwd", 'r')
            for line in fp:
                info = line.split(':')
                user = info[0]
                uid = int(info[2])
                gid = int(info[3])
                if user == username:
                    return [ user, uid, gid ]
            return False
    
        def run(self):
            print(f"Starting AMQP server of type {self.type}...")
            super(StoreAMQPServer, self).run()