From 207b9160dbeb2fca2682d6b5eaefaf3cfb391c0d Mon Sep 17 00:00:00 2001
From: Marco De Marco <demarco@oats.inaf.it>
Date: Tue, 4 Feb 2014 23:18:18 +0100
Subject: [PATCH] Worker thread, event buffer, event thread and Configuration
 added

---
 Makefile             |   3 +-
 src/ClassFactory.cpp |   1 -
 src/Configuration.h  |  83 ++++++++++++++++
 src/EventBuffer.cpp  | 166 +++++++++++++++++++++++++++++++
 src/EventBuffer.h    |  80 +++++++++++++++
 src/EventThread.cpp  | 232 +++++++++++++++++++++++++++++++++++++++++++
 src/EventThread.h    | 116 ++++++++++++++++++++++
 src/WorkerThread.cpp | 138 +++++++++++++++++++++++++
 src/WorkerThread.h   |  49 +++++++++
 9 files changed, 866 insertions(+), 2 deletions(-)
 create mode 100644 src/Configuration.h
 create mode 100644 src/EventBuffer.cpp
 create mode 100644 src/EventBuffer.h
 create mode 100644 src/EventThread.cpp
 create mode 100644 src/EventThread.h
 create mode 100644 src/WorkerThread.cpp
 create mode 100644 src/WorkerThread.h

diff --git a/Makefile b/Makefile
index 3b43a94..f17edc3 100644
--- a/Makefile
+++ b/Makefile
@@ -16,7 +16,8 @@ CC=g++
 CXX_DEBUG_FLAGS=-g -DVERBOSE_DEBUG
 CXX_RELEASE_FLAGS=-O3
 CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11
-LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango
+LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \
+	-lboost_thread -lboost_filesystem -lboost_system
 INC_PARM=$(foreach d, $(INC_DIR), -I$d)
 LIB_PARM=$(foreach d, $(LIB_DIR), -L$d)
 #================================================================================
diff --git a/src/ClassFactory.cpp b/src/ClassFactory.cpp
index b0d490f..6ed70cf 100644
--- a/src/ClassFactory.cpp
+++ b/src/ClassFactory.cpp
@@ -1,5 +1,4 @@
 /*----- PROTECTED REGION ID(PreProcessor::ClassFactory.cpp) ENABLED START -----*/
-static const char *RcsId = "$Id:  $";
 //=============================================================================
 //
 // file :        ClassFactory.cpp
diff --git a/src/Configuration.h b/src/Configuration.h
new file mode 100644
index 0000000..4c3f608
--- /dev/null
+++ b/src/Configuration.h
@@ -0,0 +1,83 @@
+#ifndef CONFIGURATION_H
+#define CONFIGURATION_H
+
+#include <iostream>
+#include <stdint.h>
+#include <vector>
+#include <boost/shared_ptr.hpp>
+
+namespace PreProcessor_ns
+{
+
+class Configuration
+{
+public:
+//------------------------------------------------------------------------------
+//	[Public] Shared pointer typedef
+//------------------------------------------------------------------------------
+	typedef boost::shared_ptr<Configuration> SP;
+
+private:
+//------------------------------------------------------------------------------
+//	[Private] Constructor destructor deleter
+//------------------------------------------------------------------------------
+	Configuration(std::string watchPath, int workerNumber, int sleepTime, int waitTime,
+		int connectionNumber, uint32_t iNotifyMask): m_watchPath(watchPath),
+		m_workerNumber(workerNumber), m_sleepTime(sleepTime), m_waitTime(waitTime),
+		m_connectionNumber(connectionNumber), m_iNotifyMask(iNotifyMask) {}
+	virtual ~Configuration() {}
+
+	class Deleter;
+	friend class Deleter;
+	class Deleter
+	{
+	public:
+		void operator()(Configuration* c) { delete c; }
+	};
+
+public:
+//------------------------------------------------------------------------------
+//	[Public] User methods
+//------------------------------------------------------------------------------
+	static Configuration::SP create(std::string watchPath, int workerNumber,
+        int sleepTime, int waitTime, int connectionNumber, uint32_t iNotifyMask)
+	{
+		Configuration::SP c_sp(new Configuration(watchPath, workerNumber, sleepTime,
+			 waitTime, connectionNumber, iNotifyMask), Configuration::Deleter());
+
+		return c_sp;
+	}
+
+	std::string getWatchPath() const { return m_watchPath; }
+	unsigned int getWorkerNumber() const { return m_workerNumber; }
+    unsigned int getSleepTime() const { return m_sleepTime; }
+	unsigned int getWaitTime() const { return m_waitTime; }
+	unsigned int getConnectionNumber() const { return m_connectionNumber; }
+	uint32_t getINotifyMask() const { return m_iNotifyMask; }
+
+private:
+//------------------------------------------------------------------------------
+//	[Private] class variables
+//------------------------------------------------------------------------------
+	//INotify watch path
+	const std::string m_watchPath;
+
+	//Worker thread number
+	const unsigned int m_workerNumber;
+
+	//Event thread sleep time
+	const unsigned int m_sleepTime;
+
+	//Worker thread wait time
+	const unsigned int m_waitTime;
+
+	//Number of connection per destination
+	const unsigned int m_connectionNumber;
+
+	//INotify mask
+	const uint32_t m_iNotifyMask;
+};
+
+}   //End of namespace
+
+#endif /*!CONFIGURATION_H*/
diff --git a/src/EventBuffer.cpp b/src/EventBuffer.cpp
new file mode 100644
index 0000000..0499e6d
--- /dev/null
+++ b/src/EventBuffer.cpp
@@ -0,0 +1,166 @@
+#include <EventBuffer.h>
+
+#include <boost/thread/locks.hpp>
+
+namespace PreProcessor_ns
+{
+
+//==============================================================================
+//	EventBuffer::EventBuffer()
+//==============================================================================
+EventBuffer::EventBuffer(Tango::DeviceImpl* deviceImpl_p) :
+	Tango::LogAdapter(deviceImpl_p)
+{
+		DEBUG_STREAM << "EventBuffer::EventBuffer()" << endl;
+}
+
+//==============================================================================
+//	EventBuffer::~EventBuffer()
+//==============================================================================
+EventBuffer::~EventBuffer()
+{
+	DEBUG_STREAM << "EventBuffer::~EventBuffer()" << endl;
+}
+
+//==============================================================================
+//	EventBuffer::insertNew()
+//==============================================================================
+EventBuffer::SP EventBuffer::create(Tango::DeviceImpl* deviceImpl_p)
+{
+    EventBuffer::SP e_sp(new EventBuffer(deviceImpl_p),
+        EventBuffer::Deleter());
+
+    return e_sp;
+}
+
+//==============================================================================
+//	EventBuffer::insertNew()
+//==============================================================================
+void EventBuffer::insertNew(boost::filesystem::path path)
+{
+    DEBUG_STREAM << "EventBuffer::insertNew()" << endl;
+
+	boost::mutex::scoped_lock lock(m_mutex);
+
+    bool inserted = m_buffer.insert(
+        std::pair<boost::filesystem::path, EventStatus>(path, UNPROCESSED) ).second;
+
+	if(inserted)
+    {
+        DEBUG_STREAM << "EventBuffer::insertNew() element "
+            << path.string() << " inserted" << endl;
+
+        lock.unlock();
+
+        m_conditionVariable.notify_all();
+    }
+    else
+        WARN_STREAM << "EventBuffer::insertNew() element "
+            << path.string() << " duplicated" << endl;
+}
+
+//==============================================================================
+//	EventBuffer::waitNew()
+//==============================================================================
+boost::filesystem::path EventBuffer::waitNew()
+{
+    DEBUG_STREAM << "EventBuffer::waitNew()" << endl;
+
+	boost::mutex::scoped_lock lock(m_mutex);
+
+	do
+	{
+		std::map<boost::filesystem::path, EventStatus>::iterator it;
+		for(it=m_buffer.begin(); it!=m_buffer.end(); it++)
+		{
+			if(it->second == UNPROCESSED)
+			{
+                DEBUG_STREAM << "EventBuffer::waitNew() found new element:"
+                    << it->first.string() << endl;
+
+				it->second = ASSIGNED;
+				return it->first;
+			}
+		}
+
+        DEBUG_STREAM << "EventBuffer::waitNew() waiting new element" << endl;
+
+		m_conditionVariable.wait(lock);
+	}
+	while(true);
+}
+
+//==============================================================================
+//	EventBuffer::markAsProcessed()
+//==============================================================================
+void EventBuffer::markAsProcessed(boost::filesystem::path path)
+{
+    DEBUG_STREAM << "EventBuffer::markAsProcessed()" << endl;
+
+    boost::mutex::scoped_lock lock(m_mutex);
+
+    std::map<boost::filesystem::path, EventStatus>::iterator it;
+
+	it = m_buffer.find(path);
+
+	if(it != m_buffer.end())
+    {
+        switch(it->second)
+        {
+            case UNPROCESSED:
+                ERROR_STREAM << "EventBuffer::markAsProcessed() element "
+                    << path.string() << " is marked not processed" << endl;
+                break;
+            case ASSIGNED:
+                it->second = PROCESSED;
+                DEBUG_STREAM << "EventBuffer::markAsProcessed() element "
+                    << path.string() << " marked as processed" << endl;
+                break;
+            case PROCESSED:
+                ERROR_STREAM << "EventBuffer::markAsProcessed() element "
+                    << path.string() << " already marked as processed" << endl;
+                break;
+        }
+    }
+    else
+        ERROR_STREAM << "EventBuffer::markAsProcessed() element"
+            << path.string() << " not found" << endl;
+}
+
+//==============================================================================
+//	EventBuffer::removeAllProcessed()
+//==============================================================================
+void EventBuffer::removeAllProcessed()
+{
+    DEBUG_STREAM << "EventBuffer::removeAllProcessed()" << endl;
+
+    boost::mutex::scoped_lock lock(m_mutex);
+
+    std::map<boost::filesystem::path, EventStatus>::iterator it;
+    for(it=m_buffer.begin(); it!=m_buffer.end(); ++it)
+    {
+        if(it->second == PROCESSED)
+        {
+            DEBUG_STREAM << "EventBuffer::removeAllProcessed() element "
+                << it->first << "will be removed" << endl;
+
+            std::map<boost::filesystem::path, EventStatus>::iterator to_delete = it;
+
+            m_buffer.erase(to_delete);
+        }
+    }
+}
+
+//==============================================================================
+//	EventBuffer::size()
+//==============================================================================
+std::size_t EventBuffer::size()
+{
+    DEBUG_STREAM << "EventBuffer::size()" << endl;
+
+	boost::mutex::scoped_lock lock(m_mutex);
+
+	return m_buffer.size();
+}
+
+}   //namespace
diff --git a/src/EventBuffer.h b/src/EventBuffer.h
new file mode 100644
index 0000000..2433079
--- /dev/null
+++ b/src/EventBuffer.h
@@ -0,0 +1,80 @@
+#ifndef EVENT_BUFFER_H
+#define EVENT_BUFFER_H
+
+#include <tango.h>
+
+#include <map>
+#include <vector>
+#include <iostream>
+#include <stdexcept>
+
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/filesystem.hpp>
+
+namespace PreProcessor_ns
+{
+
+class EventBuffer : public Tango::LogAdapter
+{
+public:
+//------------------------------------------------------------------------------
+//	[Public] Shared pointer typedef
+//------------------------------------------------------------------------------
+	typedef boost::shared_ptr<EventBuffer> SP;
+
+protected:
+//------------------------------------------------------------------------------
+//	[Protected] Constructor destructor deleter
+//------------------------------------------------------------------------------
+	EventBuffer(Tango::DeviceImpl*);
+
+	virtual ~EventBuffer();
+
+	class Deleter;
+	friend class Deleter;
+	class Deleter
+	{
+	public:
+		void operator()(EventBuffer* e) { delete e; }
+	};
+
+public:
+//------------------------------------------------------------------------------
+//	[Public] Users methods
+//------------------------------------------------------------------------------
+	static EventBuffer::SP create(Tango::DeviceImpl*);
+
+	virtual void insertNew(boost::filesystem::path);
+
+	virtual boost::filesystem::path waitNew();
+
+    virtual void markAsProcessed(boost::filesystem::path);
+
+    virtual void removeAllProcessed();
+
+	virtual std::size_t size();
+
+protected:
+//------------------------------------------------------------------------------
+//	[Protected] Event status enumeration
+//------------------------------------------------------------------------------
+    enum EventStatus { UNPROCESSED=0, ASSIGNED=1, PROCESSED=2 };
+
+//------------------------------------------------------------------------------
+//	[Protected] Class variables
+//------------------------------------------------------------------------------
+	//Access synchronization mutex
+	boost::mutex m_mutex;
+
+	//Access synchronization condition variable
+	boost::condition_variable m_conditionVariable;
+
+	//File buffer
+	std::map<boost::filesystem::path, EventStatus> m_buffer;
+};
+
+}   //End of namespace
+
+#endif /*!EVENT_BUFFER_H*/
diff --git a/src/EventThread.cpp b/src/EventThread.cpp
new file mode 100644
index 0000000..85efd95
--- /dev/null
+++ b/src/EventThread.cpp
@@ -0,0 +1,232 @@
+#include <EventThread.h>
+#include <PreProcessor.h>
+#include <WorkerThread.h>
+
+#include <cassert>
+#include <unistd.h>
+#include <fcntl.h>
+#include <cerrno>
+
+#include <boost/filesystem.hpp>
+#include <boost/scoped_ptr.hpp>
+
+namespace PreProcessor_ns
+{
+
+//==============================================================================
+//	EventThread::EventThread()
+//==============================================================================
+EventThread::EventThread(Tango::DeviceImpl* deviceImpl_p,
+    Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p),
+    m_configuration_sp(configuration_sp)
+{
+	DEBUG_STREAM << "EventThread::EventThread()" << endl;
+
+    m_state = Tango::OFF;
+    m_status = "Event thread not running";
+}
+
+//==============================================================================
+//	EventThread::~EventThread()
+//==============================================================================
+EventThread::~EventThread()
+{
+	DEBUG_STREAM << "EventThread::~EventThread()" << endl;
+
+	if(m_threadGroup_sp)
+	{
+		m_threadGroup_sp->interrupt_all();
+
+		m_threadGroup_sp->join_all();
+	}
+}
+
+//==============================================================================
+//	EventThread::create()
+//==============================================================================
+EventThread::SP EventThread::create(Tango::DeviceImpl* deviceImpl_p,
+    Configuration::SP configuration_sp)
+{
+	EventThread::SP e_sp(new EventThread(deviceImpl_p, configuration_sp),
+        EventThread::Deleter());
+
+	return e_sp;
+}
+
+//==============================================================================
+//	EventThread::start()
+//==============================================================================
+void EventThread::start()
+{
+	DEBUG_STREAM << "EventThread::start()" << endl;
+
+    try
+    {
+
+    }
+    catch(std::exception& ex)
+    {
+        std::stringstream error_stream;
+        error_stream << "Event thread not running " << ex.what();
+        writeState(Tango::ALARM);
+        writeStatus(error_stream.str());
+    }
+    catch(...)
+    {
+        writeState(Tango::ALARM);
+        writeStatus("Event thread unknown exception");
+    }
+}
+
+//==============================================================================
+//	EventThread::stop()
+//==============================================================================
+void EventThread::stop()
+{
+	DEBUG_STREAM << "EventThread::stop()" << endl;
+
+	if(m_threadGroup_sp)
+	{
+		m_threadGroup_sp->interrupt_all();
+
+		m_threadGroup_sp->join_all();
+	}
+
+	inotify_rm_watch(m_fileDescriptor, m_watchDescriptor);
+
+	if(m_fileDescriptor) { close(m_fileDescriptor); }
+}
+
+//==============================================================================
+//	EventThread::readState()
+//==============================================================================
+Tango::DevState EventThread::readState()
+{
+    DEBUG_STREAM << "EventThread::readState()" << endl;
+
+    boost::mutex::scoped_lock stateLock(m_stateMutex);
+
+    return m_state;
+}
+
+//==============================================================================
+//	EventThread::readStatus()
+//==============================================================================
+std::string EventThread::readStatus()
+{
+    DEBUG_STREAM << "EventThread::readStatus()" << endl;
+
+    boost::mutex::scoped_lock statusLock(m_statusMutex);
+
+    return m_status;
+}
+
+//==============================================================================
+//      EventThread::writeState()
+//==============================================================================
+void EventThread::writeState(Tango::DevState state)
+{
+    DEBUG_STREAM << "Client::writeState()" << endl;
+
+    boost::mutex::scoped_lock stateLock(m_stateMutex);
+
+    m_state = state;
+}
+
+//==============================================================================
+//      EventThread::writeStatus()
+//==============================================================================
+void EventThread::writeStatus(std::string status)
+{
+    DEBUG_STREAM << "Client::writeStatus()" << endl;
+
+    boost::mutex::scoped_lock statusLock(m_statusMutex);
+
+    m_status = status;
+}
+
+//==============================================================================
+//	EventThread::eventLoop()
+//==============================================================================
+void EventThread::eventLoop()
+{
+	DEBUG_STREAM << "EventThread::eventLoop() starting loop" << endl;
+
+	unsigned int sleepTime = m_configuration_sp->getSleepTime();
+	boost::filesystem::path watchPath(m_configuration_sp->getWatchPath());
+
+	while(true)
+	{
+		try
+		{
+			char buffer[BUF_LEN];
+			int length = 0;
+
+			if((length = read( m_fileDescriptor, buffer, BUF_LEN )) < 0)
+			{
+				if(errno != EINTR && errno != EAGAIN)
+				{
+                    writeState(Tango::ALARM);
+                    writeStatus("Event thread error on watch path read");
+				}
+				else
+                {
+                    writeState(Tango::ON);
+                    writeStatus("Event thread running");
+                }
+			}
+			else
+            {
+                writeState(Tango::ON);
+                writeStatus("Event thread new data found");
+            }
+
+            struct inotify_event *event;
+            for(int i=0; i<length; i += EVENT_SIZE + event->len)
+            {
+                event = ( struct inotify_event * ) &buffer[ i ];
+
+                DEBUG_STREAM << "EVENT: " << event->name << endl; //TODO: delete me
+
+                //Add path to file name
+                boost::filesystem::path file(event->name);
+                boost::filesystem::path path(watchPath);
+                path /= file;
+
+                //Check if event is a regular file
+                if(boost::filesystem::is_regular_file(path))
+                    m_eventBuffer_sp->insertNew(path);
+            }
+
+            m_eventBuffer_sp->removeAllProcessed();
+
+            DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl;
+
+			boost::posix_time::milliseconds sleepPosixTime(sleepTime);
+			boost::this_thread::sleep(sleepPosixTime);
+		}
+		catch(boost::thread_interrupted& ex)
+		{
+            DEBUG_STREAM << "EventThread::eventLoop() stopping loop" << endl;
+
+            writeState(Tango::OFF);
+            writeStatus("Event thread not running");
+
+			break;
+		}
+        catch(std::exception& ex)
+        {
+            std::stringstream error_stream;
+            error_stream << "Event thread not running " << ex.what();
+            writeState(Tango::ALARM);
+            writeStatus(error_stream.str());
+        }
+		catch(...)
+		{
+            writeState(Tango::ALARM);
+            writeStatus("Event thread unknown exception");
+		}
+	} //while
+}
+
+}   //namespace
diff --git a/src/EventThread.h b/src/EventThread.h
new file mode 100644
index 0000000..5f8772f
--- /dev/null
+++ b/src/EventThread.h
@@ -0,0 +1,116 @@
+#ifndef EVENT_THREAD_H
+#define EVENT_THREAD_H
+
+#include <Configuration.h>
+#include <EventBuffer.h>
+
+#include <tango.h>
+
+#include <stdexcept>
+#include <sys/inotify.h>
+
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+
+namespace PreProcessor_ns
+{
+
+class FitsImporter;
+
+class EventThread : public Tango::LogAdapter
+{
+public:
+//------------------------------------------------------------------------------
+//	[Public] Shared pointer typedef
+//------------------------------------------------------------------------------
+	typedef boost::shared_ptr<EventThread> SP;
+
+protected:
+//------------------------------------------------------------------------------
+//	[Protected] Constructor destructor deleter
+//------------------------------------------------------------------------------
+	EventThread(Tango::DeviceImpl*, Configuration::SP);
+
+	virtual ~EventThread();
+
+	class Deleter;
+	friend class Deleter;
+	class Deleter
+	{
+	public:
+		void operator()(EventThread* e) { delete e; }
+	};
+
+public:
+//------------------------------------------------------------------------------
+//	[Public] Class creation method
+//------------------------------------------------------------------------------
+	static EventThread::SP create(Tango::DeviceImpl*, Configuration::SP);
+
+//------------------------------------------------------------------------------
+//  [Public] Thread management methods
+//------------------------------------------------------------------------------
+	virtual void start();
+
+	virtual void stop();
+
+//------------------------------------------------------------------------------
+//  [Public] Read state and status methods
+//------------------------------------------------------------------------------
+    virtual Tango::DevState readState();
+
+    virtual std::string readStatus();
+
+protected:
+//------------------------------------------------------------------------------
+//  [Protected] Write state and status methods
+//------------------------------------------------------------------------------
+    virtual void writeState(Tango::DevState);
+
+    virtual void writeStatus(std::string);
+
+//------------------------------------------------------------------------------
+//	[Protected] Utilities methods
+//------------------------------------------------------------------------------
+    virtual void eventLoop();
+
+//------------------------------------------------------------------------------
+//	[Protected] Class variables
+//------------------------------------------------------------------------------
+    //Tango server class pointer
+    FitsImporter* m_fitsImporter_p;
+
+	//Boost thread group shared pointer
+	boost::scoped_ptr<boost::thread_group> m_threadGroup_sp;
+
+    //Configuration shared pointer
+    Configuration::SP m_configuration_sp;
+
+	//Event buffer shared pointer
+	EventBuffer::SP m_eventBuffer_sp;
+
+    //Tango state synchronization
+    boost::mutex m_stateMutex;
+
+    //Tango state variable
+    Tango::DevState m_state;
+
+    //Tango status synchronization
+    boost::mutex m_statusMutex;
+
+    //Tango status variable
+    std::string m_status;
+
+    //INotify file and watch descriptors
+    int m_fileDescriptor, m_watchDescriptor;
+
+    //INotify event size
+    static const std::size_t EVENT_SIZE = ( sizeof (struct inotify_event) );
+
+    //INotify event buffer length
+    static const std::size_t BUF_LEN = ( 1024 * ( EVENT_SIZE + 16 ) );
+};
+
+}	//	End of namespace
+
+#endif /*!EVENT_THREAD_H*/
diff --git a/src/WorkerThread.cpp b/src/WorkerThread.cpp
new file mode 100644
index 0000000..47a9e7b
--- /dev/null
+++ b/src/WorkerThread.cpp
@@ -0,0 +1,138 @@
+#include <WorkerThread.h>
+
+#include <cassert>
+#include <fstream>
+
+#include <tango.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/iostreams/filtering_streambuf.hpp>
+#include <boost/iostreams/copy.hpp>
+#include <boost/iostreams/filter/gzip.hpp>
+#include <boost/regex.hpp>
+#include <boost/lexical_cast.hpp>
+
+namespace PreProcessor_ns
+{
+
+//==============================================================================
+//	WorkerThread::WorkerThread()
+//==============================================================================
+WorkerThread::WorkerThread(Tango::DeviceImpl* deviceImpl_p,
+    EventBuffer::SP eventBuffer_sp) : Tango::LogAdapter(deviceImpl_p)
+{
+	DEBUG_STREAM << "WorkerThread::WorkerThread()" << std::endl;
+}
+
+//==============================================================================
+//	WorkerThread::~WorkerThread()
+//==============================================================================
+WorkerThread::~WorkerThread()
+{
+	DEBUG_STREAM << "WorkerThread::~WorkerThread()" << std::endl;
+}
+
+//==============================================================================
+//	WorkerThread::workerLoop()()
+//==============================================================================
+void WorkerThread::workerLoop()
+{
+    DEBUG_STREAM << "WorkerThread::workerLoop(): starting loop" << endl;
+
+	unsigned int waitTime = m_configuration_sp->getWaitTime();
+	std::string watchPath = m_configuration_sp->getWatchPath();
+
+//    while(true)
+//    {
+//        try
+//        {
+//            //Wait new file event to process
+//            boost::filesystem::path origPath(m_eventBuffer_sp->waitNew());
+//            std::string fileName = origPath.stem().string();
+//
+//            //If configured wait after new file event
+//            boost::posix_time::milliseconds waitPosixTime(waitTime);
+//            boost::this_thread::sleep( waitPosixTime );
+//
+//            bool completed = false;
+//
+//            INFO_STREAM << "WorkerThread::workerLoop() processing \""
+//                << fileName << "\"" << endl;
+//
+//            try
+//            {
+//                //Open fits file
+//                boost::shared_ptr<CCfits::FITS> fitsFile_sp(
+//                    new CCfits::FITS(origPath.string(), CCfits::Write));
+//
+//                //Read all key in primary HDU
+//                CCfits::PHDU& phdu = fitsFile_sp->pHDU();
+//                phdu.readAllKeys();
+//
+//                try
+//                {
+//                    //Try to ingest using an instrument in list
+//                    ingestUsingInstrumentList(origPath, fitsFile_sp);
+//
+//                    completed = true;
+//                    m_fitsImporter_p->incrementRegularCounter();
+//                    INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName
+//                        << "\" archived regularly" << endl;
+//                }
+//                catch(CCfits::FitsException& ex)
+//                {
+//                    ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl;
+//                }
+//                catch(std::runtime_error& ex)
+//                {
+//                    ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
+//                }
+//
+//                if(!completed)
+//                {
+//                    //Try to ingest using default instrument
+//                    ingestUsingDefaultInstrument(origPath, fitsFile_sp);
+//
+//                    completed = true;
+//                    m_fitsImporter_p->incrementWarningCounter();
+//                    WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName
+//                        << "\" archived in default instrument" << endl;
+//                }
+//            }
+//            catch(CCfits::FitsException& ex)
+//            {
+//                ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl;
+//            }
+//            catch(std::runtime_error& ex)
+//            {
+//                ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
+//            }
+//
+//            if(!completed)
+//            {
+//                //Cannot ingest new file => notify error
+//                m_fitsImporter_p->incrementErrorCounter();
+//                ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName
+//                    << "\" not archived" << endl;
+//            }
+//
+//            m_eventBuffer_sp->markAsProcessed(origPath);
+//        }
+//		catch(boost::thread_interrupted& ex)
+//		{
+//            DEBUG_STREAM << "WorkerThread::workerLoop() interrupt" << endl;
+//
+//			break;
+//		}
+//        catch(std::exception& ex)
+//        {
+//            ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
+//        }
+//		catch(...)
+//		{
+//            ERROR_STREAM << "WorkerThread::workerLoop() unknown exception" << endl;
+//		}
+//    } //while
+}
+
+}   //namespace
diff --git a/src/WorkerThread.h b/src/WorkerThread.h
new file mode 100644
index 0000000..10df780
--- /dev/null
+++ b/src/WorkerThread.h
@@ -0,0 +1,49 @@
+#ifndef WORKER_THREAD_H
+#define WORKER_THREAD_H
+
+#include <Configuration.h>
+#include <EventBuffer.h>
+
+#include <tango.h>
+
+#include <stdexcept>
+#include <sstream>
+
+#include <boost/shared_ptr.hpp>
+
+namespace PreProcessor_ns
+{
+
+class WorkerThread : public Tango::LogAdapter
+{
+public:
+//------------------------------------------------------------------------------
+//	[Public] Constructor destructor
+//------------------------------------------------------------------------------
+	WorkerThread(Tango::DeviceImpl*, EventBuffer::SP);
+
+	virtual ~WorkerThread();
+
+//------------------------------------------------------------------------------
+//	[Public] Users method
+//------------------------------------------------------------------------------
+	virtual void workerLoop();
+
+protected:
+//------------------------------------------------------------------------------
+//	[Protected] Utilities methods
+//------------------------------------------------------------------------------
+
+//------------------------------------------------------------------------------
+//	[Protected] Class variables
+//------------------------------------------------------------------------------
+	//Event buffer shared pointer
+	EventBuffer::SP m_eventBuffer_sp;
+
+	//Configuration shared pointer
+	Configuration::SP m_configuration_sp;
+};
+
+}   //End of namespace
+
+#endif /*!WORKER_THREAD_H*/
-- 
GitLab