Skip to content
Snippets Groups Projects
Commit 092e1824 authored by Marco De Marco's avatar Marco De Marco
Browse files

Ignore files, pre and post process added

parent 13e70b66
No related branches found
No related tags found
No related merge requests found
...@@ -23,12 +23,12 @@ private: ...@@ -23,12 +23,12 @@ private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
Configuration(std::string watchPath, std::string regularPath, Configuration(std::string watchPath, std::string regularPath,
std::string warningPath, std::string errorPath, std::string scriptPath, std::string warningPath, std::string errorPath, std::string scriptPath,
int workerNumber, int sleepTime, int waitTime, uint32_t iNotifyMask, int workerNumber, int sleepTime, int waitTime, uint32_t iNotifyMask) :
bool ignoreTimeout) : m_watchPath(watchPath), m_regularPath(regularPath), m_watchPath(watchPath), m_regularPath(regularPath),
m_warningPath(warningPath), m_errorPath(errorPath), m_warningPath(warningPath), m_errorPath(errorPath),
m_scriptPath(scriptPath), m_workerNumber(workerNumber), m_scriptPath(scriptPath), m_workerNumber(workerNumber),
m_sleepTime(sleepTime), m_waitTime(waitTime), m_sleepTime(sleepTime), m_waitTime(waitTime),
m_iNotifyMask(iNotifyMask), m_ignoreTimeout(ignoreTimeout) { } m_iNotifyMask(iNotifyMask) { }
virtual ~Configuration() {} virtual ~Configuration() {}
...@@ -47,11 +47,11 @@ public: ...@@ -47,11 +47,11 @@ public:
static Configuration::SP create(std::string watchPath, static Configuration::SP create(std::string watchPath,
std::string regularPath, std::string warningPath, std::string errorPath, std::string regularPath, std::string warningPath, std::string errorPath,
std::string scriptPath, int workerNumber, int sleepTime, int waitTime, std::string scriptPath, int workerNumber, int sleepTime, int waitTime,
uint32_t iNotifyMask, bool ignoreTimeout) uint32_t iNotifyMask)
{ {
Configuration::SP c_sp(new Configuration(watchPath, regularPath, Configuration::SP c_sp(new Configuration(watchPath, regularPath,
warningPath, errorPath, scriptPath, workerNumber, sleepTime, warningPath, errorPath, scriptPath, workerNumber, sleepTime,
waitTime, iNotifyMask, ignoreTimeout), Configuration::Deleter()); waitTime, iNotifyMask), Configuration::Deleter());
return c_sp; return c_sp;
} }
...@@ -65,7 +65,6 @@ public: ...@@ -65,7 +65,6 @@ public:
unsigned int getSleepTime() const { return m_sleepTime; } unsigned int getSleepTime() const { return m_sleepTime; }
unsigned int getWaitTime() const { return m_waitTime; } unsigned int getWaitTime() const { return m_waitTime; }
uint32_t getINotifyMask() const { return m_iNotifyMask; } uint32_t getINotifyMask() const { return m_iNotifyMask; }
bool getIgnoreTimeout() const { return m_ignoreTimeout; }
private: private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
...@@ -97,9 +96,6 @@ private: ...@@ -97,9 +96,6 @@ private:
//INotify mask //INotify mask
const uint32_t m_iNotifyMask; const uint32_t m_iNotifyMask;
//Ignore timeout
const bool m_ignoreTimeout;
}; };
} //End of namespace } //End of namespace
......
...@@ -229,7 +229,6 @@ void PreProcessor::get_device_property() ...@@ -229,7 +229,6 @@ void PreProcessor::get_device_property()
dev_prop.push_back(Tango::DbDatum("SleepTime")); dev_prop.push_back(Tango::DbDatum("SleepTime"));
dev_prop.push_back(Tango::DbDatum("WaitTime")); dev_prop.push_back(Tango::DbDatum("WaitTime"));
dev_prop.push_back(Tango::DbDatum("WorkerNumber")); dev_prop.push_back(Tango::DbDatum("WorkerNumber"));
dev_prop.push_back(Tango::DbDatum("IgnoreTimeout"));
dev_prop.push_back(Tango::DbDatum("AutoStart")); dev_prop.push_back(Tango::DbDatum("AutoStart"));
...@@ -345,17 +344,6 @@ void PreProcessor::get_device_property() ...@@ -345,17 +344,6 @@ void PreProcessor::get_device_property()
// And try to extract WorkerNumber value from database // And try to extract WorkerNumber value from database
if (dev_prop[i].is_empty()==false) dev_prop[i] >> workerNumber; if (dev_prop[i].is_empty()==false) dev_prop[i] >> workerNumber;
// Try to initialize IgnoreTimeout from class property
cl_prop = ds_class->get_class_property(dev_prop[++i].name);
if (cl_prop.is_empty()==false) cl_prop >> ignoreTimeout;
else {
// Try to initialize IgnoreTimeout from default device value
def_prop = ds_class->get_default_device_property(dev_prop[i].name);
if (def_prop.is_empty()==false) def_prop >> ignoreTimeout;
}
// And try to extract IgnoreTimeout value from database
if (dev_prop[i].is_empty()==false) dev_prop[i] >> ignoreTimeout;
// Try to initialize AutoStart from class property // Try to initialize AutoStart from class property
cl_prop = ds_class->get_class_property(dev_prop[++i].name); cl_prop = ds_class->get_class_property(dev_prop[++i].name);
if (cl_prop.is_empty()==false) cl_prop >> autoStart; if (cl_prop.is_empty()==false) cl_prop >> autoStart;
...@@ -418,7 +406,7 @@ void PreProcessor::get_device_property() ...@@ -418,7 +406,7 @@ void PreProcessor::get_device_property()
throw(invalid_argument("WorkerNumber property out of range or not defined")); throw(invalid_argument("WorkerNumber property out of range or not defined"));
m_configuration_sp = Configuration::create(watchPath, regularPath, warningPath, m_configuration_sp = Configuration::create(watchPath, regularPath, warningPath,
errorPath, scriptPath, workerNumber, sleepTime, waitTime, inotifyMask, ignoreTimeout); errorPath, scriptPath, workerNumber, sleepTime, waitTime, inotifyMask);
} }
catch(invalid_argument& ex) catch(invalid_argument& ex)
{ {
...@@ -439,7 +427,7 @@ void PreProcessor::get_device_property() ...@@ -439,7 +427,7 @@ void PreProcessor::get_device_property()
//-------------------------------------------------------- //--------------------------------------------------------
void PreProcessor::always_executed_hook() void PreProcessor::always_executed_hook()
{ {
INFO_STREAM << "PreProcessor::always_executed_hook() " << device_name << endl; DEBUG_STREAM << "PreProcessor::always_executed_hook() " << device_name << endl;
/*----- PROTECTED REGION ID(PreProcessor::always_executed_hook) ENABLED START -----*/ /*----- PROTECTED REGION ID(PreProcessor::always_executed_hook) ENABLED START -----*/
if(get_state() != Tango::FAULT) if(get_state() != Tango::FAULT)
......
...@@ -120,10 +120,6 @@ public: ...@@ -120,10 +120,6 @@ public:
Tango::DevUShort waitTime; Tango::DevUShort waitTime;
// WorkerNumber: // WorkerNumber:
Tango::DevUShort workerNumber; Tango::DevUShort workerNumber;
// IgnoreTimeout: Behaviour for missing end of file after timeout case:
// true = process those files like regulars files
// false = process those files like erroneous files
Tango::DevBoolean ignoreTimeout;
// AutoStart: Exec On command after init if state is not fault // AutoStart: Exec On command after init if state is not fault
Tango::DevBoolean autoStart; Tango::DevBoolean autoStart;
......
...@@ -41,11 +41,6 @@ ...@@ -41,11 +41,6 @@
<type xsi:type="pogoDsl:UShortType"/> <type xsi:type="pogoDsl:UShortType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
</deviceProperties> </deviceProperties>
<deviceProperties name="IgnoreTimeout" description="Behaviour for missing end of file after timeout case: &#xA;true = process those files like regulars files&#xA;false = process those files like erroneous files">
<type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
<DefaultPropValue>false</DefaultPropValue>
</deviceProperties>
<deviceProperties name="AutoStart" description="Exec On command after init if state is not fault"> <deviceProperties name="AutoStart" description="Exec On command after init if state is not fault">
<type xsi:type="pogoDsl:BooleanType"/> <type xsi:type="pogoDsl:BooleanType"/>
<status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/>
......
...@@ -395,20 +395,6 @@ void PreProcessorClass::set_default_property() ...@@ -395,20 +395,6 @@ void PreProcessorClass::set_default_property()
dev_def_prop.push_back(data); dev_def_prop.push_back(data);
add_wiz_dev_prop(prop_name, prop_desc, prop_def); add_wiz_dev_prop(prop_name, prop_desc, prop_def);
} }
else
add_wiz_dev_prop(prop_name, prop_desc);
prop_name = "IgnoreTimeout";
prop_desc = "Behaviour for missing end of file after timeout case: \ntrue = process those files like regulars files\nfalse = process those files like erroneous files";
prop_def = "false";
vect_data.clear();
vect_data.push_back("false");
if (prop_def.length()>0)
{
Tango::DbDatum data(prop_name);
data << vect_data ;
dev_def_prop.push_back(data);
add_wiz_dev_prop(prop_name, prop_desc, prop_def);
}
else else
add_wiz_dev_prop(prop_name, prop_desc); add_wiz_dev_prop(prop_name, prop_desc);
prop_name = "AutoStart"; prop_name = "AutoStart";
......
#include <ScriptManager.h> #include <ScriptManager.h>
#include <cstdio> #include <stdio.h>
namespace PreProcessor_ns namespace PreProcessor_ns
{ {
...@@ -57,7 +57,11 @@ void ScriptManager::checkScriptCompliance() ...@@ -57,7 +57,11 @@ void ScriptManager::checkScriptCompliance()
std::string result = exec(command.str()); std::string result = exec(command.str());
if(result.find("CHECK OK") == std::string::npos) if(result.find("CHECK OK") == std::string::npos)
throw std::runtime_error("Invalid script"); {
std::stringstream errorStream;
errorStream << "Check script error: " << result;
throw std::runtime_error(errorStream.str());
}
} }
//============================================================================== //==============================================================================
...@@ -78,7 +82,7 @@ bool ScriptManager::isFileNameValid(boost::filesystem::path& filePath) ...@@ -78,7 +82,7 @@ bool ScriptManager::isFileNameValid(boost::filesystem::path& filePath)
{ {
return true; return true;
} }
else if(result.find("VALID FATAL") != std::string::npos) else if(result.find("VALID IGNORE") != std::string::npos)
{ {
return false; return false;
} }
...@@ -137,29 +141,26 @@ void ScriptManager::preProcessFile(boost::filesystem::path& filePath, ...@@ -137,29 +141,26 @@ void ScriptManager::preProcessFile(boost::filesystem::path& filePath,
<< " PREPROCESS " << filePath.string(); << " PREPROCESS " << filePath.string();
if(verified == OK) if(verified == OK)
{
command << " OK"; command << " OK";
}
else if(verified == WAIT) else if(verified == WAIT)
{
command << " WAIT"; command << " WAIT";
}
else if(verified == FATAL) else if(verified == FATAL)
{
command << " FATAL"; command << " FATAL";
}
std::string result = exec(command.str()); std::string result = exec(command.str());
if(result.find("PREPROCESS OK") == std::string::npos) if(result.find("PREPROCESS OK") == std::string::npos)
{
if(result.find("PREPROCESS FATAL") != std::string::npos)
{ {
std::stringstream errorStream; std::stringstream errorStream;
errorStream << "Pre process error: " << result; errorStream << "Pre process error: " << result;
throw std::runtime_error(errorStream.str()); throw std::runtime_error(errorStream.str());
} }
else
{
std::stringstream errorStream;
errorStream << "Unknown pre process error: " << result;
throw std::runtime_error(errorStream.str());
}
}
} }
//============================================================================== //==============================================================================
...@@ -175,29 +176,26 @@ void ScriptManager::postProcessFile(boost::filesystem::path& filePath, ...@@ -175,29 +176,26 @@ void ScriptManager::postProcessFile(boost::filesystem::path& filePath,
<< " POSTPROCESS " << filePath.string(); << " POSTPROCESS " << filePath.string();
if(verified == OK) if(verified == OK)
{
command << " OK"; command << " OK";
}
else if(verified == WAIT) else if(verified == WAIT)
{
command << " WAIT"; command << " WAIT";
}
else if(verified == FATAL) else if(verified == FATAL)
{
command << " FATAL"; command << " FATAL";
}
std::string result = exec(command.str()); std::string result = exec(command.str());
if(result.find("POSTPROCESS OK") == std::string::npos) if(result.find("POSTPROCESS OK") == std::string::npos)
{
if(result.find("POSTPROCESS FATAL") != std::string::npos)
{ {
std::stringstream errorStream; std::stringstream errorStream;
errorStream << "Post process error: " << result; errorStream << "Post process error: " << result;
throw std::runtime_error(errorStream.str()); throw std::runtime_error(errorStream.str());
} }
else
{
std::stringstream errorStream;
errorStream << "Unknown post process error: " << result;
throw std::runtime_error(errorStream.str());
}
}
} }
//============================================================================== //==============================================================================
......
...@@ -42,9 +42,6 @@ void WorkerThread::workerLoop() ...@@ -42,9 +42,6 @@ void WorkerThread::workerLoop()
boost::chrono::steady_clock::duration waitTime = boost::chrono::steady_clock::duration waitTime =
boost::chrono::seconds(m_configuration_sp->getWaitTime()); boost::chrono::seconds(m_configuration_sp->getWaitTime());
//Process end of file after timeout like regualr files
bool ignoreTimeout = m_configuration_sp->getIgnoreTimeout();
while(true) while(true)
{ {
try try
...@@ -58,7 +55,7 @@ void WorkerThread::workerLoop() ...@@ -58,7 +55,7 @@ void WorkerThread::workerLoop()
boost::chrono::steady_clock::time_point start = boost::chrono::steady_clock::time_point start =
boost::chrono::steady_clock::now(); boost::chrono::steady_clock::now();
INFO_STREAM << "WorkerThread::workerLoop() processing \"" DEBUG_STREAM << "WorkerThread::workerLoop() processing \""
<< fileName << "\"" << endl; << fileName << "\"" << endl;
try try
{ {
...@@ -67,60 +64,62 @@ void WorkerThread::workerLoop() ...@@ -67,60 +64,62 @@ void WorkerThread::workerLoop()
{ {
ScriptManager::Verified verified; ScriptManager::Verified verified;
//Waiting for the file to be written until timeout
do do
{ {
//Check file state
verified = m_fileManager_sp->isFileVerified(origPath); verified = m_fileManager_sp->isFileVerified(origPath);
if(verified == ScriptManager::OK || if(verified == ScriptManager::OK ||
verified == ScriptManager::FATAL) verified == ScriptManager::FATAL)
break; break;
DEBUG_STREAM << "WorkerThread::workerLoop() waiting \""
<< fileName << "\" to be written" << endl;
boost::this_thread::sleep_for(sleepTime); boost::this_thread::sleep_for(sleepTime);
} }
while(boost::chrono::steady_clock::now()-start <= waitTime); while(boost::chrono::steady_clock::now()-start <= waitTime);
//Exec pre process script
m_fileManager_sp->preProcessFile(origPath, verified); m_fileManager_sp->preProcessFile(origPath, verified);
//Copy file to destination
copyToDestination(origPath, verified); copyToDestination(origPath, verified);
//Exec post process script
m_fileManager_sp->postProcessFile(origPath, verified); m_fileManager_sp->postProcessFile(origPath, verified);
if(verified == ScriptManager::Verified::OK) if(verified == ScriptManager::Verified::OK)
{ {
INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName
<< "\" ingested regularly" << endl; << "\" is a regular file" << endl;
m_preProcessor_p->incrementRegularCounter(); m_preProcessor_p->incrementRegularCounter();
} }
else if(verified == ScriptManager::Verified::WAIT) else if(verified == ScriptManager::Verified::WAIT)
{ {
WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName
<< "\" archived after timeout" << endl; << "\" has reached a wait eof timeout" << endl;
m_preProcessor_p->incrementWarningCounter(); m_preProcessor_p->incrementWarningCounter();
} }
else if(verified == ScriptManager::Verified::FATAL) else if(verified == ScriptManager::Verified::FATAL)
{ {
WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName WARN_STREAM << "WorkerThread::workerLoop() \"" << fileName
<< "\" not archived due to fits fatal error" << endl; << "\" has fatal error" << endl;
m_preProcessor_p->incrementErrorCounter(); m_preProcessor_p->incrementErrorCounter();
} }
} }
else else
{ {
INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName INFO_STREAM << "WorkerThread::workerLoop() \"" << fileName
<< "\" ignored" << endl; << "\" is an ignore file" << endl;
m_preProcessor_p->incrementIgnoredCounter(); m_preProcessor_p->incrementIgnoredCounter();
} }
} }
catch(std::exception& ex) catch(std::exception& ex)
{ {
ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName
<< "\" not archived due to " << ex.what() << endl; << "\" has an error due to " << ex.what() << endl;
m_preProcessor_p->incrementErrorCounter();
}
catch(...)
{
ERROR_STREAM << "WorkerThread::workerLoop() \"" << fileName
<< "\" not archived due to unknown exception" << endl;
m_preProcessor_p->incrementErrorCounter(); m_preProcessor_p->incrementErrorCounter();
} }
...@@ -128,9 +127,17 @@ void WorkerThread::workerLoop() ...@@ -128,9 +127,17 @@ void WorkerThread::workerLoop()
} }
catch(boost::thread_interrupted& ex) catch(boost::thread_interrupted& ex)
{ {
DEBUG_STREAM << "WorkerThread::workerLoop() interrupt" << endl; DEBUG_STREAM << "WorkerThread::workerLoop() interrupt request" << endl;
break; break;
} }
catch(std::exception& ex)
{
ERROR_STREAM << "WorkerThread::workerLoop() " << ex.what() << endl;
}
catch(...)
{
ERROR_STREAM << "WorkerThread::workerLoop() unknown exception" << endl;
}
} //thread loop } //thread loop
} }
...@@ -140,31 +147,29 @@ void WorkerThread::workerLoop() ...@@ -140,31 +147,29 @@ void WorkerThread::workerLoop()
void WorkerThread::copyToDestination(boost::filesystem::path& origPath, void WorkerThread::copyToDestination(boost::filesystem::path& origPath,
ScriptManager::Verified verified) throw (std::runtime_error) ScriptManager::Verified verified) throw (std::runtime_error)
{ {
DEBUG_STREAM << "WorkerThread::moveFile()" << endl; DEBUG_STREAM << "WorkerThread::copyToDestination()" << endl;
//Process end of file after timeout like regualr files std::string fileName = origPath.stem().string();
bool ignoreTimeout = m_configuration_sp->getIgnoreTimeout();
boost::filesystem::path destPath; boost::filesystem::path destPath;
//Retrieve destination path based verification result
if(verified == ScriptManager::OK) if(verified == ScriptManager::OK)
{ {
destPath /= m_configuration_sp->getRegularPath(); destPath /= m_configuration_sp->getRegularPath();
} }
else if(verified == ScriptManager::WAIT) else if(verified == ScriptManager::WAIT)
{ {
if(ignoreTimeout)
destPath /= m_configuration_sp->getRegularPath();
else
destPath /= m_configuration_sp->getWarningPath(); destPath /= m_configuration_sp->getWarningPath();
} }
else else if(verified == ScriptManager::FATAL)
{ {
destPath /= m_configuration_sp->getErrorPath(); destPath /= m_configuration_sp->getErrorPath();
} }
//TODO: come fare per le destinazioni vuote? //If destination is defined then copy file
if(!destPath.empty())
{
if(!boost::filesystem::is_regular_file(origPath)) if(!boost::filesystem::is_regular_file(origPath))
throw std::runtime_error( "Origin path \"" + throw std::runtime_error( "Origin path \"" +
origPath.string() + "\" is not a regular file"); origPath.string() + "\" is not a regular file");
...@@ -184,6 +189,15 @@ void WorkerThread::copyToDestination(boost::filesystem::path& origPath, ...@@ -184,6 +189,15 @@ void WorkerThread::copyToDestination(boost::filesystem::path& origPath,
destPath.string() + "\" already exists"); destPath.string() + "\" already exists");
boost::filesystem::copy(origPath, destPath); boost::filesystem::copy(origPath, destPath);
DEBUG_STREAM << "WorkerThread::copyToDestination() \"" << fileName
<< "\" copied to \"" << destPath.parent_path() << "\"" << endl;
}
else
{
DEBUG_STREAM << "WorkerThread::copyToDestination() \"" << fileName
<< "\" has not a defined destination" << endl;
}
} }
} //namespace } //namespace
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment