Select Git revision
import_executor.py
-
Cristiano Urban authored
Signed-off-by:
Cristiano Urban <cristiano.urban@inaf.it>
Cristiano Urban authoredSigned-off-by:
Cristiano Urban <cristiano.urban@inaf.it>
Client.cpp 15.78 KiB
#include <Client.h>
#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>
#include <fstream>
namespace DataImporter_ns
{
//==============================================================================
// Client::Client()
//==============================================================================
Client::Client(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) :
Tango::LogAdapter(deviceImpl_p), m_deviceImpl_p(deviceImpl_p),
m_configuration_sp(configuration_sp), m_resolver(m_ioService),
m_resetConnectionTimer(m_ioService), m_listsUpdateTimer(m_ioService)
{
DEBUG_STREAM << "Client::Client()" << endl;
GOOGLE_PROTOBUF_VERIFY_VERSION;
m_dBManager_sp = DBManager::create(deviceImpl_p, configuration_sp);
m_state = Tango::OFF;
m_status="Disconnected";
}
//==============================================================================
// Client::~Client()
//==============================================================================
Client::~Client()
{
DEBUG_STREAM << "Client::~Client()" << endl;
m_ioService.stop();
m_work_sp.reset();
if(m_thread_sp)
{
//m_thread_sp->interrupt();
m_thread_sp->join();
}
google::protobuf::ShutdownProtobufLibrary();
}
//==============================================================================
// Client::start()
//==============================================================================
void Client::start()
{
DEBUG_STREAM << "Client::start()" << endl;
m_dBManager_sp->connectAll();
m_protocolManager_sp = ProtocolManager::create(m_deviceImpl_p,
m_configuration_sp, m_dBManager_sp);
m_ioService.reset();
m_work_sp.reset(new boost::asio::io_service::work(m_ioService));
m_thread_sp.reset(new boost::thread(boost::bind(&Client::run, this)));
Client::startUpdateLists();
}
//==============================================================================
// Client::stop()
//==============================================================================
void Client::stop()
{
DEBUG_STREAM << "Client::stop()" << endl;
closeConnection();
m_ioService.stop();
m_work_sp.reset();
if(m_thread_sp)
{
//m_thread_sp->interrupt();
m_thread_sp->join();
}
m_thread_sp.reset();
m_protocolManager_sp.reset();
m_dBManager_sp->disconnectAll();
writeState(Tango::OFF);
writeStatus("Database loop paused");
}
//==============================================================================
// Client::readState()
//==============================================================================
Tango::DevState Client::readState()
{
DEBUG_STREAM << "Client::readState()" << endl;
boost::mutex::scoped_lock stateLock(m_stateMutex);
return m_state;
}
//==============================================================================
// Client::readStatus()
//==============================================================================
std::string Client::readStatus()
{
DEBUG_STREAM << "Client::readStatus()" << endl;
boost::mutex::scoped_lock statusLock(m_statusMutex);
return m_status;
}
//==============================================================================
// Client::writeState()
//==============================================================================
void Client::writeState(Tango::DevState state)
{
DEBUG_STREAM << "Client::writeState()" << endl;
boost::mutex::scoped_lock stateLock(m_stateMutex);
m_state = state;
}
//==============================================================================
// Client::writeStatus()
//==============================================================================
void Client::writeStatus(std::string status)
{
DEBUG_STREAM << "Client::writeStatus()" << endl;
boost::mutex::scoped_lock statusLock(m_statusMutex);
m_status = status;
}
//==============================================================================
// Client::run()
//==============================================================================
void Client::run()
{
DEBUG_STREAM << "Client::run() Starting" << endl;
while(true)
{
try
{
boost::system::error_code ec;
m_ioService.run(ec);
if(ec)
{
ERROR_STREAM << "Client::run() " << ec.message() << endl;
}
break;
}
catch(std::exception& ex)
{
ERROR_STREAM << "Client::run() " << ex.what() << endl;
}
catch(boost::thread_interrupted& ex)
{
DEBUG_STREAM << "Client::run() interrupt" << endl;
break;
}
}
DEBUG_STREAM << "Client::run() Stopping" << endl;
}
//==============================================================================
// Client::startUpdateLists()
//==============================================================================
void Client::startUpdateLists()
{
DEBUG_STREAM << "Client::startUpdateLists()" << endl;
try
{
m_protocolManager_sp->updateFileLists();
writeState(Tango::ON);
writeStatus("Database loop active");
}
catch(std::exception& ec)
{
ERROR_STREAM << "Client::startUpdateLists() " << ec.what() << endl;
writeState(Tango::ALARM);
writeStatus(ec.what());
}
catch(...)
{
ERROR_STREAM << "Client::startUpdateLists() Unknown error" << endl;
writeState(Tango::ALARM);
writeStatus("Unknown error");
}
if(readState() != Tango::ALARM && m_protocolManager_sp->hasNextFile())
{
startResolve();
}
else
{
m_listsUpdateTimer.expires_from_now(
boost::posix_time::seconds(m_configuration_sp->getRefreshTime()));
m_listsUpdateTimer.async_wait(boost::bind(&Client::handleUpdateLists,
this, boost::asio::placeholders::error));
}
}
//==============================================================================
// Client::handleUpdateLists()
//==============================================================================
void Client::handleUpdateLists(const boost::system::error_code& errorCode)
{
DEBUG_STREAM << "Client::handleUpdateLists()" << endl;
if(!errorCode)
{
startUpdateLists();
}
else if(errorCode == boost::asio::error::operation_aborted)
{
DEBUG_STREAM << "Client::handleUpdateLists() STOP" << endl;
}
else
{
ERROR_STREAM << "Client::handleResolve() " << errorCode.message() << endl;
writeState(Tango::ALARM);
writeStatus(errorCode.message());
}
}
//==============================================================================
// Client::startResolve()
//==============================================================================
void Client::startResolve()
{
DEBUG_STREAM << "Client::startResolve()" << endl;
std::stringstream infoStream;
infoStream << "Resolving host: " << m_configuration_sp->getRemoteHost()
<< " port: " << m_configuration_sp->getRemotePort();
INFO_STREAM << "Client::startResolve() " << infoStream.str() << endl;
writeState(Tango::RUNNING);
writeStatus(infoStream.str());
boost::asio::ip::tcp::resolver::query query(m_configuration_sp->getRemoteHost(),
boost::lexical_cast<std::string>(m_configuration_sp->getRemotePort()));
m_resetConnectionTimer.expires_from_now(
boost::posix_time::seconds(m_configuration_sp->getTimeout()));
m_resolver.async_resolve(query, boost::bind(&Client::handleResolve, this,
boost::asio::placeholders::error, boost::asio::placeholders::iterator));
m_resetConnectionTimer.async_wait(boost::bind(&Client::resetConnection, this));
}
//==============================================================================
// Client::handleResolve()
//==============================================================================
void Client::handleResolve(const boost::system::error_code& errorCode,
boost::asio::ip::tcp::resolver::iterator endPointIterator)
{
DEBUG_STREAM << "Client::handleResolve()" << endl;
if(!errorCode)
{
startConnect(endPointIterator);
}
else
{
ERROR_STREAM << "Client::handleResolve() " << errorCode.message() << endl;
writeState(Tango::ALARM);
writeStatus(errorCode.message());
}
}
//==============================================================================
// Client::handleRequest()
//==============================================================================
void Client::handleWriteRequest(const boost::system::error_code& errorCode)
{
DEBUG_STREAM << "Client::handleRequest()" << endl;
if(!errorCode)
{
startReadResponseHeader();
}
else
{
ERROR_STREAM << "Client::handleRequest() " << errorCode.message() << endl;
writeState(Tango::ALARM);
writeStatus(errorCode.message());
}
}
//==============================================================================
// Client::handleReadResponseHeader()
//==============================================================================
void Client::handleReadResponseHeader(const boost::system::error_code& errorCode)
{
DEBUG_STREAM << "Client::handleReadResponseHeader()" << endl;
if(!errorCode)
{
boost::uint32_t bodySize = decodeHeader(m_readBuff);
startReadResponseBody(bodySize);
}
else
{
ERROR_STREAM << "Client::handleReadResponseHeader() " << errorCode.message() << endl;
writeState(Tango::ALARM);
writeStatus(errorCode.message());
}
}
//==============================================================================
// Client::handleReadResponseBody()
//==============================================================================
void Client::handleReadResponseBody(const boost::system::error_code& errorCode)
{
DEBUG_STREAM << "Client::handleReadResponseBody()" << endl;
if(!errorCode)
{
try
{
ResponseSP response_sp(new Response);
response_sp->ParseFromArray(&m_readBuff[HEADER_SIZE],
m_readBuff.size() - HEADER_SIZE);
startReadData(m_protocolManager_sp->processResponse(response_sp));
}
catch(std::logic_error& ec)
{
WARN_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl;
onTransferFailed();
}
catch(std::runtime_error& ec)
{
ERROR_STREAM << "Client::handleReadResponseBody() " << ec.what() << endl;
writeState(Tango::ALARM);
writeStatus(ec.what());
}
catch(...)
{
ERROR_STREAM << "Client::handleReadResponseBody() Unknown error" << endl;
writeState(Tango::ALARM);
writeStatus("Unknown error");
}
}
else
{
ERROR_STREAM << "Client::handleReadResponseBody() " << errorCode.message() << endl;
writeState(Tango::ALARM);
writeStatus(errorCode.message());
}
}
//==============================================================================
// Client::handleReadData()
//==============================================================================
void Client::handleReadData(FileWrapper::SP fileWrapper_sp, std::size_t recvBytes,
const boost::system::error_code& errorCode)
{
if(!errorCode)
{
if(!fileWrapper_sp->isBad())
{
if(recvBytes>0)
fileWrapper_sp->write(m_fileBuff, recvBytes);
if(!fileWrapper_sp->isCompleted())
{
startReadData(fileWrapper_sp);
}
else
{
onTransferCompleted(fileWrapper_sp);
}
}
else
{
WARN_STREAM << "Client::handleReadData() bad I/O" << endl;
fileWrapper_sp->cleanUp();
onTransferFailed();
}
}
else
{
ERROR_STREAM << "Client::handleReadData() "
<< errorCode.message() << " from " << m_remoteEndpoint << endl;
writeState(Tango::ALARM);
writeStatus(errorCode.message());
}
}
//==============================================================================
// Client::onTransferCompleted()
//==============================================================================
void Client::onTransferCompleted(FileWrapper::SP fileWrapper_sp)
{
DEBUG_STREAM << "Client::onTransferCompleted()" << endl;
try
{
m_protocolManager_sp->setFileTransfered(fileWrapper_sp);
if(m_protocolManager_sp->hasNextFile())
{
startWriteRequest();
}
else
{
closeConnection();
startUpdateLists();
}
}
catch(std::exception& ec)
{
ERROR_STREAM << "Client::onTransferCompleted() " << ec.what() << endl;
writeState(Tango::ALARM);
writeStatus(ec.what());
}
catch(...)
{
ERROR_STREAM << "Client::onTransferCompleted() Unknown error" << endl;
writeState(Tango::ALARM);
writeStatus("Unknown error");
}
}
//==============================================================================
// Client::onTransferFailed()
//==============================================================================
void Client::onTransferFailed()
{
DEBUG_STREAM << "Client::onTransferFailed()" << endl;
try
{
m_protocolManager_sp->setFileFailed();
if(m_protocolManager_sp->hasNextFile())
{
startWriteRequest();
}
else
{
closeConnection();
startUpdateLists();
}
}
catch(std::exception& ec)
{
ERROR_STREAM << "Client::onTransferFailed() " << ec.what() << endl;
writeState(Tango::ALARM);
writeStatus(ec.what());
}
catch(...)
{
ERROR_STREAM << "Client::onTransferFailed() Unknown error" << endl;
writeState(Tango::ALARM);
writeStatus("Unknown error");
}
}
//==============================================================================
// Client::resetConnection()
//==============================================================================
void Client::resetConnection()
{
DEBUG_STREAM << "Client::resetConnection()" << endl;
if(m_resetConnectionTimer.expires_at() <=
boost::asio::deadline_timer::traits_type::now())
{
ERROR_STREAM << "Client::resetConnection() Connection timeout" << endl;
closeConnection();
startUpdateLists();
}
}
//==============================================================================
// Client::encodeHeader()
//==============================================================================
void Client::encodeHeader(std::vector<boost::uint8_t>& buf, boost::uint32_t size)
throw(std::runtime_error)
{
DEBUG_STREAM << "Client::encodeHeader()" << endl;
if(buf.size() < HEADER_SIZE)
throw std::runtime_error("Buffer to small to contain header!");
buf[0] = static_cast<boost::uint8_t>((size >> 24) & 0xFF);
buf[1] = static_cast<boost::uint8_t>((size >> 16) & 0xFF);
buf[2] = static_cast<boost::uint8_t>((size >> 8) & 0xFF);
buf[3] = static_cast<boost::uint8_t>(size & 0xFF);
}
//==============================================================================
// Client::decodeHeader()
//==============================================================================
boost::uint32_t Client::decodeHeader(std::vector<boost::uint8_t>& buf)
throw(std::runtime_error)
{
DEBUG_STREAM << "Client::decodeHeader()" << endl;
if(buf.size() < HEADER_SIZE)
throw std::runtime_error("Buffer to small to contain header!");
boost::uint32_t size = 0;
for (unsigned i = 0; i < HEADER_SIZE; ++i)
size = size * 256 + (static_cast<unsigned>(buf[i]) & 0xFF);
return size;
}
} //namespace