Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • bias/providers/kafka-avro-provider
1 result
Show changes
Commits on Source (3)
Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be
Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1
/*
*
* Created on: Mar 1, 2021
* Author: astrisw
*/
#pragma once
#include <Base_Provider.h>
......@@ -16,53 +10,74 @@
#include <mutex>
namespace inaf::oasbo::Providers {
/**
* @class KafkaAvroProvider
* @brief The KafkaAvroProvider class is a derived class of BaseProvider and provides functionality for sending
* BasePackets to a Kafka topic in Avro format.
* Check Base_Provider for other information.
*/
class KafkaAvroProvider: public BaseProvider {
protected:
//Kafka
RdKafka::Producer *producer = nullptr;
std::atomic<bool> stopFlag = false;
std::thread pollThread;
bool pollThreadFlag = false;
std::mutex closeMutex;
RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */
std::atomic<bool> stopFlag = false; /**< Atomic flag to indicate if the provider should stop. */
std::thread pollThread; /**< Thread for polling Kafka events. */
bool pollThreadFlag = false; /**< Flag to indicate if the poll thread is running. */
std::mutex closeMutex; /**< Mutex for thread-safe closing of the provider. */
class KafkaDeliveryReportCb;
KafkaDeliveryReportCb *dr_cb = nullptr;
class KafkaConnectionCallback;
KafkaConnectionCallback *connection_cb = nullptr;
class KafkaDeliveryReportCb; /**< Forward declaration of the KafkaDeliveryReportCb class. */
KafkaDeliveryReportCb *dr_cb = nullptr; /**< Pointer to the Kafka delivery report callback object. */
class KafkaConnectionCallback; /**< Forward declaration of the KafkaConnectionCallback class. */
KafkaConnectionCallback *connection_cb = nullptr; /**< Pointer to the Kafka connection callback object. */
// AVRO
avro::EncoderPtr avroencoder;
avro::EncoderPtr avroencoder; /**< Pointer to the AVRO encoder object. */
KafkaAvroProvider();
KafkaAvroProvider(std::string ip, int port, std::string topic);
KafkaAvroProvider(); /**< Default constructor. */
KafkaAvroProvider(std::string ip, int port, std::string topic); /**< Constructor with parameters. */
/**
* @brief Function for the polling thread.
*/
void pollingThreadFunction();
void encodeToAvro(PacketLib::BasePacket&);
/**
* @brief Function for encoding a BasePacket to AVRO format.
* @param packet The BasePacket to encode.
*/
void encodeToAvro(Packets::BasePacket&);
public:
std::string brokerIp;
int brokerPort;
std::string brokerIp; /**< The IP address of the Kafka broker. */
int brokerPort; /**< The port number of the Kafka broker. */
void setDest(std::string dest) override;
void setDest(std::string dest) override {
this->dest = dest;
}
std::string getDest() override {
return dest;
}
std::string getDest() override;
int write(PacketLib::BasePacket&) override;
int write(PacketLib::BasePacket&, std::string dest) override;
int write(Packets::BasePacket&) override;
int write(Packets::BasePacket&, std::string dest) override;
int close() override;
int open() override;
bool isOpen() override;
~KafkaAvroProvider();
~KafkaAvroProvider(); /**< Destructor. */
friend class KafkaAvroProviderBuilder;
};
/**
* @class KafkaAvroProviderBuilder
* @brief The KafkaAvroProviderBuilder class is used to build KafkaAvroProvider objects with configurable parameters.
*/
class KafkaAvroProviderBuilder {
protected:
KafkaAvroProvider *provider;
......@@ -77,16 +92,44 @@ public:
KafkaAvroProviderBuilder(std::string ip, int port, std::string topic);
~KafkaAvroProviderBuilder();
/**
* @brief Resets the builder to its initial state.
*/
void reset();
/**
* @brief Configures the builder using a BaseConfigurator object.
* @param conf The BaseConfigurator object to configure from.
* @return A pointer to the KafkaAvroProviderBuilder.
*/
KafkaAvroProviderBuilder* configFrom(Configurators::BaseConfigurator &conf);
/**
* @brief Sets the IP address for the KafkaAvroProvider.
* @param ip The IP address to set.
* @return A pointer to the KafkaAvroProviderBuilder.
*/
KafkaAvroProviderBuilder* setIp(std::string ip);
/**
* @brief Sets the port number for the KafkaAvroProvider.
* @param port The port number to set.
* @return A pointer to the KafkaAvroProviderBuilder.
*/
KafkaAvroProviderBuilder* setPort(int port);
/**
* @brief Sets the topic for the KafkaAvroProvider.
* @param topic The topic to set.
* @return A pointer to the KafkaAvroProviderBuilder.
*/
KafkaAvroProviderBuilder* setTopic(std::string topic);
/**
* @brief Gets the built KafkaAvroProvider object.
* @return A pointer to the built KafkaAvroProvider.
*/
KafkaAvroProvider* getProvider();
};
}
} // namespace inaf::oasbo::Providers
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
#pragma once
#include <sstream>
......@@ -33,309 +32,286 @@ static const int TRIGGER_COUNT_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
int32_t spare_1;
int32_t pdmID;
std::vector<int32_t > triggerCounts;
int32_t sipmTemp1;
int32_t sipmTemp2;
int32_t sipmTemp3;
int32_t sipmHighVoltage;
int32_t sipmCurrent;
PDMBlock() :
pdmVal(bool()),
spare_1(int32_t()),
pdmID(int32_t()),
triggerCounts(std::vector<int32_t >()),
sipmTemp1(int32_t()),
sipmTemp2(int32_t()),
sipmTemp3(int32_t()),
sipmHighVoltage(int32_t()),
sipmCurrent(int32_t())
{ }
bool pdmVal;
int32_t spare_1;
int32_t pdmID;
std::vector<int32_t> triggerCounts;
int32_t sipmTemp1;
int32_t sipmTemp2;
int32_t sipmTemp3;
int32_t sipmHighVoltage;
int32_t sipmCurrent;
PDMBlock() :
pdmVal(bool()), spare_1(int32_t()), pdmID(int32_t()), triggerCounts(
std::vector<int32_t>()), sipmTemp1(int32_t()), sipmTemp2(
int32_t()), sipmTemp3(int32_t()), sipmHighVoltage(
int32_t()), sipmCurrent(int32_t()) {
}
};
struct C11 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
int32_t pixelTriggerDiscriminatorThreshold;
bool ptm;
int32_t triggeredPixel;
int32_t timeWindow;
int32_t discriminatorChain;
std::vector<PDMBlock > PDMs;
C11() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
packetLength(int32_t()),
year(int32_t()),
month(int32_t()),
day(int32_t()),
hours(int32_t()),
minutes(int32_t()),
seconds(int32_t()),
validTime(bool()),
timeTagNanosec(int32_t()),
eventCounter(int32_t()),
lid(bool()),
fibSt(bool()),
fibCont(bool()),
fibPuls(bool()),
rgbCont(int32_t()),
rgbPuls(int32_t()),
spare_0(int32_t()),
pixelTriggerDiscriminatorThreshold(int32_t()),
ptm(bool()),
triggeredPixel(int32_t()),
timeWindow(int32_t()),
discriminatorChain(int32_t()),
PDMs(std::vector<PDMBlock >())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
int32_t pixelTriggerDiscriminatorThreshold;
bool ptm;
int32_t triggeredPixel;
int32_t timeWindow;
int32_t discriminatorChain;
std::vector<PDMBlock> PDMs;
C11() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), year(int32_t()), month(
int32_t()), day(int32_t()), hours(int32_t()), minutes(
int32_t()), seconds(int32_t()), validTime(bool()), timeTagNanosec(
int32_t()), eventCounter(int32_t()), lid(bool()), fibSt(
bool()), fibCont(bool()), fibPuls(bool()), rgbCont(
int32_t()), rgbPuls(int32_t()), spare_0(int32_t()), pixelTriggerDiscriminatorThreshold(
int32_t()), ptm(bool()), triggeredPixel(int32_t()), timeWindow(
int32_t()), discriminatorChain(int32_t()), PDMs(
std::vector<PDMBlock>()) {
}
};
}
namespace avro {
template<> struct codec_traits<C11::PDMBlock> {
static void encode(Encoder& e, const C11::PDMBlock& v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.spare_1);
avro::encode(e, v.pdmID);
avro::encode(e, v.triggerCounts);
avro::encode(e, v.sipmTemp1);
avro::encode(e, v.sipmTemp2);
avro::encode(e, v.sipmTemp3);
avro::encode(e, v.sipmHighVoltage);
avro::encode(e, v.sipmCurrent);
}
static void decode(Decoder& d, C11::PDMBlock& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.spare_1);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.triggerCounts);
break;
case 4:
avro::decode(d, v.sipmTemp1);
break;
case 5:
avro::decode(d, v.sipmTemp2);
break;
case 6:
avro::decode(d, v.sipmTemp3);
break;
case 7:
avro::decode(d, v.sipmHighVoltage);
break;
case 8:
avro::decode(d, v.sipmCurrent);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.spare_1);
avro::decode(d, v.pdmID);
avro::decode(d, v.triggerCounts);
avro::decode(d, v.sipmTemp1);
avro::decode(d, v.sipmTemp2);
avro::decode(d, v.sipmTemp3);
avro::decode(d, v.sipmHighVoltage);
avro::decode(d, v.sipmCurrent);
}
}
static void encode(Encoder &e, const C11::PDMBlock &v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.spare_1);
avro::encode(e, v.pdmID);
avro::encode(e, v.triggerCounts);
avro::encode(e, v.sipmTemp1);
avro::encode(e, v.sipmTemp2);
avro::encode(e, v.sipmTemp3);
avro::encode(e, v.sipmHighVoltage);
avro::encode(e, v.sipmCurrent);
}
static void decode(Decoder &d, C11::PDMBlock &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.spare_1);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.triggerCounts);
break;
case 4:
avro::decode(d, v.sipmTemp1);
break;
case 5:
avro::decode(d, v.sipmTemp2);
break;
case 6:
avro::decode(d, v.sipmTemp3);
break;
case 7:
avro::decode(d, v.sipmHighVoltage);
break;
case 8:
avro::decode(d, v.sipmCurrent);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.spare_1);
avro::decode(d, v.pdmID);
avro::decode(d, v.triggerCounts);
avro::decode(d, v.sipmTemp1);
avro::decode(d, v.sipmTemp2);
avro::decode(d, v.sipmTemp3);
avro::decode(d, v.sipmHighVoltage);
avro::decode(d, v.sipmCurrent);
}
}
};
template<> struct codec_traits<C11::C11> {
static void encode(Encoder& e, const C11::C11& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.pixelTriggerDiscriminatorThreshold);
avro::encode(e, v.ptm);
avro::encode(e, v.triggeredPixel);
avro::encode(e, v.timeWindow);
avro::encode(e, v.discriminatorChain);
avro::encode(e, v.PDMs);
}
static void decode(Decoder& d, C11::C11& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
break;
case 22:
avro::decode(d, v.ptm);
break;
case 23:
avro::decode(d, v.triggeredPixel);
break;
case 24:
avro::decode(d, v.timeWindow);
break;
case 25:
avro::decode(d, v.discriminatorChain);
break;
case 26:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
avro::decode(d, v.ptm);
avro::decode(d, v.triggeredPixel);
avro::decode(d, v.timeWindow);
avro::decode(d, v.discriminatorChain);
avro::decode(d, v.PDMs);
}
}
static void encode(Encoder &e, const C11::C11 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.pixelTriggerDiscriminatorThreshold);
avro::encode(e, v.ptm);
avro::encode(e, v.triggeredPixel);
avro::encode(e, v.timeWindow);
avro::encode(e, v.discriminatorChain);
avro::encode(e, v.PDMs);
}
static void decode(Decoder &d, C11::C11 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
break;
case 22:
avro::decode(d, v.ptm);
break;
case 23:
avro::decode(d, v.triggeredPixel);
break;
case 24:
avro::decode(d, v.timeWindow);
break;
case 25:
avro::decode(d, v.discriminatorChain);
break;
case 26:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
avro::decode(d, v.ptm);
avro::decode(d, v.triggeredPixel);
avro::decode(d, v.timeWindow);
avro::decode(d, v.discriminatorChain);
avro::decode(d, v.PDMs);
}
}
};
void encodeC11(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
void encodeC11(Encoder &avroencoder, inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
......@@ -371,9 +347,9 @@ void encodeC11(Encoder &avroencoder,
avroencoder.arrayStart();
avroencoder.setItemCount(C11::TRIGGER_COUNT_SIZE);
for(int i = 0; i < C11::TRIGGER_COUNT_SIZE; i++){
for (int i = 0; i < C11::TRIGGER_COUNT_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += C11::TRIGGER_COUNT_SIZE;
avroencoder.arrayEnd();
......
This diff is collapsed.
......@@ -27,81 +27,77 @@
#include <Base_Packet.h>
namespace CMD151 {
struct CMD151 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t spare;
int32_t sync;
CMD151() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
packetLength(int32_t()),
spare(int32_t()),
sync(int32_t())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t spare;
int32_t sync;
CMD151() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), spare(int32_t()), sync(
int32_t()) {
}
};
}
namespace avro {
template<> struct codec_traits<CMD151::CMD151> {
static void encode(Encoder& e, const CMD151::CMD151& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.spare);
avro::encode(e, v.sync);
}
static void decode(Decoder& d, CMD151::CMD151& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.spare);
break;
case 6:
avro::decode(d, v.sync);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.spare);
avro::decode(d, v.sync);
}
}
static void encode(Encoder &e, const CMD151::CMD151 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.spare);
avro::encode(e, v.sync);
}
static void decode(Decoder &d, CMD151::CMD151 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.spare);
break;
case 6:
avro::decode(d, v.sync);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.spare);
avro::decode(d, v.sync);
}
}
};
void encodeCMD151(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 7; offset++)
......
This diff is collapsed.
......@@ -34,7 +34,6 @@ static const int LOW_GAINS_SIZE = 64;
static const int TIME_TRIGGERS_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
bool trgEnb;
......@@ -360,8 +359,7 @@ template<> struct codec_traits<S22::S22> {
}
};
void encodeS22(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
void encodeS22(Encoder &avroencoder, inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
......@@ -395,25 +393,25 @@ void encodeS22(Encoder &avroencoder,
avroencoder.arrayStart();
avroencoder.setItemCount(S22::HIGH_GAINS_SIZE);
for(int i = 0; i < S22::HIGH_GAINS_SIZE; i++){
for (int i = 0; i < S22::HIGH_GAINS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += S22::HIGH_GAINS_SIZE;
avroencoder.arrayEnd();
avroencoder.arrayStart();
avroencoder.setItemCount(S22::LOW_GAINS_SIZE);
for(int i = 0; i < S22::LOW_GAINS_SIZE; i++){
for (int i = 0; i < S22::LOW_GAINS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += S22::LOW_GAINS_SIZE;
avroencoder.arrayEnd();
avroencoder.arrayStart();
avroencoder.setItemCount(S22::TIME_TRIGGERS_SIZE);
for(int i = 0; i < S22::TIME_TRIGGERS_SIZE; i++){
for (int i = 0; i < S22::TIME_TRIGGERS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += S22::TIME_TRIGGERS_SIZE;
avroencoder.arrayEnd();
......
......@@ -31,238 +31,222 @@ static const int VAR_PIXELS_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
int32_t samplingPar;
int32_t pdmID;
std::vector<int32_t > var_pixels;
PDMBlock() :
pdmVal(bool()),
samplingPar(int32_t()),
pdmID(int32_t()),
var_pixels(std::vector<int32_t >())
{ }
bool pdmVal;
int32_t samplingPar;
int32_t pdmID;
std::vector<int32_t> var_pixels;
PDMBlock() :
pdmVal(bool()), samplingPar(int32_t()), pdmID(int32_t()), var_pixels(
std::vector<int32_t>()) {
}
};
struct VAR102 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
std::vector<PDMBlock > PDMs;
VAR102() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
packetLength(int32_t()),
year(int32_t()),
month(int32_t()),
day(int32_t()),
hours(int32_t()),
minutes(int32_t()),
seconds(int32_t()),
validTime(bool()),
timeTagNanosec(int32_t()),
eventCounter(int32_t()),
lid(bool()),
fibSt(bool()),
fibCont(bool()),
fibPuls(bool()),
rgbCont(int32_t()),
rgbPuls(int32_t()),
spare_0(int32_t()),
PDMs(std::vector<PDMBlock >())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
std::vector<PDMBlock> PDMs;
VAR102() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), year(int32_t()), month(
int32_t()), day(int32_t()), hours(int32_t()), minutes(
int32_t()), seconds(int32_t()), validTime(bool()), timeTagNanosec(
int32_t()), eventCounter(int32_t()), lid(bool()), fibSt(
bool()), fibCont(bool()), fibPuls(bool()), rgbCont(
int32_t()), rgbPuls(int32_t()), spare_0(int32_t()), PDMs(
std::vector<PDMBlock>()) {
}
};
}
namespace avro {
template<> struct codec_traits<VAR102::PDMBlock> {
static void encode(Encoder& e, const VAR102::PDMBlock& v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.samplingPar);
avro::encode(e, v.pdmID);
avro::encode(e, v.var_pixels);
}
static void decode(Decoder& d, VAR102::PDMBlock& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.samplingPar);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.var_pixels);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.samplingPar);
avro::decode(d, v.pdmID);
avro::decode(d, v.var_pixels);
}
}
static void encode(Encoder &e, const VAR102::PDMBlock &v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.samplingPar);
avro::encode(e, v.pdmID);
avro::encode(e, v.var_pixels);
}
static void decode(Decoder &d, VAR102::PDMBlock &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.samplingPar);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.var_pixels);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.samplingPar);
avro::decode(d, v.pdmID);
avro::decode(d, v.var_pixels);
}
}
};
template<> struct codec_traits<VAR102::VAR102> {
static void encode(Encoder& e, const VAR102::VAR102& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.PDMs);
}
static void decode(Decoder& d, VAR102::VAR102& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.PDMs);
}
}
static void encode(Encoder &e, const VAR102::VAR102 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.PDMs);
}
static void decode(Decoder &d, VAR102::VAR102 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.PDMs);
}
}
};
void encodeVAR102(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
......@@ -293,9 +277,9 @@ void encodeVAR102(Encoder &avroencoder,
avroencoder.arrayStart();
avroencoder.setItemCount(VAR102::VAR_PIXELS_SIZE);
for(int i = 0; i < VAR102::VAR_PIXELS_SIZE; i++){
for (int i = 0; i < VAR102::VAR_PIXELS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += VAR102::VAR_PIXELS_SIZE;
avroencoder.arrayEnd();
......
......@@ -32,238 +32,222 @@ static const int VAR_PIXELS_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
int32_t samplingPar;
int32_t pdmID;
std::vector<int32_t > var_pixels;
PDMBlock() :
pdmVal(bool()),
samplingPar(int32_t()),
pdmID(int32_t()),
var_pixels(std::vector<int32_t >())
{ }
bool pdmVal;
int32_t samplingPar;
int32_t pdmID;
std::vector<int32_t> var_pixels;
PDMBlock() :
pdmVal(bool()), samplingPar(int32_t()), pdmID(int32_t()), var_pixels(
std::vector<int32_t>()) {
}
};
struct VAR103 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
std::vector<PDMBlock > PDMs;
VAR103() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
packetLength(int32_t()),
year(int32_t()),
month(int32_t()),
day(int32_t()),
hours(int32_t()),
minutes(int32_t()),
seconds(int32_t()),
validTime(bool()),
timeTagNanosec(int32_t()),
eventCounter(int32_t()),
lid(bool()),
fibSt(bool()),
fibCont(bool()),
fibPuls(bool()),
rgbCont(int32_t()),
rgbPuls(int32_t()),
spare_0(int32_t()),
PDMs(std::vector<PDMBlock >())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
std::vector<PDMBlock> PDMs;
VAR103() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), year(int32_t()), month(
int32_t()), day(int32_t()), hours(int32_t()), minutes(
int32_t()), seconds(int32_t()), validTime(bool()), timeTagNanosec(
int32_t()), eventCounter(int32_t()), lid(bool()), fibSt(
bool()), fibCont(bool()), fibPuls(bool()), rgbCont(
int32_t()), rgbPuls(int32_t()), spare_0(int32_t()), PDMs(
std::vector<PDMBlock>()) {
}
};
}
namespace avro {
template<> struct codec_traits<VAR103::PDMBlock> {
static void encode(Encoder& e, const VAR103::PDMBlock& v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.samplingPar);
avro::encode(e, v.pdmID);
avro::encode(e, v.var_pixels);
}
static void decode(Decoder& d, VAR103::PDMBlock& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.samplingPar);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.var_pixels);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.samplingPar);
avro::decode(d, v.pdmID);
avro::decode(d, v.var_pixels);
}
}
static void encode(Encoder &e, const VAR103::PDMBlock &v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.samplingPar);
avro::encode(e, v.pdmID);
avro::encode(e, v.var_pixels);
}
static void decode(Decoder &d, VAR103::PDMBlock &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.samplingPar);
break;
case 2:
avro::decode(d, v.pdmID);
break;
case 3:
avro::decode(d, v.var_pixels);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.samplingPar);
avro::decode(d, v.pdmID);
avro::decode(d, v.var_pixels);
}
}
};
template<> struct codec_traits<VAR103::VAR103> {
static void encode(Encoder& e, const VAR103::VAR103& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.PDMs);
}
static void decode(Decoder& d, VAR103::VAR103& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.PDMs);
}
}
static void encode(Encoder &e, const VAR103::VAR103 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.PDMs);
}
static void decode(Decoder &d, VAR103::VAR103 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.PDMs);
}
}
};
void encodeVAR103(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
inaf::oasbo::Packets::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
......@@ -294,9 +278,9 @@ void encodeVAR103(Encoder &avroencoder,
avroencoder.arrayStart();
avroencoder.setItemCount(VAR103::VAR_PIXELS_SIZE);
for(int i = 0; i < VAR103::VAR_PIXELS_SIZE; i++){
for (int i = 0; i < VAR103::VAR_PIXELS_SIZE; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
avro::encode(avroencoder, (int32_t) packet[offset + i].value());
}
offset += VAR103::VAR_PIXELS_SIZE;
avroencoder.arrayEnd();
......
......@@ -63,8 +63,8 @@ KafkaAvroProvider::KafkaAvroProvider(std::string ip, int port,
KafkaAvroProvider::~KafkaAvroProvider() {
this->stopFlag = true; // Stopping the polling thread
if (pollThreadFlag){
if(this->pollThread.joinable()){
if (pollThreadFlag) {
if (this->pollThread.joinable()) {
pollThread.join();
}
}
......@@ -187,15 +187,15 @@ void KafkaAvroProvider::pollingThreadFunction() {
}
now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "polling thread exit..."
<< "]\t[KafkaAvro Provider]\t" << "polling thread stop..."
<< std::endl;
}
int KafkaAvroProvider::write(PacketLib::BasePacket &packet) {
int KafkaAvroProvider::write(Packets::BasePacket &packet) {
return KafkaAvroProvider::write(packet, this->dest);
}
int KafkaAvroProvider::write(PacketLib::BasePacket &packet, std::string dest) {
int KafkaAvroProvider::write(Packets::BasePacket &packet, std::string dest) {
if (!isOpen()) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
......@@ -247,7 +247,7 @@ int KafkaAvroProvider::write(PacketLib::BasePacket &packet, std::string dest) {
return -1;
}
void KafkaAvroProvider::encodeToAvro(PacketLib::BasePacket &packet) {
void KafkaAvroProvider::encodeToAvro(Packets::BasePacket &packet) {
auto type = packet[1].value();
auto subtype = packet[2].value();
if (type == 2 && subtype == 2) {
......@@ -287,3 +287,12 @@ void KafkaAvroProvider::encodeToAvro(PacketLib::BasePacket &packet) {
}
}
void KafkaAvroProvider::setDest(std::string dest){
this->dest = dest;
}
std::string KafkaAvroProvider::getDest(){
return this->dest;
}