Skip to content
Snippets Groups Projects
KafkaAvro_Provider.h 1.76 KiB
Newer Older
Valerio Pastore's avatar
Valerio Pastore committed
/*
 *
 *  Created on: Mar 1, 2021
 *      Author: astrisw
 */

#pragma once

#include <Base_Provider.h>
Valerio Pastore's avatar
.  
Valerio Pastore committed
#include <Base_Configurator.h>
Valerio Pastore's avatar
Valerio Pastore committed
#include <unordered_map>
#include <librdkafka/rdkafkacpp.h>
#include <atomic>
#include <thread>
#include <Delivery_Report.h>
Valerio Pastore's avatar
.  
Valerio Pastore committed
#include <Encoder.hh>

Valerio Pastore's avatar
Valerio Pastore committed

namespace inaf::oasbo::Providers {
class KafkaAvroProvider: public BaseProvider {
protected:

Valerio Pastore's avatar
.  
Valerio Pastore committed
	//Kafka
Valerio Pastore's avatar
Valerio Pastore committed
	RdKafka::Producer *producer = nullptr;
	std::atomic<bool> stopFlag = false;
	std::thread pollThread;
	DeliveryReportCb *dr_cb = nullptr;
Valerio Pastore's avatar
.  
Valerio Pastore committed
	// AVRO
	avro::EncoderPtr avroencoder;
Valerio Pastore's avatar
Valerio Pastore committed

	KafkaAvroProvider();
	KafkaAvroProvider(std::string ip, int port, std::string topic);

Valerio Pastore's avatar
.  
Valerio Pastore committed
	void pollingThreadFunction();
	void encodeToAvro(PacketLib::BasePacket&);
Valerio Pastore's avatar
Valerio Pastore committed

Valerio Pastore's avatar
.  
Valerio Pastore committed
public:
Valerio Pastore's avatar
Valerio Pastore committed
	std::string brokerIp;
	int brokerPort;

	void setDest(std::string dest) override {
		this->dest = dest;
	}
	std::string getDest() override {
		return dest;
	}

	int write(PacketLib::BasePacket&) override;
	int write(PacketLib::BasePacket&, std::string dest) override;

	int close() override;
	int open() override;
	bool isOpen() override;
	~KafkaAvroProvider();

	friend class KafkaAvroProviderBuilder;

};

class KafkaAvroProviderBuilder {
protected:
	KafkaAvroProvider *provider;

public:
	std::string config_target { "kafkaavroprovider" };
	std::string ip_key { "ip" };
	std::string port_key { "port" };
	std::string topic_key { "topic" };

	KafkaAvroProviderBuilder();
	KafkaAvroProviderBuilder(std::string ip, int port, std::string topic);
	~KafkaAvroProviderBuilder();

	void reset();

	KafkaAvroProviderBuilder* configFrom(Configurators::BaseConfigurator &conf);

	KafkaAvroProviderBuilder* setIp(std::string ip);

	KafkaAvroProviderBuilder* setPort(int port);

	KafkaAvroProviderBuilder* setTopic(std::string topic);

	KafkaAvroProvider* getProvider();
};
}