#include using namespace inaf::oasbo::Providers; RedisProvider::RedisProvider() { setIp("127.0.0.1"); setPort(6379); setKey("DAQ_key"); } RedisProvider::RedisProvider(std::string ip, int port, std::string key) { setIp(ip); setPort(port); setKey(key); } int RedisProvider::write(PacketLib::BasePacket &packet) { uint size = packet.getPacketStructureByteSize(); redisReply *r = (redisReply *) redisCommand(context, "XADD DAQ_key * data %b", packet.getBinaryPointer(), size); if (r == nullptr || r->type == REDIS_REPLY_ERROR) { std::cerr << "XADD command failed: " << r->str << std::endl; return -1; } return size; } int RedisProvider::write(PacketLib::BasePacket &packet, std::string key) { uint size = packet.getPacketStructureByteSize(); redisReply *r = (redisReply *) redisCommand(context, "XADD %s * data %b",key, packet.getBinaryPointer(), size); if (r == nullptr || r->type == REDIS_REPLY_ERROR) { std::cerr << "XADD command failed: " << r->str << std::endl; return -1; } return size; } int RedisProvider::open() { context = redisConnect(getIp().c_str(), getPort()); if (context == NULL || context->err) { if (context) { printf("Error: %s\n", context->errstr); } else { printf("Can't allocate Redis context\n"); } } return 1; } int RedisProvider::close() { return 1; } RedisProvider::~RedisProvider(){ std::cout << "Deleting Redis stream" << std::endl; }