diff --git a/noctua/devices/alpaca2.py b/noctua/devices/alpaca2.py new file mode 100644 index 0000000000000000000000000000000000000000..0c960930d3d4a82883680d655acfdfca2e072e58 --- /dev/null +++ b/noctua/devices/alpaca2.py @@ -0,0 +1,591 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Interface with an ASCOM Alpaca device. +""" + +# System modules +import time +from urllib.parse import urlencode + +# Third-party modules +import requests + +# Other templates +from ..config.constants import dome_park_position +from ..utils import check +from ..utils.logger import log +from .basedevice import BaseDevice + + +class AlpacaDevice(BaseDevice): + """Base wrapper class for ASCOM Alpaca devices.""" + + def __init__(self, url, dev=None, device_number=0): + """ + Initializes the Alpaca device. + + Parameters + ---------- + url : str + The base URL of the Alpaca server (e.g., "http://192.168.1.100:11111"). + dev : str, optional + The device type (e.g., "telescope", "dome"). + device_number : int, optional + The device number on the Alpaca server, by default 0. + """ + super().__init__(url) + self.url = url + self._dev = dev + self.device_number = device_number + self.timeout = 10 + + @property + def addr(self): + """ + str: The full API endpoint address for the device. + """ + return f"{self.url}/api/v1/{self._dev}/{self.device_number}" + + @check.request_errors + def get(self, method, params=None, timeout=None): + """ + Sends a HTTP GET request to the device. + + Parameters + ---------- + method : str + The Alpaca API method name (e.g., "temperature"). + params : dict, optional + A dictionary of query parameters. + timeout : int, optional + Request timeout in seconds. Defaults to the class timeout. + + Returns + ------- + any or None + The "Value" field from the Alpaca JSON response, or None on error. + """ + if params is None: + params = {} + if timeout is None: + timeout = self.timeout + + res = requests.get(f"{self.addr}/{method}", params=params, timeout=timeout) + res.raise_for_status() + + resj = res.json() + if resj["ErrorNumber"] != 0: + msg = f'ASCOM {resj["ErrorNumber"]}: {resj["ErrorMessage"]}' + log.error(msg) + self.error.append(msg) + return None + + return resj.get("Value", None) + + @check.request_errors + def put(self, method, data=None, timeout=None): + """ + Sends a HTTP PUT request to the device. + + Parameters + ---------- + method : str + The Alpaca API method name (e.g., "connected"). + data : dict, optional + A dictionary of form data to send. + timeout : int, optional + Request timeout in seconds. Defaults to the class timeout. + + Returns + ------- + any or None + The "Value" field if present, True on success, or None on error. + """ + if data is None: + data = {} + if timeout is None: + timeout = self.timeout + + res = requests.put(f"{self.addr}/{method}", data=data, timeout=timeout) + res.raise_for_status() + + resj = res.json() + if resj["ErrorNumber"] != 0: + msg = f'ASCOM {resj["ErrorNumber"]}: {resj["ErrorMessage"]}' + log.error(msg) + self.error.append(msg) + return None + + return resj.get("Value", True) + + @property + def name(self): + """str or None: The name of the device.""" + + res = self.get("name") + if self.error: + return None + return res + + @property + def connection(self): + """bool or None: The connection status of the device.""" + + res = self.get("connected") + if self.error: + return None + return res + + @connection.setter + def connection(self, b): + """ + Sets the connection status of the device. + + Parameters + ---------- + b : bool + True to connect, False to disconnect. + """ + + data = {"Connected": str(b)} + self.put("connected", data=data) + + def command(self, cmd_string, raw=False, action="commandstring"): + """ + Sends a command string to the device via a non-standard action. + + Note + ---- + This method uses a custom 'commandstring' or 'commandblind' action, + which is a non-standard Alpaca extension. Its use indicates that this + driver is tailored to a specific server implementation that tunnels + other protocols (like OpenTSI) through Alpaca. + + Parameters + ---------- + cmd_string : str + The command string to execute. + raw : bool, optional + Indicates if the command is a raw command, by default False. + action : str, optional + The Alpaca action to use, either "commandstring" or "commandblind". + + Returns + ------- + any or None + The result of the command, or None on error. + """ + data = {"Command": cmd_string, "Raw": str(raw)} + res = self.put(action, data=data) + if self.error: + return None + return res + + def commandblind(self, cmd_string, raw=False): + """ + Sends a "fire and forget" command string to the device. + + See Also + -------- + command + """ + return self.command(cmd_string, raw, action="commandblind") + + +class Switch(AlpacaDevice): + """Wrapper for ASCOM Alpaca devices of "switch" type.""" + + def __init__(self, url, switch_id, device_number=0): + """ + Initializes the Switch interface. + + Parameters + ---------- + url : str + The base URL of the Alpaca server. + switch_id : int + The ID of the specific switch device to control. + device_number : int, optional + The device number of the switch controller on the server, by default 0. + """ + super().__init__(url, dev="switch", device_number=device_number) + self.switch_id = switch_id + + @property + def description(self): + """str or None: The name or description of the switch.""" + + params = {"Id": self.switch_id} + res = self.get("getswitchname", params=params) + if self.error: + return None + return res + + @property + def state(self): + """ + bool or None: The current state of the switch. + True if on, False if off. + """ + + params = {"Id": self.switch_id} + res = self.get("getswitch", params=params) + if self.error: + return None + return res + + @state.setter + def state(self, b): + """ + Sets the state of the switch. + + Parameters + ---------- + b : bool + True to turn the switch on, False to turn it off. + """ + + data = {"Id": self.switch_id, "State": str(b)} + self.put("setswitch", data=data) + + +class Dome(AlpacaDevice): + """Wrapper for ASCOM Alpaca devices of "dome" type.""" + + def __init__(self, url, device_number=0): + """ + Initializes the Dome interface. + """ + super().__init__(url, dev="dome", device_number=device_number) + self.PARK = dome_park_position + self.TOLERANCE = 1.0 # deg tolerance for park + + def park(self): + """Puts the dome in its park position.""" + + self.put("park") + + def sync(self, az): + """ + Synchronizes the dome's azimuth to a specific value. + """ + + data = {"Azimuth": az} + self.put("synctoazimuth", data=data) + + def abort(self): + """Aborts any dome motion.""" + + self.put("abortslew") + + @property + def is_parked(self): + """bool or None: Checks if the dome is at its park position.""" + + res = self.get("atpark") + if self.error: + return None + return res + + @property + def is_moving(self): + """bool or None: Checks if the dome is slewing.""" + + res = self.get("slewing") + if self.error: + return None + return res + + @property + def azimuth(self): + """float or None: The current dome azimuth in degrees.""" + + res = self.get("azimuth") + if self.error: + return None + return res + + @azimuth.setter + def azimuth(self, a): + """ + Slews the dome to a new azimuth. + + Parameters + ---------- + a : float + The target azimuth in degrees. + """ + + data = {"Azimuth": a} + self.put("slewtoazimuth", data=data) + + @property + def shutter(self): + """ + int or None: The status of the dome shutter. + + (0=Open, 1=Closed, 2=Opening, 3=Closing, 4=Error) + """ + + res = self.get("shutterstatus") + if self.error: + return None + return res + + @property + def open(self): + """ + bool or None: The dome shutter position. + True if open, False if closed, None otherwise. + """ + + res = self.shutter + if res == 0: + return True + elif res == 1: + return False + return None + + @open.setter + def open(self, b): + """ + Opens or closes the dome shutter. + """ + + if b: + self.put("openshutter") + else: + self.put("closeshutter") + + @property + def slave(self): + """ + bool or None: The slaved state of the dome. + """ + + res = self.get("slaved") + if self.error: + return None + return res + + @slave.setter + def slave(self, b): + """ + Slaves or un-slaves the dome from the telescope. + """ + + data = {"Slaved": str(b)} + self.put("slaved", data=data) + + +class Telescope(AlpacaDevice): + """Wrapper for ASCOM Alpaca devices of "telescope" type.""" + + def __init__(self, url, device_number=0): + """ + Initializes the Telescope interface. + """ + super().__init__(url, dev="telescope", device_number=device_number) + + def track(self): + """ + Slews to the current target coordinates and begins tracking. + """ + + self.tracking = True + self.put("slewtotargetasync") + + def abort(self): + """Aborts any telescope motion.""" + + self.put("abortslew") + + @property + def is_moving(self): + """bool or None: Checks if the telescope is slewing.""" + + res = self.get("slewing") + if self.error: + return None + return res + + @property + def state(self): + """ + int or None: The global operational status of the telescope. + Uses a non-standard command. + """ + + res = self.command("TELESCOPE.STATUS.GLOBAL") + if self.error: + return None + return res + + @property + def park(self): + """bool or None: Checks if the telescope is parked.""" + + res = self.get("atpark") + if self.error: + return None + return res + + @park.setter + def park(self, b, timeout=120): + """ + Parks or unparks the telescope. + """ + + if b: + self.put("park", timeout=timeout) + else: + self.put("unpark", timeout=timeout) + + @property + def altaz(self): + """list or None: Current [altitude, azimuth] in degrees.""" + + alt = self.get("altitude") + if self.error: + return [None, None] + az = self.get("azimuth") + if self.error: + return [None, None] + return [alt, az] + + @altaz.setter + def altaz(self, a): + """ + Slews the telescope to the given Alt/Az coordinates. + """ + + self.tracking = False + data = {"Altitude": a[0], "Azimuth": a[1]} + self.put("slewtoaltazasync", data=data) + + @property + def radec(self): + """list or None: Current [RA, Dec] in [hours, degrees].""" + + ra = self.get("rightascension") + if self.error: + return [None, None] + dec = self.get("declination") + if self.error: + return [None, None] + return [ra, dec] + + @radec.setter + def radec(self, a): + """ + Slews to the given RA/Dec coordinates and tracks. + """ + + self.tracking = True + data = {"RightAscension": a[0], "Declination": a[1]} + self.put("slewtocoordinatesasync", data=data) + + @property + def tracking(self): + """bool or None: The sidereal tracking state of the telescope.""" + + res = self.get("tracking") + if self.error: + return None + return res + + @tracking.setter + def tracking(self, b): + """ + Enables or disables sidereal tracking. + """ + + data = {"Tracking": str(b)} + self.put("tracking", data=data) + + +class Focuser(AlpacaDevice): + """Wrapper for ASCOM Alpaca devices of "focuser" type.""" + + def __init__(self, url, device_number=0): + """ + Initializes the Focuser interface. + """ + super().__init__(url, dev="focuser", device_number=device_number) + + @property + def is_moving(self): + """bool or None: Checks if the focuser is currently moving.""" + + res = self.get("ismoving") + if self.error: + return None + return res + + @property + def position(self): + """int or None: The current focuser position in steps or microns.""" + + res = self.get("position") + if self.error: + return None + return res + + @position.setter + def position(self, s): + """ + Moves the focuser to a new absolute position. + + Parameters + ---------- + s : int + The target position in steps or microns, as defined by the device. + """ + + data = {"Position": s} + self.put("move", data=data) + + +class Rotator(AlpacaDevice): + """Wrapper for ASCOM Alpaca devices of "rotator" type.""" + + def __init__(self, url, device_number=0): + """ + Initializes the Rotator interface. + """ + super().__init__(url, dev="rotator", device_number=device_number) + + @property + def is_moving(self): + """bool or None: Checks if the rotator is currently moving.""" + + res = self.get("ismoving") + if self.error: + return None + return res + + @property + + def position(self): + """float or None: The current rotator position in degrees.""" + + res = self.get("position") + if self.error: + return None + return res + + @position.setter + def position(self, s): + """ + Moves the rotator to a new absolute position. + + Parameters + ---------- + s : float + The target position in degrees. + """ + + data = {"Position": s} + self.put("moveabsolute", data=data) diff --git a/noctua/devices/astelco2.py b/noctua/devices/astelco2.py new file mode 100644 index 0000000000000000000000000000000000000000..a32a253c09fdb53f57a7408c971eb1df6aa47d25 --- /dev/null +++ b/noctua/devices/astelco2.py @@ -0,0 +1,714 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Interface with an Astelco OpenTSI-based device. + +This module provides a high-level Python interface for controlling devices +that adhere to the Astelco OpenTSI specification. It includes classes for +the main telescope, focuser, rotator, and sensors. +""" + +import telnetlib +import time +import threading + +from ..utils import check +from ..utils.logger import log +from .basedevice import BaseDevice + + +class OpenTSI(BaseDevice): + """ + Base wrapper class for Astelco OpenTSI devices. + + This class handles the low-level telnet communication, including + connection, authentication, command sending, and response parsing. + Higher-level device classes inherit from this class. + """ + + def __init__(self, url, username="admin", password="admin"): + """ + Initializes the OpenTSI device connection parameters. + + Parameters + ---------- + url : str + The device URL in "host:port" format (e.g., "192.168.1.100:22"). + username : str, optional + The username for authentication, by default "admin". + password : str, optional + The password for authentication, by default "admin". + """ + super().__init__(url) + self.host, self.port = url.split(":") + self.port = int(self.port) + self.username = username + self.password = password + self.timeout = 5 + self._cmd_id_counter = 0 + self._lock = threading.Lock() + + def _get_cmd_id(self): + """ + Generates a unique, thread-safe command ID. + + This is an internal helper method to ensure each command sent to the + device has a unique identifier for reliable response tracking. + + Returns + ------- + int + A new, unique command ID. + """ + with self._lock: + self._cmd_id_counter += 1 + return self._cmd_id_counter + + @check.telnet_errors + def _send(self, commands): + """ + Sends a batch of commands to the device and collects responses. + + This is the low-level communication handler. It establishes a telnet + connection, authenticates, sends all provided commands, and then + asynchronously collects the responses until all commands are complete + or a timeout occurs. + + Parameters + ---------- + commands : list of tuple + A list of commands to send, where each tuple is + (command_id, command_string). + + Returns + ------- + dict or None + A dictionary mapping command IDs to their responses. Each response + is a dict with 'status' and 'data' keys. Returns None if the + connection or authentication fails. + """ + tn = telnetlib.Telnet(self.host, self.port, timeout=self.timeout) + tn.read_until(b"OpenTPL server", timeout=self.timeout) + + auth_cmd = f'auth plain "{self.username}" "{self.password}"\n' + tn.write(auth_cmd.encode('utf-8')) + auth_response = tn.read_until(b"AUTH OK", timeout=self.timeout) + if b"AUTH OK" not in auth_response: + log.error("Authentication failed.") + self.error.append("Authentication failed") + return None + + res = {cmd_id: {"status": None, "data": None} for cmd_id, _ in commands} + + for cmd_id, cmd_str in commands: + full_cmd = f"{cmd_id} {cmd_str}\n".encode('utf-8') + tn.write(full_cmd) + + start_time = time.time() + completed_commands = 0 + while completed_commands < len(commands) and (time.time() - start_time) < self.timeout: + try: + line_bytes = tn.read_until(b'\n', timeout=1) + if not line_bytes: + continue + + line = line_bytes.decode('utf-8').strip() + parts = line.split(" ", 2) + if len(parts) < 2: + continue + + resp_id = int(parts[0]) + resp_type = parts[1] + + if resp_id in res: + if resp_type == "COMMAND" and parts[2] == "OK": + res[resp_id]["status"] = "OK" + elif resp_type == "DATA" and "INLINE" in parts[2]: + data_content = parts[2].split("=", 1)[1] if "=" in parts[2] else parts[2] + try: + res[resp_id]["data"] = float(data_content) + except (ValueError, TypeError): + res[resp_id]["data"] = data_content.strip('"') + elif resp_type == "COMMAND" and parts[2] == "COMPLETE": + res[resp_id]["status"] = "COMPLETE" + completed_commands += 1 + elif resp_type == "ERROR": + res[resp_id]["status"] = "ERROR" + res[resp_id]["data"] = parts[2] + self.error.append(f"Command {resp_id} failed: {parts[2]}") + completed_commands += 1 + except (UnicodeDecodeError, IndexError, ValueError): + continue + + tn.close() + return res + + def get(self, message): + """ + Retrieves a single value from the device. + + Parameters + ---------- + message : str + The OpenTSI variable to query (e.g., "POINTING.TRACK"). + + Returns + ------- + str or float or int or None + The value returned by the device, or None on error. + """ + cmd_id = self._get_cmd_id() + commands = [(cmd_id, f"GET {message}")] + + res = self._send(commands) + if self.error or not res: + return None + + if res[cmd_id]["status"] != "ERROR": + return res[cmd_id]["data"] + return None + + def put(self, key, val): + """ + Sets a single value on the device. + + Parameters + ---------- + key : str + The OpenTSI variable to set (e.g., "POINTING.TRACK"). + val : any + The value to assign to the variable. + + Returns + ------- + bool + True if the command was completed successfully, False otherwise. + """ + cmd_id = self._get_cmd_id() + commands = [(cmd_id, f"SET {key}={val}")] + + res = self._send(commands) + if self.error or not res: + return False + + if res[cmd_id]["status"] == "COMPLETE": + return True + return False + + +class Telescope(OpenTSI): + """ + High-level interface for controlling the telescope mount. + + This class provides properties and methods for common telescope + operations like tracking, slewing, parking, and querying coordinates. + """ + + @property + def tracking(self): + """ + bool or None: The tracking state of the telescope. + + - ``True`` if the telescope is currently tracking or slewing. + - ``False`` if it is idle. + - ``None`` on communication error. + """ + + res = self.get("POINTING.TRACK") + if self.error: + return None + try: + return bool(int(res)) + except (TypeError, ValueError): + return None + + @tracking.setter + def tracking(self, b): + """ + Sets the tracking state of the telescope. + + Parameters + ---------- + b : bool + - ``True`` to start tracking the currently configured target. + - ``False`` to stop tracking. + """ + + track_val = 1 if b else 0 + self.put("POINTING.TRACK", track_val) + + def track(self): + """Convenience method to start telescope tracking.""" + + self.tracking = True + + def abort(self): + """Convenience method to stop telescope tracking.""" + + self.tracking = False + + @property + def is_moving(self): + """ + bool or None: Reports if any telescope axis is currently moving. + + Checks Bit 0 of TELESCOPE.MOTION_STATE. + """ + + res = self.get("TELESCOPE.MOTION_STATE") + if self.error: + return None + try: + return bool(int(res) & 1) + except (TypeError, ValueError): + return None + + @property + def status(self): + """ + str or None: The detailed hardware status list. + + This returns a complex, delimited string from TELESCOPE.STATUS.LIST + for detailed diagnostics. + """ + + res = self.get("TELESCOPE.STATUS.LIST") + if self.error: + return None + return res + + @property + def state(self): + """ + int or None: The global operational status of the telescope. + + Returns a bit-coded integer from TELESCOPE.STATUS.GLOBAL. + 0 means operational. + """ + + res = self.get("TELESCOPE.STATUS.GLOBAL") + if self.error: + return None + return res + + def clear_error(self, n): + """ + Attempts to clear a hardware error. + + Parameters + ---------- + n : int + The error code to clear, typically the value from `state`. + """ + + return self.put("TELESCOPE.STATUS.CLEAR_ERROR", n) + + @property + def clock(self): + """float or None: The current UTC time from the device as a Unix timestamp.""" + + res = self.get("POSITION.LOCAL.UTC") + if self.error: + return None + return res + + @property + def open(self): + """ + bool or None: The state of the main mirror cover. + + - ``True`` if the cover is open. + - ``False`` if closed. + - ``None`` on error. + """ + + res = self.get("AUXILIARY.COVER.REALPOS") + if self.error: + return None + try: + return res == 1.0 + except (TypeError, ValueError): + return None + + @open.setter + def open(self, b): + """ + Opens or closes the main mirror cover. + + Parameters + ---------- + b : bool + - ``True`` to open the cover. + - ``False`` to close it. + """ + + pos = 1.0 if b else 0.0 + self.put("AUXILIARY.COVER.TARGETPOS", pos) + + @property + def park(self): + """ + bool or None: The parked state of the telescope. + + Checks Bit 6 of TELESCOPE.MOTION_STATE. + """ + + res = self.get("TELESCOPE.MOTION_STATE") + if self.error: + return None + try: + return bool(int(res) & (1 << 6)) + except (TypeError, ValueError): + return None + + @park.setter + def park(self, b): + """ + Parks or unparks the telescope. + + Parameters + ---------- + b : bool + - ``True`` to move the telescope to its park position. + - ``False`` to move it to its startup (unparked) position. + """ + + park_val = 1 if b else 0 + self.put("TELESCOPE.PARK", park_val) + + @property + def altaz(self): + """ + list or None: The current Altitude/Azimuth of the telescope. + + Returns + ------- + list + A list containing [altitude, azimuth] in degrees, or [None, None]. + """ + + cmd_id1, cmd_id2 = self._get_cmd_id(), self._get_cmd_id() + commands = [ + (cmd_id1, "GET POSITION.HORIZONTAL.ALT"), + (cmd_id2, "GET POSITION.HORIZONTAL.AZ") + ] + res = self._send(commands) + if self.error or not res: + return [None, None] + return [res[cmd_id1]["data"], res[cmd_id2]["data"]] + + @altaz.setter + def altaz(self, a): + """ + Slews the telescope to the specified Altitude/Azimuth coordinates. + + Note + ---- + This is a high-level action that calls `slew_to_altaz`. + """ + + self.slew_to_altaz(a[0], a[1]) + + def slew_to_altaz(self, alt, az): + """ + Executes a slew to the given horizontal coordinates and stops. + + Parameters + ---------- + alt : float + Target altitude in degrees. + az : float + Target azimuth in degrees. + """ + + self.put("POINTING.SETUP.DEROTATOR.SYNCMODE", 2) + self.put("OBJECT.HORIZONTAL.ALT", alt) + self.put("OBJECT.HORIZONTAL.AZ", az) + self.put("POINTING.TRACK", 2) # Go and stay there + + @property + def radec(self): + """ + list or None: The current J2000 Right Ascension/Declination. + + Returns + ------- + list + A list containing [RA, Dec] in [hours, degrees], or [None, None]. + """ + + cmd_id1, cmd_id2 = self._get_cmd_id(), self._get_cmd_id() + commands = [ + (cmd_id1, "GET POSITION.EQUATORIAL.RA_J2000"), + (cmd_id2, "GET POSITION.EQUATORIAL.DEC_J2000") + ] + res = self._send(commands) + if self.error or not res: + return [None, None] + return [res[cmd_id1]["data"], res[cmd_id2]["data"]] + + @radec.setter + def radec(self, a): + """ + Slews the telescope to the specified RA/Dec and begins tracking. + + Note + ---- + This is a high-level action that calls `track_radec`. + """ + + self.track_radec(a[0], a[1]) + + def track_radec(self, ra, dec): + """ + Executes a slew to the given equatorial coordinates and starts tracking. + + Parameters + ---------- + ra : float + Target Right Ascension in hours. + dec : float + Target Declination in degrees. + """ + + self.put("POINTING.SETUP.DEROTATOR.SYNCMODE", 2) + self.put("OBJECT.EQUATORIAL.RA", ra) + self.put("OBJECT.EQUATORIAL.DEC", dec) + self.put("POINTING.TRACK", 1) # Go and track + + @property + def offset(self): + """ + list or None: The current pointing offsets. + + These offsets are typically used for guiding adjustments. + + Returns + ------- + list + A list containing [ZD offset, Az offset] in degrees. + """ + + cmd_id1, cmd_id2 = self._get_cmd_id(), self._get_cmd_id() + commands = [ + (cmd_id1, "GET POSITION.INSTRUMENTAL.ZD.OFFSET"), + (cmd_id2, "GET POSITION.INSTRUMENTAL.AZ.OFFSET") + ] + res = self._send(commands) + if self.error or not res: + return [None, None] + return [res[cmd_id1]["data"], res[cmd_id2]["data"]] + + @offset.setter + def offset(self, a): + """ + Sets the pointing offsets for Zenith Distance and Azimuth. + + Parameters + ---------- + a : list or tuple + A list or tuple containing [zd_offset, az_offset] in degrees. + """ + + if abs(a[0]) > 0.9 or abs(a[1]) > 0.9: + log.error("Offset > 0.9 deg is too large. Aborting.") + return + self.put("POSITION.INSTRUMENTAL.ZD.OFFSET", a[0]) + self.put("POSITION.INSTRUMENTAL.AZ.OFFSET", a[1]) + + @property + def coordinates(self): + """ + dict or None: A dictionary of all major telescope coordinates. + + Returns + ------- + dict + A dictionary with the following structure: + { + "radec": [ra_hours, dec_degrees], + "altaz": [alt_degrees, az_degrees], + "lst": lst_hours, + "utc": utc_unix_timestamp + } + """ + + cmd_ids = [self._get_cmd_id() for _ in range(6)] + keys = ["POSITION.EQUATORIAL.RA_J2000", "POSITION.EQUATORIAL.DEC_J2000", + "POSITION.HORIZONTAL.ALT", "POSITION.HORIZONTAL.AZ", + "POSITION.LOCAL.SIDEREAL_TIME", "POSITION.LOCAL.UTC"] + commands = [(cid, f"GET {key}") for cid, key in zip(cmd_ids, keys)] + + res = self._send(commands) + if self.error or not res: + return None + + return { + "radec": [res[cmd_ids[0]]["data"], res[cmd_ids[1]]["data"]], + "altaz": [res[cmd_ids[2]]["data"], res[cmd_ids[3]]["data"]], + "lst": res[cmd_ids[4]]["data"], + "utc": res[cmd_ids[5]]["data"], + } + + +class Focuser(OpenTSI): + """ + Interface for controlling a telescope focuser. + """ + def __init__(self, url, index=0, **kwargs): + """ + Initializes the Focuser interface. + + Parameters + ---------- + url : str + The device URL in "host:port" format. + index : int, optional + The device index for the focuser (e.g., for hexapods), + by default 0. + """ + + super().__init__(url, **kwargs) + self.index = index + + @property + def is_moving(self): + """bool or None: Reports if the focuser is currently moving.""" + + res = self.get(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].MOTION_STATE") + if self.error: + return None + try: + return bool(int(res) & 1) + except (TypeError, ValueError): + return None + + @property + def position(self): + """ + float or None: The current focuser position in microns. + + Note + ---- + The driver converts the OpenTSI value (in mm) to microns. + """ + + res = self.get(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].REALPOS") + if self.error: + return None + try: + return float(res) * 1000 # Convert mm to microns + except (TypeError, ValueError): + return None + + @position.setter + def position(self, s): + """ + Moves the focuser to a new absolute position. + + Parameters + ---------- + s : float + The target position in microns. + """ + + pos_mm = s / 1000 + self.put(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].TARGETPOS", pos_mm) + + +class Rotator(OpenTSI): + """ + Interface for controlling a telescope instrument rotator. + """ + def __init__(self, url, index=2, **kwargs): + """ + Initializes the Rotator interface. + + Parameters + ---------- + url : str + The device URL in "host:port" format. + index : int, optional + The device index for the rotator, by default 2. + """ + + super().__init__(url, **kwargs) + self.index = index + + @property + def is_moving(self): + """bool or None: Reports if the rotator is currently moving.""" + + res = self.get(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].MOTION_STATE") + if self.error: + return None + try: + return bool(int(res) & 1) + except (TypeError, ValueError): + return None + + @property + def position(self): + """float or None: The current rotator position in degrees.""" + + res = self.get(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].REALPOS") + if self.error: + return None + return res + + @position.setter + def position(self, s): + """ + Moves the rotator to a new absolute position. + + Parameters + ---------- + s : float + The target position in degrees. + """ + + self.put(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].TARGETPOS", s) + + +class Sensor(OpenTSI): + """ + Interface for reading environmental sensors. + """ + + @property + def temperature(self): + """ + list or None: Reads temperature sensors from the device. + + Returns + ------- + list + A list of temperatures in Celsius [sensor2, sensor3, sensor4]. + """ + + cmd_id1, cmd_id2, cmd_id3 = self._get_cmd_id(), self._get_cmd_id(), self._get_cmd_id() + commands = [ + (cmd_id1, "GET AUXILIARY.SENSOR[2].VALUE"), + (cmd_id2, "GET AUXILIARY.SENSOR[3].VALUE"), + (cmd_id3, "GET AUXILIARY.SENSOR[4].VALUE"), + ] + res = self._send(commands) + if self.error or not res: + return [None, None, None] + return [res[cmd_id1]["data"], res[cmd_id2]["data"], res[cmd_id3]["data"]] + + @property + def humidity(self): + """ + None: Reads humidity (not specified in OpenTSI). + + Note + ---- + The OpenTSI specification does not appear to have a standard + command for humidity. This method is a placeholder and will + always return None. + """ + + log.warning("OpenTSI spec does not list a standard humidity sensor.") + return None