Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • bias/providers/kafka-avro-provider
1 result
Show changes
Commits on Source (1)
......@@ -23,10 +23,8 @@ protected:
//Kafka
RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */
std::atomic<bool> stopFlag = false; /**< Atomic flag to indicate if the provider should stop. */
std::thread pollThread; /**< Thread for polling Kafka events. */
bool pollThreadFlag = false; /**< Flag to indicate if the poll thread is running. */
std::mutex closeMutex; /**< Mutex for thread-safe closing of the provider. */
bool _open = false; /**< internal variable to check the state of the provider */
class KafkaDeliveryReportCb; /**< Forward declaration of the KafkaDeliveryReportCb class. */
KafkaDeliveryReportCb *dr_cb = nullptr; /**< Pointer to the Kafka delivery report callback object. */
......@@ -38,7 +36,8 @@ protected:
KafkaAvroProvider(); /**< Default constructor. */
KafkaAvroProvider(std::string ip, int port, std::string topic); /**< Constructor with parameters. */
void _close(); /**< Performs the "privates" closing operations. */
void flush(); /**< Performs the flush of queued packets */
/**
* @brief Function for the polling thread.
*/
......
......@@ -25,14 +25,13 @@ public:
this->prov = prov;
}
void event_cb(RdKafka::Event &event) {
if(!this->prov->isOpen()) return;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
time_t now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t"
<< "All brokers are down, closing..." << std::endl;
prov->close();
this->prov->_close();
}
}
};
......@@ -63,13 +62,7 @@ KafkaAvroProvider::KafkaAvroProvider(std::string ip, int port,
}
KafkaAvroProvider::~KafkaAvroProvider() {
this->stopFlag = true; // Stopping the polling thread
if (pollThreadFlag) {
if (this->pollThread.joinable()) {
pollThread.join();
}
}
close();
_close();
}
int KafkaAvroProvider::open() {
......@@ -130,64 +123,73 @@ int KafkaAvroProvider::open() {
this->producer = nullptr;
return -1;
}
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "Connected" << std::endl;
delete metadata;
this->stopFlag = false;
if (!pollThreadFlag) {
this->pollThread = std::thread(
&KafkaAvroProvider::pollingThreadFunction, this); // start polling
this->pollThread.detach();
pollThreadFlag = true;
}
this->_open = true;
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "Connected" << std::endl;
return 1;
}
int KafkaAvroProvider::close() {
if (!isOpen())
if (!isOpen()) {
return 1;
if (this->closeMutex.try_lock()) {
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t"
<< "Flushing kafka queue in 10 seconds.." << std::endl;
producer->flush(10 * 1000);
}
this->flush();
this->_close();
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "Closed" << std::endl;
return 1;
if (producer->outq_len() > 0) {
now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "" << producer->outq_len()
<< " message(s) were not delivered" << std::endl;
}
}
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "Closed" << std::endl;
delete producer;
producer = nullptr;
this->_open = false;
this->closeMutex.unlock();
return 1;
void KafkaAvroProvider::flush(){
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t"
<< "Flushing kafka queue in 10 seconds.." << std::endl;
producer->flush(10 * 1000);
if (producer->outq_len() > 0) {
now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "" << producer->outq_len()
<< " message(s) were not delivered" << std::endl;
}
return -1;
}
void KafkaAvroProvider::_close() {
this->pollThreadFlag = false;
this->_open = false;
delete producer;
producer = nullptr;
}
bool KafkaAvroProvider::isOpen() {
return this->_open;
}
void KafkaAvroProvider::pollingThreadFunction() {
producer->poll(1); // pool previous events
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "polling thread start..."
<< std::endl;
while (!this->stopFlag) {
while (this->pollThreadFlag) {
if (isOpen()) {
this->producer->poll(0);
}
std::this_thread::sleep_for(std::chrono::seconds(1)); // pool every second
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // pool ten times at second
}
now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
......@@ -246,7 +248,7 @@ int KafkaAvroProvider::write(Packets::BasePacket &packet, std::string dest) {
}
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << " Failed to produce to topic "
<< "]\t[KafkaAvro Provider]\t" << "Failed to produce to topic "
<< dest << ": " << err << std::endl;
return -1;
}
......