diff --git a/src/Client.cpp b/src/Client.cpp index 57d2f47e97aae3fd227ff920363275e35059bdd0..7942dd298e3c2f8e4af1d029063575b8444bf519 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -189,9 +189,7 @@ void Client::startUpdateLists() try { - m_protocolManager_sp->updateNewList(); - - m_protocolManager_sp->updateFailedList(); + m_protocolManager_sp->retrieveFiles(); writeState(Tango::ON); writeStatus("Database loop active"); @@ -211,19 +209,9 @@ void Client::startUpdateLists() writeStatus("Unknown error"); } - if(readState() != Tango::ALARM && - m_protocolManager_sp->hasNextNewList()) + if(readState() == Tango::ON && + m_protocolManager_sp->hasFilesToTransfer()) { - m_protocolManager_sp->setRecoveryMode(false); - - startResolve(); - } - else if(readState() != Tango::ALARM && - m_protocolManager_sp->hasNextFailedList() && - m_protocolManager_sp->isRecoveryTimeElapsed()) - { - m_protocolManager_sp->setRecoveryMode(true); - startResolve(); } else @@ -449,21 +437,10 @@ void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp) try { - if(!m_protocolManager_sp->getRecoveryMode()) - m_protocolManager_sp->setNewFileTransfered(fileWrapper_sp); - else - m_protocolManager_sp->setFailedFileTransfered(fileWrapper_sp); - - if(m_protocolManager_sp->hasNextNewList()) - { - m_protocolManager_sp->setRecoveryMode(false); + m_protocolManager_sp->setCurrentFileDownloaded(fileWrapper_sp); - startWriteRequest(); - } - else if(m_protocolManager_sp->hasNextFailedList()) + if(m_protocolManager_sp->hasNextFile()) { - m_protocolManager_sp->setRecoveryMode(true); - startWriteRequest(); } else @@ -498,21 +475,10 @@ void Client::onTransferFailed() try { - if(!m_protocolManager_sp->getRecoveryMode()) - m_protocolManager_sp->setNewFileFailed(); - else - m_protocolManager_sp->setFailedFileFailed(); + m_protocolManager_sp->setCurrentFileFailed(); - if(m_protocolManager_sp->hasNextNewList()) + if(m_protocolManager_sp->hasNextFile()) { - m_protocolManager_sp->setRecoveryMode(false); - - startWriteRequest(); - } - else if(m_protocolManager_sp->hasNextFailedList()) - { - m_protocolManager_sp->setRecoveryMode(true); - startWriteRequest(); } else diff --git a/src/Client.h b/src/Client.h index 9d9ba258e5fdfbc459ee7484414241965f4854c1..8d77a299808fb1cc55a1df0e4ebeb03fcae53072 100644 --- a/src/Client.h +++ b/src/Client.h @@ -174,12 +174,6 @@ protected: //File list update time boost::asio::deadline_timer m_listsUpdateTimer; - //Header size on binary stream - static const unsigned HEADER_SIZE = 4; - - //Buffer for binary data read from stream - std::vector<boost::uint8_t> m_readBuff; - //Tango state property mutex boost::mutex m_stateMutex; @@ -195,6 +189,12 @@ protected: //Address and port of remote endpoint std::string m_remoteEndpoint; + //Header size on binary stream + static const unsigned HEADER_SIZE = 4; + + //Buffer for binary data read from stream + std::vector<boost::uint8_t> m_readBuff; + //Read buffer size static const boost::uint64_t BUFFER_SIZE = 40960; diff --git a/src/DBManager.cpp b/src/DBManager.cpp index e8fbcfd24fb4cccb6327f30bd2809a8bc5de1ff1..80dad75e89f40e9d54ecb9f76e39a9033bd76a3b 100644 --- a/src/DBManager.cpp +++ b/src/DBManager.cpp @@ -145,49 +145,6 @@ DBManager::TransactionSP DBManager::getAuxTransaction() return transaction_sp; } -//============================================================================== -// DBManager::retrieveNewFiles() -//============================================================================== -DBManager::NewFileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) - throw(soci::soci_error) -{ - DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; - - boost::mutex::scoped_lock lock(m_sessionMutex); - - if(m_mainSession_sp->get_backend() == NULL) - m_mainSession_sp->reconnect(); - - NewFileRowsetSP newFileRowset_sp(new NewFileRowset(m_mainSession_sp->prepare - << "select file_version, file_name, update_time from " - << m_configuration_sp->getDatabaseSchema() << "." - << m_configuration_sp->getDatabaseTable() << " where update_time>'" - << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); - - return newFileRowset_sp; -} - -//============================================================================== -// DBManager::updateNewFilePath() -//============================================================================== -void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, - int fileVersion, std::string fileName) throw(soci::soci_error) -{ - DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl; - - boost::mutex::scoped_lock lock(m_sessionMutex); - - if(m_mainSession_sp->get_backend() == NULL) - m_mainSession_sp->reconnect(); - - *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() - << "." << m_configuration_sp->getDatabaseTable() - << " set storage_path = :storagePath, file_path = :filePath " - << " where file_version = :fileVersion and file_name like :fileName", - soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), - soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); -} - //============================================================================== // DBManager::retrieveLastTimestamp() //============================================================================== @@ -232,6 +189,49 @@ void DBManager::persistLastTimestamp(boost::posix_time::ptime ptime) << " last_timestamp='" << boost::posix_time::to_iso_string(ptime) << "'"; } +//============================================================================== +// DBManager::retrieveNewFiles() +//============================================================================== +DBManager::FileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime ptime) + throw(soci::soci_error) +{ + DEBUG_STREAM << "DBManager::retrieveNewFiles()" << endl; + + boost::mutex::scoped_lock lock(m_sessionMutex); + + if(m_mainSession_sp->get_backend() == NULL) + m_mainSession_sp->reconnect(); + + FileRowsetSP newFileRowset_sp(new FileRowset(m_mainSession_sp->prepare + << "select storage_path, file_path, file_version, file_name, update_time " + << "from " << m_configuration_sp->getDatabaseSchema() << "." + << m_configuration_sp->getDatabaseTable() << " where update_time>'" + << boost::posix_time::to_iso_string(ptime) << "' order by update_time asc")); + + return newFileRowset_sp; +} + +//============================================================================== +// DBManager::updateNewFilePath() +//============================================================================== +void DBManager::updateNewFilePath(std::string storagePath, std::string filePath, + int fileVersion, std::string fileName) throw(soci::soci_error) +{ + DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl; + + boost::mutex::scoped_lock lock(m_sessionMutex); + + if(m_mainSession_sp->get_backend() == NULL) + m_mainSession_sp->reconnect(); + + *m_mainSession_sp << "update " << m_configuration_sp->getDatabaseSchema() + << "." << m_configuration_sp->getDatabaseTable() + << " set storage_path = :storagePath, file_path = :filePath " + << " where file_version = :fileVersion and file_name like :fileName", + soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"), + soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName"); +} + //============================================================================== // DBManager::addFailedFile() //============================================================================== @@ -276,7 +276,7 @@ void DBManager::removeFailedFile(int fileVersion, std::string fileName) //============================================================================== // DBManager::retrieveFailedFiles() //============================================================================== -DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles() +DBManager::FileRowsetSP DBManager::retrieveFailedFiles() throw(soci::soci_error) { DEBUG_STREAM << "DBManager::retrieveFailedFiles()" << endl; @@ -286,11 +286,15 @@ DBManager::FailedFileRowsetSP DBManager::retrieveFailedFiles() if(m_auxSession_sp->get_backend() == NULL) m_auxSession_sp->reconnect(); - FailedFileRowsetSP failedFileRowset_sp(new FailedFileRowset( - m_auxSession_sp->prepare << "select file_version, file_name from " + FileRowsetSP failedFileRowset_sp(new FileRowset( + m_auxSession_sp->prepare << "select m.storage_path, m.file_path, " + << " m.file_version, m.file_name, m.update_time from " + << m_configuration_sp->getDatabaseSchema() << "." + << m_configuration_sp->getDatabaseTable() << " as m join " << m_configuration_sp->getAuxDatabaseSchema() << "." - << m_configuration_sp->getAuxDatabaseFailedTable() - << " where device_name like '" << m_deviceName << "'")); + << m_configuration_sp->getAuxDatabaseFailedTable() << " as f " + << "on f.file_version = m.file_version and f.file_name = m.file_name " + << "where device_name like '" << m_deviceName << "'")); return failedFileRowset_sp; } diff --git a/src/DBManager.h b/src/DBManager.h index 0103655cab67f7c303c1e6dad0eb284120faa362..e1f5980f4fed2548b084fec7a2c2bc7744432968 100644 --- a/src/DBManager.h +++ b/src/DBManager.h @@ -77,28 +77,32 @@ public: TransactionSP getAuxTransaction(); //------------------------------------------------------------------------------ -// [Public] New file method +// [Public] Timestamp methods //------------------------------------------------------------------------------ - typedef boost::tuple< boost::optional<int>, boost::optional<std::string>, - boost::optional<std::tm> > NewFileRow; - - typedef soci::rowset< NewFileRow > NewFileRowset; - - typedef boost::shared_ptr< NewFileRowset > NewFileRowsetSP; - - virtual NewFileRowsetSP retrieveNewFiles(boost::posix_time::ptime) + virtual boost::posix_time::ptime retrieveLastTimestamp() throw(soci::soci_error); - virtual void updateNewFilePath(std::string, std::string, int, std::string) - throw(soci::soci_error); + virtual void persistLastTimestamp(boost::posix_time::ptime) + throw(soci::soci_error); + +//------------------------------------------------------------------------------ +// [Public] File row set definition +//------------------------------------------------------------------------------ + typedef boost::tuple< boost::optional<std::string>, + boost::optional<std::string>, boost::optional<int>, + boost::optional<std::string>, boost::optional<std::tm> > FileRow; + + typedef soci::rowset< FileRow > FileRowset; + typedef boost::shared_ptr< FileRowset > FileRowsetSP; + //------------------------------------------------------------------------------ -// [Public] Timestamp methods +// [Public] New file method //------------------------------------------------------------------------------ - virtual boost::posix_time::ptime retrieveLastTimestamp() + virtual FileRowsetSP retrieveNewFiles(boost::posix_time::ptime) throw(soci::soci_error); - virtual void persistLastTimestamp(boost::posix_time::ptime) + virtual void updateNewFilePath(std::string, std::string, int, std::string) throw(soci::soci_error); //------------------------------------------------------------------------------ @@ -110,14 +114,7 @@ public: virtual void removeFailedFile(int, std::string) throw(soci::soci_error); - typedef boost::tuple< boost::optional<int>, - boost::optional<std::string> > FailedFileRow; - - typedef soci::rowset< FailedFileRow > FailedFileRowset; - - typedef boost::shared_ptr< FailedFileRowset > FailedFileRowsetSP; - - virtual FailedFileRowsetSP retrieveFailedFiles() + virtual FileRowsetSP retrieveFailedFiles() throw(soci::soci_error); protected: diff --git a/src/PlainClient.cpp b/src/PlainClient.cpp index 88c77f52601ae14c73aaeaf73e577294a2b17962..8e9be2083bfa24694494d83bc6ae4916eb6e3cff 100644 --- a/src/PlainClient.cpp +++ b/src/PlainClient.cpp @@ -122,12 +122,7 @@ void PlainClient::startWriteRequest() try { - RequestSP request_sp; - - if(!m_protocolManager_sp->getRecoveryMode()) - request_sp = m_protocolManager_sp->createNewListRequest(); - else - request_sp = m_protocolManager_sp->createFailedListRequest(); + RequestSP request_sp = m_protocolManager_sp->createtRequest(); boost::uint32_t bodySize = request_sp->ByteSize(); @@ -150,12 +145,6 @@ void PlainClient::startWriteRequest() boost::bind(&PlainClient::handleWriteRequest, this, boost::asio::placeholders::error)); } - catch(std::logic_error& ec) - { - WARN_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl; - - onTransferFailed(); - } catch(std::runtime_error& ec) { ERROR_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl; diff --git a/src/ProtocolManager.cpp b/src/ProtocolManager.cpp index 1175b5fa7c026108759d2fdeac0c7512fc0f1fcd..fc91f6cd6b719f078a1ed927c555e685a65ef229 100644 --- a/src/ProtocolManager.cpp +++ b/src/ProtocolManager.cpp @@ -7,7 +7,7 @@ namespace DataImporter_ns { //============================================================================== -// ProtocolManager::ProtocolManager() +// ProtocolManager::ProtocolManager() //============================================================================== ProtocolManager::ProtocolManager(DataImporter* dataImporter_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp) : @@ -18,7 +18,7 @@ ProtocolManager::ProtocolManager(DataImporter* dataImporter_p, } //============================================================================== -// ProtocolManager::~ProtocolManager() +// ProtocolManager::~ProtocolManager() //============================================================================== ProtocolManager::~ProtocolManager() { @@ -26,7 +26,7 @@ ProtocolManager::~ProtocolManager() } //============================================================================== -// ProtocolManager::create() +// ProtocolManager::create() //============================================================================== ProtocolManager::SP ProtocolManager::create(DataImporter* dataImporter_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp) @@ -38,7 +38,7 @@ ProtocolManager::SP ProtocolManager::create(DataImporter* dataImporter_p, } //============================================================================== -// ProtocolManager::setRemoteEndpoint() +// ProtocolManager::setRemoteEndpoint() //============================================================================== void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) { @@ -48,12 +48,12 @@ void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) } //============================================================================== -// ProtocolManager::updateNewList() +// ProtocolManager::retrieveFiles() //============================================================================== -void ProtocolManager::updateNewList() throw(std::runtime_error) +void ProtocolManager::retrieveFiles() throw(std::runtime_error) { - DEBUG_STREAM << "ProtocolManager::updateNewList()" << endl; - + DEBUG_STREAM << "ProtocolManager::retrieveFiles()" << endl; + boost::posix_time::ptime m_lastTimestamp = m_dBManager_sp->retrieveLastTimestamp(); @@ -61,194 +61,88 @@ void ProtocolManager::updateNewList() throw(std::runtime_error) << boost::posix_time::to_simple_string(m_lastTimestamp) << endl; m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp); - + m_newFileRowsetIt = m_newFileRowset_sp->begin(); + + m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles(); + + m_failedFileRowsetIt = m_failedFileRowset_sp->begin(); } //============================================================================== -// ProtocolManager::updateFailedList() +// ProtocolManager::hasFilesToTransfer() //============================================================================== -void ProtocolManager::updateFailedList() throw(std::runtime_error) +bool ProtocolManager::hasFilesToTransfer() { - DEBUG_STREAM << "ProtocolManager::updateFailedList()" << endl; - - m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles(); - - m_failedFileRowsetIt = m_failedFileRowset_sp->begin(); + DEBUG_STREAM << "ProtocolManager::hasFilesToTransfer()" << endl; + + if(m_newFileRowset_sp && + m_newFileRowsetIt != m_newFileRowset_sp->end()) + { + INFO_STREAM << "ProtocolManager::hasFilesToTransfer() in new list" << endl; + return true; + } + else if(m_failedFileRowset_sp && + m_failedFileRowsetIt != m_failedFileRowset_sp->end()) + { + if(isRecoveryTimeElapsed()) + { + INFO_STREAM << "ProtocolManager::hasFilesToTransfer() in failed list" << endl; + return true; + } + } + + return false; } //============================================================================== -// ProtocolManager::hasNextNewList() +// ProtocolManager::hasNextFile() //============================================================================== -bool ProtocolManager::hasNextNewList() +bool ProtocolManager::hasNextFile() { - DEBUG_STREAM << "ProtocolManager::hasNextNewList()" << endl; + DEBUG_STREAM << "ProtocolManager::hasNextFile()" << endl; if(m_newFileRowset_sp && m_newFileRowsetIt != m_newFileRowset_sp->end()) { - DEBUG_STREAM << "ProtocolManager::hasNextNewList() true" << endl; return true; } - else - { - DEBUG_STREAM << "ProtocolManager::hasNextNewList() false" << endl; - return false; - } -} - -//============================================================================== -// ProtocolManager::hasNextFailedList() -//============================================================================== -bool ProtocolManager::hasNextFailedList() -{ - DEBUG_STREAM << "ProtocolManager::hasNextFailedList()" << endl; - - if(m_failedFileRowset_sp && + else if(m_failedFileRowset_sp && m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { - DEBUG_STREAM << "ProtocolManager::hasNextFailedList() true" << endl; return true; } else { - DEBUG_STREAM << "ProtocolManager::hasNextFailedList() false" << endl; return false; - } + } } //============================================================================== -// ProtocolManager::isRecoveryTimeElapsed() +// ProtocolManager::createtRequest() //============================================================================== -bool ProtocolManager::isRecoveryTimeElapsed() +RequestSP ProtocolManager::createtRequest() throw(std::runtime_error) { - DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed()" << endl; - - boost::posix_time::ptime now(boost::posix_time::second_clock::local_time()); - - if(m_recoveryModeTime.is_not_a_date_time()) - m_recoveryModeTime = now; - - boost::posix_time::time_duration diff = now - m_recoveryModeTime; - - DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() " << diff.total_seconds() - << "/" << (int)m_configuration_sp->getRecoveryTime() << endl; - - if(diff.total_seconds() > (int)m_configuration_sp->getRecoveryTime()) + DEBUG_STREAM << "ProtocolManager::createtRequest()" << endl; + + if(m_newFileRowset_sp && + m_newFileRowsetIt != m_newFileRowset_sp->end()) { - DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() true" << endl; - m_recoveryModeTime = now; - return true; + return fillRequest(m_newFileRowsetIt); } - else + else if(m_failedFileRowset_sp && + m_failedFileRowsetIt != m_failedFileRowset_sp->end()) { - DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() false" << endl; - return false; + return fillRequest(m_failedFileRowsetIt); } + else + { + throw std::runtime_error("Lists not initialized"); + } } //============================================================================== -// ProtocolManager::getRecoveryMode() -//============================================================================== -bool ProtocolManager::getRecoveryMode() -{ - DEBUG_STREAM << "ProtocolManager::getRecoveryMode()" << endl; - - return m_recoveryMode; -} - -//============================================================================== -// ProtocolManager::setRecoveryMode() -//============================================================================== -void ProtocolManager::setRecoveryMode(bool recoveryMode) -{ - DEBUG_STREAM << "ProtocolManager::setRecoveryMode()" << endl; - - m_recoveryMode = recoveryMode; -} - -//============================================================================== -// ProtocolManager::createNewListRequest() -//============================================================================== -RequestSP ProtocolManager::createNewListRequest() - throw(std::logic_error, std::runtime_error) -{ - DEBUG_STREAM << "ProtocolManager::createNewListRequest()" << endl; - - if(!m_newFileRowset_sp || - m_newFileRowsetIt == m_newFileRowset_sp->end()) - throw std::runtime_error("New list not initialized or empty"); - - RequestSP request_sp(new Request); - - request_sp->set_username(m_configuration_sp->getDatabaseUsername()); - request_sp->set_password(m_configuration_sp->getDatabasePassword()); - - request_sp->set_schema(m_configuration_sp->getDatabaseSchema()); - request_sp->set_table(m_configuration_sp->getDatabaseTable()); - - if(!m_newFileRowsetIt->get<0>()) - throw std::invalid_argument("Empty file version found on new list"); - int fileVersion = m_newFileRowsetIt->get<0>().get(); - - if(!m_newFileRowsetIt->get<1>()) - throw std::invalid_argument("Empty file name found on new list"); - std::string fileName = m_newFileRowsetIt->get<1>().get(); - - request_sp->set_file_version(fileVersion); - request_sp->set_file_name(fileName); - - INFO_STREAM << "ProtocolManager::createNewListRequest() file " << fileName - << " version " << fileVersion << " to " << m_remoteEndpoint << endl; - - if(!request_sp->IsInitialized()) - throw std::runtime_error("Request not initialized"); - - return request_sp; -} - -//============================================================================== -// ProtocolManager::createFailedListRequest() -//============================================================================== -RequestSP ProtocolManager::createFailedListRequest() - throw(std::logic_error, std::runtime_error) -{ - DEBUG_STREAM << "ProtocolManager::createFailedListRequest()" << endl; - - if(!m_failedFileRowset_sp || - m_failedFileRowsetIt == m_failedFileRowset_sp->end()) - throw std::runtime_error("Failed list not initialized or empty"); - - RequestSP request_sp(new Request); - - request_sp->set_username(m_configuration_sp->getDatabaseUsername()); - request_sp->set_password(m_configuration_sp->getDatabasePassword()); - - request_sp->set_schema(m_configuration_sp->getDatabaseSchema()); - request_sp->set_table(m_configuration_sp->getDatabaseTable()); - - if(!m_failedFileRowsetIt->get<0>()) - throw std::invalid_argument("Empty file version found on failed list"); - int fileVersion = m_failedFileRowsetIt->get<0>().get(); - - if(!m_failedFileRowsetIt->get<1>()) - throw std::invalid_argument("Empty file name found on failed list"); - std::string fileName = m_failedFileRowsetIt->get<1>().get(); - - request_sp->set_file_version(fileVersion); - request_sp->set_file_name(fileName); - - INFO_STREAM << "ProtocolManager::createFailedListRequest() file " << fileName - << " version " << fileVersion << " to " << m_remoteEndpoint << endl; - - if(!request_sp->IsInitialized()) - throw std::runtime_error("Request not initialized"); - - return request_sp; -} - -//============================================================================== -// ProtocolManager::processResponse() +// ProtocolManager::processResponse() //============================================================================== FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp) throw(std::logic_error, std::runtime_error) @@ -263,14 +157,14 @@ FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp) std::string filePath = response_sp->file_path(); if(filePath.empty()) - throw std::invalid_argument("Empty file path received"); + throw std::runtime_error("Empty file path received"); int fileVersion = response_sp->file_version(); std::string fileName = response_sp->file_name(); if(fileName.empty()) - throw std::invalid_argument("Empty file path received"); + throw std::runtime_error("Empty file path received"); boost::uint64_t fileSize = response_sp->file_size(); @@ -293,173 +187,215 @@ FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp) } //============================================================================== -// ProtocolManager::setNewFileTransfered() +// ProtocolManager::setCurrentFileDownloaded() //============================================================================== -void ProtocolManager::setNewFileTransfered(FileWrapper::SP fileWrapper_sp) - throw(std::logic_error, std::runtime_error) +void ProtocolManager::setCurrentFileDownloaded(FileWrapper::SP fileWrapper_sp) + throw(std::runtime_error) { - DEBUG_STREAM << "ProtocolManager::setNewFileTransfered()" << endl; - - if(!m_newFileRowset_sp || - m_newFileRowsetIt == m_newFileRowset_sp->end()) - throw std::runtime_error("New list not initialized or empty"); + DEBUG_STREAM << "ProtocolManager::setCurrentFileDownloaded()" << endl; std::string storagePath = fileWrapper_sp->getStoragePath(); std::string filePath = fileWrapper_sp->getFilePath(); + + if(m_newFileRowset_sp && + m_newFileRowsetIt != m_newFileRowset_sp->end()) + { + if(!m_newFileRowsetIt->get<2>()) + throw std::runtime_error("Empty file version found"); + int fileVersion = m_newFileRowsetIt->get<2>().get(); - if(!m_newFileRowsetIt->get<0>()) - throw std::invalid_argument("Empty file version found on new list"); - int fileVersion = m_newFileRowsetIt->get<0>().get(); + if(!m_newFileRowsetIt->get<3>()) + throw std::runtime_error("Empty file name found"); + std::string fileName = m_newFileRowsetIt->get<3>().get(); - if(!m_newFileRowsetIt->get<1>()) - throw std::invalid_argument("Empty file name found on new list"); - std::string fileName = m_newFileRowsetIt->get<1>().get(); + if(!m_newFileRowsetIt->get<4>()) + throw std::runtime_error("Empty update time found"); + std::tm currentTm = m_newFileRowsetIt->get<4>().get(); - if(!m_newFileRowsetIt->get<2>()) - throw std::invalid_argument("Empty update time found on new list"); - std::tm currentTm = m_newFileRowsetIt->get<2>().get(); + INFO_STREAM << "ProtocolManager::setNewFileTransfered() file " + << fileName << " version " << fileVersion << " transfered" << endl; - INFO_STREAM << "ProtocolManager::setNewFileTransfered() file " - << fileName << " version " << fileVersion << " transfered" << endl; + boost::posix_time::ptime currentPtime = + boost::posix_time::ptime_from_tm(currentTm); - boost::posix_time::ptime currentPtime = - boost::posix_time::ptime_from_tm(currentTm); + boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin); - boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin); + ++m_newFileRowsetIt; - //FIXME: not incremented in case of exception!!! - ++m_newFileRowsetIt; + if(m_newFileRowsetIt != m_newFileRowset_sp->end()) + { + if(!m_newFileRowsetIt->get<4>()) + throw std::runtime_error("Empty next update time found"); + std::tm nextTm = m_newFileRowsetIt->get<4>().get(); - if(m_newFileRowsetIt != m_newFileRowset_sp->end()) - { - if(!m_newFileRowsetIt->get<2>()) - throw std::invalid_argument("Empty next update time found on new list"); - std::tm nextTm = m_newFileRowsetIt->get<2>().get(); + nextPtime =boost::posix_time::ptime_from_tm(nextTm); + } + + DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); + DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); + + if(nextPtime > currentPtime) + m_dBManager_sp->persistLastTimestamp(currentPtime); - nextPtime =boost::posix_time::ptime_from_tm(nextTm); + m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName); + + auxTransaction_sp->commit(); + mainTransaction_sp->commit(); } + else if(m_failedFileRowset_sp && + m_failedFileRowsetIt != m_failedFileRowset_sp->end()) + { + if(!m_failedFileRowsetIt->get<2>()) + throw std::runtime_error("Empty file version found"); + int fileVersion = m_failedFileRowsetIt->get<2>().get(); - DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); - DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); + if(!m_failedFileRowsetIt->get<3>()) + throw std::runtime_error("Empty file name found"); + string fileName = m_failedFileRowsetIt->get<3>().get(); - if(nextPtime > currentPtime) - m_dBManager_sp->persistLastTimestamp(currentPtime); + ++m_failedFileRowsetIt; - m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName); + DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); + DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); - auxTransaction_sp->commit(); - mainTransaction_sp->commit(); + m_dBManager_sp->removeFailedFile(fileVersion, fileName); - m_dataImporter_p->incrementRegularCounter(); + m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName); + + auxTransaction_sp->commit(); + mainTransaction_sp->commit(); + } + else + { + throw std::runtime_error("Lists not initialized"); + } } //============================================================================== -// ProtocolManager::setFailedFileTransfered() +// ProtocolManager::setCurrentFileFailed() //============================================================================== -void ProtocolManager::setFailedFileTransfered(FileWrapper::SP fileWrapper_sp) - throw(std::logic_error, std::runtime_error) +void ProtocolManager::setCurrentFileFailed() throw(std::runtime_error) { - DEBUG_STREAM << "ProtocolManager::setFailedFileTransfered()" << endl; + DEBUG_STREAM << "ProtocolManager::setCurrentFileFailed()" << endl; - if(!m_failedFileRowset_sp || - m_failedFileRowsetIt == m_failedFileRowset_sp->end()) - throw std::runtime_error("Failed list not initialized or empty"); + if(m_newFileRowset_sp && + m_newFileRowsetIt != m_newFileRowset_sp->end()) + { + if(!m_newFileRowsetIt->get<2>()) + throw std::runtime_error("Empty file version found"); + int fileVersion = m_newFileRowsetIt->get<2>().get(); - std::string storagePath = fileWrapper_sp->getStoragePath(); - std::string filePath = fileWrapper_sp->getFilePath(); + if(!m_newFileRowsetIt->get<3>()) + throw std::runtime_error("Empty file name found"); + string fileName = m_newFileRowsetIt->get<3>().get(); + + if(!m_newFileRowsetIt->get<4>()) + throw std::runtime_error("Empty update time found"); + std::tm currentTm = m_newFileRowsetIt->get<4>().get(); - if(!m_failedFileRowsetIt->get<0>()) - throw std::invalid_argument("Empty file version found on failed list"); - int fileVersion = m_failedFileRowsetIt->get<0>().get(); + INFO_STREAM << "ProtocolManager::setFileFailed() file " + << fileName << " version " << fileVersion << " not transfered" << endl; - if(!m_failedFileRowsetIt->get<1>()) - throw std::invalid_argument("Empty file name found on failed list"); - string fileName = m_failedFileRowsetIt->get<1>().get(); + boost::posix_time::ptime currentPtime = + boost::posix_time::ptime_from_tm(currentTm); - //FIXME: not incremented in case of exception!!! - ++m_failedFileRowsetIt; + boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin); - DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); - DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); + ++m_newFileRowsetIt; - m_dBManager_sp->removeFailedFile(fileVersion, fileName); + if(m_newFileRowsetIt != m_newFileRowset_sp->end()) + { + if(!m_newFileRowsetIt->get<4>()) + throw std::runtime_error("Empty next update time found"); + std::tm nextTm = m_newFileRowsetIt->get<4>().get(); - m_dBManager_sp->updateNewFilePath(storagePath, filePath, fileVersion, fileName); + nextPtime =boost::posix_time::ptime_from_tm(nextTm); + } - auxTransaction_sp->commit(); - mainTransaction_sp->commit(); + DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); + DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); - m_dataImporter_p->decrementFailedCounter(); - m_dataImporter_p->incrementRegularCounter(); + if(nextPtime > currentPtime) + m_dBManager_sp->persistLastTimestamp(currentPtime); + + m_dBManager_sp->addFailedFile(fileVersion, fileName); + + auxTransaction_sp->commit(); + mainTransaction_sp->commit(); + } + else if(m_failedFileRowset_sp && + m_failedFileRowsetIt != m_failedFileRowset_sp->end()) + { + ++m_failedFileRowsetIt; + } + else + { + throw std::runtime_error("Lists not initialized"); + } } //============================================================================== -// ProtocolManager::setNewFileFailed() +// ProtocolManager::isRecoveryTimeElapsed() //============================================================================== -void ProtocolManager::setNewFileFailed() - throw(std::logic_error, std::runtime_error) +bool ProtocolManager::isRecoveryTimeElapsed() { - DEBUG_STREAM << "ProtocolManager::setNewFileFailed()" << endl; - - if(!m_newFileRowset_sp || - m_newFileRowsetIt == m_newFileRowset_sp->end()) - throw std::runtime_error("New list not initialized or empty"); - - if(!m_newFileRowsetIt->get<0>()) - throw std::invalid_argument("Empty file version found on new list"); - int fileVersion = m_newFileRowsetIt->get<0>().get(); - - if(!m_newFileRowsetIt->get<1>()) - throw std::invalid_argument("Empty file name found on new list"); - string fileName = m_newFileRowsetIt->get<1>().get(); - - if(!m_newFileRowsetIt->get<2>()) - throw std::invalid_argument("Empty update time found on new list"); - std::tm currentTm = m_newFileRowsetIt->get<2>().get(); + DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed()" << endl; - INFO_STREAM << "ProtocolManager::setFileFailed() file " - << fileName << " version " << fileVersion << " not transfered" << endl; + boost::posix_time::ptime now(boost::posix_time::second_clock::local_time()); - boost::posix_time::ptime currentPtime = - boost::posix_time::ptime_from_tm(currentTm); + if(m_recoveryModeTime.is_not_a_date_time()) + m_recoveryModeTime = now; - boost::posix_time::ptime nextPtime(boost::posix_time::pos_infin); + boost::posix_time::time_duration diff = now - m_recoveryModeTime; - //FIXME: not incremented in case of exception!!! - ++m_newFileRowsetIt; + DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() " << diff.total_seconds() + << "/" << (int)m_configuration_sp->getRecoveryTime() << endl; - if(m_newFileRowsetIt != m_newFileRowset_sp->end()) + if(diff.total_seconds() > (int)m_configuration_sp->getRecoveryTime()) { - if(!m_newFileRowsetIt->get<2>()) - throw std::invalid_argument("Empty next update time found on new list"); - std::tm nextTm = m_newFileRowsetIt->get<2>().get(); - - nextPtime =boost::posix_time::ptime_from_tm(nextTm); + m_recoveryModeTime = now; + return true; + } + else + { + return false; } +} - DBManager::TransactionSP auxTransaction_sp = m_dBManager_sp->getAuxTransaction(); - DBManager::TransactionSP mainTransaction_sp = m_dBManager_sp->getMainTransaction(); +//============================================================================== +// ProtocolManager::fillRequest() +//============================================================================== +RequestSP ProtocolManager::fillRequest(DBManager::FileRowset::const_iterator it) + throw(std::runtime_error) +{ + DEBUG_STREAM << "ProtocolManager::fillRequest()" << endl; + + RequestSP request_sp(new Request); - if(nextPtime > currentPtime) - m_dBManager_sp->persistLastTimestamp(currentPtime); + request_sp->set_username(m_configuration_sp->getDatabaseUsername()); + request_sp->set_password(m_configuration_sp->getDatabasePassword()); - m_dBManager_sp->addFailedFile(fileVersion, fileName); + request_sp->set_schema(m_configuration_sp->getDatabaseSchema()); + request_sp->set_table(m_configuration_sp->getDatabaseTable()); - auxTransaction_sp->commit(); - mainTransaction_sp->commit(); + if(!it->get<2>()) + throw std::runtime_error("Empty file version found"); + int fileVersion = it->get<2>().get(); - m_dataImporter_p->incrementFailedCounter(); -} + if(!it->get<3>()) + throw std::runtime_error("Empty file name found"); + std::string fileName = it->get<3>().get(); -//============================================================================== -// ProtocolManager::setFailedFileFailed() -//============================================================================== -void ProtocolManager::setFailedFileFailed() - throw(std::logic_error, std::runtime_error) -{ - DEBUG_STREAM << "ProtocolManager::setFailedFileFailed()" << endl; + request_sp->set_file_version(fileVersion); + request_sp->set_file_name(fileName); + + INFO_STREAM << "ProtocolManager::fillRequest() file " << fileName + << " version " << fileVersion << " to " << m_remoteEndpoint << endl; + + if(!request_sp->IsInitialized()) + throw std::runtime_error("Request not initialized"); - ++m_failedFileRowsetIt; + return request_sp; } } //namespace diff --git a/src/ProtocolManager.h b/src/ProtocolManager.h index 47234de8fd638e75d2734cc5253e1195ae59c385..b5909bad60b69602668ee699bb4a48d6b240808b 100644 --- a/src/ProtocolManager.h +++ b/src/ProtocolManager.h @@ -48,67 +48,55 @@ protected: public: //------------------------------------------------------------------------------ -// [Public] Class creation method +// [Public] Class creation method //------------------------------------------------------------------------------ static ProtocolManager::SP create(DataImporter*, Configuration::SP, DBManager::SP); //------------------------------------------------------------------------------ -// [Public] Remote endpoint setter method +// [Public] Remote endpoint setter method //------------------------------------------------------------------------------ virtual void setRemoteEndpoint(std::string); //------------------------------------------------------------------------------ -// [Public] Files lists update methods +// [Public] Files lists methods //------------------------------------------------------------------------------ - virtual void updateNewList() throw(std::runtime_error); + virtual void retrieveFiles() throw(std::runtime_error); - virtual void updateFailedList() throw(std::runtime_error); + virtual bool hasFilesToTransfer(); + + virtual bool hasNextFile(); //------------------------------------------------------------------------------ -// [Public] Files lists handling methods +// [Public] Request response methods //------------------------------------------------------------------------------ - virtual bool hasNextNewList(); + RequestSP createtRequest() + throw(std::runtime_error); - virtual bool hasNextFailedList(); - - virtual bool isRecoveryTimeElapsed(); + FileWrapper::SP processResponse(ResponseSP) + throw(std::logic_error, std::runtime_error); //------------------------------------------------------------------------------ -// [Public] Recovery mode getter and setter methods +// [Public] Files status methods //------------------------------------------------------------------------------ - virtual bool getRecoveryMode(); + virtual void setCurrentFileDownloaded(FileWrapper::SP) + throw(std::runtime_error); - virtual void setRecoveryMode(bool); + virtual void setCurrentFileFailed() + throw(std::runtime_error); +protected: //------------------------------------------------------------------------------ -// [Public] Request response methods +// [Protected] Elapsed recovery time method //------------------------------------------------------------------------------ - RequestSP createNewListRequest() - throw(std::logic_error, std::runtime_error); - - RequestSP createFailedListRequest() - throw(std::logic_error, std::runtime_error); - - FileWrapper::SP processResponse(ResponseSP) - throw(std::logic_error, std::runtime_error); - + virtual bool isRecoveryTimeElapsed(); + //------------------------------------------------------------------------------ -// [Public] Files status methods +// [Protected] Fill request utility method //------------------------------------------------------------------------------ - virtual void setNewFileTransfered(FileWrapper::SP) - throw(std::logic_error, std::runtime_error); - - virtual void setFailedFileTransfered(FileWrapper::SP) - throw(std::logic_error, std::runtime_error); - - virtual void setNewFileFailed() - throw(std::logic_error, std::runtime_error); - - virtual void setFailedFileFailed() - throw(std::logic_error, std::runtime_error); - -protected: + virtual RequestSP fillRequest(DBManager::FileRowset::const_iterator) + throw(std::runtime_error); + //------------------------------------------------------------------------------ // [Protected] Class variables //------------------------------------------------------------------------------ @@ -124,23 +112,20 @@ protected: //Address and port of remote endpoint std::string m_remoteEndpoint; - //Processing file from failed list - bool m_recoveryMode; - //Processing file from failed list last timestamp boost::posix_time::ptime m_recoveryModeTime; //New file list shared pointer - DBManager::NewFileRowsetSP m_newFileRowset_sp; + DBManager::FileRowsetSP m_newFileRowset_sp; //New file list iterator - DBManager::NewFileRowset::const_iterator m_newFileRowsetIt; + DBManager::FileRowset::const_iterator m_newFileRowsetIt; //Failed file list shared pointer - DBManager::FailedFileRowsetSP m_failedFileRowset_sp; + DBManager::FileRowsetSP m_failedFileRowset_sp; //Failed file list iterator - DBManager::FailedFileRowset::const_iterator m_failedFileRowsetIt; + DBManager::FileRowset::const_iterator m_failedFileRowsetIt; }; } //End of namespace diff --git a/src/SSLClient.cpp b/src/SSLClient.cpp index c520eb1a3429f45f62c5350c6770168d772ad830..e3a8b090a44329f3cb00e105089a2a57d774f688 100644 --- a/src/SSLClient.cpp +++ b/src/SSLClient.cpp @@ -175,12 +175,7 @@ void SSLClient::startWriteRequest() try { - RequestSP request_sp; - - if(!m_protocolManager_sp->getRecoveryMode()) - request_sp = m_protocolManager_sp->createNewListRequest(); - else - request_sp = m_protocolManager_sp->createFailedListRequest(); + RequestSP request_sp = m_protocolManager_sp->createtRequest(); boost::uint32_t bodySize = request_sp->ByteSize(); @@ -203,12 +198,6 @@ void SSLClient::startWriteRequest() boost::bind(&SSLClient::handleWriteRequest, this, boost::asio::placeholders::error)); } - catch(std::logic_error& ec) - { - WARN_STREAM << "SSLClient::startWriteRequest() " << ec.what() << endl; - - onTransferFailed(); - } catch(std::runtime_error& ec) { ERROR_STREAM << "SSLClient::startWriteRequest() " << ec.what() << endl;