diff --git a/src/Client.cpp b/src/Client.cpp index 096497ee14d06b133e9452a7cb78c0ef87ea61f6..abb1439f7a2f851c95ad35f53a1db00a75ee577e 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -223,6 +223,8 @@ void Client::handleUpdateLists() { m_listsUpdateTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); + + m_listsUpdateTimer.async_wait(boost::bind(&Client::handleUpdateLists, this)); } } @@ -333,16 +335,14 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); - FileWrapper::SP fileWrapper_sp = - m_protocolManager_sp->processResponse(response_sp); - - startReadData(fileWrapper_sp); + startReadData(m_protocolManager_sp->processResponse(response_sp)); } catch(std::logic_error& ec) { WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl; - //TODO: mark file as failed and try with next next + writeState(Tango::ALARM); + writeStatus(ec.what()); } catch(std::runtime_error& ec) { @@ -350,8 +350,6 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) writeState(Tango::ALARM); writeStatus(ec.what()); - - //TODO: stop and set ALARM } catch(...) { @@ -359,8 +357,6 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode) writeState(Tango::ALARM); writeStatus("Unknown error"); - - //TODO: shit storm happens... stop and set ALARM } } else @@ -380,40 +376,50 @@ void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvByte { if(!errorCode) { - //TODO: if output stream is bad? - - if(recvBytes>0) - fileWrapper_sp->write(m_fileBuff, recvBytes); - - if(!fileWrapper_sp->isCompleted()) + if(!fileWrapper_sp->isBad()) { - startReadData(fileWrapper_sp); - } - else - { - INFO_STREAM << "Client::handleReadData() transfer complete " << endl; + if(recvBytes>0) + fileWrapper_sp->write(m_fileBuff, recvBytes); - if(m_protocolManager_sp->hasNextFile()) + if(!fileWrapper_sp->isCompleted()) { - m_protocolManager_sp->nextFile(); - - startWriteRequest(); + startReadData(fileWrapper_sp); } else { - startUpdateLists(); + INFO_STREAM << "Client::handleReadData() transfer complete " << endl; + + m_protocolManager_sp->markAsCompleted(); + + m_protocolManager_sp->nextFile(); + + if(m_protocolManager_sp->hasNextFile()) + { + startWriteRequest(); + } + else + { + closeConnection(); + + startUpdateLists(); + } } } - } - else if(errorCode == boost::asio::error::eof) - { - DEBUG_STREAM << "Client::handleReadData() end of file from " - << m_remoteEndpoint << endl; + else + { + ERROR_STREAM << "Client::handleReadData() bad I/O" << endl; + + writeState(Tango::ALARM); + writeStatus("Bad I/O"); + } } else { ERROR_STREAM << "Client::handleReadData() " << errorCode.message() << " from " << m_remoteEndpoint << endl; + + writeState(Tango::ALARM); + writeStatus(errorCode.message()); } } @@ -429,15 +435,10 @@ void Client::resetConnection() { ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl; - m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin); - m_listsUpdateTimer.expires_at(boost::posix_time::pos_infin); - closeConnection(); startUpdateLists(); } - - m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this)); } //============================================================================== diff --git a/src/PlainClient.cpp b/src/PlainClient.cpp index c8f5022b99ecdf6db94ba8d2e2ab8b26860bf0ec..13dfdce7f96917584fe932b11b54c55eaa1f0247 100644 --- a/src/PlainClient.cpp +++ b/src/PlainClient.cpp @@ -47,7 +47,8 @@ void PlainClient::startConnect(boost::asio::ip::tcp::resolver::iterator endPoint { DEBUG_STREAM << "PlainClient::startConnect()" << endl; - m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); if(endPointIterator != boost::asio::ip::tcp::resolver::iterator()) { @@ -135,7 +136,8 @@ void PlainClient::startWriteRequest() request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); - m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); boost::asio::async_write(m_plainSocket, boost::asio::buffer(writeBuff), boost::bind(&PlainClient::handleWriteRequest, this, @@ -173,7 +175,8 @@ void PlainClient::startReadResponseHeader() m_readBuff.resize(HEADER_SIZE); - m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_readBuff), boost::bind(&PlainClient::handleReadResponseHeader, this, @@ -218,6 +221,9 @@ void PlainClient::startReadData(FileWrapper::SP fileWrapper_sp) m_fileBuff.resize(bufferSize); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); + boost::asio::async_read(m_plainSocket, boost::asio::buffer(m_fileBuff), boost::bind(&PlainClient::handleReadData, this, fileWrapper_sp, boost::asio::placeholders::bytes_transferred, @@ -236,6 +242,8 @@ void PlainClient::closeConnection() INFO_STREAM << "PlainClient::closeConnection() " << infoStream.str() << endl; + m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin); + boost::system::error_code errorCode; m_plainSocket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, errorCode); diff --git a/src/ProtocolManager.cpp b/src/ProtocolManager.cpp index 31b77de25bb4c843ffae513df45edeb6d999934a..9e1fdfacd353ca22ad43bf6880756e57137fdca4 100644 --- a/src/ProtocolManager.cpp +++ b/src/ProtocolManager.cpp @@ -100,6 +100,58 @@ bool ProtocolManager::hasNextFile() } } +//============================================================================== +// ProtocolManager::markAsCompleted() +//============================================================================== +void ProtocolManager::markAsCompleted() +{ + DEBUG_STREAM << "ProtocolManager::markAsCompleted()" << endl; + + if(!m_recoveryMode) + { + if(!m_newFileRowset_sp || + m_newFileRowsetIt == m_newFileRowset_sp->end()) + throw std::runtime_error("New list not initialized or empty"); + + if(!m_newFileRowsetIt->get<0>()) + throw std::invalid_argument("Empty file version found on new list"); + int fileVersion = m_newFileRowsetIt->get<0>().get(); + + if(!m_newFileRowsetIt->get<1>()) + throw std::invalid_argument("Empty file name found on new list"); + std::string fileName = m_newFileRowsetIt->get<1>().get(); + + if(!m_newFileRowsetIt->get<2>()) + throw std::invalid_argument("Empty update time found on new list"); + std::tm update_time = m_newFileRowsetIt->get<2>().get(); + + INFO_STREAM << "ProtocolManager::createRequest() mark completed " + << fileName << " version " << fileVersion << endl; + + boost::posix_time::ptime current_time = + boost::posix_time::ptime_from_tm(update_time); + + INFO_STREAM << "ProtocolManager::createRequest() " + << boost::posix_time::to_simple_string(current_time) << endl; + + m_dBManager_sp->persistLastTimestamp(current_time); + } + else + { + ERROR_STREAM << "ProtocolManager::createRequest() mark failed list" << endl; + } +} + +//============================================================================== +// ProtocolManager::markAsFailed() +//============================================================================== +void ProtocolManager::markAsFailed() +{ + DEBUG_STREAM << "ProtocolManager::markAsFailed()" << endl; + + +} + //============================================================================== // ProtocolManager::nextFile() //============================================================================== @@ -112,7 +164,7 @@ void ProtocolManager::nextFile() if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { - DEBUG_STREAM << "ProtocolManager::nextFile() from new list" << endl; + DEBUG_STREAM << "ProtocolManager::nextFile() new list" << endl; ++m_newFileRowsetIt; } diff --git a/src/ProtocolManager.h b/src/ProtocolManager.h index 7daf8811076cc660811005daaaf89517f5c05f50..42ee2cf76fa97a58c95df2f32702897954487ff2 100644 --- a/src/ProtocolManager.h +++ b/src/ProtocolManager.h @@ -63,6 +63,10 @@ public: virtual bool hasNextFile(); + virtual void markAsCompleted(); + + virtual void markAsFailed(); + virtual void nextFile(); //------------------------------------------------------------------------------ @@ -78,8 +82,6 @@ protected: //------------------------------------------------------------------------------ // [Protected] File path method //------------------------------------------------------------------------------ - - virtual boost::filesystem::path composePath(std::string, std::string, int); //------------------------------------------------------------------------------ diff --git a/src/SSLClient.cpp b/src/SSLClient.cpp index 66a072e3df01c101ecc811c97656424367eea31e..8feed20d775c50540efba1a80d7a3f39fec482cb 100644 --- a/src/SSLClient.cpp +++ b/src/SSLClient.cpp @@ -65,7 +65,8 @@ void SSLClient::startConnect(boost::asio::ip::tcp::resolver::iterator endPointIt { DEBUG_STREAM << "SSLClient::startConnect()" << endl; - m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); if(endPointIterator != boost::asio::ip::tcp::resolver::iterator()) { @@ -122,7 +123,8 @@ void SSLClient::startHandShake() { DEBUG_STREAM << "SSLClient::startHandShake()" << endl; - m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); m_sSLSocket.async_handshake(boost::asio::ssl::stream_base::client, boost::bind(&SSLClient::handleHandShake, this, @@ -187,7 +189,8 @@ void SSLClient::startWriteRequest() request_sp->SerializeToArray(&writeBuff[HEADER_SIZE], bodySize); - m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); boost::asio::async_write(m_sSLSocket, boost::asio::buffer(writeBuff), boost::bind(&SSLClient::handleWriteRequest, this, @@ -225,7 +228,8 @@ void SSLClient::startReadResponseHeader() m_readBuff.resize(HEADER_SIZE); - m_resetConnectionTimer.expires_from_now(boost::posix_time::seconds(30)); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); boost::asio::async_read(m_sSLSocket, boost::asio::buffer(m_readBuff), boost::bind(&SSLClient::handleReadResponseHeader, this, @@ -270,6 +274,9 @@ void SSLClient::startReadData(FileWrapper::SP fileWrapper_sp) m_fileBuff.resize(bufferSize); + m_resetConnectionTimer.expires_from_now( + boost::posix_time::seconds(m_configuration_sp->getTimeout())); + boost::asio::async_read(m_sSLSocket, boost::asio::buffer(m_fileBuff), boost::bind(&SSLClient::handleReadData, this, fileWrapper_sp, boost::asio::placeholders::bytes_transferred, @@ -288,6 +295,8 @@ void SSLClient::closeConnection() INFO_STREAM << "SSLClient::closeConnection() " << infoStream.str() << endl; + m_resetConnectionTimer.expires_at(boost::posix_time::pos_infin); + boost::system::error_code errorCode; m_sSLSocket.lowest_layer().shutdown(