diff --git a/deps/Astri-Packets b/deps/Astri-Packets index b9c7127f512229371df5c98cda9a7bd728eaae83..a4c32676a99747fa8210886b20bdc39620635039 160000 --- a/deps/Astri-Packets +++ b/deps/Astri-Packets @@ -1 +1 @@ -Subproject commit b9c7127f512229371df5c98cda9a7bd728eaae83 +Subproject commit a4c32676a99747fa8210886b20bdc39620635039 diff --git a/deps/Base-DAQ b/deps/Base-DAQ index 8a0ea2d0e699863df5fe1c91caf2d7b0855957be..a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1 160000 --- a/deps/Base-DAQ +++ b/deps/Base-DAQ @@ -1 +1 @@ -Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be +Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1 diff --git a/deps/CL-Configurator b/deps/CL-Configurator index 040a6736589b17033f28c9ad2f71879c1fcc7453..4ab1353f28eccfa75f7216306ed9c893bced1083 160000 --- a/deps/CL-Configurator +++ b/deps/CL-Configurator @@ -1 +1 @@ -Subproject commit 040a6736589b17033f28c9ad2f71879c1fcc7453 +Subproject commit 4ab1353f28eccfa75f7216306ed9c893bced1083 diff --git a/deps/File-Receiver b/deps/File-Receiver index e06b5132e2da1e49d4338be58d913b6304e5df8a..a5881a457f74ea6f5b9b1320d02cc4fda2046821 160000 --- a/deps/File-Receiver +++ b/deps/File-Receiver @@ -1 +1 @@ -Subproject commit e06b5132e2da1e49d4338be58d913b6304e5df8a +Subproject commit a5881a457f74ea6f5b9b1320d02cc4fda2046821 diff --git a/deps/Kafka-Avro-Provider b/deps/Kafka-Avro-Provider index b8331fbae46c2b4f9670234caa2a9a23b389436e..3afcad80087b5d67cf3ebecf3df575727ba08493 160000 --- a/deps/Kafka-Avro-Provider +++ b/deps/Kafka-Avro-Provider @@ -1 +1 @@ -Subproject commit b8331fbae46c2b4f9670234caa2a9a23b389436e +Subproject commit 3afcad80087b5d67cf3ebecf3df575727ba08493 diff --git a/deps/Mysql-Configurator b/deps/Mysql-Configurator index ebbb0f36843dd8f58e3b0442bfe5078732b3e022..3cf7397511847e3ffbf42a736aac74a97b131210 160000 --- a/deps/Mysql-Configurator +++ b/deps/Mysql-Configurator @@ -1 +1 @@ -Subproject commit ebbb0f36843dd8f58e3b0442bfe5078732b3e022 +Subproject commit 3cf7397511847e3ffbf42a736aac74a97b131210 diff --git a/deps/Redis_Receiver b/deps/Redis_Receiver index 0c403c8b46b3379ae7fe777e518124e9e2477eb1..c2d49ee2a1096498b8c06b469829dd11ef1c7246 160000 --- a/deps/Redis_Receiver +++ b/deps/Redis_Receiver @@ -1 +1 @@ -Subproject commit 0c403c8b46b3379ae7fe777e518124e9e2477eb1 +Subproject commit c2d49ee2a1096498b8c06b469829dd11ef1c7246 diff --git a/deps/Yaml-Configurator b/deps/Yaml-Configurator index 8176ab6718c7b75c564b706cbf82e4133fcb6988..619b119743e1f4de6313910e037447f5fe2876d4 160000 --- a/deps/Yaml-Configurator +++ b/deps/Yaml-Configurator @@ -1 +1 @@ -Subproject commit 8176ab6718c7b75c564b706cbf82e4133fcb6988 +Subproject commit 619b119743e1f4de6313910e037447f5fe2876d4 diff --git a/include/Astri_MA_Processor.h b/include/Astri_MA_Processor.h index 06b6e5baf6890fa313f9dd04fd3fc9a7052c51ea..9a564fac269218f08e4fa66c2c9f15a3685b2ca3 100644 --- a/include/Astri_MA_Processor.h +++ b/include/Astri_MA_Processor.h @@ -1,10 +1,20 @@ -#include +/** + * @file Astri_MA_Processor.h + * @brief This file contains the declaration of the AstriMaProcessor class and AstriMaProcBuilder class. + * + * The AstriMaProcessor class is a derived class of BaseDAQ and represents the processor for ASTRI packets. + * It provides methods for starting and stopping the processor, as well as configuring the processor using the AstriMaProcBuilder class. + * + * The AstriMaProcBuilder class is a builder class for configuring the AstriMaProcessor. + * It provides methods for setting the receiver, provider, and packets to process, as well as other configuration options. + */ + +#include #include #include #include #include - #include #ifndef ASTRI_PACKETS_PATH @@ -13,67 +23,68 @@ namespace inaf::oasbo::Processors { -class AstriMaProcessor : public inaf::oasbo::DAQ::BaseDAQ { +class AstriMaProcessor: public inaf::oasbo::DAQ::BaseDAQ { protected: - std::map packetStructuresMap; - int closingTimeout; - int connectionTimeout; - int receiveAndProcessPacket(); - std::string addPrefixToKey(std::string packetKey); - int connectReceiver(); - int connectProvider(); - void generatePacketStructuresMap(); - std::string getStateStr(Status); - void printLog(std::ostream &os, std::string message); - void cleanup(); - std::thread receiveAndProcessThread; - bool receiveAndProcessFlag = false; + std::map packetStructuresMap; /**< Map of the structures of the packets. + * The key is the packet key, i.e. "22" for the S22 pakcets etc. + * The value is a pointer to the corresponding PacketStructureJson object. + */ + std::map packetsFlags; /**< Map of packets to process. + * the key is the packet key, i.e. "22" for the S22 pakcets etc. + * the value is a boolean flag, true if the packet is to be processed, false otherwise. + */ + void populatePacketStructuresMap(); /**< Method for generating the packet structures map. */ + int closingTimeout; /**< Timeout for stopping the processor, i.e. for the cleanup process. */ + int receiveAndProcessPacket(); /**< Method for receiving and processing a packet. */ + std::string addPrefixToKey(std::string packetKey); /**< Helper method for adding a prefix to a packet key (i.e the "S" to 22 etc). */ + int connectReceiver(); /**< Method for connecting the receiver. */ + int connectProvider(); /**< Method for connecting the provider. */ + int connectionTimeout; /**< Timeout for establishing the connection. */ + void printLog(std::ostream &os, std::string message); /**< Method for printing log messages. */ + void cleanup(); /**< Method for cleaning up resources. */ + std::thread receiveAndProcessThread; /**< Thread for receiving and processing packets. */ + bool receiveAndProcessFlag = false; /**< Flag for stopping the receiving ang processing */ public: - AstriMaProcessor(); - - std::map packetsFlags; - - - ~AstriMaProcessor(); - void start() override; - void stop() override; - void stop(int seconds); - void switchState(const Status) override; - int deliverPacket() override; - friend class AstriMaProcBuilder; - + AstriMaProcessor(); /**< Constructor for AstriMaProcessor. */ + ~AstriMaProcessor(); /**< Destructor for AstriMaProcessor. */ + void start() override; /**< Method for starting the processor. */ + void stop() override; /**< Method for stopping the processor. */ + void stop(int seconds); /**< Method for stopping the processor after a specified number of seconds. */ + void switchState(const Status) override; /**< Method for switching the state of the processor. */ + int deliverPacket() override; /**< Method for delivering a packet. */ + std::string getStateStr(Status) override; /**< Method for getting the state as a string. */ + friend class AstriMaProcBuilder; + /**< Friend class declaration for AstriMaProcBuilder. */ }; class AstriMaProcBuilder { protected: - AstriMaProcessor *proc; + AstriMaProcessor *proc; /**< Pointer to the AstriMaProcessor being built. */ public: - AstriMaProcBuilder(); - ~AstriMaProcBuilder(); - void reset(); - AstriMaProcBuilder* configFrom(Configurators::BaseConfigurator &conf); - AstriMaProcBuilder* setReceiver(Receivers::BaseReceiver*); - AstriMaProcBuilder* setProvider(Providers::BaseProvider*); - AstriMaProcBuilder* setPacket(PacketLib::BasePacket*); - AstriMaProcBuilder* addPacketToProcess(std::string ); - AstriMaProcBuilder* removePacketToProcess(std::string); - AstriMaProcessor* getProcessor(); - - static const std::string config_target; - - static const std::string connectionTimeout_key; - static const std::string closingTimeout_key; - static const std::string receiver_key; - static const std::string provider_key; - static const std::string packet_key; - static const std::string typesubtype_key; - - static const std::string redis_rcv_key; - static const std::string file_rcv_key; - static const std::string kafkaavro_prov_key; - static const std::string astri_ma_packet_key; - static const std::string astri_horn_packet_key; + AstriMaProcBuilder(); /**< Constructor for AstriMaProcBuilder. */ + ~AstriMaProcBuilder(); /**< Destructor for AstriMaProcBuilder. */ + void reset(); /**< Method for resetting the builder. */ + AstriMaProcBuilder* configFrom(Configurators::BaseConfigurator &conf); /**< Method for configuring the builder from a BaseConfigurator. */ + AstriMaProcBuilder* setReceiver(Receivers::BaseReceiver*); /**< Method for setting the receiver. */ + AstriMaProcBuilder* setProvider(Providers::BaseProvider*); /**< Method for setting the provider. */ + AstriMaProcBuilder* setPacket(Packets::BasePacket*); /**< Method for setting the packet. */ + AstriMaProcBuilder* addPacketToProcess(std::string); /**< Method for adding a packet to process. */ + AstriMaProcBuilder* removePacketToProcess(std::string); /**< Method for removing a packet to process. */ + AstriMaProcessor* getProcessor(); /**< Method for getting the built AstriMaProcessor. */ + static const std::string config_target; /**< Configuration target constant. */ + static const std::string connectionTimeout_key; /**< Connection timeout key constant. */ + static const std::string closingTimeout_key; /**< stopping timeout key constant. */ + static const std::string receiver_key; /**< Receiver key constant. */ + static const std::string provider_key; /**< Provider key constant. */ + static const std::string packet_key; /**< Packet key constant. */ + static const std::string typesubtype_key; /**< Type subtype key constant. */ + static const std::string redis_rcv_key; /**< Redis receiver key constant. */ + static const std::string file_rcv_key; /**< File receiver key constant. */ + static const std::string kafkaavro_prov_key; /**< Kafka Avro provider key constant. */ + static const std::string astri_ma_packet_key; /**< ASTRI MA packet key constant. */ + static const std::string astri_horn_packet_key; /**< ASTRI horn packet key constant. */ }; + } diff --git a/src/Astri_MA_Processor.cpp b/src/Astri_MA_Processor.cpp index 66a41af325cc879ce39c18bcfd5cb663f02f7ec2..dbf6e52ae2f0544aae155721f2f3b6644e8d8684 100644 --- a/src/Astri_MA_Processor.cpp +++ b/src/Astri_MA_Processor.cpp @@ -43,7 +43,7 @@ void AstriMaProcessor::switchState(Status newState) { switch (this->currentState) { case Status::INIT: { switch (newState) { - case Status::READY: + case Status::READY: // from INIT to READY if (connectReceiver() < 0 || connectProvider() < 0) { break; } @@ -60,9 +60,9 @@ void AstriMaProcessor::switchState(Status newState) { } case Status::READY: { switch (newState) { - case Status::RUN: + case Status::RUN: // from READY to RUN this->setCurrentState(newState); - if (!this->receiveAndProcessThread.joinable()) { // Start only the first time + if (!this->receiveAndProcessThread.joinable()) { // Start the receiveAndProcessThread only the first time the state is set to RUN from READY this->receiveAndProcessThread = std::thread([this]() { this->receiveAndProcessFlag = true; while (this->receiveAndProcessFlag) { @@ -87,7 +87,7 @@ void AstriMaProcessor::switchState(Status newState) { switch (newState) { case Status::RUN: break; - case Status::READY: + case Status::READY: // from RUN to READY if (connectReceiver() < 0 || connectProvider() < 0) { break; } @@ -107,15 +107,15 @@ void AstriMaProcessor::switchState(Status newState) { int AstriMaProcessor::connectReceiver() { if (receiver->connectToClient() > 0) return 1; - auto startTime = std::chrono::steady_clock::now(); auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast( currentTime - startTime).count(); - while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { + while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) + && (this->currentState != Status::STOP)) { if (receiver->connectToClient() > 0) return 1; - else { + else { // try every second std::this_thread::sleep_for(std::chrono::seconds(1)); currentTime = std::chrono::steady_clock::now(); elapsedTime = std::chrono::duration_cast( @@ -129,15 +129,15 @@ int AstriMaProcessor::connectReceiver() { int AstriMaProcessor::connectProvider() { if (provider->open() > 0) return 1; - auto startTime = std::chrono::steady_clock::now(); auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast( currentTime - startTime).count(); - while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { + while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) + && (this->currentState != Status::STOP)) { if (provider->open() > 0) return 1; - else { + else { // try every second std::this_thread::sleep_for(std::chrono::seconds(1)); currentTime = std::chrono::steady_clock::now(); elapsedTime = std::chrono::duration_cast( @@ -150,19 +150,19 @@ int AstriMaProcessor::connectProvider() { int AstriMaProcessor::deliverPacket() { Packets::AstriMaGeneric *packet = - static_cast(this->packet); // downcast + static_cast(this->packet); // From basePacket to AstriPacket to use getType and getSubType std::string flagKey = std::string(std::to_string(packet->getType())).append( std::to_string(packet->getSubType())); int res = 1; try { if (packetsFlags[flagKey]) { - PacketLib::BasePacketStructure *previous = - &packet->getPacketStructure(); - packet->updatePacketStructure(*packetStructuresMap[flagKey]); + Packets::BasePacketStructure *previous = + &packet->getPacketStructure(); // Save the previous packet structure which was the generic one + packet->updatePacketStructure(*packetStructuresMap[flagKey]); // update tghe packet structure with the specific one (i.e. C11, C14, S22, etc) //std::string key = addPrefixToKey(flagKey); // UNcomment if you want to dispatch packets based on type_subype //provider->write(packet,key); // UNcomment if you want to dispatch packets based on type_subype res = provider->write(*packet); // comment if you want to dispatch packets based on type_subype - packet->updatePacketStructure(*previous); + packet->updatePacketStructure(*previous); // restore the generic packet structure to be able to receive the next packet } } catch (std::out_of_range &ex) { printLog(std::cerr, "Packet type_subtype not recognized: " + flagKey); @@ -189,10 +189,8 @@ void AstriMaProcessor::stop(int seconds) { } void AstriMaProcessor::start() { - - generatePacketStructuresMap(); + populatePacketStructuresMap(); switchState(Status::READY); - if (this->currentState != Status::READY) switchState(Status::STOP); else { @@ -206,15 +204,12 @@ void AstriMaProcessor::cleanup() { if (receiveAndProcessThread.joinable()) { receiveAndProcessThread.join(); } - auto startTime = std::chrono::steady_clock::now(); - while (true) { - + while (true) { // break at the timeout or when there are no more packets to process if (receiveAndProcessPacket() < 1) { printLog(std::cout, "Stop processing"); break; } - auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast( currentTime - startTime).count(); @@ -224,13 +219,13 @@ void AstriMaProcessor::cleanup() { break; } } - receiver->closeConnectionToClient(); provider->close(); } -void AstriMaProcessor::generatePacketStructuresMap() { - for (std::pair pair : packetsFlags) { - if (pair.second) { + +void AstriMaProcessor::populatePacketStructuresMap() { + for (std::pair pair : packetsFlags) { // for each packet type to process + if (pair.second) { // if set to true, it searches for the corresponding .json file std::string filePath = std::string(ASTRI_PACKETS_PATH).append( "/AstriMA/").append(pair.first).append(".json"); std::ifstream file(filePath); diff --git a/src/main.cpp b/src/main.cpp index c9f3f1c5b7e2c42fd7db84b172f76190940220f5..7b65e1c5a9faa771496c5579a9f006c3064f95b5 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,9 +1,16 @@ -//============================================================================ -// Name : Receiver.cpp -// Author : Valerio Pastore - -//============================================================================ - +/** + * @file main.cpp + * @brief This file contains the main function for the Astri MA Processor. + * + * The Astri MA Processor is responsible for processing data using the AstriMaProcBuilder. + * It reads configuration from YAML, MySQL, and command line arguments. + * The program starts and stops the processor. + * + * The main function sets up signal handlers for CTRL-C and CTRL-TERM. + * It also prints the program start time and configuration parameters. + * The program runs until the processor's current state is STOP. + * Finally, it cleans up resources and prints the termination message. + */ #include #include @@ -75,8 +82,7 @@ int main(int argc, char **argv) { // START proc->start(); - - while(proc->getCurrentState() != AstriMaProcessor::Status::STOP){ + while (proc->getCurrentState() != AstriMaProcessor::Status::STOP) { std::this_thread::sleep_for(std::chrono::seconds(1)); }