Skip to content
Snippets Groups Projects
Select Git revision
  • c1ab41f7322c8acabefa995a9cee38e64b7af547
  • master default protected
  • v4.5.2
  • v4.5.1
  • v4.5.0
  • v4.4.0
  • v4.3.3
  • v4.3.2
  • v4.3.1
  • v4.3.0
  • v4.2.0
  • v4.1.0
  • v4.0.2
  • v4.0.1
  • v4.0.0
  • v3.4.0
  • v3.3.0
  • v3.2.0
  • v3.1.1
  • v3.1.0
  • v3.0.1
  • v3.0.0
22 results

XmlConfig.cpp

Blame
  • ProtocolManager.cpp 12.24 KiB
    #include <ProtocolManager.h>
    #include <MetadataImporter.h>
    
    #include <boost/date_time.hpp>
    
    namespace MetadataImporter_ns
    {
    
    //==============================================================================
    //      ProtocolManager::ProtocolManager()
    //==============================================================================
    // Builds a protocol manager bound to the owning device and its collaborators.
    //
    // metadataImporter_p: owning device; also handed to Tango::LogAdapter so the
    //                     logging stream macros can be used in this class.
    // configuration_sp:   configuration accessor (credentials, schema/table names).
    // dBManager_sp:       database access object.
    ProtocolManager::ProtocolManager(MetadataImporter* metadataImporter_p,
        Configuration::SP configuration_sp, DBManager::SP dBManager_sp) :
        Tango::LogAdapter(metadataImporter_p),
        m_metadataImporter_p(metadataImporter_p),
        m_configuration_sp(configuration_sp),
        m_dBManager_sp(dBManager_sp)
    {
        DEBUG_STREAM << "ProtocolManager::ProtocolManager()" << endl;

        // Initial protocol state: same values that resetProtocolStatus() restores.
        m_hasMoreData = true;
        m_isValidated = false;
        m_isAuthorised = false;
    }
    
    //==============================================================================
    //      ProtocolManager::~ProtocolManager()
    //==============================================================================
    // Destructor: only emits a debug trace; this body performs no explicit cleanup.
    ProtocolManager::~ProtocolManager()
    {
        DEBUG_STREAM << "ProtocolManager::~ProtocolManager()" << endl;
    }
    
    //==============================================================================
    //      ProtocolManager::create()
    //==============================================================================
    // Factory: allocates a ProtocolManager and wraps it in a ProtocolManager::SP
    // that releases the object through ProtocolManager::Deleter.
    //
    // Arguments are forwarded unchanged to the constructor.
    ProtocolManager::SP ProtocolManager::create(MetadataImporter* metadataImporter_p,
        Configuration::SP configuration_sp, DBManager::SP dBManager_sp)
    {
        return ProtocolManager::SP(
            new ProtocolManager(metadataImporter_p, configuration_sp, dBManager_sp),
            ProtocolManager::Deleter());
    }
    
    //==============================================================================
    //      ProtocolManager::setRemoteEndpoint()
    //==============================================================================
    void ProtocolManager::setRemoteEndpoint(std::string remoteEndpoint)
    {
        DEBUG_STREAM << "ProtocolManager::setRemoteEndpoint()" << endl;
    
        m_remoteEndpoint = remoteEndpoint;
    }
    
    //==============================================================================
    //      ProtocolManager::waitBeforeRequest()
    //==============================================================================
    // Tells the caller whether it should pause before issuing the next request.
    //
    // Returns true when m_hasMoreData is false, i.e. after processMetadata()
    // received an accepted response with no rows; false otherwise.
    bool ProtocolManager::waitBeforeRequest()
    {
        DEBUG_STREAM << "ProtocolManager::waitBeforeRequest()" << endl;

        if(m_hasMoreData)
            return false;

        return true;
    }
    
    //==============================================================================
    //      ProtocolManager::resetProtocolStatus()
    //==============================================================================
    // Puts the protocol state machine back to its initial state, so the next
    // createRequest() starts again from the authorization step.
    void ProtocolManager::resetProtocolStatus()
    {
        DEBUG_STREAM << "ProtocolManager::resetProtocolStatus()" << endl;

        m_hasMoreData = true;
        m_isValidated = false;
        m_isAuthorised = false;
    }
    
    //==============================================================================
    //      ProtocolManager::createRequest()
    //==============================================================================
    // Builds the next request according to the current protocol state:
    //   not yet authorised          -> authorization request
    //   authorised, not validated   -> validation request
    //   authorised and validated    -> metadata request
    //
    // Returns the newly created request.
    // Throws std::runtime_error when the built request reports itself as not
    // initialized; exceptions raised by the individual builders propagate.
    // NOTE(review): dynamic exception specifications were removed in C++17;
    // dropping them has to be done together with the header declaration.
    RequestSP ProtocolManager::createRequest()
        throw(std::runtime_error, std::out_of_range)
    {
        DEBUG_STREAM << "ProtocolManager::createRequest()" << endl;

        RequestSP request_sp;

        if(m_isAuthorised && m_isValidated)
            request_sp = createMetadata();
        else if(m_isAuthorised)
            request_sp = createValidation();
        else
            request_sp = createAuthroisation();

        if(request_sp->IsInitialized())
            return request_sp;

        throw std::runtime_error("Not initialized request!");
    }
    
    //==============================================================================
    //      ProtocolManager::processResponse()
    //==============================================================================
    // Dispatches a response received from the remote endpoint to the handler
    // matching its type (authorization, validation or metadata).
    //
    // response_sp: response to process; must be non-null and fully initialized.
    // Throws std::runtime_error when the response is null, not initialized or of
    // an unknown type; exceptions raised by the selected handler (e.g. on a
    // rejected state) propagate to the caller.
    // NOTE(review): dynamic exception specifications were removed in C++17;
    // dropping them has to be done together with the header declaration.
    void ProtocolManager::processResponse(ResponseSP response_sp)
        throw(std::runtime_error, std::out_of_range)
    {
        DEBUG_STREAM << "ProtocolManager::processResponse()" << endl;

        // An empty pointer would be dereferenced right below: fail with the
        // exception type callers already handle instead of crashing.
        if(!response_sp)
            throw std::runtime_error("Null response!");

        if(!response_sp->IsInitialized())
            throw std::runtime_error("Not initialized response!");

        switch(response_sp->type())
        {
            case Response::AUTHORIZATION:
            {
                processAuthroisation(response_sp);
                break;
            }
            case Response::VALIDATION:
            {
                processValidation(response_sp);
                break;
            }
            case Response::METADATA:
            {
                processMetadata(response_sp);
                break;
            }
            default:
                throw std::runtime_error("Unknown response type");
        }
    }
    
    //==============================================================================
    //      ProtocolManager::createAuthroisation()
    //==============================================================================
    // Builds an AUTHORIZATION request carrying the remote username and password
    // read from the configuration.
    //
    // Returns the newly allocated request.
    // Declared to throw std::runtime_error; nothing in this body throws it
    // directly.
    RequestSP ProtocolManager::createAuthroisation() throw(std::runtime_error)
    {
        DEBUG_STREAM << "ProtocolManager::createAuthroisation()" << endl;

        RequestSP request_sp(new Request);

        request_sp->set_type(Request::AUTHORIZATION);

        std::string user = m_configuration_sp->getRemoteUsername();
        std::string password = m_configuration_sp->getRemotePassword();

        // The password is deliberately never written to the log, not even in
        // verbose builds: credentials must not leak into log output.
        #ifdef VERBOSE_DEBUG
            INFO_STREAM << "ProtocolManager::createAuthroisation() Send username "
                << user << " to " << m_remoteEndpoint << endl;
        #else
            INFO_STREAM << "ProtocolManager::createAuthroisation() Send to "
                << m_remoteEndpoint << endl;
        #endif

        Request::Authorization* authorization = request_sp->mutable_authorization();
        authorization->set_username(user);
        authorization->set_password(password);

        return request_sp;
    }
    
    //==============================================================================
    //      ProtocolManager::createValidation()
    //==============================================================================
    // Builds a VALIDATION request: the remote schema/table names plus the
    // description (name, type, nullable) of every column of the local table as
    // returned by DBManager::retrieveInformation().
    //
    // Returns the newly allocated request.
    // Throws std::runtime_error when a column has an empty name, type or
    // nullable value.
    RequestSP ProtocolManager::createValidation() throw(std::runtime_error)
    {
        DEBUG_STREAM << "ProtocolManager::createValidation()" << endl;

        RequestSP request_sp(new Request);

        request_sp->set_type(Request::VALIDATION);

        std::string localSchema = m_configuration_sp->getDatabaseSchema();
        std::string localTable = m_configuration_sp->getDatabaseTable();
        std::string remoteSchema = m_configuration_sp->getRemoteSchema();
        std::string remoteTable = m_configuration_sp->getRemoteTable();

        INFO_STREAM << "ProtocolManager::createValidation() Send local schema "
            << localSchema << " table " << localTable << " to " << m_remoteEndpoint << endl;

        // The request names the remote table; the column list describes the local one.
        Request::Validation* validation = request_sp->mutable_validation();
        validation->set_schema(remoteSchema);
        validation->set_table(remoteTable);

        DBManager::InformationList localColumns =
                m_dBManager_sp->retrieveInformation(localSchema, localTable);

        for(DBManager::InformationList::const_iterator row = localColumns.begin();
            row != localColumns.end(); ++row)
        {
            Request::Validation::Column* column = validation->add_columns();

            // Field 0: column name.
            if(!row->get<0>())
                throw std::runtime_error("Empty column name in information schema");

            std::string columnName = row->get<0>().get();
            column->set_name(columnName);

            // Field 1: column type.
            if(!row->get<1>())
                throw std::runtime_error("Empty column type in information schema");

            std::string columnType = row->get<1>().get();
            column->set_type(columnType);

            // Field 2: nullable flag, kept as text.
            if(!row->get<2>())
                throw std::runtime_error("Empty column nullable in information schema");

            std::string isNullable = row->get<2>().get();
            column->set_nullable(isNullable);

            #ifdef VERBOSE_DEBUG
                INFO_STREAM << "ProtocolManager::createValidation() " << columnName
                    << " " << columnType << " " << isNullable << endl;
            #endif
        }

        return request_sp;
    }
    
    //==============================================================================
    //      ProtocolManager::createMetadata()
    //==============================================================================
    // Builds a METADATA request carrying the last timestamp stored in the local
    // table, as returned by DBManager::retrieveLastTimestamp().
    //
    // Returns the newly allocated request.
    // Declared to throw std::runtime_error and std::out_of_range; nothing in this
    // body throws them directly.
    RequestSP ProtocolManager::createMetadata()
            throw(std::runtime_error, std::out_of_range)
    {
        DEBUG_STREAM << "ProtocolManager::createMetadata()" << endl;

        RequestSP request_sp(new Request);

        request_sp->set_type(Request::METADATA);

        std::string localSchema = m_configuration_sp->getDatabaseSchema();
        std::string localTable = m_configuration_sp->getDatabaseTable();
        std::string remoteSchema = m_configuration_sp->getRemoteSchema();
        std::string remoteTable = m_configuration_sp->getRemoteTable();

        std::tm lastTimestamp =
            m_dBManager_sp->retrieveLastTimestamp(localSchema, localTable);

        // Human readable form, used only for the log line below.
        boost::posix_time::ptime loggedTimestamp =
            boost::posix_time::ptime_from_tm(lastTimestamp);

        INFO_STREAM << "ProtocolManager::createMetadata() Send schema "
            << remoteSchema << " table " << remoteTable << " timestamp "
            << boost::posix_time::to_simple_string(loggedTimestamp)
            << " to " << m_remoteEndpoint << endl;

        // NOTE(review): mktime() interprets the broken-down time as local time and
        // returns -1 on failure; the result is sent unchecked - confirm this is
        // what the remote side expects.
        const std::time_t secondsSinceEpoch = mktime(&lastTimestamp);

        request_sp->mutable_metadata()->set_timestamp(secondsSinceEpoch);

        return request_sp;
    }
    
    //==============================================================================
    //      ProtocolManager::processAuthroisation()
    //==============================================================================
    // Handles an authorization response: marks the session as authorised when the
    // state is ACCEPTED.
    //
    // Throws std::runtime_error carrying the response status text for any other
    // state (logged as REJECTED).
    void ProtocolManager::processAuthroisation(ResponseSP response_sp)
        throw(std::runtime_error)
    {
        DEBUG_STREAM << "ProtocolManager::processAuthroisation()" << endl;

        const Response::Authorization& reply = response_sp->authorization();

        // Anything other than ACCEPTED is treated as a rejection.
        if(reply.state() != Response::Authorization::ACCEPTED)
        {
            ERROR_STREAM << "ProtocolManager::processAuthroisation() State REJECTED "
                << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

            throw std::runtime_error(reply.status());
        }

        INFO_STREAM << "ProtocolManager::processAuthroisation() State ACCEPTED "
            << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

        m_isAuthorised = true;
    }
    
    //==============================================================================
    //      ProtocolManager::processValidation()
    //==============================================================================
    // Handles a validation response: marks the table layout as validated when the
    // state is ACCEPTED.
    //
    // Throws std::runtime_error carrying the response status text for any other
    // state (logged as REJECTED).
    void ProtocolManager::processValidation(ResponseSP response_sp)
        throw(std::runtime_error)
    {
        DEBUG_STREAM << "ProtocolManager::processValidation()" << endl;

        const Response::Validation& reply = response_sp->validation();

        // Anything other than ACCEPTED is treated as a rejection.
        if(reply.state() != Response::Validation::ACCEPTED)
        {
            ERROR_STREAM << "ProtocolManager::processValidation() State REJECTED "
                << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

            throw std::runtime_error(reply.status());
        }

        INFO_STREAM << "ProtocolManager::processValidation() State ACCEPTED "
            << "status " << reply.status() << " from " << m_remoteEndpoint << endl;

        m_isValidated = true;
    }
    
    //==============================================================================
    //      ProtocolManager::processMetadata()
    //==============================================================================
    // Handles a metadata response. On ACCEPTED, any received rows are persisted
    // into the local table, m_hasMoreData records whether rows were received, and
    // the device tuple counter is incremented by the number of rows.
    //
    // Throws std::runtime_error carrying the response status text for any state
    // other than ACCEPTED (logged as REJECTED); exceptions raised by
    // DBManager::persistMetadata() propagate.
    void ProtocolManager::processMetadata(ResponseSP response_sp)
        throw(std::runtime_error, std::out_of_range)
    {
        DEBUG_STREAM << "ProtocolManager::processMetadata()" << endl;

        const Response::Metadata& metadata = response_sp->metadata();

        // Anything other than ACCEPTED is treated as a rejection.
        if(metadata.state() != Response::Metadata::ACCEPTED)
        {
            ERROR_STREAM << "ProtocolManager::processMetadata() State REJECTED "
                << "status " << metadata.status() << " from " << m_remoteEndpoint << endl;

            throw std::runtime_error(metadata.status());
        }

        std::string localSchema = m_configuration_sp->getDatabaseSchema();
        std::string localTable = m_configuration_sp->getDatabaseTable();
        std::string remoteSchema = m_configuration_sp->getRemoteSchema();
        std::string remoteTable = m_configuration_sp->getRemoteTable();

        INFO_STREAM << "ProtocolManager::processMetadata() State ACCEPTED "
            << "status " << metadata.status() << " schema " << remoteSchema
            << " table " << remoteTable << " from " << m_remoteEndpoint << endl;

        const bool hasRows = (metadata.rows_size() != 0);

        if(hasRows)
            m_dBManager_sp->persistMetadata(localSchema, localTable, metadata);

        // An empty answer means the local table is up to date, which makes
        // waitBeforeRequest() return true.
        m_hasMoreData = hasRows;

        m_metadataImporter_p->incrementTupleCounter(metadata.rows_size());
    }
    
    }   //namespace