Skip to content
Snippets Groups Projects
File_Receiver.cpp 4.92 KiB
Newer Older
Valerio Pastore's avatar
Valerio Pastore committed
#include <iostream>
#include <fstream>
Valerio Pastore's avatar
Valerio Pastore committed
#include <string>
#include <filesystem>

Valerio Pastore's avatar
Valerio Pastore committed
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netdb.h>
#include <regex>
#include <algorithm>
Valerio Pastore's avatar
Valerio Pastore committed

#include <File_Receiver.h>
Valerio Pastore's avatar
.  
Valerio Pastore committed
#include <chrono>
#include <thread>
Valerio Pastore's avatar
Valerio Pastore committed
#include <ctime>
#include <iomanip>

Valerio Pastore's avatar
Valerio Pastore committed
using namespace inaf::oasbo::Receivers;
Valerio Pastore's avatar
Valerio Pastore committed

Valerio Pastore's avatar
.  
Valerio Pastore committed
/**
 * Default constructor.
 *
 * Delegates to the (source, rate) constructor using the file "test.raw"
 * located under RAW_FILES_PATH and a rate of 10.
 */
FileReceiver::FileReceiver() :
		FileReceiver(std::string(RAW_FILES_PATH) + "/test.raw", 10) {
}

Valerio Pastore's avatar
Valerio Pastore committed
/**
 * Builds a receiver reading from the given source with a rate of 10.
 *
 * @param source path stored as the receiver host; it is resolved later by
 *               connectToClient() and may be a regular file or a folder.
 */
FileReceiver::FileReceiver(std::string source) :
		FileReceiver(source, 10) {
}

/**
 * Builds a receiver reading from the given source at the given rate.
 *
 * @param source path stored as the receiver host (see setHost()).
 * @param rate   value used by receiveFromClient() to pace delivery: when
 *               positive, each successful read sleeps 1e9 / rate nanoseconds.
 */
FileReceiver::FileReceiver(std::string source, int rate) :
		rate(rate) {
	setHost(source);
}

Valerio Pastore's avatar
.  
Valerio Pastore committed
/**
 * Reads the next packet from the current input file into pack.
 *
 * The header is read first and copied into the packet; if the packet
 * recognizes it, payload and tail are read and copied right after the header.
 * When the current file is exhausted the next file in the list is opened.
 *
 * @param pack packet that receives the bytes read from the file.
 * @return  1 when a complete packet has been copied into pack;
 *          0 when every file has been processed (the stream is then closed);
 *         -1 when no file is open, a read is short, the header is not
 *            recognized, or the sizes declared by the packet do not fit in
 *            the packet structure.
 */
int FileReceiver::receiveFromClient(Packets::BasePacket &pack) {
	if (!this->isConnectedToClient()) {
		time_t now = time(nullptr);
		std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
				<< "]\t[File Receiver]\t" << host << " not open!"
				<< std::endl;
		return -1;
	}
	// Loop (instead of a single check) so that empty files are skipped too:
	// reading from an empty file would set failbit, tellg() would then return
	// -1 forever and the receiver would never move on to the next file.
	while (ifile->tellg() == filesize) { // end of file, open next
		if (!openNextFile()) { // all file processed
			closeConnectionToClient();
			return 0;
		}
	}
	const ssize_t headerSize = pack.getHeaderSize();
	const ssize_t capacity =
			static_cast<ssize_t>(pack.getPacketStructureByteSize());
	if (headerSize < 0 || headerSize > capacity) {
		// The header alone would not fit in the buffer: refuse to read.
		return -1;
	}
	uint8_t *buff = new uint8_t[capacity];
	ifile->read((char*) buff, headerSize);
	ssize_t rec = ifile->gcount();
	if (rec != headerSize) {
		delete[] buff;
		return -1;
	}
	pack.copyToMemory(buff, headerSize);
	if (!pack.hasRecognizedHeader()) {
		resetPacket(pack, headerSize);
		delete[] buff;
		return -1;
	}
	const ssize_t to_be_read = pack.getPayloadSize() + pack.getTailSize();
	// The sizes come from the packet after the header has been copied in, so a
	// corrupted file could request more bytes than the buffer can hold.
	// Reject it instead of writing past the end of buff.
	if (to_be_read < 0 || to_be_read > capacity - headerSize) {
		resetPacket(pack, headerSize);
		delete[] buff;
		return -1;
	}
	ifile->read((char*) &buff[headerSize], to_be_read);
	rec = ifile->gcount();
	if (rec != to_be_read) {
		resetPacket(pack, headerSize);
		delete[] buff;
		return -1;
	}
	pack.copyToMemory(&buff[headerSize], rec, headerSize);
	delete[] buff;

	// Pace the delivery: one packet every 1/rate seconds.
	if (rate > 0) {
		std::chrono::nanoseconds waitTime(
				static_cast<long long>(1000000000 / rate));
		std::this_thread::sleep_for(waitTime);
	}
	return 1;
}

int FileReceiver::connectToClient() {
Valerio Pastore's avatar
Valerio Pastore committed
	namespace fs = std::filesystem;
	std::string s = resolveEnvVar(host);
	fs::path source(s);
Valerio Pastore's avatar
.  
Valerio Pastore committed
	if (!std::filesystem::exists(source)) {
		time_t now = time(nullptr);
		std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
				<< "]\t[File Receiver]\t" << host << " does not exists!"
				<< std::endl;
		return -1;
	}
Valerio Pastore's avatar
Valerio Pastore committed
	if (std::filesystem::is_regular_file(source)) { // host is only one file, not a folder.
		filesToProcess.push_back(source);
	}

	else { // host is a folder, so iterates over all files into the folder
Valerio Pastore's avatar
.  
Valerio Pastore committed
		for (const auto &entry : std::filesystem::recursive_directory_iterator(
				source)) {
			if (entry.is_regular_file()) {
				filesToProcess.push_back(entry.path().string());
			}
		}
Valerio Pastore's avatar
Valerio Pastore committed
	}

	// sort alphabetically
Valerio Pastore's avatar
.  
Valerio Pastore committed
	std::sort(filesToProcess.begin(), filesToProcess.end());

	if (!openNextFile()) // try to open first file
		return -1;
Valerio Pastore's avatar
Valerio Pastore committed
	return 1;
}

int FileReceiver::closeConnectionToClient() {
Valerio Pastore's avatar
Valerio Pastore committed
	if (isConnectedToClient()) {
Valerio Pastore's avatar
Valerio Pastore committed
		ifile->close();
	}
	return 1;
}

/**
 * Tells whether an input file is currently open.
 *
 * @return true when a stream exists and is open, false otherwise.
 */
bool FileReceiver::isConnectedToClient() const {
	if (ifile == nullptr)
		return false;
	return ifile->is_open();
}

std::string FileReceiver::getHost() {
	return this->host;
}

/**
 * Stores the path to read from.
 *
 * @param host path of a file or folder; the value is only stored here.
 */
void FileReceiver::setHost(std::string host) {
	// the parameter shadows the member, hence the explicit this->
	this->host = host;
}

/**
 * Releases the input stream.
 *
 * The stream object is deleted whenever it exists, not only while it is still
 * open: otherwise a receiver whose file was already closed (for instance via
 * closeConnectionToClient()) would leak the stream object.
 */
FileReceiver::~FileReceiver() {
	if (ifile != nullptr) {
		if (ifile->is_open()) {
			ifile->close();
		}
		delete ifile;
		ifile = nullptr;
	}
}

Valerio Pastore's avatar
.  
Valerio Pastore committed
/**
 * Overwrites the beginning of the packet memory with zeros.
 *
 * @param pack  packet to reset.
 * @param bytes number of leading bytes to zero; clamped to the packet
 *              structure size.
 */
void FileReceiver::resetPacket(Packets::BasePacket &pack, int bytes) {
	// value-initialization yields an all-zero buffer
	uint8_t *zeros = new uint8_t[bytes]();
	const int structureSize =
			static_cast<int>(pack.getPacketStructureByteSize());
	const int toBeReset = std::min(structureSize, bytes);
	pack.copyToMemory(zeros, toBeReset);
	delete[] zeros;
}

int FileReceiver::openNextFile() {
	while (filesCount < filesToProcess.size()) {
		ifile = new std::ifstream(filesToProcess.at(filesCount),
				std::ios::binary);
		if (!ifile->is_open()) {
Valerio Pastore's avatar
.  
Valerio Pastore committed
			time_t now = time(nullptr);
			std::cerr << "["
					<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
					<< "]\t[File Receiver]\t" << "Cannot open "
					<< filesToProcess.at(filesCount) << std::endl;
			ifile = nullptr;
			filesCount += 1;
			continue; // go to next file
		}
Valerio Pastore's avatar
.  
Valerio Pastore committed
		time_t now = time(nullptr);
		std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
				<< "]\t[File Receiver]\t" << "Processing "
				<< filesToProcess.at(filesCount) << std::endl;
		// compute filesize
		ifile->seekg(0, std::ios::end);
		filesize = ifile->tellg();
		ifile->seekg(0, std::ios::beg);
Valerio Pastore's avatar
.  
Valerio Pastore committed

		filesCount += 1;
		return 1;
	}
	return 0; // all file processed
}

Valerio Pastore's avatar
Valerio Pastore committed
/**
 * Expands a leading environment variable in a path.
 *
 * If path starts with '$', the text between '$' and the first '/' (or the end
 * of the string when there is no '/') is taken as the name of an environment
 * variable and replaced with its value. When that variable is not set, the
 * value of HOME is used instead; when HOME is not set either, the variable is
 * replaced with an empty string.
 *
 * @param path path that may start with "$NAME".
 * @return the path with the leading variable expanded; the path unchanged
 *         when it is empty or does not start with '$'.
 */
std::string FileReceiver::resolveEnvVar(std::string path) {
	// nothing to resolve (also avoids indexing into an empty string)
	if (path.empty() || path.front() != '$')
		return path;

	const std::string::size_type pos = path.find('/'); // find first '/'
	const bool hasTail = (pos != std::string::npos);
	const std::string env_var_name =
			hasTail ? path.substr(1, pos - 1) : path.substr(1);

	const char *env_var_value = std::getenv(env_var_name.c_str());
	if (env_var_value == nullptr) {
		// env var does not exist! Replace it with $HOME
		env_var_value = std::getenv("HOME");
	}

	// HOME itself may be unset: never build a std::string from a null pointer
	std::string resolved;
	if (env_var_value != nullptr)
		resolved = env_var_value;
	// "$NAME" without any '/' has no remainder to append
	if (hasTail)
		resolved.append(path, pos, std::string::npos);
	return resolved;
}