Skip to content
Redis_Provider.cpp 1.53 KiB
Newer Older
Valerio Pastore's avatar
Valerio Pastore committed
#include <Redis_Provider.h>

Valerio Pastore's avatar
Valerio Pastore committed
using namespace inaf::oasbo::Providers;
Valerio Pastore's avatar
Valerio Pastore committed
RedisProvider::RedisProvider() {
Valerio Pastore's avatar
Valerio Pastore committed
	setIp("127.0.0.1");
	setPort(6379);
Valerio Pastore's avatar
Valerio Pastore committed
	setKey("DAQ_key");
}

Valerio Pastore's avatar
Valerio Pastore committed
RedisProvider::RedisProvider(std::string ip, int port, std::string key) {
Valerio Pastore's avatar
Valerio Pastore committed
	setIp(ip);
	setPort(port);
Valerio Pastore's avatar
Valerio Pastore committed
	setKey(key);
}

Valerio Pastore's avatar
Valerio Pastore committed
int RedisProvider::write(PacketLib::BasePacket &packet) {
Valerio Pastore's avatar
Valerio Pastore committed
	uint size = packet.getHeaderSize() + packet.getPayloadSize() + packet.getTailSize();
	redisReply *r = (redisReply *) redisCommand(context, "LPUSH DAQ_key %b", (char*) packet.getBinaryPointer(), size);
Valerio Pastore's avatar
Valerio Pastore committed
	  if (r == nullptr || r->type == REDIS_REPLY_ERROR) {
	        std::cerr << "LPUSH command failed: " << r->str << std::endl;
Valerio Pastore's avatar
Valerio Pastore committed
	        return -1;
	    }
	return size;
Valerio Pastore's avatar
Valerio Pastore committed
}

Valerio Pastore's avatar
Valerio Pastore committed
int RedisProvider::write(PacketLib::BasePacket &packet, std::string key) {
Valerio Pastore's avatar
Valerio Pastore committed
	uint size = packet.getHeaderSize() + packet.getPayloadSize() + packet.getTailSize();
	redisReply *r = (redisReply *) redisCommand(context, "LPUSH %s %b", key, (char*) packet.getBinaryPointer(), size);
Valerio Pastore's avatar
Valerio Pastore committed
	  if (r == nullptr || r->type == REDIS_REPLY_ERROR) {
	        std::cerr << "LPUSH command failed: " << r->str << std::endl;
Valerio Pastore's avatar
Valerio Pastore committed
	        return -1;
	    }
	return size;
Valerio Pastore's avatar
Valerio Pastore committed
}

Valerio Pastore's avatar
Valerio Pastore committed
int RedisProvider::open() {
Valerio Pastore's avatar
Valerio Pastore committed
	context = redisConnect(getIp().c_str(), getPort());
	if (context == NULL || context->err) {
	    if (context) {
	        printf("Error: %s\n", context->errstr);
	    } else {
	        printf("Can't allocate Redis context\n");
Valerio Pastore's avatar
Valerio Pastore committed
	}
Valerio Pastore's avatar
Valerio Pastore committed
	return 1;
Valerio Pastore's avatar
Valerio Pastore committed
}

Valerio Pastore's avatar
Valerio Pastore committed
int RedisProvider::close() {
Valerio Pastore's avatar
Valerio Pastore committed
	return 1;
Valerio Pastore's avatar
Valerio Pastore committed
}
Valerio Pastore's avatar
Valerio Pastore committed

RedisProvider::~RedisProvider(){
	std::cout << "Deleting Redis stream" << std::endl;
}