Skip to content
Snippets Groups Projects
Commit 70c1da20 authored by Valerio Pastore's avatar Valerio Pastore
Browse files

avro encoding rewrite for S22

parent bda254c1
No related branches found
No related tags found
No related merge requests found
......@@ -33,9 +33,6 @@ protected:
void pollingThreadFunction();
void encodeToAvro(PacketLib::BasePacket&);
void encodeS22(PacketLib::BasePacket&);
std::vector<int> pdms;
public:
std::string brokerIp;
......
{
"name": "S22",
"type": "record",
"fields": [
{
"name": "S22",
"type": "record",
"fields": [
{"name" : "telescopeID", "type" : "int"},
{"name" : "type", "type" : "int"},
{"name" : "subType", "type" : "int"},
{"name" : "ssc", "type" : "int"},
{"name" : "year", "type" : "int"},
{"name" : "month", "type" : "int"},
{"name" : "day", "type" : "int"},
{"name" : "hours", "type" : "int"},
{"name" : "minutes", "type" : "int"},
{"name" : "seconds", "type" : "int"},
{"name" : "validTime", "type" : "boolean"},
{"name" : "timeTagNanosec", "type" : "int"},
{"name" : "pixelTriggerDiscriminatorThreshold_pe", "type" : "int"},
{"name" : "pixelTriggerLowelDiscriminatorThreshold_pe", "type" : "int"},
{"name" : "triggerType", "type" : {"type": "enum","name": "triggerTypeEnum","symbols" : ["TOPOLOGICO", "MAJORITY"]}},
{"name" : "triggerConfig", "type" : "int"},
{"name" : "PDMs", "type" : {"type" : "array","items" : {
"name" : "PDMBlock",
"type" : "record",
"fields" : [
{"name" : "pdmVal", "type" : "boolean"},
{"name" : "triggerEnabled", "type" : "boolean"},
{"name" : "triggered", "type" : "boolean"},
{"name" : "pdmID", "type" : "int"},
{"name" : "highgains", "type": {"type" : "array","items" : "int"}},
{"name" : "lowgains", "type": {"type" : "array","items" : "int"}}
]
}}}
]
"name": "telescopeID",
"type": "int"
},
{
"name": "type",
"type": "int"
},
{
"name": "subType",
"type": "int"
},
{
"name": "ssc",
"type": "int"
},
{
"name": "packetLength",
"type": "int"
},
{
"name": "year",
"type": "int"
},
{
"name": "month",
"type": "int"
},
{
"name": "day",
"type": "int"
},
{
"name": "hours",
"type": "int"
},
{
"name": "minutes",
"type": "int"
},
{
"name": "seconds",
"type": "int"
},
{
"name": "validTime",
"type": "boolean"
},
{
"name": "timeTagNanosec",
"type": "int"
},
{
"name": "eventCounter",
"type": "int"
},
{
"name": "lid",
"type": "boolean"
},
{
"name": "fibSt",
"type": "boolean"
},
{
"name": "fibCont",
"type": "boolean"
},
{
"name": "fibPuls",
"type": "boolean"
},
{
"name": "rgbCont",
"type": "int"
},
{
"name": "rgbPuls",
"type": "int"
},
{
"name": "spare_0",
"type": "int"
},
{
"name": "pixelTriggerDiscriminatorThreshold",
"type": "int"
},
{
"name": "pixelTriggerChargeDiscriminatorThreshold",
"type": "int"
},
{
"name": "spare_1",
"type": "int"
},
{
"name": "triggerType",
"type": "int"
},
{
"name": "triggerConfig",
"type": "int"
},
{
"name": "PDMs",
"type": {
"type": "array",
"items": {
"name": "PDMBlock",
"type": "record",
"fields": [
{
"name": "pdmVal",
"type": "boolean"
},
{
"name": "trgEnb",
"type": "boolean"
},
{
"name": "trgPdm",
"type": "boolean"
},
{
"name": "evtVal",
"type": "boolean"
},
{
"name": "spare_2",
"type": "int"
},
{
"name": "pdmID",
"type": "int"
},
{
"name": "highgains",
"type": {
"type": "array",
"items": "int"
}
},
{
"name": "lowgains",
"type": {
"type": "array",
"items": "int"
}
},
{
"name": "timeTriggers",
"type": {
"type": "array",
"items": "int"
}
},
{
"name": "hitRegister",
"type": "long"
},
{
"name": "triggerMask",
"type": "long"
},
{
"name": "sipmTemp1",
"type": "int"
},
{
"name": "sipmTemp2",
"type": "int"
},
{
"name": "sipmTemp3",
"type": "int"
},
{
"name": "sipmHighVoltage",
"type": "int"
},
{
"name": "sipmCurrent",
"type": "int"
}
]
}
}
}
]
}
......@@ -16,13 +16,7 @@
* limitations under the License.
*/
/* Auto generated file of the S22 json packet (avrogencpp)*/
#ifndef S22PACKET_H_3377364056__H_
#define S22PACKET_H_3377364056__H_
#pragma once
#include <sstream>
#include "boost/any.hpp"
......@@ -30,246 +24,405 @@
#include <Encoder.hh>
#include <Decoder.hh>
#include <Base_Packet.h>
namespace S22 {
enum triggerTypeEnum {
DISABLE,
INTERNAL,
TOPOLOGICO,
MAJORITY,
};
static const int HIGH_GAINS_SIZE = 64;
static const int LOW_GAINS_SIZE = 64;
static const int TIME_TRIGGERS_SIZE = 64;
static const int NUM_PDM = 37;
struct PDMBlock {
bool pdmVal;
bool triggerEnabled;
bool triggered;
int32_t pdmID;
std::vector<int32_t > highgains;
std::vector<int32_t > lowgains;
PDMBlock() :
pdmVal(bool()),
triggerEnabled(bool()),
triggered(bool()),
pdmID(int32_t()),
highgains(std::vector<int32_t >()),
lowgains(std::vector<int32_t >())
{ }
bool pdmVal;
bool trgEnb;
bool trgPdm;
bool evtVal;
int32_t spare_2;
int32_t pdmID;
std::vector<int32_t> highgains;
std::vector<int32_t> lowgains;
std::vector<int32_t> timeTriggers;
int64_t hitRegister;
int64_t triggerMask;
int32_t sipmTemp1;
int32_t sipmTemp2;
int32_t sipmTemp3;
int32_t sipmHighVoltage;
int32_t sipmCurrent;
PDMBlock() :
pdmVal(bool()), trgEnb(bool()), trgPdm(bool()), evtVal(bool()), spare_2(
int32_t()), pdmID(int32_t()), highgains(
std::vector<int32_t>()), lowgains(std::vector<int32_t>()), timeTriggers(
std::vector<int32_t>()), hitRegister(int64_t()), triggerMask(
int64_t()), sipmTemp1(int32_t()), sipmTemp2(int32_t()), sipmTemp3(
int32_t()), sipmHighVoltage(int32_t()), sipmCurrent(
int32_t()) {
}
};
struct S22 {
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t pixelTriggerDiscriminatorThreshold_pe;
int32_t pixelTriggerChargeDiscriminatorThreshold_pe;
triggerTypeEnum triggerType;
int32_t triggerConfig;
std::vector<PDMBlock > PDMs;
S22() :
telescopeID(int32_t()),
type(int32_t()),
subType(int32_t()),
ssc(int32_t()),
year(int32_t()),
month(int32_t()),
day(int32_t()),
hours(int32_t()),
minutes(int32_t()),
seconds(int32_t()),
validTime(bool()),
timeTagNanosec(int32_t()),
pixelTriggerDiscriminatorThreshold_pe(int32_t()),
pixelTriggerChargeDiscriminatorThreshold_pe(int32_t()),
triggerType(triggerTypeEnum()),
triggerConfig(int32_t()),
PDMs(std::vector<PDMBlock >())
{ }
int32_t telescopeID;
int32_t type;
int32_t subType;
int32_t ssc;
int32_t packetLength;
int32_t year;
int32_t month;
int32_t day;
int32_t hours;
int32_t minutes;
int32_t seconds;
bool validTime;
int32_t timeTagNanosec;
int32_t eventCounter;
bool lid;
bool fibSt;
bool fibCont;
bool fibPuls;
int32_t rgbCont;
int32_t rgbPuls;
int32_t spare_0;
int32_t pixelTriggerDiscriminatorThreshold;
int32_t pixelTriggerChargeDiscriminatorThreshold;
int32_t spare_1;
int32_t triggerType;
int32_t triggerConfig;
std::vector<PDMBlock> PDMs;
S22() :
telescopeID(int32_t()), type(int32_t()), subType(int32_t()), ssc(
int32_t()), packetLength(int32_t()), year(int32_t()), month(
int32_t()), day(int32_t()), hours(int32_t()), minutes(
int32_t()), seconds(int32_t()), validTime(bool()), timeTagNanosec(
int32_t()), eventCounter(int32_t()), lid(bool()), fibSt(
bool()), fibCont(bool()), fibPuls(bool()), rgbCont(
int32_t()), rgbPuls(int32_t()), spare_0(int32_t()), pixelTriggerDiscriminatorThreshold(
int32_t()), pixelTriggerChargeDiscriminatorThreshold(
int32_t()), spare_1(int32_t()), triggerType(int32_t()), triggerConfig(
int32_t()), PDMs(std::vector<PDMBlock>()) {
}
};
}
namespace avro {
template<> struct codec_traits<S22::triggerTypeEnum> {
static void encode(Encoder& e, S22::triggerTypeEnum v) {
if (v < S22::DISABLE || v > S22::MAJORITY)
{
std::ostringstream error;
error << "enum value " << v << " is out of bound for S22::triggerTypeEnum and cannot be encoded";
throw avro::Exception(error.str());
}
e.encodeEnum(v);
}
static void decode(Decoder& d, S22::triggerTypeEnum& v) {
size_t index = d.decodeEnum();
if (index < S22::DISABLE || index > S22::MAJORITY)
{
std::ostringstream error;
error << "enum value " << index << " is out of bound for S22::triggerTypeEnum and cannot be decoded";
throw avro::Exception(error.str());
}
v = static_cast<S22::triggerTypeEnum>(index);
}
};
template<> struct codec_traits<S22::PDMBlock> {
static void encode(Encoder& e, const S22::PDMBlock& v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.triggerEnabled);
avro::encode(e, v.triggered);
avro::encode(e, v.pdmID);
avro::encode(e, v.highgains);
avro::encode(e, v.lowgains);
}
static void decode(Decoder& d, S22::PDMBlock& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.triggerEnabled);
break;
case 2:
avro::decode(d, v.triggered);
break;
case 3:
avro::decode(d, v.pdmID);
break;
case 4:
avro::decode(d, v.highgains);
break;
case 5:
avro::decode(d, v.lowgains);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.triggerEnabled);
avro::decode(d, v.triggered);
avro::decode(d, v.pdmID);
avro::decode(d, v.highgains);
avro::decode(d, v.lowgains);
}
}
static void encode(Encoder &e, const S22::PDMBlock &v) {
avro::encode(e, v.pdmVal);
avro::encode(e, v.trgEnb);
avro::encode(e, v.trgPdm);
avro::encode(e, v.evtVal);
avro::encode(e, v.spare_2);
avro::encode(e, v.pdmID);
avro::encode(e, v.highgains);
avro::encode(e, v.lowgains);
avro::encode(e, v.timeTriggers);
avro::encode(e, v.hitRegister);
avro::encode(e, v.triggerMask);
avro::encode(e, v.sipmTemp1);
avro::encode(e, v.sipmTemp2);
avro::encode(e, v.sipmTemp3);
avro::encode(e, v.sipmHighVoltage);
avro::encode(e, v.sipmCurrent);
}
static void decode(Decoder &d, S22::PDMBlock &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.pdmVal);
break;
case 1:
avro::decode(d, v.trgEnb);
break;
case 2:
avro::decode(d, v.trgPdm);
break;
case 3:
avro::decode(d, v.evtVal);
break;
case 4:
avro::decode(d, v.spare_2);
break;
case 5:
avro::decode(d, v.pdmID);
break;
case 6:
avro::decode(d, v.highgains);
break;
case 7:
avro::decode(d, v.lowgains);
break;
case 8:
avro::decode(d, v.timeTriggers);
break;
case 9:
avro::decode(d, v.hitRegister);
break;
case 10:
avro::decode(d, v.triggerMask);
break;
case 11:
avro::decode(d, v.sipmTemp1);
break;
case 12:
avro::decode(d, v.sipmTemp2);
break;
case 13:
avro::decode(d, v.sipmTemp3);
break;
case 14:
avro::decode(d, v.sipmHighVoltage);
break;
case 15:
avro::decode(d, v.sipmCurrent);
break;
default:
break;
}
}
} else {
avro::decode(d, v.pdmVal);
avro::decode(d, v.trgEnb);
avro::decode(d, v.trgPdm);
avro::decode(d, v.evtVal);
avro::decode(d, v.spare_2);
avro::decode(d, v.pdmID);
avro::decode(d, v.highgains);
avro::decode(d, v.lowgains);
avro::decode(d, v.timeTriggers);
avro::decode(d, v.hitRegister);
avro::decode(d, v.triggerMask);
avro::decode(d, v.sipmTemp1);
avro::decode(d, v.sipmTemp2);
avro::decode(d, v.sipmTemp3);
avro::decode(d, v.sipmHighVoltage);
avro::decode(d, v.sipmCurrent);
}
}
};
template<> struct codec_traits<S22::S22> {
static void encode(Encoder& e, const S22::S22& v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.pixelTriggerDiscriminatorThreshold_pe);
avro::encode(e, v.pixelTriggerChargeDiscriminatorThreshold_pe);
avro::encode(e, v.triggerType);
avro::encode(e, v.triggerConfig);
avro::encode(e, v.PDMs);
}
static void decode(Decoder& d, S22::S22& v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder *>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.year);
break;
case 5:
avro::decode(d, v.month);
break;
case 6:
avro::decode(d, v.day);
break;
case 7:
avro::decode(d, v.hours);
break;
case 8:
avro::decode(d, v.minutes);
break;
case 9:
avro::decode(d, v.seconds);
break;
case 10:
avro::decode(d, v.validTime);
break;
case 11:
avro::decode(d, v.timeTagNanosec);
break;
case 12:
avro::decode(d, v.pixelTriggerDiscriminatorThreshold_pe);
break;
case 13:
avro::decode(d, v.pixelTriggerChargeDiscriminatorThreshold_pe);
break;
case 14:
avro::decode(d, v.triggerType);
break;
case 15:
avro::decode(d, v.triggerConfig);
break;
case 16:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.pixelTriggerDiscriminatorThreshold_pe);
avro::decode(d, v.pixelTriggerChargeDiscriminatorThreshold_pe);
avro::decode(d, v.triggerType);
avro::decode(d, v.triggerConfig);
avro::decode(d, v.PDMs);
}
}
static void encode(Encoder &e, const S22::S22 &v) {
avro::encode(e, v.telescopeID);
avro::encode(e, v.type);
avro::encode(e, v.subType);
avro::encode(e, v.ssc);
avro::encode(e, v.packetLength);
avro::encode(e, v.year);
avro::encode(e, v.month);
avro::encode(e, v.day);
avro::encode(e, v.hours);
avro::encode(e, v.minutes);
avro::encode(e, v.seconds);
avro::encode(e, v.validTime);
avro::encode(e, v.timeTagNanosec);
avro::encode(e, v.eventCounter);
avro::encode(e, v.lid);
avro::encode(e, v.fibSt);
avro::encode(e, v.fibCont);
avro::encode(e, v.fibPuls);
avro::encode(e, v.rgbCont);
avro::encode(e, v.rgbPuls);
avro::encode(e, v.spare_0);
avro::encode(e, v.pixelTriggerDiscriminatorThreshold);
avro::encode(e, v.pixelTriggerChargeDiscriminatorThreshold);
avro::encode(e, v.spare_1);
avro::encode(e, v.triggerType);
avro::encode(e, v.triggerConfig);
avro::encode(e, v.PDMs);
}
static void decode(Decoder &d, S22::S22 &v) {
if (avro::ResolvingDecoder *rd =
dynamic_cast<avro::ResolvingDecoder*>(&d)) {
const std::vector<size_t> fo = rd->fieldOrder();
for (std::vector<size_t>::const_iterator it = fo.begin();
it != fo.end(); ++it) {
switch (*it) {
case 0:
avro::decode(d, v.telescopeID);
break;
case 1:
avro::decode(d, v.type);
break;
case 2:
avro::decode(d, v.subType);
break;
case 3:
avro::decode(d, v.ssc);
break;
case 4:
avro::decode(d, v.packetLength);
break;
case 5:
avro::decode(d, v.year);
break;
case 6:
avro::decode(d, v.month);
break;
case 7:
avro::decode(d, v.day);
break;
case 8:
avro::decode(d, v.hours);
break;
case 9:
avro::decode(d, v.minutes);
break;
case 10:
avro::decode(d, v.seconds);
break;
case 11:
avro::decode(d, v.validTime);
break;
case 12:
avro::decode(d, v.timeTagNanosec);
break;
case 13:
avro::decode(d, v.eventCounter);
break;
case 14:
avro::decode(d, v.lid);
break;
case 15:
avro::decode(d, v.fibSt);
break;
case 16:
avro::decode(d, v.fibCont);
break;
case 17:
avro::decode(d, v.fibPuls);
break;
case 18:
avro::decode(d, v.rgbCont);
break;
case 19:
avro::decode(d, v.rgbPuls);
break;
case 20:
avro::decode(d, v.spare_0);
break;
case 21:
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
break;
case 22:
avro::decode(d, v.pixelTriggerChargeDiscriminatorThreshold);
break;
case 23:
avro::decode(d, v.spare_1);
break;
case 24:
avro::decode(d, v.triggerType);
break;
case 25:
avro::decode(d, v.triggerConfig);
break;
case 26:
avro::decode(d, v.PDMs);
break;
default:
break;
}
}
} else {
avro::decode(d, v.telescopeID);
avro::decode(d, v.type);
avro::decode(d, v.subType);
avro::decode(d, v.ssc);
avro::decode(d, v.packetLength);
avro::decode(d, v.year);
avro::decode(d, v.month);
avro::decode(d, v.day);
avro::decode(d, v.hours);
avro::decode(d, v.minutes);
avro::decode(d, v.seconds);
avro::decode(d, v.validTime);
avro::decode(d, v.timeTagNanosec);
avro::decode(d, v.eventCounter);
avro::decode(d, v.lid);
avro::decode(d, v.fibSt);
avro::decode(d, v.fibCont);
avro::decode(d, v.fibPuls);
avro::decode(d, v.rgbCont);
avro::decode(d, v.rgbPuls);
avro::decode(d, v.spare_0);
avro::decode(d, v.pixelTriggerDiscriminatorThreshold);
avro::decode(d, v.pixelTriggerChargeDiscriminatorThreshold);
avro::decode(d, v.spare_1);
avro::decode(d, v.triggerType);
avro::decode(d, v.triggerConfig);
avro::decode(d, v.PDMs);
}
}
};
void encodeS22(Encoder &avroencoder,
inaf::oasbo::PacketLib::BasePacket &packet) {
// PACKET HEADER
int offset;
for (offset = 0; offset < 11; offset++)
avro::encode(avroencoder, (int32_t) packet[offset].value());
avro::encode(avroencoder, (bool) packet[11].value());
avro::encode(avroencoder, (int32_t) packet[12].value());
avro::encode(avroencoder, (int32_t) packet[13].value());
avro::encode(avroencoder, (bool) packet[14].value());
avro::encode(avroencoder, (bool) packet[15].value());
avro::encode(avroencoder, (bool) packet[16].value());
avro::encode(avroencoder, (bool) packet[17].value());
// PACKET DATA FIELD HEADER EXTENSION
for (offset = 18; offset < 26; offset++)
avro::encode(avroencoder, (int32_t) packet[offset].value());
// PACKET SOURCE DATA FIELD
avroencoder.arrayStart();
avroencoder.setItemCount(S22::NUM_PDM);
for (int i = 0; i < S22::NUM_PDM; i++) {
avroencoder.startItem();
avro::encode(avroencoder, (bool) packet[offset++].value());
avro::encode(avroencoder, (bool) packet[offset++].value());
avro::encode(avroencoder, (bool) packet[offset++].value());
avro::encode(avroencoder, (bool) packet[offset++].value());
avro::encode(avroencoder, (int32_t) packet[offset++].value());
avro::encode(avroencoder, (int32_t) packet[offset++].value());
avroencoder.arrayStart();
avroencoder.setItemCount(S22::HIGH_GAINS_SIZE);
for(int i = 0; i < S22::HIGH_GAINS_SIZE; i++){
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
}
offset += S22::HIGH_GAINS_SIZE;
avroencoder.arrayEnd();
avroencoder.arrayStart();
avroencoder.setItemCount(S22::LOW_GAINS_SIZE);
for(int i = 0; i < S22::LOW_GAINS_SIZE; i++){
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
}
offset += S22::LOW_GAINS_SIZE;
avroencoder.arrayEnd();
avroencoder.arrayStart();
avroencoder.setItemCount(S22::TIME_TRIGGERS_SIZE);
for(int i = 0; i < S22::TIME_TRIGGERS_SIZE; i++){
avroencoder.startItem();
avro::encode(avroencoder, (int32_t) packet[offset+i].value());
}
offset += S22::TIME_TRIGGERS_SIZE;
avroencoder.arrayEnd();
avro::encode(avroencoder, (long) packet[offset++].value());
avro::encode(avroencoder, (long) packet[offset++].value());
avro::encode(avroencoder, (int32_t) packet[offset++].value());
avro::encode(avroencoder, (int32_t) packet[offset++].value());
avro::encode(avroencoder, (int32_t) packet[offset++].value());
avro::encode(avroencoder, (int32_t) packet[offset++].value());
avro::encode(avroencoder, (int32_t) packet[offset++].value());
}
avroencoder.arrayEnd();
}
}
#endif
......@@ -4,11 +4,14 @@
#include <unistd.h>
#include <ctime>
#include <iomanip>
#include <Specific.hh>
#include <Encoder.hh>
#include <avroPackets/S22Packet.h>
using namespace inaf::oasbo::Providers;
#define NUM_PDM 37
KafkaAvroProvider::KafkaAvroProvider() :
KafkaAvroProvider("127.0.0.1", 9092, "Astri_ADAS_topic") {
......@@ -18,9 +21,6 @@ KafkaAvroProvider::KafkaAvroProvider(std::string ip, int port,
std::string topic_name) :
brokerIp(ip), brokerPort(port) {
setDest(topic_name);
pdms.resize(NUM_PDM);
for (int i = 0; i < NUM_PDM; i++)
pdms[i] = i;
}
void KafkaAvroProvider::pollingThreadFunction() {
......@@ -63,7 +63,6 @@ int KafkaAvroProvider::write(PacketLib::BasePacket &packet, std::string dest) {
std::vector<uint8_t> payload;
auto v = avro::snapshot(*bin_os);
payload = std::move(*v);
int attempts = 0;
RdKafka::ErrorCode err;
while (attempts < 5) { //try to deliver messagr 5 times, then report error.
......@@ -156,17 +155,20 @@ int KafkaAvroProvider::close() {
<< "Flushing kafka queue in 10 seconds.." << std::endl;
producer->flush(10 * 1000);
if (producer->outq_len() > 0)
if (producer->outq_len() > 0) {
now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "" << producer->outq_len()
<< " message(s) were not delivered" << std::endl;
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "" << producer->outq_len()
<< " message(s) were not delivered" << std::endl;
}
this->stopFlag = true; // Stopping the polling thread
if (this->pollThread.joinable())
this->pollThread.join();
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t"
<< "Closed" << std::endl;
delete producer;
producer = nullptr;
delete dr_cb;
......@@ -186,7 +188,7 @@ void KafkaAvroProvider::encodeToAvro(PacketLib::BasePacket &packet) {
auto type = packet["Packet Type"].value();
auto subtype = packet["Packet SubType"].value();
if (type == 2 && subtype == 2) {
encodeS22(packet);
avro::encodeS22(*avroencoder,packet);
return;
}
if (type == 10 && subtype == 1) {
......@@ -222,73 +224,5 @@ void KafkaAvroProvider::encodeToAvro(PacketLib::BasePacket &packet) {
}
}
#define HIGH_GAINS_SIZE 64
#define LOW_GAINS_SIZE 64
void KafkaAvroProvider::encodeS22(PacketLib::BasePacket &packet) {
// AVRO PACKET INIT
S22::S22 s22;
// PACKET HEADER
s22.telescopeID = packet[0].value();
s22.type = packet[1].value();
s22.subType = packet[2].value();
s22.ssc = packet[3].value();
// PACKET DATA FIELD HEADER
s22.year = packet[5].value();
s22.month = packet[6].value();
s22.day = packet[7].value();
s22.hours = packet[8].value();
s22.minutes = packet[9].value();
s22.seconds = packet[10].value();
s22.validTime = packet[11].value();
s22.timeTagNanosec = packet[12].value();
// PACKET DATA FIELD HEADER EXTENSION
s22.pixelTriggerDiscriminatorThreshold_pe = packet[21].value();
s22.pixelTriggerChargeDiscriminatorThreshold_pe = packet[22].value();
switch (packet[24].value()) {
case 0:
s22.triggerType = S22::DISABLE;
break;
case 1:
s22.triggerType = S22::INTERNAL;
break;
case 2:
s22.triggerType = S22::TOPOLOGICO;
break;
case 3:
s22.triggerType = S22::MAJORITY;
break;
default:
s22.triggerType = S22::DISABLE;
}
s22.triggerConfig = packet[25].value();
// PACKET SOURCE DATA FIELD
S22::PDMBlock pdmBlock;
std::for_each(pdms.begin(), pdms.end(),
[&pdmBlock, &packet, &s22](int &i) {
uint offset = packet.getPacketStructure().indexOfField(
"pdm val_#" + std::to_string(i)).value();
pdmBlock.pdmVal = packet[offset++].value();
pdmBlock.triggerEnabled = packet[offset++].value();
pdmBlock.triggered = packet[offset].value();
offset += 3;
pdmBlock.pdmID = packet[offset++].value();
std::for_each(packet.begin() + offset,
packet.begin() + offset + HIGH_GAINS_SIZE,
[&pdmBlock](size_t val) {
pdmBlock.highgains.push_back(val);
});
std::for_each(packet.begin() + offset,
packet.begin() + offset + LOW_GAINS_SIZE,
[&pdmBlock](size_t val) {
pdmBlock.lowgains.push_back(val);
});
s22.PDMs.push_back(pdmBlock);
});
avro::encode(*avroencoder, s22);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment