#include #include #include namespace MetadataImporter_ns { //============================================================================== // ProtocolManager::ProtocolManager() //============================================================================== ProtocolManager::ProtocolManager(MetadataImporter* metadataImporter_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp) : Tango::LogAdapter(metadataImporter_p), m_metadataImporter_p(metadataImporter_p), m_configuration_sp(configuration_sp), m_dBManager_sp(dBManager_sp) { DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl; m_isAuthorised = false; m_isValidated = false; m_hasMoreData = true; } //============================================================================== // ProtocolManager::ProtocolManager() //============================================================================== ProtocolManager::~ProtocolManager() { DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl; } //============================================================================== // ProtocolManager::ProtocolManager() //============================================================================== ProtocolManager::SP ProtocolManager::create(MetadataImporter* metadataImporter_p, Configuration::SP configuration_sp, DBManager::SP dBManager_sp) { ProtocolManager::SP d_sp(new ProtocolManager(metadataImporter_p, configuration_sp, dBManager_sp), ProtocolManager::Deleter()); return d_sp; } //============================================================================== // ProtocolManager::ProtocolManager() //============================================================================== void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint) { DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl; m_remoteEndpoint = remoteEndpoint; } //============================================================================== // ProtocolManager::waitBeforeRequest() //============================================================================== bool ProtocolManager::waitBeforeRequest() { DEBUG_STREAM << "ProtocolManager::waitBeforeRequest()" << endl; return !m_hasMoreData; } //============================================================================== // ProtocolManager::resetProtocolStatus() //============================================================================== void ProtocolManager::resetProtocolStatus() { DEBUG_STREAM << "ProtocolManager::resetProtocolStatus()" << endl; m_isAuthorised = false; m_isValidated = false; m_hasMoreData = true; } //============================================================================== // ProtocolManager::createRequest() //============================================================================== RequestSP ProtocolManager::createRequest() throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "ProtocolManager::createRequest()" << endl; RequestSP request_sp; if(!m_isAuthorised) { request_sp = createAuthroisation(); } else if(!m_isValidated) { request_sp = createValidation(); } else { request_sp = createMetadata(); } if(!request_sp->IsInitialized()) throw std::runtime_error("Not initialized request!"); return request_sp; } //============================================================================== // ProtocolManager::processResponse() //============================================================================== void ProtocolManager::processResponse(ResponseSP response_sp) throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "ProtocolManager::processResponse()" << endl; if(!response_sp->IsInitialized()) throw std::runtime_error("Not initialized response!"); switch(response_sp->type()) { case Response::AUTHORIZATION: { processAuthroisation(response_sp); break; } case Response::VALIDATION: { processValidation(response_sp); break; } case Response::METADATA: { processMetadata(response_sp); break; } default: throw std::runtime_error("Unknown response type"); } } //============================================================================== // ProtocolManager::createAuthroisation() //============================================================================== RequestSP ProtocolManager::createAuthroisation() throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::createAuthroisation()" << endl; RequestSP request_sp(new Request); request_sp->set_type(Request::AUTHORIZATION); std::string user = m_configuration_sp->getRemoteUsername(); std::string password = m_configuration_sp->getRemotePassword(); #ifdef VERBOSE_DEBUG INFO_STREAM << "ProtocolManager::createAuthroisation() Send username " << user << " password " << password << " to " << m_remoteEndpoint << endl; #else INFO_STREAM << "ProtocolManager::createAuthroisation() Send to " << m_remoteEndpoint << endl; #endif Request::Authorization* authorization = request_sp->mutable_authorization(); authorization->set_username(user); authorization->set_password(password); return request_sp; } //============================================================================== // ProtocolManager::createValidation() //============================================================================== RequestSP ProtocolManager::createValidation() throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::createValidation()" << endl; RequestSP request_sp(new Request); request_sp->set_type(Request::VALIDATION); std::string schema = m_configuration_sp->getDatabaseSchema(); std::string table = m_configuration_sp->getDatabaseTable(); std::string rschema = m_configuration_sp->getRemoteSchema(); std::string rtable = m_configuration_sp->getRemoteTable(); INFO_STREAM << "ProtocolManager::createValidation() Send local schema " << schema << " table " << table << " to " << m_remoteEndpoint << endl; Request::Validation* validation = request_sp->mutable_validation(); validation->set_schema(rschema); validation->set_table(rtable); DBManager::InformationList informationList = m_dBManager_sp->retrieveInformation(schema, table); DBManager::InformationList::const_iterator it; for(it=informationList.begin(); it!=informationList.end(); ++it) { Request::Validation::Column* column = validation->add_columns(); if(!it->get<0>()) throw std::runtime_error("Empty column name in information schema"); std::string columnName = it->get<0>().get(); column->set_name(columnName); if(!it->get<1>()) throw std::runtime_error("Empty column type in information schema"); std::string columnType = it->get<1>().get(); column->set_type(columnType); if(!it->get<2>()) throw std::runtime_error("Empty column nullable in information schema"); std::string isNullable = it->get<2>().get(); column->set_nullable(isNullable); #ifdef VERBOSE_DEBUG INFO_STREAM << "ProtocolManager::createValidation() " << columnName << " " << columnType << " " << isNullable << endl; #endif } return request_sp; } //============================================================================== // ProtocolManager::createMetadata() //============================================================================== RequestSP ProtocolManager::createMetadata() throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "ProtocolManager::createMetadata()" << endl; RequestSP request_sp(new Request); request_sp->set_type(Request::METADATA); std::string schema = m_configuration_sp->getDatabaseSchema(); std::string table = m_configuration_sp->getDatabaseTable(); std::string rschema = m_configuration_sp->getRemoteSchema(); std::string rtable = m_configuration_sp->getRemoteTable(); std::tm tmTimestamp = m_dBManager_sp->retrieveLastTimestamp(schema, table); boost::posix_time::ptime ptTimestamp = boost::posix_time::ptime_from_tm(tmTimestamp); INFO_STREAM << "ProtocolManager::createMetadata() Send schema " << rschema << " table " << rtable << " timestamp " << boost::posix_time::to_simple_string(ptTimestamp) << " to " << m_remoteEndpoint << endl; Request::Metadata* metadata = request_sp->mutable_metadata(); metadata->set_timestamp(mktime(&tmTimestamp)); return request_sp; } //============================================================================== // ProtocolManager::processAuthroisation() //============================================================================== void ProtocolManager::processAuthroisation(ResponseSP response_sp) throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::processAuthroisation()" << endl; const Response::Authorization& authorization = response_sp->authorization(); if(authorization.state() == Response::Authorization::ACCEPTED) { INFO_STREAM << "ProtocolManager::processAuthroisation() State ACCEPTED " << "status " << authorization.status() << " from " << m_remoteEndpoint << endl; m_isAuthorised = true; } else { ERROR_STREAM << "ProtocolManager::processAuthroisation() State REJECTED " << "status " << authorization.status() << " from " << m_remoteEndpoint << endl; throw std::runtime_error(authorization.status()); } } //============================================================================== // ProtocolManager::processValidation() //============================================================================== void ProtocolManager::processValidation(ResponseSP response_sp) throw(std::runtime_error) { DEBUG_STREAM << "ProtocolManager::processValidation()" << endl; const Response::Validation& validation = response_sp->validation(); if(validation.state() == Response::Validation::ACCEPTED) { INFO_STREAM << "ProtocolManager::processValidation() State ACCEPTED " << "status " << validation.status() << " from " << m_remoteEndpoint << endl; m_isValidated = true; } else { ERROR_STREAM << "ProtocolManager::processValidation() State REJECTED " << "status " << validation.status() << " from " << m_remoteEndpoint << endl; throw std::runtime_error(validation.status()); } } //============================================================================== // ProtocolManager::processMetadata() //============================================================================== void ProtocolManager::processMetadata(ResponseSP response_sp) throw(std::runtime_error, std::out_of_range) { DEBUG_STREAM << "ProtocolManager::processMetadata()" << endl; const Response::Metadata& metadata = response_sp->metadata(); if(metadata.state() == Response::Metadata::ACCEPTED) { std::string schema = m_configuration_sp->getDatabaseSchema(); std::string table = m_configuration_sp->getDatabaseTable(); std::string rschema = m_configuration_sp->getRemoteSchema(); std::string rtable = m_configuration_sp->getRemoteTable(); INFO_STREAM << "ProtocolManager::processMetadata() State ACCEPTED " << "status " << metadata.status() << " schema " << rschema << " table " << rtable << " from " << m_remoteEndpoint << endl; if(metadata.rows_size() != 0) { m_dBManager_sp->persistMetadata(schema, table, metadata); m_hasMoreData = true; } else { m_hasMoreData = false; } m_metadataImporter_p->incrementTupleCounter(metadata.rows_size()); } else { ERROR_STREAM << "ProtocolManager::processMetadata() State REJECTED " << "status " << metadata.status() << " from " << m_remoteEndpoint << endl; throw std::runtime_error(metadata.status()); } } } //namespace