Newer
Older
/*
*
* Created on: Mar 1, 2021
* Author: astrisw
*/
#pragma once
#include <Base_Provider.h>
#include <unordered_map>
#include <librdkafka/rdkafkacpp.h>
#include <atomic>
#include <thread>
#include <Delivery_Report.h>
namespace inaf::oasbo::Providers {
class KafkaAvroProvider: public BaseProvider {
protected:
RdKafka::Producer *producer = nullptr;
std::atomic<bool> stopFlag = false;
std::thread pollThread;
DeliveryReportCb *dr_cb = nullptr;
KafkaAvroProvider();
KafkaAvroProvider(std::string ip, int port, std::string topic);
void pollingThreadFunction();
void encodeToAvro(PacketLib::BasePacket&);
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
std::string brokerIp;
int brokerPort;
void setDest(std::string dest) override {
this->dest = dest;
}
std::string getDest() override {
return dest;
}
int write(PacketLib::BasePacket&) override;
int write(PacketLib::BasePacket&, std::string dest) override;
int close() override;
int open() override;
bool isOpen() override;
~KafkaAvroProvider();
friend class KafkaAvroProviderBuilder;
};
class KafkaAvroProviderBuilder {
protected:
KafkaAvroProvider *provider;
public:
std::string config_target { "kafkaavroprovider" };
std::string ip_key { "ip" };
std::string port_key { "port" };
std::string topic_key { "topic" };
KafkaAvroProviderBuilder();
KafkaAvroProviderBuilder(std::string ip, int port, std::string topic);
~KafkaAvroProviderBuilder();
void reset();
KafkaAvroProviderBuilder* configFrom(Configurators::BaseConfigurator &conf);
KafkaAvroProviderBuilder* setIp(std::string ip);
KafkaAvroProviderBuilder* setPort(int port);
KafkaAvroProviderBuilder* setTopic(std::string topic);
KafkaAvroProvider* getProvider();
};
}