diff --git a/src/DBManager.cpp b/src/DBManager.cpp index e0c2efa8ede9b428bdd467fae033f40e16bd08c8..e8f9dc3acf6d64ee77e7f4be8d0e565ba0add7ab 100644 --- a/src/DBManager.cpp +++ b/src/DBManager.cpp @@ -24,7 +24,7 @@ DBManager::DBManager(Tango::DeviceImpl* deviceImpl_p, //============================================================================== DBManager::~DBManager() { - DEBUG_STREAM << "DBManager::DBManager()" << endl; + DEBUG_STREAM << "DBManager::~DBManager()" << endl; } //============================================================================== @@ -46,22 +46,22 @@ void DBManager::connect() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::connect()" << endl; - std::stringstream connection; - connection << " host=" << m_configuration_sp->getDatabaseHost(); - connection << " port=" << m_configuration_sp->getDatabasePort(); - connection << " user=" << m_configuration_sp->getDatabaseUsername(); - connection << " password=" << m_configuration_sp->getDatabasePassword(); - - unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); - - for(unsigned int i=0; i<connectionNumber; ++i) - { - m_connectionPool_sp->at(i).open(soci::mysql, connection.str()); - - #ifdef VERBOSE_DEBUG - INFO_STREAM << "CONNECTION: " << connection.str() << " -> OPEN" << endl; - #endif - } +// std::stringstream connection; +// connection << " host=" << m_configuration_sp->getDatabaseHost(); +// connection << " port=" << m_configuration_sp->getDatabasePort(); +// connection << " user=" << m_configuration_sp->getDatabaseUsername(); +// connection << " password=" << m_configuration_sp->getDatabasePassword(); +// +// unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); +// +// for(unsigned int i=0; i<connectionNumber; ++i) +// { +// m_connectionPool_sp->at(i).open(soci::mysql, connection.str()); +// +// #ifdef VERBOSE_DEBUG +// INFO_STREAM << "CONNECTION: " << connection.str() << " -> OPEN" << endl; +// #endif +// } } //============================================================================== @@ -71,22 +71,22 @@ void DBManager::disconnect() { DEBUG_STREAM << "DBManager::disconnect()" << endl; - std::stringstream connection; - connection << " host=" << m_configuration_sp->getDatabaseHost(); - connection << " port=" << m_configuration_sp->getDatabasePort(); - connection << " user=" << m_configuration_sp->getDatabaseUsername(); - connection << " password=" << m_configuration_sp->getDatabasePassword(); - - unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); - - for(unsigned int i=0; i<connectionNumber; ++i) - { - m_connectionPool_sp->at(i).close(); - - #ifdef VERBOSE_DEBUG - INFO_STREAM << "CONNECTION: " << connection.str() << " -> CLOSE" << endl; - #endif - } +// std::stringstream connection; +// connection << " host=" << m_configuration_sp->getDatabaseHost(); +// connection << " port=" << m_configuration_sp->getDatabasePort(); +// connection << " user=" << m_configuration_sp->getDatabaseUsername(); +// connection << " password=" << m_configuration_sp->getDatabasePassword(); +// +// unsigned int connectionNumber = m_configuration_sp->getDatabaseConnectionNumber(); +// +// for(unsigned int i=0; i<connectionNumber; ++i) +// { +// m_connectionPool_sp->at(i).close(); +// +// #ifdef VERBOSE_DEBUG +// INFO_STREAM << "CONNECTION: " << connection.str() << " -> CLOSE" << endl; +// #endif +// } } } //namespace diff --git a/src/DBManager.h b/src/DBManager.h index 1c5e6fa65d147de76f224cc0a78378af515b5ba1..c0c3ce4d4a929ea133392f5857c4a3f8a7f3497e 100644 --- a/src/DBManager.h +++ b/src/DBManager.h @@ -1,10 +1,3 @@ -/* - * File: DBManager.h - * Author: mdm - * - * Created on October 24, 2013, 2:57 PM - */ - #ifndef DBMANAGER_H #define DBMANAGER_H @@ -13,6 +6,7 @@ #include <tango.h> #include <boost/scoped_ptr.hpp> +#include <boost/thread/mutex.hpp> #include <soci/error.h> #include <soci/session.h> @@ -69,6 +63,9 @@ protected: //Configuration shared pointer Configuration::SP m_configuration_sp; + //Connection pool mutex + boost::mutex m_connectionPoolMutex; + //Database connection pool scoped pointer boost::scoped_ptr<soci::connection_pool> m_connectionPool_sp; }; diff --git a/src/MetadataExporter.cpp b/src/MetadataExporter.cpp index 220b854befa2ac382252647b4b6bc4a383965543..b64371500739fb1938f1d5f601c7a5831ac750ca 100644 --- a/src/MetadataExporter.cpp +++ b/src/MetadataExporter.cpp @@ -194,6 +194,7 @@ void MetadataExporter::get_device_property() //Authorised user map [user password] std::map<const std::string, const std::string> authorisedUsersMap; + /*----- PROTECTED REGION END -----*/ // MetadataExporter::get_device_property_before @@ -654,8 +655,7 @@ void MetadataExporter::checkIfFileExists(std::string fileName) if(!boost::filesystem::exists(path)) { std::stringstream errorStream; - errorStream << "File " << fileName - << " not exists" << std::endl; + errorStream << "File " << fileName << " not exists" << std::endl; throw std::invalid_argument(errorStream.str()); } diff --git a/src/PlainSession.cpp b/src/PlainSession.cpp index d1506ea8a6970807c8a52128c035bfb1fb16a12c..71157c68be27cc8000522801d8b1e79267844a5e 100644 --- a/src/PlainSession.cpp +++ b/src/PlainSession.cpp @@ -26,8 +26,6 @@ PlainSession::~PlainSession() { DEBUG_STREAM << "PlainSession::~PlainSession()" << endl; - INFO_STREAM << m_plainSocket.remote_endpoint() << " CONNECTED" << endl; - boost::system::error_code errorCode; m_plainSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode); @@ -65,7 +63,7 @@ void PlainSession::start() { DEBUG_STREAM << "PlainSession::start()" << endl; - INFO_STREAM << m_plainSocket.remote_endpoint() << " DISCONNECTED" << endl; + INFO_STREAM << m_plainSocket.remote_endpoint() << " CONNECTED" << endl; startReadRequestHeader(); } @@ -80,10 +78,8 @@ void PlainSession::startReadRequestHeader() m_readBuff.resize(HEADER_SIZE); boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff), - m_strand.wrap( - boost::bind( - &PlainSession::handleReadRequestHeader, shared_from_this(), - boost::asio::placeholders::error))); + m_strand.wrap(boost::bind(&PlainSession::handleReadRequestHeader, + shared_from_this(), boost::asio::placeholders::error))); } //============================================================================== @@ -102,10 +98,8 @@ void PlainSession::startReadRequestBody(boost::uint32_t bodySize) boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize); boost::asio::async_read(m_plainSocket, mutableBuffer, - m_strand.wrap( - boost::bind( - &PlainSession::handleReadRequestBody, shared_from_this(), - boost::asio::placeholders::error))); + m_strand.wrap(boost::bind(&PlainSession::handleReadRequestBody, + shared_from_this(), boost::asio::placeholders::error))); } //============================================================================== @@ -115,29 +109,38 @@ void PlainSession::startWriteResponse() { DEBUG_STREAM << "PlainSession::startWriteResponse()" << endl; - RequestSP request_sp(new Request); + try + { + RequestSP request_sp(new Request); - request_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); + request_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); - ResponseSP response_sp = prepareResponse(request_sp); + ResponseSP response_sp = m_protocolManager_sp->prepareResponse(request_sp); - boost::uint32_t bodySize = response_sp->ByteSize(); + boost::uint32_t bodySize = response_sp->ByteSize(); - INFO_STREAM << m_plainSocket.remote_endpoint() - << " <<<< " << bodySize << " BYTE" << endl; + INFO_STREAM << m_plainSocket.remote_endpoint() + << " <<<< " << bodySize << " BYTE" << endl; - std::vector<boost::uint8_t> writeBuff; - writeBuff.resize(HEADER_SIZE + bodySize); + std::vector<boost::uint8_t> writeBuff; + writeBuff.resize(HEADER_SIZE + bodySize); - encodeHeader(writeBuff, bodySize); + encodeHeader(writeBuff, bodySize); - response_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); + response_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); - boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff), - m_strand.wrap( - boost::bind( - &PlainSession::handleWriteResponse, shared_from_this(), - boost::asio::placeholders::error))); + boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff), + m_strand.wrap(boost::bind(&PlainSession::handleWriteResponse, + shared_from_this(), boost::asio::placeholders::error))); + } + catch(std::runtime_error& ec) + { + ERROR_STREAM << "SSLSession::startWriteResponse() " << ec.what() << endl; + } + catch(...) + { + ERROR_STREAM << "SSLSession::startWriteResponse() unknown error" << endl; + } } } //namespace \ No newline at end of file diff --git a/src/ProtocolManager.cpp b/src/ProtocolManager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..620286a406dce4eb31e355be482d9da97a31ef84 --- /dev/null +++ b/src/ProtocolManager.cpp @@ -0,0 +1,188 @@ +#include <ProtocolManager.h> + +namespace MetadataExporter_ns +{ + +//============================================================================== +// ProtocolManager::ProtocolManager() +//============================================================================== +ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p, + Configuration::SP configuration_sp, DBManager::SP dBManager_sp) : + Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp), + m_dBManager_sp(dBManager_sp) +{ + DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl; + + m_isAuthenticated = false; + m_isValidated = false; +} + +//============================================================================== +// ProtocolManager::ProtocolManager() +//============================================================================== +ProtocolManager::~ProtocolManager() +{ + DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl; +} + +//============================================================================== +// ProtocolManager::ProtocolManager() +//============================================================================== +ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p, + Configuration::SP configuration_sp, DBManager::SP dBManager_sp) +{ + ProtocolManager::SP d_sp(new ProtocolManager(deviceImpl_p, configuration_sp, + dBManager_sp), ProtocolManager::Deleter()); + + return d_sp; +} + +//============================================================================== +// ProtocolManager::prepareResponse() +//============================================================================== +ResponseSP ProtocolManager::prepareResponse(RequestSP request_sp) + throw(std::runtime_error) +{ + DEBUG_STREAM << "ProtocolManager::prepareResponse()" << endl; + + ResponseSP response_sp; + + switch(request_sp->type()) + { + case Request::AUTHORIZATION: + { + response_sp = prepareAuthroisation(request_sp); + break; + } + case Request::VALIDATION: + { + response_sp = prepareValidation(request_sp); + break; + } + case Request::METADATA: + { + response_sp = prepareMetadata(request_sp); + break; + } + default: + throw std::runtime_error("Unknown request type!"); + } + + if(!response_sp->IsInitialized()) + throw std::runtime_error("Not initialized response!"); + + return response_sp; +} + +//============================================================================== +// ProtocolManager::prepareAuthroisation() +//============================================================================== +ResponseSP ProtocolManager::prepareAuthroisation(RequestSP request_sp) + throw(std::runtime_error) +{ + DEBUG_STREAM << "ProtocolManager::prepareAuthroisation()" << endl; + + ResponseSP response_sp(new Response()); + + response_sp->set_type(Response::AUTHORIZATION); + + Response::Authorization* auth_resp = response_sp->mutable_authorization(); + + if(!m_isAuthenticated) + { + const Request::Authorization& auth_req = request_sp->authorization(); + std::string username = auth_req.username(); + std::string password = auth_req.password(); + + if(m_configuration_sp->isUserAuthorized(username, password)) + { + INFO_STREAM << "ProtocolManager::prepareAuthroisation() " + << "Authorization accepted" << endl; + + m_isAuthenticated = true; + + auth_resp->set_state(Response::Authorization::ACCEPTED); + auth_resp->set_status("Authorization accepted"); + } + else + { + WARN_STREAM << "ProtocolManager::prepareAuthroisation() " + << "Invalid username or password" << endl; + + m_isAuthenticated = false; + + auth_resp->set_state(Response::Authorization::REJECTED); + auth_resp->set_status("Invalid username or password"); + } + } + else + { + WARN_STREAM << "ProtocolManager::prepareAuthroisation() " + << "Already authorized" << endl; + + auth_resp->set_state(Response::Authorization::REJECTED); + auth_resp->set_status("Already authorized"); + } + + return response_sp; +} + +//============================================================================== +// ProtocolManager::prepareValidation() +//============================================================================== +ResponseSP ProtocolManager::prepareValidation(RequestSP request_sp) + throw(std::runtime_error) +{ + DEBUG_STREAM << "ProtocolManager::prepareValidation()" << endl; + + ResponseSP response_sp(new Response()); + + response_sp->set_type(Response::VALIDATION); + + Response::Validation* validation = response_sp->mutable_validation(); + + if(!m_isValidated) + { + INFO_STREAM << "ProtocolManager::prepareValidation() " + << "Validation accepted" << endl; + + m_isValidated = true; + + validation->set_state(Response::Validation::ACCEPTED); + validation->set_status("Validation accepted"); + } + else + { + WARN_STREAM << "ProtocolManager::prepareValidation() " + << "Already validated" << endl; + + validation->set_state(Response::Validation::REJECTED); + validation->set_status("Already validated"); + } + + return response_sp; +} + +//============================================================================== +// ProtocolManager::prepareMetadata() +//============================================================================== +ResponseSP ProtocolManager::prepareMetadata(RequestSP request_sp) + throw(std::runtime_error) +{ + DEBUG_STREAM << "ProtocolManager::prepareMetadata()" << endl; + + ResponseSP response_sp(new Response()); + + response_sp->set_type(Response::METADATA); + + Response::Metadata* metadata = response_sp->mutable_metadata(); + + metadata->set_state(Response::Metadata::ACCEPTED); + metadata->set_status("Metadata ready"); + metadata->set_partial(1); + metadata->set_total(1); + + return response_sp; +} + +} //namespace diff --git a/src/ProtocolManager.h b/src/ProtocolManager.h new file mode 100644 index 0000000000000000000000000000000000000000..abe99fb3e8022fa1e11d078ece421338ebc5cda3 --- /dev/null +++ b/src/ProtocolManager.h @@ -0,0 +1,89 @@ +#ifndef PROTOCOLMANAGER_H +#define PROTOCOLMANAGER_H + +#include <Response.pb.h> +#include <Request.pb.h> +#include <Configuration.h> +#include <DBManager.h> + +#include <tango.h> + +namespace MetadataExporter_ns +{ + +//Protocol buffer request class shared pointer +typedef boost::shared_ptr<Request> RequestSP; + +//Protocol buffer response class shared pointer +typedef boost::shared_ptr<Response> ResponseSP; + +class ProtocolManager : public Tango::LogAdapter +{ +public: +//------------------------------------------------------------------------------ +// [Public] Shared pointer typedef +//------------------------------------------------------------------------------ + typedef boost::shared_ptr<ProtocolManager> SP; + +protected: +//------------------------------------------------------------------------------ +// [Protected] Constructor destructor deleter +//------------------------------------------------------------------------------ + ProtocolManager(Tango::DeviceImpl*, Configuration::SP, DBManager::SP); + + virtual ~ProtocolManager(); + + class Deleter; + friend Deleter; + class Deleter + { + public: + void operator()(ProtocolManager* d) { delete d; } + }; + +public: +//------------------------------------------------------------------------------ +// [Public] Class creation method +//------------------------------------------------------------------------------ + static ProtocolManager::SP create(Tango::DeviceImpl*, Configuration::SP, + DBManager::SP); + +//------------------------------------------------------------------------------ +// [Public] Request response management method +//------------------------------------------------------------------------------ + virtual ResponseSP prepareResponse(RequestSP) + throw(std::runtime_error); + +protected: +//------------------------------------------------------------------------------ +// [Protected] Request specific methods +//------------------------------------------------------------------------------ + virtual ResponseSP prepareAuthroisation(RequestSP) + throw(std::runtime_error); + + virtual ResponseSP prepareValidation(RequestSP) + throw(std::runtime_error); + + virtual ResponseSP prepareMetadata(RequestSP) + throw(std::runtime_error); + +//------------------------------------------------------------------------------ +// [Protected] Class variables +//------------------------------------------------------------------------------ + //Configuration parameters shared pointer + Configuration::SP m_configuration_sp; + + //Database manger shared pointer + DBManager::SP m_dBManager_sp; + + //Client is authenticated + bool m_isAuthenticated; + + //Table structure is validated + bool m_isValidated; +}; + +} //End of namespace + +#endif /* PROTOCOLMANAGER_H */ + diff --git a/src/SSLSession.cpp b/src/SSLSession.cpp index 9ce78e3c496325a783ac7e071fa936b0d5576fe5..7feffa5e4f105c423d75e293f2c8a049aeaa9b52 100644 --- a/src/SSLSession.cpp +++ b/src/SSLSession.cpp @@ -24,9 +24,6 @@ SSLSession::~SSLSession() { DEBUG_STREAM << "SSLSession::~SSLSession()" << endl; - INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint() - << " DISCONNECTED" << endl; - boost::system::error_code errorCode; m_sslSocket.shutdown(errorCode); @@ -115,10 +112,8 @@ void SSLSession::startReadRequestHeader() m_readBuff.resize(HEADER_SIZE); boost::asio::async_read(m_sslSocket, boost::asio::buffer(m_readBuff), - m_strand.wrap( - boost::bind( - &SSLSession::handleReadRequestHeader, shared_from_this(), - boost::asio::placeholders::error))); + m_strand.wrap(boost::bind(&SSLSession::handleReadRequestHeader, + shared_from_this(), boost::asio::placeholders::error))); } //============================================================================== @@ -128,19 +123,17 @@ void SSLSession::startReadRequestBody(boost::uint32_t bodySize) { DEBUG_STREAM << "SSLSession::startReadRequestBody()" << endl; - INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint() - << " >>>> " << bodySize << " BYTE" << endl; - m_readBuff.resize(HEADER_SIZE + bodySize); boost::asio::mutable_buffers_1 mutableBuffer = boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize); + INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint() + << " >>>> " << bodySize << " BYTE" << endl; + boost::asio::async_read(m_sslSocket, mutableBuffer, - m_strand.wrap( - boost::bind( - &SSLSession::handleReadRequestBody, shared_from_this(), - boost::asio::placeholders::error))); + m_strand.wrap(boost::bind(&SSLSession::handleReadRequestBody, + shared_from_this(), boost::asio::placeholders::error))); } //============================================================================== @@ -150,29 +143,39 @@ void SSLSession::startWriteResponse() { DEBUG_STREAM << "SSLSession::startWriteResponse()" << endl; - RequestSP request_sp(new Request); + try + { + RequestSP request_sp(new Request); - request_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); + request_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], + m_readBuff.size() - HEADER_SIZE); - ResponseSP response_sp = prepareResponse(request_sp); + ResponseSP response_sp = m_protocolManager_sp->prepareResponse(request_sp); - boost::uint32_t bodySize = response_sp->ByteSize(); + boost::uint32_t bodySize = response_sp->ByteSize(); - INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint() - << " <<<< " << bodySize << " BYTE" << endl; + std::vector<boost::uint8_t> writeBuff; + writeBuff.resize(HEADER_SIZE + bodySize); - std::vector<boost::uint8_t> writeBuff; - writeBuff.resize(HEADER_SIZE + bodySize); + encodeHeader(writeBuff, bodySize); - encodeHeader(writeBuff, bodySize); + response_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); - response_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); + INFO_STREAM << m_sslSocket.lowest_layer().remote_endpoint() + << " <<<< " << bodySize << " BYTE" << endl; - boost::asio::async_write(m_sslSocket, boost::asio::buffer(writeBuff), - m_strand.wrap( - boost::bind( - &SSLSession::handleWriteResponse, shared_from_this(), - boost::asio::placeholders::error))); + boost::asio::async_write(m_sslSocket, boost::asio::buffer(writeBuff), + m_strand.wrap(boost::bind(&SSLSession::handleWriteResponse, + shared_from_this(), boost::asio::placeholders::error))); + } + catch(std::runtime_error& ec) + { + ERROR_STREAM << "SSLSession::startWriteResponse() " << ec.what() << endl; + } + catch(...) + { + ERROR_STREAM << "SSLSession::startWriteResponse() unknown error" << endl; + } } } //namespace diff --git a/src/Server.cpp b/src/Server.cpp index 532a766a3deeb465ff5a2f405b1349ae0e79ee8b..a035dcd5ae98a44b47fc52502572d7940d26612c 100644 --- a/src/Server.cpp +++ b/src/Server.cpp @@ -22,7 +22,7 @@ Server::Server(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_ m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp); m_ioService_sp.reset(new boost::asio::io_service); - m_work_sp.reset(new boost::asio::io_service::work(*m_ioService_sp)); + m_acceptor_sp.reset(new boost::asio::ip::tcp::acceptor(*m_ioService_sp)); m_state = Tango::OFF; @@ -36,13 +36,16 @@ Server::~Server() { DEBUG_STREAM << "Server::~Server()" << endl; - m_acceptor_sp->close(); + boost::system::error_code errorCode; + m_acceptor_sp->close(errorCode); + + m_work_sp.reset(); m_ioService_sp->stop(); if(m_threadGroup_sp) { - m_threadGroup_sp->interrupt_all(); + //m_threadGroup_sp->interrupt_all(); m_threadGroup_sp->join_all(); } @@ -53,7 +56,7 @@ Server::~Server() //============================================================================== // Server::start() //============================================================================== -void Server::start() //@todo: handle exceptions +void Server::start() throw(std::runtime_error) { DEBUG_STREAM << "Server::start()" << endl; @@ -61,6 +64,24 @@ void Server::start() //@todo: handle exceptions m_ioService_sp->reset(); + m_work_sp.reset(new boost::asio::io_service::work(*m_ioService_sp)); + + std::string localHost = m_configuration_sp->getLocalHost(); + unsigned int localPort = m_configuration_sp->getLocalPort(); + + INFO_STREAM << "LISTENING ON " << localHost << ":" << localPort << endl; + + boost::asio::ip::tcp::resolver::query query(localHost, + boost::lexical_cast<std::string>(localPort)); + + boost::asio::ip::tcp::resolver resolver(*m_ioService_sp); + boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); + + m_acceptor_sp->open(endpoint.protocol()); + m_acceptor_sp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + m_acceptor_sp->bind(endpoint); + m_acceptor_sp->listen(); + m_threadGroup_sp.reset(new boost::thread_group); unsigned int workerNumber = m_configuration_sp->getWorkerNumber(); @@ -70,29 +91,32 @@ void Server::start() //@todo: handle exceptions for(unsigned int i=0; i<workerNumber; ++i) m_threadGroup_sp->add_thread(new boost::thread(&WorkerThread::run, worker)); - startListen(); - startAccept(); } //============================================================================== // Server::stop() //============================================================================== -void Server::stop() //@todo: handle exceptions +void Server::stop() throw(std::runtime_error) { DEBUG_STREAM << "Server::stop()" << endl; - m_acceptor_sp->close(); + boost::system::error_code errorCode; + m_acceptor_sp->close(errorCode); + + m_work_sp.reset(); m_ioService_sp->stop(); if(m_threadGroup_sp) { - m_threadGroup_sp->interrupt_all(); + //m_threadGroup_sp->interrupt_all(); m_threadGroup_sp->join_all(); } + m_threadGroup_sp.reset(); + m_dBManager_sp->disconnect(); } //============================================================================== @@ -120,27 +144,27 @@ std::string Server::readStatus() } //============================================================================== -// Server::readStatus() +// Server::writeState() //============================================================================== -void Server::startListen() //@todo: handle exceptions +void Server::writeState(Tango::DevState state) { - DEBUG_STREAM << "Server::startListen()" << endl; + DEBUG_STREAM << "Server::writeState()" << endl; - std::string localHost = m_configuration_sp->getLocalHost(); - unsigned int localPort = m_configuration_sp->getLocalPort(); + boost::mutex::scoped_lock stateLock(m_stateMutex); - INFO_STREAM << "LISTENING ON " << localHost << ":" << localPort << endl; + m_state = state; +} - boost::asio::ip::tcp::resolver::query query(localHost, - boost::lexical_cast<std::string>(localPort)); +//============================================================================== +// Server::writeStatus() +//============================================================================== +void Server::writeStatus(std::string status) +{ + DEBUG_STREAM << "Server::writeStatus()" << endl; - boost::asio::ip::tcp::resolver resolver(*m_ioService_sp); - boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query); + boost::mutex::scoped_lock statusLock(m_stateMutex); - m_acceptor_sp->open(endpoint.protocol()); - m_acceptor_sp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); - m_acceptor_sp->bind(endpoint); - m_acceptor_sp->listen(); + m_status = status; } //============================================================================== @@ -152,9 +176,13 @@ void Server::handleAccept(Session::SP session_sp, DEBUG_STREAM << "Server::handleAccept()" << endl; if(!ec) + { session_sp->start(); + } else + { WARN_STREAM << "Server::handleAccept() " << ec.message() << endl; + } startAccept(); } diff --git a/src/Server.h b/src/Server.h index 6bcf9df684492a148fe745158592595a268190cb..adad47cc0856b186586c059a8e4c12679fca6f10 100644 --- a/src/Server.h +++ b/src/Server.h @@ -34,12 +34,12 @@ public: //------------------------------------------------------------------------------ // [Public] Thread management methods //------------------------------------------------------------------------------ - virtual void start(); + virtual void start() throw(std::runtime_error); - virtual void stop(); + virtual void stop() throw(std::runtime_error); //------------------------------------------------------------------------------ -// [Public] State status methods +// [Public] Read state and status methods //------------------------------------------------------------------------------ virtual Tango::DevState readState(); @@ -47,10 +47,15 @@ public: protected: //------------------------------------------------------------------------------ -// [Protected] Incoming connection methods +// [Protected] Write state and status methods //------------------------------------------------------------------------------ - virtual void startListen(); + virtual void writeState(Tango::DevState); + + virtual void writeStatus(std::string); +//------------------------------------------------------------------------------ +// [Protected] Incoming connection methods +//------------------------------------------------------------------------------ virtual void startAccept() = 0; virtual void handleAccept(Session::SP, const boost::system::error_code&); diff --git a/src/Session.cpp b/src/Session.cpp index 727079f6f26df446aeb60d5337d5fa78ad941286..33929976f80a730d42184fd18aaabcd2f2671f79 100644 --- a/src/Session.cpp +++ b/src/Session.cpp @@ -9,9 +9,12 @@ namespace MetadataExporter_ns Session::Session(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp, boost::shared_ptr<boost::asio::io_service> ioService_sp) : Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp), - m_dBManager_sp(dBManager_sp), m_strand(*ioService_sp) + m_strand(*ioService_sp) { DEBUG_STREAM << "Session::Session()" << endl; + + m_protocolManager_sp = ProtocolManager::create(deviceImpl_p, + configuration_sp, dBManager_sp); } //============================================================================== @@ -87,57 +90,6 @@ void Session::handleWriteResponse(const boost::system::error_code& errorCode) } } -//============================================================================== -// Session::prepareResponse() -//============================================================================== -Session::ResponseSP Session::prepareResponse(Session::RequestSP request) - throw(std::runtime_error) -{ - DEBUG_STREAM << "Session::prepareResponse()" << endl; - - ResponseSP response_sp(new Response()); - - switch(request->type()) - { - case Request::AUTHORIZATION: - { - INFO_STREAM << "Session::prepareResponse() AUTHORIZATION REQUEST" << endl; - - const Request::Authorization& auth_req = request->authorization(); - - INFO_STREAM << "USERNAME " << auth_req.username() << endl; - INFO_STREAM << "PASSWORD " << auth_req.password() << endl; - - response_sp->set_type(Response::AUTHORIZATION); - - Response::Authorization* auth_resp = response_sp->mutable_authorization(); - auth_resp->set_state(Response::Authorization::ACCEPTED); - auth_resp->set_status("Authorization accepted"); - - break; - } - - case Request::VALIDATION: - { - INFO_STREAM << "Session::prepareResponse() VALIDATION REQUEST" << endl; - response_sp->set_type(Response::VALIDATION); - break; - } - - case Request::METADATA: - { - INFO_STREAM << "Session::prepareResponse() METADATA REQUEST" << endl; - response_sp->set_type(Response::METADATA); - break; - } - - default: - throw std::runtime_error("Unknown request type!"); - } - - return response_sp; -} - //============================================================================== // Session::encodeHeader() //============================================================================== diff --git a/src/Session.h b/src/Session.h index 8bbd7300e19d5798ab00570c6d9862d18c501c20..65e7381f0a0b5867b2d15389cafebeea5125ab2b 100644 --- a/src/Session.h +++ b/src/Session.h @@ -1,10 +1,11 @@ #ifndef SESSION_H #define SESSION_H -#include <Response.pb.h> -#include <Request.pb.h> #include <Configuration.h> #include <DBManager.h> +#include <ProtocolManager.h> +#include <Response.pb.h> +#include <Request.pb.h> #include <tango.h> @@ -23,12 +24,6 @@ public: //------------------------------------------------------------------------------ typedef boost::shared_ptr<Session> SP; -//------------------------------------------------------------------------------ -// [Public] Request Response shared pointer typedef -//------------------------------------------------------------------------------ - typedef boost::shared_ptr<Request> RequestSP; - typedef boost::shared_ptr<Response> ResponseSP; - protected: //------------------------------------------------------------------------------ // [Protected] Constructor destructor deleter @@ -68,12 +63,6 @@ protected: virtual void handleWriteResponse(const boost::system::error_code&); -//------------------------------------------------------------------------------ -// [Protected] Response method -//------------------------------------------------------------------------------ - virtual ResponseSP prepareResponse(RequestSP) - throw(std::runtime_error); - //------------------------------------------------------------------------------ // [Protected] Encode decode header methods //------------------------------------------------------------------------------ @@ -89,8 +78,8 @@ protected: //Configuration parameters shared pointer Configuration::SP m_configuration_sp; - //Database manger shared pointer - DBManager::SP m_dBManager_sp; + //Protocol manager class shared pointer + ProtocolManager::SP m_protocolManager_sp; //Synchronization mechanism boost::asio::io_service::strand m_strand; diff --git a/src/WorkerThread.cpp b/src/WorkerThread.cpp index 61af1d8985787d6b6c473886ed0305dbfb7e5e50..5a696ca7e4dd416387e66de60aa60f87fc8dd461 100644 --- a/src/WorkerThread.cpp +++ b/src/WorkerThread.cpp @@ -27,7 +27,7 @@ WorkerThread::~WorkerThread() //============================================================================== void WorkerThread::run() { - DEBUG_STREAM << "WorkerThread::run()" << endl; + INFO_STREAM << "WorkerThread::run() STARTING" << endl; while(true) { @@ -53,7 +53,7 @@ void WorkerThread::run() } } //while - DEBUG_STREAM << "WorkerThread::run() exiting" << endl; + INFO_STREAM << "WorkerThread::run() STOPPING" << endl; } } \ No newline at end of file