Skip to content
Snippets Groups Projects
Commit 6315c29b authored by Valerio Pastore's avatar Valerio Pastore
Browse files

to be tested with tcp connection errors

parent c4564ae6
No related branches found
No related tags found
No related merge requests found
Subproject commit 267288ef3bd4cbf468c9730fba29e4a07402f0b3
Subproject commit 124cf4c79d4f3f37c3b214ce07c80125dccffac1
Subproject commit 6db61b0ad0499fe596cb6e2176d93cb7e8c68b10
Subproject commit 4a6f9e51a53137ddc7c133dfac21d38efaa0611f
......@@ -21,8 +21,13 @@ protected:
std::thread checkProvConn;
bool checkProvThreadFlag = false;
std::thread receiveAndProcessThread;
bool runningFlag = false;
int receiveAndProcessPacket();
public:
AstriDAQ();
~AstriDAQ();
std::string getStateStr(Status);
void start() override;
......
......@@ -21,7 +21,7 @@ protected:
bool checkConnThreadFlag = false;
std::thread checkConn;
bool isConnected();
int connect();
int connectLoop();
std::set<std::string> keys;
bool stopFlag = false;
......
......@@ -33,12 +33,18 @@ DAQ::AstriDAQ::AstriDAQ() {
Packets::PacketStructureJson structure(ma_packet);
this->packet = new Packets::AstriMaGeneric(structure);
this->currentState = Status::STOP;
this->currentState = Status::INIT;
firstIdle = true;
}
DAQ::AstriDAQ::~AstriDAQ() {
switchState(Status::STOP);
}
std::string DAQ::AstriDAQ::getStateStr(Status state) {
switch (state) {
case Status::INIT:
return std::string("INIT");
case Status::STOP:
return std::string("STOP");
case Status::IDLE:
......@@ -82,7 +88,33 @@ void DAQ::AstriDAQ::updateDaqIDinConf() {
void DAQ::AstriDAQ::switchState(Status newState) {
if (newState == STOP) {
this->runningFlag = false;
std::cout << "1" << std::endl;
if (this->receiveAndProcessThread.joinable()) {
this->receiveAndProcessThread.join();
}
// stop receiving
this->receiver->closeConnectionToClient();
if (archiver->is_open()) {
this->archiver->close();
updateArchiverObservers();
}
this->checkProvThreadFlag = false;
if (this->checkProvConn.joinable()) {
this->checkProvConn.join();
}
this->provider->close();
std::for_each(this->observers.begin(), this->observers.end(),
[](inaf::oasbo::DAQ_observers::BaseDAQ_Observer *ob) {
ob->stop();
});
this->setCurrentState(Status::STOP);
std::cout << "5" << std::endl;
return;
}
......@@ -96,6 +128,16 @@ void DAQ::AstriDAQ::switchState(Status newState) {
archiver->setDest(dest);
archiver->open();
updateArchiverObservers();
this->receiveAndProcessThread = std::thread([this]() {
this->runningFlag = true;
while (this->runningFlag) {
if (this->currentState == Status::READY) {
std::this_thread::sleep_for(std::chrono::seconds(1));
} else {
receiveAndProcessPacket();
}
}
});
break;
default:
time_t now = time(nullptr);
......@@ -135,15 +177,6 @@ void DAQ::AstriDAQ::switchState(Status newState) {
updateArchiverObservers();
this->setCurrentState(Status::READY);
receiver->connectToClient();
if (!receiver->isConnectedToClient()) {
time_t now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t"
<< "Unable to establish connection" << std::endl;
switchState(Status::STOP);
break;
}
incrementDaqID();
monitor->reset();
this->switchState(IDLE);
......@@ -197,7 +230,7 @@ void DAQ::AstriDAQ::switchState(Status newState) {
}
break;
}
case Status::STOP:
case Status::INIT:
switch (newState) {
case Status::READY:
std::for_each(this->observers.begin(), this->observers.end(),
......@@ -213,14 +246,16 @@ void DAQ::AstriDAQ::switchState(Status newState) {
}
this->checkProvThreadFlag = true;
this->checkProvConn = std::thread(&AstriDAQ::connectProvider, this);
this->setCurrentState(Status::READY);
receiver->connectToClient();
if (!receiver->isConnectedToClient()) {
time_t now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t"
<< "Unable to connect Receiver" << std::endl;
this->setCurrentState(STOP);
<< "]\t[Astri DAQ]\t" << "Unable to connect Receiver"
<< std::endl;
this->switchState(STOP);
break;
} else {
time_t now = time(nullptr);
......@@ -228,9 +263,8 @@ void DAQ::AstriDAQ::switchState(Status newState) {
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t" << "Receiver connected"
<< std::endl;
this->switchState(IDLE);
}
this->setCurrentState(Status::READY);
this->switchState(IDLE);
break;
default:
time_t now = time(nullptr);
......@@ -247,75 +281,51 @@ void DAQ::AstriDAQ::switchState(Status newState) {
}
}
void DAQ::AstriDAQ::start() {
incrementDaqID();
switchState(Status::READY);
// start receiving
while (currentState != Status::STOP && currentState != Status::READY) {
int rec = receiver->receiveFromClient(*this->packet);
if (rec <= 0) { // error in receiving or closed connection
if (!receiver->isConnectedToClient()) { // reconnect
time_t now = time(nullptr);
std::cout << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t" << "Connessione chiusa"
<< std::endl;
this->switchState(Status::READY);
}
continue;
}
if (!this->packet->isRecognizedHeader()) {
time_t now = time(nullptr);
std::cerr << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t" << "Packet header not recognized "
<< std::endl;
continue;
}
this->deliverPacket();
Packets::AstriMaGeneric *packet =
static_cast<Packets::AstriMaGeneric*>(this->packet); // downcast
if (packet->getType() == 15) { //NOTIF PACKET
time_t now = time(nullptr);
std::cout << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]" << std::endl;
monitor->printStats();
std::cout << "-------------------------------" << std::endl;
unsigned short cmd = ((this->packet->getBinaryPointer()[8]) << 8)
+ (this->packet->getBinaryPointer()[9]);
Status newState = cmd == 43690 ? Status::RUN : Status::IDLE;
this->setNextState(newState);
now = time(nullptr);
std::cout << "["
<< std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t" << "Switching from "
<< getStateStr(this->currentState) << " to -> "
<< getStateStr(newState) << std::endl;
this->switchState(getNextState());
}
int DAQ::AstriDAQ::receiveAndProcessPacket() {
int rec = receiver->receiveFromClient(*this->packet);
if (rec <= 0 && receiver->isConnectedToClient()) {
return -1;
}
// stop receiving
if (archiver->is_open()) {
this->archiver->close();
updateArchiverObservers();
if (rec <= 0 && !receiver->isConnectedToClient()) { // reconnect
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t" << "Connessione chiusa" << std::endl;
this->switchState(Status::READY);
return -1;
}
this->checkProvThreadFlag = false;
if (this->checkProvConn.joinable()) {
this->checkProvConn.join();
if (!this->packet->isRecognizedHeader()) {
time_t now = time(nullptr);
std::cerr << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t" << "Packet header not recognized "
<< std::endl;
return -1;
}
this->provider->close();
this->receiver->closeConnectionToClient();
std::for_each(this->observers.begin(), this->observers.end(),
[](inaf::oasbo::DAQ_observers::BaseDAQ_Observer *ob) {
ob->stop();
});
this->deliverPacket();
Packets::AstriMaGeneric *packet =
static_cast<Packets::AstriMaGeneric*>(this->packet); // downcast
if (packet->getType() == 15) { //NOTIF PACKET
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]" << std::endl;
monitor->printStats();
std::cout << "-------------------------------" << std::endl;
unsigned short cmd = ((this->packet->getBinaryPointer()[8]) << 8)
+ (this->packet->getBinaryPointer()[9]);
Status newState = cmd == 43690 ? Status::RUN : Status::IDLE;
now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Astri DAQ]\t" << "Switching from "
<< getStateStr(this->currentState) << " to -> "
<< getStateStr(newState) << std::endl;
this->firstIdle = false;
this->switchState(newState);
}
return 1;
}
void DAQ::AstriDAQ::start() {
incrementDaqID();
switchState(Status::READY);
}
int DAQ::AstriDAQ::deliverPacket() {
......@@ -355,7 +365,7 @@ int AstriDAQ::connectProvider() {
<< std::endl;
}
}
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 1;
}
......
......@@ -238,7 +238,13 @@ void AstriRedisDAQObserver::start() {
return;
}
this->checkConnThreadFlag = true;
this->checkConn = std::thread(&AstriRedisDAQObserver::connect, this);
this->context = redisConnect(ip.c_str(), port);
if (isConnected()) {
time_t now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
<< "]\t[Redis Observer]" << "\tConnected" << std::endl;
}
this->checkConn = std::thread(&AstriRedisDAQObserver::connectLoop, this);
}
void AstriRedisDAQObserver::stop() {
......@@ -278,7 +284,7 @@ bool AstriRedisDAQObserver::isConnected() {
return (context != nullptr && !context->err);
}
int AstriRedisDAQObserver::connect() {
int AstriRedisDAQObserver::connectLoop() {
while (checkConnThreadFlag) {
if (!isConnected()) {
time_t now = time(nullptr);
......@@ -293,7 +299,7 @@ int AstriRedisDAQObserver::connect() {
<< "]\t[Redis Observer]" << "\tConnected" << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::seconds(2));
std::this_thread::sleep_for(std::chrono::seconds(1));
}
return 1;
}
......@@ -73,8 +73,6 @@ int main(int argc, char **argv) {
"BIAS1ast01$1", "BIAS");
daq->registerObserver(&mysql_obs);
std::thread daqThread(&DAQ::AstriDAQ::start, daq);
std::signal(SIGINT, [](int signal) {
my_handler(signal);
});
......@@ -82,8 +80,12 @@ int main(int argc, char **argv) {
my_handler(signal);
});
if (daqThread.joinable())
daqThread.join();
daq->start();
while(daq->getCurrentState() != AstriDAQ::Status::STOP){
std::this_thread::sleep_for(std::chrono::seconds(1));
}
now = time(nullptr);
std::cout << "[" << std::put_time(localtime(&now), "%Y-%m-%d %H:%M:%S")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment