Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • bias/connection-protocols/file-receiver
1 result
Show changes
Commits on Source (2)
Subproject commit 8a0ea2d0e699863df5fe1c91caf2d7b0855957be Subproject commit a00f9a27afbf5f75dab7db2368b9b9b6fcb395e1
#pragma once #pragma once
#include <Base_Receiver.h> #include <Base_Receiver.h>
...@@ -9,24 +8,66 @@ ...@@ -9,24 +8,66 @@
#define RAW_FILES_PATH std::string(std::getenv("HOME")).append("/BIAS/config/RawTestFiles") #define RAW_FILES_PATH std::string(std::getenv("HOME")).append("/BIAS/config/RawTestFiles")
#endif #endif
/**
* @brief The namespace inaf::oasbo::Receivers contains classes related to receiving data.
*/
namespace inaf::oasbo::Receivers { namespace inaf::oasbo::Receivers {
/**
* @brief The FileReceiver class is a derived class of BaseReceiver and is responsible for receiving packets from files.
* check the Base_Receiver.h file for more information.
*/
class FileReceiver: public BaseReceiver { class FileReceiver: public BaseReceiver {
protected: protected:
/**
* @brief Constructs a FileReceiver object with a source and a rate.
* @param source The source files to receive.
* @param rate The rate at which the packets are received.
*/
FileReceiver(std::string source, int rate); FileReceiver(std::string source, int rate);
/**
* @brief Constructs a FileReceiver object with a source.
* @param source The source files to receive.
*/
FileReceiver(std::string source); FileReceiver(std::string source);
/**
* @brief Constructs a FileReceiver object.
*/
FileReceiver(); FileReceiver();
std::ifstream *ifile = nullptr;
ssize_t filesize = 0; std::ifstream *ifile = nullptr; /**< Pointer to the input file stream. */
std::vector<std::string> filesToProcess; ssize_t filesize = 0; /**< The size of the file being received. */
size_t filesCount = 0; std::vector<std::string> filesToProcess; /**< Vector of files to process. */
size_t filesCount = 0; /**< The number of files to process. */
/**
* @brief Opens the next file to process.
* @return integer representing the status of the operation.
*/
int openNextFile(); int openNextFile();
void resetPacket(PacketLib::BasePacket &pack, int bytes);
/**
* @brief Resets the first bytes of the packet to 0.
* @param pack The packet to reset.
* @param bytes The number of bytes to reset.
*/
void resetPacket(Packets::BasePacket &pack, int bytes);
/**
* @brief Resolves the environment variable in the given path.
* @param path The path containing the environment variable.
* @return The resolved path.
*/
std::string resolveEnvVar(std::string path); std::string resolveEnvVar(std::string path);
public: public:
int rate; int rate; /**< The rate at which the packets are received. */
/**
* @brief Destructor for the FileReceiver class.
*/
~FileReceiver(); ~FileReceiver();
std::string getHost() override; std::string getHost() override;
...@@ -35,25 +76,65 @@ public: ...@@ -35,25 +76,65 @@ public:
int connectToClient() override; int connectToClient() override;
int closeConnectionToClient() override; int closeConnectionToClient() override;
bool isConnectedToClient() const override; bool isConnectedToClient() const override;
int receiveFromClient(PacketLib::BasePacket&) override; int receiveFromClient(Packets::BasePacket&) override;
friend class FileReceiverBuilder; friend class FileReceiverBuilder;
}; };
/**
* @brief The FileReceiverBuilder class is responsible for building FileReceiver objects.
*/
class FileReceiverBuilder { class FileReceiverBuilder {
protected: protected:
FileReceiver *rcv; FileReceiver *rcv; /**< Pointer to the FileReceiver object being built. */
std::string sourceFile; std::string sourceFile; /**< The source file for the FileReceiver object. */
public: public:
std::string config_target { "filereceiver" }; std::string config_target { "filereceiver" }; /**< The configuration target for the FileReceiverBuilder. */
std::string source_key { "source" }; std::string source_key { "source" }; /**< The source key for the FileReceiverBuilder. */
std::string rate_key { "rate" }; std::string rate_key { "rate" }; /**< The rate key for the FileReceiverBuilder. */
/**
* @brief Constructs a FileReceiverBuilder object.
*/
FileReceiverBuilder(); FileReceiverBuilder();
/**
* @brief Destructor for the FileReceiverBuilder class.
*/
~FileReceiverBuilder(); ~FileReceiverBuilder();
/**
* @brief Resets the FileReceiverBuilder object.
*/
void reset(); void reset();
/**
* @brief Configures the FileReceiverBuilder object from a BaseConfigurator object.
* @param conf The BaseConfigurator object to configure from.
* @return A pointer to the FileReceiverBuilder object.
*/
FileReceiverBuilder* configFrom(Configurators::BaseConfigurator &conf); FileReceiverBuilder* configFrom(Configurators::BaseConfigurator &conf);
/**
* @brief Sets the source for the FileReceiverBuilder object.
* @param source The source of the files to receive.
* @return A pointer to the FileReceiverBuilder object.
*/
FileReceiverBuilder* setSource(std::string source); FileReceiverBuilder* setSource(std::string source);
/**
* @brief Sets the rate for the FileReceiverBuilder object.
* @param rate The rate at which the files are received.
* @return A pointer to the FileReceiverBuilder object.
*/
FileReceiverBuilder* setRate(int rate); FileReceiverBuilder* setRate(int rate);
/**
* @brief Gets the built FileReceiver object.
* @return A pointer to the built FileReceiver object.
*/
FileReceiver* getReceiver(); FileReceiver* getReceiver();
}; };
}
} // namespace inaf::oasbo::Receivers
...@@ -13,16 +13,17 @@ void FileReceiverBuilder::reset() { ...@@ -13,16 +13,17 @@ void FileReceiverBuilder::reset() {
this->rcv = new FileReceiver(); this->rcv = new FileReceiver();
} }
FileReceiverBuilder* FileReceiverBuilder::configFrom(Configurators::BaseConfigurator &conf) { FileReceiverBuilder* FileReceiverBuilder::configFrom(
Configurators::BaseConfigurator &conf) {
conf.readConfigFromSource(config_target); conf.readConfigFromSource(config_target);
std::map<std::string, std::string> params = conf.getConfig(); std::map<std::string, std::string> params = conf.getConfig();
std::string key = config_target+"_"+source_key; std::string key = config_target + "_" + source_key;
if (params.count(key) > 0) if (params.count(key) > 0)
rcv->setHost(params[key]); rcv->setHost(params[key]);
key = config_target+"_"+rate_key; key = config_target + "_" + rate_key;
if (params.count(key) > 0){ if (params.count(key) > 0) {
rcv->rate = std::stoi(params[key]); rcv->rate = std::stoi(params[key]);
} }
return this; return this;
......
...@@ -31,7 +31,7 @@ FileReceiver::FileReceiver(std::string source, int rate) : ...@@ -31,7 +31,7 @@ FileReceiver::FileReceiver(std::string source, int rate) :
this->setHost(source); this->setHost(source);
} }
int FileReceiver::receiveFromClient(PacketLib::BasePacket &pack) { int FileReceiver::receiveFromClient(Packets::BasePacket &pack) {
if (ifile->tellg() == filesize) { // end of file, open next if (ifile->tellg() == filesize) { // end of file, open next
if (!openNextFile()) { // all file processed if (!openNextFile()) { // all file processed
closeConnectionToClient(); closeConnectionToClient();
...@@ -75,18 +75,21 @@ int FileReceiver::connectToClient() { ...@@ -75,18 +75,21 @@ int FileReceiver::connectToClient() {
namespace fs = std::filesystem; namespace fs = std::filesystem;
std::string s = resolveEnvVar(host); std::string s = resolveEnvVar(host);
fs::path source(s); fs::path source(s);
if (!std::filesystem::exists(source)) { if (!std::filesystem::exists(source)) {
time_t now = time(nullptr); time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") <<"]\t[File Receiver]\t" << host << " does not exists!" << std::endl; std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
return -1; << "]\t[File Receiver]\t" << host << " does not exists!"
} << std::endl;
return -1;
}
if (std::filesystem::is_regular_file(source)) { // host is only one file, not a folder. if (std::filesystem::is_regular_file(source)) { // host is only one file, not a folder.
filesToProcess.push_back(source); filesToProcess.push_back(source);
} }
else { // host is a folder, so iterates over all files into the folder else { // host is a folder, so iterates over all files into the folder
for (const auto &entry : std::filesystem::recursive_directory_iterator(source)) { for (const auto &entry : std::filesystem::recursive_directory_iterator(
source)) {
if (entry.is_regular_file()) { if (entry.is_regular_file()) {
filesToProcess.push_back(entry.path().string()); filesToProcess.push_back(entry.path().string());
} }
...@@ -94,7 +97,7 @@ int FileReceiver::connectToClient() { ...@@ -94,7 +97,7 @@ int FileReceiver::connectToClient() {
} }
// sort alphabetically // sort alphabetically
std::sort(filesToProcess.begin(), filesToProcess.end()); std::sort(filesToProcess.begin(), filesToProcess.end());
if (!openNextFile()) // try to open first file if (!openNextFile()) // try to open first file
return -1; return -1;
...@@ -127,7 +130,7 @@ FileReceiver::~FileReceiver() { ...@@ -127,7 +130,7 @@ FileReceiver::~FileReceiver() {
} }
} }
void FileReceiver::resetPacket(PacketLib::BasePacket &pack, int bytes) { void FileReceiver::resetPacket(Packets::BasePacket &pack, int bytes) {
uint8_t *buff = new uint8_t[bytes]; uint8_t *buff = new uint8_t[bytes];
std::memset(buff, 0, bytes); std::memset(buff, 0, bytes);
int toBeReset = std::min( int toBeReset = std::min(
...@@ -141,27 +144,30 @@ int FileReceiver::openNextFile() { ...@@ -141,27 +144,30 @@ int FileReceiver::openNextFile() {
ifile = new std::ifstream(filesToProcess.at(filesCount), ifile = new std::ifstream(filesToProcess.at(filesCount),
std::ios::binary); std::ios::binary);
if (!ifile->is_open()) { if (!ifile->is_open()) {
time_t now = time(nullptr); time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") <<"]\t[File Receiver]\t" << "Cannot open " std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << "Cannot open "
<< filesToProcess.at(filesCount) << std::endl; << filesToProcess.at(filesCount) << std::endl;
ifile = nullptr; ifile = nullptr;
filesCount += 1; filesCount += 1;
continue; // go to next file continue; // go to next file
} }
time_t now = time(nullptr); time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S") <<"]\t[File Receiver]\t" << "Processing " << filesToProcess.at(filesCount) << std::endl; std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[File Receiver]\t" << "Processing "
<< filesToProcess.at(filesCount) << std::endl;
// compute filesize // compute filesize
ifile->seekg(0, std::ios::end); ifile->seekg(0, std::ios::end);
filesize = ifile->tellg(); filesize = ifile->tellg();
ifile->seekg(0, std::ios::beg); ifile->seekg(0, std::ios::beg);
filesCount += 1; filesCount += 1;
return 1; return 1;
} }
return 0; // all file processed return 0; // all file processed
} }
std::string FileReceiver::resolveEnvVar(std::string path) { std::string FileReceiver::resolveEnvVar(std::string path) {
// resolve env var if present // resolve env var if present
if (path.at(0) == '$') { if (path.at(0) == '$') {
......