diff --git a/csp-lmc-common/csp_lmc_common/CspSubarray.py b/csp-lmc-common/csp_lmc_common/CspSubarray.py index a41ee64d2c79f2e5132ad7830b3572eb782c2a2b..cc81572551f689e008cf858cc34637f844800c01 100644 --- a/csp-lmc-common/csp_lmc_common/CspSubarray.py +++ b/csp-lmc-common/csp_lmc_common/CspSubarray.py @@ -125,18 +125,13 @@ class CspSubarray(SKASubarray): information purpose only. :rtype: (ResultCode, str) """ - super().do() + (result_code, message) = super().do() device = self.target device._build_state = '{}, {}, {}'.format(release.name, release.version, release.description) self.logger.info("Initial state is:{}".format(device.get_state())) device._version_id = release.version # connect to CSP.LMC TANGO DB - #device.set_state(tango.DevState.INIT) - #device._health_state = HealthState.UNKNOWN - #device._admin_mode = AdminMode.ONLINE - #device._obs_mode = ObsMode.IDLE - #device._obs_state = ObsState.IDLE # _config_delay_expected: inherited from the SKAObsDevice base class # Note: the _config_delay_expected could be set equal the max among the sub-elements # sub-arrays expected times for the command @@ -326,7 +321,7 @@ class CspSubarray(SKASubarray): message = "CspSubarray Init command completed OK" self.logger.info(message) - return (ResultCode.OK, message) + return (result_code, message) class OnCommand(SKASubarray.OnCommand): def do(self): @@ -371,7 +366,7 @@ class CspSubarray(SKASubarray): # state of the subarray is READY than subarray is not re-configured. if (stored_configuration == received_configuration) and (target_device._obs_state == ObsState.READY): self.logger.info("Subarray is going to use the same configuration") - return + return (ResultCode.OK, msg) except Exception as e: self.logger.warning(str(e)) # go ahead and parse the received configuration @@ -384,6 +379,9 @@ class CspSubarray(SKASubarray): msg, "Configure", tango.ErrSeverity.ERR) + target_device._reconfiguring = False + if target_device.state_model._obs_state == ObsState.READY: + target_device._reconfiguring = True # Forward the Configure command to the sub-elements # components (subarrays, pst beams) for device in target_device._sc_subarray_assigned_fqdn: @@ -422,7 +420,8 @@ class CspSubarray(SKASubarray): try: # read the timeout configured for the operation on the device - target_device._sc_subarray_duration_expected[device]['configurescan'] = proxy.configureDelayExpected + target_device._sc_subarray_cmd_duration_expected[device]['configurescan'] = target_device._get_expected_delay(proxy, "configureDelayExpected") + self.logger.info("config delay: {}".format(target_device._sc_subarray_cmd_duration_expected[device]['configurescan'])) except AttributeError as attr_err: self.logger.info("No attribute {} on device {}".format(str(attr_err), device)) try: @@ -538,6 +537,7 @@ class CspSubarray(SKASubarray): # Note: the second check, can be useful if the timeout event is not received # (for example for a temporary connection timeout) elapsed_time = time.time() - target_device._sc_subarray_cmd_starting_time[device] + self.logger.info("elapsed_time:{}".format(elapsed_time)) if (elapsed_time > target_device._sc_subarray_cmd_duration_expected[device][cmd_name] or target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT): @@ -548,7 +548,9 @@ class CspSubarray(SKASubarray): self.logger.info("elapsed_time:{} device {}".format(elapsed_time, device)) # check if sub-element command ended throwing an exception: in this case the # 'cmd_ended_cb' callback is invoked. - if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED: + if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED or \ + target_device._sc_subarray_obs_state[device] == ObsState.FAULT: + target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.FAILED # execution ended for this sub-element, skip to the next one device_done[device] = True # update the progress counter inside the loop taking into account the number of devices @@ -563,7 +565,6 @@ class CspSubarray(SKASubarray): if all(value == True for value in device_done.values()): self.logger.info("All devices have been handled!") break - self.logger.info("6") # check for global timeout expiration # may be this check is not necessary if target_device._cmd_duration_expected[cmd_name] < (time.time() - command_start_time): @@ -575,9 +576,9 @@ class CspSubarray(SKASubarray): time.sleep(0.1) # end of the while loop # check for timeout/failure conditions on each sub-component - if any(target_device._sc_subarray_cmd_exec_state[device] == CmdExecState.TIMEOUT for device in device_list): + if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT for device in device_list): target_device._timeout_expired = True - if any(target_device._sc_subarray_cmd_exec_state[device] == CmdExecState.FAILED for device in device_list): + if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED for device in device_list): target_device._failure_raised = True # reset sub-component execution flag target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE @@ -600,7 +601,8 @@ class CspSubarray(SKASubarray): target_device._cmd_duration_measured[cmd_name] = time.time() - command_start_time target_device._cmd_progress[cmd_name] = 100 target_device._last_executed_command = cmd_name - target_device.configure_cmd_obj.succeeded() + self.logger.info("Configure ends with success!!") + return target_device.configure_cmd_obj.succeeded() else: return target_device.configure_cmd_obj.failed() @@ -640,118 +642,128 @@ class CspSubarray(SKASubarray): class ScanCommand(SKASubarray.ScanCommand): def do(self, argin): - - device = self.target + target_device = self.target try: - device._scan_id = int(argin[0]) + target_device._scan_id = int(argin[0]) except (ValueError, Exception) as err: msg = "Scan command invalid argument:{}".format(str(err)) self.logging.error(msg) return (ResultCode.FAILED, msg) # invoke the constructor for the command thread - self.logger.info("Received Scan command with id:{}".format(self._scan_id)) - for device in device._sc_subarray_assigned_fqdn: + self.logger.info("Received Scan command with id:{}".format(target_device._scan_id)) + self.logger.info("sc_subarray_assigned_fqdn: {}".format(target_device._sc_subarray_assigned_fqdn)) + for device in target_device._sc_subarray_assigned_fqdn: try: - proxy = device._sc_subarray_proxies[device] - if not device._sc_subarray_event_id[device]['scancmdprogress']: + proxy = target_device._sc_subarray_proxies[device] + if not target_device._sc_subarray_event_id[device]['scancmdprogress']: evt_id = proxy.subscribe_event("scanCmdProgress", tango.EventType.CHANGE_EVENT, - device._attributes_change_evt_cb, + target_device._attributes_change_evt_cb, stateless=False) - device._sc_subarray_event_id[device]['scancmdprogress'] = evt_id + target_device._sc_subarray_event_id[device]['scancmdprogress'] = evt_id except KeyError as key_err: self.logger.warning("No key {} found".format(key_err)) except tango.DevFailed as tango_err: self.logger.info(tango_err.args[0].desc) try: - proxy.command_inout_asynch("Scan", argin[0], device._cmd_ended_cb) + self.logger.info("Forwarding scan") + proxy.command_inout_asynch("Scan", argin[0], target_device._cmd_ended_cb) + self.logger.info("after Forwarding scan") except tango.DevFailed as tango_err: self.logger.info(tango_err.args[0].desc) # TODO: add check on the failed device. If CBF # throw an exception. - # invoke the constructor for the command thread - device._command_thread['scan'] = threading.Thread(target=self.__monitor_scan_execution, + # invoke the constructor for the command thread + target_device._command_thread['scan'] = threading.Thread(target=self.__monitor_scan_execution, name="Thread-Scan", - args=(device._sc_subarray_assigned_fqdn,)) - device._cmd_execution_state['scan'] = CmdExecState.RUNNING - device._command_thread['scan'].start() - return (ResultCode.STARTED, "Scan command started") - - def __monitor_scan_execution(self, device_list): - cmd_name = 'scan' - target_device = self.target - dev_successful_state = ObsState.READY - target_device._num_dev_completed_task[cmd_name] = 0 - target_device._list_dev_completed_task[cmd_name] = [] - target_device._cmd_progress[cmd_name] = 0 + args=(target_device._sc_subarray_assigned_fqdn,)) + target_device._cmd_execution_state['scan'] = CmdExecState.RUNNING + target_device._command_thread['scan'].start() + return (ResultCode.STARTED, "Scan command started") + + def __monitor_scan_execution(self, device_list): + cmd_name = 'scan' + target_device = self.target + dev_successful_state = ObsState.READY + target_device._num_dev_completed_task[cmd_name] = 0 + target_device._list_dev_completed_task[cmd_name] = [] + target_device._cmd_progress[cmd_name] = 0 + for device in device_list: + target_device._failure_message[cmd_name] = '' + target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.RUNNING + self.logger.info("Device {} State {} expected value {}".format(device, + target_device._sc_subarray_obs_state[device], + dev_successful_state)) + # flag to signal when configuration ends on a sub-array sub-component + device_done = defaultdict(lambda:False) + elapsed_time = 0 + starting_time = time.time() + target_device._end_scan_event.clear() + # inside the end-less loop check the obsState of each sub-component + while True: + # Note: CbfSubarray changes the obsState value after forwarding the command + # (synchrnously) to FSP and VCC devices. This means that When the thread function enters, + # the loop, the obsState is still READY and the function exits immediately. + # Adding the wait delay here let the CbfSubarray to change the obsState and + # generate the event. + # Need to modify the CbfSubarray behavior + # TODO: add check of abort_command_event. + #if self._abort_command_event.is_set(): + # dev_successful_state = ObsState.IDLE + #if self._end_scan_event.is_set() or self._abort_command_event.is_set(): for device in device_list: - target_device._failure_message[cmd_name] = '' - target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.RUNNING - self.logger.info("Device {} State {} expected value {}".format(device, - target_device._sc_subarray_obs_state[device], - dev_successful_state)) - # flag to signal when configuration ends on a sub-array sub-component - device_done = defaultdict(lambda:False) - elapsed_time = 0 - starting_time = time.time() - self._end_scan_event.clear() - # inside the end-less lop check the obsState of each sub-component - while True: - # Note: CbfSubarray changes the obsState value after forwarding the command - # (synchrnously) to FSP and VCC devices. This means that When the thread function enters, - # the loop, the obsState is still READY and the function exits immediately. - # Adding the wait delay here let the CbfSubarray to change the obsState and - # generate the event. - # Need to modify the CbfSubarray behavior - # TODO: add check of abort_command_event. - #if self._abort_command_event.is_set(): - # dev_successful_state = ObsState.IDLE - #if self._end_scan_event.is_set() or self._abort_command_event.is_set(): - if target_device._end_scan_event.is_set(): - for device in device_list: - if device_done[device] == True: - continue - # if the sub-component execution flag is no more RUNNING, the command has - # ended with or without success. Go to check next device state. - if target_device._sc_subarray_obs_state[device] == dev_successful_state: - self.logger.info("Command {} ended with success on device {}.".format(cmd_name, - device)) - # update the list and number of device that completed the task - target_device._num_dev_completed_task[cmd_name] += 1 - target_device._list_dev_completed_task[cmd_name].append(device) - # reset the value of the attribute reporting the execution state of - # the command - target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE - target_device._sc_subarray_cmd_progress[device][cmd_name] = 100 - # command success: step to next device - device_done[device] = True - # check if sub-element command ended throwing an exception: in this case the - # 'cmd_ended_cb' callback is invoked. - if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED: - # execution ended for this sub-element, skip to the next one - device_done[device] = True - if any(device_done.values()) and all(value == True for value in device_done.values()): - self.logger.info("All devices have been handled!") - break - time.sleep(0.2) - - # end of the while loop - # check for timeout/failure conditions on each sub-component - if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED for device in device_list): - target_device._failure_raised = True - # reset sub-component execution flag - target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE - target_device.scan_cmd_obj.failed() - # update the progress counter at the end of the loop - if all(target_device._sc_subarray_obs_state[fqdn] == dev_successful_state for fqdn in device_list): - target_device._cmd_progress[cmd_name] = 100 - elapsed_time = time.time() - starting_time - self.logger.info("Scan elapsed time:{}".format(elapsed_time)) - target_device._cmd_execution_state[cmd_name] = CmdExecState.IDLE - target_device._last_executed_command = cmd_name - target_device.scan_cmd_obj.succeeded() - - class EndScanCommand(SKASubarray.ScanCommand): + if device_done[device] == True: + continue + # if the sub-component execution flag is no more RUNNING, the command has + # ended with or without success. Go to check next device state. + if target_device._sc_subarray_obs_state[device] == dev_successful_state: + if target_device._end_scan_event.is_set(): + self.logger.info("Command {} ended with success on device {}.".format(cmd_name, + device)) + # update the list and number of device that completed the task + target_device._num_dev_completed_task[cmd_name] += 1 + target_device._list_dev_completed_task[cmd_name].append(device) + # reset the value of the attribute reporting the execution state of + # the command + target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE + target_device._sc_subarray_cmd_progress[device][cmd_name] = 100 + # command success: step to next device + device_done[device] = True + # check if sub-element command ended throwing an exception: in this case the + # 'cmd_ended_cb' callback is invoked. + if target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED or\ + target_device._sc_subarray_obs_state[device] == ObsState.FAULT: + # execution ended for this sub-element, skip to the next one + target_device._failure_raised = True + device_done[device] = True + target_device._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE + # TODO: handle connection problems + #if target_device._sc_subarray_state[device] == DevState.UNKNOWN: + # self.logger.warning("Connection with device {} temporaly down".format(device)) + if any(device_done.values()) and all(value == True for value in device_done.values()): + self.logger.info("All devices have been handled!") + break + self.logger.info("Going to sleep") + time.sleep(0.2) + + # end of the while loop + target_device._cmd_execution_state[cmd_name] = CmdExecState.IDLE + # check for timeout/failure conditions on each sub-component + if any(target_device._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED for device in device_list): + target_device._failure_raised = True + #return target_device.scan_cmd_obj.failed() + return + # update the progress counter at the end of the loop + if all(target_device._sc_subarray_obs_state[fqdn] == dev_successful_state for fqdn in device_list): + target_device._cmd_progress[cmd_name] = 100 + elapsed_time = time.time() - starting_time + self.logger.info("Scan elapsed time:{}".format(elapsed_time)) + target_device._last_executed_command = cmd_name + #return target_device.scan_cmd_obj.succeeded() + self.logger.info("Exiting from the scan thread!") + return + + class EndScanCommand(SKASubarray.EndScanCommand): def do(self): target_device = self.target @@ -762,7 +774,10 @@ class CspSubarray(SKASubarray): for device in device_list: try: proxy = target_device._sc_subarray_proxies[device] - proxy.command_inout_asynch("EndScan", target_device._cmd_ended_cb) + #proxy.command_inout_asynch("EndScan", target_device._cmd_ended_cb) + # Note: at the moment EndScan does not support STARTED code so can't start + # asynchrnously + proxy.command_inout("EndScan") except KeyError as key_err: self.logger.warning("No key {} found".format(key_err)) except tango.DevFailed as tango_err: @@ -771,7 +786,7 @@ class CspSubarray(SKASubarray): # signal the failure raising the failure flag? # set the threading endScan event target_device._end_scan_event.set() - return (ResultCode.OK, "EndScan command executed OK") + return (ResultCode.STARTED, "EndScan command executed STARTED") ''' class AbortCommand(SKASubarray.AbortCommand): @@ -1058,6 +1073,7 @@ class CspSubarray(SKASubarray): if evt.attr_value.name.lower() in ["state", "healthstate", "adminmode", "obsstate"]: self.logger.info("call to update_subarray_state()") self.update_subarray_state() + self.logger.info("after update_subarray_state()") except tango.DevFailed as df: self.logger.error(str(df.args[0].desc)) except Exception as except_occurred: @@ -1152,15 +1168,17 @@ class CspSubarray(SKASubarray): if evt: if not evt.err: if evt.argout[0] == ResultCode.STARTED: - self.logger.info("Device {} is processing the command {}".format(evt.device, + self.logger.info("Device {} is processing the command {}".format(evt.device.dev_name(), evt.cmd_name)) if evt.argout[0] == ResultCode.OK: - self.logger.info("Device {} successfully processed the command {}".format(evt.device, + self.logger.info("Device {} successfully processed the command {}".format(evt.device.dev_name(), evt.cmd_name)) if evt.argout[0] == ResultCode.FAILED: - self.logger.info("Failure in Device {} while processing the command {}".format(evt.device, + self.logger.info("Failure in Device {} while processing the command {}".format(evt.device.dev_name(), evt.cmd_name)) self._sc_subarray_cmd_exec_state[evt.device.dev_name()][evt.cmd_name.lower()] = CmdExecState.FAILED + self.logger.info("sc_subarray_cmd_exec_state[{}][{}]:{}".format(evt.device.dev_name(), evt.cmd_name.lower(), + self._sc_subarray_cmd_exec_state[evt.device.dev_name()][evt.cmd_name.lower()])) self._failure_message[evt.cmd_name.lower()] += evt.argout[1] else: msg = "Error!!Command {} ended on device {}.\n".format(evt.cmd_name, @@ -1209,13 +1227,16 @@ class CspSubarray(SKASubarray): self.set_state(self._sc_subarray_state[self.CbfSubarray]) if self.get_state() == DevState.OFF: if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.EMPTY: + self.logger.info("1") self.off_cmd_obj.succeeded() if self.get_state() == DevState.ON: if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.EMPTY: + self.logger.info("2") self.on_cmd_obj.succeeded() if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.READY: self.configure_cmd_obj.succeeded() - self.logger.info("Csp subarray state: {} obsState: {}".format(self.get_state(), self.state_model.obs_state)) + self.logger.info("3") + #self.logger.info("Csp subarray state: {} obsState: {}".format(self.get_state(), self.state_model._obs_state)) return True def _update_subarray_health_state(self): @@ -1297,6 +1318,14 @@ class CspSubarray(SKASubarray): "Subarray obsState:{}".format( self._obs_state)) self.logger.info("Subarray ObsState:{}".format( self._obs_state)) ''' + def _open_connection(self, fqdn): + device_proxy = DeviceProxy(fqdn) + return device_proxy + + def _get_expected_delay(self, attr_name, proxy): + attr_value = proxy.read_attribute(attr_name) + return attr_value.value() + def connect_to_subarray_subcomponent(self, fqdn): """ @@ -1314,18 +1343,18 @@ class CspSubarray(SKASubarray): return # read the sub-componet adminMode (memorized) attribute from # the CSP.LMC TANGO DB. - #attribute_properties = self._csp_tango_db.get_device_attribute_property(fqdn, - # {'adminMode': ['__value']}) - #self.logger.debug("fqdn: {} attribute_properties: {}".format(fqdn, attribute_properties)) - #try: - # admin_mode_memorized = attribute_properties['adminMode']['__value'] - # self._sc_subarray_admin_mode[fqdn] = int(admin_mode_memorized[0]) - #except KeyError as key_error: - # self.logger.warning("No key {} found".format(str(key_error))) + attribute_properties = self._csp_tango_db.get_device_attribute_property(fqdn, + {'adminMode': ['__value']}) + self.logger.debug("fqdn: {} attribute_properties: {}".format(fqdn, attribute_properties)) + try: + admin_mode_memorized = attribute_properties['adminMode']['__value'] + self._sc_subarray_admin_mode[fqdn] = int(admin_mode_memorized[0]) + except KeyError as key_error: + self.logger.warning("No key {} found".format(str(key_error))) try: log_msg = "Trying connection to " + str(fqdn) + " device" self.logger.info(log_msg) - device_proxy = DeviceProxy(fqdn) + device_proxy = self._open_connection(fqdn) self.logger.info("fqdn: {} device_proxy: {}".format(fqdn, device_proxy)) # Note: The DeviceProxy is initialized even if the sub-component # device is not running (but defined into the TANGO DB! If not defined in the @@ -1364,11 +1393,11 @@ class CspSubarray(SKASubarray): stateless=True) self._sc_subarray_event_id[fqdn]['obsState'] = ev_id - ev_id = device_proxy.subscribe_event("obsMode", - EventType.CHANGE_EVENT, - self._sc_scm_change_event_cb, - stateless=True) - self._sc_subarray_event_id[fqdn]['obsMode'] = ev_id + #ev_id = device_proxy.subscribe_event("obsMode", + # EventType.CHANGE_EVENT, + # self._sc_scm_change_event_cb, + # stateless=True) + #self._sc_subarray_event_id[fqdn]['obsMode'] = ev_id except KeyError as key_err: log_msg = ("No key {} found".format(str(key_err))) diff --git a/csp-lmc-mid/csp_lmc_mid/MidCspSubarrayBase.py b/csp-lmc-mid/csp_lmc_mid/MidCspSubarrayBase.py index 3342002ae20485d7326231bca4cb3af4ba655314..2e53558838e484a9cbdfcd825a19c994da582784 100644 --- a/csp-lmc-mid/csp_lmc_mid/MidCspSubarrayBase.py +++ b/csp-lmc-mid/csp_lmc_mid/MidCspSubarrayBase.py @@ -23,6 +23,7 @@ import threading import time import json from collections import defaultdict +import logging # PROTECTED REGION END# //MidCspSubarrayBase.standardlibray_import @@ -46,6 +47,7 @@ from ska.base.control_model import HealthState, AdminMode, ObsState from csp_lmc_common.utils.cspcommons import CmdExecState #from csp_lmc_common.utils.decorators import AdminModeCheck, ObsStateCheck, SubarrayRejectCmd from csp_lmc_common.CspSubarray import CspSubarray, CspSubarrayStateModel +from csp_lmc_mid.receptors import Receptors # PROTECTED REGION END # // MidCspSubarrayBase.additionnal_import __all__ = ["MidCspSubarrayBase", "main"] @@ -80,27 +82,69 @@ class MidCspSubarrayBase(CspSubarray): information purpose only. :rtype: (ResultCode, str) """ - super().do() + (result_code, message) = super().do() device = self.target self.logger.info("Initial state is:{}".format(device.get_state())) device._assigned_vcc = [] device._assigned_fsp = [] - device._receptor_to_vcc_map = {} - device._receptor_id_list = [] + #device._receptor_to_vcc_map = {} + #device._receptor_id_list = [] + device._receptors = Receptors(device.CspMaster) message = "MidCspSubarray Init command completed OK" self.logger.info(message) - return (ResultCode.OK, message) + return (result_code, message) class OnCommand(CspSubarray.OnCommand): # NOTE: To remove when CBF Subarray implements the correct state def do(self): - self.logger.info("dev_State:{}".format(self.state_model.dev_state)) - super().do() - return (ResultCode.OK, "On command completed OK") + self.logger.info("MidCspSubarray On state:{}".format(self.state_model.dev_state)) + return super().do() + #return (ResultCode.OK, "On command completed OK") class AddReceptorsCommand(SKASubarray.AssignResourcesCommand): def do(self, argin): + self.logger.info("AddReceptors do method") device = self.target + # check if the CbfSubarray TANGO device is already running + # and the proxy registered + if not device._is_sc_subarray_running(device.CbfSubarray): + return (ResultCode.FAILED, log_msg) + proxy = device._sc_subarray_proxies[device.CbfSubarray] + device._reconfiguring = False + if self.state_model._obs_state == ObsState.IDLE: + device._reconfiguring = True + # forward the command to the CbfSubarray + try: + receptor_to_assign = self._validate_receptors_list(argin) + if not any(receptor_to_assign): + return (ResultCode.OK, "AddReceptors end OK") + proxy.command_inout_asynch("AddReceptors", receptor_to_assign, device._cmd_ended_cb) + + device._command_thread['addreceptors'] = threading.Thread(target=device._monitor_add_receptors, + name="Thread-AddReceptors", + args=(receptor_to_assign,)) + device._sc_subarray_cmd_starting_time[device.CbfSubarray] = time.time() + if device._cmd_execution_state['addreceptors'] != CmdExecState.FAILED: + device._cmd_execution_state['addreceptors'] = CmdExecState.RUNNING + device._command_thread['addreceptors'].start() + self.logger.info("AddReceptors obsState::{}".format(device.state_model.obs_state)) + return (ResultCode.STARTED, "AddReceptors Started") + except tango.DevFailed as tango_err: + self.logger.info("FAILURE!!") + tango.Except.throw_exception("Command failed", + tango_err.args[0].desc, + "AddReceptors", + tango.ErrSeverity.ERR) + return (ResultCode.FAILED, "Failure in AddReceptors") + + def _validate_receptors_list(self, argin): + """ + Validate the list of the receptor IDs. + The method check if: + - the receptor Ids are valid that is are within the [1, 198] range + - the receptor Ids are not already allocated to other subarray + """ + # PROTECTED REGION ID(CspSubarray.AddReceptors) ENABLED START # # Each vcc_id map to a vcc_fqdn inside CbfMaster, for example: # vcc_id = 1 -> mid_csp_cbf/vcc/vcc_001 @@ -111,48 +155,32 @@ class MidCspSubarrayBase(CspSubarray): # is built by CbfMaster and exported as attribute. # The max number of VCC allocated is defined by the VCC property of the CBF Master. - # the list of available receptor IDs. This number is mantained by the CspMaster - # and reported on request. + device = self.target available_receptors = [] # the list of receptor to assign to the subarray receptor_to_assign = [] + #receptors = Receptors(device.CspMaster) try: - # access to CspMaster to get information about the list of available receptors - # and the receptors affiliation to subarrays. - csp_master_proxy = DeviceProxy(device.CspMaster) - csp_master_proxy.ping() - available_receptors = csp_master_proxy.unassignedReceptorIDs - if not device._receptor_to_vcc_map: - cbf_master_addr = csp_master_proxy.cbfMasterAddress - cbf_master_proxy = tango.DeviceProxy(cbf_master_addr) - cbf_master_proxy.ping() - # build the list of receptor ids - receptor_to_vcc = cbf_master_proxy.receptorToVcc - device._receptor_to_vcc_map = dict([int(ID) for ID in pair.split(":")] - for pair in receptor_to_vcc) - # build the list of the installed receptors - device._receptor_id_list = list(device._receptor_to_vcc_map.keys()) + available_receptors = device._receptors.unassigned_ids() + self.logger.info("available_receptors: {}".format(available_receptors)) if not any(available_receptors): - log_msg = "No available receptor to add to subarray {}".format( - device.SubID) - self.logger.warning(log_msg) - return (ResultCode.OK, "AddResources OK") - receptor_membership = csp_master_proxy.receptorMembership + log_msg = "No available receptor to add to subarray {}".format( device.SubID) + self.logger.info(log_msg) + return [] + # the list of available receptor IDs. This number is mantained by the CspMaster + # and reported on request. + receptor_id_list = device._receptors.list_of_ids() + self.logger.info("receptor_list:{}".format(receptor_id_list)) + receptor_membership = device._receptors.subarray_affiliation() except tango.DevFailed as df: msg = "Failure in getting receptors information:" + \ str(df.args[0].reason) - return (ResultCode.FAILED, msg) - except AttributeError as attr_err: - msg = "Failure in reading {}: {}".format( - str(attr_err.args[0]), attr_err.__doc__) - tango.Except.throw_exception("AttributeError", - msg, - "AddReceptorsCommand", - tango.ErrSeverity.ERR) + tango.Except.throw_exception("Command failed", msg, + "AddReceptors", tango.ErrSeverity.ERR) for receptorId in argin: # check if the specified receptor id is a valid number (that is, belongs to the list # of provided receptors) - if receptorId in device._receptor_id_list: + if receptorId in receptor_id_list: # check if the receptor id is one of the available receptor Ids if receptorId in available_receptors: receptor_to_assign.append(receptorId) @@ -170,39 +198,14 @@ class MidCspSubarrayBase(CspSubarray): if not receptor_to_assign: log_msg = "No receptors to assign to the subarray" self.logger.info(log_msg) - return (ResultCode.OK, log_msg) + return [] - # check if the CbfSubarray TANGO device is already running - # and the proxy registered - if not device._is_sc_subarray_running(device.CbfSubarray): - log_msg = "Device {} is not running!".format(str(device.CbfSubarray)) - self.logger.error(log_msg) - return (ResultCode.FAILED, log_msg) - proxy = device._sc_subarray_proxies[device.CbfSubarray] # remove possible receptor repetition tmp = set(receptor_to_assign) receptor_to_assign = list(tmp) - # forward the command to the CbfSubarray - try: - proxy.command_inout_asynch("AddReceptors", receptor_to_assign, device._cmd_ended_cb) - - device._command_thread['addreceptors'] = threading.Thread(target=device._monitor_add_receptors, - name="Thread-AddReceptors", - args=(receptor_to_assign,)) - device._sc_subarray_cmd_starting_time[device.CbfSubarray] = time.time() - if device._cmd_execution_state['addreceptors'] != CmdExecState.FAILED: - device._cmd_execution_state['addreceptors'] = CmdExecState.RUNNING - device._command_thread['addreceptors'].start() - self.logger.info("AddReceptors obsState::{}".format(device.state_model.obs_state)) - return (ResultCode.STARTED, "AddReceptors Started") - except tango.DevFailed as tango_err: - tango.Except.throw_exception("Command failed", - tango_err.args[0].desc, - "AddReceptors", - tango.ErrSeverity.ERR) - return (ResultCode.FAILED, "Failure in AddReceptors") - + return receptor_to_assign # PROTECTED REGION END # // MidCspSubarrayBase.AddReceptors + #def succeeded(self): # self.logger.info("Eccomi") # action = "add_receptors_succeeded" @@ -291,7 +294,8 @@ class MidCspSubarrayBase(CspSubarray): device = self.target try: if len(device): - receptors = device._get_cbfsubarray_assigned_receptors() + #receptors = device._get_cbfsubarray_assigned_receptors() + receptors = assigned_receptors = self._receptors.assigned_to_subarray(self.SubID) return device._removereceptors_cmd_obj.do(receptors[:]) return (ResultCode.OK, "No receptor to remove") except tango.DevFailed as df: @@ -310,6 +314,10 @@ class MidCspSubarrayBase(CspSubarray): log_msg = ("Configure Command failure. Reason: {} " "Desc: {}".format(tango_err.args[0].reason, tango_err.args[0].desc)) + #tango.Except.throw_exception("Command failed", + # log_msg, + # "ConfigureScan execution", + # tango.ErrSeverity.ERR) self.logger.error(log_msg) return (ResultCode.FAILED, log_msg) @@ -389,7 +397,8 @@ class MidCspSubarrayBase(CspSubarray): self.logger.debug("Validate scan: {}".format(device._sc_subarray_assigned_fqdn)) def __len__(self): - assigned_receptors = self._get_cbfsubarray_assigned_receptors() + assigned_receptors = self._receptors.assigned_to_subarray(int(self.SubID)) + self.logger.info("len assigned_receptors:{}".format(assigned_receptors)) return len(assigned_receptors) def update_subarray_state(self): @@ -405,14 +414,22 @@ class MidCspSubarrayBase(CspSubarray): self.logger.info("MidCspSubarray") if CspSubarray.update_subarray_state(self): if self.get_state() == DevState.ON: + self.logger.info("1") if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.IDLE: + self.logger.info("2") self._addreceptors_cmd_obj.succeeded() - if self.get_state() == DevState.ON: if self._sc_subarray_obs_state[self.CbfSubarray] == ObsState.EMPTY: + self.logger.info("3") self._removereceptors_cmd_obj.succeeded() - self.logger.info("MidCsp subarray state: {} obsState: {}".format(self.get_state(), self.state_model.obs_state)) + self.logger.info("MidCsp subarray state: {} obsState: {}".format(self.get_state(), self.state_model._obs_state)) + self.logger.info("update_state MidCspSubarray end ") return True + def _open_connection(self, fqdn): + device_proxy = DeviceProxy(fqdn) + return device_proxy + + ''' def _get_cbfsubarray_assigned_receptors(self): """ @@ -428,6 +445,7 @@ class MidCspSubarrayBase(CspSubarray): except tango.DevFailed as tango_err: self.logger.warning("__monitor_add_receptors:{}".format(tango_err.args[0].desc)) return receptors + ''' def _monitor_add_receptors(self, receptors_to_be_added, args_dict=None): cmd_name = 'addreceptors' @@ -443,30 +461,36 @@ class MidCspSubarrayBase(CspSubarray): self.logger.info("Going to assign receptors {}".format(receptors_to_be_added)) while True: - # read the list of receptor IDs assigned to the CbfSubarray - receptors = self._get_cbfsubarray_assigned_receptors() - if len(receptors): - # get the ids in receptor_list that are also in receptor - receptors_assigned = [value for value in receptors_to_be_added if value in receptors] - self._num_dev_completed_task[cmd_name] = len(receptors_assigned) - if (len(receptors_assigned) == len(receptors_to_be_added)) and (self.get_state() == tango.DevState.ON): + self.logger.info("self._sc_subarray_obs_state:{}".format(self._sc_subarray_obs_state[device])) + if self._sc_subarray_obs_state[device] == ObsState.IDLE: + self.logger.info("Reconfiguring is:{}".format(self._reconfiguring)) + assigned_receptors = self._receptors.assigned_to_subarray(self.SubID) + self.logger.info("assigned_receptors:{}".format(assigned_receptors)) + if not self._reconfiguring: + self._num_dev_completed_task[cmd_name] = len(assigned_receptors) self.logger.info("All required receptors assigned!!") self._sc_subarray_cmd_progress[device][cmd_name] = 100 # calculate the real execution time for the command self._cmd_duration_measured[cmd_name] = ( time.time() - self._sc_subarray_cmd_starting_time[device]) break + if self._sc_subarray_obs_state[device] == ObsState.RESOURCING: + self._reconfiguring = False + self.logger.info("Reconfiguring is:{}".format(self._reconfiguring)) # check if sub-element command ended throwing an exception: in this case the # 'cmd_ended_cb' callback is invoked. - if self._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED: + if self._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED or\ + self._sc_subarray_obs_state[device] == ObsState.FAULT: self._failure_raised = True break elapsed_time = time.time() - \ self._sc_subarray_cmd_starting_time[device] + self.logger.info("elapsed_time: {}".format(elapsed_time)) + self.logger.info("_sc_subarray_cmd_duration_expected: {}".format(self._sc_subarray_cmd_duration_expected[device][cmd_name])) if (elapsed_time > self._sc_subarray_cmd_duration_expected[device][cmd_name] or self._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT): - msg = ("Timeout executing {} command on device {}".format( - cmd_name, device)) + self._timeout_expired = True + msg = ("Timeout executing {} command on device {}".format(cmd_name, device)) self.logger.warning(msg) self._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.TIMEOUT break @@ -479,21 +503,14 @@ class MidCspSubarrayBase(CspSubarray): self._last_executed_command = cmd_name # update the progress counter at the end of the loop self._cmd_progress[cmd_name] = self._sc_subarray_cmd_progress[device][cmd_name] + self._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE + self._cmd_execution_state[cmd_name] = CmdExecState.IDLE # check for error conditions - if self._sc_subarray_state[self.CbfSubarray] != tango.DevState.ON: - msg = ("AddReceptors: device {} is in {}" - " State".format(device, self._sc_subarray_state[self.CbfSubarray])) - self.logger.warning(msg) - if self._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT: - self._timeout_expired = True - self._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE - self._cmd_execution_state[cmd_name] = CmdExecState.IDLE - return (ResultCode.FAILED, "Timeout in receptor assignment!!") self.logger.info("CspSubarray failure flag:{}".format(self._failure_raised)) self.logger.info("CspSubarray timeout flag:{}".format(self._timeout_expired)) + if self._timeout_expired or self._failure_raised: + return self._addreceptors_cmd_obj.failed() # reset the command execution state - self._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE - self._cmd_execution_state[cmd_name] = CmdExecState.IDLE self.logger.info("AddReceptors end!") #self.AddReceptorsCommand.succeeded() return self._addreceptors_cmd_obj.succeeded() @@ -556,6 +573,8 @@ class MidCspSubarrayBase(CspSubarray): self._last_executed_command = cmd_name # update the progress counter at the end of the loop self._cmd_progress[cmd_name] = self._sc_subarray_cmd_progress[device][cmd_name] + self.logger.info("CspSubarray failure flag:{}".format(self._failure_raised)) + self.logger.info("CspSubarray timeout flag:{}".format(self._timeout_expired)) # check for error conditions if self._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.FAILED: self._failure_raised = True @@ -569,8 +588,6 @@ class MidCspSubarrayBase(CspSubarray): if self._sc_subarray_cmd_exec_state[device][cmd_name] == CmdExecState.TIMEOUT: self._timeout_expired = True return (ResultCode.FAILED, "RemoveReceptors ended with timeout") - self.logger.info("CspSubarray failure flag:{}".format(self._failure_raised)) - self.logger.info("CspSubarray timeout flag:{}".format(self._timeout_expired)) # reset the command exeuction state self._sc_subarray_cmd_exec_state[device][cmd_name] = CmdExecState.IDLE self._cmd_execution_state[cmd_name] = CmdExecState.IDLE @@ -731,6 +748,8 @@ class MidCspSubarrayBase(CspSubarray): # PROTECTED REGION ID(CspSubarray.vcc_read) ENABLED START # self._assigned_vcc.clear() # get the map VCC-receptor from CspMaster + #receptor_id_list = self._receptors.list_of_ids() + ''' if not self._receptor_to_vcc_map: try: csp_proxy = tango.DeviceProxy(self.CspMaster) @@ -746,12 +765,15 @@ class MidCspSubarrayBase(CspSubarray): self._receptor_id_list = list(self._receptor_to_vcc_map.keys()) except tango.DevFailed as tango_err: self.logger.warning(tango_err.args[0].desc) + ''' try: - assigned_receptors = self._sc_subarray_proxies[self.CbfSubarray].receptors + #assigned_receptors = self._sc_subarray_proxies[self.CbfSubarray].receptors + assigned_receptors = self._receptors.assigned_to_subarray(self.SubID) # NOTE: if receptors attribute is empty, assigned_receptors is an empty numpy array # and it will just be skipped by the for loop + receptor_to_vcc_map = self._receptors.receptor_to_vcc_map for receptor_id in assigned_receptors: - vcc_id = self._receptor_to_vcc_map[receptor_id] + vcc_id = receptor_to_vcc_map[receptor_id] self._assigned_vcc.append(vcc_id) except KeyError as key_err: msg = "No {} found".format(key_err) diff --git a/csp-lmc-mid/csp_lmc_mid/receptors.py b/csp-lmc-mid/csp_lmc_mid/receptors.py new file mode 100644 index 0000000000000000000000000000000000000000..2b0f3adfcc76f099b71d3c2662d84843eea2470a --- /dev/null +++ b/csp-lmc-mid/csp_lmc_mid/receptors.py @@ -0,0 +1,126 @@ +import tango +import logging +import time +from tango import DeviceProxy + +LOGGER = logging.getLogger(__name__) +class Receptors: + """ + Class to handle information about the Mid receptors. + """ + def __init__(self, csp_master_fqdn): + self._csp_master_fqdn = csp_master_fqdn + + def cbf_master_address(self): + """ + Return the address of the MID CspMaster holding the + information about receptors. + """ + try: + proxy = self.connect(self._csp_master_fqdn) + return proxy.cbfMasterAddress + except tango.DevFailed as tango_err: + tango.Except.re_throw_exception(tango_ex, + "Connection failed", + "Receptors class", + tango.ErrSeverity.ERR) + def connect(self, fqdn): + """ + Establish the connection with the specified + TANGO device + """ + try: + proxy = DeviceProxy(fqdn) + proxy.ping() + return proxy + except tango.DevFailed as tango_ex: + LOGGER.error(tango_ex.args[0].desc) + tango.Except.re_throw_exception(tango_ex, + "Connection failed", + "Receptors class", + tango.ErrSeverity.ERR) + def receptor_vcc_map(self): + """ + Return the dictionary with the map of VCC ids and + receptors ids. + """ + receptor_to_vcc_map= {} + try: + cbf_master_addr = self.cbf_master_address() + proxy = self.connect(cbf_master_addr) + receptor_to_vcc = proxy.receptorToVcc + receptor_to_vcc_map = dict([int(ID) for ID in pair.split(":")] + for pair in receptor_to_vcc) + return receptor_to_vcc_map + except tango.DevFailed as tango_ex: + LOGGER.error(tango_ex.args[0].desc) + tango.Except.re_throw_exception(tango_ex, + "Connection failed", + "Receptors class", + tango.ErrSeverity.ERR) + def list_of_ids(self): + """ + Return the list of the receptor ids deployed by the + system. + """ + try: + value = self.receptor_vcc_map() + return list(value.keys()) + except tango.DevFailed as tango_ex: + self.logging.error(tango_ex.args[0].desc) + tango.Except.re_throw_exception(tango_ex, + "Connection failed", + "Receptors class", + tango.ErrSeverity.ERR) + + def unassigned_ids(self): + """ + Return the list of the receptor ids not assigned to any + subarray. + """ + value = [] + try: + proxy = self.connect(self._csp_master_fqdn) + value = proxy.unassignedReceptorIDs + return value + except tango.DevFailed as tango_ex: + LOGGER.error(tango_ex.args[0].desc) + tango.Except.re_throw_exception(tango_ex, + "Connection failed", + "Receptors class", + tango.ErrSeverity.ERR) + + def subarray_affiliation(self): + """ + Return the list with the subarray affilition for each + receptor. + """ + value = [] + try: + proxy = self.connect(self._csp_master_fqdn) + value = proxy.receptorMembership + LOGGER.info("receptorMembership:{}".format(value)) + return value + except tango.DevFailed as tango_ex: + LOGGER.error(tango_ex.args[0].desc) + tango.Except.re_throw_exception(tango_ex, + "Connection failed", + "Receptors class", + tango.ErrSeverity.ERR) + + def assigned_to_subarray(self, sub_id): + """ + Return the list of the assigned receptors to the subarray. + """ + assigned_receptors = [] + try: + receptor_membership = self.subarray_affiliation() + assigned_receptors = [receptor_id for receptor_id, e in enumerate(receptor_membership) if e == int(sub_id)] + return assigned_receptors + except tango.DevFailed as tango_ex: + LOGGER.error(tango_ex.args[0].desc) + tango.Except.re_throw_exception(tango_ex, + "Connection failed", + "Receptors class", + tango.ErrSeverity.ERR) + diff --git a/csp-lmc-mid/tests/unit/midcspsubarray_unit_test.py b/csp-lmc-mid/tests/unit/midcspsubarray_unit_test.py new file mode 100644 index 0000000000000000000000000000000000000000..b09d729257b6c53b4030a3e88fd41e86805115f1 --- /dev/null +++ b/csp-lmc-mid/tests/unit/midcspsubarray_unit_test.py @@ -0,0 +1,699 @@ +import contextlib +import importlib +import sys +import os +import mock +import pytest +import tango +import time +from mock import Mock, MagicMock + +from ska.base.control_model import HealthState, ObsState +from ska.base.commands import ResultCode +from tango.test_context import DeviceTestContext +from tango import DevState, DevFailed, DeviceProxy +import csp_lmc_common +import csp_lmc_mid +from utils import Probe, Poller +from csp_lmc_common.CspSubarray import CspSubarray +from csp_lmc_mid.MidCspSubarrayBase import MidCspSubarrayBase + +file_path = os.path.dirname(os.path.abspath(__file__)) + +def test_midcspsubarray_state_and_obstate_value_AFTER_initialization(): + """ + Test the State and obsState values for the CpSubarray at the + end of the initialization process. + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + cbf_subarray_state_attr = 'State' + dut_properties = { + 'CspMaster':'mid_csp/elt/master', + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': 'mid_csp_pss/sub_elt/subarray_01', + 'PstSubarray': 'mid_csp_pst/sub_elt/subarray_01', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock + } + + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'state', DevState.OFF) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + # Note: the state is in ALARM because the device uses forwarded attributes. + #assert tango_context.device.State() == DevState.OFF + assert tango_context.device.obsState == ObsState.EMPTY + +def test_midcspsubarray_state_AFTER_On_WITH_exception_raised_by_subelement_subarray(): + """ + Test the behavior of the CspSubarray when one of the sub-element subarray + raises a DevFailed exception. + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + } + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: pss_subarray_device_proxy_mock + } + + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + cbf_subarray_device_proxy_mock.On.side_effect = raise_devfailed_exception + pss_subarray_device_proxy_mock.On.side_effect = return_ok + tango_context.device.On() + # Note: the state is in ALARM because the device uses forwarded attributes. + #assert tango_context.device.State() == DevState.FAULT + assert tango_context.device.obsState == ObsState.EMPTY + +def test_midcspsubarray_state_AFTER_On_command_WITH_failed_code_returned_by_subelement_subarray(): + """ + Test the behavior of the CspSubarray when on the sub-element subarray + return a FAILED code. + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + } + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: pss_subarray_device_proxy_mock, + + } + + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + cbf_subarray_device_proxy_mock.On.side_effect = return_failed + pss_subarray_device_proxy_mock.On.side_effect = return_ok + tango_context.device.On() + #assert tango_context.device.State() == DevState.FAULT + assert tango_context.device.obsState == ObsState.EMPTY + +def test_midcspsubarray_state_AFTER_On_command_WITH_failed_code_returned_by_pss_subarray(): + """ + Test the behavior of the CspSubarray when on the sub-element subarray + return a FAILED code. + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + } + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: pss_subarray_device_proxy_mock + } + + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + cbf_subarray_device_proxy_mock.On.side_effect = return_ok + pss_subarray_device_proxy_mock.On.side_effect = return_failed + tango_context.device.On() + #assert tango_context.device.State() == DevState.ON + assert tango_context.device.obsState == ObsState.EMPTY + assert tango_context.device.healthState == HealthState.DEGRADED + +def test_midcspsubarray_state_after_On_forwarded_to_subelement_subarray(): + """ + Test the behavior of the CspSubarray when on the sub-element subarray + return a FAILED code. + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + } + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: pss_subarray_device_proxy_mock, + } + + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + cbf_subarray_device_proxy_mock.On.side_effect = return_ok + pss_subarray_device_proxy_mock.On.side_effect = return_ok + tango_context.device.On() + #assert tango_context.device.State() == DevState.ON + assert tango_context.device.obsState == ObsState.EMPTY + +def test_midcspsubarray_obsstate_AFTER_add_receptors_return_failed(): + """ + Test the behavior of the MidCspSubarray when receptors are added to the + CBF Subarray. + Expected result: state = ON obsState=IDLE + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: pss_subarray_device_proxy_mock, + } + cbf_subarray_device_proxy_mock.command_inout_asynch.side_effect = ( + lambda command_name, argument, callback, *args, + **kwargs: event_subscription_map.update({command_name: callback})) + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + with mock.patch('csp_lmc_mid.receptors.Receptors.assigned_to_subarray') as mock_assigned_to_subarray : + cbf_subarray_device_proxy_mock.On.side_effect = return_ok + pss_subarray_device_proxy_mock.On.side_effect = return_ok + tango_context.device.On() + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obsstate',ObsState.EMPTY) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + mock_assigned_to_subarray.return_value = [] + tango_context.device.AddReceptors([1,2,11,17,21,23]) + dummy_event = command_callback_with_event_error("AddReceptors", cbf_subarray_fqdn) + event_subscription_map["AddReceptors"](dummy_event) + prober_obs_state = Probe(tango_context.device, 'obsState', ObsState.FAULT, f"Wrong CspSubarray state") + Poller(3, 0.1).check(prober_obs_state) + +''' +def test_midcspsubarray_obsstate_AFTER_add_receptors_raise_exception(): + """ + Test the behavior of the MidCspSubarray when receptors are added to the + CBF Subarray. + Expected result: state = ON obsState=IDLE + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: pss_subarray_device_proxy_mock, + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock.command_inout_asynch.side_effect = ( + lambda command_name, argument, callback, *args, **kwargs: event_subscription_map.update({command_name: raise_devfailed_exception})) + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: + cbf_subarray_device_proxy_mock.On.side_effect = return_ok + pss_subarray_device_proxy_mock.On.side_effect = return_ok + tango_context.device.On() + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obsstate',ObsState.EMPTY) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + tango_context.device.AddReceptors([1,2,11,17,21,23]) + dummy_event = raise_exception("AddReceptors") + event_subscription_map["AddReceptors"](dummy_event) + assert tango_context.device.obsState == ObsState.FAULT +''' + +def test_midcspsubarray_obsstate_AFTER_add_receptors_to_cbf_subarray(): + """ + Test the behavior of the MidCspSubarray when receptors are added to the + CBF Subarray. + Expected result: state = ON obsState=IDLE + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: cbf_subarray_device_proxy_mock, + } + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + cbf_subarray_device_proxy_mock.On.side_effect = return_ok + pss_subarray_device_proxy_mock.On.side_effect = return_ok + tango_context.device.On() + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obstate',ObsState.IDLE) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + with mock.patch('csp_lmc_mid.receptors.Receptors.list_of_ids') as mock_list_of_ids, \ + mock.patch('csp_lmc_mid.receptors.Receptors.unassigned_ids') as mock_unassigned_ids, \ + mock.patch('csp_lmc_mid.receptors.Receptors.assigned_to_subarray') as mock_assigned_to_subarray,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.__len__') as mock_len: + mock_list_of_ids.return_value = [1,2,3,4] + mock_unassigned_ids.return_value = [1,2] + mock_assigned_to_subarray.return_value = [1,2] + mock_len.return_value = len([1,2]) + tango_context.device.AddReceptors([1,2]) + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obsstate',ObsState.RESOURCING) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obsstate',ObsState.IDLE) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + assert tango_context.device.obsState == ObsState.IDLE + +def test_midcspsubarray_obsstate_AFTER_configure(): + """ + Test the behavior of the MidCspSubarray when the command Configure + is issued on the CBF Subarray. + Expected result: state = ON obsState=READY + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: cbf_subarray_device_proxy_mock, + } + receptor_list = [1,2,3] + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + with mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.AddReceptorsCommand.do') as mock_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.OnCommand.do') as mock_on_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase._get_expected_delay') as mock_delay_expected,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.__len__') as mock_len: + mock_len.return_value = len(receptor_list) + mock_do.return_value = (ResultCode.OK, "AddReceptors OK") + mock_on_do.return_value = (ResultCode.OK, "On command OK") + mock_delay_expected.return_value = 2 + tango_context.device.On() + tango_context.device.AddReceptors(receptor_list) + #assert tango_context.device.obsState == ObsState.IDLE + cbf_subarray_device_proxy_mock.configureDelay.return_value = 10 + configuration_string = load_json_file("test_ConfigureScan_ADR4.json") + tango_context.device.Configure(configuration_string) + #assert tango_context.device.obsState == ObsState.CONFIGURING + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obsstate',ObsState.READY) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + prober_obs_state = Probe(tango_context.device, 'obsState', ObsState.READY, f"Wrong CspSubarray state") + Poller(3, 0.1).check(prober_obs_state) + assert tango_context.device.obsState == ObsState.READY + +def test_midcspsubarray_obsstate_AFTER_configure_WITH_wrong_json(): + """ + Test the behavior of the MidCspSubarray when the command Configure + is issued a wrong configuration file. + Expected result: state = ON obsState=IDLE + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: cbf_subarray_device_proxy_mock, + } + receptor_list = [1,2,3] + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + with mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.AddReceptorsCommand.do') as mock_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.OnCommand.do') as mock_on_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.__len__') as mock_len: + mock_len.return_value = len(receptor_list) + mock_do.return_value = (ResultCode.OK, "AddReceptors OK") + mock_on_do.return_value = (ResultCode.OK, "On command OK") + tango_context.device.On() + tango_context.device.AddReceptors(receptor_list) + configuration_string = load_json_file("test_ConfigureScan_without_configID.json") + tango_context.device.Configure(configuration_string) + assert tango_context.device.obsState == ObsState.FAULT + +def test_midcspsubarray_obsstate_AFTER_configure_WITH_timeout(): + """ + Test the behavior of the MidCspSubarray when the command Configure + detect a timeout condition. + Expected result: state = ON obsState=FAULT + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: cbf_subarray_device_proxy_mock, + } + receptor_list = [1,2,3] + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + cbf_subarray_device_proxy_mock.Configure.side_effect = return_failed + with mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.AddReceptorsCommand.do') as mock_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.OnCommand.do') as mock_on_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase._get_expected_delay') as mock_delay_expected,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.__len__') as mock_len: + mock_len.return_value = len(receptor_list) + mock_do.return_value = (ResultCode.OK, "AddReceptors OK") + mock_on_do.return_value = (ResultCode.OK, "On command OK") + mock_delay_expected.return_value = 2 + tango_context.device.On() + tango_context.device.AddReceptors(receptor_list) + #assert tango_context.device.obsState == ObsState.IDLE + configuration_string = load_json_file("test_ConfigureScan_ADR4.json") + tango_context.device.Configure(configuration_string) + #assert tango_context.device.obsState == ObsState.CONFIGURING + prober_obs_state = Probe(tango_context.device, 'timeoutExpiredFlag', True, f"Wrong CspSubarray state") + Poller(3, 0.1).check(prober_obs_state) + assert tango_context.device.obsState == ObsState.FAULT + +def test_midcspsubarray_obsstate_AFTER_configure_WITH_cbf_in_FAULT(): + """ + Test the behavior of the MidCspSubarray when the command Configure + detect a failure condition. + Expected result: state = ON obsState=FAULT + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: cbf_subarray_device_proxy_mock, + } + receptor_list = [1,2,3] + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, + proxies_to_mock=proxies_to_mock) as tango_context: + cbf_subarray_device_proxy_mock.Configure.side_effect = return_failed + with mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.AddReceptorsCommand.do') as mock_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.OnCommand.do') as mock_on_do,\ + mock.patch('csp_lmc_common.CspSubarray.CspSubarray._get_expected_delay') as mock_delay_expected,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.__len__') as mock_len: + mock_len.return_value = len(receptor_list) + mock_do.return_value = (ResultCode.OK, "AddReceptors OK") + mock_on_do.return_value = (ResultCode.OK, "On command OK") + mock_delay_expected.return_value = 2 + tango_context.device.On() + tango_context.device.AddReceptors(receptor_list) + configuration_string = load_json_file("test_ConfigureScan_ADR4.json") + tango_context.device.Configure(configuration_string) + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obsstate',ObsState.FAULT) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + prober_obs_state = Probe(tango_context.device, 'failureRaisedFlag', True, f"Wrong CspSubarray state") + Poller(3, 0.1).check(prober_obs_state) + assert tango_context.device.obsState == ObsState.FAULT + +def test_midcspsubarray_obsstate_AFTER_configure_WITH_cbf_returning_FAULT(): + """ + Test the behavior of the MidCspSubarray when the command Configure + detect a failure condition. + Expected result: state = ON obsState=FAULT + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + cbf_subarray_device_proxy_mock.command_inout_asynch.side_effect = ( + lambda command_name, argument, callback, *args, + **kwargs: event_subscription_map.update({command_name: callback})) + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: cbf_subarray_device_proxy_mock, + } + receptor_list = [1,2,3] + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: + with mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.AddReceptorsCommand.do') as mock_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.OnCommand.do') as mock_on_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.__len__') as mock_len: + mock_len.return_value = len(receptor_list) + mock_do.return_value = (ResultCode.OK, "AddReceptors OK") + mock_on_do.return_value = (ResultCode.OK, "On command OK") + tango_context.device.On() + tango_context.device.AddReceptors(receptor_list) + configuration_string = load_json_file("test_ConfigureScan_ADR4.json") + tango_context.device.Configure(configuration_string) + dummy_event = command_callback_with_event_error("ConfigureScan", cbf_subarray_fqdn) + event_subscription_map["ConfigureScan"](dummy_event) + prober_obs_state = Probe(tango_context.device, 'failureRaisedFlag', True, f"Wrong CspSubarray state") + Poller(3, 0.1).check(prober_obs_state) + assert tango_context.device.obsState == ObsState.FAULT + +''' +def test_midcspsubarray_obsstate_AFTER_end_scan(): + """ + Test the behavior of the MidCspSubarray when the command Configure + is issued on the CBF Subarray. + Expected result: state = ON obsState=READY + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: cbf_subarray_device_proxy_mock, + } + receptor_list = [1,2,3] + cbf_subarray_device_proxy_mock.Scan.side_effect = return_started + #cbf_subarray_device_proxy_mock.Scan.side_effect = return_ok + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: + with mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.AddReceptorsCommand.do') as mock_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.OnCommand.do') as mock_on_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.__len__') as mock_len: + mock_len.return_value = len(receptor_list) + mock_do.return_value = (ResultCode.OK, "AddReceptors OK") + mock_on_do.return_value = (ResultCode.OK, "On command OK") + #mock_configure_do.return_value = (ResultCode.OK, "Configure command OK") + tango_context.device.On() + tango_context.device.AddReceptors(receptor_list) + configuration_string = load_json_file("test_ConfigureScan_ADR4.json") + tango_context.device.Configure(configuration_string) + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obsstate',ObsState.READY) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + prober_obs_state = Probe(tango_context.device, 'obsState', ObsState.READY, f"Wrong CspSubarray state") + Poller(3, 0.1).check(prober_obs_state) + tango_context.device.Scan(["1",]) + time.sleep(1) + tango_context.device.EndScan() + dummy_event = create_dummy_event(cbf_subarray_fqdn, 'obsstate',ObsState.READY) + event_subscription_map[cbf_subarray_state_attr](dummy_event) + prober_obs_state = Probe(tango_context.device, 'obsState', ObsState.READY, f"Wrong CspSubarray state") + Poller(3, 0.1).check(prober_obs_state) + assert tango_context.device.obsState == ObsState.READY +''' + +def test_midcspsubarray_obsreset(): + """ + Test the behavior of the MidCspSubarray when the command ObsReset + is issued. + Expected result: state = ON obsState=IDLE + """ + device_under_test = MidCspSubarrayBase + cbf_subarray_fqdn = 'mid_csp_cbf/sub_elt/subarray_01' + pss_subarray_fqdn = 'mid_csp_pss/sub_elt/subarray_01' + cbf_subarray_state_attr = 'obsState' + dut_properties = { + 'CbfSubarray': cbf_subarray_fqdn, + 'PssSubarray': pss_subarray_fqdn, + 'CspMaster':'mid_csp/elt/master', + 'SubID': '1', + } + event_subscription_map = {} + cbf_subarray_device_proxy_mock = Mock() + pss_subarray_device_proxy_mock = Mock() + cbf_subarray_device_proxy_mock.subscribe_event.side_effect = ( + lambda attr_name, event_type, callback, *args, **kwargs: event_subscription_map.update({attr_name: callback})) + proxies_to_mock = { + cbf_subarray_fqdn: cbf_subarray_device_proxy_mock, + pss_subarray_fqdn: cbf_subarray_device_proxy_mock, + } + receptor_list = [1,2,3] + with fake_tango_system(device_under_test, initial_dut_properties=dut_properties, proxies_to_mock=proxies_to_mock) as tango_context: + with mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.AddReceptorsCommand.do') as mock_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.OnCommand.do') as mock_on_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.ConfigureCommand.do') as mock_configure_do,\ + mock.patch('csp_lmc_mid.MidCspSubarrayBase.MidCspSubarrayBase.__len__') as mock_len: + mock_len.return_value = len(receptor_list) + mock_do.return_value = (ResultCode.OK, "AddReceptors OK") + mock_on_do.return_value = (ResultCode.OK, "On command OK") + mock_configure_do.return_value = (ResultCode.FAILED, "Configure command FAILED") + tango_context.device.On() + tango_context.device.AddReceptors(receptor_list) + tango_context.device.Configure("") + assert tango_context.device.obsState == ObsState.FAULT + tango_context.device.ObsReset() + assert tango_context.device.obsState == ObsState.IDLE + +def return_ok(): + """ + Return a FAILED code in the execution of a device method. + """ + message = "CBF Subarray Oncommand OK" + return (ResultCode.OK, message) + +def return_started(): + """ + Return a FAILED code in the execution of a device method. + """ + message = "CBF Subarray Oncommand OK" + return (ResultCode.STARTED, message) + +def return_failed(): + """ + Return a FAILED code in the execution of a device method. + """ + print("return failed") + return (ResultCode.FAILED, "On Command failed") + +def raise_devfailed_exception(): + """ + Raise an exception to test the failure of a device command + """ + print("raise_devfailed_exception") + tango.Except.throw_exception("Commandfailed", "This is error message for devfailed", + " ", tango.ErrSeverity.ERR) +def mock_event_dev_name(device_name): + return device_name + +def create_dummy_event(cbf_subarray_fqdn, attr_name, event_value): + """ + Create a mocked event object to test the event callback method + associate to the attribute at subscription. + param: cbf_subarray_fqdn the CBF Subarray FQDN + event_value the expected value + return: the fake event + """ + fake_event = Mock() + fake_event.err = False + fake_event.attr_name = f"{cbf_subarray_fqdn}/{attr_name}" + fake_event.attr_value.value = event_value + fake_event.attr_value.name = attr_name + fake_event.device.name = cbf_subarray_fqdn + fake_event.device.dev_name.side_effect=(lambda *args, **kwargs: mock_event_dev_name(cbf_subarray_fqdn)) + return fake_event + +def command_callback_with_event_error(command_name, cbf_subarray_fqdn): + print("command_callback_with_event_error:{}".format(command_name, cbf_subarray_fqdn)) + fake_event = MagicMock() + fake_event.err = False + fake_event.cmd_name = f"{command_name}" + fake_event.device.dev_name.side_effect=(lambda *args, **kwargs: mock_event_dev_name(cbf_subarray_fqdn)) + fake_event.argout = [ResultCode.FAILED, "Command Failed"] + return fake_event + +def raise_exception(command_name): + return raise_devfailed_exception() + +def load_json_file(file_name): + print(file_path) + print(os.path.dirname(__file__)) + path= os.path.join(os.path.dirname(__file__), '../acceptance_tests' , file_name) + with open(path, 'r') as f: + configuration_string = f.read().replace("\n", "") + return configuration_string + +@contextlib.contextmanager +def fake_tango_system(device_under_test, initial_dut_properties={}, proxies_to_mock={}, + device_proxy_import_path='tango.DeviceProxy'): + + with mock.patch(device_proxy_import_path) as patched_constructor: + patched_constructor.side_effect = lambda device_fqdn: proxies_to_mock.get(device_fqdn, Mock()) + patched_module = importlib.reload(sys.modules[device_under_test.__module__]) + + device_under_test = getattr(patched_module, device_under_test.__name__) + + device_test_context = DeviceTestContext(device_under_test, properties=initial_dut_properties) + device_test_context.start() + yield device_test_context + device_test_context.stop()