Skip to content
Snippets Groups Projects
Commit 207b9160 authored by Marco De Marco's avatar Marco De Marco
Browse files

Worker thread, event buffer, event thread and Configuration added

parent d246fe24
No related branches found
No related tags found
No related merge requests found
......@@ -16,7 +16,8 @@ CC=g++
CXX_DEBUG_FLAGS=-g -DVERBOSE_DEBUG
CXX_RELEASE_FLAGS=-O3
CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11
LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango
LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \
-lboost_thread -lboost_filesystem -lboost_system
INC_PARM=$(foreach d, $(INC_DIR), -I$d)
LIB_PARM=$(foreach d, $(LIB_DIR), -L$d)
#================================================================================
......
/*----- PROTECTED REGION ID(PreProcessor::ClassFactory.cpp) ENABLED START -----*/
static const char *RcsId = "$Id: $";
//=============================================================================
//
// file : ClassFactory.cpp
......
#ifndef CONFIGURATION_H
#define CONFIGURATION_H
#include <iostream>
#include <stdint.h>
#include <vector>
#include <boost/shared_ptr.hpp>
namespace PreProcessor_ns
{
class Configuration
{
public:
//------------------------------------------------------------------------------
// [Public] Shared pointer typedef
//------------------------------------------------------------------------------
typedef boost::shared_ptr<Configuration> SP;
private:
//------------------------------------------------------------------------------
// [Private] Constructor destructor deleter
//------------------------------------------------------------------------------
Configuration(std::string watchPath, int workerNumber, int sleepTime, int waitTime,
int connectionNumber, uint32_t iNotifyMask): m_watchPath(watchPath),
m_workerNumber(workerNumber), m_sleepTime(sleepTime), m_waitTime(waitTime),
m_connectionNumber(connectionNumber), m_iNotifyMask(iNotifyMask) {}
virtual ~Configuration() {}
class Deleter;
friend class Deleter;
class Deleter
{
public:
void operator()(Configuration* c) { delete c; }
};
public:
//------------------------------------------------------------------------------
// [Public] User methods
//------------------------------------------------------------------------------
static Configuration::SP create(std::string watchPath, int workerNumber,
int sleepTime, int waitTime, int connectionNumber, uint32_t iNotifyMask)
{
Configuration::SP c_sp(new Configuration(watchPath, workerNumber, sleepTime,
waitTime, connectionNumber, iNotifyMask), Configuration::Deleter());
return c_sp;
}
std::string getWatchPath() const { return m_watchPath; }
unsigned int getWorkerNumber() const { return m_workerNumber; }
unsigned int getSleepTime() const { return m_sleepTime; }
unsigned int getWaitTime() const { return m_waitTime; }
unsigned int getConnectionNumber() const { return m_connectionNumber; }
uint32_t getINotifyMask() const { return m_iNotifyMask; }
private:
//------------------------------------------------------------------------------
// [Private] class variables
//------------------------------------------------------------------------------
//INotify watch path
const std::string m_watchPath;
//Worker thread number
const unsigned int m_workerNumber;
//Event thread sleep time
const unsigned int m_sleepTime;
//Worker thread wait time
const unsigned int m_waitTime;
//Number of connection per destination
const unsigned int m_connectionNumber;
//INotify mask
const uint32_t m_iNotifyMask;
};
} //End of namespace
#endif /*!CONFIGURATION_H*/
#include <EventBuffer.h>
#include <boost/thread/locks.hpp>
namespace PreProcessor_ns
{
//==============================================================================
// EventBuffer::EventBuffer()
//==============================================================================
EventBuffer::EventBuffer(Tango::DeviceImpl* deviceImpl_p) :
Tango::LogAdapter(deviceImpl_p)
{
DEBUG_STREAM << "EventBuffer::EventBuffer()" << endl;
}
//==============================================================================
// EventBuffer::~EventBuffer()
//==============================================================================
EventBuffer::~EventBuffer()
{
DEBUG_STREAM << "EventBuffer::~EventBuffer()" << endl;
}
//==============================================================================
// EventBuffer::insertNew()
//==============================================================================
EventBuffer::SP EventBuffer::create(Tango::DeviceImpl* deviceImpl_p)
{
EventBuffer::SP e_sp(new EventBuffer(deviceImpl_p),
EventBuffer::Deleter());
return e_sp;
}
//==============================================================================
// EventBuffer::insertNew()
//==============================================================================
void EventBuffer::insertNew(boost::filesystem::path path)
{
DEBUG_STREAM << "EventBuffer::insertNew()" << endl;
boost::mutex::scoped_lock lock(m_mutex);
bool inserted = m_buffer.insert(
std::pair<boost::filesystem::path, EventStatus>(path, UNPROCESSED) ).second;
if(inserted)
{
DEBUG_STREAM << "EventBuffer::insertNew() element "
<< path.string() << " inserted" << endl;
lock.unlock();
m_conditionVariable.notify_all();
}
else
WARN_STREAM << "EventBuffer::insertNew() element "
<< path.string() << " duplicated" << endl;
}
//==============================================================================
// EventBuffer::waitNew()
//==============================================================================
boost::filesystem::path EventBuffer::waitNew()
{
DEBUG_STREAM << "EventBuffer::waitNew()" << endl;
boost::mutex::scoped_lock lock(m_mutex);
do
{
std::map<boost::filesystem::path, EventStatus>::iterator it;
for(it=m_buffer.begin(); it!=m_buffer.end(); it++)
{
if(it->second == UNPROCESSED)
{
DEBUG_STREAM << "EventBuffer::waitNew() found new element:"
<< it->first.string() << endl;
it->second = ASSIGNED;
return it->first;
}
}
DEBUG_STREAM << "EventBuffer::waitNew() waiting new element" << endl;
m_conditionVariable.wait(lock);
}
while(true);
}
//==============================================================================
// EventBuffer::markAsProcessed()
//==============================================================================
void EventBuffer::markAsProcessed(boost::filesystem::path path)
{
DEBUG_STREAM << "EventBuffer::markAsProcessed()" << endl;
boost::mutex::scoped_lock lock(m_mutex);
std::map<boost::filesystem::path, EventStatus>::iterator it;
it = m_buffer.find(path);
if(it != m_buffer.end())
{
switch(it->second)
{
case UNPROCESSED:
ERROR_STREAM << "EventBuffer::markAsProcessed() element "
<< path.string() << " is marked not processed" << endl;
break;
case ASSIGNED:
it->second = PROCESSED;
DEBUG_STREAM << "EventBuffer::markAsProcessed() element "
<< path.string() << " marked as processed" << endl;
break;
case PROCESSED:
ERROR_STREAM << "EventBuffer::markAsProcessed() element "
<< path.string() << " already marked as processed" << endl;
break;
}
}
else
ERROR_STREAM << "EventBuffer::markAsProcessed() element"
<< path.string() << " not found" << endl;
}
//==============================================================================
// EventBuffer::removeAllProcessed()
//==============================================================================
void EventBuffer::removeAllProcessed()
{
DEBUG_STREAM << "EventBuffer::removeAllProcessed()" << endl;
boost::mutex::scoped_lock lock(m_mutex);
std::map<boost::filesystem::path, EventStatus>::iterator it;
for(it=m_buffer.begin(); it!=m_buffer.end(); ++it)
{
if(it->second == PROCESSED)
{
DEBUG_STREAM << "EventBuffer::removeAllProcessed() element "
<< it->first << "will be removed" << endl;
std::map<boost::filesystem::path, EventStatus>::iterator to_delete = it;
m_buffer.erase(to_delete);
}
}
}
//==============================================================================
// EventBuffer::size()
//==============================================================================
std::size_t EventBuffer::size()
{
DEBUG_STREAM << "EventBuffer::size()" << endl;
boost::mutex::scoped_lock lock(m_mutex);
return m_buffer.size();
}
} //namespace
#ifndef EVENT_BUFFER_H
#define EVENT_BUFFER_H
#include <tango.h>
#include <map>
#include <vector>
#include <iostream>
#include <stdexcept>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/filesystem.hpp>
namespace PreProcessor_ns
{
class EventBuffer : public Tango::LogAdapter
{
public:
//------------------------------------------------------------------------------
// [Public] Shared pointer typedef
//------------------------------------------------------------------------------
typedef boost::shared_ptr<EventBuffer> SP;
protected:
//------------------------------------------------------------------------------
// [Protected] Constructor destructor deleter
//------------------------------------------------------------------------------
EventBuffer(Tango::DeviceImpl*);
virtual ~EventBuffer();
class Deleter;
friend class Deleter;
class Deleter
{
public:
void operator()(EventBuffer* e) { delete e; }
};
public:
//------------------------------------------------------------------------------
// [Public] Users methods
//------------------------------------------------------------------------------
static EventBuffer::SP create(Tango::DeviceImpl*);
virtual void insertNew(boost::filesystem::path);
virtual boost::filesystem::path waitNew();
virtual void markAsProcessed(boost::filesystem::path);
virtual void removeAllProcessed();
virtual std::size_t size();
protected:
//------------------------------------------------------------------------------
// [Protected] Event status enumeration
//------------------------------------------------------------------------------
enum EventStatus { UNPROCESSED=0, ASSIGNED=1, PROCESSED=2 };
//------------------------------------------------------------------------------
// [Protected] Class variables
//------------------------------------------------------------------------------
//Access synchronization mutex
boost::mutex m_mutex;
//Access synchronization condition variable
boost::condition_variable m_conditionVariable;
//File buffer
std::map<boost::filesystem::path, EventStatus> m_buffer;
};
} //End of namespace
#endif /*!EVENT_BUFFER_H*/
#include <EventThread.h>
#include <PreProcessor.h>
#include <WorkerThread.h>
#include <cassert>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
#include <boost/filesystem.hpp>
#include <boost/scoped_ptr.hpp>
namespace PreProcessor_ns
{
//==============================================================================
// EventThread::EventThread()
//==============================================================================
EventThread::EventThread(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p),
m_configuration_sp(configuration_sp)
{
DEBUG_STREAM << "EventThread::EventThread()" << endl;
m_state = Tango::OFF;
m_status = "Event thread not running";
}
//==============================================================================
// EventThread::~EventThread()
//==============================================================================
EventThread::~EventThread()
{
DEBUG_STREAM << "EventThread::~EventThread()" << endl;
if(m_threadGroup_sp)
{
m_threadGroup_sp->interrupt_all();
m_threadGroup_sp->join_all();
}
}
//==============================================================================
// EventThread::create()
//==============================================================================
EventThread::SP EventThread::create(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp)
{
EventThread::SP e_sp(new EventThread(deviceImpl_p, configuration_sp),
EventThread::Deleter());
return e_sp;
}
//==============================================================================
// EventThread::start()
//==============================================================================
void EventThread::start()
{
DEBUG_STREAM << "EventThread::start()" << endl;
try
{
}
catch(std::exception& ex)
{
std::stringstream error_stream;
error_stream << "Event thread not running " << ex.what();
writeState(Tango::ALARM);
writeStatus(error_stream.str());
}
catch(...)
{
writeState(Tango::ALARM);
writeStatus("Event thread unknown exception");
}
}
//==============================================================================
// EventThread::stop()
//==============================================================================
void EventThread::stop()
{
DEBUG_STREAM << "EventThread::stop()" << endl;
if(m_threadGroup_sp)
{
m_threadGroup_sp->interrupt_all();
m_threadGroup_sp->join_all();
}
inotify_rm_watch(m_fileDescriptor, m_watchDescriptor);
if(m_fileDescriptor) { close(m_fileDescriptor); }
}
//==============================================================================
// EventThread::readState()
//==============================================================================
Tango::DevState EventThread::readState()
{
DEBUG_STREAM << "EventThread::readState()" << endl;
boost::mutex::scoped_lock stateLock(m_stateMutex);
return m_state;
}
//==============================================================================
// EventThread::readStatus()
//==============================================================================
std::string EventThread::readStatus()
{
DEBUG_STREAM << "EventThread::readStatus()" << endl;
boost::mutex::scoped_lock statusLock(m_statusMutex);
return m_status;
}
//==============================================================================
// EventThread::writeState()
//==============================================================================
void EventThread::writeState(Tango::DevState state)
{
DEBUG_STREAM << "Client::writeState()" << endl;
boost::mutex::scoped_lock stateLock(m_stateMutex);
m_state = state;
}
//==============================================================================
// EventThread::writeStatus()
//==============================================================================
void EventThread::writeStatus(std::string status)
{
DEBUG_STREAM << "Client::writeStatus()" << endl;
boost::mutex::scoped_lock statusLock(m_statusMutex);
m_status = status;
}
//==============================================================================
// EventThread::eventLoop()
//==============================================================================
void EventThread::eventLoop()
{
DEBUG_STREAM << "EventThread::eventLoop() starting loop" << endl;
unsigned int sleepTime = m_configuration_sp->getSleepTime();
boost::filesystem::path watchPath(m_configuration_sp->getWatchPath());
while(true)
{
try
{
char buffer[BUF_LEN];
int length = 0;
if((length = read( m_fileDescriptor, buffer, BUF_LEN )) < 0)
{
if(errno != EINTR && errno != EAGAIN)
{
writeState(Tango::ALARM);
writeStatus("Event thread error on watch path read");
}
else
{
writeState(Tango::ON);
writeStatus("Event thread running");
}
}
else
{
writeState(Tango::ON);
writeStatus("Event thread new data found");
}
struct inotify_event *event;
for(int i=0; i<length; i += EVENT_SIZE + event->len)
{
event = ( struct inotify_event * ) &buffer[ i ];
DEBUG_STREAM << "EVENT: " << event->name << endl; //TODO: delete me
//Add path to file name
boost::filesystem::path file(event->name);
boost::filesystem::path path(watchPath);
path /= file;
//Check if event is a regular file
if(boost::filesystem::is_regular_file(path))
m_eventBuffer_sp->insertNew(path);
}
m_eventBuffer_sp->removeAllProcessed();
DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl;
boost::posix_time::milliseconds sleepPosixTime(sleepTime);
boost::this_thread::sleep(sleepPosixTime);
}
catch(boost::thread_interrupted& ex)
{
DEBUG_STREAM << "EventThread::eventLoop() stopping loop" << endl;
writeState(Tango::OFF);
writeStatus("Event thread not running");
break;
}
catch(std::exception& ex)
{
std::stringstream error_stream;
error_stream << "Event thread not running " << ex.what();
writeState(Tango::ALARM);
writeStatus(error_stream.str());
}
catch(...)
{
writeState(Tango::ALARM);
writeStatus("Event thread unknown exception");
}
} //while
}
} //namespace
#ifndef EVENT_THREAD_H
#define EVENT_THREAD_H
#include <Configuration.h>
#include <EventBuffer.h>
#include <tango.h>
#include <stdexcept>
#include <sys/inotify.h>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
namespace PreProcessor_ns
{
class FitsImporter;
class EventThread : public Tango::LogAdapter
{
public:
//------------------------------------------------------------------------------
// [Public] Shared pointer typedef
//------------------------------------------------------------------------------
typedef boost::shared_ptr<EventThread> SP;
protected:
//------------------------------------------------------------------------------
// [Protected] Constructor destructor deleter
//------------------------------------------------------------------------------
EventThread(Tango::DeviceImpl*, Configuration::SP);
virtual ~EventThread();
class Deleter;
friend class Deleter;
class Deleter
{
public:
void operator()(EventThread* e) { delete e; }
};
public:
//------------------------------------------------------------------------------
// [Public] Class creation method
//------------------------------------------------------------------------------
static EventThread::SP create(Tango::DeviceImpl*, Configuration::SP);
//------------------------------------------------------------------------------
// [Public] Thread management methods
//------------------------------------------------------------------------------
virtual void start();
virtual void stop();
//------------------------------------------------------------------------------
// [Public] Read state and status methods
//------------------------------------------------------------------------------
virtual Tango::DevState readState();
virtual std::string readStatus();
protected:
//------------------------------------------------------------------------------
// [Protected] Write state and status methods
//------------------------------------------------------------------------------
virtual void writeState(Tango::DevState);
virtual void writeStatus(std::string);
//------------------------------------------------------------------------------
// [Protected] Utilities methods
//------------------------------------------------------------------------------
virtual void eventLoop();
//------------------------------------------------------------------------------
// [Protected] Class variables
//------------------------------------------------------------------------------
//Tango server class pointer
FitsImporter* m_fitsImporter_p;
//Boost thread group shared pointer
boost::scoped_ptr<boost::thread_group> m_threadGroup_sp;
//Configuration shared pointer
Configuration::SP m_configuration_sp;
//Event buffer shared pointer
EventBuffer::SP m_eventBuffer_sp;
//Tango state synchronization
boost::mutex m_stateMutex;
//Tango state variable
Tango::DevState m_state;
//Tango status synchronization
boost::mutex m_statusMutex;
//Tango status variable
std::string m_status;
//INotify file and watch descriptors
int m_fileDescriptor, m_watchDescriptor;
//INotify event size
static const std::size_t EVENT_SIZE = ( sizeof (struct inotify_event) );
//INotify event buffer length
static const std::size_t BUF_LEN = ( 1024 * ( EVENT_SIZE + 16 ) );
};
} // End of namespace
#endif /*!EVENT_THREAD_H*/
#include <WorkerThread.h>
#include <cassert>
#include <fstream>
#include <tango.h>
#include <boost/scoped_ptr.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/regex.hpp>
#include <boost/lexical_cast.hpp>
namespace PreProcessor_ns
{
//==============================================================================
// WorkerThread::WorkerThread()
//==============================================================================
WorkerThread::WorkerThread(Tango::DeviceImpl* deviceImpl_p,
EventBuffer::SP eventBuffer_sp) : Tango::LogAdapter(deviceImpl_p)
{
DEBUG_STREAM << "WorkerThread::WorkerThread()" << std::endl;
}
//==============================================================================
// WorkerThread::~WorkerThread()
//==============================================================================
WorkerThread::~WorkerThread()
{
DEBUG_STREAM << "WorkerThread::~WorkerThread()" << std::endl;
}
//==============================================================================
// WorkerThread::workerLoop()()
//==============================================================================
void WorkerThread::workerLoop()
{
DEBUG_STREAM << "WorkerThread::workerLoop(): starting loop" << endl;
unsigned int waitTime = m_configuration_sp->getWaitTime();
std::string watchPath = m_configuration_sp->getWatchPath();
// while(true)
// {
// try
// {
// //Wait new file event to process
// boost::filesystem::path origPath(m_eventBuffer_sp->waitNew());
// std::string fileName = origPath.stem().string();
//
// //If configured wait after new file event
// boost::posix_time::milliseconds waitPosixTime(waitTime);
// boost::this_thread::sleep( waitPosixTime );
//
// bool completed = false;
//
// INFO_STREAM << "WorkerThread::workerLoop() processing \""
// << fileName << "\"" << endl;
//
// try
// {
// //Open fits file
// boost::shared_ptr<CCfits::FITS> fitsFile_sp(
// new CCfits::FITS(origPath.string(), CCfits::Write));
//
// //Read all key in primary HDU
// CCfits::PHDU& phdu = fitsFile_sp->pHDU();
// phdu.readAllKeys();
//
// try
// {
// //Try to ingest using an instrument in list
// ingestUsingInstrumentList(origPath, fitsFile_sp);
//
// completed = true;
// m_fitsImporter_p->incrementRegularCounter();
// INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName
// << "\" archived regularly" << endl;
// }
// catch(CCfits::FitsException& ex)
// {
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl;
// }
// catch(std::runtime_error& ex)
// {
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
// }
//
// if(!completed)
// {
// //Try to ingest using default instrument
// ingestUsingDefaultInstrument(origPath, fitsFile_sp);
//
// completed = true;
// m_fitsImporter_p->incrementWarningCounter();
// WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName
// << "\" archived in default instrument" << endl;
// }
// }
// catch(CCfits::FitsException& ex)
// {
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl;
// }
// catch(std::runtime_error& ex)
// {
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
// }
//
// if(!completed)
// {
// //Cannot ingest new file => notify error
// m_fitsImporter_p->incrementErrorCounter();
// ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName
// << "\" not archived" << endl;
// }
//
// m_eventBuffer_sp->markAsProcessed(origPath);
// }
// catch(boost::thread_interrupted& ex)
// {
// DEBUG_STREAM << "WorkerThread::workerLoop() interrupt" << endl;
//
// break;
// }
// catch(std::exception& ex)
// {
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
// }
// catch(...)
// {
// ERROR_STREAM << "WorkerThread::workerLoop() unknown exception" << endl;
// }
// } //while
}
} //namespace
#ifndef WORKER_THREAD_H
#define WORKER_THREAD_H
#include <Configuration.h>
#include <EventBuffer.h>
#include <tango.h>
#include <stdexcept>
#include <sstream>
#include <boost/shared_ptr.hpp>
namespace PreProcessor_ns
{
class WorkerThread : public Tango::LogAdapter
{
public:
//------------------------------------------------------------------------------
// [Public] Constructor destructor
//------------------------------------------------------------------------------
WorkerThread(Tango::DeviceImpl*, EventBuffer::SP);
virtual ~WorkerThread();
//------------------------------------------------------------------------------
// [Public] Users method
//------------------------------------------------------------------------------
virtual void workerLoop();
protected:
//------------------------------------------------------------------------------
// [Protected] Utilities methods
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
// [Protected] Class variables
//------------------------------------------------------------------------------
//Event buffer shared pointer
EventBuffer::SP m_eventBuffer_sp;
//Configuration shared pointer
Configuration::SP m_configuration_sp;
};
} //End of namespace
#endif /*!WORKER_THREAD_H*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment