diff --git a/src/Client.cpp b/src/Client.cpp index 7d664c0e01300f0fb407d1edc0c5d472552d30af..6248a53f12d4925086032e9d121daf6033846a2f 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -16,7 +16,7 @@ Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_ { DEBUG_STREAM << "Client::Client()" << endl; - //GOOGLE_PROTOBUF_VERIFY_VERSION; + GOOGLE_PROTOBUF_VERIFY_VERSION; m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp); @@ -42,7 +42,7 @@ Client::~Client() m_thread_sp->join(); } - //google::protobuf::ShutdownProtobufLibrary(); + google::protobuf::ShutdownProtobufLibrary(); } //============================================================================== @@ -54,8 +54,8 @@ void Client::start() m_dBManager_sp->connect(); -// m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p, -// m_configuration_sp, m_dBManager_sp); + m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p, + m_configuration_sp, m_dBManager_sp); m_ioService.reset(); @@ -84,7 +84,7 @@ void Client::stop() m_thread_sp.reset(); - //m_protocolManager_sp.reset(); + m_protocolManager_sp.reset(); m_dBManager_sp->disconnect(); } @@ -220,102 +220,112 @@ void Client::handleResolve(const boost::system::error_code& errorCode, } } -////============================================================================== -//// Client::handleRequest() -////============================================================================== -//void Client::handleWriteRequest(const boost::system::error_code& errorCode) -//{ -// DEBUG_STREAM << "Client::handleRequest()" << endl; -// -// if(!errorCode) -// { -// startReadResponseHeader(); -// } -// else -// { -// ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl; -// -// writeState(Tango::FAULT); -// writeStatus(errorCode.message()); -// } -//} -// -////============================================================================== -//// Client::handleReadResponseHeader() -////============================================================================== -//void Client::handleReadResponseHeader(const boost::system::error_code& errorCode) -//{ -// DEBUG_STREAM << "Client::handleReadResponseHeader()" << endl; -// -// if(!errorCode) -// { -// boost::uint32_t bodySize = decodeHeader(m_readBuff); -// -// startReadResponseBody(bodySize); -// } -// else -// { -// ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl; -// -// writeState(Tango::FAULT); -// writeStatus(errorCode.message()); -// } -//} -// -////============================================================================== -//// Client::handleReadResponseBody() -////============================================================================== -//void Client::handleReadResponseBody(const boost::system::error_code& errorCode) -//{ -// DEBUG_STREAM << "Client::handleReadResponseBody()" << endl; -// -// if(!errorCode) -// { -// try -// { -// ResponseSP response_sp(new Response); -// -// response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], -// m_readBuff.size() - HEADER_SIZE); -// -// m_protocolManager_sp->processResponse(response_sp); -// -// if(m_protocolManager_sp->waitBeforeRequest()) -// { -// m_requestResponseTimer.expires_from_now( -// boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); -// -// m_requestResponseTimer.async_wait( -// boost::bind(&Client::startWriteRequest, this)); -// } -// else -// { -// startWriteRequest(); -// } -// } -// catch(std::exception& ec) -// { -// ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl; -// -// writeState(Tango::FAULT); -// writeStatus(ec.what()); -// } -// catch(...) -// { -// ERROR_STREAM << "Client::handleResponse() Unknown error" << endl; -// -// writeState(Tango::FAULT); -// writeStatus("Unknown error"); -// } -// } -// else -// { -// ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl; -// -// writeState(Tango::FAULT); -// writeStatus(errorCode.message()); -// } -//} +//============================================================================== +// Client::handleRequest() +//============================================================================== +void Client::handleWriteRequest(const boost::system::error_code& errorCode) +{ + DEBUG_STREAM << "Client::handleRequest()" << endl; + + if(!errorCode) + { + startReadResponseHeader(); + } + else + { + ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl; + + writeState(Tango::FAULT); + writeStatus(errorCode.message()); + } +} + +//============================================================================== +// Client::handleReadResponseHeader() +//============================================================================== +void Client::handleReadResponseHeader(const boost::system::error_code& errorCode) +{ + DEBUG_STREAM << "Client::handleReadResponseHeader()" << endl; + + if(!errorCode) + { + boost::uint32_t bodySize = decodeHeader(m_readBuff); + + startReadResponseBody(bodySize); + } + else + { + ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl; + + writeState(Tango::FAULT); + writeStatus(errorCode.message()); + } +} + +//============================================================================== +// Client::handleReadResponseBody() +//============================================================================== +void Client::handleReadResponseBody(const boost::system::error_code& errorCode) +{ + DEBUG_STREAM << "Client::handleReadResponseBody()" << endl; + + if(!errorCode) + { + try + { + ResponseSP response_sp(new Response); + + response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], + m_readBuff.size() - HEADER_SIZE); + + //m_protocolManager_sp->processResponse(response_sp); + + if(m_protocolManager_sp->waitBeforeRequest()) + { + m_requestResponseTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); + + m_requestResponseTimer.async_wait( + boost::bind(&Client::startWriteRequest, this)); + } + else + { + startWriteRequest(); + } + } + catch(std::exception& ec) + { + ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl; + + writeState(Tango::FAULT); + writeStatus(ec.what()); + } + catch(...) + { + ERROR_STREAM << "Client::handleResponse() Unknown error" << endl; + + writeState(Tango::FAULT); + writeStatus("Unknown error"); + } + } + else + { + ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl; + + writeState(Tango::FAULT); + writeStatus(errorCode.message()); + } +} + +//============================================================================== +// Client::resetConnection() +//============================================================================== +void Client::handleReadData(const boost::system::error_code& errorCode) +{ + DEBUG_STREAM << "Client::handleReadData()" << endl; + + +} //============================================================================== // Client::resetConnection() diff --git a/src/Client.h b/src/Client.h index c48d60f6a55abf44f0d86e7eaac330aa5e25d612..53080e3d684808b47fe9ca36a3f4d9cbcb598ce5 100644 --- a/src/Client.h +++ b/src/Client.h @@ -3,9 +3,9 @@ #include <Configuration.h> #include <DBManager.h> -//#include <ProtocolManager.h> -//#include <Request.pb.h> -//#include <Response.pb.h> +#include <ProtocolManager.h> +#include <Request.pb.h> +#include <Response.pb.h> #include <tango.h> @@ -15,9 +15,6 @@ #include <boost/scoped_ptr.hpp> #include <boost/cstdint.hpp> -//#include <Request.pb.h> -//#include <Response.pb.h> - namespace DataImporter_ns { @@ -81,26 +78,33 @@ protected: virtual void handleConnect(const boost::system::error_code&, boost::asio::ip::tcp::resolver::iterator) = 0; -////------------------------------------------------------------------------------ -//// [Protected] Write request methods -////------------------------------------------------------------------------------ -// virtual void startWriteRequest() = 0; -// -// virtual void handleWriteRequest(const boost::system::error_code&); -// -////------------------------------------------------------------------------------ -//// [Protected] Read response header methods -////------------------------------------------------------------------------------ -// virtual void startReadResponseHeader() = 0; -// -// virtual void handleReadResponseHeader(const boost::system::error_code&); -// -////------------------------------------------------------------------------------ -//// [Protected] Read response body methods -////------------------------------------------------------------------------------ -// virtual void startReadResponseBody(boost::uint32_t) = 0; -// -// virtual void handleReadResponseBody(const boost::system::error_code&); +//------------------------------------------------------------------------------ +// [Protected] Write request methods +//------------------------------------------------------------------------------ + virtual void startWriteRequest() = 0; + + virtual void handleWriteRequest(const boost::system::error_code&); + +//------------------------------------------------------------------------------ +// [Protected] Read response header methods +//------------------------------------------------------------------------------ + virtual void startReadResponseHeader() = 0; + + virtual void handleReadResponseHeader(const boost::system::error_code&); + +//------------------------------------------------------------------------------ +// [Protected] Read response body methods +//------------------------------------------------------------------------------ + virtual void startReadResponseBody(boost::uint32_t) = 0; + + virtual void handleReadResponseBody(const boost::system::error_code&); + +//------------------------------------------------------------------------------ +// [Protected] Read response body methods +//------------------------------------------------------------------------------ + virtual void startReadData() = 0; + + virtual void handleReadData(const boost::system::error_code&); //------------------------------------------------------------------------------ // [Protected] Connection reset and timeout handler methods @@ -131,7 +135,7 @@ protected: DBManager::SP m_dBManager_sp; //Protocol manager shared pointer - //ProtocolManager::SP m_protocolManager_sp; + ProtocolManager::SP m_protocolManager_sp; //IO service instance boost::asio::io_service m_ioService; diff --git a/src/Configuration.h b/src/Configuration.h index de0b6859053759bf9affa41245067138289bb792..ce24ab655917b9c679f65ea1f3d5e4c102f97bc3 100644 --- a/src/Configuration.h +++ b/src/Configuration.h @@ -19,6 +19,9 @@ private: // [Private] Constructor destructor deleter //------------------------------------------------------------------------------ Configuration(std::string certificateFile, std::string storagePath, + std::string dIDBHost, unsigned int dIDBPort, std::string dIDBUser, + std::string dIDBPassword, std::string dIDBSchema, + std::string dIDBIndexTable, std::string dIDBRejectedTable, std::string remoteHost, unsigned int remotePort, std::string remoteUsername, std::string remotePassword, std::string databaseHost, unsigned int databasePort, @@ -26,6 +29,9 @@ private: std::string databaseSchema, std::string databaseTable, unsigned int refreshTime, unsigned int timeout) : m_certificateFile (certificateFile), m_storagePath(storagePath), + m_dIDBHost(dIDBHost), m_dIDBPort(dIDBPort), m_dIDBUser(dIDBUser), + m_dIDBPassword(dIDBPassword), m_dIDBSchema(dIDBSchema), + m_dIDBIndexTable(dIDBIndexTable), m_dIDBRejectedTable(dIDBRejectedTable), m_remoteHost(remoteHost), m_remotePort(remotePort), m_remoteUsername(remoteUsername), m_remotePassword(remotePassword), m_databaseHost(databaseHost), m_databasePort(databasePort), @@ -48,19 +54,22 @@ public: // [Public] Create class method //------------------------------------------------------------------------------ static Configuration::SP create(std::string certificateFile, - std::string storagePath, std::string remoteHost, - unsigned int remotePort, std::string remoteUsername, - std::string remotePassword, std::string databaseHost -, unsigned int databasePort, std::string databaseUsername, - std::string databasePassword, std::string databaseSchema, - std::string databaseTable, unsigned int refreshTime, - unsigned int timeout) + std::string storagePath, std::string dIDBHost, unsigned int dIDBPort, + std::string dIDBUser, std::string dIDBPassword, std::string dIDBSchema, + std::string dIDBIndexTable, std::string dIDBRejectedTable, + std::string remoteHost, unsigned int remotePort, + std::string remoteUsername, std::string remotePassword, + std::string databaseHost, unsigned int databasePort, + std::string databaseUsername, std::string databasePassword, + std::string databaseSchema, std::string databaseTable, + unsigned int refreshTime, unsigned int timeout) { Configuration::SP c_sp(new Configuration(certificateFile, storagePath, - remoteHost, remotePort, remoteUsername, remotePassword, - databaseHost, databasePort, databaseUsername, databasePassword, - databaseSchema, databaseTable, refreshTime, timeout), - Configuration::Deleter()); + dIDBHost, dIDBPort, dIDBUser, dIDBPassword, dIDBSchema, + dIDBIndexTable, dIDBRejectedTable, remoteHost, remotePort, + remoteUsername, remotePassword, databaseHost, databasePort, + databaseUsername, databasePassword, databaseSchema, databaseTable, + refreshTime, timeout), Configuration::Deleter()); return c_sp; } @@ -70,6 +79,13 @@ public: //------------------------------------------------------------------------------ std::string getCertificateFile() const { return m_certificateFile; } std::string getStoragePath() const { return m_storagePath; } + std::string getDIDBHost() const { return m_dIDBHost; } + unsigned int getDIDBPort() const { return m_dIDBPort; } + std::string getDIDBUser() const { return m_dIDBUser; } + std::string getDIDBPassword() const { return m_dIDBPassword; } + std::string getDIDBSchema() const { return m_dIDBSchema; } + std::string getDIDBIndexTable() const { return m_dIDBIndexTable; } + std::string getDIDBRejectedTable() const { return m_dIDBRejectedTable; } std::string getRemoteHost() const { return m_remoteHost; } unsigned int getRemotePort() const { return m_remotePort; } std::string getRemoteUsername() const { return m_remoteUsername; } @@ -88,10 +104,31 @@ private: // [Private] class variables //------------------------------------------------------------------------------ //Absolute path to certificate chain file - const std::string m_certificateFile; + const std::string m_certificateFile; //Absolute path to storage - const std::string m_storagePath; + const std::string m_storagePath; + + //Host where data import database is running + const std::string m_dIDBHost; + + //Port where data import database is listening + const unsigned int m_dIDBPort; + + //User to login in data import database + const std::string m_dIDBUser; + + //Password to login in data import database + const std::string m_dIDBPassword; + + //Schema where data import tables are located + const std::string m_dIDBSchema; + + //Index table name + const std::string m_dIDBIndexTable; + + //Rejected table name + const std::string m_dIDBRejectedTable; //Metadata exporter remote host const std::string m_remoteHost; diff --git a/src/DataImporter.cpp b/src/DataImporter.cpp index 785eef40b43a4d0cf3c7988fa133d6012e8f1f3f..b1a0d6d4434ef922eefa5b6f9410e9dfa9725bab 100644 --- a/src/DataImporter.cpp +++ b/src/DataImporter.cpp @@ -490,10 +490,31 @@ void DataImporter::get_device_property() checkIfDirectoryExists(storagePath); + if(dIDBHost.empty()) + throw(invalid_argument("DIDBHost property is empty or not defined")); + + if(dIDBPort<1 || dIDBPort>MAX_PORT_NUMBER) + throw(invalid_argument("DIDBPort property out of range or not defined")); + + if(dIDBUser.empty()) + throw(invalid_argument("DIDBUser property is empty or not defined")); + + if(dIDBPassword.empty()) + throw(invalid_argument("DIDBPassword property is empty or not defined")); + + if(dIDBSchema.empty()) + throw(invalid_argument("DIDBSchema property is empty or not defined")); + + if(dIDBIndexTable.empty()) + throw(invalid_argument("DIDBIndexTable property is empty or not defined")); + + if(dIDBRejectedTable.empty()) + throw(invalid_argument("DIDBRejectedTable property is empty or not defined")); + if(remoteHost.empty()) throw(invalid_argument("RemoteHost property is empty or not defined")); - if(remotePort<1 || remotePort>MAX_REMOTE_PORT) + if(remotePort<1 || remotePort>MAX_PORT_NUMBER) throw(invalid_argument("RemotePort property out of range or not defined")); if(remoteUsername.empty()) @@ -505,7 +526,7 @@ void DataImporter::get_device_property() if(databaseHost.empty()) throw(invalid_argument("DatabaseHost property is empty or not defined")); - if(databasePort<1 || databasePort>MAX_DATABASE_PORT) + if(databasePort<1 || databasePort>MAX_PORT_NUMBER) throw(invalid_argument("DatabasePort property out of range or not defined")); if(databaseUsername.empty()) @@ -527,9 +548,10 @@ void DataImporter::get_device_property() throw(invalid_argument("Timeout property out of range or not defined")); m_configuration_sp = Configuration::create(certificateFile, storagePath, - remoteHost, remotePort, remoteUsername, remotePassword, databaseHost, - databasePort, databaseUsername, databasePassword, databaseSchema, - databaseTable, refreshTime, timeout); + dIDBHost, dIDBPort, dIDBUser, dIDBPassword, dIDBSchema, dIDBIndexTable, + dIDBRejectedTable, remoteHost, remotePort, remoteUsername, remotePassword, + databaseHost, databasePort, databaseUsername, databasePassword, + databaseSchema, databaseTable, refreshTime, timeout); } catch(invalid_argument& ex) { @@ -715,6 +737,5 @@ void DataImporter::checkIfDirectoryExists(std::string directoryName) INFO_STREAM << "DataImporter::checkIfDirectoryExists() " << directoryName << endl; } - /*----- PROTECTED REGION END -----*/ // DataImporter::namespace_ending } // namespace diff --git a/src/DataImporter.h b/src/DataImporter.h index bbe9e5f5d60aed6cdc378df80592302915dbd024..4b139ef11175df8ed06aef8bb05e4107103133d5 100644 --- a/src/DataImporter.h +++ b/src/DataImporter.h @@ -73,11 +73,8 @@ class DataImporter : public TANGO_BASE_CLASS //Client class shared pointer Client::SP m_client_sp; - //Max port number allowed value for remote connection - static const unsigned int MAX_REMOTE_PORT = 65535; - - //Max port number allowed value for local database - static const unsigned int MAX_DATABASE_PORT = 65535; + //Max port number allowed value for data import database + static const unsigned int MAX_PORT_NUMBER = 65535; //Max time between remote server requests static const unsigned int MAX_REFRESH_TIME = 3600; @@ -94,7 +91,7 @@ public: string certificateFile; // StoragePath: Absolute path to storage string storagePath; - // DIDBHost: Hostname where data import database is running + // DIDBHost: Host where data import database is running string dIDBHost; // DIDBPort: Port where data import database is listening Tango::DevULong dIDBPort; diff --git a/src/PlainClient.cpp b/src/PlainClient.cpp index c6a837e39bcb0c4db98cdd76d4a1e5afbf14b879..0970fb335fe13fefab3ccf33555697ce33828377 100644 --- a/src/PlainClient.cpp +++ b/src/PlainClient.cpp @@ -114,9 +114,9 @@ void PlainClient::handleConnect(const boost::system::error_code& errorCode, writeState(Tango::ON); writeStatus(infoStream.str()); - //m_protocolManager_sp->setRemoteEndpoint(m_remoteEndpoint); + m_protocolManager_sp->setRemoteEndpoint(m_remoteEndpoint); - //startWriteRequest(); + startWriteRequest(); } else { @@ -132,90 +132,100 @@ void PlainClient::handleConnect(const boost::system::error_code& errorCode, } } -////============================================================================== -//// PlainClient::startRequest() -////============================================================================== -//void PlainClient::startWriteRequest() -//{ -// DEBUG_STREAM << "PlainClient::startRequest()" << endl; -// -// try -// { -// RequestSP request_sp = m_protocolManager_sp->createRequest(); -// -// boost::uint32_t bodySize = request_sp->ByteSize(); -// -// #ifdef VERBOSE_DEBUG -// INFO_STREAM << "PlainClient::startRequest() " -// << m_remoteEndpoint << " <<<< " << bodySize << " byte" << endl; -// #endif -// -// std::vector<boost::uint8_t> writeBuff; -// writeBuff.resize(HEADER_SIZE + bodySize); -// -// encodeHeader(writeBuff, bodySize); -// -// request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); -// -// m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); -// -// boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff), -// boost::bind(&PlainClient::handleWriteRequest, this, -// boost::asio::placeholders::error)); -// } -// catch(std::exception& ec) -// { -// ERROR_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl; -// -// writeState(Tango::FAULT); -// writeStatus(ec.what()); -// } -// catch(...) -// { -// ERROR_STREAM << "PlainClient::startWriteRequest() unknown error" << endl; -// -// writeState(Tango::FAULT); -// writeStatus("Unknown error"); -// } -//} -// -////============================================================================== -//// PlainClient::startReadResponseHeader() -////============================================================================== -//void PlainClient::startReadResponseHeader() -//{ -// DEBUG_STREAM << "PlainClient::startReadResponseHeader()" << endl; -// -// m_readBuff.resize(HEADER_SIZE); -// -// m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); -// -// boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff), -// boost::bind(&PlainClient::handleReadResponseHeader, this, -// boost::asio::placeholders::error)); -//} -// -////============================================================================== -//// PlainClient::startReadResponseBody() -////============================================================================== -//void PlainClient::startReadResponseBody(boost::uint32_t bodySize) -//{ -// DEBUG_STREAM << "PlainClient::startReadResponseBody()" << endl; -// -// #ifdef VERBOSE_DEBUG -// INFO_STREAM << "PlainClient::startReadResponseBody() " -// << m_remoteEndpoint << " >>>> " << bodySize << " byte" << endl; -// #endif -// -// m_readBuff.resize(HEADER_SIZE + bodySize); -// -// boost::asio::mutable_buffers_1 mutableBuffer = -// boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize); -// -// boost::asio::async_read(m_plainSocket, mutableBuffer, -// boost::bind(&PlainClient::handleReadResponseBody, this, -// boost::asio::placeholders::error)); -//} +//============================================================================== +// PlainClient::startRequest() +//============================================================================== +void PlainClient::startWriteRequest() +{ + DEBUG_STREAM << "PlainClient::startRequest()" << endl; + + try + { + //RequestSP request_sp = m_protocolManager_sp->createRequest(); + RequestSP request_sp(new Request); + + boost::uint32_t bodySize = request_sp->ByteSize(); + + #ifdef VERBOSE_DEBUG + INFO_STREAM << "PlainClient::startRequest() " + << m_remoteEndpoint << " <<<< " << bodySize << " byte" << endl; + #endif + + std::vector<boost::uint8_t> writeBuff; + writeBuff.resize(HEADER_SIZE + bodySize); + + encodeHeader(writeBuff, bodySize); + + request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); + + m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + + boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff), + boost::bind(&PlainClient::handleWriteRequest, this, + boost::asio::placeholders::error)); + } + catch(std::exception& ec) + { + ERROR_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl; + + writeState(Tango::FAULT); + writeStatus(ec.what()); + } + catch(...) + { + ERROR_STREAM << "PlainClient::startWriteRequest() unknown error" << endl; + + writeState(Tango::FAULT); + writeStatus("Unknown error"); + } +} + +//============================================================================== +// PlainClient::startReadResponseHeader() +//============================================================================== +void PlainClient::startReadResponseHeader() +{ + DEBUG_STREAM << "PlainClient::startReadResponseHeader()" << endl; + + m_readBuff.resize(HEADER_SIZE); + + m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + + boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff), + boost::bind(&PlainClient::handleReadResponseHeader, this, + boost::asio::placeholders::error)); +} + +//============================================================================== +// PlainClient::startReadResponseBody() +//============================================================================== +void PlainClient::startReadResponseBody(boost::uint32_t bodySize) +{ + DEBUG_STREAM << "PlainClient::startReadResponseBody()" << endl; + + #ifdef VERBOSE_DEBUG + INFO_STREAM << "PlainClient::startReadResponseBody() " + << m_remoteEndpoint << " >>>> " << bodySize << " byte" << endl; + #endif + + m_readBuff.resize(HEADER_SIZE + bodySize); + + boost::asio::mutable_buffers_1 mutableBuffer = + boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize); + + boost::asio::async_read(m_plainSocket, mutableBuffer, + boost::bind(&PlainClient::handleReadResponseBody, this, + boost::asio::placeholders::error)); +} + +//============================================================================== +// PlainClient::startReadData() +//============================================================================== +void PlainClient::startReadData() +{ + DEBUG_STREAM << "PlainClient::startReadData()" << endl; + +} //============================================================================== // PlainClient::closeConnection() diff --git a/src/PlainClient.h b/src/PlainClient.h index c17850dcc855f3ea48698f6e9d2b7c4f9e8045e3..f4014cf5e0f31f8808001db57ea135d4dbd07e07 100644 --- a/src/PlainClient.h +++ b/src/PlainClient.h @@ -49,11 +49,13 @@ protected: //------------------------------------------------------------------------------ // [Protected] Request response methods //------------------------------------------------------------------------------ -// virtual void startWriteRequest(); -// -// virtual void startReadResponseHeader(); -// -// virtual void startReadResponseBody(boost::uint32_t); + virtual void startWriteRequest(); + + virtual void startReadResponseHeader(); + + virtual void startReadResponseBody(boost::uint32_t); + + virtual void startReadData(); //------------------------------------------------------------------------------ // [Protected] Connection close method diff --git a/src/ProtocolManager.cpp b/src/ProtocolManager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..123fe5d004ad02760de75c7db54028fb0f559a2d --- /dev/null +++ b/src/ProtocolManager.cpp @@ -0,0 +1,186 @@ +#include <ProtocolManager.h> + +#include <boost/date_time.hpp> + +namespace DataImporter_ns +{ + +//============================================================================== +// ProtocolManager::ProtocolManager() +//============================================================================== +ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p, + Configuration::SP configuration_sp, DBManager::SP dBManager_sp) : + Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp), + m_dBManager_sp(dBManager_sp) +{ + DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl; + + m_isAuthorised = false; + m_hasMoreData = true; +} + +//============================================================================== +// ProtocolManager::ProtocolManager() +//============================================================================== +ProtocolManager::~ProtocolManager() +{ + DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl; +} + +//============================================================================== +// ProtocolManager::ProtocolManager() +//============================================================================== +ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p, + Configuration::SP configuration_sp, DBManager::SP dBManager_sp) +{ + ProtocolManager::SP d_sp(new ProtocolManager(deviceImpl_p, configuration_sp, + dBManager_sp), ProtocolManager::Deleter()); + + return d_sp; +} + +//============================================================================== +// ProtocolManager::ProtocolManager() +//============================================================================== +void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) +{ + DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl; + + m_remoteEndpoint = remoteEndpoint; +} + +//============================================================================== +// ProtocolManager::waitBeforeRequest() +//============================================================================== +bool ProtocolManager::waitBeforeRequest() +{ + DEBUG_STREAM << "ProtocolManager::waitBeforeRequest()" << endl; + + return !m_hasMoreData; +} + +//============================================================================== +// ProtocolManager::resetProtocolStatus() +//============================================================================== +void ProtocolManager::resetProtocolStatus() +{ + DEBUG_STREAM << "ProtocolManager::resetProtocolStatus()" << endl; + + m_isAuthorised = false; + m_hasMoreData = true; +} + +//============================================================================== +// ProtocolManager::createRequest() +//============================================================================== +RequestSP ProtocolManager::createRequest() + throw(std::runtime_error, std::out_of_range) +{ + DEBUG_STREAM << "ProtocolManager::createRequest()" << endl; + + RequestSP request_sp; + + if(!m_isAuthorised) + { + //request_sp = createAuthroisation(); + } + else + { + //request_sp = createMetadata(); + } + + if(!request_sp->IsInitialized()) + throw std::runtime_error("Not initialized request!"); + + return request_sp; +} + +//============================================================================== +// ProtocolManager::processResponse() +//============================================================================== +void ProtocolManager::processResponse(ResponseSP response_sp) + throw(std::runtime_error, std::out_of_range) +{ + DEBUG_STREAM << "ProtocolManager::processResponse()" << endl; + + if(!response_sp->IsInitialized()) + throw std::runtime_error("Not initialized response!"); + +// switch(response_sp->type()) +// { +// case Response::AUTHORIZATION: +// { +// processAuthroisation(response_sp); +// break; +// } +// case Response::VALIDATION: +// { +// processValidation(response_sp); +// break; +// } +// case Response::METADATA: +// { +// processMetadata(response_sp); +// break; +// } +// default: +// throw std::runtime_error("Unknown response type"); +// } +} + +////============================================================================== +//// ProtocolManager::createAuthroisation() +////============================================================================== +//RequestSP ProtocolManager::createAuthroisation() throw(std::runtime_error) +//{ +// DEBUG_STREAM << "ProtocolManager::createAuthroisation()" << endl; +// +// RequestSP request_sp(new Request); +// +// request_sp->set_type(Request::AUTHORIZATION); +// +// std::string user = m_configuration_sp->getRemoteUsername(); +// std::string password = m_configuration_sp->getRemotePassword(); +// +// #ifdef VERBOSE_DEBUG +// INFO_STREAM << "ProtocolManager::createAuthroisation() Send username " +// << user << " password " << password << " to " << m_remoteEndpoint << endl; +// #else +// INFO_STREAM << "ProtocolManager::createAuthroisation() Send to " +// << m_remoteEndpoint << endl; +// #endif +// +// Request::Authorization* authorization = request_sp->mutable_authorization(); +// authorization->set_username(user); +// authorization->set_password(password); +// +// return request_sp; +//} + +////============================================================================== +//// ProtocolManager::processAuthroisation() +////============================================================================== +//void ProtocolManager::processAuthroisation(ResponseSP response_sp) +// throw(std::runtime_error) +//{ +// DEBUG_STREAM << "ProtocolManager::processAuthroisation()" << endl; +// +// const Response::Authorization& authorization = response_sp->authorization(); +// +// if(authorization.state() == Response::Authorization::ACCEPTED) +// { +// INFO_STREAM << "ProtocolManager::processAuthroisation() State ACCEPTED " +// << "status " << authorization.status() << " from " << m_remoteEndpoint << endl; +// +// m_isAuthorised = true; +// } +// else +// { +// ERROR_STREAM << "ProtocolManager::processAuthroisation() State REJECTED " +// << "status " << authorization.status() << " from " << m_remoteEndpoint << endl; +// +// throw std::runtime_error(authorization.status()); +// } +//} + +} //namespace diff --git a/src/ProtocolManager.h b/src/ProtocolManager.h new file mode 100644 index 0000000000000000000000000000000000000000..ebecde6bd42b143374ecf728c3d3f3566ef18c53 --- /dev/null +++ b/src/ProtocolManager.h @@ -0,0 +1,102 @@ +#ifndef PROTOCOLMANAGER_H +#define PROTOCOLMANAGER_H + +#include <Response.pb.h> +#include <Request.pb.h> +#include <Configuration.h> +#include <DBManager.h> + +#include <tango.h> + +namespace DataImporter_ns +{ + +//Protocol buffer request class shared pointer +typedef boost::shared_ptr<Request> RequestSP; + +//Protocol buffer response class shared pointer +typedef boost::shared_ptr<Response> ResponseSP; + +class ProtocolManager : public Tango::LogAdapter +{ +public: +//------------------------------------------------------------------------------ +// [Public] Shared pointer typedef +//------------------------------------------------------------------------------ + typedef boost::shared_ptr<ProtocolManager> SP; + +protected: +//------------------------------------------------------------------------------ +// [Protected] Constructor destructor deleter +//------------------------------------------------------------------------------ + ProtocolManager(Tango::DeviceImpl*, Configuration::SP, DBManager::SP); + + virtual ~ProtocolManager(); + + class Deleter; + friend Deleter; + class Deleter + { + public: + void operator()(ProtocolManager* d) { delete d; } + }; + +public: +//------------------------------------------------------------------------------ +// [Public] Class creation method +//------------------------------------------------------------------------------ + static ProtocolManager::SP create(Tango::DeviceImpl*, Configuration::SP, + DBManager::SP); + +//------------------------------------------------------------------------------ +// [Public] Remote endpoint setter method +//------------------------------------------------------------------------------ + virtual void setRemoteEndpoint(std::string); + +//------------------------------------------------------------------------------ +// [Public] Time between request method +//------------------------------------------------------------------------------ + virtual bool waitBeforeRequest(); + +//------------------------------------------------------------------------------ +// [Public] Reset protocol status method +//------------------------------------------------------------------------------ + virtual void resetProtocolStatus(); + +//------------------------------------------------------------------------------ +// [Public] Request response management method +//------------------------------------------------------------------------------ + virtual RequestSP createRequest() + throw(std::runtime_error, std::out_of_range); + + virtual void processResponse(ResponseSP) + throw(std::runtime_error, std::out_of_range); + +protected: +//------------------------------------------------------------------------------ +// [Protected] Request specific methods +//------------------------------------------------------------------------------ + +//------------------------------------------------------------------------------ +// [Protected] Class variables +//------------------------------------------------------------------------------ + //Configuration parameters shared pointer + Configuration::SP m_configuration_sp; + + //Database manger shared pointer + DBManager::SP m_dBManager_sp; + + //Client is authorised + bool m_isAuthorised; + + //Server has more data in buffer + bool m_hasMoreData; + + //Address and port of remote endpoint + std::string m_remoteEndpoint; +}; + +} //End of namespace + +#endif /* PROTOCOLMANAGER_H */ + diff --git a/src/SSLClient.cpp b/src/SSLClient.cpp index 57aee4849217d2597bfebbdbbec10271487a6e3f..e86867e43449cc90feb1eac5dc36dd628423104c 100644 --- a/src/SSLClient.cpp +++ b/src/SSLClient.cpp @@ -172,9 +172,9 @@ void SSLClient::handleHandShake(const boost::system::error_code& errorCode) writeState(Tango::ON); writeStatus(infoStream.str()); - //m_protocolManager_sp->setRemoteEndpoint(m_remoteEndpoint); + m_protocolManager_sp->setRemoteEndpoint(m_remoteEndpoint); - //startWriteRequest(); + startWriteRequest(); } else { @@ -185,90 +185,100 @@ void SSLClient::handleHandShake(const boost::system::error_code& errorCode) } } -////============================================================================== -//// SSLClient::startWriteRequest() -////============================================================================== -//void SSLClient::startWriteRequest() -//{ -// DEBUG_STREAM << "SSLClient::startWriteRequest()" << endl; -// -// try -// { -// RequestSP request_sp = m_protocolManager_sp->createRequest(); -// -// boost::uint32_t bodySize = request_sp->ByteSize(); -// -// #ifdef VERBOSE_DEBUG -// INFO_STREAM << "SSLClient::startWriteRequest() " -// << m_remoteEndpoint << " <<<< " << bodySize << " byte" << endl; -// #endif -// -// std::vector<boost::uint8_t> writeBuff; -// writeBuff.resize(HEADER_SIZE + bodySize); -// -// encodeHeader(writeBuff, bodySize); -// -// request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); -// -// m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); -// -// boost::asio::async_write(m_sSLSocket, boost::asio::buffer(writeBuff), -// boost::bind(&SSLClient::handleWriteRequest, this, -// boost::asio::placeholders::error)); -// } -// catch(std::exception& ec) -// { -// ERROR_STREAM << "SSLClient::startWriteRequest() " << ec.what() << endl; -// -// writeState(Tango::FAULT); -// writeStatus(ec.what()); -// } -// catch(...) -// { -// ERROR_STREAM << "SSLClient::startWriteRequest() unknown error" << endl; -// -// writeState(Tango::FAULT); -// writeStatus("Unknown error"); -// } -//} -// -////============================================================================== -//// SSLClient::startReadResponseHeader() -////============================================================================== -//void SSLClient::startReadResponseHeader() -//{ -// DEBUG_STREAM << "SSLClient::startReadResponseHeader()" << endl; -// -// m_readBuff.resize(HEADER_SIZE); -// -// m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); -// -// boost::asio::async_read(m_sSLSocket, boost::asio::buffer(m_readBuff), -// boost::bind(&SSLClient::handleReadResponseHeader, this, -// boost::asio::placeholders::error)); -//} -// -////============================================================================== -//// SSLClient::startReadResponseBody() -////============================================================================== -//void SSLClient::startReadResponseBody(boost::uint32_t bodySize) -//{ -// DEBUG_STREAM << "SSLClient::startReadResponseBody()" << endl; -// -// #ifdef VERBOSE_DEBUG -// INFO_STREAM << "SSLClient::startReadResponseBody() " -// << m_remoteEndpoint << " >>>> " << bodySize << " byte" << endl; -// #endif -// -// m_readBuff.resize(HEADER_SIZE + bodySize); -// -// boost::asio::mutable_buffers_1 mutableBuffer = -// boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize); -// -// boost::asio::async_read(m_sSLSocket, mutableBuffer, -// boost::bind(&SSLClient::handleReadResponseBody, this, -// boost::asio::placeholders::error)); -//} +//============================================================================== +// SSLClient::startWriteRequest() +//============================================================================== +void SSLClient::startWriteRequest() +{ + DEBUG_STREAM << "SSLClient::startWriteRequest()" << endl; + + try + { + //RequestSP request_sp = m_protocolManager_sp->createRequest(); + RequestSP request_sp(new Request); + + boost::uint32_t bodySize = request_sp->ByteSize(); + + #ifdef VERBOSE_DEBUG + INFO_STREAM << "SSLClient::startWriteRequest() " + << m_remoteEndpoint << " <<<< " << bodySize << " byte" << endl; + #endif + + std::vector<boost::uint8_t> writeBuff; + writeBuff.resize(HEADER_SIZE + bodySize); + + encodeHeader(writeBuff, bodySize); + + request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); + + m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + + boost::asio::async_write(m_sSLSocket, boost::asio::buffer(writeBuff), + boost::bind(&SSLClient::handleWriteRequest, this, + boost::asio::placeholders::error)); + } + catch(std::exception& ec) + { + ERROR_STREAM << "SSLClient::startWriteRequest() " << ec.what() << endl; + + writeState(Tango::FAULT); + writeStatus(ec.what()); + } + catch(...) + { + ERROR_STREAM << "SSLClient::startWriteRequest() unknown error" << endl; + + writeState(Tango::FAULT); + writeStatus("Unknown error"); + } +} + +//============================================================================== +// SSLClient::startReadResponseHeader() +//============================================================================== +void SSLClient::startReadResponseHeader() +{ + DEBUG_STREAM << "SSLClient::startReadResponseHeader()" << endl; + + m_readBuff.resize(HEADER_SIZE); + + m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + + boost::asio::async_read(m_sSLSocket, boost::asio::buffer(m_readBuff), + boost::bind(&SSLClient::handleReadResponseHeader, this, + boost::asio::placeholders::error)); +} + +//============================================================================== +// SSLClient::startReadResponseBody() +//============================================================================== +void SSLClient::startReadResponseBody(boost::uint32_t bodySize) +{ + DEBUG_STREAM << "SSLClient::startReadResponseBody()" << endl; + + #ifdef VERBOSE_DEBUG + INFO_STREAM << "SSLClient::startReadResponseBody() " + << m_remoteEndpoint << " >>>> " << bodySize << " byte" << endl; + #endif + + m_readBuff.resize(HEADER_SIZE + bodySize); + + boost::asio::mutable_buffers_1 mutableBuffer = + boost::asio::buffer(&m_readBuff[HEADER_SIZE], bodySize); + + boost::asio::async_read(m_sSLSocket, mutableBuffer, + boost::bind(&SSLClient::handleReadResponseBody, this, + boost::asio::placeholders::error)); +} + +//============================================================================== +// SSLClient::startReadData() +//============================================================================== +void SSLClient::startReadData() +{ + DEBUG_STREAM << "SSLClient::startReadData()" << endl; + +} //============================================================================== // SSLClient::closeConnection() diff --git a/src/SSLClient.h b/src/SSLClient.h index 10f1fbbf7ca89bb574ab10898341547383e314c1..064991c19eb71edcdc6c78a521650319d73d6bdd 100644 --- a/src/SSLClient.h +++ b/src/SSLClient.h @@ -58,11 +58,13 @@ protected: //------------------------------------------------------------------------------ // [Protected] Request response methods //------------------------------------------------------------------------------ -// virtual void startWriteRequest(); -// -// virtual void startReadResponseHeader(); -// -// virtual void startReadResponseBody(boost::uint32_t); + virtual void startWriteRequest(); + + virtual void startReadResponseHeader(); + + virtual void startReadResponseBody(boost::uint32_t); + + virtual void startReadData(); //------------------------------------------------------------------------------ // [Protected] Connection close method