#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Websocket part"""

# Third-party modules
from astropy.time import Time
from astropy.io import fits
import time
import os
def send_timestamp(socketio):
    """Emit the current Unix time on the 'timestamp' event once per second.

    Never returns: it loops forever, emitting through ``socketio.emit`` and
    pausing with ``socketio.sleep`` between two emissions.
    """
    interval = 1  # seconds between two consecutive emissions
    while True:
        unix_now = Time.now().unix
        socketio.emit('timestamp', unix_now)
        socketio.sleep(interval)
def send_status(app, socketio, url, sleep=4, once=False):
    """Push the JSON returned by ``GET url`` to the clients whenever it changes.

    The endpoint is queried in-process through ``app.test_client()`` and the
    decoded JSON is emitted on an event named after ``url``. The first result
    is always emitted; afterwards an emission happens only when the result
    differs from the last one that was sent.

    Parameters
    ----------
    app
        Application object exposing ``test_client()`` (used as a context
        manager).
    socketio
        Object exposing ``emit(event, data)`` and ``sleep(seconds)``.
    url : str
        Endpoint to poll; also used as the event name.
    sleep : float
        Seconds to wait between two polls.
    once : bool
        If true, emit the current status and return without polling.
    """
    name = url  # maybe different?

    # First time: always send the current state.
    with app.test_client() as client:
        old = client.get(url).get_json()
        socketio.emit(name, old)

    while not once:
        with app.test_client() as client:
            res = client.get(url).get_json()
            # Only bother the clients when something actually changed.
            if res != old:
                socketio.emit(name, res)
                old = res
        socketio.sleep(sleep)
def send_webcam(app, socketio, url, sleep=1):
    """Repeatedly fetch the image served at ``url`` and emit it as bytes.

    The endpoint is queried in-process through ``app.test_client()``; the
    ``"response"`` field of its JSON answer is turned back into bytes and
    emitted on an event named after ``url``. Never returns.

    Parameters
    ----------
    app
        Application object exposing ``test_client()`` (used as a context
        manager).
    socketio
        Object exposing ``emit(event, data)`` and ``sleep(seconds)``.
    url : str
        Endpoint to poll; also used as the event name.
    sleep : float
        Seconds to wait between two frames.
    """
    name = url  # maybe different?

    # NOTE(review): the loop header was missing from the reviewed source; an
    # endless loop is what the body indentation and the trailing sleep imply.
    while True:
        with app.test_client() as client:
            res = client.get(url).get_json()
            # Presumably the image bytes travel as an ISO-8859-1 string (one
            # code point per byte); encoding maps them back to raw bytes.
            img = res["response"].encode("ISO-8859-1")
            socketio.emit(name, img)
        socketio.sleep(sleep)
def tail_f(socketio, num_lines=30, sleep=0.25, once=False):
    """Follow today's log file and emit its lines on the 'new_lines' event.

    The last ``num_lines`` lines are emitted first; then, unless ``once`` is
    true, the file is polled every ``sleep`` seconds and every newly appended
    line is emitted. If the file shrinks it is read again from its beginning.

    Parameters
    ----------
    socketio
        Object exposing ``emit(event, data)`` and ``sleep(seconds)``.
    num_lines : int
        Number of trailing lines sent initially; zero or less sends none.
    sleep : float
        Seconds to wait between two size checks.
    once : bool
        If true, send the initial lines and return without following.

    Raises
    ------
    OSError
        If the log file cannot be opened or inspected.
    """
    today = Time.now().iso.split()[0]
    # NOTE: the name is computed once, so after a date change this keeps
    # following the file of the day on which the function was started.
    filename = f"./data/log/OARPAF.{today}.log"

    # First time: read the whole file and remember exactly how far we got, so
    # that nothing appended in the meantime is sent twice or skipped.
    lines, offset = _read_text_from(filename, 0)
    # A bare ``lines[-num_lines:]`` would return *everything* for num_lines=0.
    history = lines[-num_lines:] if num_lines > 0 else []
    socketio.emit('new_lines', history)  # Initial lines on first page load

    while not once:
        current_size = os.path.getsize(filename)
        if current_size < offset:
            # File has been truncated: start again from its beginning.
            offset = 0
        if current_size > offset:
            # File has been appended
            new_lines, offset = _read_text_from(filename, offset)
            if new_lines:
                socketio.emit('new_lines', new_lines)
        socketio.sleep(sleep)  # Sleep before checking again


def _read_text_from(path, offset):
    """Return ``(lines, new_offset)`` for what *path* holds after byte *offset*."""
    import io

    # Binary mode: byte offsets are only well defined for binary streams.
    with open(path, 'rb') as f:
        f.seek(offset)
        chunk = f.read()
    # Decode with the same defaults (encoding, newline handling) that
    # ``open(path, 'r')`` would apply.
    with io.TextIOWrapper(io.BytesIO(chunk)) as text:
        lines = text.readlines()
    return lines, offset + len(chunk)

def send_binary(socketio):
    """Emit one randomly filled 248x248 uint16 array on the 'data_bundle' event.

    The bundle carries the array shape, its dtype name, the raw bytes and the
    Unix time taken just before emitting. The emission duration and the
    payload size are printed, then the function sleeps 0.25 s.
    """
    import numpy as np
    from astropy.time import Time

    pixel_type = "uint16"
    frame_shape = (248,248)

    # Single pass for now; an endless loop would stream continuously.
    for _ in range(1):
        frame = np.random.uniform(0, 65535, frame_shape).astype(pixel_type)
        payload = frame.tobytes()
        started = Time.now()

        socketio.emit('data_bundle', {
            "shape": frame.shape,
            "dtype": pixel_type,
            "data": payload,
            "timestamp": started.unix,
        })

        elapsed = (Time.now()-started).sec
        print(elapsed, "emitted")
        print("-------")
        print(f"bin {len(payload)}")

        socketio.sleep(0.25)
def send_fits(socketio):
    """Read ``./temp.fits`` and emit its first HDU on the 'data_fits' event.

    The bundle carries the array shape, its dtype name, the raw bytes, the
    header cards converted to lists, and the Unix time taken just before the
    bytes are produced. The elapsed time and the payload size in bytes are
    printed afterwards.
    """
    with fits.open("./temp.fits") as hdul:
        matrix = hdul[0].data
        header = hdul[0].header
        now = Time.now()
        binary_data = matrix.tobytes()
        # NOTE(review): ``dtype.name`` carries no byte order, while FITS data
        # is normally big-endian -- confirm the receiver accounts for that.
        data_bundle = {
            "shape": matrix.shape,
            "dtype": matrix.dtype.name,
            "data": binary_data,
            "header": [list(c) for c in header.cards],
            "timestamp": now.unix,
        }
        socketio.emit('data_fits', data_bundle)
        print((Time.now()-now).sec, "emitted")
        print("-------")
        # Payload size in bytes; ``len(matrix)`` would only count the rows.
        print(f"bin {len(binary_data)}")