From 6c9ff03357727ff35877d1dee1bc4548dbf0e9e7 Mon Sep 17 00:00:00 2001 From: Marco De Marco <demarco@oats.inaf.it> Date: Tue, 11 Mar 2014 08:52:28 +0100 Subject: [PATCH] File manager and script completed, before test --- Makefile | 6 +- script/fits.sh | 104 ++++++++++++++++++++ script/script.sh | 67 ------------- src/EventThread.cpp | 32 +++--- src/EventThread.h | 2 - src/PreProcessor.cpp | 60 +++++++----- src/PreProcessor.h | 6 +- src/ScriptManager.cpp | 112 +++++++++++++++++---- src/ScriptManager.h | 12 ++- src/WorkerThread.cpp | 220 ++++++++++++++++++++++-------------------- src/WorkerThread.h | 13 ++- 11 files changed, 386 insertions(+), 248 deletions(-) create mode 100755 script/fits.sh delete mode 100755 script/script.sh diff --git a/Makefile b/Makefile index f17edc3..776eea4 100644 --- a/Makefile +++ b/Makefile @@ -7,17 +7,19 @@ INSTALL_DIR=/usr/local/bin INC_DIR=/usr/local/omniORB/include \ /usr/local/zeromq/include/zmq \ /usr/local/tango/include/tango \ + /usr/local/boost/include \ ./src LIB_DIR=/usr/local/omniORB/lib \ /usr/local/zeromq/lib \ - /usr/local/tango/lib + /usr/local/tango/lib \ + /usr/local/boost/lib #================================================================================ CC=g++ CXX_DEBUG_FLAGS=-g -DVERBOSE_DEBUG CXX_RELEASE_FLAGS=-O3 CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11 LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \ - -lboost_thread -lboost_filesystem -lboost_system + -lboost_thread -lboost_filesystem -lboost_system -lboost_chrono INC_PARM=$(foreach d, $(INC_DIR), -I$d) LIB_PARM=$(foreach d, $(LIB_DIR), -L$d) #================================================================================ diff --git a/script/fits.sh b/script/fits.sh new file mode 100755 index 0000000..0f8c354 --- /dev/null +++ b/script/fits.sh @@ -0,0 +1,104 @@ +#!/bin/bash + +#: Title : fits.sh +#: Date : 2014/03/03 +#: Author : "Marco De Marco" <demarco@oats.inaf.it> +#: Version : 0.1 +#: Description : Fits verification and preproccessing script +#: Usage : +#: Response : + + +if [ "$1" == "CHECK" ]; then + + #: Section : CHECK + #: Parameter : none + #: Response : CHECK OK + #: : CHECK FATAL + + VERIFY_TOOL="/usr/local/bin/fitsverify" + CHECK_STRING="conform to the FITS format" + + res=$($VERIFY_TOOL 2>&1) + check=$(echo $res | grep "$CHECK_STRING" | wc | awk '{print $1}') + if [ "$check" -ge "1" ]; then + echo "CHECK OK" + else + echo "CHECK FATAL" + fi + exit 0 + +elif [ "$1" == "VERIFY" ]; then + + #: Section : VERIFY + #: Parameter : file path + #: Response : VERIFY OK + #: : VERIFY WAIT + #: : VERIFY FATAL + + VERIFY_TOOL="/usr/local/bin/fitsverify" + FATAL_ERROR="Fatal" + EOF_ERROR="End-of-file" + + file=$2 + file_name=${file##*/} + + #Check regular expression -> fatal + if [[ ! "${file_name,,}" =~ ^[^\.].*\.(fits|fit|fts).*$ ]]; then + echo "VERIFY FATAL : error on regular expression" + exit 0 + fi + + #if fits verify tools exists -> fatal + if [ ! -x $VERIFY_TOOL ]; then + echo "VERIFY FATAL : verify tools not exists" + exit 0 + fi + + #if fits file not exists -> fatal + if [ ! -f $file ]; then + echo "VERIFY FATAL : file not exists" + exit 0 + fi + + #Check with fits verify + res=$($VERIFY_TOOL $file 2>&1) + + #if fitsverify return fatal error -> wait + fatal=$(echo $res | grep "$FATAL_ERROR" | wc | awk '{print $1}') + if [ "$fatal" -ge "1" ]; then + echo "VERIFY WAIT" + exit 0 + fi + + #if fitsverify return end of file -> wait + eof=$(echo $res | grep "$EOF_ERROR" | wc | awk '{print $1}') + if [ "$eof" -ge "1" ]; then + echo "VERIFY WAIT" + exit 0 + fi + + #else -> ok + echo "VERIFY OK" + exit 0 + +elif [ "$1" == "PROCESS" ]; then + + #: Section : PROCESS + #: Parameter : file path + #: Response : PROCESS OK + #: : PROCESS FATAL + + echo "PROCESS OK" + exit 0 + +else + + #: Section : DEFAULT + #: Parameter : none + #: Response : UNKNOWN + + echo "UNKNOWN" + exit 0 + +fi diff --git a/script/script.sh b/script/script.sh deleted file mode 100755 index fb3caf5..0000000 --- a/script/script.sh +++ /dev/null @@ -1,67 +0,0 @@ -#!/bin/bash - -#----------------------------------------------------------------------- -# User parameters -#----------------------------------------------------------------------- -#Verify tool path -VERIFY_TOOL="/home/mdm/workspace/nexecs/test/tools/fitsverify" -CHECK_STRING="conform to the FITS format" - -FATAL_ERROR="Fatal" -EOF_ERROR="End-of-file" - -#----------------------------------------------------------------------- -# Verify script -#----------------------------------------------------------------------- -if [ "$1" == "CHECK" ]; then - res=$($VERIFY_TOOL 2>&1) - check=$(echo $res | grep "$CHECK_STRING" | wc | awk '{print $1}') - if [ "$check" -ge "1" ]; then - echo "CHECK OK" - else - echo "NOT OK" - fi - exit 0 -else - #Check regular expression -> fatal - file=$1 - file_name=${file##*/} - if [[ ! "${file_name,,}" =~ ^[^\.].*\.(fits|fts).*$ ]]; then - echo "FATAL" - exit 0 - fi - - #if fits verify tools exists -> fatal - if [ ! -x $VERIFY_TOOL ]; then - echo "FATAL" - exit 0 - fi - - #if fits file not exists -> fatal - if [ ! -f $1 ]; then - echo "FATAL" - exit 0 - fi - - #Check with fits verify - res=$($VERIFY_TOOL $1 2>&1) - - #if fitsverify return fatal error -> wait - fatal=$(echo $res | grep "$FATAL_ERROR" | wc | awk '{print $1}') - if [ "$fatal" -ge "1" ]; then - echo "WAIT" - exit 0 - fi - - #if fitsverify return end of file -> wait - eof=$(echo $res | grep "$EOF_ERROR" | wc | awk '{print $1}') - if [ "$eof" -ge "1" ]; then - echo "WAIT" - exit 0 - fi - - #else -> ok - echo "OK" - exit 0 -fi -#----------------------------------------------------------------------- diff --git a/src/EventThread.cpp b/src/EventThread.cpp index d308dc3..35ab559 100644 --- a/src/EventThread.cpp +++ b/src/EventThread.cpp @@ -1,5 +1,6 @@ #include <EventThread.h> #include <PreProcessor.h> +#include <ScriptManager.h> #include <WorkerThread.h> #include <cassert> @@ -9,6 +10,7 @@ #include <boost/filesystem.hpp> #include <boost/scoped_ptr.hpp> +#include <boost/chrono.hpp> namespace PreProcessor_ns { @@ -62,8 +64,6 @@ void EventThread::start() try { - initScriptManager(); - initEventBuffer(); initINotify(); @@ -151,14 +151,6 @@ void EventThread::writeStatus(std::string status) m_status = status; } -//============================================================================== -// EventThread::initScriptManager() -//============================================================================== -void EventThread::initScriptManager() throw(std::runtime_error) -{ - DEBUG_STREAM << "EventThread::initScriptManager()" << endl; -} - //============================================================================== // EventThread::initEventBuffer() //============================================================================== @@ -228,12 +220,15 @@ void EventThread::initThreadGroup() throw(std::runtime_error) { DEBUG_STREAM << "EventThread::initThreadGroup()" << endl; - m_threadGroup_sp.reset(new boost::thread_group); + ScriptManager::SP fileManager_sp = + ScriptManager::create(m_preProcessor_p, m_configuration_sp); + + fileManager_sp->checkScriptCompliance(); - unsigned int workerNumber = m_configuration_sp->getWorkerNumber(); + WorkerThread worker(m_preProcessor_p, m_eventBuffer_sp, + fileManager_sp, m_configuration_sp); - //Create a worker thread and pass all arguments - WorkerThread worker(m_preProcessor_p, m_eventBuffer_sp, m_configuration_sp); + m_threadGroup_sp.reset(new boost::thread_group); try { @@ -241,6 +236,8 @@ void EventThread::initThreadGroup() throw(std::runtime_error) m_threadGroup_sp->add_thread( new boost::thread(&EventThread::eventLoop, this)); + unsigned int workerNumber = m_configuration_sp->getWorkerNumber(); + //Add to thread group worker threads for(unsigned int i=0; i<workerNumber; i++) m_threadGroup_sp->add_thread( @@ -295,8 +292,6 @@ void EventThread::eventLoop() { event = ( struct inotify_event * ) &buffer[ i ]; - DEBUG_STREAM << "EVENT: " << event->name << endl; //TODO: delete me - //Add path to file name boost::filesystem::path file(event->name); boost::filesystem::path path(watchPath); @@ -311,8 +306,7 @@ void EventThread::eventLoop() DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl; - boost::posix_time::milliseconds sleepPosixTime(sleepTime); - boost::this_thread::sleep(sleepPosixTime); + boost::this_thread::sleep_for(boost::chrono::seconds(sleepTime)); } catch(boost::thread_interrupted& ex) { @@ -335,7 +329,7 @@ void EventThread::eventLoop() writeState(Tango::ALARM); writeStatus("Event thread unknown exception"); } - } //while + } //thread loop } } //namespace diff --git a/src/EventThread.h b/src/EventThread.h index b078a5d..ee248c5 100644 --- a/src/EventThread.h +++ b/src/EventThread.h @@ -72,8 +72,6 @@ protected: //------------------------------------------------------------------------------ // [Protected] Initialization methods //------------------------------------------------------------------------------ - virtual void initScriptManager() throw(std::runtime_error); - virtual void initEventBuffer() throw(std::runtime_error); virtual void initINotify() throw(std::runtime_error); diff --git a/src/PreProcessor.cpp b/src/PreProcessor.cpp index db2d5b7..cd898dc 100644 --- a/src/PreProcessor.cpp +++ b/src/PreProcessor.cpp @@ -157,29 +157,41 @@ void PreProcessor::init_device() /*----- PROTECTED REGION ID(PreProcessor::init_device) ENABLED START -----*/ - try - { - //Create event thread - m_eventThread_sp = EventThread::create(this, m_configuration_sp); + //Initialize regular file counters to zero + *attr_RegularFileCounter_read = 0; - //Start device if auto start enabled - if(autoStart) - { - INFO_STREAM << "FitsImporter::init_device() auto start enabled " << endl; - on(); - } - } - catch(std::exception& ex) - { - set_state(Tango::FAULT); - std::stringstream error_stream; - error_stream << "PreProcessor::init_device() " << ex.what() << std::endl; - set_status(error_stream.str()); - } - catch(...) + //Initialize warning file counters to zero + *attr_WarningFileCounter_read = 0; + + //Initialize error file counters to zero + *attr_ErrorFileCounter_read = 0; + + if(get_state() != Tango::FAULT) { - set_state(Tango::FAULT); - set_status("PreProcessor::init_device() unknown error"); + try + { + //Create event thread + m_eventThread_sp = EventThread::create(this, m_configuration_sp); + + //Start device if auto start enabled + if(autoStart) + { + INFO_STREAM << "PreProcessor::init_device() auto start enabled " << endl; + on(); + } + } + catch(std::exception& ex) + { + set_state(Tango::FAULT); + std::stringstream error_stream; + error_stream << "PreProcessor::init_device() " << ex.what() << std::endl; + set_status(error_stream.str()); + } + catch(...) + { + set_state(Tango::FAULT); + set_status("PreProcessor::init_device() unknown error"); + } } /*----- PROTECTED REGION END -----*/ // PreProcessor::init_device @@ -356,8 +368,8 @@ void PreProcessor::get_device_property() if(workerNumber<1 || workerNumber>MAX_WORKER_NUMBER) throw(invalid_argument("WorkerNumber property out of range or not defined")); - m_configuration_sp = Configuration::create(watchPath, - destPath, scriptPath, workerNumber, sleepTime, waitTime, inotifyMask); + m_configuration_sp = Configuration::create(watchPath, destPath, scriptPath, + workerNumber, sleepTime, waitTime, inotifyMask); } catch(invalid_argument& ex) { @@ -378,7 +390,7 @@ void PreProcessor::get_device_property() //-------------------------------------------------------- void PreProcessor::always_executed_hook() { - INFO_STREAM << "PreProcessor::always_executed_hook() " << device_name << endl; + DEBUG_STREAM << "PreProcessor::always_executed_hook() " << device_name << endl; /*----- PROTECTED REGION ID(PreProcessor::always_executed_hook) ENABLED START -----*/ if(get_state() != Tango::FAULT) diff --git a/src/PreProcessor.h b/src/PreProcessor.h index edc481b..573dde1 100644 --- a/src/PreProcessor.h +++ b/src/PreProcessor.h @@ -84,13 +84,13 @@ class PreProcessor : public TANGO_BASE_CLASS boost::mutex m_errorCounterMutex; //Min milli second of sleep time allowed - static const unsigned long MIN_SLEEP_TIME = 100; + static const unsigned long MIN_SLEEP_TIME = 1; //Max milli second of sleep time allowed - static const unsigned long MAX_SLEEP_TIME = 10000; + static const unsigned long MAX_SLEEP_TIME = 60; //Max milli second of wait time allowed - static const unsigned long MAX_WAIT_TIME = 10000; + static const unsigned long MAX_WAIT_TIME = 3600; //Max number of worker thread allowed static const unsigned int MAX_WORKER_NUMBER = 100; diff --git a/src/ScriptManager.cpp b/src/ScriptManager.cpp index 3d80b81..ce142c3 100644 --- a/src/ScriptManager.cpp +++ b/src/ScriptManager.cpp @@ -1,14 +1,11 @@ #include <ScriptManager.h> -#include <iostream> -#include <sstream> - namespace PreProcessor_ns { -//============================================================================= +//============================================================================== // ScriptManager::ScriptManager() -//============================================================================= +//============================================================================== ScriptManager::ScriptManager(Tango::DeviceImpl* deviceImpl_p, Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), m_configuration_sp(configuration_sp) @@ -16,9 +13,9 @@ ScriptManager::ScriptManager(Tango::DeviceImpl* deviceImpl_p, DEBUG_STREAM << "ScriptManager::ScriptManager()" << endl; } -//============================================================================= +//============================================================================== // ScriptManager::~ScriptManager() -//============================================================================= +//============================================================================== ScriptManager::~ScriptManager() { DEBUG_STREAM << "ScriptManager::~ScriptManager()" << endl; @@ -36,24 +33,101 @@ ScriptManager::SP ScriptManager::create(Tango::DeviceImpl* deviceImpl_p, return s_sp; } -//============================================================================= +//============================================================================== // ScriptManager::isReadyToArchive() -//============================================================================= -bool ScriptManager::isReadyToArchive(std::string fileName) - throw(std::runtime_error) +//============================================================================== +void ScriptManager::checkScriptCompliance() + throw(std::runtime_error) +{ + DEBUG_STREAM << "ScriptManager::checkScriptCompliance()" << endl; + + boost::filesystem::path scriptPath(m_configuration_sp->getScriptPath()); + + if(!boost::filesystem::exists(scriptPath)) + throw std::runtime_error("Script path not exists"); + + if(!boost::filesystem::is_regular_file(scriptPath)) + throw std::runtime_error("Script path is not regular file"); + + std::stringstream command; + command << m_configuration_sp->getScriptPath() << " CHECK"; + + std::string result = exec(command.str()); + + if(result.find("CHECK OK") == std::string::npos) + throw std::runtime_error("Invalid script"); +} + +//============================================================================== +// ScriptManager::isFileVerified() +//============================================================================== +bool ScriptManager::isFileVerified(boost::filesystem::path& filePath) + throw(std::runtime_error) +{ + DEBUG_STREAM << "ScriptManager::isFileVerified()" << endl; + + std::stringstream command; + command << m_configuration_sp->getScriptPath() + << " VERIFY " << filePath.string(); + + std::string result = exec(command.str()); + + if(result.find("VERIFY OK") != std::string::npos) + { + return true; + } + else if(result.find("VERIFY WAIT") != std::string::npos) + { + return false; + } + else if(result.find("VERIFY FATAL") != std::string::npos) + { + std::stringstream errorStream; + errorStream << "Verification error: " << result; + throw std::runtime_error(errorStream.str()); + } + else + { + std::stringstream errorStream; + errorStream << "Unknown verification error: " << result; + throw std::runtime_error(errorStream.str()); + } +} + +//============================================================================== +// ScriptManager::preProcessFile() +//============================================================================== +void ScriptManager::preProcessFile(boost::filesystem::path& filePath) + throw(std::runtime_error) { - DEBUG_STREAM << "ScriptManager::isReadyToArchive()" << endl; + DEBUG_STREAM << "ScriptManager::preProcessFile()" << endl; + + std::stringstream command; + command << m_configuration_sp->getScriptPath() + << " PROCESS " << filePath.string(); - return true; + std::string result = exec(command.str()); + + if(result.find("PROCESS FATAL") != std::string::npos) + { + std::stringstream errorStream; + errorStream << "Pre process error: " << result; + throw std::runtime_error(errorStream.str()); + } + else + { + std::stringstream errorStream; + errorStream << "Unknown pre process error: " << result; + throw std::runtime_error(errorStream.str()); + } } -//============================================================================= +//============================================================================== // ScriptManager::exec() -//============================================================================= -std::string ScriptManager::exec(std::string command) - throw(std::runtime_error) +//============================================================================== +std::string ScriptManager::exec(std::string command) throw(std::runtime_error) { - DEBUG_STREAM << "ScriptManager::exec()" << endl; + DEBUG_STREAM << "ScriptManager::exec() << " << command << endl; const int BUFFSIZE = 1024; @@ -73,7 +147,7 @@ std::string ScriptManager::exec(std::string command) pclose(pipe); - DEBUG_STREAM << "ScriptManager::exec() " << result << endl; + DEBUG_STREAM << "ScriptManager::exec() >> " << result << endl; return result; } diff --git a/src/ScriptManager.h b/src/ScriptManager.h index fd71def..a0538e4 100644 --- a/src/ScriptManager.h +++ b/src/ScriptManager.h @@ -9,6 +9,7 @@ #include <tango.h> #include <boost/shared_ptr.hpp> +#include <boost/filesystem.hpp> namespace PreProcessor_ns { @@ -44,9 +45,16 @@ public: static ScriptManager::SP create(Tango::DeviceImpl*, Configuration::SP); //------------------------------------------------------------------------------ -// [Public] Script method +// [Public] Script methods //------------------------------------------------------------------------------ - virtual bool isReadyToArchive(std::string) throw(std::runtime_error); + virtual void checkScriptCompliance() + throw(std::runtime_error); + + virtual bool isFileVerified(boost::filesystem::path&) + throw(std::runtime_error); + + virtual void preProcessFile(boost::filesystem::path&) + throw(std::runtime_error); protected: //------------------------------------------------------------------------------ diff --git a/src/WorkerThread.cpp b/src/WorkerThread.cpp index 34c42ab..a4d3720 100644 --- a/src/WorkerThread.cpp +++ b/src/WorkerThread.cpp @@ -1,17 +1,9 @@ #include <WorkerThread.h> #include <PreProcessor.h> -#include <cassert> -#include <fstream> - #include <tango.h> -#include <boost/scoped_ptr.hpp> -#include <boost/iostreams/filtering_streambuf.hpp> -#include <boost/iostreams/copy.hpp> -#include <boost/iostreams/filter/gzip.hpp> -#include <boost/regex.hpp> -#include <boost/lexical_cast.hpp> +#include <boost/chrono.hpp> namespace PreProcessor_ns { @@ -20,9 +12,10 @@ namespace PreProcessor_ns // WorkerThread::WorkerThread() //============================================================================== WorkerThread::WorkerThread(PreProcessor* preProcessor_p, - EventBuffer::SP eventBuffer_sp, Configuration::SP configuration_sp) : - Tango::LogAdapter(preProcessor_p), m_preProcessor_p(preProcessor_p), - m_eventBuffer_sp(eventBuffer_sp), m_configuration_sp(configuration_sp) + EventBuffer::SP eventBuffer_sp, ScriptManager::SP fileManager_sp, + Configuration::SP configuration_sp) : Tango::LogAdapter(preProcessor_p), + m_preProcessor_p(preProcessor_p), m_eventBuffer_sp(eventBuffer_sp), + m_fileManager_sp(fileManager_sp), m_configuration_sp(configuration_sp) { DEBUG_STREAM << "WorkerThread::WorkerThread()" << std::endl; } @@ -42,100 +35,115 @@ void WorkerThread::workerLoop() { DEBUG_STREAM << "WorkerThread::workerLoop(): starting loop" << endl; -// unsigned int waitTime = m_configuration_sp->getWaitTime(); -// std::string watchPath = m_configuration_sp->getWatchPath(); - -// while(true) -// { -// try -// { -// //Wait new file event to process -// boost::filesystem::path origPath(m_eventBuffer_sp->waitNew()); -// std::string fileName = origPath.stem().string(); -// -// //If configured wait after new file event -// boost::posix_time::milliseconds waitPosixTime(waitTime); -// boost::this_thread::sleep( waitPosixTime ); -// -// bool completed = false; -// -// INFO_STREAM << "WorkerThread::workerLoop() processing \"" -// << fileName << "\"" << endl; -// -// try -// { -// //Open fits file -// boost::shared_ptr<CCfits::FITS> fitsFile_sp( -// new CCfits::FITS(origPath.string(), CCfits::Write)); -// -// //Read all key in primary HDU -// CCfits::PHDU& phdu = fitsFile_sp->pHDU(); -// phdu.readAllKeys(); -// -// try -// { -// //Try to ingest using an instrument in list -// ingestUsingInstrumentList(origPath, fitsFile_sp); -// -// completed = true; -// m_fitsImporter_p->incrementRegularCounter(); -// INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName -// << "\" archived regularly" << endl; -// } -// catch(CCfits::FitsException& ex) -// { -// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl; -// } -// catch(std::runtime_error& ex) -// { -// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; -// } -// -// if(!completed) -// { -// //Try to ingest using default instrument -// ingestUsingDefaultInstrument(origPath, fitsFile_sp); -// -// completed = true; -// m_fitsImporter_p->incrementWarningCounter(); -// WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName -// << "\" archived in default instrument" << endl; -// } -// } -// catch(CCfits::FitsException& ex) -// { -// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl; -// } -// catch(std::runtime_error& ex) -// { -// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; -// } -// -// if(!completed) -// { -// //Cannot ingest new file => notify error -// m_fitsImporter_p->incrementErrorCounter(); -// ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName -// << "\" not archived" << endl; -// } -// -// m_eventBuffer_sp->markAsProcessed(origPath); -// } -// catch(boost::thread_interrupted& ex) -// { -// DEBUG_STREAM << "WorkerThread::workerLoop() interrupt" << endl; -// -// break; -// } -// catch(std::exception& ex) -// { -// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; -// } -// catch(...) -// { -// ERROR_STREAM << "WorkerThread::workerLoop() unknown exception" << endl; -// } -// } //while + //Sleep time between two verification attempts + boost::chrono::seconds sleepTime(m_configuration_sp->getSleepTime()); + + //Verification attempts timeout + boost::chrono::steady_clock::duration waitTime = + boost::chrono::seconds(m_configuration_sp->getWaitTime()); + + while(true) + { + try + { + //Wait new file event to process + boost::filesystem::path origPath(m_eventBuffer_sp->waitNew()); + + std::string fileName = origPath.stem().string(); + + //Start timer + boost::chrono::steady_clock::time_point start = + boost::chrono::steady_clock::now(); + + INFO_STREAM << "WorkerThread::workerLoop() processing \"" + << fileName << "\"" << endl; + try + { + bool verified = false; + + do + { + if(m_fileManager_sp->isFileVerified(origPath)) + { + verified = true; + break; + } + else + { + boost::this_thread::sleep_for(sleepTime); + } + } + while(boost::chrono::steady_clock::now()-start <= waitTime); + + copyToDestination(origPath); + + if(verified) + { + INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName + << "\" ingested regularly" << endl; + m_preProcessor_p->incrementRegularCounter(); + } + else + { + WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName + << "\" archived after timeout" << endl; + m_preProcessor_p->incrementWarningCounter(); + } + } + catch(std::runtime_error& ex) + { + ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName + << "\" not archived due to " << ex.what() << endl; + m_preProcessor_p->incrementErrorCounter(); + } + + m_eventBuffer_sp->markAsProcessed(origPath); + } + catch(boost::thread_interrupted& ex) + { + DEBUG_STREAM << "WorkerThread::workerLoop() interrupt" << endl; + break; + } + catch(std::exception& ex) + { + ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; + } + catch(...) + { + ERROR_STREAM << "WorkerThread::workerLoop() unknown exception" << endl; + } + } //thread loop +} + +//============================================================================== +// WorkerThread::moveFile()() +//============================================================================== +void WorkerThread::copyToDestination(boost::filesystem::path& origPath) + throw (std::runtime_error) +{ + DEBUG_STREAM << "WorkerThread::moveFile()" << endl; + + if(!boost::filesystem::is_regular_file(origPath)) + throw std::runtime_error( "Origin path \"" + + origPath.string() + "\" is not a regular file"); + + boost::filesystem::path destPath(m_configuration_sp->getDestPath()); + + if(!boost::filesystem::exists(destPath)) + throw std::runtime_error( "Destination path \"" + + destPath.string() + "\" not exists"); + + if(!boost::filesystem::is_directory(destPath)) + throw std::runtime_error( "Destination path \"" + + destPath.string() + "\" is not a directory"); + + destPath /= origPath.filename(); + + if(boost::filesystem::exists(destPath)) + throw std::runtime_error( "Destination path \"" + + destPath.string() + "\" already exists"); + + boost::filesystem::copy(origPath, destPath); } } //namespace diff --git a/src/WorkerThread.h b/src/WorkerThread.h index 77d7328..e83e81c 100644 --- a/src/WorkerThread.h +++ b/src/WorkerThread.h @@ -3,13 +3,12 @@ #include <Configuration.h> #include <EventBuffer.h> +#include <ScriptManager.h> #include <tango.h> -#include <stdexcept> -#include <sstream> - #include <boost/shared_ptr.hpp> +#include <boost/filesystem.hpp> namespace PreProcessor_ns { @@ -22,7 +21,8 @@ public: //------------------------------------------------------------------------------ // [Public] Constructor destructor //------------------------------------------------------------------------------ - WorkerThread(PreProcessor*, EventBuffer::SP, Configuration::SP); + WorkerThread(PreProcessor*, EventBuffer::SP, + ScriptManager::SP, Configuration::SP); virtual ~WorkerThread(); @@ -35,6 +35,8 @@ protected: //------------------------------------------------------------------------------ // [Protected] Utilities methods //------------------------------------------------------------------------------ + virtual void copyToDestination(boost::filesystem::path&) + throw (std::runtime_error); //------------------------------------------------------------------------------ // [Protected] Class variables @@ -45,6 +47,9 @@ protected: //Event buffer shared pointer EventBuffer::SP m_eventBuffer_sp; + //File manager shared pointer + ScriptManager::SP m_fileManager_sp; + //Configuration shared pointer Configuration::SP m_configuration_sp; }; -- GitLab