#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Websocket part""" # Third-party modules from astropy.time import Time import time import os def send_timestamp(socketio): while True: now = Time.now() socketio.emit('timestamp', now.unix) time.sleep(1) def send_status(app, socketio, url, timeout=1): name = url # maybe different? while True: with app.test_client() as client: res = client.get(url).get_json() socketio.emit(name, res) time.sleep(1) def send_webcam(app, socketio, url): name = url # maybe different? while True: with app.test_client() as client: res = client.get(url).get_json() img = res["response"].encode("ISO-8859-1") socketio.emit(url, bytes(img)) time.sleep(1) def tail_f(socketio, num_lines=30): today = Time.now().iso.split()[0] filename = f"./data/log/OARPAF.{today}.log" file_size = os.path.getsize(filename) while True: current_size = os.path.getsize(filename) if current_size < file_size: # File has been truncated or deleted, reset file pointer file_size = 0 with open(filename, 'r') as f: lines = f.readlines()[-num_lines:] elif current_size > file_size: # File has been appended with open(filename, 'r') as f: f.seek(file_size) new_lines = f.readlines() file_size = current_size if new_lines: socketio.emit('new_lines', new_lines) else: # File remains unchanged pass socketio.sleep(0.2) # Sleep for 2 seconds before checking again def start_broadcast(app, socketio): socketio.start_background_task(tail_f, socketio=socketio) socketio.start_background_task(send_timestamp, socketio=socketio) socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/dome/status") socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/telescope/status") socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/camera/status") socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/environment/status", timeout=40) socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/sequencer/run") socketio.start_background_task(send_webcam, app=app, socketio=socketio, url="/api/webcam/snapshot")