Newer
Older
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netdb.h>
#include <regex>
// Forwards to the (source, rate) constructor with the path
// RAW_FILES_PATH + "/test.raw" and a rate of 10; the body itself is empty.
// NOTE(review): presumably the delegating initializer of the default
// constructor - confirm against the declarator that precedes it.
FileReceiver(std::string(RAW_FILES_PATH).append("/test.raw"), 10) {
}
// Constructor taking a source and a rate; the rate argument initializes the
// member of the same name.
// NOTE(review): how `source` is consumed is not shown by the statements
// below - confirm. The statements that follow return int status codes, which
// suggests they belong to a packet-reading routine rather than to this
// constructor - confirm the function boundaries.
FileReceiver::FileReceiver(std::string source, int rate) :
rate(rate) {
// No input file is currently open: log the problem with a timestamp and
// report failure with -1.
if (!this->isConnectedToClient()) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << host << " not open!"
<< std::endl;
return -1;
}
// The read position has reached the size recorded for the current file.
if (ifile->tellg() == filesize) { // end of file, open next
if (!openNextFile()) { // all file processed
closeConnectionToClient();
return 0;
}
}
ssize_t headerSize = pack.getHeaderSize();
// Scratch buffer sized for the whole packet structure.
// NOTE(review): buff comes from new[] and the early return below leaves
// without a delete[] in view - confirm it is released on every path.
uint8_t *buff = new uint8_t[pack.getPacketStructureByteSize()];
// NOTE(review): presumably copies the first headerSize bytes of the packet
// into buff - confirm against BasePacket::copyToMemory.
pack.copyToMemory(buff, headerSize);
// Give up with -1 when the packet does not report a recognized header.
if (!pack.hasRecognizedHeader()) {
return -1;
}
// Payload and tail are read from the file into buff right after the header.
ssize_t to_be_read = pack.getPayloadSize() + pack.getTailSize();
ifile->read((char*) &buff[headerSize], to_be_read);
// gcount() is the number of characters extracted by the read above.
rec = ifile->gcount();
// Short read: fewer bytes than requested were available.
if (rec != to_be_read) {
resetPacket(pack, headerSize);
// Sleep for one second divided by rate, expressed in nanoseconds.
// NOTE(review): with an integral rate this is an integer division, and a
// rate of 0 divides by zero - confirm rate is validated elsewhere.
std::chrono::nanoseconds waitTime(
static_cast<long long>(1000000000 / rate));
std::this_thread::sleep_for(waitTime);
}
// Builds the list of files to process from host, then opens the first one.
// Returns -1 when host does not exist or no file can be opened, 1 otherwise.
namespace fs = std::filesystem;
// Expand a leading environment variable in host before using it as a path.
std::string s = resolveEnvVar(host);
fs::path source(s);
// The resolved path does not exist: log it with a timestamp and fail.
if (!std::filesystem::exists(source)) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << host << " does not exists!"
<< std::endl;
return -1;
}
if (std::filesystem::is_regular_file(source)) { // host is only one file, not a folder.
filesToProcess.push_back(source);
}
else { // host is a folder, so iterates over all files into the folder
// Sub-directories are walked too; only regular files are queued.
for (const auto &entry : std::filesystem::recursive_directory_iterator(
source)) {
if (entry.is_regular_file()) {
filesToProcess.push_back(entry.path().string());
}
}
// NOTE(review): as the braces stand, the two returns below sit inside the
// folder branch only - confirm that the single-file branch also opens its
// file and returns a status.
if (!openNextFile()) // try to open first file
return -1;
return 1;
}
// Closes the input file stream held by ifile and returns 1.
// NOTE(review): ifile is dereferenced here without a visible null check, and
// the braces below do not pair up with a single function body - confirm that
// a guard such as isConnectedToClient() protects the close() call.
int FileReceiver::closeConnectionToClient() {
ifile->close();
}
return 1;
}
/**
 * Tells whether an input file is currently available for reading.
 *
 * @return true when a stream object exists and reports itself as open,
 *         false otherwise.
 */
bool FileReceiver::isConnectedToClient() const {
    if (ifile == nullptr) {
        return false; // no stream has been created (or it was dropped)
    }
    return ifile->is_open();
}
std::string FileReceiver::getHost() {
return this->host;
}
/**
 * Replaces the configured host.
 *
 * @param host New value copied into the host member.
 */
void FileReceiver::setHost(std::string host) {
    // The parameter shadows the member, hence the explicit this->.
    this->host.assign(host);
}
// Destructor: acts only when a stream object exists and is still open.
// NOTE(review): the statements guarded by this condition are not shown here;
// confirm that the stream is both closed and deleted.
FileReceiver::~FileReceiver() {
if (ifile != nullptr && ifile->is_open()) {
// Prepares a zero-filled buffer used to reset (part of) a packet.
// pack:  packet to reset.
// bytes: number of bytes requested for the reset.
// NOTE(review): buff comes from new[] and no matching delete[] is visible in
// these statements - confirm it is released. A negative `bytes` is not
// rejected before the allocation either.
void FileReceiver::resetPacket(Packets::BasePacket &pack, int bytes) {
uint8_t *buff = new uint8_t[bytes];
std::memset(buff, 0, bytes);
// Never touch more bytes than the packet structure holds.
int toBeReset = std::min(
static_cast<int>(pack.getPacketStructureByteSize()), bytes);
int FileReceiver::openNextFile() {
while (filesCount < filesToProcess.size()) {
ifile = new std::ifstream(filesToProcess.at(filesCount),
std::ios::binary);
if (!ifile->is_open()) {
time_t now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << "Cannot open "
<< filesToProcess.at(filesCount) << std::endl;
ifile = nullptr;
filesCount += 1;
continue; // go to next file
}
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << "Processing "
<< filesToProcess.at(filesCount) << std::endl;
// compute filesize
ifile->seekg(0, std::ios::end);
filesize = ifile->tellg();
ifile->seekg(0, std::ios::beg);
return 1;
}
return 0; // all file processed
}
/**
 * Expands a leading environment variable in a path.
 *
 * When path starts with '$', the text between the '$' and the first '/' (or
 * the end of the string when there is no '/') is taken as a variable name and
 * replaced by its value. When that variable is not set, the value of HOME is
 * used instead. Any other path is returned unchanged.
 *
 * Inputs that used to throw std::out_of_range or build a string from a null
 * pointer are now handled:
 *  - an empty path is returned as is;
 *  - a path without '/' (e.g. "$DATA") expands to the bare variable value;
 *  - when neither the variable nor HOME is set, path is returned unchanged.
 *
 * @param path Path that may begin with "$NAME".
 * @return The path with the leading variable expanded.
 */
std::string FileReceiver::resolveEnvVar(std::string path) {
    if (path.empty() || path.front() != '$') {
        return path; // nothing to resolve
    }

    const std::string::size_type slash = path.find('/'); // find first '/'
    // Without a '/', the variable name runs to the end of the string.
    const std::string env_var_name = path.substr(1,
            slash == std::string::npos ? std::string::npos : slash - 1);

    const char *env_var_value = std::getenv(env_var_name.c_str());
    if (env_var_value == nullptr) {
        // env var does not exist! Replace it with $HOME
        env_var_value = std::getenv("HOME");
    }
    if (env_var_value == nullptr) {
        return path; // no value to expand with: leave the path untouched
    }

    std::string resolved(env_var_value);
    if (slash != std::string::npos) {
        resolved.append(path, slash, std::string::npos);
    }
    return resolved;
}