diff --git a/noctua/config/devices.ini b/noctua/config/devices.ini index bc7814b171059f2c8301ec8814ea7cef407c8901..f2228c21c58634ac0b299099ee9c03a33f852da1 100644 --- a/noctua/config/devices.ini +++ b/noctua/config/devices.ini @@ -61,7 +61,7 @@ node = ASCOM_REMOTE outlet = 3 [cam] -module = stx +module = stx3 class = Camera node = STX diff --git a/noctua/devices/stx.py b/noctua/devices/stx.py index 3366bf678b9f645777fab041080d4b730b2a8535..ed9c75293d20e396eed246e1eea000bd2bcf1499 100644 --- a/noctua/devices/stx.py +++ b/noctua/devices/stx.py @@ -6,6 +6,7 @@ Interface with a SBIG STX camera device """ # System modules +import time from datetime import datetime from urllib.parse import urlencode @@ -28,6 +29,17 @@ class STX(BaseDevice): self.url = url self.addr = self.url self.timeout = 3 + self._last_command_time = 0 + self._command_interval = 0.05 # 50 milliseconds + + def _wait_if_needed(self): + """ + Ensures the 50ms command interval is respected between commands. + """ + elapsed = time.time() - self._last_command_time + if elapsed < self._command_interval: + time.sleep(self._command_interval - elapsed) + self._last_command_time = time.time() @property def connection(self): @@ -48,6 +60,7 @@ class STX(BaseDevice): def get(self, method, params=[]): '''Send a HTTP GET request to the device address.''' + self._wait_if_needed() # Wait before sending command res = requests.get(f"{self.addr}/{method}.cgi", params="&".join(params), timeout=self.timeout, verify=False) @@ -71,7 +84,8 @@ class STX(BaseDevice): @check.request_errors def put(self, method, params={}): '''Send a HTTP GET request to the device address.''' - + + self._wait_if_needed() # Wait before sending command res = requests.get(f"{self.addr}/{method}.cgi", params=urlencode(params), timeout=self.timeout) diff --git a/noctua/devices/stx2.py b/noctua/devices/stx2.py new file mode 100644 index 0000000000000000000000000000000000000000..420b7d672571c7a0bfea4f7163e6e85c79cc0fe0 --- /dev/null +++ b/noctua/devices/stx2.py @@ -0,0 +1,529 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Interface with a SBIG STX camera device +""" + +# System modules +import time +from datetime import datetime +from urllib.parse import urlencode + +# Third-party modules +import requests + +# Other templates +from ..config.constants import temp_fits +from ..utils import check +from ..utils.logger import log +from .basedevice import BaseDevice + + +class STX(BaseDevice): + """ + Base wrapper class for SBIG STX cameras. + Handles low-level communication and timing constraints. + """ + + def __init__(self, url): + """ + Initializes the STX device interface. + + Parameters + ---------- + url : str + The IP address or hostname of the camera. + """ + super().__init__(url) + self.url = url + self.addr = self.url + self.timeout = 3 + self._last_command_time = 0 + self._command_interval = 0.05 # 50 milliseconds + + def _wait_if_needed(self): + """ + Ensures the 50ms command interval is respected between commands. + """ + elapsed = time.time() - self._last_command_time + if elapsed < self._command_interval: + time.sleep(self._command_interval - elapsed) + self._last_command_time = time.time() + + @property + def connection(self): + """ + Checks the connection to the camera. + + Returns + ------- + bool + True if the camera responds to a version query, False otherwise. + """ + try: + self._wait_if_needed() + res = requests.get(f"{self.addr}/VersionNumbers.cgi", + timeout=self.timeout) + res.raise_for_status() + return True + except Exception: + self.error = ["Camera not connected"] + return False + + @check.request_errors + def get(self, method, params=None): + """ + Sends a HTTP GET request to the device. + + Parameters + ---------- + method : str + The API method name (e.g., "ImagerGetSettings"). + params : list of str, optional + A list of parameter names to query. + + Returns + ------- + str or list of str or float or list of float or None or list of None + A single value or a list of values from the camera. + Type conversion to float is attempted. Returns None on error. + """ + self._wait_if_needed() + if params is None: + params = [] + + res = requests.get(f"{self.addr}/{method}.cgi", + params="&".join(params), + timeout=self.timeout, verify=False) + res.raise_for_status() + + values = res.text.split("\r\n") + if not values[-1]: + values = values[:-1] + + processed_values = [] + for v in values: + try: + processed_values.append(float(v)) + except (ValueError, TypeError): + processed_values.append(v) + + if len(processed_values) == 1: + return processed_values[0] + return processed_values + + @check.request_errors + def put(self, method, params=None): + """ + Sends a command (as a HTTP GET request) to the device. + + Parameters + ---------- + method : str + The API method name (e.g., "ImagerSetSettings"). + params : dict, optional + A dictionary of parameter names and their values. + + Returns + ------- + str or list of str or None + The response text from the camera, or None on error. + """ + self._wait_if_needed() + if params is None: + params = {} + + res = requests.get(f"{self.addr}/{method}.cgi", + params=urlencode(params), + timeout=self.timeout) + res.raise_for_status() + + text = res.text.split("\r\n") + if not text[-1]: + text = text[:-1] + if len(text) == 1: + return text[0] + return text + + +class Camera(STX): + """ + High-level interface for the SBIG STX camera imaging CCD. + """ + + def abort(self): + """Aborts the current exposure.""" + self.put("ImagerAbortExposure") + + def start(self, duration, frametype, dt=None): + """ + Starts an exposure if the camera is idle. + + Parameters + ---------- + duration : float + Exposure time in seconds. + frametype : int + The type of frame (0=Dark, 1=Light, 2=Bias, 3=Flat). + dt : str, optional + ISO-formatted datetime string for the FITS header. + If None, the current UTC time is used. + """ + if self.state != 0: + log.error(f"Cannot start exposure, camera is not idle. State: {self.state}") + self.error.append("Camera not idle") + return + + if dt is None: + dt = datetime.utcnow().isoformat(timespec='milliseconds') + + params = {"Duration": duration, + "FrameType": frametype, + "DateTime": dt} + self.put("ImagerStartExposure", params=params) + + def download(self, filepath=temp_fits): + """ + Downloads the last completed image from the camera buffer. + + Parameters + ---------- + filepath : str, optional + The local path to save the FITS file to. + """ + self._wait_if_needed() + res = requests.get(f"{self.addr}/Imager.FIT") + with open(filepath, 'wb') as f: + f.write(res.content) + + def set_window(self, start_x, start_y, width, height): + """ + Sets the imaging sub-frame in a single, safe command. + Coordinates are for a 1x1 binned frame. + + Parameters + ---------- + start_x : int + The starting X pixel. + start_y : int + The starting Y pixel. + width : int + The width of the sub-frame in pixels. + height : int + The height of the sub-frame in pixels. + """ + if self.state != 0: + log.error(f"Cannot set window, camera is not idle. State: {self.state}") + self.error.append("Camera not idle") + return + + params = { + "StartX": start_x, + "StartY": start_y, + "NumX": width, + "NumY": height + } + + self.put("ImagerSetSettings", params=params) + + def full_frame(self): + """Sets the camera to use the full sensor area.""" + + params = ["CameraXSize", "CameraYSize"] + cam_x, cam_y = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + self.set_window(0, 0, int(cam_x), int(cam_y)) + + def half_frame(self): + """Sets the camera to use a centered 50% sub-frame.""" + + params = ["CameraXSize", "CameraYSize"] + cam_x, cam_y = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + start_x = int(cam_x) // 4 + start_y = int(cam_y) // 4 + width = int(cam_x) // 2 + height = int(cam_y) // 2 + + self.set_window(start_x, start_y, width, height) + + def small_frame(self): + """Sets the camera to use a centered 10% sub-frame.""" + + params = ["CameraXSize", "CameraYSize"] + cam_x, cam_y = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + cam_x, cam_y = res + start_x = int(cam_x) * 9 // 20 + start_y = int(cam_y) * 9 // 20 + width = int(cam_x) // 10 + height = int(cam_y) // 10 + + self.set_window(start_x, start_y, width, height) + + @property + def binning(self): + """list of int: The current [X, Y] binning factor.""" + + params = ["BinX", "BinY"] + binx, biny = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + return [int(binx), int(biny)] + + @binning.setter + def binning(self, b): + + if self.state != 0: + log.error(f"Cannot change binning, camera is not idle. State: {self.state}") + self.error.append("Camera not idle") + return + + params = {"BinX": b[0], "BinY": b[1]} + self.put("ImagerSetSettings", params=params) + + @property + def filter(self): + """int: The currently selected filter position (1-8).""" + + params = ["CurrentFilter"] + res = self.get("GetFilterSetting", params=params) + if self.error: return None + + return res + + @property + def cooler(self): + """bool: The current state of the CCD cooler (True=On, False=Off).""" + + params = ["CoolerState"] + res = self.get("ImagerGetSettings", params=params) + if self.error: return None + + return bool(res) + + @cooler.setter + def cooler(self, b): + + if self.state != 0: + log.error(f"Cannot change cooler state, camera is not idle. State: {self.state}") + self.error.append("Camera not idle") + return + + params = {"CoolerState": "1" if b else "0"} + self.put("ImagerSetSettings", params=params) + + @filter.setter + def filter(self, n): + + if self.is_moving != 0: + log.error(f"Cannot change filter, filter wheel is moving. State: {self.is_moving}") + self.error.append("Filter wheel busy") + return + + params = {"NewPosition": n} + self.put("ChangeFilter", params=params) + + @property + def temperature(self): + """float: The current CCD temperature in degrees Celsius.""" + + params = ["CCDTemperature"] + res = self.get("ImagerGetSettings", params=params) + if self.error: return None + + return round(res, 1) + + @temperature.setter + def temperature(self, t): + + if self.state != 0: + log.error(f"Cannot change temperature, camera is not idle. State: {self.state}") + self.error.append("Camera not idle") + return + + params = {"CCDTemperatureSetpoint": t} + self.put("ImagerSetSettings", params=params) + + @property + def all(self): + """dict: A comprehensive dictionary of the current camera state.""" + + params = ["AmbientTemperature", "CCDTemperatureSetpoint", "CCDTemperature", + "CoolerState", "CoolerPower", "BinX", "BinY", "CameraXSize", + "CameraYSize", "StartX", "StartY", "NumX", "NumY"] + res = self.get("ImagerGetSettings", params=params) + + if self.error or not res or len(res) != len(params): + return {} + + ambient, setpoint, temp, cool, fan, binx, biny, camx, camy, startx, starty, numx, numy = res + b_x, b_y = int(binx), int(biny) + x_start, y_start = int(startx) // b_x, int(starty) // b_y + x_end = (int(startx) + int(numx)) // b_x + y_end = (int(starty) + int(numy)) // b_y + + return {"ambient": round(ambient, 1), + "setpoint": round(setpoint, 1), + "temperature": round(temp, 1), + "cooler": bool(cool), + "fan": round(fan, 0), + "binning": [b_x, b_y], + "max_range": [int(camx) // b_x, int(camy) // b_y], + "xystart": [x_start, y_start], + "xyend": [x_end, y_end], + "xrange": [x_start, x_end], + "yrange": [y_start, y_end], + "center": [int(camx) // b_x // 2, int(camy) // b_y // 2]} + + @property + def ambient(self): + """float: The ambient temperature in degrees Celsius.""" + + params = ["AmbientTemperature"] + res = self.get("ImagerGetSettings", params=params) + if self.error: return None + + return round(res, 1) + + @property + def center(self): + """list of int: The sensor's [X, Y] center coordinates for current binning.""" + + params = ["CameraXSize", "CameraYSize", "BinX", "BinY"] + camx, camy, binx, biny = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + return [int(camx) // int(binx) // 2, int(camy) // int(biny) // 2] + + @property + def description(self): + """str: The camera model description string.""" + + res = self.get("Description") + if self.error: return None + + return res + + @property + def fan(self): + """int: The current cooler power level in percent.""" + + params = ["CoolerPower"] + res = self.get("ImagerGetSettings", params=params) + if self.error: return None + + return round(res) + + @property + def is_moving(self): + """int: The current state of the filter wheel. + 0=Idle, 1=Moving, 2=Error. + """ + + res = self.get("FilterState") + if self.error: return None + + return res + + @property + def max_range(self): + """list of int: The maximum sensor dimensions [X, Y] for current binning.""" + + params = ["CameraXSize", "CameraYSize"] + b_x, b_y = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + b_x, b_y = self.binning + return [int(res[0]) // b_x, int(res[1]) // b_y] + + @property + def setpoint(self): + """float: The current CCD temperature setpoint in degrees Celsius.""" + + params = ["CCDTemperatureSetpoint"] + res = self.get("ImagerGetSettings", params=params) + if self.error: return None + + return res + + @property + def state(self): + """int: The current state of the imaging CCD. + 0=Idle, 2=Exposing, 3=Reading, 5=Error. + """ + + res = self.get("ImagerState") + if self.error: return None + + return res + + @property + def ready(self): + """int: The status of the image buffer. + 1 if an image is ready for download, 0 otherwise. + """ + + res = self.get("ImagerImageReady") + if self.error: return None + + return res + + @property + def version(self): + """list of str: Camera firmware and API version numbers.""" + + res = self.get("VersionNumbers") + if self.error: return None + + return res + + @property + def xrange(self): + """list of int: The current [start, end] X-axis window for current binning.""" + + params = ["StartX", "NumX", "BinX"] + startx, numx, binx = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + x_start = int(startx) // int(binx) + x_end = (int(startx) + int(numx)) // int(binx) + return [x_start, x_end] + + @property + def yrange(self): + """list of int: The current [start, end] Y-axis window for current binning.""" + + params = ["StartY", "NumY", "BinY"] + starty, numy, biny = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + y_start = int(starty) // int(biny) + y_end = (int(starty) + int(numy)) // int(biny) + return [y_start, y_end] + + @property + def xystart(self): + """list of int: The current [X, Y] start coordinates for current binning.""" + + params = ["StartX", "StartY", "BinX", "BinY"] + startx, starty, binx, biny = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + return [int(startx) // int(binx), int(starty) // int(biny)] + + @property + def xyend(self): + """list of int: The current [X, Y] end coordinates for current binning.""" + + params = ["StartX", "StartY", "NumX", "NumY", "BinX", "BinY"] + startx, starty, numx, numy, binx, biny = self.get("ImagerGetSettings", params=params) + if self.error: return [None, None] + + x_end = (int(startx) + int(numx)) // int(binx) + y_end = (int(starty) + int(numy)) // int(biny) + return [x_end, y_end]