Skip to content
Snippets Groups Projects
Commit 2b1422fe authored by Marco De Marco's avatar Marco De Marco
Browse files

Protocol manager class added

parent c05fc9be
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,7 @@ DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p,
//==============================================================================
DBManager::~DBManager()
{
DEBUG_STREAM << "DBManager::DBManager()" << endl;
DEBUG_STREAM << "DBManager::~DBManager()" << endl;
}
//==============================================================================
......@@ -46,22 +46,22 @@ void DBManager::connect() throw(soci::soci_error)
{
DEBUG_STREAM << "DBManager::connect()" << endl;
std::stringstream connection;
connection << " host=" << m_configuration_sp->getDatabaseHost();
connection << " port=" << m_configuration_sp->getDatabasePort();
connection << " user=" << m_configuration_sp->getDatabaseUsername();
connection << " password=" << m_configuration_sp->getDatabasePassword();
unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber();
for(unsigned int i=0; i<connectionNumber; ++i)
{
m_connectionPool_sp->at(i).open(soci::mysql, connection.str());
#ifdef VERBOSE_DEBUG
INFO_STREAM << "CONNECTION: " << connection.str() << " -> OPEN" << endl;
#endif
}
// std::stringstream connection;
// connection << " host=" << m_configuration_sp->getDatabaseHost();
// connection << " port=" << m_configuration_sp->getDatabasePort();
// connection << " user=" << m_configuration_sp->getDatabaseUsername();
// connection << " password=" << m_configuration_sp->getDatabasePassword();
//
// unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber();
//
// for(unsigned int i=0; i<connectionNumber; ++i)
// {
// m_connectionPool_sp->at(i).open(soci::mysql, connection.str());
//
// #ifdef VERBOSE_DEBUG
// INFO_STREAM << "CONNECTION: " << connection.str() << " -> OPEN" << endl;
// #endif
// }
}
//==============================================================================
......@@ -71,22 +71,22 @@ void DBManager::disconnect()
{
DEBUG_STREAM << "DBManager::disconnect()" << endl;
std::stringstream connection;
connection << " host=" << m_configuration_sp->getDatabaseHost();
connection << " port=" << m_configuration_sp->getDatabasePort();
connection << " user=" << m_configuration_sp->getDatabaseUsername();
connection << " password=" << m_configuration_sp->getDatabasePassword();
unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber();
for(unsigned int i=0; i<connectionNumber; ++i)
{
m_connectionPool_sp->at(i).close();
#ifdef VERBOSE_DEBUG
INFO_STREAM << "CONNECTION: " << connection.str() << " -> CLOSE" << endl;
#endif
}
// std::stringstream connection;
// connection << " host=" << m_configuration_sp->getDatabaseHost();
// connection << " port=" << m_configuration_sp->getDatabasePort();
// connection << " user=" << m_configuration_sp->getDatabaseUsername();
// connection << " password=" << m_configuration_sp->getDatabasePassword();
//
// unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber();
//
// for(unsigned int i=0; i<connectionNumber; ++i)
// {
// m_connectionPool_sp->at(i).close();
//
// #ifdef VERBOSE_DEBUG
// INFO_STREAM << "CONNECTION: " << connection.str() << " -> CLOSE" << endl;
// #endif
// }
}
} //namespace
/*
* File: DBManager.h
* Author: mdm
*
* Created on October 24, 2013, 2:57 PM
*/
#ifndef DBMANAGER_H
#define DBMANAGER_H
......@@ -13,6 +6,7 @@
#include <tango.h>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <soci/error.h>
#include <soci/session.h>
......@@ -69,6 +63,9 @@ protected:
//Configuration shared pointer
Configuration::SP m_configuration_sp;
//Connection pool mutex
boost::mutex m_connectionPoolMutex;
//Database connection pool scoped pointer
boost::scoped_ptr<soci::connection_pool> m_connectionPool_sp;
};
......
......@@ -194,6 +194,7 @@ void MetadataExporter::get_device_property()
//Authorised user map [user password]
std::map<const std::string, const std::string> authorisedUsersMap;
/*----- PROTECTED REGION END -----*/ // MetadataExporter::get_device_property_before
......@@ -654,8 +655,7 @@ void MetadataExporter::checkIfFileExists(std::string fileName)
if(!boost::filesystem::exists(path))
{
std::stringstream errorStream;
errorStream << "File " << fileName
<< " not exists" << std::endl;
errorStream << "File " << fileName << " not exists" << std::endl;
throw std::invalid_argument(errorStream.str());
}
......
......@@ -26,8 +26,6 @@ PlainSession::~PlainSession()
{
DEBUG_STREAM << "PlainSession::~PlainSession()" << endl;
INFO_STREAM << m_plainSocket.remote_endpoint() << " CONNECTED" << endl;
boost::system::error_code errorCode;
m_plainSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode);
......@@ -65,7 +63,7 @@ void PlainSession::start()
{
DEBUG_STREAM << "PlainSession::start()" << endl;
INFO_STREAM << m_plainSocket.remote_endpoint() << " DISCONNECTED" << endl;
INFO_STREAM << m_plainSocket.remote_endpoint() << " CONNECTED" << endl;
startReadRequestHeader();
}
......@@ -80,10 +78,8 @@ void PlainSession::startReadRequestHeader()
m_readBuff.resize(HEADER_SIZE);
boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff),
m_strand.wrap(
boost::bind(
&PlainSession::handleReadRequestHeader, shared_from_this(),
boost::asio::placeholders::error)));
m_strand.wrap(boost::bind(&PlainSession::handleReadRequestHeader,
shared_from_this(), boost::asio::placeholders::error)));
}
//==============================================================================
......@@ -102,10 +98,8 @@ void PlainSession::startReadRequestBody(boost::uint32_t bodySize)
boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize);
boost::asio::async_read(m_plainSocket, mutableBuffer,
m_strand.wrap(
boost::bind(
&PlainSession::handleReadRequestBody, shared_from_this(),
boost::asio::placeholders::error)));
m_strand.wrap(boost::bind(&PlainSession::handleReadRequestBody,
shared_from_this(), boost::asio::placeholders::error)));
}
//==============================================================================
......@@ -115,11 +109,13 @@ void PlainSession::startWriteResponse()
{
DEBUG_STREAM << "PlainSession::startWriteResponse()" << endl;
try
{
RequestSP request_sp(new Request);
request_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE);
ResponseSP response_sp = prepareResponse(request_sp);
ResponseSP response_sp = m_protocolManager_sp->prepareResponse(request_sp);
boost::uint32_t bodySize = response_sp->ByteSize();
......@@ -134,10 +130,17 @@ void PlainSession::startWriteResponse()
response_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize);
boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff),
m_strand.wrap(
boost::bind(
&PlainSession::handleWriteResponse, shared_from_this(),
boost::asio::placeholders::error)));
m_strand.wrap(boost::bind(&PlainSession::handleWriteResponse,
shared_from_this(), boost::asio::placeholders::error)));
}
catch(std::runtime_error& ec)
{
ERROR_STREAM << "SSLSession::startWriteResponse() " << ec.what() << endl;
}
catch(...)
{
ERROR_STREAM << "SSLSession::startWriteResponse() unknown error" << endl;
}
}
} //namespace
\ No newline at end of file
#include <ProtocolManager.h>
namespace MetadataExporter_ns
{
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp),
m_dBManager_sp(dBManager_sp)
{
DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;
m_isAuthenticated = false;
m_isValidated = false;
}
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::~ProtocolManager()
{
DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl;
}
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
ProtocolManager::SP d_sp(new ProtocolManager(deviceImpl_p, configuration_sp,
dBManager_sp), ProtocolManager::Deleter());
return d_sp;
}
//==============================================================================
// ProtocolManager::prepareResponse()
//==============================================================================
ResponseSP ProtocolManager::prepareResponse(RequestSP request_sp)
throw(std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::prepareResponse()" << endl;
ResponseSP response_sp;
switch(request_sp->type())
{
case Request::AUTHORIZATION:
{
response_sp = prepareAuthroisation(request_sp);
break;
}
case Request::VALIDATION:
{
response_sp = prepareValidation(request_sp);
break;
}
case Request::METADATA:
{
response_sp = prepareMetadata(request_sp);
break;
}
default:
throw std::runtime_error("Unknown request type!");
}
if(!response_sp->IsInitialized())
throw std::runtime_error("Not initialized response!");
return response_sp;
}
//==============================================================================
// ProtocolManager::prepareAuthroisation()
//==============================================================================
ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp)
throw(std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::prepareAuthroisation()" << endl;
ResponseSP response_sp(new Response());
response_sp->set_type(Response::AUTHORIZATION);
Response::Authorization* auth_resp = response_sp->mutable_authorization();
if(!m_isAuthenticated)
{
const Request::Authorization& auth_req = request_sp->authorization();
std::string username = auth_req.username();
std::string password = auth_req.password();
if(m_configuration_sp->isUserAuthorized(username, password))
{
INFO_STREAM << "ProtocolManager::prepareAuthroisation() "
<< "Authorization accepted" << endl;
m_isAuthenticated = true;
auth_resp->set_state(Response::Authorization::ACCEPTED);
auth_resp->set_status("Authorization accepted");
}
else
{
WARN_STREAM << "ProtocolManager::prepareAuthroisation() "
<< "Invalid username or password" << endl;
m_isAuthenticated = false;
auth_resp->set_state(Response::Authorization::REJECTED);
auth_resp->set_status("Invalid username or password");
}
}
else
{
WARN_STREAM << "ProtocolManager::prepareAuthroisation() "
<< "Already authorized" << endl;
auth_resp->set_state(Response::Authorization::REJECTED);
auth_resp->set_status("Already authorized");
}
return response_sp;
}
//==============================================================================
// ProtocolManager::prepareValidation()
//==============================================================================
ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp)
throw(std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::prepareValidation()" << endl;
ResponseSP response_sp(new Response());
response_sp->set_type(Response::VALIDATION);
Response::Validation* validation = response_sp->mutable_validation();
if(!m_isValidated)
{
INFO_STREAM << "ProtocolManager::prepareValidation() "
<< "Validation accepted" << endl;
m_isValidated = true;
validation->set_state(Response::Validation::ACCEPTED);
validation->set_status("Validation accepted");
}
else
{
WARN_STREAM << "ProtocolManager::prepareValidation() "
<< "Already validated" << endl;
validation->set_state(Response::Validation::REJECTED);
validation->set_status("Already validated");
}
return response_sp;
}
//==============================================================================
// ProtocolManager::prepareMetadata()
//==============================================================================
ResponseSP ProtocolManager::prepareMetadata(RequestSP request_sp)
throw(std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::prepareMetadata()" << endl;
ResponseSP response_sp(new Response());
response_sp->set_type(Response::METADATA);
Response::Metadata* metadata = response_sp->mutable_metadata();
metadata->set_state(Response::Metadata::ACCEPTED);
metadata->set_status("Metadata ready");
metadata->set_partial(1);
metadata->set_total(1);
return response_sp;
}
} //namespace
#ifndef PROTOCOLMANAGER_H
#define PROTOCOLMANAGER_H
#include <Response.pb.h>
#include <Request.pb.h>
#include <Configuration.h>
#include <DBManager.h>
#include <tango.h>
namespace MetadataExporter_ns
{
//Protocol buffer request class shared pointer
typedef boost::shared_ptr<Request> RequestSP;
//Protocol buffer response class shared pointer
typedef boost::shared_ptr<Response> ResponseSP;
class ProtocolManager : public Tango::LogAdapter
{
public:
//------------------------------------------------------------------------------
// [Public] Shared pointer typedef
//------------------------------------------------------------------------------
typedef boost::shared_ptr<ProtocolManager> SP;
protected:
//------------------------------------------------------------------------------
// [Protected] Constructor destructor deleter
//------------------------------------------------------------------------------
ProtocolManager(Tango::DeviceImpl*, Configuration::SP, DBManager::SP);
virtual ~ProtocolManager();
class Deleter;
friend Deleter;
class Deleter
{
public:
void operator()(ProtocolManager* d) { delete d; }
};
public:
//------------------------------------------------------------------------------
// [Public] Class creation method
//------------------------------------------------------------------------------
static ProtocolManager::SP create(Tango::DeviceImpl*, Configuration::SP,
DBManager::SP);
//------------------------------------------------------------------------------
// [Public] Request response management method
//------------------------------------------------------------------------------
virtual ResponseSP prepareResponse(RequestSP)
throw(std::runtime_error);
protected:
//------------------------------------------------------------------------------
// [Protected] Request specific methods
//------------------------------------------------------------------------------
virtual ResponseSP prepareAuthroisation(RequestSP)
throw(std::runtime_error);
virtual ResponseSP prepareValidation(RequestSP)
throw(std::runtime_error);
virtual ResponseSP prepareMetadata(RequestSP)
throw(std::runtime_error);
//------------------------------------------------------------------------------
// [Protected] Class variables
//------------------------------------------------------------------------------
//Configuration parameters shared pointer
Configuration::SP m_configuration_sp;
//Database manger shared pointer
DBManager::SP m_dBManager_sp;
//Client is authenticated
bool m_isAuthenticated;
//Table structure is validated
bool m_isValidated;
};
} //End of namespace
#endif /* PROTOCOLMANAGER_H */
......@@ -24,9 +24,6 @@ SSLSession::~SSLSession()
{
DEBUG_STREAM << "SSLSession::~SSLSession()" << endl;
INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint()
<< " DISCONNECTED" << endl;
boost::system::error_code errorCode;
m_sslSocket.shutdown(errorCode);
......@@ -115,10 +112,8 @@ void SSLSession::startReadRequestHeader()
m_readBuff.resize(HEADER_SIZE);
boost::asio::async_read(m_sslSocket, boost::asio::buffer(m_readBuff),
m_strand.wrap(
boost::bind(
&SSLSession::handleReadRequestHeader, shared_from_this(),
boost::asio::placeholders::error)));
m_strand.wrap(boost::bind(&SSLSession::handleReadRequestHeader,
shared_from_this(), boost::asio::placeholders::error)));
}
//==============================================================================
......@@ -128,19 +123,17 @@ void SSLSession::startReadRequestBody(boost::uint32_t bodySize)
{
DEBUG_STREAM << "SSLSession::startReadRequestBody()" << endl;
INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint()
<< " >>>> " << bodySize << " BYTE" << endl;
m_readBuff.resize(HEADER_SIZE + bodySize);
boost::asio::mutable_buffers_1 mutableBuffer =
boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize);
INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint()
<< " >>>> " << bodySize << " BYTE" << endl;
boost::asio::async_read(m_sslSocket, mutableBuffer,
m_strand.wrap(
boost::bind(
&SSLSession::handleReadRequestBody, shared_from_this(),
boost::asio::placeholders::error)));
m_strand.wrap(boost::bind(&SSLSession::handleReadRequestBody,
shared_from_this(), boost::asio::placeholders::error)));
}
//==============================================================================
......@@ -150,17 +143,17 @@ void SSLSession::startWriteResponse()
{
DEBUG_STREAM << "SSLSession::startWriteResponse()" << endl;
try
{
RequestSP request_sp(new Request);
request_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE);
request_sp->ParseFromArray(&m_readBuff[HEADER_SIZE],
m_readBuff.size() - HEADER_SIZE);
ResponseSP response_sp = prepareResponse(request_sp);
ResponseSP response_sp = m_protocolManager_sp->prepareResponse(request_sp);
boost::uint32_t bodySize = response_sp->ByteSize();
INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint()
<< " <<<< " << bodySize << " BYTE" << endl;
std::vector<boost::uint8_t> writeBuff;
writeBuff.resize(HEADER_SIZE + bodySize);
......@@ -168,11 +161,21 @@ void SSLSession::startWriteResponse()
response_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize);
INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint()
<< " <<<< " << bodySize << " BYTE" << endl;
boost::asio::async_write(m_sslSocket, boost::asio::buffer(writeBuff),
m_strand.wrap(
boost::bind(
&SSLSession::handleWriteResponse, shared_from_this(),
boost::asio::placeholders::error)));
m_strand.wrap(boost::bind(&SSLSession::handleWriteResponse,
shared_from_this(), boost::asio::placeholders::error)));
}
catch(std::runtime_error& ec)
{
ERROR_STREAM << "SSLSession::startWriteResponse() " << ec.what() << endl;
}
catch(...)
{
ERROR_STREAM << "SSLSession::startWriteResponse() unknown error" << endl;
}
}
} //namespace
......@@ -22,7 +22,7 @@ Server::Server(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_
m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp);
m_ioService_sp.reset(new boost::asio::io_service);
m_work_sp.reset(new boost::asio::io_service::work(*m_ioService_sp));
m_acceptor_sp.reset(new boost::asio::ip::tcp::acceptor(*m_ioService_sp));
m_state = Tango::OFF;
......@@ -36,13 +36,16 @@ Server::~Server()
{
DEBUG_STREAM << "Server::~Server()" << endl;
m_acceptor_sp->close();
boost::system::error_code errorCode;
m_acceptor_sp->close(errorCode);
m_work_sp.reset();
m_ioService_sp->stop();
if(m_threadGroup_sp)
{
m_threadGroup_sp->interrupt_all();
//m_threadGroup_sp->interrupt_all();
m_threadGroup_sp->join_all();
}
......@@ -53,7 +56,7 @@ Server::~Server()
//==============================================================================
// Server::start()
//==============================================================================
void Server::start() //@todo: handle exceptions
void Server::start() throw(std::runtime_error)
{
DEBUG_STREAM << "Server::start()" << endl;
......@@ -61,6 +64,24 @@ void Server::start() //@todo: handle exceptions
m_ioService_sp->reset();
m_work_sp.reset(new boost::asio::io_service::work(*m_ioService_sp));
std::string localHost = m_configuration_sp->getLocalHost();
unsigned int localPort = m_configuration_sp->getLocalPort();
INFO_STREAM << "LISTENING ON " << localHost << ":" << localPort << endl;
boost::asio::ip::tcp::resolver::query query(localHost,
boost::lexical_cast<std::string>(localPort));
boost::asio::ip::tcp::resolver resolver(*m_ioService_sp);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
m_acceptor_sp->open(endpoint.protocol());
m_acceptor_sp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
m_acceptor_sp->bind(endpoint);
m_acceptor_sp->listen();
m_threadGroup_sp.reset(new boost::thread_group);
unsigned int workerNumber = m_configuration_sp->getWorkerNumber();
......@@ -70,29 +91,32 @@ void Server::start() //@todo: handle exceptions
for(unsigned int i=0; i<workerNumber; ++i)
m_threadGroup_sp->add_thread(new boost::thread(&WorkerThread::run, worker));
startListen();
startAccept();
}
//==============================================================================
// Server::stop()
//==============================================================================
void Server::stop() //@todo: handle exceptions
void Server::stop() throw(std::runtime_error)
{
DEBUG_STREAM << "Server::stop()" << endl;
m_acceptor_sp->close();
boost::system::error_code errorCode;
m_acceptor_sp->close(errorCode);
m_work_sp.reset();
m_ioService_sp->stop();
if(m_threadGroup_sp)
{
m_threadGroup_sp->interrupt_all();
//m_threadGroup_sp->interrupt_all();
m_threadGroup_sp->join_all();
}
m_threadGroup_sp.reset();
m_dBManager_sp->disconnect();
}
//==============================================================================
......@@ -120,27 +144,27 @@ std::string Server::readStatus()
}
//==============================================================================
// Server::readStatus()
// Server::writeState()
//==============================================================================
void Server::startListen() //@todo: handle exceptions
void Server::writeState(Tango::DevState state)
{
DEBUG_STREAM << "Server::startListen()" << endl;
DEBUG_STREAM << "Server::writeState()" << endl;
std::string localHost = m_configuration_sp->getLocalHost();
unsigned int localPort = m_configuration_sp->getLocalPort();
boost::mutex::scoped_lock stateLock(m_stateMutex);
INFO_STREAM << "LISTENING ON " << localHost << ":" << localPort << endl;
m_state = state;
}
boost::asio::ip::tcp::resolver::query query(localHost,
boost::lexical_cast<std::string>(localPort));
//==============================================================================
// Server::writeStatus()
//==============================================================================
void Server::writeStatus(std::string status)
{
DEBUG_STREAM << "Server::writeStatus()" << endl;
boost::asio::ip::tcp::resolver resolver(*m_ioService_sp);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
boost::mutex::scoped_lock statusLock(m_stateMutex);
m_acceptor_sp->open(endpoint.protocol());
m_acceptor_sp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
m_acceptor_sp->bind(endpoint);
m_acceptor_sp->listen();
m_status = status;
}
//==============================================================================
......@@ -152,9 +176,13 @@ void Server::handleAccept(Session::SP session_sp,
DEBUG_STREAM << "Server::handleAccept()" << endl;
if(!ec)
{
session_sp->start();
}
else
{
WARN_STREAM << "Server::handleAccept() " << ec.message() << endl;
}
startAccept();
}
......
......@@ -34,12 +34,12 @@ public:
//------------------------------------------------------------------------------
// [Public] Thread management methods
//------------------------------------------------------------------------------
virtual void start();
virtual void start() throw(std::runtime_error);
virtual void stop();
virtual void stop() throw(std::runtime_error);
//------------------------------------------------------------------------------
// [Public] State status methods
// [Public] Read state and status methods
//------------------------------------------------------------------------------
virtual Tango::DevState readState();
......@@ -47,10 +47,15 @@ public:
protected:
//------------------------------------------------------------------------------
// [Protected] Incoming connection methods
// [Protected] Write state and status methods
//------------------------------------------------------------------------------
virtual void startListen();
virtual void writeState(Tango::DevState);
virtual void writeStatus(std::string);
//------------------------------------------------------------------------------
// [Protected] Incoming connection methods
//------------------------------------------------------------------------------
virtual void startAccept() = 0;
virtual void handleAccept(Session::SP, const boost::system::error_code&);
......
......@@ -9,9 +9,12 @@ namespace MetadataExporter_ns
Session::Session(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp,
DBManager::SP dBManager_sp, boost::shared_ptr<boost::asio::io_service> ioService_sp) :
Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp),
m_dBManager_sp(dBManager_sp), m_strand(*ioService_sp)
m_strand(*ioService_sp)
{
DEBUG_STREAM << "Session::Session()" << endl;
m_protocolManager_sp = ProtocolManager::create(deviceImpl_p,
configuration_sp, dBManager_sp);
}
//==============================================================================
......@@ -87,57 +90,6 @@ void Session::handleWriteResponse(const boost::system::error_code& errorCode)
}
}
//==============================================================================
// Session::prepareResponse()
//==============================================================================
Session::ResponseSP Session::prepareResponse(Session::RequestSP request)
throw(std::runtime_error)
{
DEBUG_STREAM << "Session::prepareResponse()" << endl;
ResponseSP response_sp(new Response());
switch(request->type())
{
case Request::AUTHORIZATION:
{
INFO_STREAM << "Session::prepareResponse() AUTHORIZATION REQUEST" << endl;
const Request::Authorization& auth_req = request->authorization();
INFO_STREAM << "USERNAME " << auth_req.username() << endl;
INFO_STREAM << "PASSWORD " << auth_req.password() << endl;
response_sp->set_type(Response::AUTHORIZATION);
Response::Authorization* auth_resp = response_sp->mutable_authorization();
auth_resp->set_state(Response::Authorization::ACCEPTED);
auth_resp->set_status("Authorization accepted");
break;
}
case Request::VALIDATION:
{
INFO_STREAM << "Session::prepareResponse() VALIDATION REQUEST" << endl;
response_sp->set_type(Response::VALIDATION);
break;
}
case Request::METADATA:
{
INFO_STREAM << "Session::prepareResponse() METADATA REQUEST" << endl;
response_sp->set_type(Response::METADATA);
break;
}
default:
throw std::runtime_error("Unknown request type!");
}
return response_sp;
}
//==============================================================================
// Session::encodeHeader()
//==============================================================================
......
#ifndef SESSION_H
#define SESSION_H
#include <Response.pb.h>
#include <Request.pb.h>
#include <Configuration.h>
#include <DBManager.h>
#include <ProtocolManager.h>
#include <Response.pb.h>
#include <Request.pb.h>
#include <tango.h>
......@@ -23,12 +24,6 @@ public:
//------------------------------------------------------------------------------
typedef boost::shared_ptr<Session> SP;
//------------------------------------------------------------------------------
// [Public] Request Response shared pointer typedef
//------------------------------------------------------------------------------
typedef boost::shared_ptr<Request> RequestSP;
typedef boost::shared_ptr<Response> ResponseSP;
protected:
//------------------------------------------------------------------------------
// [Protected] Constructor destructor deleter
......@@ -68,12 +63,6 @@ protected:
virtual void handleWriteResponse(const boost::system::error_code&);
//------------------------------------------------------------------------------
// [Protected] Response method
//------------------------------------------------------------------------------
virtual ResponseSP prepareResponse(RequestSP)
throw(std::runtime_error);
//------------------------------------------------------------------------------
// [Protected] Encode decode header methods
//------------------------------------------------------------------------------
......@@ -89,8 +78,8 @@ protected:
//Configuration parameters shared pointer
Configuration::SP m_configuration_sp;
//Database manger shared pointer
DBManager::SP m_dBManager_sp;
//Protocol manager class shared pointer
ProtocolManager::SP m_protocolManager_sp;
//Synchronization mechanism
boost::asio::io_service::strand m_strand;
......
......@@ -27,7 +27,7 @@ WorkerThread::~WorkerThread()
//==============================================================================
void WorkerThread::run()
{
DEBUG_STREAM << "WorkerThread::run()" << endl;
INFO_STREAM << "WorkerThread::run() STARTING" << endl;
while(true)
{
......@@ -53,7 +53,7 @@ void WorkerThread::run()
}
} //while
DEBUG_STREAM << "WorkerThread::run() exiting" << endl;
INFO_STREAM << "WorkerThread::run() STOPPING" << endl;
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment