Select Git revision
XmlConfig.cpp
-
Andrea Zoli authoredAndrea Zoli authored
ProtocolManager.cpp 12.24 KiB
#include <ProtocolManager.h>
#include <MetadataImporter.h>
#include <boost/date_time.hpp>
namespace MetadataImporter_ns
{
//==============================================================================
// ProtocolManager::ProtocolManager()
//==============================================================================
// Keeps the metadata importer pointer (also handed to Tango::LogAdapter), the
// configuration and the database manager, then puts the protocol flags in
// the same state that resetProtocolStatus() restores.
ProtocolManager::ProtocolManager(MetadataImporter* metadataImporter_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
    Tango::LogAdapter(metadataImporter_p),
    m_metadataImporter_p(metadataImporter_p),
    m_configuration_sp(configuration_sp),
    m_dBManager_sp(dBManager_sp)
{
    DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;

    // Not authorised, not validated, and more data is assumed to be pending.
    m_hasMoreData = true;
    m_isValidated = false;
    m_isAuthorised = false;
}
//==============================================================================
// ProtocolManager::~ProtocolManager()
//==============================================================================
// Destructor: only traces its own invocation on the debug stream.
ProtocolManager::~ProtocolManager()
{
    DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl;
}
//==============================================================================
// ProtocolManager::create()
//==============================================================================
// Factory: allocates a ProtocolManager and returns it wrapped in a
// ProtocolManager::SP that releases it through ProtocolManager::Deleter.
ProtocolManager::SP ProtocolManager::create(MetadataImporter* metadataImporter_p,
    Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
{
    return ProtocolManager::SP(
        new ProtocolManager(metadataImporter_p, configuration_sp, dBManager_sp),
        ProtocolManager::Deleter());
}
//==============================================================================
// ProtocolManager::setRemoteEndpoint()
//==============================================================================
void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
{
DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;
m_remoteEndpoint = remoteEndpoint;
}
//==============================================================================
// ProtocolManager::waitBeforeRequest()
//==============================================================================
// Returns true when the last metadata response carried no rows (no more data
// is pending), false while more data is expected.
bool ProtocolManager::waitBeforeRequest()
{
    DEBUG_STREAM << "ProtocolManager::waitBeforeRequest()" << endl;

    const bool nothingPending = !m_hasMoreData;
    return nothingPending;
}
//==============================================================================
// ProtocolManager::resetProtocolStatus()
//==============================================================================
// Puts the protocol back in its initial state, so that the next request
// produced by createRequest() is an authorization request again.
void ProtocolManager::resetProtocolStatus()
{
    DEBUG_STREAM << "ProtocolManager::resetProtocolStatus()" << endl;

    m_hasMoreData = true;
    m_isValidated = false;
    m_isAuthorised = false;
}
//==============================================================================
// ProtocolManager::createRequest()
//==============================================================================
// Builds the next request of the protocol sequence:
//   not yet authorised            -> authorization request
//   authorised, not yet validated -> validation request
//   authorised and validated      -> metadata request
// Throws std::runtime_error when the built request is not fully initialized.
RequestSP ProtocolManager::createRequest()
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::createRequest()" << endl;

    RequestSP request_sp;

    if(m_isAuthorised && m_isValidated)
    {
        request_sp = createMetadata();
    }
    else if(m_isAuthorised)
    {
        request_sp = createValidation();
    }
    else
    {
        request_sp = createAuthroisation();
    }

    if(request_sp->IsInitialized())
        return request_sp;

    throw std::runtime_error("Not initialized request!");
}
//==============================================================================
// ProtocolManager::processResponse()
//==============================================================================
// Dispatches a received response to the handler matching its type
// (authorization, validation or metadata).
//
// Throws std::runtime_error when the response pointer is null, when the
// response is not fully initialized, or when its type is unknown; the
// handlers may throw as well.
void ProtocolManager::processResponse(ResponseSP response_sp)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;

    // Reject a null pointer explicitly: dereferencing it below would be
    // undefined behaviour instead of an error the caller can handle.
    if(!response_sp)
        throw std::runtime_error("Null response!");

    if(!response_sp->IsInitialized())
        throw std::runtime_error("Not initialized response!");

    switch(response_sp->type())
    {
        case Response::AUTHORIZATION:
        {
            processAuthroisation(response_sp);
            break;
        }
        case Response::VALIDATION:
        {
            processValidation(response_sp);
            break;
        }
        case Response::METADATA:
        {
            processMetadata(response_sp);
            break;
        }
        default:
            throw std::runtime_error("Unknown response type");
    }
}
//==============================================================================
// ProtocolManager::createAuthroisation()
//==============================================================================
// Builds the AUTHORIZATION request, filling it with the remote username and
// password read from the configuration.
//
// The password is only placed in the request: it is deliberately never
// written to the log, not even in VERBOSE_DEBUG builds, so that credentials
// do not end up in log files.
RequestSP ProtocolManager::createAuthroisation() throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createAuthroisation()" << endl;

    RequestSP request_sp(new Request);
    request_sp->set_type(Request::AUTHORIZATION);

    std::string user = m_configuration_sp->getRemoteUsername();
    std::string password = m_configuration_sp->getRemotePassword();

#ifdef VERBOSE_DEBUG
    // Verbose builds additionally report the username (never the password).
    INFO_STREAM << "ProtocolManager::createAuthroisation() Send username "
        << user << " to " << m_remoteEndpoint << endl;
#else
    INFO_STREAM << "ProtocolManager::createAuthroisation() Send to "
        << m_remoteEndpoint << endl;
#endif

    Request::Authorization* authorization = request_sp->mutable_authorization();
    authorization->set_username(user);
    authorization->set_password(password);

    return request_sp;
}
//==============================================================================
// ProtocolManager::createValidation()
//==============================================================================
// Builds the VALIDATION request: it names the remote schema/table and lists,
// for every row returned by DBManager::retrieveInformation() on the local
// schema/table, the column name, type and nullable flag.
//
// Throws std::runtime_error when any of those three values is missing.
RequestSP ProtocolManager::createValidation() throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::createValidation()" << endl;

    RequestSP request_sp(new Request);
    request_sp->set_type(Request::VALIDATION);

    std::string schema = m_configuration_sp->getDatabaseSchema();
    std::string table = m_configuration_sp->getDatabaseTable();
    std::string rschema = m_configuration_sp->getRemoteSchema();
    std::string rtable = m_configuration_sp->getRemoteTable();

    INFO_STREAM << "ProtocolManager::createValidation() Send local schema "
        << schema << " table " << table << " to " << m_remoteEndpoint << endl;

    Request::Validation* validation = request_sp->mutable_validation();
    validation->set_schema(rschema);
    validation->set_table(rtable);

    DBManager::InformationList informationList =
        m_dBManager_sp->retrieveInformation(schema, table);

    DBManager::InformationList::const_iterator row = informationList.begin();
    const DBManager::InformationList::const_iterator rowsEnd = informationList.end();

    while(row != rowsEnd)
    {
        // Check the three values in the order name, type, nullable before
        // touching the request.
        if(!row->get<0>())
            throw std::runtime_error("Empty column name in information schema");
        const std::string& columnName = row->get<0>().get();

        if(!row->get<1>())
            throw std::runtime_error("Empty column type in information schema");
        const std::string& columnType = row->get<1>().get();

        if(!row->get<2>())
            throw std::runtime_error("Empty column nullable in information schema");
        const std::string& isNullable = row->get<2>().get();

        Request::Validation::Column* column = validation->add_columns();
        column->set_name(columnName);
        column->set_type(columnType);
        column->set_nullable(isNullable);

#ifdef VERBOSE_DEBUG
        INFO_STREAM << "ProtocolManager::createValidation() " << columnName
            << " " << columnType << " " << isNullable << endl;
#endif

        ++row;
    }

    return request_sp;
}
//==============================================================================
// ProtocolManager::createMetadata()
//==============================================================================
// Builds the METADATA request, carrying the last timestamp that
// DBManager::retrieveLastTimestamp() reports for the local schema/table,
// converted with mktime().
RequestSP ProtocolManager::createMetadata()
    throw(std::runtime_error, std::out_of_range)
{
    namespace pt = boost::posix_time;

    DEBUG_STREAM << "ProtocolManager::createMetadata()" << endl;

    RequestSP request_sp(new Request);
    request_sp->set_type(Request::METADATA);

    std::string localSchema = m_configuration_sp->getDatabaseSchema();
    std::string localTable = m_configuration_sp->getDatabaseTable();
    std::string remoteSchema = m_configuration_sp->getRemoteSchema();
    std::string remoteTable = m_configuration_sp->getRemoteTable();

    std::tm lastTm = m_dBManager_sp->retrieveLastTimestamp(localSchema, localTable);

    // The printable form is built before mktime() runs, because mktime() may
    // rewrite the fields of the tm structure it is given.
    pt::ptime lastPt = pt::ptime_from_tm(lastTm);

    INFO_STREAM << "ProtocolManager::createMetadata() Send schema "
        << remoteSchema << " table " << remoteTable << " timestamp "
        << pt::to_simple_string(lastPt)
        << " to " << m_remoteEndpoint << endl;

    request_sp->mutable_metadata()->set_timestamp(mktime(&lastTm));

    return request_sp;
}
//==============================================================================
// ProtocolManager::processAuthroisation()
//==============================================================================
// Handles an authorization response: an ACCEPTED state marks the protocol as
// authorised; any other state is logged as an error and raised as
// std::runtime_error carrying the status text of the response.
void ProtocolManager::processAuthroisation(ResponseSP response_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::processAuthroisation()" << endl;

    const Response::Authorization& reply = response_sp->authorization();

    if(reply.state() != Response::Authorization::ACCEPTED)
    {
        ERROR_STREAM << "ProtocolManager::processAuthroisation() State REJECTED "
            << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

        throw std::runtime_error(reply.status());
    }

    INFO_STREAM << "ProtocolManager::processAuthroisation() State ACCEPTED "
        << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

    m_isAuthorised = true;
}
//==============================================================================
// ProtocolManager::processValidation()
//==============================================================================
// Handles a validation response: an ACCEPTED state marks the protocol as
// validated; any other state is logged as an error and raised as
// std::runtime_error carrying the status text of the response.
void ProtocolManager::processValidation(ResponseSP response_sp)
    throw(std::runtime_error)
{
    DEBUG_STREAM << "ProtocolManager::processValidation()" << endl;

    const Response::Validation& reply = response_sp->validation();

    if(reply.state() != Response::Validation::ACCEPTED)
    {
        ERROR_STREAM << "ProtocolManager::processValidation() State REJECTED "
            << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

        throw std::runtime_error(reply.status());
    }

    INFO_STREAM << "ProtocolManager::processValidation() State ACCEPTED "
        << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

    m_isValidated = true;
}
//==============================================================================
// ProtocolManager::processMetadata()
//==============================================================================
// Handles a metadata response.
//
// ACCEPTED: received rows (if any) are persisted into the local schema/table,
// the "more data" flag is set according to whether rows were received, and
// the importer's tuple counter is incremented by the number of rows.
// Any other state is logged as an error and raised as std::runtime_error
// carrying the status text of the response.
void ProtocolManager::processMetadata(ResponseSP response_sp)
    throw(std::runtime_error, std::out_of_range)
{
    DEBUG_STREAM << "ProtocolManager::processMetadata()" << endl;

    const Response::Metadata& metadata = response_sp->metadata();

    if(metadata.state() != Response::Metadata::ACCEPTED)
    {
        ERROR_STREAM << "ProtocolManager::processMetadata() State REJECTED "
            << "status " << metadata.status() << " from " << m_remoteEndpoint << endl;

        throw std::runtime_error(metadata.status());
    }

    std::string schema = m_configuration_sp->getDatabaseSchema();
    std::string table = m_configuration_sp->getDatabaseTable();
    std::string rschema = m_configuration_sp->getRemoteSchema();
    std::string rtable = m_configuration_sp->getRemoteTable();

    INFO_STREAM << "ProtocolManager::processMetadata() State ACCEPTED "
        << "status " << metadata.status() << " schema " << rschema
        << " table " << rtable << " from " << m_remoteEndpoint << endl;

    const int receivedRows = metadata.rows_size();
    const bool gotRows = (receivedRows != 0);

    // Persist first: the flag and the counter are only touched once the rows
    // have been stored without an exception.
    if(gotRows)
        m_dBManager_sp->persistMetadata(schema, table, metadata);

    m_hasMoreData = gotRows;

    m_metadataImporter_p->incrementTupleCounter(receivedRows);
}
} //namespace