Select Git revision
ProtocolManager.cpp
-
Andrea Bignamini authored
The remoteSchema and remoteTable device properties have been added. The schema and table names on the local machine can differ from the names on the remote machine.
Andrea Bignamini authored: The remoteSchema and remoteTable device properties have been added. The schema and table names on the local machine can differ from the names on the remote machine.
ProtocolManager.cpp 12.24 KiB
#include <ProtocolManager.h>
#include <MetadataImporter.h>
#include <boost/date_time.hpp>
namespace MetadataImporter_ns
{
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
// Builds a protocol manager bound to the given importer device, configuration
// and database manager. The protocol starts unauthorised and unvalidated, with
// the "more data" flag raised.
ProtocolManager::ProtocolManager(MetadataImporter* metadataImporter_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
    Tango::LogAdapter(metadataImporter_p),
    m_metadataImporter_p(metadataImporter_p),
    m_configuration_sp(configuration_sp),
    m_dBManager_sp(dBManager_sp)
{
    DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;

    // Same initial state that resetProtocolStatus() restores
    m_hasMoreData = true;
    m_isValidated = false;
    m_isAuthorised = false;
}
//==============================================================================
// ProtocolManager::~ProtocolManager()
//==============================================================================
// Only traces the destruction; no explicit cleanup is performed here.
ProtocolManager::~ProtocolManager()
{
    DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl;
}
//==============================================================================
// ProtocolManager::create()
//==============================================================================
// Factory: allocates a ProtocolManager and wraps it in the SP smart pointer
// type, registering ProtocolManager::Deleter as its deleter.
ProtocolManager::SP ProtocolManager::create(MetadataImporter* metadataImporter_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
    return ProtocolManager::SP(
        new ProtocolManager(metadataImporter_p, configuration_sp, dBManager_sp),
        ProtocolManager::Deleter());
}
//==============================================================================
// ProtocolManager::setRemoteEndpoint()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;
m_remoteEndpoint = remoteEndpoint;
}
//==============================================================================
// ProtocolManager::waitBeforeRequest()
//==============================================================================
// Returns true when the last metadata response carried no rows (the
// "more data" flag is down), false otherwise.
bool ProtocolManager::waitBeforeRequest()
{
    DEBUG_STREAM << "ProtocolManager::waitBeforeRequest()" << endl;

    const bool mustWait = !m_hasMoreData;
    return mustWait;
}
//==============================================================================
// ProtocolManager::resetProtocolStatus()
//==============================================================================
// Puts the protocol back in its initial state: not authorised, not validated,
// and with the "more data" flag raised.
void ProtocolManager::resetProtocolStatus()
{
    DEBUG_STREAM << "ProtocolManager::resetProtocolStatus()" << endl;

    m_hasMoreData = true;
    m_isValidated = false;
    m_isAuthorised = false;
}
//==============================================================================
// ProtocolManager::createRequest()
//==============================================================================
// Builds the next request according to the protocol state: authorisation
// first, then validation, then metadata.
// Throws std::runtime_error when the built message is not fully initialised;
// exceptions raised by the specific builders propagate.
RequestSP ProtocolManager::createRequest()
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::createRequest()" << endl;

    RequestSP nextRequest_sp;

    if(m_isAuthorised && m_isValidated)
    {
        // Handshake complete: ask for metadata
        nextRequest_sp = createMetadata();
    }
    else if(m_isAuthorised)
    {
        // Authorised but the table layout has not been validated yet
        nextRequest_sp = createValidation();
    }
    else
    {
        nextRequest_sp = createAuthroisation();
    }

    if(nextRequest_sp->IsInitialized())
        return nextRequest_sp;

    throw std::runtime_error("Not initialized request!");
}
//==============================================================================
// ProtocolManager::processResponse()
//==============================================================================
// Dispatches a server response to the handler matching its type.
// Throws std::runtime_error when the response pointer is empty, when the
// message is not fully initialised, or when its type is none of
// AUTHORIZATION / VALIDATION / METADATA; handler exceptions propagate.
void ProtocolManager::processResponse(ResponseSP response_sp)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;

    // An empty smart pointer must not be dereferenced: report it as an error
    if(!response_sp)
        throw std::runtime_error("Null response!");

    if(!response_sp->IsInitialized())
        throw std::runtime_error("Not initialized response!");

    switch(response_sp->type())
    {
        case Response::AUTHORIZATION:
        {
            processAuthroisation(response_sp);
            break;
        }
        case Response::VALIDATION:
        {
            processValidation(response_sp);
            break;
        }
        case Response::METADATA:
        {
            processMetadata(response_sp);
            break;
        }
        default:
            throw std::runtime_error("Unknown response type");
    }
}
//==============================================================================
// ProtocolManager::createAuthroisation()
//==============================================================================
// Builds an AUTHORIZATION request carrying the remote username and password
// read from the configuration.
// The password is deliberately never written to the log, not even in
// VERBOSE_DEBUG builds, so credentials do not end up in log files.
RequestSP ProtocolManager::createAuthroisation() throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createAuthroisation()" << endl;

    RequestSP request_sp(new Request);
    request_sp->set_type(Request::AUTHORIZATION);

    std::string user = m_configuration_sp->getRemoteUsername();
    std::string password = m_configuration_sp->getRemotePassword();

#ifdef VERBOSE_DEBUG
    // Verbose builds additionally log the username, but not the password
    INFO_STREAM << "ProtocolManager::createAuthroisation() Send username "
        << user << " to " << m_remoteEndpoint << endl;
#else
    INFO_STREAM << "ProtocolManager::createAuthroisation() Send to "
        << m_remoteEndpoint << endl;
#endif

    Request::Authorization* authorization = request_sp->mutable_authorization();
    authorization->set_username(user);
    authorization->set_password(password);

    return request_sp;
}
//==============================================================================
// ProtocolManager::createValidation()
//==============================================================================
// Builds a VALIDATION request: the remote schema/table names plus one column
// entry (name, type, nullable) per row returned by
// DBManager::retrieveInformation() for the local schema/table.
// Throws std::runtime_error when any of the three column fields is empty.
RequestSP ProtocolManager::createValidation() throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createValidation()" << endl;

    RequestSP request_sp(new Request);
    request_sp->set_type(Request::VALIDATION);

    std::string localSchema = m_configuration_sp->getDatabaseSchema();
    std::string localTable = m_configuration_sp->getDatabaseTable();
    std::string remoteSchema = m_configuration_sp->getRemoteSchema();
    std::string remoteTable = m_configuration_sp->getRemoteTable();

    INFO_STREAM << "ProtocolManager::createValidation() Send local schema "
        << localSchema << " table " << localTable << " to " << m_remoteEndpoint << endl;

    // The request is addressed with the remote names ...
    Request::Validation* validation = request_sp->mutable_validation();
    validation->set_schema(remoteSchema);
    validation->set_table(remoteTable);

    // ... while the column description is read from the local table
    DBManager::InformationList columns =
        m_dBManager_sp->retrieveInformation(localSchema, localTable);

    DBManager::InformationList::const_iterator it = columns.begin();
    while(it != columns.end())
    {
        Request::Validation::Column* column = validation->add_columns();

        // Field 0: column name
        if(!it->get<0>())
            throw std::runtime_error("Empty column name in information schema");
        std::string name = it->get<0>().get();
        column->set_name(name);

        // Field 1: column type
        if(!it->get<1>())
            throw std::runtime_error("Empty column type in information schema");
        std::string type = it->get<1>().get();
        column->set_type(type);

        // Field 2: nullable flag, forwarded as text
        if(!it->get<2>())
            throw std::runtime_error("Empty column nullable in information schema");
        std::string nullable = it->get<2>().get();
        column->set_nullable(nullable);

#ifdef VERBOSE_DEBUG
        INFO_STREAM << "ProtocolManager::createValidation() " << name
            << " " << type << " " << nullable << endl;
#endif

        ++it;
    }

    return request_sp;
}
//==============================================================================
// ProtocolManager::createMetadata()
//==============================================================================
// Builds a METADATA request carrying the last timestamp found in the local
// schema/table, converted to time_t with mktime().
// NOTE(review): mktime() interprets the broken-down time as local time;
// confirm that this matches what retrieveLastTimestamp() returns.
RequestSP ProtocolManager::createMetadata()
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::createMetadata()" << endl;

    RequestSP request_sp(new Request);
    request_sp->set_type(Request::METADATA);

    std::string localSchema = m_configuration_sp->getDatabaseSchema();
    std::string localTable = m_configuration_sp->getDatabaseTable();
    std::string remoteSchema = m_configuration_sp->getRemoteSchema();
    std::string remoteTable = m_configuration_sp->getRemoteTable();

    // The timestamp comes from the local table; the remote names are only logged
    std::tm lastTm = m_dBManager_sp->retrieveLastTimestamp(localSchema, localTable);

    // Converted for logging before mktime() gets a chance to touch lastTm
    boost::posix_time::ptime lastPtime =
        boost::posix_time::ptime_from_tm(lastTm);

    INFO_STREAM << "ProtocolManager::createMetadata() Send schema "
        << remoteSchema << " table " << remoteTable << " timestamp "
        << boost::posix_time::to_simple_string(lastPtime)
        << " to " << m_remoteEndpoint << endl;

    Request::Metadata* metadata = request_sp->mutable_metadata();
    metadata->set_timestamp(mktime(&lastTm));

    return request_sp;
}
//==============================================================================
// ProtocolManager::processAuthroisation()
//==============================================================================
// Handles an AUTHORIZATION response: marks the protocol as authorised when the
// state is ACCEPTED, otherwise logs the status and throws std::runtime_error
// carrying that status text.
void ProtocolManager::processAuthroisation(ResponseSP response_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::processAuthroisation()" << endl;

    const Response::Authorization& reply = response_sp->authorization();

    // Any state other than ACCEPTED is reported as a rejection
    if(reply.state() != Response::Authorization::ACCEPTED)
    {
        ERROR_STREAM << "ProtocolManager::processAuthroisation() State REJECTED "
            << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

        throw std::runtime_error(reply.status());
    }

    INFO_STREAM << "ProtocolManager::processAuthroisation() State ACCEPTED "
        << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

    m_isAuthorised = true;
}
//==============================================================================
// ProtocolManager::processValidation()
//==============================================================================
// Handles a VALIDATION response: marks the protocol as validated when the
// state is ACCEPTED, otherwise logs the status and throws std::runtime_error
// carrying that status text.
void ProtocolManager::processValidation(ResponseSP response_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::processValidation()" << endl;

    const Response::Validation& reply = response_sp->validation();

    // Any state other than ACCEPTED is reported as a rejection
    if(reply.state() != Response::Validation::ACCEPTED)
    {
        ERROR_STREAM << "ProtocolManager::processValidation() State REJECTED "
            << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

        throw std::runtime_error(reply.status());
    }

    INFO_STREAM << "ProtocolManager::processValidation() State ACCEPTED "
        << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

    m_isValidated = true;
}
//==============================================================================
// ProtocolManager::processMetadata()
//==============================================================================
// Handles a METADATA response. When accepted, the received rows (if any) are
// persisted into the local schema/table, the "more data" flag is set according
// to whether rows were received, and the importer tuple counter is increased
// by the row count. When rejected, the status is logged and thrown as
// std::runtime_error.
void ProtocolManager::processMetadata(ResponseSP response_sp)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::processMetadata()" << endl;

    const Response::Metadata& reply = response_sp->metadata();

    // Any state other than ACCEPTED is reported as a rejection
    if(reply.state() != Response::Metadata::ACCEPTED)
    {
        ERROR_STREAM << "ProtocolManager::processMetadata() State REJECTED "
            << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

        throw std::runtime_error(reply.status());
    }

    std::string localSchema = m_configuration_sp->getDatabaseSchema();
    std::string localTable = m_configuration_sp->getDatabaseTable();
    std::string remoteSchema = m_configuration_sp->getRemoteSchema();
    std::string remoteTable = m_configuration_sp->getRemoteTable();

    INFO_STREAM << "ProtocolManager::processMetadata() State ACCEPTED "
        << "status " << reply.status() << " schema " << remoteSchema
        << " table " << remoteTable << " from " << m_remoteEndpoint << endl;

    const int receivedRows = reply.rows_size();
    const bool gotRows = (receivedRows != 0);

    if(gotRows)
    {
        // Rows are stored under the local names, not the remote ones
        m_dBManager_sp->persistMetadata(localSchema, localTable, reply);
    }

    // Updated only after a successful persist, so a failure leaves it untouched
    m_hasMoreData = gotRows;

    m_metadataImporter_p->incrementTupleCounter(receivedRows);
}
} //namespace