Skip to content
Snippets Groups Projects
Commit fb8d2ecb authored by Marco De Marco's avatar Marco De Marco
Browse files

Protocol manager refactored, before test

parent 7d82abb2
No related branches found
No related tags found
No related merge requests found
...@@ -188,7 +188,9 @@ void Client::startUpdateLists() ...@@ -188,7 +188,9 @@ void Client::startUpdateLists()
try try
{ {
m_protocolManager_sp->updateFileLists(); m_protocolManager_sp->updateNewList();
m_protocolManager_sp->updateFailedList();
writeState(Tango::ON); writeState(Tango::ON);
writeStatus("Database loop active"); writeStatus("Database loop active");
...@@ -208,8 +210,19 @@ void Client::startUpdateLists() ...@@ -208,8 +210,19 @@ void Client::startUpdateLists()
writeStatus("Unknown error"); writeStatus("Unknown error");
} }
if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextFile()) if(readState() != Tango::ALARM &&
m_protocolManager_sp->hasNextNewList())
{
m_protocolManager_sp->setRecoveryMode(false);
startResolve();
}
else if(readState() != Tango::ALARM &&
m_protocolManager_sp->hasNextFailedList() &&
m_protocolManager_sp->isRecoveryTimeElapsed())
{ {
m_protocolManager_sp->setRecoveryMode(true);
startResolve(); startResolve();
} }
else else
...@@ -435,9 +448,13 @@ void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp) ...@@ -435,9 +448,13 @@ void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp)
try try
{ {
m_protocolManager_sp->setFileTransfered(fileWrapper_sp); if(!m_protocolManager_sp->getRecoveryMode())
m_protocolManager_sp->setNewFileTransfered(fileWrapper_sp);
else
m_protocolManager_sp->setFailedFileTransfered(fileWrapper_sp);
if(m_protocolManager_sp->hasNextFile()) if(m_protocolManager_sp->hasNextNewList() ||
m_protocolManager_sp->hasNextFailedList())
{ {
startWriteRequest(); startWriteRequest();
} }
...@@ -473,9 +490,13 @@ void Client::onTransferFailed() ...@@ -473,9 +490,13 @@ void Client::onTransferFailed()
try try
{ {
m_protocolManager_sp->setFileFailed(); if(!m_protocolManager_sp->getRecoveryMode())
m_protocolManager_sp->setNewFileFailed();
else
m_protocolManager_sp->setFailedFileFailed();
if(m_protocolManager_sp->hasNextFile()) if(m_protocolManager_sp->hasNextNewList() ||
m_protocolManager_sp->hasNextFailedList())
{ {
startWriteRequest(); startWriteRequest();
} }
......
...@@ -121,7 +121,12 @@ void PlainClient::startWriteRequest() ...@@ -121,7 +121,12 @@ void PlainClient::startWriteRequest()
try try
{ {
RequestSP request_sp = m_protocolManager_sp->createRequest(); RequestSP request_sp;
if(!m_protocolManager_sp->getRecoveryMode())
request_sp = m_protocolManager_sp->createNewListRequest();
else
request_sp = m_protocolManager_sp->createFailedListRequest();
boost::uint32_t bodySize = request_sp->ByteSize(); boost::uint32_t bodySize = request_sp->ByteSize();
......
...@@ -46,44 +46,82 @@ void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) ...@@ -46,44 +46,82 @@ void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
} }
//============================================================================== //==============================================================================
// ProtocolManager::updateFileLists() // ProtocolManager::updateNewList()
//============================================================================== //==============================================================================
void ProtocolManager::updateFileLists() throw(std::runtime_error) void ProtocolManager::updateNewList() throw(std::runtime_error)
{ {
DEBUG_STREAM << "ProtocolManager::updateFileLists()" << endl; DEBUG_STREAM << "ProtocolManager::updateNewList()" << endl;
boost::posix_time::ptime m_lastTimestamp = boost::posix_time::ptime m_lastTimestamp =
m_dBManager_sp->retrieveLastTimestamp(); m_dBManager_sp->retrieveLastTimestamp();
DEBUG_STREAM << "ProtocolManager::updateFileLists() last timestamp " DEBUG_STREAM << "ProtocolManager::updateNewList() last timestamp "
<< boost::posix_time::to_simple_string(m_lastTimestamp) << endl; << boost::posix_time::to_simple_string(m_lastTimestamp) << endl;
m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp); m_newFileRowset_sp = m_dBManager_sp->retrieveNewFiles(m_lastTimestamp);
m_newFileRowsetIt = m_newFileRowset_sp->begin(); m_newFileRowsetIt = m_newFileRowset_sp->begin();
}
//==============================================================================
// ProtocolManager::updateFailedList()
//==============================================================================
void ProtocolManager::updateFailedList() throw(std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::updateFailedList()" << endl;
m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles(); m_failedFileRowset_sp = m_dBManager_sp->retrieveFailedFiles();
m_failedFileRowsetIt = m_failedFileRowset_sp->begin(); m_failedFileRowsetIt = m_failedFileRowset_sp->begin();
m_recoveryModeTime = boost::posix_time::second_clock::local_time();
} }
//============================================================================== //==============================================================================
// ProtocolManager::hasNextFile() // ProtocolManager::hasNextNewList()
//============================================================================== //==============================================================================
bool ProtocolManager::hasNextFile() bool ProtocolManager::hasNextNewList()
{ {
DEBUG_STREAM << "ProtocolManager::hasNextFile()" << endl; DEBUG_STREAM << "ProtocolManager::hasNextNewList()" << endl;
if(m_newFileRowset_sp && if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end()) m_newFileRowsetIt != m_newFileRowset_sp->end())
{ {
DEBUG_STREAM << "ProtocolManager::hasNextFile() from new list" << endl; DEBUG_STREAM << "ProtocolManager::hasNextNewList() true" << endl;
m_recoveryMode = false;
return true; return true;
} }
else
{
DEBUG_STREAM << "ProtocolManager::hasNextNewList() false" << endl;
return false;
}
}
//==============================================================================
// ProtocolManager::hasNextFailedList()
//==============================================================================
bool ProtocolManager::hasNextFailedList()
{
DEBUG_STREAM << "ProtocolManager::hasNextFailedList()" << endl;
if(m_failedFileRowset_sp && if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end()) m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{ {
DEBUG_STREAM << "ProtocolManager::hasNextFailedList() true" << endl;
return true;
}
else
{
DEBUG_STREAM << "ProtocolManager::hasNextFailedList() false" << endl;
return false;
}
}
//==============================================================================
// ProtocolManager::isRecoveryTimeElapsed()
//==============================================================================
bool ProtocolManager::isRecoveryTimeElapsed()
{
DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed()" << endl;
boost::posix_time::ptime now(boost::posix_time::second_clock::local_time()); boost::posix_time::ptime now(boost::posix_time::second_clock::local_time());
if(m_recoveryModeTime.is_not_a_date_time()) if(m_recoveryModeTime.is_not_a_date_time())
...@@ -91,33 +129,49 @@ bool ProtocolManager::hasNextFile() ...@@ -91,33 +129,49 @@ bool ProtocolManager::hasNextFile()
boost::posix_time::time_duration diff = now - m_recoveryModeTime; boost::posix_time::time_duration diff = now - m_recoveryModeTime;
if(diff.total_seconds() > 30) if(diff.total_seconds() > (int)m_configuration_sp->getRecoveryTime())
{ {
DEBUG_STREAM << "ProtocolManager::hasNextFile() from failed list" << endl; DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() true" << endl;
m_recoveryModeTime = now;
m_recoveryMode = true;
return true; return true;
} }
else else
{ {
DEBUG_STREAM << "ProtocolManager::hasNextFile() " DEBUG_STREAM << "ProtocolManager::isRecoveryTimeElapsed() false" << endl;
<< "wait from failed list" << endl; return false;
} }
} }
DEBUG_STREAM << "ProtocolManager::hasNextFile() lists empty" << endl; //==============================================================================
// ProtocolManager::getRecoveryMode()
//==============================================================================
bool ProtocolManager::getRecoveryMode()
{
DEBUG_STREAM << "ProtocolManager::getRecoveryMode()" << endl;
m_recoveryMode = false; return m_recoveryMode;
return false; }
//==============================================================================
// ProtocolManager::setRecoveryMode()
//==============================================================================
void ProtocolManager::setRecoveryMode(bool recoveryMode)
{
DEBUG_STREAM << "ProtocolManager::setRecoveryMode()" << endl;
m_recoveryMode = recoveryMode;
} }
//============================================================================== //==============================================================================
// ProtocolManager::createRequest() // ProtocolManager::createNewListRequest()
//============================================================================== //==============================================================================
RequestSP ProtocolManager::createRequest() RequestSP ProtocolManager::createNewListRequest()
throw(std::logic_error, std::runtime_error) throw(std::logic_error, std::runtime_error)
{ {
DEBUG_STREAM << "ProtocolManager::createRequest()" << endl; DEBUG_STREAM << "ProtocolManager::createNewListRequest()" << endl;
if(!m_newFileRowset_sp ||
m_newFileRowsetIt == m_newFileRowset_sp->end())
throw std::runtime_error("New list not initialized or empty");
RequestSP request_sp(new Request); RequestSP request_sp(new Request);
...@@ -127,47 +181,60 @@ RequestSP ProtocolManager::createRequest() ...@@ -127,47 +181,60 @@ RequestSP ProtocolManager::createRequest()
request_sp->set_schema(m_configuration_sp->getDatabaseSchema()); request_sp->set_schema(m_configuration_sp->getDatabaseSchema());
request_sp->set_table(m_configuration_sp->getDatabaseTable()); request_sp->set_table(m_configuration_sp->getDatabaseTable());
int fileVersion;
std::string fileName;
if(!m_recoveryMode)
{
if(!m_newFileRowset_sp ||
m_newFileRowsetIt == m_newFileRowset_sp->end())
throw std::runtime_error("New list not initialized or empty");
if(!m_newFileRowsetIt->get<0>()) if(!m_newFileRowsetIt->get<0>())
throw std::invalid_argument("Empty file version found on new list"); throw std::invalid_argument("Empty file version found on new list");
fileVersion = m_newFileRowsetIt->get<0>().get(); int fileVersion = m_newFileRowsetIt->get<0>().get();
if(!m_newFileRowsetIt->get<1>()) if(!m_newFileRowsetIt->get<1>())
throw std::invalid_argument("Empty file name found on new list"); throw std::invalid_argument("Empty file name found on new list");
fileName = m_newFileRowsetIt->get<1>().get(); std::string fileName = m_newFileRowsetIt->get<1>().get();
INFO_STREAM << "ProtocolManager::createRequest() request new file " request_sp->set_file_version(fileVersion);
<< fileName << " version " << fileVersion << endl; request_sp->set_file_name(fileName);
INFO_STREAM << "ProtocolManager::createNewListRequest() file " << fileName
<< " version " << fileVersion << " to " << m_remoteEndpoint << endl;
if(!request_sp->IsInitialized())
throw std::runtime_error("Request not initialized");
return request_sp;
} }
else
//==============================================================================
// ProtocolManager::createFailedListRequest()
//==============================================================================
RequestSP ProtocolManager::createFailedListRequest()
throw(std::logic_error, std::runtime_error)
{ {
DEBUG_STREAM << "ProtocolManager::createFailedListRequest()" << endl;
if(!m_failedFileRowset_sp || if(!m_failedFileRowset_sp ||
m_failedFileRowsetIt == m_failedFileRowset_sp->end()) m_failedFileRowsetIt == m_failedFileRowset_sp->end())
throw std::runtime_error("Failed list not initialized or empty"); throw std::runtime_error("Failed list not initialized or empty");
RequestSP request_sp(new Request);
request_sp->set_username(m_configuration_sp->getDatabaseUsername());
request_sp->set_password(m_configuration_sp->getDatabasePassword());
request_sp->set_schema(m_configuration_sp->getDatabaseSchema());
request_sp->set_table(m_configuration_sp->getDatabaseTable());
if(!m_failedFileRowsetIt->get<0>()) if(!m_failedFileRowsetIt->get<0>())
throw std::invalid_argument("Empty file version found on failed list"); throw std::invalid_argument("Empty file version found on failed list");
fileVersion = m_failedFileRowsetIt->get<0>().get(); int fileVersion = m_failedFileRowsetIt->get<0>().get();
if(!m_failedFileRowsetIt->get<1>()) if(!m_failedFileRowsetIt->get<1>())
throw std::invalid_argument("Empty file name found on failed list"); throw std::invalid_argument("Empty file name found on failed list");
fileName = m_failedFileRowsetIt->get<1>().get(); std::string fileName = m_failedFileRowsetIt->get<1>().get();
INFO_STREAM << "ProtocolManager::createRequest() request failed file "
<< fileName << " version " << fileVersion << endl;
}
request_sp->set_file_version(fileVersion); request_sp->set_file_version(fileVersion);
request_sp->set_file_name(fileName); request_sp->set_file_name(fileName);
INFO_STREAM << "ProtocolManager::createFailedListRequest() file " << fileName
<< " version " << fileVersion << " to " << m_remoteEndpoint << endl;
if(!request_sp->IsInitialized()) if(!request_sp->IsInitialized())
throw std::runtime_error("Request not initialized"); throw std::runtime_error("Request not initialized");
...@@ -202,7 +269,8 @@ FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp) ...@@ -202,7 +269,8 @@ FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp)
boost::uint64_t fileSize = response_sp->file_size(); boost::uint64_t fileSize = response_sp->file_size();
INFO_STREAM << "ProtocolManager::processResponse() transfer file " INFO_STREAM << "ProtocolManager::processResponse() transfer file "
<< fileName << " version " << fileVersion << " size " << fileSize << endl; << fileName << " version " << fileVersion << " size " << fileSize
<< " from " << m_remoteEndpoint << endl;
return FileWrapper::create(m_deviceImpl_p, return FileWrapper::create(m_deviceImpl_p,
m_configuration_sp->getStoragePath(), filePath, m_configuration_sp->getStoragePath(), filePath,
...@@ -219,22 +287,20 @@ FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp) ...@@ -219,22 +287,20 @@ FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp)
} }
//============================================================================== //==============================================================================
// ProtocolManager::setFileTransfered() // ProtocolManager::setNewFileTransfered()
//============================================================================== //==============================================================================
void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp) void ProtocolManager::setNewFileTransfered(FileWrapper::SP fileWrapper_sp)
throw(std::logic_error, std::runtime_error) throw(std::logic_error, std::runtime_error)
{ {
DEBUG_STREAM << "ProtocolManager::setFileTransfered()" << endl; DEBUG_STREAM << "ProtocolManager::setNewFileTransfered()" << endl;
std::string storagePath = fileWrapper_sp->getStoragePath();
std::string filePath = fileWrapper_sp->getFilePath();
if(!m_recoveryMode)
{
if(!m_newFileRowset_sp || if(!m_newFileRowset_sp ||
m_newFileRowsetIt == m_newFileRowset_sp->end()) m_newFileRowsetIt == m_newFileRowset_sp->end())
throw std::runtime_error("New list not initialized or empty"); throw std::runtime_error("New list not initialized or empty");
std::string storagePath = fileWrapper_sp->getStoragePath();
std::string filePath = fileWrapper_sp->getFilePath();
if(!m_newFileRowsetIt->get<0>()) if(!m_newFileRowsetIt->get<0>())
throw std::invalid_argument("Empty file version found on new list"); throw std::invalid_argument("Empty file version found on new list");
int fileVersion = m_newFileRowsetIt->get<0>().get(); int fileVersion = m_newFileRowsetIt->get<0>().get();
...@@ -247,7 +313,7 @@ void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp) ...@@ -247,7 +313,7 @@ void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp)
throw std::invalid_argument("Empty update time found on new list"); throw std::invalid_argument("Empty update time found on new list");
std::tm currentTm = m_newFileRowsetIt->get<2>().get(); std::tm currentTm = m_newFileRowsetIt->get<2>().get();
INFO_STREAM << "ProtocolManager::setFileTransfered() file " INFO_STREAM << "ProtocolManager::setNewFileTransfered() file "
<< fileName << " version " << fileVersion << " transfered" << endl; << fileName << " version " << fileVersion << " transfered" << endl;
boost::posix_time::ptime currentPtime = boost::posix_time::ptime currentPtime =
...@@ -278,12 +344,22 @@ void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp) ...@@ -278,12 +344,22 @@ void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp)
auxTransaction_sp->commit(); auxTransaction_sp->commit();
mainTransaction_sp->commit(); mainTransaction_sp->commit();
} }
else
//==============================================================================
// ProtocolManager::setFailedFileTransfered()
//==============================================================================
void ProtocolManager::setFailedFileTransfered(FileWrapper::SP fileWrapper_sp)
throw(std::logic_error, std::runtime_error)
{ {
DEBUG_STREAM << "ProtocolManager::setFailedFileTransfered()" << endl;
if(!m_failedFileRowset_sp || if(!m_failedFileRowset_sp ||
m_failedFileRowsetIt == m_failedFileRowset_sp->end()) m_failedFileRowsetIt == m_failedFileRowset_sp->end())
throw std::runtime_error("Failed list not initialized or empty"); throw std::runtime_error("Failed list not initialized or empty");
std::string storagePath = fileWrapper_sp->getStoragePath();
std::string filePath = fileWrapper_sp->getFilePath();
if(!m_failedFileRowsetIt->get<0>()) if(!m_failedFileRowsetIt->get<0>())
throw std::invalid_argument("Empty file version found on failed list"); throw std::invalid_argument("Empty file version found on failed list");
int fileVersion = m_failedFileRowsetIt->get<0>().get(); int fileVersion = m_failedFileRowsetIt->get<0>().get();
...@@ -305,17 +381,15 @@ void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp) ...@@ -305,17 +381,15 @@ void ProtocolManager::setFileTransfered(FileWrapper::SP fileWrapper_sp)
auxTransaction_sp->commit(); auxTransaction_sp->commit();
mainTransaction_sp->commit(); mainTransaction_sp->commit();
} }
}
//============================================================================== //==============================================================================
// ProtocolManager::markAsFailed() // ProtocolManager::setNewFileFailed()
//============================================================================== //==============================================================================
void ProtocolManager::setFileFailed() throw(std::logic_error, std::runtime_error) void ProtocolManager::setNewFileFailed()
throw(std::logic_error, std::runtime_error)
{ {
DEBUG_STREAM << "ProtocolManager::markAsFailed()" << endl; DEBUG_STREAM << "ProtocolManager::setNewFileFailed()" << endl;
if(!m_recoveryMode)
{
if(!m_newFileRowset_sp || if(!m_newFileRowset_sp ||
m_newFileRowsetIt == m_newFileRowset_sp->end()) m_newFileRowsetIt == m_newFileRowset_sp->end())
throw std::runtime_error("New list not initialized or empty"); throw std::runtime_error("New list not initialized or empty");
...@@ -363,10 +437,16 @@ void ProtocolManager::setFileFailed() throw(std::logic_error, std::runtime_error ...@@ -363,10 +437,16 @@ void ProtocolManager::setFileFailed() throw(std::logic_error, std::runtime_error
auxTransaction_sp->commit(); auxTransaction_sp->commit();
mainTransaction_sp->commit(); mainTransaction_sp->commit();
} }
else
//==============================================================================
// ProtocolManager::setFailedFileFailed()
//==============================================================================
void ProtocolManager::setFailedFileFailed()
throw(std::logic_error, std::runtime_error)
{ {
DEBUG_STREAM << "ProtocolManager::setFailedFileFailed()" << endl;
++m_failedFileRowsetIt; ++m_failedFileRowsetIt;
} }
}
} //namespace } //namespace
...@@ -57,16 +57,35 @@ public: ...@@ -57,16 +57,35 @@ public:
virtual void setRemoteEndpoint(std::string); virtual void setRemoteEndpoint(std::string);
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// [Public] Files list methods // [Public] Files lists update methods
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
virtual void updateFileLists() throw(std::runtime_error); virtual void updateNewList() throw(std::runtime_error);
virtual bool hasNextFile(); virtual void updateFailedList() throw(std::runtime_error);
//------------------------------------------------------------------------------
// [Public] Files lists handling methods
//------------------------------------------------------------------------------
virtual bool hasNextNewList();
virtual bool hasNextFailedList();
virtual bool isRecoveryTimeElapsed();
//------------------------------------------------------------------------------
// [Public] Recovery mode getter and setter methods
//------------------------------------------------------------------------------
virtual bool getRecoveryMode();
virtual void setRecoveryMode(bool);
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// [Public] Request response methods // [Public] Request response methods
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
RequestSP createRequest() RequestSP createNewListRequest()
throw(std::logic_error, std::runtime_error);
RequestSP createFailedListRequest()
throw(std::logic_error, std::runtime_error); throw(std::logic_error, std::runtime_error);
FileWrapper::SP processResponse(ResponseSP) FileWrapper::SP processResponse(ResponseSP)
...@@ -75,10 +94,16 @@ public: ...@@ -75,10 +94,16 @@ public:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// [Public] Files status methods // [Public] Files status methods
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
virtual void setFileTransfered(FileWrapper::SP) virtual void setNewFileTransfered(FileWrapper::SP)
throw(std::logic_error, std::runtime_error);
virtual void setFailedFileTransfered(FileWrapper::SP)
throw(std::logic_error, std::runtime_error);
virtual void setNewFileFailed()
throw(std::logic_error, std::runtime_error); throw(std::logic_error, std::runtime_error);
virtual void setFileFailed() virtual void setFailedFileFailed()
throw(std::logic_error, std::runtime_error); throw(std::logic_error, std::runtime_error);
protected: protected:
......
...@@ -174,7 +174,12 @@ void SSLClient::startWriteRequest() ...@@ -174,7 +174,12 @@ void SSLClient::startWriteRequest()
try try
{ {
RequestSP request_sp = m_protocolManager_sp->createRequest(); RequestSP request_sp;
if(!m_protocolManager_sp->getRecoveryMode())
request_sp = m_protocolManager_sp->createNewListRequest();
else
request_sp = m_protocolManager_sp->createFailedListRequest();
boost::uint32_t bodySize = request_sp->ByteSize(); boost::uint32_t bodySize = request_sp->ByteSize();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment