#include #include #include #include namespace DataImporter_ns { //============================================================================== // Client::Client() //============================================================================== Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p), m_configuration_sp(configuration_sp), m_resolver(m_ioService), m_resetConnectionTimer(m_ioService), m_listsUpdateTimer(m_ioService) { DEBUG_STREAM << "Client::Client()" << endl; GOOGLE_PROTOBUF_VERIFY_VERSION; m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp); m_state = Tango::OFF; m_status="Disconnected"; } //============================================================================== // Client::~Client() //============================================================================== Client::~Client() { DEBUG_STREAM << "Client::~Client()" << endl; m_ioService.stop(); m_work_sp.reset(); if(m_thread_sp) { //m_thread_sp->interrupt(); m_thread_sp->join(); } google::protobuf::ShutdownProtobufLibrary(); } //============================================================================== // Client::start() //============================================================================== void Client::start() { DEBUG_STREAM << "Client::start()" << endl; m_dBManager_sp->connectAll(); m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p, m_configuration_sp, m_dBManager_sp); m_ioService.reset(); m_work_sp.reset(new boost::asio::io_service::work(m_ioService)); m_thread_sp.reset(new boost::thread(boost::bind(&Client::run, this))); Client::startUpdateLists(); } //============================================================================== // Client::stop() //============================================================================== void Client::stop() { DEBUG_STREAM << "Client::stop()" << endl; closeConnection(); m_ioService.stop(); m_work_sp.reset(); if(m_thread_sp) { //m_thread_sp->interrupt(); m_thread_sp->join(); } m_thread_sp.reset(); m_protocolManager_sp.reset(); m_dBManager_sp->disconnectAll(); writeState(Tango::OFF); writeStatus("Database loop paused"); } //============================================================================== // Client::readState() //============================================================================== Tango::DevState Client::readState() { DEBUG_STREAM << "Client::readState()" << endl; boost::mutex::scoped_lock stateLock(m_stateMutex); return m_state; } //============================================================================== // Client::readStatus() //============================================================================== std::string Client::readStatus() { DEBUG_STREAM << "Client::readStatus()" << endl; boost::mutex::scoped_lock statusLock(m_statusMutex); return m_status; } //============================================================================== // Client::writeState() //============================================================================== void Client::writeState(Tango::DevState state) { DEBUG_STREAM << "Client::writeState()" << endl; boost::mutex::scoped_lock stateLock(m_stateMutex); m_state = state; } //============================================================================== // Client::writeStatus() //============================================================================== void Client::writeStatus(std::string status) { DEBUG_STREAM << "Client::writeStatus()" << endl; boost::mutex::scoped_lock statusLock(m_statusMutex); m_status = status; } //============================================================================== // Client::run() //============================================================================== void Client::run() { DEBUG_STREAM << "Client::run() Starting" << endl; while(true) { try { boost::system::error_code ec; m_ioService.run(ec); if(ec) { ERROR_STREAM << "Client::run() " << ec.message() << endl; } break; } catch(std::exception& ex) { ERROR_STREAM << "Client::run() " << ex.what() << endl; } catch(boost::thread_interrupted& ex) { DEBUG_STREAM << "Client::run() interrupt" << endl; break; } } DEBUG_STREAM << "Client::run() Stopping" << endl; } //============================================================================== // Client::startUpdateLists() //============================================================================== void Client::startUpdateLists() { DEBUG_STREAM << "Client::startUpdateLists()" << endl; try { m_protocolManager_sp->updateFileLists(); writeState(Tango::ON); writeStatus("Database loop active"); } catch(std::exception& ec) { ERROR_STREAM << "Client::startUpdateLists() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::startUpdateLists() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Unknown error"); } if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextFile()) { startResolve(); } else { m_listsUpdateTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getRefreshTime())); m_listsUpdateTimer.async_wait(boost::bind(&Client::handleUpdateLists, this, boost::asio::placeholders::error)); } } //============================================================================== // Client::handleUpdateLists() //============================================================================== void Client::handleUpdateLists(const boost::system::error_code& errorCode) { DEBUG_STREAM << "Client::handleUpdateLists()" << endl; if(!errorCode) { startUpdateLists(); } else if(errorCode == boost::asio::error::operation_aborted) { DEBUG_STREAM << "Client::handleUpdateLists() STOP" << endl; } else { ERROR_STREAM << "Client::handleResolve() " << errorCode.message() << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); } } //============================================================================== // Client::startResolve() //============================================================================== void Client::startResolve() { DEBUG_STREAM << "Client::startResolve()" << endl; std::stringstream infoStream; infoStream << "Resolving host: " << m_configuration_sp->getRemoteHost() << " port: " << m_configuration_sp->getRemotePort(); INFO_STREAM << "Client::startResolve() " << infoStream.str() << endl; writeState(Tango::RUNNING); writeStatus(infoStream.str()); boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getRemoteHost(), boost::lexical_cast(m_configuration_sp->getRemotePort())); m_resetConnectionTimer.expires_from_now( boost::posix_time::seconds(m_configuration_sp->getTimeout())); m_resolver.async_resolve(query, boost::bind(&Client::handleResolve, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator)); m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this)); } //============================================================================== // Client::handleResolve() //============================================================================== void Client::handleResolve(const boost::system::error_code& errorCode, boost::asio::ip::tcp::resolver::iterator endPointIterator) { DEBUG_STREAM << "Client::handleResolve()" << endl; if(!errorCode) { startConnect(endPointIterator); } else { ERROR_STREAM << "Client::handleResolve() " << errorCode.message() << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); } } //============================================================================== // Client::handleRequest() //============================================================================== void Client::handleWriteRequest(const boost::system::error_code& errorCode) { DEBUG_STREAM << "Client::handleRequest()" << endl; if(!errorCode) { startReadResponseHeader(); } else { ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); } } //============================================================================== // Client::handleReadResponseHeader() //============================================================================== void Client::handleReadResponseHeader(const boost::system::error_code& errorCode) { DEBUG_STREAM << "Client::handleReadResponseHeader()" << endl; if(!errorCode) { boost::uint32_t bodySize = decodeHeader(m_readBuff); startReadResponseBody(bodySize); } else { ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); } } //============================================================================== // Client::handleReadResponseBody() //============================================================================== void Client::handleReadResponseBody(const boost::system::error_code& errorCode) { DEBUG_STREAM << "Client::handleReadResponseBody()" << endl; if(!errorCode) { try { ResponseSP response_sp(new Response); response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE], m_readBuff.size() - HEADER_SIZE); startReadData(m_protocolManager_sp->processResponse(response_sp)); } catch(std::logic_error& ec) { WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl; onTransferFailed(); } catch(std::runtime_error& ec) { ERROR_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::handleReadResponseBody() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Unknown error"); } } else { ERROR_STREAM << "Client::handleReadResponseBody() " << errorCode.message() << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); } } //============================================================================== // Client::handleReadData() //============================================================================== void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvBytes, const boost::system::error_code& errorCode) { if(!errorCode) { if(!fileWrapper_sp->isBad()) { if(recvBytes>0) fileWrapper_sp->write(m_fileBuff, recvBytes); if(!fileWrapper_sp->isCompleted()) { startReadData(fileWrapper_sp); } else { onTransferCompleted(fileWrapper_sp); } } else { WARN_STREAM << "Client::handleReadData() bad I/O" << endl; fileWrapper_sp->cleanUp(); onTransferFailed(); } } else { ERROR_STREAM << "Client::handleReadData() " << errorCode.message() << " from " << m_remoteEndpoint << endl; writeState(Tango::ALARM); writeStatus(errorCode.message()); } } //============================================================================== // Client::onTransferCompleted() //============================================================================== void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp) { DEBUG_STREAM << "Client::onTransferCompleted()" << endl; try { m_protocolManager_sp->setFileTransfered(fileWrapper_sp); if(m_protocolManager_sp->hasNextFile()) { startWriteRequest(); } else { closeConnection(); startUpdateLists(); } } catch(std::exception& ec) { ERROR_STREAM << "Client::onTransferCompleted() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::onTransferCompleted() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Unknown error"); } } //============================================================================== // Client::onTransferFailed() //============================================================================== void Client::onTransferFailed() { DEBUG_STREAM << "Client::onTransferFailed()" << endl; try { m_protocolManager_sp->setFileFailed(); if(m_protocolManager_sp->hasNextFile()) { startWriteRequest(); } else { closeConnection(); startUpdateLists(); } } catch(std::exception& ec) { ERROR_STREAM << "Client::onTransferFailed() " << ec.what() << endl; writeState(Tango::ALARM); writeStatus(ec.what()); } catch(...) { ERROR_STREAM << "Client::onTransferFailed() Unknown error" << endl; writeState(Tango::ALARM); writeStatus("Unknown error"); } } //============================================================================== // Client::resetConnection() //============================================================================== void Client::resetConnection() { DEBUG_STREAM << "Client::resetConnection()" << endl; if(m_resetConnectionTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()) { ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl; closeConnection(); startUpdateLists(); } } //============================================================================== // Client::encodeHeader() //============================================================================== void Client::encodeHeader(std::vector& buf, boost::uint32_t size) throw(std::runtime_error) { DEBUG_STREAM << "Client::encodeHeader()" << endl; if(buf.size() < HEADER_SIZE) throw std::runtime_error("Buffer to small to contain header!"); buf[0] = static_cast((size >> 24) & 0xFF); buf[1] = static_cast((size >> 16) & 0xFF); buf[2] = static_cast((size >> 8) & 0xFF); buf[3] = static_cast(size & 0xFF); } //============================================================================== // Client::decodeHeader() //============================================================================== boost::uint32_t Client::decodeHeader(std::vector& buf) throw(std::runtime_error) { DEBUG_STREAM << "Client::decodeHeader()" << endl; if(buf.size() < HEADER_SIZE) throw std::runtime_error("Buffer to small to contain header!"); boost::uint32_t size = 0; for (unsigned i = 0; i < HEADER_SIZE; ++i) size = size * 256 + (static_cast(buf[i]) & 0xFF); return size; } } //namespace