Skip to content
Snippets Groups Projects
Select Git revision
  • 941d423117d0031251683765b926c029aa263392
  • master default
  • rocky-linux-9
  • development
  • v1.0.4
  • v1.0.3
  • v1.0.2
7 results

MySQLJWKSDAO.php

Blame
  • Client.cpp 15.78 KiB
    #include <Client.h>
    
    #include <boost/lexical_cast.hpp>
    #include <boost/bind.hpp>
    #include <fstream>
    
    namespace DataImporter_ns
    {
    
    //==============================================================================
    //      Client::Client()
    //==============================================================================
    // Builds a client bound to the given Tango device and configuration.
    //   deviceImpl_p      device used for logging and forwarded to DBManager::create()
    //   configuration_sp  shared configuration, kept for later use
    // The resolver and both timers are attached to the client's own io_service.
    Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) :
        Tango::LogAdapter(deviceImpl_p),
        m_deviceImpl_p(deviceImpl_p),
        m_configuration_sp(configuration_sp),
        m_resolver(m_ioService),
        m_resetConnectionTimer(m_ioService),
        m_listsUpdateTimer(m_ioService)
    {
        DEBUG_STREAM << "Client::Client()" << endl;

        // Checks that the linked protobuf runtime is compatible with the headers
        GOOGLE_PROTOBUF_VERIFY_VERSION;

        // A freshly built client is neither connected nor running
        m_state = Tango::OFF;
        m_status = "Disconnected";

        m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp);
    }
    
    //==============================================================================
    //      Client::~Client()
    //==============================================================================
    // Stops the io_service, drops the keep-alive work object and waits for the
    // worker thread (if one was started) to finish.
    Client::~Client()
    {
        DEBUG_STREAM << "Client::~Client()" << endl;

        // Make io_service::run() return so that the worker thread can end
        m_ioService.stop();

        m_work_sp.reset();

        if(m_thread_sp)
        {
            m_thread_sp->join();
        }

        // google::protobuf::ShutdownProtobufLibrary() is deliberately NOT called
        // here. It tears down the protobuf library for the whole process and the
        // library must not be used afterwards, so invoking it from the destructor
        // of a single Client would break every other Client instance (and any
        // Client created later, e.g. after a device re-initialisation).
        // If leak-checker noise matters, call it once at process shutdown.
    }
    
    //==============================================================================
    //      Client::start()
    //==============================================================================
    // Starts the client: connects the databases, creates the protocol manager,
    // launches the worker thread that runs the io_service and triggers the first
    // file list update.
    // NOTE(review): startUpdateLists() is executed on the calling thread, not on
    // the worker thread started just above - confirm this is intended.
    void Client::start()
    {
        DEBUG_STREAM << "Client::start()" << endl;
    
        m_dBManager_sp->connectAll();
    
        m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p,
            m_configuration_sp, m_dBManager_sp);
    
        // Required before run() can be invoked again after a previous stop()
        m_ioService.reset();
    
        // The work object keeps run() from returning while no handler is pending
        m_work_sp.reset(new boost::asio::io_service::work(m_ioService));
    
        m_thread_sp.reset(new boost::thread(boost::bind(&Client::run, this)));
    
        Client::startUpdateLists();
    }
    
    //==============================================================================
    //      Client::stop()
    //==============================================================================
    // Stops the client: closes the connection, halts the io_service, joins and
    // releases the worker thread, drops the protocol manager, disconnects the
    // databases and finally publishes the OFF state.
    void Client::stop()
    {
        DEBUG_STREAM << "Client::stop()" << endl;

        closeConnection();

        // Ask run() to return, then drop the object that kept it alive
        m_ioService.stop();
        m_work_sp.reset();

        // Wait for the worker thread before releasing it
        if(m_thread_sp)
        {
            m_thread_sp->join();
            m_thread_sp.reset();
        }

        m_protocolManager_sp.reset();

        m_dBManager_sp->disconnectAll();

        writeState(Tango::OFF);
        writeStatus("Database loop paused");
    }
    
    //==============================================================================
    //      Client::readState()
    //==============================================================================
    // Returns a copy of the current device state, taken under the state mutex.
    Tango::DevState Client::readState()
    {
        DEBUG_STREAM << "Client::readState()" << endl;

        boost::mutex::scoped_lock guard(m_stateMutex);

        const Tango::DevState snapshot = m_state;

        return snapshot;
    }
    
    //==============================================================================
    //      Client::readStatus()
    //==============================================================================
    // Returns a copy of the current status text, taken under the status mutex.
    std::string Client::readStatus()
    {
        DEBUG_STREAM << "Client::readStatus()" << endl;

        boost::mutex::scoped_lock guard(m_statusMutex);

        std::string snapshot(m_status);

        return snapshot;
    }
    
    //==============================================================================
    //      Client::writeState()
    //==============================================================================
    // Stores a new device state under the state mutex.
    //   state  value to publish
    void Client::writeState(Tango::DevState state)
    {
        DEBUG_STREAM << "Client::writeState()" << endl;

        // Serialises the update against concurrent readState() calls
        boost::mutex::scoped_lock guard(m_stateMutex);

        m_state = state;
    }
    
    //==============================================================================
    //      Client::writeStatus()
    //==============================================================================
    // Stores a new status text under the status mutex.
    //   status  text to publish
    void Client::writeStatus(std::string status)
    {
        DEBUG_STREAM << "Client::writeStatus()" << endl;

        // Serialises the update against concurrent readStatus() calls
        boost::mutex::scoped_lock guard(m_statusMutex);

        m_status = status;
    }
    
    //==============================================================================
    //      Client::run()
    //==============================================================================
    void Client::run()
    {
        DEBUG_STREAM << "Client::run() Starting" << endl;
    
        while(true)
        {
            try
            {
                boost::system::error_code ec;
                m_ioService.run(ec);
    
                if(ec)
                {
                    ERROR_STREAM << "Client::run() " << ec.message() << endl;
                }
                break;
            }
            catch(std::exception& ex)
            {
                ERROR_STREAM << "Client::run() " << ex.what() << endl;
            }
            catch(boost::thread_interrupted& ex)
            {
                DEBUG_STREAM << "Client::run() interrupt" << endl;
                break;
            }
        }
    
        DEBUG_STREAM << "Client::run() Stopping" << endl;
    }
    
    //==============================================================================
    //      Client::startUpdateLists()
    //==============================================================================
    void Client::startUpdateLists()
    {
        DEBUG_STREAM << "Client::startUpdateLists()" << endl;
    
        try
        {
            m_protocolManager_sp->updateFileLists();
    
            writeState(Tango::ON);
            writeStatus("Database loop active");
        }
        catch(std::exception& ec)
        {
            ERROR_STREAM << "Client::startUpdateLists() " << ec.what() << endl;
    
            writeState(Tango::ALARM);
            writeStatus(ec.what());
        }
        catch(...)
        {
            ERROR_STREAM << "Client::startUpdateLists() Unknown error" << endl;
    
            writeState(Tango::ALARM);
            writeStatus("Unknown error");
        }
    
        if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextFile())
        {
            startResolve();
        }
        else
        {
            m_listsUpdateTimer.expires_from_now(
                boost::posix_time::seconds(m_configuration_sp->getRefreshTime()));
    
            m_listsUpdateTimer.async_wait(boost::bind(&Client::handleUpdateLists,
                this, boost::asio::placeholders::error));
        }
    }
    
    //==============================================================================
    //      Client::handleUpdateLists()
    //==============================================================================
    // Completion handler of the list refresh timer armed by startUpdateLists().
    //   errorCode  result of the timer wait
    // A successful wait triggers a new list update, a cancelled wait is only
    // logged, any other error puts the client in ALARM.
    void Client::handleUpdateLists(const boost::system::error_code& errorCode)
    {
        DEBUG_STREAM << "Client::handleUpdateLists()" << endl;

        if(!errorCode)
        {
            startUpdateLists();
        }
        else if(errorCode == boost::asio::error::operation_aborted)
        {
            // The timer was cancelled: nothing to do
            DEBUG_STREAM << "Client::handleUpdateLists() STOP" << endl;
        }
        else
        {
            // Tagged with this handler's own name (was mislabelled as handleResolve)
            ERROR_STREAM << "Client::handleUpdateLists() " << errorCode.message() << endl;

            writeState(Tango::ALARM);
            writeStatus(errorCode.message());
        }
    }
    
    //==============================================================================
    //      Client::startResolve()
    //==============================================================================
    void Client::startResolve()
    {
        DEBUG_STREAM << "Client::startResolve()" << endl;
    
        std::stringstream infoStream;
        infoStream << "Resolving host: " << m_configuration_sp->getRemoteHost()
            << " port: " << m_configuration_sp->getRemotePort();
    
        INFO_STREAM << "Client::startResolve() " << infoStream.str() << endl;
    
        writeState(Tango::RUNNING);
        writeStatus(infoStream.str());
    
        boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getRemoteHost(),
            boost::lexical_cast<std::string>(m_configuration_sp->getRemotePort()));
    
        m_resetConnectionTimer.expires_from_now(
            boost::posix_time::seconds(m_configuration_sp->getTimeout()));
    
        m_resolver.async_resolve(query, boost::bind(&Client::handleResolve, this,
            boost::asio::placeholders::error, boost::asio::placeholders::iterator));
    
        m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this));
    }
    
    //==============================================================================
    //      Client::handleResolve()
    //==============================================================================
    // Completion handler of the host resolution started by startResolve().
    //   errorCode         result of the resolution
    //   endPointIterator  resolved endpoints, forwarded to startConnect()
    void Client::handleResolve(const boost::system::error_code& errorCode,
        boost::asio::ip::tcp::resolver::iterator endPointIterator)
    {
        DEBUG_STREAM << "Client::handleResolve()" << endl;

        if(errorCode)
        {
            // Resolution failed: report it and do not attempt to connect
            ERROR_STREAM << "Client::handleResolve() " << errorCode.message() << endl;

            writeState(Tango::ALARM);
            writeStatus(errorCode.message());

            return;
        }

        startConnect(endPointIterator);
    }
    
    //==============================================================================
    //      Client::handleWriteRequest()
    //==============================================================================
    // Completion handler of a request write (the write itself is issued outside
    // this file, presumably by startWriteRequest()).
    //   errorCode  result of the write
    // On success the response header read is started, otherwise the client goes
    // in ALARM with the error message as status.
    void Client::handleWriteRequest(const boost::system::error_code& errorCode)
    {
        // Log tags use the real function name (they used to say handleRequest)
        DEBUG_STREAM << "Client::handleWriteRequest()" << endl;

        if(!errorCode)
        {
            startReadResponseHeader();
        }
        else
        {
            ERROR_STREAM << "Client::handleWriteRequest() " << errorCode.message() << endl;

            writeState(Tango::ALARM);
            writeStatus(errorCode.message());
        }
    }
    
    //==============================================================================
    //      Client::handleReadResponseHeader()
    //==============================================================================
    // Completion handler of the response header read.
    //   errorCode  result of the read
    // On success the body size is decoded from the read buffer and the body
    // read is started; on failure the client goes in ALARM.
    // decodeHeader() may throw std::runtime_error, which is not caught here.
    void Client::handleReadResponseHeader(const boost::system::error_code& errorCode)
    {
        DEBUG_STREAM << "Client::handleReadResponseHeader()" << endl;

        if(errorCode)
        {
            ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl;

            writeState(Tango::ALARM);
            writeStatus(errorCode.message());

            return;
        }

        const boost::uint32_t announcedBodySize = decodeHeader(m_readBuff);

        startReadResponseBody(announcedBodySize);
    }
    
    //==============================================================================
    //      Client::handleReadResponseBody()
    //==============================================================================
    // Completion handler of the response body read.
    //   errorCode  result of the read
    // On success the bytes following the header in the read buffer are parsed
    // as a Response, handed to the protocol manager and the resulting object is
    // passed to startReadData().
    // Error handling:
    //   std::logic_error   -> warning, current file marked as failed
    //   std::runtime_error -> ALARM (also raised here for a short buffer or a
    //                         response that cannot be parsed)
    //   anything else      -> ALARM with "Unknown error"
    void Client::handleReadResponseBody(const boost::system::error_code& errorCode)
    {
        DEBUG_STREAM << "Client::handleReadResponseBody()" << endl;

        if(!errorCode)
        {
            try
            {
                // Never compute a body size from a buffer shorter than the header
                if(m_readBuff.size() < HEADER_SIZE)
                    throw std::runtime_error("Response buffer smaller than header");

                const std::size_t bodySize = m_readBuff.size() - HEADER_SIZE;

                ResponseSP response_sp(new Response);

                // Pointer arithmetic from element 0 stays valid even when the
                // body is empty (indexing element HEADER_SIZE would not).
                // A failed parse must not be processed as a valid response.
                if(!response_sp->ParseFromArray(&m_readBuff[0] + HEADER_SIZE,
                    static_cast<int>(bodySize)))
                {
                    throw std::runtime_error("Malformed response received");
                }

                startReadData(m_protocolManager_sp->processResponse(response_sp));
            }
            catch(const std::logic_error& ec)
            {
                WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl;

                onTransferFailed();
            }
            catch(const std::runtime_error& ec)
            {
                ERROR_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl;

                writeState(Tango::ALARM);
                writeStatus(ec.what());
            }
            catch(...)
            {
                ERROR_STREAM << "Client::handleReadResponseBody() Unknown error" << endl;

                writeState(Tango::ALARM);
                writeStatus("Unknown error");
            }
        }
        else
        {
            ERROR_STREAM << "Client::handleReadResponseBody() " << errorCode.message() << endl;

            writeState(Tango::ALARM);
            writeStatus(errorCode.message());
        }
    }
    
    //==============================================================================
    //      Client::handleReadData()
    //==============================================================================
    // Completion handler of a file data read.
    //   fileWrapper_sp  destination file wrapper
    //   recvBytes       number of bytes received into the file buffer
    //   errorCode       result of the read
    // Received bytes are written to the file; the next read is started until
    // the wrapper reports completion. A wrapper in bad state is cleaned up and
    // the transfer is marked as failed; a read error puts the client in ALARM.
    void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvBytes,
        const boost::system::error_code& errorCode)
    {
        if(errorCode)
        {
            ERROR_STREAM << "Client::handleReadData() "
                << errorCode.message() << " from " << m_remoteEndpoint << endl;

            writeState(Tango::ALARM);
            writeStatus(errorCode.message());

            return;
        }

        if(fileWrapper_sp->isBad())
        {
            WARN_STREAM << "Client::handleReadData() bad I/O" << endl;

            fileWrapper_sp->cleanUp();

            onTransferFailed();

            return;
        }

        if(recvBytes > 0)
        {
            fileWrapper_sp->write(m_fileBuff, recvBytes);
        }

        if(fileWrapper_sp->isCompleted())
        {
            onTransferCompleted(fileWrapper_sp);
        }
        else
        {
            // More data expected for this file
            startReadData(fileWrapper_sp);
        }
    }
    
    //==============================================================================
    //      Client::onTransferCompleted()
    //==============================================================================
    // Called when a file has been completely received.
    //   fileWrapper_sp  wrapper of the received file
    // Registers the file as transferred, then either requests the next file or,
    // when none is left, closes the connection and restarts the list update.
    // Any exception puts the client in ALARM.
    void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp)
    {
        DEBUG_STREAM << "Client::onTransferCompleted()" << endl;

        try
        {
            m_protocolManager_sp->setFileTransfered(fileWrapper_sp);

            if(!m_protocolManager_sp->hasNextFile())
            {
                // Work list exhausted: drop the connection and look for new files
                closeConnection();

                startUpdateLists();
            }
            else
            {
                startWriteRequest();
            }
        }
        catch(const std::exception& error)
        {
            ERROR_STREAM << "Client::onTransferCompleted() " << error.what() << endl;

            writeState(Tango::ALARM);
            writeStatus(error.what());
        }
        catch(...)
        {
            ERROR_STREAM << "Client::onTransferCompleted() Unknown error" << endl;

            writeState(Tango::ALARM);
            writeStatus("Unknown error");
        }
    }
    
    //==============================================================================
    //      Client::onTransferFailed()
    //==============================================================================
    void Client::onTransferFailed()
    {
        DEBUG_STREAM << "Client::onTransferFailed()" << endl;
    
        try
        {
            m_protocolManager_sp->setFileFailed();
    
            if(m_protocolManager_sp->hasNextFile())
            {
                startWriteRequest();
            }
            else
            {
                closeConnection();
    
                startUpdateLists();
            }
        }
        catch(std::exception& ec)
        {
            ERROR_STREAM << "Client::onTransferFailed() " << ec.what() << endl;
    
            writeState(Tango::ALARM);
            writeStatus(ec.what());
        }
        catch(...)
        {
            ERROR_STREAM << "Client::onTransferFailed() Unknown error" << endl;
    
            writeState(Tango::ALARM);
            writeStatus("Unknown error");
        }
    }
    
    //==============================================================================
    //      Client::resetConnection()
    //==============================================================================
    // Handler of the connection reset timer.
    // It acts only when the timer deadline has actually been reached: the
    // connection is closed and the list update is restarted. When the deadline
    // lies in the future nothing is done.
    void Client::resetConnection()
    {
        DEBUG_STREAM << "Client::resetConnection()" << endl;

        const bool deadlineReached = m_resetConnectionTimer.expires_at() <=
            boost::asio::deadline_timer::traits_type::now();

        if(!deadlineReached)
        {
            return;
        }

        ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl;

        closeConnection();

        startUpdateLists();
    }
    
    //==============================================================================
    //      Client::encodeHeader()
    //==============================================================================
    // Writes size into the first four bytes of buf, most significant byte first.
    //   buf   destination buffer, must hold at least HEADER_SIZE bytes
    //   size  value to encode
    // Throws std::runtime_error when buf is shorter than HEADER_SIZE.
    void Client::encodeHeader(std::vector<boost::uint8_t>& buf, boost::uint32_t size)
        throw(std::runtime_error)
    {
        DEBUG_STREAM << "Client::encodeHeader()" << endl;

        if(buf.size() < HEADER_SIZE)
        {
            throw std::runtime_error("Buffer to small to contain header!");
        }

        // Byte 0 receives bits 31..24, byte 3 receives bits 7..0
        for(unsigned byteIndex = 0; byteIndex < 4; ++byteIndex)
        {
            const unsigned shift = 8 * (3 - byteIndex);

            buf[byteIndex] = static_cast<boost::uint8_t>((size >> shift) & 0xFF);
        }
    }
    
    //==============================================================================
    //      Client::decodeHeader()
    //==============================================================================
    // Reads the first HEADER_SIZE bytes of buf as an unsigned integer, most
    // significant byte first.
    //   buf  source buffer, must hold at least HEADER_SIZE bytes
    // Returns the decoded value.
    // Throws std::runtime_error when buf is shorter than HEADER_SIZE.
    boost::uint32_t Client::decodeHeader(std::vector<boost::uint8_t>& buf)
        throw(std::runtime_error)
    {
        DEBUG_STREAM << "Client::decodeHeader()" << endl;

        if(buf.size() < HEADER_SIZE)
        {
            throw std::runtime_error("Buffer to small to contain header!");
        }

        boost::uint32_t decoded = 0;

        // Shift the accumulated value one byte left and append the next byte
        for(unsigned byteIndex = 0; byteIndex < HEADER_SIZE; ++byteIndex)
        {
            const boost::uint32_t nextByte = buf[byteIndex];

            decoded = (decoded << 8) | nextByte;
        }

        return decoded;
    }
    
    }   //namespace