Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
A
Astri MA Processor
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
BIAS
Processors
Astri MA Processor
Commits
ae611756
Commit
ae611756
authored
1 year ago
by
Valerio Pastore
Browse files
Options
Downloads
Patches
Plain Diff
connection improvements
parent
67bf8319
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
deps/Kafka-Avro-Provider
+1
-1
1 addition, 1 deletion
deps/Kafka-Avro-Provider
deps/Redis_Receiver
+1
-1
1 addition, 1 deletion
deps/Redis_Receiver
include/Astri_MA_Processor.h
+1
-0
1 addition, 0 deletions
include/Astri_MA_Processor.h
src/Astri_MA_Processor.cpp
+34
-17
34 additions, 17 deletions
src/Astri_MA_Processor.cpp
with
37 additions
and
19 deletions
Kafka-Avro-Provider
@
500c9232
Compare
0f9ac408
...
500c9232
Subproject commit
0f9ac408b8903f60f43ca5b113d8a96d0f53d562
Subproject commit
500c92328f7afe11a44a939f7cc08d17495c8570
This diff is collapsed.
Click to expand it.
Redis_Receiver
@
6765d3c0
Compare
8ad68648
...
6765d3c0
Subproject commit
8ad6864818ec55072cbbd4ea9eeee3a094cdf043
Subproject commit
6765d3c05a6003c67d793b6dee0ca537f0b9cf94
This diff is collapsed.
Click to expand it.
include/Astri_MA_Processor.h
+
1
−
0
View file @
ae611756
...
...
@@ -43,6 +43,7 @@ protected:
void
printLog
(
std
::
ostream
&
os
,
std
::
string
message
);
/**< Method for printing log messages. */
void
cleanup
();
/**< Method for cleaning up resources. */
std
::
thread
receiveAndProcessThread
;
/**< Thread for receiving and processing packets. */
void
receiveAndProcessThreadFunction
();
/** Function for the receiveAndProcess Thread. */
bool
receiveAndProcessFlag
=
false
;
/**< Flag for stopping the receiving ang processing */
public:
...
...
This diff is collapsed.
Click to expand it.
src/Astri_MA_Processor.cpp
+
34
−
17
View file @
ae611756
...
...
@@ -31,12 +31,18 @@ AstriMaProcessor::AstriMaProcessor() {
AstriMaProcessor
::~
AstriMaProcessor
()
{
for
(
auto
&
pair
:
packetStructuresMap
)
delete
pair
.
second
;
if
(
receiveAndProcessThread
.
joinable
())
{
receiveAndProcessThread
.
join
();
}
}
void
AstriMaProcessor
::
switchState
(
Status
newState
)
{
if
(
newState
==
STOP
)
{
cleanup
();
setCurrentState
(
Status
::
STOP
);
this
->
receiveAndProcessFlag
=
false
;
if
(
receiveAndProcessThread
.
joinable
())
{
receiveAndProcessThread
.
join
();
}
return
;
}
...
...
@@ -63,15 +69,7 @@ void AstriMaProcessor::switchState(Status newState) {
case
Status
::
RUN
:
// from READY to RUN
this
->
setCurrentState
(
newState
);
if
(
!
this
->
receiveAndProcessThread
.
joinable
())
{
// Start the receiveAndProcessThread only the first time the state is set to RUN from READY
this
->
receiveAndProcessThread
=
std
::
thread
([
this
]()
{
this
->
receiveAndProcessFlag
=
true
;
while
(
this
->
receiveAndProcessFlag
)
{
int
res
=
receiveAndProcessPacket
();
if
(
res
<
0
&&
receiveAndProcessFlag
)
{
// recv or provider connection went down
switchState
(
Status
::
READY
);
}
}
});
this
->
receiveAndProcessThread
=
std
::
thread
(
&
AstriMaProcessor
::
receiveAndProcessThreadFunction
,
this
);
}
break
;
default
:
...
...
@@ -88,7 +86,8 @@ void AstriMaProcessor::switchState(Status newState) {
case
Status
::
RUN
:
break
;
case
Status
::
READY
:
// from RUN to READY
if
(
connectReceiver
()
<
0
||
connectProvider
()
<
0
)
{
if
(
connectReceiver
()
<
0
||
connectProvider
()
<
0
)
{
// recv or provider connection went down, exit...
this
->
receiveAndProcessFlag
=
false
;
break
;
}
this
->
setCurrentState
(
Status
::
READY
);
...
...
@@ -104,9 +103,27 @@ void AstriMaProcessor::switchState(Status newState) {
}
}
void
AstriMaProcessor
::
receiveAndProcessThreadFunction
()
{
this
->
receiveAndProcessFlag
=
true
;
while
(
this
->
receiveAndProcessFlag
)
{
// becomes false if stop command or connection timeout exceeded (in next line switchState(Status::READY);)
int
res
=
receiveAndProcessPacket
();
if
(
res
<=
0
&&
receiveAndProcessFlag
)
{
// recv or provider connection went down
switchState
(
Status
::
READY
);
}
}
this
->
cleanup
();
this
->
setCurrentState
(
STOP
);
}
int
AstriMaProcessor
::
connectReceiver
()
{
if
(
receiver
->
connectToClient
()
>
0
)
if
(
receiver
->
isConnectedToClient
()){
return
1
;
}
if
(
receiver
->
connectToClient
()
>
0
){
return
1
;
}
auto
startTime
=
std
::
chrono
::
steady_clock
::
now
();
auto
currentTime
=
std
::
chrono
::
steady_clock
::
now
();
auto
elapsedTime
=
std
::
chrono
::
duration_cast
<
std
::
chrono
::
seconds
>
(
...
...
@@ -127,6 +144,8 @@ int AstriMaProcessor::connectReceiver() {
}
int
AstriMaProcessor
::
connectProvider
()
{
if
(
provider
->
isOpen
())
return
1
;
if
(
provider
->
open
()
>
0
)
return
1
;
auto
startTime
=
std
::
chrono
::
steady_clock
::
now
();
...
...
@@ -200,13 +219,11 @@ void AstriMaProcessor::start() {
void
AstriMaProcessor
::
cleanup
()
{
// STOPPING
this
->
receiveAndProcessFlag
=
false
;
if
(
receiveAndProcessThread
.
joinable
())
{
receiveAndProcessThread
.
join
();
}
auto
startTime
=
std
::
chrono
::
steady_clock
::
now
();
printLog
(
std
::
cout
,
"cleaning up..."
);
while
(
true
)
{
// break at the timeout or when there are no more packets to process
if
(
receiveAndProcessPacket
()
<
1
)
{
if
(
!
receiver
->
isConnectedToClient
()
||
!
provider
->
isOpen
()
||
receiveAndProcessPacket
()
<
1
)
{
printLog
(
std
::
cout
,
"Stop processing"
);
break
;
}
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment