#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Websocket part""" # Third-party modules from astropy.time import Time import time import os def send_timestamp(socketio): while True: now = Time.now() socketio.emit('timestamp', now.unix) time.sleep(1) def send_status(app, socketio, url, timeout=1): name = url # maybe different? while True: with app.test_client() as client: res = client.get(url).get_json() socketio.emit(name, res) time.sleep(1) def send_webcam(app, socketio, url): name = url # maybe different? while True: with app.test_client() as client: res = client.get(url).get_json() img = res["response"].encode("ISO-8859-1") socketio.emit(url, bytes(img)) time.sleep(1) def tail_f(socketio, num_lines=30): today = Time.now().iso.split()[0] filename = f"./data/log/OARPAF.{today}.log" file_size = os.path.getsize(filename) while True: current_size = os.path.getsize(filename) if current_size < file_size: # File has been truncated or deleted, reset file pointer file_size = 0 with open(filename, 'r') as f: lines = f.readlines()[-num_lines:] elif current_size > file_size: # File has been appended with open(filename, 'r') as f: f.seek(file_size) new_lines = f.readlines() file_size = current_size if new_lines: socketio.emit('new_lines', new_lines) else: # File remains unchanged pass socketio.sleep(0.2) # Sleep for 2 seconds before checking again def send_binary(socketio): import numpy as np from astropy.time import Time # while True: for i in range(1): now = Time.now() dtype = "uint16" shape = (2048,2048) matrix = np.random.uniform(0, 65535, shape).astype(dtype) binary_data = matrix.tobytes() data_bundle = { "shape": matrix.shape, "dtype": dtype, "data": binary_data, "timestamp": now.unix, } socketio.emit('data_bundle', data_bundle) print((Time.now()-now).sec, "emitted") print("-------") print(f"bin {len(binary_data)}") time.sleep(0.25) def start_broadcast(app, socketio): print("Starting broadcast") socketio.start_background_task(send_binary, socketio=socketio) socketio.start_background_task(send_timestamp, socketio=socketio) socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/dome/status") socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/telescope/status") socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/camera/status") socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/environment/status", timeout=40) socketio.start_background_task(send_status, app=app, socketio=socketio, url="/api/sequencer/run") socketio.start_background_task(send_webcam, app=app, socketio=socketio, url="/api/webcam/snapshot") socketio.start_background_task(tail_f, socketio=socketio)