Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
K
Kafka Avro Provider
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
BIAS
Providers
Kafka Avro Provider
Commits
71324ef6
Commit
71324ef6
authored
1 year ago
by
Valerio Pastore
Browse files
Options
Downloads
Patches
Plain Diff
update for log
parent
fce2fdaf
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
deps/Base-DAQ
+1
-1
1 addition, 1 deletion
deps/Base-DAQ
deps/avro
+1
-1
1 addition, 1 deletion
deps/avro
src/KafkaAvro_Provider.cpp
+105
-77
105 additions, 77 deletions
src/KafkaAvro_Provider.cpp
with
107 additions
and
79 deletions
Base-DAQ
@
f8149d28
Compare
ce99aaa2
...
f8149d28
Subproject commit
ce99aaa20706cca85200a375cee63d7414cebb74
Subproject commit
f8149d2831d10e7f467d96ee11335f01bfad81b5
This diff is collapsed.
Click to expand it.
avro
@
ccadb585
Compare
5847c1d9
...
ccadb585
Subproject commit
5847c1d9effa069d87123f45afe2f3c120b86716
Subproject commit
ccadb5851c7537c8428ddd1473a188f07a986212
This diff is collapsed.
Click to expand it.
src/KafkaAvro_Provider.cpp
+
105
−
77
View file @
71324ef6
...
...
@@ -2,6 +2,9 @@
#include
<memory>
#include
<chrono>
#include
<unistd.h>
#include
<ctime>
#include
<iomanip>
#include
<avroPackets/S22Packet.h>
using
namespace
inaf
::
oasbo
::
Providers
;
...
...
@@ -11,22 +14,29 @@ KafkaAvroProvider::KafkaAvroProvider() :
KafkaAvroProvider
(
"127.0.0.1"
,
9092
,
"Astri_ADAS_topic"
)
{
}
KafkaAvroProvider
::
KafkaAvroProvider
(
std
::
string
ip
,
int
port
,
std
::
string
topic_name
)
:
KafkaAvroProvider
::
KafkaAvroProvider
(
std
::
string
ip
,
int
port
,
std
::
string
topic_name
)
:
brokerIp
(
ip
),
brokerPort
(
port
)
{
setDest
(
topic_name
);
pdms
.
resize
(
NUM_PDM
);
for
(
int
i
=
0
;
i
<
NUM_PDM
;
i
++
)
for
(
int
i
=
0
;
i
<
NUM_PDM
;
i
++
)
pdms
[
i
]
=
i
;
}
}
void
KafkaAvroProvider
::
pollingThreadFunction
()
{
std
::
cout
<<
"Kafka Avro Provider: polling thread start..."
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"polling thread start..."
<<
std
::
endl
;
while
(
!
this
->
stopFlag
)
{
if
(
isOpen
())
this
->
producer
->
poll
(
0
);
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
1
));
// pool every second
}
std
::
cout
<<
"Kafka Avro Provider: polling thread exit..."
<<
std
::
endl
;
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"polling thread exit..."
<<
std
::
endl
;
}
int
KafkaAvroProvider
::
write
(
PacketLib
::
BasePacket
&
packet
)
{
...
...
@@ -35,10 +45,13 @@ int KafkaAvroProvider::write(PacketLib::BasePacket &packet) {
int
KafkaAvroProvider
::
write
(
PacketLib
::
BasePacket
&
packet
,
std
::
string
dest
)
{
if
(
!
isOpen
())
{
std
::
cerr
<<
"Kafka Avro provider: provider not open"
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"provider not open"
<<
std
::
endl
;
return
-
1
;
}
// Reset Avro encoder state
// Reset Avro encoder state
this
->
avroencoder
=
avro
::
binaryEncoder
();
auto
bin_os
=
avro
::
memoryOutputStream
();
this
->
avroencoder
->
init
(
*
bin_os
);
...
...
@@ -62,18 +75,19 @@ int KafkaAvroProvider::write(PacketLib::BasePacket &packet, std::string dest) {
/* Make a copy of the value */
RdKafka
::
Producer
::
RK_MSG_COPY
/* Copy payload */
,
/* Value */
(
char
*
)
payload
.
data
(),
payload
.
size
(),
NULL
,
0
,
0
,
NULL
);
(
char
*
)
payload
.
data
(),
payload
.
size
(),
NULL
,
0
,
0
,
NULL
);
if
(
err
==
RdKafka
::
ERR_NO_ERROR
)
return
1
;
attempts
+=
1
;
}
std
::
cerr
<<
"Kafka Avro Provider: Failed to produce to topic "
<<
dest
<<
": "
<<
err
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
" Failed to produce to topic "
<<
dest
<<
": "
<<
err
<<
std
::
endl
;
return
-
1
;
}
...
...
@@ -87,21 +101,27 @@ int KafkaAvroProvider::open() {
std
::
string
(
this
->
brokerIp
).
append
(
":"
).
append
(
std
::
to_string
(
this
->
brokerPort
)),
errstr
)
!=
RdKafka
::
Conf
::
CONF_OK
)
{
std
::
cerr
<<
errstr
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
errstr
<<
std
::
endl
;
delete
conf
;
return
-
1
;
}
this
->
dr_cb
=
new
DeliveryReportCb
();
if
(
conf
->
set
(
"dr_cb"
,
dr_cb
,
errstr
)
!=
RdKafka
::
Conf
::
CONF_OK
)
{
std
::
cerr
<<
errstr
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
errstr
<<
std
::
endl
;
delete
conf
;
return
-
1
;
}
this
->
producer
=
RdKafka
::
Producer
::
create
(
conf
,
errstr
);
if
(
!
this
->
producer
)
{
this
->
producer
=
nullptr
;
std
::
cerr
<<
"Kafka Avro Provider: Failed to create producer: "
<<
errstr
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
" Failed to create producer: "
<<
errstr
<<
std
::
endl
;
delete
conf
;
return
-
1
;
}
...
...
@@ -112,31 +132,39 @@ int KafkaAvroProvider::open() {
RdKafka
::
ErrorCode
err
=
this
->
producer
->
metadata
(
true
,
NULL
,
&
metadata
,
5000
);
if
(
err
!=
RdKafka
::
ERR_NO_ERROR
||
metadata
->
brokers
()
->
size
()
==
0
)
{
std
::
cerr
<<
"Kafka Avro Provider: No brokers available"
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"No brokers available"
<<
std
::
endl
;
delete
metadata
;
this
->
producer
=
nullptr
;
return
-
1
;
}
delete
metadata
;
this
->
stopFlag
=
false
;
this
->
pollThread
=
std
::
thread
(
&
KafkaAvroProvider
::
pollingThreadFunction
,
this
);
// start polling
this
->
pollThread
=
std
::
thread
(
&
KafkaAvroProvider
::
pollingThreadFunction
,
this
);
// start polling
return
1
;
}
int
KafkaAvroProvider
::
close
()
{
if
(
!
isOpen
())
return
1
;
std
::
cout
<<
"Kafka Avro Provider: Flushing kafka queue in 10 seconds.."
<<
std
::
endl
;
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"Flushing kafka queue in 10 seconds.."
<<
std
::
endl
;
producer
->
flush
(
10
*
1000
);
if
(
producer
->
outq_len
()
>
0
)
std
::
cerr
<<
"Kafka Avro Provider: Warning, "
<<
producer
->
outq_len
()
<<
" message(s) were not delivered"
<<
std
::
endl
;
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
""
<<
producer
->
outq_len
()
<<
" message(s) were not delivered"
<<
std
::
endl
;
this
->
stopFlag
=
true
;
// Stopping the polling thread
if
(
this
->
pollThread
.
joinable
())
if
(
this
->
pollThread
.
joinable
())
this
->
pollThread
.
join
();
delete
producer
;
...
...
@@ -154,47 +182,49 @@ KafkaAvroProvider::~KafkaAvroProvider() {
close
();
}
void
KafkaAvroProvider
::
encodeToAvro
(
PacketLib
::
BasePacket
&
packet
){
void
KafkaAvroProvider
::
encodeToAvro
(
PacketLib
::
BasePacket
&
packet
)
{
auto
type
=
packet
[
"Packet Type"
].
value
();
auto
subtype
=
packet
[
"Packet SubType"
].
value
();
if
(
type
==
2
&&
subtype
==
2
){
if
(
type
==
2
&&
subtype
==
2
)
{
encodeS22
(
packet
);
return
;
}
if
(
type
==
10
&&
subtype
==
1
){
if
(
type
==
10
&&
subtype
==
1
)
{
// encodeHK101(packet);
return
;
}
if
(
type
==
10
&&
subtype
==
2
){
if
(
type
==
10
&&
subtype
==
2
)
{
// encodeVAR102(packet);
return
;
}
if
(
type
==
10
&&
subtype
==
3
){
if
(
type
==
10
&&
subtype
==
3
)
{
// encodeVAR103(packet);
return
;
}
if
(
type
==
1
&&
subtype
==
1
){
if
(
type
==
1
&&
subtype
==
1
)
{
// encodeC11(packet);
return
;
}
if
(
type
==
1
&&
subtype
==
4
){
if
(
type
==
1
&&
subtype
==
4
)
{
// encodeC14(packet);
return
;
}
if
(
type
==
15
&&
subtype
==
1
){
if
(
type
==
15
&&
subtype
==
1
)
{
// encodeCMD151(packet);
return
;
}
else
{
std
::
cerr
<<
"Kafka Avro Provider: encoding to avro error -> no known packet: "
<<
type
<<
"_"
<<
subtype
<<
std
::
endl
;
}
else
{
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
" to avro error -> no known packet: "
<<
type
<<
"_"
<<
subtype
<<
std
::
endl
;
return
;
}
}
#define HIGH_GAINS_SIZE 64
#define LOW_GAINS_SIZE 64
void
KafkaAvroProvider
::
encodeS22
(
PacketLib
::
BasePacket
&
packet
){
void
KafkaAvroProvider
::
encodeS22
(
PacketLib
::
BasePacket
&
packet
)
{
// AVRO PACKET INIT
S22
::
S22
s22
;
// PACKET HEADER
...
...
@@ -205,62 +235,60 @@ void KafkaAvroProvider::encodeS22(PacketLib::BasePacket& packet){
// PACKET DATA FIELD HEADER
s22
.
year
=
packet
[
5
].
value
();
s22
.
month
=
packet
[
6
].
value
();
s22
.
month
=
packet
[
6
].
value
();
s22
.
day
=
packet
[
7
].
value
();
s22
.
hours
=
packet
[
8
].
value
();
s22
.
minutes
=
packet
[
9
].
value
();
s22
.
seconds
=
packet
[
10
].
value
();
s22
.
validTime
=
packet
[
11
].
value
();
s22
.
timeTagNanosec
=
packet
[
12
].
value
();
s22
.
timeTagNanosec
=
packet
[
12
].
value
();
// PACKET DATA FIELD HEADER EXTENSION
s22
.
pixelTriggerDiscriminatorThreshold_pe
=
packet
[
21
].
value
();
s22
.
pixelTriggerChargeDiscriminatorThreshold_pe
=
packet
[
22
].
value
();
switch
(
packet
[
24
].
value
()){
case
0
:
s22
.
triggerType
=
S22
::
DISABLE
;
break
;
case
1
:
s22
.
triggerType
=
S22
::
INTERNAL
;
break
;
case
2
:
s22
.
triggerType
=
S22
::
TOPOLOGICO
;
break
;
case
3
:
s22
.
triggerType
=
S22
::
MAJORITY
;
break
;
default:
s22
.
triggerType
=
S22
::
DISABLE
;
switch
(
packet
[
24
].
value
())
{
case
0
:
s22
.
triggerType
=
S22
::
DISABLE
;
break
;
case
1
:
s22
.
triggerType
=
S22
::
INTERNAL
;
break
;
case
2
:
s22
.
triggerType
=
S22
::
TOPOLOGICO
;
break
;
case
3
:
s22
.
triggerType
=
S22
::
MAJORITY
;
break
;
default:
s22
.
triggerType
=
S22
::
DISABLE
;
}
s22
.
triggerConfig
=
packet
[
25
].
value
();
// PACKET SOURCE DATA FIELD
S22
::
PDMBlock
pdmBlock
;
std
::
for_each
(
pdms
.
begin
(),
pdms
.
end
(),[
&
pdmBlock
,
&
packet
,
&
s22
](
int
&
i
)
{
uint
offset
=
packet
.
getPacketStructure
().
indexOfField
(
"pdm val_#"
+
std
::
to_string
(
i
)).
value
();
pdmBlock
.
pdmVal
=
packet
[
offset
++
].
value
();
pdmBlock
.
triggerEnabled
=
packet
[
offset
++
].
value
();
pdmBlock
.
triggered
=
packet
[
offset
].
value
();
offset
+=
3
;
pdmBlock
.
pdmID
=
packet
[
offset
++
].
value
();
std
::
for_each
(
packet
.
begin
()
+
offset
,
packet
.
begin
()
+
offset
+
HIGH_GAINS_SIZE
,
[
&
pdmBlock
](
size_t
val
)
{
pdmBlock
.
highgains
.
push_back
(
val
);
});
std
::
for_each
(
packet
.
begin
()
+
offset
,
packet
.
begin
()
+
offset
+
LOW_GAINS_SIZE
,
[
&
pdmBlock
](
size_t
val
)
{
pdmBlock
.
lowgains
.
push_back
(
val
);
});
s22
.
PDMs
.
push_back
(
pdmBlock
);
});
std
::
for_each
(
pdms
.
begin
(),
pdms
.
end
(),
[
&
pdmBlock
,
&
packet
,
&
s22
](
int
&
i
)
{
uint
offset
=
packet
.
getPacketStructure
().
indexOfField
(
"pdm val_#"
+
std
::
to_string
(
i
)).
value
();
pdmBlock
.
pdmVal
=
packet
[
offset
++
].
value
();
pdmBlock
.
triggerEnabled
=
packet
[
offset
++
].
value
();
pdmBlock
.
triggered
=
packet
[
offset
].
value
();
offset
+=
3
;
pdmBlock
.
pdmID
=
packet
[
offset
++
].
value
();
std
::
for_each
(
packet
.
begin
()
+
offset
,
packet
.
begin
()
+
offset
+
HIGH_GAINS_SIZE
,
[
&
pdmBlock
](
size_t
val
)
{
pdmBlock
.
highgains
.
push_back
(
val
);
});
std
::
for_each
(
packet
.
begin
()
+
offset
,
packet
.
begin
()
+
offset
+
LOW_GAINS_SIZE
,
[
&
pdmBlock
](
size_t
val
)
{
pdmBlock
.
lowgains
.
push_back
(
val
);
});
s22
.
PDMs
.
push_back
(
pdmBlock
);
});
avro
::
encode
(
*
avroencoder
,
s22
);
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment