Skip to content
Snippets Groups Projects
Select Git revision
  • 9f355157144ee1130cfccd34052ed18d683abf74
  • master default protected
  • v0.2.1-devel
  • v0.2.1
4 results

LdapConfig.test.properties

Blame
  • get_job_amqp_server.py 680 B
    from amqp_server import AMQPServer
    from job_cache import JobCache
    
    class GetJobAMQPServer(AMQPServer):
        """AMQP server that answers job lookups from a ``JobCache``.

        A request carrying a ``"jobId"`` key is resolved through
        ``self.jobCache``; any other request gets the fixed reply ``42``.
        """

        def __init__(self, host, queue):
            """Set the server type and job cache, then initialise the base server.

            Args:
                host: Forwarded unchanged to ``AMQPServer.__init__``.
                queue: Forwarded unchanged to ``AMQPServer.__init__``.
            """
            # Attributes are assigned before the base initialiser is invoked;
            # keep that order in case the base class reads them -- TODO confirm.
            self.type = "poll"
            # NOTE(review): the arguments look like (host, port, db) of a Redis
            # instance -- confirm against JobCache's signature.
            self.jobCache = JobCache('redis', 6379, 0)
            super().__init__(host, queue)

        def execute_callback(self, requestBody):
            """Return the cached entry for the requested job.

            Args:
                requestBody: Container tested for a ``"jobId"`` key and indexed
                    with it; presumably a dict built from the incoming
                    message -- verify against the caller.

            Returns:
                Whatever ``self.jobCache.get`` yields for the given job id, or
                the constant ``42`` when the request has no ``"jobId"`` key.
            """
            # Requests without a job id are answered with the fixed fallback.
            if "jobId" not in requestBody:
                return 42

            cached = self.jobCache.get(requestBody["jobId"])
            print(f"Redis response: {cached}")
            return cached

        def run(self):
            """Print a start-up message and delegate to ``AMQPServer.run``."""
            print(f"Starting AMQP server of type {self.type}...")
            super().run()