From 3850298a44abc26e28772a61675bdbfce90ab9b9 Mon Sep 17 00:00:00 2001 From: Valerio Pastore Date: Sun, 14 Jan 2024 20:37:10 +0100 Subject: [PATCH 1/2] adding doxygen --- include/Astri_MA_Processor.h | 121 +++++++++++++++++++---------------- src/Astri_MA_Processor.cpp | 41 +++++------- src/main.cpp | 18 ++++-- 3 files changed, 96 insertions(+), 84 deletions(-) diff --git a/include/Astri_MA_Processor.h b/include/Astri_MA_Processor.h index 06b6e5b..5401e65 100644 --- a/include/Astri_MA_Processor.h +++ b/include/Astri_MA_Processor.h @@ -1,10 +1,21 @@ -#include + +/** + * @file Astri_MA_Processor.h + * @brief This file contains the declaration of the AstriMaProcessor class and AstriMaProcBuilder class. + * + * The AstriMaProcessor class is a derived class of BaseDAQ and represents the processor for ASTRI packets. + * It provides methods for starting and stopping the processor, as well as configuring the processor using the AstriMaProcBuilder class. + * + * The AstriMaProcBuilder class is a builder class for configuring the AstriMaProcessor. + * It provides methods for setting the receiver, provider, and packets to process, as well as other configuration options. + */ + +#include #include #include #include #include - #include #ifndef ASTRI_PACKETS_PATH @@ -15,65 +26,65 @@ namespace inaf::oasbo::Processors { class AstriMaProcessor : public inaf::oasbo::DAQ::BaseDAQ { protected: - std::map packetStructuresMap; - int closingTimeout; - int connectionTimeout; - int receiveAndProcessPacket(); - std::string addPrefixToKey(std::string packetKey); - int connectReceiver(); - int connectProvider(); - void generatePacketStructuresMap(); - std::string getStateStr(Status); - void printLog(std::ostream &os, std::string message); - void cleanup(); - std::thread receiveAndProcessThread; - bool receiveAndProcessFlag = false; + std::map packetStructuresMap; /**< Map of the structures of the packets. + * The key is the packet key, i.e. "22" for the S22 pakcets etc. + * The value is a pointer to the corresponding PacketStructureJson object. + */ + std::map packetsFlags; /**< Map of packets to process. + * the key is the packet key, i.e. "22" for the S22 pakcets etc. + * the value is a boolean flag, true if the packet is to be processed, false otherwise. + */ + void populatePacketStructuresMap(); /**< Method for generating the packet structures map. */ + int closingTimeout; /**< Timeout for stopping the processor, i.e. for the cleanup process. */ + int receiveAndProcessPacket(); /**< Method for receiving and processing a packet. */ + std::string addPrefixToKey(std::string packetKey); /**< Helper method for adding a prefix to a packet key (i.e the "S" to 22 etc). */ + int connectReceiver(); /**< Method for connecting the receiver. */ + int connectProvider(); /**< Method for connecting the provider. */ + int connectionTimeout; /**< Timeout for establishing the connection. */ + void printLog(std::ostream &os, std::string message); /**< Method for printing log messages. */ + void cleanup(); /**< Method for cleaning up resources. */ + std::thread receiveAndProcessThread; /**< Thread for receiving and processing packets. */ + bool receiveAndProcessFlag = false; /**< Flag for stopping the receiving ang processing */ public: - AstriMaProcessor(); - - std::map packetsFlags; - - - ~AstriMaProcessor(); - void start() override; - void stop() override; - void stop(int seconds); - void switchState(const Status) override; - int deliverPacket() override; - friend class AstriMaProcBuilder; - + AstriMaProcessor(); /**< Constructor for AstriMaProcessor. */ + ~AstriMaProcessor(); /**< Destructor for AstriMaProcessor. */ + void start() override; /**< Method for starting the processor. */ + void stop() override; /**< Method for stopping the processor. */ + void stop(int seconds); /**< Method for stopping the processor after a specified number of seconds. */ + void switchState(const Status) override; /**< Method for switching the state of the processor. */ + int deliverPacket() override; /**< Method for delivering a packet. */ + std::string getStateStr(Status) override; /**< Method for getting the state as a string. */ + friend class AstriMaProcBuilder; /**< Friend class declaration for AstriMaProcBuilder. */ }; class AstriMaProcBuilder { protected: - AstriMaProcessor *proc; + AstriMaProcessor *proc; /**< Pointer to the AstriMaProcessor being built. */ public: - AstriMaProcBuilder(); - ~AstriMaProcBuilder(); - void reset(); - AstriMaProcBuilder* configFrom(Configurators::BaseConfigurator &conf); - AstriMaProcBuilder* setReceiver(Receivers::BaseReceiver*); - AstriMaProcBuilder* setProvider(Providers::BaseProvider*); - AstriMaProcBuilder* setPacket(PacketLib::BasePacket*); - AstriMaProcBuilder* addPacketToProcess(std::string ); - AstriMaProcBuilder* removePacketToProcess(std::string); - AstriMaProcessor* getProcessor(); - - static const std::string config_target; - - static const std::string connectionTimeout_key; - static const std::string closingTimeout_key; - static const std::string receiver_key; - static const std::string provider_key; - static const std::string packet_key; - static const std::string typesubtype_key; - - static const std::string redis_rcv_key; - static const std::string file_rcv_key; - static const std::string kafkaavro_prov_key; - static const std::string astri_ma_packet_key; - static const std::string astri_horn_packet_key; + AstriMaProcBuilder(); /**< Constructor for AstriMaProcBuilder. */ + ~AstriMaProcBuilder(); /**< Destructor for AstriMaProcBuilder. */ + void reset(); /**< Method for resetting the builder. */ + AstriMaProcBuilder* configFrom(Configurators::BaseConfigurator &conf); /**< Method for configuring the builder from a BaseConfigurator. */ + AstriMaProcBuilder* setReceiver(Receivers::BaseReceiver*); /**< Method for setting the receiver. */ + AstriMaProcBuilder* setProvider(Providers::BaseProvider*); /**< Method for setting the provider. */ + AstriMaProcBuilder* setPacket(PacketLib::BasePacket*); /**< Method for setting the packet. */ + AstriMaProcBuilder* addPacketToProcess(std::string ); /**< Method for adding a packet to process. */ + AstriMaProcBuilder* removePacketToProcess(std::string); /**< Method for removing a packet to process. */ + AstriMaProcessor* getProcessor(); /**< Method for getting the built AstriMaProcessor. */ + static const std::string config_target; /**< Configuration target constant. */ + static const std::string connectionTimeout_key; /**< Connection timeout key constant. */ + static const std::string closingTimeout_key; /**< stopping timeout key constant. */ + static const std::string receiver_key; /**< Receiver key constant. */ + static const std::string provider_key; /**< Provider key constant. */ + static const std::string packet_key; /**< Packet key constant. */ + static const std::string typesubtype_key; /**< Type subtype key constant. */ + static const std::string redis_rcv_key; /**< Redis receiver key constant. */ + static const std::string file_rcv_key; /**< File receiver key constant. */ + static const std::string kafkaavro_prov_key; /**< Kafka Avro provider key constant. */ + static const std::string astri_ma_packet_key; /**< ASTRI MA packet key constant. */ + static const std::string astri_horn_packet_key; /**< ASTRI horn packet key constant. */ }; -} + +} \ No newline at end of file diff --git a/src/Astri_MA_Processor.cpp b/src/Astri_MA_Processor.cpp index 66a41af..5019df5 100644 --- a/src/Astri_MA_Processor.cpp +++ b/src/Astri_MA_Processor.cpp @@ -43,7 +43,7 @@ void AstriMaProcessor::switchState(Status newState) { switch (this->currentState) { case Status::INIT: { switch (newState) { - case Status::READY: + case Status::READY: // from INIT to READY if (connectReceiver() < 0 || connectProvider() < 0) { break; } @@ -60,9 +60,9 @@ void AstriMaProcessor::switchState(Status newState) { } case Status::READY: { switch (newState) { - case Status::RUN: + case Status::RUN: // from READY to RUN this->setCurrentState(newState); - if (!this->receiveAndProcessThread.joinable()) { // Start only the first time + if (!this->receiveAndProcessThread.joinable()) { // Start the receiveAndProcessThread only the first time the state is set to RUN from READY this->receiveAndProcessThread = std::thread([this]() { this->receiveAndProcessFlag = true; while (this->receiveAndProcessFlag) { @@ -87,7 +87,7 @@ void AstriMaProcessor::switchState(Status newState) { switch (newState) { case Status::RUN: break; - case Status::READY: + case Status::READY: // from RUN to READY if (connectReceiver() < 0 || connectProvider() < 0) { break; } @@ -107,15 +107,14 @@ void AstriMaProcessor::switchState(Status newState) { int AstriMaProcessor::connectReceiver() { if (receiver->connectToClient() > 0) return 1; - auto startTime = std::chrono::steady_clock::now(); auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast( currentTime - startTime).count(); - while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { + while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { if (receiver->connectToClient() > 0) return 1; - else { + else { // try every second std::this_thread::sleep_for(std::chrono::seconds(1)); currentTime = std::chrono::steady_clock::now(); elapsedTime = std::chrono::duration_cast( @@ -129,7 +128,6 @@ int AstriMaProcessor::connectReceiver() { int AstriMaProcessor::connectProvider() { if (provider->open() > 0) return 1; - auto startTime = std::chrono::steady_clock::now(); auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast( @@ -137,7 +135,7 @@ int AstriMaProcessor::connectProvider() { while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { if (provider->open() > 0) return 1; - else { + else { // try every second std::this_thread::sleep_for(std::chrono::seconds(1)); currentTime = std::chrono::steady_clock::now(); elapsedTime = std::chrono::duration_cast( @@ -150,19 +148,19 @@ int AstriMaProcessor::connectProvider() { int AstriMaProcessor::deliverPacket() { Packets::AstriMaGeneric *packet = - static_cast(this->packet); // downcast + static_cast(this->packet); // From basePacket to AstriPacket to use getType and getSubType std::string flagKey = std::string(std::to_string(packet->getType())).append( std::to_string(packet->getSubType())); int res = 1; try { if (packetsFlags[flagKey]) { PacketLib::BasePacketStructure *previous = - &packet->getPacketStructure(); - packet->updatePacketStructure(*packetStructuresMap[flagKey]); + &packet->getPacketStructure(); // Save the previous packet structure which was the generic one + packet->updatePacketStructure(*packetStructuresMap[flagKey]); // update tghe packet structure with the specific one (i.e. C11, C14, S22, etc) //std::string key = addPrefixToKey(flagKey); // UNcomment if you want to dispatch packets based on type_subype //provider->write(packet,key); // UNcomment if you want to dispatch packets based on type_subype res = provider->write(*packet); // comment if you want to dispatch packets based on type_subype - packet->updatePacketStructure(*previous); + packet->updatePacketStructure(*previous); // restore the generic packet structure to be able to receive the next packet } } catch (std::out_of_range &ex) { printLog(std::cerr, "Packet type_subtype not recognized: " + flagKey); @@ -189,10 +187,8 @@ void AstriMaProcessor::stop(int seconds) { } void AstriMaProcessor::start() { - - generatePacketStructuresMap(); + populatePacketStructuresMap(); switchState(Status::READY); - if (this->currentState != Status::READY) switchState(Status::STOP); else { @@ -206,15 +202,12 @@ void AstriMaProcessor::cleanup() { if (receiveAndProcessThread.joinable()) { receiveAndProcessThread.join(); } - auto startTime = std::chrono::steady_clock::now(); - while (true) { - + while (true) { // break at the timeout or when there are no more packets to process if (receiveAndProcessPacket() < 1) { printLog(std::cout, "Stop processing"); break; } - auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast( currentTime - startTime).count(); @@ -224,13 +217,13 @@ void AstriMaProcessor::cleanup() { break; } } - receiver->closeConnectionToClient(); provider->close(); } -void AstriMaProcessor::generatePacketStructuresMap() { - for (std::pair pair : packetsFlags) { - if (pair.second) { + +void AstriMaProcessor::populatePacketStructuresMap() { + for (std::pair pair : packetsFlags) { // for each packet type to process + if (pair.second) { // if set to true, it searches for the corresponding .json file std::string filePath = std::string(ASTRI_PACKETS_PATH).append( "/AstriMA/").append(pair.first).append(".json"); std::ifstream file(filePath); diff --git a/src/main.cpp b/src/main.cpp index c9f3f1c..ac301cb 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,9 +1,17 @@ -//============================================================================ -// Name : Receiver.cpp -// Author : Valerio Pastore - -//============================================================================ +/** + * @file main.cpp + * @brief This file contains the main function for the Astri MA Processor. + * + * The Astri MA Processor is responsible for processing data using the AstriMaProcBuilder. + * It reads configuration from YAML, MySQL, and command line arguments. + * The program starts and stops the processor. + * + * The main function sets up signal handlers for CTRL-C and CTRL-TERM. + * It also prints the program start time and configuration parameters. + * The program runs until the processor's current state is STOP. + * Finally, it cleans up resources and prints the termination message. + */ #include #include -- GitLab From 3416d590255fda398caee7d33bbd36b226959da8 Mon Sep 17 00:00:00 2001 From: valerio pastore Date: Sun, 14 Jan 2024 20:53:20 +0100 Subject: [PATCH 2/2] . --- deps/Astri-Packets | 2 +- deps/Base-DAQ | 2 +- deps/CL-Configurator | 2 +- deps/File-Receiver | 2 +- deps/Kafka-Avro-Provider | 2 +- deps/Mysql-Configurator | 2 +- deps/Redis_Receiver | 2 +- deps/Yaml-Configurator | 2 +- include/Astri_MA_Processor.h | 28 ++++++++++++++-------------- src/Astri_MA_Processor.cpp | 8 +++++--- src/main.cpp | 4 +--- 11 files changed, 28 insertions(+), 28 deletions(-) diff --git a/deps/Astri-Packets b/deps/Astri-Packets index b9c7127..a4c3267 160000 --- a/deps/Astri-Packets +++ b/deps/Astri-Packets @@ -1 +1 @@ -Subproject commit b9c7127f512229371df5c98cda9a7bd728eaae83 +Subproject commit a4c32676a99747fa8210886b20bdc39620635039 diff --git a/deps/Base-DAQ b/deps/Base-DAQ index 8a0ea2d..a00f9a2 160000 --- a/deps/Base-DAQ +++ b/deps/Base-DAQ @@ -1 +1 @@ -Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be +Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1 diff --git a/deps/CL-Configurator b/deps/CL-Configurator index 040a673..4ab1353 160000 --- a/deps/CL-Configurator +++ b/deps/CL-Configurator @@ -1 +1 @@ -Subproject commit 040a6736589b17033f28c9ad2f71879c1fcc7453 +Subproject commit 4ab1353f28eccfa75f7216306ed9c893bced1083 diff --git a/deps/File-Receiver b/deps/File-Receiver index e06b513..a5881a4 160000 --- a/deps/File-Receiver +++ b/deps/File-Receiver @@ -1 +1 @@ -Subproject commit e06b5132e2da1e49d4338be58d913b6304e5df8a +Subproject commit a5881a457f74ea6f5b9b1320d02cc4fda2046821 diff --git a/deps/Kafka-Avro-Provider b/deps/Kafka-Avro-Provider index b8331fb..3afcad8 160000 --- a/deps/Kafka-Avro-Provider +++ b/deps/Kafka-Avro-Provider @@ -1 +1 @@ -Subproject commit b8331fbae46c2b4f9670234caa2a9a23b389436e +Subproject commit 3afcad80087b5d67cf3ebecf3df575727ba08493 diff --git a/deps/Mysql-Configurator b/deps/Mysql-Configurator index ebbb0f3..3cf7397 160000 --- a/deps/Mysql-Configurator +++ b/deps/Mysql-Configurator @@ -1 +1 @@ -Subproject commit ebbb0f36843dd8f58e3b0442bfe5078732b3e022 +Subproject commit 3cf7397511847e3ffbf42a736aac74a97b131210 diff --git a/deps/Redis_Receiver b/deps/Redis_Receiver index 0c403c8..c2d49ee 160000 --- a/deps/Redis_Receiver +++ b/deps/Redis_Receiver @@ -1 +1 @@ -Subproject commit 0c403c8b46b3379ae7fe777e518124e9e2477eb1 +Subproject commit c2d49ee2a1096498b8c06b469829dd11ef1c7246 diff --git a/deps/Yaml-Configurator b/deps/Yaml-Configurator index 8176ab6..619b119 160000 --- a/deps/Yaml-Configurator +++ b/deps/Yaml-Configurator @@ -1 +1 @@ -Subproject commit 8176ab6718c7b75c564b706cbf82e4133fcb6988 +Subproject commit 619b119743e1f4de6313910e037447f5fe2876d4 diff --git a/include/Astri_MA_Processor.h b/include/Astri_MA_Processor.h index 5401e65..9a564fa 100644 --- a/include/Astri_MA_Processor.h +++ b/include/Astri_MA_Processor.h @@ -1,5 +1,4 @@ - /** * @file Astri_MA_Processor.h * @brief This file contains the declaration of the AstriMaProcessor class and AstriMaProcBuilder class. @@ -24,16 +23,16 @@ namespace inaf::oasbo::Processors { -class AstriMaProcessor : public inaf::oasbo::DAQ::BaseDAQ { +class AstriMaProcessor: public inaf::oasbo::DAQ::BaseDAQ { protected: - std::map packetStructuresMap; /**< Map of the structures of the packets. - * The key is the packet key, i.e. "22" for the S22 pakcets etc. - * The value is a pointer to the corresponding PacketStructureJson object. - */ + std::map packetStructuresMap; /**< Map of the structures of the packets. + * The key is the packet key, i.e. "22" for the S22 pakcets etc. + * The value is a pointer to the corresponding PacketStructureJson object. + */ std::map packetsFlags; /**< Map of packets to process. - * the key is the packet key, i.e. "22" for the S22 pakcets etc. - * the value is a boolean flag, true if the packet is to be processed, false otherwise. - */ + * the key is the packet key, i.e. "22" for the S22 pakcets etc. + * the value is a boolean flag, true if the packet is to be processed, false otherwise. + */ void populatePacketStructuresMap(); /**< Method for generating the packet structures map. */ int closingTimeout; /**< Timeout for stopping the processor, i.e. for the cleanup process. */ int receiveAndProcessPacket(); /**< Method for receiving and processing a packet. */ @@ -54,8 +53,9 @@ public: void stop(int seconds); /**< Method for stopping the processor after a specified number of seconds. */ void switchState(const Status) override; /**< Method for switching the state of the processor. */ int deliverPacket() override; /**< Method for delivering a packet. */ - std::string getStateStr(Status) override; /**< Method for getting the state as a string. */ - friend class AstriMaProcBuilder; /**< Friend class declaration for AstriMaProcBuilder. */ + std::string getStateStr(Status) override; /**< Method for getting the state as a string. */ + friend class AstriMaProcBuilder; + /**< Friend class declaration for AstriMaProcBuilder. */ }; class AstriMaProcBuilder { @@ -68,8 +68,8 @@ public: AstriMaProcBuilder* configFrom(Configurators::BaseConfigurator &conf); /**< Method for configuring the builder from a BaseConfigurator. */ AstriMaProcBuilder* setReceiver(Receivers::BaseReceiver*); /**< Method for setting the receiver. */ AstriMaProcBuilder* setProvider(Providers::BaseProvider*); /**< Method for setting the provider. */ - AstriMaProcBuilder* setPacket(PacketLib::BasePacket*); /**< Method for setting the packet. */ - AstriMaProcBuilder* addPacketToProcess(std::string ); /**< Method for adding a packet to process. */ + AstriMaProcBuilder* setPacket(Packets::BasePacket*); /**< Method for setting the packet. */ + AstriMaProcBuilder* addPacketToProcess(std::string); /**< Method for adding a packet to process. */ AstriMaProcBuilder* removePacketToProcess(std::string); /**< Method for removing a packet to process. */ AstriMaProcessor* getProcessor(); /**< Method for getting the built AstriMaProcessor. */ @@ -87,4 +87,4 @@ public: static const std::string astri_horn_packet_key; /**< ASTRI horn packet key constant. */ }; -} \ No newline at end of file +} diff --git a/src/Astri_MA_Processor.cpp b/src/Astri_MA_Processor.cpp index 5019df5..dbf6e52 100644 --- a/src/Astri_MA_Processor.cpp +++ b/src/Astri_MA_Processor.cpp @@ -111,7 +111,8 @@ int AstriMaProcessor::connectReceiver() { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast( currentTime - startTime).count(); - while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { + while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) + && (this->currentState != Status::STOP)) { if (receiver->connectToClient() > 0) return 1; else { // try every second @@ -132,7 +133,8 @@ int AstriMaProcessor::connectProvider() { auto currentTime = std::chrono::steady_clock::now(); auto elapsedTime = std::chrono::duration_cast( currentTime - startTime).count(); - while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { + while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) + && (this->currentState != Status::STOP)) { if (provider->open() > 0) return 1; else { // try every second @@ -154,7 +156,7 @@ int AstriMaProcessor::deliverPacket() { int res = 1; try { if (packetsFlags[flagKey]) { - PacketLib::BasePacketStructure *previous = + Packets::BasePacketStructure *previous = &packet->getPacketStructure(); // Save the previous packet structure which was the generic one packet->updatePacketStructure(*packetStructuresMap[flagKey]); // update tghe packet structure with the specific one (i.e. C11, C14, S22, etc) //std::string key = addPrefixToKey(flagKey); // UNcomment if you want to dispatch packets based on type_subype diff --git a/src/main.cpp b/src/main.cpp index ac301cb..7b65e1c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,4 +1,3 @@ - /** * @file main.cpp * @brief This file contains the main function for the Astri MA Processor. @@ -83,8 +82,7 @@ int main(int argc, char **argv) { // START proc->start(); - - while(proc->getCurrentState() != AstriMaProcessor::Status::STOP){ + while (proc->getCurrentState() != AstriMaProcessor::Status::STOP) { std::this_thread::sleep_for(std::chrono::seconds(1)); } -- GitLab