#include <EventThread.h>
#include <PreProcessor.h>
#include <WorkerThread.h>

#include <cassert>
#include <cerrno>
#include <sstream>
#include <string>

#include <unistd.h>
#include <fcntl.h>
#include <sys/inotify.h>

#include <boost/filesystem.hpp>
#include <boost/scoped_ptr.hpp>

namespace PreProcessor_ns
{

//==============================================================================
//      EventThread::EventThread()
//==============================================================================
// Builds an event thread object bound to the given Tango device (used for
// logging) and configuration. The object starts in the OFF state.
EventThread::EventThread(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp) : Tango::LogAdapter(deviceImpl_p),
    m_configuration_sp(configuration_sp)
{
    DEBUG_STREAM << "EventThread::EventThread()" << endl;

    // No inotify instance or watch is open yet. -1 is the "not open" marker
    // tested by stop(); leaving these uninitialised would let stop() close
    // an arbitrary descriptor.
    m_fileDescriptor = -1;
    m_watchDescriptor = -1;

    m_state = Tango::OFF;
    m_status = "Event thread not running";
}

//==============================================================================
//      EventThread::~EventThread()
//==============================================================================
// Interrupts and joins any running threads and releases the inotify
// descriptors.
EventThread::~EventThread()
{
    DEBUG_STREAM << "EventThread::~EventThread()" << endl;

    // stop() can be called repeatedly (it resets the descriptors to -1), so
    // reuse it here to make sure the descriptors are not leaked when the
    // owner never called stop() explicitly.
    stop();
}

//==============================================================================
//      EventThread::create()
//==============================================================================
// Factory: returns a shared pointer owning a new EventThread, destroyed
// through EventThread::Deleter.
EventThread::SP EventThread::create(Tango::DeviceImpl* deviceImpl_p,
    Configuration::SP configuration_sp)
{
    EventThread::SP e_sp(new EventThread(deviceImpl_p, configuration_sp),
        EventThread::Deleter());

    return e_sp;
}

//==============================================================================
//      EventThread::start()
//==============================================================================
// Entry point for starting the event thread. Failures are reported through
// the ALARM state and the status string instead of being propagated.
void EventThread::start()
{
    DEBUG_STREAM << "EventThread::start()" << endl;

    try
    {
        // NOTE(review): this block is empty, so start() currently performs
        // no initialisation and the handlers below cannot be reached.
        // Presumably the inotify setup and the thread launch belong here -
        // TODO confirm.
    }
    catch(const std::exception& ex)
    {
        std::stringstream error_stream;
        error_stream << "Event thread not running " << ex.what();
        writeState(Tango::ALARM);
        writeStatus(error_stream.str());
    }
    catch(...)
    {
        writeState(Tango::ALARM);
        writeStatus("Event thread unknown exception");
    }
}

//==============================================================================
//      EventThread::stop()
//==============================================================================
// Interrupts and joins the thread group (if any), then removes the inotify
// watch and closes the inotify descriptor. Safe to call more than once.
void EventThread::stop()
{
    DEBUG_STREAM << "EventThread::stop()" << endl;

    if(m_threadGroup_sp)
    {
        m_threadGroup_sp->interrupt_all();
        m_threadGroup_sp->join_all();
    }

    // A descriptor is valid when it is >= 0 (0 is a legal descriptor,
    // -1 means "not open"), so test against zero rather than for truthiness.
    if(m_fileDescriptor >= 0)
    {
        if(m_watchDescriptor >= 0)
            inotify_rm_watch(m_fileDescriptor, m_watchDescriptor);

        close(m_fileDescriptor);
    }

    // Forget the descriptors so that a later stop() (or the destructor)
    // does not close a descriptor number that has since been reused.
    m_watchDescriptor = -1;
    m_fileDescriptor = -1;
}

//==============================================================================
//      EventThread::readState()
//==============================================================================
// Returns the current state, read under the state mutex.
Tango::DevState EventThread::readState()
{
    DEBUG_STREAM << "EventThread::readState()" << endl;

    boost::mutex::scoped_lock stateLock(m_stateMutex);

    return m_state;
}

//==============================================================================
//      EventThread::readStatus()
//==============================================================================
// Returns a copy of the current status message, read under the status mutex.
std::string EventThread::readStatus()
{
    DEBUG_STREAM << "EventThread::readStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_statusMutex);

    return m_status;
}

//==============================================================================
//      EventThread::writeState()
//==============================================================================
// Stores a new state under the state mutex.
void EventThread::writeState(Tango::DevState state)
{
    DEBUG_STREAM << "EventThread::writeState()" << endl;

    boost::mutex::scoped_lock stateLock(m_stateMutex);

    m_state = state;
}

//==============================================================================
//      EventThread::writeStatus()
//==============================================================================
// Stores a new status message under the status mutex.
void EventThread::writeStatus(std::string status)
{
    DEBUG_STREAM << "EventThread::writeStatus()" << endl;

    boost::mutex::scoped_lock statusLock(m_statusMutex);

    m_status = status;
}

//============================================================================== // EventThread::eventLoop() //============================================================================== void EventThread::eventLoop() { DEBUG_STREAM << "EventThread::eventLoop() starting loop" << endl; unsigned int sleepTime = m_configuration_sp->getSleepTime(); boost::filesystem::path watchPath(m_configuration_sp->getWatchPath()); while(true) { try { char buffer[BUF_LEN]; int length = 0; if((length = read( m_fileDescriptor, buffer, BUF_LEN )) < 0) { if(errno != EINTR && errno != EAGAIN) { writeState(Tango::ALARM); writeStatus("Event thread error on watch path read"); } else { writeState(Tango::ON); writeStatus("Event thread running"); } } else { writeState(Tango::ON); writeStatus("Event thread new data found"); } struct inotify_event *event; for(int i=0; i<length; i += EVENT_SIZE + event->len) { event = ( struct inotify_event * ) &buffer[ i ]; DEBUG_STREAM << "EVENT: " << event->name << endl; //TODO: delete me //Add path to file name boost::filesystem::path file(event->name); boost::filesystem::path path(watchPath); path /= file; //Check if event is a regular file if(boost::filesystem::is_regular_file(path)) m_eventBuffer_sp->insertNew(path); } m_eventBuffer_sp->removeAllProcessed(); DEBUG_STREAM << "EventThread::eventLoop() sleep for " << sleepTime << endl; boost::posix_time::milliseconds sleepPosixTime(sleepTime); boost::this_thread::sleep(sleepPosixTime); } catch(boost::thread_interrupted& ex) { DEBUG_STREAM << "EventThread::eventLoop() stopping loop" << endl; writeState(Tango::OFF); writeStatus("Event thread not running"); break; } catch(std::exception& ex) { std::stringstream error_stream; error_stream << "Event thread not running " << ex.what(); writeState(Tango::ALARM); writeStatus(error_stream.str()); } catch(...) { writeState(Tango::ALARM); writeStatus("Event thread unknown exception"); } } //while } } //namespace