Skip to content
Snippets Groups Projects
Commit 6c9ff033 authored by Marco De Marco's avatar Marco De Marco
Browse files

File manager and script completed, before test

parent 8da26526
No related branches found
No related tags found
No related merge requests found
...@@ -7,17 +7,19 @@ INSTALL_DIR=/usr/local/bin ...@@ -7,17 +7,19 @@ INSTALL_DIR=/usr/local/bin
INC_DIR=/usr/local/omniORB/include \ INC_DIR=/usr/local/omniORB/include \
/usr/local/zeromq/include/zmq \ /usr/local/zeromq/include/zmq \
/usr/local/tango/include/tango \ /usr/local/tango/include/tango \
/usr/local/boost/include \
./src ./src
LIB_DIR=/usr/local/omniORB/lib \ LIB_DIR=/usr/local/omniORB/lib \
/usr/local/zeromq/lib \ /usr/local/zeromq/lib \
/usr/local/tango/lib /usr/local/tango/lib \
/usr/local/boost/lib
#================================================================================ #================================================================================
CC=g++ CC=g++
CXX_DEBUG_FLAGS=-g -DVERBOSE_DEBUG CXX_DEBUG_FLAGS=-g -DVERBOSE_DEBUG
CXX_RELEASE_FLAGS=-O3 CXX_RELEASE_FLAGS=-O3
CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11 CXX_DEFAULT_FLAGS=-c -Wall -Wextra -std=c++11 -std=gnu++11
LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \ LDFLAGS=-Wall -lomniORB4 -lomniDynamic4 -lCOS4 -lomnithread -ltango -llog4tango \
-lboost_thread -lboost_filesystem -lboost_system -lboost_thread -lboost_filesystem -lboost_system -lboost_chrono
INC_PARM=$(foreach d, $(INC_DIR), -I$d) INC_PARM=$(foreach d, $(INC_DIR), -I$d)
LIB_PARM=$(foreach d, $(LIB_DIR), -L$d) LIB_PARM=$(foreach d, $(LIB_DIR), -L$d)
#================================================================================ #================================================================================
......
#!/bin/bash #!/bin/bash
#----------------------------------------------------------------------- #: Title : fits.sh
# User parameters #: Date : 2014/03/03
#----------------------------------------------------------------------- #: Author : "Marco De Marco" <demarco@oats.inaf.it>
#Verify tool path #: Version : 0.1
VERIFY_TOOL="/home/mdm/workspace/nexecs/test/tools/fitsverify" #: Description : Fits verification and preproccessing script
CHECK_STRING="conform to the FITS format" #: Usage :
#: Response :
FATAL_ERROR="Fatal"
EOF_ERROR="End-of-file"
#-----------------------------------------------------------------------
# Verify script
#-----------------------------------------------------------------------
if [ "$1" == "CHECK" ]; then if [ "$1" == "CHECK" ]; then
#: Section : CHECK
#: Parameter : none
#: Response : CHECK OK
#: : CHECK FATAL
VERIFY_TOOL="/usr/local/bin/fitsverify"
CHECK_STRING="conform to the FITS format"
res=$($VERIFY_TOOL 2>&1) res=$($VERIFY_TOOL 2>&1)
check=$(echo $res | grep "$CHECK_STRING" | wc | awk '{print $1}') check=$(echo $res | grep "$CHECK_STRING" | wc | awk '{print $1}')
if [ "$check" -ge "1" ]; then if [ "$check" -ge "1" ]; then
echo "CHECK OK" echo "CHECK OK"
else else
echo "NOT OK" echo "CHECK FATAL"
fi fi
exit 0 exit 0
else
#Check regular expression -> fatal elif [ "$1" == "VERIFY" ]; then
file=$1
#: Section : VERIFY
#: Parameter : file path
#: Response : VERIFY OK
#: : VERIFY WAIT
#: : VERIFY FATAL
VERIFY_TOOL="/usr/local/bin/fitsverify"
FATAL_ERROR="Fatal"
EOF_ERROR="End-of-file"
file=$2
file_name=${file##*/} file_name=${file##*/}
if [[ ! "${file_name,,}" =~ ^[^\.].*\.(fits|fts).*$ ]]; then
echo "FATAL" #Check regular expression -> fatal
if [[ ! "${file_name,,}" =~ ^[^\.].*\.(fits|fit|fts).*$ ]]; then
echo "VERIFY FATAL : error on regular expression"
exit 0 exit 0
fi fi
#if fits verify tools exists -> fatal #if fits verify tools exists -> fatal
if [ ! -x $VERIFY_TOOL ]; then if [ ! -x $VERIFY_TOOL ]; then
echo "FATAL" echo "VERIFY FATAL : verify tools not exists"
exit 0 exit 0
fi fi
#if fits file not exists -> fatal #if fits file not exists -> fatal
if [ ! -f $1 ]; then if [ ! -f $file ]; then
echo "FATAL" echo "VERIFY FATAL : file not exists"
exit 0 exit 0
fi fi
#Check with fits verify #Check with fits verify
res=$($VERIFY_TOOL $1 2>&1) res=$($VERIFY_TOOL $file 2>&1)
#if fitsverify return fatal error -> wait #if fitsverify return fatal error -> wait
fatal=$(echo $res | grep "$FATAL_ERROR" | wc | awk '{print $1}') fatal=$(echo $res | grep "$FATAL_ERROR" | wc | awk '{print $1}')
if [ "$fatal" -ge "1" ]; then if [ "$fatal" -ge "1" ]; then
echo "WAIT" echo "VERIFY WAIT"
exit 0 exit 0
fi fi
#if fitsverify return end of file -> wait #if fitsverify return end of file -> wait
eof=$(echo $res | grep "$EOF_ERROR" | wc | awk '{print $1}') eof=$(echo $res | grep "$EOF_ERROR" | wc | awk '{print $1}')
if [ "$eof" -ge "1" ]; then if [ "$eof" -ge "1" ]; then
echo "WAIT" echo "VERIFY WAIT"
exit 0 exit 0
fi fi
#else -> ok #else -> ok
echo "OK" echo "VERIFY OK"
exit 0
elif [ "$1" == "PROCESS" ]; then
#: Section : PROCESS
#: Parameter : file path
#: Response : PROCESS OK
#: : PROCESS FATAL
echo "PROCESS OK"
exit 0 exit 0
else
#: Section : DEFAULT
#: Parameter : none
#: Response : UNKNOWN
echo "UNKNOWN"
exit 0
fi fi
#-----------------------------------------------------------------------
#include <EventThread.h> #include <EventThread.h>
#include <PreProcessor.h> #include <PreProcessor.h>
#include <ScriptManager.h>
#include <WorkerThread.h> #include <WorkerThread.h>
#include <cassert> #include <cassert>
...@@ -9,6 +10,7 @@ ...@@ -9,6 +10,7 @@
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/scoped_ptr.hpp> #include <boost/scoped_ptr.hpp>
#include <boost/chrono.hpp>
namespace PreProcessor_ns namespace PreProcessor_ns
{ {
...@@ -62,8 +64,6 @@ void EventThread::start() ...@@ -62,8 +64,6 @@ void EventThread::start()
try try
{ {
initScriptManager();
initEventBuffer(); initEventBuffer();
initINotify(); initINotify();
...@@ -151,14 +151,6 @@ void EventThread::writeStatus(std::string status) ...@@ -151,14 +151,6 @@ void EventThread::writeStatus(std::string status)
m_status = status; m_status = status;
} }
//==============================================================================
// EventThread::initScriptManager()
//==============================================================================
void EventThread::initScriptManager() throw(std::runtime_error)
{
DEBUG_STREAM << "EventThread::initScriptManager()" << endl;
}
//============================================================================== //==============================================================================
// EventThread::initEventBuffer() // EventThread::initEventBuffer()
//============================================================================== //==============================================================================
...@@ -228,12 +220,15 @@ void EventThread::initThreadGroup() throw(std::runtime_error) ...@@ -228,12 +220,15 @@ void EventThread::initThreadGroup() throw(std::runtime_error)
{ {
DEBUG_STREAM << "EventThread::initThreadGroup()" << endl; DEBUG_STREAM << "EventThread::initThreadGroup()" << endl;
m_threadGroup_sp.reset(new boost::thread_group); ScriptManager::SP fileManager_sp =
ScriptManager::create(m_preProcessor_p, m_configuration_sp);
unsigned int workerNumber = m_configuration_sp->getWorkerNumber(); fileManager_sp->checkScriptCompliance();
WorkerThread worker(m_preProcessor_p, m_eventBuffer_sp,
fileManager_sp, m_configuration_sp);
//Create a worker thread and pass all arguments m_threadGroup_sp.reset(new boost::thread_group);
WorkerThread worker(m_preProcessor_p, m_eventBuffer_sp, m_configuration_sp);
try try
{ {
...@@ -241,6 +236,8 @@ void EventThread::initThreadGroup() throw(std::runtime_error) ...@@ -241,6 +236,8 @@ void EventThread::initThreadGroup() throw(std::runtime_error)
m_threadGroup_sp->add_thread( m_threadGroup_sp->add_thread(
new boost::thread(&EventThread::eventLoop, this)); new boost::thread(&EventThread::eventLoop, this));
unsigned int workerNumber = m_configuration_sp->getWorkerNumber();
//Add to thread group worker threads //Add to thread group worker threads
for(unsigned int i=0; i<workerNumber; i++) for(unsigned int i=0; i<workerNumber; i++)
m_threadGroup_sp->add_thread( m_threadGroup_sp->add_thread(
...@@ -295,8 +292,6 @@ void EventThread::eventLoop() ...@@ -295,8 +292,6 @@ void EventThread::eventLoop()
{ {
event = ( struct inotify_event * ) &buffer[ i ]; event = ( struct inotify_event * ) &buffer[ i ];
DEBUG_STREAM << "EVENT: " << event->name << endl; //TODO: delete me
//Add path to file name //Add path to file name
boost::filesystem::path file(event->name); boost::filesystem::path file(event->name);
boost::filesystem::path path(watchPath); boost::filesystem::path path(watchPath);
...@@ -311,8 +306,7 @@ void EventThread::eventLoop() ...@@ -311,8 +306,7 @@ void EventThread::eventLoop()
DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl; DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl;
boost::posix_time::milliseconds sleepPosixTime(sleepTime); boost::this_thread::sleep_for(boost::chrono::seconds(sleepTime));
boost::this_thread::sleep(sleepPosixTime);
} }
catch(boost::thread_interrupted& ex) catch(boost::thread_interrupted& ex)
{ {
...@@ -335,7 +329,7 @@ void EventThread::eventLoop() ...@@ -335,7 +329,7 @@ void EventThread::eventLoop()
writeState(Tango::ALARM); writeState(Tango::ALARM);
writeStatus("Event thread unknown exception"); writeStatus("Event thread unknown exception");
} }
} //while } //thread loop
} }
} //namespace } //namespace
...@@ -72,8 +72,6 @@ protected: ...@@ -72,8 +72,6 @@ protected:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// [Protected] Initialization methods // [Protected] Initialization methods
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
virtual void initScriptManager() throw(std::runtime_error);
virtual void initEventBuffer() throw(std::runtime_error); virtual void initEventBuffer() throw(std::runtime_error);
virtual void initINotify() throw(std::runtime_error); virtual void initINotify() throw(std::runtime_error);
......
...@@ -157,6 +157,17 @@ void PreProcessor::init_device() ...@@ -157,6 +157,17 @@ void PreProcessor::init_device()
/*----- PROTECTED REGION ID(PreProcessor::init_device) ENABLED START -----*/ /*----- PROTECTED REGION ID(PreProcessor::init_device) ENABLED START -----*/
//Initialize regular file counters to zero
*attr_RegularFileCounter_read = 0;
//Initialize warning file counters to zero
*attr_WarningFileCounter_read = 0;
//Initialize error file counters to zero
*attr_ErrorFileCounter_read = 0;
if(get_state() != Tango::FAULT)
{
try try
{ {
//Create event thread //Create event thread
...@@ -165,7 +176,7 @@ void PreProcessor::init_device() ...@@ -165,7 +176,7 @@ void PreProcessor::init_device()
//Start device if auto start enabled //Start device if auto start enabled
if(autoStart) if(autoStart)
{ {
INFO_STREAM << "FitsImporter::init_device() auto start enabled " << endl; INFO_STREAM << "PreProcessor::init_device() auto start enabled " << endl;
on(); on();
} }
} }
...@@ -181,6 +192,7 @@ void PreProcessor::init_device() ...@@ -181,6 +192,7 @@ void PreProcessor::init_device()
set_state(Tango::FAULT); set_state(Tango::FAULT);
set_status("PreProcessor::init_device() unknown error"); set_status("PreProcessor::init_device() unknown error");
} }
}
/*----- PROTECTED REGION END -----*/ // PreProcessor::init_device /*----- PROTECTED REGION END -----*/ // PreProcessor::init_device
} }
...@@ -356,8 +368,8 @@ void PreProcessor::get_device_property() ...@@ -356,8 +368,8 @@ void PreProcessor::get_device_property()
if(workerNumber<1 || workerNumber>MAX_WORKER_NUMBER) if(workerNumber<1 || workerNumber>MAX_WORKER_NUMBER)
throw(invalid_argument("WorkerNumber property out of range or not defined")); throw(invalid_argument("WorkerNumber property out of range or not defined"));
m_configuration_sp = Configuration::create(watchPath, m_configuration_sp = Configuration::create(watchPath, destPath, scriptPath,
destPath, scriptPath, workerNumber, sleepTime, waitTime, inotifyMask); workerNumber, sleepTime, waitTime, inotifyMask);
} }
catch(invalid_argument& ex) catch(invalid_argument& ex)
{ {
...@@ -378,7 +390,7 @@ void PreProcessor::get_device_property() ...@@ -378,7 +390,7 @@ void PreProcessor::get_device_property()
//-------------------------------------------------------- //--------------------------------------------------------
void PreProcessor::always_executed_hook() void PreProcessor::always_executed_hook()
{ {
INFO_STREAM << "PreProcessor::always_executed_hook() " << device_name << endl; DEBUG_STREAM << "PreProcessor::always_executed_hook() " << device_name << endl;
/*----- PROTECTED REGION ID(PreProcessor::always_executed_hook) ENABLED START -----*/ /*----- PROTECTED REGION ID(PreProcessor::always_executed_hook) ENABLED START -----*/
if(get_state() != Tango::FAULT) if(get_state() != Tango::FAULT)
......
...@@ -84,13 +84,13 @@ class PreProcessor : public TANGO_BASE_CLASS ...@@ -84,13 +84,13 @@ class PreProcessor : public TANGO_BASE_CLASS
boost::mutex m_errorCounterMutex; boost::mutex m_errorCounterMutex;
//Min milli second of sleep time allowed //Min milli second of sleep time allowed
static const unsigned long MIN_SLEEP_TIME = 100; static const unsigned long MIN_SLEEP_TIME = 1;
//Max milli second of sleep time allowed //Max milli second of sleep time allowed
static const unsigned long MAX_SLEEP_TIME = 10000; static const unsigned long MAX_SLEEP_TIME = 60;
//Max milli second of wait time allowed //Max milli second of wait time allowed
static const unsigned long MAX_WAIT_TIME = 10000; static const unsigned long MAX_WAIT_TIME = 3600;
//Max number of worker thread allowed //Max number of worker thread allowed
static const unsigned int MAX_WORKER_NUMBER = 100; static const unsigned int MAX_WORKER_NUMBER = 100;
......
#include <ScriptManager.h> #include <ScriptManager.h>
#include <iostream>
#include <sstream>
namespace PreProcessor_ns namespace PreProcessor_ns
{ {
//============================================================================= //==============================================================================
// ScriptManager::ScriptManager() // ScriptManager::ScriptManager()
//============================================================================= //==============================================================================
ScriptManager::ScriptManager(Tango::DeviceImpl* deviceImpl_p, ScriptManager::ScriptManager(Tango::DeviceImpl* deviceImpl_p,
Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p), Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p),
m_configuration_sp(configuration_sp) m_configuration_sp(configuration_sp)
...@@ -16,9 +13,9 @@ ScriptManager::ScriptManager(Tango::DeviceImpl* deviceImpl_p, ...@@ -16,9 +13,9 @@ ScriptManager::ScriptManager(Tango::DeviceImpl* deviceImpl_p,
DEBUG_STREAM << "ScriptManager::ScriptManager()" << endl; DEBUG_STREAM << "ScriptManager::ScriptManager()" << endl;
} }
//============================================================================= //==============================================================================
// ScriptManager::~ScriptManager() // ScriptManager::~ScriptManager()
//============================================================================= //==============================================================================
ScriptManager::~ScriptManager() ScriptManager::~ScriptManager()
{ {
DEBUG_STREAM << "ScriptManager::~ScriptManager()" << endl; DEBUG_STREAM << "ScriptManager::~ScriptManager()" << endl;
...@@ -36,24 +33,101 @@ ScriptManager::SP ScriptManager::create(Tango::DeviceImpl* deviceImpl_p, ...@@ -36,24 +33,101 @@ ScriptManager::SP ScriptManager::create(Tango::DeviceImpl* deviceImpl_p,
return s_sp; return s_sp;
} }
//============================================================================= //==============================================================================
// ScriptManager::isReadyToArchive() // ScriptManager::isReadyToArchive()
//============================================================================= //==============================================================================
bool ScriptManager::isReadyToArchive(std::string fileName) void ScriptManager::checkScriptCompliance()
throw(std::runtime_error) throw(std::runtime_error)
{ {
DEBUG_STREAM << "ScriptManager::isReadyToArchive()" << endl; DEBUG_STREAM << "ScriptManager::checkScriptCompliance()" << endl;
boost::filesystem::path scriptPath(m_configuration_sp->getScriptPath());
if(!boost::filesystem::exists(scriptPath))
throw std::runtime_error("Script path not exists");
if(!boost::filesystem::is_regular_file(scriptPath))
throw std::runtime_error("Script path is not regular file");
std::stringstream command;
command << m_configuration_sp->getScriptPath() << " CHECK";
std::string result = exec(command.str());
if(result.find("CHECK OK") == std::string::npos)
throw std::runtime_error("Invalid script");
}
//==============================================================================
// ScriptManager::isFileVerified()
//==============================================================================
bool ScriptManager::isFileVerified(boost::filesystem::path& filePath)
throw(std::runtime_error)
{
DEBUG_STREAM << "ScriptManager::isFileVerified()" << endl;
std::stringstream command;
command << m_configuration_sp->getScriptPath()
<< " VERIFY " << filePath.string();
std::string result = exec(command.str());
if(result.find("VERIFY OK") != std::string::npos)
{
return true; return true;
} }
else if(result.find("VERIFY WAIT") != std::string::npos)
{
return false;
}
else if(result.find("VERIFY FATAL") != std::string::npos)
{
std::stringstream errorStream;
errorStream << "Verification error: " << result;
throw std::runtime_error(errorStream.str());
}
else
{
std::stringstream errorStream;
errorStream << "Unknown verification error: " << result;
throw std::runtime_error(errorStream.str());
}
}
//============================================================================= //==============================================================================
// ScriptManager::exec() // ScriptManager::preProcessFile()
//============================================================================= //==============================================================================
std::string ScriptManager::exec(std::string command) void ScriptManager::preProcessFile(boost::filesystem::path& filePath)
throw(std::runtime_error) throw(std::runtime_error)
{ {
DEBUG_STREAM << "ScriptManager::exec()" << endl; DEBUG_STREAM << "ScriptManager::preProcessFile()" << endl;
std::stringstream command;
command << m_configuration_sp->getScriptPath()
<< " PROCESS " << filePath.string();
std::string result = exec(command.str());
if(result.find("PROCESS FATAL") != std::string::npos)
{
std::stringstream errorStream;
errorStream << "Pre process error: " << result;
throw std::runtime_error(errorStream.str());
}
else
{
std::stringstream errorStream;
errorStream << "Unknown pre process error: " << result;
throw std::runtime_error(errorStream.str());
}
}
//==============================================================================
// ScriptManager::exec()
//==============================================================================
std::string ScriptManager::exec(std::string command) throw(std::runtime_error)
{
DEBUG_STREAM << "ScriptManager::exec() << " << command << endl;
const int BUFFSIZE = 1024; const int BUFFSIZE = 1024;
...@@ -73,7 +147,7 @@ std::string ScriptManager::exec(std::string command) ...@@ -73,7 +147,7 @@ std::string ScriptManager::exec(std::string command)
pclose(pipe); pclose(pipe);
DEBUG_STREAM << "ScriptManager::exec() " << result << endl; DEBUG_STREAM << "ScriptManager::exec() >> " << result << endl;
return result; return result;
} }
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <tango.h> #include <tango.h>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <boost/filesystem.hpp>
namespace PreProcessor_ns namespace PreProcessor_ns
{ {
...@@ -44,9 +45,16 @@ public: ...@@ -44,9 +45,16 @@ public:
static ScriptManager::SP create(Tango::DeviceImpl*, Configuration::SP); static ScriptManager::SP create(Tango::DeviceImpl*, Configuration::SP);
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// [Public] Script method // [Public] Script methods
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
virtual bool isReadyToArchive(std::string) throw(std::runtime_error); virtual void checkScriptCompliance()
throw(std::runtime_error);
virtual bool isFileVerified(boost::filesystem::path&)
throw(std::runtime_error);
virtual void preProcessFile(boost::filesystem::path&)
throw(std::runtime_error);
protected: protected:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
......
#include <WorkerThread.h> #include <WorkerThread.h>
#include <PreProcessor.h> #include <PreProcessor.h>
#include <cassert>
#include <fstream>
#include <tango.h> #include <tango.h>
#include <boost/scoped_ptr.hpp> #include <boost/chrono.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/regex.hpp>
#include <boost/lexical_cast.hpp>
namespace PreProcessor_ns namespace PreProcessor_ns
{ {
...@@ -20,9 +12,10 @@ namespace PreProcessor_ns ...@@ -20,9 +12,10 @@ namespace PreProcessor_ns
// WorkerThread::WorkerThread() // WorkerThread::WorkerThread()
//============================================================================== //==============================================================================
WorkerThread::WorkerThread(PreProcessor* preProcessor_p, WorkerThread::WorkerThread(PreProcessor* preProcessor_p,
EventBuffer::SP eventBuffer_sp, Configuration::SP configuration_sp) : EventBuffer::SP eventBuffer_sp, ScriptManager::SP fileManager_sp,
Tango::LogAdapter(preProcessor_p), m_preProcessor_p(preProcessor_p), Configuration::SP configuration_sp) : Tango::LogAdapter(preProcessor_p),
m_eventBuffer_sp(eventBuffer_sp), m_configuration_sp(configuration_sp) m_preProcessor_p(preProcessor_p), m_eventBuffer_sp(eventBuffer_sp),
m_fileManager_sp(fileManager_sp), m_configuration_sp(configuration_sp)
{ {
DEBUG_STREAM << "WorkerThread::WorkerThread()" << std::endl; DEBUG_STREAM << "WorkerThread::WorkerThread()" << std::endl;
} }
...@@ -42,100 +35,115 @@ void WorkerThread::workerLoop() ...@@ -42,100 +35,115 @@ void WorkerThread::workerLoop()
{ {
DEBUG_STREAM << "WorkerThread::workerLoop(): starting loop" << endl; DEBUG_STREAM << "WorkerThread::workerLoop(): starting loop" << endl;
// unsigned int waitTime = m_configuration_sp->getWaitTime(); //Sleep time between two verification attempts
// std::string watchPath = m_configuration_sp->getWatchPath(); boost::chrono::seconds sleepTime(m_configuration_sp->getSleepTime());
// while(true) //Verification attempts timeout
// { boost::chrono::steady_clock::duration waitTime =
// try boost::chrono::seconds(m_configuration_sp->getWaitTime());
// {
// //Wait new file event to process while(true)
// boost::filesystem::path origPath(m_eventBuffer_sp->waitNew()); {
// std::string fileName = origPath.stem().string(); try
// {
// //If configured wait after new file event //Wait new file event to process
// boost::posix_time::milliseconds waitPosixTime(waitTime); boost::filesystem::path origPath(m_eventBuffer_sp->waitNew());
// boost::this_thread::sleep( waitPosixTime );
// std::string fileName = origPath.stem().string();
// bool completed = false;
// //Start timer
// INFO_STREAM << "WorkerThread::workerLoop() processing \"" boost::chrono::steady_clock::time_point start =
// << fileName << "\"" << endl; boost::chrono::steady_clock::now();
//
// try INFO_STREAM << "WorkerThread::workerLoop() processing \""
// { << fileName << "\"" << endl;
// //Open fits file try
// boost::shared_ptr<CCfits::FITS> fitsFile_sp( {
// new CCfits::FITS(origPath.string(), CCfits::Write)); bool verified = false;
//
// //Read all key in primary HDU do
// CCfits::PHDU& phdu = fitsFile_sp->pHDU(); {
// phdu.readAllKeys(); if(m_fileManager_sp->isFileVerified(origPath))
// {
// try verified = true;
// { break;
// //Try to ingest using an instrument in list }
// ingestUsingInstrumentList(origPath, fitsFile_sp); else
// {
// completed = true; boost::this_thread::sleep_for(sleepTime);
// m_fitsImporter_p->incrementRegularCounter(); }
// INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName }
// << "\" archived regularly" << endl; while(boost::chrono::steady_clock::now()-start <= waitTime);
// }
// catch(CCfits::FitsException& ex) copyToDestination(origPath);
// {
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl; if(verified)
// } {
// catch(std::runtime_error& ex) INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName
// { << "\" ingested regularly" << endl;
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; m_preProcessor_p->incrementRegularCounter();
// } }
// else
// if(!completed) {
// { WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName
// //Try to ingest using default instrument << "\" archived after timeout" << endl;
// ingestUsingDefaultInstrument(origPath, fitsFile_sp); m_preProcessor_p->incrementWarningCounter();
// }
// completed = true; }
// m_fitsImporter_p->incrementWarningCounter(); catch(std::runtime_error& ex)
// WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName {
// << "\" archived in default instrument" << endl; ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName
// } << "\" not archived due to " << ex.what() << endl;
// } m_preProcessor_p->incrementErrorCounter();
// catch(CCfits::FitsException& ex) }
// {
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.message() << endl; m_eventBuffer_sp->markAsProcessed(origPath);
// } }
// catch(std::runtime_error& ex) catch(boost::thread_interrupted& ex)
// { {
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl; DEBUG_STREAM << "WorkerThread::workerLoop() interrupt" << endl;
// } break;
// }
// if(!completed) catch(std::exception& ex)
// { {
// //Cannot ingest new file => notify error ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
// m_fitsImporter_p->incrementErrorCounter(); }
// ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName catch(...)
// << "\" not archived" << endl; {
// } ERROR_STREAM << "WorkerThread::workerLoop() unknown exception" << endl;
// }
// m_eventBuffer_sp->markAsProcessed(origPath); } //thread loop
// } }
// catch(boost::thread_interrupted& ex)
// { //==============================================================================
// DEBUG_STREAM << "WorkerThread::workerLoop() interrupt" << endl; // WorkerThread::moveFile()()
// //==============================================================================
// break; void WorkerThread::copyToDestination(boost::filesystem::path& origPath)
// } throw (std::runtime_error)
// catch(std::exception& ex) {
// { DEBUG_STREAM << "WorkerThread::moveFile()" << endl;
// ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
// } if(!boost::filesystem::is_regular_file(origPath))
// catch(...) throw std::runtime_error( "Origin path \"" +
// { origPath.string() + "\" is not a regular file");
// ERROR_STREAM << "WorkerThread::workerLoop() unknown exception" << endl;
// } boost::filesystem::path destPath(m_configuration_sp->getDestPath());
// } //while
if(!boost::filesystem::exists(destPath))
throw std::runtime_error( "Destination path \"" +
destPath.string() + "\" not exists");
if(!boost::filesystem::is_directory(destPath))
throw std::runtime_error( "Destination path \"" +
destPath.string() + "\" is not a directory");
destPath /= origPath.filename();
if(boost::filesystem::exists(destPath))
throw std::runtime_error( "Destination path \"" +
destPath.string() + "\" already exists");
boost::filesystem::copy(origPath, destPath);
} }
} //namespace } //namespace
...@@ -3,13 +3,12 @@ ...@@ -3,13 +3,12 @@
#include <Configuration.h> #include <Configuration.h>
#include <EventBuffer.h> #include <EventBuffer.h>
#include <ScriptManager.h>
#include <tango.h> #include <tango.h>
#include <stdexcept>
#include <sstream>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <boost/filesystem.hpp>
namespace PreProcessor_ns namespace PreProcessor_ns
{ {
...@@ -22,7 +21,8 @@ public: ...@@ -22,7 +21,8 @@ public:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// [Public] Constructor destructor // [Public] Constructor destructor
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
WorkerThread(PreProcessor*, EventBuffer::SP, Configuration::SP); WorkerThread(PreProcessor*, EventBuffer::SP,
ScriptManager::SP, Configuration::SP);
virtual ~WorkerThread(); virtual ~WorkerThread();
...@@ -35,6 +35,8 @@ protected: ...@@ -35,6 +35,8 @@ protected:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// [Protected] Utilities methods // [Protected] Utilities methods
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
virtual void copyToDestination(boost::filesystem::path&)
throw (std::runtime_error);
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// [Protected] Class variables // [Protected] Class variables
...@@ -45,6 +47,9 @@ protected: ...@@ -45,6 +47,9 @@ protected:
//Event buffer shared pointer //Event buffer shared pointer
EventBuffer::SP m_eventBuffer_sp; EventBuffer::SP m_eventBuffer_sp;
//File manager shared pointer
ScriptManager::SP m_fileManager_sp;
//Configuration shared pointer //Configuration shared pointer
Configuration::SP m_configuration_sp; Configuration::SP m_configuration_sp;
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment