Skip to content
Snippets Groups Projects
Commit e8c91f0e authored by vertighel's avatar vertighel
Browse files

Google AI studio Revised versions of astelco and alpaca devices

parent 5a9387aa
No related branches found
No related tags found
No related merge requests found
Pipeline #29606 canceled
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Interface with an ASCOM Alpaca device.
"""
# System modules
import time
from urllib.parse import urlencode
# Third-party modules
import requests
# Other templates
from ..config.constants import dome_park_position
from ..utils import check
from ..utils.logger import log
from .basedevice import BaseDevice
class AlpacaDevice(BaseDevice):
"""Base wrapper class for ASCOM Alpaca devices."""
def __init__(self, url, dev=None, device_number=0):
"""
Initializes the Alpaca device.
Parameters
----------
url : str
The base URL of the Alpaca server (e.g., "http://192.168.1.100:11111").
dev : str, optional
The device type (e.g., "telescope", "dome").
device_number : int, optional
The device number on the Alpaca server, by default 0.
"""
super().__init__(url)
self.url = url
self._dev = dev
self.device_number = device_number
self.timeout = 10
@property
def addr(self):
"""
str: The full API endpoint address for the device.
"""
return f"{self.url}/api/v1/{self._dev}/{self.device_number}"
@check.request_errors
def get(self, method, params=None, timeout=None):
"""
Sends a HTTP GET request to the device.
Parameters
----------
method : str
The Alpaca API method name (e.g., "temperature").
params : dict, optional
A dictionary of query parameters.
timeout : int, optional
Request timeout in seconds. Defaults to the class timeout.
Returns
-------
any or None
The "Value" field from the Alpaca JSON response, or None on error.
"""
if params is None:
params = {}
if timeout is None:
timeout = self.timeout
res = requests.get(f"{self.addr}/{method}", params=params, timeout=timeout)
res.raise_for_status()
resj = res.json()
if resj["ErrorNumber"] != 0:
msg = f'ASCOM {resj["ErrorNumber"]}: {resj["ErrorMessage"]}'
log.error(msg)
self.error.append(msg)
return None
return resj.get("Value", None)
@check.request_errors
def put(self, method, data=None, timeout=None):
"""
Sends a HTTP PUT request to the device.
Parameters
----------
method : str
The Alpaca API method name (e.g., "connected").
data : dict, optional
A dictionary of form data to send.
timeout : int, optional
Request timeout in seconds. Defaults to the class timeout.
Returns
-------
any or None
The "Value" field if present, True on success, or None on error.
"""
if data is None:
data = {}
if timeout is None:
timeout = self.timeout
res = requests.put(f"{self.addr}/{method}", data=data, timeout=timeout)
res.raise_for_status()
resj = res.json()
if resj["ErrorNumber"] != 0:
msg = f'ASCOM {resj["ErrorNumber"]}: {resj["ErrorMessage"]}'
log.error(msg)
self.error.append(msg)
return None
return resj.get("Value", True)
@property
def name(self):
"""str or None: The name of the device."""
res = self.get("name")
if self.error:
return None
return res
@property
def connection(self):
"""bool or None: The connection status of the device."""
res = self.get("connected")
if self.error:
return None
return res
@connection.setter
def connection(self, b):
"""
Sets the connection status of the device.
Parameters
----------
b : bool
True to connect, False to disconnect.
"""
data = {"Connected": str(b)}
self.put("connected", data=data)
def command(self, cmd_string, raw=False, action="commandstring"):
"""
Sends a command string to the device via a non-standard action.
Note
----
This method uses a custom 'commandstring' or 'commandblind' action,
which is a non-standard Alpaca extension. Its use indicates that this
driver is tailored to a specific server implementation that tunnels
other protocols (like OpenTSI) through Alpaca.
Parameters
----------
cmd_string : str
The command string to execute.
raw : bool, optional
Indicates if the command is a raw command, by default False.
action : str, optional
The Alpaca action to use, either "commandstring" or "commandblind".
Returns
-------
any or None
The result of the command, or None on error.
"""
data = {"Command": cmd_string, "Raw": str(raw)}
res = self.put(action, data=data)
if self.error:
return None
return res
def commandblind(self, cmd_string, raw=False):
"""
Sends a "fire and forget" command string to the device.
See Also
--------
command
"""
return self.command(cmd_string, raw, action="commandblind")
class Switch(AlpacaDevice):
"""Wrapper for ASCOM Alpaca devices of "switch" type."""
def __init__(self, url, switch_id, device_number=0):
"""
Initializes the Switch interface.
Parameters
----------
url : str
The base URL of the Alpaca server.
switch_id : int
The ID of the specific switch device to control.
device_number : int, optional
The device number of the switch controller on the server, by default 0.
"""
super().__init__(url, dev="switch", device_number=device_number)
self.switch_id = switch_id
@property
def description(self):
"""str or None: The name or description of the switch."""
params = {"Id": self.switch_id}
res = self.get("getswitchname", params=params)
if self.error:
return None
return res
@property
def state(self):
"""
bool or None: The current state of the switch.
True if on, False if off.
"""
params = {"Id": self.switch_id}
res = self.get("getswitch", params=params)
if self.error:
return None
return res
@state.setter
def state(self, b):
"""
Sets the state of the switch.
Parameters
----------
b : bool
True to turn the switch on, False to turn it off.
"""
data = {"Id": self.switch_id, "State": str(b)}
self.put("setswitch", data=data)
class Dome(AlpacaDevice):
"""Wrapper for ASCOM Alpaca devices of "dome" type."""
def __init__(self, url, device_number=0):
"""
Initializes the Dome interface.
"""
super().__init__(url, dev="dome", device_number=device_number)
self.PARK = dome_park_position
self.TOLERANCE = 1.0 # deg tolerance for park
def park(self):
"""Puts the dome in its park position."""
self.put("park")
def sync(self, az):
"""
Synchronizes the dome's azimuth to a specific value.
"""
data = {"Azimuth": az}
self.put("synctoazimuth", data=data)
def abort(self):
"""Aborts any dome motion."""
self.put("abortslew")
@property
def is_parked(self):
"""bool or None: Checks if the dome is at its park position."""
res = self.get("atpark")
if self.error:
return None
return res
@property
def is_moving(self):
"""bool or None: Checks if the dome is slewing."""
res = self.get("slewing")
if self.error:
return None
return res
@property
def azimuth(self):
"""float or None: The current dome azimuth in degrees."""
res = self.get("azimuth")
if self.error:
return None
return res
@azimuth.setter
def azimuth(self, a):
"""
Slews the dome to a new azimuth.
Parameters
----------
a : float
The target azimuth in degrees.
"""
data = {"Azimuth": a}
self.put("slewtoazimuth", data=data)
@property
def shutter(self):
"""
int or None: The status of the dome shutter.
(0=Open, 1=Closed, 2=Opening, 3=Closing, 4=Error)
"""
res = self.get("shutterstatus")
if self.error:
return None
return res
@property
def open(self):
"""
bool or None: The dome shutter position.
True if open, False if closed, None otherwise.
"""
res = self.shutter
if res == 0:
return True
elif res == 1:
return False
return None
@open.setter
def open(self, b):
"""
Opens or closes the dome shutter.
"""
if b:
self.put("openshutter")
else:
self.put("closeshutter")
@property
def slave(self):
"""
bool or None: The slaved state of the dome.
"""
res = self.get("slaved")
if self.error:
return None
return res
@slave.setter
def slave(self, b):
"""
Slaves or un-slaves the dome from the telescope.
"""
data = {"Slaved": str(b)}
self.put("slaved", data=data)
class Telescope(AlpacaDevice):
"""Wrapper for ASCOM Alpaca devices of "telescope" type."""
def __init__(self, url, device_number=0):
"""
Initializes the Telescope interface.
"""
super().__init__(url, dev="telescope", device_number=device_number)
def track(self):
"""
Slews to the current target coordinates and begins tracking.
"""
self.tracking = True
self.put("slewtotargetasync")
def abort(self):
"""Aborts any telescope motion."""
self.put("abortslew")
@property
def is_moving(self):
"""bool or None: Checks if the telescope is slewing."""
res = self.get("slewing")
if self.error:
return None
return res
@property
def state(self):
"""
int or None: The global operational status of the telescope.
Uses a non-standard command.
"""
res = self.command("TELESCOPE.STATUS.GLOBAL")
if self.error:
return None
return res
@property
def park(self):
"""bool or None: Checks if the telescope is parked."""
res = self.get("atpark")
if self.error:
return None
return res
@park.setter
def park(self, b, timeout=120):
"""
Parks or unparks the telescope.
"""
if b:
self.put("park", timeout=timeout)
else:
self.put("unpark", timeout=timeout)
@property
def altaz(self):
"""list or None: Current [altitude, azimuth] in degrees."""
alt = self.get("altitude")
if self.error:
return [None, None]
az = self.get("azimuth")
if self.error:
return [None, None]
return [alt, az]
@altaz.setter
def altaz(self, a):
"""
Slews the telescope to the given Alt/Az coordinates.
"""
self.tracking = False
data = {"Altitude": a[0], "Azimuth": a[1]}
self.put("slewtoaltazasync", data=data)
@property
def radec(self):
"""list or None: Current [RA, Dec] in [hours, degrees]."""
ra = self.get("rightascension")
if self.error:
return [None, None]
dec = self.get("declination")
if self.error:
return [None, None]
return [ra, dec]
@radec.setter
def radec(self, a):
"""
Slews to the given RA/Dec coordinates and tracks.
"""
self.tracking = True
data = {"RightAscension": a[0], "Declination": a[1]}
self.put("slewtocoordinatesasync", data=data)
@property
def tracking(self):
"""bool or None: The sidereal tracking state of the telescope."""
res = self.get("tracking")
if self.error:
return None
return res
@tracking.setter
def tracking(self, b):
"""
Enables or disables sidereal tracking.
"""
data = {"Tracking": str(b)}
self.put("tracking", data=data)
class Focuser(AlpacaDevice):
"""Wrapper for ASCOM Alpaca devices of "focuser" type."""
def __init__(self, url, device_number=0):
"""
Initializes the Focuser interface.
"""
super().__init__(url, dev="focuser", device_number=device_number)
@property
def is_moving(self):
"""bool or None: Checks if the focuser is currently moving."""
res = self.get("ismoving")
if self.error:
return None
return res
@property
def position(self):
"""int or None: The current focuser position in steps or microns."""
res = self.get("position")
if self.error:
return None
return res
@position.setter
def position(self, s):
"""
Moves the focuser to a new absolute position.
Parameters
----------
s : int
The target position in steps or microns, as defined by the device.
"""
data = {"Position": s}
self.put("move", data=data)
class Rotator(AlpacaDevice):
"""Wrapper for ASCOM Alpaca devices of "rotator" type."""
def __init__(self, url, device_number=0):
"""
Initializes the Rotator interface.
"""
super().__init__(url, dev="rotator", device_number=device_number)
@property
def is_moving(self):
"""bool or None: Checks if the rotator is currently moving."""
res = self.get("ismoving")
if self.error:
return None
return res
@property
def position(self):
"""float or None: The current rotator position in degrees."""
res = self.get("position")
if self.error:
return None
return res
@position.setter
def position(self, s):
"""
Moves the rotator to a new absolute position.
Parameters
----------
s : float
The target position in degrees.
"""
data = {"Position": s}
self.put("moveabsolute", data=data)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Interface with an Astelco OpenTSI-based device.
This module provides a high-level Python interface for controlling devices
that adhere to the Astelco OpenTSI specification. It includes classes for
the main telescope, focuser, rotator, and sensors.
"""
import telnetlib
import time
import threading
from ..utils import check
from ..utils.logger import log
from .basedevice import BaseDevice
class OpenTSI(BaseDevice):
"""
Base wrapper class for Astelco OpenTSI devices.
This class handles the low-level telnet communication, including
connection, authentication, command sending, and response parsing.
Higher-level device classes inherit from this class.
"""
def __init__(self, url, username="admin", password="admin"):
"""
Initializes the OpenTSI device connection parameters.
Parameters
----------
url : str
The device URL in "host:port" format (e.g., "192.168.1.100:22").
username : str, optional
The username for authentication, by default "admin".
password : str, optional
The password for authentication, by default "admin".
"""
super().__init__(url)
self.host, self.port = url.split(":")
self.port = int(self.port)
self.username = username
self.password = password
self.timeout = 5
self._cmd_id_counter = 0
self._lock = threading.Lock()
def _get_cmd_id(self):
"""
Generates a unique, thread-safe command ID.
This is an internal helper method to ensure each command sent to the
device has a unique identifier for reliable response tracking.
Returns
-------
int
A new, unique command ID.
"""
with self._lock:
self._cmd_id_counter += 1
return self._cmd_id_counter
@check.telnet_errors
def _send(self, commands):
"""
Sends a batch of commands to the device and collects responses.
This is the low-level communication handler. It establishes a telnet
connection, authenticates, sends all provided commands, and then
asynchronously collects the responses until all commands are complete
or a timeout occurs.
Parameters
----------
commands : list of tuple
A list of commands to send, where each tuple is
(command_id, command_string).
Returns
-------
dict or None
A dictionary mapping command IDs to their responses. Each response
is a dict with 'status' and 'data' keys. Returns None if the
connection or authentication fails.
"""
tn = telnetlib.Telnet(self.host, self.port, timeout=self.timeout)
tn.read_until(b"OpenTPL server", timeout=self.timeout)
auth_cmd = f'auth plain "{self.username}" "{self.password}"\n'
tn.write(auth_cmd.encode('utf-8'))
auth_response = tn.read_until(b"AUTH OK", timeout=self.timeout)
if b"AUTH OK" not in auth_response:
log.error("Authentication failed.")
self.error.append("Authentication failed")
return None
res = {cmd_id: {"status": None, "data": None} for cmd_id, _ in commands}
for cmd_id, cmd_str in commands:
full_cmd = f"{cmd_id} {cmd_str}\n".encode('utf-8')
tn.write(full_cmd)
start_time = time.time()
completed_commands = 0
while completed_commands < len(commands) and (time.time() - start_time) < self.timeout:
try:
line_bytes = tn.read_until(b'\n', timeout=1)
if not line_bytes:
continue
line = line_bytes.decode('utf-8').strip()
parts = line.split(" ", 2)
if len(parts) < 2:
continue
resp_id = int(parts[0])
resp_type = parts[1]
if resp_id in res:
if resp_type == "COMMAND" and parts[2] == "OK":
res[resp_id]["status"] = "OK"
elif resp_type == "DATA" and "INLINE" in parts[2]:
data_content = parts[2].split("=", 1)[1] if "=" in parts[2] else parts[2]
try:
res[resp_id]["data"] = float(data_content)
except (ValueError, TypeError):
res[resp_id]["data"] = data_content.strip('"')
elif resp_type == "COMMAND" and parts[2] == "COMPLETE":
res[resp_id]["status"] = "COMPLETE"
completed_commands += 1
elif resp_type == "ERROR":
res[resp_id]["status"] = "ERROR"
res[resp_id]["data"] = parts[2]
self.error.append(f"Command {resp_id} failed: {parts[2]}")
completed_commands += 1
except (UnicodeDecodeError, IndexError, ValueError):
continue
tn.close()
return res
def get(self, message):
"""
Retrieves a single value from the device.
Parameters
----------
message : str
The OpenTSI variable to query (e.g., "POINTING.TRACK").
Returns
-------
str or float or int or None
The value returned by the device, or None on error.
"""
cmd_id = self._get_cmd_id()
commands = [(cmd_id, f"GET {message}")]
res = self._send(commands)
if self.error or not res:
return None
if res[cmd_id]["status"] != "ERROR":
return res[cmd_id]["data"]
return None
def put(self, key, val):
"""
Sets a single value on the device.
Parameters
----------
key : str
The OpenTSI variable to set (e.g., "POINTING.TRACK").
val : any
The value to assign to the variable.
Returns
-------
bool
True if the command was completed successfully, False otherwise.
"""
cmd_id = self._get_cmd_id()
commands = [(cmd_id, f"SET {key}={val}")]
res = self._send(commands)
if self.error or not res:
return False
if res[cmd_id]["status"] == "COMPLETE":
return True
return False
class Telescope(OpenTSI):
"""
High-level interface for controlling the telescope mount.
This class provides properties and methods for common telescope
operations like tracking, slewing, parking, and querying coordinates.
"""
@property
def tracking(self):
"""
bool or None: The tracking state of the telescope.
- ``True`` if the telescope is currently tracking or slewing.
- ``False`` if it is idle.
- ``None`` on communication error.
"""
res = self.get("POINTING.TRACK")
if self.error:
return None
try:
return bool(int(res))
except (TypeError, ValueError):
return None
@tracking.setter
def tracking(self, b):
"""
Sets the tracking state of the telescope.
Parameters
----------
b : bool
- ``True`` to start tracking the currently configured target.
- ``False`` to stop tracking.
"""
track_val = 1 if b else 0
self.put("POINTING.TRACK", track_val)
def track(self):
"""Convenience method to start telescope tracking."""
self.tracking = True
def abort(self):
"""Convenience method to stop telescope tracking."""
self.tracking = False
@property
def is_moving(self):
"""
bool or None: Reports if any telescope axis is currently moving.
Checks Bit 0 of TELESCOPE.MOTION_STATE.
"""
res = self.get("TELESCOPE.MOTION_STATE")
if self.error:
return None
try:
return bool(int(res) & 1)
except (TypeError, ValueError):
return None
@property
def status(self):
"""
str or None: The detailed hardware status list.
This returns a complex, delimited string from TELESCOPE.STATUS.LIST
for detailed diagnostics.
"""
res = self.get("TELESCOPE.STATUS.LIST")
if self.error:
return None
return res
@property
def state(self):
"""
int or None: The global operational status of the telescope.
Returns a bit-coded integer from TELESCOPE.STATUS.GLOBAL.
0 means operational.
"""
res = self.get("TELESCOPE.STATUS.GLOBAL")
if self.error:
return None
return res
def clear_error(self, n):
"""
Attempts to clear a hardware error.
Parameters
----------
n : int
The error code to clear, typically the value from `state`.
"""
return self.put("TELESCOPE.STATUS.CLEAR_ERROR", n)
@property
def clock(self):
"""float or None: The current UTC time from the device as a Unix timestamp."""
res = self.get("POSITION.LOCAL.UTC")
if self.error:
return None
return res
@property
def open(self):
"""
bool or None: The state of the main mirror cover.
- ``True`` if the cover is open.
- ``False`` if closed.
- ``None`` on error.
"""
res = self.get("AUXILIARY.COVER.REALPOS")
if self.error:
return None
try:
return res == 1.0
except (TypeError, ValueError):
return None
@open.setter
def open(self, b):
"""
Opens or closes the main mirror cover.
Parameters
----------
b : bool
- ``True`` to open the cover.
- ``False`` to close it.
"""
pos = 1.0 if b else 0.0
self.put("AUXILIARY.COVER.TARGETPOS", pos)
@property
def park(self):
"""
bool or None: The parked state of the telescope.
Checks Bit 6 of TELESCOPE.MOTION_STATE.
"""
res = self.get("TELESCOPE.MOTION_STATE")
if self.error:
return None
try:
return bool(int(res) & (1 << 6))
except (TypeError, ValueError):
return None
@park.setter
def park(self, b):
"""
Parks or unparks the telescope.
Parameters
----------
b : bool
- ``True`` to move the telescope to its park position.
- ``False`` to move it to its startup (unparked) position.
"""
park_val = 1 if b else 0
self.put("TELESCOPE.PARK", park_val)
@property
def altaz(self):
"""
list or None: The current Altitude/Azimuth of the telescope.
Returns
-------
list
A list containing [altitude, azimuth] in degrees, or [None, None].
"""
cmd_id1, cmd_id2 = self._get_cmd_id(), self._get_cmd_id()
commands = [
(cmd_id1, "GET POSITION.HORIZONTAL.ALT"),
(cmd_id2, "GET POSITION.HORIZONTAL.AZ")
]
res = self._send(commands)
if self.error or not res:
return [None, None]
return [res[cmd_id1]["data"], res[cmd_id2]["data"]]
@altaz.setter
def altaz(self, a):
"""
Slews the telescope to the specified Altitude/Azimuth coordinates.
Note
----
This is a high-level action that calls `slew_to_altaz`.
"""
self.slew_to_altaz(a[0], a[1])
def slew_to_altaz(self, alt, az):
"""
Executes a slew to the given horizontal coordinates and stops.
Parameters
----------
alt : float
Target altitude in degrees.
az : float
Target azimuth in degrees.
"""
self.put("POINTING.SETUP.DEROTATOR.SYNCMODE", 2)
self.put("OBJECT.HORIZONTAL.ALT", alt)
self.put("OBJECT.HORIZONTAL.AZ", az)
self.put("POINTING.TRACK", 2) # Go and stay there
@property
def radec(self):
"""
list or None: The current J2000 Right Ascension/Declination.
Returns
-------
list
A list containing [RA, Dec] in [hours, degrees], or [None, None].
"""
cmd_id1, cmd_id2 = self._get_cmd_id(), self._get_cmd_id()
commands = [
(cmd_id1, "GET POSITION.EQUATORIAL.RA_J2000"),
(cmd_id2, "GET POSITION.EQUATORIAL.DEC_J2000")
]
res = self._send(commands)
if self.error or not res:
return [None, None]
return [res[cmd_id1]["data"], res[cmd_id2]["data"]]
@radec.setter
def radec(self, a):
"""
Slews the telescope to the specified RA/Dec and begins tracking.
Note
----
This is a high-level action that calls `track_radec`.
"""
self.track_radec(a[0], a[1])
def track_radec(self, ra, dec):
"""
Executes a slew to the given equatorial coordinates and starts tracking.
Parameters
----------
ra : float
Target Right Ascension in hours.
dec : float
Target Declination in degrees.
"""
self.put("POINTING.SETUP.DEROTATOR.SYNCMODE", 2)
self.put("OBJECT.EQUATORIAL.RA", ra)
self.put("OBJECT.EQUATORIAL.DEC", dec)
self.put("POINTING.TRACK", 1) # Go and track
@property
def offset(self):
"""
list or None: The current pointing offsets.
These offsets are typically used for guiding adjustments.
Returns
-------
list
A list containing [ZD offset, Az offset] in degrees.
"""
cmd_id1, cmd_id2 = self._get_cmd_id(), self._get_cmd_id()
commands = [
(cmd_id1, "GET POSITION.INSTRUMENTAL.ZD.OFFSET"),
(cmd_id2, "GET POSITION.INSTRUMENTAL.AZ.OFFSET")
]
res = self._send(commands)
if self.error or not res:
return [None, None]
return [res[cmd_id1]["data"], res[cmd_id2]["data"]]
@offset.setter
def offset(self, a):
"""
Sets the pointing offsets for Zenith Distance and Azimuth.
Parameters
----------
a : list or tuple
A list or tuple containing [zd_offset, az_offset] in degrees.
"""
if abs(a[0]) > 0.9 or abs(a[1]) > 0.9:
log.error("Offset > 0.9 deg is too large. Aborting.")
return
self.put("POSITION.INSTRUMENTAL.ZD.OFFSET", a[0])
self.put("POSITION.INSTRUMENTAL.AZ.OFFSET", a[1])
@property
def coordinates(self):
"""
dict or None: A dictionary of all major telescope coordinates.
Returns
-------
dict
A dictionary with the following structure:
{
"radec": [ra_hours, dec_degrees],
"altaz": [alt_degrees, az_degrees],
"lst": lst_hours,
"utc": utc_unix_timestamp
}
"""
cmd_ids = [self._get_cmd_id() for _ in range(6)]
keys = ["POSITION.EQUATORIAL.RA_J2000", "POSITION.EQUATORIAL.DEC_J2000",
"POSITION.HORIZONTAL.ALT", "POSITION.HORIZONTAL.AZ",
"POSITION.LOCAL.SIDEREAL_TIME", "POSITION.LOCAL.UTC"]
commands = [(cid, f"GET {key}") for cid, key in zip(cmd_ids, keys)]
res = self._send(commands)
if self.error or not res:
return None
return {
"radec": [res[cmd_ids[0]]["data"], res[cmd_ids[1]]["data"]],
"altaz": [res[cmd_ids[2]]["data"], res[cmd_ids[3]]["data"]],
"lst": res[cmd_ids[4]]["data"],
"utc": res[cmd_ids[5]]["data"],
}
class Focuser(OpenTSI):
"""
Interface for controlling a telescope focuser.
"""
def __init__(self, url, index=0, **kwargs):
"""
Initializes the Focuser interface.
Parameters
----------
url : str
The device URL in "host:port" format.
index : int, optional
The device index for the focuser (e.g., for hexapods),
by default 0.
"""
super().__init__(url, **kwargs)
self.index = index
@property
def is_moving(self):
"""bool or None: Reports if the focuser is currently moving."""
res = self.get(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].MOTION_STATE")
if self.error:
return None
try:
return bool(int(res) & 1)
except (TypeError, ValueError):
return None
@property
def position(self):
"""
float or None: The current focuser position in microns.
Note
----
The driver converts the OpenTSI value (in mm) to microns.
"""
res = self.get(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].REALPOS")
if self.error:
return None
try:
return float(res) * 1000 # Convert mm to microns
except (TypeError, ValueError):
return None
@position.setter
def position(self, s):
"""
Moves the focuser to a new absolute position.
Parameters
----------
s : float
The target position in microns.
"""
pos_mm = s / 1000
self.put(f"POSITION.INSTRUMENTAL.FOCUS[{self.index}].TARGETPOS", pos_mm)
class Rotator(OpenTSI):
"""
Interface for controlling a telescope instrument rotator.
"""
def __init__(self, url, index=2, **kwargs):
"""
Initializes the Rotator interface.
Parameters
----------
url : str
The device URL in "host:port" format.
index : int, optional
The device index for the rotator, by default 2.
"""
super().__init__(url, **kwargs)
self.index = index
@property
def is_moving(self):
"""bool or None: Reports if the rotator is currently moving."""
res = self.get(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].MOTION_STATE")
if self.error:
return None
try:
return bool(int(res) & 1)
except (TypeError, ValueError):
return None
@property
def position(self):
"""float or None: The current rotator position in degrees."""
res = self.get(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].REALPOS")
if self.error:
return None
return res
@position.setter
def position(self, s):
"""
Moves the rotator to a new absolute position.
Parameters
----------
s : float
The target position in degrees.
"""
self.put(f"POSITION.INSTRUMENTAL.DEROTATOR[{self.index}].TARGETPOS", s)
class Sensor(OpenTSI):
"""
Interface for reading environmental sensors.
"""
@property
def temperature(self):
"""
list or None: Reads temperature sensors from the device.
Returns
-------
list
A list of temperatures in Celsius [sensor2, sensor3, sensor4].
"""
cmd_id1, cmd_id2, cmd_id3 = self._get_cmd_id(), self._get_cmd_id(), self._get_cmd_id()
commands = [
(cmd_id1, "GET AUXILIARY.SENSOR[2].VALUE"),
(cmd_id2, "GET AUXILIARY.SENSOR[3].VALUE"),
(cmd_id3, "GET AUXILIARY.SENSOR[4].VALUE"),
]
res = self._send(commands)
if self.error or not res:
return [None, None, None]
return [res[cmd_id1]["data"], res[cmd_id2]["data"], res[cmd_id3]["data"]]
@property
def humidity(self):
"""
None: Reads humidity (not specified in OpenTSI).
Note
----
The OpenTSI specification does not appear to have a standard
command for humidity. This method is a placeholder and will
always return None.
"""
log.warning("OpenTSI spec does not list a standard humidity sensor.")
return None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment