# store_amqp_server.py
# TODO:
# - compile and document the list of error codes and status codes,
#   with a description of each
#
#
#
import os
import sys
import json
#from enum import Enum
from amqp_server import AMQPServer
from job import Job
from job_cache import JobCache
class StoreAMQPServer(AMQPServer):
    """AMQP server implementing the two-step "store" handshake.

    A client first sends a ``CSTORE`` or ``HSTORE`` request. If the user's
    ``/home/<user>/store`` folder passes the ownership and readiness checks,
    the server answers ``STORE_ACK``. The client then confirms with
    ``STORE_CON``; the folder is handed over to root, made read-only, and
    the pending job is written to the job cache.

    Only one acknowledgement can be pending at a time: a new store request
    discards any previous, unconfirmed one.
    """

    def __init__(self, host, queue):
        """Initialise the handshake state and start the base AMQP server.

        Args:
            host: forwarded unchanged to ``AMQPServer.__init__``.
            queue: forwarded unchanged to ``AMQPServer.__init__``.
        """
        self.type = "store"
        # True while a STORE_ACK has been sent and not yet confirmed.
        self.storeAck = False
        # User name the pending STORE_ACK was issued for.
        self._ackedUser = None
        # NOTE(review): arguments are presumably (host, port, db index) of
        # the Redis instance -- confirm against JobCache.
        self.jobCache = JobCache('redis', 6379, 1)
        self.job = None
        self.username = None
        self.path = None
        super().__init__(host, queue)

    def execute_callback(self, requestBody):
        """Handle one request and return the response dictionary.

        Args:
            requestBody: mapping holding at least ``"requestType"`` and
                ``"userName"``.

        Returns:
            A dict with a ``"responseType"`` key (``"STORE_ACK"``,
            ``"STORE_RUN"`` or ``"ERROR"``). Error responses also carry
            ``"errorCode"`` and ``"errorMsg"``; ``"STORE_RUN"`` carries the
            ``"jobId"``.

        Raises:
            OSError: if the user's store folder cannot be inspected or
                modified (e.g. it does not exist).
        """
        # 'requestType' and 'userName' attributes are mandatory
        if "requestType" not in requestBody or "userName" not in requestBody:
            # "responseType" added for consistency with all other errors.
            return {"responseType": "ERROR",
                    "errorCode": 1,
                    "errorMsg": "Malformed request, missing parameters."}

        requestType = requestBody["requestType"]
        if requestType == "CSTORE" or requestType == "HSTORE":
            return self._handleStoreRequest(requestBody)
        if requestType == "STORE_CON":
            return self._handleStoreConfirmation(requestBody)
        return {"responseType": "ERROR",
                "errorCode": 7,
                "errorMsg": "Unkown request type."}

    def _handleStoreRequest(self, requestBody):
        """Validate a CSTORE/HSTORE request and acknowledge it if possible."""
        # A new request supersedes any pending acknowledgement; otherwise a
        # later STORE_CON could store a job that was never acknowledged.
        self.storeAck = False
        self._ackedUser = None

        self.job = Job()
        self.job.setInfo(requestBody)
        self.job.setPhase("PENDING")

        user = requestBody["userName"]
        folderPath = "/home/" + user + "/store"
        userInfo = self.userInfo(user)

        # Check if the user exists on the transfer node
        if not userInfo:
            return {"responseType": "ERROR",
                    "errorCode": 2,
                    "errorMsg": "The user does not exist on the transfer node."}

        # Single stat call so uid and gid come from the same snapshot.
        folderStat = os.stat(folderPath)
        uid = folderStat.st_uid
        gid = folderStat.st_gid

        # Check if uid and gid match and avoid privilege escalation
        ownedByUser = uid == userInfo[1] and gid == userInfo[2]
        if not ownedByUser or uid == 0 or gid == 0:
            return {"responseType": "ERROR",
                    "errorCode": 4,
                    "errorMsg": "Permission denied."}

        # If write permissions are set and the 'store' folder is not empty,
        # it means that data is ready to be copied, otherwise, nothing can
        # be done until the write permissions are restored or new data is
        # copied on the transfer node by the user.
        if os.access(folderPath, os.W_OK) and os.listdir(folderPath):
            self.storeAck = True
            self._ackedUser = user
            return {"responseType": "STORE_ACK"}

        return {"responseType": "ERROR",
                "errorCode": 3,
                "errorMsg": "Service busy."}

    def _handleStoreConfirmation(self, requestBody):
        """Process STORE_CON: lock the folder and register the pending job."""
        user = requestBody["userName"]

        # Only the user whose folder was validated may confirm. Without this
        # check an arbitrary 'userName' would reach prepare(), which chowns
        # the derived path to root.
        if not self.storeAck or user != self._ackedUser:
            return {"responseType": "ERROR",
                    "errorCode": 6,
                    "errorMsg": "Store request not acknowledged."}

        self.storeAck = False
        self._ackedUser = None
        self.prepare(user)

        # Write the job, then read it back to verify that it was stored.
        self.jobCache.set(self.job)
        redisResponse = self.jobCache.get(self.job.jobId)
        if "error" in redisResponse:
            return {"responseType": "ERROR",
                    "errorCode": 5,
                    "errorMsg": "Job creation failed."}

        return {"responseType": "STORE_RUN",
                "jobId": self.job.jobId}

    # to be removed from store_preprocessor.py
    # or simply add a chmod -x here, to be faster?
    def prepare(self, username):
        """Hand the user's store folder over to root and make it read-only.

        Every entry below ``/home/<username>/store`` is chowned to 0:0 and
        set to mode 0o555. Symbolic links found inside the tree are chowned
        themselves but never followed, so their targets stay untouched.

        Args:
            username: name of the user whose store folder is locked down.

        Raises:
            OSError: if ownership or permissions cannot be changed.
        """
        self.username = username
        self.path = "/home/" + username + "/store"

        for folder, subfolders, files in os.walk(self.path):
            # Every directory except the root is handled as a child of its
            # parent below, so only the root needs explicit treatment here.
            if folder == self.path:
                os.chown(folder, 0, 0)
                os.chmod(folder, 0o555)
            for name in subfolders + files:
                self._lockDown(os.path.join(folder, name))

    @staticmethod
    def _lockDown(path):
        """Chown ``path`` to root and make it read-only without following symlinks."""
        os.chown(path, 0, 0, follow_symlinks=False)
        # chmod always follows symlinks on Linux; skipping links prevents a
        # user-planted link from changing the mode of a file outside the tree.
        if not os.path.islink(path):
            os.chmod(path, 0o555)

    def userInfo(self, username):
        """Look up ``username`` in ``/etc/passwd``.

        Args:
            username: login name to search for.

        Returns:
            ``[name, uid, gid]`` for the first matching entry, or ``False``
            when no well-formed entry matches.
        """
        with open("/etc/passwd", 'r') as fp:
            for line in fp:
                info = line.split(':')
                if info[0] != username:
                    continue
                try:
                    return [info[0], int(info[2]), int(info[3])]
                except (IndexError, ValueError):
                    # Malformed entry for this name: keep scanning.
                    continue
        return False

    def run(self):
        """Announce the server type on stdout and start the base server loop."""
        print(f"Starting AMQP server of type {self.type}...")
        super().run()