#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Websocket part"""

# Imports: astropy (third-party) first, then standard library
from astropy.time import Time
from astropy.io import fits
import time
import os

class Status:
    """Hold the last known payload for each event name.

    Attributes are documented inline; the only state is the ``last`` dict.
    """

    def __init__(self):
        # Maps an event name to the most recent payload stored for it.
        self.last = {}

    def initial(self, socketio, names):
        """Replay the stored payloads for ``names`` through ``socketio``.

        Args:
            socketio: Object exposing ``emit(event, data)``.
            names: Iterable of event names to replay.

        Names that have no stored payload yet are skipped instead of
        raising ``KeyError``.
        """
        for name in names:
            # Read from this instance rather than the module-level
            # singleton, so the method works on any Status object.
            if name in self.last:
                socketio.emit(name, self.last[name])
        
# Module-level Status instance; send_webcam stores its latest frame here.
status = Status()
        

def send_timestamp(socketio):
    """Broadcast the current Unix time on the 'timestamp' event, forever.

    Each cycle emits one value and then calls ``socketio.sleep(1)``.
    The function never returns.

    Args:
        socketio: Object exposing ``emit(event, data)`` and ``sleep(seconds)``.
    """
    while True:
        socketio.emit('timestamp', Time.now().unix)
        socketio.sleep(1)
def send_status(app, socketio, url, sleep=4, once=False):
    """Poll ``url`` through the app's test client and emit its JSON on change.

    The JSON body is emitted once immediately. Unless ``once`` is true the
    function then polls forever, emitting again only when the JSON differs
    from the value emitted last.

    Args:
        app: Application object exposing ``test_client()``.
        socketio: Object exposing ``emit(event, data)`` and ``sleep(seconds)``.
        url: Endpoint to GET; also used as the event name.
        sleep: Seconds handed to ``socketio.sleep`` between polls.
        once: When true, emit the initial value and return.
    """
    name = url  # maybe different?

    # First time: always emit so a fresh listener gets a value.
    with app.test_client() as client:
        old = client.get(url).get_json()
        socketio.emit(name, old)

    while not once:
        with app.test_client() as client:
            res = client.get(url).get_json()
            # Only push when the payload actually changed.
            if res != old:
                socketio.emit(name, res)
                old = res
        socketio.sleep(sleep)
def send_webcam(app, socketio, url, sleep=1):
    """Poll ``url`` for a webcam frame and emit it whenever it changes.

    Each cycle GETs ``url`` through the app's test client, encodes the
    ``"response"`` field of the JSON body as ISO-8859-1 bytes, stores the
    result in the module-level ``status.last`` and emits it when it differs
    from the previously emitted frame. An unusable response yields ``b""``.

    Args:
        app: Application object exposing ``test_client()``.
        socketio: Object exposing ``emit(event, data)`` and ``sleep(seconds)``.
        url: Endpoint to GET; also used as the event name and cache key.
        sleep: Seconds handed to ``socketio.sleep`` between polls.

    NOTE(review): the loop header and the initial value of ``old`` were not
    present in the reviewed text; ``while True`` and ``None`` below are
    reconstructions -- confirm against the repository.
    """
    name = url  # maybe different?
    old = None  # nothing emitted yet, so the first frame always goes out

    while True:
        with app.test_client() as client:
            res = client.get(url).get_json()
            img = b""
            if res:
                try:
                    img = res["response"].encode("ISO-8859-1")
                except (TypeError, KeyError, AttributeError):
                    # Missing key, non-mapping body or non-string value:
                    # treat as "no frame" instead of killing the task.
                    img = b""

            status.last[name] = img
            if img != old:
                socketio.emit(name, img)
                old = img

        socketio.sleep(sleep)
def tail_f(socketio, num_lines=30, sleep=0.25, once=False):
    """Emit the tail of today's log file, then follow it for appended text.

    The last ``num_lines`` lines are emitted on the 'new_lines' event.
    Unless ``once`` is true, the file size is then polled and any text
    appended since the previous poll is emitted on the same event. If the
    file shrinks, reading restarts from the beginning on the next poll.

    The file name is derived from the date at call time and is not
    recomputed while following.

    Args:
        socketio: Object exposing ``emit(event, data)`` and ``sleep(seconds)``.
        num_lines: Number of trailing lines to emit initially; zero or a
            negative value emits an empty list.
        sleep: Seconds handed to ``socketio.sleep`` between size checks.
        once: When true, emit the initial tail and return.

    Raises:
        OSError: If the log file cannot be stat'ed or opened.
    """
    from collections import deque

    today = Time.now().iso.split()[0]
    filename = f"./data/log/OARPAF.{today}.log"
    file_size = os.path.getsize(filename)

    # First time
    with open(filename, 'r') as f:
        # A bounded deque keeps only the tail while streaming the file;
        # unlike ``readlines()[-num_lines:]`` it yields nothing for 0.
        lines = list(deque(f, maxlen=max(num_lines, 0)))
    socketio.emit('new_lines', lines)  # Initial lines on first page load

    while not once:
        current_size = os.path.getsize(filename)
        if current_size < file_size:
            # File shrank: start over from the top on the next poll.
            file_size = 0
        elif current_size > file_size:
            # File has been appended
            with open(filename, 'r') as f:
                f.seek(file_size)
                new_lines = f.readlines()
            file_size = current_size
            if new_lines:
                socketio.emit('new_lines', new_lines)
        socketio.sleep(sleep)  # Sleep before checking again
def send_binary(socketio):
    """Emit a single random uint16 test frame on the 'data_bundle' event.

    A 248x248 matrix of uniformly drawn values is generated, serialised
    with ``tobytes()`` and sent together with its shape, dtype name and a
    Unix timestamp. The time spent emitting and the payload size in bytes
    are printed afterwards. Runs exactly once per call.

    Args:
        socketio: Object exposing ``emit(event, data)``.
    """
    import numpy as np
    from astropy.time import Time

    pixel_type = "uint16"
    frame = np.random.uniform(0, 65535, (248, 248)).astype(pixel_type)
    payload = frame.tobytes()
    started = Time.now()

    socketio.emit('data_bundle', {
        "shape": frame.shape,
        "dtype": pixel_type,
        "data": payload,
        "timestamp": started.unix,
    })
    print((Time.now() - started).sec, "emitted")
    print(f"bin {len(payload)}")

def send_fits(socketio):
    """Emit the first HDU of ``./temp.fits`` on the 'data_fits' event.

    The bundle carries the array shape, dtype name, raw bytes from
    ``tobytes()``, the header cards as lists and a Unix timestamp. The time
    spent building and emitting the bundle and the payload size in bytes
    are printed afterwards.

    Args:
        socketio: Object exposing ``emit(event, data)``.

    NOTE(review): ``tobytes()`` keeps the array's in-memory byte order while
    ``dtype.name`` does not record it -- confirm the receiver decodes with
    the right endianness.
    """
    with fits.open("./temp.fits") as hdul:
        matrix = hdul[0].data
        header = hdul[0].header
        now = Time.now()
        # Copy everything out while the file is open so the handle can be
        # released before the emit.
        payload = matrix.tobytes()
        data_bundle = {
            "shape": matrix.shape,
            "dtype": matrix.dtype.name,
            "data": payload,
            "header": [list(c) for c in header.cards],
            "timestamp": now.unix,
        }

    socketio.emit('data_fits', data_bundle)
    print((Time.now()-now).sec, "emitted")
    # Report the byte count, matching send_binary's "bin" diagnostic
    # (len(matrix) would be the number of rows).
    print(f"bin {len(payload)}")