Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • bias/processors/astri_ma_processor
1 result
Show changes
Commits on Source (3)
Subproject commit b9c7127f512229371df5c98cda9a7bd728eaae83 Subproject commit a4c32676a99747fa8210886b20bdc39620635039
Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1
Subproject commit 040a6736589b17033f28c9ad2f71879c1fcc7453 Subproject commit 4ab1353f28eccfa75f7216306ed9c893bced1083
Subproject commit e06b5132e2da1e49d4338be58d913b6304e5df8a Subproject commit a5881a457f74ea6f5b9b1320d02cc4fda2046821
Subproject commit b8331fbae46c2b4f9670234caa2a9a23b389436e Subproject commit 3afcad80087b5d67cf3ebecf3df575727ba08493
Subproject commit ebbb0f36843dd8f58e3b0442bfe5078732b3e022 Subproject commit 3cf7397511847e3ffbf42a736aac74a97b131210
Subproject commit 0c403c8b46b3379ae7fe777e518124e9e2477eb1 Subproject commit c2d49ee2a1096498b8c06b469829dd11ef1c7246
Subproject commit 8176ab6718c7b75c564b706cbf82e4133fcb6988 Subproject commit 619b119743e1f4de6313910e037447f5fe2876d4
#include <Base_DAQ.h>
/**
* @file Astri_MA_Processor.h
* @brief This file contains the declaration of the AstriMaProcessor class and AstriMaProcBuilder class.
*
* The AstriMaProcessor class is a derived class of BaseDAQ and represents the processor for ASTRI packets.
* It provides methods for starting and stopping the processor, as well as configuring the processor using the AstriMaProcBuilder class.
*
* The AstriMaProcBuilder class is a builder class for configuring the AstriMaProcessor.
* It provides methods for setting the receiver, provider, and packets to process, as well as other configuration options.
*/
#include <Base_DAQ.h>
#include <map> #include <map>
#include <Packet_Structure_Json.h> #include <Packet_Structure_Json.h>
#include <Base_Configurator.h> #include <Base_Configurator.h>
#include <Astri_MA_Generic.h> #include <Astri_MA_Generic.h>
#include <thread> #include <thread>
#ifndef ASTRI_PACKETS_PATH #ifndef ASTRI_PACKETS_PATH
...@@ -13,67 +23,68 @@ ...@@ -13,67 +23,68 @@
namespace inaf::oasbo::Processors { namespace inaf::oasbo::Processors {
class AstriMaProcessor : public inaf::oasbo::DAQ::BaseDAQ { class AstriMaProcessor: public inaf::oasbo::DAQ::BaseDAQ {
protected: protected:
std::map<std::string, Packets::PacketStructureJson *> packetStructuresMap; std::map<std::string, Packets::PacketStructureJson*> packetStructuresMap; /**< Map of the structures of the packets.
int closingTimeout; * The key is the packet key, i.e. "22" for the S22 pakcets etc.
int connectionTimeout; * The value is a pointer to the corresponding PacketStructureJson object.
int receiveAndProcessPacket(); */
std::string addPrefixToKey(std::string packetKey); std::map<std::string, bool> packetsFlags; /**< Map of packets to process.
int connectReceiver(); * the key is the packet key, i.e. "22" for the S22 pakcets etc.
int connectProvider(); * the value is a boolean flag, true if the packet is to be processed, false otherwise.
void generatePacketStructuresMap(); */
std::string getStateStr(Status); void populatePacketStructuresMap(); /**< Method for generating the packet structures map. */
void printLog(std::ostream &os, std::string message); int closingTimeout; /**< Timeout for stopping the processor, i.e. for the cleanup process. */
void cleanup(); int receiveAndProcessPacket(); /**< Method for receiving and processing a packet. */
std::thread receiveAndProcessThread; std::string addPrefixToKey(std::string packetKey); /**< Helper method for adding a prefix to a packet key (i.e the "S" to 22 etc). */
bool receiveAndProcessFlag = false; int connectReceiver(); /**< Method for connecting the receiver. */
int connectProvider(); /**< Method for connecting the provider. */
int connectionTimeout; /**< Timeout for establishing the connection. */
void printLog(std::ostream &os, std::string message); /**< Method for printing log messages. */
void cleanup(); /**< Method for cleaning up resources. */
std::thread receiveAndProcessThread; /**< Thread for receiving and processing packets. */
bool receiveAndProcessFlag = false; /**< Flag for stopping the receiving ang processing */
public: public:
AstriMaProcessor(); AstriMaProcessor(); /**< Constructor for AstriMaProcessor. */
~AstriMaProcessor(); /**< Destructor for AstriMaProcessor. */
std::map<std::string, bool> packetsFlags; void start() override; /**< Method for starting the processor. */
void stop() override; /**< Method for stopping the processor. */
void stop(int seconds); /**< Method for stopping the processor after a specified number of seconds. */
~AstriMaProcessor(); void switchState(const Status) override; /**< Method for switching the state of the processor. */
void start() override; int deliverPacket() override; /**< Method for delivering a packet. */
void stop() override; std::string getStateStr(Status) override; /**< Method for getting the state as a string. */
void stop(int seconds); friend class AstriMaProcBuilder;
void switchState(const Status) override; /**< Friend class declaration for AstriMaProcBuilder. */
int deliverPacket() override;
friend class AstriMaProcBuilder;
}; };
class AstriMaProcBuilder { class AstriMaProcBuilder {
protected: protected:
AstriMaProcessor *proc; AstriMaProcessor *proc; /**< Pointer to the AstriMaProcessor being built. */
public: public:
AstriMaProcBuilder(); AstriMaProcBuilder(); /**< Constructor for AstriMaProcBuilder. */
~AstriMaProcBuilder(); ~AstriMaProcBuilder(); /**< Destructor for AstriMaProcBuilder. */
void reset(); void reset(); /**< Method for resetting the builder. */
AstriMaProcBuilder* configFrom(Configurators::BaseConfigurator &conf); AstriMaProcBuilder* configFrom(Configurators::BaseConfigurator &conf); /**< Method for configuring the builder from a BaseConfigurator. */
AstriMaProcBuilder* setReceiver(Receivers::BaseReceiver*); AstriMaProcBuilder* setReceiver(Receivers::BaseReceiver*); /**< Method for setting the receiver. */
AstriMaProcBuilder* setProvider(Providers::BaseProvider*); AstriMaProcBuilder* setProvider(Providers::BaseProvider*); /**< Method for setting the provider. */
AstriMaProcBuilder* setPacket(PacketLib::BasePacket*); AstriMaProcBuilder* setPacket(Packets::BasePacket*); /**< Method for setting the packet. */
AstriMaProcBuilder* addPacketToProcess(std::string ); AstriMaProcBuilder* addPacketToProcess(std::string); /**< Method for adding a packet to process. */
AstriMaProcBuilder* removePacketToProcess(std::string); AstriMaProcBuilder* removePacketToProcess(std::string); /**< Method for removing a packet to process. */
AstriMaProcessor* getProcessor(); AstriMaProcessor* getProcessor(); /**< Method for getting the built AstriMaProcessor. */
static const std::string config_target;
static const std::string connectionTimeout_key;
static const std::string closingTimeout_key;
static const std::string receiver_key;
static const std::string provider_key;
static const std::string packet_key;
static const std::string typesubtype_key;
static const std::string redis_rcv_key;
static const std::string file_rcv_key;
static const std::string kafkaavro_prov_key;
static const std::string astri_ma_packet_key;
static const std::string astri_horn_packet_key;
static const std::string config_target; /**< Configuration target constant. */
static const std::string connectionTimeout_key; /**< Connection timeout key constant. */
static const std::string closingTimeout_key; /**< stopping timeout key constant. */
static const std::string receiver_key; /**< Receiver key constant. */
static const std::string provider_key; /**< Provider key constant. */
static const std::string packet_key; /**< Packet key constant. */
static const std::string typesubtype_key; /**< Type subtype key constant. */
static const std::string redis_rcv_key; /**< Redis receiver key constant. */
static const std::string file_rcv_key; /**< File receiver key constant. */
static const std::string kafkaavro_prov_key; /**< Kafka Avro provider key constant. */
static const std::string astri_ma_packet_key; /**< ASTRI MA packet key constant. */
static const std::string astri_horn_packet_key; /**< ASTRI horn packet key constant. */
}; };
} }
...@@ -43,7 +43,7 @@ void AstriMaProcessor::switchState(Status newState) { ...@@ -43,7 +43,7 @@ void AstriMaProcessor::switchState(Status newState) {
switch (this->currentState) { switch (this->currentState) {
case Status::INIT: { case Status::INIT: {
switch (newState) { switch (newState) {
case Status::READY: case Status::READY: // from INIT to READY
if (connectReceiver() < 0 || connectProvider() < 0) { if (connectReceiver() < 0 || connectProvider() < 0) {
break; break;
} }
...@@ -60,9 +60,9 @@ void AstriMaProcessor::switchState(Status newState) { ...@@ -60,9 +60,9 @@ void AstriMaProcessor::switchState(Status newState) {
} }
case Status::READY: { case Status::READY: {
switch (newState) { switch (newState) {
case Status::RUN: case Status::RUN: // from READY to RUN
this->setCurrentState(newState); this->setCurrentState(newState);
if (!this->receiveAndProcessThread.joinable()) { // Start only the first time if (!this->receiveAndProcessThread.joinable()) { // Start the receiveAndProcessThread only the first time the state is set to RUN from READY
this->receiveAndProcessThread = std::thread([this]() { this->receiveAndProcessThread = std::thread([this]() {
this->receiveAndProcessFlag = true; this->receiveAndProcessFlag = true;
while (this->receiveAndProcessFlag) { while (this->receiveAndProcessFlag) {
...@@ -87,7 +87,7 @@ void AstriMaProcessor::switchState(Status newState) { ...@@ -87,7 +87,7 @@ void AstriMaProcessor::switchState(Status newState) {
switch (newState) { switch (newState) {
case Status::RUN: case Status::RUN:
break; break;
case Status::READY: case Status::READY: // from RUN to READY
if (connectReceiver() < 0 || connectProvider() < 0) { if (connectReceiver() < 0 || connectProvider() < 0) {
break; break;
} }
...@@ -107,15 +107,15 @@ void AstriMaProcessor::switchState(Status newState) { ...@@ -107,15 +107,15 @@ void AstriMaProcessor::switchState(Status newState) {
int AstriMaProcessor::connectReceiver() { int AstriMaProcessor::connectReceiver() {
if (receiver->connectToClient() > 0) if (receiver->connectToClient() > 0)
return 1; return 1;
auto startTime = std::chrono::steady_clock::now(); auto startTime = std::chrono::steady_clock::now();
auto currentTime = std::chrono::steady_clock::now(); auto currentTime = std::chrono::steady_clock::now();
auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>( auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(
currentTime - startTime).count(); currentTime - startTime).count();
while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout)
&& (this->currentState != Status::STOP)) {
if (receiver->connectToClient() > 0) if (receiver->connectToClient() > 0)
return 1; return 1;
else { else { // try every second
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
currentTime = std::chrono::steady_clock::now(); currentTime = std::chrono::steady_clock::now();
elapsedTime = std::chrono::duration_cast<std::chrono::seconds>( elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(
...@@ -129,15 +129,15 @@ int AstriMaProcessor::connectReceiver() { ...@@ -129,15 +129,15 @@ int AstriMaProcessor::connectReceiver() {
int AstriMaProcessor::connectProvider() { int AstriMaProcessor::connectProvider() {
if (provider->open() > 0) if (provider->open() > 0)
return 1; return 1;
auto startTime = std::chrono::steady_clock::now(); auto startTime = std::chrono::steady_clock::now();
auto currentTime = std::chrono::steady_clock::now(); auto currentTime = std::chrono::steady_clock::now();
auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>( auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(
currentTime - startTime).count(); currentTime - startTime).count();
while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout) && (this->currentState != Status::STOP)) { while ((this->connectionTimeout < 0 || elapsedTime < this->connectionTimeout)
&& (this->currentState != Status::STOP)) {
if (provider->open() > 0) if (provider->open() > 0)
return 1; return 1;
else { else { // try every second
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
currentTime = std::chrono::steady_clock::now(); currentTime = std::chrono::steady_clock::now();
elapsedTime = std::chrono::duration_cast<std::chrono::seconds>( elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(
...@@ -150,19 +150,19 @@ int AstriMaProcessor::connectProvider() { ...@@ -150,19 +150,19 @@ int AstriMaProcessor::connectProvider() {
int AstriMaProcessor::deliverPacket() { int AstriMaProcessor::deliverPacket() {
Packets::AstriMaGeneric *packet = Packets::AstriMaGeneric *packet =
static_cast<Packets::AstriMaGeneric*>(this->packet); // downcast static_cast<Packets::AstriMaGeneric*>(this->packet); // From basePacket to AstriPacket to use getType and getSubType
std::string flagKey = std::string(std::to_string(packet->getType())).append( std::string flagKey = std::string(std::to_string(packet->getType())).append(
std::to_string(packet->getSubType())); std::to_string(packet->getSubType()));
int res = 1; int res = 1;
try { try {
if (packetsFlags[flagKey]) { if (packetsFlags[flagKey]) {
PacketLib::BasePacketStructure *previous = Packets::BasePacketStructure *previous =
&packet->getPacketStructure(); &packet->getPacketStructure(); // Save the previous packet structure which was the generic one
packet->updatePacketStructure(*packetStructuresMap[flagKey]); packet->updatePacketStructure(*packetStructuresMap[flagKey]); // update tghe packet structure with the specific one (i.e. C11, C14, S22, etc)
//std::string key = addPrefixToKey(flagKey); // UNcomment if you want to dispatch packets based on type_subype //std::string key = addPrefixToKey(flagKey); // UNcomment if you want to dispatch packets based on type_subype
//provider->write(packet,key); // UNcomment if you want to dispatch packets based on type_subype //provider->write(packet,key); // UNcomment if you want to dispatch packets based on type_subype
res = provider->write(*packet); // comment if you want to dispatch packets based on type_subype res = provider->write(*packet); // comment if you want to dispatch packets based on type_subype
packet->updatePacketStructure(*previous); packet->updatePacketStructure(*previous); // restore the generic packet structure to be able to receive the next packet
} }
} catch (std::out_of_range &ex) { } catch (std::out_of_range &ex) {
printLog(std::cerr, "Packet type_subtype not recognized: " + flagKey); printLog(std::cerr, "Packet type_subtype not recognized: " + flagKey);
...@@ -189,10 +189,8 @@ void AstriMaProcessor::stop(int seconds) { ...@@ -189,10 +189,8 @@ void AstriMaProcessor::stop(int seconds) {
} }
void AstriMaProcessor::start() { void AstriMaProcessor::start() {
populatePacketStructuresMap();
generatePacketStructuresMap();
switchState(Status::READY); switchState(Status::READY);
if (this->currentState != Status::READY) if (this->currentState != Status::READY)
switchState(Status::STOP); switchState(Status::STOP);
else { else {
...@@ -206,15 +204,12 @@ void AstriMaProcessor::cleanup() { ...@@ -206,15 +204,12 @@ void AstriMaProcessor::cleanup() {
if (receiveAndProcessThread.joinable()) { if (receiveAndProcessThread.joinable()) {
receiveAndProcessThread.join(); receiveAndProcessThread.join();
} }
auto startTime = std::chrono::steady_clock::now(); auto startTime = std::chrono::steady_clock::now();
while (true) { while (true) { // break at the timeout or when there are no more packets to process
if (receiveAndProcessPacket() < 1) { if (receiveAndProcessPacket() < 1) {
printLog(std::cout, "Stop processing"); printLog(std::cout, "Stop processing");
break; break;
} }
auto currentTime = std::chrono::steady_clock::now(); auto currentTime = std::chrono::steady_clock::now();
auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>( auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(
currentTime - startTime).count(); currentTime - startTime).count();
...@@ -224,13 +219,13 @@ void AstriMaProcessor::cleanup() { ...@@ -224,13 +219,13 @@ void AstriMaProcessor::cleanup() {
break; break;
} }
} }
receiver->closeConnectionToClient(); receiver->closeConnectionToClient();
provider->close(); provider->close();
} }
void AstriMaProcessor::generatePacketStructuresMap() {
for (std::pair<std::string, bool> pair : packetsFlags) { void AstriMaProcessor::populatePacketStructuresMap() {
if (pair.second) { for (std::pair<std::string, bool> pair : packetsFlags) { // for each packet type to process
if (pair.second) { // if set to true, it searches for the corresponding .json file
std::string filePath = std::string(ASTRI_PACKETS_PATH).append( std::string filePath = std::string(ASTRI_PACKETS_PATH).append(
"/AstriMA/").append(pair.first).append(".json"); "/AstriMA/").append(pair.first).append(".json");
std::ifstream file(filePath); std::ifstream file(filePath);
......
//============================================================================ /**
// Name : Receiver.cpp * @file main.cpp
// Author : Valerio Pastore * @brief This file contains the main function for the Astri MA Processor.
*
//============================================================================ * The Astri MA Processor is responsible for processing data using the AstriMaProcBuilder.
* It reads configuration from YAML, MySQL, and command line arguments.
* The program starts and stops the processor.
*
* The main function sets up signal handlers for CTRL-C and CTRL-TERM.
* It also prints the program start time and configuration parameters.
* The program runs until the processor's current state is STOP.
* Finally, it cleans up resources and prints the termination message.
*/
#include <Astri_MA_Processor.h> #include <Astri_MA_Processor.h>
#include <Astri_MA_Generic.h> #include <Astri_MA_Generic.h>
...@@ -75,8 +82,7 @@ int main(int argc, char **argv) { ...@@ -75,8 +82,7 @@ int main(int argc, char **argv) {
// START // START
proc->start(); proc->start();
while (proc->getCurrentState() != AstriMaProcessor::Status::STOP) {
while(proc->getCurrentState() != AstriMaProcessor::Status::STOP){
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
} }
......