Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
K
Kafka Avro Provider
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
BIAS
Providers
Kafka Avro Provider
Compare revisions
0f9ac408b8903f60f43ca5b113d8a96d0f53d562 to 500c92328f7afe11a44a939f7cc08d17495c8570
Compare revisions
Changes are shown as if the
source
revision was being merged into the
target
revision.
Learn more about comparing revisions.
Source
bias/providers/kafka-avro-provider
Select target project
No results found
500c92328f7afe11a44a939f7cc08d17495c8570
Select Git revision
Swap
Target
bias/providers/kafka-avro-provider
Select target project
bias/providers/kafka-avro-provider
1 result
0f9ac408b8903f60f43ca5b113d8a96d0f53d562
Select Git revision
Show changes
Only incoming changes from source
Include changes to target since source was created
Compare
Commits on Source (1)
connection improvements
· 500c9232
Valerio Pastore
authored
1 year ago
500c9232
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
include/KafkaAvro_Provider.h
+2
-3
2 additions, 3 deletions
include/KafkaAvro_Provider.h
src/KafkaAvro_Provider.cpp
+40
-38
40 additions, 38 deletions
src/KafkaAvro_Provider.cpp
with
42 additions
and
41 deletions
include/KafkaAvro_Provider.h
View file @
500c9232
...
...
@@ -23,10 +23,8 @@ protected:
//Kafka
RdKafka
::
Producer
*
producer
=
nullptr
;
/**< Pointer to the Kafka producer object. */
std
::
atomic
<
bool
>
stopFlag
=
false
;
/**< Atomic flag to indicate if the provider should stop. */
std
::
thread
pollThread
;
/**< Thread for polling Kafka events. */
bool
pollThreadFlag
=
false
;
/**< Flag to indicate if the poll thread is running. */
std
::
mutex
closeMutex
;
/**< Mutex for thread-safe closing of the provider. */
bool
_open
=
false
;
/**< internal variable to check the state of the provider */
class
KafkaDeliveryReportCb
;
/**< Forward declaration of the KafkaDeliveryReportCb class. */
KafkaDeliveryReportCb
*
dr_cb
=
nullptr
;
/**< Pointer to the Kafka delivery report callback object. */
...
...
@@ -38,7 +36,8 @@ protected:
KafkaAvroProvider
();
/**< Default constructor. */
KafkaAvroProvider
(
std
::
string
ip
,
int
port
,
std
::
string
topic
);
/**< Constructor with parameters. */
void
_close
();
/**< Performs the "privates" closing operations. */
void
flush
();
/**< Performs the flush of queued packets */
/**
* @brief Function for the polling thread.
*/
...
...
This diff is collapsed.
Click to expand it.
src/KafkaAvro_Provider.cpp
View file @
500c9232
...
...
@@ -25,14 +25,13 @@ public:
this
->
prov
=
prov
;
}
void
event_cb
(
RdKafka
::
Event
&
event
)
{
if
(
!
this
->
prov
->
isOpen
())
return
;
if
(
event
.
err
()
==
RdKafka
::
ERR__ALL_BROKERS_DOWN
)
{
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"All brokers are down, closing..."
<<
std
::
endl
;
prov
->
close
();
this
->
prov
->
_
close
();
}
}
};
...
...
@@ -63,13 +62,7 @@ KafkaAvroProvider::KafkaAvroProvider(std::string ip, int port,
}
KafkaAvroProvider
::~
KafkaAvroProvider
()
{
this
->
stopFlag
=
true
;
// Stopping the polling thread
if
(
pollThreadFlag
)
{
if
(
this
->
pollThread
.
joinable
())
{
pollThread
.
join
();
}
}
close
();
_close
();
}
int
KafkaAvroProvider
::
open
()
{
...
...
@@ -130,64 +123,73 @@ int KafkaAvroProvider::open() {
this
->
producer
=
nullptr
;
return
-
1
;
}
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"Connected"
<<
std
::
endl
;
delete
metadata
;
this
->
stopFlag
=
false
;
if
(
!
pollThreadFlag
)
{
this
->
pollThread
=
std
::
thread
(
&
KafkaAvroProvider
::
pollingThreadFunction
,
this
);
// start polling
this
->
pollThread
.
detach
();
pollThreadFlag
=
true
;
}
this
->
_open
=
true
;
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"Connected"
<<
std
::
endl
;
return
1
;
}
int
KafkaAvroProvider
::
close
()
{
if
(
!
isOpen
())
if
(
!
isOpen
())
{
return
1
;
if
(
this
->
closeMutex
.
try_lock
())
{
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"Flushing kafka queue in 10 seconds.."
<<
std
::
endl
;
producer
->
flush
(
10
*
1000
);
}
this
->
flush
();
this
->
_close
();
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"Closed"
<<
std
::
endl
;
return
1
;
if
(
producer
->
outq_len
()
>
0
)
{
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
""
<<
producer
->
outq_len
()
<<
" message(s) were not delivered"
<<
std
::
endl
;
}
}
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"Closed"
<<
std
::
endl
;
delete
producer
;
producer
=
nullptr
;
this
->
_open
=
false
;
this
->
closeMutex
.
unlock
();
return
1
;
void
KafkaAvroProvider
::
flush
(){
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"Flushing kafka queue in 10 seconds.."
<<
std
::
endl
;
producer
->
flush
(
10
*
1000
);
if
(
producer
->
outq_len
()
>
0
)
{
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
""
<<
producer
->
outq_len
()
<<
" message(s) were not delivered"
<<
std
::
endl
;
}
return
-
1
;
}
void
KafkaAvroProvider
::
_close
()
{
this
->
pollThreadFlag
=
false
;
this
->
_open
=
false
;
delete
producer
;
producer
=
nullptr
;
}
bool
KafkaAvroProvider
::
isOpen
()
{
return
this
->
_open
;
}
void
KafkaAvroProvider
::
pollingThreadFunction
()
{
producer
->
poll
(
1
);
// pool previous events
time_t
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"polling thread start..."
<<
std
::
endl
;
while
(
!
this
->
stop
Flag
)
{
while
(
this
->
pollThread
Flag
)
{
if
(
isOpen
())
{
this
->
producer
->
poll
(
0
);
}
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
seconds
(
1
));
// pool
every
second
std
::
this_thread
::
sleep_for
(
std
::
chrono
::
milli
seconds
(
1
00
));
// pool
ten times at
second
}
now
=
time
(
nullptr
);
std
::
cout
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
...
...
@@ -246,7 +248,7 @@ int KafkaAvroProvider::write(Packets::BasePacket &packet, std::string dest) {
}
time_t
now
=
time
(
nullptr
);
std
::
cerr
<<
"["
<<
std
::
put_time
(
localtime
(
&
now
),
"%Y-%m-%d %H:%M:%S"
)
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"
Failed to produce to topic "
<<
"]
\t
[KafkaAvro Provider]
\t
"
<<
"Failed to produce to topic "
<<
dest
<<
": "
<<
err
<<
std
::
endl
;
return
-
1
;
}
...
...
This diff is collapsed.
Click to expand it.