diff --git a/Makefile b/Makefile index 3b43a943d5c7b35e89f18336bf9c07353a927159..f17edc337555e99e43a79819637e72c0e028821c 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,8 @@ CC=g++ CXX_DEBUG_FLAGS=-g -DVERBOSE_DEBUG CXX_RELEASE_FLAGS=-O3 CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11 -LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango +LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \ + -lboost_thread -lboost_filesystem -lboost_system INC_PARM=$(foreach d, $(INC_DIR), -I$d) LIB_PARM=$(foreach d, $(LIB_DIR), -L$d) #================================================================================ diff --git a/src/ClassFactory.cpp b/src/ClassFactory.cpp index b0d490f6fa7a3943da3997e1cc44b37063f8398c..6ed70cf528dc4bb7392956e6aa886be2dfb63d6b 100644 --- a/src/ClassFactory.cpp +++ b/src/ClassFactory.cpp @@ -1,5 +1,4 @@ /*----- PROTECTED REGION ID(PreProcessor::ClassFactory.cpp) ENABLED START -----*/ -static const char *RcsId = "$Id: $"; //============================================================================= // // file : ClassFactory.cpp diff --git a/src/Configuration.h b/src/Configuration.h new file mode 100644 index 0000000000000000000000000000000000000000..4c3f608d8612537d56d6443a7e5fb9b1ca5d6d08 --- /dev/null +++ b/src/Configuration.h @@ -0,0 +1,83 @@ +#ifndef CONFIGURATION_H +#define CONFIGURATION_H + +#include <iostream> +#include <stdint.h> +#include <vector> +#include <boost/shared_ptr.hpp> + +namespace PreProcessor_ns +{ + +class Configuration +{ +public: +//------------------------------------------------------------------------------ +// [Public] Shared pointer typedef +//------------------------------------------------------------------------------ + typedef boost::shared_ptr<Configuration> SP; + +private: +//------------------------------------------------------------------------------ +// [Private] Constructor destructor deleter +//------------------------------------------------------------------------------ + Configuration(std::string watchPath, int workerNumber, int sleepTime, int waitTime, + int connectionNumber, uint32_t iNotifyMask): m_watchPath(watchPath), + m_workerNumber(workerNumber), m_sleepTime(sleepTime), m_waitTime(waitTime), + m_connectionNumber(connectionNumber), m_iNotifyMask(iNotifyMask) {} + virtual ~Configuration() {} + + class Deleter; + friend class Deleter; + class Deleter + { + public: + void operator()(Configuration* c) { delete c; } + }; + +public: +//------------------------------------------------------------------------------ +// [Public] User methods +//------------------------------------------------------------------------------ + static Configuration::SP create(std::string watchPath, int workerNumber, + int sleepTime, int waitTime, int connectionNumber, uint32_t iNotifyMask) + { + Configuration::SP c_sp(new Configuration(watchPath, workerNumber, sleepTime, + waitTime, connectionNumber, iNotifyMask), Configuration::Deleter()); + + return c_sp; + } + + std::string getWatchPath() const { return m_watchPath; } + unsigned int getWorkerNumber() const { return m_workerNumber; } + unsigned int getSleepTime() const { return m_sleepTime; } + unsigned int getWaitTime() const { return m_waitTime; } + unsigned int getConnectionNumber() const { return m_connectionNumber; } + uint32_t getINotifyMask() const { return m_iNotifyMask; } + +private: +//------------------------------------------------------------------------------ +// [Private] class variables +//------------------------------------------------------------------------------ + //INotify watch path + const std::string m_watchPath; + + //Worker thread number + const unsigned int m_workerNumber; + + //Event thread sleep time + const unsigned int m_sleepTime; + + //Worker thread wait time + const unsigned int m_waitTime; + + //Number of connection per destination + const unsigned int m_connectionNumber; + + //INotify mask + const uint32_t m_iNotifyMask; +}; + +} //End of namespace + +#endif /*!CONFIGURATION_H*/ diff --git a/src/EventBuffer.cpp b/src/EventBuffer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0499e6dc62c4904319bf55acec2b3a2780f9f48c --- /dev/null +++ b/src/EventBuffer.cpp @@ -0,0 +1,166 @@ +#include <EventBuffer.h> + +#include <boost/thread/locks.hpp> + +namespace PreProcessor_ns +{ + +//============================================================================== +// EventBuffer::EventBuffer() +//============================================================================== +EventBuffer::EventBuffer(Tango::DeviceImpl* deviceImpl_p) : + Tango::LogAdapter(deviceImpl_p) +{ + DEBUG_STREAM << "EventBuffer::EventBuffer()" << endl; +} + +//============================================================================== +// EventBuffer::~EventBuffer() +//============================================================================== +EventBuffer::~EventBuffer() +{ + DEBUG_STREAM << "EventBuffer::~EventBuffer()" << endl; +} + +//============================================================================== +// EventBuffer::insertNew() +//============================================================================== +EventBuffer::SP EventBuffer::create(Tango::DeviceImpl* deviceImpl_p) +{ + EventBuffer::SP e_sp(new EventBuffer(deviceImpl_p), + EventBuffer::Deleter()); + + return e_sp; +} + +//============================================================================== +// EventBuffer::insertNew() +//============================================================================== +void EventBuffer::insertNew(boost::filesystem::path path) +{ + DEBUG_STREAM << "EventBuffer::insertNew()" << endl; + + boost::mutex::scoped_lock lock(m_mutex); + + bool inserted = m_buffer.insert( + std::pair<boost::filesystem::path, EventStatus>(path, UNPROCESSED) ).second; + + if(inserted) + { + DEBUG_STREAM << "EventBuffer::insertNew() element " + << path.string() << " inserted" << endl; + + lock.unlock(); + + m_conditionVariable.notify_all(); + } + else + WARN_STREAM << "EventBuffer::insertNew() element " + << path.string() << " duplicated" << endl; +} + +//============================================================================== +// EventBuffer::waitNew() +//============================================================================== +boost::filesystem::path EventBuffer::waitNew() +{ + DEBUG_STREAM << "EventBuffer::waitNew()" << endl; + + boost::mutex::scoped_lock lock(m_mutex); + + do + { + std::map<boost::filesystem::path, EventStatus>::iterator it; + for(it=m_buffer.begin(); it!=m_buffer.end(); it++) + { + if(it->second == UNPROCESSED) + { + DEBUG_STREAM << "EventBuffer::waitNew() found new element:" + << it->first.string() << endl; + + it->second = ASSIGNED; + return it->first; + } + } + + DEBUG_STREAM << "EventBuffer::waitNew() waiting new element" << endl; + + m_conditionVariable.wait(lock); + } + while(true); +} + +//============================================================================== +// EventBuffer::markAsProcessed() +//============================================================================== +void EventBuffer::markAsProcessed(boost::filesystem::path path) +{ + DEBUG_STREAM << "EventBuffer::markAsProcessed()" << endl; + + boost::mutex::scoped_lock lock(m_mutex); + + std::map<boost::filesystem::path, EventStatus>::iterator it; + + it = m_buffer.find(path); + + if(it != m_buffer.end()) + { + switch(it->second) + { + case UNPROCESSED: + ERROR_STREAM << "EventBuffer::markAsProcessed() element " + << path.string() << " is marked not processed" << endl; + break; + case ASSIGNED: + it->second = PROCESSED; + DEBUG_STREAM << "EventBuffer::markAsProcessed() element " + << path.string() << " marked as processed" << endl; + break; + case PROCESSED: + ERROR_STREAM << "EventBuffer::markAsProcessed() element " + << path.string() << " already marked as processed" << endl; + break; + } + } + else + ERROR_STREAM << "EventBuffer::markAsProcessed() element" + << path.string() << " not found" << endl; +} + +//============================================================================== +// EventBuffer::removeAllProcessed() +//============================================================================== +void EventBuffer::removeAllProcessed() +{ + DEBUG_STREAM << "EventBuffer::removeAllProcessed()" << endl; + + boost::mutex::scoped_lock lock(m_mutex); + + std::map<boost::filesystem::path, EventStatus>::iterator it; + for(it=m_buffer.begin(); it!=m_buffer.end(); ++it) + { + if(it->second == PROCESSED) + { + DEBUG_STREAM << "EventBuffer::removeAllProcessed() element " + << it->first << "will be removed" << endl; + + std::map<boost::filesystem::path, EventStatus>::iterator to_delete = it; + + m_buffer.erase(to_delete); + } + } +} + +//============================================================================== +// EventBuffer::size() +//============================================================================== +std::size_t EventBuffer::size() +{ + DEBUG_STREAM << "EventBuffer::size()" << endl; + + boost::mutex::scoped_lock lock(m_mutex); + + return m_buffer.size(); +} + +} //namespace diff --git a/src/EventBuffer.h b/src/EventBuffer.h new file mode 100644 index 0000000000000000000000000000000000000000..2433079938a20cc82d9b111218b1e851ca37ed0b --- /dev/null +++ b/src/EventBuffer.h @@ -0,0 +1,80 @@ +#ifndef EVENT_BUFFER_H +#define EVENT_BUFFER_H + +#include <tango.h> + +#include <map> +#include <vector> +#include <iostream> +#include <stdexcept> + +#include <boost/shared_ptr.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/filesystem.hpp> + +namespace PreProcessor_ns +{ + +class EventBuffer : public Tango::LogAdapter +{ +public: +//------------------------------------------------------------------------------ +// [Public] Shared pointer typedef +//------------------------------------------------------------------------------ + typedef boost::shared_ptr<EventBuffer> SP; + +protected: +//------------------------------------------------------------------------------ +// [Protected] Constructor destructor deleter +//------------------------------------------------------------------------------ + EventBuffer(Tango::DeviceImpl*); + + virtual ~EventBuffer(); + + class Deleter; + friend class Deleter; + class Deleter + { + public: + void operator()(EventBuffer* e) { delete e; } + }; + +public: +//------------------------------------------------------------------------------ +// [Public] Users methods +//------------------------------------------------------------------------------ + static EventBuffer::SP create(Tango::DeviceImpl*); + + virtual void insertNew(boost::filesystem::path); + + virtual boost::filesystem::path waitNew(); + + virtual void markAsProcessed(boost::filesystem::path); + + virtual void removeAllProcessed(); + + virtual std::size_t size(); + +protected: +//------------------------------------------------------------------------------ +// [Protected] Event status enumeration +//------------------------------------------------------------------------------ + enum EventStatus { UNPROCESSED=0, ASSIGNED=1, PROCESSED=2 }; + +//------------------------------------------------------------------------------ +// [Protected] Class variables +//------------------------------------------------------------------------------ + //Access synchronization mutex + boost::mutex m_mutex; + + //Access synchronization condition variable + boost::condition_variable m_conditionVariable; + + //File buffer + std::map<boost::filesystem::path, EventStatus> m_buffer; +}; + +} //End of namespace + +#endif /*!EVENT_BUFFER_H*/ diff --git a/src/EventThread.cpp b/src/EventThread.cpp new file mode 100644 index 0000000000000000000000000000000000000000..85efd95dd17206fa27bcccafeeb64563e95c251f --- /dev/null +++ b/src/EventThread.cpp @@ -0,0 +1,232 @@ +#include <EventThread.h> +#include <PreProcessor.h> +#include <WorkerThread.h> + +#include <cassert> +#include <unistd.h> +#include <fcntl.h> +#include <cerrno> + +#include <boost/filesystem.hpp> +#include <boost/scoped_ptr.hpp> + +namespace PreProcessor_ns +{ + +//============================================================================== +// EventThread::EventThread() +//============================================================================== +EventThread::EventThread(Tango::DeviceImpl* deviceImpl_p, + Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), + m_configuration_sp(configuration_sp) +{ + DEBUG_STREAM << "EventThread::EventThread()" << endl; + + m_state = Tango::OFF; + m_status = "Event thread not running"; +} + +//============================================================================== +// EventThread::~EventThread() +//============================================================================== +EventThread::~EventThread() +{ + DEBUG_STREAM << "EventThread::~EventThread()" << endl; + + if(m_threadGroup_sp) + { + m_threadGroup_sp->interrupt_all(); + + m_threadGroup_sp->join_all(); + } +} + +//============================================================================== +// EventThread::create() +//============================================================================== +EventThread::SP EventThread::create(Tango::DeviceImpl* deviceImpl_p, + Configuration::SP configuration_sp) +{ + EventThread::SP e_sp(new EventThread(deviceImpl_p, configuration_sp), + EventThread::Deleter()); + + return e_sp; +} + +//============================================================================== +// EventThread::start() +//============================================================================== +void EventThread::start() +{ + DEBUG_STREAM << "EventThread::start()" << endl; + + try + { + + } + catch(std::exception& ex) + { + std::stringstream error_stream; + error_stream << "Event thread not running " << ex.what(); + writeState(Tango::ALARM); + writeStatus(error_stream.str()); + } + catch(...) + { + writeState(Tango::ALARM); + writeStatus("Event thread unknown exception"); + } +} + +//============================================================================== +// EventThread::stop() +//============================================================================== +void EventThread::stop() +{ + DEBUG_STREAM << "EventThread::stop()" << endl; + + if(m_threadGroup_sp) + { + m_threadGroup_sp->interrupt_all(); + + m_threadGroup_sp->join_all(); + } + + inotify_rm_watch(m_fileDescriptor, m_watchDescriptor); + + if(m_fileDescriptor) { close(m_fileDescriptor); } +} + +//============================================================================== +// EventThread::readState() +//============================================================================== +Tango::DevState EventThread::readState() +{ + DEBUG_STREAM << "EventThread::readState()" << endl; + + boost::mutex::scoped_lock stateLock(m_stateMutex); + + return m_state; +} + +//============================================================================== +// EventThread::readStatus() +//============================================================================== +std::string EventThread::readStatus() +{ + DEBUG_STREAM << "EventThread::readStatus()" << endl; + + boost::mutex::scoped_lock statusLock(m_statusMutex); + + return m_status; +} + +//============================================================================== +// EventThread::writeState() +//============================================================================== +void EventThread::writeState(Tango::DevState state) +{ + DEBUG_STREAM << "Client::writeState()" << endl; + + boost::mutex::scoped_lock stateLock(m_stateMutex); + + m_state = state; +} + +//============================================================================== +// EventThread::writeStatus() +//============================================================================== +void EventThread::writeStatus(std::string status) +{ + DEBUG_STREAM << "Client::writeStatus()" << endl; + + boost::mutex::scoped_lock statusLock(m_statusMutex); + + m_status = status; +} + +//============================================================================== +// EventThread::eventLoop() +//============================================================================== +void EventThread::eventLoop() +{ + DEBUG_STREAM << "EventThread::eventLoop() starting loop" << endl; + + unsigned int sleepTime = m_configuration_sp->getSleepTime(); + boost::filesystem::path watchPath(m_configuration_sp->getWatchPath()); + + while(true) + { + try + { + char buffer[BUF_LEN]; + int length = 0; + + if((length = read( m_fileDescriptor, buffer, BUF_LEN )) < 0) + { + if(errno != EINTR && errno != EAGAIN) + { + writeState(Tango::ALARM); + writeStatus("Event thread error on watch path read"); + } + else + { + writeState(Tango::ON); + writeStatus("Event thread running"); + } + } + else + { + writeState(Tango::ON); + writeStatus("Event thread new data found"); + } + + struct inotify_event *event; + for(int i=0; i<length; i += EVENT_SIZE + event->len) + { + event = ( struct inotify_event * ) &buffer[ i ]; + + DEBUG_STREAM << "EVENT: " << event->name << endl; //TODO: delete me + + //Add path to file name + boost::filesystem::path file(event->name); + boost::filesystem::path path(watchPath); + path /= file; + + //Check if event is a regular file + if(boost::filesystem::is_regular_file(path)) + m_eventBuffer_sp->insertNew(path); + } + + m_eventBuffer_sp->removeAllProcessed(); + + DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl; + + boost::posix_time::milliseconds sleepPosixTime(sleepTime); + boost::this_thread::sleep(sleepPosixTime); + } + catch(boost::thread_interrupted& ex) + { + DEBUG_STREAM << "EventThread::eventLoop() stopping loop" << endl; + + writeState(Tango::OFF); + writeStatus("Event thread not running"); + + break; + } + catch(std::exception& ex) + { + std::stringstream error_stream; + error_stream << "Event thread not running " << ex.what(); + writeState(Tango::ALARM); + writeStatus(error_stream.str()); + } + catch(...) + { + writeState(Tango::ALARM); + writeStatus("Event thread unknown exception"); + } + } //while +} + +} //namespace diff --git a/src/EventThread.h b/src/EventThread.h new file mode 100644 index 0000000000000000000000000000000000000000..5f8772fb6832b8db9272b12cea6badffba905a2a --- /dev/null +++ b/src/EventThread.h @@ -0,0 +1,116 @@ +#ifndef EVENT_THREAD_H +#define EVENT_THREAD_H + +#include <Configuration.h> +#include <EventBuffer.h> + +#include <tango.h> + +#include <stdexcept> +#include <sys/inotify.h> + +#include <boost/shared_ptr.hpp> +#include <boost/thread.hpp> + +namespace PreProcessor_ns +{ + +class FitsImporter; + +class EventThread : public Tango::LogAdapter +{ +public: +//------------------------------------------------------------------------------ +// [Public] Shared pointer typedef +//------------------------------------------------------------------------------ + typedef boost::shared_ptr<EventThread> SP; + +protected: +//------------------------------------------------------------------------------ +// [Protected] Constructor destructor deleter +//------------------------------------------------------------------------------ + EventThread(Tango::DeviceImpl*, Configuration::SP); + + virtual ~EventThread(); + + class Deleter; + friend class Deleter; + class Deleter + { + public: + void operator()(EventThread* e) { delete e; } + }; + +public: +//------------------------------------------------------------------------------ +// [Public] Class creation method +//------------------------------------------------------------------------------ + static EventThread::SP create(Tango::DeviceImpl*, Configuration::SP); + +//------------------------------------------------------------------------------ +// [Public] Thread management methods +//------------------------------------------------------------------------------ + virtual void start(); + + virtual void stop(); + +//------------------------------------------------------------------------------ +// [Public] Read state and status methods +//------------------------------------------------------------------------------ + virtual Tango::DevState readState(); + + virtual std::string readStatus(); + +protected: +//------------------------------------------------------------------------------ +// [Protected] Write state and status methods +//------------------------------------------------------------------------------ + virtual void writeState(Tango::DevState); + + virtual void writeStatus(std::string); + +//------------------------------------------------------------------------------ +// [Protected] Utilities methods +//------------------------------------------------------------------------------ + virtual void eventLoop(); + +//------------------------------------------------------------------------------ +// [Protected] Class variables +//------------------------------------------------------------------------------ + //Tango server class pointer + FitsImporter* m_fitsImporter_p; + + //Boost thread group shared pointer + boost::scoped_ptr<boost::thread_group> m_threadGroup_sp; + + //Configuration shared pointer + Configuration::SP m_configuration_sp; + + //Event buffer shared pointer + EventBuffer::SP m_eventBuffer_sp; + + //Tango state synchronization + boost::mutex m_stateMutex; + + //Tango state variable + Tango::DevState m_state; + + //Tango status synchronization + boost::mutex m_statusMutex; + + //Tango status variable + std::string m_status; + + //INotify file and watch descriptors + int m_fileDescriptor, m_watchDescriptor; + + //INotify event size + static const std::size_t EVENT_SIZE = ( sizeof (struct inotify_event) ); + + //INotify event buffer length + static const std::size_t BUF_LEN = ( 1024 * ( EVENT_SIZE + 16 ) ); +}; + +} // End of namespace + +#endif /*!EVENT_THREAD_H*/ diff --git a/src/WorkerThread.cpp b/src/WorkerThread.cpp new file mode 100644 index 0000000000000000000000000000000000000000..47a9e7b7616c7e2ef59c802d133697149b2d7ca7 --- /dev/null +++ b/src/WorkerThread.cpp @@ -0,0 +1,138 @@ +#include <WorkerThread.h> + +#include <cassert> +#include <fstream> + +#include <tango.h> + +#include <boost/scoped_ptr.hpp> +#include <boost/iostreams/filtering_streambuf.hpp> +#include <boost/iostreams/copy.hpp> +#include <boost/iostreams/filter/gzip.hpp> +#include <boost/regex.hpp> +#include <boost/lexical_cast.hpp> + +namespace PreProcessor_ns +{ + +//============================================================================== +// WorkerThread::WorkerThread() +//============================================================================== +WorkerThread::WorkerThread(Tango::DeviceImpl* deviceImpl_p, + EventBuffer::SP eventBuffer_sp) : Tango::LogAdapter(deviceImpl_p) +{ + DEBUG_STREAM << "WorkerThread::WorkerThread()" << std::endl; +} + +//============================================================================== +// WorkerThread::~WorkerThread() +//============================================================================== +WorkerThread::~WorkerThread() +{ + DEBUG_STREAM << "WorkerThread::~WorkerThread()" << std::endl; +} + +//============================================================================== +// WorkerThread::workerLoop()() +//============================================================================== +void WorkerThread::workerLoop() +{ + DEBUG_STREAM << "WorkerThread::workerLoop(): starting loop" << endl; + + unsigned int waitTime = m_configuration_sp->getWaitTime(); + std::string watchPath = m_configuration_sp->getWatchPath(); + +// while(true) +// { +// try +// { +// //Wait new file event to process +// boost::filesystem::path origPath(m_eventBuffer_sp->waitNew()); +// std::string fileName = origPath.stem().string(); +// +// //If configured wait after new file event +// boost::posix_time::milliseconds waitPosixTime(waitTime); +// boost::this_thread::sleep( waitPosixTime ); +// +// bool completed = false; +// +// INFO_STREAM << "WorkerThread::workerLoop() processing \"" +// << fileName << "\"" << endl; +// +// try +// { +// //Open fits file +// boost::shared_ptr<CCfits::FITS> fitsFile_sp( +// new CCfits::FITS(origPath.string(), CCfits::Write)); +// +// //Read all key in primary HDU +// CCfits::PHDU& phdu = fitsFile_sp->pHDU(); +// phdu.readAllKeys(); +// +// try +// { +// //Try to ingest using an instrument in list +// ingestUsingInstrumentList(origPath, fitsFile_sp); +// +// completed = true; +// m_fitsImporter_p->incrementRegularCounter(); +// INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName +// << "\" archived regularly" << endl; +// } +// catch(CCfits::FitsException& ex) +// { +// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl; +// } +// catch(std::runtime_error& ex) +// { +// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; +// } +// +// if(!completed) +// { +// //Try to ingest using default instrument +// ingestUsingDefaultInstrument(origPath, fitsFile_sp); +// +// completed = true; +// m_fitsImporter_p->incrementWarningCounter(); +// WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName +// << "\" archived in default instrument" << endl; +// } +// } +// catch(CCfits::FitsException& ex) +// { +// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl; +// } +// catch(std::runtime_error& ex) +// { +// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; +// } +// +// if(!completed) +// { +// //Cannot ingest new file => notify error +// m_fitsImporter_p->incrementErrorCounter(); +// ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName +// << "\" not archived" << endl; +// } +// +// m_eventBuffer_sp->markAsProcessed(origPath); +// } +// catch(boost::thread_interrupted& ex) +// { +// DEBUG_STREAM << "WorkerThread::workerLoop() interrupt" << endl; +// +// break; +// } +// catch(std::exception& ex) +// { +// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; +// } +// catch(...) +// { +// ERROR_STREAM << "WorkerThread::workerLoop() unknown exception" << endl; +// } +// } //while +} + +} //namespace diff --git a/src/WorkerThread.h b/src/WorkerThread.h new file mode 100644 index 0000000000000000000000000000000000000000..10df7800d4ae4cfb2ca727728d870de016d9c9e8 --- /dev/null +++ b/src/WorkerThread.h @@ -0,0 +1,49 @@ +#ifndef WORKER_THREAD_H +#define WORKER_THREAD_H + +#include <Configuration.h> +#include <EventBuffer.h> + +#include <tango.h> + +#include <stdexcept> +#include <sstream> + +#include <boost/shared_ptr.hpp> + +namespace PreProcessor_ns +{ + +class WorkerThread : public Tango::LogAdapter +{ +public: +//------------------------------------------------------------------------------ +// [Public] Constructor destructor +//------------------------------------------------------------------------------ + WorkerThread(Tango::DeviceImpl*, EventBuffer::SP); + + virtual ~WorkerThread(); + +//------------------------------------------------------------------------------ +// [Public] Users method +//------------------------------------------------------------------------------ + virtual void workerLoop(); + +protected: +//------------------------------------------------------------------------------ +// [Protected] Utilities methods +//------------------------------------------------------------------------------ + +//------------------------------------------------------------------------------ +// [Protected] Class variables +//------------------------------------------------------------------------------ + //Event buffer shared pointer + EventBuffer::SP m_eventBuffer_sp; + + //Configuration shared pointer + Configuration::SP m_configuration_sp; +}; + +} //End of namespace + +#endif /*!WORKER_THREAD_H*/