Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • bias/providers/kafka-avro-provider
1 result
Show changes
Commits on Source (1)
...@@ -23,10 +23,8 @@ protected: ...@@ -23,10 +23,8 @@ protected:
//Kafka //Kafka
RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */ RdKafka::Producer *producer = nullptr; /**< Pointer to the Kafka producer object. */
std::atomic<bool> stopFlag = false; /**< Atomic flag to indicate if the provider should stop. */
std::thread pollThread; /**< Thread for polling Kafka events. */ std::thread pollThread; /**< Thread for polling Kafka events. */
bool pollThreadFlag = false; /**< Flag to indicate if the poll thread is running. */ bool pollThreadFlag = false; /**< Flag to indicate if the poll thread is running. */
std::mutex closeMutex; /**< Mutex for thread-safe closing of the provider. */
bool _open = false; /**< internal variable to check the state of the provider */ bool _open = false; /**< internal variable to check the state of the provider */
class KafkaDeliveryReportCb; /**< Forward declaration of the KafkaDeliveryReportCb class. */ class KafkaDeliveryReportCb; /**< Forward declaration of the KafkaDeliveryReportCb class. */
KafkaDeliveryReportCb *dr_cb = nullptr; /**< Pointer to the Kafka delivery report callback object. */ KafkaDeliveryReportCb *dr_cb = nullptr; /**< Pointer to the Kafka delivery report callback object. */
...@@ -38,7 +36,8 @@ protected: ...@@ -38,7 +36,8 @@ protected:
KafkaAvroProvider(); /**< Default constructor. */ KafkaAvroProvider(); /**< Default constructor. */
KafkaAvroProvider(std::string ip, int port, std::string topic); /**< Constructor with parameters. */ KafkaAvroProvider(std::string ip, int port, std::string topic); /**< Constructor with parameters. */
void _close(); /**< Performs the "privates" closing operations. */
void flush(); /**< Performs the flush of queued packets */
/** /**
* @brief Function for the polling thread. * @brief Function for the polling thread.
*/ */
......
...@@ -25,14 +25,13 @@ public: ...@@ -25,14 +25,13 @@ public:
this->prov = prov; this->prov = prov;
} }
void event_cb(RdKafka::Event &event) { void event_cb(RdKafka::Event &event) {
if(!this->prov->isOpen()) return;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) { if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
time_t now = time(nullptr); time_t now = time(nullptr);
std::cerr << "[" std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "]\t[KafkaAvro Provider]\t"
<< "All brokers are down, closing..." << std::endl; << "All brokers are down, closing..." << std::endl;
prov->close(); this->prov->_close();
} }
} }
}; };
...@@ -63,13 +62,7 @@ KafkaAvroProvider::KafkaAvroProvider(std::string ip, int port, ...@@ -63,13 +62,7 @@ KafkaAvroProvider::KafkaAvroProvider(std::string ip, int port,
} }
KafkaAvroProvider::~KafkaAvroProvider() { KafkaAvroProvider::~KafkaAvroProvider() {
this->stopFlag = true; // Stopping the polling thread _close();
if (pollThreadFlag) {
if (this->pollThread.joinable()) {
pollThread.join();
}
}
close();
} }
int KafkaAvroProvider::open() { int KafkaAvroProvider::open() {
...@@ -130,64 +123,73 @@ int KafkaAvroProvider::open() { ...@@ -130,64 +123,73 @@ int KafkaAvroProvider::open() {
this->producer = nullptr; this->producer = nullptr;
return -1; return -1;
} }
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "Connected" << std::endl;
delete metadata; delete metadata;
this->stopFlag = false;
if (!pollThreadFlag) { if (!pollThreadFlag) {
this->pollThread = std::thread( this->pollThread = std::thread(
&KafkaAvroProvider::pollingThreadFunction, this); // start polling &KafkaAvroProvider::pollingThreadFunction, this); // start polling
this->pollThread.detach();
pollThreadFlag = true; pollThreadFlag = true;
} }
this->_open = true; this->_open = true;
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "Connected" << std::endl;
return 1; return 1;
} }
int KafkaAvroProvider::close() { int KafkaAvroProvider::close() {
if (!isOpen()) if (!isOpen()) {
return 1; return 1;
if (this->closeMutex.try_lock()) { }
time_t now = time(nullptr); this->flush();
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") this->_close();
<< "]\t[KafkaAvro Provider]\t" time_t now = time(nullptr);
<< "Flushing kafka queue in 10 seconds.." << std::endl; std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
producer->flush(10 * 1000); << "]\t[KafkaAvro Provider]\t" << "Closed" << std::endl;
return 1;
if (producer->outq_len() > 0) { }
now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "" << producer->outq_len()
<< " message(s) were not delivered" << std::endl;
}
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") void KafkaAvroProvider::flush(){
<< "]\t[KafkaAvro Provider]\t" << "Closed" << std::endl; time_t now = time(nullptr);
delete producer; std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
producer = nullptr; << "]\t[KafkaAvro Provider]\t"
this->_open = false; << "Flushing kafka queue in 10 seconds.." << std::endl;
this->closeMutex.unlock(); producer->flush(10 * 1000);
return 1;
if (producer->outq_len() > 0) {
now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "" << producer->outq_len()
<< " message(s) were not delivered" << std::endl;
} }
return -1;
} }
void KafkaAvroProvider::_close() {
this->pollThreadFlag = false;
this->_open = false;
delete producer;
producer = nullptr;
}
bool KafkaAvroProvider::isOpen() { bool KafkaAvroProvider::isOpen() {
return this->_open; return this->_open;
} }
void KafkaAvroProvider::pollingThreadFunction() { void KafkaAvroProvider::pollingThreadFunction() {
producer->poll(1); // pool previous events
time_t now = time(nullptr); time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << "polling thread start..." << "]\t[KafkaAvro Provider]\t" << "polling thread start..."
<< std::endl; << std::endl;
while (!this->stopFlag) { while (this->pollThreadFlag) {
if (isOpen()) { if (isOpen()) {
this->producer->poll(0); this->producer->poll(0);
} }
std::this_thread::sleep_for(std::chrono::seconds(1)); // pool every second std::this_thread::sleep_for(std::chrono::milliseconds(100)); // pool ten times at second
} }
now = time(nullptr); now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
...@@ -246,7 +248,7 @@ int KafkaAvroProvider::write(Packets::BasePacket &packet, std::string dest) { ...@@ -246,7 +248,7 @@ int KafkaAvroProvider::write(Packets::BasePacket &packet, std::string dest) {
} }
time_t now = time(nullptr); time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[KafkaAvro Provider]\t" << " Failed to produce to topic " << "]\t[KafkaAvro Provider]\t" << "Failed to produce to topic "
<< dest << ": " << err << std::endl; << dest << ": " << err << std::endl;
return -1; return -1;
} }
......