Skip to content
Snippets Groups Projects

Resolve CT-215

Merged Gianluca Marotta requested to merge CT-215 into master
3 files
+ 184
53
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -26,7 +26,7 @@ import json
# tango imports
import tango
from tango import DebugIt
from tango import DebugIt, EnsureOmniThread
from tango.server import run
from tango.server import Device
from tango.server import attribute, command
@@ -38,7 +38,7 @@ from tango import AttrWriteType, DeviceProxy
from ska.base import SKASubarray, SKASubarrayStateModel
#from ska.base import utils
from ska.base.commands import ActionCommand, ResultCode
from ska.base.faults import CapabilityValidationError, CommandError
from ska.base.faults import CapabilityValidationError, CommandError,StateModelError
from ska.base.control_model import HealthState, AdminMode, ObsState, ObsMode
from .utils.cspcommons import CmdExecState
from .utils.decorators import transaction_id
@@ -47,30 +47,61 @@ from .csp_manage_json import JsonConfiguration
# PROTECTED REGION END # // CspSubarray.additionnal_import
__all__ = ["CspSubarray", "main"]
'''
class CspSubarrayStateModel(SKASubarrayStateModel):
_subarray_transitions = {
('READY', 'goto_idle_succeeded'): (
"IDLE",
lambda self: self._set_obs_state(ObsState.IDLE)
),
('READY', 'goto_idle_started'): ("READY",None),
('READY', 'goto_idle_failed'): (
"OBSFAULT",
lambda self: self._set_obs_state(ObsState.FAULT)
),
}
def __init__(self, dev_state_callback=None):
"""
Implements the state model for the CSP SKASubarray,
This new State Model use automatic transitions of the ObservationStateMachine.
In addition to any transitions added explicitly, a to_«state»() method
is created automatically whenever a state is added to a Machine instance.
This method transitions to the target state no matter which state the machine is currently in.
Automatic transitions are used at the server re-initialization to
align the CSP Subarray with the current state of its
sub-elements.
"""
def __init__(
self,
logger,
op_state_callback=None,
admin_mode_callback=None,
obs_state_callback=None,
):
"""
Initialises the model. Note that this does not imply moving to
INIT state. The INIT state is managed by the model itself.
:param logger: the logger to be used by this state model.
:type logger: a logger that implements the standard library
logger interface
:param op_state_callback: A callback to be called when a
transition implies a change to op state
:type op_state_callback: callable
:param admin_mode_callback: A callback to be called when a
transition causes a change to device admin_mode
:type admin_mode_callback: callable
:param obs_state_callback: A callback to be called when a
transition causes a change to device obs_state
:type obs_state_callback: callable
"""
super().__init__(
dev_state_callback=dev_state_callback,
logger,
op_state_callback=op_state_callback,
admin_mode_callback=admin_mode_callback,
obs_state_callback=obs_state_callback,
)
self.update_transitions(self._subarray_transitions)
'''
# add direct transition from EMPTY to another observing state.
self._action_breakdown["force_to_idle"] = ("force_to_idle", None)
self._action_breakdown["force_to_empty"] = ("force_to_empty", None)
self._action_breakdown["force_to_ready"] = ("force_to_ready", None)
self._action_breakdown["force_to_aborted"] = ("force_to_aborted", None)
self._action_breakdown["force_to_scanning"] = ("force_to_scanning", None)
# add transtions to the ObservationStateMachine
self._observation_state_machine.add_transition(trigger='force_to_empty', source='*', dest='EMPTY')
self._observation_state_machine.add_transition(trigger='force_to_idle', source='*', dest='IDLE')
self._observation_state_machine.add_transition(trigger='force_to_ready', source='*', dest='READY')
self._observation_state_machine.add_transition(trigger='force_to_scanning', source='*', dest='SCANNING')
self._observation_state_machine.add_transition(trigger='force_to_aborted', source='*', dest='ABORTED')
class CspSubarray(SKASubarray):
"""
@@ -115,6 +146,72 @@ class CspSubarray(SKASubarray):
"""
class ForceObsStateTransitionCommand(ActionCommand):
"""
Class to handle the re-initialization of the Device server.
"""
def __init__(self, target, state_model, logger=None):
"""
Constructor for ForceObsStateTransition.
It's not a TANGO command.
This command in invoked at CSP.LMC Subarray re-initialization.
:param target: the object that this command acts upon; for
example, the SKASubarray device for which this class
implements the command
:type target: object
:param state_model: the state model that this command uses
to check that it is allowed to run, and that it drives
with actions.
:type state_model: :py:class:`SKASubarrayStateModel`
:param logger: the logger to be used by this Command. If not
provided, then a default module logger will be used.
:type logger: a logger that implements the standard library
logger interface
"""
super().__init__(
target, state_model, "force", start_action=False, logger=logger
)
self.action = None
def __call__(self, argin=None):
"""
Override the __call__ method to set the action to execute when the succeeded method
is called.
"""
self.action = argin
super().__call__(argin)
def check_allowed(self):
"""
check_allowed is invoked before the do() in the command __call__ method.
The basic behavior is to check for a transition named 'force_succeeded'.
At re-initialization there are several 'succeeded' transitions, depending on
the expected observing state of the CSP Subarray, so it's necessary to modify
the succeeded method name with the one stored in the action attribute.
"""
self.logger.info("check for transition trigger {}".format(self.action))
self._succeeded_hook = self.action
return super().check_allowed()
def do(self, argin):
self.action = argin
return (ResultCode.OK, f"Executed {argin}")
def succeeded(self):
"""
Action to take on successful completion of device server
re-initialization.
"""
self.logger.info("Execute succeeded with arg {}".format(self.action))
if not self.action:
self.logger.info("Action not specified!!")
self.state_model.perform_action(self.action)
def failed(self):
self.state_model.perform_action("fatal_error")
class InitCommand(SKASubarray.InitCommand):
"""
A class for the SKASubarray's init_device() "command".
@@ -131,6 +228,8 @@ class CspSubarray(SKASubarray):
(result_code, message) = super().do()
device = self.target
self.logger.info("CspSubarray INIT COMMAND STARTED!!")
self.logger.info("CspSubarray obs_state: {}".format(device._obs_state))
device._build_state = '{}, {}, {}'.format(release.name, release.version, release.description)
device._version_id = release.version
# connect to CSP.LMC TANGO DB
@@ -214,6 +313,7 @@ class CspSubarray(SKASubarray):
# keys: the command name('on, 'off'...)
# values: thread instance
device._command_thread = {}
device._stop_thread = defaultdict(lambda: False)
# _end_scan_event: thread event to signal EndScan
device._end_scan_event = threading.Event()
@@ -308,18 +408,139 @@ class CspSubarray(SKASubarray):
#device._reserved_search_beam_num = 0
device._assigned_timing_beams= []
device._assigned_vlbi_beams = []
# Try connection with the CBF sub-array
device.connect_to_subarray_subcomponent(device.CbfSubarray)
# Try connection with the PSS sub-array
device.connect_to_subarray_subcomponent(device.PssSubarray)
device._command_thread['init'] = threading.Thread(target=self.initialize_thread,
name="Thread-Initialization ObsState Alignment",
args=())
device._command_thread['init'].start()
# to use the push model in command_inout_asynch (the one with the callback parameter),
# change the global TANGO model to PUSH_CALLBACK.
apiutil = tango.ApiUtil.instance()
apiutil.set_asynch_cb_sub_model(tango.cb_sub_model.PUSH_CALLBACK)
self.logger.info(message)
return (result_code, message)
return (ResultCode.STARTED, "CSP Subarray Init STARTED")
def initialize_thread(self):
try:
with EnsureOmniThread():
self.logger.info("Init thread started")
device = self.target
args = (device, device.state_model, self.logger)
device.force_cmd_obj = device.ForceObsStateTransitionCommand(*args)
on_handler = device.OnCommand(*args)
# Try connection with the CBF sub-array
device.connect_to_subarray_subcomponent(device.CbfSubarray)
# TODO: add connection to CSPMaster to get information
# on subarrayMembership for PSS, PST and VLBI beams.
# Try connection with the PSS sub-array
device.connect_to_subarray_subcomponent(device.PssSubarray)
# put the device to OFF/EMPTY: no transition is allowed from INIT state
self.succeeded()
if device._sc_subarray_state[device.CbfSubarray] is not DevState.ON:
return
# put the device to ON/EMPTY
on_handler.succeeded()
# CASE B: CSP is ON
target_obs_state = device.obs_state_evaluator()
if target_obs_state in ['RESOURCING', 'ABORTING', 'CONFIGURING', 'RESETTING','RESTARTING']:
self.logger.info("Still to implement transitional state different from SCANNINNG")
return
#self.monitor_running_command(target_obs_state)
self.logger.info('CSP is already ON. Aligning to Sub-elements...')
device.set_csp_obs_state(target_obs_state)
if target_obs_state == 'SCANNING':
return self.monitor_running_command(target_obs_state)
except Exception as msg:
self.logger.info(f'error in thread: {msg}')
def monitor_running_command(self, csp_obs_state):
"""
Helper method to monitor the CSP Subarray observing state at re-initialization if
the observing state is in a transitional state.
NOTE: Currently onlt the SCANNING obsState is handled.
:param csp_obs_state: the CSP.LMC Subarray observing state.
:type csp_obs_state: string
"""
device = self.target
if csp_obs_state == 'SCANNING':
args = (device, device.state_model, self.logger)
handler = device.ScanCommand(*args)
device._command_thread['scan'] = threading.Thread(target=handler.monitor_scan_execution,
name="Thread-Scan",
args=(device._sc_subarray_fqdn,))
device._cmd_execution_state['scan'] = CmdExecState.RUNNING
self.logger.info("Start scan thread")
device._command_thread['scan'].start()
def obs_state_evaluator(self):
"""
Helper method to evaluate the CSP Subarray observing state starting from the
SCM values of its components.
Criteria: If any one of the componts is in a transitional state, the CSP subarray
assumeis that value
If components are in a steady observing state, the values must be equal
otherwise the CSP Subarray is set in FAULT.
A component not ONLINE/MAINTENANCE does not contribute to the observing
state value.
:return: The observing state.
:rtype: string with the observing state name.
"""
target_obs_state = 'FAULT'
obs_states_list = []
allowed_coupled = {'RESOURCING': 'IDLE',
'RESOURCING': 'EMPTY',
'CONFIGURING': 'READY',
'SCANNING': 'READY',
'ABORTING': 'ABORTED',
'RESETTING': 'IDLE',
'RESTARTING': 'EMPTY'
}
transitional_states = allowed_coupled.keys()
for fqdn in self._sc_subarray_fqdn:
self.logger.info("fqdn {} admin mode: {}".format(fqdn, self._sc_subarray_admin_mode[fqdn]))
if self._sc_subarray_admin_mode[fqdn] not in [AdminMode.ONLINE,
AdminMode.MAINTENANCE
]:
continue
obs_state_name = ObsState(self._sc_subarray_obs_state[fqdn]).name
obs_states_list.append(obs_state_name)
self.logger.info("obs_state_evaluator: {}".format(obs_states_list))
# first creates an intersection list
transitional_present = set(transitional_states) & set(obs_states_list)
self.logger.info("transitional_present: {}".format(transitional_present))
# CASE 1: Transitional states NOT present
if not transitional_present:
# CASE 1.1: All obsStates are EQUAL
if len(set(obs_states_list)) == 1:
target_obs_state = obs_states_list[0]
else:
state =list(transitional_present)[0]
for sub_elt_obs_state in obs_states_list:
if sub_elt_obs_state is not (state or allowed_coupled[state]):
break
# CASE 2.2: Other obs_states are ALLOWED
else:
target_obs_state = state
self.logger.info("Evaluated CSP Subarray obsState: {}".format(target_obs_state))
# CASE 1: Transitional states NOT present
return target_obs_state
def set_csp_obs_state(self, state):
"""
Set the Subarray observing state to the specified value.
:param state: The target observing state value
:type state: string
:return: None
"""
try:
if state == 'FAULT':
self.force_cmd_obj.failed()
self.force_cmd_obj(f"force_to_{state.lower()}")
except StateModelError as state_error:
self.logger.warning(state_error)
class OnCommand(SKASubarray.OnCommand):
def do(self):
@@ -442,6 +663,7 @@ class CspSubarray(SKASubarray):
# the dictionary with the scan configuration
self.logger.info("ConfigureCommand at {}".format(time.time()))
target_device = self.target
try:
# if the stored configuration attribute is not empty, check
@@ -594,9 +816,10 @@ class CspSubarray(SKASubarray):
device_done = defaultdict(lambda:False)
# inside the end-less lop check the obsState of each sub-component
device_list = input_arg[0]
self.logger.info("Trhead started at {}".format(time.time()))
while True:
if target_device._abort_obs_event.is_set():
self.logger.info("Received and ABORT request during configuration")
self.logger.info("Received and ABORT request during configuration {}".format(time.time()))
command_progress = 0
for device in device_list:
self.logger.info("Current device {} obsState is {}".format(device,
@@ -619,8 +842,9 @@ class CspSubarray(SKASubarray):
target_device._reconfiguring))
if target_device._sc_subarray_obs_state[device] == dev_successful_state:
if not target_device._reconfiguring:
self.logger.info("Command {} ended with success on device {}.".format(cmd_name,
device))
self.logger.info("Command {} ended with success on device {} at {}.".format(cmd_name,
device,
time.time()))
# update the list and number of device that completed the task
target_device._num_dev_completed_task[cmd_name] += 1
target_device._list_dev_completed_task[cmd_name].append(device)
@@ -665,7 +889,7 @@ class CspSubarray(SKASubarray):
self.logger.info("device {} is in {}: reconfiguring is:{}".format(device, ObsState(target_device._sc_subarray_obs_state[device]).name,
target_device._reconfiguring))
if all(value == True for value in device_done.values()):
self.logger.info("All devices have been handled!")
self.logger.info("All devices have been handled at time {}!".format(time.time()))
break
# check for global timeout expiration
# may be this check is not necessary
@@ -679,6 +903,7 @@ class CspSubarray(SKASubarray):
# end of the while loop
# acquire the mutex during the check of configuration success/failure. We don't want
# to receive an boart during this phase otherwise could happen strange situation
self.logger.info("GOING To lock mutex at {}".format(time.time()))
with target_device._mutex_obs_state:
# check for timeout/failure conditions on each sub-component
if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT for device in device_list):
@@ -694,8 +919,8 @@ class CspSubarray(SKASubarray):
self.logger.info("CspSubarray timeout flag:{}".format(target_device._timeout_expired))
if target_device._abort_obs_event.is_set():
if target_device._timeout_expired or target_device._failure_raised:
return target_device.abort_cmd_obj.failed()
self.logger.info("Abort configure ends with success!!")
return self.failed()
self.logger.info("Abort configure ends with success!! {}".format(time.time()))
if all(target_device._sc_subarray_obs_state[fqdn] == ObsState.ABORTED for fqdn in device_list):
return target_device.abort_cmd_obj.succeeded()
return target_device.abort_cmd_obj.abort_monitoring(device_list)
@@ -704,16 +929,16 @@ class CspSubarray(SKASubarray):
# if failure/timeout found check if the CBF subarray is configured. In
# this case the CSP.LMC Subarray obsState is set to READY.
if target_device._sc_subarray_obs_state[target_device.CbfSubarray] == ObsState.READY:
return target_device.configure_cmd_obj.succeeded()
return self.succeeded()
self.logger.info("Configure ends with failure")
return target_device.configure_cmd_obj.failed()
return self.failed()
if all(target_device._sc_subarray_obs_state[fqdn] == ObsState.READY for fqdn in device_list):
target_device._valid_scan_configuration = input_arg[1]
target_device._cmd_duration_measured[cmd_name] = time.time() - command_start_time
target_device._cmd_progress[cmd_name] = 100
target_device._last_executed_command = cmd_name
self.logger.info("Configure ends with success!!")
return target_device.configure_cmd_obj.succeeded()
self.logger.info("Configure ends with success!! {}".format(time.time()))
return self.succeeded()
def validate_scan_configuration(self, argin):
"""
@@ -790,13 +1015,17 @@ class CspSubarray(SKASubarray):
target_device._command_thread['scan'] = threading.Thread(target=self.monitor_scan_execution,
name="Thread-Scan",
args=(target_device._sc_subarray_assigned_fqdn,))
self.logger.info("Thread scan: {}".format(target_device._command_thread['scan']))
target_device._cmd_execution_state['scan'] = CmdExecState.RUNNING
target_device._command_thread['scan'].start()
return (ResultCode.STARTED, "Scan command started")
def monitor_scan_execution(self, device_list):
self.logger.info("Starting scan thread")
cmd_name = 'scan'
target_device = self.target
target_device._end_scan_event.clear()
target_device._abort_obs_event.clear()
dev_successful_state = ObsState.READY
target_device._num_dev_completed_task[cmd_name] = 0
target_device._list_dev_completed_task[cmd_name] = []
@@ -812,12 +1041,14 @@ class CspSubarray(SKASubarray):
elapsed_time = 0
starting_time = time.time()
stop_scan = False
target_device._end_scan_event.clear()
target_device._abort_obs_event.clear()
# inside the end-less loop check the obsState of each sub-component
while True:
self.logger.info("abort:{}".format(target_device._abort_obs_event.is_set()))
self.logger.info("end:{}".format(target_device._end_scan_event.is_set()))
#self.logger.info("abort:{}".format(target_device._abort_obs_event.is_set()))
#self.logger.info("end:{}".format(target_device._end_scan_event.is_set()))
if target_device._stop_thread[cmd_name]:
target_device._stop_thread[cmd_name] = False
self.logger.info("STOPPING THE THREAD!!!")
return
if target_device._end_scan_event.is_set() or target_device._abort_obs_event.is_set():
if not stop_scan:
stop_scan = True
@@ -857,7 +1088,6 @@ class CspSubarray(SKASubarray):
def do(self):
target_device = self.target
device_list = target_device._sc_subarray_assigned_fqdn
self.logger.info("EndScan assigned_fqdn: {}".format(device_list))
if not any(target_device._sc_subarray_assigned_fqdn):
# need to add a check also on PSTBeams belonging to subarray
device_list = target_device._sc_subarray_fqdn
@@ -876,7 +1106,7 @@ class CspSubarray(SKASubarray):
self.logger.error("device {}: {}-{}".format(reply.dev_name(), err.desc, err.reason))
else:
(result_code,msg) = reply.get_data()
self.logger.error("device {}: {}".format(reply.dev_name(), msg))
self.logger.info("device {}: {}".format(reply.dev_name(), msg))
if any(target_device._sc_subarray_obs_state[device]== ObsState.FAULT for device in device_list):
return (ResultCode.FAILED, "EndScan Command FAILED")
return (ResultCode.OK, "EndScan command executed OK")
@@ -930,6 +1160,10 @@ class CspSubarray(SKASubarray):
device_done = defaultdict(lambda:False)
# inside the end-less loop check the obsState of each sub-component
while True:
if target_device._stop_thread[cmd_name]:
target_device._stop_thread[cmd_name] = False
self.logger.info("STOPPING THE THREAD!!!")
return
time.sleep(0.1)
for device in device_list:
if device_done[device] == True:
@@ -959,13 +1193,13 @@ class CspSubarray(SKASubarray):
# end of the while loop
# check for timeout/failure conditions on each sub-component
if target_device._failure_raised or target_device._timeout_expired:
return target_device.obsreset_cmd_obj.failed()
return self.failed()
if all(target_device._sc_subarray_obs_state[fqdn] == dev_successful_state for fqdn in device_list):
target_device._cmd_progress[cmd_name] = 100
target_device._last_executed_command = cmd_name
self.logger.info("ObsReset ends with success")
return target_device.obsreset_cmd_obj.succeeded()
return self.succeeded()
class AbortCommand(SKASubarray.AbortCommand):
@@ -1040,13 +1274,13 @@ class CspSubarray(SKASubarray):
# end of the while loop
# check for timeout/failure conditions on each sub-component
if target_device._failure_raised or target_device._timeout_expired:
return target_device.abort_cmd_obj.failed()
return self.failed()
if all(target_device._sc_subarray_obs_state[fqdn] == ObsState.ABORTED for fqdn in device_list):
target_device._cmd_progress[cmd_name] = 100
target_device._last_executed_command = cmd_name
self.logger.info("Abort ends with success")
return target_device.abort_cmd_obj.succeeded()
return self.succeeded()
class RestartCommand(SKASubarray.RestartCommand):
@@ -1089,6 +1323,10 @@ class CspSubarray(SKASubarray):
elapsed_time = 0
starting_time = time.time()
while True:
if target_device._stop_thread[cmd_name]:
target_device._stop_thread[cmd_name] = False
self.logger.info("STOPPING THE THREAD!!!")
return
for device in device_list:
if device_done[device] == True:
continue
@@ -1123,216 +1361,14 @@ class CspSubarray(SKASubarray):
# end of the while loop
# check for timeout/failure conditions on each sub-component
if target_device._failure_raised or target_device._timeout_expired:
return target_device.restart_cmd_obj.failed()
return self.failed()
if all(target_device._sc_subarray_obs_state[fqdn] == ObsState.EMPTY for fqdn in device_list):
target_device._cmd_progress[cmd_name] = 100
target_device._last_executed_command = cmd_name
self.logger.info("Restart ends with success")
return target_device.restart_cmd_obj.succeeded()
'''
class GoToIdleCommand(ActionCommand):
"""
A class for the CSPSubarray's GoToIdle() command.
"""
def __init__(self, target, state_model, logger=None):
super().__init__(
target, state_model, "goto_idle", start_action=True, logger=logger
)
def do(self):
"""
Stateless hook for GoToIdle() command functionality.
:return: A tuple containing a return code and a string
message indicating status. The message is for
information purpose only.
:rtype: (ResultCode, str)
"""
target_device = self.target
device_list = target_device._sc_subarray_assigned_fqdn
if not any(target_device._sc_subarray_assigned_fqdn):
# need to add a check also on PSTBeams belonging to subarray
device_list = target_device._sc_subarray_fqdn
for device in device_list:
# TODO: check if the device is running
# set to 3 sec. the duration expected
target_device._sc_subarray_cmd_duration_expected[device]['gotoidle'] = 3
target_device._cmd_duration_expected['gotoidle']
try:
proxy = target_device._sc_subarray_proxies[device]
# register the starting time for the command
target_device._sc_subarray_cmd_starting_time[device] = time.time()
target_device._timeout_expired = False
target_device._failure_raised = False
proxy.command_inout_asynch("GoToIdle", target_device._cmd_ended_cb)
# read the timeout attribute configured for this command
# if not implemented an AttributeError exception is thrown
# and the default value is used
target_device._sc_subarray_cmd_duration_expected[device]['gotoidle'] = proxy.gotoIdleCmdDurationExpected
except KeyError as key_err:
msg = "GoToIdle execution:no key {} found".format(str(key_err))
self.logger.warning(msg)
return (ResultCode.FAILED, msg)
except tango.DevFailed as tango_err:
msg = "GotToIdle execution: {}".format(tango_err.args[0].desc)
self.logger.warning(msg)
return (ResultCode.FAILED, msg)
except AttributeError as attr_err:
self.logger.info("Attribute {} not exported by device {}".format(str(attr_err),
device))
target_device._sc_subarray_cmd_exec_state[device]['gotoidle'] = CmdExecState.RUNNING
if target_device._sc_subarray_cmd_duration_expected[device]['gotoidle'] > target_device._cmd_duration_expected['gotoidle']:
target_device._cmd_duration_expected['gotoidle'] = target_device._sc_subarray_cmd_duration_expected[device]['gotoidle']
# invoke the constructor for the command thread
target_device._command_thread['gotoidle'] = threading.Thread(target=self._gotoidle,
name="Thread-GotoIdle",
args=(target_device._sc_subarray_assigned_fqdn,))
target_device._cmd_execution_state['gotoidle'] = CmdExecState.RUNNING
target_device._command_thread['gotoidle'].start()
# sleep for a while to let the thread start
#time.sleep(0.2)
# TODO:
# add some check on command exeuction: end state has to be IDLE for each
# sub-array sub-component
message = "GoToIdle command STARTED"
self.logger.info(message)
return (ResultCode.STARTED, message)
return self.succeeded()
def _gotoidle(self, device_list, **args_dict):
"""
Thread target function invoked from GoToIdle method.
It monitors the obsState value of each sub-array sub-component
looking for timeout or failure conditions.
:param device_list: the FQDNs of the sub-array sub-components
:return: None
"""
target_device = self.target
cmd_name = 'gotoidle'
dev_successful_state = ObsState.IDLE
# tango_cmd_name: is the TANGO command name with the capital letter
# In the dictionary keys, is generally used the command name in lower letters
target_device._num_dev_completed_task[cmd_name] = 0
target_device._list_dev_completed_task[cmd_name] = []
target_device._cmd_progress[cmd_name] = 0
target_device._cmd_duration_measured[cmd_name] = 0
# sub-component command execution measured time
sc_cmd_duration_measured = defaultdict(lambda:defaultdict(lambda:0))
# loop on the devices and issue asynchrnously the Configure command
command_progress = target_device._cmd_progress[cmd_name]
# flag to signal when configuration ends on a sub-array sub-component
device_done = defaultdict(lambda:False)
# inside the end-less lop check the obsState of each sub-component
while True:
for device in device_list:
elapsed_time = time.time() - target_device._sc_subarray_cmd_starting_time[device]
sc_cmd_duration_measured[device][cmd_name] = elapsed_time
self.logger.debug("Command {} obs_state: {}".format(cmd_name,
target_device._sc_subarray_obs_state[device]))
if device_done[device] == True:
continue
# if the sub-component execution flag is no more RUNNING, the command has
# ended with or without success. Go to check next device state.
if target_device._sc_subarray_obs_state[device] == dev_successful_state:
self.logger.info("Command {} ended with success on device {}.".format(cmd_name,
device))
# update the list and number of device that completed the task
target_device._num_dev_completed_task[cmd_name] += 1
target_device._list_dev_completed_task[cmd_name].append(device)
# reset the value of the attribute reporting the execution state of
# the command
target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE
target_device._sc_subarray_cmd_progress[device][cmd_name] = 100
# calculate the execution time for the command as the max value of all the execution times
if sc_cmd_duration_measured[device][cmd_name] >= target_device._cmd_duration_measured[cmd_name]:
target_device._cmd_duration_measured[cmd_name] = sc_cmd_duration_measured[device][cmd_name]
# command success: step to next device
device_done[device] = True
# check for timeout event. A timeout event can be detected in two ways:
# 1- the sub-element implements the 'onTimeoutExpired' attribute configured
# for change event
# 2- the CspMaster periodically checks the time elapsed from the start
# of the command: if the value is greater than the sub-element expected time
# for command execution, the sub-element command execution state is set
# to TIMEOUT
# Note: the second check, can be useful if the timeout event is not received
# (for example for a temporary connection timeout)
#elapsed_time = time.time() - self._sc_subarray_cmd_starting_time[device]
#sc_cmd_duration_measured[device][cmd_name] = elapsed_time
if (elapsed_time > target_device._sc_subarray_cmd_duration_expected[device][cmd_name] or
target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT):
msg = ("Timeout executing {} command on device {}".format(cmd_name, device))
self.logger.warning(msg)
target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.TIMEOUT
device_done[device] = True
self.logger.info("elapsed_time:{} device {}".format(elapsed_time, device))
# check if sub-element command ended throwing an exception: in this case the
# 'cmd_ended_cb' callback is invoked.
if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED:
# execution ended for this sub-element, skip to the next one
device_done[device] = True
# update the progress counter inside the loop taking into account the number of devices
# executing the command
target_device._cmd_progress[cmd_name] = command_progress+ target_device._sc_subarray_cmd_progress[device][cmd_name]/len(device_list)
self.logger.debug("Command {} on device {} obsState {}:".format(cmd_name,device,
target_device._sc_subarray_cmd_exec_state[device][cmd_name]))
if all(value == True for value in device_done.values()):
self.logger.info("All devices have been handled!")
break
self.logger.info("Sleeping...")
time.sleep(0.1)
# end of the while loop
# check for timeout/failure conditions on each sub-component
for device in device_list:
self.logger.info("Device {} running state is : {}".format(device,
target_device._sc_subarray_cmd_exec_state[device][cmd_name]))
if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT:
target_device._timeout_expired = True
if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED:
target_device._failure_raised = True
# reset sub-component execution flag
# update the progress counter at the end of the loop
target_device._cmd_progress[cmd_name] = command_progress + target_device._sc_subarray_cmd_progress[device][cmd_name]/len(device_list)
self.logger.info("1")
target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE
if target_device._failure_raised or target_device._timeout_expired:
return target_device.gotoidle_cmd_obj.failed()
self.logger.info("2")
if all(target_device._sc_subarray_obs_state[fqdn] == dev_successful_state for fqdn in device_list):
target_device._last_executed_command = cmd_name
# reset the CSP Subarray command execution flag
target_device._cmd_execution_state[cmd_name] = CmdExecState.IDLE
return target_device.gotoidle_cmd_obj.succeeded()
def check_allowed(self):
"""
Whether this command is allowed to be run in current device
state
:return: True if this command is allowed to be run in
current device state
:rtype: boolean
:raises: DevFailed if this command is not allowed to be run
in current device state
"""
if self.state_model.dev_state == DevState.ON:
return True
msg = "GoToIdle not allowed in State {}".format(self.state_model.dev_state)
tango.Except.throw_exception("API_CommandFailed",
msg,
"GoToIdle",
tango.ErrSeverity.ERR)
#if not self.state_model.obs_state in [ObsState.READY]:
# msg = "GoToIdle not allowed in obsState {}".format(self.state_model.obs_state)
# tango.Except.throw_exception("API_CommandFailed",
# msg,
# "GoToIdle",
# tango.ErrSeverity.ERR)
'''
class GoToIdleCommand(SKASubarray.EndCommand):
def do(self):
target_device = self.target
@@ -1440,9 +1476,9 @@ class CspSubarray(SKASubarray):
self.logger.info(log_msg)
# update CSP sub-array SCM
#07-2020: with the new base classes, transitions are handled via actions.
#if evt.attr_value.name.lower() in ["state", "healthstate", "adminmode", "obsstate"]:
#if evt.attr_value.name.lower() in ["obsstate"]:
# self.update_subarray_state()
if evt.attr_value.name.lower() == "healthstate":
if evt.attr_value.name.lower() in ["healthstate"]:
self._update_subarray_health_state()
except tango.DevFailed as df:
self.logger.error(str(df.args[0].desc))
@@ -1585,33 +1621,21 @@ class CspSubarray(SKASubarray):
Class protected method.
Retrieve the State attribute values of the CSP sub-elements and aggregate
them to build up the CSP global state.
This method should be called only when no command is running.
:param: None
:return: None
"""
self.logger.info("update_subarray_state")
self._update_subarray_health_state()
if self._command_thread:
a = [self._command_thread[cmd_name].is_alive() for cmd_name in self._command_thread.keys()]
self.logger.info("list of running threds:{}".format(a))
if any(self._command_thread[cmd_name].is_alive() for cmd_name in self._command_thread.keys()):
self.logger.info("A command is already running...the obsState is not updated")
return False
# CSP state reflects the status of CBF. Only if CBF is present
# CSP can work. The state of PSS and PST sub-elements only contributes
# to determine the CSP health state.
if self._sc_subarray_state[self.CbfSubarray] == DevState.OFF:
if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.EMPTY:
self.off_cmd_obj.succeeded()
if self._sc_subarray_state[self.CbfSubarray] == DevState.ON:
if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.EMPTY:
self.on_cmd_obj.succeeded()
if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.READY:
self.configure_cmd_obj.succeeded()
#self.set_state(self._sc_subarray_state[self.CbfSubarray])
#self.logger.info("Csp subarray state: {} obsState: {}".format(self.get_state(), self.state_model._obs_state))
return True
# check if a long-running command is in execution
for key, thread in self._command_thread.items():
if thread.is_alive():
self.logger.info("Tread {} is running".format(key))
return
target_obs_state = self.obs_state_evaluator()
if target_obs_state != self._obs_state:
self.set_csp_obs_state(target_obs_state)
def _update_subarray_health_state(self):
"""
@@ -1838,30 +1862,31 @@ class CspSubarray(SKASubarray):
# ----------------
# Class private methods
# ----------------
'''
def _init_state_model(self):
"""
Sets up the state model for the device
"""
self.state_model = CspSubarrayStateModel(
dev_state_callback=self._update_state,
logger=self.logger,
op_state_callback=self._update_state,
admin_mode_callback=self._update_admin_mode,
obs_state_callback=self._update_obs_state,
)
'''
def init_command_objects(self):
"""
Sets up the command objects
Sets up the command objects.
The init_command_method is called after InitCommand in the
SKABaseDevice class.
This means that the command object handler has to be defined into
the InitCommandClass.
"""
super().init_command_objects()
args = (self, self.state_model, self.logger)
self.configure_cmd_obj = self.ConfigureCommand(*args)
self.off_cmd_obj = self.OffCommand(*args)
self.on_cmd_obj = self.OnCommand(*args)
self.scan_cmd_obj = self.ScanCommand(*args)
self.gotoidle_cmd_obj = self.GoToIdleCommand(*args)
self.obsreset_cmd_obj = self.ObsResetCommand(*args)
self.abort_cmd_obj = self.AbortCommand(*args)
self.restart_cmd_obj = self.RestartCommand(*args)
self.register_command_object("GoToIdle", self.GoToIdleCommand(*args))
self.register_command_object("Configure", self.ConfigureCommand(*args))
self.register_command_object("Scan", self.ScanCommand(*args))
@@ -2321,6 +2346,12 @@ class CspSubarray(SKASubarray):
"""
# PROTECTED REGION ID(CspSubarray.delete_device) ENABLED START #
#release the allocated event resources
# check for running threads and stop them
for key, thread in self._command_thread.items():
is_alive = thread.is_alive()
if is_alive:
self._stop_thread[key] = True
thread.join()
event_to_remove = {}
for fqdn in self._sc_subarray_fqdn:
try:
@@ -2844,6 +2875,7 @@ class CspSubarray(SKASubarray):
:return:'DevVarLongStringArray'
"""
self.logger.info("CALL ABORT at time {}".format(time.time()))
with self._mutex_obs_state:
handler = self.get_command_object("Abort")
(result_code, message) = handler()
Loading