Skip to content
Snippets Groups Projects
Commit 675a9532 authored by Marco De Marco's avatar Marco De Marco
Browse files

Logic enhanced, not yet completed

parent 7b076faf
No related branches found
No related tags found
No related merge requests found
......@@ -215,7 +215,7 @@ void Client::handleUpdateLists()
{
DEBUG_STREAM << "Client::handleUpdateLists()" << endl;
if(readState() != Tango::ALARM && m_protocolManager_sp->hasNewFile())
if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextFile())
{
startResolve();
}
......@@ -333,37 +333,39 @@ void Client::handleReadResponseBody(const boost::system::error_code& errorCode)
response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE],
m_readBuff.size() - HEADER_SIZE);
if(response_sp->state() == Response::REQUEST_ACCEPTED)
{
startReadData(m_protocolManager_sp->processResponse(response_sp));
}
else
{
ERROR_STREAM << "Client::handleResponse() "
<< response_sp->status() << endl;
FileWrapper::SP fileWrapper_sp =
m_protocolManager_sp->processResponse(response_sp);
writeState(Tango::ALARM);
writeStatus(response_sp->status());
}
startReadData(fileWrapper_sp);
}
catch(std::logic_error& ec)
{
WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl;
//TODO: mark file as failed and try with next next
}
catch(std::exception& ec)
catch(std::runtime_error& ec)
{
ERROR_STREAM << "Client::handleResponse() " << ec.what() << endl;
ERROR_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl;
writeState(Tango::ALARM);
writeStatus(ec.what());
//TODO: stop and set ALARM
}
catch(...)
{
ERROR_STREAM << "Client::handleResponse() Unknown error" << endl;
ERROR_STREAM << "Client::handleReadResponseBody() Unknown error" << endl;
writeState(Tango::ALARM);
writeStatus("Unknown error");
//TODO: shit storm happens... stop and set ALARM
}
}
else
{
ERROR_STREAM << "Client::handleResponse() " << errorCode.message() << endl;
ERROR_STREAM << "Client::handleReadResponseBody() " << errorCode.message() << endl;
writeState(Tango::ALARM);
writeStatus(errorCode.message());
......@@ -391,12 +393,16 @@ void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvByte
{
INFO_STREAM << "Client::handleReadData() transfer complete " << endl;
if(m_protocolManager_sp->hasNewFile())
if(m_protocolManager_sp->hasNextFile())
{
m_protocolManager_sp->nextFile();
startWriteRequest();
}
else
{
startUpdateLists();
}
}
}
else if(errorCode == boost::asio::error::eof)
......
......@@ -159,10 +159,10 @@ protected:
//Thread for IO service run scoped pointer
boost::scoped_ptr<boost::thread> m_thread_sp;
//First connection timeout
//Connection write read timeout
boost::asio::deadline_timer m_resetConnectionTimer;
//Request response timeout
//File list update time
boost::asio::deadline_timer m_listsUpdateTimer;
//Header size on binary stream
......@@ -189,7 +189,7 @@ protected:
//Read buffer size
const boost::uint64_t BUFFER_SIZE = 40960;
//Buffer for file data read from stream (TODO: unify two buffers)
//Buffer for file data read from stream
std::vector<char> m_fileBuff;
};
......
......@@ -121,6 +121,27 @@ DBManager::NewFileRowsetSP DBManager::retrieveNewFiles(boost::posix_time::ptime
return newFileRowset_sp;
}
//==============================================================================
// DBManager::updateNewFilePath()
//==============================================================================
void DBManager::updateNewFilePath(std::string storagePath, std::string filePath,
int fileVersion, std::string fileName) throw(soci::soci_error)
{
DEBUG_STREAM << "DBManager::updateNewFilePath()" << endl;
boost::mutex::scoped_lock lock(m_sessionMutex);
if(m_mainSession_sp->get_backend() == NULL)
m_mainSession_sp->reconnect();
*m_mainSession_sp << "insert into " << m_configuration_sp->getDatabaseSchema()
<< "." << m_configuration_sp->getDatabaseTable() << " (storage_path, "
<< "file_path) values (:storagePath, :filePath) where file_version = "
<< ":fileVersion and file_name like :FileName",
soci::use(storagePath, "storagePath"), soci::use(filePath, "filePath"),
soci::use(fileVersion, "fileVersion"), soci::use(fileName, "fileName");
}
//==============================================================================
// DBManager::retrieveLastTimestamp()
//==============================================================================
......@@ -136,7 +157,6 @@ boost::posix_time::ptime DBManager::retrieveLastTimestamp()
std::tm tm_time;
//FIXME: max return null, otherwise coalesce does not work
*m_auxSession_sp << "select coalesce(max(last_timestamp),'1970-01-01 00:00:00')"
<< " from "<< m_configuration_sp->getAuxDatabaseSchema()
<< "." << m_configuration_sp->getAuxDatabaseTimestampTable()
......
......@@ -80,6 +80,9 @@ public:
virtual NewFileRowsetSP retrieveNewFiles(boost::posix_time::ptime)
throw(soci::soci_error);
virtual void updateNewFilePath(std::string, std::string, int, std::string)
throw(soci::soci_error);
//------------------------------------------------------------------------------
// [Public] Timestamp methods
//------------------------------------------------------------------------------
......
......@@ -141,7 +141,14 @@ void PlainClient::startWriteRequest()
boost::bind(&PlainClient::handleWriteRequest, this,
boost::asio::placeholders::error));
}
catch(std::exception& ec)
catch(std::logic_error& ec)
{
WARN_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl;
writeState(Tango::ALARM);
writeStatus(ec.what());
}
catch(std::runtime_error& ec)
{
ERROR_STREAM << "PlainClient::startWriteRequest() " << ec.what() << endl;
......
......@@ -17,7 +17,7 @@ ProtocolManager::ProtocolManager(Tango::DeviceImpl* deviceImpl_p,
}
//==============================================================================
// ProtocolManager::ProtocolManager()
// ProtocolManager::~ProtocolManager()
//==============================================================================
ProtocolManager::~ProtocolManager()
{
......@@ -25,7 +25,7 @@ ProtocolManager::~ProtocolManager()
}
//==============================================================================
// ProtocolManager::ProtocolManager()
// ProtocolManager::create()
//==============================================================================
ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
......@@ -37,7 +37,7 @@ ProtocolManager::SP ProtocolManager::create(Tango::DeviceImpl* deviceImpl_p,
}
//==============================================================================
// ProtocolManager::ProtocolManager()
// ProtocolManager::setRemoteEndpoint()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
......@@ -47,7 +47,7 @@ void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
}
//==============================================================================
// ProtocolManager::updateNewFileList()
// ProtocolManager::updateFileLists()
//==============================================================================
void ProtocolManager::updateFileLists() throw(soci::soci_error)
{
......@@ -69,16 +69,16 @@ void ProtocolManager::updateFileLists() throw(soci::soci_error)
}
//==============================================================================
// ProtocolManager::isNewFileListEmpty()
// ProtocolManager::hasNextFile()
//==============================================================================
bool ProtocolManager::hasNewFile()
bool ProtocolManager::hasNextFile()
{
DEBUG_STREAM << "ProtocolManager::isNewFileListEmpty()" << endl;
DEBUG_STREAM << "ProtocolManager::hasNextFile()" << endl;
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
DEBUG_STREAM << "NEW FILE MODE" << endl;
DEBUG_STREAM << "ProtocolManager::hasNextFile() from new list" << endl;
m_recoveryMode = false;
return true;
......@@ -86,13 +86,18 @@ bool ProtocolManager::hasNewFile()
else if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
DEBUG_STREAM << "RECOVERY MODE" << endl;
DEBUG_STREAM << "ProtocolManager::hasNextFile() from failed list" << endl;
m_recoveryMode = true;
return true;
}
else
{
DEBUG_STREAM << "ProtocolManager::hasNextFile() lists empty" << endl;
return false;
m_recoveryMode = false;
return false;
}
}
//==============================================================================
......@@ -107,7 +112,7 @@ void ProtocolManager::nextFile()
if(m_newFileRowset_sp &&
m_newFileRowsetIt != m_newFileRowset_sp->end())
{
DEBUG_STREAM << "NEXT NEW FILE MODE" << endl;
DEBUG_STREAM << "ProtocolManager::nextFile() from new list" << endl;
++m_newFileRowsetIt;
}
......@@ -117,7 +122,7 @@ void ProtocolManager::nextFile()
if(m_failedFileRowset_sp &&
m_failedFileRowsetIt != m_failedFileRowset_sp->end())
{
DEBUG_STREAM << "NEXT NEW FILE MODE" << endl;
DEBUG_STREAM << "ProtocolManager::nextFile() from failed list" << endl;
++m_failedFileRowsetIt;
}
......@@ -125,14 +130,12 @@ void ProtocolManager::nextFile()
}
//==============================================================================
// ProtocolManager::createNewFileRequest()
// ProtocolManager::createRequest()
//==============================================================================
RequestSP ProtocolManager::createRequest() throw(std::runtime_error)
RequestSP ProtocolManager::createRequest()
throw(std::logic_error, std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::createNewFileRequest()" << endl;
if(!hasNewFile())
throw std::runtime_error("New file list is empty");
DEBUG_STREAM << "ProtocolManager::createRequest()" << endl;
RequestSP request_sp(new Request);
......@@ -147,28 +150,36 @@ RequestSP ProtocolManager::createRequest() throw(std::runtime_error)
if(!m_recoveryMode)
{
if(!m_newFileRowset_sp ||
m_newFileRowsetIt == m_newFileRowset_sp->end())
throw std::runtime_error("New list not initialized or empty");
if(!m_newFileRowsetIt->get<0>())
throw std::runtime_error("Empty file version found");
throw std::invalid_argument("Empty file version found on new list");
fileVersion = m_newFileRowsetIt->get<0>().get();
if(!m_newFileRowsetIt->get<1>())
throw std::runtime_error("Empty file name found");
throw std::invalid_argument("Empty file name found on new list");
fileName = m_newFileRowsetIt->get<1>().get();
INFO_STREAM << "ProtocolManager::createTransfer() request new file "
INFO_STREAM << "ProtocolManager::createRequest() request new file "
<< fileName << " version " << fileVersion << endl;
}
else
{
if(!m_failedFileRowset_sp ||
m_failedFileRowsetIt == m_failedFileRowset_sp->end())
throw std::runtime_error("Failed list not initialized or empty");
if(!m_failedFileRowsetIt->get<0>())
throw std::runtime_error("Empty file version found");
throw std::invalid_argument("Empty file version found on failed list");
fileVersion = m_failedFileRowsetIt->get<0>().get();
if(!m_failedFileRowsetIt->get<1>())
throw std::runtime_error("Empty file name found");
throw std::invalid_argument("Empty file name found on failed list");
fileName = m_failedFileRowsetIt->get<1>().get();
INFO_STREAM << "ProtocolManager::createTransfer() request recovery file "
INFO_STREAM << "ProtocolManager::createRequest() request failed file "
<< fileName << " version " << fileVersion << endl;
}
......@@ -185,47 +196,82 @@ RequestSP ProtocolManager::createRequest() throw(std::runtime_error)
// ProtocolManager::processResponse()
//==============================================================================
FileWrapper::SP ProtocolManager::processResponse(ResponseSP response_sp)
throw(std::logic_error, std::runtime_error)
{
DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;
if(!response_sp->IsInitialized())
throw std::runtime_error("Response not initialized");
std::string filePath = response_sp->file_path();
if(filePath.empty())
throw std::invalid_argument("Empty file path received");
int fileVersion = response_sp->file_version();
std::string fileName = response_sp->file_name();
if(fileName.empty())
throw std::invalid_argument("Empty file path received");
boost::uint64_t fileSize = response_sp->file_size();
INFO_STREAM << "ProtocolManager::processResponse() "
<< " transfer file " << fileName << " version "
<< fileVersion << " size " << fileSize << endl;
boost::filesystem::path destPath = composePath(
m_configuration_sp->getStoragePath(), filePath, fileVersion);
if(!boost::filesystem::exists(destPath))
boost::filesystem::create_directories(destPath);
if(!boost::filesystem::is_directory(destPath))
throw std::invalid_argument("Destination path \'"
+ destPath.string() + "\' is not a directory" );
FileWrapper::SP fileWrapper_sp;
if(response_sp->state() == Response::REQUEST_ACCEPTED)
{
INFO_STREAM << "ProtocolManager::processResponse() transfer file "
<< fileName << " version " << fileVersion << " size " << fileSize << endl;
// boost::filesystem::path path = composePath(m_configuration_sp->getStoragePath(),
// filePath, fileVersion, fileName);
destPath /= fileName;
boost::filesystem::path path(fileName);
fileWrapper_sp = FileWrapper::create(m_deviceImpl_p, destPath, fileSize);
}
else if(response_sp->state() == Response::METADATA_NOT_FOUND ||
response_sp->state() == Response::FILE_NOT_DOWNLOADED ||
response_sp->state() == Response::FILE_NOT_FOUND)
{
throw std::logic_error(response_sp->status());
}
else
{
throw std::runtime_error(response_sp->status());
}
return FileWrapper::create(m_deviceImpl_p, path, fileSize);
return fileWrapper_sp;
}
//==============================================================================
// ProtocolManager::composePath()
//==============================================================================
boost::filesystem::path ProtocolManager::composePath(std::string storagePath,
std::string filePath, int fileVersion, std::string fileName)
std::string filePath, int fileVersion)
{
DEBUG_STREAM << "ProtocolManager::composePath()" << endl;
boost::filesystem::path absolutePath(storagePath);
boost::filesystem::path path(storagePath);
absolutePath /= filePath;
path /= filePath;
std::stringstream fileStream;
fileStream << "/" << fileVersion << "/" << fileName;
fileStream << "/" << fileVersion;
absolutePath /= fileStream.str();
path /= fileStream.str();
DEBUG_STREAM << "ProtocolManager::composePath() "
<< absolutePath.string() << endl;
DEBUG_STREAM << "ProtocolManager::composePath() \'" << path << "\'" << endl;
return absolutePath;
return path;
}
} //namespace
......@@ -61,23 +61,26 @@ public:
//------------------------------------------------------------------------------
virtual void updateFileLists() throw(soci::soci_error);
virtual bool hasNewFile();
virtual bool hasNextFile();
virtual void nextFile();
//------------------------------------------------------------------------------
// [Public] Request response methods
//------------------------------------------------------------------------------
RequestSP createRequest() throw(std::runtime_error);
RequestSP createRequest()
throw(std::logic_error, std::runtime_error);
FileWrapper::SP processResponse(ResponseSP);
FileWrapper::SP processResponse(ResponseSP)
throw(std::logic_error, std::runtime_error);
protected:
//------------------------------------------------------------------------------
// [Protected] File path method
//------------------------------------------------------------------------------
virtual boost::filesystem::path composePath(std::string, std::string,
int, std::string);
virtual boost::filesystem::path composePath(std::string, std::string, int);
//------------------------------------------------------------------------------
// [Protected] Class variables
......@@ -97,6 +100,9 @@ protected:
//New file list current timestamp index
boost::posix_time::ptime m_currentTimestamp;
//Processing file from recovery list
bool m_recoveryMode;
//New file list shared pointer
DBManager::NewFileRowsetSP m_newFileRowset_sp;
......@@ -108,9 +114,6 @@ protected:
//Failed file list iterator
DBManager::FailedFileRowset::const_iterator m_failedFileRowsetIt;
//Processing file from recovery list
bool m_recoveryMode;
};
} //End of namespace
......
......@@ -193,7 +193,14 @@ void SSLClient::startWriteRequest()
boost::bind(&SSLClient::handleWriteRequest, this,
boost::asio::placeholders::error));
}
catch(std::exception& ec)
catch(std::logic_error& ec)
{
ERROR_STREAM << "SSLClient::startWriteRequest() " << ec.what() << endl;
writeState(Tango::ALARM);
writeStatus(ec.what());
}
catch(std::runtime_error& ec)
{
ERROR_STREAM << "SSLClient::startWriteRequest() " << ec.what() << endl;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment